diff options
| -rw-r--r-- | app/src/monit.c | 46 | ||||
| -rw-r--r-- | app/src/rawio.c | 80 | ||||
| -rw-r--r-- | include/internal/shmdev_define.h | 17 | ||||
| -rw-r--r-- | service/src/shmdev.c | 33 |
4 files changed, 87 insertions, 89 deletions
diff --git a/app/src/monit.c b/app/src/monit.c index 605fd43..c1a640a 100644 --- a/app/src/monit.c +++ b/app/src/monit.c @@ -1,14 +1,7 @@ -/* \brief 应用状态监测数据输出 -* -* \author Lu Qiuwen<[email protected]> -* \date 2016-11-30 -*/ - -#include <mrapp.h> +#include <mrapp.h> #include <cJSON.h> unsigned int g_monit_interval = 1; - #if 0 int vdev_instance_stats_get(struct vdev_instance * vdi, struct vdev_stat_info * stat_info) { @@ -87,25 +80,38 @@ static cJSON * __create_uint64_delta_array(const uint64_t * value_large, return uint64_array; } +#endif -static cJSON * __create_vdev_stats(struct vdev_instance * vdi) +static cJSON * vdev_stat_create(struct mr_vdev * vdev) { -#if 0 - vdev_instance_stats_get(vdi, &_stat_info); - vdev_instance_stats_last_get(vdi, &_stat_info_last); - vdev_instance_stat_last_save(vdi, &_stat_info); + struct cJSON * j_father = cJSON_CreateObject(); + struct cJSON * j_vdev_value = cJSON_CreateObject(); + struct cJSON * j_vdev_speed = cJSON_CreateObject(); + + struct shmdev_instance * sdi = vdev->sdi; + uint64_t pkt_rx_drop_now = rte_atomic64_read(&sdi->pkt_rx_drop); + uint64_t pkt_tx_drop_now = rte_atomic64_read(&sdi->pkt_tx_drop); - unsigned int nr_rx_stream = _stat_info.nr_rxstream; - unsigned int nr_tx_stream = _stat_info.nr_txstream; + cJSON_AddNumberToObject(j_vdev_value, "pkt_drop_rx_bp", pkt_rx_drop_now); + cJSON_AddNumberToObject(j_vdev_value, "pkt_drop_tx_bp", pkt_tx_drop_now); + cJSON_AddItemToObject(j_father, "accumulative", j_vdev_value); + cJSON_AddItemToObject(j_father, "speed", j_vdev_speed); + return j_father; +} + +#if 0 +static cJSON * vdev_per_q_stat_create(struct mr_vdev * vdev) +{ /* 统计节点句柄 */ struct cJSON * j_vdev_stats = cJSON_CreateObject(); /* 累计量 */ struct cJSON * j_vdev_value = cJSON_CreateObject(); /* 瞬时值 */ struct cJSON * j_vdev_speed = cJSON_CreateObject(); - /* Gauge */ - struct cJSON * j_vdev_gauge = cJSON_CreateObject(); + + + #define __JOIN_VDEV_VALUE_STATS_ITEM(item, streams) do { \ cJSON_AddItemToObject(j_vdev_value, #item, \ @@ -200,11 +206,8 @@ static cJSON * __create_vdev_stats(struct vdev_instance * vdi) cJSON_AddItemToObject(j_vdev_stats, "accumulative", j_vdev_value); cJSON_AddItemToObject(j_vdev_stats, "speed", j_vdev_speed); cJSON_AddItemToObject(j_vdev_stats, "gauge", j_vdev_gauge); - -#endif return NULL; } - #endif // 运行时原始报文设备统计计数 @@ -220,6 +223,8 @@ static cJSON * monit_vdev(struct mr_instance * instance) cJSON_AddStringToObject(j_vdev, "symbol", vdev->devsym); cJSON_AddNumberToObject(j_vdev, "rxstreams", vdev->nr_rxstream); cJSON_AddNumberToObject(j_vdev, "txstreams", vdev->nr_txstream); + cJSON_AddItemToObject(j_vdev, "stats", vdev_stat_create(vdev)); + #if 0 cJSON_AddItemToObject(j_vdev, "stats", __create_vdev_stats(vdev->sdi)); #endif @@ -229,6 +234,7 @@ static cJSON * monit_vdev(struct mr_instance * instance) return j_vdev_array; } + static cJSON * monit_app_stat(struct mr_instance * instance) { struct cJSON * j_root = cJSON_CreateObject(); diff --git a/app/src/rawio.c b/app/src/rawio.c index d5a6585..e2837be 100644 --- a/app/src/rawio.c +++ b/app/src/rawio.c @@ -32,11 +32,6 @@ int marsio_recv_burst(struct mr_vdev * vdev, queue_id_t qid, marsio_buff_t * mbu struct shmdev_instance * sdi = vdev->sdi; unsigned int nr_evs = rte_event_dequeue_burst(sdi->eventdev_id, sdi->pkt_rx_ports[qid], evs, nr_max_evs, 0); - for (unsigned int i = 0; i < nr_evs; i++) - { - mbufs[i] = evs[i].mbuf; - } - struct mr_vdev_counters * vdev_counter = &vdev->counters[qid]; vdev_counter->rx_burst_call_counters++; @@ -49,6 +44,18 @@ int marsio_recv_burst(struct mr_vdev * vdev, queue_id_t qid, marsio_buff_t * mbu vdev_counter->rx_burst_non_zero_counters++; } + /* fill the output mbufs, and calculate the total length */ + unsigned int rx_total_bytes = 0; + for (unsigned int i = 0; i < nr_evs; i++) + { + mbufs[i] = evs[i].mbuf; + rx_total_bytes += rte_pktmbuf_pkt_len((struct rte_mbuf *)mbufs[i]); + } + + struct shmdev_instance_stat_per_q_worker * q_worker_stat = &sdi->stat_per_q_worker[qid]; + q_worker_stat->pkt_count_rx_total += nr_evs; + q_worker_stat->pkt_bytes_rx_total += rx_total_bytes; + return (int)nr_evs; } @@ -69,13 +76,13 @@ int marsio_recv_all_burst(struct mr_instance * instance, queue_id_t qid, marsio_ void marsio_send_buffer_flush(struct mr_vdev * vdev, queue_id_t sid) { struct shmdev_instance * sdi = vdev->sdi; - rte_event_maintain(sdi->eventdev_id, sdi->pkt_tx_ports[sid], RTE_EVENT_DEV_MAINT_OP_FLUSH); + rte_event_maintain(sdi->eventdev_id, sdi->pkt_tx_ports[sid], 0); } -int marsio_send_burst(struct mr_sendpath * sendpath, queue_id_t sid, marsio_buff_t * mbufs[], int nr_mbufs) +int marsio_send_burst(struct mr_sendpath * sendpath, queue_id_t qid, marsio_buff_t * mbufs[], int nr_mbufs) { struct rte_mbuf ** _mbufs = (struct rte_mbuf **)mbufs; - assert(sid < sendpath->sdi->nr_txstream); + assert(qid < sendpath->sdi->nr_txstream); assert(nr_mbufs <= MR_BURST_MAX); struct rte_event evs[MR_BURST_MAX]; @@ -86,7 +93,7 @@ int marsio_send_burst(struct mr_sendpath * sendpath, queue_id_t sid, marsio_buff ev->sched_type = RTE_SCHED_TYPE_ATOMIC; ev->queue_id = EVT_QID_WORKER_TO_IO; ev->priority = RTE_EVENT_DEV_PRIORITY_NORMAL; - ev->flow_id = _mbufs[i]->hash.usr % 512; + ev->flow_id = _mbufs[i]->hash.usr % 1024; ev->mbuf = _mbufs[i]; } @@ -94,15 +101,20 @@ int marsio_send_burst(struct mr_sendpath * sendpath, queue_id_t sid, marsio_buff assert(vdev != NULL); struct shmdev_instance * sdi = vdev->sdi; - int nr_enq = rte_event_enqueue_burst(sdi->eventdev_id, sdi->pkt_tx_ports[sid], evs, nr_mbufs); + struct shmdev_instance_stat_per_q_worker * q_worker_stat = &sdi->stat_per_q_worker[qid]; + + q_worker_stat->pkt_count_rx_total += nr_mbufs; + q_worker_stat->pkt_bytes_rx_total += packet_total_len(_mbufs, nr_mbufs); + + int nr_enq = rte_event_enqueue_burst(sdi->eventdev_id, sdi->pkt_tx_ports[qid], evs, nr_mbufs); if (unlikely(nr_enq >= 0 && nr_enq < nr_mbufs)) { - /* no space, drop the packets */ + rte_atomic64_add(&sdi->pkt_tx_drop, nr_mbufs - nr_enq); rte_pktmbuf_free_bulk(&_mbufs[nr_enq], nr_mbufs - nr_enq); } else if (unlikely(nr_enq == -ENOSPC)) { - /* no space, drop all enqueued packets */ + rte_atomic64_add(&sdi->pkt_tx_drop, nr_mbufs); rte_pktmbuf_free_bulk(_mbufs, nr_mbufs); } else if (unlikely(nr_enq < 0)) @@ -137,46 +149,7 @@ int marsio_send_burst_with_options(struct mr_sendpath * sendpath, queue_id_t sid marsio_buff_do_rehash(sendpath->instance, mbufs[i]); } - /* 线程运行情况统计 */ - if (dp_thread_ctx.instance != NULL) - { - thread_id_t tid = dp_thread_ctx.thread_id; - dp_thread_ctx.instance->stat[tid].packet_send_count += nr_mbufs; - dp_thread_ctx.instance->stat[tid].packet_send_length = packet_total_len(_mbufs, nr_mbufs); - } - - struct rte_event evs[MR_BURST_MAX]; - for (unsigned int i = 0; i < nr_mbufs; i++) - { - struct rte_event * ev = &evs[i]; - ev->op = RTE_EVENT_OP_NEW; - ev->sched_type = RTE_SCHED_TYPE_ATOMIC; - ev->queue_id = EVT_QID_WORKER_TO_IO; - ev->priority = RTE_EVENT_DEV_PRIORITY_NORMAL; - ev->mbuf = mbufs[i]; - } - - struct mr_vdev * vdev = sendpath->vdev; - assert(vdev != NULL); - - struct shmdev_instance * sdi = vdev->sdi; - int nr_enq = rte_event_enqueue_burst(sdi->eventdev_id, sdi->pkt_tx_ports[sid], evs, nr_mbufs); - if (unlikely(nr_enq >= 0 && nr_enq < nr_mbufs)) - { - /* no space, drop the packets */ - rte_pktmbuf_free_bulk(&_mbufs[nr_enq], nr_mbufs - nr_enq); - } - else if (unlikely(nr_enq == -ENOSPC)) - { - /* no space, drop all enqueued packets */ - rte_pktmbuf_free_bulk(_mbufs, nr_mbufs); - } - else if (unlikely(nr_enq < 0)) - { - rte_panic("Panic at enqueue eventdev, eventdev_id = %u, ret = %d", sdi->eventdev_id, nr_enq); - } - - return RT_SUCCESS; + return marsio_send_burst(sendpath, sid, mbufs, nr_mbufs); } int marsio_poll_register_eventfd(struct mr_instance * instance, int eventfd, unsigned int tid) @@ -200,8 +173,7 @@ static void clear_all_zero_recv_counters(struct mr_vdev * vdevs[], unsigned int int marsio_poll_wait(struct mr_instance * instance, struct mr_vdev * vdevs[], unsigned int nr_vdevs, unsigned int tid, int timeout) { - - eventfd_t _not_use_eventfd_value; + __attribute__((unused)) eventfd_t _not_use_eventfd_value; /* flush the vdev */ for (unsigned int i = 0; i < nr_vdevs; i++) diff --git a/include/internal/shmdev_define.h b/include/internal/shmdev_define.h index f08bb14..69a3d2b 100644 --- a/include/internal/shmdev_define.h +++ b/include/internal/shmdev_define.h @@ -40,8 +40,18 @@ struct shmdev_dev /* qid maps */ unsigned int io_qid_to_port_id_map[MR_SID_MAX]; + + /* SDI */ + struct shmdev_instance * sdi; }; +struct shmdev_instance_stat_per_q_worker +{ + uint64_t pkt_count_rx_total; + uint64_t pkt_count_tx_total; + uint64_t pkt_bytes_rx_total; + uint64_t pkt_bytes_tx_total; +} __rte_cache_aligned; struct shmdev_instance { @@ -61,4 +71,11 @@ struct shmdev_instance unsigned int nr_rxstream; unsigned int nr_txstream; + + /* stat */ + rte_atomic64_t pkt_rx_drop; + rte_atomic64_t pkt_tx_drop; + + /* stat per worker q */ + struct shmdev_instance_stat_per_q_worker stat_per_q_worker[MR_SID_MAX]; };
\ No newline at end of file diff --git a/service/src/shmdev.c b/service/src/shmdev.c index 79dceda..c9b7e1e 100644 --- a/service/src/shmdev.c +++ b/service/src/shmdev.c @@ -52,7 +52,7 @@ void shmdev_config_load(const char * devsym, struct shmdev_config * cfg_out) unsigned int default_sz_tun_tx = default_sz_tunnel; unsigned int default_sz_tun_rx_shared = 0; - /* override configration */ + /* override configuration */ MESA_load_profile_uint_def(cfgfile, "device", "sz_rx_tunnel", &default_sz_tun_rx_exclusive, default_sz_tun_rx_exclusive); MESA_load_profile_uint_def(cfgfile, "device", "sz_tx_tunnel", &default_sz_tun_tx, default_sz_tun_tx); @@ -179,13 +179,13 @@ struct shmdev_instance * shmdev_open(struct shmdev_dev * shm_dev_desc, return NULL; } - struct shmdev_instance * worker_instance = ZMALLOC(sizeof(struct shmdev_instance)); - MR_VERIFY_MALLOC(worker_instance); + struct shmdev_instance * sdi = ZMALLOC(sizeof(struct shmdev_instance)); + MR_VERIFY_MALLOC(sdi); - worker_instance->shm_dev_desc = shm_dev_desc; - worker_instance->nr_rxstream = nr_worker_rx_thread; - worker_instance->nr_txstream = nr_worker_tx_thread; - worker_instance->eventdev_id = shm_dev_desc->eventdev_id; + sdi->shm_dev_desc = shm_dev_desc; + sdi->nr_rxstream = nr_worker_rx_thread; + sdi->nr_txstream = nr_worker_tx_thread; + sdi->eventdev_id = shm_dev_desc->eventdev_id; unsigned int nr_io_ports = RTE_MAX(shm_dev_desc->nr_rx_thread, shm_dev_desc->nr_tx_thread); unsigned int nr_worker_ports = RTE_MAX(nr_worker_rx_thread, nr_worker_tx_thread); @@ -313,8 +313,8 @@ struct shmdev_instance * shmdev_open(struct shmdev_dev * shm_dev_desc, goto errout; } - worker_instance->pkt_rx_ports[qid_for_works] = port_id_iter; - worker_instance->pkt_tx_ports[qid_for_works] = port_id_iter; + sdi->pkt_rx_ports[qid_for_works] = port_id_iter; + sdi->pkt_tx_ports[qid_for_works] = port_id_iter; qid_for_works++; } @@ -332,10 +332,11 @@ struct shmdev_instance * shmdev_open(struct shmdev_dev * shm_dev_desc, /* notify other modules, this shmdev is in use */ rte_atomic_thread_fence(__ATOMIC_RELEASE); shm_dev_desc->in_use = 1; - return worker_instance; + shm_dev_desc->sdi = sdi; + return sdi; errout: - FREE(worker_instance); + FREE(sdi); return NULL; } @@ -357,23 +358,25 @@ void shmdev_packet_dispatch(struct shmdev_dev * shmdev_dev, queue_id_t qid, stru ev->sched_type = RTE_SCHED_TYPE_ATOMIC; ev->queue_id = EVT_QID_IO_TO_WORKER; ev->priority = RTE_EVENT_DEV_PRIORITY_NORMAL; - ev->flow_id = mbufs[i]->hash.usr % 512; + ev->flow_id = mbufs[i]->hash.usr % 1024; ev->mbuf = mbufs[i]; } + struct shmdev_instance * sdi = shmdev_dev->sdi; + assert(sdi != NULL); + int nr_enq = rte_event_enqueue_burst(shmdev_dev->eventdev_id, shmdev_dev->io_qid_to_port_id_map[qid], evs, nr_mbufs); if (unlikely(nr_enq >= 0 && nr_enq < nr_mbufs)) { - /* no space, drop the packets */ + rte_atomic64_add(&sdi->pkt_rx_drop, nr_mbufs - nr_enq); rte_pktmbuf_free_bulk(&mbufs[nr_enq], nr_mbufs - nr_enq); } else if (unlikely(nr_enq == -ENOSPC)) { - /* backpressure */ + rte_atomic64_add(&sdi->pkt_rx_drop, nr_mbufs); rte_pktmbuf_free_bulk(mbufs, nr_mbufs); - return; } else if (unlikely(nr_enq < 0)) { |
