parent
ca97ecf128
commit
fd3f8371cc
125
src/app_state.rs
125
src/app_state.rs
|
@ -5,18 +5,15 @@ use redis::{aio::MultiplexedConnection, AsyncCommands, Client as RedisClient};
|
|||
use std::{env, time::Duration};
|
||||
use tokio::time::interval;
|
||||
use std::collections::HashMap;
|
||||
use crate::s3_utils::check_file_exists;
|
||||
use crate::s3_utils::get_s3_filelist;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AppState {
|
||||
pub redis: MultiplexedConnection,
|
||||
pub storj_client: S3Client,
|
||||
pub storj_bucket: String,
|
||||
pub aws_client: S3Client,
|
||||
pub aws_bucket: String,
|
||||
pub storj_bucket: String
|
||||
}
|
||||
|
||||
// const FILE_LIST_CACHE_KEY: &str = "s3_file_list_cache"; // Ключ для хранения списка файлов в Redis
|
||||
const PATH_MAPPING_KEY: &str = "filepath_mapping"; // Ключ для хранения маппинга путей
|
||||
const CHECK_INTERVAL_SECONDS: u64 = 60 * 60; // Интервал обновления списка файлов: 1 час
|
||||
const WEEK_SECONDS: u64 = 604800;
|
||||
|
@ -40,11 +37,11 @@ impl AppState {
|
|||
let storj_bucket = env::var("STORJ_BUCKET_NAME").unwrap_or_else(|_| "discours-io".to_string());
|
||||
|
||||
// Получаем конфигурацию для AWS S3
|
||||
let aws_access_key = env::var("AWS_ACCESS_KEY").expect("AWS_ACCESS_KEY must be set");
|
||||
let aws_secret_key = env::var("AWS_SECRET_KEY").expect("AWS_SECRET_KEY must be set");
|
||||
let aws_endpoint =
|
||||
env::var("AWS_END_POINT").unwrap_or_else(|_| "https://s3.amazonaws.com".to_string());
|
||||
let aws_bucket = env::var("AWS_BUCKET_NAME").unwrap_or_else(|_| "discours-io".to_string());
|
||||
// let aws_access_key = env::var("AWS_ACCESS_KEY").expect("AWS_ACCESS_KEY must be set");
|
||||
// let aws_secret_key = env::var("AWS_SECRET_KEY").expect("AWS_SECRET_KEY must be set");
|
||||
// let aws_endpoint =
|
||||
// env::var("AWS_END_POINT").unwrap_or_else(|_| "https://s3.amazonaws.com".to_string());
|
||||
// let aws_bucket = env::var("AWS_BUCKET_NAME").unwrap_or_else(|_| "discours-io".to_string());
|
||||
|
||||
// Конфигурируем клиент S3 для Storj
|
||||
let storj_config = aws_config::defaults(BehaviorVersion::latest())
|
||||
|
@ -63,7 +60,7 @@ impl AppState {
|
|||
let storj_client = S3Client::new(&storj_config);
|
||||
|
||||
// Конфигурируем клиент S3 для AWS
|
||||
let aws_config = aws_config::defaults(BehaviorVersion::latest())
|
||||
/* let aws_config = aws_config::defaults(BehaviorVersion::latest())
|
||||
.region("eu-west-1")
|
||||
.endpoint_url(aws_endpoint)
|
||||
.credentials_provider(Credentials::new(
|
||||
|
@ -75,15 +72,13 @@ impl AppState {
|
|||
))
|
||||
.load()
|
||||
.await;
|
||||
|
||||
let aws_client = S3Client::new(&aws_config);
|
||||
*/
|
||||
// let aws_client = S3Client::new(&aws_config);
|
||||
|
||||
let app_state = AppState {
|
||||
redis: redis_connection,
|
||||
storj_client,
|
||||
storj_bucket,
|
||||
aws_client,
|
||||
aws_bucket,
|
||||
storj_bucket
|
||||
};
|
||||
|
||||
// Кэшируем список файлов из Storj S3 при старте приложения
|
||||
|
@ -97,39 +92,17 @@ impl AppState {
|
|||
let mut redis = self.redis.clone();
|
||||
|
||||
// Запрашиваем список файлов из Storj S3
|
||||
let list_objects_v2 = self.storj_client.list_objects_v2();
|
||||
let list_response = list_objects_v2
|
||||
.bucket(&self.storj_bucket)
|
||||
.send()
|
||||
.await
|
||||
.expect("Failed to list files from Storj");
|
||||
|
||||
if let Some(objects) = list_response.contents {
|
||||
// Формируем список файлов без дубликатов по имени файла (без расширения)
|
||||
|
||||
for object in objects.iter() {
|
||||
if let Some(storj_filepath) = &object.key {
|
||||
let filepath = match storj_filepath.ends_with("/webp") {
|
||||
true => &storj_filepath.replace("/webp", ""),
|
||||
false => storj_filepath,
|
||||
};
|
||||
let mut parts = filepath.split('/').collect::<Vec<&str>>(); // Explicit type annotation
|
||||
let filename = parts.pop().unwrap();
|
||||
let mut filename_parts = filename.split('.').collect::<Vec<&str>>();
|
||||
let _ext = filename_parts.pop().unwrap();
|
||||
let filekey = filename_parts.pop().unwrap();
|
||||
let filekeyed_list = get_s3_filelist(&self.storj_client, &self.storj_bucket).await;
|
||||
|
||||
for [filekey, filepath] in filekeyed_list {
|
||||
// Сохраняем список файлов в Redis, используя HSET для каждого файла
|
||||
let _: () = redis
|
||||
.hset(PATH_MAPPING_KEY, filekey, storj_filepath)
|
||||
.hset(PATH_MAPPING_KEY, filekey, filepath)
|
||||
.await
|
||||
.expect("Failed to cache file in Redis");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/// Получает кэшированный список файлов из Redis.
|
||||
pub async fn get_cached_file_list(&self) -> Vec<String> {
|
||||
let mut redis = self.redis.clone();
|
||||
|
@ -150,22 +123,6 @@ impl AppState {
|
|||
}
|
||||
}
|
||||
|
||||
/// Сохраняет маппинг старого пути из AWS S3 на новый путь в Storj S3.
|
||||
async fn save_aws2storj_mapping(
|
||||
&self,
|
||||
aws_filekey: &str,
|
||||
storj_filekey: &str,
|
||||
) -> Result<(), actix_web::Error> {
|
||||
let mut redis = self.redis.clone();
|
||||
// Храним маппинг в формате Hash: old_path -> new_path
|
||||
redis
|
||||
.hset::<_, &str, &str, ()>(PATH_MAPPING_KEY, aws_filekey, storj_filekey)
|
||||
.await
|
||||
.map_err(|_| ErrorInternalServerError("Failed to save path mapping in Redis"))?;
|
||||
// println!("[ok] {}", storj_filekey);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Получает путь в Storj из ключа (имени файла) в Redis.
|
||||
pub async fn get_path(&self, file_key: &str) -> Result<Option<String>, actix_web::Error> {
|
||||
let mut redis = self.redis.clone();
|
||||
|
@ -176,60 +133,6 @@ impl AppState {
|
|||
Ok(new_path)
|
||||
}
|
||||
|
||||
/// Обновляет Storj S3 данными из Amazon S3
|
||||
pub async fn cache_aws_filelist(&self) {
|
||||
// Получаем список объектов из AWS S3
|
||||
let list_objects_v2 = self.aws_client.list_objects_v2();
|
||||
|
||||
match list_objects_v2.bucket(&self.aws_bucket).send().await {
|
||||
Ok(list_response) => {
|
||||
// Перебор списка файлов
|
||||
if let Some(objects) = list_response.contents {
|
||||
for object in objects {
|
||||
if let Some(key) = object.key {
|
||||
// Получаем имя файла с расширением
|
||||
let parts: Vec<&str> = key.split('.').collect();
|
||||
let storj_filekey = parts.first().and_then(|s| s.split('/').last()).unwrap_or(parts.first().unwrap());
|
||||
|
||||
if storj_filekey.is_empty() && !storj_filekey.ends_with('/') {
|
||||
eprint!("empty filename: {}\n", key);
|
||||
} else {
|
||||
// Проверяем, существует ли файл на Storj S3
|
||||
match check_file_exists(&self.storj_client, &self.storj_bucket, &storj_filekey).await
|
||||
{
|
||||
Ok(false) => {
|
||||
// Сохраняем маппинг пути
|
||||
if let Err(e) =
|
||||
self.save_aws2storj_mapping(&key, &storj_filekey).await
|
||||
{
|
||||
eprintln!("save {}: {:?}", key, e);
|
||||
} else {
|
||||
println!("[ok] {}", key);
|
||||
}
|
||||
}
|
||||
Ok(true) => {
|
||||
println!("[skip] {}", storj_filekey);
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!(
|
||||
"check {}: {:?}",
|
||||
storj_filekey, e
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
println!("AWS S3 file list is empty.");
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!("get AWS S3 file list: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// создает или получает текущее значение квоты пользователя
|
||||
pub async fn get_or_create_quota(&self, user_id: &str) -> Result<u64, actix_web::Error> {
|
||||
let mut redis = self.redis.clone();
|
||||
|
|
|
@ -5,13 +5,23 @@ use crate::app_state::AppState;
|
|||
use crate::s3_utils::check_file_exists;
|
||||
|
||||
/// Функция для обслуживания файла по заданному пути.
|
||||
pub async fn serve_file(file_key: &str, state: &AppState) -> Result<HttpResponse, actix_web::Error> {
|
||||
// Проверяем наличие файла в Storj S3
|
||||
if !check_file_exists(&state.storj_client, &state.storj_bucket, &file_key).await? {
|
||||
return Err(ErrorInternalServerError(format!("File {} not found in Storj", file_key)));
|
||||
}
|
||||
pub async fn serve_file(file_path: &str, state: &AppState) -> Result<HttpResponse, actix_web::Error> {
|
||||
let filepath = match file_path.ends_with("/webp") {
|
||||
true => &file_path.replace("/webp", ""),
|
||||
false => file_path,
|
||||
};
|
||||
let mut parts = filepath.split('/').collect::<Vec<&str>>(); // Explicit type annotation
|
||||
let filename = parts.pop().unwrap();
|
||||
let mut filename_parts = filename.split('.').collect::<Vec<&str>>();
|
||||
let _ext = filename_parts.pop().unwrap();
|
||||
let filekey = filename_parts.pop().unwrap();
|
||||
|
||||
let file_path_in_storj = state.get_path(file_key).await.unwrap().unwrap();
|
||||
let file_path_in_storj = state.get_path(filekey).await.unwrap().unwrap();
|
||||
|
||||
// Проверяем наличие файла в Storj S3
|
||||
if !check_file_exists(&state.storj_client, &state.storj_bucket, &file_path_in_storj).await? {
|
||||
return Err(ErrorInternalServerError(format!("File {} not found in Storj", file_path_in_storj)));
|
||||
}
|
||||
|
||||
// Получаем объект из Storj S3
|
||||
let get_object_output = state
|
||||
|
|
|
@ -22,7 +22,7 @@ async fn main() -> std::io::Result<()> {
|
|||
spawn_blocking(move || {
|
||||
let rt = tokio::runtime::Handle::current();
|
||||
rt.block_on(async move {
|
||||
app_state_clone.cache_aws_filelist().await;
|
||||
app_state_clone.cache_storj_filelist().await;
|
||||
app_state_clone.refresh_file_list_periodically().await;
|
||||
});
|
||||
});
|
||||
|
|
|
@ -27,11 +27,11 @@ pub async fn upload_to_s3(
|
|||
|
||||
/// Проверяет, существует ли файл в S3.
|
||||
pub async fn check_file_exists(
|
||||
storj_client: &S3Client,
|
||||
s3_client: &S3Client,
|
||||
bucket: &str,
|
||||
file_key: &str,
|
||||
filepath: &str,
|
||||
) -> Result<bool, actix_web::Error> {
|
||||
match storj_client.head_object().bucket(bucket).key(file_key).send().await {
|
||||
match s3_client.head_object().bucket(bucket).key(filepath).send().await {
|
||||
Ok(_) => Ok(true), // Файл найден
|
||||
Err(SdkError::ServiceError(service_error)) if service_error.err().is_not_found() => {
|
||||
Ok(false) // Файл не найден
|
||||
|
@ -74,3 +74,34 @@ pub fn generate_key_with_extension(base_key: String, mime_type: String) -> Strin
|
|||
}
|
||||
base_key
|
||||
}
|
||||
|
||||
/// список файлов из S3
|
||||
pub async fn get_s3_filelist(client: &S3Client, bucket: &str) -> Vec<[std::string::String; 2]> {
|
||||
let mut filekeys = Vec::new();
|
||||
// Запрашиваем список файлов из S3
|
||||
let list_objects_v2 = client.list_objects_v2();
|
||||
let list_response = list_objects_v2
|
||||
.bucket(bucket)
|
||||
.send()
|
||||
.await
|
||||
.expect("Failed to list files from Storj");
|
||||
|
||||
if let Some(objects) = list_response.contents {
|
||||
for object in objects.iter() {
|
||||
if let Some(s3_filepath) = &object.key {
|
||||
let filepath = match s3_filepath.ends_with("/webp") {
|
||||
true => &s3_filepath.replace("/webp", ""),
|
||||
false => s3_filepath,
|
||||
};
|
||||
let mut parts = filepath.split('/').collect::<Vec<&str>>(); // Explicit type annotation
|
||||
let filename = parts.pop().unwrap();
|
||||
let mut filename_parts = filename.split('.').collect::<Vec<&str>>();
|
||||
let _ext = filename_parts.pop().unwrap();
|
||||
let filekey = filename_parts.pop().unwrap();
|
||||
|
||||
filekeys.push([filekey.to_string(), s3_filepath.to_string()]);
|
||||
}
|
||||
}
|
||||
}
|
||||
filekeys
|
||||
}
|
Loading…
Reference in New Issue
Block a user