summaryrefslogtreecommitdiff
path: root/src/wy_singleflow_keepalive.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/wy_singleflow_keepalive.cpp')
-rw-r--r--src/wy_singleflow_keepalive.cpp237
1 files changed, 237 insertions, 0 deletions
diff --git a/src/wy_singleflow_keepalive.cpp b/src/wy_singleflow_keepalive.cpp
new file mode 100644
index 0000000..2c48705
--- /dev/null
+++ b/src/wy_singleflow_keepalive.cpp
@@ -0,0 +1,237 @@
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <errno.h>
+#include <pthread.h>
+#include <string.h>
+#include <sys/ioctl.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <arpa/inet.h>
+#include <sys/prctl.h>
+
+#include "wy_singleflow_entry.h"
+#include "wy_singleflow_keepalive.h"
+
+#define DEFAULT_HOST_CAPACITY 4
+#define LOAD_BALANC_VIRT_TIMES 16
+
+extern struct wysf_global_info g_wysf_global_info;
+
+
+static void conhash_delay_destroy_timer_cb(int fd, short kind, void *userp)
+{
+ struct time_event *delay_event=(struct time_event *)userp;
+
+ conhash_instance_free(delay_event->conhash);
+ free(delay_event);
+}
+
+static void load_balance_common_timer_start(struct event *time_event)
+{
+ struct timeval tv;
+
+ tv.tv_sec = 2;
+ tv.tv_usec = 0;
+ evtimer_add(time_event, &tv);
+}
+
+static void conhash_handle_delay_destroy(struct event_base *evbase, struct consistent_hash *conhash)
+{
+ struct time_event *delay_event;
+
+ delay_event = (struct time_event *)malloc(sizeof(struct time_event));
+ delay_event->conhash = conhash;
+ evtimer_assign(&delay_event->timer_event, evbase, conhash_delay_destroy_timer_cb, delay_event);
+ load_balance_common_timer_start(&delay_event->timer_event);
+}
+
+static void conhash_insert_dest_host(struct judian_as_group *func_group, int32_t bucketid)
+{
+ struct conhash_bucket bucket;
+ struct consistent_hash *tmphash, *newhash=NULL;
+ enum CONHASH_ERRCODE code;
+
+ bucket.bucket_id = bucketid;
+ bucket.point_num = DEFAULT_HOST_CAPACITY * LOAD_BALANC_VIRT_TIMES;;
+ bucket.tag = NULL;
+
+ newhash = conhash_instance_copy(func_group->conhash);
+ code = conhash_insert_bucket(newhash, &bucket);
+ assert(code == CONHASH_OK);
+
+ tmphash = func_group->conhash;
+ func_group->conhash = newhash;
+ conhash_handle_delay_destroy(g_wysf_global_info.alive_evbase , tmphash);
+}
+
+static void conhash_remove_dest_host(struct judian_as_group *func_group, int32_t bucketid)
+{
+ struct consistent_hash *tmphash, *newhash=NULL;
+ enum CONHASH_ERRCODE code;
+
+ newhash = conhash_instance_copy(func_group->conhash);
+ code = conhash_remove_bucket(newhash, bucketid, NULL);
+ assert(code == CONHASH_OK || code==CONHASH_BUCKET_NOT_FOUND);
+
+ tmphash = func_group->conhash;
+ func_group->conhash = newhash;
+ conhash_handle_delay_destroy(g_wysf_global_info.alive_evbase, tmphash);
+}
+
+static int udp_keepalive_client_socket_init(void)
+{
+ int on = 1, socket_fd;
+
+ if((socket_fd = socket(AF_INET,SOCK_DGRAM,0))<0)
+ {
+ return -1;
+ }
+ on = 5242880;
+ setsockopt(socket_fd, SOL_SOCKET, SO_SNDBUF, &on, sizeof(on));
+ evutil_make_socket_nonblocking(socket_fd);
+ return socket_fd;
+}
+
+void func_client_read_bfdres_cb(evutil_socket_t fd, short events, void *arg)
+{
+ struct function_node *nodes=(struct function_node *)arg;
+ struct sockaddr_in saddr;
+ char buffer[2048];
+ int from_len, buf_len;
+ SAPP_TLV_T tlv_value;
+
+ from_len = sizeof (struct sockaddr);
+ while((buf_len = recvfrom (fd, buffer, 65536, 0, (struct sockaddr*)&saddr, (socklen_t *)&from_len))>0)
+ {
+ //BFD packet validate
+ nodes->retry_times = 0;
+ if(!nodes->conhash_inserted)
+ {
+ conhash_insert_dest_host(nodes->parent, nodes->ip_as_bucketid);
+ nodes->conhash_inserted = 1;
+ if(g_wysf_global_info.alive_func_nodes == 0)
+ {
+ memset(&tlv_value, 0, sizeof(SAPP_TLV_T));
+ tlv_value.type = GDEV_KEEPALIVE_OPT_GLOBAL_SWITCH;
+ tlv_value.int_value = 1;
+ tlv_value.length = sizeof(int);
+ gdev_keepalive_set_opt(&tlv_value);
+ }
+ g_wysf_global_info.alive_func_nodes += 1;
+ MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_DEBUG, "keepalive ip %s inserted", nodes->dst_ip_str);
+ }
+ from_len = sizeof (struct sockaddr);
+ }
+}
+
+void func_client_send_bfd_packet(struct function_node *nodes, const char *data, int datalen, struct sockaddr *dstaddr)
+{
+ nodes->retry_times++;
+
+ if(datalen != sendto(nodes->udp_sockfd, data, datalen, 0, (struct sockaddr*)&nodes->sinaddr, sizeof(struct sockaddr)))
+ {
+ MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_DEBUG, "sendto ip %s failed: %s", nodes->dst_ip_str, strerror(errno));
+ }
+}
+
+static void func_client_alive_timer_cb(int fd, short kind, void *userp)
+{
+ struct function_node *nodes=(struct function_node *)userp;
+ struct timeval tv;
+ SAPP_TLV_T tlv_value;
+
+ func_client_send_bfd_packet(nodes, NULL, 0, (struct sockaddr *)&nodes->sinaddr); //todo
+
+ if(nodes->conhash_inserted && nodes->retry_times >= g_wysf_global_info.bfd_timeout_times)
+ {
+ if(--g_wysf_global_info.alive_func_nodes == 0)
+ {
+ memset(&tlv_value, 0, sizeof(SAPP_TLV_T));
+ tlv_value.type = GDEV_KEEPALIVE_OPT_GLOBAL_SWITCH;
+ tlv_value.int_value = 0;
+ tlv_value.length = sizeof(int);
+ gdev_keepalive_set_opt(&tlv_value);
+ }
+ conhash_remove_dest_host(nodes->parent, nodes->ip_as_bucketid);
+ nodes->conhash_inserted = 0;
+ MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_DEBUG, "keepalive ip %s removed", nodes->dst_ip_str);
+ }
+
+ tv.tv_sec = 0;
+ tv.tv_usec = g_wysf_global_info.bfd_timeout_ms * 1000;
+ evtimer_add(&nodes->alive_detect_timer, &tv);
+}
+
+static void finger_fs_output_timer_cb(int fd, short kind, void *userp)
+{
+ struct timeval tv;
+ u_int32_t bucket_num=0;
+ SAPP_TLV_T tlv_value;
+
+ for(int32_t i=0; i<g_wysf_global_info.func_group_num; i++)
+ {
+ bucket_num += conhash_get_bucket_num(g_wysf_global_info.judian_group_list[i]->conhash);
+ }
+ FS_operate(g_wysf_global_info.fsstat_handle, g_wysf_global_info.fsstat_status_ids[WYSF_FSSTAT_STATUS_ACTIVE_FUNC], 0, FS_OP_SET, bucket_num);
+
+ bucket_num = MESA_htable_get_elem_num(g_wysf_global_info.stream_tuple_mapping);
+ FS_operate(g_wysf_global_info.fsstat_handle, g_wysf_global_info.fsstat_status_ids[WYSF_FSSTAT_STATUS_FLOW_TABLE], 0, FS_OP_SET, bucket_num);
+
+ memset(&tlv_value, 0, sizeof(SAPP_TLV_T));
+ tlv_value.type = GDEV_KEEPALIVE_OPT_GLOBAL_SWITCH;
+ tlv_value.length = sizeof(int);
+ gdev_keepalive_get_opt(&tlv_value);
+ FS_operate(g_wysf_global_info.fsstat_handle, g_wysf_global_info.fsstat_status_ids[WYSF_FSSTAT_STATUS_KEEPALIVE], 0, FS_OP_SET, tlv_value.int_value);
+
+ FS_passive_output(g_wysf_global_info.fsstat_handle);
+ tv.tv_sec = g_wysf_global_info.fsstat_period;
+ tv.tv_usec = 0;
+ evtimer_add(&g_wysf_global_info.fs_timer_output, &tv);
+}
+
+void *thread_funcnode_keepalive(void *arg)
+{
+ struct event_base *evbase;
+ struct judian_as_group *func_group;
+ struct timeval tv;
+
+ prctl(PR_SET_NAME, "wysf_alive");
+
+ g_wysf_global_info.alive_evbase = evbase = event_base_new();
+
+ for(int i=0; i<g_wysf_global_info.func_group_num; i++)
+ {
+ func_group = g_wysf_global_info.judian_group_list[i];
+ for(int j=0; j<func_group->func_nodes_num; j++)
+ {
+ func_group->func_nodes[i].udp_sockfd = udp_keepalive_client_socket_init();
+ event_assign(&func_group->func_nodes[i].msgevent, evbase, func_group->func_nodes[i].udp_sockfd, EV_READ|EV_PERSIST,
+ func_client_read_bfdres_cb, &func_group->func_nodes[i]);
+ event_add(&func_group->func_nodes[i].msgevent, NULL);
+
+ func_client_send_bfd_packet(&func_group->func_nodes[i], NULL, 0, (struct sockaddr *)&func_group->func_nodes[i].sinaddr); //todo
+
+ tv.tv_sec = 0;
+ tv.tv_usec = g_wysf_global_info.bfd_timeout_ms * 1000;
+ evtimer_assign(&func_group->func_nodes[i].alive_detect_timer, evbase, func_client_alive_timer_cb, &func_group->func_nodes[i]);
+ evtimer_add(&func_group->func_nodes[i].alive_detect_timer, &tv);
+ }
+ }
+
+ evtimer_assign(&g_wysf_global_info.fs_timer_output, evbase, finger_fs_output_timer_cb, NULL);
+ tv.tv_sec = g_wysf_global_info.fsstat_period;
+ tv.tv_usec = 0;
+ evtimer_add(&g_wysf_global_info.fs_timer_output, &tv);
+
+ MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_FATAL, "keepalive thread started.");
+ event_base_dispatch(evbase);
+ printf("Libevent dispath error, should not run here.\n");
+ MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_FATAL, "Libevent dispath error, should not run here.");
+ assert(0);return NULL;
+}
+