summaryrefslogtreecommitdiff
path: root/service/src/node.c
blob: a38a1fa342c02b4ae500caaa6a02f2d8a6e2fa21 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
/// Node Manager for RTE_GRAPH infrastructures
/// The node manager is a helper for construct the packet processing graph.

/// Author	:	Qiuwen Lu<[email protected]>
///	Date	:	2022-6-8

#include <assert.h>
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <strings.h>
#include <sys/queue.h>
#include <unistd.h>

#include <rte_config.h>
#include <rte_graph.h>
#include <rte_graph_worker.h>
#include <rte_malloc.h>
#include <rte_node_eth_api.h>

#include <common.h>
#include <sc_common.h>
#include <sc_node.h>

#if 0
static int ethdev_rx_tx_nodes_setup(struct sc_main * sc, struct node_manager_main * ctx)
{
    /*  For each physical devices, the dpdk's library will create:
            N nodes for ethdev-rx, N for the rx queue of the physical device,
            1 nodes for ethdev-tx, the tx-queue is same with graph id.
    */

    /*  perpare the node_ethdev_config structures for physical devices */
    struct dpdk_dev * phydev_iter = NULL;
    unsigned int nr_workers = sc->nr_io_thread;


    while (phydev_iterate(sc->phydev_main, &phydev_iter) >= 0)
    {
        struct rte_node_ethdev_config * ethdev_config = &ctx->node_ethdev_config[ctx->nr_node_ethdev_config];
        MR_VERIFY(ctx->nr_node_ethdev_config < RTE_DIM(ctx->node_ethdev_config));

        /*  at here, set mp to NULL and mp_count to zero to ignore the priv_size check in dpdk's library */
        ethdev_config->mp = NULL;
        ethdev_config->mp_count = 0;
        ethdev_config->port_id = phydev_iter->port_id;
        ethdev_config->num_rx_queues = phydev_iter->nr_rxq;
        ethdev_config->num_tx_queues = phydev_iter->nr_txq;

        for (unsigned int worker_id = 0; worker_id < nr_workers; worker_id++)
        {
            /* each worker has one rx node for each physical device */
            struct node_setup_desc * ethdev_rx_node_desc = ZMALLOC(sizeof(struct node_setup_desc));
            struct node_setup_desc * ethdev_tx_node_desc = ZMALLOC(sizeof(struct node_setup_desc));

            MR_VERIFY_MALLOC(ethdev_rx_node_desc);
            MR_VERIFY_MALLOC(ethdev_tx_node_desc);

            snprintf(ethdev_rx_node_desc->node_symbol, sizeof(ethdev_rx_node_desc->node_symbol) - 1, "ethdev_rx-%u-%u",
                     phydev_iter->port_id, worker_id);

            snprintf(ethdev_tx_node_desc->node_symbol, sizeof(ethdev_tx_node_desc->node_symbol) - 1, "ethdev_tx-%u",
                     phydev_iter->port_id);

            assert(worker_id < RTE_DIM(ctx->node_setup_descs));
            TAILQ_INSERT_TAIL(&ctx->node_setup_descs[worker_id], ethdev_rx_node_desc, entries);
            ctx->nr_node_setup_descs[worker_id]++;

            TAILQ_INSERT_TAIL(&ctx->node_setup_descs[worker_id], ethdev_tx_node_desc, entries);
            ctx->nr_node_setup_descs[worker_id]++;
        }

        ctx->nr_node_ethdev_config++;
    }

    /* create the ethdev rx and tx nodes */
    int ret = rte_node_eth_config(ctx->node_ethdev_config, ctx->nr_node_ethdev_config, nr_workers);
    if (unlikely(ret < 0))
    {
        MR_ERROR("Failed at creating ethdev nodes: ret = %d", ret);
        goto error;
    }

    return ret;

error:
    return RT_ERR;
}
#endif

static int graph_init(struct sc_main * sc, struct node_manager_main * ctx)
{
    /* create the graph structure for each i/o core */
    for (unsigned int worker_id = 0; worker_id < sc->nr_io_thread; worker_id++)
    {
        /* convert the patterns from tailq chains to string arrays */
        unsigned int nr_node_setup_desc = ctx->nr_node_setup_descs[worker_id];
        if (nr_node_setup_desc == 0)
        {
            continue;
        }

        const char ** node_patterns = ZMALLOC(sizeof(char *) * nr_node_setup_desc);
        MR_VERIFY_MALLOC(node_patterns);

        struct node_setup_desc * node_desc_iter = NULL;
        unsigned int nr_node_patterns = 0;

        TAILQ_FOREACH(node_desc_iter, &ctx->node_setup_descs[worker_id], entries)
        {
            node_patterns[nr_node_patterns] = node_desc_iter->node_symbol;
            nr_node_patterns++;
        }

        assert(nr_node_patterns == nr_node_setup_desc);

        /* create the graph config and handles */
        unsigned int lcore_id = cpu_set_location(&sc->cpu_set_io, worker_id);
        struct graph_handlers * graph_handle = &ctx->graph_handlers[worker_id];

        snprintf(graph_handle->graph_symbol, sizeof(graph_handle->graph_symbol) - 1, "lcore_%d", lcore_id);
        graph_handle->lcore_id = lcore_id;

        struct rte_graph_param graph_param = {
            .socket_id = (int)rte_lcore_to_socket_id(graph_handle->lcore_id),
            .node_patterns = node_patterns,
            .nb_node_patterns = nr_node_patterns,
        };

        rte_graph_t graph_id = rte_graph_create(graph_handle->graph_symbol, &graph_param);
        FREE(node_patterns);

        if (unlikely(graph_id == RTE_GRAPH_ID_INVALID))
        {
            MR_ERROR("rte_graph_create(): graph_id invalid for lcore %d", lcore_id);
            goto errout;
        }

        graph_handle->graph_id = graph_id;
        graph_handle->graph_handle = rte_graph_lookup(graph_handle->graph_symbol);
        MR_VERIFY(graph_handle->graph_handle != NULL);

        /* logs */
        MR_INFO("Create graph %s(id = %u) for lcore %u successfully.", graph_handle->graph_symbol,
                graph_handle->graph_id, graph_handle->lcore_id);
    }

    return RT_SUCCESS;

errout:
    return RT_ERR;
}

int32_t node_manager_pkt_graph_service_entry(void * args)
{
    struct sc_main * sc = (struct sc_main *)args;
    struct node_manager_main * node_mgr_main = sc->node_mgr_main;

    assert(sc != NULL);
    assert(node_mgr_main != NULL);

    unsigned int lcore_id = rte_lcore_id();
    unsigned int worker_id = cpu_set_relative_id(&sc->cpu_set_io, lcore_id);

    struct rte_graph * graph_object = node_mgr_main->graph_handlers[worker_id].graph_handle;
    if (likely(graph_object != NULL))
    {
        rte_graph_walk(graph_object);
    }

    /* mark the lcore is alive */
    rte_keepalive_mark_alive(sc->keepalive);
    return 0;
}

void node_setup_desc_add_for_all_workers(struct node_manager_main * node_mgr_main, const char * node_sym)
{
    for (unsigned int worker_id = 0; worker_id < node_mgr_main->nr_workers; worker_id++)
    {
        struct node_setup_desc * desc = ZMALLOC(sizeof(struct node_setup_desc));
        MR_VERIFY_MALLOC(desc);

        snprintf(desc->node_symbol, sizeof(desc->node_symbol) - 1, "%s", node_sym);
        TAILQ_INSERT_TAIL(&node_mgr_main->node_setup_descs[worker_id], desc, entries);
        node_mgr_main->nr_node_setup_descs[worker_id]++;
    }

    return;
}

static const struct rte_mbuf_dynfield node_mbuf_shared_desc = {
    .name = "node_mbuf_shared_ctx",
    .size = sizeof(struct node_mbuf_shared_ctx),
    .align = __alignof__(struct node_mbuf_shared_ctx),
};

static_assert(sizeof(struct node_mbuf_shared_ctx) <= 64, "the node ctx for rte_mbuf should be packed.");
int node_mbuf_shared_ctx_offset = 0;

extern int node_manager_dev_init(struct sc_main * sc, struct node_manager_main * node_mgr_main);
extern int node_manager_health_check_init(struct node_manager_main * node_mgr_main);
extern int node_eth_ingress_init(struct node_manager_main * node_mgr_main);
extern int node_eth_egress_init(struct node_manager_main * node_mgr_main);

int node_manager_deinit(struct node_manager_main * ctx)
{
    return 0;
}

int node_manager_init(struct sc_main * sc)
{
    struct node_manager_main * node_mgr_main = ZMALLOC(sizeof(struct node_manager_main));
    MR_VERIFY_MALLOC(node_mgr_main);

    node_mgr_main->nr_workers = sc->nr_io_thread;
    for (unsigned int i = 0; i < RTE_DIM(node_mgr_main->node_setup_descs); i++)
    {
        TAILQ_INIT(&node_mgr_main->node_setup_descs[i]);
    }

    /* shared ctx for mbuf */
    node_mbuf_shared_ctx_offset = rte_mbuf_dynfield_register(&node_mbuf_shared_desc);
    if (unlikely(node_mbuf_shared_ctx_offset < 0))
    {
        MR_ERROR("Failed at reserve space for node_mbuf_ctx");
        goto errout;
    }

    if (unlikely(node_manager_dev_init(sc, node_mgr_main) < 0))
    {
        MR_ERROR("Failed at setup rx/tx nodes for devices.");
        goto errout;
    }

    if (unlikely(node_eth_ingress_init(node_mgr_main) < 0))
    {
        MR_ERROR("Failed at init node_eth_ingress.");
        goto errout;
    }

    if (unlikely(node_eth_egress_init(node_mgr_main) < 0))
    {
        MR_ERROR("Failed at init node_eth_egress.");
        goto errout;
    }

    if (unlikely(node_manager_health_check_init(node_mgr_main) < 0))
    {
        MR_ERROR("Failed at setup health check nodes");
        goto errout;
    }

    if (unlikely(graph_init(sc, node_mgr_main) < 0))
    {
        MR_ERROR("Failed at graph init");
        goto errout;
    }

    sc->node_mgr_main = node_mgr_main;
    return RT_SUCCESS;

errout:
    if (node_mgr_main != NULL)
    {
        node_manager_deinit(node_mgr_main);
    }

    return RT_ERR;
}