hyper-util
Advanced tools
| //! A cache of services | ||
| //! | ||
| //! The cache is a single list of cached services, bundled with a `MakeService`. | ||
| //! Calling the cache returns either an existing service, or makes a new one. | ||
| //! The returned `impl Service` can be used to send requests, and when dropped, | ||
| //! it will try to be returned back to the cache. | ||
| pub use self::internal::builder; | ||
| #[cfg(docsrs)] | ||
| pub use self::internal::Builder; | ||
| #[cfg(docsrs)] | ||
| pub use self::internal::Cache; | ||
| #[cfg(docsrs)] | ||
| pub use self::internal::Cached; | ||
| // For now, nothing else in this module is nameable. We can always make things | ||
| // more public, but we can't change type shapes (generics) once things are | ||
| // public. | ||
| mod internal { | ||
| use std::fmt; | ||
| use std::future::Future; | ||
| use std::pin::Pin; | ||
| use std::sync::{Arc, Mutex, Weak}; | ||
| use std::task::{self, Poll}; | ||
| use futures_core::ready; | ||
| use futures_util::future; | ||
| use tokio::sync::oneshot; | ||
| use tower_service::Service; | ||
| use super::events; | ||
| /// Start a builder to construct a `Cache` pool. | ||
| pub fn builder() -> Builder<events::Ignore> { | ||
| Builder { | ||
| events: events::Ignore, | ||
| } | ||
| } | ||
| /// A cache pool of services from the inner make service. | ||
| /// | ||
| /// Created with [`builder()`]. | ||
| /// | ||
| /// # Unnameable | ||
| /// | ||
| /// This type is normally unnameable, forbidding naming of the type within | ||
| /// code. The type is exposed in the documentation to show which methods | ||
| /// can be publicly called. | ||
| #[derive(Debug)] | ||
| pub struct Cache<M, Dst, Ev> | ||
| where | ||
| M: Service<Dst>, | ||
| { | ||
| connector: M, | ||
| shared: Arc<Mutex<Shared<M::Response>>>, | ||
| events: Ev, | ||
| } | ||
| /// A builder to configure a `Cache`. | ||
| /// | ||
| /// # Unnameable | ||
| /// | ||
| /// This type is normally unnameable, forbidding naming of the type within | ||
| /// code. The type is exposed in the documentation to show which methods | ||
| /// can be publicly called. | ||
| #[derive(Debug)] | ||
| pub struct Builder<Ev> { | ||
| events: Ev, | ||
| } | ||
| /// A cached service returned from a [`Cache`]. | ||
| /// | ||
| /// Implements `Service` by delegating to the inner service. Once dropped, | ||
| /// tries to reinsert into the `Cache`. | ||
| /// | ||
| /// # Unnameable | ||
| /// | ||
| /// This type is normally unnameable, forbidding naming of the type within | ||
| /// code. The type is exposed in the documentation to show which methods | ||
| /// can be publicly called. | ||
| pub struct Cached<S> { | ||
| is_closed: bool, | ||
| inner: Option<S>, | ||
| shared: Weak<Mutex<Shared<S>>>, | ||
| // todo: on_idle | ||
| } | ||
| pub enum CacheFuture<M, Dst, Ev> | ||
| where | ||
| M: Service<Dst>, | ||
| { | ||
| Racing { | ||
| shared: Arc<Mutex<Shared<M::Response>>>, | ||
| select: future::Select<oneshot::Receiver<M::Response>, M::Future>, | ||
| events: Ev, | ||
| }, | ||
| Connecting { | ||
| // TODO: could be Weak even here... | ||
| shared: Arc<Mutex<Shared<M::Response>>>, | ||
| future: M::Future, | ||
| }, | ||
| Cached { | ||
| svc: Option<Cached<M::Response>>, | ||
| }, | ||
| } | ||
| // shouldn't be pub | ||
| #[derive(Debug)] | ||
| pub struct Shared<S> { | ||
| services: Vec<S>, | ||
| waiters: Vec<oneshot::Sender<S>>, | ||
| } | ||
| // impl Builder | ||
| impl<Ev> Builder<Ev> { | ||
| /// Provide a `Future` executor to be used by the `Cache`. | ||
| /// | ||
| /// The executor is used handle some optional background tasks that | ||
| /// can improve the behavior of the cache, such as reducing connection | ||
| /// thrashing when a race is won. If not configured with an executor, | ||
| /// the default behavior is to ignore any of these optional background | ||
| /// tasks. | ||
| /// | ||
| /// The executor should implmenent [`hyper::rt::Executor`]. | ||
| /// | ||
| /// # Example | ||
| /// | ||
| /// ```rust | ||
| /// # #[cfg(feature = "tokio")] | ||
| /// # fn run() { | ||
| /// let builder = hyper_util::client::pool::cache::builder() | ||
| /// .executor(hyper_util::rt::TokioExecutor::new()); | ||
| /// # } | ||
| /// ``` | ||
| pub fn executor<E>(self, exec: E) -> Builder<events::WithExecutor<E>> { | ||
| Builder { | ||
| events: events::WithExecutor(exec), | ||
| } | ||
| } | ||
| /// Build a `Cache` pool around the `connector`. | ||
| pub fn build<M, Dst>(self, connector: M) -> Cache<M, Dst, Ev> | ||
| where | ||
| M: Service<Dst>, | ||
| { | ||
| Cache { | ||
| connector, | ||
| events: self.events, | ||
| shared: Arc::new(Mutex::new(Shared { | ||
| services: Vec::new(), | ||
| waiters: Vec::new(), | ||
| })), | ||
| } | ||
| } | ||
| } | ||
| // impl Cache | ||
| impl<M, Dst, Ev> Cache<M, Dst, Ev> | ||
| where | ||
| M: Service<Dst>, | ||
| { | ||
| /// Retain all cached services indicated by the predicate. | ||
| pub fn retain<F>(&mut self, predicate: F) | ||
| where | ||
| F: FnMut(&mut M::Response) -> bool, | ||
| { | ||
| self.shared.lock().unwrap().services.retain_mut(predicate); | ||
| } | ||
| /// Check whether this cache has no cached services. | ||
| pub fn is_empty(&self) -> bool { | ||
| self.shared.lock().unwrap().services.is_empty() | ||
| } | ||
| } | ||
| impl<M, Dst, Ev> Service<Dst> for Cache<M, Dst, Ev> | ||
| where | ||
| M: Service<Dst>, | ||
| M::Future: Unpin, | ||
| M::Response: Unpin, | ||
| Ev: events::Events<BackgroundConnect<M::Future, M::Response>> + Clone + Unpin, | ||
| { | ||
| type Response = Cached<M::Response>; | ||
| type Error = M::Error; | ||
| type Future = CacheFuture<M, Dst, Ev>; | ||
| fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
| if !self.shared.lock().unwrap().services.is_empty() { | ||
| Poll::Ready(Ok(())) | ||
| } else { | ||
| self.connector.poll_ready(cx) | ||
| } | ||
| } | ||
| fn call(&mut self, target: Dst) -> Self::Future { | ||
| // 1. If already cached, easy! | ||
| let waiter = { | ||
| let mut locked = self.shared.lock().unwrap(); | ||
| if let Some(found) = locked.take() { | ||
| return CacheFuture::Cached { | ||
| svc: Some(Cached::new(found, Arc::downgrade(&self.shared))), | ||
| }; | ||
| } | ||
| let (tx, rx) = oneshot::channel(); | ||
| locked.waiters.push(tx); | ||
| rx | ||
| }; | ||
| // 2. Otherwise, we start a new connect, and also listen for | ||
| // any newly idle. | ||
| CacheFuture::Racing { | ||
| shared: self.shared.clone(), | ||
| select: future::select(waiter, self.connector.call(target)), | ||
| events: self.events.clone(), | ||
| } | ||
| } | ||
| } | ||
| impl<M, Dst, Ev> Clone for Cache<M, Dst, Ev> | ||
| where | ||
| M: Service<Dst> + Clone, | ||
| Ev: Clone, | ||
| { | ||
| fn clone(&self) -> Self { | ||
| Self { | ||
| connector: self.connector.clone(), | ||
| events: self.events.clone(), | ||
| shared: self.shared.clone(), | ||
| } | ||
| } | ||
| } | ||
| impl<M, Dst, Ev> Future for CacheFuture<M, Dst, Ev> | ||
| where | ||
| M: Service<Dst>, | ||
| M::Future: Unpin, | ||
| M::Response: Unpin, | ||
| Ev: events::Events<BackgroundConnect<M::Future, M::Response>> + Unpin, | ||
| { | ||
| type Output = Result<Cached<M::Response>, M::Error>; | ||
| fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { | ||
| loop { | ||
| match &mut *self.as_mut() { | ||
| CacheFuture::Racing { | ||
| shared, | ||
| select, | ||
| events, | ||
| } => { | ||
| match ready!(Pin::new(select).poll(cx)) { | ||
| future::Either::Left((Err(_pool_closed), connecting)) => { | ||
| // pool was dropped, so we'll never get it from a waiter, | ||
| // but if this future still exists, then the user still | ||
| // wants a connection. just wait for the connecting | ||
| *self = CacheFuture::Connecting { | ||
| shared: shared.clone(), | ||
| future: connecting, | ||
| }; | ||
| } | ||
| future::Either::Left((Ok(pool_got), connecting)) => { | ||
| events.on_race_lost(BackgroundConnect { | ||
| future: connecting, | ||
| shared: Arc::downgrade(&shared), | ||
| }); | ||
| return Poll::Ready(Ok(Cached::new( | ||
| pool_got, | ||
| Arc::downgrade(&shared), | ||
| ))); | ||
| } | ||
| future::Either::Right((connected, _waiter)) => { | ||
| let inner = connected?; | ||
| return Poll::Ready(Ok(Cached::new( | ||
| inner, | ||
| Arc::downgrade(&shared), | ||
| ))); | ||
| } | ||
| } | ||
| } | ||
| CacheFuture::Connecting { shared, future } => { | ||
| let inner = ready!(Pin::new(future).poll(cx))?; | ||
| return Poll::Ready(Ok(Cached::new(inner, Arc::downgrade(&shared)))); | ||
| } | ||
| CacheFuture::Cached { svc } => { | ||
| return Poll::Ready(Ok(svc.take().unwrap())); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
| // impl Cached | ||
| impl<S> Cached<S> { | ||
| fn new(inner: S, shared: Weak<Mutex<Shared<S>>>) -> Self { | ||
| Cached { | ||
| is_closed: false, | ||
| inner: Some(inner), | ||
| shared, | ||
| } | ||
| } | ||
| // TODO: inner()? looks like `tower` likes `get_ref()` and `get_mut()`. | ||
| /// Get a reference to the inner service. | ||
| pub fn inner(&self) -> &S { | ||
| self.inner.as_ref().expect("inner only taken in drop") | ||
| } | ||
| /// Get a mutable reference to the inner service. | ||
| pub fn inner_mut(&mut self) -> &mut S { | ||
| self.inner.as_mut().expect("inner only taken in drop") | ||
| } | ||
| } | ||
| impl<S, Req> Service<Req> for Cached<S> | ||
| where | ||
| S: Service<Req>, | ||
| { | ||
| type Response = S::Response; | ||
| type Error = S::Error; | ||
| type Future = S::Future; | ||
| fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
| self.inner.as_mut().unwrap().poll_ready(cx).map_err(|err| { | ||
| self.is_closed = true; | ||
| err | ||
| }) | ||
| } | ||
| fn call(&mut self, req: Req) -> Self::Future { | ||
| self.inner.as_mut().unwrap().call(req) | ||
| } | ||
| } | ||
| impl<S> Drop for Cached<S> { | ||
| fn drop(&mut self) { | ||
| if self.is_closed { | ||
| return; | ||
| } | ||
| if let Some(value) = self.inner.take() { | ||
| if let Some(shared) = self.shared.upgrade() { | ||
| if let Ok(mut shared) = shared.lock() { | ||
| shared.put(value); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
| impl<S: fmt::Debug> fmt::Debug for Cached<S> { | ||
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
| f.debug_tuple("Cached") | ||
| .field(self.inner.as_ref().unwrap()) | ||
| .finish() | ||
| } | ||
| } | ||
| // impl Shared | ||
| impl<V> Shared<V> { | ||
| fn put(&mut self, val: V) { | ||
| let mut val = Some(val); | ||
| while let Some(tx) = self.waiters.pop() { | ||
| if !tx.is_closed() { | ||
| match tx.send(val.take().unwrap()) { | ||
| Ok(()) => break, | ||
| Err(v) => { | ||
| val = Some(v); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| if let Some(val) = val { | ||
| self.services.push(val); | ||
| } | ||
| } | ||
| fn take(&mut self) -> Option<V> { | ||
| // TODO: take in a loop | ||
| self.services.pop() | ||
| } | ||
| } | ||
| pub struct BackgroundConnect<CF, S> { | ||
| future: CF, | ||
| shared: Weak<Mutex<Shared<S>>>, | ||
| } | ||
| impl<CF, S, E> Future for BackgroundConnect<CF, S> | ||
| where | ||
| CF: Future<Output = Result<S, E>> + Unpin, | ||
| { | ||
| type Output = (); | ||
| fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { | ||
| match ready!(Pin::new(&mut self.future).poll(cx)) { | ||
| Ok(svc) => { | ||
| if let Some(shared) = self.shared.upgrade() { | ||
| if let Ok(mut locked) = shared.lock() { | ||
| locked.put(svc); | ||
| } | ||
| } | ||
| Poll::Ready(()) | ||
| } | ||
| Err(_e) => Poll::Ready(()), | ||
| } | ||
| } | ||
| } | ||
| } | ||
| mod events { | ||
| #[derive(Clone, Debug)] | ||
| #[non_exhaustive] | ||
| pub struct Ignore; | ||
| #[derive(Clone, Debug)] | ||
| pub struct WithExecutor<E>(pub(super) E); | ||
| pub trait Events<CF> { | ||
| fn on_race_lost(&self, fut: CF); | ||
| } | ||
| impl<CF> Events<CF> for Ignore { | ||
| fn on_race_lost(&self, _fut: CF) {} | ||
| } | ||
| impl<E, CF> Events<CF> for WithExecutor<E> | ||
| where | ||
| E: hyper::rt::Executor<CF>, | ||
| { | ||
| fn on_race_lost(&self, fut: CF) { | ||
| self.0.execute(fut); | ||
| } | ||
| } | ||
| } | ||
| #[cfg(test)] | ||
| mod tests { | ||
| use futures_util::future; | ||
| use tower_service::Service; | ||
| use tower_test::assert_request_eq; | ||
| #[tokio::test] | ||
| async fn test_makes_svc_when_empty() { | ||
| let (mock, mut handle) = tower_test::mock::pair(); | ||
| let mut cache = super::builder().build(mock); | ||
| handle.allow(1); | ||
| crate::common::future::poll_fn(|cx| cache.poll_ready(cx)) | ||
| .await | ||
| .unwrap(); | ||
| let f = cache.call(1); | ||
| future::join(f, async move { | ||
| assert_request_eq!(handle, 1).send_response("one"); | ||
| }) | ||
| .await | ||
| .0 | ||
| .expect("call"); | ||
| } | ||
| #[tokio::test] | ||
| async fn test_reuses_after_idle() { | ||
| let (mock, mut handle) = tower_test::mock::pair(); | ||
| let mut cache = super::builder().build(mock); | ||
| // only 1 connection should ever be made | ||
| handle.allow(1); | ||
| crate::common::future::poll_fn(|cx| cache.poll_ready(cx)) | ||
| .await | ||
| .unwrap(); | ||
| let f = cache.call(1); | ||
| let cached = future::join(f, async { | ||
| assert_request_eq!(handle, 1).send_response("one"); | ||
| }) | ||
| .await | ||
| .0 | ||
| .expect("call"); | ||
| drop(cached); | ||
| crate::common::future::poll_fn(|cx| cache.poll_ready(cx)) | ||
| .await | ||
| .unwrap(); | ||
| let f = cache.call(1); | ||
| let cached = f.await.expect("call"); | ||
| drop(cached); | ||
| } | ||
| } |
| //! Map pool utilities | ||
| //! | ||
| //! The map isn't a typical `Service`, but rather stand-alone type that can map | ||
| //! requests to a key and service factory. This is because the service is more | ||
| //! of a router, and cannot determine which inner service to check for | ||
| //! backpressure since it's not know until the request is made. | ||
| //! | ||
| //! The map implementation allows customization of extracting a key, and how to | ||
| //! construct a MakeService for that key. | ||
| //! | ||
| //! # Example | ||
| //! | ||
| //! ```rust,ignore | ||
| //! # async fn run() { | ||
| //! # use hyper_util::client::pool; | ||
| //! # let req = http::Request::new(()); | ||
| //! # let some_http1_connector = || { | ||
| //! # tower::service::service_fn(|_req| async { Ok::<_, &'static str>(()) }) | ||
| //! # }; | ||
| //! let mut map = pool::map::Map::builder() | ||
| //! .keys(|uri| (uri.scheme().clone(), uri.authority().clone())) | ||
| //! .values(|_uri| { | ||
| //! some_http1_connector() | ||
| //! }) | ||
| //! .build(); | ||
| //! | ||
| //! let resp = map.service(req.uri()).call(req).await; | ||
| //! # } | ||
| //! ``` | ||
| use std::collections::HashMap; | ||
| // expose the documentation | ||
| #[cfg(docsrs)] | ||
| pub use self::builder::Builder; | ||
| /// A map caching `MakeService`s per key. | ||
| /// | ||
| /// Create one with the [`Map::builder()`]. | ||
| pub struct Map<T, Req> | ||
| where | ||
| T: target::Target<Req>, | ||
| { | ||
| map: HashMap<T::Key, T::Service>, | ||
| targeter: T, | ||
| } | ||
| // impl Map | ||
| impl Map<builder::StartHere, builder::StartHere> { | ||
| /// Create a [`Builder`] to configure a new `Map`. | ||
| pub fn builder<Dst>() -> builder::Builder<Dst, builder::WantsKeyer, builder::WantsServiceMaker> | ||
| { | ||
| builder::Builder::new() | ||
| } | ||
| } | ||
| impl<T, Req> Map<T, Req> | ||
| where | ||
| T: target::Target<Req>, | ||
| { | ||
| fn new(targeter: T) -> Self { | ||
| Map { | ||
| map: HashMap::new(), | ||
| targeter, | ||
| } | ||
| } | ||
| } | ||
| impl<T, Req> Map<T, Req> | ||
| where | ||
| T: target::Target<Req>, | ||
| T::Key: Eq + std::hash::Hash, | ||
| { | ||
| /// Get a service after extracting the key from `req`. | ||
| pub fn service(&mut self, req: &Req) -> &mut T::Service { | ||
| let key = self.targeter.key(req); | ||
| self.map | ||
| .entry(key) | ||
| .or_insert_with(|| self.targeter.service(req)) | ||
| } | ||
| /// Retains only the services specified by the predicate. | ||
| pub fn retain<F>(&mut self, predicate: F) | ||
| where | ||
| F: FnMut(&T::Key, &mut T::Service) -> bool, | ||
| { | ||
| self.map.retain(predicate); | ||
| } | ||
| /// Clears the map, removing all key-value pairs. | ||
| pub fn clear(&mut self) { | ||
| self.map.clear(); | ||
| } | ||
| } | ||
| // sealed and unnameable for now | ||
| mod target { | ||
| pub trait Target<Dst> { | ||
| type Key; | ||
| type Service; | ||
| fn key(&self, dst: &Dst) -> Self::Key; | ||
| fn service(&self, dst: &Dst) -> Self::Service; | ||
| } | ||
| } | ||
| // sealed and unnameable for now | ||
| mod builder { | ||
| use std::marker::PhantomData; | ||
| /// A builder to configure a `Map`. | ||
| /// | ||
| /// # Unnameable | ||
| /// | ||
| /// This type is normally unnameable, forbidding naming of the type within | ||
| /// code. The type is exposed in the documentation to show which methods | ||
| /// can be publicly called. | ||
| pub struct Builder<Dst, K, S> { | ||
| _dst: PhantomData<fn(Dst)>, | ||
| keys: K, | ||
| svcs: S, | ||
| } | ||
| pub struct WantsKeyer; | ||
| pub struct WantsServiceMaker; | ||
| pub enum StartHere {} | ||
| pub struct Built<K, S> { | ||
| keys: K, | ||
| svcs: S, | ||
| } | ||
| impl<Dst> Builder<Dst, WantsKeyer, WantsServiceMaker> { | ||
| pub(super) fn new() -> Self { | ||
| Builder { | ||
| _dst: PhantomData, | ||
| keys: WantsKeyer, | ||
| svcs: WantsServiceMaker, | ||
| } | ||
| } | ||
| } | ||
| impl<Dst, S> Builder<Dst, WantsKeyer, S> { | ||
| /// Provide a closure that extracts a pool key for the destination. | ||
| pub fn keys<K, KK>(self, keyer: K) -> Builder<Dst, K, S> | ||
| where | ||
| K: Fn(&Dst) -> KK, | ||
| { | ||
| Builder { | ||
| _dst: PhantomData, | ||
| keys: keyer, | ||
| svcs: self.svcs, | ||
| } | ||
| } | ||
| } | ||
| impl<Dst, K> Builder<Dst, K, WantsServiceMaker> { | ||
| /// Provide a closure to create a new `MakeService` for the destination. | ||
| pub fn values<S, SS>(self, svcs: S) -> Builder<Dst, K, S> | ||
| where | ||
| S: Fn(&Dst) -> SS, | ||
| { | ||
| Builder { | ||
| _dst: PhantomData, | ||
| keys: self.keys, | ||
| svcs, | ||
| } | ||
| } | ||
| } | ||
| impl<Dst, K, S> Builder<Dst, K, S> | ||
| where | ||
| Built<K, S>: super::target::Target<Dst>, | ||
| <Built<K, S> as super::target::Target<Dst>>::Key: Eq + std::hash::Hash, | ||
| { | ||
| /// Build the `Map` pool. | ||
| pub fn build(self) -> super::Map<Built<K, S>, Dst> { | ||
| super::Map::new(Built { | ||
| keys: self.keys, | ||
| svcs: self.svcs, | ||
| }) | ||
| } | ||
| } | ||
| impl super::target::Target<StartHere> for StartHere { | ||
| type Key = StartHere; | ||
| type Service = StartHere; | ||
| fn key(&self, _: &StartHere) -> Self::Key { | ||
| match *self {} | ||
| } | ||
| fn service(&self, _: &StartHere) -> Self::Service { | ||
| match *self {} | ||
| } | ||
| } | ||
| impl<K, KK, S, SS, Dst> super::target::Target<Dst> for Built<K, S> | ||
| where | ||
| K: Fn(&Dst) -> KK, | ||
| S: Fn(&Dst) -> SS, | ||
| KK: Eq + std::hash::Hash, | ||
| { | ||
| type Key = KK; | ||
| type Service = SS; | ||
| fn key(&self, dst: &Dst) -> Self::Key { | ||
| (self.keys)(dst) | ||
| } | ||
| fn service(&self, dst: &Dst) -> Self::Service { | ||
| (self.svcs)(dst) | ||
| } | ||
| } | ||
| } | ||
| #[cfg(test)] | ||
| mod tests { | ||
| #[test] | ||
| fn smoke() { | ||
| let mut pool = super::Map::builder().keys(|_| "a").values(|_| "b").build(); | ||
| pool.service(&"hello"); | ||
| } | ||
| } |
| //! Composable pool services | ||
| //! | ||
| //! This module contains various concepts of a connection pool separated into | ||
| //! their own concerns. This allows for users to compose the layers, along with | ||
| //! any other layers, when constructing custom connection pools. | ||
| pub mod cache; | ||
| pub mod map; | ||
| pub mod negotiate; | ||
| pub mod singleton; |
| //! Negotiate a pool of services | ||
| //! | ||
| //! The negotiate pool allows for a service that can decide between two service | ||
| //! types based on an intermediate return value. It differs from typical | ||
| //! routing since it doesn't depend on the request, but the response. | ||
| //! | ||
| //! The original use case is support ALPN upgrades to HTTP/2, with a fallback | ||
| //! to HTTP/1. | ||
| //! | ||
| //! # Example | ||
| //! | ||
| //! ```rust,ignore | ||
| //! # async fn run() -> Result<(), Box<dyn std::error::Error>> { | ||
| //! # struct Conn; | ||
| //! # impl Conn { fn negotiated_protocol(&self) -> &[u8] { b"h2" } } | ||
| //! # let some_tls_connector = tower::service::service_fn(|_| async move { | ||
| //! # Ok::<_, std::convert::Infallible>(Conn) | ||
| //! # }); | ||
| //! # let http1_layer = tower::layer::layer_fn(|s| s); | ||
| //! # let http2_layer = tower::layer::layer_fn(|s| s); | ||
| //! let mut pool = hyper_util::client::pool::negotiate::builder() | ||
| //! .connect(some_tls_connector) | ||
| //! .inspect(|c| c.negotiated_protocol() == b"h2") | ||
| //! .fallback(http1_layer) | ||
| //! .upgrade(http2_layer) | ||
| //! .build(); | ||
| //! | ||
| //! // connect | ||
| //! let mut svc = pool.call(http::Uri::from_static("https://hyper.rs")).await?; | ||
| //! svc.ready().await; | ||
| //! | ||
| //! // http1 or http2 is now set up | ||
| //! # let some_http_req = http::Request::new(()); | ||
| //! let resp = svc.call(some_http_req).await?; | ||
| //! # Ok(()) | ||
| //! # } | ||
| //! ``` | ||
| pub use self::internal::builder; | ||
| #[cfg(docsrs)] | ||
| pub use self::internal::Builder; | ||
| #[cfg(docsrs)] | ||
| pub use self::internal::Negotiate; | ||
| #[cfg(docsrs)] | ||
| pub use self::internal::Negotiated; | ||
| mod internal { | ||
| use std::future::Future; | ||
| use std::pin::Pin; | ||
| use std::sync::{Arc, Mutex}; | ||
| use std::task::{self, Poll}; | ||
| use futures_core::ready; | ||
| use pin_project_lite::pin_project; | ||
| use tower_layer::Layer; | ||
| use tower_service::Service; | ||
| type BoxError = Box<dyn std::error::Error + Send + Sync>; | ||
| /// A negotiating pool over an inner make service. | ||
| /// | ||
| /// Created with [`builder()`]. | ||
| /// | ||
| /// # Unnameable | ||
| /// | ||
| /// This type is normally unnameable, forbidding naming of the type within | ||
| /// code. The type is exposed in the documentation to show which methods | ||
| /// can be publicly called. | ||
| #[derive(Clone)] | ||
| pub struct Negotiate<L, R> { | ||
| left: L, | ||
| right: R, | ||
| } | ||
| /// A negotiated service returned by [`Negotiate`]. | ||
| /// | ||
| /// # Unnameable | ||
| /// | ||
| /// This type is normally unnameable, forbidding naming of the type within | ||
| /// code. The type is exposed in the documentation to show which methods | ||
| /// can be publicly called. | ||
| #[derive(Clone, Debug)] | ||
| pub enum Negotiated<L, R> { | ||
| #[doc(hidden)] | ||
| Fallback(L), | ||
| #[doc(hidden)] | ||
| Upgraded(R), | ||
| } | ||
| pin_project! { | ||
| pub struct Negotiating<Dst, L, R> | ||
| where | ||
| L: Service<Dst>, | ||
| R: Service<()>, | ||
| { | ||
| #[pin] | ||
| state: State<Dst, L::Future, R::Future>, | ||
| left: L, | ||
| right: R, | ||
| } | ||
| } | ||
| pin_project! { | ||
| #[project = StateProj] | ||
| enum State<Dst, FL, FR> { | ||
| Eager { | ||
| #[pin] | ||
| future: FR, | ||
| dst: Option<Dst>, | ||
| }, | ||
| Fallback { | ||
| #[pin] | ||
| future: FL, | ||
| }, | ||
| Upgrade { | ||
| #[pin] | ||
| future: FR, | ||
| } | ||
| } | ||
| } | ||
| pin_project! { | ||
| #[project = NegotiatedProj] | ||
| pub enum NegotiatedFuture<L, R> { | ||
| Fallback { | ||
| #[pin] | ||
| future: L | ||
| }, | ||
| Upgraded { | ||
| #[pin] | ||
| future: R | ||
| }, | ||
| } | ||
| } | ||
| /// A builder to configure a `Negotiate`. | ||
| /// | ||
| /// # Unnameable | ||
| /// | ||
| /// This type is normally unnameable, forbidding naming of the type within | ||
| /// code. The type is exposed in the documentation to show which methods | ||
| /// can be publicly called. | ||
| #[derive(Debug)] | ||
| pub struct Builder<C, I, L, R> { | ||
| connect: C, | ||
| inspect: I, | ||
| fallback: L, | ||
| upgrade: R, | ||
| } | ||
| #[derive(Debug)] | ||
| pub struct WantsConnect; | ||
| #[derive(Debug)] | ||
| pub struct WantsInspect; | ||
| #[derive(Debug)] | ||
| pub struct WantsFallback; | ||
| #[derive(Debug)] | ||
| pub struct WantsUpgrade; | ||
| /// Start a builder to construct a `Negotiate` pool. | ||
| pub fn builder() -> Builder<WantsConnect, WantsInspect, WantsFallback, WantsUpgrade> { | ||
| Builder { | ||
| connect: WantsConnect, | ||
| inspect: WantsInspect, | ||
| fallback: WantsFallback, | ||
| upgrade: WantsUpgrade, | ||
| } | ||
| } | ||
| impl<C, I, L, R> Builder<C, I, L, R> { | ||
| /// Provide the initial connector. | ||
| pub fn connect<CC>(self, connect: CC) -> Builder<CC, I, L, R> { | ||
| Builder { | ||
| connect, | ||
| inspect: self.inspect, | ||
| fallback: self.fallback, | ||
| upgrade: self.upgrade, | ||
| } | ||
| } | ||
| /// Provide the inspector that determines the result of the negotiation. | ||
| pub fn inspect<II>(self, inspect: II) -> Builder<C, II, L, R> { | ||
| Builder { | ||
| connect: self.connect, | ||
| inspect, | ||
| fallback: self.fallback, | ||
| upgrade: self.upgrade, | ||
| } | ||
| } | ||
| /// Provide the layer to fallback to if negotiation fails. | ||
| pub fn fallback<LL>(self, fallback: LL) -> Builder<C, I, LL, R> { | ||
| Builder { | ||
| connect: self.connect, | ||
| inspect: self.inspect, | ||
| fallback, | ||
| upgrade: self.upgrade, | ||
| } | ||
| } | ||
| /// Provide the layer to upgrade to if negotiation succeeds. | ||
| pub fn upgrade<RR>(self, upgrade: RR) -> Builder<C, I, L, RR> { | ||
| Builder { | ||
| connect: self.connect, | ||
| inspect: self.inspect, | ||
| fallback: self.fallback, | ||
| upgrade, | ||
| } | ||
| } | ||
| /// Build the `Negotiate` pool. | ||
| pub fn build<Dst>(self) -> Negotiate<L::Service, R::Service> | ||
| where | ||
| C: Service<Dst>, | ||
| C::Error: Into<BoxError>, | ||
| L: Layer<Inspector<C, C::Response, I>>, | ||
| L::Service: Service<Dst> + Clone, | ||
| <L::Service as Service<Dst>>::Error: Into<BoxError>, | ||
| R: Layer<Inspected<C::Response>>, | ||
| R::Service: Service<()> + Clone, | ||
| <R::Service as Service<()>>::Error: Into<BoxError>, | ||
| I: Fn(&C::Response) -> bool + Clone, | ||
| { | ||
| let Builder { | ||
| connect, | ||
| inspect, | ||
| fallback, | ||
| upgrade, | ||
| } = self; | ||
| let slot = Arc::new(Mutex::new(None)); | ||
| let wrapped = Inspector { | ||
| svc: connect, | ||
| inspect, | ||
| slot: slot.clone(), | ||
| }; | ||
| let left = fallback.layer(wrapped); | ||
| let right = upgrade.layer(Inspected { slot }); | ||
| Negotiate { left, right } | ||
| } | ||
| } | ||
| impl<L, R> Negotiate<L, R> { | ||
| /// Get a mutable reference to the fallback service. | ||
| pub fn fallback_mut(&mut self) -> &mut L { | ||
| &mut self.left | ||
| } | ||
| /// Get a mutable reference to the upgrade service. | ||
| pub fn upgrade_mut(&mut self) -> &mut R { | ||
| &mut self.right | ||
| } | ||
| } | ||
| impl<L, R, Target> Service<Target> for Negotiate<L, R> | ||
| where | ||
| L: Service<Target> + Clone, | ||
| L::Error: Into<BoxError>, | ||
| R: Service<()> + Clone, | ||
| R::Error: Into<BoxError>, | ||
| { | ||
| type Response = Negotiated<L::Response, R::Response>; | ||
| type Error = BoxError; | ||
| type Future = Negotiating<Target, L, R>; | ||
| fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
| self.left.poll_ready(cx).map_err(Into::into) | ||
| } | ||
| fn call(&mut self, dst: Target) -> Self::Future { | ||
| let left = self.left.clone(); | ||
| Negotiating { | ||
| state: State::Eager { | ||
| future: self.right.call(()), | ||
| dst: Some(dst), | ||
| }, | ||
| // place clone, take original that we already polled-ready. | ||
| left: std::mem::replace(&mut self.left, left), | ||
| right: self.right.clone(), | ||
| } | ||
| } | ||
| } | ||
| impl<Dst, L, R> Future for Negotiating<Dst, L, R> | ||
| where | ||
| L: Service<Dst>, | ||
| L::Error: Into<BoxError>, | ||
| R: Service<()>, | ||
| R::Error: Into<BoxError>, | ||
| { | ||
| type Output = Result<Negotiated<L::Response, R::Response>, BoxError>; | ||
| fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { | ||
| // States: | ||
| // - `Eager`: try the "right" path first; on `UseOther` sentinel, fall back to left. | ||
| // - `Fallback`: try the left path; on `UseOther` sentinel, upgrade back to right. | ||
| // - `Upgrade`: retry the right path after a fallback. | ||
| // If all fail, give up. | ||
| let mut me = self.project(); | ||
| loop { | ||
| match me.state.as_mut().project() { | ||
| StateProj::Eager { future, dst } => match ready!(future.poll(cx)) { | ||
| Ok(out) => return Poll::Ready(Ok(Negotiated::Upgraded(out))), | ||
| Err(err) => { | ||
| let err = err.into(); | ||
| if UseOther::is(&*err) { | ||
| let dst = dst.take().unwrap(); | ||
| let f = me.left.call(dst); | ||
| me.state.set(State::Fallback { future: f }); | ||
| continue; | ||
| } else { | ||
| return Poll::Ready(Err(err)); | ||
| } | ||
| } | ||
| }, | ||
| StateProj::Fallback { future } => match ready!(future.poll(cx)) { | ||
| Ok(out) => return Poll::Ready(Ok(Negotiated::Fallback(out))), | ||
| Err(err) => { | ||
| let err = err.into(); | ||
| if UseOther::is(&*err) { | ||
| let f = me.right.call(()); | ||
| me.state.set(State::Upgrade { future: f }); | ||
| continue; | ||
| } else { | ||
| return Poll::Ready(Err(err)); | ||
| } | ||
| } | ||
| }, | ||
| StateProj::Upgrade { future } => match ready!(future.poll(cx)) { | ||
| Ok(out) => return Poll::Ready(Ok(Negotiated::Upgraded(out))), | ||
| Err(err) => return Poll::Ready(Err(err.into())), | ||
| }, | ||
| } | ||
| } | ||
| } | ||
| } | ||
| impl<L, R> Negotiated<L, R> { | ||
| // Could be useful? | ||
| #[cfg(test)] | ||
| pub(super) fn is_fallback(&self) -> bool { | ||
| matches!(self, Negotiated::Fallback(_)) | ||
| } | ||
| #[cfg(test)] | ||
| pub(super) fn is_upgraded(&self) -> bool { | ||
| matches!(self, Negotiated::Upgraded(_)) | ||
| } | ||
| // TODO: are these the correct methods? Or .as_ref().fallback(), etc? | ||
| /// Get a reference to the fallback service if this is it. | ||
| pub fn fallback_ref(&self) -> Option<&L> { | ||
| if let Negotiated::Fallback(ref left) = self { | ||
| Some(left) | ||
| } else { | ||
| None | ||
| } | ||
| } | ||
| /// Get a mutable reference to the fallback service if this is it. | ||
| pub fn fallback_mut(&mut self) -> Option<&mut L> { | ||
| if let Negotiated::Fallback(ref mut left) = self { | ||
| Some(left) | ||
| } else { | ||
| None | ||
| } | ||
| } | ||
| /// Get a reference to the upgraded service if this is it. | ||
| pub fn upgraded_ref(&self) -> Option<&R> { | ||
| if let Negotiated::Upgraded(ref right) = self { | ||
| Some(right) | ||
| } else { | ||
| None | ||
| } | ||
| } | ||
| /// Get a mutable reference to the upgraded service if this is it. | ||
| pub fn upgraded_mut(&mut self) -> Option<&mut R> { | ||
| if let Negotiated::Upgraded(ref mut right) = self { | ||
| Some(right) | ||
| } else { | ||
| None | ||
| } | ||
| } | ||
| } | ||
| impl<L, R, Req, Res, E> Service<Req> for Negotiated<L, R> | ||
| where | ||
| L: Service<Req, Response = Res, Error = E>, | ||
| R: Service<Req, Response = Res, Error = E>, | ||
| { | ||
| type Response = Res; | ||
| type Error = E; | ||
| type Future = NegotiatedFuture<L::Future, R::Future>; | ||
| fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
| match self { | ||
| Negotiated::Fallback(ref mut s) => s.poll_ready(cx), | ||
| Negotiated::Upgraded(ref mut s) => s.poll_ready(cx), | ||
| } | ||
| } | ||
| fn call(&mut self, req: Req) -> Self::Future { | ||
| match self { | ||
| Negotiated::Fallback(ref mut s) => NegotiatedFuture::Fallback { | ||
| future: s.call(req), | ||
| }, | ||
| Negotiated::Upgraded(ref mut s) => NegotiatedFuture::Upgraded { | ||
| future: s.call(req), | ||
| }, | ||
| } | ||
| } | ||
| } | ||
| impl<L, R, Out> Future for NegotiatedFuture<L, R> | ||
| where | ||
| L: Future<Output = Out>, | ||
| R: Future<Output = Out>, | ||
| { | ||
| type Output = Out; | ||
| fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { | ||
| match self.project() { | ||
| NegotiatedProj::Fallback { future } => future.poll(cx), | ||
| NegotiatedProj::Upgraded { future } => future.poll(cx), | ||
| } | ||
| } | ||
| } | ||
| // ===== internal ===== | ||
| pub struct Inspector<M, S, I> { | ||
| svc: M, | ||
| inspect: I, | ||
| slot: Arc<Mutex<Option<S>>>, | ||
| } | ||
| pin_project! { | ||
| pub struct InspectFuture<F, S, I> { | ||
| #[pin] | ||
| future: F, | ||
| inspect: I, | ||
| slot: Arc<Mutex<Option<S>>>, | ||
| } | ||
| } | ||
| impl<M: Clone, S, I: Clone> Clone for Inspector<M, S, I> { | ||
| fn clone(&self) -> Self { | ||
| Self { | ||
| svc: self.svc.clone(), | ||
| inspect: self.inspect.clone(), | ||
| slot: self.slot.clone(), | ||
| } | ||
| } | ||
| } | ||
| impl<M, S, I, Target> Service<Target> for Inspector<M, S, I> | ||
| where | ||
| M: Service<Target, Response = S>, | ||
| M::Error: Into<BoxError>, | ||
| I: Clone + Fn(&S) -> bool, | ||
| { | ||
| type Response = M::Response; | ||
| type Error = BoxError; | ||
| type Future = InspectFuture<M::Future, S, I>; | ||
| fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
| self.svc.poll_ready(cx).map_err(Into::into) | ||
| } | ||
| fn call(&mut self, dst: Target) -> Self::Future { | ||
| InspectFuture { | ||
| future: self.svc.call(dst), | ||
| inspect: self.inspect.clone(), | ||
| slot: self.slot.clone(), | ||
| } | ||
| } | ||
| } | ||
| impl<F, I, S, E> Future for InspectFuture<F, S, I> | ||
| where | ||
| F: Future<Output = Result<S, E>>, | ||
| E: Into<BoxError>, | ||
| I: Fn(&S) -> bool, | ||
| { | ||
| type Output = Result<S, BoxError>; | ||
| fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { | ||
| let me = self.project(); | ||
| let s = ready!(me.future.poll(cx)).map_err(Into::into)?; | ||
| Poll::Ready(if (me.inspect)(&s) { | ||
| *me.slot.lock().unwrap() = Some(s); | ||
| Err(UseOther.into()) | ||
| } else { | ||
| Ok(s) | ||
| }) | ||
| } | ||
| } | ||
| pub struct Inspected<S> { | ||
| slot: Arc<Mutex<Option<S>>>, | ||
| } | ||
| impl<S, Target> Service<Target> for Inspected<S> { | ||
| type Response = S; | ||
| type Error = BoxError; | ||
| type Future = std::future::Ready<Result<S, BoxError>>; | ||
| fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
| if self.slot.lock().unwrap().is_some() { | ||
| Poll::Ready(Ok(())) | ||
| } else { | ||
| Poll::Ready(Err(UseOther.into())) | ||
| } | ||
| } | ||
| fn call(&mut self, _dst: Target) -> Self::Future { | ||
| let s = self | ||
| .slot | ||
| .lock() | ||
| .unwrap() | ||
| .take() | ||
| .ok_or_else(|| UseOther.into()); | ||
| std::future::ready(s) | ||
| } | ||
| } | ||
| impl<S> Clone for Inspected<S> { | ||
| fn clone(&self) -> Inspected<S> { | ||
| Inspected { | ||
| slot: self.slot.clone(), | ||
| } | ||
| } | ||
| } | ||
| #[derive(Debug)] | ||
| struct UseOther; | ||
| impl std::fmt::Display for UseOther { | ||
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
| f.write_str("sentinel error; using other") | ||
| } | ||
| } | ||
| impl std::error::Error for UseOther {} | ||
| impl UseOther { | ||
| fn is(err: &(dyn std::error::Error + 'static)) -> bool { | ||
| let mut source = Some(err); | ||
| while let Some(err) = source { | ||
| if err.is::<UseOther>() { | ||
| return true; | ||
| } | ||
| source = err.source(); | ||
| } | ||
| false | ||
| } | ||
| } | ||
| } | ||
| #[cfg(test)] | ||
| mod tests { | ||
| use futures_util::future; | ||
| use tower_service::Service; | ||
| use tower_test::assert_request_eq; | ||
| #[tokio::test] | ||
| async fn not_negotiated_falls_back_to_left() { | ||
| let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>(); | ||
| let mut negotiate = super::builder() | ||
| .connect(mock_svc) | ||
| .inspect(|_: &&str| false) | ||
| .fallback(layer_fn(|s| s)) | ||
| .upgrade(layer_fn(|s| s)) | ||
| .build(); | ||
| crate::common::future::poll_fn(|cx| negotiate.poll_ready(cx)) | ||
| .await | ||
| .unwrap(); | ||
| let fut = negotiate.call(()); | ||
| let nsvc = future::join(fut, async move { | ||
| assert_request_eq!(handle, ()).send_response("one"); | ||
| }) | ||
| .await | ||
| .0 | ||
| .expect("call"); | ||
| assert!(nsvc.is_fallback()); | ||
| } | ||
| #[tokio::test] | ||
| async fn negotiated_uses_right() { | ||
| let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>(); | ||
| let mut negotiate = super::builder() | ||
| .connect(mock_svc) | ||
| .inspect(|_: &&str| true) | ||
| .fallback(layer_fn(|s| s)) | ||
| .upgrade(layer_fn(|s| s)) | ||
| .build(); | ||
| crate::common::future::poll_fn(|cx| negotiate.poll_ready(cx)) | ||
| .await | ||
| .unwrap(); | ||
| let fut = negotiate.call(()); | ||
| let nsvc = future::join(fut, async move { | ||
| assert_request_eq!(handle, ()).send_response("one"); | ||
| }) | ||
| .await | ||
| .0 | ||
| .expect("call"); | ||
| assert!(nsvc.is_upgraded()); | ||
| } | ||
| fn layer_fn<F>(f: F) -> LayerFn<F> { | ||
| LayerFn(f) | ||
| } | ||
| #[derive(Clone)] | ||
| struct LayerFn<F>(F); | ||
| impl<F, S, Out> tower_layer::Layer<S> for LayerFn<F> | ||
| where | ||
| F: Fn(S) -> Out, | ||
| { | ||
| type Service = Out; | ||
| fn layer(&self, inner: S) -> Self::Service { | ||
| (self.0)(inner) | ||
| } | ||
| } | ||
| } |
| //! Singleton pools | ||
| //! | ||
| //! This ensures that only one active connection is made. | ||
| //! | ||
| //! The singleton pool wraps a `MakeService<T, Req>` so that it only produces a | ||
| //! single `Service<Req>`. It bundles all concurrent calls to it, so that only | ||
| //! one connection is made. All calls to the singleton will return a clone of | ||
| //! the inner service once established. | ||
| //! | ||
| //! This fits the HTTP/2 case well. | ||
| //! | ||
| //! ## Example | ||
| //! | ||
| //! ```rust,ignore | ||
| //! let mut pool = Singleton::new(some_make_svc); | ||
| //! | ||
| //! let svc1 = pool.call(some_dst).await?; | ||
| //! | ||
| //! let svc2 = pool.call(some_dst).await?; | ||
| //! // svc1 == svc2 | ||
| //! ``` | ||
| use std::sync::{Arc, Mutex}; | ||
| use std::task::{self, Poll}; | ||
| use tokio::sync::oneshot; | ||
| use tower_service::Service; | ||
| use self::internal::{DitchGuard, SingletonError, SingletonFuture, State}; | ||
| type BoxError = Box<dyn std::error::Error + Send + Sync>; | ||
| #[cfg(docsrs)] | ||
| pub use self::internal::Singled; | ||
| /// A singleton pool over an inner service. | ||
| /// | ||
| /// The singleton wraps an inner service maker, bundling all calls to ensure | ||
| /// only one service is created. Once made, it returns clones of the made | ||
| /// service. | ||
| #[derive(Debug)] | ||
| pub struct Singleton<M, Dst> | ||
| where | ||
| M: Service<Dst>, | ||
| { | ||
| mk_svc: M, | ||
| state: Arc<Mutex<State<M::Response>>>, | ||
| } | ||
| impl<M, Target> Singleton<M, Target> | ||
| where | ||
| M: Service<Target>, | ||
| M::Response: Clone, | ||
| { | ||
| /// Create a new singleton pool over an inner make service. | ||
| pub fn new(mk_svc: M) -> Self { | ||
| Singleton { | ||
| mk_svc, | ||
| state: Arc::new(Mutex::new(State::Empty)), | ||
| } | ||
| } | ||
| // pub fn clear? cancel? | ||
| /// Retains the inner made service if specified by the predicate. | ||
| pub fn retain<F>(&mut self, mut predicate: F) | ||
| where | ||
| F: FnMut(&mut M::Response) -> bool, | ||
| { | ||
| let mut locked = self.state.lock().unwrap(); | ||
| match *locked { | ||
| State::Empty => {} | ||
| State::Making(..) => {} | ||
| State::Made(ref mut svc) => { | ||
| if !predicate(svc) { | ||
| *locked = State::Empty; | ||
| } | ||
| } | ||
| } | ||
| } | ||
| /// Returns whether this singleton pool is empty. | ||
| /// | ||
| /// If this pool has created a shared instance, or is currently in the | ||
| /// process of creating one, this returns false. | ||
| pub fn is_empty(&self) -> bool { | ||
| matches!(*self.state.lock().unwrap(), State::Empty) | ||
| } | ||
| } | ||
| impl<M, Target> Service<Target> for Singleton<M, Target> | ||
| where | ||
| M: Service<Target>, | ||
| M::Response: Clone, | ||
| M::Error: Into<BoxError>, | ||
| { | ||
| type Response = internal::Singled<M::Response>; | ||
| type Error = SingletonError; | ||
| type Future = SingletonFuture<M::Future, M::Response>; | ||
| fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
| if let State::Empty = *self.state.lock().unwrap() { | ||
| return self | ||
| .mk_svc | ||
| .poll_ready(cx) | ||
| .map_err(|e| SingletonError(e.into())); | ||
| } | ||
| Poll::Ready(Ok(())) | ||
| } | ||
| fn call(&mut self, dst: Target) -> Self::Future { | ||
| let mut locked = self.state.lock().unwrap(); | ||
| match *locked { | ||
| State::Empty => { | ||
| let fut = self.mk_svc.call(dst); | ||
| *locked = State::Making(Vec::new()); | ||
| SingletonFuture::Driving { | ||
| future: fut, | ||
| singleton: DitchGuard(Arc::downgrade(&self.state)), | ||
| } | ||
| } | ||
| State::Making(ref mut waiters) => { | ||
| let (tx, rx) = oneshot::channel(); | ||
| waiters.push(tx); | ||
| SingletonFuture::Waiting { | ||
| rx, | ||
| state: Arc::downgrade(&self.state), | ||
| } | ||
| } | ||
| State::Made(ref svc) => SingletonFuture::Made { | ||
| svc: Some(svc.clone()), | ||
| state: Arc::downgrade(&self.state), | ||
| }, | ||
| } | ||
| } | ||
| } | ||
| impl<M, Target> Clone for Singleton<M, Target> | ||
| where | ||
| M: Service<Target> + Clone, | ||
| { | ||
| fn clone(&self) -> Self { | ||
| Self { | ||
| mk_svc: self.mk_svc.clone(), | ||
| state: self.state.clone(), | ||
| } | ||
| } | ||
| } | ||
| // Holds some "pub" items that otherwise shouldn't be public. | ||
| mod internal { | ||
| use std::future::Future; | ||
| use std::pin::Pin; | ||
| use std::sync::{Mutex, Weak}; | ||
| use std::task::{self, Poll}; | ||
| use futures_core::ready; | ||
| use pin_project_lite::pin_project; | ||
| use tokio::sync::oneshot; | ||
| use tower_service::Service; | ||
| use super::BoxError; | ||
| pin_project! { | ||
| #[project = SingletonFutureProj] | ||
| pub enum SingletonFuture<F, S> { | ||
| Driving { | ||
| #[pin] | ||
| future: F, | ||
| singleton: DitchGuard<S>, | ||
| }, | ||
| Waiting { | ||
| rx: oneshot::Receiver<S>, | ||
| state: Weak<Mutex<State<S>>>, | ||
| }, | ||
| Made { | ||
| svc: Option<S>, | ||
| state: Weak<Mutex<State<S>>>, | ||
| }, | ||
| } | ||
| } | ||
| // XXX: pub because of the enum SingletonFuture | ||
| #[derive(Debug)] | ||
| pub enum State<S> { | ||
| Empty, | ||
| Making(Vec<oneshot::Sender<S>>), | ||
| Made(S), | ||
| } | ||
| // XXX: pub because of the enum SingletonFuture | ||
| pub struct DitchGuard<S>(pub(super) Weak<Mutex<State<S>>>); | ||
| /// A cached service returned from a [`Singleton`]. | ||
| /// | ||
| /// Implements `Service` by delegating to the inner service. If | ||
| /// `poll_ready` returns an error, this will clear the cache in the related | ||
| /// `Singleton`. | ||
| /// | ||
| /// [`Singleton`]: super::Singleton | ||
| /// | ||
| /// # Unnameable | ||
| /// | ||
| /// This type is normally unnameable, forbidding naming of the type within | ||
| /// code. The type is exposed in the documentation to show which methods | ||
| /// can be publicly called. | ||
| #[derive(Debug)] | ||
| pub struct Singled<S> { | ||
| inner: S, | ||
| state: Weak<Mutex<State<S>>>, | ||
| } | ||
| impl<F, S, E> Future for SingletonFuture<F, S> | ||
| where | ||
| F: Future<Output = Result<S, E>>, | ||
| E: Into<BoxError>, | ||
| S: Clone, | ||
| { | ||
| type Output = Result<Singled<S>, SingletonError>; | ||
| fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { | ||
| match self.project() { | ||
| SingletonFutureProj::Driving { future, singleton } => { | ||
| match ready!(future.poll(cx)) { | ||
| Ok(svc) => { | ||
| if let Some(state) = singleton.0.upgrade() { | ||
| let mut locked = state.lock().unwrap(); | ||
| match std::mem::replace(&mut *locked, State::Made(svc.clone())) { | ||
| State::Making(waiters) => { | ||
| for tx in waiters { | ||
| let _ = tx.send(svc.clone()); | ||
| } | ||
| } | ||
| State::Empty | State::Made(_) => { | ||
| // shouldn't happen! | ||
| unreachable!() | ||
| } | ||
| } | ||
| } | ||
| // take out of the DitchGuard so it doesn't treat as "ditched" | ||
| let state = std::mem::replace(&mut singleton.0, Weak::new()); | ||
| Poll::Ready(Ok(Singled::new(svc, state))) | ||
| } | ||
| Err(e) => { | ||
| if let Some(state) = singleton.0.upgrade() { | ||
| let mut locked = state.lock().unwrap(); | ||
| singleton.0 = Weak::new(); | ||
| *locked = State::Empty; | ||
| } | ||
| Poll::Ready(Err(SingletonError(e.into()))) | ||
| } | ||
| } | ||
| } | ||
| SingletonFutureProj::Waiting { rx, state } => match ready!(Pin::new(rx).poll(cx)) { | ||
| Ok(svc) => Poll::Ready(Ok(Singled::new(svc, state.clone()))), | ||
| Err(_canceled) => Poll::Ready(Err(SingletonError(Canceled.into()))), | ||
| }, | ||
| SingletonFutureProj::Made { svc, state } => { | ||
| Poll::Ready(Ok(Singled::new(svc.take().unwrap(), state.clone()))) | ||
| } | ||
| } | ||
| } | ||
| } | ||
| impl<S> Drop for DitchGuard<S> { | ||
| fn drop(&mut self) { | ||
| if let Some(state) = self.0.upgrade() { | ||
| if let Ok(mut locked) = state.lock() { | ||
| *locked = State::Empty; | ||
| } | ||
| } | ||
| } | ||
| } | ||
| impl<S> Singled<S> { | ||
| fn new(inner: S, state: Weak<Mutex<State<S>>>) -> Self { | ||
| Singled { inner, state } | ||
| } | ||
| } | ||
| impl<S, Req> Service<Req> for Singled<S> | ||
| where | ||
| S: Service<Req>, | ||
| { | ||
| type Response = S::Response; | ||
| type Error = S::Error; | ||
| type Future = S::Future; | ||
| fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
| // We notice if the cached service dies, and clear the singleton cache. | ||
| match self.inner.poll_ready(cx) { | ||
| Poll::Ready(Err(err)) => { | ||
| if let Some(state) = self.state.upgrade() { | ||
| *state.lock().unwrap() = State::Empty; | ||
| } | ||
| Poll::Ready(Err(err)) | ||
| } | ||
| other => other, | ||
| } | ||
| } | ||
| fn call(&mut self, req: Req) -> Self::Future { | ||
| self.inner.call(req) | ||
| } | ||
| } | ||
| // An opaque error type. By not exposing the type, nor being specifically | ||
| // Box<dyn Error>, we can _change_ the type once we no longer need the Canceled | ||
| // error type. This will be possible with the refactor to baton passing. | ||
| #[derive(Debug)] | ||
| pub struct SingletonError(pub(super) BoxError); | ||
| impl std::fmt::Display for SingletonError { | ||
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
| f.write_str("singleton connection error") | ||
| } | ||
| } | ||
| impl std::error::Error for SingletonError { | ||
| fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { | ||
| Some(&*self.0) | ||
| } | ||
| } | ||
| #[derive(Debug)] | ||
| struct Canceled; | ||
| impl std::fmt::Display for Canceled { | ||
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
| f.write_str("singleton connection canceled") | ||
| } | ||
| } | ||
| impl std::error::Error for Canceled {} | ||
| } | ||
| #[cfg(test)] | ||
| mod tests { | ||
| use std::future::Future; | ||
| use std::pin::Pin; | ||
| use std::task::Poll; | ||
| use tower_service::Service; | ||
| use super::Singleton; | ||
| #[tokio::test] | ||
| async fn first_call_drives_subsequent_wait() { | ||
| let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>(); | ||
| let mut singleton = Singleton::new(mock_svc); | ||
| handle.allow(1); | ||
| crate::common::future::poll_fn(|cx| singleton.poll_ready(cx)) | ||
| .await | ||
| .unwrap(); | ||
| // First call: should go into Driving | ||
| let fut1 = singleton.call(()); | ||
| // Second call: should go into Waiting | ||
| let fut2 = singleton.call(()); | ||
| // Expect exactly one request to the inner service | ||
| let ((), send_response) = handle.next_request().await.unwrap(); | ||
| send_response.send_response("svc"); | ||
| // Both futures should resolve to the same value | ||
| fut1.await.unwrap(); | ||
| fut2.await.unwrap(); | ||
| } | ||
| #[tokio::test] | ||
| async fn made_state_returns_immediately() { | ||
| let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>(); | ||
| let mut singleton = Singleton::new(mock_svc); | ||
| handle.allow(1); | ||
| crate::common::future::poll_fn(|cx| singleton.poll_ready(cx)) | ||
| .await | ||
| .unwrap(); | ||
| // Drive first call to completion | ||
| let fut1 = singleton.call(()); | ||
| let ((), send_response) = handle.next_request().await.unwrap(); | ||
| send_response.send_response("svc"); | ||
| fut1.await.unwrap(); | ||
| // Second call should not hit inner service | ||
| singleton.call(()).await.unwrap(); | ||
| } | ||
| #[tokio::test] | ||
| async fn cached_service_poll_ready_error_clears_singleton() { | ||
| // Outer mock returns an inner mock service | ||
| let (outer, mut outer_handle) = | ||
| tower_test::mock::pair::<(), tower_test::mock::Mock<(), &'static str>>(); | ||
| let mut singleton = Singleton::new(outer); | ||
| // Allow the singleton to be made | ||
| outer_handle.allow(2); | ||
| crate::common::future::poll_fn(|cx| singleton.poll_ready(cx)) | ||
| .await | ||
| .unwrap(); | ||
| // First call produces an inner mock service | ||
| let fut1 = singleton.call(()); | ||
| let ((), send_inner) = outer_handle.next_request().await.unwrap(); | ||
| let (inner, mut inner_handle) = tower_test::mock::pair::<(), &'static str>(); | ||
| send_inner.send_response(inner); | ||
| let mut cached = fut1.await.unwrap(); | ||
| // Now: allow readiness on the inner mock, then inject error | ||
| inner_handle.allow(1); | ||
| // Inject error so next poll_ready fails | ||
| inner_handle.send_error(std::io::Error::new( | ||
| std::io::ErrorKind::Other, | ||
| "cached poll_ready failed", | ||
| )); | ||
| // Drive poll_ready on cached service | ||
| let err = crate::common::future::poll_fn(|cx| cached.poll_ready(cx)) | ||
| .await | ||
| .err() | ||
| .expect("expected poll_ready error"); | ||
| assert_eq!(err.to_string(), "cached poll_ready failed"); | ||
| // After error, the singleton should be cleared, so a new call drives outer again | ||
| outer_handle.allow(1); | ||
| crate::common::future::poll_fn(|cx| singleton.poll_ready(cx)) | ||
| .await | ||
| .unwrap(); | ||
| let fut2 = singleton.call(()); | ||
| let ((), send_inner2) = outer_handle.next_request().await.unwrap(); | ||
| let (inner2, mut inner_handle2) = tower_test::mock::pair::<(), &'static str>(); | ||
| send_inner2.send_response(inner2); | ||
| let mut cached2 = fut2.await.unwrap(); | ||
| // The new cached service should still work | ||
| inner_handle2.allow(1); | ||
| crate::common::future::poll_fn(|cx| cached2.poll_ready(cx)) | ||
| .await | ||
| .expect("expected poll_ready"); | ||
| let cfut2 = cached2.call(()); | ||
| let ((), send_cached2) = inner_handle2.next_request().await.unwrap(); | ||
| send_cached2.send_response("svc2"); | ||
| cfut2.await.unwrap(); | ||
| } | ||
| #[tokio::test] | ||
| async fn cancel_waiter_does_not_affect_others() { | ||
| let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>(); | ||
| let mut singleton = Singleton::new(mock_svc); | ||
| crate::common::future::poll_fn(|cx| singleton.poll_ready(cx)) | ||
| .await | ||
| .unwrap(); | ||
| let fut1 = singleton.call(()); | ||
| let fut2 = singleton.call(()); | ||
| drop(fut2); // cancel one waiter | ||
| let ((), send_response) = handle.next_request().await.unwrap(); | ||
| send_response.send_response("svc"); | ||
| fut1.await.unwrap(); | ||
| } | ||
| // TODO: this should be able to be improved with a cooperative baton refactor | ||
| #[tokio::test] | ||
| async fn cancel_driver_cancels_all() { | ||
| let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>(); | ||
| let mut singleton = Singleton::new(mock_svc); | ||
| crate::common::future::poll_fn(|cx| singleton.poll_ready(cx)) | ||
| .await | ||
| .unwrap(); | ||
| let mut fut1 = singleton.call(()); | ||
| let fut2 = singleton.call(()); | ||
| // poll driver just once, and then drop | ||
| crate::common::future::poll_fn(move |cx| { | ||
| let _ = Pin::new(&mut fut1).poll(cx); | ||
| Poll::Ready(()) | ||
| }) | ||
| .await; | ||
| let ((), send_response) = handle.next_request().await.unwrap(); | ||
| send_response.send_response("svc"); | ||
| assert_eq!( | ||
| fut2.await.unwrap_err().0.to_string(), | ||
| "singleton connection canceled" | ||
| ); | ||
| } | ||
| } |
| { | ||
| "git": { | ||
| "sha1": "203c9563a0ed51666e1829a5be3fbb33d79a3ba2" | ||
| "sha1": "d5740116a55cbf7af13d1142b365c56b1d684f3a" | ||
| }, | ||
| "path_in_vcs": "" | ||
| } |
@@ -129,2 +129,2 @@ name: CI | ||
| - uses: dtolnay/rust-toolchain@nightly | ||
| - run: cargo rustdoc -- --cfg docsrs -D rustdoc::broken_intra_doc_links | ||
| - run: cargo rustdoc --features full -- --cfg docsrs -D rustdoc::broken_intra_doc_links |
+43
-1
@@ -248,3 +248,3 @@ # This file is automatically @generated by Cargo. | ||
| name = "hyper-util" | ||
| version = "0.1.18" | ||
| version = "0.1.19" | ||
| dependencies = [ | ||
@@ -270,3 +270,5 @@ "base64", | ||
| "tokio-test", | ||
| "tower-layer", | ||
| "tower-service", | ||
| "tower-test", | ||
| "tracing", | ||
@@ -366,2 +368,22 @@ "windows-registry", | ||
| [[package]] | ||
| name = "pin-project" | ||
| version = "1.1.10" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" | ||
| dependencies = [ | ||
| "pin-project-internal", | ||
| ] | ||
| [[package]] | ||
| name = "pin-project-internal" | ||
| version = "1.1.10" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" | ||
| dependencies = [ | ||
| "proc-macro2", | ||
| "quote", | ||
| "syn", | ||
| ] | ||
| [[package]] | ||
| name = "pin-project-lite" | ||
@@ -633,2 +655,8 @@ version = "0.2.16" | ||
| [[package]] | ||
| name = "tower-layer" | ||
| version = "0.3.3" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" | ||
| [[package]] | ||
| name = "tower-service" | ||
@@ -640,2 +668,16 @@ version = "0.3.3" | ||
| [[package]] | ||
| name = "tower-test" | ||
| version = "0.4.0" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "a4546773ffeab9e4ea02b8872faa49bb616a80a7da66afc2f32688943f97efa7" | ||
| dependencies = [ | ||
| "futures-util", | ||
| "pin-project", | ||
| "tokio", | ||
| "tokio-test", | ||
| "tower-layer", | ||
| "tower-service", | ||
| ] | ||
| [[package]] | ||
| name = "tracing" | ||
@@ -642,0 +684,0 @@ version = "0.1.41" |
+14
-1
@@ -16,3 +16,3 @@ # THIS FILE IS AUTOMATICALLY GENERATED BY CARGO | ||
| name = "hyper-util" | ||
| version = "0.1.18" | ||
| version = "0.1.19" | ||
| authors = ["Sean McArthur <sean@seanmonstar.com>"] | ||
@@ -65,2 +65,7 @@ build = false | ||
| ] | ||
| client-pool = [ | ||
| "client", | ||
| "dep:futures-util", | ||
| "dep:tower-layer", | ||
| ] | ||
| client-proxy = [ | ||
@@ -80,2 +85,3 @@ "client", | ||
| "client-legacy", | ||
| "client-pool", | ||
| "client-proxy", | ||
@@ -204,2 +210,6 @@ "client-proxy-system", | ||
| [dependencies.tower-layer] | ||
| version = "0.3" | ||
| optional = true | ||
| [dependencies.tower-service] | ||
@@ -244,2 +254,5 @@ version = "0.3" | ||
| [dev-dependencies.tower-test] | ||
| version = "0.4" | ||
| [target.'cfg(any(target_os = "linux", target_os = "macos"))'.dev-dependencies.pnet_datalink] | ||
@@ -246,0 +259,0 @@ version = "0.35.0" |
+8
-0
@@ -0,1 +1,9 @@ | ||
| # 0.1.19 (2025-12-03) | ||
| - Add `client::pool` module for composable pools. Enable with the `client-pool` feature. | ||
| - Add `pool::singleton` for sharing a single cloneable connection. | ||
| - Add `pool::cache` for caching a list of connections. | ||
| - Add `pool::negotiate` for combining two pools with upgrade and fallback negotiation. | ||
| - Add `pool::map` for customizable mapping of keys and connections. | ||
| # 0.1.18 (2025-11-13) | ||
@@ -2,0 +10,0 @@ |
@@ -21,3 +21,3 @@ mod errors; | ||
| /// This is a connector that can be used by the `legacy::Client`. It wraps | ||
| /// another connector, and after getting an underlying connection, it established | ||
| /// another connector, and after getting an underlying connection, it establishes | ||
| /// a TCP tunnel over it using SOCKSv4. | ||
@@ -24,0 +24,0 @@ #[derive(Debug, Clone)] |
@@ -21,3 +21,3 @@ mod errors; | ||
| /// This is a connector that can be used by the `legacy::Client`. It wraps | ||
| /// another connector, and after getting an underlying connection, it established | ||
| /// another connector, and after getting an underlying connection, it establishes | ||
| /// a TCP tunnel over it using SOCKSv5. | ||
@@ -68,4 +68,5 @@ #[derive(Debug, Clone)] | ||
| /// Username and Password must be maximum of 255 characters each. | ||
| /// 0 length strings are allowed despite RFC prohibiting it. This is done so that | ||
| /// for compatablity with server implementations that require it for IP authentication. | ||
| /// 0 length strings are allowed despite RFC prohibiting it. This is done for | ||
| /// compatablity with server implementations that use empty credentials | ||
| /// to allow returning error codes during IP authentication. | ||
| pub fn with_auth(mut self, user: String, pass: String) -> Self { | ||
@@ -87,6 +88,6 @@ self.config.proxy_auth = Some((user, pass)); | ||
| /// | ||
| /// Typical SOCKS handshake with auithentication takes 3 round trips. Optimistic sending | ||
| /// A typical SOCKS handshake with user/pass authentication takes 3 round trips Optimistic sending | ||
| /// can reduce round trip times and dramatically increase speed of handshake at the cost of | ||
| /// reduced portability; many server implementations do not support optimistic sending as it | ||
| /// is not defined in the RFC (RFC 1928). | ||
| /// is not defined in the RFC. | ||
| /// | ||
@@ -93,0 +94,0 @@ /// Recommended to ensure connector works correctly without optimistic sending before trying |
@@ -7,3 +7,6 @@ //! HTTP client utilities | ||
| #[cfg(feature = "client-pool")] | ||
| pub mod pool; | ||
| #[cfg(feature = "client-proxy")] | ||
| pub mod proxy; |
Sorry, the diff of this file is not supported yet