diff --git a/Schema/PduV1.json b/Schema/PduV1.json index 47b4494..6a46feb 100644 --- a/Schema/PduV1.json +++ b/Schema/PduV1.json @@ -16,6 +16,10 @@ "fields": { "age": { "type": "integer" + }, + "next_events": { + "type": "[string]", + "required": false } } }, diff --git a/Schema/SyncResponse.json b/Schema/SyncResponse.json index a75f712..c4990bc 100644 --- a/Schema/SyncResponse.json +++ b/Schema/SyncResponse.json @@ -21,13 +21,19 @@ }, "type": "struct" }, - "StrippedStateEvent": { + "InviteState": { "fields": { - "events": { "type": "[Event]" } + "events": { "type": "[StrippedStateEvent]" } }, "type": "struct" }, - "InviteState": { + "States": { + "fields": { + "events": { "type": "[StrippedStateEvent]" } + }, + "type": "struct" + }, + "StrippedStateEvent": { "fields": { "content": { "type": "object", @@ -75,14 +81,15 @@ }, "JoinedRooms": { "fields": { - "timeline": { "type": "Timeline" } + "timeline": { "type": "Timeline" }, + "state": { "type": "States" } }, "type": "struct" }, "Rooms": { "fields": { - "invite": { "type": "InvitedRooms" }, - "join": { "type": "JoinedRooms" } + "invite": { "type": "object" }, + "join": { "type": "object" } }, "type": "struct" }, diff --git a/src/Main.c b/src/Main.c index a4e2880..3d90a32 100644 --- a/src/Main.c +++ b/src/Main.c @@ -86,6 +86,26 @@ SignalHandler(int signal) } } +/* TODO: Move this! */ +static void +UsersClean(MatrixHttpHandlerArgs *args) +{ + Db *db = args->db; + Array *arr = DbList(db, 1, "users"); + size_t i; + + for (i = 0; i < ArraySize(arr); i++) + { + char *id = ArrayGet(arr, i); + User *user = UserLock(db, id); + + UserCleanTemporaryData(user); + UserUnlock(user); + } + + DbListFree(arr); +} + typedef enum ArgFlag { ARG_VERSION = (1 << 0), @@ -147,6 +167,7 @@ start: token = NULL; memset(&matrixArgs, 0, sizeof(matrixArgs)); + UserInitialisePushTable(); if (!LogConfigGlobal()) { @@ -513,10 +534,15 @@ start: Log(LOG_DEBUG, "Registering jobs..."); CronEvery(cron, 30 * 60 * 1000, (JobFunc *) UiaCleanup, &matrixArgs); + CronEvery(cron, 5 * 60 * 1000, (JobFunc *) UsersClean, &matrixArgs); Log(LOG_NOTICE, "Starting job scheduler..."); CronStart(cron); + /* We still call it anyways to be sure. */ + Log(LOG_NOTICE, "Cleaning up user data..."); + UsersClean(&matrixArgs); + Log(LOG_NOTICE, "Building routing tree..."); matrixArgs.router = RouterBuild(); if (!matrixArgs.router) @@ -583,6 +609,7 @@ start: finish: Log(LOG_NOTICE, "Shutting down..."); + UserDestroyPushTable(); if (httpServers) { for (i = 0; i < ArraySize(httpServers); i++) diff --git a/src/Room.c b/src/Room.c index 2752c8a..f61a2af 100644 --- a/src/Room.c +++ b/src/Room.c @@ -52,6 +52,7 @@ #define IsState(p, typ, key) (StrEquals(p->type, typ) && \ StrEquals(p->state_key, key)) + struct Room { Db *db; @@ -1091,22 +1092,25 @@ AuthorizeJoinMembershipV1(Room * room, PduV1 pdu, HashMap *state) { /* Interperet prev properly, as a list of JsonObjects. */ char *prev_id = JsonValueAsString(ArrayGet(prev, 0)); - char *ignored; HashMap *prev_event = RoomEventFetch(room, prev_id); - PduV1 prev_pdu; + bool flag = false; - if (prev && PduV1FromJson(prev_event, &prev_pdu, &ignored)) + if (prev_event) { - if (StrEquals(prev_pdu.type, "m.room.create") && - StrEquals(prev_pdu.sender, pdu.state_key)) + char *type = JsonValueAsString(HashMapGet(prev_event, "type")); + char *sender = JsonValueAsString(HashMapGet(prev_event, "sender")); + if (StrEquals(type, "m.room.create") && + StrEquals(sender, pdu.state_key)) { - PduV1Free(&prev_pdu); - JsonFree(prev_event); - return true; + flag = true; } - PduV1Free(&prev_pdu); } JsonFree(prev_event); + + if (flag) + { + return true; + } } /* Step 5.2.2: If the sender does not match state_key, reject. */ @@ -1633,6 +1637,9 @@ RoomEventSendV1(Room * room, HashMap * event) * errorr status to the user. */ goto finish; } + StateFree(state); + state = NULL; + RoomAddEventV1(room, pdu); valid = true; @@ -1758,6 +1765,7 @@ RoomAddEventV1(Room *room, PduV1 pdu) JsonValue *leaves_val; char *safe_id; size_t i; + if (!room || room->version >= 3) { return false; @@ -1802,7 +1810,7 @@ RoomAddEventV1(Room *room, PduV1 pdu) pdu_json = PduV1ToJson(&pdu); DbJsonSet(event_ref, pdu_json); - + ArrayAdd(leaves, JsonValueObject(pdu_json)); leaves_json = JsonDuplicate(leaves_json); JsonValueFree(HashMapDelete(leaves_json, "leaves")); @@ -1812,12 +1820,39 @@ RoomAddEventV1(Room *room, PduV1 pdu) DbUnlock(room->db, event_ref); - /* TODO: Store DAG relationships, somehow. */ - pdu_json = PduV1ToJson(&pdu); + for (i = 0; i < ArraySize(prev_events); i++) { - HashMap *state = StateCurrent(room); + JsonValue *event_val = ArrayGet(prev_events, i); + char *id = JsonValueAsString(event_val); + char *error = NULL; + PduV1 prev_pdu = { 0 }; + HashMap *prev_object = NULL; + Array *next_events = NULL; + event_ref = DbLock(room->db, 4, "rooms", room->id, "events", id); + PduV1FromJson(DbJson(event_ref), &prev_pdu, &error); + + if (!prev_pdu._unsigned.next_events) + { + prev_pdu._unsigned.next_events = ArrayCreate(); + } + next_events = prev_pdu._unsigned.next_events; + + ArrayAdd(next_events, StrDuplicate(pdu.event_id)); + + prev_object = PduV1ToJson(&prev_pdu); + DbJsonSet(event_ref, prev_object); + + JsonFree(prev_object); + PduV1Free(&prev_pdu); + DbUnlock(room->db, event_ref); + } + + { + HashMap *state; char *type, *state_key, *event_id; + pdu_json = PduV1ToJson(&pdu); + if (StrEquals(pdu.type, "m.room.member")) { CommonID *id = UserIdParse(pdu.state_key, NULL); @@ -1846,18 +1881,16 @@ RoomAddEventV1(Room *room, PduV1 pdu) UserIdFree(id); UserUnlock(user); } - + state = StateCurrent(room); while (StateIterate(state, &type, &state_key, (void **) &event_id)) { if (StrEquals(type, "m.room.member")) { - /* Notify state_key about it. - * Does nothing if the user is not already in the room. */ CommonID *id = UserIdParse(state_key, NULL); User *user = UserLock(room->db, id->local); UserPushEvent(user, pdu_json); - + UserIdFree(id); UserUnlock(user); } @@ -1865,8 +1898,9 @@ RoomAddEventV1(Room *room, PduV1 pdu) Free(state_key); } StateFree(state); + + JsonFree(pdu_json); } - JsonFree(pdu_json); return true; } diff --git a/src/Routes.c b/src/Routes.c index aac7343..76b9491 100644 --- a/src/Routes.c +++ b/src/Routes.c @@ -77,8 +77,10 @@ RouterBuild(void) R("/_matrix/client/v3/user/(.*)/filter/(.*)", RouteFilter); R("/_matrix/client/v3/rooms/(.*)/send/(.*)/(.*)", RouteSendEvent); + R("/_matrix/client/v3/rooms/(.*)/state/(.*)/(.*)", RouteSendState); R("/_matrix/client/v3/rooms/(.*)/event/(.*)", RouteFetchEvent); R("/_matrix/client/v3/rooms/(.*)/join", RouteJoinRoom); + R("/_matrix/client/v3/rooms/(.*)/messages", RouteMessages); R("/_matrix/client/v3/join/(.*)", RouteJoinRoomAlias); @@ -91,6 +93,7 @@ RouterBuild(void) R("/_matrix/client/v3/directory/room/(.*)", RouteAliasDirectory); R("/_matrix/client/v3/rooms/(.*)/aliases", RouteRoomAliases); + /* Telodendria Admin API Routes */ R("/_telodendria/admin/v1/(restart|shutdown|stats)", RouteProcControl); diff --git a/src/Routes/RouteCreateRoom.c b/src/Routes/RouteCreateRoom.c index 376b498..beacfba 100644 --- a/src/Routes/RouteCreateRoom.c +++ b/src/Routes/RouteCreateRoom.c @@ -104,8 +104,6 @@ ROUTE_IMPL(RouteCreateRoom, path, argp) JsonFree(request); request = NULL; - Log(LOG_INFO, "Creating room..."); - if (!(room = RoomCreate(db, user, &parsed, server))) { err = "Couldn't create room."; diff --git a/src/Routes/RouteFilter.c b/src/Routes/RouteFilter.c index c8f4fa8..55bfc32 100644 --- a/src/Routes/RouteFilter.c +++ b/src/Routes/RouteFilter.c @@ -189,7 +189,7 @@ ROUTE_IMPL(RouteFilter, path, argp) DbJsonSet(ref, filterJson); DbUnlock(db, ref); - Free(filterJson); + JsonFree(filterJson); response = HashMapCreate(); HashMapSet(response, "filter_id", JsonValueString(filterId)); diff --git a/src/Routes/RouteJoinRoomAlias.c b/src/Routes/RouteJoinRoomAlias.c index c074f01..51b9be4 100644 --- a/src/Routes/RouteJoinRoomAlias.c +++ b/src/Routes/RouteJoinRoomAlias.c @@ -84,6 +84,7 @@ ROUTE_IMPL(RouteJoinRoomAlias, path, argp) if (*roomId != '!') { roomId = RoomResolveAlias(db, roomId); + Log(LOG_NOTICE, "Here's my guess: %s", roomId); } else { @@ -126,6 +127,7 @@ ROUTE_IMPL(RouteJoinRoomAlias, path, argp) goto finish; } + Log(LOG_INFO, "Trying with token %s", token); user = UserAuthenticate(db, token); if (!user) { @@ -136,11 +138,11 @@ ROUTE_IMPL(RouteJoinRoomAlias, path, argp) id = UserIdParse(UserGetName(user), serverName); id->sigil = '@'; sender = ParserRecomposeCommonID(*id); + Log(LOG_INFO, "Now as %s", sender); room = RoomLock(db, roomId); if (!room) { - err = "Room ID does not exist."; HttpResponseStatus(args->context, HTTP_BAD_REQUEST); response = MatrixErrorCreate(M_UNKNOWN, err); @@ -149,6 +151,8 @@ ROUTE_IMPL(RouteJoinRoomAlias, path, argp) if (RoomContainsUser(room, sender)) { err = "User is already in the room."; + Log(LOG_INFO, "qhar for %s", sender); + HttpResponseStatus(args->context, HTTP_UNAUTHORIZED); response = MatrixErrorCreate(M_FORBIDDEN, err); goto finish; diff --git a/src/Routes/RouteMessages.c b/src/Routes/RouteMessages.c new file mode 100644 index 0000000..fa89694 --- /dev/null +++ b/src/Routes/RouteMessages.c @@ -0,0 +1,178 @@ +/* + * Copyright (C) 2022-2024 Jordan Bancino <@jordan:bancino.net> with + * other valuable contributors. See CONTRIBUTORS.txt for the full list. + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation files + * (the "Software"), to deal in the Software without restriction, + * including without limitation the rights to use, copy, modify, merge, + * publish, distribute, sublicense, and/or sell copies of the Software, + * and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ +#include + + +#include +#include +#include +#include + +#include +#include + +#include +#include + +#include + +static char * +GetServerName(Db * db) +{ + char *name; + + Config config; + + ConfigLock(db, &config); + if (!config.ok) + { + return NULL; + } + + name = StrDuplicate(config.serverName); + + ConfigUnlock(&config); + + return name; +} + +ROUTE_IMPL(RouteMessages, path, argp) +{ + RouteArgs *args = argp; + Db *db = args->matrixArgs->db; + + HashMap *params = HttpRequestParams(args->context); + HashMap *response = NULL; + + User *user = NULL; + char *token = NULL; + + char *roomId = ArrayGet(path, 0); + char *from = HashMapGet(params, "from"); + char *limitStr = HashMapGet(params, "limit"); + char *end = NULL; + char *sender = NULL; + char *serverName = NULL; + CommonID *id = NULL; + int limit = strtol(limitStr, NULL, 10); + + Array *messages = NULL; + + Room *room = NULL; + + char *err; + + if (!limit || limit < 0) + { + limit = 10; + } + if (!roomId) + { + /* Should be impossible */ + HttpResponseStatus(args->context, HTTP_INTERNAL_SERVER_ERROR); + return MatrixErrorCreate(M_UNKNOWN, NULL); + } + if (HttpRequestMethodGet(args->context) != HTTP_GET) + { + err = "Unknown request method."; + HttpResponseStatus(args->context, HTTP_BAD_REQUEST); + response = MatrixErrorCreate(M_UNRECOGNIZED, err); + goto finish; + } + + serverName = GetServerName(db); + if (!serverName) + { + HttpResponseStatus(args->context, HTTP_INTERNAL_SERVER_ERROR); + response = MatrixErrorCreate(M_UNKNOWN, NULL); + goto finish; + } + + response = MatrixGetAccessToken(args->context, &token); + if (response) + { + goto finish; + } + + user = UserAuthenticate(db, token); + if (!user) + { + HttpResponseStatus(args->context, HTTP_UNAUTHORIZED); + response = MatrixErrorCreate(M_UNKNOWN_TOKEN, NULL); + goto finish; + } + id = UserIdParse(UserGetName(user), serverName); + id->sigil = '@'; + sender = ParserRecomposeCommonID(*id); + + room = RoomLock(db, roomId); + if (!RoomContainsUser(room, sender)) + { + err = "User is not in the room."; + HttpResponseStatus(args->context, HTTP_UNAUTHORIZED); + response = MatrixErrorCreate(M_FORBIDDEN, err); + goto finish; + } + RoomUnlock(room); + + /* TODO: Filters, to, and friends */ + messages = UserFetchMessages(user, limit, from, &end); + if (!messages) + { + Room *r = RoomLock(db, roomId); + Array *leaf = RoomPrevEventsGet(r); + HashMap *start = JsonValueAsObject(ArrayGet(leaf, 0)); + char *startId = StrDuplicate( + JsonValueAsString(HashMapGet(start, "event_id")) + ); + char *token = UserNewMessageToken(user, roomId, startId); + RoomUnlock(r); + + messages = UserFetchMessages(user, limit, token, &end); + Free(token); + Free(startId); + } + response = HashMapCreate(); + JsonSet(response, JsonValueArray(messages), 1, "chunk"); + if (end) + { + JsonSet(response, JsonValueString(end), 1, "end"); + Free(end); + } + + UserFreeMessageToken(user, from); +finish: + UserUnlock(user); + UserIdFree(id); + if (sender) + { + Free(sender); + } + if (serverName) + { + Free(serverName); + } + return response; +} diff --git a/src/Routes/RoutePushRules.c b/src/Routes/RoutePushRules.c new file mode 100644 index 0000000..d07709b --- /dev/null +++ b/src/Routes/RoutePushRules.c @@ -0,0 +1,129 @@ +/* + * Copyright (C) 2022-2024 Jordan Bancino <@jordan:bancino.net> with + * other valuable contributors. See CONTRIBUTORS.txt for the full list. + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation files + * (the "Software"), to deal in the Software without restriction, + * including without limitation the rights to use, copy, modify, merge, + * publish, distribute, sublicense, and/or sell copies of the Software, + * and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ +#include +#include + +#include +#include +#include +#include + +#include +#include +#include + +#include + + +ROUTE_IMPL(RoutePushRules, path, argp) +{ + RouteArgs *args = argp; + Db *db = args->matrixArgs->db; + + HashMap *response = NULL; + + User *user = NULL; + char *token = NULL; + + char *err; + + if (HttpRequestMethodGet(args->context) != HTTP_GET) + { + err = "Unknown request method."; + HttpResponseStatus(args->context, HTTP_BAD_REQUEST); + response = MatrixErrorCreate(M_UNRECOGNIZED, err); + goto finish; + } + + response = MatrixGetAccessToken(args->context, &token); + if (response) + { + goto finish; + } + + user = UserAuthenticate(db, token); + if (!user) + { + HttpResponseStatus(args->context, HTTP_UNAUTHORIZED); + response = MatrixErrorCreate(M_UNKNOWN_TOKEN, NULL); + goto finish; + } + response = HashMapCreate(); + JsonSet(response, JsonValueObject(HashMapCreate()), 1, "global"); + (void) path; +finish: + UserUnlock(user); + return response; +} + +ROUTE_IMPL(RouteKeyQuery, path, argp) +{ + RouteArgs *args = argp; + Db *db = args->matrixArgs->db; + + HashMap *request = NULL; + HashMap *response = NULL; + + User *user = NULL; + char *token = NULL; + + char *err; + + if (HttpRequestMethodGet(args->context) != HTTP_POST) + { + err = "Unknown request method."; + HttpResponseStatus(args->context, HTTP_BAD_REQUEST); + response = MatrixErrorCreate(M_UNRECOGNIZED, err); + goto finish; + } + + response = MatrixGetAccessToken(args->context, &token); + if (response) + { + goto finish; + } + + user = UserAuthenticate(db, token); + if (!user) + { + HttpResponseStatus(args->context, HTTP_UNAUTHORIZED); + response = MatrixErrorCreate(M_UNKNOWN_TOKEN, NULL); + goto finish; + } + request = JsonDecode(HttpServerStream(args->context)); + if (!request) + { + HttpResponseStatus(args->context, HTTP_BAD_REQUEST); + response = MatrixErrorCreate(M_NOT_JSON, NULL); + goto finish; + } + + response = HashMapCreate(); + (void) path; +finish: + JsonFree(request); + UserUnlock(user); + return response; +} diff --git a/src/Routes/RouteSendEvent.c b/src/Routes/RouteSendEvent.c index e9dbcb2..a4ad505 100644 --- a/src/Routes/RouteSendEvent.c +++ b/src/Routes/RouteSendEvent.c @@ -171,3 +171,120 @@ finish: JsonFree(request); return response; } + +ROUTE_IMPL(RouteSendState, path, argp) +{ + RouteArgs *args = argp; + Db *db = args->matrixArgs->db; + + HashMap *request = NULL, *event = NULL; + HashMap *response = NULL, *filled = NULL; + + User *user = NULL; + CommonID *id = NULL; + char *token = NULL; + + char *serverName = NULL; + + char *roomId = ArrayGet(path, 0); + char *eventType = ArrayGet(path, 1); + char *stateKey = ArrayGet(path, 2); + char *sender = NULL; + + Room *room = NULL; + + char *err; + + if (!roomId || !eventType) + { + /* Should be impossible */ + HttpResponseStatus(args->context, HTTP_INTERNAL_SERVER_ERROR); + return MatrixErrorCreate(M_UNKNOWN, NULL); + } + if (HttpRequestMethodGet(args->context) != HTTP_PUT) + { + err = "Unknown request method."; + HttpResponseStatus(args->context, HTTP_BAD_REQUEST); + response = MatrixErrorCreate(M_UNRECOGNIZED, err); + goto finish; + } + + serverName = GetServerName(db); + if (!serverName) + { + HttpResponseStatus(args->context, HTTP_INTERNAL_SERVER_ERROR); + response = MatrixErrorCreate(M_UNKNOWN, NULL); + goto finish; + } + + response = MatrixGetAccessToken(args->context, &token); + if (response) + { + goto finish; + } + + request = JsonDecode(HttpServerStream(args->context)); + if (!request) + { + HttpResponseStatus(args->context, HTTP_BAD_REQUEST); + response = MatrixErrorCreate(M_NOT_JSON, NULL); + goto finish; + } + + user = UserAuthenticate(db, token); + if (!user) + { + HttpResponseStatus(args->context, HTTP_UNAUTHORIZED); + response = MatrixErrorCreate(M_UNKNOWN_TOKEN, NULL); + goto finish; + } + id = UserIdParse(UserGetName(user), serverName); + id->sigil = '@'; + sender = ParserRecomposeCommonID(*id); + + room = RoomLock(db, roomId); + if (!RoomContainsUser(room, sender)) + { + err = "User is not in the room."; + HttpResponseStatus(args->context, HTTP_UNAUTHORIZED); + response = MatrixErrorCreate(M_FORBIDDEN, err); + goto finish; + } + + Log(LOG_INFO, "State event (%s,%s) coming this way", eventType, stateKey); + event = RoomEventCreate( + sender, + eventType, stateKey ? stateKey : "", + JsonDuplicate(request) + ); + filled = RoomEventSend(room, event); + Log(LOG_INFO, "State event (%s,%s) coming this way", eventType, stateKey); + JsonFree(event); + Log(LOG_INFO, "And thats freed!"); + + if (!filled) + { + err = "User is not allowed to send state."; + HttpResponseStatus(args->context, HTTP_UNAUTHORIZED); + response = MatrixErrorCreate(M_FORBIDDEN, err); + goto finish; + } + + response = HashMapCreate(); + HashMapSet( + response, "event_id", + JsonValueDuplicate(HashMapGet(filled, "event_id")) + ); + JsonFree(filled); +finish: + RoomUnlock(room); + Free(serverName); + if (sender) + { + Free(sender); + } + UserIdFree(id); + UserUnlock(user); + JsonFree(request); + return response; +} diff --git a/src/Routes/RouteSync.c b/src/Routes/RouteSync.c index 2bf075a..e5df807 100644 --- a/src/Routes/RouteSync.c +++ b/src/Routes/RouteSync.c @@ -30,14 +30,33 @@ #include #include #include +#include #include +#include #include #include #include +#include -#include +static ClientEventWithoutRoomID +ClientfyEventSync(HashMap *pdu) +{ + ClientEventWithoutRoomID ret = { 0 }; + char *ignored; + ClientEventWithoutRoomIDFromJson(pdu, &ret, &ignored); + return ret; +} +static StrippedStateEvent +StripStateEventSync(HashMap *pdu) +{ + StrippedStateEvent ret = { 0 }; + char *ignored; + StrippedStateEventFromJson(pdu, &ret, &ignored); + + return ret; +} ROUTE_IMPL(RouteSync, path, argp) { @@ -46,18 +65,27 @@ ROUTE_IMPL(RouteSync, path, argp) HashMap *params = NULL; HashMap *response = NULL; + SyncResponse sync = { 0 }; - - HashMap *rooms = NULL; + Array *invites; + Array *joins; + size_t i; User *user = NULL; char *token = NULL; char *prevBatch = NULL; char *nextBatch = NULL; char *currBatch = NULL; + char *timeout = NULL; char *err; + int timeoutDuration; + + /* TODO: Respect `timeout', (and stop when something is + * pushed, maybe by 'polling' the database? sounds like + * a bad idea) */ + if (HttpRequestMethodGet(args->context) != HTTP_GET) { err = "Unknown request method."; @@ -79,84 +107,142 @@ ROUTE_IMPL(RouteSync, path, argp) response = MatrixErrorCreate(M_UNKNOWN_TOKEN, NULL); goto finish; } + params = HttpRequestParams(args->context); prevBatch = HashMapGet(params, "since"); + timeout = HashMapGet(params, "timeout"); + timeoutDuration = atoi(timeout); + + if (!prevBatch) { + prevBatch = NULL; nextBatch = UserInitSyncDiff(user); UserFillSyncDiff(user, nextBatch); } + else if (timeout && timeoutDuration) + { + char *name = StrDuplicate(UserGetName(user)); + int passed = 0; + + UserUnlock(user); + + /* TODO: Unlocking the user for other threads to notify us is not + * wise. + * Also, Telodendria will NOT reply to ^C while someone is executing + * a sync. */ + while (passed < timeoutDuration) + { + if (UserGetNotification(name)) + { + break; + } + UtilSleepMillis(100); + passed += 100; + } + Free(name); + user = UserAuthenticate(db, token); + } currBatch = prevBatch ? prevBatch : nextBatch; /* TODO: I only am manually parsing this because j2s does not support * a hashmap of unknown keys pointing to a known type. */ - response = HashMapCreate(); - rooms = HashMapCreate(); + sync.rooms.invite = HashMapCreate(); + sync.rooms.join = HashMapCreate(); /* invites */ + invites = UserGetInvites(user, currBatch); + for (i = 0; i < ArraySize(invites); i++) { - Array *invites; - size_t i; + char *roomId = ArrayGet(invites, i); + InvitedRooms invited = { 0 }; + HashMap *invitedObject; - invites = UserGetInvites(user, currBatch); - for (i = 0; i < ArraySize(invites); i++) - { - char *roomId = ArrayGet(invites, i); - JsonSet( - rooms, - JsonValueObject(HashMapCreate()), - 2, "invites", roomId - ); - } - UserFreeList(invites); + invited.invite_state.events = ArrayCreate(); + invitedObject = InvitedRoomsToJson(&invited); + JsonSet( + sync.rooms.invite, + JsonValueObject(invitedObject), + 1, roomId + ); + InvitedRoomsFree(&invited); } + UserFreeList(invites); /* Joins */ + joins = UserGetJoins(user, currBatch); + for (i = 0; i < ArraySize(joins); i++) { - Array *joins; - size_t i; + char *roomId = ArrayGet(joins, i); + Array *el = UserGetEvents(user, currBatch, roomId); + size_t j; + Room *r = RoomLock(db, roomId); + HashMap *state = StateCurrent(r); + char *firstEvent = NULL; + JoinedRooms joined = { 0 }; + HashMap *joinedObj; + char *type, *key, *id; - joins = UserGetJoins(user, currBatch); - for (i = 0; i < ArraySize(joins); i++) + joined.timeline.events = ArrayCreate(); + + for (j = 0; j < ArraySize(el); j++) { - char *roomId = ArrayGet(joins, i); - HashMap *room = HashMapCreate(); - Array *el = UserGetEvents(user, currBatch, roomId); - size_t j; - Room *r = RoomLock(db, roomId); - Array *timeline = ArrayCreate(); + char *event = ArrayGet(el, j); + HashMap *e = RoomEventFetch(r, event); + ClientEventWithoutRoomID rc = ClientfyEventSync(e); + ClientEventWithoutRoomID *c = Malloc(sizeof(*c)); + memcpy(c, &rc, sizeof(*c)); - /* TODO: Add history. */ - for (j = 0; j < ArraySize(el); j++) + JsonFree(e); + + if (j == 0) { - char *event = ArrayGet(el, j); - HashMap *e = RoomEventFetch(r, event); - HashMap *c = RoomEventClientify(e); - - JsonFree(e); - JsonValueFree(HashMapDelete(c, "room_id")); - - ArrayAdd(timeline, JsonValueObject(c)); + firstEvent = c->event_id; } - RoomUnlock(r); - UserFreeList(el); - JsonSet(room, JsonValueArray(timeline), 1, "timeline"); - JsonSet(rooms, JsonValueObject(room), 2, "join", roomId); + ArrayAdd(joined.timeline.events, c); } - UserFreeList(joins); - } + joined.timeline.prev_batch = UserNewMessageToken( + user, roomId, firstEvent + ); + /* TODO: Don't shove the entire state. + * That's a recipe for disaster, especially on large rooms. */ + joined.state.events = ArrayCreate(); + while (StateIterate(state, &type, &key, (void **) &id)) + { + HashMap *e = RoomEventFetch(r, id); + StrippedStateEvent rs = StripStateEventSync(e); + StrippedStateEvent *s = Malloc(sizeof(*s)); + memcpy(s, &rs, sizeof(*s)); + + JsonFree(e); + + ArrayAdd(joined.state.events, s); + Free(type); + Free(key); + } + + StateFree(state); + RoomUnlock(r); + UserFreeList(el); + + joinedObj = JoinedRoomsToJson(&joined); + HashMapSet(sync.rooms.join, roomId, JsonValueObject(joinedObj)); + JoinedRoomsFree(&joined); + } + UserFreeList(joins); - (void) path; if (prevBatch) { UserDropSync(user, prevBatch); nextBatch = UserInitSyncDiff(user); - } - HashMapSet(response, "next_batch", JsonValueString(nextBatch)); - Free(nextBatch); - HashMapSet(response, "rooms", JsonValueObject(rooms)); + UserFillSyncDiff(user, nextBatch); + } + sync.next_batch = nextBatch; + response = SyncResponseToJson(&sync); + SyncResponseFree(&sync); finish: UserUnlock(user); + (void) path; return response; } diff --git a/src/User.c b/src/User.c index 663b2ea..bf4c763 100644 --- a/src/User.c +++ b/src/User.c @@ -32,8 +32,10 @@ #include #include +#include #include +#include struct User { @@ -44,6 +46,9 @@ struct User char *deviceId; }; +static pthread_mutex_t pushLock; +static HashMap *pushTable = NULL; + bool UserValidate(char *localpart, char *domain) { @@ -110,7 +115,7 @@ UserLock(Db * db, char *name) User *user = NULL; DbRef *ref = NULL; - if (!UserExists(db, name)) + if (!name || !UserExists(db, name)) { return NULL; } @@ -174,16 +179,22 @@ bool UserUnlock(User * user) { bool ret; + Db *db; + DbRef *ref; if (!user) { return false; } + db = user->db; + ref = user->ref; Free(user->name); Free(user->deviceId); - ret = DbUnlock(user->db, user->ref); + ret = DbUnlock(db, ref); + user->db = NULL; + user->ref = NULL; Free(user); return ret; @@ -1057,7 +1068,11 @@ UserAddJoin(User *user, char *roomId) data = DbJson(user->ref); joins = JsonValueAsObject(HashMapGet(data, "joins")); - JsonFree(HashMapSet(joins, roomId, JsonValueNull())); + if (!HashMapGet(joins, roomId)) + { + JsonFree(HashMapSet(joins, roomId, JsonValueNull())); + } + UserNotifyUser(user); } void @@ -1074,6 +1089,7 @@ UserRemoveJoin(User *user, char *roomId) joins = JsonValueAsObject(HashMapGet(data, "joins")); JsonFree(HashMapDelete(joins, roomId)); + UserNotifyUser(user); } Array * UserListJoins(User *user) @@ -1142,6 +1158,8 @@ UserInitSyncDiff(User *user) HashMapSet(data, "leaves", JsonValueArray(ArrayCreate())); HashMapSet(data, "joins", JsonValueObject(HashMapCreate())); + HashMapSet(data, "creation", JsonValueInteger(UtilTsMillis())); + DbUnlock(user->db, syncRef); return nextBatch; @@ -1173,6 +1191,7 @@ UserPushInviteSync(User *user, char *roomId) DbUnlock(user->db, syncRef); } DbListFree(entries); + UserNotifyUser(user); } void UserPushJoinSync(User *user, char *roomId) @@ -1212,6 +1231,7 @@ UserPushJoinSync(User *user, char *roomId) DbUnlock(user->db, syncRef); } DbListFree(entries); + UserNotifyUser(user); } void @@ -1251,6 +1271,8 @@ UserPushEvent(User *user, HashMap *event) DbUnlock(user->db, syncRef); } DbListFree(entries); + + UserNotifyUser(user); } void UserDropSync(User *user, char *batch) @@ -1313,9 +1335,21 @@ UserFillSyncDiff(User *user, char *batch) HashMap *joinEntry; HashMap *data = DbJson(syncRef); HashMap *join = JsonValueAsObject(HashMapGet(data, "joins")); + Array *timeline = ArrayCreate(); + Room *r = RoomLock(user->db, roomId); + Array *prevs = RoomPrevEventsGet(r); + size_t j; + + for (j = 0; j < ArraySize(prevs); j++) + { + HashMap *e = JsonValueAsObject(ArrayGet(prevs, j)); + /* TODO: Backfill the user a 'lil more. */ + ArrayAdd(timeline, JsonValueDuplicate(HashMapGet(e, "event_id"))); + } + RoomUnlock(r); joinEntry = HashMapCreate(); - HashMapSet(joinEntry, "timeline", JsonValueArray(ArrayCreate())); + HashMapSet(joinEntry, "timeline", JsonValueArray(timeline)); if (!HashMapGet(join, roomId)) { @@ -1332,6 +1366,7 @@ UserFillSyncDiff(User *user, char *batch) Array * UserGetJoins(User *user, char *batch) { + Db *db; DbRef *syncRef; HashMap *data; Array *keys; @@ -1341,7 +1376,8 @@ UserGetJoins(User *user, char *batch) return NULL; } - syncRef = DbLock(user->db, 4, "users", user->name, "sync", batch); + db = user->db; + syncRef = DbLock(db, 4, "users", user->name, "sync", batch); if (!syncRef) { return NULL; @@ -1356,8 +1392,7 @@ UserGetJoins(User *user, char *batch) ArraySet(keys, i, StrDuplicate(str)); } - - DbUnlock(user->db, syncRef); + DbUnlock(db, syncRef); return keys; } Array * @@ -1395,3 +1430,267 @@ UserGetEvents(User *user, char *batch, char *roomId) DbUnlock(user->db, syncRef); return ret; } + + +char * +UserNewMessageToken(User *user, char *room, char *event) +{ + DbRef *messageRef; + HashMap *json; + char *messageToken; + if (!user || !room || !event) + { + return NULL; + } + + messageToken = StrRandom(16); + messageRef = DbCreate(user->db, + 4, "users", user->name, "msg", messageToken + ); + json = DbJson(messageRef); + + HashMapSet(json, "room", JsonValueString(room)); + HashMapSet(json, "from", JsonValueString(event)); + + DbUnlock(user->db, messageRef); + return messageToken; +} + +Array * +UserFetchMessages(User *user, int n, char *token, char **next) +{ + Array *messages = NULL; + Array *nexts = NULL; + DbRef *messageRef; + HashMap *json; + Room *room; + char *roomId; + size_t i; + + bool limited = false; + bool dir = false; + + if (!user || !token || n == 0) + { + return NULL; + } + + if (n < 0) + { + n = -n; + dir = true; + } + + messageRef = DbLock(user->db, + 4, "users", user->name, "msg", token + ); + json = DbJson(messageRef); + if (!messageRef) + { + /* Regenerate a new one */ + return NULL; + } + + roomId = JsonValueAsString(HashMapGet(json, "room")); + room = RoomLock(user->db, roomId); + + /* TODO (very important): CHECK IF THE USER IS ABLE TO SEE + * HISTORY. THROUGHOUT THE STEPS HERE. */ + if (!room) + { + DbUnlock(user->db, messageRef); + return NULL; + } + + nexts = ArrayCreate(); + messages = ArrayCreate(); + + /* A stack of elements to deal with the DAG. */ + ArrayAdd(nexts, + StrDuplicate(JsonValueAsString(HashMapGet(json, "from"))) + ); + for (i = 0; i < (size_t) n && ArraySize(nexts); i++) + { + char *curr = ArrayDelete(nexts, ArraySize(nexts) - 1); + HashMap *event = RoomEventFetch(room, curr); + Array *prevEvents; + size_t j; + + Free(curr); + + /* Push event into our message list. */ + ArrayAdd(messages, event); + + prevEvents = JsonValueAsArray(HashMapGet(event, "prev_events")); + if (dir) + { + HashMap *unsign = JsonValueAsObject(HashMapGet(event, "unsigned")); + + /* prevEvents is now nextEvents */ + prevEvents = JsonValueAsArray(HashMapGet(unsign, "next_events")); + } + + for (j = 0; j < ArraySize(prevEvents); j++) + { + char *prevEvent = JsonValueAsString(ArrayGet(prevEvents, j)); + + ArrayAdd(nexts, StrDuplicate(prevEvent)); + } + + if (ArraySize(prevEvents) == 0) + { + limited = true; + } + } + for (i = 0; i < ArraySize(nexts); i++) + { + Free(ArrayGet(nexts, i)); + } + ArrayFree(nexts); + RoomUnlock(room); + + if (next && !limited) + { + HashMap *lastMessage = ArrayGet(messages, ArraySize(messages) - 1); + char *eId = JsonValueAsString(HashMapGet(lastMessage, "event_id")); + + *next = UserNewMessageToken(user, roomId, eId); + } + + DbUnlock(user->db, messageRef); + for (i = 0; i < ArraySize(messages); i++) + { + HashMap *e = ArrayGet(messages, i); + ArraySet(messages, i, JsonValueObject(e)); + } + return messages; +} + +void +UserFreeMessageToken(User *user, char *token) +{ + if (!user || !token) + { + return; + } + + DbDelete(user->db, + 4, "users", user->name, "msg", token + ); +} +void +UserCleanTemporaryData(User *user) +{ + Array *list; + size_t i; + + if (!user) + { + return; + } + + list = DbList(user->db, 3, "users", user->name, "msg"); + for (i = 0; i < ArraySize(list); i++) + { + char *token = ArrayGet(list, i); + UserFreeMessageToken(user, token); + } + DbListFree(list); + + list = DbList(user->db, 3, "users", user->name, "sync"); + for (i = 0; i < ArraySize(list); i++) + { + char *token = ArrayGet(list, i); + if (UserIsSyncOld(user, token)) + { + UserDropSync(user, token); + } + } + DbListFree(list); +} +bool +UserIsSyncOld(User *user, char *token) +{ + DbRef *ref; + HashMap *map; + int64_t dt; + + if (!user || !token) + { + return false; + } + ref = DbLock(user->db, 4, "users", user->name, "sync", token); + map = DbJson(ref); + + dt = UtilTsMillis() - JsonValueAsInteger(HashMapGet(map, "creation")); + + DbUnlock(user->db, ref); + return dt > (5 * 60 * 1000); /* 5-minutes of timeout. */ +} +bool +UserSyncExists(User *user, char *sync) +{ + if (!user || !sync) + { + return false; + } + return DbLock(user->db, 4, "users", user->name, "sync", sync); +} + + +extern void +UserInitialisePushTable(void) +{ + if (pushTable) + { + return; + } + pthread_mutex_init(&pushLock, NULL); + + pthread_mutex_lock(&pushLock); + pushTable = HashMapCreate(); + pthread_mutex_unlock(&pushLock); +} + +void +UserNotifyUser(User *user) +{ + if (!user || !pushTable) + { + return; + } + pthread_mutex_lock(&pushLock); + HashMapSet(pushTable, user->name, user->name); + pthread_mutex_unlock(&pushLock); +} + +bool +UserGetNotification(char *user) +{ + bool ret; + if (!user || !pushTable) + { + return false; + } + pthread_mutex_lock(&pushLock); + ret = !!HashMapGet(pushTable, user); + HashMapDelete(pushTable, user); + pthread_mutex_unlock(&pushLock); + + return ret; +} + +void +UserDestroyPushTable(void) +{ + if (!pushTable) + { + return; + } + + pthread_mutex_lock(&pushLock); + HashMapFree(pushTable); + pushTable = NULL; + pthread_mutex_unlock(&pushLock); + pthread_mutex_destroy(&pushLock); +} diff --git a/src/include/Routes.h b/src/include/Routes.h index 5f4472b..e149bbe 100644 --- a/src/include/Routes.h +++ b/src/include/Routes.h @@ -100,12 +100,14 @@ ROUTE(RouteConfig); ROUTE(RoutePrivileges); ROUTE(RouteCreateRoom); -ROUTE(RouteFetchEvent); ROUTE(RouteSendEvent); +ROUTE(RouteSendState); ROUTE(RouteJoinRoom); -ROUTE(RouteJoinedRooms); ROUTE(RouteJoinRoomAlias); +ROUTE(RouteFetchEvent); +ROUTE(RouteJoinedRooms); ROUTE(RouteSync); +ROUTE(RouteMessages); ROUTE(RouteAliasDirectory); ROUTE(RouteRoomAliases); @@ -113,7 +115,6 @@ ROUTE(RouteRoomAliases); ROUTE(RouteAdminDeactivate); ROUTE(RouteAdminTokens); - #undef ROUTE #endif diff --git a/src/include/User.h b/src/include/User.h index d080d11..51d36c5 100644 --- a/src/include/User.h +++ b/src/include/User.h @@ -91,6 +91,7 @@ typedef struct UserLoginInfo char *refreshToken; } UserLoginInfo; + /** * Take a localpart and domain as separate parameters and validate them * against the rules of the Matrix specification. The reasion the @@ -415,4 +416,69 @@ extern Array * UserGetEvents(User *, char *, char *); * exists. */ extern void UserDropSync(User *, char *); + +/** + * Verifies if the 'sync' data is considered old, and can be + * reasonably removed from the database. + */ +extern bool UserIsSyncOld(User *, char *); + +/** + * Verifies if a sync diff exists. + */ +extern bool UserSyncExists(User *, char *); + +/** + * Creates a new "message token" start from a room and event ID, + * given a specified direction(with false<=>'b' and true<=>'f') + * to be used by /messages. + */ +extern char * UserNewMessageToken(User *, char *, char *); + +/** + * Grabs events from a message token(with the correct direction) + * into an array of HashMaps(which store PDU objects). + * If the string pointer is not NULL, this function sets its + * value to a new message token(which is stored on the heap), + * if and only if, there are any new events to retrieve. + * + * If the counter is negative, then it is treated as a 'f', + * otherwise, it is treated as 'b'. + */ +extern Array * UserFetchMessages(User *, int, char *, char **); + +/** + * Deletes a "message token". + */ +extern void UserFreeMessageToken(User *, char *); + + +/** + * Deletes temporary data about a specific user. + */ +extern void UserCleanTemporaryData(User *); + +/** + * Initialise a temporary sync table to signal any /sync changes. + * This should be paired with a call to + * .Fn UserDestroyPushTable . + */ +extern void UserInitialisePushTable(void); + +/** + * Notifies the user of a sync update. + */ +extern void UserNotifyUser(User *); + +/** + * Verifies if the user has been notified, and if it is, then + * removes the notification + */ +extern bool UserGetNotification(char *); + +/** + * Destroys the temporary push table created by + * .Fn UserInitialisePushTable . + */ +extern void UserDestroyPushTable(void); #endif /* TELODENDRIA_USER_H */