redis-types+spawn-blocking-fix
Some checks failed
deploy / deploy (push) Failing after 5s

This commit is contained in:
Untone 2024-09-23 16:32:54 +03:00
parent 336e46268c
commit ad6623a1b8
5 changed files with 507 additions and 432 deletions

823
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -12,7 +12,7 @@ actix-web = "4.5.1"
reqwest = { version = "0.12.3", features = ["json"] } reqwest = { version = "0.12.3", features = ["json"] }
sentry = { version = "0.34.0", features = ["tokio"] } sentry = { version = "0.34.0", features = ["tokio"] }
uuid = { version = "1.8.0", features = ["v4"] } uuid = { version = "1.8.0", features = ["v4"] }
redis = { version = "0.26.1", features = ["tokio-comp"] } redis = { version = "0.27.2", features = ["tokio-comp"] }
tokio = { version = "1.37.0", features = ["full"] } tokio = { version = "1.37.0", features = ["full"] }
serde = { version = "1.0.209", features = ["derive"] } serde = { version = "1.0.209", features = ["derive"] }
sentry-actix = "0.34.0" sentry-actix = "0.34.0"

View File

@ -37,13 +37,15 @@ impl AppState {
// Получаем конфигурацию для S3 (Storj) // Получаем конфигурацию для S3 (Storj)
let s3_access_key = env::var("STORJ_ACCESS_KEY").expect("STORJ_ACCESS_KEY must be set"); let s3_access_key = env::var("STORJ_ACCESS_KEY").expect("STORJ_ACCESS_KEY must be set");
let s3_secret_key = env::var("STORJ_SECRET_KEY").expect("STORJ_SECRET_KEY must be set"); let s3_secret_key = env::var("STORJ_SECRET_KEY").expect("STORJ_SECRET_KEY must be set");
let s3_endpoint = env::var("STORJ_END_POINT").unwrap_or_else(|_| "https://gateway.storjshare.io".to_string()); let s3_endpoint = env::var("STORJ_END_POINT")
.unwrap_or_else(|_| "https://gateway.storjshare.io".to_string());
let s3_bucket = env::var("STORJ_BUCKET_NAME").unwrap_or_else(|_| "discours-io".to_string()); let s3_bucket = env::var("STORJ_BUCKET_NAME").unwrap_or_else(|_| "discours-io".to_string());
// Получаем конфигурацию для AWS S3 // Получаем конфигурацию для AWS S3
let aws_access_key = env::var("AWS_ACCESS_KEY").expect("AWS_ACCESS_KEY must be set"); let aws_access_key = env::var("AWS_ACCESS_KEY").expect("AWS_ACCESS_KEY must be set");
let aws_secret_key = env::var("AWS_SECRET_KEY").expect("AWS_SECRET_KEY must be set"); let aws_secret_key = env::var("AWS_SECRET_KEY").expect("AWS_SECRET_KEY must be set");
let aws_endpoint = env::var("AWS_END_POINT").unwrap_or_else(|_| "https://s3.amazonaws.com".to_string()); let aws_endpoint =
env::var("AWS_END_POINT").unwrap_or_else(|_| "https://s3.amazonaws.com".to_string());
let aws_bucket = env::var("AWS_BUCKET_NAME").unwrap_or_else(|_| "discours-io".to_string()); let aws_bucket = env::var("AWS_BUCKET_NAME").unwrap_or_else(|_| "discours-io".to_string());
// Конфигурируем клиент S3 для Storj // Конфигурируем клиент S3 для Storj
@ -155,7 +157,7 @@ impl AppState {
let mut redis = self.redis.clone(); let mut redis = self.redis.clone();
// Храним маппинг в формате Hash: old_path -> new_path // Храним маппинг в формате Hash: old_path -> new_path
redis redis
.hset(PATH_MAPPING_KEY, filekey, path) .hset::<_, &str, &str, ()>(PATH_MAPPING_KEY, filekey, path)
.await .await
.map_err(|_| ErrorInternalServerError("Failed to save path mapping in Redis"))?; .map_err(|_| ErrorInternalServerError("Failed to save path mapping in Redis"))?;
Ok(()) Ok(())
@ -175,37 +177,64 @@ impl AppState {
pub async fn update_filelist_from_aws(&self) { pub async fn update_filelist_from_aws(&self) {
// Получаем список объектов из AWS S3 // Получаем список объектов из AWS S3
let list_objects_v2 = self.aws_client.list_objects_v2(); let list_objects_v2 = self.aws_client.list_objects_v2();
let list_response = list_objects_v2
.bucket(&self.aws_bucket)
.send()
.await
.expect("Failed to list files from AWS S3");
// перебор списка файлов match list_objects_v2.bucket(&self.aws_bucket).send().await {
if let Some(objects) = list_response.contents { Ok(list_response) => {
for object in objects { // Перебор списка файлов
if let Some(key) = object.key { if let Some(objects) = list_response.contents {
let filename_with_extension = key.split('/').last().unwrap(); for object in objects {
if let Some(key) = object.key {
// Получаем имя файла с расширением
let filename_with_extension = key.split('/').last().unwrap();
// Убираем расширение файла // Убираем расширение файла
let filename = filename_with_extension let filename = filename_with_extension
.rsplit_once('.') .rsplit_once('.')
.map(|(name, _ext)| name) .map(|(name, _ext)| name)
.unwrap_or(filename_with_extension); // Если расширение отсутствует, возвращаем оригинальное имя .unwrap_or(filename_with_extension); // Если расширение отсутствует, возвращаем оригинальное имя
// Проверяем, существует ли файл на Storj S3 // Проверяем, существует ли файл на Storj S3
if !check_file_exists(&self.s3_client, &self.s3_bucket, filename) match check_file_exists(&self.s3_client, &self.s3_bucket, filename)
.await .await
.unwrap_or(false) {
{ Ok(false) => {
// Сохраняем маппинг пути // Сохраняем маппинг пути
self.save_path_by_filekey(filename, &key).await.unwrap(); if let Err(e) = self.save_path_by_filekey(filename, &key).await
{
eprintln!(
"Ошибка сохранения маппинга для файла {}: {:?}",
filename, e
);
} else {
println!(
"Маппинг для файла {} успешно сохранен.",
filename
);
}
}
Ok(true) => {
println!("Файл {} уже существует в Storj.", filename);
}
Err(e) => {
eprintln!(
"Ошибка при проверке файла {} на Storj: {:?}",
filename, e
);
}
}
}
} }
} else {
println!("Список файлов в AWS S3 пуст.");
} }
} }
Err(e) => {
eprintln!("Не удалось получить список файлов из AWS S3: {:?}", e);
}
} }
} }
/// создает или получает текущее значение квоты пользователя
pub async fn get_or_create_quota(&self, user_id: &str) -> Result<u64, actix_web::Error> { pub async fn get_or_create_quota(&self, user_id: &str) -> Result<u64, actix_web::Error> {
let mut redis = self.redis.clone(); let mut redis = self.redis.clone();
let quota_key = format!("quota:{}", user_id); let quota_key = format!("quota:{}", user_id);
@ -216,7 +245,7 @@ impl AppState {
if quota == 0 { if quota == 0 {
// Если квота не найдена, устанавливаем её в 0 байт и задаем TTL на одну неделю // Если квота не найдена, устанавливаем её в 0 байт и задаем TTL на одну неделю
redis redis
.set_ex(&quota_key, 0, WEEK_SECONDS) .set_ex::<&str, u64, ()>(&quota_key, 0, WEEK_SECONDS)
.await .await
.map_err(|_| { .map_err(|_| {
ErrorInternalServerError("Failed to set initial user quota in Redis") ErrorInternalServerError("Failed to set initial user quota in Redis")
@ -228,6 +257,7 @@ impl AppState {
} }
} }
/// инкрементирует значение квоты пользователя в байтах
pub async fn increment_uploaded_bytes( pub async fn increment_uploaded_bytes(
&self, &self,
user_id: &str, user_id: &str,
@ -235,29 +265,29 @@ impl AppState {
) -> Result<u64, actix_web::Error> { ) -> Result<u64, actix_web::Error> {
let mut redis = self.redis.clone(); let mut redis = self.redis.clone();
let quota_key = format!("quota:{}", user_id); let quota_key = format!("quota:{}", user_id);
// Проверяем, существует ли ключ в Redis // Проверяем, существует ли ключ в Redis
let exists: bool = redis.exists(&quota_key).await.map_err(|_| { let exists: bool = redis.exists::<_, bool>(&quota_key).await.map_err(|_| {
ErrorInternalServerError("Failed to check if user quota exists in Redis") ErrorInternalServerError("Failed to check if user quota exists in Redis")
})?; })?;
// Если ключ не существует, создаем его с начальным значением и устанавливаем TTL // Если ключ не существует, создаем его с начальным значением и устанавливаем TTL
if !exists { if !exists {
redis redis
.set_ex(&quota_key, bytes, WEEK_SECONDS) .set_ex::<_, u64, ()>(&quota_key, bytes, WEEK_SECONDS)
.await .await
.map_err(|_| { .map_err(|_| {
ErrorInternalServerError("Failed to set initial user quota in Redis") ErrorInternalServerError("Failed to set initial user quota in Redis")
})?; })?;
return Ok(bytes); return Ok(bytes);
} }
// Если ключ существует, инкрементируем его значение на заданное количество байт // Если ключ существует, инкрементируем его значение на заданное количество байт
let new_quota: u64 = redis let new_quota: u64 = redis
.incr(&quota_key, bytes) .incr::<_, u64, u64>(&quota_key, bytes)
.await .await
.map_err(|_| ErrorInternalServerError("Failed to increment user quota in Redis"))?; .map_err(|_| ErrorInternalServerError("Failed to increment user quota in Redis"))?;
Ok(new_quota) Ok(new_quota)
} }
} }

View File

@ -90,7 +90,7 @@ pub async fn user_added_file(
filename: &str, filename: &str,
) -> Result<(), actix_web::Error> { ) -> Result<(), actix_web::Error> {
redis redis
.sadd(user_id, filename) .sadd::<&str, &str, ()>(user_id, filename)
.await .await
.map_err(|_| ErrorInternalServerError("Failed to save filename in Redis"))?; // Добавляем имя файла в набор пользователя .map_err(|_| ErrorInternalServerError("Failed to save filename in Redis"))?; // Добавляем имя файла в набор пользователя
Ok(()) Ok(())

View File

@ -7,15 +7,21 @@ mod thumbnail;
use actix_web::{middleware::Logger, web, App, HttpServer}; use actix_web::{middleware::Logger, web, App, HttpServer};
use app_state::AppState; use app_state::AppState;
use handlers::{proxy_handler, upload_handler}; use handlers::{proxy_handler, upload_handler};
use tokio::task::spawn_blocking;
#[actix_web::main] #[actix_web::main]
async fn main() -> std::io::Result<()> { async fn main() -> std::io::Result<()> {
let app_state = AppState::new().await; let app_state = AppState::new().await;
let app_state_clone = app_state.clone(); let app_state_clone = app_state.clone();
tokio::spawn(async move {
app_state_clone.update_filelist_from_aws().await; // Используем spawn_blocking для работы, которая не совместима с Send
app_state_clone.refresh_file_list_periodically().await; spawn_blocking(move || {
let rt = tokio::runtime::Handle::current();
rt.block_on(async move {
app_state_clone.update_filelist_from_aws().await;
app_state_clone.refresh_file_list_periodically().await;
});
}); });
HttpServer::new(move || { HttpServer::new(move || {
@ -28,4 +34,4 @@ async fn main() -> std::io::Result<()> {
.bind("127.0.0.1:8080")? .bind("127.0.0.1:8080")?
.run() .run()
.await .await
} }