summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Makefile65
-rw-r--r--src/ntc_ip_comm.cpp92
-rw-r--r--src/ntc_ip_comm.h2
3 files changed, 75 insertions, 84 deletions
diff --git a/src/Makefile b/src/Makefile
deleted file mode 100644
index e1f68b5..0000000
--- a/src/Makefile
+++ /dev/null
@@ -1,65 +0,0 @@
-#path to find lib and header files
-#vpath %.a ../lib
-#vpath %.h ../inc
-
-CCC=g++
-CC=g++
-
-
-INCLUDEPATH+=-I../inc
-
-CFLAGS= -g3 -Wall -fPIC -O0
-CFLAGS+=$(INCLUDEPATH)
-
-ifdef ASAN
-CFLAGS+=-fsanitize=address -fno-omit-frame-pointer
-endif
-
-#GIT_BRANCH=$(shell git symbolic-ref --short -q HEAD)
-#GIT_SHA1=$(shell git rev-parse HEAD)
-#MAKE_TIME=$(shell date "+%Y-%m-%d_%H:%M:%S")
-#GIT_VERSION=VERSION_$(GIT_BRANCH)-$(GIT_SHA1)
-#VERSION_FLAGS += -D$(GIT_VERSION)=1
-#CFLAGS += ${VERSION_FLAGS}
-
-CPPFLAGS=$(CFLAGS)
-
-
-LIB+=-lMESA_handle_logger
-LIB+=-lMESA_prof_load
-LIB+=-lrdkafka
-#LIB+=-lcjson
-LIB+=-lcJSON
-LIB+=-ldl
-
-ifdef ASAN
-LIB+=-lasan
-endif
-
-SOURCES=$(wildcard *.c)
-OBJECTS=$(SOURCES:.c=.o)
-DEPS=$(SOURCES:.c=.d)
-
-#target name
-TARGET=ntc_ip_comm.so
-
-.PHONY:clean all
-
-all:$(TARGET)
-
-$(TARGET):$(OBJECTS) $(LIB_FILE)
- $(CC) -shared $(CFLAGS) $(OBJECTS) $(LIB) -Wl,--version-script=./version.map -o $@
-#copy target to dest dir
- @awk '/VERSION_20/{print $$2}' $(SOURCES) |xargs -i echo -e "make \033[32;49;1m$@({})\033[32;49;0m \033[31;49;1m[success]\033[31;49;0m"
- @cp -f $@ ../bin
- @echo -e "copy \033[32;49;1m$@\033[32;49;0m to ../bin\033[31;49;1m[success]\033[31;49;0m"
-
-.c.o:
-
-%.d:%.c
- $(CC) $< -MM $(INCLUDEPATH) > $@
-
--include $(DEPS)
-
-clean :
- rm -f $(OBJECTS) $(DEPS) $(TARGET)
diff --git a/src/ntc_ip_comm.cpp b/src/ntc_ip_comm.cpp
index b758995..77415f5 100644
--- a/src/ntc_ip_comm.cpp
+++ b/src/ntc_ip_comm.cpp
@@ -22,6 +22,7 @@
#include "MESA_prof_load.h"
#include "MESA_handle_logger.h"
#include "field_stat2.h"
+#include "stream_internal.h"
#include <rdkafka.h>
#include <cJSON.h>
@@ -165,7 +166,8 @@ static int soq_addStreamInfo_to_jsonObj(cJSON *json_obj, const struct streaminfo
}
-void ntc_ip_comm_send_kafka_log(rd_kafka_topic_t *topic, struct streaminfo *a_stream, comm_context_t *ctx, char *dpkt_buf, int dpkt_buflen)
+void ntc_ip_comm_send_kafka_log(rd_kafka_topic_t *topic, struct streaminfo *a_stream, comm_context_t *ctx, char
+ *dpkt_buf, int dpkt_buflen, char *user_buf, int user_buflen)
{
cJSON *log_obj = cJSON_CreateObject();
@@ -195,12 +197,12 @@ void ntc_ip_comm_send_kafka_log(rd_kafka_topic_t *topic, struct streaminfo *a_st
cJSON_AddNumberToObject(log_obj, "create_time", a_stream->ptcpdetail->createtime);
cJSON_AddNumberToObject(log_obj, "lastmtime", a_stream->ptcpdetail->lastmtime);
-
- char user_region_buf[4096] = " ";
- snprintf(user_region_buf,sizeof(user_region_buf), "thread=%d;index=%d;hash=%d;", a_stream->threadnum, a_stream->stream_index, a_stream->hash_index);
- //cJSON_AddNumberToObject(log_obj, "user_region", a_stream->threadnum);
- cJSON_AddStringToObject(log_obj, "user_region", user_region_buf);
-
+
+ if(user_buflen > 0)
+ {
+ cJSON_AddStringToObject(log_obj, "user_region", user_buf);
+ }
+
//char *payload = cJSON_Print(log_obj);
char *payload = cJSON_PrintUnformatted(log_obj);
int paylen = strlen(payload);
@@ -215,7 +217,7 @@ void ntc_ip_comm_send_kafka_log(rd_kafka_topic_t *topic, struct streaminfo *a_st
return ;
}
-void ntc_ip_comm_send_ntc_log(struct streaminfo *a_stream, comm_context_t *ctx, char *dpkt_buf, int dpkt_buflen)
+void ntc_ip_comm_send_ntc_log(struct streaminfo *a_stream, comm_context_t *ctx, char *dpkt_buf, int dpkt_buflen, char *user_buf, int user_buflen)
{
soq_log_t log_msg;
Maat_rule_t maat_rule;
@@ -232,8 +234,13 @@ void ntc_ip_comm_send_ntc_log(struct streaminfo *a_stream, comm_context_t *ctx,
maat_rule.config_id = 0;
maat_rule.service_id = g_ntc_ip_comm_item.service;
maat_rule.do_log = 1;
- snprintf(maat_rule.service_defined,sizeof(maat_rule.service_defined),"thread=%d;index=%d;hash=%d;", a_stream->threadnum, a_stream->stream_index, a_stream->hash_index);
- log_msg.stream = a_stream;
+
+ if(user_buflen > 0)
+ {
+ snprintf(maat_rule.service_defined,sizeof(maat_rule.service_defined),"%s", user_buf);
+ }
+
+ log_msg.stream = a_stream;
log_msg.result = &maat_rule;
log_msg.result_num =1;
@@ -268,6 +275,27 @@ void ntc_ip_comm_send_ntc_log(struct streaminfo *a_stream, comm_context_t *ctx,
return;
}
+int ntc_ip_comm_assemble_user_buf(struct streaminfo *a_stream, char *user_buf, int *user_buflen)
+{
+ if(a_stream == NULL)
+ {
+ *user_buflen = 0;
+ return -1;
+ }
+
+ int killed_flag = 0;
+ if(g_ntc_ip_comm_item.after_kill_switch == 1)
+ {
+ struct streaminfo_private *pstream_pr = (struct streaminfo_private *)a_stream;
+ if(pstream_pr->stream_killed_flag == 1)
+ {
+ killed_flag = 1;
+ }
+ }
+ snprintf(user_buf, *user_buflen, "thread=%d;index=%d;hash=%d;killed=%d", a_stream->threadnum,
+ a_stream->stream_index, a_stream->hash_index, killed_flag);
+ return 0;
+}
int ntc_ip_comm_get_dpkt_label(struct streaminfo *a_stream, const char* label_name, char *label_buf, int *label_buflen)
{
@@ -345,13 +373,18 @@ void ntc_ip_comm_judge_counter(comm_context_t *ctx, struct streaminfo *a_stream)
return;
}
-extern "C" UCHAR ntc_ip_comm_transfer_entry(struct streaminfo *a_stream, void **pme, int thread_seq,const void *raw_pkt)
+
+
+UCHAR ntc_ip_comm_transfer_process(struct streaminfo *a_stream, void **pme, int thread_seq,const void *raw_pkt,
+ UCHAR opstate)
{
unsigned char ret = APP_STATE_GIVEME;
char label_buf[128] = {0};
- int label_buflen = sizeof(label_buf);
+ char user_buf[128] = {0};
+ int label_buflen = sizeof(label_buf);
+ int user_buflen = sizeof(user_buf);
comm_context_t *ctx = NULL;
- switch (a_stream->opstate)
+ switch (opstate)
{
case OP_STATE_PENDING:
ctx = (comm_context_t *)calloc(sizeof(comm_context_t ), 1);
@@ -370,15 +403,16 @@ extern "C" UCHAR ntc_ip_comm_transfer_entry(struct streaminfo *a_stream, void *
ntc_ip_comm_judge_counter(ctx, a_stream);
if(ctx->c2s_bytes+ctx->s2c_bytes >= g_ntc_ip_comm_item.min_bytes &&ctx->c2s_pkts+ctx->s2c_pkts >= g_ntc_ip_comm_item.min_pkts)
{
- ntc_ip_comm_get_dpkt_label(a_stream, g_ntc_ip_comm_item.dpkt_label, label_buf, &label_buflen);
- if((g_ntc_ip_comm_item.comm_log_mode&SEND_LOG) == SEND_LOG)
+ ntc_ip_comm_get_dpkt_label(a_stream, g_ntc_ip_comm_item.dpkt_label, label_buf, &label_buflen);
+ ntc_ip_comm_assemble_user_buf(a_stream, user_buf, &user_buflen);
+ if((g_ntc_ip_comm_item.comm_log_mode&SEND_LOG) == SEND_LOG)
{
- ntc_ip_comm_send_ntc_log(a_stream, ctx, label_buf, label_buflen);
+ ntc_ip_comm_send_ntc_log(a_stream, ctx, label_buf, label_buflen, user_buf, user_buflen);
}
- //if((g_ntc_ip_comm_item.comm_log_mode&SEND_KAFKA) == SEND_KAFKA)
else
{
- ntc_ip_comm_send_kafka_log(g_ntc_ip_comm_kafka_topic, a_stream, ctx, label_buf, label_buflen);
+ ntc_ip_comm_send_kafka_log(g_ntc_ip_comm_kafka_topic, a_stream, ctx, label_buf,
+ label_buflen, user_buf, user_buflen);
}
}
free(ctx);
@@ -390,6 +424,24 @@ extern "C" UCHAR ntc_ip_comm_transfer_entry(struct streaminfo *a_stream, void *
return ret ;
}
+extern "C" UCHAR ntc_ip_comm_tcpall_entry(struct streaminfo *a_stream, void **pme, int thread_seq,const void *raw_pkt)
+{
+ if(g_ntc_ip_comm_item.after_kill_switch == 1)
+ {
+ if(a_stream->pktstate == OP_STATE_PENDING)
+ {
+ unsigned char mopt = 1;
+ MESA_set_stream_opt(a_stream, MSO_TCPALL_VALID_AFTER_KILL, &mopt, sizeof(mopt));
+ }
+ }
+ return ntc_ip_comm_transfer_process(a_stream, pme, thread_seq, raw_pkt, a_stream->pktstate);
+}
+
+extern "C" UCHAR ntc_ip_comm_transfer_entry(struct streaminfo *a_stream, void **pme, int thread_seq,const void *raw_pkt)
+{
+ return ntc_ip_comm_transfer_process(a_stream, pme, thread_seq, raw_pkt, a_stream->opstate);
+}
+
void ntc_ip_comm_load_profile()
{
MESA_load_profile_string_def(PROFILE_PATH, PLUGIN_NAME, "log_path", g_ntc_ip_comm_item.log_path, sizeof(g_ntc_ip_comm_item.log_path), "./t1log/ip_comm_log");
@@ -412,7 +464,9 @@ void ntc_ip_comm_load_profile()
MESA_load_profile_int_def(PROFILE_PATH,PLUGIN_NAME, "service", &g_ntc_ip_comm_item.service, 0);
- MESA_load_profile_uint_def(PROFILE_PATH,PLUGIN_NAME, "comm_log_mode", &g_ntc_ip_comm_item.comm_log_mode, 0);
+ MESA_load_profile_int_def(PROFILE_PATH,PLUGIN_NAME, "after_kill_switch", &g_ntc_ip_comm_item.after_kill_switch, 0);
+
+ MESA_load_profile_uint_def(PROFILE_PATH,PLUGIN_NAME, "comm_log_mode", &g_ntc_ip_comm_item.comm_log_mode, 0);
MESA_load_profile_string_def(PROFILE_PATH, PLUGIN_NAME, "dpkt_label", g_ntc_ip_comm_item.dpkt_label, sizeof(g_ntc_ip_comm_item.dpkt_label), "DPKT_PROJECT");
return ;
diff --git a/src/ntc_ip_comm.h b/src/ntc_ip_comm.h
index fc49d06..838d42f 100644
--- a/src/ntc_ip_comm.h
+++ b/src/ntc_ip_comm.h
@@ -21,6 +21,8 @@ extern "C"
char kafka_handle_name[1024];
char kafka_topic[1024];
+ int after_kill_switch;
+
unsigned int local_ip_nr;
char local_ip_str[128];
char dpkt_label[1024];