#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <sys/eventfd.h>

#include <rte_branch_prediction.h>
#include <rte_cycles.h>
#include <rte_mbuf.h>
#include <rte_prefetch.h>
#include <rte_ring.h>

#include "dp_trace.h"
#include "vnode_common.h"

static inline unsigned int
dist_tunnel_rt_objects_retrieve(struct tunnel_desc * desc, struct rte_mbuf * rt_objs[],
                                unsigned int nr_max_rt_objs)
{
    assert(desc->sz_rt_buffer_used <= desc->sz_rt_buffer);

    unsigned int nr_rt_objs = desc->sz_rt_buffer_used;
    for (unsigned int i = 0; i < nr_rt_objs; i++) {
        rt_objs[i] = desc->rt_buffer[i];
    }

    desc->sz_rt_buffer_used = 0;
    return nr_rt_objs;
}

static inline bool
dist_tunnel_acquire_credits(struct vnode * vnode_desc, struct tunnel_desc * tunnel_desc,
                            int32_t credits)
{
    int32_t inflight_credits = tunnel_desc->inflight_credits;
    int32_t missing_credits = credits - inflight_credits;
    int32_t total_on_loan;
    int32_t available;
    int32_t acquired_credits;
    int32_t new_total_on_loan;

    if (likely(missing_credits <= 0)) {
        tunnel_desc->inflight_credits -= credits;
        return true;
    }

#define DIST_TUNNEL_DESC_MIN_CREDITS (64)

    total_on_loan = __atomic_load_n(&vnode_desc->credits_on_loan, __ATOMIC_RELAXED);
    available = vnode_desc->max_inflight - total_on_loan;
    /* acquire at least DIST_TUNNEL_DESC_MIN_CREDITS credits */
    acquired_credits = RTE_MAX(missing_credits, DIST_TUNNEL_DESC_MIN_CREDITS);

    if (available < acquired_credits) {
        return false;
    }

    /* This check-then-add is racy: no locks are involved, so another thread
     * can acquire credits between the check above and the addition below.
     * The post-check on new_total_on_loan rolls back any over-allocation. */
    new_total_on_loan = __atomic_fetch_add(&vnode_desc->credits_on_loan, acquired_credits,
                                           __ATOMIC_RELAXED) + acquired_credits;
    if (unlikely(new_total_on_loan > vnode_desc->max_inflight)) {
        __atomic_fetch_sub(&vnode_desc->credits_on_loan, acquired_credits, __ATOMIC_RELAXED);
        return false;
    }

    tunnel_desc->inflight_credits += acquired_credits;
    tunnel_desc->inflight_credits -= credits;
    return true;
}

static inline void
dist_tunnel_return_credits(struct vnode * vnode_desc, struct tunnel_desc * tunnel_desc,
                           int32_t credits)
{
    tunnel_desc->inflight_credits += credits;

#define DIST_TUNNEL_MAX_CREDITS (2 * DIST_TUNNEL_DESC_MIN_CREDITS)

    /* cap the per-tunnel credit cache and give the excess back to the vnode */
    if (unlikely(tunnel_desc->inflight_credits > vnode_desc->max_credits_per_tunnel)) {
        int32_t leave_credits = vnode_desc->max_credits_per_tunnel;
        int32_t return_credits = tunnel_desc->inflight_credits - leave_credits;

        tunnel_desc->inflight_credits = leave_credits;
        __atomic_fetch_sub(&vnode_desc->credits_on_loan, return_credits, __ATOMIC_RELAXED);
    }
}
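/* Example credit flow across the two helpers above, assuming
 * DIST_TUNNEL_DESC_MIN_CREDITS == 64 and max_credits_per_tunnel == 128
 * (illustrative values, not taken from a real configuration):
 *
 *   - A flush of 32 objects on a tunnel with 0 inflight credits is missing
 *     32 credits, so it acquires RTE_MAX(32, 64) == 64 from credits_on_loan
 *     and keeps 64 - 32 = 32 credits cached for the next flush.
 *   - When the consumer dequeues those 32 objects, 32 credits flow back to
 *     the tunnel; anything above max_credits_per_tunnel is returned to
 *     credits_on_loan so other tunnels can make progress.
 */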
static inline void
dist_tunnel_flush(struct vnode_prod * prod, struct vnode_cons * cons, unsigned int prodq,
                  unsigned int consq, struct tunnel_desc * desc)
{
    /* nothing to send */
    if (desc->sz_en_buffer_used == 0) {
        return;
    }

    unsigned int n_to_send = desc->sz_en_buffer_used;
    size_t n_send_len = 0;

    for (unsigned int k = 0; k < desc->sz_en_buffer_used; k++) {
        n_send_len += rte_pktmbuf_data_len(desc->en_buffer[k]);
    }

    for (unsigned int k = 0; k < n_to_send; k++) {
        struct rte_mbuf * mbuf = desc->en_buffer[k];

        /* Demote all of the mbuf's cache lines to release L1D. The packet
         * data is demoted first and the mbuf metadata afterwards, because
         * locating the data requires reading the mbuf's first cache line. */
        rte_cldemote(rte_mbuf_data_addr_default(mbuf));
        rte_cldemote(mbuf);
        rte_cldemote(RTE_PTR_ADD(mbuf, RTE_CACHE_LINE_SIZE));
        rte_cldemote(RTE_PTR_ADD(mbuf, RTE_CACHE_LINE_SIZE * 2));
    }

    unsigned int n_free_space = 0;
    unsigned int n_send = 0;
    bool is_acquire_credit_success =
        dist_tunnel_acquire_credits(prod->vnode, desc, (int32_t)n_to_send);

    /* acquire credits; if that fails, drop the whole burst */
    if (likely(is_acquire_credit_success)) {
        n_send = rte_ring_sp_enqueue_burst(desc->tunnel_object, (void **)desc->en_buffer,
                                           n_to_send, &n_free_space);
    } else {
        n_send = 0;
    }

    unsigned int n_send_missed = desc->sz_en_buffer_used - n_send;

    /* some packets were not delivered */
    if (unlikely(n_send_missed != 0)) {
        for (unsigned int k = n_send; k < desc->sz_en_buffer_used; k++) {
            struct rte_mbuf * object_to_be_free = desc->en_buffer[k];

            n_send_len -= rte_pktmbuf_data_len(object_to_be_free);

            /* move the mbuf to the return buffer */
            assert(desc->sz_rt_buffer_used < desc->sz_rt_buffer);
            desc->rt_buffer[desc->sz_rt_buffer_used++] = desc->en_buffer[k];
        }

        if (is_acquire_credit_success) {
            dist_tunnel_return_credits(prod->vnode, desc, (int32_t)n_send_missed);
        }
    }

    struct vnode_cons_notify * cons_notify_ctx = &cons->notify[consq];
    if (cons_notify_ctx->enable) {
        /* wake up the consumer if it is waiting */
        if (cons_notify_ctx->cons_running_status == CONS_STATUS_WAITING) {
            eventfd_write(cons_notify_ctx->cons_notify_eventfd, 1);
        }
    }

    /* update the counters */
    desc->on_line += desc->sz_en_buffer_used;
    desc->deliver += n_send;
    desc->missed += n_send_missed;
    desc->total_len += n_send_len;

    /* queue length and its exponential moving average */
    desc->q_len = desc->tunnel_size - n_free_space;
    desc->q_len_avg += 0.2F * ((float)desc->q_len - desc->q_len_avg);

    // clear the enqueue buffer
    desc->sz_en_buffer_used = 0;
}

static inline void
dist_tunnel_enqueue(struct vnode_prod * prod, struct vnode_cons * cons, unsigned int prodq,
                    unsigned int consq, struct tunnel_desc * desc, struct rte_mbuf * obj)
{
#if VNODE_CHECK_THREAD_SAFE
    assert(rte_spinlock_trylock(&desc->lock_thread_safe_check));
#endif

    /* append the object at the tail of the enqueue buffer */
    desc->en_buffer[desc->sz_en_buffer_used++] = obj;
    assert(desc->sz_en_buffer_used <= desc->sz_en_buffer);

#if 0
    // append the object at the tail of enqueue buffer.
    unsigned int pos;

    pos = desc->sz_en_buffer_used;
    assert(pos < desc->sz_en_buffer);
    desc->en_buffer[pos++] = obj;

    // the enqueue buffer is not full, return
    if (likely(pos < desc->sz_en_buffer)) {
        desc->sz_en_buffer_used = pos;
        goto out;
    }

    desc->sz_en_buffer_used = desc->sz_en_buffer;
#endif

#if 0
    dist_tunnel_flush(prod, cons, prodq, consq, desc);
#endif

#if VNODE_CHECK_THREAD_SAFE
out:
    rte_spinlock_unlock(&desc->lock_thread_safe_check);
#endif
}

static inline int
dist_tunnel_dequeue(struct vnode * vnode_desc, struct tunnel_desc * tunnel_desc, void * obj,
                    unsigned int nr_max_obj)
{
    unsigned int nr_deq =
        rte_ring_sc_dequeue_burst(tunnel_desc->tunnel_object, obj, nr_max_obj, NULL);

    dist_tunnel_return_credits(vnode_desc, tunnel_desc, (int32_t)nr_deq);
    return (int)nr_deq;
}

// Tunnel block data operations: distribute objects to tunnels according to
// their hash value. Only used by the prod side.
// TODO: rewrite in SSE/SSE2/AVX/AVX2 intrinsics
static inline void
dist_tunnel_block_flush(struct tunnel_block * block, int prodq)
{
    for (unsigned int consq = 0; consq < block->nr_consq; consq++) {
        struct tunnel_desc * tunnel = *tunnel_block_locate(block, prodq, consq);

        dist_tunnel_flush(block->prod, block->cons, prodq, consq, tunnel);
    }
}

static inline void
dist_tunnel_block_enqueue_with_hash(struct tunnel_block * block, unsigned int prodq,
                                    struct rte_mbuf * obj[], uint32_t hash[],
                                    unsigned int nr_obj)
{
    assert(nr_obj <= MR_LIBVNODE_MAX_SZ_BURST);

    for (unsigned int i = 0; i < nr_obj; i++) {
        assert(obj[i] != NULL);

        /* pick the consumer queue from the per-object hash */
        unsigned int consq = hash[i] % block->nr_consq;
        struct tunnel_desc * tunnel = *tunnel_block_locate(block, prodq, consq);

        dist_tunnel_enqueue(block->prod, block->cons, prodq, consq, tunnel, obj[i]);
    }

    dist_tunnel_block_flush(block, prodq);
}

static inline unsigned int
dist_tunnel_block_rt_objects_retrieve(struct tunnel_block * block, unsigned int prodq,
                                      struct rte_mbuf * rt_objs[], unsigned int nr_max_rt_objs)
{
    unsigned int nr_rt_objs = 0;
    unsigned int nr_rt_objs_left = nr_max_rt_objs;

    for (unsigned int consq = 0; consq < block->nr_consq; consq++) {
        struct tunnel_desc * tunnel = *tunnel_block_locate(block, prodq, consq);
        unsigned int nr_rt_objs_recv =
            dist_tunnel_rt_objects_retrieve(tunnel, &rt_objs[nr_rt_objs], nr_rt_objs_left);

        nr_rt_objs += nr_rt_objs_recv;
        nr_rt_objs_left -= nr_rt_objs_recv;
    }

    assert(nr_rt_objs <= nr_max_rt_objs);
    return nr_rt_objs;
}

// Tunnel block dequeue: dequeue from the block. Only used by the cons side.
// TODO: rewrite in SSE/SSE2/AVX/AVX2 intrinsics
static inline unsigned int
dist_tunnel_block_dequeue(struct vnode * vnode_desc, struct tunnel_block * block,
                          unsigned int consq, struct rte_mbuf * obj[], unsigned int nr_max_obj)
{
    unsigned int nr_obj = 0, nr_obj_recv = 0;
    unsigned int nr_obj_left = nr_max_obj;

    for (unsigned int prodq = 0; prodq < block->nr_prodq; prodq++) {
        struct tunnel_desc * tunnel = *tunnel_block_locate(block, prodq, consq);

        nr_obj_recv = dist_tunnel_dequeue(vnode_desc, tunnel, &obj[nr_obj], nr_obj_left);
        nr_obj += nr_obj_recv;
        nr_obj_left -= nr_obj_recv;
    }

    assert(nr_obj <= nr_max_obj);
    return nr_obj;
}

int
vnode_mirror_rt_object_retrieve(struct vnode_prod * prod, unsigned int prodq,
                                struct rte_mbuf * rt_objs[], unsigned int nr_max_rt_objects)
{
    struct tunnel_block * block = ACCESS_ONCE(prod->block);

    if (likely(block != NULL)) {
        return (int)dist_tunnel_block_rt_objects_retrieve(block, prodq, rt_objs,
                                                          nr_max_rt_objects);
    } else {
        return 0;
    }
}

int
vnode_mirror_enqueue_bulk(struct vnode_prod * prod, unsigned int prodq,
                          struct rte_mbuf * objects[], uint32_t hash[], unsigned int nr_objects)
{
    assert(nr_objects <= MR_LIBVNODE_MAX_SZ_BURST);

    struct tunnel_block * block = ACCESS_ONCE(prod->block);

    if (unlikely(block == NULL)) {
        return -1;
    }

    dist_tunnel_block_enqueue_with_hash(block, prodq, objects, hash, nr_objects);
    return 0;
}

int
vnode_mirror_dequeue_burst(struct vnode_cons * cons, unsigned int consq,
                           struct rte_mbuf * objects[], int nr_max_objects)
{
    struct tunnel_block * block = ACCESS_ONCE(cons->block);

    if (likely(block != NULL)) {
        return (int)dist_tunnel_block_dequeue(cons->vnode, block, consq, objects,
                                              nr_max_objects);
    } else {
        return 0;
    }
}

void
vnode_mirror_flush(struct vnode_prod * prod, unsigned int prodq)
{
    struct tunnel_block * block = ACCESS_ONCE(prod->block);

    if (likely(block != NULL)) {
        dist_tunnel_block_flush(block, prodq);
    }
}
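/* Illustrative producer/consumer loops for the API above, kept inactive.
 * The handles, queue ids and burst handling here are assumptions made for
 * the sketch, not part of the exported interface contract. */
#if 0
static void
example_mirror_prod_poll(struct vnode_prod * prod, unsigned int prodq,
                         struct rte_mbuf * objs[], uint32_t hash[], unsigned int nr_objs)
{
    /* enqueue a burst; undeliverable objects come back via the return buffer */
    if (vnode_mirror_enqueue_bulk(prod, prodq, objs, hash, nr_objs) == 0) {
        struct rte_mbuf * rt_objs[MR_LIBVNODE_MAX_SZ_BURST];
        int nr_rt = vnode_mirror_rt_object_retrieve(prod, prodq, rt_objs,
                                                    MR_LIBVNODE_MAX_SZ_BURST);

        for (int i = 0; i < nr_rt; i++) {
            rte_pktmbuf_free(rt_objs[i]);
        }
    }
}

static void
example_mirror_cons_poll(struct vnode_cons * cons, unsigned int consq)
{
    struct rte_mbuf * objs[MR_LIBVNODE_MAX_SZ_BURST];
    int nr_obj = vnode_mirror_dequeue_burst(cons, consq, objs, MR_LIBVNODE_MAX_SZ_BURST);

    for (int i = 0; i < nr_obj; i++) {
        /* mirrored copy: inspect, then release */
        rte_pktmbuf_free(objs[i]);
    }
}
#endif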
struct vnode *
vnode_mirror_create(const char * sym, unsigned int sz_exclusive, unsigned int sz_max_inflight,
                    unsigned int notify_cons_when_rx, unsigned int batch_interval_us)
{
    struct vnode * vnode_common =
        __vnode_common_create(sym, sz_exclusive, sz_max_inflight, notify_cons_when_rx);

    if (vnode_common == NULL) {
        MR_ERROR("Mirror vnode %s create failed.", sym);
        return NULL;
    }

    /* sz_max_inflight == 0 means the inflight count is effectively unlimited */
    if (sz_max_inflight == 0) {
        vnode_common->max_inflight = INT32_MAX;
    } else {
        vnode_common->max_inflight = (int32_t)sz_max_inflight;
    }
    vnode_common->credits_on_loan = 0;

    /* convert the batch interval from microseconds to TSC cycles */
    vnode_common->batch_interval_tsc = batch_interval_us * rte_get_timer_hz() / US_PER_S;

    return vnode_common;
}

int
vnode_mirror_delete(struct vnode * vnode)
{
    return __vnode_common_delete(vnode);
}

__USE_COMMON_VNODE_CREATE_PROD(mirror)
__USE_COMMON_VNODE_CREATE_CONS(mirror)
__USE_COMMON_VNODE_DELETE_PROD(mirror)
__USE_COMMON_VNODE_DELETE_CONS(mirror)
__USE_COMMON_VNODE_CONS_LOOKUP(mirror)
__USE_COMMON_VNODE_PROD_LOOKUP(mirror)
__USE_COMMON_VNODE_PROD_STAT_GET(mirror)
__USE_COMMON_VNODE_CONS_STAT_GET(mirror)
__USE_COMMON_VNODE_PROD_ATTACH(mirror)
__USE_COMMON_VNODE_CONS_ATTACH(mirror)
__USE_COMMON_VNODE_UNPOISON_PROD(mirror)
__USE_COMMON_VNODE_UNPOISON_CONS(mirror)
__USE_COMMON_VNODE_NOTIFY_CTX_CONS(mirror)
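/* Minimal creation sketch, kept inactive. The symbol name and sizing values
 * below are placeholders for illustration, not defaults of this library. */
#if 0
static struct vnode *
example_mirror_vnode_setup(void)
{
    /* placeholder parameters: sz_exclusive = 8, at most 4096 objects in
     * flight, notify consumers on rx, 100 us batch interval */
    struct vnode * vn = vnode_mirror_create("mirror0", 8, 4096, 1, 100);

    if (vn == NULL) {
        return NULL;
    }

    /* ... create/attach prod and cons sides via the generated helpers above,
     * and call vnode_mirror_delete(vn) on teardown ... */
    return vn;
}
#endif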