summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryangwei <[email protected]>2019-01-08 17:13:53 +0600
committeryangwei <[email protected]>2019-01-08 17:13:53 +0600
commit2adf5c81f181e18fb996ecd973724ea8687c9c5d (patch)
tree8968555a44d5f6c06a840871505f293aaebd940e
parenta4f1f69cd9be7f87db1aea8208a1010d04625a22 (diff)
同步线上代码
1、新增fs2输出项 2、调整默认配置文件读取路径
-rw-r--r--src/ntc_http_collect.c83
-rw-r--r--src/ntc_http_collect.h4
2 files changed, 57 insertions, 30 deletions
diff --git a/src/ntc_http_collect.c b/src/ntc_http_collect.c
index 743455e..2ac741b 100644
--- a/src/ntc_http_collect.c
+++ b/src/ntc_http_collect.c
@@ -37,8 +37,8 @@
#define LOCAL_IP "127.0.0.1"
#define PLUGIN_NAME "NTC_HTTP_COLLECT"
-#define PROFILE_PATH "./avconf/av.conf"
-#define FILTER_PATH "./avconf/http_url_filter.conf"
+#define PROFILE_PATH "./t1conf/main.conf"
+#define FILTER_PATH "./t1conf/http_url_filter.conf"
#ifndef MAX_BUF_LEN
#define MAX_BUF_LEN 4096
@@ -51,8 +51,6 @@
#define MIN(a, b) (((a) < (b)) ? (a) : (b))
-#define APP_NAME "NTC_HTTP_COLLECT"
-
int NTC_HTTP_COLLECT_VERSION_20180910 = 1;
@@ -118,13 +116,12 @@ static int append_http_query_key(struct streaminfo *a_tcp,int http_seq, char *ou
}
ret = MESA_get_stream_opt(a_tcp, MSO_TCP_ISN_C2S, &tcp_seq, &tcp_seq_len);
- if(ret != 0)
- return -1;
-
+ if(ret != 0)return -1;
+
ret = MESA_get_stream_opt(a_tcp, MSO_STREAM_TUNNEL_TYPE, &tunnel_type, &tunnel_type_len);
- if(ret != 0)
- return -1;
+ if(ret != 0)return -1;
+
if(STREAM_TUNNLE_NON == tunnel_type)
{
const char * addr = printaddr(&a_tcp->addr, a_tcp->threadnum);
@@ -192,15 +189,16 @@ static void feedback_url_to_kafka(streaminfo *a_tcp, int http_seq, store_hash_da
{
cJSON_AddStringToObject(value_node, "referer", store_hash_data->refer_buf);
}
- char *value = cJSON_Print(value_node);
+ cJSON_AddNumberToObject(value_node, "found_time", store_hash_data->store_time);
+ //char *value = cJSON_Print(value_node);
+ char *value = cJSON_PrintUnformatted(value_node);
cJSON_AddStringToObject(feedback_node, "k", store_key);
cJSON_AddStringToObject(feedback_node, "v", value);
- cJSON_AddStringToObject(feedback_node, "v", value);
- cJSON_AddNumberToObject(feedback_node, "found_time", store_hash_data->store_time);
- char *feedback = cJSON_Print(feedback_node);
+ //char *feedback = cJSON_Print(feedback_node);
+ char *feedback = cJSON_PrintUnformatted(feedback_node);
rd_kafka_produce(g_ntc_http_collect_kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, feedback, strlen(feedback), NULL, 0, NULL);
@@ -335,10 +333,10 @@ static void NTC_HTTP_COLLECT_load_profile()
{
short fs_server_port = 0;
char fs_server_ip[128] = "";
- MESA_load_profile_uint_def(g_item.profile_path, "moudle", "stat_interval", &g_item.stat_interval, 1);
- MESA_load_profile_string_def(g_item.profile_path, "moudle", "stat_path", g_item.stat_path, sizeof(g_item.stat_path), "./avlog/url_discovery.stat");
- MESA_load_profile_string_nodef(g_item.profile_path,"moudle","stat_server_ip",fs_server_ip, sizeof(fs_server_ip));
- MESA_load_profile_short_nodef(g_item.profile_path, "moudle","stat_server_port", &(fs_server_port));
+ MESA_load_profile_uint_def(g_item.profile_path, PLUGIN_NAME, "stat_interval", &g_item.stat_interval, 1);
+ MESA_load_profile_string_def(g_item.profile_path, PLUGIN_NAME, "stat_path", g_item.stat_path, sizeof(g_item.stat_path), "./avlog/url_discovery.stat");
+ MESA_load_profile_string_nodef(g_item.profile_path,PLUGIN_NAME,"stat_server_ip",fs_server_ip, sizeof(fs_server_ip));
+ MESA_load_profile_short_nodef(g_item.profile_path, PLUGIN_NAME,"stat_server_port", &(fs_server_port));
FILE *fp = fopen(g_item.stat_path, "w");
if(fp != NULL)
@@ -346,6 +344,7 @@ static void NTC_HTTP_COLLECT_load_profile()
fclose(fp);
g_item.field_handle = FS_create_handle();
FS_set_para(g_item.field_handle, OUTPUT_DEVICE, (const void *)g_item.stat_path, strlen(g_item.stat_path));
+ FS_set_para(g_item.field_handle, APP_NAME, "ntc_http", strlen("ntc_http")+1);
int value = 2;
FS_set_para(g_item.field_handle, PRINT_MODE, (const void *)&value, sizeof(int));
value = 1;
@@ -368,12 +367,16 @@ static void NTC_HTTP_COLLECT_load_profile()
g_item.fs_url_len_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "URL_LEN");
g_item.fs_refer_num_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "REFER_NUM");
g_item.fs_refer_len_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "REFER_LEN");
- g_item.fs_single_url_num_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "SINGLE_NUM");
- g_item.fs_double_url_num_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "DOUBLE_NUM");
- g_item.fs_filter_num_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "FILTER_NUM");
-
- g_item.fs_err_num_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "ERR_NUM");
- g_item.fs_drop_num_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "DROP_NUM");
+ g_item.fs_single_url_num_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "C2S_HTTP");
+ g_item.fs_double_url_num_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "DOUBLE_HTTP");
+ g_item.fs_s2c_http_num_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "S2C_HTTP");
+
+ g_item.fs_c2s_nourl_num_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "C2S_NOURL");
+ g_item.fs_double_nourl_num_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "DOUBLE_NOURL");
+ g_item.fs_lost_num_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "LOST");
+ g_item.fs_err_num_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "ERR");
+ g_item.fs_drop_num_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "DROP");
+ g_item.fs_filter_num_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "FILTER");
FS_start(g_item.field_handle);
}
}
@@ -383,7 +386,7 @@ static void NTC_HTTP_COLLECT_load_profile()
int NTC_HTTP_COLLECT_kaka_init(int kafka_mode)
{
char kafka_errstr[1024];
- MESA_load_profile_string_def(g_item.profile_path, PLUGIN_NAME, "topic", g_item.topic_buf, sizeof(g_item.topic_buf), "AIM");
+ MESA_load_profile_string_def(g_item.profile_path, PLUGIN_NAME, "kafka_topic", g_item.topic_buf, sizeof(g_item.topic_buf), "AIM");
if((kafka_mode&INDIE_KAFKA) == INDIE_KAFKA)
{
if(0 > MESA_load_profile_string_nodef(PROFILE_PATH, PLUGIN_NAME, "kafka_brokelist", g_item.kafka_brokelist, sizeof(g_item.kafka_brokelist)))
@@ -438,7 +441,7 @@ int NTC_HTTP_COLLECT_kaka_init(int kafka_mode)
return 0;
}
-int NTC_HTTP_COLLECT_INIT(void)
+extern "C" int NTC_HTTP_COLLECT_INIT(void)
{
int rec = 0;
memset(&g_item, 0, sizeof(g_item));
@@ -446,8 +449,9 @@ int NTC_HTTP_COLLECT_INIT(void)
rec = http_url_discovery_init_hash(g_iThreadNum);
if(rec != 0)
return -1;
+ rec = NTC_HTTP_COLLECT_kaka_init(g_item.kafka_mode);
+ if(rec != 0)return -1;
- if(NTC_HTTP_COLLECT_kaka_init(g_item.kafka_mode) == 0)return -1;
if(g_item.enable_filter == 1)http_url_discovery_load_filter(g_item.filter_file_path);
g_item.log_handle = MESA_create_runtime_log_handle(g_item.log_path, g_item.log_level);
@@ -462,14 +466,17 @@ extern "C" UCHAR NTC_HTTP_COLLECT_ENTRY(stSessionInfo *session_info, void **para
{
http_infor *http_info = NULL;
if(a_tcp->dir == DIR_S2C)
+ {
+ FS_operate(g_item.field_handle, g_item.fs_s2c_http_num_id,0, FS_OP_ADD, 1);
return PROT_STATE_DROPME;
+ }
if(a_tcp->dir == DIR_DOUBLE && g_item.enable_double == 0)
return PROT_STATE_DROPME;
if(session_info->prot_flag == HTTP_REFERER || session_info->prot_flag == HTTP_MESSAGE_URL)
{
if(session_info->prot_flag == HTTP_MESSAGE_URL && a_tcp->ptcpdetail->lostlen > 0 && g_item.enable_lostlen == 1)
{
- if(g_item.field_handle != NULL)FS_operate(g_item.field_handle, g_item.fs_drop_num_id,0, FS_OP_ADD, 1);
+ if(g_item.field_handle != NULL)FS_operate(g_item.field_handle, g_item.fs_lost_num_id,0, FS_OP_ADD, 1);
if(*param != NULL)
{
store_data_free(*param);
@@ -485,6 +492,9 @@ extern "C" UCHAR NTC_HTTP_COLLECT_ENTRY(stSessionInfo *session_info, void **para
p_hash_data->store_time = g_CurrentTime;
if(session_info->prot_flag == HTTP_MESSAGE_URL && session_info->buf != NULL && session_info->buflen > 0)
{
+ FS_operate(g_item.field_handle, g_item.fs_url_num_id,0, FS_OP_ADD, 1);
+ FS_operate(g_item.field_handle, g_item.fs_url_len_id,0, FS_OP_ADD, session_info->buflen);
+
if(a_tcp->dir == DIR_DOUBLE && g_item.field_handle != NULL)
FS_operate(g_item.field_handle, g_item.fs_double_url_num_id,0, FS_OP_ADD, 1);
if(a_tcp->dir == DIR_C2S && g_item.field_handle != NULL)
@@ -499,6 +509,9 @@ extern "C" UCHAR NTC_HTTP_COLLECT_ENTRY(stSessionInfo *session_info, void **para
p_hash_data->refer_buf = (char *)calloc(session_info->buflen+1, 1);
memcpy(p_hash_data->refer_buf, session_info->buf, session_info->buflen);
p_hash_data->refer_len = session_info->buflen;
+
+ FS_operate(g_item.field_handle, g_item.fs_refer_num_id,0, FS_OP_ADD, 1);
+ FS_operate(g_item.field_handle, g_item.fs_refer_len_id,0, FS_OP_ADD, session_info->buflen);
}
}
else if(session_info->prot_flag == HTTP_STATE)
@@ -544,14 +557,24 @@ extern "C" UCHAR NTC_HTTP_COLLECT_ENTRY(stSessionInfo *session_info, void **para
http_info = (http_infor *)session_info->app_info;
if(http_info->method != HTTP_METHOD_CONNECT)
{
- if(g_item.field_handle != NULL)FS_operate(g_item.field_handle, g_item.fs_err_num_id,0, FS_OP_ADD, 1);
+ //if(g_item.field_handle != NULL)FS_operate(g_item.field_handle, g_item.fs_err_num_id,0, FS_OP_ADD, 1);
int tcp_seq = 0;
int tcp_seq_len = sizeof(tcp_seq);
MESA_get_stream_opt(a_tcp, MSO_TCP_ISN_C2S, &tcp_seq, &tcp_seq_len);
http_infor *p_http = (http_infor *)(session_info->app_info);
char tuple_buf[MAX_BUF_LEN];
+ if(a_tcp->dir == DIR_C2S)
+ {
+ FS_operate(g_item.field_handle, g_item.fs_c2s_nourl_num_id,0, FS_OP_ADD, 1);
+ }
+ if(a_tcp->dir == DIR_DOUBLE)
+ {
+ FS_operate(g_item.field_handle, g_item.fs_double_nourl_num_id,0, FS_OP_ADD, 1);
+ }
+
+ FS_operate(g_item.field_handle, g_item.fs_refer_num_id,0, FS_OP_ADD, 1);
sprintf(tuple_buf, "%s#%d#0#%u", printaddr(&a_tcp->addr, a_tcp->threadnum), p_http->http_session_seq, (unsigned int)tcp_seq);
- MESA_handle_runtime_log(g_item.log_handle, RLOG_LV_FATAL, APP_NAME, "C2S or DOUBLE HTTP session without URL, dir = %d, tuple5 = %s", a_tcp->dir, tuple_buf);
+ MESA_handle_runtime_log(g_item.log_handle, RLOG_LV_FATAL, __FUNCTION__, "C2S or DOUBLE HTTP session without URL, dir = %d, tuple5 = %s", a_tcp->dir, tuple_buf);
}
}
@@ -561,7 +584,7 @@ extern "C" UCHAR NTC_HTTP_COLLECT_ENTRY(stSessionInfo *session_info, void **para
return PROT_STATE_GIVEME;
}
-void NTC_HTTP_COLLECT_DESTROY(void)
+extern "C" void NTC_HTTP_COLLECT_DESTROY(void)
{
MESA_destroy_runtime_log_handle(g_item.log_handle);
http_url_discovery_deinit_hash(g_iThreadNum);
diff --git a/src/ntc_http_collect.h b/src/ntc_http_collect.h
index 67f7858..8755e5e 100644
--- a/src/ntc_http_collect.h
+++ b/src/ntc_http_collect.h
@@ -28,6 +28,10 @@ typedef struct _g_http_url_discovery_item
int fs_single_url_num_id;
int fs_double_url_num_id;
+ int fs_s2c_http_num_id;
+ int fs_lost_num_id;
+ int fs_c2s_nourl_num_id;
+ int fs_double_nourl_num_id;
int fs_filter_num_id;
int fs_url_num_id;