diff --git a/src/data.rs b/src/data.rs new file mode 100644 index 0000000..7307cc7 --- /dev/null +++ b/src/data.rs @@ -0,0 +1,83 @@ + +use reqwest::Client as HTTPClient; +use serde::{Serialize, Deserialize}; +use serde_json::Value; +use std::error::Error; +use std::env; +use uuid::Uuid; +use chrono::Utc; + + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +enum PayloadKind { + NewMessage, + NewFollower, + NewShout, + NewApproval, + NewComment, + NewRate, +} + +#[derive(Debug, Serialize, Deserialize)] +struct Payload { + chat_id: Option, + shout_id: Option, + author_id: Option, + topic_id: Option, + reaction_id: Option, + community_id: Option, + kind: PayloadKind, + body: String, +} + +pub async fn get_auth_id(token: &str) -> Result> { + let api_base = env::var("API_BASE")?; + let gql = match api_base.contains("v2") { + true => r#"mutation { getSession { user { id } } }"#, // v2 + _ => r#"query { sessiom { user { id } } }"# // authorizer + }; + let client = HTTPClient::new(); + let response = client + .post(api_base) + .bearer_auth(token) // NOTE: auth token is here + .body(gql) + .send() + .await?; + let response_body: Value = response.json().await?; + let id = response_body["data"]["getSession"]["user"]["id"] + .as_i64() + .ok_or("Failed to get user id by token")? as i32; + Ok(id) +} + +pub async fn create_first_chat(author_id: i32, con: &mut redis::aio::Connection) -> Result, Box> { + let chat_id = Uuid::new_v4().to_string(); + let members = vec![author_id.to_string(), "1".to_string()]; + let timestamp = Utc::now().timestamp(); + + let chat = serde_json::json!({ + "id": chat_id.clone(), + "admins": members.clone(), + "members": members.clone(), + "title": "", + "createdBy": author_id, + "createdAt": timestamp, + "updatedAt": timestamp, + }); + + let _: () = redis::pipe() + .atomic() + .cmd("SADD") + .arg(format!("chats_by_author/{}", author_id)) + .arg(&chat_id) + .ignore() + .set(format!("chats/{}", chat_id), chat.to_string()) + .ignore() + .set(format!("chats/{}/next_message_id", chat_id), "0") + .ignore() + .query_async(con) + .await?; + + Ok(vec![chat_id]) +} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 1133500..1886e23 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,96 +1,18 @@ use actix_web::{web, App, HttpResponse, HttpServer, Responder, web::Bytes}; use redis::{Client, AsyncCommands}; -use reqwest::Client as HTTPClient; -use serde::{Serialize, Deserialize}; -use serde_json::Value; use std::collections::HashMap; use std::env; -use std::error::Error; use futures::StreamExt; use tokio::sync::broadcast::{self, Receiver}; -use uuid::Uuid; -use chrono::Utc; -#[derive(Debug, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -enum PayloadKind { - NewMessage, - NewFollower, - NewShout, - NewApproval, - NewComment, - NewRate -} - -#[derive(Debug, Serialize, Deserialize)] -struct Payload { - chat_id: Option, - shout_id: Option, - author_id: Option, - topic_id: Option, - reaction_id: Option, - community_id: Option, - kind: PayloadKind, - body: String, -} - -async fn get_auth_id(token: &str) -> Result> { - let api_base = env::var("API_BASE")?; - let gql = match api_base.contains("v2") { - true => r#"mutation { getSession { user { id } } }"#, // v2 - _ => r#"query { sessiom { user { id } } }"# // authorizer - }; - let client = HTTPClient::new(); - let response = client - .post(api_base) - .bearer_auth(token) // NOTE: auth token is here - .body(gql) - .send() - .await?; - let response_body: Value = response.json().await?; - let id = response_body["data"]["getSession"]["user"]["id"] - .as_i64() - .ok_or("Failed to get user id by token")? as i32; - Ok(id) -} - -async fn create_first_chat(author_id: i32, con: &mut redis::aio::Connection) -> Result, Box> { - let chat_id = Uuid::new_v4().to_string(); - let members = vec![author_id.to_string(), "1".to_string()]; - let timestamp = Utc::now().timestamp(); - - let chat = serde_json::json!({ - "id": chat_id.clone(), - "admins": members.clone(), - "members": members.clone(), - "title": "", - "createdBy": author_id, - "createdAt": timestamp, - "updatedAt": timestamp, - }); - - let _: () = redis::pipe() - .atomic() - .cmd("SADD") - .arg(format!("chats_by_author/{}", author_id)) - .arg(&chat_id) - .ignore() - .set(format!("chats/{}", chat_id), chat.to_string()) - .ignore() - .set(format!("chats/{}/next_message_id", chat_id), "0") - .ignore() - .query_async(con) - .await?; - - Ok(vec![chat_id]) -} +pub mod data; async fn sse_handler( token: web::Path, rx: web::Data>, redis: web::Data, ) -> impl Responder { - let author_id = match get_auth_id(&token).await { + let author_id = match data::get_auth_id(&token).await { Ok(id) => id, Err(e) => { eprintln!("TOKEN check failed: {}", e); @@ -117,7 +39,7 @@ async fn sse_handler( let chats: Vec = match con.smembers::>(format!("chats_by_author/{}", author_id)).await { Ok(chats) => { if chats.is_empty() { - match create_first_chat(author_id, &mut con).await { + match data::create_first_chat(author_id, &mut con).await { Ok(chat) => chat, Err(e) => { eprintln!("Failed to create first chat: {}", e); @@ -130,7 +52,7 @@ async fn sse_handler( }, Err(e) => { eprintln!("Failed to get chats by author: {}", e); - match create_first_chat(author_id, &mut con).await { + match data::create_first_chat(author_id, &mut con).await { Ok(chat) => chat, Err(e) => { eprintln!("Failed to create first chat: {}", e); @@ -164,7 +86,10 @@ async fn sse_handler( } }; - let server_event = match rx.as_ref().clone().recv().await { + // FIXME: cannot borrow data in an `Arc` as mutable + // trait `DerefMut` is required to modify through a dereference, + // but it is not implemented for `Arc>` + let server_event = match rx.recv().await { Ok(event) => event, Err(e) => { eprintln!("Failed to receive server event: {}", e);