/*
 * NTC_IP_COMM plugin: counts per-stream payload bytes/packets in both
 * directions and, when a stream closes, emits one summary record either
 * through soq_send_log() or as a JSON message produced to Kafka.
 */
#include "ntc_ip_comm.h"
#include "soq_sendlog.h"

/*
 * NOTE(review): in the copy of this file under review every angle-bracket
 * include had lost its header name.  The system headers below are the ones
 * required by the calls made in this file.  The project / third-party
 * headers that declare struct streaminfo, project_req_get_struct(),
 * Maat_rule_t, cJSON and the rd_kafka_* API could not be recovered and must
 * be restored from version control.
 */
#include <arpa/inet.h>
#include <assert.h>
#include <dlfcn.h>
#include <net/if.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include "MESA_prof_load.h"
#include "MESA_handle_logger.h"
#include "field_stat2.h"
#include "stream_internal.h"

#define PLUGIN_NAME "NTC_IP_COMM"
#define PROFILE_PATH "./t1conf/main.conf"
#define NTC_IP_COMM_TOPIC "ntc_ip_comm_log"

#ifdef __cplusplus
extern "C"
{
#endif

#define GIT_VERSION_CATTER(v) __attribute__((__used__)) const char * GIT_VERSION_##v = NULL
#define GIT_VERSION_EXPEND(v) GIT_VERSION_CATTER(v)

/* VERSION TAG */
#ifdef GIT_VERSION
GIT_VERSION_EXPEND(GIT_VERSION);
#else
static __attribute__((__used__)) const char * GIT_VERSION_UNKNOWN = NULL;
#endif
#undef GIT_VERSION_CATTER
#undef GIT_VERSION_EXPEND

#ifdef __cplusplus
}
#endif

extern long long g_CurrentTime;

ntc_ip_comm_global_item g_ntc_ip_comm_item;
int NTC_IP_COMM_VERSION_20180625 = 1;
rd_kafka_t *g_ntc_ip_comm_kafka_handle = NULL;
rd_kafka_topic_t *g_ntc_ip_comm_kafka_topic = NULL;

/*
 * Look up the IPv4 address bound to interface `ifname` via SIOCGIFADDR.
 *
 * Returns the address as stored in sin_addr.s_addr, or INADDR_NONE when the
 * name is NULL, does not fit in ifr_name, or the socket/ioctl call fails.
 */
static unsigned int get_ip_by_eth_name(const char *ifname)
{
    struct ifreq ifr;
    unsigned int ip;
    int sockfd;

    /* Reject names that would overflow ifr_name instead of copying blindly. */
    if (ifname == NULL || strlen(ifname) >= sizeof(ifr.ifr_name))
    {
        return INADDR_NONE;
    }

    sockfd = socket(AF_INET, SOCK_DGRAM, 0);
    if (-1 == sockfd)
    {
        /* Nothing to close: the descriptor was never opened. */
        return INADDR_NONE;
    }

    memset(&ifr, 0, sizeof(ifr));
    snprintf(ifr.ifr_name, sizeof(ifr.ifr_name), "%s", ifname);

    if (ioctl(sockfd, SIOCGIFADDR, &ifr) < 0)
    {
        close(sockfd);
        return INADDR_NONE;
    }

    ip = ((struct sockaddr_in *)&(ifr.ifr_addr))->sin_addr.s_addr;
    close(sockfd);
    return ip;
}

/*
 * Append the addressing fields of `a_stream` to `json_obj`:
 * "stream_dir", "s_ip", "d_ip", "s_port", "d_port", "trans_proto" and, when
 * the textual address list is non-empty, "addr_list".  Non-IPv4 address
 * types additionally get "addr_type".
 *
 * Always returns 0.
 *
 * NOTE(review): the IPv4 branch is the only one that does not emit
 * "addr_type"; left unchanged because consumers may rely on the current
 * record layout - confirm whether that omission is intentional.
 */
static int soq_addStreamInfo_to_jsonObj(cJSON *json_obj, const struct streaminfo *a_stream)
{
    const char *null_addr = "0";
    const short null_port = 0;
    const char *addr_proto = NULL;
    const struct layer_addr *addr = NULL;
    unsigned short tunnel_type = 0;
    int tunnel_type_size = sizeof(tunnel_type);
    char nest_addr_buf[1024] = {0};
    char src_ip_str[128] = {0};
    char dst_ip_str[128] = {0};
    int ret = 0;

    cJSON_AddNumberToObject(json_obj, "stream_dir", a_stream->dir);

    addr = &(a_stream->addr);
    switch (addr->addrtype)
    {
    case ADDR_TYPE_IPV4:
    case __ADDR_TYPE_IP_PAIR_V4:
        inet_ntop(AF_INET, &addr->ipv4->saddr, src_ip_str, sizeof(src_ip_str));
        inet_ntop(AF_INET, &addr->ipv4->daddr, dst_ip_str, sizeof(dst_ip_str));
        cJSON_AddStringToObject(json_obj, "s_ip", src_ip_str);
        cJSON_AddStringToObject(json_obj, "d_ip", dst_ip_str);
        cJSON_AddNumberToObject(json_obj, "s_port", ntohs(addr->ipv4->source));
        cJSON_AddNumberToObject(json_obj, "d_port", ntohs(addr->ipv4->dest));
        break;

    case ADDR_TYPE_IPV6:
    case __ADDR_TYPE_IP_PAIR_V6:
        cJSON_AddNumberToObject(json_obj, "addr_type", addr->addrtype);
        inet_ntop(AF_INET6, addr->ipv6->saddr, src_ip_str, sizeof(src_ip_str));
        inet_ntop(AF_INET6, addr->ipv6->daddr, dst_ip_str, sizeof(dst_ip_str));
        cJSON_AddStringToObject(json_obj, "s_ip", src_ip_str);
        cJSON_AddStringToObject(json_obj, "d_ip", dst_ip_str);
        cJSON_AddNumberToObject(json_obj, "s_port", ntohs(addr->ipv6->source));
        cJSON_AddNumberToObject(json_obj, "d_port", ntohs(addr->ipv6->dest));
        break;

    case ADDR_TYPE_VLAN:
    case ADDR_TYPE_GRE:
    case ADDR_TYPE_MPLS:
    case ADDR_TYPE_PPPOE_SES:
    case ADDR_TYPE_L2TP:
    case ADDR_TYPE_PPP:
        /* These layers carry no IP/port pair; emit placeholders. */
        cJSON_AddNumberToObject(json_obj, "addr_type", addr->addrtype);
        cJSON_AddStringToObject(json_obj, "s_ip", null_addr);
        cJSON_AddStringToObject(json_obj, "d_ip", null_addr);
        cJSON_AddNumberToObject(json_obj, "s_port", null_port);
        cJSON_AddNumberToObject(json_obj, "d_port", null_port);
        break;

    case ADDR_TYPE_PPTP:
        /* PPTP call ids are reported in the port fields. */
        cJSON_AddNumberToObject(json_obj, "addr_type", addr->addrtype);
        cJSON_AddStringToObject(json_obj, "s_ip", null_addr);
        cJSON_AddStringToObject(json_obj, "d_ip", null_addr);
        cJSON_AddNumberToObject(json_obj, "s_port", ntohs(addr->pptp->C2S_call_id));
        cJSON_AddNumberToObject(json_obj, "d_port", ntohs(addr->pptp->S2C_call_id));
        break;

    default:
        break;
    }

    addr_proto = layer_addr_prefix_ntop(a_stream);
    if (addr_proto != NULL)
    {
        cJSON_AddStringToObject(json_obj, "trans_proto", addr_proto);
    }

    /*
     * The original asserted on this call, which aborts the process in debug
     * builds and silently ignores the failure under NDEBUG.  Handle it at
     * run time instead by treating the stream as non-tunnelled.
     */
    ret = MESA_get_stream_opt(a_stream, MSO_STREAM_TUNNEL_TYPE, &tunnel_type, &tunnel_type_size);
    if (ret != 0)
    {
        tunnel_type = STREAM_TUNNLE_NON;
    }

    if (tunnel_type == STREAM_TUNNLE_NON)
    {
        layer_addr_ntop_r(a_stream, nest_addr_buf, sizeof(nest_addr_buf));
    }
    else
    {
        stream_addr_list_ntop(a_stream, nest_addr_buf, sizeof(nest_addr_buf));
    }

    if (strlen(nest_addr_buf) > 0)
    {
        cJSON_AddStringToObject(json_obj, "addr_list", nest_addr_buf);
    }

    return 0;
}

/*
 * Build the JSON summary record for `a_stream`, write it to the runtime log
 * at debug level and, when comm_log_mode > 0 and a Kafka topic exists,
 * produce it to g_ntc_ip_comm_kafka_topic.
 *
 * topic        - unused; the global topic handle is what gets produced to
 *                (kept for interface compatibility).
 * ctx          - per-stream counters to report.
 * dpkt_buf/len - optional "app_label" value, skipped when NULL or empty.
 * user_buf/len - optional "user_region" value, skipped when NULL or empty.
 */
void ntc_ip_comm_send_kafka_log(rd_kafka_topic_t *topic, struct streaminfo *a_stream, comm_context_t *ctx, char *dpkt_buf, int dpkt_buflen, char *user_buf, int user_buflen)
{
    char num_buf[128];
    char *payload = NULL;
    cJSON *log_obj = NULL;

    (void)topic;

    log_obj = cJSON_CreateObject();
    if (log_obj == NULL)
    {
        return;
    }

    cJSON_AddNumberToObject(log_obj, "service", g_ntc_ip_comm_item.service);
    cJSON_AddStringToObject(log_obj, "cap_ip", g_ntc_ip_comm_item.local_ip_str);
    cJSON_AddNumberToObject(log_obj, "entrance_id", g_ntc_ip_comm_item.entry_id);
    cJSON_AddNumberToObject(log_obj, "found_time", g_CurrentTime);
    cJSON_AddNumberToObject(log_obj, "recv_time", g_CurrentTime);

    soq_addStreamInfo_to_jsonObj(log_obj, a_stream);

    /* Counters are emitted as strings, not JSON numbers. */
    snprintf(num_buf, sizeof(num_buf), "%llu", ctx->c2s_pkts);
    cJSON_AddStringToObject(log_obj, "c2s_pkt_num", num_buf);
    snprintf(num_buf, sizeof(num_buf), "%llu", ctx->s2c_pkts);
    cJSON_AddStringToObject(log_obj, "s2c_pkt_num", num_buf);
    snprintf(num_buf, sizeof(num_buf), "%llu", ctx->c2s_bytes);
    cJSON_AddStringToObject(log_obj, "c2s_byte_num", num_buf);
    snprintf(num_buf, sizeof(num_buf), "%llu", ctx->s2c_bytes);
    cJSON_AddStringToObject(log_obj, "s2c_byte_num", num_buf);

    if (dpkt_buf != NULL && dpkt_buflen > 0)
    {
        cJSON_AddStringToObject(log_obj, "app_label", dpkt_buf);
    }

    /* NOTE(review): ptcpdetail is read for every stream type - confirm it is valid for UDP streams. */
    cJSON_AddNumberToObject(log_obj, "create_time", a_stream->ptcpdetail->createtime);
    cJSON_AddNumberToObject(log_obj, "lastmtime", a_stream->ptcpdetail->lastmtime);

    if (user_buf != NULL && user_buflen > 0)
    {
        cJSON_AddStringToObject(log_obj, "user_region", user_buf);
    }

    payload = cJSON_PrintUnformatted(log_obj);
    if (payload != NULL)
    {
        /*
         * comm_log_mode can be non-zero without the Kafka bit, in which case
         * the topic was never created; never hand a NULL topic to librdkafka.
         */
        if (g_ntc_ip_comm_item.comm_log_mode > 0 && g_ntc_ip_comm_kafka_topic != NULL)
        {
            if (rd_kafka_produce(g_ntc_ip_comm_kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, payload, strlen(payload), NULL, 0, NULL) != 0)
            {
                MESA_handle_runtime_log(g_ntc_ip_comm_item.log_handle, RLOG_LV_INFO, __FUNCTION__, "rd_kafka_produce failed, record dropped");
            }
        }

        MESA_handle_runtime_log(g_ntc_ip_comm_item.log_handle, RLOG_LV_DEBUG, __FUNCTION__ , "%s", payload);
        free(payload);
    }

    cJSON_Delete(log_obj);
    return;
}

/*
 * Report the stream summary through soq_send_log().
 *
 * The four direction counters are always sent as decimal strings; the
 * application label is appended as a fifth option when `dpkt_buf` is
 * non-NULL and `dpkt_buflen` > 0.  When `user_buflen` > 0, `user_buf` is
 * copied into the rule's service_defined field.
 */
void ntc_ip_comm_send_ntc_log(struct streaminfo *a_stream, comm_context_t *ctx, char *dpkt_buf, int dpkt_buflen, char *user_buf, int user_buflen)
{
    soq_log_t log_msg;
    Maat_rule_t maat_rule;
    struct opt_unit_t opts[5];
    int opt_num = 4;
    char c2s_pkt[128] = "";
    char s2c_pkt[128] = "";
    char c2s_byte[128] = "";
    char s2c_byte[128] = "";

    memset(&maat_rule, 0, sizeof(maat_rule));
    maat_rule.action = SOQ_ACTION_MONITOR;
    maat_rule.config_id = 0;
    maat_rule.service_id = g_ntc_ip_comm_item.service;
    maat_rule.do_log = 1;
    if (user_buf != NULL && user_buflen > 0)
    {
        snprintf(maat_rule.service_defined, sizeof(maat_rule.service_defined), "%s", user_buf);
    }

    /* Zero the message first so fields not set here are not stack garbage. */
    memset(&log_msg, 0, sizeof(log_msg));
    log_msg.stream = a_stream;
    log_msg.result = &maat_rule;
    log_msg.result_num = 1;

    snprintf(c2s_byte, sizeof(c2s_byte), "%llu", ctx->c2s_bytes);
    opts[0].opt_len = strlen(c2s_byte);
    opts[0].opt_type = LOG_OPT_C2S_BYTE_NUM;
    opts[0].opt_value = c2s_byte;

    snprintf(s2c_byte, sizeof(s2c_byte), "%llu", ctx->s2c_bytes);
    opts[1].opt_len = strlen(s2c_byte);
    opts[1].opt_type = LOG_OPT_S2C_BYTE_NUM;
    opts[1].opt_value = s2c_byte;

    snprintf(c2s_pkt, sizeof(c2s_pkt), "%llu", ctx->c2s_pkts);
    opts[2].opt_len = strlen(c2s_pkt);
    opts[2].opt_type = LOG_OPT_C2S_PKT_NUM;
    opts[2].opt_value = c2s_pkt;

    snprintf(s2c_pkt, sizeof(s2c_pkt), "%llu", ctx->s2c_pkts);
    opts[3].opt_len = strlen(s2c_pkt);
    opts[3].opt_type = LOG_OPT_S2C_PKT_NUM;
    opts[3].opt_value = s2c_pkt;

    if (dpkt_buf != NULL && dpkt_buflen > 0)
    {
        opts[4].opt_len = dpkt_buflen;
        opts[4].opt_type = LOG_OPT_APP_LABEL;
        opts[4].opt_value = dpkt_buf;
        opt_num += 1;
    }

    soq_send_log(&log_msg, opts, opt_num, a_stream->threadnum);
    return;
}

/*
 * Format "thread=..;index=..;hash=..;killed=.." for `a_stream` into
 * `user_buf`.
 *
 * user_buflen - in: capacity of user_buf; out: length of the formatted
 *               string, or 0 on failure.
 * killed is 1 only when after_kill_switch is enabled and the stream's
 * private stream_killed_flag is 1.
 *
 * Returns 0 on success, -1 when an argument is NULL or the capacity is not
 * positive.
 */
int ntc_ip_comm_assemble_user_buf(struct streaminfo *a_stream, char *user_buf, int *user_buflen)
{
    int killed_flag = 0;

    if (user_buflen == NULL)
    {
        return -1;
    }
    if (a_stream == NULL || user_buf == NULL || *user_buflen <= 0)
    {
        *user_buflen = 0;
        return -1;
    }

    if (g_ntc_ip_comm_item.after_kill_switch == 1)
    {
        struct streaminfo_private *pstream_pr = (struct streaminfo_private *)a_stream;
        if (pstream_pr->stream_killed_flag == 1)
        {
            killed_flag = 1;
        }
    }

    snprintf(user_buf, *user_buflen, "thread=%d;index=%d;hash=%d;killed=%d", a_stream->threadnum, a_stream->stream_index, a_stream->hash_index, killed_flag);
    /* Report the real length, matching ntc_ip_comm_get_dpkt_label(). */
    *user_buflen = (int)strlen(user_buf);
    return 0;
}

/*
 * Format the DPKT classification attached to `a_stream` (looked up through
 * dpkt_project_id) into `label_buf`.
 *
 * label_name   - unused.
 * label_buflen - in: capacity of label_buf; out: length of the formatted
 *                string, or 0 on failure.
 *
 * Returns 0 on success, -1 when no label is available or an argument is
 * invalid.
 */
int ntc_ip_comm_get_dpkt_label(struct streaminfo *a_stream, const char* label_name, char *label_buf, int *label_buflen)
{
    dpkt_lable_t *dpkt_info = NULL;

    (void)label_name;

    if (label_buflen == NULL)
    {
        return -1;
    }
    /* -1 is treated as "not registered", as done for the flow-stat ids. */
    if (a_stream == NULL || label_buf == NULL || *label_buflen <= 0 || -1 == g_ntc_ip_comm_item.dpkt_project_id)
    {
        *label_buflen = 0;
        return -1;
    }

    dpkt_info = (dpkt_lable_t *)project_req_get_struct(a_stream, g_ntc_ip_comm_item.dpkt_project_id);
    if (dpkt_info == NULL)
    {
        *label_buflen = 0;
        return -1;
    }

    snprintf(label_buf, *label_buflen, "PROTO_ID=%u;APP_ID=%u;OS_ID=%u;BS_ID=%u;WEB_ID=%u;BEHAV_ID=%u;", dpkt_info->dpkt_proto_type, dpkt_info->dpkt_app_type, dpkt_info->dpkt_op_type, dpkt_info->dpkt_browser_type, dpkt_info->dpkt_web_type, dpkt_info->dpkt_behavior_type);
    *label_buflen = (int)strlen(label_buf);
    return 0;
}

/*
 * Add the current packet's payload length to the counters of the direction
 * it travels in.  A packet is counted only when it carries payload
 * (datalen > 0).
 *
 * NOTE(review): datalen is read through ptcpdetail for every stream type -
 * confirm this is valid when the plugin is attached to UDP streams.
 */
void ntc_ip_comm_update_counter(comm_context_t *ctx, struct streaminfo *a_stream)
{
    if (ctx == NULL || a_stream == NULL)
    {
        return;
    }

    if (a_stream->curdir == DIR_C2S)
    {
        ctx->c2s_bytes += a_stream->ptcpdetail->datalen;
        if (a_stream->ptcpdetail->datalen > 0)
        {
            ctx->c2s_pkts += 1;
        }
    }

    if (a_stream->curdir == DIR_S2C)
    {
        ctx->s2c_bytes += a_stream->ptcpdetail->datalen;
        if (a_stream->ptcpdetail->datalen > 0)
        {
            ctx->s2c_pkts += 1;
        }
    }
    return;
}

/*
 * Cross-check the plugin's own counters against the tcp_flow_stat /
 * udp_flow_stat project data for the stream and log any difference at
 * info level.
 *
 * The four counters are compared field by field: a memcmp over the whole
 * struct would also compare padding and any other member of
 * comm_context_t.  When no reference statistics are available nothing is
 * logged, because there is nothing to compare against.
 */
void ntc_ip_comm_judge_counter(comm_context_t *ctx, struct streaminfo *a_stream)
{
    struct tcp_flow_stat *tflow_project = NULL;
    struct udp_flow_stat *uflow_project = NULL;
    comm_context_t sapp_ctx;
    int have_reference = 0;

    if (ctx == NULL || a_stream == NULL)
    {
        return;
    }

    memset(&sapp_ctx, 0, sizeof(sapp_ctx));

    if (-1 != g_ntc_ip_comm_item.tcp_flow_id && a_stream->type == STREAM_TYPE_TCP)
    {
        tflow_project = (struct tcp_flow_stat *)project_req_get_struct(a_stream, g_ntc_ip_comm_item.tcp_flow_id);
        if (tflow_project != NULL)
        {
            sapp_ctx.c2s_bytes = tflow_project->C2S_data_byte;
            sapp_ctx.s2c_bytes = tflow_project->S2C_data_byte;
            sapp_ctx.c2s_pkts = tflow_project->C2S_data_pkt;
            sapp_ctx.s2c_pkts = tflow_project->S2C_data_pkt;
            have_reference = 1;
        }
    }

    if (-1 != g_ntc_ip_comm_item.udp_flow_id && a_stream->type == STREAM_TYPE_UDP)
    {
        uflow_project = (struct udp_flow_stat *)project_req_get_struct(a_stream, g_ntc_ip_comm_item.udp_flow_id);
        if (uflow_project != NULL)
        {
            sapp_ctx.c2s_bytes = uflow_project->C2S_byte;
            sapp_ctx.s2c_bytes = uflow_project->S2C_byte;
            sapp_ctx.c2s_pkts = uflow_project->C2S_pkt;
            sapp_ctx.s2c_pkts = uflow_project->S2C_pkt;
            have_reference = 1;
        }
    }

    if (!have_reference)
    {
        return;
    }

    if (sapp_ctx.c2s_bytes != ctx->c2s_bytes
        || sapp_ctx.s2c_bytes != ctx->s2c_bytes
        || sapp_ctx.c2s_pkts != ctx->c2s_pkts
        || sapp_ctx.s2c_pkts != ctx->s2c_pkts)
    {
        MESA_handle_runtime_log(g_ntc_ip_comm_item.log_handle, RLOG_LV_INFO, __FUNCTION__ , "sapp_ctx diff with counter, sapp_ctx:%llu:%llu->%llu:%llu, counter:%llu:%llu->%llu:%llu, %s", sapp_ctx.c2s_pkts, sapp_ctx.c2s_bytes, sapp_ctx.s2c_pkts, sapp_ctx.s2c_bytes, ctx->c2s_pkts, ctx->c2s_bytes, ctx->s2c_pkts, ctx->s2c_bytes, printaddr(&a_stream->addr, a_stream->threadnum));
    }
    return;
}

/*
 * Shared state machine behind both stream entry points.
 *
 * OP_STATE_PENDING - allocate the per-stream counters and store them in *pme.
 * OP_STATE_DATA    - update the counters.
 * OP_STATE_CLOSE   - update and cross-check the counters, emit a record when
 *                    both the min_bytes and min_pkts thresholds are reached,
 *                    then free the counters.
 *
 * Returns APP_STATE_GIVEME to keep receiving the stream.  Returns
 * APP_STATE_DROPME after the close-time cleanup, or when there is no
 * per-stream context to work with (allocation failed, or data arrived
 * without one).
 */
UCHAR ntc_ip_comm_transfer_process(struct streaminfo *a_stream, void **pme, int thread_seq,const void *raw_pkt, UCHAR opstate)
{
    unsigned char ret = APP_STATE_GIVEME;
    char label_buf[128] = {0};
    char user_buf[128] = {0};
    int label_buflen = sizeof(label_buf);
    int user_buflen = sizeof(user_buf);
    comm_context_t *ctx = NULL;

    (void)thread_seq;
    (void)raw_pkt;

    switch (opstate)
    {
    case OP_STATE_PENDING:
        ctx = (comm_context_t *)calloc(1, sizeof(comm_context_t));
        *pme = ctx;
        if (ctx == NULL)
        {
            /* Out of memory: give the stream up rather than dereference NULL later. */
            ret = APP_STATE_DROPME;
            break;
        }
        ntc_ip_comm_update_counter(ctx, a_stream);
        break;

    case OP_STATE_DATA:
        ctx = (comm_context_t *)*pme;
        if (ctx == NULL)
        {
            ret = APP_STATE_DROPME;
            break;
        }
        ntc_ip_comm_update_counter(ctx, a_stream);
        break;

    case OP_STATE_CLOSE:
        if (*pme != NULL)
        {
            ctx = (comm_context_t *)*pme;
            ntc_ip_comm_update_counter(ctx, a_stream);
            ntc_ip_comm_judge_counter(ctx, a_stream);

            if (ctx->c2s_bytes + ctx->s2c_bytes >= g_ntc_ip_comm_item.min_bytes
                && ctx->c2s_pkts + ctx->s2c_pkts >= g_ntc_ip_comm_item.min_pkts)
            {
                ntc_ip_comm_get_dpkt_label(a_stream, g_ntc_ip_comm_item.dpkt_label, label_buf, &label_buflen);
                ntc_ip_comm_assemble_user_buf(a_stream, user_buf, &user_buflen);

                if ((g_ntc_ip_comm_item.comm_log_mode & SEND_LOG) == SEND_LOG)
                {
                    ntc_ip_comm_send_ntc_log(a_stream, ctx, label_buf, label_buflen, user_buf, user_buflen);
                }
                else
                {
                    ntc_ip_comm_send_kafka_log(g_ntc_ip_comm_kafka_topic, a_stream, ctx, label_buf, label_buflen, user_buf, user_buflen);
                }
            }

            free(ctx);
            ctx = NULL;
            /* Do not leave a dangling pointer behind in the caller's slot. */
            *pme = NULL;
            ret = APP_STATE_DROPME;
        }
        break;

    default:
        break;
    }

    return ret;
}

/*
 * Entry point driven by a_stream->pktstate.  When after_kill_switch is
 * enabled, MSO_TCPALL_VALID_AFTER_KILL is set on the stream at its first
 * (pending) packet.
 */
extern "C" UCHAR ntc_ip_comm_tcpall_entry(struct streaminfo *a_stream, void **pme, int thread_seq,const void *raw_pkt)
{
    if (g_ntc_ip_comm_item.after_kill_switch == 1 && a_stream->pktstate == OP_STATE_PENDING)
    {
        unsigned char mopt = 1;
        MESA_set_stream_opt(a_stream, MSO_TCPALL_VALID_AFTER_KILL, &mopt, sizeof(mopt));
    }

    return ntc_ip_comm_transfer_process(a_stream, pme, thread_seq, raw_pkt, a_stream->pktstate);
}

/* Entry point driven by a_stream->opstate. */
extern "C" UCHAR ntc_ip_comm_transfer_entry(struct streaminfo *a_stream, void **pme, int thread_seq,const void *raw_pkt)
{
    return ntc_ip_comm_transfer_process(a_stream, pme, thread_seq, raw_pkt, a_stream->opstate);
}

/*
 * Load the plugin settings from PROFILE_PATH into g_ntc_ip_comm_item and
 * resolve the capture interface's IPv4 address.
 *
 * Terminates the process when the interface address cannot be obtained.
 * NOTE(review): that path exits with status 0 - confirm whether a non-zero
 * status is wanted.
 */
void ntc_ip_comm_load_profile()
{
    char nic_name[64] = {0};

    MESA_load_profile_string_def(PROFILE_PATH, PLUGIN_NAME, "log_path", g_ntc_ip_comm_item.log_path, sizeof(g_ntc_ip_comm_item.log_path), "./t1log/ip_comm_log");
    MESA_load_profile_uint_def(PROFILE_PATH,PLUGIN_NAME, "log_level", &g_ntc_ip_comm_item.log_level, 30);

    MESA_load_profile_string_def(PROFILE_PATH,"SYSTEM", "NIC_NAME",nic_name,sizeof(nic_name),"eth0");
    g_ntc_ip_comm_item.local_ip_nr = get_ip_by_eth_name(nic_name);
    if (g_ntc_ip_comm_item.local_ip_nr == INADDR_NONE)
    {
        printf("get_ip_by_eth_name in %s return NULL, exit!\n", nic_name);
        exit(0);
    }
    inet_ntop(AF_INET, &(g_ntc_ip_comm_item.local_ip_nr), g_ntc_ip_comm_item.local_ip_str, sizeof(g_ntc_ip_comm_item.local_ip_str));

    MESA_load_profile_int_def(PROFILE_PATH,"SYSTEM", "ENTRANCE_ID",&(g_ntc_ip_comm_item.entry_id),0);
    MESA_load_profile_uint_def(PROFILE_PATH,PLUGIN_NAME, "min_pkts", &g_ntc_ip_comm_item.min_pkts, 5);
    MESA_load_profile_uint_def(PROFILE_PATH,PLUGIN_NAME, "min_bytes", &g_ntc_ip_comm_item.min_bytes, 5);
    MESA_load_profile_int_def(PROFILE_PATH,PLUGIN_NAME, "service", &g_ntc_ip_comm_item.service, 0);
    MESA_load_profile_int_def(PROFILE_PATH,PLUGIN_NAME, "after_kill_switch", &g_ntc_ip_comm_item.after_kill_switch, 0);
    MESA_load_profile_uint_def(PROFILE_PATH,PLUGIN_NAME, "comm_log_mode", &g_ntc_ip_comm_item.comm_log_mode, 0);
    MESA_load_profile_string_def(PROFILE_PATH, PLUGIN_NAME, "dpkt_label", g_ntc_ip_comm_item.dpkt_label, sizeof(g_ntc_ip_comm_item.dpkt_label), "DPKT_PROJECT");
    return;
}

/* Write an initialisation problem to the runtime log, if one is open. */
static void ntc_ip_comm_log_init_error(const char *func, const char *what, const char *detail)
{
    if (g_ntc_ip_comm_item.log_handle == NULL)
    {
        return;
    }
    MESA_handle_runtime_log(g_ntc_ip_comm_item.log_handle, RLOG_LV_INFO, func, "%s: %s", what, detail != NULL ? detail : "");
}

/*
 * Apply one librdkafka configuration property.  A rejected property is
 * logged but not treated as fatal, as the original code ignored the result.
 */
static void ntc_ip_comm_kafka_conf_set(rd_kafka_conf_t *conf, const char *name, const char *value)
{
    char errstr[1024] = {0};

    if (rd_kafka_conf_set(conf, name, value, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
    {
        ntc_ip_comm_log_init_error(__FUNCTION__, name, errstr);
    }
}

/*
 * Create a producer handle owned by this plugin and attach the brokers
 * listed under "kafka_brokelist".  Returns 0 on success, -1 on failure
 * (with everything created here released again).
 */
static int ntc_ip_comm_kafka_create_own_handle(void)
{
    char kafka_errstr[1024] = {0};
    rd_kafka_conf_t *rdkafka_conf = NULL;

    if (0 > MESA_load_profile_string_nodef(PROFILE_PATH, PLUGIN_NAME, "kafka_brokelist", g_ntc_ip_comm_item.kafka_brokelist, sizeof(g_ntc_ip_comm_item.kafka_brokelist)))
    {
        return -1;
    }

    rdkafka_conf = rd_kafka_conf_new();
    ntc_ip_comm_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", "1000000");
    ntc_ip_comm_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", "600000");
    ntc_ip_comm_kafka_conf_set(rdkafka_conf, "security.protocol", "MG");

    g_ntc_ip_comm_kafka_handle = rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr));
    if (g_ntc_ip_comm_kafka_handle == NULL)
    {
        /* rd_kafka_new() only takes ownership of the conf on success. */
        rd_kafka_conf_destroy(rdkafka_conf);
        ntc_ip_comm_log_init_error(__FUNCTION__, "rd_kafka_new failed", kafka_errstr);
        return -1;
    }

    if (rd_kafka_brokers_add(g_ntc_ip_comm_kafka_handle, g_ntc_ip_comm_item.kafka_brokelist) == 0)
    {
        ntc_ip_comm_log_init_error(__FUNCTION__, "rd_kafka_brokers_add added no broker", g_ntc_ip_comm_item.kafka_brokelist);
        rd_kafka_destroy(g_ntc_ip_comm_kafka_handle);
        g_ntc_ip_comm_kafka_handle = NULL;
        return -1;
    }

    return 0;
}

/*
 * Borrow a producer handle exported by another shared object: dlopen() the
 * library named by "kafka_handle_provide_path" and read the rd_kafka_t*
 * stored in the symbol named by "kafka_handle_name".
 *
 * On success returns 0 and stores the dlopen() handle in *dl_handle_out; the
 * library stays loaded because the borrowed handle lives inside it.  On
 * failure returns -1 and closes the library again.
 */
static int ntc_ip_comm_kafka_borrow_handle(void **dl_handle_out)
{
    void *dl_handle = NULL;
    void *dl_return = NULL;

    *dl_handle_out = NULL;

    if (0 > MESA_load_profile_string_nodef(PROFILE_PATH, PLUGIN_NAME, "kafka_handle_provide_path", g_ntc_ip_comm_item.kafka_handle_provide_path, sizeof(g_ntc_ip_comm_item.kafka_handle_provide_path)))
    {
        return -1;
    }

    dl_handle = dlopen(g_ntc_ip_comm_item.kafka_handle_provide_path, RTLD_NOW|RTLD_GLOBAL);
    if (dl_handle == NULL)
    {
        ntc_ip_comm_log_init_error(__FUNCTION__, "dlopen failed", dlerror());
        return -1;
    }

    if (0 > MESA_load_profile_string_nodef(PROFILE_PATH, PLUGIN_NAME, "kafka_handle_name", g_ntc_ip_comm_item.kafka_handle_name, sizeof(g_ntc_ip_comm_item.kafka_handle_name)))
    {
        dlclose(dl_handle);
        return -1;
    }

    dl_return = dlsym(dl_handle, g_ntc_ip_comm_item.kafka_handle_name);
    if (dl_return == NULL)
    {
        ntc_ip_comm_log_init_error(__FUNCTION__, "dlsym failed", g_ntc_ip_comm_item.kafka_handle_name);
        dlclose(dl_handle);
        return -1;
    }

    g_ntc_ip_comm_kafka_handle = *(rd_kafka_t **)dl_return;
    if (g_ntc_ip_comm_kafka_handle == NULL)
    {
        ntc_ip_comm_log_init_error(__FUNCTION__, "exported kafka handle is NULL", g_ntc_ip_comm_item.kafka_handle_name);
        dlclose(dl_handle);
        return -1;
    }

    *dl_handle_out = dl_handle;
    return 0;
}

/*
 * Set up g_ntc_ip_comm_kafka_handle and g_ntc_ip_comm_kafka_topic.
 *
 * kafka_mode - when the INDIE_KAFKA bit is set the plugin creates its own
 *              producer; otherwise it borrows one from another shared
 *              object.
 *
 * Returns 0 on success, -1 on failure.  On failure nothing created by this
 * call is left allocated and both globals are NULL.
 */
int ntc_ip_comm_kaka_init(int kafka_mode)
{
    const int own_handle = ((kafka_mode & INDIE_KAFKA) == INDIE_KAFKA);
    void *dl_handle = NULL;
    rd_kafka_topic_conf_t *ip_comm_topic_conf = NULL;

    MESA_load_profile_string_def(PROFILE_PATH, PLUGIN_NAME, "kafka_topic", g_ntc_ip_comm_item.kafka_topic, sizeof(g_ntc_ip_comm_item.kafka_topic), NTC_IP_COMM_TOPIC);

    if (own_handle)
    {
        if (0 != ntc_ip_comm_kafka_create_own_handle())
        {
            return -1;
        }
    }
    else
    {
        if (0 != ntc_ip_comm_kafka_borrow_handle(&dl_handle))
        {
            return -1;
        }
    }

    /* rd_kafka_topic_new() takes ownership of the topic conf. */
    ip_comm_topic_conf = rd_kafka_topic_conf_new();
    g_ntc_ip_comm_kafka_topic = rd_kafka_topic_new(g_ntc_ip_comm_kafka_handle, g_ntc_ip_comm_item.kafka_topic, ip_comm_topic_conf);
    if (g_ntc_ip_comm_kafka_topic == NULL)
    {
        ntc_ip_comm_log_init_error(__FUNCTION__, "rd_kafka_topic_new failed", g_ntc_ip_comm_item.kafka_topic);
        if (own_handle)
        {
            rd_kafka_destroy(g_ntc_ip_comm_kafka_handle);
        }
        else if (dl_handle != NULL)
        {
            dlclose(dl_handle);
        }
        g_ntc_ip_comm_kafka_handle = NULL;
        return -1;
    }

    return 0;
}

/*
 * Plugin initialisation: load the profile, open the runtime log, register
 * the project ids used for labels and flow statistics, and set up Kafka
 * when the SEND_KAFKA bit of comm_log_mode is set.
 *
 * Returns OK on success, ERROR when the log handle or Kafka setup fails.
 */
extern "C" int ntc_ip_comm_init()
{
    memset(&g_ntc_ip_comm_item, 0, sizeof(g_ntc_ip_comm_item));
    ntc_ip_comm_load_profile();

    g_ntc_ip_comm_item.log_handle = MESA_create_runtime_log_handle(g_ntc_ip_comm_item.log_path, g_ntc_ip_comm_item.log_level);
    if (g_ntc_ip_comm_item.log_handle == NULL)
    {
        return ERROR;
    }

    g_ntc_ip_comm_item.dpkt_project_id = project_customer_register(g_ntc_ip_comm_item.dpkt_label, "struct");
    g_ntc_ip_comm_item.tcp_flow_id = project_customer_register("tcp_flow_stat", "struct");
    g_ntc_ip_comm_item.udp_flow_id = project_customer_register("udp_flow_stat", "struct");

    if ((g_ntc_ip_comm_item.comm_log_mode & SEND_KAFKA) == SEND_KAFKA)
    {
        if (0 != ntc_ip_comm_kaka_init(g_ntc_ip_comm_item.comm_log_mode))
        {
            return ERROR;
        }
    }

    return OK;
}

/*
 * Plugin teardown: release the Kafka topic, release the producer handle when
 * this plugin created it (INDIE_KAFKA), then close the runtime log.  A
 * borrowed producer handle belongs to the library that exported it and is
 * left alone.
 */
extern "C" void ntc_ip_comm_destroy(void)
{
    if (g_ntc_ip_comm_kafka_topic != NULL)
    {
        rd_kafka_topic_destroy(g_ntc_ip_comm_kafka_topic);
        g_ntc_ip_comm_kafka_topic = NULL;
    }

    if (g_ntc_ip_comm_kafka_handle != NULL
        && (g_ntc_ip_comm_item.comm_log_mode & INDIE_KAFKA) == INDIE_KAFKA)
    {
        rd_kafka_destroy(g_ntc_ip_comm_kafka_handle);
    }
    g_ntc_ip_comm_kafka_handle = NULL;

    MESA_destroy_runtime_log_handle(g_ntc_ip_comm_item.log_handle);
    return;
}