summaryrefslogtreecommitdiff
path: root/src/frag_monitor.c
diff options
context:
space:
mode:
authorlishu <[email protected]>2019-08-20 17:33:02 +0800
committerlishu <[email protected]>2019-08-20 17:33:02 +0800
commit30bad7c8270f7b22153bbe19bdddf7374065c0b0 (patch)
treeeb008a35019bf99085284547d1c2f9c3bea4c987 /src/frag_monitor.c
parentfdc7dd827a1afb3df59c4d07a8384b9218638adc (diff)
碎片GK,扫描maat,自定义字段长度不受限制
Diffstat (limited to 'src/frag_monitor.c')
-rw-r--r--src/frag_monitor.c53
1 files changed, 30 insertions, 23 deletions
diff --git a/src/frag_monitor.c b/src/frag_monitor.c
index 306ee47..87127e5 100644
--- a/src/frag_monitor.c
+++ b/src/frag_monitor.c
@@ -295,13 +295,18 @@ void frgmnt_index_sendback(frag_info_t* frag, http_infor* a_http, struct streami
snprintf(filename,sizeof(filename),"%s/%s,%u", g_frag_prog_para.trace_dir,addr, a_http->http_session_seq);
index_local_log(filename, out, out_len);
}
- /*kfaka*/
+ /*kfaka*/
+#if K_PROJECT
if(NULL!=g_multi_kafka_producer)
- {
- //cb_ret = g_multi_kafka_producer->SendData(topic_name, (void *)out, (size_t)out_len);
+ {
cb_ret = rd_kafka_produce(g_frag_prog_para.frag_topic_rkt[MEDIA_INDEX_TOPIC_RKT], RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, out, out_len, NULL, 0, NULL);
-
- }
+ }
+#else
+ if(NULL!=g_frag_prog_para.index_kafka_producer)
+ {
+ cb_ret = g_frag_prog_para.index_kafka_producer->SendData(topic_name, (void *)out, (size_t)out_len);
+ }
+#endif
if(out)
{
free(out);
@@ -1488,8 +1493,9 @@ int read_main_conf()
return -1;
}
- // index Kafka��ʼ��//
- /*
+#if K_PROJECT
+#else
+ // index Kafka��ʼ��//
wired_cfg_read(g_frag_prog_para.avcomconf_handle,"COMMON","FD_TYPE2_KAFKA_BROKERS",g_frag_prog_para.index_brokers, sizeof(g_frag_prog_para.index_brokers),"0");
g_frag_prog_para.index_kafka_producer = new KafkaProducer(g_frag_prog_para.index_brokers);
if(NULL==g_frag_prog_para.index_kafka_producer)
@@ -1510,8 +1516,8 @@ int read_main_conf()
MESA_handle_runtime_log(g_frag_prog_para.logger, RLOG_LV_FATAL, FRGMNT_PLUGIN_NAME,
"{%s:%d} KafkaConnection %s succ.",
__FILE__,__LINE__, g_frag_prog_para.index_brokers);
- }
- */
+ }
+#endif
//TRACE
wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","TraceConfFile",config_buff, sizeof(config_buff),"./avconf/frag_monitor/addr_trace.conf");
@@ -1528,26 +1534,27 @@ extern "C" int FRAG_MONITOR_INIT()
return -1;
}
- /*kafka*/
+ /*kafka*/
+#if K_PROJECT
if(NULL!=g_multi_kafka_producer)
- {
- /*
- if((g_multi_kafka_producer->CreateTopicHandle(TOPIC_MEDIA_INDEX_DATA)) == NULL)
- {
- printf("Kafka CreateTopicHandle %s failed.", TOPIC_MEDIA_INDEX_DATA);
- return -1;
- }
- if((g_multi_kafka_producer->CreateTopicHandle(TOPIC_MEDIA_CONVERGE_DATA)) == NULL)
- {
- printf("Kafka CreateTopicHandle %s failed.", TOPIC_MEDIA_CONVERGE_DATA);
- return -1;
- }
- */
+ {
rd_kafka_topic_conf_t* frag_topic_conf = rd_kafka_topic_conf_new();
g_frag_prog_para.frag_topic_rkt[MEDIA_INDEX_TOPIC_RKT] = rd_kafka_topic_new(g_multi_kafka_producer, TOPIC_MEDIA_INDEX_DATA, frag_topic_conf);
frag_topic_conf = rd_kafka_topic_conf_new();
g_frag_prog_para.frag_topic_rkt[MEDIA_CNVG_TOPIC_RKT] = rd_kafka_topic_new(g_multi_kafka_producer, TOPIC_MEDIA_CONVERGE_DATA, frag_topic_conf);
}
+#else
+ if((g_frag_prog_para.index_kafka_producer->CreateTopicHandle(TOPIC_MEDIA_INDEX_DATA)) == NULL)
+ {
+ printf("Kafka CreateTopicHandle %s failed.", TOPIC_MEDIA_INDEX_DATA);
+ return -1;
+ }
+ if((g_frag_prog_para.index_kafka_producer->CreateTopicHandle(TOPIC_MEDIA_CONVERGE_DATA)) == NULL)
+ {
+ printf("Kafka CreateTopicHandle %s failed.", TOPIC_MEDIA_CONVERGE_DATA);
+ return -1;
+ }
+#endif
g_frag_prog_para.frag_project_id = project_producer_register("FRAG", PROJECT_VAL_TYPE_CHAR, NULL);
return 0;