summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorluwenpeng <[email protected]>2021-08-20 11:32:33 +0800
committerluwenpeng <[email protected]>2021-08-20 15:36:58 +0800
commitf764a4bae1d23f2b182302ac80cac9878c7d4cd8 (patch)
tree24454bc4c99b6ad92a24a30791e26527addfe26a
parentc41a67ca2b410c13bfdc651af4992d8f8bc824e4 (diff)
优化 watchdog tfe worker thread 的实现逻辑v4.5.12-202108
-rw-r--r--conf/tfe/tfe.conf9
-rw-r--r--platform/CMakeLists.txt2
-rw-r--r--platform/include/internal/platform.h1
-rw-r--r--platform/include/internal/proxy.h1
-rw-r--r--platform/include/internal/watchdog_tfe.h7
-rw-r--r--platform/src/acceptor_kni_v3.cpp4
-rw-r--r--platform/src/proxy.cpp17
-rw-r--r--platform/src/ssl_stream.cpp2
-rw-r--r--platform/src/watchdog_kni.cpp42
-rw-r--r--platform/src/watchdog_tfe.cpp152
10 files changed, 193 insertions, 44 deletions
diff --git a/conf/tfe/tfe.conf b/conf/tfe/tfe.conf
index 9577012..66069d8 100644
--- a/conf/tfe/tfe.conf
+++ b/conf/tfe/tfe.conf
@@ -42,6 +42,15 @@ cmsg_port=2475
watchdog_switch=1
watchdog_port=2476
+[watchdog_tfe]
+# The worker thread updates the timestamp every two seconds
+# The watchdog thread checks the timestamp every second
+enable=1
+timeout_seconds=5
+statistics_window=20
+timeout_cnt_as_fail=3
+timeout_debug=0
+
[ssl]
ssl_ja3_debug=0
ssl_ja3_table=PXY_SSL_FINGERPRINT
diff --git a/platform/CMakeLists.txt b/platform/CMakeLists.txt
index 71912ea..388c8dd 100644
--- a/platform/CMakeLists.txt
+++ b/platform/CMakeLists.txt
@@ -4,7 +4,7 @@ find_package(NFNETLINK REQUIRED)
add_executable(tfe src/acceptor_kni_v1.cpp src/acceptor_kni_v2.cpp src/acceptor_kni_v3.cpp src/ssl_stream.cpp src/key_keeper.cpp src/ssl_fetch_cert.cpp
src/ssl_sess_cache.cpp src/ssl_sess_ticket.cpp src/ssl_service_cache.cpp
src/ssl_trusted_cert_storage.cpp src/ev_root_ca_metadata.cpp src/ssl_utils.cpp
- src/tcp_stream.cpp src/main.cpp src/proxy.cpp src/sender_scm.cpp src/watchdog_kni.cpp src/ssl_ja3.cpp)
+ src/tcp_stream.cpp src/main.cpp src/proxy.cpp src/sender_scm.cpp src/watchdog_kni.cpp src/watchdog_tfe.cpp src/ssl_ja3.cpp)
target_include_directories(tfe PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include/external)
target_include_directories(tfe PRIVATE ${CMAKE_CURRENT_LIST_DIR}/include/internal)
diff --git a/platform/include/internal/platform.h b/platform/include/internal/platform.h
index 00371ca..92540ac 100644
--- a/platform/include/internal/platform.h
+++ b/platform/include/internal/platform.h
@@ -11,6 +11,7 @@ struct tfe_thread_ctx
{
struct tfe_proxy *proxy;
pthread_t thr;
+ int readable_tid;
unsigned int thread_id;
unsigned int load;
diff --git a/platform/include/internal/proxy.h b/platform/include/internal/proxy.h
index 5b91e94..5a71deb 100644
--- a/platform/include/internal/proxy.h
+++ b/platform/include/internal/proxy.h
@@ -118,6 +118,7 @@ struct tfe_proxy
struct acceptor_kni_v3 * kni_v3_acceptor;
struct sender_scm * scm_sender;
struct watchdog_kni * watchdog_kni;
+ struct watchdog_tfe * watchdog_tfe;
/* DEBUG OPTIONS */
unsigned int tcp_all_passthrough;
diff --git a/platform/include/internal/watchdog_tfe.h b/platform/include/internal/watchdog_tfe.h
new file mode 100644
index 0000000..4e555b6
--- /dev/null
+++ b/platform/include/internal/watchdog_tfe.h
@@ -0,0 +1,7 @@
+#ifndef TFE_WATCHDOG_TFE_H
+#define TFE_WATCHDOG_TFE_H
+
+struct watchdog_tfe;
+struct watchdog_tfe *watchdog_tfe_create(struct tfe_proxy *proxy, const char *profile, void *logger);
+
+#endif //TFE_WATCHDOG_TFE_H \ No newline at end of file
diff --git a/platform/src/acceptor_kni_v3.cpp b/platform/src/acceptor_kni_v3.cpp
index 61cf031..290167f 100644
--- a/platform/src/acceptor_kni_v3.cpp
+++ b/platform/src/acceptor_kni_v3.cpp
@@ -313,7 +313,7 @@ static int payload_handler_cb(struct nfq_q_handle *qh, struct nfgenmsg *nfmsg, s
raw_payload_len = nfq_get_payload(nfa, &raw_payload);
if ((unsigned int)raw_payload_len <= (MIN(sizeof(struct iphdr), sizeof(struct ip6_hdr)) + sizeof(struct tcphdr)))
{
- TFE_LOG_ERROR(g_default_logger, "Failed at nfq_get_payload(), paylod len %d too small, less than %d", raw_payload_len, (MIN(sizeof(struct iphdr), sizeof(struct ip6_hdr)) + sizeof(struct tcphdr)));
+ TFE_LOG_ERROR(g_default_logger, "Failed at nfq_get_payload(), paylod len %d too small, less than %lu", raw_payload_len, (MIN(sizeof(struct iphdr), sizeof(struct ip6_hdr)) + sizeof(struct tcphdr)));
tfe_hexdump2file(stderr, "Failed at parsing payload, payload len too small", raw_payload, (unsigned int)raw_payload_len);
goto end;
}
@@ -353,7 +353,7 @@ static int payload_handler_cb(struct nfq_q_handle *qh, struct nfgenmsg *nfmsg, s
// check if there is a tcp options
if (pktinfo.tcphdr_len <= sizeof(struct tcphdr))
{
- TFE_LOG_ERROR(g_default_logger, "Failed at parser TCP header, TCP header len %d too small, less than %d", pktinfo.tcphdr_len, sizeof(struct tcphdr));
+ TFE_LOG_ERROR(g_default_logger, "Failed at parser TCP header, TCP header len %d too small, less than %lu", pktinfo.tcphdr_len, sizeof(struct tcphdr));
tfe_hexdump2file(stderr, "Failed at parsing TCP header, TCP header len too small", raw_payload, (unsigned int)raw_payload_len);
goto end;
}
diff --git a/platform/src/proxy.cpp b/platform/src/proxy.cpp
index acdd45d..d3b895e 100644
--- a/platform/src/proxy.cpp
+++ b/platform/src/proxy.cpp
@@ -18,6 +18,7 @@
#include <getopt.h>
#include <libgen.h>
#include <unistd.h>
+#include <sys/syscall.h>
#include <event2/event.h>
#include <event2/dns.h>
@@ -47,6 +48,7 @@
#include <acceptor_kni_v2.h>
#include <acceptor_kni_v3.h>
#include <watchdog_kni.h>
+#include <watchdog_tfe.h>
#include <key_keeper.h>
/* Breakpad */
@@ -238,7 +240,7 @@ static void __dummy_event_handler(evutil_socket_t fd, short what, void * arg)
while (ATOMIC_READ(&(ctx->proxy->make_work_thread_sleep)) > 0)
{
- TFE_LOG_ERROR(g_default_logger, "recv SIGUSR1, make worker thread %d sleep", ctx->thread_id);
+ TFE_LOG_ERROR(g_default_logger, "recv SIGUSR1, make worker thread[%d] %d sleep", ctx->thread_id, ctx->readable_tid);
sleep(1);
}
}
@@ -290,6 +292,7 @@ static void * tfe_work_thread(void * arg)
{
struct tfe_thread_ctx * ctx = (struct tfe_thread_ctx *) arg;
struct timeval timer_delay = {2, 0};
+ ctx->readable_tid = syscall(SYS_gettid);
struct event * ev = event_new(ctx->evbase, -1, EV_PERSIST, __dummy_event_handler, ctx);
if (unlikely(ev == NULL))
@@ -534,12 +537,12 @@ void tfe_proxy_acceptor_init(struct tfe_proxy * proxy, const char * profile)
static void usage(char *cmd)
{
fprintf(stderr, "USAGE: %s [OPTIONS]\n", cmd);
- fprintf(stderr, " -v -- show version\n");
- fprintf(stderr, " -g -- generate coredump\n");
- fprintf(stderr, " -h -- show help info\n\n");
+ fprintf(stderr, " -v -- show version\n");
+ fprintf(stderr, " -g -- generate coredump\n");
+ fprintf(stderr, " -h -- show help info\n\n");
fprintf(stderr, "kill -s SIGHUP $pid -- reload zlog configure\n");
fprintf(stderr, "kill -s SIGUSR1 $pid -- make worker thread sleep\n");
- fprintf(stderr, "kill -s SIGUSR2 $pid -- wake worker thread form sleep\n");
+ fprintf(stderr, "kill -s SIGUSR2 $pid -- wake worker thread from sleep\n");
}
int main(int argc, char * argv[])
@@ -680,6 +683,10 @@ int main(int argc, char * argv[])
g_default_proxy->watchdog_kni = watchdog_kni_create(g_default_proxy, main_profile, g_default_logger);
CHECK_OR_EXIT(g_default_proxy->watchdog_kni != NULL, "Failed at creating KNI watchdog, Exit.");
+ /* Watchdog TFE */
+ g_default_proxy->watchdog_tfe = watchdog_tfe_create(g_default_proxy, main_profile, g_default_logger);
+ CHECK_OR_EXIT(g_default_proxy->watchdog_tfe != NULL, "Failed at creating TFE watchdog, Exit.");
+
TFE_LOG_ERROR(g_default_logger, "Tango Frontend Engine initialized, Version: %s.", __tfe_version);
/* If TFE is run by systemd's notify, then tell the systemd our tfe is ready.
diff --git a/platform/src/ssl_stream.cpp b/platform/src/ssl_stream.cpp
index 865e623..7b154bb 100644
--- a/platform/src/ssl_stream.cpp
+++ b/platform/src/ssl_stream.cpp
@@ -2039,7 +2039,7 @@ void ssl_stream_free(struct ssl_stream * s_stream, struct event_base * evbase, s
if (s_stream->dir == CONN_DIR_UPSTREAM)
{
size_t rx_offset_this_time = 0;
- int ret = tfe_stream_info_get(s_stream->tcp_stream, INFO_FROM_UPSTREAM_RX_OFFSET, &rx_offset_this_time, sizeof(rx_offset_this_time));
+ tfe_stream_info_get(s_stream->tcp_stream, INFO_FROM_UPSTREAM_RX_OFFSET, &rx_offset_this_time, sizeof(rx_offset_this_time));
const char * sni = (s_stream->up_parts.client_hello && s_stream->up_parts.client_hello->sni) ? s_stream->up_parts.client_hello->sni : "null";
TFE_LOG_DEBUG(g_default_logger, "ssl up stream close, rx_offset:%d, sni:%s", rx_offset_this_time, sni);
}
diff --git a/platform/src/watchdog_kni.cpp b/platform/src/watchdog_kni.cpp
index b467ef7..6238760 100644
--- a/platform/src/watchdog_kni.cpp
+++ b/platform/src/watchdog_kni.cpp
@@ -6,6 +6,7 @@
#include <event2/buffer.h>
#include <unistd.h>
#include <assert.h>
+#include <sys/prctl.h>
#include <proxy.h>
#include <platform.h>
@@ -202,37 +203,19 @@ retry:
void * watchdog_kni_thread(void * arg)
{
+ char thread_name[16];
+ snprintf(thread_name, sizeof(thread_name), "watchdog:kni");
+ prctl(PR_SET_NAME, (unsigned long long) thread_name, NULL, NULL, NULL);
+
struct watchdog_kni * __ctx = (struct watchdog_kni *)arg;
while(event_base_dispatch(__ctx->ev_base) >= 0) {}
- DIE("watchdog thread is terminated.");
-}
-
-static void health_check_for_thread_worker(evutil_socket_t fd, short what, void * arg)
-{
- struct tfe_proxy *proxy = (struct tfe_proxy *)arg;
- struct timespec now;
- time_t temp;
-
- clock_gettime(CLOCK_MONOTONIC, &now);
-
- for (unsigned int i = 0; i < proxy->nr_work_threads; i++)
- {
- temp = ATOMIC_READ(&(proxy->work_threads[i]->lastime));
- if (temp + 2 + 2 + 1 < now.tv_sec)
- {
- TFE_LOG_ERROR(g_default_logger, "Watchdog thread nowtime %ld, Worker thread %d lastime %ld, Worker thread no reply, Exit ! ! ! ", now.tv_sec, proxy->work_threads[i]->thread_id, temp);
- abort();
- }
- // TFE_LOG_DEBUG(g_default_logger, "Watchdog thread nowtime %ld, Worker thread %d lastime %lds ", now.tv_sec, proxy->work_threads[i]->thread_id, temp);
- }
+ DIE("Watchdog KNI thread is terminated.");
}
struct watchdog_kni * watchdog_kni_create(struct tfe_proxy * proxy, const char * profile, void * logger)
{
struct watchdog_kni * __ctx = ALLOC(struct watchdog_kni, 1);
int ret = 0;
- struct event *ev = NULL;
- struct timeval timer_delay = {2, 0};
__ctx->proxy = proxy;
__ctx->profile = profile;
@@ -274,17 +257,6 @@ struct watchdog_kni * watchdog_kni_create(struct tfe_proxy * proxy, const char *
errno = 0;
goto __errout;
}
-
- ev = event_new(__ctx->ev_base, -1, EV_PERSIST, health_check_for_thread_worker, proxy);
- if (unlikely(ev == NULL))
- {
- TFE_LOG_ERROR(__ctx->logger, "Failed at creating health check event for worker thread");
- /* after log, reset errno */
- errno = 0;
- goto __errout;
- }
- evtimer_add(ev, &timer_delay);
-
watchdog_kni_reset(__ctx);
watchdog_kni_try_connect(__ctx);
@@ -298,7 +270,7 @@ struct watchdog_kni * watchdog_kni_create(struct tfe_proxy * proxy, const char *
goto __errout;
}
- TFE_LOG_INFO(__ctx->logger, "KNI watchdog module init successfully.");
+ TFE_LOG_INFO(__ctx->logger, "Watchdog KNI module init successfully.");
return __ctx;
__errout:
diff --git a/platform/src/watchdog_tfe.cpp b/platform/src/watchdog_tfe.cpp
new file mode 100644
index 0000000..9cbbee6
--- /dev/null
+++ b/platform/src/watchdog_tfe.cpp
@@ -0,0 +1,152 @@
+#include <arpa/inet.h>
+#include <netinet/tcp.h>
+#include <event2/bufferevent.h>
+#include <event2/event.h>
+#include <event2/buffer.h>
+#include <unistd.h>
+#include <assert.h>
+#include <sys/prctl.h>
+#include <stdlib.h>
+
+#include <proxy.h>
+#include <platform.h>
+#include <tfe_utils.h>
+#include <watchdog_tfe.h>
+#include <MESA/MESA_prof_load.h>
+
+struct watchdog_tfe
+{
+ struct tfe_proxy *proxy;
+ struct event_base *ev_base;
+ pthread_t pthread;
+ const char *profile;
+ void *logger;
+
+ unsigned int enable;
+ unsigned int timeout_seconds;
+ unsigned int statistics_window;
+ unsigned int timeout_cnt_as_fail;
+ unsigned int timeout_debug;
+
+ unsigned int cur_time_window_fail_cnt;
+ time_t cur_time_window_begin;
+ time_t cur_time_window_end;
+};
+
+void *watchdog_tfe_thread(void *arg)
+{
+ char thread_name[16];
+ snprintf(thread_name, sizeof(thread_name), "watchdog:tfe");
+ prctl(PR_SET_NAME, (unsigned long long)thread_name, NULL, NULL, NULL);
+
+ struct watchdog_tfe *__ctx = (struct watchdog_tfe *)arg;
+ while (event_base_dispatch(__ctx->ev_base) >= 0)
+ {
+ }
+ DIE("Watchdog TFE thread is terminated.");
+}
+
+static void watchdog_tfe_thread_handle(evutil_socket_t fd, short what, void *arg)
+{
+ struct tfe_proxy *proxy = (struct tfe_proxy *)arg;
+ struct watchdog_tfe *__ctx = proxy->watchdog_tfe;
+ struct timespec now;
+ time_t temp;
+
+ clock_gettime(CLOCK_MONOTONIC, &now);
+
+ if (now.tv_sec > __ctx->cur_time_window_end)
+ {
+ __ctx->cur_time_window_begin = now.tv_sec;
+ __ctx->cur_time_window_end = now.tv_sec + __ctx->statistics_window;
+ __ctx->cur_time_window_fail_cnt = 0;
+ }
+
+ for (unsigned int i = 0; i < proxy->nr_work_threads; i++)
+ {
+ temp = ATOMIC_READ(&(proxy->work_threads[i]->lastime));
+ if (temp + __ctx->timeout_seconds < now.tv_sec)
+ {
+ if (__ctx->timeout_debug)
+ {
+ TFE_LOG_ERROR(__ctx->logger, "Current timestamp is %ld, Worker thread[%d] tid %d timestamp is %ld, Worker thread timeout, Exit !!!",
+ now.tv_sec, proxy->work_threads[i]->thread_id, proxy->work_threads[i]->readable_tid, temp);
+ abort();
+ }
+ else
+ {
+ __ctx->cur_time_window_fail_cnt++;
+ TFE_LOG_ERROR(__ctx->logger, "Current timestamp is %ld, Worker thread[%d] tid %d timestamp is %ld, Worker thread timeout, fail count %d !!!",
+ now.tv_sec, proxy->work_threads[i]->thread_id, proxy->work_threads[i]->readable_tid, temp, __ctx->cur_time_window_fail_cnt);
+ if (__ctx->cur_time_window_fail_cnt >= __ctx->timeout_cnt_as_fail)
+ {
+ TFE_LOG_ERROR(__ctx->logger, "Frome %ld to %ld, there are %d timeouts of the worker threads, Exit !!!",
+ __ctx->cur_time_window_begin, __ctx->cur_time_window_end, __ctx->cur_time_window_fail_cnt);
+ exit(-1);
+ }
+ }
+ }
+ }
+}
+
+struct watchdog_tfe *watchdog_tfe_create(struct tfe_proxy *proxy, const char *profile, void *logger)
+{
+ struct watchdog_tfe *__ctx = ALLOC(struct watchdog_tfe, 1);
+ int ret = 0;
+ struct event *ev = NULL;
+ // The worker thread updates the timestamp every two seconds
+ // The watchdog thread checks the timestamp every second
+ struct timeval timer_delay = {1, 0};
+
+ __ctx->proxy = proxy;
+ __ctx->profile = profile;
+ __ctx->logger = logger;
+
+ MESA_load_profile_uint_def(profile, "watchdog_tfe", "enable", &(__ctx->enable), 1);
+ MESA_load_profile_uint_def(profile, "watchdog_tfe", "timeout_seconds", &(__ctx->timeout_seconds), 5);
+ MESA_load_profile_uint_def(profile, "watchdog_tfe", "statistics_window", &(__ctx->statistics_window), 20);
+ MESA_load_profile_uint_def(profile, "watchdog_tfe", "timeout_cnt_as_fail", &(__ctx->timeout_cnt_as_fail), 3);
+ MESA_load_profile_uint_def(profile, "watchdog_tfe", "timeout_debug", &(__ctx->timeout_debug), 0);
+
+ if (!__ctx->enable)
+ {
+ return __ctx;
+ }
+
+ struct timespec now;
+ clock_gettime(CLOCK_MONOTONIC, &now);
+ __ctx->cur_time_window_begin = now.tv_sec;
+ __ctx->cur_time_window_end = now.tv_sec + __ctx->statistics_window;
+ __ctx->cur_time_window_fail_cnt = 0;
+
+ __ctx->ev_base = event_base_new();
+ if (!__ctx->ev_base)
+ {
+ TFE_LOG_ERROR(__ctx->logger, "Fail to create event base: %s", strerror(errno));
+ errno = 0;
+ goto errout;
+ }
+
+ ev = event_new(__ctx->ev_base, -1, EV_PERSIST, watchdog_tfe_thread_handle, proxy);
+ if (unlikely(ev == NULL))
+ {
+ TFE_LOG_ERROR(__ctx->logger, "Fail to create tfe watchdog event");
+ errno = 0;
+ goto errout;
+ }
+ evtimer_add(ev, &timer_delay);
+
+ ret = pthread_create(&__ctx->pthread, NULL, watchdog_tfe_thread, (void *)__ctx);
+ if (unlikely(ret < 0))
+ {
+ TFE_LOG_ERROR(__ctx->logger, "Fail to create tfe watchdog thread: %s", strerror(errno));
+ errno = 0;
+ goto errout;
+ }
+
+ TFE_LOG_INFO(__ctx->logger, "Watchdog TFE module init successfully.");
+ return __ctx;
+
+errout:
+ return NULL;
+}; \ No newline at end of file