Files
quoter/src/app_state.rs

371 lines
16 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use crate::s3_utils::get_s3_filelist;
use crate::security::SecurityConfig;
use actix_web::error::ErrorInternalServerError;
use aws_config::BehaviorVersion;
use aws_sdk_s3::{Client as S3Client, config::Credentials};
use log::warn;
use redis::{AsyncCommands, Client as RedisClient, aio::MultiplexedConnection};
use std::{env, time::Duration};
#[derive(Clone)]
pub struct AppState {
pub redis: Option<MultiplexedConnection>,
pub storj_client: S3Client,
pub aws_client: S3Client,
pub bucket: String,
pub request_timeout: Duration,
}
const PATH_MAPPING_KEY: &str = "filepath_mapping"; // Ключ для хранения маппинга путей
// Убираем TTL для квоты - она должна быть постоянной на пользователя
impl AppState {
/// Инициализация нового состояния приложения.
pub async fn new() -> Self {
let security_config = SecurityConfig::default();
Self::new_with_config(security_config).await
}
/// Инициализация с кастомной конфигурацией безопасности и исправленным URL.
async fn new_with_config_and_url(security_config: SecurityConfig, redis_url: String) -> Self {
log::warn!("🔧 Creating AppState with corrected Redis URL");
Self::create_app_state_with_redis_url(security_config, redis_url).await
}
/// Инициализация с кастомной конфигурацией безопасности.
pub async fn new_with_config(security_config: SecurityConfig) -> Self {
log::warn!("🚀 Starting AppState initialization...");
// Получаем конфигурацию для Redis с таймаутом
log::warn!("📋 Getting REDIS_URL from environment...");
let redis_url = match env::var("REDIS_URL") {
Ok(url) => {
log::warn!("✅ REDIS_URL found in environment");
url
}
Err(e) => {
log::error!("❌ REDIS_URL not found: {}", e);
panic!("REDIS_URL must be set: {}", e);
}
};
// Детальное логирование для отладки
log::warn!("🔗 Redis URL: {}", redis_url.replace(&redis_url.split('@').nth(0).unwrap_or(""), "***"));
// Парсим URL для детального анализа
log::warn!("🔍 Parsing Redis URL...");
match url::Url::parse(&redis_url) {
Ok(parsed_url) => {
log::warn!("✅ Redis URL parsed successfully");
log::warn!(" Host: {}", parsed_url.host_str().unwrap_or("none"));
log::warn!(" Port: {}", parsed_url.port().unwrap_or(0));
let username = parsed_url.username();
let password = parsed_url.password();
// Определяем правильное имя пользователя
let effective_username = if username.is_empty() && password.is_some() {
"redis" // Дефолтное имя пользователя для Redis
} else {
username
};
log::warn!(" Username: '{}' (effective: '{}')", username, effective_username);
log::warn!(" Password: {}", if password.is_some() { "***" } else { "none" });
// Если нужно, исправляем URL с правильным именем пользователя
if username.is_empty() && password.is_some() {
let corrected_url = redis_url.replace("redis://:", &format!("redis://{}:", effective_username));
log::warn!("🔧 Using corrected Redis URL: {}", corrected_url.replace(&corrected_url.split('@').nth(0).unwrap_or(""), "***"));
return Self::new_with_config_and_url(security_config, corrected_url).await;
}
}
Err(e) => {
log::error!("❌ Failed to parse Redis URL: {}", e);
panic!("Invalid Redis URL: {}", e);
}
}
Self::create_app_state_with_redis_url(security_config, redis_url).await
}
/// Создает AppState с указанным Redis URL.
async fn create_app_state_with_redis_url(security_config: SecurityConfig, redis_url: String) -> Self {
let redis_client = match RedisClient::open(redis_url) {
Ok(client) => {
log::warn!("✅ Redis client created successfully");
client
}
Err(e) => {
log::error!("❌ Failed to create Redis client: {}", e);
panic!("Redis client creation failed: {}", e);
}
};
// Устанавливаем таймаут для Redis операций с graceful fallback
log::warn!("🔄 Attempting Redis connection with timeout: {}s", security_config.request_timeout_seconds);
let redis_connection = match tokio::time::timeout(
Duration::from_secs(security_config.request_timeout_seconds),
redis_client.get_multiplexed_async_connection(),
)
.await
{
Ok(Ok(mut conn)) => {
log::warn!("✅ Redis connection established");
// Тестируем подключение простой командой
match tokio::time::timeout(
Duration::from_secs(2),
conn.ping::<String>()
).await {
Ok(Ok(result)) => {
log::warn!("✅ Redis PING successful: {}", result);
Some(conn)
}
Ok(Err(e)) => {
log::warn!("⚠️ Redis PING failed: {}", e);
None
}
Err(_) => {
log::warn!("⚠️ Redis PING timeout");
None
}
}
}
Ok(Err(e)) => {
log::warn!("⚠️ Redis connection failed: {}", e);
log::warn!(" Error type: {:?}", e.kind());
log::warn!("⚠️ Running in fallback mode without Redis (quotas disabled)");
None
}
Err(_) => {
log::warn!("⚠️ Redis connection timeout after {} seconds", security_config.request_timeout_seconds);
log::warn!("⚠️ Running in fallback mode without Redis (quotas disabled)");
None
}
};
// Получаем конфигурацию для S3 (Storj)
let s3_access_key = env::var("STORJ_ACCESS_KEY").expect("STORJ_ACCESS_KEY must be set");
let s3_secret_key = env::var("STORJ_SECRET_KEY").expect("STORJ_SECRET_KEY must be set");
let s3_endpoint = env::var("STORJ_END_POINT")
.unwrap_or_else(|_| "https://gateway.storjshare.io".to_string());
let bucket = env::var("STORJ_BUCKET_NAME").unwrap_or_else(|_| "discours-io".to_string());
// Получаем конфигурацию для AWS S3
let aws_access_key = env::var("AWS_ACCESS_KEY").expect("AWS_ACCESS_KEY must be set");
let aws_secret_key = env::var("AWS_SECRET_KEY").expect("AWS_SECRET_KEY must be set");
let aws_endpoint =
env::var("AWS_END_POINT").unwrap_or_else(|_| "https://s3.amazonaws.com".to_string());
// Конфигурируем клиент S3 для Storj с таймаутом
let storj_config = aws_config::defaults(BehaviorVersion::latest())
.region("eu-west-1")
.endpoint_url(s3_endpoint)
.credentials_provider(Credentials::new(
s3_access_key,
s3_secret_key,
None,
None,
"rust-storj-client",
))
.timeout_config(
aws_config::timeout::TimeoutConfig::builder()
.operation_timeout(Duration::from_secs(security_config.request_timeout_seconds))
.operation_attempt_timeout(Duration::from_secs(
security_config.request_timeout_seconds / 2,
))
.build(),
)
.load()
.await;
let storj_client = S3Client::new(&storj_config);
// Конфигурируем клиент S3 для AWS с таймаутом
let aws_config = aws_config::defaults(BehaviorVersion::latest())
.region("eu-west-1")
.endpoint_url(aws_endpoint)
.credentials_provider(Credentials::new(
aws_access_key,
aws_secret_key,
None,
None,
"rust-aws-client",
))
.timeout_config(
aws_config::timeout::TimeoutConfig::builder()
.operation_timeout(Duration::from_secs(security_config.request_timeout_seconds))
.operation_attempt_timeout(Duration::from_secs(
security_config.request_timeout_seconds / 2,
))
.build(),
)
.load()
.await;
let aws_client = S3Client::new(&aws_config);
let app_state = AppState {
redis: redis_connection,
storj_client,
aws_client,
bucket,
request_timeout: Duration::from_secs(security_config.request_timeout_seconds),
};
// Кэшируем список файлов из AWS при старте приложения
app_state.cache_filelist().await;
app_state
}
/// Кэширует список файлов из Storj S3 в Redis.
pub async fn cache_filelist(&self) {
warn!("caching AWS filelist...");
// Проверяем доступность Redis
let Some(mut redis) = self.redis.clone() else {
warn!("⚠️ Redis not available, skipping filelist caching");
return;
};
// Запрашиваем список файлов из Storj S3
let filelist = get_s3_filelist(&self.aws_client, &self.bucket).await;
for [filename, filepath] in filelist.clone() {
// Сохраняем список файлов в Redis, используя HSET для каждого файла
if let Err(e) = tokio::time::timeout(
self.request_timeout,
redis.hset::<_, _, _, ()>(PATH_MAPPING_KEY, filename.clone(), filepath),
)
.await
{
warn!("⚠️ Redis operation failed: {}", e);
break;
}
}
warn!("cached {} files", filelist.len());
}
/// Получает путь из ключа (имени файла) в Redis с таймаутом.
pub async fn get_path(&self, filename: &str) -> Result<Option<String>, actix_web::Error> {
let Some(mut redis) = self.redis.clone() else {
warn!("⚠️ Redis not available, returning None for path lookup");
return Ok(None);
};
let new_path: Option<String> =
tokio::time::timeout(self.request_timeout, redis.hget(PATH_MAPPING_KEY, filename))
.await
.map_err(|_| ErrorInternalServerError("Redis operation timeout"))?
.map_err(|_| ErrorInternalServerError("Failed to get path mapping from Redis"))?;
Ok(new_path)
}
pub async fn set_path(&self, filename: &str, filepath: &str) {
let Some(mut redis) = self.redis.clone() else {
warn!(
"⚠️ Redis not available, skipping path caching for {}",
filename
);
return;
};
if let Err(e) = tokio::time::timeout(
self.request_timeout,
redis.hset::<_, _, _, ()>(PATH_MAPPING_KEY, filename, filepath),
)
.await
{
warn!("⚠️ Redis operation failed for {}: {}", filename, e);
}
}
/// создает или получает текущее значение квоты пользователя с таймаутом
pub async fn get_or_create_quota(&self, user_id: &str) -> Result<u64, actix_web::Error> {
let Some(mut redis) = self.redis.clone() else {
warn!(
"⚠️ Redis not available, returning default quota for user {}",
user_id
);
return Ok(0); // Возвращаем 0 как fallback
};
let quota_key = format!("quota:{}", user_id);
// Попытка получить квоту из Redis с таймаутом
let quota: u64 = tokio::time::timeout(self.request_timeout, redis.get(&quota_key))
.await
.map_err(|_| ErrorInternalServerError("Redis timeout getting user quota"))?
.unwrap_or(0);
if quota == 0 {
// Если квота не найдена, устанавливаем её в 0 байт без TTL с таймаутом
tokio::time::timeout(
self.request_timeout,
redis.set::<&str, u64, ()>(&quota_key, 0),
)
.await
.map_err(|_| ErrorInternalServerError("Redis timeout setting user quota"))?
.map_err(|_| ErrorInternalServerError("Failed to set initial user quota in Redis"))?;
Ok(0) // Возвращаем 0 как начальную квоту
} else {
Ok(quota)
}
}
/// инкрементирует значение квоты пользователя в байтах с таймаутом
pub async fn increment_uploaded_bytes(
&self,
user_id: &str,
bytes: u64,
) -> Result<u64, actix_web::Error> {
let Some(mut redis) = self.redis.clone() else {
warn!(
"⚠️ Redis not available, skipping quota increment for user {}",
user_id
);
return Ok(0); // Возвращаем 0 как fallback
};
let quota_key = format!("quota:{}", user_id);
// Проверяем, существует ли ключ в Redis с таймаутом
let exists: bool =
tokio::time::timeout(self.request_timeout, redis.exists::<_, bool>(&quota_key))
.await
.map_err(|_| {
ErrorInternalServerError("Redis timeout checking user quota existence")
})?
.map_err(|_| {
ErrorInternalServerError("Failed to check if user quota exists in Redis")
})?;
// Если ключ не существует, создаем его с начальным значением без TTL
if !exists {
tokio::time::timeout(
self.request_timeout,
redis.set::<_, u64, ()>(&quota_key, bytes),
)
.await
.map_err(|_| ErrorInternalServerError("Redis timeout setting initial user quota"))?
.map_err(|_| ErrorInternalServerError("Failed to set initial user quota in Redis"))?;
return Ok(bytes);
}
// Если ключ существует, инкрементируем его значение на заданное количество байт
let new_quota: u64 = tokio::time::timeout(
self.request_timeout,
redis.incr::<_, u64, u64>(&quota_key, bytes),
)
.await
.map_err(|_| ErrorInternalServerError("Redis timeout incrementing user quota"))?
.map_err(|_| ErrorInternalServerError("Failed to increment user quota in Redis"))?;
Ok(new_quota)
}
}