summaryrefslogtreecommitdiff
path: root/platform/src/proxy.cpp
diff options
context:
space:
mode:
authorLu <[email protected]>2018-08-21 16:11:50 +0800
committerLu <[email protected]>2018-08-21 16:11:50 +0800
commit768235920afba76f01dea6a2194d67a35b51143c (patch)
tree38f869c372d24f9a44efe9935781607b191c07a8 /platform/src/proxy.cpp
parentdb055eeac8ed3671af4649a3afd4e825f44a49df (diff)
整理目录结构,调整框架部分实现,初步编译通过。
Diffstat (limited to 'platform/src/proxy.cpp')
-rw-r--r--platform/src/proxy.cpp294
1 files changed, 294 insertions, 0 deletions
diff --git a/platform/src/proxy.cpp b/platform/src/proxy.cpp
new file mode 100644
index 0000000..10e2a3f
--- /dev/null
+++ b/platform/src/proxy.cpp
@@ -0,0 +1,294 @@
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <sys/un.h>
+#include <assert.h>
+#include <signal.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <pthread.h>
+
+#include <event2/event.h>
+#include <event2/listener.h>
+#include <event2/bufferevent.h>
+#include <event2/bufferevent_ssl.h>
+#include <event2/buffer.h>
+#include <event2/thread.h>
+
+#include <MESA/MESA_handle_logger.h>
+#include <tfe_utils.h>
+#include <tfe_stream.h>
+#include <stream.h>
+#include <kni.h>
+#include <sescache.h>
+
+/*
+ * Proxy engine, built around libevent 2.x.
+ */
+
+#define TFE_BACKLOG_DEFAULT 20
+
+const char * module_name_pxy = "TFE_PXY";
+extern struct tfe_instance * g_tfe_instance;
+
+__thread int __currect_thread_id;
+
+static void __dummy_event_handler(evutil_socket_t fd, short what, void * arg)
+{
+ return;
+}
+
+/*
+ * Thread entry point; runs the event loop of the event base.
+ * Does not exit until the libevent loop is broken explicitly.
+ */
+static void * __tfe_thrmgr_thread_entry(void * arg)
+{
+ struct tfe_thread_ctx * ctx = (struct tfe_thread_ctx *) arg;
+ struct timeval timer_delay = {60, 0};
+
+ struct event * ev;
+ ev = event_new(ctx->evbase, -1, EV_PERSIST, __dummy_event_handler, NULL);
+
+ if (!ev) return (void *)NULL;
+
+ evtimer_add(ev, &timer_delay);
+ ctx->running = 1;
+
+ __currect_thread_id = ctx->thread_id;
+ event_base_dispatch(ctx->evbase);
+ event_free(ev);
+
+ return (void *)NULL;
+}
+
+static int signals[] = {SIGTERM, SIGQUIT, SIGHUP, SIGINT, SIGPIPE, SIGUSR1};
+
+struct tfe_proxy
+{
+ char name[TFE_SYMBOL_MAX];
+ struct event_base * evbase;
+ struct event * sev[sizeof(signals) / sizeof(int)];
+ struct event * gcev;
+
+ struct tfe_config * opts;
+ void * main_logger;
+
+ struct sess_cache * dsess_cache;
+ struct sess_cache * ssess_cache;
+
+ unsigned int nr_work_threads;
+ struct tfe_thread_ctx * work_threads;
+
+ unsigned int nr_modules;
+ struct tfe_plugin * modules;
+
+ void * io_mod;
+};
+
+/*
+ * Signal handler for SIGTERM, SIGQUIT, SIGINT, SIGHUP, SIGPIPE and SIGUSR1.
+ */
+static void proxy_signal_cb(evutil_socket_t fd, short what, void * arg)
+{
+ tfe_proxy * ctx = (tfe_proxy *) arg;
+ switch (fd)
+ {
+ case SIGTERM:
+ case SIGQUIT:
+ case SIGINT:
+ case SIGHUP:
+ break;
+ case SIGUSR1:
+ break;
+ case SIGPIPE:
+ TFE_LOG_ERROR("Warning: Received SIGPIPE; ignoring.\n");
+ break;
+ default:
+ TFE_LOG_ERROR("Warning: Received unexpected signal %i\n", fd);
+ break;
+ }
+}
+
+static void proxy_gc_cb(evutil_socket_t fd, short what, void * arg)
+{
+ tfe_proxy * ctx = (tfe_proxy *) arg;
+ (void)fd;
+ (void)what;
+}
+
+unsigned int select_work_thread(struct tfe_proxy * pxy)
+{
+ unsigned int min_thread_id = 0;
+ size_t min_load = pxy->work_threads[min_thread_id].load;
+
+ for (unsigned thread_id = 1; thread_id < pxy->nr_work_threads; thread_id++)
+ {
+ if (min_load > pxy->work_threads[thread_id].load)
+ {
+ min_load = pxy->work_threads[thread_id].load;
+ min_thread_id = thread_id;
+ }
+ }
+
+ pxy->work_threads[min_thread_id].load++;
+ return min_thread_id;
+}
+/*
+ * Callback for accept events on the socket listener bufferevent.
+ */
+static void io_mod_accept_cb(evutil_socket_t upstream_fd, evutil_socket_t downstream_fd,
+ enum tfe_session_proto session_type, void * arg)
+{
+ struct tfe_proxy * pxy = (struct tfe_proxy *) arg;
+
+ unsigned int worker_tid = select_work_thread(pxy);
+ tfe_thread_ctx * worker_thread_ctx = &pxy->work_threads[worker_tid];
+
+ struct tfe_stream_private * stream = tfe_stream_create(upstream_fd,
+ downstream_fd, session_type, worker_thread_ctx);
+
+ assert(stream != NULL);
+ return tfe_stream_setup(stream);
+}
+
+/*
+ * Set up the core event loop.
+ * Socket clisock is the privsep client socket used for binding to ports.
+ * Returns ctx on success, or NULL on error.
+ */
+struct tfe_proxy * tfe_proxy_new(tfe_config * cfg)
+{
+ struct tfe_proxy * proxy = ALLOC(struct tfe_proxy, 1);
+ assert(proxy != NULL);
+
+ struct timeval gc_delay = {60, 0};
+
+ /* adds locking, only required if accessed from separate threads */
+ evthread_use_pthreads();
+ event_enable_debug_mode();
+
+ proxy->evbase = event_base_new();
+ proxy->dsess_cache = session_cache_init();
+ proxy->ssess_cache = session_cache_init();
+
+ proxy->nr_modules = 2;
+ proxy->modules = ALLOC(struct tfe_plugin, proxy->nr_modules);
+
+ proxy->modules[0].proto = APP_PROTO_HTTP1;
+ proxy->modules[1].proto = APP_PROTO_HTTP2;
+
+ proxy->work_threads = ALLOC(struct tfe_thread_ctx, proxy->nr_work_threads);
+
+ for (unsigned int i = 0; i < proxy->nr_work_threads; i++)
+ {
+ proxy->work_threads[i].thread_id = i;
+ proxy->work_threads[i].evbase = event_base_new();
+ proxy->work_threads[i].dsess_cache = proxy->dsess_cache;
+ proxy->work_threads[i].ssess_cache = proxy->ssess_cache;
+ proxy->work_threads[i].nr_modules = proxy->nr_modules;
+ proxy->work_threads[i].modules = proxy->modules;
+ }
+
+ //Todo: Not handle signal if have mutliple proxy instance.
+ for (size_t i = 0; i < (sizeof(signals) / sizeof(int)); i++)
+ {
+ proxy->sev[i] = evsignal_new(proxy->evbase, signals[i], proxy_signal_cb, proxy);
+ if (!proxy->sev[i]) goto error_out;
+ evsignal_add(proxy->sev[i], NULL);
+ }
+
+ proxy->gcev = event_new(proxy->evbase, -1, EV_PERSIST, proxy_gc_cb, proxy);
+ if (!proxy->gcev)
+ goto error_out;
+
+ evtimer_add(proxy->gcev, &gc_delay);
+ return proxy;
+
+error_out:
+ if (proxy->gcev)
+ {
+ event_free(proxy->gcev);
+ }
+
+ for (size_t i = 0; i < (sizeof(proxy->sev) / sizeof(proxy->sev[0])); i++)
+ {
+ if (proxy->sev[i])
+ {
+ event_free(proxy->sev[i]);
+ }
+ }
+
+ for (typeof(proxy->nr_work_threads) i = 0; i < proxy->nr_work_threads; i++)
+ {
+ proxy->work_threads[i].thread_id = i;
+ event_base_free(proxy->work_threads[i].evbase);
+ }
+
+ event_base_free(proxy->evbase);
+
+ free(proxy);
+ return NULL;
+}
+
+/*
+ * Run the event loop. Returns when the event loop is cancelled by a signal
+ * or on failure.
+ */
+void tfe_proxy_run(struct tfe_proxy * proxy)
+{
+ unsigned int thread_id;
+ for (thread_id = 0; thread_id < proxy->nr_work_threads; thread_id++)
+ {
+ if (pthread_create(&(proxy->work_threads[thread_id].thr), NULL,
+ __tfe_thrmgr_thread_entry, &(proxy->work_threads[thread_id])))
+ {
+ MESA_handle_runtime_log(proxy->main_logger, RLOG_LV_FATAL, proxy->name, "pthread_create failed.");
+ }
+
+ while (!proxy->work_threads[thread_id].running)
+ {
+ sched_yield();
+ }
+ }
+
+ event_base_dispatch(proxy->evbase);
+}
+
+/*
+ * Break the loop of the proxy, causing the tfe_proxy_run to return.
+ */
+void proxy_loopbreak(tfe_proxy * ctx)
+{
+ event_base_loopbreak(ctx->evbase);
+}
+
+/*
+ * Free the proxy data structures.
+ */
+void proxy_free(tfe_proxy * ctx)
+{
+}
+
+
+int main(int argc, char *argv[])
+{
+ const char* main_profile="./conf/tfe_main.conf";
+
+ tfe_proxy *proxy=NULL;
+ void* wcfg_handle=NULL;
+
+ //TODO: Initiate Local Cert Cache, Decryption Mirror, Field Stat, Logger and etc.
+ //NOTICE: Maat, Cert Store,Tango Cache are initiated in bussiness plugin.
+
+#if 0
+ g_tfe_instance=ALLOC(struct tfe_instance,1);
+ proxy=tfe_proxy_new(g_tfe_cfg);
+#endif
+
+ tfe_proxy_run(proxy);
+ proxy_free(proxy);
+}