upload_files_from_aws
Untone 2024-08-30 22:24:47 +03:00
parent e01e6f0e33
commit d20f19c2e0


@@ -4,12 +4,12 @@ use actix_web::{
     web, App, HttpRequest, HttpResponse, HttpServer, Result,
 };
 use aws_config::BehaviorVersion;
-use aws_sdk_s3::{config::Credentials, error::SdkError, Client as S3Client};
 use aws_sdk_s3::primitives::ByteStream;
-use image::{DynamicImage, imageops::FilterType};
+use aws_sdk_s3::{config::Credentials, error::SdkError, Client as S3Client};
+use image::{imageops::FilterType, DynamicImage};
 use mime_guess::MimeGuess;
-use redis::{aio::MultiplexedConnection, AsyncCommands};
 use redis::Client as RedisClient;
+use redis::{aio::MultiplexedConnection, AsyncCommands};
 use std::env;
 use std::io::Cursor;
 use std::path::Path;
@@ -21,20 +21,23 @@ struct AppState {
     redis: MultiplexedConnection,
     s3_client: S3Client,
     s3_bucket: String,
-    cdn_domain: String,
+    aws_bucket: String,
 }

 impl AppState {
     async fn new() -> Self {
         let redis_url = env::var("REDIS_URL").expect("REDIS_URL must be set");
         let redis_client = RedisClient::open(redis_url).expect("Invalid Redis URL");
-        let redis_connection = redis_client.get_multiplexed_async_connection().await.unwrap();
+        let redis_connection = redis_client
+            .get_multiplexed_async_connection()
+            .await
+            .unwrap();

         let s3_access_key = env::var("STORJ_ACCESS_KEY").expect("STORJ_ACCESS_KEY must be set");
         let s3_secret_key = env::var("STORJ_SECRET_KEY").expect("STORJ_SECRET_KEY must be set");
         let s3_endpoint = env::var("STORJ_END_POINT").expect("STORJ_END_POINT must be set");
         let s3_bucket = env::var("STORJ_BUCKET_NAME").expect("STORJ_BUCKET_NAME must be set");
-        let cdn_domain = env::var("CDN_DOMAIN").expect("CDN_DOMAIN must be set");
+        let aws_bucket = env::var("AWS_BUCKET_NAME").expect("AWS_BUCKET_NAME must be set");

         let config = aws_config::defaults(BehaviorVersion::latest())
             .region("eu-west-1")
@@ -55,7 +58,7 @@ impl AppState {
             redis: redis_connection,
             s3_client,
             s3_bucket,
-            cdn_domain,
+            aws_bucket,
         }
     }
 }
@@ -77,10 +80,10 @@ async fn upload_to_s3(
     key: &str,
     body: Vec<u8>,
     content_type: &str,
-    cdn_domain: &str,
 ) -> Result<String, actix_web::Error> {
     let body_stream = ByteStream::from(body);
-    s3_client.put_object()
+    s3_client
+        .put_object()
         .bucket(bucket)
         .key(key)
         .body(body_stream)
@@ -89,13 +92,19 @@ async fn upload_to_s3(
         .await
         .map_err(|_| ErrorInternalServerError("Failed to upload file to S3"))?;
-    Ok(format!("{}/{}", cdn_domain, key))
+    Ok(key.to_string())
 }

-async fn check_file_exists(s3_client: &S3Client, bucket: &str, key: &str) -> Result<bool, actix_web::Error> {
+async fn check_file_exists(
+    s3_client: &S3Client,
+    bucket: &str,
+    key: &str,
+) -> Result<bool, actix_web::Error> {
     match s3_client.head_object().bucket(bucket).key(key).send().await {
         Ok(_) => Ok(true),
-        Err(SdkError::ServiceError(service_error)) if service_error.err().is_not_found() => Ok(false),
+        Err(SdkError::ServiceError(service_error)) if service_error.err().is_not_found() => {
+            Ok(false)
+        }
         Err(e) => Err(ErrorInternalServerError(e.to_string())),
     }
 }
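
Note: with cdn_domain removed, upload_to_s3 now returns the bare object key rather than a CDN-prefixed URL, so any public link has to be assembled by the caller. A minimal sketch, assuming a hypothetical cdn_base value that this commit no longer reads from the environment:

fn public_url(cdn_base: &str, key: &str) -> String {
    // cdn_base is illustrative only; CDN_DOMAIN is no longer loaded anywhere in this commit.
    format!("{}/{}", cdn_base.trim_end_matches('/'), key)
}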
@@ -109,7 +118,10 @@ async fn check_and_update_quota(
     if current_quota + file_size > MAX_QUOTA_BYTES {
         return Err(ErrorUnauthorized("Quota exceeded"));
     }

-    redis.incr(user_id, file_size).await.map_err(|_| ErrorInternalServerError("Failed to update quota in Redis"))?;
+    redis
+        .incr(user_id, file_size)
+        .await
+        .map_err(|_| ErrorInternalServerError("Failed to update quota in Redis"))?;
     Ok(())
 }
@@ -118,7 +130,62 @@ async fn save_filename_in_redis(
     user_id: &str,
     filename: &str,
 ) -> Result<(), actix_web::Error> {
-    redis.sadd(user_id, filename).await.map_err(|_| ErrorInternalServerError("Failed to save filename in Redis"))?;
+    redis
+        .sadd(user_id, filename)
+        .await
+        .map_err(|_| ErrorInternalServerError("Failed to save filename in Redis"))?;
+    Ok(())
+}
+
+async fn upload_files_from_aws(
+    aws_client: &S3Client,
+    aws_bucket: &str,
+    storj_client: &S3Client,
+    storj_bucket: &str,
+) -> Result<(), actix_web::Error> {
+    let list_objects_v2 = aws_client.list_objects_v2();
+    let list_response = list_objects_v2
+        .bucket(aws_bucket)
+        .send()
+        .await
+        .map_err(|_| ErrorInternalServerError("Failed to list files from AWS S3"))?;
+
+    if let Some(objects) = list_response.contents {
+        for object in objects {
+            if let Some(key) = object.key {
+                // Get the object from AWS S3
+                let object_response = aws_client
+                    .get_object()
+                    .bucket(aws_bucket)
+                    .key(&key)
+                    .send()
+                    .await
+                    .map_err(|_| ErrorInternalServerError("Failed to get object from AWS S3"))?;
+
+                let body = object_response
+                    .body
+                    .collect()
+                    .await
+                    .map_err(|_| ErrorInternalServerError("Failed to read object body"))?;
+
+                let content_type = object_response
+                    .content_type
+                    .unwrap_or_else(|| "application/octet-stream".to_string());
+
+                // Upload the object to Storj S3
+                let storj_url = upload_to_s3(
+                    storj_client,
+                    storj_bucket,
+                    &key,
+                    body.into_bytes().to_vec(),
+                    &content_type,
+                )
+                .await?;
+
+                println!("Uploaded {} to Storj at {}", key, storj_url);
+            }
+        }
+    }
+
     Ok(())
 }
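
The new upload_files_from_aws reads a single ListObjectsV2 page, which covers at most 1,000 keys per call. A minimal sketch of the same copy loop with continuation-token pagination, reusing upload_to_s3 from this commit; the function name is illustrative and not part of the change:

async fn upload_all_files_from_aws(
    aws_client: &S3Client,
    aws_bucket: &str,
    storj_client: &S3Client,
    storj_bucket: &str,
) -> Result<(), actix_web::Error> {
    let mut continuation_token: Option<String> = None;
    loop {
        let mut request = aws_client.list_objects_v2().bucket(aws_bucket);
        if let Some(token) = continuation_token.take() {
            request = request.continuation_token(token);
        }
        let page = request
            .send()
            .await
            .map_err(|_| ErrorInternalServerError("Failed to list files from AWS S3"))?;
        // Remember where the next page starts before the contents field is moved out.
        continuation_token = page.next_continuation_token.clone();

        if let Some(objects) = page.contents {
            for object in objects {
                if let Some(key) = object.key {
                    // Same per-object copy step as upload_files_from_aws above.
                    let object_response = aws_client
                        .get_object()
                        .bucket(aws_bucket)
                        .key(&key)
                        .send()
                        .await
                        .map_err(|_| ErrorInternalServerError("Failed to get object from AWS S3"))?;
                    let body = object_response
                        .body
                        .collect()
                        .await
                        .map_err(|_| ErrorInternalServerError("Failed to read object body"))?;
                    let content_type = object_response
                        .content_type
                        .unwrap_or_else(|| "application/octet-stream".to_string());
                    upload_to_s3(
                        storj_client,
                        storj_bucket,
                        &key,
                        body.into_bytes().to_vec(),
                        &content_type,
                    )
                    .await?;
                }
            }
        }
        if continuation_token.is_none() {
            break;
        }
    }
    Ok(())
}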
@@ -127,7 +194,10 @@ async fn proxy_handler(
     path: web::Path<String>,
     state: web::Data<AppState>,
 ) -> Result<HttpResponse, actix_web::Error> {
-    let token = req.headers().get("Authorization").and_then(|header_value| header_value.to_str().ok());
+    let token = req
+        .headers()
+        .get("Authorization")
+        .and_then(|header_value| header_value.to_str().ok());
     if token.is_none() {
         return Err(ErrorUnauthorized("Unauthorized"));
     }
@@ -136,10 +206,14 @@ async fn proxy_handler(
     let file_path = path.into_inner();
     let mime_type = MimeGuess::from_path(&file_path).first_or_octet_stream();
-    let extension = Path::new(&file_path).extension().and_then(|ext| ext.to_str()).unwrap_or("bin");
+    let extension = Path::new(&file_path)
+        .extension()
+        .and_then(|ext| ext.to_str())
+        .unwrap_or("bin");

     if mime_type.type_() == "image" {
-        let image = image::open(&file_path).map_err(|_| ErrorInternalServerError("Failed to open image"))?;
+        let image = image::open(&file_path)
+            .map_err(|_| ErrorInternalServerError("Failed to open image"))?;

         // Define thumbnail sizes
         let thumbnail_sizes = vec![40, 110, 300, 600, 800];
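
The hunk below consumes thumbnail_key and thumbnail_data that are built from these sizes outside the visible diff context. For reference, a minimal sketch of how such a JPEG thumbnail could be produced with the DynamicImage and FilterType imports added above; the function name and the Lanczos3 filter are assumptions, not part of the commit:

fn make_thumbnail(image: &DynamicImage, width: u32) -> Result<Vec<u8>, actix_web::Error> {
    // Preserve aspect ratio; Lanczos3 is just one reasonable filter choice.
    let resized = image.resize(width, width, FilterType::Lanczos3);
    let mut buffer = Vec::new();
    resized
        .write_to(&mut Cursor::new(&mut buffer), image::ImageFormat::Jpeg)
        .map_err(|_| ErrorInternalServerError("Failed to encode thumbnail"))?;
    Ok(buffer)
}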
@@ -150,28 +224,54 @@ async fn proxy_handler(
             // Check if thumbnail already exists
             if !check_file_exists(&state.s3_client, &state.s3_bucket, &thumbnail_key).await? {
-                upload_to_s3(&state.s3_client, &state.s3_bucket, &thumbnail_key, thumbnail_data, "image/jpeg", &state.cdn_domain).await?;
+                upload_to_s3(
+                    &state.s3_client,
+                    &state.s3_bucket,
+                    &thumbnail_key,
+                    thumbnail_data,
+                    "image/jpeg",
+                )
+                .await?;
             }
         }

         // Prepare original image data
         let mut original_buffer = Vec::new();
-        image.write_to(&mut Cursor::new(&mut original_buffer), image::ImageFormat::Jpeg)
+        image
+            .write_to(
+                &mut Cursor::new(&mut original_buffer),
+                image::ImageFormat::Jpeg,
+            )
             .map_err(|_| ErrorInternalServerError("Failed to read image data"))?;

         // Upload the original image
         let image_key = format!("{}.{}", file_path, extension);
-        let image_url = upload_to_s3(&state.s3_client, &state.s3_bucket, &image_key, original_buffer.clone(), mime_type.essence_str(), &state.cdn_domain).await?;
+        let image_url = upload_to_s3(
+            &state.s3_client,
+            &state.s3_bucket,
+            &image_key,
+            original_buffer.clone(),
+            mime_type.essence_str(),
+        )
+        .await?;

         // Update quota and save filename
-        check_and_update_quota(&mut state.redis.clone(), user_id, original_buffer.len() as u64).await?;
+        check_and_update_quota(
+            &mut state.redis.clone(),
+            user_id,
+            original_buffer.len() as u64,
+        )
+        .await?;
         save_filename_in_redis(&mut state.redis.clone(), user_id, &image_key).await?;

-        return Ok(HttpResponse::Ok().body(format!("Image and thumbnails uploaded to: {}", image_url)));
+        return Ok(
+            HttpResponse::Ok().body(format!("Image and thumbnails uploaded to: {}", image_url))
+        );
     }

     // Handle non-image files
-    let file_data = std::fs::read(&file_path).map_err(|_| ErrorInternalServerError("Failed to read file"))?;
+    let file_data =
+        std::fs::read(&file_path).map_err(|_| ErrorInternalServerError("Failed to read file"))?;
     let file_size = file_data.len() as u64;

     // Check and update the user's quota
@@ -179,7 +279,14 @@ async fn proxy_handler(
     // Upload the file
     let file_key = format!("{}.{}", file_path, extension);
-    let file_url = upload_to_s3(&state.s3_client, &state.s3_bucket, &file_key, file_data, mime_type.essence_str(), &state.cdn_domain).await?;
+    let file_url = upload_to_s3(
+        &state.s3_client,
+        &state.s3_bucket,
+        &file_key,
+        file_data,
+        mime_type.essence_str(),
+    )
+    .await?;

     // Save the filename in Redis for this user
     save_filename_in_redis(&mut state.redis.clone(), user_id, &file_key).await?;
@@ -190,6 +297,16 @@ async fn proxy_handler(
 async fn main() -> std::io::Result<()> {
     let app_state = AppState::new().await;

+    // Example of uploading files from AWS S3 to Storj
+    upload_files_from_aws(
+        &app_state.s3_client,
+        &app_state.aws_bucket,
+        &app_state.s3_client,
+        &app_state.s3_bucket,
+    )
+    .await
+    .expect("Failed to upload files from AWS to Storj");
+
     HttpServer::new(move || {
         App::new()
             .app_data(web::Data::new(app_state.clone()))
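
Note that the call added above passes the same Storj-configured s3_client as both the AWS source and the Storj destination. If the source bucket actually lives on AWS, a dedicated client could be built from aws_config's default credential chain; a minimal sketch, where the region and the credential source are assumptions rather than part of this commit:

    // Sketch only: a dedicated source client for the AWS bucket, resolved from the
    // default credential chain (e.g. AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY).
    let aws_source_config = aws_config::defaults(BehaviorVersion::latest())
        .region("eu-west-1")
        .load()
        .await;
    let aws_source_client = S3Client::new(&aws_source_config);
    upload_files_from_aws(
        &aws_source_client,
        &app_state.aws_bucket,
        &app_state.s3_client,
        &app_state.s3_bucket,
    )
    .await
    .expect("Failed to upload files from AWS to Storj");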
@@ -199,4 +316,4 @@ async fn main() -> std::io::Result<()> {
     .bind("127.0.0.1:8080")?
     .run()
     .await
 }