/*
 * NOTE: the original include list was garbled during extraction. The system
 * and DPDK headers below are inferred from the APIs used in this file; the
 * project-local headers (sc_main, devmgr, mrbuf, pkt_parser, dp_trace,
 * distributer, node_manager, ...) could not be recovered and are omitted.
 */
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/queue.h>

#include <rte_cycles.h>
#include <rte_ethdev.h>
#include <rte_ether.h>
#include <rte_graph.h>
#include <rte_graph_worker.h>
#include <rte_lcore.h>
#include <rte_mbuf.h>
#include <rte_memcpy.h>
#include <rte_meter.h>
#include <rte_prefetch.h>

#include <mpack.h>
#include <cjson/cJSON.h>

struct dev_node_elem {
	TAILQ_ENTRY(dev_node_elem) entries;
	rte_node_t nid;
	struct dev_node_ctx ctx;
};

struct dev_node_main {
	struct sc_main * sc;
	TAILQ_HEAD(dev_node_elem_head, dev_node_elem) elems_head;
};

struct df_stat_counters {
	volatile uint64_t df_last_tsc;
	volatile uint64_t df_sum_Sj;
	volatile uint64_t df_VB_pre;
	volatile uint64_t df_VB_post;
	volatile uint64_t df_VB_max;
	volatile uint64_t df_VB_min;
};

struct phydev_stat_per_core {
	volatile uint64_t total_rx_pkts;
	volatile uint64_t total_tx_pkts;
	volatile uint64_t rx_pkts_per_batch;
	volatile uint64_t tx_pkts_per_batch;
	volatile uint32_t total_rx_drop_pkts;
	volatile uint32_t total_tx_drop_pkts;
	volatile uint32_t rx_zero_iterations;
	volatile uint32_t rx_non_zero_iterations;

	/*
	 * Delay Factor, per RFC 4445, Section 3.1
	 * (https://www.rfc-editor.org/rfc/rfc4445.html):
	 *   VB(i,pre)  = sum(Sj) - MR * Ti,  j = 1..i-1
	 *   VB(i,post) = VB(i,pre) + Si
	 */
	struct df_stat_counters rx_df;
	struct df_stat_counters tx_df;

	/* meter */
	volatile uint64_t tx_meter_red;
	volatile uint64_t tx_meter_yellow;
	volatile uint64_t tx_meter_green;
} __rte_cache_aligned;

static struct phydev_stat_per_core phydev_stat[RTE_MAX_LCORE];

static struct dev_node_main st_dev_node_main;
static struct dev_node_main * p_dev_node_main = &st_dev_node_main;

#define DF_MR_DEFAULT 50000

static_assert(sizeof(struct dev_node_ctx) <= RTE_NODE_CTX_SZ,
	      "dev_node_ctx size must not exceed RTE_NODE_CTX_SZ");
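/*
 * Wire format shared by mbuf_object_serialize_as_msgpack() and
 * mbuf_object_deserialize_from_msgpack(): a single msgpack map holding the
 * mbuf header fields ("port", "packet_type", "ol_flags", "data_len",
 * "pkt_len"), optionally the mrb_metadata fields plus the sid list
 * ("sids"/"head"/"tail"/"capacity"), optionally the "route_ctx" binary blob,
 * and finally the raw frame bytes under "packet". On the link the map is
 * carried behind a fake Ethernet header with ether_type 0x4d5a; see
 * mbufs_msgpack_encode()/mbufs_msgpack_decode() below.
 */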
int mbuf_object_deserialize_from_msgpack(struct sc_main * sc, struct rte_mbuf * mbuf,
					 const char * buf, unsigned int len)
{
	mpack_tree_t tree;
	mpack_tree_init_data(&tree, buf, len);
	mpack_tree_parse(&tree);

	/* unpack the packet */
	mpack_node_t packet = mpack_node_map_cstr(mpack_tree_root(&tree), "packet");
	rte_pktmbuf_append(mbuf, mpack_node_data_len(packet));
	rte_memcpy(rte_pktmbuf_mtod(mbuf, char *), mpack_node_data(packet),
		   mpack_node_data_len(packet));

	/* unpack the rte buff info */
	mbuf->packet_type = mpack_node_u32(mpack_node_map_cstr(mpack_tree_root(&tree), "packet_type"));
	mbuf->ol_flags = mpack_node_u64(mpack_node_map_cstr(mpack_tree_root(&tree), "ol_flags"));
	mbuf->data_len = mpack_node_u16(mpack_node_map_cstr(mpack_tree_root(&tree), "data_len"));
	mbuf->pkt_len = mpack_node_u32(mpack_node_map_cstr(mpack_tree_root(&tree), "pkt_len"));

	/* If metadata is enabled, the route metadata fields should be deserialized. */
	if (sc->en_mpack_metadata) {
		/* unpack the mrb_metadata */
		struct mrb_metadata * mrb_meta = mrbuf_cz_data(mbuf, MR_NODE_CTRLZONE_ID);
		mrb_meta->dir = mpack_node_u8(mpack_node_map_cstr(mpack_tree_root(&tree), "dir"));
		mrb_meta->packet_create_from_nf =
			mpack_node_u8(mpack_node_map_cstr(mpack_tree_root(&tree), "packet_create_from_nf"));
		mrb_meta->is_ctrlbuf = mpack_node_u8(mpack_node_map_cstr(mpack_tree_root(&tree), "is_ctrlbuf"));
		mrb_meta->adapter_type =
			mpack_node_u8(mpack_node_map_cstr(mpack_tree_root(&tree), "adapter_type"));
		mrb_meta->adapter_id =
			mpack_node_u16(mpack_node_map_cstr(mpack_tree_root(&tree), "adapter_id"));
		mrb_meta->payload_offset =
			mpack_node_u16(mpack_node_map_cstr(mpack_tree_root(&tree), "payload_offset"));
		mrb_meta->user_0 = mpack_node_u16(mpack_node_map_cstr(mpack_tree_root(&tree), "user_0"));
		mrb_meta->ef_link_id =
			mpack_node_u16(mpack_node_map_cstr(mpack_tree_root(&tree), "ef_link_id"));
		mrb_meta->traffic_link_id =
			mpack_node_u16(mpack_node_map_cstr(mpack_tree_root(&tree), "traffic_link_id"));
		mrb_meta->ef_peer_index =
			mpack_node_u16(mpack_node_map_cstr(mpack_tree_root(&tree), "ef_peer_index"));
		mrb_meta->port_ingress =
			mpack_node_u16(mpack_node_map_cstr(mpack_tree_root(&tree), "port_ingress"));
		mrb_meta->port_egress =
			mpack_node_u16(mpack_node_map_cstr(mpack_tree_root(&tree), "port_egress"));
		mrb_meta->session_id =
			mpack_node_u64(mpack_node_map_cstr(mpack_tree_root(&tree), "session_id"));

		/* unpack the current sid */
		mrb_meta->cur_sid = mpack_node_u16(mpack_node_map_cstr(mpack_tree_root(&tree), "cur_sid"));

		/* unpack the sid list */
		struct sid_list * sid_list = &mrb_meta->sid_list;
		mpack_node_t sids = mpack_node_map_cstr(mpack_tree_root(&tree), "sids");
		for (unsigned int i = 0; i < RTE_DIM(sid_list->sids); i++) {
			sid_list->sids[i] = mpack_node_u16(mpack_node_array_at(sids, i));
		}
		/* unpack the head sid */
		sid_list->head = mpack_node_u8(mpack_node_map_cstr(mpack_tree_root(&tree), "head"));
		/* unpack the tail sid */
		sid_list->tail = mpack_node_u8(mpack_node_map_cstr(mpack_tree_root(&tree), "tail"));
		/* unpack the sid capacity */
		sid_list->capacity = mpack_node_u8(mpack_node_map_cstr(mpack_tree_root(&tree), "capacity"));
	}
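	/*
	 * MPack's node API is error-propagating: once the tree enters an error
	 * state (missing key, type mismatch, truncated input), subsequent
	 * mpack_node_* reads return zero values, and the accumulated error is
	 * reported when the tree is destroyed at the end of this function.
	 */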
	/* If route context is enabled, the route context fields should be deserialized. */
	if (sc->en_mpack_route_ctx) {
		/* 128-byte scratch buffer; the serializer currently emits a
		 * 64-byte blob (see mbuf_object_serialize_as_msgpack below) */
		char route_ctx[128] = {};
		mpack_node_t route_ctx_node = mpack_node_map_cstr(mpack_tree_root(&tree), "route_ctx");
		rte_memcpy(route_ctx, mpack_node_data(route_ctx_node), mpack_node_data_len(route_ctx_node));
		buffer_metadata_set(mbuf, MR_BUFF_ROUTE_CTX, route_ctx, sizeof(route_ctx));
	}

	/* surface any parse error collected above */
	if (mpack_tree_destroy(&tree) != mpack_ok) {
		MR_ERROR("An error occurred decoding the data!\n");
		return -1;
	}
	return 0;
}

int mbuf_object_serialize_as_msgpack(struct sc_main * sc, struct rte_mbuf * mbuf, char ** data,
				     size_t * size)
{
	/* init mpack writer */
	mpack_writer_t writer;
	mpack_writer_init_growable(&writer, data, size);

	/* build the map */
	mpack_build_map(&writer);

	/* pack the rte buff info */
	mpack_write_cstr(&writer, "port");
	mpack_write_u16(&writer, mbuf->port);
	mpack_write_cstr(&writer, "packet_type");
	mpack_write_u32(&writer, mbuf->packet_type);
	mpack_write_cstr(&writer, "ol_flags");
	mpack_write_u64(&writer, mbuf->ol_flags);
	mpack_write_cstr(&writer, "data_len");
	mpack_write_u16(&writer, mbuf->data_len);
	mpack_write_cstr(&writer, "pkt_len");
	mpack_write_u32(&writer, mbuf->pkt_len);

	if (sc->en_mpack_metadata) {
		/* pack the mrb_metadata */
		struct mrb_metadata * mrb_meta = mrbuf_cz_data(mbuf, MR_NODE_CTRLZONE_ID);
		mpack_write_cstr(&writer, "dir");
		mpack_write_u8(&writer, mrb_meta->dir);
		mpack_write_cstr(&writer, "packet_create_from_nf");
		mpack_write_u8(&writer, mrb_meta->packet_create_from_nf);
		mpack_write_cstr(&writer, "is_ctrlbuf");
		mpack_write_u8(&writer, mrb_meta->is_ctrlbuf);
		mpack_write_cstr(&writer, "adapter_type");
		mpack_write_u8(&writer, mrb_meta->adapter_type);
		mpack_write_cstr(&writer, "adapter_id");
		mpack_write_u16(&writer, mrb_meta->adapter_id);
		mpack_write_cstr(&writer, "payload_offset");
		mpack_write_u16(&writer, mrb_meta->payload_offset);
		mpack_write_cstr(&writer, "user_0");
		mpack_write_u16(&writer, mrb_meta->user_0);
		mpack_write_cstr(&writer, "ef_link_id");
		mpack_write_u16(&writer, mrb_meta->ef_link_id);
		mpack_write_cstr(&writer, "traffic_link_id");
		mpack_write_u16(&writer, mrb_meta->traffic_link_id);
		mpack_write_cstr(&writer, "ef_peer_index");
		mpack_write_u16(&writer, mrb_meta->ef_peer_index);
		mpack_write_cstr(&writer, "port_ingress");
		mpack_write_u16(&writer, mrb_meta->port_ingress);
		mpack_write_cstr(&writer, "port_egress");
		mpack_write_u16(&writer, mrb_meta->port_egress);
		mpack_write_cstr(&writer, "session_id");
		mpack_write_u64(&writer, mrb_meta->session_id);

		/* pack the current sid */
		mpack_write_cstr(&writer, "cur_sid");
		mpack_write_u16(&writer, mrb_meta->cur_sid);

		/* pack the sids list */
		struct sid_list * sid_list = &mrb_meta->sid_list;
		mpack_write_cstr(&writer, "sids");
		mpack_start_array(&writer, RTE_DIM(sid_list->sids));
		for (unsigned int i = 0; i < RTE_DIM(sid_list->sids); i++) {
			mpack_write_u16(&writer, sid_list->sids[i]);
		}
		mpack_finish_array(&writer);

		/* pack the head sid */
		mpack_write_cstr(&writer, "head");
		mpack_write_u8(&writer, sid_list->head);
		/* pack the tail sid */
		mpack_write_cstr(&writer, "tail");
		mpack_write_u8(&writer, sid_list->tail);
		/* pack the sid capacity */
		mpack_write_cstr(&writer, "capacity");
		mpack_write_u8(&writer, sid_list->capacity);
	}

	if (sc->en_mpack_route_ctx) {
		char route_ctx[64] = {};
		buffer_metadata_get(mbuf, MR_BUFF_ROUTE_CTX, route_ctx, sizeof(route_ctx));
		mpack_write_cstr(&writer, "route_ctx");
		mpack_write_bin(&writer, route_ctx, sizeof(route_ctx));
	}

	/* pack the packet */
	mpack_write_cstr(&writer, "packet");
	mpack_write_bin(&writer, rte_pktmbuf_mtod(mbuf, const char *), rte_pktmbuf_data_len(mbuf));

	/* finish the map */
	mpack_complete_map(&writer);

	/* finish writing */
	if (mpack_writer_destroy(&writer) != mpack_ok) {
		MR_ERROR("An error occurred encoding the data!\n");
		return -1;
	}
	return 0;
}
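/*
 * Minimal round-trip sketch, kept disabled in the style of the other #if 0
 * blocks in this file: serialize an mbuf into a growable msgpack buffer and
 * rebuild a second mbuf from it. "pool" is a hypothetical mempool handle;
 * error handling is elided for brevity.
 */
#if 0
static void mbuf_msgpack_roundtrip_example(struct sc_main * sc, struct rte_mempool * pool,
					   struct rte_mbuf * src)
{
	char * data = NULL;
	size_t size = 0;

	if (mbuf_object_serialize_as_msgpack(sc, src, &data, &size) == 0) {
		struct rte_mbuf * dst = rte_pktmbuf_alloc(pool);
		if (dst != NULL) {
			mbuf_object_deserialize_from_msgpack(sc, dst, data, size);
			rte_pktmbuf_free(dst);
		}
		free(data); /* the growable writer buffer is malloc()ed by mpack */
	}
}
#endif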
static unsigned int mbufs_msgpack_encode(struct rte_mbuf ** mbufs, unsigned int nr_mbufs)
{
	struct sc_main * sc = sc_main_get();

	for (unsigned int i = 0; i < nr_mbufs; i++) {
		struct rte_mbuf * mbuf = mbufs[i];

		/* serialize the mbuf as msgpack */
		char * mpack_data;
		size_t mpack_size;
		mbuf_object_serialize_as_msgpack(sc, mbuf, &mpack_data, &mpack_size);

		/* reset the mbuf */
		rte_pktmbuf_reset(mbuf);

		/* construct a fake ethernet header with fake addresses */
		struct rte_ether_hdr * eth_hdr =
			(struct rte_ether_hdr *)rte_pktmbuf_append(mbuf, sizeof(struct rte_ether_hdr));
		eth_hdr->src_addr = (struct rte_ether_addr){{0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF}};
		eth_hdr->dst_addr = (struct rte_ether_addr){{0xFF, 0xEE, 0xDD, 0xCC, 0xBB, 0xAA}};
		eth_hdr->ether_type = rte_cpu_to_be_16(0x4d5a);

		/* append the msgpack after the ethernet header */
		char * mbuf_data = rte_pktmbuf_append(mbuf, mpack_size);
		if (unlikely(mbuf_data == NULL)) {
			/* set the mbuf to null and release it; skip the copy below,
			 * which would otherwise dereference a NULL destination */
			mbufs[i] = NULL;
			hook_rte_pktmbuf_free(mbuf);
			free(mpack_data);
			continue;
		}
		rte_memcpy(mbuf_data, mpack_data, mpack_size);
		free(mpack_data);
	}

	/* pack the mbuf array, skip the null pointers */
	unsigned int out_nr_mbufs = 0;
	for (unsigned int i = 0; i < nr_mbufs; i++) {
		if (mbufs[i] != NULL) {
			mbufs[out_nr_mbufs++] = mbufs[i];
		}
	}
	return out_nr_mbufs;
}

static unsigned int mbufs_msgpack_decode(struct rte_mbuf ** mbufs, unsigned int nr_mbufs)
{
	struct sc_main * sc = sc_main_get();

	for (unsigned int i = 0; i < nr_mbufs; i++) {
		struct rte_mbuf * mbuf = mbufs[i];
		rte_prefetch0(rte_pktmbuf_mtod(mbuf, void *));

		/* check the ethernet header type */
		struct rte_ether_hdr * eth_hdr = rte_pktmbuf_mtod(mbuf, struct rte_ether_hdr *);
		if (unlikely(eth_hdr->ether_type != rte_cpu_to_be_16(0x4d5a))) {
			/* set the mbuf to null and release it */
			hook_rte_pktmbuf_free(mbuf);
			mbufs[i] = NULL;
			continue;
		}

		/* skip the ethernet header */
		rte_pktmbuf_adj(mbuf, sizeof(struct rte_ether_hdr));

		/* deserialize the mbuf from mpack */
		char mbuf_content[RTE_ETHER_MAX_JUMBO_FRAME_LEN];
		unsigned int mbuf_len = mbuf->data_len;
		rte_memcpy(mbuf_content, rte_pktmbuf_mtod(mbuf, char *), mbuf->data_len);

		uint16_t port = mbuf->port;
		rte_pktmbuf_reset(mbuf);
		mbuf->port = port;
		mbuf_object_deserialize_from_msgpack(sc, mbuf, mbuf_content, mbuf_len);

		struct mrb_metadata * mrb_meta = mrbuf_cz_data(mbuf, MR_NODE_CTRLZONE_ID);
		/* the ingress port should not use the mpack info */
		mrb_meta->port_ingress = mbuf->port;
	}

	/* pack the mbuf array, skip the null pointers */
	unsigned int out_nr_mbufs = 0;
	for (unsigned int i = 0; i < nr_mbufs; i++) {
		if (mbufs[i] != NULL) {
			mbufs[out_nr_mbufs++] = mbufs[i];
		}
	}
	return out_nr_mbufs;
}
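/*
 * RFC 4445 Delay Factor, as used below: VB models a virtual buffer that each
 * burst fills with Si bytes and that drains at the nominal rate MR between
 * bursts. Worked example with assumed units (the code multiplies MR by the
 * elapsed microseconds, so DF_MR_DEFAULT = 50000 reads as bytes/us): a
 * 1500-byte burst arriving 10 us after the previous one first drains
 * 50000 * 10 bytes of backlog, then VB_pre gains 1500 and
 * VB_post = VB_pre + 1500. DF = (VB_max - VB_min) / MR is therefore a time
 * in microseconds. Note VB_pre is plain uint64_t arithmetic and wraps if the
 * drain exceeds the backlog; the max/min window is reset each time
 * df_calculate_get_result() is called.
 */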
void df_calculate_packet_input(struct df_stat_counters * df_counters, unsigned int MR,
			       struct rte_mbuf * mbufs[], unsigned int nr_mbufs)
{
	/* now tsc */
	uint64_t now_tsc = rte_get_timer_cycles();

	/* total bytes */
	uint64_t Sj = 0;
	for (unsigned int i = 0; i < nr_mbufs; i++) {
		Sj += mbufs[i]->pkt_len;
	}
	if (Sj == 0) {
		return;
	}

	/* calculate the delay factor */
	uint64_t delta_tsc = now_tsc - df_counters->df_last_tsc;
	uint64_t delta_tsc_in_us = delta_tsc / (rte_get_timer_hz() / US_PER_S);
	uint64_t VB_pre = df_counters->df_VB_pre + Sj - MR * delta_tsc_in_us;
	uint64_t VB_post = VB_pre + Sj;

	/* update the counters */
	df_counters->df_last_tsc = now_tsc;
	df_counters->df_sum_Sj += Sj;
	df_counters->df_VB_pre = VB_pre;
	df_counters->df_VB_post = VB_post;
	df_counters->df_VB_max = RTE_MAX(df_counters->df_VB_max, VB_post);
	df_counters->df_VB_min = RTE_MIN(df_counters->df_VB_min, VB_pre);
}

uint64_t df_calculate_get_result(struct df_stat_counters * df_counters, uint64_t MR)
{
	uint64_t df = (df_counters->df_VB_max - df_counters->df_VB_min) / MR;

	/* clear the counters */
	df_counters->df_VB_max = df_counters->df_VB_post;
	df_counters->df_VB_min = df_counters->df_VB_post;
	df_counters->df_VB_pre = 0;
	df_counters->df_VB_post = 0;
	df_counters->df_sum_Sj = 0;

	return df;
}

static __rte_always_inline uint16_t dpdk_msgpack_dev_rx_node_process(struct rte_graph * graph,
								     struct rte_node * node,
								     void ** objs, uint16_t cnt)
{
	struct dev_node_ctx * ctx = (struct dev_node_ctx *)node->ctx;
	assert(ctx->dev_desc->role_type == MR_DEV_ROLE_NF_INTERFACE);

	unsigned int nr_mbufs = rte_eth_rx_burst(ctx->dev_desc->port_id, graph->id,
						 (struct rte_mbuf **)node->objs, RTE_GRAPH_BURST_SIZE);
	if (nr_mbufs == 0) {
		return 0;
	}

	/* decode the mbufs from msgpack */
	nr_mbufs = mbufs_msgpack_decode((struct rte_mbuf **)node->objs, nr_mbufs);
	node->idx = nr_mbufs;

	struct pkt_parser_result * parser_results[RTE_GRAPH_BURST_SIZE];
	for (unsigned int i = 0; i < nr_mbufs; i++) {
		/* Parser Pkt */
		struct pkt_parser pkt_parser;
		struct rte_mbuf * mbuf = (struct rte_mbuf *)node->objs[i];
		struct mrb_metadata * mrb_meta = mrbuf_cz_data(mbuf, MR_NODE_CTRLZONE_ID);
		pkt_parser_init(&pkt_parser, &mrb_meta->pkt_parser_result, LAYER_TYPE_ALL,
				MR_PKT_PARSER_LAYERS_MAX);
		pkt_parser_exec(&pkt_parser, mbuf);
		parser_results[i] = &mrb_meta->pkt_parser_result;
	}

	/* for test */
	struct sc_main * sc = p_dev_node_main->sc;
	distributer_calculate_from_parser_results(sc->dist_object, (struct rte_mbuf **)node->objs,
						  parser_results, nr_mbufs);

	/* Msgpack does not currently support tracing. */
#if 0
	for (unsigned int i = 0; i < nr_mbufs; i++) {
		struct rte_mbuf * mbuf = (struct rte_mbuf *)node->objs[i];
		/* mark packets for trace */
		dp_trace_filter_exec(sc_main_get()->trace, mbuf, 0);
		/* Check if tracing is enabled for the current Mbuf */
		if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TRACE))) {
			gen_store_trace_info_rx(node, mbuf, ctx->dev_desc, graph->id);
		}
	}
#endif

	/* move to next node */
	rte_node_next_stream_move(graph, node, 0);
	return nr_mbufs;
}
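/*
 * The physical-device RX node below software-pipelines its loop: while
 * packet i is being parsed, packet i+1's mbuf struct, its first two cache
 * lines of packet data, and its control-zone metadata are prefetched, so the
 * memory latency overlaps the parser work. The first packet is prefetched
 * ahead of the loop.
 */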
static __rte_always_inline uint16_t dpdk_dev_rx_node_process(struct rte_graph * graph,
							     struct rte_node * node,
							     void ** objs, uint16_t cnt)
{
	struct dev_node_ctx * ctx = (struct dev_node_ctx *)node->ctx;
	struct sc_main * sc = p_dev_node_main->sc;
	assert(ctx != NULL && sc != NULL);
	RTE_SET_USED(objs);
	RTE_SET_USED(cnt);

	struct mr_dev_desc * dev_desc = ctx->dev_desc;
	struct mr_dev_desc_qid_map * qid_map = dev_desc->rx_qid_map;
	unsigned int core_id = rte_lcore_id();
	struct phydev_stat_per_core * stat_per_core = &phydev_stat[graph->id];

	/* check the core can do recv for this device or not */
	if (qid_map->qid_enabled[core_id] == 0) {
		return 0;
	}

	unsigned int qid = qid_map->qid_map[core_id];
	unsigned int nr_mbufs = rte_eth_rx_burst(dev_desc->port_id, qid,
						 (struct rte_mbuf **)node->objs, RTE_GRAPH_BURST_SIZE);
	if (nr_mbufs == 0) {
		stat_per_core->rx_zero_iterations++;
		return 0;
	} else {
		stat_per_core->rx_non_zero_iterations++;
		stat_per_core->total_rx_pkts += nr_mbufs;
		stat_per_core->rx_pkts_per_batch = nr_mbufs;
	}

#if MR_PHYDEV_ENABLE_DF_CALCULATE
	df_calculate_packet_input(&stat_per_core->rx_df, DF_MR_DEFAULT,
				  (struct rte_mbuf **)node->objs, nr_mbufs);
#endif

	node->idx = nr_mbufs;

	/* prefetch the packet header */
	rte_prefetch0(node->objs[0]);
	rte_prefetch0(rte_pktmbuf_mtod((struct rte_mbuf *)node->objs[0], void *));
	rte_prefetch0(rte_pktmbuf_mtod_offset((struct rte_mbuf *)node->objs[0], void *, 64));
	struct mrb_metadata * mrb_meta = mrbuf_cz_data(node->objs[0], MR_NODE_CTRLZONE_ID);
	rte_prefetch0((void *)(mrb_meta));
	rte_prefetch0((void *)(((char *)mrb_meta) + 64));

	struct pkt_parser_result * parser_results[RTE_GRAPH_BURST_SIZE];

	/* shared ctx */
	for (unsigned int i = 0, j = 1; i < node->idx; i++, j++) {
		if (j < node->idx) {
			rte_prefetch0(node->objs[j]);
			rte_prefetch0(rte_pktmbuf_mtod((struct rte_mbuf *)node->objs[j], void *));
			rte_prefetch0(rte_pktmbuf_mtod_offset((struct rte_mbuf *)node->objs[j], void *, 64));
			struct mrb_metadata * mrb_meta = mrbuf_cz_data(node->objs[j], MR_NODE_CTRLZONE_ID);
			rte_prefetch0((void *)(mrb_meta));
			rte_prefetch0((void *)(((char *)mrb_meta) + 64));
		}

		struct rte_mbuf * mbuf = (struct rte_mbuf *)node->objs[i];
		struct mrb_metadata * mrb_meta = mrbuf_cz_data(mbuf, MR_NODE_CTRLZONE_ID);
		mrb_metadata_clear(mrb_meta);

		/* Init the sid list */
		sid_list_init(&mrb_meta->sid_list);
		/* Set the port id */
		mrb_meta->port_ingress = ctx->dev_desc->port_id;
		mrb_meta->port_egress = UINT16_MAX;
		/* Init the traffic link id */
		mrb_meta->traffic_link_id = UINT16_MAX;

		/* Parser Pkt */
		struct pkt_parser pkt_parser;
		pkt_parser_init(&pkt_parser, &mrb_meta->pkt_parser_result, LAYER_TYPE_ALL,
				MR_PKT_PARSER_LAYERS_MAX);
		pkt_parser_exec(&pkt_parser, mbuf);
		parser_results[i] = &mrb_meta->pkt_parser_result;

#if 0
		/* Check if tracing is enabled for the current Mbuf */
		if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TRACE))) {
			gen_store_trace_info_rx(node, mbuf, dev_desc, qid);
			gen_store_trace_info_pkt_parser(node, mbuf);
		}
		if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TELEMETRY))) {
			gen_store_telemetry_info_rx(node, mbuf, dev_desc, qid);
		}
#endif
	}

	/* for test */
	distributer_calculate_from_parser_results(sc->dist_object, (struct rte_mbuf **)node->objs,
						  parser_results, node->idx);

#if 0
	for (unsigned int i = 0; i < node->idx; i++) {
		struct rte_mbuf * mbuf = (struct rte_mbuf *)node->objs[i];
		if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TRACE))) {
			gen_store_trace_info_rte_mbuf(node, mbuf);
		}
	}
#endif

	/* move to the next node */
	rte_node_next_stream_move(graph, node, 0);
	return nr_mbufs;
}

static __rte_always_inline uint16_t dpdk_dev_rx_periodical_process(struct rte_graph * graph,
								   struct rte_node * node,
								   void ** objs, uint16_t cnt)
{
	struct dev_node_ctx * ctx = (struct dev_node_ctx *)node->ctx;
	struct mr_dev_desc * dev_desc = ctx->dev_desc;
	struct mr_dev_desc_qid_map * qid_map = dev_desc->rx_qid_map;
	unsigned int core_id = rte_lcore_id();

	if (qid_map->qid_enabled[core_id] == 0) {
		return 0;
	}
	unsigned int qid = qid_map->qid_map[core_id];

	/* call eth_rx_burst at least every 100ms; divide by (hz / MS_PER_S) so
	 * the integer math keeps millisecond resolution (the original divided
	 * by hz first, truncating to whole seconds) */
	uint64_t tsc_in_ms = rte_get_timer_cycles() / (rte_get_timer_hz() / MS_PER_S);
	if (tsc_in_ms - ctx->last_rxtx_time_in_ms >= 100) {
		rte_eth_rx_burst(dev_desc->port_id, qid, NULL, 0);
		ctx->last_rxtx_time_in_ms = tsc_in_ms;
	}
	return 0;
}
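/*
 * TX side of the NF interface: packets leaving through a msgpack device are
 * re-encoded into the map format above before rte_eth_tx_burst(). Anything
 * the PMD does not accept in one burst is handed to the drop edge rather
 * than retried, unlike the physical-device TX path further down, which spins
 * in do_tx_burst() until the whole batch is sent.
 */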
static __rte_always_inline uint16_t dpdk_msgpack_dev_tx_node_process(struct rte_graph * graph,
								     struct rte_node * node,
								     void ** objs, uint16_t cnt)
{
	struct dev_node_ctx * ctx = (struct dev_node_ctx *)node->ctx;
	struct mr_dev_desc * dev_desc = ctx->dev_desc;
	assert(dev_desc->role_type == MR_DEV_ROLE_NF_INTERFACE);

	/* Msgpack does not currently support tracing. */
#if 0
	for (unsigned int i = 0; i < cnt; i++) {
		struct rte_mbuf * mbuf = (struct rte_mbuf *)node->objs[i];
		/* Check if tracing is enabled for the current Mbuf */
		if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TRACE))) {
			gen_store_trace_info_tx(node, mbuf, dev_desc, graph->id);
		}
		dp_trace_record_write(sc_main_get()->trace, mbuf);
	}
#endif

	/* as a nf interface, we need to encode the mbuf as mpack */
	cnt = mbufs_msgpack_encode((struct rte_mbuf **)objs, cnt);

	unsigned int tx_nr_mbufs = rte_eth_tx_burst(dev_desc->port_id, graph->id,
						    (struct rte_mbuf **)objs, cnt);
	if (unlikely(tx_nr_mbufs != cnt)) {
		/* redirect unsent packets to the drop node; this node has a single
		 * edge and "pkt_drop_trap" is edge 0 (the original moved to edge 1,
		 * which does not exist for this node) */
		rte_node_next_stream_move(graph, node, 0);
	}
	return tx_nr_mbufs;
}

#define MR_PHYDEV_RETRY_WHEN_TX_FAILED 1

static __rte_always_inline void do_tx_burst(struct rte_graph * graph, struct rte_node * node,
					    struct mr_dev_desc * dev_desc,
					    struct rte_mbuf ** mbufs_to_send,
					    unsigned int nr_mbufs_to_send)
{
	/* retry until the PMD accepts the whole batch */
	unsigned int tx_nr_mbufs = 0;
	while (tx_nr_mbufs < nr_mbufs_to_send) {
		struct rte_mbuf ** mbuf_ptr = mbufs_to_send + tx_nr_mbufs;
		unsigned int tx_cnt = nr_mbufs_to_send - tx_nr_mbufs;
		tx_nr_mbufs += rte_eth_tx_burst(dev_desc->port_id, graph->id, mbuf_ptr, tx_cnt);
	}
}
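/*
 * TX metering below uses DPDK's single-rate three-color marker
 * (rte_meter_srtcm) in color-blind mode: green packets are queued for the
 * burst as-is; a yellow packet flushes the pending burst and then stalls the
 * core for tx_meter_yellow_pkt_delay_us as crude back-pressure; red packets
 * are diverted to the drop edge. RTE_COLORS (the "no color" sentinel) is
 * treated like green.
 */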
static __rte_always_inline uint16_t dpdk_dev_tx_node_process(struct rte_graph * graph,
							     struct rte_node * node,
							     void ** objs, uint16_t cnt)
{
	struct dev_node_ctx * ctx = (struct dev_node_ctx *)node->ctx;
	struct mr_dev_desc * dev_desc = ctx->dev_desc;
	struct dpdk_dev * dpdk_dev_desc = dev_desc->dpdk_dev_desc;
	struct phydev_stat_per_core * stat_per_core = &phydev_stat[graph->id];

	for (unsigned int i = 0; i < cnt; i++) {
		struct rte_mbuf * mbuf = (struct rte_mbuf *)objs[i];
		/* Check if tracing is enabled for the current Mbuf */
		if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TRACE))) {
			gen_store_trace_info_tx(node, mbuf, dev_desc, graph->id);
			// gen_store_trace_info_rte_mbuf(node, mbuf);
		}
		if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TELEMETRY))) {
			gen_store_telemetry_info_tx(node, mbuf, dev_desc, graph->id);
		}
		dp_trace_record_write(sc_main_get()->trace, mbuf, rte_lcore_id());
	}

	/* do DF calculate */
#if MR_PHYDEV_ENABLE_DF_CALCULATE
	df_calculate_packet_input(&stat_per_core->tx_df, DF_MR_DEFAULT,
				  (struct rte_mbuf **)objs, cnt);
#endif

	uint16_t n_left_from = cnt;
	struct rte_mbuf ** pkts = (struct rte_mbuf **)objs;
	while (n_left_from > 0) {
		struct rte_mbuf * mbufs_to_send[RTE_GRAPH_BURST_SIZE];
		unsigned int nr_mbufs_to_send = 0;
		struct rte_mbuf * mbufs_to_drop[RTE_GRAPH_BURST_SIZE];
		unsigned int nr_mbufs_to_drop = 0;
		unsigned int counter_tx_meter_green = 0;
		unsigned int counter_tx_meter_yellow = 0;
		unsigned int counter_tx_meter_red = 0;

		uint16_t nr_curr_send = n_left_from > RTE_DIM(mbufs_to_send) ? RTE_DIM(mbufs_to_send)
									     : n_left_from;
		n_left_from -= nr_curr_send;

		if (dpdk_dev_desc->en_tx_meter) {
			struct rte_meter_srtcm * meter = dpdk_dev_desc->tx_meter[graph->id];
			struct rte_meter_srtcm_profile * profile = dpdk_dev_desc->tx_meter_profile[graph->id];
			for (unsigned int i = 0; i < nr_curr_send; i++) {
				struct rte_mbuf * mbuf = (struct rte_mbuf *)pkts[i];
				uint64_t tsc = rte_get_timer_cycles();
				enum rte_color color =
					rte_meter_srtcm_color_blind_check(meter, profile, tsc, mbuf->pkt_len);
				switch (color) {
				case RTE_COLORS:
				case RTE_COLOR_GREEN:
					mbufs_to_send[nr_mbufs_to_send++] = mbuf;
					counter_tx_meter_green++;
					break;
				case RTE_COLOR_YELLOW:
					mbufs_to_send[nr_mbufs_to_send++] = mbuf;
					do_tx_burst(graph, node, dev_desc, mbufs_to_send, nr_mbufs_to_send);
					nr_mbufs_to_send = 0;
					rte_delay_us_block(dpdk_dev_desc->tx_meter_yellow_pkt_delay_us);
					counter_tx_meter_yellow++;
					break;
				case RTE_COLOR_RED:
					/* count via the local counter only; the original also
					 * bumped stat_per_core->tx_meter_red here, double-counting
					 * red packets */
					counter_tx_meter_red++;
					mbufs_to_drop[nr_mbufs_to_drop++] = mbuf;
					break;
				}
			}
		} else {
			rte_memcpy(mbufs_to_send, pkts, nr_curr_send * sizeof(void *));
			nr_mbufs_to_send = nr_curr_send;
		}

		do_tx_burst(graph, node, dev_desc, mbufs_to_send, nr_mbufs_to_send);

		/* enqueue drop packet to next node */
		if (nr_mbufs_to_drop > 0) {
			rte_node_enqueue(graph, node, 0, (void **)mbufs_to_drop, nr_mbufs_to_drop);
		}

		/* stat; NB: mbufs flushed early on a yellow marking are not
		 * reflected in total_tx_pkts/tx_pkts_per_batch */
		stat_per_core->tx_meter_red += counter_tx_meter_red;
		stat_per_core->tx_meter_yellow += counter_tx_meter_yellow;
		stat_per_core->tx_meter_green += counter_tx_meter_green;
		stat_per_core->total_tx_pkts += nr_mbufs_to_send;
		stat_per_core->tx_pkts_per_batch = nr_mbufs_to_send;
		stat_per_core->total_tx_drop_pkts += nr_mbufs_to_drop;

		pkts += nr_curr_send;
	}
	return cnt;
}

static __rte_always_inline uint16_t dpdk_dev_tx_periodical_process(struct rte_graph * graph,
								   struct rte_node * node,
								   void ** objs, uint16_t cnt)
{
	struct dev_node_ctx * ctx = (struct dev_node_ctx *)node->ctx;
	struct mr_dev_desc * dev_desc = ctx->dev_desc;

	/* call the tx_burst at least every 100ms; same millisecond-resolution
	 * scaling as the rx periodical node */
	uint64_t tsc_in_ms = rte_get_timer_cycles() / (rte_get_timer_hz() / MS_PER_S);
	if (tsc_in_ms - ctx->last_rxtx_time_in_ms >= 100) {
		rte_eth_tx_burst(dev_desc->port_id, graph->id, NULL, 0);
		ctx->last_rxtx_time_in_ms = tsc_in_ms;
	}
	return 0;
}

static int dev_node_init(const struct rte_graph * graph, struct rte_node * node)
{
	struct dev_node_ctx * ctx = (struct dev_node_ctx *)node->ctx;
	struct dev_node_elem * elem_iter = NULL;

	TAILQ_FOREACH(elem_iter, &p_dev_node_main->elems_head, entries) {
		if (elem_iter->nid != node->id)
			continue;
		*ctx = elem_iter->ctx;
		break; /* node ids are unique; no need to keep scanning */
	}
	return 0;
}

static struct rte_node_register dpdk_msgpack_dev_rx_node_base = {
	.process = dpdk_msgpack_dev_rx_node_process,
	.flags = RTE_NODE_SOURCE_F,
	.name = "dpdk_msgpack_dev_rx_node",
	.init = dev_node_init,
	.nb_edges = 2,
	.next_nodes = {
		[0] = "eth_ingress",
		[1] = "pkt_drop_trap",
	},
};

static struct rte_node_register dpdk_msgpack_dev_tx_node_base = {
	.process = dpdk_msgpack_dev_tx_node_process,
	.name = "dpdk_msgpack_dev_tx_node",
	.init = dev_node_init,
	.nb_edges = 1,
	.next_nodes = {
		[0] = "pkt_drop_trap",
	},
};

static struct rte_node_register dpdk_dev_rx_node_base = {
	.process = dpdk_dev_rx_node_process,
	.flags = RTE_NODE_SOURCE_F,
	.name = "dpdk_dev_rx_node",
	.init = dev_node_init,
	.nb_edges = 2,
	.next_nodes = {
		[0] = "eth_ingress",
		[1] = "pkt_drop_trap",
	},
};

static struct rte_node_register dpdk_dev_tx_node_base = {
	.process = dpdk_dev_tx_node_process,
	.name = "dpdk_dev_tx_node",
	.init = dev_node_init,
	.nb_edges = 1,
	.next_nodes = {
		[0] = "pkt_drop_trap",
	},
};
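/*
 * Edge map for the device nodes: RX nodes fan out to "eth_ingress" (edge 0,
 * the normal path taken by rte_node_next_stream_move above) and
 * "pkt_drop_trap" (edge 1); TX and periodical nodes have the drop trap as
 * their only edge.
 */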
static struct rte_node_register dpdk_dev_rx_node_periodical = {
	.process = dpdk_dev_rx_periodical_process,
	.name = "dpdk_dev_rx_node_periodical",
	.init = dev_node_init,
	.flags = RTE_NODE_SOURCE_F,
	.nb_edges = 1,
	.next_nodes = {
		[0] = "pkt_drop_trap",
	},
};

static struct rte_node_register dpdk_dev_tx_node_periodical = {
	.process = dpdk_dev_tx_periodical_process,
	.name = "dpdk_dev_tx_node_periodical",
	.flags = RTE_NODE_SOURCE_F,
	.init = dev_node_init,
	.nb_edges = 1,
	.next_nodes = {
		[0] = "pkt_drop_trap",
	},
};

extern uint16_t shmdev_rx_node_process(struct rte_graph * graph, struct rte_node * node,
				       void ** objs, uint16_t cnt);
extern uint16_t shmdev_tx_node_process(struct rte_graph * graph, struct rte_node * node,
				       void ** objs, uint16_t cnt);

static struct rte_node_register shm_dev_rx_node_base = {
	.process = shmdev_rx_node_process,
	.flags = RTE_NODE_SOURCE_F,
	.name = "shm_dev_rx_node",
	.init = dev_node_init,
	.nb_edges = 2,
	.next_nodes = {
		[0] = "eth_ingress",
		[1] = "pkt_drop_trap",
	},
};

static struct rte_node_register shm_dev_tx_node_base = {
	.process = shmdev_tx_node_process,
	.name = "shm_dev_tx_node",
	.init = dev_node_init,
	.nb_edges = 1,
	.next_nodes = {
		[0] = "pkt_drop_trap",
	},
};

RTE_NODE_REGISTER(dpdk_dev_rx_node_base);
RTE_NODE_REGISTER(dpdk_dev_tx_node_base);
RTE_NODE_REGISTER(dpdk_dev_rx_node_periodical);
RTE_NODE_REGISTER(dpdk_dev_tx_node_periodical);
RTE_NODE_REGISTER(dpdk_msgpack_dev_rx_node_base);
RTE_NODE_REGISTER(dpdk_msgpack_dev_tx_node_base);
RTE_NODE_REGISTER(shm_dev_rx_node_base);
RTE_NODE_REGISTER(shm_dev_tx_node_base);
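/*
 * The registers above are templates: node_manager_dev_init() below clones
 * one RX and one TX node per device with rte_node_clone(), so each clone
 * gets its own per-node ctx (bound to its mr_dev_desc via dev_node_init)
 * while sharing the template's process function and edges. rte_node_clone()
 * names the clone "<template>-<dev symbol>", which is what the "-*" patterns
 * passed to node_setup_desc_add_for_all_workers() match.
 */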
int node_manager_dev_init(struct sc_main * sc, struct node_manager_main * node_mgr_main)
{
	TAILQ_INIT(&p_dev_node_main->elems_head);
	/* reference to sc_main */
	p_dev_node_main->sc = sc;

	unsigned int dev_desc_iterator = 0;
	struct mr_dev_desc * dev_desc = NULL;
	unsigned int nr_dpdk_devs = 0;
	unsigned int nr_dpdk_msgpack_devs = 0;
	unsigned int nr_shm_devs = 0;
	unsigned int nr_periodical_nodes = 0;

	while ((dev_desc = mr_dev_desc_iterate(sc->devmgr_main, &dev_desc_iterator)) != NULL) {
		/* the bond's slave should not be rx/tx by itself */
		if (dev_desc->is_bond_slave)
			continue;

		struct dev_node_elem * rx_node_elem = ZMALLOC(sizeof(struct dev_node_elem));
		MR_VERIFY_MALLOC(rx_node_elem);

		struct rte_node_register * rx_node_base = NULL;
		struct rte_node_register * tx_node_base = NULL;

		/* DPDK devices */
		if (dev_desc->dpdk_dev_desc != NULL) {
			switch (dev_desc->encode_type) {
			case MR_DEV_ENCODE_TYPE_NONE:
				rx_node_base = &dpdk_dev_rx_node_base;
				tx_node_base = &dpdk_dev_tx_node_base;
				nr_dpdk_devs++;
				break;
			case MR_DEV_ENCODE_TYPE_MSGPACK:
				rx_node_base = &dpdk_msgpack_dev_rx_node_base;
				tx_node_base = &dpdk_msgpack_dev_tx_node_base;
				nr_dpdk_msgpack_devs++;
				break;
			default:
				assert(0);
			}
		}
		/* shm devices */
		else if (dev_desc->shm_dev_desc != NULL) {
			rx_node_base = &shm_dev_rx_node_base;
			tx_node_base = &shm_dev_tx_node_base;
			nr_shm_devs++;
		}
		assert(rx_node_base != NULL);
		assert(tx_node_base != NULL);

		/* rx nodes */
		MR_DEBUG("node_clone: rx_node_base_id=%d, dev_desc_symbol=%s", rx_node_base->id,
			 dev_desc->symbol);
		rte_node_t rx_node_id = rte_node_clone(rx_node_base->id, dev_desc->symbol);
		MR_VERIFY(rx_node_id != RTE_NODE_ID_INVALID);
		rx_node_elem->nid = rx_node_id;
		rx_node_elem->ctx.dev_desc = dev_desc;

		/* tx nodes */
		struct dev_node_elem * tx_node_elem = ZMALLOC(sizeof(struct dev_node_elem));
		MR_VERIFY_MALLOC(tx_node_elem);
		MR_DEBUG("node_clone: tx_node_base_id=%d, dev_desc_symbol=%s", tx_node_base->id,
			 dev_desc->symbol);
		rte_node_t tx_node_id = rte_node_clone(tx_node_base->id, dev_desc->symbol);
		MR_VERIFY(tx_node_id != RTE_NODE_ID_INVALID);
		tx_node_elem->nid = tx_node_id;
		tx_node_elem->ctx.dev_desc = dev_desc;

		dev_desc->rx_node_id = rx_node_id;
		dev_desc->tx_node_id = tx_node_id;
		TAILQ_INSERT_TAIL(&p_dev_node_main->elems_head, rx_node_elem, entries);
		TAILQ_INSERT_TAIL(&p_dev_node_main->elems_head, tx_node_elem, entries);

		if (dev_desc->en_periodic_rx_tx) {
			struct dev_node_elem * rx_periodical_node_elem =
				ZMALLOC(sizeof(struct dev_node_elem));
			MR_VERIFY_MALLOC(rx_periodical_node_elem);
			rte_node_t rx_periodical_node_id =
				rte_node_clone(dpdk_dev_rx_node_periodical.id, dev_desc->symbol);
			MR_VERIFY(rx_periodical_node_id != RTE_NODE_ID_INVALID);
			rx_periodical_node_elem->nid = rx_periodical_node_id;
			rx_periodical_node_elem->ctx.dev_desc = dev_desc;

			struct dev_node_elem * tx_periodical_node_elem =
				ZMALLOC(sizeof(struct dev_node_elem));
			MR_VERIFY_MALLOC(tx_periodical_node_elem);
			rte_node_t tx_periodical_node_id =
				rte_node_clone(dpdk_dev_tx_node_periodical.id, dev_desc->symbol);
			MR_VERIFY(tx_periodical_node_id != RTE_NODE_ID_INVALID);
			tx_periodical_node_elem->nid = tx_periodical_node_id;
			tx_periodical_node_elem->ctx.dev_desc = dev_desc;

			TAILQ_INSERT_TAIL(&p_dev_node_main->elems_head, rx_periodical_node_elem, entries);
			TAILQ_INSERT_TAIL(&p_dev_node_main->elems_head, tx_periodical_node_elem, entries);
			nr_periodical_nodes++;
		}
	}

	if (nr_dpdk_devs > 0) {
		node_setup_desc_add_for_all_workers(node_mgr_main, "dpdk_dev_rx_node-*");
		node_setup_desc_add_for_all_workers(node_mgr_main, "dpdk_dev_tx_node-*");
	}
	if (nr_periodical_nodes > 0) {
		node_setup_desc_add_for_all_workers(node_mgr_main, "dpdk_dev_rx_node_periodical-*");
		node_setup_desc_add_for_all_workers(node_mgr_main, "dpdk_dev_tx_node_periodical-*");
	}
	if (nr_dpdk_msgpack_devs > 0) {
		node_setup_desc_add_for_all_workers(node_mgr_main, "dpdk_msgpack_dev_rx_node-*");
		node_setup_desc_add_for_all_workers(node_mgr_main, "dpdk_msgpack_dev_tx_node-*");
	}
	if (nr_shm_devs > 0) {
		node_setup_desc_add_for_all_workers(node_mgr_main, "shm_dev_rx_node-*");
		node_setup_desc_add_for_all_workers(node_mgr_main, "shm_dev_tx_node-*");
	}
	return 0;
}
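/*
 * The monitor below snapshots the per-core volatile counters without locks:
 * values are copied into local arrays first, and only the rx iteration
 * counters (and the DF windows, via df_calculate_get_result) are reset in
 * place. A concurrent worker can at worst smear one interval's numbers,
 * which keeps the fast path free of atomics at the cost of occasionally
 * imprecise interval statistics.
 */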
cJSON * phydev_rx_node_monit_loop(struct sc_main * sc)
{
	unsigned int nr_graphs = sc->nr_io_thread;
	cJSON * json_root = cJSON_CreateObject();

	/* total_rx_pkts, total_tx_pkts, rx_zero_iterations, rx_total_iterations */
	uint64_t total_rx_pkts[RTE_MAX_LCORE];
	uint64_t total_tx_pkts[RTE_MAX_LCORE];
	uint64_t rx_pkts_per_batch[RTE_MAX_LCORE];
	uint64_t tx_pkts_per_batch[RTE_MAX_LCORE];
	uint64_t total_rx_drop_pkts[RTE_MAX_LCORE];
	uint64_t total_tx_drop_pkts[RTE_MAX_LCORE];
	double rx_spin_time[RTE_MAX_LCORE];
	uint64_t df_rx[RTE_MAX_LCORE];
	uint64_t df_tx[RTE_MAX_LCORE];
	uint64_t total_tx_meter_red[RTE_MAX_LCORE];
	uint64_t total_tx_meter_yellow[RTE_MAX_LCORE];
	uint64_t total_tx_meter_green[RTE_MAX_LCORE];

	for (unsigned int i = 0; i < nr_graphs; i++) {
		total_rx_pkts[i] = phydev_stat[i].total_rx_pkts;
		total_tx_pkts[i] = phydev_stat[i].total_tx_pkts;
		rx_pkts_per_batch[i] = phydev_stat[i].rx_pkts_per_batch;
		tx_pkts_per_batch[i] = phydev_stat[i].tx_pkts_per_batch;
		total_rx_drop_pkts[i] = phydev_stat[i].total_rx_drop_pkts;
		total_tx_drop_pkts[i] = phydev_stat[i].total_tx_drop_pkts;
		total_tx_meter_green[i] = phydev_stat[i].tx_meter_green;
		total_tx_meter_yellow[i] = phydev_stat[i].tx_meter_yellow;
		total_tx_meter_red[i] = phydev_stat[i].tx_meter_red;

		/* calculate the spin time */
		uint32_t rx_zero_iterations = phydev_stat[i].rx_zero_iterations;
		uint32_t rx_non_zero_iterations = phydev_stat[i].rx_non_zero_iterations;
		phydev_stat[i].rx_non_zero_iterations = 0;
		phydev_stat[i].rx_zero_iterations = 0;
		uint32_t rx_total_iterations = rx_zero_iterations + rx_non_zero_iterations;
		if (rx_total_iterations != 0) {
			rx_spin_time[i] = (double)rx_zero_iterations / (double)rx_total_iterations;
		} else {
			rx_spin_time[i] = 0;
		}

		/* DF calculate */
		df_rx[i] = df_calculate_get_result(&phydev_stat[i].rx_df, DF_MR_DEFAULT);
		df_tx[i] = df_calculate_get_result(&phydev_stat[i].tx_df, DF_MR_DEFAULT);
	}

	cJSON * json_total_rx_pkts = create_uint64_array(total_rx_pkts, nr_graphs);
	cJSON_AddItemToObject(json_root, "total_rx_pkts", json_total_rx_pkts);
	cJSON * json_total_tx_pkts = create_uint64_array(total_tx_pkts, nr_graphs);
	cJSON_AddItemToObject(json_root, "total_tx_pkts", json_total_tx_pkts);
	cJSON * json_rx_pkts_per_batch = create_uint64_array(rx_pkts_per_batch, nr_graphs);
	cJSON_AddItemToObject(json_root, "rx_pkts_per_batch", json_rx_pkts_per_batch);
	cJSON * json_tx_pkts_per_batch = create_uint64_array(tx_pkts_per_batch, nr_graphs);
	cJSON_AddItemToObject(json_root, "tx_pkts_per_batch", json_tx_pkts_per_batch);
	cJSON * json_total_rx_drop_pkts = create_uint64_array(total_rx_drop_pkts, nr_graphs);
	cJSON_AddItemToObject(json_root, "total_rx_drop_pkts", json_total_rx_drop_pkts);
	cJSON * json_total_tx_drop_pkts = create_uint64_array(total_tx_drop_pkts, nr_graphs);
	cJSON_AddItemToObject(json_root, "total_tx_drop_pkts", json_total_tx_drop_pkts);
	cJSON * json_total_tx_meter_red = create_uint64_array(total_tx_meter_red, nr_graphs);
	cJSON_AddItemToObject(json_root, "total_tx_meter_red", json_total_tx_meter_red);
	cJSON * json_total_tx_meter_yellow = create_uint64_array(total_tx_meter_yellow, nr_graphs);
	cJSON_AddItemToObject(json_root, "total_tx_meter_yellow", json_total_tx_meter_yellow);
	cJSON * json_total_tx_meter_green = create_uint64_array(total_tx_meter_green, nr_graphs);
	cJSON_AddItemToObject(json_root, "total_tx_meter_green", json_total_tx_meter_green);
	cJSON * json_rx_spin_time = cJSON_CreateDoubleArray(rx_spin_time, (int)nr_graphs);
	cJSON_AddItemToObject(json_root, "rx_spin_time", json_rx_spin_time);
	cJSON * json_df_rx = create_uint64_array(df_rx, nr_graphs);
	cJSON_AddItemToObject(json_root, "df_rx", json_df_rx);
	cJSON * json_df_tx = create_uint64_array(df_tx, nr_graphs);
	cJSON_AddItemToObject(json_root, "df_tx", json_df_tx);

	return json_root;
}

int node_manager_phydev_deinit(struct node_manager_main * node_mgr_main)
{
	return 0;
}
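/*
 * Shape of the JSON emitted by phydev_rx_node_monit_loop() for a two-worker
 * run (illustrative values only):
 *
 *   {
 *     "total_rx_pkts": [123456, 654321],
 *     "total_tx_pkts": [123000, 654000],
 *     "rx_pkts_per_batch": [32, 18],
 *     "tx_pkts_per_batch": [32, 18],
 *     "total_rx_drop_pkts": [0, 4],
 *     "total_tx_drop_pkts": [0, 0],
 *     "total_tx_meter_red": [0, 0],
 *     "total_tx_meter_yellow": [0, 0],
 *     "total_tx_meter_green": [123000, 654000],
 *     "rx_spin_time": [0.92, 0.87],
 *     "df_rx": [3, 5],
 *     "df_tx": [2, 4]
 *   }
 */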