summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryangwei <[email protected]>2024-08-15 18:54:01 +0800
committeryangwei <[email protected]>2024-08-16 10:48:40 +0800
commitb6969710d69cce3ca9aeb710b49f66192d0286ca (patch)
treeb9e3c1bc98f227933d5ae98516f9e322055c4d72
parent08208cf0a5e90caf1dc649ef42bd4b259931f574 (diff)
✨ feat(decoder http): integrate http decoder codes
-rw-r--r--decoders/http/CMakeLists.txt14
-rw-r--r--decoders/http/http_content_decompress.cpp254
-rw-r--r--decoders/http/http_content_decompress.h52
-rw-r--r--decoders/http/http_decoder.cpp1209
-rw-r--r--decoders/http/http_decoder_half.cpp1216
-rw-r--r--decoders/http/http_decoder_half.h105
-rw-r--r--decoders/http/http_decoder_inc.h162
-rw-r--r--decoders/http/http_decoder_result_queue.cpp148
-rw-r--r--decoders/http/http_decoder_result_queue.h61
-rw-r--r--decoders/http/http_decoder_stat.cpp121
-rw-r--r--decoders/http/http_decoder_stat.h57
-rw-r--r--decoders/http/http_decoder_string.cpp260
-rw-r--r--decoders/http/http_decoder_string.h74
-rw-r--r--decoders/http/http_decoder_table.cpp534
-rw-r--r--decoders/http/http_decoder_table.h80
-rw-r--r--decoders/http/http_decoder_tunnel.cpp107
-rw-r--r--decoders/http/http_decoder_tunnel.h35
-rw-r--r--decoders/http/http_decoder_utils.cpp309
-rw-r--r--decoders/http/http_decoder_utils.h80
-rw-r--r--decoders/http/version.map11
20 files changed, 4889 insertions, 0 deletions
diff --git a/decoders/http/CMakeLists.txt b/decoders/http/CMakeLists.txt
new file mode 100644
index 0000000..7e49755
--- /dev/null
+++ b/decoders/http/CMakeLists.txt
@@ -0,0 +1,14 @@
+add_definitions(-fPIC)
+
+set(HTTP_SRC ${DEPS_SRC} http_decoder.cpp http_decoder_utils.cpp http_decoder_half.cpp
+ http_decoder_table.cpp http_decoder_string.cpp http_content_decompress.cpp
+ http_decoder_result_queue.cpp http_decoder_stat.cpp http_decoder_tunnel.cpp)
+
+add_library(http_decoder STATIC ${HTTP_SRC})
+set_target_properties(http_decoder PROPERTIES LINK_FLAGS "-Wl,--version-script=${PROJECT_SOURCE_DIR}/src/version.map")
+target_include_directories(http_decoder PUBLIC ${CMAKE_SOURCE_DIR}/deps/)
+target_link_libraries(http_decoder z llhttp-static fieldstat4)
+target_link_libraries(http_decoder brotli-dec-static brotli-common-static )
+set_target_properties(http_decoder PROPERTIES PREFIX "")
+
+#install(TARGETS http_decoder LIBRARY DESTINATION ${CMAKE_INSTALL_PREFIX}/plugin/${lib_name} COMPONENT LIBRARIES) \ No newline at end of file
diff --git a/decoders/http/http_content_decompress.cpp b/decoders/http/http_content_decompress.cpp
new file mode 100644
index 0000000..3b71a40
--- /dev/null
+++ b/decoders/http/http_content_decompress.cpp
@@ -0,0 +1,254 @@
+/*
+**********************************************************************************************
+* File: http_content_decompress.c
+* Description:
+* Authors: LuWenPeng <[email protected]>
+* Date: 2022-10-31
+* Copyright: (c) Since 2022 Geedge Networks, Ltd. All rights reserved.
+***********************************************************************************************
+*/
+#include <zlib.h>
+#include <string.h>
+#include <assert.h>
+#include <brotli/decode.h>
+#include "http_decoder_inc.h"
+
+#define HTTP_DECOMPRESS_BUFFER_SIZE (4096)
+
+struct http_content_decompress {
+ enum http_content_encoding encoding;
+ z_stream *z_stream_ptr;
+ BrotliDecoderState *br_state;
+ char *buffer;
+ size_t buffer_size;
+};
+
+enum http_content_encoding
+http_content_encoding_str2int(const char *content_encoding)
+{
+ if (strcasestr(content_encoding, "gzip") != NULL) {
+ return HTTP_CONTENT_ENCODING_GZIP;
+ }
+ if (strcasestr(content_encoding, "deflate") != NULL) {
+ return HTTP_CONTENT_ENCODING_DEFLATE;
+ }
+ if (strcasestr(content_encoding, "br") != NULL) {
+ return HTTP_CONTENT_ENCODING_BR;
+ }
+ return HTTP_CONTENT_ENCODING_NONE;
+}
+
+const char *
+http_content_encoding_int2str(enum http_content_encoding content_encoding)
+{
+ if (content_encoding == HTTP_CONTENT_ENCODING_GZIP) {
+ return "gzip";
+ }
+ if (content_encoding == HTTP_CONTENT_ENCODING_DEFLATE) {
+ return "deflate";
+ }
+ if (content_encoding == HTTP_CONTENT_ENCODING_BR) {
+ return "br";
+ }
+ return "unknown";
+}
+
+struct http_content_decompress *
+http_content_decompress_create(enum http_content_encoding encoding)
+{
+ struct http_content_decompress *decompress =
+ CALLOC(struct http_content_decompress, 1);
+ assert(decompress);
+
+ decompress->encoding = encoding;
+ decompress->z_stream_ptr = NULL;
+ decompress->br_state = NULL;
+
+ if (encoding == HTTP_CONTENT_ENCODING_GZIP
+ || encoding == HTTP_CONTENT_ENCODING_DEFLATE) {
+ decompress->z_stream_ptr = CALLOC(z_stream, 1);
+ assert(decompress->z_stream_ptr);
+
+ decompress->z_stream_ptr->zalloc = NULL;
+ decompress->z_stream_ptr->zfree = NULL;
+ decompress->z_stream_ptr->opaque = NULL;
+ decompress->z_stream_ptr->avail_in = 0;
+ decompress->z_stream_ptr->next_in = Z_NULL;
+
+ if (encoding == HTTP_CONTENT_ENCODING_GZIP) {
+ if (inflateInit2(decompress->z_stream_ptr, MAX_WBITS + 16)
+ != Z_OK) {
+ goto error;
+ }
+ }
+ if (encoding == HTTP_CONTENT_ENCODING_DEFLATE) {
+ if (inflateInit2(decompress->z_stream_ptr, -MAX_WBITS) != Z_OK) {
+ goto error;
+ }
+ }
+ }
+
+ if (encoding == HTTP_CONTENT_ENCODING_BR) {
+ decompress->br_state = BrotliDecoderCreateInstance(NULL, NULL, NULL);
+ if (decompress->br_state == NULL) {
+ goto error;
+ }
+ }
+ return decompress;
+
+error:
+ http_content_decompress_destroy(decompress);
+ return NULL;
+}
+
+void http_content_decompress_destroy(struct http_content_decompress *decompress)
+{
+ if (NULL == decompress) {
+ return;
+ }
+ if (decompress->z_stream_ptr != NULL) {
+ inflateEnd(decompress->z_stream_ptr);
+ FREE(decompress->z_stream_ptr);
+ }
+ if (decompress->br_state) {
+ BrotliDecoderDestroyInstance(decompress->br_state);
+ decompress->br_state = NULL;
+ }
+ FREE(decompress->buffer);
+ FREE(decompress);
+}
+
+static int
+http_content_decompress_write_zlib(struct http_content_decompress *decompress,
+ const char *indata, size_t indata_len,
+ char **outdata, size_t *outdata_len)
+{
+ z_stream *z_stream_ptr = decompress->z_stream_ptr;
+ z_stream_ptr->avail_in = (unsigned int)indata_len;
+ z_stream_ptr->next_in = (unsigned char *)indata;
+ z_stream_ptr->avail_out = (unsigned int)decompress->buffer_size;
+ z_stream_ptr->next_out = (unsigned char *)decompress->buffer;
+
+ *outdata = NULL;
+ *outdata_len = 0;
+
+ do {
+ int ret = inflate(z_stream_ptr, Z_NO_FLUSH);
+ if (ret == Z_STREAM_ERROR || ret == Z_NEED_DICT
+ || ret == Z_DATA_ERROR || ret == Z_MEM_ERROR) {
+ (void)inflateEnd(z_stream_ptr);
+ return -1;
+ }
+
+ size_t have = decompress->buffer_size - z_stream_ptr->avail_out;
+ if (have > 0) {
+ if (0 == z_stream_ptr->avail_out) {
+ fprintf(stderr, "realloc outbuffer,before: %zu bytes, after :%zu B\n", decompress->buffer_size , decompress->buffer_size + have); ;
+ decompress->buffer_size += have;
+ decompress->buffer = REALLOC(char, decompress->buffer,
+ decompress->buffer_size);
+ *outdata = decompress->buffer;
+ *outdata_len = *outdata_len + have;
+ // http_decoder_log(DEBUG, "%s realloc outbuffer %zu bytes",
+ // http_content_encoding_int2str(decompress->encoding),
+ // decompress->buffer_size);
+ z_stream_ptr->avail_out = have;
+ z_stream_ptr->next_out = (unsigned char *)decompress->buffer +
+ (decompress->buffer_size - have);
+ } else {
+ *outdata = decompress->buffer;
+ *outdata_len = have;
+ }
+ }
+ if(Z_STREAM_END == ret){
+ break;
+ }
+ } while (z_stream_ptr->avail_in != 0);
+ decompress->buffer = NULL;
+ decompress->buffer_size = 0;
+ return 0;
+}
+
+static int
+http_content_decompress_write_br(struct http_content_decompress *decompress,
+ const char *indata, size_t indata_len,
+ char **outdata, size_t *outdata_len)
+{
+ size_t available_in = indata_len;
+ const unsigned char *next_in = (const unsigned char *)indata;
+ size_t available_out = decompress->buffer_size;
+ unsigned char *next_out = (unsigned char *)decompress->buffer;
+
+ *outdata = NULL;
+ *outdata_len = 0;
+
+ for (;;) {
+ int ret = BrotliDecoderDecompressStream(decompress->br_state, &available_in,
+ &next_in, &available_out, &next_out, 0);
+ size_t have = decompress->buffer_size - available_out;
+ if (have > 0) {
+ if (0 == available_out) {
+ decompress->buffer_size += have;
+ decompress->buffer = REALLOC(char, decompress->buffer,
+ decompress->buffer_size);
+ *outdata = decompress->buffer;
+ *outdata_len = *outdata_len + have;
+ available_out = have;
+ next_out = (unsigned char *)decompress->buffer +
+ (decompress->buffer_size - have);
+ } else {
+ *outdata = decompress->buffer;
+ *outdata_len = have;
+ }
+ }
+
+ if (ret == BROTLI_DECODER_RESULT_SUCCESS ||
+ ret == BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT) {
+ decompress->buffer =NULL;
+ decompress->buffer_size = 0;
+ return 0;
+ }
+
+ if (ret == BROTLI_DECODER_RESULT_ERROR) {
+ //BrotliDecoderErrorCode errcode =
+ BrotliDecoderGetErrorCode(decompress->br_state);
+ *outdata = NULL;
+ *outdata_len = 0;
+ return -1;
+ }
+
+ assert(ret == BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT);
+ }
+}
+
+int http_content_decompress_write(struct http_content_decompress *decompress,
+ const char *indata, size_t indata_len,
+ char **outdata, size_t *outdata_len)
+{
+ assert(decompress);
+ assert(indata);
+ assert(indata_len > 0);
+ assert(outdata);
+ assert(outdata_len);
+
+ *outdata = NULL;
+ *outdata_len = 0;
+
+ if(NULL == decompress->buffer ){
+ decompress->buffer = CALLOC(char, HTTP_DECOMPRESS_BUFFER_SIZE);
+ assert(decompress->buffer);
+ decompress->buffer_size = HTTP_DECOMPRESS_BUFFER_SIZE;
+ }
+
+ if (decompress->encoding == HTTP_CONTENT_ENCODING_GZIP ||
+ decompress->encoding == HTTP_CONTENT_ENCODING_DEFLATE) {
+ return http_content_decompress_write_zlib(decompress, indata, indata_len,
+ outdata, outdata_len);
+ }
+ if (decompress->encoding == HTTP_CONTENT_ENCODING_BR) {
+ return http_content_decompress_write_br(decompress, indata, indata_len,
+ outdata, outdata_len);
+ }
+ assert(0);
+ return -1;
+} \ No newline at end of file
diff --git a/decoders/http/http_content_decompress.h b/decoders/http/http_content_decompress.h
new file mode 100644
index 0000000..3c2ba48
--- /dev/null
+++ b/decoders/http/http_content_decompress.h
@@ -0,0 +1,52 @@
+/*
+**********************************************************************************************
+* File: http_content_decompress.h
+* Description:
+* Authors: LuWenPeng <[email protected]>
+* Date: 2022-10-31
+* Copyright: (c) Since 2022 Geedge Networks, Ltd. All rights reserved.
+***********************************************************************************************
+*/
+
+
+#ifndef _HTTP_CONTENT_DECOMPRESS_H_
+#define _HTTP_CONTENT_DECOMPRESS_H_
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+#include <stddef.h>
+
+enum http_content_encoding {
+ HTTP_CONTENT_ENCODING_NONE = 0,
+ HTTP_CONTENT_ENCODING_GZIP = 1 << 1,
+ HTTP_CONTENT_ENCODING_DEFLATE = 1 << 2,
+ HTTP_CONTENT_ENCODING_BR = 1 << 3,
+};
+
+struct http_content_decompress;
+
+enum http_content_encoding
+http_content_encoding_str2int(const char *content_encoding);
+
+const char *
+http_content_encoding_int2str(enum http_content_encoding content_encoding);
+
+struct http_content_decompress *
+http_content_decompress_create(enum http_content_encoding encoding);
+
+void http_content_decompress_destroy(struct http_content_decompress *decompress);
+
+// return 0 : success
+// return -1 : error
+int http_content_decompress_write(struct http_content_decompress *decompress,
+ const char *indata, size_t indata_len,
+ char **outdata, size_t *outdata_len);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif \ No newline at end of file
diff --git a/decoders/http/http_decoder.cpp b/decoders/http/http_decoder.cpp
new file mode 100644
index 0000000..218c508
--- /dev/null
+++ b/decoders/http/http_decoder.cpp
@@ -0,0 +1,1209 @@
+#include "stellar/http.h"
+#include <assert.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include "http_decoder_inc.h"
+
+#pragma GCC diagnostic ignored "-Wunused-parameter"
+
+struct http_message *http_message_new(enum http_message_type type, struct http_decoder_result_queue *queue,
+ int queue_index, uint8_t flow_type)
+{
+ struct http_message *msg = CALLOC(struct http_message, 1);
+ msg->type = type;
+ msg->ref_queue = queue;
+ msg->queue_index = queue_index;
+ msg->flow_type = flow_type;
+ return msg;
+}
+
+struct http_message *http_body_message_new(enum http_message_type type, struct http_decoder_result_queue *queue,
+ int queue_index, uint8_t flow_type, hstring *raw_payload, hstring *decompress_payload)
+{
+ struct http_message *msg = CALLOC(struct http_message, 1);
+ msg->type = type;
+ msg->ref_queue = queue;
+ msg->queue_index = queue_index;
+ msg->flow_type = flow_type;
+ if (raw_payload)
+ {
+ msg->raw_payload.iov_base = raw_payload->iov_base;
+ msg->raw_payload.iov_len = raw_payload->iov_len;
+ }
+ if (decompress_payload)
+ {
+ msg->decompress_payload.iov_base = decompress_payload->iov_base;
+ msg->decompress_payload.iov_len = decompress_payload->iov_len;
+ }
+ return msg;
+}
+
+static void http_message_decompress_buffer_free(struct http_message *msg)
+{
+ struct http_decoder_half_data *ref_data = NULL;
+ if (HTTP_MESSAGE_REQ_BODY_START == msg->type || HTTP_MESSAGE_REQ_BODY == msg->type || HTTP_MESSAGE_REQ_BODY_END == msg->type)
+ {
+ ref_data = msg->ref_queue->array[msg->queue_index].req_data;
+ }
+ else if (HTTP_MESSAGE_RES_BODY_START == msg->type || HTTP_MESSAGE_RES_BODY == msg->type || HTTP_MESSAGE_RES_BODY_END == msg->type)
+ {
+ ref_data = msg->ref_queue->array[msg->queue_index].res_data;
+ }
+ if (ref_data != NULL && msg->decompress_payload.iov_base != NULL)
+ {
+ http_half_decompress_buffer_free(ref_data, &msg->decompress_payload);
+ }
+}
+
+static void http_message_free(void *http_msg, void *cb_arg)
+{
+ if (http_msg)
+ {
+ http_message_decompress_buffer_free((struct http_message *)http_msg);
+ FREE(http_msg);
+ }
+}
+
+static void http_event_handler(enum http_event event, struct http_decoder_half_data **data,
+ struct http_event_context *ev_ctx, void *httpd_plugin_env)
+{
+ assert(ev_ctx);
+ assert(httpd_plugin_env);
+ struct http_decoder_env *httpd_env = (struct http_decoder_env *)httpd_plugin_env;
+ size_t queue_idx = 0;
+ nmx_pool_t *mempool = ev_ctx->ref_mempool;
+ struct http_decoder_result_queue *queue = ev_ctx->ref_queue;
+ struct http_message *msg = NULL;
+ struct http_decoder_half_data *half_data = NULL;
+ int ret = 0;
+ u_int8_t flow_flag = 0;
+ struct http_decoder_exdata *exdata = ev_ctx->ref_httpd_ctx;
+
+ int thread_id = stellar_get_current_thread_index();
+
+ if (http_event_is_req(event))
+ {
+ queue_idx = http_decoder_result_queue_req_index(queue);
+ half_data = http_decoder_result_queue_peek_req(queue);
+ }
+ else
+ {
+ queue_idx = http_decoder_result_queue_res_index(queue);
+ half_data = http_decoder_result_queue_peek_res(queue);
+ }
+
+ switch (event)
+ {
+ case HTTP_EVENT_REQ_INIT:
+ half_data = http_decoder_result_queue_peek_req(queue);
+ if (half_data != NULL)
+ {
+ http_decoder_result_queue_inc_req_index(queue);
+ }
+
+ half_data = http_decoder_result_queue_peek_req(queue);
+ if (half_data != NULL)
+ {
+ half_data = http_decoder_result_queue_pop_req(queue);
+ http_decoder_half_data_free(mempool, half_data);
+ half_data = NULL;
+ }
+
+ half_data = http_decoder_half_data_new(mempool);
+ ret = http_decoder_result_queue_push_req(queue, half_data);
+ if (ret < 0)
+ {
+ fprintf(stderr, "http_decoder_result_queue_push req failed.");
+ http_decoder_half_data_free(mempool, half_data);
+ half_data = NULL;
+ }
+ *data = half_data;
+ queue_idx = http_decoder_result_queue_req_index(queue); // get the index after inc
+ /* llhttp always call on_message_begin() even if llhttp_execute() error!!! */
+ msg = http_message_new(HTTP_TRANSACTION_START, queue, queue_idx, HTTP_REQUEST);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_NEW, 1);
+ break;
+ case HTTP_EVENT_REQ_LINE:
+ msg = http_message_new(HTTP_MESSAGE_REQ_LINE, queue, queue_idx, HTTP_REQUEST);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
+ if (httpd_tunnel_identify(httpd_env, FLOW_DIRECTION_C2S, half_data))
+ {
+ exdata->tunnel_state = HTTP_TUN_C2S_HDR_START;
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TUNNEL, 1);
+ }
+ if (httpd_is_tunnel_session(httpd_env, exdata))
+ {
+ http_decoder_get_url(half_data, mempool);
+ }
+ break;
+ case HTTP_EVENT_REQ_HDR:
+ msg = http_message_new(HTTP_MESSAGE_REQ_HEADER, queue, queue_idx, HTTP_REQUEST);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
+ break;
+ case HTTP_EVENT_REQ_HDR_END:
+ {
+ http_decoder_join_url_finally(ev_ctx, half_data, mempool);
+ /* maybe some parsed headers in buffer, but has not pushed to plugins yet */
+
+ if (http_decoder_half_data_has_parsed_header(half_data))
+ {
+ msg = http_message_new(HTTP_MESSAGE_REQ_HEADER, queue, queue_idx, HTTP_REQUEST);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
+ }
+ http_half_data_update_commit_index(half_data);
+ msg = http_message_new(HTTP_MESSAGE_REQ_HEADER_END, queue, queue_idx, HTTP_REQUEST);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
+
+ int tot_c2s_headers = http_half_data_get_total_parsed_header_count(half_data);
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_HEADERS_C2S, tot_c2s_headers);
+
+ hstring tmp_url = {};
+ http_half_data_get_url(half_data, &tmp_url);
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_URL_BYTES, tmp_url.iov_len);
+ }
+ break;
+ case HTTP_EVENT_REQ_BODY_BEGIN:
+ msg = http_message_new(HTTP_MESSAGE_REQ_BODY_START, queue, queue_idx, HTTP_REQUEST);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
+ break;
+ case HTTP_EVENT_REQ_BODY_DATA:
+ {
+ hstring raw_body = {};
+ hstring decompress_body = {};
+ http_decoder_half_data_get_raw_body(half_data, &raw_body);
+ http_half_get_lastest_decompress_buffer(half_data, &decompress_body);
+ msg = http_body_message_new(HTTP_MESSAGE_REQ_BODY, queue, queue_idx, HTTP_REQUEST, &raw_body, &decompress_body);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
+ if(decompress_body.iov_base != NULL){
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_ZIP_BYTES, raw_body.iov_len);
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_UNZIP_BYTES, decompress_body.iov_len);
+ }
+ }
+ break;
+ case HTTP_EVENT_REQ_BODY_END:
+ msg = http_message_new(HTTP_MESSAGE_REQ_BODY_END, queue, queue_idx, HTTP_REQUEST);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
+ break;
+ case HTTP_EVENT_REQ_END:
+ {
+ session_is_symmetric(ev_ctx->ref_session, &flow_flag);
+
+ if (SESSION_SEEN_C2S_FLOW == flow_flag)
+ {
+ msg = http_message_new(HTTP_TRANSACTION_END, queue, queue_idx, HTTP_REQUEST);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_FREE, 1);
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_ASYMMETRY_TRANSACTION_C2S, 1);
+ }
+ if (httpd_is_tunnel_session(httpd_env, exdata))
+ {
+ if (SESSION_SEEN_C2S_FLOW == flow_flag)
+ {
+ exdata->tunnel_state = HTTP_TUN_INNER_STARTING;
+ exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].sub_topic_id;
+ }
+ else
+ {
+ exdata->tunnel_state = HTTP_TUN_C2S_END;
+ }
+ }
+ http_half_update_state(half_data, event);
+ http_decoder_result_queue_inc_req_index(queue);
+ half_data = http_decoder_result_queue_pop_req(queue);
+ if (half_data != NULL)
+ {
+ http_decoder_half_data_free(mempool, half_data);
+ half_data = NULL;
+ }
+ }
+ break;
+
+ case HTTP_EVENT_RES_INIT:
+ half_data = http_decoder_result_queue_peek_res(queue);
+ if (half_data != NULL)
+ {
+ http_decoder_result_queue_inc_res_index(queue);
+ }
+
+ half_data = http_decoder_result_queue_peek_res(queue);
+ if (half_data != NULL)
+ {
+ half_data = http_decoder_result_queue_pop_res(queue);
+ http_decoder_half_data_free(mempool, half_data);
+ half_data = NULL;
+ }
+
+ half_data = http_decoder_half_data_new(mempool);
+ ret = http_decoder_result_queue_push_res(queue, half_data);
+ if (ret < 0)
+ {
+ fprintf(stderr, "http_decoder_result_queue_push res failed.");
+ http_decoder_half_data_free(mempool, half_data);
+ half_data = NULL;
+ }
+ queue_idx = http_decoder_result_queue_res_index(queue); // get the index after inc
+ *data = half_data;
+ if (0 == session_is_symmetric(ev_ctx->ref_session, &flow_flag))
+ {
+ if (SESSION_SEEN_S2C_FLOW == flow_flag)
+ {
+ msg = http_message_new(HTTP_TRANSACTION_START, queue, queue_idx, HTTP_RESPONSE);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
+ }
+ }
+ break;
+ case HTTP_EVENT_RES_LINE:
+ msg = http_message_new(HTTP_MESSAGE_RES_LINE, queue, queue_idx, HTTP_RESPONSE);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
+ if (httpd_tunnel_identify(httpd_env, FLOW_DIRECTION_S2C, half_data))
+ {
+ exdata->tunnel_state = HTTP_TUN_S2C_START;
+ }
+ else
+ {
+ // connect response fail, reset tunnel_state
+ exdata->tunnel_state = HTTP_TUN_NON;
+ }
+ break;
+ case HTTP_EVENT_RES_HDR:
+ msg = http_message_new(HTTP_MESSAGE_RES_HEADER, queue, queue_idx, HTTP_RESPONSE);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
+ break;
+ case HTTP_EVENT_RES_HDR_END:
+ {
+ /* maybe some header in table buffer but has not pushed to plugins */
+ half_data = http_decoder_result_queue_peek_res(queue);
+ if (http_decoder_half_data_has_parsed_header(half_data))
+ {
+ msg = http_message_new(HTTP_MESSAGE_RES_HEADER, queue, queue_idx, HTTP_RESPONSE);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
+ }
+ http_half_data_update_commit_index(half_data);
+ msg = http_message_new(HTTP_MESSAGE_RES_HEADER_END, queue, queue_idx, HTTP_RESPONSE);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
+
+ int tot_s2c_headers = http_half_data_get_total_parsed_header_count(half_data);
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_HEADERS_S2C, tot_s2c_headers);
+
+ if (httpd_is_tunnel_session(httpd_env, exdata))
+ {
+ exdata->tunnel_state = HTTP_TUN_INNER_STARTING;
+ http_half_pre_context_free(ev_ctx->ref_session, exdata);
+ exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].sub_topic_id;
+ }
+ }
+ break;
+ case HTTP_EVENT_RES_BODY_BEGIN:
+ msg = http_message_new(HTTP_MESSAGE_RES_BODY_START, queue, queue_idx, HTTP_RESPONSE);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
+ break;
+ case HTTP_EVENT_RES_BODY_DATA:
+ {
+ hstring raw_body = {};
+ http_decoder_half_data_get_raw_body(half_data, &raw_body);
+ hstring decompress_body = {};
+ http_half_get_lastest_decompress_buffer(half_data, &decompress_body);
+ msg = http_body_message_new(HTTP_MESSAGE_RES_BODY, queue, queue_idx, HTTP_RESPONSE, &raw_body, &decompress_body);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
+ if(decompress_body.iov_base != NULL){
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_ZIP_BYTES, raw_body.iov_len);
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_UNZIP_BYTES, decompress_body.iov_len);
+ }
+ }
+ break;
+ case HTTP_EVENT_RES_BODY_END:
+ msg = http_message_new(HTTP_MESSAGE_RES_BODY_END, queue, queue_idx, HTTP_RESPONSE);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
+ break;
+ case HTTP_EVENT_RES_END:
+ msg = http_message_new(HTTP_TRANSACTION_END, queue, queue_idx, HTTP_RESPONSE);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_FREE, 1);
+ session_is_symmetric(ev_ctx->ref_session, &flow_flag);
+ if (SESSION_SEEN_S2C_FLOW == flow_flag)
+ {
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_ASYMMETRY_TRANSACTION_S2C, 1);
+ }
+
+ http_half_update_state(half_data, event);
+ http_decoder_result_queue_inc_res_index(queue);
+ half_data = http_decoder_result_queue_pop_res(queue);
+ if (half_data != NULL)
+ {
+ http_decoder_half_data_free(mempool, half_data);
+ half_data = NULL;
+ }
+ break;
+ default:
+ assert(0);
+ break;
+ }
+ if (half_data)
+ {
+ http_half_update_state(half_data, event);
+ }
+}
+
+static struct http_decoder *http_decoder_new(struct http_decoder_exdata *hd_ctx, nmx_pool_t *mempool,
+ http_event_cb *ev_cb, int decompress_switch, struct http_decoder_env *httpd_env,
+ long long req_start_seq, long long res_start_seq)
+{
+ struct http_decoder *decoder = MEMPOOL_CALLOC(mempool, struct http_decoder, 1);
+ assert(decoder);
+ decoder->c2s_half = http_decoder_half_new(hd_ctx, mempool, ev_cb, HTTP_REQUEST, decompress_switch, httpd_env, req_start_seq);
+ decoder->s2c_half = http_decoder_half_new(hd_ctx, mempool, ev_cb, HTTP_RESPONSE, decompress_switch, httpd_env, res_start_seq);
+ return decoder;
+}
+
+static void http_decoder_free(nmx_pool_t *mempool, struct http_decoder *decoder)
+{
+ if (NULL == decoder)
+ {
+ return;
+ }
+ if (decoder->c2s_half != NULL)
+ {
+ http_decoder_half_free(mempool, decoder->c2s_half);
+ decoder->c2s_half = NULL;
+ }
+ if (decoder->s2c_half != NULL)
+ {
+ http_decoder_half_free(mempool, decoder->s2c_half);
+ decoder->s2c_half = NULL;
+ }
+ MEMPOOL_FREE(mempool, decoder);
+}
+
+static struct http_decoder_exdata *http_decoder_exdata_new(size_t mempool_size, size_t queue_size,
+ int decompress_switch, struct http_decoder_env *httpd_env,
+ long long req_start_seq, long long res_start_seq)
+{
+ struct http_decoder_exdata *hd_ctx = CALLOC(struct http_decoder_exdata, 1);
+ hd_ctx->mempool = nmx_create_pool(mempool_size);
+ hd_ctx->decoder = http_decoder_new(hd_ctx, hd_ctx->mempool, http_event_handler, decompress_switch,
+ httpd_env, req_start_seq, res_start_seq);
+ hd_ctx->queue = http_decoder_result_queue_new(hd_ctx->mempool, queue_size);
+ return hd_ctx;
+}
+
+static void http_decoder_exdata_free(struct http_decoder_exdata *ex_data)
+{
+ if (unlikely(NULL == ex_data))
+ {
+ return;
+ }
+ if (ex_data->decoder != NULL)
+ {
+ http_decoder_free(ex_data->mempool, ex_data->decoder);
+ ex_data->decoder = NULL;
+ }
+ if (ex_data->queue != NULL)
+ {
+ http_decoder_result_queue_free(ex_data->mempool, ex_data->queue);
+ ex_data->queue = NULL;
+ }
+ if (ex_data->mempool)
+ {
+ nmx_destroy_pool(ex_data->mempool);
+ }
+ FREE(ex_data);
+}
+
+static int http_protocol_identify(const char *data, size_t data_len)
+{
+ llhttp_t parser;
+ llhttp_settings_t settings;
+ enum llhttp_errno error;
+
+ llhttp_settings_init(&settings);
+ llhttp_init(&parser, HTTP_BOTH, &settings);
+
+ error = llhttp_execute(&parser, data, data_len);
+ if (error != HPE_OK)
+ {
+ return -1;
+ }
+ return 1;
+}
+
+static void _http_decoder_context_free(struct http_decoder_env *env)
+{
+ if (NULL == env)
+ {
+ return;
+ }
+
+ http_decoder_stat_free(&env->hd_stat);
+
+ for (int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++)
+ {
+ if (env->topic_exdata_compose[i].msg_free_cb)
+ {
+ stellar_mq_destroy_topic(env->st, env->topic_exdata_compose[i].sub_topic_id);
+ }
+ }
+
+ FREE(env);
+}
+
+static int load_http_decoder_config(const char *cfg_path,
+ struct http_decoder_config *hd_cfg)
+{
+ FILE *fp = fopen(cfg_path, "r");
+ if (NULL == fp)
+ {
+ fprintf(stderr, "[%s:%d]Can't open config file:%s",
+ __FUNCTION__, __LINE__, cfg_path);
+ return -1;
+ }
+
+ int ret = 0;
+ char errbuf[256] = {0};
+
+ toml_table_t *root = toml_parse_file(fp, errbuf, sizeof(errbuf));
+ fclose(fp);
+
+ toml_table_t *basic_sec_tbl = toml_table_in(root, "basic");
+ if (NULL == basic_sec_tbl)
+ {
+ fprintf(stderr, "[%s:%d]config file:%s has no key: [basic]",
+ __FUNCTION__, __LINE__, cfg_path);
+ toml_free(root);
+ return -1;
+ }
+
+ toml_datum_t int_val = toml_int_in(basic_sec_tbl, "decompress");
+ if (int_val.ok != 0)
+ {
+ hd_cfg->decompress_switch = int_val.u.b;
+ }
+
+ int_val = toml_int_in(basic_sec_tbl, "mempool_size");
+ if (int_val.ok != 0)
+ {
+ hd_cfg->mempool_size = int_val.u.i;
+ }
+ else
+ {
+ hd_cfg->mempool_size = DEFAULT_MEMPOOL_SIZE;
+ }
+
+ int_val = toml_int_in(basic_sec_tbl, "result_queue_len");
+ if (int_val.ok != 0)
+ {
+ hd_cfg->result_queue_len = int_val.u.i;
+ }
+ else
+ {
+ hd_cfg->result_queue_len = HD_RESULT_QUEUE_LEN;
+ }
+
+ int_val = toml_int_in(basic_sec_tbl, "stat_interval_pkts");
+ if (int_val.ok != 0)
+ {
+ hd_cfg->stat_interval_pkts = int_val.u.i;
+ }
+ else
+ {
+ hd_cfg->stat_interval_pkts = DEFAULT_STAT_INTERVAL_PKTS;
+ }
+
+ int_val = toml_int_in(basic_sec_tbl, "stat_output_interval");
+ if (int_val.ok != 0)
+ {
+ hd_cfg->stat_output_interval = int_val.u.i;
+ }
+ else
+ {
+ hd_cfg->stat_output_interval = DEFAULT_STAT_OUTPUT_INTERVAL;
+ }
+
+ int_val = toml_int_in(basic_sec_tbl, "proxy_enable");
+ if (int_val.ok != 0)
+ {
+ hd_cfg->proxy_enable = int_val.u.i;
+ }
+ else
+ {
+ hd_cfg->proxy_enable = 0;
+ }
+
+ toml_free(root);
+ return ret;
+}
+
+static int http_msg_get_request_header(const struct http_message *msg, const hstring *key,
+ struct http_header *hdr_result)
+{
+ const struct http_decoder_half_data *req_data =
+ msg->ref_queue->array[msg->queue_index].req_data;
+ return http_decoder_half_data_get_header(req_data, key, hdr_result);
+}
+
+static int http_msg_get_response_header(const struct http_message *msg, const hstring *key,
+ struct http_header *hdr_result)
+{
+ const struct http_decoder_half_data *res_data =
+ msg->ref_queue->array[msg->queue_index].res_data;
+ return http_decoder_half_data_get_header(res_data, key, hdr_result);
+}
+
+static int http_msg_request_header_next(const struct http_message *msg,
+ struct http_header *hdr)
+{
+ const struct http_decoder_half_data *req_data =
+ msg->ref_queue->array[msg->queue_index].req_data;
+ return http_decoder_half_data_iter_header((struct http_decoder_half_data *)req_data, hdr);
+}
+
+static int http_msg_response_header_next(const struct http_message *msg, struct http_header *hdr)
+{
+ const struct http_decoder_half_data *res_data =
+ msg->ref_queue->array[msg->queue_index].res_data;
+ return http_decoder_half_data_iter_header((struct http_decoder_half_data *)res_data, hdr);
+}
+
+#if 0
+static int http_msg_get_request_raw_body(const struct http_message *msg, hstring *body)
+{
+ const struct http_decoder_half_data *req_data =
+ msg->ref_queue->array[msg->queue_index].req_data;
+ return http_decoder_half_data_get_raw_body(req_data, body);
+}
+
+static int http_msg_get_response_raw_body(const struct http_message *msg, hstring *body)
+{
+ const struct http_decoder_half_data *res_data =
+ msg->ref_queue->array[msg->queue_index].res_data;
+ return http_decoder_half_data_get_raw_body(res_data, body);
+}
+
+static int http_msg_get_request_decompress_body(const struct http_message *msg, hstring *body)
+{
+ const struct http_decoder_half_data *req_data =
+ msg->ref_queue->array[msg->queue_index].req_data;
+ return http_decoder_half_data_get_decompress_body(req_data, body);
+}
+
+static int http_msg_get_response_decompress_body(const struct http_message *msg, hstring *body)
+{
+ const struct http_decoder_half_data *res_data =
+ msg->ref_queue->array[msg->queue_index].res_data;
+ return http_decoder_half_data_get_decompress_body(res_data, body);
+}
+#endif
+
+static struct http_decoder_exdata *httpd_session_exdata_new(struct session *sess, struct http_decoder_env *httpd_env,
+ long long req_start_seq, long long res_start_seq)
+{
+ struct http_decoder_exdata *exdata = http_decoder_exdata_new(httpd_env->hd_cfg.mempool_size,
+ httpd_env->hd_cfg.result_queue_len,
+ httpd_env->hd_cfg.decompress_switch,
+ httpd_env, req_start_seq, res_start_seq);
+ // exdata->sub_topic_id = sub_topic_id;
+ int thread_id = stellar_get_current_thread_index();
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_SESSION_NEW, 1);
+ return exdata;
+}
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+ void httpd_ex_data_free_cb(int idx, void *ex_data, void *arg)
+ {
+ if (NULL == ex_data)
+ {
+ return;
+ }
+ struct http_decoder_exdata *exdata = (struct http_decoder_exdata *)ex_data;
+ http_decoder_exdata_free(exdata);
+ }
+
+ void *httpd_session_ctx_new_cb(struct session *sess, void *plugin_env)
+ {
+ return (void *)HTTP_CTX_IS_HTTP;
+ }
+
+ void httpd_session_ctx_free_cb(struct session *sess, void *session_ctx, void *plugin_env)
+ {
+ if (NULL == plugin_env || NULL == session_ctx)
+ {
+ return;
+ }
+ if (strncmp((const char *)session_ctx, HTTP_CTX_NOT_HTTP, strlen(HTTP_CTX_NOT_HTTP)) == 0)
+ {
+ return;
+ }
+ struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env;
+ int thread_id = stellar_get_current_thread_index();
+ unsigned char flow_flag = 0;
+ session_is_symmetric(sess, &flow_flag);
+
+ if (SESSION_SEEN_C2S_FLOW == flow_flag)
+ {
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_ASYMMETRY_SESSION_C2S, 1);
+ }
+ else if (SESSION_SEEN_S2C_FLOW == flow_flag)
+ {
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_ASYMMETRY_SESSION_S2C, 1);
+ }
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_SESSION_FREE, 1);
+ }
+
+ static void http_decoder_execute(struct session *sess, struct http_decoder_env *httpd_env, http_decoder_exdata *exdata, const char *payload, uint16_t payload_len)
+ {
+ if (httpd_in_tunnel_transmitting(httpd_env, exdata))
+ {
+ http_decoder_push_tunnel_data(sess, exdata, httpd_tunnel_state_to_msg(exdata), payload, payload_len);
+ httpd_tunnel_state_update(exdata);
+ return;
+ }
+
+ int thread_id = stellar_get_current_thread_index();
+ struct http_decoder_half *cur_half = NULL;
+ enum flow_direction sess_dir = session_get_current_flow_direction(sess);
+ if (FLOW_DIRECTION_C2S == sess_dir)
+ {
+ cur_half = exdata->decoder->c2s_half;
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_BYTES_C2S, payload_len);
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TCP_SEG_C2S, 1);
+ }
+ else
+ {
+ cur_half = exdata->decoder->s2c_half;
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_BYTES_S2C, payload_len);
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TCP_SEG_S2C, 1);
+ }
+
+ http_decoder_half_reinit(cur_half, exdata->queue, exdata->mempool, sess);
+ int ret = http_decoder_half_parse(httpd_env->hd_cfg.proxy_enable, cur_half, payload, payload_len);
+ if (ret < 0)
+ {
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_PARSE_ERR, 1);
+ stellar_session_plugin_dettach_current_session(sess);
+ }
+ }
+
+ void http_decoder_tunnel_msg_cb(struct session *sess, int topic_id, const void *tmsg, void *per_session_ctx, void *plugin_env)
+ {
+ struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env;
+ if (0 == httpd_env->hd_cfg.proxy_enable)
+ {
+ return;
+ }
+ hstring tunnel_payload;
+ http_tunnel_message_get_payload((const struct http_tunnel_message *)tmsg, &tunnel_payload);
+ uint16_t payload_len = tunnel_payload.iov_len;
+ const char *payload = (char *)tunnel_payload.iov_base;
+ if (NULL == payload || 0 == payload_len)
+ {
+ return;
+ }
+
+ struct http_decoder_exdata *exdata = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].exdata_id);
+ enum http_tunnel_message_type tmsg_type = http_tunnel_message_type_get((const struct http_tunnel_message *)tmsg);
+
+ switch (tmsg_type)
+ {
+ case HTTP_TUNNEL_OPENING:
+ {
+ if (NULL != exdata)
+ {
+ // not support nested http tunnel
+ session_mq_ignore_message(sess, topic_id, httpd_env->plugin_id);
+ return;
+ }
+ size_t http_identify_len = payload_len > HTTP_IDENTIFY_LEN ? HTTP_IDENTIFY_LEN : payload_len;
+ int is_http = http_protocol_identify(payload, http_identify_len);
+ if (is_http)
+ {
+ long long max_req_seq = 0, max_res_seq = 0;
+ struct http_decoder_exdata *tcp_stream_exdata = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_TCP_STREAM_INDEX].exdata_id);
+ http_half_get_max_transaction_seq(tcp_stream_exdata, &max_req_seq, &max_res_seq);
+ exdata = httpd_session_exdata_new(sess, httpd_env, max_req_seq, max_res_seq);
+ session_exdata_set(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].exdata_id, exdata);
+ exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_MSG_INDEX].sub_topic_id;
+ exdata->in_tunnel_is_http = 1;
+ }
+ else
+ {
+ // inner tunnel is not http, do nothing, do not push this message again !!!
+ session_mq_ignore_message(sess, topic_id, httpd_env->plugin_id);
+ return;
+ }
+ }
+ break;
+
+ case HTTP_TUNNEL_ACTIVE:
+ if (NULL == exdata)
+ {
+ session_mq_ignore_message(sess, topic_id, httpd_env->plugin_id);
+ http_decoder_stat_update(&httpd_env->hd_stat, stellar_get_current_thread_index(), HTTPD_STAT_PARSE_ERR, 1);
+ return;
+ }
+ break;
+
+ case HTTP_TUNNEL_CLOSING:
+ if (NULL == exdata)
+ {
+ http_decoder_stat_update(&httpd_env->hd_stat, stellar_get_current_thread_index(), HTTPD_STAT_PARSE_ERR, 1);
+ return;
+ }
+ if (exdata->in_tunnel_is_http)
+ {
+ http_half_pre_context_free(sess, exdata);
+ }
+ return;
+ break;
+
+ default:
+ break;
+ }
+ if (exdata->in_tunnel_is_http)
+ {
+ http_decoder_execute(sess, httpd_env, exdata, payload, payload_len);
+ }
+ return;
+ }
+
+ void http_decoder_tcp_stream_msg_cb(struct session *sess, int topic_id, const void *msg, void *nouse_session_ctx, void *plugin_env)
+ {
+ struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env;
+ struct http_decoder_exdata *exdata = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_TCP_STREAM_INDEX].exdata_id);
+ enum session_state sess_state = session_get_current_state(sess);
+ const char *payload = NULL;
+ uint16_t payload_len = 0;
+
+ if (SESSION_STATE_CLOSED == sess_state)
+ {
+ if (httpd_in_tunnel_transmitting(httpd_env, exdata))
+ {
+ http_decoder_push_tunnel_data(sess, exdata, HTTP_TUNNEL_CLOSING, NULL, 0);
+ }
+ else
+ {
+ http_half_pre_context_free(sess, exdata);
+ }
+ return;
+ }
+ assert(msg != NULL);
+
+ payload_len = tcp_segment_get_len((struct tcp_segment *)msg);
+ payload = tcp_segment_get_data((const struct tcp_segment *)msg);
+ if (unlikely(0 == payload_len || NULL == payload))
+ {
+ return;
+ }
+ if (NULL == exdata) // first packet
+ {
+ size_t http_identify_len = payload_len > HTTP_IDENTIFY_LEN ? HTTP_IDENTIFY_LEN : payload_len;
+ int ret = http_protocol_identify(payload, http_identify_len);
+ if (ret < 0)
+ {
+ stellar_session_plugin_dettach_current_session(sess);
+ return;
+ }
+ exdata = httpd_session_exdata_new(sess, httpd_env, 0, 0);
+ exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_MSG_INDEX].sub_topic_id;
+ session_exdata_set(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_TCP_STREAM_INDEX].exdata_id, exdata);
+ }
+ http_decoder_execute(sess, httpd_env, exdata, payload, payload_len);
+ return;
+ }
+
+ static const struct http_topic_exdata_compose g_topic_exdata_compose[HTTPD_TOPIC_INDEX_MAX] =
+ {
+ {HTTPD_TOPIC_TCP_STREAM_INDEX, TOPIC_TCP_STREAM, http_decoder_tcp_stream_msg_cb, NULL, "HTTP_DECODER_EXDATA_BASEON_TCP_STREAM", httpd_ex_data_free_cb, -1, -1},
+ {HTTPD_TOPIC_HTTP_MSG_INDEX, HTTP_DECODER_TOPIC, NULL, http_message_free, NULL, NULL, -1, -1},
+ {HTTPD_TOPIC_HTTP_TUNNEL_INDEX, HTTP_DECODER_TUNNEL_TOPIC, http_decoder_tunnel_msg_cb, http_message_free, "HTTP_DECODER_EXDATA_BASEON_HTTP_TUNNEL", httpd_ex_data_free_cb, -1, -1},
+ };
+
+ static void http_decoder_topic_exdata_compose_init(struct http_decoder_env *httpd_env)
+ {
+ memcpy(httpd_env->topic_exdata_compose, g_topic_exdata_compose, sizeof(g_topic_exdata_compose));
+ for (int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++)
+ {
+ httpd_env->topic_exdata_compose[i].sub_topic_id = stellar_session_mq_get_topic_id_reliable(httpd_env->st,
+ httpd_env->topic_exdata_compose[i].topic_name,
+ httpd_env->topic_exdata_compose[i].msg_free_cb,
+ NULL);
+ assert(httpd_env->topic_exdata_compose[i].sub_topic_id >= 0);
+
+ if (httpd_env->topic_exdata_compose[i].exdata_name)
+ {
+ httpd_env->topic_exdata_compose[i].exdata_id = stellar_exdata_new_index(httpd_env->st,
+ httpd_env->topic_exdata_compose[i].exdata_name,
+ httpd_env->topic_exdata_compose[i].exdata_free_cb,
+ NULL);
+ assert(httpd_env->topic_exdata_compose[i].exdata_id >= 0);
+ }
+
+ if (httpd_env->topic_exdata_compose[i].on_msg_cb)
+ {
+ stellar_session_mq_subscribe(httpd_env->st, httpd_env->topic_exdata_compose[i].sub_topic_id,
+ httpd_env->topic_exdata_compose[i].on_msg_cb, httpd_env->plugin_id);
+ }
+ }
+ }
+
+ int http_topic_exdata_compose_get_index(const struct http_decoder_env *httpd_env, int by_topic_id)
+ {
+ for (int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++)
+ {
+ if (httpd_env->topic_exdata_compose[i].sub_topic_id == by_topic_id)
+ {
+ return i;
+ }
+ }
+ assert(0);
+ return -1;
+ }
+
+ void *http_decoder_init(struct stellar *st)
+ {
+ int thread_num = 0;
+
+ struct http_decoder_env *httpd_env = CALLOC(struct http_decoder_env, 1);
+ int ret = load_http_decoder_config(HTTPD_CFG_FILE, &httpd_env->hd_cfg);
+ if (ret < 0)
+ {
+ goto failed;
+ }
+ httpd_env->st = st;
+ httpd_env->plugin_id = stellar_session_plugin_register(st, httpd_session_ctx_new_cb,
+ httpd_session_ctx_free_cb, (void *)httpd_env);
+ if (httpd_env->plugin_id < 0)
+ {
+ goto failed;
+ }
+ http_decoder_topic_exdata_compose_init(httpd_env);
+
+ thread_num = stellar_get_worker_thread_num(st);
+ assert(thread_num >= 1);
+ if (http_decoder_stat_init(&httpd_env->hd_stat, thread_num,
+ httpd_env->hd_cfg.stat_interval_pkts, httpd_env->hd_cfg.stat_output_interval) < 0)
+ {
+ goto failed;
+ }
+
+ return httpd_env;
+
+ failed:
+ fprintf(stderr, "http_decoder_init fail!\n");
+ _http_decoder_context_free(httpd_env);
+ return NULL;
+ }
+
+ void http_decoder_exit(void *plugin_env)
+ {
+ if (NULL == plugin_env)
+ {
+ return;
+ }
+ struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env;
+ _http_decoder_context_free(httpd_env);
+ }
+
+ enum http_message_type http_message_type_get(const struct http_message *msg)
+ {
+ if (unlikely(NULL == msg))
+ {
+ return HTTP_MESSAGE_MAX;
+ }
+ return msg->type;
+ }
+
+ void http_message_request_line_get0(const struct http_message *msg,
+ struct http_request_line *line)
+ {
+ if (unlikely(NULL == msg || msg->type != HTTP_MESSAGE_REQ_LINE))
+ {
+ if (line)
+ {
+ line->method.iov_base = NULL;
+ line->uri.iov_base = NULL;
+ line->version.iov_base = NULL;
+ }
+ return;
+ }
+ assert(msg->ref_queue);
+ assert(msg->queue_index < HD_RESULT_QUEUE_LEN);
+
+ struct http_decoder_half_data *req_data =
+ msg->ref_queue->array[msg->queue_index].req_data;
+
+ http_decoder_half_data_get_request_line(req_data, line);
+ }
+
+ void http_message_response_line_get0(const struct http_message *msg,
+ struct http_response_line *line)
+ {
+ if (unlikely(NULL == msg || msg->type != HTTP_MESSAGE_RES_LINE))
+ {
+ if (line)
+ {
+ line->version.iov_base = NULL;
+ line->status.iov_base = NULL;
+ }
+ return;
+ }
+ assert(msg->ref_queue);
+ assert(msg->queue_index < HD_RESULT_QUEUE_LEN);
+
+ struct http_decoder_half_data *res_data =
+ msg->ref_queue->array[msg->queue_index].res_data;
+
+ http_decoder_half_data_get_response_line(res_data, line);
+ }
+
+ void http_message_header_get0(const struct http_message *msg, const hstring *key,
+ struct http_header *hdr_result)
+ {
+ int ret = -1;
+ if (unlikely(NULL == msg || NULL == key))
+ {
+ goto fail;
+ }
+ assert(msg->ref_queue);
+ assert(msg->queue_index < HD_RESULT_QUEUE_LEN);
+ if (HTTP_MESSAGE_REQ_HEADER == msg->type)
+ {
+ ret = http_msg_get_request_header(msg, key, hdr_result);
+ }
+ else if (HTTP_MESSAGE_RES_HEADER == msg->type)
+ {
+ ret = http_msg_get_response_header(msg, key, hdr_result);
+ }
+ if (ret >= 0)
+ {
+ return;
+ }
+ fail:
+ if (hdr_result)
+ {
+ hdr_result->key.iov_base = NULL;
+ hdr_result->val.iov_base = NULL;
+ }
+ return;
+ }
+
+ int http_message_header_next(const struct http_message *msg, struct http_header *header)
+ {
+ int ret = 1;
+ if (unlikely(NULL == msg))
+ {
+ goto fail;
+ }
+ assert(msg->ref_queue);
+ assert(msg->queue_index < HD_RESULT_QUEUE_LEN);
+ if (HTTP_MESSAGE_REQ_HEADER == msg->type)
+ {
+ ret = http_msg_request_header_next(msg, header);
+ }
+ else if (HTTP_MESSAGE_RES_HEADER == msg->type)
+ {
+ ret = http_msg_response_header_next(msg, header);
+ }
+ if (ret < 0)
+ {
+ goto fail;
+ }
+ return 0;
+ fail:
+ if (header)
+ {
+ header->key.iov_base = NULL;
+ header->val.iov_base = NULL;
+ }
+ return -1;
+ }
+
+ int http_message_reset_header_iter(struct http_message *msg)
+ {
+ if (unlikely(NULL == msg))
+ {
+ return -1;
+ }
+ assert(msg->ref_queue);
+ assert(msg->queue_index < HD_RESULT_QUEUE_LEN);
+ if (HTTP_MESSAGE_REQ_HEADER == msg->type)
+ {
+ struct http_decoder_half_data *req_data =
+ msg->ref_queue->array[msg->queue_index].req_data;
+ return http_decoder_half_data_reset_header_iter(req_data);
+ }
+ else if (HTTP_MESSAGE_RES_HEADER == msg->type)
+ {
+ struct http_decoder_half_data *res_data =
+ msg->ref_queue->array[msg->queue_index].res_data;
+ return http_decoder_half_data_reset_header_iter(res_data);
+ }
+ return -1;
+ }
+
+ void http_message_raw_body_get0(const struct http_message *msg, hstring *body)
+ {
+ if (unlikely(NULL == msg))
+ {
+ goto fail;
+ }
+ assert(msg->ref_queue);
+ assert(msg->queue_index < HD_RESULT_QUEUE_LEN);
+ if (msg->raw_payload.iov_base != NULL && msg->raw_payload.iov_len != 0)
+ {
+ body->iov_base = msg->raw_payload.iov_base;
+ body->iov_len = msg->raw_payload.iov_len;
+ return;
+ }
+ fail:
+ if (body)
+ {
+ body->iov_base = NULL;
+ body->iov_len = 0;
+ }
+ return;
+ }
+
+ void http_message_decompress_body_get0(const struct http_message *msg, hstring *decompress_body)
+ {
+ enum http_content_encoding ecode = HTTP_CONTENT_ENCODING_NONE;
+ struct http_decoder_half_data *ref_data = NULL;
+ if (unlikely(NULL == msg))
+ {
+ goto fail;
+ }
+ assert(msg->ref_queue);
+ assert(msg->queue_index < HD_RESULT_QUEUE_LEN);
+ if (msg->decompress_payload.iov_base != NULL && msg->decompress_payload.iov_len != 0)
+ {
+ decompress_body->iov_base = msg->decompress_payload.iov_base;
+ decompress_body->iov_len = msg->decompress_payload.iov_len;
+ return;
+ }
+ /**
+ * @brief If the body hasn't been compressed, same as http_message_raw_body_get0().
+ *
+ */
+
+ if (HTTP_MESSAGE_REQ_BODY_START == msg->type
+ || HTTP_MESSAGE_REQ_BODY == msg->type
+ || HTTP_MESSAGE_REQ_BODY_END == msg->type)
+ {
+ ref_data = msg->ref_queue->array[msg->queue_index].req_data;
+ }
+ else if (HTTP_MESSAGE_RES_BODY_START == msg->type
+ || HTTP_MESSAGE_RES_BODY == msg->type
+ || HTTP_MESSAGE_RES_BODY_END == msg->type)
+ {
+ ref_data = msg->ref_queue->array[msg->queue_index].res_data;
+ }
+ ecode = http_half_data_get_content_encoding(ref_data);
+ if(ref_data != NULL && HTTP_CONTENT_ENCODING_NONE != ecode){
+ goto fail;
+ }
+
+ if (msg->raw_payload.iov_base != NULL && msg->raw_payload.iov_len != 0)
+ {
+ decompress_body->iov_base = msg->raw_payload.iov_base;
+ decompress_body->iov_len = msg->raw_payload.iov_len;
+ }
+ return;
+ fail:
+ if (decompress_body)
+ {
+ decompress_body->iov_base = NULL;
+ decompress_body->iov_len = 0;
+ }
+ return;
+ }
+
+ void http_message_raw_url_get0(const struct http_message *msg, hstring *url)
+ {
+ if (unlikely(NULL == msg))
+ {
+ if (url)
+ {
+ url->iov_base = NULL;
+ url->iov_len = 0;
+ }
+ return;
+ }
+ assert(msg->ref_queue);
+ assert(msg->queue_index < HD_RESULT_QUEUE_LEN);
+
+ struct http_decoder_half_data *req_data =
+ msg->ref_queue->array[msg->queue_index].req_data;
+
+ if (http_half_data_get_url(req_data, url) < 0)
+ {
+ goto fail;
+ }
+ return;
+
+ fail:
+ if (url)
+ {
+ url->iov_base = NULL;
+ url->iov_len = 0;
+ }
+ return;
+ }
+
+ void http_message_decoded_url_get0(const struct http_message *msg, struct iovec *url)
+ {
+ if (unlikely(NULL == msg))
+ {
+ if (url)
+ {
+ url->iov_base = NULL;
+ url->iov_len = 0;
+ }
+ return;
+ }
+ assert(msg->ref_queue);
+ assert(msg->queue_index < HD_RESULT_QUEUE_LEN);
+
+ struct http_decoder_half_data *req_data =
+ msg->ref_queue->array[msg->queue_index].req_data;
+
+ if (http_half_data_get_decode_url(req_data, url) < 0)
+ {
+ goto fail;
+ }
+ return;
+
+ fail:
+ if (url)
+ {
+ url->iov_base = NULL;
+ url->iov_len = 0;
+ }
+ return;
+ }
+
+ int http_message_get_transaction_seq(const struct http_message *msg)
+ {
+ if (unlikely(NULL == msg))
+ {
+ return -1;
+ }
+ assert(msg->ref_queue);
+ assert(msg->queue_index < HD_RESULT_QUEUE_LEN);
+ struct http_decoder_half_data *hf_data = NULL;
+ if (HTTP_REQUEST == msg->flow_type)
+ {
+ hf_data = msg->ref_queue->array[msg->queue_index].req_data;
+ }
+ else
+ {
+ hf_data = msg->ref_queue->array[msg->queue_index].res_data;
+ }
+ return http_half_data_get_transaction_seq(hf_data);
+ }
+#ifdef __cplusplus
+}
+#endif \ No newline at end of file
diff --git a/decoders/http/http_decoder_half.cpp b/decoders/http/http_decoder_half.cpp
new file mode 100644
index 0000000..bea97d0
--- /dev/null
+++ b/decoders/http/http_decoder_half.cpp
@@ -0,0 +1,1216 @@
+#include <assert.h>
+#include <stdio.h>
+#include <string.h>
+#include <arpa/inet.h>
+#include "http_decoder_inc.h"
+#include "llhttp.h"
+#include "uthash/utlist.h"
+
+struct http_decompress_buffer
+{
+ struct iovec iov;
+ char is_commit;
+ struct http_decompress_buffer *next, *prev;
+};
+
+struct http_decoder_half_data
+{
+ struct http_decoder_table *table;
+
+ int major_version;
+ int minor_version;
+ int status_code;
+
+ enum http_event state;
+
+ enum http_content_encoding content_encoding;
+ struct http_content_decompress *decompress;
+#if 0
+ char *ref_decompress_body;
+ size_t decompress_body_len;
+#else
+ struct http_decompress_buffer *decompress_buffer_list;
+#endif
+ int joint_url_complete;
+ int url_is_encoded;
+ hstring joint_url; // http://<host>[:<port>]/<path>?<searchpart>
+ hstring decoded_url;
+ long long transaction_index;
+};
+
+struct http_decoder_half
+{
+ llhttp_t parser;
+ llhttp_settings_t settings;
+ enum llhttp_errno error;
+ int decompress_switch;
+ struct http_decoder_env *httpd_env;
+
+ // uint8_t is_request_flow;
+ enum http_event event;
+ http_event_cb *http_ev_cb;
+ struct http_event_context *http_ev_ctx;
+
+ struct http_decoder_half_data *ref_data;
+
+ long long trans_counter;
+ long long err_counter;
+ long long transaction_seq; // accumulated
+
+ const char *data;
+ int data_len;
+};
+
+// #define HTTP_DECODER_DEBUG
+#ifdef HTTP_DECODER_DEBUG
+static void printf_debug_info(const char *desc, const char *at, size_t length)
+{
+ if (at)
+ {
+ char *temp = safe_dup(at, length);
+ printf("HTTP PARSER STAGE: %s: %s\n", desc, temp);
+ FREE(temp);
+ }
+ else
+ {
+ printf("HTTP PARSER STAGE: %s\n", desc);
+ }
+}
+#else
+#define printf_debug_info(desc, at, length)
+#endif
+
+void http_half_decompress_buffer_free(struct http_decoder_half_data *data, hstring *decompress_body)
+{
+ struct http_decompress_buffer *el, *tmp;
+ DL_FOREACH_SAFE(data->decompress_buffer_list, el, tmp)
+ {
+ if (el->iov.iov_base == decompress_body->iov_base
+ && el->iov.iov_len == decompress_body->iov_len)
+ {
+ DL_DELETE(data->decompress_buffer_list, el);
+ if (el->iov.iov_base)
+ {
+ FREE(el->iov.iov_base);
+ }
+ FREE(el);
+ break;
+ }
+ }
+}
+
+void http_half_get_lastest_decompress_buffer(struct http_decoder_half_data *data, hstring *decompress_body)
+{
+ if (data->content_encoding == HTTP_CONTENT_ENCODING_NONE)
+ {
+ return;
+ }
+ if (data->decompress_buffer_list == NULL)
+ {
+ decompress_body->iov_base = NULL;
+ decompress_body->iov_len = 0;
+ return;
+ }
+ if (data->decompress_buffer_list->prev->is_commit == 1)
+ {
+ decompress_body->iov_base = NULL;
+ decompress_body->iov_len = 0;
+ return;
+ }
+ decompress_body->iov_base = data->decompress_buffer_list->prev->iov.iov_base;
+ decompress_body->iov_len = data->decompress_buffer_list->prev->iov.iov_len;
+ data->decompress_buffer_list->prev->is_commit = 1;
+}
+
+static void http_decoder_half_data_decompress(struct http_decoder_half_data *data)
+{
+ assert(data);
+
+ if (data->content_encoding == HTTP_CONTENT_ENCODING_NONE)
+ {
+ return;
+ }
+
+ hstring raw_body = {};
+ http_decoder_table_get_body(data->table, &raw_body);
+ if (raw_body.iov_base == NULL || raw_body.iov_len == 0)
+ {
+ return;
+ }
+
+ if (NULL == data->decompress)
+ {
+ data->decompress = http_content_decompress_create(data->content_encoding);
+ }
+
+ assert(data->decompress);
+ char *local_outdata = NULL;
+ size_t local_outdata_len = 0;
+ if (http_content_decompress_write(data->decompress, (char *)raw_body.iov_base,
+ raw_body.iov_len,
+ &local_outdata,
+ &local_outdata_len) == -1)
+ {
+ // log error
+ http_content_decompress_destroy(data->decompress);
+ data->decompress = NULL;
+ return;
+ }
+
+ if(local_outdata!= NULL && local_outdata_len > 0)
+ {
+ struct http_decompress_buffer *decompress_buffer = CALLOC(struct http_decompress_buffer, 1);
+ assert(decompress_buffer);
+ decompress_buffer->iov.iov_base = local_outdata;
+ decompress_buffer->iov.iov_len = local_outdata_len;
+ DL_APPEND(data->decompress_buffer_list, decompress_buffer);
+ }
+}
+
+/* Possible return values 0, -1, `HPE_PAUSED` */
+static int on_message_begin(llhttp_t *http)
+{
+ printf_debug_info("on_message_begin", NULL, 0);
+
+ struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
+ assert(half);
+
+ if (half->parser.type == HTTP_REQUEST)
+ {
+ half->event = HTTP_EVENT_REQ_INIT;
+ }
+ else
+ {
+ half->event = HTTP_EVENT_RES_INIT;
+ }
+
+ half->ref_data = NULL;
+
+ assert(half->http_ev_cb != NULL);
+ half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); // http_event_handler()
+
+ half->trans_counter++;
+ half->ref_data->transaction_index = half->transaction_seq++;
+ return 0;
+}
+
+static int on_message_complete(llhttp_t *http)
+{
+ printf_debug_info("on_message_complete", NULL, 0);
+
+ struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
+ assert(half);
+
+ if (half->parser.type == HTTP_REQUEST)
+ {
+ if (half->event == HTTP_EVENT_REQ_BODY_DATA)
+ {
+ half->event = HTTP_EVENT_REQ_BODY_END;
+ half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env);
+ }
+ }
+ else
+ {
+ if (half->event == HTTP_EVENT_RES_BODY_DATA)
+ {
+ half->event = HTTP_EVENT_RES_BODY_END;
+ half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env);
+ }
+ }
+
+ // trigger req_end/res_end
+ if (half->parser.type == HTTP_REQUEST)
+ {
+ half->event = HTTP_EVENT_REQ_END;
+ half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env);
+ }
+ else
+ {
+ half->event = HTTP_EVENT_RES_END;
+ half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env);
+ }
+
+ return 0;
+}
+
+static int on_reset(llhttp_t *http __attribute__((unused)))
+{
+ printf_debug_info("on_reset", NULL, 0);
+
+ return 0;
+}
+
+static inline int is_line_crlf(struct http_decoder_half *half)
+{
+ const char *chr_r = (char *)memrchr(half->data, '\r', half->data_len);
+ const char *chr_n = (char *)memrchr(half->data, '\n', half->data_len);
+ if (chr_r && chr_n && (chr_r + 1 == chr_n))
+ {
+ return 1;
+ }
+ return 0;
+}
+
+static int on_method(llhttp_t *http, const char *at, size_t length)
+{
+ printf_debug_info("on_method", at, length);
+
+ struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
+ assert(half);
+
+ http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_METHOD, at, length);
+ return 0;
+}
+
+/* Information-only callbacks, return value is ignored */
+static int on_method_complete(llhttp_t *http)
+{
+ printf_debug_info("on_method_complete", NULL, 0);
+
+ struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
+ assert(half);
+
+ if (is_line_crlf(half) == 0)
+ {
+ http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_METHOD);
+ }
+
+ http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_METHOD);
+
+ return 0;
+}
+
+/* Possible return values 0, -1, HPE_USER */
+static int on_uri(llhttp_t *http, const char *at, size_t length)
+{
+ printf_debug_info("on_uri", at, length);
+
+ struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
+ assert(half);
+
+ http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_URI, at, length);
+ return 0;
+}
+
+static void http_decoder_cached_portion_url(struct http_decoder_half *half, const hstring *uri_result)
+{
+ struct http_decoder_half_data *ref_data = half->ref_data;
+ int uri_skip_len = 0;
+
+ if ((uri_result->iov_len) > 7 && (strncasecmp("http://", (char *)uri_result->iov_base, 7) == 0)) // absolute URI
+ {
+ uri_skip_len = strlen("http://");
+ ref_data->joint_url_complete = 1;
+ }
+ else
+ {
+ ref_data->joint_url_complete = 0;
+ }
+
+ ref_data->joint_url.iov_len = uri_result->iov_len - uri_skip_len;
+ ref_data->joint_url.iov_base = MEMPOOL_CALLOC(half->http_ev_ctx->ref_mempool, char, ref_data->joint_url.iov_len);
+ memcpy(ref_data->joint_url.iov_base, (char *)uri_result->iov_base + uri_skip_len, ref_data->joint_url.iov_len);
+}
+
+/* Information-only callbacks, return value is ignored */
+static int on_uri_complete(llhttp_t *http)
+{
+ printf_debug_info("on_uri_complete", NULL, 0);
+
+ struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
+ assert(half);
+
+ if (is_line_crlf(half) == 0)
+ {
+ http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_URI);
+ }
+
+ http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_URI);
+
+ hstring uri_result = {};
+ http_decoder_table_get_uri(half->ref_data->table, &uri_result);
+ assert(uri_result.iov_base);
+ http_decoder_cached_portion_url(half, &uri_result);
+
+ return 0;
+}
+
+/* Possible return values 0, -1, HPE_USER */
+static int on_version(llhttp_t *http, const char *at, size_t length)
+{
+ printf_debug_info("on_version", at, length);
+
+ struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
+ assert(half);
+
+ http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_VERSION, at, length);
+ return 0;
+}
+
+/* Information-only callbacks, return value is ignored */
+static int on_version_complete(llhttp_t *http)
+{
+ printf_debug_info("on_version_complete", NULL, 0);
+
+ struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
+ assert(half);
+
+ if (is_line_crlf(half) == 0)
+ {
+ http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_VERSION);
+ }
+
+ http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_VERSION);
+
+ half->ref_data->major_version = llhttp_get_http_major(&half->parser);
+ half->ref_data->minor_version = llhttp_get_http_minor(&half->parser);
+
+ if (half->parser.type == HTTP_REQUEST)
+ {
+ half->event = HTTP_EVENT_REQ_LINE;
+ if (half->http_ev_cb) // http_event_handler()
+ {
+ half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env);
+ }
+ }
+
+ return 0;
+}
+
+/* Possible return values 0, -1, HPE_USER */
+static int on_status(llhttp_t *http, const char *at, size_t length)
+{
+ printf_debug_info("on_status", at, length);
+
+ struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
+ assert(half);
+
+ http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_STATUS, at, length);
+ return 0;
+}
+
+/* Information-only callbacks, return value is ignored */
+static int on_status_complete(llhttp_t *http)
+{
+ printf_debug_info("on_status_complete", NULL, 0);
+
+ struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
+ assert(half);
+
+ if (is_line_crlf(half) == 0)
+ {
+ http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_STATUS);
+ }
+
+ http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_STATUS);
+ half->ref_data->status_code = llhttp_get_status_code(&half->parser);
+
+ if (half->parser.type == HTTP_RESPONSE)
+ {
+ half->event = HTTP_EVENT_RES_LINE;
+ if (half->http_ev_cb != NULL) // http_event_handler()
+ {
+ half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env);
+ }
+ }
+
+ return 0;
+}
+
+/* Possible return values 0, -1, HPE_USER */
+static int on_header_field(llhttp_t *http, const char *at, size_t length)
+{
+ printf_debug_info("on_header_field", at, length);
+
+ struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
+ assert(half);
+
+ http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_HDRKEY, at, length);
+ return 0;
+}
+
+/* Information-only callbacks, return value is ignored */
+static int on_header_field_complete(llhttp_t *http)
+{
+ printf_debug_info("on_header_field_complete", NULL, 0);
+
+ struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
+ assert(half);
+
+ http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_HDRKEY);
+
+ return 0;
+}
+
+/* Possible return values 0, -1, HPE_USER */
+static int on_header_value(llhttp_t *http, const char *at, size_t length)
+{
+ printf_debug_info("on_header_value", at, length);
+
+ struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
+ assert(half);
+
+ http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_HDRVAL, at, length);
+ return 0;
+}
+
+#define MAX_ENCODING_STR_LEN 8
+/* Information-only callbacks, return value is ignored */
+static int on_header_value_complete(llhttp_t *http)
+{
+ printf_debug_info("on_header_value_complete", NULL, 0);
+
+ struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
+ assert(half);
+
+ if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_HDRKEY) ==
+ STRING_STATE_CACHE)
+ {
+ http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_HDRKEY);
+ }
+
+ http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_HDRVAL);
+
+ if (half->ref_data->content_encoding == HTTP_CONTENT_ENCODING_NONE)
+ {
+ struct http_header http_hdr = {};
+ hstring key = {.iov_base = (char *)"Content-Encoding", .iov_len = 16};
+
+ if (http_decoder_table_get_header(half->ref_data->table, &key, &http_hdr) == 0)
+ {
+ char encoding_str[MAX_ENCODING_STR_LEN + 1] = {0};
+ size_t iov_len = http_hdr.val.iov_len;
+ if (iov_len > MAX_ENCODING_STR_LEN)
+ {
+ iov_len = MAX_ENCODING_STR_LEN;
+ }
+ memcpy(encoding_str, http_hdr.val.iov_base, iov_len);
+ half->ref_data->content_encoding = http_content_encoding_str2int(encoding_str);
+ }
+ }
+
+ if (http->type == HTTP_REQUEST)
+ {
+ http_decoder_get_host_feed_url(half);
+ }
+ return 0;
+}
+
+/* When on_chunk_header is called, the current chunk length is stored
+ * in parser->content_length.
+ * Possible return values 0, -1, `HPE_PAUSED`
+ */
+static int on_chunk_header(llhttp_t *http __attribute__((unused)))
+{
+ printf_debug_info("on_chunk_header", NULL, 0);
+
+ return 0;
+}
+
+/* When on_chunk_header is called, the current chunk length is stored
+ * in parser->content_length.
+ * Possible return values 0, -1, `HPE_PAUSED`
+ */
+static int on_chunk_header_complete(llhttp_t *http __attribute__((unused)))
+{
+ printf_debug_info("on_chunk_header_complete", NULL, 0);
+
+ return 0;
+}
+
+/* Possible return values:
+ * 0 - Proceed normally
+ * 1 - Assume that request/response has no body, and proceed to parsing the next message
+ * 2 - Assume absence of body (as above) and make `llhttp_execute()` return `HPE_PAUSED_UPGRADE`
+ * -1 - Error `HPE_PAUSED`
+ */
+static int on_headers_complete(llhttp_t *http)
+{
+ printf_debug_info("on_headers_complete", NULL, 0);
+
+ struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
+ assert(half);
+ assert(half->ref_data);
+
+ http_decoder_table_set_header_complete(half->ref_data->table);
+
+ if (half->parser.type == HTTP_REQUEST)
+ {
+ half->event = HTTP_EVENT_REQ_HDR_END;
+ }
+ else
+ {
+ half->event = HTTP_EVENT_RES_HDR_END;
+ }
+ half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); // http_event_handler()
+
+ return 0;
+}
+
+/* Possible return values 0, -1, HPE_USER */
+static int on_body(llhttp_t *http, const char *at, size_t length)
+{
+ printf_debug_info("on_body", at, length);
+
+ struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
+ assert(half);
+
+ // trigger body_begin event
+ if (half->parser.type == HTTP_REQUEST)
+ {
+ if (half->event == HTTP_EVENT_REQ_HDR_END)
+ {
+ half->event = HTTP_EVENT_REQ_BODY_BEGIN;
+ half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); // http_event_handler()
+ }
+ }
+ else
+ {
+ if (half->event == HTTP_EVENT_RES_HDR_END)
+ {
+ half->event = HTTP_EVENT_RES_BODY_BEGIN;
+ half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env);
+ }
+ }
+
+ if (half->ref_data != NULL)
+ {
+ if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_BODY) ==
+ STRING_STATE_COMMIT)
+ {
+ http_decoder_table_reset(half->ref_data->table, HTTP_ITEM_BODY);
+ }
+
+ http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_BODY, at, length);
+ http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_BODY);
+ }
+
+ if (1 == half->decompress_switch && half->ref_data->content_encoding != HTTP_CONTENT_ENCODING_NONE)
+ {
+ http_decoder_half_data_decompress(half->ref_data);
+ }
+
+ if (half->parser.type == HTTP_REQUEST)
+ {
+ half->event = HTTP_EVENT_REQ_BODY_DATA;
+ half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); // http_event_handler()
+ }
+ else
+ {
+ half->event = HTTP_EVENT_RES_BODY_DATA;
+ half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env);
+ }
+
+ return 0;
+}
+
+static void http_decoder_half_init(struct http_decoder_half *half, http_event_cb *http_ev_cb, enum llhttp_type type)
+{
+ llhttp_settings_init(&half->settings);
+ llhttp_init(&half->parser, type, &half->settings);
+
+ // half->is_request_flow = (type == HTTP_REQUEST) ? 1 : 0;
+ half->settings.on_message_begin = on_message_begin;
+ half->settings.on_message_complete = on_message_complete;
+ half->settings.on_reset = on_reset;
+
+ half->settings.on_url = on_uri;
+ half->settings.on_url_complete = on_uri_complete;
+
+ half->settings.on_status = on_status;
+ half->settings.on_status_complete = on_status_complete;
+
+ half->settings.on_method = on_method;
+ half->settings.on_method_complete = on_method_complete;
+
+ half->settings.on_version = on_version;
+ half->settings.on_version_complete = on_version_complete;
+
+ half->settings.on_header_field = on_header_field;
+ half->settings.on_header_field_complete = on_header_field_complete;
+
+ half->settings.on_header_value = on_header_value;
+ half->settings.on_header_value_complete = on_header_value_complete;
+
+ half->settings.on_chunk_header = on_chunk_header;
+ half->settings.on_chunk_complete = on_chunk_header_complete;
+
+ half->settings.on_headers_complete = on_headers_complete;
+ half->settings.on_body = on_body;
+
+ half->error = HPE_OK;
+ half->http_ev_cb = http_ev_cb; // http_event_handler()
+ half->ref_data = NULL;
+}
+
+struct http_decoder_half *http_decoder_half_new(struct http_decoder_exdata *hd_ctx, nmx_pool_t *mempool,
+ http_event_cb *ev_cb, enum llhttp_type http_type,
+ int decompress_switch, struct http_decoder_env *httpd_env, long long start_seq)
+{
+ struct http_decoder_half *half = MEMPOOL_CALLOC(mempool, struct http_decoder_half, 1);
+ assert(half);
+
+ half->decompress_switch = decompress_switch;
+ half->http_ev_ctx = MEMPOOL_CALLOC(mempool, struct http_event_context, 1);
+ http_decoder_half_init(half, ev_cb, http_type);
+ half->http_ev_ctx->ref_httpd_ctx = hd_ctx;
+ half->httpd_env = httpd_env;
+ half->transaction_seq = start_seq;
+ return half;
+}
+
+void http_decoder_half_free(nmx_pool_t *mempool, struct http_decoder_half *half)
+{
+ if (NULL == half)
+ {
+ return;
+ }
+
+ if (half->http_ev_ctx != NULL)
+ {
+ MEMPOOL_FREE(mempool, half->http_ev_ctx);
+ half->http_ev_ctx = NULL;
+ }
+
+ MEMPOOL_FREE(mempool, half);
+}
+
+void http_decoder_half_reinit(struct http_decoder_half *half,
+ struct http_decoder_result_queue *queue,
+ nmx_pool_t *mempool, struct session *sess)
+{
+ assert(half != NULL);
+ if (half->ref_data != NULL)
+ {
+ http_decoder_table_reinit(half->ref_data->table);
+ }
+ half->http_ev_ctx->ref_mempool = mempool;
+ half->http_ev_ctx->ref_session = sess;
+ half->http_ev_ctx->ref_queue = queue;
+}
+
+static void publish_message_for_parsed_header(struct http_decoder_half *half)
+{
+ if (0 == http_decoder_table_has_parsed_header(half->ref_data->table))
+ {
+ return;
+ }
+ if (half->parser.type == HTTP_REQUEST)
+ {
+ half->event = HTTP_EVENT_REQ_HDR;
+ }
+ else
+ {
+ half->event = HTTP_EVENT_RES_HDR;
+ }
+ half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); // http_event_handler();
+ return;
+}
+
+int http_decoder_half_parse(int proxy_enable, struct http_decoder_half *half, const char *data, size_t data_len)
+{
+ assert(half && data);
+
+ half->data = (const char *)data;
+ half->data_len = data_len;
+ half->error = llhttp_execute(&half->parser, data, data_len);
+
+ int ret = 0;
+ enum llhttp_type type = HTTP_BOTH;
+
+ switch (half->error)
+ {
+ case HPE_OK:
+ break;
+ case HPE_PAUSED:
+ llhttp_resume(&half->parser);
+ break;
+ case HPE_PAUSED_UPGRADE:
+ if (proxy_enable)
+ {
+ llhttp_resume_after_upgrade(&half->parser);
+ }
+ ret = 0;
+ break;
+ default:
+ type = (enum llhttp_type)half->parser.type;
+ llhttp_init(&half->parser, type, &half->settings);
+ ret = -1;
+ break;
+ }
+
+ if (ret < 0)
+ {
+ // fprintf(stdout,
+ // "llhttp_execute parse error: %s err_reason:%s\n",
+ // llhttp_errno_name(half->error), half->parser.reason);
+ return half->error;
+ }
+
+ if (half->ref_data != NULL)
+ {
+ if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_URI) == STRING_STATE_REFER)
+ {
+ http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_URI);
+ }
+
+ if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_STATUS) == STRING_STATE_REFER)
+ {
+ http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_STATUS);
+ }
+
+ if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_METHOD) == STRING_STATE_REFER)
+ {
+ http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_METHOD);
+ }
+
+ if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_VERSION) == STRING_STATE_REFER)
+ {
+ http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_VERSION);
+ }
+
+ if (http_decoder_table_header_complete(half->ref_data->table))
+ {
+ http_decoder_table_reset_header_complete(half->ref_data->table);
+ }
+ else
+ {
+ // if headers are not completed with EOF \r\n\r\n, push the parsed headers so far
+ publish_message_for_parsed_header(half);
+ }
+
+ enum string_state hdr_key_state =
+ http_decoder_table_state(half->ref_data->table, HTTP_ITEM_HDRKEY);
+ enum string_state hdr_val_state =
+ http_decoder_table_state(half->ref_data->table, HTTP_ITEM_HDRVAL);
+
+ /* Truncated in http header key
+ For example http header k-v => User-Agent: Chrome
+ case1:
+ packet1: User- hdr_key_state == STRING_STATE_REFER
+ packet2: Agent: Chrome
+
+ case2:
+ packet1: User-Agent: hdr_key_state == STRING_STATE_COMMIT
+ hdr_val_state == STRING_STATE_INIT
+ packet2: Chrome
+ */
+ if (hdr_key_state == STRING_STATE_REFER ||
+ (hdr_key_state == STRING_STATE_COMMIT && hdr_val_state == STRING_STATE_INIT))
+ {
+ http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_HDRKEY);
+ }
+
+ /* Truncated in http header value
+ For example http header k-v => User-Agent: Chrome
+ packet1: User-Agent: Ch hdr_key_state == STRING_STATE_COMMIT
+ hdr_val_state == STRING_STATE_REFER
+
+ packet2: rome
+ */
+ if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_HDRVAL) == STRING_STATE_REFER)
+ {
+ /* Header key should have been committed
+ If it's not cached, cache it for next packet to use
+ */
+ http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_HDRKEY);
+ http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_HDRVAL);
+ }
+
+ if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_BODY) == STRING_STATE_REFER)
+ {
+ http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_BODY);
+ }
+ }
+
+ return 0;
+}
+
+long long http_decoder_half_trans_count(struct http_decoder_half *half)
+{
+ if (NULL == half)
+ {
+ return 0;
+ }
+
+ long long trans_cnt = half->trans_counter;
+ half->trans_counter = 0;
+
+ return trans_cnt;
+}
+
+struct http_decoder_half_data *
+http_decoder_half_data_new(nmx_pool_t *mempool)
+{
+ struct http_decoder_half_data *data =
+ MEMPOOL_CALLOC(mempool, struct http_decoder_half_data, 1);
+ assert(data);
+
+ data->table = http_decoder_table_new(mempool);
+ assert(data->table);
+
+ data->major_version = -1;
+ data->minor_version = -1;
+ data->status_code = -1;
+
+ data->content_encoding = HTTP_CONTENT_ENCODING_NONE;
+ // data->ref_decompress_body = NULL;
+ // data->decompress_body_len = 0;
+ data->decompress_buffer_list = NULL;
+ return data;
+}
+
+static void http_decoder_half_decompress_buf_free(struct http_decoder_half_data *ref_data)
+{
+ if (ref_data == NULL)
+ {
+ return;
+ }
+ struct http_decompress_buffer *el, *tmp;
+ DL_FOREACH_SAFE(ref_data->decompress_buffer_list, el, tmp)
+ {
+ DL_DELETE(ref_data->decompress_buffer_list, el);
+ if (el->iov.iov_base != NULL)
+ {
+ FREE(el->iov.iov_base);
+ }
+ FREE(el);
+ }
+ ref_data->decompress_buffer_list = NULL;
+}
+
+void http_decoder_half_data_free(nmx_pool_t *mempool, struct http_decoder_half_data *data)
+{
+ if (NULL == data)
+ {
+ return;
+ }
+
+ if (data->table != NULL)
+ {
+ http_decoder_table_free(data->table);
+ data->table = NULL;
+ }
+
+ if (data->decompress != NULL)
+ {
+ http_content_decompress_destroy(data->decompress);
+ data->decompress = NULL;
+ }
+
+ if (data->joint_url.iov_base)
+ {
+ MEMPOOL_FREE(mempool, data->joint_url.iov_base);
+ data->joint_url.iov_base = NULL;
+ data->joint_url_complete = 0;
+ }
+ http_decoder_half_decompress_buf_free(data);
+ MEMPOOL_FREE(mempool, data);
+}
+
+int http_decoder_half_data_get_request_line(struct http_decoder_half_data *data,
+ struct http_request_line *line)
+{
+ http_decoder_table_get_method(data->table, &line->method);
+ http_decoder_table_get_uri(data->table, &line->uri);
+ http_decoder_table_get_version(data->table, &line->version);
+
+ line->major_version = data->major_version;
+ line->minor_version = data->minor_version;
+
+ return 0;
+}
+
+int http_decoder_half_data_get_response_line(struct http_decoder_half_data *data,
+ struct http_response_line *line)
+{
+ http_decoder_table_get_version(data->table, &line->version);
+ http_decoder_table_get_status(data->table, &line->status);
+
+ line->major_version = data->major_version;
+ line->minor_version = data->minor_version;
+ line->status_code = data->status_code;
+
+ return 0;
+}
+
+int http_decoder_half_data_get_header(const struct http_decoder_half_data *data,
+ const hstring *key,
+ struct http_header *hdr_result)
+{
+ return http_decoder_table_get_header(data->table, key, hdr_result);
+}
+
+int http_decoder_half_data_iter_header(struct http_decoder_half_data *data,
+ struct http_header *header)
+{
+ return http_decoder_table_iter_header((struct http_decoder_table *)data->table, header);
+}
+
+int http_decoder_half_data_reset_header_iter(struct http_decoder_half_data *req_data)
+{
+ if (NULL == req_data)
+ {
+ return -1;
+ }
+ return http_decoder_table_reset_header_iter(req_data->table);
+}
+
+int http_decoder_half_data_has_parsed_header(struct http_decoder_half_data *data)
+{
+ if (NULL == data)
+ {
+ return 0;
+ }
+ return http_decoder_table_has_parsed_header(data->table);
+}
+
+int http_decoder_half_data_get_raw_body(const struct http_decoder_half_data *data, hstring *body)
+{
+ if (NULL == data || NULL == body)
+ {
+ return -1;
+ }
+ return http_decoder_table_get_body(data->table, body);
+}
+#if 0
+int http_decoder_half_data_get_decompress_body(const struct http_decoder_half_data *data, hstring *body)
+{
+ if (HTTP_CONTENT_ENCODING_NONE == data->content_encoding)
+ {
+ return http_decoder_table_get_body(data->table, body);
+ }
+
+ body->iov_base = data->ref_decompress_body;
+ body->iov_len = data->decompress_body_len;
+ return 0;
+}
+#endif
+
+void http_decoder_half_data_dump(struct http_decoder_half *half)
+{
+ if (NULL == half || NULL == half->ref_data)
+ {
+ return;
+ }
+
+ http_decoder_table_dump(half->ref_data->table);
+}
+
+static void using_session_addr_as_host(struct session *ref_session,
+ struct http_header *host_result, nmx_pool_t *mempool)
+{
+#if 1 // in native steallar, can't get the tuple4 from the session yet!!!
+ struct httpd_session_addr ssaddr = {};
+ httpd_session_get_addr(ref_session, &ssaddr);
+ if (ssaddr.ipver != 4 && ssaddr.ipver != 6)
+ {
+ host_result->val.iov_base = MEMPOOL_CALLOC(mempool, char, 1);
+ sprintf((char *)host_result->val.iov_base, "%s", "");
+ host_result->val.iov_len = strlen((char *)host_result->val.iov_base);
+ return;
+ }
+
+ char ip_string_buf[INET6_ADDRSTRLEN];
+ if (4 == ssaddr.ipver)
+ {
+ host_result->val.iov_base = MEMPOOL_CALLOC(mempool, char, (INET_ADDRSTRLEN + 7) /* "ip:port" max length */);
+ inet_ntop(AF_INET, &ssaddr.daddr4, ip_string_buf, INET_ADDRSTRLEN);
+ sprintf((char *)host_result->val.iov_base, "%s:%u", ip_string_buf, ntohs(ssaddr.dport));
+ host_result->val.iov_len = strlen((char *)host_result->val.iov_base);
+ }
+ else if (6 == ssaddr.ipver)
+ {
+ host_result->val.iov_base = MEMPOOL_CALLOC(mempool, char, (INET6_ADDRSTRLEN + 7) /* "ip:port" max length */);
+ inet_ntop(AF_INET6, &ssaddr.daddr6, ip_string_buf, INET6_ADDRSTRLEN);
+ sprintf((char *)host_result->val.iov_base, "%s:%u", ip_string_buf, ntohs(ssaddr.dport));
+ host_result->val.iov_len = strlen((char *)host_result->val.iov_base);
+ }
+ else
+ {
+ assert(0);
+ }
+#else
+ host_result->val.iov_base = MEMPOOL_CALLOC(mempool, char, 32);
+ sprintf((char *)host_result->val.iov_base, "%s", "todo:get_tuple4");
+ host_result->val.iov_len = strlen((char *)host_result->val.iov_base);
+#endif
+}
+
+void http_decoder_join_url(struct http_decoder_half_data *hfdata, nmx_pool_t *mempool, const struct http_header *host_hdr)
+{
+ int append_slash_len = 0;
+ if ('/' != ((char *)hfdata->joint_url.iov_base)[0])
+ {
+ append_slash_len = 1;
+ }
+ int url_cache_str_len = host_hdr->val.iov_len + hfdata->joint_url.iov_len + append_slash_len;
+ char *url_cache_str = MEMPOOL_CALLOC(mempool, char, url_cache_str_len);
+
+ char *ptr = url_cache_str;
+ memcpy(ptr, host_hdr->val.iov_base, host_hdr->val.iov_len);
+ ptr += host_hdr->val.iov_len;
+ if (append_slash_len)
+ {
+ *ptr = '/';
+ ptr++;
+ }
+ memcpy(ptr, hfdata->joint_url.iov_base, hfdata->joint_url.iov_len);
+
+ MEMPOOL_FREE(mempool, hfdata->joint_url.iov_base); // free the cached uri buffer
+ hfdata->joint_url.iov_base = url_cache_str;
+ hfdata->joint_url.iov_len = url_cache_str_len;
+
+ hfdata->url_is_encoded = httpd_url_is_encoded(url_cache_str, url_cache_str_len);
+ if (hfdata->url_is_encoded)
+ {
+ hfdata->decoded_url.iov_base = MEMPOOL_CALLOC(mempool, char, url_cache_str_len);
+ httpd_url_decode(url_cache_str, url_cache_str_len, (char *)hfdata->decoded_url.iov_base, &hfdata->decoded_url.iov_len);
+ }
+ else
+ {
+ hfdata->decoded_url.iov_base = url_cache_str;
+ hfdata->decoded_url.iov_len = url_cache_str_len;
+ }
+
+ hfdata->joint_url_complete = 1;
+}
+
+void http_decoder_get_url(struct http_decoder_half_data *hfdata, nmx_pool_t *mempool)
+{
+ struct http_request_line reqline = {};
+ http_decoder_half_data_get_request_line(hfdata, &reqline);
+ if (unlikely(strncasecmp_safe("CONNECT", (char *)reqline.method.iov_base, 7, reqline.method.iov_len) == 0))
+ {
+ hfdata->joint_url.iov_base = MEMPOOL_CALLOC(mempool, char, reqline.uri.iov_len + 1);
+ memcpy(hfdata->joint_url.iov_base, reqline.uri.iov_base, reqline.uri.iov_len);
+ hfdata->joint_url.iov_len = reqline.uri.iov_len;
+ hfdata->joint_url_complete = 1;
+ }
+}
+
+int http_decoder_join_url_finally(struct http_event_context *ev_ctx,
+ struct http_decoder_half_data *hfdata,
+ nmx_pool_t *mempool)
+{
+ struct http_header addr_as_host = {};
+
+ if (hfdata->joint_url_complete)
+ {
+ return 0;
+ }
+
+ using_session_addr_as_host(ev_ctx->ref_session, &addr_as_host, mempool);
+ http_decoder_join_url(hfdata, mempool, &addr_as_host);
+ MEMPOOL_FREE(mempool, addr_as_host.val.iov_base); // free session addr to host buffer
+ return 1;
+}
+
+void http_decoder_get_host_feed_url(struct http_decoder_half *half)
+{
+ struct http_header host_result = {};
+ hstring host_key = {(char *)"Host", 4};
+
+ if (half->ref_data->joint_url_complete)
+ {
+ return;
+ }
+
+ int host_header_cnt = http_decoder_half_data_get_header(half->ref_data, &host_key,
+ &host_result);
+ if (host_header_cnt < 0)
+ {
+ return;
+ }
+
+ http_decoder_join_url(half->ref_data, half->http_ev_ctx->ref_mempool, &host_result);
+}
+
+int http_half_data_get_url(struct http_decoder_half_data *res_data, hstring *url)
+{
+ if (0 == res_data->joint_url_complete)
+ {
+ return -1;
+ }
+ url->iov_base = res_data->joint_url.iov_base;
+ url->iov_len = res_data->joint_url.iov_len;
+ return 0;
+}
+int http_half_data_get_decode_url(struct http_decoder_half_data *res_data, hstring *url)
+{
+ if (0 == res_data->joint_url_complete)
+ {
+ return -1;
+ }
+ url->iov_base = res_data->decoded_url.iov_base;
+ url->iov_len = res_data->decoded_url.iov_len;
+ return 0;
+}
+
+int http_half_data_get_transaction_seq(struct http_decoder_half_data *hf_data)
+{
+ return hf_data->transaction_index;
+}
+
+void http_half_data_update_commit_index(struct http_decoder_half_data *half_data)
+{
+ http_decoder_table_update_commit_index(half_data->table);
+}
+
+int http_half_data_get_total_parsed_header_count(struct http_decoder_half_data *half_data)
+{
+ return http_decoder_table_get_total_parsed_header(half_data->table);
+}
+
+void http_half_pre_context_free(struct session *sess, struct http_decoder_exdata *exdata)
+{
+ struct http_message *msg = NULL;
+ struct http_decoder_half_data *res_data = NULL;
+ struct http_decoder_result_queue *queue = NULL;
+
+ if (exdata)
+ {
+ queue = exdata->queue;
+ for (size_t i = 0; i < queue->queue_size; i++)
+ {
+ struct http_decoder_half_data *req_data = queue->array[i].req_data;
+ res_data = queue->array[i].res_data;
+ if ((req_data != NULL) && (NULL == res_data) && (req_data->state < HTTP_EVENT_REQ_END))
+ {
+ msg = http_message_new(HTTP_TRANSACTION_END, queue, i, HTTP_REQUEST);
+ session_mq_publish_message(sess, exdata->pub_topic_id, msg);
+ }
+ }
+
+ for (size_t i = 0; i < queue->queue_size; i++)
+ {
+ res_data = queue->array[i].res_data;
+ if ((res_data != NULL) && (res_data->state < HTTP_EVENT_RES_END))
+ {
+ msg = http_message_new(HTTP_TRANSACTION_END, queue, i, HTTP_RESPONSE);
+ session_mq_publish_message(sess, exdata->pub_topic_id, msg);
+ }
+ }
+ }
+}
+
+void http_half_update_state(struct http_decoder_half_data *hf_data, enum http_event state)
+{
+ hf_data->state = state;
+}
+
+void http_half_get_max_transaction_seq(struct http_decoder_exdata *exdata, long long *max_req_seq, long long *max_res_seq)
+{
+ assert(exdata && max_req_seq && max_res_seq);
+ *max_req_seq = exdata->decoder->c2s_half->transaction_seq;
+ *max_res_seq = exdata->decoder->s2c_half->transaction_seq;
+}
+
+enum http_content_encoding http_half_data_get_content_encoding(struct http_decoder_half_data *hf_data)
+{
+ if(NULL == hf_data)
+ {
+ return HTTP_CONTENT_ENCODING_NONE;
+ }
+ return hf_data->content_encoding;
+} \ No newline at end of file
diff --git a/decoders/http/http_decoder_half.h b/decoders/http/http_decoder_half.h
new file mode 100644
index 0000000..5deeaa6
--- /dev/null
+++ b/decoders/http/http_decoder_half.h
@@ -0,0 +1,105 @@
+#ifndef _HTTP_DECODER_HALF_H_
+#define _HTTP_DECODER_HALF_H_
+
+#include <stddef.h>
+#include "stellar/session.h"
+#include "stellar/http.h"
+#include "http_content_decompress.h"
+#include "http_decoder_result_queue.h"
+#include "llhttp.h"
+
+// only one http event is fired at a time
+enum http_event {
+ HTTP_EVENT_REQ_INIT = 1 << 1,
+ HTTP_EVENT_REQ_LINE = 1 << 2,
+ HTTP_EVENT_REQ_HDR = 1 << 3,
+ HTTP_EVENT_REQ_HDR_END = 1 << 4,
+ HTTP_EVENT_REQ_BODY_BEGIN = 1 << 5,
+ HTTP_EVENT_REQ_BODY_DATA = 1 << 6,
+ HTTP_EVENT_REQ_BODY_END = 1 << 7,
+ HTTP_EVENT_REQ_END = 1 << 8,
+
+ HTTP_EVENT_RES_INIT = 1 << 9,
+ HTTP_EVENT_RES_LINE = 1 << 10,
+ HTTP_EVENT_RES_HDR = 1 << 11,
+ HTTP_EVENT_RES_HDR_END = 1 << 12,
+ HTTP_EVENT_RES_BODY_BEGIN = 1 << 13,
+ HTTP_EVENT_RES_BODY_DATA = 1 << 14,
+ HTTP_EVENT_RES_BODY_END = 1 << 15,
+ HTTP_EVENT_RES_END = 1 << 16,
+};
+
+struct http_event_context {
+ struct http_decoder_exdata *ref_httpd_ctx;
+ nmx_pool_t *ref_mempool;
+ struct session *ref_session;
+ struct http_decoder_result_queue *ref_queue;
+};
+
+struct http_decoder_half;
+struct http_decoder_half_data;
+
+typedef void http_event_cb(enum http_event event, struct http_decoder_half_data **data,
+ struct http_event_context *ev_ctx, void *httpd_plugin_env);
+
+struct http_decoder_half *
+http_decoder_half_new(struct http_decoder_exdata *hd_ctx, nmx_pool_t *mempool, http_event_cb *event_cb,
+ enum llhttp_type http_type, int decompress_switch, struct http_decoder_env *httpd_env,long long start_seq);
+
+void http_decoder_half_free(nmx_pool_t *mempool, struct http_decoder_half *half);
+
+void http_decoder_half_reinit(struct http_decoder_half *half,
+ struct http_decoder_result_queue *queue,
+ nmx_pool_t *mempool, struct session *sess);
+
+int http_decoder_half_parse(int proxy_enable, struct http_decoder_half *half, const char *data, size_t data_len);
+
+long long http_decoder_half_trans_count(struct http_decoder_half *half);
+
+//http decoder half data API
+struct http_decoder_half_data *
+http_decoder_half_data_new(nmx_pool_t *mempool);
+
+void http_decoder_half_data_free(nmx_pool_t *mempool, struct http_decoder_half_data *data);
+
+int http_decoder_half_data_get_request_line(struct http_decoder_half_data *data,
+ struct http_request_line *line);
+
+int http_decoder_half_data_get_response_line(struct http_decoder_half_data *data,
+ struct http_response_line *line);
+
+int http_decoder_half_data_get_header(const struct http_decoder_half_data *data,
+ const hstring *key, struct http_header *hdr_res);
+
+int http_decoder_half_data_iter_header(struct http_decoder_half_data *data,
+ struct http_header *header);
+int http_decoder_half_data_reset_header_iter(struct http_decoder_half_data *req_data);
+int http_decoder_half_data_has_parsed_header(struct http_decoder_half_data *data);
+
+int http_decoder_half_data_get_raw_body(const struct http_decoder_half_data *data, hstring *body);
+
+int http_decoder_half_data_get_decompress_body(const struct http_decoder_half_data *data, hstring *body);
+void http_half_get_lastest_decompress_buffer(struct http_decoder_half_data *data, hstring *decompress_body);
+void http_half_decompress_buffer_free(struct http_decoder_half_data *data, hstring *decompress_body);
+void http_decoder_half_data_dump(struct http_decoder_half *half);
+
+void http_decoder_get_host_feed_url(struct http_decoder_half *half);
+void http_decoder_get_url(struct http_decoder_half_data *hfdata, nmx_pool_t *mempool);
+int http_half_data_get_decode_url(struct http_decoder_half_data *res_data, hstring *url);
+void http_decoder_join_url(struct http_decoder_half_data *hfdata,
+ nmx_pool_t *mempool,
+ const struct http_header *host_hdr);
+int http_decoder_join_url_finally(struct http_event_context *ev_ctx,
+ struct http_decoder_half_data *hfdata,
+ nmx_pool_t *mempool);
+int http_half_data_get_url(struct http_decoder_half_data *res_data, hstring *url);
+int http_half_data_get_transaction_seq(struct http_decoder_half_data *hf_data);
+
+void http_half_data_update_commit_index(struct http_decoder_half_data * half_data);
+void http_half_pre_context_free(struct session *sess, struct http_decoder_exdata *exdata);
+void http_half_update_state(struct http_decoder_half_data *hf_data, enum http_event state);
+int http_half_data_get_total_parsed_header_count(struct http_decoder_half_data * half_data);
+void http_half_get_max_transaction_seq(struct http_decoder_exdata *exdata, long long *max_req_seq, long long *max_res_seq);
+
+enum http_content_encoding http_half_data_get_content_encoding(struct http_decoder_half_data *hf_data);
+#endif \ No newline at end of file
diff --git a/decoders/http/http_decoder_inc.h b/decoders/http/http_decoder_inc.h
new file mode 100644
index 0000000..aee121c
--- /dev/null
+++ b/decoders/http/http_decoder_inc.h
@@ -0,0 +1,162 @@
+/*
+**********************************************************************************************
+* File: http_decoder_inc.h
+* Description:
+* Authors: Liu WenTan <[email protected]>
+* Date: 2024-01-10
+* Copyright: (c) Since 2022 Geedge Networks, Ltd. All rights reserved.
+***********************************************************************************************
+*/
+
+#ifndef _HTTP_DECODER_INC_H_
+#define _HTTP_DECODER_INC_H_
+
+#ifndef __USE_MISC
+#define __USE_MISC 1
+#endif
+
+#include <cstddef>
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+#include <bits/types/struct_iovec.h>
+#include "stellar/stellar.h"
+#include "stellar/layer.h"
+#include "stellar/packet.h"
+#include "stellar/utils.h"
+#include "stellar/session.h"
+#include "stellar/stellar_mq.h"
+#include "stellar/stellar_exdata.h"
+
+#include "mempool/nmx_palloc.h"
+#include "stellar/utils.h"
+#include "stellar/http.h"
+#include "http_decoder_result_queue.h"
+#include "http_decoder_half.h"
+#include "http_decoder_table.h"
+#include "http_decoder_result_queue.h"
+#include "http_decoder_utils.h"
+#include "http_decoder_stat.h"
+#include "http_decoder_tunnel.h"
+#include "fieldstat/fieldstat_easy.h"
+#include "toml/toml.h"
+
+#ifndef likely
+#define likely(x) __builtin_expect((x), 1)
+#endif
+#ifndef unlikely
+#define unlikely(x) __builtin_expect((x), 0)
+#endif
+
+#define MEMPOOL_CALLOC(pool, type, number) ((type *)nmx_pcalloc(pool, sizeof(type) * number))
+#define MEMPOOL_REALLOC(pool)
+#define MEMPOOL_FREE(pool, p) nmx_pfree(pool, p)
+
+#define ENABLE_MEMPOOL 0
+#if ENABLE_MEMPOOL
+#define HD_CALLOC(pool, type, number) MEMPOOL_CALLOC(pool, number, type)
+#define HD_FREE(pool, p) MEMPOOL_FREE(pool, p)
+#else
+#define HD_CALLOC(pool, type, number) CALLOC(type, number)
+#define HD_FREE(pool, p) FREE(p)
+#endif
+
+#define HTTP_IDENTIFY_LEN 16
+#define HD_RESULT_QUEUE_LEN 16
+
+#define DEFAULT_STAT_OUTPUT_INTERVAL 1
+#define DEFAULT_STAT_INTERVAL_PKTS 1000
+#define DEFAULT_MEMPOOL_SIZE (32 * 1024)
+
+#define HTTPD_CFG_FILE "./etc/http/http_decoder.toml"
+#define FILEDSTAT_OUTPUT_FILE "./metrics/http_decoder_fs4.json"
+
+#define HTTP_CTX_NOT_HTTP "__NOT_HTTP_SESS__"
+#define HTTP_CTX_IS_HTTP "__FAKE_HTTP_CTX__"
+
+struct http_decoder_config
+{
+ int decompress_switch;
+ int stat_interval_pkts; // call fieldstat_incrby every stat_interval_pkts
+ int stat_output_interval;
+ int proxy_enable;
+ size_t result_queue_len; // per session result queue length
+ size_t mempool_size; // per session mempool size
+};
+
+/**
+ * NOTE: http_message don't have the ownership of data
+ */
+struct http_message
+{
+ uint8_t flow_type;
+ enum http_message_type type;
+ size_t queue_index;
+ struct http_decoder_result_queue *ref_queue;
+ hstring raw_payload;//cause tcp reorder, maybe receive many tcp segments for one packet
+ hstring decompress_payload;
+ hstring tunnel_payload;
+};
+
+struct http_decoder
+{
+ struct http_decoder_half *c2s_half;
+ struct http_decoder_half *s2c_half;
+};
+
+enum httpd_topic_index{
+ HTTPD_TOPIC_TCP_STREAM_INDEX = 0,
+ HTTPD_TOPIC_HTTP_MSG_INDEX,
+ HTTPD_TOPIC_HTTP_TUNNEL_INDEX,
+ HTTPD_TOPIC_INDEX_MAX,
+};
+
+struct http_decoder_exdata
+{
+ int sub_topic_id; //tcp_stream
+ int pub_topic_id; //http message or http tunnel msg
+ struct http_decoder_result_queue *queue;
+ struct http_decoder *decoder;
+ nmx_pool_t *mempool;
+ enum http_tunnel_state tunnel_state;
+ int in_tunnel_is_http;
+};
+
+// struct http_decoder_context{
+// int array_size;
+// struct http_decoder_exdata **exdata_array; //raw tcp stream for http msg; http tunnel for inner http transaction.
+// };
+
+struct http_topic_exdata_compose{
+ enum httpd_topic_index index;
+ const char *topic_name;
+ on_session_msg_cb_func *on_msg_cb;
+ stellar_msg_free_cb_func *msg_free_cb;
+ const char *exdata_name;
+ stellar_exdata_free *exdata_free_cb;
+ int sub_topic_id; //as consumer
+ int exdata_id;
+};
+
+struct http_decoder_env
+{
+ struct stellar *st;
+ int plugin_id;
+ struct http_topic_exdata_compose topic_exdata_compose[HTTPD_TOPIC_INDEX_MAX];
+ struct http_decoder_config hd_cfg;
+ struct http_decoder_stat hd_stat;
+};
+
+struct http_message;
+
+struct http_message *http_message_new(enum http_message_type type, struct http_decoder_result_queue *queue,
+ int queue_index, unsigned char flow_type);
+struct http_message *http_body_message_new(enum http_message_type type, struct http_decoder_result_queue *queue,
+ int queue_index, uint8_t flow_type, hstring *raw_payload);
+int http_topic_exdata_compose_get_index(const struct http_decoder_env *httpd_env, int by_topic_id);
+#ifdef __cplusplus
+}
+#endif
+
+#endif \ No newline at end of file
diff --git a/decoders/http/http_decoder_result_queue.cpp b/decoders/http/http_decoder_result_queue.cpp
new file mode 100644
index 0000000..6a5e003
--- /dev/null
+++ b/decoders/http/http_decoder_result_queue.cpp
@@ -0,0 +1,148 @@
+/*
+**********************************************************************************************
+* File: http_decoder_result_queue.c
+* Description:
+* Authors: Liuwentan <[email protected]>
+* Date: 2024-01-15
+* Copyright: (c) Since 2022 Geedge Networks, Ltd. All rights reserved.
+***********************************************************************************************
+*/
+#include <assert.h>
+#include "http_decoder_inc.h"
+
+struct http_decoder_result_queue *
+http_decoder_result_queue_new(nmx_pool_t *mempool, size_t queue_size)
+{
+ struct http_decoder_result_queue *queue =
+ MEMPOOL_CALLOC(mempool, struct http_decoder_result_queue, 1);
+ assert(queue);
+
+ queue->req_index = 0;
+ queue->res_index = 0;
+ queue->queue_size = queue_size;
+ queue->array = MEMPOOL_CALLOC(mempool, struct http_decoder_result,
+ queue->queue_size);
+ assert(queue->array);
+ return queue;
+}
+
+void http_decoder_result_queue_free(nmx_pool_t *mempool, struct http_decoder_result_queue *queue)
+{
+ if (NULL == queue) {
+ return;
+ }
+
+ if (queue->array != NULL) {
+ for (size_t i = 0; i < queue->queue_size; i++) {
+ if (queue->array[i].req_data != NULL) {
+ http_decoder_half_data_free(mempool, queue->array[i].req_data);
+ queue->array[i].req_data = NULL;
+ }
+
+ if (queue->array[i].res_data != NULL) {
+ http_decoder_half_data_free(mempool, queue->array[i].res_data);
+ queue->array[i].res_data = NULL;
+ }
+ }
+ MEMPOOL_FREE(mempool, queue->array);
+ }
+ MEMPOOL_FREE(mempool, queue);
+}
+
+void http_decoder_result_queue_inc_req_index(struct http_decoder_result_queue *queue)
+{
+ assert(queue);
+ queue->req_index++;
+ queue->req_index = queue->req_index % queue->queue_size;
+}
+
+void http_decoder_result_queue_inc_res_index(struct http_decoder_result_queue *queue)
+{
+ assert(queue);
+ queue->res_index++;
+ queue->res_index = queue->res_index % queue->queue_size;
+}
+
+size_t http_decoder_result_queue_req_index(struct http_decoder_result_queue *queue)
+{
+ assert(queue);
+ return queue->req_index;
+}
+
+size_t http_decoder_result_queue_res_index(struct http_decoder_result_queue *queue)
+{
+ assert(queue);
+ return queue->res_index;
+}
+
+int http_decoder_result_queue_push_req(struct http_decoder_result_queue *queue,
+ struct http_decoder_half_data *req_data)
+{
+ if (NULL == queue || NULL == req_data) {
+ return -1;
+ }
+ assert(queue->array[queue->req_index].req_data == NULL);
+ if (queue->array[queue->req_index].req_data != NULL) {
+ return -1;
+ }
+ queue->array[queue->req_index].req_data = req_data;
+ return 0;
+}
+
+int http_decoder_result_queue_push_res(struct http_decoder_result_queue *queue,
+ struct http_decoder_half_data *res_data)
+{
+ if (NULL == queue || NULL == res_data) {
+ return -1;
+ }
+ assert(queue->array[queue->res_index].res_data == NULL);
+ if (queue->array[queue->res_index].res_data != NULL) {
+ return -1;
+ }
+
+ queue->array[queue->res_index].res_data = res_data;
+ return 0;
+}
+
+struct http_decoder_half_data *
+http_decoder_result_queue_pop_req(struct http_decoder_result_queue *queue)
+{
+ if (NULL == queue) {
+ return NULL;
+ }
+ struct http_decoder_half_data *req_data = queue->array[queue->req_index].req_data;
+ queue->array[queue->req_index].req_data = NULL;
+ return req_data;
+}
+
+struct http_decoder_half_data *
+http_decoder_result_queue_pop_res(struct http_decoder_result_queue *queue)
+{
+ if (NULL == queue) {
+ return NULL;
+ }
+
+ struct http_decoder_half_data *res_data = queue->array[queue->res_index].res_data;
+ queue->array[queue->res_index].res_data = NULL;
+ return res_data;
+}
+
+struct http_decoder_half_data *
+http_decoder_result_queue_peek_req(struct http_decoder_result_queue *queue)
+{
+ if (NULL == queue) {
+ return NULL;
+ }
+ assert(queue->req_index < queue->queue_size);
+ return queue->array[queue->req_index].req_data;
+}
+
+struct http_decoder_half_data *
+http_decoder_result_queue_peek_res(struct http_decoder_result_queue *queue)
+{
+ if (NULL == queue) {
+ return NULL;
+ }
+ assert(queue->res_index < queue->queue_size);
+ return queue->array[queue->res_index].res_data;
+} \ No newline at end of file
diff --git a/decoders/http/http_decoder_result_queue.h b/decoders/http/http_decoder_result_queue.h
new file mode 100644
index 0000000..4d024eb
--- /dev/null
+++ b/decoders/http/http_decoder_result_queue.h
@@ -0,0 +1,61 @@
+/*
+**********************************************************************************************
+* File: http_decoder_result_queue.h
+* Description:
+* Authors: Liuwentan <[email protected]>
+* Date: 2024-01-15
+* Copyright: (c) Since 2022 Geedge Networks, Ltd. All rights reserved.
+***********************************************************************************************
+*/
+#ifndef _HTTP_DECODER_RESULT_QUEUE_H_
+#define _HTTP_DECODER_RESULT_QUEUE_H_
+
+#include <stddef.h>
+#include "mempool/nmx_palloc.h"
+#include "http_decoder_half.h"
+
+struct http_decoder_result {
+ struct http_decoder_half_data *req_data;
+ struct http_decoder_half_data *res_data;
+};
+
+struct http_decoder_result_queue {
+ size_t req_index;
+ size_t res_index;
+ size_t queue_size;
+ struct http_decoder_result *array;
+};
+
+struct http_decoder_result_queue *
+http_decoder_result_queue_new(nmx_pool_t *mempool, size_t queue_size);
+
+void http_decoder_result_queue_free(nmx_pool_t *mempool,
+ struct http_decoder_result_queue *queue);
+
+void http_decoder_result_queue_inc_req_index(struct http_decoder_result_queue *queue);
+
+void http_decoder_result_queue_inc_res_index(struct http_decoder_result_queue *queue);
+
+size_t http_decoder_result_queue_req_index(struct http_decoder_result_queue *queue);
+
+size_t http_decoder_result_queue_res_index(struct http_decoder_result_queue *queue);
+
+struct http_decoder_half_data *
+http_decoder_result_queue_pop_req(struct http_decoder_result_queue *queue);
+
+struct http_decoder_half_data *
+http_decoder_result_queue_pop_res(struct http_decoder_result_queue *queue);
+
+int http_decoder_result_queue_push_req(struct http_decoder_result_queue *queue,
+ struct http_decoder_half_data *req_data);
+
+int http_decoder_result_queue_push_res(struct http_decoder_result_queue *queue,
+ struct http_decoder_half_data *res_data);
+
+struct http_decoder_half_data *
+http_decoder_result_queue_peek_req(struct http_decoder_result_queue *queue);
+
+struct http_decoder_half_data *
+http_decoder_result_queue_peek_res(struct http_decoder_result_queue *queue);
+
+#endif \ No newline at end of file
diff --git a/decoders/http/http_decoder_stat.cpp b/decoders/http/http_decoder_stat.cpp
new file mode 100644
index 0000000..f616658
--- /dev/null
+++ b/decoders/http/http_decoder_stat.cpp
@@ -0,0 +1,121 @@
+#include <assert.h>
+#include <stdio.h>
+#include <pthread.h>
+#include <unistd.h>
+#include "http_decoder_inc.h"
+
+static const struct hd_stat_config_tuple g_httpd_stat_tuple[HTTPD_STAT_MAX] =
+{
+ {HTTPD_STAT_BYTES_C2S, "bytes_c2s"},
+ {HTTPD_STAT_BYTES_S2C, "bytes_s2c"},
+ {HTTPD_STAT_TCP_SEG_C2S, "tcp_seg_c2s"},
+ {HTTPD_STAT_TCP_SEG_S2C, "tcp_seg_s2c"},
+ {HTTPD_STAT_HEADERS_C2S, "headers_c2s"},
+ {HTTPD_STAT_HEADERS_S2C, "headers_s2c"},
+ {HTTPD_STAT_ZIP_BYTES, "zip_bytes"},
+ {HTTPD_STAT_UNZIP_BYTES, "unzip_bytes"},
+ {HTTPD_STAT_URL_BYTES, "url_bytes"},
+ {HTTPD_STAT_SESSION_NEW, "session_new"},
+ {HTTPD_STAT_SESSION_FREE, "session_free"},
+ {HTTPD_STAT_SESSION_EXCEPTION, "sess_exception"},
+ {HTTPD_STAT_TUNNEL, "tunnel"},
+ {HTTPD_STAT_TRANSACTION_NEW, "trans_new"},
+ {HTTPD_STAT_TRANSACTION_FREE, "trans_free"},
+ {HTTPD_STAT_ASYMMETRY_SESSION_C2S, "asymmetry_sess_c2s"},
+ {HTTPD_STAT_ASYMMETRY_SESSION_S2C, "asymmetry_sess_s2c"},
+ {HTTPD_STAT_ASYMMETRY_TRANSACTION_C2S, "asymmetry_trans_c2s"},
+ {HTTPD_STAT_ASYMMETRY_TRANSACTION_S2C, "asymmetry_trans_s2c"},
+ {HTTPD_STAT_PARSE_ERR, "parse_err"},
+};
+
+void http_decoder_stat_free(struct http_decoder_stat *hd_stat)
+{
+ if(hd_stat->timer_pid != 0){
+ pthread_cancel(hd_stat->timer_pid);
+ }
+ if(hd_stat->stats != NULL){
+ free(hd_stat->stats);
+ }
+ if(hd_stat->fse != NULL){
+ fieldstat_easy_free(hd_stat->fse);
+ }
+}
+
+static void *httpd_stat_timer_thread(void *arg)
+{
+ pthread_setname_np(pthread_self(), "http_decoder_timer_thread");
+ struct http_decoder_stat *hd_stat = (struct http_decoder_stat *)arg;
+ struct timespec res;
+ while(1){
+ clock_gettime(CLOCK_MONOTONIC, &res);
+ hd_stat->current_time_ms = (res.tv_sec * 1000) + (res.tv_nsec / 1000000);
+ usleep(800);
+ }
+ return NULL;
+}
+
+int http_decoder_stat_init(struct http_decoder_stat *hd_stat, int thread_max, int stat_interval_pkts, int stat_interval_time)
+{
+ assert(sizeof(g_httpd_stat_tuple)/sizeof(struct hd_stat_config_tuple) == HTTPD_STAT_MAX);
+ if(sizeof(g_httpd_stat_tuple)/sizeof(struct hd_stat_config_tuple) != HTTPD_STAT_MAX){
+ fprintf(stderr, "enum http_decoder_stat_type number not match with g_httpd_stat_tuple!");
+ return -1;
+ }
+ hd_stat->fse = fieldstat_easy_new(thread_max, "http_decoder_statistics", NULL, 0);
+ if (NULL == hd_stat->fse)
+ {
+ fprintf(stderr, "fieldstat_easy_new failed.");
+ return -1;
+ }
+
+ for(int i = 0; i < HTTPD_STAT_MAX; i++)
+ {
+ hd_stat->field_stat_id[i] = fieldstat_easy_register_counter(hd_stat->fse, g_httpd_stat_tuple[i].name);
+ if (hd_stat->field_stat_id[i] < 0)
+ {
+ fprintf(stderr, "fieldstat_easy_register_counter %s failed.", g_httpd_stat_tuple[i].name);
+ fieldstat_easy_free(hd_stat->fse);
+ hd_stat->fse = NULL;
+ return -1;
+ }
+ }
+
+ int ret = fieldstat_easy_enable_auto_output(hd_stat->fse, FILEDSTAT_OUTPUT_FILE, stat_interval_time);
+ if (ret < 0)
+ {
+ fprintf(stderr, "fieldstat_easy_enable_auto_output failed.");
+ fieldstat_easy_free(hd_stat->fse);
+ hd_stat->fse = NULL;
+ return -1;
+ }
+ hd_stat->stats = (struct hd_statistics *)calloc(thread_max, sizeof(struct hd_statistics));
+ hd_stat->stat_interval_pkts = stat_interval_pkts;
+ hd_stat->stat_interval_time = stat_interval_time;
+ pthread_create(&hd_stat->timer_pid, NULL, httpd_stat_timer_thread, hd_stat);
+ pthread_detach(hd_stat->timer_pid);
+ return 0;
+}
+
+void http_decoder_stat_update(struct http_decoder_stat *hd_stat, int thread_id, enum http_decoder_stat_type type, long long value)
+{
+ assert(hd_stat);
+ assert(thread_id >= 0);
+ assert(type < HTTPD_STAT_MAX);
+
+ if(unlikely(hd_stat->stats == NULL)){
+ return;
+ }
+
+ struct hd_statistics *cur_hds = &hd_stat->stats[thread_id];
+
+ cur_hds->counter[type] += value;
+ cur_hds->batch[type]++;
+
+ if(cur_hds->batch[type] >= hd_stat->stat_interval_pkts
+ || cur_hds->time_ms[type] + 1000 < hd_stat->current_time_ms){
+ fieldstat_easy_counter_incrby(hd_stat->fse, thread_id, hd_stat->field_stat_id[type], NULL, 0, cur_hds->counter[type]);
+ cur_hds->counter[type] = 0;
+ cur_hds->batch[type] = 0;
+ cur_hds->time_ms[type] = hd_stat->current_time_ms;
+ }
+} \ No newline at end of file
diff --git a/decoders/http/http_decoder_stat.h b/decoders/http/http_decoder_stat.h
new file mode 100644
index 0000000..719e6a0
--- /dev/null
+++ b/decoders/http/http_decoder_stat.h
@@ -0,0 +1,57 @@
+#ifndef _HTTP_DECODER_STAT_H_
+#define _HTTP_DECODER_STAT_H_ 1
+
+#include <fieldstat/fieldstat_easy.h>
+enum http_decoder_stat_type
+{
+ HTTPD_STAT_BYTES_C2S = 0,
+ HTTPD_STAT_BYTES_S2C,
+ HTTPD_STAT_TCP_SEG_C2S,
+ HTTPD_STAT_TCP_SEG_S2C,
+ HTTPD_STAT_HEADERS_C2S,
+ HTTPD_STAT_HEADERS_S2C,
+ HTTPD_STAT_ZIP_BYTES, //only if Content-Encoding is gzip, deflate, br
+ HTTPD_STAT_UNZIP_BYTES, //only if Content-Encoding is gzip, deflate, br
+ HTTPD_STAT_URL_BYTES,
+ HTTPD_STAT_SESSION_NEW,
+ HTTPD_STAT_SESSION_FREE,
+ HTTPD_STAT_SESSION_EXCEPTION, // rst, kickout, lost packet, etc.
+ HTTPD_STAT_TUNNEL,
+ HTTPD_STAT_TRANSACTION_NEW,
+ HTTPD_STAT_TRANSACTION_FREE,
+ HTTPD_STAT_ASYMMETRY_SESSION_C2S,
+ HTTPD_STAT_ASYMMETRY_SESSION_S2C,
+ HTTPD_STAT_ASYMMETRY_TRANSACTION_C2S,
+ HTTPD_STAT_ASYMMETRY_TRANSACTION_S2C,
+ HTTPD_STAT_PARSE_ERR,
+ HTTPD_STAT_MAX,
+};
+
+struct hd_stat_config_tuple
+{
+ enum http_decoder_stat_type type;
+ const char *name;
+};
+
+struct hd_statistics
+{
+ long long time_ms[HTTPD_STAT_MAX];
+ long long counter[HTTPD_STAT_MAX];
+ int batch[HTTPD_STAT_MAX]; //call fieldstat_easy_counter_incrby() per batch
+}__attribute__ ((aligned (64)));
+
+struct http_decoder_stat
+{
+ pthread_t timer_pid;
+ long long current_time_ms;
+ struct fieldstat_easy *fse;
+ int stat_interval_pkts; // call fieldstat_incrby every stat_interval_pkts
+ int stat_interval_time; //second
+ int field_stat_id[HTTPD_STAT_MAX];
+ struct hd_statistics *stats; //size is thread number
+};
+
+int http_decoder_stat_init(struct http_decoder_stat *hd_stat, int thread_max, int stat_interval_pkts, int stat_interval_time);
+void http_decoder_stat_free(struct http_decoder_stat *hd_stat);
+void http_decoder_stat_update(struct http_decoder_stat *hd_stat, int thread_id, enum http_decoder_stat_type type, long long value);
+#endif \ No newline at end of file
diff --git a/decoders/http/http_decoder_string.cpp b/decoders/http/http_decoder_string.cpp
new file mode 100644
index 0000000..d54d22b
--- /dev/null
+++ b/decoders/http/http_decoder_string.cpp
@@ -0,0 +1,260 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include "http_decoder_inc.h"
+
+static const char *string_state_to_desc(enum string_state state)
+{
+ switch (state) {
+ case STRING_STATE_INIT:
+ return "init";
+ break;
+ case STRING_STATE_REFER:
+ return "refer";
+ break;
+ case STRING_STATE_CACHE:
+ return "cache";
+ break;
+ case STRING_STATE_COMMIT:
+ return "commit";
+ break;
+ default:
+ return "unknown";
+ break;
+ }
+}
+
+void http_decoder_string_refer(struct http_decoder_string *rstr,
+ const char *at, size_t length)
+{
+ if (NULL == rstr) {
+ return;
+ }
+
+ switch (rstr->state) {
+ case STRING_STATE_INIT:
+ case STRING_STATE_CACHE:
+ rstr->refer.iov_base = (char *)at;
+ rstr->refer.iov_len = length;
+ break;
+ default:
+ abort();
+ break;
+ }
+
+ rstr->state = STRING_STATE_REFER;
+}
+
+static void string_refer2cache(struct http_decoder_string *rstr)
+{
+ if (0 == rstr->refer.iov_len) {
+ return;
+ }
+ if (rstr->cache.iov_len >= rstr->max_cache_size) {
+ return;
+ }
+
+ size_t length = rstr->cache.iov_len + rstr->refer.iov_len;
+ if (length > rstr->max_cache_size) {
+ length = rstr->max_cache_size;
+ }
+
+ if (NULL == rstr->cache.iov_base) {
+ rstr->cache.iov_base = CALLOC(char, length + 1);
+ memcpy(rstr->cache.iov_base, rstr->refer.iov_base, length);
+ } else {
+ rstr->cache.iov_base = REALLOC(char, rstr->cache.iov_base, length + 1);
+ memcpy((char *)rstr->cache.iov_base + rstr->cache.iov_len, rstr->refer.iov_base,
+ (length - rstr->cache.iov_len));
+ }
+
+ rstr->cache.iov_len = length;
+ rstr->refer.iov_base = NULL;
+ rstr->refer.iov_len = 0;
+}
+
+static void string_commit2cache(struct http_decoder_string *rstr)
+{
+ if (rstr->cache.iov_len == rstr->commit.iov_len &&
+ rstr->cache.iov_base == rstr->commit.iov_base) {
+ rstr->commit.iov_base = NULL;
+ rstr->commit.iov_len = 0;
+ return;
+ }
+
+ //Only http header key need to backward to cache
+ size_t length = 0;
+ if (rstr->commit.iov_len > rstr->max_cache_size) {
+ length = rstr->max_cache_size;
+ } else {
+ length = rstr->commit.iov_len;
+ }
+
+ if (length > 0) {
+ if (NULL == rstr->cache.iov_base) {
+ rstr->cache.iov_base = CALLOC(char, length + 1);
+ } else {
+ abort();
+ }
+ memcpy(rstr->cache.iov_base, rstr->commit.iov_base, length);
+ rstr->cache.iov_len = length;
+
+ rstr->commit.iov_base = NULL;
+ rstr->commit.iov_len = 0;
+ }
+}
+
+void http_decoder_string_cache(struct http_decoder_string *rstr)
+{
+ if (NULL == rstr) {
+ return;
+ }
+
+ switch (rstr->state) {
+ case STRING_STATE_REFER:
+ string_refer2cache(rstr);
+ break;
+ case STRING_STATE_CACHE:
+ break;
+ case STRING_STATE_COMMIT:
+ //commit backward to cache
+ string_commit2cache(rstr);
+ break;
+ default:
+ abort();
+ break;
+ }
+ rstr->state = STRING_STATE_CACHE;
+}
+
+void http_decoder_string_commit(struct http_decoder_string *rstr)
+{
+ if (NULL == rstr) {
+ return;
+ }
+
+ switch (rstr->state) {
+ case STRING_STATE_REFER:
+ if (rstr->cache.iov_len) {
+ http_decoder_string_cache(rstr);
+
+ rstr->commit.iov_base = rstr->cache.iov_base;
+ rstr->commit.iov_len = rstr->cache.iov_len;
+ // not overwrite rstr->cache.iov_base
+ } else {
+ rstr->commit.iov_base = rstr->refer.iov_base;
+ rstr->commit.iov_len = rstr->refer.iov_len;
+
+ rstr->refer.iov_base = NULL;
+ rstr->refer.iov_len = 0;
+ }
+ break;
+ case STRING_STATE_CACHE:
+ rstr->commit.iov_base = rstr->cache.iov_base;
+ rstr->commit.iov_len = rstr->cache.iov_len;
+ // not overwrite rstr->cache.iov_base
+ break;
+ default:
+ //abort();
+ break;
+ }
+
+ rstr->state = STRING_STATE_COMMIT;
+}
+
+void http_decoder_string_reset(struct http_decoder_string *rstr)
+{
+ assert(rstr);
+
+ switch (rstr->state) {
+ case STRING_STATE_INIT:
+ case STRING_STATE_REFER:
+ case STRING_STATE_CACHE:
+ case STRING_STATE_COMMIT:
+ FREE(rstr->cache.iov_base);
+ memset(rstr, 0, sizeof(struct http_decoder_string));
+ break;
+ default:
+ abort();
+ break;
+ }
+
+ rstr->state = STRING_STATE_INIT;
+}
+
+
+void http_decoder_string_init(struct http_decoder_string *rstr,
+ size_t max_cache_size)
+{
+ rstr->max_cache_size = max_cache_size;
+}
+
+void http_decoder_string_reinit(struct http_decoder_string *rstr)
+{
+ if (rstr->state == STRING_STATE_CACHE) {
+ return;
+ }
+
+ if (rstr->state == STRING_STATE_COMMIT &&
+ rstr->cache.iov_base == rstr->commit.iov_base &&
+ rstr->cache.iov_len == rstr->commit.iov_len) {
+ return;
+ }
+
+ if (rstr->cache.iov_base != NULL) {
+ FREE(rstr->cache.iov_base);
+ rstr->cache.iov_len = 0;
+ }
+
+#if 0
+ rstr->refer.iov_base = NULL;
+ rstr->refer.iov_len = 0;
+ rstr->commit.iov_base = NULL;
+ rstr->commit.iov_len = 0;
+ rstr->state = STRING_STATE_INIT;
+#endif
+}
+
+enum string_state http_decoder_string_state(const struct http_decoder_string *rstr)
+{
+ return rstr->state;
+}
+
+int http_decoder_string_get(const struct http_decoder_string *rstr, hstring *out)
+{
+ if (NULL == rstr || NULL == out) {
+ return -1;
+ }
+
+ if (http_decoder_string_state(rstr) == STRING_STATE_COMMIT) {
+ out->iov_base = rstr->commit.iov_base;
+ out->iov_len = rstr->commit.iov_len;
+ } else {
+ out->iov_base = NULL;
+ out->iov_len = 0;
+ }
+
+ return 0;
+}
+
+void http_decoder_string_dump(struct http_decoder_string *rstr, const char *desc)
+{
+ if (NULL == rstr) {
+ return;
+ }
+
+ char *refer_str = safe_dup((char *)rstr->refer.iov_base, rstr->refer.iov_len);
+ char *cache_str = safe_dup((char *)rstr->cache.iov_base, rstr->cache.iov_len);
+ char *commit_str = safe_dup((char *)rstr->commit.iov_base, rstr->commit.iov_len);
+
+ printf("%s: state: %s, refer: {len: %02zu, iov_base: %s}, cache: {len: %02zu, iov_base: %s}, commit: {len: %02zu, iov_base: %s}\n",
+ desc, string_state_to_desc(rstr->state),
+ rstr->refer.iov_len, refer_str,
+ rstr->cache.iov_len, cache_str,
+ rstr->commit.iov_len, commit_str);
+
+ FREE(refer_str);
+ FREE(cache_str);
+ FREE(commit_str);
+} \ No newline at end of file
diff --git a/decoders/http/http_decoder_string.h b/decoders/http/http_decoder_string.h
new file mode 100644
index 0000000..3b25732
--- /dev/null
+++ b/decoders/http/http_decoder_string.h
@@ -0,0 +1,74 @@
+#ifndef _HTTP_DECODER_STRING_H_
+#define _HTTP_DECODER_STRING_H_
+
+#include "stellar/http.h"
+
+enum string_state {
+ STRING_STATE_INIT,
+ STRING_STATE_REFER,
+ STRING_STATE_CACHE,
+ STRING_STATE_COMMIT,
+};
+
+/* state transition diagram
+ * +----------+
+ * | |
+ * \|/ |
+ * +------+ |
+ * | init | |
+ * +------+ |
+ * | |
+ * +---->| |
+ * | \|/ |
+ * | +-------+ |
+ * | | refer |--+ |
+ * | +-------+ | |
+ * | | | |
+ * | \|/ | |
+ * | +-------+ | |
+ * +--| cache | | |
+ * +-------+ | |
+ * | | |
+ * |<------+ |
+ * \|/ |
+ * +--------+ |
+ * | commit | |
+ * +--------+ |
+ * | |
+ * \|/ |
+ * +--------+ |
+ * | reset |----+
+ * +--------+
+ */
+
+
+//http decoder string
+struct http_decoder_string {
+ hstring refer; // shallow copy
+ hstring cache; // deep copy
+ hstring commit;
+
+ enum string_state state;
+ size_t max_cache_size;
+};
+
+void http_decoder_string_refer(struct http_decoder_string *rstr,
+ const char *at, size_t length);
+
+void http_decoder_string_cache(struct http_decoder_string *rstr);
+
+void http_decoder_string_commit(struct http_decoder_string *rstr);
+
+void http_decoder_string_reset(struct http_decoder_string *rstr);
+
+void http_decoder_string_init(struct http_decoder_string *rstr,
+ size_t max_cache_size);
+
+void http_decoder_string_reinit(struct http_decoder_string *rstr);
+
+enum string_state http_decoder_string_state(const struct http_decoder_string *rstr);
+
+int http_decoder_string_get(const struct http_decoder_string *rstr, hstring *out);
+
+void http_decoder_string_dump(struct http_decoder_string *rstr, const char *desc);
+#endif \ No newline at end of file
diff --git a/decoders/http/http_decoder_table.cpp b/decoders/http/http_decoder_table.cpp
new file mode 100644
index 0000000..b58154f
--- /dev/null
+++ b/decoders/http/http_decoder_table.cpp
@@ -0,0 +1,534 @@
+#include <assert.h>
+#include <stdlib.h>
+#include <string.h>
+#include "http_decoder_inc.h"
+
+#define INIT_HEADER_CNT 16
+#define MAX_URI_CACHE_SIZE 2048
+#define MAX_STATUS_CACHE_SIZE 32
+#define MAX_METHOD_CACHE_SIZE 8
+#define MAX_VERSION_CACHE_SIZE 4
+#define MAX_HEADER_KEY_CACHE_SIZE 4096
+#define MAX_HEADER_VALUE_CACHE_SIZE 4096
+
+struct http_decoder_header {
+ struct http_decoder_string key;
+ struct http_decoder_string val;
+};
+
+struct http_decoder_table {
+ struct http_decoder_string uri;
+ struct http_decoder_string status;
+ struct http_decoder_string method;
+ struct http_decoder_string version;
+ struct http_decoder_string body;
+
+ nmx_pool_t *ref_mempool;
+ int header_complete; // flag for all headers parsed completely
+ size_t header_cnt;
+ size_t header_index; //current parsing header
+ size_t header_iter; //plugins iterate cursor
+ size_t commit_header_index; //pushed to plugins, whether has called http_message_header_next()
+ struct http_decoder_header *headers;
+};
+
+static void http_decoder_table_init(struct http_decoder_table *table)
+{
+ if (NULL == table) {
+ return;
+ }
+
+ struct http_decoder_header *header = NULL;
+ assert(table);
+
+ http_decoder_string_init(&table->uri, MAX_URI_CACHE_SIZE);
+ http_decoder_string_init(&table->status, MAX_STATUS_CACHE_SIZE);
+ http_decoder_string_init(&table->method, MAX_METHOD_CACHE_SIZE);
+ http_decoder_string_init(&table->version, MAX_METHOD_CACHE_SIZE);
+
+ for (size_t i = 0; i < table->header_cnt; i++) {
+ header = &table->headers[i];
+ http_decoder_string_init(&header->key, MAX_HEADER_KEY_CACHE_SIZE);
+ http_decoder_string_init(&header->val, MAX_HEADER_VALUE_CACHE_SIZE);
+ }
+
+ http_decoder_string_init(&table->body, 0);
+}
+
+struct http_decoder_table *http_decoder_table_new(nmx_pool_t *mempool)
+{
+ struct http_decoder_table *table =
+ MEMPOOL_CALLOC(mempool, struct http_decoder_table, 1);
+ assert(table);
+
+ table->ref_mempool = mempool;
+ table->header_cnt = INIT_HEADER_CNT;
+ table->headers = MEMPOOL_CALLOC(mempool, struct http_decoder_header,
+ table->header_cnt);
+ table->commit_header_index = 0;
+ http_decoder_table_init(table);
+
+ return table;
+}
+
+void http_decoder_table_free(struct http_decoder_table *table)
+{
+ if (NULL == table) {
+ return;
+ }
+ if (table->uri.cache.iov_base != NULL) {
+ FREE(table->uri.cache.iov_base);
+ }
+ if (table->status.cache.iov_base != NULL) {
+ FREE(table->status.cache.iov_base);
+ }
+ if (table->method.cache.iov_base != NULL) {
+ FREE(table->method.cache.iov_base);
+ }
+ if (table->version.cache.iov_base != NULL) {
+ FREE(table->version.cache.iov_base);
+ }
+ if (table->body.cache.iov_base != NULL) {
+ FREE(table->body.cache.iov_base);
+ }
+
+ if (table->headers != NULL) {
+ for (size_t i = 0; i < table->header_cnt; i++) {
+ if (table->headers[i].key.cache.iov_base != NULL) {
+ FREE(table->headers[i].key.cache.iov_base);
+ }
+
+ if (table->headers[i].val.cache.iov_base != NULL) {
+ FREE(table->headers[i].val.cache.iov_base);
+ }
+ }
+
+ MEMPOOL_FREE(table->ref_mempool, table->headers);
+ table->headers = NULL;
+ }
+ MEMPOOL_FREE(table->ref_mempool, table);
+}
+
+enum string_state
+http_decoder_table_state(struct http_decoder_table *table, enum http_item type)
+{
+ if (NULL == table) {
+ return STRING_STATE_INIT;
+ }
+ struct http_decoder_header *header = NULL;
+ enum string_state state = STRING_STATE_INIT;
+ assert(table);
+
+ switch (type) {
+ case HTTP_ITEM_URI:
+ state = http_decoder_string_state(&table->uri);
+ break;
+ case HTTP_ITEM_STATUS:
+ state = http_decoder_string_state(&table->status);
+ break;
+ case HTTP_ITEM_METHOD:
+ state = http_decoder_string_state(&table->method);
+ break;
+ case HTTP_ITEM_VERSION:
+ state = http_decoder_string_state(&table->version);
+ break;
+ case HTTP_ITEM_HDRKEY:
+ assert(table->header_index < table->header_cnt);
+ header = &table->headers[table->header_index];
+ state = http_decoder_string_state(&header->key);
+ break;
+ case HTTP_ITEM_HDRVAL:
+ assert(table->header_index < table->header_cnt);
+ header = &table->headers[table->header_index];
+ state = http_decoder_string_state(&header->val);
+ break;
+ case HTTP_ITEM_BODY:
+ state = http_decoder_string_state(&table->body);
+ break;
+ default:
+ abort();
+ break;
+ }
+
+ return state;
+}
+
+void http_decoder_table_refer(struct http_decoder_table *table, enum http_item type,
+ const char *at, size_t len)
+{
+ if (NULL == table) {
+ return;
+ }
+
+ struct http_decoder_header *header = NULL;
+ assert(table);
+
+ switch (type) {
+ case HTTP_ITEM_URI:
+ http_decoder_string_refer(&table->uri, at, len);
+ break;
+ case HTTP_ITEM_STATUS:
+ http_decoder_string_refer(&table->status, at, len);
+ break;
+ case HTTP_ITEM_METHOD:
+ http_decoder_string_refer(&table->method, at, len);
+ break;
+ case HTTP_ITEM_VERSION:
+ http_decoder_string_refer(&table->version, at, len);
+ break;
+ case HTTP_ITEM_HDRKEY:
+ assert(table->header_index < table->header_cnt);
+ header = &table->headers[table->header_index];
+ http_decoder_string_refer(&header->key, at, len);
+ break;
+ case HTTP_ITEM_HDRVAL:
+ assert(table->header_index < table->header_cnt);
+ header = &table->headers[table->header_index];
+ http_decoder_string_refer(&header->val, at, len);
+ break;
+ case HTTP_ITEM_BODY:
+ http_decoder_string_refer(&table->body, at, len);
+ break;
+ default:
+ abort();
+ break;
+ }
+}
+
+void http_decoder_table_cache(struct http_decoder_table *table, enum http_item type)
+{
+ if (NULL == table) {
+ return;
+ }
+
+ struct http_decoder_header *header = NULL;
+ assert(table);
+
+ switch (type) {
+ case HTTP_ITEM_URI:
+ http_decoder_string_cache(&table->uri);
+ break;
+ case HTTP_ITEM_STATUS:
+ http_decoder_string_cache(&table->status);
+ break;
+ case HTTP_ITEM_METHOD:
+ http_decoder_string_cache(&table->method);
+ break;
+ case HTTP_ITEM_VERSION:
+ http_decoder_string_cache(&table->version);
+ break;
+ case HTTP_ITEM_HDRKEY:
+ assert(table->header_index < table->header_cnt);
+ header = &table->headers[table->header_index];
+ http_decoder_string_cache(&header->key);
+ break;
+ case HTTP_ITEM_HDRVAL:
+ assert(table->header_index < table->header_cnt);
+ header = &table->headers[table->header_index];
+ http_decoder_string_cache(&header->val);
+ break;
+ case HTTP_ITEM_BODY:
+ http_decoder_string_cache(&table->body);
+ break;
+ default:
+ abort();
+ break;
+ }
+}
+
+void http_decoder_table_commit(struct http_decoder_table *table, enum http_item type)
+{
+ if (NULL == table) {
+ return;
+ }
+
+ size_t i = 0;
+ struct http_decoder_header *header = NULL;
+ assert(table);
+
+ switch (type) {
+ case HTTP_ITEM_URI:
+ http_decoder_string_commit(&table->uri);
+ break;
+ case HTTP_ITEM_STATUS:
+ http_decoder_string_commit(&table->status);
+ break;
+ case HTTP_ITEM_METHOD:
+ http_decoder_string_commit(&table->method);
+ break;
+ case HTTP_ITEM_VERSION:
+ http_decoder_string_commit(&table->version);
+ break;
+ case HTTP_ITEM_HDRKEY:
+ assert(table->header_index < table->header_cnt);
+ header = &table->headers[table->header_index];
+ http_decoder_string_commit(&header->key);
+ break;
+ case HTTP_ITEM_HDRVAL:
+ header = &table->headers[table->header_index];
+ http_decoder_string_commit(&header->val);
+ // inc index
+ if ((table->header_index + 1) >= table->header_cnt) {
+ struct http_decoder_header *old_headers = table->headers;
+ table->headers =
+ MEMPOOL_CALLOC(table->ref_mempool, struct http_decoder_header,
+ table->header_cnt * 2);
+ table->header_cnt *= 2;
+
+ for (i = 0; i <= table->header_index; i++) {
+ table->headers[i] = old_headers[i];
+ }
+
+ MEMPOOL_FREE(table->ref_mempool, old_headers);
+
+ for (i = table->header_index + 1; i < table->header_cnt; i++) {
+ header = &table->headers[i];
+ memset(header, 0, sizeof(struct http_decoder_header));
+ http_decoder_string_init(&header->key, MAX_HEADER_KEY_CACHE_SIZE);
+ http_decoder_string_init(&header->val, MAX_HEADER_VALUE_CACHE_SIZE);
+ }
+ }
+ table->header_index++;
+ break;
+ case HTTP_ITEM_BODY:
+ http_decoder_string_commit(&table->body);
+ break;
+ default:
+ abort();
+ break;
+ }
+}
+
+void http_decoder_table_reset(struct http_decoder_table *table, enum http_item type)
+{
+ if (NULL == table) {
+ return;
+ }
+
+ struct http_decoder_header *header = NULL;
+ assert(table);
+
+ switch (type) {
+ case HTTP_ITEM_URI:
+ http_decoder_string_reset(&table->uri);
+ break;
+ case HTTP_ITEM_STATUS:
+ http_decoder_string_reset(&table->status);
+ break;
+ case HTTP_ITEM_METHOD:
+ http_decoder_string_reset(&table->method);
+ break;
+ case HTTP_ITEM_VERSION:
+ http_decoder_string_reset(&table->version);
+ break;
+ case HTTP_ITEM_HDRKEY:
+ header = &table->headers[table->header_index];
+ http_decoder_string_reset(&header->key);
+ break;
+ case HTTP_ITEM_HDRVAL:
+ header = &table->headers[table->header_index];
+ http_decoder_string_reset(&header->val);
+ break;
+ case HTTP_ITEM_BODY:
+ http_decoder_string_reset(&table->body);
+ break;
+ default:
+ abort();
+ break;
+ }
+}
+
+void http_decoder_table_reinit(struct http_decoder_table *table)
+{
+ assert(table);
+ struct http_decoder_header *header = NULL;
+
+ http_decoder_string_reinit(&table->uri);
+ http_decoder_string_reinit(&table->status);
+ http_decoder_string_reinit(&table->method);
+ http_decoder_string_reinit(&table->version);
+ // for (size_t i = 0; i < table->header_iter; i++) {
+ for (size_t i = 0; i < table->commit_header_index; i++) {
+ //todo, reset header_index, avoid realloc headers as much as possible
+ header = &table->headers[i];
+ http_decoder_string_reinit(&header->key);
+ http_decoder_string_reinit(&header->val);
+ }
+
+ http_decoder_string_reinit(&table->body);
+}
+
+void http_decoder_table_dump(struct http_decoder_table *table)
+{
+ if (NULL == table) {
+ return;
+ }
+
+ http_decoder_string_dump(&table->uri, "uri");
+ http_decoder_string_dump(&table->status, "status");
+ http_decoder_string_dump(&table->method, "method");
+ http_decoder_string_dump(&table->version, "version");
+ http_decoder_string_dump(&table->body, "body");
+
+ for (size_t i = 0; i < table->header_cnt; i++) {
+ struct http_decoder_header *header = &table->headers[i];
+ if (NULL == header) {
+ continue;
+ }
+
+ http_decoder_string_dump(&header->key, "key");
+ http_decoder_string_dump(&header->val, "val");
+ }
+}
+
+int http_decoder_table_get_uri(const struct http_decoder_table *table, hstring *out)
+{
+ if (NULL == table || NULL == out) {
+ return -1;
+ }
+ return http_decoder_string_get(&table->uri, out);
+}
+
+int http_decoder_table_get_method(const struct http_decoder_table *table, hstring *out)
+{
+ if (NULL == table || NULL == out) {
+ return -1;
+ }
+ return http_decoder_string_get(&table->method, out);
+}
+
+int http_decoder_table_get_status(const struct http_decoder_table *table, hstring *out)
+{
+ if (NULL == table || NULL == out) {
+ return -1;
+ }
+ return http_decoder_string_get(&table->status, out);
+}
+
+int http_decoder_table_get_version(const struct http_decoder_table *table, hstring *out)
+{
+ if (NULL == table || NULL == out) {
+ return -1;
+ }
+ return http_decoder_string_get(&table->version, out);
+}
+
+int http_decoder_table_get_body(const struct http_decoder_table *table, hstring *out)
+{
+ if (NULL == table || NULL == out) {
+ return -1;
+ }
+ return http_decoder_string_get(&table->body, out);
+}
+
+int http_decoder_table_get_header(const struct http_decoder_table *table, const hstring *key,
+ struct http_header *hdr_result)
+{
+ for (size_t i = 0; i < table->header_cnt; i++) {
+ const struct http_decoder_header *tmp_header = &table->headers[i];
+ if (tmp_header->key.commit.iov_len != key->iov_len) {
+ continue;
+ }
+
+ if (http_decoder_string_state(&tmp_header->key) == STRING_STATE_COMMIT &&
+ http_decoder_string_state(&tmp_header->val) == STRING_STATE_COMMIT) {
+ hstring tmp_key;
+ http_decoder_string_get(&tmp_header->key, &tmp_key);
+
+ if (tmp_key.iov_len == key->iov_len &&
+ (0 == strncasecmp((char *)tmp_key.iov_base, (char *)key->iov_base, key->iov_len))) {
+ http_decoder_string_get(&tmp_header->key, &hdr_result->key);
+ http_decoder_string_get(&tmp_header->val, &hdr_result->val);
+ return 0;
+ }
+ }
+ }
+ return -1;
+}
+
+int http_decoder_table_iter_header(struct http_decoder_table *table,
+ struct http_header *hdr)
+{
+ if (NULL == table || NULL == hdr) {
+ return -1;
+ }
+ if (table->header_iter >= table->header_cnt) {
+ return -1;
+ }
+
+ struct http_decoder_header *tmp_header = &table->headers[table->header_iter];
+ if (tmp_header != NULL) {
+ if (http_decoder_string_state(&tmp_header->key) == STRING_STATE_COMMIT &&
+ http_decoder_string_state(&tmp_header->val) == STRING_STATE_COMMIT) {
+
+ http_decoder_string_get(&tmp_header->key, &hdr->key);
+ http_decoder_string_get(&tmp_header->val, &hdr->val);
+ table->header_iter++;
+ return 1;
+ }
+ }
+
+ hdr->key.iov_base = NULL;
+ hdr->key.iov_len = 0;
+ hdr->val.iov_base = NULL;
+ hdr->val.iov_len = 0;
+
+ return -1;
+}
+
+int http_decoder_table_reset_header_iter(struct http_decoder_table *table)
+{
+ table->header_iter = 0;
+ return 0;
+}
+
+int http_decoder_table_has_parsed_header(struct http_decoder_table *table)
+{
+ // if (NULL == table || (table->header_iter == table->header_index)) {
+ if (NULL == table || (table->commit_header_index == table->header_index)) {
+ return 0;
+ }
+
+ const struct http_decoder_header *tmp_header = &table->headers[table->header_iter];
+
+ if (http_decoder_string_state(&tmp_header->key) == STRING_STATE_COMMIT
+ && http_decoder_string_state(&tmp_header->val) == STRING_STATE_COMMIT) {
+ return 1;
+ }
+
+ return 0;
+}
+
+int http_decoder_table_header_complete(struct http_decoder_table *table)
+{
+ if (NULL == table) {
+ return -1;
+ }
+ return table->header_complete;
+}
+
+void http_decoder_table_set_header_complete(struct http_decoder_table *table)
+{
+ if (NULL == table) {
+ return;
+ }
+ table->header_complete = 1;
+}
+
+void http_decoder_table_reset_header_complete(struct http_decoder_table *table)
+{
+ if (NULL == table) {
+ return;
+ }
+ table->header_complete = 0;
+}
+
+void http_decoder_table_update_commit_index(struct http_decoder_table *table)
+{
+ table->commit_header_index = table->header_index;
+}
+
+int http_decoder_table_get_total_parsed_header(struct http_decoder_table *table)
+{
+ return table->header_index;
+} \ No newline at end of file
diff --git a/decoders/http/http_decoder_table.h b/decoders/http/http_decoder_table.h
new file mode 100644
index 0000000..bbc6956
--- /dev/null
+++ b/decoders/http/http_decoder_table.h
@@ -0,0 +1,80 @@
+#ifndef _HTTP_DECODER_TABLE_H_
+#define _HTTP_DECODER_TABLE_H_
+#include <stddef.h>
+#include "stellar/http.h"
+#include "http_decoder_inc.h"
+#include "http_decoder_string.h"
+
+enum http_item {
+ HTTP_ITEM_URI = 0x01,
+ HTTP_ITEM_STATUS = 0x02,
+ HTTP_ITEM_METHOD = 0x03,
+ HTTP_ITEM_VERSION = 0x04,
+ HTTP_ITEM_HDRKEY = 0x05,
+ HTTP_ITEM_HDRVAL = 0x06,
+ HTTP_ITEM_BODY = 0x07,
+};
+
+struct http_decoder_table;
+struct http_decoder_table *http_decoder_table_new(nmx_pool_t *mempool);
+
+void http_decoder_table_free(struct http_decoder_table *table);
+
+enum string_state
+http_decoder_table_state(struct http_decoder_table *table, enum http_item type);
+
+void http_decoder_table_refer(struct http_decoder_table *table, enum http_item type,
+ const char *at, size_t len);
+
+void http_decoder_table_cache(struct http_decoder_table *table, enum http_item type);
+
+void http_decoder_table_commit(struct http_decoder_table *table, enum http_item type);
+
+void http_decoder_table_reset(struct http_decoder_table *table, enum http_item type);
+
+void http_decoder_table_reinit(struct http_decoder_table *table);
+
+void http_decoder_table_dump(struct http_decoder_table *table);
+
+int http_decoder_table_get_uri(const struct http_decoder_table *table, hstring *out);
+
+int http_decoder_table_get_method(const struct http_decoder_table *table, hstring *out);
+
+int http_decoder_table_get_status(const struct http_decoder_table *table, hstring *out);
+
+int http_decoder_table_get_version(const struct http_decoder_table *table, hstring *out);
+
+int http_decoder_table_get_body(const struct http_decoder_table *table, hstring *out);
+
+int http_decoder_table_get_header(const struct http_decoder_table *table,
+ const hstring *key,
+ struct http_header *hdr_res);
+
+int http_decoder_table_iter_header(struct http_decoder_table *table,
+ struct http_header *hdr);
+int http_decoder_table_reset_header_iter(struct http_decoder_table *table);
+/**
+ * @brief Is there a parsed header
+ *
+ * @retval yes(1) no(0)
+*/
+int http_decoder_table_has_parsed_header(struct http_decoder_table *table);
+
+/**
+ * @brief If headers have been parsed completely
+ *
+ * @retval yes(1) no(0)
+ */
+int http_decoder_table_header_complete(struct http_decoder_table *table);
+
+/**
+ * @brief set flag for headers parsed completely
+*/
+void http_decoder_table_set_header_complete(struct http_decoder_table *table);
+
+void http_decoder_table_reset_header_complete(struct http_decoder_table *table);
+
+void http_decoder_table_update_commit_index(struct http_decoder_table *table);
+
+int http_decoder_table_get_total_parsed_header(struct http_decoder_table *table);
+#endif \ No newline at end of file
diff --git a/decoders/http/http_decoder_tunnel.cpp b/decoders/http/http_decoder_tunnel.cpp
new file mode 100644
index 0000000..42f100a
--- /dev/null
+++ b/decoders/http/http_decoder_tunnel.cpp
@@ -0,0 +1,107 @@
+#include <assert.h>
+#include <stdio.h>
+#include <string.h>
+#include <strings.h>
+#include <unistd.h>
+#include "http_decoder_inc.h"
+#include "llhttp.h"
+
+struct http_tunnel_message
+{
+ enum http_tunnel_message_type type;
+ hstring tunnel_payload;
+};
+
+
+int httpd_tunnel_identify(struct http_decoder_env *httpd_env, int curdir, struct http_decoder_half_data *hfdata)
+{
+ if(0 == httpd_env->hd_cfg.proxy_enable){
+ return 0;
+ }
+
+ if(FLOW_DIRECTION_C2S == curdir){
+ struct http_request_line reqline = {};
+ http_decoder_half_data_get_request_line(hfdata, &reqline);
+ if(0 == strncasecmp_safe("CONNECT", (char *)reqline.method.iov_base,
+ 7, reqline.method.iov_len)){
+ return 1;
+ }
+ }else{
+ struct http_response_line resline = {};
+ http_decoder_half_data_get_response_line(hfdata, &resline);
+ if(resline.status_code == HTTP_STATUS_OK
+ && 0 == strncasecmp_safe("Connection established", (char *)resline.status.iov_base,
+ strlen("Connection established"), resline.status.iov_len)){
+ return 1;
+ }
+ }
+
+ return 0;
+}
+
+int httpd_is_tunnel_session(const struct http_decoder_env *httpd_env, const struct http_decoder_exdata *ex_data)
+{
+ if(0 == httpd_env->hd_cfg.proxy_enable){
+ return 0;
+ }
+ return (ex_data && ex_data->tunnel_state != HTTP_TUN_NON);
+}
+
+int httpd_in_tunnel_transmitting(const struct http_decoder_env *httpd_env, struct http_decoder_exdata *ex_data)
+{
+ if(0 == httpd_env->hd_cfg.proxy_enable){
+ return 0;
+ }
+ return (ex_data && ex_data->tunnel_state >= HTTP_TUN_INNER_STARTING);
+}
+
+enum http_tunnel_message_type httpd_tunnel_state_to_msg(const struct http_decoder_exdata *ex_data)
+{
+ if(ex_data->tunnel_state == HTTP_TUN_INNER_STARTING){
+ return HTTP_TUNNEL_OPENING;
+ }
+ if(ex_data->tunnel_state == HTTP_TUN_INNER_TRANS){
+ return HTTP_TUNNEL_ACTIVE;
+ }
+ return HTTP_TUNNEL_MSG_MAX;
+}
+
+void httpd_tunnel_state_update(struct http_decoder_exdata *ex_data)
+{
+ if(ex_data->tunnel_state == HTTP_TUN_INNER_STARTING){
+ ex_data->tunnel_state = HTTP_TUN_INNER_TRANS;
+ }
+}
+
+void http_decoder_push_tunnel_data(struct session *sess, const struct http_decoder_exdata *exdata, enum http_tunnel_message_type type, const char *payload, uint16_t payload_len)
+{
+ struct http_tunnel_message *tmsg = (struct http_tunnel_message *)CALLOC(struct http_tunnel_message, 1);
+ tmsg->type = type;
+ tmsg->tunnel_payload.iov_base = (char *)payload;
+ tmsg->tunnel_payload.iov_len = payload_len;
+ session_mq_publish_message(sess, exdata->pub_topic_id, tmsg);
+}
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+void http_tunnel_message_get_payload(const struct http_tunnel_message *tmsg,
+ hstring *tunnel_payload)
+{
+ if (unlikely(NULL == tmsg || tunnel_payload == NULL))
+ {
+ return;
+ }
+ tunnel_payload->iov_base = tmsg->tunnel_payload.iov_base;
+ tunnel_payload->iov_len = tmsg->tunnel_payload.iov_len;
+}
+
+enum http_tunnel_message_type http_tunnel_message_type_get(const struct http_tunnel_message *tmsg)
+{
+ return tmsg->type;
+}
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/decoders/http/http_decoder_tunnel.h b/decoders/http/http_decoder_tunnel.h
new file mode 100644
index 0000000..b90e402
--- /dev/null
+++ b/decoders/http/http_decoder_tunnel.h
@@ -0,0 +1,35 @@
+#pragma once
+#include "http_decoder_inc.h"
+#include "http_decoder_half.h"
+
+enum http_tunnel_state{
+ HTTP_TUN_NON = 0, // init, or not tunnel session
+ HTTP_TUN_C2S_HDR_START, //CONNECT ...
+ HTTP_TUN_C2S_END, //CONNECT request end
+ HTTP_TUN_S2C_START, // HTTP 200 connet established
+ HTTP_TUN_INNER_STARTING, // http inner tunnel protocol starting
+ HTTP_TUN_INNER_TRANS, // http inner tunnel protocol transmitting
+};
+
+/************************************************************
+* HTTP TUNNEL WITH CONNECT METHOD.
+*************************************************************/
+struct http_tunnel_message;
+#define HTTP_DECODER_TUNNEL_TOPIC "HTTP_DECODER_TUNNEL_MESSAGE"
+
+enum http_tunnel_message_type {
+ HTTP_TUNNEL_OPENING,
+ HTTP_TUNNEL_ACTIVE,
+ HTTP_TUNNEL_CLOSING,
+ HTTP_TUNNEL_MSG_MAX
+};
+enum http_tunnel_message_type http_tunnel_message_type_get(const struct http_tunnel_message *tmsg);
+void http_tunnel_message_get_payload(const struct http_tunnel_message *tmsg, struct iovec *tunnel_payload);
+
+
+int httpd_tunnel_identify(struct http_decoder_env *httpd_env, int curdir, struct http_decoder_half_data *hfdata);
+int httpd_is_tunnel_session(const struct http_decoder_env *httpd_env, const struct http_decoder_exdata *ex_data);
+int httpd_in_tunnel_transmitting(const struct http_decoder_env *httpd_env, struct http_decoder_exdata *ex_data);
+void httpd_tunnel_state_update(struct http_decoder_exdata *ex_data);
+void http_decoder_push_tunnel_data(struct session *sess, const struct http_decoder_exdata *exdata, enum http_tunnel_message_type type, const char *payload, uint16_t payload_len);
+enum http_tunnel_message_type httpd_tunnel_state_to_msg(const struct http_decoder_exdata *ex_data); \ No newline at end of file
diff --git a/decoders/http/http_decoder_utils.cpp b/decoders/http/http_decoder_utils.cpp
new file mode 100644
index 0000000..93fef81
--- /dev/null
+++ b/decoders/http/http_decoder_utils.cpp
@@ -0,0 +1,309 @@
+#include <string.h>
+#include <assert.h>
+#include "stellar/http.h"
+#include "http_decoder_inc.h"
+
+char *safe_dup(const char *str, size_t len)
+{
+ if (str == NULL || len == 0)
+ {
+ return NULL;
+ }
+ char *dup = CALLOC(char, len + 1);
+ memcpy(dup, str, len);
+ return dup;
+}
+
+int strncasecmp_safe(const char *fix_s1, const char *dyn_s2, size_t fix_n1, size_t dyn_n2)
+{
+ if (fix_s1 == NULL || dyn_s2 == NULL)
+ {
+ return -1;
+ }
+ if (fix_n1 != dyn_n2)
+ {
+ return -1;
+ }
+ return strncasecmp(fix_s1, dyn_s2, fix_n1);
+}
+
+const char *http_message_type_to_string(enum http_message_type type)
+{
+ const char *sname = "unknown_msg_type";
+
+ switch (type)
+ {
+ case HTTP_MESSAGE_REQ_LINE:
+ sname = "HTTP_MESSAGE_REQ_LINE";
+ break;
+ case HTTP_MESSAGE_REQ_HEADER:
+ sname = "HTTP_MESSAGE_REQ_HEADER";
+ break;
+ case HTTP_MESSAGE_REQ_HEADER_END:
+ sname = "HTTP_MESSAGE_REQ_HEADER_END";
+ break;
+ case HTTP_MESSAGE_REQ_BODY_START:
+ sname = "HTTP_MESSAGE_REQ_BODY_START";
+ break;
+ case HTTP_MESSAGE_REQ_BODY:
+ sname = "HTTP_MESSAGE_REQ_BODY";
+ break;
+ case HTTP_MESSAGE_REQ_BODY_END:
+ sname = "HTTP_MESSAGE_REQ_BODY_END";
+ break;
+ case HTTP_MESSAGE_RES_LINE:
+ sname = "HTTP_MESSAGE_RES_LINE";
+ break;
+ case HTTP_MESSAGE_RES_HEADER:
+ sname = "HTTP_MESSAGE_RES_HEADER";
+ break;
+ case HTTP_MESSAGE_RES_HEADER_END:
+ sname = "HTTP_MESSAGE_RES_HEADER_END";
+ break;
+ case HTTP_MESSAGE_RES_BODY_START:
+ sname = "HTTP_MESSAGE_RES_BODY_START";
+ break;
+ case HTTP_MESSAGE_RES_BODY:
+ sname = "HTTP_MESSAGE_RES_BODY";
+ break;
+ case HTTP_MESSAGE_RES_BODY_END:
+ sname = "HTTP_MESSAGE_RES_BODY_END";
+ break;
+ case HTTP_TRANSACTION_START:
+ sname = "HTTP_TRANSACTION_START";
+ break;
+ case HTTP_TRANSACTION_END:
+ sname = "HTTP_TRANSACTION_END";
+ break;
+
+ default:
+ break;
+ }
+
+ return sname;
+}
+
+int http_message_type_is_req(struct session *sess, enum http_message_type msg_type)
+{
+ int is_req_msg = 0;
+
+ switch (msg_type)
+ {
+ case HTTP_MESSAGE_REQ_LINE:
+ case HTTP_MESSAGE_REQ_HEADER:
+ case HTTP_MESSAGE_REQ_HEADER_END:
+ case HTTP_MESSAGE_REQ_BODY_START:
+ case HTTP_MESSAGE_REQ_BODY:
+ case HTTP_MESSAGE_REQ_BODY_END:
+ is_req_msg = 1;
+ break;
+
+ case HTTP_MESSAGE_RES_LINE:
+ case HTTP_MESSAGE_RES_HEADER:
+ case HTTP_MESSAGE_RES_HEADER_END:
+ case HTTP_MESSAGE_RES_BODY_START:
+ case HTTP_MESSAGE_RES_BODY:
+ case HTTP_MESSAGE_RES_BODY_END:
+ is_req_msg = 0;
+ break;
+
+ case HTTP_TRANSACTION_START:
+ case HTTP_TRANSACTION_END:
+ {
+ enum flow_direction cur_dir = session_get_current_flow_direction(sess);
+ if (FLOW_DIRECTION_C2S == cur_dir)
+ {
+ is_req_msg = 1;
+ }
+ else
+ {
+ is_req_msg = 0;
+ }
+ }
+ break;
+
+ default:
+ assert(0);
+ fprintf(stderr, "unknow message type:%d\n", (int)msg_type);
+ break;
+ }
+ return is_req_msg;
+}
+
+int http_event_is_req(enum http_event event)
+{
+ switch (event)
+ {
+ case HTTP_EVENT_REQ_INIT:
+ case HTTP_EVENT_REQ_LINE:
+ case HTTP_EVENT_REQ_HDR:
+ case HTTP_EVENT_REQ_HDR_END:
+ case HTTP_EVENT_REQ_BODY_BEGIN:
+ case HTTP_EVENT_REQ_BODY_DATA:
+ case HTTP_EVENT_REQ_BODY_END:
+ case HTTP_EVENT_REQ_END:
+ return 1;
+ break;
+
+ case HTTP_EVENT_RES_INIT:
+ case HTTP_EVENT_RES_LINE:
+ case HTTP_EVENT_RES_HDR:
+ case HTTP_EVENT_RES_HDR_END:
+ case HTTP_EVENT_RES_BODY_BEGIN:
+ case HTTP_EVENT_RES_BODY_DATA:
+ case HTTP_EVENT_RES_BODY_END:
+ case HTTP_EVENT_RES_END:
+ return 0;
+ break;
+
+ default:
+ assert(0);
+ fprintf(stderr, "unknow event type:%d\n", (int)event);
+ break;
+ }
+ return -1;
+}
+
+int stellar_session_mq_get_topic_id_reliable(struct stellar *st, const char *topic_name, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
+{
+ int topic_id = stellar_mq_get_topic_id(st, topic_name);
+ if (topic_id < 0)
+ {
+ topic_id = stellar_mq_create_topic(st, topic_name, msg_free_cb, msg_free_arg);
+ }
+ return topic_id;
+}
+
+static const unsigned char __g_httpd_hextable[] = {
+ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 0, 0, 0, 0, 0, /* 0x30 - 0x3f */
+ 0, 10, 11, 12, 13, 14, 15, 0, 0, 0, 0, 0, 0, 0, 0, 0, /* 0x40 - 0x4f */
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, /* 0x50 - 0x5f */
+ 0, 10, 11, 12, 13, 14, 15 /* 0x60 - 0x66 */
+};
+
+/* the input is a single hex digit */
+#define onehex2dec(x) __g_httpd_hextable[x - '0']
+
+#include <ctype.h>
+// https://github.com/curl/curl/blob/2e930c333658725657b94a923d175c6622e0f41d/lib/urlapi.c
+void httpd_url_decode(const char *string, size_t length, char *ostring, size_t *olen)
+{
+ size_t alloc = length;
+ char *ns = ostring;
+
+ while (alloc)
+ {
+ unsigned char in = (unsigned char)*string;
+ if (('%' == in) && (alloc > 2) &&
+ isxdigit(string[1]) && isxdigit(string[2]))
+ {
+ /* this is two hexadecimal digits following a '%' */
+ in = (unsigned char)(onehex2dec(string[1]) << 4) | onehex2dec(string[2]);
+ string += 3;
+ alloc -= 3;
+ }
+ else
+ {
+ string++;
+ alloc--;
+ }
+ *ns++ = (char)in;
+ }
+ *ns = 0; /* terminate it */
+
+ if (olen)
+ /* store output size */
+ *olen = ns - ostring;
+
+ return;
+}
+
+int httpd_url_is_encoded(const char *url, size_t len)
+{
+ for (size_t i = 0; i < len; i++)
+ {
+ if (url[i] == '%')
+ {
+ return 1;
+ }
+ }
+ return 0;
+}
+
+static void httpd_set_tcp_addr(const struct tcphdr *tcph, struct httpd_session_addr *addr, enum flow_direction fdir)
+{
+ if (FLOW_DIRECTION_C2S == fdir)
+ {
+ addr->sport = tcph->th_sport;
+ addr->dport = tcph->th_dport;
+ }
+ else
+ {
+ addr->sport = tcph->th_dport;
+ addr->dport = tcph->th_sport;
+ }
+}
+static void httpd_set_ipv4_addr(const struct ip *ip4h, struct httpd_session_addr *addr, enum flow_direction fdir)
+{
+ addr->ipver = 4;
+ if (FLOW_DIRECTION_C2S == fdir)
+ {
+ addr->saddr4 = ip4h->ip_src.s_addr;
+ addr->daddr4 = ip4h->ip_dst.s_addr;
+ }
+ else
+ {
+ addr->saddr4 = ip4h->ip_dst.s_addr;
+ addr->daddr4 = ip4h->ip_src.s_addr;
+ }
+}
+static void httpd_set_ipv6_addr(const struct ip6_hdr *ip6h, struct httpd_session_addr *addr, enum flow_direction fdir)
+{
+ addr->ipver = 6;
+ if (FLOW_DIRECTION_C2S == fdir)
+ {
+ memcpy(&addr->saddr6, &ip6h->ip6_src, sizeof(struct in6_addr));
+ memcpy(&addr->daddr6, &ip6h->ip6_dst, sizeof(struct in6_addr));
+ }
+ else
+ {
+ memcpy(&addr->saddr6, &ip6h->ip6_dst, sizeof(struct in6_addr));
+ memcpy(&addr->daddr6, &ip6h->ip6_src, sizeof(struct in6_addr));
+ }
+}
+
+void httpd_session_get_addr(const struct session *sess, struct httpd_session_addr *addr)
+{
+ if (sess == NULL || addr == NULL)
+ {
+ return;
+ }
+ enum flow_direction fdir = session_get_current_flow_direction(sess);
+ const struct packet *raw_pkt = session_get_first_packet(sess, fdir);
+ if (NULL == raw_pkt)
+ {
+ addr->ipver = 0;
+ return;
+ }
+
+ struct layer pkt_layer = {};
+ PACKET_FOREACH_LAYER_REVERSE(raw_pkt, pkt_layer)
+ {
+ if (pkt_layer.proto == LAYER_PROTO_TCP)
+ {
+ httpd_set_tcp_addr(pkt_layer.hdr.tcp, addr, fdir);
+ }
+ else if (pkt_layer.proto == LAYER_PROTO_IPV4)
+ {
+ httpd_set_ipv4_addr(pkt_layer.hdr.ip4, addr, fdir);
+ break;
+ }
+ else if (pkt_layer.proto == LAYER_PROTO_IPV6)
+ {
+ httpd_set_ipv6_addr(pkt_layer.hdr.ip6, addr, fdir);
+ break;
+ }
+ }
+
+ return;
+} \ No newline at end of file
diff --git a/decoders/http/http_decoder_utils.h b/decoders/http/http_decoder_utils.h
new file mode 100644
index 0000000..33483cf
--- /dev/null
+++ b/decoders/http/http_decoder_utils.h
@@ -0,0 +1,80 @@
+#pragma once
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <cstddef>
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+#include <bits/types/struct_iovec.h>
+#include "stellar/stellar.h"
+#include "stellar/utils.h"
+#include "stellar/session.h"
+#include "stellar/stellar_mq.h"
+#include "stellar/stellar_exdata.h"
+#ifdef __cplusplus
+}
+#endif
+
+char *safe_dup(const char *str, size_t len);
+int strncasecmp_safe(const char *fix_s1, const char *dyn_s2, size_t fix_n1, size_t dyn_n2);
+const char *http_message_type_to_string(enum http_message_type type);
+int http_message_type_is_req(struct session *sess, enum http_message_type msg_type);
+int http_event_is_req(enum http_event event);
+int stellar_session_mq_get_topic_id_reliable(struct stellar *st, const char *topic_name, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg);
+void httpd_url_decode(const char *string, size_t length, char *ostring, size_t *olen);
+int httpd_url_is_encoded(const char *url, size_t len);
+/******************************************************************************
+ * Logger
+ ******************************************************************************/
+
+enum http_decoder_log_level {
+ DEBUG = 0x11,
+ WARN = 0x12,
+ INFO = 0x13,
+ ERROR = 0x14,
+};
+
+#ifndef http_decoder_log
+#define http_decoder_log(level, format, ...) \
+ { \
+ switch (level) \
+ { \
+ case DEBUG: \
+ fprintf(stdout, "HTTP_DECODER [DEBUG] " format "\n", ##__VA_ARGS__); \
+ fflush(stdout); \
+ break; \
+ case WARN: \
+ fprintf(stdout, "HTTP_DECODER [WARN] " format "\n", ##__VA_ARGS__); \
+ fflush(stdout); \
+ break; \
+ case INFO: \
+ fprintf(stdout, "HTTP_DECODER [INFO] " format "\n", ##__VA_ARGS__); \
+ fflush(stdout); \
+ break; \
+ case ERROR: \
+ fprintf(stderr, "HTTP_DECODER [ERROR] " format "\n", ##__VA_ARGS__); \
+ fflush(stderr); \
+ break; \
+ } \
+ }
+#endif
+
+
+#include <netinet/in.h>
+
+struct httpd_session_addr
+{
+ uint8_t ipver; /* 4 or 6 */
+ uint16_t sport; /* network order */
+ uint16_t dport; /* network order */
+ union
+ {
+ uint32_t saddr4;
+ uint32_t daddr4;
+ struct in6_addr saddr6;
+ struct in6_addr daddr6;
+ };
+};
+void httpd_session_get_addr(const struct session *sess, struct httpd_session_addr *addr);
diff --git a/decoders/http/version.map b/decoders/http/version.map
new file mode 100644
index 0000000..15d1d95
--- /dev/null
+++ b/decoders/http/version.map
@@ -0,0 +1,11 @@
+VERS_3.0{
+global:
+ extern "C" {
+ http_message_*;
+ http_decoder_init;
+ http_decoder_exit;
+ http_decoder_tcp_stream_msg_cb;
+ http_tunnel_message_*;
+ };
+local: *;
+}; \ No newline at end of file