CapnProto - Request to other server in callback

527 views Asked by At

I am a newbie of CapnProto.
I want to request a function of server2 in the callback of server1.
But I got exception like below.
Please help me resolve it.
Many thanks!

* thread #1, queue = 'com.apple.main-thread', stop reason = EXC_BAD_ACCESS (code=1, address=0x3000000000020)
  * frame #0: 0x000000010035859c libcapnp-rpc-0.10.2.dylib`capnp::VatNetwork<capnp::rpc::twoparty::VatId, capnp::rpc::twoparty::ProvisionId, capnp::rpc::twoparty::RecipientId, capnp::rpc::twoparty::ThirdPartyCapId, capnp::rpc::twoparty::JoinResult>::baseConnect(capnp::AnyStruct::Reader) + 20
    frame #1: 0x0000000100361a2c libcapnp-rpc-0.10.2.dylib`kj::_::TransformPromiseNode<kj::_::Void, kj::Own<kj::AsyncIoStream>, capnp::EzRpcClient::Impl::Impl(kj::StringPtr, unsigned int, capnp::ReaderOptions)::'lambda'(kj::Own<kj::AsyncIoStream>&&), kj::_::PropagateException>::getImpl(kj::_::ExceptionOrValue&) + 512
    frame #2: 0x00000001004d2220 libkj-async-0.10.2.dylib`kj::_::RunnableImpl<kj::_::TransformPromiseNodeBase::get(kj::_::ExceptionOrValue&)::$_31>::run() + 32
    frame #3: 0x000000010028aaa4 libkj-0.10.2.dylib`kj::_::runCatchingExceptions(kj::_::Runnable&) + 40
    frame #4: 0x00000001004c7e48 libkj-async-0.10.2.dylib`kj::_::TransformPromiseNodeBase::get(kj::_::ExceptionOrValue&) + 64
    frame #5: 0x00000001004c8684 libkj-async-0.10.2.dylib`kj::_::ForkHubBase::fire() + 60
    frame #6: 0x00000001004c654c libkj-async-0.10.2.dylib`kj::_::waitImpl(kj::Own<kj::_::PromiseNode>&&, kj::_::ExceptionOrValue&, kj::WaitScope&, kj::SourceLocation) + 608
    frame #7: 0x0000000100005198 client`kj::Promise<capnp::Response<SampleServer1::CallbackRegisterResults> >::wait(kj::WaitScope&, kj::SourceLocation) + 120
    frame #8: 0x0000000100004b94 client`main + 344
    frame #9: 0x000000010003d08c dyld`start + 520

Source code implementation sample as below

SampleServer1.capnp:

interface SampleServer1 {

    callbackRegister @0 (callback :Callback) -> ();   //to register a callback

    interface Callback {
        calbackFunc @0 (in :Int32) -> ();
    }
}

SampleServer1::Server Impl:

class SampleServer1Impl : public SampleServer1::Server
{
::kj::Promise<void> callbackRegister(CallbackRegisterContext context){
    auto cb = context.getParams().getCallback());

    auto request = cb.calbackFuncRequest();   //Call callback function
    request.setIn(111);
    auto promise = request.send();

    return kj::READY_NOW;
  }
}

SampleServer1::Callback::Server Impl:

class CallbackImpl : public SampleServer1::Callback::Server
{
::kj::Promise<void> calbackFunc(CalbackFuncContext context){

    capnp::EzRpcClient ezClient2("unix:/tmp/capnp-server-2");
    SampleServer2::Client client2 = ezClient2.getMain<SampleServer2>();

    auto& waitScope = ezClient2.getWaitScope();
    {
      auto request = client2.functionSampleRequest();  //Request to SERVER2
      request.setIn(222);
      auto promise = request.send();
     
      promise.wait(waitScope);
      
    }

    return kj::READY_NOW;
  }
}

SampleServer2.capnp:

interface SampleServer2 {

    functionSample @0 (in :Int32) -> ();
}

SampleServer2::Server impl

class SampleServer2 Impl : public SampleServer2::Server
{
::kj::Promise<void> functionSample(FunctionSampleContext context){
    //Do something
    return kj::READY_NOW;
  }
}

Client implement

  capnp::EzRpcClient ezClient("unix:/tmp/capnp-server-1");
  SampleServer1::Client client = ezClient.getMain<SampleServer1>();
  auto& waitScope = ezClient.getWaitScope();

 ::SampleServer1::Callback::Client callback = ::SampleServer1::Callback::Client(kj::heap<CallbackImpl>());

  auto request = client.callbackRegisterRequest();      //Register a callback to Server1
  request.setCallback(callback);
  auto promise = request.send();
2

There are 2 answers

6
Duy Nguyễn On

There is a problem with the implementation of calbackFunc. The calbackFunc itself is executed from NEVER_DONE.wait (event loop), so waiting for promise in this function will make a nested wait. This is not allowed in capnp.

You can avoid this by doing it on another thread. Ex:

  ::kj::Promise<void> calbackFunc(CalbackFuncContext context){
    
    kj::Thread th([](){
        capnp::EzRpcClient ezClient2("unix:/tmp/capnp-server-2");
        Server2::Client client2 = ezClient2.getMain<SampleServer2>();
        auto& waitScope = ezClient2.getWaitScope();
        {
          auto request = client2.functionSampleRequest();  //Request to SERVER2
          request.setIn(222);
          auto promise = request.send();
         
          promise.wait(waitScope);
          
        }
    });
    return kj::READY_NOW;
  }
0
JonasVautherin On

Similar to Kenton's suggestion in the other answer's comments, the following works for me:

  ::kj::Promise<void> calbackFunc(CalbackFuncContext context){
    auto ezClient2 = kj::heap<capnp::EzRpcClient>("unix:/tmp/capnp-server-2");
    SampleServer2::Client client2 = ezClient2->getMain<SampleServer2>();

    auto request = client2.functionSampleRequest();  //Request to SERVER2
    request.setIn(222);
    return request.send().attach(kj::mv(ezClient2)).ignoreResult();

Notes:

  • I return the promise generated by request.send() instead of creating a new promise like you do with kj::READY_NOW.
  • I .attach the ezClient2, so that it stays in the scope of the request.send() promise. Try for yourself: remove the .attach() and see that it crashes. Read about attachments here in the KJ tour.
  • I .ignoreResult(), just to convert the request.send() promise to a kj::Promise<void>. It will still complete normally, but it just conveniently converts the type to something we can return from calbackFunc.

Now, say you want to do something after the request to functionSample() completes. You can just register a callback to call upon completion using .then (see documentation here):

  ::kj::Promise<void> calbackFunc(CalbackFuncContext context){
    auto ezClient2 = kj::heap<capnp::EzRpcClient>("unix:/tmp/capnp-server-2");
    SampleServer2::Client client2 = ezClient2->getMain<SampleServer2>();

    auto request = client2.functionSampleRequest();  //Request to SERVER2
    request.setIn(222);

    return request.send().ignoreResult()
        .then([]() {
          // DO SOMETHING IN THE CONTINUATION HERE
        }).attach(kj::mv(ezClient2));
  }