#include #include #include #include #include #include #include #include #include #include #include #include #include "wy_singleflow_entry.h" struct wysf_global_info g_wysf_global_info; char WYSF_TCP_STREAM_PKT_ENTRY(struct streaminfo *a_stream, void **pme, int thread_seq, void *a_packet) { struct wysf_stream_context *context=(struct wysf_stream_context *)*pme; if(a_stream->pktstate == OP_STATE_PENDING) { context = *pme = (struct wysf_stream_context *)dictator_malloc(thread_seq, sizeof(struct wysf_stream_context)); context->func_node = NULL; } return APP_STATE_GIVEME; } static int wysf_register_field_stat(struct wysf_global_info *info) { const char *field_names[WYSF_FSSTAT_FIELD_MAX]={"FlowsTotal", "FlowsIntcpt", "FlowsRtnBack", "FlowsIntBytes", "BroadCstSent", "BroadCstRecv"}; const char *status_names[WYSF_FSSTAT_STATUS_MAX]={"FlowTable", "ActiveFuncs", "KeepAliveSW"}; int value; info->fsstat_handle = FS_create_handle(); FS_set_para(info->fsstat_handle, OUTPUT_DEVICE, info->fsstat_filepath, strlen(info->fsstat_filepath)+1); if(info->fsstat_print_mode == 1) { FS_set_para(info->fsstat_handle, PRINT_MODE, &info->fsstat_print_mode, sizeof(info->fsstat_print_mode)); } else { FS_set_para(info->fsstat_handle, PRINT_MODE, &info->fsstat_print_mode, sizeof(info->fsstat_print_mode)); value = 1; FS_set_para(info->fsstat_handle, FLUSH_BY_DATE, &value, sizeof(value)); } value = info->fsstat_period; FS_set_para(info->fsstat_handle, STAT_CYCLE, &value, sizeof(value)); value = 0; FS_set_para(info->fsstat_handle, CREATE_THREAD, &value, sizeof(value)); FS_set_para(info->fsstat_handle, APP_NAME, info->fsstat_appname, strlen(info->fsstat_appname)+1); FS_set_para(info->fsstat_handle, STATS_SERVER_IP, info->fsstat_dst_ip, strlen(info->fsstat_dst_ip)+1); FS_set_para(info->fsstat_handle, STATS_SERVER_PORT, &info->fsstat_dst_port, sizeof(info->fsstat_dst_port)); for(int i=0; ifsstat_field_ids[i] = FS_register(info->fsstat_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, field_names[i]); } for(int i=0; ifsstat_status_ids[i] = FS_register(info->fsstat_handle, FS_STYLE_STATUS, FS_CALC_CURRENT, status_names[i]); } FS_start(info->fsstat_handle); return 0; } void flow_tuple_htable_data_expires(void *data) { } MESA_htable_handle init_and_create_htable(unsigned int slot_size, int expire_time, int lock_num, void (*data_free)(void *data), int (*data_expire_with_condition)(void *data, int eliminate_type)) { MESA_htable_create_args_t htable_args; memset(&htable_args, 0, sizeof(MESA_htable_create_args_t)); htable_args.thread_safe = lock_num; htable_args.recursive = 1; htable_args.hash_slot_size = slot_size; htable_args.max_elem_num = 10*slot_size; htable_args.eliminate_type = HASH_ELIMINATE_ALGO_FIFO; htable_args.expire_time = expire_time; htable_args.data_free = data_free; //ÊͷŽڵãÄÚ´æ htable_args.data_expire_with_condition = data_expire_with_condition; return MESA_htable_create(&htable_args, sizeof(MESA_htable_create_args_t)); } static int udp_broadcast_client_socket_init(void *log_runtime) { int on = 1, socket_fd; if((socket_fd = socket(AF_INET,SOCK_DGRAM,0))<0) { MESA_RUNTIME_LOGV3(log_runtime, RLOG_LV_FATAL, "create udp socket error: %s", strerror(errno)); return -1; } on = 5242880; setsockopt(socket_fd, SOL_SOCKET, SO_SNDBUF, &on, sizeof(on)); return socket_fd; } static int _unfold_IP_range(char* ip_range, char***ip_list, int size) { int i=0,count=0, ret=0; int range_digits[5]; memset(range_digits,0,sizeof(range_digits)); ret=sscanf(ip_range,"%d.%d.%d.%d-%d",&range_digits[0],&range_digits[1],&range_digits[2],&range_digits[3],&range_digits[4]); if(ret!=4&&ret!=5) { return 0; } if(ret==4&&range_digits[4]==0) { range_digits[4]=range_digits[3]; } for(i=0;i<5;i++) { if(range_digits[i]<0||range_digits[i]>255) { return 0; } } count=range_digits[4]-range_digits[3]+1; *ip_list=(char**)realloc(*ip_list, sizeof(char*)*(size+count)); for(i=0;iconhash = conhash_instance_new(NULL, 0); snprintf(group->groupname, 64, "%s", judian_name); group->func_nodes = (struct function_node *)calloc(1, sizeof(struct function_node)*ipnum); group->func_nodes_num = ipnum; for(int i=0; ifunc_nodes[i].ip_as_bucketid); group->func_nodes[i].parent = group; group->func_nodes[i].sinaddr.sin_family = AF_INET; group->func_nodes[i].sinaddr.sin_addr.s_addr = group->func_nodes[i].ip_as_bucketid; group->func_nodes[i].sinaddr.sin_port = htons(info->bfd_dest_port); group->func_nodes[i].dst_ip_str = ip_list[i]; info->func_keepalive->insert(std::make_pair(group->func_nodes[i].ip_as_bucketid, &group->func_nodes[i])); } assert(info->func_group_num < 1024); info->judian_group_list[info->func_group_num++] = group; info->group_judian->insert(std::make_pair(std::string(judian_name), group)); return group; } int init_forward_group_nodes(struct judian_as_group *group, const char *judian_name, char **ip_list, int ipnum, struct wysf_global_info *info) { group->fwd_nodes = (struct forward_nodes *)calloc(1, sizeof(struct forward_nodes)*ipnum); group->fwd_nodes_num = ipnum; for(int i=0; ifwd_nodes[i].dst_ip); group->fwd_nodes[i].parent = group; group->fwd_nodes[i].sinaddr.sin_family = AF_INET; group->fwd_nodes[i].sinaddr.sin_addr.s_addr = group->fwd_nodes[i].dst_ip; group->fwd_nodes[i].sinaddr.sin_port = htons(info->bfd_dest_port); group->fwd_nodes[i].dst_ip_str = ip_list[i]; if((group->fwd_nodes[i].udp_sockfd = udp_broadcast_client_socket_init(info->log_runtime)) < 0) { return -1; } info->forwardip2judian->insert(std::make_pair(group->fwd_nodes[i].dst_ip, group)); } return 0; } int load_config_judian_function_iplist(const char *config_file, struct wysf_global_info *info) { char namebuffer[4096], ipbuffer[1024], *judian_name, *save=NULL, **node_iplist; struct judian_as_group *group; int ipnum; if(0>=MESA_load_profile_string_nodef(config_file, "MODULE", "judian_name_list", namebuffer, sizeof(namebuffer))) { MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [MODULE]judian_name_list not found!", config_file); assert(0);return -1; } for(judian_name=strtok_r(namebuffer, ";", &save); judian_name!=NULL; judian_name=strtok_r(NULL, ";", &save)) { if(0>=MESA_load_profile_string_nodef(config_file, judian_name, "judian_func_ip_list", ipbuffer, sizeof(ipbuffer))) { MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]judian_func_ip_list not found!", config_file, judian_name); assert(0);return -1; } if((ipnum = unfold_IP_range(ipbuffer, &node_iplist)) == 0) { MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]judian_func_ip_list is empty!", config_file, judian_name); continue; } group = init_function_group_nodes(judian_name, node_iplist, ipnum, info); free(node_iplist); if(0>=MESA_load_profile_string_nodef(config_file, judian_name, "judian_forward_ip_list", ipbuffer, sizeof(ipbuffer))) { MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]judian_forward_ip_list not found!", config_file, judian_name); assert(0);return -1; } if((ipnum=unfold_IP_range(ipbuffer, &node_iplist)) == 0) { MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]judian_forward_ip_list is empty!", config_file, judian_name); continue; } init_forward_group_nodes(group, judian_name, node_iplist, ipnum, info); free(node_iplist); } return 0; } int WYSF_STREAM_PKT_INIT(void) { char root_log_name[256]; pthread_t thread_desc; pthread_attr_t attr; int log_level; SAPP_TLV_T tlv_value; memset(&g_wysf_global_info, 0, sizeof(struct wysf_global_info)); g_wysf_global_info.group_judian = new std::map; g_wysf_global_info.func_keepalive = new std::map; g_wysf_global_info.forwardip2judian = new std::map; MESA_load_profile_string_def(WYSF_CONFIG_FILE, "MODULE", "RUN_LOG_NAME", root_log_name, sizeof(root_log_name), "./log/wysf_runtime.log"); MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "RUN_LOG_LV", &log_level, 10); g_wysf_global_info.log_runtime = MESA_create_runtime_log_handle(root_log_name, log_level); if(NULL == g_wysf_global_info.log_runtime) { printf("MESA_create_runtime_log_handle %s failed: %s\n", root_log_name, strerror(errno)); return -1; } if(load_config_judian_function_iplist(WYSF_CONFIG_FILE, &g_wysf_global_info)) { return -2; } MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "broadcast_receive_port", &g_wysf_global_info.broadcast_udp_server_port, 6789); MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "bfd_dest_port", &g_wysf_global_info.bfd_dest_port, 8193); MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "bfd_timeout_ms", &g_wysf_global_info.bfd_timeout_ms, 20); MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "bfd_timeout_times", &g_wysf_global_info.bfd_timeout_times, 3); MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "htable_flow_slots", &g_wysf_global_info.flowhtable_slots, 16777216); MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "htable_flow_expires", &g_wysf_global_info.flowhtable_expires, 120); MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "htable_locks_num", &g_wysf_global_info.htable_locknum, 67); MESA_load_profile_string_def(WYSF_CONFIG_FILE, "MODULE", "FSSTAT_LOG_APPNAME", g_wysf_global_info.fsstat_appname, 16, "WYSF"); MESA_load_profile_string_def(WYSF_CONFIG_FILE, "MODULE", "FSSTAT_LOG_FILEPATH", g_wysf_global_info.fsstat_filepath, 256, "./log/wysf.fs"); MESA_load_profile_uint_def(WYSF_CONFIG_FILE, "MODULE", "FSSTAT_LOG_INTERVAL", &g_wysf_global_info.fsstat_period, 5); MESA_load_profile_uint_def(WYSF_CONFIG_FILE, "MODULE", "FSSTAT_LOG_PRINT_MODE", &g_wysf_global_info.fsstat_print_mode, 1); MESA_load_profile_string_def(WYSF_CONFIG_FILE, "MODULE", "FSSTAT_LOG_DST_IP", g_wysf_global_info.fsstat_dst_ip, 64, "127.0.0.1"); MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "FSSTAT_LOG_DST_PORT", &g_wysf_global_info.fsstat_dst_port, 8125); wysf_register_field_stat(&g_wysf_global_info); g_wysf_global_info.stream_tuple_mapping = init_and_create_htable(g_wysf_global_info.flowhtable_slots, g_wysf_global_info.flowhtable_expires, g_wysf_global_info.htable_locknum, NULL, NULL); pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); if(pthread_create(&thread_desc, &attr, thread_funcnode_keepalive, NULL)) { MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_FATAL, "pthread_create(): %s", strerror(errno)); assert(0);return -5; } if(pthread_create(&thread_desc, &attr, thread_fwdnode_breoadcast, NULL)) { MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_FATAL, "pthread_create(): %s", strerror(errno)); assert(0);return -5; } memset(&tlv_value, 0, sizeof(SAPP_TLV_T)); tlv_value.type = GDEV_KEEPALIVE_OPT_GLOBAL_SWITCH; tlv_value.int_value = 0; tlv_value.length = sizeof(int); gdev_keepalive_set_opt(&tlv_value); return 0; } void WYSF_STREAM_PKT_DESTROY(void) { }