diff options
| author | [email protected] <[email protected]> | 2021-11-02 12:34:05 +0800 |
|---|---|---|
| committer | [email protected] <[email protected]> | 2021-11-02 12:34:05 +0800 |
| commit | 31f55f0b88d4af34a8a36497f5e49c69b88b2fbf (patch) | |
| tree | 63515b3ceb361369cdc88ae6db1a808fc80e5b42 /src/wy_singleflow_broadcast.cpp | |
Diffstat (limited to 'src/wy_singleflow_broadcast.cpp')
| -rw-r--r-- | src/wy_singleflow_broadcast.cpp | 126 |
1 files changed, 126 insertions, 0 deletions
diff --git a/src/wy_singleflow_broadcast.cpp b/src/wy_singleflow_broadcast.cpp new file mode 100644 index 0000000..3cc86c4 --- /dev/null +++ b/src/wy_singleflow_broadcast.cpp @@ -0,0 +1,126 @@ +#include <sys/stat.h> +#include <sys/types.h> +#include <unistd.h> +#include <stdio.h> +#include <stdlib.h> +#include <assert.h> +#include <errno.h> +#include <pthread.h> +#include <string.h> +#include <sys/ioctl.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <arpa/inet.h> +#include <sys/prctl.h> + +#include "wy_singleflow_entry.h" + + +extern struct wysf_global_info g_wysf_global_info; + +static int udp_broadcast_server_socket_init(void *log_runtime, u_int16_t port) +{ + struct sockaddr_in servaddr; + int on = 1, socket_fd; + + if((socket_fd = socket(AF_INET,SOCK_DGRAM,0))<0)//AF_INET:IPV4 AF_INET6:IPV6 + { + return -1; + } + if(setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))<0) + { + MESA_RUNTIME_LOGV3(log_runtime, RLOG_LV_FATAL, "setsockopt for udp_port %u failed, because:%s!!", port, strerror(errno)); + } + on = 5242880; + if(setsockopt(socket_fd, SOL_SOCKET, SO_RCVBUF, &on, sizeof(on))<0) + { + MESA_RUNTIME_LOGV3(log_runtime, RLOG_LV_FATAL, "setsockopt for udp_port %u failed, because:%s!!", port, strerror(errno)); + } + + bzero(&servaddr,sizeof(servaddr)); + servaddr.sin_family = AF_INET; + servaddr.sin_addr.s_addr = htonl(INADDR_ANY); + servaddr.sin_port = htons(port); + + if(bind(socket_fd,(struct sockaddr *)&servaddr,sizeof(servaddr))<0) + { + MESA_RUNTIME_LOGV3(log_runtime, RLOG_LV_FATAL, "bind udp socket port %u failed, because: %s!!", port, strerror(errno)); + close(socket_fd); + return -2; + } + evutil_make_socket_nonblocking(socket_fd); + return socket_fd; +} + +static long broadcast_stream_tuple_htable_cb(void *data, const uchar *key, uint size, void *user_arg) +{ + int32_t clj_ip=*(int32_t *)user_arg; + std::map<int32_t, struct judian_as_group*>::iterator iter; + struct judian_as_group *group; + + if((iter = g_wysf_global_info.forwardip2judian->find(clj_ip)) == g_wysf_global_info.forwardip2judian->end()) + { + char ipbuffer[32]; + inet_ntop(AF_INET, &clj_ip, ipbuffer, 32); + MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_FATAL, "lookup group for clj_ip: %s not found!!", ipbuffer); + return -1; + } + group = iter->second; + + if(data != NULL) + { + MESA_htable_del(g_wysf_global_info.stream_tuple_mapping, key, size, NULL); + } + MESA_htable_add(g_wysf_global_info.stream_tuple_mapping, key, size, group); + return 0; +} + +void udp_server_read_broadcast_cb(evutil_socket_t fd, short events, void *arg) +{ + struct udp_broadcast_msg *broadmsg; + char buffer[4096]; + int from_len = 0, buf_len, clj_ip; + struct sockaddr_in saddr; + long cb_ret; + + from_len = sizeof (struct sockaddr); + while((buf_len = recvfrom (fd, buffer, 4096, 0, (struct sockaddr*)&saddr, (socklen_t *)&from_len))>0) + { + broadmsg = (struct udp_broadcast_msg *)buffer; + if((u_int32_t)buf_len!=sizeof(struct udp_broadcast_msg) || broadmsg->header.magic_number != WYSF_MAGIC_NUMBER || + broadmsg->header.total_len!=sizeof(struct broadcast_message)) + { + continue; + } + + clj_ip = broadmsg->msg.clj_ip; + broadmsg->msg.clj_ip = 0; + MESA_htable_search_cb(g_wysf_global_info.stream_tuple_mapping, (uchar*)&broadmsg->msg, sizeof(struct broadcast_message), + broadcast_stream_tuple_htable_cb, &clj_ip, &cb_ret); + + from_len = sizeof (struct sockaddr); + } +} + +void *thread_fwdnode_breoadcast(void *arg) +{ + struct event_base *evbase; + struct event udp_server_event; + + evbase = event_base_new(); + + if((g_wysf_global_info.broadcast_udp_servert_sockfd = udp_broadcast_server_socket_init(g_wysf_global_info.log_runtime, g_wysf_global_info.broadcast_udp_server_port)) < 0) + { + assert(0);return NULL; + } + event_assign(&udp_server_event, evbase, g_wysf_global_info.broadcast_udp_servert_sockfd, + EV_READ|EV_PERSIST, udp_server_read_broadcast_cb, NULL); + event_add(&udp_server_event, NULL); + + event_base_dispatch(evbase); + printf("Libevent dispath error, should not run here.\n"); + MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_FATAL, "Libevent dispath error, should not run here."); + assert(0);return NULL; +} + |
