diff options
| -rw-r--r-- | Makefile | 29 | ||||
| -rw-r--r-- | ip_reset_plug.c | 572 | ||||
| -rw-r--r-- | ip_reset_plug.h | 62 | ||||
| -rw-r--r-- | macip.conf | 27 | ||||
| -rw-r--r-- | readme.txt | 16 |
5 files changed, 706 insertions, 0 deletions
diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..d100b7d --- /dev/null +++ b/Makefile @@ -0,0 +1,29 @@ +CC = gcc +CCC = g++ +CFLAGS = -g -fPIC +CFLAGS += -D__FAVOR_BSD=1 -D__USE_MISC=1 -D_GNU_SOURCE=1 +CFLAGS += $(OPTFLAGS) +OBJECTS = ip_reset_plug.o +MODULES = + +TARGET = ip_reset_plug.so + +INCS = -I/opt/MESA/include/MESA + + +.PHONY: all clean +all: $(TARGET) cp + +$(TARGET) : $(OBJECTS) +#ip_reset_plug.so: ip_reset_plug.o + #$(CCC) -o $@ -shared -fPIC $(CFLAGS) $^ $(MODULES) -lrdkafka -lcJSON -lMESA_handle_logger -lMESA_prof_load + $(CC) -o $@ -shared -fPIC $(CFLAGS) $^ $(MODULES) -lrdkafka -lcJSON -lMESA_handle_logger -lMESA_prof_load + #$(CC) -o $@ -shared -fPIC $(CFLAGS) $^ $(MODULES) -lrdkafka -lcjson -lMESA_handle_logger -lMESA_prof_load +.c.o: + $(CC) -c -o $@ $(CFLAGS) -I. $(INCS) $< +cp: + #cp $(TARGET) /home/wfm/sapp/plug/business/ip_reset_plug/ + cp $(TARGET) /home/wfm/sapp/plug/business/ip_reset_plug/ +clean: + rm -f $(TARGET) $(OBJECTS) + diff --git a/ip_reset_plug.c b/ip_reset_plug.c new file mode 100644 index 0000000..0dc1a1d --- /dev/null +++ b/ip_reset_plug.c @@ -0,0 +1,572 @@ +/************************* + +* File Name :ip_reset_plug.c +* Author:wfm +* Date:2019-02-28 + +*add new mac_addr 2019-07-09 +*add search mac_addr,and filter 10.IP 2019-07-26 +*add multi thread by 2019-07-29 +**************************/ +//#define __FAVOR_BSD +#include "ip_reset_plug.h" + +#ifdef __cplusplus +extern "C" +{ +#endif + +static int run = 1; +static void stop(int sig){ + run = 0; + fclose(stdin); +} +const char * ipreset_version_VERSION_20190729 = ""; +const char *module_name = "IPRESET"; +common_module_t common; + +static void dr_msg_cb(rd_kafka_t *rk,const rd_kafka_message_t *rkmessage, void *opaque) { + if (rkmessage->err) + MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,module_name,"Message delivery failed: %s\n",rd_kafka_err2str(rkmessage->err)); + else + MESA_handle_runtime_log(common.log_handle,RLOG_LV_DEBUG,module_name,"Message delivered (%zd bytes, partition %"PRId32")\n",rkmessage->len,rkmessage->partition); + +} +int init_kafka() +{ + rd_kafka_conf_t *conf; /*临时配置对象*/ + conf = rd_kafka_conf_new(); + +/* if (rd_kafka_conf_set(conf, "bootstrap.servers", common.kafka_servers, common.errstr, sizeof(common.errstr)) != RD_KAFKA_CONF_OK) + { + MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,module_name,"rd_kafka_conf_new is failed ,%s",common.errstr); + return -1; + } + common.kafka_producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, common.errstr, sizeof(common.errstr)); + if(!common.kafka_producer) + { + MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,module_name,"kafka_producer is failed, %s",common.errstr); + return -1; + } +*/ + common.kafka_producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, common.errstr, sizeof(common.errstr)); //this is line + if(!common.kafka_producer) + { + MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,module_name,"kafka_producer is failed, %s",common.errstr); + return -1; + } + if (rd_kafka_brokers_add(common.kafka_producer, common.kafka_servers) == 0) + { + MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,module_name,"rd_kafka_conf_new is failed ,%s",common.errstr); + return -1; + } + + common.rd_kafka_topic = rd_kafka_topic_new(common.kafka_producer,common.kafka_topic,NULL); + if(!common.rd_kafka_topic) + { + MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,module_name,"Failed to create topic object: %s",common.errstr); + rd_kafka_destroy(common.kafka_producer); + return -1; + } + + rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb); + MESA_handle_runtime_log(common.log_handle,RLOG_LV_DEBUG,module_name,"set callback func success"); + + return 0; +} + +int kafka_destroy() +{ + if (common.rd_kafka_topic == NULL) + { + MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,module_name,"rd_kafka_topic is NULL: %s",common.errstr); + return -1; + } + if (common.kafka_producer == NULL) + { + MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,module_name,"rd_kafka_t is NULL: %s",common.errstr); + return -1; + } + rd_kafka_topic_destroy(common.kafka_producer); + rd_kafka_destroy(common.rd_kafka_topic); + + return 0; +} +static int create_path (char *path) +{ + if(access(path, 0) == -1) + { + if(mkdir(path, 0777)) + { + printf("cannnot create path:%s",path); + MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,module_name,"cannot create path:%s",path); + return -1; + } + } + + return 0; +} + +static char *jump_over_char (char *src, char c) +{ + while (*src == c) src ++; + + return src; +} + +int create_path_p (char *path) +{ + char *pos = NULL; + char *dest = NULL; + char tmp[128] = {0}; + int len = 0; + + dest = path; + dest = jump_over_char (dest, '/'); + while ((dest != NULL) && ((pos = strchr (dest, '/')) != NULL)) + { + len = pos - path; + strncpy (tmp, path, len); + tmp[len] = '\0'; + + if (create_path (tmp) < 0) + { + return -1; + } + + dest = pos +1; + } + + if (*dest != '\0') + { + if (create_path (path) < 0) + { + return -1; + } + } + return 0; +} + +void check_file_path (char *file) +{ + char path[MAX_PATH_LENGTH] = {0}; + char *pos = NULL; + + pos = strrchr (file, '/'); + if (pos != NULL) + { + strncpy (path, file, pos - file); + create_path_p (path); + } +} +/*int get_macip_map() +{ + char line[1000]; + char macaddr[1000],tmp[64]; + int i=0; + FILE *mapfile=fopen("./resetconf/macip.conf","r"); + while(fgets(line,sizeof(line),mapfile)) + { + sscanf(line,"%s %s",macaddr,tmp); + strcpy(common.macip[i],macaddr); + strcpy(common.cljip[i],tmp); + i++; + } + fclose(mapfile); + return 0; +} +int get_clj_ip(char mac_addr[30]) +{ + int j; + for(j=0;j<=strlen(common.macip);j++) + { + if(strcmp(common.macip[j],mac_addr)==0) + { + printf("%s\n",common.cljip[j]); + strcpy(common.clj_ip,common.cljip[j]); + MESA_handle_runtime_log(common.log_handle,RLOG_LV_DEBUG,module_name,"The mac_addr map cljip is %s",common.clj_ip); + break; + } + } + return 0; +}*/ +int get_macip_map() +{ + int ret=0,num=0; + int opt_int; + + common.hashset = (struct hash_set_t *)malloc(sizeof(struct hash_set_t) * HASH_THREAD_NUM); + if(NULL == common.hashset ) + { + //to do! + MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,"[GET_IP]","common.hashset is failed!"); + + return -1; + } + memset(common.hashset , 0, sizeof(struct hash_set_t) * HASH_THREAD_NUM); + + for(num=0;num<HASH_THREAD_NUM;num++){ + common.hashset[num].htable =NULL; + common.hashset[num].htable = MESA_htable_born(); + //assert(common.htable != NULL); + if(common.hashset[num].htable == NULL){ + MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,"[GET_IP]","MESA_htable_born error!"); + return -1; + } + + opt_int = 0; + MESA_htable_set_opt(common.hashset[num].htable, MHO_THREAD_SAFE, &opt_int, sizeof(int)); + // opt_int = 32; //line sapp/conf/main.conf threadnum= + opt_int = HASH_THREAD_NUM; + MESA_htable_set_opt(common.hashset[num].htable, MHO_MUTEX_NUM, &opt_int, sizeof(int)); + opt_int = 0; + MESA_htable_set_opt(common.hashset[num].htable, MHO_EXPIRE_TIME, &opt_int, sizeof(int)); + opt_int = HASH_ELIMINATE_ALGO_LRU; + MESA_htable_set_opt(common.hashset[num].htable, MHO_ELIMIMINATE_TYPE, &opt_int, sizeof(int)); + opt_int = 0; + MESA_htable_set_opt(common.hashset[num].htable, MHO_SCREEN_PRINT_CTRL, &opt_int, sizeof(int)); + + ret = MESA_htable_mature(common.hashset[num].htable); + if(0 == ret){ + printf("MESA_htable_mature success!\n"); + }else{ + printf("MESA_htable_mature error!\n"); + MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,"[GET_IP]","MESA_htable_mature error!"); + + return -1; + } + char line[512]; + char macaddr[17],cljaddr[16]; + FILE *mapfile=fopen("./resetconf/macip.conf","r"); + int i=0; + char *cljipnew; + while(fgets(line,sizeof(line),mapfile)) + { + sscanf(line,"%s %s",macaddr,cljaddr); + // printf("common.cljips is %s, ip %s\n",macaddr,cljaddr); + // printf("macaddr is %p,cljaddr is %p\n",macaddr,cljaddr); + cljipnew=strdup(cljaddr); + ret=MESA_htable_add(common.hashset[num].htable, macaddr, strlen(macaddr), cljipnew); + if(ret >=0){ + ++i; + } + } + printf("MESA_htable_add is ok ,num is %d\n",i); + MESA_handle_runtime_log(common.log_handle,RLOG_LV_INFO,"[GET_IP]","MESA_htable_add is ok ,num is %d",i); + fclose(mapfile); + } +/* cljip_t *mip_search = NULL; + for(j=0;j<i/2;j++) + { + // mip_search =(cljip_t *)MESA_htable_search(common.htable,common.macip[j], strlen(common.macip[j])); + char *mip_search =(char *)MESA_htable_search(common.htable,common.macip[j], strlen(common.macip[j])); + printf("common.macip[j] is %p, mip_searchis %p\n",common.macip[j],mip_search); + if(mip_search != NULL) + { + memcpy(cljip,mip_search,strlen(mip_search)+1); + printf("the search is ok,%s\n",cljip); + // printf("the common.macip[j] cljip is %s\n",mip_search->clj_ip); + } + }*/ + return 0; +} +int get_clj_ip(char mac_addr[17],int thread_num) +{ + char *results; + results =(char *)MESA_htable_search(common.hashset[thread_num].htable,mac_addr, strlen(mac_addr)); + if(results != NULL) + { + strcpy(common.clj_ip,results); + printf("result-cljip is %s\n",common.clj_ip); + MESA_handle_runtime_log(common.log_handle,RLOG_LV_DEBUG,"[GET_IP]","common.clj_ip is %s",common.clj_ip); + + return 1; + } + +/* strcpy(common.cljaddr,result); + printf("result-cljip is %s\n",common.cljaddr); + memcpy(common.clj_ip,common.cljaddr,strlen(common.cljaddr)); + common.clj_ip[strlen(common.cljaddr)]='\0'; + */ + return 0; +} +int ipreset_init() +{ + int ret=0,log_level = 0,batch_send_number=0,send_types=0; + char log_path[MAX_PATH_LENGTH] = ""; + char kafka_servers[MAX_LENGTH] = ""; + char kafka_topic[MAX_PATH_LENGTH] = ""; + memset(&common,0,sizeof(common)); + + ret = MESA_load_profile_string_nodef (IPRESET_CONFIG_FILE, "KAFKA", "kafka_bootstrap_servers", kafka_servers, MAX_LENGTH); + ret = MESA_load_profile_string_nodef (IPRESET_CONFIG_FILE, "KAFKA", "kafka_bootstrap_topic", kafka_topic, MAX_PATH_LENGTH); + ret = MESA_load_profile_string_def (IPRESET_CONFIG_FILE, "CONFIG", "log_path", log_path, MAX_PATH_LENGTH,"./ipresetlog/ipreset.log"); + ret = MESA_load_profile_int_def (IPRESET_CONFIG_FILE, "CONFIG", "log_level", &log_level, RLOG_LV_INFO); + ret = MESA_load_profile_int_def (IPRESET_CONFIG_FILE, "CONFIG", "batch_send_number", &batch_send_number,100); + ret = MESA_load_profile_int_def (IPRESET_CONFIG_FILE, "CONFIG", "send_types", &send_types,0); + + check_file_path (log_path); + + sprintf (common.kafka_servers, "%s",kafka_servers); + sprintf(common.kafka_topic,"%s",kafka_topic); + //strcpy(common->kafka_servers, kafka_servers); + //strcpy(common->kafka_topic,kafka_topic); + common.batch_send_number=batch_send_number; + common.send_types=send_types; + common.log_handle = MESA_create_runtime_log_handle(log_path,log_level); + if(ret < 0){ + MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,"[INIT]","ipreset read configfile failed"); + assert(0); + } + + ret = init_kafka(); + if(ret < 0){ + MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,"[INIT]","ipreset plug init failed"); + return -1; + } + else + MESA_handle_runtime_log(common.log_handle,RLOG_LV_DEBUG,"[INIT]","ipreset init_kafka() ok!"); + +// common.htable =NULL; + ret = get_macip_map(); + if(ret<0){ + MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,"[INIT]","get_macip_map() failed!"); + return -1; + } + + return 0; +} +/*void __free_data(void *data) +{ + free(data); + data = NULL; +*/ +void ipreset_destroy() +{ + rd_kafka_flush(common.kafka_producer, 10 * 1000); + //free(rkmessages); + kafka_destroy(); + //MESA_handle_runtime_log(log_handle,log_level,module_name,"%s \n","ipreset plug destroy successfully"); + int num=0; + for(num=0;num<HASH_THREAD_NUM;num++){ + MESA_htable_destroy(common.hashset[num].htable,NULL); + } + +} + +/*int get_found_time() +{ + time_t timep; + time(&timep); + ip_info.found_time = asctime(gmtime(&timep)); + ip_info.found_time[strlen(ip_info.found_time) - 1] = 0; + return 0; +}*/ + +char* getJSONString(unsigned char src_ip[16], unsigned char dst_ip[16], UINT16 src_port, UINT16 dst_port,char* protocol) +{ + time_t t; + cJSON *json = cJSON_CreateObject(); + cJSON_AddItemToObject(json, "cfg_id",cJSON_CreateNumber(0)); + cJSON_AddItemToObject(json, "found_time",cJSON_CreateNumber(time(&t))); + cJSON_AddItemToObject(json, "recv_time",cJSON_CreateNumber(time(&t))); + cJSON_AddItemToObject(json, "protocol",cJSON_CreateString(protocol)); + cJSON_AddItemToObject(json, "addr_type",cJSON_CreateNumber(4)); + cJSON_AddItemToObject(json, "server_ip", cJSON_CreateString(dst_ip)); + cJSON_AddItemToObject(json, "client_ip", cJSON_CreateString(src_ip)); + cJSON_AddItemToObject(json, "server_port", cJSON_CreateNumber(dst_port)); + cJSON_AddItemToObject(json, "client_port", cJSON_CreateNumber(src_port)); + cJSON_AddItemToObject(json, "server_type",cJSON_CreateNumber(0)); + cJSON_AddItemToObject(json, "entrance_id",cJSON_CreateNumber(0)); + cJSON_AddItemToObject(json, "device_id",cJSON_CreateNumber(0)); + cJSON_AddItemToObject(json, "direction",cJSON_CreateNumber(0)); + cJSON_AddItemToObject(json, "stream_type",cJSON_CreateNumber(0)); + + cJSON_AddItemToObject(json, "clj_ip",cJSON_CreateString(common.clj_ip)); + cJSON_AddItemToObject(json, "nest_addr_list",cJSON_CreateString("")); + cJSON_AddItemToObject(json, "user_region",cJSON_CreateString("")); + cJSON_AddItemToObject(json, "send_type",cJSON_CreateString(common.send_type)); + char* buf = cJSON_PrintUnformatted(json); + cJSON_Delete(json); + return buf; +} +char ipreset_ip_entry(struct streaminfo *f_stream,unsigned char routedir,int thread_seq,struct ip * a_packet) +{ + struct layer_addr_ipv4* laddr_ipv4; + laddr_ipv4= f_stream->addr.ipv4; + + unsigned char src_ip[16], dst_ip[16]; + char* protocol; + unsigned short src_port, dst_port; + int ret=0,res=0; + unsigned char rec= APP_STATE_GIVEME; + inet_ntop(AF_INET, (void *)&(laddr_ipv4->saddr), src_ip, sizeof(src_ip)); + inet_ntop(AF_INET, (void *)&(laddr_ipv4->daddr), dst_ip, sizeof(dst_ip)); + + //filter 10. IP + char *ips="10.0.0.1"; + if(strncmp(src_ip,ips,3)!=0 || strncmp(dst_ip,ips,3)!=0){ +// if(!strncmp(src_ip,ips,3) || !strncmp(dst_ip,ips,3)){ + + char mac_addr[17]; + void *raw_pkt_hdr; + ret =get_rawpkt_opt_from_streaminfo(f_stream, RAW_PKT_GET_DATA, &raw_pkt_hdr); + if(0 == ret){ + struct ethhdr *eth_hdr = (struct ethhdr *)raw_pkt_hdr; + eth_hdr->h_source; + sprintf(mac_addr, "%02x:%02x:%02x:%02x:%02x:%02x", + (unsigned char)eth_hdr->h_source[0], + (unsigned char)eth_hdr->h_source[1], + (unsigned char)eth_hdr->h_source[2], + (unsigned char)eth_hdr->h_source[3], + (unsigned char)eth_hdr->h_source[4], + (unsigned char)eth_hdr->h_source[5] + ); + + /* sprintf(mac_addr, "%02x%02x-%02x%02x-%02x%02x", + (unsigned char)eth_hdr->h_source[0], + (unsigned char)eth_hdr->h_source[1], + (unsigned char)eth_hdr->h_source[2], + (unsigned char)eth_hdr->h_source[3], + (unsigned char)eth_hdr->h_source[4], + (unsigned char)eth_hdr->h_source[5]); + */ + // printf("mac_addr is %s\n",mac_addr); + //map ip mac + res = get_clj_ip(mac_addr,thread_seq); + if(res<0){ + MESA_handle_runtime_log(common.log_handle,RLOG_LV_INFO,module_name,"get_clj_ip is failed,res is %d",res); + return APP_STATE_DROPME; + } + } + else{ + printf("The ret is error!\n"); + MESA_handle_runtime_log(common.log_handle,RLOG_LV_INFO,module_name,"There is no mac_addr,ret is %d",ret); + } + + if(a_packet->ip_p == 6) + { + protocol = "IPv4_TCP"; + struct iphdr *ips = (struct iphdr*)a_packet; + struct tcphdr *tcps = (struct tcphdr*)((char*)a_packet+ips->ihl*4); + // if(tcphdra->th_flags&TH_RST) //rst + if(tcps->rst==1) + { + if(common.send_types==1 || common.send_types==0) + { + sprintf (common.send_type, "%s","reset"); + //get port and send data + src_port = ntohs(tcps->source); + dst_port = ntohs(tcps->dest); + + char* buf = getJSONString(src_ip, dst_ip, src_port, dst_port,protocol); + MESA_handle_runtime_log(common.log_handle,RLOG_LV_INFO,module_name,"reset cjson buf is %s",buf); + signal(SIGINT, stop); + if(buf !=NULL) + { + while(1) + { + ret = rd_kafka_produce(common.rd_kafka_topic,RD_KAFKA_PARTITION_UA,RD_KAFKA_MSG_F_FREE,buf,strlen(buf),NULL,0,NULL); + rd_kafka_poll(common.kafka_producer,0); + if(ret == 0 || rd_kafka_last_error()!= RD_KAFKA_RESP_ERR__QUEUE_FULL ) + // if(ret == 0 ) + break; + } + if(ret !=0) + free(buf); + rec = APP_STATE_GIVEME; + } + else{ + + MESA_handle_runtime_log(common.log_handle,RLOG_LV_INFO,module_name,"rd_kafka_producer TCP reset failed, %d",ret); + return APP_STATE_DROPME; + } + } + } + else if(tcps->syn==1 && tcps->ack==1) + { + if(common.send_types==2 || common.send_types==0) + { + sprintf (common.send_type, "%s","synack"); + //get port and send data + src_port = ntohs(tcps->source); + dst_port = ntohs(tcps->dest); + + char* buf = getJSONString(src_ip, dst_ip, src_port, dst_port,protocol); + MESA_handle_runtime_log(common.log_handle,RLOG_LV_INFO,module_name,"synack cjson buf is %s",buf); + + signal(SIGINT, stop); + if(buf !=NULL) + { + while(1) + { + ret = rd_kafka_produce(common.rd_kafka_topic,RD_KAFKA_PARTITION_UA,RD_KAFKA_MSG_F_FREE,buf,strlen(buf),NULL,0,NULL); + rd_kafka_poll(common.kafka_producer,0); + if(ret == 0 || rd_kafka_last_error()!= RD_KAFKA_RESP_ERR__QUEUE_FULL ) + break; + } + if(ret !=0) + free(buf); + rec = APP_STATE_GIVEME; + } + else{ + + MESA_handle_runtime_log(common.log_handle,RLOG_LV_INFO,module_name,"rd_kafka_producer TCP syn+ack failed, %d",ret); + return APP_STATE_DROPME; + } + } + } + } + else if(a_packet->ip_p == 17) + { + if(common.send_types==3 || common.send_types==0) + { + protocol = "IPv4_UDP"; + struct iphdr *ips = (struct iphdr*)a_packet; + struct udphdr *udps = (struct udphdr*)((char*)a_packet+ips->ihl*4); + src_port = ntohs(udps->source); + dst_port = ntohs(udps->dest); + + signal(SIGINT, stop); + if(src_port == 53 || dst_port == 53) + { + sprintf (common.send_type, "%s","53port"); + char* buf = getJSONString(src_ip, dst_ip, src_port, dst_port,protocol); + MESA_handle_runtime_log(common.log_handle,RLOG_LV_INFO,module_name,"53port cjson buf is %s",buf); + //insert into kafka + if(buf !=NULL) + { + while(1) + { + // ret = rd_kafka_produce(common.rd_kafka_topic,RD_KAFKA_PARTITION_UA,RD_KAFKA_MSG_F_COPY,buf,strlen(buf),NULL,0,NULL); + ret = rd_kafka_produce(common.rd_kafka_topic,RD_KAFKA_PARTITION_UA,RD_KAFKA_MSG_F_FREE,buf,strlen(buf),NULL,0,NULL); + // if(ret == -1 )break; + rd_kafka_poll(common.kafka_producer,0); + if(ret == 0 || rd_kafka_last_error()!= RD_KAFKA_RESP_ERR__QUEUE_FULL ) + //printf("rd_kafka_last_error is full! and ret=0"); + break; + +// if(ret == 0 )break; + // usleep(1000); + } + if(ret !=0) + free(buf); + rec = APP_STATE_GIVEME; + } + else{ + + MESA_handle_runtime_log(common.log_handle,RLOG_LV_INFO,module_name,"rd_kafka_producer UDP failed, %d",ret); + return APP_STATE_DROPME; + } + } + } + } + else ; + + } + return rec; +} + +#ifdef __cplusplus +} +#endif diff --git a/ip_reset_plug.h b/ip_reset_plug.h new file mode 100644 index 0000000..8647b40 --- /dev/null +++ b/ip_reset_plug.h @@ -0,0 +1,62 @@ +#include <stdio.h> +#include <signal.h> +#include <string.h> +#include <stdlib.h> +#include <netinet/in.h> +#include <time.h> +#include <stdarg.h> +#include <rdkafka.h> +#include <stream.h> +#include <cJSON.h> +#include "MESA_prof_load.h" +#include "MESA_handle_logger.h" +#include <netdb.h> +#include <netinet/ip.h> +#include <netinet/tcp.h> +#include <netinet/udp.h> +#include <linux/if_ether.h> +#include "MESA_htable.h" +#include <assert.h> + +#define IPRESET_CONFIG_FILE "./resetconf/ip_reset_plug.conf" + +#define SIZE 128 +#define BUFSIZE 1024 +#define STRSIZE 1024 +#define MAX_PATH_LENGTH 256 +#define MAX_LENGTH 4096 +//#define MAX_MACIP_NUM 4096 + +#define HASH_THREAD_NUM 32 +//#define HASH_THREAD_NUM 10 + +typedef struct hash_set_t{ + int hash_num; + MESA_htable_handle htable; +}hash_set_t; + +typedef struct common_module_t { + void *log_handle; + char config_path[MAX_PATH_LENGTH]; + char log_path[MAX_PATH_LENGTH]; + char kafka_servers[MAX_LENGTH]; + char kafka_topic[MAX_LENGTH]; + int batch_send_number; + rd_kafka_t *kafka_producer; /*Producer instance handle*/ + rd_kafka_topic_t *rd_kafka_topic; /*topic对象*/ + char errstr[512]; + char buf[512]; + char clj_ip[16]; + char send_type[32]; + int send_types; /*rst:1,syn+ack:2,53port:3,all:0*/ +// char macip[MAX_MACIP_NUM][17]; + // char cljip[MAX_MACIP_NUM][32]; + +// MESA_htable_handle htable; //add by 20190722,htable + hash_set_t *hashset; +}common_module_t; + + +int ipreset_init(); +void ipreset_destroy(); +char ipreset_ip_entry(struct streaminfo *f_stream,unsigned char routedir,int thread_seq,struct ip * a_packet); diff --git a/macip.conf b/macip.conf new file mode 100644 index 0000000..91988d0 --- /dev/null +++ b/macip.conf @@ -0,0 +1,27 @@ +20:28:3e:e1:f4:9e 10.172.207.2 +20:28:3e:e2:0b:12 10.172.207.3 +20:28:3e:e1:f4:ae 10.172.207.4 +20:28:3e:e1:f4:32 10.172.207.5 +20:28:3e:7b:20:0f 10.172.207.6 +20:28:3e:e1:f4:7e 10.172.207.7 +20:28:3e:e2:0b:22 10.172.207.8 +20:28:3e:e1:f4:ac 10.172.207.9 +20:28:3e:e1:f4:52 10.172.207.10 +20:28:3e:e1:f4:7a 10.172.207.11 +20:28:3e:e1:f4:7c 10.172.207.12 +20:28:3e:e1:f4:54 10.172.207.13 +20:28:3e:e1:f4:c4 10.172.207.14 +20:28:3e:e1:f4:2e 10.172.207.15 +20:28:3e:e2:0b:1e 10.172.207.16 +20:28:3e:e1:f4:c2 10.172.207.17 +20:28:3e:e1:f4:ec 10.172.207.18 +20:28:3e:e1:f4:6e 10.172.207.19 +20:28:3e:e1:f5:82 10.172.207.20 +20:28:3e:e1:f4:dc 10.172.207.21 +20:28:3e:e2:0b:2c 10.172.207.22 +20:28:3e:e1:f4:0c 10.172.207.23 +20:28:3e:e1:f4:a2 10.172.207.24 +20:28:3e:e1:f4:6c 10.172.207.25 +20:28:3e:e1:f4:ce 10.172.207.26 +20:28:3e:e1:f4:d2 10.172.207.27 +20:28:3e:e1:f5:32 10.172.207.28 diff --git a/readme.txt b/readme.txt new file mode 100644 index 0000000..bb8ec6c --- /dev/null +++ b/readme.txt @@ -0,0 +1,16 @@ +add by 20190705 +1. send three types of quaternions to kafka ,need one topic +a. rst =1 +b. syn=1 and ack=1 +c. udp 53 port + +add by 20190708 +1. edit clj_ip +Mapping from MAC to IP + +match to ip according to mac address +2. filter 10. IP from sapp pkt + +add by 20190729 +1. add MESA_htable to match +2. add multi-thread to deal the sapp pkt
\ No newline at end of file |
