/* Virtual Data Deliver Node - Base Class Author : Lu Qiuwen Zheng Chao Date : 2016-12-10 TODO: Try to use RCU in thread safe mode. */ #include #include #include #include #include #include #include #include #include #include #include #include #include //usleep #include #include #include #include #include "vnode_common.h" /* What is a tunnel ? * tunnel is a fifo with fixed size. we use rte_ring as tunnel. * * What is a tunnel block ? * A 2d array of tunnels, trans data from a prod to a cons * +-------------+-------------+-------------+------------+------------+ * | nr_prodq | nr_consq | descs[0][0] | desc[0][1] | .......... | * +-------------+-------------+-------------+------------+------------+ * Len = sizeof(nr_prodq) + sizeof(nr_consq) + sizeof(descs) * nr_prodq * nr_consq */ static struct tunnel_desc * tunnel_new(const char * symbol, unsigned int size, unsigned int sz_buffer) { struct tunnel_desc * desc = ZMALLOC(sizeof(struct tunnel_desc)); MR_VERIFY_MALLOC(desc); desc->tunnel_object = rte_ring_create(symbol, size, SOCKET_ID_ANY, RING_F_SC_DEQ | RING_F_SP_ENQ); if (desc->tunnel_object == NULL) { MR_ERROR("Create tunnel %s failed : %s", symbol, MR_STR_ERRNO(errno)); return NULL; } snprintf(desc->symbol, sizeof(desc->symbol), "%s", symbol); desc->tunnel_size = size; // 禁用buffer,就是让buffer为1,写入一个后立即发出 if (sz_buffer == 0) sz_buffer = 1; desc->en_buffer = ZMALLOC(sizeof(void *) * sz_buffer); MR_VERIFY_MALLOC(desc->en_buffer); desc->sz_en_buffer = sz_buffer; #if VNODE_CHECK_THREAD_SAFE rte_spinlock_init(&desc->lock_thread_safe_check); #endif return desc; } /* Delete a tunnel */ static int tunnel_delete(struct tunnel_desc * desc) { for (int i = 0; i < desc->sz_en_buffer_used; i++) rte_pktmbuf_free(desc->en_buffer[i]); struct rte_mbuf * mbuf; while (rte_ring_dequeue(desc->tunnel_object, (void **)&mbuf) == 0) rte_pktmbuf_free(mbuf); MR_VERIFY_2(rte_ring_empty(desc->tunnel_object) == 1, "Tunnel %s is not empty", desc->symbol); rte_free(desc->en_buffer); rte_ring_free(desc->tunnel_object); rte_free(desc); return 0; } /* Delete a block of tunnels */ static int tunnel_block_delete(struct tunnel_block * block) { for (int prodq_id = 0; prodq_id < block->nr_prodq; prodq_id++) { for (int consq_id = 0; consq_id < block->nr_consq; consq_id++) { if (*tunnel_block_locate(block, prodq_id, consq_id) != NULL) tunnel_delete(*tunnel_block_locate(block, prodq_id, consq_id)); } } rte_free(block); return 0; } static void tunnel_unpoison(struct tunnel_desc * desc) { MR_ASAN_UNPOISON_MEMORY_REGION(desc, sizeof(struct tunnel_desc)); MR_ASAN_UNPOISON_MEMORY_REGION(desc->en_buffer, sizeof(void *) * desc->sz_en_buffer); MR_ASAN_UNPOISON_MEMORY_REGION(desc->tunnel_object, sizeof(struct rte_ring)); ssize_t sz_tunnel_object = rte_ring_get_memsize(rte_ring_get_size(desc->tunnel_object)); assert(sz_tunnel_object >= 0); MR_ASAN_UNPOISON_MEMORY_REGION(desc->tunnel_object, (size_t)sz_tunnel_object); } static void tunnel_block_unpoison(struct tunnel_block * block) { /* Tunnel-Block Header */ MR_ASAN_UNPOISON_MEMORY_REGION(block, sizeof(struct tunnel_block)); /* Tunnel-Block Body */ void * __tunnel_block_body = (void *)block + sizeof(struct tunnel_block); size_t __tunnel_block_body_size = sizeof(struct tunnel_desc *) * (block->nr_prodq * block->nr_consq); MR_ASAN_UNPOISON_MEMORY_REGION(__tunnel_block_body, __tunnel_block_body_size); /* Tunnel Objects */ for (unsigned int prodq_id = 0; prodq_id < block->nr_prodq; prodq_id++) { for (unsigned int consq_id = 0; consq_id < block->nr_consq; consq_id++) { struct tunnel_desc * tunnel_desc = *tunnel_block_locate(block, prodq_id, consq_id); tunnel_unpoison(tunnel_desc); } } } /* Alloc a block of tunnels, and init all the tunnels */ static struct tunnel_block * tunnel_block_new(const char * symbol, struct vnode_prod * prod, struct vnode_cons * cons, unsigned int tunnel_size, unsigned int tun_sz_buffer) { unsigned int nr_prodq = prod->nr_prodq; unsigned int nr_consq = cons->nr_consq; unsigned int block_size = sizeof(struct tunnel_block) + sizeof(struct tunnel_desc *) * (nr_prodq * nr_consq); struct tunnel_block * block = (struct tunnel_block *) rte_zmalloc(NULL, block_size, 0); MR_VERIFY_MALLOC(block); block->cons = cons; block->prod = prod; block->nr_consq = nr_consq; block->nr_prodq = nr_prodq; // create tunnel for each prodq and consq in block for (int prodq_id = 0; prodq_id < nr_prodq; prodq_id++) { for (int consq_id = 0; consq_id < nr_consq; consq_id++) { char tunnel_sym[MR_SYMBOL_MAX]; snprintf(tunnel_sym, sizeof(tunnel_sym), "%s-%d-%d", symbol, prodq_id, consq_id); struct tunnel_desc * tdesc = tunnel_new(tunnel_sym, tunnel_size, tun_sz_buffer); if (tdesc == NULL) goto err; *tunnel_block_locate(block, prodq_id, consq_id) = tdesc; } } return block; err: MR_ERROR("Create tunnel block %s failed, tunnel size = %d, tunnel buffer = %d", symbol, tunnel_size, tun_sz_buffer); if (block) tunnel_block_delete(block); return NULL; } int add_block_to_list(struct tunnel_block** list, struct tunnel_block* block, int size, int *max_idx) { int i = 0; for (i = 0; i < size; i++) { if (list[i] == NULL) { list[i] = block;// assume this operation is a transaction break; } } if (i == size) { return -1; } if (i > *max_idx) { assert(*max_idx + 1 == i); (*max_idx)++; } return 0; } int renew_max_id(struct tunnel_block** list, int size) { int i = 0, cnt = 0; for (i = 0; i < size; i++) { if (list[i] == NULL) continue; cnt = i; } return cnt; } static int __do_producer_join_unsafe(struct vnode * vnode, struct vnode_prod * prod) { int nr_prodq = prod->nr_prodq; int nr_consq = 0; struct vnode_cons * cons; struct tunnel_block * block; // create tunnel blocks for each cons, and insert it into cons and prods. TAILQ_FOREACH(cons, &vnode->cons_list, next) { char block_sym[MR_SYMBOL_MAX]; snprintf(block_sym, sizeof(block_sym), "%s-%s-%s", vnode->symbol, prod->symbol, cons->symbol); nr_consq = cons->nr_consq; // create commucation tunnel for each cons and prods block = tunnel_block_new(block_sym, prod, cons, vnode->sz_tunnel, vnode->sz_tunnel_buffer); if (block == NULL) goto error; // insert block into prod block list and cons block list add_block_to_list(prod->block_list, block, RTE_DIM(prod->block_list), &(prod->max_idx)); add_block_to_list(cons->block_list, block, RTE_DIM(cons->block_list), &(cons->max_idx)); prod->nr_block++; cons->nr_block++; MR_DEBUG("Insert block %s(object=%p, nr_prodq=%d, nr_consq=%d) in prod %s(obj=%p) " "and cons %s(obj=%p)", block_sym, block, nr_prodq, nr_consq, prod->symbol, prod, cons->symbol, cons); } // insert prod into vnode's prod_list TAILQ_INSERT_TAIL(&vnode->prod_list, prod, next); vnode->nr_prod++; return 0; error: MR_ERROR("Join vnode %s producer %s failed. ", vnode->symbol, prod->symbol); return -1; } static int __do_consumer_join_unsafe(struct vnode * vnode, struct vnode_cons * cons) { int nr_consq = cons->nr_consq; int nr_prodq = 0; struct vnode_prod * prod; struct tunnel_block * block; // create tunnel blocks for each cons, and insert it into cons and prods. TAILQ_FOREACH(prod, &vnode->prod_list, next) { char block_sym[MR_SYMBOL_MAX]; snprintf(block_sym, sizeof(block_sym), "%s-%s-%s", vnode->symbol, prod->symbol, cons->symbol); nr_prodq = prod->nr_prodq; // create commucation tunnel for each cons and prods block = tunnel_block_new(block_sym, prod, cons, vnode->sz_tunnel, vnode->sz_tunnel_buffer); if (block == NULL) goto error; // insert block into prod block list and cons block list add_block_to_list(prod->block_list, block, RTE_DIM(prod->block_list), &(prod->max_idx)); add_block_to_list(cons->block_list, block, RTE_DIM(cons->block_list), &(cons->max_idx)); prod->nr_block++; cons->nr_block++; MR_DEBUG("Insert block %s(object=%p, nr_prodq=%d, nr_consq=%d) in prod %s(obj=%p)" "and cons %s(obj=%p)", block_sym, block, nr_prodq, nr_consq, prod->symbol, prod, cons->symbol, cons); } // insert prod into vnode's prod_list TAILQ_INSERT_TAIL(&vnode->cons_list, cons, next); vnode->nr_cons++; return 0; error: MR_ERROR("Join vnode %s consumer %s failed. ", vnode->symbol, cons->symbol); return -1; } static void synchronize_dataplane() { rte_delay_ms(100);//assume each function in operation will finished after such time. } /* VNode Structure Operation Functions */ struct vnode * __vnode_common_create(const char * sym, unsigned int sz_tunnel, unsigned int sz_tunnel_buffer) { struct vnode * object = (struct vnode *)rte_zmalloc(NULL, sizeof(struct vnode), 0); MR_VERIFY_MALLOC(object); snprintf(object->symbol, sizeof(object->symbol), "%s", sym); object->nr_cons = 0; object->nr_prod = 0; object->sz_tunnel = sz_tunnel; object->sz_tunnel_buffer = sz_tunnel_buffer; rte_spinlock_init(&object->lock); TAILQ_INIT(&object->cons_list); TAILQ_INIT(&object->prod_list); return object; } /* VNode的生产者、消费者注册 */ struct vnode_prod * __vnode_common_create_prod(struct vnode * vnode, const char * symbol, int nr_prodq) { int ret = 0; struct vnode_prod * prod = ZMALLOC(sizeof(struct vnode_prod)); MR_VERIFY_MALLOC(prod); snprintf(prod->symbol, sizeof(vnode->symbol), "%s", symbol); prod->vnode = vnode; prod->nr_prodq = nr_prodq; prod->nr_block = 0; rte_spinlock_lock(&vnode->lock); ret = __do_producer_join_unsafe(vnode, prod); rte_spinlock_unlock(&vnode->lock); if (ret < 0) goto err; return prod; err: if (prod != NULL) rte_free(prod); return NULL; } struct vnode_cons * __vnode_common_create_cons(struct vnode * vnode, const char * symbol, int nr_consq) { int ret = 0; struct vnode_cons * cons = ZMALLOC(sizeof(struct vnode_cons)); MR_VERIFY_MALLOC(cons); // cons description init snprintf(cons->symbol, sizeof(cons->symbol), "%s", symbol); cons->vnode = vnode; cons->nr_consq = nr_consq; cons->nr_block = 0; rte_spinlock_lock(&vnode->lock); ret = __do_consumer_join_unsafe(vnode, cons); rte_spinlock_unlock(&vnode->lock); if (ret < 0) goto err; return cons; err: if (cons != NULL) rte_free(cons); return NULL; } /// vnode_prod/cons_lookup() functions /// lookup prod/cons description by it's name. struct vnode_prod * __vnode_common_prod_lookup(struct vnode * vnode, const char * sym) { assert(vnode != NULL && sym != NULL); rte_spinlock_lock(&vnode->lock); struct vnode_prod * prod_iter = NULL; struct vnode_prod * ret = NULL; TAILQ_FOREACH(prod_iter, &vnode->prod_list, next) { if (strncmp(sym, prod_iter->symbol, sizeof(prod_iter->symbol)) == 0) { ret = prod_iter; break; } } rte_spinlock_unlock(&vnode->lock); return ret; } struct vnode_cons * __vnode_common_cons_lookup(struct vnode * vnode, const char * sym) { assert(vnode != NULL && sym != NULL); struct vnode_cons * cons_iter = NULL; struct vnode_cons * ret = NULL; rte_spinlock_lock(&vnode->lock); TAILQ_FOREACH(cons_iter, &vnode->cons_list, next) { if (strncmp(sym, cons_iter->symbol, sizeof(cons_iter->symbol)) == 0) { ret = cons_iter; break; } } rte_spinlock_unlock(&vnode->lock); return ret; } int __vnode_common_cons_attach(struct vnode * node, struct vnode_cons * cons) { int ret; assert(node != NULL && cons != NULL && cons->vnode == node); rte_spinlock_lock(&node->lock); if (cons->cur_attach >= cons->nr_consq) { MR_ERROR("Too much attach request(cur_attach=%d, nr_consq=%d)", cons->cur_attach, cons->nr_consq); ret = -1; } else { ret = cons->cur_attach++; } rte_spinlock_unlock(&node->lock); return ret; } int __vnode_common_prod_attach(struct vnode * node, struct vnode_prod* prod) { int ret; assert(node != NULL && prod != NULL && prod->vnode == node); rte_spinlock_lock(&node->lock); if (prod->cur_attach >= prod->nr_prodq) { MR_ERROR("Too much attach request(cur_attach=%d, nr_prodq=%d)", prod->cur_attach, prod->nr_prodq); ret = -1; } else { ret = prod->cur_attach++; } rte_spinlock_unlock(&node->lock); return ret; } struct vnode_prod_stat * __vnode_common_prod_stat_get(struct vnode_prod * prod) { return prod->stat; } struct vnode_cons_stat * __vnode_common_cons_stat_get(struct vnode_cons * cons) { return cons->stat; } /* 集合求交集 */ static void block_list_intersection(struct tunnel_block ** block_list_a, unsigned int sz_block_list_a, unsigned int * result_in_a, struct tunnel_block ** block_list_b, unsigned int sz_block_list_b, unsigned int * result_in_b) { for (int i = 0; i < sz_block_list_a; i++) { for (int j = 0; j < sz_block_list_b; j++) { if (block_list_a[i] != block_list_b[j]) continue; result_in_a[i] = 1; result_in_b[j] = 1; } } return; } int __vnode_common_delete_prod(struct vnode_prod * prod) { struct vnode * vnode = prod->vnode; struct tunnel_block* block = NULL; struct vnode_cons * cons_iter; rte_spinlock_lock(&vnode->lock); /* 消费者中查找需要删除的Block */ TAILQ_FOREACH(cons_iter, &vnode->cons_list, next) { unsigned int tag_cons_block_list[RTE_DIM(prod->block_list)] = { 0 }; unsigned int tag_prod_block_list[RTE_DIM(cons_iter->block_list)] = { 0 }; block_list_intersection(cons_iter->block_list, RTE_DIM(cons_iter->block_list), tag_cons_block_list, prod->block_list, RTE_DIM(prod->block_list), tag_prod_block_list); for (int i = 0; i < RTE_DIM(tag_cons_block_list); i++) { if (tag_cons_block_list[i] == 0) continue; __atomic_store_n(&cons_iter->block_list[i], NULL, __ATOMIC_SEQ_CST); } unsigned int max_idx = renew_max_id(cons_iter->block_list, RTE_DIM(cons_iter->block_list)); cons_iter->max_idx = max_idx; } synchronize_dataplane(); // 删除引用的Block for (int i = 0; i < RTE_DIM(prod->block_list); i++) { block = prod->block_list[i]; if (block == NULL) continue; tunnel_block_delete(block); } // 从VNode中移除生产者描述符 struct vnode_prod_list * prod_list_head = &vnode->prod_list; TAILQ_REMOVE(prod_list_head, prod, next); vnode->nr_prod--; // 释放prod的描述符空间 rte_free(prod); rte_spinlock_unlock(&vnode->lock); return 0; } int __vnode_common_delete_cons(struct vnode_cons * cons) { struct vnode * vnode = cons->vnode; struct tunnel_block* block = NULL; struct vnode_prod * prod_iter; rte_spinlock_lock(&vnode->lock); /* 在生产者中查找需要删除的Block */ TAILQ_FOREACH(prod_iter, &vnode->prod_list, next) { unsigned int tag_cons_block_list[RTE_DIM(cons->block_list)] = { 0 }; unsigned int tag_prod_block_list[RTE_DIM(prod_iter->block_list)] = { 0 }; block_list_intersection(prod_iter->block_list, RTE_DIM(prod_iter->block_list), tag_prod_block_list, cons->block_list, RTE_DIM(cons->block_list), tag_cons_block_list); for(int i = 0; i < RTE_DIM(tag_prod_block_list); i++) { if (tag_prod_block_list[i] == 0) continue; __atomic_store_n(&prod_iter->block_list[i], NULL, __ATOMIC_SEQ_CST); } unsigned int max_idx = renew_max_id(prod_iter->block_list, RTE_DIM(prod_iter->block_list)); prod_iter->max_idx = max_idx; } synchronize_dataplane(); // 删除引用的Block for (int i = 0; i < RTE_DIM(cons->block_list); i++) { block = cons->block_list[i]; if (block == NULL) continue; tunnel_block_delete(block); } // 从VNode中移除消费者描述符 struct vnode_cons_list * cons_list_head = &vnode->cons_list; TAILQ_REMOVE(cons_list_head, cons, next); vnode->nr_cons--; // 释放cons描述符空间 rte_free(cons); rte_spinlock_unlock(&vnode->lock); return 0; } int __vnode_common_delete(struct vnode * vnode) { // delete all the cons struct vnode_cons * cons_iter; TAILQ_FOREACH(cons_iter, &vnode->cons_list, next) { __vnode_common_delete_cons(cons_iter); } // delete all the prods struct vnode_prod * prod_iter; TAILQ_FOREACH(prod_iter, &vnode->prod_list, next) { __vnode_common_delete_prod(prod_iter); } if (vnode != NULL) rte_free(vnode); return 0; } void __vnode_common_unpoison(struct vnode * vnode) { MR_ASAN_UNPOISON_MEMORY_REGION(vnode, sizeof(struct vnode)); } void __vnode_common_unpoison_prod(struct vnode_prod * prod) { MR_ASAN_UNPOISON_MEMORY_REGION(prod, sizeof(struct vnode_prod)); __vnode_common_unpoison(prod->vnode); for(unsigned int i = 0; i < prod->nr_block; i++) tunnel_block_unpoison(prod->block_list[i]); } void __vnode_common_unpoison_cons(struct vnode_cons * cons) { MR_ASAN_UNPOISON_MEMORY_REGION(cons, sizeof(struct vnode_cons)); __vnode_common_unpoison(cons->vnode); for(unsigned int i = 0; i < cons->nr_block; i++) tunnel_block_unpoison(cons->block_list[i]); }