summaryrefslogtreecommitdiff
path: root/decoders/http/http_decoder_half.c
diff options
context:
space:
mode:
Diffstat (limited to 'decoders/http/http_decoder_half.c')
-rw-r--r--decoders/http/http_decoder_half.c1548
1 files changed, 525 insertions, 1023 deletions
diff --git a/decoders/http/http_decoder_half.c b/decoders/http/http_decoder_half.c
index 70991c6..84874a3 100644
--- a/decoders/http/http_decoder_half.c
+++ b/decoders/http/http_decoder_half.c
@@ -2,1187 +2,689 @@
#include <stdio.h>
#include <string.h>
#include <arpa/inet.h>
-#include "http_decoder_private.h"
+#include "http_decoder.h"
+#include "http_decoder_half.h"
+#include "http_decoder_utils.h"
+#include "http_decoder_decompress.h"
#include "llhttp.h"
+#include "stellar/session.h"
#include "uthash/utlist.h"
+#include "uthash/utarray.h"
-struct http_decompress_buffer
+#ifdef __cplusplus
+extern "C"
{
- struct iovec iov;
- char is_commit;
- struct http_decompress_buffer *next, *prev;
-};
-
-struct http_decoder_half_data
-{
- struct http_decoder_table *table;
-
- int major_version;
- int minor_version;
- int status_code;
-
- enum http_event state;
-
- enum http_content_encoding content_encoding;
- struct http_content_decompress *decompress;
-#if 0
- char *ref_decompress_body;
- size_t decompress_body_len;
-#else
- struct http_decompress_buffer *decompress_buffer_list;
-#endif
- int joint_url_complete;
- int url_is_encoded;
- // http://<host>[:<port>]/<path>?<searchpart>
- hstring joint_url;
- hstring decoded_url;
- long long transaction_index;
-};
-
-struct http_decoder_half
-{
- llhttp_t parser;
- llhttp_settings_t settings;
- enum llhttp_errno error;
- int decompress_switch;
- struct http_decoder_env *httpd_env;
-
- // uint8_t is_request_flow;
- enum http_event event;
- http_event_cb *http_ev_cb;
- struct http_event_context *http_ev_ctx;
-
- struct http_decoder_half_data *ref_data;
-
- long long trans_counter;
- long long err_counter;
- long long transaction_seq; // accumulated
-
- const char *data;
- int data_len;
-};
-
-// #define HTTP_DECODER_DEBUG
-#ifdef HTTP_DECODER_DEBUG
-static void printf_debug_info(const char *desc, const char *at, size_t length)
-{
- if (at)
- {
- char *temp = http_safe_dup(at, length);
- printf("HTTP PARSER STAGE: %s: %s\n", desc, temp);
- FREE(temp);
- }
- else
- {
- printf("HTTP PARSER STAGE: %s\n", desc);
- }
-}
-#else
-#define printf_debug_info(desc, at, length)
#endif
-void http_half_decompress_buffer_free(struct http_decoder_half_data *data, hstring *decompress_body)
-{
- struct http_decompress_buffer *el, *tmp;
- DL_FOREACH_SAFE(data->decompress_buffer_list, el, tmp)
+ static inline void http_half_stage_reset_flow_data(struct http_half *half)
{
- if (el->iov.iov_base == decompress_body->iov_base && el->iov.iov_len == decompress_body->iov_len)
+ if (half->flow_data)
{
- DL_DELETE(data->decompress_buffer_list, el);
- if (el->iov.iov_base)
+ memset(&half->flow_data->req_line, 0, sizeof(struct http_request_line));
+ memset(&half->flow_data->status_line, 0, sizeof(struct http_status_line));
+ memset(&half->flow_data->header, 0, sizeof(struct http_header));
+ if (half->flow_data->ut_filed_array)
{
- FREE(el->iov.iov_base);
+ utarray_clear(half->flow_data->ut_filed_array);
}
- FREE(el);
- break;
}
}
-}
-
-void http_half_get_lastest_decompress_buffer(struct http_decoder_half_data *data, hstring *decompress_body)
-{
- if (data->content_encoding == HTTP_CONTENT_ENCODING_NONE)
- {
- return;
- }
- if (data->decompress_buffer_list == NULL)
- {
- decompress_body->iov_base = NULL;
- decompress_body->iov_len = 0;
- return;
- }
- if (data->decompress_buffer_list->prev->is_commit == 1)
+ static inline void http_half_stage_reset_stat(struct http_half *half)
{
- decompress_body->iov_base = NULL;
- decompress_body->iov_len = 0;
- return;
+ memset(&half->half_stage, 0, sizeof(struct http_half_llhttp_stage));
}
- decompress_body->iov_base = data->decompress_buffer_list->prev->iov.iov_base;
- decompress_body->iov_len = data->decompress_buffer_list->prev->iov.iov_len;
- data->decompress_buffer_list->prev->is_commit = 1;
-}
-
-static void http_decoder_half_data_decompress(struct http_decoder_half_data *data)
-{
- assert(data);
-
- if (data->content_encoding == HTTP_CONTENT_ENCODING_NONE)
+ static void http_half_stage_reset(struct http_half *half)
{
- return;
+ http_half_stage_reset_stat(half);
+ http_half_stage_reset_flow_data(half);
}
- hstring raw_body = {};
- http_decoder_table_get_body(data->table, (char **)&raw_body.iov_base, &raw_body.iov_len);
- if (raw_body.iov_base == NULL || raw_body.iov_len == 0)
+ void http_flow_append_header_filed(struct http_half_data *flow_data, const char *at, size_t length)
{
- return;
+ if (NULL == flow_data->ut_filed_array) // the first field
+ {
+ UT_icd header_icd = {sizeof(struct http_header_field), NULL, NULL, NULL};
+ utarray_new(flow_data->ut_filed_array, &header_icd);
+ assert(flow_data->ut_filed_array != NULL);
+ utarray_reserve(flow_data->ut_filed_array, HTTP_HEADER_FIELD_RESERVE_NUM);
+ }
+ struct http_header_field new_filed = {};
+ new_filed.field_name = (char *)at;
+ new_filed.field_name_len = length;
+ utarray_push_back(flow_data->ut_filed_array, &new_filed);
}
- if (NULL == data->decompress)
+ void http_flow_append_header_value(struct http_half_data *flow_data, const char *at, size_t length)
{
- data->decompress = http_content_decompress_create(data->content_encoding);
+ assert(flow_data->ut_filed_array != NULL);
+ struct http_header_field *last_field = (struct http_header_field *)utarray_back(flow_data->ut_filed_array);
+ assert(last_field != NULL);
+ if (last_field)
+ {
+ last_field->field_value = (char *)at;
+ last_field->field_value_len = length;
+ }
}
- assert(data->decompress);
- char *local_outdata = NULL;
- size_t local_outdata_len = 0;
- if (http_content_decompress_write(data->decompress, (char *)raw_body.iov_base,
- raw_body.iov_len,
- &local_outdata,
- &local_outdata_len) == -1)
+ static struct http_message *http_produce_message(struct session *sess, enum http_event event,
+ enum http_topic_type topic_type, struct http_half_data *flow_data)
{
- // log error
- http_content_decompress_destroy(data->decompress);
- data->decompress = NULL;
- return;
+ struct http_message *msg = (struct http_message *)calloc(1, sizeof(struct http_message));
+ msg->sess_ref = sess;
+ msg->topic_type = topic_type;
+ msg->flow_data = flow_data;
+ msg->event = event;
+ return msg;
}
- if (local_outdata != NULL && local_outdata_len > 0)
+ static struct http_half_data *http_half_data_new(enum flow_type flow_dir)
{
- struct http_decompress_buffer *decompress_buffer = CALLOC(struct http_decompress_buffer, 1);
- assert(decompress_buffer);
- decompress_buffer->iov.iov_base = local_outdata;
- decompress_buffer->iov.iov_len = local_outdata_len;
- DL_APPEND(data->decompress_buffer_list, decompress_buffer);
- http_content_decompress_ownership_borrow(data->decompress);
+ struct http_half_data *half_data = (struct http_half_data *)calloc(1, sizeof(struct http_half_data));
+ half_data->flow_dir = flow_dir;
+ /* update transaction seq in http_half_parse_headers_finally(), not here */
+ return half_data;
}
-}
-
-/* Possible return values 0, -1, `HPE_PAUSED` */
-static int on_message_begin(llhttp_t *http)
-{
- printf_debug_info("on_message_begin", NULL, 0);
-
- struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
- assert(half);
- if (half->parser.type == HTTP_REQUEST)
- {
- half->event = HTTP_EVENT_REQ_INIT;
- }
- else
+ void http_half_data_free(struct http_half_data *half_data)
{
- half->event = HTTP_EVENT_RES_INIT;
+ if (NULL == half_data)
+ {
+ return;
+ }
+ if (half_data->ut_filed_array)
+ {
+ utarray_free(half_data->ut_filed_array);
+ half_data->ut_filed_array = NULL;
+ }
+ if (half_data->joint_url.iov_base)
+ {
+ FREE(half_data->joint_url.iov_base);
+ }
+ if (half_data->decompress)
+ {
+ http_content_decompress_destroy(half_data->decompress);
+ half_data->decompress = NULL;
+ }
+ free(half_data);
}
- half->ref_data = NULL;
-
- assert(half->http_ev_cb != NULL);
- half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); // http_event_handler()
-
- half->trans_counter++;
- half->ref_data->transaction_index = half->transaction_seq++;
- return 0;
-}
-
-static int on_message_complete(llhttp_t *http)
-{
- printf_debug_info("on_message_complete", NULL, 0);
-
- struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
- assert(half);
-
- if (half->parser.type == HTTP_REQUEST)
+ void http_half_free(struct http_half *half)
{
- if (half->event == HTTP_EVENT_REQ_BODY_DATA)
+ if (NULL == half)
{
- half->event = HTTP_EVENT_REQ_BODY_END;
- half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env);
+ return;
}
- }
- else
- {
- if (half->event == HTTP_EVENT_RES_BODY_DATA)
+ llhttp_reset(&half->parser.llhttp_parser);
+ http_half_data_free(half->flow_data);
+ if (half->cached_header_buffer)
{
- half->event = HTTP_EVENT_RES_BODY_END;
- half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env);
+ if (half->cached_header_buffer->buffer)
+ {
+ FREE(half->cached_header_buffer->buffer);
+ }
+ FREE(half->cached_header_buffer);
}
+ free(half);
}
- // trigger req_end/res_end
- if (half->parser.type == HTTP_REQUEST)
- {
- half->event = HTTP_EVENT_REQ_END;
- half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env);
- }
- else
+ struct http_half *http_half_new(enum flow_type flow_dir)
{
- half->event = HTTP_EVENT_RES_END;
- half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env);
+ struct http_half *half = (struct http_half *)calloc(1, sizeof(struct http_half));
+ if (FLOW_TYPE_C2S == flow_dir)
+ {
+ http_flow_parser_init(&half->parser, HTTP_REQUEST);
+ }
+ else
+ {
+ http_flow_parser_init(&half->parser, HTTP_RESPONSE);
+ }
+ half->parser.half_ref = half;
+ return half;
}
- return 0;
-}
-
-static int on_reset(llhttp_t *http __attribute__((unused)))
-{
- printf_debug_info("on_reset", NULL, 0);
-
- return 0;
-}
-
-static inline int is_line_crlf(struct http_decoder_half *half)
-{
- const char *chr_r = (char *)memrchr(half->data, '\r', half->data_len);
- const char *chr_n = (char *)memrchr(half->data, '\n', half->data_len);
- if (chr_r && chr_n && (chr_r + 1 == chr_n))
+ static struct http_header_field *http_flow_get_header_field(struct http_half_data *flow_data, const char *field_name, size_t field_name_len)
{
- return 1;
+ struct http_header_field *p;
+ if (NULL == flow_data->ut_filed_array) // some response no header fileds, such as "HTTP/1.1 100 Continue\r\n\r\n"
+ {
+ return NULL;
+ }
+ for (p = (struct http_header_field *)utarray_front(flow_data->ut_filed_array);
+ p != NULL;
+ p = (struct http_header_field *)utarray_next(flow_data->ut_filed_array, p))
+ {
+ if ((field_name_len == p->field_name_len) &&
+ (strncasecmp(p->field_name, field_name, field_name_len) == 0))
+ {
+ return p;
+ }
+ }
+ return NULL;
}
- return 0;
-}
-
-static int on_method(llhttp_t *http, const char *at, size_t length)
-{
- printf_debug_info("on_method", at, length);
-
- struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
- assert(half);
-
- http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_METHOD, at, length);
- return 0;
-}
-
-/* Information-only callbacks, return value is ignored */
-static int on_method_complete(llhttp_t *http)
-{
- printf_debug_info("on_method_complete", NULL, 0);
-
- struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
- assert(half);
- if (is_line_crlf(half) == 0)
+ static void http_using_session_addr_as_host(struct session *ref_session, struct http_header_field *host_result)
{
- http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_METHOD);
- }
-
- http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_METHOD);
-
- return 0;
-}
-
-/* Possible return values 0, -1, HPE_USER */
-static int on_uri(llhttp_t *http, const char *at, size_t length)
-{
- printf_debug_info("on_uri", at, length);
-
- struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
- assert(half);
-
- http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_URI, at, length);
- return 0;
-}
-
-static void http_decoder_cached_portion_url(struct http_decoder_half *half, const hstring *uri_result)
-{
- struct http_decoder_half_data *ref_data = half->ref_data;
- int uri_skip_len = 0;
+ memset(host_result, 0, sizeof(struct http_header_field));
+ struct http_session_addr ssaddr = {};
+ httpd_session_get_addr(ref_session, &ssaddr);
+ char ip_string_buf[INET6_ADDRSTRLEN];
- if ((uri_result->iov_len) > 7 && (strncasecmp("http://", (char *)uri_result->iov_base, 7) == 0)) // absolute URI
- {
- uri_skip_len = strlen("http://");
- ref_data->joint_url_complete = 1;
- }
- else
- {
- ref_data->joint_url_complete = 0;
+ if (4 == ssaddr.ipver)
+ {
+ host_result->field_value = (char *)calloc(1, (INET6_ADDRSTRLEN + 7) /* "ip:port" max length */);
+ inet_ntop(AF_INET, &ssaddr.daddr4, ip_string_buf, INET6_ADDRSTRLEN);
+ snprintf((char *)host_result->field_value, INET6_ADDRSTRLEN + 7, "%s:%u", ip_string_buf, ntohs(ssaddr.dport));
+ host_result->field_value_len = strlen(host_result->field_value);
+ }
+ else if (6 == ssaddr.ipver)
+ {
+ host_result->field_value = (char *)calloc(1, (INET6_ADDRSTRLEN + 7) /* "ip:port" max length */);
+ inet_ntop(AF_INET6, &ssaddr.daddr6, ip_string_buf, INET6_ADDRSTRLEN);
+ snprintf((char *)host_result->field_value, INET6_ADDRSTRLEN + 7, "%s:%u", ip_string_buf, ntohs(ssaddr.dport));
+ host_result->field_value_len = strlen(host_result->field_value);
+ }
+ else
+ {
+ host_result->field_value = (char *)calloc(1, 1);
+ host_result->field_value_len = 1;
+ }
+ return;
}
- ref_data->joint_url.iov_len = uri_result->iov_len - uri_skip_len;
- ref_data->joint_url.iov_base = MEMPOOL_CALLOC(half->http_ev_ctx->ref_mempool, char, ref_data->joint_url.iov_len);
- memcpy(ref_data->joint_url.iov_base, (char *)uri_result->iov_base + uri_skip_len, ref_data->joint_url.iov_len);
-}
-
-/* Information-only callbacks, return value is ignored */
-static int on_uri_complete(llhttp_t *http)
-{
- printf_debug_info("on_uri_complete", NULL, 0);
-
- struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
- assert(half);
-
- if (is_line_crlf(half) == 0)
+ static void http_flow_join_url(struct http_half_data *flow_data, const struct http_header_field *host_filed)
{
- http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_URI);
- }
-
- http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_URI);
-
- hstring uri_result = {};
- http_decoder_table_get_uri(half->ref_data->table, (char **)&uri_result.iov_base, &uri_result.iov_len);
- assert(uri_result.iov_base);
- http_decoder_cached_portion_url(half, &uri_result);
-
- return 0;
-}
+ int append_slash_len = 0;
+ const char *join_uri = flow_data->req_line.uri;
+ size_t join_url_len = flow_data->req_line.uri_len;
-/* Possible return values 0, -1, HPE_USER */
-static int on_version(llhttp_t *http, const char *at, size_t length)
-{
- printf_debug_info("on_version", at, length);
-
- struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
- assert(half);
-
- http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_VERSION, at, length);
- return 0;
-}
+ // skip schema "http://"
+ if (join_url_len > 7 && strncasecmp(join_uri, "http://", 7) == 0)
+ {
+ join_uri += 7;
+ join_url_len -= 7;
+ }
-/* Information-only callbacks, return value is ignored */
-static int on_version_complete(llhttp_t *http)
-{
- printf_debug_info("on_version_complete", NULL, 0);
+ // if uri not absolute path(start with '/'), append '/'
+ if ('/' != join_uri[0])
+ {
+ append_slash_len = 1;
+ }
+ int url_cache_str_len = host_filed->field_value_len + join_url_len + append_slash_len;
+ char *url_cache_str = (char *)calloc(1, url_cache_str_len);
- struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
- assert(half);
+ char *ptr = url_cache_str;
+ memcpy(ptr, host_filed->field_value, host_filed->field_value_len);
+ ptr += host_filed->field_value_len;
+ if (append_slash_len)
+ {
+ *ptr = '/';
+ ptr++;
+ }
+ memcpy(ptr, join_uri, join_url_len);
- if (is_line_crlf(half) == 0)
- {
- http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_VERSION);
+ flow_data->joint_url.iov_base = url_cache_str;
+ flow_data->joint_url.iov_len = url_cache_str_len;
}
- http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_VERSION);
-
- half->ref_data->major_version = llhttp_get_http_major(&half->parser);
- half->ref_data->minor_version = llhttp_get_http_minor(&half->parser);
-
- if (half->parser.type == HTTP_REQUEST)
+ // todo: optimize use hash table
+ static void http_half_get_frequently_used_fileds(struct http_half_data *flow_data)
{
- half->event = HTTP_EVENT_REQ_LINE;
- if (half->http_ev_cb) // http_event_handler()
+ if (FLOW_TYPE_C2S == flow_data->flow_dir)
+ {
+ flow_data->header.host = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_HOST, strlen(HTTP_HEADER_FIELD_NAME_HOST));
+ flow_data->header.user_agent = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_USER_AGENT, strlen(HTTP_HEADER_FIELD_NAME_USER_AGENT));
+ flow_data->header.referer = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_REFERER, strlen(HTTP_HEADER_FIELD_NAME_REFERER));
+ flow_data->header.cookie = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_COOKIE, strlen(HTTP_HEADER_FIELD_NAME_COOKIE));
+ }
+ else
{
- half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env);
+ flow_data->header.set_cookie = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_SET_COOKIE, strlen(HTTP_HEADER_FIELD_NAME_SET_COOKIE));
}
+ flow_data->header.content_type = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_CONTENT_TYPE, strlen(HTTP_HEADER_FIELD_NAME_CONTENT_TYPE));
}
- return 0;
-}
-
-/* Possible return values 0, -1, HPE_USER */
-static int on_status(llhttp_t *http, const char *at, size_t length)
-{
- printf_debug_info("on_status", at, length);
-
- struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
- assert(half);
-
- http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_STATUS, at, length);
- return 0;
-}
-
-/* Information-only callbacks, return value is ignored */
-static int on_status_complete(llhttp_t *http)
-{
- printf_debug_info("on_status_complete", NULL, 0);
-
- struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
- assert(half);
-
- if (is_line_crlf(half) == 0)
+ static int http_half_parse_headers_finally(struct http_half *half)
{
- http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_STATUS);
- }
+ struct http_half_data *flow_data = half->flow_data;
+ http_half_get_frequently_used_fileds(flow_data);
- http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_STATUS);
- half->ref_data->status_code = llhttp_get_status_code(&half->parser);
+ /* update transaction_seq when headers completed */
+ half->flow_data->transaction_seq = half->transaction_num++;
- if (half->parser.type == HTTP_RESPONSE)
- {
- half->event = HTTP_EVENT_RES_LINE;
- if (half->http_ev_cb != NULL) // http_event_handler()
+ if (FLOW_TYPE_C2S == flow_data->flow_dir)
{
- half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env);
+ struct http_header_field tmp_host_field = {};
+ const struct http_header_field *host_filed = flow_data->header.host;
+ if (NULL == host_filed)
+ {
+ http_using_session_addr_as_host(half->sess_ref, &tmp_host_field);
+ host_filed = &tmp_host_field;
+ }
+ http_flow_join_url(flow_data, host_filed);
+ FREE(tmp_host_field.field_name);
+ FREE(tmp_host_field.field_value);
}
- }
- return 0;
-}
-
-/* Possible return values 0, -1, HPE_USER */
-static int on_header_field(llhttp_t *http, const char *at, size_t length)
-{
- printf_debug_info("on_header_field", at, length);
-
- struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
- assert(half);
-
- http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_HDRKEY, at, length);
- return 0;
-}
-
-/* Information-only callbacks, return value is ignored */
-static int on_header_field_complete(llhttp_t *http)
-{
- printf_debug_info("on_header_field_complete", NULL, 0);
-
- struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
- assert(half);
-
- http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_HDRKEY);
-
- return 0;
-}
-
-/* Possible return values 0, -1, HPE_USER */
-static int on_header_value(llhttp_t *http, const char *at, size_t length)
-{
- printf_debug_info("on_header_value", at, length);
-
- struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
- assert(half);
-
- http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_HDRVAL, at, length);
- return 0;
-}
-
-#define MAX_ENCODING_STR_LEN 8
-/* Information-only callbacks, return value is ignored */
-static int on_header_value_complete(llhttp_t *http)
-{
- printf_debug_info("on_header_value_complete", NULL, 0);
-
- struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
- assert(half);
-
- if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_HDRKEY) ==
- STRING_STATE_CACHE)
- {
- http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_HDRKEY);
- }
-
- http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_HDRVAL);
+ struct http_header_field *encoding_field = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_CONTENT_ENCODING, strlen(HTTP_HEADER_FIELD_NAME_CONTENT_ENCODING));
+ if (NULL == encoding_field)
+ {
+ flow_data->content_encoding_type = HTTP_CONTENT_ENCODING_NONE;
+ }
+ else
+ {
+ flow_data->content_encoding_type = http_content_encoding_str2int(encoding_field->field_value, encoding_field->field_value_len);
+ }
- if (half->ref_data->content_encoding == HTTP_CONTENT_ENCODING_NONE)
- {
- struct http_header_field http_hdr = {};
+ struct http_header_field *content_len_field = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_CONTENT_LENGTH, strlen(HTTP_HEADER_FIELD_NAME_CONTENT_LENGTH));
+ if (NULL == content_len_field)
+ {
+ flow_data->header.content_length = -1;
+ }
+ else
+ {
+ flow_data->header.content_length = http_strtoll(content_len_field->field_value, content_len_field->field_value_len);
+ }
- if (http_decoder_table_get_header(half->ref_data->table, (char *)"Content-Encoding", 16, &http_hdr) == 0)
+ struct http_header_field *transf_encoding_field = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_TRANSFER_ENCODING, strlen(HTTP_HEADER_FIELD_NAME_TRANSFER_ENCODING));
+ if (NULL == transf_encoding_field)
{
- half->ref_data->content_encoding = http_content_encoding_str2int(http_hdr.value, http_hdr.value_len);
+ flow_data->transfer_encoding_is_chunked = -1;
}
- }
+ else
+ {
+ if (http_strncasecmp_safe(transf_encoding_field->field_value, "chunked", transf_encoding_field->field_value_len, strlen("chunked")) == 0)
+ {
+ flow_data->transfer_encoding_is_chunked = 1;
+ }
+ else
+ {
+ flow_data->transfer_encoding_is_chunked = 0;
+ }
+ }
+ flow_data->header.header_buf = half->half_stage.first_header_name_ptr;
+ flow_data->header.header_buf_sz = half->half_stage.last_headers_complete_ptr - half->half_stage.first_header_name_ptr;
- if (http->type == HTTP_REQUEST)
- {
- http_decoder_get_host_feed_url(half);
+ // todo, parse other frequently used fields
+ return 0;
}
- return 0;
-}
-
-/* When on_chunk_header is called, the current chunk length is stored
- * in parser->content_length.
- * Possible return values 0, -1, `HPE_PAUSED`
- */
-static int on_chunk_header(llhttp_t *http __attribute__((unused)))
-{
- printf_debug_info("on_chunk_header", NULL, 0);
- return 0;
-}
-
-/* When on_chunk_header is called, the current chunk length is stored
- * in parser->content_length.
- * Possible return values 0, -1, `HPE_PAUSED`
- */
-static int on_chunk_header_complete(llhttp_t *http __attribute__((unused)))
-{
- printf_debug_info("on_chunk_header_complete", NULL, 0);
-
- return 0;
-}
-
-/* Possible return values:
- * 0 - Proceed normally
- * 1 - Assume that request/response has no body, and proceed to parsing the next message
- * 2 - Assume absence of body (as above) and make `llhttp_execute()` return `HPE_PAUSED_UPGRADE`
- * -1 - Error `HPE_PAUSED`
- */
-static int on_headers_complete(llhttp_t *http)
-{
- printf_debug_info("on_headers_complete", NULL, 0);
-
- struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
- assert(half);
- assert(half->ref_data);
-
- http_decoder_table_set_header_complete(half->ref_data->table);
-
- if (half->parser.type == HTTP_REQUEST)
- {
- half->event = HTTP_EVENT_REQ_HDR_END;
- }
- else
+ int http_get_header_field_count(struct http_half_data *flow_data)
{
- half->event = HTTP_EVENT_RES_HDR_END;
+ if (NULL == flow_data->ut_filed_array)
+ {
+ return 0; // some response no header fileds, such as "HTTP/1.1 100 Continue\r\n\r\n"
+ }
+ return utarray_len(flow_data->ut_filed_array);
}
- half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); // http_event_handler()
-
- return 0;
-}
-/* Possible return values 0, -1, HPE_USER */
-static int on_body(llhttp_t *http, const char *at, size_t length)
-{
- printf_debug_info("on_body", at, length);
-
- struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser);
- assert(half);
-
- // trigger body_begin event
- if (half->parser.type == HTTP_REQUEST)
+ void http_flow_body_decompress(struct http_half_data *flow_data, const char *zipdata, size_t zipdatalen)
{
- if (half->event == HTTP_EVENT_REQ_HDR_END)
+ assert(flow_data);
+ if (flow_data->content_encoding_type == HTTP_CONTENT_ENCODING_NONE)
{
- half->event = HTTP_EVENT_REQ_BODY_BEGIN;
- half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); // http_event_handler()
+ return;
}
- }
- else
- {
- if (half->event == HTTP_EVENT_RES_HDR_END)
+ if (zipdata == NULL || zipdatalen == 0)
{
- half->event = HTTP_EVENT_RES_BODY_BEGIN;
- half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env);
+ return;
}
- }
-
- if (half->ref_data != NULL)
- {
- if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_BODY) ==
- STRING_STATE_COMMIT)
+ if (NULL == flow_data->decompress)
{
- http_decoder_table_reset(half->ref_data->table, HTTP_ITEM_BODY);
+ flow_data->decompress = http_content_decompress_create(flow_data->content_encoding_type);
}
- http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_BODY, at, length);
- http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_BODY);
- }
+ char *local_outdata = NULL;
+ size_t local_outdata_len = 0;
+ if (http_content_decompress_write(flow_data->decompress, (char *)zipdata,
+ zipdatalen, &local_outdata, &local_outdata_len) == -1)
+ {
+ http_content_decompress_destroy(flow_data->decompress);
+ flow_data->decompress = NULL;
+ flow_data->decompress_body.body = NULL;
+ flow_data->decompress_body.body_sz = 0;
+ return;
+ }
- if (1 == half->decompress_switch && half->ref_data->content_encoding != HTTP_CONTENT_ENCODING_NONE)
- {
- http_decoder_half_data_decompress(half->ref_data);
+ if (local_outdata != NULL && local_outdata_len > 0)
+ {
+ flow_data->decompress_body.body = local_outdata;
+ flow_data->decompress_body.body_sz = local_outdata_len;
+ http_content_decompress_ownership_borrow(flow_data->decompress);
+ }
}
- if (half->parser.type == HTTP_REQUEST)
- {
- half->event = HTTP_EVENT_REQ_BODY_DATA;
- half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); // http_event_handler()
- }
- else
+ static void http_add_header_buf_reference(struct http_half *half)
{
- half->event = HTTP_EVENT_RES_BODY_DATA;
- half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env);
- }
-
- return 0;
-}
-
-static void http_decoder_half_init(struct http_decoder_half *half, http_event_cb *http_ev_cb, enum llhttp_type type)
-{
- llhttp_settings_init(&half->settings);
- llhttp_init(&half->parser, type, &half->settings);
-
- // half->is_request_flow = (type == HTTP_REQUEST) ? 1 : 0;
- half->settings.on_message_begin = on_message_begin;
- half->settings.on_message_complete = on_message_complete;
- half->settings.on_reset = on_reset;
-
- half->settings.on_url = on_uri;
- half->settings.on_url_complete = on_uri_complete;
-
- half->settings.on_status = on_status;
- half->settings.on_status_complete = on_status_complete;
-
- half->settings.on_method = on_method;
- half->settings.on_method_complete = on_method_complete;
-
- half->settings.on_version = on_version;
- half->settings.on_version_complete = on_version_complete;
-
- half->settings.on_header_field = on_header_field;
- half->settings.on_header_field_complete = on_header_field_complete;
-
- half->settings.on_header_value = on_header_value;
- half->settings.on_header_value_complete = on_header_value_complete;
-
- half->settings.on_chunk_header = on_chunk_header;
- half->settings.on_chunk_complete = on_chunk_header_complete;
-
- half->settings.on_headers_complete = on_headers_complete;
- half->settings.on_body = on_body;
-
- half->error = HPE_OK;
- half->http_ev_cb = http_ev_cb; // http_event_handler()
- half->ref_data = NULL;
-}
-
-struct http_decoder_half *http_decoder_half_new(struct http_decoder_exdata *hd_ctx, nmx_pool_t *mempool,
- http_event_cb *ev_cb, enum llhttp_type http_type,
- int decompress_switch, struct http_decoder_env *httpd_env, long long start_seq)
-{
- struct http_decoder_half *half = MEMPOOL_CALLOC(mempool, struct http_decoder_half, 1);
- assert(half);
-
- half->decompress_switch = decompress_switch;
- half->http_ev_ctx = MEMPOOL_CALLOC(mempool, struct http_event_context, 1);
- http_decoder_half_init(half, ev_cb, http_type);
- half->http_ev_ctx->ref_httpd_ctx = hd_ctx;
- half->httpd_env = httpd_env;
- half->transaction_seq = start_seq;
- return half;
-}
-
-void http_decoder_half_free(nmx_pool_t *mempool, struct http_decoder_half *half)
-{
- if (NULL == half)
- {
- return;
+ if (half->cached_header_buffer != NULL)
+ {
+ // add reference but not move ownership, because the buffer will be referenced of multiple transaction in pipeline mode
+ half->cached_header_buffer->reference++;
+ half->flow_data->cached_header_buffer = half->cached_header_buffer;
+ }
}
- if (half->http_ev_ctx != NULL)
+ static void http_update_body_offset(struct http_half_data *flow_data)
{
- MEMPOOL_FREE(mempool, half->http_ev_ctx);
- half->http_ev_ctx = NULL;
+ if (flow_data->decompress_body.body != NULL || flow_data->decompress_body.offset > 0)
+ {
+ flow_data->decompress_body.offset += flow_data->decompress_body.body_sz;
+ }
+ else
+ {
+ flow_data->raw_body.offset += flow_data->raw_body.body_sz;
+ }
}
- MEMPOOL_FREE(mempool, half);
-}
-
-void http_decoder_half_reinit(struct http_decoder_half *half,
- struct http_decoder_result_queue *queue,
- nmx_pool_t *mempool, struct session *sess)
-{
- assert(half != NULL);
- if (half->ref_data != NULL)
+ static void http_set_body_finish(struct http_half_data *flow_data)
{
- http_decoder_table_reinit(half->ref_data->table);
+ if (flow_data->decompress_body.offset > 0)
+ {
+ flow_data->decompress_body.body = NULL;
+ flow_data->decompress_body.body_sz = 0;
+ flow_data->decompress_body.is_finished = 1;
+ }
+ else
+ {
+ flow_data->raw_body.body = NULL;
+ flow_data->raw_body.body_sz = 0;
+ flow_data->raw_body.is_finished = 1;
+ }
}
- half->http_ev_ctx->ref_mempool = mempool;
- half->http_ev_ctx->ref_session = sess;
- half->http_ev_ctx->ref_queue = queue;
-}
-static void publish_message_for_parsed_header(struct http_decoder_half *half)
-{
- if (0 == http_decoder_table_has_parsed_header(half->ref_data->table))
- {
- return;
- }
- if (half->parser.type == HTTP_REQUEST)
+ void http_event_handler(struct http_half *half, enum http_event event, struct http *http_env)
{
- half->event = HTTP_EVENT_REQ_HDR;
- }
- else
- {
- half->event = HTTP_EVENT_RES_HDR;
- }
- half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); // http_event_handler();
- return;
-}
+ struct http_message *msg = NULL;
+ int thread_id = module_manager_get_thread_id(http_env->mod_mgr_ref);
+ uint8_t flow_flag = 0;
+ struct mq_runtime *mq_rt = module_manager_get_mq_runtime(http_env->mod_mgr_ref);
-int http_decoder_half_parse(int proxy_enable, struct http_decoder_half *half, const char *data, size_t data_len)
-{
- assert(half && data);
-
- half->data = (const char *)data;
- half->data_len = data_len;
- half->error = llhttp_execute(&half->parser, data, data_len);
-
- int ret = 0;
- enum llhttp_type type = HTTP_BOTH;
+ switch (event)
+ {
+ case HTTP_EVENT_REQ_INIT:
+ http_half_stage_reset(half);
+ if (NULL == half->flow_data) /* call llhttp_reset when headers is not completed */
+ {
+ half->flow_data = http_half_data_new(FLOW_TYPE_C2S);
+ }
+ break;
- switch (half->error)
- {
- case HPE_OK:
- break;
- case HPE_PAUSED:
- llhttp_resume(&half->parser);
- break;
- case HPE_PAUSED_UPGRADE:
- if (proxy_enable)
+ case HTTP_EVENT_REQ_HDR_END:
{
- llhttp_resume_after_upgrade(&half->parser);
+ http_half_parse_headers_finally(half);
+ int tot_c2s_headers = http_get_header_field_count(half->flow_data);
+ http_stat_update(&http_env->stat, thread_id, HTTP_TRANSACTION_NEW, 1);
+ http_stat_update(&http_env->stat, thread_id, HTTP_C2S_HEADERS, tot_c2s_headers);
+ http_stat_update(&http_env->stat, thread_id, HTTP_URL_BYTES, half->flow_data->joint_url.iov_len);
+ http_add_header_buf_reference(half);
+ msg = http_produce_message(half->sess_ref, event, HTTP_TOPIC_REQ_HEADER, half->flow_data);
+ mq_runtime_publish_message(mq_rt, http_env->http_topic_mgr->topic_compose[HTTP_TOPIC_REQ_HEADER].topic_id, msg);
}
- ret = 0;
break;
- default:
- type = (enum llhttp_type)half->parser.type;
- llhttp_init(&half->parser, type, &half->settings);
- ret = -1;
- break;
- }
-
- if (ret < 0)
- {
- // fprintf(stdout,
- // "llhttp_execute parse error: %s err_reason:%s\n",
- // llhttp_errno_name(half->error), half->parser.reason);
- return half->error;
- }
-
- if (half->ref_data != NULL)
- {
- if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_URI) == STRING_STATE_REFER)
+ case HTTP_EVENT_REQ_BODY_BEGIN:
+ break;
+ case HTTP_EVENT_REQ_BODY_DATA:
{
- http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_URI);
+ http_update_body_offset(half->flow_data);
+ msg = http_produce_message(half->sess_ref, event, HTTP_TOPIC_REQ_BODY, half->flow_data);
+ mq_runtime_publish_message(mq_rt, http_env->http_topic_mgr->topic_compose[HTTP_TOPIC_REQ_BODY].topic_id, msg);
}
-
- if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_STATUS) == STRING_STATE_REFER)
+ break;
+ case HTTP_EVENT_REQ_BODY_END:
{
- http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_STATUS);
+ http_set_body_finish(half->flow_data);
+ msg = http_produce_message(half->sess_ref, event, HTTP_TOPIC_REQ_BODY, half->flow_data);
+ mq_runtime_publish_message(mq_rt, http_env->http_topic_mgr->topic_compose[HTTP_TOPIC_REQ_BODY].topic_id, msg);
}
-
- if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_METHOD) == STRING_STATE_REFER)
+ break;
+ case HTTP_EVENT_REQ_END:
{
- http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_METHOD);
+ if ((0 == session_is_symmetric(half->sess_ref, &flow_flag) && (SESSION_SEEN_C2S_FLOW == flow_flag)))
+ {
+ http_stat_update(&http_env->stat, thread_id, HTTP_TRANSACTION_FREE, 1);
+ http_stat_update(&http_env->stat, thread_id, HTTP_C2S_ASYMMETRY_TRANSACTION, 1);
+ }
+ msg = http_produce_message(half->sess_ref, event, HTTP_TOPIC_REQ_BODY, half->flow_data);
+ mq_runtime_publish_message(mq_rt, http_env->http_topic_mgr->topic_compose[HTTP_TOPIC_REQ_BODY].topic_id, msg);
+ half->flow_data = NULL; // ownership move to message, free it in message free callback
}
+ break;
- if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_VERSION) == STRING_STATE_REFER)
+ case HTTP_EVENT_RES_INIT:
{
- http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_VERSION);
+ http_half_stage_reset(half);
+ if (NULL == half->flow_data) /* call llhttp_reset when headers is not completed */
+ {
+ half->flow_data = http_half_data_new(FLOW_TYPE_S2C);
+ }
}
+ break;
- if (http_decoder_table_header_complete(half->ref_data->table))
+ case HTTP_EVENT_RES_HDR_END:
{
- http_decoder_table_reset_header_complete(half->ref_data->table);
+ http_half_parse_headers_finally(half);
+ int tot_s2c_headers = http_get_header_field_count(half->flow_data);
+ http_stat_update(&http_env->stat, thread_id, HTTP_S2C_HEADERS, tot_s2c_headers);
+ if ((0 == session_is_symmetric(half->sess_ref, &flow_flag) && (SESSION_SEEN_S2C_FLOW == flow_flag)))
+ {
+ http_stat_update(&http_env->stat, thread_id, HTTP_TRANSACTION_NEW, 1);
+ }
+ http_add_header_buf_reference(half);
+ msg = http_produce_message(half->sess_ref, event, HTTP_TOPIC_RES_HEADER, half->flow_data);
+ mq_runtime_publish_message(mq_rt, http_env->http_topic_mgr->topic_compose[HTTP_TOPIC_RES_HEADER].topic_id, msg);
}
- else
+ break;
+ case HTTP_EVENT_RES_BODY_BEGIN:
+ break;
+ case HTTP_EVENT_RES_BODY_DATA:
{
- // if headers are not completed with EOF \r\n\r\n, push the parsed headers so far
- publish_message_for_parsed_header(half);
+ http_update_body_offset(half->flow_data);
+ msg = http_produce_message(half->sess_ref, event, HTTP_TOPIC_RES_BODY, half->flow_data);
+ mq_runtime_publish_message(mq_rt, http_env->http_topic_mgr->topic_compose[HTTP_TOPIC_RES_BODY].topic_id, msg);
}
-
- enum string_state hdr_key_state =
- http_decoder_table_state(half->ref_data->table, HTTP_ITEM_HDRKEY);
- enum string_state hdr_val_state =
- http_decoder_table_state(half->ref_data->table, HTTP_ITEM_HDRVAL);
-
- /* Truncated in http header key
- For example http header k-v => User-Agent: Chrome
- case1:
- packet1: User- hdr_key_state == STRING_STATE_REFER
- packet2: Agent: Chrome
-
- case2:
- packet1: User-Agent: hdr_key_state == STRING_STATE_COMMIT
- hdr_val_state == STRING_STATE_INIT
- packet2: Chrome
- */
- if (hdr_key_state == STRING_STATE_REFER ||
- (hdr_key_state == STRING_STATE_COMMIT && hdr_val_state == STRING_STATE_INIT))
+ break;
+ case HTTP_EVENT_RES_BODY_END:
{
- http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_HDRKEY);
+ http_set_body_finish(half->flow_data);
+ msg = http_produce_message(half->sess_ref, event, HTTP_TOPIC_RES_BODY, half->flow_data);
+ mq_runtime_publish_message(mq_rt, http_env->http_topic_mgr->topic_compose[HTTP_TOPIC_RES_BODY].topic_id, msg);
}
-
- /* Truncated in http header value
- For example http header k-v => User-Agent: Chrome
- packet1: User-Agent: Ch hdr_key_state == STRING_STATE_COMMIT
- hdr_val_state == STRING_STATE_REFER
-
- packet2: rome
- */
- if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_HDRVAL) == STRING_STATE_REFER)
+ break;
+ case HTTP_EVENT_RES_END:
{
- /* Header key should have been committed
- If it's not cached, cache it for next packet to use
- */
- http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_HDRKEY);
- http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_HDRVAL);
+ if ((0 == session_is_symmetric(half->sess_ref, &flow_flag) && (SESSION_SEEN_S2C_FLOW == flow_flag)))
+ {
+ http_stat_update(&http_env->stat, thread_id, HTTP_S2C_ASYMMETRY_TRANSACTION, 1);
+ }
+ http_stat_update(&http_env->stat, thread_id, HTTP_TRANSACTION_FREE, 1);
+ msg = http_produce_message(half->sess_ref, event, HTTP_TOPIC_RES_BODY, half->flow_data);
+ mq_runtime_publish_message(mq_rt, http_env->http_topic_mgr->topic_compose[HTTP_TOPIC_RES_BODY].topic_id, msg);
+ half->flow_data = NULL; // ownership move to message, free it in message free callback
}
-
- if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_BODY) == STRING_STATE_REFER)
- {
- http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_BODY);
+ break;
+ default:
+ assert(0);
+ break;
}
}
- return 0;
-}
-
-long long http_decoder_half_trans_count(struct http_decoder_half *half)
-{
- if (NULL == half)
- {
- return 0;
- }
-
- long long trans_cnt = half->trans_counter;
- half->trans_counter = 0;
-
- return trans_cnt;
-}
-
-struct http_decoder_half_data *
-http_decoder_half_data_new(nmx_pool_t *mempool)
-{
- struct http_decoder_half_data *data =
- MEMPOOL_CALLOC(mempool, struct http_decoder_half_data, 1);
- assert(data);
-
- data->table = http_decoder_table_new(mempool);
- assert(data->table);
-
- data->major_version = -1;
- data->minor_version = -1;
- data->status_code = -1;
-
- data->content_encoding = HTTP_CONTENT_ENCODING_NONE;
- // data->ref_decompress_body = NULL;
- // data->decompress_body_len = 0;
- data->decompress_buffer_list = NULL;
- return data;
-}
-
-static void http_decoder_half_decompress_buf_free(struct http_decoder_half_data *ref_data)
-{
- if (ref_data == NULL)
+ static void http_half_stage_buf_free(struct http_half *half)
{
- return;
- }
- struct http_decompress_buffer *el, *tmp;
- DL_FOREACH_SAFE(ref_data->decompress_buffer_list, el, tmp)
- {
- DL_DELETE(ref_data->decompress_buffer_list, el);
- if (el->iov.iov_base != NULL)
+ if (half->cached_header_buffer != NULL)
{
- FREE(el->iov.iov_base);
+ http_buffer_free(half->cached_header_buffer);
}
- FREE(el);
- }
- ref_data->decompress_buffer_list = NULL;
-}
-
-void http_decoder_half_data_free(nmx_pool_t *mempool, struct http_decoder_half_data *data)
-{
- if (NULL == data)
- {
- return;
- }
-
- if (data->table != NULL)
- {
- http_decoder_table_free(data->table);
- data->table = NULL;
+ half->cached_header_buffer = NULL;
}
- if (data->decompress != NULL)
+ static void http_half_stage_buf_reuse(struct http_half *half)
{
- http_content_decompress_destroy(data->decompress);
- data->decompress = NULL;
+ http_half_stage_buf_free(half);
+ half->cached_header_buffer = http_buffer_new();
}
- if (data->joint_url.iov_base)
- {
- MEMPOOL_FREE(mempool, data->joint_url.iov_base);
- data->joint_url.iov_base = NULL;
- data->joint_url_complete = 0;
- }
- http_decoder_half_decompress_buf_free(data);
- MEMPOOL_FREE(mempool, data);
-}
-
-int http_decoder_half_data_get_request_line(struct http_decoder_half_data *data,
- struct http_request_line *line)
-{
- http_decoder_table_get_method(data->table, &line->method, &line->method_len);
- http_decoder_table_get_uri(data->table, &line->uri, &line->uri_len);
- http_decoder_table_get_version(data->table, &line->version, &line->version_len);
-
- line->major_version = data->major_version;
- line->minor_version = data->minor_version;
-
- return 0;
-}
-
-int http_decoder_half_data_get_response_line(struct http_decoder_half_data *data,
- struct http_response_line *line)
-{
- http_decoder_table_get_version(data->table, &line->version, &line->version_len);
- http_decoder_table_get_status(data->table, &line->status, &line->status_len);
-
- line->major_version = data->major_version;
- line->minor_version = data->minor_version;
- line->status_code = data->status_code;
-
- return 0;
-}
-
-int http_decoder_half_data_get_header(const struct http_decoder_half_data *data,
- const char *name, size_t name_len,
- struct http_header_field *hdr_result)
-{
- return http_decoder_table_get_header(data->table, name, name_len, hdr_result);
-}
-
-int http_decoder_half_data_iter_header(struct http_decoder_half_data *data,
- struct http_header_field *header)
-{
- return http_decoder_table_iter_header((struct http_decoder_table *)data->table, header);
-}
-
-int http_decoder_half_data_reset_header_iter(struct http_decoder_half_data *req_data)
-{
- if (NULL == req_data)
- {
- return -1;
- }
- return http_decoder_table_reset_header_iter(req_data->table);
-}
-
-int http_decoder_half_data_has_parsed_header(struct http_decoder_half_data *data)
-{
- if (NULL == data)
+ static int http_half_stage_buf_append(struct http_half *half, const char *newdata, size_t newdata_len)
{
+ if (half->cached_header_buffer == NULL)
+ {
+ half->cached_header_buffer = http_buffer_new();
+ }
+ if (half->cached_header_buffer->buffer_size > HTTP_HEADERS_CACHE_MAX_SIZE)
+ {
+ return -1;
+ }
+ http_buffer_add(half->cached_header_buffer, newdata, newdata_len);
return 0;
}
- return http_decoder_table_has_parsed_header(data->table);
-}
-
-int http_decoder_half_data_get_raw_body(const struct http_decoder_half_data *data, const char **body, size_t *body_len)
-{
- if (NULL == data || NULL == body)
- {
- return -1;
- }
- return http_decoder_table_get_body(data->table, (char **)body, body_len);
-}
-#if 0
-int http_decoder_half_data_get_decompress_body(const struct http_decoder_half_data *data, hstring *body)
-{
- if (HTTP_CONTENT_ENCODING_NONE == data->content_encoding)
- {
- return http_decoder_table_get_body(data->table, body);
- }
-
- body->iov_base = data->ref_decompress_body;
- body->iov_len = data->decompress_body_len;
- return 0;
-}
-#endif
-
-void http_decoder_half_data_dump(struct http_decoder_half *half)
-{
- if (NULL == half || NULL == half->ref_data)
- {
- return;
- }
-
- http_decoder_table_dump(half->ref_data->table);
-}
-
-static void using_session_addr_as_host(struct session *ref_session, struct http_header_field *host_result, nmx_pool_t *mempool)
-{
-#if 1 // in native steallar, can't get the tuple4 from the session yet!!!
- struct httpd_session_addr ssaddr = {};
- httpd_session_get_addr(ref_session, &ssaddr);
- if (ssaddr.ipver != 4 && ssaddr.ipver != 6)
- {
- host_result->value = MEMPOOL_CALLOC(mempool, char, 1);
- sprintf((char *)host_result->value, "%s", "");
- host_result->value_len = strlen((char *)host_result->value);
- return;
- }
-
- char ip_string_buf[INET6_ADDRSTRLEN];
- if (4 == ssaddr.ipver)
- {
- host_result->value = MEMPOOL_CALLOC(mempool, char, (INET_ADDRSTRLEN + 7) /* "ip:port" max length */);
- inet_ntop(AF_INET, &ssaddr.daddr4, ip_string_buf, INET_ADDRSTRLEN);
- sprintf((char *)host_result->value, "%s:%u", ip_string_buf, ntohs(ssaddr.dport));
- host_result->value_len = strlen((char *)host_result->value);
- }
- else if (6 == ssaddr.ipver)
- {
- host_result->value = MEMPOOL_CALLOC(mempool, char, (INET6_ADDRSTRLEN + 7) /* "ip:port" max length */);
- inet_ntop(AF_INET6, &ssaddr.daddr6, ip_string_buf, INET6_ADDRSTRLEN);
- sprintf((char *)host_result->value, "%s:%u", ip_string_buf, ntohs(ssaddr.dport));
- host_result->value_len = strlen((char *)host_result->value);
- }
- else
- {
- assert(0);
- }
-#else
- host_result->val.iov_base = MEMPOOL_CALLOC(mempool, char, 32);
- sprintf((char *)host_result->val.iov_base, "%s", "todo:get_tuple4");
- host_result->val.iov_len = strlen((char *)host_result->val.iov_base);
-#endif
-}
-void http_decoder_join_url(struct http_decoder_half_data *hfdata, nmx_pool_t *mempool, const struct http_header_field *host_hdr)
-{
- int append_slash_len = 0;
- if ('/' != ((char *)hfdata->joint_url.iov_base)[0])
+ static int http_half_cache_uncompleted_header(struct http_half *half, int mem_merged, const char *data, size_t data_len)
{
- append_slash_len = 1;
- }
- int url_cache_str_len = host_hdr->value_len + hfdata->joint_url.iov_len + append_slash_len;
- char *url_cache_str = MEMPOOL_CALLOC(mempool, char, url_cache_str_len);
+ int ret = 0;
+ const char *cached_begin_ptr = NULL;
+ long long cached_len = 0;
+ if (half->half_stage.last_headers_complete_ptr == NULL)
+ {
+ if (mem_merged)
+ {
+ ret = 0; // no pipeline and memory have been merged, do nothing
+ }
+ else
+ {
+ ret = http_half_stage_buf_append(half, data, (size_t)data_len);
+ }
+ }
+ else
+ {
+ /*
+ has pipeline, cache begin ptr is the last completed header_field_value + "\r\n\r\n",
+ for example:
+ GET xxx \r\nheader_name:header:value\r\nheader_name:header:value\r\n\r\nGET yyy ...
+ ^
+ |
+ cache begin ptr
+ */
+ cached_begin_ptr = half->half_stage.last_headers_complete_ptr;
+ cached_len = (long long)data_len - (long long)(cached_begin_ptr - data);
+ assert(cached_len > 0);
- char *ptr = url_cache_str;
- memcpy(ptr, host_hdr->value, host_hdr->value_len);
- ptr += host_hdr->value_len;
- if (append_slash_len)
- {
- *ptr = '/';
- ptr++;
+ if (mem_merged)
+ {
+ /* some message have beed pushed, and pointer to half->half_stage->half_buffer,
+ the half->half_stage->half_buffer is maintain by messages ,
+ so, we can reset the half->half_stage->half_buffer, and append the new data to it.
+ */
+ http_half_stage_buf_reuse(half);
+ ret = http_half_stage_buf_append(half, cached_begin_ptr, (size_t)cached_len);
+ }
+ else
+ {
+ ret = http_half_stage_buf_append(half, cached_begin_ptr, (size_t)cached_len);
+ }
+ }
+ return ret;
}
- memcpy(ptr, hfdata->joint_url.iov_base, hfdata->joint_url.iov_len);
- MEMPOOL_FREE(mempool, hfdata->joint_url.iov_base); // free the cached uri buffer
- hfdata->joint_url.iov_base = url_cache_str;
- hfdata->joint_url.iov_len = url_cache_str_len;
- hfdata->joint_url_complete = 1;
-}
-
-void http_decoder_get_url(struct http_decoder_half_data *hfdata, nmx_pool_t *mempool)
-{
- struct http_request_line reqline = {};
- http_decoder_half_data_get_request_line(hfdata, &reqline);
- if (unlikely(http_strncasecmp_safe("CONNECT", (char *)reqline.method, 7, reqline.method_len) == 0))
+ int http_flow_parse(struct http_half *half, const char *data, size_t data_len)
{
- hfdata->joint_url.iov_base = MEMPOOL_CALLOC(mempool, char, reqline.uri_len + 1);
- memcpy(hfdata->joint_url.iov_base, reqline.uri, reqline.uri_len);
- hfdata->joint_url.iov_len = reqline.uri_len;
- hfdata->joint_url_complete = 1;
+ assert(half && data);
+ llhttp_errno_t ret = HPE_OK /* HPE_OK = 0 */;
+ ret = llhttp_execute(&half->parser.llhttp_parser, data, data_len);
+ switch (ret)
+ {
+ case HPE_OK:
+ break;
+ case HPE_PAUSED:
+ llhttp_resume(&half->parser.llhttp_parser);
+ break;
+ case HPE_PAUSED_UPGRADE:
+ llhttp_resume_after_upgrade(&half->parser.llhttp_parser);
+ break;
+ default:
+ break;
+ }
+ return ret;
}
-}
-int http_decoder_join_url_finally(struct http_event_context *ev_ctx, struct http_decoder_half_data *hfdata, nmx_pool_t *mempool)
-{
- if (hfdata->joint_url_complete)
+ struct http_half *http_flow_get_nx(struct http_exdata *exdata, enum flow_type flow_dir, struct session *sess, struct http *http_env)
{
- return 0;
- }
- struct http_header_field addr_as_host = {};
- using_session_addr_as_host(ev_ctx->ref_session, &addr_as_host, mempool);
- http_decoder_join_url(hfdata, mempool, &addr_as_host);
- MEMPOOL_FREE(mempool, addr_as_host.value); // free session addr to host buffer
- return 1;
-}
+ struct http_half **flow = NULL;
-void http_decoder_get_host_feed_url(struct http_decoder_half *half)
-{
- if (half->ref_data->joint_url_complete)
- {
- return;
- }
- struct http_header_field host_result = {};
- int host_header_cnt = http_decoder_half_data_get_header(half->ref_data, (char *)"Host", 4, &host_result);
- if (host_header_cnt < 0)
- {
- return;
+ if (FLOW_TYPE_C2S == flow_dir)
+ {
+ flow = &exdata->decoder->flow_c2s;
+ }
+ else if (FLOW_TYPE_S2C == flow_dir)
+ {
+ flow = &exdata->decoder->flow_s2c;
+ }
+ else
+ {
+ assert(0);
+ return NULL;
+ }
+ if (NULL == *flow)
+ {
+ *flow = http_half_new(flow_dir);
+ (*flow)->http_env_ref = http_env;
+ (*flow)->sess_ref = sess;
+ }
+ return *flow;
}
- http_decoder_join_url(half->ref_data, half->http_ev_ctx->ref_mempool, &host_result);
-}
-int http_half_data_get_url(struct http_decoder_half_data *res_data, const char **url_val, size_t *url_len)
-{
- if (0 == res_data->joint_url_complete)
- {
- return -1;
- }
- *url_val = res_data->joint_url.iov_base;
- *url_len = res_data->joint_url.iov_len;
- return 0;
-}
-#if 0
-int http_half_data_get_decode_url(struct http_decoder_half_data *res_data, hstring *url)
-{
- if (0 == res_data->joint_url_complete)
+ int http_half_flow_process(struct http_half *half, const char *newdata, size_t newdata_len)
{
- return -1;
- }
- url->iov_base = res_data->decoded_url.iov_base;
- url->iov_len = res_data->decoded_url.iov_len;
- return 0;
-}
-#endif
-
-int http_half_data_get_transaction_seq(struct http_decoder_half_data *hf_data)
-{
- return hf_data->transaction_index;
-}
-
-void http_half_data_update_commit_index(struct http_decoder_half_data *half_data)
-{
- http_decoder_table_update_commit_index(half_data->table);
-}
+ int ret = 0;
+ int mem_merged = 0;
+ const char *parse_data = newdata;
+ size_t parse_data_len = newdata_len;
-int http_half_data_get_total_parsed_header_count(struct http_decoder_half_data *half_data)
-{
- return http_decoder_table_get_total_parsed_header(half_data->table);
-}
-
-void http_half_pre_context_free(struct session *sess, struct http_decoder_exdata *exdata)
-{
- struct http_message *msg = NULL;
- struct http_decoder_half_data *res_data = NULL;
- struct http_decoder_result_queue *queue = NULL;
-
- if (exdata)
- {
- queue = exdata->queue;
- for (size_t i = 0; i < queue->queue_size; i++)
+ /* merge cached uncompleted header and current tcp payload first */
+ if (half->cached_header_buffer != NULL && half->cached_header_buffer->buffer != NULL)
{
- struct http_decoder_half_data *req_data = queue->array[i].req_data;
- res_data = queue->array[i].res_data;
- if ((req_data != NULL) && (NULL == res_data) && (req_data->state < HTTP_EVENT_REQ_END))
+ http_half_stage_buf_append(half, newdata, newdata_len);
+ parse_data = half->cached_header_buffer->buffer;
+ parse_data_len = half->cached_header_buffer->buffer_size;
+ mem_merged = 1;
+ if (parse_data_len > HTTP_HEADERS_CACHE_MAX_SIZE)
{
- msg = http_message_new(HTTP_TRANSACTION_END, queue, i, HTTP_REQUEST);
- session_mq_publish_message(sess, exdata->pub_topic_id, msg);
+ STELLAR_LOG_FATAL(half->http_env_ref->logger_ref, HTTP_MODULE_NAME,
+ "sess: %s, headers buf len more than %d", session_get_readable_addr(half->sess_ref), HTTP_HEADERS_CACHE_MAX_SIZE);
+ return -1;
}
}
+ ret = http_flow_parse(half, parse_data, parse_data_len);
+ if (ret != HPE_OK)
+ {
+ STELLAR_LOG_FATAL(half->http_env_ref->logger_ref, HTTP_MODULE_NAME,
+ "llhttp parser error: %d, %s. sess: %s", ret, llhttp_errno_name(ret), session_get_readable_addr(half->sess_ref));
+ return -1;
+ }
- for (size_t i = 0; i < queue->queue_size; i++)
+ if (half->half_stage.llhttp_last_stage >= LLHTTP_STAGE_MESSAGE_BEGIN &&
+ half->half_stage.llhttp_last_stage < LLHTTP_STAGE_HEADERS_COMPLETE) /* some headers not completed */
{
- res_data = queue->array[i].res_data;
- if ((res_data != NULL) && (res_data->state < HTTP_EVENT_RES_END))
+ STELLAR_LOG_DEBUG(half->http_env_ref->logger_ref, HTTP_MODULE_NAME,
+ "sess: %s, header not completed, need caching, %.*s",
+ session_get_readable_addr(half->sess_ref), parse_data_len > 16 ? 16 : parse_data_len, parse_data);
+ if (http_half_cache_uncompleted_header(half, mem_merged, parse_data, parse_data_len) < 0)
{
- msg = http_message_new(HTTP_TRANSACTION_END, queue, i, HTTP_RESPONSE);
- session_mq_publish_message(sess, exdata->pub_topic_id, msg);
+ STELLAR_LOG_FATAL(half->http_env_ref->logger_ref, HTTP_MODULE_NAME,
+ "sess: %s, headers buf len more than %d", session_get_readable_addr(half->sess_ref), HTTP_HEADERS_CACHE_MAX_SIZE);
+ return -1;
}
+
+ /* uncompleted_header, reset llhttp state and all headers */
+ http_half_stage_reset(half);
+ llhttp_reset(&half->parser.llhttp_parser);
+ return 0;
+ }
+ else
+ {
+ /* ownership is maintained by header message, free it here (reference-- actually) */
+ http_half_stage_buf_free(half);
}
+ return 0;
}
-}
-void http_half_update_state(struct http_decoder_half_data *hf_data, enum http_event state)
-{
- hf_data->state = state;
+#ifdef __cplusplus
}
-
-void http_half_get_max_transaction_seq(struct http_decoder_exdata *exdata, long long *max_req_seq, long long *max_res_seq)
-{
- assert(exdata && max_req_seq && max_res_seq);
- *max_req_seq = exdata->decoder->c2s_half->transaction_seq;
- *max_res_seq = exdata->decoder->s2c_half->transaction_seq;
-}
-
-enum http_content_encoding http_half_data_get_content_encoding(struct http_decoder_half_data *hf_data)
-{
- if (NULL == hf_data)
- {
- return HTTP_CONTENT_ENCODING_NONE;
- }
- return hf_data->content_encoding;
-} \ No newline at end of file
+#endif