Compare commits

...

2 commits

Author SHA1 Message Date
LDA
e2d26c7f33 [ADD/WIP] Start /messages, sync timeouts, cleaning
Some checks failed
Compile Telodendria / Compile Telodendria (x86, alpine-v3.19) (push) Has been cancelled
Compile Telodendria / Compile Telodendria (x86, debian-v12.4) (push) Has been cancelled
Compile Telodendria / Compile Telodendria (x86, freebsd-v14.0) (push) Has been cancelled
Compile Telodendria / Compile Telodendria (x86, netbsd-v9.3) (push) Has been cancelled
Compile Telodendria / Compile Telodendria (x86_64, alpine-v3.19) (push) Has been cancelled
Compile Telodendria / Compile Telodendria (x86_64, debian-v12.4) (push) Has been cancelled
Compile Telodendria / Compile Telodendria (x86_64, freebsd-v14.0) (push) Has been cancelled
Compile Telodendria / Compile Telodendria (x86_64, netbsd-v9.3) (push) Has been cancelled
Compile Telodendria / Compile Telodendria (x86_64, openbsd-v7.4) (push) Has been cancelled
This code _really_ isn't good. I've spotted some fun racing happen and
leak of some JSON(somehow?) without timeout, and I've been trying to
spend the last several days trying to fix that.

The good news is that, with some endpoint spoofing, you can actually
_try_ this with a Matrix client(dev.cinny.in for example).

I might consider rewriting this pastacode, considering the issues noted
above in favour of a better design, with less odds of this.
2024-06-14 23:30:35 +02:00
LDA
97b1e4d723 [ADD/WIP] Push events into sync
The code is still ugly(still waiting for j2s to support some things
before I can actually get *somewhere* with the schemas, which is my main
complaint), and it's definitely not spec-compliant, but it now means you
can actually see diffs in rooms you joined from the timeline!
2024-06-09 11:46:44 +02:00
15 changed files with 1100 additions and 75 deletions

View file

@ -16,6 +16,10 @@
"fields": {
"age": {
"type": "integer"
},
"next_events": {
"type": "[string]",
"required": false
}
}
},

View file

@ -21,13 +21,19 @@
},
"type": "struct"
},
"StrippedStateEvent": {
"InviteState": {
"fields": {
"events": { "type": "[Event]" }
"events": { "type": "[StrippedStateEvent]" }
},
"type": "struct"
},
"InviteState": {
"States": {
"fields": {
"events": { "type": "[StrippedStateEvent]" }
},
"type": "struct"
},
"StrippedStateEvent": {
"fields": {
"content": {
"type": "object",
@ -75,14 +81,15 @@
},
"JoinedRooms": {
"fields": {
"timeline": { "type": "Timeline" }
"timeline": { "type": "Timeline" },
"state": { "type": "States" }
},
"type": "struct"
},
"Rooms": {
"fields": {
"invite": { "type": "InvitedRooms" },
"join": { "type": "JoinedRooms" }
"invite": { "type": "object" },
"join": { "type": "object" }
},
"type": "struct"
},

View file

@ -86,6 +86,26 @@ SignalHandler(int signal)
}
}
/* TODO: Move this! */
static void
UsersClean(MatrixHttpHandlerArgs *args)
{
Db *db = args->db;
Array *arr = DbList(db, 1, "users");
size_t i;
for (i = 0; i < ArraySize(arr); i++)
{
char *id = ArrayGet(arr, i);
User *user = UserLock(db, id);
UserCleanTemporaryData(user);
UserUnlock(user);
}
DbListFree(arr);
}
typedef enum ArgFlag
{
ARG_VERSION = (1 << 0),
@ -147,6 +167,7 @@ start:
token = NULL;
memset(&matrixArgs, 0, sizeof(matrixArgs));
UserInitialisePushTable();
if (!LogConfigGlobal())
{
@ -513,10 +534,15 @@ start:
Log(LOG_DEBUG, "Registering jobs...");
CronEvery(cron, 30 * 60 * 1000, (JobFunc *) UiaCleanup, &matrixArgs);
CronEvery(cron, 5 * 60 * 1000, (JobFunc *) UsersClean, &matrixArgs);
Log(LOG_NOTICE, "Starting job scheduler...");
CronStart(cron);
/* We still call it anyways to be sure. */
Log(LOG_NOTICE, "Cleaning up user data...");
UsersClean(&matrixArgs);
Log(LOG_NOTICE, "Building routing tree...");
matrixArgs.router = RouterBuild();
if (!matrixArgs.router)
@ -583,6 +609,7 @@ start:
finish:
Log(LOG_NOTICE, "Shutting down...");
UserDestroyPushTable();
if (httpServers)
{
for (i = 0; i < ArraySize(httpServers); i++)

View file

@ -52,6 +52,7 @@
#define IsState(p, typ, key) (StrEquals(p->type, typ) && \
StrEquals(p->state_key, key))
struct Room
{
Db *db;
@ -1091,22 +1092,25 @@ AuthorizeJoinMembershipV1(Room * room, PduV1 pdu, HashMap *state)
{
/* Interperet prev properly, as a list of JsonObjects. */
char *prev_id = JsonValueAsString(ArrayGet(prev, 0));
char *ignored;
HashMap *prev_event = RoomEventFetch(room, prev_id);
PduV1 prev_pdu;
bool flag = false;
if (prev && PduV1FromJson(prev_event, &prev_pdu, &ignored))
if (prev_event)
{
if (StrEquals(prev_pdu.type, "m.room.create") &&
StrEquals(prev_pdu.sender, pdu.state_key))
char *type = JsonValueAsString(HashMapGet(prev_event, "type"));
char *sender = JsonValueAsString(HashMapGet(prev_event, "sender"));
if (StrEquals(type, "m.room.create") &&
StrEquals(sender, pdu.state_key))
{
PduV1Free(&prev_pdu);
JsonFree(prev_event);
return true;
flag = true;
}
PduV1Free(&prev_pdu);
}
JsonFree(prev_event);
if (flag)
{
return true;
}
}
/* Step 5.2.2: If the sender does not match state_key, reject. */
@ -1633,6 +1637,9 @@ RoomEventSendV1(Room * room, HashMap * event)
* errorr status to the user. */
goto finish;
}
StateFree(state);
state = NULL;
RoomAddEventV1(room, pdu);
valid = true;
@ -1758,6 +1765,7 @@ RoomAddEventV1(Room *room, PduV1 pdu)
JsonValue *leaves_val;
char *safe_id;
size_t i;
if (!room || room->version >= 3)
{
return false;
@ -1812,11 +1820,39 @@ RoomAddEventV1(Room *room, PduV1 pdu)
DbUnlock(room->db, event_ref);
/* TODO: Store DAG relationships, somehow. */
pdu_json = PduV1ToJson(&pdu);
for (i = 0; i < ArraySize(prev_events); i++)
{
CommonID *sid= UserIdParse(pdu.sender, NULL);
User *suser = UserLock(room->db, sid->local);
JsonValue *event_val = ArrayGet(prev_events, i);
char *id = JsonValueAsString(event_val);
char *error = NULL;
PduV1 prev_pdu = { 0 };
HashMap *prev_object = NULL;
Array *next_events = NULL;
event_ref = DbLock(room->db, 4, "rooms", room->id, "events", id);
PduV1FromJson(DbJson(event_ref), &prev_pdu, &error);
if (!prev_pdu._unsigned.next_events)
{
prev_pdu._unsigned.next_events = ArrayCreate();
}
next_events = prev_pdu._unsigned.next_events;
ArrayAdd(next_events, StrDuplicate(pdu.event_id));
prev_object = PduV1ToJson(&prev_pdu);
DbJsonSet(event_ref, prev_object);
JsonFree(prev_object);
PduV1Free(&prev_pdu);
DbUnlock(room->db, event_ref);
}
{
HashMap *state;
char *type, *state_key, *event_id;
pdu_json = PduV1ToJson(&pdu);
if (StrEquals(pdu.type, "m.room.member"))
{
CommonID *id = UserIdParse(pdu.state_key, NULL);
@ -1845,12 +1881,26 @@ RoomAddEventV1(Room *room, PduV1 pdu)
UserIdFree(id);
UserUnlock(user);
}
state = StateCurrent(room);
while (StateIterate(state, &type, &state_key, (void **) &event_id))
{
if (StrEquals(type, "m.room.member"))
{
CommonID *id = UserIdParse(state_key, NULL);
User *user = UserLock(room->db, id->local);
UserPushEvent(suser, pdu_json);
UserIdFree(sid);
UserUnlock(suser);
UserPushEvent(user, pdu_json);
UserIdFree(id);
UserUnlock(user);
}
Free(type);
Free(state_key);
}
StateFree(state);
JsonFree(pdu_json);
}
JsonFree(pdu_json);
return true;
}

View file

@ -77,8 +77,10 @@ RouterBuild(void)
R("/_matrix/client/v3/user/(.*)/filter/(.*)", RouteFilter);
R("/_matrix/client/v3/rooms/(.*)/send/(.*)/(.*)", RouteSendEvent);
R("/_matrix/client/v3/rooms/(.*)/state/(.*)/(.*)", RouteSendState);
R("/_matrix/client/v3/rooms/(.*)/event/(.*)", RouteFetchEvent);
R("/_matrix/client/v3/rooms/(.*)/join", RouteJoinRoom);
R("/_matrix/client/v3/rooms/(.*)/messages", RouteMessages);
R("/_matrix/client/v3/join/(.*)", RouteJoinRoomAlias);
@ -91,6 +93,7 @@ RouterBuild(void)
R("/_matrix/client/v3/directory/room/(.*)", RouteAliasDirectory);
R("/_matrix/client/v3/rooms/(.*)/aliases", RouteRoomAliases);
/* Telodendria Admin API Routes */
R("/_telodendria/admin/v1/(restart|shutdown|stats)", RouteProcControl);

View file

@ -104,8 +104,6 @@ ROUTE_IMPL(RouteCreateRoom, path, argp)
JsonFree(request);
request = NULL;
Log(LOG_INFO, "Creating room...");
if (!(room = RoomCreate(db, user, &parsed, server)))
{
err = "Couldn't create room.";

View file

@ -189,7 +189,7 @@ ROUTE_IMPL(RouteFilter, path, argp)
DbJsonSet(ref, filterJson);
DbUnlock(db, ref);
Free(filterJson);
JsonFree(filterJson);
response = HashMapCreate();
HashMapSet(response, "filter_id", JsonValueString(filterId));

View file

@ -84,6 +84,7 @@ ROUTE_IMPL(RouteJoinRoomAlias, path, argp)
if (*roomId != '!')
{
roomId = RoomResolveAlias(db, roomId);
Log(LOG_NOTICE, "Here's my guess: %s", roomId);
}
else
{
@ -126,6 +127,7 @@ ROUTE_IMPL(RouteJoinRoomAlias, path, argp)
goto finish;
}
Log(LOG_INFO, "Trying with token %s", token);
user = UserAuthenticate(db, token);
if (!user)
{
@ -136,11 +138,11 @@ ROUTE_IMPL(RouteJoinRoomAlias, path, argp)
id = UserIdParse(UserGetName(user), serverName);
id->sigil = '@';
sender = ParserRecomposeCommonID(*id);
Log(LOG_INFO, "Now as %s", sender);
room = RoomLock(db, roomId);
if (!room)
{
err = "Room ID does not exist.";
HttpResponseStatus(args->context, HTTP_BAD_REQUEST);
response = MatrixErrorCreate(M_UNKNOWN, err);
@ -149,6 +151,8 @@ ROUTE_IMPL(RouteJoinRoomAlias, path, argp)
if (RoomContainsUser(room, sender))
{
err = "User is already in the room.";
Log(LOG_INFO, "qhar for %s", sender);
HttpResponseStatus(args->context, HTTP_UNAUTHORIZED);
response = MatrixErrorCreate(M_FORBIDDEN, err);
goto finish;

178
src/Routes/RouteMessages.c Normal file
View file

@ -0,0 +1,178 @@
/*
* Copyright (C) 2022-2024 Jordan Bancino <@jordan:bancino.net> with
* other valuable contributors. See CONTRIBUTORS.txt for the full list.
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation files
* (the "Software"), to deal in the Software without restriction,
* including without limitation the rights to use, copy, modify, merge,
* publish, distribute, sublicense, and/or sell copies of the Software,
* and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#include <Routes.h>
#include <Cytoplasm/HashMap.h>
#include <Cytoplasm/Memory.h>
#include <Cytoplasm/Json.h>
#include <Cytoplasm/Str.h>
#include <User.h>
#include <Room.h>
#include <string.h>
#include <stdlib.h>
#include <Schema/Filter.h>
static char *
GetServerName(Db * db)
{
char *name;
Config config;
ConfigLock(db, &config);
if (!config.ok)
{
return NULL;
}
name = StrDuplicate(config.serverName);
ConfigUnlock(&config);
return name;
}
ROUTE_IMPL(RouteMessages, path, argp)
{
RouteArgs *args = argp;
Db *db = args->matrixArgs->db;
HashMap *params = HttpRequestParams(args->context);
HashMap *response = NULL;
User *user = NULL;
char *token = NULL;
char *roomId = ArrayGet(path, 0);
char *from = HashMapGet(params, "from");
char *limitStr = HashMapGet(params, "limit");
char *end = NULL;
char *sender = NULL;
char *serverName = NULL;
CommonID *id = NULL;
int limit = strtol(limitStr, NULL, 10);
Array *messages = NULL;
Room *room = NULL;
char *err;
if (!limit || limit < 0)
{
limit = 10;
}
if (!roomId)
{
/* Should be impossible */
HttpResponseStatus(args->context, HTTP_INTERNAL_SERVER_ERROR);
return MatrixErrorCreate(M_UNKNOWN, NULL);
}
if (HttpRequestMethodGet(args->context) != HTTP_GET)
{
err = "Unknown request method.";
HttpResponseStatus(args->context, HTTP_BAD_REQUEST);
response = MatrixErrorCreate(M_UNRECOGNIZED, err);
goto finish;
}
serverName = GetServerName(db);
if (!serverName)
{
HttpResponseStatus(args->context, HTTP_INTERNAL_SERVER_ERROR);
response = MatrixErrorCreate(M_UNKNOWN, NULL);
goto finish;
}
response = MatrixGetAccessToken(args->context, &token);
if (response)
{
goto finish;
}
user = UserAuthenticate(db, token);
if (!user)
{
HttpResponseStatus(args->context, HTTP_UNAUTHORIZED);
response = MatrixErrorCreate(M_UNKNOWN_TOKEN, NULL);
goto finish;
}
id = UserIdParse(UserGetName(user), serverName);
id->sigil = '@';
sender = ParserRecomposeCommonID(*id);
room = RoomLock(db, roomId);
if (!RoomContainsUser(room, sender))
{
err = "User is not in the room.";
HttpResponseStatus(args->context, HTTP_UNAUTHORIZED);
response = MatrixErrorCreate(M_FORBIDDEN, err);
goto finish;
}
RoomUnlock(room);
/* TODO: Filters, to, and friends */
messages = UserFetchMessages(user, limit, from, &end);
if (!messages)
{
Room *r = RoomLock(db, roomId);
Array *leaf = RoomPrevEventsGet(r);
HashMap *start = JsonValueAsObject(ArrayGet(leaf, 0));
char *startId = StrDuplicate(
JsonValueAsString(HashMapGet(start, "event_id"))
);
char *token = UserNewMessageToken(user, roomId, startId);
RoomUnlock(r);
messages = UserFetchMessages(user, limit, token, &end);
Free(token);
Free(startId);
}
response = HashMapCreate();
JsonSet(response, JsonValueArray(messages), 1, "chunk");
if (end)
{
JsonSet(response, JsonValueString(end), 1, "end");
Free(end);
}
UserFreeMessageToken(user, from);
finish:
UserUnlock(user);
UserIdFree(id);
if (sender)
{
Free(sender);
}
if (serverName)
{
Free(serverName);
}
return response;
}

129
src/Routes/RoutePushRules.c Normal file
View file

@ -0,0 +1,129 @@
/*
* Copyright (C) 2022-2024 Jordan Bancino <@jordan:bancino.net> with
* other valuable contributors. See CONTRIBUTORS.txt for the full list.
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation files
* (the "Software"), to deal in the Software without restriction,
* including without limitation the rights to use, copy, modify, merge,
* publish, distribute, sublicense, and/or sell copies of the Software,
* and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#include <Cytoplasm/HttpServer.h>
#include <Routes.h>
#include <Cytoplasm/HashMap.h>
#include <Cytoplasm/Memory.h>
#include <Cytoplasm/Json.h>
#include <Cytoplasm/Str.h>
#include <User.h>
#include <Room.h>
#include <string.h>
#include <Schema/Filter.h>
ROUTE_IMPL(RoutePushRules, path, argp)
{
RouteArgs *args = argp;
Db *db = args->matrixArgs->db;
HashMap *response = NULL;
User *user = NULL;
char *token = NULL;
char *err;
if (HttpRequestMethodGet(args->context) != HTTP_GET)
{
err = "Unknown request method.";
HttpResponseStatus(args->context, HTTP_BAD_REQUEST);
response = MatrixErrorCreate(M_UNRECOGNIZED, err);
goto finish;
}
response = MatrixGetAccessToken(args->context, &token);
if (response)
{
goto finish;
}
user = UserAuthenticate(db, token);
if (!user)
{
HttpResponseStatus(args->context, HTTP_UNAUTHORIZED);
response = MatrixErrorCreate(M_UNKNOWN_TOKEN, NULL);
goto finish;
}
response = HashMapCreate();
JsonSet(response, JsonValueObject(HashMapCreate()), 1, "global");
(void) path;
finish:
UserUnlock(user);
return response;
}
ROUTE_IMPL(RouteKeyQuery, path, argp)
{
RouteArgs *args = argp;
Db *db = args->matrixArgs->db;
HashMap *request = NULL;
HashMap *response = NULL;
User *user = NULL;
char *token = NULL;
char *err;
if (HttpRequestMethodGet(args->context) != HTTP_POST)
{
err = "Unknown request method.";
HttpResponseStatus(args->context, HTTP_BAD_REQUEST);
response = MatrixErrorCreate(M_UNRECOGNIZED, err);
goto finish;
}
response = MatrixGetAccessToken(args->context, &token);
if (response)
{
goto finish;
}
user = UserAuthenticate(db, token);
if (!user)
{
HttpResponseStatus(args->context, HTTP_UNAUTHORIZED);
response = MatrixErrorCreate(M_UNKNOWN_TOKEN, NULL);
goto finish;
}
request = JsonDecode(HttpServerStream(args->context));
if (!request)
{
HttpResponseStatus(args->context, HTTP_BAD_REQUEST);
response = MatrixErrorCreate(M_NOT_JSON, NULL);
goto finish;
}
response = HashMapCreate();
(void) path;
finish:
JsonFree(request);
UserUnlock(user);
return response;
}

View file

@ -171,3 +171,120 @@ finish:
JsonFree(request);
return response;
}
ROUTE_IMPL(RouteSendState, path, argp)
{
RouteArgs *args = argp;
Db *db = args->matrixArgs->db;
HashMap *request = NULL, *event = NULL;
HashMap *response = NULL, *filled = NULL;
User *user = NULL;
CommonID *id = NULL;
char *token = NULL;
char *serverName = NULL;
char *roomId = ArrayGet(path, 0);
char *eventType = ArrayGet(path, 1);
char *stateKey = ArrayGet(path, 2);
char *sender = NULL;
Room *room = NULL;
char *err;
if (!roomId || !eventType)
{
/* Should be impossible */
HttpResponseStatus(args->context, HTTP_INTERNAL_SERVER_ERROR);
return MatrixErrorCreate(M_UNKNOWN, NULL);
}
if (HttpRequestMethodGet(args->context) != HTTP_PUT)
{
err = "Unknown request method.";
HttpResponseStatus(args->context, HTTP_BAD_REQUEST);
response = MatrixErrorCreate(M_UNRECOGNIZED, err);
goto finish;
}
serverName = GetServerName(db);
if (!serverName)
{
HttpResponseStatus(args->context, HTTP_INTERNAL_SERVER_ERROR);
response = MatrixErrorCreate(M_UNKNOWN, NULL);
goto finish;
}
response = MatrixGetAccessToken(args->context, &token);
if (response)
{
goto finish;
}
request = JsonDecode(HttpServerStream(args->context));
if (!request)
{
HttpResponseStatus(args->context, HTTP_BAD_REQUEST);
response = MatrixErrorCreate(M_NOT_JSON, NULL);
goto finish;
}
user = UserAuthenticate(db, token);
if (!user)
{
HttpResponseStatus(args->context, HTTP_UNAUTHORIZED);
response = MatrixErrorCreate(M_UNKNOWN_TOKEN, NULL);
goto finish;
}
id = UserIdParse(UserGetName(user), serverName);
id->sigil = '@';
sender = ParserRecomposeCommonID(*id);
room = RoomLock(db, roomId);
if (!RoomContainsUser(room, sender))
{
err = "User is not in the room.";
HttpResponseStatus(args->context, HTTP_UNAUTHORIZED);
response = MatrixErrorCreate(M_FORBIDDEN, err);
goto finish;
}
Log(LOG_INFO, "State event (%s,%s) coming this way", eventType, stateKey);
event = RoomEventCreate(
sender,
eventType, stateKey ? stateKey : "",
JsonDuplicate(request)
);
filled = RoomEventSend(room, event);
Log(LOG_INFO, "State event (%s,%s) coming this way", eventType, stateKey);
JsonFree(event);
Log(LOG_INFO, "And thats freed!");
if (!filled)
{
err = "User is not allowed to send state.";
HttpResponseStatus(args->context, HTTP_UNAUTHORIZED);
response = MatrixErrorCreate(M_FORBIDDEN, err);
goto finish;
}
response = HashMapCreate();
HashMapSet(
response, "event_id",
JsonValueDuplicate(HashMapGet(filled, "event_id"))
);
JsonFree(filled);
finish:
RoomUnlock(room);
Free(serverName);
if (sender)
{
Free(sender);
}
UserIdFree(id);
UserUnlock(user);
JsonFree(request);
return response;
}

View file

@ -30,14 +30,33 @@
#include <Cytoplasm/HashMap.h>
#include <Cytoplasm/Memory.h>
#include <Cytoplasm/Json.h>
#include <Cytoplasm/Util.h>
#include <Cytoplasm/Str.h>
#include <State.h>
#include <User.h>
#include <Room.h>
#include <string.h>
#include <stdlib.h>
#include <Schema/Filter.h>
static ClientEventWithoutRoomID
ClientfyEventSync(HashMap *pdu)
{
ClientEventWithoutRoomID ret = { 0 };
char *ignored;
ClientEventWithoutRoomIDFromJson(pdu, &ret, &ignored);
return ret;
}
static StrippedStateEvent
StripStateEventSync(HashMap *pdu)
{
StrippedStateEvent ret = { 0 };
char *ignored;
StrippedStateEventFromJson(pdu, &ret, &ignored);
return ret;
}
ROUTE_IMPL(RouteSync, path, argp)
{
@ -46,18 +65,27 @@ ROUTE_IMPL(RouteSync, path, argp)
HashMap *params = NULL;
HashMap *response = NULL;
SyncResponse sync = { 0 };
HashMap *rooms = NULL;
Array *invites;
Array *joins;
size_t i;
User *user = NULL;
char *token = NULL;
char *prevBatch = NULL;
char *nextBatch = NULL;
char *currBatch = NULL;
char *timeout = NULL;
char *err;
int timeoutDuration;
/* TODO: Respect `timeout', (and stop when something is
* pushed, maybe by 'polling' the database? sounds like
* a bad idea) */
if (HttpRequestMethodGet(args->context) != HTTP_GET)
{
err = "Unknown request method.";
@ -79,70 +107,142 @@ ROUTE_IMPL(RouteSync, path, argp)
response = MatrixErrorCreate(M_UNKNOWN_TOKEN, NULL);
goto finish;
}
params = HttpRequestParams(args->context);
prevBatch = HashMapGet(params, "since");
timeout = HashMapGet(params, "timeout");
timeoutDuration = atoi(timeout);
if (!prevBatch)
{
prevBatch = NULL;
nextBatch = UserInitSyncDiff(user);
UserFillSyncDiff(user, nextBatch);
}
else if (timeout && timeoutDuration)
{
char *name = StrDuplicate(UserGetName(user));
int passed = 0;
UserUnlock(user);
/* TODO: Unlocking the user for other threads to notify us is not
* wise.
* Also, Telodendria will NOT reply to ^C while someone is executing
* a sync. */
while (passed < timeoutDuration)
{
if (UserGetNotification(name))
{
break;
}
UtilSleepMillis(100);
passed += 100;
}
Free(name);
user = UserAuthenticate(db, token);
}
currBatch = prevBatch ? prevBatch : nextBatch;
/* TODO: I only am manually parsing this because j2s does not support
* a hashmap of unknown keys pointing to a known type. */
response = HashMapCreate();
rooms = HashMapCreate();
sync.rooms.invite = HashMapCreate();
sync.rooms.join = HashMapCreate();
/* invites */
invites = UserGetInvites(user, currBatch);
for (i = 0; i < ArraySize(invites); i++)
{
Array *invites;
size_t i;
char *roomId = ArrayGet(invites, i);
InvitedRooms invited = { 0 };
HashMap *invitedObject;
invites = UserGetInvites(user, currBatch);
for (i = 0; i < ArraySize(invites); i++)
{
char *roomId = ArrayGet(invites, i);
JsonSet(
rooms,
JsonValueObject(HashMapCreate()),
2, "invites", roomId
);
}
UserFreeList(invites);
invited.invite_state.events = ArrayCreate();
invitedObject = InvitedRoomsToJson(&invited);
JsonSet(
sync.rooms.invite,
JsonValueObject(invitedObject),
1, roomId
);
InvitedRoomsFree(&invited);
}
UserFreeList(invites);
/* Joins */
joins = UserGetJoins(user, currBatch);
for (i = 0; i < ArraySize(joins); i++)
{
Array *joins;
size_t i;
char *roomId = ArrayGet(joins, i);
Array *el = UserGetEvents(user, currBatch, roomId);
size_t j;
Room *r = RoomLock(db, roomId);
HashMap *state = StateCurrent(r);
char *firstEvent = NULL;
JoinedRooms joined = { 0 };
HashMap *joinedObj;
char *type, *key, *id;
joins = UserGetJoins(user, currBatch);
for (i = 0; i < ArraySize(joins); i++)
joined.timeline.events = ArrayCreate();
for (j = 0; j < ArraySize(el); j++)
{
char *roomId = ArrayGet(joins, i);
HashMap *room = HashMapCreate();
char *event = ArrayGet(el, j);
HashMap *e = RoomEventFetch(r, event);
ClientEventWithoutRoomID rc = ClientfyEventSync(e);
ClientEventWithoutRoomID *c = Malloc(sizeof(*c));
memcpy(c, &rc, sizeof(*c));
/* TODO: Add history. */
JsonFree(e);
JsonSet(
rooms,
JsonValueObject(room),
2, "join", roomId
);
if (j == 0)
{
firstEvent = c->event_id;
}
ArrayAdd(joined.timeline.events, c);
}
UserFreeList(joins);
}
joined.timeline.prev_batch = UserNewMessageToken(
user, roomId, firstEvent
);
/* TODO: Don't shove the entire state.
* That's a recipe for disaster, especially on large rooms. */
joined.state.events = ArrayCreate();
while (StateIterate(state, &type, &key, (void **) &id))
{
HashMap *e = RoomEventFetch(r, id);
StrippedStateEvent rs = StripStateEventSync(e);
StrippedStateEvent *s = Malloc(sizeof(*s));
memcpy(s, &rs, sizeof(*s));
JsonFree(e);
ArrayAdd(joined.state.events, s);
Free(type);
Free(key);
}
StateFree(state);
RoomUnlock(r);
UserFreeList(el);
joinedObj = JoinedRoomsToJson(&joined);
HashMapSet(sync.rooms.join, roomId, JsonValueObject(joinedObj));
JoinedRoomsFree(&joined);
}
UserFreeList(joins);
(void) path;
if (prevBatch)
{
UserDropSync(user, prevBatch);
nextBatch = UserInitSyncDiff(user);
UserFillSyncDiff(user, nextBatch);
}
HashMapSet(response, "next_batch", JsonValueString(nextBatch));
Free(nextBatch);
HashMapSet(response, "rooms", JsonValueObject(rooms));
sync.next_batch = nextBatch;
response = SyncResponseToJson(&sync);
SyncResponseFree(&sync);
finish:
UserUnlock(user);
(void) path;
return response;
}

View file

@ -32,8 +32,10 @@
#include <Cytoplasm/Log.h>
#include <Parser.h>
#include <Room.h>
#include <string.h>
#include <pthread.h>
struct User
{
@ -44,6 +46,9 @@ struct User
char *deviceId;
};
static pthread_mutex_t pushLock;
static HashMap *pushTable = NULL;
bool
UserValidate(char *localpart, char *domain)
{
@ -110,7 +115,7 @@ UserLock(Db * db, char *name)
User *user = NULL;
DbRef *ref = NULL;
if (!UserExists(db, name))
if (!name || !UserExists(db, name))
{
return NULL;
}
@ -174,16 +179,22 @@ bool
UserUnlock(User * user)
{
bool ret;
Db *db;
DbRef *ref;
if (!user)
{
return false;
}
db = user->db;
ref = user->ref;
Free(user->name);
Free(user->deviceId);
ret = DbUnlock(user->db, user->ref);
ret = DbUnlock(db, ref);
user->db = NULL;
user->ref = NULL;
Free(user);
return ret;
@ -1057,7 +1068,11 @@ UserAddJoin(User *user, char *roomId)
data = DbJson(user->ref);
joins = JsonValueAsObject(HashMapGet(data, "joins"));
JsonFree(HashMapSet(joins, roomId, JsonValueNull()));
if (!HashMapGet(joins, roomId))
{
JsonFree(HashMapSet(joins, roomId, JsonValueNull()));
}
UserNotifyUser(user);
}
void
@ -1074,6 +1089,7 @@ UserRemoveJoin(User *user, char *roomId)
joins = JsonValueAsObject(HashMapGet(data, "joins"));
JsonFree(HashMapDelete(joins, roomId));
UserNotifyUser(user);
}
Array *
UserListJoins(User *user)
@ -1142,6 +1158,8 @@ UserInitSyncDiff(User *user)
HashMapSet(data, "leaves", JsonValueArray(ArrayCreate()));
HashMapSet(data, "joins", JsonValueObject(HashMapCreate()));
HashMapSet(data, "creation", JsonValueInteger(UtilTsMillis()));
DbUnlock(user->db, syncRef);
return nextBatch;
@ -1173,6 +1191,7 @@ UserPushInviteSync(User *user, char *roomId)
DbUnlock(user->db, syncRef);
}
DbListFree(entries);
UserNotifyUser(user);
}
void
UserPushJoinSync(User *user, char *roomId)
@ -1212,6 +1231,7 @@ UserPushJoinSync(User *user, char *roomId)
DbUnlock(user->db, syncRef);
}
DbListFree(entries);
UserNotifyUser(user);
}
void
@ -1251,6 +1271,8 @@ UserPushEvent(User *user, HashMap *event)
DbUnlock(user->db, syncRef);
}
DbListFree(entries);
UserNotifyUser(user);
}
void
UserDropSync(User *user, char *batch)
@ -1313,9 +1335,21 @@ UserFillSyncDiff(User *user, char *batch)
HashMap *joinEntry;
HashMap *data = DbJson(syncRef);
HashMap *join = JsonValueAsObject(HashMapGet(data, "joins"));
Array *timeline = ArrayCreate();
Room *r = RoomLock(user->db, roomId);
Array *prevs = RoomPrevEventsGet(r);
size_t j;
for (j = 0; j < ArraySize(prevs); j++)
{
HashMap *e = JsonValueAsObject(ArrayGet(prevs, j));
/* TODO: Backfill the user a 'lil more. */
ArrayAdd(timeline, JsonValueDuplicate(HashMapGet(e, "event_id")));
}
RoomUnlock(r);
joinEntry = HashMapCreate();
HashMapSet(joinEntry, "timeline", JsonValueArray(ArrayCreate()));
HashMapSet(joinEntry, "timeline", JsonValueArray(timeline));
if (!HashMapGet(join, roomId))
{
@ -1332,6 +1366,7 @@ UserFillSyncDiff(User *user, char *batch)
Array *
UserGetJoins(User *user, char *batch)
{
Db *db;
DbRef *syncRef;
HashMap *data;
Array *keys;
@ -1341,7 +1376,8 @@ UserGetJoins(User *user, char *batch)
return NULL;
}
syncRef = DbLock(user->db, 4, "users", user->name, "sync", batch);
db = user->db;
syncRef = DbLock(db, 4, "users", user->name, "sync", batch);
if (!syncRef)
{
return NULL;
@ -1356,7 +1392,305 @@ UserGetJoins(User *user, char *batch)
ArraySet(keys, i, StrDuplicate(str));
}
DbUnlock(user->db, syncRef);
DbUnlock(db, syncRef);
return keys;
}
Array *
UserGetEvents(User *user, char *batch, char *roomId)
{
DbRef *syncRef;
HashMap *data;
HashMap *joins;
Array *keys, *ret;
size_t i;
if (!user || !batch || !roomId)
{
return NULL;
}
syncRef = DbLock(user->db, 4, "users", user->name, "sync", batch);
if (!syncRef)
{
return NULL;
}
data = DbJson(syncRef);
joins = JsonValueAsObject(JsonGet(data, 2, "joins", roomId));
keys = (JsonValueAsArray(HashMapGet(joins, "timeline")));
ret = ArrayCreate();
for (i = 0; i < ArraySize(keys); i++)
{
char *str = JsonValueAsString(ArrayGet(keys, i));
ArrayAdd(ret, StrDuplicate(str));
}
DbUnlock(user->db, syncRef);
return ret;
}
char *
UserNewMessageToken(User *user, char *room, char *event)
{
DbRef *messageRef;
HashMap *json;
char *messageToken;
if (!user || !room || !event)
{
return NULL;
}
messageToken = StrRandom(16);
messageRef = DbCreate(user->db,
4, "users", user->name, "msg", messageToken
);
json = DbJson(messageRef);
HashMapSet(json, "room", JsonValueString(room));
HashMapSet(json, "from", JsonValueString(event));
DbUnlock(user->db, messageRef);
return messageToken;
}
Array *
UserFetchMessages(User *user, int n, char *token, char **next)
{
Array *messages = NULL;
Array *nexts = NULL;
DbRef *messageRef;
HashMap *json;
Room *room;
char *roomId;
size_t i;
bool limited = false;
bool dir = false;
if (!user || !token || n == 0)
{
return NULL;
}
if (n < 0)
{
n = -n;
dir = true;
}
messageRef = DbLock(user->db,
4, "users", user->name, "msg", token
);
json = DbJson(messageRef);
if (!messageRef)
{
/* Regenerate a new one */
return NULL;
}
roomId = JsonValueAsString(HashMapGet(json, "room"));
room = RoomLock(user->db, roomId);
/* TODO (very important): CHECK IF THE USER IS ABLE TO SEE
* HISTORY. THROUGHOUT THE STEPS HERE. */
if (!room)
{
DbUnlock(user->db, messageRef);
return NULL;
}
nexts = ArrayCreate();
messages = ArrayCreate();
/* A stack of elements to deal with the DAG. */
ArrayAdd(nexts,
StrDuplicate(JsonValueAsString(HashMapGet(json, "from")))
);
for (i = 0; i < (size_t) n && ArraySize(nexts); i++)
{
char *curr = ArrayDelete(nexts, ArraySize(nexts) - 1);
HashMap *event = RoomEventFetch(room, curr);
Array *prevEvents;
size_t j;
Free(curr);
/* Push event into our message list. */
ArrayAdd(messages, event);
prevEvents = JsonValueAsArray(HashMapGet(event, "prev_events"));
if (dir)
{
HashMap *unsign = JsonValueAsObject(HashMapGet(event, "unsigned"));
/* prevEvents is now nextEvents */
prevEvents = JsonValueAsArray(HashMapGet(unsign, "next_events"));
}
for (j = 0; j < ArraySize(prevEvents); j++)
{
char *prevEvent = JsonValueAsString(ArrayGet(prevEvents, j));
ArrayAdd(nexts, StrDuplicate(prevEvent));
}
if (ArraySize(prevEvents) == 0)
{
limited = true;
}
}
for (i = 0; i < ArraySize(nexts); i++)
{
Free(ArrayGet(nexts, i));
}
ArrayFree(nexts);
RoomUnlock(room);
if (next && !limited)
{
HashMap *lastMessage = ArrayGet(messages, ArraySize(messages) - 1);
char *eId = JsonValueAsString(HashMapGet(lastMessage, "event_id"));
*next = UserNewMessageToken(user, roomId, eId);
}
DbUnlock(user->db, messageRef);
for (i = 0; i < ArraySize(messages); i++)
{
HashMap *e = ArrayGet(messages, i);
ArraySet(messages, i, JsonValueObject(e));
}
return messages;
}
void
UserFreeMessageToken(User *user, char *token)
{
if (!user || !token)
{
return;
}
DbDelete(user->db,
4, "users", user->name, "msg", token
);
}
void
UserCleanTemporaryData(User *user)
{
Array *list;
size_t i;
if (!user)
{
return;
}
list = DbList(user->db, 3, "users", user->name, "msg");
for (i = 0; i < ArraySize(list); i++)
{
char *token = ArrayGet(list, i);
UserFreeMessageToken(user, token);
}
DbListFree(list);
list = DbList(user->db, 3, "users", user->name, "sync");
for (i = 0; i < ArraySize(list); i++)
{
char *token = ArrayGet(list, i);
if (UserIsSyncOld(user, token))
{
UserDropSync(user, token);
}
}
DbListFree(list);
}
bool
UserIsSyncOld(User *user, char *token)
{
DbRef *ref;
HashMap *map;
int64_t dt;
if (!user || !token)
{
return false;
}
ref = DbLock(user->db, 4, "users", user->name, "sync", token);
map = DbJson(ref);
dt = UtilTsMillis() - JsonValueAsInteger(HashMapGet(map, "creation"));
DbUnlock(user->db, ref);
return dt > (5 * 60 * 1000); /* 5-minutes of timeout. */
}
bool
UserSyncExists(User *user, char *sync)
{
if (!user || !sync)
{
return false;
}
return DbLock(user->db, 4, "users", user->name, "sync", sync);
}
extern void
UserInitialisePushTable(void)
{
if (pushTable)
{
return;
}
pthread_mutex_init(&pushLock, NULL);
pthread_mutex_lock(&pushLock);
pushTable = HashMapCreate();
pthread_mutex_unlock(&pushLock);
}
void
UserNotifyUser(User *user)
{
if (!user || !pushTable)
{
return;
}
pthread_mutex_lock(&pushLock);
HashMapSet(pushTable, user->name, user->name);
pthread_mutex_unlock(&pushLock);
}
bool
UserGetNotification(char *user)
{
bool ret;
if (!user || !pushTable)
{
return false;
}
pthread_mutex_lock(&pushLock);
ret = !!HashMapGet(pushTable, user);
HashMapDelete(pushTable, user);
pthread_mutex_unlock(&pushLock);
return ret;
}
void
UserDestroyPushTable(void)
{
if (!pushTable)
{
return;
}
pthread_mutex_lock(&pushLock);
HashMapFree(pushTable);
pushTable = NULL;
pthread_mutex_unlock(&pushLock);
pthread_mutex_destroy(&pushLock);
}

View file

@ -100,12 +100,14 @@ ROUTE(RouteConfig);
ROUTE(RoutePrivileges);
ROUTE(RouteCreateRoom);
ROUTE(RouteFetchEvent);
ROUTE(RouteSendEvent);
ROUTE(RouteSendState);
ROUTE(RouteJoinRoom);
ROUTE(RouteJoinedRooms);
ROUTE(RouteJoinRoomAlias);
ROUTE(RouteFetchEvent);
ROUTE(RouteJoinedRooms);
ROUTE(RouteSync);
ROUTE(RouteMessages);
ROUTE(RouteAliasDirectory);
ROUTE(RouteRoomAliases);
@ -113,7 +115,6 @@ ROUTE(RouteRoomAliases);
ROUTE(RouteAdminDeactivate);
ROUTE(RouteAdminTokens);
#undef ROUTE
#endif

View file

@ -91,6 +91,7 @@ typedef struct UserLoginInfo
char *refreshToken;
} UserLoginInfo;
/**
* Take a localpart and domain as separate parameters and validate them
* against the rules of the Matrix specification. The reasion the
@ -403,9 +404,81 @@ extern Array * UserGetInvites(User *, char *);
*/
extern Array * UserGetJoins(User *, char *);
/**
* Get a list of event IDs for a diff table(and room ID),
* to be freed by
* .Fn UserFreeList .
*/
extern Array * UserGetEvents(User *, char *, char *);
/**
* Drops a sync diff, denoted by it's previous batch ID, if it
* exists.
*/
extern void UserDropSync(User *, char *);
/**
* Verifies if the 'sync' data is considered old, and can be
* reasonably removed from the database.
*/
extern bool UserIsSyncOld(User *, char *);
/**
* Verifies if a sync diff exists.
*/
extern bool UserSyncExists(User *, char *);
/**
* Creates a new "message token" start from a room and event ID,
* given a specified direction(with false<=>'b' and true<=>'f')
* to be used by /messages.
*/
extern char * UserNewMessageToken(User *, char *, char *);
/**
* Grabs events from a message token(with the correct direction)
* into an array of HashMaps(which store PDU objects).
* If the string pointer is not NULL, this function sets its
* value to a new message token(which is stored on the heap),
* if and only if, there are any new events to retrieve.
*
* If the counter is negative, then it is treated as a 'f',
* otherwise, it is treated as 'b'.
*/
extern Array * UserFetchMessages(User *, int, char *, char **);
/**
* Deletes a "message token".
*/
extern void UserFreeMessageToken(User *, char *);
/**
* Deletes temporary data about a specific user.
*/
extern void UserCleanTemporaryData(User *);
/**
* Initialise a temporary sync table to signal any /sync changes.
* This should be paired with a call to
* .Fn UserDestroyPushTable .
*/
extern void UserInitialisePushTable(void);
/**
* Notifies the user of a sync update.
*/
extern void UserNotifyUser(User *);
/**
* Verifies if the user has been notified, and if it is, then
* removes the notification
*/
extern bool UserGetNotification(char *);
/**
* Destroys the temporary push table created by
* .Fn UserInitialisePushTable .
*/
extern void UserDestroyPushTable(void);
#endif /* TELODENDRIA_USER_H */