#include #include #include #include #include #include #include #include #include #include #include /* Global config */ #define MR_LB_MAX_GROUP 1024 #define MR_LB_MAX_DEV_FOR_SINGLE_GROUP 1024 /* LB next node */ enum { LB_NEXT_FORWARDER = 0, LB_NEXT_ETH_EGRESS, LB_NEXT_PKT_DROP, LB_NEXT_MAX }; /* Dev type */ enum { DEV_TYPE_NORMAL = 0, DEV_TYPE_ACTIVE, DEV_TYPE_BACKUP, DEV_TYPE_MAX }; /* Dev flag */ enum { DEV_DISABLE = 0, DEV_ENABLE }; /* Group mode */ enum { GROUP_MODE_BALANCE = 0, GROUP_MODE_ACTIVE_BACKUP, GROUP_MODE_MAX }; /* Group state */ enum { GROUP_STATUS_UN_USE = 0, GROUP_STATUS_IN_USE }; /* Declaration struct */ struct lb_group; /* Health check session */ struct health_check_session; /* Dispatch function */ typedef int (*lb_dispatch_func)(struct node_lb_main * lb_main, struct lb_group * group_item, uint32_t hash_usr, port_id_t * egress_port_index); /* Get health check session */ int get_health_check_session_for_name(char * name, struct health_check_session ** __session); /* Get health check session name */ char * get_health_check_session_name(struct health_check_session * session); /* Get health check session remote state */ int64_t get_health_check_remote_state(struct health_check_session * _session); /* LB dev struct */ struct lb_dev { uint8_t dev_type; uint8_t dev_flag; struct mr_dev_desc * dev_desc; char dev_sym[MR_SYMBOL_MAX]; }; /* LB group struct */ struct lb_group { uint8_t index; uint8_t group_status; uint8_t group_mode; uint16_t dev_num; uint16_t next_group_num; uint16_t next_group_array[MR_LB_MAX_GROUP]; uint16_t sessions_num; uint32_t sid; lb_dispatch_func dispatch_func; struct lb_dev dev_array[MR_LB_MAX_DEV_FOR_SINGLE_GROUP]; struct health_check_session * sessions[MR_LB_MAX_DEV_FOR_SINGLE_GROUP]; }; /* LB main struct */ struct node_lb_main { uint16_t nr_group; uint32_t nr_sid; uint32_t sid_start; uint32_t sid_end; struct lb_group lb_groups[MR_LB_MAX_GROUP]; }; /* LB stats struct */ struct lb_stats { volatile uint64_t total_pkts; volatile uint64_t pkts_per_batch; volatile uint64_t to_eth_egress; volatile uint64_t to_forwarder; } __rte_cache_aligned; /* Load balance stats */ struct lb_stats stats_per_graph[RTE_MAX_LCORE] = {}; /* Global lb main */ static struct node_lb_main g_lb_main = {}; static inline struct node_lb_main * lb_main_get(void) { return &g_lb_main; } /* Get dev up num */ uint16_t get_dev_remote_link_up_num(struct lb_group * group_item, port_id_t * port_index_array) { uint16_t dev_up_num = 0; for (uint16_t i = 0; i < group_item->dev_num; i++) { /* Session = Null default remote up */ if (unlikely(group_item->sessions[i] == NULL)) { uint8_t link_status = LINK_DOWN; struct mr_dev_desc * dev_desc = group_item->dev_array[i].dev_desc; if (dev_desc->drv_type == MR_DEV_DRV_TYPE_SHMDEV) { link_status = dev_desc->shm_dev_desc->link_status; } else { link_status = dev_desc->dpdk_dev_desc->link_status.link_status; } if (unlikely(link_status == LINK_DOWN)) continue; } else { if (unlikely(get_health_check_remote_state(group_item->sessions[i]) == REMOTE_DOWN)) continue; } port_index_array[dev_up_num] = group_item->dev_array[i].dev_desc->port_id; dev_up_num++; } return dev_up_num; } /* Dispatch function */ int load_balance(struct node_lb_main * lb_main, struct lb_group * group_item, uint32_t hash_usr, port_id_t * egress_port_index) { if (group_item->dev_num != 1) { port_id_t port_index_array[MR_LB_MAX_DEV_FOR_SINGLE_GROUP]; uint16_t dev_up_num = get_dev_remote_link_up_num(group_item, port_index_array); if (likely(dev_up_num != 0)) { uint16_t member_index = hash_usr % dev_up_num; *egress_port_index = port_index_array[member_index]; return RT_SUCCESS; } } /* Session = Null default remote up */ if (likely(group_item->sessions[0] == NULL)) { *egress_port_index = group_item->dev_array[0].dev_desc->port_id; return RT_SUCCESS; } if (unlikely(get_health_check_remote_state(group_item->sessions[0]) != REMOTE_DOWN)) { *egress_port_index = group_item->dev_array[0].dev_desc->port_id; return RT_SUCCESS; } return RT_ERR; } /************************************* LB config **************************************/ /* Parser the Local lb config */ int parser_local_lb_conf(struct sc_main * sc, struct node_lb_main * lb_main) { /* Get sid range start */ uint32_t sid_start; const char * cfg_path = sc->local_cfgfile; MESA_load_profile_uint_def(cfg_path, "service_lb", "sid_start", &sid_start, 1000); /* Get sid range end */ uint32_t sid_end; MESA_load_profile_uint_def(cfg_path, "service_lb", "sid_end", &sid_end, 2000); /* Check sid range */ if (sid_start > sid_end) { MR_ERROR("Service Lb error: 'sid_end' is less than 'sid_start'."); return RT_ERR; } /* Get load balance group */ uint32_t nr_group = 0; for (int index = 0; index < MR_LB_MAX_GROUP; index++) { char str_section[MR_STRING_MAX] = {}; snprintf(str_section, sizeof(str_section), "load_balance:%d", index); uint32_t sid; int ret = MESA_load_profile_uint_nodef(cfg_path, str_section, "sid", &sid); if (ret < 0) continue; if (sid < sid_start || sid > sid_end) { MR_ERROR("Load balance configuration error: 'sid' value in '%s' is invalid, valid 'sid' range: [%u, %u].", str_section, sid_start, sid_end); return RT_ERR; } uint32_t mode; ret = MESA_load_profile_uint_nodef(cfg_path, str_section, "mode", &mode); if (ret < 0) { MR_ERROR("Load balance config error: '%s' missing 'mode' configuration.", str_section); return RT_ERR; } if (mode != GROUP_MODE_BALANCE) { MR_ERROR("Load balance config error: 'mode' in '%s' is invalid. Currently, only 'balance(1)' is supported.", str_section); return RT_ERR; } /* Save Mode */ struct lb_group * lb_group = &lb_main->lb_groups[nr_group]; lb_group->group_mode = GROUP_MODE_BALANCE; lb_group->dispatch_func = (lb_dispatch_func)load_balance; char str_devices[MR_STRING_MAX] = {}; ret = MESA_load_profile_string_nodef(cfg_path, str_section, "devices", str_devices, sizeof(str_devices)); if (ret < 0) { MR_ERROR("Load balance config error: No 'devices' configuration in '%s'.", str_section); return RT_ERR; } char * str_tokens[MR_TOKENS_MAX] = {}; int nr_str_tokens = rte_strsplit(str_devices, sizeof(str_devices), str_tokens, MR_TOKENS_MAX, ','); if (nr_str_tokens % 2) { MR_ERROR("Load balance config error: 'devices' in '%s' has an invalid number of arguments.", str_section); return RT_ERR; } uint64_t dev_num = 0; for (int i = 0; i < nr_str_tokens; i += 2) { uint8_t dev_type = DEV_TYPE_MAX; struct lb_dev * lb_dev = &lb_group->dev_array[dev_num]; /* Check the dev name */ struct mr_dev_desc * dev_desc = mr_dev_desc_lookup(sc->devmgr_main, str_tokens[i]); if (dev_desc == NULL) { MR_ERROR("Load balance config error: Device name '%s' in '%s' is invalid.", str_section, str_tokens[i]); return RT_ERR; } /* Save Type */ sscanf(str_tokens[i + 1], "%hhu", &dev_type); if (dev_type != DEV_TYPE_NORMAL) { MR_ERROR( "Load balance config error: 'type' in '%s' is invalid. Currently, only 'normal(0)' is supported.", str_section); return RT_ERR; } dev_num++; lb_dev->dev_type = dev_type; lb_dev->dev_flag = DEV_ENABLE; lb_dev->dev_desc = dev_desc; snprintf(lb_dev->dev_sym, sizeof(lb_dev->dev_sym), "%s", str_tokens[i]); } uint16_t sessions_num = 0; char str_health_check_session[MR_STRING_MAX] = {}; ret = MESA_load_profile_string_nodef(cfg_path, str_section, "health_check_sessions", str_health_check_session, sizeof(str_health_check_session)); if (ret > 0) { char * str_tokens[MR_TOKENS_MAX] = {}; sessions_num = rte_strsplit(str_health_check_session, sizeof(str_health_check_session), str_tokens, MR_TOKENS_MAX, ','); if (sessions_num > dev_num) { MR_ERROR( "Load balance config error: 'health_check_sessions' in '%s' has an invalid number of arguments.", str_section); return RT_ERR; } for (int index = 0; index < sessions_num; index++) { if (get_health_check_session_for_name(str_tokens[index], &lb_group->sessions[index]) == RT_ERR) { MR_ERROR("Load balance config error: 'health_check_sessions: %s' in '%s' is invalid.", str_section, str_tokens[index]); return RT_ERR; } } } lb_group->index = nr_group; lb_group->group_status = GROUP_STATUS_IN_USE; lb_group->dev_num = dev_num; lb_group->sessions_num = sessions_num; lb_group->sid = sid; nr_group++; } /* Check group num */ uint32_t nr_sid = sid_end - sid_start + 1; if (nr_group > nr_sid) { MR_ERROR("Load balance config error: Group number '%u' exceeds sid number '%u'.", nr_group, nr_sid); return RT_ERR; } /* inserter sid to forwarder table */ for (int i = 0; i < nr_group; i++) { struct lb_group * lb_group = &lb_main->lb_groups[i]; if (lb_group->group_status == GROUP_STATUS_IN_USE) forwarder_table_insert(lb_group->sid, FORWARDER_TYPE_LB); } /* Save sid info */ lb_main->nr_group = nr_group; lb_main->nr_sid = nr_sid; lb_main->sid_start = sid_start; lb_main->sid_end = sid_end; return RT_SUCCESS; } /* Dump lb config */ void dump_lb_config(struct node_lb_main * lb_main) { if (lb_main->nr_group == 0) return; MR_INFO(" "); MR_INFO("Load balance config: total_num=%u, sid_num=%u, sid_start=%u, sid_end=%u.", lb_main->nr_group, lb_main->nr_sid, lb_main->sid_start, lb_main->sid_end); for (int i = 0; i < MR_LB_MAX_GROUP; i++) { struct lb_group * lb_groups = &lb_main->lb_groups[i]; if (lb_groups->group_status != GROUP_STATUS_IN_USE) continue; char str_cfg_info[2048]; int len = snprintf(str_cfg_info, sizeof(str_cfg_info), "Load balance, id=%d, sid=%u", i, lb_groups->sid); switch (lb_groups->group_mode) { case GROUP_MODE_BALANCE: len += snprintf(str_cfg_info + len, sizeof(str_cfg_info) - len, ", mode=balance"); break; default: break; } /* Device info */ len += snprintf(str_cfg_info + len, sizeof(str_cfg_info) - len, ", dev_count=%u", lb_groups->dev_num); for (int j = 0; j < MR_LB_MAX_DEV_FOR_SINGLE_GROUP; j++) { struct lb_dev * _lb_dev_item = &lb_groups->dev_array[j]; if (_lb_dev_item->dev_flag != DEV_ENABLE) continue; len += snprintf(str_cfg_info + len, sizeof(str_cfg_info) - len, ", dev_name=%s", _lb_dev_item->dev_sym); switch (_lb_dev_item->dev_type) { case DEV_TYPE_NORMAL: len += snprintf(str_cfg_info + len, sizeof(str_cfg_info) - len, ", dev_type=normal"); break; default: break; } } /* Health check info */ if (lb_groups->sessions_num == 0) continue; len += snprintf(str_cfg_info + len, sizeof(str_cfg_info) - len, ", health_check="); for (int index = 0; index < lb_groups->sessions_num; index++) { if (lb_groups->sessions[index] == NULL) len += snprintf(str_cfg_info + len, sizeof(str_cfg_info) - len, "NULL,"); else len += snprintf(str_cfg_info + len, sizeof(str_cfg_info) - len, "%s,", get_health_check_session_name(lb_groups->sessions[index])); } MR_INFO("%s", str_cfg_info); } } /* Init lb */ int lb_init(struct sc_main * sc) { struct node_lb_main * lb_main = lb_main_get(); memset(lb_main, 0, sizeof(struct node_lb_main)); /* Parser the local lb config */ int ret = parser_local_lb_conf(sc, lb_main); /* Dump the config */ if (ret != RT_ERR) dump_lb_config(lb_main); return ret; } /* Generate and store the trace information */ static __rte_always_inline void gen_store_trace_info(struct rte_node * node, struct rte_mbuf * mbuf, uint16_t next_node_index, struct lb_group * lb_group) { /* Populate the next node infomation */ char str_record[MR_STRING_MAX]; int len = snprintf(str_record, sizeof(str_record), "next node:%s", node->nodes[next_node_index]->name); /* Populate the reason for next node */ if (unlikely(next_node_index == LB_NEXT_FORWARDER)) { len += snprintf(str_record + len, sizeof(str_record) - len, ", rsn:remote down"); } else { /* Populate the egress port */ struct mrb_metadata * mrb_meta = mrbuf_cz_data(mbuf, MR_NODE_CTRLZONE_ID); len += snprintf(str_record + len, sizeof(str_record) - len, ", tx:%u", mrb_meta->port_egress); } /* Populate the lb information */ len += snprintf(str_record + len, sizeof(str_record) - len, ", cur sid:%u, lb id:%u", lb_group->sid, lb_group->index); /* Emit the trace record */ struct dp_trace_record_meta meta = { .measurement_type = DP_TRACE_MEASUREMENT_TYPE_TRACE, .appsym = MR_TRACE_APPSYM, .module = node->name}; dp_trace_record_emit_str(sc_main_get()->trace, mbuf, rte_lcore_id(), &meta, str_record); } /************************************** LB node **************************************/ /* LB node process function */ static __rte_always_inline uint16_t lb_node_process(struct rte_graph * graph, struct rte_node * node, void ** objs, uint16_t cnt) { /* Get pkts num and pkts buffer */ uint16_t last_spec = 0; uint16_t n_left_from = cnt; struct rte_mbuf ** pkts = (struct rte_mbuf **)objs; struct rte_mbuf * mbuf = NULL; void ** batch_pkts = objs; uint16_t batch_next_node_index = LB_NEXT_ETH_EGRESS; uint16_t last_sid = 0; struct lb_stats stats = {}; struct lb_group * lb_group = NULL; struct node_lb_main * lb_main = lb_main_get(); uint32_t sid_start = lb_main->sid_start; /* Single packet processing */ while (n_left_from > 0) { uint16_t next_node_index; mbuf = pkts[0]; pkts += 1; n_left_from -= 1; /* Get private ctrlzone */ struct mrb_metadata * mrb_meta = mrbuf_cz_data(mbuf, MR_NODE_CTRLZONE_ID); if (last_sid != mrb_meta->cur_sid) { /* Get lb group index */ uint32_t lb_group_index = mrb_meta->cur_sid - sid_start; assert(lb_group_index < MR_LB_MAX_GROUP); assert(lb_group_index < lb_main->nr_group); if (unlikely(lb_group_index >= lb_main->nr_group)) { MR_ERROR("The lb group index is invalid: %u, cur_sid: %u, sid_start: %u, nr_group: %u.", lb_group_index, mrb_meta->cur_sid, sid_start, lb_main->nr_group); mrb_metadata_pktmbuf_dump(mbuf); next_node_index = LB_NEXT_PKT_DROP; goto node_enqueue; } lb_group = &lb_main->lb_groups[lb_group_index]; last_sid = mrb_meta->cur_sid; } port_id_t port_egress; int ret = lb_group->dispatch_func(lb_main, lb_group, mbuf->hash.usr, &port_egress); if (likely(ret != RT_ERR)) { stats.to_eth_egress++; mrb_meta->port_egress = port_egress; next_node_index = LB_NEXT_ETH_EGRESS; } else { stats.to_forwarder++; next_node_index = LB_NEXT_FORWARDER; } /* Check if tracing is enabled for the current Mbuf */ if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TRACE))) { gen_store_trace_info(node, mbuf, next_node_index, lb_group); } node_enqueue: /* Batch processing */ if (unlikely(batch_next_node_index != next_node_index)) { /* If the next index has been changed,enqueue last pkts */ rte_node_enqueue(graph, node, batch_next_node_index, batch_pkts, last_spec); batch_pkts += last_spec; last_spec = 1; batch_next_node_index = next_node_index; } else { /* If the next index not change, update the lasts */ last_spec++; } } /* Process the remaining packets */ if (likely(last_spec > 0)) rte_node_enqueue(graph, node, batch_next_node_index, batch_pkts, last_spec); struct lb_stats * graph_stats = &stats_per_graph[graph->id]; graph_stats->total_pkts += cnt; graph_stats->pkts_per_batch = cnt; graph_stats->to_eth_egress += stats.to_eth_egress; graph_stats->to_forwarder += stats.to_forwarder; return cnt; } /* LB node base */ static struct rte_node_register lb_node_base = { .process = lb_node_process, .name = "lb", .init = NULL, .nb_edges = LB_NEXT_MAX, .next_nodes = { [LB_NEXT_FORWARDER] = "forwarder", [LB_NEXT_ETH_EGRESS] = "eth_egress", [LB_NEXT_PKT_DROP] = "pkt_drop_trap", }, }; RTE_NODE_REGISTER(lb_node_base); /************************************** LB Statistics **************************************/ cJSON * lb_node_monit_loop(struct sc_main * sc) { cJSON * json_root = cJSON_CreateObject(); unsigned int nr_graphs = sc->nr_io_thread; uint64_t total_pkts[nr_graphs]; uint64_t pkts_per_batch[nr_graphs]; uint64_t to_eth_egress[nr_graphs]; uint64_t to_forwarder[nr_graphs]; for (uint32_t graph_id = 0; graph_id < nr_graphs; graph_id++) { struct lb_stats * stats = &stats_per_graph[graph_id]; if (stats->total_pkts == 0) { total_pkts[graph_id] = 0; pkts_per_batch[graph_id] = 0; to_eth_egress[graph_id] = 0; to_forwarder[graph_id] = 0; continue; } total_pkts[graph_id] = stats->total_pkts; pkts_per_batch[graph_id] = stats->pkts_per_batch; to_eth_egress[graph_id] = stats->to_eth_egress; to_forwarder[graph_id] = stats->to_forwarder; } cJSON * json_total_pkts = create_uint64_array(total_pkts, nr_graphs); cJSON_AddItemToObject(json_root, "lb, total_pkts", json_total_pkts); cJSON * json_pkts_per_batch = create_uint64_array(pkts_per_batch, nr_graphs); cJSON_AddItemToObject(json_root, "lb, pkts_per_batch", json_pkts_per_batch); cJSON * json_to_eth_egress = create_uint64_array(to_eth_egress, nr_graphs); cJSON_AddItemToObject(json_root, "lb, to_eth_egress", json_to_eth_egress); cJSON * json_to_forwarder = create_uint64_array(to_forwarder, nr_graphs); cJSON_AddItemToObject(json_root, "lb, to_forwarder", json_to_forwarder); return json_root; }