2023-10-16 14:44:19 +00:00
|
|
|
use actix_web::error::{ErrorInternalServerError as ServerError, ErrorUnauthorized};
|
2023-10-11 15:19:56 +00:00
|
|
|
use actix_web::middleware::Logger;
|
2023-10-16 14:44:19 +00:00
|
|
|
use actix_web::{web, web::Bytes, App, HttpRequest, HttpResponse, HttpServer};
|
|
|
|
use futures::StreamExt;
|
|
|
|
use redis::{AsyncCommands, Client};
|
2023-10-16 13:22:54 +00:00
|
|
|
use serde::{Deserialize, Serialize};
|
2023-10-16 16:40:45 +00:00
|
|
|
use serde_json::Value;
|
2023-10-19 14:43:00 +00:00
|
|
|
use uuid::Uuid;
|
2023-09-27 23:08:48 +00:00
|
|
|
use std::collections::HashMap;
|
|
|
|
use std::env;
|
2023-10-06 14:57:54 +00:00
|
|
|
use std::sync::{Arc, Mutex};
|
2023-10-16 14:44:19 +00:00
|
|
|
use tokio::sync::broadcast;
|
2023-10-06 14:57:54 +00:00
|
|
|
use tokio::task::JoinHandle;
|
2023-10-03 10:43:21 +00:00
|
|
|
mod data;
|
2023-10-02 13:47:22 +00:00
|
|
|
|
2023-10-06 14:57:54 +00:00
|
|
|
#[derive(Clone)]
|
|
|
|
struct AppState {
|
|
|
|
tasks: Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
|
|
|
|
redis: Client,
|
|
|
|
}
|
|
|
|
|
2023-10-17 14:57:10 +00:00
|
|
|
#[derive(Serialize, Deserialize, Clone, Debug)]
|
2023-10-16 13:22:54 +00:00
|
|
|
struct RedisMessageData {
|
2023-10-16 16:40:45 +00:00
|
|
|
payload: HashMap<String, Value>,
|
2023-10-19 14:43:00 +00:00
|
|
|
action: String
|
|
|
|
}
|
|
|
|
|
2023-10-20 17:52:26 +00:00
|
|
|
#[derive(Serialize, Deserialize, Clone, Debug, Display)]
|
2023-10-19 14:43:00 +00:00
|
|
|
pub struct SSEMessageData {
|
|
|
|
payload: HashMap<String, Value>,
|
|
|
|
action: String,
|
|
|
|
entity: String
|
2023-10-16 13:22:54 +00:00
|
|
|
}
|
|
|
|
|
2023-10-06 14:57:54 +00:00
|
|
|
async fn connect_handler(
|
2023-10-11 20:52:33 +00:00
|
|
|
req: HttpRequest,
|
2023-10-06 14:57:54 +00:00
|
|
|
state: web::Data<AppState>,
|
2023-10-03 13:29:31 +00:00
|
|
|
) -> Result<HttpResponse, actix_web::Error> {
|
2023-10-18 11:59:01 +00:00
|
|
|
|
2023-10-11 20:52:33 +00:00
|
|
|
let token = match req.headers().get("Authorization") {
|
2023-10-16 14:44:19 +00:00
|
|
|
Some(val) => val.to_str().unwrap_or("").split(" ").last().unwrap_or(""),
|
2023-10-18 10:00:14 +00:00
|
|
|
None => match req.match_info().get("token") {
|
|
|
|
Some(val) => val,
|
2023-10-18 10:36:28 +00:00
|
|
|
None => match req.query_string().split('=').last() {
|
|
|
|
Some(val) => val,
|
|
|
|
None => return Err(ErrorUnauthorized("Unauthorized")),
|
|
|
|
},
|
2023-10-18 10:00:14 +00:00
|
|
|
},
|
2023-10-18 09:48:59 +00:00
|
|
|
};
|
|
|
|
|
2023-10-03 13:29:31 +00:00
|
|
|
let listener_id = data::get_auth_id(&token).await.map_err(|e| {
|
|
|
|
eprintln!("TOKEN check failed: {}", e);
|
|
|
|
ErrorUnauthorized("Unauthorized")
|
|
|
|
})?;
|
2023-10-03 10:43:21 +00:00
|
|
|
|
2023-10-06 14:57:54 +00:00
|
|
|
let mut con = state.redis.get_async_connection().await.map_err(|e| {
|
2023-10-03 13:29:31 +00:00
|
|
|
eprintln!("Failed to get async connection: {}", e);
|
|
|
|
ServerError("Internal Server Error")
|
|
|
|
})?;
|
2023-10-02 18:55:10 +00:00
|
|
|
|
2023-10-16 14:44:19 +00:00
|
|
|
con.sadd::<&str, &i32, usize>("authors-online", &listener_id)
|
|
|
|
.await
|
|
|
|
.map_err(|e| {
|
|
|
|
eprintln!("Failed to add author to online list: {}", e);
|
|
|
|
ServerError("Internal Server Error")
|
|
|
|
})?;
|
|
|
|
|
|
|
|
let chats: Vec<String> = con
|
|
|
|
.smembers::<String, Vec<String>>(format!("chats_by_author/{}", listener_id))
|
|
|
|
.await
|
|
|
|
.map_err(|e| {
|
|
|
|
eprintln!("Failed to get chats by author: {}", e);
|
|
|
|
ServerError("Internal Server Error")
|
|
|
|
})?;
|
2023-09-28 10:12:07 +00:00
|
|
|
|
2023-10-18 11:29:50 +00:00
|
|
|
let (tx, rx) = broadcast::channel(100);
|
2023-10-06 15:07:24 +00:00
|
|
|
let state_clone = state.clone();
|
2023-10-06 14:57:54 +00:00
|
|
|
let handle = tokio::spawn(async move {
|
2023-10-06 15:07:24 +00:00
|
|
|
let conn = state_clone.redis.get_async_connection().await.unwrap();
|
2023-10-03 10:43:21 +00:00
|
|
|
let mut pubsub = conn.into_pubsub();
|
2023-10-19 14:43:00 +00:00
|
|
|
let followers_channel = format!("follower:{}", listener_id);
|
2023-10-16 13:22:54 +00:00
|
|
|
pubsub.subscribe(followers_channel.clone()).await.unwrap();
|
2023-10-19 14:43:00 +00:00
|
|
|
println!("'{}' pubsub subscribed", followers_channel);
|
|
|
|
pubsub.subscribe("shout").await.unwrap();
|
|
|
|
println!("'shout' pubsub subscribed");
|
|
|
|
pubsub.subscribe("reaction").await.unwrap();
|
|
|
|
println!("'reaction' pubsub subscribed");
|
2023-10-03 13:29:31 +00:00
|
|
|
|
2023-10-03 10:43:21 +00:00
|
|
|
for chat_id in &chats {
|
|
|
|
let channel_name = format!("chat:{}", chat_id);
|
2023-10-03 13:29:31 +00:00
|
|
|
pubsub.subscribe(&channel_name).await.unwrap();
|
2023-10-06 12:10:17 +00:00
|
|
|
println!("'{}' subscribed", channel_name);
|
2023-10-03 05:02:11 +00:00
|
|
|
}
|
|
|
|
|
2023-10-03 10:43:21 +00:00
|
|
|
while let Some(msg) = pubsub.on_message().next().await {
|
2023-10-19 14:43:00 +00:00
|
|
|
let redis_message_str: String = msg.get_payload().unwrap();
|
|
|
|
let redis_message_data: RedisMessageData = serde_json::from_str(&redis_message_str).unwrap();
|
|
|
|
let prepared_message_data = SSEMessageData {
|
|
|
|
payload: redis_message_data.payload,
|
|
|
|
action: redis_message_data.action,
|
|
|
|
entity: msg.get_channel_name()
|
|
|
|
.to_owned()
|
|
|
|
.split(":")
|
|
|
|
.next()
|
|
|
|
.unwrap_or("")
|
|
|
|
.to_string()
|
|
|
|
};
|
|
|
|
if data::is_fitting(
|
2023-10-16 14:44:19 +00:00
|
|
|
listener_id,
|
2023-10-19 14:43:00 +00:00
|
|
|
prepared_message_data.clone(),
|
2023-10-16 14:44:19 +00:00
|
|
|
)
|
|
|
|
.await
|
|
|
|
.is_ok()
|
|
|
|
{
|
2023-10-19 14:43:00 +00:00
|
|
|
let prepared_message_str = serde_json::to_string(&prepared_message_data).unwrap();
|
|
|
|
let send_result = tx.send(prepared_message_str.clone());
|
2023-10-16 13:22:54 +00:00
|
|
|
if send_result.is_err() {
|
2023-10-16 14:44:19 +00:00
|
|
|
// remove author from online list
|
|
|
|
let _ = con
|
|
|
|
.srem::<&str, &i32, usize>("authors-online", &listener_id)
|
|
|
|
.await
|
|
|
|
.map_err(|e| {
|
|
|
|
eprintln!("Failed to remove author from online list: {}", e);
|
|
|
|
ServerError("Internal Server Error")
|
|
|
|
});
|
2023-10-16 13:22:54 +00:00
|
|
|
break;
|
2023-10-17 11:01:29 +00:00
|
|
|
} else {
|
2023-10-19 14:43:00 +00:00
|
|
|
println!("[handler] message handled {}", prepared_message_str);
|
2023-10-16 13:22:54 +00:00
|
|
|
}
|
2023-10-03 13:29:31 +00:00
|
|
|
};
|
2023-10-03 04:52:48 +00:00
|
|
|
}
|
2023-10-03 10:43:21 +00:00
|
|
|
});
|
2023-10-16 14:44:19 +00:00
|
|
|
state
|
|
|
|
.tasks
|
2023-10-06 14:57:54 +00:00
|
|
|
.lock()
|
|
|
|
.unwrap()
|
|
|
|
.insert(format!("{}", listener_id.clone()), handle);
|
2023-10-03 04:52:48 +00:00
|
|
|
|
2023-10-19 14:43:00 +00:00
|
|
|
let server_event_stream = futures::stream::unfold(rx, |mut rx| async {
|
|
|
|
let result = rx.recv().await;
|
|
|
|
match result {
|
|
|
|
Ok(server_event) => {
|
|
|
|
let message_data: SSEMessageData = serde_json::from_str(&server_event).unwrap();
|
|
|
|
let event_id = format!("{}", Uuid::new_v4()); // Generate a random UUID as the event ID
|
|
|
|
|
|
|
|
let formatted_server_event = format!(
|
2023-10-20 17:50:26 +00:00
|
|
|
"id: {}\ndata: {}\n\n",
|
2023-10-19 14:43:00 +00:00
|
|
|
event_id,
|
2023-10-20 17:52:26 +00:00
|
|
|
server_event
|
2023-10-19 14:43:00 +00:00
|
|
|
);
|
|
|
|
|
|
|
|
Some((Ok::<_, actix_web::Error>(Bytes::from(formatted_server_event)), rx))
|
|
|
|
},
|
|
|
|
Err(_) => None,
|
|
|
|
}
|
|
|
|
});
|
2023-10-02 15:27:55 +00:00
|
|
|
|
2023-10-03 13:29:31 +00:00
|
|
|
Ok(HttpResponse::Ok()
|
2023-10-02 11:24:45 +00:00
|
|
|
.append_header(("content-type", "text/event-stream"))
|
2023-10-03 13:29:31 +00:00
|
|
|
.streaming(server_event_stream))
|
2023-09-27 23:08:48 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
#[actix_web::main]
|
|
|
|
async fn main() -> std::io::Result<()> {
|
2023-10-03 13:29:31 +00:00
|
|
|
let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| String::from("redis://127.0.0.1/"));
|
2023-10-06 12:10:54 +00:00
|
|
|
let client = redis::Client::open(redis_url.clone()).unwrap();
|
2023-10-06 14:57:54 +00:00
|
|
|
let tasks = Arc::new(Mutex::new(HashMap::new()));
|
|
|
|
let state = AppState {
|
|
|
|
tasks: tasks.clone(),
|
|
|
|
redis: client.clone(),
|
|
|
|
};
|
2023-10-11 20:52:33 +00:00
|
|
|
println!("Starting...");
|
2023-09-27 23:08:48 +00:00
|
|
|
HttpServer::new(move || {
|
|
|
|
App::new()
|
2023-10-11 20:03:12 +00:00
|
|
|
.wrap(Logger::default())
|
2023-10-06 14:57:54 +00:00
|
|
|
.app_data(web::Data::new(state.clone()))
|
2023-10-18 10:00:14 +00:00
|
|
|
.route("/", web::get().to(connect_handler))
|
2023-10-18 09:48:59 +00:00
|
|
|
.route("/{token}", web::get().to(connect_handler))
|
2023-09-27 23:08:48 +00:00
|
|
|
})
|
2023-10-12 12:02:47 +00:00
|
|
|
.bind("0.0.0.0:8080")?
|
2023-09-27 23:08:48 +00:00
|
|
|
.run()
|
|
|
|
.await
|
2023-10-16 14:44:19 +00:00
|
|
|
}
|