summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryw <[email protected]>2018-06-20 16:31:24 +0800
committeryw <[email protected]>2018-06-20 16:31:24 +0800
commit9dac75d98e3d8996518cf14ac72a042d215cfa58 (patch)
treeaa2b853b5a6798d5931a71f1ceaed5797425c534
first git version
-rw-r--r--.gitignore8
-rw-r--r--Makefile78
-rw-r--r--bin/.gitignore6
-rw-r--r--conf/main.conf16
-rw-r--r--inc/aligment_int64.h68
-rw-r--r--inc/soq_sendlog.h22
-rw-r--r--inc/soq_types.h170
-rw-r--r--inc/t1_public.h32
-rw-r--r--src/Makefile45
-rw-r--r--src/ntc_ip_comm.c382
-rw-r--r--src/ntc_ip_comm.h72
11 files changed, 899 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..267ab48
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,8 @@
+SI/
+*.I*
+*.P*
+*.S*
+*.W*
+*.[od]
+*.[1-9]*
+*.log
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..80eda9c
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,78 @@
+#define
+PLUG_TYPE_BIZ=biz
+PLUG_TYPE_PROTO=proto
+PLUG_TYPE_PLATFORM=platform
+
+
+path=/home/mesasoft/sapp_run
+
+PLUG_TYPE=biz
+plug_name=ntc_ip_comm
+plug_conf_dir=$(path)/ntcconf/
+
+plug_dir_name=$(plug_name)
+plug_inf_name=$(plug_name).inf
+
+plug_so_path=./bin/$(plug_name).so
+#plug_conf_path=./conf/$(plug_name).conf
+plug_inf=./conf/$(plug_name).inf
+
+#-----auto filled----
+
+biz_plug_dir_name=./plug/business
+biz_plug_dir=$(path)/$(biz_plug_dir_name)
+
+protocol_plug_dir_name=./plug/protocol
+protocol_plug_dir=$(path)/$(protocol_plug_dir_name)
+
+
+platform_plug_dir_name=./plug/platform
+platform_plug_dir=$(path)/$(platform_plug_dir_name)
+
+business_conflist_path=$(biz_plug_dir)/conflist_business.inf
+protocol_conflist_path=$(protocol_plug_dir)/conflist_protocol.inf
+platform_conflist_path=$(platform_plug_dir)/conflist_platform.inf
+
+ifeq ($(PLUG_TYPE), $(PLUG_TYPE_BIZ))
+dest_plug_dir_name = $(biz_plug_dir_name)
+dest_plug_dir = $(biz_plug_dir)
+dest_conflist_path=$(business_conflist_path)
+endif
+
+ifeq ($(PLUG_TYPE), $(PLUG_TYPE_PROTO))
+dest_plug_dir_name = $(protocol_plug_dir_name)
+dest_plug_dir = $(protocol_plug_dir)
+dest_conflist_path=$(protocol_conflist_path)
+endif
+
+ifeq ($(PLUG_TYPE), $(PLUG_TYPE_PLATFORM))
+dest_plug_dir_name = $(platform_plug_dir_name)
+dest_plug_dir = $(platform_plug_dir)
+dest_conflist_path=$(platform_conflist_path)
+endif
+
+plug_inf_path=$(dest_plug_dir_name)/$(plug_dir_name)/$(plug_inf_name)
+plug_dir=$(dest_plug_dir)/$(plug_dir_name)
+
+
+.PHONY: all clean opt update
+
+all:
+ cd src && $(MAKE)
+ @echo -e "current path=\033[32;49;1m$(path)\033[32;49;0m , try \033[32;49;1m[make install path=PATH]\033[32;49;0m to install plugin"
+clean:
+ cd src && $(MAKE) clean
+
+opt:
+ $(MAKE) all
+
+update:
+ @cp -f $(plug_so_path) $(plug_dir)
+ @echo -e "copy \033[32;49;1m$(plug_so_path) \033[32;49;0m to \033[32;49;1m$(plug_dir)\033[32;49;0m \033[31;49;1m[success]\033[31;49;0m"
+
+install:
+ mkdir -p $(plug_dir)
+ mkdir -p $(plug_conf_dir)
+ @cp -f $(plug_so_path) $(plug_inf) $(plug_dir)
+ @echo -e "copy \033[32;49;1m$(plug_so_path) $(plug_inf)\033[32;49;0m to \033[32;49;1m$(plug_dir)\033[32;49;0m \033[31;49;1m[success]\033[31;49;0m"
+ @ret=`cat $(dest_conflist_path)|grep $(plug_inf_path)|wc -l`;if [ $$ret -eq 0 ];then echo $(plug_inf_path) >>$(dest_conflist_path);fi
diff --git a/bin/.gitignore b/bin/.gitignore
new file mode 100644
index 0000000..fe22acb
--- /dev/null
+++ b/bin/.gitignore
@@ -0,0 +1,6 @@
+*.so
+SI/
+*.o
+*.d
+*.[1-9]*
+*.log
diff --git a/conf/main.conf b/conf/main.conf
new file mode 100644
index 0000000..47925d4
--- /dev/null
+++ b/conf/main.conf
@@ -0,0 +1,16 @@
+[SYSTEM]
+NIC_NAME=lo
+[ntc_ip_comm]
+log_level=30
+log_path=./ntclog/ntc_ip_comm/run_log
+using_kafka=1
+kafka_topic=ntc_ip_comm_log
+#kafka_handle_provide_path=
+#kafka_handle_name=g_soq_kafka_handle
+indie_kafka=1
+#kafka_brokelist=
+#service=
+#dpkt_label=
+min_bytes=5
+min_pkts=5
+
diff --git a/inc/aligment_int64.h b/inc/aligment_int64.h
new file mode 100644
index 0000000..ae2d79f
--- /dev/null
+++ b/inc/aligment_int64.h
@@ -0,0 +1,68 @@
+#ifndef H_ALIGMENT_INT64_H_INCLUDE
+#define H_ALIGMENT_INT64_H_INCLUDE
+
+#include <stdlib.h>
+
+#define CPU_CACHE_ALIGMENT 64
+typedef long long* mcore_long_t;
+
+inline mcore_long_t aligment_int64_array_alloc(int size)
+{
+ mcore_long_t ret=NULL;
+ ret=(mcore_long_t)calloc(CPU_CACHE_ALIGMENT,size);
+ return ret;
+}
+inline long long aligment_int64_array_sum(mcore_long_t array,int size)
+{
+ long long sum=0;
+ int offset=0,i=0;
+ for(i=0;i<size;i++)
+ {
+ offset=(CPU_CACHE_ALIGMENT/sizeof(long long))*i;
+ sum+=array[offset];
+ }
+ return sum;
+}
+inline void aligment_int64_array_clear(mcore_long_t array,int size)
+{
+ int offset=0,i=0;
+ for(i=0;i<size;i++)
+ {
+ offset=(CPU_CACHE_ALIGMENT/sizeof(long long))*i;
+ array[offset]=0;
+ }
+ return;
+}
+inline long long aligment_int64_array_add(mcore_long_t array,int offset,long long op_val)
+{
+ int idx=(CPU_CACHE_ALIGMENT/sizeof(long long))*offset;
+ array[idx]+=op_val;
+ return array[idx];
+}
+inline long long aligment_int64_array_sub(mcore_long_t array,int offset,long long op_val)
+{
+ int idx=(CPU_CACHE_ALIGMENT/sizeof(long long))*offset;
+ array[idx]-=op_val;
+ return array[idx];
+}
+inline long long aligment_int64_array_cnt(mcore_long_t array,int size)
+{
+ int offset=0,i=0;
+ int cnt=0;
+ for(i=0;i<size;i++)
+ {
+ offset=(CPU_CACHE_ALIGMENT/sizeof(long long))*i;
+ if(array[offset]>0)
+ {
+ cnt++;
+ }
+ }
+ return cnt;
+}
+inline void aligment_int64_array_free(mcore_long_t array)
+{
+ free(array);
+}
+
+#endif
+
diff --git a/inc/soq_sendlog.h b/inc/soq_sendlog.h
new file mode 100644
index 0000000..153a471
--- /dev/null
+++ b/inc/soq_sendlog.h
@@ -0,0 +1,22 @@
+#ifndef _INCLUDE_SOQ_SENDLOG_H_
+#define _INCLUDE_SOQ_SENDLOG_H_
+#include "soq_types.h"
+#include <MESA/Maat_rule.h>
+#include <MESA/stream.h>
+struct opt_unit_t
+{
+ soq_opt_t opt_type;
+ int opt_len;
+ const void* opt_value;
+};
+
+struct soq_log_t
+{
+ const struct streaminfo *stream;
+ const Maat_rule_t*result;
+ int result_num;
+};
+//return 0 if SUCCESS, otherwise return -1
+void soq_send_log(const soq_log_t* log_msg,struct opt_unit_t* log_opt,int opt_num, int thread_id);
+#endif
+
diff --git a/inc/soq_types.h b/inc/soq_types.h
new file mode 100644
index 0000000..a7dd907
--- /dev/null
+++ b/inc/soq_types.h
@@ -0,0 +1,170 @@
+/****************************************************
+* SOQ Project Types declaration *
+* Last modified: 20160903 *
+*****************************************************/
+#ifndef _SOQ_TYPE_H_
+#define _SOQ_TYPE_H_
+#include <MESA/Maat_rule.h>
+#ifndef __cplusplus
+#error("This file should be compiled with C++ compiler")
+#endif
+
+#define NTC_SWITCH
+
+typedef enum _soq_opt
+{
+ //Shared log options
+ LOG_OPT_SCENE_FILE=1, //IP pcap/Mail content
+ LOG_OPT_STREAM_INFO, // data is a struct stream_info *, size =8
+ LOG_OPT_MAAT_RULE, //duplicate option is allowed.
+
+ //Following are options for the respective protocol
+
+ LOG_OPT_HTTP_REQ_LINE,
+ LOG_OPT_HTTP_REQ_HDR,
+ LOG_OPT_HTTP_REQ_BODY,
+ LOG_OPT_HTTP_RES_LINE,
+ LOG_OPT_HTTP_RES_HDR,
+ LOG_OPT_HTTP_RES_BODY,
+ LOG_OPT_HTTP_URL,
+ LOG_OPT_HTTP_C2S_ISN, //size=4
+ LOG_OPT_HTTP_PROXY_FLAG, //size=4 ,0 or 1
+ LOG_OPT_HTTP_SEQ, //size=4
+
+ LOG_OPT_MAIL_PROTO,//string:"pop3","smtp" or "imap4"
+ LOG_OPT_MAIL_FROM,
+ LOG_OPT_MAIL_TO,
+ LOG_OPT_MAIL_SUBJECT,
+ LOG_OPT_MAIL_EML,
+
+ LOG_OPT_DNS_RD, //Shared with FD and JC
+ LOG_OPT_DNS_QTYPE, //Shared with FD and JC
+ LOG_OPT_DNS_QCLASS, //Shared with FD and JC
+ LOG_OPT_DNS_OPCODE, //Shared with FD and JC
+ LOG_OPT_DNS_QNAME, //Shared with FD and JC
+ LOG_OPT_DNS_CHEAT_TYPE, //Only in FD
+ LOG_OPT_DNS_CHEAT_RCODE, //Only in FD
+ LOG_OPT_DNS_CHEAT_STRATEGY, //Only in FD
+ LOG_OPT_DNS_CHEAT_RECORD, //Only in FD
+ LOG_OPT_DNS_CHEAT_TTL, //Only in FD
+ LOG_OPT_DNS_QR, //Only in JC
+ LOG_OPT_DNS_RA, //Only in JC
+ LOG_OPT_DNS_RR, //Only in JC
+ LOG_OPT_DNS_TTL, //Only in JC
+ LOG_OPT_DNS_DNS_SUB, //Only in JC, size=sizeof(int) 0-DNS,1-DNSSEC
+
+ LOG_OPT_FTP_URL,
+
+ LOG_OPT_PPTP_TUNNEL_TYPE, //size=sizeof(int),1-control,2-data
+ LOG_OPT_PPTP_ENCRYPT_MODE, //size=sizeof(int),1-MMPE 2-IPSEC 3-PAP 4-CHAP 5-MS-CHAP(v1/v2) 6-EAP-TLS
+
+ LOG_OPT_L2TP_TUNNEL_TYPE, //size=sizeof(int),1-control,2-data
+ LOG_OPT_L2TP_ENCRYPT_MODE, //size=sizeof(int),0-other,1-IPSEC 2-none
+
+ LOG_OPT_IPSEC_EX_PROTOCOL, //size=sizeof(int),1-ISAKMP(V1) 2-IKEv2 3-other
+ LOG_OPT_IPSEC_ISAKMP_MODE,
+
+ LOG_OPT_OPENVPN_VERSION, //size=sizeof(int)
+ LOG_OPT_OPENVPN_ENCRYPT_MODE, //string
+ LOG_OPT_OPENVPN_HMAC, //size=sizeof(int),1-has,0-not has
+ LOG_OPT_OPENVPN_TUNNEL_TYPE,
+
+ LOG_OPT_SSH_VERSION, //string
+ LOG_OPT_SSH_HOST_KEY, //string
+ LOG_OPT_SSH_HOST_COOKIE, //string
+ LOG_OPT_SSH_ENCRYPT_MODE, //size=sizeof(int)
+ LOG_OPT_SSH_MAC, //string
+ LOG_OPT_SSH_TUNNEL_TYPE,
+
+ LOG_OPT_SSL_VERSION, //string
+ LOG_OPT_SSL_SNI, //string
+ LOG_OPT_SSL_INDIVIDUAL_CERT_FILE,
+ LOG_OPT_SSL_MIDDLE_CERT_FILE,
+ LOG_OPT_SSL_ROOT_CERT_FILE,
+ LOG_OPT_SSL_CHAIN_CERT_FILE,
+
+ LOG_OPT_CHAP_NAME, //L2TP Username
+
+ LOG_OPT_INJECTED_PKT,
+
+ LOG_OPT_RANDOM_BLK_PROTO,
+
+ LOG_OPT_HTTP_COOKIE,
+ LOG_OPT_HTTP_REFERER,
+ LOG_OPT_HTTP_UA,
+ LOG_OPT_HTTP_SET_COOKIE,
+ LOG_OPT_HTTP_CONTENT_LEN,
+ LOG_OPT_HTTP_CONTENT_TYPE,
+ LOG_OPT_HTTP_USER_DEFINE, //key:value+ '\0' ,e.g. "Server:nginx"
+
+ LOG_OPT_APP_LABEL,
+ LOG_OPT_C2S_PKT_NUM,
+ LOG_OPT_S2C_PKT_NUM,
+ LOG_OPT_C2S_BYTE_NUM,
+ LOG_OPT_S2C_BYTE_NUM,
+ LOG_OPT_SSL_SAN,
+ LOG_OPT_SSL_CA,
+ LOG_OPT_DNS_CNAME,
+ LOG_OPT_FTP_CONTENT,
+ LOG_OPT_L2TP_CHAP_NAME,
+
+
+ LOG_OPT_PROTO_TYPE, //value:soq_protocol_t; FOR NTC
+ LOG_OPT_MAX
+}soq_opt_t;
+
+typedef enum _soq_protocol
+{
+ PROTO_IPv4,
+ PROTO_IPv6,
+ PROTO_TCP,
+ PROTO_UDP,
+ PROTO_HTTP,
+ PROTO_MAIL,
+ PROTO_DNS,
+ PROTO_FTP,
+ PROTO_IPSEC,
+ PROTO_VPN,
+ PROTO_SSL,
+ PROTO_SSH,
+ PROTO_PPTP,
+ PROTO_L2TP,
+ PROTO_OPEN_VPN,//alias of PROTO_VPN
+ PROTO_GRE,//PROTO_GRE=15 FOR NTC
+ PROTO_SOCKS,//FOR NTC
+ PROTO_XMPP,//FOR NTC
+ PROTO_SIP,//FOR NTC
+ PROTO_RTP,//FOR NTC
+ PROTO_BGP,//FOR NTC
+ PROTO_MAX
+}soq_protocol_t;
+
+
+typedef enum _soq_action
+{
+ SOQ_ACTION_BLOCK,
+ SOQ_ACTION_MONITOR,
+ SOQ_ACTION_CONTINUE,
+ SOQ_ACTION_ABORT
+}soq_action_t;
+//mid is an context used to record previous pending result;
+//set mid to 0 before FIRST calling this function, then pass it on every following calling.
+//p_block is an output parameter, which can be a substitue of fetch_block_rule.
+// point to a rule which action is SOQ_ACTION_BLOCK among hit_result.
+// point to NULL if no rule's action is SOQ_ACTION_BLOCK.
+soq_action_t decide_soq_action(const Maat_rule_t* hit_result,int cnt,const Maat_rule_t**p_block,int* mid);
+
+//return maat_rule which action is SOQ_ACTION_BLOCK in hit_result.
+//return NULL if no rule's action is SOQ_ACTION_BLOCK.
+Maat_rule_t* fetch_block_rule(Maat_rule_t* hit_result,int cnt);
+
+int scan_nesting_proto_addr(Maat_feather_t maat_feather, const struct streaminfo *a_stream, soq_protocol_t proto, scan_status_t *mid, Maat_rule_t*result,int result_num);
+
+int pcap_buff_calloc(const char *pkt, int pkt_len, char *dest, int dest_size);
+
+
+#define SOQ_PROTO_MAX PROTO_MAX
+#define SOQ_LOG_OPT_MAX LOG_OPT_MAX
+
+#endif
diff --git a/inc/t1_public.h b/inc/t1_public.h
new file mode 100644
index 0000000..5c20512
--- /dev/null
+++ b/inc/t1_public.h
@@ -0,0 +1,32 @@
+#ifndef _INCLUDE_T1_PUBLIC_H_
+#define _INCLUDE_T1_PUBLIC_H_
+
+#ifndef __cplusplus
+#error("This file should be compiled with C++ compiler")
+#endif
+
+/* version : 2016-08-26 */
+#include <MESA/Maat_rule.h>
+#include <MESA/stream.h>
+#include <MESA/DocumentAnalyze.h>
+#include "soq_types.h"
+
+
+extern Maat_feather_t g_t1_maat_feather;
+extern Maat_feather_t g_ipd_static_maat_feather;
+extern Maat_feather_t g_ipd_dyn_maat_feather;
+
+
+extern docanalyze_instance_t g_t1_doc_instance;
+
+
+//return 1 if stream in whitelist, otherwise return 0;
+int is_soq_whitelist(struct streaminfo* a_stream);
+void make_soq_blacklist(struct streaminfo* a_stream,Maat_rule_t *result);
+//ONLY called by encrypt proto,return 1 if block, otherwise return 0.
+int is_random_block(soq_protocol_t protocol,int thread_id,Maat_rule_t* result);
+//return 1 if type match, otherwise return 0;
+int is_scan_type(enum DocumentType doctype);
+int is_feedback_type(enum DocumentType doctype);
+
+#endif
diff --git a/src/Makefile b/src/Makefile
new file mode 100644
index 0000000..56949db
--- /dev/null
+++ b/src/Makefile
@@ -0,0 +1,45 @@
+#path to find lib and header files
+#vpath %.a ../lib
+#vpath %.h ../inc
+
+CCC=g++
+CC=g++
+
+
+INCLUDEPATH+=-I../inc
+
+CFLAGS= -g3 -Wall -fPIC -Werror -O0
+CFLAGS+=$(INCLUDEPATH)
+
+CPPFLAGS=$(CFLAGS)
+
+LIB+=-lMESA_handle_logger
+LIB+=-lMESA_prof_load
+
+SOURCES=$(wildcard *.c)
+OBJECTS=$(SOURCES:.c=.o)
+DEPS=$(SOURCES:.c=.d)
+
+#target name
+TARGET=ntc_ip_comm.so
+
+.PHONY:clean all
+
+all:$(TARGET)
+
+$(TARGET):$(OBJECTS) $(LIB_FILE)
+ $(CC) -shared $(CFLAGS) $(OBJECTS) $(LIB) -o $@
+#copy target to dest dir
+ @awk '/VERSION/{print $$2}' $(SOURCES) |xargs -i echo -e "make \033[32;49;1m$@({})\033[32;49;0m \033[31;49;1m[success]\033[31;49;0m"
+ @cp -f $@ ../bin
+ @echo -e "copy \033[32;49;1m$@\033[32;49;0m to ../bin\033[31;49;1m[success]\033[31;49;0m"
+
+.c.o:
+
+%.d:%.c
+ $(CC) $< -MM $(INCLUDEPATH) > $@
+
+-include $(DEPS)
+
+clean :
+ rm -f $(OBJECTS) $(DEPS) $(TARGET)
diff --git a/src/ntc_ip_comm.c b/src/ntc_ip_comm.c
new file mode 100644
index 0000000..2e4e914
--- /dev/null
+++ b/src/ntc_ip_comm.c
@@ -0,0 +1,382 @@
+/*
+ * t1_rawpkt
+ * author:yangwei
+ * time:2016-09-03
+ * last_version:1.20160903
+ *
+ */
+
+#include "ntc_ip_comm.h"
+#include "soq_sendlog.h"
+
+
+#include <assert.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <netinet/ip.h>
+#include <netinet/ip6.h>
+#include <netinet/udp.h>
+
+#include <net/if.h>
+#include <sys/types.h>
+#include <sys/ioctl.h>
+#include <dlfcn.h>
+
+
+#include "MESA_prof_load.h"
+#include "MESA_handle_logger.h"
+#include "field_stat2.h"
+#include <rdkafka.h>
+#include <cJSON.h>
+
+#define PLUGIN_NAME "NTC_IP_COMMs"
+#define PROFILE_PATH "./ntcconf/main.conf"
+#define NTC_IP_COMM_TOPIC "ntc_ip_comm_log"
+
+extern long long g_CurrentTime;
+
+ntc_ip_comm_global_item g_ntc_ip_comm_item;
+
+int NTC_IP_COMM_VERSION_20180620 = 1;
+
+rd_kafka_t *g_ntc_ip_comm_kafka_handle = NULL;
+rd_kafka_topic_t *g_ntc_ip_comm_kafka_topic = NULL;
+
+static unsigned int get_ip_by_eth_name(const char *ifname)
+{
+ int sockfd;
+ struct ifreq ifr;
+ unsigned int ip;
+
+ sockfd = socket(AF_INET, SOCK_DGRAM, 0);
+ if (-1 == sockfd) {
+ goto error;
+ }
+
+ strcpy(ifr.ifr_name,ifname);
+ if (ioctl(sockfd, SIOCGIFADDR, &ifr) < 0) {
+ goto error;
+ }
+
+ ip = ((struct sockaddr_in*)&(ifr.ifr_addr))->sin_addr.s_addr;
+ close(sockfd);
+ return ip;
+
+error:
+ close(sockfd);
+ return INADDR_NONE;
+}
+
+static int soq_addStreamInfo_to_jsonObj(cJSON *json_obj, const struct streaminfo *a_stream)
+{
+ int ret = 0;
+ const char *addr_proto = NULL;
+ const char* null_addr="0";
+ short null_port=0;
+ unsigned short tunnel_type=0;
+ char nest_addr_buf[1024];
+ int tunnel_type_size=sizeof(tunnel_type);
+ const struct layer_addr *addr=NULL;
+ char src_ip_str[128] = {0}, dst_ip_str[128] = {0};
+
+ cJSON_AddNumberToObject(json_obj, "stream_dir", a_stream->dir);
+
+ addr=&(a_stream->addr);
+ switch(addr->addrtype)
+ {
+ case ADDR_TYPE_IPV4:
+ case __ADDR_TYPE_IP_PAIR_V4:
+ inet_ntop(AF_INET, &addr->ipv4->saddr, src_ip_str, sizeof(src_ip_str));
+ inet_ntop(AF_INET, &addr->ipv4->daddr, dst_ip_str, sizeof(dst_ip_str));
+ cJSON_AddStringToObject(json_obj, "s_ip", src_ip_str);
+ cJSON_AddStringToObject(json_obj, "d_ip", dst_ip_str);
+ cJSON_AddNumberToObject(json_obj, "s_port", ntohs(addr->ipv4->source));
+ cJSON_AddNumberToObject(json_obj, "d_port", ntohs(addr->ipv4->dest));
+ break;
+ case ADDR_TYPE_IPV6:
+ case __ADDR_TYPE_IP_PAIR_V6:
+ cJSON_AddNumberToObject(json_obj, "addr_type", addr->addrtype);
+ inet_ntop(AF_INET6, addr->ipv6->saddr, src_ip_str, sizeof(src_ip_str));
+ inet_ntop(AF_INET6, addr->ipv6->daddr, dst_ip_str, sizeof(dst_ip_str));
+ cJSON_AddStringToObject(json_obj, "s_ip", src_ip_str);
+ cJSON_AddStringToObject(json_obj, "d_ip", dst_ip_str);
+ cJSON_AddNumberToObject(json_obj, "s_port", ntohs(addr->ipv6->source));
+ cJSON_AddNumberToObject(json_obj, "d_port", ntohs(addr->ipv6->dest));
+ break;
+ case ADDR_TYPE_VLAN:
+ case ADDR_TYPE_GRE:
+ case ADDR_TYPE_MPLS:
+ case ADDR_TYPE_PPPOE_SES:
+ case ADDR_TYPE_L2TP:
+ case ADDR_TYPE_PPP:
+ cJSON_AddNumberToObject(json_obj, "addr_type", addr->addrtype);
+ cJSON_AddStringToObject(json_obj, "s_ip", null_addr);
+ cJSON_AddStringToObject(json_obj, "d_ip", null_addr);
+ cJSON_AddNumberToObject(json_obj, "s_port", null_port);
+ cJSON_AddNumberToObject(json_obj, "d_port", null_port);
+ break;
+ case ADDR_TYPE_PPTP:
+ cJSON_AddNumberToObject(json_obj, "addr_type", addr->addrtype);
+ cJSON_AddStringToObject(json_obj, "s_ip", null_addr);
+ cJSON_AddStringToObject(json_obj, "d_ip", null_addr);
+ cJSON_AddNumberToObject(json_obj, "s_port", ntohs(addr->pptp->C2S_call_id));
+ cJSON_AddNumberToObject(json_obj, "d_port", ntohs(addr->pptp->S2C_call_id));
+ break;
+ default:
+ break;
+ }
+
+ addr_proto = layer_addr_prefix_ntop(a_stream);
+ cJSON_AddStringToObject(json_obj, "trans_proto", addr_proto);
+
+ ret=MESA_get_stream_opt(a_stream, MSO_STREAM_TUNNEL_TYPE, &tunnel_type, &tunnel_type_size);
+ assert(ret==0);
+ if(tunnel_type==STREAM_TUNNLE_NON)
+ {
+ layer_addr_ntop_r(a_stream,nest_addr_buf, sizeof(nest_addr_buf));
+ }
+ else
+ {
+ stream_addr_list_ntop(a_stream,nest_addr_buf, sizeof(nest_addr_buf));
+ }
+
+ cJSON_AddStringToObject(json_obj, "addr_list", nest_addr_buf);
+
+ return 0;
+}
+
+
+void ntc_ip_comm_send_kafka_log(rd_kafka_topic_t *topic, struct streaminfo *a_stream, char *dpkt_buf, int dpkt_buflen)
+{
+ cJSON *log_obj = cJSON_CreateObject();
+
+ cJSON_AddNumberToObject(log_obj, "service", g_ntc_ip_comm_item.service);
+ cJSON_AddStringToObject(log_obj, "cap_ip", g_ntc_ip_comm_item.local_ip_str);
+ cJSON_AddNumberToObject(log_obj, "entrance_id", g_ntc_ip_comm_item.entry_id);
+
+ cJSON_AddNumberToObject(log_obj, "found_time", g_CurrentTime);
+ cJSON_AddNumberToObject(log_obj, "recv_time", g_CurrentTime);
+ soq_addStreamInfo_to_jsonObj(log_obj, a_stream);
+
+ cJSON_AddNumberToObject(log_obj, "c2s_pkt_num", a_stream->ptcpdetail->clientpktnum);
+ cJSON_AddNumberToObject(log_obj, "s2c_pkt_num", a_stream->ptcpdetail->serverpktnum);
+ cJSON_AddNumberToObject(log_obj, "c2s_byte_num", a_stream->ptcpdetail->clientbytes);
+ cJSON_AddNumberToObject(log_obj, "s2c_byte_num", a_stream->ptcpdetail->serverbytes);
+ if(dpkt_buf != NULL && dpkt_buflen > 0)
+ {
+ cJSON_AddStringToObject(log_obj, "app_label", dpkt_buf);
+ }
+ char *payload = cJSON_Print(log_obj);
+ int paylen = strlen(payload);
+ rd_kafka_produce(g_ntc_ip_comm_kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, payload, paylen, NULL, 0, NULL);
+
+ free(payload);
+ cJSON_Delete(log_obj);
+ log_obj = NULL;
+ return ;
+}
+
+void ntc_ip_comm_send_ntc_log(struct streaminfo *a_stream, char *dpkt_buf, int dpkt_buflen)
+{
+ soq_log_t log_msg;
+ Maat_rule_t maat_rule;
+ struct opt_unit_t opts[5];
+ int opt_num = 4;
+
+ memset(&maat_rule, 0, sizeof(maat_rule));
+ maat_rule.action = SOQ_ACTION_MONITOR;
+ maat_rule.config_id = 0;
+ maat_rule.service_id = g_ntc_ip_comm_item.service;
+ maat_rule.do_log = 1;
+ log_msg.stream = a_stream;
+ log_msg.result = &maat_rule;
+ log_msg.result_num =1;
+
+ opts[0].opt_len = sizeof(a_stream->ptcpdetail->clientbytes);
+ opts[0].opt_type =LOG_OPT_C2S_BYTE_NUM ;
+ opts[0].opt_value = &(a_stream->ptcpdetail->clientbytes);
+
+ opts[1].opt_len = sizeof(a_stream->ptcpdetail->serverbytes);
+ opts[1].opt_type =LOG_OPT_S2C_BYTE_NUM ;
+ opts[1].opt_value = &(a_stream->ptcpdetail->serverbytes);
+
+ opts[2].opt_len = sizeof(a_stream->ptcpdetail->clientpktnum);
+ opts[2].opt_type =LOG_OPT_C2S_PKT_NUM ;
+ opts[2].opt_value = &(a_stream->ptcpdetail->clientpktnum);
+
+ opts[3].opt_len = sizeof(a_stream->ptcpdetail->serverpktnum);
+ opts[3].opt_type =LOG_OPT_S2C_PKT_NUM ;
+ opts[3].opt_value = &(a_stream->ptcpdetail->serverpktnum);
+
+ if(dpkt_buf != NULL && dpkt_buflen > 0)
+ {
+ opts[4].opt_len = dpkt_buflen;
+ opts[4].opt_type =LOG_OPT_APP_LABEL ;
+ opts[4].opt_value =dpkt_buf;
+ opt_num+=1;
+ }
+ soq_send_log(&log_msg, opts, opt_num, a_stream->threadnum);
+ return;
+}
+
+
+int ntc_ip_comm_get_dpkt_label(struct streaminfo *a_stream, const char* label_name, char *label_buf, int *label_buflen)
+{
+ dpkt_lable_t *dpkt_info = (dpkt_lable_t*)project_req_get_struct(a_stream,g_ntc_ip_comm_item.dpkt_project_id);
+ if(dpkt_info == NULL)return -1;
+
+ snprintf(label_buf, *label_buflen, "serv=%u;proto=%u;app=%u;os=%u;brow=%u;web=%u;beh=%u;",
+ dpkt_info->dpkt_service_type,
+ dpkt_info->dpkt_proto_type,
+ dpkt_info->dpkt_app_type,
+ dpkt_info->dpkt_op_type,
+ dpkt_info->dpkt_browser_type,
+ dpkt_info->dpkt_web_type,
+ dpkt_info->dpkt_behavior_type);
+ *label_buflen = strlen(label_buf);
+ return 0;
+}
+
+extern "C" UCHAR ntc_ip_comm_transfer_entry(struct streaminfo *a_stream, void **pme, int thread_seq,const void *raw_pkt)
+{
+ int state = 0;
+ if(a_stream->type == STREAM_TYPE_TCP)
+ {
+ state = a_stream->pktstate;
+ }
+ if(a_stream->type == STREAM_TYPE_UDP)
+ {
+ state = a_stream->opstate;
+ }
+ if(state == OP_STATE_CLOSE)
+ {
+ char label_buf[128] = {0};
+ int label_buflen = sizeof(label_buf);
+ ntc_ip_comm_get_dpkt_label(a_stream, g_ntc_ip_comm_item.dpkt_label, label_buf, &label_buflen);
+ if(a_stream->ptcpdetail->clientbytes+ a_stream->ptcpdetail->serverbytes >= g_ntc_ip_comm_item.min_bytes && a_stream->ptcpdetail->clientpktnum+ a_stream->ptcpdetail->serverpktnum >= g_ntc_ip_comm_item.min_pkts)
+ {
+ if(g_ntc_ip_comm_item.using_kafka == 1)
+ {
+ ntc_ip_comm_send_kafka_log(g_ntc_ip_comm_kafka_topic, a_stream, label_buf, label_buflen);
+ }
+ else
+ {
+ ntc_ip_comm_send_ntc_log(a_stream, label_buf, label_buflen);
+ }
+ }
+ return APP_STATE_DROPME;
+ }
+ return APP_STATE_GIVEME;
+}
+
+void ntc_ip_comm_load_profile()
+{
+ MESA_load_profile_string_def(PROFILE_PATH, PLUGIN_NAME, "log_path", g_ntc_ip_comm_item.log_path, sizeof(g_ntc_ip_comm_item.log_path), "./log/t1_rawpkt_log");
+ MESA_load_profile_uint_def(PROFILE_PATH,PLUGIN_NAME, "log_level", &g_ntc_ip_comm_item.log_level, 30);
+
+ char nic_name[64];
+ MESA_load_profile_string_def(PROFILE_PATH,"SYSTEM", "NIC_NAME",nic_name,sizeof(nic_name),"eth0");
+ g_ntc_ip_comm_item.local_ip_nr=get_ip_by_eth_name(nic_name);
+ if(g_ntc_ip_comm_item.local_ip_nr==INADDR_NONE)
+ {
+ printf("get_ip_by_eth_name in %s return NULL, exit!\n", nic_name);
+ exit(0);
+ }
+ inet_ntop(AF_INET,&(g_ntc_ip_comm_item.local_ip_nr),g_ntc_ip_comm_item.local_ip_str,sizeof(g_ntc_ip_comm_item.local_ip_str));
+ MESA_load_profile_int_def(PROFILE_PATH,"SYSTEM", "ENTRANCE_ID",&(g_ntc_ip_comm_item.entry_id),0);
+
+
+ MESA_load_profile_uint_def(PROFILE_PATH,PLUGIN_NAME, "min_pkts", &g_ntc_ip_comm_item.min_pkts, 5);
+ MESA_load_profile_uint_def(PROFILE_PATH,PLUGIN_NAME, "min_bytes", &g_ntc_ip_comm_item.min_bytes, 5);
+
+ MESA_load_profile_int_def(PROFILE_PATH,PLUGIN_NAME, "service", &g_ntc_ip_comm_item.service, 0);
+
+ MESA_load_profile_uint_def(PROFILE_PATH,PLUGIN_NAME, "using_kafka", &g_ntc_ip_comm_item.using_kafka, 0);
+
+ MESA_load_profile_string_def(PROFILE_PATH, PLUGIN_NAME, "dpkt_label", g_ntc_ip_comm_item.dpkt_label, sizeof(g_ntc_ip_comm_item.dpkt_label), "DPKT_PROJECT");
+ return ;
+}
+
+int ntc_ip_comm_kaka_init()
+{
+ char kafka_errstr[1024];
+ MESA_load_profile_uint_def(PROFILE_PATH,PLUGIN_NAME, "indie_kafka", &g_ntc_ip_comm_item.indie_kafka, 0);
+ MESA_load_profile_string_def(PROFILE_PATH, PLUGIN_NAME, "kafka_topic", g_ntc_ip_comm_item.kafka_topic, sizeof(g_ntc_ip_comm_item.kafka_topic), NTC_IP_COMM_TOPIC);
+ if(g_ntc_ip_comm_item.indie_kafka == 1)
+ {
+ if(0 > MESA_load_profile_string_nodef(PROFILE_PATH, PLUGIN_NAME, "kafka_brokelist", g_ntc_ip_comm_item.kafka_brokelist, sizeof(g_ntc_ip_comm_item.kafka_brokelist)))
+ {
+ return -1;
+ }
+ rd_kafka_conf_t *rdkafka_conf = rd_kafka_conf_new();
+ rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", "600000",kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(rdkafka_conf, "security.protocol", "MG", kafka_errstr, sizeof(kafka_errstr));
+
+ if (!(g_ntc_ip_comm_kafka_handle = rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr))))
+ {
+ return -1;
+ }
+
+ if (rd_kafka_brokers_add(g_ntc_ip_comm_kafka_handle, g_ntc_ip_comm_item.kafka_brokelist) == 0)
+ {
+ return -1;
+ }
+ }
+ else
+ {
+ if(0 > MESA_load_profile_string_nodef(PROFILE_PATH, PLUGIN_NAME, "kafka_handle_provide_path", g_ntc_ip_comm_item.kafka_handle_provide_path, sizeof(g_ntc_ip_comm_item.kafka_handle_provide_path)))
+ {
+ return -1;
+ }
+ void * dl_handle = dlopen(g_ntc_ip_comm_item.kafka_handle_provide_path, RTLD_GLOBAL);
+ if(dl_handle == NULL)
+ {
+ return -1;
+ }
+ if(0 > MESA_load_profile_string_nodef(PROFILE_PATH, PLUGIN_NAME, "kafka_handle_name", g_ntc_ip_comm_item.kafka_handle_name, sizeof(g_ntc_ip_comm_item.kafka_handle_name)))
+ {
+ return -1;
+ }
+ g_ntc_ip_comm_kafka_handle = (rd_kafka_t *)dlsym(dl_handle, g_ntc_ip_comm_item.kafka_handle_name);
+ if(g_ntc_ip_comm_kafka_handle == NULL)
+ {
+ return -1;
+ }
+
+ }
+ rd_kafka_topic_conf_t*ip_comm_topic_conf = rd_kafka_topic_conf_new();
+ g_ntc_ip_comm_kafka_topic = rd_kafka_topic_new(g_ntc_ip_comm_kafka_handle, g_ntc_ip_comm_item.kafka_topic, ip_comm_topic_conf);
+ return 0;
+}
+
+extern "C" int ntc_ip_comm_init()
+{
+ memset(&g_ntc_ip_comm_item, 0, sizeof(g_ntc_ip_comm_item));
+ ntc_ip_comm_load_profile();
+ g_ntc_ip_comm_item.log_handle = MESA_create_runtime_log_handle(g_ntc_ip_comm_item.log_path, g_ntc_ip_comm_item.log_level);
+ if(g_ntc_ip_comm_item.log_handle == NULL)
+ return ERROR;
+
+
+ g_ntc_ip_comm_item.dpkt_project_id = project_customer_register(g_ntc_ip_comm_item.dpkt_label, "struct");
+ if(g_ntc_ip_comm_item.using_kafka == 1)
+ {
+ if(0 != ntc_ip_comm_kaka_init())
+ return ERROR;
+ }
+ return OK;
+}
+
+
+
+extern "C" void ntc_ip_comm_destroy(void)
+{
+ MESA_destroy_runtime_log_handle(g_ntc_ip_comm_item.log_handle);
+ return;
+}
+
diff --git a/src/ntc_ip_comm.h b/src/ntc_ip_comm.h
new file mode 100644
index 0000000..787abd7
--- /dev/null
+++ b/src/ntc_ip_comm.h
@@ -0,0 +1,72 @@
+#ifndef NTC_IP_COMM_H
+#define NTC_IP_COMM_H
+
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+#include "stream.h"
+
+ typedef struct ntc_ip_comm_global_item
+ {
+ void * log_handle;
+ char log_path[1024];
+ unsigned int log_level;
+ unsigned int using_kafka;
+ unsigned int indie_kafka;
+ char kafka_brokelist[1024];
+ char kafka_handle_provide_path[1024];
+ char kafka_handle_name[1024];
+ char kafka_topic[1024];
+
+ unsigned int local_ip_nr;
+ char local_ip_str[128];
+ char dpkt_label[1024];
+ int dpkt_project_id;
+ int entry_id;
+ int service;
+ unsigned int min_bytes;
+ unsigned int min_pkts;
+
+ }ntc_ip_comm_global_item;
+
+
+ typedef enum
+ {
+ OK = 0,
+ ERROR = -1,
+ UNDEFINE = -2
+ } STATUS;
+
+ typedef enum
+ {
+ TRUE = 1,
+ FALSE = 0
+ } BOOL;
+
+
+ typedef struct _dpkt_lable_t
+ {
+ unsigned char trans_proto;
+ unsigned char v6;
+ unsigned short _pad0;
+ unsigned int dpkt_service_type;
+ unsigned int dpkt_proto_type;
+ unsigned int dpkt_app_type;
+ unsigned int dpkt_op_type;
+ unsigned int dpkt_browser_type;
+ unsigned int dpkt_web_type;
+ unsigned int dpkt_behavior_type;
+ }dpkt_lable_t;
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
+
+
+