I want to do a Post-Fire-Reply to an agent. Basically the agent triggers an event then replies to the caller. However I either keep getting a timeout error or the events do not fire correctly. I tried doing Post-Fire, that stopped the timeout errors but the events do not fire.
let evt = new Event<int>()
let stream = evt.Publish
type Agent<'T> = MailboxProcessor<'T>
type Fire = Fire of int
let agent = Agent.Start(fun inbox ->
let rec loop() = async {
let! msg = inbox.Receive()
let (Fire i) = msg
evt.Trigger i }
loop())
let on i fn =
stream
|> Observable.filter (fun x -> x = i)
|> Observable.filter (fun x -> x <> 1)
|> Observable.subscribe (fun x -> fn x)
let rec collatz n =
printfn "%d" n
on n (fun i ->
if (i % 2 = 0) then collatz (i/2)
else collatz (3*n + 1)) |> ignore
agent.Post (Fire n) // this does not work
// evt.Trigger n // this does works
collatz 13
This is a simple experiment that repeatedly creates a function to find the next number in the Collatz series and then calls itself to return the value until it reaches 1.
What seems to happen is that the trigger only fires once. I tried experimenting with every combination of Async.RunSynchronously / Async.Start / StartChild / SynchronizationContext that I could think of but no progress. I found a blog similar to what I am doing but that didn't help me neither
EDIT Thank you Fyodor Soikin for pointing out my oversight. The original problem still remains in that I wish to both fire events and reply with a result, but get a timeout.
let evt = new Event<int>()
let stream = evt.Publish
type Agent<'T> = MailboxProcessor<'T>
type Command =
| Fire of int
| Get of int * AsyncReplyChannel<int>
let agent = Agent.Start(fun inbox ->
let rec loop() = async {
let! msg = inbox.Receive()
match msg with
| Fire i -> evt.Trigger i
| Get (i,ch) ->
evt.Trigger i
ch.Reply(i)
return! loop() }
loop())
let on i fn =
stream
|> Observable.filter (fun x -> x = i)
|> Observable.filter (fun x -> x <> 1)
|> Observable.subscribe (fun x -> fn x)
let rec collatz n =
printfn "%d" n
on n (fun i ->
if (i % 2 = 0) then collatz (i/2)
else collatz (3*n + 1)) |> ignore
agent.PostAndReply (fun ch -> (Get (n, ch))) |> ignore // timeout
agent.PostAndAsyncReply (fun ch -> (Get (n, ch))) |> Async.Ignore |> Async.Start // works but I need the result
agent.PostAndAsyncReply (fun ch -> (Get (n, ch))) |> Async.RunSynchronously |> ignore // timeout
collatz 13
Your
loopfunction doesn't loop. It receives the first message, triggers the event, and then just... exits. Never attempts to receive a second message.You need to make that function work continuously: process the first message, then go right back to receive the next one, then go receive the next one, and so on. Like this:
Edit
Since you've reached your limit on questions, I will answer your edit here.
The reason you're getting timeouts in your second snippet is that you have a deadlock in your code. Let's trace the execution to see that.
collatzcall.collatzcall posts a message to the agent.collatzcall happens.collatzcall posts a message to the agent.collatzcall starts waiting for the agent to respond.And this is where the execution ends. The agent cannot respond at this point (in fact, it cannot even receive the next message!), because its instruction pointer is still inside
evt.Trigger. Theevt.Triggercall hasn't yet returned, so theloopfunction hasn't yet recursed, so theinbox.Receivefunction hasn't yet been called, so the second message is still waiting in the agent's queue.So you get yourself a classic deadlock:
collatzis waiting for the agent to receive its message, but the agent is waiting forcollatzto finish handling the event.The simplest, dumbest solution to this would be to just trigger the event asynchronously:
This will make sure that the event handler is executed not "right there", but asynchronously, possibly on a different thread. This will in turn allow the agent not to wait for the event to be processed before it can continue its own execution loop.
In general though, when dealing with multithreading and asynchrony, one should never call unknown code directly. The agent should never directly call
evt.Trigger, or anything else that it doesn't control, because that code might be waiting on the agent itself (which is what happened in your case), thus introducing the deadlock.