diff --git a/src/HttpServer.c b/src/HttpServer.c index d96367a..12fb90f 100644 --- a/src/HttpServer.c +++ b/src/HttpServer.c @@ -25,6 +25,8 @@ #include #include +#include +#include #include #include @@ -52,6 +54,8 @@ struct HttpServer Queue *connQueue; pthread_mutex_t connQueueMutex; + + Array *threadPool; }; static int @@ -107,26 +111,34 @@ HttpServerCreate(unsigned short port, unsigned int nThreads, unsigned int maxCon return NULL; } - server = malloc(sizeof(HttpServer)); + server = calloc(1, sizeof(HttpServer)); if (!server) { - return NULL; + goto error; + } + + server->threadPool = ArrayCreate(); + if (!server->threadPool) + { + goto error; } server->connQueue = QueueCreate(maxConnections); if (!server->connQueue) { - free(server); - return NULL; + goto error; + } + + if (pthread_mutex_init(&server->connQueueMutex, NULL) != 0) + { + goto error; } - pthread_mutex_init(&server->connQueueMutex, NULL); server->sd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0); if (server->sd < 0) { - free(server); - return NULL; + goto error; } sa.sin_family = AF_INET; @@ -135,16 +147,12 @@ HttpServerCreate(unsigned short port, unsigned int nThreads, unsigned int maxCon if (bind(server->sd, (struct sockaddr *) & sa, sizeof(sa)) < 0) { - close(server->sd); - free(server); - return NULL; + goto error; } if (listen(server->sd, maxConnections) < 0) { - close(server->sd); - free(server); - return NULL; + goto error; } server->nThreads = nThreads; @@ -155,6 +163,33 @@ HttpServerCreate(unsigned short port, unsigned int nThreads, unsigned int maxCon server->isRunning = 0; return server; + +error: + if (server) + { + if (server->connQueue) + { + QueueFree(server->connQueue); + } + + if (server->connQueueMutex) + { + pthread_mutex_destroy(&server->connQueueMutex); + } + + if (server->threadPool) + { + ArrayFree(server->threadPool); + } + + if (server->sd) + { + close(server->sd); + } + + free(server); + } + return NULL; } void @@ -168,15 +203,46 @@ HttpServerFree(HttpServer * server) close(server->sd); QueueFree(server->connQueue); pthread_mutex_destroy(&server->connQueueMutex); + ArrayFree(server->threadPool); free(server); } +static void * +HttpServerWorkerThread(void *args) +{ + HttpServer *server = (HttpServer *) args; + + while (!server->stop) + { + FILE *fp = DequeueConnection(server); + + if (!fp) + { + /* Block for 1 millisecond before continuting so we don't + * murder the CPU */ + UtilSleepMillis(1); + continue; + } + + fprintf(fp, "HTTP/1.1 500 Internal Server Error\n"); + fprintf(fp, "Server: Telodendria v" TELODENDRIA_VERSION "\n"); + fprintf(fp, "Content-Type: application/json\n"); + fprintf(fp, "\n"); + fprintf(fp, "{}\n"); + + fclose(fp); + } + + return NULL; +} + static void * HttpServerEventThread(void *args) { HttpServer *server = (HttpServer *) args; struct pollfd pollFds[1]; FILE *fp; + size_t i; server->isRunning = 1; server->stop = 0; @@ -184,6 +250,27 @@ HttpServerEventThread(void *args) pollFds[0].fd = server->sd; pollFds[0].events = POLLIN; + for (i = 0; i < server->nThreads; i++) + { + pthread_t *workerThread = malloc(sizeof(pthread_t)); + + if (!workerThread) + { + /* TODO: Make the event thread return an error to the main + * thread */ + return NULL; + } + + if (pthread_create(workerThread, NULL, HttpServerWorkerThread, server) != 0) + { + /* TODO: Make the event thread return an error to the main + * thread */ + return NULL; + } + + ArrayAdd(server->threadPool, workerThread); + } + while (!server->stop) { struct sockaddr_storage addr; @@ -209,7 +296,13 @@ HttpServerEventThread(void *args) QueueConnection(server, connFd); } - /* Wait on all threads in the thread pool */ + for (i = 0; i < server->nThreads; i++) + { + pthread_t *workerThread = ArrayGet(server->threadPool, i); + + pthread_join(*workerThread, NULL); + free(workerThread); + } while ((fp = DequeueConnection(server))) {