Building on a snippet and answer, would it be possible to return results to the caller from the throttling queue? I've tried PostAndAsyncReply to receive reply on a channel but it's throwing an error if I pipe it with Enqueue. Here's the code.
Appreciate a F# core vanilla based solution around Queue or Mailbox design patterns.
Question
The question is to be able to call functions asynchronously based on the throttle (max 3 at a time), passing each item from the array, wait on the whole queue/array until it's finished while collecting all the results and then return the results to the caller. (Return the results to the caller is what's pending in here)
Callee Code
// Message type used by the agent - contains queueing
// of work items and notification of completion
type ThrottlingAgentMessage =
| Completed
| Enqueue of Async<unit>
/// Represents an agent that runs operations in concurrently. When the number
/// of concurrent operations exceeds 'limit', they are queued and processed later
let throttlingAgent limit =
MailboxProcessor.Start(fun inbox ->
async {
// The agent body is not executing in parallel,
// so we can safely use mutable queue & counter
let queue = System.Collections.Generic.Queue<Async<unit>>()
let running = ref 0
while true do
// Enqueue new work items or decrement the counter
// of how many tasks are running in the background
let! msg = inbox.Receive()
match msg with
| Completed -> decr running
| Enqueue w -> queue.Enqueue(w)
// If we have less than limit & there is some work to
// do, then start the work in the background!
while running.Value < limit && queue.Count > 0 do
let work = queue.Dequeue()
incr running
do! // When the work completes, send 'Completed'
// back to the agent to free a slot
async {
do! work
inbox.Post(Completed)
}
|> Async.StartChild
|> Async.Ignore
})
let requestDetailAsync (url: string) : Async<Result<string, Error>> =
async {
Console.WriteLine ("Simulating request " + url)
try
do! Async.Sleep(1000) // let's say each request takes about a second
return Ok (url + ":body...")
with :? WebException as e ->
return Error {Code = "500"; Message = "Internal Server Error"; Status = HttpStatusCode.InternalServerError}
}
let requestMasterAsync() : Async<Result<System.Collections.Concurrent.ConcurrentBag<_>, Error>> =
async {
let urls = [|
"http://www.example.com/1";
"http://www.example.com/2";
"http://www.example.com/3";
"http://www.example.com/4";
"http://www.example.com/5";
"http://www.example.com/6";
"http://www.example.com/7";
"http://www.example.com/8";
"http://www.example.com/9";
"http://www.example.com/10";
|]
let results = System.Collections.Concurrent.ConcurrentBag<_>()
let agent = throttlingAgent 3
for url in urls do
async {
let! res = requestDetailAsync url
results.Add res
}
|> Enqueue
|> agent.Post
return Ok results
}
Caller Code
[<TestMethod>]
member this.TestRequestMasterAsync() =
match Entity.requestMasterAsync() |> Async.RunSynchronously with
| Ok result -> Console.WriteLine result
| Error error -> Console.WriteLine error
You could use
Hopac.Streamsfor that. With such tool it is pretty trivial:ADDED In case you want to use only vanilla
FSharp.Corewith mailboxes only try this:It is pretty much the same logic, but slightly optimized to not use queue if there is free worker capacity (just start job and don't bother with queue/dequeue).
throttlingAgentis a functionint -> Async<unit> -> unitBecause we don't want client to bother with our internalThrottlingAgentMessagetype.use like this: