summaryrefslogtreecommitdiff
path: root/platform/src/tcp_stream.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'platform/src/tcp_stream.cpp')
-rw-r--r--platform/src/tcp_stream.cpp140
1 files changed, 42 insertions, 98 deletions
diff --git a/platform/src/tcp_stream.cpp b/platform/src/tcp_stream.cpp
index 7df5883..1de3f79 100644
--- a/platform/src/tcp_stream.cpp
+++ b/platform/src/tcp_stream.cpp
@@ -91,6 +91,11 @@ static inline struct tfe_conn_private * __peer_conn(struct tfe_stream_private *
return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_upstream) : (_stream->conn_downstream));
}
+static inline struct tfe_conn_private *__steering_peer_conn(struct tfe_stream_private *_stream, enum tfe_conn_dir dir)
+{
+ return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_fake_c) : (_stream->conn_fake_s));
+}
+
static inline enum tfe_conn_dir __bev_dir(struct tfe_stream_private * _stream, struct bufferevent * bev)
{
if (_stream->conn_downstream && bev == _stream->conn_downstream->bev)
@@ -641,7 +646,6 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg)
{
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
enum tfe_conn_dir dir = __bev_dir(_stream, bev);
- struct tfe_conn_private * this_conn = NULL;
struct tfe_conn_private * peer_conn = NULL;
struct evbuffer * inbuf = NULL;
struct evbuffer * outbuf = NULL;
@@ -649,71 +653,12 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg)
if (_stream->is_decrypted_traffic_steering)
{
- if (bev == _stream->conn_downstream->bev)
- {
- peer_conn = _stream->conn_fake_c;
- }
- else if (bev == _stream->conn_upstream->bev)
- {
- peer_conn = _stream->conn_fake_s;
- }
- else
- {
- assert(0);
- }
-
- if (_stream->is_first_call_rxcb == 0)
- {
- TFE_PROXY_STAT_INCREASE(STAT_STREAM_INTERCEPT, 1);
- _stream->is_first_call_rxcb = 1;
- tfe_set_intercept_metric(&_stream->head, 1, 0, 0, 0, 0);
- }
-
- /*
- * Peer connection is terminated, drain all data.
- * This connection will be destoryed in __event_cb
- */
- inbuf = bufferevent_get_input(bev);
- inbuff_len = evbuffer_get_length(inbuf);
- if (peer_conn == NULL)
- {
- evbuffer_drain(inbuf, inbuff_len);
- return;
- }
-
- TFE_LOG_DEBUG(__STREAM_LOGGER(_stream), "decrypted traffic steering, %s send %d bytes from %s to %s",
- _stream->str_stream_addr,
- inbuff_len,
- bev == _stream->conn_downstream->bev ? "conn_downstream" : "conn_upstream",
- bev == _stream->conn_downstream->bev ? "conn_fake_c" : "conn_fake_s");
-
- outbuf = bufferevent_get_output(peer_conn->bev);
- evbuffer_add_buffer(outbuf, inbuf);
-
- if (bev == _stream->conn_downstream->bev)
- {
- TFE_PROXY_STAT_INCREASE(STAT_STEERING_CLIENT_TX_B, inbuff_len);
- // TODO: Delete the following code when support calling the tfe-plugin
- TFE_PROXY_STAT_INCREASE(STAT_STREAM_INCPT_DOWN_BYTES, inbuff_len);
- tfe_set_intercept_metric(&_stream->head, 0, 1, inbuff_len, 0, 0);
- _stream->downstream_rx_offset += inbuff_len;
- }
- else
- {
- TFE_PROXY_STAT_INCREASE(STAT_STEERING_SERVER_TX_B, inbuff_len);
- // TODO: Delete the following code when support calling the tfe-plugin
- TFE_PROXY_STAT_INCREASE(STAT_STREAM_INCPT_UP_BYTES, inbuff_len);
- tfe_set_intercept_metric(&_stream->head, 0, 0, 0, 1, inbuff_len);
- _stream->upstream_rx_offset += inbuff_len;
- }
-
- // TODO: Delete the following code when support calling the tfe-plugin
- TFE_PROXY_STAT_INCREASE(STAT_STREAM_INCPT_BYTES, inbuff_len);
- return;
+ peer_conn = __steering_peer_conn(_stream, dir);
+ }
+ else
+ {
+ peer_conn = __peer_conn(_stream, dir);
}
-
- this_conn = __this_conn(_stream, dir);
- peer_conn = __peer_conn(_stream, dir);
/* Peer connection is terminated, drain all data.
* This connection will be destoryed in __event_cb */
@@ -830,6 +775,23 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg)
/* Total Bytes */
TFE_PROXY_STAT_INCREASE(STAT_STREAM_INCPT_BYTES, rx_offset_increase);
+ if (_stream->is_decrypted_traffic_steering && action_final == ACTION_FORWARD_DATA)
+ {
+ TFE_LOG_DEBUG(__STREAM_LOGGER(_stream), "decrypted traffic steering, %s send %d bytes from %s to %s",
+ _stream->str_stream_addr, rx_offset_increase,
+ dir == CONN_DIR_DOWNSTREAM ? "conn_downstream" : "conn_upstream",
+ dir == CONN_DIR_DOWNSTREAM ? "conn_fake_c" : "conn_fake_s");
+
+ if (dir == CONN_DIR_DOWNSTREAM)
+ {
+ TFE_PROXY_STAT_INCREASE(STAT_STEERING_CLIENT_TX_B, rx_offset_increase);
+ }
+ else
+ {
+ TFE_PROXY_STAT_INCREASE(STAT_STEERING_SERVER_TX_B, rx_offset_increase);
+ }
+ }
+
if(_stream->need_to_be_kill)
{
const static struct linger sl{.l_onoff = 1, .l_linger = 0};
@@ -933,7 +895,6 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void *
const char * str_conn_dir = __str_dir(conn_dir);
enum tfe_conn_dir peer_conn_dir{};
size_t rx_offset = 0;
- int need_close_connection = 0;
if (__bev_dir(_stream, bev) == CONN_DIR_UPSTREAM)
{
@@ -1003,23 +964,6 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void *
__stream_log_event(_stream, EVENT_LOG_CLOSE_BY_FD_EOF, conn_dir, 0, NULL);
}
- need_close_connection = 1;
- }
-
- if (_stream->is_decrypted_traffic_steering)
- {
- TFE_LOG_DEBUG(__STREAM_LOGGER(_stream), "decrypted traffic steering, %s %s run eventcb, %s %s",
- _stream->str_stream_addr,
- bev == _stream->conn_downstream->bev ? "conn_downstream" : "conn_upstream",
- bev_event_to_string(events),
- errno_to_string(errno)
- );
- tfe_stream_destory(_stream);
- return;
- }
-
- if (need_close_connection)
- {
goto __close_connection;
}
@@ -1189,15 +1133,14 @@ static void __steering_stream_bev_writecb(struct bufferevent * bev, void * arg)
static void __steering_stream_bev_eventcb(struct bufferevent *bev, short events, void *arg)
{
struct tfe_stream_private *_stream = (struct tfe_stream_private *)arg;
+
+ TFE_LOG_ERROR(__STREAM_LOGGER(_stream), "decrypted traffic steering, %s %s run eventcb, %s %s",
+ _stream->str_stream_addr,
+ bev == _stream->conn_fake_c->bev ? "conn_fake_c" : "conn_fake_s",
+ bev_event_to_string(events),
+ errno_to_string(errno));
- TFE_LOG_DEBUG(__STREAM_LOGGER(_stream), "decrypted traffic steering, %s %s run eventcb, %s %s",
- _stream->str_stream_addr,
- bev == _stream->conn_fake_c->bev ? "conn_fake_c" : "conn_fake_s",
- bev_event_to_string(events),
- errno_to_string(errno)
- );
-
- enum tfe_conn_dir conn_dir = (bev == _stream->conn_fake_c->bev) ? CONN_DIR_UPSTREAM : CONN_DIR_DOWNSTREAM;
+ enum tfe_conn_dir conn_dir = (bev == _stream->conn_fake_c->bev) ? CONN_DIR_DOWNSTREAM : CONN_DIR_UPSTREAM;
if (events & BEV_EVENT_ERROR)
{
__stream_log_event(_stream, EVENT_LOG_CLOSE_BY_FACKFD_ERROR, conn_dir, errno, errno_to_string(errno));
@@ -1206,6 +1149,8 @@ static void __steering_stream_bev_eventcb(struct bufferevent *bev, short events,
{
__stream_log_event(_stream, EVENT_LOG_CLOSE_BY_FACKFD_EOF, conn_dir, errno, errno_to_string(errno));
}
+
+ call_plugin_close(_stream);
tfe_stream_destory(_stream);
}
@@ -1389,7 +1334,7 @@ void __stream_access_log_write(struct tfe_stream_private * stream)
break;
case EVENT_LOG_CLOSE_BY_FACKFD_EOF: /* FALLTHROUGH */
case EVENT_LOG_CLOSE_BY_FACKFD_ERROR: /* FALLTHROUGH */
- str_dir = ev_log->dir == CONN_DIR_DOWNSTREAM ? "SERVER" : "CLIENT";
+ str_dir = ev_log->dir == CONN_DIR_DOWNSTREAM ? "CLIENT" : "SERVER";
break;
default:
str_dir = "";
@@ -1421,11 +1366,11 @@ void __ev_log_to_stat_map_init()
ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_SSL_ERROR][CONN_DIR_DOWNSTREAM] = STAT_STREAM_CLS_DOWN_ERR;
ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_SSL_ERROR][CONN_DIR_UPSTREAM] = STAT_STREAM_CLS_UP_ERR;
- ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FACKFD_EOF][CONN_DIR_DOWNSTREAM] = STAT_STEERING_SERVER_EOF;
- ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FACKFD_EOF][CONN_DIR_UPSTREAM] = STAT_STEERING_CLIENT_EOF;
-
- ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FACKFD_ERROR][CONN_DIR_DOWNSTREAM] = STAT_STEERING_SERVER_ERR;
- ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FACKFD_ERROR][CONN_DIR_UPSTREAM] = STAT_STEERING_CLIENT_ERR;
+ ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FACKFD_EOF][CONN_DIR_DOWNSTREAM] = STAT_STEERING_CLIENT_EOF;
+ ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FACKFD_EOF][CONN_DIR_UPSTREAM] = STAT_STEERING_SERVER_EOF;
+
+ ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FACKFD_ERROR][CONN_DIR_DOWNSTREAM] = STAT_STEERING_CLIENT_ERR;
+ ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FACKFD_ERROR][CONN_DIR_UPSTREAM] = STAT_STEERING_SERVER_ERR;
}
void __stream_close_stat(struct tfe_stream_private * stream)
@@ -1886,7 +1831,6 @@ int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downst
TFE_PROXY_STAT_INCREASE(STAT_STREAM_TCP_SSL, 1);
}
-
return 0;
__errout: