Add support for spinning up multiple HTTP servers.

This is useful for having a TLS and a non-TLS version port, like Synapse.
I verified that the multiple-servers does in fact work as intended,
although the TLS server part is broken; I must be doing something
incorrectly with LibreSSL in setting up the server.
This commit is contained in:
Jordan Bancino 2023-03-23 02:12:45 +00:00
parent 2fab7b55fe
commit 2441f07848
15 changed files with 311 additions and 90 deletions

View file

@ -38,10 +38,10 @@ Milestone: v0.3.0
[x] Global log object
- So we don't have to pass LogConfig around everywhere
- Also allows debug and error logging in other APIs
[ ] Make 'listen' directive in config an array of objects
[x] Make 'listen' directive in config an array of objects
- Each object has port, threads, maxConnections
- If tls is given, it can be null, false, or an object with cert and key
[ ] Pass TLS certs and keys into HttpServer
[x] Pass TLS certs and keys into HttpServer
[ ] Move configuration to database
[ ] Initial configuration
[ ] If no config, create one-time use registration token that

View file

@ -1,14 +1,19 @@
{
"serverName": "localhost",
"baseUrl": "http://localhost:8008",
"dataDir": "./data",
"federation": true,
"registration": true,
"threads": 2,
"log": {
"output": "stdout",
"level": "debug",
"color": true,
"timestampFormat": "none",
"color": true
"level": "debug"
},
"dataDir": "./data",
"listen": [
{
"port": 8008,
"tls": false
}
],
"registration": true,
"serverName": "localhost",
"baseUrl": "http:\/\/localhost:8008",
"federation": true
}

View file

@ -1,13 +1,25 @@
{
"serverName": "example.com",
"baseUrl": "https://matrix.example.com",
"identityServer": "https://identity.example.com",
"dataDir": "/var/telodendria",
"federation": true,
"registration": false,
"threads": 4,
"maxCache": 512000000,
"log": {
"output": "file"
},
"dataDir": "/var/telodendria",
"listen": [
{
"port": 8008,
"tls": false
},
{
"port": 8448,
"tls": {
"cert": "telodendria.crt",
"key": "telodendria.key"
}
}
],
"serverName": "example.com",
"identityServer": "https://identity.example.com",
"baseUrl": "https://matrix.example.com",
"registration": false,
"federation": true,
"maxCache": 512000000
}

View file

@ -164,7 +164,7 @@ ArrayInsert(Array * array, size_t index, void *value)
}
extern void *
ArraySet(Array * array, size_t index, void * value)
ArraySet(Array * array, size_t index, void *value)
{
void *oldValue;

View file

@ -29,6 +29,7 @@
#include <Array.h>
#include <Str.h>
#include <Db.h>
#include <HttpServer.h>
#include <stdlib.h>
#include <ctype.h>
@ -89,7 +90,7 @@
into = default; \
}
int
static int
ConfigParseRunAs(Config * tConfig, HashMap * config)
{
JsonValue *value;
@ -105,7 +106,109 @@ error:
return 0;
}
int
static int
ConfigParseListen(Config * tConfig, Array * listen)
{
size_t i;
if (!ArraySize(listen))
{
Log(LOG_ERR, "Listen array cannot be empty; you must specify at least");
Log(LOG_ERR, "one listener.");
goto error;
}
if (!tConfig->servers)
{
tConfig->servers = ArrayCreate();
if (!tConfig->servers)
{
Log(LOG_ERR, "Unable to allocate memory for listener configurations.");
goto error;
}
}
for (i = 0; i < ArraySize(listen); i++)
{
JsonValue *val = ArrayGet(listen, i);
HashMap *obj;
HttpServerConfig *serverCfg = Malloc(sizeof(HttpServerConfig));
if (!serverCfg)
{
Log(LOG_ERR, "Unable to allocate memory for listener configuration.");
goto error;
}
if (JsonValueType(val) != JSON_OBJECT)
{
Log(LOG_ERR, "Invalid value in listener array.");
Log(LOG_ERR, "All listeners must be objects.");
goto error;
}
obj = JsonValueAsObject(val);
serverCfg->port = JsonValueAsInteger(HashMapGet(obj, "port"));
serverCfg->threads = JsonValueAsInteger(HashMapGet(obj, "threads"));
serverCfg->maxConnections = JsonValueAsInteger(HashMapGet(obj, "maxConnections"));
if (!serverCfg->port)
{
Log(LOG_WARNING, "No or invalid port specified, listener will be ignored.");
Free(serverCfg);
continue;
}
if (!serverCfg->threads)
{
Log(LOG_DEBUG, "No or invalid number of threads specified for listener.");
Log(LOG_DEBUG, "Using default, which may be subject to change.");
serverCfg->threads = 4;
}
if (!serverCfg->maxConnections)
{
Log(LOG_DEBUG, "No or invalid number of maximum connections specified.");
Log(LOG_DEBUG, "Using default, which may be subject to change.");
serverCfg->maxConnections = 32;
}
val = HashMapGet(obj, "tls");
if ((JsonValueType(val) == JSON_BOOLEAN && !JsonValueAsBoolean(val)) || JsonValueType(val) == JSON_NULL)
{
serverCfg->flags = HTTP_FLAG_NONE;
serverCfg->tlsCert = NULL;
serverCfg->tlsKey = NULL;
}
else if (JsonValueType(val) != JSON_OBJECT)
{
Log(LOG_ERR, "Invalid value for listener.tls. It must be an object.");
goto error;
}
else
{
serverCfg->flags = HTTP_FLAG_TLS;
obj = JsonValueAsObject(val);
serverCfg->tlsCert = StrDuplicate(JsonValueAsString(HashMapGet(obj, "cert")));
serverCfg->tlsKey = StrDuplicate(JsonValueAsString(HashMapGet(obj, "key")));
if (!serverCfg->tlsCert || !serverCfg->tlsKey)
{
Log(LOG_ERR, "TLS cert and key must both be valid file names.");
goto error;
}
}
ArrayAdd(tConfig->servers, serverCfg);
}
return 1;
error:
return 0;
}
static int
ConfigParseLog(Config * tConfig, HashMap * config)
{
JsonValue *value;
@ -210,7 +313,11 @@ ConfigParse(HashMap * config)
memset(tConfig, 0, sizeof(Config));
CONFIG_OPTIONAL_INTEGER(tConfig->listenPort, "listen", 8008);
CONFIG_REQUIRE("listen", JSON_ARRAY);
if (!ConfigParseListen(tConfig, JsonValueAsArray(value)))
{
goto error;
}
CONFIG_REQUIRE("serverName", JSON_STRING);
CONFIG_COPY_STRING(tConfig->serverName);
@ -256,8 +363,6 @@ ConfigParse(HashMap * config)
CONFIG_REQUIRE("dataDir", JSON_STRING);
CONFIG_COPY_STRING(tConfig->dataDir);
CONFIG_OPTIONAL_INTEGER(tConfig->threads, "threads", 1);
CONFIG_OPTIONAL_INTEGER(tConfig->maxConnections, "maxConnections", 32);
CONFIG_OPTIONAL_INTEGER(tConfig->maxCache, "maxCache", 0);
CONFIG_REQUIRE("federation", JSON_BOOLEAN);
@ -303,5 +408,21 @@ ConfigFree(Config * tConfig)
Free(tConfig->logTimestamp);
if (tConfig->servers)
{
size_t i;
for (i = 0; i < ArraySize(tConfig->servers); i++)
{
HttpServerConfig *serverCfg = ArrayGet(tConfig->servers, i);
Free(serverCfg->tlsCert);
Free(serverCfg->tlsKey);
Free(serverCfg);
}
ArrayFree(tConfig->servers);
}
Free(tConfig);
}

View file

@ -27,6 +27,7 @@
#include <Array.h>
#include <Util.h>
#include <Tls.h>
#include <Log.h>
#include <pthread.h>
#include <stdio.h>
@ -48,20 +49,13 @@ static const char ENABLE = 1;
struct HttpServer
{
HttpServerConfig config;
int sd;
unsigned int nThreads;
unsigned int maxConnections;
pthread_t socketThread;
int flags;
char *tlsCrt;
char *tlsKey;
volatile unsigned int stop:1;
volatile unsigned int isRunning:1;
HttpHandler *requestHandler;
void *handlerArgs;
Queue *connQueue;
pthread_mutex_t connQueueMutex;
@ -234,7 +228,7 @@ HttpResponseStatus(HttpServerContext * c, HttpStatus status)
}
HttpStatus
HttpResponseStatusGet(HttpServerContext *c)
HttpResponseStatusGet(HttpServerContext * c)
{
if (!c)
{
@ -291,19 +285,23 @@ DequeueConnection(HttpServer * server)
}
HttpServer *
HttpServerCreate(int flags, unsigned short port, unsigned int nThreads, unsigned int maxConnections,
HttpHandler * requestHandler, void *handlerArgs)
HttpServerCreate(HttpServerConfig * config)
{
HttpServer *server;
struct sockaddr_in sa;
if (!requestHandler)
if (!config)
{
return NULL;
}
if (!config->handler)
{
return NULL;
}
#ifndef TLS_IMPL
if (flags & HTTP_FLAG_TLS)
if (config->flags & HTTP_FLAG_TLS)
{
return NULL;
}
@ -317,7 +315,7 @@ HttpServerCreate(int flags, unsigned short port, unsigned int nThreads, unsigned
memset(server, 0, sizeof(HttpServer));
server->flags = flags;
server->config = *config;
server->threadPool = ArrayCreate();
if (!server->threadPool)
@ -325,7 +323,7 @@ HttpServerCreate(int flags, unsigned short port, unsigned int nThreads, unsigned
goto error;
}
server->connQueue = QueueCreate(maxConnections);
server->connQueue = QueueCreate(config->maxConnections);
if (!server->connQueue)
{
goto error;
@ -363,7 +361,7 @@ HttpServerCreate(int flags, unsigned short port, unsigned int nThreads, unsigned
memset(&sa, 0, sizeof(struct sockaddr_in));
sa.sin_family = AF_INET;
sa.sin_port = htons(port);
sa.sin_port = htons(config->port);
sa.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(server->sd, (struct sockaddr *) & sa, sizeof(sa)) < 0)
@ -371,15 +369,11 @@ HttpServerCreate(int flags, unsigned short port, unsigned int nThreads, unsigned
goto error;
}
if (listen(server->sd, maxConnections) < 0)
if (listen(server->sd, config->maxConnections) < 0)
{
goto error;
}
server->nThreads = nThreads;
server->maxConnections = maxConnections;
server->requestHandler = requestHandler;
server->handlerArgs = handlerArgs;
server->stop = 0;
server->isRunning = 0;
@ -410,6 +404,17 @@ error:
return NULL;
}
HttpServerConfig *
HttpServerConfigGet(HttpServer * server)
{
if (!server)
{
return NULL;
}
return &server->config;
}
void
HttpServerFree(HttpServer * server)
{
@ -561,7 +566,7 @@ HttpServerWorkerThread(void *args)
goto internal_error;
}
server->requestHandler(context, server->handlerArgs);
server->config.handler(context, server->config.handlerArgs);
HttpServerContextFree(context);
fp = NULL; /* The above call will close this
@ -603,7 +608,7 @@ HttpServerEventThread(void *args)
pollFds[0].fd = server->sd;
pollFds[0].events = POLLIN;
for (i = 0; i < server->nThreads; i++)
for (i = 0; i < server->config.threads; i++)
{
HttpServerWorkerThreadArgs *workerThread = Malloc(sizeof(HttpServerWorkerThreadArgs));
@ -657,9 +662,9 @@ HttpServerEventThread(void *args)
}
#ifdef TLS_IMPL
if (server->flags & HTTP_FLAG_TLS)
if (server->config.flags & HTTP_FLAG_TLS)
{
fp = TlsServerStream(connFd, server->tlsCrt, server->tlsKey);
fp = TlsServerStream(connFd, server->config.tlsCert, server->config.tlsKey);
}
else
{
@ -681,7 +686,7 @@ HttpServerEventThread(void *args)
pthread_mutex_unlock(&server->connQueueMutex);
}
for (i = 0; i < server->nThreads; i++)
for (i = 0; i < server->config.threads; i++)
{
HttpServerWorkerThreadArgs *workerThread = ArrayGet(server->threadPool, i);

View file

@ -369,7 +369,7 @@ Logv(LogConfig * config, int level, const char *msg, va_list argp)
}
void
LogTo(LogConfig *config, int level, const char *fmt, ...)
LogTo(LogConfig * config, int level, const char *fmt,...)
{
va_list argp;
@ -379,7 +379,7 @@ LogTo(LogConfig *config, int level, const char *fmt, ...)
}
extern void
Log(int level, const char *fmt, ...)
Log(int level, const char *fmt,...)
{
va_list argp;

View file

@ -43,6 +43,7 @@
#include <Db.h>
#include <Cron.h>
#include <Uia.h>
#include <Util.h>
static Array *httpServers = NULL;
@ -61,6 +62,7 @@ TelodendriaSignalHandler(int signalNo)
for (i = 0; i < ArraySize(httpServers); i++)
{
HttpServer *server = ArrayGet(httpServers, i);
HttpServerStop(server);
}
}
@ -239,6 +241,7 @@ main(int argc, char **argv)
{
Log(LOG_ERR, "Unable to open log file for appending.");
exit = EXIT_FAILURE;
tConfig->flags &= CONFIG_LOG_STDOUT;
goto finish;
}
@ -269,14 +272,11 @@ main(int argc, char **argv)
Log(LOG_DEBUG, "Configuration:");
LogConfigIndent(LogConfigGlobal());
Log(LOG_DEBUG, "Listen On: %d", tConfig->listenPort);
Log(LOG_DEBUG, "Server Name: %s", tConfig->serverName);
Log(LOG_DEBUG, "Base URL: %s", tConfig->baseUrl);
Log(LOG_DEBUG, "Identity Server: %s", tConfig->identityServer);
Log(LOG_DEBUG, "Run As: %s:%s", tConfig->uid, tConfig->gid);
Log(LOG_DEBUG, "Data Directory: %s", tConfig->dataDir);
Log(LOG_DEBUG, "Threads: %d", tConfig->threads);
Log(LOG_DEBUG, "Max Connections: %d", tConfig->maxConnections);
Log(LOG_DEBUG, "Max Cache: %ld", tConfig->maxCache);
Log(LOG_DEBUG, "Flags: %x", tConfig->flags);
LogConfigUnindent(LogConfigGlobal());
@ -292,18 +292,59 @@ main(int argc, char **argv)
goto finish;
}
/* Bind the socket before possibly dropping permissions */
server = HttpServerCreate(HTTP_FLAG_NONE, tConfig->listenPort, tConfig->threads,
tConfig->maxConnections, MatrixHttpHandler, &matrixArgs);
if (!server)
/* Bind servers before possibly dropping permissions. */
for (i = 0; i < ArraySize(tConfig->servers); i++)
{
Log(LOG_ERR, "Unable to create HTTP server on port %d: %s",
tConfig->listenPort, strerror(errno));
HttpServerConfig *serverCfg = ArrayGet(tConfig->servers, i);
Log(LOG_DEBUG, "HTTP listener: %lu", i);
LogConfigIndent(LogConfigGlobal());
Log(LOG_DEBUG, "Port: %hu", serverCfg->port);
Log(LOG_DEBUG, "Threads: %u", serverCfg->threads);
Log(LOG_DEBUG, "Max Connections: %u", serverCfg->maxConnections);
Log(LOG_DEBUG, "Flags: %d", serverCfg->flags);
Log(LOG_DEBUG, "TLS Cert: %s", serverCfg->tlsCert);
Log(LOG_DEBUG, "TLS Key: %s", serverCfg->tlsKey);
LogConfigUnindent(LogConfigGlobal());
serverCfg->handler = MatrixHttpHandler;
serverCfg->handlerArgs = &matrixArgs;
if (serverCfg->flags & HTTP_FLAG_TLS)
{
if (!UtilLastModified(serverCfg->tlsCert))
{
Log(LOG_ERR, "%s: %s", strerror(errno), serverCfg->tlsCert);
exit = EXIT_FAILURE;
goto finish;
}
if (!UtilLastModified(serverCfg->tlsKey))
{
Log(LOG_ERR, "%s: %s", strerror(errno), serverCfg->tlsKey);
exit = EXIT_FAILURE;
goto finish;
}
}
server = HttpServerCreate(serverCfg);
if (!server)
{
Log(LOG_ERR, "Unable to create HTTP server on port %d: %s",
serverCfg->port, strerror(errno));
exit = EXIT_FAILURE;
goto finish;
}
ArrayAdd(httpServers, server);
}
if (!ArraySize(httpServers))
{
Log(LOG_ERR, "No valid HTTP listeners specified in the configuration.");
exit = EXIT_FAILURE;
goto finish;
}
Log(LOG_DEBUG, "Running as uid:gid: %d:%d.", getuid(), getgid());
@ -375,7 +416,6 @@ main(int argc, char **argv)
tConfig->uid = NULL;
tConfig->gid = NULL;
if (!tConfig->maxCache)
{
Log(LOG_WARNING, "Database caching is disabled.");
@ -391,6 +431,10 @@ main(int argc, char **argv)
exit = EXIT_FAILURE;
goto finish;
}
else
{
Log(LOG_DEBUG, "Opened database.");
}
cron = CronCreate(60 * 1000); /* 1-minute tick */
if (!cron)
@ -411,21 +455,24 @@ main(int argc, char **argv)
for (i = 0; i < ArraySize(httpServers); i++)
{
HttpServerConfig *serverCfg;
server = ArrayGet(httpServers, i);
serverCfg = HttpServerConfigGet(server);
if (!HttpServerStart(server))
{
Log(LOG_ERR, "Unable to start HTTP server %lu.", i);
Log(LOG_ERR, "Unable to start HTTP server %lu on port %hu.", i, serverCfg->port);
exit = EXIT_FAILURE;
goto finish;
}
else
{
Log(LOG_DEBUG, "Started HTTP server %lu.", i);
Log(LOG_INFO, "Listening on port: %hu", serverCfg->port);
}
}
Log(LOG_INFO, "Listening on port: %d", tConfig->listenPort);
sigAction.sa_handler = TelodendriaSignalHandler;
sigfillset(&sigAction.sa_mask);
@ -437,6 +484,10 @@ main(int argc, char **argv)
exit = EXIT_FAILURE;
goto finish;
}
else
{
Log(LOG_DEBUG, "Installed SIGINT signal handler.");
}
/* Block this thread until the servers are terminated by a signal
* handler */
@ -444,6 +495,7 @@ main(int argc, char **argv)
{
server = ArrayGet(httpServers, i);
HttpServerJoin(server);
Log(LOG_DEBUG, "Joined HTTP server %lu.", i);
}
finish:
@ -452,11 +504,14 @@ finish:
{
for (i = 0; i < ArraySize(httpServers); i++)
{
Log(LOG_DEBUG, "Freeing HTTP server %lu...", i);
server = ArrayGet(httpServers, i);
HttpServerStop(server);
HttpServerFree(server);
Log(LOG_DEBUG, "Freed HTTP server %lu.", i);
}
ArrayFree(httpServers);
Log(LOG_DEBUG, "Freed HTTP servers array.");
}
if (cron)
@ -466,16 +521,8 @@ finish:
Log(LOG_DEBUG, "Stopped and freed job scheduler.");
}
/*
* If we're not logging to standard output, then we can close it. Otherwise,
* if we are logging to stdout, LogConfigFree() will close it for us.
*/
if (tConfig && !(tConfig->flags & CONFIG_LOG_STDOUT))
{
StreamClose(StreamStdout());
}
DbClose(matrixArgs.db);
Log(LOG_DEBUG, "Closed database.");
ConfigFree(tConfig);
@ -498,6 +545,8 @@ finish:
* memory should be allocated. */
TelodendriaGenerateMemReport();
/* Free any leaked memory now, just in case the operating system
* we're running on won't do it for us. */
MemoryFreeAll();
return exit;
}

View file

@ -119,21 +119,25 @@ TlsInitServer(int fd, const char *crt, const char *key)
if (tls_config_set_cert_file(cookie->cfg, crt) == -1)
{
Log(LOG_ERR, "Error with certificate file.");
goto error;
}
if (tls_config_set_key_file(cookie->cfg, key) == -1)
{
Log(LOG_ERR, "Error with key file.");
goto error;
}
if (tls_configure(cookie->ctx, cookie->cfg) == -1)
{
Log(LOG_ERR, "Error configuring context.");
goto error;
}
if (tls_accept_socket(cookie->ctx, &cookie->cctx, fd) == -1)
{
Log(LOG_ERR, "Error accepting socket.");
goto error;
}

View file

@ -43,7 +43,7 @@ extern int
ArrayInsert(Array *, size_t, void *);
extern void *
ArraySet(Array *, size_t, void *);
ArraySet(Array *, size_t, void *);
extern int
ArrayAdd(Array *, void *);

View file

@ -27,6 +27,7 @@
#include <Log.h>
#include <HashMap.h>
#include <Array.h>
typedef enum ConfigFlag
{
@ -48,15 +49,14 @@ typedef struct Config
char *gid;
char *dataDir;
unsigned short listenPort;
unsigned int flags;
unsigned int threads;
unsigned int maxConnections;
size_t maxCache;
char *logTimestamp;
int logLevel;
Array *servers;
} Config;
extern Config *

View file

@ -36,8 +36,25 @@ typedef struct HttpServer HttpServer;
typedef struct HttpServerContext HttpServerContext;
typedef void (HttpHandler) (HttpServerContext *, void *);
typedef struct HttpServerConfig
{
unsigned short port;
unsigned int threads;
unsigned int maxConnections;
int flags;
char *tlsCert;
char *tlsKey;
HttpHandler *handler;
void *handlerArgs;
} HttpServerConfig;
extern HttpServer *
HttpServerCreate(int, unsigned short, unsigned int, unsigned int, HttpHandler *, void *);
HttpServerCreate(HttpServerConfig *);
extern HttpServerConfig *
HttpServerConfigGet(HttpServer *);
extern void
HttpServerFree(HttpServer *);

View file

@ -76,6 +76,6 @@ extern void
LogTo(LogConfig *, int, const char *,...);
extern void
Log(int, const char *, ...);
Log(int, const char *,...);
#endif

View file

@ -73,8 +73,16 @@ int
main(void)
{
struct sigaction sa;
HttpServerConfig cfg;
server = HttpServerCreate(HTTP_FLAG_NONE, 8008, 1, 1, HttpHandle, NULL);
cfg.flags = HTTP_FLAG_NONE;
cfg.port = 8008;
cfg.threads = 1;
cfg.maxConnections = 1;
cfg.handler = HttpHandle;
cfg.handlerArgs = NULL;
server = HttpServerCreate(&cfg);
if (!HttpServerStart(server))
{
StreamPuts(StreamStderr(), "Unable to start HTTP server.\n");