diff options
| author | luwenpeng <[email protected]> | 2021-04-21 11:51:30 +0800 |
|---|---|---|
| committer | luwenpeng <[email protected]> | 2021-04-21 11:51:30 +0800 |
| commit | 1c37ae746df4e8cb4cb242af5a9a2ce4a95a0d21 (patch) | |
| tree | e09d271cfa9d632c1659ec21c4246fc4273c13d3 /platform/src | |
| parent | 3e020b9e28e9126f2544557705c2006047053d36 (diff) | |
TSG-5978 TFE 与 KNI 保活的 watchdog 线程增加对 tfe worker 线程健康状态检测的功能develop-4.4
Diffstat (limited to 'platform/src')
| -rw-r--r-- | platform/src/proxy.cpp | 32 | ||||
| -rw-r--r-- | platform/src/watchdog_kni.cpp | 38 |
2 files changed, 61 insertions, 9 deletions
diff --git a/platform/src/proxy.cpp b/platform/src/proxy.cpp index 4f1dc2e..0da98c0 100644 --- a/platform/src/proxy.cpp +++ b/platform/src/proxy.cpp @@ -57,7 +57,7 @@ extern struct ssl_policy_enforcer* ssl_policy_enforcer_create(void* logger); extern enum ssl_stream_action ssl_policy_enforce(struct ssl_stream *upstream, void* u_para); -static int signals[] = {SIGHUP, SIGPIPE, SIGUSR1}; +static int signals[] = {SIGHUP, SIGPIPE, SIGUSR1, SIGUSR2}; /* Global Resource */ void * g_default_logger = NULL; @@ -228,8 +228,18 @@ void tfe_proxy_free(tfe_proxy * ctx) static void __dummy_event_handler(evutil_socket_t fd, short what, void * arg) { - //printf("%s alive\n",__FUNCTION__); - return; + struct tfe_thread_ctx *ctx = (struct tfe_thread_ctx *)arg; + struct timespec now; + + clock_gettime(CLOCK_MONOTONIC, &now); + ATOMIC_SET(&(ctx->lastime), now.tv_sec); + TFE_LOG_DEBUG(g_default_logger, "Worker thread %d Update time %lds", ctx->thread_id, now.tv_sec); + + while (ATOMIC_READ(&(ctx->proxy->make_work_thread_sleep)) > 0) + { + TFE_LOG_ERROR(g_default_logger, "recv SIGUSR1, make worker thread %d sleep", ctx->thread_id); + sleep(1); + } } static void __signal_handler_cb(evutil_socket_t fd, short what, void * arg) @@ -243,7 +253,16 @@ static void __signal_handler_cb(evutil_socket_t fd, short what, void * arg) TFE_LOG_ERROR(ctx->logger, "recv SIGHUP, reload zlog.conf"); MESA_handle_runtime_log_reconstruction(NULL); break; - case SIGUSR1: break; + case SIGUSR1: + // enable work thread sleep + TFE_LOG_ERROR(ctx->logger, "recv SIGUSR1, make worker thread sleep"); + ATOMIC_SET(&(ctx->make_work_thread_sleep), 1); + break; + case SIGUSR2: + // disable work thread sleep + TFE_LOG_ERROR(ctx->logger, "recv SIGUSR2, wake worker thread from sleep"); + ATOMIC_ZERO(&(ctx->make_work_thread_sleep)); + break; case SIGPIPE: TFE_PROXY_STAT_INCREASE(STAT_SIGPIPE, 1); TFE_LOG_ERROR(ctx->logger, "Warning: Received SIGPIPE; ignoring.\n"); @@ -269,9 +288,9 @@ static void __gc_handler_cb(evutil_socket_t fd, short what, void * arg) static void * tfe_work_thread(void * arg) { struct tfe_thread_ctx * ctx = (struct tfe_thread_ctx *) arg; - struct timeval timer_delay = {60, 0}; + struct timeval timer_delay = {2, 0}; - struct event * ev = event_new(ctx->evbase, -1, EV_PERSIST, __dummy_event_handler, NULL); + struct event * ev = event_new(ctx->evbase, -1, EV_PERSIST, __dummy_event_handler, ctx); if (unlikely(ev == NULL)) { TFE_LOG_ERROR(g_default_logger, "Failed at creating dummy event for thread %u", ctx->thread_id); @@ -279,7 +298,6 @@ static void * tfe_work_thread(void * arg) } evtimer_add(ev, &timer_delay); - ctx->running = 1; __currect_thread_id = ctx->thread_id; char thread_name[16]; snprintf(thread_name, sizeof(thread_name), "tfe:worker-%d", ctx->thread_id); diff --git a/platform/src/watchdog_kni.cpp b/platform/src/watchdog_kni.cpp index 1769b16..f0300b6 100644 --- a/platform/src/watchdog_kni.cpp +++ b/platform/src/watchdog_kni.cpp @@ -7,6 +7,8 @@ #include <unistd.h> #include <assert.h> +#include <proxy.h> +#include <platform.h> #include <tfe_utils.h> #include <watchdog_kni.h> #include <MESA/MESA_prof_load.h> @@ -205,10 +207,32 @@ void * watchdog_kni_thread(void * arg) DIE("watchdog thread is terminated."); } +static void health_check_for_thread_worker(evutil_socket_t fd, short what, void * arg) +{ + struct tfe_proxy *proxy = (struct tfe_proxy *)arg; + struct timespec now; + time_t temp; + + clock_gettime(CLOCK_MONOTONIC, &now); + + for (unsigned int i = 0; i < proxy->nr_work_threads; i++) + { + temp = ATOMIC_READ(&(proxy->work_threads[i]->lastime)); + if (temp + 2 + 2 + 1 < now.tv_sec) + { + TFE_LOG_ERROR(g_default_logger, "Watchdog thread nowtime %ld, Worker thread %d lastime %ld, Worker thread no reply, Exit ! ! ! ", now.tv_sec, proxy->work_threads[i]->thread_id, temp); + exit(-1); + } + TFE_LOG_DEBUG(g_default_logger, "Watchdog thread nowtime %ld, Worker thread %d lastime %lds ", now.tv_sec, proxy->work_threads[i]->thread_id, temp); + } +} + struct watchdog_kni * watchdog_kni_create(struct tfe_proxy * proxy, const char * profile, void * logger) { struct watchdog_kni * __ctx = ALLOC(struct watchdog_kni, 1); int ret = 0; + struct event *ev = NULL; + struct timeval timer_delay = {2, 0}; __ctx->proxy = proxy; __ctx->profile = profile; @@ -232,7 +256,7 @@ struct watchdog_kni * watchdog_kni_create(struct tfe_proxy * proxy, const char * if (ret < 0) { - TFE_LOG_ERROR(logger, "failed at parsing kni's address, in file %s, section %s, entry %s: %s", + TFE_LOG_ERROR(__ctx->logger, "failed at parsing kni's address, in file %s, section %s, entry %s: %s", profile, "kni", "ip", str_kni_ip); goto __errout; } @@ -245,11 +269,21 @@ struct watchdog_kni * watchdog_kni_create(struct tfe_proxy * proxy, const char * __ctx->ev_base = event_base_new(); if (!__ctx->ev_base) { - TFE_LOG_ERROR(logger, "failed at watchdog event_base_new(): %s", strerror(errno)); + TFE_LOG_ERROR(__ctx->logger, "failed at watchdog event_base_new(): %s", strerror(errno)); + /* after log, reset errno */ + errno = 0; + goto __errout; + } + + ev = event_new(__ctx->ev_base, -1, EV_PERSIST, health_check_for_thread_worker, proxy); + if (unlikely(ev == NULL)) + { + TFE_LOG_ERROR(__ctx->logger, "Failed at creating health check event for worker thread"); /* after log, reset errno */ errno = 0; goto __errout; } + evtimer_add(ev, &timer_delay); watchdog_kni_reset(__ctx); watchdog_kni_try_connect(__ctx); |
