user-id-request
Some checks failed
deploy / deploy (push) Failing after 4s

This commit is contained in:
Untone 2024-08-30 23:27:01 +03:00
parent d20f19c2e0
commit 9b1c2060d6

View File

@ -10,22 +10,36 @@ use image::{imageops::FilterType, DynamicImage};
use mime_guess::MimeGuess;
use redis::Client as RedisClient;
use redis::{aio::MultiplexedConnection, AsyncCommands};
use std::env;
use std::io::Cursor;
use reqwest::{
header::{HeaderMap, HeaderValue, CONTENT_TYPE},
Client as HTTPClient,
};
use serde::Deserialize;
use serde_json::json;
use std::path::Path;
use std::{collections::HashMap, error::Error, io::Cursor};
use std::{env, time::Duration};
use tokio::time::interval;
const MAX_QUOTA_BYTES: u64 = 2 * 1024 * 1024 * 1024; // 2 GB per week
const MAX_QUOTA_BYTES: u64 = 2 * 1024 * 1024 * 1024; // Лимит квоты на пользователя: 2 ГБ в неделю
const FILE_LIST_CACHE_KEY: &str = "s3_file_list_cache"; // Ключ для хранения списка файлов в Redis
const PATH_MAPPING_KEY: &str = "path_mapping"; // Ключ для хранения маппинга путей
const CHECK_INTERVAL_SECONDS: u64 = 60; // Интервал обновления кэша: 1 минута
/// Структура состояния приложения, содержащая Redis и S3 клиенты.
#[derive(Clone)]
struct AppState {
redis: MultiplexedConnection,
s3_client: S3Client,
s3_bucket: String,
aws_bucket: String,
redis: MultiplexedConnection, // Подключение к Redis
s3_client: S3Client, // Клиент S3 для Storj
s3_bucket: String, // Название бакета в Storj
aws_client: S3Client, // Клиент S3 для AWS
aws_bucket: String, // Название бакета в AWS
}
impl AppState {
/// Инициализация нового состояния приложения.
async fn new() -> Self {
// Получаем конфигурацию для Redis
let redis_url = env::var("REDIS_URL").expect("REDIS_URL must be set");
let redis_client = RedisClient::open(redis_url).expect("Invalid Redis URL");
let redis_connection = redis_client
@ -33,13 +47,20 @@ impl AppState {
.await
.unwrap();
// Получаем конфигурацию для S3 (Storj)
let s3_access_key = env::var("STORJ_ACCESS_KEY").expect("STORJ_ACCESS_KEY must be set");
let s3_secret_key = env::var("STORJ_SECRET_KEY").expect("STORJ_SECRET_KEY must be set");
let s3_endpoint = env::var("STORJ_END_POINT").expect("STORJ_END_POINT must be set");
let s3_bucket = env::var("STORJ_BUCKET_NAME").expect("STORJ_BUCKET_NAME must be set");
// Получаем конфигурацию для AWS S3
let aws_access_key = env::var("AWS_ACCESS_KEY").expect("AWS_ACCESS_KEY must be set");
let aws_secret_key = env::var("AWS_SECRET_KEY").expect("AWS_SECRET_KEY must be set");
let aws_endpoint = env::var("AWS_END_POINT").expect("AWS_END_POINT must be set");
let aws_bucket = env::var("AWS_BUCKET_NAME").expect("AWS_BUCKET_NAME must be set");
let config = aws_config::defaults(BehaviorVersion::latest())
// Конфигурируем клиент S3 для Storj
let storj_config = aws_config::defaults(BehaviorVersion::latest())
.region("eu-west-1")
.endpoint_url(s3_endpoint)
.credentials_provider(Credentials::new(
@ -52,28 +73,132 @@ impl AppState {
.load()
.await;
let s3_client = S3Client::new(&config);
let s3_client = S3Client::new(&storj_config);
AppState {
// Конфигурируем клиент S3 для AWS
let aws_config = aws_config::defaults(BehaviorVersion::latest())
.region("us-east-1")
.endpoint_url(aws_endpoint)
.credentials_provider(Credentials::new(
aws_access_key,
aws_secret_key,
None,
None,
"rust-aws-client",
))
.load()
.await;
let aws_client = S3Client::new(&aws_config);
let app_state = AppState {
redis: redis_connection,
s3_client,
s3_bucket,
aws_client,
aws_bucket,
};
// Кэшируем список файлов из S3 при старте приложения
app_state.cache_file_list().await;
app_state
}
/// Кэширует список файлов из Storj S3 в Redis.
async fn cache_file_list(&self) {
let mut redis = self.redis.clone();
// Запрашиваем список файлов из Storj S3
let list_objects_v2 = self.s3_client.list_objects_v2();
let list_response = list_objects_v2
.bucket(&self.s3_bucket)
.send()
.await
.expect("Failed to list files from S3");
if let Some(objects) = list_response.contents {
// Формируем список файлов
let file_list: Vec<String> = objects
.iter()
.filter_map(|object| object.key.clone())
.collect();
// Сохраняем список файлов в Redis в формате JSON
let _: () = redis
.set(
FILE_LIST_CACHE_KEY,
serde_json::to_string(&file_list).unwrap(),
)
.await
.expect("Failed to cache file list in Redis");
}
}
/// Получает кэшированный список файлов из Redis.
async fn get_cached_file_list(&self) -> Vec<String> {
let mut redis = self.redis.clone();
// Пытаемся получить кэшированный список из Redis
let cached_list: Option<String> = redis.get(FILE_LIST_CACHE_KEY).await.unwrap_or(None);
if let Some(cached_list) = cached_list {
// Если список найден, возвращаем его в виде вектора строк
serde_json::from_str(&cached_list).unwrap_or_else(|_| vec![])
} else {
vec![]
}
}
/// Периодически обновляет кэшированный список файлов из Storj S3.
async fn refresh_file_list_periodically(&self) {
let mut interval = interval(Duration::from_secs(CHECK_INTERVAL_SECONDS));
loop {
interval.tick().await;
self.cache_file_list().await;
}
}
/// Сохраняет маппинг старого пути из AWS S3 на новый путь в Storj S3.
async fn save_path_mapping(
&self,
old_path: &str,
new_path: &str,
) -> Result<(), actix_web::Error> {
let mut redis = self.redis.clone();
// Храним маппинг в формате Hash: old_path -> new_path
redis
.hset(PATH_MAPPING_KEY, old_path, new_path)
.await
.map_err(|_| ErrorInternalServerError("Failed to save path mapping in Redis"))?;
Ok(())
}
/// Получает новый путь для старого пути из маппинга в Redis.
async fn get_new_path(&self, old_path: &str) -> Result<Option<String>, actix_web::Error> {
let mut redis = self.redis.clone();
let new_path: Option<String> = redis
.hget(PATH_MAPPING_KEY, old_path)
.await
.map_err(|_| ErrorInternalServerError("Failed to get path mapping from Redis"))?;
Ok(new_path)
}
}
/// Генерирует миниатюру изображения с заданной шириной.
async fn generate_thumbnail(image: &DynamicImage, width: u32) -> Result<Vec<u8>, actix_web::Error> {
let k = image.width() / width;
let height = image.height() / k;
let thumbnail = image.resize(width, height, FilterType::Lanczos3);
let original_width = image.width();
let scale_factor = original_width / width;
let height = image.height() / scale_factor;
let thumbnail = image.resize(width, height, FilterType::Lanczos3); // Ресайз изображения с использованием фильтра Lanczos3
let mut buffer = Vec::new();
thumbnail
.write_to(&mut Cursor::new(&mut buffer), image::ImageFormat::Jpeg)
.map_err(|_| ErrorInternalServerError("Failed to generate thumbnail"))?;
.map_err(|_| ErrorInternalServerError("Failed to generate thumbnail"))?; // Сохранение изображения в формате JPEG
Ok(buffer)
}
/// Загружает файл в S3 хранилище.
async fn upload_to_s3(
s3_client: &S3Client,
bucket: &str,
@ -81,7 +206,7 @@ async fn upload_to_s3(
body: Vec<u8>,
content_type: &str,
) -> Result<String, actix_web::Error> {
let body_stream = ByteStream::from(body);
let body_stream = ByteStream::from(body); // Преобразуем тело файла в поток байтов
s3_client
.put_object()
.bucket(bucket)
@ -90,41 +215,44 @@ async fn upload_to_s3(
.content_type(content_type)
.send()
.await
.map_err(|_| ErrorInternalServerError("Failed to upload file to S3"))?;
.map_err(|_| ErrorInternalServerError("Failed to upload file to S3"))?; // Загрузка файла в S3
Ok(key.to_string())
Ok(key.to_string()) // Возвращаем ключ файла
}
/// Проверяет, существует ли файл в S3.
async fn check_file_exists(
s3_client: &S3Client,
bucket: &str,
key: &str,
) -> Result<bool, actix_web::Error> {
match s3_client.head_object().bucket(bucket).key(key).send().await {
Ok(_) => Ok(true),
Ok(_) => Ok(true), // Файл найден
Err(SdkError::ServiceError(service_error)) if service_error.err().is_not_found() => {
Ok(false)
Ok(false) // Файл не найден
}
Err(e) => Err(ErrorInternalServerError(e.to_string())),
Err(e) => Err(ErrorInternalServerError(e.to_string())), // Ошибка при проверке
}
}
/// Проверяет и обновляет квоту пользователя.
async fn check_and_update_quota(
redis: &mut MultiplexedConnection,
user_id: &str,
file_size: u64,
) -> Result<(), actix_web::Error> {
let current_quota: u64 = redis.get(user_id).await.unwrap_or(0);
let current_quota: u64 = redis.get(user_id).await.unwrap_or(0); // Получаем текущую квоту пользователя
if current_quota + file_size > MAX_QUOTA_BYTES {
return Err(ErrorUnauthorized("Quota exceeded"));
return Err(ErrorUnauthorized("Quota exceeded")); // Квота превышена
}
redis
.incr(user_id, file_size)
.await
.map_err(|_| ErrorInternalServerError("Failed to update quota in Redis"))?;
.map_err(|_| ErrorInternalServerError("Failed to update quota in Redis"))?; // Увеличиваем использованную квоту
Ok(())
}
/// Сохраняет имя файла в Redis для пользователя.
async fn save_filename_in_redis(
redis: &mut MultiplexedConnection,
user_id: &str,
@ -133,19 +261,16 @@ async fn save_filename_in_redis(
redis
.sadd(user_id, filename)
.await
.map_err(|_| ErrorInternalServerError("Failed to save filename in Redis"))?;
.map_err(|_| ErrorInternalServerError("Failed to save filename in Redis"))?; // Добавляем имя файла в набор пользователя
Ok(())
}
async fn upload_files_from_aws(
aws_client: &S3Client,
aws_bucket: &str,
storj_client: &S3Client,
storj_bucket: &str,
) -> Result<(), actix_web::Error> {
let list_objects_v2 = aws_client.list_objects_v2();
/// Загружает файлы из AWS S3 в Storj S3 и сохраняет маппинг путей.
async fn upload_files_from_aws(app_state: &AppState) -> Result<(), actix_web::Error> {
// Получаем список объектов из AWS S3
let list_objects_v2 = app_state.aws_client.list_objects_v2();
let list_response = list_objects_v2
.bucket(aws_bucket)
.bucket(app_state.aws_bucket.clone())
.send()
.await
.map_err(|_| ErrorInternalServerError("Failed to list files from AWS S3"))?;
@ -153,10 +278,11 @@ async fn upload_files_from_aws(
if let Some(objects) = list_response.contents {
for object in objects {
if let Some(key) = object.key {
// Get the object from AWS S3
let object_response = aws_client
// Получаем объект из AWS S3
let object_response = app_state
.aws_client
.get_object()
.bucket(aws_bucket)
.bucket(app_state.aws_bucket.clone())
.key(&key)
.send()
.await
@ -171,16 +297,26 @@ async fn upload_files_from_aws(
.content_type
.unwrap_or_else(|| "application/octet-stream".to_string());
// Upload the object to Storj S3
// Определяем новый ключ для Storj S3 (например, сохраняем в корне с тем же именем)
let new_key = Path::new(&key)
.file_name()
.and_then(|name| name.to_str())
.unwrap_or(&key)
.to_string();
// Загружаем объект в Storj S3
let storj_url = upload_to_s3(
storj_client,
storj_bucket,
&key,
&app_state.s3_client,
&app_state.s3_bucket,
&new_key,
body.into_bytes().to_vec(),
&content_type,
)
.await?;
// Сохраняем маппинг старого пути на новый
app_state.save_path_mapping(&key, &new_key).await?;
println!("Uploaded {} to Storj at {}", key, storj_url);
}
}
@ -189,129 +325,160 @@ async fn upload_files_from_aws(
Ok(())
}
// Структура для десериализации ответа от сервиса аутентификации
#[derive(Deserialize)]
struct AuthResponse {
data: Option<AuthData>,
}
#[derive(Deserialize)]
struct AuthData {
validate_jwt_token: Option<ValidateJWTToken>,
}
#[derive(Deserialize)]
struct ValidateJWTToken {
is_valid: bool,
claims: Option<Claims>,
}
#[derive(Deserialize)]
struct Claims {
sub: Option<String>,
}
pub async fn get_id_by_token(token: &str) -> Result<String, Box<dyn Error>> {
let auth_api_base = env::var("AUTH_URL")?;
let query_name = "validate_jwt_token";
let operation = "ValidateToken";
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
let mut variables = HashMap::<String, HashMap<String, String>>::new();
let mut params = HashMap::<String, String>::new();
params.insert("token".to_string(), token.to_string());
params.insert("token_type".to_string(), "access_token".to_string());
variables.insert("params".to_string(), params);
let gql = json!({
"query": format!("query {}($params: ValidateJWTTokenInput!) {{ {}(params: $params) {{ is_valid claims }} }}", operation, query_name),
"operationName": operation,
"variables": variables
});
let client = HTTPClient::new();
let response = client
.post(&auth_api_base)
.headers(headers)
.json(&gql)
.send()
.await?;
if response.status().is_success() {
let auth_response: AuthResponse = response.json().await?;
if let Some(auth_data) = auth_response.data {
if let Some(validate_jwt_token) = auth_data.validate_jwt_token {
if validate_jwt_token.is_valid {
if let Some(claims) = validate_jwt_token.claims {
if let Some(sub) = claims.sub {
return Ok(sub);
}
}
}
}
}
Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
"Invalid token response",
)))
} else {
Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
format!("Request failed with status: {}", response.status()),
)))
}
}
/// Обработчик прокси-запросов.
async fn proxy_handler(
req: HttpRequest,
path: web::Path<String>,
state: web::Data<AppState>,
) -> Result<HttpResponse, actix_web::Error> {
// Получаем токен из заголовка авторизации
let token = req
.headers()
.get("Authorization")
.and_then(|header_value| header_value.to_str().ok());
if token.is_none() {
return Err(ErrorUnauthorized("Unauthorized"));
return Err(ErrorUnauthorized("Unauthorized")); // Если токен отсутствует, возвращаем ошибку
}
let user_id = token.unwrap(); // Assuming the token is the user ID
let user_id = get_id_by_token(token.unwrap()).await?;
let requested_path = path.into_inner(); // Полученный путь из запроса
let file_path = path.into_inner();
let mime_type = MimeGuess::from_path(&file_path).first_or_octet_stream();
let extension = Path::new(&file_path)
.extension()
.and_then(|ext| ext.to_str())
.unwrap_or("bin");
if mime_type.type_() == "image" {
let image = image::open(&file_path)
.map_err(|_| ErrorInternalServerError("Failed to open image"))?;
// Define thumbnail sizes
let thumbnail_sizes = vec![40, 110, 300, 600, 800];
for width in thumbnail_sizes {
let thumbnail_key = format!("{}_{}.jpg", file_path, width);
let thumbnail_data = generate_thumbnail(&image, width).await?;
// Check if thumbnail already exists
if !check_file_exists(&state.s3_client, &state.s3_bucket, &thumbnail_key).await? {
upload_to_s3(
&state.s3_client,
&state.s3_bucket,
&thumbnail_key,
thumbnail_data,
"image/jpeg",
)
.await?;
}
}
// Prepare original image data
let mut original_buffer = Vec::new();
image
.write_to(
&mut Cursor::new(&mut original_buffer),
image::ImageFormat::Jpeg,
)
.map_err(|_| ErrorInternalServerError("Failed to read image data"))?;
// Upload the original image
let image_key = format!("{}.{}", file_path, extension);
let image_url = upload_to_s3(
&state.s3_client,
&state.s3_bucket,
&image_key,
original_buffer.clone(),
mime_type.essence_str(),
)
.await?;
// Update quota and save filename
check_and_update_quota(
&mut state.redis.clone(),
user_id,
original_buffer.len() as u64,
)
.await?;
save_filename_in_redis(&mut state.redis.clone(), user_id, &image_key).await?;
return Ok(
HttpResponse::Ok().body(format!("Image and thumbnails uploaded to: {}", image_url))
);
// Проверяем, есть ли маппинг для старого пути
if let Some(new_path) = state.get_new_path(&requested_path).await? {
// Используем новый путь для доступа к файлу
return serve_file(&new_path, &state).await;
}
// Handle non-image files
let file_data =
std::fs::read(&file_path).map_err(|_| ErrorInternalServerError("Failed to read file"))?;
let file_size = file_data.len() as u64;
// Если маппинга нет, предполагаем, что путь является новым
serve_file(&requested_path, &state).await
}
// Check and update the user's quota
check_and_update_quota(&mut state.redis.clone(), user_id, file_size).await?;
/// Функция для обслуживания файла по заданному пути.
async fn serve_file(file_key: &str, state: &AppState) -> Result<HttpResponse, actix_web::Error> {
// Проверяем наличие файла в Storj S3
if !check_file_exists(&state.s3_client, &state.s3_bucket, file_key).await? {
return Err(ErrorInternalServerError("File not found in S3"));
}
// Upload the file
let file_key = format!("{}.{}", file_path, extension);
let file_url = upload_to_s3(
&state.s3_client,
&state.s3_bucket,
&file_key,
file_data,
mime_type.essence_str(),
)
.await?;
// Получаем объект из Storj S3
let get_object_output = state
.s3_client
.get_object()
.bucket(&state.s3_bucket)
.key(file_key)
.send()
.await
.map_err(|_| ErrorInternalServerError("Failed to get object from S3"))?;
// Save the filename in Redis for this user
save_filename_in_redis(&mut state.redis.clone(), user_id, &file_key).await?;
Ok(HttpResponse::Ok().body(format!("File uploaded to: {}", file_url)))
let data = get_object_output
.body
.collect()
.await
.map_err(|_| ErrorInternalServerError("Failed to read object body"))?;
let mime_type = MimeGuess::from_path(file_key).first_or_octet_stream(); // Определяем MIME-тип файла
Ok(HttpResponse::Ok()
.content_type(mime_type.as_ref())
.body(data.into_bytes()))
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
// Инициализируем состояние приложения
let app_state = AppState::new().await;
// Example of uploading files from AWS S3 to Storj
upload_files_from_aws(
&app_state.s3_client,
&app_state.aws_bucket,
&app_state.s3_client,
&app_state.s3_bucket,
)
.await
.expect("Failed to upload files from AWS to Storj");
let app_state_clone = app_state.clone();
tokio::spawn(async move {
// Запускаем задачу обновления списка файлов в фоне
app_state_clone.refresh_file_list_periodically().await;
});
// Загружаем файлы из AWS S3 в Storj S3 и сохраняем маппинг путей
upload_files_from_aws(&app_state)
.await
.expect("Failed to upload files from AWS to Storj");
// Запускаем HTTP сервер
HttpServer::new(move || {
App::new()
.app_data(web::Data::new(app_state.clone()))
.wrap(Logger::default())
.route("/{path:.*}", web::get().to(proxy_handler))
.route("/{path:.*}", web::get().to(proxy_handler)) // Маршрутизация всех GET запросов на proxy_handler
})
.bind("127.0.0.1:8080")?
.run()