summaryrefslogtreecommitdiff
path: root/src/wy_singleflow_entry.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/wy_singleflow_entry.cpp')
-rw-r--r--src/wy_singleflow_entry.cpp324
1 files changed, 324 insertions, 0 deletions
diff --git a/src/wy_singleflow_entry.cpp b/src/wy_singleflow_entry.cpp
new file mode 100644
index 0000000..a464d8f
--- /dev/null
+++ b/src/wy_singleflow_entry.cpp
@@ -0,0 +1,324 @@
+#include <assert.h>
+#include <string.h>
+#include <unistd.h>
+#include <ctype.h>
+#include <stdlib.h>
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <netinet/ip6.h>
+#include <sys/time.h>
+#include <errno.h>
+
+#include <MESA/MESA_prof_load.h>
+#include <MESA/stream.h>
+
+#include "wy_singleflow_entry.h"
+
+struct wysf_global_info g_wysf_global_info;
+
+
+char WYSF_TCP_STREAM_PKT_ENTRY(struct streaminfo *a_stream, void **pme, int thread_seq, void *a_packet)
+{
+ struct wysf_stream_context *context=(struct wysf_stream_context *)*pme;
+
+ if(a_stream->pktstate == OP_STATE_PENDING)
+ {
+ context = *pme = (struct wysf_stream_context *)dictator_malloc(thread_seq, sizeof(struct wysf_stream_context));
+ context->func_node = NULL;
+ }
+
+
+ return APP_STATE_GIVEME;
+}
+
+
+static int wysf_register_field_stat(struct wysf_global_info *info)
+{
+ const char *field_names[WYSF_FSSTAT_FIELD_MAX]={"FlowsTotal", "FlowsIntcpt", "FlowsRtnBack", "FlowsIntBytes",
+ "BroadCstSent", "BroadCstRecv"};
+ const char *status_names[WYSF_FSSTAT_STATUS_MAX]={"FlowTable", "ActiveFuncs", "KeepAliveSW"};
+ int value;
+
+ info->fsstat_handle = FS_create_handle();
+ FS_set_para(info->fsstat_handle, OUTPUT_DEVICE, info->fsstat_filepath, strlen(info->fsstat_filepath)+1);
+ if(info->fsstat_print_mode == 1)
+ {
+ FS_set_para(info->fsstat_handle, PRINT_MODE, &info->fsstat_print_mode, sizeof(info->fsstat_print_mode));
+ }
+ else
+ {
+ FS_set_para(info->fsstat_handle, PRINT_MODE, &info->fsstat_print_mode, sizeof(info->fsstat_print_mode));
+ value = 1;
+ FS_set_para(info->fsstat_handle, FLUSH_BY_DATE, &value, sizeof(value));
+ }
+ value = info->fsstat_period;
+ FS_set_para(info->fsstat_handle, STAT_CYCLE, &value, sizeof(value));
+ value = 0;
+ FS_set_para(info->fsstat_handle, CREATE_THREAD, &value, sizeof(value));
+ FS_set_para(info->fsstat_handle, APP_NAME, info->fsstat_appname, strlen(info->fsstat_appname)+1);
+ FS_set_para(info->fsstat_handle, STATS_SERVER_IP, info->fsstat_dst_ip, strlen(info->fsstat_dst_ip)+1);
+ FS_set_para(info->fsstat_handle, STATS_SERVER_PORT, &info->fsstat_dst_port, sizeof(info->fsstat_dst_port));
+
+ for(int i=0; i<WYSF_FSSTAT_FIELD_MAX; i++)
+ {
+ info->fsstat_field_ids[i] = FS_register(info->fsstat_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, field_names[i]);
+ }
+ for(int i=0; i<WYSF_FSSTAT_STATUS_MAX; i++)
+ {
+ info->fsstat_status_ids[i] = FS_register(info->fsstat_handle, FS_STYLE_STATUS, FS_CALC_CURRENT, status_names[i]);
+ }
+ FS_start(info->fsstat_handle);
+ return 0;
+}
+
+void flow_tuple_htable_data_expires(void *data)
+{
+}
+
+MESA_htable_handle init_and_create_htable(unsigned int slot_size, int expire_time, int lock_num,
+ void (*data_free)(void *data),
+ int (*data_expire_with_condition)(void *data, int eliminate_type))
+{
+ MESA_htable_create_args_t htable_args;
+
+ memset(&htable_args, 0, sizeof(MESA_htable_create_args_t));
+
+ htable_args.thread_safe = lock_num;
+ htable_args.recursive = 1;
+ htable_args.hash_slot_size = slot_size;
+ htable_args.max_elem_num = 10*slot_size;
+ htable_args.eliminate_type = HASH_ELIMINATE_ALGO_FIFO;
+ htable_args.expire_time = expire_time;
+ htable_args.data_free = data_free; //�ͷŽڵ��ڴ�
+ htable_args.data_expire_with_condition = data_expire_with_condition;
+
+ return MESA_htable_create(&htable_args, sizeof(MESA_htable_create_args_t));
+}
+
+static int udp_broadcast_client_socket_init(void *log_runtime)
+{
+ int on = 1, socket_fd;
+
+ if((socket_fd = socket(AF_INET,SOCK_DGRAM,0))<0)
+ {
+ MESA_RUNTIME_LOGV3(log_runtime, RLOG_LV_FATAL, "create udp socket error: %s", strerror(errno));
+ return -1;
+ }
+ on = 5242880;
+ setsockopt(socket_fd, SOL_SOCKET, SO_SNDBUF, &on, sizeof(on));
+ return socket_fd;
+}
+
+static int _unfold_IP_range(char* ip_range, char***ip_list, int size)
+{
+ int i=0,count=0, ret=0;
+ int range_digits[5];
+ memset(range_digits,0,sizeof(range_digits));
+ ret=sscanf(ip_range,"%d.%d.%d.%d-%d",&range_digits[0],&range_digits[1],&range_digits[2],&range_digits[3],&range_digits[4]);
+ if(ret!=4&&ret!=5)
+ {
+ return 0;
+ }
+ if(ret==4&&range_digits[4]==0)
+ {
+ range_digits[4]=range_digits[3];
+ }
+ for(i=0;i<5;i++)
+ {
+ if(range_digits[i]<0||range_digits[i]>255)
+ {
+ return 0;
+ }
+ }
+ count=range_digits[4]-range_digits[3]+1;
+ *ip_list=(char**)realloc(*ip_list, sizeof(char*)*(size+count));
+ for(i=0;i<count;i++)
+ {
+ (*ip_list)[size+i]=(char*)malloc(64);
+ snprintf((*ip_list)[size+i],64,"%d.%d.%d.%d",range_digits[0],range_digits[1],range_digits[2],range_digits[3]+i);
+ }
+ return count;
+}
+
+static int unfold_IP_range(const char* ip_range, char***ip_list)
+{
+ char *token=NULL,*sub_token=NULL,*saveptr;
+ char *buffer=(char*)calloc(sizeof(char),strlen(ip_range)+1);
+ int count=0;
+ strcpy(buffer,ip_range);
+ for (token = buffer; ; token= NULL)
+ {
+ sub_token= strtok_r(token,";", &saveptr);
+ if (sub_token == NULL)
+ break;
+ count+=_unfold_IP_range(sub_token, ip_list,count);
+ }
+ free(buffer);
+ return count;
+}
+
+struct judian_as_group *init_function_group_nodes(const char *judian_name, char **ip_list, int ipnum, struct wysf_global_info *info)
+{
+ struct judian_as_group *group;
+
+ group = (struct judian_as_group *)calloc(1, sizeof(struct judian_as_group));
+ group->conhash = conhash_instance_new(NULL, 0);
+ snprintf(group->groupname, 64, "%s", judian_name);
+
+ group->func_nodes = (struct function_node *)calloc(1, sizeof(struct function_node)*ipnum);
+ group->func_nodes_num = ipnum;
+ for(int i=0; i<ipnum; i++)
+ {
+ inet_pton(AF_INET, ip_list[i], &group->func_nodes[i].ip_as_bucketid);
+ group->func_nodes[i].parent = group;
+ group->func_nodes[i].sinaddr.sin_family = AF_INET;
+ group->func_nodes[i].sinaddr.sin_addr.s_addr = group->func_nodes[i].ip_as_bucketid;
+ group->func_nodes[i].sinaddr.sin_port = htons(info->bfd_dest_port);
+ group->func_nodes[i].dst_ip_str = ip_list[i];
+
+ info->func_keepalive->insert(std::make_pair(group->func_nodes[i].ip_as_bucketid, &group->func_nodes[i]));
+ }
+
+ assert(info->func_group_num < 1024);
+ info->judian_group_list[info->func_group_num++] = group;
+ info->group_judian->insert(std::make_pair(std::string(judian_name), group));
+ return group;
+}
+
+int init_forward_group_nodes(struct judian_as_group *group, const char *judian_name,
+ char **ip_list, int ipnum, struct wysf_global_info *info)
+{
+ group->fwd_nodes = (struct forward_nodes *)calloc(1, sizeof(struct forward_nodes)*ipnum);
+ group->fwd_nodes_num = ipnum;
+ for(int i=0; i<ipnum; i++)
+ {
+ inet_pton(AF_INET, ip_list[i], &group->fwd_nodes[i].dst_ip);
+ group->fwd_nodes[i].parent = group;
+ group->fwd_nodes[i].sinaddr.sin_family = AF_INET;
+ group->fwd_nodes[i].sinaddr.sin_addr.s_addr = group->fwd_nodes[i].dst_ip;
+ group->fwd_nodes[i].sinaddr.sin_port = htons(info->bfd_dest_port);
+ group->fwd_nodes[i].dst_ip_str = ip_list[i];
+ if((group->fwd_nodes[i].udp_sockfd = udp_broadcast_client_socket_init(info->log_runtime)) < 0)
+ {
+ return -1;
+ }
+
+ info->forwardip2judian->insert(std::make_pair(group->fwd_nodes[i].dst_ip, group));
+ }
+ return 0;
+}
+
+int load_config_judian_function_iplist(const char *config_file, struct wysf_global_info *info)
+{
+ char namebuffer[4096], ipbuffer[1024], *judian_name, *save=NULL, **node_iplist;
+ struct judian_as_group *group;
+ int ipnum;
+
+ if(0>=MESA_load_profile_string_nodef(config_file, "MODULE", "judian_name_list", namebuffer, sizeof(namebuffer)))
+ {
+ MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [MODULE]judian_name_list not found!", config_file);
+ assert(0);return -1;
+ }
+
+ for(judian_name=strtok_r(namebuffer, ";", &save); judian_name!=NULL; judian_name=strtok_r(NULL, ";", &save))
+ {
+ if(0>=MESA_load_profile_string_nodef(config_file, judian_name, "judian_func_ip_list", ipbuffer, sizeof(ipbuffer)))
+ {
+ MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]judian_func_ip_list not found!", config_file, judian_name);
+ assert(0);return -1;
+ }
+ if((ipnum = unfold_IP_range(ipbuffer, &node_iplist)) == 0)
+ {
+ MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]judian_func_ip_list is empty!", config_file, judian_name);
+ continue;
+ }
+ group = init_function_group_nodes(judian_name, node_iplist, ipnum, info);
+ free(node_iplist);
+
+ if(0>=MESA_load_profile_string_nodef(config_file, judian_name, "judian_forward_ip_list", ipbuffer, sizeof(ipbuffer)))
+ {
+ MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]judian_forward_ip_list not found!", config_file, judian_name);
+ assert(0);return -1;
+ }
+ if((ipnum=unfold_IP_range(ipbuffer, &node_iplist)) == 0)
+ {
+ MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]judian_forward_ip_list is empty!", config_file, judian_name);
+ continue;
+ }
+ init_forward_group_nodes(group, judian_name, node_iplist, ipnum, info);
+ free(node_iplist);
+ }
+ return 0;
+}
+
+int WYSF_STREAM_PKT_INIT(void)
+{
+ char root_log_name[256];
+ pthread_t thread_desc;
+ pthread_attr_t attr;
+ int log_level;
+ SAPP_TLV_T tlv_value;
+
+ memset(&g_wysf_global_info, 0, sizeof(struct wysf_global_info));
+ g_wysf_global_info.group_judian = new std::map<std::string, struct judian_as_group*>;
+ g_wysf_global_info.func_keepalive = new std::map<int32_t, struct function_node*>;
+ g_wysf_global_info.forwardip2judian = new std::map<int32_t, struct judian_as_group*>;
+
+ MESA_load_profile_string_def(WYSF_CONFIG_FILE, "MODULE", "RUN_LOG_NAME", root_log_name, sizeof(root_log_name), "./log/wysf_runtime.log");
+ MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "RUN_LOG_LV", &log_level, 10);
+ g_wysf_global_info.log_runtime = MESA_create_runtime_log_handle(root_log_name, log_level);
+ if(NULL == g_wysf_global_info.log_runtime)
+ {
+ printf("MESA_create_runtime_log_handle %s failed: %s\n", root_log_name, strerror(errno));
+ return -1;
+ }
+ if(load_config_judian_function_iplist(WYSF_CONFIG_FILE, &g_wysf_global_info))
+ {
+ return -2;
+ }
+
+ MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "broadcast_receive_port", &g_wysf_global_info.broadcast_udp_server_port, 6789);
+
+ MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "bfd_dest_port", &g_wysf_global_info.bfd_dest_port, 8193);
+ MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "bfd_timeout_ms", &g_wysf_global_info.bfd_timeout_ms, 20);
+ MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "bfd_timeout_times", &g_wysf_global_info.bfd_timeout_times, 3);
+ MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "htable_flow_slots", &g_wysf_global_info.flowhtable_slots, 16777216);
+ MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "htable_flow_expires", &g_wysf_global_info.flowhtable_expires, 120);
+ MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "htable_locks_num", &g_wysf_global_info.htable_locknum, 67);
+
+ MESA_load_profile_string_def(WYSF_CONFIG_FILE, "MODULE", "FSSTAT_LOG_APPNAME", g_wysf_global_info.fsstat_appname, 16, "WYSF");
+ MESA_load_profile_string_def(WYSF_CONFIG_FILE, "MODULE", "FSSTAT_LOG_FILEPATH", g_wysf_global_info.fsstat_filepath, 256, "./log/wysf.fs");
+ MESA_load_profile_uint_def(WYSF_CONFIG_FILE, "MODULE", "FSSTAT_LOG_INTERVAL", &g_wysf_global_info.fsstat_period, 5);
+ MESA_load_profile_uint_def(WYSF_CONFIG_FILE, "MODULE", "FSSTAT_LOG_PRINT_MODE", &g_wysf_global_info.fsstat_print_mode, 1);
+ MESA_load_profile_string_def(WYSF_CONFIG_FILE, "MODULE", "FSSTAT_LOG_DST_IP", g_wysf_global_info.fsstat_dst_ip, 64, "127.0.0.1");
+ MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "FSSTAT_LOG_DST_PORT", &g_wysf_global_info.fsstat_dst_port, 8125);
+ wysf_register_field_stat(&g_wysf_global_info);
+
+ g_wysf_global_info.stream_tuple_mapping = init_and_create_htable(g_wysf_global_info.flowhtable_slots,
+ g_wysf_global_info.flowhtable_expires, g_wysf_global_info.htable_locknum, NULL, NULL);
+
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+ if(pthread_create(&thread_desc, &attr, thread_funcnode_keepalive, NULL))
+ {
+ MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_FATAL, "pthread_create(): %s", strerror(errno));
+ assert(0);return -5;
+ }
+ if(pthread_create(&thread_desc, &attr, thread_fwdnode_breoadcast, NULL))
+ {
+ MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_FATAL, "pthread_create(): %s", strerror(errno));
+ assert(0);return -5;
+ }
+
+ memset(&tlv_value, 0, sizeof(SAPP_TLV_T));
+ tlv_value.type = GDEV_KEEPALIVE_OPT_GLOBAL_SWITCH;
+ tlv_value.int_value = 0;
+ tlv_value.length = sizeof(int);
+ gdev_keepalive_set_opt(&tlv_value);
+ return 0;
+}
+
+void WYSF_STREAM_PKT_DESTROY(void)
+{
+}
+