// app_state.rs use actix_web::error::ErrorInternalServerError; use aws_config::BehaviorVersion; use aws_sdk_s3::{config::Credentials, Client as S3Client}; use redis::{aio::MultiplexedConnection, AsyncCommands, Client as RedisClient}; use std::{env, time::Duration}; use tokio::time::interval; use crate::s3_utils::check_file_exists; #[derive(Clone)] pub struct AppState { pub redis: MultiplexedConnection, pub s3_client: S3Client, pub s3_bucket: String, pub aws_client: S3Client, pub aws_bucket: String, } const FILE_LIST_CACHE_KEY: &str = "s3_file_list_cache"; // Ключ для хранения списка файлов в Redis const PATH_MAPPING_KEY: &str = "path_mapping"; // Ключ для хранения маппинга путей const CHECK_INTERVAL_SECONDS: u64 = 60 * 60; // Интервал обновления списка файлов: 1 час const WEEK_SECONDS: u64 = 604800; impl AppState { /// Инициализация нового состояния приложения. pub async fn new() -> Self { // Получаем конфигурацию для Redis let redis_url = env::var("REDIS_URL").expect("REDIS_URL must be set"); let redis_client = RedisClient::open(redis_url).expect("Invalid Redis URL"); let redis_connection = redis_client .get_multiplexed_async_connection() .await .unwrap(); // Получаем конфигурацию для S3 (Storj) let s3_access_key = env::var("STORJ_ACCESS_KEY").expect("STORJ_ACCESS_KEY must be set"); let s3_secret_key = env::var("STORJ_SECRET_KEY").expect("STORJ_SECRET_KEY must be set"); let s3_endpoint = env::var("STORJ_END_POINT").unwrap_or_else(|_| "https://gateway.storjshare.io".to_string()); let s3_bucket = env::var("STORJ_BUCKET_NAME").unwrap_or_else(|_| "discours-io".to_string()); // Получаем конфигурацию для AWS S3 let aws_access_key = env::var("AWS_ACCESS_KEY").expect("AWS_ACCESS_KEY must be set"); let aws_secret_key = env::var("AWS_SECRET_KEY").expect("AWS_SECRET_KEY must be set"); let aws_endpoint = env::var("AWS_END_POINT").unwrap_or_else(|_| "https://s3.amazonaws.com".to_string()); let aws_bucket = env::var("AWS_BUCKET_NAME").unwrap_or_else(|_| "discours-io".to_string()); // Конфигурируем клиент S3 для Storj let storj_config = aws_config::defaults(BehaviorVersion::latest()) .region("eu-west-1") .endpoint_url(s3_endpoint) .credentials_provider(Credentials::new( s3_access_key, s3_secret_key, None, None, "rust-s3-client", )) .load() .await; let s3_client = S3Client::new(&storj_config); // Конфигурируем клиент S3 для AWS let aws_config = aws_config::defaults(BehaviorVersion::latest()) .region("eu-west-1") .endpoint_url(aws_endpoint) .credentials_provider(Credentials::new( aws_access_key, aws_secret_key, None, None, "rust-aws-client", )) .load() .await; let aws_client = S3Client::new(&aws_config); let app_state = AppState { redis: redis_connection, s3_client, s3_bucket, aws_client, aws_bucket, }; // Кэшируем список файлов из S3 при старте приложения app_state.cache_file_list().await; app_state } /// Кэширует список файлов из Storj S3 в Redis. pub async fn cache_file_list(&self) { let mut redis = self.redis.clone(); // Запрашиваем список файлов из Storj S3 let list_objects_v2 = self.s3_client.list_objects_v2(); let list_response = list_objects_v2 .bucket(&self.s3_bucket) .send() .await .expect("Failed to list files from S3"); if let Some(objects) = list_response.contents { // Формируем список файлов let file_list: Vec = objects .iter() .filter_map(|object| object.key.clone()) .collect(); // Сохраняем список файлов в Redis в формате JSON let _: () = redis .set( FILE_LIST_CACHE_KEY, serde_json::to_string(&file_list).unwrap(), ) .await .expect("Failed to cache file list in Redis"); } } /// Получает кэшированный список файлов из Redis. pub async fn get_cached_file_list(&self) -> Vec { let mut redis = self.redis.clone(); // Пытаемся получить кэшированный список из Redis let cached_list: Option = redis.get(FILE_LIST_CACHE_KEY).await.unwrap_or(None); if let Some(cached_list) = cached_list { // Если список найден, возвращаем его в виде вектора строк serde_json::from_str(&cached_list).unwrap_or_else(|_| vec![]) } else { vec![] } } /// Периодически обновляет кэшированный список файлов из Storj S3. pub async fn refresh_file_list_periodically(&self) { let mut interval = interval(Duration::from_secs(CHECK_INTERVAL_SECONDS)); loop { interval.tick().await; self.cache_file_list().await; } } /// Сохраняет маппинг старого пути из AWS S3 на новый путь в Storj S3. async fn save_path_by_filekey( &self, filekey: &str, path: &str, ) -> Result<(), actix_web::Error> { let mut redis = self.redis.clone(); // Храним маппинг в формате Hash: old_path -> new_path redis .hset(PATH_MAPPING_KEY, filekey, path) .await .map_err(|_| ErrorInternalServerError("Failed to save path mapping in Redis"))?; Ok(()) } /// Получает путь в хранилище из ключа (имени файла) в Redis. pub async fn get_path(&self, filekey: &str) -> Result, actix_web::Error> { let mut redis = self.redis.clone(); let new_path: Option = redis .hget(PATH_MAPPING_KEY, filekey) .await .map_err(|_| ErrorInternalServerError("Failed to get path mapping from Redis"))?; Ok(new_path) } /// Обновляет Storj S3 данными из Amazon S3 pub async fn update_filelist_from_aws(&self) { // Получаем список объектов из AWS S3 let list_objects_v2 = self.aws_client.list_objects_v2(); let list_response = list_objects_v2 .bucket(&self.aws_bucket) .send() .await .expect("Failed to list files from AWS S3"); // перебор списка файлов if let Some(objects) = list_response.contents { for object in objects { if let Some(key) = object.key { let filename_with_extension = key.split('/').last().unwrap(); // Убираем расширение файла let filename = filename_with_extension .rsplit_once('.') .map(|(name, _ext)| name) .unwrap_or(filename_with_extension); // Если расширение отсутствует, возвращаем оригинальное имя // Проверяем, существует ли файл на Storj S3 if !check_file_exists(&self.s3_client, &self.s3_bucket, filename) .await .unwrap_or(false) { // Сохраняем маппинг пути self.save_path_by_filekey(filename, &key).await.unwrap(); } } } } } pub async fn get_or_create_quota(&self, user_id: &str) -> Result { let mut redis = self.redis.clone(); let quota_key = format!("quota:{}", user_id); // Попытка получить квоту из Redis let quota: u64 = redis.get("a_key).await.unwrap_or(0); if quota == 0 { // Если квота не найдена, устанавливаем её в 0 байт и задаем TTL на одну неделю redis .set_ex("a_key, 0, WEEK_SECONDS) .await .map_err(|_| { ErrorInternalServerError("Failed to set initial user quota in Redis") })?; Ok(0) // Возвращаем 0 как начальную квоту } else { Ok(quota) } } pub async fn increment_uploaded_bytes( &self, user_id: &str, bytes: u64, ) -> Result { let mut redis = self.redis.clone(); let quota_key = format!("quota:{}", user_id); // Проверяем, существует ли ключ в Redis let exists: bool = redis.exists("a_key).await.map_err(|_| { ErrorInternalServerError("Failed to check if user quota exists in Redis") })?; // Если ключ не существует, создаем его с начальным значением и устанавливаем TTL if !exists { redis .set_ex("a_key, bytes, WEEK_SECONDS) .await .map_err(|_| { ErrorInternalServerError("Failed to set initial user quota in Redis") })?; return Ok(bytes); } // Если ключ существует, инкрементируем его значение на заданное количество байт let new_quota: u64 = redis .incr("a_key, bytes) .await .map_err(|_| ErrorInternalServerError("Failed to increment user quota in Redis"))?; Ok(new_quota) } }