From 9b1c2060d60083a18828371f466a30229ca509d2 Mon Sep 17 00:00:00 2001 From: Untone Date: Fri, 30 Aug 2024 23:27:01 +0300 Subject: [PATCH] user-id-request --- src/main.rs | 433 ++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 300 insertions(+), 133 deletions(-) diff --git a/src/main.rs b/src/main.rs index 33055cb..1d48540 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,22 +10,36 @@ use image::{imageops::FilterType, DynamicImage}; use mime_guess::MimeGuess; use redis::Client as RedisClient; use redis::{aio::MultiplexedConnection, AsyncCommands}; -use std::env; -use std::io::Cursor; +use reqwest::{ + header::{HeaderMap, HeaderValue, CONTENT_TYPE}, + Client as HTTPClient, +}; +use serde::Deserialize; +use serde_json::json; use std::path::Path; +use std::{collections::HashMap, error::Error, io::Cursor}; +use std::{env, time::Duration}; +use tokio::time::interval; -const MAX_QUOTA_BYTES: u64 = 2 * 1024 * 1024 * 1024; // 2 GB per week +const MAX_QUOTA_BYTES: u64 = 2 * 1024 * 1024 * 1024; // Лимит квоты на пользователя: 2 ГБ в неделю +const FILE_LIST_CACHE_KEY: &str = "s3_file_list_cache"; // Ключ для хранения списка файлов в Redis +const PATH_MAPPING_KEY: &str = "path_mapping"; // Ключ для хранения маппинга путей +const CHECK_INTERVAL_SECONDS: u64 = 60; // Интервал обновления кэша: 1 минута +/// Структура состояния приложения, содержащая Redis и S3 клиенты. #[derive(Clone)] struct AppState { - redis: MultiplexedConnection, - s3_client: S3Client, - s3_bucket: String, - aws_bucket: String, + redis: MultiplexedConnection, // Подключение к Redis + s3_client: S3Client, // Клиент S3 для Storj + s3_bucket: String, // Название бакета в Storj + aws_client: S3Client, // Клиент S3 для AWS + aws_bucket: String, // Название бакета в AWS } impl AppState { + /// Инициализация нового состояния приложения. async fn new() -> Self { + // Получаем конфигурацию для Redis let redis_url = env::var("REDIS_URL").expect("REDIS_URL must be set"); let redis_client = RedisClient::open(redis_url).expect("Invalid Redis URL"); let redis_connection = redis_client @@ -33,13 +47,20 @@ impl AppState { .await .unwrap(); + // Получаем конфигурацию для S3 (Storj) let s3_access_key = env::var("STORJ_ACCESS_KEY").expect("STORJ_ACCESS_KEY must be set"); let s3_secret_key = env::var("STORJ_SECRET_KEY").expect("STORJ_SECRET_KEY must be set"); let s3_endpoint = env::var("STORJ_END_POINT").expect("STORJ_END_POINT must be set"); let s3_bucket = env::var("STORJ_BUCKET_NAME").expect("STORJ_BUCKET_NAME must be set"); + + // Получаем конфигурацию для AWS S3 + let aws_access_key = env::var("AWS_ACCESS_KEY").expect("AWS_ACCESS_KEY must be set"); + let aws_secret_key = env::var("AWS_SECRET_KEY").expect("AWS_SECRET_KEY must be set"); + let aws_endpoint = env::var("AWS_END_POINT").expect("AWS_END_POINT must be set"); let aws_bucket = env::var("AWS_BUCKET_NAME").expect("AWS_BUCKET_NAME must be set"); - let config = aws_config::defaults(BehaviorVersion::latest()) + // Конфигурируем клиент S3 для Storj + let storj_config = aws_config::defaults(BehaviorVersion::latest()) .region("eu-west-1") .endpoint_url(s3_endpoint) .credentials_provider(Credentials::new( @@ -52,28 +73,132 @@ impl AppState { .load() .await; - let s3_client = S3Client::new(&config); + let s3_client = S3Client::new(&storj_config); - AppState { + // Конфигурируем клиент S3 для AWS + let aws_config = aws_config::defaults(BehaviorVersion::latest()) + .region("us-east-1") + .endpoint_url(aws_endpoint) + .credentials_provider(Credentials::new( + aws_access_key, + aws_secret_key, + None, + None, + "rust-aws-client", + )) + .load() + .await; + + let aws_client = S3Client::new(&aws_config); + + let app_state = AppState { redis: redis_connection, s3_client, s3_bucket, + aws_client, aws_bucket, + }; + + // Кэшируем список файлов из S3 при старте приложения + app_state.cache_file_list().await; + + app_state + } + + /// Кэширует список файлов из Storj S3 в Redis. + async fn cache_file_list(&self) { + let mut redis = self.redis.clone(); + + // Запрашиваем список файлов из Storj S3 + let list_objects_v2 = self.s3_client.list_objects_v2(); + let list_response = list_objects_v2 + .bucket(&self.s3_bucket) + .send() + .await + .expect("Failed to list files from S3"); + + if let Some(objects) = list_response.contents { + // Формируем список файлов + let file_list: Vec = objects + .iter() + .filter_map(|object| object.key.clone()) + .collect(); + + // Сохраняем список файлов в Redis в формате JSON + let _: () = redis + .set( + FILE_LIST_CACHE_KEY, + serde_json::to_string(&file_list).unwrap(), + ) + .await + .expect("Failed to cache file list in Redis"); } } + + /// Получает кэшированный список файлов из Redis. + async fn get_cached_file_list(&self) -> Vec { + let mut redis = self.redis.clone(); + + // Пытаемся получить кэшированный список из Redis + let cached_list: Option = redis.get(FILE_LIST_CACHE_KEY).await.unwrap_or(None); + + if let Some(cached_list) = cached_list { + // Если список найден, возвращаем его в виде вектора строк + serde_json::from_str(&cached_list).unwrap_or_else(|_| vec![]) + } else { + vec![] + } + } + + /// Периодически обновляет кэшированный список файлов из Storj S3. + async fn refresh_file_list_periodically(&self) { + let mut interval = interval(Duration::from_secs(CHECK_INTERVAL_SECONDS)); + loop { + interval.tick().await; + self.cache_file_list().await; + } + } + + /// Сохраняет маппинг старого пути из AWS S3 на новый путь в Storj S3. + async fn save_path_mapping( + &self, + old_path: &str, + new_path: &str, + ) -> Result<(), actix_web::Error> { + let mut redis = self.redis.clone(); + // Храним маппинг в формате Hash: old_path -> new_path + redis + .hset(PATH_MAPPING_KEY, old_path, new_path) + .await + .map_err(|_| ErrorInternalServerError("Failed to save path mapping in Redis"))?; + Ok(()) + } + + /// Получает новый путь для старого пути из маппинга в Redis. + async fn get_new_path(&self, old_path: &str) -> Result, actix_web::Error> { + let mut redis = self.redis.clone(); + let new_path: Option = redis + .hget(PATH_MAPPING_KEY, old_path) + .await + .map_err(|_| ErrorInternalServerError("Failed to get path mapping from Redis"))?; + Ok(new_path) + } } +/// Генерирует миниатюру изображения с заданной шириной. async fn generate_thumbnail(image: &DynamicImage, width: u32) -> Result, actix_web::Error> { - let k = image.width() / width; - let height = image.height() / k; - let thumbnail = image.resize(width, height, FilterType::Lanczos3); + let original_width = image.width(); + let scale_factor = original_width / width; + let height = image.height() / scale_factor; + let thumbnail = image.resize(width, height, FilterType::Lanczos3); // Ресайз изображения с использованием фильтра Lanczos3 let mut buffer = Vec::new(); thumbnail .write_to(&mut Cursor::new(&mut buffer), image::ImageFormat::Jpeg) - .map_err(|_| ErrorInternalServerError("Failed to generate thumbnail"))?; + .map_err(|_| ErrorInternalServerError("Failed to generate thumbnail"))?; // Сохранение изображения в формате JPEG Ok(buffer) } +/// Загружает файл в S3 хранилище. async fn upload_to_s3( s3_client: &S3Client, bucket: &str, @@ -81,7 +206,7 @@ async fn upload_to_s3( body: Vec, content_type: &str, ) -> Result { - let body_stream = ByteStream::from(body); + let body_stream = ByteStream::from(body); // Преобразуем тело файла в поток байтов s3_client .put_object() .bucket(bucket) @@ -90,41 +215,44 @@ async fn upload_to_s3( .content_type(content_type) .send() .await - .map_err(|_| ErrorInternalServerError("Failed to upload file to S3"))?; + .map_err(|_| ErrorInternalServerError("Failed to upload file to S3"))?; // Загрузка файла в S3 - Ok(key.to_string()) + Ok(key.to_string()) // Возвращаем ключ файла } +/// Проверяет, существует ли файл в S3. async fn check_file_exists( s3_client: &S3Client, bucket: &str, key: &str, ) -> Result { match s3_client.head_object().bucket(bucket).key(key).send().await { - Ok(_) => Ok(true), + Ok(_) => Ok(true), // Файл найден Err(SdkError::ServiceError(service_error)) if service_error.err().is_not_found() => { - Ok(false) + Ok(false) // Файл не найден } - Err(e) => Err(ErrorInternalServerError(e.to_string())), + Err(e) => Err(ErrorInternalServerError(e.to_string())), // Ошибка при проверке } } +/// Проверяет и обновляет квоту пользователя. async fn check_and_update_quota( redis: &mut MultiplexedConnection, user_id: &str, file_size: u64, ) -> Result<(), actix_web::Error> { - let current_quota: u64 = redis.get(user_id).await.unwrap_or(0); + let current_quota: u64 = redis.get(user_id).await.unwrap_or(0); // Получаем текущую квоту пользователя if current_quota + file_size > MAX_QUOTA_BYTES { - return Err(ErrorUnauthorized("Quota exceeded")); + return Err(ErrorUnauthorized("Quota exceeded")); // Квота превышена } redis .incr(user_id, file_size) .await - .map_err(|_| ErrorInternalServerError("Failed to update quota in Redis"))?; + .map_err(|_| ErrorInternalServerError("Failed to update quota in Redis"))?; // Увеличиваем использованную квоту Ok(()) } +/// Сохраняет имя файла в Redis для пользователя. async fn save_filename_in_redis( redis: &mut MultiplexedConnection, user_id: &str, @@ -133,19 +261,16 @@ async fn save_filename_in_redis( redis .sadd(user_id, filename) .await - .map_err(|_| ErrorInternalServerError("Failed to save filename in Redis"))?; + .map_err(|_| ErrorInternalServerError("Failed to save filename in Redis"))?; // Добавляем имя файла в набор пользователя Ok(()) } -async fn upload_files_from_aws( - aws_client: &S3Client, - aws_bucket: &str, - storj_client: &S3Client, - storj_bucket: &str, -) -> Result<(), actix_web::Error> { - let list_objects_v2 = aws_client.list_objects_v2(); +/// Загружает файлы из AWS S3 в Storj S3 и сохраняет маппинг путей. +async fn upload_files_from_aws(app_state: &AppState) -> Result<(), actix_web::Error> { + // Получаем список объектов из AWS S3 + let list_objects_v2 = app_state.aws_client.list_objects_v2(); let list_response = list_objects_v2 - .bucket(aws_bucket) + .bucket(app_state.aws_bucket.clone()) .send() .await .map_err(|_| ErrorInternalServerError("Failed to list files from AWS S3"))?; @@ -153,10 +278,11 @@ async fn upload_files_from_aws( if let Some(objects) = list_response.contents { for object in objects { if let Some(key) = object.key { - // Get the object from AWS S3 - let object_response = aws_client + // Получаем объект из AWS S3 + let object_response = app_state + .aws_client .get_object() - .bucket(aws_bucket) + .bucket(app_state.aws_bucket.clone()) .key(&key) .send() .await @@ -171,16 +297,26 @@ async fn upload_files_from_aws( .content_type .unwrap_or_else(|| "application/octet-stream".to_string()); - // Upload the object to Storj S3 + // Определяем новый ключ для Storj S3 (например, сохраняем в корне с тем же именем) + let new_key = Path::new(&key) + .file_name() + .and_then(|name| name.to_str()) + .unwrap_or(&key) + .to_string(); + + // Загружаем объект в Storj S3 let storj_url = upload_to_s3( - storj_client, - storj_bucket, - &key, + &app_state.s3_client, + &app_state.s3_bucket, + &new_key, body.into_bytes().to_vec(), &content_type, ) .await?; + // Сохраняем маппинг старого пути на новый + app_state.save_path_mapping(&key, &new_key).await?; + println!("Uploaded {} to Storj at {}", key, storj_url); } } @@ -189,129 +325,160 @@ async fn upload_files_from_aws( Ok(()) } +// Структура для десериализации ответа от сервиса аутентификации +#[derive(Deserialize)] +struct AuthResponse { + data: Option, +} + +#[derive(Deserialize)] +struct AuthData { + validate_jwt_token: Option, +} + +#[derive(Deserialize)] +struct ValidateJWTToken { + is_valid: bool, + claims: Option, +} + +#[derive(Deserialize)] +struct Claims { + sub: Option, +} + +pub async fn get_id_by_token(token: &str) -> Result> { + let auth_api_base = env::var("AUTH_URL")?; + let query_name = "validate_jwt_token"; + let operation = "ValidateToken"; + let mut headers = HeaderMap::new(); + headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); + + let mut variables = HashMap::>::new(); + let mut params = HashMap::::new(); + params.insert("token".to_string(), token.to_string()); + params.insert("token_type".to_string(), "access_token".to_string()); + variables.insert("params".to_string(), params); + + let gql = json!({ + "query": format!("query {}($params: ValidateJWTTokenInput!) {{ {}(params: $params) {{ is_valid claims }} }}", operation, query_name), + "operationName": operation, + "variables": variables + }); + + let client = HTTPClient::new(); + let response = client + .post(&auth_api_base) + .headers(headers) + .json(&gql) + .send() + .await?; + + if response.status().is_success() { + let auth_response: AuthResponse = response.json().await?; + if let Some(auth_data) = auth_response.data { + if let Some(validate_jwt_token) = auth_data.validate_jwt_token { + if validate_jwt_token.is_valid { + if let Some(claims) = validate_jwt_token.claims { + if let Some(sub) = claims.sub { + return Ok(sub); + } + } + } + } + } + Err(Box::new(std::io::Error::new( + std::io::ErrorKind::Other, + "Invalid token response", + ))) + } else { + Err(Box::new(std::io::Error::new( + std::io::ErrorKind::Other, + format!("Request failed with status: {}", response.status()), + ))) + } +} + +/// Обработчик прокси-запросов. async fn proxy_handler( req: HttpRequest, path: web::Path, state: web::Data, ) -> Result { + // Получаем токен из заголовка авторизации let token = req .headers() .get("Authorization") .and_then(|header_value| header_value.to_str().ok()); if token.is_none() { - return Err(ErrorUnauthorized("Unauthorized")); + return Err(ErrorUnauthorized("Unauthorized")); // Если токен отсутствует, возвращаем ошибку } - let user_id = token.unwrap(); // Assuming the token is the user ID + let user_id = get_id_by_token(token.unwrap()).await?; + let requested_path = path.into_inner(); // Полученный путь из запроса - let file_path = path.into_inner(); - let mime_type = MimeGuess::from_path(&file_path).first_or_octet_stream(); - let extension = Path::new(&file_path) - .extension() - .and_then(|ext| ext.to_str()) - .unwrap_or("bin"); - - if mime_type.type_() == "image" { - let image = image::open(&file_path) - .map_err(|_| ErrorInternalServerError("Failed to open image"))?; - - // Define thumbnail sizes - let thumbnail_sizes = vec![40, 110, 300, 600, 800]; - - for width in thumbnail_sizes { - let thumbnail_key = format!("{}_{}.jpg", file_path, width); - let thumbnail_data = generate_thumbnail(&image, width).await?; - - // Check if thumbnail already exists - if !check_file_exists(&state.s3_client, &state.s3_bucket, &thumbnail_key).await? { - upload_to_s3( - &state.s3_client, - &state.s3_bucket, - &thumbnail_key, - thumbnail_data, - "image/jpeg", - ) - .await?; - } - } - - // Prepare original image data - let mut original_buffer = Vec::new(); - image - .write_to( - &mut Cursor::new(&mut original_buffer), - image::ImageFormat::Jpeg, - ) - .map_err(|_| ErrorInternalServerError("Failed to read image data"))?; - - // Upload the original image - let image_key = format!("{}.{}", file_path, extension); - let image_url = upload_to_s3( - &state.s3_client, - &state.s3_bucket, - &image_key, - original_buffer.clone(), - mime_type.essence_str(), - ) - .await?; - - // Update quota and save filename - check_and_update_quota( - &mut state.redis.clone(), - user_id, - original_buffer.len() as u64, - ) - .await?; - save_filename_in_redis(&mut state.redis.clone(), user_id, &image_key).await?; - - return Ok( - HttpResponse::Ok().body(format!("Image and thumbnails uploaded to: {}", image_url)) - ); + // Проверяем, есть ли маппинг для старого пути + if let Some(new_path) = state.get_new_path(&requested_path).await? { + // Используем новый путь для доступа к файлу + return serve_file(&new_path, &state).await; } - // Handle non-image files - let file_data = - std::fs::read(&file_path).map_err(|_| ErrorInternalServerError("Failed to read file"))?; - let file_size = file_data.len() as u64; + // Если маппинга нет, предполагаем, что путь является новым + serve_file(&requested_path, &state).await +} - // Check and update the user's quota - check_and_update_quota(&mut state.redis.clone(), user_id, file_size).await?; +/// Функция для обслуживания файла по заданному пути. +async fn serve_file(file_key: &str, state: &AppState) -> Result { + // Проверяем наличие файла в Storj S3 + if !check_file_exists(&state.s3_client, &state.s3_bucket, file_key).await? { + return Err(ErrorInternalServerError("File not found in S3")); + } - // Upload the file - let file_key = format!("{}.{}", file_path, extension); - let file_url = upload_to_s3( - &state.s3_client, - &state.s3_bucket, - &file_key, - file_data, - mime_type.essence_str(), - ) - .await?; + // Получаем объект из Storj S3 + let get_object_output = state + .s3_client + .get_object() + .bucket(&state.s3_bucket) + .key(file_key) + .send() + .await + .map_err(|_| ErrorInternalServerError("Failed to get object from S3"))?; - // Save the filename in Redis for this user - save_filename_in_redis(&mut state.redis.clone(), user_id, &file_key).await?; - Ok(HttpResponse::Ok().body(format!("File uploaded to: {}", file_url))) + let data = get_object_output + .body + .collect() + .await + .map_err(|_| ErrorInternalServerError("Failed to read object body"))?; + + let mime_type = MimeGuess::from_path(file_key).first_or_octet_stream(); // Определяем MIME-тип файла + + Ok(HttpResponse::Ok() + .content_type(mime_type.as_ref()) + .body(data.into_bytes())) } #[actix_web::main] async fn main() -> std::io::Result<()> { + // Инициализируем состояние приложения let app_state = AppState::new().await; - // Example of uploading files from AWS S3 to Storj - upload_files_from_aws( - &app_state.s3_client, - &app_state.aws_bucket, - &app_state.s3_client, - &app_state.s3_bucket, - ) - .await - .expect("Failed to upload files from AWS to Storj"); + let app_state_clone = app_state.clone(); + tokio::spawn(async move { + // Запускаем задачу обновления списка файлов в фоне + app_state_clone.refresh_file_list_periodically().await; + }); + // Загружаем файлы из AWS S3 в Storj S3 и сохраняем маппинг путей + upload_files_from_aws(&app_state) + .await + .expect("Failed to upload files from AWS to Storj"); + + // Запускаем HTTP сервер HttpServer::new(move || { App::new() .app_data(web::Data::new(app_state.clone())) .wrap(Logger::default()) - .route("/{path:.*}", web::get().to(proxy_handler)) + .route("/{path:.*}", web::get().to(proxy_handler)) // Маршрутизация всех GET запросов на proxy_handler }) .bind("127.0.0.1:8080")? .run()