summaryrefslogtreecommitdiff
path: root/src/recv.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/recv.c')
-rw-r--r--src/recv.c176
1 files changed, 176 insertions, 0 deletions
diff --git a/src/recv.c b/src/recv.c
new file mode 100644
index 0000000..8a171c4
--- /dev/null
+++ b/src/recv.c
@@ -0,0 +1,176 @@
+
+#include "MESA_list_queue.h"
+#include "MESA_htable.h"
+#include "MESA_handle_logger.h"
+#include "my_socket.h"
+#include "recv.h"
+#include "log.h"
+#include "main.h"
+#include "digest_detection.h"
+
+extern dd_parameter_t g_dd_run;
+extern dd_configure_t g_dd_cfg;
+extern dd_status_t g_dd_stat;
+
+void free_recv_data(recv_data_t* recv_data)
+{
+ if(NULL!=recv_data)
+ {
+ if(NULL!=recv_data->data)
+ {
+ free(recv_data->data);
+ recv_data->data = NULL;
+ }
+ free(recv_data);
+ }
+}
+
+void proc_data(uint32_t thread_id, uint32_t src_ip, char* packet, int size)
+{
+ msg_head_t* mhead = (msg_head_t*)packet;
+ msg_meta_t* minfo = (msg_meta_t*)(packet + sizeof(msg_head_t));
+ av_data_t* mdata = (av_data_t*)(packet + sizeof(msg_head_t));
+ char* data = (char*)mdata + sizeof(av_data_t);
+ char* opt_ptr = NULL;
+ int buf_len = 32;
+ char pbuf[32] = {0};
+ char qbuf[32] = {0};
+
+ /* Ignore invalid packet */
+ if(AV_MAGIC_VALUE != mhead->magic || size <= (int)sizeof(msg_head_t) || size>1600)
+ {
+ atomic_inc(&g_dd_stat.stat_info[INVALID_RECV][TOTAL_PKTS]);
+ atomic_add(&g_dd_stat.stat_info[INVALID_RECV][TOTAL_BYTES],size);
+ return;
+ }
+ /*prco msg*/
+ switch(mhead->m_type)
+ {
+ case AV_TYPE_META:
+ /*write log*/
+ if(RLOG_LV_INFO>=g_dd_run.log_level)
+ {
+ inet_ntop(AF_INET, &src_ip, pbuf, buf_len);
+ inet_ntop(AF_INET, &minfo->capip, qbuf, buf_len);
+ MESA_handle_runtime_log(g_dd_run.logger, RLOG_LV_INFO, DD_MODULE_NAME,
+ "{%s:%d} %s:%u RECV META [PID:%llu, media_len:%llu, media_type:0x%02x]",
+ __FILE__,__LINE__, pbuf, g_dd_cfg.recv_port, *(uint64_t*)minfo->pid, minfo->proglen, minfo->mediatype);
+ }
+ /*avoid uncomplete chunk*/
+ if(mhead->c_len-sizeof(msg_head_t)>2000) return;
+ opt_ptr = packet + sizeof(msg_head_t) + sizeof(msg_meta_t);
+ add_media_info(minfo, opt_ptr, src_ip, thread_id);
+ atomic_inc(&g_dd_stat.stat_info[META_RECV][TOTAL_PKTS]);
+ atomic_add(&g_dd_stat.stat_info[META_RECV][TOTAL_BYTES],size);
+ break;
+
+ case AV_TYPE_DATA:
+ /*write log*/
+ if(RLOG_LV_DEBUG>=g_dd_run.log_level)
+ {
+ inet_ntop(AF_INET, &src_ip, pbuf, buf_len);
+ {
+ MESA_handle_runtime_log(g_dd_run.logger, RLOG_LV_DEBUG, DD_MODULE_NAME,
+ "{%s:%d} %s:%u RECV DATA [PID:%llu, offset:%llu, datalen:%u]",
+ __FILE__,__LINE__, pbuf, g_dd_cfg.recv_port, *(uint64_t*)mdata->pid, mdata->offset, mhead->c_len-sizeof(av_data_t));
+ }
+ }
+ if(mhead->c_len-sizeof(av_data_t)>2000) return;
+ if(mdata->offset>1000000000000) return;
+ add_frag(*(uint64_t*)mdata->pid, mdata->offset,data, mhead->c_len-sizeof(av_data_t), 0, src_ip, thread_id);
+ atomic_inc(&g_dd_stat.stat_info[DATA_RECV][TOTAL_PKTS]);
+ atomic_add(&g_dd_stat.stat_info[DATA_RECV][TOTAL_BYTES],size);
+ break;
+
+ default:
+ atomic_inc(&g_dd_stat.stat_info[OTHER_RECV][TOTAL_PKTS]);
+ atomic_add(&g_dd_stat.stat_info[OTHER_RECV][TOTAL_BYTES],size);
+
+ return;
+ }
+}
+
+void* recv_data_from_queue(void *param)
+{
+ long tid = (long)param;
+ recv_data_t* recv_data = NULL;
+ long recv_data_size = sizeof(recv_data);
+
+ while(1)
+ {
+ recv_data = NULL;
+ int rec = MESA_lqueue_get_head(g_dd_run.recv_lq[tid], &recv_data, &recv_data_size);
+ if (rec<0)
+ {
+ usleep(10);
+ continue;
+ }
+ else
+ {
+ if(NULL!=recv_data && NULL!=recv_data->data)
+ {
+ /*get from queue stat */
+ atomic_inc(&g_dd_stat.sysinfo_stat[RECV_QUEUE][QUEUE_OUT]);
+ proc_data(tid, recv_data->src_ip, recv_data->data, recv_data->size);
+ }
+ free_recv_data(recv_data);
+ }
+ }
+ return NULL;
+}
+
+void* udp_recv_data(void *param)
+{
+ char buf[BUF_SIZE] = {0};
+ int size = 0;
+ uint32_t src_ip = 0;
+ recv_data_t* recv_data = NULL;
+ long tid = 0;
+ int lq_rec = 0;
+ fd_set rset;
+
+ while(1)
+ {
+ FD_ZERO(&rset);
+ FD_SET(g_dd_run.recv_data_sd,&rset);
+ if(-1==select(g_dd_run.recv_data_sd+1,&rset,NULL,NULL,NULL))
+ {
+ continue;
+ }
+ size = recv_udp_socket_recv(g_dd_run.recv_data_sd, &src_ip, (unsigned char*)buf, sizeof(buf));
+ if(size>0)
+ {
+ if(g_dd_stat.sysinfo_stat[RECV_QUEUE][QUEUE_CURRENT]<g_dd_cfg.recv_queue_maxnum)
+ {
+ recv_data = (recv_data_t*)calloc(1, sizeof(recv_data_t));
+ recv_data->size = size;
+ recv_data->src_ip = src_ip;
+ recv_data->data = (char*)malloc(size);
+ memcpy(recv_data->data, buf, size);
+ /*data[15] = the last byte of PID*/
+ tid = recv_data->data[15]%g_dd_cfg.thread_num;
+ lq_rec = MESA_lqueue_join_tail(g_dd_run.recv_lq[tid], &recv_data, sizeof(recv_data));
+ if(lq_rec==MESA_QUEUE_RET_OK)
+ {
+ /*add to queue stat */
+ atomic_inc(&g_dd_stat.sysinfo_stat[RECV_QUEUE][QUEUE_IN]);
+ }
+ else
+ {
+ free_recv_data(recv_data);
+ atomic_inc(&g_dd_stat.stat_info[RECV_DROP][TOTAL_PKTS]);
+ atomic_add(&g_dd_stat.stat_info[RECV_DROP][TOTAL_BYTES], size);
+ }
+ }
+ else
+ {
+ atomic_inc(&g_dd_stat.stat_info[RECV_DROP][TOTAL_PKTS]);
+ atomic_add(&g_dd_stat.stat_info[RECV_DROP][TOTAL_BYTES], size);
+ }
+ atomic_inc(&g_dd_stat.stat_info[RECV][TOTAL_PKTS]);
+ atomic_add(&g_dd_stat.stat_info[RECV][TOTAL_BYTES], size);
+ }
+ }
+ return NULL;
+}
+