use actix_sse::Sse; use actix_web::{web, App, HttpServer, Responder}; use tokio::sync::broadcast::Receiver; use std::collections::HashMap; use std::sync::Mutex; use std::env; use reqwest::Client; use serde::{Deserialize, Serialize}; #[derive(Serialize)] struct ValidateJWTTokenInput { token_type: String, token: String, roles: Option>, } #[derive(Deserialize)] struct ValidateJWTTokenResponse { is_valid: bool, claims: HashMap, } #[derive(Deserialize)] struct ProfileResponse { data: ProfileData, } #[derive(Deserialize)] struct ProfileData { profile: Profile, } #[derive(Deserialize)] struct Profile { id: i32, } #[derive(Deserialize)] struct AuthorResponse { data: AuthorData, } #[derive(Deserialize)] struct AuthorData { author: Author, } #[derive(Deserialize)] struct Author { id: i32, } async fn get_author_id(user_id: i32) -> Result> { let client = Client::new(); let query = format!(r#"{{ author(user: {}) {{ id }} }}"#, user_id); let api_base = env::var("API_BASE").unwrap(); let response = client.post(api_base) .body(query) .send() .await?; let response_body: AuthorResponse = response.json().await?; Ok(response_body.data.author.id) } async fn get_user_id(token: &str) -> Result> { let client = Client::new(); let query = r#"{ profile { id } }"#; let authorizer_url = env::var("AUTHORIZER_URL").unwrap(); let response = client.post(authorizer_url) .bearer_auth(token) .body(query) .send() .await?; let response_body: ProfileResponse = response.json().await?; Ok(response_body.data.profile.id) } async fn sse_handler(receiver: web::Data>, token: web::Path) -> impl Responder { let user_id = match get_user_id(&token).await { Ok(id) => id, Err(e) => { eprintln!("Failed to validate token: {}", e); return actix_web::HttpResponse::Unauthorized().finish(); } }; let author_id = match get_author_id(user_id).await.unwrap(); let mut receivers = receiver.lock().unwrap(); let redis_url = env::var("REDIS_URL").unwrap(); let rx = receivers.entry(session_id.into_inner()).or_insert_with(|| { let (tx, rx) = broadcast::channel(100); let _handle = tokio::spawn(async move { let client = redis::Client::open(redis_url).unwrap(); let mut conn = client.get_async_connection().await.unwrap(); let mut pubsub = conn.into_pubsub(); // Subscribe to multiple channels pubsub.subscribe("new_follower").await.unwrap(); // follow to author pubsub.subscribe("new_reaction").await.unwrap(); // react on post pubsub.subscribe("new_shout").await.unwrap(); // post in subscribed topic, author or community pubsub.subscribe("new_approval").await.unwrap(); // post approved by community while let Some(msg) = pubsub.on_message().next().await { let payload: String = msg.get_payload().unwrap(); tx.send(payload).unwrap(); } }); rx }); Sse::new(rx.subscribe()) } #[actix_web::main] async fn main() -> std::io::Result<()> { let receivers: web::Data>>> = web::Data::new(Mutex::new(HashMap::new())); HttpServer::new(move || { App::new() .app_data(receivers.clone()) .route("/sse/{token}", web::get().to(sse_handler)) }) .bind("127.0.0.1:8080")? .run() .await }