/*	infra/src/vnode_mirror.c
    Virtual Data Delivery Node - Packet Delivery (mirror vnode)
    Authors	:	Lu Qiuwen <[email protected]>
                Zheng Chao <[email protected]>
    Date	:	2017-01-09
*/

#include <assert.h>
#include <protect.h>
#include <rte_malloc.h>
#include <rte_mbuf.h>
#include <rte_ring.h>
#include <rte_spinlock.h>
#include <rte_version.h>
#include <sys/eventfd.h>
#include <unistd.h>

#include "vnode_common.h"
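
/*  Tunnel flow control: each tunnel ring owns `tunnel_exclusive_size` slots
 *  outright; enqueueing beyond that has to be paid for from the vnode-wide
 *  shared credit pool tracked by `shared_credict_counter`. Credits taken by a
 *  producer are recorded in `shared_credict_used` and returned either when an
 *  enqueue misses (dist_tunnel_flush) or when the consumer drains the
 *  corresponding objects (dist_tunnel_dequeue).
 */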

static inline void dist_tunnel_flush(struct vnode_prod * prod, struct vnode_cons * cons, unsigned int prodq,
                                     unsigned int consq, struct tunnel_desc * desc)
{
    struct vnode_cons_notify * cons_notify_ctx;

    /* nothing to send */
    if (desc->sz_en_buffer_used == 0)
    {
        return;
    }

    unsigned int nr_ring_count = desc->shared_credict_counter != NULL ? rte_ring_count(desc->tunnel_object) : 0;
    unsigned int nr_ring_to_use = nr_ring_count + desc->sz_en_buffer_used;
    unsigned int nr_shared_credict = 0;

    if (nr_ring_to_use > desc->tunnel_exclusive_size)
    {
        /* exclusive slots are not enough: acquire shared credits from the pool
           with a lock-free compare-and-swap loop */
        assert(nr_ring_to_use >= (desc->tunnel_exclusive_size + desc->shared_credict_used));

        nr_shared_credict = nr_ring_to_use - (desc->tunnel_exclusive_size + desc->shared_credict_used);
        while (1)
        {
            uint32_t cur_value = rte_atomic32_read(desc->shared_credict_counter);
            uint32_t new_value = cur_value > nr_shared_credict ? cur_value - nr_shared_credict : 0;

            if (rte_atomic32_cmpset((volatile uint32_t *)desc->shared_credict_counter, cur_value, new_value))
            {
                nr_shared_credict = cur_value > new_value ? cur_value - new_value : 0;
                break;
            }
        }

        desc->shared_credict_used += nr_shared_credict;
        assert(desc->shared_credict_used <= (desc->tunnel_size - desc->tunnel_exclusive_size));
    }

    assert((desc->tunnel_exclusive_size + desc->shared_credict_used) >= nr_ring_count);

    unsigned int n_can_send = (desc->tunnel_exclusive_size + desc->shared_credict_used) - nr_ring_count;
    unsigned int n_to_send = RTE_MIN(desc->sz_en_buffer_used, n_can_send);

    size_t n_send_len = 0;
    for (unsigned int k = 0; k < n_to_send; k++)
    {
        n_send_len += rte_pktmbuf_data_len(desc->en_buffer[k]);
    }

    for (unsigned int k = 0; k < n_to_send; k++)
    {
        struct rte_mbuf * mbuf = desc->en_buffer[k];

        /* demote the mbuf's cache lines so the consumer does not have to snoop
           them out of this core's L1D */
        rte_cldemote(rte_mbuf_data_addr_default(mbuf));

        /* demote the packet data first and the mbuf metadata last: computing the
           data address reads the mbuf's first cache line, so it must stay resident
           until the data has been demoted */
        rte_cldemote(mbuf);
        rte_cldemote(RTE_PTR_ADD(mbuf, RTE_CACHE_LINE_SIZE));
        rte_cldemote(RTE_PTR_ADD(mbuf, RTE_CACHE_LINE_SIZE * 2));
    }

    unsigned int n_send = rte_ring_sp_enqueue_burst(desc->tunnel_object, (void **)desc->en_buffer, n_to_send, NULL);
    unsigned int n_send_missed = desc->sz_en_buffer_used - n_send;

    /* the ring could not accept everything: drop and free the leftover packets */
    if (unlikely(n_send_missed != 0))
    {
        for (unsigned int k = n_send; k < desc->sz_en_buffer_used; k++)
        {
            struct rte_mbuf * object_to_be_free = desc->en_buffer[k];
            n_send_len -= rte_pktmbuf_data_len(object_to_be_free);
            rte_pktmbuf_free(object_to_be_free);
        }

        /* give back the shared credits acquired for packets that were dropped */
        unsigned int nr_shared_credict_to_release = RTE_MIN(n_send_missed, nr_shared_credict);
        if (nr_shared_credict_to_release > 0)
        {
            rte_atomic32_add(desc->shared_credict_counter, (int32_t)nr_shared_credict_to_release);
            desc->shared_credict_used -= nr_shared_credict_to_release;
        }

        assert(desc->shared_credict_used <= (desc->tunnel_size - desc->tunnel_exclusive_size));
    }

    cons_notify_ctx = &cons->notify[consq];
    if (cons_notify_ctx->enable)
    {
        /* wakeup the cons when it is waiting */
        if (cons_notify_ctx->cons_running_status == CONS_STATUS_WAITING)
        {
            eventfd_write(cons_notify_ctx->cons_notify_eventfd, 1);
        }
    }

    // Update producer-side statistics
    VNODE_STAT_UPDATE(prod, prodq, on_line, desc->sz_en_buffer_used);
    VNODE_STAT_UPDATE(prod, prodq, deliver, n_send);
    VNODE_STAT_UPDATE(prod, prodq, missed, n_send_missed);
    VNODE_STAT_UPDATE(prod, prodq, total_len, n_send_len);

    // Update consumer-side statistics
    VNODE_STAT_UPDATE(cons, consq, on_line, desc->sz_en_buffer_used);
    VNODE_STAT_UPDATE(cons, consq, deliver, n_send);
    VNODE_STAT_UPDATE(cons, consq, missed, n_send_missed);
    VNODE_STAT_UPDATE(cons, consq, total_len, n_send_len);

    // Batch size
#if 0
    VNODE_STAT_UPDATE(prod, prodq, batch_size_total, desc->sz_en_buffer_used);
    VNODE_STAT_UPDATE(prod, prodq, batch_size_count, 1);

    // Ring -- Prod
    VNODE_STAT_UPDATE_AVG(prod, prodq, ring_elem_count_avg, nr_ring_count);
    VNODE_STAT_UPDATE_AVG(prod, prodq, ring_shared_credict_avg, nr_shared_credict);
#endif

    // Reset the enqueue buffer
    desc->sz_en_buffer_used = 0;
}
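
/*  Buffer one object in the tunnel's enqueue buffer; the buffer is drained in
 *  bursts by dist_tunnel_flush() so that ring enqueue, cache-line demotion and
 *  consumer wakeup are amortised over a whole batch.
 */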

static inline void dist_tunnel_enqueue(struct vnode_prod * prod, struct vnode_cons * cons, unsigned int prodq,
                                       unsigned int consq, struct tunnel_desc * desc, struct rte_mbuf * obj)
{
#if VNODE_CHECK_THREAD_SAFE
    assert(rte_spinlock_trylock(&desc->lock_thread_safe_check));
#endif

    desc->en_buffer[desc->sz_en_buffer_used++] = obj;
    assert(desc->sz_en_buffer_used <= desc->sz_en_buffer);

#if 0
    // append the object at the tail of enqueue buffer.
    unsigned int pos;
    pos = desc->sz_en_buffer_used;

    assert(pos < desc->sz_en_buffer);
    desc->en_buffer[pos++] = obj;

    // the enqueue buffer is not full, return
    if (likely(pos < desc->sz_en_buffer))
    {
        desc->sz_en_buffer_used = pos;
        goto out;
    }

    desc->sz_en_buffer_used = desc->sz_en_buffer;
#endif

#if 0
    dist_tunnel_flush(prod, cons, prodq, consq, desc);
#endif

#if VNODE_CHECK_THREAD_SAFE

out:
    rte_spinlock_unlock(&desc->lock_thread_safe_check);
#endif

    return;
}
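
/*  Dequeue up to nr_max_obj objects from the tunnel ring and give back to the
 *  shared pool any shared credits that were backing the drained slots.
 */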

static inline int dist_tunnel_dequeue(struct tunnel_desc * desc, void * obj, int nr_max_obj)
{
    unsigned int nr_deq = rte_ring_sc_dequeue_burst(desc->tunnel_object, obj, nr_max_obj, NULL);
    unsigned int shared_credict_to_release = RTE_MIN(nr_deq, desc->shared_credict_used);

    if (shared_credict_to_release > 0)
    {
        rte_atomic32_add(desc->shared_credict_counter, (int32_t)shared_credict_to_release);
        desc->shared_credict_used -= shared_credict_to_release;
    }

    assert(desc->shared_credict_used <= (desc->tunnel_size - desc->tunnel_exclusive_size));
    return (int)nr_deq;
}

// Tunnel block data operations: distribute objects to consumer queues according
// to their hash value. Used only by the producer side.
// TODO: rewrite with SSE/SSE2/AVX/AVX2 intrinsics

static inline void dist_tunnel_block_flush(struct tunnel_block * block, int prodq)
{
    for (unsigned int consq = 0; consq < block->nr_consq; consq++)
    {
        struct tunnel_desc * tunnel = *tunnel_block_locate(block, prodq, consq);
        dist_tunnel_flush(block->prod, block->cons, prodq, consq, tunnel);
    }
}

static inline void dist_tunnel_block_enqueue_with_hash(struct tunnel_block * block, int prodq, struct rte_mbuf * obj[],
                                                       uint32_t hash[], int nr_obj)
{
    assert(nr_obj <= MR_LIBVNODE_MAX_SZ_BURST);
    for (unsigned int i = 0; i < nr_obj; i++)
    {
        assert(obj[i] != NULL);
        unsigned int consq = hash[i] % block->nr_consq;
        struct tunnel_desc * tunnel = *tunnel_block_locate(block, prodq, consq);
        dist_tunnel_enqueue(block->prod, block->cons, prodq, consq, tunnel, obj[i]);
    }

    dist_tunnel_block_flush(block, prodq);
}

#if 0

#define __FWDSTEP 4

static inline void dist_tunnel_block_enqueue_with_hash(struct tunnel_block * block,
    int prodq, struct rte_mbuf * obj[], uint32_t hash[], int nr_obj)
{
    struct tunnel_desc * tunnel;
    unsigned int idx = 0;
    unsigned int consq;

    switch (nr_obj % 4)
    {
    case 0:
        while (idx != nr_obj) {
            consq = hash[idx] % block->nr_consq;
            tunnel = *tunnel_block_locate(block, prodq, consq);
            dist_tunnel_enqueue(block->prod, block->cons, prodq, consq, tunnel, obj[idx]);
            idx++;
    case 3:
        consq = hash[idx] % block->nr_consq;
        tunnel = *tunnel_block_locate(block, prodq, consq);
        dist_tunnel_enqueue(block->prod, block->cons, prodq, consq, tunnel, obj[idx]);
        idx++;
    case 2:
        consq = hash[idx] % block->nr_consq;
        tunnel = *tunnel_block_locate(block, prodq, consq);
        dist_tunnel_enqueue(block->prod, block->cons, prodq, consq, tunnel, obj[idx]);
        idx++;
    case 1:
        consq = hash[idx] % block->nr_consq;
        tunnel = *tunnel_block_locate(block, prodq, consq);
        dist_tunnel_enqueue(block->prod, block->cons, prodq, consq, tunnel, obj[idx]);
        idx++;
        }
    }
}

#endif

// Tunnel block dequeue: drain each producer queue's tunnel for the given consumer
// queue. Used only by the consumer side.
// TODO: rewrite in SSE/SSE2/AVX/AVX2 intrinsics

static inline int dist_tunnel_block_dequeue(struct tunnel_block * block, int consq, struct rte_mbuf * obj[],
                                            int nr_max_obj)
{
    unsigned int nr_obj = 0, nr_obj_recv = 0;
    unsigned int nr_obj_left = nr_max_obj;

    for (int prodq = 0; prodq < block->nr_prodq; prodq++)
    {
        struct tunnel_desc * tunnel = *tunnel_block_locate(block, prodq, consq);
        nr_obj_recv = dist_tunnel_dequeue(tunnel, &obj[nr_obj], nr_obj_left);
        nr_obj += nr_obj_recv;
        nr_obj_left -= nr_obj_recv;
    }

    assert(nr_obj <= nr_max_obj);
    return nr_obj;
}



/*	VNode data-path functions
    (enqueue/dequeue entry points used by producers and consumers)
*/
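
/*  __mirror_clone_objects() has two compile-time variants:
 *  - VNODE_MIRROR_CLONE_BY_REFCNT: take an extra reference on each source mbuf
 *    and hand out the same pointers (no copy, the buffers are shared);
 *  - otherwise: rte_pktmbuf_clone() each source mbuf from clone_pool, producing
 *    indirect mbufs that reference the original packet data.
 */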

#if VNODE_MIRROR_CLONE_BY_REFCNT
static inline int __mirror_clone_objects(struct rte_mempool * clone_pool, struct rte_mbuf * source_objects[],
                                         struct rte_mbuf * cloned_objects[], unsigned int nr_source_objects)
{
    for (int i = 0; i < nr_source_objects; i++)
    {
        PROTECT_rte_mbuf_refcnt_update(source_objects[i], 1);
    }

    rte_memcpy(cloned_objects, source_objects, sizeof(struct rte_mbuf *) * nr_source_objects);
    return 0;
}

#else

static inline int __mirror_clone_objects(struct rte_mempool * clone_pool, struct rte_mbuf * source_objects[],
                                         struct rte_mbuf * cloned_objects[], unsigned int nr_source_objects)
{
    // Flag set iff every clone succeeded; checking it once after the loop avoids a
    // per-packet branch and the pipeline stalls such a branch could cause
    uintptr_t tag_clone_objects = 1;

    for (int i = 0; i < nr_source_objects; i++)
    {
        cloned_objects[i] = rte_pktmbuf_clone(source_objects[i], clone_pool);
        tag_clone_objects = (tag_clone_objects && cloned_objects[i]);
    }

    if (likely(tag_clone_objects))
        return 0;

    /* At least one clone failed: free every packet that was cloned so far */
    for (int i = 0; i < nr_source_objects; i++)
    {
        PROTECT_rte_pktmbuf_free(cloned_objects[i]);
    }

    return -1;
}

#endif

int vnode_mirror_enqueue_bulk(struct vnode_prod * prod, unsigned int prodq, struct rte_mbuf * objects[],
                              uint32_t hash[], int nr_objects)
{
    assert(nr_objects <= MR_LIBVNODE_MAX_SZ_BURST);

    struct tunnel_block * block = ACCESS_ONCE(prod->block);
    if (unlikely(block == NULL))
    {
        goto failure;
    }

    dist_tunnel_block_enqueue_with_hash(block, (int)prodq, objects, hash, nr_objects);
    return 0;

failure:
    for (int i = 0; i < nr_objects; i++)
    {
        rte_pktmbuf_free(objects[i]);
    }

    VNODE_STAT_UPDATE(prod, prodq, on_line, nr_objects);
    VNODE_STAT_UPDATE(prod, prodq, missed, nr_objects);
    return 0;
}

int vnode_mirror_dequeue_burst(struct vnode_cons * cons, unsigned int consq, struct rte_mbuf * objects[],
                               int nr_max_objects)
{
    struct tunnel_block * block = ACCESS_ONCE(cons->block);
    if (likely(block != NULL))
    {
        return dist_tunnel_block_dequeue(block, consq, objects, nr_max_objects);
    }
    else
    {
        return 0;
    }
}

void vnode_mirror_flush(struct vnode_prod * prod, unsigned int prodq)
{
    struct tunnel_block * block = ACCESS_ONCE(prod->block);
    if (likely(block != NULL))
    {
        dist_tunnel_block_flush(block, prodq);
    }
}
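
/*  A minimal producer-side sketch (illustrative only; `prod`, `prodq` and the
 *  receive step below are assumptions, not part of this file):
 *
 *      struct rte_mbuf * burst[MR_LIBVNODE_MAX_SZ_BURST];
 *      uint32_t hash[MR_LIBVNODE_MAX_SZ_BURST];
 *      int n = my_rx_burst(burst, hash);            // hypothetical packet source
 *      if (n > 0)
 *          vnode_mirror_enqueue_bulk(prod, prodq, burst, hash, n);
 *      else
 *          vnode_mirror_flush(prod, prodq);         // idle: push out buffered objects
 *
 *  Consumers drain with vnode_mirror_dequeue_burst(cons, consq, objects, max).
 */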

struct vnode * vnode_mirror_create(const char * sym, unsigned int sz_exclusive, unsigned int sz_shared,
                                   unsigned int sz_buffer, unsigned int notify_cons_when_rx,
                                   unsigned int batch_interval_us)
{
    struct vnode * vnode_common = __vnode_common_create(sym, sz_exclusive, sz_buffer, notify_cons_when_rx);

    if (vnode_common == NULL)
    {
        MR_ERROR("Failed to create mirror vnode %s.", sym);
        return NULL;
    }

    /* convert microseconds to timer cycles: cycles = us * timer_hz / US_PER_S */
    vnode_common->batch_interval_tsc = batch_interval_us * rte_get_timer_hz() / US_PER_S;
    vnode_common->sz_shared = sz_shared;

    rte_atomic32_init(&vnode_common->shared_credict_counter);
    return vnode_common;
}

int vnode_mirror_delete(struct vnode * vnode)
{
    return __vnode_common_delete(vnode);
}

void vnode_mirror_common_unpoison(struct vnode * vnode)
{
    __vnode_common_unpoison(vnode);
}
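
/*  The remaining prod/cons create/delete/lookup/attach/stat/unpoison/notify entry
 *  points are generated from the common templates (presumably provided by
 *  vnode_common.h), instantiated here for the mirror vnode type.
 */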

__USE_COMMON_VNODE_CREATE_PROD(mirror)
__USE_COMMON_VNODE_CREATE_CONS(mirror)
__USE_COMMON_VNODE_DELETE_PROD(mirror)
__USE_COMMON_VNODE_DELETE_CONS(mirror)
__USE_COMMON_VNODE_CONS_LOOKUP(mirror)
__USE_COMMON_VNODE_PROD_LOOKUP(mirror)
__USE_COMMON_VNODE_PROD_STAT_GET(mirror)
__USE_COMMON_VNODE_CONS_STAT_GET(mirror)
__USE_COMMON_VNODE_PROD_ATTACH(mirror)
__USE_COMMON_VNODE_CONS_ATTACH(mirror)
__USE_COMMON_VNODE_UNPOISON_PROD(mirror)
__USE_COMMON_VNODE_UNPOISON_CONS(mirror)
__USE_COMMON_VNODE_NOTIFY_CTX_CONS(mirror)