diff options
| author | Lu <[email protected]> | 2018-08-21 16:11:50 +0800 |
|---|---|---|
| committer | Lu <[email protected]> | 2018-08-21 16:11:50 +0800 |
| commit | 768235920afba76f01dea6a2194d67a35b51143c (patch) | |
| tree | 38f869c372d24f9a44efe9935781607b191c07a8 /platform/src/proxy.cpp | |
| parent | db055eeac8ed3671af4649a3afd4e825f44a49df (diff) | |
整理目录结构,调整框架部分实现,初步编译通过。
Diffstat (limited to 'platform/src/proxy.cpp')
| -rw-r--r-- | platform/src/proxy.cpp | 294 |
1 file changed, 294 insertions, 0 deletions
diff --git a/platform/src/proxy.cpp b/platform/src/proxy.cpp new file mode 100644 index 0000000..10e2a3f --- /dev/null +++ b/platform/src/proxy.cpp @@ -0,0 +1,294 @@ + +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <sys/un.h> +#include <assert.h> +#include <signal.h> +#include <stdlib.h> +#include <stdio.h> +#include <string.h> +#include <errno.h> +#include <pthread.h> + +#include <event2/event.h> +#include <event2/listener.h> +#include <event2/bufferevent.h> +#include <event2/bufferevent_ssl.h> +#include <event2/buffer.h> +#include <event2/thread.h> + +#include <MESA/MESA_handle_logger.h> +#include <tfe_utils.h> +#include <tfe_stream.h> +#include <stream.h> +#include <kni.h> +#include <sescache.h> + +/* + * Proxy engine, built around libevent 2.x. + */ + +#define TFE_BACKLOG_DEFAULT 20 + +const char * module_name_pxy = "TFE_PXY"; +extern struct tfe_instance * g_tfe_instance; + +__thread int __currect_thread_id; + +static void __dummy_event_handler(evutil_socket_t fd, short what, void * arg) +{ + return; +} + +/* + * Thread entry point; runs the event loop of the event base. + * Does not exit until the libevent loop is broken explicitly. 
+ */ +static void * __tfe_thrmgr_thread_entry(void * arg) +{ + struct tfe_thread_ctx * ctx = (struct tfe_thread_ctx *) arg; + struct timeval timer_delay = {60, 0}; + + struct event * ev; + ev = event_new(ctx->evbase, -1, EV_PERSIST, __dummy_event_handler, NULL); + + if (!ev) return (void *)NULL; + + evtimer_add(ev, &timer_delay); + ctx->running = 1; + + __currect_thread_id = ctx->thread_id; + event_base_dispatch(ctx->evbase); + event_free(ev); + + return (void *)NULL; +} + +static int signals[] = {SIGTERM, SIGQUIT, SIGHUP, SIGINT, SIGPIPE, SIGUSR1}; + +struct tfe_proxy +{ + char name[TFE_SYMBOL_MAX]; + struct event_base * evbase; + struct event * sev[sizeof(signals) / sizeof(int)]; + struct event * gcev; + + struct tfe_config * opts; + void * main_logger; + + struct sess_cache * dsess_cache; + struct sess_cache * ssess_cache; + + unsigned int nr_work_threads; + struct tfe_thread_ctx * work_threads; + + unsigned int nr_modules; + struct tfe_plugin * modules; + + void * io_mod; +}; + +/* + * Signal handler for SIGTERM, SIGQUIT, SIGINT, SIGHUP, SIGPIPE and SIGUSR1. 
+ */ +static void proxy_signal_cb(evutil_socket_t fd, short what, void * arg) +{ + tfe_proxy * ctx = (tfe_proxy *) arg; + switch (fd) + { + case SIGTERM: + case SIGQUIT: + case SIGINT: + case SIGHUP: + break; + case SIGUSR1: + break; + case SIGPIPE: + TFE_LOG_ERROR("Warning: Received SIGPIPE; ignoring.\n"); + break; + default: + TFE_LOG_ERROR("Warning: Received unexpected signal %i\n", fd); + break; + } +} + +static void proxy_gc_cb(evutil_socket_t fd, short what, void * arg) +{ + tfe_proxy * ctx = (tfe_proxy *) arg; + (void)fd; + (void)what; +} + +unsigned int select_work_thread(struct tfe_proxy * pxy) +{ + unsigned int min_thread_id = 0; + size_t min_load = pxy->work_threads[min_thread_id].load; + + for (unsigned thread_id = 1; thread_id < pxy->nr_work_threads; thread_id++) + { + if (min_load > pxy->work_threads[thread_id].load) + { + min_load = pxy->work_threads[thread_id].load; + min_thread_id = thread_id; + } + } + + pxy->work_threads[min_thread_id].load++; + return min_thread_id; +} +/* + * Callback for accept events on the socket listener bufferevent. + */ +static void io_mod_accept_cb(evutil_socket_t upstream_fd, evutil_socket_t downstream_fd, + enum tfe_session_proto session_type, void * arg) +{ + struct tfe_proxy * pxy = (struct tfe_proxy *) arg; + + unsigned int worker_tid = select_work_thread(pxy); + tfe_thread_ctx * worker_thread_ctx = &pxy->work_threads[worker_tid]; + + struct tfe_stream_private * stream = tfe_stream_create(upstream_fd, + downstream_fd, session_type, worker_thread_ctx); + + assert(stream != NULL); + return tfe_stream_setup(stream); +} + +/* + * Set up the core event loop. + * Socket clisock is the privsep client socket used for binding to ports. + * Returns ctx on success, or NULL on error. 
+ */ +struct tfe_proxy * tfe_proxy_new(tfe_config * cfg) +{ + struct tfe_proxy * proxy = ALLOC(struct tfe_proxy, 1); + assert(proxy != NULL); + + struct timeval gc_delay = {60, 0}; + + /* adds locking, only required if accessed from separate threads */ + evthread_use_pthreads(); + event_enable_debug_mode(); + + proxy->evbase = event_base_new(); + proxy->dsess_cache = session_cache_init(); + proxy->ssess_cache = session_cache_init(); + + proxy->nr_modules = 2; + proxy->modules = ALLOC(struct tfe_plugin, proxy->nr_modules); + + proxy->modules[0].proto = APP_PROTO_HTTP1; + proxy->modules[1].proto = APP_PROTO_HTTP2; + + proxy->work_threads = ALLOC(struct tfe_thread_ctx, proxy->nr_work_threads); + + for (unsigned int i = 0; i < proxy->nr_work_threads; i++) + { + proxy->work_threads[i].thread_id = i; + proxy->work_threads[i].evbase = event_base_new(); + proxy->work_threads[i].dsess_cache = proxy->dsess_cache; + proxy->work_threads[i].ssess_cache = proxy->ssess_cache; + proxy->work_threads[i].nr_modules = proxy->nr_modules; + proxy->work_threads[i].modules = proxy->modules; + } + + //Todo: Not handle signal if have mutliple proxy instance. 
+ for (size_t i = 0; i < (sizeof(signals) / sizeof(int)); i++) + { + proxy->sev[i] = evsignal_new(proxy->evbase, signals[i], proxy_signal_cb, proxy); + if (!proxy->sev[i]) goto error_out; + evsignal_add(proxy->sev[i], NULL); + } + + proxy->gcev = event_new(proxy->evbase, -1, EV_PERSIST, proxy_gc_cb, proxy); + if (!proxy->gcev) + goto error_out; + + evtimer_add(proxy->gcev, &gc_delay); + return proxy; + +error_out: + if (proxy->gcev) + { + event_free(proxy->gcev); + } + + for (size_t i = 0; i < (sizeof(proxy->sev) / sizeof(proxy->sev[0])); i++) + { + if (proxy->sev[i]) + { + event_free(proxy->sev[i]); + } + } + + for (typeof(proxy->nr_work_threads) i = 0; i < proxy->nr_work_threads; i++) + { + proxy->work_threads[i].thread_id = i; + event_base_free(proxy->work_threads[i].evbase); + } + + event_base_free(proxy->evbase); + + free(proxy); + return NULL; +} + +/* + * Run the event loop. Returns when the event loop is cancelled by a signal + * or on failure. + */ +void tfe_proxy_run(struct tfe_proxy * proxy) +{ + unsigned int thread_id; + for (thread_id = 0; thread_id < proxy->nr_work_threads; thread_id++) + { + if (pthread_create(&(proxy->work_threads[thread_id].thr), NULL, + __tfe_thrmgr_thread_entry, &(proxy->work_threads[thread_id]))) + { + MESA_handle_runtime_log(proxy->main_logger, RLOG_LV_FATAL, proxy->name, "pthread_create failed."); + } + + while (!proxy->work_threads[thread_id].running) + { + sched_yield(); + } + } + + event_base_dispatch(proxy->evbase); +} + +/* + * Break the loop of the proxy, causing the tfe_proxy_run to return. + */ +void proxy_loopbreak(tfe_proxy * ctx) +{ + event_base_loopbreak(ctx->evbase); +} + +/* + * Free the proxy data structures. + */ +void proxy_free(tfe_proxy * ctx) +{ +} + + +int main(int argc, char *argv[]) +{ + const char* main_profile="./conf/tfe_main.conf"; + + tfe_proxy *proxy=NULL; + void* wcfg_handle=NULL; + + //TODO: Initiate Local Cert Cache, Decryption Mirror, Field Stat, Logger and etc. 
+ //NOTICE: Maat, Cert Store,Tango Cache are initiated in bussiness plugin. + +#if 0 + g_tfe_instance=ALLOC(struct tfe_instance,1); + proxy=tfe_proxy_new(g_tfe_cfg); +#endif + + tfe_proxy_run(proxy); + proxy_free(proxy); +} |
