How to return a stream using Hyper in Rust

I am migrating from hyper = "0.14" to hyper = "1".

I have this function in v0.14:

fn response_stream<T, P>(stream: T) -> Response<Body>
where
    T: Stream<Item = P> + Send + 'static,
    P: Serialize + Send + 'static,
{
    let stream = stream.map(|item| serde_json::to_string(&item).map(|s| format!("{}\n", s)));
    let body = Body::wrap_stream(stream);
    Response::builder()
        .status(StatusCode::OK)
        .header(CONTENT_TYPE, "application/jsonlines")
        .body(body)
        .expect("Failed to build Response")
}

It takes a Stream of serializable items, serializes each item to JSON, appends a newline, and returns the resulting stream as the response body.

In other words, it is an implementation of JSON Lines, where each line is a distinct item.

I have not been able to migrate this function to hyper 1.

This is my attempt:

fn response_stream<T, P>(stream: T) -> Response<BoxBody<Bytes, Infallible>>
where
    T: Stream<Item = P> + Send + 'static,
    P: Serialize + 'static,
{
    let stream = stream.map(|item| serde_json::to_string(&item).map(|s| format!("{}\n", s)).unwrap()).map(Bytes::from);
    let body = StreamBody::new(stream);
    let body  = body.boxed();
    
    Response::builder()
        .status(StatusCode::OK)
        .header(CONTENT_TYPE, "application/jsonlines")
        .body(body)
        .expect("Failed to build Response")
}

But the compiler complains:

error[E0308]: mismatched types
  --> src/api/common.rs:34:5
   |
25 |   pub fn response_stream<T, P>(stream: T) -> Response<BoxBody<Bytes, Infallible>>
   |                                              ------------------------------------ expected `hyper::Response<BoxBody<bytes::Bytes, Infallible>>` because of return type
...
34 | /     Response::builder()
35 | |         .status(StatusCode::OK)
36 | |         .header(CONTENT_TYPE, "application/jsonlines")
37 | |         .body(body)
38 | |         .expect("Failed to build Response")
   | |___________________________________________^ expected `Response<BoxBody<Bytes, Infallible>>`, found `Response<Pin<Box<dyn Stream<Item = Bytes> + Send>>>`
   |
   = note: expected struct `hyper::Response<BoxBody<bytes::Bytes, Infallible>>`
              found struct `hyper::Response<Pin<Box<dyn futures_core::Stream<Item = bytes::Bytes> + std::marker::Send>>>`

I believe I understand the error, but it does not help me move forward.

From a very low-level perspective, what needs to happen is to keep polling the stream and, whenever a new element arrives, serialize it and write it to the socket.

But I am unable to express this.

1 answer

Answered by Siscia

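I was able to slowly work through the issue and arrive at the following code, which compiles.

The key point is that StreamBody only implements Body when the wrapped stream yields Result<Frame<D>, E> items. In the attempt above the stream yielded plain Bytes, so StreamBody was not a Body and .boxed() resolved to the Stream version, which returns Pin<Box<dyn Stream<...> + Send>>. Wrapping each chunk in Frame::data and then in Ok fixes that. BoxBody::new also requires the body to be Send + Sync, hence the extra Sync bound on T.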

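// Imports are an assumption: Stream/StreamExt are taken from the `futures` crate here.
use std::convert::Infallible;

use bytes::Bytes;
use futures::{Stream, StreamExt};
use http_body_util::{combinators::BoxBody, StreamBody};
use hyper::body::Frame;
use hyper::header::CONTENT_TYPE;
use hyper::{Response, StatusCode};
use serde::Serialize;

pub fn response_stream<T, P>(stream: T) -> Response<BoxBody<Bytes, Infallible>>
where
    // BoxBody::new requires the wrapped body to be Send + Sync.
    T: Stream<Item = P> + Send + Sync + 'static,
    P: Serialize + 'static,
{
    let stream = stream.map(|item| {
        // Panics if an item cannot be serialized.
        let s = serde_json::to_string(&item).unwrap();
        let s = format!("{}\n", s);
        let b = Bytes::from(s);
        // StreamBody expects items of type Result<Frame<_>, _>.
        let f = Frame::data(b);
        Ok(f)
    });
    let stream = StreamBody::new(stream);
    let body = BoxBody::new(stream);

    Response::builder()
        .status(StatusCode::OK)
        .header(CONTENT_TYPE, "application/jsonlines")
        .body(body)
        .expect("Failed to build Response")
}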
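
As a rough mental model of what such a streaming body does at run time (the "wait for an element, serialize it, push it into the socket" loop described in the question), here is a blocking sketch that uses only the standard library. It is not how hyper works internally, since hyper polls the body asynchronously frame by frame. The function name `write_json_lines` is hypothetical, and the `encode` closure is a stand-in for `serde_json::to_string`, so the example has no external dependencies.

```rust
use std::io::{self, Write};

/// Blocking analogue of a JSON Lines streaming body (hypothetical helper).
///
/// `sink` plays the role of the socket, `items` the role of the stream,
/// and `encode` stands in for `serde_json::to_string`.
pub fn write_json_lines<W, I, F>(mut sink: W, items: I, encode: F) -> io::Result<()>
where
    W: Write,
    I: IntoIterator,
    F: Fn(&I::Item) -> String,
{
    for item in items {
        // Serialize one item and terminate it with a newline,
        // so that each line is a distinct record.
        let mut line = encode(&item);
        line.push('\n');

        // Push this record out before waiting for the next item.
        sink.write_all(line.as_bytes())?;
        sink.flush()?;
    }
    Ok(())
}
```

Called with the items 1, 2, 3 and `|n| n.to_string()` as the encoder, the sink receives three newline-terminated lines. In the hyper version, each of those lines corresponds to one `Frame::data` chunk yielded by the stream.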