Client Cancelled error in bidirectional grpc channel

On the gRPC server side:

    CANCELLED: client cancelled
    io.grpc.StatusRuntimeException: CANCELLED: client cancelled
        at io.grpc.Status.asRuntimeException(Status.java:530)
        at io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:291)
        at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:365)

On the gRPC client side:

    io.grpc.StatusRuntimeException: INTERNAL: RST_STREAM closed stream. HTTP/2 error code: PROTOCOL_ERROR
        at io.grpc.Status.asRuntimeException(Status.java:539)
        at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:487)
        at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:567)
        at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:71)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:735)

I start a gRPC server on an AWS EC2 instance as follows:


    server = ServerBuilder.forPort(GRPC_SERVER_PORT)
            .addService(ServerInterceptors.intercept((BindableService) new GrpcServiceImpl(), new AuthServerInterceptor()))
            .build()
            .start();

I submit a job to a Databricks cluster, and that job opens a bidirectional gRPC stream as follows:

    private static StreamObserver<InspectCommand> inspectCommandStreamObserver;

    // Client creation
    ManagedChannel channel = ManagedChannelBuilder.forAddress(SERVER_HOST, SERVER_PORT)
            .useTransportSecurity()
            .intercept(new AuthClientInterceptor(authToken))
            .enableRetry()
            .maxRetryAttempts(2)
            .build();
    LOGGER.info("channel created");
    GrpcServiceDefinitionGrpc.GrpcServiceDefinitionStub stub = GrpcServiceDefinitionGrpc.newStub(channel);

    createBidirectionalGrpcStreaming(stub);

    private void createBidirectionalGrpcStreaming(GrpcServiceDefinitionGrpc.GrpcServiceDefinitionStub grpcStub) {
        inspectCommandStreamObserver = grpcStub.inspectClient(new StreamObserver<InspectCommand>() {
            @Override
            public void onNext(InspectCommand inspectCommand) {
                try {
                    // code for command processing
                    inspectCommandStreamObserver.onNext(response);
                } catch (IOException | ClassNotFoundException e) {
                    throw new RuntimeException(e);
                }
            }

            @Override
            public void onError(Throwable t) {
                // Drop the failed stream and immediately open a new one
                inspectCommandStreamObserver = null;
                createBidirectionalGrpcStreaming(grpcStub);
                t.printStackTrace();
            }

            @Override
            public void onCompleted() {
                LOGGER.info("Communication completed.");
            }
        });
        LOGGER.info("Signalling ready");
        // Send the "ready" message to the gRPC server
        byte[] commandBytes = "Waiting for command".getBytes(StandardCharsets.UTF_8);
        InspectCommand readyCommand = InspectCommand.newBuilder()
                .setCommand(ByteString.copyFrom(commandBytes))
                .build();
        inspectCommandStreamObserver.onNext(readyCommand);
    }

As soon as the job starts on the Databricks cluster, the code above runs and sends the ready message to the gRPC server to open the bidirectional stream.

On the gRPC server side:

    private final ConcurrentMap<String, StreamObserver<InspectCommand>> inspectCommandStreamObservers = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, StreamObserver<DynamicMethodResponse>> mvcResponseObservers = new ConcurrentHashMap<>();

    @Override
    public StreamObserver<InspectCommand> inspectClient(StreamObserver<InspectCommand> responseObserver) {
        return new StreamObserver<InspectCommand>() {
            @Override
            public void onNext(InspectCommand command) {
                synchronized (lock) {
                    inspectCommandStreamObservers.put(command.getHttpSessionId(), responseObserver);
                }
                // Process the received command from Databricks
                if (mvcResponseObservers.containsKey(command.getCommandId())) {
                    LOGGER.info("Returning response to MVC");
                    mvcResponseObservers.get(command.getCommandId())
                            .onNext(DynamicMethodResponse.newBuilder().setInspectCommandResponse(command.getCommand()).build());
                    mvcResponseObservers.get(command.getCommandId()).onCompleted();
                    mvcResponseObservers.remove(command.getCommandId());
                }
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onCompleted() {
                // Communication completed
            }
        };
    }

The bidirectional stream is established, and the gRPC server receives the ready message sent from the Databricks cluster.

Expectation: the bidirectional stream should be long-running.

Actual: the stream fails with a CANCELLED: client cancelled error every minute.
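
One thing that could be tried, assuming the one-minute cancellation comes from an idle timeout on something sitting between the client and the server (the question does not confirm this), is to have the client send a small heartbeat message on the stream at an interval shorter than that timeout. The sketch below is plain Java with no gRPC dependency. `StreamHeartbeat` and its `sink` and `heartbeatFactory` parameters are hypothetical names for illustration, not part of grpc-java.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * Hypothetical helper: periodically pushes a heartbeat message into an
 * outbound stream so the stream never sits idle for longer than `period`.
 */
final class StreamHeartbeat<T> implements AutoCloseable {
    // Single daemon thread, so the scheduler never keeps the JVM alive.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "stream-heartbeat");
                t.setDaemon(true);
                return t;
            });
    private final ScheduledFuture<?> task;

    /**
     * @param sink             where each heartbeat is written (assumed to wrap the stream's onNext)
     * @param heartbeatFactory builds a fresh heartbeat message for every tick
     * @param period           interval between heartbeats; the first one is sent after one period
     */
    StreamHeartbeat(Consumer<T> sink, Supplier<T> heartbeatFactory, long period, TimeUnit unit) {
        // Note: if sink.accept throws, scheduleAtFixedRate stops all further runs.
        this.task = scheduler.scheduleAtFixedRate(
                () -> sink.accept(heartbeatFactory.get()), period, period, unit);
    }

    /** Stops sending heartbeats, for example from onError or onCompleted. */
    @Override
    public void close() {
        task.cancel(false);
        scheduler.shutdownNow();
    }
}
```

In the client above, `sink` could wrap `inspectCommandStreamObserver.onNext(...)`. Because a gRPC `StreamObserver` is not thread-safe, that call and the existing `onNext` calls would need to synchronize on a shared lock, and the server would have to recognize and ignore heartbeat messages. grpc-java also offers HTTP/2-level keepalive through `ManagedChannelBuilder.keepAliveTime(long, TimeUnit)`. Whether either approach resolves this particular error depends on what is actually closing the stream.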
