summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorQiuwen Lu <[email protected]>2016-12-13 15:05:00 +0800
committerQiuwen Lu <[email protected]>2016-12-13 15:05:00 +0800
commit797b8fcb516d64edabbe11033a352f2df1c09457 (patch)
tree19e114e4b5fb33f91f87b1101a61d72b86f4a69b
parent0bd00dfe4d5f01ebda9ebba8a1d889db40a05ce3 (diff)
parent43cc4a9ddebc27c9c4e584fcc75424a890bf00e5 (diff)
Merge branch 'rel-4.X.X' of 10.0.6.226:mesa_platform/marsio into rel-4.X.Xv4.0.9-20161213
-rw-r--r--app/src/marsio.c12
-rw-r--r--core/include/mr_core.h4
-rw-r--r--core/include/mr_rtdev.h9
-rw-r--r--core/include/mr_vnode.h13
-rw-r--r--core/src/core.c10
-rw-r--r--core/src/rtdev.c91
-rw-r--r--core/src/vman.c472
-rw-r--r--core/src/vnode.c339
-rw-r--r--include/internal/mr_common.h16
-rw-r--r--runtime/CMakeLists.txt3
-rw-r--r--runtime/include/mr_runtime.h7
-rw-r--r--runtime/include/runtime.h17
-rw-r--r--runtime/src/app.c25
-rw-r--r--runtime/src/event.c444
-rw-r--r--runtime/src/export.c15
-rw-r--r--runtime/src/runtime.c8
-rw-r--r--service/src/rxtx.c2
17 files changed, 829 insertions, 658 deletions
diff --git a/app/src/marsio.c b/app/src/marsio.c
index 3c3a875..2b25cb4 100644
--- a/app/src/marsio.c
+++ b/app/src/marsio.c
@@ -154,12 +154,12 @@ static int mrapp_init(const char * appsym, struct mrapp_config * config)
{
// �����
int ret = mrapp_config_check(appsym, config);
- if(ret < 0)
+ if (ret < 0)
{
MR_LOG(ERR, MRLIB, "Marsio config check failed. \n");
return ret;
}
-
+
// Ӧ��ע��
ret = mr_app_register(appsym, config->coremask, config->autoexit);
if (ret < 0)
@@ -168,6 +168,14 @@ static int mrapp_init(const char * appsym, struct mrapp_config * config)
return -1;
}
+ // �������
+ ret = mr_app_crash_raiser_register(appsym);
+ if(ret < 0)
+ {
+ MR_LOG(ERR, MRLIB, "Cannot register application %s crash monitor. \n", appsym);
+ return -2;
+ }
+
MR_LOG(INFO, MRLIB, "Application %s register success. \n", appsym);
// ������̵�Privָ�벻Ϊ�գ�˵����ǰ���й����ָ�������
diff --git a/core/include/mr_core.h b/core/include/mr_core.h
index 7d96417..6bba16f 100644
--- a/core/include/mr_core.h
+++ b/core/include/mr_core.h
@@ -1,5 +1,7 @@
#pragma once
+#include <sys/queue.h>
+
// ǰ������������ͷ�ļ���ѭ������
struct mr_global_config;
struct devman_ctx;
@@ -8,6 +10,8 @@ struct vnodeman_ctx;
struct mrb_pool_t;
struct rtdev_desc_list;
+TAILQ_HEAD(rtdev_desc_list, rtdev_desc);
+
// Core��ʵ���ṹ�壨ȫ�֣�
struct mr_core_instance
{
diff --git a/core/include/mr_rtdev.h b/core/include/mr_rtdev.h
index bd08a26..52b2758 100644
--- a/core/include/mr_rtdev.h
+++ b/core/include/mr_rtdev.h
@@ -7,7 +7,6 @@
*
*/
-
#include <rte_mempool.h>
#include <mr_core.h>
#include <mr_vnode.h>
@@ -71,8 +70,6 @@ struct rtdev_stat_info
uint64_t ftx_missed[MR_SID_MAX];
};
-TAILQ_HEAD(rtdev_desc_list, rtdev_desc);
-
// ����һ������ʱ�����豸
struct rtdev_desc * mr_rtdev_create(struct mr_core_instance * instance, const char * devsym,
struct rte_mempool * pool, unsigned int nr_serv_thread, unsigned int sz_tunnel, unsigned int sz_buffer);
@@ -88,4 +85,8 @@ int mr_rt_device_close(struct rtdev_app_desc * desc);
// ����ʱ�豸��Ϣͳ��
int mr_rtdev_stats_get(struct rtdev_desc * dev_desc, struct rtdev_stat_info * stat_info);
-int mr_rtdev_app_stats_get(struct rtdev_app_desc * app_desc, struct rtdev_stat_info * stat_info); \ No newline at end of file
+int mr_rtdev_app_stats_get(struct rtdev_app_desc * app_desc, struct rtdev_stat_info * stat_info);
+
+#include <mr_runtime.h>
+void rt_dev_app_crash_event_handler(int app_event, const char * appsym,
+ struct appinfo * appinfo, void * arg); \ No newline at end of file
diff --git a/core/include/mr_vnode.h b/core/include/mr_vnode.h
index f650102..b6a9b03 100644
--- a/core/include/mr_vnode.h
+++ b/core/include/mr_vnode.h
@@ -56,9 +56,9 @@ struct vnode_cons
TAILQ_ENTRY(vnode_cons) next;
char symbol[MR_SYMBOL_MAX];
struct vnode * vnode;
- struct vnode_consq * consq;
unsigned int nr_consq;
- struct tunnel_block_list block_list;
+ int max_idx;
+ struct tunnel_block* block_list[MR_PROD_MAX];
unsigned int nr_block;
unsigned int cur_attach;
struct vnode_cons_stat stat[MR_SID_MAX];
@@ -70,12 +70,13 @@ struct vnode_prod
TAILQ_ENTRY(vnode_prod) next;
char symbol[MR_SYMBOL_MAX];
struct vnode * vnode;
- struct vnode_prodq * prodq;
unsigned int nr_prodq;
- struct tunnel_block_list block_list;
+ int max_idx;
+ struct tunnel_block* block_list[MR_CONS_MAX];
unsigned int nr_block;
unsigned int cur_attach;
struct vnode_prod_stat stat[MR_SID_MAX];
+
};
/* Virtual Data-Node Consumer Queue Structure */
@@ -113,7 +114,7 @@ struct vnode
/* Tunnel Enqueue Buffer Size */
unsigned int sz_tunnel_buffer;
/* Read-Write Lock */
- mr_spin_rwlock_t rwlock;
+ mr_spin_rwlock_t rwlock; //guarantees one operator(consumer or producer, create or destroy) a time
};
/* Virtual Data-Node Operation Callback Structure
@@ -173,4 +174,4 @@ int vnode_prod_attach(struct vnode * node, struct vnode_prod* prod);
#ifdef __cplusplus
}
-#endif \ No newline at end of file
+#endif
diff --git a/core/src/core.c b/core/src/core.c
index 20e7dde..2067080 100644
--- a/core/src/core.c
+++ b/core/src/core.c
@@ -10,6 +10,8 @@
#include <rte_ip_frag.h>
#include <mr_core.h>
#include "mr_buffer.h"
+#include "runtime.h"
+#include "mr_rtdev.h"
// �ⲿ�����������豸������
extern int devman_init(struct mr_core_instance * core_instance);
@@ -54,8 +56,14 @@ struct mr_core_instance * mr_core_instance_create(struct mr_core_config* config)
struct mrb_pool_t * pool = MRB_pool_handle_create();
MR_CHECK(pool != NULL, "Failed at MRB Pool create. ");
-
instance->mrb_pool_handle = pool;
+
+ instance->rtdev_list = rte_zmalloc(NULL, sizeof(struct rtdev_desc_list), 0);
+ MR_CHECK(instance->rtdev_list != NULL, "Cannot alloc memory for Rt-device list");
+ TAILQ_INIT(instance->rtdev_list);
+
+ // ע�������������
+ mr_app_crash_cb_register(rt_dev_app_crash_event_handler, instance);
return instance;
}
diff --git a/core/src/rtdev.c b/core/src/rtdev.c
index d7cd139..bd207bf 100644
--- a/core/src/rtdev.c
+++ b/core/src/rtdev.c
@@ -132,13 +132,6 @@ int mr_rtdev_stats_get(struct rtdev_desc * dev_desc, struct rtdev_stat_info * st
struct rtdev_desc * mr_rtdev_create(struct mr_core_instance * instance, const char * devsym,
struct rte_mempool * pool, unsigned int nr_serv_thread, unsigned int sz_tunnel, unsigned int sz_buffer)
{
- // �б���û�г�ʼ������������ʱ�б���ʼ��
- if(instance->rtdev_list == NULL)
- {
- instance->rtdev_list = rte_zmalloc(NULL, sizeof(struct rtdev_desc_list), 0);
- TAILQ_INIT(instance->rtdev_list);
- }
-
// �����������ڴ�
struct rtdev_desc * desc = rte_zmalloc(NULL, sizeof(struct rtdev_desc), 0);
MR_CHECK(desc != NULL, "Cannot alloc memory for rtdev-desc %s", devsym);
@@ -242,20 +235,30 @@ struct rtdev_app_desc * mr_rt_device_open(struct mr_core_instance * instance,
// ΪRX����ע��������
char vnodesym[MR_SYMBOL_MAX];
- snprintf(vnodesym, sizeof(vnodesym), __PATTERN_CONS_APP, appsym);
- desc->vnode_cons_rx = vnode_create_cons(dev_desc->vnode_rx, &desc->ops, vnodesym, nr_rx_stream);
- if (desc->vnode_cons_rx == NULL) goto prod_cons_create_failed;
+ if (nr_rx_stream > 0)
+ {
+ snprintf(vnodesym, sizeof(vnodesym), __PATTERN_CONS_APP, appsym);
+ desc->vnode_cons_rx = vnode_create_cons(dev_desc->vnode_rx, &desc->ops, vnodesym, nr_rx_stream);
+ if (desc->vnode_cons_rx == NULL) goto prod_cons_create_failed;
+ }
+
// ΪTXע��������
- snprintf(vnodesym, sizeof(vnodesym), __PATTERN_CONS_APP, appsym);
- desc->vnode_prod_tx = vnode_create_prod(dev_desc->vnode_tx, &desc->ops, vnodesym, nr_tx_stream);
- if (desc->vnode_prod_tx == NULL) goto prod_cons_create_failed;
-
+ if (nr_tx_stream > 0)
+ {
+ snprintf(vnodesym, sizeof(vnodesym), __PATTERN_CONS_APP, appsym);
+ desc->vnode_prod_tx = vnode_create_prod(dev_desc->vnode_tx, &desc->ops, vnodesym, nr_tx_stream);
+ if (desc->vnode_prod_tx == NULL) goto prod_cons_create_failed;
+ }
+
// ΪFTXע��������
- snprintf(vnodesym, sizeof(vnodesym), __PATTERN_CONS_APP, appsym);
- desc->vnode_prod_ftx = vnode_create_prod(dev_desc->vnode_ftx, &desc->ops, vnodesym, nr_tx_stream);
- if (desc->vnode_prod_ftx == NULL) goto prod_cons_create_failed;
-
+ if (nr_tx_stream > 0)
+ {
+ snprintf(vnodesym, sizeof(vnodesym), __PATTERN_CONS_APP, appsym);
+ desc->vnode_prod_ftx = vnode_create_prod(dev_desc->vnode_ftx, &desc->ops, vnodesym, nr_tx_stream);
+ if (desc->vnode_prod_ftx == NULL) goto prod_cons_create_failed;
+ }
+
// ����ע���б�
rte_rwlock_write_lock(&dev_desc->app_desc_lock);
TAILQ_INSERT_TAIL(&dev_desc->app_desc_list, desc, next);
@@ -278,13 +281,38 @@ clean:
return NULL;
}
+int mr_rt_device_iterate(struct mr_core_instance * instance, struct rtdev_desc ** out_dev_desc)
+{
+ // ������Ϊ�գ���ͷ��ʼ������������ҵ�������һ������
+ if (*out_dev_desc == NULL)
+ {
+ *out_dev_desc = TAILQ_FIRST(instance->rtdev_list);
+ return 0;
+ }
+ else
+ {
+ *out_dev_desc = TAILQ_NEXT(*out_dev_desc, next);
+ }
+
+ // ������β�������ش�����
+ if (*out_dev_desc == NULL) return -ENOENT;
+ return 0;
+}
+
// �ر��豸
int mr_rt_device_close(struct rtdev_app_desc * desc)
{
struct rtdev_desc * dev_desc = desc->dev_desc;
- if (desc->vnode_cons_rx != NULL) vnode_delete_cons(desc->vnode_cons_rx, &desc->ops);
- if (desc->vnode_prod_tx != NULL) vnode_delete_prod(desc->vnode_prod_tx, &desc->ops);
- if (desc->vnode_prod_ftx != NULL) vnode_delete_prod(desc->vnode_prod_ftx, &desc->ops);
+
+ // �����������̵��ã�����ָ��ָ��ĺ�����ַ��ͬ���������һ��
+ struct vnode_ops _ops;
+ _ops = _rtdev_vnode_ops;
+ _ops.pool_dup_object = dev_desc->ops.pool_dup_object;
+ _ops.pool_new_object = dev_desc->ops.pool_new_object;
+
+ if (desc->vnode_cons_rx != NULL) vnode_delete_cons(desc->vnode_cons_rx, &_ops);
+ if (desc->vnode_prod_tx != NULL) vnode_delete_prod(desc->vnode_prod_tx, &_ops);
+ if (desc->vnode_prod_ftx != NULL) vnode_delete_prod(desc->vnode_prod_ftx, &_ops);
rte_rwlock_write_lock(&dev_desc->app_desc_lock);
TAILQ_REMOVE(&dev_desc->app_desc_list, desc, next);
@@ -292,4 +320,25 @@ int mr_rt_device_close(struct rtdev_app_desc * desc)
rte_free(desc);
return 0;
+}
+
+// Ӧ�ñ��������ص�����
+void rt_dev_app_crash_event_handler(int app_event, const char * appsym,
+ struct appinfo * appinfo, void * arg)
+{
+ struct mr_core_instance * core_instance = (struct mr_core_instance *)arg;
+ assert(core_instance != NULL);
+
+ struct rtdev_desc * dev_desc_iterate = NULL;
+ while(mr_rt_device_iterate(core_instance, &dev_desc_iterate) >= 0)
+ {
+ struct rtdev_app_desc * app_desc = mr_rtdev_app_lookup(
+ dev_desc_iterate, appsym);
+ if (app_desc == NULL) continue;
+ mr_rt_device_close(app_desc);
+ MR_LOG(INFO, BASE, "RT-device %s from applicaton %s closed. \n",
+ dev_desc_iterate->symbol, appsym);
+ }
+
+ return;
} \ No newline at end of file
diff --git a/core/src/vman.c b/core/src/vman.c
deleted file mode 100644
index 243727b..0000000
--- a/core/src/vman.c
+++ /dev/null
@@ -1,472 +0,0 @@
-/* \brief VNodeManager VNode管理器
- *
- * 用于对系统中存在的VNode节点进行管理,避免每个进程管理
- * 自己的VNode的节点。
- *
- * Author : Qiuwen Lu<[email protected]>
- * Date : 2016-09-01
- *
- */
-
-#include <stdlib.h>
-#include <sys/queue.h>
-#include <assert.h>
-#include <string.h>
-#include <rte_log.h>
-#include <rte_malloc.h>
-#include <rte_mbuf.h>
-
-#include <mr_common.h>
-#include <mr_vnode.h>
-#include <mr_vman.h>
-#include <mr_core.h>
-
-struct vnode_record
-{
- TAILQ_ENTRY(vnode_record) next;
- char sym[MR_SYMBOL_MAX];
- struct vnode * node;
-};
-
-TAILQ_HEAD(vnode_record_list, vnode_record);
-
-struct vnodeman_ctx
-{
- mr_spinlock_t lock;
- struct vnode_record_list vnode_list;
-};
-
-static void __wrap_object_delete(void * object)
-{
- return rte_pktmbuf_free(object);
-}
-
-//TODO: 报文缓冲区池决定机制
-static void * __wrap_object_new(void * object)
-{
- return rte_pktmbuf_alloc(mrbuf_get_mempool(0));
-}
-
-static void * __wrap_object_dup(void * object)
-{
- return rte_pktmbuf_clone(object, mrbuf_get_mempool(0));
-}
-
-// VNode操作指针
-struct vnode_ops vnodeman_ops =
-{
- .op_object_delete = __wrap_object_delete,
- .op_object_new = __wrap_object_new,
- .op_object_dup = __wrap_object_dup
-};
-
-struct vnode_record * __vnodeman_lookup_unsafe(struct vnodeman_ctx * ctx_,
- const char * sym)
-{
- struct vnode_record * node_record_ret = NULL;
- struct vnode_record * node_record_iter = NULL;
-
- TAILQ_FOREACH(node_record_iter, &ctx_->vnode_list, next)
- {
- if(strncmp(sym, node_record_iter->sym, sizeof(node_record_iter->sym)) == 0)
- {
- node_record_ret = node_record_iter;
- break;
- }
- }
- return node_record_ret;
-}
-
-/**
- * \brief 创建VNode操作,内部实现,非线程安全
- * \param ctx_ VNodeman内部句柄
- * \param sym VNode的名称
- * \param sz_tunnel VNode队列长度
- * \param sz_buffer 队列缓冲区长度
- * \param flags 预留,现在没有作用
- * \return 是否成功,>=0成功,<0失败。
- */
-static int __vnodeman_create_unsafe(struct vnodeman_ctx * ctx_, const char * sym,
- size_t sz_tunnel, size_t sz_buffer, flags_t flags)
-{
- struct vnode * node;
- struct vnode_record * node_record;
-
- // 查找列表,查看注册的名称是否存在
- node_record = __vnodeman_lookup_unsafe(ctx_, sym);
- if(node_record != NULL)
- {
- MR_LOG(INFO, BASE, "VNodeMan, VNodeManCreate, "
- "VNode %s existed, failed.\n", sym);
- return -1;
- }
-
- // 创建vnode节点
- node = vnode_create(sym, &vnodeman_ops, sz_tunnel, sz_buffer);
- if(node == NULL)
- {
- MR_LOG(INFO, BASE, "VNodeMan, VNodeManCreate, "
- "Cannot create vnode, sym=%s, sz_tunnel=%zu, sz_buffer=%zu\n",
- sym, sz_tunnel, sz_buffer);
- return -2;
- }
-
- // 创建vnode_record链表节点
- node_record = rte_zmalloc(NULL, sizeof(struct vnode_record), 0);
- if(node_record == NULL)
- {
- MR_LOG(INFO, BASE, "VNodeMan, VNodeManCreate, "
- "Cannot alloc memory for vnode record, failed.\n");
- return -3;
- }
-
- snprintf(node_record->sym, sizeof(node_record->sym), "%s", sym);
- node_record->node = node;
-
- // 创建成功后,插入ctx中的vnode_record链表
- TAILQ_INSERT_TAIL(&ctx_->vnode_list, node_record, next);
-
- MR_LOG(INFO, BASE, "VNodeMan, VNodeCreate, "
- "Vnode %s(sz_tunnel=%zu, sz_buffer=%zu) created. \n",
- node_record->sym, sz_tunnel, sz_buffer);
-
- return 0;
-}
-
-static int __vnodeman_unregister_consumer_unsafe(struct vnodeman_ctx * ctx_,
- const char * vnode_sym, const char * cons_sym)
-{
- struct vnode_record * vnode_record = NULL;
- struct vnode_cons * cons = NULL;
- int ret = 0;
-
- vnode_record = __vnodeman_lookup_unsafe(ctx_, vnode_sym);
- if (vnode_record == NULL)
- {
- MR_LOG(WARNING, BASE, "VNodeMan, VNodeManUnregisterConsumer, "
- "Vnode %s does not existed, failed. \n", vnode_sym);
- ret = -1; goto errout;
- }
-
- cons = vnode_cons_lookup(vnode_record->node, cons_sym);
- if (cons == NULL)
- {
- MR_LOG(WARNING, BASE, "VNodeMan, VNodeManUnregisterConsumer, "
- "Vnode %s consumer %s does not existed, failed. \n",
- vnode_sym, cons_sym);
- ret = -2; goto errout;
- }
-
- ret = vnode_delete_cons(cons, &vnodeman_ops);
- if(ret < 0)
- {
- MR_LOG(WARNING, BASE, "VNodeMan, VNodeManUnregisterConsumer, "
- "Cannot delete cons %s for vnode %s, failed. \n",
- cons_sym, vnode_sym);
- ret = -3; goto errout;
- }
-
- return 0;
-errout:
- return ret;
-}
-
-static int __vnodeman_unregister_producer_unsafe(struct vnodeman_ctx * ctx_,
- const char * vnode_sym, const char * prod_sym)
-{
- struct vnode_record * vnode_record = NULL;
- struct vnode_prod * prod = NULL;
- int ret = 0;
-
- vnode_record = __vnodeman_lookup_unsafe(ctx_, vnode_sym);
- if (vnode_record == NULL)
- {
- MR_LOG(WARNING, BASE, "VNodeMan, VNodeManUnregisterProducer, "
- "Vnode %s does not existed, failed. \n", vnode_sym);
- ret = -1; goto errout;
- }
-
- prod = vnode_prod_lookup(vnode_record->node, prod_sym);
- if (prod == NULL)
- {
- MR_LOG(WARNING, BASE, "VNodeMan, VNodeManUnregisterProducer, "
- "Vnode %s producer %s does not existed, failed. \n",
- vnode_sym, prod_sym);
- ret = -2; goto errout;
- }
-
- ret = vnode_delete_prod(prod, &vnodeman_ops);
- if (ret < 0)
- {
- MR_LOG(WARNING, BASE, "VNodeMan, VNodeManUnregisterProducer, "
- "Cannot delete cons %s for vnode %s, failed. \n",
- prod_sym, vnode_sym);
- ret = -3; goto errout;
- }
-
- return 0;
-errout:
- return ret;
-}
-
-static int __vnodeman_register_consumer_unsafe(struct vnodeman_ctx * ctx_,
- const char * vnode_sym, const char * cons_sym, unsigned int nr_cons_queue)
-{
- struct vnode_record * node_record;
- node_record = __vnodeman_lookup_unsafe(ctx_, vnode_sym);
- if(node_record == NULL)
- {
- MR_LOG(INFO, BASE, "VNodeMan, VNodeManRegisterConsumer, "
- "Vnode %s does not existed, failed. \n", vnode_sym);
- return -1;
- }
-
- struct vnode_cons * cons;
- cons = vnode_create_cons(node_record->node,
- &vnodeman_ops, cons_sym, nr_cons_queue);
-
- if(cons == NULL)
- {
- MR_LOG(INFO, BASE, "VNodeMan, VNodeManRegisterConsumer, "
- "Cannot create consumer %s for vnode %s \n", cons_sym, vnode_sym);
- return -2;
- }
-
- return 0;
-}
-
-static int __vnodeman_register_producer_unsafe(struct vnodeman_ctx * ctx_,
-const char * vnode_sym, const char * prod_sym, unsigned int nr_prod_queue)
-{
- struct vnode_record * node_record;
- node_record = __vnodeman_lookup_unsafe(ctx_, vnode_sym);
- if (node_record == NULL)
- {
- MR_LOG(INFO, BASE, "VNodeMan, VNodeManRegisterProducer, "
- "Vnode %s does not existed, failed. \n", vnode_sym);
- return -1;
- }
-
- struct vnode_prod * prod;
- prod = vnode_create_prod(node_record->node,
- &vnodeman_ops, prod_sym, nr_prod_queue);
-
- if (prod == NULL)
- {
- MR_LOG(INFO, BASE, "VNodeMan, VNodeManRegisterProducer, "
- "Cannot create consumer %s for vnode %s. \n", prod_sym, vnode_sym);
- return -2;
- }
-
- return 0;
-}
-
-struct vnodeman_attach_desc * __vnodeman_consumer_attach_unsafe(
- struct vnodeman_ctx * ctx_, const char * vnode_sym, const char * cons_sym)
-{
- struct vnodeman_attach_desc * desc;
- desc = rte_zmalloc(NULL, sizeof(struct vnodeman_attach_desc), 0);
- if(desc == NULL)
- {
- MR_LOG(INFO, BASE, "VNodeMan, ConsumerAttach, "
- "Cannot alloc memory for vnode_attach_desc, failed. \n");
- goto errout;
- }
-
- struct vnode_record * vnode_record = NULL;
- struct vnode_cons * cons = NULL;
-
- vnode_record = __vnodeman_lookup_unsafe(ctx_, vnode_sym);
- if(vnode_record == NULL)
- {
- MR_LOG(INFO, BASE, "VNodeMan, ConsumerAttach, "
- "Vnode %s does not existed, failed. \n", vnode_sym);
- goto errout;
- }
-
- cons = vnode_cons_lookup(vnode_record->node, cons_sym);
- if(cons == NULL)
- {
- MR_LOG(INFO, BASE, "VNodeMan, ConsumerAttach, "
- "Vnode %s consumer %s does not existed, failed. \n",
- vnode_sym, cons_sym);
- goto errout;
- }
-
- int queue_id;
- queue_id = vnode_cons_attach(vnode_record->node, cons);
- if(queue_id < 0)
- {
- MR_LOG(INFO, BASE, "VNodeMan, ConsumerAttach, "
- "Cannot attach to vnode %s consumer %s, failed. \n",
- vnode_sym, cons_sym);
- goto errout;
- }
-
- desc->node = vnode_record->node;
- desc->desc_type = VNODEMAN_ATTACH_CONSUMER;
- desc->queue_id = queue_id;
- desc->cons = cons;
-
- MR_LOG(INFO, BASE, "VNodeMan, ConsumerAttach, "
- "Attached to vnode %s consumer %s, queue_id=%d.\n",
- vnode_sym, cons_sym, desc->queue_id);
-
- return desc;
-
-errout:
- if (desc) rte_free(desc);
- return NULL;
-}
-
-struct vnodeman_attach_desc * __vnodeman_producer_attach_unsafe(
- struct vnodeman_ctx * ctx_, const char * vnode_sym, const char * prod_sym)
-{
- struct vnodeman_attach_desc * desc;
- desc = rte_zmalloc(NULL, sizeof(struct vnodeman_attach_desc), 0);
- if (desc == NULL)
- {
- MR_LOG(INFO, BASE, "VNodeMan, ProducerAttach, "
- "Cannot alloc memory for vnode_attach_desc, failed. \n");
- goto errout;
- }
-
- struct vnode_record * vnode_record = NULL;
- struct vnode_prod * prod = NULL;
-
- vnode_record = __vnodeman_lookup_unsafe(ctx_, vnode_sym);
- if (vnode_record == NULL)
- {
- MR_LOG(INFO, BASE, "VNodeMan, ProducerAttach, "
- "Vnode %s does not existed, failed. \n", vnode_sym);
- goto errout;
- }
-
- prod = vnode_prod_lookup(vnode_record->node, prod_sym);
- if (prod == NULL)
- {
- MR_LOG(INFO, BASE, "VNodeMan, ProducerAttach, "
- "Vnode %s producer %s does not existed, failed. \n",
- vnode_sym, prod_sym);
- goto errout;
- }
-
- int queue_id;
- queue_id = vnode_prod_attach(vnode_record->node, prod);
- if (queue_id < 0)
- {
- MR_LOG(INFO, BASE, "VNodeMan, ProducerAttach, "
- "Cannot attach to vnode %s producer %s, failed. \n",
- vnode_sym, prod_sym);
- goto errout;
- }
-
- desc->node = vnode_record->node;
- desc->desc_type = VNODEMAN_ATTACH_PRODUCER;
- desc->queue_id = queue_id;
- desc->prod = prod;
-
- MR_LOG(INFO, BASE, "VNodeMan, ProducerAttach, "
- "Attached to vnode %s producer %s, queue_id=%d.\n",
- vnode_sym, prod_sym, desc->queue_id);
-
- return desc;
-
-errout:
- if (desc) rte_free(desc);
- return NULL;
-}
-
-// TODO: 上锁粒度太大,应该尽量降低锁的粒度或使用无锁实现。
-int mr_vnodeman_create(const char * sym, size_t sz_tunnel, size_t sz_buffer, flags_t flags)
-{
- int ret = 0;
- struct vnodeman_ctx * ctx = mr_core_default_instance_get()->vnodeman_ctx;
-
- mr_spin_lock(&ctx->lock);
- ret = __vnodeman_create_unsafe(ctx, sym, sz_tunnel, sz_buffer, flags);
- mr_spin_unlock(&ctx->lock);
- return ret;
-}
-
-int mr_vnodeman_unregister_consumer(const char * vnode_sym, const char * cons_sym)
-{
- int ret = 0;
- struct vnodeman_ctx * ctx = mr_core_default_instance_get()->vnodeman_ctx;
-
- mr_spin_lock(&ctx->lock);
- ret = __vnodeman_unregister_consumer_unsafe(ctx, vnode_sym, cons_sym);
- mr_spin_unlock(&ctx->lock);
- return ret;
-}
-
-int mr_vnodeman_unregister_producer(const char * vnode_sym, const char * cons_sym)
-{
- int ret = 0;
- struct vnodeman_ctx * ctx = mr_core_default_instance_get()->vnodeman_ctx;
-
- mr_spin_lock(&ctx->lock);
- ret = __vnodeman_unregister_producer_unsafe(ctx, vnode_sym, cons_sym);
- mr_spin_unlock(&ctx->lock);
- return ret;
-}
-
-int mr_vnodeman_register_consumer(const char * vnode_sym, const char * cons_sym,
- unsigned int nr_cons_queue)
-{
- int ret = 0;
- struct vnodeman_ctx * ctx = mr_core_default_instance_get()->vnodeman_ctx;
-
- mr_spin_lock(&ctx->lock);
- ret = __vnodeman_register_consumer_unsafe(ctx, vnode_sym, cons_sym, nr_cons_queue);
- mr_spin_unlock(&ctx->lock);
- return ret;
-}
-
-int mr_vnodeman_register_producer(const char * vnode_sym, const char * prod_sym,
- unsigned int nr_prod_queue)
-{
- int ret = 0;
- struct vnodeman_ctx * ctx = mr_core_default_instance_get()->vnodeman_ctx;
-
- mr_spin_lock(&ctx->lock);
- ret = __vnodeman_register_producer_unsafe(ctx, vnode_sym, prod_sym, nr_prod_queue);
- mr_spin_unlock(&ctx->lock);
- return ret;
-}
-
-struct vnodeman_attach_desc * mr_vnodeman_consumer_attach(const char * vnode_sym,
- const char * cons_sym)
-{
- struct vnodeman_ctx * ctx = mr_core_default_instance_get()->vnodeman_ctx;
- struct vnodeman_attach_desc * desc;
- mr_spin_lock(&ctx->lock);
- desc = __vnodeman_consumer_attach_unsafe(ctx, vnode_sym, cons_sym);
- mr_spin_unlock(&ctx->lock);
- return desc;
-}
-
-struct vnodeman_attach_desc * mr_vnodeman_producer_attach(const char * vnode_sym,
- const char * prod_sym)
-{
- struct vnodeman_ctx * ctx = mr_core_default_instance_get()->vnodeman_ctx;
-
- struct vnodeman_attach_desc * desc;
- mr_spin_lock(&ctx->lock);
- desc = __vnodeman_producer_attach_unsafe(ctx, vnode_sym, prod_sym);
- mr_spin_unlock(&ctx->lock);
- return desc;
-}
-
-int vnodeman_init(struct mr_core_instance * core_instance)
-{
- core_instance->vnodeman_ctx = rte_zmalloc("VNODEMAN_CTX",
- sizeof(struct vnodeman_ctx), 0);
- MR_CHECK(core_instance->vnodeman_ctx != NULL,
- "Cannot alloc memory for vnodeman context, Failed. ");
-
- struct vnodeman_ctx * ctx = core_instance->vnodeman_ctx;
- TAILQ_INIT(&ctx->vnode_list);
- mr_spin_init(&ctx->lock);
- return 0;
-} \ No newline at end of file
diff --git a/core/src/vnode.c b/core/src/vnode.c
index 402d043..66a1420 100644
--- a/core/src/vnode.c
+++ b/core/src/vnode.c
@@ -1,8 +1,8 @@
/* Virtual Data-Node Library for MARSIO
- Author : Lu Qiuwen<[email protected]>
- Date : 2016-08-10
-
+ Author : Lu Qiuwen<[email protected]> Zheng Chao<[email protected]>
+ Date : 2016-12-10
+
TODO: Try to use RCU in thread safe mode.
*/
@@ -17,6 +17,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
+#include <unistd.h> //usleep
#include <sys/queue.h>
#include <mr_common.h>
@@ -71,7 +72,7 @@ struct tunnel_desc
};
/* Alloc a tunnel for fixed size */
-struct tunnel_desc * tunnel_new(const char * symbol, unsigned int size,
+struct tunnel_desc * tunnel_new(const char * symbol, unsigned int size,
unsigned int sz_buffer)
{
struct tunnel_desc * desc = (struct tunnel_desc *)
@@ -84,7 +85,7 @@ struct tunnel_desc * tunnel_new(const char * symbol, unsigned int size,
return NULL;
}
- desc->tunnel_object = rte_ring_create(symbol, size,
+ desc->tunnel_object = rte_ring_create(symbol, size,
SOCKET_ID_ANY, RING_F_SC_DEQ | RING_F_SP_ENQ);
if (unlikely(desc->tunnel_object == NULL))
@@ -94,7 +95,7 @@ struct tunnel_desc * tunnel_new(const char * symbol, unsigned int size,
__str_errno());
return NULL;
}
-
+
snprintf(desc->symbol, sizeof(desc->symbol), "%s", symbol);
desc->tunnel_size = size;
@@ -103,28 +104,36 @@ struct tunnel_desc * tunnel_new(const char * symbol, unsigned int size,
desc->en_buffer = rte_zmalloc(NULL, sizeof(void *) * sz_buffer, 0);
desc->sz_en_buffer = sz_buffer;
-
+
if (unlikely(desc->en_buffer == NULL))
{
MR_LOG(INFO, BASE, "LibVNode, TunnelBlockNew, "
"Cannot alloc memory for tunnel buffer\n");
return NULL;
}
-
+
return desc;
}
/* Delete a tunnel */
-int tunnel_delete(struct tunnel_desc * desc)
+int tunnel_delete(struct tunnel_desc * desc, struct vnode_ops * ops)
{
+ for (int i = 0; i < desc->sz_en_buffer_used; i++)
+ ops->op_object_delete(desc->en_buffer[i]);
+
+ void * object;
+ while (rte_ring_dequeue(desc->tunnel_object, &object) == 0)
+ ops->op_object_delete(object);
+
+ assert(rte_ring_empty(desc->tunnel_object) == 1);
rte_free(desc->en_buffer);
rte_ring_free(desc->tunnel_object);
rte_free(desc);
return 0;
}
-static inline void tunnel_enqueue_without_buffer(struct tunnel_desc * desc,
- struct vnode_ops * ops, void * obj)
+static inline void tunnel_enqueue_without_buffer(struct tunnel_desc * desc,
+ struct vnode_ops * ops, void * obj)
{
assert(desc->en_buffer == NULL && desc->sz_en_buffer == 0);
int ret = rte_ring_enqueue_burst(desc->tunnel_object, &obj, 1);
@@ -133,7 +142,7 @@ static inline void tunnel_enqueue_without_buffer(struct tunnel_desc * desc,
}
static inline void tunnel_enqueue(struct vnode_prod * prod, struct vnode_cons * cons,
- unsigned int prodq, unsigned int consq, struct tunnel_desc * desc,
+ unsigned int prodq, unsigned int consq, struct tunnel_desc * desc,
struct vnode_ops * ops, void * obj)
{
// append the object at the tail of enqueue buffer.
@@ -153,7 +162,7 @@ static inline void tunnel_enqueue(struct vnode_prod * prod, struct vnode_cons *
int n_to_send = desc->sz_en_buffer;
int n_send = rte_ring_sp_enqueue_burst(desc->tunnel_object,
desc->en_buffer, n_to_send);
-
+
if (likely(n_send == n_to_send)) goto out;
// release all the objects which not send successfully.
@@ -173,7 +182,7 @@ out:
VNODE_STAT_UPDATE(cons, consq, on_line, n_to_send);
VNODE_STAT_UPDATE(cons, consq, recieved, n_send);
VNODE_STAT_UPDATE(cons, consq, missed, n_to_send - n_send);
-
+
// 清空缓冲区
desc->sz_en_buffer_used = 0;
return;
@@ -189,12 +198,12 @@ static inline int tunnel_dequeue(struct tunnel_desc * desc, struct vnode_ops * o
/* +-------------+-------------+-------------+------------+------------+
| nr_prodq | nr_consq | descs[0][0] | desc[0][1] | .......... |
+-------------+-------------+-------------+------------+------------+
- Len = sizeof(nr_prodq) + sizeof(nr_consq) +
- sizeof(descs) * nr_prodq * nr_consq
+ Len = sizeof(nr_prodq) + sizeof(nr_consq) +
+ sizeof(descs) * nr_prodq * nr_consq
*/
-static inline struct tunnel_desc ** tunnel_block_locate(struct tunnel_block * block,
+static inline struct tunnel_desc ** tunnel_block_locate(struct tunnel_block * block,
unsigned int prodq_id, unsigned int consq_id)
{
assert(prodq_id < block->nr_prodq && consq_id < block->nr_consq);
@@ -209,7 +218,7 @@ int tunnel_block_delete(struct tunnel_block * block, struct vnode_ops * ops)
for (int consq_id = 0; consq_id < block->nr_consq; consq_id++)
{
if (*tunnel_block_locate(block, prodq_id, consq_id) != NULL)
- tunnel_delete(*tunnel_block_locate(block, prodq_id, consq_id));
+ tunnel_delete(*tunnel_block_locate(block, prodq_id, consq_id), ops);
}
}
@@ -240,19 +249,19 @@ int tunnel_block_try_gc(struct tunnel_block * block, struct vnode_ops * ops)
/* Alloc a block of tunnels, and init all the tunnels */
struct tunnel_block * tunnel_block_new(struct vnode_ops * ops, const char * symbol,
- struct vnode_prod * prod, struct vnode_cons * cons,
+ struct vnode_prod * prod, struct vnode_cons * cons,
unsigned int tunnel_size, unsigned int tun_sz_buffer)
{
unsigned int nr_prodq = prod->nr_prodq;
unsigned int nr_consq = cons->nr_consq;
-
+
unsigned int block_size = sizeof(struct tunnel_block) +
sizeof(struct tunnel_desc *) * (nr_prodq * nr_consq);
-
+
struct tunnel_block * block = (struct tunnel_block *)
rte_zmalloc(NULL, block_size, 0);
- if(block == NULL)
+ if (block == NULL)
{
MR_LOG(INFO, BASE, "LibVNode, TunnelBlockNew, "
"Cannot alloc memory for block storage, nr_prodq=%d"
@@ -287,9 +296,9 @@ struct tunnel_block * tunnel_block_new(struct vnode_ops * ops, const char * symb
}
MR_LOG(DEBUG, BASE, "LibVNode, TunnelBlockNew, "
- "Tunnel %s(object=%p, sz_tunnel=%d, sz_buffer=%d) created.\n",
+ "Tunnel %s(object=%p, sz_tunnel=%d, sz_buffer=%d) created.\n",
tunnel_sym, tdesc->tunnel_object, tunnel_size, tun_sz_buffer);
-
+
*tunnel_block_locate(block, prodq_id, consq_id) = tdesc;
}
}
@@ -332,7 +341,7 @@ static inline int tunnel_block_dequeue(struct tunnel_block * block, struct vnode
{
unsigned int nr_obj = 0, nr_obj_recv = 0;
unsigned int nr_obj_left = nr_max_obj;
-
+
for (int prodq = 0; prodq < block->nr_prodq; prodq++)
{
struct tunnel_desc * tunnel = *tunnel_block_locate(block, prodq, consq);
@@ -341,7 +350,7 @@ static inline int tunnel_block_dequeue(struct tunnel_block * block, struct vnode
nr_obj_left -= nr_obj_recv;
}
- assert(nr_obj <= nr_max_obj);
+ assert(nr_obj <= nr_max_obj);
return nr_obj;
}
@@ -377,13 +386,49 @@ struct vnode * vnode_create(const char * sym, struct vnode_ops * ops,
object->nr_prod = 0;
object->sz_tunnel = sz_tunnel;
object->sz_tunnel_buffer = sz_tunnel_buffer;
-
+
mr_spin_rwlock_init(&object->rwlock);
TAILQ_INIT(&object->cons_list);
TAILQ_INIT(&object->prod_list);
return object;
}
+int add_block_to_list(struct tunnel_block** list, struct tunnel_block* block, int size, int *max_idx)
+{
+ int i = 0;
+ for (i = 0; i < size; i++)
+ {
+ if (list[i] == NULL)
+ {
+ list[i] = block;// assume this operation is a transaction
+ break;
+ }
+ }
+ if (i == size)
+ {
+ return -1;
+ }
+
+ if (i > *max_idx)
+ {
+ assert(*max_idx + 1 == i);
+ (*max_idx)++;
+ }
+
+ return 0;
+}
+int renew_max_id(struct tunnel_block** list, int size)
+{
+ int i = 0, cnt = 0;
+ for (i = 0; i < size; i++)
+ {
+ if (list[i] != NULL)
+ {
+ cnt++;
+ }
+ }
+ return cnt;
+}
/* Vnode prod or cons increase or decrease functions
These functions are thread unsafe
*/
@@ -400,38 +445,24 @@ int vnode_prod_increase_unsafe(struct vnode * vnode, struct vnode_ops * ops,
TAILQ_FOREACH(cons, &vnode->cons_list, next)
{
char block_sym[MR_SYMBOL_MAX];
- snprintf(block_sym, sizeof(block_sym), "%s-%s-%s",
+ snprintf(block_sym, sizeof(block_sym), "%s-%s-%s",
vnode->symbol, prod->symbol, cons->symbol);
nr_consq = cons->nr_consq;
// create commucation tunnel for each cons and prods
- block = tunnel_block_new(ops, block_sym, prod, cons,
+ block = tunnel_block_new(ops, block_sym, prod, cons,
vnode->sz_tunnel, vnode->sz_tunnel_buffer);
if (block == NULL)
{
MR_LOG(INFO, BASE, "LibVNode, VNodeProdIncrease, "
- "Cannot create block for prod %s to cons %s\n",
+ "Cannot create block for prod %s to cons %s\n",
prod->symbol, cons->symbol);
return -1;
}
- // create list item
- struct tunnel_block_list_item * tblist_prod_item;
- tblist_prod_item = tblist_item_new(block);
- struct tunnel_block_list_item * tblist_cons_item;
- tblist_cons_item = tblist_item_new(block);
-
- if(tblist_prod_item == NULL || tblist_cons_item == NULL)
- {
- MR_LOG(INFO, BASE, "LibVNode, VNodeProdIncrease, "
- "Cannot create block item for prod %s to cons %s\n",
- prod->symbol, cons->symbol);
- return -2;
- }
-
// insert block into prod block list and cons block list
- TAILQ_INSERT_TAIL(&prod->block_list, tblist_prod_item, next);
- TAILQ_INSERT_TAIL(&cons->block_list, tblist_cons_item, next);
+ add_block_to_list(prod->block_list, block, MR_CONS_MAX, &(prod->max_idx));
+ add_block_to_list(cons->block_list, block, MR_PROD_MAX, &(cons->max_idx));
prod->nr_block++;
cons->nr_block++;
@@ -448,7 +479,7 @@ int vnode_prod_increase_unsafe(struct vnode * vnode, struct vnode_ops * ops,
}
int vnode_cons_increase_unsafe(struct vnode * vnode, struct vnode_ops * ops,
-struct vnode_cons * cons)
+ struct vnode_cons * cons)
{
int nr_consq = cons->nr_consq;
int nr_prodq = 0;
@@ -459,43 +490,28 @@ struct vnode_cons * cons)
TAILQ_FOREACH(prod, &vnode->prod_list, next)
{
char block_sym[MR_SYMBOL_MAX];
- snprintf(block_sym, sizeof(block_sym), "%s-%s-%s",
+ snprintf(block_sym, sizeof(block_sym), "%s-%s-%s",
vnode->symbol, prod->symbol, cons->symbol);
nr_prodq = prod->nr_prodq;
-
+
// create commucation tunnel for each cons and prods
- block = tunnel_block_new(ops, block_sym, prod, cons,
+ block = tunnel_block_new(ops, block_sym, prod, cons,
vnode->sz_tunnel, vnode->sz_tunnel_buffer);
if (block == NULL)
{
MR_LOG(INFO, BASE, "LibVNode, VNodeConsIncrease, "
- "Cannot create block for prod %s to cons %s\n",
- prod->symbol, cons->symbol);
- return -1;
- }
-
- // create list item
- struct tunnel_block_list_item * tblist_prod_item;
- tblist_prod_item = tblist_item_new(block);
- struct tunnel_block_list_item * tblist_cons_item;
- tblist_cons_item = tblist_item_new(block);
-
- if (tblist_prod_item == NULL || tblist_cons_item == NULL)
- {
- MR_LOG(INFO, BASE, "LibVNode, VNodeProdIncrease, "
- "Cannot create block item for prod %s to cons %s\n",
+ "Cannot create block for prod %s to cons %s\n",
prod->symbol, cons->symbol);
- return -2;
+ return -1;
}
-
// insert block into prod block list and cons block list
- TAILQ_INSERT_TAIL(&prod->block_list, tblist_prod_item, next);
- TAILQ_INSERT_TAIL(&cons->block_list, tblist_cons_item, next);
+ add_block_to_list(prod->block_list, block, MR_CONS_MAX, &(prod->max_idx));
+ add_block_to_list(cons->block_list, block, MR_PROD_MAX, &(cons->max_idx));
prod->nr_block++;
cons->nr_block++;
MR_LOG(DEBUG, BASE, "LibVNode, VNodeConsIncrease, "
- "Insert block %s(object=%p, nr_prodq=%d, nr_consq=%d) in prod %s(obj=%p)"
+ "Insert block %s(object=%p, nr_prodq=%d, nr_consq=%d) in prod %s(obj=%p)"
"and cons %s(obj=%p)\n", block_sym, block, nr_prodq, nr_consq,
prod->symbol, prod, cons->symbol, cons);
}
@@ -509,30 +525,54 @@ struct vnode_cons * cons)
// 垃圾回收,回收已经没有用的TunnelBlock
void vnode_cons_block_gc_unsafe(struct vnode_cons * cons, struct vnode_ops * ops)
{
- struct tunnel_block_list_item * tblist_item_iter;
- TAILQ_FOREACH(tblist_item_iter, &cons->block_list, next)
+ struct tunnel_block* block = NULL;
+ int i = 0, ret = 0;
+ for (i = 0; i < cons->max_idx + 1; i++)
{
- int ret = tunnel_block_try_gc(tblist_item_iter->block, ops);
- if (ret < 0) continue;
+ block = cons->block_list[i];
+ if (block == NULL)
+ {
+ continue;
+ }
- TAILQ_REMOVE(&cons->block_list, tblist_item_iter, next);
- tblist_item_delete(tblist_item_iter);
+ ret = tunnel_block_try_gc(block, ops);
+ if (ret < 0)
+ {
+ continue;
+ }
+
+ cons->block_list[i] = NULL;
+ cons->nr_block--;
+ rte_mb();
}
+ if (i == cons->max_idx)
+ {
+ cons->max_idx = renew_max_id(cons->block_list, MR_PROD_MAX);
+ }
return;
}
// 垃圾回收,回收已经没有用的TunnelBlock
void vnode_prod_block_gc_unsafe(struct vnode_prod * prod, struct vnode_ops * ops)
{
- struct tunnel_block_list_item * tblist_item_iter;
- TAILQ_FOREACH(tblist_item_iter, &prod->block_list, next)
+ struct tunnel_block* block = NULL;
+ int i = 0, ret = 0;
+ for (i = 0; i < prod->max_idx + 1; i++)
{
- int ret = tunnel_block_try_gc(tblist_item_iter->block, ops);
- if (ret < 0) continue;
-
- TAILQ_REMOVE(&prod->block_list, tblist_item_iter, next);
- tblist_item_delete(tblist_item_iter);
+ block = prod->block_list[i];
+ if (block == NULL)
+ {
+ continue;
+ }
+ ret = tunnel_block_try_gc(block, ops);
+ if (ret < 0)
+ {
+ continue;
+ }
+ prod->block_list[i] = NULL;
+ prod->nr_block--;
+ rte_mb();
}
return;
@@ -544,7 +584,7 @@ struct vnode_prod * vnode_create_prod(struct vnode * vnode, struct vnode_ops * o
{
int ret = 0;
struct vnode_prod * prod = rte_zmalloc(NULL, sizeof(struct vnode_prod), 0);
- if(prod == NULL)
+ if (prod == NULL)
{
MR_LOG(INFO, BASE, "LibVNode, VNodeProdCreate, "
"Cannot alloc memory for prod description\n");
@@ -557,15 +597,6 @@ struct vnode_prod * vnode_create_prod(struct vnode * vnode, struct vnode_ops * o
prod->vnode = vnode;
prod->nr_prodq = nr_prodq;
prod->nr_block = 0;
- prod->prodq = rte_zmalloc(NULL, sizeof(struct vnode_prod) * prod->nr_prodq, 0);
- TAILQ_INIT(&prod->block_list);
-
- if(prod->prodq == NULL)
- {
- MR_LOG(INFO, BASE, "LibVNode, VNodeProdCreate, "
- "Cannot alloc memory for prodq descriptions\n");
- goto err;
- }
__write_lock(&vnode->rwlock);
ret = vnode_prod_increase_unsafe(vnode, ops, prod);
@@ -575,7 +606,6 @@ struct vnode_prod * vnode_create_prod(struct vnode * vnode, struct vnode_ops * o
return prod;
err:
- if (prod != NULL && prod->prodq != NULL) rte_free(prod->prodq);
if (prod != NULL) rte_free(prod);
return NULL;
}
@@ -585,7 +615,7 @@ struct vnode_cons * vnode_create_cons(struct vnode * vnode, struct vnode_ops * o
{
int ret = 0;
struct vnode_cons * cons = rte_zmalloc(NULL, sizeof(struct vnode_cons), 0);
- if(cons == NULL)
+ if (cons == NULL)
{
MR_LOG(INFO, BASE, "LibVNode, VNodeConsCreate, "
"Cannot alloc memory for cons description\n");
@@ -598,15 +628,6 @@ struct vnode_cons * vnode_create_cons(struct vnode * vnode, struct vnode_ops * o
cons->vnode = vnode;
cons->nr_consq = nr_consq;
cons->nr_block = 0;
- cons->consq = rte_zmalloc(NULL, sizeof(struct vnode_consq) * cons->nr_consq, 0);
- TAILQ_INIT(&cons->block_list);
-
- if(cons->consq == NULL)
- {
- MR_LOG(INFO, BASE, "LibVNode, VNodeConsCreate, "
- "Cannot alloc memory for consq descriptions\n");
- goto err;
- }
__write_lock(&vnode->rwlock);
ret = vnode_cons_increase_unsafe(vnode, ops, cons);
@@ -616,7 +637,6 @@ struct vnode_cons * vnode_create_cons(struct vnode * vnode, struct vnode_ops * o
return cons;
err:
- if (cons != NULL && cons->consq != NULL) rte_free(cons->consq);
if (cons != NULL) rte_free(cons);
return NULL;
}
@@ -634,7 +654,7 @@ struct vnode_prod * vnode_prod_lookup(struct vnode * vnode, const char * sym)
TAILQ_FOREACH(prod_iter, &vnode->prod_list, next)
{
- if(strncmp(sym, prod_iter->symbol, sizeof(prod_iter->symbol)) == 0)
+ if (strncmp(sym, prod_iter->symbol, sizeof(prod_iter->symbol)) == 0)
{
ret = prod_iter;
break;
@@ -718,35 +738,49 @@ struct vnode_cons_stat * vnode_cons_stat_get(struct vnode_cons * cons)
return cons->stat;
}
+void synchronize_dataplane()
+{
+ usleep(10);//assume each function in operation will finished after such time.
+}
+
int vnode_delete_prod(struct vnode_prod * prod, struct vnode_ops * ops)
{
assert(prod != NULL && prod->vnode != NULL && ops != NULL);
-
- __write_lock(&prod->vnode->rwlock);
-
+ int i = 0;
+ struct tunnel_block* block = NULL;
+
struct vnode * vnode = prod->vnode;
- struct tunnel_block_list_item * tblist_item_iter;
-
+ __write_lock(&prod->vnode->rwlock);
+
// 删除引用的Block
- TAILQ_FOREACH(tblist_item_iter, &prod->block_list, next)
+ for (i = 0; i < prod->max_idx + 1; i++)
{
- assert(tblist_item_iter != NULL);
- tunnel_block_try_delete(tblist_item_iter->block, ops);
- tblist_item_delete(tblist_item_iter);
+ block = prod->block_list[i];
+ if (block == NULL)
+ {
+ continue;
+ }
+ assert(block->deleted == 0);
+ tunnel_block_try_delete(block, ops);
+ prod->block_list[i] = NULL;
+
}
- // 从VNode中移除生产者描述符
+ //remove Producer from VNode
struct vnode_prod_list * prod_list_head = &prod->vnode->prod_list;
TAILQ_REMOVE(prod_list_head, prod, next);
+ prod->vnode->nr_prod--;
+
+ rte_mb();
+ synchronize_dataplane();
- // 执行GC,移除所有消费者描述符中引用的Block
struct vnode_cons * cons_iter;
TAILQ_FOREACH(cons_iter, &vnode->cons_list, next)
{
vnode_cons_block_gc_unsafe(cons_iter, ops);
}
-
__write_unlock(&prod->vnode->rwlock);
+
return 0;
}
@@ -754,21 +788,28 @@ int vnode_delete_cons(struct vnode_cons * cons, struct vnode_ops * ops)
{
assert(cons != NULL && ops != NULL);
__write_lock(&cons->vnode->rwlock);
-
+
struct vnode * vnode = cons->vnode;
- struct tunnel_block_list_item * tblist_item_iter;
+ struct tunnel_block* block = NULL;
// 删除引用的Block
- TAILQ_FOREACH(tblist_item_iter, &cons->block_list, next)
+ for (int i = 0; i < cons->max_idx + 1; i++)
{
- assert(tblist_item_iter != NULL);
- tunnel_block_try_delete(tblist_item_iter->block, ops);
- tblist_item_delete(tblist_item_iter);
+ block = cons->block_list[i];
+ if (block == NULL)
+ {
+ continue;
+ }
+ tunnel_block_try_delete(block, ops);
}
+ rte_mb();
+ synchronize_dataplane();
+
// 从VNode中移除消费者描述符
struct vnode_cons_list * cons_list_head = &cons->vnode->cons_list;
TAILQ_REMOVE(cons_list_head, cons, next);
+ cons->vnode->nr_cons--;
// 执行GC,移除所有生产者描述符中引用的Block
struct vnode_prod * prod_iter;
@@ -806,29 +847,28 @@ int vnode_delete(struct vnode * vnode, struct vnode_ops * ops)
*/
int vnode_enqueue_burst_with_hash(struct vnode_prod * prod, struct vnode_ops * ops,
- unsigned int prodq, void * objects[], uint32_t hash[], int nr_objects)
+ unsigned int prodq, void * objects[], uint32_t hash[], int nr_objects)
{
- int ret;
+ int ret = 0;
assert(nr_objects <= MR_LIBVNODE_MAX_SZ_BURST);
- __read_lock(&prod->vnode->rwlock);
-
+ struct tunnel_block* block = NULL, *last_block = NULL;
// get the last block, this block are enqueued with the original objects.
- struct tunnel_block_list_item * tblist_item_last;
- tblist_item_last = TAILQ_LAST(&prod->block_list, tunnel_block_list);
-
- // tblist_item_last is null when there is not registerd consumer.
- if(unlikely(tblist_item_last == NULL))
+ last_block = prod->block_list[prod->max_idx];
+ // last_block is null when there is not registerd consumer.
+ if (unlikely(last_block == NULL || last_block->deleted == 1))
{
ret = -1; goto out;
}
- assert(tblist_item_last != NULL && tblist_item_last->block != NULL);
// check all the blocks expect last block, and dup objects and enqueue them.
- struct tunnel_block_list_item * tblist_item_iter;
- TAILQ_FOREACH(tblist_item_iter, &prod->block_list, next)
+ for (int block_id = 0; block_id < prod->max_idx; block_id++)
{
- if (tblist_item_iter->block == tblist_item_last->block) break;
+ block = prod->block_list[block_id];
+ if (block == NULL || block->deleted == 1)
+ {
+ continue;
+ }
void * dup_objects[MR_LIBVNODE_MAX_SZ_BURST];
unsigned int nr_dup_objects = 0;
@@ -845,33 +885,36 @@ int vnode_enqueue_burst_with_hash(struct vnode_prod * prod, struct vnode_ops * o
}
//TODO: Dup错误之后,hash值如何对应
- tunnel_block_enqueue_with_hash(tblist_item_iter->block, ops, prodq,
+ tunnel_block_enqueue_with_hash(block, ops, prodq,
dup_objects, hash, nr_dup_objects);
}
// enqueue for the last block, use original objects.
- tunnel_block_enqueue_with_hash(tblist_item_last->block, ops, prodq,
+ tunnel_block_enqueue_with_hash(last_block, ops, prodq,
objects, hash, nr_objects);
ret = 0; goto out;
out:
- __read_unlock(&prod->vnode->rwlock);
return ret;
}
int vnode_dequeue_burst(struct vnode_cons * cons, struct vnode_ops * ops,
- unsigned int consq, void * objects[], int nr_max_objects)
+ unsigned int consq, void * objects[], int nr_max_objects)
{
int nr_used = 0;
int nr_left = nr_max_objects;
int nr_dequeue = 0;
-
- __read_lock(&cons->vnode->rwlock);
- struct tunnel_block_list_item * tblist_item_iter;
- TAILQ_FOREACH(tblist_item_iter, &cons->block_list, next)
+ int i = 0;
+ struct tunnel_block* block = NULL;
+ for (i = 0; i < cons->max_idx + 1; i++)
{
- nr_dequeue = tunnel_block_dequeue(tblist_item_iter->block,
+ block = cons->block_list[i];
+ if (block == NULL || block->deleted == 1)
+ {
+ continue;
+ }
+ nr_dequeue = tunnel_block_dequeue(block,
ops, consq, &objects[nr_used], nr_left);
nr_used += nr_dequeue;
nr_left -= nr_dequeue;
@@ -879,6 +922,6 @@ int vnode_dequeue_burst(struct vnode_cons * cons, struct vnode_ops * ops,
assert(nr_left <= nr_max_objects);
assert(nr_used <= nr_max_objects);
- __read_unlock(&cons->vnode->rwlock);
return nr_dequeue;
-} \ No newline at end of file
+}
+
diff --git a/include/internal/mr_common.h b/include/internal/mr_common.h
index 3911a26..51c7b05 100644
--- a/include/internal/mr_common.h
+++ b/include/internal/mr_common.h
@@ -30,6 +30,14 @@ extern "C" {
#define MR_APP_MAX 64
#endif
+#ifndef MR_CONS_MAX
+#define MR_CONS_MAX 16
+#endif
+
+#ifndef MR_PROD_MAX
+#define MR_PROD_MAX 16
+#endif
+
#ifndef MR_SID_MAX
#define MR_SID_MAX 64
#endif
@@ -116,6 +124,14 @@ static inline const char * __str_errno()
return rte_strerror(rte_errno);
}
+#ifdef __GNUC__
+#define container_of(ptr, type, member) ({ \
+ const typeof(((type *)0)->member) *__mptr = (ptr); \
+ (type *)((char *)__mptr - offsetof(type,member));})
+#else
+#define container_of(ptr, type,member) \
+ ((type *)((char *)(ptr) - offsetof(type, member)))
+#endif
#ifdef __cplusplus
}
diff --git a/runtime/CMakeLists.txt b/runtime/CMakeLists.txt
index e2ad0c6..b681298 100644
--- a/runtime/CMakeLists.txt
+++ b/runtime/CMakeLists.txt
@@ -6,6 +6,7 @@ include_directories(${DPDK_INCLUDE_DIR})
add_definitions(${DPDK_C_PREDEFINED})
include_directories(include)
-add_library(mruntime src/app.c src/ctx.c src/runtime.c src/hwinfo.c src/id.c src/export.c)
+add_library(mruntime src/app.c src/ctx.c src/runtime.c src/hwinfo.c src/id.c
+ src/event.c src/export.c)
target_link_libraries(mruntime PRIVATE rt pthread dl)
target_include_directories(mruntime INTERFACE "${CMAKE_CURRENT_SOURCE_DIR}/include/") \ No newline at end of file
diff --git a/runtime/include/mr_runtime.h b/runtime/include/mr_runtime.h
index 94e27f6..7704e1a 100644
--- a/runtime/include/mr_runtime.h
+++ b/runtime/include/mr_runtime.h
@@ -61,4 +61,9 @@ unsigned int mr_appinfo_get_nr_max_thread(struct appinfo * appinfo);
void * mr_app_priv_get();
void mr_app_priv_set(void * ptr);
void * mr_thread_priv_get();
-void mr_thread_priv_set(void * ptr); \ No newline at end of file
+void mr_thread_priv_set(void * ptr);
+
+typedef void(*app_event_cb_t)(int app_event, const char * appsym,
+ struct appinfo * appinfo, void * arg);
+int mr_app_crash_cb_register(app_event_cb_t cb, void * arg);
+int mr_app_crash_raiser_register(const char * appsym); \ No newline at end of file
diff --git a/runtime/include/runtime.h b/runtime/include/runtime.h
index 744f9d0..5fcc08e 100644
--- a/runtime/include/runtime.h
+++ b/runtime/include/runtime.h
@@ -5,6 +5,8 @@
struct app_manager;
struct hwinfo_manager;
struct id_manager;
+struct app_ev_manager;
+struct appinfo;
#ifndef MR_RUNTIME_MAX_NR_EXIT_HANDLER
#define MR_RUNTIME_MAX_NR_EXIT_HANDLER 64
@@ -16,6 +18,7 @@ struct mr_runtime_ctx
struct app_manager * app_ctx;
struct hwinfo_manager * hwinfo_ctx;
struct id_manager * id_ctx;
+ struct app_ev_manager * ev_ctx;
};
extern struct app_manager * app_manager_create();
@@ -38,6 +41,9 @@ extern int app_manager_appinfo_unregister(struct app_manager * object);
extern int app_manager_thread_register(struct app_manager * object);
extern int app_manager_thread_unregister(struct app_manager * object);
extern int app_manager_appinfo_iterate(struct app_manager * object, struct appinfo ** appinfo);
+extern struct appinfo * app_mamager_appinfo_lookup(struct app_manager * object, const char * appsym);
+extern void app_mamager_crash_event_handler(int app_event, const char * appsym,
+ struct appinfo * appinfo, void * arg);
// ���������������
extern struct id_manager * id_manager_create();
@@ -51,4 +57,13 @@ extern int id_manager_release_gsid(struct id_manager * object, thread_id_t start
extern int mr_global_ctx_init();
extern int mr_global_ctx_slave_init();
extern int mr_global_cfg_init();
-extern int mr_global_cfg_slave_init(); \ No newline at end of file
+extern int mr_global_cfg_slave_init();
+
+// �������
+struct app_ev_manager * app_ev_manager_create(struct mr_runtime_ctx * rt_ctx);
+void app_ev_manager_destory(struct app_ev_manager * object);
+
+typedef void(*app_event_cb_t)(int app_event, const char * appsym, struct appinfo * appinfo, void * arg);
+int app_crash_cb_register(struct app_ev_manager * object, app_event_cb_t cb, void * arg);
+int app_crash_raiser_register(struct app_ev_manager * object, const char * appsym);
+void * app_thread_crash_monitor(void * arg); \ No newline at end of file
diff --git a/runtime/src/app.c b/runtime/src/app.c
index f7e2ac3..ea1980a 100644
--- a/runtime/src/app.c
+++ b/runtime/src/app.c
@@ -457,6 +457,21 @@ int app_manager_appinfo_unregister(struct app_manager * object)
return ret;
}
+#if 0
+typedef void(*app_event_cb_t)(int app_event, const char * appsym,
+ struct appinfo * appinfo, void * arg);
+#endif
+
+void app_mamager_crash_event_handler(int app_event, const char * appsym,
+ struct appinfo * appinfo, void * arg)
+{
+ struct app_manager * object = (struct app_manager *)arg;
+
+ mr_spin_lock(&object->lock);
+ __appinfo_delete_unsafe(object, appinfo);
+ mr_spin_unlock(&object->lock);
+}
+
int app_manager_thread_register(struct app_manager * object)
{
struct appinfo * pinfo = currect_app_info;
@@ -521,6 +536,16 @@ int app_manager_list_all_appinfo(struct app_manager * object,
return ret;
}
+struct appinfo * app_mamager_appinfo_lookup(struct app_manager * object,
+ const char * appsym)
+{
+ struct appinfo * result = NULL;
+ mr_spin_lock(&object->lock);
+ result = __appinfo_lookup_unsafe(object, appsym);
+ mr_spin_unlock(&object->lock);
+ return result;
+}
+
int app_manager_tinfo_iterate(struct appinfo * appinfo,
struct thread_info ** tinfo, int * iterate)
{
diff --git a/runtime/src/event.c b/runtime/src/event.c
new file mode 100644
index 0000000..adabe4e
--- /dev/null
+++ b/runtime/src/event.c
@@ -0,0 +1,444 @@
+/* \brief Ӧ�ó����쳣״̬������¼�֪ͨ
+ *
+ * ���Ӧ���쳣�˳�״̬����֪ͨ�����̸�ģ�����Ӧ��ʹ�õ���Դ
+ * TODO: �ع����δ��룬֧�ָ�����첽�ź�֪ͨ����
+ *
+ * \author Qiuwen Lu<[email protected]>
+ * \date 2016-12-09
+ */
+
+#include <errno.h>
+#include <string.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <netinet/in.h>
+#include <sys/epoll.h>
+#include <assert.h>
+#include <rte_malloc.h>
+#include <rte_devargs.h>
+
+#include <mr_common.h>
+#include <mr_runtime.h>
+#include <runtime.h>
+
+TAILQ_HEAD(crash_ev_raiser_list, crash_ev_raiser);
+TAILQ_HEAD(crash_ev_cb_list, crash_ev_cb);
+
+/* �����¼�Դ */
+struct crash_ev_raiser
+{
+ TAILQ_ENTRY(crash_ev_raiser) next;
+ struct appinfo * appinfo;
+ char appsym[MR_SYMBOL_MAX];
+ int connect_fd;
+};
+
+/* �����¼��������� */
+struct crash_ev_cb
+{
+ TAILQ_ENTRY(crash_ev_cb) next;
+ app_event_cb_t cb;
+ void * arg;
+};
+
+/* �����¼�Դע��ָ�ͨ��TCP���䣩 */
+struct crash_raiser_reg_cmd
+{
+ /* ���� */
+ char symlen;
+ /* ���� */
+ char appsym[MR_SYMBOL_MAX];
+};
+
+struct app_ev_manager
+{
+#define __MAX_CRASH_CBS 64
+
+ /* Rt��� */
+ struct mr_runtime_ctx * rt_ctx;
+ /* Ӧ�ñ����¼�Դ */
+ struct crash_ev_raiser_list crash_ev_raiser_list;
+ /* Ӧ�ñ����¼������ص����� */
+ struct crash_ev_cb_list crash_ev_cb_list;
+ /* Ӧ�ñ�������ַ */
+ struct sockaddr_in crash_sockaddr;
+};
+
+struct app_ev_manager * app_ev_manager_create(struct mr_runtime_ctx * rt_ctx)
+{
+ struct app_ev_manager * object = rte_zmalloc(NULL, sizeof(struct app_ev_manager), 0);
+ MR_CHECK(object != NULL, "Cannot alloc memory for app event manager");
+
+ object->rt_ctx = rt_ctx;
+ TAILQ_INIT(&object->crash_ev_cb_list);
+ TAILQ_INIT(&object->crash_ev_raiser_list);
+ return object;
+}
+
+void app_ev_manager_destory(struct app_ev_manager * object)
+{
+ rte_free(object);
+}
+
+/* ע������������� */
+int app_crash_cb_register(struct app_ev_manager * object, app_event_cb_t cb,
+ void * arg)
+{
+ struct crash_ev_cb * ev_cb = rte_zmalloc(NULL, sizeof(struct crash_ev_cb), 0);
+ MR_CHECK(ev_cb != NULL, "Cannot alloc memory for crash_ev_cb");
+
+ ev_cb->cb = cb;
+ ev_cb->arg = arg;
+ TAILQ_INSERT_TAIL(&object->crash_ev_cb_list, ev_cb, next);
+ return 0;
+}
+
+/* ע������¼�Դ */
+int app_crash_raiser_register(struct app_ev_manager * object, const char * appsym)
+{
+ int conn_fd = socket(AF_INET, SOCK_STREAM, 0);
+ if (unlikely(conn_fd < 0))
+ {
+ MR_LOG(ERR, BASE, "Cannot create socket for crash raiser register : %s. \n",
+ strerror(errno));
+ return -1;
+ }
+
+ int ret = connect(conn_fd, (const struct sockaddr *)&object->crash_sockaddr,
+ sizeof(object->crash_sockaddr));
+
+ if (unlikely(ret < 0))
+ {
+ MR_LOG(ERR, BASE, "Connect to crash monitor failed : %s", strerror(errno));
+ return -2;
+ }
+
+ struct crash_raiser_reg_cmd reg_cmd;
+ snprintf(reg_cmd.appsym, sizeof(reg_cmd.appsym), "%s", appsym);
+ reg_cmd.symlen = strlen(reg_cmd.appsym);
+
+ do {
+ ret = send(conn_fd, &reg_cmd, sizeof(reg_cmd), MSG_NOSIGNAL);
+ } while (ret == -1 && (errno == EINTR));
+
+ assert(ret == sizeof(reg_cmd));
+ return 0;
+}
+
+/* �¼���������ע��Ӧ�� */
+static int app_crash_handle_new_raiser(struct app_ev_manager * object,
+ const char * appsym, int conn_fd)
+{
+ struct mr_runtime_ctx * rt_ctx = object->rt_ctx;
+ struct app_manager * app_manager = rt_ctx->app_ctx;
+
+ struct appinfo * appinfo = app_mamager_appinfo_lookup(app_manager, appsym);
+ if (appinfo == NULL)
+ {
+ MR_LOG(ERR, BASE, "Application %s does not existed, "
+ "cannot monitor its crash state. \n", appsym);
+ return -1;
+ }
+
+ struct crash_ev_raiser * raiser;
+
+#ifndef _NDEBUG
+ TAILQ_FOREACH(raiser, &object->crash_ev_raiser_list, next)
+ {
+ assert(strcmp(raiser->appsym, appsym) != 0);
+ }
+#endif
+
+ raiser = rte_zmalloc(NULL, sizeof(struct crash_ev_raiser), 0);
+ MR_CHECK(raiser != NULL, "Cannot alloc memory for crash raiser. ");
+
+ snprintf(raiser->appsym, sizeof(raiser->appsym), "%s", appsym);
+ raiser->appinfo = appinfo;
+ raiser->connect_fd = conn_fd;
+ TAILQ_INSERT_TAIL(&object->crash_ev_raiser_list, raiser, next);
+
+ MR_LOG(INFO, BASE, "Crash raiser from application %s registed. \n", appsym);
+ return 0;
+}
+
+/* �¼�������Ӧ�ñ��� */
+static int app_crash_handle_crash_raiser(struct app_ev_manager * object,
+ struct crash_ev_raiser * raiser)
+{
+ MR_LOG(INFO, BASE, "Crash raise from applicaion %s. \n", raiser->appsym);
+
+ struct crash_ev_cb * cb;
+ TAILQ_FOREACH(cb, &object->crash_ev_cb_list, next)
+ {
+ cb->cb(0, raiser->appsym, raiser->appinfo, cb->arg);
+ }
+
+ TAILQ_REMOVE(&object->crash_ev_raiser_list, raiser, next);
+ rte_free(raiser);
+ return 0;
+}
+
+#define __LISTEN_PORT_START 1024
+#define __LISTEN_PORT_END 65535
+#define __LISTEN_BACKEND 16
+
+static int __create_monitor_fd(struct sockaddr_in * sockaddr_in)
+{
+ int try_times_max = 36;
+ int success = 0;
+
+ // �������ض˿ڣ�TCP����
+ int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
+ if(unlikely(listen_fd < 0))
+ {
+ MR_LOG(ERR, BASE, "Create crash monitor listen fd failed : %s. \n",
+ strerror(errno));
+ goto out;
+ }
+
+ int reuse = 1;
+ if (unlikely(setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR,
+ (const void*)&reuse, sizeof(int)) < 0))
+ {
+ MR_LOG(ERR, BASE, "Set crash monitor listen fd reuse failed : %s\n",
+ strerror(errno));
+ goto out;
+ }
+
+ while (try_times_max >= 0)
+ {
+ unsigned int random_port;
+
+ // ���ѡ��һ�������˿ڣ����ؼ���
+ random_port = rand() % (__LISTEN_PORT_END - __LISTEN_PORT_START) + __LISTEN_PORT_START;
+ assert(random_port >= __LISTEN_PORT_START && random_port <= __LISTEN_PORT_END);
+
+ memset(sockaddr_in, 0, sizeof(struct sockaddr_in));
+ sockaddr_in->sin_family = AF_INET;
+ sockaddr_in->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+ sockaddr_in->sin_port = htons(random_port);
+
+ int ret = bind(listen_fd, (struct sockaddr *)sockaddr_in,
+ sizeof(struct sockaddr_in));
+
+ // �˿ڱ�ռ�ã�����ѡ��һ�����õĶ˿�
+ if (ret < 0 && errno == EADDRINUSE)
+ {
+ try_times_max--;
+ continue;
+ }
+
+ // ��������
+ else if (ret < 0)
+ {
+ MR_LOG(ERR, BASE, "Bind crash monitor listen fd(PORT=%d) failed : %s.\n",
+ random_port, strerror(errno));
+ goto out;
+ }
+
+ ret = listen(listen_fd, 20);
+ if(ret < 0)
+ {
+ MR_LOG(ERR, BASE, "Listen crash monitor listen fd(PORT=%d) failed : %s.\n",
+ random_port, strerror(errno));
+ goto out;
+ }
+
+ success = 1;
+ break;
+ }
+
+ if(success)
+ {
+ MR_LOG(INFO, BASE, "Crash monitor listen tcp fd(port=%d) create success.\n",
+ ntohs(sockaddr_in->sin_port));
+ return listen_fd;
+ }
+
+out:
+ if (listen_fd > 0) close(listen_fd);
+ return -1;
+}
+
+#define __EV_MAX_EVENTS 16
+#define __EV_MAX_BUFFER 2048
+
+
+// ���¹��ߺ�����MESATCP�н�ȡ
+static int __tcp_setnonblock(int sockfd)
+{
+ int flags = fcntl(sockfd, F_GETFL, 0);
+ if (flags == -1) return -1;
+ int ret = fcntl(sockfd, F_SETFL, flags | O_NONBLOCK, 0);
+ if (ret == -1) return -1;
+ return 0;
+}
+
+static int __update_epoll_event(int epoll_fd, int epoll_mode, int fd, int epoll_type)
+{
+ struct epoll_event event = { 0 };
+ event.data.fd = fd;
+ event.events = epoll_type;
+
+ __tcp_setnonblock(fd);
+ if (-1 == epoll_ctl(epoll_fd, epoll_mode, fd, &event))
+ {
+ MR_LOG(ERR, BASE, "update epoll event failed : %s\n",
+ strerror(errno)); return -1;
+ }
+
+ return 0;
+}
+
+int __epoll_add_event(int epoll_fd, int sd, int epoll_type)
+{
+ return __update_epoll_event(epoll_fd, EPOLL_CTL_ADD, sd, epoll_type);
+}
+
+int __epoll_del_event(int epoll_fd, int sd, int epoll_type)
+{
+ return __update_epoll_event(epoll_fd, EPOLL_CTL_DEL, sd, epoll_type);
+}
+
+int __epoll_mod_event(int epoll_fd, int sd, int epoll_type)
+{
+ return __update_epoll_event(epoll_fd, EPOLL_CTL_MOD, sd, epoll_type);
+}
+
+/* �����½��ļ������� */
+void __handle_accept(struct app_ev_manager * object, int epoll_fd, int listen_fd)
+{
+ struct sockaddr_in remote_addr;
+ socklen_t sz_remote_addr = sizeof(remote_addr);
+
+ int conn_fd = accept(listen_fd, (struct sockaddr *)&remote_addr, &sz_remote_addr);
+ if(conn_fd < 0)
+ {
+ MR_LOG(ERR, BASE, "Accept remote connnection failed : %s", strerror(errno));
+ return;
+ }
+
+ __epoll_add_event(epoll_fd, conn_fd, EPOLLIN);
+ return;
+}
+
+// �������ӹر���Ϣ�����ӹرմ���Ӧ�ñ���
+void __handle_close(struct app_ev_manager * object, int epoll_fd, int fd)
+{
+ struct crash_ev_raiser * raiser_iter;
+ TAILQ_FOREACH(raiser_iter, &object->crash_ev_raiser_list, next)
+ {
+ if (raiser_iter->connect_fd != fd) continue;
+ app_crash_handle_crash_raiser(object, raiser_iter);
+ }
+
+ __epoll_del_event(epoll_fd, fd, EPOLLIN);
+ close(fd);
+ return;
+}
+
+/* �����������ӵ�������� */
+void __handle_read(struct app_ev_manager * object, int epoll_fd, int fd)
+{
+ // ���뻺��������������С����ָ���
+ char __read_buffer[sizeof(struct crash_raiser_reg_cmd)];
+ ssize_t __read_buffer_used = 0;
+ ssize_t ret = 0;
+
+ while ((ret = read(fd, __read_buffer + __read_buffer_used,
+ sizeof(__read_buffer) - __read_buffer_used)) > 0)
+ {
+ __read_buffer_used += ret;
+ }
+
+ // ���ݶ���������ָ��
+ if(__read_buffer_used == sizeof(struct crash_raiser_reg_cmd))
+ {
+ struct crash_raiser_reg_cmd * cmd = (struct crash_raiser_reg_cmd *)__read_buffer;
+ if (cmd->symlen != strnlen(cmd->appsym, sizeof(cmd->appsym))) return;
+ app_crash_handle_new_raiser(object, cmd->appsym, fd);
+ return;
+ }
+
+ if(ret < 0 && (errno == EWOULDBLOCK || errno == EAGAIN))
+ {
+ return;
+ }
+
+ if (ret == 0)
+ {
+ __handle_close(object, epoll_fd, fd);
+ return;
+ }
+
+ //TODO: ���������������
+ assert(0);
+ return;
+}
+
+/* ��������߳� */
+//TODO: ���Epoll��ͨ�Ż��ƣ������Libevent��
+
+void * app_thread_crash_monitor(void * arg)
+{
+ struct app_ev_manager * object = (struct app_ev_manager *)arg;
+ pthread_detach(pthread_self());
+
+ int listen_fd = __create_monitor_fd(&object->crash_sockaddr);
+ if(listen_fd < 0)
+ {
+ MR_LOG(ERR, BASE, "Launch crash monitor thread failed : "
+ "Cannot create listen fd. \n"); goto errout;
+ }
+
+ int epoll_fd = epoll_create(16);
+ if(epoll_fd < 0)
+ {
+ MR_LOG(ERR, BASE, "Launch crash monitor thread failed : "
+ "Cannot create epoll fd. \n"); goto errout;
+ }
+
+ struct epoll_event evlist[__EV_MAX_EVENTS];
+ struct epoll_event ev;
+
+ ev.events = EPOLLIN;
+ ev.data.fd = listen_fd;
+ if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev) < 0)
+ {
+ MR_LOG(ERR, BASE, "Crash monitor thread failed : "
+ "Add listenfd to epoll list error : %s \n", strerror(errno));
+ goto errout;
+ }
+
+ while(1)
+ {
+ int ret = epoll_wait(epoll_fd, evlist, __EV_MAX_EVENTS, 0);
+ if (ret == -1 && errno == EINTR) continue;
+ else if(ret == -1)
+ {
+ MR_LOG(ERR, BASE, "Crash monitor thread failed : "
+ "waiting on epoll fd : %s", strerror(errno));
+ goto errout;
+ }
+
+ for(int i = 0; i < ret; i++)
+ {
+ int fd = evlist[i].data.fd;
+ if(evlist[i].events & EPOLLIN)
+ {
+ if (fd == listen_fd) __handle_accept(object, epoll_fd, fd);
+ else __handle_read(object, epoll_fd, fd);
+ continue;
+ }
+
+ if(evlist[i].events & ( EPOLLHUP | EPOLLERR))
+ {
+ __handle_close(object, epoll_fd, fd);
+ }
+ }
+ }
+
+errout:
+ MR_LOG(ERR, BASE, "Crash monitor thread is exiting. \n");
+ return (void *)NULL;
+} \ No newline at end of file
diff --git a/runtime/src/export.c b/runtime/src/export.c
index 6a145c3..a4b755a 100644
--- a/runtime/src/export.c
+++ b/runtime/src/export.c
@@ -103,3 +103,18 @@ int mr_id_manager_release_gsid(thread_id_t start_gsid,
struct mr_runtime_ctx * rt_ctx = (struct mr_runtime_ctx *)g_ctx->ctx_runtime;
return id_manager_release_gsid(rt_ctx->id_ctx, start_gsid, nr_thread);
}
+
+// ������⵼������
+int mr_app_crash_cb_register(app_event_cb_t cb, void * arg)
+{
+ struct mr_global_ctx * g_ctx = mr_global_ctx_get();
+ struct mr_runtime_ctx * rt_ctx = (struct mr_runtime_ctx *)g_ctx->ctx_runtime;
+ return app_crash_cb_register(rt_ctx->ev_ctx, cb, arg);
+}
+
+int mr_app_crash_raiser_register(const char * appsym)
+{
+ struct mr_global_ctx * g_ctx = mr_global_ctx_get();
+ struct mr_runtime_ctx * rt_ctx = (struct mr_runtime_ctx *)g_ctx->ctx_runtime;
+ return app_crash_raiser_register(rt_ctx->ev_ctx, appsym);
+} \ No newline at end of file
diff --git a/runtime/src/runtime.c b/runtime/src/runtime.c
index 37bec21..88e912e 100644
--- a/runtime/src/runtime.c
+++ b/runtime/src/runtime.c
@@ -43,6 +43,14 @@ void mr_runtime_init()
rt_ctx->id_ctx = id_manager_create();
MR_CHECK(rt_ctx->id_ctx != NULL, "RuntimeInit, ID Manager initialize failed. ");
+
+ rt_ctx->ev_ctx = app_ev_manager_create(rt_ctx);
+ MR_CHECK(rt_ctx->ev_ctx != NULL, "RuntimeInit, App event initialize failed. ");
+
+ pthread_t pid_crash_monitor;
+ pthread_create(&pid_crash_monitor, NULL, app_thread_crash_monitor, (void *)rt_ctx->ev_ctx);
+
+ app_crash_cb_register(rt_ctx->ev_ctx, app_mamager_crash_event_handler, rt_ctx->app_ctx);
g_ctx->ctx_runtime = rt_ctx;
return;
diff --git a/service/src/rxtx.c b/service/src/rxtx.c
index d52df25..5a5a581 100644
--- a/service/src/rxtx.c
+++ b/service/src/rxtx.c
@@ -26,7 +26,7 @@ int rt_serv_device_rx_bulk(struct rtdev_desc * dev_desc, thread_id_t sid,
int ret = vnode_enqueue_burst_with_hash(dev_desc->vnode_prod_rx,
&dev_desc->ops, sid, (void **)mbufs_in, hash, nr_mbufs_in);
- if (likely(ret < 0)) return 0;
+ if (unlikely(ret < 0)) return 0;
for (int i = 0; i < nr_mbufs_in; i++) mbufs_in[i] = NULL;
return nr_mbufs_in;
}