quoter/src/app_state.rs
Untone ad6623a1b8
Some checks failed
deploy / deploy (push) Failing after 5s
redis-types+spawn-blocking-fix
2024-09-23 16:32:54 +03:00

294 lines
12 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// app_state.rs
use actix_web::error::ErrorInternalServerError;
use aws_config::BehaviorVersion;
use aws_sdk_s3::{config::Credentials, Client as S3Client};
use redis::{aio::MultiplexedConnection, AsyncCommands, Client as RedisClient};
use std::{env, time::Duration};
use tokio::time::interval;
use crate::s3_utils::check_file_exists;
#[derive(Clone)]
pub struct AppState {
pub redis: MultiplexedConnection,
pub s3_client: S3Client,
pub s3_bucket: String,
pub aws_client: S3Client,
pub aws_bucket: String,
}
const FILE_LIST_CACHE_KEY: &str = "s3_file_list_cache"; // Ключ для хранения списка файлов в Redis
const PATH_MAPPING_KEY: &str = "path_mapping"; // Ключ для хранения маппинга путей
const CHECK_INTERVAL_SECONDS: u64 = 60 * 60; // Интервал обновления списка файлов: 1 час
const WEEK_SECONDS: u64 = 604800;
impl AppState {
/// Инициализация нового состояния приложения.
pub async fn new() -> Self {
// Получаем конфигурацию для Redis
let redis_url = env::var("REDIS_URL").expect("REDIS_URL must be set");
let redis_client = RedisClient::open(redis_url).expect("Invalid Redis URL");
let redis_connection = redis_client
.get_multiplexed_async_connection()
.await
.unwrap();
// Получаем конфигурацию для S3 (Storj)
let s3_access_key = env::var("STORJ_ACCESS_KEY").expect("STORJ_ACCESS_KEY must be set");
let s3_secret_key = env::var("STORJ_SECRET_KEY").expect("STORJ_SECRET_KEY must be set");
let s3_endpoint = env::var("STORJ_END_POINT")
.unwrap_or_else(|_| "https://gateway.storjshare.io".to_string());
let s3_bucket = env::var("STORJ_BUCKET_NAME").unwrap_or_else(|_| "discours-io".to_string());
// Получаем конфигурацию для AWS S3
let aws_access_key = env::var("AWS_ACCESS_KEY").expect("AWS_ACCESS_KEY must be set");
let aws_secret_key = env::var("AWS_SECRET_KEY").expect("AWS_SECRET_KEY must be set");
let aws_endpoint =
env::var("AWS_END_POINT").unwrap_or_else(|_| "https://s3.amazonaws.com".to_string());
let aws_bucket = env::var("AWS_BUCKET_NAME").unwrap_or_else(|_| "discours-io".to_string());
// Конфигурируем клиент S3 для Storj
let storj_config = aws_config::defaults(BehaviorVersion::latest())
.region("eu-west-1")
.endpoint_url(s3_endpoint)
.credentials_provider(Credentials::new(
s3_access_key,
s3_secret_key,
None,
None,
"rust-s3-client",
))
.load()
.await;
let s3_client = S3Client::new(&storj_config);
// Конфигурируем клиент S3 для AWS
let aws_config = aws_config::defaults(BehaviorVersion::latest())
.region("eu-west-1")
.endpoint_url(aws_endpoint)
.credentials_provider(Credentials::new(
aws_access_key,
aws_secret_key,
None,
None,
"rust-aws-client",
))
.load()
.await;
let aws_client = S3Client::new(&aws_config);
let app_state = AppState {
redis: redis_connection,
s3_client,
s3_bucket,
aws_client,
aws_bucket,
};
// Кэшируем список файлов из S3 при старте приложения
app_state.cache_file_list().await;
app_state
}
/// Кэширует список файлов из Storj S3 в Redis.
pub async fn cache_file_list(&self) {
let mut redis = self.redis.clone();
// Запрашиваем список файлов из Storj S3
let list_objects_v2 = self.s3_client.list_objects_v2();
let list_response = list_objects_v2
.bucket(&self.s3_bucket)
.send()
.await
.expect("Failed to list files from S3");
if let Some(objects) = list_response.contents {
// Формируем список файлов
let file_list: Vec<String> = objects
.iter()
.filter_map(|object| object.key.clone())
.collect();
// Сохраняем список файлов в Redis в формате JSON
let _: () = redis
.set(
FILE_LIST_CACHE_KEY,
serde_json::to_string(&file_list).unwrap(),
)
.await
.expect("Failed to cache file list in Redis");
}
}
/// Получает кэшированный список файлов из Redis.
pub async fn get_cached_file_list(&self) -> Vec<String> {
let mut redis = self.redis.clone();
// Пытаемся получить кэшированный список из Redis
let cached_list: Option<String> = redis.get(FILE_LIST_CACHE_KEY).await.unwrap_or(None);
if let Some(cached_list) = cached_list {
// Если список найден, возвращаем его в виде вектора строк
serde_json::from_str(&cached_list).unwrap_or_else(|_| vec![])
} else {
vec![]
}
}
/// Периодически обновляет кэшированный список файлов из Storj S3.
pub async fn refresh_file_list_periodically(&self) {
let mut interval = interval(Duration::from_secs(CHECK_INTERVAL_SECONDS));
loop {
interval.tick().await;
self.cache_file_list().await;
}
}
/// Сохраняет маппинг старого пути из AWS S3 на новый путь в Storj S3.
async fn save_path_by_filekey(
&self,
filekey: &str,
path: &str,
) -> Result<(), actix_web::Error> {
let mut redis = self.redis.clone();
// Храним маппинг в формате Hash: old_path -> new_path
redis
.hset::<_, &str, &str, ()>(PATH_MAPPING_KEY, filekey, path)
.await
.map_err(|_| ErrorInternalServerError("Failed to save path mapping in Redis"))?;
Ok(())
}
/// Получает путь в хранилище из ключа (имени файла) в Redis.
pub async fn get_path(&self, filekey: &str) -> Result<Option<String>, actix_web::Error> {
let mut redis = self.redis.clone();
let new_path: Option<String> = redis
.hget(PATH_MAPPING_KEY, filekey)
.await
.map_err(|_| ErrorInternalServerError("Failed to get path mapping from Redis"))?;
Ok(new_path)
}
/// Обновляет Storj S3 данными из Amazon S3
pub async fn update_filelist_from_aws(&self) {
// Получаем список объектов из AWS S3
let list_objects_v2 = self.aws_client.list_objects_v2();
match list_objects_v2.bucket(&self.aws_bucket).send().await {
Ok(list_response) => {
// Перебор списка файлов
if let Some(objects) = list_response.contents {
for object in objects {
if let Some(key) = object.key {
// Получаем имя файла с расширением
let filename_with_extension = key.split('/').last().unwrap();
// Убираем расширение файла
let filename = filename_with_extension
.rsplit_once('.')
.map(|(name, _ext)| name)
.unwrap_or(filename_with_extension); // Если расширение отсутствует, возвращаем оригинальное имя
// Проверяем, существует ли файл на Storj S3
match check_file_exists(&self.s3_client, &self.s3_bucket, filename)
.await
{
Ok(false) => {
// Сохраняем маппинг пути
if let Err(e) = self.save_path_by_filekey(filename, &key).await
{
eprintln!(
"Ошибка сохранения маппинга для файла {}: {:?}",
filename, e
);
} else {
println!(
"Маппинг для файла {} успешно сохранен.",
filename
);
}
}
Ok(true) => {
println!("Файл {} уже существует в Storj.", filename);
}
Err(e) => {
eprintln!(
"Ошибка при проверке файла {} на Storj: {:?}",
filename, e
);
}
}
}
}
} else {
println!("Список файлов в AWS S3 пуст.");
}
}
Err(e) => {
eprintln!("Не удалось получить список файлов из AWS S3: {:?}", e);
}
}
}
/// создает или получает текущее значение квоты пользователя
pub async fn get_or_create_quota(&self, user_id: &str) -> Result<u64, actix_web::Error> {
let mut redis = self.redis.clone();
let quota_key = format!("quota:{}", user_id);
// Попытка получить квоту из Redis
let quota: u64 = redis.get(&quota_key).await.unwrap_or(0);
if quota == 0 {
// Если квота не найдена, устанавливаем её в 0 байт и задаем TTL на одну неделю
redis
.set_ex::<&str, u64, ()>(&quota_key, 0, WEEK_SECONDS)
.await
.map_err(|_| {
ErrorInternalServerError("Failed to set initial user quota in Redis")
})?;
Ok(0) // Возвращаем 0 как начальную квоту
} else {
Ok(quota)
}
}
/// инкрементирует значение квоты пользователя в байтах
pub async fn increment_uploaded_bytes(
&self,
user_id: &str,
bytes: u64,
) -> Result<u64, actix_web::Error> {
let mut redis = self.redis.clone();
let quota_key = format!("quota:{}", user_id);
// Проверяем, существует ли ключ в Redis
let exists: bool = redis.exists::<_, bool>(&quota_key).await.map_err(|_| {
ErrorInternalServerError("Failed to check if user quota exists in Redis")
})?;
// Если ключ не существует, создаем его с начальным значением и устанавливаем TTL
if !exists {
redis
.set_ex::<_, u64, ()>(&quota_key, bytes, WEEK_SECONDS)
.await
.map_err(|_| {
ErrorInternalServerError("Failed to set initial user quota in Redis")
})?;
return Ok(bytes);
}
// Если ключ существует, инкрементируем его значение на заданное количество байт
let new_quota: u64 = redis
.incr::<_, u64, u64>(&quota_key, bytes)
.await
.map_err(|_| ErrorInternalServerError("Failed to increment user quota in Redis"))?;
Ok(new_quota)
}
}