I am trying to write a simple akka-http and akka-streams based application that handles HTTP requests, always with one pre-materialized stream, because I plan to do long-running processing with back-pressure in my requestProcessor stream.
My application code:
import akka.actor.{ActorSystem, Props}
import akka.http.scaladsl._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server._
import akka.stream.ActorFlowMaterializer
import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl.{Sink, Source}
import scala.annotation.tailrec
import scala.concurrent.Future
object UserRegisterSource {
  def props: Props = Props[UserRegisterSource]
  final case class RegisterUser(username: String)
}
// ActorPublisher that buffers incoming RegisterUser messages and emits them downstream as demand arrives
class UserRegisterSource extends ActorPublisher[UserRegisterSource.RegisterUser] {
  import UserRegisterSource._
  import akka.stream.actor.ActorPublisherMessage._
  val MaxBufferSize = 100
  var buf = Vector.empty[RegisterUser]
  override def receive: Receive = {
    case request: RegisterUser =>
      if (buf.isEmpty && totalDemand > 0)
        onNext(request)
      else {
        buf :+= request
        deliverBuf()
      }
    case Request(_) =>
      deliverBuf()
    case Cancel =>
      context.stop(self)
  }
  // Deliver as many buffered elements as the current downstream demand allows.
  @tailrec final def deliverBuf(): Unit =
    if (totalDemand > 0) {
      if (totalDemand <= Int.MaxValue) {
        val (use, keep) = buf.splitAt(totalDemand.toInt)
        buf = keep
        use foreach onNext
      } else {
        val (use, keep) = buf.splitAt(Int.MaxValue)
        buf = keep
        use foreach onNext
        deliverBuf()
      }
    }
}
object Main extends App {
  val host = "127.0.0.1"
  val port = 8094
  implicit val system = ActorSystem("my-testing-system")
  implicit val fm = ActorFlowMaterializer()
  implicit val executionContext = system.dispatcher
  val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] = Http(system).bind(interface = host, port = port)
  // Source backed by the ActorPublisher above; run() materializes it to the
  // ActorRef of the UserRegisterSource actor, so new requests can be pushed with `!`.
  val mySource = Source.actorPublisher[UserRegisterSource.RegisterUser](UserRegisterSource.props)
  val requestProcessor = mySource
    .mapAsync(1)(fakeSaveUserAndReturnCreatedUserId)
    .to(Sink.head[Int])
    .run()
  val route: Route =
    get {
      path("test") {
        parameter('test) { case t: String =>
          requestProcessor ! UserRegisterSource.RegisterUser(t)
          ??? // here I want to complete the request with the result of the stream execution
        }
      }
    }
  def fakeSaveUserAndReturnCreatedUserId(param: UserRegisterSource.RegisterUser): Future[Int] =
    Future.successful {
      1
    }
  serverSource.to(Sink.foreach {
    connection =>
      connection handleWith Route.handlerFlow(route)
  }).run()
}
I found a solution for how to create a Source that can dynamically accept new items to process, but I cannot find any solution for how to then obtain the result of the stream execution in my route.
                        
The direct answer to your question is to materialize a new stream for each HttpRequest and use Sink.head to get the value you're looking for; a sketch of that modification to your code is given below.
However, I think your question is ill posed: in your code example the only thing you're using an Akka Stream for is to create a new UserId. Futures readily solve this problem without the need for a materialized stream (and all the accompanying overhead); a sketch of that approach follows as well.
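A minimal sketch of the per-request-stream modification, reusing your RegisterUser case class and fakeSaveUserAndReturnCreatedUserId (the onComplete/StatusCodes error handling is an illustrative addition, not part of your original code):

import akka.http.scaladsl.model.StatusCodes
import scala.util.{Failure, Success}

val route: Route =
  get {
    path("test") {
      parameter('test) { t =>
        // A short-lived stream is materialized per request; Sink.head materializes
        // to a Future of the single element flowing through it.
        val userIdFuture: Future[Int] =
          Source.single(UserRegisterSource.RegisterUser(t))
            .mapAsync(1)(fakeSaveUserAndReturnCreatedUserId)
            .runWith(Sink.head[Int])
        onComplete(userIdFuture) {
          case Success(userId) => complete(s"created user with id $userId")
          case Failure(_)      => complete(StatusCodes.InternalServerError)
        }
      }
    }
  }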
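And a sketch of the Future-only version, with no stream involved at all, since fakeSaveUserAndReturnCreatedUserId already returns a Future (onSuccess is a standard routing directive; the response text is illustrative):

val route: Route =
  get {
    path("test") {
      parameter('test) { t =>
        // No materialized stream: the route completes directly with the Future.
        val userIdFuture: Future[Int] =
          fakeSaveUserAndReturnCreatedUserId(UserRegisterSource.RegisterUser(t))
        onSuccess(userIdFuture) { userId =>
          complete(s"created user with id $userId")
        }
      }
    }
  }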
If you want to limit the number of concurrent calls to fakeSaveUserAndReturnCreatedUserId, you can create an ExecutionContext with a defined thread-pool size, as explained in the answer to this question, and use that ExecutionContext to create the Futures:
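For illustration, a minimal sketch of that idea, assuming the real save runs inside Future { ... } rather than Future.successful (which never uses a thread); the pool size of 4 and the name saveExecutionContext are arbitrary choices for the example:

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

// ExecutionContext backed by a fixed-size thread pool: at most 4 saves run concurrently.
val saveExecutionContext: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))

def fakeSaveUserAndReturnCreatedUserId(param: UserRegisterSource.RegisterUser): Future[Int] =
  Future {
    // the real, potentially slow, save would happen here
    1
  }(saveExecutionContext)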