diff options
| author | yw <[email protected]> | 2018-06-29 16:40:06 +0800 |
|---|---|---|
| committer | yw <[email protected]> | 2018-06-29 16:40:14 +0800 |
| commit | fe66a9d8910c70d691866e346c1c26ff0db4a44e (patch) | |
| tree | 6f8a4faf479c691e70e618eaf23512e4ed3a33a4 | |
first git version
| -rw-r--r-- | .gitignore | 8 | ||||
| -rw-r--r-- | Makefile | 78 | ||||
| -rw-r--r-- | bin/.gitignore | 6 | ||||
| -rw-r--r-- | conf/main.conf | 7 | ||||
| -rw-r--r-- | src/Makefile | 48 | ||||
| -rw-r--r-- | src/ntc_ssl_collect.c | 350 | ||||
| -rw-r--r-- | src/ntc_ssl_collect.h | 77 |
7 files changed, 574 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..267ab48 --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +SI/ +*.I* +*.P* +*.S* +*.W* +*.[od] +*.[1-9]* +*.log diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..37b25d2 --- /dev/null +++ b/Makefile @@ -0,0 +1,78 @@ +#define +PLUG_TYPE_BIZ=biz +PLUG_TYPE_PROTO=proto +PLUG_TYPE_PLATFORM=platform + + +path=/home/mesasoft/sapp_run + +PLUG_TYPE=biz +plug_name=ntc_ssl_collect +plug_conf_dir=$(path)/ntcconf/ + +plug_dir_name=$(plug_name) +plug_inf_name=$(plug_name).inf + +plug_so_path=./bin/$(plug_name).so +#plug_conf_path=./conf/$(plug_name).conf +plug_inf=./conf/$(plug_name).inf + +#-----auto filled---- + +biz_plug_dir_name=./plug/business +biz_plug_dir=$(path)/$(biz_plug_dir_name) + +protocol_plug_dir_name=./plug/protocol +protocol_plug_dir=$(path)/$(protocol_plug_dir_name) + + +platform_plug_dir_name=./plug/platform +platform_plug_dir=$(path)/$(platform_plug_dir_name) + +business_conflist_path=$(biz_plug_dir)/conflist_business.inf +protocol_conflist_path=$(protocol_plug_dir)/conflist_protocol.inf +platform_conflist_path=$(platform_plug_dir)/conflist_platform.inf + +ifeq ($(PLUG_TYPE), $(PLUG_TYPE_BIZ)) +dest_plug_dir_name = $(biz_plug_dir_name) +dest_plug_dir = $(biz_plug_dir) +dest_conflist_path=$(business_conflist_path) +endif + +ifeq ($(PLUG_TYPE), $(PLUG_TYPE_PROTO)) +dest_plug_dir_name = $(protocol_plug_dir_name) +dest_plug_dir = $(protocol_plug_dir) +dest_conflist_path=$(protocol_conflist_path) +endif + +ifeq ($(PLUG_TYPE), $(PLUG_TYPE_PLATFORM)) +dest_plug_dir_name = $(platform_plug_dir_name) +dest_plug_dir = $(platform_plug_dir) +dest_conflist_path=$(platform_conflist_path) +endif + +plug_inf_path=$(dest_plug_dir_name)/$(plug_dir_name)/$(plug_inf_name) +plug_dir=$(dest_plug_dir)/$(plug_dir_name) + + +.PHONY: all clean opt update + +all: + cd src && $(MAKE) + @echo -e "current path=\033[32;49;1m$(path)\033[32;49;0m , try \033[32;49;1m[make install path=PATH]\033[32;49;0m to install plugin" +clean: + cd src && $(MAKE) clean + +opt: + $(MAKE) all + +update: + @cp -f $(plug_so_path) $(plug_dir) + @echo -e "copy \033[32;49;1m$(plug_so_path) \033[32;49;0m to \033[32;49;1m$(plug_dir)\033[32;49;0m \033[31;49;1m[success]\033[31;49;0m" + +install: + mkdir -p $(plug_dir) + mkdir -p $(plug_conf_dir) + @cp -f $(plug_so_path) $(plug_inf) $(plug_dir) + @echo -e "copy \033[32;49;1m$(plug_so_path) $(plug_inf)\033[32;49;0m to \033[32;49;1m$(plug_dir)\033[32;49;0m \033[31;49;1m[success]\033[31;49;0m" + @ret=`cat $(dest_conflist_path)|grep $(plug_inf_path)|wc -l`;if [ $$ret -eq 0 ];then echo $(plug_inf_path) >>$(dest_conflist_path);fi diff --git a/bin/.gitignore b/bin/.gitignore new file mode 100644 index 0000000..fe22acb --- /dev/null +++ b/bin/.gitignore @@ -0,0 +1,6 @@ +*.so +SI/ +*.o +*.d +*.[1-9]* +*.log diff --git a/conf/main.conf b/conf/main.conf new file mode 100644 index 0000000..204dc71 --- /dev/null +++ b/conf/main.conf @@ -0,0 +1,7 @@ +[NTC_SSL_COLLECT] +log_level=30 +log_path=./ntclog/ntc_ssl_collect/run_log +kafka_topic=ntc_ssl_collect_log +#kafka_brokelist= +#service= + diff --git a/src/Makefile b/src/Makefile new file mode 100644 index 0000000..9159f50 --- /dev/null +++ b/src/Makefile @@ -0,0 +1,48 @@ +#path to find lib and header files +#vpath %.a ../lib +#vpath %.h ../inc + +CCC=g++ +CC=g++ + + +INCLUDEPATH+=-I../inc + +CFLAGS= -g3 -Wall -fPIC -Werror -O0 +CFLAGS+=$(INCLUDEPATH) + +CPPFLAGS=$(CFLAGS) + +LIB+=-lMESA_handle_logger +LIB+=-lMESA_prof_load +LIB+=-lrdkafka +LIB+=-lcJSON +LIB+=-ldl + +SOURCES=$(wildcard *.c) +OBJECTS=$(SOURCES:.c=.o) +DEPS=$(SOURCES:.c=.d) + +#target name +TARGET=ntc_ssl_collect.so + +.PHONY:clean all + +all:$(TARGET) + +$(TARGET):$(OBJECTS) $(LIB_FILE) + $(CC) -shared $(CFLAGS) $(OBJECTS) $(LIB) -o $@ +#copy target to dest dir + @awk '/VERSION/{print $$2}' $(SOURCES) |xargs -i echo -e "make \033[32;49;1m$@({})\033[32;49;0m \033[31;49;1m[success]\033[31;49;0m" + @cp -f $@ ../bin + @echo -e "copy \033[32;49;1m$@\033[32;49;0m to ../bin\033[31;49;1m[success]\033[31;49;0m" + +.c.o: + +%.d:%.c + $(CC) $< -MM $(INCLUDEPATH) > $@ + +-include $(DEPS) + +clean : + rm -f $(OBJECTS) $(DEPS) $(TARGET) diff --git a/src/ntc_ssl_collect.c b/src/ntc_ssl_collect.c new file mode 100644 index 0000000..618d01a --- /dev/null +++ b/src/ntc_ssl_collect.c @@ -0,0 +1,350 @@ +#include "ntc_ssl_collect.h" + + +#include <assert.h> +#include <stdio.h> +#include <unistd.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <arpa/inet.h> +#include <netinet/in.h> +#include <netinet/ip.h> +#include <netinet/ip6.h> +#include <netinet/udp.h> + +#include <net/if.h> +#include <sys/types.h> +#include <sys/ioctl.h> +#include <dlfcn.h> + +#include "ssl.h" +#include "MESA_prof_load.h" +#include "MESA_handle_logger.h" +#include "field_stat2.h" +#include <rdkafka.h> +#include <cJSON.h> + +#define PLUGIN_NAME "NTC_SSL_COLLECT" +#define PROFILE_PATH "./ntcconf/main.conf" +#define NTC_SSL_COLLECT_TOPIC "ntc_ssl_collect_log" + +extern long long g_CurrentTime; + +ntc_ssl_collect_global_item g_ssl_collect_item; + +int NTC_SSL_COLLECT_VERSION_20180629 = 1; + +static rd_kafka_t *g_ntc_kafka_handle = NULL; +static rd_kafka_topic_t *g_ntc_kafka_topic = NULL; + +static unsigned int get_ip_by_eth_name(const char *ifname) +{ + int sockfd; + struct ifreq ifr; + unsigned int ip; + + sockfd = socket(AF_INET, SOCK_DGRAM, 0); + if (-1 == sockfd) { + goto error; + } + + strcpy(ifr.ifr_name,ifname); + if (ioctl(sockfd, SIOCGIFADDR, &ifr) < 0) { + goto error; + } + + ip = ((struct sockaddr_in*)&(ifr.ifr_addr))->sin_addr.s_addr; + close(sockfd); + return ip; + +error: + close(sockfd); + return INADDR_NONE; +} + +static int ntc_addStreamInfo_to_jsonObj(cJSON *json_obj, const struct streaminfo *a_stream) +{ + int ret = 0; + const char *addr_proto = NULL; + const char* null_addr="0"; + short null_port=0; + unsigned short tunnel_type=0; + char nest_addr_buf[1024]; + int tunnel_type_size=sizeof(tunnel_type); + const struct layer_addr *addr=NULL; + char src_ip_str[128] = {0}, dst_ip_str[128] = {0}; + + cJSON_AddNumberToObject(json_obj, "stream_dir", a_stream->dir); + + addr=&(a_stream->addr); + switch(addr->addrtype) + { + case ADDR_TYPE_IPV4: + case __ADDR_TYPE_IP_PAIR_V4: + inet_ntop(AF_INET, &addr->ipv4->saddr, src_ip_str, sizeof(src_ip_str)); + inet_ntop(AF_INET, &addr->ipv4->daddr, dst_ip_str, sizeof(dst_ip_str)); + cJSON_AddStringToObject(json_obj, "s_ip", src_ip_str); + cJSON_AddStringToObject(json_obj, "d_ip", dst_ip_str); + cJSON_AddNumberToObject(json_obj, "s_port", ntohs(addr->ipv4->source)); + cJSON_AddNumberToObject(json_obj, "d_port", ntohs(addr->ipv4->dest)); + break; + case ADDR_TYPE_IPV6: + case __ADDR_TYPE_IP_PAIR_V6: + cJSON_AddNumberToObject(json_obj, "addr_type", addr->addrtype); + inet_ntop(AF_INET6, addr->ipv6->saddr, src_ip_str, sizeof(src_ip_str)); + inet_ntop(AF_INET6, addr->ipv6->daddr, dst_ip_str, sizeof(dst_ip_str)); + cJSON_AddStringToObject(json_obj, "s_ip", src_ip_str); + cJSON_AddStringToObject(json_obj, "d_ip", dst_ip_str); + cJSON_AddNumberToObject(json_obj, "s_port", ntohs(addr->ipv6->source)); + cJSON_AddNumberToObject(json_obj, "d_port", ntohs(addr->ipv6->dest)); + break; + case ADDR_TYPE_VLAN: + case ADDR_TYPE_GRE: + case ADDR_TYPE_MPLS: + case ADDR_TYPE_PPPOE_SES: + case ADDR_TYPE_L2TP: + case ADDR_TYPE_PPP: + cJSON_AddNumberToObject(json_obj, "addr_type", addr->addrtype); + cJSON_AddStringToObject(json_obj, "s_ip", null_addr); + cJSON_AddStringToObject(json_obj, "d_ip", null_addr); + cJSON_AddNumberToObject(json_obj, "s_port", null_port); + cJSON_AddNumberToObject(json_obj, "d_port", null_port); + break; + case ADDR_TYPE_PPTP: + cJSON_AddNumberToObject(json_obj, "addr_type", addr->addrtype); + cJSON_AddStringToObject(json_obj, "s_ip", null_addr); + cJSON_AddStringToObject(json_obj, "d_ip", null_addr); + cJSON_AddNumberToObject(json_obj, "s_port", ntohs(addr->pptp->C2S_call_id)); + cJSON_AddNumberToObject(json_obj, "d_port", ntohs(addr->pptp->S2C_call_id)); + break; + default: + break; + } + + addr_proto = layer_addr_prefix_ntop(a_stream); + cJSON_AddStringToObject(json_obj, "trans_proto", addr_proto); + + ret=MESA_get_stream_opt(a_stream, MSO_STREAM_TUNNEL_TYPE, &tunnel_type, &tunnel_type_size); + assert(ret==0); + if(tunnel_type==STREAM_TUNNLE_NON) + { + layer_addr_ntop_r(a_stream,nest_addr_buf, sizeof(nest_addr_buf)); + } + else + { + stream_addr_list_ntop(a_stream,nest_addr_buf, sizeof(nest_addr_buf)); + } + + cJSON_AddStringToObject(json_obj, "addr_list", nest_addr_buf); + + return 0; +} + + +void ntc_ssl_collect_send_kafka_log(rd_kafka_topic_t *topic, struct streaminfo *a_stream, comm_context_t *ctx, char *dpkt_buf, int dpkt_buflen, ssl_stream *a_ssl) +{ + if(a_stream == NULL || a_ssl == NULL) + { + return; + } + if(a_ssl->stClientHello == NULL || a_ssl->stClientHello->server_name == NULL) + { + return ; + } + + cJSON *log_obj = cJSON_CreateObject(); + + cJSON_AddNumberToObject(log_obj, "service", g_ssl_collect_item.service); + cJSON_AddStringToObject(log_obj, "cap_ip", g_ssl_collect_item.local_ip_str); + cJSON_AddNumberToObject(log_obj, "entrance_id", g_ssl_collect_item.entry_id); + + cJSON_AddNumberToObject(log_obj, "found_time", g_CurrentTime); + cJSON_AddNumberToObject(log_obj, "recv_time", g_CurrentTime); + ntc_addStreamInfo_to_jsonObj(log_obj, a_stream); + + char tmp[128]; + sprintf(tmp, "%llu", ctx->c2s_pkts); + cJSON_AddStringToObject(log_obj, "c2s_pkt_num", tmp); + sprintf(tmp, "%llu", ctx->s2c_pkts); + cJSON_AddStringToObject(log_obj, "s2c_pkt_num", tmp); + + sprintf(tmp, "%llu", ctx->c2s_bytes); + cJSON_AddStringToObject(log_obj, "c2s_byte_num", tmp); + + sprintf(tmp, "%llu", ctx->s2c_bytes); + cJSON_AddStringToObject(log_obj, "s2c_byte_num", tmp); + + cJSON_AddStringToObject(log_obj, "SNI", (const char *)a_ssl->stClientHello->server_name); + + if(dpkt_buf != NULL && dpkt_buflen > 0) + { + cJSON_AddStringToObject(log_obj, "app_label", dpkt_buf); + } + + cJSON_AddNumberToObject(log_obj, "create_time", a_stream->ptcpdetail->createtime); + cJSON_AddNumberToObject(log_obj, "lastmtime", a_stream->ptcpdetail->lastmtime); + + + char *payload = cJSON_Print(log_obj); + int paylen = strlen(payload); + rd_kafka_produce(g_ntc_kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, payload, paylen, NULL, 0, NULL); + + free(payload); + cJSON_Delete(log_obj); + log_obj = NULL; + return ; +} + + +int ntc_get_dpkt_label(struct streaminfo *a_stream, const char* label_name, char *label_buf, int *label_buflen) +{ + dpkt_lable_t *dpkt_info = (dpkt_lable_t*)project_req_get_struct(a_stream,g_ssl_collect_item.dpkt_project_id); + if(dpkt_info == NULL) + { + *label_buflen = 0; + return -1; + } + snprintf(label_buf, *label_buflen, "SEV_ID=%u;PROTO_ID=%u;APP_ID=%u;OS_ID=%u;BS_ID=%u;WEB_ID=%u;BEHAV_ID=%u;", + dpkt_info->dpkt_service_type, + dpkt_info->dpkt_proto_type, + dpkt_info->dpkt_app_type, + dpkt_info->dpkt_op_type, + dpkt_info->dpkt_browser_type, + dpkt_info->dpkt_web_type, + dpkt_info->dpkt_behavior_type); + *label_buflen = strlen(label_buf); + return 0; +} + +int nct_get_flow_stat(comm_context_t *ctx, struct streaminfo *a_stream, int flow_id) +{ + struct tcp_flow_stat *tflow_project; + struct udp_flow_stat *uflow_project; + if(flow_id < 0 ) + { + return -1; + } + if(a_stream->type == STREAM_TYPE_TCP) + { + tflow_project = (struct tcp_flow_stat *)project_req_get_struct(a_stream,flow_id); + if(tflow_project != NULL) + { + ctx->c2s_bytes = tflow_project->C2S_data_byte; + ctx->s2c_bytes = tflow_project->S2C_data_byte; + ctx->c2s_pkts = tflow_project->C2S_data_pkt; + ctx->s2c_pkts = tflow_project->S2C_data_pkt; + } + } + else if(a_stream->type == STREAM_TYPE_UDP) + { + uflow_project = (struct udp_flow_stat *)project_req_get_struct(a_stream, flow_id); + if(uflow_project != NULL) + { + ctx->c2s_bytes = uflow_project->C2S_byte; + ctx->s2c_bytes = uflow_project->S2C_byte; + ctx->c2s_pkts = uflow_project->C2S_pkt; + ctx->s2c_pkts = uflow_project->S2C_pkt; + } + } + else + { + return -1; + } + return 0; +} + +extern "C" UCHAR ntc_ssl_collect_entry(stSessionInfo *session_info, void **param, int thread_seq, struct streaminfo *a_stream, void *a_packet) +{ + unsigned char ret = APP_STATE_GIVEME; + char label_buf[128] = {0}; + int label_buflen = sizeof(label_buf); + comm_context_t ctx; + ssl_stream *a_ssl; + switch (a_stream->opstate) + { + case OP_STATE_CLOSE: + nct_get_flow_stat(&ctx, a_stream, g_ssl_collect_item.tcp_flow_id); + ntc_get_dpkt_label(a_stream, g_ssl_collect_item.dpkt_label, label_buf, &label_buflen); + a_ssl = (ssl_stream *)session_info->app_info; + ntc_ssl_collect_send_kafka_log(g_ntc_kafka_topic, a_stream, &ctx, label_buf, label_buflen, a_ssl); + ret= APP_STATE_DROPME; + break; + default: + break; + } + return ret ; +} + +void ntc_ssl_collect_load_profile() +{ + MESA_load_profile_string_def(PROFILE_PATH, PLUGIN_NAME, "log_path", g_ssl_collect_item.log_path, sizeof(g_ssl_collect_item.log_path), "./ntclog/ip_comm_log"); + MESA_load_profile_uint_def(PROFILE_PATH,PLUGIN_NAME, "log_level", &g_ssl_collect_item.log_level, 30); + + char nic_name[64]; + MESA_load_profile_string_def(PROFILE_PATH,"SYSTEM", "NIC_NAME",nic_name,sizeof(nic_name),"eth0"); + g_ssl_collect_item.local_ip_nr=get_ip_by_eth_name(nic_name); + if(g_ssl_collect_item.local_ip_nr==INADDR_NONE) + { + printf("get_ip_by_eth_name in %s return NULL, exit!\n", nic_name); + exit(0); + } + inet_ntop(AF_INET,&(g_ssl_collect_item.local_ip_nr),g_ssl_collect_item.local_ip_str,sizeof(g_ssl_collect_item.local_ip_str)); + MESA_load_profile_int_def(PROFILE_PATH,"SYSTEM", "ENTRANCE_ID",&(g_ssl_collect_item.entry_id),0); + + MESA_load_profile_int_def(PROFILE_PATH,PLUGIN_NAME, "service", &g_ssl_collect_item.service, 0); + MESA_load_profile_string_def(PROFILE_PATH, PLUGIN_NAME, "dpkt_label", g_ssl_collect_item.dpkt_label, sizeof(g_ssl_collect_item.dpkt_label), "DPKT_PROJECT"); + return ; +} + +int ntc_ssl_collect_kaka_init() +{ + char kafka_errstr[1024]; + MESA_load_profile_string_def(PROFILE_PATH, PLUGIN_NAME, "kafka_topic", g_ssl_collect_item.kafka_topic, sizeof(g_ssl_collect_item.kafka_topic), NTC_SSL_COLLECT_TOPIC); + + if(0 > MESA_load_profile_string_nodef(PROFILE_PATH, PLUGIN_NAME, "kafka_brokelist", g_ssl_collect_item.kafka_brokelist, sizeof(g_ssl_collect_item.kafka_brokelist))) + { + return -1; + } + rd_kafka_conf_t *rdkafka_conf = rd_kafka_conf_new(); + rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr)); + rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", "600000",kafka_errstr, sizeof(kafka_errstr)); + rd_kafka_conf_set(rdkafka_conf, "security.protocol", "MG", kafka_errstr, sizeof(kafka_errstr)); + + if (!(g_ntc_kafka_handle = rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr)))) + { + return -1; + } + + if (rd_kafka_brokers_add(g_ntc_kafka_handle, g_ssl_collect_item.kafka_brokelist) == 0) + { + return -1; + } + rd_kafka_topic_conf_t*ip_comm_topic_conf = rd_kafka_topic_conf_new(); + g_ntc_kafka_topic = rd_kafka_topic_new(g_ntc_kafka_handle, g_ssl_collect_item.kafka_topic, ip_comm_topic_conf); + return 0; +} + +extern "C" int ntc_ssl_collect_init() +{ + memset(&g_ssl_collect_item, 0, sizeof(g_ssl_collect_item)); + ntc_ssl_collect_load_profile(); + g_ssl_collect_item.log_handle = MESA_create_runtime_log_handle(g_ssl_collect_item.log_path, g_ssl_collect_item.log_level); + if(g_ssl_collect_item.log_handle == NULL) + return ERROR; + + + g_ssl_collect_item.dpkt_project_id = project_customer_register(g_ssl_collect_item.dpkt_label, "struct"); + g_ssl_collect_item.tcp_flow_id = project_customer_register("tcp_flow_stat", "struct"); + + if(0 != ntc_ssl_collect_kaka_init()) + return ERROR; + return OK; +} + + + +extern "C" void ntc_ssl_collect_destroy(void) +{ + MESA_destroy_runtime_log_handle(g_ssl_collect_item.log_handle); + return; +} diff --git a/src/ntc_ssl_collect.h b/src/ntc_ssl_collect.h new file mode 100644 index 0000000..e0d1a7a --- /dev/null +++ b/src/ntc_ssl_collect.h @@ -0,0 +1,77 @@ +#ifndef NTC_SSL_COLLECT_H +#define NTC_SSL_COLLECT_H + + +#ifdef __cplusplus +extern "C" +{ +#endif + +#include "stream.h" + + typedef struct ntc_ssl_collect_global_item + { + void * log_handle; + char log_path[1024]; + unsigned int log_level; + unsigned int comm_log_mode; + char kafka_brokelist[1024]; + char kafka_topic[1024]; + char dpkt_label[1024]; + int dpkt_project_id; + + int tcp_flow_id; + int udp_flow_id; + + unsigned int local_ip_nr; + char local_ip_str[128]; + + int entry_id; + int service; + }ntc_ssl_collect_global_item; + +typedef struct _dpkt_lable_t +{ + unsigned char trans_proto; + unsigned char v6; + unsigned short _pad0; + unsigned int dpkt_service_type; + unsigned int dpkt_proto_type; + unsigned int dpkt_app_type; + unsigned int dpkt_op_type; + unsigned int dpkt_browser_type; + unsigned int dpkt_web_type; + unsigned int dpkt_behavior_type; + }dpkt_lable_t; + +typedef struct _comm_context_t +{ + unsigned long long c2s_pkts; + unsigned long long s2c_pkts; + unsigned long long c2s_bytes; + unsigned long long s2c_bytes; + }comm_context_t; + + + typedef enum + { + OK = 0, + ERROR = -1, + UNDEFINE = -2 + } STATUS; + + typedef enum + { + TRUE = 1, + FALSE = 0 + } BOOL; + + +#ifdef __cplusplus +} +#endif + +#endif + + + |
