I have a function that accepts a TCP connection and runs two Lwt threads, handle_connection and send_message. Each time a connection is terminated, I am notified in the handle_connection thread, so I can terminate its loop. But I then want to terminate the whole joined (<&>) thread, so that the next recursive call to serve can move on to another connection.
    let create_server sock =
      let rec serve () =
        Lwt_unix.accept sock
        >>= (fun (fd, _) ->
              connection := true;
              let ic = Lwt_io.of_fd ~mode:Lwt_io.Input fd in
              let oc = Lwt_io.of_fd ~mode:Lwt_io.Output fd in
              handle_connection ic oc <&> send_message oc)
        >>= serve
      in
      serve ()
The question is: how can I force the send_message thread to terminate each time handle_connection terminates?
    let handle_connection ic oc =
      Lwt.on_failure (handle_message ic oc "client") (fun e ->
          Logs.err (fun m -> m "%s" (Printexc.to_string e)));
      Logs_lwt.info (fun m -> m "New connection")

    let rec send_message oc =
      let* s = read_console () in
      Lwt_io.write_line oc s >>= fun _ -> send_message oc
I have already tried using Lwt.choose instead of Lwt.join. It moves on to the next connection when the client disconnects, but the send_message thread keeps running on the terminated connection.
I hesitate to comment on this as I suspect that you already know this, but as a matter of general principle the most fundamental way in which you conditionally wait on a promise in Lwt is to construct the promise using Lwt.wait, and then bind on the promise using operator let* or operator >>= until the promise's resolver either fulfils the promise by means of Lwt.wakeup_later or (in your special case) rejects it using Lwt.wakeup_later_exn. Alternatively in the latter case you could construct the promise using Lwt.task and reject the promise directly by cancellation with Lwt.cancel, but I believe cancellation is now deprecated or at least discouraged.

There is a Lwt.pick function which, on a promise being fulfilled, will cancel any others which are bound by the pick, but that is the inverse of what you want. What this means is that I think you are going to have to restructure your code to expose the conditional promise.
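To make the Lwt.wait suggestion concrete, here is a minimal sketch of one way the restructuring could look. It is not your actual code: the Connection_closed exception, the serve_one function and the ~closed, ~next_line, ~write and ~handle parameters are names I made up for illustration, and the console and the socket are simulated (a Lwt_stream and a short sleep) so that the example runs on its own. The idea is that the sender binds on the "closed" promise at every iteration, so rejecting it ends the loop.

```ocaml
(* Sketch only. Build with the lwt.unix package, e.g.
   ocamlfind ocamlopt -package lwt.unix -linkpkg sketch.ml *)
open Lwt.Syntax

(* Hypothetical exception used to reject the "closed" promise. *)
exception Connection_closed

(* Sender loop: each iteration waits for whichever settles first, the
   next console line or the rejection of [closed]. *)
let rec send_message ~closed ~next_line ~write =
  let* s = Lwt.choose [ next_line (); closed ] in
  let* () = write s in
  send_message ~closed ~next_line ~write

(* Serve a single connection. [handle] stands in for handle_connection
   and is assumed to resolve when the client disconnects. *)
let serve_one ~handle ~next_line ~write =
  let closed, close = Lwt.wait () in
  let reader =
    Lwt.finalize handle (fun () ->
        (* Reject the promise the sender is bound on. *)
        Lwt.wakeup_later_exn close Connection_closed;
        Lwt.return_unit)
  in
  let sender =
    Lwt.catch
      (fun () -> send_message ~closed ~next_line ~write)
      (function
        | Connection_closed -> Lwt.return_unit (* normal shutdown *)
        | e -> Lwt.fail e)
  in
  Lwt.join [ reader; sender ]

(* Simulated run: two queued "console" lines, then the client leaves. *)
let () =
  let sent = ref [] in
  let lines, push = Lwt_stream.create () in
  List.iter (fun s -> push (Some s)) [ "hello"; "world" ];
  let next_line () = Lwt_stream.next lines in
  let write s = sent := s :: !sent; Lwt.return_unit in
  let handle () = Lwt_unix.sleep 0.05 in
  Lwt_main.run (serve_one ~handle ~next_line ~write);
  Printf.printf "sent: %s\n" (String.concat ", " (List.rev !sent))
```

In your server, serve would call something like serve_one for each accepted connection, with write being Lwt_io.write_line oc and next_line being read_console. One caveat with this sketch: the console read that was pending when the connection closed is not cancelled, so it may swallow the next line typed. Routing console input through one long-lived reader that feeds a stream or mailbox is probably the cleaner arrangement.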