use actix_web::{web, App, HttpResponse, HttpServer, web::Bytes}; use redis::{Client, AsyncCommands}; use std::collections::HashMap; use std::env; use futures::StreamExt; use tokio::sync::broadcast; use actix_web::error::{ErrorUnauthorized, ErrorInternalServerError as ServerError}; use std::sync::{Arc, Mutex}; use tokio::task::JoinHandle; mod data; #[derive(Clone)] struct AppState { tasks: Arc>>>, redis: Client, } async fn connect_handler( token: web::Path, state: web::Data, ) -> Result { let listener_id = data::get_auth_id(&token).await.map_err(|e| { eprintln!("TOKEN check failed: {}", e); ErrorUnauthorized("Unauthorized") })?; let mut con = state.redis.get_async_connection().await.map_err(|e| { eprintln!("Failed to get async connection: {}", e); ServerError("Internal Server Error") })?; con.sadd::<&str, &i32, usize>("authors-online", &listener_id).await.map_err(|e| { eprintln!("Failed to add author to online list: {}", e); ServerError("Internal Server Error") })?; let chats: Vec = con.smembers::>(format!("chats_by_author/{}", listener_id)).await.map_err(|e| { eprintln!("Failed to get chats by author: {}", e); ServerError("Internal Server Error") })?; let (tx, mut rx) = broadcast::channel(100); let state_clone = state.clone(); let handle = tokio::spawn(async move { let conn = state_clone.redis.get_async_connection().await.unwrap(); let mut pubsub = conn.into_pubsub(); pubsub.subscribe("new_follower").await.unwrap(); println!("'new_follower' subscribed"); pubsub.subscribe("new_shout").await.unwrap(); println!("'new_shout' subscribed"); pubsub.subscribe("new_reaction").await.unwrap(); println!("'new_reaction' subscribed"); for chat_id in &chats { let channel_name = format!("chat:{}", chat_id); pubsub.subscribe(&channel_name).await.unwrap(); println!("'{}' subscribed", channel_name); } while let Some(msg) = pubsub.on_message().next().await { let payload: HashMap = msg.get_payload().unwrap(); if data::is_fitting(listener_id, payload.clone()).await.is_ok() { let _ = tx.send(serde_json::to_string(&payload).unwrap()); }; } }); state.tasks .lock() .unwrap() .insert(format!("{}", listener_id.clone()), handle); let server_event = rx.recv().await.map_err(|e| { eprintln!("Failed to receive server event: {}", e); ServerError("Internal Server Error") })?; let server_event_stream = futures::stream::once(async move { Ok::<_, actix_web::Error>(Bytes::from(server_event)) }); Ok(HttpResponse::Ok() .append_header(("content-type", "text/event-stream")) .streaming(server_event_stream)) } async fn disconnect_handler( token: web::Path, state: web::Data, ) -> Result { let listener_id = data::get_auth_id(&token).await.map_err(|e| { eprintln!("TOKEN check failed: {}", e); ErrorUnauthorized("Unauthorized") })?; if let Some(handle) = state.tasks.lock().unwrap().remove(&format!("{}", listener_id)) { handle.abort(); let mut con = state.redis.get_async_connection().await.map_err(|e| { eprintln!("Failed to get async connection: {}", e); ServerError("Internal Server Error") })?; con.srem::<&str, &i32, usize>("authors-online", &listener_id).await.map_err(|e| { eprintln!("Failed to remove author from online list: {}", e); ServerError("Internal Server Error") })?; } Ok(HttpResponse::Ok().finish()) } #[actix_web::main] async fn main() -> std::io::Result<()> { let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| String::from("redis://127.0.0.1/")); let client = redis::Client::open(redis_url.clone()).unwrap(); let tasks = Arc::new(Mutex::new(HashMap::new())); let state = AppState { tasks: tasks.clone(), redis: client.clone(), }; println!("Redis client initialized"); HttpServer::new(move || { println!("Webserver initialized"); App::new() .app_data(web::Data::new(state.clone())) .service( web::scope("") .route("/", web::get().to(connect_handler)) ) }) .bind("127.0.0.1:8080")? .run() .await }