From 5902eb8a6f4bd9ec65da4407efaaedfdaf5493f2 Mon Sep 17 00:00:00 2001 From: "ihc童鞋@提不起劲" Date: Thu, 18 May 2023 20:49:24 +0800 Subject: fix: accept new thread pool impl (#167) --- monoio/src/blocking.rs | 14 +++++------ monoio/src/builder.rs | 66 ++++++++++++++++++++++++++------------------------ monoio/src/lib.rs | 2 +- 3 files changed, 41 insertions(+), 41 deletions(-) diff --git a/monoio/src/blocking.rs b/monoio/src/blocking.rs index 3be4716..ecfaedf 100644 --- a/monoio/src/blocking.rs +++ b/monoio/src/blocking.rs @@ -1,6 +1,6 @@ //! Blocking tasks related. -use std::{future::Future, sync::Arc, task::Poll}; +use std::{future::Future, task::Poll}; use threadpool::{Builder as ThreadPoolBuilder, ThreadPool as ThreadPoolImpl}; @@ -121,6 +121,7 @@ where /// DefaultThreadPool is a simple wrapped `threadpool::ThreadPool` that implememt /// `monoio::blocking::ThreadPool`. You may use this implementation, or you can use your own thread /// pool implementation. +#[derive(Clone)] pub struct DefaultThreadPool { pool: ThreadPoolImpl, } @@ -153,9 +154,8 @@ impl crate::task::Schedule for NoopScheduler { } } -#[derive(Clone)] pub(crate) enum BlockingHandle { - Attached(Arc), + Attached(Box), Empty(BlockingStrategy), } @@ -188,8 +188,6 @@ where #[cfg(test)] mod tests { - use std::sync::Arc; - use super::DefaultThreadPool; /// NaiveThreadPool always create a new thread on executing tasks. @@ -212,7 +210,7 @@ mod tests { #[test] fn hello_blocking() { - let shared_pool = Arc::new(NaiveThreadPool); + let shared_pool = Box::new(NaiveThreadPool); let mut rt = crate::RuntimeBuilder::::new() .attach_thread_pool(shared_pool) .enable_timer() @@ -272,7 +270,7 @@ mod tests { #[test] fn drop_task() { - let shared_pool = Arc::new(FakeThreadPool); + let shared_pool = Box::new(FakeThreadPool); let mut rt = crate::RuntimeBuilder::::new() .attach_thread_pool(shared_pool) .enable_timer() @@ -286,7 +284,7 @@ mod tests { #[test] fn default_pool() { - let shared_pool = Arc::new(DefaultThreadPool::new(3)); + let shared_pool = Box::new(DefaultThreadPool::new(3)); let mut rt = crate::RuntimeBuilder::::new() .attach_thread_pool(shared_pool) .enable_timer() diff --git a/monoio/src/builder.rs b/monoio/src/builder.rs index e123fd1..0ca43f1 100644 --- a/monoio/src/builder.rs +++ b/monoio/src/builder.rs @@ -60,7 +60,7 @@ impl RuntimeBuilder { /// Buildable trait. pub trait Buildable: Sized { /// Build the runtime. - fn build(this: &RuntimeBuilder) -> io::Result>; + fn build(this: RuntimeBuilder) -> io::Result>; } #[allow(unused)] @@ -68,7 +68,7 @@ macro_rules! direct_build { ($ty: ty) => { impl RuntimeBuilder<$ty> { /// Build the runtime. - pub fn build(&self) -> io::Result> { + pub fn build(self) -> io::Result> { Buildable::build(self) } } @@ -88,10 +88,10 @@ direct_build!(TimeDriver); #[cfg(all(unix, feature = "legacy"))] impl Buildable for LegacyDriver { - fn build(this: &RuntimeBuilder) -> io::Result> { + fn build(this: RuntimeBuilder) -> io::Result> { let thread_id = gen_id(); #[cfg(feature = "sync")] - let blocking_handle = this.blocking_handle.clone(); + let blocking_handle = this.blocking_handle; BUILD_THREAD_ID.set(&thread_id, || { let driver = match this.entries { @@ -109,10 +109,10 @@ impl Buildable for LegacyDriver { #[cfg(all(target_os = "linux", feature = "iouring"))] impl Buildable for IoUringDriver { - fn build(this: &RuntimeBuilder) -> io::Result> { + fn build(this: RuntimeBuilder) -> io::Result> { let thread_id = gen_id(); #[cfg(feature = "sync")] - let blocking_handle = this.blocking_handle.clone(); + let blocking_handle = this.blocking_handle; BUILD_THREAD_ID.set(&thread_id, || { let driver = match this.entries { @@ -150,8 +150,8 @@ impl RuntimeBuilder { #[cfg(all(target_os = "linux", feature = "iouring"))] #[must_use] - pub fn uring_builder(mut self, urb: &io_uring::Builder) -> Self { - self.urb = urb.clone(); + pub fn uring_builder(mut self, urb: io_uring::Builder) -> Self { + self.urb = urb; self } } @@ -166,13 +166,13 @@ pub struct FusionDriver; impl RuntimeBuilder { /// Build the runtime. #[cfg(all(target_os = "linux", feature = "iouring", feature = "legacy"))] - pub fn build(&self) -> io::Result> { + pub fn build(self) -> io::Result> { if crate::utils::detect_uring() { let builder = RuntimeBuilder:: { entries: self.entries, - urb: self.urb.clone(), + urb: self.urb, #[cfg(feature = "sync")] - blocking_handle: self.blocking_handle.clone(), + blocking_handle: self.blocking_handle, _mark: PhantomData, }; info!("io_uring driver built"); @@ -180,9 +180,9 @@ impl RuntimeBuilder { } else { let builder = RuntimeBuilder:: { entries: self.entries, - urb: self.urb.clone(), + urb: self.urb, #[cfg(feature = "sync")] - blocking_handle: self.blocking_handle.clone(), + blocking_handle: self.blocking_handle, _mark: PhantomData, }; info!("legacy driver built"); @@ -192,11 +192,11 @@ impl RuntimeBuilder { /// Build the runtime. #[cfg(all(unix, not(all(target_os = "linux", feature = "iouring"))))] - pub fn build(&self) -> io::Result> { + pub fn build(self) -> io::Result> { let builder = RuntimeBuilder:: { entries: self.entries, #[cfg(feature = "sync")] - blocking_handle: self.blocking_handle.clone(), + blocking_handle: self.blocking_handle, _mark: PhantomData, }; Ok(builder.build()?.into()) @@ -204,12 +204,12 @@ impl RuntimeBuilder { /// Build the runtime. #[cfg(all(target_os = "linux", feature = "iouring", not(feature = "legacy")))] - pub fn build(&self) -> io::Result> { + pub fn build(self) -> io::Result> { let builder = RuntimeBuilder:: { entries: self.entries, - urb: self.urb.clone(), + urb: self.urb, #[cfg(feature = "sync")] - blocking_handle: self.blocking_handle.clone(), + blocking_handle: self.blocking_handle, _mark: PhantomData, }; Ok(builder.build()?.into()) @@ -221,14 +221,14 @@ impl RuntimeBuilder> { /// Build the runtime. #[cfg(all(target_os = "linux", feature = "iouring", feature = "legacy"))] pub fn build( - &self, + self, ) -> io::Result, TimeDriver>> { if crate::utils::detect_uring() { let builder = RuntimeBuilder::> { entries: self.entries, - urb: self.urb.clone(), + urb: self.urb, #[cfg(feature = "sync")] - blocking_handle: self.blocking_handle.clone(), + blocking_handle: self.blocking_handle, _mark: PhantomData, }; info!("io_uring driver with timer built"); @@ -236,9 +236,9 @@ impl RuntimeBuilder> { } else { let builder = RuntimeBuilder::> { entries: self.entries, - urb: self.urb.clone(), + urb: self.urb, #[cfg(feature = "sync")] - blocking_handle: self.blocking_handle.clone(), + blocking_handle: self.blocking_handle, _mark: PhantomData, }; info!("legacy driver with timer built"); @@ -248,11 +248,11 @@ impl RuntimeBuilder> { /// Build the runtime. #[cfg(all(unix, not(all(target_os = "linux", feature = "iouring"))))] - pub fn build(&self) -> io::Result>> { + pub fn build(self) -> io::Result>> { let builder = RuntimeBuilder::> { entries: self.entries, #[cfg(feature = "sync")] - blocking_handle: self.blocking_handle.clone(), + blocking_handle: self.blocking_handle, _mark: PhantomData, }; Ok(builder.build()?.into()) @@ -263,9 +263,9 @@ impl RuntimeBuilder> { pub fn build(&self) -> io::Result>> { let builder = RuntimeBuilder::> { entries: self.entries, - urb: self.urb.clone(), + urb: self.urb, #[cfg(feature = "sync")] - blocking_handle: self.blocking_handle.clone(), + blocking_handle: self.blocking_handle, _mark: PhantomData, }; Ok(builder.build()?.into()) @@ -289,16 +289,16 @@ where D: Buildable, { /// Build the runtime - fn build(this: &RuntimeBuilder) -> io::Result>> { + fn build(this: RuntimeBuilder) -> io::Result>> { let Runtime { driver, mut context, - } = Buildable::build(&RuntimeBuilder:: { + } = Buildable::build(RuntimeBuilder:: { entries: this.entries, #[cfg(all(target_os = "linux", feature = "iouring"))] - urb: this.urb.clone(), + urb: this.urb, #[cfg(feature = "sync")] - blocking_handle: this.blocking_handle.clone(), + blocking_handle: this.blocking_handle, _mark: PhantomData, })?; @@ -338,14 +338,16 @@ impl RuntimeBuilder { _mark: PhantomData, } } +} +impl RuntimeBuilder { /// Attach thread pool, this will overwrite blocking strategy. /// All `spawn_blocking` will be executed on given thread pool. #[cfg(feature = "sync")] #[must_use] pub fn attach_thread_pool( mut self, - tp: std::sync::Arc, + tp: Box, ) -> Self { self.blocking_handle = crate::blocking::BlockingHandle::Attached(tp); self diff --git a/monoio/src/lib.rs b/monoio/src/lib.rs index b41f1ef..f116421 100644 --- a/monoio/src/lib.rs +++ b/monoio/src/lib.rs @@ -97,7 +97,7 @@ where F::Output: 'static, D: Buildable + Driver, { - let mut rt = builder::Buildable::build(&builder::RuntimeBuilder::::new()) + let mut rt = builder::Buildable::build(builder::RuntimeBuilder::::new()) .expect("Unable to build runtime."); rt.block_on(future) } -- cgit v1.2.3