gRPC async server: bad thread scalability

31 views Asked by At

I have a simple gRPC asynchronous server. It is also multi-threaded: one thread handles one completion queue.

But the thread scalability is very poor as the number of threads grows:

  • For small messages, the best performance is with only one thread.
  • For big messages, the performance only doubles from 1 to 4 threads, and a higher thread count does not increase performance any further (on a server with 64 cores).

Is this normal behaviour?

The server looks like this:

class Server final
{
public:
    Server(const Config& config, Service& sync_service)
        : m_config(config),
          m_sync_service(sync_service)
    {
        // Nothing to do
    }

    ~Server()
    {
        stop();
    }

    void stop()
    {
        m_stopped = true;
        m_server->Shutdown();
        for(auto& cq: m_cqs)
        {
            cq->Shutdown();
        }
    }

    void run_and_wait(const std::string& grpc_address_port)
    {
        grpc::ServerBuilder builder;
        builder.AddListeningPort(grpc_address_port, grpc::InsecureServerCredentials());
        builder.RegisterService(&m_async_service);

        const int num_threads = m_config.threads;
        const int threads_per_cq = 1;
        assert(num_threads % threads_per_cq == 0);
        for(int i = 0; i < ceil_div(num_threads, threads_per_cq); i++)
        {
            auto& cq = m_cqs.emplace_back(builder.AddCompletionQueue());
        }
        // with the gRPC runtime.
        // Finally assemble the server.
        m_server = builder.BuildAndStart();
        std::cout << "Server listening on " << grpc_address_port << std::endl;

        // Proceed to the server's main loop.
        std::vector<std::thread> threads;

        for(int i = 0; i < num_threads; i++)
        {
            grpc::ServerCompletionQueue* cq = m_cqs[i / threads_per_cq].get();
            
            for(int j = 0; j < m_config.concurrent_calldatas; j++)
            {
                new CallDataUnary<IMethod>(m_async_service, m_sync_service, *cq);
            }

            threads.emplace_back([this, cq]() {
                handle_rpcs(*cq);
            });
        }

        // Just wait all poller threads to stop
        for(auto& thread: threads)
        {
            if(thread.joinable())
            { thread.join(); }
        }
    }

private:
    class CallDataBase
    {
    public:
        virtual ~CallDataBase() = default;

        virtual void proceed() = 0;
        virtual void wait_for_new_request() = 0;

        enum class State
        {
            WAIT_REQUEST,
            FINISH
        };

    protected:
        State m_state{State::WAIT_REQUEST};
    };

    template<typename IMethod>
    class CallDataUnary : public CallDataBase
    {
    public:
        CallDataUnary(AsyncService& service, Service& sync_service, grpc::ServerCompletionQueue& cq)
            : m_service(service),
              m_sync_service(sync_service),
              m_writer(&m_context),
              m_cq(cq)
        {
            wait_for_request();
        }

        void wait_for_new_request()
        {
            (new CallDataUnary(m_service, m_sync_service, m_cq));
        }

        void wait_for_request()
        {
            // Equivalent of calling the RequestXXX()
            IMethod::request(
                m_service,
                &m_context,
                &m_request,
                &m_writer,
                &m_cq,
                &m_cq,
                this
            );
        }

        void proceed() final
        {
            if(m_state == State::WAIT_REQUEST)
            {
                wait_for_new_request();
                m_state = CallDataBase::State::FINISH;
                // Call the business logic
                grpc::Status status = IMethod::dispatch_grpc_sync(m_sync_service, &m_context, &m_request, &m_response);

                // At the end to avoid data races.
                m_writer.Finish(m_response, status, this);
            }
            else
            {
                GPR_ASSERT(m_state == State::FINISH);
                delete this;
            }
        }

    private:
        /// The service of the RPC method that this CallData is listening to.
        AsyncService& m_service;
        Service& m_sync_service;
        // The gRPC request for this RPC call.
        grpc::ServerContext m_context;
        /// The gRPC request sent by the client.
        RequestCPP m_request;
        /// The gRPC response to send to the client when the request is completed.
        ResponseCPP m_response;
        /// The writer to write the response to the client.
        grpc::ServerAsyncResponseWriter<ResponseCPP> m_writer;
        grpc::ServerCompletionQueue& m_cq;
    };

    // This can be run in multiple threads if needed.
    void handle_rpcs(grpc::ServerCompletionQueue& cq)
    {
        void* tag;  // uniquely identifies a request.
        bool ok;
        while(!m_stopped)
        {
            // Block waiting to read the next event from the completion queue. The
            // event is uniquely identified by its tag, which in this case is the
            // memory address of a CallData instance.
            // The return value of Next should always be checked. This return value
            // tells us whether there is any kind of event or m_cq is shutting down.
            GPR_ASSERT(cq.Next(&tag, &ok));
            auto call = static_cast<CallDataBase*>(tag);
            if(ok)
            {
                call->proceed();
            }
            else
            {
                call->wait_for_new_request();
                delete call;
            }
        }
    }

    Config m_config;
    std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> m_cqs;
    AsyncService m_async_service;
    Service& m_sync_service;
    std::unique_ptr<grpc::Server> m_server;
    bool m_stopped{false};
};
0

There are 0 answers