diff options
| author | yw <[email protected]> | 2018-06-20 16:31:24 +0800 |
|---|---|---|
| committer | yw <[email protected]> | 2018-06-20 16:31:24 +0800 |
| commit | 9dac75d98e3d8996518cf14ac72a042d215cfa58 (patch) | |
| tree | aa2b853b5a6798d5931a71f1ceaed5797425c534 | |
first git version
| -rw-r--r-- | .gitignore | 8 | ||||
| -rw-r--r-- | Makefile | 78 | ||||
| -rw-r--r-- | bin/.gitignore | 6 | ||||
| -rw-r--r-- | conf/main.conf | 16 | ||||
| -rw-r--r-- | inc/aligment_int64.h | 68 | ||||
| -rw-r--r-- | inc/soq_sendlog.h | 22 | ||||
| -rw-r--r-- | inc/soq_types.h | 170 | ||||
| -rw-r--r-- | inc/t1_public.h | 32 | ||||
| -rw-r--r-- | src/Makefile | 45 | ||||
| -rw-r--r-- | src/ntc_ip_comm.c | 382 | ||||
| -rw-r--r-- | src/ntc_ip_comm.h | 72 |
11 files changed, 899 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..267ab48 --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +SI/ +*.I* +*.P* +*.S* +*.W* +*.[od] +*.[1-9]* +*.log diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..80eda9c --- /dev/null +++ b/Makefile @@ -0,0 +1,78 @@ +#define +PLUG_TYPE_BIZ=biz +PLUG_TYPE_PROTO=proto +PLUG_TYPE_PLATFORM=platform + + +path=/home/mesasoft/sapp_run + +PLUG_TYPE=biz +plug_name=ntc_ip_comm +plug_conf_dir=$(path)/ntcconf/ + +plug_dir_name=$(plug_name) +plug_inf_name=$(plug_name).inf + +plug_so_path=./bin/$(plug_name).so +#plug_conf_path=./conf/$(plug_name).conf +plug_inf=./conf/$(plug_name).inf + +#-----auto filled---- + +biz_plug_dir_name=./plug/business +biz_plug_dir=$(path)/$(biz_plug_dir_name) + +protocol_plug_dir_name=./plug/protocol +protocol_plug_dir=$(path)/$(protocol_plug_dir_name) + + +platform_plug_dir_name=./plug/platform +platform_plug_dir=$(path)/$(platform_plug_dir_name) + +business_conflist_path=$(biz_plug_dir)/conflist_business.inf +protocol_conflist_path=$(protocol_plug_dir)/conflist_protocol.inf +platform_conflist_path=$(platform_plug_dir)/conflist_platform.inf + +ifeq ($(PLUG_TYPE), $(PLUG_TYPE_BIZ)) +dest_plug_dir_name = $(biz_plug_dir_name) +dest_plug_dir = $(biz_plug_dir) +dest_conflist_path=$(business_conflist_path) +endif + +ifeq ($(PLUG_TYPE), $(PLUG_TYPE_PROTO)) +dest_plug_dir_name = $(protocol_plug_dir_name) +dest_plug_dir = $(protocol_plug_dir) +dest_conflist_path=$(protocol_conflist_path) +endif + +ifeq ($(PLUG_TYPE), $(PLUG_TYPE_PLATFORM)) +dest_plug_dir_name = $(platform_plug_dir_name) +dest_plug_dir = $(platform_plug_dir) +dest_conflist_path=$(platform_conflist_path) +endif + +plug_inf_path=$(dest_plug_dir_name)/$(plug_dir_name)/$(plug_inf_name) +plug_dir=$(dest_plug_dir)/$(plug_dir_name) + + +.PHONY: all clean opt update + +all: + cd src && $(MAKE) + @echo -e "current path=\033[32;49;1m$(path)\033[32;49;0m , try \033[32;49;1m[make install path=PATH]\033[32;49;0m to install plugin" +clean: + cd src && $(MAKE) clean + +opt: + $(MAKE) all + +update: + @cp -f $(plug_so_path) $(plug_dir) + @echo -e "copy \033[32;49;1m$(plug_so_path) \033[32;49;0m to \033[32;49;1m$(plug_dir)\033[32;49;0m \033[31;49;1m[success]\033[31;49;0m" + +install: + mkdir -p $(plug_dir) + mkdir -p $(plug_conf_dir) + @cp -f $(plug_so_path) $(plug_inf) $(plug_dir) + @echo -e "copy \033[32;49;1m$(plug_so_path) $(plug_inf)\033[32;49;0m to \033[32;49;1m$(plug_dir)\033[32;49;0m \033[31;49;1m[success]\033[31;49;0m" + @ret=`cat $(dest_conflist_path)|grep $(plug_inf_path)|wc -l`;if [ $$ret -eq 0 ];then echo $(plug_inf_path) >>$(dest_conflist_path);fi diff --git a/bin/.gitignore b/bin/.gitignore new file mode 100644 index 0000000..fe22acb --- /dev/null +++ b/bin/.gitignore @@ -0,0 +1,6 @@ +*.so +SI/ +*.o +*.d +*.[1-9]* +*.log diff --git a/conf/main.conf b/conf/main.conf new file mode 100644 index 0000000..47925d4 --- /dev/null +++ b/conf/main.conf @@ -0,0 +1,16 @@ +[SYSTEM] +NIC_NAME=lo +[ntc_ip_comm] +log_level=30 +log_path=./ntclog/ntc_ip_comm/run_log +using_kafka=1 +kafka_topic=ntc_ip_comm_log +#kafka_handle_provide_path= +#kafka_handle_name=g_soq_kafka_handle +indie_kafka=1 +#kafka_brokelist= +#service= +#dpkt_label= +min_bytes=5 +min_pkts=5 + diff --git a/inc/aligment_int64.h b/inc/aligment_int64.h new file mode 100644 index 0000000..ae2d79f --- /dev/null +++ b/inc/aligment_int64.h @@ -0,0 +1,68 @@ +#ifndef H_ALIGMENT_INT64_H_INCLUDE +#define H_ALIGMENT_INT64_H_INCLUDE + +#include <stdlib.h> + +#define CPU_CACHE_ALIGMENT 64 +typedef long long* mcore_long_t; + +inline mcore_long_t aligment_int64_array_alloc(int size) +{ + mcore_long_t ret=NULL; + ret=(mcore_long_t)calloc(CPU_CACHE_ALIGMENT,size); + return ret; +} +inline long long aligment_int64_array_sum(mcore_long_t array,int size) +{ + long long sum=0; + int offset=0,i=0; + for(i=0;i<size;i++) + { + offset=(CPU_CACHE_ALIGMENT/sizeof(long long))*i; + sum+=array[offset]; + } + return sum; +} +inline void aligment_int64_array_clear(mcore_long_t array,int size) +{ + int offset=0,i=0; + for(i=0;i<size;i++) + { + offset=(CPU_CACHE_ALIGMENT/sizeof(long long))*i; + array[offset]=0; + } + return; +} +inline long long aligment_int64_array_add(mcore_long_t array,int offset,long long op_val) +{ + int idx=(CPU_CACHE_ALIGMENT/sizeof(long long))*offset; + array[idx]+=op_val; + return array[idx]; +} +inline long long aligment_int64_array_sub(mcore_long_t array,int offset,long long op_val) +{ + int idx=(CPU_CACHE_ALIGMENT/sizeof(long long))*offset; + array[idx]-=op_val; + return array[idx]; +} +inline long long aligment_int64_array_cnt(mcore_long_t array,int size) +{ + int offset=0,i=0; + int cnt=0; + for(i=0;i<size;i++) + { + offset=(CPU_CACHE_ALIGMENT/sizeof(long long))*i; + if(array[offset]>0) + { + cnt++; + } + } + return cnt; +} +inline void aligment_int64_array_free(mcore_long_t array) +{ + free(array); +} + +#endif + diff --git a/inc/soq_sendlog.h b/inc/soq_sendlog.h new file mode 100644 index 0000000..153a471 --- /dev/null +++ b/inc/soq_sendlog.h @@ -0,0 +1,22 @@ +#ifndef _INCLUDE_SOQ_SENDLOG_H_ +#define _INCLUDE_SOQ_SENDLOG_H_ +#include "soq_types.h" +#include <MESA/Maat_rule.h> +#include <MESA/stream.h> +struct opt_unit_t +{ + soq_opt_t opt_type; + int opt_len; + const void* opt_value; +}; + +struct soq_log_t +{ + const struct streaminfo *stream; + const Maat_rule_t*result; + int result_num; +}; +//return 0 if SUCCESS, otherwise return -1 +void soq_send_log(const soq_log_t* log_msg,struct opt_unit_t* log_opt,int opt_num, int thread_id); +#endif + diff --git a/inc/soq_types.h b/inc/soq_types.h new file mode 100644 index 0000000..a7dd907 --- /dev/null +++ b/inc/soq_types.h @@ -0,0 +1,170 @@ +/**************************************************** +* SOQ Project Types declaration * +* Author [email protected],[email protected] * +* Last modified: 20160903 * +*****************************************************/ +#ifndef _SOQ_TYPE_H_ +#define _SOQ_TYPE_H_ +#include <MESA/Maat_rule.h> +#ifndef __cplusplus +#error("This file should be compiled with C++ compiler") +#endif + +#define NTC_SWITCH + +typedef enum _soq_opt +{ + //Shared log options + LOG_OPT_SCENE_FILE=1, //IP pcap/Mail content + LOG_OPT_STREAM_INFO, // data is a struct stream_info *, size =8 + LOG_OPT_MAAT_RULE, //duplicate option is allowed. + + //Following are options for the respective protocol + + LOG_OPT_HTTP_REQ_LINE, + LOG_OPT_HTTP_REQ_HDR, + LOG_OPT_HTTP_REQ_BODY, + LOG_OPT_HTTP_RES_LINE, + LOG_OPT_HTTP_RES_HDR, + LOG_OPT_HTTP_RES_BODY, + LOG_OPT_HTTP_URL, + LOG_OPT_HTTP_C2S_ISN, //size=4 + LOG_OPT_HTTP_PROXY_FLAG, //size=4 ,0 or 1 + LOG_OPT_HTTP_SEQ, //size=4 + + LOG_OPT_MAIL_PROTO,//string:"pop3","smtp" or "imap4" + LOG_OPT_MAIL_FROM, + LOG_OPT_MAIL_TO, + LOG_OPT_MAIL_SUBJECT, + LOG_OPT_MAIL_EML, + + LOG_OPT_DNS_RD, //Shared with FD and JC + LOG_OPT_DNS_QTYPE, //Shared with FD and JC + LOG_OPT_DNS_QCLASS, //Shared with FD and JC + LOG_OPT_DNS_OPCODE, //Shared with FD and JC + LOG_OPT_DNS_QNAME, //Shared with FD and JC + LOG_OPT_DNS_CHEAT_TYPE, //Only in FD + LOG_OPT_DNS_CHEAT_RCODE, //Only in FD + LOG_OPT_DNS_CHEAT_STRATEGY, //Only in FD + LOG_OPT_DNS_CHEAT_RECORD, //Only in FD + LOG_OPT_DNS_CHEAT_TTL, //Only in FD + LOG_OPT_DNS_QR, //Only in JC + LOG_OPT_DNS_RA, //Only in JC + LOG_OPT_DNS_RR, //Only in JC + LOG_OPT_DNS_TTL, //Only in JC + LOG_OPT_DNS_DNS_SUB, //Only in JC, size=sizeof(int) 0-DNS,1-DNSSEC + + LOG_OPT_FTP_URL, + + LOG_OPT_PPTP_TUNNEL_TYPE, //size=sizeof(int),1-control,2-data + LOG_OPT_PPTP_ENCRYPT_MODE, //size=sizeof(int),1-MMPE 2-IPSEC 3-PAP 4-CHAP 5-MS-CHAP(v1/v2) 6-EAP-TLS + + LOG_OPT_L2TP_TUNNEL_TYPE, //size=sizeof(int),1-control,2-data + LOG_OPT_L2TP_ENCRYPT_MODE, //size=sizeof(int),0-other,1-IPSEC 2-none + + LOG_OPT_IPSEC_EX_PROTOCOL, //size=sizeof(int),1-ISAKMP(V1) 2-IKEv2 3-other + LOG_OPT_IPSEC_ISAKMP_MODE, + + LOG_OPT_OPENVPN_VERSION, //size=sizeof(int) + LOG_OPT_OPENVPN_ENCRYPT_MODE, //string + LOG_OPT_OPENVPN_HMAC, //size=sizeof(int),1-has,0-not has + LOG_OPT_OPENVPN_TUNNEL_TYPE, + + LOG_OPT_SSH_VERSION, //string + LOG_OPT_SSH_HOST_KEY, //string + LOG_OPT_SSH_HOST_COOKIE, //string + LOG_OPT_SSH_ENCRYPT_MODE, //size=sizeof(int) + LOG_OPT_SSH_MAC, //string + LOG_OPT_SSH_TUNNEL_TYPE, + + LOG_OPT_SSL_VERSION, //string + LOG_OPT_SSL_SNI, //string + LOG_OPT_SSL_INDIVIDUAL_CERT_FILE, + LOG_OPT_SSL_MIDDLE_CERT_FILE, + LOG_OPT_SSL_ROOT_CERT_FILE, + LOG_OPT_SSL_CHAIN_CERT_FILE, + + LOG_OPT_CHAP_NAME, //L2TP Username + + LOG_OPT_INJECTED_PKT, + + LOG_OPT_RANDOM_BLK_PROTO, + + LOG_OPT_HTTP_COOKIE, + LOG_OPT_HTTP_REFERER, + LOG_OPT_HTTP_UA, + LOG_OPT_HTTP_SET_COOKIE, + LOG_OPT_HTTP_CONTENT_LEN, + LOG_OPT_HTTP_CONTENT_TYPE, + LOG_OPT_HTTP_USER_DEFINE, //key:value+ '\0' ,e.g. "Server:nginx" + + LOG_OPT_APP_LABEL, + LOG_OPT_C2S_PKT_NUM, + LOG_OPT_S2C_PKT_NUM, + LOG_OPT_C2S_BYTE_NUM, + LOG_OPT_S2C_BYTE_NUM, + LOG_OPT_SSL_SAN, + LOG_OPT_SSL_CA, + LOG_OPT_DNS_CNAME, + LOG_OPT_FTP_CONTENT, + LOG_OPT_L2TP_CHAP_NAME, + + + LOG_OPT_PROTO_TYPE, //value:soq_protocol_t; FOR NTC + LOG_OPT_MAX +}soq_opt_t; + +typedef enum _soq_protocol +{ + PROTO_IPv4, + PROTO_IPv6, + PROTO_TCP, + PROTO_UDP, + PROTO_HTTP, + PROTO_MAIL, + PROTO_DNS, + PROTO_FTP, + PROTO_IPSEC, + PROTO_VPN, + PROTO_SSL, + PROTO_SSH, + PROTO_PPTP, + PROTO_L2TP, + PROTO_OPEN_VPN,//alias of PROTO_VPN + PROTO_GRE,//PROTO_GRE=15 FOR NTC + PROTO_SOCKS,//FOR NTC + PROTO_XMPP,//FOR NTC + PROTO_SIP,//FOR NTC + PROTO_RTP,//FOR NTC + PROTO_BGP,//FOR NTC + PROTO_MAX +}soq_protocol_t; + + +typedef enum _soq_action +{ + SOQ_ACTION_BLOCK, + SOQ_ACTION_MONITOR, + SOQ_ACTION_CONTINUE, + SOQ_ACTION_ABORT +}soq_action_t; +//mid is an context used to record previous pending result; +//set mid to 0 before FIRST calling this function, then pass it on every following calling. +//p_block is an output parameter, which can be a substitue of fetch_block_rule. +// point to a rule which action is SOQ_ACTION_BLOCK among hit_result. +// point to NULL if no rule's action is SOQ_ACTION_BLOCK. +soq_action_t decide_soq_action(const Maat_rule_t* hit_result,int cnt,const Maat_rule_t**p_block,int* mid); + +//return maat_rule which action is SOQ_ACTION_BLOCK in hit_result. +//return NULL if no rule's action is SOQ_ACTION_BLOCK. +Maat_rule_t* fetch_block_rule(Maat_rule_t* hit_result,int cnt); + +int scan_nesting_proto_addr(Maat_feather_t maat_feather, const struct streaminfo *a_stream, soq_protocol_t proto, scan_status_t *mid, Maat_rule_t*result,int result_num); + +int pcap_buff_calloc(const char *pkt, int pkt_len, char *dest, int dest_size); + + +#define SOQ_PROTO_MAX PROTO_MAX +#define SOQ_LOG_OPT_MAX LOG_OPT_MAX + +#endif diff --git a/inc/t1_public.h b/inc/t1_public.h new file mode 100644 index 0000000..5c20512 --- /dev/null +++ b/inc/t1_public.h @@ -0,0 +1,32 @@ +#ifndef _INCLUDE_T1_PUBLIC_H_ +#define _INCLUDE_T1_PUBLIC_H_ + +#ifndef __cplusplus +#error("This file should be compiled with C++ compiler") +#endif + +/* version : 2016-08-26 */ +#include <MESA/Maat_rule.h> +#include <MESA/stream.h> +#include <MESA/DocumentAnalyze.h> +#include "soq_types.h" + + +extern Maat_feather_t g_t1_maat_feather; +extern Maat_feather_t g_ipd_static_maat_feather; +extern Maat_feather_t g_ipd_dyn_maat_feather; + + +extern docanalyze_instance_t g_t1_doc_instance; + + +//return 1 if stream in whitelist, otherwise return 0; +int is_soq_whitelist(struct streaminfo* a_stream); +void make_soq_blacklist(struct streaminfo* a_stream,Maat_rule_t *result); +//ONLY called by encrypt proto,return 1 if block, otherwise return 0. +int is_random_block(soq_protocol_t protocol,int thread_id,Maat_rule_t* result); +//return 1 if type match, otherwise return 0; +int is_scan_type(enum DocumentType doctype); +int is_feedback_type(enum DocumentType doctype); + +#endif diff --git a/src/Makefile b/src/Makefile new file mode 100644 index 0000000..56949db --- /dev/null +++ b/src/Makefile @@ -0,0 +1,45 @@ +#path to find lib and header files +#vpath %.a ../lib +#vpath %.h ../inc + +CCC=g++ +CC=g++ + + +INCLUDEPATH+=-I../inc + +CFLAGS= -g3 -Wall -fPIC -Werror -O0 +CFLAGS+=$(INCLUDEPATH) + +CPPFLAGS=$(CFLAGS) + +LIB+=-lMESA_handle_logger +LIB+=-lMESA_prof_load + +SOURCES=$(wildcard *.c) +OBJECTS=$(SOURCES:.c=.o) +DEPS=$(SOURCES:.c=.d) + +#target name +TARGET=ntc_ip_comm.so + +.PHONY:clean all + +all:$(TARGET) + +$(TARGET):$(OBJECTS) $(LIB_FILE) + $(CC) -shared $(CFLAGS) $(OBJECTS) $(LIB) -o $@ +#copy target to dest dir + @awk '/VERSION/{print $$2}' $(SOURCES) |xargs -i echo -e "make \033[32;49;1m$@({})\033[32;49;0m \033[31;49;1m[success]\033[31;49;0m" + @cp -f $@ ../bin + @echo -e "copy \033[32;49;1m$@\033[32;49;0m to ../bin\033[31;49;1m[success]\033[31;49;0m" + +.c.o: + +%.d:%.c + $(CC) $< -MM $(INCLUDEPATH) > $@ + +-include $(DEPS) + +clean : + rm -f $(OBJECTS) $(DEPS) $(TARGET) diff --git a/src/ntc_ip_comm.c b/src/ntc_ip_comm.c new file mode 100644 index 0000000..2e4e914 --- /dev/null +++ b/src/ntc_ip_comm.c @@ -0,0 +1,382 @@ +/*
+ * t1_rawpkt
+ * author:yangwei
+ * time:2016-09-03
+ * last_version:1.20160903
+ *
+ */
+
+#include "ntc_ip_comm.h"
+#include "soq_sendlog.h"
+
+
+#include <assert.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <netinet/ip.h>
+#include <netinet/ip6.h>
+#include <netinet/udp.h>
+
+#include <net/if.h>
+#include <sys/types.h>
+#include <sys/ioctl.h>
+#include <dlfcn.h>
+
+
+#include "MESA_prof_load.h"
+#include "MESA_handle_logger.h"
+#include "field_stat2.h"
+#include <rdkafka.h>
+#include <cJSON.h>
+
+#define PLUGIN_NAME "NTC_IP_COMMs"
+#define PROFILE_PATH "./ntcconf/main.conf"
+#define NTC_IP_COMM_TOPIC "ntc_ip_comm_log"
+
+extern long long g_CurrentTime;
+
+ntc_ip_comm_global_item g_ntc_ip_comm_item;
+
+int NTC_IP_COMM_VERSION_20180620 = 1;
+
+rd_kafka_t *g_ntc_ip_comm_kafka_handle = NULL;
+rd_kafka_topic_t *g_ntc_ip_comm_kafka_topic = NULL;
+
+static unsigned int get_ip_by_eth_name(const char *ifname)
+{
+ int sockfd;
+ struct ifreq ifr;
+ unsigned int ip;
+
+ sockfd = socket(AF_INET, SOCK_DGRAM, 0);
+ if (-1 == sockfd) {
+ goto error;
+ }
+
+ strcpy(ifr.ifr_name,ifname);
+ if (ioctl(sockfd, SIOCGIFADDR, &ifr) < 0) {
+ goto error;
+ }
+
+ ip = ((struct sockaddr_in*)&(ifr.ifr_addr))->sin_addr.s_addr;
+ close(sockfd);
+ return ip;
+
+error:
+ close(sockfd);
+ return INADDR_NONE;
+}
+
+static int soq_addStreamInfo_to_jsonObj(cJSON *json_obj, const struct streaminfo *a_stream)
+{
+ int ret = 0;
+ const char *addr_proto = NULL;
+ const char* null_addr="0";
+ short null_port=0;
+ unsigned short tunnel_type=0;
+ char nest_addr_buf[1024];
+ int tunnel_type_size=sizeof(tunnel_type);
+ const struct layer_addr *addr=NULL;
+ char src_ip_str[128] = {0}, dst_ip_str[128] = {0};
+
+ cJSON_AddNumberToObject(json_obj, "stream_dir", a_stream->dir);
+
+ addr=&(a_stream->addr);
+ switch(addr->addrtype)
+ {
+ case ADDR_TYPE_IPV4:
+ case __ADDR_TYPE_IP_PAIR_V4:
+ inet_ntop(AF_INET, &addr->ipv4->saddr, src_ip_str, sizeof(src_ip_str));
+ inet_ntop(AF_INET, &addr->ipv4->daddr, dst_ip_str, sizeof(dst_ip_str));
+ cJSON_AddStringToObject(json_obj, "s_ip", src_ip_str);
+ cJSON_AddStringToObject(json_obj, "d_ip", dst_ip_str);
+ cJSON_AddNumberToObject(json_obj, "s_port", ntohs(addr->ipv4->source));
+ cJSON_AddNumberToObject(json_obj, "d_port", ntohs(addr->ipv4->dest));
+ break;
+ case ADDR_TYPE_IPV6:
+ case __ADDR_TYPE_IP_PAIR_V6:
+ cJSON_AddNumberToObject(json_obj, "addr_type", addr->addrtype);
+ inet_ntop(AF_INET6, addr->ipv6->saddr, src_ip_str, sizeof(src_ip_str));
+ inet_ntop(AF_INET6, addr->ipv6->daddr, dst_ip_str, sizeof(dst_ip_str));
+ cJSON_AddStringToObject(json_obj, "s_ip", src_ip_str);
+ cJSON_AddStringToObject(json_obj, "d_ip", dst_ip_str);
+ cJSON_AddNumberToObject(json_obj, "s_port", ntohs(addr->ipv6->source));
+ cJSON_AddNumberToObject(json_obj, "d_port", ntohs(addr->ipv6->dest));
+ break;
+ case ADDR_TYPE_VLAN:
+ case ADDR_TYPE_GRE:
+ case ADDR_TYPE_MPLS:
+ case ADDR_TYPE_PPPOE_SES:
+ case ADDR_TYPE_L2TP:
+ case ADDR_TYPE_PPP:
+ cJSON_AddNumberToObject(json_obj, "addr_type", addr->addrtype);
+ cJSON_AddStringToObject(json_obj, "s_ip", null_addr);
+ cJSON_AddStringToObject(json_obj, "d_ip", null_addr);
+ cJSON_AddNumberToObject(json_obj, "s_port", null_port);
+ cJSON_AddNumberToObject(json_obj, "d_port", null_port);
+ break;
+ case ADDR_TYPE_PPTP:
+ cJSON_AddNumberToObject(json_obj, "addr_type", addr->addrtype);
+ cJSON_AddStringToObject(json_obj, "s_ip", null_addr);
+ cJSON_AddStringToObject(json_obj, "d_ip", null_addr);
+ cJSON_AddNumberToObject(json_obj, "s_port", ntohs(addr->pptp->C2S_call_id));
+ cJSON_AddNumberToObject(json_obj, "d_port", ntohs(addr->pptp->S2C_call_id));
+ break;
+ default:
+ break;
+ }
+
+ addr_proto = layer_addr_prefix_ntop(a_stream);
+ cJSON_AddStringToObject(json_obj, "trans_proto", addr_proto);
+
+ ret=MESA_get_stream_opt(a_stream, MSO_STREAM_TUNNEL_TYPE, &tunnel_type, &tunnel_type_size);
+ assert(ret==0);
+ if(tunnel_type==STREAM_TUNNLE_NON)
+ {
+ layer_addr_ntop_r(a_stream,nest_addr_buf, sizeof(nest_addr_buf));
+ }
+ else
+ {
+ stream_addr_list_ntop(a_stream,nest_addr_buf, sizeof(nest_addr_buf));
+ }
+
+ cJSON_AddStringToObject(json_obj, "addr_list", nest_addr_buf);
+
+ return 0;
+}
+
+
+void ntc_ip_comm_send_kafka_log(rd_kafka_topic_t *topic, struct streaminfo *a_stream, char *dpkt_buf, int dpkt_buflen)
+{
+ cJSON *log_obj = cJSON_CreateObject();
+
+ cJSON_AddNumberToObject(log_obj, "service", g_ntc_ip_comm_item.service);
+ cJSON_AddStringToObject(log_obj, "cap_ip", g_ntc_ip_comm_item.local_ip_str);
+ cJSON_AddNumberToObject(log_obj, "entrance_id", g_ntc_ip_comm_item.entry_id);
+
+ cJSON_AddNumberToObject(log_obj, "found_time", g_CurrentTime);
+ cJSON_AddNumberToObject(log_obj, "recv_time", g_CurrentTime);
+ soq_addStreamInfo_to_jsonObj(log_obj, a_stream);
+
+ cJSON_AddNumberToObject(log_obj, "c2s_pkt_num", a_stream->ptcpdetail->clientpktnum);
+ cJSON_AddNumberToObject(log_obj, "s2c_pkt_num", a_stream->ptcpdetail->serverpktnum);
+ cJSON_AddNumberToObject(log_obj, "c2s_byte_num", a_stream->ptcpdetail->clientbytes);
+ cJSON_AddNumberToObject(log_obj, "s2c_byte_num", a_stream->ptcpdetail->serverbytes);
+ if(dpkt_buf != NULL && dpkt_buflen > 0)
+ {
+ cJSON_AddStringToObject(log_obj, "app_label", dpkt_buf);
+ }
+ char *payload = cJSON_Print(log_obj);
+ int paylen = strlen(payload);
+ rd_kafka_produce(g_ntc_ip_comm_kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, payload, paylen, NULL, 0, NULL);
+
+ free(payload);
+ cJSON_Delete(log_obj);
+ log_obj = NULL;
+ return ;
+}
+
+void ntc_ip_comm_send_ntc_log(struct streaminfo *a_stream, char *dpkt_buf, int dpkt_buflen)
+{
+ soq_log_t log_msg;
+ Maat_rule_t maat_rule;
+ struct opt_unit_t opts[5];
+ int opt_num = 4;
+
+ memset(&maat_rule, 0, sizeof(maat_rule));
+ maat_rule.action = SOQ_ACTION_MONITOR;
+ maat_rule.config_id = 0;
+ maat_rule.service_id = g_ntc_ip_comm_item.service;
+ maat_rule.do_log = 1;
+ log_msg.stream = a_stream;
+ log_msg.result = &maat_rule;
+ log_msg.result_num =1;
+
+ opts[0].opt_len = sizeof(a_stream->ptcpdetail->clientbytes);
+ opts[0].opt_type =LOG_OPT_C2S_BYTE_NUM ;
+ opts[0].opt_value = &(a_stream->ptcpdetail->clientbytes);
+
+ opts[1].opt_len = sizeof(a_stream->ptcpdetail->serverbytes);
+ opts[1].opt_type =LOG_OPT_S2C_BYTE_NUM ;
+ opts[1].opt_value = &(a_stream->ptcpdetail->serverbytes);
+
+ opts[2].opt_len = sizeof(a_stream->ptcpdetail->clientpktnum);
+ opts[2].opt_type =LOG_OPT_C2S_PKT_NUM ;
+ opts[2].opt_value = &(a_stream->ptcpdetail->clientpktnum);
+
+ opts[3].opt_len = sizeof(a_stream->ptcpdetail->serverpktnum);
+ opts[3].opt_type =LOG_OPT_S2C_PKT_NUM ;
+ opts[3].opt_value = &(a_stream->ptcpdetail->serverpktnum);
+
+ if(dpkt_buf != NULL && dpkt_buflen > 0)
+ {
+ opts[4].opt_len = dpkt_buflen;
+ opts[4].opt_type =LOG_OPT_APP_LABEL ;
+ opts[4].opt_value =dpkt_buf;
+ opt_num+=1;
+ }
+ soq_send_log(&log_msg, opts, opt_num, a_stream->threadnum);
+ return;
+}
+
+
+int ntc_ip_comm_get_dpkt_label(struct streaminfo *a_stream, const char* label_name, char *label_buf, int *label_buflen)
+{
+ dpkt_lable_t *dpkt_info = (dpkt_lable_t*)project_req_get_struct(a_stream,g_ntc_ip_comm_item.dpkt_project_id);
+ if(dpkt_info == NULL)return -1;
+
+ snprintf(label_buf, *label_buflen, "serv=%u;proto=%u;app=%u;os=%u;brow=%u;web=%u;beh=%u;",
+ dpkt_info->dpkt_service_type,
+ dpkt_info->dpkt_proto_type,
+ dpkt_info->dpkt_app_type,
+ dpkt_info->dpkt_op_type,
+ dpkt_info->dpkt_browser_type,
+ dpkt_info->dpkt_web_type,
+ dpkt_info->dpkt_behavior_type);
+ *label_buflen = strlen(label_buf);
+ return 0;
+}
+
+extern "C" UCHAR ntc_ip_comm_transfer_entry(struct streaminfo *a_stream, void **pme, int thread_seq,const void *raw_pkt)
+{
+ int state = 0;
+ if(a_stream->type == STREAM_TYPE_TCP)
+ {
+ state = a_stream->pktstate;
+ }
+ if(a_stream->type == STREAM_TYPE_UDP)
+ {
+ state = a_stream->opstate;
+ }
+ if(state == OP_STATE_CLOSE)
+ {
+ char label_buf[128] = {0};
+ int label_buflen = sizeof(label_buf);
+ ntc_ip_comm_get_dpkt_label(a_stream, g_ntc_ip_comm_item.dpkt_label, label_buf, &label_buflen);
+ if(a_stream->ptcpdetail->clientbytes+ a_stream->ptcpdetail->serverbytes >= g_ntc_ip_comm_item.min_bytes && a_stream->ptcpdetail->clientpktnum+ a_stream->ptcpdetail->serverpktnum >= g_ntc_ip_comm_item.min_pkts)
+ {
+ if(g_ntc_ip_comm_item.using_kafka == 1)
+ {
+ ntc_ip_comm_send_kafka_log(g_ntc_ip_comm_kafka_topic, a_stream, label_buf, label_buflen);
+ }
+ else
+ {
+ ntc_ip_comm_send_ntc_log(a_stream, label_buf, label_buflen);
+ }
+ }
+ return APP_STATE_DROPME;
+ }
+ return APP_STATE_GIVEME;
+}
+
+void ntc_ip_comm_load_profile()
+{
+ MESA_load_profile_string_def(PROFILE_PATH, PLUGIN_NAME, "log_path", g_ntc_ip_comm_item.log_path, sizeof(g_ntc_ip_comm_item.log_path), "./log/t1_rawpkt_log");
+ MESA_load_profile_uint_def(PROFILE_PATH,PLUGIN_NAME, "log_level", &g_ntc_ip_comm_item.log_level, 30);
+
+ char nic_name[64];
+ MESA_load_profile_string_def(PROFILE_PATH,"SYSTEM", "NIC_NAME",nic_name,sizeof(nic_name),"eth0");
+ g_ntc_ip_comm_item.local_ip_nr=get_ip_by_eth_name(nic_name);
+ if(g_ntc_ip_comm_item.local_ip_nr==INADDR_NONE)
+ {
+ printf("get_ip_by_eth_name in %s return NULL, exit!\n", nic_name);
+ exit(0);
+ }
+ inet_ntop(AF_INET,&(g_ntc_ip_comm_item.local_ip_nr),g_ntc_ip_comm_item.local_ip_str,sizeof(g_ntc_ip_comm_item.local_ip_str));
+ MESA_load_profile_int_def(PROFILE_PATH,"SYSTEM", "ENTRANCE_ID",&(g_ntc_ip_comm_item.entry_id),0);
+
+
+ MESA_load_profile_uint_def(PROFILE_PATH,PLUGIN_NAME, "min_pkts", &g_ntc_ip_comm_item.min_pkts, 5);
+ MESA_load_profile_uint_def(PROFILE_PATH,PLUGIN_NAME, "min_bytes", &g_ntc_ip_comm_item.min_bytes, 5);
+
+ MESA_load_profile_int_def(PROFILE_PATH,PLUGIN_NAME, "service", &g_ntc_ip_comm_item.service, 0);
+
+ MESA_load_profile_uint_def(PROFILE_PATH,PLUGIN_NAME, "using_kafka", &g_ntc_ip_comm_item.using_kafka, 0);
+
+ MESA_load_profile_string_def(PROFILE_PATH, PLUGIN_NAME, "dpkt_label", g_ntc_ip_comm_item.dpkt_label, sizeof(g_ntc_ip_comm_item.dpkt_label), "DPKT_PROJECT");
+ return ;
+}
+
+int ntc_ip_comm_kaka_init()
+{
+ char kafka_errstr[1024];
+ MESA_load_profile_uint_def(PROFILE_PATH,PLUGIN_NAME, "indie_kafka", &g_ntc_ip_comm_item.indie_kafka, 0);
+ MESA_load_profile_string_def(PROFILE_PATH, PLUGIN_NAME, "kafka_topic", g_ntc_ip_comm_item.kafka_topic, sizeof(g_ntc_ip_comm_item.kafka_topic), NTC_IP_COMM_TOPIC);
+ if(g_ntc_ip_comm_item.indie_kafka == 1)
+ {
+ if(0 > MESA_load_profile_string_nodef(PROFILE_PATH, PLUGIN_NAME, "kafka_brokelist", g_ntc_ip_comm_item.kafka_brokelist, sizeof(g_ntc_ip_comm_item.kafka_brokelist)))
+ {
+ return -1;
+ }
+ rd_kafka_conf_t *rdkafka_conf = rd_kafka_conf_new();
+ rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", "600000",kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(rdkafka_conf, "security.protocol", "MG", kafka_errstr, sizeof(kafka_errstr));
+
+ if (!(g_ntc_ip_comm_kafka_handle = rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr))))
+ {
+ return -1;
+ }
+
+ if (rd_kafka_brokers_add(g_ntc_ip_comm_kafka_handle, g_ntc_ip_comm_item.kafka_brokelist) == 0)
+ {
+ return -1;
+ }
+ }
+ else
+ {
+ if(0 > MESA_load_profile_string_nodef(PROFILE_PATH, PLUGIN_NAME, "kafka_handle_provide_path", g_ntc_ip_comm_item.kafka_handle_provide_path, sizeof(g_ntc_ip_comm_item.kafka_handle_provide_path)))
+ {
+ return -1;
+ }
+ void * dl_handle = dlopen(g_ntc_ip_comm_item.kafka_handle_provide_path, RTLD_GLOBAL);
+ if(dl_handle == NULL)
+ {
+ return -1;
+ }
+ if(0 > MESA_load_profile_string_nodef(PROFILE_PATH, PLUGIN_NAME, "kafka_handle_name", g_ntc_ip_comm_item.kafka_handle_name, sizeof(g_ntc_ip_comm_item.kafka_handle_name)))
+ {
+ return -1;
+ }
+ g_ntc_ip_comm_kafka_handle = (rd_kafka_t *)dlsym(dl_handle, g_ntc_ip_comm_item.kafka_handle_name);
+ if(g_ntc_ip_comm_kafka_handle == NULL)
+ {
+ return -1;
+ }
+
+ }
+ rd_kafka_topic_conf_t*ip_comm_topic_conf = rd_kafka_topic_conf_new();
+ g_ntc_ip_comm_kafka_topic = rd_kafka_topic_new(g_ntc_ip_comm_kafka_handle, g_ntc_ip_comm_item.kafka_topic, ip_comm_topic_conf);
+ return 0;
+}
+
+extern "C" int ntc_ip_comm_init()
+{
+ memset(&g_ntc_ip_comm_item, 0, sizeof(g_ntc_ip_comm_item));
+ ntc_ip_comm_load_profile();
+ g_ntc_ip_comm_item.log_handle = MESA_create_runtime_log_handle(g_ntc_ip_comm_item.log_path, g_ntc_ip_comm_item.log_level);
+ if(g_ntc_ip_comm_item.log_handle == NULL)
+ return ERROR;
+
+
+ g_ntc_ip_comm_item.dpkt_project_id = project_customer_register(g_ntc_ip_comm_item.dpkt_label, "struct");
+ if(g_ntc_ip_comm_item.using_kafka == 1)
+ {
+ if(0 != ntc_ip_comm_kaka_init())
+ return ERROR;
+ }
+ return OK;
+}
+
+
+
+extern "C" void ntc_ip_comm_destroy(void)
+{
+ MESA_destroy_runtime_log_handle(g_ntc_ip_comm_item.log_handle);
+ return;
+}
+
diff --git a/src/ntc_ip_comm.h b/src/ntc_ip_comm.h new file mode 100644 index 0000000..787abd7 --- /dev/null +++ b/src/ntc_ip_comm.h @@ -0,0 +1,72 @@ +#ifndef NTC_IP_COMM_H +#define NTC_IP_COMM_H + + +#ifdef __cplusplus +extern "C" +{ +#endif + +#include "stream.h" + + typedef struct ntc_ip_comm_global_item + { + void * log_handle; + char log_path[1024]; + unsigned int log_level; + unsigned int using_kafka; + unsigned int indie_kafka; + char kafka_brokelist[1024]; + char kafka_handle_provide_path[1024]; + char kafka_handle_name[1024]; + char kafka_topic[1024]; + + unsigned int local_ip_nr; + char local_ip_str[128]; + char dpkt_label[1024]; + int dpkt_project_id; + int entry_id; + int service; + unsigned int min_bytes; + unsigned int min_pkts; + + }ntc_ip_comm_global_item; + + + typedef enum + { + OK = 0, + ERROR = -1, + UNDEFINE = -2 + } STATUS; + + typedef enum + { + TRUE = 1, + FALSE = 0 + } BOOL; + + + typedef struct _dpkt_lable_t + { + unsigned char trans_proto; + unsigned char v6; + unsigned short _pad0; + unsigned int dpkt_service_type; + unsigned int dpkt_proto_type; + unsigned int dpkt_app_type; + unsigned int dpkt_op_type; + unsigned int dpkt_browser_type; + unsigned int dpkt_web_type; + unsigned int dpkt_behavior_type; + }dpkt_lable_t; + + +#ifdef __cplusplus +} +#endif + +#endif + + + |
