/************************* * File Name :ip_reset_plug.c * Author:wfm * Date:2019-02-28 *add new mac_addr 2019-07-09 *add search mac_addr,and filter 10.IP 2019-07-26 *add multi thread by 2019-07-29 **************************/ //#define __FAVOR_BSD #include "ip_reset_plug.h" #ifdef __cplusplus extern "C" { #endif static int run = 1; static void stop(int sig){ run = 0; fclose(stdin); } const char * ipreset_version_VERSION_20190729 = ""; const char *module_name = "IPRESET"; common_module_t common; static void dr_msg_cb(rd_kafka_t *rk,const rd_kafka_message_t *rkmessage, void *opaque) { if (rkmessage->err) MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,module_name,"Message delivery failed: %s\n",rd_kafka_err2str(rkmessage->err)); else MESA_handle_runtime_log(common.log_handle,RLOG_LV_DEBUG,module_name,"Message delivered (%zd bytes, partition %"PRId32")\n",rkmessage->len,rkmessage->partition); } int init_kafka() { rd_kafka_conf_t *conf; /*临时配置对象*/ conf = rd_kafka_conf_new(); /* if (rd_kafka_conf_set(conf, "bootstrap.servers", common.kafka_servers, common.errstr, sizeof(common.errstr)) != RD_KAFKA_CONF_OK) { MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,module_name,"rd_kafka_conf_new is failed ,%s",common.errstr); return -1; } common.kafka_producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, common.errstr, sizeof(common.errstr)); if(!common.kafka_producer) { MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,module_name,"kafka_producer is failed, %s",common.errstr); return -1; } */ common.kafka_producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, common.errstr, sizeof(common.errstr)); //this is line if(!common.kafka_producer) { MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,module_name,"kafka_producer is failed, %s",common.errstr); return -1; } if (rd_kafka_brokers_add(common.kafka_producer, common.kafka_servers) == 0) { MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,module_name,"rd_kafka_conf_new is failed ,%s",common.errstr); return -1; } common.rd_kafka_topic = rd_kafka_topic_new(common.kafka_producer,common.kafka_topic,NULL); if(!common.rd_kafka_topic) { MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,module_name,"Failed to create topic object: %s",common.errstr); rd_kafka_destroy(common.kafka_producer); return -1; } rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb); MESA_handle_runtime_log(common.log_handle,RLOG_LV_DEBUG,module_name,"set callback func success"); return 0; } int kafka_destroy() { if (common.rd_kafka_topic == NULL) { MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,module_name,"rd_kafka_topic is NULL: %s",common.errstr); return -1; } if (common.kafka_producer == NULL) { MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,module_name,"rd_kafka_t is NULL: %s",common.errstr); return -1; } rd_kafka_topic_destroy(common.kafka_producer); rd_kafka_destroy(common.rd_kafka_topic); return 0; } static int create_path (char *path) { if(access(path, 0) == -1) { if(mkdir(path, 0777)) { printf("cannnot create path:%s",path); MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,module_name,"cannot create path:%s",path); return -1; } } return 0; } static char *jump_over_char (char *src, char c) { while (*src == c) src ++; return src; } int create_path_p (char *path) { char *pos = NULL; char *dest = NULL; char tmp[128] = {0}; int len = 0; dest = path; dest = jump_over_char (dest, '/'); while ((dest != NULL) && ((pos = strchr (dest, '/')) != NULL)) { len = pos - path; strncpy (tmp, path, len); tmp[len] = '\0'; if (create_path (tmp) < 0) { return -1; } dest = pos +1; } if (*dest != '\0') { if (create_path (path) < 0) { return -1; } } return 0; } void check_file_path (char *file) { char path[MAX_PATH_LENGTH] = {0}; char *pos = NULL; pos = strrchr (file, '/'); if (pos != NULL) { strncpy (path, file, pos - file); create_path_p (path); } } /*int get_macip_map() { char line[1000]; char macaddr[1000],tmp[64]; int i=0; FILE *mapfile=fopen("./resetconf/macip.conf","r"); while(fgets(line,sizeof(line),mapfile)) { sscanf(line,"%s %s",macaddr,tmp); strcpy(common.macip[i],macaddr); strcpy(common.cljip[i],tmp); i++; } fclose(mapfile); return 0; } int get_clj_ip(char mac_addr[30]) { int j; for(j=0;j<=strlen(common.macip);j++) { if(strcmp(common.macip[j],mac_addr)==0) { printf("%s\n",common.cljip[j]); strcpy(common.clj_ip,common.cljip[j]); MESA_handle_runtime_log(common.log_handle,RLOG_LV_DEBUG,module_name,"The mac_addr map cljip is %s",common.clj_ip); break; } } return 0; }*/ int get_macip_map() { int ret=0,num=0; int opt_int; common.hashset = (struct hash_set_t *)malloc(sizeof(struct hash_set_t) * HASH_THREAD_NUM); if(NULL == common.hashset ) { //to do! MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,"[GET_IP]","common.hashset is failed!"); return -1; } memset(common.hashset , 0, sizeof(struct hash_set_t) * HASH_THREAD_NUM); for(num=0;num=0){ ++i; } } printf("MESA_htable_add is ok ,num is %d\n",i); MESA_handle_runtime_log(common.log_handle,RLOG_LV_INFO,"[GET_IP]","MESA_htable_add is ok ,num is %d",i); fclose(mapfile); } /* cljip_t *mip_search = NULL; for(j=0;jclj_ip); } }*/ return 0; } int get_clj_ip(char mac_addr[17],int thread_num) { char *results; results =(char *)MESA_htable_search(common.hashset[thread_num].htable,mac_addr, strlen(mac_addr)); if(results != NULL) { strcpy(common.clj_ip,results); printf("result-cljip is %s\n",common.clj_ip); MESA_handle_runtime_log(common.log_handle,RLOG_LV_DEBUG,"[GET_IP]","common.clj_ip is %s",common.clj_ip); return 1; } /* strcpy(common.cljaddr,result); printf("result-cljip is %s\n",common.cljaddr); memcpy(common.clj_ip,common.cljaddr,strlen(common.cljaddr)); common.clj_ip[strlen(common.cljaddr)]='\0'; */ return 0; } int ipreset_init() { int ret=0,log_level = 0,batch_send_number=0,send_types=0; char log_path[MAX_PATH_LENGTH] = ""; char kafka_servers[MAX_LENGTH] = ""; char kafka_topic[MAX_PATH_LENGTH] = ""; memset(&common,0,sizeof(common)); ret = MESA_load_profile_string_nodef (IPRESET_CONFIG_FILE, "KAFKA", "kafka_bootstrap_servers", kafka_servers, MAX_LENGTH); ret = MESA_load_profile_string_nodef (IPRESET_CONFIG_FILE, "KAFKA", "kafka_bootstrap_topic", kafka_topic, MAX_PATH_LENGTH); ret = MESA_load_profile_string_def (IPRESET_CONFIG_FILE, "CONFIG", "log_path", log_path, MAX_PATH_LENGTH,"./ipresetlog/ipreset.log"); ret = MESA_load_profile_int_def (IPRESET_CONFIG_FILE, "CONFIG", "log_level", &log_level, RLOG_LV_INFO); ret = MESA_load_profile_int_def (IPRESET_CONFIG_FILE, "CONFIG", "batch_send_number", &batch_send_number,100); ret = MESA_load_profile_int_def (IPRESET_CONFIG_FILE, "CONFIG", "send_types", &send_types,0); check_file_path (log_path); sprintf (common.kafka_servers, "%s",kafka_servers); sprintf(common.kafka_topic,"%s",kafka_topic); //strcpy(common->kafka_servers, kafka_servers); //strcpy(common->kafka_topic,kafka_topic); common.batch_send_number=batch_send_number; common.send_types=send_types; common.log_handle = MESA_create_runtime_log_handle(log_path,log_level); if(ret < 0){ MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,"[INIT]","ipreset read configfile failed"); assert(0); } ret = init_kafka(); if(ret < 0){ MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,"[INIT]","ipreset plug init failed"); return -1; } else MESA_handle_runtime_log(common.log_handle,RLOG_LV_DEBUG,"[INIT]","ipreset init_kafka() ok!"); // common.htable =NULL; ret = get_macip_map(); if(ret<0){ MESA_handle_runtime_log(common.log_handle,RLOG_LV_FATAL,"[INIT]","get_macip_map() failed!"); return -1; } return 0; } /*void __free_data(void *data) { free(data); data = NULL; */ void ipreset_destroy() { rd_kafka_flush(common.kafka_producer, 10 * 1000); //free(rkmessages); kafka_destroy(); //MESA_handle_runtime_log(log_handle,log_level,module_name,"%s \n","ipreset plug destroy successfully"); int num=0; for(num=0;numaddr.ipv4; unsigned char src_ip[16], dst_ip[16]; char* protocol; unsigned short src_port, dst_port; int ret=0,res=0; unsigned char rec= APP_STATE_GIVEME; inet_ntop(AF_INET, (void *)&(laddr_ipv4->saddr), src_ip, sizeof(src_ip)); inet_ntop(AF_INET, (void *)&(laddr_ipv4->daddr), dst_ip, sizeof(dst_ip)); //filter 10. IP char *ips="10.0.0.1"; if(strncmp(src_ip,ips,3)!=0 || strncmp(dst_ip,ips,3)!=0){ // if(!strncmp(src_ip,ips,3) || !strncmp(dst_ip,ips,3)){ char mac_addr[17]; void *raw_pkt_hdr; ret =get_rawpkt_opt_from_streaminfo(f_stream, RAW_PKT_GET_DATA, &raw_pkt_hdr); if(0 == ret){ struct ethhdr *eth_hdr = (struct ethhdr *)raw_pkt_hdr; eth_hdr->h_source; sprintf(mac_addr, "%02x:%02x:%02x:%02x:%02x:%02x", (unsigned char)eth_hdr->h_source[0], (unsigned char)eth_hdr->h_source[1], (unsigned char)eth_hdr->h_source[2], (unsigned char)eth_hdr->h_source[3], (unsigned char)eth_hdr->h_source[4], (unsigned char)eth_hdr->h_source[5] ); /* sprintf(mac_addr, "%02x%02x-%02x%02x-%02x%02x", (unsigned char)eth_hdr->h_source[0], (unsigned char)eth_hdr->h_source[1], (unsigned char)eth_hdr->h_source[2], (unsigned char)eth_hdr->h_source[3], (unsigned char)eth_hdr->h_source[4], (unsigned char)eth_hdr->h_source[5]); */ // printf("mac_addr is %s\n",mac_addr); //map ip mac res = get_clj_ip(mac_addr,thread_seq); if(res<0){ MESA_handle_runtime_log(common.log_handle,RLOG_LV_INFO,module_name,"get_clj_ip is failed,res is %d",res); return APP_STATE_DROPME; } } else{ printf("The ret is error!\n"); MESA_handle_runtime_log(common.log_handle,RLOG_LV_INFO,module_name,"There is no mac_addr,ret is %d",ret); } if(a_packet->ip_p == 6) { protocol = "IPv4_TCP"; struct iphdr *ips = (struct iphdr*)a_packet; struct tcphdr *tcps = (struct tcphdr*)((char*)a_packet+ips->ihl*4); // if(tcphdra->th_flags&TH_RST) //rst if(tcps->rst==1) { if(common.send_types==1 || common.send_types==0) { sprintf (common.send_type, "%s","reset"); //get port and send data src_port = ntohs(tcps->source); dst_port = ntohs(tcps->dest); char* buf = getJSONString(src_ip, dst_ip, src_port, dst_port,protocol); MESA_handle_runtime_log(common.log_handle,RLOG_LV_INFO,module_name,"reset cjson buf is %s",buf); signal(SIGINT, stop); if(buf !=NULL) { while(1) { ret = rd_kafka_produce(common.rd_kafka_topic,RD_KAFKA_PARTITION_UA,RD_KAFKA_MSG_F_FREE,buf,strlen(buf),NULL,0,NULL); rd_kafka_poll(common.kafka_producer,0); if(ret == 0 || rd_kafka_last_error()!= RD_KAFKA_RESP_ERR__QUEUE_FULL ) // if(ret == 0 ) break; } if(ret !=0) free(buf); rec = APP_STATE_GIVEME; } else{ MESA_handle_runtime_log(common.log_handle,RLOG_LV_INFO,module_name,"rd_kafka_producer TCP reset failed, %d",ret); return APP_STATE_DROPME; } } } else if(tcps->syn==1 && tcps->ack==1) { if(common.send_types==2 || common.send_types==0) { sprintf (common.send_type, "%s","synack"); //get port and send data src_port = ntohs(tcps->source); dst_port = ntohs(tcps->dest); char* buf = getJSONString(src_ip, dst_ip, src_port, dst_port,protocol); MESA_handle_runtime_log(common.log_handle,RLOG_LV_INFO,module_name,"synack cjson buf is %s",buf); signal(SIGINT, stop); if(buf !=NULL) { while(1) { ret = rd_kafka_produce(common.rd_kafka_topic,RD_KAFKA_PARTITION_UA,RD_KAFKA_MSG_F_FREE,buf,strlen(buf),NULL,0,NULL); rd_kafka_poll(common.kafka_producer,0); if(ret == 0 || rd_kafka_last_error()!= RD_KAFKA_RESP_ERR__QUEUE_FULL ) break; } if(ret !=0) free(buf); rec = APP_STATE_GIVEME; } else{ MESA_handle_runtime_log(common.log_handle,RLOG_LV_INFO,module_name,"rd_kafka_producer TCP syn+ack failed, %d",ret); return APP_STATE_DROPME; } } } } else if(a_packet->ip_p == 17) { if(common.send_types==3 || common.send_types==0) { protocol = "IPv4_UDP"; struct iphdr *ips = (struct iphdr*)a_packet; struct udphdr *udps = (struct udphdr*)((char*)a_packet+ips->ihl*4); src_port = ntohs(udps->source); dst_port = ntohs(udps->dest); signal(SIGINT, stop); if(src_port == 53 || dst_port == 53) { sprintf (common.send_type, "%s","53port"); char* buf = getJSONString(src_ip, dst_ip, src_port, dst_port,protocol); MESA_handle_runtime_log(common.log_handle,RLOG_LV_INFO,module_name,"53port cjson buf is %s",buf); //insert into kafka if(buf !=NULL) { while(1) { // ret = rd_kafka_produce(common.rd_kafka_topic,RD_KAFKA_PARTITION_UA,RD_KAFKA_MSG_F_COPY,buf,strlen(buf),NULL,0,NULL); ret = rd_kafka_produce(common.rd_kafka_topic,RD_KAFKA_PARTITION_UA,RD_KAFKA_MSG_F_FREE,buf,strlen(buf),NULL,0,NULL); // if(ret == -1 )break; rd_kafka_poll(common.kafka_producer,0); if(ret == 0 || rd_kafka_last_error()!= RD_KAFKA_RESP_ERR__QUEUE_FULL ) //printf("rd_kafka_last_error is full! and ret=0"); break; // if(ret == 0 )break; // usleep(1000); } if(ret !=0) free(buf); rec = APP_STATE_GIVEME; } else{ MESA_handle_runtime_log(common.log_handle,RLOG_LV_INFO,module_name,"rd_kafka_producer UDP failed, %d",ret); return APP_STATE_DROPME; } } } } else ; } return rec; } #ifdef __cplusplus } #endif