use actix_web::error::{ErrorInternalServerError as ServerError, ErrorUnauthorized}; use actix_web::middleware::Logger; use actix_web::{web, web::Bytes, App, HttpRequest, HttpResponse, HttpServer}; use futures::StreamExt; use redis::{AsyncCommands, Client}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::env; use std::sync::{Arc, Mutex}; use tokio::sync::broadcast; use tokio::task::JoinHandle; mod data; #[derive(Clone)] struct AppState { tasks: Arc>>>, redis: Client, } #[derive(Serialize, Deserialize)] struct RedisMessageData { payload: HashMap, kind: String, } async fn connect_handler( req: HttpRequest, state: web::Data, ) -> Result { let token = match req.headers().get("Authorization") { Some(val) => val.to_str().unwrap_or("").split(" ").last().unwrap_or(""), None => return Err(ErrorUnauthorized("Unauthorized")), }; let listener_id = data::get_auth_id(&token).await.map_err(|e| { eprintln!("TOKEN check failed: {}", e); ErrorUnauthorized("Unauthorized") })?; let mut con = state.redis.get_async_connection().await.map_err(|e| { eprintln!("Failed to get async connection: {}", e); ServerError("Internal Server Error") })?; con.sadd::<&str, &i32, usize>("authors-online", &listener_id) .await .map_err(|e| { eprintln!("Failed to add author to online list: {}", e); ServerError("Internal Server Error") })?; let chats: Vec = con .smembers::>(format!("chats_by_author/{}", listener_id)) .await .map_err(|e| { eprintln!("Failed to get chats by author: {}", e); ServerError("Internal Server Error") })?; let (tx, mut rx) = broadcast::channel(100); let state_clone = state.clone(); let handle = tokio::spawn(async move { let conn = state_clone.redis.get_async_connection().await.unwrap(); let mut pubsub = conn.into_pubsub(); let followers_channel = format!("followers:{}", listener_id); pubsub.subscribe(followers_channel.clone()).await.unwrap(); println!("'{}' subscribed", followers_channel); pubsub.subscribe("new_shout").await.unwrap(); println!("'new_shout' subscribed"); pubsub.subscribe("new_reaction").await.unwrap(); println!("'new_reaction' subscribed"); for chat_id in &chats { let channel_name = format!("chat:{}", chat_id); pubsub.subscribe(&channel_name).await.unwrap(); println!("'{}' subscribed", channel_name); } while let Some(msg) = pubsub.on_message().next().await { let message_str: String = msg.get_payload().unwrap(); let message_data: RedisMessageData = serde_json::from_str(&message_str).unwrap(); if msg.get_channel_name().starts_with("chat:") || msg.get_channel_name().starts_with("followers:") || data::is_fitting( listener_id, message_data.kind.to_string(), message_data.payload, ) .await .is_ok() { let send_result = tx.send(message_str); if send_result.is_err() { // remove author from online list let _ = con .srem::<&str, &i32, usize>("authors-online", &listener_id) .await .map_err(|e| { eprintln!("Failed to remove author from online list: {}", e); ServerError("Internal Server Error") }); break; } }; } }); state .tasks .lock() .unwrap() .insert(format!("{}", listener_id.clone()), handle); let server_event = rx.recv().await.map_err(|e| { eprintln!("Failed to receive server event: {}", e); ServerError("Internal Server Error") })?; let server_event_stream = futures::stream::once(async move { Ok::<_, actix_web::Error>(Bytes::from(server_event)) }); Ok(HttpResponse::Ok() .append_header(("content-type", "text/event-stream")) .streaming(server_event_stream)) } #[actix_web::main] async fn main() -> std::io::Result<()> { let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| String::from("redis://127.0.0.1/")); let client = redis::Client::open(redis_url.clone()).unwrap(); let tasks = Arc::new(Mutex::new(HashMap::new())); let state = AppState { tasks: tasks.clone(), redis: client.clone(), }; println!("Starting..."); HttpServer::new(move || { App::new() .wrap(Logger::default()) .app_data(web::Data::new(state.clone())) .route("/", web::get().to(connect_handler)) }) .bind("0.0.0.0:8080")? .run() .await }