diff options
| author | lishu <[email protected]> | 2019-08-20 17:33:02 +0800 |
|---|---|---|
| committer | lishu <[email protected]> | 2019-08-20 17:33:02 +0800 |
| commit | 30bad7c8270f7b22153bbe19bdddf7374065c0b0 (patch) | |
| tree | eb008a35019bf99085284547d1c2f9c3bea4c987 /src | |
| parent | fdc7dd827a1afb3df59c4d07a8384b9218638adc (diff) | |
碎片GK,扫描maat,自定义字段长度不受限制
Diffstat (limited to 'src')
| -rw-r--r-- | src/Makefile | 11 | ||||
| -rw-r--r-- | src/frag_block.c | 7 | ||||
| -rw-r--r-- | src/frag_monitor.c | 53 | ||||
| -rw-r--r-- | src/frag_monitor.h | 4 |
4 files changed, 47 insertions, 28 deletions
diff --git a/src/Makefile b/src/Makefile index c48206b..272ea3f 100644 --- a/src/Makefile +++ b/src/Makefile @@ -1,7 +1,7 @@ vpath %.a ../lib vpath %.h ./inc -PAPP_PATH=/home/lishu/sapp_k/ +PAPP_PATH=/home/mesasoft/sapp/ #CFLAGS = -g3 -Wall -fPIC -Werror -O #CFLAGS = -g3 -Wall -fPIC -O @@ -13,6 +13,15 @@ CFLAGS += $(INCLUDES) CC = g++ CCC = g++ + +ifeq ($(PROJECT), K) +CFLAGS += -DK_PROJECT=1 +else ifeq ($(PROJECT), Z) +CFLAGS += -DK_PROJECT=0 +else +CFLAGS += -DK_PROJECT=0 +endif + LIB = -L./lib/ LIB += -L/usr/local/lib/ LIB += -lrdkafka diff --git a/src/frag_block.c b/src/frag_block.c index 34853d0..b27850f 100644 --- a/src/frag_block.c +++ b/src/frag_block.c @@ -273,7 +273,7 @@ char frag_check_block(frag_info_t *frag, http_infor* a_http, struct streaminfo * memset(cfg_url, 0, sizeof(cfg_url)); maat_define = (char*)dictator_malloc(thread_seq, sizeof(char)*maat_result[i].serv_def_len+1); define_return = Maat_read_rule(g_AV_global_feather, &maat_result[i], MAAT_RULE_SERV_DEFINE, maat_define, maat_result[i].serv_def_len); - if(0==define_return) + if(0!=define_return) { maat_define[maat_result[i].serv_def_len] = '\0'; src = maat_define; @@ -306,7 +306,10 @@ char frag_check_block(frag_info_t *frag, http_infor* a_http, struct streaminfo * pos = strtok_r(NULL, "|", &saveptr); } } - dictator_free(thread_seq, maat_define); + if(NULL!=maat_define) + { + dictator_free(thread_seq, maat_define); + } /*��£��������־*/ frag_kill_connection(feature_info.mid, frag, a_http, a_tcp, a_packet); frag_send_block_log(fd_type, locate_url, &feature_info, frag, a_http, a_tcp); diff --git a/src/frag_monitor.c b/src/frag_monitor.c index 306ee47..87127e5 100644 --- a/src/frag_monitor.c +++ b/src/frag_monitor.c @@ -295,13 +295,18 @@ void frgmnt_index_sendback(frag_info_t* frag, http_infor* a_http, struct streami snprintf(filename,sizeof(filename),"%s/%s,%u", g_frag_prog_para.trace_dir,addr, a_http->http_session_seq); index_local_log(filename, out, out_len); } - /*kfaka*/ + /*kfaka*/ +#if K_PROJECT if(NULL!=g_multi_kafka_producer) - { - //cb_ret = g_multi_kafka_producer->SendData(topic_name, (void *)out, (size_t)out_len); + { cb_ret = rd_kafka_produce(g_frag_prog_para.frag_topic_rkt[MEDIA_INDEX_TOPIC_RKT], RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, out, out_len, NULL, 0, NULL); - - } + } +#else + if(NULL!=g_frag_prog_para.index_kafka_producer) + { + cb_ret = g_frag_prog_para.index_kafka_producer->SendData(topic_name, (void *)out, (size_t)out_len); + } +#endif if(out) { free(out); @@ -1488,8 +1493,9 @@ int read_main_conf() return -1; } - // index Kafka��ʼ��// - /* +#if K_PROJECT +#else + // index Kafka��ʼ��// wired_cfg_read(g_frag_prog_para.avcomconf_handle,"COMMON","FD_TYPE2_KAFKA_BROKERS",g_frag_prog_para.index_brokers, sizeof(g_frag_prog_para.index_brokers),"0"); g_frag_prog_para.index_kafka_producer = new KafkaProducer(g_frag_prog_para.index_brokers); if(NULL==g_frag_prog_para.index_kafka_producer) @@ -1510,8 +1516,8 @@ int read_main_conf() MESA_handle_runtime_log(g_frag_prog_para.logger, RLOG_LV_FATAL, FRGMNT_PLUGIN_NAME, "{%s:%d} KafkaConnection %s succ.", __FILE__,__LINE__, g_frag_prog_para.index_brokers); - } - */ + } +#endif //TRACE wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","TraceConfFile",config_buff, sizeof(config_buff),"./avconf/frag_monitor/addr_trace.conf"); @@ -1528,26 +1534,27 @@ extern "C" int FRAG_MONITOR_INIT() return -1; } - /*kafka*/ + /*kafka*/ +#if K_PROJECT if(NULL!=g_multi_kafka_producer) - { - /* - if((g_multi_kafka_producer->CreateTopicHandle(TOPIC_MEDIA_INDEX_DATA)) == NULL) - { - printf("Kafka CreateTopicHandle %s failed.", TOPIC_MEDIA_INDEX_DATA); - return -1; - } - if((g_multi_kafka_producer->CreateTopicHandle(TOPIC_MEDIA_CONVERGE_DATA)) == NULL) - { - printf("Kafka CreateTopicHandle %s failed.", TOPIC_MEDIA_CONVERGE_DATA); - return -1; - } - */ + { rd_kafka_topic_conf_t* frag_topic_conf = rd_kafka_topic_conf_new(); g_frag_prog_para.frag_topic_rkt[MEDIA_INDEX_TOPIC_RKT] = rd_kafka_topic_new(g_multi_kafka_producer, TOPIC_MEDIA_INDEX_DATA, frag_topic_conf); frag_topic_conf = rd_kafka_topic_conf_new(); g_frag_prog_para.frag_topic_rkt[MEDIA_CNVG_TOPIC_RKT] = rd_kafka_topic_new(g_multi_kafka_producer, TOPIC_MEDIA_CONVERGE_DATA, frag_topic_conf); } +#else + if((g_frag_prog_para.index_kafka_producer->CreateTopicHandle(TOPIC_MEDIA_INDEX_DATA)) == NULL) + { + printf("Kafka CreateTopicHandle %s failed.", TOPIC_MEDIA_INDEX_DATA); + return -1; + } + if((g_frag_prog_para.index_kafka_producer->CreateTopicHandle(TOPIC_MEDIA_CONVERGE_DATA)) == NULL) + { + printf("Kafka CreateTopicHandle %s failed.", TOPIC_MEDIA_CONVERGE_DATA); + return -1; + } +#endif g_frag_prog_para.frag_project_id = project_producer_register("FRAG", PROJECT_VAL_TYPE_CHAR, NULL); return 0; diff --git a/src/frag_monitor.h b/src/frag_monitor.h index 7833209..79e2980 100644 --- a/src/frag_monitor.h +++ b/src/frag_monitor.h @@ -4,7 +4,7 @@ #include "KafkaProducer.h" -#define K_PROJECT 1 +//#define K_PROJECT 1 #define FRGMNT_PLUGIN_NAME "[frag_monitor.so]" #define MAX_PATH_LEN 256 #define MAAT_RESULT_NUM 64 @@ -203,7 +203,7 @@ typedef struct frag_monitor_runtime_parameter_s Maat_feather_t feather; MESA_htable_handle media_hash; //���߳�����hash void* addr_trace; - //KafkaProducer* index_kafka_producer; + KafkaProducer* index_kafka_producer; char index_brokers[512]; //kafka uint32 local_ip_nr; uint32 s2c_cache_size; |
