summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/AV_interface.h2
-rw-r--r--src/frag_json.h2
-rw-r--r--src/frag_reassembly_in.h2
-rw-r--r--src/frag_voip.c281
-rw-r--r--src/frag_voip.h5
-rw-r--r--src/main.c57
-rw-r--r--src/message.c4
7 files changed, 327 insertions, 26 deletions
diff --git a/src/AV_interface.h b/src/AV_interface.h
index e65dd9f..67c1869 100644
--- a/src/AV_interface.h
+++ b/src/AV_interface.h
@@ -1,7 +1,7 @@
#ifndef _AV_INTERFACE_H
#define _AV_INTERFACE_H
-#define K_PROJECT 0 //�Ƿ���K��Ŀ����Ҫ�ǽ���Ľӿڲ�һ��
+#define K_PROJECT 1 //�Ƿ���K��Ŀ����Ҫ�ǽ���Ľӿڲ�һ��
#define PROTO_VERSION 3
#define PROTO_MAGICNUM 0x5641
diff --git a/src/frag_json.h b/src/frag_json.h
index 1570bfb..2e905fa 100644
--- a/src/frag_json.h
+++ b/src/frag_json.h
@@ -15,6 +15,8 @@
#define TOPIC_VOIP_EXPIRE_JSON "VOIP_EXPIRE_INFO"
#define TOPIC_VOIP_SURVEY_JSON "VOIP_SURVEY_INFO"
+#define TOPIC_MM_SAMPLE_VOIP_LOG "MM-SAMPLE-VOIP-LOG"
+#define TOPIC_NTC_COLLECT_VOIP_LOG "NTC-COLLECT-VOIP-LOG"
#ifdef __cplusplus
extern "C" {
#endif
diff --git a/src/frag_reassembly_in.h b/src/frag_reassembly_in.h
index 803b78f..c8d3691 100644
--- a/src/frag_reassembly_in.h
+++ b/src/frag_reassembly_in.h
@@ -111,7 +111,7 @@ typedef enum
/*SIPѡ�����*/
#define SIP_OPT_NUM 32
/*SIP��ѯredis���������*/
-#define SIP_REDIS_CMMD_NUM 34
+#define SIP_REDIS_CMMD_NUM 35
/*SIP��ѡ��*/
typedef struct sip_opt_s
{
diff --git a/src/frag_voip.c b/src/frag_voip.c
index 52ad14a..3e2fd81 100644
--- a/src/frag_voip.c
+++ b/src/frag_voip.c
@@ -35,6 +35,7 @@
#include "frag_redis.h"
#include "frag_proc.h"
#include "frag_av.h"
+#include "frag_json.h"
#include "frag_voip.h"
#include "log.h"
#include "field_stat2.h"
@@ -86,6 +87,7 @@ const char* g_sip_cmmd_argv[SIP_REDIS_CMMD_NUM] =
"C_Record_Route",
"C_Route",
"Rescode",
+ "CAPIP",
};
/*opt_type is fulllog opt_type*/
@@ -124,6 +126,8 @@ sip_opt_t g_sip_opt_type[SIP_OPT_NUM] =
{ "C_Record_Route", 0x27}, //28
{ "C_Route", 0x28}, //29
{ "Rescode", 0xFF}, //30
+
+ { "CAPIP", 0x2C}, //31
};
void free_media_sip(media_t* mdi)
@@ -463,7 +467,50 @@ int sip_send_survey_log(media_t* mdi, char* survey, uint32_t survey_len)
OPT_VOIP_RELATION_RTP_LAYER_ADDR_V4,
&log_msg_body->opt_num);
}
-
+
+ /*opt : OPT_VOIP_VOICE_DIR*/
+ memset(opt_buf, 0, sizeof(opt_buf));
+ snprintf(opt_buf, sizeof(opt_buf), "%d", mdi->re_offset);
+ used_len += get_log_opt_unit(opt_buf,
+ strlen(opt_buf),
+ data+used_len,
+ data_len-used_len,
+ OPT_VOIP_VOICE_DIR,
+ &log_msg_body->opt_num);
+ /*opt : OPT_VOIP_CAP_IP*/
+ uint32_t ip = 0;
+ char ip_buf[32] = {0};
+ char cap_ip_buf[128] = {0};
+ for(int i=0;i<QD_MAXNUM;i++)
+ {
+ ip = mdi->qd_info[i].cap_ip;
+ if(!ip)
+ {
+ continue;
+ }
+ inet_ntop(AF_INET, &ip, ip_buf, 32);
+ if(i != 0)
+ {
+ strcat(cap_ip_buf,",");
+
+ }
+ strcat(cap_ip_buf,ip_buf);
+ }
+ char sip_capip[128]={0};
+ if(NULL!=mdi->sip_opt[SIP_CAPIP_OPT_INDEX])
+ {
+ memcpy(sip_capip,mdi->sip_opt[SIP_CAPIP_OPT_INDEX]->opt_value,mdi->sip_opt[SIP_CAPIP_OPT_INDEX]->opt_len);
+ strcat(cap_ip_buf,sip_capip);
+ }
+ used_len += get_log_opt_unit(cap_ip_buf,
+ strlen(cap_ip_buf),
+ data+used_len,
+ data_len-used_len,
+ OPT_VOIP_CAP_IP,
+ &log_msg_body->opt_num);
+
+
+
log_msg_header->cont_len = used_len - sizeof(sip_log_msg_header_t);
assert(used_len<(int)sizeof(data));
@@ -774,6 +821,38 @@ int sip_send_full_log(media_t* mdi)
data_len-used_len,
OPT_VOIP_VOICE_DIR_FULLLOG,
&log_msg_body->opt_num);
+ /*opt : OPT_VOIP_CAP_IP*/
+ uint32_t ip = 0;
+ char ip_buf[32] = {0};
+ char cap_ip_buf[128] = {0};
+ for(int i=0;i<QD_MAXNUM;i++)
+ {
+ ip = mdi->qd_info[i].cap_ip;
+ if(!ip)
+ {
+ continue;
+ }
+ inet_ntop(AF_INET, &ip, ip_buf, 32);
+ if(i != 0)
+ {
+ strcat(cap_ip_buf,",");
+
+ }
+ strcat(cap_ip_buf,ip_buf);
+ }
+ char sip_capip[128]={0};
+ if(NULL!=mdi->sip_opt[SIP_CAPIP_OPT_INDEX])
+ {
+ memcpy(sip_capip,mdi->sip_opt[SIP_CAPIP_OPT_INDEX]->opt_value,mdi->sip_opt[SIP_CAPIP_OPT_INDEX]->opt_len);
+ strcat(cap_ip_buf,sip_capip);
+ }
+ used_len += get_log_opt_unit(cap_ip_buf,
+ strlen(cap_ip_buf),
+ data+used_len,
+ data_len-used_len,
+ OPT_VOIP_CAP_IP_FULLLOG,
+ &log_msg_body->opt_num);
+
log_msg_header->cont_len = used_len - sizeof(sip_log_msg_header_t);
assert(used_len<(int)sizeof(data));
@@ -811,9 +890,209 @@ void set_frag_unit_from_media(media_t* mdi, frag_unit_t* frg_unit)
frg_unit->sip_data_dir = mdi->sip_opt[SIP_RTP_4TUPLE_OPT_INDEX];
}
+void send_voip_full_json_log(media_t* mdi)
+{
+ if(NULL == mdi)
+ return;
+
+ cJSON* root = NULL;
+
+ string topic_name = TOPIC_NTC_COLLECT_VOIP_LOG;
+ char pid_buf[64] = {0};
+ char pbuf[32] = {0};
+
+ char* outbuf = NULL;
+ int len = 0;
+
+ root = cJSON_CreateObject();
+
+ snprintf(pid_buf, sizeof(pid_buf), "%llu", mdi->mid);
+ cJSON_AddStringToObject(root, "pid", pid_buf);
+ cJSON_AddNumberToObject(root, "found_time", mdi->create_time);
+ cJSON_AddNumberToObject(root, "recv_time", time(NULL));
+ if(mdi->re_offset)
+ {
+ cJSON_AddStringToObject(root, "voip_protocol", SIP_PROTO_SIP);
+ }
+ else
+ {
+ cJSON_AddStringToObject(root, "voip_protocol", SIP_PROTO_RTP);
+ }
+
+ memset(pbuf, 0, sizeof(pbuf));
+ inet_ntop(AF_INET, &g_frag_cfg.local_ip_nr, pbuf, sizeof(pbuf));
+ cJSON_AddStringToObject(root, "cap_ip", pbuf);
+
+ memset(pbuf, 0, sizeof(pbuf));
+ snprintf(pbuf, sizeof(pbuf), "%" PRIu64 "", mdi->lastpkt_time - mdi->create_time);
+ cJSON_AddStringToObject(root, "duation", pbuf);
+
+ char src_ip[32] = {0};
+ char src_port[8] = {0};
+ char dst_ip[32] = {0};
+ char dst_port[8] = {0};
+ char ip_4tuple[128] = {0};
+
+ if((NULL != mdi->sip_opt[SIP_RTP_4TUPLE_OPT_INDEX])&&(mdi->sip_opt[SIP_RTP_4TUPLE_OPT_INDEX]->opt_len > 0)&&(NULL != mdi->sip_opt[SIP_RTP_4TUPLE_OPT_INDEX]->opt_value))
+ {
+ memcpy(ip_4tuple, mdi->sip_opt[SIP_RTP_4TUPLE_OPT_INDEX]->opt_value, MIN(mdi->sip_opt[SIP_RTP_4TUPLE_OPT_INDEX]->opt_len, 127));
+ memset(src_ip, 0, sizeof(src_ip));
+ memset(src_port, 0, sizeof(src_port));
+ memset(dst_ip, 0, sizeof(dst_ip));
+ memset(dst_port, 0, sizeof(dst_port));
+ if(0==parse_sip_4tuple(ip_4tuple,src_ip,src_port,dst_ip,dst_port))
+ {
+ cJSON_AddStringToObject(root, "rtp_s_ip", src_ip);
+ cJSON_AddStringToObject(root, "rtp_d_ip", dst_ip);
+ cJSON_AddNumberToObject(root, "rtp_s_port", atoi(src_port));
+ cJSON_AddNumberToObject(root, "rtp_d_port", atoi(dst_port));
+ }
+ }
+
+ if((NULL != mdi->sip_opt[SIP_SIP_4TUPLE_OPT_INDEX])&&(mdi->sip_opt[SIP_SIP_4TUPLE_OPT_INDEX]->opt_len > 0)&&(NULL != mdi->sip_opt[SIP_SIP_4TUPLE_OPT_INDEX]->opt_value))
+ {
+ memcpy(ip_4tuple, mdi->sip_opt[SIP_SIP_4TUPLE_OPT_INDEX]->opt_value, MIN(mdi->sip_opt[SIP_SIP_4TUPLE_OPT_INDEX]->opt_len, 127));
+ memset(src_ip, 0, sizeof(src_ip));
+ memset(src_port, 0, sizeof(src_port));
+ memset(dst_ip, 0, sizeof(dst_ip));
+ memset(dst_port, 0, sizeof(dst_port));
+ if(0==parse_sip_4tuple(ip_4tuple,src_ip,src_port,dst_ip,dst_port))
+ {
+ cJSON_AddStringToObject(root, "sip_s_ip", src_ip);
+ cJSON_AddStringToObject(root, "sip_d_ip", dst_ip);
+ cJSON_AddNumberToObject(root, "sip_s_port", atoi(src_port));
+ cJSON_AddNumberToObject(root, "sip_d_port", atoi(dst_port));
+ }
+ }
+
+ if((NULL != mdi->sip_opt[SIP_FROM_OPT_INDEX])&&(mdi->sip_opt[SIP_FROM_OPT_INDEX]->opt_len > 0)&&(NULL != mdi->sip_opt[SIP_FROM_OPT_INDEX]->opt_value))
+ {
+ cJSON_AddStringToObject(root, "calling_account", mdi->sip_opt[SIP_FROM_OPT_INDEX]->opt_value);
+ }
+
+ if((NULL != mdi->sip_opt[SIP_TO_OPT_INDEX])&&(mdi->sip_opt[SIP_TO_OPT_INDEX]->opt_len > 0)&&(NULL != mdi->sip_opt[SIP_TO_OPT_INDEX]->opt_value))
+ {
+ cJSON_AddStringToObject(root, "called_account", mdi->sip_opt[SIP_TO_OPT_INDEX]->opt_value);
+ }
+
+ if((NULL != mdi->sip_opt[SIP_CALL_ID_OPT_INDEX])&&(mdi->sip_opt[SIP_CALL_ID_OPT_INDEX]->opt_len > 0)&&(NULL != mdi->sip_opt[SIP_CALL_ID_OPT_INDEX]->opt_value))
+ {
+ cJSON_AddStringToObject(root, "call_id", mdi->sip_opt[SIP_CALL_ID_OPT_INDEX]->opt_value);
+ }
+
+ if((NULL != mdi->sip_opt[SIP_URI_OPT_INDEX])&&(mdi->sip_opt[SIP_URI_OPT_INDEX]->opt_len > 0)&&(NULL != mdi->sip_opt[SIP_URI_OPT_INDEX]->opt_value))
+ {
+ cJSON_AddStringToObject(root, "request_uri", mdi->sip_opt[SIP_URI_OPT_INDEX]->opt_value);
+ }
+
+ if(NULL!=mdi->sip_opt[SIP_C_CONTACT_OPT_INDEX])
+ {
+ cJSON_AddStringToObject(root, "Contacts", mdi->sip_opt[SIP_C_CONTACT_OPT_INDEX]->opt_value);
+ }
+ else if(NULL!=mdi->sip_opt[SIP_S_CONTACT_OPT_INDEX])
+ {
+ cJSON_AddStringToObject(root, "Contacts", mdi->sip_opt[SIP_S_CONTACT_OPT_INDEX]->opt_value);
+ }
+
+ if(NULL!=mdi->sip_opt[SIP_C_VIA_OPT_INDEX])
+ {
+ cJSON_AddStringToObject(root, "Via", mdi->sip_opt[SIP_C_VIA_OPT_INDEX]->opt_value);
+ }
+ else if(NULL!=mdi->sip_opt[SIP_S_VIA_OPT_INDEX])
+ {
+ cJSON_AddStringToObject(root, "Via", mdi->sip_opt[SIP_S_VIA_OPT_INDEX]->opt_value);
+ }
+
+ if(NULL!=mdi->sip_opt[SIP_C_ROUTE_OPT_INDEX])
+ {
+ cJSON_AddStringToObject(root, "Route", mdi->sip_opt[SIP_C_ROUTE_OPT_INDEX]->opt_value);
+ }
+ else if(NULL!=mdi->sip_opt[SIP_S_ROUTE_OPT_INDEX])
+ {
+ cJSON_AddStringToObject(root, "Route", mdi->sip_opt[SIP_S_ROUTE_OPT_INDEX]->opt_value);
+ }
+
+ if(NULL!=mdi->sip_opt[SIP_C_RECORD_ROUTES_OPT_INDEX])
+ {
+ cJSON_AddStringToObject(root, "Record_route", mdi->sip_opt[SIP_C_RECORD_ROUTES_OPT_INDEX]->opt_value);
+ }
+ else if(NULL!=mdi->sip_opt[SIP_S_RECORD_ROUTES_OPT_INDEX])
+ {
+ cJSON_AddStringToObject(root, "Record_route", mdi->sip_opt[SIP_S_RECORD_ROUTES_OPT_INDEX]->opt_value);
+ }
+
+ if((NULL != mdi->sip_opt[SIP_USERAGENT_OPT_INDEX])&&(mdi->sip_opt[SIP_USERAGENT_OPT_INDEX]->opt_len > 0)&&(NULL != mdi->sip_opt[SIP_USERAGENT_OPT_INDEX]->opt_value))
+ {
+ cJSON_AddStringToObject(root, "User_agent", mdi->sip_opt[SIP_USERAGENT_OPT_INDEX]->opt_value);
+ }
+
+ char* survey = NULL;
+ uint32_t survey_len = 0;
+ if(mdi->sip_survey_type&SIP_SURVEY_TYPE_FD)
+ {
+ survey = mdi->fd_buf;
+ survey_len = mdi->fd_buflen;
+ }
+ if(mdi->sip_survey_type&SIP_SURVEY_TYPE_JC)
+ {
+ survey = mdi->jc_buf;
+ survey_len = mdi->jc_buflen;
+ }
+ if(NULL != survey)
+ {
+ char* locate_url = NULL;
+ uint32_t locate_urllen = survey_len-sizeof(msg_header_t)-sizeof(resp_checkresult_t);
+ char locate_ipbuf[64] = {0};
+ char locate_urlbuf[1024] = {0};
+ char* locate_url_pos = NULL;
+
+ if(locate_urllen>0)
+ {
+ locate_url = survey + sizeof(msg_header_t) + sizeof(resp_checkresult_t);
+ locate_url_pos = (char*)memchr(locate_url, ':', locate_urllen);
+ if(NULL != locate_url_pos)
+ {
+
+ memcpy(locate_ipbuf, locate_url, locate_url_pos-locate_url);
+ memcpy(locate_urlbuf, locate_url_pos, locate_urllen-(locate_url_pos-locate_url+1));
+ if(mdi->re_offset==2)
+ {
+ cJSON_AddStringToObject(root, "to_from_store_ip", locate_ipbuf);
+ cJSON_AddStringToObject(root, "to_from_store_url", locate_urlbuf);
+ }
+ else
+ {
+ cJSON_AddStringToObject(root, "from_to_store_ip", locate_ipbuf);
+ cJSON_AddStringToObject(root, "from_to_store_url", locate_urlbuf);
+ }
+ }
+ }
+ }
+
+ outbuf = cJSON_Print(root);
+ len = strlen(outbuf);
+ int cb_ret = 0;
+ cb_ret = g_frag_run.kafka_producer->SendData(topic_name, (void *)outbuf, (size_t)len);
+ if(cb_ret < 0)
+ {
+ MESA_handle_runtime_log(g_frag_run.voip_logger,RLOG_LV_FATAL,FRAG_REASSEMBLY_MODULE_NAME,
+ (char*)"[%s:%d] send_voip_full_json_log fail." , __FILE__,__LINE__);
+ }
+ else
+ {
+ MESA_handle_runtime_log(g_frag_run.voip_logger, RLOG_LV_INFO, FRAG_REASSEMBLY_MODULE_NAME,
+ "{%s:%d} send_voip_full_json_log : %s", __FILE__,__LINE__, outbuf);
+ }
+ free(outbuf);
+ outbuf = NULL;
+ cJSON_Delete(root);
+ root = NULL;
+}
+
void send_sip_log_when_expire(media_t* mdi)
{
#if K_PROJECT
+ send_voip_full_json_log(mdi);
#else
/*ȫ����־*/
sip_send_full_log(mdi);
diff --git a/src/frag_voip.h b/src/frag_voip.h
index 7f31706..90dad76 100644
--- a/src/frag_voip.h
+++ b/src/frag_voip.h
@@ -27,6 +27,7 @@
#define SIP_RTP_4TUPLE_OPT_INDEX 0
#define SIP_SIP_4TUPLE_OPT_INDEX 23
#define SIP_DURATION_OPT_INDEX 6
+#define SIP_CAPIP_OPT_INDEX 31
#define SIP_PROTO_SIP "SIP-RTP"
#define SIP_PROTO_RTP "RTP"
@@ -49,6 +50,7 @@
#define OPT_VOIP_PID 0x29
#define OPT_VOIP_DUATION_FULLLOG 0x2A
#define OPT_VOIP_VOICE_DIR_FULLLOG 0x2B
+#define OPT_VOIP_CAP_IP_FULLLOG 0x2C
/*�����־ѡ������*/
#define OPT_LAYER_ADDR_V4 0x3B //0x3B:RTP, 0x2D:SIP, change
@@ -62,6 +64,8 @@
#define OPT_VOIP_FROM_TO_STORE_URL 0x37
#define OPT_VOIP_TO_FROM_STORE_IP 0x38
#define OPT_VOIP_TO_FROM_STORE_URL 0x39
+#define OPT_VOIP_VOICE_DIR 0x3C
+#define OPT_VOIP_CAP_IP 0x3D
/*sip_sendlog_flag*/
#define SIP_SEND_FULL_LOG 0x01
@@ -76,6 +80,7 @@
#define VOIP_DATA_SEQ_OFFSET 4
#define VOIP_DATA_TIME_SEQ_LEN 8
+
typedef struct sip_fulllog_msg_body_s
{
char call_id[128]; // not NULL
diff --git a/src/main.c b/src/main.c
index dd93bb3..b4ac64b 100644
--- a/src/main.c
+++ b/src/main.c
@@ -42,7 +42,7 @@ const char* frag_rssb_version = "2018-08-13T09:00:00";
const char* frag_rssb_version_time = "2018-08-13T09:00:00";
const char* frag_rssb_version_des = "MESA@iie rssb_maskey";
-int FRAG_RSSB_VERSION_1_0_20180927 = 0;
+int FRAG_RSSB_VERSION_1_0_20181008 = 0;
const char* frag_rssb_version_time_in = "2018-09-27";
const char* frag_rssb_version_des_in = "hard balance";
void frag_rssb_history()
@@ -218,6 +218,7 @@ void frag_rssb_history()
//2018.09.13 v4.0 //1.frag removal
//2018.09.20 v4.0//1 voip_fulllog add voice_dir opt
//2018.09.27 v4.0 //1. hard balance
+ //2018.10.08 v4.0//1.add send_voip_full_json_log for K_PROJECT;2.voip_fulllog and voip_surveylog add voice_dir and cap_ip opt
}
frag_rssb_parameter_t g_frag_run;
@@ -369,26 +370,40 @@ int multimedia_read_conf_and_init(const char* filename)
__FILE__,__LINE__, TOPIC_SURVEY_JSON);
}
if((g_frag_run.kafka_producer->CreateTopicHandle(TOPIC_VOIP_CREATE_JSON)) == NULL)
- {
- printf("Kafka CreateTopicHandle %s failed.", TOPIC_VOIP_CREATE_JSON);
- MESA_handle_runtime_log(g_frag_run.logger, RLOG_LV_FATAL, FRAG_REASSEMBLY_MODULE_NAME,
- "{%s:%d} Kafka CreateTopicHandle %s failed.",
- __FILE__,__LINE__, TOPIC_VOIP_CREATE_JSON);
- }
- if((g_frag_run.kafka_producer->CreateTopicHandle(TOPIC_VOIP_EXPIRE_JSON)) == NULL)
- {
- printf("Kafka CreateTopicHandle %s failed.", TOPIC_VOIP_EXPIRE_JSON);
- MESA_handle_runtime_log(g_frag_run.logger, RLOG_LV_FATAL, FRAG_REASSEMBLY_MODULE_NAME,
- "{%s:%d} Kafka CreateTopicHandle %s failed.",
- __FILE__,__LINE__, TOPIC_VOIP_EXPIRE_JSON);
- }
- if((g_frag_run.kafka_producer->CreateTopicHandle(TOPIC_VOIP_SURVEY_JSON)) == NULL)
- {
- printf("Kafka CreateTopicHandle %s failed.", TOPIC_VOIP_SURVEY_JSON);
- MESA_handle_runtime_log(g_frag_run.logger, RLOG_LV_FATAL, FRAG_REASSEMBLY_MODULE_NAME,
- "{%s:%d} Kafka CreateTopicHandle %s failed.",
- __FILE__,__LINE__, TOPIC_VOIP_SURVEY_JSON);
- }
+ {
+ printf("Kafka CreateTopicHandle %s failed.", TOPIC_VOIP_CREATE_JSON);
+ MESA_handle_runtime_log(g_frag_run.logger, RLOG_LV_FATAL, FRAG_REASSEMBLY_MODULE_NAME,
+ "{%s:%d} Kafka CreateTopicHandle %s failed.",
+ __FILE__,__LINE__, TOPIC_VOIP_CREATE_JSON);
+ }
+ if((g_frag_run.kafka_producer->CreateTopicHandle(TOPIC_VOIP_EXPIRE_JSON)) == NULL)
+ {
+ printf("Kafka CreateTopicHandle %s failed.", TOPIC_VOIP_EXPIRE_JSON);
+ MESA_handle_runtime_log(g_frag_run.logger, RLOG_LV_FATAL, FRAG_REASSEMBLY_MODULE_NAME,
+ "{%s:%d} Kafka CreateTopicHandle %s failed.",
+ __FILE__,__LINE__, TOPIC_VOIP_EXPIRE_JSON);
+ }
+ if((g_frag_run.kafka_producer->CreateTopicHandle(TOPIC_VOIP_SURVEY_JSON)) == NULL)
+ {
+ printf("Kafka CreateTopicHandle %s failed.", TOPIC_VOIP_SURVEY_JSON);
+ MESA_handle_runtime_log(g_frag_run.logger, RLOG_LV_FATAL, FRAG_REASSEMBLY_MODULE_NAME,
+ "{%s:%d} Kafka CreateTopicHandle %s failed.",
+ __FILE__,__LINE__, TOPIC_VOIP_SURVEY_JSON);
+ }
+ if((g_frag_run.kafka_producer->CreateTopicHandle(TOPIC_MM_SAMPLE_VOIP_LOG)) == NULL)
+ {
+ printf("Kafka CreateTopicHandle %s failed.", TOPIC_MM_SAMPLE_VOIP_LOG);
+ MESA_handle_runtime_log(g_frag_run.logger, RLOG_LV_FATAL, FRAG_REASSEMBLY_MODULE_NAME,
+ "{%s:%d} Kafka CreateTopicHandle %s failed.",
+ __FILE__,__LINE__, TOPIC_MM_SAMPLE_VOIP_LOG);
+ }
+ if((g_frag_run.kafka_producer->CreateTopicHandle(TOPIC_NTC_COLLECT_VOIP_LOG)) == NULL)
+ {
+ printf("Kafka CreateTopicHandle %s failed.", TOPIC_NTC_COLLECT_VOIP_LOG);
+ MESA_handle_runtime_log(g_frag_run.logger, RLOG_LV_FATAL, FRAG_REASSEMBLY_MODULE_NAME,
+ "{%s:%d} Kafka CreateTopicHandle %s failed.",
+ __FILE__,__LINE__, TOPIC_NTC_COLLECT_VOIP_LOG);
+ }
return 0;
}
diff --git a/src/message.c b/src/message.c
index 15a71a5..a159d55 100644
--- a/src/message.c
+++ b/src/message.c
@@ -384,7 +384,7 @@ void send_json_log(media_t* mdi, resp_checkresult_t* check_res)
cJSON* root = NULL;
- string topic_name = TOPIC_VOIP_SURVEY_JSON;
+ string topic_name = TOPIC_MM_SAMPLE_VOIP_LOG;
char pid_buf[64] = {0};
char pbuf[32] = {0};
@@ -401,7 +401,7 @@ void send_json_log(media_t* mdi, resp_checkresult_t* check_res)
cJSON_AddNumberToObject(root, "found_time", mdi->create_time);
cJSON_AddNumberToObject(root, "recv_time", time(NULL));
cJSON_AddNumberToObject(root, "fd_type", FD_TYPE_ANALYSE);
- cJSON_AddStringToObject(root, "voip_protocol", "RTP");
+ cJSON_AddStringToObject(root, "voip_protocol", SIP_PROTO_RTP);
memset(pbuf, 0, sizeof(pbuf));
inet_ntop(AF_INET, &g_frag_cfg.local_ip_nr, pbuf, sizeof(pbuf));