diff options
| author | lijia <[email protected]> | 2024-10-27 18:08:00 +0800 |
|---|---|---|
| committer | lijia <[email protected]> | 2024-11-08 11:23:16 +0800 |
| commit | 627cfac992c52e3c7950355c0d447764056a5276 (patch) | |
| tree | afb5f8f462d964a764dbb071f5cfadad199cbe4d /decoders/http/http_decoder_half.c | |
| parent | d0a868591470a4a9d71a65a5d540058e72c8d92c (diff) | |
httpv2.0 rebase onto develop-2.0dev-http-v2.0
Diffstat (limited to 'decoders/http/http_decoder_half.c')
| -rw-r--r-- | decoders/http/http_decoder_half.c | 1548 |
1 files changed, 525 insertions, 1023 deletions
diff --git a/decoders/http/http_decoder_half.c b/decoders/http/http_decoder_half.c index 70991c6..84874a3 100644 --- a/decoders/http/http_decoder_half.c +++ b/decoders/http/http_decoder_half.c @@ -2,1187 +2,689 @@ #include <stdio.h> #include <string.h> #include <arpa/inet.h> -#include "http_decoder_private.h" +#include "http_decoder.h" +#include "http_decoder_half.h" +#include "http_decoder_utils.h" +#include "http_decoder_decompress.h" #include "llhttp.h" +#include "stellar/session.h" #include "uthash/utlist.h" +#include "uthash/utarray.h" -struct http_decompress_buffer +#ifdef __cplusplus +extern "C" { - struct iovec iov; - char is_commit; - struct http_decompress_buffer *next, *prev; -}; - -struct http_decoder_half_data -{ - struct http_decoder_table *table; - - int major_version; - int minor_version; - int status_code; - - enum http_event state; - - enum http_content_encoding content_encoding; - struct http_content_decompress *decompress; -#if 0 - char *ref_decompress_body; - size_t decompress_body_len; -#else - struct http_decompress_buffer *decompress_buffer_list; -#endif - int joint_url_complete; - int url_is_encoded; - // http://<host>[:<port>]/<path>?<searchpart> - hstring joint_url; - hstring decoded_url; - long long transaction_index; -}; - -struct http_decoder_half -{ - llhttp_t parser; - llhttp_settings_t settings; - enum llhttp_errno error; - int decompress_switch; - struct http_decoder_env *httpd_env; - - // uint8_t is_request_flow; - enum http_event event; - http_event_cb *http_ev_cb; - struct http_event_context *http_ev_ctx; - - struct http_decoder_half_data *ref_data; - - long long trans_counter; - long long err_counter; - long long transaction_seq; // accumulated - - const char *data; - int data_len; -}; - -// #define HTTP_DECODER_DEBUG -#ifdef HTTP_DECODER_DEBUG -static void printf_debug_info(const char *desc, const char *at, size_t length) -{ - if (at) - { - char *temp = http_safe_dup(at, length); - printf("HTTP PARSER STAGE: %s: %s\n", desc, temp); - FREE(temp); - } - else - { - printf("HTTP PARSER STAGE: %s\n", desc); - } -} -#else -#define printf_debug_info(desc, at, length) #endif -void http_half_decompress_buffer_free(struct http_decoder_half_data *data, hstring *decompress_body) -{ - struct http_decompress_buffer *el, *tmp; - DL_FOREACH_SAFE(data->decompress_buffer_list, el, tmp) + static inline void http_half_stage_reset_flow_data(struct http_half *half) { - if (el->iov.iov_base == decompress_body->iov_base && el->iov.iov_len == decompress_body->iov_len) + if (half->flow_data) { - DL_DELETE(data->decompress_buffer_list, el); - if (el->iov.iov_base) + memset(&half->flow_data->req_line, 0, sizeof(struct http_request_line)); + memset(&half->flow_data->status_line, 0, sizeof(struct http_status_line)); + memset(&half->flow_data->header, 0, sizeof(struct http_header)); + if (half->flow_data->ut_filed_array) { - FREE(el->iov.iov_base); + utarray_clear(half->flow_data->ut_filed_array); } - FREE(el); - break; } } -} - -void http_half_get_lastest_decompress_buffer(struct http_decoder_half_data *data, hstring *decompress_body) -{ - if (data->content_encoding == HTTP_CONTENT_ENCODING_NONE) - { - return; - } - if (data->decompress_buffer_list == NULL) - { - decompress_body->iov_base = NULL; - decompress_body->iov_len = 0; - return; - } - if (data->decompress_buffer_list->prev->is_commit == 1) + static inline void http_half_stage_reset_stat(struct http_half *half) { - decompress_body->iov_base = NULL; - decompress_body->iov_len = 0; - return; + memset(&half->half_stage, 0, sizeof(struct http_half_llhttp_stage)); } - decompress_body->iov_base = data->decompress_buffer_list->prev->iov.iov_base; - decompress_body->iov_len = data->decompress_buffer_list->prev->iov.iov_len; - data->decompress_buffer_list->prev->is_commit = 1; -} - -static void http_decoder_half_data_decompress(struct http_decoder_half_data *data) -{ - assert(data); - - if (data->content_encoding == HTTP_CONTENT_ENCODING_NONE) + static void http_half_stage_reset(struct http_half *half) { - return; + http_half_stage_reset_stat(half); + http_half_stage_reset_flow_data(half); } - hstring raw_body = {}; - http_decoder_table_get_body(data->table, (char **)&raw_body.iov_base, &raw_body.iov_len); - if (raw_body.iov_base == NULL || raw_body.iov_len == 0) + void http_flow_append_header_filed(struct http_half_data *flow_data, const char *at, size_t length) { - return; + if (NULL == flow_data->ut_filed_array) // the first field + { + UT_icd header_icd = {sizeof(struct http_header_field), NULL, NULL, NULL}; + utarray_new(flow_data->ut_filed_array, &header_icd); + assert(flow_data->ut_filed_array != NULL); + utarray_reserve(flow_data->ut_filed_array, HTTP_HEADER_FIELD_RESERVE_NUM); + } + struct http_header_field new_filed = {}; + new_filed.field_name = (char *)at; + new_filed.field_name_len = length; + utarray_push_back(flow_data->ut_filed_array, &new_filed); } - if (NULL == data->decompress) + void http_flow_append_header_value(struct http_half_data *flow_data, const char *at, size_t length) { - data->decompress = http_content_decompress_create(data->content_encoding); + assert(flow_data->ut_filed_array != NULL); + struct http_header_field *last_field = (struct http_header_field *)utarray_back(flow_data->ut_filed_array); + assert(last_field != NULL); + if (last_field) + { + last_field->field_value = (char *)at; + last_field->field_value_len = length; + } } - assert(data->decompress); - char *local_outdata = NULL; - size_t local_outdata_len = 0; - if (http_content_decompress_write(data->decompress, (char *)raw_body.iov_base, - raw_body.iov_len, - &local_outdata, - &local_outdata_len) == -1) + static struct http_message *http_produce_message(struct session *sess, enum http_event event, + enum http_topic_type topic_type, struct http_half_data *flow_data) { - // log error - http_content_decompress_destroy(data->decompress); - data->decompress = NULL; - return; + struct http_message *msg = (struct http_message *)calloc(1, sizeof(struct http_message)); + msg->sess_ref = sess; + msg->topic_type = topic_type; + msg->flow_data = flow_data; + msg->event = event; + return msg; } - if (local_outdata != NULL && local_outdata_len > 0) + static struct http_half_data *http_half_data_new(enum flow_type flow_dir) { - struct http_decompress_buffer *decompress_buffer = CALLOC(struct http_decompress_buffer, 1); - assert(decompress_buffer); - decompress_buffer->iov.iov_base = local_outdata; - decompress_buffer->iov.iov_len = local_outdata_len; - DL_APPEND(data->decompress_buffer_list, decompress_buffer); - http_content_decompress_ownership_borrow(data->decompress); + struct http_half_data *half_data = (struct http_half_data *)calloc(1, sizeof(struct http_half_data)); + half_data->flow_dir = flow_dir; + /* update transaction seq in http_half_parse_headers_finally(), not here */ + return half_data; } -} - -/* Possible return values 0, -1, `HPE_PAUSED` */ -static int on_message_begin(llhttp_t *http) -{ - printf_debug_info("on_message_begin", NULL, 0); - - struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); - assert(half); - if (half->parser.type == HTTP_REQUEST) - { - half->event = HTTP_EVENT_REQ_INIT; - } - else + void http_half_data_free(struct http_half_data *half_data) { - half->event = HTTP_EVENT_RES_INIT; + if (NULL == half_data) + { + return; + } + if (half_data->ut_filed_array) + { + utarray_free(half_data->ut_filed_array); + half_data->ut_filed_array = NULL; + } + if (half_data->joint_url.iov_base) + { + FREE(half_data->joint_url.iov_base); + } + if (half_data->decompress) + { + http_content_decompress_destroy(half_data->decompress); + half_data->decompress = NULL; + } + free(half_data); } - half->ref_data = NULL; - - assert(half->http_ev_cb != NULL); - half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); // http_event_handler() - - half->trans_counter++; - half->ref_data->transaction_index = half->transaction_seq++; - return 0; -} - -static int on_message_complete(llhttp_t *http) -{ - printf_debug_info("on_message_complete", NULL, 0); - - struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); - assert(half); - - if (half->parser.type == HTTP_REQUEST) + void http_half_free(struct http_half *half) { - if (half->event == HTTP_EVENT_REQ_BODY_DATA) + if (NULL == half) { - half->event = HTTP_EVENT_REQ_BODY_END; - half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); + return; } - } - else - { - if (half->event == HTTP_EVENT_RES_BODY_DATA) + llhttp_reset(&half->parser.llhttp_parser); + http_half_data_free(half->flow_data); + if (half->cached_header_buffer) { - half->event = HTTP_EVENT_RES_BODY_END; - half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); + if (half->cached_header_buffer->buffer) + { + FREE(half->cached_header_buffer->buffer); + } + FREE(half->cached_header_buffer); } + free(half); } - // trigger req_end/res_end - if (half->parser.type == HTTP_REQUEST) - { - half->event = HTTP_EVENT_REQ_END; - half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); - } - else + struct http_half *http_half_new(enum flow_type flow_dir) { - half->event = HTTP_EVENT_RES_END; - half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); + struct http_half *half = (struct http_half *)calloc(1, sizeof(struct http_half)); + if (FLOW_TYPE_C2S == flow_dir) + { + http_flow_parser_init(&half->parser, HTTP_REQUEST); + } + else + { + http_flow_parser_init(&half->parser, HTTP_RESPONSE); + } + half->parser.half_ref = half; + return half; } - return 0; -} - -static int on_reset(llhttp_t *http __attribute__((unused))) -{ - printf_debug_info("on_reset", NULL, 0); - - return 0; -} - -static inline int is_line_crlf(struct http_decoder_half *half) -{ - const char *chr_r = (char *)memrchr(half->data, '\r', half->data_len); - const char *chr_n = (char *)memrchr(half->data, '\n', half->data_len); - if (chr_r && chr_n && (chr_r + 1 == chr_n)) + static struct http_header_field *http_flow_get_header_field(struct http_half_data *flow_data, const char *field_name, size_t field_name_len) { - return 1; + struct http_header_field *p; + if (NULL == flow_data->ut_filed_array) // some response no header fileds, such as "HTTP/1.1 100 Continue\r\n\r\n" + { + return NULL; + } + for (p = (struct http_header_field *)utarray_front(flow_data->ut_filed_array); + p != NULL; + p = (struct http_header_field *)utarray_next(flow_data->ut_filed_array, p)) + { + if ((field_name_len == p->field_name_len) && + (strncasecmp(p->field_name, field_name, field_name_len) == 0)) + { + return p; + } + } + return NULL; } - return 0; -} - -static int on_method(llhttp_t *http, const char *at, size_t length) -{ - printf_debug_info("on_method", at, length); - - struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); - assert(half); - - http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_METHOD, at, length); - return 0; -} - -/* Information-only callbacks, return value is ignored */ -static int on_method_complete(llhttp_t *http) -{ - printf_debug_info("on_method_complete", NULL, 0); - - struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); - assert(half); - if (is_line_crlf(half) == 0) + static void http_using_session_addr_as_host(struct session *ref_session, struct http_header_field *host_result) { - http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_METHOD); - } - - http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_METHOD); - - return 0; -} - -/* Possible return values 0, -1, HPE_USER */ -static int on_uri(llhttp_t *http, const char *at, size_t length) -{ - printf_debug_info("on_uri", at, length); - - struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); - assert(half); - - http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_URI, at, length); - return 0; -} - -static void http_decoder_cached_portion_url(struct http_decoder_half *half, const hstring *uri_result) -{ - struct http_decoder_half_data *ref_data = half->ref_data; - int uri_skip_len = 0; + memset(host_result, 0, sizeof(struct http_header_field)); + struct http_session_addr ssaddr = {}; + httpd_session_get_addr(ref_session, &ssaddr); + char ip_string_buf[INET6_ADDRSTRLEN]; - if ((uri_result->iov_len) > 7 && (strncasecmp("http://", (char *)uri_result->iov_base, 7) == 0)) // absolute URI - { - uri_skip_len = strlen("http://"); - ref_data->joint_url_complete = 1; - } - else - { - ref_data->joint_url_complete = 0; + if (4 == ssaddr.ipver) + { + host_result->field_value = (char *)calloc(1, (INET6_ADDRSTRLEN + 7) /* "ip:port" max length */); + inet_ntop(AF_INET, &ssaddr.daddr4, ip_string_buf, INET6_ADDRSTRLEN); + snprintf((char *)host_result->field_value, INET6_ADDRSTRLEN + 7, "%s:%u", ip_string_buf, ntohs(ssaddr.dport)); + host_result->field_value_len = strlen(host_result->field_value); + } + else if (6 == ssaddr.ipver) + { + host_result->field_value = (char *)calloc(1, (INET6_ADDRSTRLEN + 7) /* "ip:port" max length */); + inet_ntop(AF_INET6, &ssaddr.daddr6, ip_string_buf, INET6_ADDRSTRLEN); + snprintf((char *)host_result->field_value, INET6_ADDRSTRLEN + 7, "%s:%u", ip_string_buf, ntohs(ssaddr.dport)); + host_result->field_value_len = strlen(host_result->field_value); + } + else + { + host_result->field_value = (char *)calloc(1, 1); + host_result->field_value_len = 1; + } + return; } - ref_data->joint_url.iov_len = uri_result->iov_len - uri_skip_len; - ref_data->joint_url.iov_base = MEMPOOL_CALLOC(half->http_ev_ctx->ref_mempool, char, ref_data->joint_url.iov_len); - memcpy(ref_data->joint_url.iov_base, (char *)uri_result->iov_base + uri_skip_len, ref_data->joint_url.iov_len); -} - -/* Information-only callbacks, return value is ignored */ -static int on_uri_complete(llhttp_t *http) -{ - printf_debug_info("on_uri_complete", NULL, 0); - - struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); - assert(half); - - if (is_line_crlf(half) == 0) + static void http_flow_join_url(struct http_half_data *flow_data, const struct http_header_field *host_filed) { - http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_URI); - } - - http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_URI); - - hstring uri_result = {}; - http_decoder_table_get_uri(half->ref_data->table, (char **)&uri_result.iov_base, &uri_result.iov_len); - assert(uri_result.iov_base); - http_decoder_cached_portion_url(half, &uri_result); - - return 0; -} + int append_slash_len = 0; + const char *join_uri = flow_data->req_line.uri; + size_t join_url_len = flow_data->req_line.uri_len; -/* Possible return values 0, -1, HPE_USER */ -static int on_version(llhttp_t *http, const char *at, size_t length) -{ - printf_debug_info("on_version", at, length); - - struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); - assert(half); - - http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_VERSION, at, length); - return 0; -} + // skip schema "http://" + if (join_url_len > 7 && strncasecmp(join_uri, "http://", 7) == 0) + { + join_uri += 7; + join_url_len -= 7; + } -/* Information-only callbacks, return value is ignored */ -static int on_version_complete(llhttp_t *http) -{ - printf_debug_info("on_version_complete", NULL, 0); + // if uri not absolute path(start with '/'), append '/' + if ('/' != join_uri[0]) + { + append_slash_len = 1; + } + int url_cache_str_len = host_filed->field_value_len + join_url_len + append_slash_len; + char *url_cache_str = (char *)calloc(1, url_cache_str_len); - struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); - assert(half); + char *ptr = url_cache_str; + memcpy(ptr, host_filed->field_value, host_filed->field_value_len); + ptr += host_filed->field_value_len; + if (append_slash_len) + { + *ptr = '/'; + ptr++; + } + memcpy(ptr, join_uri, join_url_len); - if (is_line_crlf(half) == 0) - { - http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_VERSION); + flow_data->joint_url.iov_base = url_cache_str; + flow_data->joint_url.iov_len = url_cache_str_len; } - http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_VERSION); - - half->ref_data->major_version = llhttp_get_http_major(&half->parser); - half->ref_data->minor_version = llhttp_get_http_minor(&half->parser); - - if (half->parser.type == HTTP_REQUEST) + // todo: optimize use hash table + static void http_half_get_frequently_used_fileds(struct http_half_data *flow_data) { - half->event = HTTP_EVENT_REQ_LINE; - if (half->http_ev_cb) // http_event_handler() + if (FLOW_TYPE_C2S == flow_data->flow_dir) + { + flow_data->header.host = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_HOST, strlen(HTTP_HEADER_FIELD_NAME_HOST)); + flow_data->header.user_agent = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_USER_AGENT, strlen(HTTP_HEADER_FIELD_NAME_USER_AGENT)); + flow_data->header.referer = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_REFERER, strlen(HTTP_HEADER_FIELD_NAME_REFERER)); + flow_data->header.cookie = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_COOKIE, strlen(HTTP_HEADER_FIELD_NAME_COOKIE)); + } + else { - half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); + flow_data->header.set_cookie = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_SET_COOKIE, strlen(HTTP_HEADER_FIELD_NAME_SET_COOKIE)); } + flow_data->header.content_type = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_CONTENT_TYPE, strlen(HTTP_HEADER_FIELD_NAME_CONTENT_TYPE)); } - return 0; -} - -/* Possible return values 0, -1, HPE_USER */ -static int on_status(llhttp_t *http, const char *at, size_t length) -{ - printf_debug_info("on_status", at, length); - - struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); - assert(half); - - http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_STATUS, at, length); - return 0; -} - -/* Information-only callbacks, return value is ignored */ -static int on_status_complete(llhttp_t *http) -{ - printf_debug_info("on_status_complete", NULL, 0); - - struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); - assert(half); - - if (is_line_crlf(half) == 0) + static int http_half_parse_headers_finally(struct http_half *half) { - http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_STATUS); - } + struct http_half_data *flow_data = half->flow_data; + http_half_get_frequently_used_fileds(flow_data); - http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_STATUS); - half->ref_data->status_code = llhttp_get_status_code(&half->parser); + /* update transaction_seq when headers completed */ + half->flow_data->transaction_seq = half->transaction_num++; - if (half->parser.type == HTTP_RESPONSE) - { - half->event = HTTP_EVENT_RES_LINE; - if (half->http_ev_cb != NULL) // http_event_handler() + if (FLOW_TYPE_C2S == flow_data->flow_dir) { - half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); + struct http_header_field tmp_host_field = {}; + const struct http_header_field *host_filed = flow_data->header.host; + if (NULL == host_filed) + { + http_using_session_addr_as_host(half->sess_ref, &tmp_host_field); + host_filed = &tmp_host_field; + } + http_flow_join_url(flow_data, host_filed); + FREE(tmp_host_field.field_name); + FREE(tmp_host_field.field_value); } - } - return 0; -} - -/* Possible return values 0, -1, HPE_USER */ -static int on_header_field(llhttp_t *http, const char *at, size_t length) -{ - printf_debug_info("on_header_field", at, length); - - struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); - assert(half); - - http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_HDRKEY, at, length); - return 0; -} - -/* Information-only callbacks, return value is ignored */ -static int on_header_field_complete(llhttp_t *http) -{ - printf_debug_info("on_header_field_complete", NULL, 0); - - struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); - assert(half); - - http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_HDRKEY); - - return 0; -} - -/* Possible return values 0, -1, HPE_USER */ -static int on_header_value(llhttp_t *http, const char *at, size_t length) -{ - printf_debug_info("on_header_value", at, length); - - struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); - assert(half); - - http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_HDRVAL, at, length); - return 0; -} - -#define MAX_ENCODING_STR_LEN 8 -/* Information-only callbacks, return value is ignored */ -static int on_header_value_complete(llhttp_t *http) -{ - printf_debug_info("on_header_value_complete", NULL, 0); - - struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); - assert(half); - - if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_HDRKEY) == - STRING_STATE_CACHE) - { - http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_HDRKEY); - } - - http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_HDRVAL); + struct http_header_field *encoding_field = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_CONTENT_ENCODING, strlen(HTTP_HEADER_FIELD_NAME_CONTENT_ENCODING)); + if (NULL == encoding_field) + { + flow_data->content_encoding_type = HTTP_CONTENT_ENCODING_NONE; + } + else + { + flow_data->content_encoding_type = http_content_encoding_str2int(encoding_field->field_value, encoding_field->field_value_len); + } - if (half->ref_data->content_encoding == HTTP_CONTENT_ENCODING_NONE) - { - struct http_header_field http_hdr = {}; + struct http_header_field *content_len_field = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_CONTENT_LENGTH, strlen(HTTP_HEADER_FIELD_NAME_CONTENT_LENGTH)); + if (NULL == content_len_field) + { + flow_data->header.content_length = -1; + } + else + { + flow_data->header.content_length = http_strtoll(content_len_field->field_value, content_len_field->field_value_len); + } - if (http_decoder_table_get_header(half->ref_data->table, (char *)"Content-Encoding", 16, &http_hdr) == 0) + struct http_header_field *transf_encoding_field = http_flow_get_header_field(flow_data, HTTP_HEADER_FIELD_NAME_TRANSFER_ENCODING, strlen(HTTP_HEADER_FIELD_NAME_TRANSFER_ENCODING)); + if (NULL == transf_encoding_field) { - half->ref_data->content_encoding = http_content_encoding_str2int(http_hdr.value, http_hdr.value_len); + flow_data->transfer_encoding_is_chunked = -1; } - } + else + { + if (http_strncasecmp_safe(transf_encoding_field->field_value, "chunked", transf_encoding_field->field_value_len, strlen("chunked")) == 0) + { + flow_data->transfer_encoding_is_chunked = 1; + } + else + { + flow_data->transfer_encoding_is_chunked = 0; + } + } + flow_data->header.header_buf = half->half_stage.first_header_name_ptr; + flow_data->header.header_buf_sz = half->half_stage.last_headers_complete_ptr - half->half_stage.first_header_name_ptr; - if (http->type == HTTP_REQUEST) - { - http_decoder_get_host_feed_url(half); + // todo, parse other frequently used fields + return 0; } - return 0; -} - -/* When on_chunk_header is called, the current chunk length is stored - * in parser->content_length. - * Possible return values 0, -1, `HPE_PAUSED` - */ -static int on_chunk_header(llhttp_t *http __attribute__((unused))) -{ - printf_debug_info("on_chunk_header", NULL, 0); - return 0; -} - -/* When on_chunk_header is called, the current chunk length is stored - * in parser->content_length. - * Possible return values 0, -1, `HPE_PAUSED` - */ -static int on_chunk_header_complete(llhttp_t *http __attribute__((unused))) -{ - printf_debug_info("on_chunk_header_complete", NULL, 0); - - return 0; -} - -/* Possible return values: - * 0 - Proceed normally - * 1 - Assume that request/response has no body, and proceed to parsing the next message - * 2 - Assume absence of body (as above) and make `llhttp_execute()` return `HPE_PAUSED_UPGRADE` - * -1 - Error `HPE_PAUSED` - */ -static int on_headers_complete(llhttp_t *http) -{ - printf_debug_info("on_headers_complete", NULL, 0); - - struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); - assert(half); - assert(half->ref_data); - - http_decoder_table_set_header_complete(half->ref_data->table); - - if (half->parser.type == HTTP_REQUEST) - { - half->event = HTTP_EVENT_REQ_HDR_END; - } - else + int http_get_header_field_count(struct http_half_data *flow_data) { - half->event = HTTP_EVENT_RES_HDR_END; + if (NULL == flow_data->ut_filed_array) + { + return 0; // some response no header fileds, such as "HTTP/1.1 100 Continue\r\n\r\n" + } + return utarray_len(flow_data->ut_filed_array); } - half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); // http_event_handler() - - return 0; -} -/* Possible return values 0, -1, HPE_USER */ -static int on_body(llhttp_t *http, const char *at, size_t length) -{ - printf_debug_info("on_body", at, length); - - struct http_decoder_half *half = container_of(http, struct http_decoder_half, parser); - assert(half); - - // trigger body_begin event - if (half->parser.type == HTTP_REQUEST) + void http_flow_body_decompress(struct http_half_data *flow_data, const char *zipdata, size_t zipdatalen) { - if (half->event == HTTP_EVENT_REQ_HDR_END) + assert(flow_data); + if (flow_data->content_encoding_type == HTTP_CONTENT_ENCODING_NONE) { - half->event = HTTP_EVENT_REQ_BODY_BEGIN; - half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); // http_event_handler() + return; } - } - else - { - if (half->event == HTTP_EVENT_RES_HDR_END) + if (zipdata == NULL || zipdatalen == 0) { - half->event = HTTP_EVENT_RES_BODY_BEGIN; - half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); + return; } - } - - if (half->ref_data != NULL) - { - if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_BODY) == - STRING_STATE_COMMIT) + if (NULL == flow_data->decompress) { - http_decoder_table_reset(half->ref_data->table, HTTP_ITEM_BODY); + flow_data->decompress = http_content_decompress_create(flow_data->content_encoding_type); } - http_decoder_table_refer(half->ref_data->table, HTTP_ITEM_BODY, at, length); - http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_BODY); - } + char *local_outdata = NULL; + size_t local_outdata_len = 0; + if (http_content_decompress_write(flow_data->decompress, (char *)zipdata, + zipdatalen, &local_outdata, &local_outdata_len) == -1) + { + http_content_decompress_destroy(flow_data->decompress); + flow_data->decompress = NULL; + flow_data->decompress_body.body = NULL; + flow_data->decompress_body.body_sz = 0; + return; + } - if (1 == half->decompress_switch && half->ref_data->content_encoding != HTTP_CONTENT_ENCODING_NONE) - { - http_decoder_half_data_decompress(half->ref_data); + if (local_outdata != NULL && local_outdata_len > 0) + { + flow_data->decompress_body.body = local_outdata; + flow_data->decompress_body.body_sz = local_outdata_len; + http_content_decompress_ownership_borrow(flow_data->decompress); + } } - if (half->parser.type == HTTP_REQUEST) - { - half->event = HTTP_EVENT_REQ_BODY_DATA; - half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); // http_event_handler() - } - else + static void http_add_header_buf_reference(struct http_half *half) { - half->event = HTTP_EVENT_RES_BODY_DATA; - half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); - } - - return 0; -} - -static void http_decoder_half_init(struct http_decoder_half *half, http_event_cb *http_ev_cb, enum llhttp_type type) -{ - llhttp_settings_init(&half->settings); - llhttp_init(&half->parser, type, &half->settings); - - // half->is_request_flow = (type == HTTP_REQUEST) ? 1 : 0; - half->settings.on_message_begin = on_message_begin; - half->settings.on_message_complete = on_message_complete; - half->settings.on_reset = on_reset; - - half->settings.on_url = on_uri; - half->settings.on_url_complete = on_uri_complete; - - half->settings.on_status = on_status; - half->settings.on_status_complete = on_status_complete; - - half->settings.on_method = on_method; - half->settings.on_method_complete = on_method_complete; - - half->settings.on_version = on_version; - half->settings.on_version_complete = on_version_complete; - - half->settings.on_header_field = on_header_field; - half->settings.on_header_field_complete = on_header_field_complete; - - half->settings.on_header_value = on_header_value; - half->settings.on_header_value_complete = on_header_value_complete; - - half->settings.on_chunk_header = on_chunk_header; - half->settings.on_chunk_complete = on_chunk_header_complete; - - half->settings.on_headers_complete = on_headers_complete; - half->settings.on_body = on_body; - - half->error = HPE_OK; - half->http_ev_cb = http_ev_cb; // http_event_handler() - half->ref_data = NULL; -} - -struct http_decoder_half *http_decoder_half_new(struct http_decoder_exdata *hd_ctx, nmx_pool_t *mempool, - http_event_cb *ev_cb, enum llhttp_type http_type, - int decompress_switch, struct http_decoder_env *httpd_env, long long start_seq) -{ - struct http_decoder_half *half = MEMPOOL_CALLOC(mempool, struct http_decoder_half, 1); - assert(half); - - half->decompress_switch = decompress_switch; - half->http_ev_ctx = MEMPOOL_CALLOC(mempool, struct http_event_context, 1); - http_decoder_half_init(half, ev_cb, http_type); - half->http_ev_ctx->ref_httpd_ctx = hd_ctx; - half->httpd_env = httpd_env; - half->transaction_seq = start_seq; - return half; -} - -void http_decoder_half_free(nmx_pool_t *mempool, struct http_decoder_half *half) -{ - if (NULL == half) - { - return; + if (half->cached_header_buffer != NULL) + { + // add reference but not move ownership, because the buffer will be referenced of multiple transaction in pipeline mode + half->cached_header_buffer->reference++; + half->flow_data->cached_header_buffer = half->cached_header_buffer; + } } - if (half->http_ev_ctx != NULL) + static void http_update_body_offset(struct http_half_data *flow_data) { - MEMPOOL_FREE(mempool, half->http_ev_ctx); - half->http_ev_ctx = NULL; + if (flow_data->decompress_body.body != NULL || flow_data->decompress_body.offset > 0) + { + flow_data->decompress_body.offset += flow_data->decompress_body.body_sz; + } + else + { + flow_data->raw_body.offset += flow_data->raw_body.body_sz; + } } - MEMPOOL_FREE(mempool, half); -} - -void http_decoder_half_reinit(struct http_decoder_half *half, - struct http_decoder_result_queue *queue, - nmx_pool_t *mempool, struct session *sess) -{ - assert(half != NULL); - if (half->ref_data != NULL) + static void http_set_body_finish(struct http_half_data *flow_data) { - http_decoder_table_reinit(half->ref_data->table); + if (flow_data->decompress_body.offset > 0) + { + flow_data->decompress_body.body = NULL; + flow_data->decompress_body.body_sz = 0; + flow_data->decompress_body.is_finished = 1; + } + else + { + flow_data->raw_body.body = NULL; + flow_data->raw_body.body_sz = 0; + flow_data->raw_body.is_finished = 1; + } } - half->http_ev_ctx->ref_mempool = mempool; - half->http_ev_ctx->ref_session = sess; - half->http_ev_ctx->ref_queue = queue; -} -static void publish_message_for_parsed_header(struct http_decoder_half *half) -{ - if (0 == http_decoder_table_has_parsed_header(half->ref_data->table)) - { - return; - } - if (half->parser.type == HTTP_REQUEST) + void http_event_handler(struct http_half *half, enum http_event event, struct http *http_env) { - half->event = HTTP_EVENT_REQ_HDR; - } - else - { - half->event = HTTP_EVENT_RES_HDR; - } - half->http_ev_cb(half->event, &half->ref_data, half->http_ev_ctx, half->httpd_env); // http_event_handler(); - return; -} + struct http_message *msg = NULL; + int thread_id = module_manager_get_thread_id(http_env->mod_mgr_ref); + uint8_t flow_flag = 0; + struct mq_runtime *mq_rt = module_manager_get_mq_runtime(http_env->mod_mgr_ref); -int http_decoder_half_parse(int proxy_enable, struct http_decoder_half *half, const char *data, size_t data_len) -{ - assert(half && data); - - half->data = (const char *)data; - half->data_len = data_len; - half->error = llhttp_execute(&half->parser, data, data_len); - - int ret = 0; - enum llhttp_type type = HTTP_BOTH; + switch (event) + { + case HTTP_EVENT_REQ_INIT: + http_half_stage_reset(half); + if (NULL == half->flow_data) /* call llhttp_reset when headers is not completed */ + { + half->flow_data = http_half_data_new(FLOW_TYPE_C2S); + } + break; - switch (half->error) - { - case HPE_OK: - break; - case HPE_PAUSED: - llhttp_resume(&half->parser); - break; - case HPE_PAUSED_UPGRADE: - if (proxy_enable) + case HTTP_EVENT_REQ_HDR_END: { - llhttp_resume_after_upgrade(&half->parser); + http_half_parse_headers_finally(half); + int tot_c2s_headers = http_get_header_field_count(half->flow_data); + http_stat_update(&http_env->stat, thread_id, HTTP_TRANSACTION_NEW, 1); + http_stat_update(&http_env->stat, thread_id, HTTP_C2S_HEADERS, tot_c2s_headers); + http_stat_update(&http_env->stat, thread_id, HTTP_URL_BYTES, half->flow_data->joint_url.iov_len); + http_add_header_buf_reference(half); + msg = http_produce_message(half->sess_ref, event, HTTP_TOPIC_REQ_HEADER, half->flow_data); + mq_runtime_publish_message(mq_rt, http_env->http_topic_mgr->topic_compose[HTTP_TOPIC_REQ_HEADER].topic_id, msg); } - ret = 0; break; - default: - type = (enum llhttp_type)half->parser.type; - llhttp_init(&half->parser, type, &half->settings); - ret = -1; - break; - } - - if (ret < 0) - { - // fprintf(stdout, - // "llhttp_execute parse error: %s err_reason:%s\n", - // llhttp_errno_name(half->error), half->parser.reason); - return half->error; - } - - if (half->ref_data != NULL) - { - if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_URI) == STRING_STATE_REFER) + case HTTP_EVENT_REQ_BODY_BEGIN: + break; + case HTTP_EVENT_REQ_BODY_DATA: { - http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_URI); + http_update_body_offset(half->flow_data); + msg = http_produce_message(half->sess_ref, event, HTTP_TOPIC_REQ_BODY, half->flow_data); + mq_runtime_publish_message(mq_rt, http_env->http_topic_mgr->topic_compose[HTTP_TOPIC_REQ_BODY].topic_id, msg); } - - if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_STATUS) == STRING_STATE_REFER) + break; + case HTTP_EVENT_REQ_BODY_END: { - http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_STATUS); + http_set_body_finish(half->flow_data); + msg = http_produce_message(half->sess_ref, event, HTTP_TOPIC_REQ_BODY, half->flow_data); + mq_runtime_publish_message(mq_rt, http_env->http_topic_mgr->topic_compose[HTTP_TOPIC_REQ_BODY].topic_id, msg); } - - if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_METHOD) == STRING_STATE_REFER) + break; + case HTTP_EVENT_REQ_END: { - http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_METHOD); + if ((0 == session_is_symmetric(half->sess_ref, &flow_flag) && (SESSION_SEEN_C2S_FLOW == flow_flag))) + { + http_stat_update(&http_env->stat, thread_id, HTTP_TRANSACTION_FREE, 1); + http_stat_update(&http_env->stat, thread_id, HTTP_C2S_ASYMMETRY_TRANSACTION, 1); + } + msg = http_produce_message(half->sess_ref, event, HTTP_TOPIC_REQ_BODY, half->flow_data); + mq_runtime_publish_message(mq_rt, http_env->http_topic_mgr->topic_compose[HTTP_TOPIC_REQ_BODY].topic_id, msg); + half->flow_data = NULL; // ownership move to message, free it in message free callback } + break; - if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_VERSION) == STRING_STATE_REFER) + case HTTP_EVENT_RES_INIT: { - http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_VERSION); + http_half_stage_reset(half); + if (NULL == half->flow_data) /* call llhttp_reset when headers is not completed */ + { + half->flow_data = http_half_data_new(FLOW_TYPE_S2C); + } } + break; - if (http_decoder_table_header_complete(half->ref_data->table)) + case HTTP_EVENT_RES_HDR_END: { - http_decoder_table_reset_header_complete(half->ref_data->table); + http_half_parse_headers_finally(half); + int tot_s2c_headers = http_get_header_field_count(half->flow_data); + http_stat_update(&http_env->stat, thread_id, HTTP_S2C_HEADERS, tot_s2c_headers); + if ((0 == session_is_symmetric(half->sess_ref, &flow_flag) && (SESSION_SEEN_S2C_FLOW == flow_flag))) + { + http_stat_update(&http_env->stat, thread_id, HTTP_TRANSACTION_NEW, 1); + } + http_add_header_buf_reference(half); + msg = http_produce_message(half->sess_ref, event, HTTP_TOPIC_RES_HEADER, half->flow_data); + mq_runtime_publish_message(mq_rt, http_env->http_topic_mgr->topic_compose[HTTP_TOPIC_RES_HEADER].topic_id, msg); } - else + break; + case HTTP_EVENT_RES_BODY_BEGIN: + break; + case HTTP_EVENT_RES_BODY_DATA: { - // if headers are not completed with EOF \r\n\r\n, push the parsed headers so far - publish_message_for_parsed_header(half); + http_update_body_offset(half->flow_data); + msg = http_produce_message(half->sess_ref, event, HTTP_TOPIC_RES_BODY, half->flow_data); + mq_runtime_publish_message(mq_rt, http_env->http_topic_mgr->topic_compose[HTTP_TOPIC_RES_BODY].topic_id, msg); } - - enum string_state hdr_key_state = - http_decoder_table_state(half->ref_data->table, HTTP_ITEM_HDRKEY); - enum string_state hdr_val_state = - http_decoder_table_state(half->ref_data->table, HTTP_ITEM_HDRVAL); - - /* Truncated in http header key - For example http header k-v => User-Agent: Chrome - case1: - packet1: User- hdr_key_state == STRING_STATE_REFER - packet2: Agent: Chrome - - case2: - packet1: User-Agent: hdr_key_state == STRING_STATE_COMMIT - hdr_val_state == STRING_STATE_INIT - packet2: Chrome - */ - if (hdr_key_state == STRING_STATE_REFER || - (hdr_key_state == STRING_STATE_COMMIT && hdr_val_state == STRING_STATE_INIT)) + break; + case HTTP_EVENT_RES_BODY_END: { - http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_HDRKEY); + http_set_body_finish(half->flow_data); + msg = http_produce_message(half->sess_ref, event, HTTP_TOPIC_RES_BODY, half->flow_data); + mq_runtime_publish_message(mq_rt, http_env->http_topic_mgr->topic_compose[HTTP_TOPIC_RES_BODY].topic_id, msg); } - - /* Truncated in http header value - For example http header k-v => User-Agent: Chrome - packet1: User-Agent: Ch hdr_key_state == STRING_STATE_COMMIT - hdr_val_state == STRING_STATE_REFER - - packet2: rome - */ - if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_HDRVAL) == STRING_STATE_REFER) + break; + case HTTP_EVENT_RES_END: { - /* Header key should have been committed - If it's not cached, cache it for next packet to use - */ - http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_HDRKEY); - http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_HDRVAL); + if ((0 == session_is_symmetric(half->sess_ref, &flow_flag) && (SESSION_SEEN_S2C_FLOW == flow_flag))) + { + http_stat_update(&http_env->stat, thread_id, HTTP_S2C_ASYMMETRY_TRANSACTION, 1); + } + http_stat_update(&http_env->stat, thread_id, HTTP_TRANSACTION_FREE, 1); + msg = http_produce_message(half->sess_ref, event, HTTP_TOPIC_RES_BODY, half->flow_data); + mq_runtime_publish_message(mq_rt, http_env->http_topic_mgr->topic_compose[HTTP_TOPIC_RES_BODY].topic_id, msg); + half->flow_data = NULL; // ownership move to message, free it in message free callback } - - if (http_decoder_table_state(half->ref_data->table, HTTP_ITEM_BODY) == STRING_STATE_REFER) - { - http_decoder_table_cache(half->ref_data->table, HTTP_ITEM_BODY); + break; + default: + assert(0); + break; } } - return 0; -} - -long long http_decoder_half_trans_count(struct http_decoder_half *half) -{ - if (NULL == half) - { - return 0; - } - - long long trans_cnt = half->trans_counter; - half->trans_counter = 0; - - return trans_cnt; -} - -struct http_decoder_half_data * -http_decoder_half_data_new(nmx_pool_t *mempool) -{ - struct http_decoder_half_data *data = - MEMPOOL_CALLOC(mempool, struct http_decoder_half_data, 1); - assert(data); - - data->table = http_decoder_table_new(mempool); - assert(data->table); - - data->major_version = -1; - data->minor_version = -1; - data->status_code = -1; - - data->content_encoding = HTTP_CONTENT_ENCODING_NONE; - // data->ref_decompress_body = NULL; - // data->decompress_body_len = 0; - data->decompress_buffer_list = NULL; - return data; -} - -static void http_decoder_half_decompress_buf_free(struct http_decoder_half_data *ref_data) -{ - if (ref_data == NULL) + static void http_half_stage_buf_free(struct http_half *half) { - return; - } - struct http_decompress_buffer *el, *tmp; - DL_FOREACH_SAFE(ref_data->decompress_buffer_list, el, tmp) - { - DL_DELETE(ref_data->decompress_buffer_list, el); - if (el->iov.iov_base != NULL) + if (half->cached_header_buffer != NULL) { - FREE(el->iov.iov_base); + http_buffer_free(half->cached_header_buffer); } - FREE(el); - } - ref_data->decompress_buffer_list = NULL; -} - -void http_decoder_half_data_free(nmx_pool_t *mempool, struct http_decoder_half_data *data) -{ - if (NULL == data) - { - return; - } - - if (data->table != NULL) - { - http_decoder_table_free(data->table); - data->table = NULL; + half->cached_header_buffer = NULL; } - if (data->decompress != NULL) + static void http_half_stage_buf_reuse(struct http_half *half) { - http_content_decompress_destroy(data->decompress); - data->decompress = NULL; + http_half_stage_buf_free(half); + half->cached_header_buffer = http_buffer_new(); } - if (data->joint_url.iov_base) - { - MEMPOOL_FREE(mempool, data->joint_url.iov_base); - data->joint_url.iov_base = NULL; - data->joint_url_complete = 0; - } - http_decoder_half_decompress_buf_free(data); - MEMPOOL_FREE(mempool, data); -} - -int http_decoder_half_data_get_request_line(struct http_decoder_half_data *data, - struct http_request_line *line) -{ - http_decoder_table_get_method(data->table, &line->method, &line->method_len); - http_decoder_table_get_uri(data->table, &line->uri, &line->uri_len); - http_decoder_table_get_version(data->table, &line->version, &line->version_len); - - line->major_version = data->major_version; - line->minor_version = data->minor_version; - - return 0; -} - -int http_decoder_half_data_get_response_line(struct http_decoder_half_data *data, - struct http_response_line *line) -{ - http_decoder_table_get_version(data->table, &line->version, &line->version_len); - http_decoder_table_get_status(data->table, &line->status, &line->status_len); - - line->major_version = data->major_version; - line->minor_version = data->minor_version; - line->status_code = data->status_code; - - return 0; -} - -int http_decoder_half_data_get_header(const struct http_decoder_half_data *data, - const char *name, size_t name_len, - struct http_header_field *hdr_result) -{ - return http_decoder_table_get_header(data->table, name, name_len, hdr_result); -} - -int http_decoder_half_data_iter_header(struct http_decoder_half_data *data, - struct http_header_field *header) -{ - return http_decoder_table_iter_header((struct http_decoder_table *)data->table, header); -} - -int http_decoder_half_data_reset_header_iter(struct http_decoder_half_data *req_data) -{ - if (NULL == req_data) - { - return -1; - } - return http_decoder_table_reset_header_iter(req_data->table); -} - -int http_decoder_half_data_has_parsed_header(struct http_decoder_half_data *data) -{ - if (NULL == data) + static int http_half_stage_buf_append(struct http_half *half, const char *newdata, size_t newdata_len) { + if (half->cached_header_buffer == NULL) + { + half->cached_header_buffer = http_buffer_new(); + } + if (half->cached_header_buffer->buffer_size > HTTP_HEADERS_CACHE_MAX_SIZE) + { + return -1; + } + http_buffer_add(half->cached_header_buffer, newdata, newdata_len); return 0; } - return http_decoder_table_has_parsed_header(data->table); -} - -int http_decoder_half_data_get_raw_body(const struct http_decoder_half_data *data, const char **body, size_t *body_len) -{ - if (NULL == data || NULL == body) - { - return -1; - } - return http_decoder_table_get_body(data->table, (char **)body, body_len); -} -#if 0 -int http_decoder_half_data_get_decompress_body(const struct http_decoder_half_data *data, hstring *body) -{ - if (HTTP_CONTENT_ENCODING_NONE == data->content_encoding) - { - return http_decoder_table_get_body(data->table, body); - } - - body->iov_base = data->ref_decompress_body; - body->iov_len = data->decompress_body_len; - return 0; -} -#endif - -void http_decoder_half_data_dump(struct http_decoder_half *half) -{ - if (NULL == half || NULL == half->ref_data) - { - return; - } - - http_decoder_table_dump(half->ref_data->table); -} - -static void using_session_addr_as_host(struct session *ref_session, struct http_header_field *host_result, nmx_pool_t *mempool) -{ -#if 1 // in native steallar, can't get the tuple4 from the session yet!!! - struct httpd_session_addr ssaddr = {}; - httpd_session_get_addr(ref_session, &ssaddr); - if (ssaddr.ipver != 4 && ssaddr.ipver != 6) - { - host_result->value = MEMPOOL_CALLOC(mempool, char, 1); - sprintf((char *)host_result->value, "%s", ""); - host_result->value_len = strlen((char *)host_result->value); - return; - } - - char ip_string_buf[INET6_ADDRSTRLEN]; - if (4 == ssaddr.ipver) - { - host_result->value = MEMPOOL_CALLOC(mempool, char, (INET_ADDRSTRLEN + 7) /* "ip:port" max length */); - inet_ntop(AF_INET, &ssaddr.daddr4, ip_string_buf, INET_ADDRSTRLEN); - sprintf((char *)host_result->value, "%s:%u", ip_string_buf, ntohs(ssaddr.dport)); - host_result->value_len = strlen((char *)host_result->value); - } - else if (6 == ssaddr.ipver) - { - host_result->value = MEMPOOL_CALLOC(mempool, char, (INET6_ADDRSTRLEN + 7) /* "ip:port" max length */); - inet_ntop(AF_INET6, &ssaddr.daddr6, ip_string_buf, INET6_ADDRSTRLEN); - sprintf((char *)host_result->value, "%s:%u", ip_string_buf, ntohs(ssaddr.dport)); - host_result->value_len = strlen((char *)host_result->value); - } - else - { - assert(0); - } -#else - host_result->val.iov_base = MEMPOOL_CALLOC(mempool, char, 32); - sprintf((char *)host_result->val.iov_base, "%s", "todo:get_tuple4"); - host_result->val.iov_len = strlen((char *)host_result->val.iov_base); -#endif -} -void http_decoder_join_url(struct http_decoder_half_data *hfdata, nmx_pool_t *mempool, const struct http_header_field *host_hdr) -{ - int append_slash_len = 0; - if ('/' != ((char *)hfdata->joint_url.iov_base)[0]) + static int http_half_cache_uncompleted_header(struct http_half *half, int mem_merged, const char *data, size_t data_len) { - append_slash_len = 1; - } - int url_cache_str_len = host_hdr->value_len + hfdata->joint_url.iov_len + append_slash_len; - char *url_cache_str = MEMPOOL_CALLOC(mempool, char, url_cache_str_len); + int ret = 0; + const char *cached_begin_ptr = NULL; + long long cached_len = 0; + if (half->half_stage.last_headers_complete_ptr == NULL) + { + if (mem_merged) + { + ret = 0; // no pipeline and memory have been merged, do nothing + } + else + { + ret = http_half_stage_buf_append(half, data, (size_t)data_len); + } + } + else + { + /* + has pipeline, cache begin ptr is the last completed header_field_value + "\r\n\r\n", + for example: + GET xxx \r\nheader_name:header:value\r\nheader_name:header:value\r\n\r\nGET yyy ... + ^ + | + cache begin ptr + */ + cached_begin_ptr = half->half_stage.last_headers_complete_ptr; + cached_len = (long long)data_len - (long long)(cached_begin_ptr - data); + assert(cached_len > 0); - char *ptr = url_cache_str; - memcpy(ptr, host_hdr->value, host_hdr->value_len); - ptr += host_hdr->value_len; - if (append_slash_len) - { - *ptr = '/'; - ptr++; + if (mem_merged) + { + /* some message have beed pushed, and pointer to half->half_stage->half_buffer, + the half->half_stage->half_buffer is maintain by messages , + so, we can reset the half->half_stage->half_buffer, and append the new data to it. + */ + http_half_stage_buf_reuse(half); + ret = http_half_stage_buf_append(half, cached_begin_ptr, (size_t)cached_len); + } + else + { + ret = http_half_stage_buf_append(half, cached_begin_ptr, (size_t)cached_len); + } + } + return ret; } - memcpy(ptr, hfdata->joint_url.iov_base, hfdata->joint_url.iov_len); - MEMPOOL_FREE(mempool, hfdata->joint_url.iov_base); // free the cached uri buffer - hfdata->joint_url.iov_base = url_cache_str; - hfdata->joint_url.iov_len = url_cache_str_len; - hfdata->joint_url_complete = 1; -} - -void http_decoder_get_url(struct http_decoder_half_data *hfdata, nmx_pool_t *mempool) -{ - struct http_request_line reqline = {}; - http_decoder_half_data_get_request_line(hfdata, &reqline); - if (unlikely(http_strncasecmp_safe("CONNECT", (char *)reqline.method, 7, reqline.method_len) == 0)) + int http_flow_parse(struct http_half *half, const char *data, size_t data_len) { - hfdata->joint_url.iov_base = MEMPOOL_CALLOC(mempool, char, reqline.uri_len + 1); - memcpy(hfdata->joint_url.iov_base, reqline.uri, reqline.uri_len); - hfdata->joint_url.iov_len = reqline.uri_len; - hfdata->joint_url_complete = 1; + assert(half && data); + llhttp_errno_t ret = HPE_OK /* HPE_OK = 0 */; + ret = llhttp_execute(&half->parser.llhttp_parser, data, data_len); + switch (ret) + { + case HPE_OK: + break; + case HPE_PAUSED: + llhttp_resume(&half->parser.llhttp_parser); + break; + case HPE_PAUSED_UPGRADE: + llhttp_resume_after_upgrade(&half->parser.llhttp_parser); + break; + default: + break; + } + return ret; } -} -int http_decoder_join_url_finally(struct http_event_context *ev_ctx, struct http_decoder_half_data *hfdata, nmx_pool_t *mempool) -{ - if (hfdata->joint_url_complete) + struct http_half *http_flow_get_nx(struct http_exdata *exdata, enum flow_type flow_dir, struct session *sess, struct http *http_env) { - return 0; - } - struct http_header_field addr_as_host = {}; - using_session_addr_as_host(ev_ctx->ref_session, &addr_as_host, mempool); - http_decoder_join_url(hfdata, mempool, &addr_as_host); - MEMPOOL_FREE(mempool, addr_as_host.value); // free session addr to host buffer - return 1; -} + struct http_half **flow = NULL; -void http_decoder_get_host_feed_url(struct http_decoder_half *half) -{ - if (half->ref_data->joint_url_complete) - { - return; - } - struct http_header_field host_result = {}; - int host_header_cnt = http_decoder_half_data_get_header(half->ref_data, (char *)"Host", 4, &host_result); - if (host_header_cnt < 0) - { - return; + if (FLOW_TYPE_C2S == flow_dir) + { + flow = &exdata->decoder->flow_c2s; + } + else if (FLOW_TYPE_S2C == flow_dir) + { + flow = &exdata->decoder->flow_s2c; + } + else + { + assert(0); + return NULL; + } + if (NULL == *flow) + { + *flow = http_half_new(flow_dir); + (*flow)->http_env_ref = http_env; + (*flow)->sess_ref = sess; + } + return *flow; } - http_decoder_join_url(half->ref_data, half->http_ev_ctx->ref_mempool, &host_result); -} -int http_half_data_get_url(struct http_decoder_half_data *res_data, const char **url_val, size_t *url_len) -{ - if (0 == res_data->joint_url_complete) - { - return -1; - } - *url_val = res_data->joint_url.iov_base; - *url_len = res_data->joint_url.iov_len; - return 0; -} -#if 0 -int http_half_data_get_decode_url(struct http_decoder_half_data *res_data, hstring *url) -{ - if (0 == res_data->joint_url_complete) + int http_half_flow_process(struct http_half *half, const char *newdata, size_t newdata_len) { - return -1; - } - url->iov_base = res_data->decoded_url.iov_base; - url->iov_len = res_data->decoded_url.iov_len; - return 0; -} -#endif - -int http_half_data_get_transaction_seq(struct http_decoder_half_data *hf_data) -{ - return hf_data->transaction_index; -} - -void http_half_data_update_commit_index(struct http_decoder_half_data *half_data) -{ - http_decoder_table_update_commit_index(half_data->table); -} + int ret = 0; + int mem_merged = 0; + const char *parse_data = newdata; + size_t parse_data_len = newdata_len; -int http_half_data_get_total_parsed_header_count(struct http_decoder_half_data *half_data) -{ - return http_decoder_table_get_total_parsed_header(half_data->table); -} - -void http_half_pre_context_free(struct session *sess, struct http_decoder_exdata *exdata) -{ - struct http_message *msg = NULL; - struct http_decoder_half_data *res_data = NULL; - struct http_decoder_result_queue *queue = NULL; - - if (exdata) - { - queue = exdata->queue; - for (size_t i = 0; i < queue->queue_size; i++) + /* merge cached uncompleted header and current tcp payload first */ + if (half->cached_header_buffer != NULL && half->cached_header_buffer->buffer != NULL) { - struct http_decoder_half_data *req_data = queue->array[i].req_data; - res_data = queue->array[i].res_data; - if ((req_data != NULL) && (NULL == res_data) && (req_data->state < HTTP_EVENT_REQ_END)) + http_half_stage_buf_append(half, newdata, newdata_len); + parse_data = half->cached_header_buffer->buffer; + parse_data_len = half->cached_header_buffer->buffer_size; + mem_merged = 1; + if (parse_data_len > HTTP_HEADERS_CACHE_MAX_SIZE) { - msg = http_message_new(HTTP_TRANSACTION_END, queue, i, HTTP_REQUEST); - session_mq_publish_message(sess, exdata->pub_topic_id, msg); + STELLAR_LOG_FATAL(half->http_env_ref->logger_ref, HTTP_MODULE_NAME, + "sess: %s, headers buf len more than %d", session_get_readable_addr(half->sess_ref), HTTP_HEADERS_CACHE_MAX_SIZE); + return -1; } } + ret = http_flow_parse(half, parse_data, parse_data_len); + if (ret != HPE_OK) + { + STELLAR_LOG_FATAL(half->http_env_ref->logger_ref, HTTP_MODULE_NAME, + "llhttp parser error: %d, %s. sess: %s", ret, llhttp_errno_name(ret), session_get_readable_addr(half->sess_ref)); + return -1; + } - for (size_t i = 0; i < queue->queue_size; i++) + if (half->half_stage.llhttp_last_stage >= LLHTTP_STAGE_MESSAGE_BEGIN && + half->half_stage.llhttp_last_stage < LLHTTP_STAGE_HEADERS_COMPLETE) /* some headers not completed */ { - res_data = queue->array[i].res_data; - if ((res_data != NULL) && (res_data->state < HTTP_EVENT_RES_END)) + STELLAR_LOG_DEBUG(half->http_env_ref->logger_ref, HTTP_MODULE_NAME, + "sess: %s, header not completed, need caching, %.*s", + session_get_readable_addr(half->sess_ref), parse_data_len > 16 ? 16 : parse_data_len, parse_data); + if (http_half_cache_uncompleted_header(half, mem_merged, parse_data, parse_data_len) < 0) { - msg = http_message_new(HTTP_TRANSACTION_END, queue, i, HTTP_RESPONSE); - session_mq_publish_message(sess, exdata->pub_topic_id, msg); + STELLAR_LOG_FATAL(half->http_env_ref->logger_ref, HTTP_MODULE_NAME, + "sess: %s, headers buf len more than %d", session_get_readable_addr(half->sess_ref), HTTP_HEADERS_CACHE_MAX_SIZE); + return -1; } + + /* uncompleted_header, reset llhttp state and all headers */ + http_half_stage_reset(half); + llhttp_reset(&half->parser.llhttp_parser); + return 0; + } + else + { + /* ownership is maintained by header message, free it here (reference-- actually) */ + http_half_stage_buf_free(half); } + return 0; } -} -void http_half_update_state(struct http_decoder_half_data *hf_data, enum http_event state) -{ - hf_data->state = state; +#ifdef __cplusplus } - -void http_half_get_max_transaction_seq(struct http_decoder_exdata *exdata, long long *max_req_seq, long long *max_res_seq) -{ - assert(exdata && max_req_seq && max_res_seq); - *max_req_seq = exdata->decoder->c2s_half->transaction_seq; - *max_res_seq = exdata->decoder->s2c_half->transaction_seq; -} - -enum http_content_encoding http_half_data_get_content_encoding(struct http_decoder_half_data *hf_data) -{ - if (NULL == hf_data) - { - return HTTP_CONTENT_ENCODING_NONE; - } - return hf_data->content_encoding; -}
\ No newline at end of file +#endif |
