summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/CMakeLists.txt2
-rw-r--r--src/http_decoder.cpp408
-rw-r--r--src/http_decoder_half.cpp139
-rw-r--r--src/http_decoder_half.h23
-rw-r--r--src/http_decoder_inc.h36
-rw-r--r--src/http_decoder_stat.cpp43
-rw-r--r--src/http_decoder_stat.h16
-rw-r--r--src/http_decoder_string.cpp118
-rw-r--r--src/http_decoder_string.h8
-rw-r--r--src/http_decoder_table.cpp56
-rw-r--r--src/http_decoder_table.h12
-rw-r--r--src/http_decoder_tunnel.cpp99
-rw-r--r--src/http_decoder_tunnel.h21
-rw-r--r--src/http_decoder_utils.cpp20
-rw-r--r--src/http_decoder_utils.h2
-rw-r--r--src/version.map1
16 files changed, 685 insertions, 319 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 1d3c949..469591b 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -10,7 +10,7 @@ aux_source_directory(${PROJECT_SOURCE_DIR}/deps/toml DEPS_SRC)
set(HTTP_SRC ${DEPS_SRC} http_decoder.cpp http_decoder_utils.cpp http_decoder_half.cpp
http_decoder_table.cpp http_decoder_string.cpp http_content_decompress.cpp
- http_decoder_result_queue.cpp http_decoder_stat.cpp)
+ http_decoder_result_queue.cpp http_decoder_stat.cpp http_decoder_tunnel.cpp)
add_library(http_decoder SHARED ${HTTP_SRC})
set_target_properties(http_decoder PROPERTIES LINK_FLAGS "-Wl,--version-script=${PROJECT_SOURCE_DIR}/src/version.map")
diff --git a/src/http_decoder.cpp b/src/http_decoder.cpp
index 8aa39e9..3af4dad 100644
--- a/src/http_decoder.cpp
+++ b/src/http_decoder.cpp
@@ -1,3 +1,4 @@
+#include "http_decoder.h"
#include <assert.h>
#include <stdio.h>
#include <string.h>
@@ -36,6 +37,7 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d
struct http_decoder_half_data *half_data = NULL;
int ret = 0;
u_int8_t flow_flag = 0;
+ struct http_decoder_exdata *exdata = ev_ctx->ref_httpd_ctx;
int thread_id = stellar_get_current_thread_id(httpd_env->st);
@@ -74,44 +76,53 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d
}
*data = half_data;
queue_idx = http_decoder_result_queue_req_index(queue); //get the index after inc
-#if 1 /* llhttp always call on_message_begin() even if llhttp_execute() error!!! */
+ /* llhttp always call on_message_begin() even if llhttp_execute() error!!! */
msg = http_message_new(HTTP_TRANSACTION_NEW, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_NEW, 1);
-#endif
break;
case HTTP_EVENT_REQ_LINE:
-#if 0
- msg = http_message_new(HTTP_TRANSACTION_NEW, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
- http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_NEW, 1);
-#endif
msg = http_message_new(HTTP_MESSAGE_REQ_LINE, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
+ if(httpd_tunnel_identify(PACKET_DIRECTION_C2S, half_data)){
+ exdata->tunnel_state = HTTP_TUN_C2S_HDR_START;
+ }
+ http_decoder_get_url(half_data, mempool);
break;
case HTTP_EVENT_REQ_HDR:
msg = http_message_new(HTTP_MESSAGE_REQ_HEADER, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
break;
case HTTP_EVENT_REQ_HDR_END:
{
- http_decoder_join_url_finally(ev_ctx, http_decoder_result_queue_peek_req(queue), mempool);
+ http_decoder_join_url_finally(ev_ctx, half_data, mempool);
/* maybe some parsed headers in buffer, but has not pushed to plugins yet */
- half_data = http_decoder_result_queue_peek_req(queue);
+
if(http_decoder_half_data_has_parsed_header(half_data)){
msg = http_message_new(HTTP_MESSAGE_REQ_HEADER, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
}
http_half_data_update_commit_index(half_data);
msg = http_message_new(HTTP_MESSAGE_REQ_HEADER_END, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
int tot_c2s_headers = http_half_data_get_total_parsed_header_count(half_data);
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_HEADERS_C2S, tot_c2s_headers);
- struct hstring tmp_url = {};
+ hstring tmp_url = {};
http_half_data_get_url(half_data, &tmp_url);
- http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_URL_BYTES, tmp_url.str_len);
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_URL_BYTES, tmp_url.iov_len);
+
+ if(httpd_is_tunnel_session(exdata)){
+ session_is_symmetric(ev_ctx->ref_session, &flow_flag);
+ if(SESSION_SEEN_C2S_FLOW == flow_flag){
+ exdata->tunnel_state = HTTP_TUN_INNER_STARTING;
+ http_half_pre_context_free(ev_ctx->ref_session, exdata);
+ exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].sub_topic_id;
+ }else{
+ exdata->tunnel_state = HTTP_TUN_C2S_HDR_END;
+ }
+ }
}
break;
case HTTP_EVENT_REQ_BODY_BEGIN:
@@ -119,18 +130,18 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d
break;
case HTTP_EVENT_REQ_BODY_DATA:
msg = http_message_new(HTTP_MESSAGE_REQ_BODY, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
break;
case HTTP_EVENT_REQ_BODY_END:
msg = http_message_new(HTTP_MESSAGE_REQ_BODY_END, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
break;
case HTTP_EVENT_REQ_END:
{
session_is_symmetric(ev_ctx->ref_session, &flow_flag);
if(SESSION_SEEN_C2S_FLOW == flow_flag){
msg = http_message_new(HTTP_TRANSACTION_FREE, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_FREE, 1);
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_ASYMMETRY_TRANSACTION_C2S, 1);
}
@@ -173,17 +184,23 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d
if(0 == session_is_symmetric(ev_ctx->ref_session, &flow_flag)){
if(SESSION_SEEN_S2C_FLOW == flow_flag){
msg = http_message_new(HTTP_TRANSACTION_NEW, queue, queue_idx, HTTP_RESPONSE);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
}
}
break;
case HTTP_EVENT_RES_LINE:
msg = http_message_new(HTTP_MESSAGE_RES_LINE, queue, queue_idx, HTTP_RESPONSE);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
+ if(httpd_tunnel_identify(PACKET_DIRECTION_S2C, half_data)){
+ exdata->tunnel_state = HTTP_TUN_S2C_START;
+ }else{
+ //connect response fail, reset tunnel_state
+ exdata->tunnel_state = HTTP_TUN_NON;
+ }
break;
case HTTP_EVENT_RES_HDR:
msg = http_message_new(HTTP_MESSAGE_RES_HEADER, queue, queue_idx, HTTP_RESPONSE);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
break;
case HTTP_EVENT_RES_HDR_END:
{
@@ -191,29 +208,36 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d
half_data = http_decoder_result_queue_peek_res(queue);
if(http_decoder_half_data_has_parsed_header(half_data)){
msg = http_message_new(HTTP_MESSAGE_RES_HEADER, queue, queue_idx, HTTP_RESPONSE);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
}
http_half_data_update_commit_index(half_data);
msg = http_message_new(HTTP_MESSAGE_RES_HEADER_END, queue, queue_idx, HTTP_RESPONSE);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
int tot_s2c_headers = http_half_data_get_total_parsed_header_count(half_data);
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_HEADERS_S2C, tot_s2c_headers);
+
+ if(httpd_is_tunnel_session(exdata)){
+ exdata->tunnel_state = HTTP_TUN_INNER_STARTING;
+ http_half_pre_context_free(ev_ctx->ref_session, exdata);
+ exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].sub_topic_id;
+ // http_decoder_push_tunnel_data(ev_ctx->ref_session, exdata, HTTP_TUNNEL_OPENING);
+ }
}
break;
case HTTP_EVENT_RES_BODY_BEGIN:
break;
case HTTP_EVENT_RES_BODY_DATA:
msg = http_message_new(HTTP_MESSAGE_RES_BODY, queue, queue_idx, HTTP_RESPONSE);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
break;
case HTTP_EVENT_RES_BODY_END:
msg = http_message_new(HTTP_MESSAGE_RES_BODY_END, queue, queue_idx, HTTP_RESPONSE);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
break;
case HTTP_EVENT_RES_END:
msg = http_message_new(HTTP_TRANSACTION_FREE, queue, queue_idx, HTTP_RESPONSE);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_FREE, 1);
session_is_symmetric(ev_ctx->ref_session, &flow_flag);
if(SESSION_SEEN_S2C_FLOW == flow_flag){
@@ -237,12 +261,14 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d
}
}
-static struct http_decoder *http_decoder_new(nmx_pool_t *mempool, http_event_cb *ev_cb, int decompress_switch, struct http_decoder_env *httpd_env)
+static struct http_decoder *http_decoder_new(struct http_decoder_exdata *hd_ctx, nmx_pool_t *mempool,
+ http_event_cb *ev_cb, int decompress_switch, struct http_decoder_env *httpd_env,
+ long long req_start_seq, long long res_start_seq)
{
struct http_decoder *decoder = MEMPOOL_CALLOC(mempool, struct http_decoder, 1);
assert(decoder);
- decoder->c2s_half = http_decoder_half_new(mempool, ev_cb, HTTP_REQUEST, decompress_switch, httpd_env);
- decoder->s2c_half = http_decoder_half_new(mempool, ev_cb, HTTP_RESPONSE, decompress_switch, httpd_env);
+ decoder->c2s_half = http_decoder_half_new(hd_ctx, mempool, ev_cb, HTTP_REQUEST, decompress_switch, httpd_env, req_start_seq);
+ decoder->s2c_half = http_decoder_half_new(hd_ctx, mempool, ev_cb, HTTP_RESPONSE, decompress_switch, httpd_env, res_start_seq);
return decoder;
}
@@ -266,13 +292,15 @@ static void http_decoder_free(nmx_pool_t *mempool, struct http_decoder *decoder)
}
static struct http_decoder_exdata *http_decoder_exdata_new(size_t mempool_size, size_t queue_size,
- int decompress_switch, struct http_decoder_env *httpd_env)
+ int decompress_switch, struct http_decoder_env *httpd_env,
+ long long req_start_seq, long long res_start_seq)
{
- struct http_decoder_exdata *ex_data = CALLOC(struct http_decoder_exdata, 1);
- ex_data->mempool = nmx_create_pool(mempool_size);
- ex_data->decoder = http_decoder_new(ex_data->mempool, http_event_handler, decompress_switch, httpd_env);
- ex_data->queue = http_decoder_result_queue_new(ex_data->mempool, queue_size);
- return ex_data;
+ struct http_decoder_exdata *hd_ctx = CALLOC(struct http_decoder_exdata, 1);
+ hd_ctx->mempool = nmx_create_pool(mempool_size);
+ hd_ctx->decoder = http_decoder_new(hd_ctx, hd_ctx->mempool, http_event_handler, decompress_switch,
+ httpd_env, req_start_seq, res_start_seq);
+ hd_ctx->queue = http_decoder_result_queue_new(hd_ctx->mempool, queue_size);
+ return hd_ctx;
}
static void http_decoder_exdata_free(struct http_decoder_exdata *ex_data)
@@ -291,8 +319,9 @@ static void http_decoder_exdata_free(struct http_decoder_exdata *ex_data)
http_decoder_result_queue_free(ex_data->mempool, ex_data->queue);
ex_data->queue = NULL;
}
-
- nmx_destroy_pool(ex_data->mempool);
+ if(ex_data->mempool){
+ nmx_destroy_pool(ex_data->mempool);
+ }
FREE(ex_data);
}
@@ -310,7 +339,7 @@ static int http_protocol_identify(const char *data, size_t data_len)
{
return -1;
}
- return 0;
+ return 1;
}
static void _http_decoder_context_free(struct http_decoder_env *env)
@@ -327,10 +356,10 @@ static void _http_decoder_context_free(struct http_decoder_env *env)
http_decoder_stat_free(&env->hd_stat);
- if (env->httpd_msg_topic_id >= 0)
- {
- stellar_session_mq_destroy_topic(env->st, env->httpd_msg_topic_id);
- env->httpd_msg_topic_id = -1;
+ for(int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++){
+ if(env->topic_exdata_compose[i].msg_free_cb){
+ stellar_session_mq_destroy_topic(env->st, env->topic_exdata_compose[i].sub_topic_id);
+ }
}
FREE(env);
@@ -412,7 +441,7 @@ static int load_http_decoder_config(const char *cfg_path,
return ret;
}
-static int http_msg_get_request_header(const struct http_message *msg, const struct hstring *key,
+static int http_msg_get_request_header(const struct http_message *msg, const hstring *key,
struct http_header *hdr_result)
{
const struct http_decoder_half_data *req_data =
@@ -420,7 +449,7 @@ static int http_msg_get_request_header(const struct http_message *msg, const str
return http_decoder_half_data_get_header(req_data, key, hdr_result);
}
-static int http_msg_get_response_header(const struct http_message *msg, const struct hstring *key,
+static int http_msg_get_response_header(const struct http_message *msg, const hstring *key,
struct http_header *hdr_result)
{
const struct http_decoder_half_data *res_data =
@@ -445,7 +474,7 @@ static int http_msg_response_header_next(const struct http_message *msg,
}
static int http_msg_get_request_raw_body(const struct http_message *msg,
- struct hstring *body)
+ hstring *body)
{
const struct http_decoder_half_data *req_data =
msg->ref_queue->array[msg->queue_index].req_data;
@@ -453,7 +482,7 @@ static int http_msg_get_request_raw_body(const struct http_message *msg,
}
static int http_msg_get_response_raw_body(const struct http_message *msg,
- struct hstring *body)
+ hstring *body)
{
const struct http_decoder_half_data *res_data =
msg->ref_queue->array[msg->queue_index].res_data;
@@ -461,7 +490,7 @@ static int http_msg_get_response_raw_body(const struct http_message *msg,
}
static int http_msg_get_request_decompress_body(const struct http_message *msg,
- struct hstring *body)
+ hstring *body)
{
const struct http_decoder_half_data *req_data =
msg->ref_queue->array[msg->queue_index].req_data;
@@ -469,19 +498,32 @@ static int http_msg_get_request_decompress_body(const struct http_message *msg,
}
static int http_msg_get_response_decompress_body(const struct http_message *msg,
- struct hstring *body)
+ hstring *body)
{
const struct http_decoder_half_data *res_data =
msg->ref_queue->array[msg->queue_index].res_data;
return http_decoder_half_data_get_decompress_body(res_data, body);
}
+static struct http_decoder_exdata *httpd_session_exdata_new(struct session *sess, struct http_decoder_env *httpd_env,
+ long long req_start_seq, long long res_start_seq)
+{
+ struct http_decoder_exdata *exdata = http_decoder_exdata_new(httpd_env->hd_cfg.mempool_size,
+ httpd_env->hd_cfg.result_queue_len,
+ httpd_env->hd_cfg.decompress_switch,
+ httpd_env,req_start_seq,res_start_seq);
+ // exdata->sub_topic_id = sub_topic_id;
+ int thread_id = stellar_get_current_thread_id(httpd_env->st);
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_SESSION_NEW, 1);
+ return exdata;
+}
+
#ifdef __cplusplus
extern "C"
{
#endif
- void _httpd_ex_data_free_cb(struct session *s, int idx, void *ex_data, void *arg)
+ void httpd_ex_data_free_cb(struct session *s, int idx, void *ex_data, void *arg)
{
if (NULL == ex_data)
{
@@ -491,7 +533,7 @@ extern "C"
http_decoder_exdata_free(exdata);
}
- void *_httpd_session_ctx_new_cb(struct session *sess, void *plugin_env)
+ void *httpd_session_ctx_new_cb(struct session *sess, void *plugin_env)
{
// If not http, ignore this session
size_t payload_len;
@@ -504,31 +546,23 @@ extern "C"
if (ret < 0)
{
stellar_session_plugin_dettach_current_session(sess);
- return (void *)"not_http_session";
+ return (void *)"__not_http_session__";
}
}
- struct http_decoder_exdata *ex_data = http_decoder_exdata_new(httpd_env->hd_cfg.mempool_size,
- httpd_env->hd_cfg.result_queue_len,
- httpd_env->hd_cfg.decompress_switch,
- httpd_env);
- session_exdata_set(sess, httpd_env->ex_data_idx, ex_data);
- int thread_id = stellar_get_current_thread_id(httpd_env->st);
- http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_SESSION_NEW, 1);
-
- return (void *)"fake_http_decoder_ctx"; // http decoder not use ctx, use exdata only!
+ return (void *)"__fake_http_decoder_ctx__";
}
- void _httpd_session_ctx_free_cb(struct session *sess, void *session_ctx, void *plugin_env)
+ void httpd_session_ctx_free_cb(struct session *sess, void *session_ctx, void *plugin_env)
{
if(NULL == plugin_env || NULL == session_ctx){
return;
}
- if(strncmp((const char *)session_ctx, "not_http_session", strlen("not_http_session")) == 0){
+ if(strncmp((const char *)session_ctx, "__not_http_session__", strlen("__not_http_session__")) == 0){
return;
}
struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env;
- int thread_id = stellar_get_current_thread_id(httpd_env->st);
+ int thread_id = session_get_current_thread_id(sess);
unsigned char flow_flag = 0;
session_is_symmetric(sess, &flow_flag);
@@ -541,53 +575,189 @@ extern "C"
}
}
- void http_decoder_tcp_stream_msg_cb(struct session *sess, int topic_id, const void *msg, void *no_use_ctx, void *plugin_env)
+ static void http_decoder_execute(struct session *sess, struct http_decoder_env *httpd_env, http_decoder_exdata *exdata)
{
- struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env;
- struct http_decoder_exdata *ex_data = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->ex_data_idx);
size_t payload_len;
-
- if(SESSION_STATE_CLOSING == session_get_current_state(sess)){
- http_half_pre_context_free(sess, httpd_env, ex_data);
- return;
- }
-
const char *payload = session_get0_current_payload(sess, &payload_len);
- if (unlikely(0 == payload_len || NULL == ex_data))
+ if (unlikely(0 == payload_len || NULL == payload))
{
return;
}
- int thread_id = stellar_get_current_thread_id(httpd_env->st);
- struct http_decoder_half *cur_half = NULL;
+ if(httpd_in_tunnel_transmitting(exdata)){
+ http_decoder_push_tunnel_data(sess, exdata, httpd_tunnel_state_to_msg(exdata));
+ httpd_tunnel_state_update(exdata);
+ return;
+ }
- if (PACKET_DIRECTION_C2S == packet_get_direction(session_get0_current_packet(sess)))
+ int thread_id = session_get_current_thread_id(sess);
+ struct http_decoder_half *cur_half = NULL;
+ int sess_dir = packet_get_direction(session_get0_current_packet(sess));
+ if (PACKET_DIRECTION_C2S == sess_dir)
{
- cur_half = ex_data->decoder->c2s_half;
+ cur_half = exdata->decoder->c2s_half;
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_BYTES_C2S, payload_len);
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TCP_SEG_C2S, 1);
}
else
{
- cur_half = ex_data->decoder->s2c_half;
+ cur_half = exdata->decoder->s2c_half;
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_BYTES_S2C, payload_len);
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TCP_SEG_S2C, 1);
}
- http_decoder_half_reinit(cur_half, httpd_env->httpd_msg_topic_id, ex_data->queue,
- ex_data->mempool, sess);
+ http_decoder_half_reinit(cur_half, exdata->queue, exdata->mempool, sess);
int ret = http_decoder_half_parse(cur_half, payload, payload_len);
if (ret < 0)
{
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_PARSE_ERR, 1);
stellar_session_plugin_dettach_current_session(sess);
}
+ }
+
+ void http_decoder_tunnel_msg_cb(struct session *sess, int topic_id, const void *tmsg, void *per_session_ctx, void *plugin_env)
+ {
+ struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env;
+ struct http_decoder_exdata *exdata = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].exdata_id);
+ enum http_tunnel_message_type tmsg_type = http_tunnel_message_type_get((const struct http_tunnel_message *)tmsg);
+ switch (tmsg_type)
+ {
+ case HTTP_TUNNEL_OPENING:
+ {
+ if(NULL != exdata){
+ //not support nested http tunnel
+ session_mq_ignore_message(sess, topic_id, httpd_env->plugin_id);
+ return;
+ }
+ size_t payload_len;
+ const char *payload = session_get0_current_payload(sess, &payload_len);
+ size_t http_identify_len = payload_len > HTTP_IDENTIFY_LEN ? HTTP_IDENTIFY_LEN : payload_len;
+ int is_http = http_protocol_identify(payload, http_identify_len);
+ if(is_http){
+ long long max_req_seq = 0, max_res_seq = 0;
+ struct http_decoder_exdata *tcp_stream_exdata = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_TCP_STREAM_INDEX].exdata_id);
+ http_half_get_max_transaction_seq(tcp_stream_exdata, &max_req_seq, &max_res_seq);
+ exdata = httpd_session_exdata_new(sess, httpd_env, max_req_seq, max_res_seq);
+ session_exdata_set(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].exdata_id, exdata);
+ exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_MSG_INDEX].sub_topic_id;
+ exdata->in_tunnel_is_http = 1;
+ }else{
+ exdata = CALLOC(struct http_decoder_exdata, 1);
+ exdata->decoder = NULL;
+ exdata->pub_topic_id = -1;
+ exdata->in_tunnel_is_http = 0;
+ session_exdata_set(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].exdata_id, exdata);
+ //do nothing, but can't call stellar_session_plugin_dettach_current_session() !!!
+ return;
+ }
+ }
+ break;
+
+ case HTTP_TUNNEL_ACTIVE:
+ break;
+
+ case HTTP_TUNNEL_CLOSING:
+ if(exdata->in_tunnel_is_http){
+ http_half_pre_context_free(sess, exdata);
+ }
+ return;
+ break;
+
+ default:
+ break;
+ }
+ if(exdata->in_tunnel_is_http){
+ http_decoder_execute(sess, httpd_env, exdata);
+ }
+ return;
+ }
+
+ void http_decoder_tcp_stream_msg_cb(struct session *sess, int topic_id, const void *msg, void *nouse_session_ctx, void *plugin_env)
+ {
+ struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env;
+ enum session_state sess_state = session_get_current_state(sess);
+ struct http_decoder_exdata *exdata = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_TCP_STREAM_INDEX].exdata_id);
+
+ switch(sess_state){
+ case SESSION_STATE_OPENING:
+ {
+ exdata = httpd_session_exdata_new(sess, httpd_env, 0, 0);
+ exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_MSG_INDEX].sub_topic_id;
+ session_exdata_set(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_TCP_STREAM_INDEX].exdata_id, exdata);
+ //go on
+ }
+ break;
+
+ case SESSION_STATE_ACTIVE:
+ //go on
+ break;
+
+ case SESSION_STATE_CLOSING:
+ {
+ if(httpd_in_tunnel_transmitting(exdata)){
+ http_decoder_push_tunnel_data(sess, exdata, HTTP_TUNNEL_CLOSING);
+ }else{
+ http_half_pre_context_free(sess, exdata);
+ }
+ return;
+ }
+ break;
+
+ default:
+ return;
+ break;
+ }
+
+ http_decoder_execute(sess, httpd_env, exdata);
+
return;
}
+static const struct http_topic_exdata_compose g_topic_exdata_compose[HTTPD_TOPIC_INDEX_MAX] =
+{
+ {HTTPD_TOPIC_TCP_STREAM_INDEX, TOPIC_TCP_STREAM, http_decoder_tcp_stream_msg_cb, NULL, "HTTP_DECODER_EXDATA_BASEON_TCP_STREAM", httpd_ex_data_free_cb, -1, -1},
+ {HTTPD_TOPIC_HTTP_MSG_INDEX, HTTP_DECODER_TOPIC, NULL, http_message_free, NULL, NULL, -1, -1},
+ {HTTPD_TOPIC_HTTP_TUNNEL_INDEX, HTTP_DECODER_TUNNEL_TOPIC, http_decoder_tunnel_msg_cb, http_message_free, "HTTP_DECODER_EXDATA_BASEON_HTTP_TUNNEL", httpd_ex_data_free_cb, -1, -1},
+};
+
+ static void http_decoder_topic_exdata_compose_init(struct http_decoder_env *httpd_env)
+ {
+ memcpy(httpd_env->topic_exdata_compose, g_topic_exdata_compose, sizeof(g_topic_exdata_compose));
+ for(int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++){
+ httpd_env->topic_exdata_compose[i].sub_topic_id = stellar_session_mq_get_topic_id_reliable(httpd_env->st,
+ httpd_env->topic_exdata_compose[i].topic_name,
+ httpd_env->topic_exdata_compose[i].msg_free_cb,
+ NULL);
+ assert(httpd_env->topic_exdata_compose[i].sub_topic_id >= 0);
+
+ if(httpd_env->topic_exdata_compose[i].exdata_name){
+ httpd_env->topic_exdata_compose[i].exdata_id = stellar_session_exdata_new_index(httpd_env->st,
+ httpd_env->topic_exdata_compose[i].exdata_name,
+ httpd_env->topic_exdata_compose[i].exdata_free_cb,
+ NULL);
+ assert(httpd_env->topic_exdata_compose[i].exdata_id >= 0);
+ }
+
+ if(httpd_env->topic_exdata_compose[i].on_msg_cb){
+ stellar_session_mq_subscribe(httpd_env->st, httpd_env->topic_exdata_compose[i].sub_topic_id,
+ httpd_env->topic_exdata_compose[i].on_msg_cb, httpd_env->plugin_id);
+ }
+ }
+ }
+
+ int http_topic_exdata_compose_get_index(const struct http_decoder_env *httpd_env, int by_topic_id)
+ {
+ for(int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++){
+ if(httpd_env->topic_exdata_compose[i].sub_topic_id == by_topic_id){
+ return i;
+ }
+ }
+ assert(0);
+ return -1;
+ }
+
void *http_decoder_init(struct stellar *st)
{
- int httpd_msg_topic_id = -1, tcp_stream_topic_id = -1;
int thread_num = 0;
struct http_decoder_env *httpd_env = CALLOC(struct http_decoder_env, 1);
@@ -597,27 +767,13 @@ extern "C"
goto failed;
}
httpd_env->st = st;
- httpd_env->ex_data_idx = stellar_session_exdata_new_index(st, "HTTP_DECODER",
- _httpd_ex_data_free_cb,
- NULL);
- httpd_env->plugin_id = stellar_session_plugin_register(st, _httpd_session_ctx_new_cb,
- _httpd_session_ctx_free_cb, (void *)httpd_env);
+ httpd_env->plugin_id = stellar_session_plugin_register(st, httpd_session_ctx_new_cb,
+ httpd_session_ctx_free_cb, (void *)httpd_env);
if (httpd_env->plugin_id < 0)
{
goto failed;
}
-
- httpd_msg_topic_id = stellar_session_mq_get_topic_id(st, HTTP_DECODER_TOPIC);
- if (httpd_msg_topic_id < 0)
- {
- httpd_msg_topic_id = stellar_session_mq_create_topic(st, HTTP_DECODER_TOPIC,
- http_message_free, NULL);
- }
- httpd_env->httpd_msg_topic_id = httpd_msg_topic_id;
-
- tcp_stream_topic_id = stellar_session_mq_get_topic_id(st, TOPIC_TCP_STREAM);
- assert(tcp_stream_topic_id >= 0);
- stellar_session_mq_subscribe(st, tcp_stream_topic_id, http_decoder_tcp_stream_msg_cb, httpd_env->plugin_id);
+ http_decoder_topic_exdata_compose_init(httpd_env);
thread_num = stellar_get_worker_thread_num(st);
assert(thread_num >= 1);
@@ -626,9 +782,15 @@ extern "C"
{
goto failed;
}
-
- printf("http_decoder_init succ: ex_data_idx:%d, plugin_id:%d, topic_id:%d\n",
- httpd_env->ex_data_idx, httpd_env->plugin_id, httpd_env->httpd_msg_topic_id);
+
+ printf("http decoder init succ, plugin id:%d \n", httpd_env->plugin_id);
+ for(int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++){
+ printf("\ttopic_name:%s, topic_id:%d, ex_data_name:%s, exdata_id:%d\n",
+ httpd_env->topic_exdata_compose[i].topic_name,
+ httpd_env->topic_exdata_compose[i].sub_topic_id,
+ httpd_env->topic_exdata_compose[i].exdata_name,
+ httpd_env->topic_exdata_compose[i].exdata_id);
+ }
return httpd_env;
failed:
@@ -662,9 +824,9 @@ extern "C"
if (unlikely(NULL == msg || msg->type != HTTP_MESSAGE_REQ_LINE))
{
if(line){
- line->method.str = NULL;
- line->uri.str = NULL;
- line->version.str = NULL;
+ line->method.iov_base = NULL;
+ line->uri.iov_base = NULL;
+ line->version.iov_base = NULL;
}
return;
}
@@ -683,8 +845,8 @@ extern "C"
if (unlikely(NULL == msg || msg->type != HTTP_MESSAGE_RES_LINE))
{
if(line){
- line->version.str = NULL;
- line->status.str = NULL;
+ line->version.iov_base = NULL;
+ line->status.iov_base = NULL;
}
return;
}
@@ -697,7 +859,7 @@ extern "C"
http_decoder_half_data_get_response_line(res_data, line);
}
- void http_message_get_header(const struct http_message *msg, const struct hstring *key,
+ void http_message_get_header(const struct http_message *msg, const hstring *key,
struct http_header *hdr_result)
{
int ret = -1;
@@ -720,8 +882,8 @@ extern "C"
}
fail:
if(hdr_result){
- hdr_result->key.str = NULL;
- hdr_result->val.str = NULL;
+ hdr_result->key.iov_base = NULL;
+ hdr_result->val.iov_base = NULL;
}
return;
}
@@ -750,8 +912,8 @@ extern "C"
return 0;
fail:
if(header){
- header->key.str = NULL;
- header->val.str = NULL;
+ header->key.iov_base = NULL;
+ header->val.iov_base = NULL;
}
return -1;
}
@@ -780,7 +942,7 @@ extern "C"
}
void http_message_get_raw_body(const struct http_message *msg,
- struct hstring *body)
+ hstring *body)
{
int ret = -1;
if (unlikely(NULL == msg))
@@ -803,14 +965,14 @@ extern "C"
return;
fail:
if(body){
- body->str = NULL;
- body->str_len = 0;
+ body->iov_base = NULL;
+ body->iov_len = 0;
}
return;
}
void http_message_get_decompress_body(const struct http_message *msg,
- struct hstring *body)
+ hstring *body)
{
int ret = -1;
if (unlikely(NULL == msg))
@@ -833,19 +995,19 @@ extern "C"
return;
fail:
if(body){
- body->str = NULL;
- body->str_len = 0;
+ body->iov_base = NULL;
+ body->iov_len = 0;
}
return;
}
- void http_message_get_url(const struct http_message *msg, struct hstring *url)
+ void http_message_get_url(const struct http_message *msg, hstring *url)
{
if (unlikely(NULL == msg))
{
if(url){
- url->str = NULL;
- url->str_len = 0;
+ url->iov_base = NULL;
+ url->iov_len = 0;
}
return;
}
@@ -862,8 +1024,8 @@ extern "C"
fail:
if(url){
- url->str = NULL;
- url->str_len = 0;
+ url->iov_base = NULL;
+ url->iov_len = 0;
}
return;
}
diff --git a/src/http_decoder_half.cpp b/src/http_decoder_half.cpp
index 441ff2c..6074307 100644
--- a/src/http_decoder_half.cpp
+++ b/src/http_decoder_half.cpp
@@ -21,7 +21,7 @@ struct http_decoder_half_data
size_t decompress_body_len;
int joint_url_complete;
- struct hstring joint_url; // http://<host>[:<port>]/<path>?<searchpart>
+ hstring joint_url; // http://<host>[:<port>]/<path>?<searchpart>
long long transaction_index;
};
@@ -42,7 +42,7 @@ struct http_decoder_half
long long trans_counter;
long long err_counter;
- long long transaction_seq;
+ long long transaction_seq; //accumulated
const char *data;
int data_len;
@@ -77,9 +77,9 @@ http_decoder_half_data_decompress(struct http_decoder_half_data *data)
return;
}
- struct hstring raw_body = {0};
+ hstring raw_body = {0};
http_decoder_table_get_body(data->table, &raw_body);
- if (raw_body.str == NULL || raw_body.str_len == 0)
+ if (raw_body.iov_base == NULL || raw_body.iov_len == 0)
{
return;
}
@@ -90,8 +90,8 @@ http_decoder_half_data_decompress(struct http_decoder_half_data *data)
}
assert(data->decompress);
- if (http_content_decompress_write(data->decompress, raw_body.str,
- raw_body.str_len,
+ if (http_content_decompress_write(data->decompress, (char *)raw_body.iov_base,
+ raw_body.iov_len,
&data->ref_decompress_body,
&data->decompress_body_len) == -1)
{
@@ -226,12 +226,12 @@ static int on_uri(llhttp_t *http, const char *at, size_t length)
return 0;
}
-static void http_decoder_cached_portion_url(struct http_decoder_half *half, const struct hstring *uri_result)
+static void http_decoder_cached_portion_url(struct http_decoder_half *half, const hstring *uri_result)
{
struct http_decoder_half_data *ref_data = half->ref_data;
int uri_skip_len = 0;
- if ((uri_result->str_len) > 7 && (strncasecmp("http://", uri_result->str, 7) == 0)) // absolute URI
+ if ((uri_result->iov_len) > 7 && (strncasecmp("http://", (char *)uri_result->iov_base, 7) == 0)) // absolute URI
{
uri_skip_len = strlen("http://");
ref_data->joint_url_complete = 1;
@@ -241,9 +241,9 @@ static void http_decoder_cached_portion_url(struct http_decoder_half *half, cons
ref_data->joint_url_complete = 0;
}
- ref_data->joint_url.str_len = uri_result->str_len - uri_skip_len;
- ref_data->joint_url.str = MEMPOOL_CALLOC(half->http_ev_ctx->ref_mempool, char, ref_data->joint_url.str_len);
- memcpy(ref_data->joint_url.str, uri_result->str + uri_skip_len, ref_data->joint_url.str_len);
+ ref_data->joint_url.iov_len = uri_result->iov_len - uri_skip_len;
+ ref_data->joint_url.iov_base = MEMPOOL_CALLOC(half->http_ev_ctx->ref_mempool, char, ref_data->joint_url.iov_len);
+ memcpy(ref_data->joint_url.iov_base, (char *)uri_result->iov_base + uri_skip_len, ref_data->joint_url.iov_len);
}
/* Information-only callbacks, return value is ignored */
@@ -261,9 +261,9 @@ static int on_uri_complete(llhttp_t *http)
http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_URI);
- struct hstring uri_result = {};
+ hstring uri_result = {};
http_decoder_table_get_uri(half->ref_data->table, &uri_result);
- assert(uri_result.str);
+ assert(uri_result.iov_base);
http_decoder_cached_portion_url(half, &uri_result);
return 0;
@@ -408,17 +408,17 @@ static int on_header_value_complete(llhttp_t *http)
if (half->ref_data->content_encoding == HTTP_CONTENT_ENCODING_NONE)
{
struct http_header http_hdr = {0};
- struct hstring key = {.str = (char *)"Content-Encoding", .str_len = 16};
+ hstring key = {.iov_base = (char *)"Content-Encoding", .iov_len = 16};
if (http_decoder_table_get_header(half->ref_data->table, &key, &http_hdr) == 0)
{
char encoding_str[MAX_ENCODING_STR_LEN + 1] = {0};
- size_t str_len = http_hdr.val.str_len;
- if (str_len > MAX_ENCODING_STR_LEN)
+ size_t iov_len = http_hdr.val.iov_len;
+ if (iov_len > MAX_ENCODING_STR_LEN)
{
- str_len = MAX_ENCODING_STR_LEN;
+ iov_len = MAX_ENCODING_STR_LEN;
}
- memcpy(encoding_str, http_hdr.val.str, str_len);
+ memcpy(encoding_str, http_hdr.val.iov_base, iov_len);
half->ref_data->content_encoding = http_content_encoding_str2int(encoding_str);
}
}
@@ -539,8 +539,7 @@ static int on_body(llhttp_t *http, const char *at, size_t length)
return 0;
}
-static void http_decoder_half_init(struct http_decoder_half *half,
- http_event_cb *http_ev_cb, enum llhttp_type type)
+static void http_decoder_half_init(struct http_decoder_half *half, http_event_cb *http_ev_cb, enum llhttp_type type)
{
llhttp_settings_init(&half->settings);
llhttp_init(&half->parser, type, &half->settings);
@@ -579,8 +578,9 @@ static void http_decoder_half_init(struct http_decoder_half *half,
half->ref_data = NULL;
}
-struct http_decoder_half * http_decoder_half_new(nmx_pool_t *mempool, http_event_cb *ev_cb, enum llhttp_type http_type,
- int decompress_switch, struct http_decoder_env *httpd_env)
+struct http_decoder_half * http_decoder_half_new(struct http_decoder_exdata *hd_ctx, nmx_pool_t *mempool,
+ http_event_cb *ev_cb, enum llhttp_type http_type,
+ int decompress_switch, struct http_decoder_env *httpd_env, long long start_seq)
{
struct http_decoder_half *half = MEMPOOL_CALLOC(mempool, struct http_decoder_half, 1);
assert(half);
@@ -588,8 +588,9 @@ struct http_decoder_half * http_decoder_half_new(nmx_pool_t *mempool, http_event
half->decompress_switch = decompress_switch;
half->http_ev_ctx = MEMPOOL_CALLOC(mempool, struct http_event_context, 1);
http_decoder_half_init(half, ev_cb, http_type);
-
+ half->http_ev_ctx->ref_httpd_ctx = hd_ctx;
half->httpd_env = httpd_env;
+ half->transaction_seq = start_seq;
return half;
}
@@ -609,7 +610,7 @@ void http_decoder_half_free(nmx_pool_t *mempool, struct http_decoder_half *half)
MEMPOOL_FREE(mempool, half);
}
-void http_decoder_half_reinit(struct http_decoder_half *half, int topic_id,
+void http_decoder_half_reinit(struct http_decoder_half *half,
struct http_decoder_result_queue *queue,
nmx_pool_t *mempool, struct session *sess)
{
@@ -618,8 +619,6 @@ void http_decoder_half_reinit(struct http_decoder_half *half, int topic_id,
{
http_decoder_table_reinit(half->ref_data->table);
}
-
- half->http_ev_ctx->topic_id = topic_id;
half->http_ev_ctx->ref_mempool = mempool;
half->http_ev_ctx->ref_session = sess;
half->http_ev_ctx->ref_queue = queue;
@@ -810,10 +809,10 @@ void http_decoder_half_data_free(nmx_pool_t *mempool, struct http_decoder_half_d
data->decompress = NULL;
}
- if (data->joint_url.str)
+ if (data->joint_url.iov_base)
{
- MEMPOOL_FREE(mempool, data->joint_url.str);
- data->joint_url.str = NULL;
+ MEMPOOL_FREE(mempool, data->joint_url.iov_base);
+ data->joint_url.iov_base = NULL;
data->joint_url_complete = 0;
}
MEMPOOL_FREE(mempool, data);
@@ -846,7 +845,7 @@ int http_decoder_half_data_get_response_line(struct http_decoder_half_data *data
}
int http_decoder_half_data_get_header(const struct http_decoder_half_data *data,
- const struct hstring *key,
+ const hstring *key,
struct http_header *hdr_result)
{
return http_decoder_table_get_header(data->table, key, hdr_result);
@@ -877,7 +876,7 @@ int http_decoder_half_data_has_parsed_header(struct http_decoder_half_data *data
}
int http_decoder_half_data_get_raw_body(const struct http_decoder_half_data *data,
- struct hstring *body)
+ hstring *body)
{
if (NULL == data || NULL == body)
{
@@ -887,15 +886,15 @@ int http_decoder_half_data_get_raw_body(const struct http_decoder_half_data *dat
}
int http_decoder_half_data_get_decompress_body(const struct http_decoder_half_data *data,
- struct hstring *body)
+ hstring *body)
{
if (HTTP_CONTENT_ENCODING_NONE == data->content_encoding)
{
return http_decoder_table_get_body(data->table, body);
}
- body->str = data->ref_decompress_body;
- body->str_len = data->decompress_body_len;
+ body->iov_base = data->ref_decompress_body;
+ body->iov_len = data->decompress_body_len;
return 0;
}
@@ -923,17 +922,17 @@ static void using_session_addr_as_host(struct session *ref_session,
char ip_string_buf[INET6_ADDRSTRLEN];
if (SESSION_ADDR_TYPE_IPV4_TCP == ssaddr_type || SESSION_ADDR_TYPE_IPV4_UDP == ssaddr_type)
{
- host_result->val.str = MEMPOOL_CALLOC(mempool, char, (INET_ADDRSTRLEN + 7) /* "ip:port" max length */);
+ host_result->val.iov_base = MEMPOOL_CALLOC(mempool, char, (INET_ADDRSTRLEN + 7) /* "ip:port" max length */);
inet_ntop(AF_INET, &ssaddr->ipv4.daddr, ip_string_buf, INET_ADDRSTRLEN);
- sprintf(host_result->val.str, "%s:%u", ip_string_buf, ntohs(ssaddr->ipv4.dport));
- host_result->val.str_len = strlen(host_result->val.str);
+ sprintf((char *)host_result->val.iov_base, "%s:%u", ip_string_buf, ntohs(ssaddr->ipv4.dport));
+ host_result->val.iov_len = strlen((char *)host_result->val.iov_base);
}
else if (SESSION_ADDR_TYPE_IPV6_TCP == ssaddr_type || SESSION_ADDR_TYPE_IPV6_UDP == ssaddr_type)
{
- host_result->val.str = MEMPOOL_CALLOC(mempool, char, (INET6_ADDRSTRLEN + 7) /* "ip:port" max length */);
+ host_result->val.iov_base = MEMPOOL_CALLOC(mempool, char, (INET6_ADDRSTRLEN + 7) /* "ip:port" max length */);
inet_ntop(AF_INET6, &ssaddr->ipv6.daddr, ip_string_buf, INET6_ADDRSTRLEN);
- sprintf(host_result->val.str, "%s:%u", ip_string_buf, ntohs(ssaddr->ipv6.dport));
- host_result->val.str_len = strlen(host_result->val.str);
+ sprintf((char *)host_result->val.iov_base, "%s:%u", ip_string_buf, ntohs(ssaddr->ipv6.dport));
+ host_result->val.iov_len = strlen((char *)host_result->val.iov_base);
}
else
{
@@ -944,30 +943,43 @@ static void using_session_addr_as_host(struct session *ref_session,
void http_decoder_join_url(struct http_decoder_half_data *hfdata, nmx_pool_t *mempool, const struct http_header *host_hdr)
{
int append_slash_len = 0;
- if ('/' != hfdata->joint_url.str[0])
+ if ('/' != ((char *)hfdata->joint_url.iov_base)[0])
{
append_slash_len = 1;
}
- int url_cache_str_len = host_hdr->val.str_len + hfdata->joint_url.str_len + append_slash_len;
+ int url_cache_str_len = host_hdr->val.iov_len + hfdata->joint_url.iov_len + append_slash_len;
char *url_cache_str = MEMPOOL_CALLOC(mempool, char, url_cache_str_len);
char *ptr = url_cache_str;
- memcpy(ptr, host_hdr->val.str, host_hdr->val.str_len);
- ptr += host_hdr->val.str_len;
+ memcpy(ptr, host_hdr->val.iov_base, host_hdr->val.iov_len);
+ ptr += host_hdr->val.iov_len;
if (append_slash_len)
{
*ptr = '/';
ptr++;
}
- memcpy(ptr, hfdata->joint_url.str, hfdata->joint_url.str_len);
+ memcpy(ptr, hfdata->joint_url.iov_base, hfdata->joint_url.iov_len);
- MEMPOOL_FREE(mempool, hfdata->joint_url.str); // free the cached uri buffer
- hfdata->joint_url.str = url_cache_str;
- hfdata->joint_url.str_len = url_cache_str_len;
+ MEMPOOL_FREE(mempool, hfdata->joint_url.iov_base); // free the cached uri buffer
+ hfdata->joint_url.iov_base = url_cache_str;
+ hfdata->joint_url.iov_len = url_cache_str_len;
hfdata->joint_url_complete = 1;
}
+void http_decoder_get_url(struct http_decoder_half_data *hfdata, nmx_pool_t *mempool)
+{
+ struct http_request_line reqline = {};
+ http_decoder_half_data_get_request_line(hfdata, &reqline);
+ if(unlikely(strncasecmp_safe("CONNECT", (char *)reqline.method.iov_base, 7, reqline.method.iov_len) == 0))
+ {
+ hfdata->joint_url.iov_base = MEMPOOL_CALLOC(mempool, char, reqline.uri.iov_len+1);
+ memcpy(hfdata->joint_url.iov_base, reqline.uri.iov_base, reqline.uri.iov_len);
+ hfdata->joint_url.iov_len = reqline.uri.iov_len;
+ hfdata->joint_url_complete = 1;
+ }
+}
+
int http_decoder_join_url_finally(struct http_event_context *ev_ctx,
struct http_decoder_half_data *hfdata,
nmx_pool_t *mempool)
@@ -981,14 +993,14 @@ int http_decoder_join_url_finally(struct http_event_context *ev_ctx,
using_session_addr_as_host(ev_ctx->ref_session, &addr_as_host, mempool);
http_decoder_join_url(hfdata, mempool, &addr_as_host);
- MEMPOOL_FREE(mempool, addr_as_host.val.str); // free session addr to host buffer
+ MEMPOOL_FREE(mempool, addr_as_host.val.iov_base); // free session addr to host buffer
return 1;
}
void http_decoder_get_host_feed_url(struct http_decoder_half *half)
{
struct http_header host_result = {};
- struct hstring host_key = {(char *)"Host", 4};
+ hstring host_key = {(char *)"Host", 4};
const char *host_refer_str = NULL;
int host_refer_len = 0;
@@ -1007,14 +1019,14 @@ void http_decoder_get_host_feed_url(struct http_decoder_half *half)
http_decoder_join_url(half->ref_data, half->http_ev_ctx->ref_mempool, &host_result);
}
-int http_half_data_get_url(struct http_decoder_half_data *res_data, struct hstring *url)
+int http_half_data_get_url(struct http_decoder_half_data *res_data, hstring *url)
{
if (0 == res_data->joint_url_complete)
{
return -1;
}
- url->str = res_data->joint_url.str;
- url->str_len = res_data->joint_url.str_len;
+ url->iov_base = res_data->joint_url.iov_base;
+ url->iov_len = res_data->joint_url.iov_len;
return 0;
}
@@ -1033,24 +1045,20 @@ int http_half_data_get_total_parsed_header_count(struct http_decoder_half_data *
return http_decoder_table_get_total_parsed_header(half_data->table);
}
-void http_half_pre_context_free(struct session *sess, struct http_decoder_env *httpd_env,
- struct http_decoder_exdata *ex_data)
+void http_half_pre_context_free(struct session *sess, struct http_decoder_exdata *exdata)
{
- if (NULL == ex_data)
- {
- return;
- }
struct http_message *msg = NULL;
struct http_decoder_half_data *req_data;
struct http_decoder_half_data *res_data;
- struct http_decoder_result_queue *queue = ex_data->queue;
+ struct http_decoder_result_queue *queue = exdata->queue;
+
for(int i = 0; i < queue->queue_size; i++){
req_data = queue->array[i].req_data;
res_data = queue->array[i].res_data;
if ((req_data != NULL) && (NULL == res_data) && (req_data->state < HTTP_EVENT_REQ_END))
{
msg = http_message_new(HTTP_TRANSACTION_FREE, queue, i, HTTP_REQUEST);
- session_mq_publish_message(sess, httpd_env->httpd_msg_topic_id, msg);
+ session_mq_publish_message(sess, exdata->pub_topic_id, msg);
}
}
@@ -1059,7 +1067,7 @@ void http_half_pre_context_free(struct session *sess, struct http_decoder_env *h
if ((res_data != NULL) && (res_data->state < HTTP_EVENT_RES_END))
{
msg = http_message_new(HTTP_TRANSACTION_FREE, queue, i, HTTP_RESPONSE);
- session_mq_publish_message(sess, httpd_env->httpd_msg_topic_id, msg);
+ session_mq_publish_message(sess, exdata->pub_topic_id, msg);
}
}
}
@@ -1067,4 +1075,11 @@ void http_half_pre_context_free(struct session *sess, struct http_decoder_env *h
void http_half_update_state(struct http_decoder_half_data *hf_data, enum http_event state)
{
hf_data->state = state;
+}
+
+void http_half_get_max_transaction_seq(struct http_decoder_exdata *exdata, long long *max_req_seq, long long *max_res_seq)
+{
+ assert(exdata && max_req_seq && max_res_seq);
+ *max_req_seq = exdata->decoder->c2s_half->transaction_seq;
+ *max_res_seq = exdata->decoder->s2c_half->transaction_seq;
} \ No newline at end of file
diff --git a/src/http_decoder_half.h b/src/http_decoder_half.h
index e918e6e..5183171 100644
--- a/src/http_decoder_half.h
+++ b/src/http_decoder_half.h
@@ -30,7 +30,7 @@ enum http_event {
};
struct http_event_context {
- int topic_id;
+ struct http_decoder_exdata *ref_httpd_ctx;
nmx_pool_t *ref_mempool;
struct session *ref_session;
struct http_decoder_result_queue *ref_queue;
@@ -43,12 +43,12 @@ typedef void http_event_cb(enum http_event event, struct http_decoder_half_data
struct http_event_context *ev_ctx, void *httpd_plugin_env);
struct http_decoder_half *
-http_decoder_half_new(nmx_pool_t *mempool, http_event_cb *event_cb, enum llhttp_type http_type,
- int decompress_switch, struct http_decoder_env *httpd_env);
+http_decoder_half_new(struct http_decoder_exdata *hd_ctx, nmx_pool_t *mempool, http_event_cb *event_cb,
+ enum llhttp_type http_type, int decompress_switch, struct http_decoder_env *httpd_env,long long start_seq);
void http_decoder_half_free(nmx_pool_t *mempool, struct http_decoder_half *half);
-void http_decoder_half_reinit(struct http_decoder_half *half, int topic_id,
+void http_decoder_half_reinit(struct http_decoder_half *half,
struct http_decoder_result_queue *queue,
nmx_pool_t *mempool, struct session *sess);
@@ -69,32 +69,33 @@ int http_decoder_half_data_get_response_line(struct http_decoder_half_data *data
struct http_response_line *line);
int http_decoder_half_data_get_header(const struct http_decoder_half_data *data,
- const struct hstring *key, struct http_header *hdr_res);
+ const hstring *key, struct http_header *hdr_res);
int http_decoder_half_data_iter_header(struct http_decoder_half_data *data,
struct http_header *header);
int http_decoder_half_data_reset_header_iter(struct http_decoder_half_data *req_data);
int http_decoder_half_data_has_parsed_header(struct http_decoder_half_data *data);
-int http_decoder_half_data_get_raw_body(const struct http_decoder_half_data *data, struct hstring *body);
+int http_decoder_half_data_get_raw_body(const struct http_decoder_half_data *data, hstring *body);
-int http_decoder_half_data_get_decompress_body(const struct http_decoder_half_data *data, struct hstring *body);
+int http_decoder_half_data_get_decompress_body(const struct http_decoder_half_data *data, hstring *body);
void http_decoder_half_data_dump(struct http_decoder_half *half);
void http_decoder_get_host_feed_url(struct http_decoder_half *half);
+void http_decoder_get_url(struct http_decoder_half_data *hfdata, nmx_pool_t *mempool);
void http_decoder_join_url(struct http_decoder_half_data *hfdata,
nmx_pool_t *mempool,
const struct http_header *host_hdr);
int http_decoder_join_url_finally(struct http_event_context *ev_ctx,
struct http_decoder_half_data *hfdata,
nmx_pool_t *mempool);
-int http_half_data_get_url(struct http_decoder_half_data *res_data, struct hstring *url);
+int http_half_data_get_url(struct http_decoder_half_data *res_data, hstring *url);
int http_half_data_get_transaction_seq(struct http_decoder_half_data *hf_data);
void http_half_data_update_commit_index(struct http_decoder_half_data * half_data);
-void http_half_pre_context_free(struct session *sess, struct http_decoder_env *httpd_env,
- struct http_decoder_exdata *ex_data);
+void http_half_pre_context_free(struct session *sess, struct http_decoder_exdata *exdata);
void http_half_update_state(struct http_decoder_half_data *hf_data, enum http_event state);
-int http_half_data_get_total_parsed_header_count(struct http_decoder_half_data * half_data);
+int http_half_data_get_total_parsed_header_count(struct http_decoder_half_data * half_data);
+void http_half_get_max_transaction_seq(struct http_decoder_exdata *exdata, long long *max_req_seq, long long *max_res_seq);
#endif \ No newline at end of file
diff --git a/src/http_decoder_inc.h b/src/http_decoder_inc.h
index 760eaba..0e0f0c5 100644
--- a/src/http_decoder_inc.h
+++ b/src/http_decoder_inc.h
@@ -31,6 +31,7 @@ extern "C"
#include "http_decoder_result_queue.h"
#include "http_decoder_utils.h"
#include "http_decoder_stat.h"
+#include "http_decoder_tunnel.h"
#include "fieldstat/fieldstat_easy.h"
#include "toml/toml.h"
@@ -82,6 +83,7 @@ struct http_message
enum http_message_type type;
size_t queue_index;
struct http_decoder_result_queue *ref_queue;
+ hstring tunnel_payload;
};
struct http_decoder
@@ -90,19 +92,45 @@ struct http_decoder
struct http_decoder_half *s2c_half;
};
+enum httpd_topic_index{
+ HTTPD_TOPIC_TCP_STREAM_INDEX = 0,
+ HTTPD_TOPIC_HTTP_MSG_INDEX,
+ HTTPD_TOPIC_HTTP_TUNNEL_INDEX,
+ HTTPD_TOPIC_INDEX_MAX,
+};
+
struct http_decoder_exdata
{
+ int sub_topic_id; //tcp_stream
+ int pub_topic_id; //http message or http tunnel msg
struct http_decoder_result_queue *queue;
struct http_decoder *decoder;
nmx_pool_t *mempool;
+ enum http_tunnel_state tunnel_state;
+ int in_tunnel_is_http;
+};
+
+// struct http_decoder_context{
+// int array_size;
+// struct http_decoder_exdata **exdata_array; //raw tcp stream for http msg; http tunnel for inner http transaction.
+// };
+
+struct http_topic_exdata_compose{
+ enum httpd_topic_index index;
+ const char *topic_name;
+ on_session_msg_cb_func *on_msg_cb;
+ session_msg_free_cb_func *msg_free_cb;
+ const char *exdata_name;
+ session_exdata_free *exdata_free_cb;
+ int sub_topic_id; //as consumer
+ int exdata_id;
};
struct http_decoder_env
{
- int plugin_id;
- int httpd_msg_topic_id;
- int ex_data_idx;
struct stellar *st;
+ int plugin_id;
+ struct http_topic_exdata_compose topic_exdata_compose[HTTPD_TOPIC_INDEX_MAX];
struct http_decoder_config hd_cfg;
struct http_decoder_stat hd_stat;
};
@@ -111,7 +139,7 @@ struct http_message;
struct http_message *http_message_new(enum http_message_type type, struct http_decoder_result_queue *queue,
int queue_index, unsigned char flow_type);
-
+int http_topic_exdata_compose_get_index(const struct http_decoder_env *httpd_env, int by_topic_id);
#ifdef __cplusplus
}
#endif
diff --git a/src/http_decoder_stat.cpp b/src/http_decoder_stat.cpp
index dfd1a2e..a2b6205 100644
--- a/src/http_decoder_stat.cpp
+++ b/src/http_decoder_stat.cpp
@@ -1,5 +1,6 @@
#include <assert.h>
#include <stdio.h>
+#include <pthread.h>
#include <unistd.h>
#include "http_decoder_inc.h"
@@ -26,6 +27,12 @@ static const struct hd_stat_config_tuple g_httpd_stat_tuple[HTTPD_STAT_MAX] =
void http_decoder_stat_free(struct http_decoder_stat *hd_stat)
{
+ pthread_cancel(hd_stat->timer_pid);
+ void *join_res = NULL;
+ do{
+ pthread_join(hd_stat->timer_pid, &join_res);
+ }while(join_res != PTHREAD_CANCELED);
+
if(hd_stat->stats != NULL){
free(hd_stat->stats);
}
@@ -34,6 +41,19 @@ void http_decoder_stat_free(struct http_decoder_stat *hd_stat)
}
}
+static void *httpd_stat_timer_thread(void *arg)
+{
+ pthread_setname_np(pthread_self(), "http_decoder_timer_thread");
+ struct http_decoder_stat *hd_stat = (struct http_decoder_stat *)arg;
+ struct timespec res;
+ while(1){
+ clock_gettime(CLOCK_MONOTONIC, &res);
+ hd_stat->current_time_ms = (res.tv_sec * 1000) + (res.tv_nsec / 1000000);
+ usleep(800);
+ }
+ return NULL;
+}
+
int http_decoder_stat_init(struct http_decoder_stat *hd_stat, int thread_max, int stat_interval_pkts, int stat_interval_time)
{
assert(sizeof(g_httpd_stat_tuple)/sizeof(struct hd_stat_config_tuple) == HTTPD_STAT_MAX);
@@ -64,6 +84,8 @@ int http_decoder_stat_init(struct http_decoder_stat *hd_stat, int thread_max, in
return -1;
}
+ pthread_create(&hd_stat->timer_pid, NULL, httpd_stat_timer_thread, hd_stat);
+
return 0;
}
@@ -73,15 +95,16 @@ void http_decoder_stat_update(struct http_decoder_stat *hd_stat, int thread_id,
assert(thread_id >= 0);
assert(type < HTTPD_STAT_MAX);
- hd_stat->stats[thread_id].counter[type] += value;
- hd_stat->stats[thread_id].batch++;
+ struct hd_statistics *cur_hds = &hd_stat->stats[thread_id];
- if(hd_stat->stats[thread_id].batch >= hd_stat->stat_interval_pkts){
- for(int i = 0; i < HTTPD_STAT_MAX; i++){
- //update all type, maybe decrease performance ?
- fieldstat_easy_counter_incrby(hd_stat->fse, thread_id, hd_stat->field_stat_id[i], NULL, 0, hd_stat->stats[thread_id].counter[i]);
- hd_stat->stats[thread_id].counter[i] = 0;
- }
- hd_stat->stats[thread_id].batch = 0;
+ cur_hds->counter[type] += value;
+ cur_hds->batch[type]++;
+
+ if(cur_hds->batch[type] >= hd_stat->stat_interval_pkts
+ || cur_hds->time_ms[type] + 1000 < hd_stat->current_time_ms){
+ fieldstat_easy_counter_incrby(hd_stat->fse, thread_id, hd_stat->field_stat_id[type], NULL, 0, cur_hds->counter[type]);
+ cur_hds->counter[type] = 0;
+ cur_hds->batch[type] = 0;
+ cur_hds->time_ms[type] = hd_stat->current_time_ms;
}
-}
+} \ No newline at end of file
diff --git a/src/http_decoder_stat.h b/src/http_decoder_stat.h
index 235475d..e8f18d8 100644
--- a/src/http_decoder_stat.h
+++ b/src/http_decoder_stat.h
@@ -32,25 +32,19 @@ struct hd_stat_config_tuple
struct hd_statistics
{
- // long long incoming_bytes;
- // long long incoming_tcp_seg;
- // long long session_new;
- // long long session_free;
- // long long transaction_new;
- // long long transaction_free;
- // long long incoming_trans;
- // long long err_pkts;
-
+ long long time_ms[HTTPD_STAT_MAX];
long long counter[HTTPD_STAT_MAX];
- int batch; //call fieldstat_easy_counter_incrby() per batch
+ int batch[HTTPD_STAT_MAX]; //call fieldstat_easy_counter_incrby() per batch
}__attribute__ ((aligned (64)));
struct http_decoder_stat
{
+ pthread_t timer_pid;
+ long long current_time_ms;
struct fieldstat_easy *fse;
int stat_interval_pkts; // call fieldstat_incrby every stat_interval_pkts
int field_stat_id[HTTPD_STAT_MAX];
- struct hd_statistics *stats; //multi thread
+ struct hd_statistics *stats; //size is thread number
};
int http_decoder_stat_init(struct http_decoder_stat *hd_stat, int thread_max, int stat_interval_pkts, int stat_interval_time);
diff --git a/src/http_decoder_string.cpp b/src/http_decoder_string.cpp
index e10b4a0..c74030c 100644
--- a/src/http_decoder_string.cpp
+++ b/src/http_decoder_string.cpp
@@ -35,8 +35,8 @@ void http_decoder_string_refer(struct http_decoder_string *rstr,
switch (rstr->state) {
case STRING_STATE_INIT:
case STRING_STATE_CACHE:
- rstr->refer.str = (char *)at;
- rstr->refer.str_len = length;
+ rstr->refer.iov_base = (char *)at;
+ rstr->refer.iov_len = length;
break;
default:
abort();
@@ -48,60 +48,60 @@ void http_decoder_string_refer(struct http_decoder_string *rstr,
static void string_refer2cache(struct http_decoder_string *rstr)
{
- if (0 == rstr->refer.str_len) {
+ if (0 == rstr->refer.iov_len) {
return;
}
- if (rstr->cache.str_len >= rstr->max_cache_size) {
+ if (rstr->cache.iov_len >= rstr->max_cache_size) {
return;
}
- size_t length = rstr->cache.str_len + rstr->refer.str_len;
+ size_t length = rstr->cache.iov_len + rstr->refer.iov_len;
if (length > rstr->max_cache_size) {
length = rstr->max_cache_size;
}
- if (NULL == rstr->cache.str) {
- rstr->cache.str = CALLOC(char, length + 1);
- memcpy(rstr->cache.str, rstr->refer.str, length);
+ if (NULL == rstr->cache.iov_base) {
+ rstr->cache.iov_base = CALLOC(char, length + 1);
+ memcpy(rstr->cache.iov_base, rstr->refer.iov_base, length);
} else {
- rstr->cache.str = REALLOC(char, rstr->cache.str, length + 1);
- memcpy(rstr->cache.str + rstr->cache.str_len, rstr->refer.str,
- (length - rstr->cache.str_len));
+ rstr->cache.iov_base = REALLOC(char, rstr->cache.iov_base, length + 1);
+ memcpy((char *)rstr->cache.iov_base + rstr->cache.iov_len, rstr->refer.iov_base,
+ (length - rstr->cache.iov_len));
}
- rstr->cache.str_len = length;
- rstr->refer.str = NULL;
- rstr->refer.str_len = 0;
+ rstr->cache.iov_len = length;
+ rstr->refer.iov_base = NULL;
+ rstr->refer.iov_len = 0;
}
static void string_commit2cache(struct http_decoder_string *rstr)
{
- if (rstr->cache.str_len == rstr->commit.str_len &&
- rstr->cache.str == rstr->commit.str) {
- rstr->commit.str = NULL;
- rstr->commit.str_len = 0;
+ if (rstr->cache.iov_len == rstr->commit.iov_len &&
+ rstr->cache.iov_base == rstr->commit.iov_base) {
+ rstr->commit.iov_base = NULL;
+ rstr->commit.iov_len = 0;
return;
}
//Only http header key need to backward to cache
size_t length = 0;
- if (rstr->commit.str_len > rstr->max_cache_size) {
+ if (rstr->commit.iov_len > rstr->max_cache_size) {
length = rstr->max_cache_size;
} else {
- length = rstr->commit.str_len;
+ length = rstr->commit.iov_len;
}
if (length > 0) {
- if (NULL == rstr->cache.str) {
- rstr->cache.str = CALLOC(char, length + 1);
+ if (NULL == rstr->cache.iov_base) {
+ rstr->cache.iov_base = CALLOC(char, length + 1);
} else {
abort();
}
- memcpy(rstr->cache.str, rstr->commit.str, length);
- rstr->cache.str_len = length;
+ memcpy(rstr->cache.iov_base, rstr->commit.iov_base, length);
+ rstr->cache.iov_len = length;
- rstr->commit.str = NULL;
- rstr->commit.str_len = 0;
+ rstr->commit.iov_base = NULL;
+ rstr->commit.iov_len = 0;
}
}
@@ -136,24 +136,24 @@ void http_decoder_string_commit(struct http_decoder_string *rstr)
switch (rstr->state) {
case STRING_STATE_REFER:
- if (rstr->cache.str_len) {
+ if (rstr->cache.iov_len) {
http_decoder_string_cache(rstr);
- rstr->commit.str = rstr->cache.str;
- rstr->commit.str_len = rstr->cache.str_len;
- // not overwrite rstr->cache.str
+ rstr->commit.iov_base = rstr->cache.iov_base;
+ rstr->commit.iov_len = rstr->cache.iov_len;
+ // not overwrite rstr->cache.iov_base
} else {
- rstr->commit.str = rstr->refer.str;
- rstr->commit.str_len = rstr->refer.str_len;
+ rstr->commit.iov_base = rstr->refer.iov_base;
+ rstr->commit.iov_len = rstr->refer.iov_len;
- rstr->refer.str = NULL;
- rstr->refer.str_len = 0;
+ rstr->refer.iov_base = NULL;
+ rstr->refer.iov_len = 0;
}
break;
case STRING_STATE_CACHE:
- rstr->commit.str = rstr->cache.str;
- rstr->commit.str_len = rstr->cache.str_len;
- // not overwrite rstr->cache.str
+ rstr->commit.iov_base = rstr->cache.iov_base;
+ rstr->commit.iov_len = rstr->cache.iov_len;
+ // not overwrite rstr->cache.iov_base
break;
default:
//abort();
@@ -172,7 +172,7 @@ void http_decoder_string_reset(struct http_decoder_string *rstr)
case STRING_STATE_REFER:
case STRING_STATE_CACHE:
case STRING_STATE_COMMIT:
- FREE(rstr->cache.str);
+ FREE(rstr->cache.iov_base);
memset(rstr, 0, sizeof(struct http_decoder_string));
break;
default:
@@ -197,20 +197,20 @@ void http_decoder_string_reinit(struct http_decoder_string *rstr)
}
if (rstr->state == STRING_STATE_COMMIT &&
- rstr->cache.str == rstr->commit.str &&
- rstr->cache.str_len == rstr->commit.str_len) {
+ rstr->cache.iov_base == rstr->commit.iov_base &&
+ rstr->cache.iov_len == rstr->commit.iov_len) {
return;
}
- if (rstr->cache.str != NULL) {
- FREE(rstr->cache.str);
- rstr->cache.str_len = 0;
+ if (rstr->cache.iov_base != NULL) {
+ FREE(rstr->cache.iov_base);
+ rstr->cache.iov_len = 0;
}
- rstr->refer.str = NULL;
- rstr->refer.str_len = 0;
- rstr->commit.str = NULL;
- rstr->commit.str_len = 0;
+ rstr->refer.iov_base = NULL;
+ rstr->refer.iov_len = 0;
+ rstr->commit.iov_base = NULL;
+ rstr->commit.iov_len = 0;
rstr->state = STRING_STATE_INIT;
}
@@ -219,18 +219,18 @@ enum string_state http_decoder_string_state(const struct http_decoder_string *rs
return rstr->state;
}
-int http_decoder_string_get(const struct http_decoder_string *rstr, struct hstring *out)
+int http_decoder_string_get(const struct http_decoder_string *rstr, hstring *out)
{
if (NULL == rstr || NULL == out) {
return -1;
}
if (http_decoder_string_state(rstr) == STRING_STATE_COMMIT) {
- out->str = rstr->commit.str;
- out->str_len = rstr->commit.str_len;
+ out->iov_base = rstr->commit.iov_base;
+ out->iov_len = rstr->commit.iov_len;
} else {
- out->str = NULL;
- out->str_len = 0;
+ out->iov_base = NULL;
+ out->iov_len = 0;
}
return 0;
@@ -242,15 +242,15 @@ void http_decoder_string_dump(struct http_decoder_string *rstr, const char *desc
return;
}
- char *refer_str = safe_dup(rstr->refer.str, rstr->refer.str_len);
- char *cache_str = safe_dup(rstr->cache.str, rstr->cache.str_len);
- char *commit_str = safe_dup(rstr->commit.str, rstr->commit.str_len);
+ char *refer_str = safe_dup((char *)rstr->refer.iov_base, rstr->refer.iov_len);
+ char *cache_str = safe_dup((char *)rstr->cache.iov_base, rstr->cache.iov_len);
+ char *commit_str = safe_dup((char *)rstr->commit.iov_base, rstr->commit.iov_len);
- printf("%s: state: %s, refer: {len: %02zu, str: %s}, cache: {len: %02zu, str: %s}, commit: {len: %02zu, str: %s}\n",
+ printf("%s: state: %s, refer: {len: %02zu, iov_base: %s}, cache: {len: %02zu, iov_base: %s}, commit: {len: %02zu, iov_base: %s}\n",
desc, string_state_to_desc(rstr->state),
- rstr->refer.str_len, refer_str,
- rstr->cache.str_len, cache_str,
- rstr->commit.str_len, commit_str);
+ rstr->refer.iov_len, refer_str,
+ rstr->cache.iov_len, cache_str,
+ rstr->commit.iov_len, commit_str);
FREE(refer_str);
FREE(cache_str);
diff --git a/src/http_decoder_string.h b/src/http_decoder_string.h
index 4c95960..9fe82e1 100644
--- a/src/http_decoder_string.h
+++ b/src/http_decoder_string.h
@@ -44,9 +44,9 @@ enum string_state {
//http decoder string
struct http_decoder_string {
- struct hstring refer; // shallow copy
- struct hstring cache; // deep copy
- struct hstring commit;
+ hstring refer; // shallow copy
+ hstring cache; // deep copy
+ hstring commit;
enum string_state state;
size_t max_cache_size;
@@ -68,7 +68,7 @@ void http_decoder_string_reinit(struct http_decoder_string *rstr);
enum string_state http_decoder_string_state(const struct http_decoder_string *rstr);
-int http_decoder_string_get(const struct http_decoder_string *rstr, struct hstring *out);
+int http_decoder_string_get(const struct http_decoder_string *rstr, hstring *out);
void http_decoder_string_dump(struct http_decoder_string *rstr, const char *desc);
#endif \ No newline at end of file
diff --git a/src/http_decoder_table.cpp b/src/http_decoder_table.cpp
index d1fc6e9..0ed6313 100644
--- a/src/http_decoder_table.cpp
+++ b/src/http_decoder_table.cpp
@@ -76,30 +76,30 @@ void http_decoder_table_free(struct http_decoder_table *table)
if (NULL == table) {
return;
}
- if (table->uri.cache.str != NULL) {
- FREE(table->uri.cache.str);
+ if (table->uri.cache.iov_base != NULL) {
+ FREE(table->uri.cache.iov_base);
}
- if (table->status.cache.str != NULL) {
- FREE(table->status.cache.str);
+ if (table->status.cache.iov_base != NULL) {
+ FREE(table->status.cache.iov_base);
}
- if (table->method.cache.str != NULL) {
- FREE(table->method.cache.str);
+ if (table->method.cache.iov_base != NULL) {
+ FREE(table->method.cache.iov_base);
}
- if (table->version.cache.str != NULL) {
- FREE(table->version.cache.str);
+ if (table->version.cache.iov_base != NULL) {
+ FREE(table->version.cache.iov_base);
}
- if (table->body.cache.str != NULL) {
- FREE(table->body.cache.str);
+ if (table->body.cache.iov_base != NULL) {
+ FREE(table->body.cache.iov_base);
}
if (table->headers != NULL) {
for (size_t i = 0; i < table->header_cnt; i++) {
- if (table->headers[i].key.cache.str != NULL) {
- FREE(table->headers[i].key.cache.str);
+ if (table->headers[i].key.cache.iov_base != NULL) {
+ FREE(table->headers[i].key.cache.iov_base);
}
- if (table->headers[i].val.cache.str != NULL) {
- FREE(table->headers[i].val.cache.str);
+ if (table->headers[i].val.cache.iov_base != NULL) {
+ FREE(table->headers[i].val.cache.iov_base);
}
}
@@ -381,7 +381,7 @@ void http_decoder_table_dump(struct http_decoder_table *table)
}
}
-int http_decoder_table_get_uri(const struct http_decoder_table *table, struct hstring *out)
+int http_decoder_table_get_uri(const struct http_decoder_table *table, hstring *out)
{
if (NULL == table || NULL == out) {
return -1;
@@ -389,7 +389,7 @@ int http_decoder_table_get_uri(const struct http_decoder_table *table, struct hs
return http_decoder_string_get(&table->uri, out);
}
-int http_decoder_table_get_method(const struct http_decoder_table *table, struct hstring *out)
+int http_decoder_table_get_method(const struct http_decoder_table *table, hstring *out)
{
if (NULL == table || NULL == out) {
return -1;
@@ -397,7 +397,7 @@ int http_decoder_table_get_method(const struct http_decoder_table *table, struct
return http_decoder_string_get(&table->method, out);
}
-int http_decoder_table_get_status(const struct http_decoder_table *table, struct hstring *out)
+int http_decoder_table_get_status(const struct http_decoder_table *table, hstring *out)
{
if (NULL == table || NULL == out) {
return -1;
@@ -405,7 +405,7 @@ int http_decoder_table_get_status(const struct http_decoder_table *table, struct
return http_decoder_string_get(&table->status, out);
}
-int http_decoder_table_get_version(const struct http_decoder_table *table, struct hstring *out)
+int http_decoder_table_get_version(const struct http_decoder_table *table, hstring *out)
{
if (NULL == table || NULL == out) {
return -1;
@@ -413,7 +413,7 @@ int http_decoder_table_get_version(const struct http_decoder_table *table, struc
return http_decoder_string_get(&table->version, out);
}
-int http_decoder_table_get_body(const struct http_decoder_table *table, struct hstring *out)
+int http_decoder_table_get_body(const struct http_decoder_table *table, hstring *out)
{
if (NULL == table || NULL == out) {
return -1;
@@ -421,22 +421,22 @@ int http_decoder_table_get_body(const struct http_decoder_table *table, struct h
return http_decoder_string_get(&table->body, out);
}
-int http_decoder_table_get_header(const struct http_decoder_table *table, const struct hstring *key,
+int http_decoder_table_get_header(const struct http_decoder_table *table, const hstring *key,
struct http_header *hdr_result)
{
for (size_t i = 0; i < table->header_cnt; i++) {
const struct http_decoder_header *tmp_header = &table->headers[i];
- if (tmp_header->key.commit.str_len != key->str_len) {
+ if (tmp_header->key.commit.iov_len != key->iov_len) {
continue;
}
if (http_decoder_string_state(&tmp_header->key) == STRING_STATE_COMMIT &&
http_decoder_string_state(&tmp_header->val) == STRING_STATE_COMMIT) {
- struct hstring tmp_key;
+ hstring tmp_key;
http_decoder_string_get(&tmp_header->key, &tmp_key);
- if (tmp_key.str_len == key->str_len &&
- (0 == strncasecmp(tmp_key.str, key->str, key->str_len))) {
+ if (tmp_key.iov_len == key->iov_len &&
+ (0 == strncasecmp((char *)tmp_key.iov_base, (char *)key->iov_base, key->iov_len))) {
http_decoder_string_get(&tmp_header->key, &hdr_result->key);
http_decoder_string_get(&tmp_header->val, &hdr_result->val);
return 0;
@@ -468,10 +468,10 @@ int http_decoder_table_iter_header(struct http_decoder_table *table,
}
}
- hdr->key.str = NULL;
- hdr->key.str_len = 0;
- hdr->val.str = NULL;
- hdr->val.str_len = 0;
+ hdr->key.iov_base = NULL;
+ hdr->key.iov_len = 0;
+ hdr->val.iov_base = NULL;
+ hdr->val.iov_len = 0;
return -1;
}
diff --git a/src/http_decoder_table.h b/src/http_decoder_table.h
index 1272d3a..4c7792a 100644
--- a/src/http_decoder_table.h
+++ b/src/http_decoder_table.h
@@ -36,18 +36,18 @@ void http_decoder_table_reinit(struct http_decoder_table *table);
void http_decoder_table_dump(struct http_decoder_table *table);
-int http_decoder_table_get_uri(const struct http_decoder_table *table, struct hstring *out);
+int http_decoder_table_get_uri(const struct http_decoder_table *table, hstring *out);
-int http_decoder_table_get_method(const struct http_decoder_table *table, struct hstring *out);
+int http_decoder_table_get_method(const struct http_decoder_table *table, hstring *out);
-int http_decoder_table_get_status(const struct http_decoder_table *table, struct hstring *out);
+int http_decoder_table_get_status(const struct http_decoder_table *table, hstring *out);
-int http_decoder_table_get_version(const struct http_decoder_table *table, struct hstring *out);
+int http_decoder_table_get_version(const struct http_decoder_table *table, hstring *out);
-int http_decoder_table_get_body(const struct http_decoder_table *table, struct hstring *out);
+int http_decoder_table_get_body(const struct http_decoder_table *table, hstring *out);
int http_decoder_table_get_header(const struct http_decoder_table *table,
- const struct hstring *key,
+ const hstring *key,
struct http_header *hdr_res);
int http_decoder_table_iter_header(struct http_decoder_table *table,
diff --git a/src/http_decoder_tunnel.cpp b/src/http_decoder_tunnel.cpp
new file mode 100644
index 0000000..67445f3
--- /dev/null
+++ b/src/http_decoder_tunnel.cpp
@@ -0,0 +1,99 @@
+#include <assert.h>
+#include <stdio.h>
+#include <string.h>
+#include <strings.h>
+#include <unistd.h>
+#include "http_decoder_inc.h"
+#include "llhttp.h"
+
+struct http_tunnel_message
+{
+ enum http_tunnel_message_type type;
+ hstring tunnel_payload;
+};
+
+
+int httpd_tunnel_identify(int curdir, struct http_decoder_half_data *hfdata)
+{
+ if(PACKET_DIRECTION_C2S == curdir){
+ struct http_request_line reqline = {};
+ http_decoder_half_data_get_request_line(hfdata, &reqline);
+ if(0 == strncasecmp_safe("CONNECT", (char *)reqline.method.iov_base,
+ 7, reqline.method.iov_len)){
+ return 1;
+ }
+ }else{
+ struct http_response_line resline = {};
+ http_decoder_half_data_get_response_line(hfdata, &resline);
+ if(resline.status_code == HTTP_STATUS_OK
+ && 0 == strncasecmp_safe("Connection established", (char *)resline.status.iov_base,
+ strlen("Connection established"), resline.status.iov_len)){
+ return 1;
+ }
+ }
+
+ return 0;
+}
+
+int httpd_is_tunnel_session(const struct http_decoder_exdata *ex_data)
+{
+ return (ex_data->tunnel_state != HTTP_TUN_NON);
+}
+
+int httpd_in_tunnel_transmitting(struct http_decoder_exdata *ex_data)
+{
+ return (ex_data->tunnel_state >= HTTP_TUN_INNER_STARTING);
+}
+
+enum http_tunnel_message_type httpd_tunnel_state_to_msg(const struct http_decoder_exdata *ex_data)
+{
+ if(ex_data->tunnel_state == HTTP_TUN_INNER_STARTING){
+ return HTTP_TUNNEL_OPENING;
+ }
+ if(ex_data->tunnel_state == HTTP_TUN_INNER_TRANS){
+ return HTTP_TUNNEL_ACTIVE;
+ }
+ return HTTP_TUNNEL_MSG_MAX;
+}
+
+void httpd_tunnel_state_update(struct http_decoder_exdata *ex_data)
+{
+ if(ex_data->tunnel_state == HTTP_TUN_INNER_STARTING){
+ ex_data->tunnel_state = HTTP_TUN_INNER_TRANS;
+ }
+}
+
+void http_decoder_push_tunnel_data(struct session *sess, const struct http_decoder_exdata *exdata, enum http_tunnel_message_type type)
+{
+ struct http_tunnel_message *tmsg = (struct http_tunnel_message *)CALLOC(struct http_tunnel_message, 1);
+ tmsg->type = type;
+ size_t payload_len;
+ const char *payload = session_get0_current_payload(sess, &payload_len);
+ tmsg->tunnel_payload.iov_base = (char *)payload;
+ tmsg->tunnel_payload.iov_len = payload_len;
+ session_mq_publish_message(sess, exdata->pub_topic_id, tmsg);
+}
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+void http_tunnel_message_get_payload(const struct http_tunnel_message *tmsg,
+ hstring *tunnel_payload)
+{
+ if (unlikely(NULL == tmsg || tunnel_payload == NULL))
+ {
+ return;
+ }
+ tunnel_payload->iov_base = tmsg->tunnel_payload.iov_base;
+ tunnel_payload->iov_len = tmsg->tunnel_payload.iov_len;
+}
+
+enum http_tunnel_message_type http_tunnel_message_type_get(const struct http_tunnel_message *tmsg)
+{
+ return tmsg->type;
+}
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/src/http_decoder_tunnel.h b/src/http_decoder_tunnel.h
new file mode 100644
index 0000000..b1e74df
--- /dev/null
+++ b/src/http_decoder_tunnel.h
@@ -0,0 +1,21 @@
+#pragma once
+
+#include "http_decoder_half.h"
+#include "llhttp.h"
+
+enum http_tunnel_state{
+ HTTP_TUN_NON = 0, // init, or not tunnel session
+ HTTP_TUN_C2S_HDR_START, //CONNECT ...
+ HTTP_TUN_C2S_HDR_END, //CONNECT request all heades end with \r\n
+ HTTP_TUN_S2C_START, // HTTP 200 connet established
+ // HTTP_TUN_S2C_END, // http response all heades end with \r\n
+ HTTP_TUN_INNER_STARTING, // http inner tunnel protocol starting
+ HTTP_TUN_INNER_TRANS, // http inner tunnel protocol transmitting
+};
+
+int httpd_tunnel_identify(int curdir, struct http_decoder_half_data *hfdata);
+int httpd_is_tunnel_session(const struct http_decoder_exdata *ex_data);
+int httpd_in_tunnel_transmitting(struct http_decoder_exdata *ex_data);
+void httpd_tunnel_state_update(struct http_decoder_exdata *ex_data);
+void http_decoder_push_tunnel_data(struct session *sess, const struct http_decoder_exdata *exdata, enum http_tunnel_message_type type);
+enum http_tunnel_message_type httpd_tunnel_state_to_msg(const struct http_decoder_exdata *ex_data); \ No newline at end of file
diff --git a/src/http_decoder_utils.cpp b/src/http_decoder_utils.cpp
index 5686e2d..6d71bdb 100644
--- a/src/http_decoder_utils.cpp
+++ b/src/http_decoder_utils.cpp
@@ -12,6 +12,17 @@ char *safe_dup(const char *str, size_t len)
return dup;
}
+int strncasecmp_safe(const char *fix_s1, const char *dyn_s2, size_t fix_n1, size_t dyn_n2)
+{
+ if (fix_s1 == NULL || dyn_s2 == NULL) {
+ return -1;
+ }
+ if(fix_n1 != dyn_n2){
+ return -1;
+ }
+ return strncasecmp(fix_s1, dyn_s2, fix_n1);
+}
+
const char *http_message_type_to_string(enum http_message_type type)
{
const char *sname = "unknown_msg_type";
@@ -134,4 +145,13 @@ int http_event_is_req(enum http_event event)
break;
}
return -1;
+}
+
+int stellar_session_mq_get_topic_id_reliable(struct stellar *st, const char *topic_name, session_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
+{
+ int topic_id = stellar_session_mq_get_topic_id(st, topic_name);
+ if(topic_id < 0){
+ topic_id = stellar_session_mq_create_topic(st, topic_name, msg_free_cb, msg_free_arg);
+ }
+ return topic_id;
} \ No newline at end of file
diff --git a/src/http_decoder_utils.h b/src/http_decoder_utils.h
index 0661641..5b09d50 100644
--- a/src/http_decoder_utils.h
+++ b/src/http_decoder_utils.h
@@ -5,9 +5,11 @@
#include <stdio.h>
char *safe_dup(const char *str, size_t len);
+int strncasecmp_safe(const char *fix_s1, const char *dyn_s2, size_t fix_n1, size_t dyn_n2);
const char *http_message_type_to_string(enum http_message_type type);
int http_message_type_is_req(struct session *sess, enum http_message_type msg_type);
int http_event_is_req(enum http_event event);
+int stellar_session_mq_get_topic_id_reliable(struct stellar *st, const char *topic_name, session_msg_free_cb_func *msg_free_cb, void *msg_free_arg);
/******************************************************************************
* Logger
******************************************************************************/
diff --git a/src/version.map b/src/version.map
index be433c2..15d1d95 100644
--- a/src/version.map
+++ b/src/version.map
@@ -5,6 +5,7 @@ global:
http_decoder_init;
http_decoder_exit;
http_decoder_tcp_stream_msg_cb;
+ http_tunnel_message_*;
};
local: *;
}; \ No newline at end of file