summary refs log tree commit diff
diff options
context:
space:
mode:
author Qiuwen Lu <[email protected]> 2016-12-27 17:12:52 +0800
committer Qiuwen Lu <[email protected]> 2016-12-27 17:12:52 +0800
commit 748cc3443303cf6e80320437d5dff77143871ac9 (patch)
tree b0c45b1b781848d2426c492cf92da6830aeb439c
parent 8840c7c7d61d8c1408d83ec9dd03c3dce81fdad8 (diff)
修复除零错误问题。ZCPD进程初始化完成后再接受外部应用的连接。
-rw-r--r-- core/include/mr_vnode.h 2
-rw-r--r-- core/src/vnode.c 151
-rw-r--r-- pag/libpag.c 11
-rw-r--r-- runtime/include/mr_runtime.h 2
-rw-r--r-- runtime/src/event.c 3
-rw-r--r-- runtime/src/runtime.c 13
-rw-r--r-- service/src/core.c 9
-rw-r--r-- stack/src/interface.c 1
8 files changed, 137 insertions, 55 deletions
diff --git a/core/include/mr_vnode.h b/core/include/mr_vnode.h
index b6a9b03..3a1870e 100644
--- a/core/include/mr_vnode.h
+++ b/core/include/mr_vnode.h
@@ -15,7 +15,7 @@ extern "C" {
struct tunnel_block
{
/* should be deleted */
- unsigned int deleted;
+ volatile unsigned int deleted;
/* Prod Queue Count */
unsigned int nr_prodq;
/* Cons Queue Count */
diff --git a/core/src/vnode.c b/core/src/vnode.c
index 8fa9fca..d7d9c70 100644
--- a/core/src/vnode.c
+++ b/core/src/vnode.c
@@ -23,6 +23,7 @@
#include <mr_common.h>
#include <mr_vnode.h>
#include <rte_ring.h>
+#include <rte_cycles.h>
#ifndef MR_LIBVNODE_OPT_THREAD_SAFE
#define MR_LIBVNODE_OPT_THREAD_SAFE 1
@@ -48,12 +49,45 @@
#define __write_unlock(x)
#endif
-
#define VNODE_STAT_UPDATE(desc, queue, item, value) \
do { \
rte_atomic64_add(&desc->stat[queue].item,value); \
} while(0) \
+
+/* Port from liburcu 0.9.3, to support lockless vnode */
+
+#define CMM_ACCESS_ONCE(x) (*(__volatile__ __typeof__(x) *)&(x))
+#define _CMM_LOAD_SHARED(p) CMM_ACCESS_ONCE(p)
+
+/*
+* Load a data from shared memory, doing a cache flush if required.
+*/
+#define CMM_LOAD_SHARED(p) \
+ __extension__ \
+ ({ \
+ rte_rmb(); \
+ _CMM_LOAD_SHARED(p); \
+ })
+
+/*
+* Identify a shared store. A cmm_smp_wmc() or cmm_smp_mc() should
+* follow the store.
+*/
+#define _CMM_STORE_SHARED(x, v) __extension__ ({ CMM_ACCESS_ONCE(x) = (v); })
+
+/*
+* Store v into x, where x is located in shared memory. Performs the
+* required cache flush after writing. Returns v.
+*/
+#define CMM_STORE_SHARED(x, v) \
+ __extension__ \
+ ({ \
+ __typeof__(x) _v = _CMM_STORE_SHARED(x, v); \
+ rte_wmb(); \
+ _v = _v; /* Work around clang "unused result" */ \
+ })
+
/* Tunnel Description Structure */
struct tunnel_desc
{
@@ -229,9 +263,10 @@ int tunnel_block_delete(struct tunnel_block * block, struct vnode_ops * ops)
// 删除Block。当Block没有被删除过时,进入GC状态,等待GC回收。
int tunnel_block_try_delete(struct tunnel_block * block, struct vnode_ops * ops)
{
- if (block->deleted == 0)
+ unsigned int deleted = CMM_LOAD_SHARED(block->deleted);
+ if (deleted == 0)
{
- block->deleted = 1;
+ CMM_STORE_SHARED(block->deleted, 1);
return 0;
}
else
@@ -242,7 +277,8 @@ int tunnel_block_try_delete(struct tunnel_block * block, struct vnode_ops * ops)
int tunnel_block_try_gc(struct tunnel_block * block, struct vnode_ops * ops)
{
- if (block->deleted)
+ unsigned int deleted = CMM_LOAD_SHARED(block->deleted);
+ if (deleted)
return tunnel_block_delete(block, ops);
return -1;
}
@@ -524,6 +560,11 @@ int vnode_cons_increase_unsafe(struct vnode * vnode, struct vnode_ops * ops,
return 0;
}
+void synchronize_dataplane()
+{
+ rte_delay_ms(10);//assume each function in operation will finished after such time.
+}
+
// 垃圾回收,回收已经没有用的TunnelBlock
void vnode_cons_block_gc_unsafe(struct vnode_cons * cons, struct vnode_ops * ops)
{
@@ -531,7 +572,7 @@ void vnode_cons_block_gc_unsafe(struct vnode_cons * cons, struct vnode_ops * ops
int i = 0, ret = 0;
for (i = 0; i < RTE_DIM(cons->block_list); i++)
{
- block = cons->block_list[i];
+ block = CMM_LOAD_SHARED(cons->block_list[i]);
if (block == NULL)
{
continue;
@@ -543,9 +584,14 @@ void vnode_cons_block_gc_unsafe(struct vnode_cons * cons, struct vnode_ops * ops
continue;
}
+#if 0
cons->block_list[i] = NULL;
cons->nr_block--;
rte_mb();
+#endif
+ CMM_STORE_SHARED(cons->block_list[i], NULL);
+ cons->nr_block--;
+
}
return;
}
@@ -554,22 +600,31 @@ void vnode_cons_block_gc_unsafe(struct vnode_cons * cons, struct vnode_ops * ops
void vnode_prod_block_gc_unsafe(struct vnode_prod * prod, struct vnode_ops * ops)
{
struct tunnel_block* block = NULL;
- int i = 0, ret = 0;
+ int i = 0;
for (i = 0; i < RTE_DIM(prod->block_list); i++)
{
- block = prod->block_list[i];
+ block = CMM_LOAD_SHARED(prod->block_list[i]);
if (block == NULL)
{
continue;
}
- ret = tunnel_block_try_gc(block, ops);
- if (ret < 0)
+
+ unsigned int deleted = CMM_LOAD_SHARED(block->deleted);
+ if (deleted)
{
- continue;
+ rte_wmb();
+ CMM_STORE_SHARED(prod->block_list[i], NULL);
+ synchronize_dataplane();
+ tunnel_block_delete(block, ops);
+ prod->nr_block--;
}
+
+#if 0
prod->block_list[i] = NULL;
prod->nr_block--;
rte_mb();
+#endif
+
}
return;
@@ -735,11 +790,6 @@ struct vnode_cons_stat * vnode_cons_stat_get(struct vnode_cons * cons)
return cons->stat;
}
-void synchronize_dataplane()
-{
- usleep(10);//assume each function in operation will finished after such time.
-}
-
int vnode_delete_prod(struct vnode_prod * prod, struct vnode_ops * ops)
{
assert(prod != NULL && prod->vnode != NULL && ops != NULL);
@@ -753,28 +803,33 @@ int vnode_delete_prod(struct vnode_prod * prod, struct vnode_ops * ops)
// 删除引用的Block
for (i = 0; i < RTE_DIM(prod->block_list); i++)
{
- block = prod->block_list[i];
+ block = CMM_LOAD_SHARED(prod->block_list[i]);
if (block == NULL)
{
continue;
}
+
assert(block->deleted == 0);
tunnel_block_try_delete(block, ops);
- prod->block_list[i] = NULL;
+ rte_wmb();
+ CMM_STORE_SHARED(prod->block_list[i], NULL);
}
+
TAILQ_FOREACH(cons_iter, &vnode->prod_list, next)
{
- cons_iter->max_idx=renew_max_id(cons_iter->block_list, RTE_DIM(cons_iter->block_list));
+ unsigned int max_idx = renew_max_id(cons_iter->block_list, RTE_DIM(cons_iter->block_list));
+ rte_wmb();
+ CMM_STORE_SHARED(cons_iter->max_idx, max_idx);
}
- //remove Producer from VNode
- struct vnode_prod_list * prod_list_head = &prod->vnode->prod_list;
- TAILQ_REMOVE(prod_list_head, prod, next);
- prod->vnode->nr_prod--;
rte_mb();
synchronize_dataplane();
+ //remove Producer from VNode
+ struct vnode_prod_list * prod_list_head = &prod->vnode->prod_list;
+ TAILQ_REMOVE(prod_list_head, prod, next);
+ prod->vnode->nr_prod--;
TAILQ_FOREACH(cons_iter, &vnode->cons_list, next)
{
@@ -797,17 +852,24 @@ int vnode_delete_cons(struct vnode_cons * cons, struct vnode_ops * ops)
// 删除引用的Block
for (int i = 0; i < RTE_DIM(cons->block_list); i++)
{
- block = cons->block_list[i];
+ block = CMM_LOAD_SHARED(cons->block_list[i]);
if (block == NULL)
{
continue;
}
+
tunnel_block_try_delete(block, ops);
+
+ rte_wmb();
+ CMM_STORE_SHARED(cons->block_list[i], NULL);
}
TAILQ_FOREACH(prod_iter, &vnode->prod_list, next)
{
- prod_iter->max_idx=renew_max_id(prod_iter->block_list, RTE_DIM(prod_iter->block_list));
+ unsigned int max_idx = renew_max_id(prod_iter->block_list, RTE_DIM(prod_iter->block_list));
+
+ rte_wmb();
+ CMM_STORE_SHARED(prod_iter->max_idx, max_idx);
}
rte_mb();
@@ -860,19 +922,33 @@ int vnode_enqueue_burst_with_hash(struct vnode_prod * prod, struct vnode_ops * o
assert(nr_objects <= MR_LIBVNODE_MAX_SZ_BURST);
struct tunnel_block* block = NULL, *last_block = NULL;
+
// get the last block, this block are enqueued with the original objects.
- last_block = prod->block_list[prod->max_idx];
- // last_block is null when there is not registerd consumer.
- if (unlikely(last_block == NULL || last_block->deleted == 1))
+ unsigned int max_idx = CMM_LOAD_SHARED(prod->max_idx);
+ last_block = CMM_LOAD_SHARED(prod->block_list[max_idx]);
+
+ if (unlikely(last_block == NULL))
+ {
+ ret = -1; goto out;
+ }
+
+ unsigned int last_block_delete = CMM_LOAD_SHARED(last_block->deleted);
+ if(unlikely(last_block_delete))
{
ret = -1; goto out;
}
// check all the blocks expect last block, and dup objects and enqueue them.
- for (int block_id = 0; block_id < prod->max_idx; block_id++)
+ for (int block_id = 0; block_id < max_idx; block_id++)
{
- block = prod->block_list[block_id];
- if (block == NULL || block->deleted == 1)
+ block = CMM_LOAD_SHARED(prod->block_list[block_id]);
+ if (block == NULL)
+ {
+ continue;
+ }
+
+ unsigned int deleted = CMM_LOAD_SHARED(block->deleted);
+ if(deleted)
{
continue;
}
@@ -913,15 +989,22 @@ int vnode_dequeue_burst(struct vnode_cons * cons, struct vnode_ops * ops,
int nr_used = 0;
int nr_left = nr_max_objects;
int nr_dequeue = 0;
- int i = 0;
struct tunnel_block* block = NULL;
- for (i = 0; i < cons->max_idx + 1; i++)
+ for (int block_id = 0; block_id < cons->max_idx + 1; block_id++)
{
- block = cons->block_list[i];
- if (block == NULL || block->deleted == 1)
+ block = CMM_LOAD_SHARED(cons->block_list[block_id]);
+ if (block == NULL)
{
continue;
}
+
+ unsigned int deleted = CMM_LOAD_SHARED(block->deleted);
+ if (deleted)
+ {
+ continue;
+ }
+
+ assert(block != NULL && block->deleted == 0);
nr_dequeue = tunnel_block_dequeue(block,
ops, consq, &objects[nr_used], nr_left);
nr_used += nr_dequeue;
diff --git a/pag/libpag.c b/pag/libpag.c
index 8031b43..05ed5b4 100644
--- a/pag/libpag.c
+++ b/pag/libpag.c
@@ -254,11 +254,6 @@ static int pag_config(struct pag_instance * instance)
return 0;
}
-static void __on_exit_pag_close(void * arg)
-{
- pag_close();
-}
-
int pag_open()
{
if (__pag_inited != 0) return 0;
@@ -280,9 +275,9 @@ int pag_open()
return -3;
}
- if (instance->autoexit)
- marsio_on_exit_register(__on_exit_pag_close, NULL);
-
+ if (ret < 0)
+ exit(EXIT_FAILURE);
+
__pag_inited = 1;
return 0;
}
diff --git a/runtime/include/mr_runtime.h b/runtime/include/mr_runtime.h
index 7704e1a..3c65cf3 100644
--- a/runtime/include/mr_runtime.h
+++ b/runtime/include/mr_runtime.h
@@ -36,6 +36,8 @@ int mr_id_manager_release_gsid(thread_id_t start_gsid, int nr_thread);
void mr_runtime_init();
void mr_runtime_slave_init();
+void mr_runtime_start();
+
struct mr_global_ctx * mr_global_ctx_get();
struct mr_global_config * mr_global_config_get();
diff --git a/runtime/src/event.c b/runtime/src/event.c
index f344431..4f7be0f 100644
--- a/runtime/src/event.c
+++ b/runtime/src/event.c
@@ -114,8 +114,9 @@ void * app_crash_thread_monitor_service(void * arg)
exit(EXIT_FAILURE);
}
- assert(0);
MR_LOG(ERR, BASE, "Crash monitor thread is exited. This is an error. \n");
+ abort();
+
return (void *)NULL;
}
diff --git a/runtime/src/runtime.c b/runtime/src/runtime.c
index ac40178..3a4afcb 100644
--- a/runtime/src/runtime.c
+++ b/runtime/src/runtime.c
@@ -47,13 +47,18 @@ void mr_runtime_init()
rt_ctx->ev_ctx = app_ev_manager_create(rt_ctx);
MR_CHECK(rt_ctx->ev_ctx != NULL, "RuntimeInit, App event initialize failed. ");
+ g_ctx->ctx_runtime = rt_ctx;
+ return;
+}
+
+void mr_runtime_start()
+{
+ struct mr_global_ctx * g_ctx = mr_global_ctx_get();
+ struct mr_runtime_ctx * rt_ctx = g_ctx->ctx_runtime;
+
pthread_t pid_crash_monitor;
pthread_create(&pid_crash_monitor, NULL, app_thread_crash_monitor, (void *)rt_ctx->ev_ctx);
-
app_crash_cb_register(rt_ctx->ev_ctx, app_mamager_crash_event_handler, rt_ctx->app_ctx);
-
- g_ctx->ctx_runtime = rt_ctx;
- return;
}
void mr_runtime_slave_init()
diff --git a/service/src/core.c b/service/src/core.c
index 139ef82..f77c29e 100644
--- a/service/src/core.c
+++ b/service/src/core.c
@@ -223,11 +223,6 @@ void sc_instance_ctx_init(struct sc_instance * instance)
return;
}
-void sc_stage_init(struct sc_instance * instance)
-{
- return;
-}
-
extern int sc_monit_loop(struct sc_instance * sc_instance);
int sc_keepalive_loop(struct sc_instance * sc_instance)
@@ -271,8 +266,11 @@ void sc_launch_thread(struct sc_instance * instance)
for(int i = 0; i < nr_thread; i++)
pthread_create(&pids[i], NULL, sc_runtime_thread, instance);
+
+ mr_runtime_start();
for (int i = 0; i < nr_thread; i++)
pthread_join(pids[i], NULL);
+
return;
}
@@ -298,7 +296,6 @@ int main(int argc, char * argv[])
MR_LOG(INFO, SERVICE, "Starting IIE MARSIO I/O Service...\n");
sc_stage_config(instance);
sc_stage_register(instance);
- sc_stage_init(instance);
sc_launch_thread(instance);
assert(0);
return 0;
diff --git a/stack/src/interface.c b/stack/src/interface.c
index d0a2334..5f53ab1 100644
--- a/stack/src/interface.c
+++ b/stack/src/interface.c
@@ -15,7 +15,6 @@ void sk_default_instance_set(struct sk_instance * instance,
sk_default_app_instance = app_instance;
}
-
int marsio_socket(int family, int type, int protocol)
{
if (unlikely(family != AF_INET && family != AF_INET6))