summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichiel Schuurmans <michielschuurmans@gmail.com>2020-12-05 17:10:22 +0100
committerMichiel Schuurmans <michielschuurmans@gmail.com>2020-12-05 17:10:22 +0100
commit9a410f6fa56a2462c64b37c140c76fa9c59f019a (patch)
tree2ddf1ff7aa3e403d8a382cd68f231605169d0b17
Base
Signed-off-by: Michiel Schuurmans <michielschuurmans@gmail.com>
-rw-r--r--Makefile24
-rw-r--r--log.c96
-rw-r--r--log.h27
-rw-r--r--main.c18
-rw-r--r--threads.c201
-rw-r--r--threads.h68
6 files changed, 434 insertions, 0 deletions
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..71de129
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,24 @@
+NAME = threads
+VERSION = 0
+PATCHLEVEL = 1
+SUBLEVEL =
+EXTRAVERSION =
+
+FULLVERSION = $(VERSION)$(if $(PATCHLEVEL),.$(PATCHLEVEL)$(if $(SUBLEVEL),.$(SUBLEVEL)))$(EXTRAVERSION)
+
+EXECUTABLE = $(NAME)-$(FULLVERSION)
+
+all: $(EXECUTABLE)
+
+SOURCES=$(shell find . -type f -name "*.c")
+OBJECTS=$(SOURCES:.c=.o)
+
+CFLAGS = -g -Wall -Wextra
+CLIBS = -lpthread
+
+$(EXECUTABLE): $(OBJECTS)
+ $(CC) $(CFLAGS) -o $(EXECUTABLE) $(OBJECTS) $(CLIBS)
+
+clean:
+ $(RM) $(OBJECTS) $(NAME)-*
+
diff --git a/log.c b/log.c
new file mode 100644
index 0000000..fbc318c
--- /dev/null
+++ b/log.c
@@ -0,0 +1,96 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdarg.h>
+#include <string.h>
+#include <time.h>
+#include <pthread.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include "log.h"
+
+static struct {
+ pthread_mutex_t *lock;
+ FILE *fp;
+ int level;
+ int quiet;
+} _l;
+
+static const char *_level_names[] = {
+ "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL"
+};
+
+static const char *_level_colors[] = {
+ "\x1b[94m", "\x1b[36m", "\x1b[32m", "\x1b[33m", "\x1b[31m", "\x1b[35m"
+};
+
+static void lock(void)
+{
+ if (!_l.lock) {
+ _l.lock = malloc(sizeof(pthread_mutex_t));
+ pthread_mutex_init(_l.lock, NULL);
+ }
+
+ pthread_mutex_lock(_l.lock);
+}
+
+static void unlock(void)
+{
+ pthread_mutex_unlock(_l.lock);
+}
+
+void log_set_fp(FILE *fp)
+{
+ _l.fp = fp;
+}
+
+void log_set_level(int level)
+{
+ _l.level = level;
+}
+
+void log_set_quiet(int enable)
+{
+ _l.quiet = enable ? 1 : 0;
+}
+
+void log_write(int level, const char *file, const char *fmt, ...)
+{
+ if (level < _l.level)
+ return;
+
+ lock();
+
+ time_t t = time(NULL);
+ struct tm *lt = localtime(&t);
+
+ if (!_l.quiet) {
+ va_list args;
+ char buf[16];
+ buf[strftime(buf, sizeof(buf), "%H:%M:%S", lt)] = '\0';
+
+ fprintf(stderr, "%s %s%-5s\x1b[0m \x1b[90m%d %s:\x1b[0m ", buf,
+ _level_colors[level],
+ _level_names[level], getpid(), file);
+
+ va_start(args, fmt);
+ vfprintf(stderr, fmt, args);
+ va_end(args);
+ fprintf(stderr, "\n");
+ fflush(stderr);
+ }
+
+ if (_l.fp) {
+ va_list args;
+ char buf[32];
+ buf[strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", lt)] = '\0';
+ fprintf(_l.fp, "%s %-5s %d %s: ", buf, _level_names[level], getpid(), file);
+ va_start(args, fmt);
+ vfprintf(_l.fp, fmt, args);
+ va_end(args);
+ fprintf(_l.fp, "\n");
+ fflush(_l.fp);
+ }
+
+ unlock();
+}
diff --git a/log.h b/log.h
new file mode 100644
index 0000000..021e802
--- /dev/null
+++ b/log.h
@@ -0,0 +1,27 @@
+#ifndef __LOG_H_
+#define __LOG_H_
+
+#include <stdio.h>
+#include <stdarg.h>
+#include <string.h>
+
+enum { LOG_TRACE, LOG_DEBUG, LOG_INFO, LOG_WARN, LOG_ERROR, LOG_FATAL };
+
+#define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
+
+typedef void (*log_lock_function)(void*, int);
+
+#define log_trace(...) log_write(LOG_TRACE, __FILENAME__, __VA_ARGS__)
+#define log_debug(...) log_write(LOG_DEBUG, __FILENAME__, __VA_ARGS__)
+#define log_info(...) log_write(LOG_INFO, __FILENAME__, __VA_ARGS__)
+#define log_warn(...) log_write(LOG_WARN, __FILENAME__, __VA_ARGS__)
+#define log_error(...) log_write(LOG_ERROR, __FILENAME__, __VA_ARGS__)
+#define log_fatal(...) log_write(LOG_FATAL, __FILENAME__, __VA_ARGS__)
+
+void log_set_fp(FILE *fp);
+void log_set_level(int level);
+void log_set_quiet(int enable);
+
+void log_write(int level, const char *file, const char *fmt, ...);
+
+#endif
diff --git a/main.c b/main.c
new file mode 100644
index 0000000..4ad2b1d
--- /dev/null
+++ b/main.c
@@ -0,0 +1,18 @@
+#include <unistd.h>
+#include "log.h"
+#include <stdbool.h>
+#include "threads.h"
+
+int main(int argc, char** argv)
+{
+ log_info("Started.");
+
+ bool spawn = true;
+ thread_pool_init(&spawn);
+
+ sleep(10);
+
+ thread_pool_stop();
+
+ return 0;
+}
diff --git a/threads.c b/threads.c
new file mode 100644
index 0000000..8161f6a
--- /dev/null
+++ b/threads.c
@@ -0,0 +1,201 @@
+#include "threads.h"
+#include "log.h"
+#include <semaphore.h>
+#include <stdlib.h>
+#include <errno.h>
+
+static struct THREAD_POOL thread_pool;
+static bool pool_initialized = false;
+
+void* thread_worker(void *arg)
+{
+ struct THREAD_HANDLE *handle = (struct THREAD_HANDLE*)arg;
+
+ do {
+ log_debug("Thread %d waiting to be assigned a request", handle->thread_num);
+
+ re_wait:
+ if (sem_wait(&thread_pool.semaphore) != 0) {
+ if ((errno == EINTR || errno == EAGAIN)) {
+ log_debug("Re-wait %d", handle->thread_num);
+ goto re_wait;
+ }
+
+ log_error("Thread %d failed waiting for semaphore: %s: Exiting", handle->thread_num);
+ break;
+ }
+
+ log_debug("Thread %d got semaphore", handle->thread_num);
+
+ /*
+ * The server is exiting.
+ */
+ if (thread_pool.stop_flag)
+ break;
+ } while(handle->status != THREAD_CANCELLED);
+
+ log_debug("Thread %d exiting...", handle->thread_num);
+
+ pthread_mutex_lock(&thread_pool.queue_mutex);
+ thread_pool.exited_threads++;
+ pthread_mutex_unlock(&thread_pool.queue_mutex);
+
+ handle->request = NULL;
+ handle->status = THREAD_EXITED;
+
+ return NULL;
+}
+
+/*
+ * Spawns a new thread and adds it in the thread pool.
+ */
+static struct THREAD_HANDLE *thread_spawn(time_t now)
+{
+ struct THREAD_HANDLE *handle;
+ int rcode;
+
+ if (thread_pool.total_threads >= thread_pool.max_threads) {
+ log_debug("Failed to spawn thread. Maximum number of threads (%d) already running.", thread_pool.max_threads);
+ return NULL;
+ }
+
+ handle = malloc(sizeof(struct THREAD_HANDLE));
+ memset(handle, 0, sizeof(struct THREAD_HANDLE));
+ handle->prev = NULL;
+ handle->next = NULL;
+ handle->thread_num = thread_pool.max_thread_num++;
+ handle->request_count = 0;
+ handle->status = THREAD_RUNNING;
+ handle->timestamp = time(NULL);
+
+ rcode = pthread_create(&handle->pthread_id, 0, thread_worker, handle);
+ if (rcode != 0) {
+ free(handle);
+ log_error("Thread create failed!");
+ return NULL;
+ }
+
+ thread_pool.total_threads++;
+ log_debug("Thread spawned new child %d, Total threads in pool: %d",
+ handle->thread_num, thread_pool.total_threads);
+
+ /*
+ * Add thread to tail of pool.
+ */
+ if (thread_pool.tail) {
+ thread_pool.tail->next = handle;
+ handle->prev = thread_pool.tail;
+ thread_pool.tail = handle;
+ } else {
+ thread_pool.head = thread_pool.tail = handle;
+ }
+
+ thread_pool.time_last_spawned = now;
+
+ return handle;
+}
+
+static void thread_delete(struct THREAD_HANDLE *handle)
+{
+ struct THREAD_HANDLE *prev;
+ struct THREAD_HANDLE *next;
+
+ log_debug("Deleting thread %d", handle->thread_num);
+
+ prev = handle->prev;
+ next = handle->next;
+
+ thread_pool.total_threads--;
+
+ if (prev == NULL)
+ thread_pool.head = next;
+ else
+ prev->next = next;
+
+
+ if (next == NULL)
+ thread_pool.tail = prev;
+ else
+ next->prev = prev;
+
+ free(handle);
+}
+
+int thread_pool_init(bool *spawn_flag)
+{
+ uint32_t i;
+ int rcode;
+ time_t now;
+
+ now = time(NULL);
+
+ memset(&thread_pool, 0, sizeof(struct THREAD_POOL));
+ thread_pool.head = NULL;
+ thread_pool.tail = NULL;
+ thread_pool.total_threads = 0;
+ thread_pool.max_thread_num = 1;
+ thread_pool.cleanup_delay = 5;
+ thread_pool.stop_flag = false;
+ thread_pool.spawn_flag = *spawn_flag;
+
+ thread_pool.start_threads = 3;
+ thread_pool.max_threads = 5;
+
+ if (!*spawn_flag)
+ return 0;
+
+ if ((pthread_mutex_init(&thread_pool.wait_mutex, NULL) != 0)) {
+ log_error("FATAL: Failed to initialize wait mutex");
+
+ return -1;
+ }
+
+ memset(&thread_pool.semaphore, 0, sizeof(thread_pool.semaphore));
+ rcode = sem_init(&thread_pool.semaphore, 0, SEMAPHORE_LOCKED);
+ if (rcode != 0) {
+ log_error("FATAL: Failed to initialize semaphore");
+ return -1;
+ }
+
+ /*
+ * TODO init request queue?
+ */
+
+
+ for (i = 0; i < thread_pool.start_threads; i++) {
+ if (thread_spawn(now) == NULL) {
+ return -1;
+ }
+ }
+
+ log_debug("Thread pool initialized");
+ pool_initialized = true;
+ return 0;
+}
+
+void thread_pool_stop(void)
+{
+ int total_threads;
+ int i;
+ struct THREAD_HANDLE *handle;
+ struct THREAD_HANDLE *next;
+
+ if (!pool_initialized)
+ return;
+
+ thread_pool.stop_flag = true;
+
+ /*
+ * Wakeup all thread to make them see stop flag.
+ */
+ total_threads = thread_pool.total_threads;
+ for (i = 0; i != total_threads; i++) {
+ sem_post(&thread_pool.semaphore);
+ }
+
+ for (handle = thread_pool.head; handle; handle = next) {
+ next = handle->next;
+ pthread_join(handle->pthread_id, NULL);
+ thread_delete(handle);
+ }
+}
diff --git a/threads.h b/threads.h
new file mode 100644
index 0000000..ee9079a
--- /dev/null
+++ b/threads.h
@@ -0,0 +1,68 @@
+#ifndef __THREADS_H_
+#define __THREADS_H_
+
+#include <pthread.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <stdbool.h>
+#include <semaphore.h>
+
+#define SEMAPHORE_LOCKED (0)
+
+#define THREAD_RUNNING (1)
+#define THREAD_CANCELLED (2)
+#define THREAD_EXITED (3)
+
+struct REQUEST {
+ int id;
+};
+
+struct THREAD_HANDLE {
+ struct THREAD_HANDLE *prev;
+ struct THREAD_HANDLE *next;
+ pthread_t pthread_id;
+ int thread_num;
+ int status;
+ unsigned int request_count;
+ time_t timestamp;
+ struct REQUEST *request;
+};
+
+struct THREAD_POOL {
+ struct THREAD_HANDLE *head;
+ struct THREAD_HANDLE *tail;
+
+ uint32_t total_threads;
+
+ uint32_t max_thread_num;
+ uint32_t start_threads;
+ uint32_t max_threads;
+ uint32_t min_spare_threads;
+ uint32_t max_spare_threads;
+ uint32_t max_request_per_thread;
+ uint32_t request_count;
+ time_t time_last_spawned;
+ uint32_t cleanup_delay;
+ bool stop_flag;
+ bool spawn_flag;
+
+ pthread_mutex_t wait_mutex;
+ /*
+ * Threads wait for this semaphore for requests
+ * to enter the queue
+ */
+ sem_t semaphore;
+
+ pthread_mutex_t queue_mutex;
+ uint32_t active_threads; /* Protected by queue_mutex */
+ uint32_t exited_threads;
+ uint32_t num_queued;
+ /*
+ * TODO Add a queue
+ */
+};
+
+int thread_pool_init(bool *spawn_flag);
+void thread_pool_stop(void);
+
+#endif