#include "sc_trace.h"
/* NOTE(review): the header names of the twelve directives below are missing
 * from this copy of the file (only the bare "#include" tokens survived).
 * Restore them from the original sources before building. */
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

/* Defaults applied when a session does not configure 'multiplier' / 'interval'. */
#define DEFAULT_PRIVATE_MULTIPLIER 3
#define DEFAULT_PRIVATE_INTERVAL 200
/* Used in this file as the factor 1000 in the millisecond clock computation
 * and as the usleep() period of the checker thread. */
#define HEALTH_CHECK_MS_TO_TCS 1000

/* Supported health check methods */
enum health_check_method
{
    HEALTH_CHECK_PRIVATE = 0,
    HEALTH_CHECK_MAX
};

/* Next-node edges of the "health_check_ask" graph node */
enum
{
    HEALTH_CHECK_ASK_NEXT_ETH_EGRESS = 0,
    HEALTH_CHECK_ASK_NEXT_PKT_DROP,
    HEALTH_CHECK_ASK_NEXT_MAX
};

/* Next-node edges of the "health_check_deal_answer" graph node */
enum
{
    HEALTH_CHECK_DEAL_ANSWER_NEXT_PKT_DROP = 0,
    HEALTH_CHECK_DEAL_ANSWER_NEXT_MAX
};

/* Counters kept by the ask node, one instance per graph */
struct health_check_ask_stats
{
    volatile uint64_t total_pkts;     /* probes generated so far */
    volatile uint64_t pkts_per_batch; /* probes generated by the latest run */
    volatile uint64_t buf_alloc_err;  /* probe builds that failed */
} __rte_cache_aligned;

/* Why the deal-answer node counted a packet as an exception */
enum health_chk_deal_ans_exc_reason
{
    HEALTH_CHK_DEAL_ANS_EXC_RSN_SESS_MISS = 0,
    HEALTH_CHK_DEAL_ANS_EXC_RSN_TIMEOUT,
    HEALTH_CHK_DEAL_ANS_EXC_RSN_MAX
};

/* Printable name of every exception reason, indexed by the enum above */
static const char * health_chk_deal_ans_exc_reason_str[HEALTH_CHK_DEAL_ANS_EXC_RSN_MAX] = {
    [HEALTH_CHK_DEAL_ANS_EXC_RSN_SESS_MISS] = "session_lookup_miss",
    [HEALTH_CHK_DEAL_ANS_EXC_RSN_TIMEOUT] = "timeout",
};

/* Counters kept by the deal-answer node, one instance per graph */
struct health_check_deal_answer_stats
{
    volatile uint64_t total_pkts;     /* answers received so far */
    volatile uint64_t pkts_per_batch; /* answers received by the latest run */
    volatile uint64_t excpt_reason[HEALTH_CHK_DEAL_ANS_EXC_RSN_MAX];
} __rte_cache_aligned;

/* Payload that follows the Ethernet header of a "private" probe.
 * Despite the field name, the code in this file stores a millisecond value. */
struct private_health_check_hdr
{
    uint64_t expire_time_nsec;
};

/* Parameters of a "private" health check target */
struct private_health_check_target
{
    uint8_t multiplier; /* number of intervals after which a probe expires */
    uint32_t interval;  /* probe period; also used as detection_cycle_ms */
};

/* Forward declarations */
struct health_check_session;
struct node_health_check_main;

extern void node_setup_desc_add_for_all_workers(struct node_manager_main * node_mgr_main, const char * node_sym);

/* Health check ask func: builds one probe and stores it in pkts[mbuf_index] */
typedef int (*ask_func)(struct node_health_check_main * health_check_main, struct health_check_session * session,
                        struct rte_mbuf * pkts[], unsigned int mbuf_index);

/*
Health check deal answer func */ typedef int (*deal_answer_func)(struct health_check_session * session, struct rte_ether_hdr * eth_hdr); /* Health check service func */ typedef void (*check_service_func)(struct health_check_session * session); /* Health check session */ struct health_check_session { RTE_MARKER cacheline0; rte_atomic64_t remote_state; uint8_t enable : 1; uint8_t no_use_bit : 6; uint8_t session_id; uint16_t method; port_id_t port_id; union { struct private_health_check_target private_target; }; uint64_t detection_cycle_ms; ask_func ask_func; deal_answer_func deal_answer_func; check_service_func check_service_func; uint8_t no_use[2]; RTE_MARKER cacheline1 __rte_cache_min_aligned; volatile uint64_t ask_ms; rte_atomic64_t answer_ms; char name[MR_STRING_MAX]; char device[MR_STRING_MAX]; } __rte_cache_aligned; /* Health check main */ struct node_health_check_main { uint8_t total_session_num; uint8_t per_thread_session_num; struct health_check_session sessions[256]; struct rte_mempool * direct_pool; }; /* Health check main for global */ static struct node_health_check_main g_health_check_main = {}; struct health_check_ask_stats ask_stats[RTE_MAX_LCORE] = {}; struct health_check_deal_answer_stats deal_answer_stats[RTE_MAX_LCORE] = {}; static inline struct node_health_check_main * node_health_check_main_get(void) { return &g_health_check_main; } /************************************* Private health check public func **************************************/ /* Health check get remote state */ int64_t get_health_check_remote_state(struct health_check_session * session) { return rte_atomic64_read(&session->remote_state); } /* Health check get session name */ char * get_health_check_session_name(struct health_check_session * session) { return session->name; } /* Health check get session id for name */ int get_health_check_session_for_name(char * name, struct health_check_session ** out_session) { struct node_health_check_main * health_check_main = 
node_health_check_main_get(); if (strcmp(name, "null") == 0) { *out_session = NULL; return RT_SUCCESS; } for (int index = 0; index < health_check_main->total_session_num; index++) { struct health_check_session * session = &health_check_main->sessions[index]; if (strcmp(name, session->name) != 0) continue; *out_session = session; return RT_SUCCESS; } return RT_ERR; } /* Health check get session id for pkt */ struct health_check_session * health_check_session_lookup(port_id_t ingress_port_id, struct rte_ether_hdr * eth_hdr) { struct node_health_check_main * health_check_main = node_health_check_main_get(); port_id_t port_id_from_eth = eth_hdr->src_addr.addr_bytes[4] << 16 | eth_hdr->src_addr.addr_bytes[5]; for (int index = 0; index < health_check_main->total_session_num; index++) { struct health_check_session * session = &health_check_main->sessions[index]; if (session->port_id != ingress_port_id) continue; if (session->port_id != port_id_from_eth) continue; return session; } return NULL; } /* Private health check ask func */ int private_ask_func(struct node_health_check_main * health_check_main, struct health_check_session * session, struct rte_mbuf * pkts[], unsigned int mbuf_index) { /* Alloc new mbuf */ struct rte_mbuf * mbuf = rte_pktmbuf_alloc(health_check_main->direct_pool); if (mbuf == NULL) { return RT_ERR; } /* Get private ctrlzone */ struct mrb_metadata * mrb_meta = mrbuf_cz_data(mbuf, MR_NODE_CTRLZONE_ID); mrb_metadata_clear(mrb_meta); mrb_meta->port_ingress = session->port_id; mrb_meta->port_egress = session->port_id; mrb_meta->packet_create_from_nf = 0; mrb_meta->health_check = 1; /* Set mbuf pkt_len */ mbuf->data_len = sizeof(struct rte_ether_hdr) + sizeof(struct private_health_check_hdr); mbuf->pkt_len = mbuf->data_len; /* Build eth hdr */ struct rte_ether_hdr * eth_hdr = rte_pktmbuf_mtod(mbuf, struct rte_ether_hdr *); /* Src ae:aa:aa:aa:port_id:port_id */ eth_hdr->src_addr.addr_bytes[0] = 0xae; eth_hdr->src_addr.addr_bytes[1] = 0xaa; 
eth_hdr->src_addr.addr_bytes[2] = 0xaa; eth_hdr->src_addr.addr_bytes[3] = 0xaa; eth_hdr->src_addr.addr_bytes[4] = session->port_id >> 16; eth_hdr->src_addr.addr_bytes[5] = session->port_id & 0xff; /* Dst ae:bb:rand:bb:rand:rand */ eth_hdr->dst_addr.addr_bytes[0] = 0xae; eth_hdr->dst_addr.addr_bytes[1] = 0xbb; eth_hdr->dst_addr.addr_bytes[2] = rte_rand_max(0xff); eth_hdr->dst_addr.addr_bytes[3] = 0xbb; eth_hdr->dst_addr.addr_bytes[4] = rte_rand_max(0xff); eth_hdr->dst_addr.addr_bytes[5] = rte_rand_max(0xff); /* type */ eth_hdr->ether_type = ETH_HEALTH_CHECK_PRIVATE; /* Build private hdr */ struct private_health_check_hdr * private_hdr = rte_pktmbuf_mtod_offset(mbuf, struct private_health_check_hdr *, sizeof(struct rte_ether_hdr)); /* Expire time nsec = current + interval * multiplier */ uint64_t cur_time_ms = rte_rdtsc() / HEALTH_CHECK_MS_TO_TCS * US_PER_S / (rte_get_tsc_hz() + US_PER_S - 1); private_hdr->expire_time_nsec = cur_time_ms + session->private_target.interval * session->private_target.multiplier; struct pkt_parser pkt_parser; pkt_parser_init(&pkt_parser, &mrb_meta->pkt_parser_result, LAYER_TYPE_ALL, MR_PKT_PARSER_LAYERS_MAX); pkt_parser_exec(&pkt_parser, mbuf); /* Fill mbuf */ pkts[mbuf_index] = mbuf; return RT_SUCCESS; } /* Private health check deal answer func */ int private_deal_answer_func(struct health_check_session * session, struct rte_ether_hdr * eth_hdr) { struct private_health_check_hdr * _private_hdr = (struct private_health_check_hdr *)(eth_hdr + 1); int64_t cur_time_ms = rte_rdtsc() / HEALTH_CHECK_MS_TO_TCS * US_PER_S / (rte_get_tsc_hz() + US_PER_S - 1); if (cur_time_ms < _private_hdr->expire_time_nsec) { rte_atomic64_set(&session->answer_ms, cur_time_ms); return RT_SUCCESS; } return RT_ERR; } /* Private health check func */ void private_health_check_func(struct health_check_session * session) { int64_t cur_time_ms = rte_rdtsc() / HEALTH_CHECK_MS_TO_TCS * US_PER_S / (rte_get_tsc_hz() + US_PER_S - 1); const uint64_t drain_time = 
rte_atomic64_read(&session->answer_ms) + session->private_target.interval * session->private_target.multiplier; int64_t remote_state = REMOTE_UP; /* Get remote state */ if (cur_time_ms > drain_time) remote_state = REMOTE_DOWN; /* Check remote state */ if (remote_state == rte_atomic64_read(&session->remote_state)) return; /* Update remote state and output log */ rte_atomic64_set(&session->remote_state, remote_state); switch (remote_state) { case REMOTE_UP: MR_INFO("Health check, name=%s, device=%s, remote_state=Up", session->name, session->device); break; case REMOTE_DOWN: MR_INFO("Health check, name=%s, device=%s, remote_state=Down", session->name, session->device); break; } } /* Private distributer calculate */ void private_distributer_calculate(struct distributer * dist_object, struct rte_mbuf * mbufs[], unsigned int nr_mbufs) { for (int i = 0; i < nr_mbufs; i++) { /* Get eth hdr */ struct rte_ether_hdr * eth_hdr = rte_pktmbuf_mtod(mbufs[i], struct rte_ether_hdr *); const char * s_data = (const char *)ð_hdr->src_addr.addr_bytes; const char * d_data = (const char *)ð_hdr->dst_addr.addr_bytes; uint32_t src_hash = rte_hash_crc(s_data, sizeof(eth_hdr->src_addr.addr_bytes), dist_object->orin_hash); uint32_t dst_hash = rte_hash_crc(d_data, sizeof(eth_hdr->dst_addr.addr_bytes), dist_object->orin_hash); mbufs[i]->hash.usr = src_hash ^ dst_hash; } } /************************************* Health check config **************************************/ /* Parse the health check config */ int parse_health_check_config(struct sc_main * sc, struct node_health_check_main * health_check_main) { uint8_t total_session_num = 0; for (int session_index = 0; session_index < 256; session_index++) { /* Parse 'name' */ char str_section[MR_SYMBOL_MAX] = {}; char str_name[MR_STRING_MAX] = {}; snprintf(str_section, sizeof(str_section), "health_check:%d", session_index); int ret = MESA_load_profile_string_nodef(sc->local_cfgfile, str_section, "name", str_name, sizeof(str_name)); if (ret < 0) 
continue; /* Parse 'device' */ char str_device[MR_STRING_MAX] = {}; ret = MESA_load_profile_string_nodef(sc->local_cfgfile, str_section, "device", str_device, sizeof(str_device)); if (ret < 0) { MR_ERROR("The health check name : %s ,no config 'device' .", str_section); return RT_ERR; } /* Parse 'method' */ uint32_t method = HEALTH_CHECK_PRIVATE; ret = MESA_load_profile_uint_nodef(sc->local_cfgfile, str_section, "method", &method); if (ret < 0) { MR_ERROR("The health check name : %s ,no config 'method' .", str_section); return RT_ERR; } /* Check method */ if (method != HEALTH_CHECK_PRIVATE) { MR_ERROR("The health check name : %s , 'method=%u' is invalid .", str_section, method); return RT_ERR; } /* Parse 'multiplier' and 'interval' */ uint32_t multiplier = DEFAULT_PRIVATE_MULTIPLIER; uint32_t interval = DEFAULT_PRIVATE_INTERVAL; MESA_load_profile_uint_def(sc->local_cfgfile, str_section, "multiplier", &multiplier, DEFAULT_PRIVATE_MULTIPLIER); MESA_load_profile_uint_def(sc->local_cfgfile, str_section, "interval", &interval, DEFAULT_PRIVATE_INTERVAL); /* Check the dev name */ struct mr_dev_desc * dev_desc = mr_dev_desc_lookup(sc->devmgr_main, str_device); if (dev_desc == NULL) { MR_ERROR("In section [%s], expected device %s is not existed.", str_section, str_device); return RT_ERR; } /* Fill the session */ struct health_check_session * session = &health_check_main->sessions[total_session_num]; memcpy(session->name, str_name, sizeof(str_name)); memcpy(session->device, str_device, sizeof(str_device)); session->enable = 1; session->session_id = session_index; session->method = method; session->port_id = dev_desc->port_id; rte_atomic64_init(&session->answer_ms); rte_atomic64_init(&session->remote_state); rte_atomic64_set(&session->remote_state, REMOTE_UP); switch (method) { case HEALTH_CHECK_PRIVATE: { session->private_target.multiplier = (uint8_t)multiplier; session->private_target.interval = interval; session->detection_cycle_ms = interval; session->ask_func = 
private_ask_func; session->deal_answer_func = private_deal_answer_func; session->check_service_func = private_health_check_func; } break; } /* Update total session num */ total_session_num++; } /* Save total session num */ health_check_main->total_session_num = total_session_num; /* Computer per thread session num */ if (total_session_num % sc->nr_io_thread) health_check_main->per_thread_session_num = total_session_num / sc->nr_io_thread + 1; else health_check_main->per_thread_session_num = total_session_num / sc->nr_io_thread; struct rte_mempool * d_mempool = mrb_direct_mempool_locate(sc->mrb_pool_main, NULL, SOCKET_ID_ANY, LCORE_ID_ANY); if (d_mempool == NULL) { MR_ERROR("The health check get direct pool error ."); return RT_ERR; } health_check_main->direct_pool = d_mempool; return RT_SUCCESS; } /* Dump the health check session */ void dump_health_check_session(struct health_check_session * session, int session_id) { char str_session_info[1024] = {}; int len = snprintf(str_session_info, sizeof(str_session_info), "Health check, id=%u", session_id); len += snprintf(str_session_info + len, sizeof(str_session_info) - len, ", name=%s", session->name); len += snprintf(str_session_info + len, sizeof(str_session_info) - len, ", dev_name=%s", session->device); len += snprintf(str_session_info + len, sizeof(str_session_info) - len, ", dev_id=%u", session->port_id); switch (session->method) { case HEALTH_CHECK_PRIVATE: { len += snprintf(str_session_info + len, sizeof(str_session_info) - len, ", method=private"); len += snprintf(str_session_info + len, sizeof(str_session_info) - len, ", multiplier=%u", session->private_target.multiplier); len += snprintf(str_session_info + len, sizeof(str_session_info) - len, ", interval=%u", session->private_target.interval); } break; } MR_INFO("%s", str_session_info); } /* Dump the health check config */ void dump_health_check_config(struct node_health_check_main * health_check_main) { if (health_check_main->total_session_num == 0) return; 
MR_INFO(" "); /* Dump the health check config */ MR_INFO("Health check session total number: %u", health_check_main->total_session_num); for (int index = 0; index < health_check_main->total_session_num; index++) { struct health_check_session * session = &health_check_main->sessions[index]; dump_health_check_session(session, index); } } /************************************** Health check init **************************************/ /* Health check init */ int health_check_init(struct sc_main * sc) { struct node_health_check_main * health_check_main = node_health_check_main_get(); /* Parse health check config */ if (parse_health_check_config(sc, health_check_main) == RT_ERR) return RT_ERR; /* Dump health check config */ dump_health_check_config(health_check_main); return RT_SUCCESS; } /* Enroll health_check_ask node */ int node_manager_health_check_init(struct node_manager_main * node_mgr_main) { node_setup_desc_add_for_all_workers(node_mgr_main, "health_check_ask"); return RT_SUCCESS; } /************************************** Health check ask statistics **************************************/ cJSON * health_check_ask_node_monit_loop(struct sc_main * sc) { cJSON * json_root = cJSON_CreateObject(); unsigned int nr_graphs = sc->nr_io_thread; uint64_t total_pkts[nr_graphs]; uint64_t pkts_per_batch[nr_graphs]; uint64_t buf_alloc_err[nr_graphs]; for (uint32_t graph_id = 0; graph_id < nr_graphs; graph_id++) { struct health_check_ask_stats * stats = &ask_stats[graph_id]; if (stats->total_pkts == 0) { total_pkts[graph_id] = 0; pkts_per_batch[graph_id] = 0; buf_alloc_err[graph_id] = 0; continue; } total_pkts[graph_id] = stats->total_pkts; pkts_per_batch[graph_id] = stats->pkts_per_batch; buf_alloc_err[graph_id] = stats->buf_alloc_err; } cJSON * json_total_pkts = create_uint64_array(total_pkts, nr_graphs); cJSON_AddItemToObject(json_root, "health_chk_ask, total_pkts", json_total_pkts); cJSON * json_pkts_per_batch = create_uint64_array(pkts_per_batch, nr_graphs); 
cJSON_AddItemToObject(json_root, "health_chk_ask, pkts_per_batch", json_pkts_per_batch); cJSON * json_buf_alloc_err = create_uint64_array(buf_alloc_err, nr_graphs); cJSON_AddItemToObject(json_root, "health_chk_ask, buf_alloc_err", json_buf_alloc_err); return json_root; } /************************************ Health check deal answer statistics *************************************/ cJSON * health_check_deal_answer_node_monit_loop(struct sc_main * sc) { cJSON * json_root = cJSON_CreateObject(); unsigned int nr_graphs = sc->nr_io_thread; uint64_t total_pkts[nr_graphs]; uint64_t pkts_per_batch[nr_graphs]; uint64_t excpt_reason[nr_graphs][HEALTH_CHK_DEAL_ANS_EXC_RSN_MAX]; for (uint32_t graph_id = 0; graph_id < nr_graphs; graph_id++) { struct health_check_deal_answer_stats * stats = &deal_answer_stats[graph_id]; if (stats->total_pkts == 0) { total_pkts[graph_id] = 0; pkts_per_batch[graph_id] = 0; memset(excpt_reason[graph_id], 0, sizeof(excpt_reason[graph_id])); continue; } total_pkts[graph_id] = stats->total_pkts; pkts_per_batch[graph_id] = stats->pkts_per_batch; for (int i = 0; i < HEALTH_CHK_DEAL_ANS_EXC_RSN_MAX; i++) { excpt_reason[graph_id][i] = stats->excpt_reason[i]; } } cJSON * json_total_pkts = cJSON_CreateIntArray((const int *)total_pkts, nr_graphs); cJSON_AddItemToObject(json_root, "health_chk_deal_ans, total_pkts", json_total_pkts); cJSON * json_pkts_per_batch = cJSON_CreateIntArray((const int *)pkts_per_batch, nr_graphs); cJSON_AddItemToObject(json_root, "health_chk_deal_ans, pkts_per_batch", json_pkts_per_batch); for (int i = 0; i < HEALTH_CHK_DEAL_ANS_EXC_RSN_MAX; i++) { char str_title[MR_STRING_MAX]; snprintf(str_title, sizeof(str_title), "health_chk_deal_ans, %s", health_chk_deal_ans_exc_reason_str[i]); cJSON * json_excpt_reason = cJSON_CreateIntArray((const int *)excpt_reason[i], nr_graphs); cJSON_AddItemToObject(json_root, str_title, json_excpt_reason); } return json_root; } /************************************ Health check session state 
*************************************/ cJSON * health_check_session_monit_loop(struct sc_main * sc) { struct node_health_check_main * health_check_main = node_health_check_main_get(); cJSON * json_root = cJSON_CreateObject(); for (int index = 0; index < health_check_main->total_session_num; index++) { struct health_check_session * session = &health_check_main->sessions[index]; if (session->enable == 0) continue; cJSON * session_obj = cJSON_CreateObject(); cJSON_AddNumberToObject(session_obj, "session_id", session->session_id); cJSON_AddStringToObject(session_obj, "session_name", session->name); cJSON_AddStringToObject(session_obj, "device_name", session->device); cJSON_AddNumberToObject(session_obj, "port_id", session->port_id); if (rte_atomic64_read(&session->remote_state)) cJSON_AddStringToObject(session_obj, "remote_state", "up"); else cJSON_AddStringToObject(session_obj, "remote_state", "down"); switch (session->method) { case HEALTH_CHECK_PRIVATE: { cJSON_AddStringToObject(session_obj, "method", "private"); cJSON_AddNumberToObject(session_obj, "multiplier", session->private_target.multiplier); cJSON_AddNumberToObject(session_obj, "interval", session->private_target.interval); } } char session_index[MR_STRING_MAX] = {}; snprintf(session_index, sizeof(session_index), "session-%u", session->session_id); cJSON_AddItemToObject(json_root, session_index, session_obj); } return json_root; } /************************************ Private health check services *************************************/ /* Health check func */ void health_check_func() { struct node_health_check_main * health_check_main = node_health_check_main_get(); for (int index = 0; index < health_check_main->total_session_num; index++) { /* Get session item */ struct health_check_session * session = &health_check_main->sessions[index]; /* Deal check service func */ session->check_service_func(session); } } /* Health check thread */ void * health_check_thread(void * args) { pthread_detach(pthread_self()); 
while (g_keep_running) { health_check_func(); usleep(HEALTH_CHECK_MS_TO_TCS); } return (void *)NULL; } /************************************* Health check ask node **************************************/ /* Generate and store the trace information */ static __rte_always_inline void gen_store_trace_info_ask_node(struct rte_node * node, struct rte_mbuf * mbuf, uint16_t next_node_index, struct health_check_session * session) { /* Populate the next node infomation */ char str_record[MR_STRING_MAX]; snprintf(str_record, sizeof(str_record), "next node:%s, core:%u, session name:%s, listening dev:%u,%s", node->nodes[next_node_index]->name, rte_lcore_id(), session->name, session->port_id, session->device); /* Emit the trace record */ struct dp_trace_record_meta meta = { .measurement_type = DP_TRACE_MEASUREMENT_TYPE_TRACE, .appsym = MR_TRACE_APPSYM, .module = node->name}; dp_trace_record_emit_str(sc_main_get()->trace, mbuf, rte_lcore_id(), &meta, str_record); } /* Health check ask node process function */ static __rte_always_inline uint16_t health_check_ask_node_process(struct rte_graph * graph, struct rte_node * node, void ** objs, uint16_t cnt) { unsigned int nr_mbufs = 0; struct node_health_check_main * health_check_main = node_health_check_main_get(); uint64_t cur_time_ms = rte_rdtsc() / HEALTH_CHECK_MS_TO_TCS * US_PER_S / (rte_get_tsc_hz() + US_PER_S - 1); struct health_check_ask_stats stats = {}; uint8_t start_session_id = graph->id * health_check_main->per_thread_session_num; for (int index = 0; index < health_check_main->per_thread_session_num; index++) { /* Check start session id */ if (start_session_id >= health_check_main->total_session_num) break; /* Get health check session */ struct health_check_session * session = &health_check_main->sessions[start_session_id]; /* Check detection cycle */ uint64_t diff_time_ms = cur_time_ms - session->ask_ms; if (session->detection_cycle_ms > diff_time_ms) continue; /* Call ask func */ int ret = 
session->ask_func(health_check_main, session, (struct rte_mbuf **)node->objs, nr_mbufs); if (ret == RT_ERR) { stats.buf_alloc_err++; break; } /* Update ask tsc */ session->ask_ms = cur_time_ms; /* Mark packets for trace*/ struct rte_mbuf * mbuf = (struct rte_mbuf *)node->objs[nr_mbufs]; dp_trace_filter_exec(sc_main_get()->trace, mbuf, 0, rte_lcore_id()); /* Check if tracing is enabled for the current Mbuf */ if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TRACE))) { gen_store_trace_info_ask_node(node, mbuf, HEALTH_CHECK_ASK_NEXT_ETH_EGRESS, session); } /* Update pkts and start session id */ nr_mbufs++; start_session_id++; } /* hash calculate */ node->idx = nr_mbufs; private_distributer_calculate(sc_main_get()->dist_object, (struct rte_mbuf **)node->objs, node->idx); /* Record the mbuf information */ for (int i = 0; i < nr_mbufs; i++) { struct rte_mbuf * mbuf = (struct rte_mbuf *)node->objs[i]; if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TRACE))) gen_store_trace_info_rte_mbuf(node, mbuf); } /* To eth egress */ rte_node_next_stream_move(graph, node, HEALTH_CHECK_ASK_NEXT_ETH_EGRESS); /* Update total count */ struct health_check_ask_stats * graph_stats = &ask_stats[graph->id]; graph_stats->total_pkts += nr_mbufs; graph_stats->pkts_per_batch = nr_mbufs; graph_stats->buf_alloc_err += stats.buf_alloc_err; return nr_mbufs; } /* Private health check ask node base */ static struct rte_node_register health_check_ask_node_base = { .process = health_check_ask_node_process, .flags = RTE_NODE_SOURCE_F, .name = "health_check_ask", .nb_edges = HEALTH_CHECK_ASK_NEXT_MAX, .next_nodes = { [HEALTH_CHECK_ASK_NEXT_ETH_EGRESS] = "eth_egress", [HEALTH_CHECK_ASK_NEXT_PKT_DROP] = "pkt_drop_trap", }, }; RTE_NODE_REGISTER(health_check_ask_node_base); /************************************* Health check deal answer node **************************************/ /* Generate and store the trace information */ static __rte_always_inline void 
gen_store_trace_info_answer_node(struct rte_node * node, struct rte_mbuf * mbuf, uint16_t next_node_index, struct health_check_deal_answer_stats * stats, struct health_check_session * session, enum health_chk_deal_ans_exc_reason excpt_reason) { /* Populate the next node infomation */ char str_record[MR_STRING_MAX]; int len = snprintf(str_record, sizeof(str_record), "next node:%s", node->nodes[next_node_index]->name); if (excpt_reason != HEALTH_CHK_DEAL_ANS_EXC_RSN_MAX) { assert(excpt_reason < HEALTH_CHK_DEAL_ANS_EXC_RSN_MAX); len += snprintf(str_record + len, sizeof(str_record) - len, ", %s", health_chk_deal_ans_exc_reason_str[excpt_reason]); } /* Populate the health check session infomation */ if (likely(session != NULL)) { len += snprintf(str_record + len, sizeof(str_record) - len, ", session name:%s, listening dev:%u,%s, remote_state:%ld", session->name, session->port_id, session->device, rte_atomic64_read(&session->remote_state)); } /* Emit the trace record */ struct dp_trace_record_meta meta = { .measurement_type = DP_TRACE_MEASUREMENT_TYPE_TRACE, .appsym = MR_TRACE_APPSYM, .module = node->name}; dp_trace_record_emit_str(sc_main_get()->trace, mbuf, rte_lcore_id(), &meta, str_record); } /* Health check deal answer node process function */ static __rte_always_inline uint16_t health_check_deal_answer_node_process(struct rte_graph * graph, struct rte_node * node, void ** objs, uint16_t cnt) { /* Get pkts num and pkts buffer */ uint16_t n_left_from = cnt; struct rte_mbuf ** pkts = (struct rte_mbuf **)objs; void ** batch_pkts = objs; struct health_check_deal_answer_stats stats = {}; enum health_chk_deal_ans_exc_reason excpt_reason = HEALTH_CHK_DEAL_ANS_EXC_RSN_MAX; /* Single packet processing */ while (n_left_from > 0) { struct rte_mbuf * mbuf = pkts[0]; pkts += 1; n_left_from -= 1; /* Get private ctrlzone */ struct mrb_metadata * mrb_meta = mrbuf_cz_data(mbuf, MR_NODE_CTRLZONE_ID); /* Get health check session */ struct rte_ether_hdr * eth_hdr = 
rte_pktmbuf_mtod(mbuf, struct rte_ether_hdr *); struct health_check_session * session = health_check_session_lookup(mrb_meta->port_ingress, eth_hdr); if (likely(session != NULL)) { /* Deal answer */ int ret = session->deal_answer_func(session, eth_hdr); if (unlikely(ret != RT_SUCCESS)) { excpt_reason = HEALTH_CHK_DEAL_ANS_EXC_RSN_TIMEOUT; stats.excpt_reason[excpt_reason]++; } } else { /* Update no match session pkts */ excpt_reason = HEALTH_CHK_DEAL_ANS_EXC_RSN_SESS_MISS; stats.excpt_reason[excpt_reason]++; } /* Check if tracing is enabled for the current Mbuf */ if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TRACE))) { gen_store_trace_info_answer_node(node, mbuf, HEALTH_CHECK_DEAL_ANSWER_NEXT_PKT_DROP, &stats, session, excpt_reason); } } /* Drop all pkt */ rte_node_enqueue(graph, node, HEALTH_CHECK_DEAL_ANSWER_NEXT_PKT_DROP, batch_pkts, cnt); /* Update graph stats */ struct health_check_deal_answer_stats * graph_stats = &deal_answer_stats[graph->id]; graph_stats->total_pkts += cnt; graph_stats->pkts_per_batch = cnt; for (int i = 0; i < HEALTH_CHK_DEAL_ANS_EXC_RSN_MAX; i++) { graph_stats->excpt_reason[i] += stats.excpt_reason[i]; } return cnt; } /* Health check ask node base */ static struct rte_node_register health_check_deal_answer_node_base = { .process = health_check_deal_answer_node_process, .name = "health_check_deal_answer", .nb_edges = HEALTH_CHECK_DEAL_ANSWER_NEXT_MAX, .next_nodes = { [HEALTH_CHECK_DEAL_ANSWER_NEXT_PKT_DROP] = "pkt_drop_trap", }, }; RTE_NODE_REGISTER(health_check_deal_answer_node_base);