diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/Makefile | 65 | ||||
| -rw-r--r-- | src/ntc_ip_comm.cpp | 92 | ||||
| -rw-r--r-- | src/ntc_ip_comm.h | 2 |
3 files changed, 75 insertions, 84 deletions
diff --git a/src/Makefile b/src/Makefile deleted file mode 100644 index e1f68b5..0000000 --- a/src/Makefile +++ /dev/null @@ -1,65 +0,0 @@ -#path to find lib and header files -#vpath %.a ../lib -#vpath %.h ../inc - -CCC=g++ -CC=g++ - - -INCLUDEPATH+=-I../inc - -CFLAGS= -g3 -Wall -fPIC -O0 -CFLAGS+=$(INCLUDEPATH) - -ifdef ASAN -CFLAGS+=-fsanitize=address -fno-omit-frame-pointer -endif - -#GIT_BRANCH=$(shell git symbolic-ref --short -q HEAD) -#GIT_SHA1=$(shell git rev-parse HEAD) -#MAKE_TIME=$(shell date "+%Y-%m-%d_%H:%M:%S") -#GIT_VERSION=VERSION_$(GIT_BRANCH)-$(GIT_SHA1) -#VERSION_FLAGS += -D$(GIT_VERSION)=1 -#CFLAGS += ${VERSION_FLAGS} - -CPPFLAGS=$(CFLAGS) - - -LIB+=-lMESA_handle_logger -LIB+=-lMESA_prof_load -LIB+=-lrdkafka -#LIB+=-lcjson -LIB+=-lcJSON -LIB+=-ldl - -ifdef ASAN -LIB+=-lasan -endif - -SOURCES=$(wildcard *.c) -OBJECTS=$(SOURCES:.c=.o) -DEPS=$(SOURCES:.c=.d) - -#target name -TARGET=ntc_ip_comm.so - -.PHONY:clean all - -all:$(TARGET) - -$(TARGET):$(OBJECTS) $(LIB_FILE) - $(CC) -shared $(CFLAGS) $(OBJECTS) $(LIB) -Wl,--version-script=./version.map -o $@ -#copy target to dest dir - @awk '/VERSION_20/{print $$2}' $(SOURCES) |xargs -i echo -e "make \033[32;49;1m$@({})\033[32;49;0m \033[31;49;1m[success]\033[31;49;0m" - @cp -f $@ ../bin - @echo -e "copy \033[32;49;1m$@\033[32;49;0m to ../bin\033[31;49;1m[success]\033[31;49;0m" - -.c.o: - -%.d:%.c - $(CC) $< -MM $(INCLUDEPATH) > $@ - --include $(DEPS) - -clean : - rm -f $(OBJECTS) $(DEPS) $(TARGET) diff --git a/src/ntc_ip_comm.cpp b/src/ntc_ip_comm.cpp index b758995..77415f5 100644 --- a/src/ntc_ip_comm.cpp +++ b/src/ntc_ip_comm.cpp @@ -22,6 +22,7 @@ #include "MESA_prof_load.h" #include "MESA_handle_logger.h" #include "field_stat2.h" +#include "stream_internal.h" #include <rdkafka.h> #include <cJSON.h> @@ -165,7 +166,8 @@ static int soq_addStreamInfo_to_jsonObj(cJSON *json_obj, const struct streaminfo } -void ntc_ip_comm_send_kafka_log(rd_kafka_topic_t *topic, struct streaminfo *a_stream, comm_context_t *ctx, char *dpkt_buf, int dpkt_buflen) +void ntc_ip_comm_send_kafka_log(rd_kafka_topic_t *topic, struct streaminfo *a_stream, comm_context_t *ctx, char + *dpkt_buf, int dpkt_buflen, char *user_buf, int user_buflen) { cJSON *log_obj = cJSON_CreateObject(); @@ -195,12 +197,12 @@ void ntc_ip_comm_send_kafka_log(rd_kafka_topic_t *topic, struct streaminfo *a_st cJSON_AddNumberToObject(log_obj, "create_time", a_stream->ptcpdetail->createtime); cJSON_AddNumberToObject(log_obj, "lastmtime", a_stream->ptcpdetail->lastmtime); - - char user_region_buf[4096] = " "; - snprintf(user_region_buf,sizeof(user_region_buf), "thread=%d;index=%d;hash=%d;", a_stream->threadnum, a_stream->stream_index, a_stream->hash_index); - //cJSON_AddNumberToObject(log_obj, "user_region", a_stream->threadnum); - cJSON_AddStringToObject(log_obj, "user_region", user_region_buf); - + + if(user_buflen > 0) + { + cJSON_AddStringToObject(log_obj, "user_region", user_buf); + } + //char *payload = cJSON_Print(log_obj); char *payload = cJSON_PrintUnformatted(log_obj); int paylen = strlen(payload); @@ -215,7 +217,7 @@ void ntc_ip_comm_send_kafka_log(rd_kafka_topic_t *topic, struct streaminfo *a_st return ; } -void ntc_ip_comm_send_ntc_log(struct streaminfo *a_stream, comm_context_t *ctx, char *dpkt_buf, int dpkt_buflen) +void ntc_ip_comm_send_ntc_log(struct streaminfo *a_stream, comm_context_t *ctx, char *dpkt_buf, int dpkt_buflen, char *user_buf, int user_buflen) { soq_log_t log_msg; Maat_rule_t maat_rule; @@ -232,8 +234,13 @@ void ntc_ip_comm_send_ntc_log(struct streaminfo *a_stream, comm_context_t *ctx, maat_rule.config_id = 0; maat_rule.service_id = g_ntc_ip_comm_item.service; maat_rule.do_log = 1; - snprintf(maat_rule.service_defined,sizeof(maat_rule.service_defined),"thread=%d;index=%d;hash=%d;", a_stream->threadnum, a_stream->stream_index, a_stream->hash_index); - log_msg.stream = a_stream; + + if(user_buflen > 0) + { + snprintf(maat_rule.service_defined,sizeof(maat_rule.service_defined),"%s", user_buf); + } + + log_msg.stream = a_stream; log_msg.result = &maat_rule; log_msg.result_num =1; @@ -268,6 +275,27 @@ void ntc_ip_comm_send_ntc_log(struct streaminfo *a_stream, comm_context_t *ctx, return; } +int ntc_ip_comm_assemble_user_buf(struct streaminfo *a_stream, char *user_buf, int *user_buflen) +{ + if(a_stream == NULL) + { + *user_buflen = 0; + return -1; + } + + int killed_flag = 0; + if(g_ntc_ip_comm_item.after_kill_switch == 1) + { + struct streaminfo_private *pstream_pr = (struct streaminfo_private *)a_stream; + if(pstream_pr->stream_killed_flag == 1) + { + killed_flag = 1; + } + } + snprintf(user_buf, *user_buflen, "thread=%d;index=%d;hash=%d;killed=%d", a_stream->threadnum, + a_stream->stream_index, a_stream->hash_index, killed_flag); + return 0; +} int ntc_ip_comm_get_dpkt_label(struct streaminfo *a_stream, const char* label_name, char *label_buf, int *label_buflen) { @@ -345,13 +373,18 @@ void ntc_ip_comm_judge_counter(comm_context_t *ctx, struct streaminfo *a_stream) return; } -extern "C" UCHAR ntc_ip_comm_transfer_entry(struct streaminfo *a_stream, void **pme, int thread_seq,const void *raw_pkt) + + +UCHAR ntc_ip_comm_transfer_process(struct streaminfo *a_stream, void **pme, int thread_seq,const void *raw_pkt, + UCHAR opstate) { unsigned char ret = APP_STATE_GIVEME; char label_buf[128] = {0}; - int label_buflen = sizeof(label_buf); + char user_buf[128] = {0}; + int label_buflen = sizeof(label_buf); + int user_buflen = sizeof(user_buf); comm_context_t *ctx = NULL; - switch (a_stream->opstate) + switch (opstate) { case OP_STATE_PENDING: ctx = (comm_context_t *)calloc(sizeof(comm_context_t ), 1); @@ -370,15 +403,16 @@ extern "C" UCHAR ntc_ip_comm_transfer_entry(struct streaminfo *a_stream, void * ntc_ip_comm_judge_counter(ctx, a_stream); if(ctx->c2s_bytes+ctx->s2c_bytes >= g_ntc_ip_comm_item.min_bytes &&ctx->c2s_pkts+ctx->s2c_pkts >= g_ntc_ip_comm_item.min_pkts) { - ntc_ip_comm_get_dpkt_label(a_stream, g_ntc_ip_comm_item.dpkt_label, label_buf, &label_buflen); - if((g_ntc_ip_comm_item.comm_log_mode&SEND_LOG) == SEND_LOG) + ntc_ip_comm_get_dpkt_label(a_stream, g_ntc_ip_comm_item.dpkt_label, label_buf, &label_buflen); + ntc_ip_comm_assemble_user_buf(a_stream, user_buf, &user_buflen); + if((g_ntc_ip_comm_item.comm_log_mode&SEND_LOG) == SEND_LOG) { - ntc_ip_comm_send_ntc_log(a_stream, ctx, label_buf, label_buflen); + ntc_ip_comm_send_ntc_log(a_stream, ctx, label_buf, label_buflen, user_buf, user_buflen); } - //if((g_ntc_ip_comm_item.comm_log_mode&SEND_KAFKA) == SEND_KAFKA) else { - ntc_ip_comm_send_kafka_log(g_ntc_ip_comm_kafka_topic, a_stream, ctx, label_buf, label_buflen); + ntc_ip_comm_send_kafka_log(g_ntc_ip_comm_kafka_topic, a_stream, ctx, label_buf, + label_buflen, user_buf, user_buflen); } } free(ctx); @@ -390,6 +424,24 @@ extern "C" UCHAR ntc_ip_comm_transfer_entry(struct streaminfo *a_stream, void * return ret ; } +extern "C" UCHAR ntc_ip_comm_tcpall_entry(struct streaminfo *a_stream, void **pme, int thread_seq,const void *raw_pkt) +{ + if(g_ntc_ip_comm_item.after_kill_switch == 1) + { + if(a_stream->pktstate == OP_STATE_PENDING) + { + unsigned char mopt = 1; + MESA_set_stream_opt(a_stream, MSO_TCPALL_VALID_AFTER_KILL, &mopt, sizeof(mopt)); + } + } + return ntc_ip_comm_transfer_process(a_stream, pme, thread_seq, raw_pkt, a_stream->pktstate); +} + +extern "C" UCHAR ntc_ip_comm_transfer_entry(struct streaminfo *a_stream, void **pme, int thread_seq,const void *raw_pkt) +{ + return ntc_ip_comm_transfer_process(a_stream, pme, thread_seq, raw_pkt, a_stream->opstate); +} + void ntc_ip_comm_load_profile() { MESA_load_profile_string_def(PROFILE_PATH, PLUGIN_NAME, "log_path", g_ntc_ip_comm_item.log_path, sizeof(g_ntc_ip_comm_item.log_path), "./t1log/ip_comm_log"); @@ -412,7 +464,9 @@ void ntc_ip_comm_load_profile() MESA_load_profile_int_def(PROFILE_PATH,PLUGIN_NAME, "service", &g_ntc_ip_comm_item.service, 0); - MESA_load_profile_uint_def(PROFILE_PATH,PLUGIN_NAME, "comm_log_mode", &g_ntc_ip_comm_item.comm_log_mode, 0); + MESA_load_profile_int_def(PROFILE_PATH,PLUGIN_NAME, "after_kill_switch", &g_ntc_ip_comm_item.after_kill_switch, 0); + + MESA_load_profile_uint_def(PROFILE_PATH,PLUGIN_NAME, "comm_log_mode", &g_ntc_ip_comm_item.comm_log_mode, 0); MESA_load_profile_string_def(PROFILE_PATH, PLUGIN_NAME, "dpkt_label", g_ntc_ip_comm_item.dpkt_label, sizeof(g_ntc_ip_comm_item.dpkt_label), "DPKT_PROJECT"); return ; diff --git a/src/ntc_ip_comm.h b/src/ntc_ip_comm.h index fc49d06..838d42f 100644 --- a/src/ntc_ip_comm.h +++ b/src/ntc_ip_comm.h @@ -21,6 +21,8 @@ extern "C" char kafka_handle_name[1024]; char kafka_topic[1024]; + int after_kill_switch; + unsigned int local_ip_nr; char local_ip_str[128]; char dpkt_label[1024]; |
