summaryrefslogtreecommitdiff
path: root/src/frag_monitor.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/frag_monitor.c')
-rw-r--r--src/frag_monitor.c53
1 files changed, 30 insertions, 23 deletions
diff --git a/src/frag_monitor.c b/src/frag_monitor.c
index 306ee47..87127e5 100644
--- a/src/frag_monitor.c
+++ b/src/frag_monitor.c
@@ -295,13 +295,18 @@ void frgmnt_index_sendback(frag_info_t* frag, http_infor* a_http, struct streami
snprintf(filename,sizeof(filename),"%s/%s,%u", g_frag_prog_para.trace_dir,addr, a_http->http_session_seq);
index_local_log(filename, out, out_len);
}
- /*kfaka*/
+ /*kfaka*/
+#if K_PROJECT
if(NULL!=g_multi_kafka_producer)
- {
- //cb_ret = g_multi_kafka_producer->SendData(topic_name, (void *)out, (size_t)out_len);
+ {
cb_ret = rd_kafka_produce(g_frag_prog_para.frag_topic_rkt[MEDIA_INDEX_TOPIC_RKT], RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, out, out_len, NULL, 0, NULL);
-
- }
+ }
+#else
+ if(NULL!=g_frag_prog_para.index_kafka_producer)
+ {
+ cb_ret = g_frag_prog_para.index_kafka_producer->SendData(topic_name, (void *)out, (size_t)out_len);
+ }
+#endif
if(out)
{
free(out);
@@ -1488,8 +1493,9 @@ int read_main_conf()
return -1;
}
- // index Kafka��ʼ��//
- /*
+#if K_PROJECT
+#else
+ // index Kafka��ʼ��//
wired_cfg_read(g_frag_prog_para.avcomconf_handle,"COMMON","FD_TYPE2_KAFKA_BROKERS",g_frag_prog_para.index_brokers, sizeof(g_frag_prog_para.index_brokers),"0");
g_frag_prog_para.index_kafka_producer = new KafkaProducer(g_frag_prog_para.index_brokers);
if(NULL==g_frag_prog_para.index_kafka_producer)
@@ -1510,8 +1516,8 @@ int read_main_conf()
MESA_handle_runtime_log(g_frag_prog_para.logger, RLOG_LV_FATAL, FRGMNT_PLUGIN_NAME,
"{%s:%d} KafkaConnection %s succ.",
__FILE__,__LINE__, g_frag_prog_para.index_brokers);
- }
- */
+ }
+#endif
//TRACE
wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","TraceConfFile",config_buff, sizeof(config_buff),"./avconf/frag_monitor/addr_trace.conf");
@@ -1528,26 +1534,27 @@ extern "C" int FRAG_MONITOR_INIT()
return -1;
}
- /*kafka*/
+ /*kafka*/
+#if K_PROJECT
if(NULL!=g_multi_kafka_producer)
- {
- /*
- if((g_multi_kafka_producer->CreateTopicHandle(TOPIC_MEDIA_INDEX_DATA)) == NULL)
- {
- printf("Kafka CreateTopicHandle %s failed.", TOPIC_MEDIA_INDEX_DATA);
- return -1;
- }
- if((g_multi_kafka_producer->CreateTopicHandle(TOPIC_MEDIA_CONVERGE_DATA)) == NULL)
- {
- printf("Kafka CreateTopicHandle %s failed.", TOPIC_MEDIA_CONVERGE_DATA);
- return -1;
- }
- */
+ {
rd_kafka_topic_conf_t* frag_topic_conf = rd_kafka_topic_conf_new();
g_frag_prog_para.frag_topic_rkt[MEDIA_INDEX_TOPIC_RKT] = rd_kafka_topic_new(g_multi_kafka_producer, TOPIC_MEDIA_INDEX_DATA, frag_topic_conf);
frag_topic_conf = rd_kafka_topic_conf_new();
g_frag_prog_para.frag_topic_rkt[MEDIA_CNVG_TOPIC_RKT] = rd_kafka_topic_new(g_multi_kafka_producer, TOPIC_MEDIA_CONVERGE_DATA, frag_topic_conf);
}
+#else
+ if((g_frag_prog_para.index_kafka_producer->CreateTopicHandle(TOPIC_MEDIA_INDEX_DATA)) == NULL)
+ {
+ printf("Kafka CreateTopicHandle %s failed.", TOPIC_MEDIA_INDEX_DATA);
+ return -1;
+ }
+ if((g_frag_prog_para.index_kafka_producer->CreateTopicHandle(TOPIC_MEDIA_CONVERGE_DATA)) == NULL)
+ {
+ printf("Kafka CreateTopicHandle %s failed.", TOPIC_MEDIA_CONVERGE_DATA);
+ return -1;
+ }
+#endif
g_frag_prog_para.frag_project_id = project_producer_register("FRAG", PROJECT_VAL_TYPE_CHAR, NULL);
return 0;