I am migrating from hyper = "0.14" to hyper = "1".
I have this function in v0.14:
fn response_stream<T, P>(stream: T) -> Response<Body>
where
T: Stream<Item = P> + Send + 'static,
P: Serialize + Send + 'static,
{
let stream = stream.map(|item| serde_json::to_string(&item).map(|s| format!("{}\n", s)));
let body = Body::wrap_stream(stream);
Response::builder()
.status(StatusCode::OK)
.header(CONTENT_TYPE, "application/jsonlines")
.body(body)
.expect("Failed to build Response")
}
That takes as input some Stream of a serializable item.
We serialize the items in JSON, add a newline, and return the stream.
An implementation of jsonlines where each line is a distinct item.
I am not able to migrate this function.
I attempted with:
fn response_stream<T, P>(stream: T) -> Response<BoxBody<Bytes, Infallible>>
where
T: Stream<Item = P> + Send + 'static,
P: Serialize + 'static,
{
let stream = stream.map(|item| serde_json::to_string(&item).map(|s| format!("{}\n", s)).unwrap()).map(Bytes::from);
let body = StreamBody::new(stream);
let body = body.boxed();
Response::builder()
.status(StatusCode::OK)
.header(CONTENT_TYPE, "application/jsonlines")
.body(body)
.expect("Failed to build Response")
}
But the compilers complains:
error[E0308]: mismatched types
--> src/api/common.rs:34:5
|
25 | pub fn response_stream<T, P>(stream: T) -> Response<BoxBody<Bytes, Infallible>>
| ------------------------------------ expected `hyper::Response<BoxBody<bytes::Bytes, Infallible>>` because of return type
...
34 | / Response::builder()
35 | | .status(StatusCode::OK)
36 | | .header(CONTENT_TYPE, "application/jsonlines")
37 | | .body(body)
38 | | .expect("Failed to build Response")
| |___________________________________________^ expected `Response<BoxBody<Bytes, Infallible>>`, found `Response<Pin<Box<dyn Stream<Item = Bytes> + Send>>>`
|
= note: expected struct `hyper::Response<BoxBody<bytes::Bytes, Infallible>>`
found struct `hyper::Response<Pin<Box<dyn futures_core::Stream<Item = bytes::Bytes> + std::marker::Send>>>`
I believe I understand the error, but it does not help in moving forward.
From a very low level perspective, what needs to happen is to keep listening to the stream and whenever a new element is detected, serialize it and push it into the socket.
But I am unable to express this.
I was able to slowly working through the issue and get to the following code that is compiling.