I would like to track in-flight connections in warp such that a metrics counter is incremented before the request is handled and decremented after it was processed.
I attempted to solve this by using a "no-op" filter in the start of the chain and a custom logging filter in the end of the chain; something like that:
/// Increment the request count metric before the requests starts.
fn with_start_call_metrics() -> impl Filter<Extract = (), Error = Infallible> + Clone {
warp::any()
.and(path::full())
.map(|path: FullPath| {
HttpMetrics::inc_in_flight(path.as_str());
})
.untuple_one()
}
/// Decrement the request count metric after the request ended.
fn with_end_call_metrics() -> Log<fn(Info<'_>)> {
warp::log::custom(|info| {
HttpMetrics::dec_in_flight(info.path());
// ... track more metrics, e.g. info.elapsed() ...
})
}
The problem arises when a long-running request (/slow in the code below) is started and the connection is dropped before the request could be processed completely (e.g. CTRL-C on curl).
In this case, the slow route is simply aborted by warp and the with_end_call_metrics filter below is never reached:
#[tokio::main]
async fn main() {
let hello = warp::path!("hello" / String).and_then(hello);
let slow = warp::path!("slow").and_then(slow);
warp::serve(
with_start_call_metrics()
.and(
hello.or(slow), // ... and more ...
)
// If the call (e.g. of `slow`) is cancelled, this is never reached.
.with(with_end_call_metrics()),
)
.run(([127, 0, 0, 1], 8080))
.await;
}
async fn hello(name: String) -> Result<impl warp::Reply, warp::Rejection> {
Ok(format!("Hello, {}!", name))
}
async fn slow() -> Result<impl warp::Reply, warp::Rejection> {
tokio::time::sleep(Duration::from_secs(5)).await;
Ok(format!("That was slow."))
}
I understand this is normal behavior and the recommended way is to rely on the Drop implementation of a type in the request, as that would always be called, so something like:
async fn in_theory<F, T, E>(filter: F) -> Result<T, E>
where
F: Filter<Extract = T, Error = E>
{
let guard = TrackingGuard::new();
filter.await
}
But that doesn't work. I tried using wrap_fn like so:
pub fn in_theory<F>(filter: F) -> Result<F::Extract, F::Error>
where
F: Filter + Clone,
{
warp::any()
.and(filter)
.wrap_fn(|f| async {
// ... magic here ...
f.await
})
}
but regardless of what I try, it always ends up with an error like this:
error[E0277]: the trait bound `<F as warp::filter::FilterBase>::Error: reject::sealed::CombineRejection<Infallible>` is not satisfied
--> src/metrics.rs:255:25
|
255 | warp::any().and(filter).wrap_fn(|f| async { f.await })
| --- ^^^^^^ the trait `reject::sealed::CombineRejection<Infallible>` is not implemented for `<F as warp::filter::FilterBase>::Error`
| |
| required by a bound introduced by this call
And that cannot be specified, because reject::sealed is not a public module.
Any help is appreciated!
As was suggested in the comments, moving away from warp and using Tower for building the middleware helped. I had to rewrite the code for hosting the server to use
hyper::Serverdirectly but this was only a mild inconvenience.I started off with an
HttpCallMetricsservice wrapping an inner serviceS. Since I am tracking HTTP responses, I need that service to ultimately produce ahyper::Response, which is indicated here by type argumentO.The phantom data is here such that I can indicate
Oon the struct; not addingOhere would prevent theServiceimplementation to fail due to missing trait bounds.Because it is about HTTP metrics, the service also specifically deals with HTTP requests and hence implements
Service<Request<B>>for any body type B. Likewise, the wrapped service needs to be the same and its output needs to be convertible to aResponse<O>.The
HttpCallMetricsservice will produce a custom futureHttpCallMetricsFuturethat takes care of the metrics tracking; this is to avoid boxing here. Apart from that, since metrics never block, it forwards itspoll_readycall to the wrapped inner service.When called, a
HttpCallMetricTrackerinstance is created from the request. This is a struct that holds basic request information (HTTP method, version, path, start time instance) and implementsDrop- when dropped, it will register that the request terminated. This will work regardless of cancellation or finishing a request successfully.The implemented future again requires a phantom data hack for keeping track of the success variant
Oand error variantEof the service's future.The implementation is then comparatively simple: In essence, the
pollcall is forwarded to the wrapped inner future, and the method exits if that future is stillPoll::Pending.The moment the future returns
Poll::Readyit will be inspected for its result variant and if it is anOk(result)the result is converted into ahyper::Response. Metrics are then updated and the response is returned.In case of an error variant, the error is essentially returned as is.
The
HttpCallMetricTrackeris more or less trivial, it increments call metrics when constructed and decrements call metrics when dropped.The only interesting point here would be the
state: Cell<ResultState>field. This allows theDropimplementation to infer whether something should be logged or not. It's not strictly required hereAs far as hosting goes, the code now looks something like that:
That said, there also exists tower_http::trace which indeed seems to support all of the above. I will likely migrate to that later on, but this exercise helped me tremendously in understanding Tower in the first place.