diff options
| author | yangwei <[email protected]> | 2019-01-08 17:13:53 +0600 |
|---|---|---|
| committer | yangwei <[email protected]> | 2019-01-08 17:13:53 +0600 |
| commit | 2adf5c81f181e18fb996ecd973724ea8687c9c5d (patch) | |
| tree | 8968555a44d5f6c06a840871505f293aaebd940e | |
| parent | a4f1f69cd9be7f87db1aea8208a1010d04625a22 (diff) | |
同步线上代码
1、新增fs2输出项
2、调整默认配置文件读取路径
| -rw-r--r-- | src/ntc_http_collect.c | 83 | ||||
| -rw-r--r-- | src/ntc_http_collect.h | 4 |
2 files changed, 57 insertions, 30 deletions
diff --git a/src/ntc_http_collect.c b/src/ntc_http_collect.c index 743455e..2ac741b 100644 --- a/src/ntc_http_collect.c +++ b/src/ntc_http_collect.c @@ -37,8 +37,8 @@ #define LOCAL_IP "127.0.0.1" #define PLUGIN_NAME "NTC_HTTP_COLLECT" -#define PROFILE_PATH "./avconf/av.conf" -#define FILTER_PATH "./avconf/http_url_filter.conf" +#define PROFILE_PATH "./t1conf/main.conf" +#define FILTER_PATH "./t1conf/http_url_filter.conf" #ifndef MAX_BUF_LEN #define MAX_BUF_LEN 4096 @@ -51,8 +51,6 @@ #define MIN(a, b) (((a) < (b)) ? (a) : (b)) -#define APP_NAME "NTC_HTTP_COLLECT" - int NTC_HTTP_COLLECT_VERSION_20180910 = 1; @@ -118,13 +116,12 @@ static int append_http_query_key(struct streaminfo *a_tcp,int http_seq, char *ou } ret = MESA_get_stream_opt(a_tcp, MSO_TCP_ISN_C2S, &tcp_seq, &tcp_seq_len); - if(ret != 0) - return -1; - + if(ret != 0)return -1; + ret = MESA_get_stream_opt(a_tcp, MSO_STREAM_TUNNEL_TYPE, &tunnel_type, &tunnel_type_len); - if(ret != 0) - return -1; + if(ret != 0)return -1; + if(STREAM_TUNNLE_NON == tunnel_type) { const char * addr = printaddr(&a_tcp->addr, a_tcp->threadnum); @@ -192,15 +189,16 @@ static void feedback_url_to_kafka(streaminfo *a_tcp, int http_seq, store_hash_da { cJSON_AddStringToObject(value_node, "referer", store_hash_data->refer_buf); } - char *value = cJSON_Print(value_node); + cJSON_AddNumberToObject(value_node, "found_time", store_hash_data->store_time); + //char *value = cJSON_Print(value_node); + char *value = cJSON_PrintUnformatted(value_node); cJSON_AddStringToObject(feedback_node, "k", store_key); cJSON_AddStringToObject(feedback_node, "v", value); - cJSON_AddStringToObject(feedback_node, "v", value); - cJSON_AddNumberToObject(feedback_node, "found_time", store_hash_data->store_time); - char *feedback = cJSON_Print(feedback_node); + //char *feedback = cJSON_Print(feedback_node); + char *feedback = cJSON_PrintUnformatted(feedback_node); rd_kafka_produce(g_ntc_http_collect_kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, feedback, strlen(feedback), NULL, 0, NULL); @@ -335,10 +333,10 @@ static void NTC_HTTP_COLLECT_load_profile() { short fs_server_port = 0; char fs_server_ip[128] = ""; - MESA_load_profile_uint_def(g_item.profile_path, "moudle", "stat_interval", &g_item.stat_interval, 1); - MESA_load_profile_string_def(g_item.profile_path, "moudle", "stat_path", g_item.stat_path, sizeof(g_item.stat_path), "./avlog/url_discovery.stat"); - MESA_load_profile_string_nodef(g_item.profile_path,"moudle","stat_server_ip",fs_server_ip, sizeof(fs_server_ip)); - MESA_load_profile_short_nodef(g_item.profile_path, "moudle","stat_server_port", &(fs_server_port)); + MESA_load_profile_uint_def(g_item.profile_path, PLUGIN_NAME, "stat_interval", &g_item.stat_interval, 1); + MESA_load_profile_string_def(g_item.profile_path, PLUGIN_NAME, "stat_path", g_item.stat_path, sizeof(g_item.stat_path), "./avlog/url_discovery.stat"); + MESA_load_profile_string_nodef(g_item.profile_path,PLUGIN_NAME,"stat_server_ip",fs_server_ip, sizeof(fs_server_ip)); + MESA_load_profile_short_nodef(g_item.profile_path, PLUGIN_NAME,"stat_server_port", &(fs_server_port)); FILE *fp = fopen(g_item.stat_path, "w"); if(fp != NULL) @@ -346,6 +344,7 @@ static void NTC_HTTP_COLLECT_load_profile() fclose(fp); g_item.field_handle = FS_create_handle(); FS_set_para(g_item.field_handle, OUTPUT_DEVICE, (const void *)g_item.stat_path, strlen(g_item.stat_path)); + FS_set_para(g_item.field_handle, APP_NAME, "ntc_http", strlen("ntc_http")+1); int value = 2; FS_set_para(g_item.field_handle, PRINT_MODE, (const void *)&value, sizeof(int)); value = 1; @@ -368,12 +367,16 @@ static void NTC_HTTP_COLLECT_load_profile() g_item.fs_url_len_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "URL_LEN"); g_item.fs_refer_num_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "REFER_NUM"); g_item.fs_refer_len_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "REFER_LEN"); - g_item.fs_single_url_num_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "SINGLE_NUM"); - g_item.fs_double_url_num_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "DOUBLE_NUM"); - g_item.fs_filter_num_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "FILTER_NUM"); - - g_item.fs_err_num_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "ERR_NUM"); - g_item.fs_drop_num_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "DROP_NUM"); + g_item.fs_single_url_num_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "C2S_HTTP"); + g_item.fs_double_url_num_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "DOUBLE_HTTP"); + g_item.fs_s2c_http_num_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "S2C_HTTP"); + + g_item.fs_c2s_nourl_num_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "C2S_NOURL"); + g_item.fs_double_nourl_num_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "DOUBLE_NOURL"); + g_item.fs_lost_num_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "LOST"); + g_item.fs_err_num_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "ERR"); + g_item.fs_drop_num_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "DROP"); + g_item.fs_filter_num_id = FS_register(g_item.field_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "FILTER"); FS_start(g_item.field_handle); } } @@ -383,7 +386,7 @@ static void NTC_HTTP_COLLECT_load_profile() int NTC_HTTP_COLLECT_kaka_init(int kafka_mode) { char kafka_errstr[1024]; - MESA_load_profile_string_def(g_item.profile_path, PLUGIN_NAME, "topic", g_item.topic_buf, sizeof(g_item.topic_buf), "AIM"); + MESA_load_profile_string_def(g_item.profile_path, PLUGIN_NAME, "kafka_topic", g_item.topic_buf, sizeof(g_item.topic_buf), "AIM"); if((kafka_mode&INDIE_KAFKA) == INDIE_KAFKA) { if(0 > MESA_load_profile_string_nodef(PROFILE_PATH, PLUGIN_NAME, "kafka_brokelist", g_item.kafka_brokelist, sizeof(g_item.kafka_brokelist))) @@ -438,7 +441,7 @@ int NTC_HTTP_COLLECT_kaka_init(int kafka_mode) return 0; } -int NTC_HTTP_COLLECT_INIT(void) +extern "C" int NTC_HTTP_COLLECT_INIT(void) { int rec = 0; memset(&g_item, 0, sizeof(g_item)); @@ -446,8 +449,9 @@ int NTC_HTTP_COLLECT_INIT(void) rec = http_url_discovery_init_hash(g_iThreadNum); if(rec != 0) return -1; + rec = NTC_HTTP_COLLECT_kaka_init(g_item.kafka_mode); + if(rec != 0)return -1; - if(NTC_HTTP_COLLECT_kaka_init(g_item.kafka_mode) == 0)return -1; if(g_item.enable_filter == 1)http_url_discovery_load_filter(g_item.filter_file_path); g_item.log_handle = MESA_create_runtime_log_handle(g_item.log_path, g_item.log_level); @@ -462,14 +466,17 @@ extern "C" UCHAR NTC_HTTP_COLLECT_ENTRY(stSessionInfo *session_info, void **para { http_infor *http_info = NULL; if(a_tcp->dir == DIR_S2C) + { + FS_operate(g_item.field_handle, g_item.fs_s2c_http_num_id,0, FS_OP_ADD, 1); return PROT_STATE_DROPME; + } if(a_tcp->dir == DIR_DOUBLE && g_item.enable_double == 0) return PROT_STATE_DROPME; if(session_info->prot_flag == HTTP_REFERER || session_info->prot_flag == HTTP_MESSAGE_URL) { if(session_info->prot_flag == HTTP_MESSAGE_URL && a_tcp->ptcpdetail->lostlen > 0 && g_item.enable_lostlen == 1) { - if(g_item.field_handle != NULL)FS_operate(g_item.field_handle, g_item.fs_drop_num_id,0, FS_OP_ADD, 1); + if(g_item.field_handle != NULL)FS_operate(g_item.field_handle, g_item.fs_lost_num_id,0, FS_OP_ADD, 1); if(*param != NULL) { store_data_free(*param); @@ -485,6 +492,9 @@ extern "C" UCHAR NTC_HTTP_COLLECT_ENTRY(stSessionInfo *session_info, void **para p_hash_data->store_time = g_CurrentTime; if(session_info->prot_flag == HTTP_MESSAGE_URL && session_info->buf != NULL && session_info->buflen > 0) { + FS_operate(g_item.field_handle, g_item.fs_url_num_id,0, FS_OP_ADD, 1); + FS_operate(g_item.field_handle, g_item.fs_url_len_id,0, FS_OP_ADD, session_info->buflen); + if(a_tcp->dir == DIR_DOUBLE && g_item.field_handle != NULL) FS_operate(g_item.field_handle, g_item.fs_double_url_num_id,0, FS_OP_ADD, 1); if(a_tcp->dir == DIR_C2S && g_item.field_handle != NULL) @@ -499,6 +509,9 @@ extern "C" UCHAR NTC_HTTP_COLLECT_ENTRY(stSessionInfo *session_info, void **para p_hash_data->refer_buf = (char *)calloc(session_info->buflen+1, 1); memcpy(p_hash_data->refer_buf, session_info->buf, session_info->buflen); p_hash_data->refer_len = session_info->buflen; + + FS_operate(g_item.field_handle, g_item.fs_refer_num_id,0, FS_OP_ADD, 1); + FS_operate(g_item.field_handle, g_item.fs_refer_len_id,0, FS_OP_ADD, session_info->buflen); } } else if(session_info->prot_flag == HTTP_STATE) @@ -544,14 +557,24 @@ extern "C" UCHAR NTC_HTTP_COLLECT_ENTRY(stSessionInfo *session_info, void **para http_info = (http_infor *)session_info->app_info; if(http_info->method != HTTP_METHOD_CONNECT) { - if(g_item.field_handle != NULL)FS_operate(g_item.field_handle, g_item.fs_err_num_id,0, FS_OP_ADD, 1); + //if(g_item.field_handle != NULL)FS_operate(g_item.field_handle, g_item.fs_err_num_id,0, FS_OP_ADD, 1); int tcp_seq = 0; int tcp_seq_len = sizeof(tcp_seq); MESA_get_stream_opt(a_tcp, MSO_TCP_ISN_C2S, &tcp_seq, &tcp_seq_len); http_infor *p_http = (http_infor *)(session_info->app_info); char tuple_buf[MAX_BUF_LEN]; + if(a_tcp->dir == DIR_C2S) + { + FS_operate(g_item.field_handle, g_item.fs_c2s_nourl_num_id,0, FS_OP_ADD, 1); + } + if(a_tcp->dir == DIR_DOUBLE) + { + FS_operate(g_item.field_handle, g_item.fs_double_nourl_num_id,0, FS_OP_ADD, 1); + } + + FS_operate(g_item.field_handle, g_item.fs_refer_num_id,0, FS_OP_ADD, 1); sprintf(tuple_buf, "%s#%d#0#%u", printaddr(&a_tcp->addr, a_tcp->threadnum), p_http->http_session_seq, (unsigned int)tcp_seq); - MESA_handle_runtime_log(g_item.log_handle, RLOG_LV_FATAL, APP_NAME, "C2S or DOUBLE HTTP session without URL, dir = %d, tuple5 = %s", a_tcp->dir, tuple_buf); + MESA_handle_runtime_log(g_item.log_handle, RLOG_LV_FATAL, __FUNCTION__, "C2S or DOUBLE HTTP session without URL, dir = %d, tuple5 = %s", a_tcp->dir, tuple_buf); } } @@ -561,7 +584,7 @@ extern "C" UCHAR NTC_HTTP_COLLECT_ENTRY(stSessionInfo *session_info, void **para return PROT_STATE_GIVEME; } -void NTC_HTTP_COLLECT_DESTROY(void) +extern "C" void NTC_HTTP_COLLECT_DESTROY(void) { MESA_destroy_runtime_log_handle(g_item.log_handle); http_url_discovery_deinit_hash(g_iThreadNum); diff --git a/src/ntc_http_collect.h b/src/ntc_http_collect.h index 67f7858..8755e5e 100644 --- a/src/ntc_http_collect.h +++ b/src/ntc_http_collect.h @@ -28,6 +28,10 @@ typedef struct _g_http_url_discovery_item int fs_single_url_num_id; int fs_double_url_num_id; + int fs_s2c_http_num_id; + int fs_lost_num_id; + int fs_c2s_nourl_num_id; + int fs_double_nourl_num_id; int fs_filter_num_id; int fs_url_num_id; |
