From e8fb760d2103d7eea464dd37c006766507e7742d Mon Sep 17 00:00:00 2001 From: Lu Date: Thu, 19 Jul 2018 20:12:51 +0800 Subject: #5 实现HTTPSession的Bypass功能,并将命中白名单配置的Session置为Bypass状态。 * 实现HTTPSession的Bypass功能,当一个Session被置为Bypass状态时,不再调用上层处理业务,同时按Stream方式处理应答。 * 增加命中白名单配置的Session置为Bypass状态功能,提高白名单配置对应的连接的处理速度。 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/http.h | 62 ++++++++++++++++++++++++++++++++--------- src/http1.cc | 83 +++++++++++++++++++++++++++++++++++++------------------ src/httpaction.cc | 75 ++++++++++++++++++++++++++++++++++--------------- src/httpscan.cc | 3 ++ 4 files changed, 161 insertions(+), 62 deletions(-) diff --git a/src/http.h b/src/http.h index 54e78e8..79da780 100644 --- a/src/http.h +++ b/src/http.h @@ -107,7 +107,8 @@ public: enum CallbackTag { kCallbackTagIgnore, - kCallBackTagOnce, + kCallbackTagNormal, + kCallBackTagOnlyOnce, kCallbackTagRepeat }; @@ -120,20 +121,43 @@ public: virtual void SetResponseBodyTag(enum CallbackTag tag) { tag_response_body_cb_ = tag; } - virtual void DropMe() + /* 丢弃这一Session,不转发 */ + virtual void Drop() { /* Disable all callbacks */ - SetRequestHeaderTag(HttpSession::kCallbackTagIgnore); - SetRequestBodyTag(HttpSession::kCallbackTagIgnore); - SetResponseHeaderTag(HttpSession::kCallbackTagIgnore); - SetResponseBodyTag(HttpSession::kCallbackTagIgnore); + SetRequestHeaderTag(kCallbackTagIgnore); + SetRequestBodyTag(kCallbackTagIgnore); + SetResponseHeaderTag(kCallbackTagIgnore); + SetResponseBodyTag(kCallbackTagIgnore); /* Tag, please drop this session */ need_to_drop_ = true; } + /* 直通,不再处理这一Session中的任何内容 */ + virtual void Bypass() + { + /* Disable all callbacks */ + SetRequestHeaderTag(kCallbackTagIgnore); + SetRequestBodyTag(kCallbackTagIgnore); + SetResponseHeaderTag(kCallbackTagIgnore); + SetResponseBodyTag(kCallbackTagIgnore); + + need_to_drop_ = false; + } + virtual bool NeedToDrop() - { return need_to_drop_; } + { + return need_to_drop_; + } + + virtual bool NeedToBypass() + { + return (tag_request_header_cb_ == kCallbackTagIgnore && + tag_request_body_cb_ == kCallbackTagIgnore && + tag_response_header_cb_ == kCallbackTagIgnore && + tag_response_body_cb_ == kCallbackTagIgnore) && (!need_to_drop_); + } protected: HttpConnection & http_connection_; @@ -148,10 +172,10 @@ protected: http_session_cb_t response_body_cb_{nullptr}; /* Call tag */ - enum CallbackTag tag_request_header_cb_{kCallBackTagOnce}; - enum CallbackTag tag_request_body_cb_{kCallBackTagOnce}; - enum CallbackTag tag_response_header_cb_{kCallBackTagOnce}; - enum CallbackTag tag_response_body_cb_{kCallBackTagOnce}; + enum CallbackTag tag_request_header_cb_{kCallBackTagOnlyOnce}; + enum CallbackTag tag_request_body_cb_{kCallbackTagNormal}; + enum CallbackTag tag_response_header_cb_{kCallBackTagOnlyOnce}; + enum CallbackTag tag_response_body_cb_{kCallbackTagNormal}; /* Drop tag */ bool need_to_drop_{false}; @@ -162,8 +186,9 @@ private: while (cb_tag != kCallbackTagIgnore) { cb(*this); - if (cb_tag == kCallBackTagOnce) cb_tag = kCallbackTagIgnore; - if (cb_tag == kCallbackTagRepeat) cb_tag = kCallBackTagOnce; + if (cb_tag == kCallbackTagNormal) break; + if (cb_tag == kCallBackTagOnlyOnce) cb_tag = kCallbackTagIgnore; + if (cb_tag == kCallbackTagRepeat) cb_tag = kCallBackTagOnlyOnce; } } }; @@ -215,6 +240,11 @@ public: /* Body的Stolen接口 */ virtual body_content_ptr_t StolenBody() = 0; + /* Bypass,标记本请求为直通 + * 当请求标记为直通时,转发数据,不再调用业务处理函数 */ + virtual bool Bypass() = 0; + virtual void Bypass(bool is_bypass) = 0; + /* ReadOnly,标记本请求为只读。 * 当一个请求为只读请求时,业务不应修改它的内容,底层处理Readonly的请求时,应直接转发不缓存 */ virtual bool ReadOnly() = 0; @@ -257,6 +287,7 @@ public: kStateReading, kStateComplete, kStateStream, + kStateCalled, kStateStolen }; @@ -291,6 +322,11 @@ public: virtual bool Forward() = 0; virtual void Forward(bool is_forward) = 0; + /* Bypass,标记本应答为直通 + * 当应答标记为直通时,转发数据,不再调用业务处理函数 */ + virtual bool Bypass() = 0; + virtual void Bypass(bool is_bypass) = 0; + virtual section_state_t SectionState(section_t section) = 0; /* 构建指令,根据Object构建对应的Memory */ virtual void Construct() = 0; diff --git a/src/http1.cc b/src/http1.cc index 6a06d15..7b31ae2 100644 --- a/src/http1.cc +++ b/src/http1.cc @@ -204,6 +204,11 @@ public: void Uri(const std::string & url) override { str_uri_ = url; } + bool Bypass() override + { return false; } + void Bypass(bool is_bypass) override + {} + private: /* Http Version */ short major_version_{1}; @@ -509,6 +514,12 @@ public: void Forward(bool is_forward) override { forward_ = is_forward; } + bool Bypass() override + { return bypass_; } + + void Bypass(bool is_bypass) override + { bypass_ = true; } + section_state_t SectionState(section_t section) override { return section_state[section]; } @@ -561,6 +572,7 @@ private: bool forward_{true}; bool readonly_{false}; + bool bypass_{false}; /* Tags */ section_state_t section_state[kSectionMax]{kStateBegin}; @@ -615,12 +627,15 @@ private: bool __is_body_content_empty() { return body_contents_.empty(); } + + void __drop_all_body_content() + { body_contents_.clear(); } }; class Http1ResponseParserCallbacks { public: - static const http_parser_settings * CallbackSettings(); + static http_parser_settings * CallbackSettings(); static int CallbackOnMessageBegin(http_parser * parser); static int CallbackOnHeaderField(http_parser * parser, const char * at, size_t length); static int CallbackOnHeaderValue(http_parser * parser, const char * at, size_t length); @@ -635,7 +650,7 @@ private: { return static_cast(parser->data); } }; -const http_parser_settings * Http1ResponseParserCallbacks::CallbackSettings() +http_parser_settings * Http1ResponseParserCallbacks::CallbackSettings() { static struct http_parser_settings __parser_setting = { @@ -758,24 +773,19 @@ ssize_t Http1Response::ConstructFromMemory(const char * buf, size_t buflen) { parser_->data = this; - /* 31 == HTTP_PAUSED, - * TODO: 替换这一硬编码 */ - if (parser_->http_errno == 31) - { - http_parser_pause(parser_.get(), 0); - } + /* 31 == HTTP_PAUSED, TODO: 替换这一硬编码 */ + if (parser_->http_errno == 31) http_parser_pause(parser_.get(), 0); + if (bypass_) parser_->flags |= F_SKIPBODY; size_t sz_parsed = http_parser_execute(parser_.get(), Http1ResponseParserCallbacks::CallbackSettings(), buf, buflen); /* 暂停状态,为了调用外部回调函数 */ if(sz_parsed && parser_->http_errno == 31) - { return sz_parsed; - } /* 解析错误 */ - else if (sz_parsed && parser_->http_errno > 0) + if (sz_parsed && parser_->http_errno > 0) { throw invalid_input_format(string_format("Failed at http parsing: errcode=%u, %s, %s", parser_->http_errno, http_errno_name(static_cast(parser_->http_errno)), @@ -867,11 +877,13 @@ evbuffer_unique_ptr_t Http1Response::StolenEvBuf() if (SectionState(kSectionBody) == kStateComplete) { __set_section_state(kSectionBody, kStateStolen); + __drop_all_body_content(); } if (SectionState(kSectionMessage) == kStateComplete) { __set_section_state(kSectionMessage, kStateStolen); + __drop_all_body_content(); } return std::move(evbuf_content_raw_); @@ -933,35 +945,46 @@ int Http1Connection::on_connection_read_request(pxy_conn_ctx_t * conn_ctx, pxy_c int Http1Connection::on_connection_read_response(pxy_conn_ctx_t * conn_ctx, pxy_conn_desc_t * conn_this, pxy_conn_desc_t * conn_other) { - auto * http_session = last_uncomplete_session(kDirectionResponse); - assert(http_session != nullptr); + auto * session = last_uncomplete_session(kDirectionResponse); + assert(session != nullptr); - auto & response = dynamic_cast(http_session->response()); + auto & response = dynamic_cast(session->response()); auto * downstream_evbuf = bufferevent_get_input(conn_this->bev); auto * upstream_evbuf = bufferevent_get_output(conn_other->bev); - ssize_t forward_len = response.ConstructFromEvBuf(downstream_evbuf); - if (forward_len < 0) + /* 检查Bypass和Drop标记,该标记可能由请求侧设置 + * 设置任何一个,即不调用上层业务函数,直接解析到消息结束 */ + if (session->NeedToBypass() || session->NeedToDrop()) { - LOG(DEBUG) << conn_ctx << "ResponseConstructFromEvBuf Failed, connection turns to passthrough"; - conn_ctx->passthrough = 1; - return 0; + response.Bypass(true); } - else if (forward_len == 0) + + ssize_t forward_len = response.ConstructFromEvBuf(downstream_evbuf); + if (forward_len <= 0) { return 0; } if (response.SectionState(response.kSectionHeader) == response.kStateComplete) { - LOG(DEBUG) << std::addressof(http_session) << "CallResponseHeaderCallback"; - http_session->CallResponseHeaderCallback(); + session->CallResponseHeaderCallback(); + } + + if (session->NeedToBypass() || session->NeedToDrop()) + { + response.Bypass(true); + goto __forward; } if (response.SectionState(response.kSectionBody) == response.kStateComplete) { - LOG(DEBUG) << std::addressof(http_session) << "CallResponseBodyCallback"; - http_session->CallResponseBodyCallback(); + session->CallResponseBodyCallback(); + } + + if (session->NeedToBypass() || session->NeedToDrop()) + { + response.Bypass(true); + goto __forward; } /* 如果有任何一部分为Reading,说明该部分数据还为到来,暂时不转发 */ @@ -971,9 +994,15 @@ int Http1Connection::on_connection_read_response(pxy_conn_ctx_t * conn_ctx, pxy_ return 0; } - /* 转发缓存中的数据,如果该Response处理结束,本Session处理完毕,销毁Session */ - evbuffer_add_buffer(upstream_evbuf, response.StolenEvBuf().release()); - LOG(DEBUG) << "ForwardData, upstream len = " << evbuffer_get_length(upstream_evbuf); +__forward: + if (session->NeedToDrop()) + { + auto __to_drop_evbuf = response.StolenEvBuf(); + } + else + { + evbuffer_add_buffer(upstream_evbuf, response.StolenEvBuf().release()); + } if (response.SectionState(response.kSectionMessage) == response.kStateComplete) { diff --git a/src/httpaction.cc b/src/httpaction.cc index a4dd07e..32941c1 100644 --- a/src/httpaction.cc +++ b/src/httpaction.cc @@ -189,17 +189,22 @@ public: void Construct(const std::string & str_service_define) override {} void OnRequestHeader(HttpSession * session) override - {} + { return __set_session_bypass(session); } void OnRequestBody(HttpSession * session) override - {} + { return __set_session_bypass(session); } void OnResponseHeader(HttpSession * session) override - {} + { return __set_session_bypass(session); } void OnResponseBody(HttpSession * session) override - {} + { return __set_session_bypass(session); } + void LoggerSetup(std::unique_ptr logger) override {} void LoggerClear() override {} + +private: + void __set_session_bypass(HttpSession * session) + { session->Bypass(); } }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -226,8 +231,12 @@ protected: private: unsigned int resp_code_{500}; - std::string resp_location_{""}; - std::string resp_content_{""}; + + bool is_resp_location_set{false}; + std::string resp_location_{}; + + bool is_resp_content_set{false}; + std::string resp_content_{}; void do_redirect_action(HttpSession * session); }; @@ -266,43 +275,65 @@ void HttpActionRedirect::Construct(const std::string & str_kv) if (__str_key == "url") { resp_location_ = __str_value; + is_resp_location_set = true; continue; } - assert(0); + if (__str_key == "content") + { + resp_content_ = __str_value; + is_resp_content_set = true; + continue; + } + + throw std::invalid_argument(string_format( + "Illegal redirect keyrules: %s", __str_key.c_str())); } } void HttpActionRedirect::do_redirect_action(HttpSession * session) { - auto & http_connection = session->connection(); /* 创建新的HttpResponse */ auto http_session = std::make_unique(http_connection); - http_session->request(HttpRequestFactory(1, 0)); - http_session->response(HttpResponseFactory(1, 0)); + + /* 根据当前的Http协议版本构造新的请求、应答 */ + auto request_version = session->request().Version(); + auto version_major = std::get<0>(request_version); + auto version_minor = std::get<1>(request_version); + + http_session->request(HttpRequestFactory(version_major, version_minor)); + http_session->response(HttpResponseFactory(version_major, version_minor)); /* 构建Redirect Response */ auto & http_response = http_session->response(); http_response.ResponseCode(resp_code_); - /* TODO: 正文 */ - http_response.Headers().Add("Location", resp_location_); - http_response.Construct(); + /* Location字段 */ + if (is_resp_location_set) + { + http_response.Headers().Add("Location", resp_location_); + } - /* 写新构建的HttpSession */ - http_connection.Write(std::move(http_session)); + /* 应答内容 */ + if (is_resp_content_set) + { + auto body_segment_ptr = std::make_unique(); + body_segment_ptr->insert(body_segment_ptr->end(), resp_content_.begin(), resp_content_.end()); + + auto body_segment_vec = std::vector(); + body_segment_vec.push_back(std::move(body_segment_ptr)); + + http_response.Body(std::move(body_segment_vec)); + } - /* 禁用后续的调用流程 */ - session->SetRequestHeaderTag(HttpSession::kCallbackTagIgnore); - session->SetRequestBodyTag(HttpSession::kCallbackTagIgnore); - session->SetResponseHeaderTag(HttpSession::kCallbackTagIgnore); - session->SetResponseBodyTag(HttpSession::kCallbackTagIgnore); + http_response.Construct(); + http_connection.Write(std::move(http_session)); /* 丢弃当前的HTTP Session */ - session->DropMe(); + session->Drop(); return; } @@ -364,7 +395,7 @@ private: void HttpActionDrop::do_drop_action(HttpSession * session) { - session->DropMe(); + session->Drop(); }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/httpscan.cc b/src/httpscan.cc index 5700669..c8915ad 100644 --- a/src/httpscan.cc +++ b/src/httpscan.cc @@ -292,6 +292,9 @@ HttpScanSession::scan_result_t HttpScanSession::scan_headers(const HttpHeaders & if (ret < 0) { + LOG(ERROR) << "Error in setting maat scan status:" + "field =" << field << "field.length =" << field.length(); + scan_result = scan_result_t::kScanResultError; return false; } -- cgit v1.2.3