diff options
| author | [email protected] <[email protected]> | 2021-11-02 12:34:05 +0800 |
|---|---|---|
| committer | [email protected] <[email protected]> | 2021-11-02 12:34:05 +0800 |
| commit | 31f55f0b88d4af34a8a36497f5e49c69b88b2fbf (patch) | |
| tree | 63515b3ceb361369cdc88ae6db1a808fc80e5b42 /src/wy_singleflow_entry.cpp | |
Diffstat (limited to 'src/wy_singleflow_entry.cpp')
| -rw-r--r-- | src/wy_singleflow_entry.cpp | 324 |
1 files changed, 324 insertions, 0 deletions
diff --git a/src/wy_singleflow_entry.cpp b/src/wy_singleflow_entry.cpp new file mode 100644 index 0000000..a464d8f --- /dev/null +++ b/src/wy_singleflow_entry.cpp @@ -0,0 +1,324 @@ +#include <assert.h> +#include <string.h> +#include <unistd.h> +#include <ctype.h> +#include <stdlib.h> +#include <arpa/inet.h> +#include <netinet/in.h> +#include <netinet/ip6.h> +#include <sys/time.h> +#include <errno.h> + +#include <MESA/MESA_prof_load.h> +#include <MESA/stream.h> + +#include "wy_singleflow_entry.h" + +struct wysf_global_info g_wysf_global_info; + + +char WYSF_TCP_STREAM_PKT_ENTRY(struct streaminfo *a_stream, void **pme, int thread_seq, void *a_packet) +{ + struct wysf_stream_context *context=(struct wysf_stream_context *)*pme; + + if(a_stream->pktstate == OP_STATE_PENDING) + { + context = *pme = (struct wysf_stream_context *)dictator_malloc(thread_seq, sizeof(struct wysf_stream_context)); + context->func_node = NULL; + } + + + return APP_STATE_GIVEME; +} + + +static int wysf_register_field_stat(struct wysf_global_info *info) +{ + const char *field_names[WYSF_FSSTAT_FIELD_MAX]={"FlowsTotal", "FlowsIntcpt", "FlowsRtnBack", "FlowsIntBytes", + "BroadCstSent", "BroadCstRecv"}; + const char *status_names[WYSF_FSSTAT_STATUS_MAX]={"FlowTable", "ActiveFuncs", "KeepAliveSW"}; + int value; + + info->fsstat_handle = FS_create_handle(); + FS_set_para(info->fsstat_handle, OUTPUT_DEVICE, info->fsstat_filepath, strlen(info->fsstat_filepath)+1); + if(info->fsstat_print_mode == 1) + { + FS_set_para(info->fsstat_handle, PRINT_MODE, &info->fsstat_print_mode, sizeof(info->fsstat_print_mode)); + } + else + { + FS_set_para(info->fsstat_handle, PRINT_MODE, &info->fsstat_print_mode, sizeof(info->fsstat_print_mode)); + value = 1; + FS_set_para(info->fsstat_handle, FLUSH_BY_DATE, &value, sizeof(value)); + } + value = info->fsstat_period; + FS_set_para(info->fsstat_handle, STAT_CYCLE, &value, sizeof(value)); + value = 0; + FS_set_para(info->fsstat_handle, CREATE_THREAD, &value, sizeof(value)); + FS_set_para(info->fsstat_handle, APP_NAME, info->fsstat_appname, strlen(info->fsstat_appname)+1); + FS_set_para(info->fsstat_handle, STATS_SERVER_IP, info->fsstat_dst_ip, strlen(info->fsstat_dst_ip)+1); + FS_set_para(info->fsstat_handle, STATS_SERVER_PORT, &info->fsstat_dst_port, sizeof(info->fsstat_dst_port)); + + for(int i=0; i<WYSF_FSSTAT_FIELD_MAX; i++) + { + info->fsstat_field_ids[i] = FS_register(info->fsstat_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, field_names[i]); + } + for(int i=0; i<WYSF_FSSTAT_STATUS_MAX; i++) + { + info->fsstat_status_ids[i] = FS_register(info->fsstat_handle, FS_STYLE_STATUS, FS_CALC_CURRENT, status_names[i]); + } + FS_start(info->fsstat_handle); + return 0; +} + +void flow_tuple_htable_data_expires(void *data) +{ +} + +MESA_htable_handle init_and_create_htable(unsigned int slot_size, int expire_time, int lock_num, + void (*data_free)(void *data), + int (*data_expire_with_condition)(void *data, int eliminate_type)) +{ + MESA_htable_create_args_t htable_args; + + memset(&htable_args, 0, sizeof(MESA_htable_create_args_t)); + + htable_args.thread_safe = lock_num; + htable_args.recursive = 1; + htable_args.hash_slot_size = slot_size; + htable_args.max_elem_num = 10*slot_size; + htable_args.eliminate_type = HASH_ELIMINATE_ALGO_FIFO; + htable_args.expire_time = expire_time; + htable_args.data_free = data_free; //�ͷŽڵ��ڴ� + htable_args.data_expire_with_condition = data_expire_with_condition; + + return MESA_htable_create(&htable_args, sizeof(MESA_htable_create_args_t)); +} + +static int udp_broadcast_client_socket_init(void *log_runtime) +{ + int on = 1, socket_fd; + + if((socket_fd = socket(AF_INET,SOCK_DGRAM,0))<0) + { + MESA_RUNTIME_LOGV3(log_runtime, RLOG_LV_FATAL, "create udp socket error: %s", strerror(errno)); + return -1; + } + on = 5242880; + setsockopt(socket_fd, SOL_SOCKET, SO_SNDBUF, &on, sizeof(on)); + return socket_fd; +} + +static int _unfold_IP_range(char* ip_range, char***ip_list, int size) +{ + int i=0,count=0, ret=0; + int range_digits[5]; + memset(range_digits,0,sizeof(range_digits)); + ret=sscanf(ip_range,"%d.%d.%d.%d-%d",&range_digits[0],&range_digits[1],&range_digits[2],&range_digits[3],&range_digits[4]); + if(ret!=4&&ret!=5) + { + return 0; + } + if(ret==4&&range_digits[4]==0) + { + range_digits[4]=range_digits[3]; + } + for(i=0;i<5;i++) + { + if(range_digits[i]<0||range_digits[i]>255) + { + return 0; + } + } + count=range_digits[4]-range_digits[3]+1; + *ip_list=(char**)realloc(*ip_list, sizeof(char*)*(size+count)); + for(i=0;i<count;i++) + { + (*ip_list)[size+i]=(char*)malloc(64); + snprintf((*ip_list)[size+i],64,"%d.%d.%d.%d",range_digits[0],range_digits[1],range_digits[2],range_digits[3]+i); + } + return count; +} + +static int unfold_IP_range(const char* ip_range, char***ip_list) +{ + char *token=NULL,*sub_token=NULL,*saveptr; + char *buffer=(char*)calloc(sizeof(char),strlen(ip_range)+1); + int count=0; + strcpy(buffer,ip_range); + for (token = buffer; ; token= NULL) + { + sub_token= strtok_r(token,";", &saveptr); + if (sub_token == NULL) + break; + count+=_unfold_IP_range(sub_token, ip_list,count); + } + free(buffer); + return count; +} + +struct judian_as_group *init_function_group_nodes(const char *judian_name, char **ip_list, int ipnum, struct wysf_global_info *info) +{ + struct judian_as_group *group; + + group = (struct judian_as_group *)calloc(1, sizeof(struct judian_as_group)); + group->conhash = conhash_instance_new(NULL, 0); + snprintf(group->groupname, 64, "%s", judian_name); + + group->func_nodes = (struct function_node *)calloc(1, sizeof(struct function_node)*ipnum); + group->func_nodes_num = ipnum; + for(int i=0; i<ipnum; i++) + { + inet_pton(AF_INET, ip_list[i], &group->func_nodes[i].ip_as_bucketid); + group->func_nodes[i].parent = group; + group->func_nodes[i].sinaddr.sin_family = AF_INET; + group->func_nodes[i].sinaddr.sin_addr.s_addr = group->func_nodes[i].ip_as_bucketid; + group->func_nodes[i].sinaddr.sin_port = htons(info->bfd_dest_port); + group->func_nodes[i].dst_ip_str = ip_list[i]; + + info->func_keepalive->insert(std::make_pair(group->func_nodes[i].ip_as_bucketid, &group->func_nodes[i])); + } + + assert(info->func_group_num < 1024); + info->judian_group_list[info->func_group_num++] = group; + info->group_judian->insert(std::make_pair(std::string(judian_name), group)); + return group; +} + +int init_forward_group_nodes(struct judian_as_group *group, const char *judian_name, + char **ip_list, int ipnum, struct wysf_global_info *info) +{ + group->fwd_nodes = (struct forward_nodes *)calloc(1, sizeof(struct forward_nodes)*ipnum); + group->fwd_nodes_num = ipnum; + for(int i=0; i<ipnum; i++) + { + inet_pton(AF_INET, ip_list[i], &group->fwd_nodes[i].dst_ip); + group->fwd_nodes[i].parent = group; + group->fwd_nodes[i].sinaddr.sin_family = AF_INET; + group->fwd_nodes[i].sinaddr.sin_addr.s_addr = group->fwd_nodes[i].dst_ip; + group->fwd_nodes[i].sinaddr.sin_port = htons(info->bfd_dest_port); + group->fwd_nodes[i].dst_ip_str = ip_list[i]; + if((group->fwd_nodes[i].udp_sockfd = udp_broadcast_client_socket_init(info->log_runtime)) < 0) + { + return -1; + } + + info->forwardip2judian->insert(std::make_pair(group->fwd_nodes[i].dst_ip, group)); + } + return 0; +} + +int load_config_judian_function_iplist(const char *config_file, struct wysf_global_info *info) +{ + char namebuffer[4096], ipbuffer[1024], *judian_name, *save=NULL, **node_iplist; + struct judian_as_group *group; + int ipnum; + + if(0>=MESA_load_profile_string_nodef(config_file, "MODULE", "judian_name_list", namebuffer, sizeof(namebuffer))) + { + MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [MODULE]judian_name_list not found!", config_file); + assert(0);return -1; + } + + for(judian_name=strtok_r(namebuffer, ";", &save); judian_name!=NULL; judian_name=strtok_r(NULL, ";", &save)) + { + if(0>=MESA_load_profile_string_nodef(config_file, judian_name, "judian_func_ip_list", ipbuffer, sizeof(ipbuffer))) + { + MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]judian_func_ip_list not found!", config_file, judian_name); + assert(0);return -1; + } + if((ipnum = unfold_IP_range(ipbuffer, &node_iplist)) == 0) + { + MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]judian_func_ip_list is empty!", config_file, judian_name); + continue; + } + group = init_function_group_nodes(judian_name, node_iplist, ipnum, info); + free(node_iplist); + + if(0>=MESA_load_profile_string_nodef(config_file, judian_name, "judian_forward_ip_list", ipbuffer, sizeof(ipbuffer))) + { + MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]judian_forward_ip_list not found!", config_file, judian_name); + assert(0);return -1; + } + if((ipnum=unfold_IP_range(ipbuffer, &node_iplist)) == 0) + { + MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]judian_forward_ip_list is empty!", config_file, judian_name); + continue; + } + init_forward_group_nodes(group, judian_name, node_iplist, ipnum, info); + free(node_iplist); + } + return 0; +} + +int WYSF_STREAM_PKT_INIT(void) +{ + char root_log_name[256]; + pthread_t thread_desc; + pthread_attr_t attr; + int log_level; + SAPP_TLV_T tlv_value; + + memset(&g_wysf_global_info, 0, sizeof(struct wysf_global_info)); + g_wysf_global_info.group_judian = new std::map<std::string, struct judian_as_group*>; + g_wysf_global_info.func_keepalive = new std::map<int32_t, struct function_node*>; + g_wysf_global_info.forwardip2judian = new std::map<int32_t, struct judian_as_group*>; + + MESA_load_profile_string_def(WYSF_CONFIG_FILE, "MODULE", "RUN_LOG_NAME", root_log_name, sizeof(root_log_name), "./log/wysf_runtime.log"); + MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "RUN_LOG_LV", &log_level, 10); + g_wysf_global_info.log_runtime = MESA_create_runtime_log_handle(root_log_name, log_level); + if(NULL == g_wysf_global_info.log_runtime) + { + printf("MESA_create_runtime_log_handle %s failed: %s\n", root_log_name, strerror(errno)); + return -1; + } + if(load_config_judian_function_iplist(WYSF_CONFIG_FILE, &g_wysf_global_info)) + { + return -2; + } + + MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "broadcast_receive_port", &g_wysf_global_info.broadcast_udp_server_port, 6789); + + MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "bfd_dest_port", &g_wysf_global_info.bfd_dest_port, 8193); + MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "bfd_timeout_ms", &g_wysf_global_info.bfd_timeout_ms, 20); + MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "bfd_timeout_times", &g_wysf_global_info.bfd_timeout_times, 3); + MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "htable_flow_slots", &g_wysf_global_info.flowhtable_slots, 16777216); + MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "htable_flow_expires", &g_wysf_global_info.flowhtable_expires, 120); + MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "htable_locks_num", &g_wysf_global_info.htable_locknum, 67); + + MESA_load_profile_string_def(WYSF_CONFIG_FILE, "MODULE", "FSSTAT_LOG_APPNAME", g_wysf_global_info.fsstat_appname, 16, "WYSF"); + MESA_load_profile_string_def(WYSF_CONFIG_FILE, "MODULE", "FSSTAT_LOG_FILEPATH", g_wysf_global_info.fsstat_filepath, 256, "./log/wysf.fs"); + MESA_load_profile_uint_def(WYSF_CONFIG_FILE, "MODULE", "FSSTAT_LOG_INTERVAL", &g_wysf_global_info.fsstat_period, 5); + MESA_load_profile_uint_def(WYSF_CONFIG_FILE, "MODULE", "FSSTAT_LOG_PRINT_MODE", &g_wysf_global_info.fsstat_print_mode, 1); + MESA_load_profile_string_def(WYSF_CONFIG_FILE, "MODULE", "FSSTAT_LOG_DST_IP", g_wysf_global_info.fsstat_dst_ip, 64, "127.0.0.1"); + MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "FSSTAT_LOG_DST_PORT", &g_wysf_global_info.fsstat_dst_port, 8125); + wysf_register_field_stat(&g_wysf_global_info); + + g_wysf_global_info.stream_tuple_mapping = init_and_create_htable(g_wysf_global_info.flowhtable_slots, + g_wysf_global_info.flowhtable_expires, g_wysf_global_info.htable_locknum, NULL, NULL); + + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + if(pthread_create(&thread_desc, &attr, thread_funcnode_keepalive, NULL)) + { + MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_FATAL, "pthread_create(): %s", strerror(errno)); + assert(0);return -5; + } + if(pthread_create(&thread_desc, &attr, thread_fwdnode_breoadcast, NULL)) + { + MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_FATAL, "pthread_create(): %s", strerror(errno)); + assert(0);return -5; + } + + memset(&tlv_value, 0, sizeof(SAPP_TLV_T)); + tlv_value.type = GDEV_KEEPALIVE_OPT_GLOBAL_SWITCH; + tlv_value.int_value = 0; + tlv_value.length = sizeof(int); + gdev_keepalive_set_opt(&tlv_value); + return 0; +} + +void WYSF_STREAM_PKT_DESTROY(void) +{ +} + |
