I'm running into some issues trying to get NetMQ to work. My code compiles fine but I'm not achieving my desired outcome. Testing both Pub and Sub side from one PC.
I have a UI Handler that is linked to four buttons, start/stop pub/sub. Would love if someone could shed some light into the issue.
Desired outcome: To be able to transfer test.wav from "Sending Folder" to "Receiving Folder"
Publisher Logic
using System.Threading;
using NetMQ;
using NetMQ.Sockets;
using System.IO;
public class Publisher
{
private readonly Thread _publisherThread;
private readonly Thread _subscriberThread;
public Publisher()
{
_publisherThread = new Thread(PublisherWork);
_publisherThread.Start();
_subscriberThread = new Thread(SubscriberWork);
_subscriberThread.Start();
}
public void Start()
{
using (var pubSocket = new PublisherSocket())
{
pubSocket.Bind("tcp://*:5557");
string filePath = "C:/Users/XXXXX/Desktop/0MQ Demo/Sending Folder/test.wav";
byte[] fileBytes = File.ReadAllBytes(filePath);
pubSocket.SendMoreFrame("File").SendFrame(fileBytes);
}
}
private void PublisherWork()
{
using (var pubSocket = new PublisherSocket())
{
pubSocket.Bind("tcp://*:5557"); // [C] Port
string filePath = "C:/Users/XXXXX/Desktop/0MQ Demo/Sending Folder/test.wav"; // [C] Filepath
byte[] fileBytes = File.ReadAllBytes(filePath);
while (true)
{
pubSocket.SendMoreFrame("File").SendFrame(fileBytes); // [C] Topic
Thread.Sleep(1000);
}
}
}
private void SubscriberWork()
{
using (var subSocket = new SubscriberSocket())
{
subSocket.Connect("tcp://localhost:5557"); // [C] Port
subSocket.Subscribe("File"); // [C] Topic
while (true)
{
var topic = subSocket.ReceiveFrameString();
var fileBytes = subSocket.ReceiveFrameBytes();
SaveWavFile(fileBytes);
}
}
}
private void SaveWavFile(byte[] fileBytes)
{
string filePath = "C:/Users/XXXXX/Desktop/0MQ Demo/Receiving Folder/test.wav";
File.WriteAllBytes(filePath, fileBytes);
}
}
Subscriber Logic
using System.Threading;
using NetMQ;
using NetMQ.Sockets;
using UnityEngine;
using UnityEngine.UI;
using System.IO;
public class Subscriber : MonoBehaviour
{
public Button startButton;
public Button stopButton;
private Thread _publisherThread;
private Thread _subscriberThread;
private bool _isRunning;
private void Start()
{
startButton.onClick.AddListener(StartThreads);
stopButton.onClick.AddListener(StopThreads);
}
private void StartThreads()
{
_isRunning = true;
_publisherThread = new Thread(PublisherWork);
_publisherThread.Start();
_subscriberThread = new Thread(SubscriberWork);
_subscriberThread.Start();
}
private void StopThreads()
{
_isRunning = false;
_publisherThread.Join();
_subscriberThread.Join();
}
private void PublisherWork()
{
using (var pubSocket = new PublisherSocket())
{
pubSocket.Bind("tcp://*:5557"); // [C] Port
string filePath = "C:/Users/XXXXX/Desktop/0MQ Demo/Sending Folder/test.wav"; // [C] Filepath
byte[] fileBytes = File.ReadAllBytes(filePath);
while (_isRunning)
{
pubSocket.SendMoreFrame("File").SendFrame(fileBytes); // [C] Topic
Thread.Sleep(1000);
}
}
NetMQConfig.Cleanup();
}
private void SubscriberWork()
{
using (var subSocket = new SubscriberSocket())
{
subSocket.Connect("tcp://localhost:5557"); // [C] Port
subSocket.Subscribe("File"); // [C] Topic
while (_isRunning)
{
var topic = subSocket.ReceiveFrameString();
var fileBytes = subSocket.ReceiveFrameBytes();
SaveWavFile(fileBytes);
}
}
NetMQConfig.Cleanup();
}
private void SaveWavFile(byte[] fileBytes)
{
string filePath = "C:/Users/XXXXX/Desktop/0MQ Demo/Receiving Folder/test.wav"; // [C]
File.WriteAllBytes(filePath, fileBytes);
}
}
UI Handler
using System.Threading;
using NetMQ;
using NetMQ.Sockets;
using System.IO;
using UnityEngine;
using UnityEngine.UI;
public class UIHandler : MonoBehaviour
{
public Button startSubscriberButton;
public Button stopSubscriberButton;
public Button startPublisherButton;
public Button stopPublisherButton;
public Text pubStartStatus;
public Text pubStopStatus;
public Text subStartStatus;
public Text subStopStatus;
private Publisher _publisher;
private readonly Thread _publisherThread;
private readonly Thread _subscriberThread;
private bool _publisherRunning = false;
private bool _subscriberRunning = false;
public UIHandler()
{
// Initializing the threads
_publisherThread = new Thread(PublisherWork);
_subscriberThread = new Thread(SubscriberWork);
}
private void Start()
{
_publisher = new Publisher();
startPublisherButton.onClick.AddListener(StartPublisher);
stopPublisherButton.onClick.AddListener(StopPublisher);
startSubscriberButton.onClick.AddListener(StartSubscriber);
stopSubscriberButton.onClick.AddListener(StopSubscriber);
}
public void StartPublisher()
{
// Starting the publisher thread
_publisherThread.Start();
_publisherRunning = true;
pubStartStatus.text = "pubStartStatus: Started";
}
public void StopPublisher()
{
// Stopping the publisher thread
_publisherRunning = false;
_publisherThread.Join();
pubStopStatus.text = "pubStartStatus: Stopped";
}
public void StartSubscriber()
{
// Starting the subscriber thread
_subscriberThread.Start();
_subscriberRunning = true;
subStartStatus.text = "subStartStatus: Stopped";
}
public void StopSubscriber()
{
// Stopping the subscriber thread
_subscriberRunning = false;
_subscriberThread.Join();
subStopStatus.text = "subStartStatus: Stopped";
}
private void PublisherWork()
{
// Creating a publisher socket and binding it to the specified address
using (var pubSocket = new PublisherSocket())
{
pubSocket.Bind("tcp://*:5557");
// Reading the file bytes from the specified location
string filePath = "C:/Users/XXXXX/Desktop/0MQ Demo/Sending Folder/test.wav";
byte[] fileBytes = File.ReadAllBytes(filePath);
// Publishing the file bytes continuously
while (_publisherRunning)
{
pubSocket.SendMoreFrame("File").SendFrame(fileBytes);
Thread.Sleep(1000);
}
}
NetMQConfig.Cleanup();
}
private void SubscriberWork()
{
// Creating a subscriber socket and connecting it to the specified address
using (var subSocket = new SubscriberSocket())
{
subSocket.Connect("tcp://localhost:5557");
subSocket.Subscribe("File");
// Receiving and saving the file bytes continuously
while (_subscriberRunning)
{
var topic = subSocket.ReceiveFrameString();
var fileBytes = subSocket.ReceiveFrameBytes();
SaveWavFile(fileBytes);
}
}
NetMQConfig.Cleanup();
}
private void SaveWavFile(byte[] fileBytes)
{
// Saving the received file bytes to the specified location
string filePath = "C:/Users/XXXXX/Desktop/0MQ Demo/Receiving Folder/test.wav";
File.WriteAllBytes(filePath, fileBytes);
}
}