summaryrefslogtreecommitdiff
path: root/decoders
diff options
context:
space:
mode:
authorlijia <[email protected]>2024-10-27 18:08:00 +0800
committerlijia <[email protected]>2024-11-08 11:23:16 +0800
commit627cfac992c52e3c7950355c0d447764056a5276 (patch)
treeafb5f8f462d964a764dbb071f5cfadad199cbe4d /decoders
parentd0a868591470a4a9d71a65a5d540058e72c8d92c (diff)
httpv2.0 rebase onto develop-2.0dev-http-v2.0
Diffstat (limited to 'decoders')
-rw-r--r--decoders/CMakeLists.txt2
-rw-r--r--decoders/http/CMakeLists.txt20
-rw-r--r--decoders/http/http_content_decompress.c268
-rw-r--r--decoders/http/http_decoder.c1239
-rw-r--r--decoders/http/http_decoder.h104
-rw-r--r--decoders/http/http_decoder_decompress.c274
-rw-r--r--decoders/http/http_decoder_decompress.h (renamed from decoders/http/http_content_decompress.h)9
-rw-r--r--decoders/http/http_decoder_half.c1548
-rw-r--r--decoders/http/http_decoder_half.h263
-rw-r--r--decoders/http/http_decoder_llhttp_wrap.c318
-rw-r--r--decoders/http/http_decoder_module.c299
-rw-r--r--decoders/http/http_decoder_private.h155
-rw-r--r--decoders/http/http_decoder_result_queue.c152
-rw-r--r--decoders/http/http_decoder_result_queue.h51
-rw-r--r--decoders/http/http_decoder_stat.c182
-rw-r--r--decoders/http/http_decoder_stat.h96
-rw-r--r--decoders/http/http_decoder_string.c289
-rw-r--r--decoders/http/http_decoder_string.h73
-rw-r--r--decoders/http/http_decoder_table.c579
-rw-r--r--decoders/http/http_decoder_table.h79
-rw-r--r--decoders/http/http_decoder_tunnel.c116
-rw-r--r--decoders/http/http_decoder_tunnel.h36
-rw-r--r--decoders/http/http_decoder_utils.c536
-rw-r--r--decoders/http/http_decoder_utils.h92
-rw-r--r--decoders/http/version.map9
25 files changed, 2295 insertions, 4494 deletions
diff --git a/decoders/CMakeLists.txt b/decoders/CMakeLists.txt
index 7946822..eaba19e 100644
--- a/decoders/CMakeLists.txt
+++ b/decoders/CMakeLists.txt
@@ -1,5 +1,5 @@
add_subdirectory(lpi_plus)
-#add_subdirectory(http)
+add_subdirectory(http)
#add_subdirectory(socks)
#add_subdirectory(stratum)
#add_subdirectory(session_flags) \ No newline at end of file
diff --git a/decoders/http/CMakeLists.txt b/decoders/http/CMakeLists.txt
index c242afe..3a2b637 100644
--- a/decoders/http/CMakeLists.txt
+++ b/decoders/http/CMakeLists.txt
@@ -1,15 +1,15 @@
+add_definitions(-fPIC)
include_directories(${CMAKE_SOURCE_DIR}/deps)
-set(HTTP_SRC http_decoder.c http_decoder_utils.c http_decoder_half.c
- http_decoder_table.c http_decoder_string.c http_content_decompress.c
- http_decoder_result_queue.c http_decoder_stat.c http_decoder_tunnel.c)
+set(HTTP_SRC http_decoder_module.c http_decoder.c
+ http_decoder_llhttp_wrap.c
+ http_decoder_utils.c
+ http_decoder_decompress.c
+ http_decoder_half.c
+ http_decoder_stat.c )
-add_library(http STATIC ${HTTP_SRC})
-add_library(http_dyn SHARED ${HTTP_SRC})
+add_library(http ${HTTP_SRC})
set_target_properties(http PROPERTIES LINK_FLAGS "-Wl,--version-script=${CMAKE_CURRENT_SOURCE_DIR}/version.map")
target_include_directories(http PUBLIC ${CMAKE_SOURCE_DIR}/deps/)
-target_link_libraries(http z llhttp-static fieldstat4 brotli-dec-static brotli-common-static nmx_pool toml)
-set_target_properties(http PROPERTIES PREFIX "")
-
-set_target_properties(http_dyn PROPERTIES PREFIX "")
-target_link_libraries(http_dyn z llhttp-static fieldstat4 brotli-dec-static brotli-common-static nmx_pool toml) \ No newline at end of file
+target_link_libraries(http z llhttp-static fieldstat4 brotli-dec-static brotli-common-static toml)
+set_target_properties(http PROPERTIES PREFIX "") \ No newline at end of file
diff --git a/decoders/http/http_content_decompress.c b/decoders/http/http_content_decompress.c
deleted file mode 100644
index 922c624..0000000
--- a/decoders/http/http_content_decompress.c
+++ /dev/null
@@ -1,268 +0,0 @@
-#include <zlib.h>
-#include <string.h>
-#include <assert.h>
-#include <brotli/decode.h>
-#include "http_decoder_private.h"
-
-#define HTTP_DECOMPRESS_BUFFER_SIZE (4096)
-
-struct http_content_decompress
-{
- enum http_content_encoding encoding;
- z_stream *z_stream_ptr;
- BrotliDecoderState *br_state;
- char *buffer;
- size_t buffer_size;
-};
-
-void http_content_decompress_ownership_borrow(struct http_content_decompress *decompress)
-{
- decompress->buffer = NULL; // ownership move to data->decompress_buffer_list, will be freed when message has been processed by all plugins
-}
-
-enum http_content_encoding http_content_encoding_str2int(const char *content_encoding, size_t encoding_str_len)
-{
- if (http_strncasecmp_safe("gzip", content_encoding, 4, encoding_str_len) == 0)
- {
- return HTTP_CONTENT_ENCODING_GZIP;
- }
- if (http_strncasecmp_safe("deflate", content_encoding, 7, encoding_str_len) == 0)
- {
- return HTTP_CONTENT_ENCODING_DEFLATE;
- }
- if (http_strncasecmp_safe("br", content_encoding, 2, encoding_str_len) == 0)
- {
- return HTTP_CONTENT_ENCODING_BR;
- }
- return HTTP_CONTENT_ENCODING_NONE;
-}
-
-const char *http_content_encoding_int2str(enum http_content_encoding content_encoding)
-{
- if (content_encoding == HTTP_CONTENT_ENCODING_GZIP)
- {
- return "gzip";
- }
- if (content_encoding == HTTP_CONTENT_ENCODING_DEFLATE)
- {
- return "deflate";
- }
- if (content_encoding == HTTP_CONTENT_ENCODING_BR)
- {
- return "br";
- }
- return "unknown";
-}
-
-struct http_content_decompress *http_content_decompress_create(enum http_content_encoding encoding)
-{
- struct http_content_decompress *decompress =
- CALLOC(struct http_content_decompress, 1);
- assert(decompress);
-
- decompress->encoding = encoding;
- decompress->z_stream_ptr = NULL;
- decompress->br_state = NULL;
-
- if (encoding == HTTP_CONTENT_ENCODING_GZIP || encoding == HTTP_CONTENT_ENCODING_DEFLATE)
- {
- decompress->z_stream_ptr = CALLOC(z_stream, 1);
- assert(decompress->z_stream_ptr);
-
- decompress->z_stream_ptr->zalloc = NULL;
- decompress->z_stream_ptr->zfree = NULL;
- decompress->z_stream_ptr->opaque = NULL;
- decompress->z_stream_ptr->avail_in = 0;
- decompress->z_stream_ptr->next_in = Z_NULL;
-
- if (encoding == HTTP_CONTENT_ENCODING_GZIP)
- {
- if (inflateInit2(decompress->z_stream_ptr, MAX_WBITS + 16) != Z_OK)
- {
- goto error;
- }
- }
- if (encoding == HTTP_CONTENT_ENCODING_DEFLATE)
- {
- if (inflateInit2(decompress->z_stream_ptr, -MAX_WBITS) != Z_OK)
- {
- goto error;
- }
- }
- }
-
- if (encoding == HTTP_CONTENT_ENCODING_BR)
- {
- decompress->br_state = BrotliDecoderCreateInstance(NULL, NULL, NULL);
- if (decompress->br_state == NULL)
- {
- goto error;
- }
- }
- return decompress;
-
-error:
- http_content_decompress_destroy(decompress);
- return NULL;
-}
-
-void http_content_decompress_destroy(struct http_content_decompress *decompress)
-{
- if (NULL == decompress)
- {
- return;
- }
- if (decompress->z_stream_ptr != NULL)
- {
- inflateEnd(decompress->z_stream_ptr);
- FREE(decompress->z_stream_ptr);
- }
- if (decompress->br_state)
- {
- BrotliDecoderDestroyInstance(decompress->br_state);
- decompress->br_state = NULL;
- }
- FREE(decompress->buffer);
- FREE(decompress);
-}
-
-static int http_content_decompress_write_zlib(struct http_content_decompress *decompress,
- const char *indata, size_t indata_len,
- char **outdata, size_t *outdata_len)
-{
- z_stream *z_stream_ptr = decompress->z_stream_ptr;
- z_stream_ptr->avail_in = (unsigned int)indata_len;
- z_stream_ptr->next_in = (unsigned char *)indata;
- z_stream_ptr->avail_out = (unsigned int)HTTP_DECOMPRESS_BUFFER_SIZE;
- z_stream_ptr->next_out = (unsigned char *)decompress->buffer;
- *outdata = NULL;
- *outdata_len = 0;
- size_t total_have = 0;
- int no_buffer;
-
- do
- {
- int ret = inflate(z_stream_ptr, Z_NO_FLUSH);
- if (ret == Z_STREAM_ERROR || ret == Z_NEED_DICT || ret == Z_DATA_ERROR || ret == Z_MEM_ERROR)
- {
- (void)inflateEnd(z_stream_ptr);
- return -1;
- }
- size_t have = HTTP_DECOMPRESS_BUFFER_SIZE - z_stream_ptr->avail_out;
- if (have > 0)
- {
- total_have += have;
- if (0 == z_stream_ptr->avail_out)
- {
- decompress->buffer_size += HTTP_DECOMPRESS_BUFFER_SIZE;
- decompress->buffer = REALLOC(char, decompress->buffer, decompress->buffer_size);
- z_stream_ptr->avail_out = HTTP_DECOMPRESS_BUFFER_SIZE;
- z_stream_ptr->next_out = (unsigned char *)decompress->buffer + total_have;
- *outdata = decompress->buffer;
- *outdata_len = total_have;
- no_buffer = 1;
- }
- else
- {
- *outdata = decompress->buffer;
- *outdata_len = total_have;
- no_buffer = 0;
- }
- }
- else
- {
- break;
- }
- if (Z_STREAM_END == ret)
- {
- break;
- }
- } while (no_buffer == 1);
- return 0;
-}
-
-static int http_content_decompress_write_br(struct http_content_decompress *decompress,
- const char *indata, size_t indata_len,
- char **outdata, size_t *outdata_len)
-{
- size_t available_in = indata_len;
- const unsigned char *next_in = (const unsigned char *)indata;
- size_t available_out = HTTP_DECOMPRESS_BUFFER_SIZE;
- unsigned char *next_out = (unsigned char *)decompress->buffer;
-
- *outdata = NULL;
- *outdata_len = 0;
- size_t total_have = 0;
- int no_buffer;
-
- do
- {
- int ret = BrotliDecoderDecompressStream(decompress->br_state, &available_in,
- &next_in, &available_out, &next_out, NULL);
- if (ret == BROTLI_DECODER_RESULT_ERROR)
- {
- // BrotliDecoderErrorCode errcode = BrotliDecoderGetErrorCode(decompress->br_state);
- *outdata = NULL;
- *outdata_len = 0;
- return -1;
- }
- size_t have = HTTP_DECOMPRESS_BUFFER_SIZE - available_out;
- if (have > 0)
- {
- total_have += have;
- if (0 == available_out)
- {
- decompress->buffer_size += HTTP_DECOMPRESS_BUFFER_SIZE;
- decompress->buffer = REALLOC(char, decompress->buffer, decompress->buffer_size);
- available_out = HTTP_DECOMPRESS_BUFFER_SIZE;
- next_out = (unsigned char *)decompress->buffer + total_have;
- *outdata = decompress->buffer;
- *outdata_len = total_have;
- no_buffer = 1;
- }
- else
- {
- *outdata = decompress->buffer;
- *outdata_len = have;
- no_buffer = 0;
- }
- }
- else
- {
- break;
- }
- } while (no_buffer == 1);
- return 0;
-}
-
-int http_content_decompress_write(struct http_content_decompress *decompress,
- const char *indata, size_t indata_len,
- char **outdata, size_t *outdata_len)
-{
- assert(decompress);
- assert(indata);
- assert(indata_len > 0);
- assert(outdata);
- assert(outdata_len);
- *outdata = NULL;
- *outdata_len = 0;
-
- if (NULL == decompress->buffer)
- {
- decompress->buffer = CALLOC(char, HTTP_DECOMPRESS_BUFFER_SIZE);
- assert(decompress->buffer);
- decompress->buffer_size = HTTP_DECOMPRESS_BUFFER_SIZE;
- }
-
- if (decompress->encoding == HTTP_CONTENT_ENCODING_GZIP ||
- decompress->encoding == HTTP_CONTENT_ENCODING_DEFLATE)
- {
- return http_content_decompress_write_zlib(decompress, indata, indata_len, outdata, outdata_len);
- }
- if (decompress->encoding == HTTP_CONTENT_ENCODING_BR)
- {
- return http_content_decompress_write_br(decompress, indata, indata_len, outdata, outdata_len);
- }
- assert(0);
- return -1;
-} \ No newline at end of file
diff --git a/decoders/http/http_decoder.c b/decoders/http/http_decoder.c
index 10f8369..39406a3 100644
--- a/decoders/http/http_decoder.c
+++ b/decoders/http/http_decoder.c
@@ -1,1205 +1,184 @@
-#include "stellar/http.h"
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
-#include "http_decoder_private.h"
-
-#pragma GCC diagnostic ignored "-Wunused-parameter"
-
-struct http_message *http_message_new(enum http_message_type type, struct http_decoder_result_queue *queue,
- int queue_index, uint8_t flow_type)
-{
- struct http_message *msg = CALLOC(struct http_message, 1);
- msg->type = type;
- msg->ref_queue = queue;
- msg->queue_index = queue_index;
- msg->flow_type = flow_type;
- return msg;
-}
-
-struct http_message *http_body_message_new(enum http_message_type type, struct http_decoder_result_queue *queue,
- int queue_index, uint8_t flow_type, hstring *raw_payload, hstring *decompress_payload)
-{
- struct http_message *msg = CALLOC(struct http_message, 1);
- msg->type = type;
- msg->ref_queue = queue;
- msg->queue_index = queue_index;
- msg->flow_type = flow_type;
- if (raw_payload)
- {
- msg->raw_payload.iov_base = raw_payload->iov_base;
- msg->raw_payload.iov_len = raw_payload->iov_len;
- }
- if (decompress_payload)
- {
- msg->decompress_payload.iov_base = decompress_payload->iov_base;
- msg->decompress_payload.iov_len = decompress_payload->iov_len;
- }
- return msg;
-}
-
-static void http_message_decompress_buffer_free(struct http_message *msg)
-{
- struct http_decoder_half_data *ref_data = NULL;
- if (HTTP_MESSAGE_REQ_BODY_START == msg->type || HTTP_MESSAGE_REQ_BODY == msg->type || HTTP_MESSAGE_REQ_BODY_END == msg->type)
- {
- ref_data = msg->ref_queue->array[msg->queue_index].req_data;
- }
- else if (HTTP_MESSAGE_RES_BODY_START == msg->type || HTTP_MESSAGE_RES_BODY == msg->type || HTTP_MESSAGE_RES_BODY_END == msg->type)
- {
- ref_data = msg->ref_queue->array[msg->queue_index].res_data;
- }
- if (ref_data != NULL && msg->decompress_payload.iov_base != NULL)
- {
- http_half_decompress_buffer_free(ref_data, &msg->decompress_payload);
- }
-}
-
-static void http_message_free(void *http_msg, void *cb_arg)
-{
- if (http_msg)
- {
- http_message_decompress_buffer_free((struct http_message *)http_msg);
- FREE(http_msg);
- }
-}
-
-static void http_event_handler(enum http_event event, struct http_decoder_half_data **data,
- struct http_event_context *ev_ctx, void *httpd_plugin_env)
-{
- assert(ev_ctx);
- assert(httpd_plugin_env);
- struct http_decoder_env *httpd_env = (struct http_decoder_env *)httpd_plugin_env;
- size_t queue_idx = 0;
- nmx_pool_t *mempool = ev_ctx->ref_mempool;
- struct http_decoder_result_queue *queue = ev_ctx->ref_queue;
- struct http_message *msg = NULL;
- struct http_decoder_half_data *half_data = NULL;
- int ret = 0;
- u_int8_t flow_flag = 0;
- struct http_decoder_exdata *exdata = ev_ctx->ref_httpd_ctx;
-
- int thread_id = stellar_get_current_thread_index();
-
- if (http_event_is_req(event))
- {
- queue_idx = http_decoder_result_queue_req_index(queue);
- half_data = http_decoder_result_queue_peek_req(queue);
- }
- else
- {
- queue_idx = http_decoder_result_queue_res_index(queue);
- half_data = http_decoder_result_queue_peek_res(queue);
- }
-
- switch (event)
- {
- case HTTP_EVENT_REQ_INIT:
- half_data = http_decoder_result_queue_peek_req(queue);
- if (half_data != NULL)
- {
- http_decoder_result_queue_inc_req_index(queue);
- }
-
- half_data = http_decoder_result_queue_peek_req(queue);
- if (half_data != NULL)
- {
- half_data = http_decoder_result_queue_pop_req(queue);
- http_decoder_half_data_free(mempool, half_data);
- half_data = NULL;
- }
-
- half_data = http_decoder_half_data_new(mempool);
- ret = http_decoder_result_queue_push_req(queue, half_data);
- if (ret < 0)
- {
- fprintf(stderr, "http_decoder_result_queue_push req failed.");
- http_decoder_half_data_free(mempool, half_data);
- half_data = NULL;
- }
- *data = half_data;
- queue_idx = http_decoder_result_queue_req_index(queue); // get the index after inc
- /* llhttp always call on_message_begin() even if llhttp_execute() error!!! */
- msg = http_message_new(HTTP_TRANSACTION_START, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
- http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_TRANSACTION_NEW, 1);
- break;
- case HTTP_EVENT_REQ_LINE:
- msg = http_message_new(HTTP_MESSAGE_REQ_LINE, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
- if (httpd_tunnel_identify(httpd_env, FLOW_TYPE_C2S, half_data))
- {
- exdata->tunnel_state = HTTP_TUN_C2S_HDR_START;
- // http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TUNNEL, 1);
- }
- if (httpd_is_tunnel_session(httpd_env, exdata))
- {
- http_decoder_get_url(half_data, mempool);
- }
- break;
- case HTTP_EVENT_REQ_HDR:
- msg = http_message_new(HTTP_MESSAGE_REQ_HEADER, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
- break;
- case HTTP_EVENT_REQ_HDR_END:
- {
- http_decoder_join_url_finally(ev_ctx, half_data, mempool);
- /* maybe some parsed headers in buffer, but has not pushed to plugins yet */
-
- if (http_decoder_half_data_has_parsed_header(half_data))
- {
- msg = http_message_new(HTTP_MESSAGE_REQ_HEADER, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
- }
- http_half_data_update_commit_index(half_data);
- msg = http_message_new(HTTP_MESSAGE_REQ_HEADER_END, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
-
- int tot_c2s_headers = http_half_data_get_total_parsed_header_count(half_data);
- http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_C2S_HEADERS, tot_c2s_headers);
-
- const char *tmp_url = NULL;
- size_t tmp_url_len = 0;
- http_half_data_get_url(half_data, &tmp_url, &tmp_url_len);
- http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_URL_BYTES, tmp_url_len);
- }
- break;
- case HTTP_EVENT_REQ_BODY_BEGIN:
- msg = http_message_new(HTTP_MESSAGE_REQ_BODY_START, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
- break;
- case HTTP_EVENT_REQ_BODY_DATA:
- {
- hstring raw_body = {};
- hstring decompress_body = {};
- http_decoder_half_data_get_raw_body(half_data, (const char **)&raw_body.iov_base, &raw_body.iov_len);
- http_half_get_lastest_decompress_buffer(half_data, &decompress_body);
- msg = http_body_message_new(HTTP_MESSAGE_REQ_BODY, queue, queue_idx, HTTP_REQUEST, &raw_body, &decompress_body);
- session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
- if (decompress_body.iov_base != NULL)
- {
- http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_C2S_ZIP_BYTES, raw_body.iov_len);
- http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_C2S_UNZIP_BYTES, decompress_body.iov_len);
- }
- }
- break;
- case HTTP_EVENT_REQ_BODY_END:
- msg = http_message_new(HTTP_MESSAGE_REQ_BODY_END, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
- break;
- case HTTP_EVENT_REQ_END:
- {
- session_is_symmetric(ev_ctx->ref_session, &flow_flag);
-
- if (SESSION_SEEN_C2S_FLOW == flow_flag)
- {
- msg = http_message_new(HTTP_TRANSACTION_END, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
- http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_TRANSACTION_FREE, 1);
- http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_C2S_ASYMMETRY_TRANSACTION, 1);
- }
- if (httpd_is_tunnel_session(httpd_env, exdata))
- {
- if (SESSION_SEEN_C2S_FLOW == flow_flag)
- {
- exdata->tunnel_state = HTTP_TUN_INNER_STARTING;
- exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].sub_topic_id;
- }
- else
- {
- exdata->tunnel_state = HTTP_TUN_C2S_END;
- }
- }
- http_half_update_state(half_data, event);
- http_decoder_result_queue_inc_req_index(queue);
- half_data = http_decoder_result_queue_pop_req(queue);
- if (half_data != NULL)
- {
- http_decoder_half_data_free(mempool, half_data);
- half_data = NULL;
- }
- }
- break;
-
- case HTTP_EVENT_RES_INIT:
- half_data = http_decoder_result_queue_peek_res(queue);
- if (half_data != NULL)
- {
- http_decoder_result_queue_inc_res_index(queue);
- }
-
- half_data = http_decoder_result_queue_peek_res(queue);
- if (half_data != NULL)
- {
- half_data = http_decoder_result_queue_pop_res(queue);
- http_decoder_half_data_free(mempool, half_data);
- half_data = NULL;
- }
-
- half_data = http_decoder_half_data_new(mempool);
- ret = http_decoder_result_queue_push_res(queue, half_data);
- if (ret < 0)
- {
- fprintf(stderr, "http_decoder_result_queue_push res failed.");
- http_decoder_half_data_free(mempool, half_data);
- half_data = NULL;
- }
- queue_idx = http_decoder_result_queue_res_index(queue); // get the index after inc
- *data = half_data;
- if (0 == session_is_symmetric(ev_ctx->ref_session, &flow_flag))
- {
- if (SESSION_SEEN_S2C_FLOW == flow_flag)
- {
- msg = http_message_new(HTTP_TRANSACTION_START, queue, queue_idx, HTTP_RESPONSE);
- session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
- }
- }
- break;
- case HTTP_EVENT_RES_LINE:
- msg = http_message_new(HTTP_MESSAGE_RES_LINE, queue, queue_idx, HTTP_RESPONSE);
- session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
- if (httpd_tunnel_identify(httpd_env, FLOW_TYPE_S2C, half_data))
- {
- exdata->tunnel_state = HTTP_TUN_S2C_START;
- }
- else
- {
- // connect response fail, reset tunnel_state
- exdata->tunnel_state = HTTP_TUN_NON;
- }
- break;
- case HTTP_EVENT_RES_HDR:
- msg = http_message_new(HTTP_MESSAGE_RES_HEADER, queue, queue_idx, HTTP_RESPONSE);
- session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
- break;
- case HTTP_EVENT_RES_HDR_END:
- {
- /* maybe some header in table buffer but has not pushed to plugins */
- half_data = http_decoder_result_queue_peek_res(queue);
- if (http_decoder_half_data_has_parsed_header(half_data))
- {
- msg = http_message_new(HTTP_MESSAGE_RES_HEADER, queue, queue_idx, HTTP_RESPONSE);
- session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
- }
- http_half_data_update_commit_index(half_data);
- msg = http_message_new(HTTP_MESSAGE_RES_HEADER_END, queue, queue_idx, HTTP_RESPONSE);
- session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
-
- int tot_s2c_headers = http_half_data_get_total_parsed_header_count(half_data);
- http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_S2C_HEADERS, tot_s2c_headers);
-
- if (httpd_is_tunnel_session(httpd_env, exdata))
- {
- exdata->tunnel_state = HTTP_TUN_INNER_STARTING;
- http_half_pre_context_free(ev_ctx->ref_session, exdata);
- exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].sub_topic_id;
- }
- }
- break;
- case HTTP_EVENT_RES_BODY_BEGIN:
- msg = http_message_new(HTTP_MESSAGE_RES_BODY_START, queue, queue_idx, HTTP_RESPONSE);
- session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
- break;
- case HTTP_EVENT_RES_BODY_DATA:
- {
- hstring raw_body = {};
- http_decoder_half_data_get_raw_body(half_data, (const char **)&raw_body.iov_base, &raw_body.iov_len);
- hstring decompress_body = {};
- http_half_get_lastest_decompress_buffer(half_data, &decompress_body);
- msg = http_body_message_new(HTTP_MESSAGE_RES_BODY, queue, queue_idx, HTTP_RESPONSE, &raw_body, &decompress_body);
- session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
- if (decompress_body.iov_base != NULL)
- {
- http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_S2C_ZIP_BYTES, raw_body.iov_len);
- http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_S2C_UNZIP_BYTES, decompress_body.iov_len);
- }
- }
- break;
- case HTTP_EVENT_RES_BODY_END:
- msg = http_message_new(HTTP_MESSAGE_RES_BODY_END, queue, queue_idx, HTTP_RESPONSE);
- session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
- break;
- case HTTP_EVENT_RES_END:
- msg = http_message_new(HTTP_TRANSACTION_END, queue, queue_idx, HTTP_RESPONSE);
- session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
- http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_TRANSACTION_FREE, 1);
- session_is_symmetric(ev_ctx->ref_session, &flow_flag);
- if (SESSION_SEEN_S2C_FLOW == flow_flag)
- {
- http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_S2C_ASYMMETRY_TRANSACTION, 1);
- }
-
- http_half_update_state(half_data, event);
- http_decoder_result_queue_inc_res_index(queue);
- half_data = http_decoder_result_queue_pop_res(queue);
- if (half_data != NULL)
- {
- http_decoder_half_data_free(mempool, half_data);
- half_data = NULL;
- }
- break;
- default:
- assert(0);
- break;
- }
- if (half_data)
- {
- http_half_update_state(half_data, event);
- }
-}
+#include "stellar/utils.h"
+#include "stellar/http.h"
+#include "stellar/session.h"
+#include "http_decoder.h"
+#include "http_decoder_utils.h"
+#include "http_decoder_stat.h"
+#include "http_decoder_half.h"
-static struct http_decoder *http_decoder_new(struct http_decoder_exdata *hd_ctx, nmx_pool_t *mempool,
- http_event_cb *ev_cb, int decompress_switch, struct http_decoder_env *httpd_env,
- long long req_start_seq, long long res_start_seq)
+static struct http_decoder *http_decoder_new(void)
{
- struct http_decoder *decoder = MEMPOOL_CALLOC(mempool, struct http_decoder, 1);
- assert(decoder);
- decoder->c2s_half = http_decoder_half_new(hd_ctx, mempool, ev_cb, HTTP_REQUEST, decompress_switch, httpd_env, req_start_seq);
- decoder->s2c_half = http_decoder_half_new(hd_ctx, mempool, ev_cb, HTTP_RESPONSE, decompress_switch, httpd_env, res_start_seq);
+ struct http_decoder *decoder = (struct http_decoder *)calloc(1, sizeof(struct http_decoder));
return decoder;
}
-static void http_decoder_free(nmx_pool_t *mempool, struct http_decoder *decoder)
+static void http_decoder_free(struct http_decoder *decoder)
{
if (NULL == decoder)
{
return;
}
- if (decoder->c2s_half != NULL)
+ if (decoder->flow_c2s)
{
- http_decoder_half_free(mempool, decoder->c2s_half);
- decoder->c2s_half = NULL;
+ http_half_free(decoder->flow_c2s);
}
- if (decoder->s2c_half != NULL)
+ if (decoder->flow_s2c)
{
- http_decoder_half_free(mempool, decoder->s2c_half);
- decoder->s2c_half = NULL;
+ http_half_free(decoder->flow_s2c);
}
- MEMPOOL_FREE(mempool, decoder);
-}
-
-static struct http_decoder_exdata *http_decoder_exdata_new(size_t mempool_size, size_t queue_size,
- int decompress_switch, struct http_decoder_env *httpd_env,
- long long req_start_seq, long long res_start_seq)
-{
- struct http_decoder_exdata *hd_ctx = CALLOC(struct http_decoder_exdata, 1);
- hd_ctx->mempool = nmx_create_pool(mempool_size);
- hd_ctx->decoder = http_decoder_new(hd_ctx, hd_ctx->mempool, http_event_handler, decompress_switch,
- httpd_env, req_start_seq, res_start_seq);
- hd_ctx->queue = http_decoder_result_queue_new(hd_ctx->mempool, queue_size);
- return hd_ctx;
+ free(decoder);
}
-static void http_decoder_exdata_free(struct http_decoder_exdata *ex_data)
+static struct http_exdata *http_session_new(struct session *sess, struct http *http_env,
+ const char *tcp_payload, uint32_t tcp_payload_len)
{
- if (unlikely(NULL == ex_data))
- {
- return;
- }
- if (ex_data->decoder != NULL)
- {
- http_decoder_free(ex_data->mempool, ex_data->decoder);
- ex_data->decoder = NULL;
- }
- if (ex_data->queue != NULL)
+ struct http_exdata *exdata = (struct http_exdata *)calloc(1, sizeof(struct http_exdata));
+ session_set_exdata(sess, http_env->exdata_id, exdata);
+ exdata->sess = sess;
+ exdata->http_env = http_env;
+ size_t http_identify_len = MIN(tcp_payload_len, HTTP_IDENTIFY_LEN);
+ int ret = http_protocol_identify(tcp_payload, http_identify_len);
+ if (ret < 0)
{
- http_decoder_result_queue_free(ex_data->mempool, ex_data->queue);
- ex_data->queue = NULL;
+ STELLAR_LOG_DEBUG(http_env->logger_ref, HTTP_MODULE_NAME,
+ "http identify error, ignore this session: %s", session_get_readable_addr(sess));
+ exdata->ignore_session = 1;
+ return exdata;
}
- if (ex_data->mempool)
- {
- nmx_destroy_pool(ex_data->mempool);
- }
- FREE(ex_data);
-}
-
-static int http_protocol_identify(const char *data, size_t data_len)
-{
- llhttp_t parser;
- llhttp_settings_t settings;
- enum llhttp_errno error;
-
- llhttp_settings_init(&settings);
- llhttp_init(&parser, HTTP_BOTH, &settings);
-
- error = llhttp_execute(&parser, data, data_len);
- if (error != HPE_OK)
- {
- return -1;
- }
- return 1;
-}
-
-static void _http_decoder_context_free(struct http_decoder_env *env)
-{
- if (NULL == env)
- {
- return;
- }
-
- http_decoder_stat_free(&env->hd_stat);
-
- for (int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++)
- {
- if (env->topic_exdata_compose[i].msg_free_cb)
- {
- stellar_mq_destroy_topic(env->st, env->topic_exdata_compose[i].sub_topic_id);
- }
- }
-
- FREE(env);
-}
-
-static int load_http_decoder_config(const char *cfg_path,
- struct http_decoder_config *hd_cfg)
-{
- FILE *fp = fopen(cfg_path, "r");
- if (NULL == fp)
- {
- fprintf(stderr, "[%s:%d]Can't open config file:%s",
- __FUNCTION__, __LINE__, cfg_path);
- return -1;
- }
-
- int ret = 0;
- char errbuf[256] = {0};
-
- toml_table_t *root = toml_parse_file(fp, errbuf, sizeof(errbuf));
- fclose(fp);
-
- toml_table_t *basic_sec_tbl = toml_table_in(root, "basic");
- if (NULL == basic_sec_tbl)
- {
- fprintf(stderr, "[%s:%d]config file:%s has no key: [basic]",
- __FUNCTION__, __LINE__, cfg_path);
- toml_free(root);
- return -1;
- }
-
- toml_datum_t int_val = toml_int_in(basic_sec_tbl, "decompress");
- if (int_val.ok != 0)
- {
- hd_cfg->decompress_switch = int_val.u.b;
- }
-
- int_val = toml_int_in(basic_sec_tbl, "mempool_size");
- if (int_val.ok != 0)
- {
- hd_cfg->mempool_size = int_val.u.i;
- }
- else
- {
- hd_cfg->mempool_size = DEFAULT_MEMPOOL_SIZE;
- }
-
- int_val = toml_int_in(basic_sec_tbl, "result_queue_len");
- if (int_val.ok != 0)
- {
- hd_cfg->result_queue_len = int_val.u.i;
- }
- else
- {
- hd_cfg->result_queue_len = HD_RESULT_QUEUE_LEN;
- }
-
- int_val = toml_int_in(basic_sec_tbl, "stat_interval_pkts");
- if (int_val.ok != 0)
- {
- hd_cfg->stat_interval_pkts = int_val.u.i;
- }
- else
- {
- hd_cfg->stat_interval_pkts = DEFAULT_STAT_INTERVAL_PKTS;
- }
-
- int_val = toml_int_in(basic_sec_tbl, "stat_output_interval");
- if (int_val.ok != 0)
- {
- hd_cfg->stat_output_interval = int_val.u.i;
- }
- else
- {
- hd_cfg->stat_output_interval = DEFAULT_STAT_OUTPUT_INTERVAL;
- }
-
- int_val = toml_int_in(basic_sec_tbl, "proxy_enable");
- if (int_val.ok != 0)
- {
- hd_cfg->proxy_enable = int_val.u.i;
- }
- else
- {
- hd_cfg->proxy_enable = 0;
- }
-
- toml_free(root);
- return ret;
-}
-
-static int http_msg_get_request_header(const struct http_message *msg, const char *name, size_t name_len,
- struct http_header_field *hdr_result)
-{
- const struct http_decoder_half_data *req_data =
- msg->ref_queue->array[msg->queue_index].req_data;
- return http_decoder_half_data_get_header(req_data, name, name_len, hdr_result);
-}
-
-static int http_msg_get_response_header(const struct http_message *msg, const char *name, size_t name_len,
- struct http_header_field *hdr_result)
-{
- const struct http_decoder_half_data *res_data =
- msg->ref_queue->array[msg->queue_index].res_data;
- return http_decoder_half_data_get_header(res_data, name, name_len, hdr_result);
-}
-
-static int http_msg_request_header_next(const struct http_message *msg,
- struct http_header_field *hdr)
-{
- const struct http_decoder_half_data *req_data =
- msg->ref_queue->array[msg->queue_index].req_data;
- return http_decoder_half_data_iter_header((struct http_decoder_half_data *)req_data, hdr);
-}
-
-static int http_msg_response_header_next(const struct http_message *msg, struct http_header_field *hdr)
-{
- const struct http_decoder_half_data *res_data =
- msg->ref_queue->array[msg->queue_index].res_data;
- return http_decoder_half_data_iter_header((struct http_decoder_half_data *)res_data, hdr);
-}
-
-#if 0
-static int http_msg_get_request_raw_body(const struct http_message *msg, hstring *body)
-{
- const struct http_decoder_half_data *req_data =
- msg->ref_queue->array[msg->queue_index].req_data;
- return http_decoder_half_data_get_raw_body(req_data, body);
-}
-
-static int http_msg_get_response_raw_body(const struct http_message *msg, hstring *body)
-{
- const struct http_decoder_half_data *res_data =
- msg->ref_queue->array[msg->queue_index].res_data;
- return http_decoder_half_data_get_raw_body(res_data, body);
-}
-
-static int http_msg_get_request_decompress_body(const struct http_message *msg, hstring *body)
-{
- const struct http_decoder_half_data *req_data =
- msg->ref_queue->array[msg->queue_index].req_data;
- return http_decoder_half_data_get_decompress_body(req_data, body);
-}
-
-static int http_msg_get_response_decompress_body(const struct http_message *msg, hstring *body)
-{
- const struct http_decoder_half_data *res_data =
- msg->ref_queue->array[msg->queue_index].res_data;
- return http_decoder_half_data_get_decompress_body(res_data, body);
-}
-#endif
-
-static struct http_decoder_exdata *httpd_session_exdata_new(struct session *sess, struct http_decoder_env *httpd_env,
- long long req_start_seq, long long res_start_seq)
-{
- struct http_decoder_exdata *exdata = http_decoder_exdata_new(httpd_env->hd_cfg.mempool_size,
- httpd_env->hd_cfg.result_queue_len,
- httpd_env->hd_cfg.decompress_switch,
- httpd_env, req_start_seq, res_start_seq);
- // exdata->sub_topic_id = sub_topic_id;
- int thread_id = stellar_get_current_thread_index();
- http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_SESSION_NEW, 1);
+ exdata->decoder = http_decoder_new();
+ int thread_id = module_manager_get_thread_id(http_env->mod_mgr_ref);
+ http_stat_update(&http_env->stat, thread_id, HTTP_SESSION_NEW, 1);
+ STELLAR_LOG_INFO(http_env->logger_ref, HTTP_MODULE_NAME,
+ "http identify succ, session: %s", session_get_readable_addr(sess));
return exdata;
}
-#ifdef __cplusplus
-extern "C"
+static void http_decoder_execute(struct session *sess, struct http *http_env, struct http_exdata *exdata,
+ const char *tcp_payload, uint16_t tcp_payload_len)
{
-#endif
+ int thread_id = module_manager_get_thread_id(http_env->mod_mgr_ref);
+ enum flow_type flow_dir = session_get_flow_type(sess);
- void httpd_ex_data_free_cb(int idx, void *ex_data, void *arg)
+ http_stat_update_tcp_seg(&http_env->stat, thread_id, flow_dir, tcp_payload_len);
+ struct http_half *half = http_flow_get_nx(exdata, flow_dir, sess, http_env);
+ int ret = http_half_flow_process(half, tcp_payload, tcp_payload_len);
+ if (ret < 0)
{
- if (NULL == ex_data)
- {
- return;
- }
- struct http_decoder_exdata *exdata = (struct http_decoder_exdata *)ex_data;
- http_decoder_exdata_free(exdata);
- }
-
- void *httpd_session_ctx_new_cb(struct session *sess, void *plugin_env)
- {
- return (void *)HTTP_CTX_IS_HTTP;
- }
-
- void httpd_session_ctx_free_cb(struct session *sess, void *session_ctx, void *plugin_env)
- {
- if (NULL == plugin_env || NULL == session_ctx)
- {
- return;
- }
- if (strncmp((const char *)session_ctx, HTTP_CTX_NOT_HTTP, strlen(HTTP_CTX_NOT_HTTP)) == 0)
- {
- return;
- }
- struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env;
- int thread_id = stellar_get_current_thread_index();
- unsigned char flow_flag = 0;
- session_is_symmetric(sess, &flow_flag);
-
- if (SESSION_SEEN_C2S_FLOW == flow_flag)
- {
- http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_C2S_ASYMMETRY_SESSION, 1);
- }
- else if (SESSION_SEEN_S2C_FLOW == flow_flag)
- {
- http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_S2C_ASYMMETRY_SESSION, 1);
- }
- http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_SESSION_FREE, 1);
- }
-
- static void http_decoder_execute(struct session *sess, struct http_decoder_env *httpd_env, struct http_decoder_exdata *exdata, const char *payload, uint16_t payload_len)
- {
- if (httpd_in_tunnel_transmitting(httpd_env, exdata))
- {
- http_decoder_push_tunnel_data(sess, exdata, httpd_tunnel_state_to_msg(exdata), payload, payload_len);
- httpd_tunnel_state_update(exdata);
- return;
- }
-
- int thread_id = stellar_get_current_thread_index();
- struct http_decoder_half *cur_half = NULL;
- enum flow_type sess_dir = session_get_flow_type(sess);
- if (FLOW_TYPE_C2S == sess_dir)
- {
- cur_half = exdata->decoder->c2s_half;
- http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_C2S_BYTES, payload_len);
- http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_C2S_TCP_SEG, 1);
- }
- else
- {
- cur_half = exdata->decoder->s2c_half;
- http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_S2C_BYTES, payload_len);
- http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_S2C_TCP_SEG, 1);
- }
-
- http_decoder_half_reinit(cur_half, exdata->queue, exdata->mempool, sess);
- int ret = http_decoder_half_parse(httpd_env->hd_cfg.proxy_enable, cur_half, payload, payload_len);
- if (ret < 0)
- {
- http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTP_STAT_PARSE_ERR, 1);
- stellar_session_plugin_dettach_current_session(sess);
- }
- }
-
- void http_decoder_tunnel_msg_cb(struct session *sess, int topic_id, const void *tmsg, void *per_session_ctx, void *plugin_env)
- {
- struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env;
- if (0 == httpd_env->hd_cfg.proxy_enable)
- {
- return;
- }
- hstring tunnel_payload;
- http_tunnel_message_get_payload((const struct http_tunnel_message *)tmsg, &tunnel_payload);
- uint16_t payload_len = tunnel_payload.iov_len;
- const char *payload = (char *)tunnel_payload.iov_base;
- if (NULL == payload || 0 == payload_len)
- {
- return;
- }
-
- struct http_decoder_exdata *exdata = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].exdata_id);
- enum http_tunnel_message_type tmsg_type = http_tunnel_message_type_get((const struct http_tunnel_message *)tmsg);
-
- switch (tmsg_type)
- {
- case HTTP_TUNNEL_OPENING:
- {
- if (NULL != exdata)
- {
- // not support nested http tunnel
- session_mq_ignore_message(sess, topic_id, httpd_env->plugin_id);
- return;
- }
- size_t http_identify_len = payload_len > HTTP_IDENTIFY_LEN ? HTTP_IDENTIFY_LEN : payload_len;
- int is_http = http_protocol_identify(payload, http_identify_len);
- if (is_http)
- {
- long long max_req_seq = 0, max_res_seq = 0;
- struct http_decoder_exdata *tcp_stream_exdata = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_TCP_STREAM_INDEX].exdata_id);
- http_half_get_max_transaction_seq(tcp_stream_exdata, &max_req_seq, &max_res_seq);
- exdata = httpd_session_exdata_new(sess, httpd_env, max_req_seq, max_res_seq);
- session_exdata_set(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].exdata_id, exdata);
- exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_MSG_INDEX].sub_topic_id;
- exdata->in_tunnel_is_http = 1;
- }
- else
- {
- // inner tunnel is not http, do nothing, do not push this message again !!!
- session_mq_ignore_message(sess, topic_id, httpd_env->plugin_id);
- return;
- }
- }
- break;
-
- case HTTP_TUNNEL_ACTIVE:
- if (NULL == exdata)
- {
- session_mq_ignore_message(sess, topic_id, httpd_env->plugin_id);
- http_decoder_stat_update(&httpd_env->hd_stat, stellar_get_current_thread_index(), HTTP_STAT_PARSE_ERR, 1);
- return;
- }
- break;
-
- case HTTP_TUNNEL_CLOSING:
- if (NULL == exdata)
- {
- http_decoder_stat_update(&httpd_env->hd_stat, stellar_get_current_thread_index(), HTTP_STAT_PARSE_ERR, 1);
- return;
- }
- if (exdata->in_tunnel_is_http)
- {
- http_half_pre_context_free(sess, exdata);
- }
- return;
- break;
-
- default:
- break;
- }
- if (exdata->in_tunnel_is_http)
- {
- http_decoder_execute(sess, httpd_env, exdata, payload, payload_len);
- }
+ STELLAR_LOG_WARN(http_env->logger_ref, HTTP_MODULE_NAME,
+ "llhttp parse error, ignore this session: %s", session_get_readable_addr(sess));
+ http_stat_update(&http_env->stat, thread_id, HTTP_STAT_PARSE_ERR, 1);
+ exdata->ignore_session = 1;
return;
}
-
-static void http_decoder_on_session_free(struct session *sess,void *per_session_ctx, void *plugin_env)
-{
- struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env;
- struct http_decoder_exdata *exdata = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_TCP_STREAM_INDEX].exdata_id);
- if (httpd_in_tunnel_transmitting(httpd_env, exdata))
- {
- http_decoder_push_tunnel_data(sess, exdata, HTTP_TUNNEL_CLOSING, NULL, 0);
- }
- else
- {
- http_half_pre_context_free(sess, exdata);
- }
- return;
}
- void http_decoder_tcp_stream_msg_cb(struct session *sess, int topic_id, const void *msg, void *nouse_session_ctx, void *plugin_env)
- {
- struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env;
- struct http_decoder_exdata *exdata = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_TCP_STREAM_INDEX].exdata_id);
- const char *payload = NULL;
- uint16_t payload_len = 0;
- assert(msg != NULL);
-
- payload_len = tcp_segment_get_len((struct tcp_segment *)msg);
- payload = tcp_segment_get_data((const struct tcp_segment *)msg);
- if (unlikely(0 == payload_len || NULL == payload))
- {
- return;
- }
- if (NULL == exdata) // first packet
- {
- size_t http_identify_len = payload_len > HTTP_IDENTIFY_LEN ? HTTP_IDENTIFY_LEN : payload_len;
- int ret = http_protocol_identify(payload, http_identify_len);
- if (ret < 0)
- {
- stellar_session_plugin_dettach_current_session(sess);
- return;
- }
- exdata = httpd_session_exdata_new(sess, httpd_env, 0, 0);
- exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_MSG_INDEX].sub_topic_id;
- session_exdata_set(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_TCP_STREAM_INDEX].exdata_id, exdata);
- }
- http_decoder_execute(sess, httpd_env, exdata, payload, payload_len);
- return;
- }
-
- static const struct http_topic_exdata_compose g_topic_exdata_compose[HTTPD_TOPIC_INDEX_MAX] =
- {
- {HTTPD_TOPIC_TCP_STREAM_INDEX, TOPIC_TCP_STREAM, http_decoder_tcp_stream_msg_cb, NULL, "HTTP_DECODER_EXDATA_BASEON_TCP_STREAM", httpd_ex_data_free_cb, -1, -1},
- {HTTPD_TOPIC_HTTP_MSG_INDEX, HTTP_TOPIC, NULL, http_message_free, NULL, NULL, -1, -1},
- {HTTPD_TOPIC_HTTP_TUNNEL_INDEX, HTTP_DECODER_TUNNEL_TOPIC, http_decoder_tunnel_msg_cb, http_message_free, "HTTP_DECODER_EXDATA_BASEON_HTTP_TUNNEL", httpd_ex_data_free_cb, -1, -1},
- };
-
- static void http_decoder_topic_exdata_compose_init(struct http_decoder_env *httpd_env)
+void http_message_free_cb(void *msg, void *msg_free_arg UNUSED)
+{
+ struct http_message *hmsg = (struct http_message *)msg;
+ struct http_half_data *flow_data = hmsg->flow_data;
+ if (hmsg->topic_type == HTTP_TOPIC_REQ_HEADER || hmsg->topic_type == HTTP_TOPIC_RES_HEADER)
{
- memcpy(httpd_env->topic_exdata_compose, g_topic_exdata_compose, sizeof(g_topic_exdata_compose));
- for (int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++)
+ if (flow_data)
{
- httpd_env->topic_exdata_compose[i].sub_topic_id = stellar_session_mq_get_topic_id_reliable(httpd_env->st,
- httpd_env->topic_exdata_compose[i].topic_name,
- httpd_env->topic_exdata_compose[i].msg_free_cb,
- NULL);
- assert(httpd_env->topic_exdata_compose[i].sub_topic_id >= 0);
-
- if (httpd_env->topic_exdata_compose[i].exdata_name)
+ if (flow_data->ut_filed_array)
{
- httpd_env->topic_exdata_compose[i].exdata_id = stellar_exdata_new_index(httpd_env->st,
- httpd_env->topic_exdata_compose[i].exdata_name,
- httpd_env->topic_exdata_compose[i].exdata_free_cb,
- NULL);
- assert(httpd_env->topic_exdata_compose[i].exdata_id >= 0);
+ utarray_free(flow_data->ut_filed_array);
+ flow_data->ut_filed_array = NULL;
}
-
- if (httpd_env->topic_exdata_compose[i].on_msg_cb)
+ if (flow_data->joint_url.iov_base)
{
- stellar_session_mq_subscribe(httpd_env->st, httpd_env->topic_exdata_compose[i].sub_topic_id,
- httpd_env->topic_exdata_compose[i].on_msg_cb, httpd_env->plugin_id);
+ free(flow_data->joint_url.iov_base);
+ flow_data->joint_url.iov_base = NULL;
}
- }
- }
-
- int http_topic_exdata_compose_get_index(const struct http_decoder_env *httpd_env, int by_topic_id)
- {
- for (int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++)
- {
- if (httpd_env->topic_exdata_compose[i].sub_topic_id == by_topic_id)
+ if (flow_data->cached_header_buffer != NULL)
{
- return i;
+ assert(flow_data->cached_header_buffer->reference > 0);
+ flow_data->cached_header_buffer->reference--;
+ if (flow_data->cached_header_buffer->reference == 0)
+ {
+ free(flow_data->cached_header_buffer->buffer);
+ free(flow_data->cached_header_buffer);
+ flow_data->cached_header_buffer = NULL;
+ }
}
}
- assert(0);
- return -1;
- }
-
- void *http_decoder_init(struct stellar *st)
- {
- int thread_num = 0;
-
- struct http_decoder_env *httpd_env = CALLOC(struct http_decoder_env, 1);
- int ret = load_http_decoder_config(HTTPD_CFG_FILE, &httpd_env->hd_cfg);
- if (ret < 0)
- {
- goto failed;
- }
- httpd_env->st = st;
- httpd_env->plugin_id = stellar_plugin_register(st, httpd_session_ctx_new_cb,
- httpd_session_ctx_free_cb, NULL,http_decoder_on_session_free,(void *)httpd_env);
- if (httpd_env->plugin_id < 0)
- {
- goto failed;
- }
- http_decoder_topic_exdata_compose_init(httpd_env);
-
- thread_num = stellar_get_worker_thread_num(st);
- assert(thread_num >= 1);
- if (http_decoder_stat_init(&httpd_env->hd_stat, thread_num,
- httpd_env->hd_cfg.stat_interval_pkts, httpd_env->hd_cfg.stat_output_interval) < 0)
- {
- goto failed;
- }
-
- return httpd_env;
-
- failed:
- fprintf(stderr, "http_decoder_init fail!\n");
- _http_decoder_context_free(httpd_env);
- return NULL;
- }
-
- void http_decoder_exit(void *plugin_env)
- {
- if (NULL == plugin_env)
- {
- return;
- }
- struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env;
- _http_decoder_context_free(httpd_env);
}
-
- enum http_message_type http_message_get_type(const struct http_message *msg)
+ if (hmsg->event == HTTP_EVENT_REQ_BODY_DATA || hmsg->event == HTTP_EVENT_RES_BODY_DATA)
{
- if (unlikely(NULL == msg))
+ if (flow_data->decompress_body.body != NULL)
{
- return HTTP_MESSAGE_MAX;
+ FREE(flow_data->decompress_body.body);
}
- return msg->type;
}
-
- void http_message_get0_request_line(const struct http_message *msg,
- struct http_request_line *line)
+ if (hmsg->event == HTTP_EVENT_REQ_END || hmsg->event == HTTP_EVENT_RES_END)
{
- if (unlikely(NULL == msg || msg->type != HTTP_MESSAGE_REQ_LINE))
+ if (flow_data)
{
- if (line)
- {
- line->method = NULL;
- line->uri = NULL;
- line->version = NULL;
- }
- return;
+ http_half_data_free(flow_data);
+ hmsg->flow_data = NULL;
}
- assert(msg->ref_queue);
- assert(msg->queue_index < HD_RESULT_QUEUE_LEN);
-
- struct http_decoder_half_data *req_data =
- msg->ref_queue->array[msg->queue_index].req_data;
-
- http_decoder_half_data_get_request_line(req_data, line);
}
- void http_message_get0_response_line(const struct http_message *msg,
- struct http_response_line *line)
- {
- if (unlikely(NULL == msg || msg->type != HTTP_MESSAGE_RES_LINE))
- {
- if (line)
- {
- line->version = NULL;
- line->status = NULL;
- }
- return;
- }
- assert(msg->ref_queue);
- assert(msg->queue_index < HD_RESULT_QUEUE_LEN);
-
- struct http_decoder_half_data *res_data =
- msg->ref_queue->array[msg->queue_index].res_data;
-
- http_decoder_half_data_get_response_line(res_data, line);
- }
+ free(hmsg);
+}
- void http_message_get0_header(const struct http_message *msg, const char *name, size_t name_len,
- struct http_header_field *hdr_result)
+void http_exdata_free_cb(UNUSED int idx, void *ex_data, UNUSED void *plugin_env)
+{
+ if (NULL == ex_data)
{
- int ret = -1;
- if (unlikely(NULL == msg || NULL == name || 0 == name_len))
- {
- goto fail;
- }
- assert(msg->ref_queue);
- assert(msg->queue_index < HD_RESULT_QUEUE_LEN);
- if (HTTP_MESSAGE_REQ_HEADER == msg->type)
- {
- ret = http_msg_get_request_header(msg, name, name_len, hdr_result);
- }
- else if (HTTP_MESSAGE_RES_HEADER == msg->type)
- {
- ret = http_msg_get_response_header(msg, name, name_len, hdr_result);
- }
- if (ret >= 0)
- {
- return;
- }
- fail:
- if (hdr_result)
- {
- hdr_result->name = NULL;
- hdr_result->value = NULL;
- }
return;
}
+ struct http_exdata *exdata = (struct http_exdata *)ex_data;
+ struct http *http_env = (struct http *)plugin_env;
- int http_message_get0_next_header(const struct http_message *msg, struct http_header_field *header)
+ if (exdata->ignore_session == 0)
{
- int ret = 1;
- if (unlikely(NULL == msg))
- {
- goto fail;
- }
- assert(msg->ref_queue);
- assert(msg->queue_index < HD_RESULT_QUEUE_LEN);
- if (HTTP_MESSAGE_REQ_HEADER == msg->type)
- {
- ret = http_msg_request_header_next(msg, header);
- }
- else if (HTTP_MESSAGE_RES_HEADER == msg->type)
- {
- ret = http_msg_response_header_next(msg, header);
- }
- if (ret < 0)
- {
- goto fail;
- }
- return 0;
- fail:
- if (header)
- {
- header->name = NULL;
- header->value = NULL;
- }
- return -1;
- }
-
- int http_message_reset_header_iter(struct http_message *msg)
- {
- if (unlikely(NULL == msg))
+ int thread_id = module_manager_get_thread_id(http_env->mod_mgr_ref);
+ unsigned char flow_flag = 0;
+ if (exdata->sess)
{
- return -1;
+ session_is_symmetric(exdata->sess, &flow_flag);
+ STELLAR_LOG_INFO(http_env->logger_ref, HTTP_MODULE_NAME,
+ "http session free: %s", session_get_readable_addr(exdata->sess));
}
- assert(msg->ref_queue);
- assert(msg->queue_index < HD_RESULT_QUEUE_LEN);
- if (HTTP_MESSAGE_REQ_HEADER == msg->type)
+ if (SESSION_SEEN_C2S_FLOW == flow_flag)
{
- struct http_decoder_half_data *req_data =
- msg->ref_queue->array[msg->queue_index].req_data;
- return http_decoder_half_data_reset_header_iter(req_data);
+ http_stat_update(&http_env->stat, thread_id, HTTP_C2S_ASYMMETRY_SESSION, 1);
}
- else if (HTTP_MESSAGE_RES_HEADER == msg->type)
+ else if (SESSION_SEEN_S2C_FLOW == flow_flag)
{
- struct http_decoder_half_data *res_data =
- msg->ref_queue->array[msg->queue_index].res_data;
- return http_decoder_half_data_reset_header_iter(res_data);
+ http_stat_update(&http_env->stat, thread_id, HTTP_S2C_ASYMMETRY_SESSION, 1);
}
- return -1;
+ http_stat_update(&http_env->stat, thread_id, HTTP_SESSION_FREE, 1);
}
+ http_decoder_free(exdata->decoder);
+ exdata->decoder = NULL;
+ free(exdata);
+}
- void http_message_get0_uncompressed_body(const struct http_message *msg, const char **body_ptr, size_t *body_len)
- {
- if (unlikely(NULL == msg))
- {
- goto fail;
- }
- assert(msg->ref_queue);
- assert(msg->queue_index < HD_RESULT_QUEUE_LEN);
- if (msg->raw_payload.iov_base != NULL && msg->raw_payload.iov_len != 0)
- {
- *body_ptr = msg->raw_payload.iov_base;
- *body_len = msg->raw_payload.iov_len;
- return;
- }
- fail:
- if (body_ptr)
- {
- *body_ptr = NULL;
- *body_len = 0;
- }
- return;
- }
+void http_on_tcp_stream_cb(struct session *sess, enum session_state state UNUSED, const char *tcp_payload, uint32_t tcp_payload_len, void *args)
+{
+ struct http *http_env = (struct http *)args;
+ struct http_exdata *exdata = (struct http_exdata *)session_get_exdata(sess, http_env->exdata_id);
- void http_message_get0_decompressed_body(const struct http_message *msg, const char **dec_body_ptr, size_t *dec_body_len)
+ if (unlikely(0 == tcp_payload_len || NULL == tcp_payload))
{
- enum http_content_encoding ecode = HTTP_CONTENT_ENCODING_NONE;
- struct http_decoder_half_data *ref_data = NULL;
- if (unlikely(NULL == msg))
- {
- goto fail;
- }
- assert(msg->ref_queue);
- assert(msg->queue_index < HD_RESULT_QUEUE_LEN);
- if (msg->decompress_payload.iov_base != NULL && msg->decompress_payload.iov_len != 0)
- {
- *dec_body_ptr = msg->decompress_payload.iov_base;
- *dec_body_len = msg->decompress_payload.iov_len;
- return;
- }
- /**
- * @brief If the body hasn't been compressed, same as http_message_get0_uncompressed_body().
- *
- */
-
- if (HTTP_MESSAGE_REQ_BODY_START == msg->type || HTTP_MESSAGE_REQ_BODY == msg->type || HTTP_MESSAGE_REQ_BODY_END == msg->type)
- {
- ref_data = msg->ref_queue->array[msg->queue_index].req_data;
- }
- else if (HTTP_MESSAGE_RES_BODY_START == msg->type || HTTP_MESSAGE_RES_BODY == msg->type || HTTP_MESSAGE_RES_BODY_END == msg->type)
- {
- ref_data = msg->ref_queue->array[msg->queue_index].res_data;
- }
- ecode = http_half_data_get_content_encoding(ref_data);
- if (ref_data != NULL && HTTP_CONTENT_ENCODING_NONE != ecode)
- {
- goto fail;
- }
-
- if (msg->raw_payload.iov_base != NULL && msg->raw_payload.iov_len != 0)
- {
- *dec_body_ptr = msg->raw_payload.iov_base;
- *dec_body_len = msg->raw_payload.iov_len;
- }
- return;
- fail:
- if (dec_body_ptr)
- {
- *dec_body_ptr = NULL;
- *dec_body_len = 0;
- }
return;
}
-
- void http_message_get0_raw_url(const struct http_message *msg, const char **url_val, size_t *url_len)
+ if (unlikely(NULL == exdata)) // first packet
{
- struct http_decoder_half_data *req_data = NULL;
- if (unlikely(NULL == msg))
- {
- goto fail;
- }
- assert(msg->ref_queue);
- assert(msg->queue_index < HD_RESULT_QUEUE_LEN);
-
- req_data = msg->ref_queue->array[msg->queue_index].req_data;
-
- if (http_half_data_get_url(req_data, url_val, url_len) < 0)
- {
- goto fail;
- }
- return;
-
- fail:
- if (url_val)
- {
- *url_val = NULL;
- *url_len = 0;
- }
- return;
+ exdata = http_session_new(sess, http_env, tcp_payload, tcp_payload_len);
}
-#if 0
- void http_message_decoded_url_get0(const struct http_message *msg, struct iovec *url)
+ if (exdata->ignore_session) // quic path
{
- if (unlikely(NULL == msg))
- {
- if (url)
- {
- url->iov_base = NULL;
- url->iov_len = 0;
- }
- return;
- }
- assert(msg->ref_queue);
- assert(msg->queue_index < HD_RESULT_QUEUE_LEN);
-
- struct http_decoder_half_data *req_data =
- msg->ref_queue->array[msg->queue_index].req_data;
-
- if (http_half_data_get_decode_url(req_data, url) < 0)
- {
- goto fail;
- }
- return;
-
- fail:
- if (url)
- {
- url->iov_base = NULL;
- url->iov_len = 0;
- }
return;
}
-#endif
- int http_message_get_transaction_seq(const struct http_message *msg)
- {
- if (unlikely(NULL == msg))
- {
- return -1;
- }
- assert(msg->ref_queue);
- assert(msg->queue_index < HD_RESULT_QUEUE_LEN);
- struct http_decoder_half_data *hf_data = NULL;
- if (HTTP_REQUEST == msg->flow_type)
- {
- hf_data = msg->ref_queue->array[msg->queue_index].req_data;
- }
- else
- {
- hf_data = msg->ref_queue->array[msg->queue_index].res_data;
- }
- return http_half_data_get_transaction_seq(hf_data);
- }
-#ifdef __cplusplus
-}
-#endif \ No newline at end of file
+ STELLAR_LOG_DEBUG(http_env->logger_ref, HTTP_MODULE_NAME,
+ "sess: %s, rcv tcp payload len: %u", session_get_readable_addr(exdata->sess), tcp_payload_len);
+ http_decoder_execute(sess, http_env, exdata, tcp_payload, tcp_payload_len);
+ return;
+} \ No newline at end of file
diff --git a/decoders/http/http_decoder.h b/decoders/http/http_decoder.h
new file mode 100644
index 0000000..06cc72b
--- /dev/null
+++ b/decoders/http/http_decoder.h
@@ -0,0 +1,104 @@
+
+#pragma once
+#include <stdint.h>
+#include <stddef.h>
+#include "stellar/session.h"
+#include "stellar/http.h"
+#include "llhttp.h"
+#include "http_decoder_stat.h"
+#include "http_decoder_half.h"
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+#define HTTP_IDENTIFY_LEN 16
+#define HTTP_HEADERS_CACHE_MAX_SIZE 4096
+
+#define HTTP_MODULE_NAME "HTTP"
+#define HTTP_EXDATA_NAME "HTTP_EXDATA"
+
+#ifndef likely
+#define likely(x) __builtin_expect((x), 1)
+#endif
+#ifndef unlikely
+#define unlikely(x) __builtin_expect((x), 0)
+#endif
+
+#ifndef UNUSED
+#define UNUSED __attribute__((unused))
+#endif
+
+ struct http_config
+ {
+ int decompress_switch;
+ // int stat_output_interval;
+ // size_t mempool_size; // per session mempool size
+ };
+
+ enum http_topic_type
+ {
+ HTTP_TOPIC_REQ_HEADER = 0,
+ HTTP_TOPIC_REQ_BODY,
+ HTTP_TOPIC_RES_HEADER,
+ HTTP_TOPIC_RES_BODY,
+ HTTP_TOPIC_MAX,
+ };
+
+ struct http_topic_compose
+ {
+ const char *topic_name;
+ int topic_id;
+ };
+
+ struct http_topic_manager
+ {
+ struct http_topic_compose topic_compose[HTTP_TOPIC_MAX];
+ };
+
+ struct http
+ {
+ struct module_manager *mod_mgr_ref;
+ struct logger *logger_ref;
+ struct http_topic_manager *http_topic_mgr;
+ int exdata_id;
+ int plugin_id;
+ struct http_config hd_cfg;
+ struct http_stat stat;
+ };
+
+ struct http_half;
+ struct http_half_data;
+ struct http_decoder
+ {
+ struct http_half *flow_c2s;
+ struct http_half *flow_s2c;
+ };
+
+ struct http_exdata
+ {
+ int ignore_session;
+ struct http *http_env;
+ struct session *sess;
+ struct http_decoder *decoder;
+ };
+
+ struct http_message
+ {
+ struct session *sess_ref;
+ enum http_topic_type topic_type;
+ struct http_half_data *flow_data;
+ enum http_event event;
+ };
+
+ void http_on_tcp_stream_cb(struct session *sess, enum session_state state, const char *tcp_payload, uint32_t tcp_payload_len, void *args);
+ struct http_topic_manager *http_topic_mgr_init(struct module_manager *mod_mgr);
+ struct http_half *http_flow_get_nx(struct http_exdata *exdata, enum flow_type flow_dir, struct session *sess, struct http *http_env);
+ void http_message_free_cb(void *msg, void *msg_free_arg);
+ void http_exdata_free_cb(int idx, void *ex_data, void *plugin_env);
+ void http_topic_mgr_free(struct http_topic_manager *topic_mgr);
+
+#ifdef __cplusplus
+}
+#endif \ No newline at end of file
diff --git a/decoders/http/http_decoder_decompress.c b/decoders/http/http_decoder_decompress.c
new file mode 100644
index 0000000..438987e
--- /dev/null
+++ b/decoders/http/http_decoder_decompress.c
@@ -0,0 +1,274 @@
+#include <zlib.h>
+#include <string.h>
+#include <assert.h>
+#include <brotli/decode.h>
+#include "http_decoder_utils.h"
+#include "http_decoder_decompress.h"
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+ struct http_content_decompress
+ {
+ enum http_content_encoding encoding;
+ z_stream *z_stream_ptr;
+ BrotliDecoderState *br_state;
+ char *buffer;
+ size_t buffer_size;
+ };
+
+ void http_content_decompress_ownership_borrow(struct http_content_decompress *decompress)
+ {
+ decompress->buffer = NULL; // ownership move to flow_data, will be freed when message has been processed by all modules
+ }
+
+ enum http_content_encoding http_content_encoding_str2int(const char *content_encoding, size_t encoding_str_len)
+ {
+ if (http_strncasecmp_safe("gzip", content_encoding, 4, encoding_str_len) == 0)
+ {
+ return HTTP_CONTENT_ENCODING_GZIP;
+ }
+ if (http_strncasecmp_safe("deflate", content_encoding, 7, encoding_str_len) == 0)
+ {
+ return HTTP_CONTENT_ENCODING_DEFLATE;
+ }
+ if (http_strncasecmp_safe("br", content_encoding, 2, encoding_str_len) == 0)
+ {
+ return HTTP_CONTENT_ENCODING_BR;
+ }
+ return HTTP_CONTENT_ENCODING_NONE;
+ }
+
+ const char *http_content_encoding_int2str(enum http_content_encoding content_encoding)
+ {
+ if (content_encoding == HTTP_CONTENT_ENCODING_GZIP)
+ {
+ return "gzip";
+ }
+ if (content_encoding == HTTP_CONTENT_ENCODING_DEFLATE)
+ {
+ return "deflate";
+ }
+ if (content_encoding == HTTP_CONTENT_ENCODING_BR)
+ {
+ return "br";
+ }
+ return "unknown";
+ }
+
+ struct http_content_decompress *http_content_decompress_create(enum http_content_encoding encoding)
+ {
+ struct http_content_decompress *decompress = CALLOC(struct http_content_decompress, 1);
+ assert(decompress);
+
+ decompress->encoding = encoding;
+ decompress->z_stream_ptr = NULL;
+ decompress->br_state = NULL;
+
+ if (encoding == HTTP_CONTENT_ENCODING_GZIP || encoding == HTTP_CONTENT_ENCODING_DEFLATE)
+ {
+ decompress->z_stream_ptr = CALLOC(z_stream, 1);
+ assert(decompress->z_stream_ptr);
+
+ decompress->z_stream_ptr->zalloc = NULL;
+ decompress->z_stream_ptr->zfree = NULL;
+ decompress->z_stream_ptr->opaque = NULL;
+ decompress->z_stream_ptr->avail_in = 0;
+ decompress->z_stream_ptr->next_in = Z_NULL;
+
+ if (encoding == HTTP_CONTENT_ENCODING_GZIP)
+ {
+ if (inflateInit2(decompress->z_stream_ptr, MAX_WBITS + 16) != Z_OK)
+ {
+ goto error;
+ }
+ }
+ if (encoding == HTTP_CONTENT_ENCODING_DEFLATE)
+ {
+ if (inflateInit2(decompress->z_stream_ptr, -MAX_WBITS) != Z_OK)
+ {
+ goto error;
+ }
+ }
+ }
+
+ if (encoding == HTTP_CONTENT_ENCODING_BR)
+ {
+ decompress->br_state = BrotliDecoderCreateInstance(NULL, NULL, NULL);
+ if (decompress->br_state == NULL)
+ {
+ goto error;
+ }
+ }
+ return decompress;
+
+ error:
+ http_content_decompress_destroy(decompress);
+ return NULL;
+ }
+
+ void http_content_decompress_destroy(struct http_content_decompress *decompress)
+ {
+ if (NULL == decompress)
+ {
+ return;
+ }
+ if (decompress->z_stream_ptr != NULL)
+ {
+ inflateEnd(decompress->z_stream_ptr);
+ FREE(decompress->z_stream_ptr);
+ }
+ if (decompress->br_state)
+ {
+ BrotliDecoderDestroyInstance(decompress->br_state);
+ decompress->br_state = NULL;
+ }
+ FREE(decompress->buffer);
+ FREE(decompress);
+ }
+
+ static int http_content_decompress_write_zlib(struct http_content_decompress *decompress,
+ const char *indata, size_t indata_len,
+ char **outdata, size_t *outdata_len)
+ {
+ z_stream *z_stream_ptr = decompress->z_stream_ptr;
+ z_stream_ptr->avail_in = (unsigned int)indata_len;
+ z_stream_ptr->next_in = (unsigned char *)indata;
+ z_stream_ptr->avail_out = (unsigned int)HTTP_DECOMPRESS_BUFFER_SIZE;
+ z_stream_ptr->next_out = (unsigned char *)decompress->buffer;
+ *outdata = NULL;
+ *outdata_len = 0;
+ size_t total_have = 0;
+ int no_buffer;
+
+ do
+ {
+ int ret = inflate(z_stream_ptr, Z_NO_FLUSH);
+ if (ret == Z_STREAM_ERROR || ret == Z_NEED_DICT || ret == Z_DATA_ERROR || ret == Z_MEM_ERROR)
+ {
+ (void)inflateEnd(z_stream_ptr);
+ return -1;
+ }
+ size_t have = HTTP_DECOMPRESS_BUFFER_SIZE - z_stream_ptr->avail_out;
+ if (have > 0)
+ {
+ total_have += have;
+ if (0 == z_stream_ptr->avail_out)
+ {
+ decompress->buffer_size += HTTP_DECOMPRESS_BUFFER_SIZE;
+ decompress->buffer = REALLOC(char, decompress->buffer, decompress->buffer_size);
+ z_stream_ptr->avail_out = HTTP_DECOMPRESS_BUFFER_SIZE;
+ z_stream_ptr->next_out = (unsigned char *)decompress->buffer + total_have;
+ *outdata = decompress->buffer;
+ *outdata_len = total_have;
+ no_buffer = 1;
+ }
+ else
+ {
+ *outdata = decompress->buffer;
+ *outdata_len = total_have;
+ no_buffer = 0;
+ }
+ }
+ else
+ {
+ break;
+ }
+ if (Z_STREAM_END == ret)
+ {
+ break;
+ }
+ } while (no_buffer == 1);
+ return 0;
+ }
+
+ static int http_content_decompress_write_br(struct http_content_decompress *decompress,
+ const char *indata, size_t indata_len,
+ char **outdata, size_t *outdata_len)
+ {
+ size_t available_in = indata_len;
+ const unsigned char *next_in = (const unsigned char *)indata;
+ size_t available_out = HTTP_DECOMPRESS_BUFFER_SIZE;
+ unsigned char *next_out = (unsigned char *)decompress->buffer;
+
+ *outdata = NULL;
+ *outdata_len = 0;
+ size_t total_have = 0;
+ int no_buffer;
+
+ do
+ {
+ int ret = BrotliDecoderDecompressStream(decompress->br_state, &available_in,
+ &next_in, &available_out, &next_out, NULL);
+ if (ret == BROTLI_DECODER_RESULT_ERROR)
+ {
+ // BrotliDecoderErrorCode errcode = BrotliDecoderGetErrorCode(decompress->br_state);
+ *outdata = NULL;
+ *outdata_len = 0;
+ return -1;
+ }
+ size_t have = HTTP_DECOMPRESS_BUFFER_SIZE - available_out;
+ if (have > 0)
+ {
+ total_have += have;
+ if (0 == available_out)
+ {
+ decompress->buffer_size += HTTP_DECOMPRESS_BUFFER_SIZE;
+ decompress->buffer = REALLOC(char, decompress->buffer, decompress->buffer_size);
+ available_out = HTTP_DECOMPRESS_BUFFER_SIZE;
+ next_out = (unsigned char *)decompress->buffer + total_have;
+ *outdata = decompress->buffer;
+ *outdata_len = total_have;
+ no_buffer = 1;
+ }
+ else
+ {
+ *outdata = decompress->buffer;
+ *outdata_len = have;
+ no_buffer = 0;
+ }
+ }
+ else
+ {
+ break;
+ }
+ } while (no_buffer == 1);
+ return 0;
+ }
+
+ int http_content_decompress_write(struct http_content_decompress *decompress,
+ const char *indata, size_t indata_len,
+ char **outdata, size_t *outdata_len)
+ {
+ assert(decompress);
+ assert(indata);
+ assert(indata_len > 0);
+ assert(outdata);
+ assert(outdata_len);
+ *outdata = NULL;
+ *outdata_len = 0;
+
+ if (NULL == decompress->buffer)
+ {
+ decompress->buffer = CALLOC(char, HTTP_DECOMPRESS_BUFFER_SIZE);
+ assert(decompress->buffer);
+ decompress->buffer_size = HTTP_DECOMPRESS_BUFFER_SIZE;
+ }
+
+ if (decompress->encoding == HTTP_CONTENT_ENCODING_GZIP ||
+ decompress->encoding == HTTP_CONTENT_ENCODING_DEFLATE)
+ {
+ return http_content_decompress_write_zlib(decompress, indata, indata_len, outdata, outdata_len);
+ }
+ if (decompress->encoding == HTTP_CONTENT_ENCODING_BR)
+ {
+ return http_content_decompress_write_br(decompress, indata, indata_len, outdata, outdata_len);
+ }
+ assert(0);
+ return -1;
+ }
+#ifdef __cplusplus
+}
+#endif \ No newline at end of file
diff --git a/decoders/http/http_content_decompress.h b/decoders/http/http_decoder_decompress.h
index 2ac4689..fd82dd5 100644
--- a/decoders/http/http_content_decompress.h
+++ b/decoders/http/http_decoder_decompress.h
@@ -6,6 +6,9 @@ extern "C"
#endif
#include <stddef.h>
+#include "stellar/http.h"
+
+#define HTTP_DECOMPRESS_BUFFER_SIZE (4096)
enum http_content_encoding
{
@@ -13,18 +16,14 @@ extern "C"
HTTP_CONTENT_ENCODING_GZIP = 1 << 1,
HTTP_CONTENT_ENCODING_DEFLATE = 1 << 2,
HTTP_CONTENT_ENCODING_BR = 1 << 3,
+ HTTP_CONTENT_ENCODING_UNKNOWN = 1 << 4,
};
struct http_content_decompress;
-
enum http_content_encoding http_content_encoding_str2int(const char *content_encoding, size_t encoding_str_len);
-
const char *http_content_encoding_int2str(enum http_content_encoding content_encoding);
-
struct http_content_decompress *http_content_decompress_create(enum http_content_encoding encoding);
-
void http_content_decompress_destroy(struct http_content_decompress *decompress);
-
// return 0 : success
// return -1 : error
int http_content_decompress_write(struct http_content_decompress *decompress,
diff --git a/decoders/http/http_decoder_half.c b/decoders/http/http_decoder_half.c
index 70991c6..84874a3 100644
--- a/decoders/http/http_decoder_half.c
+++ b/decoders/http/http_decoder_half.c
@@ -2,1187 +2,689 @@
#include <stdio.h>
#include <string.h>
#include <arpa/inet.h>
-#include "http_decoder_private.h"
+#include "http_decoder.h"
+#include "http_decoder_half.h"
+#include "http_decoder_utils.h"
+#include "http_decoder_decompress.h"
#include "llhttp.h"
+#include "stellar/session.h"
#include "uthash/utlist.h"
+#include "uthash/utarray.h"
-struct http_decompress_buffer
+#ifdef __cplusplus
+extern "C"
{
- struct iovec iov;
- char is_commit;
- struct http_decompress_buffer *next, *prev;
-};
-
-struct http_decoder_half_data
-{
- struct http_decoder_table *table;
-
- int major_version;
- int minor_version;
- int status_code;
-
- enum http_event state;
-
- enum http_content_encoding content_encoding;
- struct http_content_decompress *decompress;
-#if 0
- char *ref_decompress_body;
- size_t decompress_body_len;
-#else
- struct http_decompress_buffer *decompress_buffer_list;
-#endif
- int joint_url_complete;
- int url_is_encoded;
- // http://<host>[:<port>]/<path>?<searchpart>
- hstring joint_url;
- hstring decoded_url;
- long long transaction_index;
-};
-
-struct http_decoder_half
-{
- llhttp_t parser;
- llhttp_settings_t settings;
- enum llhttp_errno error;
- int decompress_switch;
- struct http_decoder_env *httpd_env;
-
- // uint8_t is_request_flow;
- enum http_event event;
- http_event_cb *http_ev_cb;
- struct http_event_context *http_ev_ctx;
-
- struct http_decoder_half_data *ref_data;
-
- long long trans_counter;
- long long err_counter;
- long long transaction_seq; // accumulated
-
- const char *data;
- int data_len;
-};
-
-// #define HTTP_DECODER_DEBUG
-#ifdef HTTP_DECODER_DEBUG
-static void printf_debug_info(const char *desc, const char *at, size_t length)
-{
- if (at)
- {
- char *temp = http_safe_dup(at, length);
- printf("HTTP PARSER STAGE: %s: %s\n", desc, temp);
- FREE(temp);
- }
- else
- {
- printf("HTTP PARSER STAGE: %s\n", desc);
- }
-}
-#else
-#define printf_debug_info(desc, at, length)
#endif
-void http_half_decompress_buffer_free(struct http_decoder_half_data *data, hstring *decompress_body)
-{
- struct http_decompress_buffer *el, *tmp;
- DL_FOREACH_SAFE(data->decompress_buffer_list, el, tmp)
+ static inline void http_half_stage_reset_flow_data(struct http_half *half)
{
- if (el->iov.iov_base == decompress_body->iov_base && el->iov.iov_len == decompress_body->iov_len)
+ if (half->flow_data)
{
- DL_DELETE(data->decompress_buffer_list, el);
- if (el->iov.iov_base)
+ memset(&half->flow_data->req_line, 0, sizeof(struct http_request_line));
+ memset(&half->flow_data->status_line, 0, sizeof(struct http_status_line));
+ memset(&half->flow_data->header, 0, sizeof(struct http_header));
+ if (half->flow_data->ut_filed_array)
{
- FREE(el->iov.iov_base);
+ utarray_clear(half->flow_data->ut_filed_array);
}
- FREE(el);
- break;
}
}
-}
-
-void http_half_get_lastest_decompress_buffer(struct http_decoder_half_data *data, hstring *decompress_body)
-{
- if (data->content_encoding == HTTP_CONTENT_ENCODING_NONE)
- {
- return;
- }
- if (data->decompress_buffer_list == NULL)
- {
- decompress_body->iov_base = NULL;
- decompress_body->iov_len = 0;
- return;
- }
- if (data->decompress_buffer_list->prev->is_commit == 1)
+ static inline void http_half_stage_reset_stat(struct http_half *half)
{
- decompress_body->iov_base = NULL;
- decompress_body->iov_len = 0;
- return;
+ memset(&half->half_stage, 0, sizeof(struct http_half_llhttp_stage));
}
- decompress_body->iov_base = data->decompress_buffer_list->prev->iov.iov_base;
- decompress_body->iov_len = data->decompress_buffer_list->prev->iov.iov_len;
- data->decompress_buffer_list->prev->is_commit = 1;
-}
-
-static void http_decoder_half_data_decompress(struct http_decoder_half_data *data)
-{
- assert(data);
-
- if (data->content_encoding == HTTP_CONTENT_ENCODING_NONE)
+ static void http_half_stage_reset(struct http_half *half)
{
- return;
+ http_half_stage_reset_stat(half);
+ http_half_stage_reset_flow_data(half);
}
- hstring raw_body = {};
- http_decoder_table_get_body(data->table, (char **)&raw_body.iov_base, &raw_body.iov_len);
- if (raw_body.iov_base == NULL || raw_body.iov_len == 0)
+ void http_flow_append_header_filed(struct http_half_data *flow_data, const char *at, size_t length)
{
- return;
+ if (NULL == flow_data->ut_filed_array) // the first field
+ {
+ UT_icd header_icd = {sizeof(struct http_header_field), NULL, NULL, NULL};
+ utarray_new(flow_data->ut_filed_array, &header_icd);
+ assert(flow_data->ut_filed_array != NULL);
+ utarray_reserve(flow_data->ut_filed_array, HTTP_HEADER_FIELD_RESERVE_NUM);
+ }
+ struct http_header_field new_filed = {};
+ new_filed.field_name = (char *)at;
+ new_filed.field_name_len = length;
+ utarray_push_back(flow_data->ut_filed_array, &new_filed);
}
- if (NULL == data->decompress)
+ void http_flow_append_header_value(struct http_half_data *flow_data, const char *at, size_t length)
{
- data->decompress = http_content_decompress_create(data->content_encoding);
+ assert(flow_data->ut_filed_array != NULL);
+ struct http_header_field *last_field = (struct http_header_field *)utarray_back(flow_data->ut_filed_array);
+ assert(last_field != NULL);
+ if (last_field)
+ {
+ last_field->field_value = (char *)at;
+ last_field->field_value_len = length;
+ }
}
- assert(data->decompress);
- char *local_outdata = NULL;
- size_t local_outdata_len = 0;
- if (http_content_decompress_write(data->decompress, (char *)raw_body.iov_base,
- raw_body.iov_len,
- &local_outdata,
- &local_outdata_len) == -1)
+ static struct http_message *http_produce_message(struct session *sess, enum http_event event,
+ enum http_topic_type topic_type, struct http_half_data *flow_data)
{
- // log error
- http_content_decompress_destroy(data->decompress);
- data->decompress = NULL;
- return;
+ struct http_message *msg = (struct http_message *)calloc(1, sizeof(struct http_message));
+ msg->sess_ref = sess;
+ msg->topic_type = topic_type;
+ msg->flow_data = flow_data;
+ msg->event = event;
+ return msg;
}
- if (local_outdata != NULL && local_outdata_len > 0)
+ static struct http_half_data *http_half_data_new(enum flow_type flow_dir)
{
- struct http_decompress_buffer *decompress_buffer = CALLOC(struct http_decompress_buffer, 1);
- assert(decompress_buffer);
- decompress_buffer->iov.iov_base = local_outdata;
- decompress_buffer->iov.iov_len = local_outdata_len;
- DL_APPEND(data->decompress_buffer_list, decompress_buffer);
- http_content_decompress_ownership_borrow(data->decompress);
+ struct http_half_data *half_data = (struct http_half_data *)calloc(1, sizeof(struct http_half_data));
+ half_data->flow_dir = flow_dir;
+ /* update transaction seq in http_half_parse_headers_finally(), not here */
+ return half_data;
}
-}
-
-/* Possible return values 0, -1, `HPE_PAUSED` */
-static int on_message_begin(llhttp_t *http)
-{
- printf_debug_info("on_message_begin", NULL, 0);
-
- struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
- assert(half);
- if (half->parser.type == HTTP_REQUEST)
- {
- half->event = HTTP_EVENT_REQ_INIT;
- }
- else
+ void http_half_data_free(struct http_half_data *half_data)
{
- half->event = HTTP_EVENT_RES_INIT;
+ if (NULL == half_data)
+ {
+ return;
+ }
+ if (half_data->ut_filed_array)
+ {
+ utarray_free(half_data->ut_filed_array);
+ half_data->ut_filed_array = NULL;
+ }
+ if (half_data->joint_url.iov_base)
+ {
+ FREE(half_data->joint_url.iov_base);
+ }
+ if (half_data->decompress)
+ {
+ http_content_decompress_destroy(half_data->decompress);
+ half_data->decompress = NULL;
+ }
+ free(half_data);
}
- half->ref_data = NULL;
-
- assert(half->http_ev_cb != NULL);
- half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); // http_event_handler()
-
- half->trans_counter++;
- half->ref_data->transaction_index = half->transaction_seq++;
- return 0;
-}
-
-static int on_message_complete(llhttp_t *http)
-{
- printf_debug_info("on_message_complete", NULL, 0);
-
- struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
- assert(half);
-
- if (half->parser.type == HTTP_REQUEST)
+ void http_half_free(struct http_half *half)
{
- if (half->event == HTTP_EVENT_REQ_BODY_DATA)
+ if (NULL == half)
{
- half->event = HTTP_EVENT_REQ_BODY_END;
- half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env);
+ return;
}
- }
- else
- {
- if (half->event == HTTP_EVENT_RES_BODY_DATA)
+ llhttp_reset(&half->parser.llhttp_parser);
+ http_half_data_free(half->flow_data);
+ if (half->cached_header_buffer)
{
- half->event = HTTP_EVENT_RES_BODY_END;
- half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env);
+ if (half->cached_header_buffer->buffer)
+ {
+ FREE(half->cached_header_buffer->buffer);
+ }
+ FREE(half->cached_header_buffer);
}
+ free(half);
}
- // trigger req_end/res_end
- if (half->parser.type == HTTP_REQUEST)
- {
- half->event = HTTP_EVENT_REQ_END;
- half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env);
- }
- else
+ struct http_half *http_half_new(enum flow_type flow_dir)
{
- half->event = HTTP_EVENT_RES_END;
- half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env);
+ struct http_half *half = (struct http_half *)calloc(1, sizeof(struct http_half));
+ if (FLOW_TYPE_C2S == flow_dir)
+ {
+ http_flow_parser_init(&half->parser, HTTP_REQUEST);
+ }
+ else
+ {
+ http_flow_parser_init(&half->parser, HTTP_RESPONSE);
+ }
+ half->parser.half_ref = half;
+ return half;
}
- return 0;
-}
-
-static int on_reset(llhttp_t *http __attribute__((unused)))
-{
- printf_debug_info("on_reset", NULL, 0);
-
- return 0;
-}
-
-static inline int is_line_crlf(struct http_decoder_half *half)
-{
- const char *chr_r = (char *)memrchr(half->data, '\r', half->data_len);
- const char *chr_n = (char *)memrchr(half->data, '\n', half->data_len);
- if (chr_r && chr_n && (chr_r + 1 == chr_n))
+ static struct http_header_field *http_flow_get_header_field(struct http_half_data *flow_data, const char *field_name, size_t field_name_len)
{
- return 1;
+ struct http_header_field *p;
+ if (NULL == flow_data->ut_filed_array) // some response no header fileds, such as "HTTP/1.1 100 Continue\r\n\r\n"
+ {
+ return NULL;
+ }
+ for (p = (struct http_header_field *)utarray_front(flow_data->ut_filed_array);
+ p != NULL;
+ p = (struct http_header_field *)utarray_next(flow_data->ut_filed_array, p))
+ {
+ if ((field_name_len == p->field_name_len) &&
+ (strncasecmp(p->field_name, field_name, field_name_len) == 0))
+ {
+ return p;
+ }
+ }
+ return NULL;
}
- return 0;
-}
-
-static int on_method(llhttp_t *http, const char *at, size_t length)
-{
- printf_debug_info("on_method", at, length);
-
- struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
- assert(half);
-
- http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_METHOD, at, length);
- return 0;
-}
-
-/* Information-only callbacks, return value is ignored */
-static int on_method_complete(llhttp_t *http)
-{
- printf_debug_info("on_method_complete", NULL, 0);
-
- struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
- assert(half);
- if (is_line_crlf(half) == 0)
+ static void http_using_session_addr_as_host(struct session *ref_session, struct http_header_field *host_result)
{
- http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_METHOD);
- }
-
- http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_METHOD);
-
- return 0;
-}
-
-/* Possible return values 0, -1, HPE_USER */
-static int on_uri(llhttp_t *http, const char *at, size_t length)
-{
- printf_debug_info("on_uri", at, length);
-
- struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
- assert(half);
-
- http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_URI, at, length);
- return 0;
-}
-
-static void http_decoder_cached_portion_url(struct http_decoder_half *half, const hstring *uri_result)
-{
- struct http_decoder_half_data *ref_data = half->ref_data;
- int uri_skip_len = 0;
+ memset(host_result, 0, sizeof(struct http_header_field));
+ struct http_session_addr ssaddr = {};
+ httpd_session_get_addr(ref_session, &ssaddr);
+ char ip_string_buf[INET6_ADDRSTRLEN];
- if ((uri_result->iov_len) > 7 && (strncasecmp("http://", (char *)uri_result->iov_base, 7) == 0)) // absolute URI
- {
- uri_skip_len = strlen("http://");
- ref_data->joint_url_complete = 1;
- }
- else
- {
- ref_data->joint_url_complete = 0;
+ if (4 == ssaddr.ipver)
+ {
+ host_result->field_value = (char *)calloc(1, (INET6_ADDRSTRLEN + 7) /* "ip:port" max length */);
+ inet_ntop(AF_INET, &ssaddr.daddr4, ip_string_buf, INET6_ADDRSTRLEN);
+ snprintf((char *)host_result->field_value, INET6_ADDRSTRLEN + 7, "%s:%u", ip_string_buf, ntohs(ssaddr.dport));
+ host_result->field_value_len = strlen(host_result->field_value);
+ }
+ else if (6 == ssaddr.ipver)
+ {
+ host_result->field_value = (char *)calloc(1, (INET6_ADDRSTRLEN + 7) /* "ip:port" max length */);
+ inet_ntop(AF_INET6, &ssaddr.daddr6, ip_string_buf, INET6_ADDRSTRLEN);
+ snprintf((char *)host_result->field_value, INET6_ADDRSTRLEN + 7, "%s:%u", ip_string_buf, ntohs(ssaddr.dport));
+ host_result->field_value_len = strlen(host_result->field_value);
+ }
+ else
+ {
+ host_result->field_value = (char *)calloc(1, 1);
+ host_result->field_value_len = 1;
+ }
+ return;
}
- ref_data->joint_url.iov_len = uri_result->iov_len - uri_skip_len;
- ref_data->joint_url.iov_base = MEMPOOL_CALLOC(half->http_ev_ctx->ref_mempool, char, ref_data->joint_url.iov_len);
- memcpy(ref_data->joint_url.iov_base, (char *)uri_result->iov_base + uri_skip_len, ref_data->joint_url.iov_len);
-}
-
-/* Information-only callbacks, return value is ignored */
-static int on_uri_complete(llhttp_t *http)
-{
- printf_debug_info("on_uri_complete", NULL, 0);
-
- struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
- assert(half);
-
- if (is_line_crlf(half) == 0)
+ static void http_flow_join_url(struct http_half_data *flow_data, const struct http_header_field *host_filed)
{
- http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_URI);
- }
-
- http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_URI);
-
- hstring uri_result = {};
- http_decoder_table_get_uri(half->ref_data->table, (char **)&uri_result.iov_base, &uri_result.iov_len);
- assert(uri_result.iov_base);
- http_decoder_cached_portion_url(half, &uri_result);
-
- return 0;
-}
+ int append_slash_len = 0;
+ const char *join_uri = flow_data->req_line.uri;
+ size_t join_url_len = flow_data->req_line.uri_len;
-/* Possible return values 0, -1, HPE_USER */
-static int on_version(llhttp_t *http, const char *at, size_t length)
-{
- printf_debug_info("on_version", at, length);
-
- struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
- assert(half);
-
- http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_VERSION, at, length);
- return 0;
-}
+ // skip schema "http://"
+ if (join_url_len > 7 && strncasecmp(join_uri, "http://", 7) == 0)
+ {
+ join_uri += 7;
+ join_url_len -= 7;
+ }
-/* Information-only callbacks, return value is ignored */
-static int on_version_complete(llhttp_t *http)
-{
- printf_debug_info("on_version_complete", NULL, 0);
+ // if uri not absolute path(start with '/'), append '/'
+ if ('/' != join_uri[0])
+ {
+ append_slash_len = 1;
+ }
+ int url_cache_str_len = host_filed->field_value_len + join_url_len + append_slash_len;
+ char *url_cache_str = (char *)calloc(1, url_cache_str_len);
- struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
- assert(half);
+ char *ptr = url_cache_str;
+ memcpy(ptr, host_filed->field_value, host_filed->field_value_len);
+ ptr += host_filed->field_value_len;
+ if (append_slash_len)
+ {
+ *ptr = '/';
+ ptr++;
+ }
+ memcpy(ptr, join_uri, join_url_len);
- if (is_line_crlf(half) == 0)
- {
- http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_VERSION);
+ flow_data->joint_url.iov_base = url_cache_str;
+ flow_data->joint_url.iov_len = url_cache_str_len;
}
- http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_VERSION);
-
- half->ref_data->major_version = llhttp_get_http_major(&half->parser);
- half->ref_data->minor_version = llhttp_get_http_minor(&half->parser);
-
- if (half->parser.type == HTTP_REQUEST)
+ // todo: optimize use hash table
+ static void http_half_get_frequently_used_fileds(struct http_half_data *flow_data)
{
- half->event = HTTP_EVENT_REQ_LINE;
- if (half->http_ev_cb) // http_event_handler()
+ if (FLOW_TYPE_C2S == flow_data->flow_dir)
+ {
+ flow_data->header.host = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_HOST, strlen(HTTP_HEADER_FIELD_NAME_HOST));
+ flow_data->header.user_agent = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_USER_AGENT, strlen(HTTP_HEADER_FIELD_NAME_USER_AGENT));
+ flow_data->header.referer = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_REFERER, strlen(HTTP_HEADER_FIELD_NAME_REFERER));
+ flow_data->header.cookie = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_COOKIE, strlen(HTTP_HEADER_FIELD_NAME_COOKIE));
+ }
+ else
{
- half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env);
+ flow_data->header.set_cookie = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_SET_COOKIE, strlen(HTTP_HEADER_FIELD_NAME_SET_COOKIE));
}
+ flow_data->header.content_type = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_CONTENT_TYPE, strlen(HTTP_HEADER_FIELD_NAME_CONTENT_TYPE));
}
- return 0;
-}
-
-/* Possible return values 0, -1, HPE_USER */
-static int on_status(llhttp_t *http, const char *at, size_t length)
-{
- printf_debug_info("on_status", at, length);
-
- struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
- assert(half);
-
- http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_STATUS, at, length);
- return 0;
-}
-
-/* Information-only callbacks, return value is ignored */
-static int on_status_complete(llhttp_t *http)
-{
- printf_debug_info("on_status_complete", NULL, 0);
-
- struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
- assert(half);
-
- if (is_line_crlf(half) == 0)
+ static int http_half_parse_headers_finally(struct http_half *half)
{
- http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_STATUS);
- }
+ struct http_half_data *flow_data = half->flow_data;
+ http_half_get_frequently_used_fileds(flow_data);
- http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_STATUS);
- half->ref_data->status_code = llhttp_get_status_code(&half->parser);
+ /* update transaction_seq when headers completed */
+ half->flow_data->transaction_seq = half->transaction_num++;
- if (half->parser.type == HTTP_RESPONSE)
- {
- half->event = HTTP_EVENT_RES_LINE;
- if (half->http_ev_cb != NULL) // http_event_handler()
+ if (FLOW_TYPE_C2S == flow_data->flow_dir)
{
- half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env);
+ struct http_header_field tmp_host_field = {};
+ const struct http_header_field *host_filed = flow_data->header.host;
+ if (NULL == host_filed)
+ {
+ http_using_session_addr_as_host(half->sess_ref, &tmp_host_field);
+ host_filed = &tmp_host_field;
+ }
+ http_flow_join_url(flow_data, host_filed);
+ FREE(tmp_host_field.field_name);
+ FREE(tmp_host_field.field_value);
}
- }
- return 0;
-}
-
-/* Possible return values 0, -1, HPE_USER */
-static int on_header_field(llhttp_t *http, const char *at, size_t length)
-{
- printf_debug_info("on_header_field", at, length);
-
- struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
- assert(half);
-
- http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_HDRKEY, at, length);
- return 0;
-}
-
-/* Information-only callbacks, return value is ignored */
-static int on_header_field_complete(llhttp_t *http)
-{
- printf_debug_info("on_header_field_complete", NULL, 0);
-
- struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
- assert(half);
-
- http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_HDRKEY);
-
- return 0;
-}
-
-/* Possible return values 0, -1, HPE_USER */
-static int on_header_value(llhttp_t *http, const char *at, size_t length)
-{
- printf_debug_info("on_header_value", at, length);
-
- struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
- assert(half);
-
- http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_HDRVAL, at, length);
- return 0;
-}
-
-#define MAX_ENCODING_STR_LEN 8
-/* Information-only callbacks, return value is ignored */
-static int on_header_value_complete(llhttp_t *http)
-{
- printf_debug_info("on_header_value_complete", NULL, 0);
-
- struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
- assert(half);
-
- if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_HDRKEY) ==
- STRING_STATE_CACHE)
- {
- http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_HDRKEY);
- }
-
- http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_HDRVAL);
+ struct http_header_field *encoding_field = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_CONTENT_ENCODING, strlen(HTTP_HEADER_FIELD_NAME_CONTENT_ENCODING));
+ if (NULL == encoding_field)
+ {
+ flow_data->content_encoding_type = HTTP_CONTENT_ENCODING_NONE;
+ }
+ else
+ {
+ flow_data->content_encoding_type = http_content_encoding_str2int(encoding_field->field_value, encoding_field->field_value_len);
+ }
- if (half->ref_data->content_encoding == HTTP_CONTENT_ENCODING_NONE)
- {
- struct http_header_field http_hdr = {};
+ struct http_header_field *content_len_field = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_CONTENT_LENGTH, strlen(HTTP_HEADER_FIELD_NAME_CONTENT_LENGTH));
+ if (NULL == content_len_field)
+ {
+ flow_data->header.content_length = -1;
+ }
+ else
+ {
+ flow_data->header.content_length = http_strtoll(content_len_field->field_value, content_len_field->field_value_len);
+ }
- if (http_decoder_table_get_header(half->ref_data->table, (char *)"Content-Encoding", 16, &http_hdr) == 0)
+ struct http_header_field *transf_encoding_field = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_TRANSFER_ENCODING, strlen(HTTP_HEADER_FIELD_NAME_TRANSFER_ENCODING));
+ if (NULL == transf_encoding_field)
{
- half->ref_data->content_encoding = http_content_encoding_str2int(http_hdr.value, http_hdr.value_len);
+ flow_data->transfer_encoding_is_chunked = -1;
}
- }
+ else
+ {
+ if (http_strncasecmp_safe(transf_encoding_field->field_value, "chunked", transf_encoding_field->field_value_len, strlen("chunked")) == 0)
+ {
+ flow_data->transfer_encoding_is_chunked = 1;
+ }
+ else
+ {
+ flow_data->transfer_encoding_is_chunked = 0;
+ }
+ }
+ flow_data->header.header_buf = half->half_stage.first_header_name_ptr;
+ flow_data->header.header_buf_sz = half->half_stage.last_headers_complete_ptr - half->half_stage.first_header_name_ptr;
- if (http->type == HTTP_REQUEST)
- {
- http_decoder_get_host_feed_url(half);
+ // todo, parse other frequently used fields
+ return 0;
}
- return 0;
-}
-
-/* When on_chunk_header is called, the current chunk length is stored
- * in parser->content_length.
- * Possible return values 0, -1, `HPE_PAUSED`
- */
-static int on_chunk_header(llhttp_t *http __attribute__((unused)))
-{
- printf_debug_info("on_chunk_header", NULL, 0);
- return 0;
-}
-
-/* When on_chunk_header is called, the current chunk length is stored
- * in parser->content_length.
- * Possible return values 0, -1, `HPE_PAUSED`
- */
-static int on_chunk_header_complete(llhttp_t *http __attribute__((unused)))
-{
- printf_debug_info("on_chunk_header_complete", NULL, 0);
-
- return 0;
-}
-
-/* Possible return values:
- * 0 - Proceed normally
- * 1 - Assume that request/response has no body, and proceed to parsing the next message
- * 2 - Assume absence of body (as above) and make `llhttp_execute()` return `HPE_PAUSED_UPGRADE`
- * -1 - Error `HPE_PAUSED`
- */
-static int on_headers_complete(llhttp_t *http)
-{
- printf_debug_info("on_headers_complete", NULL, 0);
-
- struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
- assert(half);
- assert(half->ref_data);
-
- http_decoder_table_set_header_complete(half->ref_data->table);
-
- if (half->parser.type == HTTP_REQUEST)
- {
- half->event = HTTP_EVENT_REQ_HDR_END;
- }
- else
+ int http_get_header_field_count(struct http_half_data *flow_data)
{
- half->event = HTTP_EVENT_RES_HDR_END;
+ if (NULL == flow_data->ut_filed_array)
+ {
+ return 0; // some response no header fileds, such as "HTTP/1.1 100 Continue\r\n\r\n"
+ }
+ return utarray_len(flow_data->ut_filed_array);
}
- half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); // http_event_handler()
-
- return 0;
-}
-/* Possible return values 0, -1, HPE_USER */
-static int on_body(llhttp_t *http, const char *at, size_t length)
-{
- printf_debug_info("on_body", at, length);
-
- struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
- assert(half);
-
- // trigger body_begin event
- if (half->parser.type == HTTP_REQUEST)
+ void http_flow_body_decompress(struct http_half_data *flow_data, const char *zipdata, size_t zipdatalen)
{
- if (half->event == HTTP_EVENT_REQ_HDR_END)
+ assert(flow_data);
+ if (flow_data->content_encoding_type == HTTP_CONTENT_ENCODING_NONE)
{
- half->event = HTTP_EVENT_REQ_BODY_BEGIN;
- half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); // http_event_handler()
+ return;
}
- }
- else
- {
- if (half->event == HTTP_EVENT_RES_HDR_END)
+ if (zipdata == NULL || zipdatalen == 0)
{
- half->event = HTTP_EVENT_RES_BODY_BEGIN;
- half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env);
+ return;
}
- }
-
- if (half->ref_data != NULL)
- {
- if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_BODY) ==
- STRING_STATE_COMMIT)
+ if (NULL == flow_data->decompress)
{
- http_decoder_table_reset(half->ref_data->table, HTTP_ITEM_BODY);
+ flow_data->decompress = http_content_decompress_create(flow_data->content_encoding_type);
}
- http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_BODY, at, length);
- http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_BODY);
- }
+ char *local_outdata = NULL;
+ size_t local_outdata_len = 0;
+ if (http_content_decompress_write(flow_data->decompress, (char *)zipdata,
+ zipdatalen, &local_outdata, &local_outdata_len) == -1)
+ {
+ http_content_decompress_destroy(flow_data->decompress);
+ flow_data->decompress = NULL;
+ flow_data->decompress_body.body = NULL;
+ flow_data->decompress_body.body_sz = 0;
+ return;
+ }
- if (1 == half->decompress_switch && half->ref_data->content_encoding != HTTP_CONTENT_ENCODING_NONE)
- {
- http_decoder_half_data_decompress(half->ref_data);
+ if (local_outdata != NULL && local_outdata_len > 0)
+ {
+ flow_data->decompress_body.body = local_outdata;
+ flow_data->decompress_body.body_sz = local_outdata_len;
+ http_content_decompress_ownership_borrow(flow_data->decompress);
+ }
}
- if (half->parser.type == HTTP_REQUEST)
- {
- half->event = HTTP_EVENT_REQ_BODY_DATA;
- half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); // http_event_handler()
- }
- else
+ static void http_add_header_buf_reference(struct http_half *half)
{
- half->event = HTTP_EVENT_RES_BODY_DATA;
- half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env);
- }
-
- return 0;
-}
-
-static void http_decoder_half_init(struct http_decoder_half *half, http_event_cb *http_ev_cb, enum llhttp_type type)
-{
- llhttp_settings_init(&half->settings);
- llhttp_init(&half->parser, type, &half->settings);
-
- // half->is_request_flow = (type == HTTP_REQUEST) ? 1 : 0;
- half->settings.on_message_begin = on_message_begin;
- half->settings.on_message_complete = on_message_complete;
- half->settings.on_reset = on_reset;
-
- half->settings.on_url = on_uri;
- half->settings.on_url_complete = on_uri_complete;
-
- half->settings.on_status = on_status;
- half->settings.on_status_complete = on_status_complete;
-
- half->settings.on_method = on_method;
- half->settings.on_method_complete = on_method_complete;
-
- half->settings.on_version = on_version;
- half->settings.on_version_complete = on_version_complete;
-
- half->settings.on_header_field = on_header_field;
- half->settings.on_header_field_complete = on_header_field_complete;
-
- half->settings.on_header_value = on_header_value;
- half->settings.on_header_value_complete = on_header_value_complete;
-
- half->settings.on_chunk_header = on_chunk_header;
- half->settings.on_chunk_complete = on_chunk_header_complete;
-
- half->settings.on_headers_complete = on_headers_complete;
- half->settings.on_body = on_body;
-
- half->error = HPE_OK;
- half->http_ev_cb = http_ev_cb; // http_event_handler()
- half->ref_data = NULL;
-}
-
-struct http_decoder_half *http_decoder_half_new(struct http_decoder_exdata *hd_ctx, nmx_pool_t *mempool,
- http_event_cb *ev_cb, enum llhttp_type http_type,
- int decompress_switch, struct http_decoder_env *httpd_env, long long start_seq)
-{
- struct http_decoder_half *half = MEMPOOL_CALLOC(mempool, struct http_decoder_half, 1);
- assert(half);
-
- half->decompress_switch = decompress_switch;
- half->http_ev_ctx = MEMPOOL_CALLOC(mempool, struct http_event_context, 1);
- http_decoder_half_init(half, ev_cb, http_type);
- half->http_ev_ctx->ref_httpd_ctx = hd_ctx;
- half->httpd_env = httpd_env;
- half->transaction_seq = start_seq;
- return half;
-}
-
-void http_decoder_half_free(nmx_pool_t *mempool, struct http_decoder_half *half)
-{
- if (NULL == half)
- {
- return;
+ if (half->cached_header_buffer != NULL)
+ {
+ // add reference but not move ownership, because the buffer will be referenced of multiple transaction in pipeline mode
+ half->cached_header_buffer->reference++;
+ half->flow_data->cached_header_buffer = half->cached_header_buffer;
+ }
}
- if (half->http_ev_ctx != NULL)
+ static void http_update_body_offset(struct http_half_data *flow_data)
{
- MEMPOOL_FREE(mempool, half->http_ev_ctx);
- half->http_ev_ctx = NULL;
+ if (flow_data->decompress_body.body != NULL || flow_data->decompress_body.offset > 0)
+ {
+ flow_data->decompress_body.offset += flow_data->decompress_body.body_sz;
+ }
+ else
+ {
+ flow_data->raw_body.offset += flow_data->raw_body.body_sz;
+ }
}
- MEMPOOL_FREE(mempool, half);
-}
-
-void http_decoder_half_reinit(struct http_decoder_half *half,
- struct http_decoder_result_queue *queue,
- nmx_pool_t *mempool, struct session *sess)
-{
- assert(half != NULL);
- if (half->ref_data != NULL)
+ static void http_set_body_finish(struct http_half_data *flow_data)
{
- http_decoder_table_reinit(half->ref_data->table);
+ if (flow_data->decompress_body.offset > 0)
+ {
+ flow_data->decompress_body.body = NULL;
+ flow_data->decompress_body.body_sz = 0;
+ flow_data->decompress_body.is_finished = 1;
+ }
+ else
+ {
+ flow_data->raw_body.body = NULL;
+ flow_data->raw_body.body_sz = 0;
+ flow_data->raw_body.is_finished = 1;
+ }
}
- half->http_ev_ctx->ref_mempool = mempool;
- half->http_ev_ctx->ref_session = sess;
- half->http_ev_ctx->ref_queue = queue;
-}
-static void publish_message_for_parsed_header(struct http_decoder_half *half)
-{
- if (0 == http_decoder_table_has_parsed_header(half->ref_data->table))
- {
- return;
- }
- if (half->parser.type == HTTP_REQUEST)
+ void http_event_handler(struct http_half *half, enum http_event event, struct http *http_env)
{
- half->event = HTTP_EVENT_REQ_HDR;
- }
- else
- {
- half->event = HTTP_EVENT_RES_HDR;
- }
- half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); // http_event_handler();
- return;
-}
+ struct http_message *msg = NULL;
+ int thread_id = module_manager_get_thread_id(http_env->mod_mgr_ref);
+ uint8_t flow_flag = 0;
+ struct mq_runtime *mq_rt = module_manager_get_mq_runtime(http_env->mod_mgr_ref);
-int http_decoder_half_parse(int proxy_enable, struct http_decoder_half *half, const char *data, size_t data_len)
-{
- assert(half && data);
-
- half->data = (const char *)data;
- half->data_len = data_len;
- half->error = llhttp_execute(&half->parser, data, data_len);
-
- int ret = 0;
- enum llhttp_type type = HTTP_BOTH;
+ switch (event)
+ {
+ case HTTP_EVENT_REQ_INIT:
+ http_half_stage_reset(half);
+ if (NULL == half->flow_data) /* call llhttp_reset when headers is not completed */
+ {
+ half->flow_data = http_half_data_new(FLOW_TYPE_C2S);
+ }
+ break;
- switch (half->error)
- {
- case HPE_OK:
- break;
- case HPE_PAUSED:
- llhttp_resume(&half->parser);
- break;
- case HPE_PAUSED_UPGRADE:
- if (proxy_enable)
+ case HTTP_EVENT_REQ_HDR_END:
{
- llhttp_resume_after_upgrade(&half->parser);
+ http_half_parse_headers_finally(half);
+ int tot_c2s_headers = http_get_header_field_count(half->flow_data);
+ http_stat_update(&http_env->stat, thread_id, HTTP_TRANSACTION_NEW, 1);
+ http_stat_update(&http_env->stat, thread_id, HTTP_C2S_HEADERS, tot_c2s_headers);
+ http_stat_update(&http_env->stat, thread_id, HTTP_URL_BYTES, half->flow_data->joint_url.iov_len);
+ http_add_header_buf_reference(half);
+ msg = http_produce_message(half->sess_ref, event, HTTP_TOPIC_REQ_HEADER, half->flow_data);
+ mq_runtime_publish_message(mq_rt, http_env->http_topic_mgr->topic_compose[HTTP_TOPIC_REQ_HEADER].topic_id, msg);
}
- ret = 0;
break;
- default:
- type = (enum llhttp_type)half->parser.type;
- llhttp_init(&half->parser, type, &half->settings);
- ret = -1;
- break;
- }
-
- if (ret < 0)
- {
- // fprintf(stdout,
- // "llhttp_execute parse error: %s err_reason:%s\n",
- // llhttp_errno_name(half->error), half->parser.reason);
- return half->error;
- }
-
- if (half->ref_data != NULL)
- {
- if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_URI) == STRING_STATE_REFER)
+ case HTTP_EVENT_REQ_BODY_BEGIN:
+ break;
+ case HTTP_EVENT_REQ_BODY_DATA:
{
- http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_URI);
+ http_update_body_offset(half->flow_data);
+ msg = http_produce_message(half->sess_ref, event, HTTP_TOPIC_REQ_BODY, half->flow_data);
+ mq_runtime_publish_message(mq_rt, http_env->http_topic_mgr->topic_compose[HTTP_TOPIC_REQ_BODY].topic_id, msg);
}
-
- if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_STATUS) == STRING_STATE_REFER)
+ break;
+ case HTTP_EVENT_REQ_BODY_END:
{
- http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_STATUS);
+ http_set_body_finish(half->flow_data);
+ msg = http_produce_message(half->sess_ref, event, HTTP_TOPIC_REQ_BODY, half->flow_data);
+ mq_runtime_publish_message(mq_rt, http_env->http_topic_mgr->topic_compose[HTTP_TOPIC_REQ_BODY].topic_id, msg);
}
-
- if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_METHOD) == STRING_STATE_REFER)
+ break;
+ case HTTP_EVENT_REQ_END:
{
- http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_METHOD);
+ if ((0 == session_is_symmetric(half->sess_ref, &flow_flag) && (SESSION_SEEN_C2S_FLOW == flow_flag)))
+ {
+ http_stat_update(&http_env->stat, thread_id, HTTP_TRANSACTION_FREE, 1);
+ http_stat_update(&http_env->stat, thread_id, HTTP_C2S_ASYMMETRY_TRANSACTION, 1);
+ }
+ msg = http_produce_message(half->sess_ref, event, HTTP_TOPIC_REQ_BODY, half->flow_data);
+ mq_runtime_publish_message(mq_rt, http_env->http_topic_mgr->topic_compose[HTTP_TOPIC_REQ_BODY].topic_id, msg);
+ half->flow_data = NULL; // ownership move to message, free it in message free callback
}
+ break;
- if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_VERSION) == STRING_STATE_REFER)
+ case HTTP_EVENT_RES_INIT:
{
- http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_VERSION);
+ http_half_stage_reset(half);
+ if (NULL == half->flow_data) /* call llhttp_reset when headers is not completed */
+ {
+ half->flow_data = http_half_data_new(FLOW_TYPE_S2C);
+ }
}
+ break;
- if (http_decoder_table_header_complete(half->ref_data->table))
+ case HTTP_EVENT_RES_HDR_END:
{
- http_decoder_table_reset_header_complete(half->ref_data->table);
+ http_half_parse_headers_finally(half);
+ int tot_s2c_headers = http_get_header_field_count(half->flow_data);
+ http_stat_update(&http_env->stat, thread_id, HTTP_S2C_HEADERS, tot_s2c_headers);
+ if ((0 == session_is_symmetric(half->sess_ref, &flow_flag) && (SESSION_SEEN_S2C_FLOW == flow_flag)))
+ {
+ http_stat_update(&http_env->stat, thread_id, HTTP_TRANSACTION_NEW, 1);
+ }
+ http_add_header_buf_reference(half);
+ msg = http_produce_message(half->sess_ref, event, HTTP_TOPIC_RES_HEADER, half->flow_data);
+ mq_runtime_publish_message(mq_rt, http_env->http_topic_mgr->topic_compose[HTTP_TOPIC_RES_HEADER].topic_id, msg);
}
- else
+ break;
+ case HTTP_EVENT_RES_BODY_BEGIN:
+ break;
+ case HTTP_EVENT_RES_BODY_DATA:
{
- // if headers are not completed with EOF \r\n\r\n, push the parsed headers so far
- publish_message_for_parsed_header(half);
+ http_update_body_offset(half->flow_data);
+ msg = http_produce_message(half->sess_ref, event, HTTP_TOPIC_RES_BODY, half->flow_data);
+ mq_runtime_publish_message(mq_rt, http_env->http_topic_mgr->topic_compose[HTTP_TOPIC_RES_BODY].topic_id, msg);
}
-
- enum string_state hdr_key_state =
- http_decoder_table_state(half->ref_data->table, HTTP_ITEM_HDRKEY);
- enum string_state hdr_val_state =
- http_decoder_table_state(half->ref_data->table, HTTP_ITEM_HDRVAL);
-
- /* Truncated in http header key
- For example http header k-v => User-Agent: Chrome
- case1:
- packet1: User- hdr_key_state == STRING_STATE_REFER
- packet2: Agent: Chrome
-
- case2:
- packet1: User-Agent: hdr_key_state == STRING_STATE_COMMIT
- hdr_val_state == STRING_STATE_INIT
- packet2: Chrome
- */
- if (hdr_key_state == STRING_STATE_REFER ||
- (hdr_key_state == STRING_STATE_COMMIT && hdr_val_state == STRING_STATE_INIT))
+ break;
+ case HTTP_EVENT_RES_BODY_END:
{
- http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_HDRKEY);
+ http_set_body_finish(half->flow_data);
+ msg = http_produce_message(half->sess_ref, event, HTTP_TOPIC_RES_BODY, half->flow_data);
+ mq_runtime_publish_message(mq_rt, http_env->http_topic_mgr->topic_compose[HTTP_TOPIC_RES_BODY].topic_id, msg);
}
-
- /* Truncated in http header value
- For example http header k-v => User-Agent: Chrome
- packet1: User-Agent: Ch hdr_key_state == STRING_STATE_COMMIT
- hdr_val_state == STRING_STATE_REFER
-
- packet2: rome
- */
- if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_HDRVAL) == STRING_STATE_REFER)
+ break;
+ case HTTP_EVENT_RES_END:
{
- /* Header key should have been committed
- If it's not cached, cache it for next packet to use
- */
- http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_HDRKEY);
- http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_HDRVAL);
+ if ((0 == session_is_symmetric(half->sess_ref, &flow_flag) && (SESSION_SEEN_S2C_FLOW == flow_flag)))
+ {
+ http_stat_update(&http_env->stat, thread_id, HTTP_S2C_ASYMMETRY_TRANSACTION, 1);
+ }
+ http_stat_update(&http_env->stat, thread_id, HTTP_TRANSACTION_FREE, 1);
+ msg = http_produce_message(half->sess_ref, event, HTTP_TOPIC_RES_BODY, half->flow_data);
+ mq_runtime_publish_message(mq_rt, http_env->http_topic_mgr->topic_compose[HTTP_TOPIC_RES_BODY].topic_id, msg);
+ half->flow_data = NULL; // ownership move to message, free it in message free callback
}
-
- if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_BODY) == STRING_STATE_REFER)
- {
- http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_BODY);
+ break;
+ default:
+ assert(0);
+ break;
}
}
- return 0;
-}
-
-long long http_decoder_half_trans_count(struct http_decoder_half *half)
-{
- if (NULL == half)
- {
- return 0;
- }
-
- long long trans_cnt = half->trans_counter;
- half->trans_counter = 0;
-
- return trans_cnt;
-}
-
-struct http_decoder_half_data *
-http_decoder_half_data_new(nmx_pool_t *mempool)
-{
- struct http_decoder_half_data *data =
- MEMPOOL_CALLOC(mempool, struct http_decoder_half_data, 1);
- assert(data);
-
- data->table = http_decoder_table_new(mempool);
- assert(data->table);
-
- data->major_version = -1;
- data->minor_version = -1;
- data->status_code = -1;
-
- data->content_encoding = HTTP_CONTENT_ENCODING_NONE;
- // data->ref_decompress_body = NULL;
- // data->decompress_body_len = 0;
- data->decompress_buffer_list = NULL;
- return data;
-}
-
-static void http_decoder_half_decompress_buf_free(struct http_decoder_half_data *ref_data)
-{
- if (ref_data == NULL)
+ static void http_half_stage_buf_free(struct http_half *half)
{
- return;
- }
- struct http_decompress_buffer *el, *tmp;
- DL_FOREACH_SAFE(ref_data->decompress_buffer_list, el, tmp)
- {
- DL_DELETE(ref_data->decompress_buffer_list, el);
- if (el->iov.iov_base != NULL)
+ if (half->cached_header_buffer != NULL)
{
- FREE(el->iov.iov_base);
+ http_buffer_free(half->cached_header_buffer);
}
- FREE(el);
- }
- ref_data->decompress_buffer_list = NULL;
-}
-
-void http_decoder_half_data_free(nmx_pool_t *mempool, struct http_decoder_half_data *data)
-{
- if (NULL == data)
- {
- return;
- }
-
- if (data->table != NULL)
- {
- http_decoder_table_free(data->table);
- data->table = NULL;
+ half->cached_header_buffer = NULL;
}
- if (data->decompress != NULL)
+ static void http_half_stage_buf_reuse(struct http_half *half)
{
- http_content_decompress_destroy(data->decompress);
- data->decompress = NULL;
+ http_half_stage_buf_free(half);
+ half->cached_header_buffer = http_buffer_new();
}
- if (data->joint_url.iov_base)
- {
- MEMPOOL_FREE(mempool, data->joint_url.iov_base);
- data->joint_url.iov_base = NULL;
- data->joint_url_complete = 0;
- }
- http_decoder_half_decompress_buf_free(data);
- MEMPOOL_FREE(mempool, data);
-}
-
-int http_decoder_half_data_get_request_line(struct http_decoder_half_data *data,
- struct http_request_line *line)
-{
- http_decoder_table_get_method(data->table, &line->method, &line->method_len);
- http_decoder_table_get_uri(data->table, &line->uri, &line->uri_len);
- http_decoder_table_get_version(data->table, &line->version, &line->version_len);
-
- line->major_version = data->major_version;
- line->minor_version = data->minor_version;
-
- return 0;
-}
-
-int http_decoder_half_data_get_response_line(struct http_decoder_half_data *data,
- struct http_response_line *line)
-{
- http_decoder_table_get_version(data->table, &line->version, &line->version_len);
- http_decoder_table_get_status(data->table, &line->status, &line->status_len);
-
- line->major_version = data->major_version;
- line->minor_version = data->minor_version;
- line->status_code = data->status_code;
-
- return 0;
-}
-
-int http_decoder_half_data_get_header(const struct http_decoder_half_data *data,
- const char *name, size_t name_len,
- struct http_header_field *hdr_result)
-{
- return http_decoder_table_get_header(data->table, name, name_len, hdr_result);
-}
-
-int http_decoder_half_data_iter_header(struct http_decoder_half_data *data,
- struct http_header_field *header)
-{
- return http_decoder_table_iter_header((struct http_decoder_table *)data->table, header);
-}
-
-int http_decoder_half_data_reset_header_iter(struct http_decoder_half_data *req_data)
-{
- if (NULL == req_data)
- {
- return -1;
- }
- return http_decoder_table_reset_header_iter(req_data->table);
-}
-
-int http_decoder_half_data_has_parsed_header(struct http_decoder_half_data *data)
-{
- if (NULL == data)
+ static int http_half_stage_buf_append(struct http_half *half, const char *newdata, size_t newdata_len)
{
+ if (half->cached_header_buffer == NULL)
+ {
+ half->cached_header_buffer = http_buffer_new();
+ }
+ if (half->cached_header_buffer->buffer_size > HTTP_HEADERS_CACHE_MAX_SIZE)
+ {
+ return -1;
+ }
+ http_buffer_add(half->cached_header_buffer, newdata, newdata_len);
return 0;
}
- return http_decoder_table_has_parsed_header(data->table);
-}
-
-int http_decoder_half_data_get_raw_body(const struct http_decoder_half_data *data, const char **body, size_t *body_len)
-{
- if (NULL == data || NULL == body)
- {
- return -1;
- }
- return http_decoder_table_get_body(data->table, (char **)body, body_len);
-}
-#if 0
-int http_decoder_half_data_get_decompress_body(const struct http_decoder_half_data *data, hstring *body)
-{
- if (HTTP_CONTENT_ENCODING_NONE == data->content_encoding)
- {
- return http_decoder_table_get_body(data->table, body);
- }
-
- body->iov_base = data->ref_decompress_body;
- body->iov_len = data->decompress_body_len;
- return 0;
-}
-#endif
-
-void http_decoder_half_data_dump(struct http_decoder_half *half)
-{
- if (NULL == half || NULL == half->ref_data)
- {
- return;
- }
-
- http_decoder_table_dump(half->ref_data->table);
-}
-
-static void using_session_addr_as_host(struct session *ref_session, struct http_header_field *host_result, nmx_pool_t *mempool)
-{
-#if 1 // in native steallar, can't get the tuple4 from the session yet!!!
- struct httpd_session_addr ssaddr = {};
- httpd_session_get_addr(ref_session, &ssaddr);
- if (ssaddr.ipver != 4 && ssaddr.ipver != 6)
- {
- host_result->value = MEMPOOL_CALLOC(mempool, char, 1);
- sprintf((char *)host_result->value, "%s", "");
- host_result->value_len = strlen((char *)host_result->value);
- return;
- }
-
- char ip_string_buf[INET6_ADDRSTRLEN];
- if (4 == ssaddr.ipver)
- {
- host_result->value = MEMPOOL_CALLOC(mempool, char, (INET_ADDRSTRLEN + 7) /* "ip:port" max length */);
- inet_ntop(AF_INET, &ssaddr.daddr4, ip_string_buf, INET_ADDRSTRLEN);
- sprintf((char *)host_result->value, "%s:%u", ip_string_buf, ntohs(ssaddr.dport));
- host_result->value_len = strlen((char *)host_result->value);
- }
- else if (6 == ssaddr.ipver)
- {
- host_result->value = MEMPOOL_CALLOC(mempool, char, (INET6_ADDRSTRLEN + 7) /* "ip:port" max length */);
- inet_ntop(AF_INET6, &ssaddr.daddr6, ip_string_buf, INET6_ADDRSTRLEN);
- sprintf((char *)host_result->value, "%s:%u", ip_string_buf, ntohs(ssaddr.dport));
- host_result->value_len = strlen((char *)host_result->value);
- }
- else
- {
- assert(0);
- }
-#else
- host_result->val.iov_base = MEMPOOL_CALLOC(mempool, char, 32);
- sprintf((char *)host_result->val.iov_base, "%s", "todo:get_tuple4");
- host_result->val.iov_len = strlen((char *)host_result->val.iov_base);
-#endif
-}
-void http_decoder_join_url(struct http_decoder_half_data *hfdata, nmx_pool_t *mempool, const struct http_header_field *host_hdr)
-{
- int append_slash_len = 0;
- if ('/' != ((char *)hfdata->joint_url.iov_base)[0])
+ static int http_half_cache_uncompleted_header(struct http_half *half, int mem_merged, const char *data, size_t data_len)
{
- append_slash_len = 1;
- }
- int url_cache_str_len = host_hdr->value_len + hfdata->joint_url.iov_len + append_slash_len;
- char *url_cache_str = MEMPOOL_CALLOC(mempool, char, url_cache_str_len);
+ int ret = 0;
+ const char *cached_begin_ptr = NULL;
+ long long cached_len = 0;
+ if (half->half_stage.last_headers_complete_ptr == NULL)
+ {
+ if (mem_merged)
+ {
+ ret = 0; // no pipeline and memory have been merged, do nothing
+ }
+ else
+ {
+ ret = http_half_stage_buf_append(half, data, (size_t)data_len);
+ }
+ }
+ else
+ {
+ /*
+ has pipeline, cache begin ptr is the last completed header_field_value + "\r\n\r\n",
+ for example:
+ GET xxx \r\nheader_name:header:value\r\nheader_name:header:value\r\n\r\nGET yyy ...
+ ^
+ |
+ cache begin ptr
+ */
+ cached_begin_ptr = half->half_stage.last_headers_complete_ptr;
+ cached_len = (long long)data_len - (long long)(cached_begin_ptr - data);
+ assert(cached_len > 0);
- char *ptr = url_cache_str;
- memcpy(ptr, host_hdr->value, host_hdr->value_len);
- ptr += host_hdr->value_len;
- if (append_slash_len)
- {
- *ptr = '/';
- ptr++;
+ if (mem_merged)
+ {
+ /* some message have beed pushed, and pointer to half->half_stage->half_buffer,
+ the half->half_stage->half_buffer is maintain by messages ,
+ so, we can reset the half->half_stage->half_buffer, and append the new data to it.
+ */
+ http_half_stage_buf_reuse(half);
+ ret = http_half_stage_buf_append(half, cached_begin_ptr, (size_t)cached_len);
+ }
+ else
+ {
+ ret = http_half_stage_buf_append(half, cached_begin_ptr, (size_t)cached_len);
+ }
+ }
+ return ret;
}
- memcpy(ptr, hfdata->joint_url.iov_base, hfdata->joint_url.iov_len);
- MEMPOOL_FREE(mempool, hfdata->joint_url.iov_base); // free the cached uri buffer
- hfdata->joint_url.iov_base = url_cache_str;
- hfdata->joint_url.iov_len = url_cache_str_len;
- hfdata->joint_url_complete = 1;
-}
-
-void http_decoder_get_url(struct http_decoder_half_data *hfdata, nmx_pool_t *mempool)
-{
- struct http_request_line reqline = {};
- http_decoder_half_data_get_request_line(hfdata, &reqline);
- if (unlikely(http_strncasecmp_safe("CONNECT", (char *)reqline.method, 7, reqline.method_len) == 0))
+ int http_flow_parse(struct http_half *half, const char *data, size_t data_len)
{
- hfdata->joint_url.iov_base = MEMPOOL_CALLOC(mempool, char, reqline.uri_len + 1);
- memcpy(hfdata->joint_url.iov_base, reqline.uri, reqline.uri_len);
- hfdata->joint_url.iov_len = reqline.uri_len;
- hfdata->joint_url_complete = 1;
+ assert(half && data);
+ llhttp_errno_t ret = HPE_OK /* HPE_OK = 0 */;
+ ret = llhttp_execute(&half->parser.llhttp_parser, data, data_len);
+ switch (ret)
+ {
+ case HPE_OK:
+ break;
+ case HPE_PAUSED:
+ llhttp_resume(&half->parser.llhttp_parser);
+ break;
+ case HPE_PAUSED_UPGRADE:
+ llhttp_resume_after_upgrade(&half->parser.llhttp_parser);
+ break;
+ default:
+ break;
+ }
+ return ret;
}
-}
-int http_decoder_join_url_finally(struct http_event_context *ev_ctx, struct http_decoder_half_data *hfdata, nmx_pool_t *mempool)
-{
- if (hfdata->joint_url_complete)
+ struct http_half *http_flow_get_nx(struct http_exdata *exdata, enum flow_type flow_dir, struct session *sess, struct http *http_env)
{
- return 0;
- }
- struct http_header_field addr_as_host = {};
- using_session_addr_as_host(ev_ctx->ref_session, &addr_as_host, mempool);
- http_decoder_join_url(hfdata, mempool, &addr_as_host);
- MEMPOOL_FREE(mempool, addr_as_host.value); // free session addr to host buffer
- return 1;
-}
+ struct http_half **flow = NULL;
-void http_decoder_get_host_feed_url(struct http_decoder_half *half)
-{
- if (half->ref_data->joint_url_complete)
- {
- return;
- }
- struct http_header_field host_result = {};
- int host_header_cnt = http_decoder_half_data_get_header(half->ref_data, (char *)"Host", 4, &host_result);
- if (host_header_cnt < 0)
- {
- return;
+ if (FLOW_TYPE_C2S == flow_dir)
+ {
+ flow = &exdata->decoder->flow_c2s;
+ }
+ else if (FLOW_TYPE_S2C == flow_dir)
+ {
+ flow = &exdata->decoder->flow_s2c;
+ }
+ else
+ {
+ assert(0);
+ return NULL;
+ }
+ if (NULL == *flow)
+ {
+ *flow = http_half_new(flow_dir);
+ (*flow)->http_env_ref = http_env;
+ (*flow)->sess_ref = sess;
+ }
+ return *flow;
}
- http_decoder_join_url(half->ref_data, half->http_ev_ctx->ref_mempool, &host_result);
-}
-int http_half_data_get_url(struct http_decoder_half_data *res_data, const char **url_val, size_t *url_len)
-{
- if (0 == res_data->joint_url_complete)
- {
- return -1;
- }
- *url_val = res_data->joint_url.iov_base;
- *url_len = res_data->joint_url.iov_len;
- return 0;
-}
-#if 0
-int http_half_data_get_decode_url(struct http_decoder_half_data *res_data, hstring *url)
-{
- if (0 == res_data->joint_url_complete)
+ int http_half_flow_process(struct http_half *half, const char *newdata, size_t newdata_len)
{
- return -1;
- }
- url->iov_base = res_data->decoded_url.iov_base;
- url->iov_len = res_data->decoded_url.iov_len;
- return 0;
-}
-#endif
-
-int http_half_data_get_transaction_seq(struct http_decoder_half_data *hf_data)
-{
- return hf_data->transaction_index;
-}
-
-void http_half_data_update_commit_index(struct http_decoder_half_data *half_data)
-{
- http_decoder_table_update_commit_index(half_data->table);
-}
+ int ret = 0;
+ int mem_merged = 0;
+ const char *parse_data = newdata;
+ size_t parse_data_len = newdata_len;
-int http_half_data_get_total_parsed_header_count(struct http_decoder_half_data *half_data)
-{
- return http_decoder_table_get_total_parsed_header(half_data->table);
-}
-
-void http_half_pre_context_free(struct session *sess, struct http_decoder_exdata *exdata)
-{
- struct http_message *msg = NULL;
- struct http_decoder_half_data *res_data = NULL;
- struct http_decoder_result_queue *queue = NULL;
-
- if (exdata)
- {
- queue = exdata->queue;
- for (size_t i = 0; i < queue->queue_size; i++)
+ /* merge cached uncompleted header and current tcp payload first */
+ if (half->cached_header_buffer != NULL && half->cached_header_buffer->buffer != NULL)
{
- struct http_decoder_half_data *req_data = queue->array[i].req_data;
- res_data = queue->array[i].res_data;
- if ((req_data != NULL) && (NULL == res_data) && (req_data->state < HTTP_EVENT_REQ_END))
+ http_half_stage_buf_append(half, newdata, newdata_len);
+ parse_data = half->cached_header_buffer->buffer;
+ parse_data_len = half->cached_header_buffer->buffer_size;
+ mem_merged = 1;
+ if (parse_data_len > HTTP_HEADERS_CACHE_MAX_SIZE)
{
- msg = http_message_new(HTTP_TRANSACTION_END, queue, i, HTTP_REQUEST);
- session_mq_publish_message(sess, exdata->pub_topic_id, msg);
+ STELLAR_LOG_FATAL(half->http_env_ref->logger_ref, HTTP_MODULE_NAME,
+ "sess: %s, headers buf len more than %d", session_get_readable_addr(half->sess_ref), HTTP_HEADERS_CACHE_MAX_SIZE);
+ return -1;
}
}
+ ret = http_flow_parse(half, parse_data, parse_data_len);
+ if (ret != HPE_OK)
+ {
+ STELLAR_LOG_FATAL(half->http_env_ref->logger_ref, HTTP_MODULE_NAME,
+ "llhttp parser error: %d, %s. sess: %s", ret, llhttp_errno_name(ret), session_get_readable_addr(half->sess_ref));
+ return -1;
+ }
- for (size_t i = 0; i < queue->queue_size; i++)
+ if (half->half_stage.llhttp_last_stage >= LLHTTP_STAGE_MESSAGE_BEGIN &&
+ half->half_stage.llhttp_last_stage < LLHTTP_STAGE_HEADERS_COMPLETE) /* some headers not completed */
{
- res_data = queue->array[i].res_data;
- if ((res_data != NULL) && (res_data->state < HTTP_EVENT_RES_END))
+ STELLAR_LOG_DEBUG(half->http_env_ref->logger_ref, HTTP_MODULE_NAME,
+ "sess: %s, header not completed, need caching, %.*s",
+ session_get_readable_addr(half->sess_ref), parse_data_len > 16 ? 16 : parse_data_len, parse_data);
+ if (http_half_cache_uncompleted_header(half, mem_merged, parse_data, parse_data_len) < 0)
{
- msg = http_message_new(HTTP_TRANSACTION_END, queue, i, HTTP_RESPONSE);
- session_mq_publish_message(sess, exdata->pub_topic_id, msg);
+ STELLAR_LOG_FATAL(half->http_env_ref->logger_ref, HTTP_MODULE_NAME,
+ "sess: %s, headers buf len more than %d", session_get_readable_addr(half->sess_ref), HTTP_HEADERS_CACHE_MAX_SIZE);
+ return -1;
}
+
+ /* uncompleted_header, reset llhttp state and all headers */
+ http_half_stage_reset(half);
+ llhttp_reset(&half->parser.llhttp_parser);
+ return 0;
+ }
+ else
+ {
+ /* ownership is maintained by header message, free it here (reference-- actually) */
+ http_half_stage_buf_free(half);
}
+ return 0;
}
-}
-void http_half_update_state(struct http_decoder_half_data *hf_data, enum http_event state)
-{
- hf_data->state = state;
+#ifdef __cplusplus
}
-
-void http_half_get_max_transaction_seq(struct http_decoder_exdata *exdata, long long *max_req_seq, long long *max_res_seq)
-{
- assert(exdata && max_req_seq && max_res_seq);
- *max_req_seq = exdata->decoder->c2s_half->transaction_seq;
- *max_res_seq = exdata->decoder->s2c_half->transaction_seq;
-}
-
-enum http_content_encoding http_half_data_get_content_encoding(struct http_decoder_half_data *hf_data)
-{
- if (NULL == hf_data)
- {
- return HTTP_CONTENT_ENCODING_NONE;
- }
- return hf_data->content_encoding;
-} \ No newline at end of file
+#endif
diff --git a/decoders/http/http_decoder_half.h b/decoders/http/http_decoder_half.h
index f525f78..6daf32c 100644
--- a/decoders/http/http_decoder_half.h
+++ b/decoders/http/http_decoder_half.h
@@ -1,109 +1,172 @@
#pragma once
+#include <stdint.h>
#include <stddef.h>
#include "stellar/session.h"
#include "stellar/http.h"
-#include "http_content_decompress.h"
-#include "http_decoder_result_queue.h"
-#include <llhttp.h>
+#include "llhttp.h"
+#include "uthash/utarray.h"
+#include "http_decoder_decompress.h"
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+#define HTTP_HEADER_FIELD_RESERVE_NUM (16)
+
+#define HTTP_HEADER_FIELD_NAME_HOST "Host"
+#define HTTP_HEADER_FIELD_NAME_USER_AGENT "User-Agent"
+#define HTTP_HEADER_FIELD_NAME_REFERER "Referer"
+#define HTTP_HEADER_FIELD_NAME_COOKIE "Cookie"
+#define HTTP_HEADER_FIELD_NAME_SET_COOKIE "Set-Cookie"
+#define HTTP_HEADER_FIELD_NAME_CONTENT_TYPE "Content-Type"
+#define HTTP_HEADER_FIELD_NAME_CONTENT_LENGTH "Content-Length"
+#define HTTP_HEADER_FIELD_NAME_CONTENT_ENCODING "Content-Encoding"
+#define HTTP_HEADER_FIELD_NAME_TRANSFER_ENCODING "Transfer-Encoding"
+
+ enum http_event
+ {
+ __HTTP_EVENT_RESERVED = 0,
+ HTTP_EVENT_REQ_INIT = 1 << 1,
+ HTTP_EVENT_REQ_LINE = 1 << 2,
+ HTTP_EVENT_REQ_HDR = 1 << 3,
+ HTTP_EVENT_REQ_HDR_END = 1 << 4,
+ HTTP_EVENT_REQ_BODY_BEGIN = 1 << 5,
+ HTTP_EVENT_REQ_BODY_DATA = 1 << 6,
+ HTTP_EVENT_REQ_BODY_END = 1 << 7,
+ HTTP_EVENT_REQ_END = 1 << 8,
+
+ HTTP_EVENT_RES_INIT = 1 << 9,
+ HTTP_EVENT_RES_LINE = 1 << 10,
+ HTTP_EVENT_RES_HDR = 1 << 11,
+ HTTP_EVENT_RES_HDR_END = 1 << 12,
+ HTTP_EVENT_RES_BODY_BEGIN = 1 << 13,
+ HTTP_EVENT_RES_BODY_DATA = 1 << 14,
+ HTTP_EVENT_RES_BODY_END = 1 << 15,
+ HTTP_EVENT_RES_END = 1 << 16,
+ };
+
+ struct http_body
+ {
+ char *body;
+ size_t body_sz;
+ size_t offset; // accumulated
+ int is_finished;
+ };
#ifndef hstring
#include <bits/types/struct_iovec.h>
-typedef struct iovec hstring;
+ typedef struct iovec hstring;
#endif
-// only one http event is fired at a time
-enum http_event
-{
- HTTP_EVENT_REQ_INIT = 1 << 1,
- HTTP_EVENT_REQ_LINE = 1 << 2,
- HTTP_EVENT_REQ_HDR = 1 << 3,
- HTTP_EVENT_REQ_HDR_END = 1 << 4,
- HTTP_EVENT_REQ_BODY_BEGIN = 1 << 5,
- HTTP_EVENT_REQ_BODY_DATA = 1 << 6,
- HTTP_EVENT_REQ_BODY_END = 1 << 7,
- HTTP_EVENT_REQ_END = 1 << 8,
-
- HTTP_EVENT_RES_INIT = 1 << 9,
- HTTP_EVENT_RES_LINE = 1 << 10,
- HTTP_EVENT_RES_HDR = 1 << 11,
- HTTP_EVENT_RES_HDR_END = 1 << 12,
- HTTP_EVENT_RES_BODY_BEGIN = 1 << 13,
- HTTP_EVENT_RES_BODY_DATA = 1 << 14,
- HTTP_EVENT_RES_BODY_END = 1 << 15,
- HTTP_EVENT_RES_END = 1 << 16,
-};
-
-struct http_event_context
-{
- struct http_decoder_exdata *ref_httpd_ctx;
- nmx_pool_t *ref_mempool;
- struct session *ref_session;
- struct http_decoder_result_queue *ref_queue;
-};
-
-struct http_decoder_half;
-struct http_decoder_half_data;
-struct http_decoder_env;
-
-typedef void http_event_cb(enum http_event event, struct http_decoder_half_data **data,
- struct http_event_context *ev_ctx, void *httpd_plugin_env);
-
-struct http_decoder_half *
-http_decoder_half_new(struct http_decoder_exdata *hd_ctx, nmx_pool_t *mempool, http_event_cb *event_cb,
- enum llhttp_type http_type, int decompress_switch, struct http_decoder_env *httpd_env, long long start_seq);
-
-void http_decoder_half_free(nmx_pool_t *mempool, struct http_decoder_half *half);
-
-void http_decoder_half_reinit(struct http_decoder_half *half,
- struct http_decoder_result_queue *queue,
- nmx_pool_t *mempool, struct session *sess);
-
-int http_decoder_half_parse(int proxy_enable, struct http_decoder_half *half, const char *data, size_t data_len);
-
-long long http_decoder_half_trans_count(struct http_decoder_half *half);
-
-// http decoder half data API
-struct http_decoder_half_data *
-http_decoder_half_data_new(nmx_pool_t *mempool);
-
-void http_decoder_half_data_free(nmx_pool_t *mempool, struct http_decoder_half_data *data);
-
-int http_decoder_half_data_get_request_line(struct http_decoder_half_data *data,
- struct http_request_line *line);
-
-int http_decoder_half_data_get_response_line(struct http_decoder_half_data *data,
- struct http_response_line *line);
-
-int http_decoder_half_data_get_header(const struct http_decoder_half_data *data,
- const char *name, size_t name_len, struct http_header_field *hdr_res);
-
-int http_decoder_half_data_iter_header(struct http_decoder_half_data *data,
- struct http_header_field *header);
-int http_decoder_half_data_reset_header_iter(struct http_decoder_half_data *req_data);
-int http_decoder_half_data_has_parsed_header(struct http_decoder_half_data *data);
-
-int http_decoder_half_data_get_raw_body(const struct http_decoder_half_data *data, const char **body, size_t *body_len);
-
-int http_decoder_half_data_get_decompress_body(const struct http_decoder_half_data *data, const char **body, size_t *body_len);
-void http_half_get_lastest_decompress_buffer(struct http_decoder_half_data *data, hstring *decompress_body);
-void http_half_decompress_buffer_free(struct http_decoder_half_data *data, hstring *decompress_body);
-void http_decoder_half_data_dump(struct http_decoder_half *half);
-
-void http_decoder_get_host_feed_url(struct http_decoder_half *half);
-void http_decoder_get_url(struct http_decoder_half_data *hfdata, nmx_pool_t *mempool);
-int http_half_data_get_decode_url(struct http_decoder_half_data *res_data, hstring *url);
-void http_decoder_join_url(struct http_decoder_half_data *hfdata,
- nmx_pool_t *mempool,
- const struct http_header_field *host_hdr);
-int http_decoder_join_url_finally(struct http_event_context *ev_ctx,
- struct http_decoder_half_data *hfdata,
- nmx_pool_t *mempool);
-int http_half_data_get_url(struct http_decoder_half_data *res_data, const char **url_val, size_t *url_len);
-int http_half_data_get_transaction_seq(struct http_decoder_half_data *hf_data);
-
-void http_half_data_update_commit_index(struct http_decoder_half_data *half_data);
-void http_half_pre_context_free(struct session *sess, struct http_decoder_exdata *exdata);
-void http_half_update_state(struct http_decoder_half_data *hf_data, enum http_event state);
-int http_half_data_get_total_parsed_header_count(struct http_decoder_half_data *half_data);
-void http_half_get_max_transaction_seq(struct http_decoder_exdata *exdata, long long *max_req_seq, long long *max_res_seq);
-enum http_content_encoding http_half_data_get_content_encoding(struct http_decoder_half_data *hf_data); \ No newline at end of file
+ struct http_half_data
+ {
+ enum flow_type flow_dir;
+ uint32_t transaction_seq; // seq of this half flow, is last http_lalf->transaction_num value
+ union
+ {
+ struct http_request_line req_line;
+ struct http_status_line status_line;
+ };
+
+ /* headers */
+ struct http_buffer *cached_header_buffer; // maybe null
+ hstring joint_url; // malloc, need be free
+ enum http_content_encoding content_encoding_type;
+ int transfer_encoding_is_chunked; // -1: not set, 0: false, 1: true
+ UT_array *ut_filed_array; // inner struct, need to transform to header->field_array
+ struct http_header header;
+
+ /* body */
+ struct http_body raw_body;
+ struct http_content_decompress *decompress;
+ struct http_body decompress_body;
+ };
+
+ struct http_half_parser
+ {
+ llhttp_t llhttp_parser;
+ llhttp_settings_t settings;
+ struct http_half *half_ref; // used in llhttp callback context to get flow
+ };
+
+ enum http_stage
+ {
+ HTTP_STAGE_INIT = 0,
+ HTTP_STAGE_PENDING = 1, /* body without Content-Length, no Chunk-Encoding */
+ HTTP_STAGE_HEADER_PARTIAL = 2,
+ HTTP_STAGE_BODY = 3,
+ };
+
+ struct http_stage_shaper
+ {
+ enum http_stage stage;
+ char *data;
+ size_t data_len;
+ long long remain_content_length;
+ const char *headers_start; // the first header field name
+ const char *headers_end; // the last char of \r\n\r\n
+ struct http_buffer *headers_cache; /* ownership move to struct http_decoder_half_data when headers completed */
+ };
+
+ struct http_half_buffer
+ {
+ int is_malloc; // need free
+ int ref_count; // +1 when push a new message
+ char *buffer;
+ size_t buffer_size;
+ };
+
+ enum http_half_llhttp_stage_type
+ {
+ LLHTTP_STAGE_MESSAGE_BEGIN = 0,
+ LLHTTP_STAGE_URI,
+ LLHTTP_STAGE_METHOD,
+ LLHTTP_STAGE_STATUS,
+ LLHTTP_STAGE_VERSION,
+ LLHTTP_STAGE_HEADER_FIELD,
+ LLHTTP_STAGE_HEADER_FIELD_COMPLETE,
+ LLHTTP_STAGE_HEADER_VALUE,
+ LLHTTP_STAGE_HEADER_VALUE_COMPLETE,
+ LLHTTP_STAGE_HEADERS_COMPLETE,
+ LLHTTP_STAGE_BODY,
+ LLHTTP_STAGE_MESSAGE_COMPLETE,
+ __LLHTTP_STAGE_MAX,
+ };
+
+ struct http_half_llhttp_stage
+ {
+ const char *first_header_name_ptr;
+ const char *last_header_value_complete_ptr; /* at + length + 4 \r\n\r\n) */
+ const char *last_headers_complete_ptr;
+ enum http_half_llhttp_stage_type llhttp_last_stage;
+ uint8_t llhttp_cb_count[__LLHTTP_STAGE_MAX];
+ };
+
+ struct http_half
+ {
+ struct session *sess_ref;
+ struct http *http_env_ref;
+
+ uint32_t transaction_num; // accumulated of all flows in this session
+ enum http_event event;
+ // struct http_stage_shaper shaper;
+
+ struct http_half_parser parser;
+ struct http_half_data *flow_data; // malloc when every transaction start, ownership move to message when message completed
+
+ struct http_buffer *cached_header_buffer;
+ struct http_half_llhttp_stage half_stage;
+ };
+ void http_half_free(struct http_half *half);
+ void http_half_data_free(struct http_half_data *half_data);
+ void http_flow_parser_init(struct http_half_parser *flow_parser, enum llhttp_type type);
+ void http_flow_body_decompress(struct http_half_data *flow_data, const char *zipdata, size_t zipdatalen);
+ void http_event_handler(struct http_half *half, enum http_event event, struct http *httpd_env);
+ int http_flow_stage_shaping(struct http_half *half, const char *newdata, size_t newdata_len);
+ int http_half_flow_process(struct http_half *half, const char *newdata, size_t newdata_len);
+ int http_get_header_field_count(struct http_half_data *flow_data);
+ void http_flow_append_header_filed(struct http_half_data *flow_data, const char *at, size_t length);
+ void http_flow_append_header_value(struct http_half_data *flow_data, const char *at, size_t length);
+#ifdef __cplusplus
+}
+#endif \ No newline at end of file
diff --git a/decoders/http/http_decoder_llhttp_wrap.c b/decoders/http/http_decoder_llhttp_wrap.c
new file mode 100644
index 0000000..b025311
--- /dev/null
+++ b/decoders/http/http_decoder_llhttp_wrap.c
@@ -0,0 +1,318 @@
+#include <assert.h>
+#include <stdio.h>
+#include <string.h>
+#include <arpa/inet.h>
+#include "stellar/utils.h"
+#include "http_decoder.h"
+#include "llhttp.h"
+#include "http_decoder_half.h"
+
+/* Possible return values 0, -1, `HPE_PAUSED` */
+static int on_message_begin(llhttp_t *http)
+{
+ struct http_half_parser *parser = container_of(http, struct http_half_parser, llhttp_parser);
+ struct http_half *half = parser->half_ref;
+ half->half_stage.llhttp_last_stage = LLHTTP_STAGE_MESSAGE_BEGIN;
+ half->half_stage.llhttp_cb_count[LLHTTP_STAGE_MESSAGE_BEGIN]++;
+ if (http->type == HTTP_REQUEST)
+ {
+ half->event = HTTP_EVENT_REQ_INIT;
+ }
+ else
+ {
+ half->event = HTTP_EVENT_RES_INIT;
+ }
+ http_event_handler(parser->half_ref, half->event, parser->half_ref->http_env_ref);
+ return 0;
+}
+
+static int on_message_complete(llhttp_t *http)
+{
+ struct http_half_parser *parser = container_of(http, struct http_half_parser, llhttp_parser);
+ struct http_half *half = parser->half_ref;
+ half->half_stage.llhttp_last_stage = LLHTTP_STAGE_MESSAGE_COMPLETE;
+ half->half_stage.llhttp_cb_count[LLHTTP_STAGE_MESSAGE_COMPLETE]++;
+ if (http->type == HTTP_REQUEST)
+ {
+ if (half->event == HTTP_EVENT_REQ_BODY_DATA)
+ {
+ half->event = HTTP_EVENT_REQ_BODY_END;
+ http_event_handler(parser->half_ref, half->event, parser->half_ref->http_env_ref);
+ }
+ }
+ else
+ {
+ if (half->event == HTTP_EVENT_RES_BODY_DATA)
+ {
+ half->event = HTTP_EVENT_RES_BODY_END;
+ http_event_handler(parser->half_ref, half->event, parser->half_ref->http_env_ref);
+ }
+ }
+
+ // trigger req_end/res_end
+ if (http->type == HTTP_REQUEST)
+ {
+ half->event = HTTP_EVENT_REQ_END;
+ }
+ else
+ {
+ half->event = HTTP_EVENT_RES_END;
+ }
+ http_event_handler(parser->half_ref, half->event, parser->half_ref->http_env_ref);
+ half->event = __HTTP_EVENT_RESERVED;
+ return 0;
+}
+
+static int on_method(llhttp_t *http, const char *at, size_t length)
+{
+ if (length == 0)
+ {
+ return 0;
+ }
+ struct http_half_parser *parser = container_of(http, struct http_half_parser, llhttp_parser);
+ struct http_half *half = parser->half_ref;
+ half->flow_data->req_line.method = (char *)at;
+ half->flow_data->req_line.method_len = length;
+ half->half_stage.llhttp_last_stage = LLHTTP_STAGE_METHOD;
+ half->half_stage.llhttp_cb_count[LLHTTP_STAGE_METHOD]++;
+ return 0;
+}
+
+/* Possible return values 0, -1, HPE_USER */
+static int on_uri(llhttp_t *http, const char *at, size_t length)
+{
+ if (length == 0)
+ {
+ return 0;
+ }
+ struct http_half_parser *parser = container_of(http, struct http_half_parser, llhttp_parser);
+ struct http_half *half = parser->half_ref;
+ half->flow_data->req_line.uri = (char *)at;
+ half->flow_data->req_line.uri_len = length;
+ half->half_stage.llhttp_last_stage = LLHTTP_STAGE_URI;
+ half->half_stage.llhttp_cb_count[LLHTTP_STAGE_URI]++;
+ return 0;
+}
+
+/* Possible return values 0, -1, HPE_USER */
+static int on_version(llhttp_t *http, const char *at, size_t length)
+{
+ if (length == 0)
+ {
+ return 0;
+ }
+ struct http_half_parser *parser = container_of(http, struct http_half_parser, llhttp_parser);
+ struct http_half *half = parser->half_ref;
+ if (http->type == HTTP_REQUEST)
+ {
+ half->flow_data->req_line.version = (char *)at;
+ half->flow_data->req_line.version_len = length;
+ }
+ else
+ {
+ half->flow_data->status_line.version = (char *)at;
+ half->flow_data->status_line.version_len = length;
+ }
+ half->half_stage.llhttp_last_stage = LLHTTP_STAGE_VERSION;
+ half->half_stage.llhttp_cb_count[LLHTTP_STAGE_VERSION]++;
+ return 0;
+}
+
+/* Information-only callbacks, return value is ignored */
+static int on_version_complete(llhttp_t *http)
+{
+ struct http_half_parser *parser = container_of(http, struct http_half_parser, llhttp_parser);
+ struct http_half *half = parser->half_ref;
+ if (http->type == HTTP_REQUEST)
+ {
+ half->flow_data->req_line.major_version = llhttp_get_http_major(http);
+ half->flow_data->req_line.minor_version = llhttp_get_http_minor(http);
+ }
+ else
+ {
+ half->flow_data->status_line.major_version = llhttp_get_http_major(http);
+ half->flow_data->status_line.minor_version = llhttp_get_http_minor(http);
+ }
+ half->half_stage.llhttp_last_stage = LLHTTP_STAGE_VERSION;
+ half->half_stage.llhttp_cb_count[LLHTTP_STAGE_VERSION]++;
+ return 0;
+}
+
+/* Possible return values 0, -1, HPE_USER */
+static int on_status(llhttp_t *http, const char *at, size_t length)
+{
+ if (length == 0)
+ {
+ return 0;
+ }
+ struct http_half_parser *parser = container_of(http, struct http_half_parser, llhttp_parser);
+ struct http_half *half = parser->half_ref;
+ half->flow_data->status_line.status = (char *)at;
+ half->flow_data->status_line.status_len = length;
+ half->half_stage.llhttp_last_stage = LLHTTP_STAGE_STATUS;
+ half->half_stage.llhttp_cb_count[LLHTTP_STAGE_STATUS]++;
+ return 0;
+}
+
+/* Information-only callbacks, return value is ignored */
+static int on_status_complete(llhttp_t *http)
+{
+ struct http_half_parser *parser = container_of(http, struct http_half_parser, llhttp_parser);
+ struct http_half *half = parser->half_ref;
+ half->flow_data->status_line.status_code = llhttp_get_status_code(http);
+ half->half_stage.llhttp_last_stage = LLHTTP_STAGE_STATUS;
+ half->half_stage.llhttp_cb_count[LLHTTP_STAGE_STATUS]++;
+ return 0;
+}
+
+/* Possible return values 0, -1, HPE_USER */
+static int on_header_field(llhttp_t *http, const char *at, size_t length)
+{
+ if (length == 0)
+ {
+ return 0;
+ }
+ struct http_half_parser *parser = container_of(http, struct http_half_parser, llhttp_parser);
+ struct http_half *half = parser->half_ref;
+ http_flow_append_header_filed(half->flow_data, at, length);
+ half->half_stage.llhttp_last_stage = LLHTTP_STAGE_HEADER_FIELD;
+ half->half_stage.llhttp_cb_count[LLHTTP_STAGE_HEADER_FIELD]++;
+ if (half->half_stage.first_header_name_ptr == NULL)
+ {
+ half->half_stage.first_header_name_ptr = at;
+ }
+ return 0;
+}
+
+static int on_header_field_complete(llhttp_t *http)
+{
+ struct http_half_parser *parser = container_of(http, struct http_half_parser, llhttp_parser);
+ struct http_half *half = parser->half_ref;
+ half->half_stage.llhttp_last_stage = LLHTTP_STAGE_HEADER_FIELD_COMPLETE;
+ half->half_stage.llhttp_cb_count[LLHTTP_STAGE_HEADER_FIELD_COMPLETE]++;
+ return 0;
+}
+
+/* Possible return values 0, -1, HPE_USER */
+static int on_header_value(llhttp_t *http, const char *at, size_t length)
+{
+ if (length == 0)
+ {
+ return 0;
+ }
+ struct http_half_parser *parser = container_of(http, struct http_half_parser, llhttp_parser);
+ struct http_half *half = parser->half_ref;
+ http_flow_append_header_value(half->flow_data, at, length);
+ half->half_stage.last_header_value_complete_ptr = at + length;
+ return 0;
+}
+
+static int on_header_value_complete(llhttp_t *http)
+{
+ struct http_half_parser *parser = container_of(http, struct http_half_parser, llhttp_parser);
+ struct http_half *half = parser->half_ref;
+ half->half_stage.llhttp_last_stage = LLHTTP_STAGE_HEADER_VALUE_COMPLETE;
+ half->half_stage.llhttp_cb_count[LLHTTP_STAGE_HEADER_VALUE_COMPLETE]++;
+ if (NULL != half->half_stage.last_header_value_complete_ptr) /* header value maybe empty(NULL) */
+ {
+ if (strncmp("\r\n", half->half_stage.last_header_value_complete_ptr, 2) == 0)
+ {
+ half->half_stage.last_header_value_complete_ptr += 2;
+ }
+ }
+ return 0;
+}
+
+/* Possible return values:
+ * 0 - Proceed normally
+ * 1 - Assume that request/response has no body, and proceed to parsing the next message
+ * 2 - Assume absence of body (as above) and make `llhttp_execute()` return `HPE_PAUSED_UPGRADE`
+ * -1 - Error `HPE_PAUSED`
+ */
+static int on_headers_complete(llhttp_t *http)
+{
+ struct http_half_parser *parser = container_of(http, struct http_half_parser, llhttp_parser);
+ struct http_half *half = parser->half_ref;
+ if (http->type == HTTP_REQUEST)
+ {
+ half->event = HTTP_EVENT_REQ_HDR_END;
+ }
+ else
+ {
+ half->event = HTTP_EVENT_RES_HDR_END;
+ }
+ half->half_stage.last_headers_complete_ptr = half->half_stage.last_header_value_complete_ptr;
+ if (half->half_stage.last_headers_complete_ptr != NULL &&
+ strncmp("\r\n", half->half_stage.last_headers_complete_ptr, 2) == 0)
+ {
+ half->half_stage.last_headers_complete_ptr += 2;
+ }
+ half->half_stage.llhttp_last_stage = LLHTTP_STAGE_HEADERS_COMPLETE;
+ half->half_stage.llhttp_cb_count[LLHTTP_STAGE_HEADERS_COMPLETE]++;
+ http_event_handler(half, half->event, half->http_env_ref);
+ return 0;
+}
+
+/* Possible return values 0, -1, HPE_USER */
+static int on_body(llhttp_t *http, const char *at, size_t length)
+{
+ if (length == 0)
+ {
+ return 0;
+ }
+ int has_new_data = 0;
+ struct http_half_parser *parser = container_of(http, struct http_half_parser, llhttp_parser);
+ struct http_half *half = parser->half_ref;
+ half->half_stage.llhttp_last_stage = LLHTTP_STAGE_BODY;
+ half->half_stage.llhttp_cb_count[LLHTTP_STAGE_BODY]++;
+
+ if (half->http_env_ref->hd_cfg.decompress_switch != 0 &&
+ half->flow_data->content_encoding_type != HTTP_CONTENT_ENCODING_NONE)
+ {
+ http_flow_body_decompress(half->flow_data, at, length);
+ if (half->flow_data->decompress_body.body != NULL && half->flow_data->decompress_body.body_sz > 0)
+ {
+ has_new_data = 1;
+ }
+ }
+ else
+ {
+ half->flow_data->raw_body.body = (char *)at;
+ half->flow_data->raw_body.body_sz = length;
+ has_new_data = 1;
+ }
+ if (has_new_data)
+ {
+ if (http->type == HTTP_REQUEST)
+ {
+ half->event = HTTP_EVENT_REQ_BODY_DATA;
+ }
+ else
+ {
+ half->event = HTTP_EVENT_RES_BODY_DATA;
+ }
+ http_event_handler(half, half->event, half->http_env_ref);
+ }
+ return 0;
+}
+
+void http_flow_parser_init(struct http_half_parser *flow_parser, enum llhttp_type type)
+{
+ llhttp_settings_init(&flow_parser->settings);
+ llhttp_settings_t *sets = &flow_parser->settings;
+ sets->on_message_begin = on_message_begin;
+ sets->on_url = on_uri;
+ sets->on_status = on_status;
+ sets->on_status_complete = on_status_complete;
+ sets->on_method = on_method;
+ sets->on_version = on_version;
+ sets->on_version_complete = on_version_complete;
+ sets->on_header_field = on_header_field;
+ sets->on_header_field_complete = on_header_field_complete;
+ sets->on_header_value = on_header_value;
+ sets->on_header_value_complete = on_header_value_complete;
+ sets->on_headers_complete = on_headers_complete;
+ sets->on_body = on_body;
+ sets->on_message_complete = on_message_complete;
+ llhttp_init(&flow_parser->llhttp_parser, type, sets);
+} \ No newline at end of file
diff --git a/decoders/http/http_decoder_module.c b/decoders/http/http_decoder_module.c
new file mode 100644
index 0000000..53ff71b
--- /dev/null
+++ b/decoders/http/http_decoder_module.c
@@ -0,0 +1,299 @@
+#include <stdio.h>
+#include <assert.h>
+#include "stellar/session.h"
+#include "stellar/module.h"
+#include "http_decoder_utils.h"
+#include "http_decoder_half.h"
+#include "http_decoder.h"
+#include "toml/toml.h"
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+ __thread struct http_topic_manager *chaotic_http_topic_mgr;
+
+#define HTTP_TOPIC_NAME_REQ_HDR "HTTP_TOPIC_REQ_HDR"
+#define HTTP_TOPIC_NAME_REQ_BODY "HTTP_TOPIC_REQ_BODY"
+#define HTTP_TOPIC_NAME_RES_HDR "HTTP_TOPIC_RES_HDR"
+#define HTTP_TOPIC_NAME_RES_BODY "HTTP_TOPIC_RES_BODY"
+
+ static void http_set_default_config(struct http_config *hd_cfg)
+ {
+ hd_cfg->decompress_switch = 1;
+ }
+ static int http_load_config(const char *cfg_path, struct http_config *hd_cfg)
+ {
+ FILE *fp = fopen(cfg_path, "r");
+ if (NULL == fp)
+ {
+ fprintf(stderr, "[%s]Can't open config file:%s", __FUNCTION__, cfg_path);
+ return -1;
+ }
+ int ret = 0;
+ char errbuf[256] = {0};
+ toml_table_t *root = toml_parse_file(fp, errbuf, sizeof(errbuf));
+ fclose(fp);
+
+ toml_table_t *section = toml_table_in(root, "http");
+ if (section == NULL)
+ {
+ fprintf(stderr, "(logger) config file %s missing 'http' section\n", cfg_path);
+ goto error_exit;
+ }
+
+ toml_datum_t int_val = toml_int_in(section, "decompress_enable");
+ if (int_val.ok != 0)
+ {
+ hd_cfg->decompress_switch = int_val.u.b;
+ }
+ error_exit:
+ toml_free(root);
+ return ret;
+ }
+
+ static void http_update_header_array(struct http_half_data *flow_data)
+ {
+ if (0 == http_get_header_field_count(flow_data))
+ {
+ flow_data->header.field_array = NULL;
+ flow_data->header.field_array_num = 0;
+ }
+ else
+ {
+ flow_data->header.field_array = (struct http_header_field *)utarray_front(flow_data->ut_filed_array);
+ flow_data->header.field_array_num = utarray_len(flow_data->ut_filed_array);
+ }
+ }
+
+ static void http_on_msg_dispatch(int topic_id UNUSED, void *msg, on_msg_cb_func *on_msg_cb,
+ void *on_msg_cb_arg, void *dispatch_arg UNUSED)
+ {
+ assert(msg != NULL && on_msg_cb != NULL);
+ struct http_message *hmsg = (struct http_message *)msg;
+ struct http_half_data *flow_data = hmsg->flow_data;
+
+ if (hmsg->event == HTTP_EVENT_REQ_END || hmsg->event == HTTP_EVENT_RES_END)
+ {
+ /* notify subscriber ? */
+ return;
+ }
+ switch (hmsg->topic_type)
+ {
+ case HTTP_TOPIC_REQ_HEADER:
+ {
+ http_update_header_array(flow_data);
+ http_on_request_header_cb *on_req_hdr_cb = (http_on_request_header_cb *)((void *)on_msg_cb);
+ on_req_hdr_cb(hmsg->sess_ref, flow_data->transaction_seq, &flow_data->req_line, &flow_data->header,
+ (const char *)flow_data->joint_url.iov_base, flow_data->joint_url.iov_len,
+ on_msg_cb_arg);
+ }
+ break;
+ case HTTP_TOPIC_RES_HEADER:
+ {
+ http_update_header_array(flow_data);
+ http_on_response_header_cb *on_res_hdr_cb = (http_on_response_header_cb *)((void *)on_msg_cb);
+ on_res_hdr_cb(hmsg->sess_ref, flow_data->transaction_seq, &flow_data->status_line, &flow_data->header, on_msg_cb_arg);
+ }
+ break;
+ case HTTP_TOPIC_REQ_BODY:
+ case HTTP_TOPIC_RES_BODY:
+ {
+ http_on_body_cb *on_body_cb = (http_on_body_cb *)((void *)on_msg_cb);
+ if (flow_data->decompress_body.body != NULL || flow_data->decompress_body.offset > 0)
+ {
+ on_body_cb(hmsg->sess_ref, flow_data->decompress_body.body, flow_data->decompress_body.body_sz, flow_data->decompress_body.offset,
+ flow_data->transaction_seq, flow_data->decompress_body.is_finished, on_msg_cb_arg);
+ }
+ else
+ {
+ on_body_cb(hmsg->sess_ref, flow_data->raw_body.body, flow_data->raw_body.body_sz, flow_data->raw_body.offset,
+ flow_data->transaction_seq, flow_data->raw_body.is_finished, on_msg_cb_arg);
+ }
+ }
+ break;
+ default:
+ assert(0);
+ break;
+ }
+ }
+
+ static int http_create_topic_nx(struct module_manager *mod_mgr, const char *topic_name)
+ {
+ struct mq_schema *mq_s = module_manager_get_mq_schema(mod_mgr);
+ assert(mq_s != NULL);
+ int quic_topic_id = mq_schema_get_topic_id(mq_s, topic_name);
+ if (quic_topic_id >= 0)
+ {
+ return quic_topic_id;
+ }
+ int topic_id = mq_schema_create_topic(mq_s, topic_name, (on_msg_dispatch_cb_func *)http_on_msg_dispatch,
+ NULL, http_message_free_cb, NULL);
+ return topic_id;
+ }
+
+ struct http_topic_manager *http_topic_mgr_init(struct module_manager *mod_mgr)
+ {
+ if (chaotic_http_topic_mgr != NULL)
+ {
+ return chaotic_http_topic_mgr;
+ }
+ struct http_topic_manager *http_topic_mgr = (struct http_topic_manager *)calloc(1, sizeof(struct http_topic_manager));
+ assert(http_topic_mgr != NULL);
+ struct http_topic_compose *topic_compose = http_topic_mgr->topic_compose;
+
+ topic_compose[HTTP_TOPIC_REQ_HEADER].topic_name = HTTP_TOPIC_NAME_REQ_HDR;
+ topic_compose[HTTP_TOPIC_REQ_BODY].topic_name = HTTP_TOPIC_NAME_REQ_BODY;
+ topic_compose[HTTP_TOPIC_RES_HEADER].topic_name = HTTP_TOPIC_NAME_RES_HDR;
+ topic_compose[HTTP_TOPIC_RES_BODY].topic_name = HTTP_TOPIC_NAME_RES_BODY;
+
+ topic_compose[HTTP_TOPIC_REQ_HEADER].topic_id = http_create_topic_nx(mod_mgr, HTTP_TOPIC_NAME_REQ_HDR);
+ topic_compose[HTTP_TOPIC_REQ_BODY].topic_id = http_create_topic_nx(mod_mgr, HTTP_TOPIC_NAME_REQ_BODY);
+ topic_compose[HTTP_TOPIC_RES_HEADER].topic_id = http_create_topic_nx(mod_mgr, HTTP_TOPIC_NAME_RES_HDR);
+ topic_compose[HTTP_TOPIC_RES_BODY].topic_id = http_create_topic_nx(mod_mgr, HTTP_TOPIC_NAME_RES_BODY);
+ chaotic_http_topic_mgr = http_topic_mgr;
+ return http_topic_mgr;
+ }
+
+ void http_topic_mgr_free(struct http_topic_manager *topic_mgr)
+ {
+ assert(topic_mgr != NULL);
+ FREE(topic_mgr);
+ }
+
+ static int http_subscribe_common(struct module_manager *mod_mgr, enum http_topic_type topic_type, void *cb, void *args)
+ {
+ struct http_topic_manager *http_topic_mgr = chaotic_http_topic_mgr;
+ if (http_topic_mgr == NULL)
+ {
+ http_topic_mgr = http_topic_mgr_init(mod_mgr);
+ chaotic_http_topic_mgr = http_topic_mgr;
+ }
+ return mq_schema_subscribe(module_manager_get_mq_schema(mod_mgr),
+ http_topic_mgr->topic_compose[topic_type].topic_id,
+ (on_msg_cb_func *)cb, args);
+ }
+
+ int http_subscribe_request_header(struct module_manager *mod_mgr, http_on_request_header_cb *cb, void *args)
+ {
+ assert(mod_mgr != NULL);
+ return http_subscribe_common(mod_mgr, HTTP_TOPIC_REQ_HEADER, cb, args);
+ }
+
+ int http_subscribe_response_header(struct module_manager *mod_mgr, http_on_response_header_cb *cb, void *args)
+ {
+ assert(mod_mgr != NULL);
+ return http_subscribe_common(mod_mgr, HTTP_TOPIC_RES_HEADER, cb, args);
+ }
+
+ int http_subscribe_request_body(struct module_manager *mod_mgr, http_on_body_cb *cb, void *args)
+ {
+ assert(mod_mgr != NULL);
+ return http_subscribe_common(mod_mgr, HTTP_TOPIC_REQ_BODY, cb, args);
+ }
+
+ int http_subscribe_response_body(struct module_manager *mod_mgr, http_on_body_cb *cb, void *args)
+ {
+ assert(mod_mgr != NULL);
+ return http_subscribe_common(mod_mgr, HTTP_TOPIC_RES_BODY, cb, args);
+ }
+
+ int http_subscribe(struct http *http, struct http_subscirbe_params *params, void *arg)
+ {
+ assert(http != NULL && params != NULL);
+ struct module_manager *mod_mgr = http->mod_mgr_ref;
+ int ret = 0;
+ if (params->req_hdr_cb != NULL)
+ {
+ ret = http_subscribe_request_header(mod_mgr, params->req_hdr_cb, arg);
+ if (ret < 0)
+ {
+ return ret;
+ }
+ }
+ if (params->res_hdr_cb != NULL)
+ {
+ ret = http_subscribe_response_header(mod_mgr, params->res_hdr_cb, arg);
+ if (ret < 0)
+ {
+ return ret;
+ }
+ }
+ if (params->req_body_cb != NULL)
+ {
+ ret = http_subscribe_request_body(mod_mgr, params->req_body_cb, arg);
+ if (ret < 0)
+ {
+ return ret;
+ }
+ }
+ if (params->res_body_cb != NULL)
+ {
+ ret = http_subscribe_response_body(mod_mgr, params->res_body_cb, arg);
+ if (ret < 0)
+ {
+ return ret;
+ }
+ }
+ return ret;
+ }
+
+ struct module *http_init(struct module_manager *mod_mgr)
+ {
+ assert(mod_mgr != NULL);
+ struct http *http_env = (struct http *)calloc(1, sizeof(struct http));
+ http_set_default_config(&http_env->hd_cfg);
+ http_load_config(module_manager_get_toml_path(mod_mgr), &http_env->hd_cfg);
+ http_stat_init(mod_mgr, &http_env->stat);
+ struct module *mod = module_new(HTTP_MODULE_NAME, http_env);
+ http_env->mod_mgr_ref = mod_mgr;
+
+ http_env->logger_ref = module_manager_get_logger(mod_mgr);
+ assert(http_env->logger_ref != NULL);
+ struct module *sess_mod = module_manager_get_module(mod_mgr, SESSION_MANAGER_MODULE_NAME);
+ struct session_manager *sess_mgr = module_to_session_manager(sess_mod);
+ assert(sess_mgr != NULL);
+
+ struct http_topic_manager *http_topic_mgr = http_topic_mgr_init(mod_mgr);
+ assert(http_topic_mgr != NULL);
+ http_env->http_topic_mgr = http_topic_mgr;
+
+ session_manager_subscribe_tcp_stream(sess_mgr, http_on_tcp_stream_cb, http_env);
+ http_env->exdata_id = session_manager_new_session_exdata_index(sess_mgr, HTTP_EXDATA_NAME, http_exdata_free_cb, http_env);
+ STELLAR_LOG_FATAL(http_env->logger_ref, HTTP_MODULE_NAME,
+ "http init success, decompress_switch:%d", http_env->hd_cfg.decompress_switch);
+ return mod;
+ }
+ struct module *http_get_module(struct module_manager *mod_mgr)
+ {
+ assert(mod_mgr != NULL);
+
+ struct module *http_mod = module_manager_get_module(mod_mgr, HTTP_MODULE_NAME);
+ if (NULL == http_mod)
+ {
+ http_mod = http_init(mod_mgr);
+ }
+ return http_mod;
+ }
+
+ struct http *http_module_to_http(struct module *http_mod)
+ {
+ assert(http_mod);
+ return (struct http *)module_get_ctx(http_mod);
+ }
+
+ void http_exit(struct module_manager *mod_mgr UNUSED, struct module *mod)
+ {
+ assert(mod != NULL);
+ struct http *http_env = (struct http *)module_get_ctx(mod);
+ http_stat_free(&http_env->stat);
+ http_topic_mgr_free(http_env->http_topic_mgr);
+ STELLAR_LOG_FATAL(http_env->logger_ref, HTTP_MODULE_NAME, "http exit!");
+ FREE(http_env);
+ module_free(mod);
+ }
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/decoders/http/http_decoder_private.h b/decoders/http/http_decoder_private.h
deleted file mode 100644
index 04130f0..0000000
--- a/decoders/http/http_decoder_private.h
+++ /dev/null
@@ -1,155 +0,0 @@
-
-#pragma once
-
-#ifndef __USE_MISC
-#define __USE_MISC 1
-#endif
-
-#ifdef __cplusplus
-extern "C"
-{
-#endif
-#include <bits/types/struct_iovec.h>
-#include "stellar/stellar.h"
-#include "stellar/packet.h"
-#include "stellar/utils.h"
-#include "stellar/session.h"
-#include "stellar/stellar_mq.h"
-#include "stellar/stellar_exdata.h"
-
-#include "nmx_pool/nmx_palloc.h"
-#include "stellar/utils.h"
-#include "stellar/http.h"
-#include "http_decoder_result_queue.h"
-#include "http_decoder_half.h"
-#include "http_decoder_table.h"
-#include "http_decoder_result_queue.h"
-#include "http_decoder_utils.h"
-#include "http_decoder_stat.h"
-#include "http_decoder_tunnel.h"
-#include "fieldstat/fieldstat_easy.h"
-#include "toml/toml.h"
-
-#ifndef hstring
-#include <bits/types/struct_iovec.h>
-typedef struct iovec hstring;
-#endif
-
-#ifndef likely
-#define likely(x) __builtin_expect((x), 1)
-#endif
-#ifndef unlikely
-#define unlikely(x) __builtin_expect((x), 0)
-#endif
-
-#define MEMPOOL_CALLOC(pool, type, number) ((type *)nmx_pcalloc(pool, sizeof(type) * number))
-#define MEMPOOL_REALLOC(pool)
-#define MEMPOOL_FREE(pool, p) nmx_pfree(pool, p)
-
-#define ENABLE_MEMPOOL 0
-#if ENABLE_MEMPOOL
-#define HD_CALLOC(pool, type, number) MEMPOOL_CALLOC(pool, number, type)
-#define HD_FREE(pool, p) MEMPOOL_FREE(pool, p)
-#else
-#define HD_CALLOC(pool, type, number) CALLOC(type, number)
-#define HD_FREE(pool, p) FREE(p)
-#endif
-
-#define HTTP_IDENTIFY_LEN 16
-#define HD_RESULT_QUEUE_LEN 16
-
-#define DEFAULT_STAT_OUTPUT_INTERVAL 1
-#define DEFAULT_STAT_INTERVAL_PKTS 1000
-#define DEFAULT_MEMPOOL_SIZE (32 * 1024)
-
-#define HTTPD_CFG_FILE "./etc/http/http_decoder.toml"
-#define FILEDSTAT_OUTPUT_FILE "./metrics/http_decoder_fs4.json"
-
-#define HTTP_CTX_NOT_HTTP "__NOT_HTTP_SESS__"
-#define HTTP_CTX_IS_HTTP "__FAKE_HTTP_CTX__"
-
- struct http_decoder_config
- {
- int decompress_switch;
- int stat_interval_pkts; // call fieldstat_incrby every stat_interval_pkts
- int stat_output_interval;
- int proxy_enable;
- size_t result_queue_len; // per session result queue length
- size_t mempool_size; // per session mempool size
- };
-
- /**
- * NOTE: http_message don't have the ownership of data
- */
- struct http_message
- {
- uint8_t flow_type;
- enum http_message_type type;
- size_t queue_index;
- struct http_decoder_result_queue *ref_queue;
- hstring raw_payload; // cause tcp reorder, maybe receive many tcp segments for one packet
- hstring decompress_payload;
- hstring tunnel_payload;
- };
-
- struct http_decoder
- {
- struct http_decoder_half *c2s_half;
- struct http_decoder_half *s2c_half;
- };
-
- enum httpd_topic_index
- {
- HTTPD_TOPIC_TCP_STREAM_INDEX = 0,
- HTTPD_TOPIC_HTTP_MSG_INDEX,
- HTTPD_TOPIC_HTTP_TUNNEL_INDEX,
- HTTPD_TOPIC_INDEX_MAX,
- };
-
- struct http_decoder_exdata
- {
- int sub_topic_id; // tcp_stream
- int pub_topic_id; // http message or http tunnel msg
- struct http_decoder_result_queue *queue;
- struct http_decoder *decoder;
- nmx_pool_t *mempool;
- enum http_tunnel_state tunnel_state;
- int in_tunnel_is_http;
- };
-
- // struct http_decoder_context{
- // int array_size;
- // struct http_decoder_exdata **exdata_array; //raw tcp stream for http msg; http tunnel for inner http transaction.
- // };
-
- struct http_topic_exdata_compose
- {
- enum httpd_topic_index index;
- const char *topic_name;
- on_session_msg_cb_func *on_msg_cb;
- stellar_msg_free_cb_func *msg_free_cb;
- const char *exdata_name;
- stellar_exdata_free *exdata_free_cb;
- int sub_topic_id; // as consumer
- int exdata_id;
- };
-
- struct http_decoder_env
- {
- struct stellar *st;
- int plugin_id;
- struct http_topic_exdata_compose topic_exdata_compose[HTTPD_TOPIC_INDEX_MAX];
- struct http_decoder_config hd_cfg;
- struct http_decoder_stat hd_stat;
- };
-
- struct http_message;
-
- struct http_message *http_message_new(enum http_message_type type, struct http_decoder_result_queue *queue,
- int queue_index, uint8_t flow_type);
- struct http_message *http_body_message_new(enum http_message_type type, struct http_decoder_result_queue *queue,
- int queue_index, uint8_t flow_type, hstring *raw_payload, hstring *decompress_payload);
- int http_topic_exdata_compose_get_index(const struct http_decoder_env *httpd_env, int by_topic_id);
-#ifdef __cplusplus
-}
-#endif
diff --git a/decoders/http/http_decoder_result_queue.c b/decoders/http/http_decoder_result_queue.c
deleted file mode 100644
index 5695138..0000000
--- a/decoders/http/http_decoder_result_queue.c
+++ /dev/null
@@ -1,152 +0,0 @@
-#include <assert.h>
-#include "http_decoder_private.h"
-
-struct http_decoder_result_queue *
-http_decoder_result_queue_new(nmx_pool_t *mempool, size_t queue_size)
-{
- struct http_decoder_result_queue *queue =
- MEMPOOL_CALLOC(mempool, struct http_decoder_result_queue, 1);
- assert(queue);
-
- queue->req_index = 0;
- queue->res_index = 0;
- queue->queue_size = queue_size;
- queue->array = MEMPOOL_CALLOC(mempool, struct http_decoder_result,
- queue->queue_size);
- assert(queue->array);
- return queue;
-}
-
-void http_decoder_result_queue_free(nmx_pool_t *mempool, struct http_decoder_result_queue *queue)
-{
- if (NULL == queue)
- {
- return;
- }
-
- if (queue->array != NULL)
- {
- for (size_t i = 0; i < queue->queue_size; i++)
- {
- if (queue->array[i].req_data != NULL)
- {
- http_decoder_half_data_free(mempool, queue->array[i].req_data);
- queue->array[i].req_data = NULL;
- }
-
- if (queue->array[i].res_data != NULL)
- {
- http_decoder_half_data_free(mempool, queue->array[i].res_data);
- queue->array[i].res_data = NULL;
- }
- }
- MEMPOOL_FREE(mempool, queue->array);
- }
- MEMPOOL_FREE(mempool, queue);
-}
-
-void http_decoder_result_queue_inc_req_index(struct http_decoder_result_queue *queue)
-{
- assert(queue);
- queue->req_index++;
- queue->req_index = queue->req_index % queue->queue_size;
-}
-
-void http_decoder_result_queue_inc_res_index(struct http_decoder_result_queue *queue)
-{
- assert(queue);
- queue->res_index++;
- queue->res_index = queue->res_index % queue->queue_size;
-}
-
-size_t http_decoder_result_queue_req_index(struct http_decoder_result_queue *queue)
-{
- assert(queue);
- return queue->req_index;
-}
-
-size_t http_decoder_result_queue_res_index(struct http_decoder_result_queue *queue)
-{
- assert(queue);
- return queue->res_index;
-}
-
-int http_decoder_result_queue_push_req(struct http_decoder_result_queue *queue,
- struct http_decoder_half_data *req_data)
-{
- if (NULL == queue || NULL == req_data)
- {
- return -1;
- }
- assert(queue->array[queue->req_index].req_data == NULL);
- if (queue->array[queue->req_index].req_data != NULL)
- {
- return -1;
- }
- queue->array[queue->req_index].req_data = req_data;
- return 0;
-}
-
-int http_decoder_result_queue_push_res(struct http_decoder_result_queue *queue,
- struct http_decoder_half_data *res_data)
-{
- if (NULL == queue || NULL == res_data)
- {
- return -1;
- }
- assert(queue->array[queue->res_index].res_data == NULL);
- if (queue->array[queue->res_index].res_data != NULL)
- {
- return -1;
- }
-
- queue->array[queue->res_index].res_data = res_data;
- return 0;
-}
-
-struct http_decoder_half_data *
-http_decoder_result_queue_pop_req(struct http_decoder_result_queue *queue)
-{
- if (NULL == queue)
- {
- return NULL;
- }
- struct http_decoder_half_data *req_data = queue->array[queue->req_index].req_data;
- queue->array[queue->req_index].req_data = NULL;
- return req_data;
-}
-
-struct http_decoder_half_data *
-http_decoder_result_queue_pop_res(struct http_decoder_result_queue *queue)
-{
- if (NULL == queue)
- {
- return NULL;
- }
-
- struct http_decoder_half_data *res_data = queue->array[queue->res_index].res_data;
- queue->array[queue->res_index].res_data = NULL;
- return res_data;
-}
-
-struct http_decoder_half_data *
-http_decoder_result_queue_peek_req(struct http_decoder_result_queue *queue)
-{
- if (NULL == queue)
- {
- return NULL;
- }
- assert(queue->req_index < queue->queue_size);
- return queue->array[queue->req_index].req_data;
-}
-
-struct http_decoder_half_data *
-http_decoder_result_queue_peek_res(struct http_decoder_result_queue *queue)
-{
- if (NULL == queue)
- {
- return NULL;
- }
- assert(queue->res_index < queue->queue_size);
- return queue->array[queue->res_index].res_data;
-} \ No newline at end of file
diff --git a/decoders/http/http_decoder_result_queue.h b/decoders/http/http_decoder_result_queue.h
deleted file mode 100644
index a6ff0ca..0000000
--- a/decoders/http/http_decoder_result_queue.h
+++ /dev/null
@@ -1,51 +0,0 @@
-#pragma once
-
-#include <stddef.h>
-#include "nmx_pool/nmx_palloc.h"
-#include "http_decoder_half.h"
-
-struct http_decoder_result
-{
- struct http_decoder_half_data *req_data;
- struct http_decoder_half_data *res_data;
-};
-
-struct http_decoder_result_queue
-{
- size_t req_index;
- size_t res_index;
- size_t queue_size;
- struct http_decoder_result *array;
-};
-
-struct http_decoder_result_queue *
-http_decoder_result_queue_new(nmx_pool_t *mempool, size_t queue_size);
-
-void http_decoder_result_queue_free(nmx_pool_t *mempool,
- struct http_decoder_result_queue *queue);
-
-void http_decoder_result_queue_inc_req_index(struct http_decoder_result_queue *queue);
-
-void http_decoder_result_queue_inc_res_index(struct http_decoder_result_queue *queue);
-
-size_t http_decoder_result_queue_req_index(struct http_decoder_result_queue *queue);
-
-size_t http_decoder_result_queue_res_index(struct http_decoder_result_queue *queue);
-
-struct http_decoder_half_data *
-http_decoder_result_queue_pop_req(struct http_decoder_result_queue *queue);
-
-struct http_decoder_half_data *
-http_decoder_result_queue_pop_res(struct http_decoder_result_queue *queue);
-
-int http_decoder_result_queue_push_req(struct http_decoder_result_queue *queue,
- struct http_decoder_half_data *req_data);
-
-int http_decoder_result_queue_push_res(struct http_decoder_result_queue *queue,
- struct http_decoder_half_data *res_data);
-
-struct http_decoder_half_data *
-http_decoder_result_queue_peek_req(struct http_decoder_result_queue *queue);
-
-struct http_decoder_half_data *
-http_decoder_result_queue_peek_res(struct http_decoder_result_queue *queue);
diff --git a/decoders/http/http_decoder_stat.c b/decoders/http/http_decoder_stat.c
index dec6c91..a8240c2 100644
--- a/decoders/http/http_decoder_stat.c
+++ b/decoders/http/http_decoder_stat.c
@@ -2,126 +2,104 @@
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
-#include "http_decoder_private.h"
+#include "http_decoder_stat.h"
-static const struct hd_stat_config_tuple g_httpd_stat_tuple[HTTP_STAT_MAX] =
- {
- {HTTP_C2S_BYTES, "http_c2s_bytes"},
- {HTTP_S2C_BYTES, "http_s2c_bytes"},
- {HTTP_C2S_TCP_SEG, "http_c2s_tcp_seg"},
- {HTTP_S2C_TCP_SEG, "http_s2c_tcp_seg"},
- {HTTP_C2S_HEADERS, "http_c2s_headers"},
- {HTTP_S2C_HEADERS, "http_s2c_headers"},
- {HTTP_C2S_ZIP_BYTES, "http_c2s_zip_bytes"},
- {HTTP_S2C_ZIP_BYTES, "http_s2c_zip_bytes"},
- {HTTP_C2S_UNZIP_BYTES, "http_c2s_unzip_bytes"},
- {HTTP_S2C_UNZIP_BYTES, "http_s2c_unzip_bytes"},
- {HTTP_URL_BYTES, "http_url_bytes"},
- {HTTP_SESSION_NEW, "http_session_new"},
- {HTTP_SESSION_FREE, "http_session_free"},
- {HTTP_TRANSACTION_NEW, "http_transaction_new"},
- {HTTP_TRANSACTION_FREE, "http_transaction_free"},
- {HTTP_C2S_ASYMMETRY_SESSION, "http_c2s_asymmetry_sess"},
- {HTTP_S2C_ASYMMETRY_SESSION, "http_s2c_asymmetry_sess"},
- {HTTP_C2S_ASYMMETRY_TRANSACTION, "http_c2s_asymmetry_trans"},
- {HTTP_S2C_ASYMMETRY_TRANSACTION, "http_s2c_asymmetry_trans"},
- {HTTP_STAT_PARSE_ERR, "http_parse_error"},
-};
-
-void http_decoder_stat_free(struct http_decoder_stat *hd_stat)
+#ifdef __cplusplus
+extern "C"
{
- if (hd_stat->timer_pid != 0)
- {
- pthread_cancel(hd_stat->timer_pid);
- }
- if (hd_stat->stats != NULL)
- {
- free(hd_stat->stats);
- }
- if (hd_stat->fse != NULL)
- {
- fieldstat_easy_free(hd_stat->fse);
- }
-}
+#endif
+ static const struct hd_stat_config_tuple g_httpd_stat_tuple[HTTP_STAT_MAX] =
+ {
+ {HTTP_C2S_BYTES, "http_c2s_bytes"},
+ {HTTP_S2C_BYTES, "http_s2c_bytes"},
+ {HTTP_C2S_TCP_SEG, "http_c2s_tcp_seg"},
+ {HTTP_S2C_TCP_SEG, "http_s2c_tcp_seg"},
+ {HTTP_C2S_HEADERS, "http_c2s_headers"},
+ {HTTP_S2C_HEADERS, "http_s2c_headers"},
+ {HTTP_C2S_ZIP_BYTES, "http_c2s_zip_bytes"},
+ {HTTP_S2C_ZIP_BYTES, "http_s2c_zip_bytes"},
+ {HTTP_C2S_UNZIP_BYTES, "http_c2s_unzip_bytes"},
+ {HTTP_S2C_UNZIP_BYTES, "http_s2c_unzip_bytes"},
+ {HTTP_URL_BYTES, "http_url_bytes"},
+ {HTTP_SESSION_NEW, "http_session_new"},
+ {HTTP_SESSION_FREE, "http_session_free"},
+ {HTTP_TRANSACTION_NEW, "http_transaction_new"},
+ {HTTP_TRANSACTION_FREE, "http_transaction_free"},
+ {HTTP_C2S_ASYMMETRY_SESSION, "http_c2s_asymmetry_sess"},
+ {HTTP_S2C_ASYMMETRY_SESSION, "http_s2c_asymmetry_sess"},
+ {HTTP_C2S_ASYMMETRY_TRANSACTION, "http_c2s_asymmetry_trans"},
+ {HTTP_S2C_ASYMMETRY_TRANSACTION, "http_s2c_asymmetry_trans"},
+ {HTTP_STAT_PARSE_ERR, "http_parse_error"},
+ };
-static void *httpd_stat_timer_thread(void *arg)
-{
- pthread_setname_np(pthread_self(), "http_decoder_timer_thread");
- struct http_decoder_stat *hd_stat = (struct http_decoder_stat *)arg;
- struct timespec res;
- while (1)
+ void http_stat_free(struct http_stat *hd_stat)
{
- clock_gettime(CLOCK_MONOTONIC, &res);
- hd_stat->current_time_ms = (res.tv_sec * 1000) + (res.tv_nsec / 1000000);
- usleep(800);
+ if (hd_stat->fs4_ins != NULL)
+ {
+ fieldstat_easy_free(hd_stat->fs4_ins);
+ }
}
- return NULL;
-}
-int http_decoder_stat_init(struct http_decoder_stat *hd_stat, int thread_max, int stat_interval_pkts, int stat_interval_time)
-{
- assert(sizeof(g_httpd_stat_tuple) / sizeof(struct hd_stat_config_tuple) == HTTP_STAT_MAX);
- if (sizeof(g_httpd_stat_tuple) / sizeof(struct hd_stat_config_tuple) != HTTP_STAT_MAX)
- {
- fprintf(stderr, "enum http_decoder_stat_type number not match with g_httpd_stat_tuple!");
- return -1;
- }
- hd_stat->fse = fieldstat_easy_new(thread_max, "http_decoder_statistics", NULL, 0);
- if (NULL == hd_stat->fse)
+ static struct fieldstat_easy *http_get_fs4_ins(struct module_manager *mod_mgr)
{
- fprintf(stderr, "fieldstat_easy_new failed.");
- return -1;
+ // todo: use stellar global fieldstat4 instance
+ struct fieldstat_easy *fs4 = fieldstat_easy_new(module_manager_get_max_thread_num(mod_mgr), "HTTP", NULL, 0);
+ fieldstat_easy_enable_auto_output(fs4, "./log/http.fs4", 1);
+ return fs4;
}
- for (int i = 0; i < HTTP_STAT_MAX; i++)
+ int http_stat_init(struct module_manager *mod_mgr, struct http_stat *hd_stat)
{
- hd_stat->field_stat_id[i] = fieldstat_easy_register_counter(hd_stat->fse, g_httpd_stat_tuple[i].name);
- if (hd_stat->field_stat_id[i] < 0)
+ assert(sizeof(g_httpd_stat_tuple) / sizeof(struct hd_stat_config_tuple) == HTTP_STAT_MAX);
+ if (sizeof(g_httpd_stat_tuple) / sizeof(struct hd_stat_config_tuple) != HTTP_STAT_MAX)
{
- fprintf(stderr, "fieldstat_easy_register_counter %s failed.", g_httpd_stat_tuple[i].name);
- fieldstat_easy_free(hd_stat->fse);
- hd_stat->fse = NULL;
+ fprintf(stderr, "enum http_decoder_stat_type number not match with g_httpd_stat_tuple!");
+ return -1;
+ }
+ hd_stat->fs4_ins = http_get_fs4_ins(mod_mgr);
+ if (NULL == hd_stat->fs4_ins)
+ {
+ fprintf(stderr, "fieldstat_easy_new failed.");
return -1;
}
- }
- int ret = fieldstat_easy_enable_auto_output(hd_stat->fse, FILEDSTAT_OUTPUT_FILE, stat_interval_time);
- if (ret < 0)
- {
- fprintf(stderr, "fieldstat_easy_enable_auto_output failed.");
- fieldstat_easy_free(hd_stat->fse);
- hd_stat->fse = NULL;
- return -1;
+ for (int i = 0; i < HTTP_STAT_MAX; i++)
+ {
+ hd_stat->field_stat_id[i] = fieldstat_easy_register_counter(hd_stat->fs4_ins, g_httpd_stat_tuple[i].name);
+ if (hd_stat->field_stat_id[i] < 0)
+ {
+ fprintf(stderr, "fieldstat_easy_register_counter %s failed.", g_httpd_stat_tuple[i].name);
+ fieldstat_easy_free(hd_stat->fs4_ins);
+ hd_stat->fs4_ins = NULL;
+ return -1;
+ }
+ }
+ return 0;
}
- hd_stat->stats = (struct hd_statistics *)calloc(thread_max, sizeof(struct hd_statistics));
- hd_stat->stat_interval_pkts = stat_interval_pkts;
- hd_stat->stat_interval_time = stat_interval_time;
- pthread_create(&hd_stat->timer_pid, NULL, httpd_stat_timer_thread, hd_stat);
- pthread_detach(hd_stat->timer_pid);
- return 0;
-}
-
-void http_decoder_stat_update(struct http_decoder_stat *hd_stat, int thread_id, enum http_decoder_stat_type type, long long value)
-{
- assert(hd_stat);
- assert(thread_id >= 0);
- assert(type < HTTP_STAT_MAX);
- if (unlikely(hd_stat->stats == NULL))
+ void http_stat_update(struct http_stat *hd_stat, int thread_id, enum http_decoder_stat_type type, long long value)
{
- return;
+ assert(hd_stat);
+ assert(thread_id >= 0);
+ assert(type < HTTP_STAT_MAX);
+ // todo: performance optimization, use batch update
+ fieldstat_easy_counter_incrby(hd_stat->fs4_ins, thread_id, hd_stat->field_stat_id[type], NULL, 0, value);
}
- struct hd_statistics *cur_hds = &hd_stat->stats[thread_id];
-
- cur_hds->counter[type] += value;
- cur_hds->batch[type]++;
-
- if (cur_hds->batch[type] >= hd_stat->stat_interval_pkts || cur_hds->time_ms[type] + 1000 < hd_stat->current_time_ms)
+ void http_stat_update_tcp_seg(struct http_stat *stat, int thread_id, enum flow_type ftype, long long value)
{
- fieldstat_easy_counter_incrby(hd_stat->fse, thread_id, hd_stat->field_stat_id[type], NULL, 0, cur_hds->counter[type]);
- cur_hds->counter[type] = 0;
- cur_hds->batch[type] = 0;
- cur_hds->time_ms[type] = hd_stat->current_time_ms;
+ if (FLOW_TYPE_C2S == ftype)
+ {
+ http_stat_update(stat, thread_id, HTTP_C2S_BYTES, value);
+ http_stat_update(stat, thread_id, HTTP_C2S_TCP_SEG, 1);
+ }
+ else
+ {
+ http_stat_update(stat, thread_id, HTTP_S2C_BYTES, value);
+ http_stat_update(stat, thread_id, HTTP_S2C_TCP_SEG, 1);
+ }
}
-} \ No newline at end of file
+
+#ifdef __cplusplus
+}
+#endif \ No newline at end of file
diff --git a/decoders/http/http_decoder_stat.h b/decoders/http/http_decoder_stat.h
index 339e81d..0f9c1a7 100644
--- a/decoders/http/http_decoder_stat.h
+++ b/decoders/http/http_decoder_stat.h
@@ -1,55 +1,57 @@
#pragma once
#include <fieldstat/fieldstat_easy.h>
-enum http_decoder_stat_type
-{
- HTTP_C2S_BYTES = 0,
- HTTP_S2C_BYTES,
- HTTP_C2S_TCP_SEG,
- HTTP_S2C_TCP_SEG,
- HTTP_C2S_HEADERS,
- HTTP_S2C_HEADERS,
- HTTP_C2S_ZIP_BYTES, // only if Content-Encoding is gzip, deflate, br
- HTTP_S2C_ZIP_BYTES, // only if Content-Encoding is gzip, deflate, br
- HTTP_C2S_UNZIP_BYTES, // only if Content-Encoding is gzip, deflate, br
- HTTP_S2C_UNZIP_BYTES, // only if Content-Encoding is gzip, deflate, br
- HTTP_URL_BYTES,
- HTTP_SESSION_NEW,
- HTTP_SESSION_FREE,
- HTTP_TRANSACTION_NEW,
- HTTP_TRANSACTION_FREE,
- HTTP_C2S_ASYMMETRY_SESSION,
- HTTP_S2C_ASYMMETRY_SESSION,
- HTTP_C2S_ASYMMETRY_TRANSACTION,
- HTTP_S2C_ASYMMETRY_TRANSACTION,
- HTTP_STAT_PARSE_ERR,
- HTTP_STAT_MAX,
-};
+#include "stellar/module.h"
+#include "stellar/session.h"
-struct hd_stat_config_tuple
+#ifdef __cplusplus
+extern "C"
{
- enum http_decoder_stat_type type;
- const char *name;
-};
+#endif
-struct hd_statistics
-{
- long long time_ms[HTTP_STAT_MAX];
- long long counter[HTTP_STAT_MAX];
- int batch[HTTP_STAT_MAX]; // call fieldstat_easy_counter_incrby() per batch
-} __attribute__((aligned(64)));
+ enum http_decoder_stat_type
+ {
+ HTTP_C2S_BYTES = 0,
+ HTTP_S2C_BYTES,
+ HTTP_C2S_TCP_SEG,
+ HTTP_S2C_TCP_SEG,
+ HTTP_C2S_HEADERS,
+ HTTP_S2C_HEADERS,
+ HTTP_C2S_ZIP_BYTES, // only update if Content-Encoding is gzip, deflate, br
+ HTTP_S2C_ZIP_BYTES, // only update if Content-Encoding is gzip, deflate, br
+ HTTP_C2S_UNZIP_BYTES, // only update if Content-Encoding is gzip, deflate, br
+ HTTP_S2C_UNZIP_BYTES, // only update if Content-Encoding is gzip, deflate, br
+ HTTP_URL_BYTES,
+ HTTP_SESSION_NEW,
+ HTTP_SESSION_FREE,
+ HTTP_TRANSACTION_NEW,
+ HTTP_TRANSACTION_FREE,
+ HTTP_C2S_ASYMMETRY_SESSION,
+ HTTP_S2C_ASYMMETRY_SESSION,
+ HTTP_C2S_ASYMMETRY_TRANSACTION,
+ HTTP_S2C_ASYMMETRY_TRANSACTION,
+ HTTP_STAT_PARSE_ERR,
+ HTTP_STAT_MAX,
+ };
-struct http_decoder_stat
-{
- pthread_t timer_pid;
- long long current_time_ms;
- struct fieldstat_easy *fse;
- int stat_interval_pkts; // call fieldstat_incrby every stat_interval_pkts
- int stat_interval_time; // second
- int field_stat_id[HTTP_STAT_MAX];
- struct hd_statistics *stats; // size is thread number
-};
+ struct hd_stat_config_tuple
+ {
+ enum http_decoder_stat_type type;
+ const char *name;
+ };
+
+ struct http_stat
+ {
+ struct fieldstat_easy *fs4_ins;
+ int stat_interval_time; // second
+ int field_stat_id[HTTP_STAT_MAX];
+ };
+
+ int http_stat_init(struct module_manager *mod_mgr, struct http_stat *hd_stat);
+ void http_stat_free(struct http_stat *hd_stat);
+ void http_stat_update(struct http_stat *hd_stat, int thread_id, enum http_decoder_stat_type type, long long value);
+ void http_stat_update_tcp_seg(struct http_stat *stat, int thread_id, enum flow_type ftype, long long value);
-int http_decoder_stat_init(struct http_decoder_stat *hd_stat, int thread_max, int stat_interval_pkts, int stat_interval_time);
-void http_decoder_stat_free(struct http_decoder_stat *hd_stat);
-void http_decoder_stat_update(struct http_decoder_stat *hd_stat, int thread_id, enum http_decoder_stat_type type, long long value);
+#ifdef __cplusplus
+}
+#endif \ No newline at end of file
diff --git a/decoders/http/http_decoder_string.c b/decoders/http/http_decoder_string.c
deleted file mode 100644
index 6fd5b04..0000000
--- a/decoders/http/http_decoder_string.c
+++ /dev/null
@@ -1,289 +0,0 @@
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <assert.h>
-#include "http_decoder_private.h"
-
-static const char *string_state_to_desc(enum string_state state)
-{
- switch (state)
- {
- case STRING_STATE_INIT:
- return "init";
- break;
- case STRING_STATE_REFER:
- return "refer";
- break;
- case STRING_STATE_CACHE:
- return "cache";
- break;
- case STRING_STATE_COMMIT:
- return "commit";
- break;
- default:
- return "unknown";
- break;
- }
-}
-
-void http_decoder_string_refer(struct http_decoder_string *rstr, const char *at, size_t length)
-{
- if (NULL == rstr)
- {
- return;
- }
-
- switch (rstr->state)
- {
- case STRING_STATE_INIT:
- case STRING_STATE_CACHE:
- rstr->refer.iov_base = (char *)at;
- rstr->refer.iov_len = length;
- break;
- default:
- abort();
- break;
- }
-
- rstr->state = STRING_STATE_REFER;
-}
-
-static void string_refer2cache(struct http_decoder_string *rstr)
-{
- if (0 == rstr->refer.iov_len)
- {
- return;
- }
- if (rstr->cache.iov_len >= rstr->max_cache_size)
- {
- return;
- }
-
- size_t length = rstr->cache.iov_len + rstr->refer.iov_len;
- if (length > rstr->max_cache_size)
- {
- length = rstr->max_cache_size;
- }
-
- if (NULL == rstr->cache.iov_base)
- {
- rstr->cache.iov_base = CALLOC(char, length + 1);
- memcpy(rstr->cache.iov_base, rstr->refer.iov_base, length);
- }
- else
- {
- rstr->cache.iov_base = REALLOC(char, rstr->cache.iov_base, length + 1);
- memcpy((char *)rstr->cache.iov_base + rstr->cache.iov_len, rstr->refer.iov_base,
- (length - rstr->cache.iov_len));
- }
-
- rstr->cache.iov_len = length;
- rstr->refer.iov_base = NULL;
- rstr->refer.iov_len = 0;
-}
-
-static void string_commit2cache(struct http_decoder_string *rstr)
-{
- if (rstr->cache.iov_len == rstr->commit.iov_len &&
- rstr->cache.iov_base == rstr->commit.iov_base)
- {
- rstr->commit.iov_base = NULL;
- rstr->commit.iov_len = 0;
- return;
- }
-
- // Only http header key need to backward to cache
- size_t length = 0;
- if (rstr->commit.iov_len > rstr->max_cache_size)
- {
- length = rstr->max_cache_size;
- }
- else
- {
- length = rstr->commit.iov_len;
- }
-
- if (length > 0)
- {
- if (NULL == rstr->cache.iov_base)
- {
- rstr->cache.iov_base = CALLOC(char, length + 1);
- }
- else
- {
- abort();
- }
- memcpy(rstr->cache.iov_base, rstr->commit.iov_base, length);
- rstr->cache.iov_len = length;
-
- rstr->commit.iov_base = NULL;
- rstr->commit.iov_len = 0;
- }
-}
-
-void http_decoder_string_cache(struct http_decoder_string *rstr)
-{
- if (NULL == rstr)
- {
- return;
- }
-
- switch (rstr->state)
- {
- case STRING_STATE_REFER:
- string_refer2cache(rstr);
- break;
- case STRING_STATE_CACHE:
- break;
- case STRING_STATE_COMMIT:
- // commit backward to cache
- string_commit2cache(rstr);
- break;
- default:
- abort();
- break;
- }
- rstr->state = STRING_STATE_CACHE;
-}
-
-void http_decoder_string_commit(struct http_decoder_string *rstr)
-{
- if (NULL == rstr)
- {
- return;
- }
-
- switch (rstr->state)
- {
- case STRING_STATE_REFER:
- if (rstr->cache.iov_len)
- {
- http_decoder_string_cache(rstr);
-
- rstr->commit.iov_base = rstr->cache.iov_base;
- rstr->commit.iov_len = rstr->cache.iov_len;
- // not overwrite rstr->cache.iov_base
- }
- else
- {
- rstr->commit.iov_base = rstr->refer.iov_base;
- rstr->commit.iov_len = rstr->refer.iov_len;
-
- rstr->refer.iov_base = NULL;
- rstr->refer.iov_len = 0;
- }
- break;
- case STRING_STATE_CACHE:
- rstr->commit.iov_base = rstr->cache.iov_base;
- rstr->commit.iov_len = rstr->cache.iov_len;
- // not overwrite rstr->cache.iov_base
- break;
- default:
- // abort();
- break;
- }
-
- rstr->state = STRING_STATE_COMMIT;
-}
-
-void http_decoder_string_reset(struct http_decoder_string *rstr)
-{
- assert(rstr);
-
- switch (rstr->state)
- {
- case STRING_STATE_INIT:
- case STRING_STATE_REFER:
- case STRING_STATE_CACHE:
- case STRING_STATE_COMMIT:
- FREE(rstr->cache.iov_base);
- memset(rstr, 0, sizeof(struct http_decoder_string));
- break;
- default:
- abort();
- break;
- }
-
- rstr->state = STRING_STATE_INIT;
-}
-
-void http_decoder_string_init(struct http_decoder_string *rstr, size_t max_cache_size)
-{
- rstr->max_cache_size = max_cache_size;
-}
-
-void http_decoder_string_reinit(struct http_decoder_string *rstr)
-{
- if (rstr->state == STRING_STATE_CACHE)
- {
- return;
- }
-
- if (rstr->state == STRING_STATE_COMMIT &&
- rstr->cache.iov_base == rstr->commit.iov_base &&
- rstr->cache.iov_len == rstr->commit.iov_len)
- {
- return;
- }
-
- if (rstr->cache.iov_base != NULL)
- {
- FREE(rstr->cache.iov_base);
- rstr->cache.iov_len = 0;
- }
-
-#if 0
- rstr->refer.iov_base = NULL;
- rstr->refer.iov_len = 0;
- rstr->commit.iov_base = NULL;
- rstr->commit.iov_len = 0;
- rstr->state = STRING_STATE_INIT;
-#endif
-}
-
-enum string_state http_decoder_string_state(const struct http_decoder_string *rstr)
-{
- return rstr->state;
-}
-
-int http_decoder_string_get(const struct http_decoder_string *rstr, char **name, size_t *name_len)
-{
- if (NULL == rstr || NULL == name || 0 == name_len)
- {
- return -1;
- }
-
- if (http_decoder_string_state(rstr) == STRING_STATE_COMMIT)
- {
- *name = rstr->commit.iov_base;
- *name_len = rstr->commit.iov_len;
- }
- else
- {
- *name = NULL;
- *name_len = 0;
- }
- return 0;
-}
-
-void http_decoder_string_dump(struct http_decoder_string *rstr, const char *desc)
-{
- if (NULL == rstr)
- {
- return;
- }
-
- char *refer_str = http_safe_dup((char *)rstr->refer.iov_base, rstr->refer.iov_len);
- char *cache_str = http_safe_dup((char *)rstr->cache.iov_base, rstr->cache.iov_len);
- char *commit_str = http_safe_dup((char *)rstr->commit.iov_base, rstr->commit.iov_len);
-
- printf("%s: state: %s, refer: {len: %02zu, iov_base: %s}, cache: {len: %02zu, iov_base: %s}, commit: {len: %02zu, iov_base: %s}\n",
- desc, string_state_to_desc(rstr->state),
- rstr->refer.iov_len, refer_str,
- rstr->cache.iov_len, cache_str,
- rstr->commit.iov_len, commit_str);
-
- FREE(refer_str);
- FREE(cache_str);
- FREE(commit_str);
-} \ No newline at end of file
diff --git a/decoders/http/http_decoder_string.h b/decoders/http/http_decoder_string.h
deleted file mode 100644
index 83721e9..0000000
--- a/decoders/http/http_decoder_string.h
+++ /dev/null
@@ -1,73 +0,0 @@
-#pragma once
-
-#include "stellar/http.h"
-
-enum string_state {
- STRING_STATE_INIT,
- STRING_STATE_REFER,
- STRING_STATE_CACHE,
- STRING_STATE_COMMIT,
-};
-
-/* state transition diagram
- * +----------+
- * | |
- * \|/ |
- * +------+ |
- * | init | |
- * +------+ |
- * | |
- * +---->| |
- * | \|/ |
- * | +-------+ |
- * | | refer |--+ |
- * | +-------+ | |
- * | | | |
- * | \|/ | |
- * | +-------+ | |
- * +--| cache | | |
- * +-------+ | |
- * | | |
- * |<------+ |
- * \|/ |
- * +--------+ |
- * | commit | |
- * +--------+ |
- * | |
- * \|/ |
- * +--------+ |
- * | reset |----+
- * +--------+
- */
-
-
-//http decoder string
-struct http_decoder_string {
- hstring refer; // shallow copy
- hstring cache; // deep copy
- hstring commit;
-
- enum string_state state;
- size_t max_cache_size;
-};
-
-void http_decoder_string_refer(struct http_decoder_string *rstr,
- const char *at, size_t length);
-
-void http_decoder_string_cache(struct http_decoder_string *rstr);
-
-void http_decoder_string_commit(struct http_decoder_string *rstr);
-
-void http_decoder_string_reset(struct http_decoder_string *rstr);
-
-void http_decoder_string_init(struct http_decoder_string *rstr,
- size_t max_cache_size);
-
-void http_decoder_string_reinit(struct http_decoder_string *rstr);
-
-enum string_state http_decoder_string_state(const struct http_decoder_string *rstr);
-
-int http_decoder_string_get(const struct http_decoder_string *rstr, char **name, size_t *name_len);
-
-void http_decoder_string_dump(struct http_decoder_string *rstr, const char *desc);
- \ No newline at end of file
diff --git a/decoders/http/http_decoder_table.c b/decoders/http/http_decoder_table.c
deleted file mode 100644
index c85b876..0000000
--- a/decoders/http/http_decoder_table.c
+++ /dev/null
@@ -1,579 +0,0 @@
-#include <assert.h>
-#include <stdlib.h>
-#include <string.h>
-#include "http_decoder_private.h"
-
-#define INIT_HEADER_CNT 16
-#define MAX_URI_CACHE_SIZE 2048
-#define MAX_STATUS_CACHE_SIZE 32
-#define MAX_METHOD_CACHE_SIZE 8
-#define MAX_VERSION_CACHE_SIZE 4
-#define MAX_HEADER_KEY_CACHE_SIZE 4096
-#define MAX_HEADER_VALUE_CACHE_SIZE 4096
-
-struct http_decoder_header
-{
- struct http_decoder_string key;
- struct http_decoder_string val;
-};
-
-struct http_decoder_table
-{
- struct http_decoder_string uri;
- struct http_decoder_string status;
- struct http_decoder_string method;
- struct http_decoder_string version;
- struct http_decoder_string body;
-
- nmx_pool_t *ref_mempool;
- int header_complete; // flag for all headers parsed completely
- size_t header_cnt;
- size_t header_index; // current parsing header
- size_t header_iter; // plugins iterate cursor
- size_t commit_header_index; // pushed to plugins, whether has called http_message_get0_next_header()
- struct http_decoder_header *headers;
-};
-
-static void http_decoder_table_init(struct http_decoder_table *table)
-{
- if (NULL == table)
- {
- return;
- }
-
- struct http_decoder_header *header = NULL;
- assert(table);
-
- http_decoder_string_init(&table->uri, MAX_URI_CACHE_SIZE);
- http_decoder_string_init(&table->status, MAX_STATUS_CACHE_SIZE);
- http_decoder_string_init(&table->method, MAX_METHOD_CACHE_SIZE);
- http_decoder_string_init(&table->version, MAX_METHOD_CACHE_SIZE);
-
- for (size_t i = 0; i < table->header_cnt; i++)
- {
- header = &table->headers[i];
- http_decoder_string_init(&header->key, MAX_HEADER_KEY_CACHE_SIZE);
- http_decoder_string_init(&header->val, MAX_HEADER_VALUE_CACHE_SIZE);
- }
-
- http_decoder_string_init(&table->body, 0);
-}
-
-struct http_decoder_table *http_decoder_table_new(nmx_pool_t *mempool)
-{
- struct http_decoder_table *table =
- MEMPOOL_CALLOC(mempool, struct http_decoder_table, 1);
- assert(table);
-
- table->ref_mempool = mempool;
- table->header_cnt = INIT_HEADER_CNT;
- table->headers = MEMPOOL_CALLOC(mempool, struct http_decoder_header,
- table->header_cnt);
- table->commit_header_index = 0;
- http_decoder_table_init(table);
-
- return table;
-}
-
-void http_decoder_table_free(struct http_decoder_table *table)
-{
- if (NULL == table)
- {
- return;
- }
- if (table->uri.cache.iov_base != NULL)
- {
- FREE(table->uri.cache.iov_base);
- }
- if (table->status.cache.iov_base != NULL)
- {
- FREE(table->status.cache.iov_base);
- }
- if (table->method.cache.iov_base != NULL)
- {
- FREE(table->method.cache.iov_base);
- }
- if (table->version.cache.iov_base != NULL)
- {
- FREE(table->version.cache.iov_base);
- }
- if (table->body.cache.iov_base != NULL)
- {
- FREE(table->body.cache.iov_base);
- }
-
- if (table->headers != NULL)
- {
- for (size_t i = 0; i < table->header_cnt; i++)
- {
- if (table->headers[i].key.cache.iov_base != NULL)
- {
- FREE(table->headers[i].key.cache.iov_base);
- }
-
- if (table->headers[i].val.cache.iov_base != NULL)
- {
- FREE(table->headers[i].val.cache.iov_base);
- }
- }
-
- MEMPOOL_FREE(table->ref_mempool, table->headers);
- table->headers = NULL;
- }
- MEMPOOL_FREE(table->ref_mempool, table);
-}
-
-enum string_state http_decoder_table_state(struct http_decoder_table *table, enum http_item type)
-{
- if (NULL == table)
- {
- return STRING_STATE_INIT;
- }
- struct http_decoder_header *header = NULL;
- enum string_state state = STRING_STATE_INIT;
- assert(table);
-
- switch (type)
- {
- case HTTP_ITEM_URI:
- state = http_decoder_string_state(&table->uri);
- break;
- case HTTP_ITEM_STATUS:
- state = http_decoder_string_state(&table->status);
- break;
- case HTTP_ITEM_METHOD:
- state = http_decoder_string_state(&table->method);
- break;
- case HTTP_ITEM_VERSION:
- state = http_decoder_string_state(&table->version);
- break;
- case HTTP_ITEM_HDRKEY:
- assert(table->header_index < table->header_cnt);
- header = &table->headers[table->header_index];
- state = http_decoder_string_state(&header->key);
- break;
- case HTTP_ITEM_HDRVAL:
- assert(table->header_index < table->header_cnt);
- header = &table->headers[table->header_index];
- state = http_decoder_string_state(&header->val);
- break;
- case HTTP_ITEM_BODY:
- state = http_decoder_string_state(&table->body);
- break;
- default:
- abort();
- break;
- }
-
- return state;
-}
-
-void http_decoder_table_refer(struct http_decoder_table *table, enum http_item type, const char *at, size_t len)
-{
- if (NULL == table)
- {
- return;
- }
-
- struct http_decoder_header *header = NULL;
- assert(table);
-
- switch (type)
- {
- case HTTP_ITEM_URI:
- http_decoder_string_refer(&table->uri, at, len);
- break;
- case HTTP_ITEM_STATUS:
- http_decoder_string_refer(&table->status, at, len);
- break;
- case HTTP_ITEM_METHOD:
- http_decoder_string_refer(&table->method, at, len);
- break;
- case HTTP_ITEM_VERSION:
- http_decoder_string_refer(&table->version, at, len);
- break;
- case HTTP_ITEM_HDRKEY:
- assert(table->header_index < table->header_cnt);
- header = &table->headers[table->header_index];
- http_decoder_string_refer(&header->key, at, len);
- break;
- case HTTP_ITEM_HDRVAL:
- assert(table->header_index < table->header_cnt);
- header = &table->headers[table->header_index];
- http_decoder_string_refer(&header->val, at, len);
- break;
- case HTTP_ITEM_BODY:
- http_decoder_string_refer(&table->body, at, len);
- break;
- default:
- abort();
- break;
- }
-}
-
-void http_decoder_table_cache(struct http_decoder_table *table, enum http_item type)
-{
- if (NULL == table)
- {
- return;
- }
-
- struct http_decoder_header *header = NULL;
- assert(table);
-
- switch (type)
- {
- case HTTP_ITEM_URI:
- http_decoder_string_cache(&table->uri);
- break;
- case HTTP_ITEM_STATUS:
- http_decoder_string_cache(&table->status);
- break;
- case HTTP_ITEM_METHOD:
- http_decoder_string_cache(&table->method);
- break;
- case HTTP_ITEM_VERSION:
- http_decoder_string_cache(&table->version);
- break;
- case HTTP_ITEM_HDRKEY:
- assert(table->header_index < table->header_cnt);
- header = &table->headers[table->header_index];
- http_decoder_string_cache(&header->key);
- break;
- case HTTP_ITEM_HDRVAL:
- assert(table->header_index < table->header_cnt);
- header = &table->headers[table->header_index];
- http_decoder_string_cache(&header->val);
- break;
- case HTTP_ITEM_BODY:
- http_decoder_string_cache(&table->body);
- break;
- default:
- abort();
- break;
- }
-}
-
-void http_decoder_table_commit(struct http_decoder_table *table, enum http_item type)
-{
- if (NULL == table)
- {
- return;
- }
-
- size_t i = 0;
- struct http_decoder_header *header = NULL;
- assert(table);
-
- switch (type)
- {
- case HTTP_ITEM_URI:
- http_decoder_string_commit(&table->uri);
- break;
- case HTTP_ITEM_STATUS:
- http_decoder_string_commit(&table->status);
- break;
- case HTTP_ITEM_METHOD:
- http_decoder_string_commit(&table->method);
- break;
- case HTTP_ITEM_VERSION:
- http_decoder_string_commit(&table->version);
- break;
- case HTTP_ITEM_HDRKEY:
- assert(table->header_index < table->header_cnt);
- header = &table->headers[table->header_index];
- http_decoder_string_commit(&header->key);
- break;
- case HTTP_ITEM_HDRVAL:
- header = &table->headers[table->header_index];
- http_decoder_string_commit(&header->val);
- // inc index
- if ((table->header_index + 1) >= table->header_cnt)
- {
- struct http_decoder_header *old_headers = table->headers;
- table->headers =
- MEMPOOL_CALLOC(table->ref_mempool, struct http_decoder_header,
- table->header_cnt * 2);
- table->header_cnt *= 2;
-
- for (i = 0; i <= table->header_index; i++)
- {
- table->headers[i] = old_headers[i];
- }
-
- MEMPOOL_FREE(table->ref_mempool, old_headers);
-
- for (i = table->header_index + 1; i < table->header_cnt; i++)
- {
- header = &table->headers[i];
- memset(header, 0, sizeof(struct http_decoder_header));
- http_decoder_string_init(&header->key, MAX_HEADER_KEY_CACHE_SIZE);
- http_decoder_string_init(&header->val, MAX_HEADER_VALUE_CACHE_SIZE);
- }
- }
- table->header_index++;
- break;
- case HTTP_ITEM_BODY:
- http_decoder_string_commit(&table->body);
- break;
- default:
- abort();
- break;
- }
-}
-
-void http_decoder_table_reset(struct http_decoder_table *table, enum http_item type)
-{
- if (NULL == table)
- {
- return;
- }
-
- struct http_decoder_header *header = NULL;
- assert(table);
-
- switch (type)
- {
- case HTTP_ITEM_URI:
- http_decoder_string_reset(&table->uri);
- break;
- case HTTP_ITEM_STATUS:
- http_decoder_string_reset(&table->status);
- break;
- case HTTP_ITEM_METHOD:
- http_decoder_string_reset(&table->method);
- break;
- case HTTP_ITEM_VERSION:
- http_decoder_string_reset(&table->version);
- break;
- case HTTP_ITEM_HDRKEY:
- header = &table->headers[table->header_index];
- http_decoder_string_reset(&header->key);
- break;
- case HTTP_ITEM_HDRVAL:
- header = &table->headers[table->header_index];
- http_decoder_string_reset(&header->val);
- break;
- case HTTP_ITEM_BODY:
- http_decoder_string_reset(&table->body);
- break;
- default:
- abort();
- break;
- }
-}
-
-void http_decoder_table_reinit(struct http_decoder_table *table)
-{
- assert(table);
- struct http_decoder_header *header = NULL;
-
- http_decoder_string_reinit(&table->uri);
- http_decoder_string_reinit(&table->status);
- http_decoder_string_reinit(&table->method);
- http_decoder_string_reinit(&table->version);
- // for (size_t i = 0; i < table->header_iter; i++) {
- for (size_t i = 0; i < table->commit_header_index; i++)
- {
- // todo, reset header_index, avoid realloc headers as much as possible
- header = &table->headers[i];
- http_decoder_string_reinit(&header->key);
- http_decoder_string_reinit(&header->val);
- }
-
- http_decoder_string_reinit(&table->body);
-}
-
-void http_decoder_table_dump(struct http_decoder_table *table)
-{
- if (NULL == table)
- {
- return;
- }
-
- http_decoder_string_dump(&table->uri, "uri");
- http_decoder_string_dump(&table->status, "status");
- http_decoder_string_dump(&table->method, "method");
- http_decoder_string_dump(&table->version, "version");
- http_decoder_string_dump(&table->body, "body");
-
- for (size_t i = 0; i < table->header_cnt; i++)
- {
- struct http_decoder_header *header = &table->headers[i];
- if (NULL == header)
- {
- continue;
- }
-
- http_decoder_string_dump(&header->key, "key");
- http_decoder_string_dump(&header->val, "val");
- }
-}
-
-int http_decoder_table_get_uri(const struct http_decoder_table *table, char **out, size_t *out_len)
-{
- if (NULL == table || NULL == out)
- {
- return -1;
- }
- return http_decoder_string_get(&table->uri, out, out_len);
-}
-
-int http_decoder_table_get_method(const struct http_decoder_table *table, char **out, size_t *out_len)
-{
- if (NULL == table || NULL == out)
- {
- return -1;
- }
- return http_decoder_string_get(&table->method, out, out_len);
-}
-
-int http_decoder_table_get_status(const struct http_decoder_table *table, char **out, size_t *out_len)
-{
- if (NULL == table || NULL == out)
- {
- return -1;
- }
- return http_decoder_string_get(&table->status, out, out_len);
-}
-
-int http_decoder_table_get_version(const struct http_decoder_table *table, char **out, size_t *out_len)
-{
- if (NULL == table || NULL == out)
- {
- return -1;
- }
- return http_decoder_string_get(&table->version, out, out_len);
-}
-
-int http_decoder_table_get_body(const struct http_decoder_table *table, char **out, size_t *out_len)
-{
- if (NULL == table || NULL == out)
- {
- return -1;
- }
- return http_decoder_string_get(&table->body, (char **)out, out_len);
-}
-
-int http_decoder_table_get_header(const struct http_decoder_table *table, const char *name, size_t name_len,
- struct http_header_field *hdr_result)
-{
- for (size_t i = 0; i < table->header_cnt; i++)
- {
- const struct http_decoder_header *tmp_header = &table->headers[i];
- if (tmp_header->key.commit.iov_len != name_len)
- {
- continue;
- }
-
- if (http_decoder_string_state(&tmp_header->key) == STRING_STATE_COMMIT &&
- http_decoder_string_state(&tmp_header->val) == STRING_STATE_COMMIT)
- {
- hstring tmp_key;
- http_decoder_string_get(&tmp_header->key, (char **)&tmp_key.iov_base, &tmp_key.iov_len);
-
- if (tmp_key.iov_len == name_len &&
- (0 == strncasecmp((char *)tmp_key.iov_base, name, name_len)))
- {
- http_decoder_string_get(&tmp_header->key, &hdr_result->name, &hdr_result->name_len);
- http_decoder_string_get(&tmp_header->val, &hdr_result->value, &hdr_result->value_len);
- return 0;
- }
- }
- }
- return -1;
-}
-
-int http_decoder_table_iter_header(struct http_decoder_table *table, struct http_header_field *hdr)
-{
- if (NULL == table || NULL == hdr)
- {
- return -1;
- }
- if (table->header_iter >= table->header_cnt)
- {
- return -1;
- }
-
- struct http_decoder_header *tmp_header = &table->headers[table->header_iter];
- if (tmp_header != NULL)
- {
- if (http_decoder_string_state(&tmp_header->key) == STRING_STATE_COMMIT &&
- http_decoder_string_state(&tmp_header->val) == STRING_STATE_COMMIT)
- {
-
- http_decoder_string_get(&tmp_header->key, &hdr->name, &hdr->name_len);
- http_decoder_string_get(&tmp_header->val, &hdr->value, &hdr->value_len);
- table->header_iter++;
- return 1;
- }
- }
-
- hdr->name = NULL;
- hdr->name_len = 0;
- hdr->value = NULL;
- hdr->value_len = 0;
-
- return -1;
-}
-
-int http_decoder_table_reset_header_iter(struct http_decoder_table *table)
-{
- table->header_iter = 0;
- return 0;
-}
-
-int http_decoder_table_has_parsed_header(struct http_decoder_table *table)
-{
- // if (NULL == table || (table->header_iter == table->header_index)) {
- if (NULL == table || (table->commit_header_index == table->header_index))
- {
- return 0;
- }
-
- const struct http_decoder_header *tmp_header = &table->headers[table->header_iter];
-
- if (http_decoder_string_state(&tmp_header->key) == STRING_STATE_COMMIT && http_decoder_string_state(&tmp_header->val) == STRING_STATE_COMMIT)
- {
- return 1;
- }
-
- return 0;
-}
-
-int http_decoder_table_header_complete(struct http_decoder_table *table)
-{
- if (NULL == table)
- {
- return -1;
- }
- return table->header_complete;
-}
-
-void http_decoder_table_set_header_complete(struct http_decoder_table *table)
-{
- if (NULL == table)
- {
- return;
- }
- table->header_complete = 1;
-}
-
-void http_decoder_table_reset_header_complete(struct http_decoder_table *table)
-{
- if (NULL == table)
- {
- return;
- }
- table->header_complete = 0;
-}
-
-void http_decoder_table_update_commit_index(struct http_decoder_table *table)
-{
- table->commit_header_index = table->header_index;
-}
-
-int http_decoder_table_get_total_parsed_header(struct http_decoder_table *table)
-{
- return table->header_index;
-} \ No newline at end of file
diff --git a/decoders/http/http_decoder_table.h b/decoders/http/http_decoder_table.h
deleted file mode 100644
index 9a8d948..0000000
--- a/decoders/http/http_decoder_table.h
+++ /dev/null
@@ -1,79 +0,0 @@
-#pragma once
-#include <stddef.h>
-#include "stellar/http.h"
-#include "http_decoder_private.h"
-#include "http_decoder_string.h"
-
-enum http_item
-{
- HTTP_ITEM_URI = 0x01,
- HTTP_ITEM_STATUS = 0x02,
- HTTP_ITEM_METHOD = 0x03,
- HTTP_ITEM_VERSION = 0x04,
- HTTP_ITEM_HDRKEY = 0x05,
- HTTP_ITEM_HDRVAL = 0x06,
- HTTP_ITEM_BODY = 0x07,
-};
-
-struct http_decoder_table;
-struct http_decoder_table *http_decoder_table_new(nmx_pool_t *mempool);
-
-void http_decoder_table_free(struct http_decoder_table *table);
-
-enum string_state
-http_decoder_table_state(struct http_decoder_table *table, enum http_item type);
-
-void http_decoder_table_refer(struct http_decoder_table *table, enum http_item type,
- const char *at, size_t len);
-
-void http_decoder_table_cache(struct http_decoder_table *table, enum http_item type);
-
-void http_decoder_table_commit(struct http_decoder_table *table, enum http_item type);
-
-void http_decoder_table_reset(struct http_decoder_table *table, enum http_item type);
-
-void http_decoder_table_reinit(struct http_decoder_table *table);
-
-void http_decoder_table_dump(struct http_decoder_table *table);
-
-int http_decoder_table_get_uri(const struct http_decoder_table *table, char **out, size_t *out_len);
-
-int http_decoder_table_get_method(const struct http_decoder_table *table, char **out, size_t *out_len);
-
-int http_decoder_table_get_status(const struct http_decoder_table *table, char **out, size_t *out_len);
-
-int http_decoder_table_get_version(const struct http_decoder_table *table, char **out, size_t *out_len);
-
-int http_decoder_table_get_body(const struct http_decoder_table *table, char **out, size_t *out_len);
-
-int http_decoder_table_get_header(const struct http_decoder_table *table,
- const char *name, size_t name_len,
- struct http_header_field *hdr_res);
-
-int http_decoder_table_iter_header(struct http_decoder_table *table,
- struct http_header_field *hdr);
-int http_decoder_table_reset_header_iter(struct http_decoder_table *table);
-/**
- * @brief Is there a parsed header
- *
- * @retval yes(1) no(0)
- */
-int http_decoder_table_has_parsed_header(struct http_decoder_table *table);
-
-/**
- * @brief If headers have been parsed completely
- *
- * @retval yes(1) no(0)
- */
-int http_decoder_table_header_complete(struct http_decoder_table *table);
-
-/**
- * @brief set flag for headers parsed completely
- */
-void http_decoder_table_set_header_complete(struct http_decoder_table *table);
-
-void http_decoder_table_reset_header_complete(struct http_decoder_table *table);
-
-void http_decoder_table_update_commit_index(struct http_decoder_table *table);
-
-int http_decoder_table_get_total_parsed_header(struct http_decoder_table *table);
diff --git a/decoders/http/http_decoder_tunnel.c b/decoders/http/http_decoder_tunnel.c
deleted file mode 100644
index a6abda8..0000000
--- a/decoders/http/http_decoder_tunnel.c
+++ /dev/null
@@ -1,116 +0,0 @@
-#include <assert.h>
-#include <stdio.h>
-#include <string.h>
-#include <strings.h>
-#include <unistd.h>
-#include "http_decoder_private.h"
-#include "llhttp.h"
-
-struct http_tunnel_message
-{
- enum http_tunnel_message_type type;
- hstring tunnel_payload;
-};
-
-int httpd_tunnel_identify(struct http_decoder_env *httpd_env, int curdir, struct http_decoder_half_data *hfdata)
-{
- if (0 == httpd_env->hd_cfg.proxy_enable)
- {
- return 0;
- }
-
- if (FLOW_TYPE_C2S == curdir)
- {
- struct http_request_line reqline = {};
- http_decoder_half_data_get_request_line(hfdata, &reqline);
- if (0 == http_strncasecmp_safe("CONNECT", (char *)reqline.method,
- 7, reqline.method_len))
- {
- return 1;
- }
- }
- else
- {
- struct http_response_line resline = {};
- http_decoder_half_data_get_response_line(hfdata, &resline);
- if (resline.status_code == HTTP_STATUS_OK && 0 == http_strncasecmp_safe("Connection established", (char *)resline.status,
- strlen("Connection established"), resline.status_len))
- {
- return 1;
- }
- }
-
- return 0;
-}
-
-int httpd_is_tunnel_session(const struct http_decoder_env *httpd_env, const struct http_decoder_exdata *ex_data)
-{
- if (0 == httpd_env->hd_cfg.proxy_enable)
- {
- return 0;
- }
- return (ex_data && ex_data->tunnel_state != HTTP_TUN_NON);
-}
-
-int httpd_in_tunnel_transmitting(const struct http_decoder_env *httpd_env, struct http_decoder_exdata *ex_data)
-{
- if (0 == httpd_env->hd_cfg.proxy_enable)
- {
- return 0;
- }
- return (ex_data && ex_data->tunnel_state >= HTTP_TUN_INNER_STARTING);
-}
-
-enum http_tunnel_message_type httpd_tunnel_state_to_msg(const struct http_decoder_exdata *ex_data)
-{
- if (ex_data->tunnel_state == HTTP_TUN_INNER_STARTING)
- {
- return HTTP_TUNNEL_OPENING;
- }
- if (ex_data->tunnel_state == HTTP_TUN_INNER_TRANS)
- {
- return HTTP_TUNNEL_ACTIVE;
- }
- return HTTP_TUNNEL_MSG_MAX;
-}
-
-void httpd_tunnel_state_update(struct http_decoder_exdata *ex_data)
-{
- if (ex_data->tunnel_state == HTTP_TUN_INNER_STARTING)
- {
- ex_data->tunnel_state = HTTP_TUN_INNER_TRANS;
- }
-}
-
-void http_decoder_push_tunnel_data(struct session *sess, const struct http_decoder_exdata *exdata, enum http_tunnel_message_type type, const char *payload, uint16_t payload_len)
-{
- struct http_tunnel_message *tmsg = (struct http_tunnel_message *)CALLOC(struct http_tunnel_message, 1);
- tmsg->type = type;
- tmsg->tunnel_payload.iov_base = (char *)payload;
- tmsg->tunnel_payload.iov_len = payload_len;
- session_mq_publish_message(sess, exdata->pub_topic_id, tmsg);
-}
-
-#ifdef __cplusplus
-extern "C"
-{
-#endif
- void http_tunnel_message_get_payload(const struct http_tunnel_message *tmsg,
- hstring *tunnel_payload)
- {
- if (unlikely(NULL == tmsg || tunnel_payload == NULL))
- {
- return;
- }
- tunnel_payload->iov_base = tmsg->tunnel_payload.iov_base;
- tunnel_payload->iov_len = tmsg->tunnel_payload.iov_len;
- }
-
- enum http_tunnel_message_type http_tunnel_message_type_get(const struct http_tunnel_message *tmsg)
- {
- return tmsg->type;
- }
-
-#ifdef __cplusplus
-}
-#endif
diff --git a/decoders/http/http_decoder_tunnel.h b/decoders/http/http_decoder_tunnel.h
deleted file mode 100644
index 52882e5..0000000
--- a/decoders/http/http_decoder_tunnel.h
+++ /dev/null
@@ -1,36 +0,0 @@
-#pragma once
-#include "http_decoder_private.h"
-#include "http_decoder_half.h"
-
-enum http_tunnel_state
-{
- HTTP_TUN_NON = 0, // init, or not tunnel session
- HTTP_TUN_C2S_HDR_START, // CONNECT ...
- HTTP_TUN_C2S_END, // CONNECT request end
- HTTP_TUN_S2C_START, // HTTP 200 connet established
- HTTP_TUN_INNER_STARTING, // http inner tunnel protocol starting
- HTTP_TUN_INNER_TRANS, // http inner tunnel protocol transmitting
-};
-
-/************************************************************
- * HTTP TUNNEL WITH CONNECT METHOD.
- *************************************************************/
-struct http_tunnel_message;
-#define HTTP_DECODER_TUNNEL_TOPIC "HTTP_DECODER_TUNNEL_MESSAGE"
-
-enum http_tunnel_message_type
-{
- HTTP_TUNNEL_OPENING,
- HTTP_TUNNEL_ACTIVE,
- HTTP_TUNNEL_CLOSING,
- HTTP_TUNNEL_MSG_MAX
-};
-enum http_tunnel_message_type http_tunnel_message_type_get(const struct http_tunnel_message *tmsg);
-void http_tunnel_message_get_payload(const struct http_tunnel_message *tmsg, struct iovec *tunnel_payload);
-
-int httpd_tunnel_identify(struct http_decoder_env *httpd_env, int curdir, struct http_decoder_half_data *hfdata);
-int httpd_is_tunnel_session(const struct http_decoder_env *httpd_env, const struct http_decoder_exdata *ex_data);
-int httpd_in_tunnel_transmitting(const struct http_decoder_env *httpd_env, struct http_decoder_exdata *ex_data);
-void httpd_tunnel_state_update(struct http_decoder_exdata *ex_data);
-void http_decoder_push_tunnel_data(struct session *sess, const struct http_decoder_exdata *exdata, enum http_tunnel_message_type type, const char *payload, uint16_t payload_len);
-enum http_tunnel_message_type httpd_tunnel_state_to_msg(const struct http_decoder_exdata *ex_data); \ No newline at end of file
diff --git a/decoders/http/http_decoder_utils.c b/decoders/http/http_decoder_utils.c
index 08c66b0..6c20ce7 100644
--- a/decoders/http/http_decoder_utils.c
+++ b/decoders/http/http_decoder_utils.c
@@ -1,311 +1,367 @@
#include <string.h>
#include <assert.h>
+#include <arpa/inet.h>
#include "stellar/http.h"
-#include "http_decoder_private.h"
+#include "stellar/utils.h"
+#include "http_decoder_utils.h"
+#include "http_decoder.h"
+#include "llhttp.h"
-char *http_safe_dup(const char *str, size_t len)
+#ifdef __cplusplus
+extern "C"
{
- if (str == NULL || len == 0)
- {
- return NULL;
- }
- char *dup = CALLOC(char, len + 1);
- memcpy(dup, str, len);
- return dup;
-}
+#endif
-int http_strncasecmp_safe(const char *fix_s1, const char *dyn_s2, size_t fix_n1, size_t dyn_n2)
-{
- if (fix_s1 == NULL || dyn_s2 == NULL)
+ char *http_safe_dup(const char *str, size_t len)
{
- return -1;
+ if (str == NULL || len == 0)
+ {
+ return NULL;
+ }
+ char *dup = CALLOC(char, len + 1);
+ memcpy(dup, str, len);
+ return dup;
}
- if (fix_n1 != dyn_n2)
+
+ int http_strncasecmp_safe(const char *fix_s1, const char *dyn_s2, size_t fix_n1, size_t dyn_n2)
{
- return -1;
+ if (fix_s1 == NULL || dyn_s2 == NULL)
+ {
+ return -1;
+ }
+ if (fix_n1 != dyn_n2)
+ {
+ return -1;
+ }
+ return strncasecmp(fix_s1, dyn_s2, fix_n1);
}
- return strncasecmp(fix_s1, dyn_s2, fix_n1);
-}
-const char *http_message_type_to_string(enum http_message_type type)
-{
- const char *sname = "unknown_msg_type";
-
- switch (type)
+ const char *http_topic_type_to_string(enum http_topic_type type)
{
- case HTTP_MESSAGE_REQ_LINE:
- sname = "HTTP_MESSAGE_REQ_LINE";
- break;
- case HTTP_MESSAGE_REQ_HEADER:
- sname = "HTTP_MESSAGE_REQ_HEADER";
- break;
- case HTTP_MESSAGE_REQ_HEADER_END:
- sname = "HTTP_MESSAGE_REQ_HEADER_END";
- break;
- case HTTP_MESSAGE_REQ_BODY_START:
- sname = "HTTP_MESSAGE_REQ_BODY_START";
- break;
- case HTTP_MESSAGE_REQ_BODY:
- sname = "HTTP_MESSAGE_REQ_BODY";
- break;
- case HTTP_MESSAGE_REQ_BODY_END:
- sname = "HTTP_MESSAGE_REQ_BODY_END";
- break;
- case HTTP_MESSAGE_RES_LINE:
- sname = "HTTP_MESSAGE_RES_LINE";
- break;
- case HTTP_MESSAGE_RES_HEADER:
- sname = "HTTP_MESSAGE_RES_HEADER";
- break;
- case HTTP_MESSAGE_RES_HEADER_END:
- sname = "HTTP_MESSAGE_RES_HEADER_END";
- break;
- case HTTP_MESSAGE_RES_BODY_START:
- sname = "HTTP_MESSAGE_RES_BODY_START";
- break;
- case HTTP_MESSAGE_RES_BODY:
- sname = "HTTP_MESSAGE_RES_BODY";
- break;
- case HTTP_MESSAGE_RES_BODY_END:
- sname = "HTTP_MESSAGE_RES_BODY_END";
- break;
- case HTTP_TRANSACTION_START:
- sname = "HTTP_TRANSACTION_START";
- break;
- case HTTP_TRANSACTION_END:
- sname = "HTTP_TRANSACTION_END";
- break;
+ const char *sname = "unknown_topic";
- default:
- break;
+ switch (type)
+ {
+ case HTTP_TOPIC_REQ_HEADER:
+ sname = "HTTP_TOPIC_REQ_HEADER";
+ break;
+ case HTTP_TOPIC_REQ_BODY:
+ sname = "HTTP_TOPIC_REQ_BODY";
+ break;
+ case HTTP_TOPIC_RES_HEADER:
+ sname = "HTTP_TOPIC_RES_HEADER";
+ break;
+ case HTTP_TOPIC_RES_BODY:
+ sname = "HTTP_TOPIC_RES_BODY";
+ break;
+ default:
+ break;
+ }
+ return sname;
}
- return sname;
-}
-
-int http_message_type_is_req(struct session *sess, enum http_message_type msg_type)
-{
- int is_req_msg = 0;
-
- switch (msg_type)
- {
- case HTTP_MESSAGE_REQ_LINE:
- case HTTP_MESSAGE_REQ_HEADER:
- case HTTP_MESSAGE_REQ_HEADER_END:
- case HTTP_MESSAGE_REQ_BODY_START:
- case HTTP_MESSAGE_REQ_BODY:
- case HTTP_MESSAGE_REQ_BODY_END:
- is_req_msg = 1;
- break;
+ static const unsigned char __g_httpd_hextable[] = {
+ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 0, 0, 0, 0, 0, /* 0x30 - 0x3f */
+ 0, 10, 11, 12, 13, 14, 15, 0, 0, 0, 0, 0, 0, 0, 0, 0, /* 0x40 - 0x4f */
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, /* 0x50 - 0x5f */
+ 0, 10, 11, 12, 13, 14, 15 /* 0x60 - 0x66 */
+ };
- case HTTP_MESSAGE_RES_LINE:
- case HTTP_MESSAGE_RES_HEADER:
- case HTTP_MESSAGE_RES_HEADER_END:
- case HTTP_MESSAGE_RES_BODY_START:
- case HTTP_MESSAGE_RES_BODY:
- case HTTP_MESSAGE_RES_BODY_END:
- is_req_msg = 0;
- break;
+/* the input is a single hex digit */
+#define onehex2dec(x) __g_httpd_hextable[x - '0']
- case HTTP_TRANSACTION_START:
- case HTTP_TRANSACTION_END:
+#include <ctype.h>
+ // https://github.com/curl/curl/blob/2e930c333658725657b94a923d175c6622e0f41d/lib/urlapi.c
+ // void httpd_url_decode(const char *string, size_t length, char *ostring, size_t *olen)
+ size_t http_url_decode(const char *string, size_t length, char *ostring, size_t olen)
{
- enum flow_type cur_dir = session_get_flow_type(sess);
- if (FLOW_TYPE_C2S == cur_dir)
+ char *ns = ostring;
+ if (NULL == string || NULL == ostring || 0 == olen)
{
- is_req_msg = 1;
+ return 0;
}
- else
+ size_t alloc = length;
+ while (alloc)
{
- is_req_msg = 0;
+ unsigned char in = (unsigned char)*string;
+ if (('%' == in) && (alloc > 2) &&
+ isxdigit(string[1]) && isxdigit(string[2]))
+ {
+ /* this is two hexadecimal digits following a '%' */
+ in = (unsigned char)(onehex2dec(string[1]) << 4) | onehex2dec(string[2]);
+ string += 3;
+ alloc -= 3;
+ }
+ else
+ {
+ string++;
+ alloc--;
+ }
+ *ns++ = (char)in;
+ // if ((size_t)(ns - ostring) >= olen - 1)
+ // {
+ // return 1;
+ // }
}
+ return ns - ostring;
}
- break;
-
- default:
- assert(0);
- fprintf(stderr, "unknow message type:%d\n", (int)msg_type);
- break;
- }
- return is_req_msg;
-}
-int http_event_is_req(enum http_event event)
-{
- switch (event)
+ int httpd_url_is_encoded(const char *url, size_t len)
{
- case HTTP_EVENT_REQ_INIT:
- case HTTP_EVENT_REQ_LINE:
- case HTTP_EVENT_REQ_HDR:
- case HTTP_EVENT_REQ_HDR_END:
- case HTTP_EVENT_REQ_BODY_BEGIN:
- case HTTP_EVENT_REQ_BODY_DATA:
- case HTTP_EVENT_REQ_BODY_END:
- case HTTP_EVENT_REQ_END:
- return 1;
- break;
-
- case HTTP_EVENT_RES_INIT:
- case HTTP_EVENT_RES_LINE:
- case HTTP_EVENT_RES_HDR:
- case HTTP_EVENT_RES_HDR_END:
- case HTTP_EVENT_RES_BODY_BEGIN:
- case HTTP_EVENT_RES_BODY_DATA:
- case HTTP_EVENT_RES_BODY_END:
- case HTTP_EVENT_RES_END:
+ for (size_t i = 0; i < len; i++)
+ {
+ if (url[i] == '%')
+ {
+ return 1;
+ }
+ }
return 0;
- break;
-
- default:
- assert(0);
- fprintf(stderr, "unknow event type:%d\n", (int)event);
- break;
}
- return -1;
-}
-int stellar_session_mq_get_topic_id_reliable(struct stellar *st, const char *topic_name,
- stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
-{
- int topic_id = stellar_mq_get_topic_id(st, topic_name);
- if (topic_id < 0)
+ static void httpd_set_tcp_addr(const struct tcphdr *tcph, struct http_session_addr *addr, enum flow_type fdir)
{
- topic_id = stellar_mq_create_topic(st, topic_name, msg_free_cb, msg_free_arg);
+ if (FLOW_TYPE_C2S == fdir)
+ {
+ addr->sport = tcph->th_sport;
+ addr->dport = tcph->th_dport;
+ }
+ else
+ {
+ addr->sport = tcph->th_dport;
+ addr->dport = tcph->th_sport;
+ }
}
- return topic_id;
-}
-
-static const unsigned char __g_httpd_hextable[] = {
- 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 0, 0, 0, 0, 0, /* 0x30 - 0x3f */
- 0, 10, 11, 12, 13, 14, 15, 0, 0, 0, 0, 0, 0, 0, 0, 0, /* 0x40 - 0x4f */
- 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, /* 0x50 - 0x5f */
- 0, 10, 11, 12, 13, 14, 15 /* 0x60 - 0x66 */
-};
-
-/* the input is a single hex digit */
-#define onehex2dec(x) __g_httpd_hextable[x - '0']
-
-#include <ctype.h>
-// https://github.com/curl/curl/blob/2e930c333658725657b94a923d175c6622e0f41d/lib/urlapi.c
-// void httpd_url_decode(const char *string, size_t length, char *ostring, size_t *olen)
-size_t http_url_decode(const char *string, size_t length, char *ostring, size_t olen)
-{
- char *ns = ostring;
- if (NULL == string || NULL == ostring || 0 == olen)
+ static void httpd_set_ipv4_addr(const struct ip *ip4h, struct http_session_addr *addr, enum flow_type fdir)
{
- return 0;
+ addr->ipver = 4;
+ if (FLOW_TYPE_C2S == fdir)
+ {
+ addr->saddr4 = ip4h->ip_src.s_addr;
+ addr->daddr4 = ip4h->ip_dst.s_addr;
+ }
+ else
+ {
+ addr->saddr4 = ip4h->ip_dst.s_addr;
+ addr->daddr4 = ip4h->ip_src.s_addr;
+ }
}
- size_t alloc = length;
- while (alloc)
+ static void httpd_set_ipv6_addr(const struct ip6_hdr *ip6h, struct http_session_addr *addr, enum flow_type fdir)
{
- unsigned char in = (unsigned char)*string;
- if (('%' == in) && (alloc > 2) &&
- isxdigit(string[1]) && isxdigit(string[2]))
+ addr->ipver = 6;
+ if (FLOW_TYPE_C2S == fdir)
{
- /* this is two hexadecimal digits following a '%' */
- in = (unsigned char)(onehex2dec(string[1]) << 4) | onehex2dec(string[2]);
- string += 3;
- alloc -= 3;
+ memcpy(&addr->saddr6, &ip6h->ip6_src, sizeof(struct in6_addr));
+ memcpy(&addr->daddr6, &ip6h->ip6_dst, sizeof(struct in6_addr));
}
else
{
- string++;
- alloc--;
+ memcpy(&addr->saddr6, &ip6h->ip6_dst, sizeof(struct in6_addr));
+ memcpy(&addr->daddr6, &ip6h->ip6_src, sizeof(struct in6_addr));
}
- *ns++ = (char)in;
- // if ((size_t)(ns - ostring) >= olen - 1)
- // {
- // return 1;
- // }
}
- return ns - ostring;
-}
-int httpd_url_is_encoded(const char *url, size_t len)
-{
- for (size_t i = 0; i < len; i++)
+ void httpd_session_get_addr(const struct session *sess, struct http_session_addr *addr)
{
- if (url[i] == '%')
+ if (sess == NULL || addr == NULL)
+ {
+ return;
+ }
+ enum flow_type fdir = session_get_flow_type(sess);
+ const struct packet *raw_pkt = session_get_first_packet(sess, fdir);
+ if (NULL == raw_pkt)
{
- return 1;
+ addr->ipver = 0;
+ return;
+ }
+
+ int count = packet_get_layer_count(raw_pkt);
+ for (int i = count - 1; i >= 0; i--)
+ {
+ const struct layer *layer = packet_get_layer_by_idx(raw_pkt, i);
+ if (layer->proto == LAYER_PROTO_TCP)
+ {
+ httpd_set_tcp_addr(layer->hdr.tcp, addr, fdir);
+ }
+ else if (layer->proto == LAYER_PROTO_IPV4)
+ {
+ httpd_set_ipv4_addr(layer->hdr.ip4, addr, fdir);
+ break;
+ }
+ else if (layer->proto == LAYER_PROTO_IPV6)
+ {
+ httpd_set_ipv6_addr(layer->hdr.ip6, addr, fdir);
+ break;
+ }
}
}
- return 0;
-}
-static void httpd_set_tcp_addr(const struct tcphdr *tcph, struct httpd_session_addr *addr, enum flow_type fdir)
-{
- if (FLOW_TYPE_C2S == fdir)
+ void http_session_addr_ntop(const struct http_session_addr *sesaddr, char *buf, size_t buflen)
{
- addr->sport = tcph->th_sport;
- addr->dport = tcph->th_dport;
+ char sip_str[INET6_ADDRSTRLEN] = {0};
+ char dip_str[INET6_ADDRSTRLEN] = {0};
+ uint16_t sport_host, dport_host;
+ if (sesaddr->ipver == 4)
+ {
+ inet_ntop(AF_INET, &sesaddr->saddr4, sip_str, sizeof(sip_str));
+ inet_ntop(AF_INET, &sesaddr->daddr4, dip_str, sizeof(dip_str));
+ }
+ else if (sesaddr->ipver == 6)
+ {
+ inet_ntop(AF_INET6, &sesaddr->saddr6, sip_str, sizeof(sip_str));
+ inet_ntop(AF_INET6, &sesaddr->daddr6, dip_str, sizeof(dip_str));
+ }
+ sport_host = ntohs(sesaddr->sport);
+ dport_host = ntohs(sesaddr->dport);
+ snprintf(buf, buflen, "%s:%u-%s:%u", sip_str, sport_host, dip_str, dport_host);
}
- else
+
+ struct http_buffer *http_buffer_new(void)
{
- addr->sport = tcph->th_dport;
- addr->dport = tcph->th_sport;
+ struct http_buffer *buffer = CALLOC(struct http_buffer, 1);
+ buffer->buffer = NULL;
+ buffer->buffer_size = 0;
+ buffer->reference = 1;
+ return buffer;
}
-}
-static void httpd_set_ipv4_addr(const struct ip *ip4h, struct httpd_session_addr *addr, enum flow_type fdir)
-{
- addr->ipver = 4;
- if (FLOW_TYPE_C2S == fdir)
+
+ void http_buffer_free(struct http_buffer *buffer)
{
- addr->saddr4 = ip4h->ip_src.s_addr;
- addr->daddr4 = ip4h->ip_dst.s_addr;
+ if (NULL == buffer)
+ {
+ return;
+ }
+ if (buffer->reference > 1)
+ {
+ buffer->reference--;
+ return;
+ }
+ if (buffer->buffer)
+ {
+ FREE(buffer->buffer);
+ }
+ FREE(buffer);
}
- else
+
+ int http_buffer_add(struct http_buffer *buffer, const char *data, size_t data_len)
{
- addr->saddr4 = ip4h->ip_dst.s_addr;
- addr->daddr4 = ip4h->ip_src.s_addr;
+ if (NULL == buffer || NULL == data || 0 == data_len)
+ {
+ return -1;
+ }
+ buffer->buffer = REALLOC(char, buffer->buffer, buffer->buffer_size + data_len);
+ memcpy(buffer->buffer + buffer->buffer_size, data, data_len);
+ buffer->buffer_size += data_len;
+ return 0;
}
-}
-static void httpd_set_ipv6_addr(const struct ip6_hdr *ip6h, struct httpd_session_addr *addr, enum flow_type fdir)
-{
- addr->ipver = 6;
- if (FLOW_TYPE_C2S == fdir)
+
+ int http_buffer_read(struct http_buffer *buffer, char **data, size_t *data_len)
{
- memcpy(&addr->saddr6, &ip6h->ip6_src, sizeof(struct in6_addr));
- memcpy(&addr->daddr6, &ip6h->ip6_dst, sizeof(struct in6_addr));
+ if (NULL == buffer)
+ {
+ if (data)
+ {
+ *data = NULL;
+ }
+ if (data_len)
+ {
+ *data_len = 0;
+ }
+ return -1;
+ }
+ *data = buffer->buffer;
+ *data_len = buffer->buffer_size;
+ return 0;
}
- else
+
+ char *http_string_dup(const char *str, size_t len)
{
- memcpy(&addr->saddr6, &ip6h->ip6_dst, sizeof(struct in6_addr));
- memcpy(&addr->daddr6, &ip6h->ip6_src, sizeof(struct in6_addr));
+ if (NULL == str || 0 == len)
+ {
+ return NULL;
+ }
+ char *new_str = ALLOC(char, len);
+ memcpy(new_str, str, len);
+ return new_str;
}
-}
-void httpd_session_get_addr(const struct session *sess, struct httpd_session_addr *addr)
-{
- if (sess == NULL || addr == NULL)
+ long long http_strtoll(const char *str, size_t strlen)
{
- return;
+ if (NULL == str || 0 == strlen || strlen >= 19 /* INT64_MAX */)
+ {
+ return 0;
+ }
+ char tmp_str[strlen + 1];
+ memcpy(tmp_str, str, strlen);
+ tmp_str[strlen] = '\0';
+ return strtoll(tmp_str, NULL, 10);
}
- enum flow_type fdir = session_get_flow_type(sess);
- const struct packet *raw_pkt = session_get_first_packet(sess, fdir);
- if (NULL == raw_pkt)
+
+ /*
+ * return value:
+ * EOF offset of beggining.
+ */
+ size_t http_line_header_completed(const char *data, size_t data_len)
{
- addr->ipver = 0;
- return;
+ if (data_len < 4) //"\r\n\r\n"
+ {
+ return 0;
+ }
+ void *ptr = memmem(data, data_len, "\r\n\r\n", 4);
+ if (ptr != NULL)
+ {
+ return (char *)ptr - data + 4;
+ }
+ return 0;
}
- int count = packet_get_layer_count(raw_pkt);
- for (int i = count - 1; i >= 0; i--)
+ int http_protocol_identify(const char *data, size_t data_len)
{
- const struct layer *layer = packet_get_layer_by_idx(raw_pkt, i);
- if (layer->proto == LAYER_PROTO_TCP)
+ llhttp_t parser;
+ llhttp_settings_t settings;
+ enum llhttp_errno error;
+
+ if (NULL == data || 0 == data_len)
{
- httpd_set_tcp_addr(layer->hdr.tcp, addr, fdir);
+ return -1;
}
- else if (layer->proto == LAYER_PROTO_IPV4)
+ llhttp_settings_init(&settings);
+ llhttp_init(&parser, HTTP_BOTH, &settings);
+
+ data_len = MIN(HTTP_IDENTIFY_LEN, data_len);
+ error = llhttp_execute(&parser, data, data_len);
+ if (error != HPE_OK)
{
- httpd_set_ipv4_addr(layer->hdr.ip4, addr, fdir);
- break;
+ return -1;
}
- else if (layer->proto == LAYER_PROTO_IPV6)
+ return 1;
+ }
+
+ void http_truncate_extract_headers(const char *raw_data, size_t raw_data_len, const char **headers_start, const char **headers_end)
+ {
+ const char *start = memmem(raw_data, raw_data_len, "\r\n", 2);
+ if (start != NULL)
{
- httpd_set_ipv6_addr(layer->hdr.ip6, addr, fdir);
- break;
+ start += 2;
+ *headers_start = start;
}
+ else
+ {
+ *headers_start = NULL;
+ }
+ const char *end = memmem(raw_data, raw_data_len, "\r\n\r\n", 4);
+ if (end != NULL)
+ {
+ end += 4;
+ *headers_end = end;
+ }
+ else
+ {
+ *headers_start = NULL;
+ *headers_end = NULL;
+ }
+ return;
}
-} \ No newline at end of file
+
+#ifdef __cplusplus
+}
+#endif \ No newline at end of file
diff --git a/decoders/http/http_decoder_utils.h b/decoders/http/http_decoder_utils.h
index 01a32b2..c370228 100644
--- a/decoders/http/http_decoder_utils.h
+++ b/decoders/http/http_decoder_utils.h
@@ -10,31 +10,27 @@ extern "C"
#include "stellar/stellar.h"
#include "stellar/utils.h"
#include "stellar/session.h"
-#include "stellar/stellar_mq.h"
-#include "stellar/stellar_exdata.h"
-#ifdef __cplusplus
-}
+#include "llhttp.h"
+
+#ifndef UNUSED
+#define UNUSED __attribute__((unused))
#endif
-char *http_safe_dup(const char *str, size_t len);
-int http_strncasecmp_safe(const char *fix_s1, const char *dyn_s2, size_t fix_n1, size_t dyn_n2);
-const char *http_message_type_to_string(enum http_message_type type);
-int http_message_type_is_req(struct session *sess, enum http_message_type msg_type);
-int http_event_is_req(enum http_event event);
-int stellar_session_mq_get_topic_id_reliable(struct stellar *st, const char *topic_name, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg);
-void httpd_url_decode(const char *string, size_t length, char *ostring, size_t *olen);
-int httpd_url_is_encoded(const char *url, size_t len);
-/******************************************************************************
- * Logger
- ******************************************************************************/
+ char *http_safe_dup(const char *str, size_t len);
+ int http_strncasecmp_safe(const char *fix_s1, const char *dyn_s2, size_t fix_n1, size_t dyn_n2);
+ void httpd_url_decode(const char *string, size_t length, char *ostring, size_t *olen);
+ int httpd_url_is_encoded(const char *url, size_t len);
+ /******************************************************************************
+ * Logger
+ ******************************************************************************/
-enum http_decoder_log_level
-{
- DEBUG = 0x11,
- WARN = 0x12,
- INFO = 0x13,
- ERROR = 0x14,
-};
+ enum http_decoder_log_level
+ {
+ DEBUG = 0x11,
+ WARN = 0x12,
+ INFO = 0x13,
+ ERROR = 0x14,
+ };
#ifndef http_decoder_log
#define http_decoder_log(level, format, ...) \
@@ -63,17 +59,45 @@ enum http_decoder_log_level
#include <netinet/in.h>
-struct httpd_session_addr
-{
- uint8_t ipver; /* 4 or 6 */
- uint16_t sport; /* network order */
- uint16_t dport; /* network order */
- union
+ struct http_session_addr
+ {
+ uint8_t ipver; /* 4 or 6 */
+ uint16_t sport; /* network order */
+ uint16_t dport; /* network order */
+ union
+ {
+ uint32_t saddr4;
+ struct in6_addr saddr6;
+ };
+ union
+ {
+ uint32_t daddr4;
+ struct in6_addr daddr6;
+ };
+ };
+ /*
+ why not use libevent evbuffer?
+ 1. evbuffer is a buffer chain, it is not suitable for http half flow cache;
+ 2. http_half_flow_buffer is a whole continuous buffer;
+ */
+ struct http_buffer
{
- uint32_t saddr4;
- uint32_t daddr4;
- struct in6_addr saddr6;
- struct in6_addr daddr6;
+ int reference; // used by other
+ char *buffer;
+ size_t buffer_size;
};
-};
-void httpd_session_get_addr(const struct session *sess, struct httpd_session_addr *addr);
+ int http_protocol_identify(const char *data, size_t data_len);
+ void httpd_session_get_addr(const struct session *sess, struct http_session_addr *addr);
+ void http_session_addr_ntop(const struct http_session_addr *sesaddr, char *buf, size_t buflen);
+ struct http_buffer *http_buffer_new(void);
+ void http_buffer_free(struct http_buffer *buffer);
+ int http_buffer_add(struct http_buffer *buffer, const char *data, size_t data_len);
+ int http_buffer_read(struct http_buffer *buffer, char **data, size_t *data_len);
+ char *http_string_dup(const char *str, size_t len);
+ long long http_strtoll(const char *str, size_t strlen);
+ size_t http_line_header_completed(const char *data, size_t data_len);
+ void http_truncate_extract_headers(const char *raw_data, size_t raw_data_len, const char **headers_start, const char **headers_end);
+
+#ifdef __cplusplus
+}
+#endif \ No newline at end of file
diff --git a/decoders/http/version.map b/decoders/http/version.map
index a64b729..c20d021 100644
--- a/decoders/http/version.map
+++ b/decoders/http/version.map
@@ -1,11 +1,12 @@
VERS_3.0{
global:
extern "C" {
- http_message_*;
- http_decoder_init;
- http_decoder_exit;
- http_decoder_tcp_stream_msg_cb;
+ http_init;
+ http_exit;
+ http_subscribe_*;
+ http_get0_*;
http_url_decode;
+ http_module_to_http;
};
local: *;
}; \ No newline at end of file