summaryrefslogtreecommitdiff
path: root/src/http_decoder.cpp
diff options
context:
space:
mode:
authorlijia <[email protected]>2024-06-06 11:16:22 +0800
committerlijia <[email protected]>2024-06-06 11:16:22 +0800
commit7d6170a23027aff0ebf2e7832dc11e4bbdce57ea (patch)
treeaf9caf24c274b561bc92f07f8db7c14b89c5e425 /src/http_decoder.cpp
parent1c232f0176c43c93c3e787ac83f3573bf42c58f1 (diff)
add fieldstat4 statistics, push transaction free msg in session closing state.v2.0.3
Diffstat (limited to 'src/http_decoder.cpp')
-rw-r--r--src/http_decoder.cpp155
1 files changed, 107 insertions, 48 deletions
diff --git a/src/http_decoder.cpp b/src/http_decoder.cpp
index 750a739..8aa39e9 100644
--- a/src/http_decoder.cpp
+++ b/src/http_decoder.cpp
@@ -1,10 +1,9 @@
#include <assert.h>
#include <stdio.h>
+#include <string.h>
#include <unistd.h>
#include "http_decoder_inc.h"
-__thread struct http_decoder_stat _th_stat;
-
struct http_message *http_message_new(enum http_message_type type, struct http_decoder_result_queue *queue,
int queue_index, uint8_t flow_type)
{
@@ -25,10 +24,11 @@ static void http_message_free(struct session *sess, void *http_msg, void *cb_arg
}
static void http_event_handler(enum http_event event, struct http_decoder_half_data **data,
- struct http_event_context *ev_ctx)
+ struct http_event_context *ev_ctx, void *httpd_plugin_env)
{
assert(ev_ctx);
-
+ assert(httpd_plugin_env);
+ struct http_decoder_env *httpd_env = (struct http_decoder_env *)httpd_plugin_env;
size_t queue_idx = 0;
nmx_pool_t *mempool = ev_ctx->ref_mempool;
struct http_decoder_result_queue *queue = ev_ctx->ref_queue;
@@ -37,10 +37,14 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d
int ret = 0;
u_int8_t flow_flag = 0;
+ int thread_id = stellar_get_current_thread_id(httpd_env->st);
+
if(http_event_is_req(event)){
queue_idx = http_decoder_result_queue_req_index(queue);
+ half_data = http_decoder_result_queue_peek_req(queue);
}else{
queue_idx = http_decoder_result_queue_res_index(queue);
+ half_data = http_decoder_result_queue_peek_res(queue);
}
switch (event)
@@ -70,10 +74,18 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d
}
*data = half_data;
queue_idx = http_decoder_result_queue_req_index(queue); //get the index after inc
+#if 1 /* llhttp always call on_message_begin() even if llhttp_execute() error!!! */
msg = http_message_new(HTTP_TRANSACTION_NEW, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_NEW, 1);
+#endif
break;
case HTTP_EVENT_REQ_LINE:
+#if 0
+ msg = http_message_new(HTTP_TRANSACTION_NEW, queue, queue_idx, HTTP_REQUEST);
+ session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_NEW, 1);
+#endif
msg = http_message_new(HTTP_MESSAGE_REQ_LINE, queue, queue_idx, HTTP_REQUEST);
session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
break;
@@ -92,7 +104,14 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d
}
http_half_data_update_commit_index(half_data);
msg = http_message_new(HTTP_MESSAGE_REQ_HEADER_END, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+
+ int tot_c2s_headers = http_half_data_get_total_parsed_header_count(half_data);
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_HEADERS_C2S, tot_c2s_headers);
+
+ struct hstring tmp_url = {};
+ http_half_data_get_url(half_data, &tmp_url);
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_URL_BYTES, tmp_url.str_len);
}
break;
case HTTP_EVENT_REQ_BODY_BEGIN:
@@ -108,12 +127,14 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d
break;
case HTTP_EVENT_REQ_END:
{
- if(0 == session_is_symmetric(ev_ctx->ref_session, &flow_flag)){
- if(SESSION_SEEN_C2S_FLOW == flow_flag){
- msg = http_message_new(HTTP_TRANSACTION_FREE, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
- }
+ session_is_symmetric(ev_ctx->ref_session, &flow_flag);
+ if(SESSION_SEEN_C2S_FLOW == flow_flag){
+ msg = http_message_new(HTTP_TRANSACTION_FREE, queue, queue_idx, HTTP_REQUEST);
+ session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_FREE, 1);
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_ASYMMETRY_TRANSACTION_C2S, 1);
}
+ http_half_update_state(half_data, event);
http_decoder_result_queue_inc_req_index(queue);
half_data = http_decoder_result_queue_pop_req(queue);
if (half_data != NULL)
@@ -161,10 +182,11 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d
session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
break;
case HTTP_EVENT_RES_HDR:
- msg = http_message_new(HTTP_MESSAGE_RES_HEADER, queue, queue_idx, HTTP_REQUEST);
+ msg = http_message_new(HTTP_MESSAGE_RES_HEADER, queue, queue_idx, HTTP_RESPONSE);
session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
break;
case HTTP_EVENT_RES_HDR_END:
+ {
/* maybe some header in table buffer but has not pushed to plugins */
half_data = http_decoder_result_queue_peek_res(queue);
if(http_decoder_half_data_has_parsed_header(half_data)){
@@ -174,6 +196,10 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d
http_half_data_update_commit_index(half_data);
msg = http_message_new(HTTP_MESSAGE_RES_HEADER_END, queue, queue_idx, HTTP_RESPONSE);
session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+
+ int tot_s2c_headers = http_half_data_get_total_parsed_header_count(half_data);
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_HEADERS_S2C, tot_s2c_headers);
+ }
break;
case HTTP_EVENT_RES_BODY_BEGIN:
break;
@@ -188,6 +214,12 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d
case HTTP_EVENT_RES_END:
msg = http_message_new(HTTP_TRANSACTION_FREE, queue, queue_idx, HTTP_RESPONSE);
session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_FREE, 1);
+ session_is_symmetric(ev_ctx->ref_session, &flow_flag);
+ if(SESSION_SEEN_S2C_FLOW == flow_flag){
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_ASYMMETRY_TRANSACTION_S2C, 1);
+ }
+ http_half_update_state(half_data, event);
http_decoder_result_queue_inc_res_index(queue);
half_data = http_decoder_result_queue_pop_res(queue);
if (half_data != NULL)
@@ -200,14 +232,17 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d
assert(0);
break;
}
+ if(half_data){
+ http_half_update_state(half_data, event);
+ }
}
-static struct http_decoder *http_decoder_new(nmx_pool_t *mempool, http_event_cb *ev_cb, int decompress_switch)
+static struct http_decoder *http_decoder_new(nmx_pool_t *mempool, http_event_cb *ev_cb, int decompress_switch, struct http_decoder_env *httpd_env)
{
struct http_decoder *decoder = MEMPOOL_CALLOC(mempool, struct http_decoder, 1);
assert(decoder);
- decoder->c2s_half = http_decoder_half_new(mempool, ev_cb, HTTP_REQUEST, decompress_switch);
- decoder->s2c_half = http_decoder_half_new(mempool, ev_cb, HTTP_RESPONSE, decompress_switch);
+ decoder->c2s_half = http_decoder_half_new(mempool, ev_cb, HTTP_REQUEST, decompress_switch, httpd_env);
+ decoder->s2c_half = http_decoder_half_new(mempool, ev_cb, HTTP_RESPONSE, decompress_switch, httpd_env);
return decoder;
}
@@ -230,11 +265,12 @@ static void http_decoder_free(nmx_pool_t *mempool, struct http_decoder *decoder)
MEMPOOL_FREE(mempool, decoder);
}
-static struct http_decoder_exdata *http_decoder_exdata_new(size_t mempool_size, size_t queue_size,int decompress_switch)
+static struct http_decoder_exdata *http_decoder_exdata_new(size_t mempool_size, size_t queue_size,
+ int decompress_switch, struct http_decoder_env *httpd_env)
{
struct http_decoder_exdata *ex_data = CALLOC(struct http_decoder_exdata, 1);
ex_data->mempool = nmx_create_pool(mempool_size);
- ex_data->decoder = http_decoder_new(ex_data->mempool, http_event_handler, decompress_switch);
+ ex_data->decoder = http_decoder_new(ex_data->mempool, http_event_handler, decompress_switch, httpd_env);
ex_data->queue = http_decoder_result_queue_new(ex_data->mempool, queue_size);
return ex_data;
}
@@ -277,26 +313,27 @@ static int http_protocol_identify(const char *data, size_t data_len)
return 0;
}
-static void _http_decoder_context_free(struct http_decoder_context *ctx)
+static void _http_decoder_context_free(struct http_decoder_env *env)
{
- if (NULL == ctx)
+ if (NULL == env)
{
return;
}
-
- if (ctx->fse != NULL)
+ if (env->hd_stat.fse != NULL)
{
- fieldstat_easy_free(ctx->fse);
- ctx->fse = NULL;
+ fieldstat_easy_free(env->hd_stat.fse);
+ env->hd_stat.fse = NULL;
}
- if (ctx->httpd_msg_topic_id >= 0)
+ http_decoder_stat_free(&env->hd_stat);
+
+ if (env->httpd_msg_topic_id >= 0)
{
- stellar_session_mq_destroy_topic(ctx->st, ctx->httpd_msg_topic_id);
- ctx->httpd_msg_topic_id = -1;
+ stellar_session_mq_destroy_topic(env->st, env->httpd_msg_topic_id);
+ env->httpd_msg_topic_id = -1;
}
- FREE(ctx);
+ FREE(env);
}
static int load_http_decoder_config(const char *cfg_path,
@@ -444,8 +481,7 @@ extern "C"
{
#endif
- void _httpd_ex_data_free_cb(struct session *s, int idx,
- void *ex_data, void *arg)
+ void _httpd_ex_data_free_cb(struct session *s, int idx, void *ex_data, void *arg)
{
if (NULL == ex_data)
{
@@ -459,40 +495,63 @@ extern "C"
{
// If not http, ignore this session
size_t payload_len;
- const struct http_decoder_context *httpd_env = (struct http_decoder_context *)plugin_env;
+ struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env;
const char *payload = session_get0_current_payload(sess, &payload_len);
if (payload != NULL && payload_len > 0)
{
- size_t http_identify_len = payload_len > HTTP_IDENTIFY_LEN
- ? HTTP_IDENTIFY_LEN
- : payload_len;
-
+ size_t http_identify_len = payload_len > HTTP_IDENTIFY_LEN ? HTTP_IDENTIFY_LEN : payload_len;
int ret = http_protocol_identify(payload, http_identify_len);
if (ret < 0)
{
stellar_session_plugin_dettach_current_session(sess);
- return NULL;
+ return (void *)"not_http_session";
}
}
struct http_decoder_exdata *ex_data = http_decoder_exdata_new(httpd_env->hd_cfg.mempool_size,
httpd_env->hd_cfg.result_queue_len,
- httpd_env->hd_cfg.decompress_switch);
+ httpd_env->hd_cfg.decompress_switch,
+ httpd_env);
session_exdata_set(sess, httpd_env->ex_data_idx, ex_data);
+ int thread_id = stellar_get_current_thread_id(httpd_env->st);
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_SESSION_NEW, 1);
+
return (void *)"fake_http_decoder_ctx"; // http decoder not use ctx, use exdata only!
}
void _httpd_session_ctx_free_cb(struct session *sess, void *session_ctx, void *plugin_env)
{
- // done in _httpd_ex_data_free_cb()
+ if(NULL == plugin_env || NULL == session_ctx){
+ return;
+ }
+ if(strncmp((const char *)session_ctx, "not_http_session", strlen("not_http_session")) == 0){
+ return;
+ }
+ struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env;
+ int thread_id = stellar_get_current_thread_id(httpd_env->st);
+ unsigned char flow_flag = 0;
+ session_is_symmetric(sess, &flow_flag);
+
+ if(SESSION_SEEN_C2S_FLOW == flow_flag){
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_ASYMMETRY_SESSION_C2S, 1);
+ }else if(SESSION_SEEN_S2C_FLOW == flow_flag){
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_ASYMMETRY_SESSION_S2C, 1);
+ }else{
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_SESSION_FREE, 1);
+ }
}
void http_decoder_tcp_stream_msg_cb(struct session *sess, int topic_id, const void *msg, void *no_use_ctx, void *plugin_env)
{
- struct http_decoder_context *httpd_env = (struct http_decoder_context *)plugin_env;
+ struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env;
struct http_decoder_exdata *ex_data = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->ex_data_idx);
size_t payload_len;
+ if(SESSION_STATE_CLOSING == session_get_current_state(sess)){
+ http_half_pre_context_free(sess, httpd_env, ex_data);
+ return;
+ }
+
const char *payload = session_get0_current_payload(sess, &payload_len);
if (unlikely(0 == payload_len || NULL == ex_data))
{
@@ -505,10 +564,14 @@ extern "C"
if (PACKET_DIRECTION_C2S == packet_get_direction(session_get0_current_packet(sess)))
{
cur_half = ex_data->decoder->c2s_half;
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_BYTES_C2S, payload_len);
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TCP_SEG_C2S, 1);
}
else
{
cur_half = ex_data->decoder->s2c_half;
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_BYTES_S2C, payload_len);
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TCP_SEG_S2C, 1);
}
http_decoder_half_reinit(cur_half, httpd_env->httpd_msg_topic_id, ex_data->queue,
@@ -516,14 +579,9 @@ extern "C"
int ret = http_decoder_half_parse(cur_half, payload, payload_len);
if (ret < 0)
{
- _th_stat.err_pkts += 1;
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_PARSE_ERR, 1);
+ stellar_session_plugin_dettach_current_session(sess);
}
- _th_stat.incoming_bytes += payload_len;
- _th_stat.incoming_pkts += 1;
- _th_stat.incoming_trans += http_decoder_half_trans_count(cur_half);
- _th_stat.counter++;
-
- http_decoder_stat_output(httpd_env, thread_id);
return;
}
@@ -532,7 +590,7 @@ extern "C"
int httpd_msg_topic_id = -1, tcp_stream_topic_id = -1;
int thread_num = 0;
- struct http_decoder_context *httpd_env = CALLOC(struct http_decoder_context, 1);
+ struct http_decoder_env *httpd_env = CALLOC(struct http_decoder_env, 1);
int ret = load_http_decoder_config(HTTPD_CFG_FILE, &httpd_env->hd_cfg);
if (ret < 0)
{
@@ -563,7 +621,8 @@ extern "C"
thread_num = stellar_get_worker_thread_num(st);
assert(thread_num >= 1);
- if (http_decoder_stat_init(httpd_env, thread_num) < 0)
+ if (http_decoder_stat_init( &httpd_env->hd_stat, thread_num,
+ httpd_env->hd_cfg.stat_interval_pkts, httpd_env->hd_cfg.stat_output_interval) < 0)
{
goto failed;
}
@@ -584,7 +643,7 @@ extern "C"
{
return;
}
- struct http_decoder_context *httpd_env = (struct http_decoder_context *)plugin_env;
+ struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env;
_http_decoder_context_free(httpd_env);
}