diff options
Diffstat (limited to 'src/http_decoder.cpp')
| -rw-r--r-- | src/http_decoder.cpp | 155 |
1 files changed, 107 insertions, 48 deletions
diff --git a/src/http_decoder.cpp b/src/http_decoder.cpp index 750a739..8aa39e9 100644 --- a/src/http_decoder.cpp +++ b/src/http_decoder.cpp @@ -1,10 +1,9 @@ #include <assert.h> #include <stdio.h> +#include <string.h> #include <unistd.h> #include "http_decoder_inc.h" -__thread struct http_decoder_stat _th_stat; - struct http_message *http_message_new(enum http_message_type type, struct http_decoder_result_queue *queue, int queue_index, uint8_t flow_type) { @@ -25,10 +24,11 @@ static void http_message_free(struct session *sess, void *http_msg, void *cb_arg } static void http_event_handler(enum http_event event, struct http_decoder_half_data **data, - struct http_event_context *ev_ctx) + struct http_event_context *ev_ctx, void *httpd_plugin_env) { assert(ev_ctx); - + assert(httpd_plugin_env); + struct http_decoder_env *httpd_env = (struct http_decoder_env *)httpd_plugin_env; size_t queue_idx = 0; nmx_pool_t *mempool = ev_ctx->ref_mempool; struct http_decoder_result_queue *queue = ev_ctx->ref_queue; @@ -37,10 +37,14 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d int ret = 0; u_int8_t flow_flag = 0; + int thread_id = stellar_get_current_thread_id(httpd_env->st); + if(http_event_is_req(event)){ queue_idx = http_decoder_result_queue_req_index(queue); + half_data = http_decoder_result_queue_peek_req(queue); }else{ queue_idx = http_decoder_result_queue_res_index(queue); + half_data = http_decoder_result_queue_peek_res(queue); } switch (event) @@ -70,10 +74,18 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d } *data = half_data; queue_idx = http_decoder_result_queue_req_index(queue); //get the index after inc +#if 1 /* llhttp always call on_message_begin() even if llhttp_execute() error!!! */ msg = http_message_new(HTTP_TRANSACTION_NEW, queue, queue_idx, HTTP_REQUEST); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_NEW, 1); +#endif break; case HTTP_EVENT_REQ_LINE: +#if 0 + msg = http_message_new(HTTP_TRANSACTION_NEW, queue, queue_idx, HTTP_REQUEST); + session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_NEW, 1); +#endif msg = http_message_new(HTTP_MESSAGE_REQ_LINE, queue, queue_idx, HTTP_REQUEST); session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); break; @@ -92,7 +104,14 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d } http_half_data_update_commit_index(half_data); msg = http_message_new(HTTP_MESSAGE_REQ_HEADER_END, queue, queue_idx, HTTP_REQUEST); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + + int tot_c2s_headers = http_half_data_get_total_parsed_header_count(half_data); + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_HEADERS_C2S, tot_c2s_headers); + + struct hstring tmp_url = {}; + http_half_data_get_url(half_data, &tmp_url); + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_URL_BYTES, tmp_url.str_len); } break; case HTTP_EVENT_REQ_BODY_BEGIN: @@ -108,12 +127,14 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d break; case HTTP_EVENT_REQ_END: { - if(0 == session_is_symmetric(ev_ctx->ref_session, &flow_flag)){ - if(SESSION_SEEN_C2S_FLOW == flow_flag){ - msg = http_message_new(HTTP_TRANSACTION_FREE, queue, queue_idx, HTTP_REQUEST); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); - } + session_is_symmetric(ev_ctx->ref_session, &flow_flag); + if(SESSION_SEEN_C2S_FLOW == flow_flag){ + msg = http_message_new(HTTP_TRANSACTION_FREE, queue, queue_idx, HTTP_REQUEST); + session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_FREE, 1); + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_ASYMMETRY_TRANSACTION_C2S, 1); } + http_half_update_state(half_data, event); http_decoder_result_queue_inc_req_index(queue); half_data = http_decoder_result_queue_pop_req(queue); if (half_data != NULL) @@ -161,10 +182,11 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); break; case HTTP_EVENT_RES_HDR: - msg = http_message_new(HTTP_MESSAGE_RES_HEADER, queue, queue_idx, HTTP_REQUEST); + msg = http_message_new(HTTP_MESSAGE_RES_HEADER, queue, queue_idx, HTTP_RESPONSE); session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); break; case HTTP_EVENT_RES_HDR_END: + { /* maybe some header in table buffer but has not pushed to plugins */ half_data = http_decoder_result_queue_peek_res(queue); if(http_decoder_half_data_has_parsed_header(half_data)){ @@ -174,6 +196,10 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d http_half_data_update_commit_index(half_data); msg = http_message_new(HTTP_MESSAGE_RES_HEADER_END, queue, queue_idx, HTTP_RESPONSE); session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + + int tot_s2c_headers = http_half_data_get_total_parsed_header_count(half_data); + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_HEADERS_S2C, tot_s2c_headers); + } break; case HTTP_EVENT_RES_BODY_BEGIN: break; @@ -188,6 +214,12 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d case HTTP_EVENT_RES_END: msg = http_message_new(HTTP_TRANSACTION_FREE, queue, queue_idx, HTTP_RESPONSE); session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_FREE, 1); + session_is_symmetric(ev_ctx->ref_session, &flow_flag); + if(SESSION_SEEN_S2C_FLOW == flow_flag){ + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_ASYMMETRY_TRANSACTION_S2C, 1); + } + http_half_update_state(half_data, event); http_decoder_result_queue_inc_res_index(queue); half_data = http_decoder_result_queue_pop_res(queue); if (half_data != NULL) @@ -200,14 +232,17 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d assert(0); break; } + if(half_data){ + http_half_update_state(half_data, event); + } } -static struct http_decoder *http_decoder_new(nmx_pool_t *mempool, http_event_cb *ev_cb, int decompress_switch) +static struct http_decoder *http_decoder_new(nmx_pool_t *mempool, http_event_cb *ev_cb, int decompress_switch, struct http_decoder_env *httpd_env) { struct http_decoder *decoder = MEMPOOL_CALLOC(mempool, struct http_decoder, 1); assert(decoder); - decoder->c2s_half = http_decoder_half_new(mempool, ev_cb, HTTP_REQUEST, decompress_switch); - decoder->s2c_half = http_decoder_half_new(mempool, ev_cb, HTTP_RESPONSE, decompress_switch); + decoder->c2s_half = http_decoder_half_new(mempool, ev_cb, HTTP_REQUEST, decompress_switch, httpd_env); + decoder->s2c_half = http_decoder_half_new(mempool, ev_cb, HTTP_RESPONSE, decompress_switch, httpd_env); return decoder; } @@ -230,11 +265,12 @@ static void http_decoder_free(nmx_pool_t *mempool, struct http_decoder *decoder) MEMPOOL_FREE(mempool, decoder); } -static struct http_decoder_exdata *http_decoder_exdata_new(size_t mempool_size, size_t queue_size,int decompress_switch) +static struct http_decoder_exdata *http_decoder_exdata_new(size_t mempool_size, size_t queue_size, + int decompress_switch, struct http_decoder_env *httpd_env) { struct http_decoder_exdata *ex_data = CALLOC(struct http_decoder_exdata, 1); ex_data->mempool = nmx_create_pool(mempool_size); - ex_data->decoder = http_decoder_new(ex_data->mempool, http_event_handler, decompress_switch); + ex_data->decoder = http_decoder_new(ex_data->mempool, http_event_handler, decompress_switch, httpd_env); ex_data->queue = http_decoder_result_queue_new(ex_data->mempool, queue_size); return ex_data; } @@ -277,26 +313,27 @@ static int http_protocol_identify(const char *data, size_t data_len) return 0; } -static void _http_decoder_context_free(struct http_decoder_context *ctx) +static void _http_decoder_context_free(struct http_decoder_env *env) { - if (NULL == ctx) + if (NULL == env) { return; } - - if (ctx->fse != NULL) + if (env->hd_stat.fse != NULL) { - fieldstat_easy_free(ctx->fse); - ctx->fse = NULL; + fieldstat_easy_free(env->hd_stat.fse); + env->hd_stat.fse = NULL; } - if (ctx->httpd_msg_topic_id >= 0) + http_decoder_stat_free(&env->hd_stat); + + if (env->httpd_msg_topic_id >= 0) { - stellar_session_mq_destroy_topic(ctx->st, ctx->httpd_msg_topic_id); - ctx->httpd_msg_topic_id = -1; + stellar_session_mq_destroy_topic(env->st, env->httpd_msg_topic_id); + env->httpd_msg_topic_id = -1; } - FREE(ctx); + FREE(env); } static int load_http_decoder_config(const char *cfg_path, @@ -444,8 +481,7 @@ extern "C" { #endif - void _httpd_ex_data_free_cb(struct session *s, int idx, - void *ex_data, void *arg) + void _httpd_ex_data_free_cb(struct session *s, int idx, void *ex_data, void *arg) { if (NULL == ex_data) { @@ -459,40 +495,63 @@ extern "C" { // If not http, ignore this session size_t payload_len; - const struct http_decoder_context *httpd_env = (struct http_decoder_context *)plugin_env; + struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env; const char *payload = session_get0_current_payload(sess, &payload_len); if (payload != NULL && payload_len > 0) { - size_t http_identify_len = payload_len > HTTP_IDENTIFY_LEN - ? HTTP_IDENTIFY_LEN - : payload_len; - + size_t http_identify_len = payload_len > HTTP_IDENTIFY_LEN ? HTTP_IDENTIFY_LEN : payload_len; int ret = http_protocol_identify(payload, http_identify_len); if (ret < 0) { stellar_session_plugin_dettach_current_session(sess); - return NULL; + return (void *)"not_http_session"; } } struct http_decoder_exdata *ex_data = http_decoder_exdata_new(httpd_env->hd_cfg.mempool_size, httpd_env->hd_cfg.result_queue_len, - httpd_env->hd_cfg.decompress_switch); + httpd_env->hd_cfg.decompress_switch, + httpd_env); session_exdata_set(sess, httpd_env->ex_data_idx, ex_data); + int thread_id = stellar_get_current_thread_id(httpd_env->st); + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_SESSION_NEW, 1); + return (void *)"fake_http_decoder_ctx"; // http decoder not use ctx, use exdata only! } void _httpd_session_ctx_free_cb(struct session *sess, void *session_ctx, void *plugin_env) { - // done in _httpd_ex_data_free_cb() + if(NULL == plugin_env || NULL == session_ctx){ + return; + } + if(strncmp((const char *)session_ctx, "not_http_session", strlen("not_http_session")) == 0){ + return; + } + struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env; + int thread_id = stellar_get_current_thread_id(httpd_env->st); + unsigned char flow_flag = 0; + session_is_symmetric(sess, &flow_flag); + + if(SESSION_SEEN_C2S_FLOW == flow_flag){ + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_ASYMMETRY_SESSION_C2S, 1); + }else if(SESSION_SEEN_S2C_FLOW == flow_flag){ + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_ASYMMETRY_SESSION_S2C, 1); + }else{ + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_SESSION_FREE, 1); + } } void http_decoder_tcp_stream_msg_cb(struct session *sess, int topic_id, const void *msg, void *no_use_ctx, void *plugin_env) { - struct http_decoder_context *httpd_env = (struct http_decoder_context *)plugin_env; + struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env; struct http_decoder_exdata *ex_data = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->ex_data_idx); size_t payload_len; + if(SESSION_STATE_CLOSING == session_get_current_state(sess)){ + http_half_pre_context_free(sess, httpd_env, ex_data); + return; + } + const char *payload = session_get0_current_payload(sess, &payload_len); if (unlikely(0 == payload_len || NULL == ex_data)) { @@ -505,10 +564,14 @@ extern "C" if (PACKET_DIRECTION_C2S == packet_get_direction(session_get0_current_packet(sess))) { cur_half = ex_data->decoder->c2s_half; + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_BYTES_C2S, payload_len); + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TCP_SEG_C2S, 1); } else { cur_half = ex_data->decoder->s2c_half; + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_BYTES_S2C, payload_len); + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TCP_SEG_S2C, 1); } http_decoder_half_reinit(cur_half, httpd_env->httpd_msg_topic_id, ex_data->queue, @@ -516,14 +579,9 @@ extern "C" int ret = http_decoder_half_parse(cur_half, payload, payload_len); if (ret < 0) { - _th_stat.err_pkts += 1; + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_PARSE_ERR, 1); + stellar_session_plugin_dettach_current_session(sess); } - _th_stat.incoming_bytes += payload_len; - _th_stat.incoming_pkts += 1; - _th_stat.incoming_trans += http_decoder_half_trans_count(cur_half); - _th_stat.counter++; - - http_decoder_stat_output(httpd_env, thread_id); return; } @@ -532,7 +590,7 @@ extern "C" int httpd_msg_topic_id = -1, tcp_stream_topic_id = -1; int thread_num = 0; - struct http_decoder_context *httpd_env = CALLOC(struct http_decoder_context, 1); + struct http_decoder_env *httpd_env = CALLOC(struct http_decoder_env, 1); int ret = load_http_decoder_config(HTTPD_CFG_FILE, &httpd_env->hd_cfg); if (ret < 0) { @@ -563,7 +621,8 @@ extern "C" thread_num = stellar_get_worker_thread_num(st); assert(thread_num >= 1); - if (http_decoder_stat_init(httpd_env, thread_num) < 0) + if (http_decoder_stat_init( &httpd_env->hd_stat, thread_num, + httpd_env->hd_cfg.stat_interval_pkts, httpd_env->hd_cfg.stat_output_interval) < 0) { goto failed; } @@ -584,7 +643,7 @@ extern "C" { return; } - struct http_decoder_context *httpd_env = (struct http_decoder_context *)plugin_env; + struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env; _http_decoder_context_free(httpd_env); } |
