use actix_web::error::{ErrorInternalServerError as ServerError, ErrorUnauthorized}; use actix_web::middleware::Logger; use actix_web::{web, web::Bytes, App, HttpRequest, HttpResponse, HttpServer}; use futures::StreamExt; use redis::{AsyncCommands, Client}; use serde::{Deserialize, Serialize}; use serde_json::Value; use uuid::Uuid; use std::collections::HashMap; use std::env; use std::sync::{Arc, Mutex}; use tokio::sync::broadcast; use tokio::task::JoinHandle; mod data; #[derive(Clone)] struct AppState { tasks: Arc>>>, redis: Client, } #[derive(Serialize, Deserialize, Clone, Debug)] struct RedisMessageData { payload: HashMap, action: String } #[derive(Serialize, Deserialize, Clone, Debug, Display)] pub struct SSEMessageData { payload: HashMap, action: String, entity: String } async fn connect_handler( req: HttpRequest, state: web::Data, ) -> Result { let token = match req.headers().get("Authorization") { Some(val) => val.to_str().unwrap_or("").split(" ").last().unwrap_or(""), None => match req.match_info().get("token") { Some(val) => val, None => match req.query_string().split('=').last() { Some(val) => val, None => return Err(ErrorUnauthorized("Unauthorized")), }, }, }; let listener_id = data::get_auth_id(&token).await.map_err(|e| { eprintln!("TOKEN check failed: {}", e); ErrorUnauthorized("Unauthorized") })?; let mut con = state.redis.get_async_connection().await.map_err(|e| { eprintln!("Failed to get async connection: {}", e); ServerError("Internal Server Error") })?; con.sadd::<&str, &i32, usize>("authors-online", &listener_id) .await .map_err(|e| { eprintln!("Failed to add author to online list: {}", e); ServerError("Internal Server Error") })?; let chats: Vec = con .smembers::>(format!("chats_by_author/{}", listener_id)) .await .map_err(|e| { eprintln!("Failed to get chats by author: {}", e); ServerError("Internal Server Error") })?; let (tx, rx) = broadcast::channel(100); let state_clone = state.clone(); let handle = tokio::spawn(async move { let conn = state_clone.redis.get_async_connection().await.unwrap(); let mut pubsub = conn.into_pubsub(); let followers_channel = format!("follower:{}", listener_id); pubsub.subscribe(followers_channel.clone()).await.unwrap(); println!("'{}' pubsub subscribed", followers_channel); pubsub.subscribe("shout").await.unwrap(); println!("'shout' pubsub subscribed"); pubsub.subscribe("reaction").await.unwrap(); println!("'reaction' pubsub subscribed"); for chat_id in &chats { let channel_name = format!("chat:{}", chat_id); pubsub.subscribe(&channel_name).await.unwrap(); println!("'{}' subscribed", channel_name); } while let Some(msg) = pubsub.on_message().next().await { let redis_message_str: String = msg.get_payload().unwrap(); let redis_message_data: RedisMessageData = serde_json::from_str(&redis_message_str).unwrap(); let prepared_message_data = SSEMessageData { payload: redis_message_data.payload, action: redis_message_data.action, entity: msg.get_channel_name() .to_owned() .split(":") .next() .unwrap_or("") .to_string() }; if data::is_fitting( listener_id, prepared_message_data.clone(), ) .await .is_ok() { let prepared_message_str = serde_json::to_string(&prepared_message_data).unwrap(); let send_result = tx.send(prepared_message_str.clone()); if send_result.is_err() { // remove author from online list let _ = con .srem::<&str, &i32, usize>("authors-online", &listener_id) .await .map_err(|e| { eprintln!("Failed to remove author from online list: {}", e); ServerError("Internal Server Error") }); break; } else { println!("[handler] message handled {}", prepared_message_str); } }; } }); state .tasks .lock() .unwrap() .insert(format!("{}", listener_id.clone()), handle); let server_event_stream = futures::stream::unfold(rx, |mut rx| async { let result = rx.recv().await; match result { Ok(server_event) => { // Generate a random UUID as the event ID let event_id = format!("{}", Uuid::new_v4()); let formatted_server_event = format!( "id: {}\ndata: {}\n\n", event_id, server_event ); Some((Ok::<_, actix_web::Error>(Bytes::from(formatted_server_event)), rx)) }, Err(_) => None, } }); Ok(HttpResponse::Ok() .append_header(("content-type", "text/event-stream")) .streaming(server_event_stream)) } #[actix_web::main] async fn main() -> std::io::Result<()> { let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| String::from("redis://127.0.0.1/")); let client = redis::Client::open(redis_url.clone()).unwrap(); let tasks = Arc::new(Mutex::new(HashMap::new())); let state = AppState { tasks: tasks.clone(), redis: client.clone(), }; println!("Starting..."); HttpServer::new(move || { App::new() .wrap(Logger::default()) .app_data(web::Data::new(state.clone())) .route("/", web::get().to(connect_handler)) .route("/{token}", web::get().to(connect_handler)) }) .bind("0.0.0.0:8080")? .run() .await }