Getting two messages per publish when subscribing over an async connection with redis-rs

38 views Asked by At
use chrono::Local;
use futures_util::StreamExt as _;
use serde_json::{json, Value};
use socketioxide::extract::{Data, SocketRef};
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::Duration;
use tracing::{error, info};

use crate::redis::Redis;

/// Payload a client sends with the `send-message` event.
///
/// `handle_send_message` deserializes the incoming JSON value into this type.
#[derive(serde::Deserialize, Debug)]
pub struct Message {
    /// Room the message is addressed to; read from the JSON key `roomId`.
    #[serde(rename = "roomId")]
    room_id: String,
    /// Message text supplied by the client; read from the JSON key `message`.
    message: String,
}

/// Groups the Socket.IO event wiring for a socket.
pub struct SocketHandlers;

impl SocketHandlers {
    /// Registers the `join-room` and `send-message` handlers on `socket`, then
    /// starts the Redis subscription that forwards published messages.
    ///
    /// Each handler invocation receives its own `Arc` handle to the shared
    /// `Redis` value. A failure to start the subscription is logged, not
    /// returned.
    pub async fn handle_sockets(socket: SocketRef, redis: Arc<Mutex<Redis>>) {
        let join_redis = Arc::clone(&redis);
        socket.on(
            "join-room",
            move |socket: SocketRef, Data::<String>(room)| {
                handle_join_room(socket, room, Arc::clone(&join_redis))
            },
        );

        let send_redis = Arc::clone(&redis);
        socket.on(
            "send-message",
            move |socket: SocketRef, Data::<Value>(message)| {
                handle_send_message(socket, message, Arc::clone(&send_redis))
            },
        );

        // The last use of `redis` can take ownership of the original handle.
        let started = redis_subscription_handler(redis, socket, Duration::from_secs(1)).await;
        if let Err(err) = started {
            error!("Error in subscription handler: {}", err);
        }
    }
}

async fn handle_join_room(socket: SocketRef, room: String, redis: Arc<Mutex<Redis>>) {
    info!("Joining room: {:?}", room);

    let _ = socket.leave_all();
    if let Err(e) = socket.join(room.clone()) {
        error!("Error joining room: {:?}", e);
    }
    let message = json!({
        "room_id": room,
        "message": format!("{} joined the room", socket.id),
        "sender": "Server",
        "time": Local::now().to_string()
    });

    let str_msg = match serde_json::to_string(&message) {
        Ok(v) => v,
        Err(e) => {
            error!("Error parsing message: {:?}", e);
            return;
        }
    };

    let redis_guard = redis.lock().await;
    if let Err(e) = redis_guard.publish_message("Messages", &str_msg).await {
        error!("Error publishing message to Redis: {:?}", e);
    }
}

async fn handle_send_message(socket: SocketRef, message: Value, redis: Arc<Mutex<Redis>>) {
    info!("Message: {:?}", message);

    //parse message with serde_json
    let parsed_message: Message = match serde_json::from_value::<Message>(message.clone()) {
        Ok(v) => v,
        Err(e) => {
            error!("Error parsing message: {:?}", e);
            return;
        }
    };

    let message = json!({
        "room_id": parsed_message.room_id,
        "message": parsed_message.message,
        "sender": socket.id,
        "time": Local::now().to_string()
    });

    let str_msg = match serde_json::to_string(&message) {
        Ok(v) => v,
        Err(e) => {
            error!("Error parsing message: {:?}", e);
            return;
        }
    };

    let redis_guard = redis.lock().await;
    if let Err(e) = redis_guard.publish_message("Messages", &str_msg).await {
        error!("Error publishing message to Redis: {:?}", e);
    }
}

pub async fn redis_subscription_handler(
    redis: Arc<Mutex<Redis>>,
    socket: SocketRef,
    delay: Duration,
) -> Result<(), Box<dyn std::error::Error>> {
    let redis_guard = redis.lock().await;
    let mut pubsub = redis_guard
        .client
        .get_tokio_connection()
        .await
        .unwrap()
        .into_pubsub();
    let _ = pubsub.subscribe("Messages").await;

    // Spawn an asynchronous task
    tokio::spawn(async move {
        // Inside the spawned task, use a loop to continuously process messages
        while let Some(msg) = pubsub.on_message().next().await {
            // Parse the payload
            let payload: Value = match serde_json::from_str(&msg.get_payload::<String>().unwrap()) {
                Ok(payload) => payload,
                Err(err) => {
                    error!("Error parsing JSON payload: {}", err);
                    continue; // Skip to the next iteration if parsing fails
                }
            };
            info!("Received message: {:?}", payload);

            // Extract room_id from payload
            let room_id = match payload["room_id"].as_str() {
                Some(room_id) => room_id.to_string(),
                None => {
                    error!("Error: 'room_id' field missing in payload");
                    continue; // Skip to the next iteration if 'room_id' is missing
                }
            };

            // Emit the message to the socket
            socket.within(room_id).emit("message", payload).unwrap();

            // Introduce a delay between processing each message
            tokio::time::sleep(delay).await;
        }
    });

    Ok(())
}

This is the full code for the Redis subscription task.

The `redis_subscription_handler` function handles the subscription and emits the messages. The problem is that whenever a message arrives on the subscribed channel, it is emitted twice: either two messages are sent, or the loop in the spawned task runs twice. I can't work out what is going wrong.

I have tried several different things, but none of them worked. I am also open to a workaround that uses a single connection for both publishing and subscribing.

You can find the full code on github.

0

There are 0 answers