diff --git a/src/main.rs b/src/main.rs index 0de1795..ca2e00a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -81,15 +81,8 @@ async fn sse_handler( pubsub.subscribe(format!("message:{}", chat_id)).await.unwrap(); } - let server_event = rx.get_ref().into_stream().map(|result| { - match result { - Ok(payload) => { - let payload: Payload = serde_json::from_str(&payload).unwrap(); - Ok::<_, actix_web::Error>(web::Bytes::from(format!("data: {:?}\n\n", payload.chat_id))) - } - Err(_) => Err(actix_web::Error::from("An error occurred")), - } - }); + let mut rx = rx.get_ref().subscribe(); + let server_event = rx.recv().await.unwrap(); let _: () = con .srem("authors-online", &author_id) @@ -114,7 +107,7 @@ async fn main() -> std::io::Result<()> { pubsub.subscribe("new_shout").await.unwrap(); pubsub.subscribe("new_reaction").await.unwrap(); - while let Some(msg) = pubsub.on_message().await { + while let Some(msg) = pubsub.on_message().next().await { let payload: HashMap = msg.get_payload().unwrap(); tx.send(serde_json::to_string(&payload).unwrap()).unwrap(); } @@ -122,7 +115,7 @@ async fn main() -> std::io::Result<()> { HttpServer::new(move || { App::new() - .app_data(web::Data::new(rx.clone())) + .app_data(web::Data::new(rx.subscribe())) .app_data(web::Data::new(client.clone())) .route("/aware/{token}", web::get().to(sse_handler)) })