diff options
| author | Lu <[email protected]> | 2018-06-05 11:04:48 +0800 |
|---|---|---|
| committer | Lu <[email protected]> | 2018-06-05 11:04:48 +0800 |
| commit | fe5d92850822671a2988048cdd0ed9e1bc619812 (patch) | |
| tree | d07012aff87546953ac3c61d9f37124b703bcb7e /src | |
| parent | dd3b63b00fd1e7a0dddad0ae4b647096292ba9c9 (diff) | |
初步完成http2的解析框架搭建工作。
Diffstat (limited to 'src')
| -rw-r--r-- | src/CMakeLists.txt | 19 | ||||
| -rw-r--r-- | src/cache.cc | 2 | ||||
| -rw-r--r-- | src/cachemgr.cc | 149 | ||||
| -rw-r--r-- | src/cfgparser.h | 29 | ||||
| -rw-r--r-- | src/compat.h | 15 | ||||
| -rw-r--r-- | src/dynbuf.cc | 4 | ||||
| -rw-r--r-- | src/http.h | 9 | ||||
| -rw-r--r-- | src/http1.cc | 142 | ||||
| -rw-r--r-- | src/http2.cc | 402 | ||||
| -rw-r--r-- | src/httpaction.cc | 9 | ||||
| -rw-r--r-- | src/httpscan.cc | 3 | ||||
| -rw-r--r-- | src/logger.cc | 151 | ||||
| -rw-r--r-- | src/logger.h | 64 | ||||
| -rw-r--r-- | src/main.cc | 3 | ||||
| -rw-r--r-- | src/nat.cc | 2 | ||||
| -rw-r--r-- | src/opts.cc | 2 | ||||
| -rw-r--r-- | src/privsep.cc | 3 | ||||
| -rw-r--r-- | src/proxy.cc | 6 | ||||
| -rw-r--r-- | src/pxyconn.cc | 2 | ||||
| -rw-r--r-- | src/pxyconn.h | 5 | ||||
| -rw-r--r-- | src/pxysslshut.cc | 2 | ||||
| -rw-r--r-- | src/pxythrmgr.cc | 4 | ||||
| -rw-r--r-- | src/ssl.cc | 2 | ||||
| -rw-r--r-- | src/sys.cc | 13 |
24 files changed, 833 insertions, 209 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 30e3fa6..e227370 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,16 +1,23 @@ find_package(OpenSSL REQUIRED) -add_library(tfe-library base64.cc build.cc cache.cc cachemgr.cc cachessess.cc compat.cc - cachedsess.cc cachetgcrt.cc cachefkcrt.cc cert.cc certstore.cc - dynbuf.cc logbuf.cc log.cc logger.cc nat.cc opts.cc - privsep.cc proxy.cc pxythrmgr.cc pxysslshut.cc - ssl.cc sys.cc thrqueue.cc url.cc util.cc httpscan.cc httpaction.cc http1.cc http.cc) +add_library(tfe-library base64.cc build.cc cache.cc cachemgr.cc cachessess.cc compat.cc easylogging++.cc + cachedsess.cc cachetgcrt.cc cachefkcrt.cc cert.cc certstore.cc logger.cc + dynbuf.cc nat.cc opts.cc privsep.cc proxy.cc pxythrmgr.cc pxysslshut.cc + ssl.cc sys.cc thrqueue.cc url.cc util.cc httpscan.cc httpaction.cc http2.cc http1.cc http.cc) #pxyconn.cc target_include_directories(tfe-library PRIVATE ${OPENSSL_INCLUDE_DIR}) target_link_libraries(tfe-library ${OPENSSL_LIBRARIES}) -target_link_libraries(tfe-library pthread libevent-static libevent-static-openssl libevent-static-pthreads http-parser-static MESA_prof_load-static) + +target_link_libraries(tfe-library + pthread libevent-static + libevent-static-openssl + libevent-static-pthreads + http-parser-static + nghttp2-static + MESA_prof_load-static) + target_link_libraries(tfe-library maatframe MESA_handle_logger) add_executable(tfe main.cc) diff --git a/src/cache.cc b/src/cache.cc index c47ec0b..8645bb5 100644 --- a/src/cache.cc +++ b/src/cache.cc @@ -27,8 +27,6 @@ */ #include "cache.h" - -#include "log.h" #include "khash.h" #include <pthread.h> diff --git a/src/cachemgr.cc b/src/cachemgr.cc index 58b4dc5..1bb318f 100644 --- a/src/cachemgr.cc +++ b/src/cachemgr.cc @@ -32,13 +32,14 @@ #include "cachetgcrt.h" #include "cachessess.h" #include "cachedsess.h" -#include "log.h" #include "attrib.h" +#include "util.h" #include <string.h> #include <pthread.h> #include <netinet/in.h> +#include <stdexcept> cache_t *cachemgr_fkcrt; cache_t *cachemgr_tgcrt; @@ -50,10 +51,10 @@ cache_t *cachemgr_dsess; * Calls the _gc() method on the cache passed as argument, then returns. */ static void * -cachemgr_gc_thread(UNUSED void * arg) +cachemgr_gc_thread(UNUSED void *arg) { - cache_gc((cache_t *)arg); - return NULL; + cache_gc((cache_t *) arg); + return NULL; } /* @@ -64,24 +65,24 @@ cachemgr_gc_thread(UNUSED void * arg) int cachemgr_preinit(void) { - if (!(cachemgr_fkcrt = cache_new(cachefkcrt_init_cb))) - goto out4; - if (!(cachemgr_tgcrt = cache_new(cachetgcrt_init_cb))) - goto out3; - if (!(cachemgr_ssess = cache_new(cachessess_init_cb))) - goto out2; - if (!(cachemgr_dsess = cache_new(cachedsess_init_cb))) - goto out1; - return 0; + if (!(cachemgr_fkcrt = cache_new(cachefkcrt_init_cb))) + goto out4; + if (!(cachemgr_tgcrt = cache_new(cachetgcrt_init_cb))) + goto out3; + if (!(cachemgr_ssess = cache_new(cachessess_init_cb))) + goto out2; + if (!(cachemgr_dsess = cache_new(cachedsess_init_cb))) + goto out1; + return 0; out1: - cache_free(cachemgr_ssess); + cache_free(cachemgr_ssess); out2: - cache_free(cachemgr_tgcrt); + cache_free(cachemgr_tgcrt); out3: - cache_free(cachemgr_fkcrt); + cache_free(cachemgr_fkcrt); out4: - return -1; + return -1; } /* @@ -91,15 +92,15 @@ out4: int cachemgr_init(void) { - if (cache_reinit(cachemgr_fkcrt)) - return -1; - if (cache_reinit(cachemgr_tgcrt)) - return -1; - if (cache_reinit(cachemgr_ssess)) - return -1; - if (cache_reinit(cachemgr_dsess)) - return -1; - return 0; + if (cache_reinit(cachemgr_fkcrt)) + return -1; + if (cache_reinit(cachemgr_tgcrt)) + return -1; + if (cache_reinit(cachemgr_ssess)) + return -1; + if (cache_reinit(cachemgr_dsess)) + return -1; + return 0; } /* @@ -110,10 +111,10 @@ cachemgr_init(void) void cachemgr_fini(void) { - cache_free(cachemgr_dsess); - cache_free(cachemgr_ssess); - cache_free(cachemgr_tgcrt); - cache_free(cachemgr_fkcrt); + cache_free(cachemgr_dsess); + cache_free(cachemgr_ssess); + cache_free(cachemgr_tgcrt); + cache_free(cachemgr_fkcrt); } /* @@ -122,48 +123,54 @@ cachemgr_fini(void) * This function returns after the cleanup completed and all threads are * joined. */ -void -cachemgr_gc(void) +void cachemgr_gc(void) { - pthread_t fkcrt_thr, dsess_thr, ssess_thr; - int rv; - - /* the tgcrt cache does not need cleanup */ - - rv = pthread_create(&fkcrt_thr, NULL, cachemgr_gc_thread, - cachemgr_fkcrt); - if (rv) { - log_err_printf("cachemgr_gc: pthread_create failed: %s\n", - strerror(rv)); - } - rv = pthread_create(&ssess_thr, NULL, cachemgr_gc_thread, - cachemgr_ssess); - if (rv) { - log_err_printf("cachemgr_gc: pthread_create failed: %s\n", - strerror(rv)); - } - rv = pthread_create(&dsess_thr, NULL, cachemgr_gc_thread, - cachemgr_dsess); - if (rv) { - log_err_printf("cachemgr_gc: pthread_create failed: %s\n", - strerror(rv)); - } - - rv = pthread_join(fkcrt_thr, NULL); - if (rv) { - log_err_printf("cachemgr_gc: pthread_join failed: %s\n", - strerror(rv)); - } - rv = pthread_join(ssess_thr, NULL); - if (rv) { - log_err_printf("cachemgr_gc: pthread_join failed: %s\n", - strerror(rv)); - } - rv = pthread_join(dsess_thr, NULL); - if (rv) { - log_err_printf("cachemgr_gc: pthread_join failed: %s\n", - strerror(rv)); - } + pthread_t fkcrt_thr, dsess_thr, ssess_thr; + int rv; + + /* the tgcrt cache does not need cleanup */ + + rv = pthread_create(&fkcrt_thr, NULL, cachemgr_gc_thread, + cachemgr_fkcrt); + + if (rv) + { + throw std::runtime_error(string_format("cachemgr_gc: pthread_create failed: %s", strerror(rv))); + } + + rv = pthread_create(&ssess_thr, NULL, cachemgr_gc_thread, + cachemgr_ssess); + + if (rv) + { + throw std::runtime_error(string_format("cachemgr_gc: pthread_create failed: %s", strerror(rv))); + } + + rv = pthread_create(&dsess_thr, NULL, cachemgr_gc_thread, cachemgr_dsess); + if (rv) + { + throw std::runtime_error(string_format("cachemgr_gc: pthread_create failed: %s", strerror(rv))); + } + + rv = pthread_join(fkcrt_thr, NULL); + if (rv) + { + throw std::runtime_error(string_format("cachemgr_gc: pthread_join failed: %s", strerror(rv))); + } + + rv = pthread_join(ssess_thr, NULL); + if (rv) + { + throw std::runtime_error(string_format("cachemgr_gc: pthread_join failed: %s", strerror(rv))); + } + + rv = pthread_join(dsess_thr, NULL); + if (rv) + { + throw std::runtime_error(string_format("cachemgr_gc: pthread_join failed: %s", strerror(rv))); + } + + return; } /* vim: set noet ft=c: */ diff --git a/src/cfgparser.h b/src/cfgparser.h index 10bd1cd..4853ef2 100644 --- a/src/cfgparser.h +++ b/src/cfgparser.h @@ -1,14 +1,14 @@ -// -// Created by luqiu on 2018-4-26. -// - -#ifndef TFE_CFGPARSER_H -#define TFE_CFGPARSER_H - -#ifndef TFE_STRING_MAX -#define TFE_STRING_MAX 2048 -#endif - +/* + * \brief TFE配置文件读取封装 + * + * 因MESA_Prof_load为C编写的库,采用了返回值的风格报告运行错误,比较麻烦。在这里,将MESA_Prof_Load封装为 + * C++接口,将返回值错误报告方法改为异常,便于调用者集中处理异常。 + * + * \author Lu Qiuwen<[email protected]> + * \date 2018-5-25 + */ + +#pragma once #include <stdexcept> #include <string> #include <cstring> @@ -33,7 +33,10 @@ public: template<typename T> T GetValueWithDefault(const std::string &str_section, const std::string &str_entry, const T &default_value); + const std::string & Source() { return str_cfgfile_; } + private: + static constexpr unsigned TFE_STRING_MAX = 2048; std::string str_cfgfile_; }; @@ -92,6 +95,4 @@ T TfeConfigParser::GetValueWithDefault( } return std::move(__value); -} - -#endif //TFE_CFGPARSER_H +}
\ No newline at end of file diff --git a/src/compat.h b/src/compat.h index 9615fc4..1fcf1e0 100644 --- a/src/compat.h +++ b/src/compat.h @@ -32,7 +32,6 @@ using evbuffer_unique_ptr_t = std::unique_ptr<struct evbuffer, EventDeleter>; using file_unique_ptr_t = std::unique_ptr<FILE, FileDescriptorDeleter>; using fd_unique_ptr_t = std::unique_ptr<int, FileDescriptorDeleter>; - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// /// Stream.h内的数据结构转换工具 //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -49,4 +48,16 @@ struct SappIpAddrDeleter }; using sapp_ip_addr_ptr_t = std::unique_ptr<struct ipaddr, SappIpAddrDeleter>; -sapp_ip_addr_ptr_t sockaddr_to_sapp_ipaddr(const struct sockaddr * sk_addr_src, const struct sockaddr * sk_addr_dst);
\ No newline at end of file +sapp_ip_addr_ptr_t sockaddr_to_sapp_ipaddr(const struct sockaddr * sk_addr_src, const struct sockaddr * sk_addr_dst); + + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +/// sslsplit logger to easylogger++ +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#include "easylogging++.h" +#include "util.h" + +#define log_dbg_printf(fmt, ...) do { LOG(DEBUG) << string_format(fmt, ##__VA_ARGS__); } while(0) +#define log_err_printf(fmt, ...) do { LOG(ERROR) << string_format(fmt, ##__VA_ARGS__); } while(0) +#define log_dbg_print_free(str) do { LOG(DEBUG) << std::string(str); free(str); } while(0)
\ No newline at end of file diff --git a/src/dynbuf.cc b/src/dynbuf.cc index 9ddaecb..a424a07 100644 --- a/src/dynbuf.cc +++ b/src/dynbuf.cc @@ -28,8 +28,8 @@ #include "dynbuf.h" -#include <string.h> -#include <stdio.h> +#include <cstring> +#include <cstdio> /* * Simple dynamic buffer, consisting of internal buffer ptr plus length. @@ -88,6 +88,15 @@ public: virtual void SetResponseBodyCallback(http_session_cb_t cb) { response_body_cb_ = cb; } + virtual void CallRequestHeaderCallback() + { request_header_cb_(*this);} + virtual void CallRequestBodyCallback() + { request_body_cb_(*this); } + virtual void CallResponseHeaderCallback() + { response_header_cb_(*this); } + virtual void CallResponseBodyCallback() + { response_body_cb_(*this); } + protected: HttpConnection &http_connection_; std::unique_ptr<HttpRequest> request_{nullptr}; diff --git a/src/http1.cc b/src/http1.cc index f02c02f..68934cb 100644 --- a/src/http1.cc +++ b/src/http1.cc @@ -25,7 +25,7 @@ #include "httpscan.h" #include "compat.h" #include "util.h" - +#include "easylogging++.h" class Http1Connection : public HttpConnection { @@ -33,6 +33,8 @@ public: Http1Connection() = default; ~Http1Connection() = default; + void Close() override { need_to_close_ = true;} ; + int on_connection_read_request(pxy_conn_ctx_t *conn_ctx, pxy_conn_desc_t *conn_this, pxy_conn_desc_t *conn_other); int on_connection_read_response(pxy_conn_ctx_t *conn_ctx, struct bufferevent *bev); int on_connection_close(pxy_conn_ctx_t *conn_ctx, struct bufferevent *bev); @@ -41,11 +43,12 @@ private: using http_sessions_t = std::list<std::unique_ptr<HttpSession>>; http_sessions_t http_sessions_; - HttpSession & create_new_session(); - HttpSession & last_uncomplete_session(); + HttpSession &create_new_session(); + HttpSession &last_uncomplete_session(); void drop_last_session(); void drop_first_session(); + bool need_to_close_{false}; }; class Http1Request : public HttpRequest @@ -55,17 +58,23 @@ public: virtual ~Http1Request() = default; virtual ssize_t ConstructFromMemory(const char *buf, size_t buflen); - virtual ssize_t ConstructFromEvBuf(struct evbuffer * evbuf_ptr); + virtual ssize_t ConstructFromEvBuf(struct evbuffer *evbuf_ptr); - virtual evbuffer_unique_ptr_t StolenEvbuf() { return std::move(evbuf_content_raw_); } + virtual evbuffer_unique_ptr_t StolenEvbuf() + { return std::move(evbuf_content_raw_); } - bool ReadOnly() override { return readonly_; } - void ReadOnly(bool is_readonly) override { readonly_ = is_readonly; } + bool ReadOnly() override + { return readonly_; } + void ReadOnly(bool is_readonly) override + { readonly_ = is_readonly; } - bool Forward() override { return forward_; } - void Forward(bool is_forward) override { forward_ = is_forward; } + bool Forward() override + { return forward_; } + void Forward(bool is_forward) override + { forward_ = is_forward; } virtual void ForEachHeader(for_each_header_cb_t cb); - bool Complete() override { return request_complete_; } + bool Complete() override + { return request_complete_; } private: std::string str_uri{}; @@ -136,17 +145,18 @@ ssize_t Http1Request::ConstructFromMemory(const char *buf, size_t buflen) /* 解析错误 */ if (sz_parsed && parser_->http_errno > 0) { - log_err_printf("Http Parser errno: %d", parser_->http_errno); - return -1; + throw invalid_input_format(string_format("Failed at http parsing: errcode=%u, %s, %s", + parser_->http_errno, http_errno_name(static_cast<http_errno>(parser_->http_errno)), + http_errno_description(static_cast<http_errno>(parser_->http_errno)))); } return sz_parsed; } -ssize_t Http1Request::ConstructFromEvBuf(struct evbuffer * evbuf_ptr) +ssize_t Http1Request::ConstructFromEvBuf(struct evbuffer *evbuf_ptr) { /* 展平输入的Buffer为线性空间 */ - auto * __data_ptr = (const char *)(evbuffer_pullup(evbuf_ptr, -1)); + auto *__data_ptr = (const char *) (evbuffer_pullup(evbuf_ptr, -1)); size_t __data_len = evbuffer_get_length(evbuf_ptr); /* 解析输入的Buffer */ @@ -154,7 +164,7 @@ ssize_t Http1Request::ConstructFromEvBuf(struct evbuffer * evbuf_ptr) if (forward_len < 0) return forward_len; /* 已解析的部分,从原Buffer中抽离出来 */ - struct evbuffer * reserved_buffer = evbuffer_new(); + struct evbuffer *reserved_buffer = evbuffer_new(); evbuffer_remove_buffer(evbuf_ptr, reserved_buffer, static_cast<size_t>(forward_len)); /* 保存在上下文中 */ @@ -164,10 +174,10 @@ ssize_t Http1Request::ConstructFromEvBuf(struct evbuffer * evbuf_ptr) void Http1Request::ForEachHeader(HttpRequest::for_each_header_cb_t cb) { - for(auto & __iterate : str_headers) + for (auto &__iterate : str_headers) { - const std::string & __key = __iterate.first; - const std::string & __value = __iterate.second; + const std::string &__key = __iterate.first; + const std::string &__value = __iterate.second; cb(__key, __value); } @@ -234,7 +244,7 @@ static const char *server_error_phrases[] = { /* 505 */ "HTTP Version not supported" }; -const char * __resp_code_to_str(int resp_code) +const char *__resp_code_to_str(int resp_code) { if (resp_code >= 101 && resp_code <= 102) return informational_phrases[resp_code - 101]; if (resp_code >= 200 && resp_code <= 206) return success_phrases[resp_code - 200]; @@ -251,16 +261,23 @@ public: Http1Response(); virtual ~Http1Response() = default; - virtual ssize_t ConstructFromMemory(const char * buf, size_t buflen); - virtual ssize_t ConstructFromEvBuf(struct evbuffer * evbuf_len); - evbuffer_unique_ptr_t StolenEvBuf() { return std::move(evbuf_content_raw_); } - - virtual int ResponseCode() final { return resp_code_; } - virtual bool ReadOnly() final { return readonly_; } - virtual void ReadOnly(bool is_readonly) final { readonly_ = is_readonly; } - virtual bool Forward() final {return forward_; } - virtual void Forward(bool is_forward) final { forward_ = is_forward; } - virtual bool Complete() final { return parse_complete_; } + virtual ssize_t ConstructFromMemory(const char *buf, size_t buflen); + virtual ssize_t ConstructFromEvBuf(struct evbuffer *evbuf_len); + evbuffer_unique_ptr_t StolenEvBuf() + { return std::move(evbuf_content_raw_); } + + virtual int ResponseCode() final + { return resp_code_; } + virtual bool ReadOnly() final + { return readonly_; } + virtual void ReadOnly(bool is_readonly) final + { readonly_ = is_readonly; } + virtual bool Forward() final + { return forward_; } + virtual void Forward(bool is_forward) final + { forward_ = is_forward; } + virtual bool Complete() final + { return parse_complete_; } virtual void Construct() final; private: @@ -291,7 +308,6 @@ ssize_t Http1Response::ConstructFromMemory(const char *buf, size_t buflen) __parser_setting.on_status = [](http_parser *parser, const char *at, size_t length) -> int { auto *__ptr_this = reinterpret_cast<Http1Response *>(parser->data); - __ptr_this->str_uri = std::string(at, at + length); return 0; }; @@ -309,7 +325,7 @@ ssize_t Http1Response::ConstructFromMemory(const char *buf, size_t buflen) return 0; }; - __parser_setting.on_message_complete = [](http_parser * parser)->int + __parser_setting.on_message_complete = [](http_parser *parser) -> int { auto *__ptr_this = reinterpret_cast<Http1Response *>(parser->data); __ptr_this->parse_complete_ = true; @@ -333,7 +349,7 @@ ssize_t Http1Response::ConstructFromMemory(const char *buf, size_t buflen) ssize_t Http1Response::ConstructFromEvBuf(struct evbuffer *evbuf_ptr) { /* 展平输入的Buffer为线性空间 */ - auto * __data_ptr = (const char *)(evbuffer_pullup(evbuf_ptr, -1)); + auto *__data_ptr = (const char *) (evbuffer_pullup(evbuf_ptr, -1)); size_t __data_len = evbuffer_get_length(evbuf_ptr); /* 解析输入的Buffer */ @@ -341,7 +357,7 @@ ssize_t Http1Response::ConstructFromEvBuf(struct evbuffer *evbuf_ptr) if (forward_len < 0) return forward_len; /* 已解析的部分,从原Buffer中抽离出来 */ - struct evbuffer * reserved_buffer = evbuffer_new(); + struct evbuffer *reserved_buffer = evbuffer_new(); evbuffer_remove_buffer(evbuf_ptr, reserved_buffer, static_cast<size_t>(forward_len)); /* 保存在上下文中 */ @@ -353,17 +369,17 @@ void Http1Response::Construct() { /* 新申请EvBuffer */ evbuffer_unique_ptr_t evbuf_construct = evbuffer_unique_ptr_t(evbuffer_new()); - struct evbuffer * evbuf_ptr = evbuf_construct.get(); + struct evbuffer *evbuf_ptr = evbuf_construct.get(); /* 应答第一行 */ evbuffer_add_printf(evbuf_ptr, "HTTP/%d.%d %d %s\r\n", resp_version_major_, resp_version_minor_, resp_code_, __resp_code_to_str(resp_code_)); /* 应答头部 */ - for(const auto & __iterate : str_headers) + for (const auto &__iterate : str_headers) { - const auto & __header_field = __iterate.first; - const auto & __header_value = __iterate.second; + const auto &__header_field = __iterate.first; + const auto &__header_value = __iterate.second; evbuffer_add_printf(evbuf_ptr, "%s:%s\r\n", __header_field.c_str(), __header_value.c_str()); } @@ -377,17 +393,18 @@ int Http1Connection::on_connection_close(pxy_conn_ctx_t *conn_ctx, struct buffer return 0; } -int Http1Connection::on_connection_read_request(pxy_conn_ctx_t *conn_ctx, - pxy_conn_desc_t *conn_this, pxy_conn_desc_t *conn_other) +int Http1Connection::on_connection_read_request( + pxy_conn_ctx_t *conn_ctx, + pxy_conn_desc_t *conn_this, pxy_conn_desc_t *conn_other) { /* Get the last session */ auto http_session_ctx = last_uncomplete_session(); - auto * request_ctx = reinterpret_cast<Http1Request *>(http_session_ctx.request()); + auto *request = reinterpret_cast<Http1Request *>(http_session_ctx.request()); auto *downstream_evbuf = bufferevent_get_input(conn_this->bev); auto *upstream_evbuf = bufferevent_get_output(conn_other->bev); - ssize_t forward_len = request_ctx->ConstructFromEvBuf(downstream_evbuf); + ssize_t forward_len = request->ConstructFromEvBuf(downstream_evbuf); if (forward_len < 0) { @@ -395,43 +412,30 @@ int Http1Connection::on_connection_read_request(pxy_conn_ctx_t *conn_ctx, return 0; } - if (request_ctx->request_complete) + if (need_to_close_) { - httpscan_ctx->ScanRequestHeader(http_session_ctx); - httpscan_ctx->ScanRequestBody(http_session_ctx); + conn_ctx->enomem = 1; + return 0; + } - if (http_session_ctx->need_to_close_connection) - { - conn_ctx->enomem = 1; - return 0; - } + if (request->Complete()) + { + http_session_ctx.CallRequestHeaderCallback(); + http_session_ctx.CallRequestBodyCallback(); /* 转发请求 */ - if (request_ctx->forward) + if (request->Forward()) { - evbuffer_add_buffer(upstream_evbuf, request_ctx->StolenRawEvbuf()); - return 0; - } - - /* 丢弃请求,检查是否构建了响应 */ - auto * response_ctx = reinterpret_cast<Http1Response *>(http_session_ctx->response()); - if (response_ctx != nullptr) - { - struct evbuffer * resp_evbuf = response_ctx->stolen_content_raw(); - bufferevent_write_buffer(conn_this->bev, resp_evbuf); - - printf("bufferevent_get_enabled, conn_this = %d\n", bufferevent_get_enabled(conn_this->bev)); - printf("bufferevent_get_enabled, conn_other = %d\n", bufferevent_get_enabled(conn_other->bev)); + auto stolen_raw_evbuf = request->StolenEvbuf(); + evbuffer_add_buffer(upstream_evbuf, stolen_raw_evbuf.release()); } - /* 最后一个Session已经处理完毕,丢弃上下文 */ - drop_last_session(); return 0; } - if (request_ctx->transport_forward) + if (request->ReadOnly()) { - evbuffer_add_buffer(upstream_evbuf, request_ctx->StolenRawEvbuf()); + evbuffer_add_buffer(upstream_evbuf, request->StolenEvbuf().release()); return 0; } @@ -443,7 +447,7 @@ int Http1Connection::on_connection_read_response(pxy_conn_ctx_t *conn_ctx, struc return 0; } -HttpSession & Http1Connection::create_new_session() +HttpSession &Http1Connection::create_new_session() { /* Create new session, set a new request */ auto __http_session = std::unique_ptr<HttpSession>(new HttpSession(*this)); @@ -454,7 +458,7 @@ HttpSession & Http1Connection::create_new_session() return *__http_session; } -HttpSession & Http1Connection::last_uncomplete_session() +HttpSession &Http1Connection::last_uncomplete_session() { if (http_sessions_.cbegin() == http_sessions_.cend()) return create_new_session(); diff --git a/src/http2.cc b/src/http2.cc index f882d92..2003826 100644 --- a/src/http2.cc +++ b/src/http2.cc @@ -1,4 +1,400 @@ -// -// Created by luqiu on 2018-5-28. -// +extern "C" +{ +#include <nghttp2/nghttp2.h> +#include <nghttp2/nghttp2ver.h> +#include <event2/bufferevent.h> +} + +#include <memory> +#include <cassert> +#include <nghttp2/nghttp2.h> +#include "easylogging++.h" +#include "util.h" +#include "http.h" + +struct nghttp2_deleter +{ + void operator()(nghttp2_session *session) + { nghttp2_session_del(session); } +}; + +using nghttp2_session_ptr_t = std::unique_ptr<nghttp2_session, nghttp2_deleter>; + +class Http2Connection : public HttpConnection +{ +public: + explicit Http2Connection(struct bufferevent *bev_upstream, struct bufferevent *bev_downstream); + virtual ~Http2Connection() = default; + +private: + /* Upstream Bufferevent */ + struct bufferevent *bev_upstream_; + /* Downstream Bufferevent */ + struct bufferevent *bev_downstream_; + + /* Callbacks can modify the internal variables */ + friend class h2_upstream; + friend class h2_downstream; +}; + +class Http2Request : public HttpRequest +{ +public: + void Serialize(std::vector<nghttp2_nv> &nv_array_output, nghttp2_data_provider *data_prd); + void ConstructContent(const char *content, size_t content_len, bool is_append); + +private: + using header_kv_t = std::pair<std::string, std::string>; + std::vector<std::unique_ptr<header_kv_t>> header_kv_store_{}; + std::map<std::string, header_kv_t &> header_kv_lookup_{}; + + using header_content_t = std::vector<char>; + header_content_t content_{}; +}; + +void Http2Request::Serialize(std::vector<nghttp2_nv> &nv_array_output, nghttp2_data_provider *data_prd) +{ + /* HTTP2头部序列化 */ + for (auto kv_iterater : header_kv_store_) + { + const auto &header_k = kv_iterater->first; + const auto &header_v = kv_iterater->second; + + nghttp2_nv __nv; + + /* 构建NV对,应用程序需保证Name, Value均为小写 */ + __nv.name = static_cast<uint8_t *>(header_k.c_str()); + __nv.value = static_cast<uint8_t *>(header_v.c_str()); + __nv.namelen = header_k.length(); + __nv.valuelen = header_v.length(); + + /* 设置不拷贝标志为,下层发送HTTP2数据时使用此处传入的指针,需保证在 + * 发送结束前内存不释放 */ + + __nv.flags = NGHTTP2_NV_FLAG_NO_COPY_NAME | NGHTTP2_NV_FLAG_NO_COPY_VALUE; + nv_array_output.push_back(__nv); + } + + /* Http2请求头部序列化,需要发送的内容位于content中,需保证本对象的生存周期长于发送回调函数 */ + data_prd->source.ptr = &content_; + data_prd->source.fd = 0; + + /* Http2序列化回调函数,在发送时调用 + * TODO: 多片发送,当要发送的内容超过一个frame的大小时,应将内容分片发送 */ + data_prd->read_callback = [](nghttp2_session *session, int32_t stream_id, uint8_t *buf, + size_t length, uint32_t *data_flags, nghttp2_data_source *source, void *user_data) -> ssize_t + { + (void) session; + (void) stream_id; + (void) user_data; + + /* length为最大发送长度,返回实际发送的长度,取实际发送长度为内容与缓冲区长度的最小值 */ + header_content_t *header_content_ptr = static_cast<header_content_t *>(source->ptr); + ssize_t sz_want_to_send = std::min(length, header_content_ptr->size()); + + /* 如果发送长度为0,表示没有内容需要发送了,标记本片为EOF并返回 */ + if (sz_want_to_send == 0) + { + *data_flags |= NGHTTP2_DATA_FLAG_EOF; + return 0; + } + + /* 复制到目的缓冲区,并删除缓冲区的已经发送的内容 */ + std::copy_n(header_content_ptr->cbegin(), sz_want_to_send, buf); + header_content_ptr->erase(header_content_ptr->cbegin(), header_content_ptr->cbegin() + sz_want_to_send); + + return sz_want_to_send; + }; + + return; +} + +void Http2Request::ConstructContent(const char *content, size_t content_len, bool is_append) +{ + if (is_append) + { + content_.resize(content_.size() + content_len); + std::copy_n(content, content_len, std::back_inserter(content_)); + } + else + { + content_.resize(content_len); + std::copy_n(content, content_len, content_.begin()); + } + + return; +} + +class Http2Response : public HttpResponse +{ +private: + using header_kv_t = std::pair<std::string, std::string>; + std::vector<std::unique_ptr<header_kv_t>> header_kv_store_{}; + std::map<std::string, header_kv_t &> header_kv_lookup_{}; +}; + +class Http2Session : public HttpSession +{ +public: + explicit Http2Session(Http2Connection &conn) : HttpSession(conn) {} + virtual Http2Session() = default; + + int32_t stream_id_upstream{0}; + int32_t stream_id_downstream{0}; + +private: + std::unique_ptr<Http2Request> request_{std::make_unique<Http2Request>()}; + std::unique_ptr<Http2Response> response_{std::make_unique<Http2Response>()}; +}; + +class h2_downstream +{ +public: + explicit h2_downstream(Http2Connection &conn); + virtual ~h2_downstream() = default; + +private: + nghttp2_session_ptr_t h2_session_ptr_; + Http2Connection &conn_; + + static ssize_t cb_on_send(nghttp2_session *session, const uint8_t *data, + size_t length, int flags, void *user_data); + + static int cb_on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame, + void *user_data); + + static int cb_on_stream_close(nghttp2_session *session, int32_t stream_id, + uint32_t error_code, void *user_data); + + static int cb_on_header_begin(nghttp2_session *session, const nghttp2_frame *frame, + void *user_data); + + static int cb_on_header(nghttp2_session *session, const nghttp2_frame *frame, + const uint8_t *name, size_t namelen, + const uint8_t *value, size_t valuelen, + uint8_t flags, void *user_data); + + static int cb_on_data_chunk_recv(nghttp2_session *session, uint8_t flags, int32_t stream_id, + const uint8_t *data, size_t len, void *user_data); +}; + +class h2_upstream +{ +public: + explicit h2_upstream(Http2Connection &conn); + virtual ~h2_upstream() = default; + + void SubmitFrame(Http2Session &session, const nghttp2_frame *frame); + void SubmitRequest(Http2Session &session, Http2Request &request); + +private: + nghttp2_session_ptr_t h2_session_ptr_; + Http2Connection &conn_; +}; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +/// Http2Connection实现,Http2Connection映射TCP连接,每个Http2Connection处理多个HttpSession +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +Http2Connection::Http2Connection(struct bufferevent *bev_upstream, struct bufferevent *bev_downstream) : + bev_upstream_(bev_upstream), bev_downstream_(bev_downstream_) +{} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +/// Http2 Downstream Callbacks +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +h2_downstream::h2_downstream(Http2Connection &conn) : conn_(conn) +{ + nghttp2_session_callbacks *callbacks; + nghttp2_session_callbacks_new(&callbacks); + + nghttp2_session_callbacks_set_send_callback(callbacks, cb_on_send); + nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, cb_on_frame_recv); + + nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks, cb_on_data_chunk_recv); + nghttp2_session_callbacks_set_on_stream_close_callback(callbacks, cb_on_stream_close); + + nghttp2_session_callbacks_set_on_begin_headers_callback(callbacks, cb_on_header_begin); + nghttp2_session_callbacks_set_on_header_callback(callbacks, cb_on_header); + + nghttp2_session *__tmp_session_ptr = nullptr; + nghttp2_session_server_new(&__tmp_session_ptr, callbacks, this); + nghttp2_session_callbacks_del(callbacks); + + h2_session_ptr_.reset(__tmp_session_ptr); +} + +/* Upstream、Downstream共用 */ +ssize_t h2_downstream::cb_on_send(nghttp2_session *session, const uint8_t *data, + size_t length, int flags, void *user_data) +{ + auto *this_ptr = dynamic_cast<h2_downstream *>(user_data); + struct bufferevent *h2_bev = this_ptr->conn_.bev_downstream_; + + if (bufferevent_write(h2_bev, data, length) < 0) + { + LOG(WARNING) << string_format("bufferevent_write(h2bev = %p, data = %p, length = %u) failed.", + h2_bev, data, length); + return 0; + } + + return length; +} + +/* Downstream实现 */ +int h2_downstream::cb_on_header_begin(nghttp2_session *session, const nghttp2_frame *frame, void *user_data) +{ + auto *this_ptr = dynamic_cast<h2_downstream *>(user_data); + auto h2_connection = this_ptr->conn_; + + /* 对于Downstream->Upstream的报文,在HTTP Request时建立新上下文 */ + if (frame->hd.type != NGHTTP2_HEADERS || frame->headers.cat != NGHTTP2_HCAT_REQUEST) + { + return 0; + } + + auto h2_session = new Http2Session(h2_connection); + nghttp2_session_set_stream_user_data(session, frame->hd.stream_id, h2_session); + + return 0; +} + +int h2_downstream::cb_on_header(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name, + size_t namelen, const uint8_t *value, size_t valuelen, uint8_t flags, void *user_data) +{ + auto *this_ptr = dynamic_cast<h2_downstream *>(user_data); + auto *h2_session = (Http2Session *) nghttp2_session_get_stream_user_data(session, frame->hd.stream_id); + + assert(this_ptr != nullptr); + assert(h2_session != nullptr); + + std::string str_header_name; + std::string str_header_value; + + std::copy_n(name, std::back_inserter(str_header_name), namelen); + std::copy_n(value, std::back_inserter(str_header_value), valuelen); + + if (frame->hd.type == NGHTTP2_HCAT_REQUEST) + { + auto request = h2_session->request(); + request.HeaderValue(str_header_name, str_header_value); + } + else if (frame->hd.type == NGHTTP2_HCAT_RESPONSE) + { + auto response = h2_session->response(); + response.HeaderValue(str_header_name, str_header_value); + } + else if (frame->hd.type == NGHTTP2_HCAT_PUSH_RESPONSE) + { + auto response = h2_session->response(); + response.HeaderValue(str_header_name, str_header_value); + } + else + { + //TODO: 不能识别的Http2Header类型,计数处理 + } + + return 0; +} + +int h2_downstream::cb_on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame, void *user_data) +{ + auto *h2_session = (Http2Session *) nghttp2_session_get_stream_user_data(session, frame->hd.stream_id); + + /* 对于Header、Data类型的帧,完成Request对象的构建后,调用业务层处理函数 */ + if ((frame->hd.flags & NGHTTP2_FLAG_END_HEADERS) && frame->hd.type == NGHTTP2_HEADERS) + { + h2_session->CallRequestBodyCallback(); + } + + if ((frame->hd.flags & NGHTTP2_FLAG_END_STREAM) && frame->hd.type == NGHTTP2_DATA) + { + h2_session->CallRequestBodyCallback(); + } + + return 0; +} + +int h2_downstream::cb_on_stream_close(nghttp2_session *session, int32_t stream_id, uint32_t error_code, + void *user_data) +{ + auto *h2_session = (Http2Session *) nghttp2_session_get_stream_user_data(session, stream_id); + delete h2_session; + + return 0; +} + +int h2_downstream::cb_on_data_chunk_recv(nghttp2_session *session, uint8_t flags, int32_t stream_id, + const uint8_t *data, size_t len, void *user_data) +{ + auto *h2_session = (Http2Session *) nghttp2_session_get_stream_user_data(session, stream_id); + + /* 构建Http请求内容,由于可能存在Content分为多个DATA帧传输的情况, + * 将每片数据采用Append的方式构建HttpRequest对象 */ + auto h2_request = dynamic_cast<Http2Request &>(h2_session->request()); + h2_request.ConstructContent(static_cast<const char *>(data), len, true); + + return 0; +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +/// Http2 Upstream Callbacks +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +h2_upstream::h2_upstream(Http2Connection &conn) : conn_(conn) +{ + nghttp2_session_callbacks *callbacks; + nghttp2_session_callbacks_new(&callbacks); + + nghttp2_session_callbacks_set_send_callback(callbacks, cb_on_send); + nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, cb_on_frame_recv); + + nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks, cb_on_data_chunk_recv); + nghttp2_session_callbacks_set_on_stream_close_callback(callbacks, cb_on_stream_close); + + nghttp2_session_callbacks_set_on_begin_headers_callback(callbacks, cb_on_header_begin); + nghttp2_session_callbacks_set_on_header_callback(callbacks, cb_on_header); + + nghttp2_session *__tmp_session_ptr = nullptr; + nghttp2_session_client_new(&__tmp_session_ptr, callbacks, this); + + h2_session_ptr_.reset(__tmp_session_ptr); + nghttp2_session_callbacks_del(callbacks); +} + +void h2_upstream::SubmitFrame(Http2Session &session, const nghttp2_frame *frame) +{ + auto h2_session = h2_session_ptr_.get(); + + /* Setting类型的Frame,透明转发 */ + if (frame->hd.type == NGHTTP2_SETTINGS) + { + nghttp2_submit_settings(h2_session, NGHTTP2_FLAG_NONE, frame->settings.iv, frame->settings.niv); + } + + if (frame->hd.type == NGHTTP2_GOAWAY) + { + nghttp2_submit_goaway(h2_session, NGHTTP2_FLAG_NONE, 0, frame->goaway.error_code, + frame->goaway.opaque_data, frame->goaway.opaque_data_len); + } + + /* 其他类型的封装,需要代理协议栈做出对应的动作,不能透转 */ + return; +} + +void h2_upstream::SubmitRequest(Http2Session &session, Http2Request &request) +{ + /* 序列化请求 */ + std::vector<nghttp2_nv> nv_headers; + nghttp2_data_provider data_provider; + request.Serialize(nv_headers, &data_provider); + + auto nv_array_ptr = static_cast<nghttp2_nv *>(nv_headers.data()); + auto nv_array_len = nv_headers.size(); + + int32_t stream_id = nghttp2_submit_request(h2_session_ptr_.get(), nullptr, + nv_array_ptr, nv_array_len, &data_provider, this); + + session.stream_id_upstream = stream_id; +}
\ No newline at end of file diff --git a/src/httpaction.cc b/src/httpaction.cc index d24a652..897d4a1 100644 --- a/src/httpaction.cc +++ b/src/httpaction.cc @@ -7,9 +7,16 @@ #include <regex> #include "httpaction.h" -#include "log.h" #include "http.h" #include "util.h" +#include "logger.h" + +/* 结构化日志Topic注册 */ +STRUCT_LOGGER_TOPIC_REGISTER(TFE_LOG_BYPASS_ACTION); +STRUCT_LOGGER_TOPIC_REGISTER(TFE_LOG_BLOCK_ACTION); +STRUCT_LOGGER_TOPIC_REGISTER(TFE_LOG_REDIRECT_ACTION); +STRUCT_LOGGER_TOPIC_REGISTER(TFE_LOG_EDIT_ACTION); +STRUCT_LOGGER_TOPIC_REGISTER(TFE_LOG_MONITOR_ACTION); //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// /// HTTP白名单,不进行任何处理 diff --git a/src/httpscan.cc b/src/httpscan.cc index f8683cc..8273b02 100644 --- a/src/httpscan.cc +++ b/src/httpscan.cc @@ -19,9 +19,8 @@ #include "pxyconn.h" #include "http.h" #include "compat.h" +#include "logger.h" -#pragma clang diagnostic push -#pragma ide diagnostic ignored "IncompatibleTypes" static int __maat_table_register_or_throw(Maat_feather_t feather, const char *str_table) { int table_id = Maat_table_register(feather, str_table); diff --git a/src/logger.cc b/src/logger.cc index ca12d6f..eba741d 100644 --- a/src/logger.cc +++ b/src/logger.cc @@ -1,5 +1,150 @@ -// -// Created by luqiu on 2018-5-25. -// +extern "C" +{ +#include <librdkafka/rdkafka.h> +} + +#include "easylogging++.h" #include "logger.h" + +static std::vector<std::string> g_structlogger_topic; + +void StructLogger::Log(const std::string &topic, Json::Value &struct_log) +{ + if (identify_tag_.length() != 0) + { + struct_log["str_machine_id"] = identify_tag_; + } + + std::string str_struct_log = struct_log.toStyledString(); + for (auto &__iterate : backends_) __iterate->WriteLog(topic, str_struct_log); +} + +class LoggerKafka : public StructLoggerBackend +{ +public: + LoggerKafka(const std::string &str_brokers); + virtual ~LoggerKafka() = default; + + void WriteLog(const std::string &str_topic, const std::string &str) override; + void RegisterTopic(const std::string &str_topic) override; + +private: + /* Kafka Handle Deleter,析构时自动删除Kafka的句柄 */ + struct kafka_handle_deleter_ + { + void operator()(rd_kafka_t *handle) + { rd_kafka_destroy(handle); } + void operator()(rd_kafka_topic_t *topic_handle) + { rd_kafka_topic_destroy(topic_handle); } + void operator()(rd_kafka_conf_t *conf_handle) + { rd_kafka_conf_destroy(conf_handle); } + }; + + using kafka_handle_ptr_t = std::unique_ptr<rd_kafka_t, kafka_handle_deleter_>; + using kafka_topic_ptr_t = std::unique_ptr<rd_kafka_topic_t, kafka_handle_deleter_>; + using kafka_conf_ptr_t = std::unique_ptr<rd_kafka_conf_t, kafka_handle_deleter_>; + + /* Topic句柄,根据Topic返回句柄指针 */ + std::map<std::string, kafka_topic_ptr_t> kafka_topic_handle_; + /* Global句柄 */ + kafka_handle_ptr_t kafka_handle_; +}; + +LoggerKafka::LoggerKafka(const std::string &str_brokers) +{ + char __errstr[2048] = {0}; + + /* 申请Kafka的Conf句柄 */ + auto kafka_conf_ptr = kafka_conf_ptr_t(rd_kafka_conf_new()); + + /* 设置Broker的地址 */ + int ret = rd_kafka_conf_set(kafka_conf_ptr.get(), "bootstrap.servers", str_brokers.c_str(), + __errstr, sizeof(__errstr)); + + if (ret != RD_KAFKA_CONF_OK) + { + throw std::runtime_error(string_format("Failed to setup bootstrap.servers for kafka : %s", + __errstr)); + } + + /* 申请Kafka句柄 */ + kafka_handle_ = decltype(kafka_handle_)(rd_kafka_new(RD_KAFKA_PRODUCER, + kafka_conf_ptr.release(), __errstr, sizeof(__errstr))); + + if (kafka_handle_ == nullptr) + { + throw std::runtime_error(string_format("Failed to create new kafka producer : %s", + __errstr)); + } + + LOG(INFO) << "LoggerKafka: Brokers = " << str_brokers; +} + +void LoggerKafka::RegisterTopic(const std::string &str_topic) +{ + auto __rkt_ptr = kafka_topic_ptr_t(rd_kafka_topic_new( + kafka_handle_.get(), str_topic.c_str(), nullptr)); + + if (__rkt_ptr == nullptr) + { + throw std::runtime_error(string_format("Failed to create kafka topic object, " + "kafka_handle = %p, topic = %s: %s", kafka_handle_.get(), + str_topic.c_str(), rd_kafka_err2str(static_cast<rd_kafka_resp_err_t>(errno)))); + } + + kafka_topic_handle_[str_topic] = std::move(__rkt_ptr); + + LOG(INFO) << "LoggerKafka: Topic = " << str_topic; +} + +void LoggerKafka::WriteLog(const std::string &str_topic, const std::string &str) +{ + auto *__topic_handle = kafka_topic_handle_[str_topic].get(); + + int ret = rd_kafka_produce(__topic_handle, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, + (void *)(str.c_str()), str.length(), nullptr, 0, nullptr); + + if (ret == -1) + { + throw std::runtime_error(string_format("Failed at enqueue logmsg (%zd bytes), topic = %s", + str.length(), str_topic.c_str())); + } + + LOG(DEBUG) << string_format("LoggerKafka: Topic = %s, LogContent = %s", + str_topic.c_str(), str.c_str()); + + return; +} + +std::unique_ptr<StructLogger> StructLoggerFactory(TfeConfigParser &cfg) +{ + std::unique_ptr<StructLogger> struct_logger_ptr; + struct_logger_ptr = std::make_unique<StructLogger>(); + + /* 读Kafka后端组件参数,初始化Kafka后端组件 */ + std::string str_broker_addrs = cfg.GetValueWithDefault<std::string>("logger", "kafkaBrokers", ""); + const auto &str_topics = StructLoggerTopicRegister::topic_; + + /* 定义了Kafka Brokers的参数,启用Kafka组件 */ + if (str_broker_addrs != "") + { + /* 创建Kafka Backend句柄 */ + std::unique_ptr<LoggerKafka> kafka_logger; + kafka_logger = std::make_unique<LoggerKafka>(str_broker_addrs); + + /* 注册Topic */ + for (const std::string &str_topic : str_topics) + { + kafka_logger->RegisterTopic(str_topic); + } + + struct_logger_ptr->LoggerBackend(std::move(kafka_logger)); + } else + { + LOG(WARNING) << string_format("No kafka brokers found in config(file = %s), " + "kafka backend will be ignored.", cfg.Source().c_str()); + } + + return std::move(struct_logger_ptr); +}
\ No newline at end of file diff --git a/src/logger.h b/src/logger.h index 0c07af8..2b8fa3e 100644 --- a/src/logger.h +++ b/src/logger.h @@ -1,16 +1,66 @@ -// -// Created by luqiu on 2018-5-25. -// +/* + * \brief TFE结构化日志管理器 + * + * 结构化日志用于发送业务日志,采用Json将日志结构化,通过Kafka发送至后端日志服务器分析。 + * + * \author Lu Qiuwen<[email protected]> + * \date 2018-5-25 + */ -#ifndef TFE_LOGGER_H -#define TFE_LOGGER_H +#pragma once +#include <memory> +#include <jsoncpp/json/json.h> +#include "cfgparser.h" -class logger { +class StructLoggerBackend +{ +public: + virtual void WriteLog(const std::string &str_topic, const std::string &str) = 0; + virtual void RegisterTopic(const std::string &str_topic) = 0; +}; + +class StructLogger +{ +public: + explicit StructLogger() = default; + virtual ~StructLogger() = default; + + /* 日志发送标志符,通常为处理机管理口IP地址 */ + void IdentifyTag(std::string str_identify_tag) + { identify_tag_ = str_identify_tag; } + const std::string &IdentifyTag() + { return identify_tag_; } + + /* 后端注册 */ + void LoggerBackend(std::unique_ptr<StructLoggerBackend> backend_ptr) + { backends_.push_back(std::move(backend_ptr)); } + + /* 日志记录 */ + void Log(const std::string &topic, Json::Value &struct_log); +private: + std::string identify_tag_{}; + std::vector<std::unique_ptr<StructLoggerBackend>> backends_{}; }; +/* 工厂函数,根据配置文件创建Logger */ +std::unique_ptr<StructLogger> StructLoggerFactory(TfeConfigParser & cfg); + +class StructLoggerTopicRegister +{ +public: + explicit StructLoggerTopicRegister(const char * topic) noexcept + { topic_.push_back(std::string(topic)); } + + ~StructLoggerTopicRegister() = default; + +private: + friend std::unique_ptr<StructLogger> StructLoggerFactory(TfeConfigParser & cfg); + static std::vector<std::string> topic_; +}; +#define STRUCT_LOGGER_TOPIC_REGISTER(x) \ +static StructLoggerTopicRegister ___struct_logger_topic_##x{#x}; -#endif //TFE_LOGGER_H diff --git a/src/main.cc b/src/main.cc index 9f4a510..65486af 100644 --- a/src/main.cc +++ b/src/main.cc @@ -33,7 +33,6 @@ #include "nat.h" #include "cachemgr.h" #include "sys.h" -#include "log.h" #include "build.h" #include "defaults.h" @@ -61,7 +60,7 @@ static void main_version(void) /* * Note to package maintainers: If you break the version * string in your build, it will be impossible to provide - * proper upstream support to the users of the package, + * proper kUpStream support to the users of the package, * because it will be difficult or impossible to identify * the exact codebase that is being used by the user * reporting a bug. The version string is provided through @@ -28,7 +28,7 @@ #include "nat.h" -#include "log.h" +#include "compat.h" #include "attrib.h" #include <stdlib.h> diff --git a/src/opts.cc b/src/opts.cc index d0e2f43..0d95feb 100644 --- a/src/opts.cc +++ b/src/opts.cc @@ -29,7 +29,7 @@ #include "opts.h" #include "sys.h" -#include "log.h" +#include "compat.h" #include <string> #include <vector> diff --git a/src/privsep.cc b/src/privsep.cc index 794e2e2..da98251 100644 --- a/src/privsep.cc +++ b/src/privsep.cc @@ -30,7 +30,7 @@ #include "sys.h" #include "util.h" -#include "log.h" +#include "compat.h" #include "attrib.h" #include "defaults.h" @@ -896,7 +896,6 @@ privsep_fork(tfe_config *opts, int clisock[], size_t nclisock) * we still call the preinit's before forking in order to provide * better user feedback and less privsep complexity */ nat_preinit_undo(); - log_preinit_undo(); /* If the child exits before the parent installs the signal handler * here, we have a race condition; this is solved by the client diff --git a/src/proxy.cc b/src/proxy.cc index bdd0dff..93bd621 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -32,7 +32,7 @@ #include "pxyconn.h" #include "cachemgr.h" #include "opts.h" -#include "log.h" +#include "compat.h" #include "attrib.h" #include "util.h" @@ -215,6 +215,8 @@ proxy_signal_cb(evutil_socket_t fd, UNUSED short what, void *arg) case SIGHUP: proxy_loopbreak(ctx); break; + +#if 0 case SIGUSR1: if (log_reopen() == -1) { log_err_printf("Warning: Failed to reopen logs\n"); @@ -222,6 +224,8 @@ proxy_signal_cb(evutil_socket_t fd, UNUSED short what, void *arg) log_dbg_printf("Reopened log files\n"); } break; +#endif + case SIGPIPE: log_err_printf("Warning: Received SIGPIPE; ignoring.\n"); break; diff --git a/src/pxyconn.cc b/src/pxyconn.cc index e5346c7..6833865 100644 --- a/src/pxyconn.cc +++ b/src/pxyconn.cc @@ -36,7 +36,7 @@ #include "util.h" #include "base64.h" #include "url.h" -#include "log.h" +#include "compat.h" #include "attrib.h" #include <netinet/in.h> diff --git a/src/pxyconn.h b/src/pxyconn.h index 9a64593..ca09a24 100644 --- a/src/pxyconn.h +++ b/src/pxyconn.h @@ -29,7 +29,7 @@ #ifndef PXYCONN_H #define PXYCONN_H -#include "log.h" +#include "compat.h" #include "opts.h" #include "attrib.h" #include "pxythrmgr.h" @@ -104,9 +104,6 @@ typedef struct pxy_conn_ctx { char *origcrtfpr; char *usedcrtfpr; - /* content log context */ - log_content_ctx_t *logctx; - /* store fd and fd event while connected is 0 */ evutil_socket_t fd; struct event *ev; diff --git a/src/pxysslshut.cc b/src/pxysslshut.cc index ecde031..7763a8e 100644 --- a/src/pxysslshut.cc +++ b/src/pxysslshut.cc @@ -28,7 +28,7 @@ #include "pxysslshut.h" -#include "log.h" +#include "compat.h" #include "attrib.h" #include "util.h" diff --git a/src/pxythrmgr.cc b/src/pxythrmgr.cc index 25aff4d..39d2086 100644 --- a/src/pxythrmgr.cc +++ b/src/pxythrmgr.cc @@ -29,8 +29,6 @@ #include "pxythrmgr.h" #include "sys.h" -#include "log.h" - #include <string.h> #include <pthread.h> @@ -39,6 +37,8 @@ #include <thread> #include <mutex> +#include "compat.h" + /* * Proxy thread manager: manages the connection handling worker threads * and the per-thread resources (i.e. event bases). The load is shared @@ -28,7 +28,7 @@ #include "ssl.h" -#include "log.h" +#include "compat.h" #include "defaults.h" #include <sys/types.h> @@ -27,8 +27,6 @@ */ #include "sys.h" - -#include "log.h" #include "defaults.h" #include <sys/types.h> @@ -48,17 +46,10 @@ #include <stdio.h> #include <string.h> #include <errno.h> - -#ifndef _SC_NPROCESSORS_ONLN -#include <sys/sysctl.h> -#endif /* !_SC_NPROCESSORS_ONLN */ - -#if HAVE_DARWIN_LIBPROC -#include <libproc.h> -#endif - #include <event2/util.h> +#include "compat.h" + /* * Permanently drop from root privileges to an unprivileged user account. * Sets the real, effective and stored user and group ID and the list of |
