I can receive UDP data but how do I send it simultaneously to ws websocket connected clients using c#?

65 views Asked by At

I can accept UDP messages from connected devices on the local network. I need to send that data to connected websocket clients. When new data is received, it needs to be sent to the websocket clients for real-time updates. I listen for the UDP data in a new thread. When the data is received, I don't know how to pass that data to the websocket clients. All clients will receive the same data. There won't be more than 2 or 3 websocket clients connected at any given time. UDP data is sent on the local network to a designated port. The goal is for the UDP data to show up on web pages in real-time.

The UDP data needs to be received even if there are no websocket clients connected. The UDP data is also put into a database as it's received.

class MainClass
{
    static void Main(string[] args)
    {
        Console.WriteLine("Receiver v1.0.0");
        ExternalVariables Ev = new ExternalVariables();
        Ev.Get();

        UdpReceiver r = new UdpReceiver();
        r.Port = Ev.ReceivingPort;
        Thread ControllerThread = new Thread(r.ReceiveMessages);
        ControllerThread.Start();

        TcpListener Server = new TcpListener(IPAddress.Parse(Ev.LocalHost), Ev.WebsocketPort);
        Server.Start();

        while (true)
        {
            TcpClient Client = Server.AcceptTcpClient();
            Console.WriteLine("Websocket Connected...");

            //Thread WebsocketThread = new Thread(WebsocketClientHandler);
            //WebsocketThread.Start(Client);
            WebsocketClientHandler(Client);

        }
    }

    public static void WebsocketClientHandler(object c)
    {
        byte[] Bytes;
        TcpClient Client = (TcpClient)c;
        NetworkStream Stream = Client.GetStream();
        bool connected = true;

        try
        {
            Bytes = new byte[Client.Available];
            Stream.Read(Bytes, 0, Bytes.Length);
            string s = Encoding.UTF8.GetString(Bytes);

            if (Regex.IsMatch(s, "^GET", RegexOptions.IgnoreCase))
            {
                Console.WriteLine("=====Handshaking from client=====\n{0}", s);

                // 1. Obtain the value of the "Sec-WebSocket-Key" request header without any leading or trailing whitespace
                // 2. Concatenate it with "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" (a special GUID specified by RFC 6455)
                // 3. Compute SHA-1 and Base64 hash of the new value
                // 4. Write the hash back as the value of "Sec-WebSocket-Accept" response header in an HTTP response
                string swk = Regex.Match(s, "Sec-WebSocket-Key: (.*)").Groups[1].Value.Trim();
                string swka = swk + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
                byte[] swkaSha1 = System.Security.Cryptography.SHA1.Create().ComputeHash(Encoding.UTF8.GetBytes(swka));
                string swkaSha1Base64 = Convert.ToBase64String(swkaSha1);

                // HTTP/1.1 defines the sequence CR LF as the end-of-line marker
                byte[] response = Encoding.UTF8.GetBytes(
                    "HTTP/1.1 101 Switching Protocols\r\n" +
                    "Connection: Upgrade\r\n" +
                    "Upgrade: websocket\r\n" +
                    "Sec-WebSocket-Accept: " + swkaSha1Base64 + "\r\n\r\n");

                Stream.Write(response, 0, response.Length);
            }

            //Not sure How to send to connected websocket clients.
            while (NewData)
            {
                //Send to webclient...


            }
        }
        catch (Exception e)
        {
            connected = false;
            Console.WriteLine(e);
            Console.WriteLine(e.StackTrace);
        }
    }
}

public class UdpReceiver
{
    public bool MessageReceived = false;
    public int Port { get; set; }

    public UdpReceiver() 
    {
        Port = 0;
    }

    public void ReceiveMessages()
    {

        IPEndPoint e = new IPEndPoint(IPAddress.Any, Port);
        UdpClient u = new UdpClient(e);

        while (true)
        {
            MessageReceived = false;
            byte[] Data = u.Receive(ref e);
            MessageReceived = true;
            //Task.Run(() => ProcessData(Data, c));  //New task to process the data in case another controller is sending data
            ProcessData(Data);
        }
    }

    public void ProcessData(byte[] Data)
    {
        ReceivedData ReceivedData = new ReceivedData();
        ReceivedData.Data = Data;
        ReceivedData.Process();
        //Process() puts the data in a database and is supposed to send to websocket clients.
    }
}

Websocket Test Client......

<%@ Page Language="C#" Inherits="WebsocketsExampleWeb.Default" %>
<!doctype html>
<html lang="en">
  <style>
    textarea {
      vertical-align: bottom;
    }
    #output {
      overflow: auto;
    }
    #output > p {
      overflow-wrap: break-word;
    }
    #output span {
      color: blue;
    }
    #output span.error {
      color: red;
    }
  </style>
  <body>
    <h2>WebSocket</h2>
    <textarea cols="60" rows="6"></textarea>
    <button>send</button>
    <div id="output"></div>
  </body>
  <script>
        
        
function connect() {
  var ws = new WebSocket('ws://127.0.0.1:8077');
  ws.onopen = function() {
     writeToScreen("CONNECTED");
  };

  ws.onmessage = function(e) {
    writeToScreen(`<span>RESPONSE: ${e.data}</span>`);
    ws.send("Client Response");
    console.log('Message:', e.data);
  };

  ws.onclose = function(e) {
    console.log('Socket is closed. Reconnect will be attempted in 1 second.', e.reason);
    setTimeout(function() {
      connect();
    }, 1000);
  };

  ws.onerror = function(err) {
    console.error('Socket encountered error: ', err.message, 'Closing socket');
    ws.close();
  };
}
        
function writeToScreen(message) {
  output.insertAdjacentHTML("afterbegin", `<p>${message}</p>`);
}
        

connect();        
        
  </script>
</html>
2

There are 2 answers

2
Charlieface On

First off, do not try to reimplement the WebSocket protocol yourself. It's extremely complex, and C# already has a perfectly good WebSocket implementation already.

  • Make everything async where possible.
  • Use HttpListener instead of TcpListener, and upgrade each received context to a web socket.
  • Put each context in a list so you can continue listening. Make sure to lock the list when you use it.
class MainClass
{
    private static readonly CancellationTokenSource cts = new CancellationTokenSource();
    private static readonly List<HttpListenerWebSocketContext> _sockets = new();

    static async Task Main(string[] args)
    {
        Console.WriteLine("Receiver v1.0.0");
        ExternalVariables Ev = new ExternalVariables();
        Ev.Get();

        UdpReceiver r = new UdpReceiver(_sockets, cts.Token);
        r.Port = Ev.ReceivingPort;
        
        var Server = new HttpListener();
        Server.Prefixes.Add("http://+:" + Ev.WebsocketPort);
        Server.Start();

        Task.Run(r.ReceiveMessages);

        // need a way of shutting everything down
        Console.CancelKeyPress += (sender, eventArgs) =>
        {
            cts.Cancel();
            eventArgs.Cancel = true;
        };
        try
        {
            while (true)
            {
                var context = await Server.GetContextAsync(cts.Token);
                Console.WriteLine("Websocket Connected...");
                // hand off the client to another task so we can continue listening
                Task.Run(() => WebsocketClientHandler(context));
            }
        }
        catch (OperationCanceledException) {}
    }

    public static async Task WebsocketClientHandler(HttpListenerContext context)
    {
        try
        {
            var socket = await context.AcceptWebSocketAsync(null);

            lock (_sockets)
            {
                _sockets.Add(socket);
            }
        }
        catch
        {
            context.Response.Dispose();
        }
    }
}
public class UdpReceiver
{
    public int Port { get; set; }
    private List<HttpListenerWebSocketContext> _contexts;
    private CancellationToken _token;

    public UdpReceiver(List<HttpListenerWebSocketContext> contexts, CancellationToken token) 
    {
        Port = 0;
        _contexts = contexts;
        _token = token;
    }

    public async Task ReceiveMessages()
    {
        IPEndPoint e = new IPEndPoint(IPAddress.Any, Port);
        using var u = new UdpClient(e);

        try
        {
            while (true)
            {
                var response = await u.ReceiveAsync(_token);
                ProcessData(response.Buffer);
            }
        }
        catch (OperationCanceledException) {}
    }

    public void ProcessData(byte[] Data)
    {
        ReceivedData ReceivedData = new ReceivedData();
        ReceivedData.Data = Data;
        ReceivedData.Process();
        // broadcast to all sockets
        List<HttpListenerWebSocketContext> list;
        lock (_contexts)
        {
            list = _contexts.ToList();
        }
        foreach (var context in list)
        {
            await SendToSocket(context, Data)
        }
    }

    public static async Task SendToSocket(HttpListenerContext context, byte[] bytes)
    {
        try
        {
            await context.WebSocket.SendAsync(bytes, WebSocketMessageType.Binary, false, _token);
        }
        catch (OperationCanceledException) {}
        catch (Exception e)
        {
            connected = false;
            Console.WriteLine(e);
            Console.WriteLine(e.StackTrace);
            lock(_sockets)
            {
                _contexts.Remove(context);  // remove failed context
                context.WebSocket.Dispose();
            }
        }
    }
}
0
Joe On

Here is my working prototype. I was able to connect/disconnect multiple web clients to receive the messages. It also handles multiple UDP clients.

class MainClass
{
    public static readonly CancellationTokenSource cts = new CancellationTokenSource();
    public static readonly List<WebsocketConnection> _clientstreams = new List<WebsocketConnection>();

    static void Main(string[] args)
    {
        Console.WriteLine("Receiver v1.0.0");
        ExternalVariables Ev = new ExternalVariables();
        Ev.Get();

        UdpReceiver r = new UdpReceiver(_clientstreams, cts.Token);
        r.Port = Ev.ReceivingPort;

        TcpListener Server = new TcpListener(IPAddress.Parse(Ev.LocalHost), Ev.WebsocketPort);
        Server.Start();

        Task.Run(r.ReceiveMessages);

        while (true)
        {
            WebsocketConnection Client = new WebsocketConnection();
            Client.Client = Server.AcceptTcpClient();
            bool NewClient = Client.GetConnection();

            if (NewClient)
            {

                lock (_clientstreams)
                {
                    _clientstreams.Add(Client);
                    Console.WriteLine("Added Websocket client to the list");
                }
            }
        }
    }
}

public class UdpReceiver
{
    public int Port { get; set; }
    private List<WebsocketConnection> _clientstreams;
    private  static CancellationToken _token;

    public UdpReceiver(List<WebsocketConnection> ClientStreams, CancellationToken token)
    {
        Port = 0;
        _clientstreams = ClientStreams;
        _token = token;
    }

    public async Task ReceiveMessages()
    {
        IPEndPoint e = new IPEndPoint(IPAddress.Any, Port);
        var u = new UdpClient(e);

        try
        {
            while (true)
            {
                var response = await u.ReceiveAsync();
                ProcessDataAsync(response.Buffer);
            }
        }
        catch (OperationCanceledException) 
        { 
        
        }
    }

    public async Task ProcessDataAsync(byte[] Data)
    {
        Console.WriteLine("Received Data");
        ReceivedData ReceivedData = new ReceivedData();
        ReceivedData.Data = Data;
        ReceivedData.Process();

        // broadcast to all sockets
        List<WebsocketConnection> list;
        lock (_clientstreams)
        {
            list = _clientstreams.ToList();
        }
        foreach (WebsocketConnection c in list)
        {

            await SendToSocket(c, ReceivedData);
        }

    }

    public async Task SendToSocket(WebsocketConnection Stream, ReceivedData Data)
    {
        try
        {
            string json = JsonConvert.SerializeObject(Data.SwipeRecord);
            Stream.SendMessage(json);
            string BrowserMessage = Stream.ReadMessage();

            json = JsonConvert.SerializeObject(Data.ControllerStatus);
            Stream.SendMessage(json);
            BrowserMessage = Stream.ReadMessage();

            Console.WriteLine(BrowserMessage);
            Console.WriteLine("Sent to Websocket");
        }
        catch (Exception e)
        {
            Console.WriteLine("Closing WebSocket");
            Console.WriteLine(e);
            Console.WriteLine(e.StackTrace);
            lock (_clientstreams)
            {
                _clientstreams.Remove(Stream);  // remove failed context
                Console.WriteLine("Removed Socket");
            }
        }
    }
}

public class WebsocketConnection
{
    public string IpAddress { get; set; }
    public int Port { get; set; }
    public TcpListener Server { get; set; }
    public TcpClient Client { get; set; }
    public NetworkStream Stream { get; set; }
    public byte[] Bytes { get; set; }
    public int ConnectionTries { get; set; }

    public WebsocketConnection()
    {
        ConnectionTries = 0;
        Client = new TcpClient();
    }

    public bool GetConnection()
    {

        bool NewClient = false;

        Console.WriteLine("Websocket Connection Established...");
        Stream = Client.GetStream();

        ConnectionTries = 0;

        while (!Stream.DataAvailable) ;
        while (Client.Available < 3) ;

        Bytes = new byte[Client.Available];
        Stream.Read(Bytes, 0, Bytes.Length);
        string s = Encoding.UTF8.GetString(Bytes);

        if (Regex.IsMatch(s, "^GET", RegexOptions.IgnoreCase))
        {
            Console.WriteLine("=====Handshaking from client=====\n{0}", s);

            // 1. Obtain the value of the "Sec-WebSocket-Key" request header without any leading or trailing whitespace
            // 2. Concatenate it with "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" (a special GUID specified by RFC 6455)
            // 3. Compute SHA-1 and Base64 hash of the new value
            // 4. Write the hash back as the value of "Sec-WebSocket-Accept" response header in an HTTP response
            string swk = Regex.Match(s, "Sec-WebSocket-Key: (.*)").Groups[1].Value.Trim();
            string swka = swk + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
            byte[] swkaSha1 = System.Security.Cryptography.SHA1.Create().ComputeHash(Encoding.UTF8.GetBytes(swka));
            string swkaSha1Base64 = Convert.ToBase64String(swkaSha1);

            // HTTP/1.1 defines the sequence CR LF as the end-of-line marker
            byte[] response = Encoding.UTF8.GetBytes(
                "HTTP/1.1 101 Switching Protocols\r\n" +
                "Connection: Upgrade\r\n" +
                "Upgrade: websocket\r\n" +
                "Sec-WebSocket-Accept: " + swkaSha1Base64 + "\r\n\r\n");

            Stream.Write(response, 0, response.Length);
            NewClient = true;

        }
        return NewClient;
    }

    public void Handshake(string s)
    {

        if (Regex.IsMatch(s, "^GET", RegexOptions.IgnoreCase))
        {
            Bytes = new byte[Client.Available];
            Stream.Read(Bytes, 0, Bytes.Length);
            s = Encoding.UTF8.GetString(Bytes);

            if (Regex.IsMatch(s, "^GET", RegexOptions.IgnoreCase))
            {
                Console.WriteLine("=====Handshaking from client=====\n{0}", s);

                // 1. Obtain the value of the "Sec-WebSocket-Key" request header without any leading or trailing whitespace
                // 2. Concatenate it with "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" (a special GUID specified by RFC 6455)
                // 3. Compute SHA-1 and Base64 hash of the new value
                // 4. Write the hash back as the value of "Sec-WebSocket-Accept" response header in an HTTP response
                string swk = Regex.Match(s, "Sec-WebSocket-Key: (.*)").Groups[1].Value.Trim();
                string swka = swk + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
                byte[] swkaSha1 = System.Security.Cryptography.SHA1.Create().ComputeHash(Encoding.UTF8.GetBytes(swka));
                string swkaSha1Base64 = Convert.ToBase64String(swkaSha1);

                // HTTP/1.1 defines the sequence CR LF as the end-of-line marker
                byte[] response = Encoding.UTF8.GetBytes(
                    "HTTP/1.1 101 Switching Protocols\r\n" +
                    "Connection: Upgrade\r\n" +
                    "Upgrade: websocket\r\n" +
                    "Sec-WebSocket-Accept: " + swkaSha1Base64 + "\r\n\r\n");

                Stream.Write(response, 0, response.Length);
            }
        }
    }

    public void SendMessage(string Msg)
    {

        byte[] sendBytes = Encoding.UTF8.GetBytes(Msg);
        byte lengthHeader = 0;
        byte[] lengthCount = new byte[] { };

        if (sendBytes.Length <= 125)
            lengthHeader = (byte)sendBytes.Length;

        if (125 < sendBytes.Length && sendBytes.Length < 65535) //System.UInt16
        {
            lengthHeader = 126;

            lengthCount = new byte[] {
                    (byte)(sendBytes.Length >> 8),
                    (byte)(sendBytes.Length)
                        };
        }

        if (sendBytes.Length > 65535)//max 2_147_483_647 but .Length -> System.Int32
        {
            lengthHeader = 127;
            lengthCount = new byte[] {
                    (byte)(sendBytes.Length >> 56),
                    (byte)(sendBytes.Length >> 48),
                    (byte)(sendBytes.Length >> 40),
                    (byte)(sendBytes.Length >> 32),
                    (byte)(sendBytes.Length >> 24),
                    (byte)(sendBytes.Length >> 16),
                    (byte)(sendBytes.Length >> 8),
                    (byte)sendBytes.Length,
                };
        }

        List<byte> responseArray = new List<byte>() { 0b10000001 };

        responseArray.Add(lengthHeader);
        responseArray.AddRange(lengthCount);
        responseArray.AddRange(sendBytes);

        Stream.Write(responseArray.ToArray(), 0, responseArray.Count);
    }

    public string ReadMessage()
    {
        string Message = "";
        Stream.ReadTimeout = 50;

        Stream.Read(Bytes, 0, Bytes.Length);

        string s = Encoding.UTF8.GetString(Bytes);

        bool fin = (Bytes[0] & 0b10000000) != 0,
            mask = (Bytes[1] & 0b10000000) != 0; // must be true, "All messages from the client to the server have this bit set"
        int opcode = Bytes[0] & 0b00001111, // expecting 1 - text message
            offset = 2;
        ulong msglen = Bytes[1] & (ulong)0b01111111;

        if (msglen == 126)
        {
            // bytes are reversed because websocket will print them in Big-Endian, whereas
            // BitConverter will want them arranged in little-endian on windows
            msglen = BitConverter.ToUInt16(new byte[] { Bytes[3], Bytes[2] }, 0);
            offset = 4;
        }
        else if (msglen == 127)
        {
            // To test the below code, we need to manually buffer larger messages — since the NIC's autobuffering
            // may be too latency-friendly for this code to run (that is, we may have only some of the bytes in this
            // websocket frame available through client.Available).
            msglen = BitConverter.ToUInt64(new byte[] { Bytes[9], Bytes[8], Bytes[7], Bytes[6], Bytes[5], Bytes[4], Bytes[3], Bytes[2] }, 0);
            offset = 10;
        }

        if (msglen == 0)
        {
            Console.WriteLine("msglen == 0");
        }
        else if (mask)
        {
            byte[] decoded = new byte[msglen];
            byte[] masks = new byte[4] { Bytes[offset], Bytes[offset + 1], Bytes[offset + 2], Bytes[offset + 3] };
            offset += 4;

            for (ulong i = 0; i < msglen; ++i)
                decoded[i] = (byte)(Bytes[(int)offset + (int)i] ^ masks[i % 4]);

            string text = Encoding.UTF8.GetString(decoded);
            Console.WriteLine("175 WebsocketConnection {0}", text);
            Message = text;
        }
        else
            Console.WriteLine("mask bit not set");

        Console.WriteLine();
        return Message;
    }

    public bool IsSocketConnected()
    {
        
        bool part1 = Client.Client.Poll(50, SelectMode.SelectRead);
        bool part2 = (Client.Client.Available == 0);
        if ((part1 && part2 ) || !Client.Client.Connected)
            return false;
        else
            return true;
    }
}