From 1e71122521ec8bdb327d7e739b6870d4ea2c1e54 Mon Sep 17 00:00:00 2001 From: luwenpeng Date: Thu, 24 Oct 2024 10:24:20 +0800 Subject: feature: session mananger publish ctrl message; Enhance session debugger module --- decoders/lpi_plus/lpip_module.c | 2 +- include/stellar/packet.h | 2 +- include/stellar/session.h | 4 +- infra/packet_manager/packet_utils.c | 2 +- infra/session_manager/session_internal.h | 2 +- infra/session_manager/session_manager.c | 50 ++++++- infra/session_manager/session_manager_runtime.c | 1 + infra/version.map | 2 +- test/session_debugger/session_debugger.c | 183 ++++++++++++++++++++---- 9 files changed, 204 insertions(+), 44 deletions(-) diff --git a/decoders/lpi_plus/lpip_module.c b/decoders/lpi_plus/lpip_module.c index 9286a2e..d68190b 100644 --- a/decoders/lpi_plus/lpip_module.c +++ b/decoders/lpi_plus/lpip_module.c @@ -300,7 +300,7 @@ static void lpi_plus_on_session(struct session *sess, struct packet *pkt, void * if(exdata->ctx.detected_pkt_cnt>=env->max_pkts)return; uint16_t payload_len=packet_get_payload_len(pkt); - const char *payload=packet_get_payload(pkt); + const char *payload=packet_get_payload_data(pkt); if (payload!=NULL && payload_len>0)//detect packet with payload only { lpi_plus_context_update(sess, &exdata->ctx, payload, payload_len); diff --git a/include/stellar/packet.h b/include/stellar/packet.h index e910237..2e4b78e 100644 --- a/include/stellar/packet.h +++ b/include/stellar/packet.h @@ -182,7 +182,7 @@ const struct timeval *packet_get_timeval(const struct packet *pkt); const char *packet_get_raw_data(const struct packet *pkt); uint16_t packet_get_raw_len(const struct packet *pkt); -const char *packet_get_payload(const struct packet *pkt); +const char *packet_get_payload_data(const struct packet *pkt); uint16_t packet_get_payload_len(const struct packet *pkt); void packet_set_exdata(struct packet *pkt, int idx, void *ex_ptr); diff --git a/include/stellar/session.h b/include/stellar/session.h index 67c8c98..6e65a8e 100644 --- a/include/stellar/session.h +++ b/include/stellar/session.h @@ -71,8 +71,8 @@ enum session_stat STAT_INJECTED_BYTES_SUCCESS, // control packet - STAT_CONTROL_PACKETS_RECEIVED, // TODO - STAT_CONTROL_BYTES_RECEIVED, // TODO + STAT_CONTROL_PACKETS_RECEIVED, + STAT_CONTROL_BYTES_RECEIVED, STAT_CONTROL_PACKETS_TRANSMITTED, STAT_CONTROL_BYTES_TRANSMITTED, diff --git a/infra/packet_manager/packet_utils.c b/infra/packet_manager/packet_utils.c index 40c1b03..0240b78 100644 --- a/infra/packet_manager/packet_utils.c +++ b/infra/packet_manager/packet_utils.c @@ -850,7 +850,7 @@ uint16_t packet_get_raw_len(const struct packet *pkt) return pkt->data_len; } -const char *packet_get_payload(const struct packet *pkt) +const char *packet_get_payload_data(const struct packet *pkt) { if (pkt == NULL || pkt->layers_used == 0) { diff --git a/infra/session_manager/session_internal.h b/infra/session_manager/session_internal.h index b22e484..b117ba0 100644 --- a/infra/session_manager/session_internal.h +++ b/infra/session_manager/session_internal.h @@ -54,7 +54,7 @@ struct session UT_hash_handle hh1; UT_hash_handle hh2; UT_hash_handle hh3; - struct tuple6 tuple; + struct tuple6 tuple; // FLOW_TYPE_C2S tuple char tuple_str[TUPLE6_STR_SIZE]; struct sids sids[MAX_FLOW_TYPE]; struct route_ctx route_ctx[MAX_FLOW_TYPE]; diff --git a/infra/session_manager/session_manager.c b/infra/session_manager/session_manager.c index 64ebbc1..8469622 100644 --- a/infra/session_manager/session_manager.c +++ b/infra/session_manager/session_manager.c @@ -106,10 +106,16 @@ static void on_packet_forward(enum packet_stage stage, struct packet *pkt, void */ uint64_t now_ms = clock_get_real_time_ms(); + struct tuple6 key; struct tcp_segment *seg = NULL; struct session *sess = session_manager_runtime_lookup_session_by_packet(sess_mgr_rt, pkt); if (sess == NULL) { + if (packet_is_ctrl(pkt)) + { + goto fast_path; + } + sess = session_manager_runtime_new_session(sess_mgr_rt, pkt, now_ms); if (sess == NULL) { @@ -123,6 +129,11 @@ static void on_packet_forward(enum packet_stage stage, struct packet *pkt, void } else { + if (packet_is_ctrl(pkt)) + { + goto ctrl_path; + } + if (session_manager_runtime_update_session(sess_mgr_rt, sess, pkt, now_ms) == -1) { goto fast_path; @@ -133,6 +144,21 @@ static void on_packet_forward(enum packet_stage stage, struct packet *pkt, void } } +ctrl_path: + session_set_current_packet(sess, pkt); + packet_get_innermost_tuple6(pkt, &key); + if (tuple6_cmp(session_get_tuple6(sess), &key) == 0) + { + session_set_flow_type(sess, FLOW_TYPE_C2S); + } + else + { + session_set_flow_type(sess, FLOW_TYPE_S2C); + } + packet_set_exdata(pkt, sess_mgr->schema->pkt_exdata_idx, sess); + mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_ctrl_pkt, sess); + return; + slow_path: if (session_get_type(sess) == SESSION_TYPE_TCP) { @@ -146,11 +172,12 @@ slow_path: { mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_udp, sess); } - packet_set_exdata(pkt, sess_mgr->schema->pkt_exdata_idx, sess); + return; fast_path: packet_set_exdata(pkt, sess_mgr->schema->pkt_exdata_idx, NULL); + return; } static void on_packet_output(enum packet_stage stage, struct packet *pkt, void *args) @@ -163,18 +190,29 @@ static void on_packet_output(enum packet_stage stage, struct packet *pkt, void * struct session *sess = (struct session *)packet_get_exdata(pkt, sess_mgr->schema->pkt_exdata_idx); if (sess) { - enum flow_type type = session_get_flow_type(sess); + struct tuple6 key; + enum flow_type flow = FLOW_TYPE_NONE; + packet_get_innermost_tuple6(pkt, &key); + if (tuple6_cmp(session_get_tuple6(sess), &key) == 0) + { + flow = FLOW_TYPE_C2S; + } + else + { + flow = FLOW_TYPE_S2C; + } + int is_ctrl = packet_is_ctrl(pkt); uint16_t len = packet_get_raw_len(pkt); switch (packet_get_action(pkt)) { case PACKET_ACTION_DROP: - session_inc_stat(sess, type, (is_ctrl ? STAT_CONTROL_PACKETS_DROPPED : STAT_RAW_PACKETS_DROPPED), 1); - session_inc_stat(sess, type, (is_ctrl ? STAT_CONTROL_BYTES_DROPPED : STAT_RAW_BYTES_DROPPED), len); + session_inc_stat(sess, flow, (is_ctrl ? STAT_CONTROL_PACKETS_DROPPED : STAT_RAW_PACKETS_DROPPED), 1); + session_inc_stat(sess, flow, (is_ctrl ? STAT_CONTROL_BYTES_DROPPED : STAT_RAW_BYTES_DROPPED), len); break; case PACKET_ACTION_FORWARD: - session_inc_stat(sess, type, (is_ctrl ? STAT_CONTROL_PACKETS_TRANSMITTED : STAT_RAW_PACKETS_TRANSMITTED), 1); - session_inc_stat(sess, type, (is_ctrl ? STAT_CONTROL_BYTES_TRANSMITTED : STAT_RAW_BYTES_TRANSMITTED), len); + session_inc_stat(sess, flow, (is_ctrl ? STAT_CONTROL_PACKETS_TRANSMITTED : STAT_RAW_PACKETS_TRANSMITTED), 1); + session_inc_stat(sess, flow, (is_ctrl ? STAT_CONTROL_BYTES_TRANSMITTED : STAT_RAW_BYTES_TRANSMITTED), len); break; default: assert(0); diff --git a/infra/session_manager/session_manager_runtime.c b/infra/session_manager/session_manager_runtime.c index 1b06461..9c4f123 100644 --- a/infra/session_manager/session_manager_runtime.c +++ b/infra/session_manager/session_manager_runtime.c @@ -1102,6 +1102,7 @@ void session_manager_runtime_free_session(struct session_manager_runtime *sess_m session_set_current_state(sess, SESSION_STATE_INIT); session_set_current_packet(sess, NULL); session_set_flow_type(sess, FLOW_TYPE_NONE); + session_init(sess); session_pool_push(sess_mgr_rt->sess_pool, sess); sess = NULL; } diff --git a/infra/version.map b/infra/version.map index 57eb3d9..4895dd0 100644 --- a/infra/version.map +++ b/infra/version.map @@ -12,7 +12,7 @@ global: packet_get_action; packet_get_raw_data; packet_get_raw_len; - packet_get_payload; + packet_get_payload_data; packet_get_payload_len; packet_build_tcp; packet_build_udp; diff --git a/test/session_debugger/session_debugger.c b/test/session_debugger/session_debugger.c index 0af4081..48c028a 100644 --- a/test/session_debugger/session_debugger.c +++ b/test/session_debugger/session_debugger.c @@ -30,20 +30,41 @@ struct session_debugger_exdata struct session_debugger *dbg; struct session *sess; - uint64_t c2s_rx_pkts; - uint64_t s2c_rx_pkts; + // data packet + uint64_t c2s_rx_data_pkts; + uint64_t s2c_rx_data_pkts; - uint64_t c2s_rx_bytes; - uint64_t s2c_rx_bytes; + uint64_t c2s_rx_data_bytes; + uint64_t s2c_rx_data_bytes; + // control packet + uint64_t c2s_rx_ctrl_pkts; + uint64_t s2c_rx_ctrl_pkts; + + uint64_t c2s_rx_ctrl_bytes; + uint64_t s2c_rx_ctrl_bytes; + + // TCP segment uint64_t c2s_rx_tcp_seg; uint64_t s2c_rx_tcp_seg; uint64_t c2s_rx_tcp_bytes; uint64_t s2c_rx_tcp_bytes; + // UDP payload + uint64_t c2s_rx_udp_payload; + uint64_t s2c_rx_udp_payload; + + uint64_t c2s_rx_udp_bytes; + uint64_t s2c_rx_udp_bytes; + + // hexdump TCP segment int c2s_tcp_seg_hexdump_fd; int s2c_tcp_seg_hexdump_fd; + + // hexdump UDP payload + int c2s_udp_payload_hexdump_fd; + int s2c_udp_payload_hexdump_fd; }; static void session_debugger_log(int fd, const char *fmt, ...) @@ -88,14 +109,25 @@ static struct session_debugger_exdata *session_debugger_exdata_new(struct sessio if (session_get_type(sess) == SESSION_TYPE_TCP) { memset(buff, 0, sizeof(buff)); - sprintf(buff, "./log/session_debugger.%s_c2s.hexdump", session_get0_readable_addr(sess)); + sprintf(buff, "./log/session_debugger.TCP_%s_C2S.hexdump", session_get0_readable_addr(sess)); exdata->c2s_tcp_seg_hexdump_fd = open(buff, O_WRONLY | O_APPEND | O_CREAT, 0644); memset(buff, 0, sizeof(buff)); - sprintf(buff, "./log/session_debugger.%s_s2c.hexdump", session_get0_readable_addr(sess)); + sprintf(buff, "./log/session_debugger.TCP_%s_S2C.hexdump", session_get0_readable_addr(sess)); exdata->s2c_tcp_seg_hexdump_fd = open(buff, O_WRONLY | O_APPEND | O_CREAT, 0644); } + if (session_get_type(sess) == SESSION_TYPE_UDP) + { + memset(buff, 0, sizeof(buff)); + sprintf(buff, "./log/session_debugger.UDP_%s_C2S.hexdump", session_get0_readable_addr(sess)); + exdata->c2s_udp_payload_hexdump_fd = open(buff, O_WRONLY | O_APPEND | O_CREAT, 0644); + + memset(buff, 0, sizeof(buff)); + sprintf(buff, "./log/session_debugger.UDP_%s_S2C.hexdump", session_get0_readable_addr(sess)); + exdata->s2c_udp_payload_hexdump_fd = open(buff, O_WRONLY | O_APPEND | O_CREAT, 0644); + } + memset(buff, 0, sizeof(buff)); session_to_str(sess, 1, buff, sizeof(buff) - 1); session_debugger_log(dbg->fd, "sess new: %s", buff); @@ -115,6 +147,14 @@ static void session_debugger_exdata_free(struct session_debugger_exdata *exdata) { close(exdata->s2c_tcp_seg_hexdump_fd); } + if (exdata->c2s_udp_payload_hexdump_fd > 0) + { + close(exdata->c2s_udp_payload_hexdump_fd); + } + if (exdata->s2c_udp_payload_hexdump_fd > 0) + { + close(exdata->s2c_udp_payload_hexdump_fd); + } free(exdata); } @@ -128,7 +168,7 @@ static void session_debugger_exdata_free_callback(int idx, void *ex_ptr, void *a session_debugger_exdata_free((struct session_debugger_exdata *)ex_ptr); } -static void on_sess_free(struct session *sess, void *arg) +static void on_session_free(struct session *sess, void *arg) { struct session_debugger *dbg = (struct session_debugger *)arg; struct session_debugger_exdata *exdata = (struct session_debugger_exdata *)session_get_exdata(sess, dbg->sess_exdata_idx); @@ -136,23 +176,39 @@ static void on_sess_free(struct session *sess, void *arg) char buff[PATH_MAX] = {0}; session_to_str(exdata->sess, 0, buff, sizeof(buff) - 1); session_debugger_log(exdata->dbg->fd, "sess free: %s", buff); - session_debugger_log(exdata->dbg->fd, "session %lu %s stat:\n" - "C2S rx packets: %6lu, C2S rx bytes: %6lu\n" - "S2C rx packets: %6lu, S2C rx bytes: %6lu\n" - "C2S rx TCP segments: %6lu, C2S rx TCP bytes: %6lu\n" - "S2C rx TCP segments: %6lu, S2C rx TCP bytes: %6lu\n", - session_get_id(exdata->sess), session_get0_readable_addr(exdata->sess), - exdata->c2s_rx_pkts, exdata->c2s_rx_bytes, - exdata->s2c_rx_pkts, exdata->s2c_rx_bytes, - exdata->c2s_rx_tcp_seg, exdata->c2s_rx_tcp_bytes, - exdata->s2c_rx_tcp_seg, exdata->s2c_rx_tcp_bytes); + + snprintf(buff, sizeof(buff), + "==========================================================\n" + "C2S Data Packets : %6lu | C2S Data Bytes : %6lu\n" + "S2C Data Packets : %6lu | S2C Data Bytes : %6lu\n" + "----------------------------------------------------------\n" + "C2S Control Packets : %6lu | C2S Control Bytes : %6lu\n" + "S2C Control Packets : %6lu | S2C Control Bytes : %6lu\n" + "----------------------------------------------------------\n" + "C2S TCP Segments : %6lu | C2S TCP Bytes : %6lu\n" + "S2C TCP Segments : %6lu | S2C TCP Bytes : %6lu\n" + "----------------------------------------------------------\n" + "C2S UDP Payload : %6lu | C2S UDP Bytes : %6lu\n" + "S2C UDP Payload : %6lu | S2C UDP Bytes : %6lu\n", + exdata->c2s_rx_data_pkts, exdata->c2s_rx_data_bytes, + exdata->s2c_rx_data_pkts, exdata->s2c_rx_data_bytes, + exdata->c2s_rx_ctrl_pkts, exdata->c2s_rx_ctrl_bytes, + exdata->s2c_rx_ctrl_pkts, exdata->s2c_rx_ctrl_bytes, + exdata->c2s_rx_tcp_seg, exdata->c2s_rx_tcp_bytes, + exdata->s2c_rx_tcp_seg, exdata->s2c_rx_tcp_bytes, + exdata->c2s_rx_udp_payload, exdata->c2s_rx_udp_bytes, + exdata->s2c_rx_udp_payload, exdata->s2c_rx_udp_bytes); + session_debugger_log(exdata->dbg->fd, "session %lu %s statistics:\n%s", session_get_id(exdata->sess), session_get0_readable_addr(exdata->sess), buff); } -static void on_sess_packet(struct session *sess, struct packet *pkt, void *arg) +static void on_session_packet(struct session *sess, struct packet *pkt, void *arg) { struct session_debugger *dbg = (struct session_debugger *)arg; + int is_ctrl = packet_is_ctrl(pkt); char buff[PATH_MAX]; + enum flow_type flow = session_get_flow_type(sess); + assert(flow == FLOW_TYPE_C2S || flow == FLOW_TYPE_S2C); struct session_debugger_exdata *exdata = (struct session_debugger_exdata *)session_get_exdata(sess, dbg->sess_exdata_idx); if (exdata == NULL) { @@ -160,24 +216,40 @@ static void on_sess_packet(struct session *sess, struct packet *pkt, void *arg) session_set_exdata(sess, dbg->sess_exdata_idx, exdata); } - if (session_get_flow_type(sess) == FLOW_TYPE_C2S) + if (flow == FLOW_TYPE_C2S) { - exdata->c2s_rx_pkts++; - exdata->c2s_rx_bytes += packet_get_raw_len(pkt); + if (is_ctrl) + { + exdata->c2s_rx_ctrl_pkts++; + exdata->c2s_rx_ctrl_bytes += packet_get_raw_len(pkt); + } + else + { + exdata->c2s_rx_data_pkts++; + exdata->c2s_rx_data_bytes += packet_get_raw_len(pkt); + } } else { - exdata->s2c_rx_pkts++; - exdata->s2c_rx_bytes += packet_get_raw_len(pkt); + if (is_ctrl) + { + exdata->s2c_rx_ctrl_pkts++; + exdata->s2c_rx_ctrl_bytes += packet_get_raw_len(pkt); + } + else + { + exdata->s2c_rx_data_pkts++; + exdata->s2c_rx_data_bytes += packet_get_raw_len(pkt); + } } memset(buff, 0, sizeof(buff)); session_to_str(sess, 1, buff, sizeof(buff) - 1); - session_debugger_log(dbg->fd, "on %s msg: %s", session_type_to_str(session_get_type(sess)), buff); + session_debugger_log(dbg->fd, "on %s %s packet: %s", session_type_to_str(session_get_type(sess)), (is_ctrl ? "ctrl" : "data"), buff); memset(buff, 0, sizeof(buff)); packet_dump_str(pkt, buff, sizeof(buff) - 1); - session_debugger_log(dbg->fd, "rx %s packet\n%s", session_type_to_str(session_get_type(sess)), buff); + session_debugger_log(dbg->fd, "rx %s %s packet\n%s", session_type_to_str(session_get_type(sess)), (is_ctrl ? "ctrl" : "data"), buff); pthread_spin_lock(&dbg->lock); packet_dump_hex(pkt, dbg->fd); @@ -189,14 +261,17 @@ static void on_tcp_stream(struct session *sess, const char *tcp_payload, uint32_ struct session_debugger *dbg = (struct session_debugger *)arg; char buff[PATH_MAX]; + enum flow_type flow = session_get_flow_type(sess); + assert(flow == FLOW_TYPE_C2S || flow == FLOW_TYPE_S2C); struct session_debugger_exdata *exdata = (struct session_debugger_exdata *)session_get_exdata(sess, dbg->sess_exdata_idx); + assert(exdata); memset(buff, 0, sizeof(buff)); session_to_str(sess, 1, buff, sizeof(buff) - 1); - session_debugger_log(dbg->fd, "on TCP stream msg: %s", buff); + session_debugger_log(dbg->fd, "on TCP stream: %s", buff); pthread_spin_lock(&dbg->lock); - if (session_get_flow_type(sess) == FLOW_TYPE_C2S) + if (flow == FLOW_TYPE_C2S) { session_debugger_log(dbg->fd, "rx C2S TCP segment: len: %d, data: %p", tcp_payload_len, tcp_payload); hexdump_to_fd(dbg->fd, exdata->c2s_rx_tcp_bytes, tcp_payload, tcp_payload_len); @@ -217,6 +292,47 @@ static void on_tcp_stream(struct session *sess, const char *tcp_payload, uint32_ pthread_spin_unlock(&dbg->lock); } +static void on_udp_payload(struct session *sess, struct packet *pkt, void *arg) +{ + struct session_debugger *dbg = (struct session_debugger *)arg; + + char buff[PATH_MAX]; + enum flow_type flow = session_get_flow_type(sess); + assert(flow == FLOW_TYPE_C2S || flow == FLOW_TYPE_S2C); + struct session_debugger_exdata *exdata = (struct session_debugger_exdata *)session_get_exdata(sess, dbg->sess_exdata_idx); + assert(exdata); + + const char *udp_payload = packet_get_payload_data(pkt); + uint32_t udp_payload_len = packet_get_payload_len(pkt); + if (udp_payload_len == 0) + { + return; + } + + memset(buff, 0, sizeof(buff)); + session_to_str(sess, 1, buff, sizeof(buff) - 1); + session_debugger_log(dbg->fd, "on UDP payload: %s", buff); + + pthread_spin_lock(&dbg->lock); + if (flow == FLOW_TYPE_C2S) + { + session_debugger_log(dbg->fd, "rx C2S UDP payload: len: %d, data: %p", udp_payload_len, udp_payload); + hexdump_to_fd(dbg->fd, exdata->c2s_rx_udp_bytes, udp_payload, udp_payload_len); + hexdump_to_fd(exdata->c2s_udp_payload_hexdump_fd, exdata->c2s_rx_udp_bytes, udp_payload, udp_payload_len); + exdata->c2s_rx_udp_payload++; + exdata->c2s_rx_udp_bytes += udp_payload_len; + } + else + { + session_debugger_log(dbg->fd, "rx S2C UDP payload: len: %d, data: %p", udp_payload_len, udp_payload); + hexdump_to_fd(dbg->fd, exdata->s2c_rx_udp_bytes, udp_payload, udp_payload_len); + hexdump_to_fd(exdata->s2c_udp_payload_hexdump_fd, exdata->s2c_rx_udp_bytes, udp_payload, udp_payload_len); + exdata->s2c_rx_udp_payload++; + exdata->s2c_rx_udp_bytes += udp_payload_len; + } + pthread_spin_unlock(&dbg->lock); +} + static void session_debugger_free(struct session_debugger *dbg) { if (dbg) @@ -257,22 +373,22 @@ static struct session_debugger *session_debugger_new(struct session_manager *ses goto error_out; } - if (session_manager_subscribe_free(sess_mgr, on_sess_free, dbg) == -1) + if (session_manager_subscribe_free(sess_mgr, on_session_free, dbg) == -1) { session_debugger_log(STDERR_FILENO, "subscribe free failed\n"); goto error_out; } - if (session_manager_subscribe_tcp(sess_mgr, on_sess_packet, dbg) == -1) + if (session_manager_subscribe_tcp(sess_mgr, on_session_packet, dbg) == -1) { session_debugger_log(STDERR_FILENO, "subscribe tcp failed\n"); goto error_out; } - if (session_manager_subscribe_udp(sess_mgr, on_sess_packet, dbg) == -1) + if (session_manager_subscribe_udp(sess_mgr, on_session_packet, dbg) == -1) { session_debugger_log(STDERR_FILENO, "subscribe udp failed\n"); goto error_out; } - if (session_manager_subscribe_control_packet(sess_mgr, on_sess_packet, dbg) == -1) + if (session_manager_subscribe_control_packet(sess_mgr, on_session_packet, dbg) == -1) { session_debugger_log(STDERR_FILENO, "subscribe control packet failed\n"); goto error_out; @@ -282,6 +398,11 @@ static struct session_debugger *session_debugger_new(struct session_manager *ses session_debugger_log(STDERR_FILENO, "subscribe tcp stream failed\n"); goto error_out; } + if (session_manager_subscribe_udp(sess_mgr, on_udp_payload, dbg) == -1) + { + session_debugger_log(STDERR_FILENO, "subscribe udp failed\n"); + goto error_out; + } return dbg; -- cgit v1.2.3