quoter/src/main.rs

148 lines
5.2 KiB
Rust
Raw Normal View History

2023-10-16 14:44:19 +00:00
use actix_web::error::{ErrorInternalServerError as ServerError, ErrorUnauthorized};
2023-10-11 15:19:56 +00:00
use actix_web::middleware::Logger;
2023-10-16 14:44:19 +00:00
use actix_web::{web, web::Bytes, App, HttpRequest, HttpResponse, HttpServer};
use futures::StreamExt;
use redis::{AsyncCommands, Client};
2023-10-16 13:22:54 +00:00
use serde::{Deserialize, Serialize};
2023-10-16 16:40:45 +00:00
use serde_json::Value;
2023-09-27 23:08:48 +00:00
use std::collections::HashMap;
use std::env;
2023-10-06 14:57:54 +00:00
use std::sync::{Arc, Mutex};
2023-10-16 14:44:19 +00:00
use tokio::sync::broadcast;
2023-10-06 14:57:54 +00:00
use tokio::task::JoinHandle;
2023-10-03 10:43:21 +00:00
mod data;
2023-10-02 13:47:22 +00:00
2023-10-06 14:57:54 +00:00
/// State shared by every request handler.
///
/// Cloning is cheap with respect to `tasks`: the map lives behind an `Arc`,
/// so all clones observe the same set of task handles.
#[derive(Clone)]
struct AppState {
    /// Handles of the spawned per-listener subscription tasks, keyed by the
    /// listener id rendered as a string.
    tasks: Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
    /// Redis client used to open a fresh async connection whenever one is needed.
    redis: Client,
}
2023-10-17 14:57:10 +00:00
#[derive(Serialize, Deserialize, Clone, Debug)]
2023-10-16 13:22:54 +00:00
struct RedisMessageData {
2023-10-16 16:40:45 +00:00
payload: HashMap<String, Value>,
2023-10-16 14:44:19 +00:00
kind: String,
2023-10-16 13:22:54 +00:00
}
2023-10-06 14:57:54 +00:00
async fn connect_handler(
2023-10-11 20:52:33 +00:00
req: HttpRequest,
2023-10-06 14:57:54 +00:00
state: web::Data<AppState>,
2023-10-03 13:29:31 +00:00
) -> Result<HttpResponse, actix_web::Error> {
2023-10-11 20:52:33 +00:00
let token = match req.headers().get("Authorization") {
2023-10-16 14:44:19 +00:00
Some(val) => val.to_str().unwrap_or("").split(" ").last().unwrap_or(""),
2023-10-11 20:52:33 +00:00
None => return Err(ErrorUnauthorized("Unauthorized")),
};
2023-10-03 13:29:31 +00:00
let listener_id = data::get_auth_id(&token).await.map_err(|e| {
eprintln!("TOKEN check failed: {}", e);
ErrorUnauthorized("Unauthorized")
})?;
2023-10-03 10:43:21 +00:00
2023-10-06 14:57:54 +00:00
let mut con = state.redis.get_async_connection().await.map_err(|e| {
2023-10-03 13:29:31 +00:00
eprintln!("Failed to get async connection: {}", e);
ServerError("Internal Server Error")
})?;
2023-10-02 18:55:10 +00:00
2023-10-16 14:44:19 +00:00
con.sadd::<&str, &i32, usize>("authors-online", &listener_id)
.await
.map_err(|e| {
eprintln!("Failed to add author to online list: {}", e);
ServerError("Internal Server Error")
})?;
let chats: Vec<String> = con
.smembers::<String, Vec<String>>(format!("chats_by_author/{}", listener_id))
.await
.map_err(|e| {
eprintln!("Failed to get chats by author: {}", e);
ServerError("Internal Server Error")
})?;
2023-09-28 10:12:07 +00:00
2023-10-03 10:43:21 +00:00
let (tx, mut rx) = broadcast::channel(100);
2023-10-06 15:07:24 +00:00
let state_clone = state.clone();
2023-10-06 14:57:54 +00:00
let handle = tokio::spawn(async move {
2023-10-06 15:07:24 +00:00
let conn = state_clone.redis.get_async_connection().await.unwrap();
2023-10-03 10:43:21 +00:00
let mut pubsub = conn.into_pubsub();
2023-10-16 14:44:19 +00:00
let followers_channel = format!("followers:{}", listener_id);
2023-10-16 13:22:54 +00:00
pubsub.subscribe(followers_channel.clone()).await.unwrap();
println!("'{}' subscribed", followers_channel);
2023-10-03 13:29:31 +00:00
pubsub.subscribe("new_shout").await.unwrap();
2023-10-06 12:09:31 +00:00
println!("'new_shout' subscribed");
2023-10-03 13:29:31 +00:00
pubsub.subscribe("new_reaction").await.unwrap();
2023-10-06 12:09:31 +00:00
println!("'new_reaction' subscribed");
2023-10-03 13:29:31 +00:00
2023-10-03 10:43:21 +00:00
for chat_id in &chats {
let channel_name = format!("chat:{}", chat_id);
2023-10-03 13:29:31 +00:00
pubsub.subscribe(&channel_name).await.unwrap();
2023-10-06 12:10:17 +00:00
println!("'{}' subscribed", channel_name);
2023-10-03 05:02:11 +00:00
}
2023-10-03 10:43:21 +00:00
while let Some(msg) = pubsub.on_message().next().await {
2023-10-16 13:22:54 +00:00
let message_str: String = msg.get_payload().unwrap();
let message_data: RedisMessageData = serde_json::from_str(&message_str).unwrap();
2023-10-17 14:51:36 +00:00
2023-10-16 14:44:19 +00:00
if msg.get_channel_name().starts_with("chat:")
|| msg.get_channel_name().starts_with("followers:")
|| data::is_fitting(
listener_id,
message_data.kind.to_string(),
message_data.payload,
)
.await
.is_ok()
{
2023-10-17 11:03:41 +00:00
let send_result = tx.send(message_str.clone());
2023-10-16 13:22:54 +00:00
if send_result.is_err() {
2023-10-16 14:44:19 +00:00
// remove author from online list
let _ = con
.srem::<&str, &i32, usize>("authors-online", &listener_id)
.await
.map_err(|e| {
eprintln!("Failed to remove author from online list: {}", e);
ServerError("Internal Server Error")
});
2023-10-16 13:22:54 +00:00
break;
2023-10-17 11:01:29 +00:00
} else {
println!("[handler] message handled {}", message_str);
2023-10-16 13:22:54 +00:00
}
2023-10-03 13:29:31 +00:00
};
2023-10-03 04:52:48 +00:00
}
2023-10-03 10:43:21 +00:00
});
2023-10-16 14:44:19 +00:00
state
.tasks
2023-10-06 14:57:54 +00:00
.lock()
.unwrap()
.insert(format!("{}", listener_id.clone()), handle);
2023-10-03 04:52:48 +00:00
2023-10-16 20:32:28 +00:00
let server_event_stream = futures::stream::unfold(rx, |mut rx| async {
let result = rx.recv().await;
match result {
Ok(server_event) => Some((Ok::<_, actix_web::Error>(Bytes::from(server_event)), rx)),
Err(_) => None,
}
});
2023-10-02 15:27:55 +00:00
2023-10-03 13:29:31 +00:00
Ok(HttpResponse::Ok()
2023-10-02 11:24:45 +00:00
.append_header(("content-type", "text/event-stream"))
2023-10-03 13:29:31 +00:00
.streaming(server_event_stream))
2023-09-27 23:08:48 +00:00
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
2023-10-03 13:29:31 +00:00
let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| String::from("redis://127.0.0.1/"));
2023-10-06 12:10:54 +00:00
let client = redis::Client::open(redis_url.clone()).unwrap();
2023-10-06 14:57:54 +00:00
let tasks = Arc::new(Mutex::new(HashMap::new()));
let state = AppState {
tasks: tasks.clone(),
redis: client.clone(),
};
2023-10-11 20:52:33 +00:00
println!("Starting...");
2023-09-27 23:08:48 +00:00
HttpServer::new(move || {
App::new()
2023-10-11 20:03:12 +00:00
.wrap(Logger::default())
2023-10-06 14:57:54 +00:00
.app_data(web::Data::new(state.clone()))
2023-10-13 20:58:06 +00:00
.route("/", web::get().to(connect_handler))
2023-09-27 23:08:48 +00:00
})
2023-10-12 12:02:47 +00:00
.bind("0.0.0.0:8080")?
2023-09-27 23:08:48 +00:00
.run()
.await
2023-10-16 14:44:19 +00:00
}