// Builds a completion handler for member function `f` of binanceWS<A>, bound
// to the shared_ptr returned by this->shared_from_this(), so the handler holds
// a shared_ptr to the object rather than a raw pointer.
// Only usable inside a non-static member of binanceWS<A>, where both the
// template parameter `A` and `this` are in scope.
#define BINANCE_HANDLER(f) beast::bind_front_handler(&binanceWS<A>::f, this->shared_from_this())
template <typename A>
class binanceWS : public std::enable_shared_from_this<binanceWS<A>> {
tcp::resolver resolver_;
Stream ws_;
beast::flat_buffer buffer_;
std::string host_;
std::string message_text_;
std::string wsTarget_ = "/ws/";
char const* host = "stream.binance.com";
char const* port = "9443";
SPSCQueue<A>& diff_messages_queue;
std::function<void()> on_message_handler;
// OnMessage on_message_cb;
public:
binanceWS(net::any_io_executor ex, ssl::context& ctx, SPSCQueue<A>& q)
: resolver_(ex)
, ws_(ex, ctx)
, diff_messages_queue(q) {}
void run(char const* host, char const* port, json message, const std::string& streamName) {
if (!SSL_set_tlsext_host_name(ws_.next_layer().native_handle(), host)) {
throw boost::system::system_error(
error_code(::ERR_get_error(), net::error::get_ssl_category()));
}
host_ = host;
message_text_ = message.dump();
wsTarget_ += streamName;
resolver_.async_resolve(host_, port, BINANCE_HANDLER(on_resolve));
}
void on_resolve(beast::error_code ec, tcp::resolver::results_type results) {
if (ec)
return fail_ws(ec, "resolve");
if (!SSL_set_tlsext_host_name(ws_.next_layer().native_handle(), host_.c_str())) {
throw beast::system_error{
error_code(::ERR_get_error(), net::error::get_ssl_category())};
}
get_lowest_layer(ws_).expires_after(30s);
beast::get_lowest_layer(ws_).async_connect(results, BINANCE_HANDLER(on_connect));
}
void on_connect(beast::error_code ec,
[[maybe_unused]] tcp::resolver::results_type::endpoint_type ep) {
if (ec)
return fail_ws(ec, "connect");
// Perform the SSL handshake
ws_.next_layer().async_handshake(ssl::stream_base::client, BINANCE_HANDLER(on_ssl_handshake));
}
void on_ssl_handshake(beast::error_code ec) {
if (ec)
return fail_ws(ec, "ssl_handshake");
beast::get_lowest_layer(ws_).expires_never();
ws_.set_option(websocket::stream_base::timeout::suggested(beast::role_type::client));
ws_.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
req.set(http::field::user_agent,
std::string(BOOST_BEAST_VERSION_STRING) + " websocket-client-async");
}));
std::cout << "using host_: " << host_ << std::endl;
ws_.async_handshake(host_, wsTarget_, BINANCE_HANDLER(on_handshake));
}
void on_handshake(beast::error_code ec) {
if (ec) {
return fail_ws(ec, "handshake");
}
std::cout << "Sending : " << message_text_ << std::endl;
ws_.async_write(net::buffer(message_text_), BINANCE_HANDLER(on_write));
}
void on_write(beast::error_code ec, size_t bytes_transferred) {
boost::ignore_unused(bytes_transferred);
if (ec)
return fail_ws(ec, "write");
ws_.async_read(buffer_, BINANCE_HANDLER(on_message));
}
void on_message(beast::error_code ec, size_t bytes_transferred) {
boost::ignore_unused(bytes_transferred);
if (ec)
return fail_ws(ec, "read");
on_message_handler(); // WORKS FINE!!!
ws_.async_read(buffer_, [this](beast::error_code ec, size_t n) {
if (ec)
return fail_ws(ec, "read");
on_message_handler(); // DOESN'T WORK
buffer_.clear();
ws_.async_read(buffer_, BINANCE_HANDLER(on_message));
});
}
void subscribe_orderbook_diffs(const std::string action,const std::string symbol,short int depth_levels)
{
std::string stream = symbol+"@"+"depth"+std::to_string(depth_levels);
on_message_handler = [this]() {
std::cout << "Orderbook Levels Update" << std::endl;
json payload = json::parse(beast::buffers_to_string(buffer_.cdata()));
std::cout << payload << std::endl;
};
json jv = {
{ "method", action },
{ "params", {stream} },
{ "id", 1 }
};
run(host, port,jv, stream);
}
};
// Entry point: sets up a TLS 1.2 client context with peer verification,
// creates the WebSocket client on a strand of `ioc`, subscribes to the
// "btcusdt" depth stream and runs the io_context on this thread.
int main() {
net::io_context ioc;
ssl::context ctx{ssl::context::tlsv12_client};
ctx.set_verify_mode(ssl::verify_peer);
// Verify the server certificate against the default certificate locations.
ctx.set_default_verify_paths();
// `levels` is an int but subscribe_orderbook_diffs takes a short int, so the
// call below converts implicitly.
int levels = 10;
std::string symbol = "btcusdt";
// NOTE(review): binanceWS is declared as a class template and its constructor
// takes a third SPSCQueue<A>& argument; this call supplies neither a template
// argument nor a queue, so it does not match that declaration as written.
// Confirm the intended message type and queue before relying on this.
auto binancews = std::make_shared<binanceWS>(make_strand(ioc), ctx);
binancews->subscribe_orderbook_diffs("SUBSCRIBE", symbol, levels);
// Runs the io_context on the calling thread; all completion handlers execute here.
ioc.run();
}
Output : Orderbook Levels Update
terminate called after throwing an instance of 'nlohmann::detail::parse_error'
what(): [json.exception.parse_error.101] parse error at line 1, column 687: syntax error while parsing value - unexpected '{'; expected end of input
Aborted (core dumped)
Calling on_message_handler() directly inside on_message() works just fine; the problem arises when I call on_message_handler() inside the lambda that is passed as the completion handler to async_read().
When you use `on_message_handler` outside `on_message`, `buffer_` has been appended to by `async_read` again. Even if you want to do the processing later, parse the JSON when you know it's valid.
If you really just want to call it in both places, don't forget the `buffer_.clear()` the first time...:
In fact, this tells you `buffer_.clear()` should probably be inside `on_message_handler`. Even better, encapsulate it so it cannot be (a) forgotten (b) mis-used:
Demo
Made self-contained again
Live On Coliru