diff options
| author | Qiuwen Lu <[email protected]> | 2016-12-27 17:12:52 +0800 |
|---|---|---|
| committer | Qiuwen Lu <[email protected]> | 2016-12-27 17:12:52 +0800 |
| commit | 748cc3443303cf6e80320437d5dff77143871ac9 (patch) | |
| tree | b0c45b1b781848d2426c492cf92da6830aeb439c | |
| parent | 8840c7c7d61d8c1408d83ec9dd03c3dce81fdad8 (diff) | |
修复除零错误问题。ZCPD进程初始化完成后再接受外部应用的连接。
| -rw-r--r-- | core/include/mr_vnode.h | 2 | ||||
| -rw-r--r-- | core/src/vnode.c | 151 | ||||
| -rw-r--r-- | pag/libpag.c | 11 | ||||
| -rw-r--r-- | runtime/include/mr_runtime.h | 2 | ||||
| -rw-r--r-- | runtime/src/event.c | 3 | ||||
| -rw-r--r-- | runtime/src/runtime.c | 13 | ||||
| -rw-r--r-- | service/src/core.c | 9 | ||||
| -rw-r--r-- | stack/src/interface.c | 1 |
8 files changed, 137 insertions, 55 deletions
diff --git a/core/include/mr_vnode.h b/core/include/mr_vnode.h index b6a9b03..3a1870e 100644 --- a/core/include/mr_vnode.h +++ b/core/include/mr_vnode.h @@ -15,7 +15,7 @@ extern "C" { struct tunnel_block { /* should be deleted */ - unsigned int deleted; + volatile unsigned int deleted; /* Prod Queue Count */ unsigned int nr_prodq; /* Cons Queue Count */ diff --git a/core/src/vnode.c b/core/src/vnode.c index 8fa9fca..d7d9c70 100644 --- a/core/src/vnode.c +++ b/core/src/vnode.c @@ -23,6 +23,7 @@ #include <mr_common.h> #include <mr_vnode.h> #include <rte_ring.h> +#include <rte_cycles.h> #ifndef MR_LIBVNODE_OPT_THREAD_SAFE #define MR_LIBVNODE_OPT_THREAD_SAFE 1 @@ -48,12 +49,45 @@ #define __write_unlock(x) #endif - #define VNODE_STAT_UPDATE(desc, queue, item, value) \ do { \ rte_atomic64_add(&desc->stat[queue].item,value); \ } while(0) \ + +/* Port from liburcu 0.9.3, to support lockless vnode */ + +#define CMM_ACCESS_ONCE(x) (*(__volatile__ __typeof__(x) *)&(x)) +#define _CMM_LOAD_SHARED(p) CMM_ACCESS_ONCE(p) + +/* +* Load a data from shared memory, doing a cache flush if required. +*/ +#define CMM_LOAD_SHARED(p) \ + __extension__ \ + ({ \ + rte_rmb(); \ + _CMM_LOAD_SHARED(p); \ + }) + +/* +* Identify a shared store. A cmm_smp_wmc() or cmm_smp_mc() should +* follow the store. +*/ +#define _CMM_STORE_SHARED(x, v) __extension__ ({ CMM_ACCESS_ONCE(x) = (v); }) + +/* +* Store v into x, where x is located in shared memory. Performs the +* required cache flush after writing. Returns v. +*/ +#define CMM_STORE_SHARED(x, v) \ + __extension__ \ + ({ \ + __typeof__(x) _v = _CMM_STORE_SHARED(x, v); \ + rte_wmb(); \ + _v = _v; /* Work around clang "unused result" */ \ + }) + /* Tunnel Description Structure */ struct tunnel_desc { @@ -229,9 +263,10 @@ int tunnel_block_delete(struct tunnel_block * block, struct vnode_ops * ops) // 删除Block。当Block没有被删除过时,进入GC状态,等待GC回收。 int tunnel_block_try_delete(struct tunnel_block * block, struct vnode_ops * ops) { - if (block->deleted == 0) + unsigned int deleted = CMM_LOAD_SHARED(block->deleted); + if (deleted == 0) { - block->deleted = 1; + CMM_STORE_SHARED(block->deleted, 1); return 0; } else @@ -242,7 +277,8 @@ int tunnel_block_try_delete(struct tunnel_block * block, struct vnode_ops * ops) int tunnel_block_try_gc(struct tunnel_block * block, struct vnode_ops * ops) { - if (block->deleted) + unsigned int deleted = CMM_LOAD_SHARED(block->deleted); + if (deleted) return tunnel_block_delete(block, ops); return -1; } @@ -524,6 +560,11 @@ int vnode_cons_increase_unsafe(struct vnode * vnode, struct vnode_ops * ops, return 0; } +void synchronize_dataplane() +{ + rte_delay_ms(10);//assume each function in operation will finished after such time. +} + // 垃圾回收,回收已经没有用的TunnelBlock void vnode_cons_block_gc_unsafe(struct vnode_cons * cons, struct vnode_ops * ops) { @@ -531,7 +572,7 @@ void vnode_cons_block_gc_unsafe(struct vnode_cons * cons, struct vnode_ops * ops int i = 0, ret = 0; for (i = 0; i < RTE_DIM(cons->block_list); i++) { - block = cons->block_list[i]; + block = CMM_LOAD_SHARED(cons->block_list[i]); if (block == NULL) { continue; @@ -543,9 +584,14 @@ void vnode_cons_block_gc_unsafe(struct vnode_cons * cons, struct vnode_ops * ops continue; } +#if 0 cons->block_list[i] = NULL; cons->nr_block--; rte_mb(); +#endif + CMM_STORE_SHARED(cons->block_list[i], NULL); + cons->nr_block--; + } return; } @@ -554,22 +600,31 @@ void vnode_cons_block_gc_unsafe(struct vnode_cons * cons, struct vnode_ops * ops void vnode_prod_block_gc_unsafe(struct vnode_prod * prod, struct vnode_ops * ops) { struct tunnel_block* block = NULL; - int i = 0, ret = 0; + int i = 0; for (i = 0; i < RTE_DIM(prod->block_list); i++) { - block = prod->block_list[i]; + block = CMM_LOAD_SHARED(prod->block_list[i]); if (block == NULL) { continue; } - ret = tunnel_block_try_gc(block, ops); - if (ret < 0) + + unsigned int deleted = CMM_LOAD_SHARED(block->deleted); + if (deleted) { - continue; + rte_wmb(); + CMM_STORE_SHARED(prod->block_list[i], NULL); + synchronize_dataplane(); + tunnel_block_delete(block, ops); + prod->nr_block--; } + +#if 0 prod->block_list[i] = NULL; prod->nr_block--; rte_mb(); +#endif + } return; @@ -735,11 +790,6 @@ struct vnode_cons_stat * vnode_cons_stat_get(struct vnode_cons * cons) return cons->stat; } -void synchronize_dataplane() -{ - usleep(10);//assume each function in operation will finished after such time. -} - int vnode_delete_prod(struct vnode_prod * prod, struct vnode_ops * ops) { assert(prod != NULL && prod->vnode != NULL && ops != NULL); @@ -753,28 +803,33 @@ int vnode_delete_prod(struct vnode_prod * prod, struct vnode_ops * ops) // 删除引用的Block for (i = 0; i < RTE_DIM(prod->block_list); i++) { - block = prod->block_list[i]; + block = CMM_LOAD_SHARED(prod->block_list[i]); if (block == NULL) { continue; } + assert(block->deleted == 0); tunnel_block_try_delete(block, ops); - prod->block_list[i] = NULL; + rte_wmb(); + CMM_STORE_SHARED(prod->block_list[i], NULL); } + TAILQ_FOREACH(cons_iter, &vnode->prod_list, next) { - cons_iter->max_idx=renew_max_id(cons_iter->block_list, RTE_DIM(cons_iter->block_list)); + unsigned int max_idx = renew_max_id(cons_iter->block_list, RTE_DIM(cons_iter->block_list)); + rte_wmb(); + CMM_STORE_SHARED(cons_iter->max_idx, max_idx); } - //remove Producer from VNode - struct vnode_prod_list * prod_list_head = &prod->vnode->prod_list; - TAILQ_REMOVE(prod_list_head, prod, next); - prod->vnode->nr_prod--; rte_mb(); synchronize_dataplane(); + //remove Producer from VNode + struct vnode_prod_list * prod_list_head = &prod->vnode->prod_list; + TAILQ_REMOVE(prod_list_head, prod, next); + prod->vnode->nr_prod--; TAILQ_FOREACH(cons_iter, &vnode->cons_list, next) { @@ -797,17 +852,24 @@ int vnode_delete_cons(struct vnode_cons * cons, struct vnode_ops * ops) // 删除引用的Block for (int i = 0; i < RTE_DIM(cons->block_list); i++) { - block = cons->block_list[i]; + block = CMM_LOAD_SHARED(cons->block_list[i]); if (block == NULL) { continue; } + tunnel_block_try_delete(block, ops); + + rte_wmb(); + CMM_STORE_SHARED(cons->block_list[i], NULL); } TAILQ_FOREACH(prod_iter, &vnode->prod_list, next) { - prod_iter->max_idx=renew_max_id(prod_iter->block_list, RTE_DIM(prod_iter->block_list)); + unsigned int max_idx = renew_max_id(prod_iter->block_list, RTE_DIM(prod_iter->block_list)); + + rte_wmb(); + CMM_STORE_SHARED(prod_iter->max_idx, max_idx); } rte_mb(); @@ -860,19 +922,33 @@ int vnode_enqueue_burst_with_hash(struct vnode_prod * prod, struct vnode_ops * o assert(nr_objects <= MR_LIBVNODE_MAX_SZ_BURST); struct tunnel_block* block = NULL, *last_block = NULL; + // get the last block, this block are enqueued with the original objects. - last_block = prod->block_list[prod->max_idx]; - // last_block is null when there is not registerd consumer. - if (unlikely(last_block == NULL || last_block->deleted == 1)) + unsigned int max_idx = CMM_LOAD_SHARED(prod->max_idx); + last_block = CMM_LOAD_SHARED(prod->block_list[max_idx]); + + if (unlikely(last_block == NULL)) + { + ret = -1; goto out; + } + + unsigned int last_block_delete = CMM_LOAD_SHARED(last_block->deleted); + if(unlikely(last_block_delete)) { ret = -1; goto out; } // check all the blocks expect last block, and dup objects and enqueue them. - for (int block_id = 0; block_id < prod->max_idx; block_id++) + for (int block_id = 0; block_id < max_idx; block_id++) { - block = prod->block_list[block_id]; - if (block == NULL || block->deleted == 1) + block = CMM_LOAD_SHARED(prod->block_list[block_id]); + if (block == NULL) + { + continue; + } + + unsigned int deleted = CMM_LOAD_SHARED(block->deleted); + if(deleted) { continue; } @@ -913,15 +989,22 @@ int vnode_dequeue_burst(struct vnode_cons * cons, struct vnode_ops * ops, int nr_used = 0; int nr_left = nr_max_objects; int nr_dequeue = 0; - int i = 0; struct tunnel_block* block = NULL; - for (i = 0; i < cons->max_idx + 1; i++) + for (int block_id = 0; block_id < cons->max_idx + 1; block_id++) { - block = cons->block_list[i]; - if (block == NULL || block->deleted == 1) + block = CMM_LOAD_SHARED(cons->block_list[block_id]); + if (block == NULL) { continue; } + + unsigned int deleted = CMM_LOAD_SHARED(block->deleted); + if (deleted) + { + continue; + } + + assert(block != NULL && block->deleted == 0); nr_dequeue = tunnel_block_dequeue(block, ops, consq, &objects[nr_used], nr_left); nr_used += nr_dequeue; diff --git a/pag/libpag.c b/pag/libpag.c index 8031b43..05ed5b4 100644 --- a/pag/libpag.c +++ b/pag/libpag.c @@ -254,11 +254,6 @@ static int pag_config(struct pag_instance * instance) return 0;
}
-static void __on_exit_pag_close(void * arg)
-{
- pag_close();
-}
-
int pag_open()
{
if (__pag_inited != 0) return 0;
@@ -280,9 +275,9 @@ int pag_open() return -3;
}
- if (instance->autoexit)
- marsio_on_exit_register(__on_exit_pag_close, NULL);
-
+ if (ret < 0)
+ exit(EXIT_FAILURE);
+
__pag_inited = 1;
return 0;
}
diff --git a/runtime/include/mr_runtime.h b/runtime/include/mr_runtime.h index 7704e1a..3c65cf3 100644 --- a/runtime/include/mr_runtime.h +++ b/runtime/include/mr_runtime.h @@ -36,6 +36,8 @@ int mr_id_manager_release_gsid(thread_id_t start_gsid, int nr_thread); void mr_runtime_init(); void mr_runtime_slave_init(); +void mr_runtime_start(); + struct mr_global_ctx * mr_global_ctx_get(); struct mr_global_config * mr_global_config_get(); diff --git a/runtime/src/event.c b/runtime/src/event.c index f344431..4f7be0f 100644 --- a/runtime/src/event.c +++ b/runtime/src/event.c @@ -114,8 +114,9 @@ void * app_crash_thread_monitor_service(void * arg) exit(EXIT_FAILURE); } - assert(0); MR_LOG(ERR, BASE, "Crash monitor thread is exited. This is an error. \n"); + abort(); + return (void *)NULL; } diff --git a/runtime/src/runtime.c b/runtime/src/runtime.c index ac40178..3a4afcb 100644 --- a/runtime/src/runtime.c +++ b/runtime/src/runtime.c @@ -47,13 +47,18 @@ void mr_runtime_init() rt_ctx->ev_ctx = app_ev_manager_create(rt_ctx); MR_CHECK(rt_ctx->ev_ctx != NULL, "RuntimeInit, App event initialize failed. "); + g_ctx->ctx_runtime = rt_ctx; + return; +} + +void mr_runtime_start() +{ + struct mr_global_ctx * g_ctx = mr_global_ctx_get(); + struct mr_runtime_ctx * rt_ctx = g_ctx->ctx_runtime; + pthread_t pid_crash_monitor; pthread_create(&pid_crash_monitor, NULL, app_thread_crash_monitor, (void *)rt_ctx->ev_ctx); - app_crash_cb_register(rt_ctx->ev_ctx, app_mamager_crash_event_handler, rt_ctx->app_ctx); - - g_ctx->ctx_runtime = rt_ctx; - return; } void mr_runtime_slave_init() diff --git a/service/src/core.c b/service/src/core.c index 139ef82..f77c29e 100644 --- a/service/src/core.c +++ b/service/src/core.c @@ -223,11 +223,6 @@ void sc_instance_ctx_init(struct sc_instance * instance) return; } -void sc_stage_init(struct sc_instance * instance) -{ - return; -} - extern int sc_monit_loop(struct sc_instance * sc_instance); int sc_keepalive_loop(struct sc_instance * sc_instance) @@ -271,8 +266,11 @@ void sc_launch_thread(struct sc_instance * instance) for(int i = 0; i < nr_thread; i++) pthread_create(&pids[i], NULL, sc_runtime_thread, instance); + + mr_runtime_start(); for (int i = 0; i < nr_thread; i++) pthread_join(pids[i], NULL); + return; } @@ -298,7 +296,6 @@ int main(int argc, char * argv[]) MR_LOG(INFO, SERVICE, "Starting IIE MARSIO I/O Service...\n"); sc_stage_config(instance); sc_stage_register(instance); - sc_stage_init(instance); sc_launch_thread(instance); assert(0); return 0; diff --git a/stack/src/interface.c b/stack/src/interface.c index d0a2334..5f53ab1 100644 --- a/stack/src/interface.c +++ b/stack/src/interface.c @@ -15,7 +15,6 @@ void sk_default_instance_set(struct sk_instance * instance, sk_default_app_instance = app_instance; } - int marsio_socket(int family, int type, int protocol) { if (unlikely(family != AF_INET && family != AF_INET6)) |
