diff options
| author | ihc童鞋@提不起劲 <[email protected]> | 2023-05-18 20:49:24 +0800 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-18 20:49:24 +0800 |
| commit | 5902eb8a6f4bd9ec65da4407efaaedfdaf5493f2 (patch) | |
| tree | 93e86ee8f1fa5df3595d00750ba45e52c51cd8d8 /monoio | |
| parent | d0ffe8bfbffe075f3f104be383fe67efe38e1bb3 (diff) | |
fix: accept new thread pool impl (#167)
Diffstat (limited to 'monoio')
| -rw-r--r-- | monoio/src/blocking.rs | 14 | ||||
| -rw-r--r-- | monoio/src/builder.rs | 66 | ||||
| -rw-r--r-- | monoio/src/lib.rs | 2 |
3 files changed, 41 insertions, 41 deletions
diff --git a/monoio/src/blocking.rs b/monoio/src/blocking.rs index 3be4716..ecfaedf 100644 --- a/monoio/src/blocking.rs +++ b/monoio/src/blocking.rs @@ -1,6 +1,6 @@ //! Blocking tasks related. -use std::{future::Future, sync::Arc, task::Poll}; +use std::{future::Future, task::Poll}; use threadpool::{Builder as ThreadPoolBuilder, ThreadPool as ThreadPoolImpl}; @@ -121,6 +121,7 @@ where /// DefaultThreadPool is a simple wrapped `threadpool::ThreadPool` that implememt /// `monoio::blocking::ThreadPool`. You may use this implementation, or you can use your own thread /// pool implementation. +#[derive(Clone)] pub struct DefaultThreadPool { pool: ThreadPoolImpl, } @@ -153,9 +154,8 @@ impl crate::task::Schedule for NoopScheduler { } } -#[derive(Clone)] pub(crate) enum BlockingHandle { - Attached(Arc<dyn ThreadPool>), + Attached(Box<dyn crate::blocking::ThreadPool + Send + 'static>), Empty(BlockingStrategy), } @@ -188,8 +188,6 @@ where #[cfg(test)] mod tests { - use std::sync::Arc; - use super::DefaultThreadPool; /// NaiveThreadPool always create a new thread on executing tasks. @@ -212,7 +210,7 @@ mod tests { #[test] fn hello_blocking() { - let shared_pool = Arc::new(NaiveThreadPool); + let shared_pool = Box::new(NaiveThreadPool); let mut rt = crate::RuntimeBuilder::<crate::FusionDriver>::new() .attach_thread_pool(shared_pool) .enable_timer() @@ -272,7 +270,7 @@ mod tests { #[test] fn drop_task() { - let shared_pool = Arc::new(FakeThreadPool); + let shared_pool = Box::new(FakeThreadPool); let mut rt = crate::RuntimeBuilder::<crate::FusionDriver>::new() .attach_thread_pool(shared_pool) .enable_timer() @@ -286,7 +284,7 @@ mod tests { #[test] fn default_pool() { - let shared_pool = Arc::new(DefaultThreadPool::new(3)); + let shared_pool = Box::new(DefaultThreadPool::new(3)); let mut rt = crate::RuntimeBuilder::<crate::FusionDriver>::new() .attach_thread_pool(shared_pool) .enable_timer() diff --git a/monoio/src/builder.rs b/monoio/src/builder.rs index e123fd1..0ca43f1 100644 --- a/monoio/src/builder.rs +++ b/monoio/src/builder.rs @@ -60,7 +60,7 @@ impl<T> RuntimeBuilder<T> { /// Buildable trait. pub trait Buildable: Sized { /// Build the runtime. - fn build(this: &RuntimeBuilder<Self>) -> io::Result<Runtime<Self>>; + fn build(this: RuntimeBuilder<Self>) -> io::Result<Runtime<Self>>; } #[allow(unused)] @@ -68,7 +68,7 @@ macro_rules! direct_build { ($ty: ty) => { impl RuntimeBuilder<$ty> { /// Build the runtime. - pub fn build(&self) -> io::Result<Runtime<$ty>> { + pub fn build(self) -> io::Result<Runtime<$ty>> { Buildable::build(self) } } @@ -88,10 +88,10 @@ direct_build!(TimeDriver<LegacyDriver>); #[cfg(all(unix, feature = "legacy"))] impl Buildable for LegacyDriver { - fn build(this: &RuntimeBuilder<Self>) -> io::Result<Runtime<LegacyDriver>> { + fn build(this: RuntimeBuilder<Self>) -> io::Result<Runtime<LegacyDriver>> { let thread_id = gen_id(); #[cfg(feature = "sync")] - let blocking_handle = this.blocking_handle.clone(); + let blocking_handle = this.blocking_handle; BUILD_THREAD_ID.set(&thread_id, || { let driver = match this.entries { @@ -109,10 +109,10 @@ impl Buildable for LegacyDriver { #[cfg(all(target_os = "linux", feature = "iouring"))] impl Buildable for IoUringDriver { - fn build(this: &RuntimeBuilder<Self>) -> io::Result<Runtime<IoUringDriver>> { + fn build(this: RuntimeBuilder<Self>) -> io::Result<Runtime<IoUringDriver>> { let thread_id = gen_id(); #[cfg(feature = "sync")] - let blocking_handle = this.blocking_handle.clone(); + let blocking_handle = this.blocking_handle; BUILD_THREAD_ID.set(&thread_id, || { let driver = match this.entries { @@ -150,8 +150,8 @@ impl<D> RuntimeBuilder<D> { #[cfg(all(target_os = "linux", feature = "iouring"))] #[must_use] - pub fn uring_builder(mut self, urb: &io_uring::Builder) -> Self { - self.urb = urb.clone(); + pub fn uring_builder(mut self, urb: io_uring::Builder) -> Self { + self.urb = urb; self } } @@ -166,13 +166,13 @@ pub struct FusionDriver; impl RuntimeBuilder<FusionDriver> { /// Build the runtime. #[cfg(all(target_os = "linux", feature = "iouring", feature = "legacy"))] - pub fn build(&self) -> io::Result<crate::FusionRuntime<IoUringDriver, LegacyDriver>> { + pub fn build(self) -> io::Result<crate::FusionRuntime<IoUringDriver, LegacyDriver>> { if crate::utils::detect_uring() { let builder = RuntimeBuilder::<IoUringDriver> { entries: self.entries, - urb: self.urb.clone(), + urb: self.urb, #[cfg(feature = "sync")] - blocking_handle: self.blocking_handle.clone(), + blocking_handle: self.blocking_handle, _mark: PhantomData, }; info!("io_uring driver built"); @@ -180,9 +180,9 @@ impl RuntimeBuilder<FusionDriver> { } else { let builder = RuntimeBuilder::<LegacyDriver> { entries: self.entries, - urb: self.urb.clone(), + urb: self.urb, #[cfg(feature = "sync")] - blocking_handle: self.blocking_handle.clone(), + blocking_handle: self.blocking_handle, _mark: PhantomData, }; info!("legacy driver built"); @@ -192,11 +192,11 @@ impl RuntimeBuilder<FusionDriver> { /// Build the runtime. #[cfg(all(unix, not(all(target_os = "linux", feature = "iouring"))))] - pub fn build(&self) -> io::Result<crate::FusionRuntime<LegacyDriver>> { + pub fn build(self) -> io::Result<crate::FusionRuntime<LegacyDriver>> { let builder = RuntimeBuilder::<LegacyDriver> { entries: self.entries, #[cfg(feature = "sync")] - blocking_handle: self.blocking_handle.clone(), + blocking_handle: self.blocking_handle, _mark: PhantomData, }; Ok(builder.build()?.into()) @@ -204,12 +204,12 @@ impl RuntimeBuilder<FusionDriver> { /// Build the runtime. #[cfg(all(target_os = "linux", feature = "iouring", not(feature = "legacy")))] - pub fn build(&self) -> io::Result<crate::FusionRuntime<IoUringDriver>> { + pub fn build(self) -> io::Result<crate::FusionRuntime<IoUringDriver>> { let builder = RuntimeBuilder::<IoUringDriver> { entries: self.entries, - urb: self.urb.clone(), + urb: self.urb, #[cfg(feature = "sync")] - blocking_handle: self.blocking_handle.clone(), + blocking_handle: self.blocking_handle, _mark: PhantomData, }; Ok(builder.build()?.into()) @@ -221,14 +221,14 @@ impl RuntimeBuilder<TimeDriver<FusionDriver>> { /// Build the runtime. #[cfg(all(target_os = "linux", feature = "iouring", feature = "legacy"))] pub fn build( - &self, + self, ) -> io::Result<crate::FusionRuntime<TimeDriver<IoUringDriver>, TimeDriver<LegacyDriver>>> { if crate::utils::detect_uring() { let builder = RuntimeBuilder::<TimeDriver<IoUringDriver>> { entries: self.entries, - urb: self.urb.clone(), + urb: self.urb, #[cfg(feature = "sync")] - blocking_handle: self.blocking_handle.clone(), + blocking_handle: self.blocking_handle, _mark: PhantomData, }; info!("io_uring driver with timer built"); @@ -236,9 +236,9 @@ impl RuntimeBuilder<TimeDriver<FusionDriver>> { } else { let builder = RuntimeBuilder::<TimeDriver<LegacyDriver>> { entries: self.entries, - urb: self.urb.clone(), + urb: self.urb, #[cfg(feature = "sync")] - blocking_handle: self.blocking_handle.clone(), + blocking_handle: self.blocking_handle, _mark: PhantomData, }; info!("legacy driver with timer built"); @@ -248,11 +248,11 @@ impl RuntimeBuilder<TimeDriver<FusionDriver>> { /// Build the runtime. #[cfg(all(unix, not(all(target_os = "linux", feature = "iouring"))))] - pub fn build(&self) -> io::Result<crate::FusionRuntime<TimeDriver<LegacyDriver>>> { + pub fn build(self) -> io::Result<crate::FusionRuntime<TimeDriver<LegacyDriver>>> { let builder = RuntimeBuilder::<TimeDriver<LegacyDriver>> { entries: self.entries, #[cfg(feature = "sync")] - blocking_handle: self.blocking_handle.clone(), + blocking_handle: self.blocking_handle, _mark: PhantomData, }; Ok(builder.build()?.into()) @@ -263,9 +263,9 @@ impl RuntimeBuilder<TimeDriver<FusionDriver>> { pub fn build(&self) -> io::Result<crate::FusionRuntime<TimeDriver<IoUringDriver>>> { let builder = RuntimeBuilder::<TimeDriver<IoUringDriver>> { entries: self.entries, - urb: self.urb.clone(), + urb: self.urb, #[cfg(feature = "sync")] - blocking_handle: self.blocking_handle.clone(), + blocking_handle: self.blocking_handle, _mark: PhantomData, }; Ok(builder.build()?.into()) @@ -289,16 +289,16 @@ where D: Buildable, { /// Build the runtime - fn build(this: &RuntimeBuilder<Self>) -> io::Result<Runtime<TimeDriver<D>>> { + fn build(this: RuntimeBuilder<Self>) -> io::Result<Runtime<TimeDriver<D>>> { let Runtime { driver, mut context, - } = Buildable::build(&RuntimeBuilder::<D> { + } = Buildable::build(RuntimeBuilder::<D> { entries: this.entries, #[cfg(all(target_os = "linux", feature = "iouring"))] - urb: this.urb.clone(), + urb: this.urb, #[cfg(feature = "sync")] - blocking_handle: this.blocking_handle.clone(), + blocking_handle: this.blocking_handle, _mark: PhantomData, })?; @@ -338,14 +338,16 @@ impl<D: time_wrap::TimeWrapable> RuntimeBuilder<D> { _mark: PhantomData, } } +} +impl<D> RuntimeBuilder<D> { /// Attach thread pool, this will overwrite blocking strategy. /// All `spawn_blocking` will be executed on given thread pool. #[cfg(feature = "sync")] #[must_use] pub fn attach_thread_pool( mut self, - tp: std::sync::Arc<dyn crate::blocking::ThreadPool>, + tp: Box<dyn crate::blocking::ThreadPool + Send + 'static>, ) -> Self { self.blocking_handle = crate::blocking::BlockingHandle::Attached(tp); self diff --git a/monoio/src/lib.rs b/monoio/src/lib.rs index b41f1ef..f116421 100644 --- a/monoio/src/lib.rs +++ b/monoio/src/lib.rs @@ -97,7 +97,7 @@ where F::Output: 'static, D: Buildable + Driver, { - let mut rt = builder::Buildable::build(&builder::RuntimeBuilder::<D>::new()) + let mut rt = builder::Buildable::build(builder::RuntimeBuilder::<D>::new()) .expect("Unable to build runtime."); rt.block_on(future) } |
