summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile29
-rw-r--r--ip_reset_plug.c572
-rw-r--r--ip_reset_plug.h62
-rw-r--r--macip.conf27
-rw-r--r--readme.txt16
5 files changed, 706 insertions, 0 deletions
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..d100b7d
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,29 @@
+CC = gcc
+CCC = g++
+CFLAGS = -g -fPIC
+CFLAGS += -D__FAVOR_BSD=1 -D__USE_MISC=1 -D_GNU_SOURCE=1
+CFLAGS += $(OPTFLAGS)
+OBJECTS = ip_reset_plug.o
+MODULES =
+
+TARGET = ip_reset_plug.so
+
+INCS = -I/opt/MESA/include/MESA
+
+
+.PHONY: all clean
+all: $(TARGET) cp
+
+$(TARGET) : $(OBJECTS)
+#ip_reset_plug.so: ip_reset_plug.o
+ #$(CCC) -o $@ -shared -fPIC $(CFLAGS) $^ $(MODULES) -lrdkafka -lcJSON -lMESA_handle_logger -lMESA_prof_load
+ $(CC) -o $@ -shared -fPIC $(CFLAGS) $^ $(MODULES) -lrdkafka -lcJSON -lMESA_handle_logger -lMESA_prof_load
+ #$(CC) -o $@ -shared -fPIC $(CFLAGS) $^ $(MODULES) -lrdkafka -lcjson -lMESA_handle_logger -lMESA_prof_load
+.c.o:
+ $(CC) -c -o $@ $(CFLAGS) -I. $(INCS) $<
+cp:
+ #cp $(TARGET) /home/wfm/sapp/plug/business/ip_reset_plug/
+ cp $(TARGET) /home/wfm/sapp/plug/business/ip_reset_plug/
+clean:
+ rm -f $(TARGET) $(OBJECTS)
+
diff --git a/ip_reset_plug.c b/ip_reset_plug.c
new file mode 100644
index 0000000..0dc1a1d
--- /dev/null
+++ b/ip_reset_plug.c
@@ -0,0 +1,572 @@
+/*************************
+
+* File Name :ip_reset_plug.c
+* Author:wfm
+* Date:2019-02-28
+
+*add new mac_addr 2019-07-09
+*add search mac_addr,and filter 10.IP 2019-07-26
+*add multi thread by 2019-07-29
+**************************/
+//#define __FAVOR_BSD
+#include "ip_reset_plug.h"
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+static int run = 1;
+static void stop(int sig){
+ run = 0;
+ fclose(stdin);
+}
+const char * ipreset_version_VERSION_20190729 = "";
+const char *module_name = "IPRESET";
+common_module_t common;
+
+static void dr_msg_cb(rd_kafka_t *rk,const rd_kafka_message_t *rkmessage, void *opaque) {
+ if (rkmessage->err)
+ MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,module_name,"Message delivery failed: %s\n",rd_kafka_err2str(rkmessage->err));
+ else
+ MESA_handle_runtime_log(common.log_handle,RLOG_LV_DEBUG,module_name,"Message delivered (%zd bytes, partition %"PRId32")\n",rkmessage->len,rkmessage->partition);
+
+}
+int init_kafka()
+{
+ rd_kafka_conf_t *conf; /*临时配置对象*/
+ conf = rd_kafka_conf_new();
+
+/* if (rd_kafka_conf_set(conf, "bootstrap.servers", common.kafka_servers, common.errstr, sizeof(common.errstr)) != RD_KAFKA_CONF_OK)
+ {
+ MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,module_name,"rd_kafka_conf_new is failed ,%s",common.errstr);
+ return -1;
+ }
+ common.kafka_producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, common.errstr, sizeof(common.errstr));
+ if(!common.kafka_producer)
+ {
+ MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,module_name,"kafka_producer is failed, %s",common.errstr);
+ return -1;
+ }
+*/
+ common.kafka_producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, common.errstr, sizeof(common.errstr)); //this is line
+ if(!common.kafka_producer)
+ {
+ MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,module_name,"kafka_producer is failed, %s",common.errstr);
+ return -1;
+ }
+ if (rd_kafka_brokers_add(common.kafka_producer, common.kafka_servers) == 0)
+ {
+ MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,module_name,"rd_kafka_conf_new is failed ,%s",common.errstr);
+ return -1;
+ }
+
+ common.rd_kafka_topic = rd_kafka_topic_new(common.kafka_producer,common.kafka_topic,NULL);
+ if(!common.rd_kafka_topic)
+ {
+ MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,module_name,"Failed to create topic object: %s",common.errstr);
+ rd_kafka_destroy(common.kafka_producer);
+ return -1;
+ }
+
+ rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
+ MESA_handle_runtime_log(common.log_handle,RLOG_LV_DEBUG,module_name,"set callback func success");
+
+ return 0;
+}
+
+int kafka_destroy()
+{
+ if (common.rd_kafka_topic == NULL)
+ {
+ MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,module_name,"rd_kafka_topic is NULL: %s",common.errstr);
+ return -1;
+ }
+ if (common.kafka_producer == NULL)
+ {
+ MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,module_name,"rd_kafka_t is NULL: %s",common.errstr);
+ return -1;
+ }
+ rd_kafka_topic_destroy(common.kafka_producer);
+ rd_kafka_destroy(common.rd_kafka_topic);
+
+ return 0;
+}
+static int create_path (char *path)
+{
+ if(access(path, 0) == -1)
+ {
+ if(mkdir(path, 0777))
+ {
+ printf("cannnot create path:%s",path);
+ MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,module_name,"cannot create path:%s",path);
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+static char *jump_over_char (char *src, char c)
+{
+ while (*src == c) src ++;
+
+ return src;
+}
+
+int create_path_p (char *path)
+{
+ char *pos = NULL;
+ char *dest = NULL;
+ char tmp[128] = {0};
+ int len = 0;
+
+ dest = path;
+ dest = jump_over_char (dest, '/');
+ while ((dest != NULL) && ((pos = strchr (dest, '/')) != NULL))
+ {
+ len = pos - path;
+ strncpy (tmp, path, len);
+ tmp[len] = '\0';
+
+ if (create_path (tmp) < 0)
+ {
+ return -1;
+ }
+
+ dest = pos +1;
+ }
+
+ if (*dest != '\0')
+ {
+ if (create_path (path) < 0)
+ {
+ return -1;
+ }
+ }
+ return 0;
+}
+
+void check_file_path (char *file)
+{
+ char path[MAX_PATH_LENGTH] = {0};
+ char *pos = NULL;
+
+ pos = strrchr (file, '/');
+ if (pos != NULL)
+ {
+ strncpy (path, file, pos - file);
+ create_path_p (path);
+ }
+}
+/*int get_macip_map()
+{
+ char line[1000];
+ char macaddr[1000],tmp[64];
+ int i=0;
+ FILE *mapfile=fopen("./resetconf/macip.conf","r");
+ while(fgets(line,sizeof(line),mapfile))
+ {
+ sscanf(line,"%s %s",macaddr,tmp);
+ strcpy(common.macip[i],macaddr);
+ strcpy(common.cljip[i],tmp);
+ i++;
+ }
+ fclose(mapfile);
+ return 0;
+}
+int get_clj_ip(char mac_addr[30])
+{
+ int j;
+ for(j=0;j<=strlen(common.macip);j++)
+ {
+ if(strcmp(common.macip[j],mac_addr)==0)
+ {
+ printf("%s\n",common.cljip[j]);
+ strcpy(common.clj_ip,common.cljip[j]);
+ MESA_handle_runtime_log(common.log_handle,RLOG_LV_DEBUG,module_name,"The mac_addr map cljip is %s",common.clj_ip);
+ break;
+ }
+ }
+ return 0;
+}*/
+int get_macip_map()
+{
+ int ret=0,num=0;
+ int opt_int;
+
+ common.hashset = (struct hash_set_t *)malloc(sizeof(struct hash_set_t) * HASH_THREAD_NUM);
+ if(NULL == common.hashset )
+ {
+ //to do!
+ MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,"[GET_IP]","common.hashset is failed!");
+
+ return -1;
+ }
+ memset(common.hashset , 0, sizeof(struct hash_set_t) * HASH_THREAD_NUM);
+
+ for(num=0;num<HASH_THREAD_NUM;num++){
+ common.hashset[num].htable =NULL;
+ common.hashset[num].htable = MESA_htable_born();
+ //assert(common.htable != NULL);
+ if(common.hashset[num].htable == NULL){
+ MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,"[GET_IP]","MESA_htable_born error!");
+ return -1;
+ }
+
+ opt_int = 0;
+ MESA_htable_set_opt(common.hashset[num].htable, MHO_THREAD_SAFE, &opt_int, sizeof(int));
+ // opt_int = 32; //line sapp/conf/main.conf threadnum=
+ opt_int = HASH_THREAD_NUM;
+ MESA_htable_set_opt(common.hashset[num].htable, MHO_MUTEX_NUM, &opt_int, sizeof(int));
+ opt_int = 0;
+ MESA_htable_set_opt(common.hashset[num].htable, MHO_EXPIRE_TIME, &opt_int, sizeof(int));
+ opt_int = HASH_ELIMINATE_ALGO_LRU;
+ MESA_htable_set_opt(common.hashset[num].htable, MHO_ELIMIMINATE_TYPE, &opt_int, sizeof(int));
+ opt_int = 0;
+ MESA_htable_set_opt(common.hashset[num].htable, MHO_SCREEN_PRINT_CTRL, &opt_int, sizeof(int));
+
+ ret = MESA_htable_mature(common.hashset[num].htable);
+ if(0 == ret){
+ printf("MESA_htable_mature success!\n");
+ }else{
+ printf("MESA_htable_mature error!\n");
+ MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,"[GET_IP]","MESA_htable_mature error!");
+
+ return -1;
+ }
+ char line[512];
+ char macaddr[17],cljaddr[16];
+ FILE *mapfile=fopen("./resetconf/macip.conf","r");
+ int i=0;
+ char *cljipnew;
+ while(fgets(line,sizeof(line),mapfile))
+ {
+ sscanf(line,"%s %s",macaddr,cljaddr);
+ // printf("common.cljips is %s, ip %s\n",macaddr,cljaddr);
+ // printf("macaddr is %p,cljaddr is %p\n",macaddr,cljaddr);
+ cljipnew=strdup(cljaddr);
+ ret=MESA_htable_add(common.hashset[num].htable, macaddr, strlen(macaddr), cljipnew);
+ if(ret >=0){
+ ++i;
+ }
+ }
+ printf("MESA_htable_add is ok ,num is %d\n",i);
+ MESA_handle_runtime_log(common.log_handle,RLOG_LV_INFO,"[GET_IP]","MESA_htable_add is ok ,num is %d",i);
+ fclose(mapfile);
+ }
+/* cljip_t *mip_search = NULL;
+ for(j=0;j<i/2;j++)
+ {
+ // mip_search =(cljip_t *)MESA_htable_search(common.htable,common.macip[j], strlen(common.macip[j]));
+ char *mip_search =(char *)MESA_htable_search(common.htable,common.macip[j], strlen(common.macip[j]));
+ printf("common.macip[j] is %p, mip_searchis %p\n",common.macip[j],mip_search);
+ if(mip_search != NULL)
+ {
+ memcpy(cljip,mip_search,strlen(mip_search)+1);
+ printf("the search is ok,%s\n",cljip);
+ // printf("the common.macip[j] cljip is %s\n",mip_search->clj_ip);
+ }
+ }*/
+ return 0;
+}
+int get_clj_ip(char mac_addr[17],int thread_num)
+{
+ char *results;
+ results =(char *)MESA_htable_search(common.hashset[thread_num].htable,mac_addr, strlen(mac_addr));
+ if(results != NULL)
+ {
+ strcpy(common.clj_ip,results);
+ printf("result-cljip is %s\n",common.clj_ip);
+ MESA_handle_runtime_log(common.log_handle,RLOG_LV_DEBUG,"[GET_IP]","common.clj_ip is %s",common.clj_ip);
+
+ return 1;
+ }
+
+/* strcpy(common.cljaddr,result);
+ printf("result-cljip is %s\n",common.cljaddr);
+ memcpy(common.clj_ip,common.cljaddr,strlen(common.cljaddr));
+ common.clj_ip[strlen(common.cljaddr)]='\0';
+ */
+ return 0;
+}
+int ipreset_init()
+{
+ int ret=0,log_level = 0,batch_send_number=0,send_types=0;
+ char log_path[MAX_PATH_LENGTH] = "";
+ char kafka_servers[MAX_LENGTH] = "";
+ char kafka_topic[MAX_PATH_LENGTH] = "";
+ memset(&common,0,sizeof(common));
+
+ ret = MESA_load_profile_string_nodef (IPRESET_CONFIG_FILE, "KAFKA", "kafka_bootstrap_servers", kafka_servers, MAX_LENGTH);
+ ret = MESA_load_profile_string_nodef (IPRESET_CONFIG_FILE, "KAFKA", "kafka_bootstrap_topic", kafka_topic, MAX_PATH_LENGTH);
+ ret = MESA_load_profile_string_def (IPRESET_CONFIG_FILE, "CONFIG", "log_path", log_path, MAX_PATH_LENGTH,"./ipresetlog/ipreset.log");
+ ret = MESA_load_profile_int_def (IPRESET_CONFIG_FILE, "CONFIG", "log_level", &log_level, RLOG_LV_INFO);
+ ret = MESA_load_profile_int_def (IPRESET_CONFIG_FILE, "CONFIG", "batch_send_number", &batch_send_number,100);
+ ret = MESA_load_profile_int_def (IPRESET_CONFIG_FILE, "CONFIG", "send_types", &send_types,0);
+
+ check_file_path (log_path);
+
+ sprintf (common.kafka_servers, "%s",kafka_servers);
+ sprintf(common.kafka_topic,"%s",kafka_topic);
+ //strcpy(common->kafka_servers, kafka_servers);
+ //strcpy(common->kafka_topic,kafka_topic);
+ common.batch_send_number=batch_send_number;
+ common.send_types=send_types;
+ common.log_handle = MESA_create_runtime_log_handle(log_path,log_level);
+ if(ret < 0){
+ MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,"[INIT]","ipreset read configfile failed");
+ assert(0);
+ }
+
+ ret = init_kafka();
+ if(ret < 0){
+ MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,"[INIT]","ipreset plug init failed");
+ return -1;
+ }
+ else
+ MESA_handle_runtime_log(common.log_handle,RLOG_LV_DEBUG,"[INIT]","ipreset init_kafka() ok!");
+
+// common.htable =NULL;
+ ret = get_macip_map();
+ if(ret<0){
+ MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,"[INIT]","get_macip_map() failed!");
+ return -1;
+ }
+
+ return 0;
+}
+/*void __free_data(void *data)
+{
+ free(data);
+ data = NULL;
+*/
+void ipreset_destroy()
+{
+ rd_kafka_flush(common.kafka_producer, 10 * 1000);
+ //free(rkmessages);
+ kafka_destroy();
+ //MESA_handle_runtime_log(log_handle,log_level,module_name,"%s \n","ipreset plug destroy successfully");
+ int num=0;
+ for(num=0;num<HASH_THREAD_NUM;num++){
+ MESA_htable_destroy(common.hashset[num].htable,NULL);
+ }
+
+}
+
+/*int get_found_time()
+{
+ time_t timep;
+ time(&timep);
+ ip_info.found_time = asctime(gmtime(&timep));
+ ip_info.found_time[strlen(ip_info.found_time) - 1] = 0;
+ return 0;
+}*/
+
+char* getJSONString(unsigned char src_ip[16], unsigned char dst_ip[16], UINT16 src_port, UINT16 dst_port,char* protocol)
+{
+ time_t t;
+ cJSON *json = cJSON_CreateObject();
+ cJSON_AddItemToObject(json, "cfg_id",cJSON_CreateNumber(0));
+ cJSON_AddItemToObject(json, "found_time",cJSON_CreateNumber(time(&t)));
+ cJSON_AddItemToObject(json, "recv_time",cJSON_CreateNumber(time(&t)));
+ cJSON_AddItemToObject(json, "protocol",cJSON_CreateString(protocol));
+ cJSON_AddItemToObject(json, "addr_type",cJSON_CreateNumber(4));
+ cJSON_AddItemToObject(json, "server_ip", cJSON_CreateString(dst_ip));
+ cJSON_AddItemToObject(json, "client_ip", cJSON_CreateString(src_ip));
+ cJSON_AddItemToObject(json, "server_port", cJSON_CreateNumber(dst_port));
+ cJSON_AddItemToObject(json, "client_port", cJSON_CreateNumber(src_port));
+ cJSON_AddItemToObject(json, "server_type",cJSON_CreateNumber(0));
+ cJSON_AddItemToObject(json, "entrance_id",cJSON_CreateNumber(0));
+ cJSON_AddItemToObject(json, "device_id",cJSON_CreateNumber(0));
+ cJSON_AddItemToObject(json, "direction",cJSON_CreateNumber(0));
+ cJSON_AddItemToObject(json, "stream_type",cJSON_CreateNumber(0));
+
+ cJSON_AddItemToObject(json, "clj_ip",cJSON_CreateString(common.clj_ip));
+ cJSON_AddItemToObject(json, "nest_addr_list",cJSON_CreateString(""));
+ cJSON_AddItemToObject(json, "user_region",cJSON_CreateString(""));
+ cJSON_AddItemToObject(json, "send_type",cJSON_CreateString(common.send_type));
+ char* buf = cJSON_PrintUnformatted(json);
+ cJSON_Delete(json);
+ return buf;
+}
+char ipreset_ip_entry(struct streaminfo *f_stream,unsigned char routedir,int thread_seq,struct ip * a_packet)
+{
+ struct layer_addr_ipv4* laddr_ipv4;
+ laddr_ipv4= f_stream->addr.ipv4;
+
+ unsigned char src_ip[16], dst_ip[16];
+ char* protocol;
+ unsigned short src_port, dst_port;
+ int ret=0,res=0;
+ unsigned char rec= APP_STATE_GIVEME;
+ inet_ntop(AF_INET, (void *)&(laddr_ipv4->saddr), src_ip, sizeof(src_ip));
+ inet_ntop(AF_INET, (void *)&(laddr_ipv4->daddr), dst_ip, sizeof(dst_ip));
+
+ //filter 10. IP
+ char *ips="10.0.0.1";
+ if(strncmp(src_ip,ips,3)!=0 || strncmp(dst_ip,ips,3)!=0){
+// if(!strncmp(src_ip,ips,3) || !strncmp(dst_ip,ips,3)){
+
+ char mac_addr[17];
+ void *raw_pkt_hdr;
+ ret =get_rawpkt_opt_from_streaminfo(f_stream, RAW_PKT_GET_DATA, &raw_pkt_hdr);
+ if(0 == ret){
+ struct ethhdr *eth_hdr = (struct ethhdr *)raw_pkt_hdr;
+ eth_hdr->h_source;
+ sprintf(mac_addr, "%02x:%02x:%02x:%02x:%02x:%02x",
+ (unsigned char)eth_hdr->h_source[0],
+ (unsigned char)eth_hdr->h_source[1],
+ (unsigned char)eth_hdr->h_source[2],
+ (unsigned char)eth_hdr->h_source[3],
+ (unsigned char)eth_hdr->h_source[4],
+ (unsigned char)eth_hdr->h_source[5]
+ );
+
+ /* sprintf(mac_addr, "%02x%02x-%02x%02x-%02x%02x",
+ (unsigned char)eth_hdr->h_source[0],
+ (unsigned char)eth_hdr->h_source[1],
+ (unsigned char)eth_hdr->h_source[2],
+ (unsigned char)eth_hdr->h_source[3],
+ (unsigned char)eth_hdr->h_source[4],
+ (unsigned char)eth_hdr->h_source[5]);
+ */
+ // printf("mac_addr is %s\n",mac_addr);
+ //map ip mac
+ res = get_clj_ip(mac_addr,thread_seq);
+ if(res<0){
+ MESA_handle_runtime_log(common.log_handle,RLOG_LV_INFO,module_name,"get_clj_ip is failed,res is %d",res);
+ return APP_STATE_DROPME;
+ }
+ }
+ else{
+ printf("The ret is error!\n");
+ MESA_handle_runtime_log(common.log_handle,RLOG_LV_INFO,module_name,"There is no mac_addr,ret is %d",ret);
+ }
+
+ if(a_packet->ip_p == 6)
+ {
+ protocol = "IPv4_TCP";
+ struct iphdr *ips = (struct iphdr*)a_packet;
+ struct tcphdr *tcps = (struct tcphdr*)((char*)a_packet+ips->ihl*4);
+ // if(tcphdra->th_flags&TH_RST) //rst
+ if(tcps->rst==1)
+ {
+ if(common.send_types==1 || common.send_types==0)
+ {
+ sprintf (common.send_type, "%s","reset");
+ //get port and send data
+ src_port = ntohs(tcps->source);
+ dst_port = ntohs(tcps->dest);
+
+ char* buf = getJSONString(src_ip, dst_ip, src_port, dst_port,protocol);
+ MESA_handle_runtime_log(common.log_handle,RLOG_LV_INFO,module_name,"reset cjson buf is %s",buf);
+ signal(SIGINT, stop);
+ if(buf !=NULL)
+ {
+ while(1)
+ {
+ ret = rd_kafka_produce(common.rd_kafka_topic,RD_KAFKA_PARTITION_UA,RD_KAFKA_MSG_F_FREE,buf,strlen(buf),NULL,0,NULL);
+ rd_kafka_poll(common.kafka_producer,0);
+ if(ret == 0 || rd_kafka_last_error()!= RD_KAFKA_RESP_ERR__QUEUE_FULL )
+ // if(ret == 0 )
+ break;
+ }
+ if(ret !=0)
+ free(buf);
+ rec = APP_STATE_GIVEME;
+ }
+ else{
+
+ MESA_handle_runtime_log(common.log_handle,RLOG_LV_INFO,module_name,"rd_kafka_producer TCP reset failed, %d",ret);
+ return APP_STATE_DROPME;
+ }
+ }
+ }
+ else if(tcps->syn==1 && tcps->ack==1)
+ {
+ if(common.send_types==2 || common.send_types==0)
+ {
+ sprintf (common.send_type, "%s","synack");
+ //get port and send data
+ src_port = ntohs(tcps->source);
+ dst_port = ntohs(tcps->dest);
+
+ char* buf = getJSONString(src_ip, dst_ip, src_port, dst_port,protocol);
+ MESA_handle_runtime_log(common.log_handle,RLOG_LV_INFO,module_name,"synack cjson buf is %s",buf);
+
+ signal(SIGINT, stop);
+ if(buf !=NULL)
+ {
+ while(1)
+ {
+ ret = rd_kafka_produce(common.rd_kafka_topic,RD_KAFKA_PARTITION_UA,RD_KAFKA_MSG_F_FREE,buf,strlen(buf),NULL,0,NULL);
+ rd_kafka_poll(common.kafka_producer,0);
+ if(ret == 0 || rd_kafka_last_error()!= RD_KAFKA_RESP_ERR__QUEUE_FULL )
+ break;
+ }
+ if(ret !=0)
+ free(buf);
+ rec = APP_STATE_GIVEME;
+ }
+ else{
+
+ MESA_handle_runtime_log(common.log_handle,RLOG_LV_INFO,module_name,"rd_kafka_producer TCP syn+ack failed, %d",ret);
+ return APP_STATE_DROPME;
+ }
+ }
+ }
+ }
+ else if(a_packet->ip_p == 17)
+ {
+ if(common.send_types==3 || common.send_types==0)
+ {
+ protocol = "IPv4_UDP";
+ struct iphdr *ips = (struct iphdr*)a_packet;
+ struct udphdr *udps = (struct udphdr*)((char*)a_packet+ips->ihl*4);
+ src_port = ntohs(udps->source);
+ dst_port = ntohs(udps->dest);
+
+ signal(SIGINT, stop);
+ if(src_port == 53 || dst_port == 53)
+ {
+ sprintf (common.send_type, "%s","53port");
+ char* buf = getJSONString(src_ip, dst_ip, src_port, dst_port,protocol);
+ MESA_handle_runtime_log(common.log_handle,RLOG_LV_INFO,module_name,"53port cjson buf is %s",buf);
+ //insert into kafka
+ if(buf !=NULL)
+ {
+ while(1)
+ {
+ // ret = rd_kafka_produce(common.rd_kafka_topic,RD_KAFKA_PARTITION_UA,RD_KAFKA_MSG_F_COPY,buf,strlen(buf),NULL,0,NULL);
+ ret = rd_kafka_produce(common.rd_kafka_topic,RD_KAFKA_PARTITION_UA,RD_KAFKA_MSG_F_FREE,buf,strlen(buf),NULL,0,NULL);
+ // if(ret == -1 )break;
+ rd_kafka_poll(common.kafka_producer,0);
+ if(ret == 0 || rd_kafka_last_error()!= RD_KAFKA_RESP_ERR__QUEUE_FULL )
+ //printf("rd_kafka_last_error is full! and ret=0");
+ break;
+
+// if(ret == 0 )break;
+ // usleep(1000);
+ }
+ if(ret !=0)
+ free(buf);
+ rec = APP_STATE_GIVEME;
+ }
+ else{
+
+ MESA_handle_runtime_log(common.log_handle,RLOG_LV_INFO,module_name,"rd_kafka_producer UDP failed, %d",ret);
+ return APP_STATE_DROPME;
+ }
+ }
+ }
+ }
+ else ;
+
+ }
+ return rec;
+}
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/ip_reset_plug.h b/ip_reset_plug.h
new file mode 100644
index 0000000..8647b40
--- /dev/null
+++ b/ip_reset_plug.h
@@ -0,0 +1,62 @@
+#include <stdio.h>
+#include <signal.h>
+#include <string.h>
+#include <stdlib.h>
+#include <netinet/in.h>
+#include <time.h>
+#include <stdarg.h>
+#include <rdkafka.h>
+#include <stream.h>
+#include <cJSON.h>
+#include "MESA_prof_load.h"
+#include "MESA_handle_logger.h"
+#include <netdb.h>
+#include <netinet/ip.h>
+#include <netinet/tcp.h>
+#include <netinet/udp.h>
+#include <linux/if_ether.h>
+#include "MESA_htable.h"
+#include <assert.h>
+
+#define IPRESET_CONFIG_FILE "./resetconf/ip_reset_plug.conf"
+
+#define SIZE 128
+#define BUFSIZE 1024
+#define STRSIZE 1024
+#define MAX_PATH_LENGTH 256
+#define MAX_LENGTH 4096
+//#define MAX_MACIP_NUM 4096
+
+#define HASH_THREAD_NUM 32
+//#define HASH_THREAD_NUM 10
+
+typedef struct hash_set_t{
+ int hash_num;
+ MESA_htable_handle htable;
+}hash_set_t;
+
+typedef struct common_module_t {
+ void *log_handle;
+ char config_path[MAX_PATH_LENGTH];
+ char log_path[MAX_PATH_LENGTH];
+ char kafka_servers[MAX_LENGTH];
+ char kafka_topic[MAX_LENGTH];
+ int batch_send_number;
+ rd_kafka_t *kafka_producer; /*Producer instance handle*/
+ rd_kafka_topic_t *rd_kafka_topic; /*topic对象*/
+ char errstr[512];
+ char buf[512];
+ char clj_ip[16];
+ char send_type[32];
+ int send_types; /*rst:1,syn+ack:2,53port:3,all:0*/
+// char macip[MAX_MACIP_NUM][17];
+ // char cljip[MAX_MACIP_NUM][32];
+
+// MESA_htable_handle htable; //add by 20190722,htable
+ hash_set_t *hashset;
+}common_module_t;
+
+
+int ipreset_init();
+void ipreset_destroy();
+char ipreset_ip_entry(struct streaminfo *f_stream,unsigned char routedir,int thread_seq,struct ip * a_packet);
diff --git a/macip.conf b/macip.conf
new file mode 100644
index 0000000..91988d0
--- /dev/null
+++ b/macip.conf
@@ -0,0 +1,27 @@
+20:28:3e:e1:f4:9e 10.172.207.2
+20:28:3e:e2:0b:12 10.172.207.3
+20:28:3e:e1:f4:ae 10.172.207.4
+20:28:3e:e1:f4:32 10.172.207.5
+20:28:3e:7b:20:0f 10.172.207.6
+20:28:3e:e1:f4:7e 10.172.207.7
+20:28:3e:e2:0b:22 10.172.207.8
+20:28:3e:e1:f4:ac 10.172.207.9
+20:28:3e:e1:f4:52 10.172.207.10
+20:28:3e:e1:f4:7a 10.172.207.11
+20:28:3e:e1:f4:7c 10.172.207.12
+20:28:3e:e1:f4:54 10.172.207.13
+20:28:3e:e1:f4:c4 10.172.207.14
+20:28:3e:e1:f4:2e 10.172.207.15
+20:28:3e:e2:0b:1e 10.172.207.16
+20:28:3e:e1:f4:c2 10.172.207.17
+20:28:3e:e1:f4:ec 10.172.207.18
+20:28:3e:e1:f4:6e 10.172.207.19
+20:28:3e:e1:f5:82 10.172.207.20
+20:28:3e:e1:f4:dc 10.172.207.21
+20:28:3e:e2:0b:2c 10.172.207.22
+20:28:3e:e1:f4:0c 10.172.207.23
+20:28:3e:e1:f4:a2 10.172.207.24
+20:28:3e:e1:f4:6c 10.172.207.25
+20:28:3e:e1:f4:ce 10.172.207.26
+20:28:3e:e1:f4:d2 10.172.207.27
+20:28:3e:e1:f5:32 10.172.207.28
diff --git a/readme.txt b/readme.txt
new file mode 100644
index 0000000..bb8ec6c
--- /dev/null
+++ b/readme.txt
@@ -0,0 +1,16 @@
+add by 20190705
+1. send three types of quaternions to kafka ,need one topic
+a. rst =1
+b. syn=1 and ack=1
+c. udp 53 port
+
+add by 20190708
+1. edit clj_ip
+Mapping from MAC to IP
+
+match to ip according to mac address
+2. filter 10. IP from sapp pkt
+
+add by 20190729
+1. add MESA_htable to match
+2. add multi-thread to deal the sapp pkt \ No newline at end of file