summaryrefslogtreecommitdiff
path: root/src/ntc_ip_comm.cpp
diff options
context:
space:
mode:
author[email protected] <[email protected]>2019-04-23 12:20:05 +0800
committer[email protected] <[email protected]>2019-04-23 12:20:05 +0800
commitc6a5d4ed1b042980051567b4dff9fd930e7defa1 (patch)
tree8c2b20e705702206089c3b5cefb81a1e703c33e8 /src/ntc_ip_comm.cpp
parentef55f76fbdbb455881700cbfe0074ec40b3ecc3b (diff)
增加cmakelist以及自动版本号
Diffstat (limited to 'src/ntc_ip_comm.cpp')
-rw-r--r--src/ntc_ip_comm.cpp506
1 files changed, 506 insertions, 0 deletions
diff --git a/src/ntc_ip_comm.cpp b/src/ntc_ip_comm.cpp
new file mode 100644
index 0000000..b758995
--- /dev/null
+++ b/src/ntc_ip_comm.cpp
@@ -0,0 +1,506 @@
+#include "ntc_ip_comm.h"
+#include "soq_sendlog.h"
+
+
+#include <assert.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <netinet/ip.h>
+#include <netinet/ip6.h>
+#include <netinet/udp.h>
+
+#include <net/if.h>
+#include <sys/types.h>
+#include <sys/ioctl.h>
+#include <dlfcn.h>
+
+
+#include "MESA_prof_load.h"
+#include "MESA_handle_logger.h"
+#include "field_stat2.h"
+#include <rdkafka.h>
+#include <cJSON.h>
+
+#define PLUGIN_NAME "NTC_IP_COMM"
+#define PROFILE_PATH "./t1conf/main.conf"
+#define NTC_IP_COMM_TOPIC "ntc_ip_comm_log"
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+#define GIT_VERSION_CATTER(v) __attribute__((__used__)) const char * GIT_VERSION_##v = NULL
+#define GIT_VERSION_EXPEND(v) GIT_VERSION_CATTER(v)
+
+/* VERSION TAG */
+#ifdef GIT_VERSION
+GIT_VERSION_EXPEND(GIT_VERSION);
+#else
+static __attribute__((__used__)) const char * GIT_VERSION_UNKNOWN = NULL;
+#endif
+#undef GIT_VERSION_CATTER
+#undef GIT_VERSION_EXPEND
+
+#ifdef __cplusplus
+}
+#endif
+
+extern long long g_CurrentTime;
+
+ntc_ip_comm_global_item g_ntc_ip_comm_item;
+
+int NTC_IP_COMM_VERSION_20180625 = 1;
+
+rd_kafka_t *g_ntc_ip_comm_kafka_handle = NULL;
+rd_kafka_topic_t *g_ntc_ip_comm_kafka_topic = NULL;
+
+static unsigned int get_ip_by_eth_name(const char *ifname)
+{
+ int sockfd;
+ struct ifreq ifr;
+ unsigned int ip;
+
+ sockfd = socket(AF_INET, SOCK_DGRAM, 0);
+ if (-1 == sockfd) {
+ goto error;
+ }
+
+ strcpy(ifr.ifr_name,ifname);
+ if (ioctl(sockfd, SIOCGIFADDR, &ifr) < 0) {
+ goto error;
+ }
+
+ ip = ((struct sockaddr_in*)&(ifr.ifr_addr))->sin_addr.s_addr;
+ close(sockfd);
+ return ip;
+
+error:
+ close(sockfd);
+ return INADDR_NONE;
+}
+
+static int soq_addStreamInfo_to_jsonObj(cJSON *json_obj, const struct streaminfo *a_stream)
+{
+ int ret = 0;
+ const char *addr_proto = NULL;
+ const char* null_addr="0";
+ short null_port=0;
+ unsigned short tunnel_type=0;
+ char nest_addr_buf[1024] = {0};
+ int tunnel_type_size=sizeof(tunnel_type);
+ const struct layer_addr *addr=NULL;
+ char src_ip_str[128] = {0}, dst_ip_str[128] = {0};
+
+ cJSON_AddNumberToObject(json_obj, "stream_dir", a_stream->dir);
+
+ addr=&(a_stream->addr);
+ switch(addr->addrtype)
+ {
+ case ADDR_TYPE_IPV4:
+ case __ADDR_TYPE_IP_PAIR_V4:
+ inet_ntop(AF_INET, &addr->ipv4->saddr, src_ip_str, sizeof(src_ip_str));
+ inet_ntop(AF_INET, &addr->ipv4->daddr, dst_ip_str, sizeof(dst_ip_str));
+ cJSON_AddStringToObject(json_obj, "s_ip", src_ip_str);
+ cJSON_AddStringToObject(json_obj, "d_ip", dst_ip_str);
+ cJSON_AddNumberToObject(json_obj, "s_port", ntohs(addr->ipv4->source));
+ cJSON_AddNumberToObject(json_obj, "d_port", ntohs(addr->ipv4->dest));
+ break;
+ case ADDR_TYPE_IPV6:
+ case __ADDR_TYPE_IP_PAIR_V6:
+ cJSON_AddNumberToObject(json_obj, "addr_type", addr->addrtype);
+ inet_ntop(AF_INET6, addr->ipv6->saddr, src_ip_str, sizeof(src_ip_str));
+ inet_ntop(AF_INET6, addr->ipv6->daddr, dst_ip_str, sizeof(dst_ip_str));
+ cJSON_AddStringToObject(json_obj, "s_ip", src_ip_str);
+ cJSON_AddStringToObject(json_obj, "d_ip", dst_ip_str);
+ cJSON_AddNumberToObject(json_obj, "s_port", ntohs(addr->ipv6->source));
+ cJSON_AddNumberToObject(json_obj, "d_port", ntohs(addr->ipv6->dest));
+ break;
+ case ADDR_TYPE_VLAN:
+ case ADDR_TYPE_GRE:
+ case ADDR_TYPE_MPLS:
+ case ADDR_TYPE_PPPOE_SES:
+ case ADDR_TYPE_L2TP:
+ case ADDR_TYPE_PPP:
+ cJSON_AddNumberToObject(json_obj, "addr_type", addr->addrtype);
+ cJSON_AddStringToObject(json_obj, "s_ip", null_addr);
+ cJSON_AddStringToObject(json_obj, "d_ip", null_addr);
+ cJSON_AddNumberToObject(json_obj, "s_port", null_port);
+ cJSON_AddNumberToObject(json_obj, "d_port", null_port);
+ break;
+ case ADDR_TYPE_PPTP:
+ cJSON_AddNumberToObject(json_obj, "addr_type", addr->addrtype);
+ cJSON_AddStringToObject(json_obj, "s_ip", null_addr);
+ cJSON_AddStringToObject(json_obj, "d_ip", null_addr);
+ cJSON_AddNumberToObject(json_obj, "s_port", ntohs(addr->pptp->C2S_call_id));
+ cJSON_AddNumberToObject(json_obj, "d_port", ntohs(addr->pptp->S2C_call_id));
+ break;
+ default:
+ break;
+ }
+
+ addr_proto = layer_addr_prefix_ntop(a_stream);
+ cJSON_AddStringToObject(json_obj, "trans_proto", addr_proto);
+
+ ret=MESA_get_stream_opt(a_stream, MSO_STREAM_TUNNEL_TYPE, &tunnel_type, &tunnel_type_size);
+ assert(ret==0);
+ if(tunnel_type==STREAM_TUNNLE_NON)
+ {
+ layer_addr_ntop_r(a_stream,nest_addr_buf, sizeof(nest_addr_buf));
+ }
+ else
+ {
+ stream_addr_list_ntop(a_stream,nest_addr_buf, sizeof(nest_addr_buf));
+ }
+ if(strlen(nest_addr_buf) > 0)
+ {
+ cJSON_AddStringToObject(json_obj, "addr_list", nest_addr_buf);
+ }
+
+ return 0;
+}
+
+
+void ntc_ip_comm_send_kafka_log(rd_kafka_topic_t *topic, struct streaminfo *a_stream, comm_context_t *ctx, char *dpkt_buf, int dpkt_buflen)
+{
+ cJSON *log_obj = cJSON_CreateObject();
+
+ cJSON_AddNumberToObject(log_obj, "service", g_ntc_ip_comm_item.service);
+ cJSON_AddStringToObject(log_obj, "cap_ip", g_ntc_ip_comm_item.local_ip_str);
+ cJSON_AddNumberToObject(log_obj, "entrance_id", g_ntc_ip_comm_item.entry_id);
+
+ cJSON_AddNumberToObject(log_obj, "found_time", g_CurrentTime);
+ cJSON_AddNumberToObject(log_obj, "recv_time", g_CurrentTime);
+ soq_addStreamInfo_to_jsonObj(log_obj, a_stream);
+
+ char tmp[128];
+ sprintf(tmp, "%llu", ctx->c2s_pkts);
+ cJSON_AddStringToObject(log_obj, "c2s_pkt_num", tmp);
+ sprintf(tmp, "%llu", ctx->s2c_pkts);
+ cJSON_AddStringToObject(log_obj, "s2c_pkt_num", tmp);
+
+ sprintf(tmp, "%llu", ctx->c2s_bytes);
+ cJSON_AddStringToObject(log_obj, "c2s_byte_num", tmp);
+
+ sprintf(tmp, "%llu", ctx->s2c_bytes);
+ cJSON_AddStringToObject(log_obj, "s2c_byte_num", tmp);
+ if(dpkt_buf != NULL && dpkt_buflen > 0)
+ {
+ cJSON_AddStringToObject(log_obj, "app_label", dpkt_buf);
+ }
+
+ cJSON_AddNumberToObject(log_obj, "create_time", a_stream->ptcpdetail->createtime);
+ cJSON_AddNumberToObject(log_obj, "lastmtime", a_stream->ptcpdetail->lastmtime);
+
+ char user_region_buf[4096] = " ";
+ snprintf(user_region_buf,sizeof(user_region_buf), "thread=%d;index=%d;hash=%d;", a_stream->threadnum, a_stream->stream_index, a_stream->hash_index);
+ //cJSON_AddNumberToObject(log_obj, "user_region", a_stream->threadnum);
+ cJSON_AddStringToObject(log_obj, "user_region", user_region_buf);
+
+ //char *payload = cJSON_Print(log_obj);
+ char *payload = cJSON_PrintUnformatted(log_obj);
+ int paylen = strlen(payload);
+ if(g_ntc_ip_comm_item.comm_log_mode > 0)
+ {
+ rd_kafka_produce(g_ntc_ip_comm_kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, payload, paylen, NULL, 0, NULL);
+ }
+ MESA_handle_runtime_log(g_ntc_ip_comm_item.log_handle, RLOG_LV_DEBUG, __FUNCTION__ , "%s", payload);
+ free(payload);
+ cJSON_Delete(log_obj);
+ log_obj = NULL;
+ return ;
+}
+
+void ntc_ip_comm_send_ntc_log(struct streaminfo *a_stream, comm_context_t *ctx, char *dpkt_buf, int dpkt_buflen)
+{
+ soq_log_t log_msg;
+ Maat_rule_t maat_rule;
+ struct opt_unit_t opts[5];
+ int opt_num = 4;
+
+ char c2s_pkt[128] = "";
+ char s2c_pkt[128] = "";
+ char c2s_byte[128] = "";
+ char s2c_byte[128] = "";
+
+ memset(&maat_rule, 0, sizeof(maat_rule));
+ maat_rule.action = SOQ_ACTION_MONITOR;
+ maat_rule.config_id = 0;
+ maat_rule.service_id = g_ntc_ip_comm_item.service;
+ maat_rule.do_log = 1;
+ snprintf(maat_rule.service_defined,sizeof(maat_rule.service_defined),"thread=%d;index=%d;hash=%d;", a_stream->threadnum, a_stream->stream_index, a_stream->hash_index);
+ log_msg.stream = a_stream;
+ log_msg.result = &maat_rule;
+ log_msg.result_num =1;
+
+ sprintf(c2s_byte, "%llu", ctx->c2s_bytes);
+ opts[0].opt_len = strlen(c2s_byte);
+ opts[0].opt_type =LOG_OPT_C2S_BYTE_NUM ;
+ opts[0].opt_value = c2s_byte;
+
+ sprintf(s2c_byte, "%llu", ctx->s2c_bytes);
+ opts[1].opt_len = strlen(s2c_byte);
+ opts[1].opt_type =LOG_OPT_S2C_BYTE_NUM ;
+ opts[1].opt_value = s2c_byte;
+
+ sprintf(c2s_pkt, "%llu", ctx->c2s_pkts);
+ opts[2].opt_len = strlen(c2s_pkt);
+ opts[2].opt_type =LOG_OPT_C2S_PKT_NUM ;
+ opts[2].opt_value = c2s_pkt;
+
+ sprintf(s2c_pkt, "%llu", ctx->s2c_pkts);
+ opts[3].opt_len = strlen(s2c_pkt);
+ opts[3].opt_type =LOG_OPT_S2C_PKT_NUM ;
+ opts[3].opt_value = s2c_pkt;
+
+ if(dpkt_buf != NULL && dpkt_buflen > 0)
+ {
+ opts[4].opt_len = dpkt_buflen;
+ opts[4].opt_type =LOG_OPT_APP_LABEL ;
+ opts[4].opt_value =dpkt_buf;
+ opt_num+=1;
+ }
+ soq_send_log(&log_msg, opts, opt_num, a_stream->threadnum);
+ return;
+}
+
+
+int ntc_ip_comm_get_dpkt_label(struct streaminfo *a_stream, const char* label_name, char *label_buf, int *label_buflen)
+{
+ dpkt_lable_t *dpkt_info = (dpkt_lable_t*)project_req_get_struct(a_stream,g_ntc_ip_comm_item.dpkt_project_id);
+ if(dpkt_info == NULL)
+ {
+ *label_buflen = 0;
+ return -1;
+ }
+ snprintf(label_buf, *label_buflen, "PROTO_ID=%u;APP_ID=%u;OS_ID=%u;BS_ID=%u;WEB_ID=%u;BEHAV_ID=%u;",
+ dpkt_info->dpkt_proto_type,
+ dpkt_info->dpkt_app_type,
+ dpkt_info->dpkt_op_type,
+ dpkt_info->dpkt_browser_type,
+ dpkt_info->dpkt_web_type,
+ dpkt_info->dpkt_behavior_type);
+ *label_buflen = strlen(label_buf);
+ return 0;
+}
+
+void ntc_ip_comm_update_counter(comm_context_t *ctx, struct streaminfo *a_stream)
+{
+ if(a_stream->curdir == DIR_C2S)
+ {
+ ctx->c2s_bytes+=a_stream->ptcpdetail->datalen;
+ if(a_stream->ptcpdetail->datalen > 0)ctx->c2s_pkts+=1;
+ }
+ if(a_stream->curdir == DIR_S2C)
+ {
+ ctx->s2c_bytes+=a_stream->ptcpdetail->datalen;
+ if(a_stream->ptcpdetail->datalen > 0)ctx->s2c_pkts+=1;
+ }
+ return;
+}
+
+void ntc_ip_comm_judge_counter(comm_context_t *ctx, struct streaminfo *a_stream)
+{
+ struct tcp_flow_stat *tflow_project;
+ struct udp_flow_stat *uflow_project;
+ comm_context_t sapp_ctx;
+ memset(&sapp_ctx, 0, sizeof(sapp_ctx));
+ if(-1 != g_ntc_ip_comm_item.tcp_flow_id && a_stream->type == STREAM_TYPE_TCP)
+ {
+ tflow_project = (struct tcp_flow_stat *)project_req_get_struct(a_stream, g_ntc_ip_comm_item.tcp_flow_id);
+ if(tflow_project != NULL)
+ {
+ sapp_ctx.c2s_bytes = tflow_project->C2S_data_byte;
+ sapp_ctx.s2c_bytes = tflow_project->S2C_data_byte;
+ sapp_ctx.c2s_pkts = tflow_project->C2S_data_pkt;
+ sapp_ctx.s2c_pkts = tflow_project->S2C_data_pkt;
+ }
+ }
+
+ if(-1 != g_ntc_ip_comm_item.udp_flow_id && a_stream->type == STREAM_TYPE_UDP)
+ {
+ uflow_project = (struct udp_flow_stat *)project_req_get_struct(a_stream, g_ntc_ip_comm_item.udp_flow_id);
+ if(uflow_project != NULL)
+ {
+ sapp_ctx.c2s_bytes = uflow_project->C2S_byte;
+ sapp_ctx.s2c_bytes = uflow_project->S2C_byte;
+ sapp_ctx.c2s_pkts = uflow_project->C2S_pkt;
+ sapp_ctx.s2c_pkts = uflow_project->S2C_pkt;
+ }
+ }
+
+ if(memcmp(&sapp_ctx, ctx, sizeof(comm_context_t)) != 0)
+ {
+ MESA_handle_runtime_log(g_ntc_ip_comm_item.log_handle, RLOG_LV_INFO, __FUNCTION__ , "sapp_ctx diff with counter, sapp_ctx:%llu:%llu->%llu:%llu, counter:%llu:%llu->%llu:%llu, %s",
+ sapp_ctx.c2s_pkts, sapp_ctx.c2s_bytes,
+ sapp_ctx.s2c_pkts, sapp_ctx.s2c_bytes,
+ ctx->c2s_pkts, ctx->c2s_bytes,
+ ctx->s2c_pkts, ctx->s2c_bytes,
+ printaddr(&a_stream->addr, a_stream->threadnum));
+ }
+ return;
+}
+
+extern "C" UCHAR ntc_ip_comm_transfer_entry(struct streaminfo *a_stream, void **pme, int thread_seq,const void *raw_pkt)
+{
+ unsigned char ret = APP_STATE_GIVEME;
+ char label_buf[128] = {0};
+ int label_buflen = sizeof(label_buf);
+ comm_context_t *ctx = NULL;
+ switch (a_stream->opstate)
+ {
+ case OP_STATE_PENDING:
+ ctx = (comm_context_t *)calloc(sizeof(comm_context_t ), 1);
+ ntc_ip_comm_update_counter(ctx, a_stream);
+ *pme = ctx;
+ break;
+ case OP_STATE_DATA:
+ ctx = (comm_context_t *)*pme;
+ ntc_ip_comm_update_counter(ctx, a_stream);
+ break;
+ case OP_STATE_CLOSE:
+ if(*pme != NULL)
+ {
+ ctx = (comm_context_t *)*pme;
+ ntc_ip_comm_update_counter(ctx, a_stream);
+ ntc_ip_comm_judge_counter(ctx, a_stream);
+ if(ctx->c2s_bytes+ctx->s2c_bytes >= g_ntc_ip_comm_item.min_bytes &&ctx->c2s_pkts+ctx->s2c_pkts >= g_ntc_ip_comm_item.min_pkts)
+ {
+ ntc_ip_comm_get_dpkt_label(a_stream, g_ntc_ip_comm_item.dpkt_label, label_buf, &label_buflen);
+ if((g_ntc_ip_comm_item.comm_log_mode&SEND_LOG) == SEND_LOG)
+ {
+ ntc_ip_comm_send_ntc_log(a_stream, ctx, label_buf, label_buflen);
+ }
+ //if((g_ntc_ip_comm_item.comm_log_mode&SEND_KAFKA) == SEND_KAFKA)
+ else
+ {
+ ntc_ip_comm_send_kafka_log(g_ntc_ip_comm_kafka_topic, a_stream, ctx, label_buf, label_buflen);
+ }
+ }
+ free(ctx);
+ ctx = NULL;
+ ret= APP_STATE_DROPME;
+ }
+ break;
+ }
+ return ret ;
+}
+
+void ntc_ip_comm_load_profile()
+{
+ MESA_load_profile_string_def(PROFILE_PATH, PLUGIN_NAME, "log_path", g_ntc_ip_comm_item.log_path, sizeof(g_ntc_ip_comm_item.log_path), "./t1log/ip_comm_log");
+ MESA_load_profile_uint_def(PROFILE_PATH,PLUGIN_NAME, "log_level", &g_ntc_ip_comm_item.log_level, 30);
+
+ char nic_name[64];
+ MESA_load_profile_string_def(PROFILE_PATH,"SYSTEM", "NIC_NAME",nic_name,sizeof(nic_name),"eth0");
+ g_ntc_ip_comm_item.local_ip_nr=get_ip_by_eth_name(nic_name);
+ if(g_ntc_ip_comm_item.local_ip_nr==INADDR_NONE)
+ {
+ printf("get_ip_by_eth_name in %s return NULL, exit!\n", nic_name);
+ exit(0);
+ }
+ inet_ntop(AF_INET,&(g_ntc_ip_comm_item.local_ip_nr),g_ntc_ip_comm_item.local_ip_str,sizeof(g_ntc_ip_comm_item.local_ip_str));
+ MESA_load_profile_int_def(PROFILE_PATH,"SYSTEM", "ENTRANCE_ID",&(g_ntc_ip_comm_item.entry_id),0);
+
+
+ MESA_load_profile_uint_def(PROFILE_PATH,PLUGIN_NAME, "min_pkts", &g_ntc_ip_comm_item.min_pkts, 5);
+ MESA_load_profile_uint_def(PROFILE_PATH,PLUGIN_NAME, "min_bytes", &g_ntc_ip_comm_item.min_bytes, 5);
+
+ MESA_load_profile_int_def(PROFILE_PATH,PLUGIN_NAME, "service", &g_ntc_ip_comm_item.service, 0);
+
+ MESA_load_profile_uint_def(PROFILE_PATH,PLUGIN_NAME, "comm_log_mode", &g_ntc_ip_comm_item.comm_log_mode, 0);
+
+ MESA_load_profile_string_def(PROFILE_PATH, PLUGIN_NAME, "dpkt_label", g_ntc_ip_comm_item.dpkt_label, sizeof(g_ntc_ip_comm_item.dpkt_label), "DPKT_PROJECT");
+ return ;
+}
+
+int ntc_ip_comm_kaka_init(int kafka_mode)
+{
+ char kafka_errstr[1024];
+ MESA_load_profile_string_def(PROFILE_PATH, PLUGIN_NAME, "kafka_topic", g_ntc_ip_comm_item.kafka_topic, sizeof(g_ntc_ip_comm_item.kafka_topic), NTC_IP_COMM_TOPIC);
+ if((kafka_mode&INDIE_KAFKA) == INDIE_KAFKA)
+ {
+ if(0 > MESA_load_profile_string_nodef(PROFILE_PATH, PLUGIN_NAME, "kafka_brokelist", g_ntc_ip_comm_item.kafka_brokelist, sizeof(g_ntc_ip_comm_item.kafka_brokelist)))
+ {
+ return -1;
+ }
+ rd_kafka_conf_t *rdkafka_conf = rd_kafka_conf_new();
+ rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", "600000",kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(rdkafka_conf, "security.protocol", "MG", kafka_errstr, sizeof(kafka_errstr));
+
+ if (!(g_ntc_ip_comm_kafka_handle = rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr))))
+ {
+ return -1;
+ }
+
+ if (rd_kafka_brokers_add(g_ntc_ip_comm_kafka_handle, g_ntc_ip_comm_item.kafka_brokelist) == 0)
+ {
+ return -1;
+ }
+ }
+ else
+ {
+ if(0 > MESA_load_profile_string_nodef(PROFILE_PATH, PLUGIN_NAME, "kafka_handle_provide_path", g_ntc_ip_comm_item.kafka_handle_provide_path, sizeof(g_ntc_ip_comm_item.kafka_handle_provide_path)))
+ {
+ return -1;
+ }
+ void * dl_handle = dlopen(g_ntc_ip_comm_item.kafka_handle_provide_path, RTLD_NOW|RTLD_GLOBAL);
+ if(dl_handle == NULL)
+ {
+ return -1;
+ }
+ if(0 > MESA_load_profile_string_nodef(PROFILE_PATH, PLUGIN_NAME, "kafka_handle_name", g_ntc_ip_comm_item.kafka_handle_name, sizeof(g_ntc_ip_comm_item.kafka_handle_name)))
+ {
+ return -1;
+ }
+ void *dl_return = dlsym(dl_handle, g_ntc_ip_comm_item.kafka_handle_name);
+
+ if(dl_return == NULL)
+ {
+ return -1;
+ }
+ g_ntc_ip_comm_kafka_handle = *(rd_kafka_t **)dl_return;
+ if(g_ntc_ip_comm_kafka_handle == NULL)
+ {
+ return -1;
+ }
+
+ }
+ rd_kafka_topic_conf_t*ip_comm_topic_conf = rd_kafka_topic_conf_new();
+ g_ntc_ip_comm_kafka_topic = rd_kafka_topic_new(g_ntc_ip_comm_kafka_handle, g_ntc_ip_comm_item.kafka_topic, ip_comm_topic_conf);
+ return 0;
+}
+
+extern "C" int ntc_ip_comm_init()
+{
+ memset(&g_ntc_ip_comm_item, 0, sizeof(g_ntc_ip_comm_item));
+ ntc_ip_comm_load_profile();
+ g_ntc_ip_comm_item.log_handle = MESA_create_runtime_log_handle(g_ntc_ip_comm_item.log_path, g_ntc_ip_comm_item.log_level);
+ if(g_ntc_ip_comm_item.log_handle == NULL)
+ return ERROR;
+
+
+ g_ntc_ip_comm_item.dpkt_project_id = project_customer_register(g_ntc_ip_comm_item.dpkt_label, "struct");
+ g_ntc_ip_comm_item.tcp_flow_id = project_customer_register("tcp_flow_stat", "struct");
+ g_ntc_ip_comm_item.udp_flow_id = project_customer_register("udp_flow_stat", "struct");
+
+ if((g_ntc_ip_comm_item.comm_log_mode&SEND_KAFKA) == SEND_KAFKA)
+ {
+ if(0 != ntc_ip_comm_kaka_init(g_ntc_ip_comm_item.comm_log_mode))
+ return ERROR;
+ }
+ return OK;
+}
+
+
+
+extern "C" void ntc_ip_comm_destroy(void)
+{
+ MESA_destroy_runtime_log_handle(g_ntc_ip_comm_item.log_handle);
+ return;
+}