summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichiel Schuurmans <michielschuurmans@gmail.com>2020-12-11 22:12:50 +0100
committerMichiel Schuurmans <michielschuurmans@gmail.com>2020-12-11 22:12:50 +0100
commit6088667f673370d78040fc5c85aa919a8188e540 (patch)
treedd7765b250855ed7f4f9b17dbccf2a7d2c649ec8
parent0cd799afc2d12a955c1ebbd96a53649d7d4a688b (diff)
more work on queuesv2
Signed-off-by: Michiel Schuurmans <michielschuurmans@gmail.com>
-rw-r--r--main.c16
-rw-r--r--queue.c1
-rw-r--r--threads.c58
-rw-r--r--threads.h16
4 files changed, 85 insertions, 6 deletions
diff --git a/main.c b/main.c
index 4ad2b1d..b329b15 100644
--- a/main.c
+++ b/main.c
@@ -1,16 +1,30 @@
#include <unistd.h>
+#include <stdlib.h>
#include "log.h"
#include <stdbool.h>
#include "threads.h"
int main(int argc, char** argv)
{
+ log_set_level(LOG_DEBUG);
log_info("Started.");
bool spawn = true;
thread_pool_init(&spawn);
- sleep(10);
+ int count = 0;
+ for (;;) {
+ struct REQUEST *r = malloc(sizeof(struct REQUEST));
+
+ r->priority = 0;
+
+ log_trace("Enqueing request.");
+ request_enqueue(r);
+ sleep(1);
+
+ if (count++ > 20)
+ break;
+ }
thread_pool_stop();
diff --git a/queue.c b/queue.c
index 6d51072..fba2717 100644
--- a/queue.c
+++ b/queue.c
@@ -1,4 +1,5 @@
#include "queue.h"
+#include "log.h"
#include <talloc.h>
#include <stdbool.h>
diff --git a/threads.c b/threads.c
index 24f93ef..f420db4 100644
--- a/threads.c
+++ b/threads.c
@@ -8,12 +8,48 @@
static struct THREAD_POOL thread_pool;
static bool pool_initialized = false;
+int request_enqueue(struct REQUEST *request)
+{
+ if (!atomic_queue_push(thread_pool.queue[request->priority], request)) {
+ log_error("Failed inserting request into the queue");
+ return 0;
+ }
+
+ thread_pool.request_count++;
+
+ sem_post(&thread_pool.semaphore);
+
+ return 1;
+}
+
+static int request_dequeue(struct REQUEST **prerequest)
+{
+ int i;
+ struct REQUEST *request = NULL;
+
+ for (i = 0; i < NUM_FIFOS; i++) {
+ if (!atomic_queue_pop(thread_pool.queue[i], (void **) &request))
+ continue;
+ else
+ break;
+ }
+
+ if (!request)
+ return 0;
+
+ *prerequest = request;
+
+ CAS_INCR(thread_pool.active_threads);
+
+ return 1;
+}
+
void* thread_worker(void *arg)
{
struct THREAD_HANDLE *handle = (struct THREAD_HANDLE*)arg;
do {
- log_debug("Thread %d waiting to be assigned a request", handle->thread_num);
+ log_trace("Thread %d waiting to be assigned a request", handle->thread_num);
re_wait:
if (sem_wait(&thread_pool.semaphore) != 0) {
@@ -26,13 +62,25 @@ void* thread_worker(void *arg)
break;
}
- log_debug("Thread %d got semaphore", handle->thread_num);
+ log_trace("Thread %d got semaphore", handle->thread_num);
/*
* The server is exiting.
*/
if (thread_pool.stop_flag)
break;
+
+ if (!request_dequeue(&handle->request))
+ continue;
+
+ handle->request_count++;
+
+ log_debug("thread %d Handling request", handle->thread_num);
+
+ /* does this work?? */
+ free(handle->request);
+
+ handle->request = NULL;
} while(handle->status != THREAD_CANCELLED);
log_debug("Thread %d exiting...", handle->thread_num);
@@ -138,7 +186,7 @@ int thread_pool_init(bool *spawn_flag)
thread_pool.cleanup_delay = 5;
thread_pool.stop_flag = false;
thread_pool.spawn_flag = *spawn_flag;
- thread_pool.max_queue_size = 100;
+ thread_pool.max_queue_size = 10;
thread_pool.start_threads = 3;
thread_pool.max_threads = 5;
@@ -203,4 +251,8 @@ void thread_pool_stop(void)
pthread_join(handle->pthread_id, NULL);
thread_delete(handle);
}
+
+ for (i = 0; i < NUM_FIFOS; i++) {
+ talloc_free(thread_pool.queue[i]);
+ }
}
diff --git a/threads.h b/threads.h
index 741fdc7..fc232b6 100644
--- a/threads.h
+++ b/threads.h
@@ -15,8 +15,18 @@
#define NUM_FIFOS 5
+#define CAS_INCR(_x) do { uint32_t num; \
+ num = load(_x); \
+ if (cas_incr(_x, num)) break; \
+ } while (true)
+
+#define CAS_DECR(_x) do { uint32_t num; \
+ num = load(_x); \
+ if (cas_decr(_x, num)) break; \
+ } while (true)
struct REQUEST {
int id;
+ int priority;
};
struct THREAD_HANDLE {
@@ -64,7 +74,9 @@ struct THREAD_POOL {
struct atomic_queue_t *queue[NUM_FIFOS];
};
-int thread_pool_init(bool *spawn_flag);
-void thread_pool_stop(void);
+int thread_pool_init(bool *spawn_flag);
+void thread_pool_stop(void);
+int request_enqueue(struct REQUEST *request);
+static int request_dequeue(struct REQUEST **prerequest);
#endif