diff options
| author | [email protected] <[email protected]> | 2021-11-02 12:34:05 +0800 |
|---|---|---|
| committer | [email protected] <[email protected]> | 2021-11-02 12:34:05 +0800 |
| commit | 31f55f0b88d4af34a8a36497f5e49c69b88b2fbf (patch) | |
| tree | 63515b3ceb361369cdc88ae6db1a808fc80e5b42 /src/wy_singleflow_keepalive.cpp | |
Diffstat (limited to 'src/wy_singleflow_keepalive.cpp')
| -rw-r--r-- | src/wy_singleflow_keepalive.cpp | 237 |
1 files changed, 237 insertions, 0 deletions
diff --git a/src/wy_singleflow_keepalive.cpp b/src/wy_singleflow_keepalive.cpp new file mode 100644 index 0000000..2c48705 --- /dev/null +++ b/src/wy_singleflow_keepalive.cpp @@ -0,0 +1,237 @@ +#include <sys/stat.h> +#include <sys/types.h> +#include <unistd.h> +#include <stdio.h> +#include <stdlib.h> +#include <assert.h> +#include <errno.h> +#include <pthread.h> +#include <string.h> +#include <sys/ioctl.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <arpa/inet.h> +#include <sys/prctl.h> + +#include "wy_singleflow_entry.h" +#include "wy_singleflow_keepalive.h" + +#define DEFAULT_HOST_CAPACITY 4 +#define LOAD_BALANC_VIRT_TIMES 16 + +extern struct wysf_global_info g_wysf_global_info; + + +static void conhash_delay_destroy_timer_cb(int fd, short kind, void *userp) +{ + struct time_event *delay_event=(struct time_event *)userp; + + conhash_instance_free(delay_event->conhash); + free(delay_event); +} + +static void load_balance_common_timer_start(struct event *time_event) +{ + struct timeval tv; + + tv.tv_sec = 2; + tv.tv_usec = 0; + evtimer_add(time_event, &tv); +} + +static void conhash_handle_delay_destroy(struct event_base *evbase, struct consistent_hash *conhash) +{ + struct time_event *delay_event; + + delay_event = (struct time_event *)malloc(sizeof(struct time_event)); + delay_event->conhash = conhash; + evtimer_assign(&delay_event->timer_event, evbase, conhash_delay_destroy_timer_cb, delay_event); + load_balance_common_timer_start(&delay_event->timer_event); +} + +static void conhash_insert_dest_host(struct judian_as_group *func_group, int32_t bucketid) +{ + struct conhash_bucket bucket; + struct consistent_hash *tmphash, *newhash=NULL; + enum CONHASH_ERRCODE code; + + bucket.bucket_id = bucketid; + bucket.point_num = DEFAULT_HOST_CAPACITY * LOAD_BALANC_VIRT_TIMES;; + bucket.tag = NULL; + + newhash = conhash_instance_copy(func_group->conhash); + code = conhash_insert_bucket(newhash, &bucket); + assert(code == CONHASH_OK); + + tmphash = func_group->conhash; + func_group->conhash = newhash; + conhash_handle_delay_destroy(g_wysf_global_info.alive_evbase , tmphash); +} + +static void conhash_remove_dest_host(struct judian_as_group *func_group, int32_t bucketid) +{ + struct consistent_hash *tmphash, *newhash=NULL; + enum CONHASH_ERRCODE code; + + newhash = conhash_instance_copy(func_group->conhash); + code = conhash_remove_bucket(newhash, bucketid, NULL); + assert(code == CONHASH_OK || code==CONHASH_BUCKET_NOT_FOUND); + + tmphash = func_group->conhash; + func_group->conhash = newhash; + conhash_handle_delay_destroy(g_wysf_global_info.alive_evbase, tmphash); +} + +static int udp_keepalive_client_socket_init(void) +{ + int on = 1, socket_fd; + + if((socket_fd = socket(AF_INET,SOCK_DGRAM,0))<0) + { + return -1; + } + on = 5242880; + setsockopt(socket_fd, SOL_SOCKET, SO_SNDBUF, &on, sizeof(on)); + evutil_make_socket_nonblocking(socket_fd); + return socket_fd; +} + +void func_client_read_bfdres_cb(evutil_socket_t fd, short events, void *arg) +{ + struct function_node *nodes=(struct function_node *)arg; + struct sockaddr_in saddr; + char buffer[2048]; + int from_len, buf_len; + SAPP_TLV_T tlv_value; + + from_len = sizeof (struct sockaddr); + while((buf_len = recvfrom (fd, buffer, 65536, 0, (struct sockaddr*)&saddr, (socklen_t *)&from_len))>0) + { + //BFD packet validate + nodes->retry_times = 0; + if(!nodes->conhash_inserted) + { + conhash_insert_dest_host(nodes->parent, nodes->ip_as_bucketid); + nodes->conhash_inserted = 1; + if(g_wysf_global_info.alive_func_nodes == 0) + { + memset(&tlv_value, 0, sizeof(SAPP_TLV_T)); + tlv_value.type = GDEV_KEEPALIVE_OPT_GLOBAL_SWITCH; + tlv_value.int_value = 1; + tlv_value.length = sizeof(int); + gdev_keepalive_set_opt(&tlv_value); + } + g_wysf_global_info.alive_func_nodes += 1; + MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_DEBUG, "keepalive ip %s inserted", nodes->dst_ip_str); + } + from_len = sizeof (struct sockaddr); + } +} + +void func_client_send_bfd_packet(struct function_node *nodes, const char *data, int datalen, struct sockaddr *dstaddr) +{ + nodes->retry_times++; + + if(datalen != sendto(nodes->udp_sockfd, data, datalen, 0, (struct sockaddr*)&nodes->sinaddr, sizeof(struct sockaddr))) + { + MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_DEBUG, "sendto ip %s failed: %s", nodes->dst_ip_str, strerror(errno)); + } +} + +static void func_client_alive_timer_cb(int fd, short kind, void *userp) +{ + struct function_node *nodes=(struct function_node *)userp; + struct timeval tv; + SAPP_TLV_T tlv_value; + + func_client_send_bfd_packet(nodes, NULL, 0, (struct sockaddr *)&nodes->sinaddr); //todo + + if(nodes->conhash_inserted && nodes->retry_times >= g_wysf_global_info.bfd_timeout_times) + { + if(--g_wysf_global_info.alive_func_nodes == 0) + { + memset(&tlv_value, 0, sizeof(SAPP_TLV_T)); + tlv_value.type = GDEV_KEEPALIVE_OPT_GLOBAL_SWITCH; + tlv_value.int_value = 0; + tlv_value.length = sizeof(int); + gdev_keepalive_set_opt(&tlv_value); + } + conhash_remove_dest_host(nodes->parent, nodes->ip_as_bucketid); + nodes->conhash_inserted = 0; + MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_DEBUG, "keepalive ip %s removed", nodes->dst_ip_str); + } + + tv.tv_sec = 0; + tv.tv_usec = g_wysf_global_info.bfd_timeout_ms * 1000; + evtimer_add(&nodes->alive_detect_timer, &tv); +} + +static void finger_fs_output_timer_cb(int fd, short kind, void *userp) +{ + struct timeval tv; + u_int32_t bucket_num=0; + SAPP_TLV_T tlv_value; + + for(int32_t i=0; i<g_wysf_global_info.func_group_num; i++) + { + bucket_num += conhash_get_bucket_num(g_wysf_global_info.judian_group_list[i]->conhash); + } + FS_operate(g_wysf_global_info.fsstat_handle, g_wysf_global_info.fsstat_status_ids[WYSF_FSSTAT_STATUS_ACTIVE_FUNC], 0, FS_OP_SET, bucket_num); + + bucket_num = MESA_htable_get_elem_num(g_wysf_global_info.stream_tuple_mapping); + FS_operate(g_wysf_global_info.fsstat_handle, g_wysf_global_info.fsstat_status_ids[WYSF_FSSTAT_STATUS_FLOW_TABLE], 0, FS_OP_SET, bucket_num); + + memset(&tlv_value, 0, sizeof(SAPP_TLV_T)); + tlv_value.type = GDEV_KEEPALIVE_OPT_GLOBAL_SWITCH; + tlv_value.length = sizeof(int); + gdev_keepalive_get_opt(&tlv_value); + FS_operate(g_wysf_global_info.fsstat_handle, g_wysf_global_info.fsstat_status_ids[WYSF_FSSTAT_STATUS_KEEPALIVE], 0, FS_OP_SET, tlv_value.int_value); + + FS_passive_output(g_wysf_global_info.fsstat_handle); + tv.tv_sec = g_wysf_global_info.fsstat_period; + tv.tv_usec = 0; + evtimer_add(&g_wysf_global_info.fs_timer_output, &tv); +} + +void *thread_funcnode_keepalive(void *arg) +{ + struct event_base *evbase; + struct judian_as_group *func_group; + struct timeval tv; + + prctl(PR_SET_NAME, "wysf_alive"); + + g_wysf_global_info.alive_evbase = evbase = event_base_new(); + + for(int i=0; i<g_wysf_global_info.func_group_num; i++) + { + func_group = g_wysf_global_info.judian_group_list[i]; + for(int j=0; j<func_group->func_nodes_num; j++) + { + func_group->func_nodes[i].udp_sockfd = udp_keepalive_client_socket_init(); + event_assign(&func_group->func_nodes[i].msgevent, evbase, func_group->func_nodes[i].udp_sockfd, EV_READ|EV_PERSIST, + func_client_read_bfdres_cb, &func_group->func_nodes[i]); + event_add(&func_group->func_nodes[i].msgevent, NULL); + + func_client_send_bfd_packet(&func_group->func_nodes[i], NULL, 0, (struct sockaddr *)&func_group->func_nodes[i].sinaddr); //todo + + tv.tv_sec = 0; + tv.tv_usec = g_wysf_global_info.bfd_timeout_ms * 1000; + evtimer_assign(&func_group->func_nodes[i].alive_detect_timer, evbase, func_client_alive_timer_cb, &func_group->func_nodes[i]); + evtimer_add(&func_group->func_nodes[i].alive_detect_timer, &tv); + } + } + + evtimer_assign(&g_wysf_global_info.fs_timer_output, evbase, finger_fs_output_timer_cb, NULL); + tv.tv_sec = g_wysf_global_info.fsstat_period; + tv.tv_usec = 0; + evtimer_add(&g_wysf_global_info.fs_timer_output, &tv); + + MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_FATAL, "keepalive thread started."); + event_base_dispatch(evbase); + printf("Libevent dispath error, should not run here.\n"); + MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_FATAL, "Libevent dispath error, should not run here."); + assert(0);return NULL; +} + |
