diff options
| author | Qiuwen Lu <[email protected]> | 2016-12-13 15:05:00 +0800 |
|---|---|---|
| committer | Qiuwen Lu <[email protected]> | 2016-12-13 15:05:00 +0800 |
| commit | 797b8fcb516d64edabbe11033a352f2df1c09457 (patch) | |
| tree | 19e114e4b5fb33f91f87b1101a61d72b86f4a69b | |
| parent | 0bd00dfe4d5f01ebda9ebba8a1d889db40a05ce3 (diff) | |
| parent | 43cc4a9ddebc27c9c4e584fcc75424a890bf00e5 (diff) | |
Merge branch 'rel-4.X.X' of 10.0.6.226:mesa_platform/marsio into rel-4.X.Xv4.0.9-20161213
| -rw-r--r-- | app/src/marsio.c | 12 | ||||
| -rw-r--r-- | core/include/mr_core.h | 4 | ||||
| -rw-r--r-- | core/include/mr_rtdev.h | 9 | ||||
| -rw-r--r-- | core/include/mr_vnode.h | 13 | ||||
| -rw-r--r-- | core/src/core.c | 10 | ||||
| -rw-r--r-- | core/src/rtdev.c | 91 | ||||
| -rw-r--r-- | core/src/vman.c | 472 | ||||
| -rw-r--r-- | core/src/vnode.c | 339 | ||||
| -rw-r--r-- | include/internal/mr_common.h | 16 | ||||
| -rw-r--r-- | runtime/CMakeLists.txt | 3 | ||||
| -rw-r--r-- | runtime/include/mr_runtime.h | 7 | ||||
| -rw-r--r-- | runtime/include/runtime.h | 17 | ||||
| -rw-r--r-- | runtime/src/app.c | 25 | ||||
| -rw-r--r-- | runtime/src/event.c | 444 | ||||
| -rw-r--r-- | runtime/src/export.c | 15 | ||||
| -rw-r--r-- | runtime/src/runtime.c | 8 | ||||
| -rw-r--r-- | service/src/rxtx.c | 2 |
17 files changed, 829 insertions, 658 deletions
diff --git a/app/src/marsio.c b/app/src/marsio.c index 3c3a875..2b25cb4 100644 --- a/app/src/marsio.c +++ b/app/src/marsio.c @@ -154,12 +154,12 @@ static int mrapp_init(const char * appsym, struct mrapp_config * config) { // У����� int ret = mrapp_config_check(appsym, config); - if(ret < 0) + if (ret < 0) { MR_LOG(ERR, MRLIB, "Marsio config check failed. \n"); return ret; } - + // Ӧ��ע�� ret = mr_app_register(appsym, config->coremask, config->autoexit); if (ret < 0) @@ -168,6 +168,14 @@ static int mrapp_init(const char * appsym, struct mrapp_config * config) return -1; } + // ������� + ret = mr_app_crash_raiser_register(appsym); + if(ret < 0) + { + MR_LOG(ERR, MRLIB, "Cannot register application %s crash monitor. \n", appsym); + return -2; + } + MR_LOG(INFO, MRLIB, "Application %s register success. \n", appsym); // ������̵�Privָ�벻Ϊ�գ�˵����ǰ���й����ָ������� diff --git a/core/include/mr_core.h b/core/include/mr_core.h index 7d96417..6bba16f 100644 --- a/core/include/mr_core.h +++ b/core/include/mr_core.h @@ -1,5 +1,7 @@ #pragma once +#include <sys/queue.h> + // ǰ������������ͷ�ļ���ѭ������ struct mr_global_config; struct devman_ctx; @@ -8,6 +10,8 @@ struct vnodeman_ctx; struct mrb_pool_t; struct rtdev_desc_list; +TAILQ_HEAD(rtdev_desc_list, rtdev_desc); + // Core��ʵ���ṹ�壨ȫ�֣� struct mr_core_instance { diff --git a/core/include/mr_rtdev.h b/core/include/mr_rtdev.h index bd08a26..52b2758 100644 --- a/core/include/mr_rtdev.h +++ b/core/include/mr_rtdev.h @@ -7,7 +7,6 @@ * */ - #include <rte_mempool.h> #include <mr_core.h> #include <mr_vnode.h> @@ -71,8 +70,6 @@ struct rtdev_stat_info uint64_t ftx_missed[MR_SID_MAX]; }; -TAILQ_HEAD(rtdev_desc_list, rtdev_desc); - // ����һ������ʱ�����豸 struct rtdev_desc * mr_rtdev_create(struct mr_core_instance * instance, const char * devsym, struct rte_mempool * pool, unsigned int nr_serv_thread, unsigned int sz_tunnel, unsigned int sz_buffer); @@ -88,4 +85,8 @@ int mr_rt_device_close(struct rtdev_app_desc * desc); // ����ʱ�豸��Ϣͳ�� int mr_rtdev_stats_get(struct rtdev_desc * dev_desc, struct rtdev_stat_info * stat_info); -int mr_rtdev_app_stats_get(struct rtdev_app_desc * app_desc, struct rtdev_stat_info * stat_info);
\ No newline at end of file +int mr_rtdev_app_stats_get(struct rtdev_app_desc * app_desc, struct rtdev_stat_info * stat_info); + +#include <mr_runtime.h> +void rt_dev_app_crash_event_handler(int app_event, const char * appsym, + struct appinfo * appinfo, void * arg);
\ No newline at end of file diff --git a/core/include/mr_vnode.h b/core/include/mr_vnode.h index f650102..b6a9b03 100644 --- a/core/include/mr_vnode.h +++ b/core/include/mr_vnode.h @@ -56,9 +56,9 @@ struct vnode_cons TAILQ_ENTRY(vnode_cons) next; char symbol[MR_SYMBOL_MAX]; struct vnode * vnode; - struct vnode_consq * consq; unsigned int nr_consq; - struct tunnel_block_list block_list; + int max_idx; + struct tunnel_block* block_list[MR_PROD_MAX]; unsigned int nr_block; unsigned int cur_attach; struct vnode_cons_stat stat[MR_SID_MAX]; @@ -70,12 +70,13 @@ struct vnode_prod TAILQ_ENTRY(vnode_prod) next; char symbol[MR_SYMBOL_MAX]; struct vnode * vnode; - struct vnode_prodq * prodq; unsigned int nr_prodq; - struct tunnel_block_list block_list; + int max_idx; + struct tunnel_block* block_list[MR_CONS_MAX]; unsigned int nr_block; unsigned int cur_attach; struct vnode_prod_stat stat[MR_SID_MAX]; + }; /* Virtual Data-Node Consumer Queue Structure */ @@ -113,7 +114,7 @@ struct vnode /* Tunnel Enqueue Buffer Size */ unsigned int sz_tunnel_buffer; /* Read-Write Lock */ - mr_spin_rwlock_t rwlock; + mr_spin_rwlock_t rwlock; //guarantees one operator(consumer or producer, create or destroy) a time }; /* Virtual Data-Node Operation Callback Structure @@ -173,4 +174,4 @@ int vnode_prod_attach(struct vnode * node, struct vnode_prod* prod); #ifdef __cplusplus } -#endif
\ No newline at end of file +#endif diff --git a/core/src/core.c b/core/src/core.c index 20e7dde..2067080 100644 --- a/core/src/core.c +++ b/core/src/core.c @@ -10,6 +10,8 @@ #include <rte_ip_frag.h> #include <mr_core.h> #include "mr_buffer.h" +#include "runtime.h" +#include "mr_rtdev.h" // �ⲿ�����������豸������ extern int devman_init(struct mr_core_instance * core_instance); @@ -54,8 +56,14 @@ struct mr_core_instance * mr_core_instance_create(struct mr_core_config* config) struct mrb_pool_t * pool = MRB_pool_handle_create(); MR_CHECK(pool != NULL, "Failed at MRB Pool create. "); - instance->mrb_pool_handle = pool; + + instance->rtdev_list = rte_zmalloc(NULL, sizeof(struct rtdev_desc_list), 0); + MR_CHECK(instance->rtdev_list != NULL, "Cannot alloc memory for Rt-device list"); + TAILQ_INIT(instance->rtdev_list); + + // ע������������� + mr_app_crash_cb_register(rt_dev_app_crash_event_handler, instance); return instance; } diff --git a/core/src/rtdev.c b/core/src/rtdev.c index d7cd139..bd207bf 100644 --- a/core/src/rtdev.c +++ b/core/src/rtdev.c @@ -132,13 +132,6 @@ int mr_rtdev_stats_get(struct rtdev_desc * dev_desc, struct rtdev_stat_info * st struct rtdev_desc * mr_rtdev_create(struct mr_core_instance * instance, const char * devsym, struct rte_mempool * pool, unsigned int nr_serv_thread, unsigned int sz_tunnel, unsigned int sz_buffer) { - // �б���û�г�ʼ������������ʱ�б���ʼ�� - if(instance->rtdev_list == NULL) - { - instance->rtdev_list = rte_zmalloc(NULL, sizeof(struct rtdev_desc_list), 0); - TAILQ_INIT(instance->rtdev_list); - } - // �����������ڴ� struct rtdev_desc * desc = rte_zmalloc(NULL, sizeof(struct rtdev_desc), 0); MR_CHECK(desc != NULL, "Cannot alloc memory for rtdev-desc %s", devsym); @@ -242,20 +235,30 @@ struct rtdev_app_desc * mr_rt_device_open(struct mr_core_instance * instance, // ΪRX����ע�������� char vnodesym[MR_SYMBOL_MAX]; - snprintf(vnodesym, sizeof(vnodesym), __PATTERN_CONS_APP, appsym); - desc->vnode_cons_rx = vnode_create_cons(dev_desc->vnode_rx, &desc->ops, vnodesym, nr_rx_stream); - if (desc->vnode_cons_rx == NULL) goto prod_cons_create_failed; + if (nr_rx_stream > 0) + { + snprintf(vnodesym, sizeof(vnodesym), __PATTERN_CONS_APP, appsym); + desc->vnode_cons_rx = vnode_create_cons(dev_desc->vnode_rx, &desc->ops, vnodesym, nr_rx_stream); + if (desc->vnode_cons_rx == NULL) goto prod_cons_create_failed; + } + // ΪTXע�������� - snprintf(vnodesym, sizeof(vnodesym), __PATTERN_CONS_APP, appsym); - desc->vnode_prod_tx = vnode_create_prod(dev_desc->vnode_tx, &desc->ops, vnodesym, nr_tx_stream); - if (desc->vnode_prod_tx == NULL) goto prod_cons_create_failed; - + if (nr_tx_stream > 0) + { + snprintf(vnodesym, sizeof(vnodesym), __PATTERN_CONS_APP, appsym); + desc->vnode_prod_tx = vnode_create_prod(dev_desc->vnode_tx, &desc->ops, vnodesym, nr_tx_stream); + if (desc->vnode_prod_tx == NULL) goto prod_cons_create_failed; + } + // ΪFTXע�������� - snprintf(vnodesym, sizeof(vnodesym), __PATTERN_CONS_APP, appsym); - desc->vnode_prod_ftx = vnode_create_prod(dev_desc->vnode_ftx, &desc->ops, vnodesym, nr_tx_stream); - if (desc->vnode_prod_ftx == NULL) goto prod_cons_create_failed; - + if (nr_tx_stream > 0) + { + snprintf(vnodesym, sizeof(vnodesym), __PATTERN_CONS_APP, appsym); + desc->vnode_prod_ftx = vnode_create_prod(dev_desc->vnode_ftx, &desc->ops, vnodesym, nr_tx_stream); + if (desc->vnode_prod_ftx == NULL) goto prod_cons_create_failed; + } + // ����ע���б� rte_rwlock_write_lock(&dev_desc->app_desc_lock); TAILQ_INSERT_TAIL(&dev_desc->app_desc_list, desc, next); @@ -278,13 +281,38 @@ clean: return NULL; } +int mr_rt_device_iterate(struct mr_core_instance * instance, struct rtdev_desc ** out_dev_desc) +{ + // ������Ϊ�գ���ͷ��ʼ������������ҵ�������һ������ + if (*out_dev_desc == NULL) + { + *out_dev_desc = TAILQ_FIRST(instance->rtdev_list); + return 0; + } + else + { + *out_dev_desc = TAILQ_NEXT(*out_dev_desc, next); + } + + // ������β�������ش����� + if (*out_dev_desc == NULL) return -ENOENT; + return 0; +} + // �ر��豸 int mr_rt_device_close(struct rtdev_app_desc * desc) { struct rtdev_desc * dev_desc = desc->dev_desc; - if (desc->vnode_cons_rx != NULL) vnode_delete_cons(desc->vnode_cons_rx, &desc->ops); - if (desc->vnode_prod_tx != NULL) vnode_delete_prod(desc->vnode_prod_tx, &desc->ops); - if (desc->vnode_prod_ftx != NULL) vnode_delete_prod(desc->vnode_prod_ftx, &desc->ops); + + // �����������̵��ã�����ָ��ָ��ĺ�����ַ��ͬ���������һ�� + struct vnode_ops _ops; + _ops = _rtdev_vnode_ops; + _ops.pool_dup_object = dev_desc->ops.pool_dup_object; + _ops.pool_new_object = dev_desc->ops.pool_new_object; + + if (desc->vnode_cons_rx != NULL) vnode_delete_cons(desc->vnode_cons_rx, &_ops); + if (desc->vnode_prod_tx != NULL) vnode_delete_prod(desc->vnode_prod_tx, &_ops); + if (desc->vnode_prod_ftx != NULL) vnode_delete_prod(desc->vnode_prod_ftx, &_ops); rte_rwlock_write_lock(&dev_desc->app_desc_lock); TAILQ_REMOVE(&dev_desc->app_desc_list, desc, next); @@ -292,4 +320,25 @@ int mr_rt_device_close(struct rtdev_app_desc * desc) rte_free(desc); return 0; +} + +// Ӧ�ñ��������ص����� +void rt_dev_app_crash_event_handler(int app_event, const char * appsym, + struct appinfo * appinfo, void * arg) +{ + struct mr_core_instance * core_instance = (struct mr_core_instance *)arg; + assert(core_instance != NULL); + + struct rtdev_desc * dev_desc_iterate = NULL; + while(mr_rt_device_iterate(core_instance, &dev_desc_iterate) >= 0) + { + struct rtdev_app_desc * app_desc = mr_rtdev_app_lookup( + dev_desc_iterate, appsym); + if (app_desc == NULL) continue; + mr_rt_device_close(app_desc); + MR_LOG(INFO, BASE, "RT-device %s from applicaton %s closed. \n", + dev_desc_iterate->symbol, appsym); + } + + return; }
\ No newline at end of file diff --git a/core/src/vman.c b/core/src/vman.c deleted file mode 100644 index 243727b..0000000 --- a/core/src/vman.c +++ /dev/null @@ -1,472 +0,0 @@ -/* \brief VNodeManager VNode管理器 - * - * 用于对系统中存在的VNode节点进行管理,避免每个进程管理 - * 自己的VNode的节点。 - * - * Author : Qiuwen Lu<[email protected]> - * Date : 2016-09-01 - * - */ - -#include <stdlib.h> -#include <sys/queue.h> -#include <assert.h> -#include <string.h> -#include <rte_log.h> -#include <rte_malloc.h> -#include <rte_mbuf.h> - -#include <mr_common.h> -#include <mr_vnode.h> -#include <mr_vman.h> -#include <mr_core.h> - -struct vnode_record -{ - TAILQ_ENTRY(vnode_record) next; - char sym[MR_SYMBOL_MAX]; - struct vnode * node; -}; - -TAILQ_HEAD(vnode_record_list, vnode_record); - -struct vnodeman_ctx -{ - mr_spinlock_t lock; - struct vnode_record_list vnode_list; -}; - -static void __wrap_object_delete(void * object) -{ - return rte_pktmbuf_free(object); -} - -//TODO: 报文缓冲区池决定机制 -static void * __wrap_object_new(void * object) -{ - return rte_pktmbuf_alloc(mrbuf_get_mempool(0)); -} - -static void * __wrap_object_dup(void * object) -{ - return rte_pktmbuf_clone(object, mrbuf_get_mempool(0)); -} - -// VNode操作指针 -struct vnode_ops vnodeman_ops = -{ - .op_object_delete = __wrap_object_delete, - .op_object_new = __wrap_object_new, - .op_object_dup = __wrap_object_dup -}; - -struct vnode_record * __vnodeman_lookup_unsafe(struct vnodeman_ctx * ctx_, - const char * sym) -{ - struct vnode_record * node_record_ret = NULL; - struct vnode_record * node_record_iter = NULL; - - TAILQ_FOREACH(node_record_iter, &ctx_->vnode_list, next) - { - if(strncmp(sym, node_record_iter->sym, sizeof(node_record_iter->sym)) == 0) - { - node_record_ret = node_record_iter; - break; - } - } - return node_record_ret; -} - -/** - * \brief 创建VNode操作,内部实现,非线程安全 - * \param ctx_ VNodeman内部句柄 - * \param sym VNode的名称 - * \param sz_tunnel VNode队列长度 - * \param sz_buffer 队列缓冲区长度 - * \param flags 预留,现在没有作用 - * \return 是否成功,>=0成功,<0失败。 - */ -static int __vnodeman_create_unsafe(struct vnodeman_ctx * ctx_, const char * sym, - size_t sz_tunnel, size_t sz_buffer, flags_t flags) -{ - struct vnode * node; - struct vnode_record * node_record; - - // 查找列表,查看注册的名称是否存在 - node_record = __vnodeman_lookup_unsafe(ctx_, sym); - if(node_record != NULL) - { - MR_LOG(INFO, BASE, "VNodeMan, VNodeManCreate, " - "VNode %s existed, failed.\n", sym); - return -1; - } - - // 创建vnode节点 - node = vnode_create(sym, &vnodeman_ops, sz_tunnel, sz_buffer); - if(node == NULL) - { - MR_LOG(INFO, BASE, "VNodeMan, VNodeManCreate, " - "Cannot create vnode, sym=%s, sz_tunnel=%zu, sz_buffer=%zu\n", - sym, sz_tunnel, sz_buffer); - return -2; - } - - // 创建vnode_record链表节点 - node_record = rte_zmalloc(NULL, sizeof(struct vnode_record), 0); - if(node_record == NULL) - { - MR_LOG(INFO, BASE, "VNodeMan, VNodeManCreate, " - "Cannot alloc memory for vnode record, failed.\n"); - return -3; - } - - snprintf(node_record->sym, sizeof(node_record->sym), "%s", sym); - node_record->node = node; - - // 创建成功后,插入ctx中的vnode_record链表 - TAILQ_INSERT_TAIL(&ctx_->vnode_list, node_record, next); - - MR_LOG(INFO, BASE, "VNodeMan, VNodeCreate, " - "Vnode %s(sz_tunnel=%zu, sz_buffer=%zu) created. \n", - node_record->sym, sz_tunnel, sz_buffer); - - return 0; -} - -static int __vnodeman_unregister_consumer_unsafe(struct vnodeman_ctx * ctx_, - const char * vnode_sym, const char * cons_sym) -{ - struct vnode_record * vnode_record = NULL; - struct vnode_cons * cons = NULL; - int ret = 0; - - vnode_record = __vnodeman_lookup_unsafe(ctx_, vnode_sym); - if (vnode_record == NULL) - { - MR_LOG(WARNING, BASE, "VNodeMan, VNodeManUnregisterConsumer, " - "Vnode %s does not existed, failed. \n", vnode_sym); - ret = -1; goto errout; - } - - cons = vnode_cons_lookup(vnode_record->node, cons_sym); - if (cons == NULL) - { - MR_LOG(WARNING, BASE, "VNodeMan, VNodeManUnregisterConsumer, " - "Vnode %s consumer %s does not existed, failed. \n", - vnode_sym, cons_sym); - ret = -2; goto errout; - } - - ret = vnode_delete_cons(cons, &vnodeman_ops); - if(ret < 0) - { - MR_LOG(WARNING, BASE, "VNodeMan, VNodeManUnregisterConsumer, " - "Cannot delete cons %s for vnode %s, failed. \n", - cons_sym, vnode_sym); - ret = -3; goto errout; - } - - return 0; -errout: - return ret; -} - -static int __vnodeman_unregister_producer_unsafe(struct vnodeman_ctx * ctx_, - const char * vnode_sym, const char * prod_sym) -{ - struct vnode_record * vnode_record = NULL; - struct vnode_prod * prod = NULL; - int ret = 0; - - vnode_record = __vnodeman_lookup_unsafe(ctx_, vnode_sym); - if (vnode_record == NULL) - { - MR_LOG(WARNING, BASE, "VNodeMan, VNodeManUnregisterProducer, " - "Vnode %s does not existed, failed. \n", vnode_sym); - ret = -1; goto errout; - } - - prod = vnode_prod_lookup(vnode_record->node, prod_sym); - if (prod == NULL) - { - MR_LOG(WARNING, BASE, "VNodeMan, VNodeManUnregisterProducer, " - "Vnode %s producer %s does not existed, failed. \n", - vnode_sym, prod_sym); - ret = -2; goto errout; - } - - ret = vnode_delete_prod(prod, &vnodeman_ops); - if (ret < 0) - { - MR_LOG(WARNING, BASE, "VNodeMan, VNodeManUnregisterProducer, " - "Cannot delete cons %s for vnode %s, failed. \n", - prod_sym, vnode_sym); - ret = -3; goto errout; - } - - return 0; -errout: - return ret; -} - -static int __vnodeman_register_consumer_unsafe(struct vnodeman_ctx * ctx_, - const char * vnode_sym, const char * cons_sym, unsigned int nr_cons_queue) -{ - struct vnode_record * node_record; - node_record = __vnodeman_lookup_unsafe(ctx_, vnode_sym); - if(node_record == NULL) - { - MR_LOG(INFO, BASE, "VNodeMan, VNodeManRegisterConsumer, " - "Vnode %s does not existed, failed. \n", vnode_sym); - return -1; - } - - struct vnode_cons * cons; - cons = vnode_create_cons(node_record->node, - &vnodeman_ops, cons_sym, nr_cons_queue); - - if(cons == NULL) - { - MR_LOG(INFO, BASE, "VNodeMan, VNodeManRegisterConsumer, " - "Cannot create consumer %s for vnode %s \n", cons_sym, vnode_sym); - return -2; - } - - return 0; -} - -static int __vnodeman_register_producer_unsafe(struct vnodeman_ctx * ctx_, -const char * vnode_sym, const char * prod_sym, unsigned int nr_prod_queue) -{ - struct vnode_record * node_record; - node_record = __vnodeman_lookup_unsafe(ctx_, vnode_sym); - if (node_record == NULL) - { - MR_LOG(INFO, BASE, "VNodeMan, VNodeManRegisterProducer, " - "Vnode %s does not existed, failed. \n", vnode_sym); - return -1; - } - - struct vnode_prod * prod; - prod = vnode_create_prod(node_record->node, - &vnodeman_ops, prod_sym, nr_prod_queue); - - if (prod == NULL) - { - MR_LOG(INFO, BASE, "VNodeMan, VNodeManRegisterProducer, " - "Cannot create consumer %s for vnode %s. \n", prod_sym, vnode_sym); - return -2; - } - - return 0; -} - -struct vnodeman_attach_desc * __vnodeman_consumer_attach_unsafe( - struct vnodeman_ctx * ctx_, const char * vnode_sym, const char * cons_sym) -{ - struct vnodeman_attach_desc * desc; - desc = rte_zmalloc(NULL, sizeof(struct vnodeman_attach_desc), 0); - if(desc == NULL) - { - MR_LOG(INFO, BASE, "VNodeMan, ConsumerAttach, " - "Cannot alloc memory for vnode_attach_desc, failed. \n"); - goto errout; - } - - struct vnode_record * vnode_record = NULL; - struct vnode_cons * cons = NULL; - - vnode_record = __vnodeman_lookup_unsafe(ctx_, vnode_sym); - if(vnode_record == NULL) - { - MR_LOG(INFO, BASE, "VNodeMan, ConsumerAttach, " - "Vnode %s does not existed, failed. \n", vnode_sym); - goto errout; - } - - cons = vnode_cons_lookup(vnode_record->node, cons_sym); - if(cons == NULL) - { - MR_LOG(INFO, BASE, "VNodeMan, ConsumerAttach, " - "Vnode %s consumer %s does not existed, failed. \n", - vnode_sym, cons_sym); - goto errout; - } - - int queue_id; - queue_id = vnode_cons_attach(vnode_record->node, cons); - if(queue_id < 0) - { - MR_LOG(INFO, BASE, "VNodeMan, ConsumerAttach, " - "Cannot attach to vnode %s consumer %s, failed. \n", - vnode_sym, cons_sym); - goto errout; - } - - desc->node = vnode_record->node; - desc->desc_type = VNODEMAN_ATTACH_CONSUMER; - desc->queue_id = queue_id; - desc->cons = cons; - - MR_LOG(INFO, BASE, "VNodeMan, ConsumerAttach, " - "Attached to vnode %s consumer %s, queue_id=%d.\n", - vnode_sym, cons_sym, desc->queue_id); - - return desc; - -errout: - if (desc) rte_free(desc); - return NULL; -} - -struct vnodeman_attach_desc * __vnodeman_producer_attach_unsafe( - struct vnodeman_ctx * ctx_, const char * vnode_sym, const char * prod_sym) -{ - struct vnodeman_attach_desc * desc; - desc = rte_zmalloc(NULL, sizeof(struct vnodeman_attach_desc), 0); - if (desc == NULL) - { - MR_LOG(INFO, BASE, "VNodeMan, ProducerAttach, " - "Cannot alloc memory for vnode_attach_desc, failed. \n"); - goto errout; - } - - struct vnode_record * vnode_record = NULL; - struct vnode_prod * prod = NULL; - - vnode_record = __vnodeman_lookup_unsafe(ctx_, vnode_sym); - if (vnode_record == NULL) - { - MR_LOG(INFO, BASE, "VNodeMan, ProducerAttach, " - "Vnode %s does not existed, failed. \n", vnode_sym); - goto errout; - } - - prod = vnode_prod_lookup(vnode_record->node, prod_sym); - if (prod == NULL) - { - MR_LOG(INFO, BASE, "VNodeMan, ProducerAttach, " - "Vnode %s producer %s does not existed, failed. \n", - vnode_sym, prod_sym); - goto errout; - } - - int queue_id; - queue_id = vnode_prod_attach(vnode_record->node, prod); - if (queue_id < 0) - { - MR_LOG(INFO, BASE, "VNodeMan, ProducerAttach, " - "Cannot attach to vnode %s producer %s, failed. \n", - vnode_sym, prod_sym); - goto errout; - } - - desc->node = vnode_record->node; - desc->desc_type = VNODEMAN_ATTACH_PRODUCER; - desc->queue_id = queue_id; - desc->prod = prod; - - MR_LOG(INFO, BASE, "VNodeMan, ProducerAttach, " - "Attached to vnode %s producer %s, queue_id=%d.\n", - vnode_sym, prod_sym, desc->queue_id); - - return desc; - -errout: - if (desc) rte_free(desc); - return NULL; -} - -// TODO: 上锁粒度太大,应该尽量降低锁的粒度或使用无锁实现。 -int mr_vnodeman_create(const char * sym, size_t sz_tunnel, size_t sz_buffer, flags_t flags) -{ - int ret = 0; - struct vnodeman_ctx * ctx = mr_core_default_instance_get()->vnodeman_ctx; - - mr_spin_lock(&ctx->lock); - ret = __vnodeman_create_unsafe(ctx, sym, sz_tunnel, sz_buffer, flags); - mr_spin_unlock(&ctx->lock); - return ret; -} - -int mr_vnodeman_unregister_consumer(const char * vnode_sym, const char * cons_sym) -{ - int ret = 0; - struct vnodeman_ctx * ctx = mr_core_default_instance_get()->vnodeman_ctx; - - mr_spin_lock(&ctx->lock); - ret = __vnodeman_unregister_consumer_unsafe(ctx, vnode_sym, cons_sym); - mr_spin_unlock(&ctx->lock); - return ret; -} - -int mr_vnodeman_unregister_producer(const char * vnode_sym, const char * cons_sym) -{ - int ret = 0; - struct vnodeman_ctx * ctx = mr_core_default_instance_get()->vnodeman_ctx; - - mr_spin_lock(&ctx->lock); - ret = __vnodeman_unregister_producer_unsafe(ctx, vnode_sym, cons_sym); - mr_spin_unlock(&ctx->lock); - return ret; -} - -int mr_vnodeman_register_consumer(const char * vnode_sym, const char * cons_sym, - unsigned int nr_cons_queue) -{ - int ret = 0; - struct vnodeman_ctx * ctx = mr_core_default_instance_get()->vnodeman_ctx; - - mr_spin_lock(&ctx->lock); - ret = __vnodeman_register_consumer_unsafe(ctx, vnode_sym, cons_sym, nr_cons_queue); - mr_spin_unlock(&ctx->lock); - return ret; -} - -int mr_vnodeman_register_producer(const char * vnode_sym, const char * prod_sym, - unsigned int nr_prod_queue) -{ - int ret = 0; - struct vnodeman_ctx * ctx = mr_core_default_instance_get()->vnodeman_ctx; - - mr_spin_lock(&ctx->lock); - ret = __vnodeman_register_producer_unsafe(ctx, vnode_sym, prod_sym, nr_prod_queue); - mr_spin_unlock(&ctx->lock); - return ret; -} - -struct vnodeman_attach_desc * mr_vnodeman_consumer_attach(const char * vnode_sym, - const char * cons_sym) -{ - struct vnodeman_ctx * ctx = mr_core_default_instance_get()->vnodeman_ctx; - struct vnodeman_attach_desc * desc; - mr_spin_lock(&ctx->lock); - desc = __vnodeman_consumer_attach_unsafe(ctx, vnode_sym, cons_sym); - mr_spin_unlock(&ctx->lock); - return desc; -} - -struct vnodeman_attach_desc * mr_vnodeman_producer_attach(const char * vnode_sym, - const char * prod_sym) -{ - struct vnodeman_ctx * ctx = mr_core_default_instance_get()->vnodeman_ctx; - - struct vnodeman_attach_desc * desc; - mr_spin_lock(&ctx->lock); - desc = __vnodeman_producer_attach_unsafe(ctx, vnode_sym, prod_sym); - mr_spin_unlock(&ctx->lock); - return desc; -} - -int vnodeman_init(struct mr_core_instance * core_instance) -{ - core_instance->vnodeman_ctx = rte_zmalloc("VNODEMAN_CTX", - sizeof(struct vnodeman_ctx), 0); - MR_CHECK(core_instance->vnodeman_ctx != NULL, - "Cannot alloc memory for vnodeman context, Failed. "); - - struct vnodeman_ctx * ctx = core_instance->vnodeman_ctx; - TAILQ_INIT(&ctx->vnode_list); - mr_spin_init(&ctx->lock); - return 0; -}
\ No newline at end of file diff --git a/core/src/vnode.c b/core/src/vnode.c index 402d043..66a1420 100644 --- a/core/src/vnode.c +++ b/core/src/vnode.c @@ -1,8 +1,8 @@ /* Virtual Data-Node Library for MARSIO - Author : Lu Qiuwen<[email protected]> - Date : 2016-08-10 - + Author : Lu Qiuwen<[email protected]> Zheng Chao<[email protected]> + Date : 2016-12-10 + TODO: Try to use RCU in thread safe mode. */ @@ -17,6 +17,7 @@ #include <stdio.h> #include <stdlib.h> #include <assert.h> +#include <unistd.h> //usleep #include <sys/queue.h> #include <mr_common.h> @@ -71,7 +72,7 @@ struct tunnel_desc }; /* Alloc a tunnel for fixed size */ -struct tunnel_desc * tunnel_new(const char * symbol, unsigned int size, +struct tunnel_desc * tunnel_new(const char * symbol, unsigned int size, unsigned int sz_buffer) { struct tunnel_desc * desc = (struct tunnel_desc *) @@ -84,7 +85,7 @@ struct tunnel_desc * tunnel_new(const char * symbol, unsigned int size, return NULL; } - desc->tunnel_object = rte_ring_create(symbol, size, + desc->tunnel_object = rte_ring_create(symbol, size, SOCKET_ID_ANY, RING_F_SC_DEQ | RING_F_SP_ENQ); if (unlikely(desc->tunnel_object == NULL)) @@ -94,7 +95,7 @@ struct tunnel_desc * tunnel_new(const char * symbol, unsigned int size, __str_errno()); return NULL; } - + snprintf(desc->symbol, sizeof(desc->symbol), "%s", symbol); desc->tunnel_size = size; @@ -103,28 +104,36 @@ struct tunnel_desc * tunnel_new(const char * symbol, unsigned int size, desc->en_buffer = rte_zmalloc(NULL, sizeof(void *) * sz_buffer, 0); desc->sz_en_buffer = sz_buffer; - + if (unlikely(desc->en_buffer == NULL)) { MR_LOG(INFO, BASE, "LibVNode, TunnelBlockNew, " "Cannot alloc memory for tunnel buffer\n"); return NULL; } - + return desc; } /* Delete a tunnel */ -int tunnel_delete(struct tunnel_desc * desc) +int tunnel_delete(struct tunnel_desc * desc, struct vnode_ops * ops) { + for (int i = 0; i < desc->sz_en_buffer_used; i++) + ops->op_object_delete(desc->en_buffer[i]); + + void * object; + while (rte_ring_dequeue(desc->tunnel_object, &object) == 0) + ops->op_object_delete(object); + + assert(rte_ring_empty(desc->tunnel_object) == 1); rte_free(desc->en_buffer); rte_ring_free(desc->tunnel_object); rte_free(desc); return 0; } -static inline void tunnel_enqueue_without_buffer(struct tunnel_desc * desc, - struct vnode_ops * ops, void * obj) +static inline void tunnel_enqueue_without_buffer(struct tunnel_desc * desc, + struct vnode_ops * ops, void * obj) { assert(desc->en_buffer == NULL && desc->sz_en_buffer == 0); int ret = rte_ring_enqueue_burst(desc->tunnel_object, &obj, 1); @@ -133,7 +142,7 @@ static inline void tunnel_enqueue_without_buffer(struct tunnel_desc * desc, } static inline void tunnel_enqueue(struct vnode_prod * prod, struct vnode_cons * cons, - unsigned int prodq, unsigned int consq, struct tunnel_desc * desc, + unsigned int prodq, unsigned int consq, struct tunnel_desc * desc, struct vnode_ops * ops, void * obj) { // append the object at the tail of enqueue buffer. @@ -153,7 +162,7 @@ static inline void tunnel_enqueue(struct vnode_prod * prod, struct vnode_cons * int n_to_send = desc->sz_en_buffer; int n_send = rte_ring_sp_enqueue_burst(desc->tunnel_object, desc->en_buffer, n_to_send); - + if (likely(n_send == n_to_send)) goto out; // release all the objects which not send successfully. @@ -173,7 +182,7 @@ out: VNODE_STAT_UPDATE(cons, consq, on_line, n_to_send); VNODE_STAT_UPDATE(cons, consq, recieved, n_send); VNODE_STAT_UPDATE(cons, consq, missed, n_to_send - n_send); - + // 清空缓冲区 desc->sz_en_buffer_used = 0; return; @@ -189,12 +198,12 @@ static inline int tunnel_dequeue(struct tunnel_desc * desc, struct vnode_ops * o /* +-------------+-------------+-------------+------------+------------+ | nr_prodq | nr_consq | descs[0][0] | desc[0][1] | .......... | +-------------+-------------+-------------+------------+------------+ - Len = sizeof(nr_prodq) + sizeof(nr_consq) + - sizeof(descs) * nr_prodq * nr_consq + Len = sizeof(nr_prodq) + sizeof(nr_consq) + + sizeof(descs) * nr_prodq * nr_consq */ -static inline struct tunnel_desc ** tunnel_block_locate(struct tunnel_block * block, +static inline struct tunnel_desc ** tunnel_block_locate(struct tunnel_block * block, unsigned int prodq_id, unsigned int consq_id) { assert(prodq_id < block->nr_prodq && consq_id < block->nr_consq); @@ -209,7 +218,7 @@ int tunnel_block_delete(struct tunnel_block * block, struct vnode_ops * ops) for (int consq_id = 0; consq_id < block->nr_consq; consq_id++) { if (*tunnel_block_locate(block, prodq_id, consq_id) != NULL) - tunnel_delete(*tunnel_block_locate(block, prodq_id, consq_id)); + tunnel_delete(*tunnel_block_locate(block, prodq_id, consq_id), ops); } } @@ -240,19 +249,19 @@ int tunnel_block_try_gc(struct tunnel_block * block, struct vnode_ops * ops) /* Alloc a block of tunnels, and init all the tunnels */ struct tunnel_block * tunnel_block_new(struct vnode_ops * ops, const char * symbol, - struct vnode_prod * prod, struct vnode_cons * cons, + struct vnode_prod * prod, struct vnode_cons * cons, unsigned int tunnel_size, unsigned int tun_sz_buffer) { unsigned int nr_prodq = prod->nr_prodq; unsigned int nr_consq = cons->nr_consq; - + unsigned int block_size = sizeof(struct tunnel_block) + sizeof(struct tunnel_desc *) * (nr_prodq * nr_consq); - + struct tunnel_block * block = (struct tunnel_block *) rte_zmalloc(NULL, block_size, 0); - if(block == NULL) + if (block == NULL) { MR_LOG(INFO, BASE, "LibVNode, TunnelBlockNew, " "Cannot alloc memory for block storage, nr_prodq=%d" @@ -287,9 +296,9 @@ struct tunnel_block * tunnel_block_new(struct vnode_ops * ops, const char * symb } MR_LOG(DEBUG, BASE, "LibVNode, TunnelBlockNew, " - "Tunnel %s(object=%p, sz_tunnel=%d, sz_buffer=%d) created.\n", + "Tunnel %s(object=%p, sz_tunnel=%d, sz_buffer=%d) created.\n", tunnel_sym, tdesc->tunnel_object, tunnel_size, tun_sz_buffer); - + *tunnel_block_locate(block, prodq_id, consq_id) = tdesc; } } @@ -332,7 +341,7 @@ static inline int tunnel_block_dequeue(struct tunnel_block * block, struct vnode { unsigned int nr_obj = 0, nr_obj_recv = 0; unsigned int nr_obj_left = nr_max_obj; - + for (int prodq = 0; prodq < block->nr_prodq; prodq++) { struct tunnel_desc * tunnel = *tunnel_block_locate(block, prodq, consq); @@ -341,7 +350,7 @@ static inline int tunnel_block_dequeue(struct tunnel_block * block, struct vnode nr_obj_left -= nr_obj_recv; } - assert(nr_obj <= nr_max_obj); + assert(nr_obj <= nr_max_obj); return nr_obj; } @@ -377,13 +386,49 @@ struct vnode * vnode_create(const char * sym, struct vnode_ops * ops, object->nr_prod = 0; object->sz_tunnel = sz_tunnel; object->sz_tunnel_buffer = sz_tunnel_buffer; - + mr_spin_rwlock_init(&object->rwlock); TAILQ_INIT(&object->cons_list); TAILQ_INIT(&object->prod_list); return object; } +int add_block_to_list(struct tunnel_block** list, struct tunnel_block* block, int size, int *max_idx) +{ + int i = 0; + for (i = 0; i < size; i++) + { + if (list[i] == NULL) + { + list[i] = block;// assume this operation is a transaction + break; + } + } + if (i == size) + { + return -1; + } + + if (i > *max_idx) + { + assert(*max_idx + 1 == i); + (*max_idx)++; + } + + return 0; +} +int renew_max_id(struct tunnel_block** list, int size) +{ + int i = 0, cnt = 0; + for (i = 0; i < size; i++) + { + if (list[i] != NULL) + { + cnt++; + } + } + return cnt; +} /* Vnode prod or cons increase or decrease functions These functions are thread unsafe */ @@ -400,38 +445,24 @@ int vnode_prod_increase_unsafe(struct vnode * vnode, struct vnode_ops * ops, TAILQ_FOREACH(cons, &vnode->cons_list, next) { char block_sym[MR_SYMBOL_MAX]; - snprintf(block_sym, sizeof(block_sym), "%s-%s-%s", + snprintf(block_sym, sizeof(block_sym), "%s-%s-%s", vnode->symbol, prod->symbol, cons->symbol); nr_consq = cons->nr_consq; // create commucation tunnel for each cons and prods - block = tunnel_block_new(ops, block_sym, prod, cons, + block = tunnel_block_new(ops, block_sym, prod, cons, vnode->sz_tunnel, vnode->sz_tunnel_buffer); if (block == NULL) { MR_LOG(INFO, BASE, "LibVNode, VNodeProdIncrease, " - "Cannot create block for prod %s to cons %s\n", + "Cannot create block for prod %s to cons %s\n", prod->symbol, cons->symbol); return -1; } - // create list item - struct tunnel_block_list_item * tblist_prod_item; - tblist_prod_item = tblist_item_new(block); - struct tunnel_block_list_item * tblist_cons_item; - tblist_cons_item = tblist_item_new(block); - - if(tblist_prod_item == NULL || tblist_cons_item == NULL) - { - MR_LOG(INFO, BASE, "LibVNode, VNodeProdIncrease, " - "Cannot create block item for prod %s to cons %s\n", - prod->symbol, cons->symbol); - return -2; - } - // insert block into prod block list and cons block list - TAILQ_INSERT_TAIL(&prod->block_list, tblist_prod_item, next); - TAILQ_INSERT_TAIL(&cons->block_list, tblist_cons_item, next); + add_block_to_list(prod->block_list, block, MR_CONS_MAX, &(prod->max_idx)); + add_block_to_list(cons->block_list, block, MR_PROD_MAX, &(cons->max_idx)); prod->nr_block++; cons->nr_block++; @@ -448,7 +479,7 @@ int vnode_prod_increase_unsafe(struct vnode * vnode, struct vnode_ops * ops, } int vnode_cons_increase_unsafe(struct vnode * vnode, struct vnode_ops * ops, -struct vnode_cons * cons) + struct vnode_cons * cons) { int nr_consq = cons->nr_consq; int nr_prodq = 0; @@ -459,43 +490,28 @@ struct vnode_cons * cons) TAILQ_FOREACH(prod, &vnode->prod_list, next) { char block_sym[MR_SYMBOL_MAX]; - snprintf(block_sym, sizeof(block_sym), "%s-%s-%s", + snprintf(block_sym, sizeof(block_sym), "%s-%s-%s", vnode->symbol, prod->symbol, cons->symbol); nr_prodq = prod->nr_prodq; - + // create commucation tunnel for each cons and prods - block = tunnel_block_new(ops, block_sym, prod, cons, + block = tunnel_block_new(ops, block_sym, prod, cons, vnode->sz_tunnel, vnode->sz_tunnel_buffer); if (block == NULL) { MR_LOG(INFO, BASE, "LibVNode, VNodeConsIncrease, " - "Cannot create block for prod %s to cons %s\n", - prod->symbol, cons->symbol); - return -1; - } - - // create list item - struct tunnel_block_list_item * tblist_prod_item; - tblist_prod_item = tblist_item_new(block); - struct tunnel_block_list_item * tblist_cons_item; - tblist_cons_item = tblist_item_new(block); - - if (tblist_prod_item == NULL || tblist_cons_item == NULL) - { - MR_LOG(INFO, BASE, "LibVNode, VNodeProdIncrease, " - "Cannot create block item for prod %s to cons %s\n", + "Cannot create block for prod %s to cons %s\n", prod->symbol, cons->symbol); - return -2; + return -1; } - // insert block into prod block list and cons block list - TAILQ_INSERT_TAIL(&prod->block_list, tblist_prod_item, next); - TAILQ_INSERT_TAIL(&cons->block_list, tblist_cons_item, next); + add_block_to_list(prod->block_list, block, MR_CONS_MAX, &(prod->max_idx)); + add_block_to_list(cons->block_list, block, MR_PROD_MAX, &(cons->max_idx)); prod->nr_block++; cons->nr_block++; MR_LOG(DEBUG, BASE, "LibVNode, VNodeConsIncrease, " - "Insert block %s(object=%p, nr_prodq=%d, nr_consq=%d) in prod %s(obj=%p)" + "Insert block %s(object=%p, nr_prodq=%d, nr_consq=%d) in prod %s(obj=%p)" "and cons %s(obj=%p)\n", block_sym, block, nr_prodq, nr_consq, prod->symbol, prod, cons->symbol, cons); } @@ -509,30 +525,54 @@ struct vnode_cons * cons) // 垃圾回收,回收已经没有用的TunnelBlock void vnode_cons_block_gc_unsafe(struct vnode_cons * cons, struct vnode_ops * ops) { - struct tunnel_block_list_item * tblist_item_iter; - TAILQ_FOREACH(tblist_item_iter, &cons->block_list, next) + struct tunnel_block* block = NULL; + int i = 0, ret = 0; + for (i = 0; i < cons->max_idx + 1; i++) { - int ret = tunnel_block_try_gc(tblist_item_iter->block, ops); - if (ret < 0) continue; + block = cons->block_list[i]; + if (block == NULL) + { + continue; + } - TAILQ_REMOVE(&cons->block_list, tblist_item_iter, next); - tblist_item_delete(tblist_item_iter); + ret = tunnel_block_try_gc(block, ops); + if (ret < 0) + { + continue; + } + + cons->block_list[i] = NULL; + cons->nr_block--; + rte_mb(); } + if (i == cons->max_idx) + { + cons->max_idx = renew_max_id(cons->block_list, MR_PROD_MAX); + } return; } // 垃圾回收,回收已经没有用的TunnelBlock void vnode_prod_block_gc_unsafe(struct vnode_prod * prod, struct vnode_ops * ops) { - struct tunnel_block_list_item * tblist_item_iter; - TAILQ_FOREACH(tblist_item_iter, &prod->block_list, next) + struct tunnel_block* block = NULL; + int i = 0, ret = 0; + for (i = 0; i < prod->max_idx + 1; i++) { - int ret = tunnel_block_try_gc(tblist_item_iter->block, ops); - if (ret < 0) continue; - - TAILQ_REMOVE(&prod->block_list, tblist_item_iter, next); - tblist_item_delete(tblist_item_iter); + block = prod->block_list[i]; + if (block == NULL) + { + continue; + } + ret = tunnel_block_try_gc(block, ops); + if (ret < 0) + { + continue; + } + prod->block_list[i] = NULL; + prod->nr_block--; + rte_mb(); } return; @@ -544,7 +584,7 @@ struct vnode_prod * vnode_create_prod(struct vnode * vnode, struct vnode_ops * o { int ret = 0; struct vnode_prod * prod = rte_zmalloc(NULL, sizeof(struct vnode_prod), 0); - if(prod == NULL) + if (prod == NULL) { MR_LOG(INFO, BASE, "LibVNode, VNodeProdCreate, " "Cannot alloc memory for prod description\n"); @@ -557,15 +597,6 @@ struct vnode_prod * vnode_create_prod(struct vnode * vnode, struct vnode_ops * o prod->vnode = vnode; prod->nr_prodq = nr_prodq; prod->nr_block = 0; - prod->prodq = rte_zmalloc(NULL, sizeof(struct vnode_prod) * prod->nr_prodq, 0); - TAILQ_INIT(&prod->block_list); - - if(prod->prodq == NULL) - { - MR_LOG(INFO, BASE, "LibVNode, VNodeProdCreate, " - "Cannot alloc memory for prodq descriptions\n"); - goto err; - } __write_lock(&vnode->rwlock); ret = vnode_prod_increase_unsafe(vnode, ops, prod); @@ -575,7 +606,6 @@ struct vnode_prod * vnode_create_prod(struct vnode * vnode, struct vnode_ops * o return prod; err: - if (prod != NULL && prod->prodq != NULL) rte_free(prod->prodq); if (prod != NULL) rte_free(prod); return NULL; } @@ -585,7 +615,7 @@ struct vnode_cons * vnode_create_cons(struct vnode * vnode, struct vnode_ops * o { int ret = 0; struct vnode_cons * cons = rte_zmalloc(NULL, sizeof(struct vnode_cons), 0); - if(cons == NULL) + if (cons == NULL) { MR_LOG(INFO, BASE, "LibVNode, VNodeConsCreate, " "Cannot alloc memory for cons description\n"); @@ -598,15 +628,6 @@ struct vnode_cons * vnode_create_cons(struct vnode * vnode, struct vnode_ops * o cons->vnode = vnode; cons->nr_consq = nr_consq; cons->nr_block = 0; - cons->consq = rte_zmalloc(NULL, sizeof(struct vnode_consq) * cons->nr_consq, 0); - TAILQ_INIT(&cons->block_list); - - if(cons->consq == NULL) - { - MR_LOG(INFO, BASE, "LibVNode, VNodeConsCreate, " - "Cannot alloc memory for consq descriptions\n"); - goto err; - } __write_lock(&vnode->rwlock); ret = vnode_cons_increase_unsafe(vnode, ops, cons); @@ -616,7 +637,6 @@ struct vnode_cons * vnode_create_cons(struct vnode * vnode, struct vnode_ops * o return cons; err: - if (cons != NULL && cons->consq != NULL) rte_free(cons->consq); if (cons != NULL) rte_free(cons); return NULL; } @@ -634,7 +654,7 @@ struct vnode_prod * vnode_prod_lookup(struct vnode * vnode, const char * sym) TAILQ_FOREACH(prod_iter, &vnode->prod_list, next) { - if(strncmp(sym, prod_iter->symbol, sizeof(prod_iter->symbol)) == 0) + if (strncmp(sym, prod_iter->symbol, sizeof(prod_iter->symbol)) == 0) { ret = prod_iter; break; @@ -718,35 +738,49 @@ struct vnode_cons_stat * vnode_cons_stat_get(struct vnode_cons * cons) return cons->stat; } +void synchronize_dataplane() +{ + usleep(10);//assume each function in operation will finished after such time. +} + int vnode_delete_prod(struct vnode_prod * prod, struct vnode_ops * ops) { assert(prod != NULL && prod->vnode != NULL && ops != NULL); - - __write_lock(&prod->vnode->rwlock); - + int i = 0; + struct tunnel_block* block = NULL; + struct vnode * vnode = prod->vnode; - struct tunnel_block_list_item * tblist_item_iter; - + __write_lock(&prod->vnode->rwlock); + // 删除引用的Block - TAILQ_FOREACH(tblist_item_iter, &prod->block_list, next) + for (i = 0; i < prod->max_idx + 1; i++) { - assert(tblist_item_iter != NULL); - tunnel_block_try_delete(tblist_item_iter->block, ops); - tblist_item_delete(tblist_item_iter); + block = prod->block_list[i]; + if (block == NULL) + { + continue; + } + assert(block->deleted == 0); + tunnel_block_try_delete(block, ops); + prod->block_list[i] = NULL; + } - // 从VNode中移除生产者描述符 + //remove Producer from VNode struct vnode_prod_list * prod_list_head = &prod->vnode->prod_list; TAILQ_REMOVE(prod_list_head, prod, next); + prod->vnode->nr_prod--; + + rte_mb(); + synchronize_dataplane(); - // 执行GC,移除所有消费者描述符中引用的Block struct vnode_cons * cons_iter; TAILQ_FOREACH(cons_iter, &vnode->cons_list, next) { vnode_cons_block_gc_unsafe(cons_iter, ops); } - __write_unlock(&prod->vnode->rwlock); + return 0; } @@ -754,21 +788,28 @@ int vnode_delete_cons(struct vnode_cons * cons, struct vnode_ops * ops) { assert(cons != NULL && ops != NULL); __write_lock(&cons->vnode->rwlock); - + struct vnode * vnode = cons->vnode; - struct tunnel_block_list_item * tblist_item_iter; + struct tunnel_block* block = NULL; // 删除引用的Block - TAILQ_FOREACH(tblist_item_iter, &cons->block_list, next) + for (int i = 0; i < cons->max_idx + 1; i++) { - assert(tblist_item_iter != NULL); - tunnel_block_try_delete(tblist_item_iter->block, ops); - tblist_item_delete(tblist_item_iter); + block = cons->block_list[i]; + if (block == NULL) + { + continue; + } + tunnel_block_try_delete(block, ops); } + rte_mb(); + synchronize_dataplane(); + // 从VNode中移除消费者描述符 struct vnode_cons_list * cons_list_head = &cons->vnode->cons_list; TAILQ_REMOVE(cons_list_head, cons, next); + cons->vnode->nr_cons--; // 执行GC,移除所有生产者描述符中引用的Block struct vnode_prod * prod_iter; @@ -806,29 +847,28 @@ int vnode_delete(struct vnode * vnode, struct vnode_ops * ops) */ int vnode_enqueue_burst_with_hash(struct vnode_prod * prod, struct vnode_ops * ops, - unsigned int prodq, void * objects[], uint32_t hash[], int nr_objects) + unsigned int prodq, void * objects[], uint32_t hash[], int nr_objects) { - int ret; + int ret = 0; assert(nr_objects <= MR_LIBVNODE_MAX_SZ_BURST); - __read_lock(&prod->vnode->rwlock); - + struct tunnel_block* block = NULL, *last_block = NULL; // get the last block, this block are enqueued with the original objects. - struct tunnel_block_list_item * tblist_item_last; - tblist_item_last = TAILQ_LAST(&prod->block_list, tunnel_block_list); - - // tblist_item_last is null when there is not registerd consumer. - if(unlikely(tblist_item_last == NULL)) + last_block = prod->block_list[prod->max_idx]; + // last_block is null when there is not registerd consumer. + if (unlikely(last_block == NULL || last_block->deleted == 1)) { ret = -1; goto out; } - assert(tblist_item_last != NULL && tblist_item_last->block != NULL); // check all the blocks expect last block, and dup objects and enqueue them. - struct tunnel_block_list_item * tblist_item_iter; - TAILQ_FOREACH(tblist_item_iter, &prod->block_list, next) + for (int block_id = 0; block_id < prod->max_idx; block_id++) { - if (tblist_item_iter->block == tblist_item_last->block) break; + block = prod->block_list[block_id]; + if (block == NULL || block->deleted == 1) + { + continue; + } void * dup_objects[MR_LIBVNODE_MAX_SZ_BURST]; unsigned int nr_dup_objects = 0; @@ -845,33 +885,36 @@ int vnode_enqueue_burst_with_hash(struct vnode_prod * prod, struct vnode_ops * o } //TODO: Dup错误之后,hash值如何对应 - tunnel_block_enqueue_with_hash(tblist_item_iter->block, ops, prodq, + tunnel_block_enqueue_with_hash(block, ops, prodq, dup_objects, hash, nr_dup_objects); } // enqueue for the last block, use original objects. - tunnel_block_enqueue_with_hash(tblist_item_last->block, ops, prodq, + tunnel_block_enqueue_with_hash(last_block, ops, prodq, objects, hash, nr_objects); ret = 0; goto out; out: - __read_unlock(&prod->vnode->rwlock); return ret; } int vnode_dequeue_burst(struct vnode_cons * cons, struct vnode_ops * ops, - unsigned int consq, void * objects[], int nr_max_objects) + unsigned int consq, void * objects[], int nr_max_objects) { int nr_used = 0; int nr_left = nr_max_objects; int nr_dequeue = 0; - - __read_lock(&cons->vnode->rwlock); - struct tunnel_block_list_item * tblist_item_iter; - TAILQ_FOREACH(tblist_item_iter, &cons->block_list, next) + int i = 0; + struct tunnel_block* block = NULL; + for (i = 0; i < cons->max_idx + 1; i++) { - nr_dequeue = tunnel_block_dequeue(tblist_item_iter->block, + block = cons->block_list[i]; + if (block == NULL || block->deleted == 1) + { + continue; + } + nr_dequeue = tunnel_block_dequeue(block, ops, consq, &objects[nr_used], nr_left); nr_used += nr_dequeue; nr_left -= nr_dequeue; @@ -879,6 +922,6 @@ int vnode_dequeue_burst(struct vnode_cons * cons, struct vnode_ops * ops, assert(nr_left <= nr_max_objects); assert(nr_used <= nr_max_objects); - __read_unlock(&cons->vnode->rwlock); return nr_dequeue; -}
\ No newline at end of file +} + diff --git a/include/internal/mr_common.h b/include/internal/mr_common.h index 3911a26..51c7b05 100644 --- a/include/internal/mr_common.h +++ b/include/internal/mr_common.h @@ -30,6 +30,14 @@ extern "C" { #define MR_APP_MAX 64 #endif +#ifndef MR_CONS_MAX +#define MR_CONS_MAX 16 +#endif + +#ifndef MR_PROD_MAX +#define MR_PROD_MAX 16 +#endif + #ifndef MR_SID_MAX #define MR_SID_MAX 64 #endif @@ -116,6 +124,14 @@ static inline const char * __str_errno() return rte_strerror(rte_errno); } +#ifdef __GNUC__ +#define container_of(ptr, type, member) ({ \ + const typeof(((type *)0)->member) *__mptr = (ptr); \ + (type *)((char *)__mptr - offsetof(type,member));}) +#else +#define container_of(ptr, type,member) \ + ((type *)((char *)(ptr) - offsetof(type, member))) +#endif #ifdef __cplusplus } diff --git a/runtime/CMakeLists.txt b/runtime/CMakeLists.txt index e2ad0c6..b681298 100644 --- a/runtime/CMakeLists.txt +++ b/runtime/CMakeLists.txt @@ -6,6 +6,7 @@ include_directories(${DPDK_INCLUDE_DIR}) add_definitions(${DPDK_C_PREDEFINED})
include_directories(include)
-add_library(mruntime src/app.c src/ctx.c src/runtime.c src/hwinfo.c src/id.c src/export.c)
+add_library(mruntime src/app.c src/ctx.c src/runtime.c src/hwinfo.c src/id.c
+ src/event.c src/export.c)
target_link_libraries(mruntime PRIVATE rt pthread dl)
target_include_directories(mruntime INTERFACE "${CMAKE_CURRENT_SOURCE_DIR}/include/")
\ No newline at end of file diff --git a/runtime/include/mr_runtime.h b/runtime/include/mr_runtime.h index 94e27f6..7704e1a 100644 --- a/runtime/include/mr_runtime.h +++ b/runtime/include/mr_runtime.h @@ -61,4 +61,9 @@ unsigned int mr_appinfo_get_nr_max_thread(struct appinfo * appinfo); void * mr_app_priv_get(); void mr_app_priv_set(void * ptr); void * mr_thread_priv_get(); -void mr_thread_priv_set(void * ptr);
\ No newline at end of file +void mr_thread_priv_set(void * ptr); + +typedef void(*app_event_cb_t)(int app_event, const char * appsym, + struct appinfo * appinfo, void * arg); +int mr_app_crash_cb_register(app_event_cb_t cb, void * arg); +int mr_app_crash_raiser_register(const char * appsym);
\ No newline at end of file diff --git a/runtime/include/runtime.h b/runtime/include/runtime.h index 744f9d0..5fcc08e 100644 --- a/runtime/include/runtime.h +++ b/runtime/include/runtime.h @@ -5,6 +5,8 @@ struct app_manager; struct hwinfo_manager; struct id_manager; +struct app_ev_manager; +struct appinfo; #ifndef MR_RUNTIME_MAX_NR_EXIT_HANDLER #define MR_RUNTIME_MAX_NR_EXIT_HANDLER 64 @@ -16,6 +18,7 @@ struct mr_runtime_ctx struct app_manager * app_ctx; struct hwinfo_manager * hwinfo_ctx; struct id_manager * id_ctx; + struct app_ev_manager * ev_ctx; }; extern struct app_manager * app_manager_create(); @@ -38,6 +41,9 @@ extern int app_manager_appinfo_unregister(struct app_manager * object); extern int app_manager_thread_register(struct app_manager * object); extern int app_manager_thread_unregister(struct app_manager * object); extern int app_manager_appinfo_iterate(struct app_manager * object, struct appinfo ** appinfo); +extern struct appinfo * app_mamager_appinfo_lookup(struct app_manager * object, const char * appsym); +extern void app_mamager_crash_event_handler(int app_event, const char * appsym, + struct appinfo * appinfo, void * arg); // ��������������� extern struct id_manager * id_manager_create(); @@ -51,4 +57,13 @@ extern int id_manager_release_gsid(struct id_manager * object, thread_id_t start extern int mr_global_ctx_init(); extern int mr_global_ctx_slave_init(); extern int mr_global_cfg_init(); -extern int mr_global_cfg_slave_init();
\ No newline at end of file +extern int mr_global_cfg_slave_init(); + +// ������� +struct app_ev_manager * app_ev_manager_create(struct mr_runtime_ctx * rt_ctx); +void app_ev_manager_destory(struct app_ev_manager * object); + +typedef void(*app_event_cb_t)(int app_event, const char * appsym, struct appinfo * appinfo, void * arg); +int app_crash_cb_register(struct app_ev_manager * object, app_event_cb_t cb, void * arg); +int app_crash_raiser_register(struct app_ev_manager * object, const char * appsym); +void * app_thread_crash_monitor(void * arg);
\ No newline at end of file diff --git a/runtime/src/app.c b/runtime/src/app.c index f7e2ac3..ea1980a 100644 --- a/runtime/src/app.c +++ b/runtime/src/app.c @@ -457,6 +457,21 @@ int app_manager_appinfo_unregister(struct app_manager * object) return ret; } +#if 0 +typedef void(*app_event_cb_t)(int app_event, const char * appsym, + struct appinfo * appinfo, void * arg); +#endif + +void app_mamager_crash_event_handler(int app_event, const char * appsym, + struct appinfo * appinfo, void * arg) +{ + struct app_manager * object = (struct app_manager *)arg; + + mr_spin_lock(&object->lock); + __appinfo_delete_unsafe(object, appinfo); + mr_spin_unlock(&object->lock); +} + int app_manager_thread_register(struct app_manager * object) { struct appinfo * pinfo = currect_app_info; @@ -521,6 +536,16 @@ int app_manager_list_all_appinfo(struct app_manager * object, return ret; } +struct appinfo * app_mamager_appinfo_lookup(struct app_manager * object, + const char * appsym) +{ + struct appinfo * result = NULL; + mr_spin_lock(&object->lock); + result = __appinfo_lookup_unsafe(object, appsym); + mr_spin_unlock(&object->lock); + return result; +} + int app_manager_tinfo_iterate(struct appinfo * appinfo, struct thread_info ** tinfo, int * iterate) { diff --git a/runtime/src/event.c b/runtime/src/event.c new file mode 100644 index 0000000..adabe4e --- /dev/null +++ b/runtime/src/event.c @@ -0,0 +1,444 @@ +/* \brief Ӧ�ó����쳣״̬������¼�֪ͨ + * + * ���Ӧ���쳣�˳�״̬����֪ͨ�����̸�ģ�����Ӧ��ʹ�õ���Դ + * TODO: �ع����δ��룬֧�ָ�����첽�ź�֪ͨ���� + * + * \author Qiuwen Lu<[email protected]> + * \date 2016-12-09 + */ + +#include <errno.h> +#include <string.h> +#include <unistd.h> +#include <fcntl.h> +#include <netinet/in.h> +#include <sys/epoll.h> +#include <assert.h> +#include <rte_malloc.h> +#include <rte_devargs.h> + +#include <mr_common.h> +#include <mr_runtime.h> +#include <runtime.h> + +TAILQ_HEAD(crash_ev_raiser_list, crash_ev_raiser); +TAILQ_HEAD(crash_ev_cb_list, crash_ev_cb); + +/* �����¼�Դ */ +struct crash_ev_raiser +{ + TAILQ_ENTRY(crash_ev_raiser) next; + struct appinfo * appinfo; + char appsym[MR_SYMBOL_MAX]; + int connect_fd; +}; + +/* �����¼��������� */ +struct crash_ev_cb +{ + TAILQ_ENTRY(crash_ev_cb) next; + app_event_cb_t cb; + void * arg; +}; + +/* �����¼�Դע��ָ�ͨ��TCP���䣩 */ +struct crash_raiser_reg_cmd +{ + /* ���� */ + char symlen; + /* ���� */ + char appsym[MR_SYMBOL_MAX]; +}; + +struct app_ev_manager +{ +#define __MAX_CRASH_CBS 64 + + /* Rt��� */ + struct mr_runtime_ctx * rt_ctx; + /* Ӧ�ñ����¼�Դ */ + struct crash_ev_raiser_list crash_ev_raiser_list; + /* Ӧ�ñ����¼������ص����� */ + struct crash_ev_cb_list crash_ev_cb_list; + /* Ӧ�ñ�������ַ */ + struct sockaddr_in crash_sockaddr; +}; + +struct app_ev_manager * app_ev_manager_create(struct mr_runtime_ctx * rt_ctx) +{ + struct app_ev_manager * object = rte_zmalloc(NULL, sizeof(struct app_ev_manager), 0); + MR_CHECK(object != NULL, "Cannot alloc memory for app event manager"); + + object->rt_ctx = rt_ctx; + TAILQ_INIT(&object->crash_ev_cb_list); + TAILQ_INIT(&object->crash_ev_raiser_list); + return object; +} + +void app_ev_manager_destory(struct app_ev_manager * object) +{ + rte_free(object); +} + +/* ע������������� */ +int app_crash_cb_register(struct app_ev_manager * object, app_event_cb_t cb, + void * arg) +{ + struct crash_ev_cb * ev_cb = rte_zmalloc(NULL, sizeof(struct crash_ev_cb), 0); + MR_CHECK(ev_cb != NULL, "Cannot alloc memory for crash_ev_cb"); + + ev_cb->cb = cb; + ev_cb->arg = arg; + TAILQ_INSERT_TAIL(&object->crash_ev_cb_list, ev_cb, next); + return 0; +} + +/* ע������¼�Դ */ +int app_crash_raiser_register(struct app_ev_manager * object, const char * appsym) +{ + int conn_fd = socket(AF_INET, SOCK_STREAM, 0); + if (unlikely(conn_fd < 0)) + { + MR_LOG(ERR, BASE, "Cannot create socket for crash raiser register : %s. \n", + strerror(errno)); + return -1; + } + + int ret = connect(conn_fd, (const struct sockaddr *)&object->crash_sockaddr, + sizeof(object->crash_sockaddr)); + + if (unlikely(ret < 0)) + { + MR_LOG(ERR, BASE, "Connect to crash monitor failed : %s", strerror(errno)); + return -2; + } + + struct crash_raiser_reg_cmd reg_cmd; + snprintf(reg_cmd.appsym, sizeof(reg_cmd.appsym), "%s", appsym); + reg_cmd.symlen = strlen(reg_cmd.appsym); + + do { + ret = send(conn_fd, ®_cmd, sizeof(reg_cmd), MSG_NOSIGNAL); + } while (ret == -1 && (errno == EINTR)); + + assert(ret == sizeof(reg_cmd)); + return 0; +} + +/* �¼���������ע��Ӧ�� */ +static int app_crash_handle_new_raiser(struct app_ev_manager * object, + const char * appsym, int conn_fd) +{ + struct mr_runtime_ctx * rt_ctx = object->rt_ctx; + struct app_manager * app_manager = rt_ctx->app_ctx; + + struct appinfo * appinfo = app_mamager_appinfo_lookup(app_manager, appsym); + if (appinfo == NULL) + { + MR_LOG(ERR, BASE, "Application %s does not existed, " + "cannot monitor its crash state. \n", appsym); + return -1; + } + + struct crash_ev_raiser * raiser; + +#ifndef _NDEBUG + TAILQ_FOREACH(raiser, &object->crash_ev_raiser_list, next) + { + assert(strcmp(raiser->appsym, appsym) != 0); + } +#endif + + raiser = rte_zmalloc(NULL, sizeof(struct crash_ev_raiser), 0); + MR_CHECK(raiser != NULL, "Cannot alloc memory for crash raiser. "); + + snprintf(raiser->appsym, sizeof(raiser->appsym), "%s", appsym); + raiser->appinfo = appinfo; + raiser->connect_fd = conn_fd; + TAILQ_INSERT_TAIL(&object->crash_ev_raiser_list, raiser, next); + + MR_LOG(INFO, BASE, "Crash raiser from application %s registed. \n", appsym); + return 0; +} + +/* �¼�������Ӧ�ñ��� */ +static int app_crash_handle_crash_raiser(struct app_ev_manager * object, + struct crash_ev_raiser * raiser) +{ + MR_LOG(INFO, BASE, "Crash raise from applicaion %s. \n", raiser->appsym); + + struct crash_ev_cb * cb; + TAILQ_FOREACH(cb, &object->crash_ev_cb_list, next) + { + cb->cb(0, raiser->appsym, raiser->appinfo, cb->arg); + } + + TAILQ_REMOVE(&object->crash_ev_raiser_list, raiser, next); + rte_free(raiser); + return 0; +} + +#define __LISTEN_PORT_START 1024 +#define __LISTEN_PORT_END 65535 +#define __LISTEN_BACKEND 16 + +static int __create_monitor_fd(struct sockaddr_in * sockaddr_in) +{ + int try_times_max = 36; + int success = 0; + + // �������ض˿ڣ�TCP���� + int listen_fd = socket(AF_INET, SOCK_STREAM, 0); + if(unlikely(listen_fd < 0)) + { + MR_LOG(ERR, BASE, "Create crash monitor listen fd failed : %s. \n", + strerror(errno)); + goto out; + } + + int reuse = 1; + if (unlikely(setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, + (const void*)&reuse, sizeof(int)) < 0)) + { + MR_LOG(ERR, BASE, "Set crash monitor listen fd reuse failed : %s\n", + strerror(errno)); + goto out; + } + + while (try_times_max >= 0) + { + unsigned int random_port; + + // ���ѡ��һ�������˿ڣ����ؼ��� + random_port = rand() % (__LISTEN_PORT_END - __LISTEN_PORT_START) + __LISTEN_PORT_START; + assert(random_port >= __LISTEN_PORT_START && random_port <= __LISTEN_PORT_END); + + memset(sockaddr_in, 0, sizeof(struct sockaddr_in)); + sockaddr_in->sin_family = AF_INET; + sockaddr_in->sin_addr.s_addr = htonl(INADDR_LOOPBACK); + sockaddr_in->sin_port = htons(random_port); + + int ret = bind(listen_fd, (struct sockaddr *)sockaddr_in, + sizeof(struct sockaddr_in)); + + // �˿ڱ�ռ�ã�����ѡ��һ�����õĶ˿� + if (ret < 0 && errno == EADDRINUSE) + { + try_times_max--; + continue; + } + + // �������� + else if (ret < 0) + { + MR_LOG(ERR, BASE, "Bind crash monitor listen fd(PORT=%d) failed : %s.\n", + random_port, strerror(errno)); + goto out; + } + + ret = listen(listen_fd, 20); + if(ret < 0) + { + MR_LOG(ERR, BASE, "Listen crash monitor listen fd(PORT=%d) failed : %s.\n", + random_port, strerror(errno)); + goto out; + } + + success = 1; + break; + } + + if(success) + { + MR_LOG(INFO, BASE, "Crash monitor listen tcp fd(port=%d) create success.\n", + ntohs(sockaddr_in->sin_port)); + return listen_fd; + } + +out: + if (listen_fd > 0) close(listen_fd); + return -1; +} + +#define __EV_MAX_EVENTS 16 +#define __EV_MAX_BUFFER 2048 + + +// ���¹��ߺ�����MESATCP�н�ȡ +static int __tcp_setnonblock(int sockfd) +{ + int flags = fcntl(sockfd, F_GETFL, 0); + if (flags == -1) return -1; + int ret = fcntl(sockfd, F_SETFL, flags | O_NONBLOCK, 0); + if (ret == -1) return -1; + return 0; +} + +static int __update_epoll_event(int epoll_fd, int epoll_mode, int fd, int epoll_type) +{ + struct epoll_event event = { 0 }; + event.data.fd = fd; + event.events = epoll_type; + + __tcp_setnonblock(fd); + if (-1 == epoll_ctl(epoll_fd, epoll_mode, fd, &event)) + { + MR_LOG(ERR, BASE, "update epoll event failed : %s\n", + strerror(errno)); return -1; + } + + return 0; +} + +int __epoll_add_event(int epoll_fd, int sd, int epoll_type) +{ + return __update_epoll_event(epoll_fd, EPOLL_CTL_ADD, sd, epoll_type); +} + +int __epoll_del_event(int epoll_fd, int sd, int epoll_type) +{ + return __update_epoll_event(epoll_fd, EPOLL_CTL_DEL, sd, epoll_type); +} + +int __epoll_mod_event(int epoll_fd, int sd, int epoll_type) +{ + return __update_epoll_event(epoll_fd, EPOLL_CTL_MOD, sd, epoll_type); +} + +/* �����½��ļ������� */ +void __handle_accept(struct app_ev_manager * object, int epoll_fd, int listen_fd) +{ + struct sockaddr_in remote_addr; + socklen_t sz_remote_addr = sizeof(remote_addr); + + int conn_fd = accept(listen_fd, (struct sockaddr *)&remote_addr, &sz_remote_addr); + if(conn_fd < 0) + { + MR_LOG(ERR, BASE, "Accept remote connnection failed : %s", strerror(errno)); + return; + } + + __epoll_add_event(epoll_fd, conn_fd, EPOLLIN); + return; +} + +// �������ӹر���Ϣ�����ӹرմ���Ӧ�ñ��� +void __handle_close(struct app_ev_manager * object, int epoll_fd, int fd) +{ + struct crash_ev_raiser * raiser_iter; + TAILQ_FOREACH(raiser_iter, &object->crash_ev_raiser_list, next) + { + if (raiser_iter->connect_fd != fd) continue; + app_crash_handle_crash_raiser(object, raiser_iter); + } + + __epoll_del_event(epoll_fd, fd, EPOLLIN); + close(fd); + return; +} + +/* �����������ӵ�������� */ +void __handle_read(struct app_ev_manager * object, int epoll_fd, int fd) +{ + // ���뻺��������������С����ָ��� + char __read_buffer[sizeof(struct crash_raiser_reg_cmd)]; + ssize_t __read_buffer_used = 0; + ssize_t ret = 0; + + while ((ret = read(fd, __read_buffer + __read_buffer_used, + sizeof(__read_buffer) - __read_buffer_used)) > 0) + { + __read_buffer_used += ret; + } + + // ���ݶ���������ָ�� + if(__read_buffer_used == sizeof(struct crash_raiser_reg_cmd)) + { + struct crash_raiser_reg_cmd * cmd = (struct crash_raiser_reg_cmd *)__read_buffer; + if (cmd->symlen != strnlen(cmd->appsym, sizeof(cmd->appsym))) return; + app_crash_handle_new_raiser(object, cmd->appsym, fd); + return; + } + + if(ret < 0 && (errno == EWOULDBLOCK || errno == EAGAIN)) + { + return; + } + + if (ret == 0) + { + __handle_close(object, epoll_fd, fd); + return; + } + + //TODO: ��������������� + assert(0); + return; +} + +/* ��������߳� */ +//TODO: ���Epoll��ͨ�Ż��ƣ������Libevent�� + +void * app_thread_crash_monitor(void * arg) +{ + struct app_ev_manager * object = (struct app_ev_manager *)arg; + pthread_detach(pthread_self()); + + int listen_fd = __create_monitor_fd(&object->crash_sockaddr); + if(listen_fd < 0) + { + MR_LOG(ERR, BASE, "Launch crash monitor thread failed : " + "Cannot create listen fd. \n"); goto errout; + } + + int epoll_fd = epoll_create(16); + if(epoll_fd < 0) + { + MR_LOG(ERR, BASE, "Launch crash monitor thread failed : " + "Cannot create epoll fd. \n"); goto errout; + } + + struct epoll_event evlist[__EV_MAX_EVENTS]; + struct epoll_event ev; + + ev.events = EPOLLIN; + ev.data.fd = listen_fd; + if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev) < 0) + { + MR_LOG(ERR, BASE, "Crash monitor thread failed : " + "Add listenfd to epoll list error : %s \n", strerror(errno)); + goto errout; + } + + while(1) + { + int ret = epoll_wait(epoll_fd, evlist, __EV_MAX_EVENTS, 0); + if (ret == -1 && errno == EINTR) continue; + else if(ret == -1) + { + MR_LOG(ERR, BASE, "Crash monitor thread failed : " + "waiting on epoll fd : %s", strerror(errno)); + goto errout; + } + + for(int i = 0; i < ret; i++) + { + int fd = evlist[i].data.fd; + if(evlist[i].events & EPOLLIN) + { + if (fd == listen_fd) __handle_accept(object, epoll_fd, fd); + else __handle_read(object, epoll_fd, fd); + continue; + } + + if(evlist[i].events & ( EPOLLHUP | EPOLLERR)) + { + __handle_close(object, epoll_fd, fd); + } + } + } + +errout: + MR_LOG(ERR, BASE, "Crash monitor thread is exiting. \n"); + return (void *)NULL; +}
\ No newline at end of file diff --git a/runtime/src/export.c b/runtime/src/export.c index 6a145c3..a4b755a 100644 --- a/runtime/src/export.c +++ b/runtime/src/export.c @@ -103,3 +103,18 @@ int mr_id_manager_release_gsid(thread_id_t start_gsid, struct mr_runtime_ctx * rt_ctx = (struct mr_runtime_ctx *)g_ctx->ctx_runtime; return id_manager_release_gsid(rt_ctx->id_ctx, start_gsid, nr_thread); } + +// ������������ +int mr_app_crash_cb_register(app_event_cb_t cb, void * arg) +{ + struct mr_global_ctx * g_ctx = mr_global_ctx_get(); + struct mr_runtime_ctx * rt_ctx = (struct mr_runtime_ctx *)g_ctx->ctx_runtime; + return app_crash_cb_register(rt_ctx->ev_ctx, cb, arg); +} + +int mr_app_crash_raiser_register(const char * appsym) +{ + struct mr_global_ctx * g_ctx = mr_global_ctx_get(); + struct mr_runtime_ctx * rt_ctx = (struct mr_runtime_ctx *)g_ctx->ctx_runtime; + return app_crash_raiser_register(rt_ctx->ev_ctx, appsym); +}
\ No newline at end of file diff --git a/runtime/src/runtime.c b/runtime/src/runtime.c index 37bec21..88e912e 100644 --- a/runtime/src/runtime.c +++ b/runtime/src/runtime.c @@ -43,6 +43,14 @@ void mr_runtime_init() rt_ctx->id_ctx = id_manager_create(); MR_CHECK(rt_ctx->id_ctx != NULL, "RuntimeInit, ID Manager initialize failed. "); + + rt_ctx->ev_ctx = app_ev_manager_create(rt_ctx); + MR_CHECK(rt_ctx->ev_ctx != NULL, "RuntimeInit, App event initialize failed. "); + + pthread_t pid_crash_monitor; + pthread_create(&pid_crash_monitor, NULL, app_thread_crash_monitor, (void *)rt_ctx->ev_ctx); + + app_crash_cb_register(rt_ctx->ev_ctx, app_mamager_crash_event_handler, rt_ctx->app_ctx); g_ctx->ctx_runtime = rt_ctx; return; diff --git a/service/src/rxtx.c b/service/src/rxtx.c index d52df25..5a5a581 100644 --- a/service/src/rxtx.c +++ b/service/src/rxtx.c @@ -26,7 +26,7 @@ int rt_serv_device_rx_bulk(struct rtdev_desc * dev_desc, thread_id_t sid, int ret = vnode_enqueue_burst_with_hash(dev_desc->vnode_prod_rx, &dev_desc->ops, sid, (void **)mbufs_in, hash, nr_mbufs_in); - if (likely(ret < 0)) return 0; + if (unlikely(ret < 0)) return 0; for (int i = 0; i < nr_mbufs_in; i++) mbufs_in[i] = NULL; return nr_mbufs_in; } |
