summaryrefslogtreecommitdiff
path: root/src/pxythrmgr.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/pxythrmgr.cc')
-rw-r--r--src/pxythrmgr.cc340
1 files changed, 144 insertions, 196 deletions
diff --git a/src/pxythrmgr.cc b/src/pxythrmgr.cc
index a670732..25aff4d 100644
--- a/src/pxythrmgr.cc
+++ b/src/pxythrmgr.cc
@@ -34,6 +34,11 @@
#include <string.h>
#include <pthread.h>
+#include <tuple>
+#include <vector>
+#include <thread>
+#include <mutex>
+
/*
* Proxy thread manager: manages the connection handling worker threads
* and the per-thread resources (i.e. event bases). The load is shared
@@ -43,195 +48,151 @@
* The attach and detach functions are thread-safe.
*/
-typedef struct pxy_thr_ctx {
- pthread_t thr;
- size_t load;
- struct event_base *evbase;
- struct evdns_base *dnsbase;
- int running;
-} pxy_thr_ctx_t;
-
-struct pxy_thrmgr_ctx {
- int num_thr;
- tfe_config *opts;
- pxy_thr_ctx_t **thr;
- pthread_mutex_t mutex;
+struct tfe_thread_ctx
+{
+ pthread_t thr;
+
+ int thread_id;
+ size_t load;
+ struct event_base *evbase;
+ struct evdns_base *dnsbase;
+ bool running;
+
+ tfe_thread_ctx(int thread_id, bool en_dns_base);
+ ~tfe_thread_ctx();
+};
+
+tfe_thread_ctx::tfe_thread_ctx(int thread_id, bool en_dns_base) : thread_id(thread_id), running(false)
+{
+ evbase = event_base_new();
+
+ if (evbase == nullptr)
+ {
+ throw std::runtime_error("Failed at create event_base on thread " + std::to_string(thread_id));
+ }
+
+ if (en_dns_base)
+ {
+ dnsbase = evdns_base_new(evbase, 1);
+ }
+
+ if (en_dns_base && dnsbase == nullptr)
+ {
+ throw std::runtime_error("Failed at create dnsbase on thread " + std::to_string(thread_id));
+ }
+}
+
+tfe_thread_ctx::~tfe_thread_ctx()
+{
+ if (dnsbase != nullptr) evdns_base_free(dnsbase, 0);
+ if (evbase != nullptr) event_base_free(evbase);
+}
+
+struct tfe_thread_manager_ctx
+{
+ tfe_config *opts{nullptr};
+ unsigned int nr_thread{0};
+
+ std::mutex lock;
+ std::vector<tfe_thread_ctx *> thr_ctx;
};
/*
* Dummy recurring timer event to prevent the event loops from exiting when
* they run out of events.
*/
-static void
-pxy_thrmgr_timer_cb(UNUSED evutil_socket_t fd, UNUSED short what,
- UNUSED void *arg)
+static void __dummy_event_handler(
+ UNUSED evutil_socket_t fd, UNUSED short what,
+ UNUSED void *arg)
{
- /* do nothing */
+ /* do nothing */
}
/*
* Thread entry point; runs the event loop of the event base.
* Does not exit until the libevent loop is broken explicitly.
*/
-static void *
-pxy_thrmgr_thr(void *arg)
+static void *__tfe_thrmgr_thread_entry(void *arg)
{
- pxy_thr_ctx_t *ctx = arg;
- struct timeval timer_delay = {60, 0};
- struct event *ev;
-
- ev = event_new(ctx->evbase, -1, EV_PERSIST, pxy_thrmgr_timer_cb, NULL);
- if (!ev)
- return NULL;
- evtimer_add(ev, &timer_delay);
- ctx->running = 1;
- event_base_dispatch(ctx->evbase);
- event_free(ev);
-
- return NULL;
+ struct tfe_thread_ctx *ctx = (struct tfe_thread_ctx *) arg;
+ struct timeval timer_delay = {60, 0};
+
+ struct event *ev;
+ ev = event_new(ctx->evbase, -1, EV_PERSIST, __dummy_event_handler, NULL);
+
+ if (!ev)
+ return NULL;
+
+ evtimer_add(ev, &timer_delay);
+ ctx->running = 1;
+ event_base_dispatch(ctx->evbase);
+ event_free(ev);
+
+ return NULL;
}
/*
* Create new thread manager but do not start any threads yet.
* This gets called before forking to background.
*/
-pxy_thrmgr_ctx_t *
-pxy_thrmgr_new(tfe_config *opts)
+struct tfe_thread_manager_ctx *tfe_thread_manager_new(tfe_config *opts)
{
- pxy_thrmgr_ctx_t *ctx;
-
- if (!(ctx = malloc(sizeof(pxy_thrmgr_ctx_t))))
- return NULL;
- memset(ctx, 0, sizeof(pxy_thrmgr_ctx_t));
+ struct tfe_thread_manager_ctx *ctx = new tfe_thread_manager_ctx;
- ctx->opts = opts;
- ctx->num_thr = 2 * sys_get_cpu_cores();
- return ctx;
+ //TODO: 用户配置线程数量
+ //TODO: 用户配置线程亲和性
+ ctx->opts = opts;
+ ctx->nr_thread = 2 * sys_get_cpu_cores();
+ return ctx;
}
-/*
- * Start the thread manager and associated threads.
- * This must be called after forking.
- *
- * Returns -1 on failure, 0 on success.
- */
-int
-pxy_thrmgr_run(pxy_thrmgr_ctx_t *ctx)
+int tfe_thread_manager_run(struct tfe_thread_manager_ctx *ctx)
{
- int idx = -1, dns = 0;
-
- dns = tfe_config_has_dns_spec(ctx->opts);
-
- if (pthread_mutex_init(&ctx->mutex, NULL)) {
- log_dbg_printf("Failed to initialize mutex\n");
- goto leave;
- }
-
- if (!(ctx->thr = malloc(ctx->num_thr * sizeof(pxy_thr_ctx_t*)))) {
- log_dbg_printf("Failed to allocate memory\n");
- goto leave;
- }
- memset(ctx->thr, 0, ctx->num_thr * sizeof(pxy_thr_ctx_t*));
-
- for (idx = 0; idx < ctx->num_thr; idx++) {
- if (!(ctx->thr[idx] = malloc(sizeof(pxy_thr_ctx_t)))) {
- log_dbg_printf("Failed to allocate memory\n");
- goto leave;
- }
- memset(ctx->thr[idx], 0, sizeof(pxy_thr_ctx_t));
- ctx->thr[idx]->evbase = event_base_new();
- if (!ctx->thr[idx]->evbase) {
- log_dbg_printf("Failed to create evbase %d\n", idx);
- goto leave;
- }
- if (dns) {
- /* only create dns base if we actually need it later */
- ctx->thr[idx]->dnsbase = evdns_base_new(
- ctx->thr[idx]->evbase, 1);
- if (!ctx->thr[idx]->dnsbase) {
- log_dbg_printf("Failed to create dnsbase %d\n",
- idx);
- goto leave;
- }
- }
- ctx->thr[idx]->load = 0;
- ctx->thr[idx]->running = 0;
- }
-
- log_dbg_printf("Initialized %d connection handling threads\n",
- ctx->num_thr);
-
- for (idx = 0; idx < ctx->num_thr; idx++) {
- if (pthread_create(&ctx->thr[idx]->thr, NULL,
- pxy_thrmgr_thr, ctx->thr[idx]))
- goto leave_thr;
- while (!ctx->thr[idx]->running) {
- sched_yield();
- }
- }
-
- log_dbg_printf("Started %d connection handling threads\n",
- ctx->num_thr);
-
- return 0;
-
-leave_thr:
- idx--;
- while (idx >= 0) {
- pthread_cancel(ctx->thr[idx]->thr);
- pthread_join(ctx->thr[idx]->thr, NULL);
- idx--;
- }
- idx = ctx->num_thr - 1;
-
-leave:
- while (idx >= 0) {
- if (ctx->thr[idx]) {
- if (ctx->thr[idx]->dnsbase) {
- evdns_base_free(ctx->thr[idx]->dnsbase, 0);
- }
- if (ctx->thr[idx]->evbase) {
- event_base_free(ctx->thr[idx]->evbase);
- }
- free(ctx->thr[idx]);
- }
- idx--;
- }
- pthread_mutex_destroy(&ctx->mutex);
- if (ctx->thr) {
- free(ctx->thr);
- ctx->thr = NULL;
- }
- return -1;
+ unsigned int thread_id;
+ bool dns = (bool)tfe_config_has_dns_spec(ctx->opts);
+
+ for (thread_id = 0; thread_id < ctx->nr_thread; thread_id++)
+ {
+ auto *__thread_ctx = new tfe_thread_ctx(thread_id, dns);
+ ctx->thr_ctx.push_back(__thread_ctx);
+ }
+
+ log_dbg_printf("Initialized %d connection handling threads\n", ctx->nr_thread);
+
+ for (thread_id = 0; thread_id < ctx->nr_thread; thread_id++)
+ {
+ if (pthread_create(&ctx->thr_ctx[thread_id]->thr, NULL,
+ __tfe_thrmgr_thread_entry, ctx->thr_ctx[thread_id]))
+ {
+ throw std::runtime_error("Failed at creating thread " + std::to_string(thread_id)
+ + " :" + std::string(strerror(errno)));
+ }
+
+ while (!ctx->thr_ctx[thread_id]->running)
+ {
+ sched_yield();
+ }
+ }
+
+ log_dbg_printf("Started %d connection handling threads\n", ctx->nr_thread);
+ return 0;
}
/*
* Destroy the event manager and stop all threads.
*/
-void
-pxy_thrmgr_free(pxy_thrmgr_ctx_t *ctx)
+void tfe_thread_manager_free(struct tfe_thread_manager_ctx *ctx)
{
- pthread_mutex_destroy(&ctx->mutex);
- if (ctx->thr) {
- for (int idx = 0; idx < ctx->num_thr; idx++) {
- event_base_loopbreak(ctx->thr[idx]->evbase);
- sched_yield();
- }
- for (int idx = 0; idx < ctx->num_thr; idx++) {
- pthread_join(ctx->thr[idx]->thr, NULL);
- }
- for (int idx = 0; idx < ctx->num_thr; idx++) {
- if (ctx->thr[idx]->dnsbase) {
- evdns_base_free(ctx->thr[idx]->dnsbase, 0);
- }
- if (ctx->thr[idx]->evbase) {
- event_base_free(ctx->thr[idx]->evbase);
- }
- free(ctx->thr[idx]);
- }
- free(ctx->thr);
- }
- free(ctx);
+ delete ctx;
+}
+
+void tfe_thread_manager_attach_by_thread_id(
+ tfe_thread_manager_ctx *ctx, int thread_id,
+ struct event_base **ev_base_out, struct evdns_base **evdns_base_out)
+{
+ std::lock_guard<decltype(ctx->lock)> __lock_guard(ctx->lock);
+ *ev_base_out = ctx->thr_ctx[thread_id]->evbase;
+ *evdns_base_out = ctx->thr_ctx[thread_id]->dnsbase;
}
/*
@@ -240,51 +201,38 @@ pxy_thrmgr_free(pxy_thrmgr_ctx_t *ctx)
* Returns the index of the chosen thread (for passing to _detach later).
* This function cannot fail.
*/
-int
-pxy_thrmgr_attach(pxy_thrmgr_ctx_t *ctx, struct event_base **evbase,
- struct evdns_base **dnsbase)
+int tfe_thread_manager_attach(tfe_thread_manager_ctx *ctx,
+ struct event_base **evbase, struct evdns_base **dnsbase)
{
- int thridx;
- size_t minload;
-
- thridx = 0;
- pthread_mutex_lock(&ctx->mutex);
- minload = ctx->thr[thridx]->load;
-#ifdef DEBUG_THREAD
- log_dbg_printf("===> Proxy connection handler thread status:\n"
- "thr[%d]: %zu\n", thridx, minload);
-#endif /* DEBUG_THREAD */
- for (int idx = 1; idx < ctx->num_thr; idx++) {
-#ifdef DEBUG_THREAD
- log_dbg_printf("thr[%d]: %zu\n", idx, ctx->thr[idx]->load);
-#endif /* DEBUG_THREAD */
- if (minload > ctx->thr[idx]->load) {
- minload = ctx->thr[idx]->load;
- thridx = idx;
- }
- }
- *evbase = ctx->thr[thridx]->evbase;
- *dnsbase = ctx->thr[thridx]->dnsbase;
- ctx->thr[thridx]->load++;
- pthread_mutex_unlock(&ctx->mutex);
-
-#ifdef DEBUG_THREAD
- log_dbg_printf("thridx: %d\n", thridx);
-#endif /* DEBUG_THREAD */
-
- return thridx;
+ std::lock_guard<decltype(ctx->lock)> __lock_guard(ctx->lock);
+
+ int min_thread_id = 0;
+ size_t min_load = ctx->thr_ctx[min_thread_id]->load;
+
+ for (unsigned thread_id = 1; thread_id < ctx->nr_thread; thread_id++)
+ {
+ if (min_load > ctx->thr_ctx[min_thread_id]->load)
+ {
+ min_load = ctx->thr_ctx[thread_id]->load;
+ min_thread_id = thread_id;
+ }
+ }
+
+ *evbase = ctx->thr_ctx[min_thread_id]->evbase;
+ *dnsbase = ctx->thr_ctx[min_thread_id]->dnsbase;
+
+ ctx->thr_ctx[min_thread_id]->load++;
+ return min_thread_id;
}
/*
* Detach a connection from a thread by index.
* This function cannot fail.
*/
-void
-pxy_thrmgr_detach(pxy_thrmgr_ctx_t *ctx, int thridx)
+void tfe_thread_manager_detach(tfe_thread_manager_ctx *ctx, int thread_id)
{
- pthread_mutex_lock(&ctx->mutex);
- ctx->thr[thridx]->load--;
- pthread_mutex_unlock(&ctx->mutex);
+ std::lock_guard<decltype(ctx->lock)> __lock_guard(ctx->lock);
+ ctx->thr_ctx[thread_id]->load--;
}
-/* vim: set noet ft=c: */
+/* vim: set noet ft=c: */ \ No newline at end of file