/*
 * NTC SSL collect plugin: gathers per-session SSL metadata (SNI, cipher
 * suites, certificate fields, flow counters) and emits one JSON record per
 * session to Kafka and to the runtime log.
 */
#include "ntc_ssl_collect.h"

/*
 * NOTE(review): the names of the angle-bracket includes were not recoverable
 * from the copy of this file that was reviewed (14 of them preceded "ssl.h"
 * and one followed "field_stat2.h"). The system headers below are the ones
 * this translation unit demonstrably uses. The three third-party paths
 * (stream framework, cJSON, librdkafka) are best guesses -- confirm them
 * against the build before merging.
 */
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <dlfcn.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <net/if.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <MESA/stream.h>          /* NOTE(review): guessed path */
#include <cJSON.h>                /* NOTE(review): guessed path */

#include "ssl.h"
#include "MESA_prof_load.h"
#include "MESA_handle_logger.h"
#include "field_stat2.h"

#include <librdkafka/rdkafka.h>   /* NOTE(review): guessed path */

#define PLUGIN_NAME "NTC_SSL_COLLECT"
#define PROFILE_PATH "./t1conf/main.conf"
#define NTC_SSL_COLLECT_TOPIC "ntc_ssl_collect_log"
#define DEBUG (0)

/*
 * Timing probes, compiled in only when DEBUG is non-zero. The printed text is
 * "SSL_COLLECT <what> using <n>(us)".
 */
#if DEBUG
#define SSL_COLLECT_TIMER_START(t) struct timespec t; record_time_start(&t)
#define SSL_COLLECT_TIMER_PRINT(t, what) \
    printf("SSL_COLLECT " what " using %ld(us)\n", record_time_elapse_us(&t))
#else
#define SSL_COLLECT_TIMER_START(t) ((void)0)
#define SSL_COLLECT_TIMER_PRINT(t, what) ((void)0)
#endif

#ifdef __cplusplus
extern "C"
{
#endif

#define GIT_VERSION_CATTER(v) __attribute__((__used__)) const char * GIT_VERSION_##v = NULL
#define GIT_VERSION_EXPEND(v) GIT_VERSION_CATTER(v)

/* VERSION TAG */
#ifdef GIT_VERSION
GIT_VERSION_EXPEND(GIT_VERSION);
#else
static __attribute__((__used__)) const char * GIT_VERSION_UNKNOWN = NULL;
#endif
#undef GIT_VERSION_CATTER
#undef GIT_VERSION_EXPEND

#ifdef __cplusplus
}
#endif

extern long long g_CurrentTime;

ntc_ssl_collect_global_item g_ssl_collect_item;
int NTC_SSL_COLLECT_VERSION_20190123 = 1;

static rd_kafka_t *g_ntc_kafka_handle = NULL;
static rd_kafka_topic_t *g_ntc_kafka_topic = NULL;

/*
 * Look up the IPv4 address bound to a network interface.
 *
 * @param ifname  interface name, e.g. "eth0".
 * @return the address exactly as stored in sin_addr.s_addr, or INADDR_NONE
 *         when the name is NULL or too long for ifr_name, the socket cannot
 *         be created, or the SIOCGIFADDR ioctl fails.
 */
static unsigned int get_ip_by_eth_name(const char *ifname)
{
    struct ifreq ifr;
    unsigned int ip = INADDR_NONE;

    /* Reject names that would not fit in ifr_name instead of overflowing it. */
    if (ifname == NULL || strlen(ifname) >= sizeof(ifr.ifr_name))
    {
        return INADDR_NONE;
    }

    int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
    if (sockfd < 0)
    {
        return INADDR_NONE;
    }

    memset(&ifr, 0, sizeof(ifr));
    snprintf(ifr.ifr_name, sizeof(ifr.ifr_name), "%s", ifname);

    if (ioctl(sockfd, SIOCGIFADDR, &ifr) >= 0)
    {
        ip = ((struct sockaddr_in *)&(ifr.ifr_addr))->sin_addr.s_addr;
    }

    close(sockfd);
    return ip;
}

/* Add the s_ip / d_ip / s_port / d_port quartet to a JSON object. */
static void ntc_add_endpoints(cJSON *json_obj, const char *s_ip, const char *d_ip,
                              unsigned int s_port, unsigned int d_port)
{
    cJSON_AddStringToObject(json_obj, "s_ip", s_ip);
    cJSON_AddStringToObject(json_obj, "d_ip", d_ip);
    cJSON_AddNumberToObject(json_obj, "s_port", s_port);
    cJSON_AddNumberToObject(json_obj, "d_port", d_port);
}

/*
 * Describe a stream's addressing in a JSON object: direction, endpoints,
 * transport prefix and the textual address list.
 *
 * @param json_obj  object that receives the fields.
 * @param a_stream  stream to describe.
 * @return 0 on success, -1 when either argument is NULL.
 */
static int ntc_addStreamInfo_to_jsonObj(cJSON *json_obj, const struct streaminfo *a_stream)
{
    const char *null_addr = "0";
    char src_ip_str[128] = {0};
    char dst_ip_str[128] = {0};
    char nest_addr_buf[1024] = {0};   /* stays "" if the ntop helper writes nothing */
    unsigned short tunnel_type = 0;
    int tunnel_type_size = sizeof(tunnel_type);

    if (json_obj == NULL || a_stream == NULL)
    {
        return -1;
    }

    cJSON_AddNumberToObject(json_obj, "stream_dir", a_stream->dir);

    const struct layer_addr *addr = &(a_stream->addr);
    switch (addr->addrtype)
    {
    case ADDR_TYPE_IPV4:
    case __ADDR_TYPE_IP_PAIR_V4:
        /*
         * NOTE(review): IPv4 records carry no "addr_type" field, unlike every
         * other family below -- confirm consumers rely on that before
         * changing it.
         */
        inet_ntop(AF_INET, &addr->ipv4->saddr, src_ip_str, sizeof(src_ip_str));
        inet_ntop(AF_INET, &addr->ipv4->daddr, dst_ip_str, sizeof(dst_ip_str));
        ntc_add_endpoints(json_obj, src_ip_str, dst_ip_str,
                          ntohs(addr->ipv4->source), ntohs(addr->ipv4->dest));
        break;

    case ADDR_TYPE_IPV6:
    case __ADDR_TYPE_IP_PAIR_V6:
        cJSON_AddNumberToObject(json_obj, "addr_type", addr->addrtype);
        inet_ntop(AF_INET6, addr->ipv6->saddr, src_ip_str, sizeof(src_ip_str));
        inet_ntop(AF_INET6, addr->ipv6->daddr, dst_ip_str, sizeof(dst_ip_str));
        ntc_add_endpoints(json_obj, src_ip_str, dst_ip_str,
                          ntohs(addr->ipv6->source), ntohs(addr->ipv6->dest));
        break;

    case ADDR_TYPE_VLAN:
    case ADDR_TYPE_GRE:
    case ADDR_TYPE_MPLS:
    case ADDR_TYPE_PPPOE_SES:
    case ADDR_TYPE_L2TP:
    case ADDR_TYPE_PPP:
        /* These layers have no IP endpoints; emit placeholders. */
        cJSON_AddNumberToObject(json_obj, "addr_type", addr->addrtype);
        ntc_add_endpoints(json_obj, null_addr, null_addr, 0, 0);
        break;

    case ADDR_TYPE_PPTP:
        /* PPTP reports its call ids in the port fields. */
        cJSON_AddNumberToObject(json_obj, "addr_type", addr->addrtype);
        ntc_add_endpoints(json_obj, null_addr, null_addr,
                          ntohs(addr->pptp->C2S_call_id), ntohs(addr->pptp->S2C_call_id));
        break;

    default:
        break;
    }

    const char *addr_proto = layer_addr_prefix_ntop(a_stream);
    if (addr_proto != NULL)
    {
        cJSON_AddStringToObject(json_obj, "trans_proto", addr_proto);
    }

    /*
     * A failed option query must not abort a packet-processing thread (and an
     * assert would vanish under NDEBUG anyway): fall back to the plain,
     * non-tunnel address rendering.
     */
    int ret = MESA_get_stream_opt(a_stream, MSO_STREAM_TUNNEL_TYPE, &tunnel_type, &tunnel_type_size);
    if (ret != 0 || tunnel_type == STREAM_TUNNLE_NON)
    {
        layer_addr_ntop_r(a_stream, nest_addr_buf, sizeof(nest_addr_buf));
    }
    else
    {
        stream_addr_list_ntop(a_stream, nest_addr_buf, sizeof(nest_addr_buf));
    }
    cJSON_AddStringToObject(json_obj, "addr_list", nest_addr_buf);

    return 0;
}

/* Add an integer counter to a JSON object as a decimal string. */
static void ntc_add_counter_string(cJSON *json_obj, const char *key, unsigned long long value)
{
    char text[32];

    snprintf(text, sizeof(text), "%llu", value);
    cJSON_AddStringToObject(json_obj, key, text);
}

/*
 * Build the JSON record for one SSL session, publish it to Kafka (when
 * comm_log_mode > 0) and write it to the runtime log at debug level.
 *
 * Ownership: any certificate objects held in ctx are moved into the record
 * and released with it; ctx->server_cert / ctx->client_cert are NULL on
 * return whenever the record was built.
 *
 * @param topic        Kafka topic to publish to; the module-wide topic is
 *                     used when this is NULL.
 * @param a_stream     stream being reported; nothing is sent when NULL.
 * @param ctx          per-session context; nothing is sent when NULL.
 * @param dpkt_buf     optional application label string.
 * @param dpkt_buflen  length of dpkt_buf; the label is skipped when <= 0.
 * @param a_ssl        parsed SSL state; nothing is sent when NULL.
 */
void ntc_ssl_collect_send_kafka_log(rd_kafka_topic_t *topic, struct streaminfo *a_stream, comm_context_t *ctx, char *dpkt_buf, int dpkt_buflen, ssl_stream *a_ssl)
{
    if (a_stream == NULL || a_ssl == NULL || ctx == NULL)
    {
        return;
    }

    cJSON *log_obj = cJSON_CreateObject();
    if (log_obj == NULL)
    {
        return;
    }

    cJSON_AddStringToObject(log_obj, "cap_ip", g_ssl_collect_item.local_ip_str);
    cJSON_AddNumberToObject(log_obj, "entrance_id", g_ssl_collect_item.entry_id);
    cJSON_AddNumberToObject(log_obj, "found_time", g_CurrentTime);
    cJSON_AddNumberToObject(log_obj, "recv_time", g_CurrentTime);

    ntc_addStreamInfo_to_jsonObj(log_obj, a_stream);

    ntc_add_counter_string(log_obj, "c2s_pkt_num", ctx->c2s_pkts);
    ntc_add_counter_string(log_obj, "s2c_pkt_num", ctx->s2c_pkts);
    ntc_add_counter_string(log_obj, "c2s_byte_num", ctx->c2s_bytes);
    ntc_add_counter_string(log_obj, "s2c_byte_num", ctx->s2c_bytes);

    if (a_ssl->stClientHello != NULL && a_ssl->stClientHello->server_name != NULL)
    {
        cJSON_AddStringToObject(log_obj, "SNI", (const char *)a_ssl->stClientHello->server_name);
    }

    if (dpkt_buf != NULL && dpkt_buflen > 0)
    {
        cJSON_AddStringToObject(log_obj, "app_label", dpkt_buf);
    }

    if (a_stream->ptcpdetail != NULL)
    {
        cJSON_AddNumberToObject(log_obj, "create_time", a_stream->ptcpdetail->createtime);
        cJSON_AddNumberToObject(log_obj, "lastmtime", a_stream->ptcpdetail->lastmtime);
    }

    /*
     * cJSON_AddItemToObject hands the item to log_obj, which frees it in
     * cJSON_Delete below. Clear the context pointers so nobody touches or
     * frees the certificates a second time.
     */
    if (ctx->server_cert != NULL)
    {
        cJSON_AddItemToObject(log_obj, "server_cert", ctx->server_cert);
        ctx->server_cert = NULL;
    }
    if (ctx->client_cert != NULL)
    {
        cJSON_AddItemToObject(log_obj, "client_cert", ctx->client_cert);
        ctx->client_cert = NULL;
    }

    if (ctx->server_ciphersuits != NULL)
    {
        cJSON_AddStringToObject(log_obj, "server_ciphersuits", ctx->server_ciphersuits);
    }
    if (ctx->client_ciphersuits != NULL)
    {
        cJSON_AddStringToObject(log_obj, "client_ciphersuits", ctx->client_ciphersuits);
    }

    char *payload = cJSON_PrintUnformatted(log_obj);
    if (payload != NULL)
    {
        rd_kafka_topic_t *dst_topic = (topic != NULL) ? topic : g_ntc_kafka_topic;

        if (g_ssl_collect_item.comm_log_mode > 0 && dst_topic != NULL)
        {
            /* RD_KAFKA_MSG_F_COPY: librdkafka copies payload, so it is freed here. */
            if (rd_kafka_produce(dst_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                                 payload, strlen(payload), NULL, 0, NULL) != 0)
            {
                MESA_handle_runtime_log(g_ssl_collect_item.log_handle, RLOG_LV_DEBUG, __FUNCTION__,
                                        "%s", "rd_kafka_produce failed, record not queued");
            }
        }
        MESA_handle_runtime_log(g_ssl_collect_item.log_handle, RLOG_LV_DEBUG, __FUNCTION__, "%s", payload);
        free(payload);
    }

    cJSON_Delete(log_obj);
}

/*
 * Render the DPKT classification attached to a stream as
 * "PROTO_ID=..;APP_ID=..;OS_ID=..;BS_ID=..;WEB_ID=..;BEHAV_ID=..;".
 *
 * @param a_stream      stream carrying the classification.
 * @param label_name    unused; the project id registered at init selects the
 *                      data.
 * @param label_buf     output buffer.
 * @param label_buflen  in: capacity of label_buf; out: length of the string
 *                      written, or 0 when no classification is available.
 * @return 0 on success, -1 on bad arguments or when the stream carries no
 *         classification.
 */
int ntc_get_dpkt_label(struct streaminfo *a_stream, const char *label_name, char *label_buf, int *label_buflen)
{
    (void)label_name;

    if (label_buflen == NULL)
    {
        return -1;
    }
    if (a_stream == NULL || label_buf == NULL || *label_buflen <= 0 ||
        g_ssl_collect_item.dpkt_project_id < 0)
    {
        *label_buflen = 0;
        return -1;
    }

    dpkt_lable_t *dpkt_info = (dpkt_lable_t *)project_req_get_struct(a_stream, g_ssl_collect_item.dpkt_project_id);
    if (dpkt_info == NULL)
    {
        *label_buflen = 0;
        return -1;
    }

    snprintf(label_buf, *label_buflen, "PROTO_ID=%u;APP_ID=%u;OS_ID=%u;BS_ID=%u;WEB_ID=%u;BEHAV_ID=%u;",
             dpkt_info->dpkt_proto_type, dpkt_info->dpkt_app_type, dpkt_info->dpkt_op_type,
             dpkt_info->dpkt_browser_type, dpkt_info->dpkt_web_type, dpkt_info->dpkt_behavior_type);
    /* snprintf may have truncated; report what is actually in the buffer. */
    *label_buflen = strlen(label_buf);
    return 0;
}

/*
 * Copy the byte/packet counters attached to a stream into ctx.
 *
 * @param ctx       destination context; left untouched when the stream has
 *                  no counters attached.
 * @param a_stream  TCP or UDP stream.
 * @param flow_id   project id under which the counters were registered.
 * @return 0 for a TCP or UDP stream (even if no counters were found), -1 on
 *         bad arguments, negative flow_id, or any other stream type.
 */
int nct_get_flow_stat(comm_context_t *ctx, struct streaminfo *a_stream, int flow_id)
{
    if (ctx == NULL || a_stream == NULL || flow_id < 0)
    {
        return -1;
    }

    if (a_stream->type == STREAM_TYPE_TCP)
    {
        struct tcp_flow_stat *tcp_stat = (struct tcp_flow_stat *)project_req_get_struct(a_stream, flow_id);
        if (tcp_stat != NULL)
        {
            ctx->c2s_bytes = tcp_stat->C2S_data_byte;
            ctx->s2c_bytes = tcp_stat->S2C_data_byte;
            ctx->c2s_pkts = tcp_stat->C2S_data_pkt;
            ctx->s2c_pkts = tcp_stat->S2C_data_pkt;
        }
        return 0;
    }

    if (a_stream->type == STREAM_TYPE_UDP)
    {
        struct udp_flow_stat *udp_stat = (struct udp_flow_stat *)project_req_get_struct(a_stream, flow_id);
        if (udp_stat != NULL)
        {
            ctx->c2s_bytes = udp_stat->C2S_byte;
            ctx->s2c_bytes = udp_stat->S2C_byte;
            ctx->c2s_pkts = udp_stat->C2S_pkt;
            ctx->s2c_pkts = udp_stat->S2C_pkt;
        }
        return 0;
    }

    return -1;
}

/*
 * Hex-encode a byte buffer using upper-case digits.
 *
 * @param bin_string  bytes to encode.
 * @param bin_strlen  number of bytes.
 * @return a NUL-terminated string of 2 * bin_strlen characters that the
 *         caller must free(), or NULL when the input is NULL/empty or the
 *         allocation fails.
 */
static char *convert_bin_to_string(const unsigned char *bin_string, unsigned int bin_strlen)
{
    static const char hex_digits[] = "0123456789ABCDEF";

    if (bin_string == NULL || bin_strlen == 0)
    {
        return NULL;
    }

    size_t out_len = (size_t)bin_strlen * 2 + 1;
    char *out_buf = (char *)calloc(out_len, 1);
    if (out_buf == NULL)
    {
        return NULL;
    }

    for (size_t i = 0; i < bin_strlen; i++)
    {
        out_buf[i * 2] = hex_digits[bin_string[i] >> 4];
        out_buf[i * 2 + 1] = hex_digits[bin_string[i] & 0x0F];
    }
    /* calloc already supplied the terminating NUL. */
    return out_buf;
}

/* Add a string field only when the value is present and non-empty. */
static void ntc_add_nonempty_string(cJSON *json_obj, const char *key, const char *value)
{
    if (value != NULL && value[0] != '\0')
    {
        cJSON_AddStringToObject(json_obj, key, value);
    }
}

/*
 * Join every subject-alternative-name of a certificate with ';'.
 *
 * The caller guarantees cert->SSLSubAltName is non-NULL with count > 0.
 * The result is sized up front, NUL-terminated, and comes from malloc so
 * that the matching release is plain free().
 *
 * @return heap string owned by the caller, or NULL on allocation failure.
 */
static char *ntc_join_san_names(st_cert_t *cert)
{
    int count = cert->SSLSubAltName->count;
    size_t total = 0;

    /* Each name needs one extra byte: a ';' separator or the final NUL. */
    for (int i = 0; i < count; i++)
    {
        total += strlen(cert->SSLSubAltName->san_array[i].san) + 1;
    }

    char *joined = (char *)malloc(total);
    if (joined == NULL)
    {
        return NULL;
    }

    size_t offset = 0;
    for (int i = 0; i < count; i++)
    {
        size_t name_len = strlen(cert->SSLSubAltName->san_array[i].san);

        if (i > 0)
        {
            joined[offset++] = ';';
        }
        memcpy(joined + offset, cert->SSLSubAltName->san_array[i].san, name_len);
        offset += name_len;
    }
    joined[offset] = '\0';
    return joined;
}

/*
 * Convert the certificate currently exposed by the SSL parser into a JSON
 * object and store it in ctx: client_cert when the stream's current
 * direction is DIR_C2S, server_cert when it is DIR_S2C. A certificate
 * already stored for that direction is released and replaced.
 *
 * SAN handling follows collect_san_sw: 0 skips it, 1 emits a single
 * ';'-joined "san" string, any other positive value emits a "san_array".
 *
 * @return 1 when a certificate was stored, 0 when the certificate is not of
 *         type CERT_TYPE_INDIVIDUAL (nothing stored), -1 on bad arguments,
 *         missing certificate, allocation failure or unknown direction.
 */
int ntc_ssl_store_cert(comm_context_t *ctx, struct streaminfo *a_stream, ssl_stream *a_ssl)
{
    SSL_COLLECT_TIMER_START(start);

    if (ctx == NULL || a_stream == NULL || a_ssl == NULL)
    {
        return -1;
    }

    st_cert_t *cert = a_ssl->stSSLCert;
    if (cert == NULL)
    {
        return -1;
    }
    if (cert->cert_type != CERT_TYPE_INDIVIDUAL)
    {
        return 0;
    }

    cJSON *cert_obj = cJSON_CreateObject();
    if (cert_obj == NULL)
    {
        return -1;
    }
    SSL_COLLECT_TIMER_PRINT(start, "JSON create");

    ntc_add_nonempty_string(cert_obj, "version", cert->SSLVersion);

    /* The serial number is reported hex-encoded. */
    size_t serial_len = strlen(cert->SSLSerialNum);
    if (serial_len > 0)
    {
        char *serial_hex = convert_bin_to_string((const unsigned char *)cert->SSLSerialNum, serial_len);
        if (serial_hex != NULL)
        {
            cJSON_AddStringToObject(cert_obj, "serial_number", serial_hex);
            free(serial_hex);
        }
    }
    SSL_COLLECT_TIMER_PRINT(start, "serial_number");

    ntc_add_nonempty_string(cert_obj, "algorithm", cert->SSLFPAg);
    ntc_add_nonempty_string(cert_obj, "algorithm_id", cert->SSLAgID);
    ntc_add_nonempty_string(cert_obj, "issuer", cert->SSLIssuer);
    ntc_add_nonempty_string(cert_obj, "issue_country", cert->SSLIssuerC);
    ntc_add_nonempty_string(cert_obj, "issue_organize", cert->SSLIssuerO);
    ntc_add_nonempty_string(cert_obj, "issue_cname", cert->SSLIssuerCN);
    ntc_add_nonempty_string(cert_obj, "sub", cert->SSLSub);
    ntc_add_nonempty_string(cert_obj, "sub_country", cert->SSLSubC);
    ntc_add_nonempty_string(cert_obj, "sub_organize", cert->SSLSubO);
    ntc_add_nonempty_string(cert_obj, "sub_cname", cert->SSLSubCN);
    ntc_add_nonempty_string(cert_obj, "start_time", cert->SSLFrom);
    ntc_add_nonempty_string(cert_obj, "expire_time", cert->SSLTo);
    SSL_COLLECT_TIMER_PRINT(start, "expre_time");

    if (g_ssl_collect_item.collect_san_sw > 0)
    {
        if (cert->SSLSubAltName != NULL && cert->SSLSubAltName->count > 0)
        {
            if (g_ssl_collect_item.collect_san_sw == 1)
            {
                char *san_joined = ntc_join_san_names(cert);
                if (san_joined != NULL)
                {
                    cJSON_AddStringToObject(cert_obj, "san", san_joined);
                    free(san_joined);
                }
            }
            else
            {
                cJSON *san = cJSON_CreateArray();
                if (san != NULL)
                {
                    for (int i = 0; i < cert->SSLSubAltName->count; i++)
                    {
                        cJSON *member = cJSON_CreateString(cert->SSLSubAltName->san_array[i].san);
                        if (member != NULL)
                        {
                            cJSON_AddItemToArray(san, member);
                        }
                    }
                    cJSON_AddItemToObject(cert_obj, "san_array", san);
                }
            }
        }
        SSL_COLLECT_TIMER_PRINT(start, "san");
    }

    if (a_stream->curdir == DIR_C2S)
    {
        /* Drop any certificate stored earlier so it is not leaked. */
        cJSON_Delete(ctx->client_cert);
        ctx->client_cert = cert_obj;
    }
    else if (a_stream->curdir == DIR_S2C)
    {
        cJSON_Delete(ctx->server_cert);
        ctx->server_cert = cert_obj;
    }
    else
    {
        cJSON_Delete(cert_obj);
        return -1;
    }

    SSL_COLLECT_TIMER_PRINT(start, "JSON finish");
    return 1;
}

/*
 * Finish a session: refresh the flow counters, emit the log record and
 * release the context together with everything it owns.
 *
 * ctx is freed by this call; the caller must not use it afterwards and
 * should clear its own copy of the pointer. A NULL ctx is a no-op.
 */
void ntc_ssl_collect_close(comm_context_t *ctx, struct streaminfo *a_stream, ssl_stream *a_ssl)
{
    char label_buf[128] = {0};
    int label_buflen = sizeof(label_buf);

    if (ctx == NULL)
    {
        return;
    }

    nct_get_flow_stat(ctx, a_stream, g_ssl_collect_item.tcp_flow_id);
    ntc_get_dpkt_label(a_stream, g_ssl_collect_item.dpkt_label, label_buf, &label_buflen);
    ntc_ssl_collect_send_kafka_log(g_ntc_kafka_topic, a_stream, ctx, label_buf, label_buflen, a_ssl);

    /*
     * The log call takes over the certificates when it builds a record. If
     * it bailed out early (for example a_ssl == NULL) they are still here
     * and must be released; cJSON_Delete(NULL) is a no-op.
     */
    cJSON_Delete(ctx->server_cert);
    ctx->server_cert = NULL;
    cJSON_Delete(ctx->client_cert);
    ctx->client_cert = NULL;

    free(ctx->server_ciphersuits);
    ctx->server_ciphersuits = NULL;
    free(ctx->client_ciphersuits);
    ctx->client_ciphersuits = NULL;

    free(ctx);
}

/*
 * Plugin entry point, called by the SSL protocol layer for every parsed
 * event of a session.
 *
 * A context is allocated when the session is in the PENDING state and kept
 * in *param. It is released, and *param cleared, when the record is emitted:
 * on the first application-data event if collect_all_sw is 0, otherwise when
 * the session closes.
 *
 * @return PROT_STATE_GIVEME to keep receiving events for this session,
 *         PROT_STATE_DROPME once the record has been emitted or when no
 *         context is available.
 */
extern "C" UCHAR ntc_ssl_collect_entry(stSessionInfo *session_info, void **param, int thread_seq, struct streaminfo *a_stream, void *a_packet)
{
    SSL_COLLECT_TIMER_START(start);
    unsigned char ret = PROT_STATE_GIVEME;

    (void)thread_seq;
    (void)a_packet;

    if ((session_info->session_state & SESSION_STATE_PENDING) == SESSION_STATE_PENDING)
    {
        *param = calloc(1, sizeof(comm_context_t));
    }

    comm_context_t *ctx = (comm_context_t *)*param;
    if (ctx == NULL)
    {
        /* Allocation failed or the session was never opened: nothing to collect. */
        return PROT_STATE_DROPME;
    }

    ssl_stream *a_ssl = (ssl_stream *)session_info->app_info;

    switch (session_info->prot_flag)
    {
    case SSL_CLIENT_HELLO:
        if (a_ssl != NULL && a_ssl->stClientHello != NULL)
        {
            /* A repeated hello replaces the previous value instead of leaking it. */
            free(ctx->client_ciphersuits);
            ctx->client_ciphersuits = convert_bin_to_string(a_ssl->stClientHello->ciphersuits.suite_value,
                                                            a_ssl->stClientHello->ciphersuits.suite_len);
            SSL_COLLECT_TIMER_PRINT(start, "client_hello");
        }
        break;

    case SSL_SERVER_HELLO:
        if (a_ssl != NULL && a_ssl->stServerHello != NULL)
        {
            free(ctx->server_ciphersuits);
            ctx->server_ciphersuits = convert_bin_to_string(a_ssl->stServerHello->ciphersuits.suite_value,
                                                            a_ssl->stServerHello->ciphersuits.suite_len);
            SSL_COLLECT_TIMER_PRINT(start, "server_hello");
        }
        break;

    case SSL_CERTIFICATE_DETAIL:
        if (a_ssl != NULL)
        {
            ntc_ssl_store_cert(ctx, a_stream, a_ssl);
            SSL_COLLECT_TIMER_PRINT(start, "cert");
        }
        break;

    case SSL_APPLICATION_DATA:
        /* The handshake is over; unless told to follow the whole session, report now. */
        if (g_ssl_collect_item.collect_all_sw == 0)
        {
            ntc_ssl_collect_close(ctx, a_stream, a_ssl);
            *param = NULL;
            ret = PROT_STATE_DROPME;
            SSL_COLLECT_TIMER_PRINT(start, "close");
        }
        break;

    default:
        break;
    }

    if ((session_info->session_state & SESSION_STATE_CLOSE) == SESSION_STATE_CLOSE && ret != PROT_STATE_DROPME)
    {
        ntc_ssl_collect_close(ctx, a_stream, (ssl_stream *)session_info->app_info);
        *param = NULL;
        ret = PROT_STATE_DROPME;
        SSL_COLLECT_TIMER_PRINT(start, "close");
    }

    return ret;
}

/*
 * Load the plugin settings from PROFILE_PATH into g_ssl_collect_item and
 * resolve the capture IP from the configured NIC.
 *
 * @return 0 on success, -1 when the NIC address cannot be determined.
 */
int ntc_ssl_collect_load_profile()
{
    char nic_name[64];

    MESA_load_profile_string_def(PROFILE_PATH, PLUGIN_NAME, "log_path", g_ssl_collect_item.log_path,
                                 sizeof(g_ssl_collect_item.log_path), "./ntclog/ip_comm_log");
    MESA_load_profile_uint_def(PROFILE_PATH, PLUGIN_NAME, "log_level", &g_ssl_collect_item.log_level, 30);

    MESA_load_profile_string_def(PROFILE_PATH, "SYSTEM", "NIC_NAME", nic_name, sizeof(nic_name), "eth0");
    g_ssl_collect_item.local_ip_nr = get_ip_by_eth_name(nic_name);
    if (g_ssl_collect_item.local_ip_nr == INADDR_NONE)
    {
        printf("get_ip_by_eth_name in %s return NULL, exit!\n", nic_name);
        return -1;
    }
    inet_ntop(AF_INET, &(g_ssl_collect_item.local_ip_nr), g_ssl_collect_item.local_ip_str,
              sizeof(g_ssl_collect_item.local_ip_str));

    MESA_load_profile_int_def(PROFILE_PATH, "SYSTEM", "ENTRANCE_ID", &(g_ssl_collect_item.entry_id), 0);
    MESA_load_profile_int_def(PROFILE_PATH, PLUGIN_NAME, "service", &g_ssl_collect_item.service, 0);
    MESA_load_profile_int_def(PROFILE_PATH, PLUGIN_NAME, "collect_all_sw", &g_ssl_collect_item.collect_all_sw, 0);
    MESA_load_profile_int_def(PROFILE_PATH, PLUGIN_NAME, "collect_san_sw", &g_ssl_collect_item.collect_san_sw, 0);
    MESA_load_profile_string_def(PROFILE_PATH, PLUGIN_NAME, "dpkt_label", g_ssl_collect_item.dpkt_label,
                                 sizeof(g_ssl_collect_item.dpkt_label), "DPKT_PROJECT");
    return 0;
}

/*
 * Create a producer owned by this plugin and connect it to the configured
 * broker list. Stores the handle in g_ntc_kafka_handle.
 *
 * @return 0 on success, -1 on failure (nothing is left allocated).
 */
static int ntc_kafka_create_own_handle(void)
{
    static const char *const settings[][2] = {
        {"queue.buffering.max.messages", "1000000"},
        {"topic.metadata.refresh.interval.ms", "600000"},
        {"security.protocol", "MG"},
    };
    char kafka_errstr[1024] = {0};

    if (0 > MESA_load_profile_string_nodef(PROFILE_PATH, PLUGIN_NAME, "kafka_brokelist",
                                           g_ssl_collect_item.kafka_brokelist,
                                           sizeof(g_ssl_collect_item.kafka_brokelist)))
    {
        return -1;
    }

    rd_kafka_conf_t *rdkafka_conf = rd_kafka_conf_new();
    for (size_t i = 0; i < sizeof(settings) / sizeof(settings[0]); i++)
    {
        /* A rejected setting is reported but, as before, does not stop start-up. */
        if (rd_kafka_conf_set(rdkafka_conf, settings[i][0], settings[i][1],
                              kafka_errstr, sizeof(kafka_errstr)) != RD_KAFKA_CONF_OK)
        {
            printf("rd_kafka_conf_set %s=%s failed: %s\n", settings[i][0], settings[i][1], kafka_errstr);
        }
    }

    g_ntc_kafka_handle = rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr));
    if (g_ntc_kafka_handle == NULL)
    {
        /* rd_kafka_new only takes ownership of the conf on success. */
        printf("rd_kafka_new failed: %s\n", kafka_errstr);
        rd_kafka_conf_destroy(rdkafka_conf);
        return -1;
    }

    if (rd_kafka_brokers_add(g_ntc_kafka_handle, g_ssl_collect_item.kafka_brokelist) == 0)
    {
        rd_kafka_destroy(g_ntc_kafka_handle);
        g_ntc_kafka_handle = NULL;
        return -1;
    }
    return 0;
}

/*
 * Borrow a producer handle exported by another shared object: the library at
 * kafka_handle_provide_path must export a variable, named by
 * kafka_handle_name, of type rd_kafka_t *. On success the library is left
 * loaded on purpose, because the borrowed handle lives inside it.
 *
 * @return 0 on success, -1 on failure.
 */
static int ntc_kafka_borrow_shared_handle(void)
{
    if (0 > MESA_load_profile_string_nodef(PROFILE_PATH, PLUGIN_NAME, "kafka_handle_provide_path",
                                           g_ssl_collect_item.kafka_handle_provide_path,
                                           sizeof(g_ssl_collect_item.kafka_handle_provide_path)))
    {
        return -1;
    }
    if (0 > MESA_load_profile_string_nodef(PROFILE_PATH, PLUGIN_NAME, "kafka_handle_name",
                                           g_ssl_collect_item.kafka_handle_name,
                                           sizeof(g_ssl_collect_item.kafka_handle_name)))
    {
        return -1;
    }

    void *dl_handle = dlopen(g_ssl_collect_item.kafka_handle_provide_path, RTLD_NOW | RTLD_GLOBAL);
    if (dl_handle == NULL)
    {
        return -1;
    }

    void *dl_return = dlsym(dl_handle, g_ssl_collect_item.kafka_handle_name);
    if (dl_return != NULL)
    {
        g_ntc_kafka_handle = *(rd_kafka_t **)dl_return;
    }
    if (g_ntc_kafka_handle == NULL)
    {
        dlclose(dl_handle);
        return -1;
    }
    return 0;
}

/*
 * Set up Kafka output according to the "kafka_mode" setting: 0 disables it,
 * a value with the INDIE_KAFKA bits set creates a private producer, anything
 * else borrows a producer exported by another library. On success with a
 * non-zero mode, g_ntc_kafka_topic is ready for rd_kafka_produce.
 *
 * @return 0 on success (including "disabled"), -1 on any failure.
 */
int ntc_ssl_collect_kaka_init()
{
    MESA_load_profile_uint_def(PROFILE_PATH, PLUGIN_NAME, "kafka_mode", &g_ssl_collect_item.comm_log_mode, 3);
    MESA_load_profile_string_def(PROFILE_PATH, PLUGIN_NAME, "kafka_topic", g_ssl_collect_item.kafka_topic,
                                 sizeof(g_ssl_collect_item.kafka_topic), NTC_SSL_COLLECT_TOPIC);

    if (g_ssl_collect_item.comm_log_mode == 0)
    {
        return 0;
    }

    int rc;
    if ((g_ssl_collect_item.comm_log_mode & INDIE_KAFKA) == INDIE_KAFKA)
    {
        rc = ntc_kafka_create_own_handle();
    }
    else
    {
        rc = ntc_kafka_borrow_shared_handle();
    }
    if (rc != 0)
    {
        return -1;
    }

    rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new();
    g_ntc_kafka_topic = rd_kafka_topic_new(g_ntc_kafka_handle, g_ssl_collect_item.kafka_topic, topic_conf);
    if (g_ntc_kafka_topic == NULL)
    {
        return -1;
    }
    return 0;
}

/*
 * Plugin initialisation: load the profile, open the runtime log, register
 * the project slots used to fetch per-stream data, and set up Kafka.
 *
 * @return OK on success, ERROR otherwise.
 */
extern "C" int ntc_ssl_collect_init()
{
    memset(&g_ssl_collect_item, 0, sizeof(g_ssl_collect_item));

    if (0 != ntc_ssl_collect_load_profile())
    {
        return ERROR;
    }

    g_ssl_collect_item.log_handle = MESA_create_runtime_log_handle(g_ssl_collect_item.log_path,
                                                                   g_ssl_collect_item.log_level);
    if (g_ssl_collect_item.log_handle == NULL)
    {
        return ERROR;
    }

    g_ssl_collect_item.dpkt_project_id = project_customer_register(g_ssl_collect_item.dpkt_label, "struct");
    g_ssl_collect_item.tcp_flow_id = project_customer_register("tcp_flow_stat", "struct");

    if (0 != ntc_ssl_collect_kaka_init())
    {
        return ERROR;
    }
    return OK;
}

/* Plugin teardown: closes the runtime log handle. */
extern "C" void ntc_ssl_collect_destroy(void)
{
    MESA_destroy_runtime_log_handle(g_ssl_collect_item.log_handle);
}