summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--conf/gtest_entry.toml1
-rw-r--r--include/http_decoder.h71
-rw-r--r--src/CMakeLists.txt2
-rw-r--r--src/http_decoder.cpp408
-rw-r--r--src/http_decoder_half.cpp139
-rw-r--r--src/http_decoder_half.h23
-rw-r--r--src/http_decoder_inc.h36
-rw-r--r--src/http_decoder_stat.cpp43
-rw-r--r--src/http_decoder_stat.h16
-rw-r--r--src/http_decoder_string.cpp118
-rw-r--r--src/http_decoder_string.h8
-rw-r--r--src/http_decoder_table.cpp56
-rw-r--r--src/http_decoder_table.h12
-rw-r--r--src/http_decoder_tunnel.cpp99
-rw-r--r--src/http_decoder_tunnel.h21
-rw-r--r--src/http_decoder_utils.cpp20
-rw-r--r--src/http_decoder_utils.h2
-rw-r--r--src/version.map1
-rw-r--r--test/http_decoder_gtest.cpp156
-rw-r--r--test/http_decoder_perf_main.cpp56
-rw-r--r--test/http_decoder_perf_plug.cpp40
-rw-r--r--test/http_pcap/http_tunnel_for_http.pcapbin0 -> 4233 bytes
-rw-r--r--test/test_result_json/http_inner_tunnel_for_http.json15
-rw-r--r--test/test_result_json/http_inner_tunnel_for_pop3.json15
-rw-r--r--test/test_result_json/http_msg_type_state_tunnel.json26
-rw-r--r--test/test_result_json/http_tunnel_for_http.json62
-rw-r--r--test/test_result_json/http_tunnel_for_pop3.json5
-rw-r--r--test_based_on_stellar/CMakeLists.txt38
28 files changed, 1043 insertions, 446 deletions
diff --git a/conf/gtest_entry.toml b/conf/gtest_entry.toml
index 03c5660..35e32ca 100644
--- a/conf/gtest_entry.toml
+++ b/conf/gtest_entry.toml
@@ -1,2 +1,3 @@
[entry]
name="http_decoder_test_state_entry"
+topic="HTTP_DECODER_MESSAGE"
diff --git a/include/http_decoder.h b/include/http_decoder.h
index 9ee6ff4..7439016 100644
--- a/include/http_decoder.h
+++ b/include/http_decoder.h
@@ -1,10 +1,12 @@
-#ifndef _HTTP_DECODER_H_
-#define _HTTP_DECODER_H_
+#pragma once
#ifdef __cplusplus
extern "C"
{
#endif
+#include <bits/types/struct_iovec.h>
+
+typedef struct iovec hstring;
enum http_message_type {
HTTP_TRANSACTION_NEW,
@@ -22,87 +24,88 @@ enum http_message_type {
HTTP_MESSAGE_RES_BODY_END,
HTTP_TRANSACTION_FREE,
-
- HTTP_MESSAGE_MAX
-};
-struct hstring {
- char *str;
- size_t str_len;
+ HTTP_MESSAGE_MAX
};
struct http_header {
- struct hstring key;
- struct hstring val;
+ struct iovec key;
+ struct iovec val;
};
struct http_request_line {
- struct hstring method;
- struct hstring uri;
- struct hstring version;
+ struct iovec method;
+ struct iovec uri;
+ struct iovec version;
int major_version;
int minor_version;
};
struct http_response_line {
- struct hstring version;
- struct hstring status;
+ struct iovec version;
+ struct iovec status;
int major_version;
int minor_version;
int status_code;
};
struct http_message;
-#define HTTP_DECODER_TOPIC "HTTP_DECODER_MESSAGE"
+#define HTTP_DECODER_TOPIC "HTTP_DECODER_MESSAGE"
enum http_message_type http_message_type_get(const struct http_message *msg);
-void http_message_get_request_line(const struct http_message *msg,
- struct http_request_line *line);
+void http_message_get_request_line(const struct http_message *msg, struct http_request_line *line);
-void http_message_get_response_line(const struct http_message *msg,
- struct http_response_line *line);
+void http_message_get_response_line(const struct http_message *msg, struct http_response_line *line);
/*
* Pay attention: key->str is case-insensitive.
*/
-void http_message_get_header(const struct http_message *msg,
- const struct hstring *key,
- struct http_header *hdr_result);
+void http_message_get_header(const struct http_message *msg, const struct iovec *key, struct http_header *hdr_result);
/**
* @brief loop reading all headers.
*
* @retval succeed( >= 0) failed(-1)
*/
-int http_message_header_next(const struct http_message *msg,
- struct http_header *header);
+int http_message_header_next(const struct http_message *msg, struct http_header *header);
/**
* @retval succeed( >= 0) failed(-1)
*/
int http_message_reset_header_iter(struct http_message *msg);
-
-void http_message_get_raw_body(const struct http_message *msg,
- struct hstring *body);
+void http_message_get_raw_body(const struct http_message *msg, struct iovec *body);
/**
* @brief If the body hasn't been compressed, same as http_message_get_raw_body().
*
*/
-void http_message_get_decompress_body(const struct http_message *msg,
- struct hstring *body);
+void http_message_get_decompress_body(const struct http_message *msg, struct iovec *body);
-void http_message_get_url(const struct http_message *msg,
- struct hstring *url);
+void http_message_get_url(const struct http_message *msg, struct iovec *url);
/**
* @retval succeed( >= 0) failed(-1)
*/
int http_message_get_transaction_seq(const struct http_message *msg);
+
+
+/************************************************************
+* HTTP TUNNEL WITH CONNECT METHOD.
+*************************************************************/
+struct http_tunnel_message;
+#define HTTP_DECODER_TUNNEL_TOPIC "HTTP_DECODER_TUNNEL_MESSAGE"
+
+enum http_tunnel_message_type {
+ HTTP_TUNNEL_OPENING,
+ HTTP_TUNNEL_ACTIVE,
+ HTTP_TUNNEL_CLOSING,
+ HTTP_TUNNEL_MSG_MAX
+};
+enum http_tunnel_message_type http_tunnel_message_type_get(const struct http_tunnel_message *tmsg);
+void http_tunnel_message_get_payload(const struct http_tunnel_message *tmsg, struct iovec *tunnel_payload);
+
#ifdef __cplusplus
}
#endif
-
-#endif \ No newline at end of file
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 1d3c949..469591b 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -10,7 +10,7 @@ aux_source_directory(${PROJECT_SOURCE_DIR}/deps/toml DEPS_SRC)
set(HTTP_SRC ${DEPS_SRC} http_decoder.cpp http_decoder_utils.cpp http_decoder_half.cpp
http_decoder_table.cpp http_decoder_string.cpp http_content_decompress.cpp
- http_decoder_result_queue.cpp http_decoder_stat.cpp)
+ http_decoder_result_queue.cpp http_decoder_stat.cpp http_decoder_tunnel.cpp)
add_library(http_decoder SHARED ${HTTP_SRC})
set_target_properties(http_decoder PROPERTIES LINK_FLAGS "-Wl,--version-script=${PROJECT_SOURCE_DIR}/src/version.map")
diff --git a/src/http_decoder.cpp b/src/http_decoder.cpp
index 8aa39e9..3af4dad 100644
--- a/src/http_decoder.cpp
+++ b/src/http_decoder.cpp
@@ -1,3 +1,4 @@
+#include "http_decoder.h"
#include <assert.h>
#include <stdio.h>
#include <string.h>
@@ -36,6 +37,7 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d
struct http_decoder_half_data *half_data = NULL;
int ret = 0;
u_int8_t flow_flag = 0;
+ struct http_decoder_exdata *exdata = ev_ctx->ref_httpd_ctx;
int thread_id = stellar_get_current_thread_id(httpd_env->st);
@@ -74,44 +76,53 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d
}
*data = half_data;
queue_idx = http_decoder_result_queue_req_index(queue); //get the index after inc
-#if 1 /* llhttp always call on_message_begin() even if llhttp_execute() error!!! */
+ /* llhttp always call on_message_begin() even if llhttp_execute() error!!! */
msg = http_message_new(HTTP_TRANSACTION_NEW, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_NEW, 1);
-#endif
break;
case HTTP_EVENT_REQ_LINE:
-#if 0
- msg = http_message_new(HTTP_TRANSACTION_NEW, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
- http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_NEW, 1);
-#endif
msg = http_message_new(HTTP_MESSAGE_REQ_LINE, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
+ if(httpd_tunnel_identify(PACKET_DIRECTION_C2S, half_data)){
+ exdata->tunnel_state = HTTP_TUN_C2S_HDR_START;
+ }
+ http_decoder_get_url(half_data, mempool);
break;
case HTTP_EVENT_REQ_HDR:
msg = http_message_new(HTTP_MESSAGE_REQ_HEADER, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
break;
case HTTP_EVENT_REQ_HDR_END:
{
- http_decoder_join_url_finally(ev_ctx, http_decoder_result_queue_peek_req(queue), mempool);
+ http_decoder_join_url_finally(ev_ctx, half_data, mempool);
/* maybe some parsed headers in buffer, but has not pushed to plugins yet */
- half_data = http_decoder_result_queue_peek_req(queue);
+
if(http_decoder_half_data_has_parsed_header(half_data)){
msg = http_message_new(HTTP_MESSAGE_REQ_HEADER, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
}
http_half_data_update_commit_index(half_data);
msg = http_message_new(HTTP_MESSAGE_REQ_HEADER_END, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
int tot_c2s_headers = http_half_data_get_total_parsed_header_count(half_data);
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_HEADERS_C2S, tot_c2s_headers);
- struct hstring tmp_url = {};
+ hstring tmp_url = {};
http_half_data_get_url(half_data, &tmp_url);
- http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_URL_BYTES, tmp_url.str_len);
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_URL_BYTES, tmp_url.iov_len);
+
+ if(httpd_is_tunnel_session(exdata)){
+ session_is_symmetric(ev_ctx->ref_session, &flow_flag);
+ if(SESSION_SEEN_C2S_FLOW == flow_flag){
+ exdata->tunnel_state = HTTP_TUN_INNER_STARTING;
+ http_half_pre_context_free(ev_ctx->ref_session, exdata);
+ exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].sub_topic_id;
+ }else{
+ exdata->tunnel_state = HTTP_TUN_C2S_HDR_END;
+ }
+ }
}
break;
case HTTP_EVENT_REQ_BODY_BEGIN:
@@ -119,18 +130,18 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d
break;
case HTTP_EVENT_REQ_BODY_DATA:
msg = http_message_new(HTTP_MESSAGE_REQ_BODY, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
break;
case HTTP_EVENT_REQ_BODY_END:
msg = http_message_new(HTTP_MESSAGE_REQ_BODY_END, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
break;
case HTTP_EVENT_REQ_END:
{
session_is_symmetric(ev_ctx->ref_session, &flow_flag);
if(SESSION_SEEN_C2S_FLOW == flow_flag){
msg = http_message_new(HTTP_TRANSACTION_FREE, queue, queue_idx, HTTP_REQUEST);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_FREE, 1);
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_ASYMMETRY_TRANSACTION_C2S, 1);
}
@@ -173,17 +184,23 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d
if(0 == session_is_symmetric(ev_ctx->ref_session, &flow_flag)){
if(SESSION_SEEN_S2C_FLOW == flow_flag){
msg = http_message_new(HTTP_TRANSACTION_NEW, queue, queue_idx, HTTP_RESPONSE);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
}
}
break;
case HTTP_EVENT_RES_LINE:
msg = http_message_new(HTTP_MESSAGE_RES_LINE, queue, queue_idx, HTTP_RESPONSE);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
+ if(httpd_tunnel_identify(PACKET_DIRECTION_S2C, half_data)){
+ exdata->tunnel_state = HTTP_TUN_S2C_START;
+ }else{
+ //connect response fail, reset tunnel_state
+ exdata->tunnel_state = HTTP_TUN_NON;
+ }
break;
case HTTP_EVENT_RES_HDR:
msg = http_message_new(HTTP_MESSAGE_RES_HEADER, queue, queue_idx, HTTP_RESPONSE);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
break;
case HTTP_EVENT_RES_HDR_END:
{
@@ -191,29 +208,36 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d
half_data = http_decoder_result_queue_peek_res(queue);
if(http_decoder_half_data_has_parsed_header(half_data)){
msg = http_message_new(HTTP_MESSAGE_RES_HEADER, queue, queue_idx, HTTP_RESPONSE);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
}
http_half_data_update_commit_index(half_data);
msg = http_message_new(HTTP_MESSAGE_RES_HEADER_END, queue, queue_idx, HTTP_RESPONSE);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
int tot_s2c_headers = http_half_data_get_total_parsed_header_count(half_data);
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_HEADERS_S2C, tot_s2c_headers);
+
+ if(httpd_is_tunnel_session(exdata)){
+ exdata->tunnel_state = HTTP_TUN_INNER_STARTING;
+ http_half_pre_context_free(ev_ctx->ref_session, exdata);
+ exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].sub_topic_id;
+ // http_decoder_push_tunnel_data(ev_ctx->ref_session, exdata, HTTP_TUNNEL_OPENING);
+ }
}
break;
case HTTP_EVENT_RES_BODY_BEGIN:
break;
case HTTP_EVENT_RES_BODY_DATA:
msg = http_message_new(HTTP_MESSAGE_RES_BODY, queue, queue_idx, HTTP_RESPONSE);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
break;
case HTTP_EVENT_RES_BODY_END:
msg = http_message_new(HTTP_MESSAGE_RES_BODY_END, queue, queue_idx, HTTP_RESPONSE);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
break;
case HTTP_EVENT_RES_END:
msg = http_message_new(HTTP_TRANSACTION_FREE, queue, queue_idx, HTTP_RESPONSE);
- session_mq_publish_message(ev_ctx->ref_session, ev_ctx->topic_id, msg);
+ session_mq_publish_message(ev_ctx->ref_session, exdata->pub_topic_id, msg);
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TRANSACTION_FREE, 1);
session_is_symmetric(ev_ctx->ref_session, &flow_flag);
if(SESSION_SEEN_S2C_FLOW == flow_flag){
@@ -237,12 +261,14 @@ static void http_event_handler(enum http_event event, struct http_decoder_half_d
}
}
-static struct http_decoder *http_decoder_new(nmx_pool_t *mempool, http_event_cb *ev_cb, int decompress_switch, struct http_decoder_env *httpd_env)
+static struct http_decoder *http_decoder_new(struct http_decoder_exdata *hd_ctx, nmx_pool_t *mempool,
+ http_event_cb *ev_cb, int decompress_switch, struct http_decoder_env *httpd_env,
+ long long req_start_seq, long long res_start_seq)
{
struct http_decoder *decoder = MEMPOOL_CALLOC(mempool, struct http_decoder, 1);
assert(decoder);
- decoder->c2s_half = http_decoder_half_new(mempool, ev_cb, HTTP_REQUEST, decompress_switch, httpd_env);
- decoder->s2c_half = http_decoder_half_new(mempool, ev_cb, HTTP_RESPONSE, decompress_switch, httpd_env);
+ decoder->c2s_half = http_decoder_half_new(hd_ctx, mempool, ev_cb, HTTP_REQUEST, decompress_switch, httpd_env, req_start_seq);
+ decoder->s2c_half = http_decoder_half_new(hd_ctx, mempool, ev_cb, HTTP_RESPONSE, decompress_switch, httpd_env, res_start_seq);
return decoder;
}
@@ -266,13 +292,15 @@ static void http_decoder_free(nmx_pool_t *mempool, struct http_decoder *decoder)
}
static struct http_decoder_exdata *http_decoder_exdata_new(size_t mempool_size, size_t queue_size,
- int decompress_switch, struct http_decoder_env *httpd_env)
+ int decompress_switch, struct http_decoder_env *httpd_env,
+ long long req_start_seq, long long res_start_seq)
{
- struct http_decoder_exdata *ex_data = CALLOC(struct http_decoder_exdata, 1);
- ex_data->mempool = nmx_create_pool(mempool_size);
- ex_data->decoder = http_decoder_new(ex_data->mempool, http_event_handler, decompress_switch, httpd_env);
- ex_data->queue = http_decoder_result_queue_new(ex_data->mempool, queue_size);
- return ex_data;
+ struct http_decoder_exdata *hd_ctx = CALLOC(struct http_decoder_exdata, 1);
+ hd_ctx->mempool = nmx_create_pool(mempool_size);
+ hd_ctx->decoder = http_decoder_new(hd_ctx, hd_ctx->mempool, http_event_handler, decompress_switch,
+ httpd_env, req_start_seq, res_start_seq);
+ hd_ctx->queue = http_decoder_result_queue_new(hd_ctx->mempool, queue_size);
+ return hd_ctx;
}
static void http_decoder_exdata_free(struct http_decoder_exdata *ex_data)
@@ -291,8 +319,9 @@ static void http_decoder_exdata_free(struct http_decoder_exdata *ex_data)
http_decoder_result_queue_free(ex_data->mempool, ex_data->queue);
ex_data->queue = NULL;
}
-
- nmx_destroy_pool(ex_data->mempool);
+ if(ex_data->mempool){
+ nmx_destroy_pool(ex_data->mempool);
+ }
FREE(ex_data);
}
@@ -310,7 +339,7 @@ static int http_protocol_identify(const char *data, size_t data_len)
{
return -1;
}
- return 0;
+ return 1;
}
static void _http_decoder_context_free(struct http_decoder_env *env)
@@ -327,10 +356,10 @@ static void _http_decoder_context_free(struct http_decoder_env *env)
http_decoder_stat_free(&env->hd_stat);
- if (env->httpd_msg_topic_id >= 0)
- {
- stellar_session_mq_destroy_topic(env->st, env->httpd_msg_topic_id);
- env->httpd_msg_topic_id = -1;
+ for(int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++){
+ if(env->topic_exdata_compose[i].msg_free_cb){
+ stellar_session_mq_destroy_topic(env->st, env->topic_exdata_compose[i].sub_topic_id);
+ }
}
FREE(env);
@@ -412,7 +441,7 @@ static int load_http_decoder_config(const char *cfg_path,
return ret;
}
-static int http_msg_get_request_header(const struct http_message *msg, const struct hstring *key,
+static int http_msg_get_request_header(const struct http_message *msg, const hstring *key,
struct http_header *hdr_result)
{
const struct http_decoder_half_data *req_data =
@@ -420,7 +449,7 @@ static int http_msg_get_request_header(const struct http_message *msg, const str
return http_decoder_half_data_get_header(req_data, key, hdr_result);
}
-static int http_msg_get_response_header(const struct http_message *msg, const struct hstring *key,
+static int http_msg_get_response_header(const struct http_message *msg, const hstring *key,
struct http_header *hdr_result)
{
const struct http_decoder_half_data *res_data =
@@ -445,7 +474,7 @@ static int http_msg_response_header_next(const struct http_message *msg,
}
static int http_msg_get_request_raw_body(const struct http_message *msg,
- struct hstring *body)
+ hstring *body)
{
const struct http_decoder_half_data *req_data =
msg->ref_queue->array[msg->queue_index].req_data;
@@ -453,7 +482,7 @@ static int http_msg_get_request_raw_body(const struct http_message *msg,
}
static int http_msg_get_response_raw_body(const struct http_message *msg,
- struct hstring *body)
+ hstring *body)
{
const struct http_decoder_half_data *res_data =
msg->ref_queue->array[msg->queue_index].res_data;
@@ -461,7 +490,7 @@ static int http_msg_get_response_raw_body(const struct http_message *msg,
}
static int http_msg_get_request_decompress_body(const struct http_message *msg,
- struct hstring *body)
+ hstring *body)
{
const struct http_decoder_half_data *req_data =
msg->ref_queue->array[msg->queue_index].req_data;
@@ -469,19 +498,32 @@ static int http_msg_get_request_decompress_body(const struct http_message *msg,
}
static int http_msg_get_response_decompress_body(const struct http_message *msg,
- struct hstring *body)
+ hstring *body)
{
const struct http_decoder_half_data *res_data =
msg->ref_queue->array[msg->queue_index].res_data;
return http_decoder_half_data_get_decompress_body(res_data, body);
}
+static struct http_decoder_exdata *httpd_session_exdata_new(struct session *sess, struct http_decoder_env *httpd_env,
+ long long req_start_seq, long long res_start_seq)
+{
+ struct http_decoder_exdata *exdata = http_decoder_exdata_new(httpd_env->hd_cfg.mempool_size,
+ httpd_env->hd_cfg.result_queue_len,
+ httpd_env->hd_cfg.decompress_switch,
+ httpd_env,req_start_seq,res_start_seq);
+ // exdata->sub_topic_id = sub_topic_id;
+ int thread_id = stellar_get_current_thread_id(httpd_env->st);
+ http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_SESSION_NEW, 1);
+ return exdata;
+}
+
#ifdef __cplusplus
extern "C"
{
#endif
- void _httpd_ex_data_free_cb(struct session *s, int idx, void *ex_data, void *arg)
+ void httpd_ex_data_free_cb(struct session *s, int idx, void *ex_data, void *arg)
{
if (NULL == ex_data)
{
@@ -491,7 +533,7 @@ extern "C"
http_decoder_exdata_free(exdata);
}
- void *_httpd_session_ctx_new_cb(struct session *sess, void *plugin_env)
+ void *httpd_session_ctx_new_cb(struct session *sess, void *plugin_env)
{
// If not http, ignore this session
size_t payload_len;
@@ -504,31 +546,23 @@ extern "C"
if (ret < 0)
{
stellar_session_plugin_dettach_current_session(sess);
- return (void *)"not_http_session";
+ return (void *)"__not_http_session__";
}
}
- struct http_decoder_exdata *ex_data = http_decoder_exdata_new(httpd_env->hd_cfg.mempool_size,
- httpd_env->hd_cfg.result_queue_len,
- httpd_env->hd_cfg.decompress_switch,
- httpd_env);
- session_exdata_set(sess, httpd_env->ex_data_idx, ex_data);
- int thread_id = stellar_get_current_thread_id(httpd_env->st);
- http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_SESSION_NEW, 1);
-
- return (void *)"fake_http_decoder_ctx"; // http decoder not use ctx, use exdata only!
+ return (void *)"__fake_http_decoder_ctx__";
}
- void _httpd_session_ctx_free_cb(struct session *sess, void *session_ctx, void *plugin_env)
+ void httpd_session_ctx_free_cb(struct session *sess, void *session_ctx, void *plugin_env)
{
if(NULL == plugin_env || NULL == session_ctx){
return;
}
- if(strncmp((const char *)session_ctx, "not_http_session", strlen("not_http_session")) == 0){
+ if(strncmp((const char *)session_ctx, "__not_http_session__", strlen("__not_http_session__")) == 0){
return;
}
struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env;
- int thread_id = stellar_get_current_thread_id(httpd_env->st);
+ int thread_id = session_get_current_thread_id(sess);
unsigned char flow_flag = 0;
session_is_symmetric(sess, &flow_flag);
@@ -541,53 +575,189 @@ extern "C"
}
}
- void http_decoder_tcp_stream_msg_cb(struct session *sess, int topic_id, const void *msg, void *no_use_ctx, void *plugin_env)
+ static void http_decoder_execute(struct session *sess, struct http_decoder_env *httpd_env, http_decoder_exdata *exdata)
{
- struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env;
- struct http_decoder_exdata *ex_data = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->ex_data_idx);
size_t payload_len;
-
- if(SESSION_STATE_CLOSING == session_get_current_state(sess)){
- http_half_pre_context_free(sess, httpd_env, ex_data);
- return;
- }
-
const char *payload = session_get0_current_payload(sess, &payload_len);
- if (unlikely(0 == payload_len || NULL == ex_data))
+ if (unlikely(0 == payload_len || NULL == payload))
{
return;
}
- int thread_id = stellar_get_current_thread_id(httpd_env->st);
- struct http_decoder_half *cur_half = NULL;
+ if(httpd_in_tunnel_transmitting(exdata)){
+ http_decoder_push_tunnel_data(sess, exdata, httpd_tunnel_state_to_msg(exdata));
+ httpd_tunnel_state_update(exdata);
+ return;
+ }
- if (PACKET_DIRECTION_C2S == packet_get_direction(session_get0_current_packet(sess)))
+ int thread_id = session_get_current_thread_id(sess);
+ struct http_decoder_half *cur_half = NULL;
+ int sess_dir = packet_get_direction(session_get0_current_packet(sess));
+ if (PACKET_DIRECTION_C2S == sess_dir)
{
- cur_half = ex_data->decoder->c2s_half;
+ cur_half = exdata->decoder->c2s_half;
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_BYTES_C2S, payload_len);
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TCP_SEG_C2S, 1);
}
else
{
- cur_half = ex_data->decoder->s2c_half;
+ cur_half = exdata->decoder->s2c_half;
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_BYTES_S2C, payload_len);
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_TCP_SEG_S2C, 1);
}
- http_decoder_half_reinit(cur_half, httpd_env->httpd_msg_topic_id, ex_data->queue,
- ex_data->mempool, sess);
+ http_decoder_half_reinit(cur_half, exdata->queue, exdata->mempool, sess);
int ret = http_decoder_half_parse(cur_half, payload, payload_len);
if (ret < 0)
{
http_decoder_stat_update(&httpd_env->hd_stat, thread_id, HTTPD_STAT_PARSE_ERR, 1);
stellar_session_plugin_dettach_current_session(sess);
}
+ }
+
+ void http_decoder_tunnel_msg_cb(struct session *sess, int topic_id, const void *tmsg, void *per_session_ctx, void *plugin_env)
+ {
+ struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env;
+ struct http_decoder_exdata *exdata = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].exdata_id);
+ enum http_tunnel_message_type tmsg_type = http_tunnel_message_type_get((const struct http_tunnel_message *)tmsg);
+ switch (tmsg_type)
+ {
+ case HTTP_TUNNEL_OPENING:
+ {
+ if(NULL != exdata){
+ //not support nested http tunnel
+ session_mq_ignore_message(sess, topic_id, httpd_env->plugin_id);
+ return;
+ }
+ size_t payload_len;
+ const char *payload = session_get0_current_payload(sess, &payload_len);
+ size_t http_identify_len = payload_len > HTTP_IDENTIFY_LEN ? HTTP_IDENTIFY_LEN : payload_len;
+ int is_http = http_protocol_identify(payload, http_identify_len);
+ if(is_http){
+ long long max_req_seq = 0, max_res_seq = 0;
+ struct http_decoder_exdata *tcp_stream_exdata = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_TCP_STREAM_INDEX].exdata_id);
+ http_half_get_max_transaction_seq(tcp_stream_exdata, &max_req_seq, &max_res_seq);
+ exdata = httpd_session_exdata_new(sess, httpd_env, max_req_seq, max_res_seq);
+ session_exdata_set(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].exdata_id, exdata);
+ exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_MSG_INDEX].sub_topic_id;
+ exdata->in_tunnel_is_http = 1;
+ }else{
+ exdata = CALLOC(struct http_decoder_exdata, 1);
+ exdata->decoder = NULL;
+ exdata->pub_topic_id = -1;
+ exdata->in_tunnel_is_http = 0;
+ session_exdata_set(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_TUNNEL_INDEX].exdata_id, exdata);
+ //do nothing, but can't call stellar_session_plugin_dettach_current_session() !!!
+ return;
+ }
+ }
+ break;
+
+ case HTTP_TUNNEL_ACTIVE:
+ break;
+
+ case HTTP_TUNNEL_CLOSING:
+ if(exdata->in_tunnel_is_http){
+ http_half_pre_context_free(sess, exdata);
+ }
+ return;
+ break;
+
+ default:
+ break;
+ }
+ if(exdata->in_tunnel_is_http){
+ http_decoder_execute(sess, httpd_env, exdata);
+ }
+ return;
+ }
+
+ void http_decoder_tcp_stream_msg_cb(struct session *sess, int topic_id, const void *msg, void *nouse_session_ctx, void *plugin_env)
+ {
+ struct http_decoder_env *httpd_env = (struct http_decoder_env *)plugin_env;
+ enum session_state sess_state = session_get_current_state(sess);
+ struct http_decoder_exdata *exdata = (struct http_decoder_exdata *)session_exdata_get(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_TCP_STREAM_INDEX].exdata_id);
+
+ switch(sess_state){
+ case SESSION_STATE_OPENING:
+ {
+ exdata = httpd_session_exdata_new(sess, httpd_env, 0, 0);
+ exdata->pub_topic_id = httpd_env->topic_exdata_compose[HTTPD_TOPIC_HTTP_MSG_INDEX].sub_topic_id;
+ session_exdata_set(sess, httpd_env->topic_exdata_compose[HTTPD_TOPIC_TCP_STREAM_INDEX].exdata_id, exdata);
+ //go on
+ }
+ break;
+
+ case SESSION_STATE_ACTIVE:
+ //go on
+ break;
+
+ case SESSION_STATE_CLOSING:
+ {
+ if(httpd_in_tunnel_transmitting(exdata)){
+ http_decoder_push_tunnel_data(sess, exdata, HTTP_TUNNEL_CLOSING);
+ }else{
+ http_half_pre_context_free(sess, exdata);
+ }
+ return;
+ }
+ break;
+
+ default:
+ return;
+ break;
+ }
+
+ http_decoder_execute(sess, httpd_env, exdata);
+
return;
}
+static const struct http_topic_exdata_compose g_topic_exdata_compose[HTTPD_TOPIC_INDEX_MAX] =
+{
+ {HTTPD_TOPIC_TCP_STREAM_INDEX, TOPIC_TCP_STREAM, http_decoder_tcp_stream_msg_cb, NULL, "HTTP_DECODER_EXDATA_BASEON_TCP_STREAM", httpd_ex_data_free_cb, -1, -1},
+ {HTTPD_TOPIC_HTTP_MSG_INDEX, HTTP_DECODER_TOPIC, NULL, http_message_free, NULL, NULL, -1, -1},
+ {HTTPD_TOPIC_HTTP_TUNNEL_INDEX, HTTP_DECODER_TUNNEL_TOPIC, http_decoder_tunnel_msg_cb, http_message_free, "HTTP_DECODER_EXDATA_BASEON_HTTP_TUNNEL", httpd_ex_data_free_cb, -1, -1},
+};
+
+ static void http_decoder_topic_exdata_compose_init(struct http_decoder_env *httpd_env)
+ {
+ memcpy(httpd_env->topic_exdata_compose, g_topic_exdata_compose, sizeof(g_topic_exdata_compose));
+ for(int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++){
+ httpd_env->topic_exdata_compose[i].sub_topic_id = stellar_session_mq_get_topic_id_reliable(httpd_env->st,
+ httpd_env->topic_exdata_compose[i].topic_name,
+ httpd_env->topic_exdata_compose[i].msg_free_cb,
+ NULL);
+ assert(httpd_env->topic_exdata_compose[i].sub_topic_id >= 0);
+
+ if(httpd_env->topic_exdata_compose[i].exdata_name){
+ httpd_env->topic_exdata_compose[i].exdata_id = stellar_session_exdata_new_index(httpd_env->st,
+ httpd_env->topic_exdata_compose[i].exdata_name,
+ httpd_env->topic_exdata_compose[i].exdata_free_cb,
+ NULL);
+ assert(httpd_env->topic_exdata_compose[i].exdata_id >= 0);
+ }
+
+ if(httpd_env->topic_exdata_compose[i].on_msg_cb){
+ stellar_session_mq_subscribe(httpd_env->st, httpd_env->topic_exdata_compose[i].sub_topic_id,
+ httpd_env->topic_exdata_compose[i].on_msg_cb, httpd_env->plugin_id);
+ }
+ }
+ }
+
+ int http_topic_exdata_compose_get_index(const struct http_decoder_env *httpd_env, int by_topic_id)
+ {
+ for(int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++){
+ if(httpd_env->topic_exdata_compose[i].sub_topic_id == by_topic_id){
+ return i;
+ }
+ }
+ assert(0);
+ return -1;
+ }
+
void *http_decoder_init(struct stellar *st)
{
- int httpd_msg_topic_id = -1, tcp_stream_topic_id = -1;
int thread_num = 0;
struct http_decoder_env *httpd_env = CALLOC(struct http_decoder_env, 1);
@@ -597,27 +767,13 @@ extern "C"
goto failed;
}
httpd_env->st = st;
- httpd_env->ex_data_idx = stellar_session_exdata_new_index(st, "HTTP_DECODER",
- _httpd_ex_data_free_cb,
- NULL);
- httpd_env->plugin_id = stellar_session_plugin_register(st, _httpd_session_ctx_new_cb,
- _httpd_session_ctx_free_cb, (void *)httpd_env);
+ httpd_env->plugin_id = stellar_session_plugin_register(st, httpd_session_ctx_new_cb,
+ httpd_session_ctx_free_cb, (void *)httpd_env);
if (httpd_env->plugin_id < 0)
{
goto failed;
}
-
- httpd_msg_topic_id = stellar_session_mq_get_topic_id(st, HTTP_DECODER_TOPIC);
- if (httpd_msg_topic_id < 0)
- {
- httpd_msg_topic_id = stellar_session_mq_create_topic(st, HTTP_DECODER_TOPIC,
- http_message_free, NULL);
- }
- httpd_env->httpd_msg_topic_id = httpd_msg_topic_id;
-
- tcp_stream_topic_id = stellar_session_mq_get_topic_id(st, TOPIC_TCP_STREAM);
- assert(tcp_stream_topic_id >= 0);
- stellar_session_mq_subscribe(st, tcp_stream_topic_id, http_decoder_tcp_stream_msg_cb, httpd_env->plugin_id);
+ http_decoder_topic_exdata_compose_init(httpd_env);
thread_num = stellar_get_worker_thread_num(st);
assert(thread_num >= 1);
@@ -626,9 +782,15 @@ extern "C"
{
goto failed;
}
-
- printf("http_decoder_init succ: ex_data_idx:%d, plugin_id:%d, topic_id:%d\n",
- httpd_env->ex_data_idx, httpd_env->plugin_id, httpd_env->httpd_msg_topic_id);
+
+ printf("http decoder init succ, plugin id:%d \n", httpd_env->plugin_id);
+ for(int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++){
+ printf("\ttopic_name:%s, topic_id:%d, ex_data_name:%s, exdata_id:%d\n",
+ httpd_env->topic_exdata_compose[i].topic_name,
+ httpd_env->topic_exdata_compose[i].sub_topic_id,
+ httpd_env->topic_exdata_compose[i].exdata_name,
+ httpd_env->topic_exdata_compose[i].exdata_id);
+ }
return httpd_env;
failed:
@@ -662,9 +824,9 @@ extern "C"
if (unlikely(NULL == msg || msg->type != HTTP_MESSAGE_REQ_LINE))
{
if(line){
- line->method.str = NULL;
- line->uri.str = NULL;
- line->version.str = NULL;
+ line->method.iov_base = NULL;
+ line->uri.iov_base = NULL;
+ line->version.iov_base = NULL;
}
return;
}
@@ -683,8 +845,8 @@ extern "C"
if (unlikely(NULL == msg || msg->type != HTTP_MESSAGE_RES_LINE))
{
if(line){
- line->version.str = NULL;
- line->status.str = NULL;
+ line->version.iov_base = NULL;
+ line->status.iov_base = NULL;
}
return;
}
@@ -697,7 +859,7 @@ extern "C"
http_decoder_half_data_get_response_line(res_data, line);
}
- void http_message_get_header(const struct http_message *msg, const struct hstring *key,
+ void http_message_get_header(const struct http_message *msg, const hstring *key,
struct http_header *hdr_result)
{
int ret = -1;
@@ -720,8 +882,8 @@ extern "C"
}
fail:
if(hdr_result){
- hdr_result->key.str = NULL;
- hdr_result->val.str = NULL;
+ hdr_result->key.iov_base = NULL;
+ hdr_result->val.iov_base = NULL;
}
return;
}
@@ -750,8 +912,8 @@ extern "C"
return 0;
fail:
if(header){
- header->key.str = NULL;
- header->val.str = NULL;
+ header->key.iov_base = NULL;
+ header->val.iov_base = NULL;
}
return -1;
}
@@ -780,7 +942,7 @@ extern "C"
}
void http_message_get_raw_body(const struct http_message *msg,
- struct hstring *body)
+ hstring *body)
{
int ret = -1;
if (unlikely(NULL == msg))
@@ -803,14 +965,14 @@ extern "C"
return;
fail:
if(body){
- body->str = NULL;
- body->str_len = 0;
+ body->iov_base = NULL;
+ body->iov_len = 0;
}
return;
}
void http_message_get_decompress_body(const struct http_message *msg,
- struct hstring *body)
+ hstring *body)
{
int ret = -1;
if (unlikely(NULL == msg))
@@ -833,19 +995,19 @@ extern "C"
return;
fail:
if(body){
- body->str = NULL;
- body->str_len = 0;
+ body->iov_base = NULL;
+ body->iov_len = 0;
}
return;
}
- void http_message_get_url(const struct http_message *msg, struct hstring *url)
+ void http_message_get_url(const struct http_message *msg, hstring *url)
{
if (unlikely(NULL == msg))
{
if(url){
- url->str = NULL;
- url->str_len = 0;
+ url->iov_base = NULL;
+ url->iov_len = 0;
}
return;
}
@@ -862,8 +1024,8 @@ extern "C"
fail:
if(url){
- url->str = NULL;
- url->str_len = 0;
+ url->iov_base = NULL;
+ url->iov_len = 0;
}
return;
}
diff --git a/src/http_decoder_half.cpp b/src/http_decoder_half.cpp
index 441ff2c..6074307 100644
--- a/src/http_decoder_half.cpp
+++ b/src/http_decoder_half.cpp
@@ -21,7 +21,7 @@ struct http_decoder_half_data
size_t decompress_body_len;
int joint_url_complete;
- struct hstring joint_url; // http://<host>[:<port>]/<path>?<searchpart>
+ hstring joint_url; // http://<host>[:<port>]/<path>?<searchpart>
long long transaction_index;
};
@@ -42,7 +42,7 @@ struct http_decoder_half
long long trans_counter;
long long err_counter;
- long long transaction_seq;
+ long long transaction_seq; //accumulated
const char *data;
int data_len;
@@ -77,9 +77,9 @@ http_decoder_half_data_decompress(struct http_decoder_half_data *data)
return;
}
- struct hstring raw_body = {0};
+ hstring raw_body = {0};
http_decoder_table_get_body(data->table, &raw_body);
- if (raw_body.str == NULL || raw_body.str_len == 0)
+ if (raw_body.iov_base == NULL || raw_body.iov_len == 0)
{
return;
}
@@ -90,8 +90,8 @@ http_decoder_half_data_decompress(struct http_decoder_half_data *data)
}
assert(data->decompress);
- if (http_content_decompress_write(data->decompress, raw_body.str,
- raw_body.str_len,
+ if (http_content_decompress_write(data->decompress, (char *)raw_body.iov_base,
+ raw_body.iov_len,
&data->ref_decompress_body,
&data->decompress_body_len) == -1)
{
@@ -226,12 +226,12 @@ static int on_uri(llhttp_t *http, const char *at, size_t length)
return 0;
}
-static void http_decoder_cached_portion_url(struct http_decoder_half *half, const struct hstring *uri_result)
+static void http_decoder_cached_portion_url(struct http_decoder_half *half, const hstring *uri_result)
{
struct http_decoder_half_data *ref_data = half->ref_data;
int uri_skip_len = 0;
- if ((uri_result->str_len) > 7 && (strncasecmp("http://", uri_result->str, 7) == 0)) // absolute URI
+ if ((uri_result->iov_len) > 7 && (strncasecmp("http://", (char *)uri_result->iov_base, 7) == 0)) // absolute URI
{
uri_skip_len = strlen("http://");
ref_data->joint_url_complete = 1;
@@ -241,9 +241,9 @@ static void http_decoder_cached_portion_url(struct http_decoder_half *half, cons
ref_data->joint_url_complete = 0;
}
- ref_data->joint_url.str_len = uri_result->str_len - uri_skip_len;
- ref_data->joint_url.str = MEMPOOL_CALLOC(half->http_ev_ctx->ref_mempool, char, ref_data->joint_url.str_len);
- memcpy(ref_data->joint_url.str, uri_result->str + uri_skip_len, ref_data->joint_url.str_len);
+ ref_data->joint_url.iov_len = uri_result->iov_len - uri_skip_len;
+ ref_data->joint_url.iov_base = MEMPOOL_CALLOC(half->http_ev_ctx->ref_mempool, char, ref_data->joint_url.iov_len);
+ memcpy(ref_data->joint_url.iov_base, (char *)uri_result->iov_base + uri_skip_len, ref_data->joint_url.iov_len);
}
/* Information-only callbacks, return value is ignored */
@@ -261,9 +261,9 @@ static int on_uri_complete(llhttp_t *http)
http_decoder_table_commit(half->ref_data->table, HTTP_ITEM_URI);
- struct hstring uri_result = {};
+ hstring uri_result = {};
http_decoder_table_get_uri(half->ref_data->table, &uri_result);
- assert(uri_result.str);
+ assert(uri_result.iov_base);
http_decoder_cached_portion_url(half, &uri_result);
return 0;
@@ -408,17 +408,17 @@ static int on_header_value_complete(llhttp_t *http)
if (half->ref_data->content_encoding == HTTP_CONTENT_ENCODING_NONE)
{
struct http_header http_hdr = {0};
- struct hstring key = {.str = (char *)"Content-Encoding", .str_len = 16};
+ hstring key = {.iov_base = (char *)"Content-Encoding", .iov_len = 16};
if (http_decoder_table_get_header(half->ref_data->table, &key, &http_hdr) == 0)
{
char encoding_str[MAX_ENCODING_STR_LEN + 1] = {0};
- size_t str_len = http_hdr.val.str_len;
- if (str_len > MAX_ENCODING_STR_LEN)
+ size_t iov_len = http_hdr.val.iov_len;
+ if (iov_len > MAX_ENCODING_STR_LEN)
{
- str_len = MAX_ENCODING_STR_LEN;
+ iov_len = MAX_ENCODING_STR_LEN;
}
- memcpy(encoding_str, http_hdr.val.str, str_len);
+ memcpy(encoding_str, http_hdr.val.iov_base, iov_len);
half->ref_data->content_encoding = http_content_encoding_str2int(encoding_str);
}
}
@@ -539,8 +539,7 @@ static int on_body(llhttp_t *http, const char *at, size_t length)
return 0;
}
-static void http_decoder_half_init(struct http_decoder_half *half,
- http_event_cb *http_ev_cb, enum llhttp_type type)
+static void http_decoder_half_init(struct http_decoder_half *half, http_event_cb *http_ev_cb, enum llhttp_type type)
{
llhttp_settings_init(&half->settings);
llhttp_init(&half->parser, type, &half->settings);
@@ -579,8 +578,9 @@ static void http_decoder_half_init(struct http_decoder_half *half,
half->ref_data = NULL;
}
-struct http_decoder_half * http_decoder_half_new(nmx_pool_t *mempool, http_event_cb *ev_cb, enum llhttp_type http_type,
- int decompress_switch, struct http_decoder_env *httpd_env)
+struct http_decoder_half * http_decoder_half_new(struct http_decoder_exdata *hd_ctx, nmx_pool_t *mempool,
+ http_event_cb *ev_cb, enum llhttp_type http_type,
+ int decompress_switch, struct http_decoder_env *httpd_env, long long start_seq)
{
struct http_decoder_half *half = MEMPOOL_CALLOC(mempool, struct http_decoder_half, 1);
assert(half);
@@ -588,8 +588,9 @@ struct http_decoder_half * http_decoder_half_new(nmx_pool_t *mempool, http_event
half->decompress_switch = decompress_switch;
half->http_ev_ctx = MEMPOOL_CALLOC(mempool, struct http_event_context, 1);
http_decoder_half_init(half, ev_cb, http_type);
-
+ half->http_ev_ctx->ref_httpd_ctx = hd_ctx;
half->httpd_env = httpd_env;
+ half->transaction_seq = start_seq;
return half;
}
@@ -609,7 +610,7 @@ void http_decoder_half_free(nmx_pool_t *mempool, struct http_decoder_half *half)
MEMPOOL_FREE(mempool, half);
}
-void http_decoder_half_reinit(struct http_decoder_half *half, int topic_id,
+void http_decoder_half_reinit(struct http_decoder_half *half,
struct http_decoder_result_queue *queue,
nmx_pool_t *mempool, struct session *sess)
{
@@ -618,8 +619,6 @@ void http_decoder_half_reinit(struct http_decoder_half *half, int topic_id,
{
http_decoder_table_reinit(half->ref_data->table);
}
-
- half->http_ev_ctx->topic_id = topic_id;
half->http_ev_ctx->ref_mempool = mempool;
half->http_ev_ctx->ref_session = sess;
half->http_ev_ctx->ref_queue = queue;
@@ -810,10 +809,10 @@ void http_decoder_half_data_free(nmx_pool_t *mempool, struct http_decoder_half_d
data->decompress = NULL;
}
- if (data->joint_url.str)
+ if (data->joint_url.iov_base)
{
- MEMPOOL_FREE(mempool, data->joint_url.str);
- data->joint_url.str = NULL;
+ MEMPOOL_FREE(mempool, data->joint_url.iov_base);
+ data->joint_url.iov_base = NULL;
data->joint_url_complete = 0;
}
MEMPOOL_FREE(mempool, data);
@@ -846,7 +845,7 @@ int http_decoder_half_data_get_response_line(struct http_decoder_half_data *data
}
int http_decoder_half_data_get_header(const struct http_decoder_half_data *data,
- const struct hstring *key,
+ const hstring *key,
struct http_header *hdr_result)
{
return http_decoder_table_get_header(data->table, key, hdr_result);
@@ -877,7 +876,7 @@ int http_decoder_half_data_has_parsed_header(struct http_decoder_half_data *data
}
int http_decoder_half_data_get_raw_body(const struct http_decoder_half_data *data,
- struct hstring *body)
+ hstring *body)
{
if (NULL == data || NULL == body)
{
@@ -887,15 +886,15 @@ int http_decoder_half_data_get_raw_body(const struct http_decoder_half_data *dat
}
int http_decoder_half_data_get_decompress_body(const struct http_decoder_half_data *data,
- struct hstring *body)
+ hstring *body)
{
if (HTTP_CONTENT_ENCODING_NONE == data->content_encoding)
{
return http_decoder_table_get_body(data->table, body);
}
- body->str = data->ref_decompress_body;
- body->str_len = data->decompress_body_len;
+ body->iov_base = data->ref_decompress_body;
+ body->iov_len = data->decompress_body_len;
return 0;
}
@@ -923,17 +922,17 @@ static void using_session_addr_as_host(struct session *ref_session,
char ip_string_buf[INET6_ADDRSTRLEN];
if (SESSION_ADDR_TYPE_IPV4_TCP == ssaddr_type || SESSION_ADDR_TYPE_IPV4_UDP == ssaddr_type)
{
- host_result->val.str = MEMPOOL_CALLOC(mempool, char, (INET_ADDRSTRLEN + 7) /* "ip:port" max length */);
+ host_result->val.iov_base = MEMPOOL_CALLOC(mempool, char, (INET_ADDRSTRLEN + 7) /* "ip:port" max length */);
inet_ntop(AF_INET, &ssaddr->ipv4.daddr, ip_string_buf, INET_ADDRSTRLEN);
- sprintf(host_result->val.str, "%s:%u", ip_string_buf, ntohs(ssaddr->ipv4.dport));
- host_result->val.str_len = strlen(host_result->val.str);
+ sprintf((char *)host_result->val.iov_base, "%s:%u", ip_string_buf, ntohs(ssaddr->ipv4.dport));
+ host_result->val.iov_len = strlen((char *)host_result->val.iov_base);
}
else if (SESSION_ADDR_TYPE_IPV6_TCP == ssaddr_type || SESSION_ADDR_TYPE_IPV6_UDP == ssaddr_type)
{
- host_result->val.str = MEMPOOL_CALLOC(mempool, char, (INET6_ADDRSTRLEN + 7) /* "ip:port" max length */);
+ host_result->val.iov_base = MEMPOOL_CALLOC(mempool, char, (INET6_ADDRSTRLEN + 7) /* "ip:port" max length */);
inet_ntop(AF_INET6, &ssaddr->ipv6.daddr, ip_string_buf, INET6_ADDRSTRLEN);
- sprintf(host_result->val.str, "%s:%u", ip_string_buf, ntohs(ssaddr->ipv6.dport));
- host_result->val.str_len = strlen(host_result->val.str);
+ sprintf((char *)host_result->val.iov_base, "%s:%u", ip_string_buf, ntohs(ssaddr->ipv6.dport));
+ host_result->val.iov_len = strlen((char *)host_result->val.iov_base);
}
else
{
@@ -944,30 +943,43 @@ static void using_session_addr_as_host(struct session *ref_session,
void http_decoder_join_url(struct http_decoder_half_data *hfdata, nmx_pool_t *mempool, const struct http_header *host_hdr)
{
int append_slash_len = 0;
- if ('/' != hfdata->joint_url.str[0])
+ if ('/' != ((char *)hfdata->joint_url.iov_base)[0])
{
append_slash_len = 1;
}
- int url_cache_str_len = host_hdr->val.str_len + hfdata->joint_url.str_len + append_slash_len;
+ int url_cache_str_len = host_hdr->val.iov_len + hfdata->joint_url.iov_len + append_slash_len;
char *url_cache_str = MEMPOOL_CALLOC(mempool, char, url_cache_str_len);
char *ptr = url_cache_str;
- memcpy(ptr, host_hdr->val.str, host_hdr->val.str_len);
- ptr += host_hdr->val.str_len;
+ memcpy(ptr, host_hdr->val.iov_base, host_hdr->val.iov_len);
+ ptr += host_hdr->val.iov_len;
if (append_slash_len)
{
*ptr = '/';
ptr++;
}
- memcpy(ptr, hfdata->joint_url.str, hfdata->joint_url.str_len);
+ memcpy(ptr, hfdata->joint_url.iov_base, hfdata->joint_url.iov_len);
- MEMPOOL_FREE(mempool, hfdata->joint_url.str); // free the cached uri buffer
- hfdata->joint_url.str = url_cache_str;
- hfdata->joint_url.str_len = url_cache_str_len;
+ MEMPOOL_FREE(mempool, hfdata->joint_url.iov_base); // free the cached uri buffer
+ hfdata->joint_url.iov_base = url_cache_str;
+ hfdata->joint_url.iov_len = url_cache_str_len;
hfdata->joint_url_complete = 1;
}
+void http_decoder_get_url(struct http_decoder_half_data *hfdata, nmx_pool_t *mempool)
+{
+ struct http_request_line reqline = {};
+ http_decoder_half_data_get_request_line(hfdata, &reqline);
+ if(unlikely(strncasecmp_safe("CONNECT", (char *)reqline.method.iov_base, 7, reqline.method.iov_len) == 0))
+ {
+ hfdata->joint_url.iov_base = MEMPOOL_CALLOC(mempool, char, reqline.uri.iov_len+1);
+ memcpy(hfdata->joint_url.iov_base, reqline.uri.iov_base, reqline.uri.iov_len);
+ hfdata->joint_url.iov_len = reqline.uri.iov_len;
+ hfdata->joint_url_complete = 1;
+ }
+}
+
int http_decoder_join_url_finally(struct http_event_context *ev_ctx,
struct http_decoder_half_data *hfdata,
nmx_pool_t *mempool)
@@ -981,14 +993,14 @@ int http_decoder_join_url_finally(struct http_event_context *ev_ctx,
using_session_addr_as_host(ev_ctx->ref_session, &addr_as_host, mempool);
http_decoder_join_url(hfdata, mempool, &addr_as_host);
- MEMPOOL_FREE(mempool, addr_as_host.val.str); // free session addr to host buffer
+ MEMPOOL_FREE(mempool, addr_as_host.val.iov_base); // free session addr to host buffer
return 1;
}
void http_decoder_get_host_feed_url(struct http_decoder_half *half)
{
struct http_header host_result = {};
- struct hstring host_key = {(char *)"Host", 4};
+ hstring host_key = {(char *)"Host", 4};
const char *host_refer_str = NULL;
int host_refer_len = 0;
@@ -1007,14 +1019,14 @@ void http_decoder_get_host_feed_url(struct http_decoder_half *half)
http_decoder_join_url(half->ref_data, half->http_ev_ctx->ref_mempool, &host_result);
}
-int http_half_data_get_url(struct http_decoder_half_data *res_data, struct hstring *url)
+int http_half_data_get_url(struct http_decoder_half_data *res_data, hstring *url)
{
if (0 == res_data->joint_url_complete)
{
return -1;
}
- url->str = res_data->joint_url.str;
- url->str_len = res_data->joint_url.str_len;
+ url->iov_base = res_data->joint_url.iov_base;
+ url->iov_len = res_data->joint_url.iov_len;
return 0;
}
@@ -1033,24 +1045,20 @@ int http_half_data_get_total_parsed_header_count(struct http_decoder_half_data *
return http_decoder_table_get_total_parsed_header(half_data->table);
}
-void http_half_pre_context_free(struct session *sess, struct http_decoder_env *httpd_env,
- struct http_decoder_exdata *ex_data)
+void http_half_pre_context_free(struct session *sess, struct http_decoder_exdata *exdata)
{
- if (NULL == ex_data)
- {
- return;
- }
struct http_message *msg = NULL;
struct http_decoder_half_data *req_data;
struct http_decoder_half_data *res_data;
- struct http_decoder_result_queue *queue = ex_data->queue;
+ struct http_decoder_result_queue *queue = exdata->queue;
+
for(int i = 0; i < queue->queue_size; i++){
req_data = queue->array[i].req_data;
res_data = queue->array[i].res_data;
if ((req_data != NULL) && (NULL == res_data) && (req_data->state < HTTP_EVENT_REQ_END))
{
msg = http_message_new(HTTP_TRANSACTION_FREE, queue, i, HTTP_REQUEST);
- session_mq_publish_message(sess, httpd_env->httpd_msg_topic_id, msg);
+ session_mq_publish_message(sess, exdata->pub_topic_id, msg);
}
}
@@ -1059,7 +1067,7 @@ void http_half_pre_context_free(struct session *sess, struct http_decoder_env *h
if ((res_data != NULL) && (res_data->state < HTTP_EVENT_RES_END))
{
msg = http_message_new(HTTP_TRANSACTION_FREE, queue, i, HTTP_RESPONSE);
- session_mq_publish_message(sess, httpd_env->httpd_msg_topic_id, msg);
+ session_mq_publish_message(sess, exdata->pub_topic_id, msg);
}
}
}
@@ -1067,4 +1075,11 @@ void http_half_pre_context_free(struct session *sess, struct http_decoder_env *h
void http_half_update_state(struct http_decoder_half_data *hf_data, enum http_event state)
{
hf_data->state = state;
+}
+
+void http_half_get_max_transaction_seq(struct http_decoder_exdata *exdata, long long *max_req_seq, long long *max_res_seq)
+{
+ assert(exdata && max_req_seq && max_res_seq);
+ *max_req_seq = exdata->decoder->c2s_half->transaction_seq;
+ *max_res_seq = exdata->decoder->s2c_half->transaction_seq;
} \ No newline at end of file
diff --git a/src/http_decoder_half.h b/src/http_decoder_half.h
index e918e6e..5183171 100644
--- a/src/http_decoder_half.h
+++ b/src/http_decoder_half.h
@@ -30,7 +30,7 @@ enum http_event {
};
struct http_event_context {
- int topic_id;
+ struct http_decoder_exdata *ref_httpd_ctx;
nmx_pool_t *ref_mempool;
struct session *ref_session;
struct http_decoder_result_queue *ref_queue;
@@ -43,12 +43,12 @@ typedef void http_event_cb(enum http_event event, struct http_decoder_half_data
struct http_event_context *ev_ctx, void *httpd_plugin_env);
struct http_decoder_half *
-http_decoder_half_new(nmx_pool_t *mempool, http_event_cb *event_cb, enum llhttp_type http_type,
- int decompress_switch, struct http_decoder_env *httpd_env);
+http_decoder_half_new(struct http_decoder_exdata *hd_ctx, nmx_pool_t *mempool, http_event_cb *event_cb,
+ enum llhttp_type http_type, int decompress_switch, struct http_decoder_env *httpd_env,long long start_seq);
void http_decoder_half_free(nmx_pool_t *mempool, struct http_decoder_half *half);
-void http_decoder_half_reinit(struct http_decoder_half *half, int topic_id,
+void http_decoder_half_reinit(struct http_decoder_half *half,
struct http_decoder_result_queue *queue,
nmx_pool_t *mempool, struct session *sess);
@@ -69,32 +69,33 @@ int http_decoder_half_data_get_response_line(struct http_decoder_half_data *data
struct http_response_line *line);
int http_decoder_half_data_get_header(const struct http_decoder_half_data *data,
- const struct hstring *key, struct http_header *hdr_res);
+ const hstring *key, struct http_header *hdr_res);
int http_decoder_half_data_iter_header(struct http_decoder_half_data *data,
struct http_header *header);
int http_decoder_half_data_reset_header_iter(struct http_decoder_half_data *req_data);
int http_decoder_half_data_has_parsed_header(struct http_decoder_half_data *data);
-int http_decoder_half_data_get_raw_body(const struct http_decoder_half_data *data, struct hstring *body);
+int http_decoder_half_data_get_raw_body(const struct http_decoder_half_data *data, hstring *body);
-int http_decoder_half_data_get_decompress_body(const struct http_decoder_half_data *data, struct hstring *body);
+int http_decoder_half_data_get_decompress_body(const struct http_decoder_half_data *data, hstring *body);
void http_decoder_half_data_dump(struct http_decoder_half *half);
void http_decoder_get_host_feed_url(struct http_decoder_half *half);
+void http_decoder_get_url(struct http_decoder_half_data *hfdata, nmx_pool_t *mempool);
void http_decoder_join_url(struct http_decoder_half_data *hfdata,
nmx_pool_t *mempool,
const struct http_header *host_hdr);
int http_decoder_join_url_finally(struct http_event_context *ev_ctx,
struct http_decoder_half_data *hfdata,
nmx_pool_t *mempool);
-int http_half_data_get_url(struct http_decoder_half_data *res_data, struct hstring *url);
+int http_half_data_get_url(struct http_decoder_half_data *res_data, hstring *url);
int http_half_data_get_transaction_seq(struct http_decoder_half_data *hf_data);
void http_half_data_update_commit_index(struct http_decoder_half_data * half_data);
-void http_half_pre_context_free(struct session *sess, struct http_decoder_env *httpd_env,
- struct http_decoder_exdata *ex_data);
+void http_half_pre_context_free(struct session *sess, struct http_decoder_exdata *exdata);
void http_half_update_state(struct http_decoder_half_data *hf_data, enum http_event state);
-int http_half_data_get_total_parsed_header_count(struct http_decoder_half_data * half_data);
+int http_half_data_get_total_parsed_header_count(struct http_decoder_half_data * half_data);
+void http_half_get_max_transaction_seq(struct http_decoder_exdata *exdata, long long *max_req_seq, long long *max_res_seq);
#endif \ No newline at end of file
diff --git a/src/http_decoder_inc.h b/src/http_decoder_inc.h
index 760eaba..0e0f0c5 100644
--- a/src/http_decoder_inc.h
+++ b/src/http_decoder_inc.h
@@ -31,6 +31,7 @@ extern "C"
#include "http_decoder_result_queue.h"
#include "http_decoder_utils.h"
#include "http_decoder_stat.h"
+#include "http_decoder_tunnel.h"
#include "fieldstat/fieldstat_easy.h"
#include "toml/toml.h"
@@ -82,6 +83,7 @@ struct http_message
enum http_message_type type;
size_t queue_index;
struct http_decoder_result_queue *ref_queue;
+ hstring tunnel_payload;
};
struct http_decoder
@@ -90,19 +92,45 @@ struct http_decoder
struct http_decoder_half *s2c_half;
};
+enum httpd_topic_index{
+ HTTPD_TOPIC_TCP_STREAM_INDEX = 0,
+ HTTPD_TOPIC_HTTP_MSG_INDEX,
+ HTTPD_TOPIC_HTTP_TUNNEL_INDEX,
+ HTTPD_TOPIC_INDEX_MAX,
+};
+
struct http_decoder_exdata
{
+ int sub_topic_id; //tcp_stream
+ int pub_topic_id; //http message or http tunnel msg
struct http_decoder_result_queue *queue;
struct http_decoder *decoder;
nmx_pool_t *mempool;
+ enum http_tunnel_state tunnel_state;
+ int in_tunnel_is_http;
+};
+
+// struct http_decoder_context{
+// int array_size;
+// struct http_decoder_exdata **exdata_array; //raw tcp stream for http msg; http tunnel for inner http transaction.
+// };
+
+struct http_topic_exdata_compose{
+ enum httpd_topic_index index;
+ const char *topic_name;
+ on_session_msg_cb_func *on_msg_cb;
+ session_msg_free_cb_func *msg_free_cb;
+ const char *exdata_name;
+ session_exdata_free *exdata_free_cb;
+ int sub_topic_id; //as consumer
+ int exdata_id;
};
struct http_decoder_env
{
- int plugin_id;
- int httpd_msg_topic_id;
- int ex_data_idx;
struct stellar *st;
+ int plugin_id;
+ struct http_topic_exdata_compose topic_exdata_compose[HTTPD_TOPIC_INDEX_MAX];
struct http_decoder_config hd_cfg;
struct http_decoder_stat hd_stat;
};
@@ -111,7 +139,7 @@ struct http_message;
struct http_message *http_message_new(enum http_message_type type, struct http_decoder_result_queue *queue,
int queue_index, unsigned char flow_type);
-
+int http_topic_exdata_compose_get_index(const struct http_decoder_env *httpd_env, int by_topic_id);
#ifdef __cplusplus
}
#endif
diff --git a/src/http_decoder_stat.cpp b/src/http_decoder_stat.cpp
index dfd1a2e..a2b6205 100644
--- a/src/http_decoder_stat.cpp
+++ b/src/http_decoder_stat.cpp
@@ -1,5 +1,6 @@
#include <assert.h>
#include <stdio.h>
+#include <pthread.h>
#include <unistd.h>
#include "http_decoder_inc.h"
@@ -26,6 +27,12 @@ static const struct hd_stat_config_tuple g_httpd_stat_tuple[HTTPD_STAT_MAX] =
void http_decoder_stat_free(struct http_decoder_stat *hd_stat)
{
+ pthread_cancel(hd_stat->timer_pid);
+ void *join_res = NULL;
+ do{
+ pthread_join(hd_stat->timer_pid, &join_res);
+ }while(join_res != PTHREAD_CANCELED);
+
if(hd_stat->stats != NULL){
free(hd_stat->stats);
}
@@ -34,6 +41,19 @@ void http_decoder_stat_free(struct http_decoder_stat *hd_stat)
}
}
+static void *httpd_stat_timer_thread(void *arg)
+{
+ pthread_setname_np(pthread_self(), "http_decoder_timer_thread");
+ struct http_decoder_stat *hd_stat = (struct http_decoder_stat *)arg;
+ struct timespec res;
+ while(1){
+ clock_gettime(CLOCK_MONOTONIC, &res);
+ hd_stat->current_time_ms = (res.tv_sec * 1000) + (res.tv_nsec / 1000000);
+ usleep(800);
+ }
+ return NULL;
+}
+
int http_decoder_stat_init(struct http_decoder_stat *hd_stat, int thread_max, int stat_interval_pkts, int stat_interval_time)
{
assert(sizeof(g_httpd_stat_tuple)/sizeof(struct hd_stat_config_tuple) == HTTPD_STAT_MAX);
@@ -64,6 +84,8 @@ int http_decoder_stat_init(struct http_decoder_stat *hd_stat, int thread_max, in
return -1;
}
+ pthread_create(&hd_stat->timer_pid, NULL, httpd_stat_timer_thread, hd_stat);
+
return 0;
}
@@ -73,15 +95,16 @@ void http_decoder_stat_update(struct http_decoder_stat *hd_stat, int thread_id,
assert(thread_id >= 0);
assert(type < HTTPD_STAT_MAX);
- hd_stat->stats[thread_id].counter[type] += value;
- hd_stat->stats[thread_id].batch++;
+ struct hd_statistics *cur_hds = &hd_stat->stats[thread_id];
- if(hd_stat->stats[thread_id].batch >= hd_stat->stat_interval_pkts){
- for(int i = 0; i < HTTPD_STAT_MAX; i++){
- //update all type, maybe decrease performance ?
- fieldstat_easy_counter_incrby(hd_stat->fse, thread_id, hd_stat->field_stat_id[i], NULL, 0, hd_stat->stats[thread_id].counter[i]);
- hd_stat->stats[thread_id].counter[i] = 0;
- }
- hd_stat->stats[thread_id].batch = 0;
+ cur_hds->counter[type] += value;
+ cur_hds->batch[type]++;
+
+ if(cur_hds->batch[type] >= hd_stat->stat_interval_pkts
+ || cur_hds->time_ms[type] + 1000 < hd_stat->current_time_ms){
+ fieldstat_easy_counter_incrby(hd_stat->fse, thread_id, hd_stat->field_stat_id[type], NULL, 0, cur_hds->counter[type]);
+ cur_hds->counter[type] = 0;
+ cur_hds->batch[type] = 0;
+ cur_hds->time_ms[type] = hd_stat->current_time_ms;
}
-}
+} \ No newline at end of file
diff --git a/src/http_decoder_stat.h b/src/http_decoder_stat.h
index 235475d..e8f18d8 100644
--- a/src/http_decoder_stat.h
+++ b/src/http_decoder_stat.h
@@ -32,25 +32,19 @@ struct hd_stat_config_tuple
struct hd_statistics
{
- // long long incoming_bytes;
- // long long incoming_tcp_seg;
- // long long session_new;
- // long long session_free;
- // long long transaction_new;
- // long long transaction_free;
- // long long incoming_trans;
- // long long err_pkts;
-
+ long long time_ms[HTTPD_STAT_MAX];
long long counter[HTTPD_STAT_MAX];
- int batch; //call fieldstat_easy_counter_incrby() per batch
+ int batch[HTTPD_STAT_MAX]; //call fieldstat_easy_counter_incrby() per batch
}__attribute__ ((aligned (64)));
struct http_decoder_stat
{
+ pthread_t timer_pid;
+ long long current_time_ms;
struct fieldstat_easy *fse;
int stat_interval_pkts; // call fieldstat_incrby every stat_interval_pkts
int field_stat_id[HTTPD_STAT_MAX];
- struct hd_statistics *stats; //multi thread
+ struct hd_statistics *stats; //size is thread number
};
int http_decoder_stat_init(struct http_decoder_stat *hd_stat, int thread_max, int stat_interval_pkts, int stat_interval_time);
diff --git a/src/http_decoder_string.cpp b/src/http_decoder_string.cpp
index e10b4a0..c74030c 100644
--- a/src/http_decoder_string.cpp
+++ b/src/http_decoder_string.cpp
@@ -35,8 +35,8 @@ void http_decoder_string_refer(struct http_decoder_string *rstr,
switch (rstr->state) {
case STRING_STATE_INIT:
case STRING_STATE_CACHE:
- rstr->refer.str = (char *)at;
- rstr->refer.str_len = length;
+ rstr->refer.iov_base = (char *)at;
+ rstr->refer.iov_len = length;
break;
default:
abort();
@@ -48,60 +48,60 @@ void http_decoder_string_refer(struct http_decoder_string *rstr,
static void string_refer2cache(struct http_decoder_string *rstr)
{
- if (0 == rstr->refer.str_len) {
+ if (0 == rstr->refer.iov_len) {
return;
}
- if (rstr->cache.str_len >= rstr->max_cache_size) {
+ if (rstr->cache.iov_len >= rstr->max_cache_size) {
return;
}
- size_t length = rstr->cache.str_len + rstr->refer.str_len;
+ size_t length = rstr->cache.iov_len + rstr->refer.iov_len;
if (length > rstr->max_cache_size) {
length = rstr->max_cache_size;
}
- if (NULL == rstr->cache.str) {
- rstr->cache.str = CALLOC(char, length + 1);
- memcpy(rstr->cache.str, rstr->refer.str, length);
+ if (NULL == rstr->cache.iov_base) {
+ rstr->cache.iov_base = CALLOC(char, length + 1);
+ memcpy(rstr->cache.iov_base, rstr->refer.iov_base, length);
} else {
- rstr->cache.str = REALLOC(char, rstr->cache.str, length + 1);
- memcpy(rstr->cache.str + rstr->cache.str_len, rstr->refer.str,
- (length - rstr->cache.str_len));
+ rstr->cache.iov_base = REALLOC(char, rstr->cache.iov_base, length + 1);
+ memcpy((char *)rstr->cache.iov_base + rstr->cache.iov_len, rstr->refer.iov_base,
+ (length - rstr->cache.iov_len));
}
- rstr->cache.str_len = length;
- rstr->refer.str = NULL;
- rstr->refer.str_len = 0;
+ rstr->cache.iov_len = length;
+ rstr->refer.iov_base = NULL;
+ rstr->refer.iov_len = 0;
}
static void string_commit2cache(struct http_decoder_string *rstr)
{
- if (rstr->cache.str_len == rstr->commit.str_len &&
- rstr->cache.str == rstr->commit.str) {
- rstr->commit.str = NULL;
- rstr->commit.str_len = 0;
+ if (rstr->cache.iov_len == rstr->commit.iov_len &&
+ rstr->cache.iov_base == rstr->commit.iov_base) {
+ rstr->commit.iov_base = NULL;
+ rstr->commit.iov_len = 0;
return;
}
//Only http header key need to backward to cache
size_t length = 0;
- if (rstr->commit.str_len > rstr->max_cache_size) {
+ if (rstr->commit.iov_len > rstr->max_cache_size) {
length = rstr->max_cache_size;
} else {
- length = rstr->commit.str_len;
+ length = rstr->commit.iov_len;
}
if (length > 0) {
- if (NULL == rstr->cache.str) {
- rstr->cache.str = CALLOC(char, length + 1);
+ if (NULL == rstr->cache.iov_base) {
+ rstr->cache.iov_base = CALLOC(char, length + 1);
} else {
abort();
}
- memcpy(rstr->cache.str, rstr->commit.str, length);
- rstr->cache.str_len = length;
+ memcpy(rstr->cache.iov_base, rstr->commit.iov_base, length);
+ rstr->cache.iov_len = length;
- rstr->commit.str = NULL;
- rstr->commit.str_len = 0;
+ rstr->commit.iov_base = NULL;
+ rstr->commit.iov_len = 0;
}
}
@@ -136,24 +136,24 @@ void http_decoder_string_commit(struct http_decoder_string *rstr)
switch (rstr->state) {
case STRING_STATE_REFER:
- if (rstr->cache.str_len) {
+ if (rstr->cache.iov_len) {
http_decoder_string_cache(rstr);
- rstr->commit.str = rstr->cache.str;
- rstr->commit.str_len = rstr->cache.str_len;
- // not overwrite rstr->cache.str
+ rstr->commit.iov_base = rstr->cache.iov_base;
+ rstr->commit.iov_len = rstr->cache.iov_len;
+ // not overwrite rstr->cache.iov_base
} else {
- rstr->commit.str = rstr->refer.str;
- rstr->commit.str_len = rstr->refer.str_len;
+ rstr->commit.iov_base = rstr->refer.iov_base;
+ rstr->commit.iov_len = rstr->refer.iov_len;
- rstr->refer.str = NULL;
- rstr->refer.str_len = 0;
+ rstr->refer.iov_base = NULL;
+ rstr->refer.iov_len = 0;
}
break;
case STRING_STATE_CACHE:
- rstr->commit.str = rstr->cache.str;
- rstr->commit.str_len = rstr->cache.str_len;
- // not overwrite rstr->cache.str
+ rstr->commit.iov_base = rstr->cache.iov_base;
+ rstr->commit.iov_len = rstr->cache.iov_len;
+ // not overwrite rstr->cache.iov_base
break;
default:
//abort();
@@ -172,7 +172,7 @@ void http_decoder_string_reset(struct http_decoder_string *rstr)
case STRING_STATE_REFER:
case STRING_STATE_CACHE:
case STRING_STATE_COMMIT:
- FREE(rstr->cache.str);
+ FREE(rstr->cache.iov_base);
memset(rstr, 0, sizeof(struct http_decoder_string));
break;
default:
@@ -197,20 +197,20 @@ void http_decoder_string_reinit(struct http_decoder_string *rstr)
}
if (rstr->state == STRING_STATE_COMMIT &&
- rstr->cache.str == rstr->commit.str &&
- rstr->cache.str_len == rstr->commit.str_len) {
+ rstr->cache.iov_base == rstr->commit.iov_base &&
+ rstr->cache.iov_len == rstr->commit.iov_len) {
return;
}
- if (rstr->cache.str != NULL) {
- FREE(rstr->cache.str);
- rstr->cache.str_len = 0;
+ if (rstr->cache.iov_base != NULL) {
+ FREE(rstr->cache.iov_base);
+ rstr->cache.iov_len = 0;
}
- rstr->refer.str = NULL;
- rstr->refer.str_len = 0;
- rstr->commit.str = NULL;
- rstr->commit.str_len = 0;
+ rstr->refer.iov_base = NULL;
+ rstr->refer.iov_len = 0;
+ rstr->commit.iov_base = NULL;
+ rstr->commit.iov_len = 0;
rstr->state = STRING_STATE_INIT;
}
@@ -219,18 +219,18 @@ enum string_state http_decoder_string_state(const struct http_decoder_string *rs
return rstr->state;
}
-int http_decoder_string_get(const struct http_decoder_string *rstr, struct hstring *out)
+int http_decoder_string_get(const struct http_decoder_string *rstr, hstring *out)
{
if (NULL == rstr || NULL == out) {
return -1;
}
if (http_decoder_string_state(rstr) == STRING_STATE_COMMIT) {
- out->str = rstr->commit.str;
- out->str_len = rstr->commit.str_len;
+ out->iov_base = rstr->commit.iov_base;
+ out->iov_len = rstr->commit.iov_len;
} else {
- out->str = NULL;
- out->str_len = 0;
+ out->iov_base = NULL;
+ out->iov_len = 0;
}
return 0;
@@ -242,15 +242,15 @@ void http_decoder_string_dump(struct http_decoder_string *rstr, const char *desc
return;
}
- char *refer_str = safe_dup(rstr->refer.str, rstr->refer.str_len);
- char *cache_str = safe_dup(rstr->cache.str, rstr->cache.str_len);
- char *commit_str = safe_dup(rstr->commit.str, rstr->commit.str_len);
+ char *refer_str = safe_dup((char *)rstr->refer.iov_base, rstr->refer.iov_len);
+ char *cache_str = safe_dup((char *)rstr->cache.iov_base, rstr->cache.iov_len);
+ char *commit_str = safe_dup((char *)rstr->commit.iov_base, rstr->commit.iov_len);
- printf("%s: state: %s, refer: {len: %02zu, str: %s}, cache: {len: %02zu, str: %s}, commit: {len: %02zu, str: %s}\n",
+ printf("%s: state: %s, refer: {len: %02zu, iov_base: %s}, cache: {len: %02zu, iov_base: %s}, commit: {len: %02zu, iov_base: %s}\n",
desc, string_state_to_desc(rstr->state),
- rstr->refer.str_len, refer_str,
- rstr->cache.str_len, cache_str,
- rstr->commit.str_len, commit_str);
+ rstr->refer.iov_len, refer_str,
+ rstr->cache.iov_len, cache_str,
+ rstr->commit.iov_len, commit_str);
FREE(refer_str);
FREE(cache_str);
diff --git a/src/http_decoder_string.h b/src/http_decoder_string.h
index 4c95960..9fe82e1 100644
--- a/src/http_decoder_string.h
+++ b/src/http_decoder_string.h
@@ -44,9 +44,9 @@ enum string_state {
//http decoder string
struct http_decoder_string {
- struct hstring refer; // shallow copy
- struct hstring cache; // deep copy
- struct hstring commit;
+ hstring refer; // shallow copy
+ hstring cache; // deep copy
+ hstring commit;
enum string_state state;
size_t max_cache_size;
@@ -68,7 +68,7 @@ void http_decoder_string_reinit(struct http_decoder_string *rstr);
enum string_state http_decoder_string_state(const struct http_decoder_string *rstr);
-int http_decoder_string_get(const struct http_decoder_string *rstr, struct hstring *out);
+int http_decoder_string_get(const struct http_decoder_string *rstr, hstring *out);
void http_decoder_string_dump(struct http_decoder_string *rstr, const char *desc);
#endif \ No newline at end of file
diff --git a/src/http_decoder_table.cpp b/src/http_decoder_table.cpp
index d1fc6e9..0ed6313 100644
--- a/src/http_decoder_table.cpp
+++ b/src/http_decoder_table.cpp
@@ -76,30 +76,30 @@ void http_decoder_table_free(struct http_decoder_table *table)
if (NULL == table) {
return;
}
- if (table->uri.cache.str != NULL) {
- FREE(table->uri.cache.str);
+ if (table->uri.cache.iov_base != NULL) {
+ FREE(table->uri.cache.iov_base);
}
- if (table->status.cache.str != NULL) {
- FREE(table->status.cache.str);
+ if (table->status.cache.iov_base != NULL) {
+ FREE(table->status.cache.iov_base);
}
- if (table->method.cache.str != NULL) {
- FREE(table->method.cache.str);
+ if (table->method.cache.iov_base != NULL) {
+ FREE(table->method.cache.iov_base);
}
- if (table->version.cache.str != NULL) {
- FREE(table->version.cache.str);
+ if (table->version.cache.iov_base != NULL) {
+ FREE(table->version.cache.iov_base);
}
- if (table->body.cache.str != NULL) {
- FREE(table->body.cache.str);
+ if (table->body.cache.iov_base != NULL) {
+ FREE(table->body.cache.iov_base);
}
if (table->headers != NULL) {
for (size_t i = 0; i < table->header_cnt; i++) {
- if (table->headers[i].key.cache.str != NULL) {
- FREE(table->headers[i].key.cache.str);
+ if (table->headers[i].key.cache.iov_base != NULL) {
+ FREE(table->headers[i].key.cache.iov_base);
}
- if (table->headers[i].val.cache.str != NULL) {
- FREE(table->headers[i].val.cache.str);
+ if (table->headers[i].val.cache.iov_base != NULL) {
+ FREE(table->headers[i].val.cache.iov_base);
}
}
@@ -381,7 +381,7 @@ void http_decoder_table_dump(struct http_decoder_table *table)
}
}
-int http_decoder_table_get_uri(const struct http_decoder_table *table, struct hstring *out)
+int http_decoder_table_get_uri(const struct http_decoder_table *table, hstring *out)
{
if (NULL == table || NULL == out) {
return -1;
@@ -389,7 +389,7 @@ int http_decoder_table_get_uri(const struct http_decoder_table *table, struct hs
return http_decoder_string_get(&table->uri, out);
}
-int http_decoder_table_get_method(const struct http_decoder_table *table, struct hstring *out)
+int http_decoder_table_get_method(const struct http_decoder_table *table, hstring *out)
{
if (NULL == table || NULL == out) {
return -1;
@@ -397,7 +397,7 @@ int http_decoder_table_get_method(const struct http_decoder_table *table, struct
return http_decoder_string_get(&table->method, out);
}
-int http_decoder_table_get_status(const struct http_decoder_table *table, struct hstring *out)
+int http_decoder_table_get_status(const struct http_decoder_table *table, hstring *out)
{
if (NULL == table || NULL == out) {
return -1;
@@ -405,7 +405,7 @@ int http_decoder_table_get_status(const struct http_decoder_table *table, struct
return http_decoder_string_get(&table->status, out);
}
-int http_decoder_table_get_version(const struct http_decoder_table *table, struct hstring *out)
+int http_decoder_table_get_version(const struct http_decoder_table *table, hstring *out)
{
if (NULL == table || NULL == out) {
return -1;
@@ -413,7 +413,7 @@ int http_decoder_table_get_version(const struct http_decoder_table *table, struc
return http_decoder_string_get(&table->version, out);
}
-int http_decoder_table_get_body(const struct http_decoder_table *table, struct hstring *out)
+int http_decoder_table_get_body(const struct http_decoder_table *table, hstring *out)
{
if (NULL == table || NULL == out) {
return -1;
@@ -421,22 +421,22 @@ int http_decoder_table_get_body(const struct http_decoder_table *table, struct h
return http_decoder_string_get(&table->body, out);
}
-int http_decoder_table_get_header(const struct http_decoder_table *table, const struct hstring *key,
+int http_decoder_table_get_header(const struct http_decoder_table *table, const hstring *key,
struct http_header *hdr_result)
{
for (size_t i = 0; i < table->header_cnt; i++) {
const struct http_decoder_header *tmp_header = &table->headers[i];
- if (tmp_header->key.commit.str_len != key->str_len) {
+ if (tmp_header->key.commit.iov_len != key->iov_len) {
continue;
}
if (http_decoder_string_state(&tmp_header->key) == STRING_STATE_COMMIT &&
http_decoder_string_state(&tmp_header->val) == STRING_STATE_COMMIT) {
- struct hstring tmp_key;
+ hstring tmp_key;
http_decoder_string_get(&tmp_header->key, &tmp_key);
- if (tmp_key.str_len == key->str_len &&
- (0 == strncasecmp(tmp_key.str, key->str, key->str_len))) {
+ if (tmp_key.iov_len == key->iov_len &&
+ (0 == strncasecmp((char *)tmp_key.iov_base, (char *)key->iov_base, key->iov_len))) {
http_decoder_string_get(&tmp_header->key, &hdr_result->key);
http_decoder_string_get(&tmp_header->val, &hdr_result->val);
return 0;
@@ -468,10 +468,10 @@ int http_decoder_table_iter_header(struct http_decoder_table *table,
}
}
- hdr->key.str = NULL;
- hdr->key.str_len = 0;
- hdr->val.str = NULL;
- hdr->val.str_len = 0;
+ hdr->key.iov_base = NULL;
+ hdr->key.iov_len = 0;
+ hdr->val.iov_base = NULL;
+ hdr->val.iov_len = 0;
return -1;
}
diff --git a/src/http_decoder_table.h b/src/http_decoder_table.h
index 1272d3a..4c7792a 100644
--- a/src/http_decoder_table.h
+++ b/src/http_decoder_table.h
@@ -36,18 +36,18 @@ void http_decoder_table_reinit(struct http_decoder_table *table);
void http_decoder_table_dump(struct http_decoder_table *table);
-int http_decoder_table_get_uri(const struct http_decoder_table *table, struct hstring *out);
+int http_decoder_table_get_uri(const struct http_decoder_table *table, hstring *out);
-int http_decoder_table_get_method(const struct http_decoder_table *table, struct hstring *out);
+int http_decoder_table_get_method(const struct http_decoder_table *table, hstring *out);
-int http_decoder_table_get_status(const struct http_decoder_table *table, struct hstring *out);
+int http_decoder_table_get_status(const struct http_decoder_table *table, hstring *out);
-int http_decoder_table_get_version(const struct http_decoder_table *table, struct hstring *out);
+int http_decoder_table_get_version(const struct http_decoder_table *table, hstring *out);
-int http_decoder_table_get_body(const struct http_decoder_table *table, struct hstring *out);
+int http_decoder_table_get_body(const struct http_decoder_table *table, hstring *out);
int http_decoder_table_get_header(const struct http_decoder_table *table,
- const struct hstring *key,
+ const hstring *key,
struct http_header *hdr_res);
int http_decoder_table_iter_header(struct http_decoder_table *table,
diff --git a/src/http_decoder_tunnel.cpp b/src/http_decoder_tunnel.cpp
new file mode 100644
index 0000000..67445f3
--- /dev/null
+++ b/src/http_decoder_tunnel.cpp
@@ -0,0 +1,99 @@
+#include <assert.h>
+#include <stdio.h>
+#include <string.h>
+#include <strings.h>
+#include <unistd.h>
+#include "http_decoder_inc.h"
+#include "llhttp.h"
+
+struct http_tunnel_message
+{
+ enum http_tunnel_message_type type;
+ hstring tunnel_payload;
+};
+
+
+int httpd_tunnel_identify(int curdir, struct http_decoder_half_data *hfdata)
+{
+ if(PACKET_DIRECTION_C2S == curdir){
+ struct http_request_line reqline = {};
+ http_decoder_half_data_get_request_line(hfdata, &reqline);
+ if(0 == strncasecmp_safe("CONNECT", (char *)reqline.method.iov_base,
+ 7, reqline.method.iov_len)){
+ return 1;
+ }
+ }else{
+ struct http_response_line resline = {};
+ http_decoder_half_data_get_response_line(hfdata, &resline);
+ if(resline.status_code == HTTP_STATUS_OK
+ && 0 == strncasecmp_safe("Connection established", (char *)resline.status.iov_base,
+ strlen("Connection established"), resline.status.iov_len)){
+ return 1;
+ }
+ }
+
+ return 0;
+}
+
+int httpd_is_tunnel_session(const struct http_decoder_exdata *ex_data)
+{
+ return (ex_data->tunnel_state != HTTP_TUN_NON);
+}
+
+int httpd_in_tunnel_transmitting(struct http_decoder_exdata *ex_data)
+{
+ return (ex_data->tunnel_state >= HTTP_TUN_INNER_STARTING);
+}
+
+enum http_tunnel_message_type httpd_tunnel_state_to_msg(const struct http_decoder_exdata *ex_data)
+{
+ if(ex_data->tunnel_state == HTTP_TUN_INNER_STARTING){
+ return HTTP_TUNNEL_OPENING;
+ }
+ if(ex_data->tunnel_state == HTTP_TUN_INNER_TRANS){
+ return HTTP_TUNNEL_ACTIVE;
+ }
+ return HTTP_TUNNEL_MSG_MAX;
+}
+
+void httpd_tunnel_state_update(struct http_decoder_exdata *ex_data)
+{
+ if(ex_data->tunnel_state == HTTP_TUN_INNER_STARTING){
+ ex_data->tunnel_state = HTTP_TUN_INNER_TRANS;
+ }
+}
+
+void http_decoder_push_tunnel_data(struct session *sess, const struct http_decoder_exdata *exdata, enum http_tunnel_message_type type)
+{
+ struct http_tunnel_message *tmsg = (struct http_tunnel_message *)CALLOC(struct http_tunnel_message, 1);
+ tmsg->type = type;
+ size_t payload_len;
+ const char *payload = session_get0_current_payload(sess, &payload_len);
+ tmsg->tunnel_payload.iov_base = (char *)payload;
+ tmsg->tunnel_payload.iov_len = payload_len;
+ session_mq_publish_message(sess, exdata->pub_topic_id, tmsg);
+}
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+void http_tunnel_message_get_payload(const struct http_tunnel_message *tmsg,
+ hstring *tunnel_payload)
+{
+ if (unlikely(NULL == tmsg || tunnel_payload == NULL))
+ {
+ return;
+ }
+ tunnel_payload->iov_base = tmsg->tunnel_payload.iov_base;
+ tunnel_payload->iov_len = tmsg->tunnel_payload.iov_len;
+}
+
+enum http_tunnel_message_type http_tunnel_message_type_get(const struct http_tunnel_message *tmsg)
+{
+ return tmsg->type;
+}
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/src/http_decoder_tunnel.h b/src/http_decoder_tunnel.h
new file mode 100644
index 0000000..b1e74df
--- /dev/null
+++ b/src/http_decoder_tunnel.h
@@ -0,0 +1,21 @@
+#pragma once
+
+#include "http_decoder_half.h"
+#include "llhttp.h"
+
+enum http_tunnel_state{
+ HTTP_TUN_NON = 0, // init, or not tunnel session
+ HTTP_TUN_C2S_HDR_START, //CONNECT ...
+ HTTP_TUN_C2S_HDR_END, //CONNECT request all heades end with \r\n
+ HTTP_TUN_S2C_START, // HTTP 200 connet established
+ // HTTP_TUN_S2C_END, // http response all heades end with \r\n
+ HTTP_TUN_INNER_STARTING, // http inner tunnel protocol starting
+ HTTP_TUN_INNER_TRANS, // http inner tunnel protocol transmitting
+};
+
+int httpd_tunnel_identify(int curdir, struct http_decoder_half_data *hfdata);
+int httpd_is_tunnel_session(const struct http_decoder_exdata *ex_data);
+int httpd_in_tunnel_transmitting(struct http_decoder_exdata *ex_data);
+void httpd_tunnel_state_update(struct http_decoder_exdata *ex_data);
+void http_decoder_push_tunnel_data(struct session *sess, const struct http_decoder_exdata *exdata, enum http_tunnel_message_type type);
+enum http_tunnel_message_type httpd_tunnel_state_to_msg(const struct http_decoder_exdata *ex_data); \ No newline at end of file
diff --git a/src/http_decoder_utils.cpp b/src/http_decoder_utils.cpp
index 5686e2d..6d71bdb 100644
--- a/src/http_decoder_utils.cpp
+++ b/src/http_decoder_utils.cpp
@@ -12,6 +12,17 @@ char *safe_dup(const char *str, size_t len)
return dup;
}
+int strncasecmp_safe(const char *fix_s1, const char *dyn_s2, size_t fix_n1, size_t dyn_n2)
+{
+ if (fix_s1 == NULL || dyn_s2 == NULL) {
+ return -1;
+ }
+ if(fix_n1 != dyn_n2){
+ return -1;
+ }
+ return strncasecmp(fix_s1, dyn_s2, fix_n1);
+}
+
const char *http_message_type_to_string(enum http_message_type type)
{
const char *sname = "unknown_msg_type";
@@ -134,4 +145,13 @@ int http_event_is_req(enum http_event event)
break;
}
return -1;
+}
+
+int stellar_session_mq_get_topic_id_reliable(struct stellar *st, const char *topic_name, session_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
+{
+ int topic_id = stellar_session_mq_get_topic_id(st, topic_name);
+ if(topic_id < 0){
+ topic_id = stellar_session_mq_create_topic(st, topic_name, msg_free_cb, msg_free_arg);
+ }
+ return topic_id;
} \ No newline at end of file
diff --git a/src/http_decoder_utils.h b/src/http_decoder_utils.h
index 0661641..5b09d50 100644
--- a/src/http_decoder_utils.h
+++ b/src/http_decoder_utils.h
@@ -5,9 +5,11 @@
#include <stdio.h>
char *safe_dup(const char *str, size_t len);
+int strncasecmp_safe(const char *fix_s1, const char *dyn_s2, size_t fix_n1, size_t dyn_n2);
const char *http_message_type_to_string(enum http_message_type type);
int http_message_type_is_req(struct session *sess, enum http_message_type msg_type);
int http_event_is_req(enum http_event event);
+int stellar_session_mq_get_topic_id_reliable(struct stellar *st, const char *topic_name, session_msg_free_cb_func *msg_free_cb, void *msg_free_arg);
/******************************************************************************
* Logger
******************************************************************************/
diff --git a/src/version.map b/src/version.map
index be433c2..15d1d95 100644
--- a/src/version.map
+++ b/src/version.map
@@ -5,6 +5,7 @@ global:
http_decoder_init;
http_decoder_exit;
http_decoder_tcp_stream_msg_cb;
+ http_tunnel_message_*;
};
local: *;
}; \ No newline at end of file
diff --git a/test/http_decoder_gtest.cpp b/test/http_decoder_gtest.cpp
index b2677fc..27fb0e6 100644
--- a/test/http_decoder_gtest.cpp
+++ b/test/http_decoder_gtest.cpp
@@ -17,6 +17,7 @@ extern "C"
int commit_test_result_json(cJSON *node, const char *name);
extern void http_decoder_test_entry(struct session *sess, int topic_id, const void *raw_msg, void *no_use_ctx, void *plugin_env);
extern void http_decoder_test_state_entry(struct session *sess, int topic_id, const void *raw_msg, void *no_use_ctx, void *plugin_env);
+ extern void http_decoder_tunnel_entry(struct session *sess, int topic_id, const void *raw_msg, void *no_use_ctx, void *plugin_env);
static on_session_msg_cb_func *g_entry_fun = http_decoder_test_entry;
}
#endif
@@ -32,6 +33,7 @@ struct plug_entry_t{
static struct plug_entry_t g_entry_tbl[] = {
{"http_decoder_test_entry", http_decoder_test_entry},
{"http_decoder_test_state_entry", http_decoder_test_state_entry},
+ {"http_decoder_tunnel_entry", http_decoder_tunnel_entry},
{NULL, NULL}
};
@@ -61,26 +63,26 @@ static int g_topic_id = -1;
void output_http_req_line(struct http_request_line *req_line)
{
char tmp_str[MAX_KEY_STR_LEN] = {0};
- memcpy(tmp_str, req_line->method.str, req_line->method.str_len);
+ memcpy(tmp_str, req_line->method.iov_base, req_line->method.iov_len);
printf("req_method:%s\n", tmp_str);
memset(tmp_str, 0, sizeof(tmp_str));
- memcpy(tmp_str, req_line->uri.str, req_line->uri.str_len);
+ memcpy(tmp_str, req_line->uri.iov_base, req_line->uri.iov_len);
printf("req_uri:%s\n", tmp_str);
memset(tmp_str, 0, sizeof(tmp_str));
- memcpy(tmp_str, req_line->version.str, req_line->version.str_len);
+ memcpy(tmp_str, req_line->version.iov_base, req_line->version.iov_len);
printf("req_version:%s\n", tmp_str);
}
void output_http_res_line(struct http_response_line *res_line)
{
char tmp_str[MAX_KEY_STR_LEN] = {0};
- memcpy(tmp_str, res_line->version.str, res_line->version.str_len);
+ memcpy(tmp_str, res_line->version.iov_base, res_line->version.iov_len);
printf("res_version:%s\n", tmp_str);
memset(tmp_str, 0, sizeof(tmp_str));
- memcpy(tmp_str, res_line->status.str, res_line->status.str_len);
+ memcpy(tmp_str, res_line->status.iov_base, res_line->status.iov_len);
printf("res_status:%s\n", tmp_str);
}
@@ -89,42 +91,42 @@ void output_http_header(struct http_header *header)
char tmp_key[MAX_KEY_STR_LEN] = {0};
char tmp_val[MAX_KEY_STR_LEN] = {0};
- memcpy(tmp_key, header->key.str, header->key.str_len);
- memcpy(tmp_val, header->val.str, header->val.str_len);
+ memcpy(tmp_key, header->key.iov_base, header->key.iov_len);
+ memcpy(tmp_val, header->val.iov_base, header->val.iov_len);
printf("<%s:%s>\n", tmp_key, tmp_val);
}
-void output_http_body(struct hstring *body, int decompress_flag)
+void output_http_body(hstring *body, int decompress_flag)
{
int counter = 0;
if (1 == decompress_flag)
{
printf("\n\n----------------decompress body len:%zu---------------\n",
- body->str_len);
+ body->iov_len);
}
else
{
printf("\n\n----------------raw body len:%zu---------------\n",
- body->str_len);
+ body->iov_len);
}
- for (size_t i = 0; i < body->str_len; i++)
+ for (size_t i = 0; i < body->iov_len; i++)
{
if (counter % 16 == 0)
{
printf("\n");
}
- printf("%02x ", (unsigned char)body->str[i]);
+ printf("%02x ", (unsigned char)body->iov_base[i]);
counter++;
}
printf("\n");
}
#endif
-static void append_http_payload(struct session *sess, struct gtest_plug_exdata_t *gtest_plug_exdata, const struct hstring *body, enum http_transaction_type type)
+static void append_http_payload(struct session *sess, struct gtest_plug_exdata_t *gtest_plug_exdata, const hstring *body, enum http_transaction_type type)
{
- if (NULL == body->str || 0 == body->str_len)
+ if (NULL == body->iov_base || 0 == body->iov_len)
{
return;
}
@@ -133,7 +135,7 @@ static void append_http_payload(struct session *sess, struct gtest_plug_exdata_t
gtest_plug_exdata->md5ctx[type] = MMALLOC(MD5_CTX, sizeof(MD5_CTX));
MD5Init(gtest_plug_exdata->md5ctx[type]);
}
- MD5Update(gtest_plug_exdata->md5ctx[type], (unsigned char *)body->str, body->str_len);
+ MD5Update(gtest_plug_exdata->md5ctx[type], (unsigned char *)body->iov_base, body->iov_len);
}
int http_field_to_json(cJSON *object, const char *key, char *val, size_t val_len)
@@ -158,11 +160,11 @@ void transaction_index_to_json(cJSON *ctx, int transaction_index)
void req_line_to_json(cJSON *ctx, struct http_request_line *req_line)
{
- http_field_to_json(ctx, "method", req_line->method.str,
- req_line->method.str_len);
- http_field_to_json(ctx, "uri", req_line->uri.str, req_line->uri.str_len);
- http_field_to_json(ctx, "req_version", req_line->version.str,
- req_line->version.str_len);
+ http_field_to_json(ctx, "method", (char *)req_line->method.iov_base,
+ req_line->method.iov_len);
+ http_field_to_json(ctx, "uri", (char *)req_line->uri.iov_base, req_line->uri.iov_len);
+ http_field_to_json(ctx, "req_version", (char *)req_line->version.iov_base,
+ req_line->version.iov_len);
cJSON_AddNumberToObject(ctx, "major_version", req_line->major_version);
cJSON_AddNumberToObject(ctx, "minor_version", req_line->minor_version);
@@ -170,10 +172,10 @@ void req_line_to_json(cJSON *ctx, struct http_request_line *req_line)
void res_line_to_json(cJSON *ctx, struct http_response_line *res_line)
{
- http_field_to_json(ctx, "res_version", res_line->version.str,
- res_line->version.str_len);
- http_field_to_json(ctx, "res_status", res_line->status.str,
- res_line->status.str_len);
+ http_field_to_json(ctx, "res_version", (char *)res_line->version.iov_base,
+ res_line->version.iov_len);
+ http_field_to_json(ctx, "res_status", (char *)res_line->status.iov_base,
+ res_line->status.iov_len);
cJSON_AddNumberToObject(ctx, "major_version", res_line->major_version);
cJSON_AddNumberToObject(ctx, "minor_version", res_line->minor_version);
@@ -183,25 +185,27 @@ void res_line_to_json(cJSON *ctx, struct http_response_line *res_line)
void http_header_to_json(cJSON *ctx, struct http_header *header)
{
char key[MAX_KEY_STR_LEN] = {0};
+ assert(header->key.iov_base);
+ assert(header->val.iov_base);
- memcpy(key, header->key.str, header->key.str_len);
+ memcpy(key, header->key.iov_base, header->key.iov_len);
if (cJSON_HasObjectItem(ctx, key) == FALSE)
{
- http_field_to_json(ctx, key, header->val.str, header->val.str_len);
+ http_field_to_json(ctx, key, (char *)header->val.iov_base, header->val.iov_len);
}
else
{
// ctx already has the key, so rename key by key%d
char new_key[MAX_KEY_STR_LEN] = {0};
sprintf(new_key, "%s%d", key, g_header_count++);
- http_field_to_json(ctx, new_key, header->val.str, header->val.str_len);
+ http_field_to_json(ctx, new_key, (char *)header->val.iov_base, header->val.iov_len);
}
}
void http_url_add_to_json(cJSON *ctx, struct http_message *msg)
{
- struct hstring url_result = {};
+ hstring url_result = {};
if (cJSON_GetObjectItem(ctx, GTEST_HTTP_URL_NAME))
{
@@ -209,16 +213,16 @@ void http_url_add_to_json(cJSON *ctx, struct http_message *msg)
}
http_message_get_url(msg, &url_result);
- if(url_result.str == NULL)
+ if(url_result.iov_base == NULL)
{
- // printf("url:%s\n", url_result.str);
+ // printf("url:%s\n", url_result.iov_base);
return;
}
struct http_header url_header_result = {};
- url_header_result.key.str = (char *)GTEST_HTTP_URL_NAME;
- url_header_result.key.str_len = strlen(GTEST_HTTP_URL_NAME);
+ url_header_result.key.iov_base = (char *)GTEST_HTTP_URL_NAME;
+ url_header_result.key.iov_len = strlen(GTEST_HTTP_URL_NAME);
url_header_result.val = url_result;
http_header_to_json(ctx, &url_header_result);
@@ -289,7 +293,7 @@ static void http_decoder_test_update_session_tuple4(struct session *sess, struct
}
}
-static int get_gtest_plug_entry(const char *cfg_path, char *entry_name, int max_len)
+static int get_gtest_plug_entry(const char *cfg_path, char *entry_name, int max_len, char *topic_name, int topic_max_len)
{
FILE *fp = fopen(cfg_path, "r");
if (NULL == fp)
@@ -320,6 +324,14 @@ static int get_gtest_plug_entry(const char *cfg_path, char *entry_name, int max_
strncpy(entry_name, str_val.u.s, max_len);
free(str_val.u.s);
}
+
+ toml_datum_t topic_str_val = toml_string_in(basic_sec_tbl, "topic");
+ if (str_val.ok != 0)
+ {
+ strncpy(topic_name, topic_str_val.u.s, topic_max_len);
+ free(topic_str_val.u.s);
+ }
+
toml_free(root);
return 0;
}
@@ -345,7 +357,7 @@ extern "C" void http_decoder_test_entry(struct session *sess, int topic_id, cons
struct http_request_line req_line = {0};
struct http_response_line res_line = {0};
struct http_header header = {0};
- struct hstring body = {0};
+ hstring body = {0};
struct http_message *msg = (struct http_message *)raw_msg;
enum http_message_type msg_type = http_message_type_get(msg);
@@ -489,30 +501,83 @@ extern "C" void http_decoder_test_state_entry(struct session *sess, int topic_id
return;
}
+extern "C" void http_decoder_tunnel_entry(struct session *sess, int topic_id, const void *raw_msg, void *no_use_ctx, void *plugin_env)
+{
+ struct gtest_plug_exdata_t *gtest_plug_exdata;
+ enum http_tunnel_message_type tmsg_type = http_tunnel_message_type_get((const struct http_tunnel_message *)raw_msg);
+ static size_t req_payload_block = 0, req_payload_size = 0;
+ static size_t res_payload_block = 0, res_payload_size = 0;
+ gtest_plug_exdata = (struct gtest_plug_exdata_t *)session_exdata_get(sess, g_exdata_idx);
+
+ switch(tmsg_type){
+ case HTTP_TUNNEL_OPENING:
+ {
+ if (NULL == gtest_plug_exdata)
+ {
+ gtest_plug_exdata = (struct gtest_plug_exdata_t *)calloc(1, sizeof(struct gtest_plug_exdata_t));
+ session_exdata_set(sess, g_exdata_idx, gtest_plug_exdata);
+ }
+ const char *human_addr_cstr = session_get0_readable_addr(sess);
+ gtest_plug_exdata->result_jnode[HTTP_TRANSACTION_REQ] = cJSON_CreateObject();
+ gtest_plug_exdata->result_jnode[HTTP_TRANSACTION_RES] = cJSON_CreateObject();
+ gtest_plug_exdata->result_jnode[HTTP_TRANSACTION_SESSION] = cJSON_CreateObject();
+ cJSON_AddStringToObject(gtest_plug_exdata->result_jnode[HTTP_TRANSACTION_SESSION], GTEST_HTTP_TUPLE4_NAME, human_addr_cstr);
+ commit_test_result_json(gtest_plug_exdata->result_jnode[HTTP_TRANSACTION_SESSION], "TUNNEL_NEW");
+ }
+ // OPENING state has payload, go on
+
+ case HTTP_TUNNEL_ACTIVE:
+ {
+ int curdir = packet_get_direction(session_get0_current_packet(sess));
+ hstring tunnel_payload = {};
+ http_tunnel_message_get_payload((const struct http_tunnel_message *)raw_msg, &tunnel_payload);
+ if(PACKET_DIRECTION_C2S == curdir){
+ req_payload_block++;
+ req_payload_size += tunnel_payload.iov_len;
+ }else{
+ res_payload_block++;
+ res_payload_size += tunnel_payload.iov_len;
+ }
+ }
+ break;
+ case HTTP_TUNNEL_CLOSING:
+ {
+ cJSON_AddStringToObject(gtest_plug_exdata->result_jnode[HTTP_TRANSACTION_REQ], "flow", "C2S");
+ cJSON_AddStringToObject(gtest_plug_exdata->result_jnode[HTTP_TRANSACTION_RES], "flow", "S2C");
+ cJSON_AddNumberToObject(gtest_plug_exdata->result_jnode[HTTP_TRANSACTION_REQ], "payload_block", req_payload_block);
+ cJSON_AddNumberToObject(gtest_plug_exdata->result_jnode[HTTP_TRANSACTION_REQ], "payload_size", req_payload_size);
+ cJSON_AddNumberToObject(gtest_plug_exdata->result_jnode[HTTP_TRANSACTION_RES], "payload_block", res_payload_block);
+ cJSON_AddNumberToObject(gtest_plug_exdata->result_jnode[HTTP_TRANSACTION_RES], "payload_size", res_payload_size);
+ }
+ break;
+ default:
+ assert(0);
+ break;
+ }
+}
+
extern "C" void *http_decoder_test_init(struct stellar *st)
{
g_http_gtest_plugin_id = stellar_session_plugin_register(st, NULL, NULL, NULL);
- g_exdata_idx = stellar_session_exdata_new_index(st, "HTTP_DECODER_REQ_TEST",
- http_decoder_test_exdata_free,
- NULL);
+ g_exdata_idx = stellar_session_exdata_new_index(st, "HTTP_DECODER_GTEST_EXDATA",http_decoder_test_exdata_free, NULL);
if (g_exdata_idx < 0)
{
printf("[%s:%d]: can't get http_decoder exdata index !!!\n", __FUNCTION__, __LINE__);
return NULL;
}
- g_topic_id = stellar_session_mq_get_topic_id(st, HTTP_DECODER_TOPIC);
+ char entry_name[64] = "";
+ char topic_name[64] = "";
+ get_gtest_plug_entry(GTEST_PLUG_ENTRY_CFG, entry_name, sizeof(entry_name)-1, topic_name, sizeof(topic_name)-1);
+ set_gtest_plug_entry(entry_name);
+ g_topic_id = stellar_session_mq_get_topic_id(st, topic_name);
if (g_topic_id < 0)
{
- printf("[%s:%d]: can't get http_decoder topic id !!!\n", __FUNCTION__, __LINE__);
+ printf("[%s:%d]: can't get http_decoder topic:%s id !!!\n", __FUNCTION__, __LINE__, topic_name);
return NULL;
- }
-
- char entry_name[64] = "";
- get_gtest_plug_entry(GTEST_PLUG_ENTRY_CFG, entry_name, sizeof(entry_name)-1);
- set_gtest_plug_entry(entry_name);
+ }
stellar_session_mq_subscribe(st, g_topic_id, g_entry_fun, g_http_gtest_plugin_id);
- printf("http_decoder_test_init succ, plugin_id:%d, topic_id:%d\n", g_http_gtest_plugin_id, g_topic_id);
+ printf("http_decoder_gtest_init succ, plugin_id:%d, sub_topic_id:%d\n", g_http_gtest_plugin_id, g_topic_id);
return (void *)strdup("http_decoder_test_ctx");
}
@@ -523,6 +588,5 @@ extern "C" void http_decoder_test_exit(void *test_ctx)
{
FREE(test_ctx);
}
- // update_config_file(GTEST_PLUG_ENTRY_CFG, "name", "\\x22http_decoder_test_entry\\x22");
printf("http_decoder_test_exit OK!\n");
} \ No newline at end of file
diff --git a/test/http_decoder_perf_main.cpp b/test/http_decoder_perf_main.cpp
index 5cfcf2c..5d1b0f2 100644
--- a/test/http_decoder_perf_main.cpp
+++ b/test/http_decoder_perf_main.cpp
@@ -24,6 +24,13 @@
#define TIME_DIFF()
#endif
+static struct http_topic_exdata_compose g_topic_exdata_set[] =
+{
+ {HTTPD_TOPIC_TCP_STREAM_INDEX, TOPIC_TCP_STREAM, NULL, NULL, "HTTP_DECODER_EXDATA_BASEON_TCP_STREAM", NULL, 0, 0},
+ {HTTPD_TOPIC_HTTP_MSG_INDEX, HTTP_DECODER_TOPIC, NULL, NULL, NULL, NULL, 1, 1},
+ {HTTPD_TOPIC_HTTP_TUNNEL_INDEX, HTTP_DECODER_TUNNEL_TOPIC, NULL, NULL, "HTTP_DECODER_EXDATA_BASEON_HTTP_TUNNEL", NULL, 2, 2},
+};
+
struct packet{
const char *payload;
size_t payload_len;
@@ -43,6 +50,7 @@ struct stellar{
struct session{
struct stellar *st;
+ enum session_state sess_state;
struct session_addr addr;
void *context;
void *exdata;
@@ -52,7 +60,7 @@ struct session{
extern "C" void *http_decoder_init(struct stellar *st);
extern "C" void http_decoder_exit(void *plugin_env);
-extern "C" void *_httpd_session_ctx_new_cb(struct session *sess, void *plugin_env);
+extern "C" void *httpd_session_ctx_new_cb(struct session *sess, void *plugin_env);
extern "C" void _httpd_ex_data_free_cb(struct session *s, int idx,void *ex_data, void *arg);
extern "C" void http_decoder_tcp_stream_msg_cb(struct session *sess, int topic_id, const void *msg, void *no_use_ctx, void *plugin_env);
extern "C" void http_decoder_perf_entry(struct session *sess, int topic_id, const void *raw_msg, void *per_session_ctx, void *plugin_env);
@@ -84,6 +92,10 @@ int stellar_get_current_thread_id(struct stellar *st)
{
return 0;
}
+int session_get_current_thread_id(struct session *sess)
+{
+ return 0;
+}
int stellar_session_plugin_register(struct stellar *st,
session_ctx_new_func session_ctx_new,
session_ctx_free_func session_ctx_free,
@@ -108,31 +120,31 @@ void *session_exdata_get(struct session *sess, int idx)
enum session_state session_get_current_state(struct session *sess)
{
- return SESSION_STATE_ACTIVE;
+ return sess->sess_state;
}
int stellar_session_mq_get_topic_id(struct stellar *st, const char *topic_name)
{
- if(strcmp(topic_name, "HTTP_DECODER_MESSAGE") == 0){
- return st->publish_topic_id;
- }
- if(strcmp(topic_name, "TCP_STREAM") == 0){
- return st->consumer_topid_id;
+ for(int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++){
+ if(strcmp(topic_name, g_topic_exdata_set[i].topic_name) == 0){
+ return st->consumer_topid_id;
+ }
}
- assert(0);
return -1;
}
int stellar_session_mq_create_topic(struct stellar *st, const char *topic_name, session_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
{
- if(strcmp(topic_name, "HTTP_DECODER_MESSAGE") == 0){
- st->publish_topic_id = 1;
- st->publish_msg_free_cb = msg_free_cb;
- return 1;
- }
- if(strcmp(topic_name, "TCP_STREAM") == 0){
- st->consumer_topid_id = 2;
- return 2;
+
+ for(int i = 0; i < HTTPD_TOPIC_INDEX_MAX; i++){
+ if(strcmp(topic_name, g_topic_exdata_set[i].topic_name) == 0){
+ st->consumer_topid_id = g_topic_exdata_set[i].sub_topic_id;
+ st->publish_msg_free_cb = msg_free_cb;
+ if(strcmp(topic_name, "TCP_STREAM") == 0){
+ st->publish_topic_id = g_topic_exdata_set[HTTPD_TOPIC_HTTP_MSG_INDEX].sub_topic_id;
+ }
+ return st->consumer_topid_id;
+ }
}
return -1;
}
@@ -179,9 +191,14 @@ int session_is_symmetric(struct session *sess, unsigned char *flag)
return 1;
}
+int session_mq_ignore_message(struct session *sess, int topic_id, int plugin_id)
+{
+ return 0;
+}
+
static void perf_test_init_per_session(struct session *sess)
{
- sess->context = _httpd_session_ctx_new_cb(sess, sess->st->plugin_env);
+ sess->context = httpd_session_ctx_new_cb(sess, sess->st->plugin_env);
}
static void perf_test_free_per_session(struct session *sess)
@@ -194,6 +211,7 @@ static void perf_test_loop(struct session *sess, struct packet *test_payload, in
TIME_START();
sess->current_payload_st = &test_payload[0];
perf_test_init_per_session(sess);
+ sess->sess_state = SESSION_STATE_OPENING;
for(int i = 0; i < test_payload_index_max; i++)
{
@@ -201,7 +219,9 @@ static void perf_test_loop(struct session *sess, struct packet *test_payload, in
http_decoder_tcp_stream_msg_cb(sess, sess->st->consumer_topid_id, test_payload[i].payload, NULL, sess->st->plugin_env);
TIME_DIFF();
fieldstat_easy_histogram_record(fs4_instance, 0, fs4_metric_id, tag, 1, time_diff_ns);
+ sess->sess_state = SESSION_STATE_ACTIVE;
}
+ sess->sess_state = SESSION_STATE_CLOSING;
perf_test_free_per_session(sess);
}
@@ -321,7 +341,7 @@ static void init_test_data_long_long_url(struct packet *test_payload, int *index
static void perf_stat_init(void)
{
fs4_instance = fieldstat_easy_new(1, "http_decoder_test", NULL, 0);
- fieldstat_easy_enable_auto_output(fs4_instance, "./httpd_fs4.json", 1);
+ fieldstat_easy_enable_auto_output(fs4_instance, "./httpd_hisgram.json", 1);
FS4_SIMPLE_HISGRAM_TAG.key = "simple";
FS4_SIMPLE_HISGRAM_TAG.type = TAG_DOUBLE;
diff --git a/test/http_decoder_perf_plug.cpp b/test/http_decoder_perf_plug.cpp
index f588683..c8c8771 100644
--- a/test/http_decoder_perf_plug.cpp
+++ b/test/http_decoder_perf_plug.cpp
@@ -28,8 +28,8 @@ extern "C" void http_decoder_perf_entry(struct session *sess, int topic_id, cons
struct http_request_line req_line = {0};
struct http_response_line res_line = {0};
struct http_header header = {0};
- struct hstring url = {0};
- struct hstring body = {0};
+ hstring url = {0};
+ hstring body = {0};
struct http_message *msg = (struct http_message *)raw_msg;
enum http_message_type msg_type = http_message_type_get(msg);
void *ret1, *ret2;
@@ -39,23 +39,23 @@ extern "C" void http_decoder_perf_entry(struct session *sess, int topic_id, cons
case HTTP_MESSAGE_REQ_LINE:
DEBUG_PRINT("---------------------------------------------------------------\n");
http_message_get_request_line(msg, &req_line);
- if (req_line.uri.str)
+ if (req_line.uri.iov_base)
{
- DEBUG_PRINT("req_line.method.str: %.*s\n", req_line.method.str_len, req_line.method.str);
- ret1 = memmem(req_line.method.str, req_line.method.str_len, "PUT", 3);
- DEBUG_PRINT("req_line.version.str: %.*s\n", req_line.version.str_len, req_line.version.str);
+ DEBUG_PRINT("req_line.method.iov_base: %.*s\n", req_line.method.iov_len, req_line.method.iov_base);
+ ret1 = memmem(req_line.method.iov_base, req_line.method.iov_len, "PUT", 3);
+ DEBUG_PRINT("req_line.version.iov_base: %.*s\n", req_line.version.iov_len, req_line.version.iov_base);
}
break;
case HTTP_MESSAGE_REQ_HEADER:
while (http_message_header_next(msg, &header) >= 0)
{
- ret1 = memmem(header.key.str, header.key.str_len, "key", 3);
- ret2 = memmem(header.val.str, header.val.str_len, "val", 3);
- DEBUG_PRINT("REQ header: %.*s : %.*s\n", (int)header.key.str_len, header.key.str, (int)header.val.str_len, header.val.str);
+ ret1 = memmem(header.key.iov_base, header.key.iov_len, "key", 3);
+ ret2 = memmem(header.val.iov_base, header.val.iov_len, "val", 3);
+ DEBUG_PRINT("REQ header: %.*s : %.*s\n", (int)header.key.iov_len, header.key.iov_base, (int)header.val.iov_len, header.val.iov_base);
}
http_message_get_url(msg, &url);
- if(url.str && url.str_len){
- DEBUG_PRINT("URL: %.*s\n", url.str_len, url.str);
+ if(url.iov_base && url.iov_len){
+ DEBUG_PRINT("URL: %.*s\n", url.iov_len, url.iov_base);
}
break;
case HTTP_MESSAGE_REQ_BODY:
@@ -63,29 +63,29 @@ extern "C" void http_decoder_perf_entry(struct session *sess, int topic_id, cons
// output_http_body(&body, 0);
http_message_get_decompress_body(msg, &body);
// output_http_body(&body, 1);
- ret1 = memmem(body.str, body.str_len, "</html>", 7);
+ ret1 = memmem(body.iov_base, body.iov_len, "</html>", 7);
break;
case HTTP_MESSAGE_RES_LINE:
http_message_get_response_line(msg, &res_line);
- ret1 = memmem(res_line.status.str, res_line.status.str_len, "OK", 2);
- DEBUG_PRINT("res_line.status.str: %.*s\n", (int)res_line.status.str_len, res_line.status.str);
+ ret1 = memmem(res_line.status.iov_base, res_line.status.iov_len, "OK", 2);
+ DEBUG_PRINT("res_line.status.iov_base: %.*s\n", (int)res_line.status.iov_len, res_line.status.iov_base);
break;
case HTTP_MESSAGE_RES_HEADER:
while (http_message_header_next(msg, &header) >= 0)
{
- ret1 = memmem(header.key.str, header.key.str_len, "key", 3);
- ret2 = memmem(header.val.str, header.val.str_len, "val", 3);
- DEBUG_PRINT("RES header: %.*s : %.*s\n", (int)header.key.str_len, header.key.str, (int)header.val.str_len, header.val.str);
+ ret1 = memmem(header.key.iov_base, header.key.iov_len, "key", 3);
+ ret2 = memmem(header.val.iov_base, header.val.iov_len, "val", 3);
+ DEBUG_PRINT("RES header: %.*s : %.*s\n", (int)header.key.iov_len, header.key.iov_base, (int)header.val.iov_len, header.val.iov_base);
}
break;
case HTTP_MESSAGE_RES_BODY:
http_message_get_raw_body(msg, &body);
- DEBUG_PRINT("res raw body: %.*s\n", body.str_len, body.str);
+ DEBUG_PRINT("res raw body: %.*s\n", body.iov_len, body.iov_base);
// output_http_body(&body, 0);
http_message_get_decompress_body(msg, &body);
// output_http_body(&body, 1);
- ret1 = memmem(body.str, body.str_len, "</html>", 7);
- DEBUG_PRINT("res unzip body: %.*s\n", body.str_len, body.str);
+ ret1 = memmem(body.iov_base, body.iov_len, "</html>", 7);
+ DEBUG_PRINT("res unzip body: %.*s\n", body.iov_len, body.iov_base);
DEBUG_PRINT("---------------------------------------------------------------\n");
break;
diff --git a/test/http_pcap/http_tunnel_for_http.pcap b/test/http_pcap/http_tunnel_for_http.pcap
new file mode 100644
index 0000000..0654576
--- /dev/null
+++ b/test/http_pcap/http_tunnel_for_http.pcap
Binary files differ
diff --git a/test/test_result_json/http_inner_tunnel_for_http.json b/test/test_result_json/http_inner_tunnel_for_http.json
new file mode 100644
index 0000000..973556a
--- /dev/null
+++ b/test/test_result_json/http_inner_tunnel_for_http.json
@@ -0,0 +1,15 @@
+[
+ {
+ "__X_HTTP_TUPLE4": "192.168.40.139.59234>192.168.38.83.8080"
+ },
+ {
+ "flow": "C2S",
+ "payload_block": 1,
+ "payload_size": 77
+ },
+ {
+ "flow": "S2C",
+ "payload_block": 3,
+ "payload_size": 2781
+ }
+] \ No newline at end of file
diff --git a/test/test_result_json/http_inner_tunnel_for_pop3.json b/test/test_result_json/http_inner_tunnel_for_pop3.json
new file mode 100644
index 0000000..b4f9784
--- /dev/null
+++ b/test/test_result_json/http_inner_tunnel_for_pop3.json
@@ -0,0 +1,15 @@
+[
+ {
+ "__X_HTTP_TUPLE4": "192.168.10.58.51798>192.168.10.144.808"
+ },
+ {
+ "flow": "C2S",
+ "payload_block": 6,
+ "payload_size": 68
+ },
+ {
+ "flow": "S2C",
+ "payload_block": 7,
+ "payload_size": 1737
+ }
+] \ No newline at end of file
diff --git a/test/test_result_json/http_msg_type_state_tunnel.json b/test/test_result_json/http_msg_type_state_tunnel.json
new file mode 100644
index 0000000..cdbad4e
--- /dev/null
+++ b/test/test_result_json/http_msg_type_state_tunnel.json
@@ -0,0 +1,26 @@
+[
+ {
+ "msg_0": "HTTP_TRANSACTION_NEW_transaction_0",
+ "msg_1": "HTTP_MESSAGE_REQ_LINE",
+ "msg_2": "HTTP_MESSAGE_REQ_HEADER",
+ "msg_3": "HTTP_MESSAGE_REQ_HEADER_END",
+ "msg_8": "HTTP_TRANSACTION_NEW_transaction_1",
+ "msg_9": "HTTP_MESSAGE_REQ_LINE",
+ "msg_10": "HTTP_MESSAGE_REQ_HEADER",
+ "msg_11": "HTTP_MESSAGE_REQ_HEADER_END"
+ },
+ {
+ "msg_4": "HTTP_MESSAGE_RES_LINE",
+ "msg_5": "HTTP_MESSAGE_RES_HEADER",
+ "msg_6": "HTTP_MESSAGE_RES_HEADER_END",
+ "msg_7": "HTTP_TRANSACTION_FREE_transaction_0",
+ "msg_12": "HTTP_MESSAGE_RES_LINE",
+ "msg_13": "HTTP_MESSAGE_RES_HEADER",
+ "msg_14": "HTTP_MESSAGE_RES_HEADER_END",
+ "msg_15": "HTTP_MESSAGE_RES_BODY",
+ "msg_16": "HTTP_MESSAGE_RES_BODY",
+ "msg_17": "HTTP_MESSAGE_RES_BODY",
+ "msg_18": "HTTP_MESSAGE_RES_BODY_END",
+ "msg_19": "HTTP_TRANSACTION_FREE_transaction_1"
+ }
+] \ No newline at end of file
diff --git a/test/test_result_json/http_tunnel_for_http.json b/test/test_result_json/http_tunnel_for_http.json
new file mode 100644
index 0000000..acc5828
--- /dev/null
+++ b/test/test_result_json/http_tunnel_for_http.json
@@ -0,0 +1,62 @@
+[
+ {
+ "__X_HTTP_TUPLE4": "192.168.40.139.59234>192.168.38.83.8080"
+ },
+ {
+ "__X_HTTP_TRANSACTION": "request",
+ "__X_HTTP_TRANSACTION_SEQ": 0,
+ "method": "CONNECT",
+ "uri": "www.baidu.com:80",
+ "req_version": "1.1",
+ "major_version": 1,
+ "minor_version": 1,
+ "Host": "www.baidu.com:80",
+ "User-Agent": "curl/7.29.0",
+ "Proxy-Connection": "Keep-Alive",
+ "__X_HTTP_URL": "www.baidu.com:80"
+ },
+ {
+ "__X_HTTP_TRANSACTION": "response",
+ "__X_HTTP_TRANSACTION_SEQ": 0,
+ "res_version": "1.1",
+ "res_status": "Connection established",
+ "major_version": 1,
+ "minor_version": 1,
+ "status_code": 200,
+ "Proxy-agent": "CCProxy"
+ },
+ {
+ "__X_HTTP_TRANSACTION": "request",
+ "__X_HTTP_TRANSACTION_SEQ": 1,
+ "method": "GET",
+ "uri": "/",
+ "req_version": "1.1",
+ "major_version": 1,
+ "minor_version": 1,
+ "User-Agent": "curl/7.29.0",
+ "Host": "www.baidu.com",
+ "Accept": "*/*",
+ "__X_HTTP_URL": "www.baidu.com/"
+ },
+ {
+ "__X_HTTP_TRANSACTION": "response",
+ "__X_HTTP_TRANSACTION_SEQ": 1,
+ "res_version": "1.1",
+ "res_status": "OK",
+ "major_version": 1,
+ "minor_version": 1,
+ "status_code": 200,
+ "Accept-Ranges": "bytes",
+ "Cache-Control": "private, no-cache, no-store, proxy-revalidate, no-transform",
+ "Connection": "keep-alive",
+ "Content-Length": "2381",
+ "Content-Type": "text/html",
+ "Date": "Mon, 08 Apr 2024 09:45:51 GMT",
+ "Etag": "\"588604c1-94d\"",
+ "Last-Modified": "Mon, 23 Jan 2017 13:27:29 GMT",
+ "Pragma": "no-cache",
+ "Server": "bfe/1.0.8.18",
+ "Set-Cookie": "BDORZ=27315; max-age=86400; domain=.baidu.com; path=/",
+ "__X_HTTP_PAYLOAD_MD5": "090fe607a5be1228362614ccaa088577"
+ }
+] \ No newline at end of file
diff --git a/test/test_result_json/http_tunnel_for_pop3.json b/test/test_result_json/http_tunnel_for_pop3.json
index 9bf2f41..a69ac7b 100644
--- a/test/test_result_json/http_tunnel_for_pop3.json
+++ b/test/test_result_json/http_tunnel_for_pop3.json
@@ -17,7 +17,7 @@
"Proxy-Connection": "Keep-Alive",
"Connection": "Keep-Alive",
"Host": "192.168.10.144",
- "__X_HTTP_URL": "192.168.10.144/pop.163.com:110"
+ "__X_HTTP_URL": "pop.163.com:110"
},
{
"__X_HTTP_TRANSACTION": "response",
@@ -27,7 +27,6 @@
"major_version": 1,
"minor_version": 1,
"status_code": 200,
- "Proxy-agent": "CCProxy",
- "__X_HTTP_PAYLOAD_MD5": "8a9716c4f06c88bb9482f50c25b39032"
+ "Proxy-agent": "CCProxy"
}
] \ No newline at end of file
diff --git a/test_based_on_stellar/CMakeLists.txt b/test_based_on_stellar/CMakeLists.txt
index 29d53ac..64b0129 100644
--- a/test_based_on_stellar/CMakeLists.txt
+++ b/test_based_on_stellar/CMakeLists.txt
@@ -41,7 +41,8 @@ add_test(NAME STELLAR_COPY_HTTP_GTEST_ENTRY_CONF COMMAND sh -c "mkdir -p ${TEST_
add_test(NAME STELLAR_UPDATE_SAPP_LOG_LEVEL COMMAND bash -c "sed -i 's/sapp_log.fatal/sapp_log.info/' ${TEST_RUN_DIR}/etc/sapp_log.conf")
add_test(NAME STELLAR_UPDATE_SAPP_SYN_MODE COMMAND bash -c "sed -i 's/syn_mandatory=1/syn_mandatory=0/' ${TEST_RUN_DIR}/etc/sapp.toml")
add_test(NAME STELLAR_ENABLE_SAPP_MONITOR COMMAND bash -c "sed -i 's/monitor_thread_enabled=0/monitor_thread_enabled=1/' ${TEST_RUN_DIR}/etc/sapp.toml")
-add_test(NAME UPDATE_GTEST_PLUG_ENTRY COMMAND bash -c "sed -i 's/name=.*/name=\\x22http_decoder_test_entry\\x22/' ${TEST_RUN_DIR}/etc/http/gtest_entry.toml")
+add_test(NAME UPDATE_GTEST_PLUG_ENTRY COMMAND bash -c "sed -i 's/name=.*/name=\\x22http_decoder_test_entry\\x22/' ${TEST_RUN_DIR}/etc/http/gtest_entry.toml")
+add_test(NAME UPDATE_GTEST_PLUG_TOPIC COMMAND bash -c "sed -i 's/topic=.*/topic=\\x22HTTP_DECODER_MESSAGE\\x22/' ${TEST_RUN_DIR}/etc/http/gtest_entry.toml")
# update plugin to be tested
# add_test(NAME STELLAR_ON_SAPP_SO COMMAND sh -c "cp ${STELLAR_ON_SAPP_LIB_DIR}/stellar_on_sapp.so ${TEST_RUN_DIR}/plug/stellar_on_sapp/stellar_on_sapp.so")
@@ -50,8 +51,9 @@ add_test(NAME STELLAR_HTTP_DECODER_SO COMMAND sh -c "cp ${CMAKE_BINARY_DIR}/src/
add_test(NAME STELLAR_HTTP_DECODER_GTEST_SO COMMAND sh -c "cp ${CMAKE_BINARY_DIR}/test/${DECODER_NAME}_test.so ${TEST_RUN_DIR}/stellar_plugin/${DECODER_NAME}_test.so")
set_tests_properties(STELLAR_INSTALL_TEST_MAIN STELLAR_COPY_SPEC STELLAR_COPY_CONFLIST STELLAR_COPY_START_LOADER_INF STELLAR_COPY_DEFER_LOADER_INF STELLAR_COPY_HTTP_DECODER_CONF STELLAR_COPY_HTTP_GTEST_ENTRY_CONF
- STELLAR_ON_SAPP_SO STELLAR_HTTP_DECODER_SO STELLAR_HTTP_DECODER_GTEST_SO STELLAR_UPDATE_SAPP_LOG_LEVEL STELLAR_UPDATE_SAPP_SYN_MODE STELLAR_ENABLE_SAPP_MONITOR UPDATE_GTEST_PLUG_ENTRY
- PROPERTIES FIXTURES_SETUP TestFixture)
+ STELLAR_ON_SAPP_SO STELLAR_HTTP_DECODER_SO STELLAR_HTTP_DECODER_GTEST_SO STELLAR_UPDATE_SAPP_LOG_LEVEL STELLAR_UPDATE_SAPP_SYN_MODE STELLAR_ENABLE_SAPP_MONITOR
+ UPDATE_GTEST_PLUG_ENTRY UPDATE_GTEST_PLUG_TOPIC
+ PROPERTIES FIXTURES_SETUP TestFixture)
set(TEST_JSON_DIR ${PROJECT_SOURCE_DIR}/test/test_result_json)
set(TEST_PCAP_DIR ${PROJECT_SOURCE_DIR}/test/http_pcap)
@@ -78,9 +80,12 @@ add_test(NAME STELLAR_HTTP_CHUNKED_RES_GZIP_TEST COMMAND ./${TEST_MAIN} ${TEST_
add_test(NAME STELLAR_HTTP_OVER_TCP_KEEPALIVE_TEST COMMAND ./${TEST_MAIN} ${TEST_JSON_DIR}/http_over_tcp_keepalive.json
-r ${TEST_PCAP_DIR}/http_over_tcp_keepalive.pcap WORKING_DIRECTORY ${TEST_RUN_DIR})
-add_test(NAME STELLAR_HTTP_TUNNEL_FOR_POP3_TEST COMMAND ./${TEST_MAIN} ${TEST_JSON_DIR}/http_tunnel_for_pop3.json
+add_test(NAME STELLAR_HTTP_TUNNEL_FOR_POP3_TEST COMMAND ./${TEST_MAIN} ${TEST_JSON_DIR}/http_tunnel_for_pop3.json
-r ${TEST_PCAP_DIR}/http_tunnel_for_pop3.pcap WORKING_DIRECTORY ${TEST_RUN_DIR})
+add_test(NAME STELLAR_HTTP_TUNNEL_FOR_HTTP_TEST COMMAND ./${TEST_MAIN} ${TEST_JSON_DIR}/http_tunnel_for_http.json
+ -r ${TEST_PCAP_DIR}/http_tunnel_for_http.pcap WORKING_DIRECTORY ${TEST_RUN_DIR})
+
add_test(NAME STELLAR_HTTP_OVER_PPPOE_TEST COMMAND ./${TEST_MAIN} ${TEST_JSON_DIR}/http_over_pppoe.json
-r ${TEST_PCAP_DIR}/http_over_pppoe.pcap WORKING_DIRECTORY ${TEST_RUN_DIR})
@@ -140,6 +145,7 @@ set_tests_properties(STELLAR_HTTP_GET_SINGLE_TRANS_TEST
STELLAR_HTTP_CHUNKED_RES_GZIP_TEST
STELLAR_HTTP_OVER_TCP_KEEPALIVE_TEST
STELLAR_HTTP_TUNNEL_FOR_POP3_TEST
+ STELLAR_HTTP_TUNNEL_FOR_HTTP_TEST
STELLAR_HTTP_OVER_PPPOE_TEST
STELLAR_HTTP_OVER_TLS_TEST
STELLAR_NON_HTTP_TEST
@@ -160,8 +166,9 @@ set_tests_properties(STELLAR_HTTP_GET_SINGLE_TRANS_TEST
PROPERTIES FIXTURES_REQUIRED TestFixture)
add_test(NAME UPDATE_STATE_PLUG_ENTRY COMMAND bash -c "sed -i 's/name=.*/name=\\x22http_decoder_test_state_entry\\x22/' ${TEST_RUN_DIR}/etc/http/gtest_entry.toml")
+add_test(NAME UPDATE_STATE_PLUG_TOPIC COMMAND bash -c "sed -i 's/topic=.*/topic=\\x22HTTP_DECODER_MESSAGE\\x22/' ${TEST_RUN_DIR}/etc/http/gtest_entry.toml")
-set_tests_properties(UPDATE_STATE_PLUG_ENTRY
+set_tests_properties(UPDATE_STATE_PLUG_ENTRY UPDATE_STATE_PLUG_TOPIC
PROPERTIES FIXTURES_SETUP TestState)
add_test(NAME STELLAR_HTTP_MSG_TYPE_STATE_TEST COMMAND ./${TEST_MAIN} ${TEST_JSON_DIR}/http_msg_type_state.json
@@ -176,10 +183,29 @@ add_test(NAME STELLAR_HTTP_MSG_TYPE_STATE_SES_EXCEPTION_C2S_TEST COMMAND ./${TE
-r ${TEST_PCAP_DIR}/http_session_exception_c2s.pcap WORKING_DIRECTORY ${TEST_RUN_DIR})
add_test(NAME STELLAR_HTTP_MSG_TYPE_STATE_SES_EXCEPTION_S2C_TEST COMMAND ./${TEST_MAIN} ${TEST_JSON_DIR}/http_msg_type_state_exception_s2c.json
-r ${TEST_PCAP_DIR}/http_session_exception_s2c.pcap WORKING_DIRECTORY ${TEST_RUN_DIR})
+add_test(NAME STELLAR_HTTP_MSG_TYPE_STATE_TUNNEL_TEST COMMAND ./${TEST_MAIN} ${TEST_JSON_DIR}/http_msg_type_state_tunnel.json
+ -r ${TEST_PCAP_DIR}/http_tunnel_for_http.pcap WORKING_DIRECTORY ${TEST_RUN_DIR})
set_tests_properties(STELLAR_HTTP_MSG_TYPE_STATE_TEST
STELLAR_HTTP_MSG_TYPE_STATE_C2S_TEST
STELLAR_HTTP_MSG_TYPE_STATE_S2C_TEST
STELLAR_HTTP_MSG_TYPE_STATE_PIPELINE_TEST
STELLAR_HTTP_MSG_TYPE_STATE_SES_EXCEPTION_C2S_TEST
STELLAR_HTTP_MSG_TYPE_STATE_SES_EXCEPTION_S2C_TEST
- PROPERTIES FIXTURES_REQUIRED TestState) \ No newline at end of file
+ STELLAR_HTTP_MSG_TYPE_STATE_TUNNEL_TEST
+ PROPERTIES FIXTURES_REQUIRED TestState)
+
+add_test(NAME UPDATE_TUNNEL_PLUG_ENTRY COMMAND bash -c "sed -i 's/name=.*/name=\\x22http_decoder_tunnel_entry\\x22/' ${TEST_RUN_DIR}/etc/http/gtest_entry.toml")
+add_test(NAME UPDATE_TUNNEL_PLUG_TOPIC COMMAND bash -c "sed -i 's/topic=.*/topic=\\x22HTTP_DECODER_TUNNEL_MESSAGE\\x22/' ${TEST_RUN_DIR}/etc/http/gtest_entry.toml")
+
+set_tests_properties(UPDATE_TUNNEL_PLUG_ENTRY UPDATE_TUNNEL_PLUG_TOPIC
+ STELLAR_HTTP_DECODER_SO STELLAR_HTTP_DECODER_GTEST_SO
+ PROPERTIES FIXTURES_SETUP TestTunnel)
+
+add_test(NAME STELLAR_HTTP_MSG_TYPE_TUNNEL_POP3_TEST COMMAND ./${TEST_MAIN} ${TEST_JSON_DIR}/http_inner_tunnel_for_pop3.json
+ -r ${TEST_PCAP_DIR}/http_tunnel_for_pop3.pcap WORKING_DIRECTORY ${TEST_RUN_DIR})
+add_test(NAME STELLAR_HTTP_MSG_TYPE_TUNNEL_HTTP_TEST COMMAND ./${TEST_MAIN} ${TEST_JSON_DIR}/http_inner_tunnel_for_http.json
+ -r ${TEST_PCAP_DIR}/http_tunnel_for_http.pcap WORKING_DIRECTORY ${TEST_RUN_DIR})
+
+set_tests_properties(STELLAR_HTTP_MSG_TYPE_TUNNEL_POP3_TEST
+ STELLAR_HTTP_MSG_TYPE_TUNNEL_HTTP_TEST
+ PROPERTIES FIXTURES_REQUIRED TestTunnel) \ No newline at end of file