summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzy <[email protected]>2023-09-27 05:27:22 +0000
committerzy <[email protected]>2023-09-27 05:27:22 +0000
commit5be5a860802db12721e49f4d433c9fb123ef6b6e (patch)
treef38b0cb9d9b119d1b830997beb6d93b514e02ab8
parent8948a7cf79b11a1013385e19d0eec4998e365fe9 (diff)
Revert "move the recv_burst, send_burst API from mr_instance structure to queue related structure."
This reverts commit cec8c8740585e92ddd226ec753bbdd953a02b0ac.
-rw-r--r--.gitignore4
-rw-r--r--bindings/marsio/src/lib.rs145
-rw-r--r--bindings/marsio/src/sys.rs45
-rw-r--r--bindings/marsio/tests/integration_test.rs106
4 files changed, 70 insertions, 230 deletions
diff --git a/.gitignore b/.gitignore
index cb117fd..4fffb2f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,2 @@
-Cargo.lock
-target \ No newline at end of file
+/target
+/Cargo.lock
diff --git a/bindings/marsio/src/lib.rs b/bindings/marsio/src/lib.rs
index c695a8c..c4de824 100644
--- a/bindings/marsio/src/lib.rs
+++ b/bindings/marsio/src/lib.rs
@@ -1,4 +1,3 @@
-use crate::sys::{mr_sendpath_t, mr_vdev_t};
use anyhow::Result;
use anyhow::{anyhow, Context};
use arrayvec::ArrayVec;
@@ -6,8 +5,6 @@ use libc::cpu_set_t;
use libc::{c_char, c_int};
use std::ptr::null_mut;
use std::ptr::NonNull;
-use std::sync::Arc;
-use std::sync::Mutex;
mod sys;
@@ -26,7 +23,7 @@ pub struct MarsioBuff<'instance> {
pub struct CoreID(pub usize);
impl<'instance> MarsioBuff<'instance> {
- pub fn buffer(&self) -> &[u8] {
+ fn buffer(&self) -> &[u8] {
unsafe {
let buff_ptr = sys::marsio_buff_mtod(self.marsio_buff.as_ptr());
let buff_len = sys::marsio_buff_buflen(self.marsio_buff.as_ptr()) as usize;
@@ -34,39 +31,13 @@ impl<'instance> MarsioBuff<'instance> {
}
}
- pub fn buffer_mut(&mut self) -> &mut [u8] {
+ fn buffer_mut(&self) -> &mut [u8] {
unsafe {
let buff_ptr = sys::marsio_buff_mtod(self.marsio_buff.as_ptr());
let buff_len = sys::marsio_buff_buflen(self.marsio_buff.as_ptr()) as usize;
return std::slice::from_raw_parts_mut(buff_ptr as *mut u8, buff_len);
}
}
-
- pub fn append(&mut self, data: &[u8]) -> Result<()> {
- let buff_ptr = NonNull::new(unsafe {
- sys::marsio_buff_append(self.marsio_buff.as_ptr(), data.len() as u16)
- })
- .context(format!("Failed to append mbuf {:?}", self.marsio_buff))?;
-
- unsafe {
- std::ptr::copy_nonoverlapping(data.as_ptr(), buff_ptr.as_ptr() as *mut u8, data.len());
- }
-
- Ok(())
- }
-
- pub fn prepend(&mut self, data: &[u8]) -> Result<()> {
- let buff_ptr = NonNull::new(unsafe {
- sys::marsio_buff_prepend(self.marsio_buff.as_ptr(), data.len() as u16)
- })
- .context(format!("Failed to prepend mbuf {:?}", self.marsio_buff))?;
-
- unsafe {
- std::ptr::copy_nonoverlapping(data.as_ptr(), buff_ptr.as_ptr() as *mut u8, data.len());
- }
-
- Ok(())
- }
}
impl<'instance> Drop for MarsioBuff<'instance> {
@@ -83,92 +54,18 @@ impl<'instance> Drop for MarsioBuff<'instance> {
}
}
-struct MarsioDeviceQueueCounter {
- rx_queue_max: u32,
- rx_queue_use: u32,
- tx_queue_max: u32,
- tx_queue_use: u32,
-}
-
pub struct MarsioDevice {
- /* raw ptr handles */
device_handle: NonNull<sys::mr_vdev_t>,
- send_path_handle: NonNull<sys::mr_sendpath_t>,
-
- /* counter */
- queue_counters: Arc<Mutex<MarsioDeviceQueueCounter>>,
+ sendpath_handle: NonNull<sys::mr_sendpath_t>,
}
-unsafe impl Sync for MarsioDevice {}
-unsafe impl Send for MarsioDevice {}
-
impl MarsioDevice {
- unsafe fn device_handle_raw_ptr_get(&self) -> *mut mr_vdev_t {
- self.device_handle.as_ptr()
- }
-
- unsafe fn send_path_handle_raw_ptr_get(&self) -> *mut mr_sendpath_t {
- self.send_path_handle.as_ptr()
- }
-
- pub fn alloc_rx_queue(&self) -> Result<MarsioDeviceRxQueue> {
- let mut q_counter = self.queue_counters.lock().unwrap();
- if q_counter.rx_queue_use >= q_counter.rx_queue_max {
- return Err(anyhow!(
- "Failed to alloc rx queue, rx_queue_use: {}, rx_queue_max: {}",
- q_counter.rx_queue_use,
- q_counter.rx_queue_max
- ));
- }
-
- let rx_queue_id = q_counter.rx_queue_use;
- q_counter.rx_queue_use += 1;
-
- Ok(MarsioDeviceRxQueue {
- device: self,
- qid: rx_queue_id,
- })
- }
-
- pub fn alloc_tx_queue(&self) -> Result<MarsioDeviceTxQueue> {
- let mut q_counter = self.queue_counters.lock().unwrap();
-
- if q_counter.rx_queue_use >= q_counter.tx_queue_max {
- return Err(anyhow!(
- "Failed to alloc tx queue, tx_queue_use: {}, tx_queue_max: {}",
- q_counter.tx_queue_use,
- q_counter.tx_queue_max
- ));
- };
-
- let tx_queue_id = q_counter.tx_queue_use;
- q_counter.tx_queue_use += 1;
-
- Ok(MarsioDeviceTxQueue {
- device: self,
- qid: tx_queue_id,
- })
- }
-}
-
-pub struct MarsioDeviceRxQueue<'device> {
- device: &'device MarsioDevice,
- qid: u32,
-}
-
-pub struct MarsioDeviceTxQueue<'device> {
- device: &'device MarsioDevice,
- qid: u32,
-}
-
-impl<'device> MarsioDeviceRxQueue<'device> {
- pub fn recv_burst(&mut self, recv_buf: &mut RecvBuf) {
+ pub fn recv_burst(&self, recv_buf: &mut RecvBuf, qid: u32) {
let mut recv_buf_raw_ptrs = ArrayVec::<_, DEFAULT_RX_BURST>::new();
unsafe {
- let device_handle_ptr = self.device.device_handle_raw_ptr_get();
let recv_count = sys::marsio_recv_burst(
- device_handle_ptr,
- self.qid,
+ self.device_handle.as_ptr(),
+ qid,
recv_buf_raw_ptrs.as_ptr(),
recv_buf_raw_ptrs.capacity() as c_int,
);
@@ -188,19 +85,17 @@ impl<'device> MarsioDeviceRxQueue<'device> {
*recv_buf = recv_buf_new;
}
-}
-impl<'device> MarsioDeviceTxQueue<'device> {
- pub fn send_burst(&mut self, send_buf: &mut SendBuf) {
+
+ pub fn send_burst(&self, send_buf: &mut SendBuf, qid: u32) {
let send_buf_raw_ptrs: ArrayVec<_, DEFAULT_TX_BURST> = send_buf
.iter()
.map(|marsio_buff| marsio_buff.marsio_buff.as_ptr())
.collect();
unsafe {
- let send_path_raw_ptr = self.device.send_path_handle_raw_ptr_get();
sys::marsio_send_burst(
- send_path_raw_ptr,
- self.qid,
+ self.sendpath_handle.as_ptr(),
+ qid,
send_buf_raw_ptrs.as_ptr(),
send_buf_raw_ptrs.len() as c_int,
)
@@ -215,9 +110,6 @@ pub struct MarsioInstance {
mr_instance: NonNull<sys::mr_instance_t>,
}
-unsafe impl Send for MarsioInstance {}
-unsafe impl Sync for MarsioInstance {}
-
impl MarsioInstance {
pub fn new() -> Result<MarsioInstance> {
let mr_instance_ptr = NonNull::new(unsafe { sys::marsio_create() })
@@ -322,18 +214,11 @@ impl MarsioInstance {
Ok(MarsioDevice {
device_handle: _device_handle,
- send_path_handle: _send_path_handle,
-
- queue_counters: Arc::new(Mutex::new(MarsioDeviceQueueCounter {
- rx_queue_max: nr_rx_queues as u32,
- rx_queue_use: 0,
- tx_queue_max: nr_tx_queues as u32,
- tx_queue_use: 0,
- })),
+ sendpath_handle: _send_path_handle,
})
}
- pub fn buf_alloc(&self) -> Option<MarsioBuff> {
+ pub fn buf_alloc(&mut self) -> Option<MarsioBuff> {
let mut out_buf = [std::ptr::null_mut(); 1];
unsafe {
sys::marsio_buff_alloc_v2(
@@ -349,16 +234,12 @@ impl MarsioInstance {
_marker: std::marker::PhantomData {},
})
}
-
- pub fn thread_context_init(&self) {
- unsafe { sys::marsio_thread_init(self.mr_instance.as_ptr()) };
- }
}
impl Drop for MarsioDevice {
fn drop(&mut self) {
unsafe {
- sys::marsio_sendpath_destory(self.send_path_handle.as_ptr());
+ sys::marsio_sendpath_destory(self.sendpath_handle.as_ptr());
sys::marsio_close_device(self.device_handle.as_ptr());
};
}
diff --git a/bindings/marsio/src/sys.rs b/bindings/marsio/src/sys.rs
index d2abf09..849b959 100644
--- a/bindings/marsio/src/sys.rs
+++ b/bindings/marsio/src/sys.rs
@@ -1,6 +1,6 @@
use libc::{c_char, c_int, c_void, size_t};
-#[allow(non_camel_case_types, dead_code)]
+#[allow(non_camel_case_types)]
#[repr(C)]
pub enum marsio_opt_type_t {
MARSIO_OPT_THREAD_NUM,
@@ -9,7 +9,7 @@ pub enum marsio_opt_type_t {
MARSIO_OPT_THREAD_MASK_IN_CPUSET,
}
-#[allow(non_camel_case_types, dead_code)]
+#[allow(non_camel_case_types)]
#[repr(C)]
pub enum marsio_opt_send_t {
MARSIO_SEND_OPT_NO_FREE = 1 << 0,
@@ -23,7 +23,7 @@ pub enum marsio_opt_send_t {
MARSIO_SEND_OPT_CTRL = 1 << 4,
}
-#[allow(non_camel_case_types, dead_code)]
+#[allow(non_camel_case_types)]
#[repr(C)]
enum mr_sendpath_type {
/* 去往特定虚设备的发包路径 */
@@ -36,7 +36,7 @@ enum mr_sendpath_type {
MR_SENDPATH_MAX,
}
-#[allow(non_camel_case_types, dead_code)]
+#[allow(non_camel_case_types)]
#[repr(C)]
enum mr_sendpath_option {
/* 构建四层报文头 */
@@ -51,7 +51,7 @@ enum mr_sendpath_option {
MR_SENDPATH_OPT_HOOK_POSTBUILD = 51,
}
-#[allow(non_camel_case_types, dead_code)]
+#[allow(non_camel_case_types)]
#[repr(C)]
enum mr_clone_options {
/* 拷贝区域 */
@@ -60,7 +60,7 @@ enum mr_clone_options {
MR_BUFF_CLONE_CTRLZONE = 1 << 2,
}
-#[allow(non_camel_case_types, dead_code)]
+#[allow(non_camel_case_types)]
#[repr(C)]
enum mr_thread_affinity_mode {
/* 禁用线程亲和性设置 */
@@ -71,7 +71,14 @@ enum mr_thread_affinity_mode {
MR_THREAD_AFFINITY_USER = 255,
}
-#[allow(non_camel_case_types, dead_code)]
+#[allow(non_camel_case_types)]
+#[repr(C)]
+enum mr_timestamp_type {
+ /* 从网卡收取时或报文缓冲区申请时的时间戳 */
+ MR_TIMESTAMP_RX_OR_ALLOC = 0,
+}
+
+#[allow(non_camel_case_types)]
#[repr(C)]
enum mr_buff_metadata_type {
/* Rehash Index */
@@ -99,17 +106,18 @@ pub type mr_vdev_t = c_void;
#[allow(non_camel_case_types)]
pub type mr_sendpath_t = c_void;
-#[allow(non_camel_case_types, dead_code)]
+#[allow(non_camel_case_types)]
+pub type cpu_mask_t = u64;
+#[allow(non_camel_case_types)]
pub type port_id_t = u32;
-#[allow(non_camel_case_types, dead_code)]
+#[allow(non_camel_case_types)]
pub type queue_id_t = u32;
-#[allow(non_camel_case_types, dead_code)]
+#[allow(non_camel_case_types)]
pub type thread_id_t = u32;
#[link(name = "marsio")]
extern "C" {
/* options and init */
- #[allow(dead_code)]
pub fn marsio_option_get(
mr_instance: *mut mr_instance_t,
opt_type: c_int,
@@ -135,7 +143,6 @@ extern "C" {
nr_txstream: c_int,
) -> *mut mr_vdev_t;
pub fn marsio_close_device(vdev: *mut mr_vdev_t);
- #[allow(dead_code)]
pub fn marsio_device_lookup(
mr_instance: *const mr_instance_t,
devsym: *const c_char,
@@ -152,13 +159,18 @@ extern "C" {
mbufs: *const *mut marsio_buff_t,
nr_mbufs: c_int,
) -> c_int;
+ pub fn marsio_recv_all_burst(
+ mr_instance: *mut mr_instance_t,
+ qid: queue_id_t,
+ mbufs: *const *mut marsio_buff_t,
+ nr_mbufs: c_int,
+ ) -> c_int;
pub fn marsio_send_burst(
sendpath: *mut mr_sendpath_t,
qid: queue_id_t,
mbufs: *const *mut marsio_buff_t,
nr_mbufs: c_int,
) -> c_int;
- #[allow(dead_code)]
pub fn marsio_send_burst_with_options(
sendpath: *mut mr_sendpath_t,
sid: queue_id_t,
@@ -178,11 +190,4 @@ extern "C" {
buffs: *mut marsio_buff_t,
nr_buffs: c_int,
) -> c_int;
-
- pub fn marsio_buff_prepend(m: *mut marsio_buff_t, len: u16) -> *mut c_char;
- pub fn marsio_buff_append(m: *mut marsio_buff_t, len: u16) -> *mut c_char;
- #[allow(dead_code)]
- pub fn marsio_buff_adj(m: *mut marsio_buff_t, len: u16) -> *mut c_char;
- #[allow(dead_code)]
- pub fn marsio_buff_trim(m: *mut marsio_buff_t, len: u16) -> c_int;
}
diff --git a/bindings/marsio/tests/integration_test.rs b/bindings/marsio/tests/integration_test.rs
index 0a40d16..ada89f0 100644
--- a/bindings/marsio/tests/integration_test.rs
+++ b/bindings/marsio/tests/integration_test.rs
@@ -4,7 +4,8 @@ use static_init::dynamic;
use std::process::Command;
struct TestFixture {
- mr_service_child: std::process::Child,
+ _mrzcpd_child: std::process::Child,
+ _mrzcpd_cfgfile: String,
}
impl TestFixture {
@@ -15,28 +16,29 @@ impl TestFixture {
std::fs::write("/tmp/mrzcpd_startup.conf", tmp_config_template).unwrap();
/* run the program as backgroup */
- let child = Command::new("/opt/tsg/mrzcpd/bin/mrzcpd")
+ let mut child = Command::new("/opt/tsg/mrzcpd/bin/mrzcpd")
.arg("-c")
.arg("/tmp/mrzcpd_startup.conf")
.spawn()
.unwrap();
- println!("Fixture startup: config file = {}", tmp_config_template);
-
TestFixture {
- mr_service_child: child,
+ _mrzcpd_child: child,
+ _mrzcpd_cfgfile: tmp_config_template.to_string(),
}
}
}
impl Drop for TestFixture {
fn drop(&mut self) {
- self.mr_service_child.kill().unwrap();
+ self._mrzcpd_child.kill().unwrap();
}
}
+/*
#[dynamic(drop)]
static mut TEST_FIXTURE_OBJECT: TestFixture = TestFixture::new();
+*/
#[test]
fn test_marsio_instance_create_and_destroy() {
@@ -57,13 +59,11 @@ fn test_marsio_one_thread() {
mr_instance.init("integration_test").unwrap();
/* open the test device with one rx/tx queue */
- let device = mr_instance.open_device("test", 1, 1).unwrap();
- let mut device_rx_queue = device.alloc_rx_queue().unwrap();
- let mut device_tx_queue = device.alloc_tx_queue().unwrap();
+ let mut device = mr_instance.open_device("test", 1, 1).unwrap();
/* try to recv the mbuf */
let mut mbuf_array = ArrayVec::<MarsioBuff, 32>::new();
- device_rx_queue.recv_burst(&mut mbuf_array);
+ device.recv_burst(&mut mbuf_array, 0);
/* for now, should be empty */
assert_eq!(mbuf_array.len(), 0);
@@ -72,81 +72,35 @@ fn test_marsio_one_thread() {
let mut send_buf = ArrayVec::<MarsioBuff, 32>::new();
let mut send_mbuf = mr_instance.buf_alloc().unwrap();
- /* generate some random data */
- let packet_with_random_data = [0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06];
- send_mbuf.append(&packet_with_random_data).unwrap();
-
/* set the mbuf data */
send_buf.push(send_mbuf);
- device_tx_queue.send_burst(&mut send_buf);
+ device.send_burst(&mut send_buf, 0);
}
#[test]
-fn test_marsio_multiple_rx_thread() {
- let mut instance = MarsioInstance::new().unwrap();
- instance.set_cpu_affinity(&[CoreID(1), CoreID(2)]).unwrap();
- instance.init("integration_test").unwrap();
+fn test_marsio_multiple_thread() {
+ let mut mr_instance = MarsioInstance::new().unwrap();
+
+ /* Bind the thread to core 1,2 */
+ let core_ids = [CoreID(1), CoreID(2)];
+ mr_instance.set_cpu_affinity(&core_ids).unwrap();
+ mr_instance.init("integration_test").unwrap();
/* open the test device */
- let device = instance.open_device("test", 2, 2).unwrap();
- for _tid in 0..1 {
- std::thread::scope(|s| {
- let ref_instance = &instance;
- let mut rx_queue_object = device.alloc_rx_queue().unwrap();
-
- s.spawn(move || {
- /* init the thread context */
- ref_instance.thread_context_init();
-
- /* prepare the mbufs */
- let mut recv_bufs = ArrayVec::<MarsioBuff, 32>::new();
- rx_queue_object.recv_burst(&mut recv_bufs);
-
- /* for now, should be empty */
- assert_eq!(recv_bufs.len(), 0);
- });
+ let mut device = mr_instance.open_device("test", 2, 2).unwrap();
+
+ let mut thread_join_handles = Vec::new();
+ for tid in 0..1 {
+ let thread_join_handle = std::thread::spawn(|| {
+ let mut send_buf = ArrayVec::<MarsioBuff, 32>::new();
+ let mut send_mbuf = mr_instance.buf_alloc().unwrap();
+ send_buf.push(send_mbuf);
});
+ thread_join_handles.push(thread_join_handle);
}
-}
-#[test]
-fn test_marsio_multiple_tx_thread() {
- let mut instance = MarsioInstance::new().unwrap();
- instance.set_cpu_affinity(&[CoreID(1), CoreID(2)]).unwrap();
- instance.init("integration_test").unwrap();
-
- /* open the test device */
- let device = instance.open_device("test", 2, 2).unwrap();
- for _tid in 0..1 {
- std::thread::scope(|s| {
- let mut tx_queue_object = device.alloc_tx_queue().unwrap();
- let ref_instance = &instance;
-
- s.spawn(move || {
- /* init the thread_ctx */
- ref_instance.thread_context_init();
-
- /* prepare the mbufs */
- let mut send_mbuf = ref_instance.buf_alloc().unwrap();
- let mut send_buf = ArrayVec::<MarsioBuff, 32>::new();
-
- /* Try to fill the send mbuf */
- let send_mbuf_buffer = send_mbuf.buffer_mut();
- send_mbuf_buffer[0..6].copy_from_slice(&[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF]);
- send_mbuf_buffer[6..12].copy_from_slice(&[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF]);
- send_mbuf_buffer[12..14].copy_from_slice(&[0x08, 0x06]);
- send_mbuf_buffer[14..16].copy_from_slice(&[0x00, 0x01]);
- send_mbuf_buffer[16..18].copy_from_slice(&[0x08, 0x00]);
- send_mbuf_buffer[18..20].copy_from_slice(&[0x06, 0x04]);
- send_mbuf_buffer[20..22].copy_from_slice(&[0x00, 0x01]);
- send_mbuf_buffer[22..28].copy_from_slice(&[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF]);
- send_mbuf_buffer[28..32].copy_from_slice(&[0x10, 0x00, 0x00, 0x02]);
- send_mbuf_buffer[32..38].copy_from_slice(&[0x00, 0x00, 0x00, 0x00, 0x00, 0x00]);
- send_mbuf_buffer[38..42].copy_from_slice(&[0x10, 0x00, 0x00, 0x03]);
-
- send_buf.push(send_mbuf);
- tx_queue_object.send_burst(&mut send_buf);
- });
- })
+ for join_handle in thread_join_handles {
+ let join_result = join_handle.join().unwrap();
+ println!("join_handle_result = {:?}", join_result);
}
}