diff options
| author | lijia <[email protected]> | 2024-06-18 16:45:35 +0800 |
|---|---|---|
| committer | lijia <[email protected]> | 2024-06-20 18:51:47 +0800 |
| commit | 05e8c9db6912dc95de9691e9b90e549a4c3beffe (patch) | |
| tree | ed5d4b3392bdd577986d26ac8d5c6da21f9c2b2a /src/http_decoder.cpp | |
| parent | 7d6170a23027aff0ebf2e7832dc11e4bbdce57ea (diff) | |
feat: TSG-20446, support http tunnel with CONNECT method.
Diffstat (limited to 'src/http_decoder.cpp')
| -rw-r--r-- | src/http_decoder.cpp | 408 |
1 files changed, 285 insertions, 123 deletions
diff --git a/src/http_decoder.cpp b/src/http_decoder.cpp index 8aa39e9..3af4dad 100644 --- a/src/http_decoder.cpp +++ b/src/http_decoder.cpp @@ -1,3 +1,4 @@ +#include "http_decoder.h" #include <assert.h> #include <stdio.h> #include <string.h> @@ -36,6 +37,7 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d struct http_decoder_half_data *half_data = NULL; int ret = 0; u_int8_t flow_flag = 0; + struct http_decoder_exdata *exdata = ev_ctx->ref_httpd_ctx; int thread_id = stellar_get_current_thread_id(httpd_env->st); @@ -74,44 +76,53 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d } *data = half_data; queue_idx = http_decoder_result_queue_req_index(queue); //get the index after inc -#if 1 /* llhttp always call on_message_begin() even if llhttp_execute() error!!! */ + /* llhttp always call on_message_begin() even if llhttp_execute() error!!! */ msg = http_message_new(HTTP_TRANSACTION_NEW, queue, queue_idx, HTTP_REQUEST); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_NEW, 1); -#endif break; case HTTP_EVENT_REQ_LINE: -#if 0 - msg = http_message_new(HTTP_TRANSACTION_NEW, queue, queue_idx, HTTP_REQUEST); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); - http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_NEW, 1); -#endif msg = http_message_new(HTTP_MESSAGE_REQ_LINE, queue, queue_idx, HTTP_REQUEST); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); + if(httpd_tunnel_identify(PACKET_DIRECTION_C2S, half_data)){ + exdata->tunnel_state = HTTP_TUN_C2S_HDR_START; + } + http_decoder_get_url(half_data, mempool); break; case HTTP_EVENT_REQ_HDR: msg = http_message_new(HTTP_MESSAGE_REQ_HEADER, queue, queue_idx, HTTP_REQUEST); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); break; case HTTP_EVENT_REQ_HDR_END: { - http_decoder_join_url_finally(ev_ctx, http_decoder_result_queue_peek_req(queue), mempool); + http_decoder_join_url_finally(ev_ctx, half_data, mempool); /* maybe some parsed headers in buffer, but has not pushed to plugins yet */ - half_data = http_decoder_result_queue_peek_req(queue); + if(http_decoder_half_data_has_parsed_header(half_data)){ msg = http_message_new(HTTP_MESSAGE_REQ_HEADER, queue, queue_idx, HTTP_REQUEST); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); } http_half_data_update_commit_index(half_data); msg = http_message_new(HTTP_MESSAGE_REQ_HEADER_END, queue, queue_idx, HTTP_REQUEST); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); int tot_c2s_headers = http_half_data_get_total_parsed_header_count(half_data); http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_HEADERS_C2S, tot_c2s_headers); - struct hstring tmp_url = {}; + hstring tmp_url = {}; http_half_data_get_url(half_data, &tmp_url); - http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_URL_BYTES, tmp_url.str_len); + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_URL_BYTES, tmp_url.iov_len); + + if(httpd_is_tunnel_session(exdata)){ + session_is_symmetric(ev_ctx->ref_session, &flow_flag); + if(SESSION_SEEN_C2S_FLOW == flow_flag){ + exdata->tunnel_state = HTTP_TUN_INNER_STARTING; + http_half_pre_context_free(ev_ctx->ref_session, exdata); + exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].sub_topic_id; + }else{ + exdata->tunnel_state = HTTP_TUN_C2S_HDR_END; + } + } } break; case HTTP_EVENT_REQ_BODY_BEGIN: @@ -119,18 +130,18 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d break; case HTTP_EVENT_REQ_BODY_DATA: msg = http_message_new(HTTP_MESSAGE_REQ_BODY, queue, queue_idx, HTTP_REQUEST); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); break; case HTTP_EVENT_REQ_BODY_END: msg = http_message_new(HTTP_MESSAGE_REQ_BODY_END, queue, queue_idx, HTTP_REQUEST); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); break; case HTTP_EVENT_REQ_END: { session_is_symmetric(ev_ctx->ref_session, &flow_flag); if(SESSION_SEEN_C2S_FLOW == flow_flag){ msg = http_message_new(HTTP_TRANSACTION_FREE, queue, queue_idx, HTTP_REQUEST); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_FREE, 1); http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_ASYMMETRY_TRANSACTION_C2S, 1); } @@ -173,17 +184,23 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d if(0 == session_is_symmetric(ev_ctx->ref_session, &flow_flag)){ if(SESSION_SEEN_S2C_FLOW == flow_flag){ msg = http_message_new(HTTP_TRANSACTION_NEW, queue, queue_idx, HTTP_RESPONSE); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); } } break; case HTTP_EVENT_RES_LINE: msg = http_message_new(HTTP_MESSAGE_RES_LINE, queue, queue_idx, HTTP_RESPONSE); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); + if(httpd_tunnel_identify(PACKET_DIRECTION_S2C, half_data)){ + exdata->tunnel_state = HTTP_TUN_S2C_START; + }else{ + //connect response fail, reset tunnel_state + exdata->tunnel_state = HTTP_TUN_NON; + } break; case HTTP_EVENT_RES_HDR: msg = http_message_new(HTTP_MESSAGE_RES_HEADER, queue, queue_idx, HTTP_RESPONSE); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); break; case HTTP_EVENT_RES_HDR_END: { @@ -191,29 +208,36 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d half_data = http_decoder_result_queue_peek_res(queue); if(http_decoder_half_data_has_parsed_header(half_data)){ msg = http_message_new(HTTP_MESSAGE_RES_HEADER, queue, queue_idx, HTTP_RESPONSE); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); } http_half_data_update_commit_index(half_data); msg = http_message_new(HTTP_MESSAGE_RES_HEADER_END, queue, queue_idx, HTTP_RESPONSE); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); int tot_s2c_headers = http_half_data_get_total_parsed_header_count(half_data); http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_HEADERS_S2C, tot_s2c_headers); + + if(httpd_is_tunnel_session(exdata)){ + exdata->tunnel_state = HTTP_TUN_INNER_STARTING; + http_half_pre_context_free(ev_ctx->ref_session, exdata); + exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].sub_topic_id; + // http_decoder_push_tunnel_data(ev_ctx->ref_session, exdata, HTTP_TUNNEL_OPENING); + } } break; case HTTP_EVENT_RES_BODY_BEGIN: break; case HTTP_EVENT_RES_BODY_DATA: msg = http_message_new(HTTP_MESSAGE_RES_BODY, queue, queue_idx, HTTP_RESPONSE); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); break; case HTTP_EVENT_RES_BODY_END: msg = http_message_new(HTTP_MESSAGE_RES_BODY_END, queue, queue_idx, HTTP_RESPONSE); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); break; case HTTP_EVENT_RES_END: msg = http_message_new(HTTP_TRANSACTION_FREE, queue, queue_idx, HTTP_RESPONSE); - session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg); + session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg); http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_FREE, 1); session_is_symmetric(ev_ctx->ref_session, &flow_flag); if(SESSION_SEEN_S2C_FLOW == flow_flag){ @@ -237,12 +261,14 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d } } -static struct http_decoder *http_decoder_new(nmx_pool_t *mempool, http_event_cb *ev_cb, int decompress_switch, struct http_decoder_env *httpd_env) +static struct http_decoder *http_decoder_new(struct http_decoder_exdata *hd_ctx, nmx_pool_t *mempool, + http_event_cb *ev_cb, int decompress_switch, struct http_decoder_env *httpd_env, + long long req_start_seq, long long res_start_seq) { struct http_decoder *decoder = MEMPOOL_CALLOC(mempool, struct http_decoder, 1); assert(decoder); - decoder->c2s_half = http_decoder_half_new(mempool, ev_cb, HTTP_REQUEST, decompress_switch, httpd_env); - decoder->s2c_half = http_decoder_half_new(mempool, ev_cb, HTTP_RESPONSE, decompress_switch, httpd_env); + decoder->c2s_half = http_decoder_half_new(hd_ctx, mempool, ev_cb, HTTP_REQUEST, decompress_switch, httpd_env, req_start_seq); + decoder->s2c_half = http_decoder_half_new(hd_ctx, mempool, ev_cb, HTTP_RESPONSE, decompress_switch, httpd_env, res_start_seq); return decoder; } @@ -266,13 +292,15 @@ static void http_decoder_free(nmx_pool_t *mempool, struct http_decoder *decoder) } static struct http_decoder_exdata *http_decoder_exdata_new(size_t mempool_size, size_t queue_size, - int decompress_switch, struct http_decoder_env *httpd_env) + int decompress_switch, struct http_decoder_env *httpd_env, + long long req_start_seq, long long res_start_seq) { - struct http_decoder_exdata *ex_data = CALLOC(struct http_decoder_exdata, 1); - ex_data->mempool = nmx_create_pool(mempool_size); - ex_data->decoder = http_decoder_new(ex_data->mempool, http_event_handler, decompress_switch, httpd_env); - ex_data->queue = http_decoder_result_queue_new(ex_data->mempool, queue_size); - return ex_data; + struct http_decoder_exdata *hd_ctx = CALLOC(struct http_decoder_exdata, 1); + hd_ctx->mempool = nmx_create_pool(mempool_size); + hd_ctx->decoder = http_decoder_new(hd_ctx, hd_ctx->mempool, http_event_handler, decompress_switch, + httpd_env, req_start_seq, res_start_seq); + hd_ctx->queue = http_decoder_result_queue_new(hd_ctx->mempool, queue_size); + return hd_ctx; } static void http_decoder_exdata_free(struct http_decoder_exdata *ex_data) @@ -291,8 +319,9 @@ static void http_decoder_exdata_free(struct http_decoder_exdata *ex_data) http_decoder_result_queue_free(ex_data->mempool, ex_data->queue); ex_data->queue = NULL; } - - nmx_destroy_pool(ex_data->mempool); + if(ex_data->mempool){ + nmx_destroy_pool(ex_data->mempool); + } FREE(ex_data); } @@ -310,7 +339,7 @@ static int http_protocol_identify(const char *data, size_t data_len) { return -1; } - return 0; + return 1; } static void _http_decoder_context_free(struct http_decoder_env *env) @@ -327,10 +356,10 @@ static void _http_decoder_context_free(struct http_decoder_env *env) http_decoder_stat_free(&env->hd_stat); - if (env->httpd_msg_topic_id >= 0) - { - stellar_session_mq_destroy_topic(env->st, env->httpd_msg_topic_id); - env->httpd_msg_topic_id = -1; + for(int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++){ + if(env->topic_exdata_compose[i].msg_free_cb){ + stellar_session_mq_destroy_topic(env->st, env->topic_exdata_compose[i].sub_topic_id); + } } FREE(env); @@ -412,7 +441,7 @@ static int load_http_decoder_config(const char *cfg_path, return ret; } -static int http_msg_get_request_header(const struct http_message *msg, const struct hstring *key, +static int http_msg_get_request_header(const struct http_message *msg, const hstring *key, struct http_header *hdr_result) { const struct http_decoder_half_data *req_data = @@ -420,7 +449,7 @@ static int http_msg_get_request_header(const struct http_message *msg, const str return http_decoder_half_data_get_header(req_data, key, hdr_result); } -static int http_msg_get_response_header(const struct http_message *msg, const struct hstring *key, +static int http_msg_get_response_header(const struct http_message *msg, const hstring *key, struct http_header *hdr_result) { const struct http_decoder_half_data *res_data = @@ -445,7 +474,7 @@ static int http_msg_response_header_next(const struct http_message *msg, } static int http_msg_get_request_raw_body(const struct http_message *msg, - struct hstring *body) + hstring *body) { const struct http_decoder_half_data *req_data = msg->ref_queue->array[msg->queue_index].req_data; @@ -453,7 +482,7 @@ static int http_msg_get_request_raw_body(const struct http_message *msg, } static int http_msg_get_response_raw_body(const struct http_message *msg, - struct hstring *body) + hstring *body) { const struct http_decoder_half_data *res_data = msg->ref_queue->array[msg->queue_index].res_data; @@ -461,7 +490,7 @@ static int http_msg_get_response_raw_body(const struct http_message *msg, } static int http_msg_get_request_decompress_body(const struct http_message *msg, - struct hstring *body) + hstring *body) { const struct http_decoder_half_data *req_data = msg->ref_queue->array[msg->queue_index].req_data; @@ -469,19 +498,32 @@ static int http_msg_get_request_decompress_body(const struct http_message *msg, } static int http_msg_get_response_decompress_body(const struct http_message *msg, - struct hstring *body) + hstring *body) { const struct http_decoder_half_data *res_data = msg->ref_queue->array[msg->queue_index].res_data; return http_decoder_half_data_get_decompress_body(res_data, body); } +static struct http_decoder_exdata *httpd_session_exdata_new(struct session *sess, struct http_decoder_env *httpd_env, + long long req_start_seq, long long res_start_seq) +{ + struct http_decoder_exdata *exdata = http_decoder_exdata_new(httpd_env->hd_cfg.mempool_size, + httpd_env->hd_cfg.result_queue_len, + httpd_env->hd_cfg.decompress_switch, + httpd_env,req_start_seq,res_start_seq); + // exdata->sub_topic_id = sub_topic_id; + int thread_id = stellar_get_current_thread_id(httpd_env->st); + http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_SESSION_NEW, 1); + return exdata; +} + #ifdef __cplusplus extern "C" { #endif - void _httpd_ex_data_free_cb(struct session *s, int idx, void *ex_data, void *arg) + void httpd_ex_data_free_cb(struct session *s, int idx, void *ex_data, void *arg) { if (NULL == ex_data) { @@ -491,7 +533,7 @@ extern "C" http_decoder_exdata_free(exdata); } - void *_httpd_session_ctx_new_cb(struct session *sess, void *plugin_env) + void *httpd_session_ctx_new_cb(struct session *sess, void *plugin_env) { // If not http, ignore this session size_t payload_len; @@ -504,31 +546,23 @@ extern "C" if (ret < 0) { stellar_session_plugin_dettach_current_session(sess); - return (void *)"not_http_session"; + return (void *)"__not_http_session__"; } } - struct http_decoder_exdata *ex_data = http_decoder_exdata_new(httpd_env->hd_cfg.mempool_size, - httpd_env->hd_cfg.result_queue_len, - httpd_env->hd_cfg.decompress_switch, - httpd_env); - session_exdata_set(sess, httpd_env->ex_data_idx, ex_data); - int thread_id = stellar_get_current_thread_id(httpd_env->st); - http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_SESSION_NEW, 1); - - return (void *)"fake_http_decoder_ctx"; // http decoder not use ctx, use exdata only! + return (void *)"__fake_http_decoder_ctx__"; } - void _httpd_session_ctx_free_cb(struct session *sess, void *session_ctx, void *plugin_env) + void httpd_session_ctx_free_cb(struct session *sess, void *session_ctx, void *plugin_env) { if(NULL == plugin_env || NULL == session_ctx){ return; } - if(strncmp((const char *)session_ctx, "not_http_session", strlen("not_http_session")) == 0){ + if(strncmp((const char *)session_ctx, "__not_http_session__", strlen("__not_http_session__")) == 0){ return; } struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env; - int thread_id = stellar_get_current_thread_id(httpd_env->st); + int thread_id = session_get_current_thread_id(sess); unsigned char flow_flag = 0; session_is_symmetric(sess, &flow_flag); @@ -541,53 +575,189 @@ extern "C" } } - void http_decoder_tcp_stream_msg_cb(struct session *sess, int topic_id, const void *msg, void *no_use_ctx, void *plugin_env) + static void http_decoder_execute(struct session *sess, struct http_decoder_env *httpd_env, http_decoder_exdata *exdata) { - struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env; - struct http_decoder_exdata *ex_data = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->ex_data_idx); size_t payload_len; - - if(SESSION_STATE_CLOSING == session_get_current_state(sess)){ - http_half_pre_context_free(sess, httpd_env, ex_data); - return; - } - const char *payload = session_get0_current_payload(sess, &payload_len); - if (unlikely(0 == payload_len || NULL == ex_data)) + if (unlikely(0 == payload_len || NULL == payload)) { return; } - int thread_id = stellar_get_current_thread_id(httpd_env->st); - struct http_decoder_half *cur_half = NULL; + if(httpd_in_tunnel_transmitting(exdata)){ + http_decoder_push_tunnel_data(sess, exdata, httpd_tunnel_state_to_msg(exdata)); + httpd_tunnel_state_update(exdata); + return; + } - if (PACKET_DIRECTION_C2S == packet_get_direction(session_get0_current_packet(sess))) + int thread_id = session_get_current_thread_id(sess); + struct http_decoder_half *cur_half = NULL; + int sess_dir = packet_get_direction(session_get0_current_packet(sess)); + if (PACKET_DIRECTION_C2S == sess_dir) { - cur_half = ex_data->decoder->c2s_half; + cur_half = exdata->decoder->c2s_half; http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_BYTES_C2S, payload_len); http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TCP_SEG_C2S, 1); } else { - cur_half = ex_data->decoder->s2c_half; + cur_half = exdata->decoder->s2c_half; http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_BYTES_S2C, payload_len); http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TCP_SEG_S2C, 1); } - http_decoder_half_reinit(cur_half, httpd_env->httpd_msg_topic_id, ex_data->queue, - ex_data->mempool, sess); + http_decoder_half_reinit(cur_half, exdata->queue, exdata->mempool, sess); int ret = http_decoder_half_parse(cur_half, payload, payload_len); if (ret < 0) { http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_PARSE_ERR, 1); stellar_session_plugin_dettach_current_session(sess); } + } + + void http_decoder_tunnel_msg_cb(struct session *sess, int topic_id, const void *tmsg, void *per_session_ctx, void *plugin_env) + { + struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env; + struct http_decoder_exdata *exdata = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].exdata_id); + enum http_tunnel_message_type tmsg_type = http_tunnel_message_type_get((const struct http_tunnel_message *)tmsg); + switch (tmsg_type) + { + case HTTP_TUNNEL_OPENING: + { + if(NULL != exdata){ + //not support nested http tunnel + session_mq_ignore_message(sess, topic_id, httpd_env->plugin_id); + return; + } + size_t payload_len; + const char *payload = session_get0_current_payload(sess, &payload_len); + size_t http_identify_len = payload_len > HTTP_IDENTIFY_LEN ? HTTP_IDENTIFY_LEN : payload_len; + int is_http = http_protocol_identify(payload, http_identify_len); + if(is_http){ + long long max_req_seq = 0, max_res_seq = 0; + struct http_decoder_exdata *tcp_stream_exdata = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_TCP_STREAM_INDEX].exdata_id); + http_half_get_max_transaction_seq(tcp_stream_exdata, &max_req_seq, &max_res_seq); + exdata = httpd_session_exdata_new(sess, httpd_env, max_req_seq, max_res_seq); + session_exdata_set(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].exdata_id, exdata); + exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_MSG_INDEX].sub_topic_id; + exdata->in_tunnel_is_http = 1; + }else{ + exdata = CALLOC(struct http_decoder_exdata, 1); + exdata->decoder = NULL; + exdata->pub_topic_id = -1; + exdata->in_tunnel_is_http = 0; + session_exdata_set(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].exdata_id, exdata); + //do nothing, but can't call stellar_session_plugin_dettach_current_session() !!! + return; + } + } + break; + + case HTTP_TUNNEL_ACTIVE: + break; + + case HTTP_TUNNEL_CLOSING: + if(exdata->in_tunnel_is_http){ + http_half_pre_context_free(sess, exdata); + } + return; + break; + + default: + break; + } + if(exdata->in_tunnel_is_http){ + http_decoder_execute(sess, httpd_env, exdata); + } + return; + } + + void http_decoder_tcp_stream_msg_cb(struct session *sess, int topic_id, const void *msg, void *nouse_session_ctx, void *plugin_env) + { + struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env; + enum session_state sess_state = session_get_current_state(sess); + struct http_decoder_exdata *exdata = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_TCP_STREAM_INDEX].exdata_id); + + switch(sess_state){ + case SESSION_STATE_OPENING: + { + exdata = httpd_session_exdata_new(sess, httpd_env, 0, 0); + exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_MSG_INDEX].sub_topic_id; + session_exdata_set(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_TCP_STREAM_INDEX].exdata_id, exdata); + //go on + } + break; + + case SESSION_STATE_ACTIVE: + //go on + break; + + case SESSION_STATE_CLOSING: + { + if(httpd_in_tunnel_transmitting(exdata)){ + http_decoder_push_tunnel_data(sess, exdata, HTTP_TUNNEL_CLOSING); + }else{ + http_half_pre_context_free(sess, exdata); + } + return; + } + break; + + default: + return; + break; + } + + http_decoder_execute(sess, httpd_env, exdata); + return; } +static const struct http_topic_exdata_compose g_topic_exdata_compose[HTTPD_TOPIC_INDEX_MAX] = +{ + {HTTPD_TOPIC_TCP_STREAM_INDEX, TOPIC_TCP_STREAM, http_decoder_tcp_stream_msg_cb, NULL, "HTTP_DECODER_EXDATA_BASEON_TCP_STREAM", httpd_ex_data_free_cb, -1, -1}, + {HTTPD_TOPIC_HTTP_MSG_INDEX, HTTP_DECODER_TOPIC, NULL, http_message_free, NULL, NULL, -1, -1}, + {HTTPD_TOPIC_HTTP_TUNNEL_INDEX, HTTP_DECODER_TUNNEL_TOPIC, http_decoder_tunnel_msg_cb, http_message_free, "HTTP_DECODER_EXDATA_BASEON_HTTP_TUNNEL", httpd_ex_data_free_cb, -1, -1}, +}; + + static void http_decoder_topic_exdata_compose_init(struct http_decoder_env *httpd_env) + { + memcpy(httpd_env->topic_exdata_compose, g_topic_exdata_compose, sizeof(g_topic_exdata_compose)); + for(int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++){ + httpd_env->topic_exdata_compose[i].sub_topic_id = stellar_session_mq_get_topic_id_reliable(httpd_env->st, + httpd_env->topic_exdata_compose[i].topic_name, + httpd_env->topic_exdata_compose[i].msg_free_cb, + NULL); + assert(httpd_env->topic_exdata_compose[i].sub_topic_id >= 0); + + if(httpd_env->topic_exdata_compose[i].exdata_name){ + httpd_env->topic_exdata_compose[i].exdata_id = stellar_session_exdata_new_index(httpd_env->st, + httpd_env->topic_exdata_compose[i].exdata_name, + httpd_env->topic_exdata_compose[i].exdata_free_cb, + NULL); + assert(httpd_env->topic_exdata_compose[i].exdata_id >= 0); + } + + if(httpd_env->topic_exdata_compose[i].on_msg_cb){ + stellar_session_mq_subscribe(httpd_env->st, httpd_env->topic_exdata_compose[i].sub_topic_id, + httpd_env->topic_exdata_compose[i].on_msg_cb, httpd_env->plugin_id); + } + } + } + + int http_topic_exdata_compose_get_index(const struct http_decoder_env *httpd_env, int by_topic_id) + { + for(int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++){ + if(httpd_env->topic_exdata_compose[i].sub_topic_id == by_topic_id){ + return i; + } + } + assert(0); + return -1; + } + void *http_decoder_init(struct stellar *st) { - int httpd_msg_topic_id = -1, tcp_stream_topic_id = -1; int thread_num = 0; struct http_decoder_env *httpd_env = CALLOC(struct http_decoder_env, 1); @@ -597,27 +767,13 @@ extern "C" goto failed; } httpd_env->st = st; - httpd_env->ex_data_idx = stellar_session_exdata_new_index(st, "HTTP_DECODER", - _httpd_ex_data_free_cb, - NULL); - httpd_env->plugin_id = stellar_session_plugin_register(st, _httpd_session_ctx_new_cb, - _httpd_session_ctx_free_cb, (void *)httpd_env); + httpd_env->plugin_id = stellar_session_plugin_register(st, httpd_session_ctx_new_cb, + httpd_session_ctx_free_cb, (void *)httpd_env); if (httpd_env->plugin_id < 0) { goto failed; } - - httpd_msg_topic_id = stellar_session_mq_get_topic_id(st, HTTP_DECODER_TOPIC); - if (httpd_msg_topic_id < 0) - { - httpd_msg_topic_id = stellar_session_mq_create_topic(st, HTTP_DECODER_TOPIC, - http_message_free, NULL); - } - httpd_env->httpd_msg_topic_id = httpd_msg_topic_id; - - tcp_stream_topic_id = stellar_session_mq_get_topic_id(st, TOPIC_TCP_STREAM); - assert(tcp_stream_topic_id >= 0); - stellar_session_mq_subscribe(st, tcp_stream_topic_id, http_decoder_tcp_stream_msg_cb, httpd_env->plugin_id); + http_decoder_topic_exdata_compose_init(httpd_env); thread_num = stellar_get_worker_thread_num(st); assert(thread_num >= 1); @@ -626,9 +782,15 @@ extern "C" { goto failed; } - - printf("http_decoder_init succ: ex_data_idx:%d, plugin_id:%d, topic_id:%d\n", - httpd_env->ex_data_idx, httpd_env->plugin_id, httpd_env->httpd_msg_topic_id); + + printf("http decoder init succ, plugin id:%d \n", httpd_env->plugin_id); + for(int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++){ + printf("\ttopic_name:%s, topic_id:%d, ex_data_name:%s, exdata_id:%d\n", + httpd_env->topic_exdata_compose[i].topic_name, + httpd_env->topic_exdata_compose[i].sub_topic_id, + httpd_env->topic_exdata_compose[i].exdata_name, + httpd_env->topic_exdata_compose[i].exdata_id); + } return httpd_env; failed: @@ -662,9 +824,9 @@ extern "C" if (unlikely(NULL == msg || msg->type != HTTP_MESSAGE_REQ_LINE)) { if(line){ - line->method.str = NULL; - line->uri.str = NULL; - line->version.str = NULL; + line->method.iov_base = NULL; + line->uri.iov_base = NULL; + line->version.iov_base = NULL; } return; } @@ -683,8 +845,8 @@ extern "C" if (unlikely(NULL == msg || msg->type != HTTP_MESSAGE_RES_LINE)) { if(line){ - line->version.str = NULL; - line->status.str = NULL; + line->version.iov_base = NULL; + line->status.iov_base = NULL; } return; } @@ -697,7 +859,7 @@ extern "C" http_decoder_half_data_get_response_line(res_data, line); } - void http_message_get_header(const struct http_message *msg, const struct hstring *key, + void http_message_get_header(const struct http_message *msg, const hstring *key, struct http_header *hdr_result) { int ret = -1; @@ -720,8 +882,8 @@ extern "C" } fail: if(hdr_result){ - hdr_result->key.str = NULL; - hdr_result->val.str = NULL; + hdr_result->key.iov_base = NULL; + hdr_result->val.iov_base = NULL; } return; } @@ -750,8 +912,8 @@ extern "C" return 0; fail: if(header){ - header->key.str = NULL; - header->val.str = NULL; + header->key.iov_base = NULL; + header->val.iov_base = NULL; } return -1; } @@ -780,7 +942,7 @@ extern "C" } void http_message_get_raw_body(const struct http_message *msg, - struct hstring *body) + hstring *body) { int ret = -1; if (unlikely(NULL == msg)) @@ -803,14 +965,14 @@ extern "C" return; fail: if(body){ - body->str = NULL; - body->str_len = 0; + body->iov_base = NULL; + body->iov_len = 0; } return; } void http_message_get_decompress_body(const struct http_message *msg, - struct hstring *body) + hstring *body) { int ret = -1; if (unlikely(NULL == msg)) @@ -833,19 +995,19 @@ extern "C" return; fail: if(body){ - body->str = NULL; - body->str_len = 0; + body->iov_base = NULL; + body->iov_len = 0; } return; } - void http_message_get_url(const struct http_message *msg, struct hstring *url) + void http_message_get_url(const struct http_message *msg, hstring *url) { if (unlikely(NULL == msg)) { if(url){ - url->str = NULL; - url->str_len = 0; + url->iov_base = NULL; + url->iov_len = 0; } return; } @@ -862,8 +1024,8 @@ extern "C" fail: if(url){ - url->str = NULL; - url->str_len = 0; + url->iov_base = NULL; + url->iov_len = 0; } return; } |
