diff options
Diffstat (limited to 'decoders/http/http_decoder_module.c')
| -rw-r--r-- | decoders/http/http_decoder_module.c | 299 |
1 files changed, 299 insertions, 0 deletions
diff --git a/decoders/http/http_decoder_module.c b/decoders/http/http_decoder_module.c new file mode 100644 index 0000000..53ff71b --- /dev/null +++ b/decoders/http/http_decoder_module.c @@ -0,0 +1,299 @@ +#include <stdio.h> +#include <assert.h> +#include "stellar/session.h" +#include "stellar/module.h" +#include "http_decoder_utils.h" +#include "http_decoder_half.h" +#include "http_decoder.h" +#include "toml/toml.h" + +#ifdef __cplusplus +extern "C" +{ +#endif + + __thread struct http_topic_manager *chaotic_http_topic_mgr; + +#define HTTP_TOPIC_NAME_REQ_HDR "HTTP_TOPIC_REQ_HDR" +#define HTTP_TOPIC_NAME_REQ_BODY "HTTP_TOPIC_REQ_BODY" +#define HTTP_TOPIC_NAME_RES_HDR "HTTP_TOPIC_RES_HDR" +#define HTTP_TOPIC_NAME_RES_BODY "HTTP_TOPIC_RES_BODY" + + static void http_set_default_config(struct http_config *hd_cfg) + { + hd_cfg->decompress_switch = 1; + } + static int http_load_config(const char *cfg_path, struct http_config *hd_cfg) + { + FILE *fp = fopen(cfg_path, "r"); + if (NULL == fp) + { + fprintf(stderr, "[%s]Can't open config file:%s", __FUNCTION__, cfg_path); + return -1; + } + int ret = 0; + char errbuf[256] = {0}; + toml_table_t *root = toml_parse_file(fp, errbuf, sizeof(errbuf)); + fclose(fp); + + toml_table_t *section = toml_table_in(root, "http"); + if (section == NULL) + { + fprintf(stderr, "(logger) config file %s missing 'http' section\n", cfg_path); + goto error_exit; + } + + toml_datum_t int_val = toml_int_in(section, "decompress_enable"); + if (int_val.ok != 0) + { + hd_cfg->decompress_switch = int_val.u.b; + } + error_exit: + toml_free(root); + return ret; + } + + static void http_update_header_array(struct http_half_data *flow_data) + { + if (0 == http_get_header_field_count(flow_data)) + { + flow_data->header.field_array = NULL; + flow_data->header.field_array_num = 0; + } + else + { + flow_data->header.field_array = (struct http_header_field *)utarray_front(flow_data->ut_filed_array); + flow_data->header.field_array_num = utarray_len(flow_data->ut_filed_array); + } + } + + static void http_on_msg_dispatch(int topic_id UNUSED, void *msg, on_msg_cb_func *on_msg_cb, + void *on_msg_cb_arg, void *dispatch_arg UNUSED) + { + assert(msg != NULL && on_msg_cb != NULL); + struct http_message *hmsg = (struct http_message *)msg; + struct http_half_data *flow_data = hmsg->flow_data; + + if (hmsg->event == HTTP_EVENT_REQ_END || hmsg->event == HTTP_EVENT_RES_END) + { + /* notify subscriber ? */ + return; + } + switch (hmsg->topic_type) + { + case HTTP_TOPIC_REQ_HEADER: + { + http_update_header_array(flow_data); + http_on_request_header_cb *on_req_hdr_cb = (http_on_request_header_cb *)((void *)on_msg_cb); + on_req_hdr_cb(hmsg->sess_ref, flow_data->transaction_seq, &flow_data->req_line, &flow_data->header, + (const char *)flow_data->joint_url.iov_base, flow_data->joint_url.iov_len, + on_msg_cb_arg); + } + break; + case HTTP_TOPIC_RES_HEADER: + { + http_update_header_array(flow_data); + http_on_response_header_cb *on_res_hdr_cb = (http_on_response_header_cb *)((void *)on_msg_cb); + on_res_hdr_cb(hmsg->sess_ref, flow_data->transaction_seq, &flow_data->status_line, &flow_data->header, on_msg_cb_arg); + } + break; + case HTTP_TOPIC_REQ_BODY: + case HTTP_TOPIC_RES_BODY: + { + http_on_body_cb *on_body_cb = (http_on_body_cb *)((void *)on_msg_cb); + if (flow_data->decompress_body.body != NULL || flow_data->decompress_body.offset > 0) + { + on_body_cb(hmsg->sess_ref, flow_data->decompress_body.body, flow_data->decompress_body.body_sz, flow_data->decompress_body.offset, + flow_data->transaction_seq, flow_data->decompress_body.is_finished, on_msg_cb_arg); + } + else + { + on_body_cb(hmsg->sess_ref, flow_data->raw_body.body, flow_data->raw_body.body_sz, flow_data->raw_body.offset, + flow_data->transaction_seq, flow_data->raw_body.is_finished, on_msg_cb_arg); + } + } + break; + default: + assert(0); + break; + } + } + + static int http_create_topic_nx(struct module_manager *mod_mgr, const char *topic_name) + { + struct mq_schema *mq_s = module_manager_get_mq_schema(mod_mgr); + assert(mq_s != NULL); + int quic_topic_id = mq_schema_get_topic_id(mq_s, topic_name); + if (quic_topic_id >= 0) + { + return quic_topic_id; + } + int topic_id = mq_schema_create_topic(mq_s, topic_name, (on_msg_dispatch_cb_func *)http_on_msg_dispatch, + NULL, http_message_free_cb, NULL); + return topic_id; + } + + struct http_topic_manager *http_topic_mgr_init(struct module_manager *mod_mgr) + { + if (chaotic_http_topic_mgr != NULL) + { + return chaotic_http_topic_mgr; + } + struct http_topic_manager *http_topic_mgr = (struct http_topic_manager *)calloc(1, sizeof(struct http_topic_manager)); + assert(http_topic_mgr != NULL); + struct http_topic_compose *topic_compose = http_topic_mgr->topic_compose; + + topic_compose[HTTP_TOPIC_REQ_HEADER].topic_name = HTTP_TOPIC_NAME_REQ_HDR; + topic_compose[HTTP_TOPIC_REQ_BODY].topic_name = HTTP_TOPIC_NAME_REQ_BODY; + topic_compose[HTTP_TOPIC_RES_HEADER].topic_name = HTTP_TOPIC_NAME_RES_HDR; + topic_compose[HTTP_TOPIC_RES_BODY].topic_name = HTTP_TOPIC_NAME_RES_BODY; + + topic_compose[HTTP_TOPIC_REQ_HEADER].topic_id = http_create_topic_nx(mod_mgr, HTTP_TOPIC_NAME_REQ_HDR); + topic_compose[HTTP_TOPIC_REQ_BODY].topic_id = http_create_topic_nx(mod_mgr, HTTP_TOPIC_NAME_REQ_BODY); + topic_compose[HTTP_TOPIC_RES_HEADER].topic_id = http_create_topic_nx(mod_mgr, HTTP_TOPIC_NAME_RES_HDR); + topic_compose[HTTP_TOPIC_RES_BODY].topic_id = http_create_topic_nx(mod_mgr, HTTP_TOPIC_NAME_RES_BODY); + chaotic_http_topic_mgr = http_topic_mgr; + return http_topic_mgr; + } + + void http_topic_mgr_free(struct http_topic_manager *topic_mgr) + { + assert(topic_mgr != NULL); + FREE(topic_mgr); + } + + static int http_subscribe_common(struct module_manager *mod_mgr, enum http_topic_type topic_type, void *cb, void *args) + { + struct http_topic_manager *http_topic_mgr = chaotic_http_topic_mgr; + if (http_topic_mgr == NULL) + { + http_topic_mgr = http_topic_mgr_init(mod_mgr); + chaotic_http_topic_mgr = http_topic_mgr; + } + return mq_schema_subscribe(module_manager_get_mq_schema(mod_mgr), + http_topic_mgr->topic_compose[topic_type].topic_id, + (on_msg_cb_func *)cb, args); + } + + int http_subscribe_request_header(struct module_manager *mod_mgr, http_on_request_header_cb *cb, void *args) + { + assert(mod_mgr != NULL); + return http_subscribe_common(mod_mgr, HTTP_TOPIC_REQ_HEADER, cb, args); + } + + int http_subscribe_response_header(struct module_manager *mod_mgr, http_on_response_header_cb *cb, void *args) + { + assert(mod_mgr != NULL); + return http_subscribe_common(mod_mgr, HTTP_TOPIC_RES_HEADER, cb, args); + } + + int http_subscribe_request_body(struct module_manager *mod_mgr, http_on_body_cb *cb, void *args) + { + assert(mod_mgr != NULL); + return http_subscribe_common(mod_mgr, HTTP_TOPIC_REQ_BODY, cb, args); + } + + int http_subscribe_response_body(struct module_manager *mod_mgr, http_on_body_cb *cb, void *args) + { + assert(mod_mgr != NULL); + return http_subscribe_common(mod_mgr, HTTP_TOPIC_RES_BODY, cb, args); + } + + int http_subscribe(struct http *http, struct http_subscirbe_params *params, void *arg) + { + assert(http != NULL && params != NULL); + struct module_manager *mod_mgr = http->mod_mgr_ref; + int ret = 0; + if (params->req_hdr_cb != NULL) + { + ret = http_subscribe_request_header(mod_mgr, params->req_hdr_cb, arg); + if (ret < 0) + { + return ret; + } + } + if (params->res_hdr_cb != NULL) + { + ret = http_subscribe_response_header(mod_mgr, params->res_hdr_cb, arg); + if (ret < 0) + { + return ret; + } + } + if (params->req_body_cb != NULL) + { + ret = http_subscribe_request_body(mod_mgr, params->req_body_cb, arg); + if (ret < 0) + { + return ret; + } + } + if (params->res_body_cb != NULL) + { + ret = http_subscribe_response_body(mod_mgr, params->res_body_cb, arg); + if (ret < 0) + { + return ret; + } + } + return ret; + } + + struct module *http_init(struct module_manager *mod_mgr) + { + assert(mod_mgr != NULL); + struct http *http_env = (struct http *)calloc(1, sizeof(struct http)); + http_set_default_config(&http_env->hd_cfg); + http_load_config(module_manager_get_toml_path(mod_mgr), &http_env->hd_cfg); + http_stat_init(mod_mgr, &http_env->stat); + struct module *mod = module_new(HTTP_MODULE_NAME, http_env); + http_env->mod_mgr_ref = mod_mgr; + + http_env->logger_ref = module_manager_get_logger(mod_mgr); + assert(http_env->logger_ref != NULL); + struct module *sess_mod = module_manager_get_module(mod_mgr, SESSION_MANAGER_MODULE_NAME); + struct session_manager *sess_mgr = module_to_session_manager(sess_mod); + assert(sess_mgr != NULL); + + struct http_topic_manager *http_topic_mgr = http_topic_mgr_init(mod_mgr); + assert(http_topic_mgr != NULL); + http_env->http_topic_mgr = http_topic_mgr; + + session_manager_subscribe_tcp_stream(sess_mgr, http_on_tcp_stream_cb, http_env); + http_env->exdata_id = session_manager_new_session_exdata_index(sess_mgr, HTTP_EXDATA_NAME, http_exdata_free_cb, http_env); + STELLAR_LOG_FATAL(http_env->logger_ref, HTTP_MODULE_NAME, + "http init success, decompress_switch:%d", http_env->hd_cfg.decompress_switch); + return mod; + } + struct module *http_get_module(struct module_manager *mod_mgr) + { + assert(mod_mgr != NULL); + + struct module *http_mod = module_manager_get_module(mod_mgr, HTTP_MODULE_NAME); + if (NULL == http_mod) + { + http_mod = http_init(mod_mgr); + } + return http_mod; + } + + struct http *http_module_to_http(struct module *http_mod) + { + assert(http_mod); + return (struct http *)module_get_ctx(http_mod); + } + + void http_exit(struct module_manager *mod_mgr UNUSED, struct module *mod) + { + assert(mod != NULL); + struct http *http_env = (struct http *)module_get_ctx(mod); + http_stat_free(&http_env->stat); + http_topic_mgr_free(http_env->http_topic_mgr); + STELLAR_LOG_FATAL(http_env->logger_ref, HTTP_MODULE_NAME, "http exit!"); + FREE(http_env); + module_free(mod); + } + +#ifdef __cplusplus +} +#endif |
