diff options
| author | liuwentan <[email protected]> | 2024-01-17 09:58:10 +0000 |
|---|---|---|
| committer | liuwentan <[email protected]> | 2024-01-17 09:58:10 +0000 |
| commit | 8880b41cccd5853c3f242f2e62d5762d8b98f8b3 (patch) | |
| tree | 99daeeedf4b550b00bdca8df367d5a5e41f2f2c9 /src/http_decoder/http_decoder.c | |
| parent | eedb1ccec5c17f2472e610df214803621eee5998 (diff) | |
[HTTP_DECODER]bugfix for multi-thread
Diffstat (limited to 'src/http_decoder/http_decoder.c')
| -rw-r--r-- | src/http_decoder/http_decoder.c | 78 |
1 files changed, 34 insertions, 44 deletions
diff --git a/src/http_decoder/http_decoder.c b/src/http_decoder/http_decoder.c index c282a2a..523253b 100644 --- a/src/http_decoder/http_decoder.c +++ b/src/http_decoder/http_decoder.c @@ -48,12 +48,6 @@ struct http_decoder_exdata { struct http_decoder *decoder; }; -struct http_event_context { - int topic_id; - struct session *ref_session; - struct http_decoder_exdata *ref_exdata; -}; - struct http_decoder_context { int plugin_id; int topic_id; @@ -63,17 +57,15 @@ struct http_decoder_context { int fs_incoming_trans_id; struct fieldstat_easy *fse; struct stellar *st; - struct http_event_context *http_ev_ctx; }; static void http_event_handler(enum http_event event, struct http_decoder_half_data **data, - void *http_ev_ctx) + struct http_event_context *ev_ctx) { - struct http_event_context *ctx = (struct http_event_context *)http_ev_ctx; - assert(ctx); + assert(ev_ctx); - struct http_decoder_result_queue *queue = ctx->ref_exdata->queue; + struct http_decoder_result_queue *queue = ev_ctx->ref_queue; struct http_message *msg = NULL; struct http_decoder_half_data *half_data = NULL; int ret = 0; @@ -95,7 +87,7 @@ static void http_event_handler(enum http_event event, half_data = http_decoder_half_data_new(); ret = http_decoder_result_queue_push_req(queue, half_data); if (ret < 0) { - fprintf(stderr, "http_decoder_result_queue_push failed."); + fprintf(stderr, "http_decoder_result_queue_push req failed."); http_decoder_half_data_free(half_data); half_data = NULL; } @@ -105,13 +97,13 @@ static void http_event_handler(enum http_event event, msg = CALLOC(struct http_message, 1); msg->type = HTTP_MESSAGE_REQ_LINE; msg->data = *data; - session_mq_publish_message(ctx->ref_session, ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); break; case HTTP_EVENT_REQ_HDR_END: msg = CALLOC(struct http_message, 1); msg->type = HTTP_MESSAGE_REQ_HEADER; msg->data = *data; - session_mq_publish_message(ctx->ref_session, ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); break; case HTTP_EVENT_REQ_BODY_BEGIN: break; @@ -119,7 +111,7 @@ static void http_event_handler(enum http_event event, msg = CALLOC(struct http_message, 1); msg->type = HTTP_MESSAGE_REQ_BODY; msg->data = *data; - session_mq_publish_message(ctx->ref_session, ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); break; case HTTP_EVENT_REQ_BODY_END: break; @@ -147,7 +139,7 @@ static void http_event_handler(enum http_event event, half_data = http_decoder_half_data_new(); ret = http_decoder_result_queue_push_res(queue, half_data); if (ret < 0) { - fprintf(stderr, "http_decoder_result_queue_push failed."); + fprintf(stderr, "http_decoder_result_queue_push res failed."); http_decoder_half_data_free(half_data); half_data = NULL; } @@ -157,7 +149,7 @@ static void http_event_handler(enum http_event event, msg = CALLOC(struct http_message, 1); msg->type = HTTP_MESSAGE_RES_LINE; msg->data = *data; - session_mq_publish_message(ctx->ref_session, ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); break; case HTTP_EVENT_RES_HDR: break; @@ -165,7 +157,7 @@ static void http_event_handler(enum http_event event, msg = CALLOC(struct http_message, 1); msg->type = HTTP_MESSAGE_RES_HEADER; msg->data = *data; - session_mq_publish_message(ctx->ref_session, ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); break; case HTTP_EVENT_RES_BODY_BEGIN: break; @@ -173,7 +165,7 @@ static void http_event_handler(enum http_event event, msg = CALLOC(struct http_message, 1); msg->type = HTTP_MESSAGE_RES_BODY; msg->data = *data; - session_mq_publish_message(ctx->ref_session, ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); break; case HTTP_EVENT_RES_BODY_END: break; @@ -271,7 +263,7 @@ static int http_protocol_identify(const char *data, size_t data_len) int http_decoder_entry(struct session *sess, int events, const struct packet *pkt, void *cb_arg) { - struct http_decoder_context *ctx = (struct http_decoder_context *)cb_arg; + struct http_decoder_context *decoder_ctx = (struct http_decoder_context *)cb_arg; size_t payload_len = 0; uint64_t inner_flag = 0; @@ -280,21 +272,19 @@ int http_decoder_entry(struct session *sess, int events, return 0; } - struct http_decoder_exdata *ex_data = session_get_ex_data(sess, ctx->ex_data_idx); + struct http_decoder_exdata *ex_data = + session_get_ex_data(sess, decoder_ctx->ex_data_idx); if (events & SESS_EV_CLOSING) { if (ex_data != NULL) { http_decoder_exdata_free(ex_data); - session_set_ex_data(sess, ctx->ex_data_idx, NULL); + session_set_ex_data(sess, decoder_ctx->ex_data_idx, NULL); } return 0; } const char *payload = session_get0_current_payload(sess, &payload_len); - // const char *addr_readable = session_get0_readable_addr(sess); - // enum session_addr_type addr_type; - // struct session_addr *addr = session_get0_addr(sess, &addr_type); if (events & SESS_EV_OPENING) { assert(ex_data == NULL); @@ -309,15 +299,16 @@ int http_decoder_entry(struct session *sess, int events, if (ret < 0) { // ignore this session's event struct session_event *s_event = - session_get_intrinsic_event(sess, ctx->plugin_id); + session_get_intrinsic_event(sess, decoder_ctx->plugin_id); - session_event_assign(s_event, ctx->st, sess, 0, http_decoder_entry, ctx); + session_event_assign(s_event, decoder_ctx->st, sess, 0, + http_decoder_entry, decoder_ctx); return 0; } } ex_data = http_decoder_exdata_new(HD_RESULT_QUEUE_SIZE); - session_set_ex_data(sess, ctx->ex_data_idx, ex_data); + session_set_ex_data(sess, decoder_ctx->ex_data_idx, ex_data); } if (0 == payload_len || NULL == ex_data) { @@ -329,7 +320,6 @@ int http_decoder_entry(struct session *sess, int events, return -1; } - // printf("session:%u dir:%d readable:%s\n", addr->ipv4.saddr, dir, addr_readable); struct http_decoder_half *cur_half = NULL; if (dir == PACKET_DIRECTION_C2S) { cur_half = ex_data->decoder->c2s_half; @@ -337,19 +327,22 @@ int http_decoder_entry(struct session *sess, int events, cur_half = ex_data->decoder->s2c_half; } - ctx->http_ev_ctx->topic_id = ctx->topic_id; - ctx->http_ev_ctx->ref_exdata = ex_data; - ctx->http_ev_ctx->ref_session = sess; + http_decoder_half_reinit(cur_half, decoder_ctx->topic_id, ex_data->queue, sess); + http_decoder_half_parse(cur_half, payload, payload_len); - http_decoder_half_parse(cur_half, ctx->http_ev_ctx, payload, payload_len); + int thread_id = session_get_current_thread_id(sess); - long long trans_cnt = http_decoder_half_trans_count(cur_half); - - fieldstat_easy_counter_incrby(ctx->fse, 0, ctx->fs_incoming_bytes_id, + fieldstat_easy_counter_incrby(decoder_ctx->fse, thread_id, + decoder_ctx->fs_incoming_bytes_id, NULL, 0, payload_len); - fieldstat_easy_counter_incrby(ctx->fse, 0, ctx->fs_incoming_pkts_id, + + fieldstat_easy_counter_incrby(decoder_ctx->fse, thread_id, + decoder_ctx->fs_incoming_pkts_id, NULL, 0, 1); - fieldstat_easy_counter_incrby(ctx->fse, 0, ctx->fs_incoming_trans_id, + + long long trans_cnt = http_decoder_half_trans_count(cur_half); + fieldstat_easy_counter_incrby(decoder_ctx->fse, thread_id, + decoder_ctx->fs_incoming_trans_id, NULL, 0, trans_cnt); return 0; @@ -373,10 +366,6 @@ static void _http_decoder_context_free(struct http_decoder_context *ctx) return; } - if (ctx->http_ev_ctx != NULL) { - FREE(ctx->http_ev_ctx); - } - if (ctx->fse != NULL) { fieldstat_easy_free(ctx->fse); ctx->fse = NULL; @@ -406,7 +395,6 @@ void *http_decoder_init(struct stellar *st) { struct http_decoder_context *ctx = CALLOC(struct http_decoder_context, 1); - ctx->http_ev_ctx = CALLOC(struct http_event_context, 1); ctx->st = st; ctx->ex_data_idx = stellar_session_get_ex_new_index(st, "HTTP_DECODER", http_decoder_ex_data_free, @@ -425,7 +413,9 @@ void *http_decoder_init(struct stellar *st) } ctx->topic_id = topic_id; - ctx->fse = fieldstat_easy_new(1, "http_decoder_statistics", NULL, 0); + int thread_num = stellar_get_worker_thread_num(st); + + ctx->fse = fieldstat_easy_new(thread_num, "http_decoder_statistics", NULL, 0); if (NULL == ctx->fse) { fprintf(stderr, "fieldstat_easy_new failed."); goto failed; |
