diff options
Diffstat (limited to 'src/http_decoder.cpp')
| -rw-r--r-- | src/http_decoder.cpp | 762 |
1 files changed, 762 insertions, 0 deletions
diff --git a/src/http_decoder.cpp b/src/http_decoder.cpp new file mode 100644 index 0000000..f156105 --- /dev/null +++ b/src/http_decoder.cpp @@ -0,0 +1,762 @@ +/* +********************************************************************************************** +* File: http_decoder.c +* Description: +* Authors: Liu WenTan <[email protected]> +* Date: 2024-01-10 +* Copyright: (c) Since 2022 Geedge Networks, Ltd. All rights reserved. +*********************************************************************************************** +*/ +#include <assert.h> +#include <stdio.h> +#include <unistd.h> +#include "http_decoder_inc.h" + +__thread struct http_decoder_stat _th_stat; + +struct http_message *http_message_new(enum http_message_type type, + struct http_decoder_result_queue *queue, + int queue_index) +{ + struct http_message *msg = CALLOC(struct http_message, 1); + msg->type = type; + msg->ref_queue = queue; + msg->queue_index = queue_index; + return msg; +} + +static void http_message_free(void *http_msg, void *cb_arg) +{ + if (http_msg) + { + FREE(http_msg); + } +} + +static void http_event_handler(enum http_event event, + struct http_decoder_half_data **data, + struct http_event_context *ev_ctx) +{ + assert(ev_ctx); + + size_t queue_idx = 0; + nmx_pool_t *mempool = ev_ctx->ref_mempool; + struct http_decoder_result_queue *queue = ev_ctx->ref_queue; + struct http_message *msg = NULL; + struct http_decoder_half_data *half_data = NULL; + int ret = 0; + + switch (event) + { + case HTTP_EVENT_REQ_INIT: + half_data = http_decoder_result_queue_peek_req(queue); + if (half_data != NULL) + { + http_decoder_result_queue_inc_req_index(queue); + } + + half_data = http_decoder_result_queue_peek_req(queue); + if (half_data != NULL) + { + half_data = http_decoder_result_queue_pop_req(queue); + http_decoder_half_data_free(mempool, half_data); + half_data = NULL; + } + + half_data = http_decoder_half_data_new(mempool); + ret = http_decoder_result_queue_push_req(queue, half_data); + if (ret < 0) + { + fprintf(stderr, "http_decoder_result_queue_push req failed."); + http_decoder_half_data_free(mempool, half_data); + half_data = NULL; + } + *data = half_data; + break; + case HTTP_EVENT_REQ_LINE: + queue_idx = http_decoder_result_queue_req_index(queue); + msg = http_message_new(HTTP_MESSAGE_REQ_LINE, queue, queue_idx); + session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + break; + case HTTP_EVENT_REQ_HDR_END: + { + int build_url_final = http_decoder_join_url_finally(ev_ctx, http_decoder_result_queue_peek_req(queue), mempool); + ret = http_decoder_half_data_has_parsed_header(*data); + if (0 == ret && 0 == build_url_final) + { + break; + } + queue_idx = http_decoder_result_queue_req_index(queue); + msg = http_message_new(HTTP_MESSAGE_REQ_HEADER, queue, queue_idx); + session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + } + break; + case HTTP_EVENT_REQ_BODY_BEGIN: + break; + case HTTP_EVENT_REQ_BODY_DATA: + queue_idx = http_decoder_result_queue_req_index(queue); + msg = http_message_new(HTTP_MESSAGE_REQ_BODY, queue, queue_idx); + session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + break; + case HTTP_EVENT_REQ_BODY_END: + break; + case HTTP_EVENT_REQ_END: + http_decoder_result_queue_inc_req_index(queue); + half_data = http_decoder_result_queue_pop_req(queue); + if (half_data != NULL) + { + http_decoder_half_data_free(mempool, half_data); + half_data = NULL; + } + break; + case HTTP_EVENT_RES_INIT: + half_data = http_decoder_result_queue_peek_res(queue); + if (half_data != NULL) + { + http_decoder_result_queue_inc_res_index(queue); + } + + half_data = http_decoder_result_queue_peek_res(queue); + if (half_data != NULL) + { + half_data = http_decoder_result_queue_pop_res(queue); + http_decoder_half_data_free(mempool, half_data); + half_data = NULL; + } + + half_data = http_decoder_half_data_new(mempool); + ret = http_decoder_result_queue_push_res(queue, half_data); + if (ret < 0) + { + fprintf(stderr, "http_decoder_result_queue_push res failed."); + http_decoder_half_data_free(mempool, half_data); + half_data = NULL; + } + *data = half_data; + break; + case HTTP_EVENT_RES_LINE: + queue_idx = http_decoder_result_queue_res_index(queue); + msg = http_message_new(HTTP_MESSAGE_RES_LINE, queue, queue_idx); + session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + break; + case HTTP_EVENT_RES_HDR: + break; + case HTTP_EVENT_RES_HDR_END: + ret = http_decoder_half_data_has_parsed_header(*data); + if (0 == ret) + { + break; + } + + queue_idx = http_decoder_result_queue_res_index(queue); + msg = http_message_new(HTTP_MESSAGE_RES_HEADER, queue, queue_idx); + session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + break; + case HTTP_EVENT_RES_BODY_BEGIN: + break; + case HTTP_EVENT_RES_BODY_DATA: + queue_idx = http_decoder_result_queue_res_index(queue); + msg = http_message_new(HTTP_MESSAGE_RES_BODY, queue, queue_idx); + session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + break; + case HTTP_EVENT_RES_BODY_END: + break; + case HTTP_EVENT_RES_END: + http_decoder_result_queue_inc_res_index(queue); + half_data = http_decoder_result_queue_pop_res(queue); + if (half_data != NULL) + { + http_decoder_half_data_free(mempool, half_data); + half_data = NULL; + } + break; + default: + assert(0); + break; + } +} + +static struct http_decoder * +http_decoder_new(nmx_pool_t *mempool, http_event_cb *ev_cb, + int decompress_switch) +{ + struct http_decoder *decoder = MEMPOOL_CALLOC(mempool, struct http_decoder, 1); + assert(decoder); + + decoder->c2s_half = http_decoder_half_new(mempool, ev_cb, HTTP_REQUEST, + decompress_switch); + decoder->s2c_half = http_decoder_half_new(mempool, ev_cb, HTTP_RESPONSE, + decompress_switch); + return decoder; +} + +static void +http_decoder_free(nmx_pool_t *mempool, struct http_decoder *decoder) +{ + if (NULL == decoder) + { + return; + } + if (decoder->c2s_half != NULL) + { + http_decoder_half_free(mempool, decoder->c2s_half); + decoder->c2s_half = NULL; + } + if (decoder->s2c_half != NULL) + { + http_decoder_half_free(mempool, decoder->s2c_half); + decoder->s2c_half = NULL; + } + MEMPOOL_FREE(mempool, decoder); +} + +static struct http_decoder_exdata * +http_decoder_exdata_new(size_t mempool_size, size_t queue_size, + int decompress_switch) +{ + struct http_decoder_exdata *ex_data = CALLOC(struct http_decoder_exdata, 1); + ex_data->mempool = nmx_create_pool(mempool_size); + ex_data->decoder = http_decoder_new(ex_data->mempool, http_event_handler, + decompress_switch); + ex_data->queue = http_decoder_result_queue_new(ex_data->mempool, queue_size); + + return ex_data; +} + +static void http_decoder_exdata_free(struct http_decoder_exdata *ex_data) +{ + if (unlikely(NULL == ex_data)) + { + return; + } + if (ex_data->decoder != NULL) + { + http_decoder_free(ex_data->mempool, ex_data->decoder); + ex_data->decoder = NULL; + } + if (ex_data->queue != NULL) + { + http_decoder_result_queue_free(ex_data->mempool, ex_data->queue); + ex_data->queue = NULL; + } + + nmx_destroy_pool(ex_data->mempool); + FREE(ex_data); +} + +static int http_protocol_identify(const char *data, size_t data_len) +{ + llhttp_t parser; + llhttp_settings_t settings; + enum llhttp_errno error; + + llhttp_settings_init(&settings); + llhttp_init(&parser, HTTP_BOTH, &settings); + + error = llhttp_execute(&parser, data, data_len); + if (error != HPE_OK) + { + return -1; + } + return 0; +} + +static void _http_decoder_context_free(struct http_decoder_context *ctx) +{ + if (NULL == ctx) + { + return; + } + + if (ctx->fse != NULL) + { + fieldstat_easy_free(ctx->fse); + ctx->fse = NULL; + } + + if (ctx->httpd_msg_topic_id >= 0) + { + stellar_session_mq_destroy_topic(ctx->st, ctx->httpd_msg_topic_id); + ctx->httpd_msg_topic_id = -1; + } + + FREE(ctx); +} + +static int load_http_decoder_config(const char *cfg_path, + struct http_decoder_config *hd_cfg) +{ + FILE *fp = fopen(cfg_path, "r"); + if (NULL == fp) + { + fprintf(stderr, "[%s:%d]Can't open config file:%s", + __FUNCTION__, __LINE__, cfg_path); + return -1; + } + + int ret = 0; + char errbuf[256] = {0}; + + toml_table_t *root = toml_parse_file(fp, errbuf, sizeof(errbuf)); + fclose(fp); + + toml_table_t *basic_sec_tbl = toml_table_in(root, "basic"); + if (NULL == basic_sec_tbl) + { + fprintf(stderr, "[%s:%d]config file:%s has no key: [basic]", + __FUNCTION__, __LINE__, cfg_path); + toml_free(root); + return -1; + } + + toml_datum_t int_val = toml_int_in(basic_sec_tbl, "decompress"); + if (int_val.ok != 0) + { + hd_cfg->decompress_switch = int_val.u.b; + } + + int_val = toml_int_in(basic_sec_tbl, "mempool_size"); + if (int_val.ok != 0) + { + hd_cfg->mempool_size = int_val.u.i; + } + else + { + hd_cfg->mempool_size = DEFAULT_MEMPOOL_SIZE; + } + + int_val = toml_int_in(basic_sec_tbl, "result_queue_len"); + if (int_val.ok != 0) + { + hd_cfg->result_queue_len = int_val.u.i; + } + else + { + hd_cfg->result_queue_len = HD_RESULT_QUEUE_LEN; + } + + int_val = toml_int_in(basic_sec_tbl, "stat_interval_pkts"); + if (int_val.ok != 0) + { + hd_cfg->stat_interval_pkts = int_val.u.i; + } + else + { + hd_cfg->stat_interval_pkts = DEFAULT_STAT_INTERVAL_PKTS; + } + + int_val = toml_int_in(basic_sec_tbl, "stat_output_interval"); + if (int_val.ok != 0) + { + hd_cfg->stat_output_interval = int_val.u.i; + } + else + { + hd_cfg->stat_output_interval = DEFAULT_STAT_OUTPUT_INTERVAL; + } + + toml_free(root); + return ret; +} +#ifdef __cplusplus +extern "C" +{ +#endif + + void _httpd_ex_data_free_cb(struct session *s, int idx, + void *ex_data, void *arg) + { + if (NULL == ex_data) + { + return; + } + struct http_decoder_exdata *exdata = (struct http_decoder_exdata *)ex_data; + http_decoder_exdata_free(exdata); + } + + void *_httpd_session_ctx_new_cb(struct session *sess, void *plugin_env) + { + // If not http, ignore this session + size_t payload_len; + const struct http_decoder_context *httpd_env = (struct http_decoder_context *)plugin_env; + const char *payload = session_get0_current_payload(sess, &payload_len); + if (payload != NULL && payload_len > 0) + { + size_t http_identify_len = payload_len > HTTP_IDENTIFY_LEN + ? HTTP_IDENTIFY_LEN + : payload_len; + + int ret = http_protocol_identify(payload, http_identify_len); + if (ret < 0) + { + stellar_session_plugin_dettach_current_session(sess); + return NULL; + } + } + + struct http_decoder_exdata *ex_data = http_decoder_exdata_new(httpd_env->hd_cfg.mempool_size, + httpd_env->hd_cfg.result_queue_len, + httpd_env->hd_cfg.decompress_switch); + session_exdata_set(sess, httpd_env->ex_data_idx, ex_data); + return (void *)"fake_http_decoder_ctx"; // http decoder not use ctx, use exdata only! + } + + void _httpd_session_ctx_free_cb(struct session *sess, void *session_ctx, void *plugin_env) + { + // done in _httpd_ex_data_free_cb() + } + + void http_decoder_tcp_stream_msg_cb(struct session *sess, int topic_id, const void *msg, void *no_use_ctx, void *plugin_env) + { + struct http_decoder_context *httpd_env = (struct http_decoder_context *)plugin_env; + struct http_decoder_exdata *ex_data = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->ex_data_idx); + size_t payload_len; + + const char *payload = session_get0_current_payload(sess, &payload_len); + if (unlikely(0 == payload_len || NULL == ex_data)) + { + return; + } + + int thread_id = stellar_get_current_thread_id(httpd_env->st); + struct http_decoder_half *cur_half = NULL; + + if (PACKET_DIRECTION_C2S == packet_get_direction(session_get0_current_packet(sess))) + { + cur_half = ex_data->decoder->c2s_half; + } + else + { + cur_half = ex_data->decoder->s2c_half; + } + + http_decoder_half_reinit(cur_half, httpd_env->httpd_msg_topic_id, ex_data->queue, + ex_data->mempool, sess); + int ret = http_decoder_half_parse(cur_half, payload, payload_len); + if (ret < 0) + { + _th_stat.err_pkts += 1; + } + _th_stat.incoming_bytes += payload_len; + _th_stat.incoming_pkts += 1; + _th_stat.incoming_trans += http_decoder_half_trans_count(cur_half); + _th_stat.counter++; + + http_decoder_stat_output(httpd_env, thread_id); + return; + } + + void *http_decoder_init(struct stellar *st) + { + int httpd_msg_topic_id = -1, tcp_stream_topic_id = -1; + int thread_num = 0; + + struct http_decoder_context *httpd_env = CALLOC(struct http_decoder_context, 1); + int ret = load_http_decoder_config(HTTPD_CFG_FILE, &httpd_env->hd_cfg); + if (ret < 0) + { + goto failed; + } + httpd_env->st = st; + httpd_env->ex_data_idx = stellar_session_exdata_new_index(st, "HTTP_DECODER", + _httpd_ex_data_free_cb, + NULL); + httpd_env->plugin_id = stellar_session_plugin_register(st, _httpd_session_ctx_new_cb, + _httpd_session_ctx_free_cb, (void *)httpd_env); + if (httpd_env->plugin_id < 0) + { + goto failed; + } + + httpd_msg_topic_id = stellar_session_mq_get_topic_id(st, HTTP_DECODER_TOPIC); + if (httpd_msg_topic_id < 0) + { + httpd_msg_topic_id = stellar_session_mq_create_topic(st, HTTP_DECODER_TOPIC, + http_message_free, NULL); + } + httpd_env->httpd_msg_topic_id = httpd_msg_topic_id; + + tcp_stream_topic_id = stellar_session_mq_get_topic_id(st, TOPIC_TCP_STREAM); + assert(tcp_stream_topic_id >= 0); + stellar_session_mq_subscribe(st, tcp_stream_topic_id, http_decoder_tcp_stream_msg_cb, httpd_env->plugin_id); + + thread_num = stellar_get_worker_thread_num(st); + assert(thread_num >= 1); + if (http_decoder_stat_init(httpd_env, thread_num) < 0) + { + goto failed; + } + + printf("http_decoder_init succ: ex_data_idx:%d, plugin_id:%d, topic_id:%d\n", + httpd_env->ex_data_idx, httpd_env->plugin_id, httpd_env->httpd_msg_topic_id); + return httpd_env; + + failed: + fprintf(stderr, "http_decoder_init fail!\n"); + _http_decoder_context_free(httpd_env); + return NULL; + } + + void http_decoder_exit(void *plugin_env) + { + if (NULL == plugin_env) + { + return; + } + struct http_decoder_context *httpd_env = (struct http_decoder_context *)plugin_env; + _http_decoder_context_free(httpd_env); + } + + enum http_message_type http_message_type(struct http_message *msg) + { + if (unlikely(NULL == msg)) + { + return HTTP_MESSAGE_MAX; + } + return msg->type; + } + + int http_message_get_request_line(struct http_message *msg, + struct http_request_line *line) + { + if (unlikely(NULL == msg || msg->type != HTTP_MESSAGE_REQ_LINE || NULL == line)) + { + return -1; + } + assert(msg->ref_queue); + assert(msg->queue_index < HD_RESULT_QUEUE_LEN); + + struct http_decoder_half_data *req_data = + msg->ref_queue->array[msg->queue_index].req_data; + + return http_decoder_half_data_get_request_line(req_data, line); + } + + int http_message_get_response_line(struct http_message *msg, + struct http_response_line *line) + { + if (unlikely(NULL == msg || msg->type != HTTP_MESSAGE_RES_LINE || NULL == line)) + { + return -1; + } + assert(msg->ref_queue); + assert(msg->queue_index < HD_RESULT_QUEUE_LEN); + + struct http_decoder_half_data *res_data = + msg->ref_queue->array[msg->queue_index].res_data; + + return http_decoder_half_data_get_response_line(res_data, line); + } + + static int http_msg_get_request_header(struct http_message *msg, struct hstring *key, + struct http_header *hdr_result) + { + struct http_decoder_half_data *req_data = + msg->ref_queue->array[msg->queue_index].req_data; + return http_decoder_half_data_get_header(req_data, key, hdr_result); + } + + static int http_msg_get_response_header(struct http_message *msg, struct hstring *key, + struct http_header *hdr_result) + { + struct http_decoder_half_data *res_data = + msg->ref_queue->array[msg->queue_index].res_data; + return http_decoder_half_data_get_header(res_data, key, hdr_result); + } + + int http_message_get_header(struct http_message *msg, struct hstring *key, + struct http_header *hdr_result) + { + if (unlikely(NULL == msg || NULL == key || NULL == hdr_result)) + { + return -1; + } + assert(msg->ref_queue); + assert(msg->queue_index < HD_RESULT_QUEUE_LEN); + if (HTTP_MESSAGE_REQ_HEADER == msg->type) + { + return http_msg_get_request_header(msg, key, hdr_result); + } + else if (HTTP_MESSAGE_RES_HEADER == msg->type) + { + return http_msg_get_response_header(msg, key, hdr_result); + } + + return -1; + } + + static int http_msg_request_header_next(struct http_message *msg, + struct http_header *hdr) + { + struct http_decoder_half_data *req_data = + msg->ref_queue->array[msg->queue_index].req_data; + return http_decoder_half_data_iter_header(req_data, hdr); + } + + static int http_msg_response_header_next(struct http_message *msg, + struct http_header *hdr) + { + struct http_decoder_half_data *res_data = + msg->ref_queue->array[msg->queue_index].res_data; + return http_decoder_half_data_iter_header(res_data, hdr); + } + + int http_message_header_next(struct http_message *msg, + struct http_header *header) + { + if (unlikely(NULL == msg || NULL == header)) + { + return -1; + } + assert(msg->ref_queue); + assert(msg->queue_index < HD_RESULT_QUEUE_LEN); + if (HTTP_MESSAGE_REQ_HEADER == msg->type) + { + return http_msg_request_header_next(msg, header); + } + else if (HTTP_MESSAGE_RES_HEADER == msg->type) + { + return http_msg_response_header_next(msg, header); + } + + return -1; + } + + int http_message_reset_header_iter(struct http_message *msg) + { + if (unlikely(NULL == msg)) + { + return -1; + } + assert(msg->ref_queue); + assert(msg->queue_index < HD_RESULT_QUEUE_LEN); + if (HTTP_MESSAGE_REQ_HEADER == msg->type) + { + struct http_decoder_half_data *req_data = + msg->ref_queue->array[msg->queue_index].req_data; + return http_decoder_half_data_reset_header_iter(req_data); + } + else if (HTTP_MESSAGE_RES_HEADER == msg->type) + { + struct http_decoder_half_data *res_data = + msg->ref_queue->array[msg->queue_index].res_data; + return http_decoder_half_data_reset_header_iter(res_data); + } + + return -1; + } + + static int http_msg_get_request_raw_body(struct http_message *msg, + struct hstring *body) + { + struct http_decoder_half_data *req_data = + msg->ref_queue->array[msg->queue_index].req_data; + return http_decoder_half_data_get_raw_body(req_data, body); + } + + static int http_msg_get_response_raw_body(struct http_message *msg, + struct hstring *body) + { + struct http_decoder_half_data *res_data = + msg->ref_queue->array[msg->queue_index].res_data; + return http_decoder_half_data_get_raw_body(res_data, body); + } + + int http_message_get_raw_body(struct http_message *msg, + struct hstring *body) + { + if (unlikely(NULL == msg || NULL == body)) + { + return -1; + } + assert(msg->ref_queue); + assert(msg->queue_index < HD_RESULT_QUEUE_LEN); + + if (HTTP_MESSAGE_REQ_BODY == msg->type) + { + return http_msg_get_request_raw_body(msg, body); + } + else if (HTTP_MESSAGE_RES_BODY == msg->type) + { + return http_msg_get_response_raw_body(msg, body); + } + + return -1; + } + + int http_msg_get_request_decompress_body(struct http_message *msg, + struct hstring *body) + { + struct http_decoder_half_data *req_data = + msg->ref_queue->array[msg->queue_index].req_data; + return http_decoder_half_data_get_decompress_body(req_data, body); + } + + int http_msg_get_response_decompress_body(struct http_message *msg, + struct hstring *body) + { + struct http_decoder_half_data *res_data = + msg->ref_queue->array[msg->queue_index].res_data; + return http_decoder_half_data_get_decompress_body(res_data, body); + } + + int http_message_get_decompress_body(struct http_message *msg, + struct hstring *body) + { + if (unlikely(NULL == msg || NULL == body)) + { + return -1; + } + assert(msg->ref_queue); + assert(msg->queue_index < HD_RESULT_QUEUE_LEN); + + if (HTTP_MESSAGE_REQ_BODY == msg->type) + { + return http_msg_get_request_decompress_body(msg, body); + } + else if (HTTP_MESSAGE_RES_BODY == msg->type) + { + return http_msg_get_response_decompress_body(msg, body); + } + + return -1; + } + + int http_message_get_url(struct http_message *msg, struct hstring *url) + { + if (unlikely(NULL == msg || NULL == url)) + { + return -1; + } + + assert(msg->ref_queue); + assert(msg->queue_index < HD_RESULT_QUEUE_LEN); + + struct http_decoder_half_data *req_data = + msg->ref_queue->array[msg->queue_index].req_data; + + return http_half_data_get_url(req_data, url); + } + + int http_message_get_transaction_seq(struct http_message *msg) + { + if (unlikely(NULL == msg)) + { + return -1; + } + assert(msg->ref_queue); + assert(msg->queue_index < HD_RESULT_QUEUE_LEN); + struct http_decoder_half_data *hf_data; + if (HTTP_MESSAGE_REQ_LINE == msg->type || HTTP_MESSAGE_REQ_HEADER == msg->type || HTTP_MESSAGE_REQ_BODY == msg->type) + { + hf_data = msg->ref_queue->array[msg->queue_index].req_data; + } + else + { + hf_data = msg->ref_queue->array[msg->queue_index].res_data; + } + return http_half_data_get_transaction_seq(hf_data); + } +#ifdef __cplusplus +} +#endif
\ No newline at end of file |
