-rw-r--r--  app/src/monit.c                  | 46
-rw-r--r--  app/src/rawio.c                  | 80
-rw-r--r--  include/internal/shmdev_define.h | 17
-rw-r--r--  service/src/shmdev.c             | 33
4 files changed, 87 insertions(+), 89 deletions(-)
diff --git a/app/src/monit.c b/app/src/monit.c
index 605fd43..c1a640a 100644
--- a/app/src/monit.c
+++ b/app/src/monit.c
@@ -1,14 +1,7 @@
-/* \brief Application status monitoring data output
-*
-* \author Lu Qiuwen<[email protected]>
-* \date 2016-11-30
-*/
-
-#include <mrapp.h>
+#include <mrapp.h>
#include <cJSON.h>
unsigned int g_monit_interval = 1;
-
#if 0
int vdev_instance_stats_get(struct vdev_instance * vdi, struct vdev_stat_info * stat_info)
{
@@ -87,25 +80,38 @@ static cJSON * __create_uint64_delta_array(const uint64_t * value_large,
return uint64_array;
}
+#endif
-static cJSON * __create_vdev_stats(struct vdev_instance * vdi)
+static cJSON * vdev_stat_create(struct mr_vdev * vdev)
{
-#if 0
- vdev_instance_stats_get(vdi, &_stat_info);
- vdev_instance_stats_last_get(vdi, &_stat_info_last);
- vdev_instance_stat_last_save(vdi, &_stat_info);
+ struct cJSON * j_father = cJSON_CreateObject();
+ struct cJSON * j_vdev_value = cJSON_CreateObject();
+ struct cJSON * j_vdev_speed = cJSON_CreateObject();
+
+ struct shmdev_instance * sdi = vdev->sdi;
+ uint64_t pkt_rx_drop_now = rte_atomic64_read(&sdi->pkt_rx_drop);
+ uint64_t pkt_tx_drop_now = rte_atomic64_read(&sdi->pkt_tx_drop);
- unsigned int nr_rx_stream = _stat_info.nr_rxstream;
- unsigned int nr_tx_stream = _stat_info.nr_txstream;
+ cJSON_AddNumberToObject(j_vdev_value, "pkt_drop_rx_bp", pkt_rx_drop_now);
+ cJSON_AddNumberToObject(j_vdev_value, "pkt_drop_tx_bp", pkt_tx_drop_now);
+ cJSON_AddItemToObject(j_father, "accumulative", j_vdev_value);
+ cJSON_AddItemToObject(j_father, "speed", j_vdev_speed);
+ return j_father;
+}
+
+#if 0
+static cJSON * vdev_per_q_stat_create(struct mr_vdev * vdev)
+{
/* stats node handle */
struct cJSON * j_vdev_stats = cJSON_CreateObject();
/* accumulative totals */
struct cJSON * j_vdev_value = cJSON_CreateObject();
/* instantaneous values */
struct cJSON * j_vdev_speed = cJSON_CreateObject();
- /* Gauge */
- struct cJSON * j_vdev_gauge = cJSON_CreateObject();
+
+
+
#define __JOIN_VDEV_VALUE_STATS_ITEM(item, streams) do { \
cJSON_AddItemToObject(j_vdev_value, #item, \
@@ -200,11 +206,8 @@ static cJSON * __create_vdev_stats(struct vdev_instance * vdi)
cJSON_AddItemToObject(j_vdev_stats, "accumulative", j_vdev_value);
cJSON_AddItemToObject(j_vdev_stats, "speed", j_vdev_speed);
cJSON_AddItemToObject(j_vdev_stats, "gauge", j_vdev_gauge);
-
-#endif
return NULL;
}
-
#endif
// runtime raw-packet device statistics counters
@@ -220,6 +223,8 @@ static cJSON * monit_vdev(struct mr_instance * instance)
cJSON_AddStringToObject(j_vdev, "symbol", vdev->devsym);
cJSON_AddNumberToObject(j_vdev, "rxstreams", vdev->nr_rxstream);
cJSON_AddNumberToObject(j_vdev, "txstreams", vdev->nr_txstream);
+ cJSON_AddItemToObject(j_vdev, "stats", vdev_stat_create(vdev));
+
#if 0
cJSON_AddItemToObject(j_vdev, "stats", __create_vdev_stats(vdev->sdi));
#endif
@@ -229,6 +234,7 @@ static cJSON * monit_vdev(struct mr_instance * instance)
return j_vdev_array;
}
+
static cJSON * monit_app_stat(struct mr_instance * instance)
{
struct cJSON * j_root = cJSON_CreateObject();
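The new vdev_stat_create() fills the "accumulative" object from the two drop counters but attaches an empty "speed" object. A per-interval rate would come from differencing two successive accumulative samples against g_monit_interval; a minimal sketch, assuming a hypothetical helper that is handed the cached previous sample:

```c
/* Minimal sketch: derive a "speed" value from two successive accumulative
 * samples. rate_from_samples() and the cached `last` value are hypothetical,
 * not part of this patch; g_monit_interval is the interval defined above. */
#include <stdint.h>

extern unsigned int g_monit_interval;   /* sampling interval, in seconds */

static uint64_t rate_from_samples(uint64_t now, uint64_t last)
{
    uint64_t delta = (now >= last) ? (now - last) : 0;   /* guard a counter reset */
    return (g_monit_interval != 0) ? delta / g_monit_interval : 0;
}
```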
diff --git a/app/src/rawio.c b/app/src/rawio.c
index d5a6585..e2837be 100644
--- a/app/src/rawio.c
+++ b/app/src/rawio.c
@@ -32,11 +32,6 @@ int marsio_recv_burst(struct mr_vdev * vdev, queue_id_t qid, marsio_buff_t * mbu
struct shmdev_instance * sdi = vdev->sdi;
unsigned int nr_evs = rte_event_dequeue_burst(sdi->eventdev_id, sdi->pkt_rx_ports[qid], evs, nr_max_evs, 0);
- for (unsigned int i = 0; i < nr_evs; i++)
- {
- mbufs[i] = evs[i].mbuf;
- }
-
struct mr_vdev_counters * vdev_counter = &vdev->counters[qid];
vdev_counter->rx_burst_call_counters++;
@@ -49,6 +44,18 @@ int marsio_recv_burst(struct mr_vdev * vdev, queue_id_t qid, marsio_buff_t * mbu
vdev_counter->rx_burst_non_zero_counters++;
}
+ /* fill the output mbufs, and calculate the total length */
+ unsigned int rx_total_bytes = 0;
+ for (unsigned int i = 0; i < nr_evs; i++)
+ {
+ mbufs[i] = evs[i].mbuf;
+ rx_total_bytes += rte_pktmbuf_pkt_len((struct rte_mbuf *)mbufs[i]);
+ }
+
+ struct shmdev_instance_stat_per_q_worker * q_worker_stat = &sdi->stat_per_q_worker[qid];
+ q_worker_stat->pkt_count_rx_total += nr_evs;
+ q_worker_stat->pkt_bytes_rx_total += rx_total_bytes;
+
return (int)nr_evs;
}
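This hunk moves the mbuf copy-out after the call counters and makes each burst feed the per-queue packet and byte totals. A hedged caller-side sketch of the receive API as it stands after the change:

```c
/* Usage sketch of marsio_recv_burst(); drain_queue() and process_packet()
 * are hypothetical, MR_BURST_MAX is the burst cap used throughout this file. */
static void drain_queue(struct mr_vdev * vdev, queue_id_t qid)
{
    marsio_buff_t * bufs[MR_BURST_MAX];
    int nr = marsio_recv_burst(vdev, qid, bufs, MR_BURST_MAX);
    for (int i = 0; i < nr; i++)
        process_packet(bufs[i]);   /* consume, then free or forward the buffer */
}
```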
@@ -69,13 +76,13 @@ int marsio_recv_all_burst(struct mr_instance * instance, queue_id_t qid, marsio_
void marsio_send_buffer_flush(struct mr_vdev * vdev, queue_id_t sid)
{
struct shmdev_instance * sdi = vdev->sdi;
- rte_event_maintain(sdi->eventdev_id, sdi->pkt_tx_ports[sid], RTE_EVENT_DEV_MAINT_OP_FLUSH);
+ rte_event_maintain(sdi->eventdev_id, sdi->pkt_tx_ports[sid], 0);
}
-int marsio_send_burst(struct mr_sendpath * sendpath, queue_id_t sid, marsio_buff_t * mbufs[], int nr_mbufs)
+int marsio_send_burst(struct mr_sendpath * sendpath, queue_id_t qid, marsio_buff_t * mbufs[], int nr_mbufs)
{
struct rte_mbuf ** _mbufs = (struct rte_mbuf **)mbufs;
- assert(sid < sendpath->sdi->nr_txstream);
+ assert(qid < sendpath->sdi->nr_txstream);
assert(nr_mbufs <= MR_BURST_MAX);
struct rte_event evs[MR_BURST_MAX];
@@ -86,7 +93,7 @@ int marsio_send_burst(struct mr_sendpath * sendpath, queue_id_t sid, marsio_buff
ev->sched_type = RTE_SCHED_TYPE_ATOMIC;
ev->queue_id = EVT_QID_WORKER_TO_IO;
ev->priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
- ev->flow_id = _mbufs[i]->hash.usr % 512;
+ ev->flow_id = _mbufs[i]->hash.usr % 1024;
ev->mbuf = _mbufs[i];
}
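This hunk and the matching one in shmdev_packet_dispatch() widen the flow hash from % 512 to % 1024, which presumably tracks the flow count the eventdev queues are configured with. A sketch of the corresponding device configuration, under that assumption:

```c
/* Assumption: the 512 -> 1024 change mirrors nb_event_queue_flows in the
 * eventdev setup; this config fragment is illustrative, not from this tree. */
#include <rte_eventdev.h>

struct rte_event_dev_config cfg = {
    .nb_event_queue_flows = 1024,   /* flow_id above is taken modulo this count */
    /* remaining fields would be filled from rte_event_dev_info_get() limits */
};
```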
@@ -94,15 +101,20 @@ int marsio_send_burst(struct mr_sendpath * sendpath, queue_id_t sid, marsio_buff
assert(vdev != NULL);
struct shmdev_instance * sdi = vdev->sdi;
- int nr_enq = rte_event_enqueue_burst(sdi->eventdev_id, sdi->pkt_tx_ports[sid], evs, nr_mbufs);
+ struct shmdev_instance_stat_per_q_worker * q_worker_stat = &sdi->stat_per_q_worker[qid];
+
+ q_worker_stat->pkt_count_tx_total += nr_mbufs;
+ q_worker_stat->pkt_bytes_tx_total += packet_total_len(_mbufs, nr_mbufs);
+
+ int nr_enq = rte_event_enqueue_burst(sdi->eventdev_id, sdi->pkt_tx_ports[qid], evs, nr_mbufs);
if (unlikely(nr_enq >= 0 && nr_enq < nr_mbufs))
{
- /* no space, drop the packets */
+ rte_atomic64_add(&sdi->pkt_tx_drop, nr_mbufs - nr_enq);
rte_pktmbuf_free_bulk(&_mbufs[nr_enq], nr_mbufs - nr_enq);
}
else if (unlikely(nr_enq == -ENOSPC))
{
- /* no space, drop all enqueued packets */
+ rte_atomic64_add(&sdi->pkt_tx_drop, nr_mbufs);
rte_pktmbuf_free_bulk(_mbufs, nr_mbufs);
}
else if (unlikely(nr_enq < 0))
@@ -137,46 +149,7 @@ int marsio_send_burst_with_options(struct mr_sendpath * sendpath, queue_id_t sid
marsio_buff_do_rehash(sendpath->instance, mbufs[i]);
}
- /* per-thread runtime statistics */
- if (dp_thread_ctx.instance != NULL)
- {
- thread_id_t tid = dp_thread_ctx.thread_id;
- dp_thread_ctx.instance->stat[tid].packet_send_count += nr_mbufs;
- dp_thread_ctx.instance->stat[tid].packet_send_length = packet_total_len(_mbufs, nr_mbufs);
- }
-
- struct rte_event evs[MR_BURST_MAX];
- for (unsigned int i = 0; i < nr_mbufs; i++)
- {
- struct rte_event * ev = &evs[i];
- ev->op = RTE_EVENT_OP_NEW;
- ev->sched_type = RTE_SCHED_TYPE_ATOMIC;
- ev->queue_id = EVT_QID_WORKER_TO_IO;
- ev->priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
- ev->mbuf = mbufs[i];
- }
-
- struct mr_vdev * vdev = sendpath->vdev;
- assert(vdev != NULL);
-
- struct shmdev_instance * sdi = vdev->sdi;
- int nr_enq = rte_event_enqueue_burst(sdi->eventdev_id, sdi->pkt_tx_ports[sid], evs, nr_mbufs);
- if (unlikely(nr_enq >= 0 && nr_enq < nr_mbufs))
- {
- /* no space, drop the packets */
- rte_pktmbuf_free_bulk(&_mbufs[nr_enq], nr_mbufs - nr_enq);
- }
- else if (unlikely(nr_enq == -ENOSPC))
- {
- /* no space, drop all enqueued packets */
- rte_pktmbuf_free_bulk(_mbufs, nr_mbufs);
- }
- else if (unlikely(nr_enq < 0))
- {
- rte_panic("Panic at enqueue eventdev, eventdev_id = %u, ret = %d", sdi->eventdev_id, nr_enq);
- }
-
- return RT_SUCCESS;
+ return marsio_send_burst(sendpath, sid, mbufs, nr_mbufs);
}
int marsio_poll_register_eventfd(struct mr_instance * instance, int eventfd, unsigned int tid)
@@ -200,8 +173,7 @@ static void clear_all_zero_recv_counters(struct mr_vdev * vdevs[], unsigned int
int marsio_poll_wait(struct mr_instance * instance, struct mr_vdev * vdevs[], unsigned int nr_vdevs, unsigned int tid,
int timeout)
{
-
- eventfd_t _not_use_eventfd_value;
+ __attribute__((unused)) eventfd_t _not_use_eventfd_value;
/* flush the vdev */
for (unsigned int i = 0; i < nr_vdevs; i++)
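packet_total_len() is used by the new tx accounting (and by the removed per-thread stats) but defined elsewhere in the tree. A sketch of what it presumably computes, summing the full packet length over a burst:

```c
/* Hedged sketch of packet_total_len() as called in marsio_send_burst(); the
 * real definition lives elsewhere in the tree, so treat this as an assumption. */
#include <stdint.h>
#include <rte_mbuf.h>

static inline uint64_t packet_total_len(struct rte_mbuf * mbufs[], unsigned int nr)
{
    uint64_t total = 0;
    for (unsigned int i = 0; i < nr; i++)
        total += rte_pktmbuf_pkt_len(mbufs[i]);   /* whole packet, all segments */
    return total;
}
```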
diff --git a/include/internal/shmdev_define.h b/include/internal/shmdev_define.h
index f08bb14..69a3d2b 100644
--- a/include/internal/shmdev_define.h
+++ b/include/internal/shmdev_define.h
@@ -40,8 +40,18 @@ struct shmdev_dev
/* qid maps */
unsigned int io_qid_to_port_id_map[MR_SID_MAX];
+
+ /* SDI */
+ struct shmdev_instance * sdi;
};
+struct shmdev_instance_stat_per_q_worker
+{
+ uint64_t pkt_count_rx_total;
+ uint64_t pkt_count_tx_total;
+ uint64_t pkt_bytes_rx_total;
+ uint64_t pkt_bytes_tx_total;
+} __rte_cache_aligned;
struct shmdev_instance
{
@@ -61,4 +71,11 @@ struct shmdev_instance
unsigned int nr_rxstream;
unsigned int nr_txstream;
+
+ /* stat */
+ rte_atomic64_t pkt_rx_drop;
+ rte_atomic64_t pkt_tx_drop;
+
+ /* stat per worker q */
+ struct shmdev_instance_stat_per_q_worker stat_per_q_worker[MR_SID_MAX];
};
\ No newline at end of file
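shmdev_instance_stat_per_q_worker is __rte_cache_aligned so each worker queue's counters sit on their own cache line, and since a slot is written only by its own worker, the fast path needs no atomics, unlike the shared pkt_rx_drop/pkt_tx_drop counters. A minimal sketch of that update pattern:

```c
/* Minimal sketch of the per-queue counter pattern; the array size and helper
 * name are illustrative, the struct mirrors the one added above. */
#include <stdint.h>
#include <rte_common.h>   /* __rte_cache_aligned */

struct per_q_stat {
    uint64_t pkt_count_rx_total;
    uint64_t pkt_bytes_rx_total;
} __rte_cache_aligned;            /* one cache line per queue: no false sharing */

static struct per_q_stat stats[16];   /* one slot per worker queue (size illustrative) */

static inline void stat_rx_update(unsigned int qid, unsigned int nr_pkts,
                                  uint64_t nr_bytes)
{
    /* only queue qid's worker writes this slot, so plain adds are safe */
    stats[qid].pkt_count_rx_total += nr_pkts;
    stats[qid].pkt_bytes_rx_total += nr_bytes;
}
```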
diff --git a/service/src/shmdev.c b/service/src/shmdev.c
index 79dceda..c9b7e1e 100644
--- a/service/src/shmdev.c
+++ b/service/src/shmdev.c
@@ -52,7 +52,7 @@ void shmdev_config_load(const char * devsym, struct shmdev_config * cfg_out)
unsigned int default_sz_tun_tx = default_sz_tunnel;
unsigned int default_sz_tun_rx_shared = 0;
- /* override configration */
+ /* override configuration */
MESA_load_profile_uint_def(cfgfile, "device", "sz_rx_tunnel", &default_sz_tun_rx_exclusive,
default_sz_tun_rx_exclusive);
MESA_load_profile_uint_def(cfgfile, "device", "sz_tx_tunnel", &default_sz_tun_tx, default_sz_tun_tx);
@@ -179,13 +179,13 @@ struct shmdev_instance * shmdev_open(struct shmdev_dev * shm_dev_desc,
return NULL;
}
- struct shmdev_instance * worker_instance = ZMALLOC(sizeof(struct shmdev_instance));
- MR_VERIFY_MALLOC(worker_instance);
+ struct shmdev_instance * sdi = ZMALLOC(sizeof(struct shmdev_instance));
+ MR_VERIFY_MALLOC(sdi);
- worker_instance->shm_dev_desc = shm_dev_desc;
- worker_instance->nr_rxstream = nr_worker_rx_thread;
- worker_instance->nr_txstream = nr_worker_tx_thread;
- worker_instance->eventdev_id = shm_dev_desc->eventdev_id;
+ sdi->shm_dev_desc = shm_dev_desc;
+ sdi->nr_rxstream = nr_worker_rx_thread;
+ sdi->nr_txstream = nr_worker_tx_thread;
+ sdi->eventdev_id = shm_dev_desc->eventdev_id;
unsigned int nr_io_ports = RTE_MAX(shm_dev_desc->nr_rx_thread, shm_dev_desc->nr_tx_thread);
unsigned int nr_worker_ports = RTE_MAX(nr_worker_rx_thread, nr_worker_tx_thread);
@@ -313,8 +313,8 @@ struct shmdev_instance * shmdev_open(struct shmdev_dev * shm_dev_desc,
goto errout;
}
- worker_instance->pkt_rx_ports[qid_for_works] = port_id_iter;
- worker_instance->pkt_tx_ports[qid_for_works] = port_id_iter;
+ sdi->pkt_rx_ports[qid_for_works] = port_id_iter;
+ sdi->pkt_tx_ports[qid_for_works] = port_id_iter;
qid_for_works++;
}
@@ -332,10 +332,11 @@ struct shmdev_instance * shmdev_open(struct shmdev_dev * shm_dev_desc,
/* notify other modules, this shmdev is in use */
rte_atomic_thread_fence(__ATOMIC_RELEASE);
shm_dev_desc->in_use = 1;
- return worker_instance;
+ shm_dev_desc->sdi = sdi;
+ return sdi;
errout:
- FREE(worker_instance);
+ FREE(sdi);
return NULL;
}
@@ -357,23 +358,25 @@ void shmdev_packet_dispatch(struct shmdev_dev * shmdev_dev, queue_id_t qid, stru
ev->sched_type = RTE_SCHED_TYPE_ATOMIC;
ev->queue_id = EVT_QID_IO_TO_WORKER;
ev->priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
- ev->flow_id = mbufs[i]->hash.usr % 512;
+ ev->flow_id = mbufs[i]->hash.usr % 1024;
ev->mbuf = mbufs[i];
}
+ struct shmdev_instance * sdi = shmdev_dev->sdi;
+ assert(sdi != NULL);
+
int nr_enq =
rte_event_enqueue_burst(shmdev_dev->eventdev_id, shmdev_dev->io_qid_to_port_id_map[qid], evs, nr_mbufs);
if (unlikely(nr_enq >= 0 && nr_enq < nr_mbufs))
{
- /* no space, drop the packets */
+ rte_atomic64_add(&sdi->pkt_rx_drop, nr_mbufs - nr_enq);
rte_pktmbuf_free_bulk(&mbufs[nr_enq], nr_mbufs - nr_enq);
}
else if (unlikely(nr_enq == -ENOSPC))
{
- /* backpressure */
+ rte_atomic64_add(&sdi->pkt_rx_drop, nr_mbufs);
rte_pktmbuf_free_bulk(mbufs, nr_mbufs);
- return;
}
else if (unlikely(nr_enq < 0))
{
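The drop accounting now has the same shape on both sides: whatever rte_event_enqueue_burst() refuses is counted into the matching rte_atomic64_t and freed with rte_pktmbuf_free_bulk(), and vdev_stat_create() later reads the counters with rte_atomic64_read(). A condensed sketch of that pattern (the helper is hypothetical; the real code additionally distinguishes an -ENOSPC return):

```c
/* Condensed sketch of the enqueue-or-drop pattern used on both the dispatch
 * (rx) and send (tx) paths; enqueue_or_drop() is hypothetical, not in this tree. */
#include <rte_atomic.h>
#include <rte_eventdev.h>
#include <rte_mbuf.h>

static void enqueue_or_drop(uint8_t evdev_id, uint8_t port_id,
                            struct rte_event evs[], struct rte_mbuf * mbufs[],
                            uint16_t nr, rte_atomic64_t * drop_counter)
{
    uint16_t nr_enq = rte_event_enqueue_burst(evdev_id, port_id, evs, nr);
    if (nr_enq < nr)
    {
        /* eventdev back-pressure: count the drops, then free the unsent tail */
        rte_atomic64_add(drop_counter, nr - nr_enq);
        rte_pktmbuf_free_bulk(&mbufs[nr_enq], nr - nr_enq);
    }
}
```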