Implement basic thread pool with dummy 500 messages.

This commit is contained in:
Jordan Bancino 2022-08-24 19:31:28 -04:00
parent f950233dbc
commit 9378a4d369

View file

@ -25,6 +25,8 @@
#include <HttpServer.h> #include <HttpServer.h>
#include <Queue.h> #include <Queue.h>
#include <Array.h>
#include <Util.h>
#include <pthread.h> #include <pthread.h>
#include <stdio.h> #include <stdio.h>
@ -52,6 +54,8 @@ struct HttpServer
Queue *connQueue; Queue *connQueue;
pthread_mutex_t connQueueMutex; pthread_mutex_t connQueueMutex;
Array *threadPool;
}; };
static int static int
@ -107,26 +111,34 @@ HttpServerCreate(unsigned short port, unsigned int nThreads, unsigned int maxCon
return NULL; return NULL;
} }
server = malloc(sizeof(HttpServer)); server = calloc(1, sizeof(HttpServer));
if (!server) if (!server)
{ {
return NULL; goto error;
}
server->threadPool = ArrayCreate();
if (!server->threadPool)
{
goto error;
} }
server->connQueue = QueueCreate(maxConnections); server->connQueue = QueueCreate(maxConnections);
if (!server->connQueue) if (!server->connQueue)
{ {
free(server); goto error;
return NULL; }
if (pthread_mutex_init(&server->connQueueMutex, NULL) != 0)
{
goto error;
} }
pthread_mutex_init(&server->connQueueMutex, NULL);
server->sd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0); server->sd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
if (server->sd < 0) if (server->sd < 0)
{ {
free(server); goto error;
return NULL;
} }
sa.sin_family = AF_INET; sa.sin_family = AF_INET;
@ -135,16 +147,12 @@ HttpServerCreate(unsigned short port, unsigned int nThreads, unsigned int maxCon
if (bind(server->sd, (struct sockaddr *) & sa, sizeof(sa)) < 0) if (bind(server->sd, (struct sockaddr *) & sa, sizeof(sa)) < 0)
{ {
close(server->sd); goto error;
free(server);
return NULL;
} }
if (listen(server->sd, maxConnections) < 0) if (listen(server->sd, maxConnections) < 0)
{ {
close(server->sd); goto error;
free(server);
return NULL;
} }
server->nThreads = nThreads; server->nThreads = nThreads;
@ -155,6 +163,33 @@ HttpServerCreate(unsigned short port, unsigned int nThreads, unsigned int maxCon
server->isRunning = 0; server->isRunning = 0;
return server; return server;
error:
if (server)
{
if (server->connQueue)
{
QueueFree(server->connQueue);
}
if (server->connQueueMutex)
{
pthread_mutex_destroy(&server->connQueueMutex);
}
if (server->threadPool)
{
ArrayFree(server->threadPool);
}
if (server->sd)
{
close(server->sd);
}
free(server);
}
return NULL;
} }
void void
@ -168,15 +203,46 @@ HttpServerFree(HttpServer * server)
close(server->sd); close(server->sd);
QueueFree(server->connQueue); QueueFree(server->connQueue);
pthread_mutex_destroy(&server->connQueueMutex); pthread_mutex_destroy(&server->connQueueMutex);
ArrayFree(server->threadPool);
free(server); free(server);
} }
static void *
HttpServerWorkerThread(void *args)
{
HttpServer *server = (HttpServer *) args;
while (!server->stop)
{
FILE *fp = DequeueConnection(server);
if (!fp)
{
/* Block for 1 millisecond before continuting so we don't
* murder the CPU */
UtilSleepMillis(1);
continue;
}
fprintf(fp, "HTTP/1.1 500 Internal Server Error\n");
fprintf(fp, "Server: Telodendria v" TELODENDRIA_VERSION "\n");
fprintf(fp, "Content-Type: application/json\n");
fprintf(fp, "\n");
fprintf(fp, "{}\n");
fclose(fp);
}
return NULL;
}
static void * static void *
HttpServerEventThread(void *args) HttpServerEventThread(void *args)
{ {
HttpServer *server = (HttpServer *) args; HttpServer *server = (HttpServer *) args;
struct pollfd pollFds[1]; struct pollfd pollFds[1];
FILE *fp; FILE *fp;
size_t i;
server->isRunning = 1; server->isRunning = 1;
server->stop = 0; server->stop = 0;
@ -184,6 +250,27 @@ HttpServerEventThread(void *args)
pollFds[0].fd = server->sd; pollFds[0].fd = server->sd;
pollFds[0].events = POLLIN; pollFds[0].events = POLLIN;
for (i = 0; i < server->nThreads; i++)
{
pthread_t *workerThread = malloc(sizeof(pthread_t));
if (!workerThread)
{
/* TODO: Make the event thread return an error to the main
* thread */
return NULL;
}
if (pthread_create(workerThread, NULL, HttpServerWorkerThread, server) != 0)
{
/* TODO: Make the event thread return an error to the main
* thread */
return NULL;
}
ArrayAdd(server->threadPool, workerThread);
}
while (!server->stop) while (!server->stop)
{ {
struct sockaddr_storage addr; struct sockaddr_storage addr;
@ -209,7 +296,13 @@ HttpServerEventThread(void *args)
QueueConnection(server, connFd); QueueConnection(server, connFd);
} }
/* Wait on all threads in the thread pool */ for (i = 0; i < server->nThreads; i++)
{
pthread_t *workerThread = ArrayGet(server->threadPool, i);
pthread_join(*workerThread, NULL);
free(workerThread);
}
while ((fp = DequeueConnection(server))) while ((fp = DequeueConnection(server)))
{ {