Files
quoter/src/app_state.rs
Untone 3ff469c8a1
Some checks failed
Deploy on push / deploy (push) Failing after 4s
connection-pool-fix
2025-09-22 01:23:16 +03:00

562 lines
22 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use crate::s3_utils::get_s3_filelist;
use crate::security::SecurityConfig;
use crate::thumbnail::cleanup_cache;
use actix_web::error::ErrorInternalServerError;
use aws_config::BehaviorVersion;
use aws_sdk_s3::{Client as S3Client, config::Credentials};
use log::warn;
use redis::{AsyncCommands, Client as RedisClient, aio::MultiplexedConnection};
use std::{env, sync::Arc, time::Duration};
use tokio::sync::Mutex;
/// Redis Connection Pool
#[derive(Clone)]
pub struct RedisConnectionPool {
client: RedisClient,
max_connections: usize,
connections: Arc<Mutex<Vec<MultiplexedConnection>>>,
timeout: Duration,
}
impl RedisConnectionPool {
/// Создает новый connection pool
pub async fn new(
redis_url: String,
max_connections: usize,
timeout: Duration,
) -> Result<Self, redis::RedisError> {
let client = RedisClient::open(redis_url)?;
let connections = Arc::new(Mutex::new(Vec::new()));
let pool = Self {
client,
max_connections,
connections,
timeout,
};
// Предварительно создаем несколько соединений
pool.warm_up_connections(3).await?;
Ok(pool)
}
/// Предварительно создает соединения для пула
async fn warm_up_connections(&self, count: usize) -> Result<(), redis::RedisError> {
let mut connections = self.connections.lock().await;
for _ in 0..count.min(self.max_connections) {
match tokio::time::timeout(self.timeout, self.client.get_multiplexed_async_connection())
.await
{
Ok(Ok(conn)) => {
connections.push(conn);
}
Ok(Err(e)) => {
warn!("Failed to create Redis connection during warm-up: {}", e);
return Err(e);
}
Err(_) => {
warn!("Timeout creating Redis connection during warm-up");
return Err(redis::RedisError::from((
redis::ErrorKind::IoError,
"Connection timeout",
)));
}
}
}
warn!(
"✅ Redis connection pool warmed up with {} connections",
connections.len()
);
Ok(())
}
/// Получает соединение из пула
pub async fn get_connection(&self) -> Result<MultiplexedConnection, redis::RedisError> {
let mut connections = self.connections.lock().await;
// Пытаемся взять существующее соединение
if let Some(conn) = connections.pop() {
return Ok(conn);
}
// Если соединений нет, создаем новое
drop(connections); // Освобождаем мьютекс
match tokio::time::timeout(self.timeout, self.client.get_multiplexed_async_connection())
.await
{
Ok(Ok(conn)) => Ok(conn),
Ok(Err(e)) => {
warn!("Failed to create new Redis connection: {}", e);
Err(e)
}
Err(_) => {
warn!("Timeout creating new Redis connection");
Err(redis::RedisError::from((
redis::ErrorKind::IoError,
"Connection timeout",
)))
}
}
}
/// Возвращает соединение обратно в пул
pub async fn return_connection(&self, conn: MultiplexedConnection) {
let mut connections = self.connections.lock().await;
if connections.len() < self.max_connections {
connections.push(conn);
}
// Если пул полный, соединение просто закрывается
}
/// Проверяет здоровье пула
pub async fn health_check(&self) -> bool {
match self.get_connection().await {
Ok(mut conn) => {
match tokio::time::timeout(Duration::from_secs(2), conn.ping::<String>()).await {
Ok(Ok(_)) => {
self.return_connection(conn).await;
true
}
_ => false,
}
}
Err(_) => false,
}
}
/// Получает статистику пула
pub async fn get_stats(&self) -> (usize, usize) {
let connections = self.connections.lock().await;
(connections.len(), self.max_connections)
}
}
#[derive(Clone)]
pub struct AppState {
pub redis_pool: Option<RedisConnectionPool>,
pub storj_client: S3Client,
pub aws_client: S3Client,
pub bucket: String,
pub request_timeout: Duration,
}
const PATH_MAPPING_KEY: &str = "filepath_mapping"; // Ключ для хранения маппинга путей
// Убираем TTL для квоты - она должна быть постоянной на пользователя
impl AppState {
/// Инициализация нового состояния приложения.
pub async fn new() -> Self {
let security_config = SecurityConfig::default();
Self::new_with_config(security_config).await
}
/// Получает соединение из Redis connection pool
pub async fn get_redis_connection(&self) -> Result<MultiplexedConnection, redis::RedisError> {
if let Some(ref pool) = self.redis_pool {
pool.get_connection().await
} else {
Err(redis::RedisError::from((
redis::ErrorKind::IoError,
"Redis pool not available",
)))
}
}
/// Возвращает соединение обратно в пул
pub async fn return_redis_connection(&self, conn: MultiplexedConnection) {
if let Some(ref pool) = self.redis_pool {
pool.return_connection(conn).await;
}
}
/// Проверяет здоровье Redis connection pool
pub async fn redis_health_check(&self) -> bool {
if let Some(ref pool) = self.redis_pool {
pool.health_check().await
} else {
false
}
}
/// Получает статистику Redis connection pool
pub async fn redis_pool_stats(&self) -> Option<(usize, usize)> {
if let Some(ref pool) = self.redis_pool {
Some(pool.get_stats().await)
} else {
None
}
}
/// Инициализация с кастомной конфигурацией безопасности.
pub async fn new_with_config(security_config: SecurityConfig) -> Self {
log::warn!("🚀 Starting AppState initialization...");
// Получаем конфигурацию для Redis с таймаутом
log::warn!("📋 Getting REDIS_URL from environment...");
let redis_url = match env::var("REDIS_URL") {
Ok(url) => {
log::warn!("✅ REDIS_URL found in environment");
url
}
Err(e) => {
log::error!("❌ REDIS_URL not found: {}", e);
panic!("REDIS_URL must be set: {}", e);
}
};
// Детальное логирование для отладки
log::warn!(
"🔗 Redis URL: {}",
redis_url.replace(redis_url.split('@').nth(0).unwrap_or(""), "***")
);
// Парсим URL для детального анализа
log::warn!("🔍 Parsing Redis URL...");
let final_redis_url = match url::Url::parse(&redis_url) {
Ok(parsed_url) => {
log::warn!("✅ Redis URL parsed successfully");
log::warn!(" Host: {}", parsed_url.host_str().unwrap_or("none"));
log::warn!(" Port: {}", parsed_url.port().unwrap_or(0));
let username = parsed_url.username();
let password = parsed_url.password();
log::warn!(" Username: '{}'", username);
log::warn!(
" Password: {}",
if password.is_some() { "***" } else { "none" }
);
// Если username пустой и есть пароль, оставляем как есть
// Redis может работать только с паролем без username
if username.is_empty() && password.is_some() {
log::warn!(" 🔧 Using password-only authentication (no username)");
}
redis_url
}
Err(e) => {
log::error!("❌ Failed to parse Redis URL: {}", e);
panic!("Invalid Redis URL: {}", e);
}
};
// Используем исправленный URL
Self::create_app_state_with_redis_url(security_config, final_redis_url).await
}
/// Создает AppState с указанным Redis URL.
async fn create_app_state_with_redis_url(
security_config: SecurityConfig,
redis_url: String,
) -> Self {
// Устанавливаем таймаут для Redis операций с graceful fallback
log::warn!(
"🔄 Attempting Redis connection with timeout: {}s",
security_config.request_timeout_seconds
);
// Создаем Redis connection pool
let redis_pool = match RedisConnectionPool::new(
redis_url.clone(),
20, // max_connections согласно руководству
Duration::from_secs(security_config.request_timeout_seconds),
)
.await
{
Ok(pool) => {
log::warn!("✅ Redis connection pool created successfully");
Some(pool)
}
Err(e) => {
log::warn!("⚠️ Redis connection pool creation failed: {}", e);
log::warn!("⚠️ Running in fallback mode without Redis (quotas disabled)");
None
}
};
// Одиночное соединение больше не нужно - используем только connection pool
// Получаем конфигурацию для S3 (Storj)
let s3_access_key = env::var("STORJ_ACCESS_KEY").expect("STORJ_ACCESS_KEY must be set");
let s3_secret_key = env::var("STORJ_SECRET_KEY").expect("STORJ_SECRET_KEY must be set");
let s3_endpoint = env::var("STORJ_END_POINT")
.unwrap_or_else(|_| "https://gateway.storjshare.io".to_string());
let bucket = env::var("STORJ_BUCKET_NAME").unwrap_or_else(|_| "discours-io".to_string());
// Получаем конфигурацию для AWS S3
let aws_access_key = env::var("AWS_ACCESS_KEY").expect("AWS_ACCESS_KEY must be set");
let aws_secret_key = env::var("AWS_SECRET_KEY").expect("AWS_SECRET_KEY must be set");
let aws_endpoint =
env::var("AWS_END_POINT").unwrap_or_else(|_| "https://s3.amazonaws.com".to_string());
// Конфигурируем клиент S3 для Storj с таймаутом
let storj_config = aws_config::defaults(BehaviorVersion::latest())
.region("eu-west-1")
.endpoint_url(s3_endpoint)
.credentials_provider(Credentials::new(
s3_access_key,
s3_secret_key,
None,
None,
"rust-storj-client",
))
.timeout_config(
aws_config::timeout::TimeoutConfig::builder()
.operation_timeout(Duration::from_secs(security_config.request_timeout_seconds))
.operation_attempt_timeout(Duration::from_secs(
security_config.request_timeout_seconds / 2,
))
.build(),
)
.load()
.await;
let storj_client = S3Client::new(&storj_config);
// Конфигурируем клиент S3 для AWS с таймаутом
let aws_config = aws_config::defaults(BehaviorVersion::latest())
.region("eu-west-1")
.endpoint_url(aws_endpoint)
.credentials_provider(Credentials::new(
aws_access_key,
aws_secret_key,
None,
None,
"rust-aws-client",
))
.timeout_config(
aws_config::timeout::TimeoutConfig::builder()
.operation_timeout(Duration::from_secs(security_config.request_timeout_seconds))
.operation_attempt_timeout(Duration::from_secs(
security_config.request_timeout_seconds / 2,
))
.build(),
)
.load()
.await;
let aws_client = S3Client::new(&aws_config);
let app_state = AppState {
redis_pool,
storj_client,
aws_client,
bucket,
request_timeout: Duration::from_secs(security_config.request_timeout_seconds),
};
// Кэшируем список файлов из AWS при старте приложения
app_state.cache_filelist().await;
app_state
}
/// Кэширует список файлов из Storj S3 в Redis.
pub async fn cache_filelist(&self) {
warn!("caching AWS filelist...");
// Получаем соединение из пула
let mut redis = match self.get_redis_connection().await {
Ok(conn) => conn,
Err(_) => {
warn!("⚠️ Redis pool not available, skipping filelist caching");
return;
}
};
// Запрашиваем список файлов из Storj S3
let filelist = get_s3_filelist(&self.aws_client, &self.bucket).await;
for [filename, filepath] in filelist.clone() {
// Сохраняем список файлов в Redis, используя HSET для каждого файла
if let Err(e) = tokio::time::timeout(
self.request_timeout,
redis.hset::<_, _, _, ()>(PATH_MAPPING_KEY, filename.clone(), filepath),
)
.await
{
warn!("⚠️ Redis operation failed: {}", e);
break;
}
}
warn!("cached {} files", filelist.len());
// Возвращаем соединение в пул
self.return_redis_connection(redis).await;
}
/// Получает путь из ключа (имени файла) в Redis с таймаутом.
pub async fn get_path(&self, filename: &str) -> Result<Option<String>, actix_web::Error> {
let mut redis = match self.get_redis_connection().await {
Ok(conn) => conn,
Err(_) => {
warn!("⚠️ Redis pool not available, returning None for path lookup");
return Ok(None);
}
};
let result: Option<String> =
tokio::time::timeout(self.request_timeout, redis.hget(PATH_MAPPING_KEY, filename))
.await
.map_err(|_| ErrorInternalServerError("Redis operation timeout"))?
.map_err(|_| ErrorInternalServerError("Failed to get path mapping from Redis"))?;
// Возвращаем соединение в пул
self.return_redis_connection(redis).await;
Ok(result)
}
pub async fn set_path(&self, filename: &str, filepath: &str) {
let mut redis = match self.get_redis_connection().await {
Ok(conn) => conn,
Err(_) => {
warn!(
"⚠️ Redis pool not available, skipping path caching for {}",
filename
);
return;
}
};
if let Err(e) = tokio::time::timeout(
self.request_timeout,
redis.hset::<_, _, _, ()>(PATH_MAPPING_KEY, filename, filepath),
)
.await
{
warn!("⚠️ Redis operation failed for {}: {}", filename, e);
}
// Возвращаем соединение в пул
self.return_redis_connection(redis).await;
}
/// создает или получает текущее значение квоты пользователя с таймаутом
pub async fn get_or_create_quota(&self, user_id: &str) -> Result<u64, actix_web::Error> {
let mut redis = match self.get_redis_connection().await {
Ok(conn) => conn,
Err(_) => {
warn!(
"⚠️ Redis pool not available, returning default quota for user {}",
user_id
);
return Ok(0); // Возвращаем 0 как fallback
}
};
let quota_key = format!("quota:{}", user_id);
// Попытка получить квоту из Redis с таймаутом
let quota: u64 = tokio::time::timeout(self.request_timeout, redis.get(&quota_key))
.await
.map_err(|_| ErrorInternalServerError("Redis timeout getting user quota"))?
.unwrap_or(0);
if quota == 0 {
// Если квота не найдена, устанавливаем её в 0 байт без TTL с таймаутом
tokio::time::timeout(
self.request_timeout,
redis.set::<&str, u64, ()>(&quota_key, 0),
)
.await
.map_err(|_| ErrorInternalServerError("Redis timeout setting user quota"))?
.map_err(|_| ErrorInternalServerError("Failed to set initial user quota in Redis"))?;
// Возвращаем соединение в пул
self.return_redis_connection(redis).await;
Ok(0) // Возвращаем 0 как начальную квоту
} else {
// Возвращаем соединение в пул
self.return_redis_connection(redis).await;
Ok(quota)
}
}
/// инкрементирует значение квоты пользователя в байтах с таймаутом
pub async fn increment_uploaded_bytes(
&self,
user_id: &str,
bytes: u64,
) -> Result<u64, actix_web::Error> {
let mut redis = match self.get_redis_connection().await {
Ok(conn) => conn,
Err(_) => {
warn!(
"⚠️ Redis pool not available, skipping quota increment for user {}",
user_id
);
return Ok(0); // Возвращаем 0 как fallback
}
};
let quota_key = format!("quota:{}", user_id);
// Проверяем, существует ли ключ в Redis с таймаутом
let exists: bool =
tokio::time::timeout(self.request_timeout, redis.exists::<_, bool>(&quota_key))
.await
.map_err(|_| {
ErrorInternalServerError("Redis timeout checking user quota existence")
})?
.map_err(|_| {
ErrorInternalServerError("Failed to check if user quota exists in Redis")
})?;
// Если ключ не существует, создаем его с начальным значением без TTL
if !exists {
tokio::time::timeout(
self.request_timeout,
redis.set::<_, u64, ()>(&quota_key, bytes),
)
.await
.map_err(|_| ErrorInternalServerError("Redis timeout setting initial user quota"))?
.map_err(|_| ErrorInternalServerError("Failed to set initial user quota in Redis"))?;
// Возвращаем соединение в пул
self.return_redis_connection(redis).await;
return Ok(bytes);
}
// Если ключ существует, инкрементируем его значение на заданное количество байт
let new_quota: u64 = tokio::time::timeout(
self.request_timeout,
redis.incr::<_, u64, u64>(&quota_key, bytes),
)
.await
.map_err(|_| ErrorInternalServerError("Redis timeout incrementing user quota"))?
.map_err(|_| ErrorInternalServerError("Failed to increment user quota in Redis"))?;
// Возвращаем соединение в пул
self.return_redis_connection(redis).await;
Ok(new_quota)
}
/// Очищает старые файлы из локального кэша.
pub fn cleanup_local_cache(&self) -> Result<(), Box<dyn std::error::Error>> {
// Очищаем кэш старше 7 дней
cleanup_cache("/tmp/thumbnails", 7)?;
Ok(())
}
/// Запускает периодическую очистку кэша.
pub fn start_cache_cleanup_task(&self) {
let state = self.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(24 * 60 * 60)); // раз в день
loop {
interval.tick().await;
if let Err(e) = state.cleanup_local_cache() {
warn!("Failed to cleanup local cache: {}", e);
} else {
warn!("Local cache cleanup completed successfully");
}
}
});
}
}