You're Invited:Meet the Socket Team at RSAC and BSidesSF 2026, March 23–26.RSVP
Socket
Book a DemoSign in
Socket

hyper-util

Package Overview
Dependencies
Maintainers
1
Versions
22
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

hyper-util - cargo Package Compare versions

Comparing version
0.1.18
to
0.1.19
+495
src/client/pool/cache.rs
//! A cache of services
//!
//! The cache is a single list of cached services, bundled with a `MakeService`.
//! Calling the cache returns either an existing service, or makes a new one.
//! The returned `impl Service` can be used to send requests, and when dropped,
//! it will try to be returned back to the cache.
pub use self::internal::builder;
#[cfg(docsrs)]
pub use self::internal::Builder;
#[cfg(docsrs)]
pub use self::internal::Cache;
#[cfg(docsrs)]
pub use self::internal::Cached;
// For now, nothing else in this module is nameable. We can always make things
// more public, but we can't change type shapes (generics) once things are
// public.
mod internal {
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex, Weak};
use std::task::{self, Poll};
use futures_core::ready;
use futures_util::future;
use tokio::sync::oneshot;
use tower_service::Service;
use super::events;
/// Start a builder to construct a `Cache` pool.
pub fn builder() -> Builder<events::Ignore> {
Builder {
events: events::Ignore,
}
}
/// A cache pool of services from the inner make service.
///
/// Created with [`builder()`].
///
/// # Unnameable
///
/// This type is normally unnameable, forbidding naming of the type within
/// code. The type is exposed in the documentation to show which methods
/// can be publicly called.
#[derive(Debug)]
pub struct Cache<M, Dst, Ev>
where
M: Service<Dst>,
{
connector: M,
shared: Arc<Mutex<Shared<M::Response>>>,
events: Ev,
}
/// A builder to configure a `Cache`.
///
/// # Unnameable
///
/// This type is normally unnameable, forbidding naming of the type within
/// code. The type is exposed in the documentation to show which methods
/// can be publicly called.
#[derive(Debug)]
pub struct Builder<Ev> {
events: Ev,
}
/// A cached service returned from a [`Cache`].
///
/// Implements `Service` by delegating to the inner service. Once dropped,
/// tries to reinsert into the `Cache`.
///
/// # Unnameable
///
/// This type is normally unnameable, forbidding naming of the type within
/// code. The type is exposed in the documentation to show which methods
/// can be publicly called.
pub struct Cached<S> {
is_closed: bool,
inner: Option<S>,
shared: Weak<Mutex<Shared<S>>>,
// todo: on_idle
}
pub enum CacheFuture<M, Dst, Ev>
where
M: Service<Dst>,
{
Racing {
shared: Arc<Mutex<Shared<M::Response>>>,
select: future::Select<oneshot::Receiver<M::Response>, M::Future>,
events: Ev,
},
Connecting {
// TODO: could be Weak even here...
shared: Arc<Mutex<Shared<M::Response>>>,
future: M::Future,
},
Cached {
svc: Option<Cached<M::Response>>,
},
}
// shouldn't be pub
#[derive(Debug)]
pub struct Shared<S> {
services: Vec<S>,
waiters: Vec<oneshot::Sender<S>>,
}
// impl Builder
impl<Ev> Builder<Ev> {
/// Provide a `Future` executor to be used by the `Cache`.
///
/// The executor is used handle some optional background tasks that
/// can improve the behavior of the cache, such as reducing connection
/// thrashing when a race is won. If not configured with an executor,
/// the default behavior is to ignore any of these optional background
/// tasks.
///
/// The executor should implmenent [`hyper::rt::Executor`].
///
/// # Example
///
/// ```rust
/// # #[cfg(feature = "tokio")]
/// # fn run() {
/// let builder = hyper_util::client::pool::cache::builder()
/// .executor(hyper_util::rt::TokioExecutor::new());
/// # }
/// ```
pub fn executor<E>(self, exec: E) -> Builder<events::WithExecutor<E>> {
Builder {
events: events::WithExecutor(exec),
}
}
/// Build a `Cache` pool around the `connector`.
pub fn build<M, Dst>(self, connector: M) -> Cache<M, Dst, Ev>
where
M: Service<Dst>,
{
Cache {
connector,
events: self.events,
shared: Arc::new(Mutex::new(Shared {
services: Vec::new(),
waiters: Vec::new(),
})),
}
}
}
// impl Cache
impl<M, Dst, Ev> Cache<M, Dst, Ev>
where
M: Service<Dst>,
{
/// Retain all cached services indicated by the predicate.
pub fn retain<F>(&mut self, predicate: F)
where
F: FnMut(&mut M::Response) -> bool,
{
self.shared.lock().unwrap().services.retain_mut(predicate);
}
/// Check whether this cache has no cached services.
pub fn is_empty(&self) -> bool {
self.shared.lock().unwrap().services.is_empty()
}
}
impl<M, Dst, Ev> Service<Dst> for Cache<M, Dst, Ev>
where
M: Service<Dst>,
M::Future: Unpin,
M::Response: Unpin,
Ev: events::Events<BackgroundConnect<M::Future, M::Response>> + Clone + Unpin,
{
type Response = Cached<M::Response>;
type Error = M::Error;
type Future = CacheFuture<M, Dst, Ev>;
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
if !self.shared.lock().unwrap().services.is_empty() {
Poll::Ready(Ok(()))
} else {
self.connector.poll_ready(cx)
}
}
fn call(&mut self, target: Dst) -> Self::Future {
// 1. If already cached, easy!
let waiter = {
let mut locked = self.shared.lock().unwrap();
if let Some(found) = locked.take() {
return CacheFuture::Cached {
svc: Some(Cached::new(found, Arc::downgrade(&self.shared))),
};
}
let (tx, rx) = oneshot::channel();
locked.waiters.push(tx);
rx
};
// 2. Otherwise, we start a new connect, and also listen for
// any newly idle.
CacheFuture::Racing {
shared: self.shared.clone(),
select: future::select(waiter, self.connector.call(target)),
events: self.events.clone(),
}
}
}
impl<M, Dst, Ev> Clone for Cache<M, Dst, Ev>
where
M: Service<Dst> + Clone,
Ev: Clone,
{
fn clone(&self) -> Self {
Self {
connector: self.connector.clone(),
events: self.events.clone(),
shared: self.shared.clone(),
}
}
}
impl<M, Dst, Ev> Future for CacheFuture<M, Dst, Ev>
where
M: Service<Dst>,
M::Future: Unpin,
M::Response: Unpin,
Ev: events::Events<BackgroundConnect<M::Future, M::Response>> + Unpin,
{
type Output = Result<Cached<M::Response>, M::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
loop {
match &mut *self.as_mut() {
CacheFuture::Racing {
shared,
select,
events,
} => {
match ready!(Pin::new(select).poll(cx)) {
future::Either::Left((Err(_pool_closed), connecting)) => {
// pool was dropped, so we'll never get it from a waiter,
// but if this future still exists, then the user still
// wants a connection. just wait for the connecting
*self = CacheFuture::Connecting {
shared: shared.clone(),
future: connecting,
};
}
future::Either::Left((Ok(pool_got), connecting)) => {
events.on_race_lost(BackgroundConnect {
future: connecting,
shared: Arc::downgrade(&shared),
});
return Poll::Ready(Ok(Cached::new(
pool_got,
Arc::downgrade(&shared),
)));
}
future::Either::Right((connected, _waiter)) => {
let inner = connected?;
return Poll::Ready(Ok(Cached::new(
inner,
Arc::downgrade(&shared),
)));
}
}
}
CacheFuture::Connecting { shared, future } => {
let inner = ready!(Pin::new(future).poll(cx))?;
return Poll::Ready(Ok(Cached::new(inner, Arc::downgrade(&shared))));
}
CacheFuture::Cached { svc } => {
return Poll::Ready(Ok(svc.take().unwrap()));
}
}
}
}
}
// impl Cached
impl<S> Cached<S> {
fn new(inner: S, shared: Weak<Mutex<Shared<S>>>) -> Self {
Cached {
is_closed: false,
inner: Some(inner),
shared,
}
}
// TODO: inner()? looks like `tower` likes `get_ref()` and `get_mut()`.
/// Get a reference to the inner service.
pub fn inner(&self) -> &S {
self.inner.as_ref().expect("inner only taken in drop")
}
/// Get a mutable reference to the inner service.
pub fn inner_mut(&mut self) -> &mut S {
self.inner.as_mut().expect("inner only taken in drop")
}
}
impl<S, Req> Service<Req> for Cached<S>
where
S: Service<Req>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.as_mut().unwrap().poll_ready(cx).map_err(|err| {
self.is_closed = true;
err
})
}
fn call(&mut self, req: Req) -> Self::Future {
self.inner.as_mut().unwrap().call(req)
}
}
impl<S> Drop for Cached<S> {
fn drop(&mut self) {
if self.is_closed {
return;
}
if let Some(value) = self.inner.take() {
if let Some(shared) = self.shared.upgrade() {
if let Ok(mut shared) = shared.lock() {
shared.put(value);
}
}
}
}
}
impl<S: fmt::Debug> fmt::Debug for Cached<S> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("Cached")
.field(self.inner.as_ref().unwrap())
.finish()
}
}
// impl Shared
impl<V> Shared<V> {
fn put(&mut self, val: V) {
let mut val = Some(val);
while let Some(tx) = self.waiters.pop() {
if !tx.is_closed() {
match tx.send(val.take().unwrap()) {
Ok(()) => break,
Err(v) => {
val = Some(v);
}
}
}
}
if let Some(val) = val {
self.services.push(val);
}
}
fn take(&mut self) -> Option<V> {
// TODO: take in a loop
self.services.pop()
}
}
pub struct BackgroundConnect<CF, S> {
future: CF,
shared: Weak<Mutex<Shared<S>>>,
}
impl<CF, S, E> Future for BackgroundConnect<CF, S>
where
CF: Future<Output = Result<S, E>> + Unpin,
{
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
match ready!(Pin::new(&mut self.future).poll(cx)) {
Ok(svc) => {
if let Some(shared) = self.shared.upgrade() {
if let Ok(mut locked) = shared.lock() {
locked.put(svc);
}
}
Poll::Ready(())
}
Err(_e) => Poll::Ready(()),
}
}
}
}
mod events {
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct Ignore;
#[derive(Clone, Debug)]
pub struct WithExecutor<E>(pub(super) E);
pub trait Events<CF> {
fn on_race_lost(&self, fut: CF);
}
impl<CF> Events<CF> for Ignore {
fn on_race_lost(&self, _fut: CF) {}
}
impl<E, CF> Events<CF> for WithExecutor<E>
where
E: hyper::rt::Executor<CF>,
{
fn on_race_lost(&self, fut: CF) {
self.0.execute(fut);
}
}
}
#[cfg(test)]
mod tests {
use futures_util::future;
use tower_service::Service;
use tower_test::assert_request_eq;
#[tokio::test]
async fn test_makes_svc_when_empty() {
let (mock, mut handle) = tower_test::mock::pair();
let mut cache = super::builder().build(mock);
handle.allow(1);
crate::common::future::poll_fn(|cx| cache.poll_ready(cx))
.await
.unwrap();
let f = cache.call(1);
future::join(f, async move {
assert_request_eq!(handle, 1).send_response("one");
})
.await
.0
.expect("call");
}
#[tokio::test]
async fn test_reuses_after_idle() {
let (mock, mut handle) = tower_test::mock::pair();
let mut cache = super::builder().build(mock);
// only 1 connection should ever be made
handle.allow(1);
crate::common::future::poll_fn(|cx| cache.poll_ready(cx))
.await
.unwrap();
let f = cache.call(1);
let cached = future::join(f, async {
assert_request_eq!(handle, 1).send_response("one");
})
.await
.0
.expect("call");
drop(cached);
crate::common::future::poll_fn(|cx| cache.poll_ready(cx))
.await
.unwrap();
let f = cache.call(1);
let cached = f.await.expect("call");
drop(cached);
}
}
//! Map pool utilities
//!
//! The map isn't a typical `Service`, but rather stand-alone type that can map
//! requests to a key and service factory. This is because the service is more
//! of a router, and cannot determine which inner service to check for
//! backpressure since it's not know until the request is made.
//!
//! The map implementation allows customization of extracting a key, and how to
//! construct a MakeService for that key.
//!
//! # Example
//!
//! ```rust,ignore
//! # async fn run() {
//! # use hyper_util::client::pool;
//! # let req = http::Request::new(());
//! # let some_http1_connector = || {
//! # tower::service::service_fn(|_req| async { Ok::<_, &'static str>(()) })
//! # };
//! let mut map = pool::map::Map::builder()
//! .keys(|uri| (uri.scheme().clone(), uri.authority().clone()))
//! .values(|_uri| {
//! some_http1_connector()
//! })
//! .build();
//!
//! let resp = map.service(req.uri()).call(req).await;
//! # }
//! ```
use std::collections::HashMap;
// expose the documentation
#[cfg(docsrs)]
pub use self::builder::Builder;
/// A map caching `MakeService`s per key.
///
/// Create one with the [`Map::builder()`].
pub struct Map<T, Req>
where
T: target::Target<Req>,
{
map: HashMap<T::Key, T::Service>,
targeter: T,
}
// impl Map
impl Map<builder::StartHere, builder::StartHere> {
/// Create a [`Builder`] to configure a new `Map`.
pub fn builder<Dst>() -> builder::Builder<Dst, builder::WantsKeyer, builder::WantsServiceMaker>
{
builder::Builder::new()
}
}
impl<T, Req> Map<T, Req>
where
T: target::Target<Req>,
{
fn new(targeter: T) -> Self {
Map {
map: HashMap::new(),
targeter,
}
}
}
impl<T, Req> Map<T, Req>
where
T: target::Target<Req>,
T::Key: Eq + std::hash::Hash,
{
/// Get a service after extracting the key from `req`.
pub fn service(&mut self, req: &Req) -> &mut T::Service {
let key = self.targeter.key(req);
self.map
.entry(key)
.or_insert_with(|| self.targeter.service(req))
}
/// Retains only the services specified by the predicate.
pub fn retain<F>(&mut self, predicate: F)
where
F: FnMut(&T::Key, &mut T::Service) -> bool,
{
self.map.retain(predicate);
}
/// Clears the map, removing all key-value pairs.
pub fn clear(&mut self) {
self.map.clear();
}
}
// sealed and unnameable for now
mod target {
pub trait Target<Dst> {
type Key;
type Service;
fn key(&self, dst: &Dst) -> Self::Key;
fn service(&self, dst: &Dst) -> Self::Service;
}
}
// sealed and unnameable for now
mod builder {
use std::marker::PhantomData;
/// A builder to configure a `Map`.
///
/// # Unnameable
///
/// This type is normally unnameable, forbidding naming of the type within
/// code. The type is exposed in the documentation to show which methods
/// can be publicly called.
pub struct Builder<Dst, K, S> {
_dst: PhantomData<fn(Dst)>,
keys: K,
svcs: S,
}
pub struct WantsKeyer;
pub struct WantsServiceMaker;
pub enum StartHere {}
pub struct Built<K, S> {
keys: K,
svcs: S,
}
impl<Dst> Builder<Dst, WantsKeyer, WantsServiceMaker> {
pub(super) fn new() -> Self {
Builder {
_dst: PhantomData,
keys: WantsKeyer,
svcs: WantsServiceMaker,
}
}
}
impl<Dst, S> Builder<Dst, WantsKeyer, S> {
/// Provide a closure that extracts a pool key for the destination.
pub fn keys<K, KK>(self, keyer: K) -> Builder<Dst, K, S>
where
K: Fn(&Dst) -> KK,
{
Builder {
_dst: PhantomData,
keys: keyer,
svcs: self.svcs,
}
}
}
impl<Dst, K> Builder<Dst, K, WantsServiceMaker> {
/// Provide a closure to create a new `MakeService` for the destination.
pub fn values<S, SS>(self, svcs: S) -> Builder<Dst, K, S>
where
S: Fn(&Dst) -> SS,
{
Builder {
_dst: PhantomData,
keys: self.keys,
svcs,
}
}
}
impl<Dst, K, S> Builder<Dst, K, S>
where
Built<K, S>: super::target::Target<Dst>,
<Built<K, S> as super::target::Target<Dst>>::Key: Eq + std::hash::Hash,
{
/// Build the `Map` pool.
pub fn build(self) -> super::Map<Built<K, S>, Dst> {
super::Map::new(Built {
keys: self.keys,
svcs: self.svcs,
})
}
}
impl super::target::Target<StartHere> for StartHere {
type Key = StartHere;
type Service = StartHere;
fn key(&self, _: &StartHere) -> Self::Key {
match *self {}
}
fn service(&self, _: &StartHere) -> Self::Service {
match *self {}
}
}
impl<K, KK, S, SS, Dst> super::target::Target<Dst> for Built<K, S>
where
K: Fn(&Dst) -> KK,
S: Fn(&Dst) -> SS,
KK: Eq + std::hash::Hash,
{
type Key = KK;
type Service = SS;
fn key(&self, dst: &Dst) -> Self::Key {
(self.keys)(dst)
}
fn service(&self, dst: &Dst) -> Self::Service {
(self.svcs)(dst)
}
}
}
#[cfg(test)]
mod tests {
#[test]
fn smoke() {
let mut pool = super::Map::builder().keys(|_| "a").values(|_| "b").build();
pool.service(&"hello");
}
}
//! Composable pool services
//!
//! This module contains various concepts of a connection pool separated into
//! their own concerns. This allows for users to compose the layers, along with
//! any other layers, when constructing custom connection pools.
pub mod cache;
pub mod map;
pub mod negotiate;
pub mod singleton;
//! Negotiate a pool of services
//!
//! The negotiate pool allows for a service that can decide between two service
//! types based on an intermediate return value. It differs from typical
//! routing since it doesn't depend on the request, but the response.
//!
//! The original use case is support ALPN upgrades to HTTP/2, with a fallback
//! to HTTP/1.
//!
//! # Example
//!
//! ```rust,ignore
//! # async fn run() -> Result<(), Box<dyn std::error::Error>> {
//! # struct Conn;
//! # impl Conn { fn negotiated_protocol(&self) -> &[u8] { b"h2" } }
//! # let some_tls_connector = tower::service::service_fn(|_| async move {
//! # Ok::<_, std::convert::Infallible>(Conn)
//! # });
//! # let http1_layer = tower::layer::layer_fn(|s| s);
//! # let http2_layer = tower::layer::layer_fn(|s| s);
//! let mut pool = hyper_util::client::pool::negotiate::builder()
//! .connect(some_tls_connector)
//! .inspect(|c| c.negotiated_protocol() == b"h2")
//! .fallback(http1_layer)
//! .upgrade(http2_layer)
//! .build();
//!
//! // connect
//! let mut svc = pool.call(http::Uri::from_static("https://hyper.rs")).await?;
//! svc.ready().await;
//!
//! // http1 or http2 is now set up
//! # let some_http_req = http::Request::new(());
//! let resp = svc.call(some_http_req).await?;
//! # Ok(())
//! # }
//! ```
pub use self::internal::builder;
#[cfg(docsrs)]
pub use self::internal::Builder;
#[cfg(docsrs)]
pub use self::internal::Negotiate;
#[cfg(docsrs)]
pub use self::internal::Negotiated;
mod internal {
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{self, Poll};
use futures_core::ready;
use pin_project_lite::pin_project;
use tower_layer::Layer;
use tower_service::Service;
type BoxError = Box<dyn std::error::Error + Send + Sync>;
/// A negotiating pool over an inner make service.
///
/// Created with [`builder()`].
///
/// # Unnameable
///
/// This type is normally unnameable, forbidding naming of the type within
/// code. The type is exposed in the documentation to show which methods
/// can be publicly called.
#[derive(Clone)]
pub struct Negotiate<L, R> {
left: L,
right: R,
}
/// A negotiated service returned by [`Negotiate`].
///
/// # Unnameable
///
/// This type is normally unnameable, forbidding naming of the type within
/// code. The type is exposed in the documentation to show which methods
/// can be publicly called.
#[derive(Clone, Debug)]
pub enum Negotiated<L, R> {
#[doc(hidden)]
Fallback(L),
#[doc(hidden)]
Upgraded(R),
}
pin_project! {
pub struct Negotiating<Dst, L, R>
where
L: Service<Dst>,
R: Service<()>,
{
#[pin]
state: State<Dst, L::Future, R::Future>,
left: L,
right: R,
}
}
pin_project! {
#[project = StateProj]
enum State<Dst, FL, FR> {
Eager {
#[pin]
future: FR,
dst: Option<Dst>,
},
Fallback {
#[pin]
future: FL,
},
Upgrade {
#[pin]
future: FR,
}
}
}
pin_project! {
#[project = NegotiatedProj]
pub enum NegotiatedFuture<L, R> {
Fallback {
#[pin]
future: L
},
Upgraded {
#[pin]
future: R
},
}
}
/// A builder to configure a `Negotiate`.
///
/// # Unnameable
///
/// This type is normally unnameable, forbidding naming of the type within
/// code. The type is exposed in the documentation to show which methods
/// can be publicly called.
#[derive(Debug)]
pub struct Builder<C, I, L, R> {
connect: C,
inspect: I,
fallback: L,
upgrade: R,
}
#[derive(Debug)]
pub struct WantsConnect;
#[derive(Debug)]
pub struct WantsInspect;
#[derive(Debug)]
pub struct WantsFallback;
#[derive(Debug)]
pub struct WantsUpgrade;
/// Start a builder to construct a `Negotiate` pool.
pub fn builder() -> Builder<WantsConnect, WantsInspect, WantsFallback, WantsUpgrade> {
Builder {
connect: WantsConnect,
inspect: WantsInspect,
fallback: WantsFallback,
upgrade: WantsUpgrade,
}
}
impl<C, I, L, R> Builder<C, I, L, R> {
/// Provide the initial connector.
pub fn connect<CC>(self, connect: CC) -> Builder<CC, I, L, R> {
Builder {
connect,
inspect: self.inspect,
fallback: self.fallback,
upgrade: self.upgrade,
}
}
/// Provide the inspector that determines the result of the negotiation.
pub fn inspect<II>(self, inspect: II) -> Builder<C, II, L, R> {
Builder {
connect: self.connect,
inspect,
fallback: self.fallback,
upgrade: self.upgrade,
}
}
/// Provide the layer to fallback to if negotiation fails.
pub fn fallback<LL>(self, fallback: LL) -> Builder<C, I, LL, R> {
Builder {
connect: self.connect,
inspect: self.inspect,
fallback,
upgrade: self.upgrade,
}
}
/// Provide the layer to upgrade to if negotiation succeeds.
pub fn upgrade<RR>(self, upgrade: RR) -> Builder<C, I, L, RR> {
Builder {
connect: self.connect,
inspect: self.inspect,
fallback: self.fallback,
upgrade,
}
}
/// Build the `Negotiate` pool.
pub fn build<Dst>(self) -> Negotiate<L::Service, R::Service>
where
C: Service<Dst>,
C::Error: Into<BoxError>,
L: Layer<Inspector<C, C::Response, I>>,
L::Service: Service<Dst> + Clone,
<L::Service as Service<Dst>>::Error: Into<BoxError>,
R: Layer<Inspected<C::Response>>,
R::Service: Service<()> + Clone,
<R::Service as Service<()>>::Error: Into<BoxError>,
I: Fn(&C::Response) -> bool + Clone,
{
let Builder {
connect,
inspect,
fallback,
upgrade,
} = self;
let slot = Arc::new(Mutex::new(None));
let wrapped = Inspector {
svc: connect,
inspect,
slot: slot.clone(),
};
let left = fallback.layer(wrapped);
let right = upgrade.layer(Inspected { slot });
Negotiate { left, right }
}
}
impl<L, R> Negotiate<L, R> {
/// Get a mutable reference to the fallback service.
pub fn fallback_mut(&mut self) -> &mut L {
&mut self.left
}
/// Get a mutable reference to the upgrade service.
pub fn upgrade_mut(&mut self) -> &mut R {
&mut self.right
}
}
impl<L, R, Target> Service<Target> for Negotiate<L, R>
where
L: Service<Target> + Clone,
L::Error: Into<BoxError>,
R: Service<()> + Clone,
R::Error: Into<BoxError>,
{
type Response = Negotiated<L::Response, R::Response>;
type Error = BoxError;
type Future = Negotiating<Target, L, R>;
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.left.poll_ready(cx).map_err(Into::into)
}
fn call(&mut self, dst: Target) -> Self::Future {
let left = self.left.clone();
Negotiating {
state: State::Eager {
future: self.right.call(()),
dst: Some(dst),
},
// place clone, take original that we already polled-ready.
left: std::mem::replace(&mut self.left, left),
right: self.right.clone(),
}
}
}
impl<Dst, L, R> Future for Negotiating<Dst, L, R>
where
L: Service<Dst>,
L::Error: Into<BoxError>,
R: Service<()>,
R::Error: Into<BoxError>,
{
type Output = Result<Negotiated<L::Response, R::Response>, BoxError>;
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
// States:
// - `Eager`: try the "right" path first; on `UseOther` sentinel, fall back to left.
// - `Fallback`: try the left path; on `UseOther` sentinel, upgrade back to right.
// - `Upgrade`: retry the right path after a fallback.
// If all fail, give up.
let mut me = self.project();
loop {
match me.state.as_mut().project() {
StateProj::Eager { future, dst } => match ready!(future.poll(cx)) {
Ok(out) => return Poll::Ready(Ok(Negotiated::Upgraded(out))),
Err(err) => {
let err = err.into();
if UseOther::is(&*err) {
let dst = dst.take().unwrap();
let f = me.left.call(dst);
me.state.set(State::Fallback { future: f });
continue;
} else {
return Poll::Ready(Err(err));
}
}
},
StateProj::Fallback { future } => match ready!(future.poll(cx)) {
Ok(out) => return Poll::Ready(Ok(Negotiated::Fallback(out))),
Err(err) => {
let err = err.into();
if UseOther::is(&*err) {
let f = me.right.call(());
me.state.set(State::Upgrade { future: f });
continue;
} else {
return Poll::Ready(Err(err));
}
}
},
StateProj::Upgrade { future } => match ready!(future.poll(cx)) {
Ok(out) => return Poll::Ready(Ok(Negotiated::Upgraded(out))),
Err(err) => return Poll::Ready(Err(err.into())),
},
}
}
}
}
impl<L, R> Negotiated<L, R> {
// Could be useful?
#[cfg(test)]
pub(super) fn is_fallback(&self) -> bool {
matches!(self, Negotiated::Fallback(_))
}
#[cfg(test)]
pub(super) fn is_upgraded(&self) -> bool {
matches!(self, Negotiated::Upgraded(_))
}
// TODO: are these the correct methods? Or .as_ref().fallback(), etc?
/// Get a reference to the fallback service if this is it.
pub fn fallback_ref(&self) -> Option<&L> {
if let Negotiated::Fallback(ref left) = self {
Some(left)
} else {
None
}
}
/// Get a mutable reference to the fallback service if this is it.
pub fn fallback_mut(&mut self) -> Option<&mut L> {
if let Negotiated::Fallback(ref mut left) = self {
Some(left)
} else {
None
}
}
/// Get a reference to the upgraded service if this is it.
pub fn upgraded_ref(&self) -> Option<&R> {
if let Negotiated::Upgraded(ref right) = self {
Some(right)
} else {
None
}
}
/// Get a mutable reference to the upgraded service if this is it.
pub fn upgraded_mut(&mut self) -> Option<&mut R> {
if let Negotiated::Upgraded(ref mut right) = self {
Some(right)
} else {
None
}
}
}
impl<L, R, Req, Res, E> Service<Req> for Negotiated<L, R>
where
L: Service<Req, Response = Res, Error = E>,
R: Service<Req, Response = Res, Error = E>,
{
type Response = Res;
type Error = E;
type Future = NegotiatedFuture<L::Future, R::Future>;
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
match self {
Negotiated::Fallback(ref mut s) => s.poll_ready(cx),
Negotiated::Upgraded(ref mut s) => s.poll_ready(cx),
}
}
fn call(&mut self, req: Req) -> Self::Future {
match self {
Negotiated::Fallback(ref mut s) => NegotiatedFuture::Fallback {
future: s.call(req),
},
Negotiated::Upgraded(ref mut s) => NegotiatedFuture::Upgraded {
future: s.call(req),
},
}
}
}
impl<L, R, Out> Future for NegotiatedFuture<L, R>
where
L: Future<Output = Out>,
R: Future<Output = Out>,
{
type Output = Out;
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
match self.project() {
NegotiatedProj::Fallback { future } => future.poll(cx),
NegotiatedProj::Upgraded { future } => future.poll(cx),
}
}
}
// ===== internal =====
pub struct Inspector<M, S, I> {
svc: M,
inspect: I,
slot: Arc<Mutex<Option<S>>>,
}
pin_project! {
pub struct InspectFuture<F, S, I> {
#[pin]
future: F,
inspect: I,
slot: Arc<Mutex<Option<S>>>,
}
}
impl<M: Clone, S, I: Clone> Clone for Inspector<M, S, I> {
fn clone(&self) -> Self {
Self {
svc: self.svc.clone(),
inspect: self.inspect.clone(),
slot: self.slot.clone(),
}
}
}
impl<M, S, I, Target> Service<Target> for Inspector<M, S, I>
where
M: Service<Target, Response = S>,
M::Error: Into<BoxError>,
I: Clone + Fn(&S) -> bool,
{
type Response = M::Response;
type Error = BoxError;
type Future = InspectFuture<M::Future, S, I>;
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.svc.poll_ready(cx).map_err(Into::into)
}
fn call(&mut self, dst: Target) -> Self::Future {
InspectFuture {
future: self.svc.call(dst),
inspect: self.inspect.clone(),
slot: self.slot.clone(),
}
}
}
impl<F, I, S, E> Future for InspectFuture<F, S, I>
where
F: Future<Output = Result<S, E>>,
E: Into<BoxError>,
I: Fn(&S) -> bool,
{
type Output = Result<S, BoxError>;
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
let me = self.project();
let s = ready!(me.future.poll(cx)).map_err(Into::into)?;
Poll::Ready(if (me.inspect)(&s) {
*me.slot.lock().unwrap() = Some(s);
Err(UseOther.into())
} else {
Ok(s)
})
}
}
pub struct Inspected<S> {
slot: Arc<Mutex<Option<S>>>,
}
impl<S, Target> Service<Target> for Inspected<S> {
type Response = S;
type Error = BoxError;
type Future = std::future::Ready<Result<S, BoxError>>;
fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.slot.lock().unwrap().is_some() {
Poll::Ready(Ok(()))
} else {
Poll::Ready(Err(UseOther.into()))
}
}
fn call(&mut self, _dst: Target) -> Self::Future {
let s = self
.slot
.lock()
.unwrap()
.take()
.ok_or_else(|| UseOther.into());
std::future::ready(s)
}
}
impl<S> Clone for Inspected<S> {
fn clone(&self) -> Inspected<S> {
Inspected {
slot: self.slot.clone(),
}
}
}
#[derive(Debug)]
struct UseOther;
impl std::fmt::Display for UseOther {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("sentinel error; using other")
}
}
impl std::error::Error for UseOther {}
impl UseOther {
fn is(err: &(dyn std::error::Error + 'static)) -> bool {
let mut source = Some(err);
while let Some(err) = source {
if err.is::<UseOther>() {
return true;
}
source = err.source();
}
false
}
}
}
#[cfg(test)]
mod tests {
use futures_util::future;
use tower_service::Service;
use tower_test::assert_request_eq;
#[tokio::test]
async fn not_negotiated_falls_back_to_left() {
let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>();
let mut negotiate = super::builder()
.connect(mock_svc)
.inspect(|_: &&str| false)
.fallback(layer_fn(|s| s))
.upgrade(layer_fn(|s| s))
.build();
crate::common::future::poll_fn(|cx| negotiate.poll_ready(cx))
.await
.unwrap();
let fut = negotiate.call(());
let nsvc = future::join(fut, async move {
assert_request_eq!(handle, ()).send_response("one");
})
.await
.0
.expect("call");
assert!(nsvc.is_fallback());
}
#[tokio::test]
async fn negotiated_uses_right() {
let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>();
let mut negotiate = super::builder()
.connect(mock_svc)
.inspect(|_: &&str| true)
.fallback(layer_fn(|s| s))
.upgrade(layer_fn(|s| s))
.build();
crate::common::future::poll_fn(|cx| negotiate.poll_ready(cx))
.await
.unwrap();
let fut = negotiate.call(());
let nsvc = future::join(fut, async move {
assert_request_eq!(handle, ()).send_response("one");
})
.await
.0
.expect("call");
assert!(nsvc.is_upgraded());
}
fn layer_fn<F>(f: F) -> LayerFn<F> {
LayerFn(f)
}
#[derive(Clone)]
struct LayerFn<F>(F);
impl<F, S, Out> tower_layer::Layer<S> for LayerFn<F>
where
F: Fn(S) -> Out,
{
type Service = Out;
fn layer(&self, inner: S) -> Self::Service {
(self.0)(inner)
}
}
}
//! Singleton pools
//!
//! This ensures that only one active connection is made.
//!
//! The singleton pool wraps a `MakeService<T, Req>` so that it only produces a
//! single `Service<Req>`. It bundles all concurrent calls to it, so that only
//! one connection is made. All calls to the singleton will return a clone of
//! the inner service once established.
//!
//! This fits the HTTP/2 case well.
//!
//! ## Example
//!
//! ```rust,ignore
//! let mut pool = Singleton::new(some_make_svc);
//!
//! let svc1 = pool.call(some_dst).await?;
//!
//! let svc2 = pool.call(some_dst).await?;
//! // svc1 == svc2
//! ```
use std::sync::{Arc, Mutex};
use std::task::{self, Poll};
use tokio::sync::oneshot;
use tower_service::Service;
use self::internal::{DitchGuard, SingletonError, SingletonFuture, State};
type BoxError = Box<dyn std::error::Error + Send + Sync>;
#[cfg(docsrs)]
pub use self::internal::Singled;
/// A singleton pool over an inner service.
///
/// The singleton wraps an inner service maker, bundling all calls to ensure
/// only one service is created. Once made, it returns clones of the made
/// service.
#[derive(Debug)]
pub struct Singleton<M, Dst>
where
M: Service<Dst>,
{
mk_svc: M,
state: Arc<Mutex<State<M::Response>>>,
}
impl<M, Target> Singleton<M, Target>
where
M: Service<Target>,
M::Response: Clone,
{
/// Create a new singleton pool over an inner make service.
pub fn new(mk_svc: M) -> Self {
Singleton {
mk_svc,
state: Arc::new(Mutex::new(State::Empty)),
}
}
// pub fn clear? cancel?
/// Retains the inner made service if specified by the predicate.
pub fn retain<F>(&mut self, mut predicate: F)
where
F: FnMut(&mut M::Response) -> bool,
{
let mut locked = self.state.lock().unwrap();
match *locked {
State::Empty => {}
State::Making(..) => {}
State::Made(ref mut svc) => {
if !predicate(svc) {
*locked = State::Empty;
}
}
}
}
/// Returns whether this singleton pool is empty.
///
/// If this pool has created a shared instance, or is currently in the
/// process of creating one, this returns false.
pub fn is_empty(&self) -> bool {
matches!(*self.state.lock().unwrap(), State::Empty)
}
}
impl<M, Target> Service<Target> for Singleton<M, Target>
where
M: Service<Target>,
M::Response: Clone,
M::Error: Into<BoxError>,
{
type Response = internal::Singled<M::Response>;
type Error = SingletonError;
type Future = SingletonFuture<M::Future, M::Response>;
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
if let State::Empty = *self.state.lock().unwrap() {
return self
.mk_svc
.poll_ready(cx)
.map_err(|e| SingletonError(e.into()));
}
Poll::Ready(Ok(()))
}
fn call(&mut self, dst: Target) -> Self::Future {
let mut locked = self.state.lock().unwrap();
match *locked {
State::Empty => {
let fut = self.mk_svc.call(dst);
*locked = State::Making(Vec::new());
SingletonFuture::Driving {
future: fut,
singleton: DitchGuard(Arc::downgrade(&self.state)),
}
}
State::Making(ref mut waiters) => {
let (tx, rx) = oneshot::channel();
waiters.push(tx);
SingletonFuture::Waiting {
rx,
state: Arc::downgrade(&self.state),
}
}
State::Made(ref svc) => SingletonFuture::Made {
svc: Some(svc.clone()),
state: Arc::downgrade(&self.state),
},
}
}
}
impl<M, Target> Clone for Singleton<M, Target>
where
M: Service<Target> + Clone,
{
fn clone(&self) -> Self {
Self {
mk_svc: self.mk_svc.clone(),
state: self.state.clone(),
}
}
}
// Holds some "pub" items that otherwise shouldn't be public.
mod internal {
use std::future::Future;
use std::pin::Pin;
use std::sync::{Mutex, Weak};
use std::task::{self, Poll};
use futures_core::ready;
use pin_project_lite::pin_project;
use tokio::sync::oneshot;
use tower_service::Service;
use super::BoxError;
pin_project! {
#[project = SingletonFutureProj]
pub enum SingletonFuture<F, S> {
Driving {
#[pin]
future: F,
singleton: DitchGuard<S>,
},
Waiting {
rx: oneshot::Receiver<S>,
state: Weak<Mutex<State<S>>>,
},
Made {
svc: Option<S>,
state: Weak<Mutex<State<S>>>,
},
}
}
// XXX: pub because of the enum SingletonFuture
#[derive(Debug)]
pub enum State<S> {
Empty,
Making(Vec<oneshot::Sender<S>>),
Made(S),
}
// XXX: pub because of the enum SingletonFuture
pub struct DitchGuard<S>(pub(super) Weak<Mutex<State<S>>>);
/// A cached service returned from a [`Singleton`].
///
/// Implements `Service` by delegating to the inner service. If
/// `poll_ready` returns an error, this will clear the cache in the related
/// `Singleton`.
///
/// [`Singleton`]: super::Singleton
///
/// # Unnameable
///
/// This type is normally unnameable, forbidding naming of the type within
/// code. The type is exposed in the documentation to show which methods
/// can be publicly called.
#[derive(Debug)]
pub struct Singled<S> {
inner: S,
state: Weak<Mutex<State<S>>>,
}
impl<F, S, E> Future for SingletonFuture<F, S>
where
F: Future<Output = Result<S, E>>,
E: Into<BoxError>,
S: Clone,
{
type Output = Result<Singled<S>, SingletonError>;
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
match self.project() {
SingletonFutureProj::Driving { future, singleton } => {
match ready!(future.poll(cx)) {
Ok(svc) => {
if let Some(state) = singleton.0.upgrade() {
let mut locked = state.lock().unwrap();
match std::mem::replace(&mut *locked, State::Made(svc.clone())) {
State::Making(waiters) => {
for tx in waiters {
let _ = tx.send(svc.clone());
}
}
State::Empty | State::Made(_) => {
// shouldn't happen!
unreachable!()
}
}
}
// take out of the DitchGuard so it doesn't treat as "ditched"
let state = std::mem::replace(&mut singleton.0, Weak::new());
Poll::Ready(Ok(Singled::new(svc, state)))
}
Err(e) => {
if let Some(state) = singleton.0.upgrade() {
let mut locked = state.lock().unwrap();
singleton.0 = Weak::new();
*locked = State::Empty;
}
Poll::Ready(Err(SingletonError(e.into())))
}
}
}
SingletonFutureProj::Waiting { rx, state } => match ready!(Pin::new(rx).poll(cx)) {
Ok(svc) => Poll::Ready(Ok(Singled::new(svc, state.clone()))),
Err(_canceled) => Poll::Ready(Err(SingletonError(Canceled.into()))),
},
SingletonFutureProj::Made { svc, state } => {
Poll::Ready(Ok(Singled::new(svc.take().unwrap(), state.clone())))
}
}
}
}
impl<S> Drop for DitchGuard<S> {
fn drop(&mut self) {
if let Some(state) = self.0.upgrade() {
if let Ok(mut locked) = state.lock() {
*locked = State::Empty;
}
}
}
}
impl<S> Singled<S> {
fn new(inner: S, state: Weak<Mutex<State<S>>>) -> Self {
Singled { inner, state }
}
}
impl<S, Req> Service<Req> for Singled<S>
where
S: Service<Req>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
// We notice if the cached service dies, and clear the singleton cache.
match self.inner.poll_ready(cx) {
Poll::Ready(Err(err)) => {
if let Some(state) = self.state.upgrade() {
*state.lock().unwrap() = State::Empty;
}
Poll::Ready(Err(err))
}
other => other,
}
}
fn call(&mut self, req: Req) -> Self::Future {
self.inner.call(req)
}
}
// An opaque error type. By not exposing the type, nor being specifically
// Box<dyn Error>, we can _change_ the type once we no longer need the Canceled
// error type. This will be possible with the refactor to baton passing.
#[derive(Debug)]
pub struct SingletonError(pub(super) BoxError);
impl std::fmt::Display for SingletonError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("singleton connection error")
}
}
impl std::error::Error for SingletonError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Some(&*self.0)
}
}
#[derive(Debug)]
struct Canceled;
impl std::fmt::Display for Canceled {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("singleton connection canceled")
}
}
impl std::error::Error for Canceled {}
}
#[cfg(test)]
mod tests {
use std::future::Future;
use std::pin::Pin;
use std::task::Poll;
use tower_service::Service;
use super::Singleton;
#[tokio::test]
async fn first_call_drives_subsequent_wait() {
let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>();
let mut singleton = Singleton::new(mock_svc);
handle.allow(1);
crate::common::future::poll_fn(|cx| singleton.poll_ready(cx))
.await
.unwrap();
// First call: should go into Driving
let fut1 = singleton.call(());
// Second call: should go into Waiting
let fut2 = singleton.call(());
// Expect exactly one request to the inner service
let ((), send_response) = handle.next_request().await.unwrap();
send_response.send_response("svc");
// Both futures should resolve to the same value
fut1.await.unwrap();
fut2.await.unwrap();
}
#[tokio::test]
async fn made_state_returns_immediately() {
let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>();
let mut singleton = Singleton::new(mock_svc);
handle.allow(1);
crate::common::future::poll_fn(|cx| singleton.poll_ready(cx))
.await
.unwrap();
// Drive first call to completion
let fut1 = singleton.call(());
let ((), send_response) = handle.next_request().await.unwrap();
send_response.send_response("svc");
fut1.await.unwrap();
// Second call should not hit inner service
singleton.call(()).await.unwrap();
}
#[tokio::test]
async fn cached_service_poll_ready_error_clears_singleton() {
// Outer mock returns an inner mock service
let (outer, mut outer_handle) =
tower_test::mock::pair::<(), tower_test::mock::Mock<(), &'static str>>();
let mut singleton = Singleton::new(outer);
// Allow the singleton to be made
outer_handle.allow(2);
crate::common::future::poll_fn(|cx| singleton.poll_ready(cx))
.await
.unwrap();
// First call produces an inner mock service
let fut1 = singleton.call(());
let ((), send_inner) = outer_handle.next_request().await.unwrap();
let (inner, mut inner_handle) = tower_test::mock::pair::<(), &'static str>();
send_inner.send_response(inner);
let mut cached = fut1.await.unwrap();
// Now: allow readiness on the inner mock, then inject error
inner_handle.allow(1);
// Inject error so next poll_ready fails
inner_handle.send_error(std::io::Error::new(
std::io::ErrorKind::Other,
"cached poll_ready failed",
));
// Drive poll_ready on cached service
let err = crate::common::future::poll_fn(|cx| cached.poll_ready(cx))
.await
.err()
.expect("expected poll_ready error");
assert_eq!(err.to_string(), "cached poll_ready failed");
// After error, the singleton should be cleared, so a new call drives outer again
outer_handle.allow(1);
crate::common::future::poll_fn(|cx| singleton.poll_ready(cx))
.await
.unwrap();
let fut2 = singleton.call(());
let ((), send_inner2) = outer_handle.next_request().await.unwrap();
let (inner2, mut inner_handle2) = tower_test::mock::pair::<(), &'static str>();
send_inner2.send_response(inner2);
let mut cached2 = fut2.await.unwrap();
// The new cached service should still work
inner_handle2.allow(1);
crate::common::future::poll_fn(|cx| cached2.poll_ready(cx))
.await
.expect("expected poll_ready");
let cfut2 = cached2.call(());
let ((), send_cached2) = inner_handle2.next_request().await.unwrap();
send_cached2.send_response("svc2");
cfut2.await.unwrap();
}
#[tokio::test]
async fn cancel_waiter_does_not_affect_others() {
let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>();
let mut singleton = Singleton::new(mock_svc);
crate::common::future::poll_fn(|cx| singleton.poll_ready(cx))
.await
.unwrap();
let fut1 = singleton.call(());
let fut2 = singleton.call(());
drop(fut2); // cancel one waiter
let ((), send_response) = handle.next_request().await.unwrap();
send_response.send_response("svc");
fut1.await.unwrap();
}
// TODO: this should be able to be improved with a cooperative baton refactor
#[tokio::test]
async fn cancel_driver_cancels_all() {
let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>();
let mut singleton = Singleton::new(mock_svc);
crate::common::future::poll_fn(|cx| singleton.poll_ready(cx))
.await
.unwrap();
let mut fut1 = singleton.call(());
let fut2 = singleton.call(());
// poll driver just once, and then drop
crate::common::future::poll_fn(move |cx| {
let _ = Pin::new(&mut fut1).poll(cx);
Poll::Ready(())
})
.await;
let ((), send_response) = handle.next_request().await.unwrap();
send_response.send_response("svc");
assert_eq!(
fut2.await.unwrap_err().0.to_string(),
"singleton connection canceled"
);
}
}
+1
-1
{
"git": {
"sha1": "203c9563a0ed51666e1829a5be3fbb33d79a3ba2"
"sha1": "d5740116a55cbf7af13d1142b365c56b1d684f3a"
},
"path_in_vcs": ""
}

@@ -129,2 +129,2 @@ name: CI

- uses: dtolnay/rust-toolchain@nightly
- run: cargo rustdoc -- --cfg docsrs -D rustdoc::broken_intra_doc_links
- run: cargo rustdoc --features full -- --cfg docsrs -D rustdoc::broken_intra_doc_links

@@ -248,3 +248,3 @@ # This file is automatically @generated by Cargo.

name = "hyper-util"
version = "0.1.18"
version = "0.1.19"
dependencies = [

@@ -270,3 +270,5 @@ "base64",

"tokio-test",
"tower-layer",
"tower-service",
"tower-test",
"tracing",

@@ -366,2 +368,22 @@ "windows-registry",

[[package]]
name = "pin-project"
version = "1.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pin-project-lite"

@@ -633,2 +655,8 @@ version = "0.2.16"

[[package]]
name = "tower-layer"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e"
[[package]]
name = "tower-service"

@@ -640,2 +668,16 @@ version = "0.3.3"

[[package]]
name = "tower-test"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4546773ffeab9e4ea02b8872faa49bb616a80a7da66afc2f32688943f97efa7"
dependencies = [
"futures-util",
"pin-project",
"tokio",
"tokio-test",
"tower-layer",
"tower-service",
]
[[package]]
name = "tracing"

@@ -642,0 +684,0 @@ version = "0.1.41"

@@ -16,3 +16,3 @@ # THIS FILE IS AUTOMATICALLY GENERATED BY CARGO

name = "hyper-util"
version = "0.1.18"
version = "0.1.19"
authors = ["Sean McArthur <sean@seanmonstar.com>"]

@@ -65,2 +65,7 @@ build = false

]
client-pool = [
"client",
"dep:futures-util",
"dep:tower-layer",
]
client-proxy = [

@@ -80,2 +85,3 @@ "client",

"client-legacy",
"client-pool",
"client-proxy",

@@ -204,2 +210,6 @@ "client-proxy-system",

[dependencies.tower-layer]
version = "0.3"
optional = true
[dependencies.tower-service]

@@ -244,2 +254,5 @@ version = "0.3"

[dev-dependencies.tower-test]
version = "0.4"
[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dev-dependencies.pnet_datalink]

@@ -246,0 +259,0 @@ version = "0.35.0"

@@ -0,1 +1,9 @@

# 0.1.19 (2025-12-03)
- Add `client::pool` module for composable pools. Enable with the `client-pool` feature.
- Add `pool::singleton` for sharing a single cloneable connection.
- Add `pool::cache` for caching a list of connections.
- Add `pool::negotiate` for combining two pools with upgrade and fallback negotiation.
- Add `pool::map` for customizable mapping of keys and connections.
# 0.1.18 (2025-11-13)

@@ -2,0 +10,0 @@

@@ -21,3 +21,3 @@ mod errors;

/// This is a connector that can be used by the `legacy::Client`. It wraps
/// another connector, and after getting an underlying connection, it established
/// another connector, and after getting an underlying connection, it establishes
/// a TCP tunnel over it using SOCKSv4.

@@ -24,0 +24,0 @@ #[derive(Debug, Clone)]

@@ -21,3 +21,3 @@ mod errors;

/// This is a connector that can be used by the `legacy::Client`. It wraps
/// another connector, and after getting an underlying connection, it established
/// another connector, and after getting an underlying connection, it establishes
/// a TCP tunnel over it using SOCKSv5.

@@ -68,4 +68,5 @@ #[derive(Debug, Clone)]

/// Username and Password must be maximum of 255 characters each.
/// 0 length strings are allowed despite RFC prohibiting it. This is done so that
/// for compatablity with server implementations that require it for IP authentication.
/// 0 length strings are allowed despite RFC prohibiting it. This is done for
/// compatablity with server implementations that use empty credentials
/// to allow returning error codes during IP authentication.
pub fn with_auth(mut self, user: String, pass: String) -> Self {

@@ -87,6 +88,6 @@ self.config.proxy_auth = Some((user, pass));

///
/// Typical SOCKS handshake with auithentication takes 3 round trips. Optimistic sending
/// A typical SOCKS handshake with user/pass authentication takes 3 round trips Optimistic sending
/// can reduce round trip times and dramatically increase speed of handshake at the cost of
/// reduced portability; many server implementations do not support optimistic sending as it
/// is not defined in the RFC (RFC 1928).
/// is not defined in the RFC.
///

@@ -93,0 +94,0 @@ /// Recommended to ensure connector works correctly without optimistic sending before trying

@@ -7,3 +7,6 @@ //! HTTP client utilities

#[cfg(feature = "client-pool")]
pub mod pool;
#[cfg(feature = "client-proxy")]
pub mod proxy;

Sorry, the diff of this file is not supported yet