use actix_web::error::ErrorInternalServerError; use aws_config::BehaviorVersion; use aws_sdk_s3::{config::Credentials, Client as S3Client}; use redis::{aio::MultiplexedConnection, AsyncCommands, Client as RedisClient}; use std::{env, time::Duration}; use tokio::time::interval; use std::collections::HashMap; use crate::s3_utils::check_file_exists; #[derive(Clone)] pub struct AppState { pub redis: MultiplexedConnection, pub storj_client: S3Client, pub storj_bucket: String, pub aws_client: S3Client, pub aws_bucket: String, } // const FILE_LIST_CACHE_KEY: &str = "s3_file_list_cache"; // Ключ для хранения списка файлов в Redis const PATH_MAPPING_KEY: &str = "filepath_mapping"; // Ключ для хранения маппинга путей const CHECK_INTERVAL_SECONDS: u64 = 60 * 60; // Интервал обновления списка файлов: 1 час const WEEK_SECONDS: u64 = 604800; impl AppState { /// Инициализация нового состояния приложения. pub async fn new() -> Self { // Получаем конфигурацию для Redis let redis_url = env::var("REDIS_URL").expect("REDIS_URL must be set"); let redis_client = RedisClient::open(redis_url).expect("Invalid Redis URL"); let redis_connection = redis_client .get_multiplexed_async_connection() .await .unwrap(); // Получаем конфигурацию для S3 (Storj) let s3_access_key = env::var("STORJ_ACCESS_KEY").expect("STORJ_ACCESS_KEY must be set"); let s3_secret_key = env::var("STORJ_SECRET_KEY").expect("STORJ_SECRET_KEY must be set"); let s3_endpoint = env::var("STORJ_END_POINT") .unwrap_or_else(|_| "https://gateway.storjshare.io".to_string()); let storj_bucket = env::var("STORJ_BUCKET_NAME").unwrap_or_else(|_| "discours-io".to_string()); // Получаем конфигурацию для AWS S3 let aws_access_key = env::var("AWS_ACCESS_KEY").expect("AWS_ACCESS_KEY must be set"); let aws_secret_key = env::var("AWS_SECRET_KEY").expect("AWS_SECRET_KEY must be set"); let aws_endpoint = env::var("AWS_END_POINT").unwrap_or_else(|_| "https://s3.amazonaws.com".to_string()); let aws_bucket = env::var("AWS_BUCKET_NAME").unwrap_or_else(|_| "discours-io".to_string()); // Конфигурируем клиент S3 для Storj let storj_config = aws_config::defaults(BehaviorVersion::latest()) .region("eu-west-1") .endpoint_url(s3_endpoint) .credentials_provider(Credentials::new( s3_access_key, s3_secret_key, None, None, "rust-storj-client", )) .load() .await; let storj_client = S3Client::new(&storj_config); // Конфигурируем клиент S3 для AWS let aws_config = aws_config::defaults(BehaviorVersion::latest()) .region("eu-west-1") .endpoint_url(aws_endpoint) .credentials_provider(Credentials::new( aws_access_key, aws_secret_key, None, None, "rust-aws-client", )) .load() .await; let aws_client = S3Client::new(&aws_config); let app_state = AppState { redis: redis_connection, storj_client, storj_bucket, aws_client, aws_bucket, }; // Кэшируем список файлов из Storj S3 при старте приложения app_state.cache_storj_filelist().await; app_state } /// Кэширует список файлов из Storj S3 в Redis. pub async fn cache_storj_filelist(&self) { let mut redis = self.redis.clone(); // Запрашиваем список файлов из Storj S3 let list_objects_v2 = self.storj_client.list_objects_v2(); let list_response = list_objects_v2 .bucket(&self.storj_bucket) .send() .await .expect("Failed to list files from Storj"); if let Some(objects) = list_response.contents { // Формируем список файлов без дубликатов по имени файла (без расширения) for object in objects.iter() { if let Some(storj_objkey) = &object.key { let filekey = storj_objkey.split('.') .chain(std::iter::once("")) // Ensure the chain doesn't break on empty strings .filter(|s| !s.is_empty()) // Filter out any empty strings .map(|s| s.split('/')) // Map to Split iterator .nth(0) // Get the first non-empty split result or default to &"" .and_then(|s| s.last()) // Call last() on the resulting iterator if it exists, otherwise None .unwrap_or(&""); // Сохраняем список файлов в Redis, используя HSET для каждого файла let _: () = redis .hset(PATH_MAPPING_KEY, filekey, storj_objkey) .await .expect("Failed to cache file in Redis"); } } } } /// Получает кэшированный список файлов из Redis. pub async fn get_cached_file_list(&self) -> Vec { let mut redis = self.redis.clone(); // Пытаемся получить кэшированный список из Redis let cached_list: HashMap = redis.hgetall(PATH_MAPPING_KEY).await.unwrap_or_default(); // Преобразуем HashMap в Vec, используя значения (пути файлов) cached_list.into_values().collect() } /// Периодически обновляет кэшированный список файлов из Storj S3. pub async fn refresh_file_list_periodically(&self) { let mut interval = interval(Duration::from_secs(CHECK_INTERVAL_SECONDS)); loop { interval.tick().await; self.cache_storj_filelist().await; } } /// Сохраняет маппинг старого пути из AWS S3 на новый путь в Storj S3. async fn save_aws2storj_mapping( &self, aws_filekey: &str, storj_filekey: &str, ) -> Result<(), actix_web::Error> { let mut redis = self.redis.clone(); // Храним маппинг в формате Hash: old_path -> new_path redis .hset::<_, &str, &str, ()>(PATH_MAPPING_KEY, aws_filekey, storj_filekey) .await .map_err(|_| ErrorInternalServerError("Failed to save path mapping in Redis"))?; // println!("[ok] {}", storj_filekey); Ok(()) } /// Получает путь в хранилище из ключа (имени файла) в Redis. pub async fn get_path(&self, file_key: &str) -> Result, actix_web::Error> { let mut redis = self.redis.clone(); let new_path: Option = redis .hget(PATH_MAPPING_KEY, file_key) .await .map_err(|_| ErrorInternalServerError("Failed to get path mapping from Redis"))?; Ok(new_path) } /// Обновляет Storj S3 данными из Amazon S3 pub async fn cache_aws_filelist(&self) { // Получаем список объектов из AWS S3 let list_objects_v2 = self.aws_client.list_objects_v2(); match list_objects_v2.bucket(&self.aws_bucket).send().await { Ok(list_response) => { // Перебор списка файлов if let Some(objects) = list_response.contents { for object in objects { if let Some(key) = object.key { // Получаем имя файла с расширением let parts: Vec<&str> = key.split('.').collect(); let storj_filekey = parts.first().and_then(|s| s.split('/').last()).unwrap_or(parts.first().unwrap()); if storj_filekey.is_empty() && !storj_filekey.ends_with("/") { eprint!("[ERROR] empty filename: {}\n", key); } else { // Проверяем, существует ли файл на Storj S3 match check_file_exists(&self.storj_client, &self.storj_bucket, &storj_filekey).await { Ok(false) => { // Сохраняем маппинг пути if let Err(e) = self.save_aws2storj_mapping(&key, &storj_filekey).await { eprintln!("[ERROR] save {}: {:?}", key, e); } else { println!("[ok] {}", key); } } Ok(true) => { println!("[skip] {}", storj_filekey); } Err(e) => { eprintln!( "[ERROR] check {}: {:?}", storj_filekey, e ); } } } } } } else { println!("AWS S3 file list is empty."); } } Err(e) => { eprintln!("[ERROR] get AWS S3 file list: {:?}", e); } } } /// создает или получает текущее значение квоты пользователя pub async fn get_or_create_quota(&self, user_id: &str) -> Result { let mut redis = self.redis.clone(); let quota_key = format!("quota:{}", user_id); // Попытка получить квоту из Redis let quota: u64 = redis.get("a_key).await.unwrap_or(0); if quota == 0 { // Если квота не найдена, устанавливаем её в 0 байт и задаем TTL на одну неделю redis .set_ex::<&str, u64, ()>("a_key, 0, WEEK_SECONDS) .await .map_err(|_| { ErrorInternalServerError("Failed to set initial user quota in Redis") })?; Ok(0) // Возвращаем 0 как начальную квоту } else { Ok(quota) } } /// инкрементирует значение квоты пользователя в байтах pub async fn increment_uploaded_bytes( &self, user_id: &str, bytes: u64, ) -> Result { let mut redis = self.redis.clone(); let quota_key = format!("quota:{}", user_id); // Проверяем, существует ли ключ в Redis let exists: bool = redis.exists::<_, bool>("a_key).await.map_err(|_| { ErrorInternalServerError("Failed to check if user quota exists in Redis") })?; // Если ключ не существует, создаем его с начальным значением и устанавливаем TTL if !exists { redis .set_ex::<_, u64, ()>("a_key, bytes, WEEK_SECONDS) .await .map_err(|_| { ErrorInternalServerError("Failed to set initial user quota in Redis") })?; return Ok(bytes); } // Если ключ существует, инкрементируем его значение на заданное количество байт let new_quota: u64 = redis .incr::<_, u64, u64>("a_key, bytes) .await .map_err(|_| ErrorInternalServerError("Failed to increment user quota in Redis"))?; Ok(new_quota) } }