[MOD/WIP] Parallel awaiting of notifications
Some checks are pending
Compile Telodendria / Compile Telodendria (x86, alpine-v3.19) (push) Waiting to run
Compile Telodendria / Compile Telodendria (x86, debian-v12.4) (push) Waiting to run
Compile Telodendria / Compile Telodendria (x86, freebsd-v14.0) (push) Waiting to run
Compile Telodendria / Compile Telodendria (x86, netbsd-v9.3) (push) Waiting to run
Compile Telodendria / Compile Telodendria (x86_64, alpine-v3.19) (push) Waiting to run
Compile Telodendria / Compile Telodendria (x86_64, debian-v12.4) (push) Waiting to run
Compile Telodendria / Compile Telodendria (x86_64, freebsd-v14.0) (push) Waiting to run
Compile Telodendria / Compile Telodendria (x86_64, netbsd-v9.3) (push) Waiting to run
Compile Telodendria / Compile Telodendria (x86_64, openbsd-v7.4) (push) Waiting to run

Passed the very basic test with the tanuki account, over two sessions.
This commit is contained in:
LDA 2024-08-16 19:02:57 +02:00
parent acd1bf0ec0
commit dc0e463c27

View file

@ -1665,28 +1665,44 @@ void
UserNotifyUser(User *user) UserNotifyUser(User *user)
{ {
NotificationEntry *entry; NotificationEntry *entry;
Array *entries;
size_t size, i;
if (!user || !pushTable) if (!user || !pushTable)
{ {
return; return;
} }
pthread_mutex_lock(&pushLock); pthread_mutex_lock(&pushLock);
entry = HashMapGet(pushTable, user->name); entries = HashMapGet(pushTable, user->name);
size = ArraySize(entries);
entry = ArrayGet(entries, 0);
if (entry && entry->type == NOTIF_AWAIT) if (entry && entry->type == NOTIF_AWAIT)
{ {
pthread_mutex_lock(&entry->lock); /* First element being an await -> wake up everyone */
entry->notified = true; for (i = 0; i < size; i++)
pthread_cond_signal(&entry->cond); {
pthread_mutex_unlock(&entry->lock); entry = ArrayGet(entries, i);
pthread_mutex_lock(&entry->lock);
entry->notified = true;
pthread_cond_signal(&entry->cond);
pthread_mutex_unlock(&entry->lock);
}
pthread_mutex_unlock(&pushLock); pthread_mutex_unlock(&pushLock);
return; return;
} }
else if (!entry) else if (!entry)
{ {
/* No elements in the awaits, create a notification note */
if (!entries)
{
entries = ArrayCreate();
}
entry = Malloc(sizeof(*entry)); entry = Malloc(sizeof(*entry));
entry->type = NOTIF_GOTTEN; entry->type = NOTIF_GOTTEN;
ArrayAdd(entries, entry);
HashMapSet(pushTable, user->name, entry); HashMapSet(pushTable, user->name, entries);
pthread_mutex_unlock(&pushLock); pthread_mutex_unlock(&pushLock);
return; return;
} }
@ -1698,12 +1714,29 @@ UserNotifyUser(User *user)
void void
UserDestroyPushTable(void) UserDestroyPushTable(void)
{ {
char *key;
Array *value;
if (!pushTable) if (!pushTable)
{ {
return; return;
} }
pthread_mutex_lock(&pushLock); pthread_mutex_lock(&pushLock);
while (HashMapIterate(pushTable, &key, (void **) &value))
{
size_t i;
for (i = 0; i < ArraySize(value); i++)
{
NotificationEntry *entry = ArrayGet(value, i);
/* Should we use the Memory API? */
if (entry->type == NOTIF_GOTTEN)
{
Free(entry);
}
}
ArrayFree(value);
}
HashMapFree(pushTable); HashMapFree(pushTable);
pushTable = NULL; pushTable = NULL;
pthread_mutex_unlock(&pushLock); pthread_mutex_unlock(&pushLock);
@ -1714,9 +1747,11 @@ bool
UserAwaitNotification(char *user, int await) UserAwaitNotification(char *user, int await)
{ {
NotificationEntry *entry, ownEntry; NotificationEntry *entry, ownEntry;
Array *entries;
struct timespec timeout; struct timespec timeout;
int code; int code;
bool timedout = false, notified = false; bool timedout = false, notified = false;
size_t i;
if (!user) if (!user)
{ {
return false; return false;
@ -1731,25 +1766,33 @@ UserAwaitNotification(char *user, int await)
pthread_mutex_lock(&pushLock); pthread_mutex_lock(&pushLock);
/* Check if we got any notifications yet. */ /* Check if we got any notifications yet. */
entry = HashMapGet(pushTable, user); entries = HashMapGet(pushTable, user);
entry = ArrayGet(entries, 0);
if (entry && entry->type == NOTIF_GOTTEN) if (entry && entry->type == NOTIF_GOTTEN)
{ {
/* Got a notification entry already. */ /* Got a notification entry already. */
Free(entry); entries = HashMapDelete(pushTable, user);
HashMapDelete(pushTable, user); for (i = 0; i < ArraySize(entries); i++)
{
NotificationEntry *entry = ArrayGet(entries, i);
/* Should we use the Memory API? */
if (entry->type == NOTIF_GOTTEN)
{
Free(entry);
}
}
ArrayFree(entries);
pthread_mutex_unlock(&pushLock); pthread_mutex_unlock(&pushLock);
return true; return true;
} }
else if (entry)
{
/* Another thread's awaiting... TODO: Manage these conditions. */
Log(LOG_ERR,
"Unimplemented feature: awaiting for other threads."
);
pthread_mutex_unlock(&pushLock); if (!entries)
return false; {
entries = ArrayCreate();
HashMapSet(pushTable, user, entries);
} }
/* No one's waiting or notifying; let's create our own entry, /* No one's waiting or notifying; let's create our own entry,
@ -1761,7 +1804,7 @@ UserAwaitNotification(char *user, int await)
entry->notified = false; entry->notified = false;
pthread_mutex_init(&entry->lock, NULL); pthread_mutex_init(&entry->lock, NULL);
pthread_cond_init(&entry->cond, NULL); pthread_cond_init(&entry->cond, NULL);
HashMapSet(pushTable, user, entry); ArrayAdd(entries, entry);
pthread_mutex_unlock(&pushLock); pthread_mutex_unlock(&pushLock);
/* Now, it's time for us to wait. */ /* Now, it's time for us to wait. */
@ -1794,7 +1837,15 @@ UserAwaitNotification(char *user, int await)
notified = !timedout && entry->notified; notified = !timedout && entry->notified;
pthread_mutex_lock(&pushLock); pthread_mutex_lock(&pushLock);
HashMapDelete(pushTable, user); for (i = 0; i < ArraySize(entries); i++)
{
NotificationEntry *subEntry = ArrayGet(entries, i);
if (subEntry == entry)
{
ArrayDelete(entries, i);
break;
}
}
pthread_mutex_unlock(&pushLock); pthread_mutex_unlock(&pushLock);
return notified; return notified;