diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | src/http_decoder.cpp | 408 | ||||
| -rw-r--r-- | src/http_decoder_half.cpp | 139 | ||||
| -rw-r--r-- | src/http_decoder_half.h | 23 | ||||
| -rw-r--r-- | src/http_decoder_inc.h | 36 | ||||
| -rw-r--r-- | src/http_decoder_stat.cpp | 43 | ||||
| -rw-r--r-- | src/http_decoder_stat.h | 16 | ||||
| -rw-r--r-- | src/http_decoder_string.cpp | 118 | ||||
| -rw-r--r-- | src/http_decoder_string.h | 8 | ||||
| -rw-r--r-- | src/http_decoder_table.cpp | 56 | ||||
| -rw-r--r-- | src/http_decoder_table.h | 12 | ||||
| -rw-r--r-- | src/http_decoder_tunnel.cpp | 99 | ||||
| -rw-r--r-- | src/http_decoder_tunnel.h | 21 | ||||
| -rw-r--r-- | src/http_decoder_utils.cpp | 20 | ||||
| -rw-r--r-- | src/http_decoder_utils.h | 2 | ||||
| -rw-r--r-- | src/version.map | 1 |
16 files changed, 685 insertions, 319 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 1d3c949..469591b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -10,7 +10,7 @@ aux_source_directory(${PROJECT_SOURCE_DIR}/deps/toml DEPS_SRC) set(HTTP_SRC ${DEPS_SRC} http_decoder.cpp http_decoder_utils.cpp http_decoder_half.cpp http_decoder_table.cpp http_decoder_string.cpp http_content_decompress.cpp - http_decoder_result_queue.cpp http_decoder_stat.cpp) + http_decoder_result_queue.cpp http_decoder_stat.cpp http_decoder_tunnel.cpp) add_library(http_decoder SHARED ${HTTP_SRC}) set_target_properties(http_decoder PROPERTIES LINK_FLAGS "-Wl,--version-script=${PROJECT_SOURCE_DIR}/src/version.map") diff --git a/src/http_decoder.cpp b/src/http_decoder.cpp index 8aa39e9..3af4dad 100644 --- a/src/http_decoder.cpp +++ b/src/http_decoder.cpp @@ -1,3 +1,4 @@ +#include "http_decoder.h" #include <assert.h> #include <stdio.h> #include <string.h> @@ -36,6 +37,7 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d struct http_decoder_half_data *half_data = NULL; int ret = 0; u_int8_t flow_flag = 0; + struct http_decoder_exdata *exdata = ev_ctx->ref_httpd_ctx; int thread_id = stellar_get_current_thread_id(httpd_env->st); @@ -74,44 +76,53 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d } *data = half_data; queue_idx = http_decoder_result_queue_req_index(queue); //get the index after inc -#if 1 /* llhttp always call on_message_begin() even if llhttp_execute() error!!! */ + /* llhttp always call on_message_begin() even if llhttp_execute() error!!! */ msg = http_message_new(HTTP_TRANSACTION_NEW, queue, queue_idx, HTTP_REQUEST); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_NEW, 1); -#endif break; case HTTP_EVENT_REQ_LINE: -#if 0 - msg = http_message_new(HTTP_TRANSACTION_NEW, queue, queue_idx, HTTP_REQUEST); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); - http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_NEW, 1); -#endif msg = http_message_new(HTTP_MESSAGE_REQ_LINE, queue, queue_idx, HTTP_REQUEST); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); + if(httpd_tunnel_identify(PACKET_DIRECTION_C2S, half_data)){ + exdata->tunnel_state = HTTP_TUN_C2S_HDR_START; + } + http_decoder_get_url(half_data, mempool); break; case HTTP_EVENT_REQ_HDR: msg = http_message_new(HTTP_MESSAGE_REQ_HEADER, queue, queue_idx, HTTP_REQUEST); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); break; case HTTP_EVENT_REQ_HDR_END: { - http_decoder_join_url_finally(ev_ctx, http_decoder_result_queue_peek_req(queue), mempool); + http_decoder_join_url_finally(ev_ctx, half_data, mempool); /* maybe some parsed headers in buffer, but has not pushed to plugins yet */ - half_data = http_decoder_result_queue_peek_req(queue); + if(http_decoder_half_data_has_parsed_header(half_data)){ msg = http_message_new(HTTP_MESSAGE_REQ_HEADER, queue, queue_idx, HTTP_REQUEST); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); } http_half_data_update_commit_index(half_data); msg = http_message_new(HTTP_MESSAGE_REQ_HEADER_END, queue, queue_idx, HTTP_REQUEST); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); int tot_c2s_headers = http_half_data_get_total_parsed_header_count(half_data); http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_HEADERS_C2S, tot_c2s_headers); - struct hstring tmp_url = {}; + hstring tmp_url = {}; http_half_data_get_url(half_data, &tmp_url); - http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_URL_BYTES, tmp_url.str_len); + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_URL_BYTES, tmp_url.iov_len); + + if(httpd_is_tunnel_session(exdata)){ + session_is_symmetric(ev_ctx->ref_session, &flow_flag); + if(SESSION_SEEN_C2S_FLOW == flow_flag){ + exdata->tunnel_state = HTTP_TUN_INNER_STARTING; + http_half_pre_context_free(ev_ctx->ref_session, exdata); + exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].sub_topic_id; + }else{ + exdata->tunnel_state = HTTP_TUN_C2S_HDR_END; + } + } } break; case HTTP_EVENT_REQ_BODY_BEGIN: @@ -119,18 +130,18 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d break; case HTTP_EVENT_REQ_BODY_DATA: msg = http_message_new(HTTP_MESSAGE_REQ_BODY, queue, queue_idx, HTTP_REQUEST); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); break; case HTTP_EVENT_REQ_BODY_END: msg = http_message_new(HTTP_MESSAGE_REQ_BODY_END, queue, queue_idx, HTTP_REQUEST); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); break; case HTTP_EVENT_REQ_END: { session_is_symmetric(ev_ctx->ref_session, &flow_flag); if(SESSION_SEEN_C2S_FLOW == flow_flag){ msg = http_message_new(HTTP_TRANSACTION_FREE, queue, queue_idx, HTTP_REQUEST); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_FREE, 1); http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_ASYMMETRY_TRANSACTION_C2S, 1); } @@ -173,17 +184,23 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d if(0 == session_is_symmetric(ev_ctx->ref_session, &flow_flag)){ if(SESSION_SEEN_S2C_FLOW == flow_flag){ msg = http_message_new(HTTP_TRANSACTION_NEW, queue, queue_idx, HTTP_RESPONSE); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); } } break; case HTTP_EVENT_RES_LINE: msg = http_message_new(HTTP_MESSAGE_RES_LINE, queue, queue_idx, HTTP_RESPONSE); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); + if(httpd_tunnel_identify(PACKET_DIRECTION_S2C, half_data)){ + exdata->tunnel_state = HTTP_TUN_S2C_START; + }else{ + //connect response fail, reset tunnel_state + exdata->tunnel_state = HTTP_TUN_NON; + } break; case HTTP_EVENT_RES_HDR: msg = http_message_new(HTTP_MESSAGE_RES_HEADER, queue, queue_idx, HTTP_RESPONSE); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); break; case HTTP_EVENT_RES_HDR_END: { @@ -191,29 +208,36 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d half_data = http_decoder_result_queue_peek_res(queue); if(http_decoder_half_data_has_parsed_header(half_data)){ msg = http_message_new(HTTP_MESSAGE_RES_HEADER, queue, queue_idx, HTTP_RESPONSE); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); } http_half_data_update_commit_index(half_data); msg = http_message_new(HTTP_MESSAGE_RES_HEADER_END, queue, queue_idx, HTTP_RESPONSE); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); int tot_s2c_headers = http_half_data_get_total_parsed_header_count(half_data); http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_HEADERS_S2C, tot_s2c_headers); + + if(httpd_is_tunnel_session(exdata)){ + exdata->tunnel_state = HTTP_TUN_INNER_STARTING; + http_half_pre_context_free(ev_ctx->ref_session, exdata); + exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].sub_topic_id; + // http_decoder_push_tunnel_data(ev_ctx->ref_session, exdata, HTTP_TUNNEL_OPENING); + } } break; case HTTP_EVENT_RES_BODY_BEGIN: break; case HTTP_EVENT_RES_BODY_DATA: msg = http_message_new(HTTP_MESSAGE_RES_BODY, queue, queue_idx, HTTP_RESPONSE); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); break; case HTTP_EVENT_RES_BODY_END: msg = http_message_new(HTTP_MESSAGE_RES_BODY_END, queue, queue_idx, HTTP_RESPONSE); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); break; case HTTP_EVENT_RES_END: msg = http_message_new(HTTP_TRANSACTION_FREE, queue, queue_idx, HTTP_RESPONSE); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_FREE, 1); session_is_symmetric(ev_ctx->ref_session, &flow_flag); if(SESSION_SEEN_S2C_FLOW == flow_flag){ @@ -237,12 +261,14 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d } } -static struct http_decoder *http_decoder_new(nmx_pool_t *mempool, http_event_cb *ev_cb, int decompress_switch, struct http_decoder_env *httpd_env) +static struct http_decoder *http_decoder_new(struct http_decoder_exdata *hd_ctx, nmx_pool_t *mempool, + http_event_cb *ev_cb, int decompress_switch, struct http_decoder_env *httpd_env, + long long req_start_seq, long long res_start_seq) { struct http_decoder *decoder = MEMPOOL_CALLOC(mempool, struct http_decoder, 1); assert(decoder); - decoder->c2s_half = http_decoder_half_new(mempool, ev_cb, HTTP_REQUEST, decompress_switch, httpd_env); - decoder->s2c_half = http_decoder_half_new(mempool, ev_cb, HTTP_RESPONSE, decompress_switch, httpd_env); + decoder->c2s_half = http_decoder_half_new(hd_ctx, mempool, ev_cb, HTTP_REQUEST, decompress_switch, httpd_env, req_start_seq); + decoder->s2c_half = http_decoder_half_new(hd_ctx, mempool, ev_cb, HTTP_RESPONSE, decompress_switch, httpd_env, res_start_seq); return decoder; } @@ -266,13 +292,15 @@ static void http_decoder_free(nmx_pool_t *mempool, struct http_decoder *decoder) } static struct http_decoder_exdata *http_decoder_exdata_new(size_t mempool_size, size_t queue_size, - int decompress_switch, struct http_decoder_env *httpd_env) + int decompress_switch, struct http_decoder_env *httpd_env, + long long req_start_seq, long long res_start_seq) { - struct http_decoder_exdata *ex_data = CALLOC(struct http_decoder_exdata, 1); - ex_data->mempool = nmx_create_pool(mempool_size); - ex_data->decoder = http_decoder_new(ex_data->mempool, http_event_handler, decompress_switch, httpd_env); - ex_data->queue = http_decoder_result_queue_new(ex_data->mempool, queue_size); - return ex_data; + struct http_decoder_exdata *hd_ctx = CALLOC(struct http_decoder_exdata, 1); + hd_ctx->mempool = nmx_create_pool(mempool_size); + hd_ctx->decoder = http_decoder_new(hd_ctx, hd_ctx->mempool, http_event_handler, decompress_switch, + httpd_env, req_start_seq, res_start_seq); + hd_ctx->queue = http_decoder_result_queue_new(hd_ctx->mempool, queue_size); + return hd_ctx; } static void http_decoder_exdata_free(struct http_decoder_exdata *ex_data) @@ -291,8 +319,9 @@ static void http_decoder_exdata_free(struct http_decoder_exdata *ex_data) http_decoder_result_queue_free(ex_data->mempool, ex_data->queue); ex_data->queue = NULL; } - - nmx_destroy_pool(ex_data->mempool); + if(ex_data->mempool){ + nmx_destroy_pool(ex_data->mempool); + } FREE(ex_data); } @@ -310,7 +339,7 @@ static int http_protocol_identify(const char *data, size_t data_len) { return -1; } - return 0; + return 1; } static void _http_decoder_context_free(struct http_decoder_env *env) @@ -327,10 +356,10 @@ static void _http_decoder_context_free(struct http_decoder_env *env) http_decoder_stat_free(&env->hd_stat); - if (env->httpd_msg_topic_id >= 0) - { - stellar_session_mq_destroy_topic(env->st, env->httpd_msg_topic_id); - env->httpd_msg_topic_id = -1; + for(int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++){ + if(env->topic_exdata_compose[i].msg_free_cb){ + stellar_session_mq_destroy_topic(env->st, env->topic_exdata_compose[i].sub_topic_id); + } } FREE(env); @@ -412,7 +441,7 @@ static int load_http_decoder_config(const char *cfg_path, return ret; } -static int http_msg_get_request_header(const struct http_message *msg, const struct hstring *key, +static int http_msg_get_request_header(const struct http_message *msg, const hstring *key, struct http_header *hdr_result) { const struct http_decoder_half_data *req_data = @@ -420,7 +449,7 @@ static int http_msg_get_request_header(const struct http_message *msg, const str return http_decoder_half_data_get_header(req_data, key, hdr_result); } -static int http_msg_get_response_header(const struct http_message *msg, const struct hstring *key, +static int http_msg_get_response_header(const struct http_message *msg, const hstring *key, struct http_header *hdr_result) { const struct http_decoder_half_data *res_data = @@ -445,7 +474,7 @@ static int http_msg_response_header_next(const struct http_message *msg, } static int http_msg_get_request_raw_body(const struct http_message *msg, - struct hstring *body) + hstring *body) { const struct http_decoder_half_data *req_data = msg->ref_queue->array[msg->queue_index].req_data; @@ -453,7 +482,7 @@ static int http_msg_get_request_raw_body(const struct http_message *msg, } static int http_msg_get_response_raw_body(const struct http_message *msg, - struct hstring *body) + hstring *body) { const struct http_decoder_half_data *res_data = msg->ref_queue->array[msg->queue_index].res_data; @@ -461,7 +490,7 @@ static int http_msg_get_response_raw_body(const struct http_message *msg, } static int http_msg_get_request_decompress_body(const struct http_message *msg, - struct hstring *body) + hstring *body) { const struct http_decoder_half_data *req_data = msg->ref_queue->array[msg->queue_index].req_data; @@ -469,19 +498,32 @@ static int http_msg_get_request_decompress_body(const struct http_message *msg, } static int http_msg_get_response_decompress_body(const struct http_message *msg, - struct hstring *body) + hstring *body) { const struct http_decoder_half_data *res_data = msg->ref_queue->array[msg->queue_index].res_data; return http_decoder_half_data_get_decompress_body(res_data, body); } +static struct http_decoder_exdata *httpd_session_exdata_new(struct session *sess, struct http_decoder_env *httpd_env, + long long req_start_seq, long long res_start_seq) +{ + struct http_decoder_exdata *exdata = http_decoder_exdata_new(httpd_env->hd_cfg.mempool_size, + httpd_env->hd_cfg.result_queue_len, + httpd_env->hd_cfg.decompress_switch, + httpd_env,req_start_seq,res_start_seq); + // exdata->sub_topic_id = sub_topic_id; + int thread_id = stellar_get_current_thread_id(httpd_env->st); + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_SESSION_NEW, 1); + return exdata; +} + #ifdef __cplusplus extern "C" { #endif - void _httpd_ex_data_free_cb(struct session *s, int idx, void *ex_data, void *arg) + void httpd_ex_data_free_cb(struct session *s, int idx, void *ex_data, void *arg) { if (NULL == ex_data) { @@ -491,7 +533,7 @@ extern "C" http_decoder_exdata_free(exdata); } - void *_httpd_session_ctx_new_cb(struct session *sess, void *plugin_env) + void *httpd_session_ctx_new_cb(struct session *sess, void *plugin_env) { // If not http, ignore this session size_t payload_len; @@ -504,31 +546,23 @@ extern "C" if (ret < 0) { stellar_session_plugin_dettach_current_session(sess); - return (void *)"not_http_session"; + return (void *)"__not_http_session__"; } } - struct http_decoder_exdata *ex_data = http_decoder_exdata_new(httpd_env->hd_cfg.mempool_size, - httpd_env->hd_cfg.result_queue_len, - httpd_env->hd_cfg.decompress_switch, - httpd_env); - session_exdata_set(sess, httpd_env->ex_data_idx, ex_data); - int thread_id = stellar_get_current_thread_id(httpd_env->st); - http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_SESSION_NEW, 1); - - return (void *)"fake_http_decoder_ctx"; // http decoder not use ctx, use exdata only! + return (void *)"__fake_http_decoder_ctx__"; } - void _httpd_session_ctx_free_cb(struct session *sess, void *session_ctx, void *plugin_env) + void httpd_session_ctx_free_cb(struct session *sess, void *session_ctx, void *plugin_env) { if(NULL == plugin_env || NULL == session_ctx){ return; } - if(strncmp((const char *)session_ctx, "not_http_session", strlen("not_http_session")) == 0){ + if(strncmp((const char *)session_ctx, "__not_http_session__", strlen("__not_http_session__")) == 0){ return; } struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env; - int thread_id = stellar_get_current_thread_id(httpd_env->st); + int thread_id = session_get_current_thread_id(sess); unsigned char flow_flag = 0; session_is_symmetric(sess, &flow_flag); @@ -541,53 +575,189 @@ extern "C" } } - void http_decoder_tcp_stream_msg_cb(struct session *sess, int topic_id, const void *msg, void *no_use_ctx, void *plugin_env) + static void http_decoder_execute(struct session *sess, struct http_decoder_env *httpd_env, http_decoder_exdata *exdata) { - struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env; - struct http_decoder_exdata *ex_data = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->ex_data_idx); size_t payload_len; - - if(SESSION_STATE_CLOSING == session_get_current_state(sess)){ - http_half_pre_context_free(sess, httpd_env, ex_data); - return; - } - const char *payload = session_get0_current_payload(sess, &payload_len); - if (unlikely(0 == payload_len || NULL == ex_data)) + if (unlikely(0 == payload_len || NULL == payload)) { return; } - int thread_id = stellar_get_current_thread_id(httpd_env->st); - struct http_decoder_half *cur_half = NULL; + if(httpd_in_tunnel_transmitting(exdata)){ + http_decoder_push_tunnel_data(sess, exdata, httpd_tunnel_state_to_msg(exdata)); + httpd_tunnel_state_update(exdata); + return; + } - if (PACKET_DIRECTION_C2S == packet_get_direction(session_get0_current_packet(sess))) + int thread_id = session_get_current_thread_id(sess); + struct http_decoder_half *cur_half = NULL; + int sess_dir = packet_get_direction(session_get0_current_packet(sess)); + if (PACKET_DIRECTION_C2S == sess_dir) { - cur_half = ex_data->decoder->c2s_half; + cur_half = exdata->decoder->c2s_half; http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_BYTES_C2S, payload_len); http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TCP_SEG_C2S, 1); } else { - cur_half = ex_data->decoder->s2c_half; + cur_half = exdata->decoder->s2c_half; http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_BYTES_S2C, payload_len); http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TCP_SEG_S2C, 1); } - http_decoder_half_reinit(cur_half, httpd_env->httpd_msg_topic_id, ex_data->queue, - ex_data->mempool, sess); + http_decoder_half_reinit(cur_half, exdata->queue, exdata->mempool, sess); int ret = http_decoder_half_parse(cur_half, payload, payload_len); if (ret < 0) { http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_PARSE_ERR, 1); stellar_session_plugin_dettach_current_session(sess); } + } + + void http_decoder_tunnel_msg_cb(struct session *sess, int topic_id, const void *tmsg, void *per_session_ctx, void *plugin_env) + { + struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env; + struct http_decoder_exdata *exdata = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].exdata_id); + enum http_tunnel_message_type tmsg_type = http_tunnel_message_type_get((const struct http_tunnel_message *)tmsg); + switch (tmsg_type) + { + case HTTP_TUNNEL_OPENING: + { + if(NULL != exdata){ + //not support nested http tunnel + session_mq_ignore_message(sess, topic_id, httpd_env->plugin_id); + return; + } + size_t payload_len; + const char *payload = session_get0_current_payload(sess, &payload_len); + size_t http_identify_len = payload_len > HTTP_IDENTIFY_LEN ? HTTP_IDENTIFY_LEN : payload_len; + int is_http = http_protocol_identify(payload, http_identify_len); + if(is_http){ + long long max_req_seq = 0, max_res_seq = 0; + struct http_decoder_exdata *tcp_stream_exdata = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_TCP_STREAM_INDEX].exdata_id); + http_half_get_max_transaction_seq(tcp_stream_exdata, &max_req_seq, &max_res_seq); + exdata = httpd_session_exdata_new(sess, httpd_env, max_req_seq, max_res_seq); + session_exdata_set(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].exdata_id, exdata); + exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_MSG_INDEX].sub_topic_id; + exdata->in_tunnel_is_http = 1; + }else{ + exdata = CALLOC(struct http_decoder_exdata, 1); + exdata->decoder = NULL; + exdata->pub_topic_id = -1; + exdata->in_tunnel_is_http = 0; + session_exdata_set(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].exdata_id, exdata); + //do nothing, but can't call stellar_session_plugin_dettach_current_session() !!! + return; + } + } + break; + + case HTTP_TUNNEL_ACTIVE: + break; + + case HTTP_TUNNEL_CLOSING: + if(exdata->in_tunnel_is_http){ + http_half_pre_context_free(sess, exdata); + } + return; + break; + + default: + break; + } + if(exdata->in_tunnel_is_http){ + http_decoder_execute(sess, httpd_env, exdata); + } + return; + } + + void http_decoder_tcp_stream_msg_cb(struct session *sess, int topic_id, const void *msg, void *nouse_session_ctx, void *plugin_env) + { + struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env; + enum session_state sess_state = session_get_current_state(sess); + struct http_decoder_exdata *exdata = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_TCP_STREAM_INDEX].exdata_id); + + switch(sess_state){ + case SESSION_STATE_OPENING: + { + exdata = httpd_session_exdata_new(sess, httpd_env, 0, 0); + exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_MSG_INDEX].sub_topic_id; + session_exdata_set(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_TCP_STREAM_INDEX].exdata_id, exdata); + //go on + } + break; + + case SESSION_STATE_ACTIVE: + //go on + break; + + case SESSION_STATE_CLOSING: + { + if(httpd_in_tunnel_transmitting(exdata)){ + http_decoder_push_tunnel_data(sess, exdata, HTTP_TUNNEL_CLOSING); + }else{ + http_half_pre_context_free(sess, exdata); + } + return; + } + break; + + default: + return; + break; + } + + http_decoder_execute(sess, httpd_env, exdata); + return; } +static const struct http_topic_exdata_compose g_topic_exdata_compose[HTTPD_TOPIC_INDEX_MAX] = +{ + {HTTPD_TOPIC_TCP_STREAM_INDEX, TOPIC_TCP_STREAM, http_decoder_tcp_stream_msg_cb, NULL, "HTTP_DECODER_EXDATA_BASEON_TCP_STREAM", httpd_ex_data_free_cb, -1, -1}, + {HTTPD_TOPIC_HTTP_MSG_INDEX, HTTP_DECODER_TOPIC, NULL, http_message_free, NULL, NULL, -1, -1}, + {HTTPD_TOPIC_HTTP_TUNNEL_INDEX, HTTP_DECODER_TUNNEL_TOPIC, http_decoder_tunnel_msg_cb, http_message_free, "HTTP_DECODER_EXDATA_BASEON_HTTP_TUNNEL", httpd_ex_data_free_cb, -1, -1}, +}; + + static void http_decoder_topic_exdata_compose_init(struct http_decoder_env *httpd_env) + { + memcpy(httpd_env->topic_exdata_compose, g_topic_exdata_compose, sizeof(g_topic_exdata_compose)); + for(int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++){ + httpd_env->topic_exdata_compose[i].sub_topic_id = stellar_session_mq_get_topic_id_reliable(httpd_env->st, + httpd_env->topic_exdata_compose[i].topic_name, + httpd_env->topic_exdata_compose[i].msg_free_cb, + NULL); + assert(httpd_env->topic_exdata_compose[i].sub_topic_id >= 0); + + if(httpd_env->topic_exdata_compose[i].exdata_name){ + httpd_env->topic_exdata_compose[i].exdata_id = stellar_session_exdata_new_index(httpd_env->st, + httpd_env->topic_exdata_compose[i].exdata_name, + httpd_env->topic_exdata_compose[i].exdata_free_cb, + NULL); + assert(httpd_env->topic_exdata_compose[i].exdata_id >= 0); + } + + if(httpd_env->topic_exdata_compose[i].on_msg_cb){ + stellar_session_mq_subscribe(httpd_env->st, httpd_env->topic_exdata_compose[i].sub_topic_id, + httpd_env->topic_exdata_compose[i].on_msg_cb, httpd_env->plugin_id); + } + } + } + + int http_topic_exdata_compose_get_index(const struct http_decoder_env *httpd_env, int by_topic_id) + { + for(int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++){ + if(httpd_env->topic_exdata_compose[i].sub_topic_id == by_topic_id){ + return i; + } + } + assert(0); + return -1; + } + void *http_decoder_init(struct stellar *st) { - int httpd_msg_topic_id = -1, tcp_stream_topic_id = -1; int thread_num = 0; struct http_decoder_env *httpd_env = CALLOC(struct http_decoder_env, 1); @@ -597,27 +767,13 @@ extern "C" goto failed; } httpd_env->st = st; - httpd_env->ex_data_idx = stellar_session_exdata_new_index(st, "HTTP_DECODER", - _httpd_ex_data_free_cb, - NULL); - httpd_env->plugin_id = stellar_session_plugin_register(st, _httpd_session_ctx_new_cb, - _httpd_session_ctx_free_cb, (void *)httpd_env); + httpd_env->plugin_id = stellar_session_plugin_register(st, httpd_session_ctx_new_cb, + httpd_session_ctx_free_cb, (void *)httpd_env); if (httpd_env->plugin_id < 0) { goto failed; } - - httpd_msg_topic_id = stellar_session_mq_get_topic_id(st, HTTP_DECODER_TOPIC); - if (httpd_msg_topic_id < 0) - { - httpd_msg_topic_id = stellar_session_mq_create_topic(st, HTTP_DECODER_TOPIC, - http_message_free, NULL); - } - httpd_env->httpd_msg_topic_id = httpd_msg_topic_id; - - tcp_stream_topic_id = stellar_session_mq_get_topic_id(st, TOPIC_TCP_STREAM); - assert(tcp_stream_topic_id >= 0); - stellar_session_mq_subscribe(st, tcp_stream_topic_id, http_decoder_tcp_stream_msg_cb, httpd_env->plugin_id); + http_decoder_topic_exdata_compose_init(httpd_env); thread_num = stellar_get_worker_thread_num(st); assert(thread_num >= 1); @@ -626,9 +782,15 @@ extern "C" { goto failed; } - - printf("http_decoder_init succ: ex_data_idx:%d, plugin_id:%d, topic_id:%d\n", - httpd_env->ex_data_idx, httpd_env->plugin_id, httpd_env->httpd_msg_topic_id); + + printf("http decoder init succ, plugin id:%d \n", httpd_env->plugin_id); + for(int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++){ + printf("\ttopic_name:%s, topic_id:%d, ex_data_name:%s, exdata_id:%d\n", + httpd_env->topic_exdata_compose[i].topic_name, + httpd_env->topic_exdata_compose[i].sub_topic_id, + httpd_env->topic_exdata_compose[i].exdata_name, + httpd_env->topic_exdata_compose[i].exdata_id); + } return httpd_env; failed: @@ -662,9 +824,9 @@ extern "C" if (unlikely(NULL == msg || msg->type != HTTP_MESSAGE_REQ_LINE)) { if(line){ - line->method.str = NULL; - line->uri.str = NULL; - line->version.str = NULL; + line->method.iov_base = NULL; + line->uri.iov_base = NULL; + line->version.iov_base = NULL; } return; } @@ -683,8 +845,8 @@ extern "C" if (unlikely(NULL == msg || msg->type != HTTP_MESSAGE_RES_LINE)) { if(line){ - line->version.str = NULL; - line->status.str = NULL; + line->version.iov_base = NULL; + line->status.iov_base = NULL; } return; } @@ -697,7 +859,7 @@ extern "C" http_decoder_half_data_get_response_line(res_data, line); } - void http_message_get_header(const struct http_message *msg, const struct hstring *key, + void http_message_get_header(const struct http_message *msg, const hstring *key, struct http_header *hdr_result) { int ret = -1; @@ -720,8 +882,8 @@ extern "C" } fail: if(hdr_result){ - hdr_result->key.str = NULL; - hdr_result->val.str = NULL; + hdr_result->key.iov_base = NULL; + hdr_result->val.iov_base = NULL; } return; } @@ -750,8 +912,8 @@ extern "C" return 0; fail: if(header){ - header->key.str = NULL; - header->val.str = NULL; + header->key.iov_base = NULL; + header->val.iov_base = NULL; } return -1; } @@ -780,7 +942,7 @@ extern "C" } void http_message_get_raw_body(const struct http_message *msg, - struct hstring *body) + hstring *body) { int ret = -1; if (unlikely(NULL == msg)) @@ -803,14 +965,14 @@ extern "C" return; fail: if(body){ - body->str = NULL; - body->str_len = 0; + body->iov_base = NULL; + body->iov_len = 0; } return; } void http_message_get_decompress_body(const struct http_message *msg, - struct hstring *body) + hstring *body) { int ret = -1; if (unlikely(NULL == msg)) @@ -833,19 +995,19 @@ extern "C" return; fail: if(body){ - body->str = NULL; - body->str_len = 0; + body->iov_base = NULL; + body->iov_len = 0; } return; } - void http_message_get_url(const struct http_message *msg, struct hstring *url) + void http_message_get_url(const struct http_message *msg, hstring *url) { if (unlikely(NULL == msg)) { if(url){ - url->str = NULL; - url->str_len = 0; + url->iov_base = NULL; + url->iov_len = 0; } return; } @@ -862,8 +1024,8 @@ extern "C" fail: if(url){ - url->str = NULL; - url->str_len = 0; + url->iov_base = NULL; + url->iov_len = 0; } return; } diff --git a/src/http_decoder_half.cpp b/src/http_decoder_half.cpp index 441ff2c..6074307 100644 --- a/src/http_decoder_half.cpp +++ b/src/http_decoder_half.cpp @@ -21,7 +21,7 @@ struct http_decoder_half_data size_t decompress_body_len; int joint_url_complete; - struct hstring joint_url; // http://<host>[:<port>]/<path>?<searchpart> + hstring joint_url; // http://<host>[:<port>]/<path>?<searchpart> long long transaction_index; }; @@ -42,7 +42,7 @@ struct http_decoder_half long long trans_counter; long long err_counter; - long long transaction_seq; + long long transaction_seq; //accumulated const char *data; int data_len; @@ -77,9 +77,9 @@ http_decoder_half_data_decompress(struct http_decoder_half_data *data) return; } - struct hstring raw_body = {0}; + hstring raw_body = {0}; http_decoder_table_get_body(data->table, &raw_body); - if (raw_body.str == NULL || raw_body.str_len == 0) + if (raw_body.iov_base == NULL || raw_body.iov_len == 0) { return; } @@ -90,8 +90,8 @@ http_decoder_half_data_decompress(struct http_decoder_half_data *data) } assert(data->decompress); - if (http_content_decompress_write(data->decompress, raw_body.str, - raw_body.str_len, + if (http_content_decompress_write(data->decompress, (char *)raw_body.iov_base, + raw_body.iov_len, &data->ref_decompress_body, &data->decompress_body_len) == -1) { @@ -226,12 +226,12 @@ static int on_uri(llhttp_t *http, const char *at, size_t length) return 0; } -static void http_decoder_cached_portion_url(struct http_decoder_half *half, const struct hstring *uri_result) +static void http_decoder_cached_portion_url(struct http_decoder_half *half, const hstring *uri_result) { struct http_decoder_half_data *ref_data = half->ref_data; int uri_skip_len = 0; - if ((uri_result->str_len) > 7 && (strncasecmp("http://", uri_result->str, 7) == 0)) // absolute URI + if ((uri_result->iov_len) > 7 && (strncasecmp("http://", (char *)uri_result->iov_base, 7) == 0)) // absolute URI { uri_skip_len = strlen("http://"); ref_data->joint_url_complete = 1; @@ -241,9 +241,9 @@ static void http_decoder_cached_portion_url(struct http_decoder_half *half, cons ref_data->joint_url_complete = 0; } - ref_data->joint_url.str_len = uri_result->str_len - uri_skip_len; - ref_data->joint_url.str = MEMPOOL_CALLOC(half->http_ev_ctx->ref_mempool, char, ref_data->joint_url.str_len); - memcpy(ref_data->joint_url.str, uri_result->str + uri_skip_len, ref_data->joint_url.str_len); + ref_data->joint_url.iov_len = uri_result->iov_len - uri_skip_len; + ref_data->joint_url.iov_base = MEMPOOL_CALLOC(half->http_ev_ctx->ref_mempool, char, ref_data->joint_url.iov_len); + memcpy(ref_data->joint_url.iov_base, (char *)uri_result->iov_base + uri_skip_len, ref_data->joint_url.iov_len); } /* Information-only callbacks, return value is ignored */ @@ -261,9 +261,9 @@ static int on_uri_complete(llhttp_t *http) http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_URI); - struct hstring uri_result = {}; + hstring uri_result = {}; http_decoder_table_get_uri(half->ref_data->table, &uri_result); - assert(uri_result.str); + assert(uri_result.iov_base); http_decoder_cached_portion_url(half, &uri_result); return 0; @@ -408,17 +408,17 @@ static int on_header_value_complete(llhttp_t *http) if (half->ref_data->content_encoding == HTTP_CONTENT_ENCODING_NONE) { struct http_header http_hdr = {0}; - struct hstring key = {.str = (char *)"Content-Encoding", .str_len = 16}; + hstring key = {.iov_base = (char *)"Content-Encoding", .iov_len = 16}; if (http_decoder_table_get_header(half->ref_data->table, &key, &http_hdr) == 0) { char encoding_str[MAX_ENCODING_STR_LEN + 1] = {0}; - size_t str_len = http_hdr.val.str_len; - if (str_len > MAX_ENCODING_STR_LEN) + size_t iov_len = http_hdr.val.iov_len; + if (iov_len > MAX_ENCODING_STR_LEN) { - str_len = MAX_ENCODING_STR_LEN; + iov_len = MAX_ENCODING_STR_LEN; } - memcpy(encoding_str, http_hdr.val.str, str_len); + memcpy(encoding_str, http_hdr.val.iov_base, iov_len); half->ref_data->content_encoding = http_content_encoding_str2int(encoding_str); } } @@ -539,8 +539,7 @@ static int on_body(llhttp_t *http, const char *at, size_t length) return 0; } -static void http_decoder_half_init(struct http_decoder_half *half, - http_event_cb *http_ev_cb, enum llhttp_type type) +static void http_decoder_half_init(struct http_decoder_half *half, http_event_cb *http_ev_cb, enum llhttp_type type) { llhttp_settings_init(&half->settings); llhttp_init(&half->parser, type, &half->settings); @@ -579,8 +578,9 @@ static void http_decoder_half_init(struct http_decoder_half *half, half->ref_data = NULL; } -struct http_decoder_half * http_decoder_half_new(nmx_pool_t *mempool, http_event_cb *ev_cb, enum llhttp_type http_type, - int decompress_switch, struct http_decoder_env *httpd_env) +struct http_decoder_half * http_decoder_half_new(struct http_decoder_exdata *hd_ctx, nmx_pool_t *mempool, + http_event_cb *ev_cb, enum llhttp_type http_type, + int decompress_switch, struct http_decoder_env *httpd_env, long long start_seq) { struct http_decoder_half *half = MEMPOOL_CALLOC(mempool, struct http_decoder_half, 1); assert(half); @@ -588,8 +588,9 @@ struct http_decoder_half * http_decoder_half_new(nmx_pool_t *mempool, http_event half->decompress_switch = decompress_switch; half->http_ev_ctx = MEMPOOL_CALLOC(mempool, struct http_event_context, 1); http_decoder_half_init(half, ev_cb, http_type); - + half->http_ev_ctx->ref_httpd_ctx = hd_ctx; half->httpd_env = httpd_env; + half->transaction_seq = start_seq; return half; } @@ -609,7 +610,7 @@ void http_decoder_half_free(nmx_pool_t *mempool, struct http_decoder_half *half) MEMPOOL_FREE(mempool, half); } -void http_decoder_half_reinit(struct http_decoder_half *half, int topic_id, +void http_decoder_half_reinit(struct http_decoder_half *half, struct http_decoder_result_queue *queue, nmx_pool_t *mempool, struct session *sess) { @@ -618,8 +619,6 @@ void http_decoder_half_reinit(struct http_decoder_half *half, int topic_id, { http_decoder_table_reinit(half->ref_data->table); } - - half->http_ev_ctx->topic_id = topic_id; half->http_ev_ctx->ref_mempool = mempool; half->http_ev_ctx->ref_session = sess; half->http_ev_ctx->ref_queue = queue; @@ -810,10 +809,10 @@ void http_decoder_half_data_free(nmx_pool_t *mempool, struct http_decoder_half_d data->decompress = NULL; } - if (data->joint_url.str) + if (data->joint_url.iov_base) { - MEMPOOL_FREE(mempool, data->joint_url.str); - data->joint_url.str = NULL; + MEMPOOL_FREE(mempool, data->joint_url.iov_base); + data->joint_url.iov_base = NULL; data->joint_url_complete = 0; } MEMPOOL_FREE(mempool, data); @@ -846,7 +845,7 @@ int http_decoder_half_data_get_response_line(struct http_decoder_half_data *data } int http_decoder_half_data_get_header(const struct http_decoder_half_data *data, - const struct hstring *key, + const hstring *key, struct http_header *hdr_result) { return http_decoder_table_get_header(data->table, key, hdr_result); @@ -877,7 +876,7 @@ int http_decoder_half_data_has_parsed_header(struct http_decoder_half_data *data } int http_decoder_half_data_get_raw_body(const struct http_decoder_half_data *data, - struct hstring *body) + hstring *body) { if (NULL == data || NULL == body) { @@ -887,15 +886,15 @@ int http_decoder_half_data_get_raw_body(const struct http_decoder_half_data *dat } int http_decoder_half_data_get_decompress_body(const struct http_decoder_half_data *data, - struct hstring *body) + hstring *body) { if (HTTP_CONTENT_ENCODING_NONE == data->content_encoding) { return http_decoder_table_get_body(data->table, body); } - body->str = data->ref_decompress_body; - body->str_len = data->decompress_body_len; + body->iov_base = data->ref_decompress_body; + body->iov_len = data->decompress_body_len; return 0; } @@ -923,17 +922,17 @@ static void using_session_addr_as_host(struct session *ref_session, char ip_string_buf[INET6_ADDRSTRLEN]; if (SESSION_ADDR_TYPE_IPV4_TCP == ssaddr_type || SESSION_ADDR_TYPE_IPV4_UDP == ssaddr_type) { - host_result->val.str = MEMPOOL_CALLOC(mempool, char, (INET_ADDRSTRLEN + 7) /* "ip:port" max length */); + host_result->val.iov_base = MEMPOOL_CALLOC(mempool, char, (INET_ADDRSTRLEN + 7) /* "ip:port" max length */); inet_ntop(AF_INET, &ssaddr->ipv4.daddr, ip_string_buf, INET_ADDRSTRLEN); - sprintf(host_result->val.str, "%s:%u", ip_string_buf, ntohs(ssaddr->ipv4.dport)); - host_result->val.str_len = strlen(host_result->val.str); + sprintf((char *)host_result->val.iov_base, "%s:%u", ip_string_buf, ntohs(ssaddr->ipv4.dport)); + host_result->val.iov_len = strlen((char *)host_result->val.iov_base); } else if (SESSION_ADDR_TYPE_IPV6_TCP == ssaddr_type || SESSION_ADDR_TYPE_IPV6_UDP == ssaddr_type) { - host_result->val.str = MEMPOOL_CALLOC(mempool, char, (INET6_ADDRSTRLEN + 7) /* "ip:port" max length */); + host_result->val.iov_base = MEMPOOL_CALLOC(mempool, char, (INET6_ADDRSTRLEN + 7) /* "ip:port" max length */); inet_ntop(AF_INET6, &ssaddr->ipv6.daddr, ip_string_buf, INET6_ADDRSTRLEN); - sprintf(host_result->val.str, "%s:%u", ip_string_buf, ntohs(ssaddr->ipv6.dport)); - host_result->val.str_len = strlen(host_result->val.str); + sprintf((char *)host_result->val.iov_base, "%s:%u", ip_string_buf, ntohs(ssaddr->ipv6.dport)); + host_result->val.iov_len = strlen((char *)host_result->val.iov_base); } else { @@ -944,30 +943,43 @@ static void using_session_addr_as_host(struct session *ref_session, void http_decoder_join_url(struct http_decoder_half_data *hfdata, nmx_pool_t *mempool, const struct http_header *host_hdr) { int append_slash_len = 0; - if ('/' != hfdata->joint_url.str[0]) + if ('/' != ((char *)hfdata->joint_url.iov_base)[0]) { append_slash_len = 1; } - int url_cache_str_len = host_hdr->val.str_len + hfdata->joint_url.str_len + append_slash_len; + int url_cache_str_len = host_hdr->val.iov_len + hfdata->joint_url.iov_len + append_slash_len; char *url_cache_str = MEMPOOL_CALLOC(mempool, char, url_cache_str_len); char *ptr = url_cache_str; - memcpy(ptr, host_hdr->val.str, host_hdr->val.str_len); - ptr += host_hdr->val.str_len; + memcpy(ptr, host_hdr->val.iov_base, host_hdr->val.iov_len); + ptr += host_hdr->val.iov_len; if (append_slash_len) { *ptr = '/'; ptr++; } - memcpy(ptr, hfdata->joint_url.str, hfdata->joint_url.str_len); + memcpy(ptr, hfdata->joint_url.iov_base, hfdata->joint_url.iov_len); - MEMPOOL_FREE(mempool, hfdata->joint_url.str); // free the cached uri buffer - hfdata->joint_url.str = url_cache_str; - hfdata->joint_url.str_len = url_cache_str_len; + MEMPOOL_FREE(mempool, hfdata->joint_url.iov_base); // free the cached uri buffer + hfdata->joint_url.iov_base = url_cache_str; + hfdata->joint_url.iov_len = url_cache_str_len; hfdata->joint_url_complete = 1; } +void http_decoder_get_url(struct http_decoder_half_data *hfdata, nmx_pool_t *mempool) +{ + struct http_request_line reqline = {}; + http_decoder_half_data_get_request_line(hfdata, &reqline); + if(unlikely(strncasecmp_safe("CONNECT", (char *)reqline.method.iov_base, 7, reqline.method.iov_len) == 0)) + { + hfdata->joint_url.iov_base = MEMPOOL_CALLOC(mempool, char, reqline.uri.iov_len+1); + memcpy(hfdata->joint_url.iov_base, reqline.uri.iov_base, reqline.uri.iov_len); + hfdata->joint_url.iov_len = reqline.uri.iov_len; + hfdata->joint_url_complete = 1; + } +} + int http_decoder_join_url_finally(struct http_event_context *ev_ctx, struct http_decoder_half_data *hfdata, nmx_pool_t *mempool) @@ -981,14 +993,14 @@ int http_decoder_join_url_finally(struct http_event_context *ev_ctx, using_session_addr_as_host(ev_ctx->ref_session, &addr_as_host, mempool); http_decoder_join_url(hfdata, mempool, &addr_as_host); - MEMPOOL_FREE(mempool, addr_as_host.val.str); // free session addr to host buffer + MEMPOOL_FREE(mempool, addr_as_host.val.iov_base); // free session addr to host buffer return 1; } void http_decoder_get_host_feed_url(struct http_decoder_half *half) { struct http_header host_result = {}; - struct hstring host_key = {(char *)"Host", 4}; + hstring host_key = {(char *)"Host", 4}; const char *host_refer_str = NULL; int host_refer_len = 0; @@ -1007,14 +1019,14 @@ void http_decoder_get_host_feed_url(struct http_decoder_half *half) http_decoder_join_url(half->ref_data, half->http_ev_ctx->ref_mempool, &host_result); } -int http_half_data_get_url(struct http_decoder_half_data *res_data, struct hstring *url) +int http_half_data_get_url(struct http_decoder_half_data *res_data, hstring *url) { if (0 == res_data->joint_url_complete) { return -1; } - url->str = res_data->joint_url.str; - url->str_len = res_data->joint_url.str_len; + url->iov_base = res_data->joint_url.iov_base; + url->iov_len = res_data->joint_url.iov_len; return 0; } @@ -1033,24 +1045,20 @@ int http_half_data_get_total_parsed_header_count(struct http_decoder_half_data * return http_decoder_table_get_total_parsed_header(half_data->table); } -void http_half_pre_context_free(struct session *sess, struct http_decoder_env *httpd_env, - struct http_decoder_exdata *ex_data) +void http_half_pre_context_free(struct session *sess, struct http_decoder_exdata *exdata) { - if (NULL == ex_data) - { - return; - } struct http_message *msg = NULL; struct http_decoder_half_data *req_data; struct http_decoder_half_data *res_data; - struct http_decoder_result_queue *queue = ex_data->queue; + struct http_decoder_result_queue *queue = exdata->queue; + for(int i = 0; i < queue->queue_size; i++){ req_data = queue->array[i].req_data; res_data = queue->array[i].res_data; if ((req_data != NULL) && (NULL == res_data) && (req_data->state < HTTP_EVENT_REQ_END)) { msg = http_message_new(HTTP_TRANSACTION_FREE, queue, i, HTTP_REQUEST); - session_mq_publish_message(sess, httpd_env->httpd_msg_topic_id, msg); + session_mq_publish_message(sess, exdata->pub_topic_id, msg); } } @@ -1059,7 +1067,7 @@ void http_half_pre_context_free(struct session *sess, struct http_decoder_env *h if ((res_data != NULL) && (res_data->state < HTTP_EVENT_RES_END)) { msg = http_message_new(HTTP_TRANSACTION_FREE, queue, i, HTTP_RESPONSE); - session_mq_publish_message(sess, httpd_env->httpd_msg_topic_id, msg); + session_mq_publish_message(sess, exdata->pub_topic_id, msg); } } } @@ -1067,4 +1075,11 @@ void http_half_pre_context_free(struct session *sess, struct http_decoder_env *h void http_half_update_state(struct http_decoder_half_data *hf_data, enum http_event state) { hf_data->state = state; +} + +void http_half_get_max_transaction_seq(struct http_decoder_exdata *exdata, long long *max_req_seq, long long *max_res_seq) +{ + assert(exdata && max_req_seq && max_res_seq); + *max_req_seq = exdata->decoder->c2s_half->transaction_seq; + *max_res_seq = exdata->decoder->s2c_half->transaction_seq; }
\ No newline at end of file diff --git a/src/http_decoder_half.h b/src/http_decoder_half.h index e918e6e..5183171 100644 --- a/src/http_decoder_half.h +++ b/src/http_decoder_half.h @@ -30,7 +30,7 @@ enum http_event { }; struct http_event_context { - int topic_id; + struct http_decoder_exdata *ref_httpd_ctx; nmx_pool_t *ref_mempool; struct session *ref_session; struct http_decoder_result_queue *ref_queue; @@ -43,12 +43,12 @@ typedef void http_event_cb(enum http_event event, struct http_decoder_half_data struct http_event_context *ev_ctx, void *httpd_plugin_env); struct http_decoder_half * -http_decoder_half_new(nmx_pool_t *mempool, http_event_cb *event_cb, enum llhttp_type http_type, - int decompress_switch, struct http_decoder_env *httpd_env); +http_decoder_half_new(struct http_decoder_exdata *hd_ctx, nmx_pool_t *mempool, http_event_cb *event_cb, + enum llhttp_type http_type, int decompress_switch, struct http_decoder_env *httpd_env,long long start_seq); void http_decoder_half_free(nmx_pool_t *mempool, struct http_decoder_half *half); -void http_decoder_half_reinit(struct http_decoder_half *half, int topic_id, +void http_decoder_half_reinit(struct http_decoder_half *half, struct http_decoder_result_queue *queue, nmx_pool_t *mempool, struct session *sess); @@ -69,32 +69,33 @@ int http_decoder_half_data_get_response_line(struct http_decoder_half_data *data struct http_response_line *line); int http_decoder_half_data_get_header(const struct http_decoder_half_data *data, - const struct hstring *key, struct http_header *hdr_res); + const hstring *key, struct http_header *hdr_res); int http_decoder_half_data_iter_header(struct http_decoder_half_data *data, struct http_header *header); int http_decoder_half_data_reset_header_iter(struct http_decoder_half_data *req_data); int http_decoder_half_data_has_parsed_header(struct http_decoder_half_data *data); -int http_decoder_half_data_get_raw_body(const struct http_decoder_half_data *data, struct hstring *body); +int http_decoder_half_data_get_raw_body(const struct http_decoder_half_data *data, hstring *body); -int http_decoder_half_data_get_decompress_body(const struct http_decoder_half_data *data, struct hstring *body); +int http_decoder_half_data_get_decompress_body(const struct http_decoder_half_data *data, hstring *body); void http_decoder_half_data_dump(struct http_decoder_half *half); void http_decoder_get_host_feed_url(struct http_decoder_half *half); +void http_decoder_get_url(struct http_decoder_half_data *hfdata, nmx_pool_t *mempool); void http_decoder_join_url(struct http_decoder_half_data *hfdata, nmx_pool_t *mempool, const struct http_header *host_hdr); int http_decoder_join_url_finally(struct http_event_context *ev_ctx, struct http_decoder_half_data *hfdata, nmx_pool_t *mempool); -int http_half_data_get_url(struct http_decoder_half_data *res_data, struct hstring *url); +int http_half_data_get_url(struct http_decoder_half_data *res_data, hstring *url); int http_half_data_get_transaction_seq(struct http_decoder_half_data *hf_data); void http_half_data_update_commit_index(struct http_decoder_half_data * half_data); -void http_half_pre_context_free(struct session *sess, struct http_decoder_env *httpd_env, - struct http_decoder_exdata *ex_data); +void http_half_pre_context_free(struct session *sess, struct http_decoder_exdata *exdata); void http_half_update_state(struct http_decoder_half_data *hf_data, enum http_event state); -int http_half_data_get_total_parsed_header_count(struct http_decoder_half_data * half_data); +int http_half_data_get_total_parsed_header_count(struct http_decoder_half_data * half_data); +void http_half_get_max_transaction_seq(struct http_decoder_exdata *exdata, long long *max_req_seq, long long *max_res_seq); #endif
\ No newline at end of file diff --git a/src/http_decoder_inc.h b/src/http_decoder_inc.h index 760eaba..0e0f0c5 100644 --- a/src/http_decoder_inc.h +++ b/src/http_decoder_inc.h @@ -31,6 +31,7 @@ extern "C" #include "http_decoder_result_queue.h" #include "http_decoder_utils.h" #include "http_decoder_stat.h" +#include "http_decoder_tunnel.h" #include "fieldstat/fieldstat_easy.h" #include "toml/toml.h" @@ -82,6 +83,7 @@ struct http_message enum http_message_type type; size_t queue_index; struct http_decoder_result_queue *ref_queue; + hstring tunnel_payload; }; struct http_decoder @@ -90,19 +92,45 @@ struct http_decoder struct http_decoder_half *s2c_half; }; +enum httpd_topic_index{ + HTTPD_TOPIC_TCP_STREAM_INDEX = 0, + HTTPD_TOPIC_HTTP_MSG_INDEX, + HTTPD_TOPIC_HTTP_TUNNEL_INDEX, + HTTPD_TOPIC_INDEX_MAX, +}; + struct http_decoder_exdata { + int sub_topic_id; //tcp_stream + int pub_topic_id; //http message or http tunnel msg struct http_decoder_result_queue *queue; struct http_decoder *decoder; nmx_pool_t *mempool; + enum http_tunnel_state tunnel_state; + int in_tunnel_is_http; +}; + +// struct http_decoder_context{ +// int array_size; +// struct http_decoder_exdata **exdata_array; //raw tcp stream for http msg; http tunnel for inner http transaction. +// }; + +struct http_topic_exdata_compose{ + enum httpd_topic_index index; + const char *topic_name; + on_session_msg_cb_func *on_msg_cb; + session_msg_free_cb_func *msg_free_cb; + const char *exdata_name; + session_exdata_free *exdata_free_cb; + int sub_topic_id; //as consumer + int exdata_id; }; struct http_decoder_env { - int plugin_id; - int httpd_msg_topic_id; - int ex_data_idx; struct stellar *st; + int plugin_id; + struct http_topic_exdata_compose topic_exdata_compose[HTTPD_TOPIC_INDEX_MAX]; struct http_decoder_config hd_cfg; struct http_decoder_stat hd_stat; }; @@ -111,7 +139,7 @@ struct http_message; struct http_message *http_message_new(enum http_message_type type, struct http_decoder_result_queue *queue, int queue_index, unsigned char flow_type); - +int http_topic_exdata_compose_get_index(const struct http_decoder_env *httpd_env, int by_topic_id); #ifdef __cplusplus } #endif diff --git a/src/http_decoder_stat.cpp b/src/http_decoder_stat.cpp index dfd1a2e..a2b6205 100644 --- a/src/http_decoder_stat.cpp +++ b/src/http_decoder_stat.cpp @@ -1,5 +1,6 @@ #include <assert.h> #include <stdio.h> +#include <pthread.h> #include <unistd.h> #include "http_decoder_inc.h" @@ -26,6 +27,12 @@ static const struct hd_stat_config_tuple g_httpd_stat_tuple[HTTPD_STAT_MAX] = void http_decoder_stat_free(struct http_decoder_stat *hd_stat) { + pthread_cancel(hd_stat->timer_pid); + void *join_res = NULL; + do{ + pthread_join(hd_stat->timer_pid, &join_res); + }while(join_res != PTHREAD_CANCELED); + if(hd_stat->stats != NULL){ free(hd_stat->stats); } @@ -34,6 +41,19 @@ void http_decoder_stat_free(struct http_decoder_stat *hd_stat) } } +static void *httpd_stat_timer_thread(void *arg) +{ + pthread_setname_np(pthread_self(), "http_decoder_timer_thread"); + struct http_decoder_stat *hd_stat = (struct http_decoder_stat *)arg; + struct timespec res; + while(1){ + clock_gettime(CLOCK_MONOTONIC, &res); + hd_stat->current_time_ms = (res.tv_sec * 1000) + (res.tv_nsec / 1000000); + usleep(800); + } + return NULL; +} + int http_decoder_stat_init(struct http_decoder_stat *hd_stat, int thread_max, int stat_interval_pkts, int stat_interval_time) { assert(sizeof(g_httpd_stat_tuple)/sizeof(struct hd_stat_config_tuple) == HTTPD_STAT_MAX); @@ -64,6 +84,8 @@ int http_decoder_stat_init(struct http_decoder_stat *hd_stat, int thread_max, in return -1; } + pthread_create(&hd_stat->timer_pid, NULL, httpd_stat_timer_thread, hd_stat); + return 0; } @@ -73,15 +95,16 @@ void http_decoder_stat_update(struct http_decoder_stat *hd_stat, int thread_id, assert(thread_id >= 0); assert(type < HTTPD_STAT_MAX); - hd_stat->stats[thread_id].counter[type] += value; - hd_stat->stats[thread_id].batch++; + struct hd_statistics *cur_hds = &hd_stat->stats[thread_id]; - if(hd_stat->stats[thread_id].batch >= hd_stat->stat_interval_pkts){ - for(int i = 0; i < HTTPD_STAT_MAX; i++){ - //update all type, maybe decrease performance ? - fieldstat_easy_counter_incrby(hd_stat->fse, thread_id, hd_stat->field_stat_id[i], NULL, 0, hd_stat->stats[thread_id].counter[i]); - hd_stat->stats[thread_id].counter[i] = 0; - } - hd_stat->stats[thread_id].batch = 0; + cur_hds->counter[type] += value; + cur_hds->batch[type]++; + + if(cur_hds->batch[type] >= hd_stat->stat_interval_pkts + || cur_hds->time_ms[type] + 1000 < hd_stat->current_time_ms){ + fieldstat_easy_counter_incrby(hd_stat->fse, thread_id, hd_stat->field_stat_id[type], NULL, 0, cur_hds->counter[type]); + cur_hds->counter[type] = 0; + cur_hds->batch[type] = 0; + cur_hds->time_ms[type] = hd_stat->current_time_ms; } -} +}
\ No newline at end of file diff --git a/src/http_decoder_stat.h b/src/http_decoder_stat.h index 235475d..e8f18d8 100644 --- a/src/http_decoder_stat.h +++ b/src/http_decoder_stat.h @@ -32,25 +32,19 @@ struct hd_stat_config_tuple struct hd_statistics { - // long long incoming_bytes; - // long long incoming_tcp_seg; - // long long session_new; - // long long session_free; - // long long transaction_new; - // long long transaction_free; - // long long incoming_trans; - // long long err_pkts; - + long long time_ms[HTTPD_STAT_MAX]; long long counter[HTTPD_STAT_MAX]; - int batch; //call fieldstat_easy_counter_incrby() per batch + int batch[HTTPD_STAT_MAX]; //call fieldstat_easy_counter_incrby() per batch }__attribute__ ((aligned (64))); struct http_decoder_stat { + pthread_t timer_pid; + long long current_time_ms; struct fieldstat_easy *fse; int stat_interval_pkts; // call fieldstat_incrby every stat_interval_pkts int field_stat_id[HTTPD_STAT_MAX]; - struct hd_statistics *stats; //multi thread + struct hd_statistics *stats; //size is thread number }; int http_decoder_stat_init(struct http_decoder_stat *hd_stat, int thread_max, int stat_interval_pkts, int stat_interval_time); diff --git a/src/http_decoder_string.cpp b/src/http_decoder_string.cpp index e10b4a0..c74030c 100644 --- a/src/http_decoder_string.cpp +++ b/src/http_decoder_string.cpp @@ -35,8 +35,8 @@ void http_decoder_string_refer(struct http_decoder_string *rstr, switch (rstr->state) { case STRING_STATE_INIT: case STRING_STATE_CACHE: - rstr->refer.str = (char *)at; - rstr->refer.str_len = length; + rstr->refer.iov_base = (char *)at; + rstr->refer.iov_len = length; break; default: abort(); @@ -48,60 +48,60 @@ void http_decoder_string_refer(struct http_decoder_string *rstr, static void string_refer2cache(struct http_decoder_string *rstr) { - if (0 == rstr->refer.str_len) { + if (0 == rstr->refer.iov_len) { return; } - if (rstr->cache.str_len >= rstr->max_cache_size) { + if (rstr->cache.iov_len >= rstr->max_cache_size) { return; } - size_t length = rstr->cache.str_len + rstr->refer.str_len; + size_t length = rstr->cache.iov_len + rstr->refer.iov_len; if (length > rstr->max_cache_size) { length = rstr->max_cache_size; } - if (NULL == rstr->cache.str) { - rstr->cache.str = CALLOC(char, length + 1); - memcpy(rstr->cache.str, rstr->refer.str, length); + if (NULL == rstr->cache.iov_base) { + rstr->cache.iov_base = CALLOC(char, length + 1); + memcpy(rstr->cache.iov_base, rstr->refer.iov_base, length); } else { - rstr->cache.str = REALLOC(char, rstr->cache.str, length + 1); - memcpy(rstr->cache.str + rstr->cache.str_len, rstr->refer.str, - (length - rstr->cache.str_len)); + rstr->cache.iov_base = REALLOC(char, rstr->cache.iov_base, length + 1); + memcpy((char *)rstr->cache.iov_base + rstr->cache.iov_len, rstr->refer.iov_base, + (length - rstr->cache.iov_len)); } - rstr->cache.str_len = length; - rstr->refer.str = NULL; - rstr->refer.str_len = 0; + rstr->cache.iov_len = length; + rstr->refer.iov_base = NULL; + rstr->refer.iov_len = 0; } static void string_commit2cache(struct http_decoder_string *rstr) { - if (rstr->cache.str_len == rstr->commit.str_len && - rstr->cache.str == rstr->commit.str) { - rstr->commit.str = NULL; - rstr->commit.str_len = 0; + if (rstr->cache.iov_len == rstr->commit.iov_len && + rstr->cache.iov_base == rstr->commit.iov_base) { + rstr->commit.iov_base = NULL; + rstr->commit.iov_len = 0; return; } //Only http header key need to backward to cache size_t length = 0; - if (rstr->commit.str_len > rstr->max_cache_size) { + if (rstr->commit.iov_len > rstr->max_cache_size) { length = rstr->max_cache_size; } else { - length = rstr->commit.str_len; + length = rstr->commit.iov_len; } if (length > 0) { - if (NULL == rstr->cache.str) { - rstr->cache.str = CALLOC(char, length + 1); + if (NULL == rstr->cache.iov_base) { + rstr->cache.iov_base = CALLOC(char, length + 1); } else { abort(); } - memcpy(rstr->cache.str, rstr->commit.str, length); - rstr->cache.str_len = length; + memcpy(rstr->cache.iov_base, rstr->commit.iov_base, length); + rstr->cache.iov_len = length; - rstr->commit.str = NULL; - rstr->commit.str_len = 0; + rstr->commit.iov_base = NULL; + rstr->commit.iov_len = 0; } } @@ -136,24 +136,24 @@ void http_decoder_string_commit(struct http_decoder_string *rstr) switch (rstr->state) { case STRING_STATE_REFER: - if (rstr->cache.str_len) { + if (rstr->cache.iov_len) { http_decoder_string_cache(rstr); - rstr->commit.str = rstr->cache.str; - rstr->commit.str_len = rstr->cache.str_len; - // not overwrite rstr->cache.str + rstr->commit.iov_base = rstr->cache.iov_base; + rstr->commit.iov_len = rstr->cache.iov_len; + // not overwrite rstr->cache.iov_base } else { - rstr->commit.str = rstr->refer.str; - rstr->commit.str_len = rstr->refer.str_len; + rstr->commit.iov_base = rstr->refer.iov_base; + rstr->commit.iov_len = rstr->refer.iov_len; - rstr->refer.str = NULL; - rstr->refer.str_len = 0; + rstr->refer.iov_base = NULL; + rstr->refer.iov_len = 0; } break; case STRING_STATE_CACHE: - rstr->commit.str = rstr->cache.str; - rstr->commit.str_len = rstr->cache.str_len; - // not overwrite rstr->cache.str + rstr->commit.iov_base = rstr->cache.iov_base; + rstr->commit.iov_len = rstr->cache.iov_len; + // not overwrite rstr->cache.iov_base break; default: //abort(); @@ -172,7 +172,7 @@ void http_decoder_string_reset(struct http_decoder_string *rstr) case STRING_STATE_REFER: case STRING_STATE_CACHE: case STRING_STATE_COMMIT: - FREE(rstr->cache.str); + FREE(rstr->cache.iov_base); memset(rstr, 0, sizeof(struct http_decoder_string)); break; default: @@ -197,20 +197,20 @@ void http_decoder_string_reinit(struct http_decoder_string *rstr) } if (rstr->state == STRING_STATE_COMMIT && - rstr->cache.str == rstr->commit.str && - rstr->cache.str_len == rstr->commit.str_len) { + rstr->cache.iov_base == rstr->commit.iov_base && + rstr->cache.iov_len == rstr->commit.iov_len) { return; } - if (rstr->cache.str != NULL) { - FREE(rstr->cache.str); - rstr->cache.str_len = 0; + if (rstr->cache.iov_base != NULL) { + FREE(rstr->cache.iov_base); + rstr->cache.iov_len = 0; } - rstr->refer.str = NULL; - rstr->refer.str_len = 0; - rstr->commit.str = NULL; - rstr->commit.str_len = 0; + rstr->refer.iov_base = NULL; + rstr->refer.iov_len = 0; + rstr->commit.iov_base = NULL; + rstr->commit.iov_len = 0; rstr->state = STRING_STATE_INIT; } @@ -219,18 +219,18 @@ enum string_state http_decoder_string_state(const struct http_decoder_string *rs return rstr->state; } -int http_decoder_string_get(const struct http_decoder_string *rstr, struct hstring *out) +int http_decoder_string_get(const struct http_decoder_string *rstr, hstring *out) { if (NULL == rstr || NULL == out) { return -1; } if (http_decoder_string_state(rstr) == STRING_STATE_COMMIT) { - out->str = rstr->commit.str; - out->str_len = rstr->commit.str_len; + out->iov_base = rstr->commit.iov_base; + out->iov_len = rstr->commit.iov_len; } else { - out->str = NULL; - out->str_len = 0; + out->iov_base = NULL; + out->iov_len = 0; } return 0; @@ -242,15 +242,15 @@ void http_decoder_string_dump(struct http_decoder_string *rstr, const char *desc return; } - char *refer_str = safe_dup(rstr->refer.str, rstr->refer.str_len); - char *cache_str = safe_dup(rstr->cache.str, rstr->cache.str_len); - char *commit_str = safe_dup(rstr->commit.str, rstr->commit.str_len); + char *refer_str = safe_dup((char *)rstr->refer.iov_base, rstr->refer.iov_len); + char *cache_str = safe_dup((char *)rstr->cache.iov_base, rstr->cache.iov_len); + char *commit_str = safe_dup((char *)rstr->commit.iov_base, rstr->commit.iov_len); - printf("%s: state: %s, refer: {len: %02zu, str: %s}, cache: {len: %02zu, str: %s}, commit: {len: %02zu, str: %s}\n", + printf("%s: state: %s, refer: {len: %02zu, iov_base: %s}, cache: {len: %02zu, iov_base: %s}, commit: {len: %02zu, iov_base: %s}\n", desc, string_state_to_desc(rstr->state), - rstr->refer.str_len, refer_str, - rstr->cache.str_len, cache_str, - rstr->commit.str_len, commit_str); + rstr->refer.iov_len, refer_str, + rstr->cache.iov_len, cache_str, + rstr->commit.iov_len, commit_str); FREE(refer_str); FREE(cache_str); diff --git a/src/http_decoder_string.h b/src/http_decoder_string.h index 4c95960..9fe82e1 100644 --- a/src/http_decoder_string.h +++ b/src/http_decoder_string.h @@ -44,9 +44,9 @@ enum string_state { //http decoder string struct http_decoder_string { - struct hstring refer; // shallow copy - struct hstring cache; // deep copy - struct hstring commit; + hstring refer; // shallow copy + hstring cache; // deep copy + hstring commit; enum string_state state; size_t max_cache_size; @@ -68,7 +68,7 @@ void http_decoder_string_reinit(struct http_decoder_string *rstr); enum string_state http_decoder_string_state(const struct http_decoder_string *rstr); -int http_decoder_string_get(const struct http_decoder_string *rstr, struct hstring *out); +int http_decoder_string_get(const struct http_decoder_string *rstr, hstring *out); void http_decoder_string_dump(struct http_decoder_string *rstr, const char *desc); #endif
\ No newline at end of file diff --git a/src/http_decoder_table.cpp b/src/http_decoder_table.cpp index d1fc6e9..0ed6313 100644 --- a/src/http_decoder_table.cpp +++ b/src/http_decoder_table.cpp @@ -76,30 +76,30 @@ void http_decoder_table_free(struct http_decoder_table *table) if (NULL == table) { return; } - if (table->uri.cache.str != NULL) { - FREE(table->uri.cache.str); + if (table->uri.cache.iov_base != NULL) { + FREE(table->uri.cache.iov_base); } - if (table->status.cache.str != NULL) { - FREE(table->status.cache.str); + if (table->status.cache.iov_base != NULL) { + FREE(table->status.cache.iov_base); } - if (table->method.cache.str != NULL) { - FREE(table->method.cache.str); + if (table->method.cache.iov_base != NULL) { + FREE(table->method.cache.iov_base); } - if (table->version.cache.str != NULL) { - FREE(table->version.cache.str); + if (table->version.cache.iov_base != NULL) { + FREE(table->version.cache.iov_base); } - if (table->body.cache.str != NULL) { - FREE(table->body.cache.str); + if (table->body.cache.iov_base != NULL) { + FREE(table->body.cache.iov_base); } if (table->headers != NULL) { for (size_t i = 0; i < table->header_cnt; i++) { - if (table->headers[i].key.cache.str != NULL) { - FREE(table->headers[i].key.cache.str); + if (table->headers[i].key.cache.iov_base != NULL) { + FREE(table->headers[i].key.cache.iov_base); } - if (table->headers[i].val.cache.str != NULL) { - FREE(table->headers[i].val.cache.str); + if (table->headers[i].val.cache.iov_base != NULL) { + FREE(table->headers[i].val.cache.iov_base); } } @@ -381,7 +381,7 @@ void http_decoder_table_dump(struct http_decoder_table *table) } } -int http_decoder_table_get_uri(const struct http_decoder_table *table, struct hstring *out) +int http_decoder_table_get_uri(const struct http_decoder_table *table, hstring *out) { if (NULL == table || NULL == out) { return -1; @@ -389,7 +389,7 @@ int http_decoder_table_get_uri(const struct http_decoder_table *table, struct hs return http_decoder_string_get(&table->uri, out); } -int http_decoder_table_get_method(const struct http_decoder_table *table, struct hstring *out) +int http_decoder_table_get_method(const struct http_decoder_table *table, hstring *out) { if (NULL == table || NULL == out) { return -1; @@ -397,7 +397,7 @@ int http_decoder_table_get_method(const struct http_decoder_table *table, struct return http_decoder_string_get(&table->method, out); } -int http_decoder_table_get_status(const struct http_decoder_table *table, struct hstring *out) +int http_decoder_table_get_status(const struct http_decoder_table *table, hstring *out) { if (NULL == table || NULL == out) { return -1; @@ -405,7 +405,7 @@ int http_decoder_table_get_status(const struct http_decoder_table *table, struct return http_decoder_string_get(&table->status, out); } -int http_decoder_table_get_version(const struct http_decoder_table *table, struct hstring *out) +int http_decoder_table_get_version(const struct http_decoder_table *table, hstring *out) { if (NULL == table || NULL == out) { return -1; @@ -413,7 +413,7 @@ int http_decoder_table_get_version(const struct http_decoder_table *table, struc return http_decoder_string_get(&table->version, out); } -int http_decoder_table_get_body(const struct http_decoder_table *table, struct hstring *out) +int http_decoder_table_get_body(const struct http_decoder_table *table, hstring *out) { if (NULL == table || NULL == out) { return -1; @@ -421,22 +421,22 @@ int http_decoder_table_get_body(const struct http_decoder_table *table, struct h return http_decoder_string_get(&table->body, out); } -int http_decoder_table_get_header(const struct http_decoder_table *table, const struct hstring *key, +int http_decoder_table_get_header(const struct http_decoder_table *table, const hstring *key, struct http_header *hdr_result) { for (size_t i = 0; i < table->header_cnt; i++) { const struct http_decoder_header *tmp_header = &table->headers[i]; - if (tmp_header->key.commit.str_len != key->str_len) { + if (tmp_header->key.commit.iov_len != key->iov_len) { continue; } if (http_decoder_string_state(&tmp_header->key) == STRING_STATE_COMMIT && http_decoder_string_state(&tmp_header->val) == STRING_STATE_COMMIT) { - struct hstring tmp_key; + hstring tmp_key; http_decoder_string_get(&tmp_header->key, &tmp_key); - if (tmp_key.str_len == key->str_len && - (0 == strncasecmp(tmp_key.str, key->str, key->str_len))) { + if (tmp_key.iov_len == key->iov_len && + (0 == strncasecmp((char *)tmp_key.iov_base, (char *)key->iov_base, key->iov_len))) { http_decoder_string_get(&tmp_header->key, &hdr_result->key); http_decoder_string_get(&tmp_header->val, &hdr_result->val); return 0; @@ -468,10 +468,10 @@ int http_decoder_table_iter_header(struct http_decoder_table *table, } } - hdr->key.str = NULL; - hdr->key.str_len = 0; - hdr->val.str = NULL; - hdr->val.str_len = 0; + hdr->key.iov_base = NULL; + hdr->key.iov_len = 0; + hdr->val.iov_base = NULL; + hdr->val.iov_len = 0; return -1; } diff --git a/src/http_decoder_table.h b/src/http_decoder_table.h index 1272d3a..4c7792a 100644 --- a/src/http_decoder_table.h +++ b/src/http_decoder_table.h @@ -36,18 +36,18 @@ void http_decoder_table_reinit(struct http_decoder_table *table); void http_decoder_table_dump(struct http_decoder_table *table); -int http_decoder_table_get_uri(const struct http_decoder_table *table, struct hstring *out); +int http_decoder_table_get_uri(const struct http_decoder_table *table, hstring *out); -int http_decoder_table_get_method(const struct http_decoder_table *table, struct hstring *out); +int http_decoder_table_get_method(const struct http_decoder_table *table, hstring *out); -int http_decoder_table_get_status(const struct http_decoder_table *table, struct hstring *out); +int http_decoder_table_get_status(const struct http_decoder_table *table, hstring *out); -int http_decoder_table_get_version(const struct http_decoder_table *table, struct hstring *out); +int http_decoder_table_get_version(const struct http_decoder_table *table, hstring *out); -int http_decoder_table_get_body(const struct http_decoder_table *table, struct hstring *out); +int http_decoder_table_get_body(const struct http_decoder_table *table, hstring *out); int http_decoder_table_get_header(const struct http_decoder_table *table, - const struct hstring *key, + const hstring *key, struct http_header *hdr_res); int http_decoder_table_iter_header(struct http_decoder_table *table, diff --git a/src/http_decoder_tunnel.cpp b/src/http_decoder_tunnel.cpp new file mode 100644 index 0000000..67445f3 --- /dev/null +++ b/src/http_decoder_tunnel.cpp @@ -0,0 +1,99 @@ +#include <assert.h> +#include <stdio.h> +#include <string.h> +#include <strings.h> +#include <unistd.h> +#include "http_decoder_inc.h" +#include "llhttp.h" + +struct http_tunnel_message +{ + enum http_tunnel_message_type type; + hstring tunnel_payload; +}; + + +int httpd_tunnel_identify(int curdir, struct http_decoder_half_data *hfdata) +{ + if(PACKET_DIRECTION_C2S == curdir){ + struct http_request_line reqline = {}; + http_decoder_half_data_get_request_line(hfdata, &reqline); + if(0 == strncasecmp_safe("CONNECT", (char *)reqline.method.iov_base, + 7, reqline.method.iov_len)){ + return 1; + } + }else{ + struct http_response_line resline = {}; + http_decoder_half_data_get_response_line(hfdata, &resline); + if(resline.status_code == HTTP_STATUS_OK + && 0 == strncasecmp_safe("Connection established", (char *)resline.status.iov_base, + strlen("Connection established"), resline.status.iov_len)){ + return 1; + } + } + + return 0; +} + +int httpd_is_tunnel_session(const struct http_decoder_exdata *ex_data) +{ + return (ex_data->tunnel_state != HTTP_TUN_NON); +} + +int httpd_in_tunnel_transmitting(struct http_decoder_exdata *ex_data) +{ + return (ex_data->tunnel_state >= HTTP_TUN_INNER_STARTING); +} + +enum http_tunnel_message_type httpd_tunnel_state_to_msg(const struct http_decoder_exdata *ex_data) +{ + if(ex_data->tunnel_state == HTTP_TUN_INNER_STARTING){ + return HTTP_TUNNEL_OPENING; + } + if(ex_data->tunnel_state == HTTP_TUN_INNER_TRANS){ + return HTTP_TUNNEL_ACTIVE; + } + return HTTP_TUNNEL_MSG_MAX; +} + +void httpd_tunnel_state_update(struct http_decoder_exdata *ex_data) +{ + if(ex_data->tunnel_state == HTTP_TUN_INNER_STARTING){ + ex_data->tunnel_state = HTTP_TUN_INNER_TRANS; + } +} + +void http_decoder_push_tunnel_data(struct session *sess, const struct http_decoder_exdata *exdata, enum http_tunnel_message_type type) +{ + struct http_tunnel_message *tmsg = (struct http_tunnel_message *)CALLOC(struct http_tunnel_message, 1); + tmsg->type = type; + size_t payload_len; + const char *payload = session_get0_current_payload(sess, &payload_len); + tmsg->tunnel_payload.iov_base = (char *)payload; + tmsg->tunnel_payload.iov_len = payload_len; + session_mq_publish_message(sess, exdata->pub_topic_id, tmsg); +} + +#ifdef __cplusplus +extern "C" +{ +#endif +void http_tunnel_message_get_payload(const struct http_tunnel_message *tmsg, + hstring *tunnel_payload) +{ + if (unlikely(NULL == tmsg || tunnel_payload == NULL)) + { + return; + } + tunnel_payload->iov_base = tmsg->tunnel_payload.iov_base; + tunnel_payload->iov_len = tmsg->tunnel_payload.iov_len; +} + +enum http_tunnel_message_type http_tunnel_message_type_get(const struct http_tunnel_message *tmsg) +{ + return tmsg->type; +} + +#ifdef __cplusplus +} +#endif diff --git a/src/http_decoder_tunnel.h b/src/http_decoder_tunnel.h new file mode 100644 index 0000000..b1e74df --- /dev/null +++ b/src/http_decoder_tunnel.h @@ -0,0 +1,21 @@ +#pragma once + +#include "http_decoder_half.h" +#include "llhttp.h" + +enum http_tunnel_state{ + HTTP_TUN_NON = 0, // init, or not tunnel session + HTTP_TUN_C2S_HDR_START, //CONNECT ... + HTTP_TUN_C2S_HDR_END, //CONNECT request all heades end with \r\n + HTTP_TUN_S2C_START, // HTTP 200 connet established + // HTTP_TUN_S2C_END, // http response all heades end with \r\n + HTTP_TUN_INNER_STARTING, // http inner tunnel protocol starting + HTTP_TUN_INNER_TRANS, // http inner tunnel protocol transmitting +}; + +int httpd_tunnel_identify(int curdir, struct http_decoder_half_data *hfdata); +int httpd_is_tunnel_session(const struct http_decoder_exdata *ex_data); +int httpd_in_tunnel_transmitting(struct http_decoder_exdata *ex_data); +void httpd_tunnel_state_update(struct http_decoder_exdata *ex_data); +void http_decoder_push_tunnel_data(struct session *sess, const struct http_decoder_exdata *exdata, enum http_tunnel_message_type type); +enum http_tunnel_message_type httpd_tunnel_state_to_msg(const struct http_decoder_exdata *ex_data);
\ No newline at end of file diff --git a/src/http_decoder_utils.cpp b/src/http_decoder_utils.cpp index 5686e2d..6d71bdb 100644 --- a/src/http_decoder_utils.cpp +++ b/src/http_decoder_utils.cpp @@ -12,6 +12,17 @@ char *safe_dup(const char *str, size_t len) return dup; } +int strncasecmp_safe(const char *fix_s1, const char *dyn_s2, size_t fix_n1, size_t dyn_n2) +{ + if (fix_s1 == NULL || dyn_s2 == NULL) { + return -1; + } + if(fix_n1 != dyn_n2){ + return -1; + } + return strncasecmp(fix_s1, dyn_s2, fix_n1); +} + const char *http_message_type_to_string(enum http_message_type type) { const char *sname = "unknown_msg_type"; @@ -134,4 +145,13 @@ int http_event_is_req(enum http_event event) break; } return -1; +} + +int stellar_session_mq_get_topic_id_reliable(struct stellar *st, const char *topic_name, session_msg_free_cb_func *msg_free_cb, void *msg_free_arg) +{ + int topic_id = stellar_session_mq_get_topic_id(st, topic_name); + if(topic_id < 0){ + topic_id = stellar_session_mq_create_topic(st, topic_name, msg_free_cb, msg_free_arg); + } + return topic_id; }
\ No newline at end of file diff --git a/src/http_decoder_utils.h b/src/http_decoder_utils.h index 0661641..5b09d50 100644 --- a/src/http_decoder_utils.h +++ b/src/http_decoder_utils.h @@ -5,9 +5,11 @@ #include <stdio.h> char *safe_dup(const char *str, size_t len); +int strncasecmp_safe(const char *fix_s1, const char *dyn_s2, size_t fix_n1, size_t dyn_n2); const char *http_message_type_to_string(enum http_message_type type); int http_message_type_is_req(struct session *sess, enum http_message_type msg_type); int http_event_is_req(enum http_event event); +int stellar_session_mq_get_topic_id_reliable(struct stellar *st, const char *topic_name, session_msg_free_cb_func *msg_free_cb, void *msg_free_arg); /****************************************************************************** * Logger ******************************************************************************/ diff --git a/src/version.map b/src/version.map index be433c2..15d1d95 100644 --- a/src/version.map +++ b/src/version.map @@ -5,6 +5,7 @@ global: http_decoder_init; http_decoder_exit; http_decoder_tcp_stream_msg_cb; + http_tunnel_message_*; }; local: *; };
\ No newline at end of file |
