summaryrefslogtreecommitdiff
path: root/src/http_decoder.cpp
diff options
context:
space:
mode:
authorlijia <[email protected]>2024-06-18 16:45:35 +0800
committerlijia <[email protected]>2024-06-20 18:51:47 +0800
commit05e8c9db6912dc95de9691e9b90e549a4c3beffe (patch)
treeed5d4b3392bdd577986d26ac8d5c6da21f9c2b2a /src/http_decoder.cpp
parent7d6170a23027aff0ebf2e7832dc11e4bbdce57ea (diff)
feat: TSG-20446, support http tunnel with CONNECT method.
Diffstat (limited to 'src/http_decoder.cpp')
-rw-r--r--src/http_decoder.cpp408
1 files changed, 285 insertions, 123 deletions
diff --git a/src/http_decoder.cpp b/src/http_decoder.cpp
index 8aa39e9..3af4dad 100644
--- a/src/http_decoder.cpp
+++ b/src/http_decoder.cpp
@@ -1,3 +1,4 @@
+#include "http_decoder.h"
#include <assert.h>
#include <stdio.h>
#include <string.h>
@@ -36,6 +37,7 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d
struct http_decoder_half_data *half_data = NULL;
int ret = 0;
u_int8_t flow_flag = 0;
+ struct http_decoder_exdata *exdata = ev_ctx->ref_httpd_ctx;
int thread_id = stellar_get_current_thread_id(httpd_env->st);
@@ -74,44 +76,53 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d
}
*data = half_data;
queue_idx = http_decoder_result_queue_req_index(queue); //get the index after inc
-#if 1 /* llhttp always call on_message_begin() even if llhttp_execute() error!!! */
+ /* llhttp always call on_message_begin() even if llhttp_execute() error!!! */
msg = http_message_new(HTTP_TRANSACTION_NEW, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_NEW, 1);
-#endif
break;
case HTTP_EVENT_REQ_LINE:
-#if 0
- msg = http_message_new(HTTP_TRANSACTION_NEW, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
- http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_NEW, 1);
-#endif
msg = http_message_new(HTTP_MESSAGE_REQ_LINE, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
+ if(httpd_tunnel_identify(PACKET_DIRECTION_C2S, half_data)){
+ exdata->tunnel_state = HTTP_TUN_C2S_HDR_START;
+ }
+ http_decoder_get_url(half_data, mempool);
break;
case HTTP_EVENT_REQ_HDR:
msg = http_message_new(HTTP_MESSAGE_REQ_HEADER, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
break;
case HTTP_EVENT_REQ_HDR_END:
{
- http_decoder_join_url_finally(ev_ctx, http_decoder_result_queue_peek_req(queue), mempool);
+ http_decoder_join_url_finally(ev_ctx, half_data, mempool);
/* maybe some parsed headers in buffer, but has not pushed to plugins yet */
- half_data = http_decoder_result_queue_peek_req(queue);
+
if(http_decoder_half_data_has_parsed_header(half_data)){
msg = http_message_new(HTTP_MESSAGE_REQ_HEADER, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
}
http_half_data_update_commit_index(half_data);
msg = http_message_new(HTTP_MESSAGE_REQ_HEADER_END, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
int tot_c2s_headers = http_half_data_get_total_parsed_header_count(half_data);
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_HEADERS_C2S, tot_c2s_headers);
- struct hstring tmp_url = {};
+ hstring tmp_url = {};
http_half_data_get_url(half_data, &tmp_url);
- http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_URL_BYTES, tmp_url.str_len);
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_URL_BYTES, tmp_url.iov_len);
+
+ if(httpd_is_tunnel_session(exdata)){
+ session_is_symmetric(ev_ctx->ref_session, &flow_flag);
+ if(SESSION_SEEN_C2S_FLOW == flow_flag){
+ exdata->tunnel_state = HTTP_TUN_INNER_STARTING;
+ http_half_pre_context_free(ev_ctx->ref_session, exdata);
+ exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].sub_topic_id;
+ }else{
+ exdata->tunnel_state = HTTP_TUN_C2S_HDR_END;
+ }
+ }
}
break;
case HTTP_EVENT_REQ_BODY_BEGIN:
@@ -119,18 +130,18 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d
break;
case HTTP_EVENT_REQ_BODY_DATA:
msg = http_message_new(HTTP_MESSAGE_REQ_BODY, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
break;
case HTTP_EVENT_REQ_BODY_END:
msg = http_message_new(HTTP_MESSAGE_REQ_BODY_END, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
break;
case HTTP_EVENT_REQ_END:
{
session_is_symmetric(ev_ctx->ref_session, &flow_flag);
if(SESSION_SEEN_C2S_FLOW == flow_flag){
msg = http_message_new(HTTP_TRANSACTION_FREE, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_FREE, 1);
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_ASYMMETRY_TRANSACTION_C2S, 1);
}
@@ -173,17 +184,23 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d
if(0 == session_is_symmetric(ev_ctx->ref_session, &flow_flag)){
if(SESSION_SEEN_S2C_FLOW == flow_flag){
msg = http_message_new(HTTP_TRANSACTION_NEW, queue, queue_idx, HTTP_RESPONSE);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
}
}
break;
case HTTP_EVENT_RES_LINE:
msg = http_message_new(HTTP_MESSAGE_RES_LINE, queue, queue_idx, HTTP_RESPONSE);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
+ if(httpd_tunnel_identify(PACKET_DIRECTION_S2C, half_data)){
+ exdata->tunnel_state = HTTP_TUN_S2C_START;
+ }else{
+ //connect response fail, reset tunnel_state
+ exdata->tunnel_state = HTTP_TUN_NON;
+ }
break;
case HTTP_EVENT_RES_HDR:
msg = http_message_new(HTTP_MESSAGE_RES_HEADER, queue, queue_idx, HTTP_RESPONSE);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
break;
case HTTP_EVENT_RES_HDR_END:
{
@@ -191,29 +208,36 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d
half_data = http_decoder_result_queue_peek_res(queue);
if(http_decoder_half_data_has_parsed_header(half_data)){
msg = http_message_new(HTTP_MESSAGE_RES_HEADER, queue, queue_idx, HTTP_RESPONSE);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
}
http_half_data_update_commit_index(half_data);
msg = http_message_new(HTTP_MESSAGE_RES_HEADER_END, queue, queue_idx, HTTP_RESPONSE);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
int tot_s2c_headers = http_half_data_get_total_parsed_header_count(half_data);
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_HEADERS_S2C, tot_s2c_headers);
+
+ if(httpd_is_tunnel_session(exdata)){
+ exdata->tunnel_state = HTTP_TUN_INNER_STARTING;
+ http_half_pre_context_free(ev_ctx->ref_session, exdata);
+ exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].sub_topic_id;
+ // http_decoder_push_tunnel_data(ev_ctx->ref_session, exdata, HTTP_TUNNEL_OPENING);
+ }
}
break;
case HTTP_EVENT_RES_BODY_BEGIN:
break;
case HTTP_EVENT_RES_BODY_DATA:
msg = http_message_new(HTTP_MESSAGE_RES_BODY, queue, queue_idx, HTTP_RESPONSE);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
break;
case HTTP_EVENT_RES_BODY_END:
msg = http_message_new(HTTP_MESSAGE_RES_BODY_END, queue, queue_idx, HTTP_RESPONSE);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
break;
case HTTP_EVENT_RES_END:
msg = http_message_new(HTTP_TRANSACTION_FREE, queue, queue_idx, HTTP_RESPONSE);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_FREE, 1);
session_is_symmetric(ev_ctx->ref_session, &flow_flag);
if(SESSION_SEEN_S2C_FLOW == flow_flag){
@@ -237,12 +261,14 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d
}
}
-static struct http_decoder *http_decoder_new(nmx_pool_t *mempool, http_event_cb *ev_cb, int decompress_switch, struct http_decoder_env *httpd_env)
+static struct http_decoder *http_decoder_new(struct http_decoder_exdata *hd_ctx, nmx_pool_t *mempool,
+ http_event_cb *ev_cb, int decompress_switch, struct http_decoder_env *httpd_env,
+ long long req_start_seq, long long res_start_seq)
{
struct http_decoder *decoder = MEMPOOL_CALLOC(mempool, struct http_decoder, 1);
assert(decoder);
- decoder->c2s_half = http_decoder_half_new(mempool, ev_cb, HTTP_REQUEST, decompress_switch, httpd_env);
- decoder->s2c_half = http_decoder_half_new(mempool, ev_cb, HTTP_RESPONSE, decompress_switch, httpd_env);
+ decoder->c2s_half = http_decoder_half_new(hd_ctx, mempool, ev_cb, HTTP_REQUEST, decompress_switch, httpd_env, req_start_seq);
+ decoder->s2c_half = http_decoder_half_new(hd_ctx, mempool, ev_cb, HTTP_RESPONSE, decompress_switch, httpd_env, res_start_seq);
return decoder;
}
@@ -266,13 +292,15 @@ static void http_decoder_free(nmx_pool_t *mempool, struct http_decoder *decoder)
}
static struct http_decoder_exdata *http_decoder_exdata_new(size_t mempool_size, size_t queue_size,
- int decompress_switch, struct http_decoder_env *httpd_env)
+ int decompress_switch, struct http_decoder_env *httpd_env,
+ long long req_start_seq, long long res_start_seq)
{
- struct http_decoder_exdata *ex_data = CALLOC(struct http_decoder_exdata, 1);
- ex_data->mempool = nmx_create_pool(mempool_size);
- ex_data->decoder = http_decoder_new(ex_data->mempool, http_event_handler, decompress_switch, httpd_env);
- ex_data->queue = http_decoder_result_queue_new(ex_data->mempool, queue_size);
- return ex_data;
+ struct http_decoder_exdata *hd_ctx = CALLOC(struct http_decoder_exdata, 1);
+ hd_ctx->mempool = nmx_create_pool(mempool_size);
+ hd_ctx->decoder = http_decoder_new(hd_ctx, hd_ctx->mempool, http_event_handler, decompress_switch,
+ httpd_env, req_start_seq, res_start_seq);
+ hd_ctx->queue = http_decoder_result_queue_new(hd_ctx->mempool, queue_size);
+ return hd_ctx;
}
static void http_decoder_exdata_free(struct http_decoder_exdata *ex_data)
@@ -291,8 +319,9 @@ static void http_decoder_exdata_free(struct http_decoder_exdata *ex_data)
http_decoder_result_queue_free(ex_data->mempool, ex_data->queue);
ex_data->queue = NULL;
}
-
- nmx_destroy_pool(ex_data->mempool);
+ if(ex_data->mempool){
+ nmx_destroy_pool(ex_data->mempool);
+ }
FREE(ex_data);
}
@@ -310,7 +339,7 @@ static int http_protocol_identify(const char *data, size_t data_len)
{
return -1;
}
- return 0;
+ return 1;
}
static void _http_decoder_context_free(struct http_decoder_env *env)
@@ -327,10 +356,10 @@ static void _http_decoder_context_free(struct http_decoder_env *env)
http_decoder_stat_free(&env->hd_stat);
- if (env->httpd_msg_topic_id >= 0)
- {
- stellar_session_mq_destroy_topic(env->st, env->httpd_msg_topic_id);
- env->httpd_msg_topic_id = -1;
+ for(int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++){
+ if(env->topic_exdata_compose[i].msg_free_cb){
+ stellar_session_mq_destroy_topic(env->st, env->topic_exdata_compose[i].sub_topic_id);
+ }
}
FREE(env);
@@ -412,7 +441,7 @@ static int load_http_decoder_config(const char *cfg_path,
return ret;
}
-static int http_msg_get_request_header(const struct http_message *msg, const struct hstring *key,
+static int http_msg_get_request_header(const struct http_message *msg, const hstring *key,
struct http_header *hdr_result)
{
const struct http_decoder_half_data *req_data =
@@ -420,7 +449,7 @@ static int http_msg_get_request_header(const struct http_message *msg, const str
return http_decoder_half_data_get_header(req_data, key, hdr_result);
}
-static int http_msg_get_response_header(const struct http_message *msg, const struct hstring *key,
+static int http_msg_get_response_header(const struct http_message *msg, const hstring *key,
struct http_header *hdr_result)
{
const struct http_decoder_half_data *res_data =
@@ -445,7 +474,7 @@ static int http_msg_response_header_next(const struct http_message *msg,
}
static int http_msg_get_request_raw_body(const struct http_message *msg,
- struct hstring *body)
+ hstring *body)
{
const struct http_decoder_half_data *req_data =
msg->ref_queue->array[msg->queue_index].req_data;
@@ -453,7 +482,7 @@ static int http_msg_get_request_raw_body(const struct http_message *msg,
}
static int http_msg_get_response_raw_body(const struct http_message *msg,
- struct hstring *body)
+ hstring *body)
{
const struct http_decoder_half_data *res_data =
msg->ref_queue->array[msg->queue_index].res_data;
@@ -461,7 +490,7 @@ static int http_msg_get_response_raw_body(const struct http_message *msg,
}
static int http_msg_get_request_decompress_body(const struct http_message *msg,
- struct hstring *body)
+ hstring *body)
{
const struct http_decoder_half_data *req_data =
msg->ref_queue->array[msg->queue_index].req_data;
@@ -469,19 +498,32 @@ static int http_msg_get_request_decompress_body(const struct http_message *msg,
}
static int http_msg_get_response_decompress_body(const struct http_message *msg,
- struct hstring *body)
+ hstring *body)
{
const struct http_decoder_half_data *res_data =
msg->ref_queue->array[msg->queue_index].res_data;
return http_decoder_half_data_get_decompress_body(res_data, body);
}
+static struct http_decoder_exdata *httpd_session_exdata_new(struct session *sess, struct http_decoder_env *httpd_env,
+ long long req_start_seq, long long res_start_seq)
+{
+ struct http_decoder_exdata *exdata = http_decoder_exdata_new(httpd_env->hd_cfg.mempool_size,
+ httpd_env->hd_cfg.result_queue_len,
+ httpd_env->hd_cfg.decompress_switch,
+ httpd_env,req_start_seq,res_start_seq);
+ // exdata->sub_topic_id = sub_topic_id;
+ int thread_id = stellar_get_current_thread_id(httpd_env->st);
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_SESSION_NEW, 1);
+ return exdata;
+}
+
#ifdef __cplusplus
extern "C"
{
#endif
- void _httpd_ex_data_free_cb(struct session *s, int idx, void *ex_data, void *arg)
+ void httpd_ex_data_free_cb(struct session *s, int idx, void *ex_data, void *arg)
{
if (NULL == ex_data)
{
@@ -491,7 +533,7 @@ extern "C"
http_decoder_exdata_free(exdata);
}
- void *_httpd_session_ctx_new_cb(struct session *sess, void *plugin_env)
+ void *httpd_session_ctx_new_cb(struct session *sess, void *plugin_env)
{
// If not http, ignore this session
size_t payload_len;
@@ -504,31 +546,23 @@ extern "C"
if (ret < 0)
{
stellar_session_plugin_dettach_current_session(sess);
- return (void *)"not_http_session";
+ return (void *)"__not_http_session__";
}
}
- struct http_decoder_exdata *ex_data = http_decoder_exdata_new(httpd_env->hd_cfg.mempool_size,
- httpd_env->hd_cfg.result_queue_len,
- httpd_env->hd_cfg.decompress_switch,
- httpd_env);
- session_exdata_set(sess, httpd_env->ex_data_idx, ex_data);
- int thread_id = stellar_get_current_thread_id(httpd_env->st);
- http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_SESSION_NEW, 1);
-
- return (void *)"fake_http_decoder_ctx"; // http decoder not use ctx, use exdata only!
+ return (void *)"__fake_http_decoder_ctx__";
}
- void _httpd_session_ctx_free_cb(struct session *sess, void *session_ctx, void *plugin_env)
+ void httpd_session_ctx_free_cb(struct session *sess, void *session_ctx, void *plugin_env)
{
if(NULL == plugin_env || NULL == session_ctx){
return;
}
- if(strncmp((const char *)session_ctx, "not_http_session", strlen("not_http_session")) == 0){
+ if(strncmp((const char *)session_ctx, "__not_http_session__", strlen("__not_http_session__")) == 0){
return;
}
struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env;
- int thread_id = stellar_get_current_thread_id(httpd_env->st);
+ int thread_id = session_get_current_thread_id(sess);
unsigned char flow_flag = 0;
session_is_symmetric(sess, &flow_flag);
@@ -541,53 +575,189 @@ extern "C"
}
}
- void http_decoder_tcp_stream_msg_cb(struct session *sess, int topic_id, const void *msg, void *no_use_ctx, void *plugin_env)
+ static void http_decoder_execute(struct session *sess, struct http_decoder_env *httpd_env, http_decoder_exdata *exdata)
{
- struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env;
- struct http_decoder_exdata *ex_data = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->ex_data_idx);
size_t payload_len;
-
- if(SESSION_STATE_CLOSING == session_get_current_state(sess)){
- http_half_pre_context_free(sess, httpd_env, ex_data);
- return;
- }
-
const char *payload = session_get0_current_payload(sess, &payload_len);
- if (unlikely(0 == payload_len || NULL == ex_data))
+ if (unlikely(0 == payload_len || NULL == payload))
{
return;
}
- int thread_id = stellar_get_current_thread_id(httpd_env->st);
- struct http_decoder_half *cur_half = NULL;
+ if(httpd_in_tunnel_transmitting(exdata)){
+ http_decoder_push_tunnel_data(sess, exdata, httpd_tunnel_state_to_msg(exdata));
+ httpd_tunnel_state_update(exdata);
+ return;
+ }
- if (PACKET_DIRECTION_C2S == packet_get_direction(session_get0_current_packet(sess)))
+ int thread_id = session_get_current_thread_id(sess);
+ struct http_decoder_half *cur_half = NULL;
+ int sess_dir = packet_get_direction(session_get0_current_packet(sess));
+ if (PACKET_DIRECTION_C2S == sess_dir)
{
- cur_half = ex_data->decoder->c2s_half;
+ cur_half = exdata->decoder->c2s_half;
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_BYTES_C2S, payload_len);
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TCP_SEG_C2S, 1);
}
else
{
- cur_half = ex_data->decoder->s2c_half;
+ cur_half = exdata->decoder->s2c_half;
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_BYTES_S2C, payload_len);
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TCP_SEG_S2C, 1);
}
- http_decoder_half_reinit(cur_half, httpd_env->httpd_msg_topic_id, ex_data->queue,
- ex_data->mempool, sess);
+ http_decoder_half_reinit(cur_half, exdata->queue, exdata->mempool, sess);
int ret = http_decoder_half_parse(cur_half, payload, payload_len);
if (ret < 0)
{
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_PARSE_ERR, 1);
stellar_session_plugin_dettach_current_session(sess);
}
+ }
+
+ void http_decoder_tunnel_msg_cb(struct session *sess, int topic_id, const void *tmsg, void *per_session_ctx, void *plugin_env)
+ {
+ struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env;
+ struct http_decoder_exdata *exdata = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].exdata_id);
+ enum http_tunnel_message_type tmsg_type = http_tunnel_message_type_get((const struct http_tunnel_message *)tmsg);
+ switch (tmsg_type)
+ {
+ case HTTP_TUNNEL_OPENING:
+ {
+ if(NULL != exdata){
+ //not support nested http tunnel
+ session_mq_ignore_message(sess, topic_id, httpd_env->plugin_id);
+ return;
+ }
+ size_t payload_len;
+ const char *payload = session_get0_current_payload(sess, &payload_len);
+ size_t http_identify_len = payload_len > HTTP_IDENTIFY_LEN ? HTTP_IDENTIFY_LEN : payload_len;
+ int is_http = http_protocol_identify(payload, http_identify_len);
+ if(is_http){
+ long long max_req_seq = 0, max_res_seq = 0;
+ struct http_decoder_exdata *tcp_stream_exdata = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_TCP_STREAM_INDEX].exdata_id);
+ http_half_get_max_transaction_seq(tcp_stream_exdata, &max_req_seq, &max_res_seq);
+ exdata = httpd_session_exdata_new(sess, httpd_env, max_req_seq, max_res_seq);
+ session_exdata_set(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].exdata_id, exdata);
+ exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_MSG_INDEX].sub_topic_id;
+ exdata->in_tunnel_is_http = 1;
+ }else{
+ exdata = CALLOC(struct http_decoder_exdata, 1);
+ exdata->decoder = NULL;
+ exdata->pub_topic_id = -1;
+ exdata->in_tunnel_is_http = 0;
+ session_exdata_set(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].exdata_id, exdata);
+ //do nothing, but can't call stellar_session_plugin_dettach_current_session() !!!
+ return;
+ }
+ }
+ break;
+
+ case HTTP_TUNNEL_ACTIVE:
+ break;
+
+ case HTTP_TUNNEL_CLOSING:
+ if(exdata->in_tunnel_is_http){
+ http_half_pre_context_free(sess, exdata);
+ }
+ return;
+ break;
+
+ default:
+ break;
+ }
+ if(exdata->in_tunnel_is_http){
+ http_decoder_execute(sess, httpd_env, exdata);
+ }
+ return;
+ }
+
+ void http_decoder_tcp_stream_msg_cb(struct session *sess, int topic_id, const void *msg, void *nouse_session_ctx, void *plugin_env)
+ {
+ struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env;
+ enum session_state sess_state = session_get_current_state(sess);
+ struct http_decoder_exdata *exdata = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_TCP_STREAM_INDEX].exdata_id);
+
+ switch(sess_state){
+ case SESSION_STATE_OPENING:
+ {
+ exdata = httpd_session_exdata_new(sess, httpd_env, 0, 0);
+ exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_MSG_INDEX].sub_topic_id;
+ session_exdata_set(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_TCP_STREAM_INDEX].exdata_id, exdata);
+ //go on
+ }
+ break;
+
+ case SESSION_STATE_ACTIVE:
+ //go on
+ break;
+
+ case SESSION_STATE_CLOSING:
+ {
+ if(httpd_in_tunnel_transmitting(exdata)){
+ http_decoder_push_tunnel_data(sess, exdata, HTTP_TUNNEL_CLOSING);
+ }else{
+ http_half_pre_context_free(sess, exdata);
+ }
+ return;
+ }
+ break;
+
+ default:
+ return;
+ break;
+ }
+
+ http_decoder_execute(sess, httpd_env, exdata);
+
return;
}
+static const struct http_topic_exdata_compose g_topic_exdata_compose[HTTPD_TOPIC_INDEX_MAX] =
+{
+ {HTTPD_TOPIC_TCP_STREAM_INDEX, TOPIC_TCP_STREAM, http_decoder_tcp_stream_msg_cb, NULL, "HTTP_DECODER_EXDATA_BASEON_TCP_STREAM", httpd_ex_data_free_cb, -1, -1},
+ {HTTPD_TOPIC_HTTP_MSG_INDEX, HTTP_DECODER_TOPIC, NULL, http_message_free, NULL, NULL, -1, -1},
+ {HTTPD_TOPIC_HTTP_TUNNEL_INDEX, HTTP_DECODER_TUNNEL_TOPIC, http_decoder_tunnel_msg_cb, http_message_free, "HTTP_DECODER_EXDATA_BASEON_HTTP_TUNNEL", httpd_ex_data_free_cb, -1, -1},
+};
+
+ static void http_decoder_topic_exdata_compose_init(struct http_decoder_env *httpd_env)
+ {
+ memcpy(httpd_env->topic_exdata_compose, g_topic_exdata_compose, sizeof(g_topic_exdata_compose));
+ for(int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++){
+ httpd_env->topic_exdata_compose[i].sub_topic_id = stellar_session_mq_get_topic_id_reliable(httpd_env->st,
+ httpd_env->topic_exdata_compose[i].topic_name,
+ httpd_env->topic_exdata_compose[i].msg_free_cb,
+ NULL);
+ assert(httpd_env->topic_exdata_compose[i].sub_topic_id >= 0);
+
+ if(httpd_env->topic_exdata_compose[i].exdata_name){
+ httpd_env->topic_exdata_compose[i].exdata_id = stellar_session_exdata_new_index(httpd_env->st,
+ httpd_env->topic_exdata_compose[i].exdata_name,
+ httpd_env->topic_exdata_compose[i].exdata_free_cb,
+ NULL);
+ assert(httpd_env->topic_exdata_compose[i].exdata_id >= 0);
+ }
+
+ if(httpd_env->topic_exdata_compose[i].on_msg_cb){
+ stellar_session_mq_subscribe(httpd_env->st, httpd_env->topic_exdata_compose[i].sub_topic_id,
+ httpd_env->topic_exdata_compose[i].on_msg_cb, httpd_env->plugin_id);
+ }
+ }
+ }
+
+ int http_topic_exdata_compose_get_index(const struct http_decoder_env *httpd_env, int by_topic_id)
+ {
+ for(int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++){
+ if(httpd_env->topic_exdata_compose[i].sub_topic_id == by_topic_id){
+ return i;
+ }
+ }
+ assert(0);
+ return -1;
+ }
+
void *http_decoder_init(struct stellar *st)
{
- int httpd_msg_topic_id = -1, tcp_stream_topic_id = -1;
int thread_num = 0;
struct http_decoder_env *httpd_env = CALLOC(struct http_decoder_env, 1);
@@ -597,27 +767,13 @@ extern "C"
goto failed;
}
httpd_env->st = st;
- httpd_env->ex_data_idx = stellar_session_exdata_new_index(st, "HTTP_DECODER",
- _httpd_ex_data_free_cb,
- NULL);
- httpd_env->plugin_id = stellar_session_plugin_register(st, _httpd_session_ctx_new_cb,
- _httpd_session_ctx_free_cb, (void *)httpd_env);
+ httpd_env->plugin_id = stellar_session_plugin_register(st, httpd_session_ctx_new_cb,
+ httpd_session_ctx_free_cb, (void *)httpd_env);
if (httpd_env->plugin_id < 0)
{
goto failed;
}
-
- httpd_msg_topic_id = stellar_session_mq_get_topic_id(st, HTTP_DECODER_TOPIC);
- if (httpd_msg_topic_id < 0)
- {
- httpd_msg_topic_id = stellar_session_mq_create_topic(st, HTTP_DECODER_TOPIC,
- http_message_free, NULL);
- }
- httpd_env->httpd_msg_topic_id = httpd_msg_topic_id;
-
- tcp_stream_topic_id = stellar_session_mq_get_topic_id(st, TOPIC_TCP_STREAM);
- assert(tcp_stream_topic_id >= 0);
- stellar_session_mq_subscribe(st, tcp_stream_topic_id, http_decoder_tcp_stream_msg_cb, httpd_env->plugin_id);
+ http_decoder_topic_exdata_compose_init(httpd_env);
thread_num = stellar_get_worker_thread_num(st);
assert(thread_num >= 1);
@@ -626,9 +782,15 @@ extern "C"
{
goto failed;
}
-
- printf("http_decoder_init succ: ex_data_idx:%d, plugin_id:%d, topic_id:%d\n",
- httpd_env->ex_data_idx, httpd_env->plugin_id, httpd_env->httpd_msg_topic_id);
+
+ printf("http decoder init succ, plugin id:%d \n", httpd_env->plugin_id);
+ for(int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++){
+ printf("\ttopic_name:%s, topic_id:%d, ex_data_name:%s, exdata_id:%d\n",
+ httpd_env->topic_exdata_compose[i].topic_name,
+ httpd_env->topic_exdata_compose[i].sub_topic_id,
+ httpd_env->topic_exdata_compose[i].exdata_name,
+ httpd_env->topic_exdata_compose[i].exdata_id);
+ }
return httpd_env;
failed:
@@ -662,9 +824,9 @@ extern "C"
if (unlikely(NULL == msg || msg->type != HTTP_MESSAGE_REQ_LINE))
{
if(line){
- line->method.str = NULL;
- line->uri.str = NULL;
- line->version.str = NULL;
+ line->method.iov_base = NULL;
+ line->uri.iov_base = NULL;
+ line->version.iov_base = NULL;
}
return;
}
@@ -683,8 +845,8 @@ extern "C"
if (unlikely(NULL == msg || msg->type != HTTP_MESSAGE_RES_LINE))
{
if(line){
- line->version.str = NULL;
- line->status.str = NULL;
+ line->version.iov_base = NULL;
+ line->status.iov_base = NULL;
}
return;
}
@@ -697,7 +859,7 @@ extern "C"
http_decoder_half_data_get_response_line(res_data, line);
}
- void http_message_get_header(const struct http_message *msg, const struct hstring *key,
+ void http_message_get_header(const struct http_message *msg, const hstring *key,
struct http_header *hdr_result)
{
int ret = -1;
@@ -720,8 +882,8 @@ extern "C"
}
fail:
if(hdr_result){
- hdr_result->key.str = NULL;
- hdr_result->val.str = NULL;
+ hdr_result->key.iov_base = NULL;
+ hdr_result->val.iov_base = NULL;
}
return;
}
@@ -750,8 +912,8 @@ extern "C"
return 0;
fail:
if(header){
- header->key.str = NULL;
- header->val.str = NULL;
+ header->key.iov_base = NULL;
+ header->val.iov_base = NULL;
}
return -1;
}
@@ -780,7 +942,7 @@ extern "C"
}
void http_message_get_raw_body(const struct http_message *msg,
- struct hstring *body)
+ hstring *body)
{
int ret = -1;
if (unlikely(NULL == msg))
@@ -803,14 +965,14 @@ extern "C"
return;
fail:
if(body){
- body->str = NULL;
- body->str_len = 0;
+ body->iov_base = NULL;
+ body->iov_len = 0;
}
return;
}
void http_message_get_decompress_body(const struct http_message *msg,
- struct hstring *body)
+ hstring *body)
{
int ret = -1;
if (unlikely(NULL == msg))
@@ -833,19 +995,19 @@ extern "C"
return;
fail:
if(body){
- body->str = NULL;
- body->str_len = 0;
+ body->iov_base = NULL;
+ body->iov_len = 0;
}
return;
}
- void http_message_get_url(const struct http_message *msg, struct hstring *url)
+ void http_message_get_url(const struct http_message *msg, hstring *url)
{
if (unlikely(NULL == msg))
{
if(url){
- url->str = NULL;
- url->str_len = 0;
+ url->iov_base = NULL;
+ url->iov_len = 0;
}
return;
}
@@ -862,8 +1024,8 @@ extern "C"
fail:
if(url){
- url->str = NULL;
- url->str_len = 0;
+ url->iov_base = NULL;
+ url->iov_len = 0;
}
return;
}