From e2d26c7f33190c85b04c320ce8f62c9484203ca6 Mon Sep 17 00:00:00 2001 From: LDA Date: Fri, 14 Jun 2024 23:30:35 +0200 Subject: [PATCH] [ADD/WIP] Start /messages, sync timeouts, cleaning This code _really_ isn't good. I've spotted some fun racing happen and leak of some JSON(somehow?) without timeout, and I've been trying to spend the last several days trying to fix that. The good news is that, with some endpoint spoofing, you can actually _try_ this with a Matrix client(dev.cinny.in for example). I might consider rewriting this pastacode, considering the issues noted above in favour of a better design, with less odds of this. --- Schema/PduV1.json | 4 + Schema/SyncResponse.json | 19 +- src/Main.c | 27 +++ src/Room.c | 70 +++++-- src/Routes.c | 3 + src/Routes/RouteCreateRoom.c | 2 - src/Routes/RouteFilter.c | 2 +- src/Routes/RouteJoinRoomAlias.c | 6 +- src/Routes/RouteMessages.c | 178 ++++++++++++++++++ src/Routes/RoutePushRules.c | 129 +++++++++++++ src/Routes/RouteSendEvent.c | 117 ++++++++++++ src/Routes/RouteSync.c | 184 ++++++++++++++----- src/User.c | 313 +++++++++++++++++++++++++++++++- src/include/Routes.h | 7 +- src/include/User.h | 66 +++++++ 15 files changed, 1040 insertions(+), 87 deletions(-) create mode 100644 src/Routes/RouteMessages.c create mode 100644 src/Routes/RoutePushRules.c diff --git a/Schema/PduV1.json b/Schema/PduV1.json index 47b4494..6a46feb 100644 --- a/Schema/PduV1.json +++ b/Schema/PduV1.json @@ -16,6 +16,10 @@ "fields": { "age": { "type": "integer" + }, + "next_events": { + "type": "[string]", + "required": false } } }, diff --git a/Schema/SyncResponse.json b/Schema/SyncResponse.json index a75f712..c4990bc 100644 --- a/Schema/SyncResponse.json +++ b/Schema/SyncResponse.json @@ -21,13 +21,19 @@ }, "type": "struct" }, - "StrippedStateEvent": { + "InviteState": { "fields": { - "events": { "type": "[Event]" } + "events": { "type": "[StrippedStateEvent]" } }, "type": "struct" }, - "InviteState": { + "States": { + "fields": { + "events": { "type": "[StrippedStateEvent]" } + }, + "type": "struct" + }, + "StrippedStateEvent": { "fields": { "content": { "type": "object", @@ -75,14 +81,15 @@ }, "JoinedRooms": { "fields": { - "timeline": { "type": "Timeline" } + "timeline": { "type": "Timeline" }, + "state": { "type": "States" } }, "type": "struct" }, "Rooms": { "fields": { - "invite": { "type": "InvitedRooms" }, - "join": { "type": "JoinedRooms" } + "invite": { "type": "object" }, + "join": { "type": "object" } }, "type": "struct" }, diff --git a/src/Main.c b/src/Main.c index a4e2880..3d90a32 100644 --- a/src/Main.c +++ b/src/Main.c @@ -86,6 +86,26 @@ SignalHandler(int signal) } } +/* TODO: Move this! */ +static void +UsersClean(MatrixHttpHandlerArgs *args) +{ + Db *db = args->db; + Array *arr = DbList(db, 1, "users"); + size_t i; + + for (i = 0; i < ArraySize(arr); i++) + { + char *id = ArrayGet(arr, i); + User *user = UserLock(db, id); + + UserCleanTemporaryData(user); + UserUnlock(user); + } + + DbListFree(arr); +} + typedef enum ArgFlag { ARG_VERSION = (1 << 0), @@ -147,6 +167,7 @@ start: token = NULL; memset(&matrixArgs, 0, sizeof(matrixArgs)); + UserInitialisePushTable(); if (!LogConfigGlobal()) { @@ -513,10 +534,15 @@ start: Log(LOG_DEBUG, "Registering jobs..."); CronEvery(cron, 30 * 60 * 1000, (JobFunc *) UiaCleanup, &matrixArgs); + CronEvery(cron, 5 * 60 * 1000, (JobFunc *) UsersClean, &matrixArgs); Log(LOG_NOTICE, "Starting job scheduler..."); CronStart(cron); + /* We still call it anyways to be sure. */ + Log(LOG_NOTICE, "Cleaning up user data..."); + UsersClean(&matrixArgs); + Log(LOG_NOTICE, "Building routing tree..."); matrixArgs.router = RouterBuild(); if (!matrixArgs.router) @@ -583,6 +609,7 @@ start: finish: Log(LOG_NOTICE, "Shutting down..."); + UserDestroyPushTable(); if (httpServers) { for (i = 0; i < ArraySize(httpServers); i++) diff --git a/src/Room.c b/src/Room.c index 2752c8a..f61a2af 100644 --- a/src/Room.c +++ b/src/Room.c @@ -52,6 +52,7 @@ #define IsState(p, typ, key) (StrEquals(p->type, typ) && \ StrEquals(p->state_key, key)) + struct Room { Db *db; @@ -1091,22 +1092,25 @@ AuthorizeJoinMembershipV1(Room * room, PduV1 pdu, HashMap *state) { /* Interperet prev properly, as a list of JsonObjects. */ char *prev_id = JsonValueAsString(ArrayGet(prev, 0)); - char *ignored; HashMap *prev_event = RoomEventFetch(room, prev_id); - PduV1 prev_pdu; + bool flag = false; - if (prev && PduV1FromJson(prev_event, &prev_pdu, &ignored)) + if (prev_event) { - if (StrEquals(prev_pdu.type, "m.room.create") && - StrEquals(prev_pdu.sender, pdu.state_key)) + char *type = JsonValueAsString(HashMapGet(prev_event, "type")); + char *sender = JsonValueAsString(HashMapGet(prev_event, "sender")); + if (StrEquals(type, "m.room.create") && + StrEquals(sender, pdu.state_key)) { - PduV1Free(&prev_pdu); - JsonFree(prev_event); - return true; + flag = true; } - PduV1Free(&prev_pdu); } JsonFree(prev_event); + + if (flag) + { + return true; + } } /* Step 5.2.2: If the sender does not match state_key, reject. */ @@ -1633,6 +1637,9 @@ RoomEventSendV1(Room * room, HashMap * event) * errorr status to the user. */ goto finish; } + StateFree(state); + state = NULL; + RoomAddEventV1(room, pdu); valid = true; @@ -1758,6 +1765,7 @@ RoomAddEventV1(Room *room, PduV1 pdu) JsonValue *leaves_val; char *safe_id; size_t i; + if (!room || room->version >= 3) { return false; @@ -1802,7 +1810,7 @@ RoomAddEventV1(Room *room, PduV1 pdu) pdu_json = PduV1ToJson(&pdu); DbJsonSet(event_ref, pdu_json); - + ArrayAdd(leaves, JsonValueObject(pdu_json)); leaves_json = JsonDuplicate(leaves_json); JsonValueFree(HashMapDelete(leaves_json, "leaves")); @@ -1812,12 +1820,39 @@ RoomAddEventV1(Room *room, PduV1 pdu) DbUnlock(room->db, event_ref); - /* TODO: Store DAG relationships, somehow. */ - pdu_json = PduV1ToJson(&pdu); + for (i = 0; i < ArraySize(prev_events); i++) { - HashMap *state = StateCurrent(room); + JsonValue *event_val = ArrayGet(prev_events, i); + char *id = JsonValueAsString(event_val); + char *error = NULL; + PduV1 prev_pdu = { 0 }; + HashMap *prev_object = NULL; + Array *next_events = NULL; + event_ref = DbLock(room->db, 4, "rooms", room->id, "events", id); + PduV1FromJson(DbJson(event_ref), &prev_pdu, &error); + + if (!prev_pdu._unsigned.next_events) + { + prev_pdu._unsigned.next_events = ArrayCreate(); + } + next_events = prev_pdu._unsigned.next_events; + + ArrayAdd(next_events, StrDuplicate(pdu.event_id)); + + prev_object = PduV1ToJson(&prev_pdu); + DbJsonSet(event_ref, prev_object); + + JsonFree(prev_object); + PduV1Free(&prev_pdu); + DbUnlock(room->db, event_ref); + } + + { + HashMap *state; char *type, *state_key, *event_id; + pdu_json = PduV1ToJson(&pdu); + if (StrEquals(pdu.type, "m.room.member")) { CommonID *id = UserIdParse(pdu.state_key, NULL); @@ -1846,18 +1881,16 @@ RoomAddEventV1(Room *room, PduV1 pdu) UserIdFree(id); UserUnlock(user); } - + state = StateCurrent(room); while (StateIterate(state, &type, &state_key, (void **) &event_id)) { if (StrEquals(type, "m.room.member")) { - /* Notify state_key about it. - * Does nothing if the user is not already in the room. */ CommonID *id = UserIdParse(state_key, NULL); User *user = UserLock(room->db, id->local); UserPushEvent(user, pdu_json); - + UserIdFree(id); UserUnlock(user); } @@ -1865,8 +1898,9 @@ RoomAddEventV1(Room *room, PduV1 pdu) Free(state_key); } StateFree(state); + + JsonFree(pdu_json); } - JsonFree(pdu_json); return true; } diff --git a/src/Routes.c b/src/Routes.c index aac7343..76b9491 100644 --- a/src/Routes.c +++ b/src/Routes.c @@ -77,8 +77,10 @@ RouterBuild(void) R("/_matrix/client/v3/user/(.*)/filter/(.*)", RouteFilter); R("/_matrix/client/v3/rooms/(.*)/send/(.*)/(.*)", RouteSendEvent); + R("/_matrix/client/v3/rooms/(.*)/state/(.*)/(.*)", RouteSendState); R("/_matrix/client/v3/rooms/(.*)/event/(.*)", RouteFetchEvent); R("/_matrix/client/v3/rooms/(.*)/join", RouteJoinRoom); + R("/_matrix/client/v3/rooms/(.*)/messages", RouteMessages); R("/_matrix/client/v3/join/(.*)", RouteJoinRoomAlias); @@ -91,6 +93,7 @@ RouterBuild(void) R("/_matrix/client/v3/directory/room/(.*)", RouteAliasDirectory); R("/_matrix/client/v3/rooms/(.*)/aliases", RouteRoomAliases); + /* Telodendria Admin API Routes */ R("/_telodendria/admin/v1/(restart|shutdown|stats)", RouteProcControl); diff --git a/src/Routes/RouteCreateRoom.c b/src/Routes/RouteCreateRoom.c index 376b498..beacfba 100644 --- a/src/Routes/RouteCreateRoom.c +++ b/src/Routes/RouteCreateRoom.c @@ -104,8 +104,6 @@ ROUTE_IMPL(RouteCreateRoom, path, argp) JsonFree(request); request = NULL; - Log(LOG_INFO, "Creating room..."); - if (!(room = RoomCreate(db, user, &parsed, server))) { err = "Couldn't create room."; diff --git a/src/Routes/RouteFilter.c b/src/Routes/RouteFilter.c index c8f4fa8..55bfc32 100644 --- a/src/Routes/RouteFilter.c +++ b/src/Routes/RouteFilter.c @@ -189,7 +189,7 @@ ROUTE_IMPL(RouteFilter, path, argp) DbJsonSet(ref, filterJson); DbUnlock(db, ref); - Free(filterJson); + JsonFree(filterJson); response = HashMapCreate(); HashMapSet(response, "filter_id", JsonValueString(filterId)); diff --git a/src/Routes/RouteJoinRoomAlias.c b/src/Routes/RouteJoinRoomAlias.c index c074f01..51b9be4 100644 --- a/src/Routes/RouteJoinRoomAlias.c +++ b/src/Routes/RouteJoinRoomAlias.c @@ -84,6 +84,7 @@ ROUTE_IMPL(RouteJoinRoomAlias, path, argp) if (*roomId != '!') { roomId = RoomResolveAlias(db, roomId); + Log(LOG_NOTICE, "Here's my guess: %s", roomId); } else { @@ -126,6 +127,7 @@ ROUTE_IMPL(RouteJoinRoomAlias, path, argp) goto finish; } + Log(LOG_INFO, "Trying with token %s", token); user = UserAuthenticate(db, token); if (!user) { @@ -136,11 +138,11 @@ ROUTE_IMPL(RouteJoinRoomAlias, path, argp) id = UserIdParse(UserGetName(user), serverName); id->sigil = '@'; sender = ParserRecomposeCommonID(*id); + Log(LOG_INFO, "Now as %s", sender); room = RoomLock(db, roomId); if (!room) { - err = "Room ID does not exist."; HttpResponseStatus(args->context, HTTP_BAD_REQUEST); response = MatrixErrorCreate(M_UNKNOWN, err); @@ -149,6 +151,8 @@ ROUTE_IMPL(RouteJoinRoomAlias, path, argp) if (RoomContainsUser(room, sender)) { err = "User is already in the room."; + Log(LOG_INFO, "qhar for %s", sender); + HttpResponseStatus(args->context, HTTP_UNAUTHORIZED); response = MatrixErrorCreate(M_FORBIDDEN, err); goto finish; diff --git a/src/Routes/RouteMessages.c b/src/Routes/RouteMessages.c new file mode 100644 index 0000000..fa89694 --- /dev/null +++ b/src/Routes/RouteMessages.c @@ -0,0 +1,178 @@ +/* + * Copyright (C) 2022-2024 Jordan Bancino <@jordan:bancino.net> with + * other valuable contributors. See CONTRIBUTORS.txt for the full list. + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation files + * (the "Software"), to deal in the Software without restriction, + * including without limitation the rights to use, copy, modify, merge, + * publish, distribute, sublicense, and/or sell copies of the Software, + * and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ +#include + + +#include +#include +#include +#include + +#include +#include + +#include +#include + +#include + +static char * +GetServerName(Db * db) +{ + char *name; + + Config config; + + ConfigLock(db, &config); + if (!config.ok) + { + return NULL; + } + + name = StrDuplicate(config.serverName); + + ConfigUnlock(&config); + + return name; +} + +ROUTE_IMPL(RouteMessages, path, argp) +{ + RouteArgs *args = argp; + Db *db = args->matrixArgs->db; + + HashMap *params = HttpRequestParams(args->context); + HashMap *response = NULL; + + User *user = NULL; + char *token = NULL; + + char *roomId = ArrayGet(path, 0); + char *from = HashMapGet(params, "from"); + char *limitStr = HashMapGet(params, "limit"); + char *end = NULL; + char *sender = NULL; + char *serverName = NULL; + CommonID *id = NULL; + int limit = strtol(limitStr, NULL, 10); + + Array *messages = NULL; + + Room *room = NULL; + + char *err; + + if (!limit || limit < 0) + { + limit = 10; + } + if (!roomId) + { + /* Should be impossible */ + HttpResponseStatus(args->context, HTTP_INTERNAL_SERVER_ERROR); + return MatrixErrorCreate(M_UNKNOWN, NULL); + } + if (HttpRequestMethodGet(args->context) != HTTP_GET) + { + err = "Unknown request method."; + HttpResponseStatus(args->context, HTTP_BAD_REQUEST); + response = MatrixErrorCreate(M_UNRECOGNIZED, err); + goto finish; + } + + serverName = GetServerName(db); + if (!serverName) + { + HttpResponseStatus(args->context, HTTP_INTERNAL_SERVER_ERROR); + response = MatrixErrorCreate(M_UNKNOWN, NULL); + goto finish; + } + + response = MatrixGetAccessToken(args->context, &token); + if (response) + { + goto finish; + } + + user = UserAuthenticate(db, token); + if (!user) + { + HttpResponseStatus(args->context, HTTP_UNAUTHORIZED); + response = MatrixErrorCreate(M_UNKNOWN_TOKEN, NULL); + goto finish; + } + id = UserIdParse(UserGetName(user), serverName); + id->sigil = '@'; + sender = ParserRecomposeCommonID(*id); + + room = RoomLock(db, roomId); + if (!RoomContainsUser(room, sender)) + { + err = "User is not in the room."; + HttpResponseStatus(args->context, HTTP_UNAUTHORIZED); + response = MatrixErrorCreate(M_FORBIDDEN, err); + goto finish; + } + RoomUnlock(room); + + /* TODO: Filters, to, and friends */ + messages = UserFetchMessages(user, limit, from, &end); + if (!messages) + { + Room *r = RoomLock(db, roomId); + Array *leaf = RoomPrevEventsGet(r); + HashMap *start = JsonValueAsObject(ArrayGet(leaf, 0)); + char *startId = StrDuplicate( + JsonValueAsString(HashMapGet(start, "event_id")) + ); + char *token = UserNewMessageToken(user, roomId, startId); + RoomUnlock(r); + + messages = UserFetchMessages(user, limit, token, &end); + Free(token); + Free(startId); + } + response = HashMapCreate(); + JsonSet(response, JsonValueArray(messages), 1, "chunk"); + if (end) + { + JsonSet(response, JsonValueString(end), 1, "end"); + Free(end); + } + + UserFreeMessageToken(user, from); +finish: + UserUnlock(user); + UserIdFree(id); + if (sender) + { + Free(sender); + } + if (serverName) + { + Free(serverName); + } + return response; +} diff --git a/src/Routes/RoutePushRules.c b/src/Routes/RoutePushRules.c new file mode 100644 index 0000000..d07709b --- /dev/null +++ b/src/Routes/RoutePushRules.c @@ -0,0 +1,129 @@ +/* + * Copyright (C) 2022-2024 Jordan Bancino <@jordan:bancino.net> with + * other valuable contributors. See CONTRIBUTORS.txt for the full list. + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation files + * (the "Software"), to deal in the Software without restriction, + * including without limitation the rights to use, copy, modify, merge, + * publish, distribute, sublicense, and/or sell copies of the Software, + * and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ +#include +#include + +#include +#include +#include +#include + +#include +#include +#include + +#include + + +ROUTE_IMPL(RoutePushRules, path, argp) +{ + RouteArgs *args = argp; + Db *db = args->matrixArgs->db; + + HashMap *response = NULL; + + User *user = NULL; + char *token = NULL; + + char *err; + + if (HttpRequestMethodGet(args->context) != HTTP_GET) + { + err = "Unknown request method."; + HttpResponseStatus(args->context, HTTP_BAD_REQUEST); + response = MatrixErrorCreate(M_UNRECOGNIZED, err); + goto finish; + } + + response = MatrixGetAccessToken(args->context, &token); + if (response) + { + goto finish; + } + + user = UserAuthenticate(db, token); + if (!user) + { + HttpResponseStatus(args->context, HTTP_UNAUTHORIZED); + response = MatrixErrorCreate(M_UNKNOWN_TOKEN, NULL); + goto finish; + } + response = HashMapCreate(); + JsonSet(response, JsonValueObject(HashMapCreate()), 1, "global"); + (void) path; +finish: + UserUnlock(user); + return response; +} + +ROUTE_IMPL(RouteKeyQuery, path, argp) +{ + RouteArgs *args = argp; + Db *db = args->matrixArgs->db; + + HashMap *request = NULL; + HashMap *response = NULL; + + User *user = NULL; + char *token = NULL; + + char *err; + + if (HttpRequestMethodGet(args->context) != HTTP_POST) + { + err = "Unknown request method."; + HttpResponseStatus(args->context, HTTP_BAD_REQUEST); + response = MatrixErrorCreate(M_UNRECOGNIZED, err); + goto finish; + } + + response = MatrixGetAccessToken(args->context, &token); + if (response) + { + goto finish; + } + + user = UserAuthenticate(db, token); + if (!user) + { + HttpResponseStatus(args->context, HTTP_UNAUTHORIZED); + response = MatrixErrorCreate(M_UNKNOWN_TOKEN, NULL); + goto finish; + } + request = JsonDecode(HttpServerStream(args->context)); + if (!request) + { + HttpResponseStatus(args->context, HTTP_BAD_REQUEST); + response = MatrixErrorCreate(M_NOT_JSON, NULL); + goto finish; + } + + response = HashMapCreate(); + (void) path; +finish: + JsonFree(request); + UserUnlock(user); + return response; +} diff --git a/src/Routes/RouteSendEvent.c b/src/Routes/RouteSendEvent.c index e9dbcb2..a4ad505 100644 --- a/src/Routes/RouteSendEvent.c +++ b/src/Routes/RouteSendEvent.c @@ -171,3 +171,120 @@ finish: JsonFree(request); return response; } + +ROUTE_IMPL(RouteSendState, path, argp) +{ + RouteArgs *args = argp; + Db *db = args->matrixArgs->db; + + HashMap *request = NULL, *event = NULL; + HashMap *response = NULL, *filled = NULL; + + User *user = NULL; + CommonID *id = NULL; + char *token = NULL; + + char *serverName = NULL; + + char *roomId = ArrayGet(path, 0); + char *eventType = ArrayGet(path, 1); + char *stateKey = ArrayGet(path, 2); + char *sender = NULL; + + Room *room = NULL; + + char *err; + + if (!roomId || !eventType) + { + /* Should be impossible */ + HttpResponseStatus(args->context, HTTP_INTERNAL_SERVER_ERROR); + return MatrixErrorCreate(M_UNKNOWN, NULL); + } + if (HttpRequestMethodGet(args->context) != HTTP_PUT) + { + err = "Unknown request method."; + HttpResponseStatus(args->context, HTTP_BAD_REQUEST); + response = MatrixErrorCreate(M_UNRECOGNIZED, err); + goto finish; + } + + serverName = GetServerName(db); + if (!serverName) + { + HttpResponseStatus(args->context, HTTP_INTERNAL_SERVER_ERROR); + response = MatrixErrorCreate(M_UNKNOWN, NULL); + goto finish; + } + + response = MatrixGetAccessToken(args->context, &token); + if (response) + { + goto finish; + } + + request = JsonDecode(HttpServerStream(args->context)); + if (!request) + { + HttpResponseStatus(args->context, HTTP_BAD_REQUEST); + response = MatrixErrorCreate(M_NOT_JSON, NULL); + goto finish; + } + + user = UserAuthenticate(db, token); + if (!user) + { + HttpResponseStatus(args->context, HTTP_UNAUTHORIZED); + response = MatrixErrorCreate(M_UNKNOWN_TOKEN, NULL); + goto finish; + } + id = UserIdParse(UserGetName(user), serverName); + id->sigil = '@'; + sender = ParserRecomposeCommonID(*id); + + room = RoomLock(db, roomId); + if (!RoomContainsUser(room, sender)) + { + err = "User is not in the room."; + HttpResponseStatus(args->context, HTTP_UNAUTHORIZED); + response = MatrixErrorCreate(M_FORBIDDEN, err); + goto finish; + } + + Log(LOG_INFO, "State event (%s,%s) coming this way", eventType, stateKey); + event = RoomEventCreate( + sender, + eventType, stateKey ? stateKey : "", + JsonDuplicate(request) + ); + filled = RoomEventSend(room, event); + Log(LOG_INFO, "State event (%s,%s) coming this way", eventType, stateKey); + JsonFree(event); + Log(LOG_INFO, "And thats freed!"); + + if (!filled) + { + err = "User is not allowed to send state."; + HttpResponseStatus(args->context, HTTP_UNAUTHORIZED); + response = MatrixErrorCreate(M_FORBIDDEN, err); + goto finish; + } + + response = HashMapCreate(); + HashMapSet( + response, "event_id", + JsonValueDuplicate(HashMapGet(filled, "event_id")) + ); + JsonFree(filled); +finish: + RoomUnlock(room); + Free(serverName); + if (sender) + { + Free(sender); + } + UserIdFree(id); + UserUnlock(user); + JsonFree(request); + return response; +} diff --git a/src/Routes/RouteSync.c b/src/Routes/RouteSync.c index 2bf075a..e5df807 100644 --- a/src/Routes/RouteSync.c +++ b/src/Routes/RouteSync.c @@ -30,14 +30,33 @@ #include #include #include +#include #include +#include #include #include #include +#include -#include +static ClientEventWithoutRoomID +ClientfyEventSync(HashMap *pdu) +{ + ClientEventWithoutRoomID ret = { 0 }; + char *ignored; + ClientEventWithoutRoomIDFromJson(pdu, &ret, &ignored); + return ret; +} +static StrippedStateEvent +StripStateEventSync(HashMap *pdu) +{ + StrippedStateEvent ret = { 0 }; + char *ignored; + StrippedStateEventFromJson(pdu, &ret, &ignored); + + return ret; +} ROUTE_IMPL(RouteSync, path, argp) { @@ -46,18 +65,27 @@ ROUTE_IMPL(RouteSync, path, argp) HashMap *params = NULL; HashMap *response = NULL; + SyncResponse sync = { 0 }; - - HashMap *rooms = NULL; + Array *invites; + Array *joins; + size_t i; User *user = NULL; char *token = NULL; char *prevBatch = NULL; char *nextBatch = NULL; char *currBatch = NULL; + char *timeout = NULL; char *err; + int timeoutDuration; + + /* TODO: Respect `timeout', (and stop when something is + * pushed, maybe by 'polling' the database? sounds like + * a bad idea) */ + if (HttpRequestMethodGet(args->context) != HTTP_GET) { err = "Unknown request method."; @@ -79,84 +107,142 @@ ROUTE_IMPL(RouteSync, path, argp) response = MatrixErrorCreate(M_UNKNOWN_TOKEN, NULL); goto finish; } + params = HttpRequestParams(args->context); prevBatch = HashMapGet(params, "since"); + timeout = HashMapGet(params, "timeout"); + timeoutDuration = atoi(timeout); + + if (!prevBatch) { + prevBatch = NULL; nextBatch = UserInitSyncDiff(user); UserFillSyncDiff(user, nextBatch); } + else if (timeout && timeoutDuration) + { + char *name = StrDuplicate(UserGetName(user)); + int passed = 0; + + UserUnlock(user); + + /* TODO: Unlocking the user for other threads to notify us is not + * wise. + * Also, Telodendria will NOT reply to ^C while someone is executing + * a sync. */ + while (passed < timeoutDuration) + { + if (UserGetNotification(name)) + { + break; + } + UtilSleepMillis(100); + passed += 100; + } + Free(name); + user = UserAuthenticate(db, token); + } currBatch = prevBatch ? prevBatch : nextBatch; /* TODO: I only am manually parsing this because j2s does not support * a hashmap of unknown keys pointing to a known type. */ - response = HashMapCreate(); - rooms = HashMapCreate(); + sync.rooms.invite = HashMapCreate(); + sync.rooms.join = HashMapCreate(); /* invites */ + invites = UserGetInvites(user, currBatch); + for (i = 0; i < ArraySize(invites); i++) { - Array *invites; - size_t i; + char *roomId = ArrayGet(invites, i); + InvitedRooms invited = { 0 }; + HashMap *invitedObject; - invites = UserGetInvites(user, currBatch); - for (i = 0; i < ArraySize(invites); i++) - { - char *roomId = ArrayGet(invites, i); - JsonSet( - rooms, - JsonValueObject(HashMapCreate()), - 2, "invites", roomId - ); - } - UserFreeList(invites); + invited.invite_state.events = ArrayCreate(); + invitedObject = InvitedRoomsToJson(&invited); + JsonSet( + sync.rooms.invite, + JsonValueObject(invitedObject), + 1, roomId + ); + InvitedRoomsFree(&invited); } + UserFreeList(invites); /* Joins */ + joins = UserGetJoins(user, currBatch); + for (i = 0; i < ArraySize(joins); i++) { - Array *joins; - size_t i; + char *roomId = ArrayGet(joins, i); + Array *el = UserGetEvents(user, currBatch, roomId); + size_t j; + Room *r = RoomLock(db, roomId); + HashMap *state = StateCurrent(r); + char *firstEvent = NULL; + JoinedRooms joined = { 0 }; + HashMap *joinedObj; + char *type, *key, *id; - joins = UserGetJoins(user, currBatch); - for (i = 0; i < ArraySize(joins); i++) + joined.timeline.events = ArrayCreate(); + + for (j = 0; j < ArraySize(el); j++) { - char *roomId = ArrayGet(joins, i); - HashMap *room = HashMapCreate(); - Array *el = UserGetEvents(user, currBatch, roomId); - size_t j; - Room *r = RoomLock(db, roomId); - Array *timeline = ArrayCreate(); + char *event = ArrayGet(el, j); + HashMap *e = RoomEventFetch(r, event); + ClientEventWithoutRoomID rc = ClientfyEventSync(e); + ClientEventWithoutRoomID *c = Malloc(sizeof(*c)); + memcpy(c, &rc, sizeof(*c)); - /* TODO: Add history. */ - for (j = 0; j < ArraySize(el); j++) + JsonFree(e); + + if (j == 0) { - char *event = ArrayGet(el, j); - HashMap *e = RoomEventFetch(r, event); - HashMap *c = RoomEventClientify(e); - - JsonFree(e); - JsonValueFree(HashMapDelete(c, "room_id")); - - ArrayAdd(timeline, JsonValueObject(c)); + firstEvent = c->event_id; } - RoomUnlock(r); - UserFreeList(el); - JsonSet(room, JsonValueArray(timeline), 1, "timeline"); - JsonSet(rooms, JsonValueObject(room), 2, "join", roomId); + ArrayAdd(joined.timeline.events, c); } - UserFreeList(joins); - } + joined.timeline.prev_batch = UserNewMessageToken( + user, roomId, firstEvent + ); + /* TODO: Don't shove the entire state. + * That's a recipe for disaster, especially on large rooms. */ + joined.state.events = ArrayCreate(); + while (StateIterate(state, &type, &key, (void **) &id)) + { + HashMap *e = RoomEventFetch(r, id); + StrippedStateEvent rs = StripStateEventSync(e); + StrippedStateEvent *s = Malloc(sizeof(*s)); + memcpy(s, &rs, sizeof(*s)); + + JsonFree(e); + + ArrayAdd(joined.state.events, s); + Free(type); + Free(key); + } + + StateFree(state); + RoomUnlock(r); + UserFreeList(el); + + joinedObj = JoinedRoomsToJson(&joined); + HashMapSet(sync.rooms.join, roomId, JsonValueObject(joinedObj)); + JoinedRoomsFree(&joined); + } + UserFreeList(joins); - (void) path; if (prevBatch) { UserDropSync(user, prevBatch); nextBatch = UserInitSyncDiff(user); - } - HashMapSet(response, "next_batch", JsonValueString(nextBatch)); - Free(nextBatch); - HashMapSet(response, "rooms", JsonValueObject(rooms)); + UserFillSyncDiff(user, nextBatch); + } + sync.next_batch = nextBatch; + response = SyncResponseToJson(&sync); + SyncResponseFree(&sync); finish: UserUnlock(user); + (void) path; return response; } diff --git a/src/User.c b/src/User.c index 663b2ea..bf4c763 100644 --- a/src/User.c +++ b/src/User.c @@ -32,8 +32,10 @@ #include #include +#include #include +#include struct User { @@ -44,6 +46,9 @@ struct User char *deviceId; }; +static pthread_mutex_t pushLock; +static HashMap *pushTable = NULL; + bool UserValidate(char *localpart, char *domain) { @@ -110,7 +115,7 @@ UserLock(Db * db, char *name) User *user = NULL; DbRef *ref = NULL; - if (!UserExists(db, name)) + if (!name || !UserExists(db, name)) { return NULL; } @@ -174,16 +179,22 @@ bool UserUnlock(User * user) { bool ret; + Db *db; + DbRef *ref; if (!user) { return false; } + db = user->db; + ref = user->ref; Free(user->name); Free(user->deviceId); - ret = DbUnlock(user->db, user->ref); + ret = DbUnlock(db, ref); + user->db = NULL; + user->ref = NULL; Free(user); return ret; @@ -1057,7 +1068,11 @@ UserAddJoin(User *user, char *roomId) data = DbJson(user->ref); joins = JsonValueAsObject(HashMapGet(data, "joins")); - JsonFree(HashMapSet(joins, roomId, JsonValueNull())); + if (!HashMapGet(joins, roomId)) + { + JsonFree(HashMapSet(joins, roomId, JsonValueNull())); + } + UserNotifyUser(user); } void @@ -1074,6 +1089,7 @@ UserRemoveJoin(User *user, char *roomId) joins = JsonValueAsObject(HashMapGet(data, "joins")); JsonFree(HashMapDelete(joins, roomId)); + UserNotifyUser(user); } Array * UserListJoins(User *user) @@ -1142,6 +1158,8 @@ UserInitSyncDiff(User *user) HashMapSet(data, "leaves", JsonValueArray(ArrayCreate())); HashMapSet(data, "joins", JsonValueObject(HashMapCreate())); + HashMapSet(data, "creation", JsonValueInteger(UtilTsMillis())); + DbUnlock(user->db, syncRef); return nextBatch; @@ -1173,6 +1191,7 @@ UserPushInviteSync(User *user, char *roomId) DbUnlock(user->db, syncRef); } DbListFree(entries); + UserNotifyUser(user); } void UserPushJoinSync(User *user, char *roomId) @@ -1212,6 +1231,7 @@ UserPushJoinSync(User *user, char *roomId) DbUnlock(user->db, syncRef); } DbListFree(entries); + UserNotifyUser(user); } void @@ -1251,6 +1271,8 @@ UserPushEvent(User *user, HashMap *event) DbUnlock(user->db, syncRef); } DbListFree(entries); + + UserNotifyUser(user); } void UserDropSync(User *user, char *batch) @@ -1313,9 +1335,21 @@ UserFillSyncDiff(User *user, char *batch) HashMap *joinEntry; HashMap *data = DbJson(syncRef); HashMap *join = JsonValueAsObject(HashMapGet(data, "joins")); + Array *timeline = ArrayCreate(); + Room *r = RoomLock(user->db, roomId); + Array *prevs = RoomPrevEventsGet(r); + size_t j; + + for (j = 0; j < ArraySize(prevs); j++) + { + HashMap *e = JsonValueAsObject(ArrayGet(prevs, j)); + /* TODO: Backfill the user a 'lil more. */ + ArrayAdd(timeline, JsonValueDuplicate(HashMapGet(e, "event_id"))); + } + RoomUnlock(r); joinEntry = HashMapCreate(); - HashMapSet(joinEntry, "timeline", JsonValueArray(ArrayCreate())); + HashMapSet(joinEntry, "timeline", JsonValueArray(timeline)); if (!HashMapGet(join, roomId)) { @@ -1332,6 +1366,7 @@ UserFillSyncDiff(User *user, char *batch) Array * UserGetJoins(User *user, char *batch) { + Db *db; DbRef *syncRef; HashMap *data; Array *keys; @@ -1341,7 +1376,8 @@ UserGetJoins(User *user, char *batch) return NULL; } - syncRef = DbLock(user->db, 4, "users", user->name, "sync", batch); + db = user->db; + syncRef = DbLock(db, 4, "users", user->name, "sync", batch); if (!syncRef) { return NULL; @@ -1356,8 +1392,7 @@ UserGetJoins(User *user, char *batch) ArraySet(keys, i, StrDuplicate(str)); } - - DbUnlock(user->db, syncRef); + DbUnlock(db, syncRef); return keys; } Array * @@ -1395,3 +1430,267 @@ UserGetEvents(User *user, char *batch, char *roomId) DbUnlock(user->db, syncRef); return ret; } + + +char * +UserNewMessageToken(User *user, char *room, char *event) +{ + DbRef *messageRef; + HashMap *json; + char *messageToken; + if (!user || !room || !event) + { + return NULL; + } + + messageToken = StrRandom(16); + messageRef = DbCreate(user->db, + 4, "users", user->name, "msg", messageToken + ); + json = DbJson(messageRef); + + HashMapSet(json, "room", JsonValueString(room)); + HashMapSet(json, "from", JsonValueString(event)); + + DbUnlock(user->db, messageRef); + return messageToken; +} + +Array * +UserFetchMessages(User *user, int n, char *token, char **next) +{ + Array *messages = NULL; + Array *nexts = NULL; + DbRef *messageRef; + HashMap *json; + Room *room; + char *roomId; + size_t i; + + bool limited = false; + bool dir = false; + + if (!user || !token || n == 0) + { + return NULL; + } + + if (n < 0) + { + n = -n; + dir = true; + } + + messageRef = DbLock(user->db, + 4, "users", user->name, "msg", token + ); + json = DbJson(messageRef); + if (!messageRef) + { + /* Regenerate a new one */ + return NULL; + } + + roomId = JsonValueAsString(HashMapGet(json, "room")); + room = RoomLock(user->db, roomId); + + /* TODO (very important): CHECK IF THE USER IS ABLE TO SEE + * HISTORY. THROUGHOUT THE STEPS HERE. */ + if (!room) + { + DbUnlock(user->db, messageRef); + return NULL; + } + + nexts = ArrayCreate(); + messages = ArrayCreate(); + + /* A stack of elements to deal with the DAG. */ + ArrayAdd(nexts, + StrDuplicate(JsonValueAsString(HashMapGet(json, "from"))) + ); + for (i = 0; i < (size_t) n && ArraySize(nexts); i++) + { + char *curr = ArrayDelete(nexts, ArraySize(nexts) - 1); + HashMap *event = RoomEventFetch(room, curr); + Array *prevEvents; + size_t j; + + Free(curr); + + /* Push event into our message list. */ + ArrayAdd(messages, event); + + prevEvents = JsonValueAsArray(HashMapGet(event, "prev_events")); + if (dir) + { + HashMap *unsign = JsonValueAsObject(HashMapGet(event, "unsigned")); + + /* prevEvents is now nextEvents */ + prevEvents = JsonValueAsArray(HashMapGet(unsign, "next_events")); + } + + for (j = 0; j < ArraySize(prevEvents); j++) + { + char *prevEvent = JsonValueAsString(ArrayGet(prevEvents, j)); + + ArrayAdd(nexts, StrDuplicate(prevEvent)); + } + + if (ArraySize(prevEvents) == 0) + { + limited = true; + } + } + for (i = 0; i < ArraySize(nexts); i++) + { + Free(ArrayGet(nexts, i)); + } + ArrayFree(nexts); + RoomUnlock(room); + + if (next && !limited) + { + HashMap *lastMessage = ArrayGet(messages, ArraySize(messages) - 1); + char *eId = JsonValueAsString(HashMapGet(lastMessage, "event_id")); + + *next = UserNewMessageToken(user, roomId, eId); + } + + DbUnlock(user->db, messageRef); + for (i = 0; i < ArraySize(messages); i++) + { + HashMap *e = ArrayGet(messages, i); + ArraySet(messages, i, JsonValueObject(e)); + } + return messages; +} + +void +UserFreeMessageToken(User *user, char *token) +{ + if (!user || !token) + { + return; + } + + DbDelete(user->db, + 4, "users", user->name, "msg", token + ); +} +void +UserCleanTemporaryData(User *user) +{ + Array *list; + size_t i; + + if (!user) + { + return; + } + + list = DbList(user->db, 3, "users", user->name, "msg"); + for (i = 0; i < ArraySize(list); i++) + { + char *token = ArrayGet(list, i); + UserFreeMessageToken(user, token); + } + DbListFree(list); + + list = DbList(user->db, 3, "users", user->name, "sync"); + for (i = 0; i < ArraySize(list); i++) + { + char *token = ArrayGet(list, i); + if (UserIsSyncOld(user, token)) + { + UserDropSync(user, token); + } + } + DbListFree(list); +} +bool +UserIsSyncOld(User *user, char *token) +{ + DbRef *ref; + HashMap *map; + int64_t dt; + + if (!user || !token) + { + return false; + } + ref = DbLock(user->db, 4, "users", user->name, "sync", token); + map = DbJson(ref); + + dt = UtilTsMillis() - JsonValueAsInteger(HashMapGet(map, "creation")); + + DbUnlock(user->db, ref); + return dt > (5 * 60 * 1000); /* 5-minutes of timeout. */ +} +bool +UserSyncExists(User *user, char *sync) +{ + if (!user || !sync) + { + return false; + } + return DbLock(user->db, 4, "users", user->name, "sync", sync); +} + + +extern void +UserInitialisePushTable(void) +{ + if (pushTable) + { + return; + } + pthread_mutex_init(&pushLock, NULL); + + pthread_mutex_lock(&pushLock); + pushTable = HashMapCreate(); + pthread_mutex_unlock(&pushLock); +} + +void +UserNotifyUser(User *user) +{ + if (!user || !pushTable) + { + return; + } + pthread_mutex_lock(&pushLock); + HashMapSet(pushTable, user->name, user->name); + pthread_mutex_unlock(&pushLock); +} + +bool +UserGetNotification(char *user) +{ + bool ret; + if (!user || !pushTable) + { + return false; + } + pthread_mutex_lock(&pushLock); + ret = !!HashMapGet(pushTable, user); + HashMapDelete(pushTable, user); + pthread_mutex_unlock(&pushLock); + + return ret; +} + +void +UserDestroyPushTable(void) +{ + if (!pushTable) + { + return; + } + + pthread_mutex_lock(&pushLock); + HashMapFree(pushTable); + pushTable = NULL; + pthread_mutex_unlock(&pushLock); + pthread_mutex_destroy(&pushLock); +} diff --git a/src/include/Routes.h b/src/include/Routes.h index 5f4472b..e149bbe 100644 --- a/src/include/Routes.h +++ b/src/include/Routes.h @@ -100,12 +100,14 @@ ROUTE(RouteConfig); ROUTE(RoutePrivileges); ROUTE(RouteCreateRoom); -ROUTE(RouteFetchEvent); ROUTE(RouteSendEvent); +ROUTE(RouteSendState); ROUTE(RouteJoinRoom); -ROUTE(RouteJoinedRooms); ROUTE(RouteJoinRoomAlias); +ROUTE(RouteFetchEvent); +ROUTE(RouteJoinedRooms); ROUTE(RouteSync); +ROUTE(RouteMessages); ROUTE(RouteAliasDirectory); ROUTE(RouteRoomAliases); @@ -113,7 +115,6 @@ ROUTE(RouteRoomAliases); ROUTE(RouteAdminDeactivate); ROUTE(RouteAdminTokens); - #undef ROUTE #endif diff --git a/src/include/User.h b/src/include/User.h index d080d11..51d36c5 100644 --- a/src/include/User.h +++ b/src/include/User.h @@ -91,6 +91,7 @@ typedef struct UserLoginInfo char *refreshToken; } UserLoginInfo; + /** * Take a localpart and domain as separate parameters and validate them * against the rules of the Matrix specification. The reasion the @@ -415,4 +416,69 @@ extern Array * UserGetEvents(User *, char *, char *); * exists. */ extern void UserDropSync(User *, char *); + +/** + * Verifies if the 'sync' data is considered old, and can be + * reasonably removed from the database. + */ +extern bool UserIsSyncOld(User *, char *); + +/** + * Verifies if a sync diff exists. + */ +extern bool UserSyncExists(User *, char *); + +/** + * Creates a new "message token" start from a room and event ID, + * given a specified direction(with false<=>'b' and true<=>'f') + * to be used by /messages. + */ +extern char * UserNewMessageToken(User *, char *, char *); + +/** + * Grabs events from a message token(with the correct direction) + * into an array of HashMaps(which store PDU objects). + * If the string pointer is not NULL, this function sets its + * value to a new message token(which is stored on the heap), + * if and only if, there are any new events to retrieve. + * + * If the counter is negative, then it is treated as a 'f', + * otherwise, it is treated as 'b'. + */ +extern Array * UserFetchMessages(User *, int, char *, char **); + +/** + * Deletes a "message token". + */ +extern void UserFreeMessageToken(User *, char *); + + +/** + * Deletes temporary data about a specific user. + */ +extern void UserCleanTemporaryData(User *); + +/** + * Initialise a temporary sync table to signal any /sync changes. + * This should be paired with a call to + * .Fn UserDestroyPushTable . + */ +extern void UserInitialisePushTable(void); + +/** + * Notifies the user of a sync update. + */ +extern void UserNotifyUser(User *); + +/** + * Verifies if the user has been notified, and if it is, then + * removes the notification + */ +extern bool UserGetNotification(char *); + +/** + * Destroys the temporary push table created by + * .Fn UserInitialisePushTable . + */ +extern void UserDestroyPushTable(void); #endif /* TELODENDRIA_USER_H */