#include #include #include #include #include #include #include "sce.h" #include "log.h" #include "utils.h" #include "sf_metrics.h" #include "global_metrics.h" struct breakpad_instance *g_breakpad = NULL; static int is_need_stop = 0; #ifdef SCE_GIT_VERSION static __attribute__((__used__)) const char *__sce_version = SCE_GIT_VERSION; #else static __attribute__((__used__)) const char *__sce_version = "Unknown"; #endif static void usage(char *cmd) { fprintf(stderr, "USAGE: %s [OPTIONS]\n", cmd); fprintf(stderr, " -v -- show version\n"); fprintf(stderr, " -h -- show help info\n\n"); fprintf(stderr, "kill -s SIGHUP $PID -- reload zlog configure\n"); fprintf(stderr, "kill -s SIGINT $PID -- exit gracefully\n"); fprintf(stderr, "kill -s SIGQUIT $PID -- exit gracefully\n"); fprintf(stderr, "kill -s SIGTERM $PID -- exit gracefully\n"); } static void sig_handler(int signo) { if (signo == SIGHUP) { LOG_INFO("%s: recv SIGHUP, reload zlog.conf", LOG_TAG_SCE); LOG_RELOAD(); } if (signo == SIGINT) { LOG_ERROR("%s: recv SIGINT, exit !!!", LOG_TAG_SCE); ATOMIC_SET(&is_need_stop, 1); } if (signo == SIGQUIT) { LOG_ERROR("%s: recv SIGQUIT, exit !!!", LOG_TAG_SCE); ATOMIC_SET(&is_need_stop, 1); } if (signo == SIGTERM) { LOG_ERROR("%s: recv SIGTERM, exit !!!", LOG_TAG_SCE); ATOMIC_SET(&is_need_stop, 1); } } static void *worker_thread_cycle(void *arg) { struct thread_ctx *thread_ctx = (struct thread_ctx *)arg; struct packet_io *handle = thread_ctx->ref_io; struct sce_ctx *sce_ctx = thread_ctx->ref_sce_ctx; struct timestamp *ts = sce_ctx->ts; struct sf_metrics *sf_metrics = sce_ctx->sf_metrics; struct session_table *session_table = thread_ctx->session_table; struct thread_metrics *thread_metrics = &thread_ctx->thread_metrics; struct global_metrics *global_metrics = thread_ctx->ref_global_metrics; int thread_index = thread_ctx->thread_index; int timeout_ms = 0; int n_packet_recved = 0; char thread_name[16]; uint64_t sf_metrics_last_send_ts = timestamp_get_msec(ts); uint64_t sf_metrics_send_interval = sf_metrics_get_interval(sf_metrics); ATOMIC_SET(&thread_ctx->thread_is_runing, 1); snprintf(thread_name, sizeof(thread_name), "sce:worker-%d", thread_index); prctl(PR_SET_NAME, (unsigned long long)thread_name, NULL, NULL, NULL); if (packet_io_init(handle, thread_ctx) != 0) { goto error_out; } LOG_INFO("%s: worker thread %d is running", LOG_TAG_SCE, thread_index); while (!ATOMIC_READ(&is_need_stop)) { n_packet_recved = packet_io_polling_nf(handle, thread_ctx); n_packet_recved += packet_io_polling_endpoint_l3(handle, thread_ctx); n_packet_recved += packet_io_polling_endpoint_l2(handle, thread_ctx); if (n_packet_recved == 0) { timeout_ms = sf_metrics_last_send_ts + sf_metrics_send_interval - timestamp_get_msec(ts); if (timeout_ms <= 0) { timeout_ms = 0; } packet_io_wait(handle, thread_ctx, timeout_ms); } global_metrics_sync(global_metrics, thread_metrics, thread_index); if (ATOMIC_READ(&thread_ctx->session_table_need_reset) > 0) { int n_session = session_table_count(session_table); session_table_reset(session_table); ATOMIC_ZERO(&thread_ctx->session_table_need_reset); ATOMIC_ADD(&(thread_metrics->session_free), n_session); } if (timestamp_get_msec(ts) - sf_metrics_last_send_ts >= sf_metrics_send_interval) { sf_metrics_output(sf_metrics, thread_index); sf_metrics_reset(sf_metrics, thread_index); sf_metrics_last_send_ts = timestamp_get_msec(ts); } } error_out: ATOMIC_SET(&thread_ctx->thread_is_runing, 0); LOG_ERROR("%s: worker thread %d exiting", LOG_TAG_SCE, thread_ctx->thread_index); return (void *)NULL; } int main(int argc, char **argv) { const char *profile = "./conf/sce.conf"; uint64_t ts_update_interval = 0; uint64_t g_metrics_last_send_ts = 0; uint64_t g_metrics_send_interval = 0; int opt = 0; while ((opt = getopt(argc, argv, "vh")) != -1) { switch (opt) { case 'v': fprintf(stderr, "TSG Service Chaining Engine, Version: %s\n", __sce_version); return 0; case 'h': /* fall through */ default: usage(argv[0]); return 0; } } if (LOG_INIT("./conf/zlog.conf") == -1) { return -1; } LOG_ERROR("%s: TSG Service Chaining Engine, Version: %s Start ...", LOG_TAG_SCE, __sce_version); if (signal(SIGHUP, sig_handler) == SIG_ERR) { LOG_ERROR("%s: unable to register SIGHUP signal handler, error %d: %s", LOG_TAG_SCE, errno, strerror(errno)); LOG_CLOSE(); return -1; } if (signal(SIGINT, sig_handler) == SIG_ERR) { LOG_ERROR("%s: unable to register SIGINT signal handler, error %d: %s", LOG_TAG_SCE, errno, strerror(errno)); LOG_CLOSE(); return -1; } if (signal(SIGQUIT, sig_handler) == SIG_ERR) { LOG_ERROR("%s: unable to register SIGQUIT signal handler, error %d: %s", LOG_TAG_SCE, errno, strerror(errno)); LOG_CLOSE(); return -1; } if (signal(SIGTERM, sig_handler) == SIG_ERR) { LOG_ERROR("%s: unable to register SIGTERM signal handler, error %d: %s", LOG_TAG_SCE, errno, strerror(errno)); LOG_CLOSE(); return -1; } g_breakpad = breakpad_init(profile, "system", g_default_logger, __sce_version); struct sce_ctx *ctx = sce_ctx_create(profile); if (ctx == NULL) { LOG_CLOSE(); return -1; } for (int i = 0; i < ctx->nr_worker_threads; i++) { ctx->work_threads[i].tid = 0; ctx->work_threads[i].thread_index = i; ctx->work_threads[i].session_table = session_table_create(); ctx->work_threads[i].ref_io = ctx->io; ctx->work_threads[i].ref_global_metrics = ctx->metrics; ctx->work_threads[i].ref_enforcer = ctx->enforcer; ctx->work_threads[i].ref_sce_ctx = ctx; ctx->work_threads[i].session_table_need_reset = 0; ctx->work_threads[i].tx_packets_ipid = random(); } for (int i = 0; i < ctx->nr_worker_threads; i++) { struct thread_ctx *thread_ctx = &ctx->work_threads[i]; if (pthread_create(&thread_ctx->tid, NULL, worker_thread_cycle, (void *)thread_ctx) < 0) { LOG_ERROR("%s: unable to create worker thread %d, error %d: %s", LOG_TAG_SCE, i, errno, strerror(errno)); goto error_out; } } timestamp_update(ctx->ts); ts_update_interval = timestamp_update_interval_ms(ctx->ts); g_metrics_last_send_ts = timestamp_get_msec(ctx->ts); g_metrics_send_interval = ctx->metrics->config.statsd_cycle * 1000; while (!ATOMIC_READ(&is_need_stop)) { if (timestamp_get_msec(ctx->ts) - g_metrics_last_send_ts >= g_metrics_send_interval) { global_metrics_flush(ctx->metrics); g_metrics_last_send_ts = timestamp_get_msec(ctx->ts); } usleep(ts_update_interval * 1000); timestamp_update(ctx->ts); } error_out: for (int i = 0; i < ctx->nr_worker_threads; i++) { while (1) { if (ATOMIC_READ(&(ctx->work_threads[i].thread_is_runing)) == 0) { struct thread_ctx *thread_ctx = &ctx->work_threads[i]; session_table_destory(thread_ctx->session_table); break; } else { sleep(1); LOG_ERROR("%s: wait thread %d exit", LOG_TAG_SCE, i); } } } sce_ctx_destory(ctx); LOG_CLOSE(); return 0; }