diff options
| author | yangwei <[email protected]> | 2024-08-15 18:54:01 +0800 |
|---|---|---|
| committer | yangwei <[email protected]> | 2024-08-16 10:48:40 +0800 |
| commit | b6969710d69cce3ca9aeb710b49f66192d0286ca (patch) | |
| tree | b9e3c1bc98f227933d5ae98516f9e322055c4d72 | |
| parent | 08208cf0a5e90caf1dc649ef42bd4b259931f574 (diff) | |
✨ feat(decoder http): integrate http decoder codes
| -rw-r--r-- | decoders/http/CMakeLists.txt | 14 | ||||
| -rw-r--r-- | decoders/http/http_content_decompress.cpp | 254 | ||||
| -rw-r--r-- | decoders/http/http_content_decompress.h | 52 | ||||
| -rw-r--r-- | decoders/http/http_decoder.cpp | 1209 | ||||
| -rw-r--r-- | decoders/http/http_decoder_half.cpp | 1216 | ||||
| -rw-r--r-- | decoders/http/http_decoder_half.h | 105 | ||||
| -rw-r--r-- | decoders/http/http_decoder_inc.h | 162 | ||||
| -rw-r--r-- | decoders/http/http_decoder_result_queue.cpp | 148 | ||||
| -rw-r--r-- | decoders/http/http_decoder_result_queue.h | 61 | ||||
| -rw-r--r-- | decoders/http/http_decoder_stat.cpp | 121 | ||||
| -rw-r--r-- | decoders/http/http_decoder_stat.h | 57 | ||||
| -rw-r--r-- | decoders/http/http_decoder_string.cpp | 260 | ||||
| -rw-r--r-- | decoders/http/http_decoder_string.h | 74 | ||||
| -rw-r--r-- | decoders/http/http_decoder_table.cpp | 534 | ||||
| -rw-r--r-- | decoders/http/http_decoder_table.h | 80 | ||||
| -rw-r--r-- | decoders/http/http_decoder_tunnel.cpp | 107 | ||||
| -rw-r--r-- | decoders/http/http_decoder_tunnel.h | 35 | ||||
| -rw-r--r-- | decoders/http/http_decoder_utils.cpp | 309 | ||||
| -rw-r--r-- | decoders/http/http_decoder_utils.h | 80 | ||||
| -rw-r--r-- | decoders/http/version.map | 11 |
20 files changed, 4889 insertions, 0 deletions
diff --git a/decoders/http/CMakeLists.txt b/decoders/http/CMakeLists.txt new file mode 100644 index 0000000..7e49755 --- /dev/null +++ b/decoders/http/CMakeLists.txt @@ -0,0 +1,14 @@ +add_definitions(-fPIC) + +set(HTTP_SRC ${DEPS_SRC} http_decoder.cpp http_decoder_utils.cpp http_decoder_half.cpp + http_decoder_table.cpp http_decoder_string.cpp http_content_decompress.cpp + http_decoder_result_queue.cpp http_decoder_stat.cpp http_decoder_tunnel.cpp) + +add_library(http_decoder STATIC ${HTTP_SRC}) +set_target_properties(http_decoder PROPERTIES LINK_FLAGS "-Wl,--version-script=${PROJECT_SOURCE_DIR}/src/version.map") +target_include_directories(http_decoder PUBLIC ${CMAKE_SOURCE_DIR}/deps/) +target_link_libraries(http_decoder z llhttp-static fieldstat4) +target_link_libraries(http_decoder brotli-dec-static brotli-common-static ) +set_target_properties(http_decoder PROPERTIES PREFIX "") + +#install(TARGETS http_decoder LIBRARY DESTINATION ${CMAKE_INSTALL_PREFIX}/plugin/${lib_name} COMPONENT LIBRARIES)
\ No newline at end of file diff --git a/decoders/http/http_content_decompress.cpp b/decoders/http/http_content_decompress.cpp new file mode 100644 index 0000000..3b71a40 --- /dev/null +++ b/decoders/http/http_content_decompress.cpp @@ -0,0 +1,254 @@ +/* +********************************************************************************************** +* File: http_content_decompress.c +* Description: +* Authors: LuWenPeng <[email protected]> +* Date: 2022-10-31 +* Copyright: (c) Since 2022 Geedge Networks, Ltd. All rights reserved. +*********************************************************************************************** +*/ +#include <zlib.h> +#include <string.h> +#include <assert.h> +#include <brotli/decode.h> +#include "http_decoder_inc.h" + +#define HTTP_DECOMPRESS_BUFFER_SIZE (4096) + +struct http_content_decompress { + enum http_content_encoding encoding; + z_stream *z_stream_ptr; + BrotliDecoderState *br_state; + char *buffer; + size_t buffer_size; +}; + +enum http_content_encoding +http_content_encoding_str2int(const char *content_encoding) +{ + if (strcasestr(content_encoding, "gzip") != NULL) { + return HTTP_CONTENT_ENCODING_GZIP; + } + if (strcasestr(content_encoding, "deflate") != NULL) { + return HTTP_CONTENT_ENCODING_DEFLATE; + } + if (strcasestr(content_encoding, "br") != NULL) { + return HTTP_CONTENT_ENCODING_BR; + } + return HTTP_CONTENT_ENCODING_NONE; +} + +const char * +http_content_encoding_int2str(enum http_content_encoding content_encoding) +{ + if (content_encoding == HTTP_CONTENT_ENCODING_GZIP) { + return "gzip"; + } + if (content_encoding == HTTP_CONTENT_ENCODING_DEFLATE) { + return "deflate"; + } + if (content_encoding == HTTP_CONTENT_ENCODING_BR) { + return "br"; + } + return "unknown"; +} + +struct http_content_decompress * +http_content_decompress_create(enum http_content_encoding encoding) +{ + struct http_content_decompress *decompress = + CALLOC(struct http_content_decompress, 1); + assert(decompress); + + decompress->encoding = encoding; + decompress->z_stream_ptr = NULL; + decompress->br_state = NULL; + + if (encoding == HTTP_CONTENT_ENCODING_GZIP + || encoding == HTTP_CONTENT_ENCODING_DEFLATE) { + decompress->z_stream_ptr = CALLOC(z_stream, 1); + assert(decompress->z_stream_ptr); + + decompress->z_stream_ptr->zalloc = NULL; + decompress->z_stream_ptr->zfree = NULL; + decompress->z_stream_ptr->opaque = NULL; + decompress->z_stream_ptr->avail_in = 0; + decompress->z_stream_ptr->next_in = Z_NULL; + + if (encoding == HTTP_CONTENT_ENCODING_GZIP) { + if (inflateInit2(decompress->z_stream_ptr, MAX_WBITS + 16) + != Z_OK) { + goto error; + } + } + if (encoding == HTTP_CONTENT_ENCODING_DEFLATE) { + if (inflateInit2(decompress->z_stream_ptr, -MAX_WBITS) != Z_OK) { + goto error; + } + } + } + + if (encoding == HTTP_CONTENT_ENCODING_BR) { + decompress->br_state = BrotliDecoderCreateInstance(NULL, NULL, NULL); + if (decompress->br_state == NULL) { + goto error; + } + } + return decompress; + +error: + http_content_decompress_destroy(decompress); + return NULL; +} + +void http_content_decompress_destroy(struct http_content_decompress *decompress) +{ + if (NULL == decompress) { + return; + } + if (decompress->z_stream_ptr != NULL) { + inflateEnd(decompress->z_stream_ptr); + FREE(decompress->z_stream_ptr); + } + if (decompress->br_state) { + BrotliDecoderDestroyInstance(decompress->br_state); + decompress->br_state = NULL; + } + FREE(decompress->buffer); + FREE(decompress); +} + +static int +http_content_decompress_write_zlib(struct http_content_decompress *decompress, + const char *indata, size_t indata_len, + char **outdata, size_t *outdata_len) +{ + z_stream *z_stream_ptr = decompress->z_stream_ptr; + z_stream_ptr->avail_in = (unsigned int)indata_len; + z_stream_ptr->next_in = (unsigned char *)indata; + z_stream_ptr->avail_out = (unsigned int)decompress->buffer_size; + z_stream_ptr->next_out = (unsigned char *)decompress->buffer; + + *outdata = NULL; + *outdata_len = 0; + + do { + int ret = inflate(z_stream_ptr, Z_NO_FLUSH); + if (ret == Z_STREAM_ERROR || ret == Z_NEED_DICT + || ret == Z_DATA_ERROR || ret == Z_MEM_ERROR) { + (void)inflateEnd(z_stream_ptr); + return -1; + } + + size_t have = decompress->buffer_size - z_stream_ptr->avail_out; + if (have > 0) { + if (0 == z_stream_ptr->avail_out) { + fprintf(stderr, "realloc outbuffer,before: %zu bytes, after :%zu B\n", decompress->buffer_size , decompress->buffer_size + have); ; + decompress->buffer_size += have; + decompress->buffer = REALLOC(char, decompress->buffer, + decompress->buffer_size); + *outdata = decompress->buffer; + *outdata_len = *outdata_len + have; + // http_decoder_log(DEBUG, "%s realloc outbuffer %zu bytes", + // http_content_encoding_int2str(decompress->encoding), + // decompress->buffer_size); + z_stream_ptr->avail_out = have; + z_stream_ptr->next_out = (unsigned char *)decompress->buffer + + (decompress->buffer_size - have); + } else { + *outdata = decompress->buffer; + *outdata_len = have; + } + } + if(Z_STREAM_END == ret){ + break; + } + } while (z_stream_ptr->avail_in != 0); + decompress->buffer = NULL; + decompress->buffer_size = 0; + return 0; +} + +static int +http_content_decompress_write_br(struct http_content_decompress *decompress, + const char *indata, size_t indata_len, + char **outdata, size_t *outdata_len) +{ + size_t available_in = indata_len; + const unsigned char *next_in = (const unsigned char *)indata; + size_t available_out = decompress->buffer_size; + unsigned char *next_out = (unsigned char *)decompress->buffer; + + *outdata = NULL; + *outdata_len = 0; + + for (;;) { + int ret = BrotliDecoderDecompressStream(decompress->br_state, &available_in, + &next_in, &available_out, &next_out, 0); + size_t have = decompress->buffer_size - available_out; + if (have > 0) { + if (0 == available_out) { + decompress->buffer_size += have; + decompress->buffer = REALLOC(char, decompress->buffer, + decompress->buffer_size); + *outdata = decompress->buffer; + *outdata_len = *outdata_len + have; + available_out = have; + next_out = (unsigned char *)decompress->buffer + + (decompress->buffer_size - have); + } else { + *outdata = decompress->buffer; + *outdata_len = have; + } + } + + if (ret == BROTLI_DECODER_RESULT_SUCCESS || + ret == BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT) { + decompress->buffer =NULL; + decompress->buffer_size = 0; + return 0; + } + + if (ret == BROTLI_DECODER_RESULT_ERROR) { + //BrotliDecoderErrorCode errcode = + BrotliDecoderGetErrorCode(decompress->br_state); + *outdata = NULL; + *outdata_len = 0; + return -1; + } + + assert(ret == BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT); + } +} + +int http_content_decompress_write(struct http_content_decompress *decompress, + const char *indata, size_t indata_len, + char **outdata, size_t *outdata_len) +{ + assert(decompress); + assert(indata); + assert(indata_len > 0); + assert(outdata); + assert(outdata_len); + + *outdata = NULL; + *outdata_len = 0; + + if(NULL == decompress->buffer ){ + decompress->buffer = CALLOC(char, HTTP_DECOMPRESS_BUFFER_SIZE); + assert(decompress->buffer); + decompress->buffer_size = HTTP_DECOMPRESS_BUFFER_SIZE; + } + + if (decompress->encoding == HTTP_CONTENT_ENCODING_GZIP || + decompress->encoding == HTTP_CONTENT_ENCODING_DEFLATE) { + return http_content_decompress_write_zlib(decompress, indata, indata_len, + outdata, outdata_len); + } + if (decompress->encoding == HTTP_CONTENT_ENCODING_BR) { + return http_content_decompress_write_br(decompress, indata, indata_len, + outdata, outdata_len); + } + assert(0); + return -1; +}
\ No newline at end of file diff --git a/decoders/http/http_content_decompress.h b/decoders/http/http_content_decompress.h new file mode 100644 index 0000000..3c2ba48 --- /dev/null +++ b/decoders/http/http_content_decompress.h @@ -0,0 +1,52 @@ +/* +********************************************************************************************** +* File: http_content_decompress.h +* Description: +* Authors: LuWenPeng <[email protected]> +* Date: 2022-10-31 +* Copyright: (c) Since 2022 Geedge Networks, Ltd. All rights reserved. +*********************************************************************************************** +*/ + + +#ifndef _HTTP_CONTENT_DECOMPRESS_H_ +#define _HTTP_CONTENT_DECOMPRESS_H_ + +#ifdef __cplusplus +extern "C" +{ +#endif + +#include <stddef.h> + +enum http_content_encoding { + HTTP_CONTENT_ENCODING_NONE = 0, + HTTP_CONTENT_ENCODING_GZIP = 1 << 1, + HTTP_CONTENT_ENCODING_DEFLATE = 1 << 2, + HTTP_CONTENT_ENCODING_BR = 1 << 3, +}; + +struct http_content_decompress; + +enum http_content_encoding +http_content_encoding_str2int(const char *content_encoding); + +const char * +http_content_encoding_int2str(enum http_content_encoding content_encoding); + +struct http_content_decompress * +http_content_decompress_create(enum http_content_encoding encoding); + +void http_content_decompress_destroy(struct http_content_decompress *decompress); + +// return 0 : success +// return -1 : error +int http_content_decompress_write(struct http_content_decompress *decompress, + const char *indata, size_t indata_len, + char **outdata, size_t *outdata_len); + +#ifdef __cplusplus +} +#endif + +#endif
\ No newline at end of file diff --git a/decoders/http/http_decoder.cpp b/decoders/http/http_decoder.cpp new file mode 100644 index 0000000..218c508 --- /dev/null +++ b/decoders/http/http_decoder.cpp @@ -0,0 +1,1209 @@ +#include "stellar/http.h" +#include <assert.h> +#include <stdio.h> +#include <string.h> +#include <unistd.h> +#include "http_decoder_inc.h" + +#pragma GCC diagnostic ignored "-Wunused-parameter" + +struct http_message *http_message_new(enum http_message_type type, struct http_decoder_result_queue *queue, + int queue_index, uint8_t flow_type) +{ + struct http_message *msg = CALLOC(struct http_message, 1); + msg->type = type; + msg->ref_queue = queue; + msg->queue_index = queue_index; + msg->flow_type = flow_type; + return msg; +} + +struct http_message *http_body_message_new(enum http_message_type type, struct http_decoder_result_queue *queue, + int queue_index, uint8_t flow_type, hstring *raw_payload, hstring *decompress_payload) +{ + struct http_message *msg = CALLOC(struct http_message, 1); + msg->type = type; + msg->ref_queue = queue; + msg->queue_index = queue_index; + msg->flow_type = flow_type; + if (raw_payload) + { + msg->raw_payload.iov_base = raw_payload->iov_base; + msg->raw_payload.iov_len = raw_payload->iov_len; + } + if (decompress_payload) + { + msg->decompress_payload.iov_base = decompress_payload->iov_base; + msg->decompress_payload.iov_len = decompress_payload->iov_len; + } + return msg; +} + +static void http_message_decompress_buffer_free(struct http_message *msg) +{ + struct http_decoder_half_data *ref_data = NULL; + if (HTTP_MESSAGE_REQ_BODY_START == msg->type || HTTP_MESSAGE_REQ_BODY == msg->type || HTTP_MESSAGE_REQ_BODY_END == msg->type) + { + ref_data = msg->ref_queue->array[msg->queue_index].req_data; + } + else if (HTTP_MESSAGE_RES_BODY_START == msg->type || HTTP_MESSAGE_RES_BODY == msg->type || HTTP_MESSAGE_RES_BODY_END == msg->type) + { + ref_data = msg->ref_queue->array[msg->queue_index].res_data; + } + if (ref_data != NULL && msg->decompress_payload.iov_base != NULL) + { + http_half_decompress_buffer_free(ref_data, &msg->decompress_payload); + } +} + +static void http_message_free(void *http_msg, void *cb_arg) +{ + if (http_msg) + { + http_message_decompress_buffer_free((struct http_message *)http_msg); + FREE(http_msg); + } +} + +static void http_event_handler(enum http_event event, struct http_decoder_half_data **data, + struct http_event_context *ev_ctx, void *httpd_plugin_env) +{ + assert(ev_ctx); + assert(httpd_plugin_env); + struct http_decoder_env *httpd_env = (struct http_decoder_env *)httpd_plugin_env; + size_t queue_idx = 0; + nmx_pool_t *mempool = ev_ctx->ref_mempool; + struct http_decoder_result_queue *queue = ev_ctx->ref_queue; + struct http_message *msg = NULL; + struct http_decoder_half_data *half_data = NULL; + int ret = 0; + u_int8_t flow_flag = 0; + struct http_decoder_exdata *exdata = ev_ctx->ref_httpd_ctx; + + int thread_id = stellar_get_current_thread_index(); + + if (http_event_is_req(event)) + { + queue_idx = http_decoder_result_queue_req_index(queue); + half_data = http_decoder_result_queue_peek_req(queue); + } + else + { + queue_idx = http_decoder_result_queue_res_index(queue); + half_data = http_decoder_result_queue_peek_res(queue); + } + + switch (event) + { + case HTTP_EVENT_REQ_INIT: + half_data = http_decoder_result_queue_peek_req(queue); + if (half_data != NULL) + { + http_decoder_result_queue_inc_req_index(queue); + } + + half_data = http_decoder_result_queue_peek_req(queue); + if (half_data != NULL) + { + half_data = http_decoder_result_queue_pop_req(queue); + http_decoder_half_data_free(mempool, half_data); + half_data = NULL; + } + + half_data = http_decoder_half_data_new(mempool); + ret = http_decoder_result_queue_push_req(queue, half_data); + if (ret < 0) + { + fprintf(stderr, "http_decoder_result_queue_push req failed."); + http_decoder_half_data_free(mempool, half_data); + half_data = NULL; + } + *data = half_data; + queue_idx = http_decoder_result_queue_req_index(queue); // get the index after inc + /* llhttp always call on_message_begin() even if llhttp_execute() error!!! */ + msg = http_message_new(HTTP_TRANSACTION_START, queue, queue_idx, HTTP_REQUEST); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_NEW, 1); + break; + case HTTP_EVENT_REQ_LINE: + msg = http_message_new(HTTP_MESSAGE_REQ_LINE, queue, queue_idx, HTTP_REQUEST); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); + if (httpd_tunnel_identify(httpd_env, FLOW_DIRECTION_C2S, half_data)) + { + exdata->tunnel_state = HTTP_TUN_C2S_HDR_START; + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TUNNEL, 1); + } + if (httpd_is_tunnel_session(httpd_env, exdata)) + { + http_decoder_get_url(half_data, mempool); + } + break; + case HTTP_EVENT_REQ_HDR: + msg = http_message_new(HTTP_MESSAGE_REQ_HEADER, queue, queue_idx, HTTP_REQUEST); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); + break; + case HTTP_EVENT_REQ_HDR_END: + { + http_decoder_join_url_finally(ev_ctx, half_data, mempool); + /* maybe some parsed headers in buffer, but has not pushed to plugins yet */ + + if (http_decoder_half_data_has_parsed_header(half_data)) + { + msg = http_message_new(HTTP_MESSAGE_REQ_HEADER, queue, queue_idx, HTTP_REQUEST); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); + } + http_half_data_update_commit_index(half_data); + msg = http_message_new(HTTP_MESSAGE_REQ_HEADER_END, queue, queue_idx, HTTP_REQUEST); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); + + int tot_c2s_headers = http_half_data_get_total_parsed_header_count(half_data); + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_HEADERS_C2S, tot_c2s_headers); + + hstring tmp_url = {}; + http_half_data_get_url(half_data, &tmp_url); + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_URL_BYTES, tmp_url.iov_len); + } + break; + case HTTP_EVENT_REQ_BODY_BEGIN: + msg = http_message_new(HTTP_MESSAGE_REQ_BODY_START, queue, queue_idx, HTTP_REQUEST); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); + break; + case HTTP_EVENT_REQ_BODY_DATA: + { + hstring raw_body = {}; + hstring decompress_body = {}; + http_decoder_half_data_get_raw_body(half_data, &raw_body); + http_half_get_lastest_decompress_buffer(half_data, &decompress_body); + msg = http_body_message_new(HTTP_MESSAGE_REQ_BODY, queue, queue_idx, HTTP_REQUEST, &raw_body, &decompress_body); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); + if(decompress_body.iov_base != NULL){ + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_ZIP_BYTES, raw_body.iov_len); + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_UNZIP_BYTES, decompress_body.iov_len); + } + } + break; + case HTTP_EVENT_REQ_BODY_END: + msg = http_message_new(HTTP_MESSAGE_REQ_BODY_END, queue, queue_idx, HTTP_REQUEST); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); + break; + case HTTP_EVENT_REQ_END: + { + session_is_symmetric(ev_ctx->ref_session, &flow_flag); + + if (SESSION_SEEN_C2S_FLOW == flow_flag) + { + msg = http_message_new(HTTP_TRANSACTION_END, queue, queue_idx, HTTP_REQUEST); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_FREE, 1); + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_ASYMMETRY_TRANSACTION_C2S, 1); + } + if (httpd_is_tunnel_session(httpd_env, exdata)) + { + if (SESSION_SEEN_C2S_FLOW == flow_flag) + { + exdata->tunnel_state = HTTP_TUN_INNER_STARTING; + exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].sub_topic_id; + } + else + { + exdata->tunnel_state = HTTP_TUN_C2S_END; + } + } + http_half_update_state(half_data, event); + http_decoder_result_queue_inc_req_index(queue); + half_data = http_decoder_result_queue_pop_req(queue); + if (half_data != NULL) + { + http_decoder_half_data_free(mempool, half_data); + half_data = NULL; + } + } + break; + + case HTTP_EVENT_RES_INIT: + half_data = http_decoder_result_queue_peek_res(queue); + if (half_data != NULL) + { + http_decoder_result_queue_inc_res_index(queue); + } + + half_data = http_decoder_result_queue_peek_res(queue); + if (half_data != NULL) + { + half_data = http_decoder_result_queue_pop_res(queue); + http_decoder_half_data_free(mempool, half_data); + half_data = NULL; + } + + half_data = http_decoder_half_data_new(mempool); + ret = http_decoder_result_queue_push_res(queue, half_data); + if (ret < 0) + { + fprintf(stderr, "http_decoder_result_queue_push res failed."); + http_decoder_half_data_free(mempool, half_data); + half_data = NULL; + } + queue_idx = http_decoder_result_queue_res_index(queue); // get the index after inc + *data = half_data; + if (0 == session_is_symmetric(ev_ctx->ref_session, &flow_flag)) + { + if (SESSION_SEEN_S2C_FLOW == flow_flag) + { + msg = http_message_new(HTTP_TRANSACTION_START, queue, queue_idx, HTTP_RESPONSE); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); + } + } + break; + case HTTP_EVENT_RES_LINE: + msg = http_message_new(HTTP_MESSAGE_RES_LINE, queue, queue_idx, HTTP_RESPONSE); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); + if (httpd_tunnel_identify(httpd_env, FLOW_DIRECTION_S2C, half_data)) + { + exdata->tunnel_state = HTTP_TUN_S2C_START; + } + else + { + // connect response fail, reset tunnel_state + exdata->tunnel_state = HTTP_TUN_NON; + } + break; + case HTTP_EVENT_RES_HDR: + msg = http_message_new(HTTP_MESSAGE_RES_HEADER, queue, queue_idx, HTTP_RESPONSE); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); + break; + case HTTP_EVENT_RES_HDR_END: + { + /* maybe some header in table buffer but has not pushed to plugins */ + half_data = http_decoder_result_queue_peek_res(queue); + if (http_decoder_half_data_has_parsed_header(half_data)) + { + msg = http_message_new(HTTP_MESSAGE_RES_HEADER, queue, queue_idx, HTTP_RESPONSE); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); + } + http_half_data_update_commit_index(half_data); + msg = http_message_new(HTTP_MESSAGE_RES_HEADER_END, queue, queue_idx, HTTP_RESPONSE); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); + + int tot_s2c_headers = http_half_data_get_total_parsed_header_count(half_data); + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_HEADERS_S2C, tot_s2c_headers); + + if (httpd_is_tunnel_session(httpd_env, exdata)) + { + exdata->tunnel_state = HTTP_TUN_INNER_STARTING; + http_half_pre_context_free(ev_ctx->ref_session, exdata); + exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].sub_topic_id; + } + } + break; + case HTTP_EVENT_RES_BODY_BEGIN: + msg = http_message_new(HTTP_MESSAGE_RES_BODY_START, queue, queue_idx, HTTP_RESPONSE); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); + break; + case HTTP_EVENT_RES_BODY_DATA: + { + hstring raw_body = {}; + http_decoder_half_data_get_raw_body(half_data, &raw_body); + hstring decompress_body = {}; + http_half_get_lastest_decompress_buffer(half_data, &decompress_body); + msg = http_body_message_new(HTTP_MESSAGE_RES_BODY, queue, queue_idx, HTTP_RESPONSE, &raw_body, &decompress_body); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); + if(decompress_body.iov_base != NULL){ + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_ZIP_BYTES, raw_body.iov_len); + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_UNZIP_BYTES, decompress_body.iov_len); + } + } + break; + case HTTP_EVENT_RES_BODY_END: + msg = http_message_new(HTTP_MESSAGE_RES_BODY_END, queue, queue_idx, HTTP_RESPONSE); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); + break; + case HTTP_EVENT_RES_END: + msg = http_message_new(HTTP_TRANSACTION_END, queue, queue_idx, HTTP_RESPONSE); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_FREE, 1); + session_is_symmetric(ev_ctx->ref_session, &flow_flag); + if (SESSION_SEEN_S2C_FLOW == flow_flag) + { + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_ASYMMETRY_TRANSACTION_S2C, 1); + } + + http_half_update_state(half_data, event); + http_decoder_result_queue_inc_res_index(queue); + half_data = http_decoder_result_queue_pop_res(queue); + if (half_data != NULL) + { + http_decoder_half_data_free(mempool, half_data); + half_data = NULL; + } + break; + default: + assert(0); + break; + } + if (half_data) + { + http_half_update_state(half_data, event); + } +} + +static struct http_decoder *http_decoder_new(struct http_decoder_exdata *hd_ctx, nmx_pool_t *mempool, + http_event_cb *ev_cb, int decompress_switch, struct http_decoder_env *httpd_env, + long long req_start_seq, long long res_start_seq) +{ + struct http_decoder *decoder = MEMPOOL_CALLOC(mempool, struct http_decoder, 1); + assert(decoder); + decoder->c2s_half = http_decoder_half_new(hd_ctx, mempool, ev_cb, HTTP_REQUEST, decompress_switch, httpd_env, req_start_seq); + decoder->s2c_half = http_decoder_half_new(hd_ctx, mempool, ev_cb, HTTP_RESPONSE, decompress_switch, httpd_env, res_start_seq); + return decoder; +} + +static void http_decoder_free(nmx_pool_t *mempool, struct http_decoder *decoder) +{ + if (NULL == decoder) + { + return; + } + if (decoder->c2s_half != NULL) + { + http_decoder_half_free(mempool, decoder->c2s_half); + decoder->c2s_half = NULL; + } + if (decoder->s2c_half != NULL) + { + http_decoder_half_free(mempool, decoder->s2c_half); + decoder->s2c_half = NULL; + } + MEMPOOL_FREE(mempool, decoder); +} + +static struct http_decoder_exdata *http_decoder_exdata_new(size_t mempool_size, size_t queue_size, + int decompress_switch, struct http_decoder_env *httpd_env, + long long req_start_seq, long long res_start_seq) +{ + struct http_decoder_exdata *hd_ctx = CALLOC(struct http_decoder_exdata, 1); + hd_ctx->mempool = nmx_create_pool(mempool_size); + hd_ctx->decoder = http_decoder_new(hd_ctx, hd_ctx->mempool, http_event_handler, decompress_switch, + httpd_env, req_start_seq, res_start_seq); + hd_ctx->queue = http_decoder_result_queue_new(hd_ctx->mempool, queue_size); + return hd_ctx; +} + +static void http_decoder_exdata_free(struct http_decoder_exdata *ex_data) +{ + if (unlikely(NULL == ex_data)) + { + return; + } + if (ex_data->decoder != NULL) + { + http_decoder_free(ex_data->mempool, ex_data->decoder); + ex_data->decoder = NULL; + } + if (ex_data->queue != NULL) + { + http_decoder_result_queue_free(ex_data->mempool, ex_data->queue); + ex_data->queue = NULL; + } + if (ex_data->mempool) + { + nmx_destroy_pool(ex_data->mempool); + } + FREE(ex_data); +} + +static int http_protocol_identify(const char *data, size_t data_len) +{ + llhttp_t parser; + llhttp_settings_t settings; + enum llhttp_errno error; + + llhttp_settings_init(&settings); + llhttp_init(&parser, HTTP_BOTH, &settings); + + error = llhttp_execute(&parser, data, data_len); + if (error != HPE_OK) + { + return -1; + } + return 1; +} + +static void _http_decoder_context_free(struct http_decoder_env *env) +{ + if (NULL == env) + { + return; + } + + http_decoder_stat_free(&env->hd_stat); + + for (int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++) + { + if (env->topic_exdata_compose[i].msg_free_cb) + { + stellar_mq_destroy_topic(env->st, env->topic_exdata_compose[i].sub_topic_id); + } + } + + FREE(env); +} + +static int load_http_decoder_config(const char *cfg_path, + struct http_decoder_config *hd_cfg) +{ + FILE *fp = fopen(cfg_path, "r"); + if (NULL == fp) + { + fprintf(stderr, "[%s:%d]Can't open config file:%s", + __FUNCTION__, __LINE__, cfg_path); + return -1; + } + + int ret = 0; + char errbuf[256] = {0}; + + toml_table_t *root = toml_parse_file(fp, errbuf, sizeof(errbuf)); + fclose(fp); + + toml_table_t *basic_sec_tbl = toml_table_in(root, "basic"); + if (NULL == basic_sec_tbl) + { + fprintf(stderr, "[%s:%d]config file:%s has no key: [basic]", + __FUNCTION__, __LINE__, cfg_path); + toml_free(root); + return -1; + } + + toml_datum_t int_val = toml_int_in(basic_sec_tbl, "decompress"); + if (int_val.ok != 0) + { + hd_cfg->decompress_switch = int_val.u.b; + } + + int_val = toml_int_in(basic_sec_tbl, "mempool_size"); + if (int_val.ok != 0) + { + hd_cfg->mempool_size = int_val.u.i; + } + else + { + hd_cfg->mempool_size = DEFAULT_MEMPOOL_SIZE; + } + + int_val = toml_int_in(basic_sec_tbl, "result_queue_len"); + if (int_val.ok != 0) + { + hd_cfg->result_queue_len = int_val.u.i; + } + else + { + hd_cfg->result_queue_len = HD_RESULT_QUEUE_LEN; + } + + int_val = toml_int_in(basic_sec_tbl, "stat_interval_pkts"); + if (int_val.ok != 0) + { + hd_cfg->stat_interval_pkts = int_val.u.i; + } + else + { + hd_cfg->stat_interval_pkts = DEFAULT_STAT_INTERVAL_PKTS; + } + + int_val = toml_int_in(basic_sec_tbl, "stat_output_interval"); + if (int_val.ok != 0) + { + hd_cfg->stat_output_interval = int_val.u.i; + } + else + { + hd_cfg->stat_output_interval = DEFAULT_STAT_OUTPUT_INTERVAL; + } + + int_val = toml_int_in(basic_sec_tbl, "proxy_enable"); + if (int_val.ok != 0) + { + hd_cfg->proxy_enable = int_val.u.i; + } + else + { + hd_cfg->proxy_enable = 0; + } + + toml_free(root); + return ret; +} + +static int http_msg_get_request_header(const struct http_message *msg, const hstring *key, + struct http_header *hdr_result) +{ + const struct http_decoder_half_data *req_data = + msg->ref_queue->array[msg->queue_index].req_data; + return http_decoder_half_data_get_header(req_data, key, hdr_result); +} + +static int http_msg_get_response_header(const struct http_message *msg, const hstring *key, + struct http_header *hdr_result) +{ + const struct http_decoder_half_data *res_data = + msg->ref_queue->array[msg->queue_index].res_data; + return http_decoder_half_data_get_header(res_data, key, hdr_result); +} + +static int http_msg_request_header_next(const struct http_message *msg, + struct http_header *hdr) +{ + const struct http_decoder_half_data *req_data = + msg->ref_queue->array[msg->queue_index].req_data; + return http_decoder_half_data_iter_header((struct http_decoder_half_data *)req_data, hdr); +} + +static int http_msg_response_header_next(const struct http_message *msg, struct http_header *hdr) +{ + const struct http_decoder_half_data *res_data = + msg->ref_queue->array[msg->queue_index].res_data; + return http_decoder_half_data_iter_header((struct http_decoder_half_data *)res_data, hdr); +} + +#if 0 +static int http_msg_get_request_raw_body(const struct http_message *msg, hstring *body) +{ + const struct http_decoder_half_data *req_data = + msg->ref_queue->array[msg->queue_index].req_data; + return http_decoder_half_data_get_raw_body(req_data, body); +} + +static int http_msg_get_response_raw_body(const struct http_message *msg, hstring *body) +{ + const struct http_decoder_half_data *res_data = + msg->ref_queue->array[msg->queue_index].res_data; + return http_decoder_half_data_get_raw_body(res_data, body); +} + +static int http_msg_get_request_decompress_body(const struct http_message *msg, hstring *body) +{ + const struct http_decoder_half_data *req_data = + msg->ref_queue->array[msg->queue_index].req_data; + return http_decoder_half_data_get_decompress_body(req_data, body); +} + +static int http_msg_get_response_decompress_body(const struct http_message *msg, hstring *body) +{ + const struct http_decoder_half_data *res_data = + msg->ref_queue->array[msg->queue_index].res_data; + return http_decoder_half_data_get_decompress_body(res_data, body); +} +#endif + +static struct http_decoder_exdata *httpd_session_exdata_new(struct session *sess, struct http_decoder_env *httpd_env, + long long req_start_seq, long long res_start_seq) +{ + struct http_decoder_exdata *exdata = http_decoder_exdata_new(httpd_env->hd_cfg.mempool_size, + httpd_env->hd_cfg.result_queue_len, + httpd_env->hd_cfg.decompress_switch, + httpd_env, req_start_seq, res_start_seq); + // exdata->sub_topic_id = sub_topic_id; + int thread_id = stellar_get_current_thread_index(); + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_SESSION_NEW, 1); + return exdata; +} + +#ifdef __cplusplus +extern "C" +{ +#endif + + void httpd_ex_data_free_cb(int idx, void *ex_data, void *arg) + { + if (NULL == ex_data) + { + return; + } + struct http_decoder_exdata *exdata = (struct http_decoder_exdata *)ex_data; + http_decoder_exdata_free(exdata); + } + + void *httpd_session_ctx_new_cb(struct session *sess, void *plugin_env) + { + return (void *)HTTP_CTX_IS_HTTP; + } + + void httpd_session_ctx_free_cb(struct session *sess, void *session_ctx, void *plugin_env) + { + if (NULL == plugin_env || NULL == session_ctx) + { + return; + } + if (strncmp((const char *)session_ctx, HTTP_CTX_NOT_HTTP, strlen(HTTP_CTX_NOT_HTTP)) == 0) + { + return; + } + struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env; + int thread_id = stellar_get_current_thread_index(); + unsigned char flow_flag = 0; + session_is_symmetric(sess, &flow_flag); + + if (SESSION_SEEN_C2S_FLOW == flow_flag) + { + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_ASYMMETRY_SESSION_C2S, 1); + } + else if (SESSION_SEEN_S2C_FLOW == flow_flag) + { + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_ASYMMETRY_SESSION_S2C, 1); + } + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_SESSION_FREE, 1); + } + + static void http_decoder_execute(struct session *sess, struct http_decoder_env *httpd_env, http_decoder_exdata *exdata, const char *payload, uint16_t payload_len) + { + if (httpd_in_tunnel_transmitting(httpd_env, exdata)) + { + http_decoder_push_tunnel_data(sess, exdata, httpd_tunnel_state_to_msg(exdata), payload, payload_len); + httpd_tunnel_state_update(exdata); + return; + } + + int thread_id = stellar_get_current_thread_index(); + struct http_decoder_half *cur_half = NULL; + enum flow_direction sess_dir = session_get_current_flow_direction(sess); + if (FLOW_DIRECTION_C2S == sess_dir) + { + cur_half = exdata->decoder->c2s_half; + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_BYTES_C2S, payload_len); + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TCP_SEG_C2S, 1); + } + else + { + cur_half = exdata->decoder->s2c_half; + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_BYTES_S2C, payload_len); + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TCP_SEG_S2C, 1); + } + + http_decoder_half_reinit(cur_half, exdata->queue, exdata->mempool, sess); + int ret = http_decoder_half_parse(httpd_env->hd_cfg.proxy_enable, cur_half, payload, payload_len); + if (ret < 0) + { + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_PARSE_ERR, 1); + stellar_session_plugin_dettach_current_session(sess); + } + } + + void http_decoder_tunnel_msg_cb(struct session *sess, int topic_id, const void *tmsg, void *per_session_ctx, void *plugin_env) + { + struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env; + if (0 == httpd_env->hd_cfg.proxy_enable) + { + return; + } + hstring tunnel_payload; + http_tunnel_message_get_payload((const struct http_tunnel_message *)tmsg, &tunnel_payload); + uint16_t payload_len = tunnel_payload.iov_len; + const char *payload = (char *)tunnel_payload.iov_base; + if (NULL == payload || 0 == payload_len) + { + return; + } + + struct http_decoder_exdata *exdata = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].exdata_id); + enum http_tunnel_message_type tmsg_type = http_tunnel_message_type_get((const struct http_tunnel_message *)tmsg); + + switch (tmsg_type) + { + case HTTP_TUNNEL_OPENING: + { + if (NULL != exdata) + { + // not support nested http tunnel + session_mq_ignore_message(sess, topic_id, httpd_env->plugin_id); + return; + } + size_t http_identify_len = payload_len > HTTP_IDENTIFY_LEN ? HTTP_IDENTIFY_LEN : payload_len; + int is_http = http_protocol_identify(payload, http_identify_len); + if (is_http) + { + long long max_req_seq = 0, max_res_seq = 0; + struct http_decoder_exdata *tcp_stream_exdata = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_TCP_STREAM_INDEX].exdata_id); + http_half_get_max_transaction_seq(tcp_stream_exdata, &max_req_seq, &max_res_seq); + exdata = httpd_session_exdata_new(sess, httpd_env, max_req_seq, max_res_seq); + session_exdata_set(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].exdata_id, exdata); + exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_MSG_INDEX].sub_topic_id; + exdata->in_tunnel_is_http = 1; + } + else + { + // inner tunnel is not http, do nothing, do not push this message again !!! + session_mq_ignore_message(sess, topic_id, httpd_env->plugin_id); + return; + } + } + break; + + case HTTP_TUNNEL_ACTIVE: + if (NULL == exdata) + { + session_mq_ignore_message(sess, topic_id, httpd_env->plugin_id); + http_decoder_stat_update(&httpd_env->hd_stat, stellar_get_current_thread_index(), HTTPD_STAT_PARSE_ERR, 1); + return; + } + break; + + case HTTP_TUNNEL_CLOSING: + if (NULL == exdata) + { + http_decoder_stat_update(&httpd_env->hd_stat, stellar_get_current_thread_index(), HTTPD_STAT_PARSE_ERR, 1); + return; + } + if (exdata->in_tunnel_is_http) + { + http_half_pre_context_free(sess, exdata); + } + return; + break; + + default: + break; + } + if (exdata->in_tunnel_is_http) + { + http_decoder_execute(sess, httpd_env, exdata, payload, payload_len); + } + return; + } + + void http_decoder_tcp_stream_msg_cb(struct session *sess, int topic_id, const void *msg, void *nouse_session_ctx, void *plugin_env) + { + struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env; + struct http_decoder_exdata *exdata = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_TCP_STREAM_INDEX].exdata_id); + enum session_state sess_state = session_get_current_state(sess); + const char *payload = NULL; + uint16_t payload_len = 0; + + if (SESSION_STATE_CLOSED == sess_state) + { + if (httpd_in_tunnel_transmitting(httpd_env, exdata)) + { + http_decoder_push_tunnel_data(sess, exdata, HTTP_TUNNEL_CLOSING, NULL, 0); + } + else + { + http_half_pre_context_free(sess, exdata); + } + return; + } + assert(msg != NULL); + + payload_len = tcp_segment_get_len((struct tcp_segment *)msg); + payload = tcp_segment_get_data((const struct tcp_segment *)msg); + if (unlikely(0 == payload_len || NULL == payload)) + { + return; + } + if (NULL == exdata) // first packet + { + size_t http_identify_len = payload_len > HTTP_IDENTIFY_LEN ? HTTP_IDENTIFY_LEN : payload_len; + int ret = http_protocol_identify(payload, http_identify_len); + if (ret < 0) + { + stellar_session_plugin_dettach_current_session(sess); + return; + } + exdata = httpd_session_exdata_new(sess, httpd_env, 0, 0); + exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_MSG_INDEX].sub_topic_id; + session_exdata_set(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_TCP_STREAM_INDEX].exdata_id, exdata); + } + http_decoder_execute(sess, httpd_env, exdata, payload, payload_len); + return; + } + + static const struct http_topic_exdata_compose g_topic_exdata_compose[HTTPD_TOPIC_INDEX_MAX] = + { + {HTTPD_TOPIC_TCP_STREAM_INDEX, TOPIC_TCP_STREAM, http_decoder_tcp_stream_msg_cb, NULL, "HTTP_DECODER_EXDATA_BASEON_TCP_STREAM", httpd_ex_data_free_cb, -1, -1}, + {HTTPD_TOPIC_HTTP_MSG_INDEX, HTTP_DECODER_TOPIC, NULL, http_message_free, NULL, NULL, -1, -1}, + {HTTPD_TOPIC_HTTP_TUNNEL_INDEX, HTTP_DECODER_TUNNEL_TOPIC, http_decoder_tunnel_msg_cb, http_message_free, "HTTP_DECODER_EXDATA_BASEON_HTTP_TUNNEL", httpd_ex_data_free_cb, -1, -1}, + }; + + static void http_decoder_topic_exdata_compose_init(struct http_decoder_env *httpd_env) + { + memcpy(httpd_env->topic_exdata_compose, g_topic_exdata_compose, sizeof(g_topic_exdata_compose)); + for (int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++) + { + httpd_env->topic_exdata_compose[i].sub_topic_id = stellar_session_mq_get_topic_id_reliable(httpd_env->st, + httpd_env->topic_exdata_compose[i].topic_name, + httpd_env->topic_exdata_compose[i].msg_free_cb, + NULL); + assert(httpd_env->topic_exdata_compose[i].sub_topic_id >= 0); + + if (httpd_env->topic_exdata_compose[i].exdata_name) + { + httpd_env->topic_exdata_compose[i].exdata_id = stellar_exdata_new_index(httpd_env->st, + httpd_env->topic_exdata_compose[i].exdata_name, + httpd_env->topic_exdata_compose[i].exdata_free_cb, + NULL); + assert(httpd_env->topic_exdata_compose[i].exdata_id >= 0); + } + + if (httpd_env->topic_exdata_compose[i].on_msg_cb) + { + stellar_session_mq_subscribe(httpd_env->st, httpd_env->topic_exdata_compose[i].sub_topic_id, + httpd_env->topic_exdata_compose[i].on_msg_cb, httpd_env->plugin_id); + } + } + } + + int http_topic_exdata_compose_get_index(const struct http_decoder_env *httpd_env, int by_topic_id) + { + for (int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++) + { + if (httpd_env->topic_exdata_compose[i].sub_topic_id == by_topic_id) + { + return i; + } + } + assert(0); + return -1; + } + + void *http_decoder_init(struct stellar *st) + { + int thread_num = 0; + + struct http_decoder_env *httpd_env = CALLOC(struct http_decoder_env, 1); + int ret = load_http_decoder_config(HTTPD_CFG_FILE, &httpd_env->hd_cfg); + if (ret < 0) + { + goto failed; + } + httpd_env->st = st; + httpd_env->plugin_id = stellar_session_plugin_register(st, httpd_session_ctx_new_cb, + httpd_session_ctx_free_cb, (void *)httpd_env); + if (httpd_env->plugin_id < 0) + { + goto failed; + } + http_decoder_topic_exdata_compose_init(httpd_env); + + thread_num = stellar_get_worker_thread_num(st); + assert(thread_num >= 1); + if (http_decoder_stat_init(&httpd_env->hd_stat, thread_num, + httpd_env->hd_cfg.stat_interval_pkts, httpd_env->hd_cfg.stat_output_interval) < 0) + { + goto failed; + } + + return httpd_env; + + failed: + fprintf(stderr, "http_decoder_init fail!\n"); + _http_decoder_context_free(httpd_env); + return NULL; + } + + void http_decoder_exit(void *plugin_env) + { + if (NULL == plugin_env) + { + return; + } + struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env; + _http_decoder_context_free(httpd_env); + } + + enum http_message_type http_message_type_get(const struct http_message *msg) + { + if (unlikely(NULL == msg)) + { + return HTTP_MESSAGE_MAX; + } + return msg->type; + } + + void http_message_request_line_get0(const struct http_message *msg, + struct http_request_line *line) + { + if (unlikely(NULL == msg || msg->type != HTTP_MESSAGE_REQ_LINE)) + { + if (line) + { + line->method.iov_base = NULL; + line->uri.iov_base = NULL; + line->version.iov_base = NULL; + } + return; + } + assert(msg->ref_queue); + assert(msg->queue_index < HD_RESULT_QUEUE_LEN); + + struct http_decoder_half_data *req_data = + msg->ref_queue->array[msg->queue_index].req_data; + + http_decoder_half_data_get_request_line(req_data, line); + } + + void http_message_response_line_get0(const struct http_message *msg, + struct http_response_line *line) + { + if (unlikely(NULL == msg || msg->type != HTTP_MESSAGE_RES_LINE)) + { + if (line) + { + line->version.iov_base = NULL; + line->status.iov_base = NULL; + } + return; + } + assert(msg->ref_queue); + assert(msg->queue_index < HD_RESULT_QUEUE_LEN); + + struct http_decoder_half_data *res_data = + msg->ref_queue->array[msg->queue_index].res_data; + + http_decoder_half_data_get_response_line(res_data, line); + } + + void http_message_header_get0(const struct http_message *msg, const hstring *key, + struct http_header *hdr_result) + { + int ret = -1; + if (unlikely(NULL == msg || NULL == key)) + { + goto fail; + } + assert(msg->ref_queue); + assert(msg->queue_index < HD_RESULT_QUEUE_LEN); + if (HTTP_MESSAGE_REQ_HEADER == msg->type) + { + ret = http_msg_get_request_header(msg, key, hdr_result); + } + else if (HTTP_MESSAGE_RES_HEADER == msg->type) + { + ret = http_msg_get_response_header(msg, key, hdr_result); + } + if (ret >= 0) + { + return; + } + fail: + if (hdr_result) + { + hdr_result->key.iov_base = NULL; + hdr_result->val.iov_base = NULL; + } + return; + } + + int http_message_header_next(const struct http_message *msg, struct http_header *header) + { + int ret = 1; + if (unlikely(NULL == msg)) + { + goto fail; + } + assert(msg->ref_queue); + assert(msg->queue_index < HD_RESULT_QUEUE_LEN); + if (HTTP_MESSAGE_REQ_HEADER == msg->type) + { + ret = http_msg_request_header_next(msg, header); + } + else if (HTTP_MESSAGE_RES_HEADER == msg->type) + { + ret = http_msg_response_header_next(msg, header); + } + if (ret < 0) + { + goto fail; + } + return 0; + fail: + if (header) + { + header->key.iov_base = NULL; + header->val.iov_base = NULL; + } + return -1; + } + + int http_message_reset_header_iter(struct http_message *msg) + { + if (unlikely(NULL == msg)) + { + return -1; + } + assert(msg->ref_queue); + assert(msg->queue_index < HD_RESULT_QUEUE_LEN); + if (HTTP_MESSAGE_REQ_HEADER == msg->type) + { + struct http_decoder_half_data *req_data = + msg->ref_queue->array[msg->queue_index].req_data; + return http_decoder_half_data_reset_header_iter(req_data); + } + else if (HTTP_MESSAGE_RES_HEADER == msg->type) + { + struct http_decoder_half_data *res_data = + msg->ref_queue->array[msg->queue_index].res_data; + return http_decoder_half_data_reset_header_iter(res_data); + } + return -1; + } + + void http_message_raw_body_get0(const struct http_message *msg, hstring *body) + { + if (unlikely(NULL == msg)) + { + goto fail; + } + assert(msg->ref_queue); + assert(msg->queue_index < HD_RESULT_QUEUE_LEN); + if (msg->raw_payload.iov_base != NULL && msg->raw_payload.iov_len != 0) + { + body->iov_base = msg->raw_payload.iov_base; + body->iov_len = msg->raw_payload.iov_len; + return; + } + fail: + if (body) + { + body->iov_base = NULL; + body->iov_len = 0; + } + return; + } + + void http_message_decompress_body_get0(const struct http_message *msg, hstring *decompress_body) + { + enum http_content_encoding ecode = HTTP_CONTENT_ENCODING_NONE; + struct http_decoder_half_data *ref_data = NULL; + if (unlikely(NULL == msg)) + { + goto fail; + } + assert(msg->ref_queue); + assert(msg->queue_index < HD_RESULT_QUEUE_LEN); + if (msg->decompress_payload.iov_base != NULL && msg->decompress_payload.iov_len != 0) + { + decompress_body->iov_base = msg->decompress_payload.iov_base; + decompress_body->iov_len = msg->decompress_payload.iov_len; + return; + } + /** + * @brief If the body hasn't been compressed, same as http_message_raw_body_get0(). + * + */ + + if (HTTP_MESSAGE_REQ_BODY_START == msg->type + || HTTP_MESSAGE_REQ_BODY == msg->type + || HTTP_MESSAGE_REQ_BODY_END == msg->type) + { + ref_data = msg->ref_queue->array[msg->queue_index].req_data; + } + else if (HTTP_MESSAGE_RES_BODY_START == msg->type + || HTTP_MESSAGE_RES_BODY == msg->type + || HTTP_MESSAGE_RES_BODY_END == msg->type) + { + ref_data = msg->ref_queue->array[msg->queue_index].res_data; + } + ecode = http_half_data_get_content_encoding(ref_data); + if(ref_data != NULL && HTTP_CONTENT_ENCODING_NONE != ecode){ + goto fail; + } + + if (msg->raw_payload.iov_base != NULL && msg->raw_payload.iov_len != 0) + { + decompress_body->iov_base = msg->raw_payload.iov_base; + decompress_body->iov_len = msg->raw_payload.iov_len; + } + return; + fail: + if (decompress_body) + { + decompress_body->iov_base = NULL; + decompress_body->iov_len = 0; + } + return; + } + + void http_message_raw_url_get0(const struct http_message *msg, hstring *url) + { + if (unlikely(NULL == msg)) + { + if (url) + { + url->iov_base = NULL; + url->iov_len = 0; + } + return; + } + assert(msg->ref_queue); + assert(msg->queue_index < HD_RESULT_QUEUE_LEN); + + struct http_decoder_half_data *req_data = + msg->ref_queue->array[msg->queue_index].req_data; + + if (http_half_data_get_url(req_data, url) < 0) + { + goto fail; + } + return; + + fail: + if (url) + { + url->iov_base = NULL; + url->iov_len = 0; + } + return; + } + + void http_message_decoded_url_get0(const struct http_message *msg, struct iovec *url) + { + if (unlikely(NULL == msg)) + { + if (url) + { + url->iov_base = NULL; + url->iov_len = 0; + } + return; + } + assert(msg->ref_queue); + assert(msg->queue_index < HD_RESULT_QUEUE_LEN); + + struct http_decoder_half_data *req_data = + msg->ref_queue->array[msg->queue_index].req_data; + + if (http_half_data_get_decode_url(req_data, url) < 0) + { + goto fail; + } + return; + + fail: + if (url) + { + url->iov_base = NULL; + url->iov_len = 0; + } + return; + } + + int http_message_get_transaction_seq(const struct http_message *msg) + { + if (unlikely(NULL == msg)) + { + return -1; + } + assert(msg->ref_queue); + assert(msg->queue_index < HD_RESULT_QUEUE_LEN); + struct http_decoder_half_data *hf_data = NULL; + if (HTTP_REQUEST == msg->flow_type) + { + hf_data = msg->ref_queue->array[msg->queue_index].req_data; + } + else + { + hf_data = msg->ref_queue->array[msg->queue_index].res_data; + } + return http_half_data_get_transaction_seq(hf_data); + } +#ifdef __cplusplus +} +#endif
\ No newline at end of file diff --git a/decoders/http/http_decoder_half.cpp b/decoders/http/http_decoder_half.cpp new file mode 100644 index 0000000..bea97d0 --- /dev/null +++ b/decoders/http/http_decoder_half.cpp @@ -0,0 +1,1216 @@ +#include <assert.h> +#include <stdio.h> +#include <string.h> +#include <arpa/inet.h> +#include "http_decoder_inc.h" +#include "llhttp.h" +#include "uthash/utlist.h" + +struct http_decompress_buffer +{ + struct iovec iov; + char is_commit; + struct http_decompress_buffer *next, *prev; +}; + +struct http_decoder_half_data +{ + struct http_decoder_table *table; + + int major_version; + int minor_version; + int status_code; + + enum http_event state; + + enum http_content_encoding content_encoding; + struct http_content_decompress *decompress; +#if 0 + char *ref_decompress_body; + size_t decompress_body_len; +#else + struct http_decompress_buffer *decompress_buffer_list; +#endif + int joint_url_complete; + int url_is_encoded; + hstring joint_url; // http://<host>[:<port>]/<path>?<searchpart> + hstring decoded_url; + long long transaction_index; +}; + +struct http_decoder_half +{ + llhttp_t parser; + llhttp_settings_t settings; + enum llhttp_errno error; + int decompress_switch; + struct http_decoder_env *httpd_env; + + // uint8_t is_request_flow; + enum http_event event; + http_event_cb *http_ev_cb; + struct http_event_context *http_ev_ctx; + + struct http_decoder_half_data *ref_data; + + long long trans_counter; + long long err_counter; + long long transaction_seq; // accumulated + + const char *data; + int data_len; +}; + +// #define HTTP_DECODER_DEBUG +#ifdef HTTP_DECODER_DEBUG +static void printf_debug_info(const char *desc, const char *at, size_t length) +{ + if (at) + { + char *temp = safe_dup(at, length); + printf("HTTP PARSER STAGE: %s: %s\n", desc, temp); + FREE(temp); + } + else + { + printf("HTTP PARSER STAGE: %s\n", desc); + } +} +#else +#define printf_debug_info(desc, at, length) +#endif + +void http_half_decompress_buffer_free(struct http_decoder_half_data *data, hstring *decompress_body) +{ + struct http_decompress_buffer *el, *tmp; + DL_FOREACH_SAFE(data->decompress_buffer_list, el, tmp) + { + if (el->iov.iov_base == decompress_body->iov_base + && el->iov.iov_len == decompress_body->iov_len) + { + DL_DELETE(data->decompress_buffer_list, el); + if (el->iov.iov_base) + { + FREE(el->iov.iov_base); + } + FREE(el); + break; + } + } +} + +void http_half_get_lastest_decompress_buffer(struct http_decoder_half_data *data, hstring *decompress_body) +{ + if (data->content_encoding == HTTP_CONTENT_ENCODING_NONE) + { + return; + } + if (data->decompress_buffer_list == NULL) + { + decompress_body->iov_base = NULL; + decompress_body->iov_len = 0; + return; + } + if (data->decompress_buffer_list->prev->is_commit == 1) + { + decompress_body->iov_base = NULL; + decompress_body->iov_len = 0; + return; + } + decompress_body->iov_base = data->decompress_buffer_list->prev->iov.iov_base; + decompress_body->iov_len = data->decompress_buffer_list->prev->iov.iov_len; + data->decompress_buffer_list->prev->is_commit = 1; +} + +static void http_decoder_half_data_decompress(struct http_decoder_half_data *data) +{ + assert(data); + + if (data->content_encoding == HTTP_CONTENT_ENCODING_NONE) + { + return; + } + + hstring raw_body = {}; + http_decoder_table_get_body(data->table, &raw_body); + if (raw_body.iov_base == NULL || raw_body.iov_len == 0) + { + return; + } + + if (NULL == data->decompress) + { + data->decompress = http_content_decompress_create(data->content_encoding); + } + + assert(data->decompress); + char *local_outdata = NULL; + size_t local_outdata_len = 0; + if (http_content_decompress_write(data->decompress, (char *)raw_body.iov_base, + raw_body.iov_len, + &local_outdata, + &local_outdata_len) == -1) + { + // log error + http_content_decompress_destroy(data->decompress); + data->decompress = NULL; + return; + } + + if(local_outdata!= NULL && local_outdata_len > 0) + { + struct http_decompress_buffer *decompress_buffer = CALLOC(struct http_decompress_buffer, 1); + assert(decompress_buffer); + decompress_buffer->iov.iov_base = local_outdata; + decompress_buffer->iov.iov_len = local_outdata_len; + DL_APPEND(data->decompress_buffer_list, decompress_buffer); + } +} + +/* Possible return values 0, -1, `HPE_PAUSED` */ +static int on_message_begin(llhttp_t *http) +{ + printf_debug_info("on_message_begin", NULL, 0); + + struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); + assert(half); + + if (half->parser.type == HTTP_REQUEST) + { + half->event = HTTP_EVENT_REQ_INIT; + } + else + { + half->event = HTTP_EVENT_RES_INIT; + } + + half->ref_data = NULL; + + assert(half->http_ev_cb != NULL); + half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); // http_event_handler() + + half->trans_counter++; + half->ref_data->transaction_index = half->transaction_seq++; + return 0; +} + +static int on_message_complete(llhttp_t *http) +{ + printf_debug_info("on_message_complete", NULL, 0); + + struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); + assert(half); + + if (half->parser.type == HTTP_REQUEST) + { + if (half->event == HTTP_EVENT_REQ_BODY_DATA) + { + half->event = HTTP_EVENT_REQ_BODY_END; + half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); + } + } + else + { + if (half->event == HTTP_EVENT_RES_BODY_DATA) + { + half->event = HTTP_EVENT_RES_BODY_END; + half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); + } + } + + // trigger req_end/res_end + if (half->parser.type == HTTP_REQUEST) + { + half->event = HTTP_EVENT_REQ_END; + half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); + } + else + { + half->event = HTTP_EVENT_RES_END; + half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); + } + + return 0; +} + +static int on_reset(llhttp_t *http __attribute__((unused))) +{ + printf_debug_info("on_reset", NULL, 0); + + return 0; +} + +static inline int is_line_crlf(struct http_decoder_half *half) +{ + const char *chr_r = (char *)memrchr(half->data, '\r', half->data_len); + const char *chr_n = (char *)memrchr(half->data, '\n', half->data_len); + if (chr_r && chr_n && (chr_r + 1 == chr_n)) + { + return 1; + } + return 0; +} + +static int on_method(llhttp_t *http, const char *at, size_t length) +{ + printf_debug_info("on_method", at, length); + + struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); + assert(half); + + http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_METHOD, at, length); + return 0; +} + +/* Information-only callbacks, return value is ignored */ +static int on_method_complete(llhttp_t *http) +{ + printf_debug_info("on_method_complete", NULL, 0); + + struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); + assert(half); + + if (is_line_crlf(half) == 0) + { + http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_METHOD); + } + + http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_METHOD); + + return 0; +} + +/* Possible return values 0, -1, HPE_USER */ +static int on_uri(llhttp_t *http, const char *at, size_t length) +{ + printf_debug_info("on_uri", at, length); + + struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); + assert(half); + + http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_URI, at, length); + return 0; +} + +static void http_decoder_cached_portion_url(struct http_decoder_half *half, const hstring *uri_result) +{ + struct http_decoder_half_data *ref_data = half->ref_data; + int uri_skip_len = 0; + + if ((uri_result->iov_len) > 7 && (strncasecmp("http://", (char *)uri_result->iov_base, 7) == 0)) // absolute URI + { + uri_skip_len = strlen("http://"); + ref_data->joint_url_complete = 1; + } + else + { + ref_data->joint_url_complete = 0; + } + + ref_data->joint_url.iov_len = uri_result->iov_len - uri_skip_len; + ref_data->joint_url.iov_base = MEMPOOL_CALLOC(half->http_ev_ctx->ref_mempool, char, ref_data->joint_url.iov_len); + memcpy(ref_data->joint_url.iov_base, (char *)uri_result->iov_base + uri_skip_len, ref_data->joint_url.iov_len); +} + +/* Information-only callbacks, return value is ignored */ +static int on_uri_complete(llhttp_t *http) +{ + printf_debug_info("on_uri_complete", NULL, 0); + + struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); + assert(half); + + if (is_line_crlf(half) == 0) + { + http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_URI); + } + + http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_URI); + + hstring uri_result = {}; + http_decoder_table_get_uri(half->ref_data->table, &uri_result); + assert(uri_result.iov_base); + http_decoder_cached_portion_url(half, &uri_result); + + return 0; +} + +/* Possible return values 0, -1, HPE_USER */ +static int on_version(llhttp_t *http, const char *at, size_t length) +{ + printf_debug_info("on_version", at, length); + + struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); + assert(half); + + http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_VERSION, at, length); + return 0; +} + +/* Information-only callbacks, return value is ignored */ +static int on_version_complete(llhttp_t *http) +{ + printf_debug_info("on_version_complete", NULL, 0); + + struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); + assert(half); + + if (is_line_crlf(half) == 0) + { + http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_VERSION); + } + + http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_VERSION); + + half->ref_data->major_version = llhttp_get_http_major(&half->parser); + half->ref_data->minor_version = llhttp_get_http_minor(&half->parser); + + if (half->parser.type == HTTP_REQUEST) + { + half->event = HTTP_EVENT_REQ_LINE; + if (half->http_ev_cb) // http_event_handler() + { + half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); + } + } + + return 0; +} + +/* Possible return values 0, -1, HPE_USER */ +static int on_status(llhttp_t *http, const char *at, size_t length) +{ + printf_debug_info("on_status", at, length); + + struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); + assert(half); + + http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_STATUS, at, length); + return 0; +} + +/* Information-only callbacks, return value is ignored */ +static int on_status_complete(llhttp_t *http) +{ + printf_debug_info("on_status_complete", NULL, 0); + + struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); + assert(half); + + if (is_line_crlf(half) == 0) + { + http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_STATUS); + } + + http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_STATUS); + half->ref_data->status_code = llhttp_get_status_code(&half->parser); + + if (half->parser.type == HTTP_RESPONSE) + { + half->event = HTTP_EVENT_RES_LINE; + if (half->http_ev_cb != NULL) // http_event_handler() + { + half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); + } + } + + return 0; +} + +/* Possible return values 0, -1, HPE_USER */ +static int on_header_field(llhttp_t *http, const char *at, size_t length) +{ + printf_debug_info("on_header_field", at, length); + + struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); + assert(half); + + http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_HDRKEY, at, length); + return 0; +} + +/* Information-only callbacks, return value is ignored */ +static int on_header_field_complete(llhttp_t *http) +{ + printf_debug_info("on_header_field_complete", NULL, 0); + + struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); + assert(half); + + http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_HDRKEY); + + return 0; +} + +/* Possible return values 0, -1, HPE_USER */ +static int on_header_value(llhttp_t *http, const char *at, size_t length) +{ + printf_debug_info("on_header_value", at, length); + + struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); + assert(half); + + http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_HDRVAL, at, length); + return 0; +} + +#define MAX_ENCODING_STR_LEN 8 +/* Information-only callbacks, return value is ignored */ +static int on_header_value_complete(llhttp_t *http) +{ + printf_debug_info("on_header_value_complete", NULL, 0); + + struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); + assert(half); + + if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_HDRKEY) == + STRING_STATE_CACHE) + { + http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_HDRKEY); + } + + http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_HDRVAL); + + if (half->ref_data->content_encoding == HTTP_CONTENT_ENCODING_NONE) + { + struct http_header http_hdr = {}; + hstring key = {.iov_base = (char *)"Content-Encoding", .iov_len = 16}; + + if (http_decoder_table_get_header(half->ref_data->table, &key, &http_hdr) == 0) + { + char encoding_str[MAX_ENCODING_STR_LEN + 1] = {0}; + size_t iov_len = http_hdr.val.iov_len; + if (iov_len > MAX_ENCODING_STR_LEN) + { + iov_len = MAX_ENCODING_STR_LEN; + } + memcpy(encoding_str, http_hdr.val.iov_base, iov_len); + half->ref_data->content_encoding = http_content_encoding_str2int(encoding_str); + } + } + + if (http->type == HTTP_REQUEST) + { + http_decoder_get_host_feed_url(half); + } + return 0; +} + +/* When on_chunk_header is called, the current chunk length is stored + * in parser->content_length. + * Possible return values 0, -1, `HPE_PAUSED` + */ +static int on_chunk_header(llhttp_t *http __attribute__((unused))) +{ + printf_debug_info("on_chunk_header", NULL, 0); + + return 0; +} + +/* When on_chunk_header is called, the current chunk length is stored + * in parser->content_length. + * Possible return values 0, -1, `HPE_PAUSED` + */ +static int on_chunk_header_complete(llhttp_t *http __attribute__((unused))) +{ + printf_debug_info("on_chunk_header_complete", NULL, 0); + + return 0; +} + +/* Possible return values: + * 0 - Proceed normally + * 1 - Assume that request/response has no body, and proceed to parsing the next message + * 2 - Assume absence of body (as above) and make `llhttp_execute()` return `HPE_PAUSED_UPGRADE` + * -1 - Error `HPE_PAUSED` + */ +static int on_headers_complete(llhttp_t *http) +{ + printf_debug_info("on_headers_complete", NULL, 0); + + struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); + assert(half); + assert(half->ref_data); + + http_decoder_table_set_header_complete(half->ref_data->table); + + if (half->parser.type == HTTP_REQUEST) + { + half->event = HTTP_EVENT_REQ_HDR_END; + } + else + { + half->event = HTTP_EVENT_RES_HDR_END; + } + half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); // http_event_handler() + + return 0; +} + +/* Possible return values 0, -1, HPE_USER */ +static int on_body(llhttp_t *http, const char *at, size_t length) +{ + printf_debug_info("on_body", at, length); + + struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); + assert(half); + + // trigger body_begin event + if (half->parser.type == HTTP_REQUEST) + { + if (half->event == HTTP_EVENT_REQ_HDR_END) + { + half->event = HTTP_EVENT_REQ_BODY_BEGIN; + half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); // http_event_handler() + } + } + else + { + if (half->event == HTTP_EVENT_RES_HDR_END) + { + half->event = HTTP_EVENT_RES_BODY_BEGIN; + half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); + } + } + + if (half->ref_data != NULL) + { + if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_BODY) == + STRING_STATE_COMMIT) + { + http_decoder_table_reset(half->ref_data->table, HTTP_ITEM_BODY); + } + + http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_BODY, at, length); + http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_BODY); + } + + if (1 == half->decompress_switch && half->ref_data->content_encoding != HTTP_CONTENT_ENCODING_NONE) + { + http_decoder_half_data_decompress(half->ref_data); + } + + if (half->parser.type == HTTP_REQUEST) + { + half->event = HTTP_EVENT_REQ_BODY_DATA; + half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); // http_event_handler() + } + else + { + half->event = HTTP_EVENT_RES_BODY_DATA; + half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); + } + + return 0; +} + +static void http_decoder_half_init(struct http_decoder_half *half, http_event_cb *http_ev_cb, enum llhttp_type type) +{ + llhttp_settings_init(&half->settings); + llhttp_init(&half->parser, type, &half->settings); + + // half->is_request_flow = (type == HTTP_REQUEST) ? 1 : 0; + half->settings.on_message_begin = on_message_begin; + half->settings.on_message_complete = on_message_complete; + half->settings.on_reset = on_reset; + + half->settings.on_url = on_uri; + half->settings.on_url_complete = on_uri_complete; + + half->settings.on_status = on_status; + half->settings.on_status_complete = on_status_complete; + + half->settings.on_method = on_method; + half->settings.on_method_complete = on_method_complete; + + half->settings.on_version = on_version; + half->settings.on_version_complete = on_version_complete; + + half->settings.on_header_field = on_header_field; + half->settings.on_header_field_complete = on_header_field_complete; + + half->settings.on_header_value = on_header_value; + half->settings.on_header_value_complete = on_header_value_complete; + + half->settings.on_chunk_header = on_chunk_header; + half->settings.on_chunk_complete = on_chunk_header_complete; + + half->settings.on_headers_complete = on_headers_complete; + half->settings.on_body = on_body; + + half->error = HPE_OK; + half->http_ev_cb = http_ev_cb; // http_event_handler() + half->ref_data = NULL; +} + +struct http_decoder_half *http_decoder_half_new(struct http_decoder_exdata *hd_ctx, nmx_pool_t *mempool, + http_event_cb *ev_cb, enum llhttp_type http_type, + int decompress_switch, struct http_decoder_env *httpd_env, long long start_seq) +{ + struct http_decoder_half *half = MEMPOOL_CALLOC(mempool, struct http_decoder_half, 1); + assert(half); + + half->decompress_switch = decompress_switch; + half->http_ev_ctx = MEMPOOL_CALLOC(mempool, struct http_event_context, 1); + http_decoder_half_init(half, ev_cb, http_type); + half->http_ev_ctx->ref_httpd_ctx = hd_ctx; + half->httpd_env = httpd_env; + half->transaction_seq = start_seq; + return half; +} + +void http_decoder_half_free(nmx_pool_t *mempool, struct http_decoder_half *half) +{ + if (NULL == half) + { + return; + } + + if (half->http_ev_ctx != NULL) + { + MEMPOOL_FREE(mempool, half->http_ev_ctx); + half->http_ev_ctx = NULL; + } + + MEMPOOL_FREE(mempool, half); +} + +void http_decoder_half_reinit(struct http_decoder_half *half, + struct http_decoder_result_queue *queue, + nmx_pool_t *mempool, struct session *sess) +{ + assert(half != NULL); + if (half->ref_data != NULL) + { + http_decoder_table_reinit(half->ref_data->table); + } + half->http_ev_ctx->ref_mempool = mempool; + half->http_ev_ctx->ref_session = sess; + half->http_ev_ctx->ref_queue = queue; +} + +static void publish_message_for_parsed_header(struct http_decoder_half *half) +{ + if (0 == http_decoder_table_has_parsed_header(half->ref_data->table)) + { + return; + } + if (half->parser.type == HTTP_REQUEST) + { + half->event = HTTP_EVENT_REQ_HDR; + } + else + { + half->event = HTTP_EVENT_RES_HDR; + } + half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); // http_event_handler(); + return; +} + +int http_decoder_half_parse(int proxy_enable, struct http_decoder_half *half, const char *data, size_t data_len) +{ + assert(half && data); + + half->data = (const char *)data; + half->data_len = data_len; + half->error = llhttp_execute(&half->parser, data, data_len); + + int ret = 0; + enum llhttp_type type = HTTP_BOTH; + + switch (half->error) + { + case HPE_OK: + break; + case HPE_PAUSED: + llhttp_resume(&half->parser); + break; + case HPE_PAUSED_UPGRADE: + if (proxy_enable) + { + llhttp_resume_after_upgrade(&half->parser); + } + ret = 0; + break; + default: + type = (enum llhttp_type)half->parser.type; + llhttp_init(&half->parser, type, &half->settings); + ret = -1; + break; + } + + if (ret < 0) + { + // fprintf(stdout, + // "llhttp_execute parse error: %s err_reason:%s\n", + // llhttp_errno_name(half->error), half->parser.reason); + return half->error; + } + + if (half->ref_data != NULL) + { + if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_URI) == STRING_STATE_REFER) + { + http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_URI); + } + + if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_STATUS) == STRING_STATE_REFER) + { + http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_STATUS); + } + + if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_METHOD) == STRING_STATE_REFER) + { + http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_METHOD); + } + + if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_VERSION) == STRING_STATE_REFER) + { + http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_VERSION); + } + + if (http_decoder_table_header_complete(half->ref_data->table)) + { + http_decoder_table_reset_header_complete(half->ref_data->table); + } + else + { + // if headers are not completed with EOF \r\n\r\n, push the parsed headers so far + publish_message_for_parsed_header(half); + } + + enum string_state hdr_key_state = + http_decoder_table_state(half->ref_data->table, HTTP_ITEM_HDRKEY); + enum string_state hdr_val_state = + http_decoder_table_state(half->ref_data->table, HTTP_ITEM_HDRVAL); + + /* Truncated in http header key + For example http header k-v => User-Agent: Chrome + case1: + packet1: User- hdr_key_state == STRING_STATE_REFER + packet2: Agent: Chrome + + case2: + packet1: User-Agent: hdr_key_state == STRING_STATE_COMMIT + hdr_val_state == STRING_STATE_INIT + packet2: Chrome + */ + if (hdr_key_state == STRING_STATE_REFER || + (hdr_key_state == STRING_STATE_COMMIT && hdr_val_state == STRING_STATE_INIT)) + { + http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_HDRKEY); + } + + /* Truncated in http header value + For example http header k-v => User-Agent: Chrome + packet1: User-Agent: Ch hdr_key_state == STRING_STATE_COMMIT + hdr_val_state == STRING_STATE_REFER + + packet2: rome + */ + if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_HDRVAL) == STRING_STATE_REFER) + { + /* Header key should have been committed + If it's not cached, cache it for next packet to use + */ + http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_HDRKEY); + http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_HDRVAL); + } + + if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_BODY) == STRING_STATE_REFER) + { + http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_BODY); + } + } + + return 0; +} + +long long http_decoder_half_trans_count(struct http_decoder_half *half) +{ + if (NULL == half) + { + return 0; + } + + long long trans_cnt = half->trans_counter; + half->trans_counter = 0; + + return trans_cnt; +} + +struct http_decoder_half_data * +http_decoder_half_data_new(nmx_pool_t *mempool) +{ + struct http_decoder_half_data *data = + MEMPOOL_CALLOC(mempool, struct http_decoder_half_data, 1); + assert(data); + + data->table = http_decoder_table_new(mempool); + assert(data->table); + + data->major_version = -1; + data->minor_version = -1; + data->status_code = -1; + + data->content_encoding = HTTP_CONTENT_ENCODING_NONE; + // data->ref_decompress_body = NULL; + // data->decompress_body_len = 0; + data->decompress_buffer_list = NULL; + return data; +} + +static void http_decoder_half_decompress_buf_free(struct http_decoder_half_data *ref_data) +{ + if (ref_data == NULL) + { + return; + } + struct http_decompress_buffer *el, *tmp; + DL_FOREACH_SAFE(ref_data->decompress_buffer_list, el, tmp) + { + DL_DELETE(ref_data->decompress_buffer_list, el); + if (el->iov.iov_base != NULL) + { + FREE(el->iov.iov_base); + } + FREE(el); + } + ref_data->decompress_buffer_list = NULL; +} + +void http_decoder_half_data_free(nmx_pool_t *mempool, struct http_decoder_half_data *data) +{ + if (NULL == data) + { + return; + } + + if (data->table != NULL) + { + http_decoder_table_free(data->table); + data->table = NULL; + } + + if (data->decompress != NULL) + { + http_content_decompress_destroy(data->decompress); + data->decompress = NULL; + } + + if (data->joint_url.iov_base) + { + MEMPOOL_FREE(mempool, data->joint_url.iov_base); + data->joint_url.iov_base = NULL; + data->joint_url_complete = 0; + } + http_decoder_half_decompress_buf_free(data); + MEMPOOL_FREE(mempool, data); +} + +int http_decoder_half_data_get_request_line(struct http_decoder_half_data *data, + struct http_request_line *line) +{ + http_decoder_table_get_method(data->table, &line->method); + http_decoder_table_get_uri(data->table, &line->uri); + http_decoder_table_get_version(data->table, &line->version); + + line->major_version = data->major_version; + line->minor_version = data->minor_version; + + return 0; +} + +int http_decoder_half_data_get_response_line(struct http_decoder_half_data *data, + struct http_response_line *line) +{ + http_decoder_table_get_version(data->table, &line->version); + http_decoder_table_get_status(data->table, &line->status); + + line->major_version = data->major_version; + line->minor_version = data->minor_version; + line->status_code = data->status_code; + + return 0; +} + +int http_decoder_half_data_get_header(const struct http_decoder_half_data *data, + const hstring *key, + struct http_header *hdr_result) +{ + return http_decoder_table_get_header(data->table, key, hdr_result); +} + +int http_decoder_half_data_iter_header(struct http_decoder_half_data *data, + struct http_header *header) +{ + return http_decoder_table_iter_header((struct http_decoder_table *)data->table, header); +} + +int http_decoder_half_data_reset_header_iter(struct http_decoder_half_data *req_data) +{ + if (NULL == req_data) + { + return -1; + } + return http_decoder_table_reset_header_iter(req_data->table); +} + +int http_decoder_half_data_has_parsed_header(struct http_decoder_half_data *data) +{ + if (NULL == data) + { + return 0; + } + return http_decoder_table_has_parsed_header(data->table); +} + +int http_decoder_half_data_get_raw_body(const struct http_decoder_half_data *data, hstring *body) +{ + if (NULL == data || NULL == body) + { + return -1; + } + return http_decoder_table_get_body(data->table, body); +} +#if 0 +int http_decoder_half_data_get_decompress_body(const struct http_decoder_half_data *data, hstring *body) +{ + if (HTTP_CONTENT_ENCODING_NONE == data->content_encoding) + { + return http_decoder_table_get_body(data->table, body); + } + + body->iov_base = data->ref_decompress_body; + body->iov_len = data->decompress_body_len; + return 0; +} +#endif + +void http_decoder_half_data_dump(struct http_decoder_half *half) +{ + if (NULL == half || NULL == half->ref_data) + { + return; + } + + http_decoder_table_dump(half->ref_data->table); +} + +static void using_session_addr_as_host(struct session *ref_session, + struct http_header *host_result, nmx_pool_t *mempool) +{ +#if 1 // in native steallar, can't get the tuple4 from the session yet!!! + struct httpd_session_addr ssaddr = {}; + httpd_session_get_addr(ref_session, &ssaddr); + if (ssaddr.ipver != 4 && ssaddr.ipver != 6) + { + host_result->val.iov_base = MEMPOOL_CALLOC(mempool, char, 1); + sprintf((char *)host_result->val.iov_base, "%s", ""); + host_result->val.iov_len = strlen((char *)host_result->val.iov_base); + return; + } + + char ip_string_buf[INET6_ADDRSTRLEN]; + if (4 == ssaddr.ipver) + { + host_result->val.iov_base = MEMPOOL_CALLOC(mempool, char, (INET_ADDRSTRLEN + 7) /* "ip:port" max length */); + inet_ntop(AF_INET, &ssaddr.daddr4, ip_string_buf, INET_ADDRSTRLEN); + sprintf((char *)host_result->val.iov_base, "%s:%u", ip_string_buf, ntohs(ssaddr.dport)); + host_result->val.iov_len = strlen((char *)host_result->val.iov_base); + } + else if (6 == ssaddr.ipver) + { + host_result->val.iov_base = MEMPOOL_CALLOC(mempool, char, (INET6_ADDRSTRLEN + 7) /* "ip:port" max length */); + inet_ntop(AF_INET6, &ssaddr.daddr6, ip_string_buf, INET6_ADDRSTRLEN); + sprintf((char *)host_result->val.iov_base, "%s:%u", ip_string_buf, ntohs(ssaddr.dport)); + host_result->val.iov_len = strlen((char *)host_result->val.iov_base); + } + else + { + assert(0); + } +#else + host_result->val.iov_base = MEMPOOL_CALLOC(mempool, char, 32); + sprintf((char *)host_result->val.iov_base, "%s", "todo:get_tuple4"); + host_result->val.iov_len = strlen((char *)host_result->val.iov_base); +#endif +} + +void http_decoder_join_url(struct http_decoder_half_data *hfdata, nmx_pool_t *mempool, const struct http_header *host_hdr) +{ + int append_slash_len = 0; + if ('/' != ((char *)hfdata->joint_url.iov_base)[0]) + { + append_slash_len = 1; + } + int url_cache_str_len = host_hdr->val.iov_len + hfdata->joint_url.iov_len + append_slash_len; + char *url_cache_str = MEMPOOL_CALLOC(mempool, char, url_cache_str_len); + + char *ptr = url_cache_str; + memcpy(ptr, host_hdr->val.iov_base, host_hdr->val.iov_len); + ptr += host_hdr->val.iov_len; + if (append_slash_len) + { + *ptr = '/'; + ptr++; + } + memcpy(ptr, hfdata->joint_url.iov_base, hfdata->joint_url.iov_len); + + MEMPOOL_FREE(mempool, hfdata->joint_url.iov_base); // free the cached uri buffer + hfdata->joint_url.iov_base = url_cache_str; + hfdata->joint_url.iov_len = url_cache_str_len; + + hfdata->url_is_encoded = httpd_url_is_encoded(url_cache_str, url_cache_str_len); + if (hfdata->url_is_encoded) + { + hfdata->decoded_url.iov_base = MEMPOOL_CALLOC(mempool, char, url_cache_str_len); + httpd_url_decode(url_cache_str, url_cache_str_len, (char *)hfdata->decoded_url.iov_base, &hfdata->decoded_url.iov_len); + } + else + { + hfdata->decoded_url.iov_base = url_cache_str; + hfdata->decoded_url.iov_len = url_cache_str_len; + } + + hfdata->joint_url_complete = 1; +} + +void http_decoder_get_url(struct http_decoder_half_data *hfdata, nmx_pool_t *mempool) +{ + struct http_request_line reqline = {}; + http_decoder_half_data_get_request_line(hfdata, &reqline); + if (unlikely(strncasecmp_safe("CONNECT", (char *)reqline.method.iov_base, 7, reqline.method.iov_len) == 0)) + { + hfdata->joint_url.iov_base = MEMPOOL_CALLOC(mempool, char, reqline.uri.iov_len + 1); + memcpy(hfdata->joint_url.iov_base, reqline.uri.iov_base, reqline.uri.iov_len); + hfdata->joint_url.iov_len = reqline.uri.iov_len; + hfdata->joint_url_complete = 1; + } +} + +int http_decoder_join_url_finally(struct http_event_context *ev_ctx, + struct http_decoder_half_data *hfdata, + nmx_pool_t *mempool) +{ + struct http_header addr_as_host = {}; + + if (hfdata->joint_url_complete) + { + return 0; + } + + using_session_addr_as_host(ev_ctx->ref_session, &addr_as_host, mempool); + http_decoder_join_url(hfdata, mempool, &addr_as_host); + MEMPOOL_FREE(mempool, addr_as_host.val.iov_base); // free session addr to host buffer + return 1; +} + +void http_decoder_get_host_feed_url(struct http_decoder_half *half) +{ + struct http_header host_result = {}; + hstring host_key = {(char *)"Host", 4}; + + if (half->ref_data->joint_url_complete) + { + return; + } + + int host_header_cnt = http_decoder_half_data_get_header(half->ref_data, &host_key, + &host_result); + if (host_header_cnt < 0) + { + return; + } + + http_decoder_join_url(half->ref_data, half->http_ev_ctx->ref_mempool, &host_result); +} + +int http_half_data_get_url(struct http_decoder_half_data *res_data, hstring *url) +{ + if (0 == res_data->joint_url_complete) + { + return -1; + } + url->iov_base = res_data->joint_url.iov_base; + url->iov_len = res_data->joint_url.iov_len; + return 0; +} +int http_half_data_get_decode_url(struct http_decoder_half_data *res_data, hstring *url) +{ + if (0 == res_data->joint_url_complete) + { + return -1; + } + url->iov_base = res_data->decoded_url.iov_base; + url->iov_len = res_data->decoded_url.iov_len; + return 0; +} + +int http_half_data_get_transaction_seq(struct http_decoder_half_data *hf_data) +{ + return hf_data->transaction_index; +} + +void http_half_data_update_commit_index(struct http_decoder_half_data *half_data) +{ + http_decoder_table_update_commit_index(half_data->table); +} + +int http_half_data_get_total_parsed_header_count(struct http_decoder_half_data *half_data) +{ + return http_decoder_table_get_total_parsed_header(half_data->table); +} + +void http_half_pre_context_free(struct session *sess, struct http_decoder_exdata *exdata) +{ + struct http_message *msg = NULL; + struct http_decoder_half_data *res_data = NULL; + struct http_decoder_result_queue *queue = NULL; + + if (exdata) + { + queue = exdata->queue; + for (size_t i = 0; i < queue->queue_size; i++) + { + struct http_decoder_half_data *req_data = queue->array[i].req_data; + res_data = queue->array[i].res_data; + if ((req_data != NULL) && (NULL == res_data) && (req_data->state < HTTP_EVENT_REQ_END)) + { + msg = http_message_new(HTTP_TRANSACTION_END, queue, i, HTTP_REQUEST); + session_mq_publish_message(sess, exdata->pub_topic_id, msg); + } + } + + for (size_t i = 0; i < queue->queue_size; i++) + { + res_data = queue->array[i].res_data; + if ((res_data != NULL) && (res_data->state < HTTP_EVENT_RES_END)) + { + msg = http_message_new(HTTP_TRANSACTION_END, queue, i, HTTP_RESPONSE); + session_mq_publish_message(sess, exdata->pub_topic_id, msg); + } + } + } +} + +void http_half_update_state(struct http_decoder_half_data *hf_data, enum http_event state) +{ + hf_data->state = state; +} + +void http_half_get_max_transaction_seq(struct http_decoder_exdata *exdata, long long *max_req_seq, long long *max_res_seq) +{ + assert(exdata && max_req_seq && max_res_seq); + *max_req_seq = exdata->decoder->c2s_half->transaction_seq; + *max_res_seq = exdata->decoder->s2c_half->transaction_seq; +} + +enum http_content_encoding http_half_data_get_content_encoding(struct http_decoder_half_data *hf_data) +{ + if(NULL == hf_data) + { + return HTTP_CONTENT_ENCODING_NONE; + } + return hf_data->content_encoding; +}
\ No newline at end of file diff --git a/decoders/http/http_decoder_half.h b/decoders/http/http_decoder_half.h new file mode 100644 index 0000000..5deeaa6 --- /dev/null +++ b/decoders/http/http_decoder_half.h @@ -0,0 +1,105 @@ +#ifndef _HTTP_DECODER_HALF_H_ +#define _HTTP_DECODER_HALF_H_ + +#include <stddef.h> +#include "stellar/session.h" +#include "stellar/http.h" +#include "http_content_decompress.h" +#include "http_decoder_result_queue.h" +#include "llhttp.h" + +// only one http event is fired at a time +enum http_event { + HTTP_EVENT_REQ_INIT = 1 << 1, + HTTP_EVENT_REQ_LINE = 1 << 2, + HTTP_EVENT_REQ_HDR = 1 << 3, + HTTP_EVENT_REQ_HDR_END = 1 << 4, + HTTP_EVENT_REQ_BODY_BEGIN = 1 << 5, + HTTP_EVENT_REQ_BODY_DATA = 1 << 6, + HTTP_EVENT_REQ_BODY_END = 1 << 7, + HTTP_EVENT_REQ_END = 1 << 8, + + HTTP_EVENT_RES_INIT = 1 << 9, + HTTP_EVENT_RES_LINE = 1 << 10, + HTTP_EVENT_RES_HDR = 1 << 11, + HTTP_EVENT_RES_HDR_END = 1 << 12, + HTTP_EVENT_RES_BODY_BEGIN = 1 << 13, + HTTP_EVENT_RES_BODY_DATA = 1 << 14, + HTTP_EVENT_RES_BODY_END = 1 << 15, + HTTP_EVENT_RES_END = 1 << 16, +}; + +struct http_event_context { + struct http_decoder_exdata *ref_httpd_ctx; + nmx_pool_t *ref_mempool; + struct session *ref_session; + struct http_decoder_result_queue *ref_queue; +}; + +struct http_decoder_half; +struct http_decoder_half_data; + +typedef void http_event_cb(enum http_event event, struct http_decoder_half_data **data, + struct http_event_context *ev_ctx, void *httpd_plugin_env); + +struct http_decoder_half * +http_decoder_half_new(struct http_decoder_exdata *hd_ctx, nmx_pool_t *mempool, http_event_cb *event_cb, + enum llhttp_type http_type, int decompress_switch, struct http_decoder_env *httpd_env,long long start_seq); + +void http_decoder_half_free(nmx_pool_t *mempool, struct http_decoder_half *half); + +void http_decoder_half_reinit(struct http_decoder_half *half, + struct http_decoder_result_queue *queue, + nmx_pool_t *mempool, struct session *sess); + +int http_decoder_half_parse(int proxy_enable, struct http_decoder_half *half, const char *data, size_t data_len); + +long long http_decoder_half_trans_count(struct http_decoder_half *half); + +//http decoder half data API +struct http_decoder_half_data * +http_decoder_half_data_new(nmx_pool_t *mempool); + +void http_decoder_half_data_free(nmx_pool_t *mempool, struct http_decoder_half_data *data); + +int http_decoder_half_data_get_request_line(struct http_decoder_half_data *data, + struct http_request_line *line); + +int http_decoder_half_data_get_response_line(struct http_decoder_half_data *data, + struct http_response_line *line); + +int http_decoder_half_data_get_header(const struct http_decoder_half_data *data, + const hstring *key, struct http_header *hdr_res); + +int http_decoder_half_data_iter_header(struct http_decoder_half_data *data, + struct http_header *header); +int http_decoder_half_data_reset_header_iter(struct http_decoder_half_data *req_data); +int http_decoder_half_data_has_parsed_header(struct http_decoder_half_data *data); + +int http_decoder_half_data_get_raw_body(const struct http_decoder_half_data *data, hstring *body); + +int http_decoder_half_data_get_decompress_body(const struct http_decoder_half_data *data, hstring *body); +void http_half_get_lastest_decompress_buffer(struct http_decoder_half_data *data, hstring *decompress_body); +void http_half_decompress_buffer_free(struct http_decoder_half_data *data, hstring *decompress_body); +void http_decoder_half_data_dump(struct http_decoder_half *half); + +void http_decoder_get_host_feed_url(struct http_decoder_half *half); +void http_decoder_get_url(struct http_decoder_half_data *hfdata, nmx_pool_t *mempool); +int http_half_data_get_decode_url(struct http_decoder_half_data *res_data, hstring *url); +void http_decoder_join_url(struct http_decoder_half_data *hfdata, + nmx_pool_t *mempool, + const struct http_header *host_hdr); +int http_decoder_join_url_finally(struct http_event_context *ev_ctx, + struct http_decoder_half_data *hfdata, + nmx_pool_t *mempool); +int http_half_data_get_url(struct http_decoder_half_data *res_data, hstring *url); +int http_half_data_get_transaction_seq(struct http_decoder_half_data *hf_data); + +void http_half_data_update_commit_index(struct http_decoder_half_data * half_data); +void http_half_pre_context_free(struct session *sess, struct http_decoder_exdata *exdata); +void http_half_update_state(struct http_decoder_half_data *hf_data, enum http_event state); +int http_half_data_get_total_parsed_header_count(struct http_decoder_half_data * half_data); +void http_half_get_max_transaction_seq(struct http_decoder_exdata *exdata, long long *max_req_seq, long long *max_res_seq); + +enum http_content_encoding http_half_data_get_content_encoding(struct http_decoder_half_data *hf_data); +#endif
\ No newline at end of file diff --git a/decoders/http/http_decoder_inc.h b/decoders/http/http_decoder_inc.h new file mode 100644 index 0000000..aee121c --- /dev/null +++ b/decoders/http/http_decoder_inc.h @@ -0,0 +1,162 @@ +/* +********************************************************************************************** +* File: http_decoder_inc.h +* Description: +* Authors: Liu WenTan <[email protected]> +* Date: 2024-01-10 +* Copyright: (c) Since 2022 Geedge Networks, Ltd. All rights reserved. +*********************************************************************************************** +*/ + +#ifndef _HTTP_DECODER_INC_H_ +#define _HTTP_DECODER_INC_H_ + +#ifndef __USE_MISC +#define __USE_MISC 1 +#endif + +#include <cstddef> +#ifdef __cplusplus +extern "C" +{ +#endif +#include <bits/types/struct_iovec.h> +#include "stellar/stellar.h" +#include "stellar/layer.h" +#include "stellar/packet.h" +#include "stellar/utils.h" +#include "stellar/session.h" +#include "stellar/stellar_mq.h" +#include "stellar/stellar_exdata.h" + +#include "mempool/nmx_palloc.h" +#include "stellar/utils.h" +#include "stellar/http.h" +#include "http_decoder_result_queue.h" +#include "http_decoder_half.h" +#include "http_decoder_table.h" +#include "http_decoder_result_queue.h" +#include "http_decoder_utils.h" +#include "http_decoder_stat.h" +#include "http_decoder_tunnel.h" +#include "fieldstat/fieldstat_easy.h" +#include "toml/toml.h" + +#ifndef likely +#define likely(x) __builtin_expect((x), 1) +#endif +#ifndef unlikely +#define unlikely(x) __builtin_expect((x), 0) +#endif + +#define MEMPOOL_CALLOC(pool, type, number) ((type *)nmx_pcalloc(pool, sizeof(type) * number)) +#define MEMPOOL_REALLOC(pool) +#define MEMPOOL_FREE(pool, p) nmx_pfree(pool, p) + +#define ENABLE_MEMPOOL 0 +#if ENABLE_MEMPOOL +#define HD_CALLOC(pool, type, number) MEMPOOL_CALLOC(pool, number, type) +#define HD_FREE(pool, p) MEMPOOL_FREE(pool, p) +#else +#define HD_CALLOC(pool, type, number) CALLOC(type, number) +#define HD_FREE(pool, p) FREE(p) +#endif + +#define HTTP_IDENTIFY_LEN 16 +#define HD_RESULT_QUEUE_LEN 16 + +#define DEFAULT_STAT_OUTPUT_INTERVAL 1 +#define DEFAULT_STAT_INTERVAL_PKTS 1000 +#define DEFAULT_MEMPOOL_SIZE (32 * 1024) + +#define HTTPD_CFG_FILE "./etc/http/http_decoder.toml" +#define FILEDSTAT_OUTPUT_FILE "./metrics/http_decoder_fs4.json" + +#define HTTP_CTX_NOT_HTTP "__NOT_HTTP_SESS__" +#define HTTP_CTX_IS_HTTP "__FAKE_HTTP_CTX__" + +struct http_decoder_config +{ + int decompress_switch; + int stat_interval_pkts; // call fieldstat_incrby every stat_interval_pkts + int stat_output_interval; + int proxy_enable; + size_t result_queue_len; // per session result queue length + size_t mempool_size; // per session mempool size +}; + +/** + * NOTE: http_message don't have the ownership of data + */ +struct http_message +{ + uint8_t flow_type; + enum http_message_type type; + size_t queue_index; + struct http_decoder_result_queue *ref_queue; + hstring raw_payload;//cause tcp reorder, maybe receive many tcp segments for one packet + hstring decompress_payload; + hstring tunnel_payload; +}; + +struct http_decoder +{ + struct http_decoder_half *c2s_half; + struct http_decoder_half *s2c_half; +}; + +enum httpd_topic_index{ + HTTPD_TOPIC_TCP_STREAM_INDEX = 0, + HTTPD_TOPIC_HTTP_MSG_INDEX, + HTTPD_TOPIC_HTTP_TUNNEL_INDEX, + HTTPD_TOPIC_INDEX_MAX, +}; + +struct http_decoder_exdata +{ + int sub_topic_id; //tcp_stream + int pub_topic_id; //http message or http tunnel msg + struct http_decoder_result_queue *queue; + struct http_decoder *decoder; + nmx_pool_t *mempool; + enum http_tunnel_state tunnel_state; + int in_tunnel_is_http; +}; + +// struct http_decoder_context{ +// int array_size; +// struct http_decoder_exdata **exdata_array; //raw tcp stream for http msg; http tunnel for inner http transaction. +// }; + +struct http_topic_exdata_compose{ + enum httpd_topic_index index; + const char *topic_name; + on_session_msg_cb_func *on_msg_cb; + stellar_msg_free_cb_func *msg_free_cb; + const char *exdata_name; + stellar_exdata_free *exdata_free_cb; + int sub_topic_id; //as consumer + int exdata_id; +}; + +struct http_decoder_env +{ + struct stellar *st; + int plugin_id; + struct http_topic_exdata_compose topic_exdata_compose[HTTPD_TOPIC_INDEX_MAX]; + struct http_decoder_config hd_cfg; + struct http_decoder_stat hd_stat; +}; + +struct http_message; + +struct http_message *http_message_new(enum http_message_type type, struct http_decoder_result_queue *queue, + int queue_index, unsigned char flow_type); +struct http_message *http_body_message_new(enum http_message_type type, struct http_decoder_result_queue *queue, + int queue_index, uint8_t flow_type, hstring *raw_payload); +int http_topic_exdata_compose_get_index(const struct http_decoder_env *httpd_env, int by_topic_id); +#ifdef __cplusplus +} +#endif + +#endif
\ No newline at end of file diff --git a/decoders/http/http_decoder_result_queue.cpp b/decoders/http/http_decoder_result_queue.cpp new file mode 100644 index 0000000..6a5e003 --- /dev/null +++ b/decoders/http/http_decoder_result_queue.cpp @@ -0,0 +1,148 @@ +/* +********************************************************************************************** +* File: http_decoder_result_queue.c +* Description: +* Authors: Liuwentan <[email protected]> +* Date: 2024-01-15 +* Copyright: (c) Since 2022 Geedge Networks, Ltd. All rights reserved. +*********************************************************************************************** +*/ +#include <assert.h> +#include "http_decoder_inc.h" + +struct http_decoder_result_queue * +http_decoder_result_queue_new(nmx_pool_t *mempool, size_t queue_size) +{ + struct http_decoder_result_queue *queue = + MEMPOOL_CALLOC(mempool, struct http_decoder_result_queue, 1); + assert(queue); + + queue->req_index = 0; + queue->res_index = 0; + queue->queue_size = queue_size; + queue->array = MEMPOOL_CALLOC(mempool, struct http_decoder_result, + queue->queue_size); + assert(queue->array); + return queue; +} + +void http_decoder_result_queue_free(nmx_pool_t *mempool, struct http_decoder_result_queue *queue) +{ + if (NULL == queue) { + return; + } + + if (queue->array != NULL) { + for (size_t i = 0; i < queue->queue_size; i++) { + if (queue->array[i].req_data != NULL) { + http_decoder_half_data_free(mempool, queue->array[i].req_data); + queue->array[i].req_data = NULL; + } + + if (queue->array[i].res_data != NULL) { + http_decoder_half_data_free(mempool, queue->array[i].res_data); + queue->array[i].res_data = NULL; + } + } + MEMPOOL_FREE(mempool, queue->array); + } + MEMPOOL_FREE(mempool, queue); +} + +void http_decoder_result_queue_inc_req_index(struct http_decoder_result_queue *queue) +{ + assert(queue); + queue->req_index++; + queue->req_index = queue->req_index % queue->queue_size; +} + +void http_decoder_result_queue_inc_res_index(struct http_decoder_result_queue *queue) +{ + assert(queue); + queue->res_index++; + queue->res_index = queue->res_index % queue->queue_size; +} + +size_t http_decoder_result_queue_req_index(struct http_decoder_result_queue *queue) +{ + assert(queue); + return queue->req_index; +} + +size_t http_decoder_result_queue_res_index(struct http_decoder_result_queue *queue) +{ + assert(queue); + return queue->res_index; +} + +int http_decoder_result_queue_push_req(struct http_decoder_result_queue *queue, + struct http_decoder_half_data *req_data) +{ + if (NULL == queue || NULL == req_data) { + return -1; + } + assert(queue->array[queue->req_index].req_data == NULL); + if (queue->array[queue->req_index].req_data != NULL) { + return -1; + } + queue->array[queue->req_index].req_data = req_data; + return 0; +} + +int http_decoder_result_queue_push_res(struct http_decoder_result_queue *queue, + struct http_decoder_half_data *res_data) +{ + if (NULL == queue || NULL == res_data) { + return -1; + } + assert(queue->array[queue->res_index].res_data == NULL); + if (queue->array[queue->res_index].res_data != NULL) { + return -1; + } + + queue->array[queue->res_index].res_data = res_data; + return 0; +} + +struct http_decoder_half_data * +http_decoder_result_queue_pop_req(struct http_decoder_result_queue *queue) +{ + if (NULL == queue) { + return NULL; + } + struct http_decoder_half_data *req_data = queue->array[queue->req_index].req_data; + queue->array[queue->req_index].req_data = NULL; + return req_data; +} + +struct http_decoder_half_data * +http_decoder_result_queue_pop_res(struct http_decoder_result_queue *queue) +{ + if (NULL == queue) { + return NULL; + } + + struct http_decoder_half_data *res_data = queue->array[queue->res_index].res_data; + queue->array[queue->res_index].res_data = NULL; + return res_data; +} + +struct http_decoder_half_data * +http_decoder_result_queue_peek_req(struct http_decoder_result_queue *queue) +{ + if (NULL == queue) { + return NULL; + } + assert(queue->req_index < queue->queue_size); + return queue->array[queue->req_index].req_data; +} + +struct http_decoder_half_data * +http_decoder_result_queue_peek_res(struct http_decoder_result_queue *queue) +{ + if (NULL == queue) { + return NULL; + } + assert(queue->res_index < queue->queue_size); + return queue->array[queue->res_index].res_data; +}
\ No newline at end of file diff --git a/decoders/http/http_decoder_result_queue.h b/decoders/http/http_decoder_result_queue.h new file mode 100644 index 0000000..4d024eb --- /dev/null +++ b/decoders/http/http_decoder_result_queue.h @@ -0,0 +1,61 @@ +/* +********************************************************************************************** +* File: http_decoder_result_queue.h +* Description: +* Authors: Liuwentan <[email protected]> +* Date: 2024-01-15 +* Copyright: (c) Since 2022 Geedge Networks, Ltd. All rights reserved. +*********************************************************************************************** +*/ +#ifndef _HTTP_DECODER_RESULT_QUEUE_H_ +#define _HTTP_DECODER_RESULT_QUEUE_H_ + +#include <stddef.h> +#include "mempool/nmx_palloc.h" +#include "http_decoder_half.h" + +struct http_decoder_result { + struct http_decoder_half_data *req_data; + struct http_decoder_half_data *res_data; +}; + +struct http_decoder_result_queue { + size_t req_index; + size_t res_index; + size_t queue_size; + struct http_decoder_result *array; +}; + +struct http_decoder_result_queue * +http_decoder_result_queue_new(nmx_pool_t *mempool, size_t queue_size); + +void http_decoder_result_queue_free(nmx_pool_t *mempool, + struct http_decoder_result_queue *queue); + +void http_decoder_result_queue_inc_req_index(struct http_decoder_result_queue *queue); + +void http_decoder_result_queue_inc_res_index(struct http_decoder_result_queue *queue); + +size_t http_decoder_result_queue_req_index(struct http_decoder_result_queue *queue); + +size_t http_decoder_result_queue_res_index(struct http_decoder_result_queue *queue); + +struct http_decoder_half_data * +http_decoder_result_queue_pop_req(struct http_decoder_result_queue *queue); + +struct http_decoder_half_data * +http_decoder_result_queue_pop_res(struct http_decoder_result_queue *queue); + +int http_decoder_result_queue_push_req(struct http_decoder_result_queue *queue, + struct http_decoder_half_data *req_data); + +int http_decoder_result_queue_push_res(struct http_decoder_result_queue *queue, + struct http_decoder_half_data *res_data); + +struct http_decoder_half_data * +http_decoder_result_queue_peek_req(struct http_decoder_result_queue *queue); + +struct http_decoder_half_data * +http_decoder_result_queue_peek_res(struct http_decoder_result_queue *queue); + +#endif
\ No newline at end of file diff --git a/decoders/http/http_decoder_stat.cpp b/decoders/http/http_decoder_stat.cpp new file mode 100644 index 0000000..f616658 --- /dev/null +++ b/decoders/http/http_decoder_stat.cpp @@ -0,0 +1,121 @@ +#include <assert.h> +#include <stdio.h> +#include <pthread.h> +#include <unistd.h> +#include "http_decoder_inc.h" + +static const struct hd_stat_config_tuple g_httpd_stat_tuple[HTTPD_STAT_MAX] = +{ + {HTTPD_STAT_BYTES_C2S, "bytes_c2s"}, + {HTTPD_STAT_BYTES_S2C, "bytes_s2c"}, + {HTTPD_STAT_TCP_SEG_C2S, "tcp_seg_c2s"}, + {HTTPD_STAT_TCP_SEG_S2C, "tcp_seg_s2c"}, + {HTTPD_STAT_HEADERS_C2S, "headers_c2s"}, + {HTTPD_STAT_HEADERS_S2C, "headers_s2c"}, + {HTTPD_STAT_ZIP_BYTES, "zip_bytes"}, + {HTTPD_STAT_UNZIP_BYTES, "unzip_bytes"}, + {HTTPD_STAT_URL_BYTES, "url_bytes"}, + {HTTPD_STAT_SESSION_NEW, "session_new"}, + {HTTPD_STAT_SESSION_FREE, "session_free"}, + {HTTPD_STAT_SESSION_EXCEPTION, "sess_exception"}, + {HTTPD_STAT_TUNNEL, "tunnel"}, + {HTTPD_STAT_TRANSACTION_NEW, "trans_new"}, + {HTTPD_STAT_TRANSACTION_FREE, "trans_free"}, + {HTTPD_STAT_ASYMMETRY_SESSION_C2S, "asymmetry_sess_c2s"}, + {HTTPD_STAT_ASYMMETRY_SESSION_S2C, "asymmetry_sess_s2c"}, + {HTTPD_STAT_ASYMMETRY_TRANSACTION_C2S, "asymmetry_trans_c2s"}, + {HTTPD_STAT_ASYMMETRY_TRANSACTION_S2C, "asymmetry_trans_s2c"}, + {HTTPD_STAT_PARSE_ERR, "parse_err"}, +}; + +void http_decoder_stat_free(struct http_decoder_stat *hd_stat) +{ + if(hd_stat->timer_pid != 0){ + pthread_cancel(hd_stat->timer_pid); + } + if(hd_stat->stats != NULL){ + free(hd_stat->stats); + } + if(hd_stat->fse != NULL){ + fieldstat_easy_free(hd_stat->fse); + } +} + +static void *httpd_stat_timer_thread(void *arg) +{ + pthread_setname_np(pthread_self(), "http_decoder_timer_thread"); + struct http_decoder_stat *hd_stat = (struct http_decoder_stat *)arg; + struct timespec res; + while(1){ + clock_gettime(CLOCK_MONOTONIC, &res); + hd_stat->current_time_ms = (res.tv_sec * 1000) + (res.tv_nsec / 1000000); + usleep(800); + } + return NULL; +} + +int http_decoder_stat_init(struct http_decoder_stat *hd_stat, int thread_max, int stat_interval_pkts, int stat_interval_time) +{ + assert(sizeof(g_httpd_stat_tuple)/sizeof(struct hd_stat_config_tuple) == HTTPD_STAT_MAX); + if(sizeof(g_httpd_stat_tuple)/sizeof(struct hd_stat_config_tuple) != HTTPD_STAT_MAX){ + fprintf(stderr, "enum http_decoder_stat_type number not match with g_httpd_stat_tuple!"); + return -1; + } + hd_stat->fse = fieldstat_easy_new(thread_max, "http_decoder_statistics", NULL, 0); + if (NULL == hd_stat->fse) + { + fprintf(stderr, "fieldstat_easy_new failed."); + return -1; + } + + for(int i = 0; i < HTTPD_STAT_MAX; i++) + { + hd_stat->field_stat_id[i] = fieldstat_easy_register_counter(hd_stat->fse, g_httpd_stat_tuple[i].name); + if (hd_stat->field_stat_id[i] < 0) + { + fprintf(stderr, "fieldstat_easy_register_counter %s failed.", g_httpd_stat_tuple[i].name); + fieldstat_easy_free(hd_stat->fse); + hd_stat->fse = NULL; + return -1; + } + } + + int ret = fieldstat_easy_enable_auto_output(hd_stat->fse, FILEDSTAT_OUTPUT_FILE, stat_interval_time); + if (ret < 0) + { + fprintf(stderr, "fieldstat_easy_enable_auto_output failed."); + fieldstat_easy_free(hd_stat->fse); + hd_stat->fse = NULL; + return -1; + } + hd_stat->stats = (struct hd_statistics *)calloc(thread_max, sizeof(struct hd_statistics)); + hd_stat->stat_interval_pkts = stat_interval_pkts; + hd_stat->stat_interval_time = stat_interval_time; + pthread_create(&hd_stat->timer_pid, NULL, httpd_stat_timer_thread, hd_stat); + pthread_detach(hd_stat->timer_pid); + return 0; +} + +void http_decoder_stat_update(struct http_decoder_stat *hd_stat, int thread_id, enum http_decoder_stat_type type, long long value) +{ + assert(hd_stat); + assert(thread_id >= 0); + assert(type < HTTPD_STAT_MAX); + + if(unlikely(hd_stat->stats == NULL)){ + return; + } + + struct hd_statistics *cur_hds = &hd_stat->stats[thread_id]; + + cur_hds->counter[type] += value; + cur_hds->batch[type]++; + + if(cur_hds->batch[type] >= hd_stat->stat_interval_pkts + || cur_hds->time_ms[type] + 1000 < hd_stat->current_time_ms){ + fieldstat_easy_counter_incrby(hd_stat->fse, thread_id, hd_stat->field_stat_id[type], NULL, 0, cur_hds->counter[type]); + cur_hds->counter[type] = 0; + cur_hds->batch[type] = 0; + cur_hds->time_ms[type] = hd_stat->current_time_ms; + } +}
\ No newline at end of file diff --git a/decoders/http/http_decoder_stat.h b/decoders/http/http_decoder_stat.h new file mode 100644 index 0000000..719e6a0 --- /dev/null +++ b/decoders/http/http_decoder_stat.h @@ -0,0 +1,57 @@ +#ifndef _HTTP_DECODER_STAT_H_ +#define _HTTP_DECODER_STAT_H_ 1 + +#include <fieldstat/fieldstat_easy.h> +enum http_decoder_stat_type +{ + HTTPD_STAT_BYTES_C2S = 0, + HTTPD_STAT_BYTES_S2C, + HTTPD_STAT_TCP_SEG_C2S, + HTTPD_STAT_TCP_SEG_S2C, + HTTPD_STAT_HEADERS_C2S, + HTTPD_STAT_HEADERS_S2C, + HTTPD_STAT_ZIP_BYTES, //only if Content-Encoding is gzip, deflate, br + HTTPD_STAT_UNZIP_BYTES, //only if Content-Encoding is gzip, deflate, br + HTTPD_STAT_URL_BYTES, + HTTPD_STAT_SESSION_NEW, + HTTPD_STAT_SESSION_FREE, + HTTPD_STAT_SESSION_EXCEPTION, // rst, kickout, lost packet, etc. + HTTPD_STAT_TUNNEL, + HTTPD_STAT_TRANSACTION_NEW, + HTTPD_STAT_TRANSACTION_FREE, + HTTPD_STAT_ASYMMETRY_SESSION_C2S, + HTTPD_STAT_ASYMMETRY_SESSION_S2C, + HTTPD_STAT_ASYMMETRY_TRANSACTION_C2S, + HTTPD_STAT_ASYMMETRY_TRANSACTION_S2C, + HTTPD_STAT_PARSE_ERR, + HTTPD_STAT_MAX, +}; + +struct hd_stat_config_tuple +{ + enum http_decoder_stat_type type; + const char *name; +}; + +struct hd_statistics +{ + long long time_ms[HTTPD_STAT_MAX]; + long long counter[HTTPD_STAT_MAX]; + int batch[HTTPD_STAT_MAX]; //call fieldstat_easy_counter_incrby() per batch +}__attribute__ ((aligned (64))); + +struct http_decoder_stat +{ + pthread_t timer_pid; + long long current_time_ms; + struct fieldstat_easy *fse; + int stat_interval_pkts; // call fieldstat_incrby every stat_interval_pkts + int stat_interval_time; //second + int field_stat_id[HTTPD_STAT_MAX]; + struct hd_statistics *stats; //size is thread number +}; + +int http_decoder_stat_init(struct http_decoder_stat *hd_stat, int thread_max, int stat_interval_pkts, int stat_interval_time); +void http_decoder_stat_free(struct http_decoder_stat *hd_stat); +void http_decoder_stat_update(struct http_decoder_stat *hd_stat, int thread_id, enum http_decoder_stat_type type, long long value); +#endif
\ No newline at end of file diff --git a/decoders/http/http_decoder_string.cpp b/decoders/http/http_decoder_string.cpp new file mode 100644 index 0000000..d54d22b --- /dev/null +++ b/decoders/http/http_decoder_string.cpp @@ -0,0 +1,260 @@ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include "http_decoder_inc.h" + +static const char *string_state_to_desc(enum string_state state) +{ + switch (state) { + case STRING_STATE_INIT: + return "init"; + break; + case STRING_STATE_REFER: + return "refer"; + break; + case STRING_STATE_CACHE: + return "cache"; + break; + case STRING_STATE_COMMIT: + return "commit"; + break; + default: + return "unknown"; + break; + } +} + +void http_decoder_string_refer(struct http_decoder_string *rstr, + const char *at, size_t length) +{ + if (NULL == rstr) { + return; + } + + switch (rstr->state) { + case STRING_STATE_INIT: + case STRING_STATE_CACHE: + rstr->refer.iov_base = (char *)at; + rstr->refer.iov_len = length; + break; + default: + abort(); + break; + } + + rstr->state = STRING_STATE_REFER; +} + +static void string_refer2cache(struct http_decoder_string *rstr) +{ + if (0 == rstr->refer.iov_len) { + return; + } + if (rstr->cache.iov_len >= rstr->max_cache_size) { + return; + } + + size_t length = rstr->cache.iov_len + rstr->refer.iov_len; + if (length > rstr->max_cache_size) { + length = rstr->max_cache_size; + } + + if (NULL == rstr->cache.iov_base) { + rstr->cache.iov_base = CALLOC(char, length + 1); + memcpy(rstr->cache.iov_base, rstr->refer.iov_base, length); + } else { + rstr->cache.iov_base = REALLOC(char, rstr->cache.iov_base, length + 1); + memcpy((char *)rstr->cache.iov_base + rstr->cache.iov_len, rstr->refer.iov_base, + (length - rstr->cache.iov_len)); + } + + rstr->cache.iov_len = length; + rstr->refer.iov_base = NULL; + rstr->refer.iov_len = 0; +} + +static void string_commit2cache(struct http_decoder_string *rstr) +{ + if (rstr->cache.iov_len == rstr->commit.iov_len && + rstr->cache.iov_base == rstr->commit.iov_base) { + rstr->commit.iov_base = NULL; + rstr->commit.iov_len = 0; + return; + } + + //Only http header key need to backward to cache + size_t length = 0; + if (rstr->commit.iov_len > rstr->max_cache_size) { + length = rstr->max_cache_size; + } else { + length = rstr->commit.iov_len; + } + + if (length > 0) { + if (NULL == rstr->cache.iov_base) { + rstr->cache.iov_base = CALLOC(char, length + 1); + } else { + abort(); + } + memcpy(rstr->cache.iov_base, rstr->commit.iov_base, length); + rstr->cache.iov_len = length; + + rstr->commit.iov_base = NULL; + rstr->commit.iov_len = 0; + } +} + +void http_decoder_string_cache(struct http_decoder_string *rstr) +{ + if (NULL == rstr) { + return; + } + + switch (rstr->state) { + case STRING_STATE_REFER: + string_refer2cache(rstr); + break; + case STRING_STATE_CACHE: + break; + case STRING_STATE_COMMIT: + //commit backward to cache + string_commit2cache(rstr); + break; + default: + abort(); + break; + } + rstr->state = STRING_STATE_CACHE; +} + +void http_decoder_string_commit(struct http_decoder_string *rstr) +{ + if (NULL == rstr) { + return; + } + + switch (rstr->state) { + case STRING_STATE_REFER: + if (rstr->cache.iov_len) { + http_decoder_string_cache(rstr); + + rstr->commit.iov_base = rstr->cache.iov_base; + rstr->commit.iov_len = rstr->cache.iov_len; + // not overwrite rstr->cache.iov_base + } else { + rstr->commit.iov_base = rstr->refer.iov_base; + rstr->commit.iov_len = rstr->refer.iov_len; + + rstr->refer.iov_base = NULL; + rstr->refer.iov_len = 0; + } + break; + case STRING_STATE_CACHE: + rstr->commit.iov_base = rstr->cache.iov_base; + rstr->commit.iov_len = rstr->cache.iov_len; + // not overwrite rstr->cache.iov_base + break; + default: + //abort(); + break; + } + + rstr->state = STRING_STATE_COMMIT; +} + +void http_decoder_string_reset(struct http_decoder_string *rstr) +{ + assert(rstr); + + switch (rstr->state) { + case STRING_STATE_INIT: + case STRING_STATE_REFER: + case STRING_STATE_CACHE: + case STRING_STATE_COMMIT: + FREE(rstr->cache.iov_base); + memset(rstr, 0, sizeof(struct http_decoder_string)); + break; + default: + abort(); + break; + } + + rstr->state = STRING_STATE_INIT; +} + + +void http_decoder_string_init(struct http_decoder_string *rstr, + size_t max_cache_size) +{ + rstr->max_cache_size = max_cache_size; +} + +void http_decoder_string_reinit(struct http_decoder_string *rstr) +{ + if (rstr->state == STRING_STATE_CACHE) { + return; + } + + if (rstr->state == STRING_STATE_COMMIT && + rstr->cache.iov_base == rstr->commit.iov_base && + rstr->cache.iov_len == rstr->commit.iov_len) { + return; + } + + if (rstr->cache.iov_base != NULL) { + FREE(rstr->cache.iov_base); + rstr->cache.iov_len = 0; + } + +#if 0 + rstr->refer.iov_base = NULL; + rstr->refer.iov_len = 0; + rstr->commit.iov_base = NULL; + rstr->commit.iov_len = 0; + rstr->state = STRING_STATE_INIT; +#endif +} + +enum string_state http_decoder_string_state(const struct http_decoder_string *rstr) +{ + return rstr->state; +} + +int http_decoder_string_get(const struct http_decoder_string *rstr, hstring *out) +{ + if (NULL == rstr || NULL == out) { + return -1; + } + + if (http_decoder_string_state(rstr) == STRING_STATE_COMMIT) { + out->iov_base = rstr->commit.iov_base; + out->iov_len = rstr->commit.iov_len; + } else { + out->iov_base = NULL; + out->iov_len = 0; + } + + return 0; +} + +void http_decoder_string_dump(struct http_decoder_string *rstr, const char *desc) +{ + if (NULL == rstr) { + return; + } + + char *refer_str = safe_dup((char *)rstr->refer.iov_base, rstr->refer.iov_len); + char *cache_str = safe_dup((char *)rstr->cache.iov_base, rstr->cache.iov_len); + char *commit_str = safe_dup((char *)rstr->commit.iov_base, rstr->commit.iov_len); + + printf("%s: state: %s, refer: {len: %02zu, iov_base: %s}, cache: {len: %02zu, iov_base: %s}, commit: {len: %02zu, iov_base: %s}\n", + desc, string_state_to_desc(rstr->state), + rstr->refer.iov_len, refer_str, + rstr->cache.iov_len, cache_str, + rstr->commit.iov_len, commit_str); + + FREE(refer_str); + FREE(cache_str); + FREE(commit_str); +}
\ No newline at end of file diff --git a/decoders/http/http_decoder_string.h b/decoders/http/http_decoder_string.h new file mode 100644 index 0000000..3b25732 --- /dev/null +++ b/decoders/http/http_decoder_string.h @@ -0,0 +1,74 @@ +#ifndef _HTTP_DECODER_STRING_H_ +#define _HTTP_DECODER_STRING_H_ + +#include "stellar/http.h" + +enum string_state { + STRING_STATE_INIT, + STRING_STATE_REFER, + STRING_STATE_CACHE, + STRING_STATE_COMMIT, +}; + +/* state transition diagram + * +----------+ + * | | + * \|/ | + * +------+ | + * | init | | + * +------+ | + * | | + * +---->| | + * | \|/ | + * | +-------+ | + * | | refer |--+ | + * | +-------+ | | + * | | | | + * | \|/ | | + * | +-------+ | | + * +--| cache | | | + * +-------+ | | + * | | | + * |<------+ | + * \|/ | + * +--------+ | + * | commit | | + * +--------+ | + * | | + * \|/ | + * +--------+ | + * | reset |----+ + * +--------+ + */ + + +//http decoder string +struct http_decoder_string { + hstring refer; // shallow copy + hstring cache; // deep copy + hstring commit; + + enum string_state state; + size_t max_cache_size; +}; + +void http_decoder_string_refer(struct http_decoder_string *rstr, + const char *at, size_t length); + +void http_decoder_string_cache(struct http_decoder_string *rstr); + +void http_decoder_string_commit(struct http_decoder_string *rstr); + +void http_decoder_string_reset(struct http_decoder_string *rstr); + +void http_decoder_string_init(struct http_decoder_string *rstr, + size_t max_cache_size); + +void http_decoder_string_reinit(struct http_decoder_string *rstr); + +enum string_state http_decoder_string_state(const struct http_decoder_string *rstr); + +int http_decoder_string_get(const struct http_decoder_string *rstr, hstring *out); + +void http_decoder_string_dump(struct http_decoder_string *rstr, const char *desc); +#endif
\ No newline at end of file diff --git a/decoders/http/http_decoder_table.cpp b/decoders/http/http_decoder_table.cpp new file mode 100644 index 0000000..b58154f --- /dev/null +++ b/decoders/http/http_decoder_table.cpp @@ -0,0 +1,534 @@ +#include <assert.h> +#include <stdlib.h> +#include <string.h> +#include "http_decoder_inc.h" + +#define INIT_HEADER_CNT 16 +#define MAX_URI_CACHE_SIZE 2048 +#define MAX_STATUS_CACHE_SIZE 32 +#define MAX_METHOD_CACHE_SIZE 8 +#define MAX_VERSION_CACHE_SIZE 4 +#define MAX_HEADER_KEY_CACHE_SIZE 4096 +#define MAX_HEADER_VALUE_CACHE_SIZE 4096 + +struct http_decoder_header { + struct http_decoder_string key; + struct http_decoder_string val; +}; + +struct http_decoder_table { + struct http_decoder_string uri; + struct http_decoder_string status; + struct http_decoder_string method; + struct http_decoder_string version; + struct http_decoder_string body; + + nmx_pool_t *ref_mempool; + int header_complete; // flag for all headers parsed completely + size_t header_cnt; + size_t header_index; //current parsing header + size_t header_iter; //plugins iterate cursor + size_t commit_header_index; //pushed to plugins, whether has called http_message_header_next() + struct http_decoder_header *headers; +}; + +static void http_decoder_table_init(struct http_decoder_table *table) +{ + if (NULL == table) { + return; + } + + struct http_decoder_header *header = NULL; + assert(table); + + http_decoder_string_init(&table->uri, MAX_URI_CACHE_SIZE); + http_decoder_string_init(&table->status, MAX_STATUS_CACHE_SIZE); + http_decoder_string_init(&table->method, MAX_METHOD_CACHE_SIZE); + http_decoder_string_init(&table->version, MAX_METHOD_CACHE_SIZE); + + for (size_t i = 0; i < table->header_cnt; i++) { + header = &table->headers[i]; + http_decoder_string_init(&header->key, MAX_HEADER_KEY_CACHE_SIZE); + http_decoder_string_init(&header->val, MAX_HEADER_VALUE_CACHE_SIZE); + } + + http_decoder_string_init(&table->body, 0); +} + +struct http_decoder_table *http_decoder_table_new(nmx_pool_t *mempool) +{ + struct http_decoder_table *table = + MEMPOOL_CALLOC(mempool, struct http_decoder_table, 1); + assert(table); + + table->ref_mempool = mempool; + table->header_cnt = INIT_HEADER_CNT; + table->headers = MEMPOOL_CALLOC(mempool, struct http_decoder_header, + table->header_cnt); + table->commit_header_index = 0; + http_decoder_table_init(table); + + return table; +} + +void http_decoder_table_free(struct http_decoder_table *table) +{ + if (NULL == table) { + return; + } + if (table->uri.cache.iov_base != NULL) { + FREE(table->uri.cache.iov_base); + } + if (table->status.cache.iov_base != NULL) { + FREE(table->status.cache.iov_base); + } + if (table->method.cache.iov_base != NULL) { + FREE(table->method.cache.iov_base); + } + if (table->version.cache.iov_base != NULL) { + FREE(table->version.cache.iov_base); + } + if (table->body.cache.iov_base != NULL) { + FREE(table->body.cache.iov_base); + } + + if (table->headers != NULL) { + for (size_t i = 0; i < table->header_cnt; i++) { + if (table->headers[i].key.cache.iov_base != NULL) { + FREE(table->headers[i].key.cache.iov_base); + } + + if (table->headers[i].val.cache.iov_base != NULL) { + FREE(table->headers[i].val.cache.iov_base); + } + } + + MEMPOOL_FREE(table->ref_mempool, table->headers); + table->headers = NULL; + } + MEMPOOL_FREE(table->ref_mempool, table); +} + +enum string_state +http_decoder_table_state(struct http_decoder_table *table, enum http_item type) +{ + if (NULL == table) { + return STRING_STATE_INIT; + } + struct http_decoder_header *header = NULL; + enum string_state state = STRING_STATE_INIT; + assert(table); + + switch (type) { + case HTTP_ITEM_URI: + state = http_decoder_string_state(&table->uri); + break; + case HTTP_ITEM_STATUS: + state = http_decoder_string_state(&table->status); + break; + case HTTP_ITEM_METHOD: + state = http_decoder_string_state(&table->method); + break; + case HTTP_ITEM_VERSION: + state = http_decoder_string_state(&table->version); + break; + case HTTP_ITEM_HDRKEY: + assert(table->header_index < table->header_cnt); + header = &table->headers[table->header_index]; + state = http_decoder_string_state(&header->key); + break; + case HTTP_ITEM_HDRVAL: + assert(table->header_index < table->header_cnt); + header = &table->headers[table->header_index]; + state = http_decoder_string_state(&header->val); + break; + case HTTP_ITEM_BODY: + state = http_decoder_string_state(&table->body); + break; + default: + abort(); + break; + } + + return state; +} + +void http_decoder_table_refer(struct http_decoder_table *table, enum http_item type, + const char *at, size_t len) +{ + if (NULL == table) { + return; + } + + struct http_decoder_header *header = NULL; + assert(table); + + switch (type) { + case HTTP_ITEM_URI: + http_decoder_string_refer(&table->uri, at, len); + break; + case HTTP_ITEM_STATUS: + http_decoder_string_refer(&table->status, at, len); + break; + case HTTP_ITEM_METHOD: + http_decoder_string_refer(&table->method, at, len); + break; + case HTTP_ITEM_VERSION: + http_decoder_string_refer(&table->version, at, len); + break; + case HTTP_ITEM_HDRKEY: + assert(table->header_index < table->header_cnt); + header = &table->headers[table->header_index]; + http_decoder_string_refer(&header->key, at, len); + break; + case HTTP_ITEM_HDRVAL: + assert(table->header_index < table->header_cnt); + header = &table->headers[table->header_index]; + http_decoder_string_refer(&header->val, at, len); + break; + case HTTP_ITEM_BODY: + http_decoder_string_refer(&table->body, at, len); + break; + default: + abort(); + break; + } +} + +void http_decoder_table_cache(struct http_decoder_table *table, enum http_item type) +{ + if (NULL == table) { + return; + } + + struct http_decoder_header *header = NULL; + assert(table); + + switch (type) { + case HTTP_ITEM_URI: + http_decoder_string_cache(&table->uri); + break; + case HTTP_ITEM_STATUS: + http_decoder_string_cache(&table->status); + break; + case HTTP_ITEM_METHOD: + http_decoder_string_cache(&table->method); + break; + case HTTP_ITEM_VERSION: + http_decoder_string_cache(&table->version); + break; + case HTTP_ITEM_HDRKEY: + assert(table->header_index < table->header_cnt); + header = &table->headers[table->header_index]; + http_decoder_string_cache(&header->key); + break; + case HTTP_ITEM_HDRVAL: + assert(table->header_index < table->header_cnt); + header = &table->headers[table->header_index]; + http_decoder_string_cache(&header->val); + break; + case HTTP_ITEM_BODY: + http_decoder_string_cache(&table->body); + break; + default: + abort(); + break; + } +} + +void http_decoder_table_commit(struct http_decoder_table *table, enum http_item type) +{ + if (NULL == table) { + return; + } + + size_t i = 0; + struct http_decoder_header *header = NULL; + assert(table); + + switch (type) { + case HTTP_ITEM_URI: + http_decoder_string_commit(&table->uri); + break; + case HTTP_ITEM_STATUS: + http_decoder_string_commit(&table->status); + break; + case HTTP_ITEM_METHOD: + http_decoder_string_commit(&table->method); + break; + case HTTP_ITEM_VERSION: + http_decoder_string_commit(&table->version); + break; + case HTTP_ITEM_HDRKEY: + assert(table->header_index < table->header_cnt); + header = &table->headers[table->header_index]; + http_decoder_string_commit(&header->key); + break; + case HTTP_ITEM_HDRVAL: + header = &table->headers[table->header_index]; + http_decoder_string_commit(&header->val); + // inc index + if ((table->header_index + 1) >= table->header_cnt) { + struct http_decoder_header *old_headers = table->headers; + table->headers = + MEMPOOL_CALLOC(table->ref_mempool, struct http_decoder_header, + table->header_cnt * 2); + table->header_cnt *= 2; + + for (i = 0; i <= table->header_index; i++) { + table->headers[i] = old_headers[i]; + } + + MEMPOOL_FREE(table->ref_mempool, old_headers); + + for (i = table->header_index + 1; i < table->header_cnt; i++) { + header = &table->headers[i]; + memset(header, 0, sizeof(struct http_decoder_header)); + http_decoder_string_init(&header->key, MAX_HEADER_KEY_CACHE_SIZE); + http_decoder_string_init(&header->val, MAX_HEADER_VALUE_CACHE_SIZE); + } + } + table->header_index++; + break; + case HTTP_ITEM_BODY: + http_decoder_string_commit(&table->body); + break; + default: + abort(); + break; + } +} + +void http_decoder_table_reset(struct http_decoder_table *table, enum http_item type) +{ + if (NULL == table) { + return; + } + + struct http_decoder_header *header = NULL; + assert(table); + + switch (type) { + case HTTP_ITEM_URI: + http_decoder_string_reset(&table->uri); + break; + case HTTP_ITEM_STATUS: + http_decoder_string_reset(&table->status); + break; + case HTTP_ITEM_METHOD: + http_decoder_string_reset(&table->method); + break; + case HTTP_ITEM_VERSION: + http_decoder_string_reset(&table->version); + break; + case HTTP_ITEM_HDRKEY: + header = &table->headers[table->header_index]; + http_decoder_string_reset(&header->key); + break; + case HTTP_ITEM_HDRVAL: + header = &table->headers[table->header_index]; + http_decoder_string_reset(&header->val); + break; + case HTTP_ITEM_BODY: + http_decoder_string_reset(&table->body); + break; + default: + abort(); + break; + } +} + +void http_decoder_table_reinit(struct http_decoder_table *table) +{ + assert(table); + struct http_decoder_header *header = NULL; + + http_decoder_string_reinit(&table->uri); + http_decoder_string_reinit(&table->status); + http_decoder_string_reinit(&table->method); + http_decoder_string_reinit(&table->version); + // for (size_t i = 0; i < table->header_iter; i++) { + for (size_t i = 0; i < table->commit_header_index; i++) { + //todo, reset header_index, avoid realloc headers as much as possible + header = &table->headers[i]; + http_decoder_string_reinit(&header->key); + http_decoder_string_reinit(&header->val); + } + + http_decoder_string_reinit(&table->body); +} + +void http_decoder_table_dump(struct http_decoder_table *table) +{ + if (NULL == table) { + return; + } + + http_decoder_string_dump(&table->uri, "uri"); + http_decoder_string_dump(&table->status, "status"); + http_decoder_string_dump(&table->method, "method"); + http_decoder_string_dump(&table->version, "version"); + http_decoder_string_dump(&table->body, "body"); + + for (size_t i = 0; i < table->header_cnt; i++) { + struct http_decoder_header *header = &table->headers[i]; + if (NULL == header) { + continue; + } + + http_decoder_string_dump(&header->key, "key"); + http_decoder_string_dump(&header->val, "val"); + } +} + +int http_decoder_table_get_uri(const struct http_decoder_table *table, hstring *out) +{ + if (NULL == table || NULL == out) { + return -1; + } + return http_decoder_string_get(&table->uri, out); +} + +int http_decoder_table_get_method(const struct http_decoder_table *table, hstring *out) +{ + if (NULL == table || NULL == out) { + return -1; + } + return http_decoder_string_get(&table->method, out); +} + +int http_decoder_table_get_status(const struct http_decoder_table *table, hstring *out) +{ + if (NULL == table || NULL == out) { + return -1; + } + return http_decoder_string_get(&table->status, out); +} + +int http_decoder_table_get_version(const struct http_decoder_table *table, hstring *out) +{ + if (NULL == table || NULL == out) { + return -1; + } + return http_decoder_string_get(&table->version, out); +} + +int http_decoder_table_get_body(const struct http_decoder_table *table, hstring *out) +{ + if (NULL == table || NULL == out) { + return -1; + } + return http_decoder_string_get(&table->body, out); +} + +int http_decoder_table_get_header(const struct http_decoder_table *table, const hstring *key, + struct http_header *hdr_result) +{ + for (size_t i = 0; i < table->header_cnt; i++) { + const struct http_decoder_header *tmp_header = &table->headers[i]; + if (tmp_header->key.commit.iov_len != key->iov_len) { + continue; + } + + if (http_decoder_string_state(&tmp_header->key) == STRING_STATE_COMMIT && + http_decoder_string_state(&tmp_header->val) == STRING_STATE_COMMIT) { + hstring tmp_key; + http_decoder_string_get(&tmp_header->key, &tmp_key); + + if (tmp_key.iov_len == key->iov_len && + (0 == strncasecmp((char *)tmp_key.iov_base, (char *)key->iov_base, key->iov_len))) { + http_decoder_string_get(&tmp_header->key, &hdr_result->key); + http_decoder_string_get(&tmp_header->val, &hdr_result->val); + return 0; + } + } + } + return -1; +} + +int http_decoder_table_iter_header(struct http_decoder_table *table, + struct http_header *hdr) +{ + if (NULL == table || NULL == hdr) { + return -1; + } + if (table->header_iter >= table->header_cnt) { + return -1; + } + + struct http_decoder_header *tmp_header = &table->headers[table->header_iter]; + if (tmp_header != NULL) { + if (http_decoder_string_state(&tmp_header->key) == STRING_STATE_COMMIT && + http_decoder_string_state(&tmp_header->val) == STRING_STATE_COMMIT) { + + http_decoder_string_get(&tmp_header->key, &hdr->key); + http_decoder_string_get(&tmp_header->val, &hdr->val); + table->header_iter++; + return 1; + } + } + + hdr->key.iov_base = NULL; + hdr->key.iov_len = 0; + hdr->val.iov_base = NULL; + hdr->val.iov_len = 0; + + return -1; +} + +int http_decoder_table_reset_header_iter(struct http_decoder_table *table) +{ + table->header_iter = 0; + return 0; +} + +int http_decoder_table_has_parsed_header(struct http_decoder_table *table) +{ + // if (NULL == table || (table->header_iter == table->header_index)) { + if (NULL == table || (table->commit_header_index == table->header_index)) { + return 0; + } + + const struct http_decoder_header *tmp_header = &table->headers[table->header_iter]; + + if (http_decoder_string_state(&tmp_header->key) == STRING_STATE_COMMIT + && http_decoder_string_state(&tmp_header->val) == STRING_STATE_COMMIT) { + return 1; + } + + return 0; +} + +int http_decoder_table_header_complete(struct http_decoder_table *table) +{ + if (NULL == table) { + return -1; + } + return table->header_complete; +} + +void http_decoder_table_set_header_complete(struct http_decoder_table *table) +{ + if (NULL == table) { + return; + } + table->header_complete = 1; +} + +void http_decoder_table_reset_header_complete(struct http_decoder_table *table) +{ + if (NULL == table) { + return; + } + table->header_complete = 0; +} + +void http_decoder_table_update_commit_index(struct http_decoder_table *table) +{ + table->commit_header_index = table->header_index; +} + +int http_decoder_table_get_total_parsed_header(struct http_decoder_table *table) +{ + return table->header_index; +}
\ No newline at end of file diff --git a/decoders/http/http_decoder_table.h b/decoders/http/http_decoder_table.h new file mode 100644 index 0000000..bbc6956 --- /dev/null +++ b/decoders/http/http_decoder_table.h @@ -0,0 +1,80 @@ +#ifndef _HTTP_DECODER_TABLE_H_ +#define _HTTP_DECODER_TABLE_H_ +#include <stddef.h> +#include "stellar/http.h" +#include "http_decoder_inc.h" +#include "http_decoder_string.h" + +enum http_item { + HTTP_ITEM_URI = 0x01, + HTTP_ITEM_STATUS = 0x02, + HTTP_ITEM_METHOD = 0x03, + HTTP_ITEM_VERSION = 0x04, + HTTP_ITEM_HDRKEY = 0x05, + HTTP_ITEM_HDRVAL = 0x06, + HTTP_ITEM_BODY = 0x07, +}; + +struct http_decoder_table; +struct http_decoder_table *http_decoder_table_new(nmx_pool_t *mempool); + +void http_decoder_table_free(struct http_decoder_table *table); + +enum string_state +http_decoder_table_state(struct http_decoder_table *table, enum http_item type); + +void http_decoder_table_refer(struct http_decoder_table *table, enum http_item type, + const char *at, size_t len); + +void http_decoder_table_cache(struct http_decoder_table *table, enum http_item type); + +void http_decoder_table_commit(struct http_decoder_table *table, enum http_item type); + +void http_decoder_table_reset(struct http_decoder_table *table, enum http_item type); + +void http_decoder_table_reinit(struct http_decoder_table *table); + +void http_decoder_table_dump(struct http_decoder_table *table); + +int http_decoder_table_get_uri(const struct http_decoder_table *table, hstring *out); + +int http_decoder_table_get_method(const struct http_decoder_table *table, hstring *out); + +int http_decoder_table_get_status(const struct http_decoder_table *table, hstring *out); + +int http_decoder_table_get_version(const struct http_decoder_table *table, hstring *out); + +int http_decoder_table_get_body(const struct http_decoder_table *table, hstring *out); + +int http_decoder_table_get_header(const struct http_decoder_table *table, + const hstring *key, + struct http_header *hdr_res); + +int http_decoder_table_iter_header(struct http_decoder_table *table, + struct http_header *hdr); +int http_decoder_table_reset_header_iter(struct http_decoder_table *table); +/** + * @brief Is there a parsed header + * + * @retval yes(1) no(0) +*/ +int http_decoder_table_has_parsed_header(struct http_decoder_table *table); + +/** + * @brief If headers have been parsed completely + * + * @retval yes(1) no(0) + */ +int http_decoder_table_header_complete(struct http_decoder_table *table); + +/** + * @brief set flag for headers parsed completely +*/ +void http_decoder_table_set_header_complete(struct http_decoder_table *table); + +void http_decoder_table_reset_header_complete(struct http_decoder_table *table); + +void http_decoder_table_update_commit_index(struct http_decoder_table *table); + +int http_decoder_table_get_total_parsed_header(struct http_decoder_table *table); +#endif
\ No newline at end of file diff --git a/decoders/http/http_decoder_tunnel.cpp b/decoders/http/http_decoder_tunnel.cpp new file mode 100644 index 0000000..42f100a --- /dev/null +++ b/decoders/http/http_decoder_tunnel.cpp @@ -0,0 +1,107 @@ +#include <assert.h> +#include <stdio.h> +#include <string.h> +#include <strings.h> +#include <unistd.h> +#include "http_decoder_inc.h" +#include "llhttp.h" + +struct http_tunnel_message +{ + enum http_tunnel_message_type type; + hstring tunnel_payload; +}; + + +int httpd_tunnel_identify(struct http_decoder_env *httpd_env, int curdir, struct http_decoder_half_data *hfdata) +{ + if(0 == httpd_env->hd_cfg.proxy_enable){ + return 0; + } + + if(FLOW_DIRECTION_C2S == curdir){ + struct http_request_line reqline = {}; + http_decoder_half_data_get_request_line(hfdata, &reqline); + if(0 == strncasecmp_safe("CONNECT", (char *)reqline.method.iov_base, + 7, reqline.method.iov_len)){ + return 1; + } + }else{ + struct http_response_line resline = {}; + http_decoder_half_data_get_response_line(hfdata, &resline); + if(resline.status_code == HTTP_STATUS_OK + && 0 == strncasecmp_safe("Connection established", (char *)resline.status.iov_base, + strlen("Connection established"), resline.status.iov_len)){ + return 1; + } + } + + return 0; +} + +int httpd_is_tunnel_session(const struct http_decoder_env *httpd_env, const struct http_decoder_exdata *ex_data) +{ + if(0 == httpd_env->hd_cfg.proxy_enable){ + return 0; + } + return (ex_data && ex_data->tunnel_state != HTTP_TUN_NON); +} + +int httpd_in_tunnel_transmitting(const struct http_decoder_env *httpd_env, struct http_decoder_exdata *ex_data) +{ + if(0 == httpd_env->hd_cfg.proxy_enable){ + return 0; + } + return (ex_data && ex_data->tunnel_state >= HTTP_TUN_INNER_STARTING); +} + +enum http_tunnel_message_type httpd_tunnel_state_to_msg(const struct http_decoder_exdata *ex_data) +{ + if(ex_data->tunnel_state == HTTP_TUN_INNER_STARTING){ + return HTTP_TUNNEL_OPENING; + } + if(ex_data->tunnel_state == HTTP_TUN_INNER_TRANS){ + return HTTP_TUNNEL_ACTIVE; + } + return HTTP_TUNNEL_MSG_MAX; +} + +void httpd_tunnel_state_update(struct http_decoder_exdata *ex_data) +{ + if(ex_data->tunnel_state == HTTP_TUN_INNER_STARTING){ + ex_data->tunnel_state = HTTP_TUN_INNER_TRANS; + } +} + +void http_decoder_push_tunnel_data(struct session *sess, const struct http_decoder_exdata *exdata, enum http_tunnel_message_type type, const char *payload, uint16_t payload_len) +{ + struct http_tunnel_message *tmsg = (struct http_tunnel_message *)CALLOC(struct http_tunnel_message, 1); + tmsg->type = type; + tmsg->tunnel_payload.iov_base = (char *)payload; + tmsg->tunnel_payload.iov_len = payload_len; + session_mq_publish_message(sess, exdata->pub_topic_id, tmsg); +} + +#ifdef __cplusplus +extern "C" +{ +#endif +void http_tunnel_message_get_payload(const struct http_tunnel_message *tmsg, + hstring *tunnel_payload) +{ + if (unlikely(NULL == tmsg || tunnel_payload == NULL)) + { + return; + } + tunnel_payload->iov_base = tmsg->tunnel_payload.iov_base; + tunnel_payload->iov_len = tmsg->tunnel_payload.iov_len; +} + +enum http_tunnel_message_type http_tunnel_message_type_get(const struct http_tunnel_message *tmsg) +{ + return tmsg->type; +} + +#ifdef __cplusplus +} +#endif diff --git a/decoders/http/http_decoder_tunnel.h b/decoders/http/http_decoder_tunnel.h new file mode 100644 index 0000000..b90e402 --- /dev/null +++ b/decoders/http/http_decoder_tunnel.h @@ -0,0 +1,35 @@ +#pragma once +#include "http_decoder_inc.h" +#include "http_decoder_half.h" + +enum http_tunnel_state{ + HTTP_TUN_NON = 0, // init, or not tunnel session + HTTP_TUN_C2S_HDR_START, //CONNECT ... + HTTP_TUN_C2S_END, //CONNECT request end + HTTP_TUN_S2C_START, // HTTP 200 connet established + HTTP_TUN_INNER_STARTING, // http inner tunnel protocol starting + HTTP_TUN_INNER_TRANS, // http inner tunnel protocol transmitting +}; + +/************************************************************ +* HTTP TUNNEL WITH CONNECT METHOD. +*************************************************************/ +struct http_tunnel_message; +#define HTTP_DECODER_TUNNEL_TOPIC "HTTP_DECODER_TUNNEL_MESSAGE" + +enum http_tunnel_message_type { + HTTP_TUNNEL_OPENING, + HTTP_TUNNEL_ACTIVE, + HTTP_TUNNEL_CLOSING, + HTTP_TUNNEL_MSG_MAX +}; +enum http_tunnel_message_type http_tunnel_message_type_get(const struct http_tunnel_message *tmsg); +void http_tunnel_message_get_payload(const struct http_tunnel_message *tmsg, struct iovec *tunnel_payload); + + +int httpd_tunnel_identify(struct http_decoder_env *httpd_env, int curdir, struct http_decoder_half_data *hfdata); +int httpd_is_tunnel_session(const struct http_decoder_env *httpd_env, const struct http_decoder_exdata *ex_data); +int httpd_in_tunnel_transmitting(const struct http_decoder_env *httpd_env, struct http_decoder_exdata *ex_data); +void httpd_tunnel_state_update(struct http_decoder_exdata *ex_data); +void http_decoder_push_tunnel_data(struct session *sess, const struct http_decoder_exdata *exdata, enum http_tunnel_message_type type, const char *payload, uint16_t payload_len); +enum http_tunnel_message_type httpd_tunnel_state_to_msg(const struct http_decoder_exdata *ex_data);
\ No newline at end of file diff --git a/decoders/http/http_decoder_utils.cpp b/decoders/http/http_decoder_utils.cpp new file mode 100644 index 0000000..93fef81 --- /dev/null +++ b/decoders/http/http_decoder_utils.cpp @@ -0,0 +1,309 @@ +#include <string.h> +#include <assert.h> +#include "stellar/http.h" +#include "http_decoder_inc.h" + +char *safe_dup(const char *str, size_t len) +{ + if (str == NULL || len == 0) + { + return NULL; + } + char *dup = CALLOC(char, len + 1); + memcpy(dup, str, len); + return dup; +} + +int strncasecmp_safe(const char *fix_s1, const char *dyn_s2, size_t fix_n1, size_t dyn_n2) +{ + if (fix_s1 == NULL || dyn_s2 == NULL) + { + return -1; + } + if (fix_n1 != dyn_n2) + { + return -1; + } + return strncasecmp(fix_s1, dyn_s2, fix_n1); +} + +const char *http_message_type_to_string(enum http_message_type type) +{ + const char *sname = "unknown_msg_type"; + + switch (type) + { + case HTTP_MESSAGE_REQ_LINE: + sname = "HTTP_MESSAGE_REQ_LINE"; + break; + case HTTP_MESSAGE_REQ_HEADER: + sname = "HTTP_MESSAGE_REQ_HEADER"; + break; + case HTTP_MESSAGE_REQ_HEADER_END: + sname = "HTTP_MESSAGE_REQ_HEADER_END"; + break; + case HTTP_MESSAGE_REQ_BODY_START: + sname = "HTTP_MESSAGE_REQ_BODY_START"; + break; + case HTTP_MESSAGE_REQ_BODY: + sname = "HTTP_MESSAGE_REQ_BODY"; + break; + case HTTP_MESSAGE_REQ_BODY_END: + sname = "HTTP_MESSAGE_REQ_BODY_END"; + break; + case HTTP_MESSAGE_RES_LINE: + sname = "HTTP_MESSAGE_RES_LINE"; + break; + case HTTP_MESSAGE_RES_HEADER: + sname = "HTTP_MESSAGE_RES_HEADER"; + break; + case HTTP_MESSAGE_RES_HEADER_END: + sname = "HTTP_MESSAGE_RES_HEADER_END"; + break; + case HTTP_MESSAGE_RES_BODY_START: + sname = "HTTP_MESSAGE_RES_BODY_START"; + break; + case HTTP_MESSAGE_RES_BODY: + sname = "HTTP_MESSAGE_RES_BODY"; + break; + case HTTP_MESSAGE_RES_BODY_END: + sname = "HTTP_MESSAGE_RES_BODY_END"; + break; + case HTTP_TRANSACTION_START: + sname = "HTTP_TRANSACTION_START"; + break; + case HTTP_TRANSACTION_END: + sname = "HTTP_TRANSACTION_END"; + break; + + default: + break; + } + + return sname; +} + +int http_message_type_is_req(struct session *sess, enum http_message_type msg_type) +{ + int is_req_msg = 0; + + switch (msg_type) + { + case HTTP_MESSAGE_REQ_LINE: + case HTTP_MESSAGE_REQ_HEADER: + case HTTP_MESSAGE_REQ_HEADER_END: + case HTTP_MESSAGE_REQ_BODY_START: + case HTTP_MESSAGE_REQ_BODY: + case HTTP_MESSAGE_REQ_BODY_END: + is_req_msg = 1; + break; + + case HTTP_MESSAGE_RES_LINE: + case HTTP_MESSAGE_RES_HEADER: + case HTTP_MESSAGE_RES_HEADER_END: + case HTTP_MESSAGE_RES_BODY_START: + case HTTP_MESSAGE_RES_BODY: + case HTTP_MESSAGE_RES_BODY_END: + is_req_msg = 0; + break; + + case HTTP_TRANSACTION_START: + case HTTP_TRANSACTION_END: + { + enum flow_direction cur_dir = session_get_current_flow_direction(sess); + if (FLOW_DIRECTION_C2S == cur_dir) + { + is_req_msg = 1; + } + else + { + is_req_msg = 0; + } + } + break; + + default: + assert(0); + fprintf(stderr, "unknow message type:%d\n", (int)msg_type); + break; + } + return is_req_msg; +} + +int http_event_is_req(enum http_event event) +{ + switch (event) + { + case HTTP_EVENT_REQ_INIT: + case HTTP_EVENT_REQ_LINE: + case HTTP_EVENT_REQ_HDR: + case HTTP_EVENT_REQ_HDR_END: + case HTTP_EVENT_REQ_BODY_BEGIN: + case HTTP_EVENT_REQ_BODY_DATA: + case HTTP_EVENT_REQ_BODY_END: + case HTTP_EVENT_REQ_END: + return 1; + break; + + case HTTP_EVENT_RES_INIT: + case HTTP_EVENT_RES_LINE: + case HTTP_EVENT_RES_HDR: + case HTTP_EVENT_RES_HDR_END: + case HTTP_EVENT_RES_BODY_BEGIN: + case HTTP_EVENT_RES_BODY_DATA: + case HTTP_EVENT_RES_BODY_END: + case HTTP_EVENT_RES_END: + return 0; + break; + + default: + assert(0); + fprintf(stderr, "unknow event type:%d\n", (int)event); + break; + } + return -1; +} + +int stellar_session_mq_get_topic_id_reliable(struct stellar *st, const char *topic_name, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg) +{ + int topic_id = stellar_mq_get_topic_id(st, topic_name); + if (topic_id < 0) + { + topic_id = stellar_mq_create_topic(st, topic_name, msg_free_cb, msg_free_arg); + } + return topic_id; +} + +static const unsigned char __g_httpd_hextable[] = { + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 0, 0, 0, 0, 0, /* 0x30 - 0x3f */ + 0, 10, 11, 12, 13, 14, 15, 0, 0, 0, 0, 0, 0, 0, 0, 0, /* 0x40 - 0x4f */ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, /* 0x50 - 0x5f */ + 0, 10, 11, 12, 13, 14, 15 /* 0x60 - 0x66 */ +}; + +/* the input is a single hex digit */ +#define onehex2dec(x) __g_httpd_hextable[x - '0'] + +#include <ctype.h> +// https://github.com/curl/curl/blob/2e930c333658725657b94a923d175c6622e0f41d/lib/urlapi.c +void httpd_url_decode(const char *string, size_t length, char *ostring, size_t *olen) +{ + size_t alloc = length; + char *ns = ostring; + + while (alloc) + { + unsigned char in = (unsigned char)*string; + if (('%' == in) && (alloc > 2) && + isxdigit(string[1]) && isxdigit(string[2])) + { + /* this is two hexadecimal digits following a '%' */ + in = (unsigned char)(onehex2dec(string[1]) << 4) | onehex2dec(string[2]); + string += 3; + alloc -= 3; + } + else + { + string++; + alloc--; + } + *ns++ = (char)in; + } + *ns = 0; /* terminate it */ + + if (olen) + /* store output size */ + *olen = ns - ostring; + + return; +} + +int httpd_url_is_encoded(const char *url, size_t len) +{ + for (size_t i = 0; i < len; i++) + { + if (url[i] == '%') + { + return 1; + } + } + return 0; +} + +static void httpd_set_tcp_addr(const struct tcphdr *tcph, struct httpd_session_addr *addr, enum flow_direction fdir) +{ + if (FLOW_DIRECTION_C2S == fdir) + { + addr->sport = tcph->th_sport; + addr->dport = tcph->th_dport; + } + else + { + addr->sport = tcph->th_dport; + addr->dport = tcph->th_sport; + } +} +static void httpd_set_ipv4_addr(const struct ip *ip4h, struct httpd_session_addr *addr, enum flow_direction fdir) +{ + addr->ipver = 4; + if (FLOW_DIRECTION_C2S == fdir) + { + addr->saddr4 = ip4h->ip_src.s_addr; + addr->daddr4 = ip4h->ip_dst.s_addr; + } + else + { + addr->saddr4 = ip4h->ip_dst.s_addr; + addr->daddr4 = ip4h->ip_src.s_addr; + } +} +static void httpd_set_ipv6_addr(const struct ip6_hdr *ip6h, struct httpd_session_addr *addr, enum flow_direction fdir) +{ + addr->ipver = 6; + if (FLOW_DIRECTION_C2S == fdir) + { + memcpy(&addr->saddr6, &ip6h->ip6_src, sizeof(struct in6_addr)); + memcpy(&addr->daddr6, &ip6h->ip6_dst, sizeof(struct in6_addr)); + } + else + { + memcpy(&addr->saddr6, &ip6h->ip6_dst, sizeof(struct in6_addr)); + memcpy(&addr->daddr6, &ip6h->ip6_src, sizeof(struct in6_addr)); + } +} + +void httpd_session_get_addr(const struct session *sess, struct httpd_session_addr *addr) +{ + if (sess == NULL || addr == NULL) + { + return; + } + enum flow_direction fdir = session_get_current_flow_direction(sess); + const struct packet *raw_pkt = session_get_first_packet(sess, fdir); + if (NULL == raw_pkt) + { + addr->ipver = 0; + return; + } + + struct layer pkt_layer = {}; + PACKET_FOREACH_LAYER_REVERSE(raw_pkt, pkt_layer) + { + if (pkt_layer.proto == LAYER_PROTO_TCP) + { + httpd_set_tcp_addr(pkt_layer.hdr.tcp, addr, fdir); + } + else if (pkt_layer.proto == LAYER_PROTO_IPV4) + { + httpd_set_ipv4_addr(pkt_layer.hdr.ip4, addr, fdir); + break; + } + else if (pkt_layer.proto == LAYER_PROTO_IPV6) + { + httpd_set_ipv6_addr(pkt_layer.hdr.ip6, addr, fdir); + break; + } + } + + return; +}
\ No newline at end of file diff --git a/decoders/http/http_decoder_utils.h b/decoders/http/http_decoder_utils.h new file mode 100644 index 0000000..33483cf --- /dev/null +++ b/decoders/http/http_decoder_utils.h @@ -0,0 +1,80 @@ +#pragma once + +#include <stdlib.h> +#include <stdio.h> +#include <cstddef> +#ifdef __cplusplus +extern "C" +{ +#endif +#include <bits/types/struct_iovec.h> +#include "stellar/stellar.h" +#include "stellar/utils.h" +#include "stellar/session.h" +#include "stellar/stellar_mq.h" +#include "stellar/stellar_exdata.h" +#ifdef __cplusplus +} +#endif + +char *safe_dup(const char *str, size_t len); +int strncasecmp_safe(const char *fix_s1, const char *dyn_s2, size_t fix_n1, size_t dyn_n2); +const char *http_message_type_to_string(enum http_message_type type); +int http_message_type_is_req(struct session *sess, enum http_message_type msg_type); +int http_event_is_req(enum http_event event); +int stellar_session_mq_get_topic_id_reliable(struct stellar *st, const char *topic_name, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg); +void httpd_url_decode(const char *string, size_t length, char *ostring, size_t *olen); +int httpd_url_is_encoded(const char *url, size_t len); +/****************************************************************************** + * Logger + ******************************************************************************/ + +enum http_decoder_log_level { + DEBUG = 0x11, + WARN = 0x12, + INFO = 0x13, + ERROR = 0x14, +}; + +#ifndef http_decoder_log +#define http_decoder_log(level, format, ...) \ + { \ + switch (level) \ + { \ + case DEBUG: \ + fprintf(stdout, "HTTP_DECODER [DEBUG] " format "\n", ##__VA_ARGS__); \ + fflush(stdout); \ + break; \ + case WARN: \ + fprintf(stdout, "HTTP_DECODER [WARN] " format "\n", ##__VA_ARGS__); \ + fflush(stdout); \ + break; \ + case INFO: \ + fprintf(stdout, "HTTP_DECODER [INFO] " format "\n", ##__VA_ARGS__); \ + fflush(stdout); \ + break; \ + case ERROR: \ + fprintf(stderr, "HTTP_DECODER [ERROR] " format "\n", ##__VA_ARGS__); \ + fflush(stderr); \ + break; \ + } \ + } +#endif + + +#include <netinet/in.h> + +struct httpd_session_addr +{ + uint8_t ipver; /* 4 or 6 */ + uint16_t sport; /* network order */ + uint16_t dport; /* network order */ + union + { + uint32_t saddr4; + uint32_t daddr4; + struct in6_addr saddr6; + struct in6_addr daddr6; + }; +}; +void httpd_session_get_addr(const struct session *sess, struct httpd_session_addr *addr); diff --git a/decoders/http/version.map b/decoders/http/version.map new file mode 100644 index 0000000..15d1d95 --- /dev/null +++ b/decoders/http/version.map @@ -0,0 +1,11 @@ +VERS_3.0{ +global: + extern "C" { + http_message_*; + http_decoder_init; + http_decoder_exit; + http_decoder_tcp_stream_msg_cb; + http_tunnel_message_*; + }; +local: *; +};
\ No newline at end of file |
