diff options
Diffstat (limited to 'src/pxythrmgr.cc')
| -rw-r--r-- | src/pxythrmgr.cc | 340 |
1 files changed, 144 insertions, 196 deletions
diff --git a/src/pxythrmgr.cc b/src/pxythrmgr.cc index a670732..25aff4d 100644 --- a/src/pxythrmgr.cc +++ b/src/pxythrmgr.cc @@ -34,6 +34,11 @@ #include <string.h> #include <pthread.h> +#include <tuple> +#include <vector> +#include <thread> +#include <mutex> + /* * Proxy thread manager: manages the connection handling worker threads * and the per-thread resources (i.e. event bases). The load is shared @@ -43,195 +48,151 @@ * The attach and detach functions are thread-safe. */ -typedef struct pxy_thr_ctx { - pthread_t thr; - size_t load; - struct event_base *evbase; - struct evdns_base *dnsbase; - int running; -} pxy_thr_ctx_t; - -struct pxy_thrmgr_ctx { - int num_thr; - tfe_config *opts; - pxy_thr_ctx_t **thr; - pthread_mutex_t mutex; +struct tfe_thread_ctx +{ + pthread_t thr; + + int thread_id; + size_t load; + struct event_base *evbase; + struct evdns_base *dnsbase; + bool running; + + tfe_thread_ctx(int thread_id, bool en_dns_base); + ~tfe_thread_ctx(); +}; + +tfe_thread_ctx::tfe_thread_ctx(int thread_id, bool en_dns_base) : thread_id(thread_id), running(false) +{ + evbase = event_base_new(); + + if (evbase == nullptr) + { + throw std::runtime_error("Failed at create event_base on thread " + std::to_string(thread_id)); + } + + if (en_dns_base) + { + dnsbase = evdns_base_new(evbase, 1); + } + + if (en_dns_base && dnsbase == nullptr) + { + throw std::runtime_error("Failed at create dnsbase on thread " + std::to_string(thread_id)); + } +} + +tfe_thread_ctx::~tfe_thread_ctx() +{ + if (dnsbase != nullptr) evdns_base_free(dnsbase, 0); + if (evbase != nullptr) event_base_free(evbase); +} + +struct tfe_thread_manager_ctx +{ + tfe_config *opts{nullptr}; + unsigned int nr_thread{0}; + + std::mutex lock; + std::vector<tfe_thread_ctx *> thr_ctx; }; /* * Dummy recurring timer event to prevent the event loops from exiting when * they run out of events. */ -static void -pxy_thrmgr_timer_cb(UNUSED evutil_socket_t fd, UNUSED short what, - UNUSED void *arg) +static void __dummy_event_handler( + UNUSED evutil_socket_t fd, UNUSED short what, + UNUSED void *arg) { - /* do nothing */ + /* do nothing */ } /* * Thread entry point; runs the event loop of the event base. * Does not exit until the libevent loop is broken explicitly. */ -static void * -pxy_thrmgr_thr(void *arg) +static void *__tfe_thrmgr_thread_entry(void *arg) { - pxy_thr_ctx_t *ctx = arg; - struct timeval timer_delay = {60, 0}; - struct event *ev; - - ev = event_new(ctx->evbase, -1, EV_PERSIST, pxy_thrmgr_timer_cb, NULL); - if (!ev) - return NULL; - evtimer_add(ev, &timer_delay); - ctx->running = 1; - event_base_dispatch(ctx->evbase); - event_free(ev); - - return NULL; + struct tfe_thread_ctx *ctx = (struct tfe_thread_ctx *) arg; + struct timeval timer_delay = {60, 0}; + + struct event *ev; + ev = event_new(ctx->evbase, -1, EV_PERSIST, __dummy_event_handler, NULL); + + if (!ev) + return NULL; + + evtimer_add(ev, &timer_delay); + ctx->running = 1; + event_base_dispatch(ctx->evbase); + event_free(ev); + + return NULL; } /* * Create new thread manager but do not start any threads yet. * This gets called before forking to background. */ -pxy_thrmgr_ctx_t * -pxy_thrmgr_new(tfe_config *opts) +struct tfe_thread_manager_ctx *tfe_thread_manager_new(tfe_config *opts) { - pxy_thrmgr_ctx_t *ctx; - - if (!(ctx = malloc(sizeof(pxy_thrmgr_ctx_t)))) - return NULL; - memset(ctx, 0, sizeof(pxy_thrmgr_ctx_t)); + struct tfe_thread_manager_ctx *ctx = new tfe_thread_manager_ctx; - ctx->opts = opts; - ctx->num_thr = 2 * sys_get_cpu_cores(); - return ctx; + //TODO: 用户配置线程数量 + //TODO: 用户配置线程亲和性 + ctx->opts = opts; + ctx->nr_thread = 2 * sys_get_cpu_cores(); + return ctx; } -/* - * Start the thread manager and associated threads. - * This must be called after forking. - * - * Returns -1 on failure, 0 on success. - */ -int -pxy_thrmgr_run(pxy_thrmgr_ctx_t *ctx) +int tfe_thread_manager_run(struct tfe_thread_manager_ctx *ctx) { - int idx = -1, dns = 0; - - dns = tfe_config_has_dns_spec(ctx->opts); - - if (pthread_mutex_init(&ctx->mutex, NULL)) { - log_dbg_printf("Failed to initialize mutex\n"); - goto leave; - } - - if (!(ctx->thr = malloc(ctx->num_thr * sizeof(pxy_thr_ctx_t*)))) { - log_dbg_printf("Failed to allocate memory\n"); - goto leave; - } - memset(ctx->thr, 0, ctx->num_thr * sizeof(pxy_thr_ctx_t*)); - - for (idx = 0; idx < ctx->num_thr; idx++) { - if (!(ctx->thr[idx] = malloc(sizeof(pxy_thr_ctx_t)))) { - log_dbg_printf("Failed to allocate memory\n"); - goto leave; - } - memset(ctx->thr[idx], 0, sizeof(pxy_thr_ctx_t)); - ctx->thr[idx]->evbase = event_base_new(); - if (!ctx->thr[idx]->evbase) { - log_dbg_printf("Failed to create evbase %d\n", idx); - goto leave; - } - if (dns) { - /* only create dns base if we actually need it later */ - ctx->thr[idx]->dnsbase = evdns_base_new( - ctx->thr[idx]->evbase, 1); - if (!ctx->thr[idx]->dnsbase) { - log_dbg_printf("Failed to create dnsbase %d\n", - idx); - goto leave; - } - } - ctx->thr[idx]->load = 0; - ctx->thr[idx]->running = 0; - } - - log_dbg_printf("Initialized %d connection handling threads\n", - ctx->num_thr); - - for (idx = 0; idx < ctx->num_thr; idx++) { - if (pthread_create(&ctx->thr[idx]->thr, NULL, - pxy_thrmgr_thr, ctx->thr[idx])) - goto leave_thr; - while (!ctx->thr[idx]->running) { - sched_yield(); - } - } - - log_dbg_printf("Started %d connection handling threads\n", - ctx->num_thr); - - return 0; - -leave_thr: - idx--; - while (idx >= 0) { - pthread_cancel(ctx->thr[idx]->thr); - pthread_join(ctx->thr[idx]->thr, NULL); - idx--; - } - idx = ctx->num_thr - 1; - -leave: - while (idx >= 0) { - if (ctx->thr[idx]) { - if (ctx->thr[idx]->dnsbase) { - evdns_base_free(ctx->thr[idx]->dnsbase, 0); - } - if (ctx->thr[idx]->evbase) { - event_base_free(ctx->thr[idx]->evbase); - } - free(ctx->thr[idx]); - } - idx--; - } - pthread_mutex_destroy(&ctx->mutex); - if (ctx->thr) { - free(ctx->thr); - ctx->thr = NULL; - } - return -1; + unsigned int thread_id; + bool dns = (bool)tfe_config_has_dns_spec(ctx->opts); + + for (thread_id = 0; thread_id < ctx->nr_thread; thread_id++) + { + auto *__thread_ctx = new tfe_thread_ctx(thread_id, dns); + ctx->thr_ctx.push_back(__thread_ctx); + } + + log_dbg_printf("Initialized %d connection handling threads\n", ctx->nr_thread); + + for (thread_id = 0; thread_id < ctx->nr_thread; thread_id++) + { + if (pthread_create(&ctx->thr_ctx[thread_id]->thr, NULL, + __tfe_thrmgr_thread_entry, ctx->thr_ctx[thread_id])) + { + throw std::runtime_error("Failed at creating thread " + std::to_string(thread_id) + + " :" + std::string(strerror(errno))); + } + + while (!ctx->thr_ctx[thread_id]->running) + { + sched_yield(); + } + } + + log_dbg_printf("Started %d connection handling threads\n", ctx->nr_thread); + return 0; } /* * Destroy the event manager and stop all threads. */ -void -pxy_thrmgr_free(pxy_thrmgr_ctx_t *ctx) +void tfe_thread_manager_free(struct tfe_thread_manager_ctx *ctx) { - pthread_mutex_destroy(&ctx->mutex); - if (ctx->thr) { - for (int idx = 0; idx < ctx->num_thr; idx++) { - event_base_loopbreak(ctx->thr[idx]->evbase); - sched_yield(); - } - for (int idx = 0; idx < ctx->num_thr; idx++) { - pthread_join(ctx->thr[idx]->thr, NULL); - } - for (int idx = 0; idx < ctx->num_thr; idx++) { - if (ctx->thr[idx]->dnsbase) { - evdns_base_free(ctx->thr[idx]->dnsbase, 0); - } - if (ctx->thr[idx]->evbase) { - event_base_free(ctx->thr[idx]->evbase); - } - free(ctx->thr[idx]); - } - free(ctx->thr); - } - free(ctx); + delete ctx; +} + +void tfe_thread_manager_attach_by_thread_id( + tfe_thread_manager_ctx *ctx, int thread_id, + struct event_base **ev_base_out, struct evdns_base **evdns_base_out) +{ + std::lock_guard<decltype(ctx->lock)> __lock_guard(ctx->lock); + *ev_base_out = ctx->thr_ctx[thread_id]->evbase; + *evdns_base_out = ctx->thr_ctx[thread_id]->dnsbase; } /* @@ -240,51 +201,38 @@ pxy_thrmgr_free(pxy_thrmgr_ctx_t *ctx) * Returns the index of the chosen thread (for passing to _detach later). * This function cannot fail. */ -int -pxy_thrmgr_attach(pxy_thrmgr_ctx_t *ctx, struct event_base **evbase, - struct evdns_base **dnsbase) +int tfe_thread_manager_attach(tfe_thread_manager_ctx *ctx, + struct event_base **evbase, struct evdns_base **dnsbase) { - int thridx; - size_t minload; - - thridx = 0; - pthread_mutex_lock(&ctx->mutex); - minload = ctx->thr[thridx]->load; -#ifdef DEBUG_THREAD - log_dbg_printf("===> Proxy connection handler thread status:\n" - "thr[%d]: %zu\n", thridx, minload); -#endif /* DEBUG_THREAD */ - for (int idx = 1; idx < ctx->num_thr; idx++) { -#ifdef DEBUG_THREAD - log_dbg_printf("thr[%d]: %zu\n", idx, ctx->thr[idx]->load); -#endif /* DEBUG_THREAD */ - if (minload > ctx->thr[idx]->load) { - minload = ctx->thr[idx]->load; - thridx = idx; - } - } - *evbase = ctx->thr[thridx]->evbase; - *dnsbase = ctx->thr[thridx]->dnsbase; - ctx->thr[thridx]->load++; - pthread_mutex_unlock(&ctx->mutex); - -#ifdef DEBUG_THREAD - log_dbg_printf("thridx: %d\n", thridx); -#endif /* DEBUG_THREAD */ - - return thridx; + std::lock_guard<decltype(ctx->lock)> __lock_guard(ctx->lock); + + int min_thread_id = 0; + size_t min_load = ctx->thr_ctx[min_thread_id]->load; + + for (unsigned thread_id = 1; thread_id < ctx->nr_thread; thread_id++) + { + if (min_load > ctx->thr_ctx[min_thread_id]->load) + { + min_load = ctx->thr_ctx[thread_id]->load; + min_thread_id = thread_id; + } + } + + *evbase = ctx->thr_ctx[min_thread_id]->evbase; + *dnsbase = ctx->thr_ctx[min_thread_id]->dnsbase; + + ctx->thr_ctx[min_thread_id]->load++; + return min_thread_id; } /* * Detach a connection from a thread by index. * This function cannot fail. */ -void -pxy_thrmgr_detach(pxy_thrmgr_ctx_t *ctx, int thridx) +void tfe_thread_manager_detach(tfe_thread_manager_ctx *ctx, int thread_id) { - pthread_mutex_lock(&ctx->mutex); - ctx->thr[thridx]->load--; - pthread_mutex_unlock(&ctx->mutex); + std::lock_guard<decltype(ctx->lock)> __lock_guard(ctx->lock); + ctx->thr_ctx[thread_id]->load--; } -/* vim: set noet ft=c: */ +/* vim: set noet ft=c: */
\ No newline at end of file |
