diff options
| author | zy <[email protected]> | 2023-09-27 05:27:22 +0000 |
|---|---|---|
| committer | zy <[email protected]> | 2023-09-27 05:27:22 +0000 |
| commit | 5be5a860802db12721e49f4d433c9fb123ef6b6e (patch) | |
| tree | f38b0cb9d9b119d1b830997beb6d93b514e02ab8 | |
| parent | 8948a7cf79b11a1013385e19d0eec4998e365fe9 (diff) | |
Revert "move the recv_burst, send_burst API from mr_instance structure to queue related structure."
This reverts commit cec8c8740585e92ddd226ec753bbdd953a02b0ac.
| -rw-r--r-- | .gitignore | 4 | ||||
| -rw-r--r-- | bindings/marsio/src/lib.rs | 145 | ||||
| -rw-r--r-- | bindings/marsio/src/sys.rs | 45 | ||||
| -rw-r--r-- | bindings/marsio/tests/integration_test.rs | 106 |
4 files changed, 70 insertions, 230 deletions
@@ -1,2 +1,2 @@ -Cargo.lock -target
\ No newline at end of file +/target +/Cargo.lock diff --git a/bindings/marsio/src/lib.rs b/bindings/marsio/src/lib.rs index c695a8c..c4de824 100644 --- a/bindings/marsio/src/lib.rs +++ b/bindings/marsio/src/lib.rs @@ -1,4 +1,3 @@ -use crate::sys::{mr_sendpath_t, mr_vdev_t}; use anyhow::Result; use anyhow::{anyhow, Context}; use arrayvec::ArrayVec; @@ -6,8 +5,6 @@ use libc::cpu_set_t; use libc::{c_char, c_int}; use std::ptr::null_mut; use std::ptr::NonNull; -use std::sync::Arc; -use std::sync::Mutex; mod sys; @@ -26,7 +23,7 @@ pub struct MarsioBuff<'instance> { pub struct CoreID(pub usize); impl<'instance> MarsioBuff<'instance> { - pub fn buffer(&self) -> &[u8] { + fn buffer(&self) -> &[u8] { unsafe { let buff_ptr = sys::marsio_buff_mtod(self.marsio_buff.as_ptr()); let buff_len = sys::marsio_buff_buflen(self.marsio_buff.as_ptr()) as usize; @@ -34,39 +31,13 @@ impl<'instance> MarsioBuff<'instance> { } } - pub fn buffer_mut(&mut self) -> &mut [u8] { + fn buffer_mut(&self) -> &mut [u8] { unsafe { let buff_ptr = sys::marsio_buff_mtod(self.marsio_buff.as_ptr()); let buff_len = sys::marsio_buff_buflen(self.marsio_buff.as_ptr()) as usize; return std::slice::from_raw_parts_mut(buff_ptr as *mut u8, buff_len); } } - - pub fn append(&mut self, data: &[u8]) -> Result<()> { - let buff_ptr = NonNull::new(unsafe { - sys::marsio_buff_append(self.marsio_buff.as_ptr(), data.len() as u16) - }) - .context(format!("Failed to append mbuf {:?}", self.marsio_buff))?; - - unsafe { - std::ptr::copy_nonoverlapping(data.as_ptr(), buff_ptr.as_ptr() as *mut u8, data.len()); - } - - Ok(()) - } - - pub fn prepend(&mut self, data: &[u8]) -> Result<()> { - let buff_ptr = NonNull::new(unsafe { - sys::marsio_buff_prepend(self.marsio_buff.as_ptr(), data.len() as u16) - }) - .context(format!("Failed to prepend mbuf {:?}", self.marsio_buff))?; - - unsafe { - std::ptr::copy_nonoverlapping(data.as_ptr(), buff_ptr.as_ptr() as *mut u8, data.len()); - } - - Ok(()) - } } impl<'instance> Drop for MarsioBuff<'instance> { @@ -83,92 +54,18 @@ impl<'instance> Drop for MarsioBuff<'instance> { } } -struct MarsioDeviceQueueCounter { - rx_queue_max: u32, - rx_queue_use: u32, - tx_queue_max: u32, - tx_queue_use: u32, -} - pub struct MarsioDevice { - /* raw ptr handles */ device_handle: NonNull<sys::mr_vdev_t>, - send_path_handle: NonNull<sys::mr_sendpath_t>, - - /* counter */ - queue_counters: Arc<Mutex<MarsioDeviceQueueCounter>>, + sendpath_handle: NonNull<sys::mr_sendpath_t>, } -unsafe impl Sync for MarsioDevice {} -unsafe impl Send for MarsioDevice {} - impl MarsioDevice { - unsafe fn device_handle_raw_ptr_get(&self) -> *mut mr_vdev_t { - self.device_handle.as_ptr() - } - - unsafe fn send_path_handle_raw_ptr_get(&self) -> *mut mr_sendpath_t { - self.send_path_handle.as_ptr() - } - - pub fn alloc_rx_queue(&self) -> Result<MarsioDeviceRxQueue> { - let mut q_counter = self.queue_counters.lock().unwrap(); - if q_counter.rx_queue_use >= q_counter.rx_queue_max { - return Err(anyhow!( - "Failed to alloc rx queue, rx_queue_use: {}, rx_queue_max: {}", - q_counter.rx_queue_use, - q_counter.rx_queue_max - )); - } - - let rx_queue_id = q_counter.rx_queue_use; - q_counter.rx_queue_use += 1; - - Ok(MarsioDeviceRxQueue { - device: self, - qid: rx_queue_id, - }) - } - - pub fn alloc_tx_queue(&self) -> Result<MarsioDeviceTxQueue> { - let mut q_counter = self.queue_counters.lock().unwrap(); - - if q_counter.rx_queue_use >= q_counter.tx_queue_max { - return Err(anyhow!( - "Failed to alloc tx queue, tx_queue_use: {}, tx_queue_max: {}", - q_counter.tx_queue_use, - q_counter.tx_queue_max - )); - }; - - let tx_queue_id = q_counter.tx_queue_use; - q_counter.tx_queue_use += 1; - - Ok(MarsioDeviceTxQueue { - device: self, - qid: tx_queue_id, - }) - } -} - -pub struct MarsioDeviceRxQueue<'device> { - device: &'device MarsioDevice, - qid: u32, -} - -pub struct MarsioDeviceTxQueue<'device> { - device: &'device MarsioDevice, - qid: u32, -} - -impl<'device> MarsioDeviceRxQueue<'device> { - pub fn recv_burst(&mut self, recv_buf: &mut RecvBuf) { + pub fn recv_burst(&self, recv_buf: &mut RecvBuf, qid: u32) { let mut recv_buf_raw_ptrs = ArrayVec::<_, DEFAULT_RX_BURST>::new(); unsafe { - let device_handle_ptr = self.device.device_handle_raw_ptr_get(); let recv_count = sys::marsio_recv_burst( - device_handle_ptr, - self.qid, + self.device_handle.as_ptr(), + qid, recv_buf_raw_ptrs.as_ptr(), recv_buf_raw_ptrs.capacity() as c_int, ); @@ -188,19 +85,17 @@ impl<'device> MarsioDeviceRxQueue<'device> { *recv_buf = recv_buf_new; } -} -impl<'device> MarsioDeviceTxQueue<'device> { - pub fn send_burst(&mut self, send_buf: &mut SendBuf) { + + pub fn send_burst(&self, send_buf: &mut SendBuf, qid: u32) { let send_buf_raw_ptrs: ArrayVec<_, DEFAULT_TX_BURST> = send_buf .iter() .map(|marsio_buff| marsio_buff.marsio_buff.as_ptr()) .collect(); unsafe { - let send_path_raw_ptr = self.device.send_path_handle_raw_ptr_get(); sys::marsio_send_burst( - send_path_raw_ptr, - self.qid, + self.sendpath_handle.as_ptr(), + qid, send_buf_raw_ptrs.as_ptr(), send_buf_raw_ptrs.len() as c_int, ) @@ -215,9 +110,6 @@ pub struct MarsioInstance { mr_instance: NonNull<sys::mr_instance_t>, } -unsafe impl Send for MarsioInstance {} -unsafe impl Sync for MarsioInstance {} - impl MarsioInstance { pub fn new() -> Result<MarsioInstance> { let mr_instance_ptr = NonNull::new(unsafe { sys::marsio_create() }) @@ -322,18 +214,11 @@ impl MarsioInstance { Ok(MarsioDevice { device_handle: _device_handle, - send_path_handle: _send_path_handle, - - queue_counters: Arc::new(Mutex::new(MarsioDeviceQueueCounter { - rx_queue_max: nr_rx_queues as u32, - rx_queue_use: 0, - tx_queue_max: nr_tx_queues as u32, - tx_queue_use: 0, - })), + sendpath_handle: _send_path_handle, }) } - pub fn buf_alloc(&self) -> Option<MarsioBuff> { + pub fn buf_alloc(&mut self) -> Option<MarsioBuff> { let mut out_buf = [std::ptr::null_mut(); 1]; unsafe { sys::marsio_buff_alloc_v2( @@ -349,16 +234,12 @@ impl MarsioInstance { _marker: std::marker::PhantomData {}, }) } - - pub fn thread_context_init(&self) { - unsafe { sys::marsio_thread_init(self.mr_instance.as_ptr()) }; - } } impl Drop for MarsioDevice { fn drop(&mut self) { unsafe { - sys::marsio_sendpath_destory(self.send_path_handle.as_ptr()); + sys::marsio_sendpath_destory(self.sendpath_handle.as_ptr()); sys::marsio_close_device(self.device_handle.as_ptr()); }; } diff --git a/bindings/marsio/src/sys.rs b/bindings/marsio/src/sys.rs index d2abf09..849b959 100644 --- a/bindings/marsio/src/sys.rs +++ b/bindings/marsio/src/sys.rs @@ -1,6 +1,6 @@ use libc::{c_char, c_int, c_void, size_t};
-#[allow(non_camel_case_types, dead_code)]
+#[allow(non_camel_case_types)]
#[repr(C)]
pub enum marsio_opt_type_t {
MARSIO_OPT_THREAD_NUM,
@@ -9,7 +9,7 @@ pub enum marsio_opt_type_t { MARSIO_OPT_THREAD_MASK_IN_CPUSET,
}
-#[allow(non_camel_case_types, dead_code)]
+#[allow(non_camel_case_types)]
#[repr(C)]
pub enum marsio_opt_send_t {
MARSIO_SEND_OPT_NO_FREE = 1 << 0,
@@ -23,7 +23,7 @@ pub enum marsio_opt_send_t { MARSIO_SEND_OPT_CTRL = 1 << 4,
}
-#[allow(non_camel_case_types, dead_code)]
+#[allow(non_camel_case_types)]
#[repr(C)]
enum mr_sendpath_type {
/* 去往特定虚设备的发包路径 */
@@ -36,7 +36,7 @@ enum mr_sendpath_type { MR_SENDPATH_MAX,
}
-#[allow(non_camel_case_types, dead_code)]
+#[allow(non_camel_case_types)]
#[repr(C)]
enum mr_sendpath_option {
/* 构建四层报文头 */
@@ -51,7 +51,7 @@ enum mr_sendpath_option { MR_SENDPATH_OPT_HOOK_POSTBUILD = 51,
}
-#[allow(non_camel_case_types, dead_code)]
+#[allow(non_camel_case_types)]
#[repr(C)]
enum mr_clone_options {
/* 拷贝区域 */
@@ -60,7 +60,7 @@ enum mr_clone_options { MR_BUFF_CLONE_CTRLZONE = 1 << 2,
}
-#[allow(non_camel_case_types, dead_code)]
+#[allow(non_camel_case_types)]
#[repr(C)]
enum mr_thread_affinity_mode {
/* 禁用线程亲和性设置 */
@@ -71,7 +71,14 @@ enum mr_thread_affinity_mode { MR_THREAD_AFFINITY_USER = 255,
}
-#[allow(non_camel_case_types, dead_code)]
+#[allow(non_camel_case_types)]
+#[repr(C)]
+enum mr_timestamp_type {
+ /* 从网卡收取时或报文缓冲区申请时的时间戳 */
+ MR_TIMESTAMP_RX_OR_ALLOC = 0,
+}
+
+#[allow(non_camel_case_types)]
#[repr(C)]
enum mr_buff_metadata_type {
/* Rehash Index */
@@ -99,17 +106,18 @@ pub type mr_vdev_t = c_void; #[allow(non_camel_case_types)]
pub type mr_sendpath_t = c_void;
-#[allow(non_camel_case_types, dead_code)]
+#[allow(non_camel_case_types)]
+pub type cpu_mask_t = u64;
+#[allow(non_camel_case_types)]
pub type port_id_t = u32;
-#[allow(non_camel_case_types, dead_code)]
+#[allow(non_camel_case_types)]
pub type queue_id_t = u32;
-#[allow(non_camel_case_types, dead_code)]
+#[allow(non_camel_case_types)]
pub type thread_id_t = u32;
#[link(name = "marsio")]
extern "C" {
/* options and init */
- #[allow(dead_code)]
pub fn marsio_option_get(
mr_instance: *mut mr_instance_t,
opt_type: c_int,
@@ -135,7 +143,6 @@ extern "C" { nr_txstream: c_int,
) -> *mut mr_vdev_t;
pub fn marsio_close_device(vdev: *mut mr_vdev_t);
- #[allow(dead_code)]
pub fn marsio_device_lookup(
mr_instance: *const mr_instance_t,
devsym: *const c_char,
@@ -152,13 +159,18 @@ extern "C" { mbufs: *const *mut marsio_buff_t,
nr_mbufs: c_int,
) -> c_int;
+ pub fn marsio_recv_all_burst(
+ mr_instance: *mut mr_instance_t,
+ qid: queue_id_t,
+ mbufs: *const *mut marsio_buff_t,
+ nr_mbufs: c_int,
+ ) -> c_int;
pub fn marsio_send_burst(
sendpath: *mut mr_sendpath_t,
qid: queue_id_t,
mbufs: *const *mut marsio_buff_t,
nr_mbufs: c_int,
) -> c_int;
- #[allow(dead_code)]
pub fn marsio_send_burst_with_options(
sendpath: *mut mr_sendpath_t,
sid: queue_id_t,
@@ -178,11 +190,4 @@ extern "C" { buffs: *mut marsio_buff_t,
nr_buffs: c_int,
) -> c_int;
-
- pub fn marsio_buff_prepend(m: *mut marsio_buff_t, len: u16) -> *mut c_char;
- pub fn marsio_buff_append(m: *mut marsio_buff_t, len: u16) -> *mut c_char;
- #[allow(dead_code)]
- pub fn marsio_buff_adj(m: *mut marsio_buff_t, len: u16) -> *mut c_char;
- #[allow(dead_code)]
- pub fn marsio_buff_trim(m: *mut marsio_buff_t, len: u16) -> c_int;
}
diff --git a/bindings/marsio/tests/integration_test.rs b/bindings/marsio/tests/integration_test.rs index 0a40d16..ada89f0 100644 --- a/bindings/marsio/tests/integration_test.rs +++ b/bindings/marsio/tests/integration_test.rs @@ -4,7 +4,8 @@ use static_init::dynamic; use std::process::Command;
struct TestFixture {
- mr_service_child: std::process::Child,
+ _mrzcpd_child: std::process::Child,
+ _mrzcpd_cfgfile: String,
}
impl TestFixture {
@@ -15,28 +16,29 @@ impl TestFixture { std::fs::write("/tmp/mrzcpd_startup.conf", tmp_config_template).unwrap();
/* run the program as backgroup */
- let child = Command::new("/opt/tsg/mrzcpd/bin/mrzcpd")
+ let mut child = Command::new("/opt/tsg/mrzcpd/bin/mrzcpd")
.arg("-c")
.arg("/tmp/mrzcpd_startup.conf")
.spawn()
.unwrap();
- println!("Fixture startup: config file = {}", tmp_config_template);
-
TestFixture {
- mr_service_child: child,
+ _mrzcpd_child: child,
+ _mrzcpd_cfgfile: tmp_config_template.to_string(),
}
}
}
impl Drop for TestFixture {
fn drop(&mut self) {
- self.mr_service_child.kill().unwrap();
+ self._mrzcpd_child.kill().unwrap();
}
}
+/*
#[dynamic(drop)]
static mut TEST_FIXTURE_OBJECT: TestFixture = TestFixture::new();
+*/
#[test]
fn test_marsio_instance_create_and_destroy() {
@@ -57,13 +59,11 @@ fn test_marsio_one_thread() { mr_instance.init("integration_test").unwrap();
/* open the test device with one rx/tx queue */
- let device = mr_instance.open_device("test", 1, 1).unwrap();
- let mut device_rx_queue = device.alloc_rx_queue().unwrap();
- let mut device_tx_queue = device.alloc_tx_queue().unwrap();
+ let mut device = mr_instance.open_device("test", 1, 1).unwrap();
/* try to recv the mbuf */
let mut mbuf_array = ArrayVec::<MarsioBuff, 32>::new();
- device_rx_queue.recv_burst(&mut mbuf_array);
+ device.recv_burst(&mut mbuf_array, 0);
/* for now, should be empty */
assert_eq!(mbuf_array.len(), 0);
@@ -72,81 +72,35 @@ fn test_marsio_one_thread() { let mut send_buf = ArrayVec::<MarsioBuff, 32>::new();
let mut send_mbuf = mr_instance.buf_alloc().unwrap();
- /* generate some random data */
- let packet_with_random_data = [0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06];
- send_mbuf.append(&packet_with_random_data).unwrap();
-
/* set the mbuf data */
send_buf.push(send_mbuf);
- device_tx_queue.send_burst(&mut send_buf);
+ device.send_burst(&mut send_buf, 0);
}
#[test]
-fn test_marsio_multiple_rx_thread() {
- let mut instance = MarsioInstance::new().unwrap();
- instance.set_cpu_affinity(&[CoreID(1), CoreID(2)]).unwrap();
- instance.init("integration_test").unwrap();
+fn test_marsio_multiple_thread() {
+ let mut mr_instance = MarsioInstance::new().unwrap();
+
+ /* Bind the thread to core 1,2 */
+ let core_ids = [CoreID(1), CoreID(2)];
+ mr_instance.set_cpu_affinity(&core_ids).unwrap();
+ mr_instance.init("integration_test").unwrap();
/* open the test device */
- let device = instance.open_device("test", 2, 2).unwrap();
- for _tid in 0..1 {
- std::thread::scope(|s| {
- let ref_instance = &instance;
- let mut rx_queue_object = device.alloc_rx_queue().unwrap();
-
- s.spawn(move || {
- /* init the thread context */
- ref_instance.thread_context_init();
-
- /* prepare the mbufs */
- let mut recv_bufs = ArrayVec::<MarsioBuff, 32>::new();
- rx_queue_object.recv_burst(&mut recv_bufs);
-
- /* for now, should be empty */
- assert_eq!(recv_bufs.len(), 0);
- });
+ let mut device = mr_instance.open_device("test", 2, 2).unwrap();
+
+ let mut thread_join_handles = Vec::new();
+ for tid in 0..1 {
+ let thread_join_handle = std::thread::spawn(|| {
+ let mut send_buf = ArrayVec::<MarsioBuff, 32>::new();
+ let mut send_mbuf = mr_instance.buf_alloc().unwrap();
+ send_buf.push(send_mbuf);
});
+ thread_join_handles.push(thread_join_handle);
}
-}
-#[test]
-fn test_marsio_multiple_tx_thread() {
- let mut instance = MarsioInstance::new().unwrap();
- instance.set_cpu_affinity(&[CoreID(1), CoreID(2)]).unwrap();
- instance.init("integration_test").unwrap();
-
- /* open the test device */
- let device = instance.open_device("test", 2, 2).unwrap();
- for _tid in 0..1 {
- std::thread::scope(|s| {
- let mut tx_queue_object = device.alloc_tx_queue().unwrap();
- let ref_instance = &instance;
-
- s.spawn(move || {
- /* init the thread_ctx */
- ref_instance.thread_context_init();
-
- /* prepare the mbufs */
- let mut send_mbuf = ref_instance.buf_alloc().unwrap();
- let mut send_buf = ArrayVec::<MarsioBuff, 32>::new();
-
- /* Try to fill the send mbuf */
- let send_mbuf_buffer = send_mbuf.buffer_mut();
- send_mbuf_buffer[0..6].copy_from_slice(&[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF]);
- send_mbuf_buffer[6..12].copy_from_slice(&[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF]);
- send_mbuf_buffer[12..14].copy_from_slice(&[0x08, 0x06]);
- send_mbuf_buffer[14..16].copy_from_slice(&[0x00, 0x01]);
- send_mbuf_buffer[16..18].copy_from_slice(&[0x08, 0x00]);
- send_mbuf_buffer[18..20].copy_from_slice(&[0x06, 0x04]);
- send_mbuf_buffer[20..22].copy_from_slice(&[0x00, 0x01]);
- send_mbuf_buffer[22..28].copy_from_slice(&[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF]);
- send_mbuf_buffer[28..32].copy_from_slice(&[0x10, 0x00, 0x00, 0x02]);
- send_mbuf_buffer[32..38].copy_from_slice(&[0x00, 0x00, 0x00, 0x00, 0x00, 0x00]);
- send_mbuf_buffer[38..42].copy_from_slice(&[0x10, 0x00, 0x00, 0x03]);
-
- send_buf.push(send_mbuf);
- tx_queue_object.send_burst(&mut send_buf);
- });
- })
+ for join_handle in thread_join_handles {
+ let join_result = join_handle.join().unwrap();
+ println!("join_handle_result = {:?}", join_result);
}
}
|
