quoter/src/main.rs
2023-10-02 15:35:16 +03:00

126 lines
3.7 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

use actix_web::{web, App, HttpResponse, HttpServer, Responder};
use redis::{Client, AsyncCommands};
use reqwest::Client as HTTPClient;
use serde::{Serialize, Deserialize};
use serde_json::Value;
use std::collections::HashMap;
use std::env;
use std::error::Error;
use futures::{StreamExt, FutureExt};
use tokio::sync::broadcast::{self, Receiver};
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
enum PayloadKind {
NewMessage,
NewFollower,
NewShout,
NewApproval,
NewComment,
NewRate
}
#[derive(Debug, Serialize, Deserialize)]
struct Payload {
chat_id: Option<String>,
shout_id: Option<i32>,
author_id: Option<i32>,
topic_id: Option<i32>,
reaction_id: Option<i32>,
community_id: Option<i32>,
kind: PayloadKind,
body: String,
}
async fn get_auth_id(token: &str) -> Result<i32, Box<dyn Error>> {
let api_base = env::var("API_BASE")?;
let gql = match api_base.contains("v2") {
true => r#"mutation { getSession { user { id } } }"#, // v2
_ => r#"query { sessiom { user { id } } }"# // authorizer
};
let client = HTTPClient::new();
let response = client
.post(api_base)
.bearer_auth(token) // NOTE: auth token is here
.body(gql)
.send()
.await?;
let response_body: Value = response.json().await?;
let id = response_body["data"]["getSession"]["user"]["id"]
.as_i64()
.ok_or("Failed to get user id by token")? as i32;
Ok(id)
}
async fn sse_handler(
token: web::Path<String>,
rx: web::Data<Receiver<String>>,
redis: web::Data<Client>,
) -> impl Responder {
let author_id = match get_auth_id(&token).await {
Ok(id) => id,
Err(e) => {
eprintln!("Не удалось проверить токен: {}", e);
return HttpResponse::Unauthorized().finish();
}
};
let mut con = redis.get_async_connection().await.unwrap();
let _: () = con
.sadd("authors-online", &author_id)
.await
.unwrap();
let chats: Vec<String> = con
.smembers(format!("chats_by_author/{}", author_id))
.await
.unwrap();
let mut pubsub = con.into_pubsub();
for chat_id in chats {
pubsub.subscribe(format!("message:{}", chat_id)).await.unwrap();
}
let mut rx = rx.get_ref().subscribe();
let server_event = rx.recv().await.unwrap();
let _: () = con
.srem("authors-online", &author_id)
.await
.unwrap();
HttpResponse::Ok()
.append_header(("content-type", "text/event-stream"))
.streaming(server_event)
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let (tx, mut rx) = broadcast::channel(100);
let redis_url = env::var("REDIS_URL").unwrap();
let client = redis::Client::open(redis_url).unwrap();
let _handle = tokio::spawn(async move {
let mut conn = client.get_async_connection().await.unwrap();
let mut pubsub = conn.into_pubsub();
pubsub.subscribe("new_follower").await.unwrap();
pubsub.subscribe("new_shout").await.unwrap();
pubsub.subscribe("new_reaction").await.unwrap();
while let Some(msg) = pubsub.on_message().next().await {
let payload: HashMap<String, String> = msg.get_payload().unwrap();
tx.send(serde_json::to_string(&payload).unwrap()).unwrap();
}
});
HttpServer::new(move || {
let rx = tx.subscribe();
App::new()
.app_data(web::Data::new(rx))
.app_data(web::Data::new(client.clone()))
.route("/aware/{token}", web::get().to(sse_handler))
})
.bind("127.0.0.1:8080")?
.run()
.await
}