diff options
Diffstat (limited to 'decoders')
25 files changed, 2295 insertions, 4494 deletions
diff --git a/decoders/CMakeLists.txt b/decoders/CMakeLists.txt index 7946822..eaba19e 100644 --- a/decoders/CMakeLists.txt +++ b/decoders/CMakeLists.txt @@ -1,5 +1,5 @@ add_subdirectory(lpi_plus) -#add_subdirectory(http) +add_subdirectory(http) #add_subdirectory(socks) #add_subdirectory(stratum) #add_subdirectory(session_flags)
\ No newline at end of file diff --git a/decoders/http/CMakeLists.txt b/decoders/http/CMakeLists.txt index c242afe..3a2b637 100644 --- a/decoders/http/CMakeLists.txt +++ b/decoders/http/CMakeLists.txt @@ -1,15 +1,15 @@ +add_definitions(-fPIC) include_directories(${CMAKE_SOURCE_DIR}/deps) -set(HTTP_SRC http_decoder.c http_decoder_utils.c http_decoder_half.c - http_decoder_table.c http_decoder_string.c http_content_decompress.c - http_decoder_result_queue.c http_decoder_stat.c http_decoder_tunnel.c) +set(HTTP_SRC http_decoder_module.c http_decoder.c + http_decoder_llhttp_wrap.c + http_decoder_utils.c + http_decoder_decompress.c + http_decoder_half.c + http_decoder_stat.c ) -add_library(http STATIC ${HTTP_SRC}) -add_library(http_dyn SHARED ${HTTP_SRC}) +add_library(http ${HTTP_SRC}) set_target_properties(http PROPERTIES LINK_FLAGS "-Wl,--version-script=${CMAKE_CURRENT_SOURCE_DIR}/version.map") target_include_directories(http PUBLIC ${CMAKE_SOURCE_DIR}/deps/) -target_link_libraries(http z llhttp-static fieldstat4 brotli-dec-static brotli-common-static nmx_pool toml) -set_target_properties(http PROPERTIES PREFIX "") - -set_target_properties(http_dyn PROPERTIES PREFIX "") -target_link_libraries(http_dyn z llhttp-static fieldstat4 brotli-dec-static brotli-common-static nmx_pool toml)
\ No newline at end of file +target_link_libraries(http z llhttp-static fieldstat4 brotli-dec-static brotli-common-static toml) +set_target_properties(http PROPERTIES PREFIX "")
\ No newline at end of file diff --git a/decoders/http/http_content_decompress.c b/decoders/http/http_content_decompress.c deleted file mode 100644 index 922c624..0000000 --- a/decoders/http/http_content_decompress.c +++ /dev/null @@ -1,268 +0,0 @@ -#include <zlib.h> -#include <string.h> -#include <assert.h> -#include <brotli/decode.h> -#include "http_decoder_private.h" - -#define HTTP_DECOMPRESS_BUFFER_SIZE (4096) - -struct http_content_decompress -{ - enum http_content_encoding encoding; - z_stream *z_stream_ptr; - BrotliDecoderState *br_state; - char *buffer; - size_t buffer_size; -}; - -void http_content_decompress_ownership_borrow(struct http_content_decompress *decompress) -{ - decompress->buffer = NULL; // ownership move to data->decompress_buffer_list, will be freed when message has been processed by all plugins -} - -enum http_content_encoding http_content_encoding_str2int(const char *content_encoding, size_t encoding_str_len) -{ - if (http_strncasecmp_safe("gzip", content_encoding, 4, encoding_str_len) == 0) - { - return HTTP_CONTENT_ENCODING_GZIP; - } - if (http_strncasecmp_safe("deflate", content_encoding, 7, encoding_str_len) == 0) - { - return HTTP_CONTENT_ENCODING_DEFLATE; - } - if (http_strncasecmp_safe("br", content_encoding, 2, encoding_str_len) == 0) - { - return HTTP_CONTENT_ENCODING_BR; - } - return HTTP_CONTENT_ENCODING_NONE; -} - -const char *http_content_encoding_int2str(enum http_content_encoding content_encoding) -{ - if (content_encoding == HTTP_CONTENT_ENCODING_GZIP) - { - return "gzip"; - } - if (content_encoding == HTTP_CONTENT_ENCODING_DEFLATE) - { - return "deflate"; - } - if (content_encoding == HTTP_CONTENT_ENCODING_BR) - { - return "br"; - } - return "unknown"; -} - -struct http_content_decompress *http_content_decompress_create(enum http_content_encoding encoding) -{ - struct http_content_decompress *decompress = - CALLOC(struct http_content_decompress, 1); - assert(decompress); - - decompress->encoding = encoding; - decompress->z_stream_ptr = NULL; - decompress->br_state = NULL; - - if (encoding == HTTP_CONTENT_ENCODING_GZIP || encoding == HTTP_CONTENT_ENCODING_DEFLATE) - { - decompress->z_stream_ptr = CALLOC(z_stream, 1); - assert(decompress->z_stream_ptr); - - decompress->z_stream_ptr->zalloc = NULL; - decompress->z_stream_ptr->zfree = NULL; - decompress->z_stream_ptr->opaque = NULL; - decompress->z_stream_ptr->avail_in = 0; - decompress->z_stream_ptr->next_in = Z_NULL; - - if (encoding == HTTP_CONTENT_ENCODING_GZIP) - { - if (inflateInit2(decompress->z_stream_ptr, MAX_WBITS + 16) != Z_OK) - { - goto error; - } - } - if (encoding == HTTP_CONTENT_ENCODING_DEFLATE) - { - if (inflateInit2(decompress->z_stream_ptr, -MAX_WBITS) != Z_OK) - { - goto error; - } - } - } - - if (encoding == HTTP_CONTENT_ENCODING_BR) - { - decompress->br_state = BrotliDecoderCreateInstance(NULL, NULL, NULL); - if (decompress->br_state == NULL) - { - goto error; - } - } - return decompress; - -error: - http_content_decompress_destroy(decompress); - return NULL; -} - -void http_content_decompress_destroy(struct http_content_decompress *decompress) -{ - if (NULL == decompress) - { - return; - } - if (decompress->z_stream_ptr != NULL) - { - inflateEnd(decompress->z_stream_ptr); - FREE(decompress->z_stream_ptr); - } - if (decompress->br_state) - { - BrotliDecoderDestroyInstance(decompress->br_state); - decompress->br_state = NULL; - } - FREE(decompress->buffer); - FREE(decompress); -} - -static int http_content_decompress_write_zlib(struct http_content_decompress *decompress, - const char *indata, size_t indata_len, - char **outdata, size_t *outdata_len) -{ - z_stream *z_stream_ptr = decompress->z_stream_ptr; - z_stream_ptr->avail_in = (unsigned int)indata_len; - z_stream_ptr->next_in = (unsigned char *)indata; - z_stream_ptr->avail_out = (unsigned int)HTTP_DECOMPRESS_BUFFER_SIZE; - z_stream_ptr->next_out = (unsigned char *)decompress->buffer; - *outdata = NULL; - *outdata_len = 0; - size_t total_have = 0; - int no_buffer; - - do - { - int ret = inflate(z_stream_ptr, Z_NO_FLUSH); - if (ret == Z_STREAM_ERROR || ret == Z_NEED_DICT || ret == Z_DATA_ERROR || ret == Z_MEM_ERROR) - { - (void)inflateEnd(z_stream_ptr); - return -1; - } - size_t have = HTTP_DECOMPRESS_BUFFER_SIZE - z_stream_ptr->avail_out; - if (have > 0) - { - total_have += have; - if (0 == z_stream_ptr->avail_out) - { - decompress->buffer_size += HTTP_DECOMPRESS_BUFFER_SIZE; - decompress->buffer = REALLOC(char, decompress->buffer, decompress->buffer_size); - z_stream_ptr->avail_out = HTTP_DECOMPRESS_BUFFER_SIZE; - z_stream_ptr->next_out = (unsigned char *)decompress->buffer + total_have; - *outdata = decompress->buffer; - *outdata_len = total_have; - no_buffer = 1; - } - else - { - *outdata = decompress->buffer; - *outdata_len = total_have; - no_buffer = 0; - } - } - else - { - break; - } - if (Z_STREAM_END == ret) - { - break; - } - } while (no_buffer == 1); - return 0; -} - -static int http_content_decompress_write_br(struct http_content_decompress *decompress, - const char *indata, size_t indata_len, - char **outdata, size_t *outdata_len) -{ - size_t available_in = indata_len; - const unsigned char *next_in = (const unsigned char *)indata; - size_t available_out = HTTP_DECOMPRESS_BUFFER_SIZE; - unsigned char *next_out = (unsigned char *)decompress->buffer; - - *outdata = NULL; - *outdata_len = 0; - size_t total_have = 0; - int no_buffer; - - do - { - int ret = BrotliDecoderDecompressStream(decompress->br_state, &available_in, - &next_in, &available_out, &next_out, NULL); - if (ret == BROTLI_DECODER_RESULT_ERROR) - { - // BrotliDecoderErrorCode errcode = BrotliDecoderGetErrorCode(decompress->br_state); - *outdata = NULL; - *outdata_len = 0; - return -1; - } - size_t have = HTTP_DECOMPRESS_BUFFER_SIZE - available_out; - if (have > 0) - { - total_have += have; - if (0 == available_out) - { - decompress->buffer_size += HTTP_DECOMPRESS_BUFFER_SIZE; - decompress->buffer = REALLOC(char, decompress->buffer, decompress->buffer_size); - available_out = HTTP_DECOMPRESS_BUFFER_SIZE; - next_out = (unsigned char *)decompress->buffer + total_have; - *outdata = decompress->buffer; - *outdata_len = total_have; - no_buffer = 1; - } - else - { - *outdata = decompress->buffer; - *outdata_len = have; - no_buffer = 0; - } - } - else - { - break; - } - } while (no_buffer == 1); - return 0; -} - -int http_content_decompress_write(struct http_content_decompress *decompress, - const char *indata, size_t indata_len, - char **outdata, size_t *outdata_len) -{ - assert(decompress); - assert(indata); - assert(indata_len > 0); - assert(outdata); - assert(outdata_len); - *outdata = NULL; - *outdata_len = 0; - - if (NULL == decompress->buffer) - { - decompress->buffer = CALLOC(char, HTTP_DECOMPRESS_BUFFER_SIZE); - assert(decompress->buffer); - decompress->buffer_size = HTTP_DECOMPRESS_BUFFER_SIZE; - } - - if (decompress->encoding == HTTP_CONTENT_ENCODING_GZIP || - decompress->encoding == HTTP_CONTENT_ENCODING_DEFLATE) - { - return http_content_decompress_write_zlib(decompress, indata, indata_len, outdata, outdata_len); - } - if (decompress->encoding == HTTP_CONTENT_ENCODING_BR) - { - return http_content_decompress_write_br(decompress, indata, indata_len, outdata, outdata_len); - } - assert(0); - return -1; -}
\ No newline at end of file diff --git a/decoders/http/http_decoder.c b/decoders/http/http_decoder.c index 10f8369..39406a3 100644 --- a/decoders/http/http_decoder.c +++ b/decoders/http/http_decoder.c @@ -1,1205 +1,184 @@ -#include "stellar/http.h" #include <assert.h> #include <stdio.h> #include <string.h> #include <unistd.h> -#include "http_decoder_private.h" - -#pragma GCC diagnostic ignored "-Wunused-parameter" - -struct http_message *http_message_new(enum http_message_type type, struct http_decoder_result_queue *queue, - int queue_index, uint8_t flow_type) -{ - struct http_message *msg = CALLOC(struct http_message, 1); - msg->type = type; - msg->ref_queue = queue; - msg->queue_index = queue_index; - msg->flow_type = flow_type; - return msg; -} - -struct http_message *http_body_message_new(enum http_message_type type, struct http_decoder_result_queue *queue, - int queue_index, uint8_t flow_type, hstring *raw_payload, hstring *decompress_payload) -{ - struct http_message *msg = CALLOC(struct http_message, 1); - msg->type = type; - msg->ref_queue = queue; - msg->queue_index = queue_index; - msg->flow_type = flow_type; - if (raw_payload) - { - msg->raw_payload.iov_base = raw_payload->iov_base; - msg->raw_payload.iov_len = raw_payload->iov_len; - } - if (decompress_payload) - { - msg->decompress_payload.iov_base = decompress_payload->iov_base; - msg->decompress_payload.iov_len = decompress_payload->iov_len; - } - return msg; -} - -static void http_message_decompress_buffer_free(struct http_message *msg) -{ - struct http_decoder_half_data *ref_data = NULL; - if (HTTP_MESSAGE_REQ_BODY_START == msg->type || HTTP_MESSAGE_REQ_BODY == msg->type || HTTP_MESSAGE_REQ_BODY_END == msg->type) - { - ref_data = msg->ref_queue->array[msg->queue_index].req_data; - } - else if (HTTP_MESSAGE_RES_BODY_START == msg->type || HTTP_MESSAGE_RES_BODY == msg->type || HTTP_MESSAGE_RES_BODY_END == msg->type) - { - ref_data = msg->ref_queue->array[msg->queue_index].res_data; - } - if (ref_data != NULL && msg->decompress_payload.iov_base != NULL) - { - http_half_decompress_buffer_free(ref_data, &msg->decompress_payload); - } -} - -static void http_message_free(void *http_msg, void *cb_arg) -{ - if (http_msg) - { - http_message_decompress_buffer_free((struct http_message *)http_msg); - FREE(http_msg); - } -} - -static void http_event_handler(enum http_event event, struct http_decoder_half_data **data, - struct http_event_context *ev_ctx, void *httpd_plugin_env) -{ - assert(ev_ctx); - assert(httpd_plugin_env); - struct http_decoder_env *httpd_env = (struct http_decoder_env *)httpd_plugin_env; - size_t queue_idx = 0; - nmx_pool_t *mempool = ev_ctx->ref_mempool; - struct http_decoder_result_queue *queue = ev_ctx->ref_queue; - struct http_message *msg = NULL; - struct http_decoder_half_data *half_data = NULL; - int ret = 0; - u_int8_t flow_flag = 0; - struct http_decoder_exdata *exdata = ev_ctx->ref_httpd_ctx; - - int thread_id = stellar_get_current_thread_index(); - - if (http_event_is_req(event)) - { - queue_idx = http_decoder_result_queue_req_index(queue); - half_data = http_decoder_result_queue_peek_req(queue); - } - else - { - queue_idx = http_decoder_result_queue_res_index(queue); - half_data = http_decoder_result_queue_peek_res(queue); - } - - switch (event) - { - case HTTP_EVENT_REQ_INIT: - half_data = http_decoder_result_queue_peek_req(queue); - if (half_data != NULL) - { - http_decoder_result_queue_inc_req_index(queue); - } - - half_data = http_decoder_result_queue_peek_req(queue); - if (half_data != NULL) - { - half_data = http_decoder_result_queue_pop_req(queue); - http_decoder_half_data_free(mempool, half_data); - half_data = NULL; - } - - half_data = http_decoder_half_data_new(mempool); - ret = http_decoder_result_queue_push_req(queue, half_data); - if (ret < 0) - { - fprintf(stderr, "http_decoder_result_queue_push req failed."); - http_decoder_half_data_free(mempool, half_data); - half_data = NULL; - } - *data = half_data; - queue_idx = http_decoder_result_queue_req_index(queue); // get the index after inc - /* llhttp always call on_message_begin() even if llhttp_execute() error!!! */ - msg = http_message_new(HTTP_TRANSACTION_START, queue, queue_idx, HTTP_REQUEST); - session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); - http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_TRANSACTION_NEW, 1); - break; - case HTTP_EVENT_REQ_LINE: - msg = http_message_new(HTTP_MESSAGE_REQ_LINE, queue, queue_idx, HTTP_REQUEST); - session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); - if (httpd_tunnel_identify(httpd_env, FLOW_TYPE_C2S, half_data)) - { - exdata->tunnel_state = HTTP_TUN_C2S_HDR_START; - // http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TUNNEL, 1); - } - if (httpd_is_tunnel_session(httpd_env, exdata)) - { - http_decoder_get_url(half_data, mempool); - } - break; - case HTTP_EVENT_REQ_HDR: - msg = http_message_new(HTTP_MESSAGE_REQ_HEADER, queue, queue_idx, HTTP_REQUEST); - session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); - break; - case HTTP_EVENT_REQ_HDR_END: - { - http_decoder_join_url_finally(ev_ctx, half_data, mempool); - /* maybe some parsed headers in buffer, but has not pushed to plugins yet */ - - if (http_decoder_half_data_has_parsed_header(half_data)) - { - msg = http_message_new(HTTP_MESSAGE_REQ_HEADER, queue, queue_idx, HTTP_REQUEST); - session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); - } - http_half_data_update_commit_index(half_data); - msg = http_message_new(HTTP_MESSAGE_REQ_HEADER_END, queue, queue_idx, HTTP_REQUEST); - session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); - - int tot_c2s_headers = http_half_data_get_total_parsed_header_count(half_data); - http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_C2S_HEADERS, tot_c2s_headers); - - const char *tmp_url = NULL; - size_t tmp_url_len = 0; - http_half_data_get_url(half_data, &tmp_url, &tmp_url_len); - http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_URL_BYTES, tmp_url_len); - } - break; - case HTTP_EVENT_REQ_BODY_BEGIN: - msg = http_message_new(HTTP_MESSAGE_REQ_BODY_START, queue, queue_idx, HTTP_REQUEST); - session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); - break; - case HTTP_EVENT_REQ_BODY_DATA: - { - hstring raw_body = {}; - hstring decompress_body = {}; - http_decoder_half_data_get_raw_body(half_data, (const char **)&raw_body.iov_base, &raw_body.iov_len); - http_half_get_lastest_decompress_buffer(half_data, &decompress_body); - msg = http_body_message_new(HTTP_MESSAGE_REQ_BODY, queue, queue_idx, HTTP_REQUEST, &raw_body, &decompress_body); - session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); - if (decompress_body.iov_base != NULL) - { - http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_C2S_ZIP_BYTES, raw_body.iov_len); - http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_C2S_UNZIP_BYTES, decompress_body.iov_len); - } - } - break; - case HTTP_EVENT_REQ_BODY_END: - msg = http_message_new(HTTP_MESSAGE_REQ_BODY_END, queue, queue_idx, HTTP_REQUEST); - session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); - break; - case HTTP_EVENT_REQ_END: - { - session_is_symmetric(ev_ctx->ref_session, &flow_flag); - - if (SESSION_SEEN_C2S_FLOW == flow_flag) - { - msg = http_message_new(HTTP_TRANSACTION_END, queue, queue_idx, HTTP_REQUEST); - session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); - http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_TRANSACTION_FREE, 1); - http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_C2S_ASYMMETRY_TRANSACTION, 1); - } - if (httpd_is_tunnel_session(httpd_env, exdata)) - { - if (SESSION_SEEN_C2S_FLOW == flow_flag) - { - exdata->tunnel_state = HTTP_TUN_INNER_STARTING; - exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].sub_topic_id; - } - else - { - exdata->tunnel_state = HTTP_TUN_C2S_END; - } - } - http_half_update_state(half_data, event); - http_decoder_result_queue_inc_req_index(queue); - half_data = http_decoder_result_queue_pop_req(queue); - if (half_data != NULL) - { - http_decoder_half_data_free(mempool, half_data); - half_data = NULL; - } - } - break; - - case HTTP_EVENT_RES_INIT: - half_data = http_decoder_result_queue_peek_res(queue); - if (half_data != NULL) - { - http_decoder_result_queue_inc_res_index(queue); - } - - half_data = http_decoder_result_queue_peek_res(queue); - if (half_data != NULL) - { - half_data = http_decoder_result_queue_pop_res(queue); - http_decoder_half_data_free(mempool, half_data); - half_data = NULL; - } - - half_data = http_decoder_half_data_new(mempool); - ret = http_decoder_result_queue_push_res(queue, half_data); - if (ret < 0) - { - fprintf(stderr, "http_decoder_result_queue_push res failed."); - http_decoder_half_data_free(mempool, half_data); - half_data = NULL; - } - queue_idx = http_decoder_result_queue_res_index(queue); // get the index after inc - *data = half_data; - if (0 == session_is_symmetric(ev_ctx->ref_session, &flow_flag)) - { - if (SESSION_SEEN_S2C_FLOW == flow_flag) - { - msg = http_message_new(HTTP_TRANSACTION_START, queue, queue_idx, HTTP_RESPONSE); - session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); - } - } - break; - case HTTP_EVENT_RES_LINE: - msg = http_message_new(HTTP_MESSAGE_RES_LINE, queue, queue_idx, HTTP_RESPONSE); - session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); - if (httpd_tunnel_identify(httpd_env, FLOW_TYPE_S2C, half_data)) - { - exdata->tunnel_state = HTTP_TUN_S2C_START; - } - else - { - // connect response fail, reset tunnel_state - exdata->tunnel_state = HTTP_TUN_NON; - } - break; - case HTTP_EVENT_RES_HDR: - msg = http_message_new(HTTP_MESSAGE_RES_HEADER, queue, queue_idx, HTTP_RESPONSE); - session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); - break; - case HTTP_EVENT_RES_HDR_END: - { - /* maybe some header in table buffer but has not pushed to plugins */ - half_data = http_decoder_result_queue_peek_res(queue); - if (http_decoder_half_data_has_parsed_header(half_data)) - { - msg = http_message_new(HTTP_MESSAGE_RES_HEADER, queue, queue_idx, HTTP_RESPONSE); - session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); - } - http_half_data_update_commit_index(half_data); - msg = http_message_new(HTTP_MESSAGE_RES_HEADER_END, queue, queue_idx, HTTP_RESPONSE); - session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); - - int tot_s2c_headers = http_half_data_get_total_parsed_header_count(half_data); - http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_S2C_HEADERS, tot_s2c_headers); - - if (httpd_is_tunnel_session(httpd_env, exdata)) - { - exdata->tunnel_state = HTTP_TUN_INNER_STARTING; - http_half_pre_context_free(ev_ctx->ref_session, exdata); - exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].sub_topic_id; - } - } - break; - case HTTP_EVENT_RES_BODY_BEGIN: - msg = http_message_new(HTTP_MESSAGE_RES_BODY_START, queue, queue_idx, HTTP_RESPONSE); - session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); - break; - case HTTP_EVENT_RES_BODY_DATA: - { - hstring raw_body = {}; - http_decoder_half_data_get_raw_body(half_data, (const char **)&raw_body.iov_base, &raw_body.iov_len); - hstring decompress_body = {}; - http_half_get_lastest_decompress_buffer(half_data, &decompress_body); - msg = http_body_message_new(HTTP_MESSAGE_RES_BODY, queue, queue_idx, HTTP_RESPONSE, &raw_body, &decompress_body); - session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); - if (decompress_body.iov_base != NULL) - { - http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_S2C_ZIP_BYTES, raw_body.iov_len); - http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_S2C_UNZIP_BYTES, decompress_body.iov_len); - } - } - break; - case HTTP_EVENT_RES_BODY_END: - msg = http_message_new(HTTP_MESSAGE_RES_BODY_END, queue, queue_idx, HTTP_RESPONSE); - session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); - break; - case HTTP_EVENT_RES_END: - msg = http_message_new(HTTP_TRANSACTION_END, queue, queue_idx, HTTP_RESPONSE); - session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); - http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_TRANSACTION_FREE, 1); - session_is_symmetric(ev_ctx->ref_session, &flow_flag); - if (SESSION_SEEN_S2C_FLOW == flow_flag) - { - http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_S2C_ASYMMETRY_TRANSACTION, 1); - } - - http_half_update_state(half_data, event); - http_decoder_result_queue_inc_res_index(queue); - half_data = http_decoder_result_queue_pop_res(queue); - if (half_data != NULL) - { - http_decoder_half_data_free(mempool, half_data); - half_data = NULL; - } - break; - default: - assert(0); - break; - } - if (half_data) - { - http_half_update_state(half_data, event); - } -} +#include "stellar/utils.h" +#include "stellar/http.h" +#include "stellar/session.h" +#include "http_decoder.h" +#include "http_decoder_utils.h" +#include "http_decoder_stat.h" +#include "http_decoder_half.h" -static struct http_decoder *http_decoder_new(struct http_decoder_exdata *hd_ctx, nmx_pool_t *mempool, - http_event_cb *ev_cb, int decompress_switch, struct http_decoder_env *httpd_env, - long long req_start_seq, long long res_start_seq) +static struct http_decoder *http_decoder_new(void) { - struct http_decoder *decoder = MEMPOOL_CALLOC(mempool, struct http_decoder, 1); - assert(decoder); - decoder->c2s_half = http_decoder_half_new(hd_ctx, mempool, ev_cb, HTTP_REQUEST, decompress_switch, httpd_env, req_start_seq); - decoder->s2c_half = http_decoder_half_new(hd_ctx, mempool, ev_cb, HTTP_RESPONSE, decompress_switch, httpd_env, res_start_seq); + struct http_decoder *decoder = (struct http_decoder *)calloc(1, sizeof(struct http_decoder)); return decoder; } -static void http_decoder_free(nmx_pool_t *mempool, struct http_decoder *decoder) +static void http_decoder_free(struct http_decoder *decoder) { if (NULL == decoder) { return; } - if (decoder->c2s_half != NULL) + if (decoder->flow_c2s) { - http_decoder_half_free(mempool, decoder->c2s_half); - decoder->c2s_half = NULL; + http_half_free(decoder->flow_c2s); } - if (decoder->s2c_half != NULL) + if (decoder->flow_s2c) { - http_decoder_half_free(mempool, decoder->s2c_half); - decoder->s2c_half = NULL; + http_half_free(decoder->flow_s2c); } - MEMPOOL_FREE(mempool, decoder); -} - -static struct http_decoder_exdata *http_decoder_exdata_new(size_t mempool_size, size_t queue_size, - int decompress_switch, struct http_decoder_env *httpd_env, - long long req_start_seq, long long res_start_seq) -{ - struct http_decoder_exdata *hd_ctx = CALLOC(struct http_decoder_exdata, 1); - hd_ctx->mempool = nmx_create_pool(mempool_size); - hd_ctx->decoder = http_decoder_new(hd_ctx, hd_ctx->mempool, http_event_handler, decompress_switch, - httpd_env, req_start_seq, res_start_seq); - hd_ctx->queue = http_decoder_result_queue_new(hd_ctx->mempool, queue_size); - return hd_ctx; + free(decoder); } -static void http_decoder_exdata_free(struct http_decoder_exdata *ex_data) +static struct http_exdata *http_session_new(struct session *sess, struct http *http_env, + const char *tcp_payload, uint32_t tcp_payload_len) { - if (unlikely(NULL == ex_data)) - { - return; - } - if (ex_data->decoder != NULL) - { - http_decoder_free(ex_data->mempool, ex_data->decoder); - ex_data->decoder = NULL; - } - if (ex_data->queue != NULL) + struct http_exdata *exdata = (struct http_exdata *)calloc(1, sizeof(struct http_exdata)); + session_set_exdata(sess, http_env->exdata_id, exdata); + exdata->sess = sess; + exdata->http_env = http_env; + size_t http_identify_len = MIN(tcp_payload_len, HTTP_IDENTIFY_LEN); + int ret = http_protocol_identify(tcp_payload, http_identify_len); + if (ret < 0) { - http_decoder_result_queue_free(ex_data->mempool, ex_data->queue); - ex_data->queue = NULL; + STELLAR_LOG_DEBUG(http_env->logger_ref, HTTP_MODULE_NAME, + "http identify error, ignore this session: %s", session_get_readable_addr(sess)); + exdata->ignore_session = 1; + return exdata; } - if (ex_data->mempool) - { - nmx_destroy_pool(ex_data->mempool); - } - FREE(ex_data); -} - -static int http_protocol_identify(const char *data, size_t data_len) -{ - llhttp_t parser; - llhttp_settings_t settings; - enum llhttp_errno error; - - llhttp_settings_init(&settings); - llhttp_init(&parser, HTTP_BOTH, &settings); - - error = llhttp_execute(&parser, data, data_len); - if (error != HPE_OK) - { - return -1; - } - return 1; -} - -static void _http_decoder_context_free(struct http_decoder_env *env) -{ - if (NULL == env) - { - return; - } - - http_decoder_stat_free(&env->hd_stat); - - for (int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++) - { - if (env->topic_exdata_compose[i].msg_free_cb) - { - stellar_mq_destroy_topic(env->st, env->topic_exdata_compose[i].sub_topic_id); - } - } - - FREE(env); -} - -static int load_http_decoder_config(const char *cfg_path, - struct http_decoder_config *hd_cfg) -{ - FILE *fp = fopen(cfg_path, "r"); - if (NULL == fp) - { - fprintf(stderr, "[%s:%d]Can't open config file:%s", - __FUNCTION__, __LINE__, cfg_path); - return -1; - } - - int ret = 0; - char errbuf[256] = {0}; - - toml_table_t *root = toml_parse_file(fp, errbuf, sizeof(errbuf)); - fclose(fp); - - toml_table_t *basic_sec_tbl = toml_table_in(root, "basic"); - if (NULL == basic_sec_tbl) - { - fprintf(stderr, "[%s:%d]config file:%s has no key: [basic]", - __FUNCTION__, __LINE__, cfg_path); - toml_free(root); - return -1; - } - - toml_datum_t int_val = toml_int_in(basic_sec_tbl, "decompress"); - if (int_val.ok != 0) - { - hd_cfg->decompress_switch = int_val.u.b; - } - - int_val = toml_int_in(basic_sec_tbl, "mempool_size"); - if (int_val.ok != 0) - { - hd_cfg->mempool_size = int_val.u.i; - } - else - { - hd_cfg->mempool_size = DEFAULT_MEMPOOL_SIZE; - } - - int_val = toml_int_in(basic_sec_tbl, "result_queue_len"); - if (int_val.ok != 0) - { - hd_cfg->result_queue_len = int_val.u.i; - } - else - { - hd_cfg->result_queue_len = HD_RESULT_QUEUE_LEN; - } - - int_val = toml_int_in(basic_sec_tbl, "stat_interval_pkts"); - if (int_val.ok != 0) - { - hd_cfg->stat_interval_pkts = int_val.u.i; - } - else - { - hd_cfg->stat_interval_pkts = DEFAULT_STAT_INTERVAL_PKTS; - } - - int_val = toml_int_in(basic_sec_tbl, "stat_output_interval"); - if (int_val.ok != 0) - { - hd_cfg->stat_output_interval = int_val.u.i; - } - else - { - hd_cfg->stat_output_interval = DEFAULT_STAT_OUTPUT_INTERVAL; - } - - int_val = toml_int_in(basic_sec_tbl, "proxy_enable"); - if (int_val.ok != 0) - { - hd_cfg->proxy_enable = int_val.u.i; - } - else - { - hd_cfg->proxy_enable = 0; - } - - toml_free(root); - return ret; -} - -static int http_msg_get_request_header(const struct http_message *msg, const char *name, size_t name_len, - struct http_header_field *hdr_result) -{ - const struct http_decoder_half_data *req_data = - msg->ref_queue->array[msg->queue_index].req_data; - return http_decoder_half_data_get_header(req_data, name, name_len, hdr_result); -} - -static int http_msg_get_response_header(const struct http_message *msg, const char *name, size_t name_len, - struct http_header_field *hdr_result) -{ - const struct http_decoder_half_data *res_data = - msg->ref_queue->array[msg->queue_index].res_data; - return http_decoder_half_data_get_header(res_data, name, name_len, hdr_result); -} - -static int http_msg_request_header_next(const struct http_message *msg, - struct http_header_field *hdr) -{ - const struct http_decoder_half_data *req_data = - msg->ref_queue->array[msg->queue_index].req_data; - return http_decoder_half_data_iter_header((struct http_decoder_half_data *)req_data, hdr); -} - -static int http_msg_response_header_next(const struct http_message *msg, struct http_header_field *hdr) -{ - const struct http_decoder_half_data *res_data = - msg->ref_queue->array[msg->queue_index].res_data; - return http_decoder_half_data_iter_header((struct http_decoder_half_data *)res_data, hdr); -} - -#if 0 -static int http_msg_get_request_raw_body(const struct http_message *msg, hstring *body) -{ - const struct http_decoder_half_data *req_data = - msg->ref_queue->array[msg->queue_index].req_data; - return http_decoder_half_data_get_raw_body(req_data, body); -} - -static int http_msg_get_response_raw_body(const struct http_message *msg, hstring *body) -{ - const struct http_decoder_half_data *res_data = - msg->ref_queue->array[msg->queue_index].res_data; - return http_decoder_half_data_get_raw_body(res_data, body); -} - -static int http_msg_get_request_decompress_body(const struct http_message *msg, hstring *body) -{ - const struct http_decoder_half_data *req_data = - msg->ref_queue->array[msg->queue_index].req_data; - return http_decoder_half_data_get_decompress_body(req_data, body); -} - -static int http_msg_get_response_decompress_body(const struct http_message *msg, hstring *body) -{ - const struct http_decoder_half_data *res_data = - msg->ref_queue->array[msg->queue_index].res_data; - return http_decoder_half_data_get_decompress_body(res_data, body); -} -#endif - -static struct http_decoder_exdata *httpd_session_exdata_new(struct session *sess, struct http_decoder_env *httpd_env, - long long req_start_seq, long long res_start_seq) -{ - struct http_decoder_exdata *exdata = http_decoder_exdata_new(httpd_env->hd_cfg.mempool_size, - httpd_env->hd_cfg.result_queue_len, - httpd_env->hd_cfg.decompress_switch, - httpd_env, req_start_seq, res_start_seq); - // exdata->sub_topic_id = sub_topic_id; - int thread_id = stellar_get_current_thread_index(); - http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_SESSION_NEW, 1); + exdata->decoder = http_decoder_new(); + int thread_id = module_manager_get_thread_id(http_env->mod_mgr_ref); + http_stat_update(&http_env->stat, thread_id, HTTP_SESSION_NEW, 1); + STELLAR_LOG_INFO(http_env->logger_ref, HTTP_MODULE_NAME, + "http identify succ, session: %s", session_get_readable_addr(sess)); return exdata; } -#ifdef __cplusplus -extern "C" +static void http_decoder_execute(struct session *sess, struct http *http_env, struct http_exdata *exdata, + const char *tcp_payload, uint16_t tcp_payload_len) { -#endif + int thread_id = module_manager_get_thread_id(http_env->mod_mgr_ref); + enum flow_type flow_dir = session_get_flow_type(sess); - void httpd_ex_data_free_cb(int idx, void *ex_data, void *arg) + http_stat_update_tcp_seg(&http_env->stat, thread_id, flow_dir, tcp_payload_len); + struct http_half *half = http_flow_get_nx(exdata, flow_dir, sess, http_env); + int ret = http_half_flow_process(half, tcp_payload, tcp_payload_len); + if (ret < 0) { - if (NULL == ex_data) - { - return; - } - struct http_decoder_exdata *exdata = (struct http_decoder_exdata *)ex_data; - http_decoder_exdata_free(exdata); - } - - void *httpd_session_ctx_new_cb(struct session *sess, void *plugin_env) - { - return (void *)HTTP_CTX_IS_HTTP; - } - - void httpd_session_ctx_free_cb(struct session *sess, void *session_ctx, void *plugin_env) - { - if (NULL == plugin_env || NULL == session_ctx) - { - return; - } - if (strncmp((const char *)session_ctx, HTTP_CTX_NOT_HTTP, strlen(HTTP_CTX_NOT_HTTP)) == 0) - { - return; - } - struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env; - int thread_id = stellar_get_current_thread_index(); - unsigned char flow_flag = 0; - session_is_symmetric(sess, &flow_flag); - - if (SESSION_SEEN_C2S_FLOW == flow_flag) - { - http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_C2S_ASYMMETRY_SESSION, 1); - } - else if (SESSION_SEEN_S2C_FLOW == flow_flag) - { - http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_S2C_ASYMMETRY_SESSION, 1); - } - http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_SESSION_FREE, 1); - } - - static void http_decoder_execute(struct session *sess, struct http_decoder_env *httpd_env, struct http_decoder_exdata *exdata, const char *payload, uint16_t payload_len) - { - if (httpd_in_tunnel_transmitting(httpd_env, exdata)) - { - http_decoder_push_tunnel_data(sess, exdata, httpd_tunnel_state_to_msg(exdata), payload, payload_len); - httpd_tunnel_state_update(exdata); - return; - } - - int thread_id = stellar_get_current_thread_index(); - struct http_decoder_half *cur_half = NULL; - enum flow_type sess_dir = session_get_flow_type(sess); - if (FLOW_TYPE_C2S == sess_dir) - { - cur_half = exdata->decoder->c2s_half; - http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_C2S_BYTES, payload_len); - http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_C2S_TCP_SEG, 1); - } - else - { - cur_half = exdata->decoder->s2c_half; - http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_S2C_BYTES, payload_len); - http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_S2C_TCP_SEG, 1); - } - - http_decoder_half_reinit(cur_half, exdata->queue, exdata->mempool, sess); - int ret = http_decoder_half_parse(httpd_env->hd_cfg.proxy_enable, cur_half, payload, payload_len); - if (ret < 0) - { - http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_STAT_PARSE_ERR, 1); - stellar_session_plugin_dettach_current_session(sess); - } - } - - void http_decoder_tunnel_msg_cb(struct session *sess, int topic_id, const void *tmsg, void *per_session_ctx, void *plugin_env) - { - struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env; - if (0 == httpd_env->hd_cfg.proxy_enable) - { - return; - } - hstring tunnel_payload; - http_tunnel_message_get_payload((const struct http_tunnel_message *)tmsg, &tunnel_payload); - uint16_t payload_len = tunnel_payload.iov_len; - const char *payload = (char *)tunnel_payload.iov_base; - if (NULL == payload || 0 == payload_len) - { - return; - } - - struct http_decoder_exdata *exdata = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].exdata_id); - enum http_tunnel_message_type tmsg_type = http_tunnel_message_type_get((const struct http_tunnel_message *)tmsg); - - switch (tmsg_type) - { - case HTTP_TUNNEL_OPENING: - { - if (NULL != exdata) - { - // not support nested http tunnel - session_mq_ignore_message(sess, topic_id, httpd_env->plugin_id); - return; - } - size_t http_identify_len = payload_len > HTTP_IDENTIFY_LEN ? HTTP_IDENTIFY_LEN : payload_len; - int is_http = http_protocol_identify(payload, http_identify_len); - if (is_http) - { - long long max_req_seq = 0, max_res_seq = 0; - struct http_decoder_exdata *tcp_stream_exdata = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_TCP_STREAM_INDEX].exdata_id); - http_half_get_max_transaction_seq(tcp_stream_exdata, &max_req_seq, &max_res_seq); - exdata = httpd_session_exdata_new(sess, httpd_env, max_req_seq, max_res_seq); - session_exdata_set(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].exdata_id, exdata); - exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_MSG_INDEX].sub_topic_id; - exdata->in_tunnel_is_http = 1; - } - else - { - // inner tunnel is not http, do nothing, do not push this message again !!! - session_mq_ignore_message(sess, topic_id, httpd_env->plugin_id); - return; - } - } - break; - - case HTTP_TUNNEL_ACTIVE: - if (NULL == exdata) - { - session_mq_ignore_message(sess, topic_id, httpd_env->plugin_id); - http_decoder_stat_update(&httpd_env->hd_stat, stellar_get_current_thread_index(), HTTP_STAT_PARSE_ERR, 1); - return; - } - break; - - case HTTP_TUNNEL_CLOSING: - if (NULL == exdata) - { - http_decoder_stat_update(&httpd_env->hd_stat, stellar_get_current_thread_index(), HTTP_STAT_PARSE_ERR, 1); - return; - } - if (exdata->in_tunnel_is_http) - { - http_half_pre_context_free(sess, exdata); - } - return; - break; - - default: - break; - } - if (exdata->in_tunnel_is_http) - { - http_decoder_execute(sess, httpd_env, exdata, payload, payload_len); - } + STELLAR_LOG_WARN(http_env->logger_ref, HTTP_MODULE_NAME, + "llhttp parse error, ignore this session: %s", session_get_readable_addr(sess)); + http_stat_update(&http_env->stat, thread_id, HTTP_STAT_PARSE_ERR, 1); + exdata->ignore_session = 1; return; } - -static void http_decoder_on_session_free(struct session *sess,void *per_session_ctx, void *plugin_env) -{ - struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env; - struct http_decoder_exdata *exdata = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_TCP_STREAM_INDEX].exdata_id); - if (httpd_in_tunnel_transmitting(httpd_env, exdata)) - { - http_decoder_push_tunnel_data(sess, exdata, HTTP_TUNNEL_CLOSING, NULL, 0); - } - else - { - http_half_pre_context_free(sess, exdata); - } - return; } - void http_decoder_tcp_stream_msg_cb(struct session *sess, int topic_id, const void *msg, void *nouse_session_ctx, void *plugin_env) - { - struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env; - struct http_decoder_exdata *exdata = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_TCP_STREAM_INDEX].exdata_id); - const char *payload = NULL; - uint16_t payload_len = 0; - assert(msg != NULL); - - payload_len = tcp_segment_get_len((struct tcp_segment *)msg); - payload = tcp_segment_get_data((const struct tcp_segment *)msg); - if (unlikely(0 == payload_len || NULL == payload)) - { - return; - } - if (NULL == exdata) // first packet - { - size_t http_identify_len = payload_len > HTTP_IDENTIFY_LEN ? HTTP_IDENTIFY_LEN : payload_len; - int ret = http_protocol_identify(payload, http_identify_len); - if (ret < 0) - { - stellar_session_plugin_dettach_current_session(sess); - return; - } - exdata = httpd_session_exdata_new(sess, httpd_env, 0, 0); - exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_MSG_INDEX].sub_topic_id; - session_exdata_set(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_TCP_STREAM_INDEX].exdata_id, exdata); - } - http_decoder_execute(sess, httpd_env, exdata, payload, payload_len); - return; - } - - static const struct http_topic_exdata_compose g_topic_exdata_compose[HTTPD_TOPIC_INDEX_MAX] = - { - {HTTPD_TOPIC_TCP_STREAM_INDEX, TOPIC_TCP_STREAM, http_decoder_tcp_stream_msg_cb, NULL, "HTTP_DECODER_EXDATA_BASEON_TCP_STREAM", httpd_ex_data_free_cb, -1, -1}, - {HTTPD_TOPIC_HTTP_MSG_INDEX, HTTP_TOPIC, NULL, http_message_free, NULL, NULL, -1, -1}, - {HTTPD_TOPIC_HTTP_TUNNEL_INDEX, HTTP_DECODER_TUNNEL_TOPIC, http_decoder_tunnel_msg_cb, http_message_free, "HTTP_DECODER_EXDATA_BASEON_HTTP_TUNNEL", httpd_ex_data_free_cb, -1, -1}, - }; - - static void http_decoder_topic_exdata_compose_init(struct http_decoder_env *httpd_env) +void http_message_free_cb(void *msg, void *msg_free_arg UNUSED) +{ + struct http_message *hmsg = (struct http_message *)msg; + struct http_half_data *flow_data = hmsg->flow_data; + if (hmsg->topic_type == HTTP_TOPIC_REQ_HEADER || hmsg->topic_type == HTTP_TOPIC_RES_HEADER) { - memcpy(httpd_env->topic_exdata_compose, g_topic_exdata_compose, sizeof(g_topic_exdata_compose)); - for (int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++) + if (flow_data) { - httpd_env->topic_exdata_compose[i].sub_topic_id = stellar_session_mq_get_topic_id_reliable(httpd_env->st, - httpd_env->topic_exdata_compose[i].topic_name, - httpd_env->topic_exdata_compose[i].msg_free_cb, - NULL); - assert(httpd_env->topic_exdata_compose[i].sub_topic_id >= 0); - - if (httpd_env->topic_exdata_compose[i].exdata_name) + if (flow_data->ut_filed_array) { - httpd_env->topic_exdata_compose[i].exdata_id = stellar_exdata_new_index(httpd_env->st, - httpd_env->topic_exdata_compose[i].exdata_name, - httpd_env->topic_exdata_compose[i].exdata_free_cb, - NULL); - assert(httpd_env->topic_exdata_compose[i].exdata_id >= 0); + utarray_free(flow_data->ut_filed_array); + flow_data->ut_filed_array = NULL; } - - if (httpd_env->topic_exdata_compose[i].on_msg_cb) + if (flow_data->joint_url.iov_base) { - stellar_session_mq_subscribe(httpd_env->st, httpd_env->topic_exdata_compose[i].sub_topic_id, - httpd_env->topic_exdata_compose[i].on_msg_cb, httpd_env->plugin_id); + free(flow_data->joint_url.iov_base); + flow_data->joint_url.iov_base = NULL; } - } - } - - int http_topic_exdata_compose_get_index(const struct http_decoder_env *httpd_env, int by_topic_id) - { - for (int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++) - { - if (httpd_env->topic_exdata_compose[i].sub_topic_id == by_topic_id) + if (flow_data->cached_header_buffer != NULL) { - return i; + assert(flow_data->cached_header_buffer->reference > 0); + flow_data->cached_header_buffer->reference--; + if (flow_data->cached_header_buffer->reference == 0) + { + free(flow_data->cached_header_buffer->buffer); + free(flow_data->cached_header_buffer); + flow_data->cached_header_buffer = NULL; + } } } - assert(0); - return -1; - } - - void *http_decoder_init(struct stellar *st) - { - int thread_num = 0; - - struct http_decoder_env *httpd_env = CALLOC(struct http_decoder_env, 1); - int ret = load_http_decoder_config(HTTPD_CFG_FILE, &httpd_env->hd_cfg); - if (ret < 0) - { - goto failed; - } - httpd_env->st = st; - httpd_env->plugin_id = stellar_plugin_register(st, httpd_session_ctx_new_cb, - httpd_session_ctx_free_cb, NULL,http_decoder_on_session_free,(void *)httpd_env); - if (httpd_env->plugin_id < 0) - { - goto failed; - } - http_decoder_topic_exdata_compose_init(httpd_env); - - thread_num = stellar_get_worker_thread_num(st); - assert(thread_num >= 1); - if (http_decoder_stat_init(&httpd_env->hd_stat, thread_num, - httpd_env->hd_cfg.stat_interval_pkts, httpd_env->hd_cfg.stat_output_interval) < 0) - { - goto failed; - } - - return httpd_env; - - failed: - fprintf(stderr, "http_decoder_init fail!\n"); - _http_decoder_context_free(httpd_env); - return NULL; - } - - void http_decoder_exit(void *plugin_env) - { - if (NULL == plugin_env) - { - return; - } - struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env; - _http_decoder_context_free(httpd_env); } - - enum http_message_type http_message_get_type(const struct http_message *msg) + if (hmsg->event == HTTP_EVENT_REQ_BODY_DATA || hmsg->event == HTTP_EVENT_RES_BODY_DATA) { - if (unlikely(NULL == msg)) + if (flow_data->decompress_body.body != NULL) { - return HTTP_MESSAGE_MAX; + FREE(flow_data->decompress_body.body); } - return msg->type; } - - void http_message_get0_request_line(const struct http_message *msg, - struct http_request_line *line) + if (hmsg->event == HTTP_EVENT_REQ_END || hmsg->event == HTTP_EVENT_RES_END) { - if (unlikely(NULL == msg || msg->type != HTTP_MESSAGE_REQ_LINE)) + if (flow_data) { - if (line) - { - line->method = NULL; - line->uri = NULL; - line->version = NULL; - } - return; + http_half_data_free(flow_data); + hmsg->flow_data = NULL; } - assert(msg->ref_queue); - assert(msg->queue_index < HD_RESULT_QUEUE_LEN); - - struct http_decoder_half_data *req_data = - msg->ref_queue->array[msg->queue_index].req_data; - - http_decoder_half_data_get_request_line(req_data, line); } - void http_message_get0_response_line(const struct http_message *msg, - struct http_response_line *line) - { - if (unlikely(NULL == msg || msg->type != HTTP_MESSAGE_RES_LINE)) - { - if (line) - { - line->version = NULL; - line->status = NULL; - } - return; - } - assert(msg->ref_queue); - assert(msg->queue_index < HD_RESULT_QUEUE_LEN); - - struct http_decoder_half_data *res_data = - msg->ref_queue->array[msg->queue_index].res_data; - - http_decoder_half_data_get_response_line(res_data, line); - } + free(hmsg); +} - void http_message_get0_header(const struct http_message *msg, const char *name, size_t name_len, - struct http_header_field *hdr_result) +void http_exdata_free_cb(UNUSED int idx, void *ex_data, UNUSED void *plugin_env) +{ + if (NULL == ex_data) { - int ret = -1; - if (unlikely(NULL == msg || NULL == name || 0 == name_len)) - { - goto fail; - } - assert(msg->ref_queue); - assert(msg->queue_index < HD_RESULT_QUEUE_LEN); - if (HTTP_MESSAGE_REQ_HEADER == msg->type) - { - ret = http_msg_get_request_header(msg, name, name_len, hdr_result); - } - else if (HTTP_MESSAGE_RES_HEADER == msg->type) - { - ret = http_msg_get_response_header(msg, name, name_len, hdr_result); - } - if (ret >= 0) - { - return; - } - fail: - if (hdr_result) - { - hdr_result->name = NULL; - hdr_result->value = NULL; - } return; } + struct http_exdata *exdata = (struct http_exdata *)ex_data; + struct http *http_env = (struct http *)plugin_env; - int http_message_get0_next_header(const struct http_message *msg, struct http_header_field *header) + if (exdata->ignore_session == 0) { - int ret = 1; - if (unlikely(NULL == msg)) - { - goto fail; - } - assert(msg->ref_queue); - assert(msg->queue_index < HD_RESULT_QUEUE_LEN); - if (HTTP_MESSAGE_REQ_HEADER == msg->type) - { - ret = http_msg_request_header_next(msg, header); - } - else if (HTTP_MESSAGE_RES_HEADER == msg->type) - { - ret = http_msg_response_header_next(msg, header); - } - if (ret < 0) - { - goto fail; - } - return 0; - fail: - if (header) - { - header->name = NULL; - header->value = NULL; - } - return -1; - } - - int http_message_reset_header_iter(struct http_message *msg) - { - if (unlikely(NULL == msg)) + int thread_id = module_manager_get_thread_id(http_env->mod_mgr_ref); + unsigned char flow_flag = 0; + if (exdata->sess) { - return -1; + session_is_symmetric(exdata->sess, &flow_flag); + STELLAR_LOG_INFO(http_env->logger_ref, HTTP_MODULE_NAME, + "http session free: %s", session_get_readable_addr(exdata->sess)); } - assert(msg->ref_queue); - assert(msg->queue_index < HD_RESULT_QUEUE_LEN); - if (HTTP_MESSAGE_REQ_HEADER == msg->type) + if (SESSION_SEEN_C2S_FLOW == flow_flag) { - struct http_decoder_half_data *req_data = - msg->ref_queue->array[msg->queue_index].req_data; - return http_decoder_half_data_reset_header_iter(req_data); + http_stat_update(&http_env->stat, thread_id, HTTP_C2S_ASYMMETRY_SESSION, 1); } - else if (HTTP_MESSAGE_RES_HEADER == msg->type) + else if (SESSION_SEEN_S2C_FLOW == flow_flag) { - struct http_decoder_half_data *res_data = - msg->ref_queue->array[msg->queue_index].res_data; - return http_decoder_half_data_reset_header_iter(res_data); + http_stat_update(&http_env->stat, thread_id, HTTP_S2C_ASYMMETRY_SESSION, 1); } - return -1; + http_stat_update(&http_env->stat, thread_id, HTTP_SESSION_FREE, 1); } + http_decoder_free(exdata->decoder); + exdata->decoder = NULL; + free(exdata); +} - void http_message_get0_uncompressed_body(const struct http_message *msg, const char **body_ptr, size_t *body_len) - { - if (unlikely(NULL == msg)) - { - goto fail; - } - assert(msg->ref_queue); - assert(msg->queue_index < HD_RESULT_QUEUE_LEN); - if (msg->raw_payload.iov_base != NULL && msg->raw_payload.iov_len != 0) - { - *body_ptr = msg->raw_payload.iov_base; - *body_len = msg->raw_payload.iov_len; - return; - } - fail: - if (body_ptr) - { - *body_ptr = NULL; - *body_len = 0; - } - return; - } +void http_on_tcp_stream_cb(struct session *sess, enum session_state state UNUSED, const char *tcp_payload, uint32_t tcp_payload_len, void *args) +{ + struct http *http_env = (struct http *)args; + struct http_exdata *exdata = (struct http_exdata *)session_get_exdata(sess, http_env->exdata_id); - void http_message_get0_decompressed_body(const struct http_message *msg, const char **dec_body_ptr, size_t *dec_body_len) + if (unlikely(0 == tcp_payload_len || NULL == tcp_payload)) { - enum http_content_encoding ecode = HTTP_CONTENT_ENCODING_NONE; - struct http_decoder_half_data *ref_data = NULL; - if (unlikely(NULL == msg)) - { - goto fail; - } - assert(msg->ref_queue); - assert(msg->queue_index < HD_RESULT_QUEUE_LEN); - if (msg->decompress_payload.iov_base != NULL && msg->decompress_payload.iov_len != 0) - { - *dec_body_ptr = msg->decompress_payload.iov_base; - *dec_body_len = msg->decompress_payload.iov_len; - return; - } - /** - * @brief If the body hasn't been compressed, same as http_message_get0_uncompressed_body(). - * - */ - - if (HTTP_MESSAGE_REQ_BODY_START == msg->type || HTTP_MESSAGE_REQ_BODY == msg->type || HTTP_MESSAGE_REQ_BODY_END == msg->type) - { - ref_data = msg->ref_queue->array[msg->queue_index].req_data; - } - else if (HTTP_MESSAGE_RES_BODY_START == msg->type || HTTP_MESSAGE_RES_BODY == msg->type || HTTP_MESSAGE_RES_BODY_END == msg->type) - { - ref_data = msg->ref_queue->array[msg->queue_index].res_data; - } - ecode = http_half_data_get_content_encoding(ref_data); - if (ref_data != NULL && HTTP_CONTENT_ENCODING_NONE != ecode) - { - goto fail; - } - - if (msg->raw_payload.iov_base != NULL && msg->raw_payload.iov_len != 0) - { - *dec_body_ptr = msg->raw_payload.iov_base; - *dec_body_len = msg->raw_payload.iov_len; - } - return; - fail: - if (dec_body_ptr) - { - *dec_body_ptr = NULL; - *dec_body_len = 0; - } return; } - - void http_message_get0_raw_url(const struct http_message *msg, const char **url_val, size_t *url_len) + if (unlikely(NULL == exdata)) // first packet { - struct http_decoder_half_data *req_data = NULL; - if (unlikely(NULL == msg)) - { - goto fail; - } - assert(msg->ref_queue); - assert(msg->queue_index < HD_RESULT_QUEUE_LEN); - - req_data = msg->ref_queue->array[msg->queue_index].req_data; - - if (http_half_data_get_url(req_data, url_val, url_len) < 0) - { - goto fail; - } - return; - - fail: - if (url_val) - { - *url_val = NULL; - *url_len = 0; - } - return; + exdata = http_session_new(sess, http_env, tcp_payload, tcp_payload_len); } -#if 0 - void http_message_decoded_url_get0(const struct http_message *msg, struct iovec *url) + if (exdata->ignore_session) // quic path { - if (unlikely(NULL == msg)) - { - if (url) - { - url->iov_base = NULL; - url->iov_len = 0; - } - return; - } - assert(msg->ref_queue); - assert(msg->queue_index < HD_RESULT_QUEUE_LEN); - - struct http_decoder_half_data *req_data = - msg->ref_queue->array[msg->queue_index].req_data; - - if (http_half_data_get_decode_url(req_data, url) < 0) - { - goto fail; - } - return; - - fail: - if (url) - { - url->iov_base = NULL; - url->iov_len = 0; - } return; } -#endif - int http_message_get_transaction_seq(const struct http_message *msg) - { - if (unlikely(NULL == msg)) - { - return -1; - } - assert(msg->ref_queue); - assert(msg->queue_index < HD_RESULT_QUEUE_LEN); - struct http_decoder_half_data *hf_data = NULL; - if (HTTP_REQUEST == msg->flow_type) - { - hf_data = msg->ref_queue->array[msg->queue_index].req_data; - } - else - { - hf_data = msg->ref_queue->array[msg->queue_index].res_data; - } - return http_half_data_get_transaction_seq(hf_data); - } -#ifdef __cplusplus -} -#endif
\ No newline at end of file + STELLAR_LOG_DEBUG(http_env->logger_ref, HTTP_MODULE_NAME, + "sess: %s, rcv tcp payload len: %u", session_get_readable_addr(exdata->sess), tcp_payload_len); + http_decoder_execute(sess, http_env, exdata, tcp_payload, tcp_payload_len); + return; +}
\ No newline at end of file diff --git a/decoders/http/http_decoder.h b/decoders/http/http_decoder.h new file mode 100644 index 0000000..06cc72b --- /dev/null +++ b/decoders/http/http_decoder.h @@ -0,0 +1,104 @@ + +#pragma once +#include <stdint.h> +#include <stddef.h> +#include "stellar/session.h" +#include "stellar/http.h" +#include "llhttp.h" +#include "http_decoder_stat.h" +#include "http_decoder_half.h" + +#ifdef __cplusplus +extern "C" +{ +#endif + +#define HTTP_IDENTIFY_LEN 16 +#define HTTP_HEADERS_CACHE_MAX_SIZE 4096 + +#define HTTP_MODULE_NAME "HTTP" +#define HTTP_EXDATA_NAME "HTTP_EXDATA" + +#ifndef likely +#define likely(x) __builtin_expect((x), 1) +#endif +#ifndef unlikely +#define unlikely(x) __builtin_expect((x), 0) +#endif + +#ifndef UNUSED +#define UNUSED __attribute__((unused)) +#endif + + struct http_config + { + int decompress_switch; + // int stat_output_interval; + // size_t mempool_size; // per session mempool size + }; + + enum http_topic_type + { + HTTP_TOPIC_REQ_HEADER = 0, + HTTP_TOPIC_REQ_BODY, + HTTP_TOPIC_RES_HEADER, + HTTP_TOPIC_RES_BODY, + HTTP_TOPIC_MAX, + }; + + struct http_topic_compose + { + const char *topic_name; + int topic_id; + }; + + struct http_topic_manager + { + struct http_topic_compose topic_compose[HTTP_TOPIC_MAX]; + }; + + struct http + { + struct module_manager *mod_mgr_ref; + struct logger *logger_ref; + struct http_topic_manager *http_topic_mgr; + int exdata_id; + int plugin_id; + struct http_config hd_cfg; + struct http_stat stat; + }; + + struct http_half; + struct http_half_data; + struct http_decoder + { + struct http_half *flow_c2s; + struct http_half *flow_s2c; + }; + + struct http_exdata + { + int ignore_session; + struct http *http_env; + struct session *sess; + struct http_decoder *decoder; + }; + + struct http_message + { + struct session *sess_ref; + enum http_topic_type topic_type; + struct http_half_data *flow_data; + enum http_event event; + }; + + void http_on_tcp_stream_cb(struct session *sess, enum session_state state, const char *tcp_payload, uint32_t tcp_payload_len, void *args); + struct http_topic_manager *http_topic_mgr_init(struct module_manager *mod_mgr); + struct http_half *http_flow_get_nx(struct http_exdata *exdata, enum flow_type flow_dir, struct session *sess, struct http *http_env); + void http_message_free_cb(void *msg, void *msg_free_arg); + void http_exdata_free_cb(int idx, void *ex_data, void *plugin_env); + void http_topic_mgr_free(struct http_topic_manager *topic_mgr); + +#ifdef __cplusplus +} +#endif
\ No newline at end of file diff --git a/decoders/http/http_decoder_decompress.c b/decoders/http/http_decoder_decompress.c new file mode 100644 index 0000000..438987e --- /dev/null +++ b/decoders/http/http_decoder_decompress.c @@ -0,0 +1,274 @@ +#include <zlib.h> +#include <string.h> +#include <assert.h> +#include <brotli/decode.h> +#include "http_decoder_utils.h" +#include "http_decoder_decompress.h" + +#ifdef __cplusplus +extern "C" +{ +#endif + + struct http_content_decompress + { + enum http_content_encoding encoding; + z_stream *z_stream_ptr; + BrotliDecoderState *br_state; + char *buffer; + size_t buffer_size; + }; + + void http_content_decompress_ownership_borrow(struct http_content_decompress *decompress) + { + decompress->buffer = NULL; // ownership move to flow_data, will be freed when message has been processed by all modules + } + + enum http_content_encoding http_content_encoding_str2int(const char *content_encoding, size_t encoding_str_len) + { + if (http_strncasecmp_safe("gzip", content_encoding, 4, encoding_str_len) == 0) + { + return HTTP_CONTENT_ENCODING_GZIP; + } + if (http_strncasecmp_safe("deflate", content_encoding, 7, encoding_str_len) == 0) + { + return HTTP_CONTENT_ENCODING_DEFLATE; + } + if (http_strncasecmp_safe("br", content_encoding, 2, encoding_str_len) == 0) + { + return HTTP_CONTENT_ENCODING_BR; + } + return HTTP_CONTENT_ENCODING_NONE; + } + + const char *http_content_encoding_int2str(enum http_content_encoding content_encoding) + { + if (content_encoding == HTTP_CONTENT_ENCODING_GZIP) + { + return "gzip"; + } + if (content_encoding == HTTP_CONTENT_ENCODING_DEFLATE) + { + return "deflate"; + } + if (content_encoding == HTTP_CONTENT_ENCODING_BR) + { + return "br"; + } + return "unknown"; + } + + struct http_content_decompress *http_content_decompress_create(enum http_content_encoding encoding) + { + struct http_content_decompress *decompress = CALLOC(struct http_content_decompress, 1); + assert(decompress); + + decompress->encoding = encoding; + decompress->z_stream_ptr = NULL; + decompress->br_state = NULL; + + if (encoding == HTTP_CONTENT_ENCODING_GZIP || encoding == HTTP_CONTENT_ENCODING_DEFLATE) + { + decompress->z_stream_ptr = CALLOC(z_stream, 1); + assert(decompress->z_stream_ptr); + + decompress->z_stream_ptr->zalloc = NULL; + decompress->z_stream_ptr->zfree = NULL; + decompress->z_stream_ptr->opaque = NULL; + decompress->z_stream_ptr->avail_in = 0; + decompress->z_stream_ptr->next_in = Z_NULL; + + if (encoding == HTTP_CONTENT_ENCODING_GZIP) + { + if (inflateInit2(decompress->z_stream_ptr, MAX_WBITS + 16) != Z_OK) + { + goto error; + } + } + if (encoding == HTTP_CONTENT_ENCODING_DEFLATE) + { + if (inflateInit2(decompress->z_stream_ptr, -MAX_WBITS) != Z_OK) + { + goto error; + } + } + } + + if (encoding == HTTP_CONTENT_ENCODING_BR) + { + decompress->br_state = BrotliDecoderCreateInstance(NULL, NULL, NULL); + if (decompress->br_state == NULL) + { + goto error; + } + } + return decompress; + + error: + http_content_decompress_destroy(decompress); + return NULL; + } + + void http_content_decompress_destroy(struct http_content_decompress *decompress) + { + if (NULL == decompress) + { + return; + } + if (decompress->z_stream_ptr != NULL) + { + inflateEnd(decompress->z_stream_ptr); + FREE(decompress->z_stream_ptr); + } + if (decompress->br_state) + { + BrotliDecoderDestroyInstance(decompress->br_state); + decompress->br_state = NULL; + } + FREE(decompress->buffer); + FREE(decompress); + } + + static int http_content_decompress_write_zlib(struct http_content_decompress *decompress, + const char *indata, size_t indata_len, + char **outdata, size_t *outdata_len) + { + z_stream *z_stream_ptr = decompress->z_stream_ptr; + z_stream_ptr->avail_in = (unsigned int)indata_len; + z_stream_ptr->next_in = (unsigned char *)indata; + z_stream_ptr->avail_out = (unsigned int)HTTP_DECOMPRESS_BUFFER_SIZE; + z_stream_ptr->next_out = (unsigned char *)decompress->buffer; + *outdata = NULL; + *outdata_len = 0; + size_t total_have = 0; + int no_buffer; + + do + { + int ret = inflate(z_stream_ptr, Z_NO_FLUSH); + if (ret == Z_STREAM_ERROR || ret == Z_NEED_DICT || ret == Z_DATA_ERROR || ret == Z_MEM_ERROR) + { + (void)inflateEnd(z_stream_ptr); + return -1; + } + size_t have = HTTP_DECOMPRESS_BUFFER_SIZE - z_stream_ptr->avail_out; + if (have > 0) + { + total_have += have; + if (0 == z_stream_ptr->avail_out) + { + decompress->buffer_size += HTTP_DECOMPRESS_BUFFER_SIZE; + decompress->buffer = REALLOC(char, decompress->buffer, decompress->buffer_size); + z_stream_ptr->avail_out = HTTP_DECOMPRESS_BUFFER_SIZE; + z_stream_ptr->next_out = (unsigned char *)decompress->buffer + total_have; + *outdata = decompress->buffer; + *outdata_len = total_have; + no_buffer = 1; + } + else + { + *outdata = decompress->buffer; + *outdata_len = total_have; + no_buffer = 0; + } + } + else + { + break; + } + if (Z_STREAM_END == ret) + { + break; + } + } while (no_buffer == 1); + return 0; + } + + static int http_content_decompress_write_br(struct http_content_decompress *decompress, + const char *indata, size_t indata_len, + char **outdata, size_t *outdata_len) + { + size_t available_in = indata_len; + const unsigned char *next_in = (const unsigned char *)indata; + size_t available_out = HTTP_DECOMPRESS_BUFFER_SIZE; + unsigned char *next_out = (unsigned char *)decompress->buffer; + + *outdata = NULL; + *outdata_len = 0; + size_t total_have = 0; + int no_buffer; + + do + { + int ret = BrotliDecoderDecompressStream(decompress->br_state, &available_in, + &next_in, &available_out, &next_out, NULL); + if (ret == BROTLI_DECODER_RESULT_ERROR) + { + // BrotliDecoderErrorCode errcode = BrotliDecoderGetErrorCode(decompress->br_state); + *outdata = NULL; + *outdata_len = 0; + return -1; + } + size_t have = HTTP_DECOMPRESS_BUFFER_SIZE - available_out; + if (have > 0) + { + total_have += have; + if (0 == available_out) + { + decompress->buffer_size += HTTP_DECOMPRESS_BUFFER_SIZE; + decompress->buffer = REALLOC(char, decompress->buffer, decompress->buffer_size); + available_out = HTTP_DECOMPRESS_BUFFER_SIZE; + next_out = (unsigned char *)decompress->buffer + total_have; + *outdata = decompress->buffer; + *outdata_len = total_have; + no_buffer = 1; + } + else + { + *outdata = decompress->buffer; + *outdata_len = have; + no_buffer = 0; + } + } + else + { + break; + } + } while (no_buffer == 1); + return 0; + } + + int http_content_decompress_write(struct http_content_decompress *decompress, + const char *indata, size_t indata_len, + char **outdata, size_t *outdata_len) + { + assert(decompress); + assert(indata); + assert(indata_len > 0); + assert(outdata); + assert(outdata_len); + *outdata = NULL; + *outdata_len = 0; + + if (NULL == decompress->buffer) + { + decompress->buffer = CALLOC(char, HTTP_DECOMPRESS_BUFFER_SIZE); + assert(decompress->buffer); + decompress->buffer_size = HTTP_DECOMPRESS_BUFFER_SIZE; + } + + if (decompress->encoding == HTTP_CONTENT_ENCODING_GZIP || + decompress->encoding == HTTP_CONTENT_ENCODING_DEFLATE) + { + return http_content_decompress_write_zlib(decompress, indata, indata_len, outdata, outdata_len); + } + if (decompress->encoding == HTTP_CONTENT_ENCODING_BR) + { + return http_content_decompress_write_br(decompress, indata, indata_len, outdata, outdata_len); + } + assert(0); + return -1; + } +#ifdef __cplusplus +} +#endif
\ No newline at end of file diff --git a/decoders/http/http_content_decompress.h b/decoders/http/http_decoder_decompress.h index 2ac4689..fd82dd5 100644 --- a/decoders/http/http_content_decompress.h +++ b/decoders/http/http_decoder_decompress.h @@ -6,6 +6,9 @@ extern "C" #endif #include <stddef.h> +#include "stellar/http.h" + +#define HTTP_DECOMPRESS_BUFFER_SIZE (4096) enum http_content_encoding { @@ -13,18 +16,14 @@ extern "C" HTTP_CONTENT_ENCODING_GZIP = 1 << 1, HTTP_CONTENT_ENCODING_DEFLATE = 1 << 2, HTTP_CONTENT_ENCODING_BR = 1 << 3, + HTTP_CONTENT_ENCODING_UNKNOWN = 1 << 4, }; struct http_content_decompress; - enum http_content_encoding http_content_encoding_str2int(const char *content_encoding, size_t encoding_str_len); - const char *http_content_encoding_int2str(enum http_content_encoding content_encoding); - struct http_content_decompress *http_content_decompress_create(enum http_content_encoding encoding); - void http_content_decompress_destroy(struct http_content_decompress *decompress); - // return 0 : success // return -1 : error int http_content_decompress_write(struct http_content_decompress *decompress, diff --git a/decoders/http/http_decoder_half.c b/decoders/http/http_decoder_half.c index 70991c6..84874a3 100644 --- a/decoders/http/http_decoder_half.c +++ b/decoders/http/http_decoder_half.c @@ -2,1187 +2,689 @@ #include <stdio.h> #include <string.h> #include <arpa/inet.h> -#include "http_decoder_private.h" +#include "http_decoder.h" +#include "http_decoder_half.h" +#include "http_decoder_utils.h" +#include "http_decoder_decompress.h" #include "llhttp.h" +#include "stellar/session.h" #include "uthash/utlist.h" +#include "uthash/utarray.h" -struct http_decompress_buffer +#ifdef __cplusplus +extern "C" { - struct iovec iov; - char is_commit; - struct http_decompress_buffer *next, *prev; -}; - -struct http_decoder_half_data -{ - struct http_decoder_table *table; - - int major_version; - int minor_version; - int status_code; - - enum http_event state; - - enum http_content_encoding content_encoding; - struct http_content_decompress *decompress; -#if 0 - char *ref_decompress_body; - size_t decompress_body_len; -#else - struct http_decompress_buffer *decompress_buffer_list; -#endif - int joint_url_complete; - int url_is_encoded; - // http://<host>[:<port>]/<path>?<searchpart> - hstring joint_url; - hstring decoded_url; - long long transaction_index; -}; - -struct http_decoder_half -{ - llhttp_t parser; - llhttp_settings_t settings; - enum llhttp_errno error; - int decompress_switch; - struct http_decoder_env *httpd_env; - - // uint8_t is_request_flow; - enum http_event event; - http_event_cb *http_ev_cb; - struct http_event_context *http_ev_ctx; - - struct http_decoder_half_data *ref_data; - - long long trans_counter; - long long err_counter; - long long transaction_seq; // accumulated - - const char *data; - int data_len; -}; - -// #define HTTP_DECODER_DEBUG -#ifdef HTTP_DECODER_DEBUG -static void printf_debug_info(const char *desc, const char *at, size_t length) -{ - if (at) - { - char *temp = http_safe_dup(at, length); - printf("HTTP PARSER STAGE: %s: %s\n", desc, temp); - FREE(temp); - } - else - { - printf("HTTP PARSER STAGE: %s\n", desc); - } -} -#else -#define printf_debug_info(desc, at, length) #endif -void http_half_decompress_buffer_free(struct http_decoder_half_data *data, hstring *decompress_body) -{ - struct http_decompress_buffer *el, *tmp; - DL_FOREACH_SAFE(data->decompress_buffer_list, el, tmp) + static inline void http_half_stage_reset_flow_data(struct http_half *half) { - if (el->iov.iov_base == decompress_body->iov_base && el->iov.iov_len == decompress_body->iov_len) + if (half->flow_data) { - DL_DELETE(data->decompress_buffer_list, el); - if (el->iov.iov_base) + memset(&half->flow_data->req_line, 0, sizeof(struct http_request_line)); + memset(&half->flow_data->status_line, 0, sizeof(struct http_status_line)); + memset(&half->flow_data->header, 0, sizeof(struct http_header)); + if (half->flow_data->ut_filed_array) { - FREE(el->iov.iov_base); + utarray_clear(half->flow_data->ut_filed_array); } - FREE(el); - break; } } -} - -void http_half_get_lastest_decompress_buffer(struct http_decoder_half_data *data, hstring *decompress_body) -{ - if (data->content_encoding == HTTP_CONTENT_ENCODING_NONE) - { - return; - } - if (data->decompress_buffer_list == NULL) - { - decompress_body->iov_base = NULL; - decompress_body->iov_len = 0; - return; - } - if (data->decompress_buffer_list->prev->is_commit == 1) + static inline void http_half_stage_reset_stat(struct http_half *half) { - decompress_body->iov_base = NULL; - decompress_body->iov_len = 0; - return; + memset(&half->half_stage, 0, sizeof(struct http_half_llhttp_stage)); } - decompress_body->iov_base = data->decompress_buffer_list->prev->iov.iov_base; - decompress_body->iov_len = data->decompress_buffer_list->prev->iov.iov_len; - data->decompress_buffer_list->prev->is_commit = 1; -} - -static void http_decoder_half_data_decompress(struct http_decoder_half_data *data) -{ - assert(data); - - if (data->content_encoding == HTTP_CONTENT_ENCODING_NONE) + static void http_half_stage_reset(struct http_half *half) { - return; + http_half_stage_reset_stat(half); + http_half_stage_reset_flow_data(half); } - hstring raw_body = {}; - http_decoder_table_get_body(data->table, (char **)&raw_body.iov_base, &raw_body.iov_len); - if (raw_body.iov_base == NULL || raw_body.iov_len == 0) + void http_flow_append_header_filed(struct http_half_data *flow_data, const char *at, size_t length) { - return; + if (NULL == flow_data->ut_filed_array) // the first field + { + UT_icd header_icd = {sizeof(struct http_header_field), NULL, NULL, NULL}; + utarray_new(flow_data->ut_filed_array, &header_icd); + assert(flow_data->ut_filed_array != NULL); + utarray_reserve(flow_data->ut_filed_array, HTTP_HEADER_FIELD_RESERVE_NUM); + } + struct http_header_field new_filed = {}; + new_filed.field_name = (char *)at; + new_filed.field_name_len = length; + utarray_push_back(flow_data->ut_filed_array, &new_filed); } - if (NULL == data->decompress) + void http_flow_append_header_value(struct http_half_data *flow_data, const char *at, size_t length) { - data->decompress = http_content_decompress_create(data->content_encoding); + assert(flow_data->ut_filed_array != NULL); + struct http_header_field *last_field = (struct http_header_field *)utarray_back(flow_data->ut_filed_array); + assert(last_field != NULL); + if (last_field) + { + last_field->field_value = (char *)at; + last_field->field_value_len = length; + } } - assert(data->decompress); - char *local_outdata = NULL; - size_t local_outdata_len = 0; - if (http_content_decompress_write(data->decompress, (char *)raw_body.iov_base, - raw_body.iov_len, - &local_outdata, - &local_outdata_len) == -1) + static struct http_message *http_produce_message(struct session *sess, enum http_event event, + enum http_topic_type topic_type, struct http_half_data *flow_data) { - // log error - http_content_decompress_destroy(data->decompress); - data->decompress = NULL; - return; + struct http_message *msg = (struct http_message *)calloc(1, sizeof(struct http_message)); + msg->sess_ref = sess; + msg->topic_type = topic_type; + msg->flow_data = flow_data; + msg->event = event; + return msg; } - if (local_outdata != NULL && local_outdata_len > 0) + static struct http_half_data *http_half_data_new(enum flow_type flow_dir) { - struct http_decompress_buffer *decompress_buffer = CALLOC(struct http_decompress_buffer, 1); - assert(decompress_buffer); - decompress_buffer->iov.iov_base = local_outdata; - decompress_buffer->iov.iov_len = local_outdata_len; - DL_APPEND(data->decompress_buffer_list, decompress_buffer); - http_content_decompress_ownership_borrow(data->decompress); + struct http_half_data *half_data = (struct http_half_data *)calloc(1, sizeof(struct http_half_data)); + half_data->flow_dir = flow_dir; + /* update transaction seq in http_half_parse_headers_finally(), not here */ + return half_data; } -} - -/* Possible return values 0, -1, `HPE_PAUSED` */ -static int on_message_begin(llhttp_t *http) -{ - printf_debug_info("on_message_begin", NULL, 0); - - struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); - assert(half); - if (half->parser.type == HTTP_REQUEST) - { - half->event = HTTP_EVENT_REQ_INIT; - } - else + void http_half_data_free(struct http_half_data *half_data) { - half->event = HTTP_EVENT_RES_INIT; + if (NULL == half_data) + { + return; + } + if (half_data->ut_filed_array) + { + utarray_free(half_data->ut_filed_array); + half_data->ut_filed_array = NULL; + } + if (half_data->joint_url.iov_base) + { + FREE(half_data->joint_url.iov_base); + } + if (half_data->decompress) + { + http_content_decompress_destroy(half_data->decompress); + half_data->decompress = NULL; + } + free(half_data); } - half->ref_data = NULL; - - assert(half->http_ev_cb != NULL); - half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); // http_event_handler() - - half->trans_counter++; - half->ref_data->transaction_index = half->transaction_seq++; - return 0; -} - -static int on_message_complete(llhttp_t *http) -{ - printf_debug_info("on_message_complete", NULL, 0); - - struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); - assert(half); - - if (half->parser.type == HTTP_REQUEST) + void http_half_free(struct http_half *half) { - if (half->event == HTTP_EVENT_REQ_BODY_DATA) + if (NULL == half) { - half->event = HTTP_EVENT_REQ_BODY_END; - half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); + return; } - } - else - { - if (half->event == HTTP_EVENT_RES_BODY_DATA) + llhttp_reset(&half->parser.llhttp_parser); + http_half_data_free(half->flow_data); + if (half->cached_header_buffer) { - half->event = HTTP_EVENT_RES_BODY_END; - half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); + if (half->cached_header_buffer->buffer) + { + FREE(half->cached_header_buffer->buffer); + } + FREE(half->cached_header_buffer); } + free(half); } - // trigger req_end/res_end - if (half->parser.type == HTTP_REQUEST) - { - half->event = HTTP_EVENT_REQ_END; - half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); - } - else + struct http_half *http_half_new(enum flow_type flow_dir) { - half->event = HTTP_EVENT_RES_END; - half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); + struct http_half *half = (struct http_half *)calloc(1, sizeof(struct http_half)); + if (FLOW_TYPE_C2S == flow_dir) + { + http_flow_parser_init(&half->parser, HTTP_REQUEST); + } + else + { + http_flow_parser_init(&half->parser, HTTP_RESPONSE); + } + half->parser.half_ref = half; + return half; } - return 0; -} - -static int on_reset(llhttp_t *http __attribute__((unused))) -{ - printf_debug_info("on_reset", NULL, 0); - - return 0; -} - -static inline int is_line_crlf(struct http_decoder_half *half) -{ - const char *chr_r = (char *)memrchr(half->data, '\r', half->data_len); - const char *chr_n = (char *)memrchr(half->data, '\n', half->data_len); - if (chr_r && chr_n && (chr_r + 1 == chr_n)) + static struct http_header_field *http_flow_get_header_field(struct http_half_data *flow_data, const char *field_name, size_t field_name_len) { - return 1; + struct http_header_field *p; + if (NULL == flow_data->ut_filed_array) // some response no header fileds, such as "HTTP/1.1 100 Continue\r\n\r\n" + { + return NULL; + } + for (p = (struct http_header_field *)utarray_front(flow_data->ut_filed_array); + p != NULL; + p = (struct http_header_field *)utarray_next(flow_data->ut_filed_array, p)) + { + if ((field_name_len == p->field_name_len) && + (strncasecmp(p->field_name, field_name, field_name_len) == 0)) + { + return p; + } + } + return NULL; } - return 0; -} - -static int on_method(llhttp_t *http, const char *at, size_t length) -{ - printf_debug_info("on_method", at, length); - - struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); - assert(half); - - http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_METHOD, at, length); - return 0; -} - -/* Information-only callbacks, return value is ignored */ -static int on_method_complete(llhttp_t *http) -{ - printf_debug_info("on_method_complete", NULL, 0); - - struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); - assert(half); - if (is_line_crlf(half) == 0) + static void http_using_session_addr_as_host(struct session *ref_session, struct http_header_field *host_result) { - http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_METHOD); - } - - http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_METHOD); - - return 0; -} - -/* Possible return values 0, -1, HPE_USER */ -static int on_uri(llhttp_t *http, const char *at, size_t length) -{ - printf_debug_info("on_uri", at, length); - - struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); - assert(half); - - http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_URI, at, length); - return 0; -} - -static void http_decoder_cached_portion_url(struct http_decoder_half *half, const hstring *uri_result) -{ - struct http_decoder_half_data *ref_data = half->ref_data; - int uri_skip_len = 0; + memset(host_result, 0, sizeof(struct http_header_field)); + struct http_session_addr ssaddr = {}; + httpd_session_get_addr(ref_session, &ssaddr); + char ip_string_buf[INET6_ADDRSTRLEN]; - if ((uri_result->iov_len) > 7 && (strncasecmp("http://", (char *)uri_result->iov_base, 7) == 0)) // absolute URI - { - uri_skip_len = strlen("http://"); - ref_data->joint_url_complete = 1; - } - else - { - ref_data->joint_url_complete = 0; + if (4 == ssaddr.ipver) + { + host_result->field_value = (char *)calloc(1, (INET6_ADDRSTRLEN + 7) /* "ip:port" max length */); + inet_ntop(AF_INET, &ssaddr.daddr4, ip_string_buf, INET6_ADDRSTRLEN); + snprintf((char *)host_result->field_value, INET6_ADDRSTRLEN + 7, "%s:%u", ip_string_buf, ntohs(ssaddr.dport)); + host_result->field_value_len = strlen(host_result->field_value); + } + else if (6 == ssaddr.ipver) + { + host_result->field_value = (char *)calloc(1, (INET6_ADDRSTRLEN + 7) /* "ip:port" max length */); + inet_ntop(AF_INET6, &ssaddr.daddr6, ip_string_buf, INET6_ADDRSTRLEN); + snprintf((char *)host_result->field_value, INET6_ADDRSTRLEN + 7, "%s:%u", ip_string_buf, ntohs(ssaddr.dport)); + host_result->field_value_len = strlen(host_result->field_value); + } + else + { + host_result->field_value = (char *)calloc(1, 1); + host_result->field_value_len = 1; + } + return; } - ref_data->joint_url.iov_len = uri_result->iov_len - uri_skip_len; - ref_data->joint_url.iov_base = MEMPOOL_CALLOC(half->http_ev_ctx->ref_mempool, char, ref_data->joint_url.iov_len); - memcpy(ref_data->joint_url.iov_base, (char *)uri_result->iov_base + uri_skip_len, ref_data->joint_url.iov_len); -} - -/* Information-only callbacks, return value is ignored */ -static int on_uri_complete(llhttp_t *http) -{ - printf_debug_info("on_uri_complete", NULL, 0); - - struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); - assert(half); - - if (is_line_crlf(half) == 0) + static void http_flow_join_url(struct http_half_data *flow_data, const struct http_header_field *host_filed) { - http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_URI); - } - - http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_URI); - - hstring uri_result = {}; - http_decoder_table_get_uri(half->ref_data->table, (char **)&uri_result.iov_base, &uri_result.iov_len); - assert(uri_result.iov_base); - http_decoder_cached_portion_url(half, &uri_result); - - return 0; -} + int append_slash_len = 0; + const char *join_uri = flow_data->req_line.uri; + size_t join_url_len = flow_data->req_line.uri_len; -/* Possible return values 0, -1, HPE_USER */ -static int on_version(llhttp_t *http, const char *at, size_t length) -{ - printf_debug_info("on_version", at, length); - - struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); - assert(half); - - http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_VERSION, at, length); - return 0; -} + // skip schema "http://" + if (join_url_len > 7 && strncasecmp(join_uri, "http://", 7) == 0) + { + join_uri += 7; + join_url_len -= 7; + } -/* Information-only callbacks, return value is ignored */ -static int on_version_complete(llhttp_t *http) -{ - printf_debug_info("on_version_complete", NULL, 0); + // if uri not absolute path(start with '/'), append '/' + if ('/' != join_uri[0]) + { + append_slash_len = 1; + } + int url_cache_str_len = host_filed->field_value_len + join_url_len + append_slash_len; + char *url_cache_str = (char *)calloc(1, url_cache_str_len); - struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); - assert(half); + char *ptr = url_cache_str; + memcpy(ptr, host_filed->field_value, host_filed->field_value_len); + ptr += host_filed->field_value_len; + if (append_slash_len) + { + *ptr = '/'; + ptr++; + } + memcpy(ptr, join_uri, join_url_len); - if (is_line_crlf(half) == 0) - { - http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_VERSION); + flow_data->joint_url.iov_base = url_cache_str; + flow_data->joint_url.iov_len = url_cache_str_len; } - http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_VERSION); - - half->ref_data->major_version = llhttp_get_http_major(&half->parser); - half->ref_data->minor_version = llhttp_get_http_minor(&half->parser); - - if (half->parser.type == HTTP_REQUEST) + // todo: optimize use hash table + static void http_half_get_frequently_used_fileds(struct http_half_data *flow_data) { - half->event = HTTP_EVENT_REQ_LINE; - if (half->http_ev_cb) // http_event_handler() + if (FLOW_TYPE_C2S == flow_data->flow_dir) + { + flow_data->header.host = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_HOST, strlen(HTTP_HEADER_FIELD_NAME_HOST)); + flow_data->header.user_agent = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_USER_AGENT, strlen(HTTP_HEADER_FIELD_NAME_USER_AGENT)); + flow_data->header.referer = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_REFERER, strlen(HTTP_HEADER_FIELD_NAME_REFERER)); + flow_data->header.cookie = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_COOKIE, strlen(HTTP_HEADER_FIELD_NAME_COOKIE)); + } + else { - half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); + flow_data->header.set_cookie = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_SET_COOKIE, strlen(HTTP_HEADER_FIELD_NAME_SET_COOKIE)); } + flow_data->header.content_type = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_CONTENT_TYPE, strlen(HTTP_HEADER_FIELD_NAME_CONTENT_TYPE)); } - return 0; -} - -/* Possible return values 0, -1, HPE_USER */ -static int on_status(llhttp_t *http, const char *at, size_t length) -{ - printf_debug_info("on_status", at, length); - - struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); - assert(half); - - http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_STATUS, at, length); - return 0; -} - -/* Information-only callbacks, return value is ignored */ -static int on_status_complete(llhttp_t *http) -{ - printf_debug_info("on_status_complete", NULL, 0); - - struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); - assert(half); - - if (is_line_crlf(half) == 0) + static int http_half_parse_headers_finally(struct http_half *half) { - http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_STATUS); - } + struct http_half_data *flow_data = half->flow_data; + http_half_get_frequently_used_fileds(flow_data); - http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_STATUS); - half->ref_data->status_code = llhttp_get_status_code(&half->parser); + /* update transaction_seq when headers completed */ + half->flow_data->transaction_seq = half->transaction_num++; - if (half->parser.type == HTTP_RESPONSE) - { - half->event = HTTP_EVENT_RES_LINE; - if (half->http_ev_cb != NULL) // http_event_handler() + if (FLOW_TYPE_C2S == flow_data->flow_dir) { - half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); + struct http_header_field tmp_host_field = {}; + const struct http_header_field *host_filed = flow_data->header.host; + if (NULL == host_filed) + { + http_using_session_addr_as_host(half->sess_ref, &tmp_host_field); + host_filed = &tmp_host_field; + } + http_flow_join_url(flow_data, host_filed); + FREE(tmp_host_field.field_name); + FREE(tmp_host_field.field_value); } - } - return 0; -} - -/* Possible return values 0, -1, HPE_USER */ -static int on_header_field(llhttp_t *http, const char *at, size_t length) -{ - printf_debug_info("on_header_field", at, length); - - struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); - assert(half); - - http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_HDRKEY, at, length); - return 0; -} - -/* Information-only callbacks, return value is ignored */ -static int on_header_field_complete(llhttp_t *http) -{ - printf_debug_info("on_header_field_complete", NULL, 0); - - struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); - assert(half); - - http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_HDRKEY); - - return 0; -} - -/* Possible return values 0, -1, HPE_USER */ -static int on_header_value(llhttp_t *http, const char *at, size_t length) -{ - printf_debug_info("on_header_value", at, length); - - struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); - assert(half); - - http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_HDRVAL, at, length); - return 0; -} - -#define MAX_ENCODING_STR_LEN 8 -/* Information-only callbacks, return value is ignored */ -static int on_header_value_complete(llhttp_t *http) -{ - printf_debug_info("on_header_value_complete", NULL, 0); - - struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); - assert(half); - - if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_HDRKEY) == - STRING_STATE_CACHE) - { - http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_HDRKEY); - } - - http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_HDRVAL); + struct http_header_field *encoding_field = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_CONTENT_ENCODING, strlen(HTTP_HEADER_FIELD_NAME_CONTENT_ENCODING)); + if (NULL == encoding_field) + { + flow_data->content_encoding_type = HTTP_CONTENT_ENCODING_NONE; + } + else + { + flow_data->content_encoding_type = http_content_encoding_str2int(encoding_field->field_value, encoding_field->field_value_len); + } - if (half->ref_data->content_encoding == HTTP_CONTENT_ENCODING_NONE) - { - struct http_header_field http_hdr = {}; + struct http_header_field *content_len_field = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_CONTENT_LENGTH, strlen(HTTP_HEADER_FIELD_NAME_CONTENT_LENGTH)); + if (NULL == content_len_field) + { + flow_data->header.content_length = -1; + } + else + { + flow_data->header.content_length = http_strtoll(content_len_field->field_value, content_len_field->field_value_len); + } - if (http_decoder_table_get_header(half->ref_data->table, (char *)"Content-Encoding", 16, &http_hdr) == 0) + struct http_header_field *transf_encoding_field = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_TRANSFER_ENCODING, strlen(HTTP_HEADER_FIELD_NAME_TRANSFER_ENCODING)); + if (NULL == transf_encoding_field) { - half->ref_data->content_encoding = http_content_encoding_str2int(http_hdr.value, http_hdr.value_len); + flow_data->transfer_encoding_is_chunked = -1; } - } + else + { + if (http_strncasecmp_safe(transf_encoding_field->field_value, "chunked", transf_encoding_field->field_value_len, strlen("chunked")) == 0) + { + flow_data->transfer_encoding_is_chunked = 1; + } + else + { + flow_data->transfer_encoding_is_chunked = 0; + } + } + flow_data->header.header_buf = half->half_stage.first_header_name_ptr; + flow_data->header.header_buf_sz = half->half_stage.last_headers_complete_ptr - half->half_stage.first_header_name_ptr; - if (http->type == HTTP_REQUEST) - { - http_decoder_get_host_feed_url(half); + // todo, parse other frequently used fields + return 0; } - return 0; -} - -/* When on_chunk_header is called, the current chunk length is stored - * in parser->content_length. - * Possible return values 0, -1, `HPE_PAUSED` - */ -static int on_chunk_header(llhttp_t *http __attribute__((unused))) -{ - printf_debug_info("on_chunk_header", NULL, 0); - return 0; -} - -/* When on_chunk_header is called, the current chunk length is stored - * in parser->content_length. - * Possible return values 0, -1, `HPE_PAUSED` - */ -static int on_chunk_header_complete(llhttp_t *http __attribute__((unused))) -{ - printf_debug_info("on_chunk_header_complete", NULL, 0); - - return 0; -} - -/* Possible return values: - * 0 - Proceed normally - * 1 - Assume that request/response has no body, and proceed to parsing the next message - * 2 - Assume absence of body (as above) and make `llhttp_execute()` return `HPE_PAUSED_UPGRADE` - * -1 - Error `HPE_PAUSED` - */ -static int on_headers_complete(llhttp_t *http) -{ - printf_debug_info("on_headers_complete", NULL, 0); - - struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); - assert(half); - assert(half->ref_data); - - http_decoder_table_set_header_complete(half->ref_data->table); - - if (half->parser.type == HTTP_REQUEST) - { - half->event = HTTP_EVENT_REQ_HDR_END; - } - else + int http_get_header_field_count(struct http_half_data *flow_data) { - half->event = HTTP_EVENT_RES_HDR_END; + if (NULL == flow_data->ut_filed_array) + { + return 0; // some response no header fileds, such as "HTTP/1.1 100 Continue\r\n\r\n" + } + return utarray_len(flow_data->ut_filed_array); } - half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); // http_event_handler() - - return 0; -} -/* Possible return values 0, -1, HPE_USER */ -static int on_body(llhttp_t *http, const char *at, size_t length) -{ - printf_debug_info("on_body", at, length); - - struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); - assert(half); - - // trigger body_begin event - if (half->parser.type == HTTP_REQUEST) + void http_flow_body_decompress(struct http_half_data *flow_data, const char *zipdata, size_t zipdatalen) { - if (half->event == HTTP_EVENT_REQ_HDR_END) + assert(flow_data); + if (flow_data->content_encoding_type == HTTP_CONTENT_ENCODING_NONE) { - half->event = HTTP_EVENT_REQ_BODY_BEGIN; - half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); // http_event_handler() + return; } - } - else - { - if (half->event == HTTP_EVENT_RES_HDR_END) + if (zipdata == NULL || zipdatalen == 0) { - half->event = HTTP_EVENT_RES_BODY_BEGIN; - half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); + return; } - } - - if (half->ref_data != NULL) - { - if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_BODY) == - STRING_STATE_COMMIT) + if (NULL == flow_data->decompress) { - http_decoder_table_reset(half->ref_data->table, HTTP_ITEM_BODY); + flow_data->decompress = http_content_decompress_create(flow_data->content_encoding_type); } - http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_BODY, at, length); - http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_BODY); - } + char *local_outdata = NULL; + size_t local_outdata_len = 0; + if (http_content_decompress_write(flow_data->decompress, (char *)zipdata, + zipdatalen, &local_outdata, &local_outdata_len) == -1) + { + http_content_decompress_destroy(flow_data->decompress); + flow_data->decompress = NULL; + flow_data->decompress_body.body = NULL; + flow_data->decompress_body.body_sz = 0; + return; + } - if (1 == half->decompress_switch && half->ref_data->content_encoding != HTTP_CONTENT_ENCODING_NONE) - { - http_decoder_half_data_decompress(half->ref_data); + if (local_outdata != NULL && local_outdata_len > 0) + { + flow_data->decompress_body.body = local_outdata; + flow_data->decompress_body.body_sz = local_outdata_len; + http_content_decompress_ownership_borrow(flow_data->decompress); + } } - if (half->parser.type == HTTP_REQUEST) - { - half->event = HTTP_EVENT_REQ_BODY_DATA; - half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); // http_event_handler() - } - else + static void http_add_header_buf_reference(struct http_half *half) { - half->event = HTTP_EVENT_RES_BODY_DATA; - half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); - } - - return 0; -} - -static void http_decoder_half_init(struct http_decoder_half *half, http_event_cb *http_ev_cb, enum llhttp_type type) -{ - llhttp_settings_init(&half->settings); - llhttp_init(&half->parser, type, &half->settings); - - // half->is_request_flow = (type == HTTP_REQUEST) ? 1 : 0; - half->settings.on_message_begin = on_message_begin; - half->settings.on_message_complete = on_message_complete; - half->settings.on_reset = on_reset; - - half->settings.on_url = on_uri; - half->settings.on_url_complete = on_uri_complete; - - half->settings.on_status = on_status; - half->settings.on_status_complete = on_status_complete; - - half->settings.on_method = on_method; - half->settings.on_method_complete = on_method_complete; - - half->settings.on_version = on_version; - half->settings.on_version_complete = on_version_complete; - - half->settings.on_header_field = on_header_field; - half->settings.on_header_field_complete = on_header_field_complete; - - half->settings.on_header_value = on_header_value; - half->settings.on_header_value_complete = on_header_value_complete; - - half->settings.on_chunk_header = on_chunk_header; - half->settings.on_chunk_complete = on_chunk_header_complete; - - half->settings.on_headers_complete = on_headers_complete; - half->settings.on_body = on_body; - - half->error = HPE_OK; - half->http_ev_cb = http_ev_cb; // http_event_handler() - half->ref_data = NULL; -} - -struct http_decoder_half *http_decoder_half_new(struct http_decoder_exdata *hd_ctx, nmx_pool_t *mempool, - http_event_cb *ev_cb, enum llhttp_type http_type, - int decompress_switch, struct http_decoder_env *httpd_env, long long start_seq) -{ - struct http_decoder_half *half = MEMPOOL_CALLOC(mempool, struct http_decoder_half, 1); - assert(half); - - half->decompress_switch = decompress_switch; - half->http_ev_ctx = MEMPOOL_CALLOC(mempool, struct http_event_context, 1); - http_decoder_half_init(half, ev_cb, http_type); - half->http_ev_ctx->ref_httpd_ctx = hd_ctx; - half->httpd_env = httpd_env; - half->transaction_seq = start_seq; - return half; -} - -void http_decoder_half_free(nmx_pool_t *mempool, struct http_decoder_half *half) -{ - if (NULL == half) - { - return; + if (half->cached_header_buffer != NULL) + { + // add reference but not move ownership, because the buffer will be referenced of multiple transaction in pipeline mode + half->cached_header_buffer->reference++; + half->flow_data->cached_header_buffer = half->cached_header_buffer; + } } - if (half->http_ev_ctx != NULL) + static void http_update_body_offset(struct http_half_data *flow_data) { - MEMPOOL_FREE(mempool, half->http_ev_ctx); - half->http_ev_ctx = NULL; + if (flow_data->decompress_body.body != NULL || flow_data->decompress_body.offset > 0) + { + flow_data->decompress_body.offset += flow_data->decompress_body.body_sz; + } + else + { + flow_data->raw_body.offset += flow_data->raw_body.body_sz; + } } - MEMPOOL_FREE(mempool, half); -} - -void http_decoder_half_reinit(struct http_decoder_half *half, - struct http_decoder_result_queue *queue, - nmx_pool_t *mempool, struct session *sess) -{ - assert(half != NULL); - if (half->ref_data != NULL) + static void http_set_body_finish(struct http_half_data *flow_data) { - http_decoder_table_reinit(half->ref_data->table); + if (flow_data->decompress_body.offset > 0) + { + flow_data->decompress_body.body = NULL; + flow_data->decompress_body.body_sz = 0; + flow_data->decompress_body.is_finished = 1; + } + else + { + flow_data->raw_body.body = NULL; + flow_data->raw_body.body_sz = 0; + flow_data->raw_body.is_finished = 1; + } } - half->http_ev_ctx->ref_mempool = mempool; - half->http_ev_ctx->ref_session = sess; - half->http_ev_ctx->ref_queue = queue; -} -static void publish_message_for_parsed_header(struct http_decoder_half *half) -{ - if (0 == http_decoder_table_has_parsed_header(half->ref_data->table)) - { - return; - } - if (half->parser.type == HTTP_REQUEST) + void http_event_handler(struct http_half *half, enum http_event event, struct http *http_env) { - half->event = HTTP_EVENT_REQ_HDR; - } - else - { - half->event = HTTP_EVENT_RES_HDR; - } - half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); // http_event_handler(); - return; -} + struct http_message *msg = NULL; + int thread_id = module_manager_get_thread_id(http_env->mod_mgr_ref); + uint8_t flow_flag = 0; + struct mq_runtime *mq_rt = module_manager_get_mq_runtime(http_env->mod_mgr_ref); -int http_decoder_half_parse(int proxy_enable, struct http_decoder_half *half, const char *data, size_t data_len) -{ - assert(half && data); - - half->data = (const char *)data; - half->data_len = data_len; - half->error = llhttp_execute(&half->parser, data, data_len); - - int ret = 0; - enum llhttp_type type = HTTP_BOTH; + switch (event) + { + case HTTP_EVENT_REQ_INIT: + http_half_stage_reset(half); + if (NULL == half->flow_data) /* call llhttp_reset when headers is not completed */ + { + half->flow_data = http_half_data_new(FLOW_TYPE_C2S); + } + break; - switch (half->error) - { - case HPE_OK: - break; - case HPE_PAUSED: - llhttp_resume(&half->parser); - break; - case HPE_PAUSED_UPGRADE: - if (proxy_enable) + case HTTP_EVENT_REQ_HDR_END: { - llhttp_resume_after_upgrade(&half->parser); + http_half_parse_headers_finally(half); + int tot_c2s_headers = http_get_header_field_count(half->flow_data); + http_stat_update(&http_env->stat, thread_id, HTTP_TRANSACTION_NEW, 1); + http_stat_update(&http_env->stat, thread_id, HTTP_C2S_HEADERS, tot_c2s_headers); + http_stat_update(&http_env->stat, thread_id, HTTP_URL_BYTES, half->flow_data->joint_url.iov_len); + http_add_header_buf_reference(half); + msg = http_produce_message(half->sess_ref, event, HTTP_TOPIC_REQ_HEADER, half->flow_data); + mq_runtime_publish_message(mq_rt, http_env->http_topic_mgr->topic_compose[HTTP_TOPIC_REQ_HEADER].topic_id, msg); } - ret = 0; break; - default: - type = (enum llhttp_type)half->parser.type; - llhttp_init(&half->parser, type, &half->settings); - ret = -1; - break; - } - - if (ret < 0) - { - // fprintf(stdout, - // "llhttp_execute parse error: %s err_reason:%s\n", - // llhttp_errno_name(half->error), half->parser.reason); - return half->error; - } - - if (half->ref_data != NULL) - { - if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_URI) == STRING_STATE_REFER) + case HTTP_EVENT_REQ_BODY_BEGIN: + break; + case HTTP_EVENT_REQ_BODY_DATA: { - http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_URI); + http_update_body_offset(half->flow_data); + msg = http_produce_message(half->sess_ref, event, HTTP_TOPIC_REQ_BODY, half->flow_data); + mq_runtime_publish_message(mq_rt, http_env->http_topic_mgr->topic_compose[HTTP_TOPIC_REQ_BODY].topic_id, msg); } - - if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_STATUS) == STRING_STATE_REFER) + break; + case HTTP_EVENT_REQ_BODY_END: { - http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_STATUS); + http_set_body_finish(half->flow_data); + msg = http_produce_message(half->sess_ref, event, HTTP_TOPIC_REQ_BODY, half->flow_data); + mq_runtime_publish_message(mq_rt, http_env->http_topic_mgr->topic_compose[HTTP_TOPIC_REQ_BODY].topic_id, msg); } - - if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_METHOD) == STRING_STATE_REFER) + break; + case HTTP_EVENT_REQ_END: { - http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_METHOD); + if ((0 == session_is_symmetric(half->sess_ref, &flow_flag) && (SESSION_SEEN_C2S_FLOW == flow_flag))) + { + http_stat_update(&http_env->stat, thread_id, HTTP_TRANSACTION_FREE, 1); + http_stat_update(&http_env->stat, thread_id, HTTP_C2S_ASYMMETRY_TRANSACTION, 1); + } + msg = http_produce_message(half->sess_ref, event, HTTP_TOPIC_REQ_BODY, half->flow_data); + mq_runtime_publish_message(mq_rt, http_env->http_topic_mgr->topic_compose[HTTP_TOPIC_REQ_BODY].topic_id, msg); + half->flow_data = NULL; // ownership move to message, free it in message free callback } + break; - if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_VERSION) == STRING_STATE_REFER) + case HTTP_EVENT_RES_INIT: { - http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_VERSION); + http_half_stage_reset(half); + if (NULL == half->flow_data) /* call llhttp_reset when headers is not completed */ + { + half->flow_data = http_half_data_new(FLOW_TYPE_S2C); + } } + break; - if (http_decoder_table_header_complete(half->ref_data->table)) + case HTTP_EVENT_RES_HDR_END: { - http_decoder_table_reset_header_complete(half->ref_data->table); + http_half_parse_headers_finally(half); + int tot_s2c_headers = http_get_header_field_count(half->flow_data); + http_stat_update(&http_env->stat, thread_id, HTTP_S2C_HEADERS, tot_s2c_headers); + if ((0 == session_is_symmetric(half->sess_ref, &flow_flag) && (SESSION_SEEN_S2C_FLOW == flow_flag))) + { + http_stat_update(&http_env->stat, thread_id, HTTP_TRANSACTION_NEW, 1); + } + http_add_header_buf_reference(half); + msg = http_produce_message(half->sess_ref, event, HTTP_TOPIC_RES_HEADER, half->flow_data); + mq_runtime_publish_message(mq_rt, http_env->http_topic_mgr->topic_compose[HTTP_TOPIC_RES_HEADER].topic_id, msg); } - else + break; + case HTTP_EVENT_RES_BODY_BEGIN: + break; + case HTTP_EVENT_RES_BODY_DATA: { - // if headers are not completed with EOF \r\n\r\n, push the parsed headers so far - publish_message_for_parsed_header(half); + http_update_body_offset(half->flow_data); + msg = http_produce_message(half->sess_ref, event, HTTP_TOPIC_RES_BODY, half->flow_data); + mq_runtime_publish_message(mq_rt, http_env->http_topic_mgr->topic_compose[HTTP_TOPIC_RES_BODY].topic_id, msg); } - - enum string_state hdr_key_state = - http_decoder_table_state(half->ref_data->table, HTTP_ITEM_HDRKEY); - enum string_state hdr_val_state = - http_decoder_table_state(half->ref_data->table, HTTP_ITEM_HDRVAL); - - /* Truncated in http header key - For example http header k-v => User-Agent: Chrome - case1: - packet1: User- hdr_key_state == STRING_STATE_REFER - packet2: Agent: Chrome - - case2: - packet1: User-Agent: hdr_key_state == STRING_STATE_COMMIT - hdr_val_state == STRING_STATE_INIT - packet2: Chrome - */ - if (hdr_key_state == STRING_STATE_REFER || - (hdr_key_state == STRING_STATE_COMMIT && hdr_val_state == STRING_STATE_INIT)) + break; + case HTTP_EVENT_RES_BODY_END: { - http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_HDRKEY); + http_set_body_finish(half->flow_data); + msg = http_produce_message(half->sess_ref, event, HTTP_TOPIC_RES_BODY, half->flow_data); + mq_runtime_publish_message(mq_rt, http_env->http_topic_mgr->topic_compose[HTTP_TOPIC_RES_BODY].topic_id, msg); } - - /* Truncated in http header value - For example http header k-v => User-Agent: Chrome - packet1: User-Agent: Ch hdr_key_state == STRING_STATE_COMMIT - hdr_val_state == STRING_STATE_REFER - - packet2: rome - */ - if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_HDRVAL) == STRING_STATE_REFER) + break; + case HTTP_EVENT_RES_END: { - /* Header key should have been committed - If it's not cached, cache it for next packet to use - */ - http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_HDRKEY); - http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_HDRVAL); + if ((0 == session_is_symmetric(half->sess_ref, &flow_flag) && (SESSION_SEEN_S2C_FLOW == flow_flag))) + { + http_stat_update(&http_env->stat, thread_id, HTTP_S2C_ASYMMETRY_TRANSACTION, 1); + } + http_stat_update(&http_env->stat, thread_id, HTTP_TRANSACTION_FREE, 1); + msg = http_produce_message(half->sess_ref, event, HTTP_TOPIC_RES_BODY, half->flow_data); + mq_runtime_publish_message(mq_rt, http_env->http_topic_mgr->topic_compose[HTTP_TOPIC_RES_BODY].topic_id, msg); + half->flow_data = NULL; // ownership move to message, free it in message free callback } - - if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_BODY) == STRING_STATE_REFER) - { - http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_BODY); + break; + default: + assert(0); + break; } } - return 0; -} - -long long http_decoder_half_trans_count(struct http_decoder_half *half) -{ - if (NULL == half) - { - return 0; - } - - long long trans_cnt = half->trans_counter; - half->trans_counter = 0; - - return trans_cnt; -} - -struct http_decoder_half_data * -http_decoder_half_data_new(nmx_pool_t *mempool) -{ - struct http_decoder_half_data *data = - MEMPOOL_CALLOC(mempool, struct http_decoder_half_data, 1); - assert(data); - - data->table = http_decoder_table_new(mempool); - assert(data->table); - - data->major_version = -1; - data->minor_version = -1; - data->status_code = -1; - - data->content_encoding = HTTP_CONTENT_ENCODING_NONE; - // data->ref_decompress_body = NULL; - // data->decompress_body_len = 0; - data->decompress_buffer_list = NULL; - return data; -} - -static void http_decoder_half_decompress_buf_free(struct http_decoder_half_data *ref_data) -{ - if (ref_data == NULL) + static void http_half_stage_buf_free(struct http_half *half) { - return; - } - struct http_decompress_buffer *el, *tmp; - DL_FOREACH_SAFE(ref_data->decompress_buffer_list, el, tmp) - { - DL_DELETE(ref_data->decompress_buffer_list, el); - if (el->iov.iov_base != NULL) + if (half->cached_header_buffer != NULL) { - FREE(el->iov.iov_base); + http_buffer_free(half->cached_header_buffer); } - FREE(el); - } - ref_data->decompress_buffer_list = NULL; -} - -void http_decoder_half_data_free(nmx_pool_t *mempool, struct http_decoder_half_data *data) -{ - if (NULL == data) - { - return; - } - - if (data->table != NULL) - { - http_decoder_table_free(data->table); - data->table = NULL; + half->cached_header_buffer = NULL; } - if (data->decompress != NULL) + static void http_half_stage_buf_reuse(struct http_half *half) { - http_content_decompress_destroy(data->decompress); - data->decompress = NULL; + http_half_stage_buf_free(half); + half->cached_header_buffer = http_buffer_new(); } - if (data->joint_url.iov_base) - { - MEMPOOL_FREE(mempool, data->joint_url.iov_base); - data->joint_url.iov_base = NULL; - data->joint_url_complete = 0; - } - http_decoder_half_decompress_buf_free(data); - MEMPOOL_FREE(mempool, data); -} - -int http_decoder_half_data_get_request_line(struct http_decoder_half_data *data, - struct http_request_line *line) -{ - http_decoder_table_get_method(data->table, &line->method, &line->method_len); - http_decoder_table_get_uri(data->table, &line->uri, &line->uri_len); - http_decoder_table_get_version(data->table, &line->version, &line->version_len); - - line->major_version = data->major_version; - line->minor_version = data->minor_version; - - return 0; -} - -int http_decoder_half_data_get_response_line(struct http_decoder_half_data *data, - struct http_response_line *line) -{ - http_decoder_table_get_version(data->table, &line->version, &line->version_len); - http_decoder_table_get_status(data->table, &line->status, &line->status_len); - - line->major_version = data->major_version; - line->minor_version = data->minor_version; - line->status_code = data->status_code; - - return 0; -} - -int http_decoder_half_data_get_header(const struct http_decoder_half_data *data, - const char *name, size_t name_len, - struct http_header_field *hdr_result) -{ - return http_decoder_table_get_header(data->table, name, name_len, hdr_result); -} - -int http_decoder_half_data_iter_header(struct http_decoder_half_data *data, - struct http_header_field *header) -{ - return http_decoder_table_iter_header((struct http_decoder_table *)data->table, header); -} - -int http_decoder_half_data_reset_header_iter(struct http_decoder_half_data *req_data) -{ - if (NULL == req_data) - { - return -1; - } - return http_decoder_table_reset_header_iter(req_data->table); -} - -int http_decoder_half_data_has_parsed_header(struct http_decoder_half_data *data) -{ - if (NULL == data) + static int http_half_stage_buf_append(struct http_half *half, const char *newdata, size_t newdata_len) { + if (half->cached_header_buffer == NULL) + { + half->cached_header_buffer = http_buffer_new(); + } + if (half->cached_header_buffer->buffer_size > HTTP_HEADERS_CACHE_MAX_SIZE) + { + return -1; + } + http_buffer_add(half->cached_header_buffer, newdata, newdata_len); return 0; } - return http_decoder_table_has_parsed_header(data->table); -} - -int http_decoder_half_data_get_raw_body(const struct http_decoder_half_data *data, const char **body, size_t *body_len) -{ - if (NULL == data || NULL == body) - { - return -1; - } - return http_decoder_table_get_body(data->table, (char **)body, body_len); -} -#if 0 -int http_decoder_half_data_get_decompress_body(const struct http_decoder_half_data *data, hstring *body) -{ - if (HTTP_CONTENT_ENCODING_NONE == data->content_encoding) - { - return http_decoder_table_get_body(data->table, body); - } - - body->iov_base = data->ref_decompress_body; - body->iov_len = data->decompress_body_len; - return 0; -} -#endif - -void http_decoder_half_data_dump(struct http_decoder_half *half) -{ - if (NULL == half || NULL == half->ref_data) - { - return; - } - - http_decoder_table_dump(half->ref_data->table); -} - -static void using_session_addr_as_host(struct session *ref_session, struct http_header_field *host_result, nmx_pool_t *mempool) -{ -#if 1 // in native steallar, can't get the tuple4 from the session yet!!! - struct httpd_session_addr ssaddr = {}; - httpd_session_get_addr(ref_session, &ssaddr); - if (ssaddr.ipver != 4 && ssaddr.ipver != 6) - { - host_result->value = MEMPOOL_CALLOC(mempool, char, 1); - sprintf((char *)host_result->value, "%s", ""); - host_result->value_len = strlen((char *)host_result->value); - return; - } - - char ip_string_buf[INET6_ADDRSTRLEN]; - if (4 == ssaddr.ipver) - { - host_result->value = MEMPOOL_CALLOC(mempool, char, (INET_ADDRSTRLEN + 7) /* "ip:port" max length */); - inet_ntop(AF_INET, &ssaddr.daddr4, ip_string_buf, INET_ADDRSTRLEN); - sprintf((char *)host_result->value, "%s:%u", ip_string_buf, ntohs(ssaddr.dport)); - host_result->value_len = strlen((char *)host_result->value); - } - else if (6 == ssaddr.ipver) - { - host_result->value = MEMPOOL_CALLOC(mempool, char, (INET6_ADDRSTRLEN + 7) /* "ip:port" max length */); - inet_ntop(AF_INET6, &ssaddr.daddr6, ip_string_buf, INET6_ADDRSTRLEN); - sprintf((char *)host_result->value, "%s:%u", ip_string_buf, ntohs(ssaddr.dport)); - host_result->value_len = strlen((char *)host_result->value); - } - else - { - assert(0); - } -#else - host_result->val.iov_base = MEMPOOL_CALLOC(mempool, char, 32); - sprintf((char *)host_result->val.iov_base, "%s", "todo:get_tuple4"); - host_result->val.iov_len = strlen((char *)host_result->val.iov_base); -#endif -} -void http_decoder_join_url(struct http_decoder_half_data *hfdata, nmx_pool_t *mempool, const struct http_header_field *host_hdr) -{ - int append_slash_len = 0; - if ('/' != ((char *)hfdata->joint_url.iov_base)[0]) + static int http_half_cache_uncompleted_header(struct http_half *half, int mem_merged, const char *data, size_t data_len) { - append_slash_len = 1; - } - int url_cache_str_len = host_hdr->value_len + hfdata->joint_url.iov_len + append_slash_len; - char *url_cache_str = MEMPOOL_CALLOC(mempool, char, url_cache_str_len); + int ret = 0; + const char *cached_begin_ptr = NULL; + long long cached_len = 0; + if (half->half_stage.last_headers_complete_ptr == NULL) + { + if (mem_merged) + { + ret = 0; // no pipeline and memory have been merged, do nothing + } + else + { + ret = http_half_stage_buf_append(half, data, (size_t)data_len); + } + } + else + { + /* + has pipeline, cache begin ptr is the last completed header_field_value + "\r\n\r\n", + for example: + GET xxx \r\nheader_name:header:value\r\nheader_name:header:value\r\n\r\nGET yyy ... + ^ + | + cache begin ptr + */ + cached_begin_ptr = half->half_stage.last_headers_complete_ptr; + cached_len = (long long)data_len - (long long)(cached_begin_ptr - data); + assert(cached_len > 0); - char *ptr = url_cache_str; - memcpy(ptr, host_hdr->value, host_hdr->value_len); - ptr += host_hdr->value_len; - if (append_slash_len) - { - *ptr = '/'; - ptr++; + if (mem_merged) + { + /* some message have beed pushed, and pointer to half->half_stage->half_buffer, + the half->half_stage->half_buffer is maintain by messages , + so, we can reset the half->half_stage->half_buffer, and append the new data to it. + */ + http_half_stage_buf_reuse(half); + ret = http_half_stage_buf_append(half, cached_begin_ptr, (size_t)cached_len); + } + else + { + ret = http_half_stage_buf_append(half, cached_begin_ptr, (size_t)cached_len); + } + } + return ret; } - memcpy(ptr, hfdata->joint_url.iov_base, hfdata->joint_url.iov_len); - MEMPOOL_FREE(mempool, hfdata->joint_url.iov_base); // free the cached uri buffer - hfdata->joint_url.iov_base = url_cache_str; - hfdata->joint_url.iov_len = url_cache_str_len; - hfdata->joint_url_complete = 1; -} - -void http_decoder_get_url(struct http_decoder_half_data *hfdata, nmx_pool_t *mempool) -{ - struct http_request_line reqline = {}; - http_decoder_half_data_get_request_line(hfdata, &reqline); - if (unlikely(http_strncasecmp_safe("CONNECT", (char *)reqline.method, 7, reqline.method_len) == 0)) + int http_flow_parse(struct http_half *half, const char *data, size_t data_len) { - hfdata->joint_url.iov_base = MEMPOOL_CALLOC(mempool, char, reqline.uri_len + 1); - memcpy(hfdata->joint_url.iov_base, reqline.uri, reqline.uri_len); - hfdata->joint_url.iov_len = reqline.uri_len; - hfdata->joint_url_complete = 1; + assert(half && data); + llhttp_errno_t ret = HPE_OK /* HPE_OK = 0 */; + ret = llhttp_execute(&half->parser.llhttp_parser, data, data_len); + switch (ret) + { + case HPE_OK: + break; + case HPE_PAUSED: + llhttp_resume(&half->parser.llhttp_parser); + break; + case HPE_PAUSED_UPGRADE: + llhttp_resume_after_upgrade(&half->parser.llhttp_parser); + break; + default: + break; + } + return ret; } -} -int http_decoder_join_url_finally(struct http_event_context *ev_ctx, struct http_decoder_half_data *hfdata, nmx_pool_t *mempool) -{ - if (hfdata->joint_url_complete) + struct http_half *http_flow_get_nx(struct http_exdata *exdata, enum flow_type flow_dir, struct session *sess, struct http *http_env) { - return 0; - } - struct http_header_field addr_as_host = {}; - using_session_addr_as_host(ev_ctx->ref_session, &addr_as_host, mempool); - http_decoder_join_url(hfdata, mempool, &addr_as_host); - MEMPOOL_FREE(mempool, addr_as_host.value); // free session addr to host buffer - return 1; -} + struct http_half **flow = NULL; -void http_decoder_get_host_feed_url(struct http_decoder_half *half) -{ - if (half->ref_data->joint_url_complete) - { - return; - } - struct http_header_field host_result = {}; - int host_header_cnt = http_decoder_half_data_get_header(half->ref_data, (char *)"Host", 4, &host_result); - if (host_header_cnt < 0) - { - return; + if (FLOW_TYPE_C2S == flow_dir) + { + flow = &exdata->decoder->flow_c2s; + } + else if (FLOW_TYPE_S2C == flow_dir) + { + flow = &exdata->decoder->flow_s2c; + } + else + { + assert(0); + return NULL; + } + if (NULL == *flow) + { + *flow = http_half_new(flow_dir); + (*flow)->http_env_ref = http_env; + (*flow)->sess_ref = sess; + } + return *flow; } - http_decoder_join_url(half->ref_data, half->http_ev_ctx->ref_mempool, &host_result); -} -int http_half_data_get_url(struct http_decoder_half_data *res_data, const char **url_val, size_t *url_len) -{ - if (0 == res_data->joint_url_complete) - { - return -1; - } - *url_val = res_data->joint_url.iov_base; - *url_len = res_data->joint_url.iov_len; - return 0; -} -#if 0 -int http_half_data_get_decode_url(struct http_decoder_half_data *res_data, hstring *url) -{ - if (0 == res_data->joint_url_complete) + int http_half_flow_process(struct http_half *half, const char *newdata, size_t newdata_len) { - return -1; - } - url->iov_base = res_data->decoded_url.iov_base; - url->iov_len = res_data->decoded_url.iov_len; - return 0; -} -#endif - -int http_half_data_get_transaction_seq(struct http_decoder_half_data *hf_data) -{ - return hf_data->transaction_index; -} - -void http_half_data_update_commit_index(struct http_decoder_half_data *half_data) -{ - http_decoder_table_update_commit_index(half_data->table); -} + int ret = 0; + int mem_merged = 0; + const char *parse_data = newdata; + size_t parse_data_len = newdata_len; -int http_half_data_get_total_parsed_header_count(struct http_decoder_half_data *half_data) -{ - return http_decoder_table_get_total_parsed_header(half_data->table); -} - -void http_half_pre_context_free(struct session *sess, struct http_decoder_exdata *exdata) -{ - struct http_message *msg = NULL; - struct http_decoder_half_data *res_data = NULL; - struct http_decoder_result_queue *queue = NULL; - - if (exdata) - { - queue = exdata->queue; - for (size_t i = 0; i < queue->queue_size; i++) + /* merge cached uncompleted header and current tcp payload first */ + if (half->cached_header_buffer != NULL && half->cached_header_buffer->buffer != NULL) { - struct http_decoder_half_data *req_data = queue->array[i].req_data; - res_data = queue->array[i].res_data; - if ((req_data != NULL) && (NULL == res_data) && (req_data->state < HTTP_EVENT_REQ_END)) + http_half_stage_buf_append(half, newdata, newdata_len); + parse_data = half->cached_header_buffer->buffer; + parse_data_len = half->cached_header_buffer->buffer_size; + mem_merged = 1; + if (parse_data_len > HTTP_HEADERS_CACHE_MAX_SIZE) { - msg = http_message_new(HTTP_TRANSACTION_END, queue, i, HTTP_REQUEST); - session_mq_publish_message(sess, exdata->pub_topic_id, msg); + STELLAR_LOG_FATAL(half->http_env_ref->logger_ref, HTTP_MODULE_NAME, + "sess: %s, headers buf len more than %d", session_get_readable_addr(half->sess_ref), HTTP_HEADERS_CACHE_MAX_SIZE); + return -1; } } + ret = http_flow_parse(half, parse_data, parse_data_len); + if (ret != HPE_OK) + { + STELLAR_LOG_FATAL(half->http_env_ref->logger_ref, HTTP_MODULE_NAME, + "llhttp parser error: %d, %s. sess: %s", ret, llhttp_errno_name(ret), session_get_readable_addr(half->sess_ref)); + return -1; + } - for (size_t i = 0; i < queue->queue_size; i++) + if (half->half_stage.llhttp_last_stage >= LLHTTP_STAGE_MESSAGE_BEGIN && + half->half_stage.llhttp_last_stage < LLHTTP_STAGE_HEADERS_COMPLETE) /* some headers not completed */ { - res_data = queue->array[i].res_data; - if ((res_data != NULL) && (res_data->state < HTTP_EVENT_RES_END)) + STELLAR_LOG_DEBUG(half->http_env_ref->logger_ref, HTTP_MODULE_NAME, + "sess: %s, header not completed, need caching, %.*s", + session_get_readable_addr(half->sess_ref), parse_data_len > 16 ? 16 : parse_data_len, parse_data); + if (http_half_cache_uncompleted_header(half, mem_merged, parse_data, parse_data_len) < 0) { - msg = http_message_new(HTTP_TRANSACTION_END, queue, i, HTTP_RESPONSE); - session_mq_publish_message(sess, exdata->pub_topic_id, msg); + STELLAR_LOG_FATAL(half->http_env_ref->logger_ref, HTTP_MODULE_NAME, + "sess: %s, headers buf len more than %d", session_get_readable_addr(half->sess_ref), HTTP_HEADERS_CACHE_MAX_SIZE); + return -1; } + + /* uncompleted_header, reset llhttp state and all headers */ + http_half_stage_reset(half); + llhttp_reset(&half->parser.llhttp_parser); + return 0; + } + else + { + /* ownership is maintained by header message, free it here (reference-- actually) */ + http_half_stage_buf_free(half); } + return 0; } -} -void http_half_update_state(struct http_decoder_half_data *hf_data, enum http_event state) -{ - hf_data->state = state; +#ifdef __cplusplus } - -void http_half_get_max_transaction_seq(struct http_decoder_exdata *exdata, long long *max_req_seq, long long *max_res_seq) -{ - assert(exdata && max_req_seq && max_res_seq); - *max_req_seq = exdata->decoder->c2s_half->transaction_seq; - *max_res_seq = exdata->decoder->s2c_half->transaction_seq; -} - -enum http_content_encoding http_half_data_get_content_encoding(struct http_decoder_half_data *hf_data) -{ - if (NULL == hf_data) - { - return HTTP_CONTENT_ENCODING_NONE; - } - return hf_data->content_encoding; -}
\ No newline at end of file +#endif diff --git a/decoders/http/http_decoder_half.h b/decoders/http/http_decoder_half.h index f525f78..6daf32c 100644 --- a/decoders/http/http_decoder_half.h +++ b/decoders/http/http_decoder_half.h @@ -1,109 +1,172 @@ #pragma once +#include <stdint.h> #include <stddef.h> #include "stellar/session.h" #include "stellar/http.h" -#include "http_content_decompress.h" -#include "http_decoder_result_queue.h" -#include <llhttp.h> +#include "llhttp.h" +#include "uthash/utarray.h" +#include "http_decoder_decompress.h" + +#ifdef __cplusplus +extern "C" +{ +#endif +#define HTTP_HEADER_FIELD_RESERVE_NUM (16) + +#define HTTP_HEADER_FIELD_NAME_HOST "Host" +#define HTTP_HEADER_FIELD_NAME_USER_AGENT "User-Agent" +#define HTTP_HEADER_FIELD_NAME_REFERER "Referer" +#define HTTP_HEADER_FIELD_NAME_COOKIE "Cookie" +#define HTTP_HEADER_FIELD_NAME_SET_COOKIE "Set-Cookie" +#define HTTP_HEADER_FIELD_NAME_CONTENT_TYPE "Content-Type" +#define HTTP_HEADER_FIELD_NAME_CONTENT_LENGTH "Content-Length" +#define HTTP_HEADER_FIELD_NAME_CONTENT_ENCODING "Content-Encoding" +#define HTTP_HEADER_FIELD_NAME_TRANSFER_ENCODING "Transfer-Encoding" + + enum http_event + { + __HTTP_EVENT_RESERVED = 0, + HTTP_EVENT_REQ_INIT = 1 << 1, + HTTP_EVENT_REQ_LINE = 1 << 2, + HTTP_EVENT_REQ_HDR = 1 << 3, + HTTP_EVENT_REQ_HDR_END = 1 << 4, + HTTP_EVENT_REQ_BODY_BEGIN = 1 << 5, + HTTP_EVENT_REQ_BODY_DATA = 1 << 6, + HTTP_EVENT_REQ_BODY_END = 1 << 7, + HTTP_EVENT_REQ_END = 1 << 8, + + HTTP_EVENT_RES_INIT = 1 << 9, + HTTP_EVENT_RES_LINE = 1 << 10, + HTTP_EVENT_RES_HDR = 1 << 11, + HTTP_EVENT_RES_HDR_END = 1 << 12, + HTTP_EVENT_RES_BODY_BEGIN = 1 << 13, + HTTP_EVENT_RES_BODY_DATA = 1 << 14, + HTTP_EVENT_RES_BODY_END = 1 << 15, + HTTP_EVENT_RES_END = 1 << 16, + }; + + struct http_body + { + char *body; + size_t body_sz; + size_t offset; // accumulated + int is_finished; + }; #ifndef hstring #include <bits/types/struct_iovec.h> -typedef struct iovec hstring; + typedef struct iovec hstring; #endif -// only one http event is fired at a time -enum http_event -{ - HTTP_EVENT_REQ_INIT = 1 << 1, - HTTP_EVENT_REQ_LINE = 1 << 2, - HTTP_EVENT_REQ_HDR = 1 << 3, - HTTP_EVENT_REQ_HDR_END = 1 << 4, - HTTP_EVENT_REQ_BODY_BEGIN = 1 << 5, - HTTP_EVENT_REQ_BODY_DATA = 1 << 6, - HTTP_EVENT_REQ_BODY_END = 1 << 7, - HTTP_EVENT_REQ_END = 1 << 8, - - HTTP_EVENT_RES_INIT = 1 << 9, - HTTP_EVENT_RES_LINE = 1 << 10, - HTTP_EVENT_RES_HDR = 1 << 11, - HTTP_EVENT_RES_HDR_END = 1 << 12, - HTTP_EVENT_RES_BODY_BEGIN = 1 << 13, - HTTP_EVENT_RES_BODY_DATA = 1 << 14, - HTTP_EVENT_RES_BODY_END = 1 << 15, - HTTP_EVENT_RES_END = 1 << 16, -}; - -struct http_event_context -{ - struct http_decoder_exdata *ref_httpd_ctx; - nmx_pool_t *ref_mempool; - struct session *ref_session; - struct http_decoder_result_queue *ref_queue; -}; - -struct http_decoder_half; -struct http_decoder_half_data; -struct http_decoder_env; - -typedef void http_event_cb(enum http_event event, struct http_decoder_half_data **data, - struct http_event_context *ev_ctx, void *httpd_plugin_env); - -struct http_decoder_half * -http_decoder_half_new(struct http_decoder_exdata *hd_ctx, nmx_pool_t *mempool, http_event_cb *event_cb, - enum llhttp_type http_type, int decompress_switch, struct http_decoder_env *httpd_env, long long start_seq); - -void http_decoder_half_free(nmx_pool_t *mempool, struct http_decoder_half *half); - -void http_decoder_half_reinit(struct http_decoder_half *half, - struct http_decoder_result_queue *queue, - nmx_pool_t *mempool, struct session *sess); - -int http_decoder_half_parse(int proxy_enable, struct http_decoder_half *half, const char *data, size_t data_len); - -long long http_decoder_half_trans_count(struct http_decoder_half *half); - -// http decoder half data API -struct http_decoder_half_data * -http_decoder_half_data_new(nmx_pool_t *mempool); - -void http_decoder_half_data_free(nmx_pool_t *mempool, struct http_decoder_half_data *data); - -int http_decoder_half_data_get_request_line(struct http_decoder_half_data *data, - struct http_request_line *line); - -int http_decoder_half_data_get_response_line(struct http_decoder_half_data *data, - struct http_response_line *line); - -int http_decoder_half_data_get_header(const struct http_decoder_half_data *data, - const char *name, size_t name_len, struct http_header_field *hdr_res); - -int http_decoder_half_data_iter_header(struct http_decoder_half_data *data, - struct http_header_field *header); -int http_decoder_half_data_reset_header_iter(struct http_decoder_half_data *req_data); -int http_decoder_half_data_has_parsed_header(struct http_decoder_half_data *data); - -int http_decoder_half_data_get_raw_body(const struct http_decoder_half_data *data, const char **body, size_t *body_len); - -int http_decoder_half_data_get_decompress_body(const struct http_decoder_half_data *data, const char **body, size_t *body_len); -void http_half_get_lastest_decompress_buffer(struct http_decoder_half_data *data, hstring *decompress_body); -void http_half_decompress_buffer_free(struct http_decoder_half_data *data, hstring *decompress_body); -void http_decoder_half_data_dump(struct http_decoder_half *half); - -void http_decoder_get_host_feed_url(struct http_decoder_half *half); -void http_decoder_get_url(struct http_decoder_half_data *hfdata, nmx_pool_t *mempool); -int http_half_data_get_decode_url(struct http_decoder_half_data *res_data, hstring *url); -void http_decoder_join_url(struct http_decoder_half_data *hfdata, - nmx_pool_t *mempool, - const struct http_header_field *host_hdr); -int http_decoder_join_url_finally(struct http_event_context *ev_ctx, - struct http_decoder_half_data *hfdata, - nmx_pool_t *mempool); -int http_half_data_get_url(struct http_decoder_half_data *res_data, const char **url_val, size_t *url_len); -int http_half_data_get_transaction_seq(struct http_decoder_half_data *hf_data); - -void http_half_data_update_commit_index(struct http_decoder_half_data *half_data); -void http_half_pre_context_free(struct session *sess, struct http_decoder_exdata *exdata); -void http_half_update_state(struct http_decoder_half_data *hf_data, enum http_event state); -int http_half_data_get_total_parsed_header_count(struct http_decoder_half_data *half_data); -void http_half_get_max_transaction_seq(struct http_decoder_exdata *exdata, long long *max_req_seq, long long *max_res_seq); -enum http_content_encoding http_half_data_get_content_encoding(struct http_decoder_half_data *hf_data);
\ No newline at end of file + struct http_half_data + { + enum flow_type flow_dir; + uint32_t transaction_seq; // seq of this half flow, is last http_lalf->transaction_num value + union + { + struct http_request_line req_line; + struct http_status_line status_line; + }; + + /* headers */ + struct http_buffer *cached_header_buffer; // maybe null + hstring joint_url; // malloc, need be free + enum http_content_encoding content_encoding_type; + int transfer_encoding_is_chunked; // -1: not set, 0: false, 1: true + UT_array *ut_filed_array; // inner struct, need to transform to header->field_array + struct http_header header; + + /* body */ + struct http_body raw_body; + struct http_content_decompress *decompress; + struct http_body decompress_body; + }; + + struct http_half_parser + { + llhttp_t llhttp_parser; + llhttp_settings_t settings; + struct http_half *half_ref; // used in llhttp callback context to get flow + }; + + enum http_stage + { + HTTP_STAGE_INIT = 0, + HTTP_STAGE_PENDING = 1, /* body without Content-Length, no Chunk-Encoding */ + HTTP_STAGE_HEADER_PARTIAL = 2, + HTTP_STAGE_BODY = 3, + }; + + struct http_stage_shaper + { + enum http_stage stage; + char *data; + size_t data_len; + long long remain_content_length; + const char *headers_start; // the first header field name + const char *headers_end; // the last char of \r\n\r\n + struct http_buffer *headers_cache; /* ownership move to struct http_decoder_half_data when headers completed */ + }; + + struct http_half_buffer + { + int is_malloc; // need free + int ref_count; // +1 when push a new message + char *buffer; + size_t buffer_size; + }; + + enum http_half_llhttp_stage_type + { + LLHTTP_STAGE_MESSAGE_BEGIN = 0, + LLHTTP_STAGE_URI, + LLHTTP_STAGE_METHOD, + LLHTTP_STAGE_STATUS, + LLHTTP_STAGE_VERSION, + LLHTTP_STAGE_HEADER_FIELD, + LLHTTP_STAGE_HEADER_FIELD_COMPLETE, + LLHTTP_STAGE_HEADER_VALUE, + LLHTTP_STAGE_HEADER_VALUE_COMPLETE, + LLHTTP_STAGE_HEADERS_COMPLETE, + LLHTTP_STAGE_BODY, + LLHTTP_STAGE_MESSAGE_COMPLETE, + __LLHTTP_STAGE_MAX, + }; + + struct http_half_llhttp_stage + { + const char *first_header_name_ptr; + const char *last_header_value_complete_ptr; /* at + length + 4 \r\n\r\n) */ + const char *last_headers_complete_ptr; + enum http_half_llhttp_stage_type llhttp_last_stage; + uint8_t llhttp_cb_count[__LLHTTP_STAGE_MAX]; + }; + + struct http_half + { + struct session *sess_ref; + struct http *http_env_ref; + + uint32_t transaction_num; // accumulated of all flows in this session + enum http_event event; + // struct http_stage_shaper shaper; + + struct http_half_parser parser; + struct http_half_data *flow_data; // malloc when every transaction start, ownership move to message when message completed + + struct http_buffer *cached_header_buffer; + struct http_half_llhttp_stage half_stage; + }; + void http_half_free(struct http_half *half); + void http_half_data_free(struct http_half_data *half_data); + void http_flow_parser_init(struct http_half_parser *flow_parser, enum llhttp_type type); + void http_flow_body_decompress(struct http_half_data *flow_data, const char *zipdata, size_t zipdatalen); + void http_event_handler(struct http_half *half, enum http_event event, struct http *httpd_env); + int http_flow_stage_shaping(struct http_half *half, const char *newdata, size_t newdata_len); + int http_half_flow_process(struct http_half *half, const char *newdata, size_t newdata_len); + int http_get_header_field_count(struct http_half_data *flow_data); + void http_flow_append_header_filed(struct http_half_data *flow_data, const char *at, size_t length); + void http_flow_append_header_value(struct http_half_data *flow_data, const char *at, size_t length); +#ifdef __cplusplus +} +#endif
\ No newline at end of file diff --git a/decoders/http/http_decoder_llhttp_wrap.c b/decoders/http/http_decoder_llhttp_wrap.c new file mode 100644 index 0000000..b025311 --- /dev/null +++ b/decoders/http/http_decoder_llhttp_wrap.c @@ -0,0 +1,318 @@ +#include <assert.h> +#include <stdio.h> +#include <string.h> +#include <arpa/inet.h> +#include "stellar/utils.h" +#include "http_decoder.h" +#include "llhttp.h" +#include "http_decoder_half.h" + +/* Possible return values 0, -1, `HPE_PAUSED` */ +static int on_message_begin(llhttp_t *http) +{ + struct http_half_parser *parser = container_of(http, struct http_half_parser, llhttp_parser); + struct http_half *half = parser->half_ref; + half->half_stage.llhttp_last_stage = LLHTTP_STAGE_MESSAGE_BEGIN; + half->half_stage.llhttp_cb_count[LLHTTP_STAGE_MESSAGE_BEGIN]++; + if (http->type == HTTP_REQUEST) + { + half->event = HTTP_EVENT_REQ_INIT; + } + else + { + half->event = HTTP_EVENT_RES_INIT; + } + http_event_handler(parser->half_ref, half->event, parser->half_ref->http_env_ref); + return 0; +} + +static int on_message_complete(llhttp_t *http) +{ + struct http_half_parser *parser = container_of(http, struct http_half_parser, llhttp_parser); + struct http_half *half = parser->half_ref; + half->half_stage.llhttp_last_stage = LLHTTP_STAGE_MESSAGE_COMPLETE; + half->half_stage.llhttp_cb_count[LLHTTP_STAGE_MESSAGE_COMPLETE]++; + if (http->type == HTTP_REQUEST) + { + if (half->event == HTTP_EVENT_REQ_BODY_DATA) + { + half->event = HTTP_EVENT_REQ_BODY_END; + http_event_handler(parser->half_ref, half->event, parser->half_ref->http_env_ref); + } + } + else + { + if (half->event == HTTP_EVENT_RES_BODY_DATA) + { + half->event = HTTP_EVENT_RES_BODY_END; + http_event_handler(parser->half_ref, half->event, parser->half_ref->http_env_ref); + } + } + + // trigger req_end/res_end + if (http->type == HTTP_REQUEST) + { + half->event = HTTP_EVENT_REQ_END; + } + else + { + half->event = HTTP_EVENT_RES_END; + } + http_event_handler(parser->half_ref, half->event, parser->half_ref->http_env_ref); + half->event = __HTTP_EVENT_RESERVED; + return 0; +} + +static int on_method(llhttp_t *http, const char *at, size_t length) +{ + if (length == 0) + { + return 0; + } + struct http_half_parser *parser = container_of(http, struct http_half_parser, llhttp_parser); + struct http_half *half = parser->half_ref; + half->flow_data->req_line.method = (char *)at; + half->flow_data->req_line.method_len = length; + half->half_stage.llhttp_last_stage = LLHTTP_STAGE_METHOD; + half->half_stage.llhttp_cb_count[LLHTTP_STAGE_METHOD]++; + return 0; +} + +/* Possible return values 0, -1, HPE_USER */ +static int on_uri(llhttp_t *http, const char *at, size_t length) +{ + if (length == 0) + { + return 0; + } + struct http_half_parser *parser = container_of(http, struct http_half_parser, llhttp_parser); + struct http_half *half = parser->half_ref; + half->flow_data->req_line.uri = (char *)at; + half->flow_data->req_line.uri_len = length; + half->half_stage.llhttp_last_stage = LLHTTP_STAGE_URI; + half->half_stage.llhttp_cb_count[LLHTTP_STAGE_URI]++; + return 0; +} + +/* Possible return values 0, -1, HPE_USER */ +static int on_version(llhttp_t *http, const char *at, size_t length) +{ + if (length == 0) + { + return 0; + } + struct http_half_parser *parser = container_of(http, struct http_half_parser, llhttp_parser); + struct http_half *half = parser->half_ref; + if (http->type == HTTP_REQUEST) + { + half->flow_data->req_line.version = (char *)at; + half->flow_data->req_line.version_len = length; + } + else + { + half->flow_data->status_line.version = (char *)at; + half->flow_data->status_line.version_len = length; + } + half->half_stage.llhttp_last_stage = LLHTTP_STAGE_VERSION; + half->half_stage.llhttp_cb_count[LLHTTP_STAGE_VERSION]++; + return 0; +} + +/* Information-only callbacks, return value is ignored */ +static int on_version_complete(llhttp_t *http) +{ + struct http_half_parser *parser = container_of(http, struct http_half_parser, llhttp_parser); + struct http_half *half = parser->half_ref; + if (http->type == HTTP_REQUEST) + { + half->flow_data->req_line.major_version = llhttp_get_http_major(http); + half->flow_data->req_line.minor_version = llhttp_get_http_minor(http); + } + else + { + half->flow_data->status_line.major_version = llhttp_get_http_major(http); + half->flow_data->status_line.minor_version = llhttp_get_http_minor(http); + } + half->half_stage.llhttp_last_stage = LLHTTP_STAGE_VERSION; + half->half_stage.llhttp_cb_count[LLHTTP_STAGE_VERSION]++; + return 0; +} + +/* Possible return values 0, -1, HPE_USER */ +static int on_status(llhttp_t *http, const char *at, size_t length) +{ + if (length == 0) + { + return 0; + } + struct http_half_parser *parser = container_of(http, struct http_half_parser, llhttp_parser); + struct http_half *half = parser->half_ref; + half->flow_data->status_line.status = (char *)at; + half->flow_data->status_line.status_len = length; + half->half_stage.llhttp_last_stage = LLHTTP_STAGE_STATUS; + half->half_stage.llhttp_cb_count[LLHTTP_STAGE_STATUS]++; + return 0; +} + +/* Information-only callbacks, return value is ignored */ +static int on_status_complete(llhttp_t *http) +{ + struct http_half_parser *parser = container_of(http, struct http_half_parser, llhttp_parser); + struct http_half *half = parser->half_ref; + half->flow_data->status_line.status_code = llhttp_get_status_code(http); + half->half_stage.llhttp_last_stage = LLHTTP_STAGE_STATUS; + half->half_stage.llhttp_cb_count[LLHTTP_STAGE_STATUS]++; + return 0; +} + +/* Possible return values 0, -1, HPE_USER */ +static int on_header_field(llhttp_t *http, const char *at, size_t length) +{ + if (length == 0) + { + return 0; + } + struct http_half_parser *parser = container_of(http, struct http_half_parser, llhttp_parser); + struct http_half *half = parser->half_ref; + http_flow_append_header_filed(half->flow_data, at, length); + half->half_stage.llhttp_last_stage = LLHTTP_STAGE_HEADER_FIELD; + half->half_stage.llhttp_cb_count[LLHTTP_STAGE_HEADER_FIELD]++; + if (half->half_stage.first_header_name_ptr == NULL) + { + half->half_stage.first_header_name_ptr = at; + } + return 0; +} + +static int on_header_field_complete(llhttp_t *http) +{ + struct http_half_parser *parser = container_of(http, struct http_half_parser, llhttp_parser); + struct http_half *half = parser->half_ref; + half->half_stage.llhttp_last_stage = LLHTTP_STAGE_HEADER_FIELD_COMPLETE; + half->half_stage.llhttp_cb_count[LLHTTP_STAGE_HEADER_FIELD_COMPLETE]++; + return 0; +} + +/* Possible return values 0, -1, HPE_USER */ +static int on_header_value(llhttp_t *http, const char *at, size_t length) +{ + if (length == 0) + { + return 0; + } + struct http_half_parser *parser = container_of(http, struct http_half_parser, llhttp_parser); + struct http_half *half = parser->half_ref; + http_flow_append_header_value(half->flow_data, at, length); + half->half_stage.last_header_value_complete_ptr = at + length; + return 0; +} + +static int on_header_value_complete(llhttp_t *http) +{ + struct http_half_parser *parser = container_of(http, struct http_half_parser, llhttp_parser); + struct http_half *half = parser->half_ref; + half->half_stage.llhttp_last_stage = LLHTTP_STAGE_HEADER_VALUE_COMPLETE; + half->half_stage.llhttp_cb_count[LLHTTP_STAGE_HEADER_VALUE_COMPLETE]++; + if (NULL != half->half_stage.last_header_value_complete_ptr) /* header value maybe empty(NULL) */ + { + if (strncmp("\r\n", half->half_stage.last_header_value_complete_ptr, 2) == 0) + { + half->half_stage.last_header_value_complete_ptr += 2; + } + } + return 0; +} + +/* Possible return values: + * 0 - Proceed normally + * 1 - Assume that request/response has no body, and proceed to parsing the next message + * 2 - Assume absence of body (as above) and make `llhttp_execute()` return `HPE_PAUSED_UPGRADE` + * -1 - Error `HPE_PAUSED` + */ +static int on_headers_complete(llhttp_t *http) +{ + struct http_half_parser *parser = container_of(http, struct http_half_parser, llhttp_parser); + struct http_half *half = parser->half_ref; + if (http->type == HTTP_REQUEST) + { + half->event = HTTP_EVENT_REQ_HDR_END; + } + else + { + half->event = HTTP_EVENT_RES_HDR_END; + } + half->half_stage.last_headers_complete_ptr = half->half_stage.last_header_value_complete_ptr; + if (half->half_stage.last_headers_complete_ptr != NULL && + strncmp("\r\n", half->half_stage.last_headers_complete_ptr, 2) == 0) + { + half->half_stage.last_headers_complete_ptr += 2; + } + half->half_stage.llhttp_last_stage = LLHTTP_STAGE_HEADERS_COMPLETE; + half->half_stage.llhttp_cb_count[LLHTTP_STAGE_HEADERS_COMPLETE]++; + http_event_handler(half, half->event, half->http_env_ref); + return 0; +} + +/* Possible return values 0, -1, HPE_USER */ +static int on_body(llhttp_t *http, const char *at, size_t length) +{ + if (length == 0) + { + return 0; + } + int has_new_data = 0; + struct http_half_parser *parser = container_of(http, struct http_half_parser, llhttp_parser); + struct http_half *half = parser->half_ref; + half->half_stage.llhttp_last_stage = LLHTTP_STAGE_BODY; + half->half_stage.llhttp_cb_count[LLHTTP_STAGE_BODY]++; + + if (half->http_env_ref->hd_cfg.decompress_switch != 0 && + half->flow_data->content_encoding_type != HTTP_CONTENT_ENCODING_NONE) + { + http_flow_body_decompress(half->flow_data, at, length); + if (half->flow_data->decompress_body.body != NULL && half->flow_data->decompress_body.body_sz > 0) + { + has_new_data = 1; + } + } + else + { + half->flow_data->raw_body.body = (char *)at; + half->flow_data->raw_body.body_sz = length; + has_new_data = 1; + } + if (has_new_data) + { + if (http->type == HTTP_REQUEST) + { + half->event = HTTP_EVENT_REQ_BODY_DATA; + } + else + { + half->event = HTTP_EVENT_RES_BODY_DATA; + } + http_event_handler(half, half->event, half->http_env_ref); + } + return 0; +} + +void http_flow_parser_init(struct http_half_parser *flow_parser, enum llhttp_type type) +{ + llhttp_settings_init(&flow_parser->settings); + llhttp_settings_t *sets = &flow_parser->settings; + sets->on_message_begin = on_message_begin; + sets->on_url = on_uri; + sets->on_status = on_status; + sets->on_status_complete = on_status_complete; + sets->on_method = on_method; + sets->on_version = on_version; + sets->on_version_complete = on_version_complete; + sets->on_header_field = on_header_field; + sets->on_header_field_complete = on_header_field_complete; + sets->on_header_value = on_header_value; + sets->on_header_value_complete = on_header_value_complete; + sets->on_headers_complete = on_headers_complete; + sets->on_body = on_body; + sets->on_message_complete = on_message_complete; + llhttp_init(&flow_parser->llhttp_parser, type, sets); +}
\ No newline at end of file diff --git a/decoders/http/http_decoder_module.c b/decoders/http/http_decoder_module.c new file mode 100644 index 0000000..53ff71b --- /dev/null +++ b/decoders/http/http_decoder_module.c @@ -0,0 +1,299 @@ +#include <stdio.h> +#include <assert.h> +#include "stellar/session.h" +#include "stellar/module.h" +#include "http_decoder_utils.h" +#include "http_decoder_half.h" +#include "http_decoder.h" +#include "toml/toml.h" + +#ifdef __cplusplus +extern "C" +{ +#endif + + __thread struct http_topic_manager *chaotic_http_topic_mgr; + +#define HTTP_TOPIC_NAME_REQ_HDR "HTTP_TOPIC_REQ_HDR" +#define HTTP_TOPIC_NAME_REQ_BODY "HTTP_TOPIC_REQ_BODY" +#define HTTP_TOPIC_NAME_RES_HDR "HTTP_TOPIC_RES_HDR" +#define HTTP_TOPIC_NAME_RES_BODY "HTTP_TOPIC_RES_BODY" + + static void http_set_default_config(struct http_config *hd_cfg) + { + hd_cfg->decompress_switch = 1; + } + static int http_load_config(const char *cfg_path, struct http_config *hd_cfg) + { + FILE *fp = fopen(cfg_path, "r"); + if (NULL == fp) + { + fprintf(stderr, "[%s]Can't open config file:%s", __FUNCTION__, cfg_path); + return -1; + } + int ret = 0; + char errbuf[256] = {0}; + toml_table_t *root = toml_parse_file(fp, errbuf, sizeof(errbuf)); + fclose(fp); + + toml_table_t *section = toml_table_in(root, "http"); + if (section == NULL) + { + fprintf(stderr, "(logger) config file %s missing 'http' section\n", cfg_path); + goto error_exit; + } + + toml_datum_t int_val = toml_int_in(section, "decompress_enable"); + if (int_val.ok != 0) + { + hd_cfg->decompress_switch = int_val.u.b; + } + error_exit: + toml_free(root); + return ret; + } + + static void http_update_header_array(struct http_half_data *flow_data) + { + if (0 == http_get_header_field_count(flow_data)) + { + flow_data->header.field_array = NULL; + flow_data->header.field_array_num = 0; + } + else + { + flow_data->header.field_array = (struct http_header_field *)utarray_front(flow_data->ut_filed_array); + flow_data->header.field_array_num = utarray_len(flow_data->ut_filed_array); + } + } + + static void http_on_msg_dispatch(int topic_id UNUSED, void *msg, on_msg_cb_func *on_msg_cb, + void *on_msg_cb_arg, void *dispatch_arg UNUSED) + { + assert(msg != NULL && on_msg_cb != NULL); + struct http_message *hmsg = (struct http_message *)msg; + struct http_half_data *flow_data = hmsg->flow_data; + + if (hmsg->event == HTTP_EVENT_REQ_END || hmsg->event == HTTP_EVENT_RES_END) + { + /* notify subscriber ? */ + return; + } + switch (hmsg->topic_type) + { + case HTTP_TOPIC_REQ_HEADER: + { + http_update_header_array(flow_data); + http_on_request_header_cb *on_req_hdr_cb = (http_on_request_header_cb *)((void *)on_msg_cb); + on_req_hdr_cb(hmsg->sess_ref, flow_data->transaction_seq, &flow_data->req_line, &flow_data->header, + (const char *)flow_data->joint_url.iov_base, flow_data->joint_url.iov_len, + on_msg_cb_arg); + } + break; + case HTTP_TOPIC_RES_HEADER: + { + http_update_header_array(flow_data); + http_on_response_header_cb *on_res_hdr_cb = (http_on_response_header_cb *)((void *)on_msg_cb); + on_res_hdr_cb(hmsg->sess_ref, flow_data->transaction_seq, &flow_data->status_line, &flow_data->header, on_msg_cb_arg); + } + break; + case HTTP_TOPIC_REQ_BODY: + case HTTP_TOPIC_RES_BODY: + { + http_on_body_cb *on_body_cb = (http_on_body_cb *)((void *)on_msg_cb); + if (flow_data->decompress_body.body != NULL || flow_data->decompress_body.offset > 0) + { + on_body_cb(hmsg->sess_ref, flow_data->decompress_body.body, flow_data->decompress_body.body_sz, flow_data->decompress_body.offset, + flow_data->transaction_seq, flow_data->decompress_body.is_finished, on_msg_cb_arg); + } + else + { + on_body_cb(hmsg->sess_ref, flow_data->raw_body.body, flow_data->raw_body.body_sz, flow_data->raw_body.offset, + flow_data->transaction_seq, flow_data->raw_body.is_finished, on_msg_cb_arg); + } + } + break; + default: + assert(0); + break; + } + } + + static int http_create_topic_nx(struct module_manager *mod_mgr, const char *topic_name) + { + struct mq_schema *mq_s = module_manager_get_mq_schema(mod_mgr); + assert(mq_s != NULL); + int quic_topic_id = mq_schema_get_topic_id(mq_s, topic_name); + if (quic_topic_id >= 0) + { + return quic_topic_id; + } + int topic_id = mq_schema_create_topic(mq_s, topic_name, (on_msg_dispatch_cb_func *)http_on_msg_dispatch, + NULL, http_message_free_cb, NULL); + return topic_id; + } + + struct http_topic_manager *http_topic_mgr_init(struct module_manager *mod_mgr) + { + if (chaotic_http_topic_mgr != NULL) + { + return chaotic_http_topic_mgr; + } + struct http_topic_manager *http_topic_mgr = (struct http_topic_manager *)calloc(1, sizeof(struct http_topic_manager)); + assert(http_topic_mgr != NULL); + struct http_topic_compose *topic_compose = http_topic_mgr->topic_compose; + + topic_compose[HTTP_TOPIC_REQ_HEADER].topic_name = HTTP_TOPIC_NAME_REQ_HDR; + topic_compose[HTTP_TOPIC_REQ_BODY].topic_name = HTTP_TOPIC_NAME_REQ_BODY; + topic_compose[HTTP_TOPIC_RES_HEADER].topic_name = HTTP_TOPIC_NAME_RES_HDR; + topic_compose[HTTP_TOPIC_RES_BODY].topic_name = HTTP_TOPIC_NAME_RES_BODY; + + topic_compose[HTTP_TOPIC_REQ_HEADER].topic_id = http_create_topic_nx(mod_mgr, HTTP_TOPIC_NAME_REQ_HDR); + topic_compose[HTTP_TOPIC_REQ_BODY].topic_id = http_create_topic_nx(mod_mgr, HTTP_TOPIC_NAME_REQ_BODY); + topic_compose[HTTP_TOPIC_RES_HEADER].topic_id = http_create_topic_nx(mod_mgr, HTTP_TOPIC_NAME_RES_HDR); + topic_compose[HTTP_TOPIC_RES_BODY].topic_id = http_create_topic_nx(mod_mgr, HTTP_TOPIC_NAME_RES_BODY); + chaotic_http_topic_mgr = http_topic_mgr; + return http_topic_mgr; + } + + void http_topic_mgr_free(struct http_topic_manager *topic_mgr) + { + assert(topic_mgr != NULL); + FREE(topic_mgr); + } + + static int http_subscribe_common(struct module_manager *mod_mgr, enum http_topic_type topic_type, void *cb, void *args) + { + struct http_topic_manager *http_topic_mgr = chaotic_http_topic_mgr; + if (http_topic_mgr == NULL) + { + http_topic_mgr = http_topic_mgr_init(mod_mgr); + chaotic_http_topic_mgr = http_topic_mgr; + } + return mq_schema_subscribe(module_manager_get_mq_schema(mod_mgr), + http_topic_mgr->topic_compose[topic_type].topic_id, + (on_msg_cb_func *)cb, args); + } + + int http_subscribe_request_header(struct module_manager *mod_mgr, http_on_request_header_cb *cb, void *args) + { + assert(mod_mgr != NULL); + return http_subscribe_common(mod_mgr, HTTP_TOPIC_REQ_HEADER, cb, args); + } + + int http_subscribe_response_header(struct module_manager *mod_mgr, http_on_response_header_cb *cb, void *args) + { + assert(mod_mgr != NULL); + return http_subscribe_common(mod_mgr, HTTP_TOPIC_RES_HEADER, cb, args); + } + + int http_subscribe_request_body(struct module_manager *mod_mgr, http_on_body_cb *cb, void *args) + { + assert(mod_mgr != NULL); + return http_subscribe_common(mod_mgr, HTTP_TOPIC_REQ_BODY, cb, args); + } + + int http_subscribe_response_body(struct module_manager *mod_mgr, http_on_body_cb *cb, void *args) + { + assert(mod_mgr != NULL); + return http_subscribe_common(mod_mgr, HTTP_TOPIC_RES_BODY, cb, args); + } + + int http_subscribe(struct http *http, struct http_subscirbe_params *params, void *arg) + { + assert(http != NULL && params != NULL); + struct module_manager *mod_mgr = http->mod_mgr_ref; + int ret = 0; + if (params->req_hdr_cb != NULL) + { + ret = http_subscribe_request_header(mod_mgr, params->req_hdr_cb, arg); + if (ret < 0) + { + return ret; + } + } + if (params->res_hdr_cb != NULL) + { + ret = http_subscribe_response_header(mod_mgr, params->res_hdr_cb, arg); + if (ret < 0) + { + return ret; + } + } + if (params->req_body_cb != NULL) + { + ret = http_subscribe_request_body(mod_mgr, params->req_body_cb, arg); + if (ret < 0) + { + return ret; + } + } + if (params->res_body_cb != NULL) + { + ret = http_subscribe_response_body(mod_mgr, params->res_body_cb, arg); + if (ret < 0) + { + return ret; + } + } + return ret; + } + + struct module *http_init(struct module_manager *mod_mgr) + { + assert(mod_mgr != NULL); + struct http *http_env = (struct http *)calloc(1, sizeof(struct http)); + http_set_default_config(&http_env->hd_cfg); + http_load_config(module_manager_get_toml_path(mod_mgr), &http_env->hd_cfg); + http_stat_init(mod_mgr, &http_env->stat); + struct module *mod = module_new(HTTP_MODULE_NAME, http_env); + http_env->mod_mgr_ref = mod_mgr; + + http_env->logger_ref = module_manager_get_logger(mod_mgr); + assert(http_env->logger_ref != NULL); + struct module *sess_mod = module_manager_get_module(mod_mgr, SESSION_MANAGER_MODULE_NAME); + struct session_manager *sess_mgr = module_to_session_manager(sess_mod); + assert(sess_mgr != NULL); + + struct http_topic_manager *http_topic_mgr = http_topic_mgr_init(mod_mgr); + assert(http_topic_mgr != NULL); + http_env->http_topic_mgr = http_topic_mgr; + + session_manager_subscribe_tcp_stream(sess_mgr, http_on_tcp_stream_cb, http_env); + http_env->exdata_id = session_manager_new_session_exdata_index(sess_mgr, HTTP_EXDATA_NAME, http_exdata_free_cb, http_env); + STELLAR_LOG_FATAL(http_env->logger_ref, HTTP_MODULE_NAME, + "http init success, decompress_switch:%d", http_env->hd_cfg.decompress_switch); + return mod; + } + struct module *http_get_module(struct module_manager *mod_mgr) + { + assert(mod_mgr != NULL); + + struct module *http_mod = module_manager_get_module(mod_mgr, HTTP_MODULE_NAME); + if (NULL == http_mod) + { + http_mod = http_init(mod_mgr); + } + return http_mod; + } + + struct http *http_module_to_http(struct module *http_mod) + { + assert(http_mod); + return (struct http *)module_get_ctx(http_mod); + } + + void http_exit(struct module_manager *mod_mgr UNUSED, struct module *mod) + { + assert(mod != NULL); + struct http *http_env = (struct http *)module_get_ctx(mod); + http_stat_free(&http_env->stat); + http_topic_mgr_free(http_env->http_topic_mgr); + STELLAR_LOG_FATAL(http_env->logger_ref, HTTP_MODULE_NAME, "http exit!"); + FREE(http_env); + module_free(mod); + } + +#ifdef __cplusplus +} +#endif diff --git a/decoders/http/http_decoder_private.h b/decoders/http/http_decoder_private.h deleted file mode 100644 index 04130f0..0000000 --- a/decoders/http/http_decoder_private.h +++ /dev/null @@ -1,155 +0,0 @@ - -#pragma once - -#ifndef __USE_MISC -#define __USE_MISC 1 -#endif - -#ifdef __cplusplus -extern "C" -{ -#endif -#include <bits/types/struct_iovec.h> -#include "stellar/stellar.h" -#include "stellar/packet.h" -#include "stellar/utils.h" -#include "stellar/session.h" -#include "stellar/stellar_mq.h" -#include "stellar/stellar_exdata.h" - -#include "nmx_pool/nmx_palloc.h" -#include "stellar/utils.h" -#include "stellar/http.h" -#include "http_decoder_result_queue.h" -#include "http_decoder_half.h" -#include "http_decoder_table.h" -#include "http_decoder_result_queue.h" -#include "http_decoder_utils.h" -#include "http_decoder_stat.h" -#include "http_decoder_tunnel.h" -#include "fieldstat/fieldstat_easy.h" -#include "toml/toml.h" - -#ifndef hstring -#include <bits/types/struct_iovec.h> -typedef struct iovec hstring; -#endif - -#ifndef likely -#define likely(x) __builtin_expect((x), 1) -#endif -#ifndef unlikely -#define unlikely(x) __builtin_expect((x), 0) -#endif - -#define MEMPOOL_CALLOC(pool, type, number) ((type *)nmx_pcalloc(pool, sizeof(type) * number)) -#define MEMPOOL_REALLOC(pool) -#define MEMPOOL_FREE(pool, p) nmx_pfree(pool, p) - -#define ENABLE_MEMPOOL 0 -#if ENABLE_MEMPOOL -#define HD_CALLOC(pool, type, number) MEMPOOL_CALLOC(pool, number, type) -#define HD_FREE(pool, p) MEMPOOL_FREE(pool, p) -#else -#define HD_CALLOC(pool, type, number) CALLOC(type, number) -#define HD_FREE(pool, p) FREE(p) -#endif - -#define HTTP_IDENTIFY_LEN 16 -#define HD_RESULT_QUEUE_LEN 16 - -#define DEFAULT_STAT_OUTPUT_INTERVAL 1 -#define DEFAULT_STAT_INTERVAL_PKTS 1000 -#define DEFAULT_MEMPOOL_SIZE (32 * 1024) - -#define HTTPD_CFG_FILE "./etc/http/http_decoder.toml" -#define FILEDSTAT_OUTPUT_FILE "./metrics/http_decoder_fs4.json" - -#define HTTP_CTX_NOT_HTTP "__NOT_HTTP_SESS__" -#define HTTP_CTX_IS_HTTP "__FAKE_HTTP_CTX__" - - struct http_decoder_config - { - int decompress_switch; - int stat_interval_pkts; // call fieldstat_incrby every stat_interval_pkts - int stat_output_interval; - int proxy_enable; - size_t result_queue_len; // per session result queue length - size_t mempool_size; // per session mempool size - }; - - /** - * NOTE: http_message don't have the ownership of data - */ - struct http_message - { - uint8_t flow_type; - enum http_message_type type; - size_t queue_index; - struct http_decoder_result_queue *ref_queue; - hstring raw_payload; // cause tcp reorder, maybe receive many tcp segments for one packet - hstring decompress_payload; - hstring tunnel_payload; - }; - - struct http_decoder - { - struct http_decoder_half *c2s_half; - struct http_decoder_half *s2c_half; - }; - - enum httpd_topic_index - { - HTTPD_TOPIC_TCP_STREAM_INDEX = 0, - HTTPD_TOPIC_HTTP_MSG_INDEX, - HTTPD_TOPIC_HTTP_TUNNEL_INDEX, - HTTPD_TOPIC_INDEX_MAX, - }; - - struct http_decoder_exdata - { - int sub_topic_id; // tcp_stream - int pub_topic_id; // http message or http tunnel msg - struct http_decoder_result_queue *queue; - struct http_decoder *decoder; - nmx_pool_t *mempool; - enum http_tunnel_state tunnel_state; - int in_tunnel_is_http; - }; - - // struct http_decoder_context{ - // int array_size; - // struct http_decoder_exdata **exdata_array; //raw tcp stream for http msg; http tunnel for inner http transaction. - // }; - - struct http_topic_exdata_compose - { - enum httpd_topic_index index; - const char *topic_name; - on_session_msg_cb_func *on_msg_cb; - stellar_msg_free_cb_func *msg_free_cb; - const char *exdata_name; - stellar_exdata_free *exdata_free_cb; - int sub_topic_id; // as consumer - int exdata_id; - }; - - struct http_decoder_env - { - struct stellar *st; - int plugin_id; - struct http_topic_exdata_compose topic_exdata_compose[HTTPD_TOPIC_INDEX_MAX]; - struct http_decoder_config hd_cfg; - struct http_decoder_stat hd_stat; - }; - - struct http_message; - - struct http_message *http_message_new(enum http_message_type type, struct http_decoder_result_queue *queue, - int queue_index, uint8_t flow_type); - struct http_message *http_body_message_new(enum http_message_type type, struct http_decoder_result_queue *queue, - int queue_index, uint8_t flow_type, hstring *raw_payload, hstring *decompress_payload); - int http_topic_exdata_compose_get_index(const struct http_decoder_env *httpd_env, int by_topic_id); -#ifdef __cplusplus -} -#endif diff --git a/decoders/http/http_decoder_result_queue.c b/decoders/http/http_decoder_result_queue.c deleted file mode 100644 index 5695138..0000000 --- a/decoders/http/http_decoder_result_queue.c +++ /dev/null @@ -1,152 +0,0 @@ -#include <assert.h> -#include "http_decoder_private.h" - -struct http_decoder_result_queue * -http_decoder_result_queue_new(nmx_pool_t *mempool, size_t queue_size) -{ - struct http_decoder_result_queue *queue = - MEMPOOL_CALLOC(mempool, struct http_decoder_result_queue, 1); - assert(queue); - - queue->req_index = 0; - queue->res_index = 0; - queue->queue_size = queue_size; - queue->array = MEMPOOL_CALLOC(mempool, struct http_decoder_result, - queue->queue_size); - assert(queue->array); - return queue; -} - -void http_decoder_result_queue_free(nmx_pool_t *mempool, struct http_decoder_result_queue *queue) -{ - if (NULL == queue) - { - return; - } - - if (queue->array != NULL) - { - for (size_t i = 0; i < queue->queue_size; i++) - { - if (queue->array[i].req_data != NULL) - { - http_decoder_half_data_free(mempool, queue->array[i].req_data); - queue->array[i].req_data = NULL; - } - - if (queue->array[i].res_data != NULL) - { - http_decoder_half_data_free(mempool, queue->array[i].res_data); - queue->array[i].res_data = NULL; - } - } - MEMPOOL_FREE(mempool, queue->array); - } - MEMPOOL_FREE(mempool, queue); -} - -void http_decoder_result_queue_inc_req_index(struct http_decoder_result_queue *queue) -{ - assert(queue); - queue->req_index++; - queue->req_index = queue->req_index % queue->queue_size; -} - -void http_decoder_result_queue_inc_res_index(struct http_decoder_result_queue *queue) -{ - assert(queue); - queue->res_index++; - queue->res_index = queue->res_index % queue->queue_size; -} - -size_t http_decoder_result_queue_req_index(struct http_decoder_result_queue *queue) -{ - assert(queue); - return queue->req_index; -} - -size_t http_decoder_result_queue_res_index(struct http_decoder_result_queue *queue) -{ - assert(queue); - return queue->res_index; -} - -int http_decoder_result_queue_push_req(struct http_decoder_result_queue *queue, - struct http_decoder_half_data *req_data) -{ - if (NULL == queue || NULL == req_data) - { - return -1; - } - assert(queue->array[queue->req_index].req_data == NULL); - if (queue->array[queue->req_index].req_data != NULL) - { - return -1; - } - queue->array[queue->req_index].req_data = req_data; - return 0; -} - -int http_decoder_result_queue_push_res(struct http_decoder_result_queue *queue, - struct http_decoder_half_data *res_data) -{ - if (NULL == queue || NULL == res_data) - { - return -1; - } - assert(queue->array[queue->res_index].res_data == NULL); - if (queue->array[queue->res_index].res_data != NULL) - { - return -1; - } - - queue->array[queue->res_index].res_data = res_data; - return 0; -} - -struct http_decoder_half_data * -http_decoder_result_queue_pop_req(struct http_decoder_result_queue *queue) -{ - if (NULL == queue) - { - return NULL; - } - struct http_decoder_half_data *req_data = queue->array[queue->req_index].req_data; - queue->array[queue->req_index].req_data = NULL; - return req_data; -} - -struct http_decoder_half_data * -http_decoder_result_queue_pop_res(struct http_decoder_result_queue *queue) -{ - if (NULL == queue) - { - return NULL; - } - - struct http_decoder_half_data *res_data = queue->array[queue->res_index].res_data; - queue->array[queue->res_index].res_data = NULL; - return res_data; -} - -struct http_decoder_half_data * -http_decoder_result_queue_peek_req(struct http_decoder_result_queue *queue) -{ - if (NULL == queue) - { - return NULL; - } - assert(queue->req_index < queue->queue_size); - return queue->array[queue->req_index].req_data; -} - -struct http_decoder_half_data * -http_decoder_result_queue_peek_res(struct http_decoder_result_queue *queue) -{ - if (NULL == queue) - { - return NULL; - } - assert(queue->res_index < queue->queue_size); - return queue->array[queue->res_index].res_data; -}
\ No newline at end of file diff --git a/decoders/http/http_decoder_result_queue.h b/decoders/http/http_decoder_result_queue.h deleted file mode 100644 index a6ff0ca..0000000 --- a/decoders/http/http_decoder_result_queue.h +++ /dev/null @@ -1,51 +0,0 @@ -#pragma once - -#include <stddef.h> -#include "nmx_pool/nmx_palloc.h" -#include "http_decoder_half.h" - -struct http_decoder_result -{ - struct http_decoder_half_data *req_data; - struct http_decoder_half_data *res_data; -}; - -struct http_decoder_result_queue -{ - size_t req_index; - size_t res_index; - size_t queue_size; - struct http_decoder_result *array; -}; - -struct http_decoder_result_queue * -http_decoder_result_queue_new(nmx_pool_t *mempool, size_t queue_size); - -void http_decoder_result_queue_free(nmx_pool_t *mempool, - struct http_decoder_result_queue *queue); - -void http_decoder_result_queue_inc_req_index(struct http_decoder_result_queue *queue); - -void http_decoder_result_queue_inc_res_index(struct http_decoder_result_queue *queue); - -size_t http_decoder_result_queue_req_index(struct http_decoder_result_queue *queue); - -size_t http_decoder_result_queue_res_index(struct http_decoder_result_queue *queue); - -struct http_decoder_half_data * -http_decoder_result_queue_pop_req(struct http_decoder_result_queue *queue); - -struct http_decoder_half_data * -http_decoder_result_queue_pop_res(struct http_decoder_result_queue *queue); - -int http_decoder_result_queue_push_req(struct http_decoder_result_queue *queue, - struct http_decoder_half_data *req_data); - -int http_decoder_result_queue_push_res(struct http_decoder_result_queue *queue, - struct http_decoder_half_data *res_data); - -struct http_decoder_half_data * -http_decoder_result_queue_peek_req(struct http_decoder_result_queue *queue); - -struct http_decoder_half_data * -http_decoder_result_queue_peek_res(struct http_decoder_result_queue *queue); diff --git a/decoders/http/http_decoder_stat.c b/decoders/http/http_decoder_stat.c index dec6c91..a8240c2 100644 --- a/decoders/http/http_decoder_stat.c +++ b/decoders/http/http_decoder_stat.c @@ -2,126 +2,104 @@ #include <stdio.h> #include <pthread.h> #include <unistd.h> -#include "http_decoder_private.h" +#include "http_decoder_stat.h" -static const struct hd_stat_config_tuple g_httpd_stat_tuple[HTTP_STAT_MAX] = - { - {HTTP_C2S_BYTES, "http_c2s_bytes"}, - {HTTP_S2C_BYTES, "http_s2c_bytes"}, - {HTTP_C2S_TCP_SEG, "http_c2s_tcp_seg"}, - {HTTP_S2C_TCP_SEG, "http_s2c_tcp_seg"}, - {HTTP_C2S_HEADERS, "http_c2s_headers"}, - {HTTP_S2C_HEADERS, "http_s2c_headers"}, - {HTTP_C2S_ZIP_BYTES, "http_c2s_zip_bytes"}, - {HTTP_S2C_ZIP_BYTES, "http_s2c_zip_bytes"}, - {HTTP_C2S_UNZIP_BYTES, "http_c2s_unzip_bytes"}, - {HTTP_S2C_UNZIP_BYTES, "http_s2c_unzip_bytes"}, - {HTTP_URL_BYTES, "http_url_bytes"}, - {HTTP_SESSION_NEW, "http_session_new"}, - {HTTP_SESSION_FREE, "http_session_free"}, - {HTTP_TRANSACTION_NEW, "http_transaction_new"}, - {HTTP_TRANSACTION_FREE, "http_transaction_free"}, - {HTTP_C2S_ASYMMETRY_SESSION, "http_c2s_asymmetry_sess"}, - {HTTP_S2C_ASYMMETRY_SESSION, "http_s2c_asymmetry_sess"}, - {HTTP_C2S_ASYMMETRY_TRANSACTION, "http_c2s_asymmetry_trans"}, - {HTTP_S2C_ASYMMETRY_TRANSACTION, "http_s2c_asymmetry_trans"}, - {HTTP_STAT_PARSE_ERR, "http_parse_error"}, -}; - -void http_decoder_stat_free(struct http_decoder_stat *hd_stat) +#ifdef __cplusplus +extern "C" { - if (hd_stat->timer_pid != 0) - { - pthread_cancel(hd_stat->timer_pid); - } - if (hd_stat->stats != NULL) - { - free(hd_stat->stats); - } - if (hd_stat->fse != NULL) - { - fieldstat_easy_free(hd_stat->fse); - } -} +#endif + static const struct hd_stat_config_tuple g_httpd_stat_tuple[HTTP_STAT_MAX] = + { + {HTTP_C2S_BYTES, "http_c2s_bytes"}, + {HTTP_S2C_BYTES, "http_s2c_bytes"}, + {HTTP_C2S_TCP_SEG, "http_c2s_tcp_seg"}, + {HTTP_S2C_TCP_SEG, "http_s2c_tcp_seg"}, + {HTTP_C2S_HEADERS, "http_c2s_headers"}, + {HTTP_S2C_HEADERS, "http_s2c_headers"}, + {HTTP_C2S_ZIP_BYTES, "http_c2s_zip_bytes"}, + {HTTP_S2C_ZIP_BYTES, "http_s2c_zip_bytes"}, + {HTTP_C2S_UNZIP_BYTES, "http_c2s_unzip_bytes"}, + {HTTP_S2C_UNZIP_BYTES, "http_s2c_unzip_bytes"}, + {HTTP_URL_BYTES, "http_url_bytes"}, + {HTTP_SESSION_NEW, "http_session_new"}, + {HTTP_SESSION_FREE, "http_session_free"}, + {HTTP_TRANSACTION_NEW, "http_transaction_new"}, + {HTTP_TRANSACTION_FREE, "http_transaction_free"}, + {HTTP_C2S_ASYMMETRY_SESSION, "http_c2s_asymmetry_sess"}, + {HTTP_S2C_ASYMMETRY_SESSION, "http_s2c_asymmetry_sess"}, + {HTTP_C2S_ASYMMETRY_TRANSACTION, "http_c2s_asymmetry_trans"}, + {HTTP_S2C_ASYMMETRY_TRANSACTION, "http_s2c_asymmetry_trans"}, + {HTTP_STAT_PARSE_ERR, "http_parse_error"}, + }; -static void *httpd_stat_timer_thread(void *arg) -{ - pthread_setname_np(pthread_self(), "http_decoder_timer_thread"); - struct http_decoder_stat *hd_stat = (struct http_decoder_stat *)arg; - struct timespec res; - while (1) + void http_stat_free(struct http_stat *hd_stat) { - clock_gettime(CLOCK_MONOTONIC, &res); - hd_stat->current_time_ms = (res.tv_sec * 1000) + (res.tv_nsec / 1000000); - usleep(800); + if (hd_stat->fs4_ins != NULL) + { + fieldstat_easy_free(hd_stat->fs4_ins); + } } - return NULL; -} -int http_decoder_stat_init(struct http_decoder_stat *hd_stat, int thread_max, int stat_interval_pkts, int stat_interval_time) -{ - assert(sizeof(g_httpd_stat_tuple) / sizeof(struct hd_stat_config_tuple) == HTTP_STAT_MAX); - if (sizeof(g_httpd_stat_tuple) / sizeof(struct hd_stat_config_tuple) != HTTP_STAT_MAX) - { - fprintf(stderr, "enum http_decoder_stat_type number not match with g_httpd_stat_tuple!"); - return -1; - } - hd_stat->fse = fieldstat_easy_new(thread_max, "http_decoder_statistics", NULL, 0); - if (NULL == hd_stat->fse) + static struct fieldstat_easy *http_get_fs4_ins(struct module_manager *mod_mgr) { - fprintf(stderr, "fieldstat_easy_new failed."); - return -1; + // todo: use stellar global fieldstat4 instance + struct fieldstat_easy *fs4 = fieldstat_easy_new(module_manager_get_max_thread_num(mod_mgr), "HTTP", NULL, 0); + fieldstat_easy_enable_auto_output(fs4, "./log/http.fs4", 1); + return fs4; } - for (int i = 0; i < HTTP_STAT_MAX; i++) + int http_stat_init(struct module_manager *mod_mgr, struct http_stat *hd_stat) { - hd_stat->field_stat_id[i] = fieldstat_easy_register_counter(hd_stat->fse, g_httpd_stat_tuple[i].name); - if (hd_stat->field_stat_id[i] < 0) + assert(sizeof(g_httpd_stat_tuple) / sizeof(struct hd_stat_config_tuple) == HTTP_STAT_MAX); + if (sizeof(g_httpd_stat_tuple) / sizeof(struct hd_stat_config_tuple) != HTTP_STAT_MAX) { - fprintf(stderr, "fieldstat_easy_register_counter %s failed.", g_httpd_stat_tuple[i].name); - fieldstat_easy_free(hd_stat->fse); - hd_stat->fse = NULL; + fprintf(stderr, "enum http_decoder_stat_type number not match with g_httpd_stat_tuple!"); + return -1; + } + hd_stat->fs4_ins = http_get_fs4_ins(mod_mgr); + if (NULL == hd_stat->fs4_ins) + { + fprintf(stderr, "fieldstat_easy_new failed."); return -1; } - } - int ret = fieldstat_easy_enable_auto_output(hd_stat->fse, FILEDSTAT_OUTPUT_FILE, stat_interval_time); - if (ret < 0) - { - fprintf(stderr, "fieldstat_easy_enable_auto_output failed."); - fieldstat_easy_free(hd_stat->fse); - hd_stat->fse = NULL; - return -1; + for (int i = 0; i < HTTP_STAT_MAX; i++) + { + hd_stat->field_stat_id[i] = fieldstat_easy_register_counter(hd_stat->fs4_ins, g_httpd_stat_tuple[i].name); + if (hd_stat->field_stat_id[i] < 0) + { + fprintf(stderr, "fieldstat_easy_register_counter %s failed.", g_httpd_stat_tuple[i].name); + fieldstat_easy_free(hd_stat->fs4_ins); + hd_stat->fs4_ins = NULL; + return -1; + } + } + return 0; } - hd_stat->stats = (struct hd_statistics *)calloc(thread_max, sizeof(struct hd_statistics)); - hd_stat->stat_interval_pkts = stat_interval_pkts; - hd_stat->stat_interval_time = stat_interval_time; - pthread_create(&hd_stat->timer_pid, NULL, httpd_stat_timer_thread, hd_stat); - pthread_detach(hd_stat->timer_pid); - return 0; -} - -void http_decoder_stat_update(struct http_decoder_stat *hd_stat, int thread_id, enum http_decoder_stat_type type, long long value) -{ - assert(hd_stat); - assert(thread_id >= 0); - assert(type < HTTP_STAT_MAX); - if (unlikely(hd_stat->stats == NULL)) + void http_stat_update(struct http_stat *hd_stat, int thread_id, enum http_decoder_stat_type type, long long value) { - return; + assert(hd_stat); + assert(thread_id >= 0); + assert(type < HTTP_STAT_MAX); + // todo: performance optimization, use batch update + fieldstat_easy_counter_incrby(hd_stat->fs4_ins, thread_id, hd_stat->field_stat_id[type], NULL, 0, value); } - struct hd_statistics *cur_hds = &hd_stat->stats[thread_id]; - - cur_hds->counter[type] += value; - cur_hds->batch[type]++; - - if (cur_hds->batch[type] >= hd_stat->stat_interval_pkts || cur_hds->time_ms[type] + 1000 < hd_stat->current_time_ms) + void http_stat_update_tcp_seg(struct http_stat *stat, int thread_id, enum flow_type ftype, long long value) { - fieldstat_easy_counter_incrby(hd_stat->fse, thread_id, hd_stat->field_stat_id[type], NULL, 0, cur_hds->counter[type]); - cur_hds->counter[type] = 0; - cur_hds->batch[type] = 0; - cur_hds->time_ms[type] = hd_stat->current_time_ms; + if (FLOW_TYPE_C2S == ftype) + { + http_stat_update(stat, thread_id, HTTP_C2S_BYTES, value); + http_stat_update(stat, thread_id, HTTP_C2S_TCP_SEG, 1); + } + else + { + http_stat_update(stat, thread_id, HTTP_S2C_BYTES, value); + http_stat_update(stat, thread_id, HTTP_S2C_TCP_SEG, 1); + } } -}
\ No newline at end of file + +#ifdef __cplusplus +} +#endif
\ No newline at end of file diff --git a/decoders/http/http_decoder_stat.h b/decoders/http/http_decoder_stat.h index 339e81d..0f9c1a7 100644 --- a/decoders/http/http_decoder_stat.h +++ b/decoders/http/http_decoder_stat.h @@ -1,55 +1,57 @@ #pragma once #include <fieldstat/fieldstat_easy.h> -enum http_decoder_stat_type -{ - HTTP_C2S_BYTES = 0, - HTTP_S2C_BYTES, - HTTP_C2S_TCP_SEG, - HTTP_S2C_TCP_SEG, - HTTP_C2S_HEADERS, - HTTP_S2C_HEADERS, - HTTP_C2S_ZIP_BYTES, // only if Content-Encoding is gzip, deflate, br - HTTP_S2C_ZIP_BYTES, // only if Content-Encoding is gzip, deflate, br - HTTP_C2S_UNZIP_BYTES, // only if Content-Encoding is gzip, deflate, br - HTTP_S2C_UNZIP_BYTES, // only if Content-Encoding is gzip, deflate, br - HTTP_URL_BYTES, - HTTP_SESSION_NEW, - HTTP_SESSION_FREE, - HTTP_TRANSACTION_NEW, - HTTP_TRANSACTION_FREE, - HTTP_C2S_ASYMMETRY_SESSION, - HTTP_S2C_ASYMMETRY_SESSION, - HTTP_C2S_ASYMMETRY_TRANSACTION, - HTTP_S2C_ASYMMETRY_TRANSACTION, - HTTP_STAT_PARSE_ERR, - HTTP_STAT_MAX, -}; +#include "stellar/module.h" +#include "stellar/session.h" -struct hd_stat_config_tuple +#ifdef __cplusplus +extern "C" { - enum http_decoder_stat_type type; - const char *name; -}; +#endif -struct hd_statistics -{ - long long time_ms[HTTP_STAT_MAX]; - long long counter[HTTP_STAT_MAX]; - int batch[HTTP_STAT_MAX]; // call fieldstat_easy_counter_incrby() per batch -} __attribute__((aligned(64))); + enum http_decoder_stat_type + { + HTTP_C2S_BYTES = 0, + HTTP_S2C_BYTES, + HTTP_C2S_TCP_SEG, + HTTP_S2C_TCP_SEG, + HTTP_C2S_HEADERS, + HTTP_S2C_HEADERS, + HTTP_C2S_ZIP_BYTES, // only update if Content-Encoding is gzip, deflate, br + HTTP_S2C_ZIP_BYTES, // only update if Content-Encoding is gzip, deflate, br + HTTP_C2S_UNZIP_BYTES, // only update if Content-Encoding is gzip, deflate, br + HTTP_S2C_UNZIP_BYTES, // only update if Content-Encoding is gzip, deflate, br + HTTP_URL_BYTES, + HTTP_SESSION_NEW, + HTTP_SESSION_FREE, + HTTP_TRANSACTION_NEW, + HTTP_TRANSACTION_FREE, + HTTP_C2S_ASYMMETRY_SESSION, + HTTP_S2C_ASYMMETRY_SESSION, + HTTP_C2S_ASYMMETRY_TRANSACTION, + HTTP_S2C_ASYMMETRY_TRANSACTION, + HTTP_STAT_PARSE_ERR, + HTTP_STAT_MAX, + }; -struct http_decoder_stat -{ - pthread_t timer_pid; - long long current_time_ms; - struct fieldstat_easy *fse; - int stat_interval_pkts; // call fieldstat_incrby every stat_interval_pkts - int stat_interval_time; // second - int field_stat_id[HTTP_STAT_MAX]; - struct hd_statistics *stats; // size is thread number -}; + struct hd_stat_config_tuple + { + enum http_decoder_stat_type type; + const char *name; + }; + + struct http_stat + { + struct fieldstat_easy *fs4_ins; + int stat_interval_time; // second + int field_stat_id[HTTP_STAT_MAX]; + }; + + int http_stat_init(struct module_manager *mod_mgr, struct http_stat *hd_stat); + void http_stat_free(struct http_stat *hd_stat); + void http_stat_update(struct http_stat *hd_stat, int thread_id, enum http_decoder_stat_type type, long long value); + void http_stat_update_tcp_seg(struct http_stat *stat, int thread_id, enum flow_type ftype, long long value); -int http_decoder_stat_init(struct http_decoder_stat *hd_stat, int thread_max, int stat_interval_pkts, int stat_interval_time); -void http_decoder_stat_free(struct http_decoder_stat *hd_stat); -void http_decoder_stat_update(struct http_decoder_stat *hd_stat, int thread_id, enum http_decoder_stat_type type, long long value); +#ifdef __cplusplus +} +#endif
\ No newline at end of file diff --git a/decoders/http/http_decoder_string.c b/decoders/http/http_decoder_string.c deleted file mode 100644 index 6fd5b04..0000000 --- a/decoders/http/http_decoder_string.c +++ /dev/null @@ -1,289 +0,0 @@ -#include <stdio.h> -#include <stdlib.h> -#include <string.h> -#include <assert.h> -#include "http_decoder_private.h" - -static const char *string_state_to_desc(enum string_state state) -{ - switch (state) - { - case STRING_STATE_INIT: - return "init"; - break; - case STRING_STATE_REFER: - return "refer"; - break; - case STRING_STATE_CACHE: - return "cache"; - break; - case STRING_STATE_COMMIT: - return "commit"; - break; - default: - return "unknown"; - break; - } -} - -void http_decoder_string_refer(struct http_decoder_string *rstr, const char *at, size_t length) -{ - if (NULL == rstr) - { - return; - } - - switch (rstr->state) - { - case STRING_STATE_INIT: - case STRING_STATE_CACHE: - rstr->refer.iov_base = (char *)at; - rstr->refer.iov_len = length; - break; - default: - abort(); - break; - } - - rstr->state = STRING_STATE_REFER; -} - -static void string_refer2cache(struct http_decoder_string *rstr) -{ - if (0 == rstr->refer.iov_len) - { - return; - } - if (rstr->cache.iov_len >= rstr->max_cache_size) - { - return; - } - - size_t length = rstr->cache.iov_len + rstr->refer.iov_len; - if (length > rstr->max_cache_size) - { - length = rstr->max_cache_size; - } - - if (NULL == rstr->cache.iov_base) - { - rstr->cache.iov_base = CALLOC(char, length + 1); - memcpy(rstr->cache.iov_base, rstr->refer.iov_base, length); - } - else - { - rstr->cache.iov_base = REALLOC(char, rstr->cache.iov_base, length + 1); - memcpy((char *)rstr->cache.iov_base + rstr->cache.iov_len, rstr->refer.iov_base, - (length - rstr->cache.iov_len)); - } - - rstr->cache.iov_len = length; - rstr->refer.iov_base = NULL; - rstr->refer.iov_len = 0; -} - -static void string_commit2cache(struct http_decoder_string *rstr) -{ - if (rstr->cache.iov_len == rstr->commit.iov_len && - rstr->cache.iov_base == rstr->commit.iov_base) - { - rstr->commit.iov_base = NULL; - rstr->commit.iov_len = 0; - return; - } - - // Only http header key need to backward to cache - size_t length = 0; - if (rstr->commit.iov_len > rstr->max_cache_size) - { - length = rstr->max_cache_size; - } - else - { - length = rstr->commit.iov_len; - } - - if (length > 0) - { - if (NULL == rstr->cache.iov_base) - { - rstr->cache.iov_base = CALLOC(char, length + 1); - } - else - { - abort(); - } - memcpy(rstr->cache.iov_base, rstr->commit.iov_base, length); - rstr->cache.iov_len = length; - - rstr->commit.iov_base = NULL; - rstr->commit.iov_len = 0; - } -} - -void http_decoder_string_cache(struct http_decoder_string *rstr) -{ - if (NULL == rstr) - { - return; - } - - switch (rstr->state) - { - case STRING_STATE_REFER: - string_refer2cache(rstr); - break; - case STRING_STATE_CACHE: - break; - case STRING_STATE_COMMIT: - // commit backward to cache - string_commit2cache(rstr); - break; - default: - abort(); - break; - } - rstr->state = STRING_STATE_CACHE; -} - -void http_decoder_string_commit(struct http_decoder_string *rstr) -{ - if (NULL == rstr) - { - return; - } - - switch (rstr->state) - { - case STRING_STATE_REFER: - if (rstr->cache.iov_len) - { - http_decoder_string_cache(rstr); - - rstr->commit.iov_base = rstr->cache.iov_base; - rstr->commit.iov_len = rstr->cache.iov_len; - // not overwrite rstr->cache.iov_base - } - else - { - rstr->commit.iov_base = rstr->refer.iov_base; - rstr->commit.iov_len = rstr->refer.iov_len; - - rstr->refer.iov_base = NULL; - rstr->refer.iov_len = 0; - } - break; - case STRING_STATE_CACHE: - rstr->commit.iov_base = rstr->cache.iov_base; - rstr->commit.iov_len = rstr->cache.iov_len; - // not overwrite rstr->cache.iov_base - break; - default: - // abort(); - break; - } - - rstr->state = STRING_STATE_COMMIT; -} - -void http_decoder_string_reset(struct http_decoder_string *rstr) -{ - assert(rstr); - - switch (rstr->state) - { - case STRING_STATE_INIT: - case STRING_STATE_REFER: - case STRING_STATE_CACHE: - case STRING_STATE_COMMIT: - FREE(rstr->cache.iov_base); - memset(rstr, 0, sizeof(struct http_decoder_string)); - break; - default: - abort(); - break; - } - - rstr->state = STRING_STATE_INIT; -} - -void http_decoder_string_init(struct http_decoder_string *rstr, size_t max_cache_size) -{ - rstr->max_cache_size = max_cache_size; -} - -void http_decoder_string_reinit(struct http_decoder_string *rstr) -{ - if (rstr->state == STRING_STATE_CACHE) - { - return; - } - - if (rstr->state == STRING_STATE_COMMIT && - rstr->cache.iov_base == rstr->commit.iov_base && - rstr->cache.iov_len == rstr->commit.iov_len) - { - return; - } - - if (rstr->cache.iov_base != NULL) - { - FREE(rstr->cache.iov_base); - rstr->cache.iov_len = 0; - } - -#if 0 - rstr->refer.iov_base = NULL; - rstr->refer.iov_len = 0; - rstr->commit.iov_base = NULL; - rstr->commit.iov_len = 0; - rstr->state = STRING_STATE_INIT; -#endif -} - -enum string_state http_decoder_string_state(const struct http_decoder_string *rstr) -{ - return rstr->state; -} - -int http_decoder_string_get(const struct http_decoder_string *rstr, char **name, size_t *name_len) -{ - if (NULL == rstr || NULL == name || 0 == name_len) - { - return -1; - } - - if (http_decoder_string_state(rstr) == STRING_STATE_COMMIT) - { - *name = rstr->commit.iov_base; - *name_len = rstr->commit.iov_len; - } - else - { - *name = NULL; - *name_len = 0; - } - return 0; -} - -void http_decoder_string_dump(struct http_decoder_string *rstr, const char *desc) -{ - if (NULL == rstr) - { - return; - } - - char *refer_str = http_safe_dup((char *)rstr->refer.iov_base, rstr->refer.iov_len); - char *cache_str = http_safe_dup((char *)rstr->cache.iov_base, rstr->cache.iov_len); - char *commit_str = http_safe_dup((char *)rstr->commit.iov_base, rstr->commit.iov_len); - - printf("%s: state: %s, refer: {len: %02zu, iov_base: %s}, cache: {len: %02zu, iov_base: %s}, commit: {len: %02zu, iov_base: %s}\n", - desc, string_state_to_desc(rstr->state), - rstr->refer.iov_len, refer_str, - rstr->cache.iov_len, cache_str, - rstr->commit.iov_len, commit_str); - - FREE(refer_str); - FREE(cache_str); - FREE(commit_str); -}
\ No newline at end of file diff --git a/decoders/http/http_decoder_string.h b/decoders/http/http_decoder_string.h deleted file mode 100644 index 83721e9..0000000 --- a/decoders/http/http_decoder_string.h +++ /dev/null @@ -1,73 +0,0 @@ -#pragma once - -#include "stellar/http.h" - -enum string_state { - STRING_STATE_INIT, - STRING_STATE_REFER, - STRING_STATE_CACHE, - STRING_STATE_COMMIT, -}; - -/* state transition diagram - * +----------+ - * | | - * \|/ | - * +------+ | - * | init | | - * +------+ | - * | | - * +---->| | - * | \|/ | - * | +-------+ | - * | | refer |--+ | - * | +-------+ | | - * | | | | - * | \|/ | | - * | +-------+ | | - * +--| cache | | | - * +-------+ | | - * | | | - * |<------+ | - * \|/ | - * +--------+ | - * | commit | | - * +--------+ | - * | | - * \|/ | - * +--------+ | - * | reset |----+ - * +--------+ - */ - - -//http decoder string -struct http_decoder_string { - hstring refer; // shallow copy - hstring cache; // deep copy - hstring commit; - - enum string_state state; - size_t max_cache_size; -}; - -void http_decoder_string_refer(struct http_decoder_string *rstr, - const char *at, size_t length); - -void http_decoder_string_cache(struct http_decoder_string *rstr); - -void http_decoder_string_commit(struct http_decoder_string *rstr); - -void http_decoder_string_reset(struct http_decoder_string *rstr); - -void http_decoder_string_init(struct http_decoder_string *rstr, - size_t max_cache_size); - -void http_decoder_string_reinit(struct http_decoder_string *rstr); - -enum string_state http_decoder_string_state(const struct http_decoder_string *rstr); - -int http_decoder_string_get(const struct http_decoder_string *rstr, char **name, size_t *name_len); - -void http_decoder_string_dump(struct http_decoder_string *rstr, const char *desc); -
\ No newline at end of file diff --git a/decoders/http/http_decoder_table.c b/decoders/http/http_decoder_table.c deleted file mode 100644 index c85b876..0000000 --- a/decoders/http/http_decoder_table.c +++ /dev/null @@ -1,579 +0,0 @@ -#include <assert.h> -#include <stdlib.h> -#include <string.h> -#include "http_decoder_private.h" - -#define INIT_HEADER_CNT 16 -#define MAX_URI_CACHE_SIZE 2048 -#define MAX_STATUS_CACHE_SIZE 32 -#define MAX_METHOD_CACHE_SIZE 8 -#define MAX_VERSION_CACHE_SIZE 4 -#define MAX_HEADER_KEY_CACHE_SIZE 4096 -#define MAX_HEADER_VALUE_CACHE_SIZE 4096 - -struct http_decoder_header -{ - struct http_decoder_string key; - struct http_decoder_string val; -}; - -struct http_decoder_table -{ - struct http_decoder_string uri; - struct http_decoder_string status; - struct http_decoder_string method; - struct http_decoder_string version; - struct http_decoder_string body; - - nmx_pool_t *ref_mempool; - int header_complete; // flag for all headers parsed completely - size_t header_cnt; - size_t header_index; // current parsing header - size_t header_iter; // plugins iterate cursor - size_t commit_header_index; // pushed to plugins, whether has called http_message_get0_next_header() - struct http_decoder_header *headers; -}; - -static void http_decoder_table_init(struct http_decoder_table *table) -{ - if (NULL == table) - { - return; - } - - struct http_decoder_header *header = NULL; - assert(table); - - http_decoder_string_init(&table->uri, MAX_URI_CACHE_SIZE); - http_decoder_string_init(&table->status, MAX_STATUS_CACHE_SIZE); - http_decoder_string_init(&table->method, MAX_METHOD_CACHE_SIZE); - http_decoder_string_init(&table->version, MAX_METHOD_CACHE_SIZE); - - for (size_t i = 0; i < table->header_cnt; i++) - { - header = &table->headers[i]; - http_decoder_string_init(&header->key, MAX_HEADER_KEY_CACHE_SIZE); - http_decoder_string_init(&header->val, MAX_HEADER_VALUE_CACHE_SIZE); - } - - http_decoder_string_init(&table->body, 0); -} - -struct http_decoder_table *http_decoder_table_new(nmx_pool_t *mempool) -{ - struct http_decoder_table *table = - MEMPOOL_CALLOC(mempool, struct http_decoder_table, 1); - assert(table); - - table->ref_mempool = mempool; - table->header_cnt = INIT_HEADER_CNT; - table->headers = MEMPOOL_CALLOC(mempool, struct http_decoder_header, - table->header_cnt); - table->commit_header_index = 0; - http_decoder_table_init(table); - - return table; -} - -void http_decoder_table_free(struct http_decoder_table *table) -{ - if (NULL == table) - { - return; - } - if (table->uri.cache.iov_base != NULL) - { - FREE(table->uri.cache.iov_base); - } - if (table->status.cache.iov_base != NULL) - { - FREE(table->status.cache.iov_base); - } - if (table->method.cache.iov_base != NULL) - { - FREE(table->method.cache.iov_base); - } - if (table->version.cache.iov_base != NULL) - { - FREE(table->version.cache.iov_base); - } - if (table->body.cache.iov_base != NULL) - { - FREE(table->body.cache.iov_base); - } - - if (table->headers != NULL) - { - for (size_t i = 0; i < table->header_cnt; i++) - { - if (table->headers[i].key.cache.iov_base != NULL) - { - FREE(table->headers[i].key.cache.iov_base); - } - - if (table->headers[i].val.cache.iov_base != NULL) - { - FREE(table->headers[i].val.cache.iov_base); - } - } - - MEMPOOL_FREE(table->ref_mempool, table->headers); - table->headers = NULL; - } - MEMPOOL_FREE(table->ref_mempool, table); -} - -enum string_state http_decoder_table_state(struct http_decoder_table *table, enum http_item type) -{ - if (NULL == table) - { - return STRING_STATE_INIT; - } - struct http_decoder_header *header = NULL; - enum string_state state = STRING_STATE_INIT; - assert(table); - - switch (type) - { - case HTTP_ITEM_URI: - state = http_decoder_string_state(&table->uri); - break; - case HTTP_ITEM_STATUS: - state = http_decoder_string_state(&table->status); - break; - case HTTP_ITEM_METHOD: - state = http_decoder_string_state(&table->method); - break; - case HTTP_ITEM_VERSION: - state = http_decoder_string_state(&table->version); - break; - case HTTP_ITEM_HDRKEY: - assert(table->header_index < table->header_cnt); - header = &table->headers[table->header_index]; - state = http_decoder_string_state(&header->key); - break; - case HTTP_ITEM_HDRVAL: - assert(table->header_index < table->header_cnt); - header = &table->headers[table->header_index]; - state = http_decoder_string_state(&header->val); - break; - case HTTP_ITEM_BODY: - state = http_decoder_string_state(&table->body); - break; - default: - abort(); - break; - } - - return state; -} - -void http_decoder_table_refer(struct http_decoder_table *table, enum http_item type, const char *at, size_t len) -{ - if (NULL == table) - { - return; - } - - struct http_decoder_header *header = NULL; - assert(table); - - switch (type) - { - case HTTP_ITEM_URI: - http_decoder_string_refer(&table->uri, at, len); - break; - case HTTP_ITEM_STATUS: - http_decoder_string_refer(&table->status, at, len); - break; - case HTTP_ITEM_METHOD: - http_decoder_string_refer(&table->method, at, len); - break; - case HTTP_ITEM_VERSION: - http_decoder_string_refer(&table->version, at, len); - break; - case HTTP_ITEM_HDRKEY: - assert(table->header_index < table->header_cnt); - header = &table->headers[table->header_index]; - http_decoder_string_refer(&header->key, at, len); - break; - case HTTP_ITEM_HDRVAL: - assert(table->header_index < table->header_cnt); - header = &table->headers[table->header_index]; - http_decoder_string_refer(&header->val, at, len); - break; - case HTTP_ITEM_BODY: - http_decoder_string_refer(&table->body, at, len); - break; - default: - abort(); - break; - } -} - -void http_decoder_table_cache(struct http_decoder_table *table, enum http_item type) -{ - if (NULL == table) - { - return; - } - - struct http_decoder_header *header = NULL; - assert(table); - - switch (type) - { - case HTTP_ITEM_URI: - http_decoder_string_cache(&table->uri); - break; - case HTTP_ITEM_STATUS: - http_decoder_string_cache(&table->status); - break; - case HTTP_ITEM_METHOD: - http_decoder_string_cache(&table->method); - break; - case HTTP_ITEM_VERSION: - http_decoder_string_cache(&table->version); - break; - case HTTP_ITEM_HDRKEY: - assert(table->header_index < table->header_cnt); - header = &table->headers[table->header_index]; - http_decoder_string_cache(&header->key); - break; - case HTTP_ITEM_HDRVAL: - assert(table->header_index < table->header_cnt); - header = &table->headers[table->header_index]; - http_decoder_string_cache(&header->val); - break; - case HTTP_ITEM_BODY: - http_decoder_string_cache(&table->body); - break; - default: - abort(); - break; - } -} - -void http_decoder_table_commit(struct http_decoder_table *table, enum http_item type) -{ - if (NULL == table) - { - return; - } - - size_t i = 0; - struct http_decoder_header *header = NULL; - assert(table); - - switch (type) - { - case HTTP_ITEM_URI: - http_decoder_string_commit(&table->uri); - break; - case HTTP_ITEM_STATUS: - http_decoder_string_commit(&table->status); - break; - case HTTP_ITEM_METHOD: - http_decoder_string_commit(&table->method); - break; - case HTTP_ITEM_VERSION: - http_decoder_string_commit(&table->version); - break; - case HTTP_ITEM_HDRKEY: - assert(table->header_index < table->header_cnt); - header = &table->headers[table->header_index]; - http_decoder_string_commit(&header->key); - break; - case HTTP_ITEM_HDRVAL: - header = &table->headers[table->header_index]; - http_decoder_string_commit(&header->val); - // inc index - if ((table->header_index + 1) >= table->header_cnt) - { - struct http_decoder_header *old_headers = table->headers; - table->headers = - MEMPOOL_CALLOC(table->ref_mempool, struct http_decoder_header, - table->header_cnt * 2); - table->header_cnt *= 2; - - for (i = 0; i <= table->header_index; i++) - { - table->headers[i] = old_headers[i]; - } - - MEMPOOL_FREE(table->ref_mempool, old_headers); - - for (i = table->header_index + 1; i < table->header_cnt; i++) - { - header = &table->headers[i]; - memset(header, 0, sizeof(struct http_decoder_header)); - http_decoder_string_init(&header->key, MAX_HEADER_KEY_CACHE_SIZE); - http_decoder_string_init(&header->val, MAX_HEADER_VALUE_CACHE_SIZE); - } - } - table->header_index++; - break; - case HTTP_ITEM_BODY: - http_decoder_string_commit(&table->body); - break; - default: - abort(); - break; - } -} - -void http_decoder_table_reset(struct http_decoder_table *table, enum http_item type) -{ - if (NULL == table) - { - return; - } - - struct http_decoder_header *header = NULL; - assert(table); - - switch (type) - { - case HTTP_ITEM_URI: - http_decoder_string_reset(&table->uri); - break; - case HTTP_ITEM_STATUS: - http_decoder_string_reset(&table->status); - break; - case HTTP_ITEM_METHOD: - http_decoder_string_reset(&table->method); - break; - case HTTP_ITEM_VERSION: - http_decoder_string_reset(&table->version); - break; - case HTTP_ITEM_HDRKEY: - header = &table->headers[table->header_index]; - http_decoder_string_reset(&header->key); - break; - case HTTP_ITEM_HDRVAL: - header = &table->headers[table->header_index]; - http_decoder_string_reset(&header->val); - break; - case HTTP_ITEM_BODY: - http_decoder_string_reset(&table->body); - break; - default: - abort(); - break; - } -} - -void http_decoder_table_reinit(struct http_decoder_table *table) -{ - assert(table); - struct http_decoder_header *header = NULL; - - http_decoder_string_reinit(&table->uri); - http_decoder_string_reinit(&table->status); - http_decoder_string_reinit(&table->method); - http_decoder_string_reinit(&table->version); - // for (size_t i = 0; i < table->header_iter; i++) { - for (size_t i = 0; i < table->commit_header_index; i++) - { - // todo, reset header_index, avoid realloc headers as much as possible - header = &table->headers[i]; - http_decoder_string_reinit(&header->key); - http_decoder_string_reinit(&header->val); - } - - http_decoder_string_reinit(&table->body); -} - -void http_decoder_table_dump(struct http_decoder_table *table) -{ - if (NULL == table) - { - return; - } - - http_decoder_string_dump(&table->uri, "uri"); - http_decoder_string_dump(&table->status, "status"); - http_decoder_string_dump(&table->method, "method"); - http_decoder_string_dump(&table->version, "version"); - http_decoder_string_dump(&table->body, "body"); - - for (size_t i = 0; i < table->header_cnt; i++) - { - struct http_decoder_header *header = &table->headers[i]; - if (NULL == header) - { - continue; - } - - http_decoder_string_dump(&header->key, "key"); - http_decoder_string_dump(&header->val, "val"); - } -} - -int http_decoder_table_get_uri(const struct http_decoder_table *table, char **out, size_t *out_len) -{ - if (NULL == table || NULL == out) - { - return -1; - } - return http_decoder_string_get(&table->uri, out, out_len); -} - -int http_decoder_table_get_method(const struct http_decoder_table *table, char **out, size_t *out_len) -{ - if (NULL == table || NULL == out) - { - return -1; - } - return http_decoder_string_get(&table->method, out, out_len); -} - -int http_decoder_table_get_status(const struct http_decoder_table *table, char **out, size_t *out_len) -{ - if (NULL == table || NULL == out) - { - return -1; - } - return http_decoder_string_get(&table->status, out, out_len); -} - -int http_decoder_table_get_version(const struct http_decoder_table *table, char **out, size_t *out_len) -{ - if (NULL == table || NULL == out) - { - return -1; - } - return http_decoder_string_get(&table->version, out, out_len); -} - -int http_decoder_table_get_body(const struct http_decoder_table *table, char **out, size_t *out_len) -{ - if (NULL == table || NULL == out) - { - return -1; - } - return http_decoder_string_get(&table->body, (char **)out, out_len); -} - -int http_decoder_table_get_header(const struct http_decoder_table *table, const char *name, size_t name_len, - struct http_header_field *hdr_result) -{ - for (size_t i = 0; i < table->header_cnt; i++) - { - const struct http_decoder_header *tmp_header = &table->headers[i]; - if (tmp_header->key.commit.iov_len != name_len) - { - continue; - } - - if (http_decoder_string_state(&tmp_header->key) == STRING_STATE_COMMIT && - http_decoder_string_state(&tmp_header->val) == STRING_STATE_COMMIT) - { - hstring tmp_key; - http_decoder_string_get(&tmp_header->key, (char **)&tmp_key.iov_base, &tmp_key.iov_len); - - if (tmp_key.iov_len == name_len && - (0 == strncasecmp((char *)tmp_key.iov_base, name, name_len))) - { - http_decoder_string_get(&tmp_header->key, &hdr_result->name, &hdr_result->name_len); - http_decoder_string_get(&tmp_header->val, &hdr_result->value, &hdr_result->value_len); - return 0; - } - } - } - return -1; -} - -int http_decoder_table_iter_header(struct http_decoder_table *table, struct http_header_field *hdr) -{ - if (NULL == table || NULL == hdr) - { - return -1; - } - if (table->header_iter >= table->header_cnt) - { - return -1; - } - - struct http_decoder_header *tmp_header = &table->headers[table->header_iter]; - if (tmp_header != NULL) - { - if (http_decoder_string_state(&tmp_header->key) == STRING_STATE_COMMIT && - http_decoder_string_state(&tmp_header->val) == STRING_STATE_COMMIT) - { - - http_decoder_string_get(&tmp_header->key, &hdr->name, &hdr->name_len); - http_decoder_string_get(&tmp_header->val, &hdr->value, &hdr->value_len); - table->header_iter++; - return 1; - } - } - - hdr->name = NULL; - hdr->name_len = 0; - hdr->value = NULL; - hdr->value_len = 0; - - return -1; -} - -int http_decoder_table_reset_header_iter(struct http_decoder_table *table) -{ - table->header_iter = 0; - return 0; -} - -int http_decoder_table_has_parsed_header(struct http_decoder_table *table) -{ - // if (NULL == table || (table->header_iter == table->header_index)) { - if (NULL == table || (table->commit_header_index == table->header_index)) - { - return 0; - } - - const struct http_decoder_header *tmp_header = &table->headers[table->header_iter]; - - if (http_decoder_string_state(&tmp_header->key) == STRING_STATE_COMMIT && http_decoder_string_state(&tmp_header->val) == STRING_STATE_COMMIT) - { - return 1; - } - - return 0; -} - -int http_decoder_table_header_complete(struct http_decoder_table *table) -{ - if (NULL == table) - { - return -1; - } - return table->header_complete; -} - -void http_decoder_table_set_header_complete(struct http_decoder_table *table) -{ - if (NULL == table) - { - return; - } - table->header_complete = 1; -} - -void http_decoder_table_reset_header_complete(struct http_decoder_table *table) -{ - if (NULL == table) - { - return; - } - table->header_complete = 0; -} - -void http_decoder_table_update_commit_index(struct http_decoder_table *table) -{ - table->commit_header_index = table->header_index; -} - -int http_decoder_table_get_total_parsed_header(struct http_decoder_table *table) -{ - return table->header_index; -}
\ No newline at end of file diff --git a/decoders/http/http_decoder_table.h b/decoders/http/http_decoder_table.h deleted file mode 100644 index 9a8d948..0000000 --- a/decoders/http/http_decoder_table.h +++ /dev/null @@ -1,79 +0,0 @@ -#pragma once -#include <stddef.h> -#include "stellar/http.h" -#include "http_decoder_private.h" -#include "http_decoder_string.h" - -enum http_item -{ - HTTP_ITEM_URI = 0x01, - HTTP_ITEM_STATUS = 0x02, - HTTP_ITEM_METHOD = 0x03, - HTTP_ITEM_VERSION = 0x04, - HTTP_ITEM_HDRKEY = 0x05, - HTTP_ITEM_HDRVAL = 0x06, - HTTP_ITEM_BODY = 0x07, -}; - -struct http_decoder_table; -struct http_decoder_table *http_decoder_table_new(nmx_pool_t *mempool); - -void http_decoder_table_free(struct http_decoder_table *table); - -enum string_state -http_decoder_table_state(struct http_decoder_table *table, enum http_item type); - -void http_decoder_table_refer(struct http_decoder_table *table, enum http_item type, - const char *at, size_t len); - -void http_decoder_table_cache(struct http_decoder_table *table, enum http_item type); - -void http_decoder_table_commit(struct http_decoder_table *table, enum http_item type); - -void http_decoder_table_reset(struct http_decoder_table *table, enum http_item type); - -void http_decoder_table_reinit(struct http_decoder_table *table); - -void http_decoder_table_dump(struct http_decoder_table *table); - -int http_decoder_table_get_uri(const struct http_decoder_table *table, char **out, size_t *out_len); - -int http_decoder_table_get_method(const struct http_decoder_table *table, char **out, size_t *out_len); - -int http_decoder_table_get_status(const struct http_decoder_table *table, char **out, size_t *out_len); - -int http_decoder_table_get_version(const struct http_decoder_table *table, char **out, size_t *out_len); - -int http_decoder_table_get_body(const struct http_decoder_table *table, char **out, size_t *out_len); - -int http_decoder_table_get_header(const struct http_decoder_table *table, - const char *name, size_t name_len, - struct http_header_field *hdr_res); - -int http_decoder_table_iter_header(struct http_decoder_table *table, - struct http_header_field *hdr); -int http_decoder_table_reset_header_iter(struct http_decoder_table *table); -/** - * @brief Is there a parsed header - * - * @retval yes(1) no(0) - */ -int http_decoder_table_has_parsed_header(struct http_decoder_table *table); - -/** - * @brief If headers have been parsed completely - * - * @retval yes(1) no(0) - */ -int http_decoder_table_header_complete(struct http_decoder_table *table); - -/** - * @brief set flag for headers parsed completely - */ -void http_decoder_table_set_header_complete(struct http_decoder_table *table); - -void http_decoder_table_reset_header_complete(struct http_decoder_table *table); - -void http_decoder_table_update_commit_index(struct http_decoder_table *table); - -int http_decoder_table_get_total_parsed_header(struct http_decoder_table *table); diff --git a/decoders/http/http_decoder_tunnel.c b/decoders/http/http_decoder_tunnel.c deleted file mode 100644 index a6abda8..0000000 --- a/decoders/http/http_decoder_tunnel.c +++ /dev/null @@ -1,116 +0,0 @@ -#include <assert.h> -#include <stdio.h> -#include <string.h> -#include <strings.h> -#include <unistd.h> -#include "http_decoder_private.h" -#include "llhttp.h" - -struct http_tunnel_message -{ - enum http_tunnel_message_type type; - hstring tunnel_payload; -}; - -int httpd_tunnel_identify(struct http_decoder_env *httpd_env, int curdir, struct http_decoder_half_data *hfdata) -{ - if (0 == httpd_env->hd_cfg.proxy_enable) - { - return 0; - } - - if (FLOW_TYPE_C2S == curdir) - { - struct http_request_line reqline = {}; - http_decoder_half_data_get_request_line(hfdata, &reqline); - if (0 == http_strncasecmp_safe("CONNECT", (char *)reqline.method, - 7, reqline.method_len)) - { - return 1; - } - } - else - { - struct http_response_line resline = {}; - http_decoder_half_data_get_response_line(hfdata, &resline); - if (resline.status_code == HTTP_STATUS_OK && 0 == http_strncasecmp_safe("Connection established", (char *)resline.status, - strlen("Connection established"), resline.status_len)) - { - return 1; - } - } - - return 0; -} - -int httpd_is_tunnel_session(const struct http_decoder_env *httpd_env, const struct http_decoder_exdata *ex_data) -{ - if (0 == httpd_env->hd_cfg.proxy_enable) - { - return 0; - } - return (ex_data && ex_data->tunnel_state != HTTP_TUN_NON); -} - -int httpd_in_tunnel_transmitting(const struct http_decoder_env *httpd_env, struct http_decoder_exdata *ex_data) -{ - if (0 == httpd_env->hd_cfg.proxy_enable) - { - return 0; - } - return (ex_data && ex_data->tunnel_state >= HTTP_TUN_INNER_STARTING); -} - -enum http_tunnel_message_type httpd_tunnel_state_to_msg(const struct http_decoder_exdata *ex_data) -{ - if (ex_data->tunnel_state == HTTP_TUN_INNER_STARTING) - { - return HTTP_TUNNEL_OPENING; - } - if (ex_data->tunnel_state == HTTP_TUN_INNER_TRANS) - { - return HTTP_TUNNEL_ACTIVE; - } - return HTTP_TUNNEL_MSG_MAX; -} - -void httpd_tunnel_state_update(struct http_decoder_exdata *ex_data) -{ - if (ex_data->tunnel_state == HTTP_TUN_INNER_STARTING) - { - ex_data->tunnel_state = HTTP_TUN_INNER_TRANS; - } -} - -void http_decoder_push_tunnel_data(struct session *sess, const struct http_decoder_exdata *exdata, enum http_tunnel_message_type type, const char *payload, uint16_t payload_len) -{ - struct http_tunnel_message *tmsg = (struct http_tunnel_message *)CALLOC(struct http_tunnel_message, 1); - tmsg->type = type; - tmsg->tunnel_payload.iov_base = (char *)payload; - tmsg->tunnel_payload.iov_len = payload_len; - session_mq_publish_message(sess, exdata->pub_topic_id, tmsg); -} - -#ifdef __cplusplus -extern "C" -{ -#endif - void http_tunnel_message_get_payload(const struct http_tunnel_message *tmsg, - hstring *tunnel_payload) - { - if (unlikely(NULL == tmsg || tunnel_payload == NULL)) - { - return; - } - tunnel_payload->iov_base = tmsg->tunnel_payload.iov_base; - tunnel_payload->iov_len = tmsg->tunnel_payload.iov_len; - } - - enum http_tunnel_message_type http_tunnel_message_type_get(const struct http_tunnel_message *tmsg) - { - return tmsg->type; - } - -#ifdef __cplusplus -} -#endif diff --git a/decoders/http/http_decoder_tunnel.h b/decoders/http/http_decoder_tunnel.h deleted file mode 100644 index 52882e5..0000000 --- a/decoders/http/http_decoder_tunnel.h +++ /dev/null @@ -1,36 +0,0 @@ -#pragma once -#include "http_decoder_private.h" -#include "http_decoder_half.h" - -enum http_tunnel_state -{ - HTTP_TUN_NON = 0, // init, or not tunnel session - HTTP_TUN_C2S_HDR_START, // CONNECT ... - HTTP_TUN_C2S_END, // CONNECT request end - HTTP_TUN_S2C_START, // HTTP 200 connet established - HTTP_TUN_INNER_STARTING, // http inner tunnel protocol starting - HTTP_TUN_INNER_TRANS, // http inner tunnel protocol transmitting -}; - -/************************************************************ - * HTTP TUNNEL WITH CONNECT METHOD. - *************************************************************/ -struct http_tunnel_message; -#define HTTP_DECODER_TUNNEL_TOPIC "HTTP_DECODER_TUNNEL_MESSAGE" - -enum http_tunnel_message_type -{ - HTTP_TUNNEL_OPENING, - HTTP_TUNNEL_ACTIVE, - HTTP_TUNNEL_CLOSING, - HTTP_TUNNEL_MSG_MAX -}; -enum http_tunnel_message_type http_tunnel_message_type_get(const struct http_tunnel_message *tmsg); -void http_tunnel_message_get_payload(const struct http_tunnel_message *tmsg, struct iovec *tunnel_payload); - -int httpd_tunnel_identify(struct http_decoder_env *httpd_env, int curdir, struct http_decoder_half_data *hfdata); -int httpd_is_tunnel_session(const struct http_decoder_env *httpd_env, const struct http_decoder_exdata *ex_data); -int httpd_in_tunnel_transmitting(const struct http_decoder_env *httpd_env, struct http_decoder_exdata *ex_data); -void httpd_tunnel_state_update(struct http_decoder_exdata *ex_data); -void http_decoder_push_tunnel_data(struct session *sess, const struct http_decoder_exdata *exdata, enum http_tunnel_message_type type, const char *payload, uint16_t payload_len); -enum http_tunnel_message_type httpd_tunnel_state_to_msg(const struct http_decoder_exdata *ex_data);
\ No newline at end of file diff --git a/decoders/http/http_decoder_utils.c b/decoders/http/http_decoder_utils.c index 08c66b0..6c20ce7 100644 --- a/decoders/http/http_decoder_utils.c +++ b/decoders/http/http_decoder_utils.c @@ -1,311 +1,367 @@ #include <string.h> #include <assert.h> +#include <arpa/inet.h> #include "stellar/http.h" -#include "http_decoder_private.h" +#include "stellar/utils.h" +#include "http_decoder_utils.h" +#include "http_decoder.h" +#include "llhttp.h" -char *http_safe_dup(const char *str, size_t len) +#ifdef __cplusplus +extern "C" { - if (str == NULL || len == 0) - { - return NULL; - } - char *dup = CALLOC(char, len + 1); - memcpy(dup, str, len); - return dup; -} +#endif -int http_strncasecmp_safe(const char *fix_s1, const char *dyn_s2, size_t fix_n1, size_t dyn_n2) -{ - if (fix_s1 == NULL || dyn_s2 == NULL) + char *http_safe_dup(const char *str, size_t len) { - return -1; + if (str == NULL || len == 0) + { + return NULL; + } + char *dup = CALLOC(char, len + 1); + memcpy(dup, str, len); + return dup; } - if (fix_n1 != dyn_n2) + + int http_strncasecmp_safe(const char *fix_s1, const char *dyn_s2, size_t fix_n1, size_t dyn_n2) { - return -1; + if (fix_s1 == NULL || dyn_s2 == NULL) + { + return -1; + } + if (fix_n1 != dyn_n2) + { + return -1; + } + return strncasecmp(fix_s1, dyn_s2, fix_n1); } - return strncasecmp(fix_s1, dyn_s2, fix_n1); -} -const char *http_message_type_to_string(enum http_message_type type) -{ - const char *sname = "unknown_msg_type"; - - switch (type) + const char *http_topic_type_to_string(enum http_topic_type type) { - case HTTP_MESSAGE_REQ_LINE: - sname = "HTTP_MESSAGE_REQ_LINE"; - break; - case HTTP_MESSAGE_REQ_HEADER: - sname = "HTTP_MESSAGE_REQ_HEADER"; - break; - case HTTP_MESSAGE_REQ_HEADER_END: - sname = "HTTP_MESSAGE_REQ_HEADER_END"; - break; - case HTTP_MESSAGE_REQ_BODY_START: - sname = "HTTP_MESSAGE_REQ_BODY_START"; - break; - case HTTP_MESSAGE_REQ_BODY: - sname = "HTTP_MESSAGE_REQ_BODY"; - break; - case HTTP_MESSAGE_REQ_BODY_END: - sname = "HTTP_MESSAGE_REQ_BODY_END"; - break; - case HTTP_MESSAGE_RES_LINE: - sname = "HTTP_MESSAGE_RES_LINE"; - break; - case HTTP_MESSAGE_RES_HEADER: - sname = "HTTP_MESSAGE_RES_HEADER"; - break; - case HTTP_MESSAGE_RES_HEADER_END: - sname = "HTTP_MESSAGE_RES_HEADER_END"; - break; - case HTTP_MESSAGE_RES_BODY_START: - sname = "HTTP_MESSAGE_RES_BODY_START"; - break; - case HTTP_MESSAGE_RES_BODY: - sname = "HTTP_MESSAGE_RES_BODY"; - break; - case HTTP_MESSAGE_RES_BODY_END: - sname = "HTTP_MESSAGE_RES_BODY_END"; - break; - case HTTP_TRANSACTION_START: - sname = "HTTP_TRANSACTION_START"; - break; - case HTTP_TRANSACTION_END: - sname = "HTTP_TRANSACTION_END"; - break; + const char *sname = "unknown_topic"; - default: - break; + switch (type) + { + case HTTP_TOPIC_REQ_HEADER: + sname = "HTTP_TOPIC_REQ_HEADER"; + break; + case HTTP_TOPIC_REQ_BODY: + sname = "HTTP_TOPIC_REQ_BODY"; + break; + case HTTP_TOPIC_RES_HEADER: + sname = "HTTP_TOPIC_RES_HEADER"; + break; + case HTTP_TOPIC_RES_BODY: + sname = "HTTP_TOPIC_RES_BODY"; + break; + default: + break; + } + return sname; } - return sname; -} - -int http_message_type_is_req(struct session *sess, enum http_message_type msg_type) -{ - int is_req_msg = 0; - - switch (msg_type) - { - case HTTP_MESSAGE_REQ_LINE: - case HTTP_MESSAGE_REQ_HEADER: - case HTTP_MESSAGE_REQ_HEADER_END: - case HTTP_MESSAGE_REQ_BODY_START: - case HTTP_MESSAGE_REQ_BODY: - case HTTP_MESSAGE_REQ_BODY_END: - is_req_msg = 1; - break; + static const unsigned char __g_httpd_hextable[] = { + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 0, 0, 0, 0, 0, /* 0x30 - 0x3f */ + 0, 10, 11, 12, 13, 14, 15, 0, 0, 0, 0, 0, 0, 0, 0, 0, /* 0x40 - 0x4f */ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, /* 0x50 - 0x5f */ + 0, 10, 11, 12, 13, 14, 15 /* 0x60 - 0x66 */ + }; - case HTTP_MESSAGE_RES_LINE: - case HTTP_MESSAGE_RES_HEADER: - case HTTP_MESSAGE_RES_HEADER_END: - case HTTP_MESSAGE_RES_BODY_START: - case HTTP_MESSAGE_RES_BODY: - case HTTP_MESSAGE_RES_BODY_END: - is_req_msg = 0; - break; +/* the input is a single hex digit */ +#define onehex2dec(x) __g_httpd_hextable[x - '0'] - case HTTP_TRANSACTION_START: - case HTTP_TRANSACTION_END: +#include <ctype.h> + // https://github.com/curl/curl/blob/2e930c333658725657b94a923d175c6622e0f41d/lib/urlapi.c + // void httpd_url_decode(const char *string, size_t length, char *ostring, size_t *olen) + size_t http_url_decode(const char *string, size_t length, char *ostring, size_t olen) { - enum flow_type cur_dir = session_get_flow_type(sess); - if (FLOW_TYPE_C2S == cur_dir) + char *ns = ostring; + if (NULL == string || NULL == ostring || 0 == olen) { - is_req_msg = 1; + return 0; } - else + size_t alloc = length; + while (alloc) { - is_req_msg = 0; + unsigned char in = (unsigned char)*string; + if (('%' == in) && (alloc > 2) && + isxdigit(string[1]) && isxdigit(string[2])) + { + /* this is two hexadecimal digits following a '%' */ + in = (unsigned char)(onehex2dec(string[1]) << 4) | onehex2dec(string[2]); + string += 3; + alloc -= 3; + } + else + { + string++; + alloc--; + } + *ns++ = (char)in; + // if ((size_t)(ns - ostring) >= olen - 1) + // { + // return 1; + // } } + return ns - ostring; } - break; - - default: - assert(0); - fprintf(stderr, "unknow message type:%d\n", (int)msg_type); - break; - } - return is_req_msg; -} -int http_event_is_req(enum http_event event) -{ - switch (event) + int httpd_url_is_encoded(const char *url, size_t len) { - case HTTP_EVENT_REQ_INIT: - case HTTP_EVENT_REQ_LINE: - case HTTP_EVENT_REQ_HDR: - case HTTP_EVENT_REQ_HDR_END: - case HTTP_EVENT_REQ_BODY_BEGIN: - case HTTP_EVENT_REQ_BODY_DATA: - case HTTP_EVENT_REQ_BODY_END: - case HTTP_EVENT_REQ_END: - return 1; - break; - - case HTTP_EVENT_RES_INIT: - case HTTP_EVENT_RES_LINE: - case HTTP_EVENT_RES_HDR: - case HTTP_EVENT_RES_HDR_END: - case HTTP_EVENT_RES_BODY_BEGIN: - case HTTP_EVENT_RES_BODY_DATA: - case HTTP_EVENT_RES_BODY_END: - case HTTP_EVENT_RES_END: + for (size_t i = 0; i < len; i++) + { + if (url[i] == '%') + { + return 1; + } + } return 0; - break; - - default: - assert(0); - fprintf(stderr, "unknow event type:%d\n", (int)event); - break; } - return -1; -} -int stellar_session_mq_get_topic_id_reliable(struct stellar *st, const char *topic_name, - stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg) -{ - int topic_id = stellar_mq_get_topic_id(st, topic_name); - if (topic_id < 0) + static void httpd_set_tcp_addr(const struct tcphdr *tcph, struct http_session_addr *addr, enum flow_type fdir) { - topic_id = stellar_mq_create_topic(st, topic_name, msg_free_cb, msg_free_arg); + if (FLOW_TYPE_C2S == fdir) + { + addr->sport = tcph->th_sport; + addr->dport = tcph->th_dport; + } + else + { + addr->sport = tcph->th_dport; + addr->dport = tcph->th_sport; + } } - return topic_id; -} - -static const unsigned char __g_httpd_hextable[] = { - 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 0, 0, 0, 0, 0, /* 0x30 - 0x3f */ - 0, 10, 11, 12, 13, 14, 15, 0, 0, 0, 0, 0, 0, 0, 0, 0, /* 0x40 - 0x4f */ - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, /* 0x50 - 0x5f */ - 0, 10, 11, 12, 13, 14, 15 /* 0x60 - 0x66 */ -}; - -/* the input is a single hex digit */ -#define onehex2dec(x) __g_httpd_hextable[x - '0'] - -#include <ctype.h> -// https://github.com/curl/curl/blob/2e930c333658725657b94a923d175c6622e0f41d/lib/urlapi.c -// void httpd_url_decode(const char *string, size_t length, char *ostring, size_t *olen) -size_t http_url_decode(const char *string, size_t length, char *ostring, size_t olen) -{ - char *ns = ostring; - if (NULL == string || NULL == ostring || 0 == olen) + static void httpd_set_ipv4_addr(const struct ip *ip4h, struct http_session_addr *addr, enum flow_type fdir) { - return 0; + addr->ipver = 4; + if (FLOW_TYPE_C2S == fdir) + { + addr->saddr4 = ip4h->ip_src.s_addr; + addr->daddr4 = ip4h->ip_dst.s_addr; + } + else + { + addr->saddr4 = ip4h->ip_dst.s_addr; + addr->daddr4 = ip4h->ip_src.s_addr; + } } - size_t alloc = length; - while (alloc) + static void httpd_set_ipv6_addr(const struct ip6_hdr *ip6h, struct http_session_addr *addr, enum flow_type fdir) { - unsigned char in = (unsigned char)*string; - if (('%' == in) && (alloc > 2) && - isxdigit(string[1]) && isxdigit(string[2])) + addr->ipver = 6; + if (FLOW_TYPE_C2S == fdir) { - /* this is two hexadecimal digits following a '%' */ - in = (unsigned char)(onehex2dec(string[1]) << 4) | onehex2dec(string[2]); - string += 3; - alloc -= 3; + memcpy(&addr->saddr6, &ip6h->ip6_src, sizeof(struct in6_addr)); + memcpy(&addr->daddr6, &ip6h->ip6_dst, sizeof(struct in6_addr)); } else { - string++; - alloc--; + memcpy(&addr->saddr6, &ip6h->ip6_dst, sizeof(struct in6_addr)); + memcpy(&addr->daddr6, &ip6h->ip6_src, sizeof(struct in6_addr)); } - *ns++ = (char)in; - // if ((size_t)(ns - ostring) >= olen - 1) - // { - // return 1; - // } } - return ns - ostring; -} -int httpd_url_is_encoded(const char *url, size_t len) -{ - for (size_t i = 0; i < len; i++) + void httpd_session_get_addr(const struct session *sess, struct http_session_addr *addr) { - if (url[i] == '%') + if (sess == NULL || addr == NULL) + { + return; + } + enum flow_type fdir = session_get_flow_type(sess); + const struct packet *raw_pkt = session_get_first_packet(sess, fdir); + if (NULL == raw_pkt) { - return 1; + addr->ipver = 0; + return; + } + + int count = packet_get_layer_count(raw_pkt); + for (int i = count - 1; i >= 0; i--) + { + const struct layer *layer = packet_get_layer_by_idx(raw_pkt, i); + if (layer->proto == LAYER_PROTO_TCP) + { + httpd_set_tcp_addr(layer->hdr.tcp, addr, fdir); + } + else if (layer->proto == LAYER_PROTO_IPV4) + { + httpd_set_ipv4_addr(layer->hdr.ip4, addr, fdir); + break; + } + else if (layer->proto == LAYER_PROTO_IPV6) + { + httpd_set_ipv6_addr(layer->hdr.ip6, addr, fdir); + break; + } } } - return 0; -} -static void httpd_set_tcp_addr(const struct tcphdr *tcph, struct httpd_session_addr *addr, enum flow_type fdir) -{ - if (FLOW_TYPE_C2S == fdir) + void http_session_addr_ntop(const struct http_session_addr *sesaddr, char *buf, size_t buflen) { - addr->sport = tcph->th_sport; - addr->dport = tcph->th_dport; + char sip_str[INET6_ADDRSTRLEN] = {0}; + char dip_str[INET6_ADDRSTRLEN] = {0}; + uint16_t sport_host, dport_host; + if (sesaddr->ipver == 4) + { + inet_ntop(AF_INET, &sesaddr->saddr4, sip_str, sizeof(sip_str)); + inet_ntop(AF_INET, &sesaddr->daddr4, dip_str, sizeof(dip_str)); + } + else if (sesaddr->ipver == 6) + { + inet_ntop(AF_INET6, &sesaddr->saddr6, sip_str, sizeof(sip_str)); + inet_ntop(AF_INET6, &sesaddr->daddr6, dip_str, sizeof(dip_str)); + } + sport_host = ntohs(sesaddr->sport); + dport_host = ntohs(sesaddr->dport); + snprintf(buf, buflen, "%s:%u-%s:%u", sip_str, sport_host, dip_str, dport_host); } - else + + struct http_buffer *http_buffer_new(void) { - addr->sport = tcph->th_dport; - addr->dport = tcph->th_sport; + struct http_buffer *buffer = CALLOC(struct http_buffer, 1); + buffer->buffer = NULL; + buffer->buffer_size = 0; + buffer->reference = 1; + return buffer; } -} -static void httpd_set_ipv4_addr(const struct ip *ip4h, struct httpd_session_addr *addr, enum flow_type fdir) -{ - addr->ipver = 4; - if (FLOW_TYPE_C2S == fdir) + + void http_buffer_free(struct http_buffer *buffer) { - addr->saddr4 = ip4h->ip_src.s_addr; - addr->daddr4 = ip4h->ip_dst.s_addr; + if (NULL == buffer) + { + return; + } + if (buffer->reference > 1) + { + buffer->reference--; + return; + } + if (buffer->buffer) + { + FREE(buffer->buffer); + } + FREE(buffer); } - else + + int http_buffer_add(struct http_buffer *buffer, const char *data, size_t data_len) { - addr->saddr4 = ip4h->ip_dst.s_addr; - addr->daddr4 = ip4h->ip_src.s_addr; + if (NULL == buffer || NULL == data || 0 == data_len) + { + return -1; + } + buffer->buffer = REALLOC(char, buffer->buffer, buffer->buffer_size + data_len); + memcpy(buffer->buffer + buffer->buffer_size, data, data_len); + buffer->buffer_size += data_len; + return 0; } -} -static void httpd_set_ipv6_addr(const struct ip6_hdr *ip6h, struct httpd_session_addr *addr, enum flow_type fdir) -{ - addr->ipver = 6; - if (FLOW_TYPE_C2S == fdir) + + int http_buffer_read(struct http_buffer *buffer, char **data, size_t *data_len) { - memcpy(&addr->saddr6, &ip6h->ip6_src, sizeof(struct in6_addr)); - memcpy(&addr->daddr6, &ip6h->ip6_dst, sizeof(struct in6_addr)); + if (NULL == buffer) + { + if (data) + { + *data = NULL; + } + if (data_len) + { + *data_len = 0; + } + return -1; + } + *data = buffer->buffer; + *data_len = buffer->buffer_size; + return 0; } - else + + char *http_string_dup(const char *str, size_t len) { - memcpy(&addr->saddr6, &ip6h->ip6_dst, sizeof(struct in6_addr)); - memcpy(&addr->daddr6, &ip6h->ip6_src, sizeof(struct in6_addr)); + if (NULL == str || 0 == len) + { + return NULL; + } + char *new_str = ALLOC(char, len); + memcpy(new_str, str, len); + return new_str; } -} -void httpd_session_get_addr(const struct session *sess, struct httpd_session_addr *addr) -{ - if (sess == NULL || addr == NULL) + long long http_strtoll(const char *str, size_t strlen) { - return; + if (NULL == str || 0 == strlen || strlen >= 19 /* INT64_MAX */) + { + return 0; + } + char tmp_str[strlen + 1]; + memcpy(tmp_str, str, strlen); + tmp_str[strlen] = '\0'; + return strtoll(tmp_str, NULL, 10); } - enum flow_type fdir = session_get_flow_type(sess); - const struct packet *raw_pkt = session_get_first_packet(sess, fdir); - if (NULL == raw_pkt) + + /* + * return value: + * EOF offset of beggining. + */ + size_t http_line_header_completed(const char *data, size_t data_len) { - addr->ipver = 0; - return; + if (data_len < 4) //"\r\n\r\n" + { + return 0; + } + void *ptr = memmem(data, data_len, "\r\n\r\n", 4); + if (ptr != NULL) + { + return (char *)ptr - data + 4; + } + return 0; } - int count = packet_get_layer_count(raw_pkt); - for (int i = count - 1; i >= 0; i--) + int http_protocol_identify(const char *data, size_t data_len) { - const struct layer *layer = packet_get_layer_by_idx(raw_pkt, i); - if (layer->proto == LAYER_PROTO_TCP) + llhttp_t parser; + llhttp_settings_t settings; + enum llhttp_errno error; + + if (NULL == data || 0 == data_len) { - httpd_set_tcp_addr(layer->hdr.tcp, addr, fdir); + return -1; } - else if (layer->proto == LAYER_PROTO_IPV4) + llhttp_settings_init(&settings); + llhttp_init(&parser, HTTP_BOTH, &settings); + + data_len = MIN(HTTP_IDENTIFY_LEN, data_len); + error = llhttp_execute(&parser, data, data_len); + if (error != HPE_OK) { - httpd_set_ipv4_addr(layer->hdr.ip4, addr, fdir); - break; + return -1; } - else if (layer->proto == LAYER_PROTO_IPV6) + return 1; + } + + void http_truncate_extract_headers(const char *raw_data, size_t raw_data_len, const char **headers_start, const char **headers_end) + { + const char *start = memmem(raw_data, raw_data_len, "\r\n", 2); + if (start != NULL) { - httpd_set_ipv6_addr(layer->hdr.ip6, addr, fdir); - break; + start += 2; + *headers_start = start; } + else + { + *headers_start = NULL; + } + const char *end = memmem(raw_data, raw_data_len, "\r\n\r\n", 4); + if (end != NULL) + { + end += 4; + *headers_end = end; + } + else + { + *headers_start = NULL; + *headers_end = NULL; + } + return; } -}
\ No newline at end of file + +#ifdef __cplusplus +} +#endif
\ No newline at end of file diff --git a/decoders/http/http_decoder_utils.h b/decoders/http/http_decoder_utils.h index 01a32b2..c370228 100644 --- a/decoders/http/http_decoder_utils.h +++ b/decoders/http/http_decoder_utils.h @@ -10,31 +10,27 @@ extern "C" #include "stellar/stellar.h" #include "stellar/utils.h" #include "stellar/session.h" -#include "stellar/stellar_mq.h" -#include "stellar/stellar_exdata.h" -#ifdef __cplusplus -} +#include "llhttp.h" + +#ifndef UNUSED +#define UNUSED __attribute__((unused)) #endif -char *http_safe_dup(const char *str, size_t len); -int http_strncasecmp_safe(const char *fix_s1, const char *dyn_s2, size_t fix_n1, size_t dyn_n2); -const char *http_message_type_to_string(enum http_message_type type); -int http_message_type_is_req(struct session *sess, enum http_message_type msg_type); -int http_event_is_req(enum http_event event); -int stellar_session_mq_get_topic_id_reliable(struct stellar *st, const char *topic_name, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg); -void httpd_url_decode(const char *string, size_t length, char *ostring, size_t *olen); -int httpd_url_is_encoded(const char *url, size_t len); -/****************************************************************************** - * Logger - ******************************************************************************/ + char *http_safe_dup(const char *str, size_t len); + int http_strncasecmp_safe(const char *fix_s1, const char *dyn_s2, size_t fix_n1, size_t dyn_n2); + void httpd_url_decode(const char *string, size_t length, char *ostring, size_t *olen); + int httpd_url_is_encoded(const char *url, size_t len); + /****************************************************************************** + * Logger + ******************************************************************************/ -enum http_decoder_log_level -{ - DEBUG = 0x11, - WARN = 0x12, - INFO = 0x13, - ERROR = 0x14, -}; + enum http_decoder_log_level + { + DEBUG = 0x11, + WARN = 0x12, + INFO = 0x13, + ERROR = 0x14, + }; #ifndef http_decoder_log #define http_decoder_log(level, format, ...) \ @@ -63,17 +59,45 @@ enum http_decoder_log_level #include <netinet/in.h> -struct httpd_session_addr -{ - uint8_t ipver; /* 4 or 6 */ - uint16_t sport; /* network order */ - uint16_t dport; /* network order */ - union + struct http_session_addr + { + uint8_t ipver; /* 4 or 6 */ + uint16_t sport; /* network order */ + uint16_t dport; /* network order */ + union + { + uint32_t saddr4; + struct in6_addr saddr6; + }; + union + { + uint32_t daddr4; + struct in6_addr daddr6; + }; + }; + /* + why not use libevent evbuffer? + 1. evbuffer is a buffer chain, it is not suitable for http half flow cache; + 2. http_half_flow_buffer is a whole continuous buffer; + */ + struct http_buffer { - uint32_t saddr4; - uint32_t daddr4; - struct in6_addr saddr6; - struct in6_addr daddr6; + int reference; // used by other + char *buffer; + size_t buffer_size; }; -}; -void httpd_session_get_addr(const struct session *sess, struct httpd_session_addr *addr); + int http_protocol_identify(const char *data, size_t data_len); + void httpd_session_get_addr(const struct session *sess, struct http_session_addr *addr); + void http_session_addr_ntop(const struct http_session_addr *sesaddr, char *buf, size_t buflen); + struct http_buffer *http_buffer_new(void); + void http_buffer_free(struct http_buffer *buffer); + int http_buffer_add(struct http_buffer *buffer, const char *data, size_t data_len); + int http_buffer_read(struct http_buffer *buffer, char **data, size_t *data_len); + char *http_string_dup(const char *str, size_t len); + long long http_strtoll(const char *str, size_t strlen); + size_t http_line_header_completed(const char *data, size_t data_len); + void http_truncate_extract_headers(const char *raw_data, size_t raw_data_len, const char **headers_start, const char **headers_end); + +#ifdef __cplusplus +} +#endif
\ No newline at end of file diff --git a/decoders/http/version.map b/decoders/http/version.map index a64b729..c20d021 100644 --- a/decoders/http/version.map +++ b/decoders/http/version.map @@ -1,11 +1,12 @@ VERS_3.0{ global: extern "C" { - http_message_*; - http_decoder_init; - http_decoder_exit; - http_decoder_tcp_stream_msg_cb; + http_init; + http_exit; + http_subscribe_*; + http_get0_*; http_url_decode; + http_module_to_http; }; local: *; };
\ No newline at end of file |
