summaryrefslogtreecommitdiff
path: root/decoders/http/http_decoder_module.c
diff options
context:
space:
mode:
Diffstat (limited to 'decoders/http/http_decoder_module.c')
-rw-r--r--decoders/http/http_decoder_module.c299
1 files changed, 299 insertions, 0 deletions
diff --git a/decoders/http/http_decoder_module.c b/decoders/http/http_decoder_module.c
new file mode 100644
index 0000000..53ff71b
--- /dev/null
+++ b/decoders/http/http_decoder_module.c
@@ -0,0 +1,299 @@
+#include <stdio.h>
+#include <assert.h>
+#include "stellar/session.h"
+#include "stellar/module.h"
+#include "http_decoder_utils.h"
+#include "http_decoder_half.h"
+#include "http_decoder.h"
+#include "toml/toml.h"
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+ __thread struct http_topic_manager *chaotic_http_topic_mgr;
+
+#define HTTP_TOPIC_NAME_REQ_HDR "HTTP_TOPIC_REQ_HDR"
+#define HTTP_TOPIC_NAME_REQ_BODY "HTTP_TOPIC_REQ_BODY"
+#define HTTP_TOPIC_NAME_RES_HDR "HTTP_TOPIC_RES_HDR"
+#define HTTP_TOPIC_NAME_RES_BODY "HTTP_TOPIC_RES_BODY"
+
+ static void http_set_default_config(struct http_config *hd_cfg)
+ {
+ hd_cfg->decompress_switch = 1;
+ }
+ static int http_load_config(const char *cfg_path, struct http_config *hd_cfg)
+ {
+ FILE *fp = fopen(cfg_path, "r");
+ if (NULL == fp)
+ {
+ fprintf(stderr, "[%s]Can't open config file:%s", __FUNCTION__, cfg_path);
+ return -1;
+ }
+ int ret = 0;
+ char errbuf[256] = {0};
+ toml_table_t *root = toml_parse_file(fp, errbuf, sizeof(errbuf));
+ fclose(fp);
+
+ toml_table_t *section = toml_table_in(root, "http");
+ if (section == NULL)
+ {
+ fprintf(stderr, "(logger) config file %s missing 'http' section\n", cfg_path);
+ goto error_exit;
+ }
+
+ toml_datum_t int_val = toml_int_in(section, "decompress_enable");
+ if (int_val.ok != 0)
+ {
+ hd_cfg->decompress_switch = int_val.u.b;
+ }
+ error_exit:
+ toml_free(root);
+ return ret;
+ }
+
+ static void http_update_header_array(struct http_half_data *flow_data)
+ {
+ if (0 == http_get_header_field_count(flow_data))
+ {
+ flow_data->header.field_array = NULL;
+ flow_data->header.field_array_num = 0;
+ }
+ else
+ {
+ flow_data->header.field_array = (struct http_header_field *)utarray_front(flow_data->ut_filed_array);
+ flow_data->header.field_array_num = utarray_len(flow_data->ut_filed_array);
+ }
+ }
+
+ static void http_on_msg_dispatch(int topic_id UNUSED, void *msg, on_msg_cb_func *on_msg_cb,
+ void *on_msg_cb_arg, void *dispatch_arg UNUSED)
+ {
+ assert(msg != NULL && on_msg_cb != NULL);
+ struct http_message *hmsg = (struct http_message *)msg;
+ struct http_half_data *flow_data = hmsg->flow_data;
+
+ if (hmsg->event == HTTP_EVENT_REQ_END || hmsg->event == HTTP_EVENT_RES_END)
+ {
+ /* notify subscriber ? */
+ return;
+ }
+ switch (hmsg->topic_type)
+ {
+ case HTTP_TOPIC_REQ_HEADER:
+ {
+ http_update_header_array(flow_data);
+ http_on_request_header_cb *on_req_hdr_cb = (http_on_request_header_cb *)((void *)on_msg_cb);
+ on_req_hdr_cb(hmsg->sess_ref, flow_data->transaction_seq, &flow_data->req_line, &flow_data->header,
+ (const char *)flow_data->joint_url.iov_base, flow_data->joint_url.iov_len,
+ on_msg_cb_arg);
+ }
+ break;
+ case HTTP_TOPIC_RES_HEADER:
+ {
+ http_update_header_array(flow_data);
+ http_on_response_header_cb *on_res_hdr_cb = (http_on_response_header_cb *)((void *)on_msg_cb);
+ on_res_hdr_cb(hmsg->sess_ref, flow_data->transaction_seq, &flow_data->status_line, &flow_data->header, on_msg_cb_arg);
+ }
+ break;
+ case HTTP_TOPIC_REQ_BODY:
+ case HTTP_TOPIC_RES_BODY:
+ {
+ http_on_body_cb *on_body_cb = (http_on_body_cb *)((void *)on_msg_cb);
+ if (flow_data->decompress_body.body != NULL || flow_data->decompress_body.offset > 0)
+ {
+ on_body_cb(hmsg->sess_ref, flow_data->decompress_body.body, flow_data->decompress_body.body_sz, flow_data->decompress_body.offset,
+ flow_data->transaction_seq, flow_data->decompress_body.is_finished, on_msg_cb_arg);
+ }
+ else
+ {
+ on_body_cb(hmsg->sess_ref, flow_data->raw_body.body, flow_data->raw_body.body_sz, flow_data->raw_body.offset,
+ flow_data->transaction_seq, flow_data->raw_body.is_finished, on_msg_cb_arg);
+ }
+ }
+ break;
+ default:
+ assert(0);
+ break;
+ }
+ }
+
+ static int http_create_topic_nx(struct module_manager *mod_mgr, const char *topic_name)
+ {
+ struct mq_schema *mq_s = module_manager_get_mq_schema(mod_mgr);
+ assert(mq_s != NULL);
+ int quic_topic_id = mq_schema_get_topic_id(mq_s, topic_name);
+ if (quic_topic_id >= 0)
+ {
+ return quic_topic_id;
+ }
+ int topic_id = mq_schema_create_topic(mq_s, topic_name, (on_msg_dispatch_cb_func *)http_on_msg_dispatch,
+ NULL, http_message_free_cb, NULL);
+ return topic_id;
+ }
+
+ struct http_topic_manager *http_topic_mgr_init(struct module_manager *mod_mgr)
+ {
+ if (chaotic_http_topic_mgr != NULL)
+ {
+ return chaotic_http_topic_mgr;
+ }
+ struct http_topic_manager *http_topic_mgr = (struct http_topic_manager *)calloc(1, sizeof(struct http_topic_manager));
+ assert(http_topic_mgr != NULL);
+ struct http_topic_compose *topic_compose = http_topic_mgr->topic_compose;
+
+ topic_compose[HTTP_TOPIC_REQ_HEADER].topic_name = HTTP_TOPIC_NAME_REQ_HDR;
+ topic_compose[HTTP_TOPIC_REQ_BODY].topic_name = HTTP_TOPIC_NAME_REQ_BODY;
+ topic_compose[HTTP_TOPIC_RES_HEADER].topic_name = HTTP_TOPIC_NAME_RES_HDR;
+ topic_compose[HTTP_TOPIC_RES_BODY].topic_name = HTTP_TOPIC_NAME_RES_BODY;
+
+ topic_compose[HTTP_TOPIC_REQ_HEADER].topic_id = http_create_topic_nx(mod_mgr, HTTP_TOPIC_NAME_REQ_HDR);
+ topic_compose[HTTP_TOPIC_REQ_BODY].topic_id = http_create_topic_nx(mod_mgr, HTTP_TOPIC_NAME_REQ_BODY);
+ topic_compose[HTTP_TOPIC_RES_HEADER].topic_id = http_create_topic_nx(mod_mgr, HTTP_TOPIC_NAME_RES_HDR);
+ topic_compose[HTTP_TOPIC_RES_BODY].topic_id = http_create_topic_nx(mod_mgr, HTTP_TOPIC_NAME_RES_BODY);
+ chaotic_http_topic_mgr = http_topic_mgr;
+ return http_topic_mgr;
+ }
+
+ void http_topic_mgr_free(struct http_topic_manager *topic_mgr)
+ {
+ assert(topic_mgr != NULL);
+ FREE(topic_mgr);
+ }
+
+ static int http_subscribe_common(struct module_manager *mod_mgr, enum http_topic_type topic_type, void *cb, void *args)
+ {
+ struct http_topic_manager *http_topic_mgr = chaotic_http_topic_mgr;
+ if (http_topic_mgr == NULL)
+ {
+ http_topic_mgr = http_topic_mgr_init(mod_mgr);
+ chaotic_http_topic_mgr = http_topic_mgr;
+ }
+ return mq_schema_subscribe(module_manager_get_mq_schema(mod_mgr),
+ http_topic_mgr->topic_compose[topic_type].topic_id,
+ (on_msg_cb_func *)cb, args);
+ }
+
+ int http_subscribe_request_header(struct module_manager *mod_mgr, http_on_request_header_cb *cb, void *args)
+ {
+ assert(mod_mgr != NULL);
+ return http_subscribe_common(mod_mgr, HTTP_TOPIC_REQ_HEADER, cb, args);
+ }
+
+ int http_subscribe_response_header(struct module_manager *mod_mgr, http_on_response_header_cb *cb, void *args)
+ {
+ assert(mod_mgr != NULL);
+ return http_subscribe_common(mod_mgr, HTTP_TOPIC_RES_HEADER, cb, args);
+ }
+
+ int http_subscribe_request_body(struct module_manager *mod_mgr, http_on_body_cb *cb, void *args)
+ {
+ assert(mod_mgr != NULL);
+ return http_subscribe_common(mod_mgr, HTTP_TOPIC_REQ_BODY, cb, args);
+ }
+
+ int http_subscribe_response_body(struct module_manager *mod_mgr, http_on_body_cb *cb, void *args)
+ {
+ assert(mod_mgr != NULL);
+ return http_subscribe_common(mod_mgr, HTTP_TOPIC_RES_BODY, cb, args);
+ }
+
+ int http_subscribe(struct http *http, struct http_subscirbe_params *params, void *arg)
+ {
+ assert(http != NULL && params != NULL);
+ struct module_manager *mod_mgr = http->mod_mgr_ref;
+ int ret = 0;
+ if (params->req_hdr_cb != NULL)
+ {
+ ret = http_subscribe_request_header(mod_mgr, params->req_hdr_cb, arg);
+ if (ret < 0)
+ {
+ return ret;
+ }
+ }
+ if (params->res_hdr_cb != NULL)
+ {
+ ret = http_subscribe_response_header(mod_mgr, params->res_hdr_cb, arg);
+ if (ret < 0)
+ {
+ return ret;
+ }
+ }
+ if (params->req_body_cb != NULL)
+ {
+ ret = http_subscribe_request_body(mod_mgr, params->req_body_cb, arg);
+ if (ret < 0)
+ {
+ return ret;
+ }
+ }
+ if (params->res_body_cb != NULL)
+ {
+ ret = http_subscribe_response_body(mod_mgr, params->res_body_cb, arg);
+ if (ret < 0)
+ {
+ return ret;
+ }
+ }
+ return ret;
+ }
+
+ struct module *http_init(struct module_manager *mod_mgr)
+ {
+ assert(mod_mgr != NULL);
+ struct http *http_env = (struct http *)calloc(1, sizeof(struct http));
+ http_set_default_config(&http_env->hd_cfg);
+ http_load_config(module_manager_get_toml_path(mod_mgr), &http_env->hd_cfg);
+ http_stat_init(mod_mgr, &http_env->stat);
+ struct module *mod = module_new(HTTP_MODULE_NAME, http_env);
+ http_env->mod_mgr_ref = mod_mgr;
+
+ http_env->logger_ref = module_manager_get_logger(mod_mgr);
+ assert(http_env->logger_ref != NULL);
+ struct module *sess_mod = module_manager_get_module(mod_mgr, SESSION_MANAGER_MODULE_NAME);
+ struct session_manager *sess_mgr = module_to_session_manager(sess_mod);
+ assert(sess_mgr != NULL);
+
+ struct http_topic_manager *http_topic_mgr = http_topic_mgr_init(mod_mgr);
+ assert(http_topic_mgr != NULL);
+ http_env->http_topic_mgr = http_topic_mgr;
+
+ session_manager_subscribe_tcp_stream(sess_mgr, http_on_tcp_stream_cb, http_env);
+ http_env->exdata_id = session_manager_new_session_exdata_index(sess_mgr, HTTP_EXDATA_NAME, http_exdata_free_cb, http_env);
+ STELLAR_LOG_FATAL(http_env->logger_ref, HTTP_MODULE_NAME,
+ "http init success, decompress_switch:%d", http_env->hd_cfg.decompress_switch);
+ return mod;
+ }
+ struct module *http_get_module(struct module_manager *mod_mgr)
+ {
+ assert(mod_mgr != NULL);
+
+ struct module *http_mod = module_manager_get_module(mod_mgr, HTTP_MODULE_NAME);
+ if (NULL == http_mod)
+ {
+ http_mod = http_init(mod_mgr);
+ }
+ return http_mod;
+ }
+
+ struct http *http_module_to_http(struct module *http_mod)
+ {
+ assert(http_mod);
+ return (struct http *)module_get_ctx(http_mod);
+ }
+
+ void http_exit(struct module_manager *mod_mgr UNUSED, struct module *mod)
+ {
+ assert(mod != NULL);
+ struct http *http_env = (struct http *)module_get_ctx(mod);
+ http_stat_free(&http_env->stat);
+ http_topic_mgr_free(http_env->http_topic_mgr);
+ STELLAR_LOG_FATAL(http_env->logger_ref, HTTP_MODULE_NAME, "http exit!");
+ FREE(http_env);
+ module_free(mod);
+ }
+
+#ifdef __cplusplus
+}
+#endif