summaryrefslogtreecommitdiff
path: root/src/http_decoder/http_decoder.c
diff options
context:
space:
mode:
authorliuwentan <[email protected]>2023-12-14 22:13:55 +0800
committerliuwentan <[email protected]>2023-12-14 22:13:55 +0800
commitf29d9e463fc71c7d553a85c519251155882cf2ed (patch)
tree73bed7c18593613451d33b20845b64cac1a0d08b /src/http_decoder/http_decoder.c
parenta4254d7b60b1aee385c1531ab8c6f7f69b074b90 (diff)
[PATCH]fix pipeline error
Diffstat (limited to 'src/http_decoder/http_decoder.c')
-rw-r--r--src/http_decoder/http_decoder.c221
1 files changed, 143 insertions, 78 deletions
diff --git a/src/http_decoder/http_decoder.c b/src/http_decoder/http_decoder.c
index ce767fb..dd6850d 100644
--- a/src/http_decoder/http_decoder.c
+++ b/src/http_decoder/http_decoder.c
@@ -30,12 +30,12 @@
const char *http_decoder_topic = "HTTP_DECODER_MESSAGE";
struct http_decoder_result {
- struct session *ref_session;
struct http_decoder_half_data *req_data;
struct http_decoder_half_data *res_data;
};
struct http_decoder_result_queue {
+ struct session *ref_session;
size_t req_index;
size_t res_index;
size_t del_index;
@@ -56,13 +56,19 @@ struct http_decoder {
struct http_decoder_half *s2c_half;
};
+struct http_event_context {
+ int topic_id;
+ struct session *ref_session;
+ struct http_decoder_result_queue *ref_queue;
+};
+
struct http_decoder_context {
int plugin_id;
int topic_id;
int ex_data_idx;
struct stellar *st;
struct http_decoder *decoder;
- struct http_decoder_result_queue *ref_queue;
+ struct http_event_context *http_ev_ctx;
};
struct http_decoder_result *http_decoder_result_new()
@@ -156,32 +162,40 @@ static void http_decoder_result_queue_gc(struct http_decoder_result_queue *queue
static void http_event_handler(enum http_event event,
struct http_decoder_half_data **data,
- void *cb_args)
+ void *http_ev_ctx)
{
- struct http_decoder_context *ctx = (struct http_decoder_context *)cb_args;
+ struct http_event_context *ctx = (struct http_event_context *)http_ev_ctx;
assert(ctx);
- printf("ctx->topic_id:%d\n", ctx->topic_id);
- assert(ctx->ref_queue);
struct http_decoder_result_queue *queue = ctx->ref_queue;
+ struct http_message *msg = NULL;
switch (event) {
- case HTTP_EVENT_INIT:
- *data = http_decoder_half_data_new();
+ case HTTP_EVENT_REQ_INIT:
+ queue->array[queue->req_index]->req_data = http_decoder_half_data_new();
+ *data = queue->array[queue->req_index]->req_data;
break;
case HTTP_EVENT_REQ_LINE:
- if (NULL == queue->array[queue->req_index]) {
- http_decoder_result_queue_push(queue, queue->req_index);
- }
- queue->array[queue->req_index]->req_data = *data;
+ msg = CALLOC(struct http_message, 1);
+ msg->type = HTTP_MESSAGE_REQ_LINE;
+ msg->data = *data;
+ session_mq_publish_message(ctx->ref_session, ctx->topic_id, msg);
break;
case HTTP_EVENT_REQ_HDR:
break;
case HTTP_EVENT_REQ_HDR_END:
+ msg = CALLOC(struct http_message, 1);
+ msg->type = HTTP_MESSAGE_REQ_HEADER_END;
+ msg->data = *data;
+ session_mq_publish_message(ctx->ref_session, ctx->topic_id, msg);
break;
case HTTP_EVENT_REQ_BODY_BEGIN:
break;
case HTTP_EVENT_REQ_BODY_DATA:
+ msg = CALLOC(struct http_message, 1);
+ msg->type = HTTP_MESSAGE_REQ_BODY;
+ msg->data = *data;
+ session_mq_publish_message(ctx->ref_session, ctx->topic_id, msg);
break;
case HTTP_EVENT_REQ_BODY_END:
break;
@@ -189,16 +203,31 @@ static void http_event_handler(enum http_event event,
http_decoder_result_queue_inc_req_index(queue);
http_decoder_result_queue_gc(queue, queue->req_index);
break;
+ case HTTP_EVENT_RES_INIT:
+ queue->array[queue->res_index]->res_data = http_decoder_half_data_new();
+ *data = queue->array[queue->res_index]->res_data;
+ break;
case HTTP_EVENT_RES_LINE:
- queue->array[queue->res_index]->res_data = *data;
+ msg = CALLOC(struct http_message, 1);
+ msg->type = HTTP_MESSAGE_RES_LINE;
+ msg->data = *data;
+ session_mq_publish_message(ctx->ref_session, ctx->topic_id, msg);
break;
case HTTP_EVENT_RES_HDR:
break;
case HTTP_EVENT_RES_HDR_END:
+ msg = CALLOC(struct http_message, 1);
+ msg->type = HTTP_MESSAGE_RES_HEADER_END;
+ msg->data = *data;
+ session_mq_publish_message(ctx->ref_session, ctx->topic_id, msg);
break;
case HTTP_EVENT_RES_BODY_BEGIN:
break;
case HTTP_EVENT_RES_BODY_DATA:
+ msg = CALLOC(struct http_message, 1);
+ msg->type = HTTP_MESSAGE_RES_BODY;
+ msg->data = *data;
+ session_mq_publish_message(ctx->ref_session, ctx->topic_id, msg);
break;
case HTTP_EVENT_RES_BODY_END:
break;
@@ -210,28 +239,35 @@ static void http_event_handler(enum http_event event,
}
}
-static struct http_decoder *http_decoder_new()
+static struct http_decoder *http_decoder_new(http_event_cb *ev_cb, int is_cache_line,
+ int is_cache_header, int is_cache_body)
{
struct http_decoder *decoder = CALLOC(struct http_decoder, 1);
assert(decoder);
- decoder->c2s_half = http_decoder_half_new();
- decoder->s2c_half = http_decoder_half_new();
+ decoder->c2s_half = http_decoder_half_new(ev_cb, is_cache_line, is_cache_header, is_cache_body);
+ decoder->s2c_half = http_decoder_half_new(ev_cb, is_cache_line, is_cache_header, is_cache_body);
return decoder;
}
-void http_decoder_reinit(struct http_decoder *decoder, void *args)
+void http_decoder_free(struct http_decoder *decoder)
{
- http_decoder_half_init(decoder->c2s_half, http_event_handler, args,
- HTTP_DECODER_IS_CACHE_LINE,
- HTTP_DECODER_IS_CACHE_BODY,
- HTTP_DECODER_IS_CACHE_HEADER);
-
- http_decoder_half_init(decoder->s2c_half, http_event_handler, args,
- HTTP_DECODER_IS_CACHE_LINE,
- HTTP_DECODER_IS_CACHE_BODY,
- HTTP_DECODER_IS_CACHE_HEADER);
+ if (NULL == decoder) {
+ return;
+ }
+
+ if (decoder->c2s_half != NULL) {
+ http_decoder_half_free(decoder->c2s_half);
+ decoder->c2s_half = NULL;
+ }
+
+ if (decoder->s2c_half != NULL) {
+ http_decoder_half_free(decoder->s2c_half);
+ decoder->s2c_half = NULL;
+ }
+
+ FREE(decoder);
}
static struct http_decoder_result_queue *http_decoder_result_queue_new(size_t queue_size)
@@ -247,6 +283,10 @@ static struct http_decoder_result_queue *http_decoder_result_queue_new(size_t qu
queue->array = CALLOC(struct http_decoder_result *, queue->queue_size);
assert(queue->array);
+ for (size_t i = 0; i < queue->queue_size; i++) {
+ queue->array[i] = CALLOC(struct http_decoder_result, 1);
+ }
+
return queue;
}
@@ -270,21 +310,6 @@ static void http_decoder_result_queue_free(struct http_decoder_result_queue *que
FREE(queue);
}
-static struct http_decoder_context *http_decoder_context_new()
-{
- struct http_decoder_context *ctx = CALLOC(struct http_decoder_context, 1);
- assert(ctx);
-
- return ctx;
-}
-
-static void http_decoder_context_free(struct http_decoder_context *ctx)
-{
- if (NULL == ctx) {
- return;
- }
-}
-
static int http_protocol_identify(const char *data, size_t data_len)
{
enum llhttp_type type = HTTP_BOTH;
@@ -303,32 +328,33 @@ static int http_protocol_identify(const char *data, size_t data_len)
return 0;
}
-int http_decoder_entry(struct session *s, int events, const struct packet *pkt, void *cb_arg)
+int http_decoder_entry(struct session *sess, int events, const struct packet *pkt, void *cb_arg)
{
int ret = 0;
size_t payload_len = 0;
struct http_decoder_context *ctx = (struct http_decoder_context *)cb_arg;
- const char *payload = session_get0_current_payload(s, &payload_len);
- size_t http_identify_len = payload_len > HTTP_IDENTIFY_LEN ? HTTP_IDENTIFY_LEN : payload_len;
+ const char *payload = session_get0_current_payload(sess, &payload_len);
+ if (NULL == payload || 0 == payload_len) {
+ return 0;
+ }
- struct http_decoder_result_queue *queue = session_get_ex_data(s, ctx->ex_data_idx);
+ struct http_decoder_result_queue *queue = session_get_ex_data(sess, ctx->ex_data_idx);
if (NULL == queue) {
+ size_t http_identify_len = payload_len > HTTP_IDENTIFY_LEN ? HTTP_IDENTIFY_LEN : payload_len;
ret = http_protocol_identify(payload, http_identify_len);
- printf("http_protocol_identify ret:%d\n", ret);
if (ret < 0) {
//ignore this session's event
- struct session_event *s_event = session_get_intrinsic_event(s, ctx->plugin_id);
- session_event_assign(s_event, ctx->st, s, 0, http_decoder_entry, ctx);
+ struct session_event *s_event = session_get_intrinsic_event(sess, ctx->plugin_id);
+ session_event_assign(s_event, ctx->st, sess, 0, http_decoder_entry, ctx);
return 0;
}
queue = http_decoder_result_queue_new(HTTP_DECODER_RESULT_QUEUE_SIZE);
- session_set_ex_data(s, ctx->ex_data_idx, queue);
- }
+ queue->ref_session = sess;
+ session_set_ex_data(sess, ctx->ex_data_idx, queue);
+ }
- ctx->ref_queue = queue;
-
int dir = packet_get_direction(pkt);
if (dir < 0) {
return -1;
@@ -341,30 +367,30 @@ int http_decoder_entry(struct session *s, int events, const struct packet *pkt,
// printf("\n-------------------------------------------\n");
if (NULL == ctx->decoder) {
- ctx->decoder = http_decoder_new();
- http_decoder_reinit(ctx->decoder, ctx);
+ ctx->decoder = http_decoder_new(http_event_handler, HTTP_DECODER_IS_CACHE_LINE,
+ HTTP_DECODER_IS_CACHE_HEADER, HTTP_DECODER_IS_CACHE_BODY);
}
struct http_decoder_half *cur_half = NULL;
- if (dir == PACKET_DIRECTION_C2S) {
- cur_half = ctx->decoder->c2s_half;
- } else {
- cur_half = ctx->decoder->s2c_half;
- }
+ if (dir == PACKET_DIRECTION_C2S) {
+ cur_half = ctx->decoder->c2s_half;
+ } else {
+ cur_half = ctx->decoder->s2c_half;
+ }
- ret = http_decoder_half_parse(cur_half, payload, payload_len);
- if (ret < 0) {
- if (dir == PACKET_DIRECTION_C2S) {
+ ctx->http_ev_ctx->topic_id = ctx->topic_id;
+ ctx->http_ev_ctx->ref_queue = queue;
+ ctx->http_ev_ctx->ref_session = sess;
+
+ ret = http_decoder_half_parse(cur_half, ctx->http_ev_ctx, payload, payload_len);
+ if (ret < 0) {
+ if (dir == PACKET_DIRECTION_C2S) {
http_decoder_result_queue_pop(queue, queue->req_index);
} else {
http_decoder_result_queue_pop(queue, queue->res_index);
}
}
- // struct http_message *msg = CALLOC(struct http_message, 1);
- // msg->type = HTTP_MESSAGE_REQ_LINE;
- // session_mq_publish_message(s, ctx->topic_id, msg);
-
return 0;
}
@@ -389,8 +415,9 @@ void http_message_free(void *msg, void *cb_arg)
void *http_decoder_init(struct stellar *st)
{
- struct http_decoder_context *ctx = http_decoder_context_new();
+ struct http_decoder_context *ctx = CALLOC(struct http_decoder_context, 1);
+ ctx->http_ev_ctx = CALLOC(struct http_event_context, 1);
ctx->st = st;
ctx->ex_data_idx = stellar_session_get_ex_new_index(st, "HTTP_DECODER",
http_decoder_ex_data_free, NULL);
@@ -411,13 +438,23 @@ void *http_decoder_init(struct stellar *st)
return ctx;
}
-void http_decoder_exit(void *ctx)
+void http_decoder_exit(void *decoder_ctx)
{
- if (NULL == ctx) {
+ if (NULL == decoder_ctx) {
return;
}
- FREE(ctx);
+ struct http_decoder_context *ctx = (struct http_decoder_context *)decoder_ctx;
+ if (ctx->http_ev_ctx != NULL) {
+ FREE(ctx->http_ev_ctx);
+ }
+
+ if (ctx->decoder != NULL) {
+ http_decoder_free(ctx->decoder);
+ ctx->decoder = NULL;
+ }
+
+ FREE(decoder_ctx);
}
enum http_message_type http_message_type(struct http_message *msg)
@@ -454,7 +491,7 @@ int http_message_get_request_header(struct http_message *msg, struct hstring *ke
{
if (NULL == msg ||
(msg->type != HTTP_MESSAGE_REQ_HEADER && msg->type != HTTP_MESSAGE_REQ_HEADER_END) ||
- NULL == header_array || 0 == array_size) {
+ NULL == key || NULL == header_array || 0 == array_size) {
return -1;
}
@@ -466,7 +503,7 @@ int http_message_get_response_header(struct http_message *msg, struct hstring *k
{
if (NULL == msg ||
(msg->type != HTTP_MESSAGE_RES_HEADER && msg->type != HTTP_MESSAGE_RES_HEADER_END) ||
- NULL == header_array || 0 == array_size) {
+ NULL == key || NULL == header_array || 0 == array_size) {
return -1;
}
@@ -475,30 +512,58 @@ int http_message_get_response_header(struct http_message *msg, struct hstring *k
int http_message_request_header_next(struct http_message *msg, struct http_header *header)
{
- return 0;
+ if (NULL == msg ||
+ (msg->type != HTTP_MESSAGE_REQ_HEADER && msg->type != HTTP_MESSAGE_REQ_HEADER_END) ||
+ NULL == header) {
+ return -1;
+ }
+
+ return http_decoder_half_data_iter_header(msg->data, header);
}
int http_message_response_header_next(struct http_message *msg, struct http_header *header)
{
- return 0;
+ if (NULL == msg ||
+ (msg->type != HTTP_MESSAGE_RES_HEADER && msg->type != HTTP_MESSAGE_RES_HEADER_END) ||
+ NULL == header) {
+ return -1;
+ }
+
+ return http_decoder_half_data_iter_header(msg->data, header);
}
-void http_message_get_request_raw_body(struct http_message *msg, struct hstring *body)
+int http_message_get_request_raw_body(struct http_message *msg, struct hstring *body)
{
+ if (NULL == msg || (msg->type != HTTP_MESSAGE_REQ_BODY) || NULL == body) {
+ return -1;
+ }
+ return http_decoder_half_data_get_raw_body(msg->data, body);
}
-void http_message_get_response_raw_body(struct http_message *msg, struct hstring *body)
+int http_message_get_response_raw_body(struct http_message *msg, struct hstring *body)
{
+ if (NULL == msg || (msg->type != HTTP_MESSAGE_RES_BODY) || NULL == body) {
+ return -1;
+ }
+ return http_decoder_half_data_get_raw_body(msg->data, body);
}
-void http_message_get_request_decompress_body(struct http_message *msg, struct hstring *body)
+int http_message_get_request_decompress_body(struct http_message *msg, struct hstring *body)
{
+ if (NULL == msg || (msg->type != HTTP_MESSAGE_REQ_BODY) || NULL == body) {
+ return -1;
+ }
+ return http_decoder_half_data_get_decompress_body(msg->data, body);
}
-void http_message_get_response_decompress_body(struct http_message *msg, struct hstring *body)
+int http_message_get_response_decompress_body(struct http_message *msg, struct hstring *body)
{
+ if (NULL == msg || (msg->type != HTTP_MESSAGE_RES_BODY) || NULL == body) {
+ return -1;
+ }
+ return http_decoder_half_data_get_decompress_body(msg->data, body);
} \ No newline at end of file