From 30bad7c8270f7b22153bbe19bdddf7374065c0b0 Mon Sep 17 00:00:00 2001 From: lishu Date: Tue, 20 Aug 2019 17:33:02 +0800 Subject: 碎片GK,扫描maat,自定义字段长度不受限制 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/frag_monitor.c | 53 ++++++++++++++++++++++++++++++----------------------- 1 file changed, 30 insertions(+), 23 deletions(-) (limited to 'src/frag_monitor.c') diff --git a/src/frag_monitor.c b/src/frag_monitor.c index 306ee47..87127e5 100644 --- a/src/frag_monitor.c +++ b/src/frag_monitor.c @@ -295,13 +295,18 @@ void frgmnt_index_sendback(frag_info_t* frag, http_infor* a_http, struct streami snprintf(filename,sizeof(filename),"%s/%s,%u", g_frag_prog_para.trace_dir,addr, a_http->http_session_seq); index_local_log(filename, out, out_len); } - /*kfaka*/ + /*kfaka*/ +#if K_PROJECT if(NULL!=g_multi_kafka_producer) - { - //cb_ret = g_multi_kafka_producer->SendData(topic_name, (void *)out, (size_t)out_len); + { cb_ret = rd_kafka_produce(g_frag_prog_para.frag_topic_rkt[MEDIA_INDEX_TOPIC_RKT], RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, out, out_len, NULL, 0, NULL); - - } + } +#else + if(NULL!=g_frag_prog_para.index_kafka_producer) + { + cb_ret = g_frag_prog_para.index_kafka_producer->SendData(topic_name, (void *)out, (size_t)out_len); + } +#endif if(out) { free(out); @@ -1488,8 +1493,9 @@ int read_main_conf() return -1; } - // index Kafkaʼ// - /* +#if K_PROJECT +#else + // index Kafkaʼ// wired_cfg_read(g_frag_prog_para.avcomconf_handle,"COMMON","FD_TYPE2_KAFKA_BROKERS",g_frag_prog_para.index_brokers, sizeof(g_frag_prog_para.index_brokers),"0"); g_frag_prog_para.index_kafka_producer = new KafkaProducer(g_frag_prog_para.index_brokers); if(NULL==g_frag_prog_para.index_kafka_producer) @@ -1510,8 +1516,8 @@ int read_main_conf() MESA_handle_runtime_log(g_frag_prog_para.logger, RLOG_LV_FATAL, FRGMNT_PLUGIN_NAME, "{%s:%d} KafkaConnection %s succ.", __FILE__,__LINE__, g_frag_prog_para.index_brokers); - } - */ + } +#endif //TRACE wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","TraceConfFile",config_buff, sizeof(config_buff),"./avconf/frag_monitor/addr_trace.conf"); @@ -1528,26 +1534,27 @@ extern "C" int FRAG_MONITOR_INIT() return -1; } - /*kafka*/ + /*kafka*/ +#if K_PROJECT if(NULL!=g_multi_kafka_producer) - { - /* - if((g_multi_kafka_producer->CreateTopicHandle(TOPIC_MEDIA_INDEX_DATA)) == NULL) - { - printf("Kafka CreateTopicHandle %s failed.", TOPIC_MEDIA_INDEX_DATA); - return -1; - } - if((g_multi_kafka_producer->CreateTopicHandle(TOPIC_MEDIA_CONVERGE_DATA)) == NULL) - { - printf("Kafka CreateTopicHandle %s failed.", TOPIC_MEDIA_CONVERGE_DATA); - return -1; - } - */ + { rd_kafka_topic_conf_t* frag_topic_conf = rd_kafka_topic_conf_new(); g_frag_prog_para.frag_topic_rkt[MEDIA_INDEX_TOPIC_RKT] = rd_kafka_topic_new(g_multi_kafka_producer, TOPIC_MEDIA_INDEX_DATA, frag_topic_conf); frag_topic_conf = rd_kafka_topic_conf_new(); g_frag_prog_para.frag_topic_rkt[MEDIA_CNVG_TOPIC_RKT] = rd_kafka_topic_new(g_multi_kafka_producer, TOPIC_MEDIA_CONVERGE_DATA, frag_topic_conf); } +#else + if((g_frag_prog_para.index_kafka_producer->CreateTopicHandle(TOPIC_MEDIA_INDEX_DATA)) == NULL) + { + printf("Kafka CreateTopicHandle %s failed.", TOPIC_MEDIA_INDEX_DATA); + return -1; + } + if((g_frag_prog_para.index_kafka_producer->CreateTopicHandle(TOPIC_MEDIA_CONVERGE_DATA)) == NULL) + { + printf("Kafka CreateTopicHandle %s failed.", TOPIC_MEDIA_CONVERGE_DATA); + return -1; + } +#endif g_frag_prog_para.frag_project_id = project_producer_register("FRAG", PROJECT_VAL_TYPE_CHAR, NULL); return 0; -- cgit v1.2.3