summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorlishu <[email protected]>2019-08-20 17:33:02 +0800
committerlishu <[email protected]>2019-08-20 17:33:02 +0800
commit30bad7c8270f7b22153bbe19bdddf7374065c0b0 (patch)
treeeb008a35019bf99085284547d1c2f9c3bea4c987 /src
parentfdc7dd827a1afb3df59c4d07a8384b9218638adc (diff)
碎片GK,扫描maat,自定义字段长度不受限制
Diffstat (limited to 'src')
-rw-r--r--src/Makefile11
-rw-r--r--src/frag_block.c7
-rw-r--r--src/frag_monitor.c53
-rw-r--r--src/frag_monitor.h4
4 files changed, 47 insertions, 28 deletions
diff --git a/src/Makefile b/src/Makefile
index c48206b..272ea3f 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -1,7 +1,7 @@
vpath %.a ../lib
vpath %.h ./inc
-PAPP_PATH=/home/lishu/sapp_k/
+PAPP_PATH=/home/mesasoft/sapp/
#CFLAGS = -g3 -Wall -fPIC -Werror -O
#CFLAGS = -g3 -Wall -fPIC -O
@@ -13,6 +13,15 @@ CFLAGS += $(INCLUDES)
CC = g++
CCC = g++
+
+ifeq ($(PROJECT), K)
+CFLAGS += -DK_PROJECT=1
+else ifeq ($(PROJECT), Z)
+CFLAGS += -DK_PROJECT=0
+else
+CFLAGS += -DK_PROJECT=0
+endif
+
LIB = -L./lib/
LIB += -L/usr/local/lib/
LIB += -lrdkafka
diff --git a/src/frag_block.c b/src/frag_block.c
index 34853d0..b27850f 100644
--- a/src/frag_block.c
+++ b/src/frag_block.c
@@ -273,7 +273,7 @@ char frag_check_block(frag_info_t *frag, http_infor* a_http, struct streaminfo *
memset(cfg_url, 0, sizeof(cfg_url));
maat_define = (char*)dictator_malloc(thread_seq, sizeof(char)*maat_result[i].serv_def_len+1);
define_return = Maat_read_rule(g_AV_global_feather, &maat_result[i], MAAT_RULE_SERV_DEFINE, maat_define, maat_result[i].serv_def_len);
- if(0==define_return)
+ if(0!=define_return)
{
maat_define[maat_result[i].serv_def_len] = '\0';
src = maat_define;
@@ -306,7 +306,10 @@ char frag_check_block(frag_info_t *frag, http_infor* a_http, struct streaminfo *
pos = strtok_r(NULL, "|", &saveptr);
}
}
- dictator_free(thread_seq, maat_define);
+ if(NULL!=maat_define)
+ {
+ dictator_free(thread_seq, maat_define);
+ }
/*��£��������־*/
frag_kill_connection(feature_info.mid, frag, a_http, a_tcp, a_packet);
frag_send_block_log(fd_type, locate_url, &feature_info, frag, a_http, a_tcp);
diff --git a/src/frag_monitor.c b/src/frag_monitor.c
index 306ee47..87127e5 100644
--- a/src/frag_monitor.c
+++ b/src/frag_monitor.c
@@ -295,13 +295,18 @@ void frgmnt_index_sendback(frag_info_t* frag, http_infor* a_http, struct streami
snprintf(filename,sizeof(filename),"%s/%s,%u", g_frag_prog_para.trace_dir,addr, a_http->http_session_seq);
index_local_log(filename, out, out_len);
}
- /*kfaka*/
+ /*kfaka*/
+#if K_PROJECT
if(NULL!=g_multi_kafka_producer)
- {
- //cb_ret = g_multi_kafka_producer->SendData(topic_name, (void *)out, (size_t)out_len);
+ {
cb_ret = rd_kafka_produce(g_frag_prog_para.frag_topic_rkt[MEDIA_INDEX_TOPIC_RKT], RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, out, out_len, NULL, 0, NULL);
-
- }
+ }
+#else
+ if(NULL!=g_frag_prog_para.index_kafka_producer)
+ {
+ cb_ret = g_frag_prog_para.index_kafka_producer->SendData(topic_name, (void *)out, (size_t)out_len);
+ }
+#endif
if(out)
{
free(out);
@@ -1488,8 +1493,9 @@ int read_main_conf()
return -1;
}
- // index Kafka��ʼ��//
- /*
+#if K_PROJECT
+#else
+ // index Kafka��ʼ��//
wired_cfg_read(g_frag_prog_para.avcomconf_handle,"COMMON","FD_TYPE2_KAFKA_BROKERS",g_frag_prog_para.index_brokers, sizeof(g_frag_prog_para.index_brokers),"0");
g_frag_prog_para.index_kafka_producer = new KafkaProducer(g_frag_prog_para.index_brokers);
if(NULL==g_frag_prog_para.index_kafka_producer)
@@ -1510,8 +1516,8 @@ int read_main_conf()
MESA_handle_runtime_log(g_frag_prog_para.logger, RLOG_LV_FATAL, FRGMNT_PLUGIN_NAME,
"{%s:%d} KafkaConnection %s succ.",
__FILE__,__LINE__, g_frag_prog_para.index_brokers);
- }
- */
+ }
+#endif
//TRACE
wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","TraceConfFile",config_buff, sizeof(config_buff),"./avconf/frag_monitor/addr_trace.conf");
@@ -1528,26 +1534,27 @@ extern "C" int FRAG_MONITOR_INIT()
return -1;
}
- /*kafka*/
+ /*kafka*/
+#if K_PROJECT
if(NULL!=g_multi_kafka_producer)
- {
- /*
- if((g_multi_kafka_producer->CreateTopicHandle(TOPIC_MEDIA_INDEX_DATA)) == NULL)
- {
- printf("Kafka CreateTopicHandle %s failed.", TOPIC_MEDIA_INDEX_DATA);
- return -1;
- }
- if((g_multi_kafka_producer->CreateTopicHandle(TOPIC_MEDIA_CONVERGE_DATA)) == NULL)
- {
- printf("Kafka CreateTopicHandle %s failed.", TOPIC_MEDIA_CONVERGE_DATA);
- return -1;
- }
- */
+ {
rd_kafka_topic_conf_t* frag_topic_conf = rd_kafka_topic_conf_new();
g_frag_prog_para.frag_topic_rkt[MEDIA_INDEX_TOPIC_RKT] = rd_kafka_topic_new(g_multi_kafka_producer, TOPIC_MEDIA_INDEX_DATA, frag_topic_conf);
frag_topic_conf = rd_kafka_topic_conf_new();
g_frag_prog_para.frag_topic_rkt[MEDIA_CNVG_TOPIC_RKT] = rd_kafka_topic_new(g_multi_kafka_producer, TOPIC_MEDIA_CONVERGE_DATA, frag_topic_conf);
}
+#else
+ if((g_frag_prog_para.index_kafka_producer->CreateTopicHandle(TOPIC_MEDIA_INDEX_DATA)) == NULL)
+ {
+ printf("Kafka CreateTopicHandle %s failed.", TOPIC_MEDIA_INDEX_DATA);
+ return -1;
+ }
+ if((g_frag_prog_para.index_kafka_producer->CreateTopicHandle(TOPIC_MEDIA_CONVERGE_DATA)) == NULL)
+ {
+ printf("Kafka CreateTopicHandle %s failed.", TOPIC_MEDIA_CONVERGE_DATA);
+ return -1;
+ }
+#endif
g_frag_prog_para.frag_project_id = project_producer_register("FRAG", PROJECT_VAL_TYPE_CHAR, NULL);
return 0;
diff --git a/src/frag_monitor.h b/src/frag_monitor.h
index 7833209..79e2980 100644
--- a/src/frag_monitor.h
+++ b/src/frag_monitor.h
@@ -4,7 +4,7 @@
#include "KafkaProducer.h"
-#define K_PROJECT 1
+//#define K_PROJECT 1
#define FRGMNT_PLUGIN_NAME "[frag_monitor.so]"
#define MAX_PATH_LEN 256
#define MAAT_RESULT_NUM 64
@@ -203,7 +203,7 @@ typedef struct frag_monitor_runtime_parameter_s
Maat_feather_t feather;
MESA_htable_handle media_hash; //���߳�����hash
void* addr_trace;
- //KafkaProducer* index_kafka_producer;
+ KafkaProducer* index_kafka_producer;
char index_brokers[512]; //kafka
uint32 local_ip_nr;
uint32 s2c_cache_size;