summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichiel Schuurmans <michielschuurmans@gmail.com>2020-12-11 20:42:18 +0100
committerMichiel Schuurmans <michielschuurmans@gmail.com>2020-12-11 20:42:18 +0100
commit0cd799afc2d12a955c1ebbd96a53649d7d4a688b (patch)
tree16d970bede7544298320b266e459f7a3c8d45d52
parent9a410f6fa56a2462c64b37c140c76fa9c59f019a (diff)
Add atomic fifo queue
Signed-off-by: Michiel Schuurmans <michielschuurmans@gmail.com>
-rw-r--r--.gitignore4
-rw-r--r--Makefile2
-rw-r--r--queue.c117
-rw-r--r--queue.h36
-rw-r--r--threads.c13
-rw-r--r--threads.h8
6 files changed, 172 insertions, 8 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..72da18f
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,4 @@
+*.o
+*.sw*
+threads-*
+*.log
diff --git a/Makefile b/Makefile
index 71de129..33e07e2 100644
--- a/Makefile
+++ b/Makefile
@@ -14,7 +14,7 @@ SOURCES=$(shell find . -type f -name "*.c")
OBJECTS=$(SOURCES:.c=.o)
CFLAGS = -g -Wall -Wextra
-CLIBS = -lpthread
+CLIBS = -lpthread -ltalloc
$(EXECUTABLE): $(OBJECTS)
$(CC) $(CFLAGS) -o $(EXECUTABLE) $(OBJECTS) $(CLIBS)
diff --git a/queue.c b/queue.c
new file mode 100644
index 0000000..6d51072
--- /dev/null
+++ b/queue.c
@@ -0,0 +1,117 @@
+#include "queue.h"
+#include <talloc.h>
+#include <stdbool.h>
+
+struct atomic_queue_t *atomic_queue_create(TALLOC_CTX *ctx, int size)
+{
+ int i;
+ int64_t seq;
+ struct atomic_queue_t *aq;
+
+ if (size <= 0)
+ return NULL;
+
+ aq = talloc_size(ctx, sizeof(*aq) + (size - 1) * sizeof(aq->entry[0]));
+
+ if (!aq)
+ return NULL;
+
+ talloc_set_name(aq, "atomic_queue_t");
+
+ for (i = 0; i < size; i++) {
+ seq = i;
+
+ aq->entry[i].data = NULL;
+ store(aq->entry[i].seq, seq);
+ }
+
+ aq->size = size;
+
+ store(aq->head, 0);
+ store(aq->tail, 0);
+ atomic_thread_fence(memory_order_seq_cst);
+
+ return aq;
+}
+
+bool atomic_queue_push(struct atomic_queue_t *aq, void *data)
+{
+ int64_t head;
+ struct atomic_queue_entry_t *entry;
+
+ if (!data)
+ return false;
+
+ head = load(aq->head);
+
+ for (;;) {
+ int64_t seq, diff;
+
+ entry = &aq->entry[ head % aq->size ];
+ seq = aquire(entry->seq);
+ diff = (seq - head);
+
+ /*
+ * Head is larger then current entry, queue is full
+ */
+ if (diff < 0) {
+ return false;
+ }
+
+
+ /*
+ * This entry is written to. Get new head pointer and continue
+ */
+ if (diff > 0) {
+ head = load(aq->head);
+ continue;
+ }
+
+ if (cas_incr(aq->head, head)) {
+ break;
+ }
+ }
+
+ entry->data = data;
+ store(entry->seq, head + 1);
+
+ return true;
+}
+
+bool atomic_queue_pop(struct atomic_queue_t *aq, void **p_data)
+{
+ int64_t tail, seq;
+ struct atomic_queue_entry_t *entry;
+
+ if (!p_data)
+ return false;
+
+ tail = load(aq->tail);
+
+ for (;;) {
+ int64_t diff;
+
+ entry = &aq->entry[ tail % aq->size ];
+ seq = aquire(entry->seq);
+
+ diff = (seq - (tail + 1));
+
+ if (diff < 0)
+ return false;
+
+ if (diff > 0) {
+ tail = load(aq->tail);
+ continue;
+ }
+
+ if (cas_incr(aq->tail, tail))
+ break;
+ }
+
+ *p_data = entry->data;
+
+ seq = tail + aq->size;
+ store(entry->seq, seq);
+
+ return true;
+}
diff --git a/queue.h b/queue.h
new file mode 100644
index 0000000..3a0e3a0
--- /dev/null
+++ b/queue.h
@@ -0,0 +1,36 @@
+#ifndef __QUEUE_H_
+#define __QUEUE_H_
+
+#include <stdalign.h>
+#include <stdint.h>
+#include <inttypes.h>
+#include <stdatomic.h>
+#include <talloc.h>
+#include <stdbool.h>
+
+#define atomic_int64_t _Atomic(int64_t)
+
+#define cas_incr(_store, _var) atomic_compare_exchange_strong_explicit(&_store, &_var, _var + 1, memory_order_release, memory_order_relaxed)
+#define cas_decr(_store, _var) atomic_compare_exchange_strong_explicit(&_store, &_var, _var - 1, memory_order_release, memory_order_relaxed)
+#define load(_var) atomic_load_explicit(&_var, memory_order_relaxed)
+#define aquire(_var) atomic_load_explicit(&_var, memory_order_acquire)
+#define store(_store, _var) atomic_store_explicit(&_store, _var, memory_order_release);
+
+struct atomic_queue_entry_t {
+ alignas(128) void *data;
+ atomic_int64_t seq;
+};
+
+struct atomic_queue_t {
+ alignas(128) atomic_int64_t head;
+ atomic_int64_t tail;
+ int size;
+
+ struct atomic_queue_entry_t entry[1];
+};
+
+struct atomic_queue_t* atomic_queue_create(TALLOC_CTX *ctx, int size);
+bool atomic_queue_push(struct atomic_queue_t *aq, void *data);
+bool atomic_queue_pop(struct atomic_queue_t *aq, void **p_data);
+
+#endif
diff --git a/threads.c b/threads.c
index 8161f6a..24f93ef 100644
--- a/threads.c
+++ b/threads.c
@@ -3,6 +3,7 @@
#include <semaphore.h>
#include <stdlib.h>
#include <errno.h>
+#include "queue.h"
static struct THREAD_POOL thread_pool;
static bool pool_initialized = false;
@@ -137,6 +138,7 @@ int thread_pool_init(bool *spawn_flag)
thread_pool.cleanup_delay = 5;
thread_pool.stop_flag = false;
thread_pool.spawn_flag = *spawn_flag;
+ thread_pool.max_queue_size = 100;
thread_pool.start_threads = 3;
thread_pool.max_threads = 5;
@@ -157,10 +159,13 @@ int thread_pool_init(bool *spawn_flag)
return -1;
}
- /*
- * TODO init request queue?
- */
-
+ for (i = 0; i < NUM_FIFOS; i++) {
+ thread_pool.queue[i] = atomic_queue_create(NULL, thread_pool.max_queue_size);
+ if (!thread_pool.queue[i]) {
+ log_error("FATAL: Failed to set up request fifo");
+ return -1;
+ }
+ }
for (i = 0; i < thread_pool.start_threads; i++) {
if (thread_spawn(now) == NULL) {
diff --git a/threads.h b/threads.h
index ee9079a..741fdc7 100644
--- a/threads.h
+++ b/threads.h
@@ -13,6 +13,8 @@
#define THREAD_CANCELLED (2)
#define THREAD_EXITED (3)
+#define NUM_FIFOS 5
+
struct REQUEST {
int id;
};
@@ -45,6 +47,7 @@ struct THREAD_POOL {
uint32_t cleanup_delay;
bool stop_flag;
bool spawn_flag;
+ uint32_t max_queue_size;
pthread_mutex_t wait_mutex;
/*
@@ -57,9 +60,8 @@ struct THREAD_POOL {
uint32_t active_threads; /* Protected by queue_mutex */
uint32_t exited_threads;
uint32_t num_queued;
- /*
- * TODO Add a queue
- */
+
+ struct atomic_queue_t *queue[NUM_FIFOS];
};
int thread_pool_init(bool *spawn_flag);