[WIP] Moar filtering, cond variables with timeout
Some checks are pending
Compile Telodendria / Compile Telodendria (x86, alpine-v3.19) (push) Waiting to run
Compile Telodendria / Compile Telodendria (x86, debian-v12.4) (push) Waiting to run
Compile Telodendria / Compile Telodendria (x86, freebsd-v14.0) (push) Waiting to run
Compile Telodendria / Compile Telodendria (x86, netbsd-v9.3) (push) Waiting to run
Compile Telodendria / Compile Telodendria (x86_64, alpine-v3.19) (push) Waiting to run
Compile Telodendria / Compile Telodendria (x86_64, debian-v12.4) (push) Waiting to run
Compile Telodendria / Compile Telodendria (x86_64, freebsd-v14.0) (push) Waiting to run
Compile Telodendria / Compile Telodendria (x86_64, netbsd-v9.3) (push) Waiting to run
Compile Telodendria / Compile Telodendria (x86_64, openbsd-v7.4) (push) Waiting to run

Still isn't _too_ tested, hmph.
This commit is contained in:
LDA 2024-07-23 19:49:32 +02:00
parent 55652eaa15
commit 123aa0b23a
4 changed files with 210 additions and 61 deletions

View file

@ -32,42 +32,95 @@
#include <Schema/Filter.h> #include <Schema/Filter.h>
/* Verifies whenever an item passes through a set of blacklisted and
* whitelisted groups(e.g "not_rooms"/"rooms"), and makes the calling
* function return true/false if it is explicitely in a filtered list,
* or does nothing otherwise, to allow for chaining multiple filters. */
#define DAFiltered(blacklist, whitelist, item) \
do \
{ \
size_t i, count; \
count = ArraySize(blacklist); \
for (i = 0; i < count; i++) \
{ \
char *notItem = ArrayGet(blacklist, i); \
if (StrEquals(item, notItem)) \
{ \
return true; \
} \
} \
count = ArraySize(whitelist); \
if (count) \
{ \
for (i = 0; i < count; i++) \
{ \
char *yesItem = ArrayGet(whitelist, i); \
if (StrEquals(yesItem, item)) \
{ \
return false; \
} \
} \
return true; \
} \
} while (0)
static bool
IsSenderFiltered(Filter *filter, char *senderID)
{
if (!filter || !senderID)
{
return false;
}
DAFiltered(
filter->room.account_data.not_senders,
filter->room.account_data.senders,
senderID
);
DAFiltered(
filter->room.state.not_senders,
filter->room.state.senders,
senderID
);
DAFiltered(
filter->room.ephemeral.not_senders,
filter->room.ephemeral.senders,
senderID
);
return false;
}
bool bool
IsRoomFiltered(Filter *filter, char *roomID) IsRoomFiltered(Filter *filter, char *roomID)
{ {
size_t i, count;
if (!filter || !roomID) if (!filter || !roomID)
{ {
return false; return false;
} }
count = ArraySize(filter->room.not_rooms); DAFiltered(
for (i = 0; i < count; i++) filter->room.not_rooms,
{ filter->room.rooms,
char *notRoom = ArrayGet(filter->room.not_rooms, i); roomID
if (StrEquals(roomID, notRoom)) );
{ DAFiltered(
return true; filter->room.account_data.not_rooms,
} filter->room.account_data.rooms,
} roomID
);
count = ArraySize(filter->room.rooms); DAFiltered(
if (count) filter->room.state.not_rooms,
{ filter->room.state.rooms,
for (i = 0; i < count; i++) roomID
{ );
char *room = ArrayGet(filter->room.rooms, i); DAFiltered(
if (StrEquals(roomID, room)) filter->room.ephemeral.not_rooms,
{ filter->room.ephemeral.rooms,
return false; roomID
} );
}
return true;
}
return false; return false;
} }
/* TODO: MORE FILTERS! */
HashMap * HashMap *
FilterApply(Filter * filter, HashMap * event) FilterApply(Filter * filter, HashMap * event)
{ {
@ -86,15 +139,17 @@ FilterApply(Filter * filter, HashMap * event)
sender = JsonValueAsString(HashMapGet(event, "sender")); sender = JsonValueAsString(HashMapGet(event, "sender"));
room = JsonValueAsString(HashMapGet(event, "room_id")); room = JsonValueAsString(HashMapGet(event, "room_id"));
if (IsRoomFiltered(filter, room)) if (IsRoomFiltered(filter, room) ||
IsSenderFiltered(filter, sender))
{ {
return NULL; return NULL;
} }
copy = JsonDuplicate(event); copy = JsonDuplicate(event);
(void) sender;
return copy; return copy;
} }
Filter * Filter *
FilterDecode(Db *db, char *user, char *filterStr) FilterDecode(Db *db, char *user, char *filterStr)
{ {

View file

@ -138,23 +138,9 @@ ROUTE_IMPL(RouteSync, path, argp)
else if (timeout && timeoutDuration) else if (timeout && timeoutDuration)
{ {
char *name = StrDuplicate(UserGetName(user)); char *name = StrDuplicate(UserGetName(user));
int passed = 0;
UserUnlock(user); UserUnlock(user);
UserAwaitNotification(name, timeoutDuration);
/* TODO: Using pthreads' condition variables with a timeout would be
* the best way to proceed (as in Parsee's ParseeAwaitStanza function).
* Also, how would Cytoplasm proceed if the all of HTTP threads are used
* for syncing? That sounds like a really bad bottleneck... */
while (passed < timeoutDuration)
{
if (UserGetNotification(name))
{
break;
}
UtilSleepMillis(100);
passed += 100;
}
Free(name); Free(name);
user = UserAuthenticate(db, token); user = UserAuthenticate(db, token);
} }

View file

@ -34,8 +34,9 @@
#include <Parser.h> #include <Parser.h>
#include <Room.h> #include <Room.h>
#include <string.h>
#include <pthread.h> #include <pthread.h>
#include <string.h>
#include <errno.h>
struct User struct User
{ {
@ -1637,6 +1638,16 @@ UserSyncExists(User *user, char *sync)
return DbLock(user->db, 4, "users", user->name, "sync", sync); return DbLock(user->db, 4, "users", user->name, "sync", sync);
} }
typedef struct NotificationEntry {
enum {
NOTIF_AWAIT,
NOTIF_GOTTEN
} type;
pthread_mutex_t lock;
pthread_cond_t cond;
bool notified;
} NotificationEntry;
extern void extern void
UserInitialisePushTable(void) UserInitialisePushTable(void)
@ -1655,29 +1666,34 @@ UserInitialisePushTable(void)
void void
UserNotifyUser(User *user) UserNotifyUser(User *user)
{ {
NotificationEntry *entry;
if (!user || !pushTable) if (!user || !pushTable)
{ {
return; return;
} }
pthread_mutex_lock(&pushLock); pthread_mutex_lock(&pushLock);
HashMapSet(pushTable, user->name, user->name); entry = HashMapGet(pushTable, user->name);
if (entry && entry->type == NOTIF_AWAIT)
{
pthread_mutex_lock(&entry->lock);
entry->notified = true;
pthread_cond_signal(&entry->cond);
pthread_mutex_unlock(&entry->lock);
pthread_mutex_unlock(&pushLock); pthread_mutex_unlock(&pushLock);
return;
}
else if (!entry)
{
entry = Malloc(sizeof(*entry));
entry->type = NOTIF_GOTTEN;
pthread_mutex_unlock(&pushLock);
return;
} }
bool /* There was already a notification on the line. */
UserGetNotification(char *user)
{
bool ret;
if (!user || !pushTable)
{
return false;
}
pthread_mutex_lock(&pushLock);
ret = !!HashMapGet(pushTable, user);
HashMapDelete(pushTable, user);
pthread_mutex_unlock(&pushLock); pthread_mutex_unlock(&pushLock);
return ret;
} }
void void
@ -1694,3 +1710,94 @@ UserDestroyPushTable(void)
pthread_mutex_unlock(&pushLock); pthread_mutex_unlock(&pushLock);
pthread_mutex_destroy(&pushLock); pthread_mutex_destroy(&pushLock);
} }
bool
UserAwaitNotification(char *user, int await)
{
NotificationEntry *entry, ownEntry;
struct timespec timeout;
int code;
bool timedout = false, notified = false;
if (!user)
{
return false;
}
if (await < 0)
{
/* 30 seconds */
await = 30000;
}
pthread_mutex_lock(&pushLock);
/* Check if we got any notifications yet. */
entry = HashMapGet(pushTable, user);
if (entry && entry->type == NOTIF_GOTTEN)
{
/* Got a notification entry already. */
Free(entry);
HashMapDelete(pushTable, user);
pthread_mutex_unlock(&pushLock);
return true;
}
else if (entry)
{
/* Another thread's awaiting...
* TODO: Manage these conditions. */
Log(LOG_ERR,
"Unimplemented feature: awaiting for other threads."
);
pthread_mutex_unlock(&pushLock);
return false;
}
/* No one's waiting or notifying; let's create our own entry,
* and await for something(NOTE that we're not allocating from
* the heap, since I've noticed some strange behaviour with
* Cytoplasm on ARM64... This is a hack and deserves to be fixed). */
entry = &ownEntry;
entry->type = NOTIF_AWAIT;
entry->notified = false;
pthread_mutex_init(&entry->lock, NULL);
pthread_cond_init(&entry->cond, NULL);
HashMapSet(pushTable, user, entry);
pthread_mutex_unlock(&pushLock);
/* Now, it's time for us to wait. */
clock_gettime(CLOCK_REALTIME, &timeout);
timeout.tv_sec += await / 1000;
timeout.tv_nsec += (await % 1000) * 1000000;
if (timeout.tv_nsec > 999999999)
{
uint64_t driftSeconds = timeout.tv_nsec / 1000000000;
uint64_t driftMicros = timeout.tv_nsec % 1000000000;
timeout.tv_nsec = driftMicros;
timeout.tv_sec += driftSeconds;
}
pthread_mutex_lock(&entry->lock);
while (!entry->notified)
{
code = pthread_cond_timedwait(&entry->cond, &entry->lock, &timeout);
if ((code == ETIMEDOUT))
{
timedout = true;
break;
}
}
pthread_mutex_unlock(&entry->lock);
pthread_mutex_destroy(&entry->lock);
pthread_cond_destroy(&entry->cond);
notified = !timedout && entry->notified;
pthread_mutex_lock(&pushLock);
HashMapDelete(pushTable, user);
pthread_mutex_unlock(&pushLock);
return notified;
}

View file

@ -471,10 +471,11 @@ extern void UserInitialisePushTable(void);
extern void UserNotifyUser(User *); extern void UserNotifyUser(User *);
/** /**
* Verifies if the user has been notified, and if it is, then * Waits for a notification for a specific user for a specific
* removes the notification * duration in milliseconds to wait (with 30000ms being the default,
* if the value is negative), and returns true if anything came up.
*/ */
extern bool UserGetNotification(char *); extern bool UserAwaitNotification(char *, int);
/** /**
* Destroys the temporary push table created by * Destroys the temporary push table created by