I am working on a bi-directional streaming gRPC POC where I have a gRPC client, which is spawning 500 threads to simulate 500 clients, each doing the rpc call parallelly every second. On the gRPC server side, When client make the first connect, I save the responseStreamObserver and link it with the client's ID, so the server can use the StreamObserver when needs to push message to client.
When running this setup I see below errors :
java.lang.IllegalStateException: Stream 39 sent too many headers EOS: false
and io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception$StreamException: Stream closed before write could take place
When I run a single instance/thread in the client, I see no errors. Seems to me that this is caused due to thread synchronization issue. Can anyone please help in resolving this? I'm blocked. :(
Following is my server implementation:
public StreamObserver<DataRequest> processBidirectionalStream(StreamObserver<DataResponse> responseObserver) {
return new StreamObserver<DataRequest>() {
private String clientId;
private int requestCount = 0;
private boolean isFirstRequest = true;
@Override
public void onNext(DataRequest request) {
clientId=request.getClientId();
if(isFirstRequest){
//Store the first StreamObserver for this client
clientObservers.put(clientId, responseObserver);
isFirstRequest = false;
}
//Increment request count for the client
requestCount = clientRequestCounts.getOrDefault(clientId, 0);
requestCount++;
clientRequestCounts.put(clientId, requestCount);
//Push the message to kafka topic for processing
processChunk(request.getName());
//Send a response back to the client after 5 requests
if(clientRequestCounts.get(clientId)%5==0){
DataResponse response = DataResponse.newBuilder()
.setMessage("****Notification sent****")
.build();
clientObservers.get(clientId).onNext(response);
}
}
@Override
public void onError(Throwable throwable) {
System.err.println("Encountered error in bidirectional stream: " + throwable);
}
@Override
public void onCompleted() {
System.out.println("Bidirectional streaming RPC completed");
clientObservers.get(clientId).onCompleted();
}
};
}
Tried running 500 clients (as separate threads here) which are doing parallel rpc calls to the gRPC server every second. Expected gRPC server to handle the requests and send appropriate response whenever needed to each client (on ad-hoc basis, not on every request.) Seeing errors in server as below when this set-up is run:
java.lang.IllegalStateException: Stream 39 sent too many headers EOS: false
and
io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception$StreamException: Stream closed before write could take place
Following is the stack trace when run with 10 clients for testing purposes:
Encountered error in bidirectional stream: io.grpc.StatusRuntimeException: CANCELLED: client cancelled
2024-02-26 10:56:39.588 ERROR 21736 --- [ault-executor-8] io.grpc.internal.SerializingExecutor : Exception while executing runnable io.grpc.internal.ServerImpl$ServerTransportListenerImpl$1StreamCreated@78959964
io.grpc.StatusRuntimeException: CANCELLED: call already cancelled. Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception
at io.grpc.Status.asRuntimeException(Status.java:524) ~[grpc-api-1.30.0.jar:1.30.0]
at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:366) ~[grpc-stub-1.42.1.jar:1.42.1]
at com.philips.fp.service.GrpcServer.sendMessageToClient(GrpcServer.java:120) ~[classes/:na]
at com.philips.fp.service.GrpcServer.processBidirectionalStream(GrpcServer.java:54) ~[classes/:na]
at com.philips.fp.grpc.DataProcessorGrpc$MethodHandlers.invoke(DataProcessorGrpc.java:204) ~[classes/:na]
at io.grpc.stub.ServerCalls$StreamingServerCallHandler.startCall(ServerCalls.java:235) ~[grpc-stub-1.42.1.jar:1.42.1]
at net.devh.boot.grpc.server.metric.MetricCollectingServerInterceptor.interceptCall(MetricCollectingServerInterceptor.java:138) ~[grpc-server-spring-boot-autoconfigure-2.9.0.RELEASE.jar:2.9.0.RELEASE]
at io.grpc.ServerInterceptors$InterceptCallHandler.startCall(ServerInterceptors.java:244) ~[grpc-api-1.30.0.jar:1.30.0]
at io.grpc.Contexts.interceptCall(Contexts.java:52) ~[grpc-api-1.30.0.jar:1.30.0]
at net.devh.boot.grpc.server.scope.GrpcRequestScope.interceptCall(GrpcRequestScope.java:75) ~[grpc-server-spring-boot-autoconfigure-2.9.0.RELEASE.jar:2.9.0.RELEASE]
at io.grpc.ServerInterceptors$InterceptCallHandler.startCall(ServerInterceptors.java:244) ~[grpc-api-1.30.0.jar:1.30.0]
at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.startWrappedCall(ServerImpl.java:651) ~[grpc-core-1.30.0.jar:1.30.0]
at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.startCall(ServerImpl.java:629) ~[grpc-core-1.30.0.jar:1.30.0]
at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.access$1900(ServerImpl.java:416) ~[grpc-core-1.30.0.jar:1.30.0]
at io.grpc.internal.ServerImpl$ServerTransportListenerImpl$1StreamCreated.runInternal(ServerImpl.java:556) ~[grpc-core-1.30.0.jar:1.30.0]
at io.grpc.internal.ServerImpl$ServerTransportListenerImpl$1StreamCreated.runInContext(ServerImpl.java:531) ~[grpc-core-1.30.0.jar:1.30.0]
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[grpc-core-1.30.0.jar:1.30.0]
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) ~[grpc-core-1.30.0.jar:1.30.0]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]