diff --git a/Cargo.toml b/Cargo.toml index d2b41a1..0331c58 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "discoursio-presense" -version = "0.2.0" +version = "0.2.8" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/src/main.rs b/src/main.rs index 4f28503..b440a03 100644 --- a/src/main.rs +++ b/src/main.rs @@ -54,22 +54,10 @@ async fn get_auth_id(token: &str) -> Result> { Ok(id) } - -async fn create_first_chat(author_id: i32) -> Vec { - let chat_id = Uuid::new_v4().to_string(); - let members = vec![author_id.to_string(), "1".to_string()]; - let timestamp = Utc::now().timestamp(); - - let chat = serde_json::json!({ - "id": chat_id, - "admins": members, - "members": members.clone(), - "title": "", - "createdBy": author_id, - "createdAt": timestamp, - "updatedAt": timestamp, - }); - +// Функция создает первый чат для пользователя. В чате два участника: пользователь и автор с идентификатором 1. +// Данные чата сохраняются в Redis. +async fn create_first_chat(author_id: i32, con: &mut redis::aio::Connection) -> Result, Box> { + // ... let _: () = redis::pipe() .atomic() .cmd("SADD") @@ -80,14 +68,13 @@ async fn create_first_chat(author_id: i32) -> Vec { .ignore() .set(format!("chats/{}/next_message_id", chat_id), "0") .ignore() - .query_async(&mut con) - .await - .unwrap(); - - vec![chat, ] + .query_async(con) + .await?; + Ok(vec![chat.to_string()]) } - +// Функция проверяет токен пользователя, получает список чатов пользователя из Redis +// и подписывается на события этих чатов. Если у пользователя нет чатов, создается первый чат. async fn sse_handler( token: web::Path, mut rx: web::Data>, @@ -101,35 +88,72 @@ async fn sse_handler( } }; - let mut con = redis.get_async_connection().await.unwrap(); - let _: () = con - .sadd("authors-online", &author_id) - .await - .unwrap(); + let mut con = match redis.get_async_connection().await { + Ok(con) => con, + Err(e) => { + eprintln!("Failed to get async connection: {}", e); + return HttpResponse::InternalServerError().finish(); + } + }; + + let _: () = match con.sadd("authors-online", &author_id).await { + Ok(_) => (), + Err(e) => { + eprintln!("Failed to add author to online list: {}", e); + return HttpResponse::InternalServerError().finish(); + } + }; let chats: Vec = match con.smembers(format!("chats_by_author/{}", author_id)).await { Ok(chats) => { if chats.is_empty() { - create_first_chat(author_id).await + match create_first_chat(author_id, &mut con).await { + Ok(chat) => chat, + Err(e) => { + eprintln!("Failed to create first chat: {}", e); + return HttpResponse::InternalServerError().finish(); + } + } } else { chats } }, - Err(_) => create_first_chat(author_id).await + Err(e) => { + eprintln!("Failed to get chats by author: {}", e); + match create_first_chat(author_id, &mut con).await { + Ok(chat) => chat, + Err(e) => { + eprintln!("Failed to create first chat: {}", e); + return HttpResponse::InternalServerError().finish(); + } + } + } }; let mut pubsub = con.into_pubsub(); for chat_id in chats { - pubsub.subscribe(format!("chat:{}", chat_id)).await.unwrap(); + if let Err(e) = pubsub.subscribe(format!("chat:{}", chat_id)).await { + eprintln!("Failed to subscribe to chat: {}", e); + return HttpResponse::InternalServerError().finish(); + } } - let _: () = con - .srem("authors-online", &author_id) - .await - .unwrap(); + let _: () = match con.srem("authors-online", &author_id).await { + Ok(_) => (), + Err(e) => { + eprintln!("Failed to remove author from online list: {}", e); + return HttpResponse::InternalServerError().finish(); + } + }; // Later in the sse_handler function - let server_event = rx.recv().await.unwrap(); + let server_event = match rx.recv().await { + Ok(event) => event, + Err(e) => { + eprintln!("Failed to receive server event: {}", e); + return HttpResponse::InternalServerError().finish(); + } + }; let server_event_stream = futures::stream::once(async move { Ok::<_, actix_web::Error>(Bytes::from(server_event)) }); HttpResponse::Ok() @@ -140,19 +164,20 @@ async fn sse_handler( #[actix_web::main] async fn main() -> std::io::Result<()> { let (tx, _rx) = broadcast::channel(100); - let redis_url = env::var("REDIS_URL").unwrap(); - let client = redis::Client::open(redis_url).unwrap(); + let redis_url = env::var("REDIS_URL").expect("REDIS_URL must be set"); + let client = redis::Client::open(redis_url).expect("Failed to open Redis client"); + let _handle = tokio::spawn(async move { - let mut conn = client.get_async_connection().await.unwrap(); + let mut conn = client.get_async_connection().await.expect("Failed to get async connection"); let mut pubsub = conn.into_pubsub(); - pubsub.subscribe("new_follower").await.unwrap(); - pubsub.subscribe("new_shout").await.unwrap(); - pubsub.subscribe("new_reaction").await.unwrap(); + pubsub.subscribe("new_follower").await.expect("Failed to subscribe to new_follower"); + pubsub.subscribe("new_shout").await.expect("Failed to subscribe to new_shout"); + pubsub.subscribe("new_reaction").await.expect("Failed to subscribe to new_reaction"); while let Some(msg) = pubsub.on_message().next().await { - let payload: HashMap = msg.get_payload().unwrap(); - tx.send(serde_json::to_string(&payload).unwrap()).unwrap(); + let payload: HashMap = msg.get_payload().expect("Failed to get payload"); + tx.send(serde_json::to_string(&payload).expect("Failed to serialize payload")).expect("Failed to send payload"); } }); @@ -161,7 +186,7 @@ async fn main() -> std::io::Result<()> { App::new() .app_data(web::Data::new(rx)) .app_data(web::Data::new(client.clone())) - .route("/aware/{token}", web::get().to(sse_handler)) + .route("/presence/{token}", web::get().to(sse_handler)) }) .bind("127.0.0.1:8080")? .run()