Event-Driven Shared Drive in AWS
Deepak Poudel
Posted on November 15, 2024
This blog outlines the architecture and setup for network file sharing and event-driven processing using AWS services. The primary components are an Amazon EC2 instance configured as an FTP server with Amazon EFS mounted for shared storage, network file sharing enabled via Samba, and event-driven processing handled through AWS Lambda functions triggered by Amazon EventBridge. The objective is to create a scalable, secure, and automated environment for file storage, sharing, and processing.
Fig: AWS Architecture Diagram
Create an EC2 instance and mount an NFS drive
**Mount EFS on the EC2 Instance**
- SSH into the EC2 instance and install the EFS mount helper:
  `sudo yum install -y amazon-efs-utils`
- Create a directory for mounting the EFS:
  `sudo mkdir -p /mnt/efs`
- Mount the EFS using the file system ID (the mount target's security group must allow NFS traffic from the instance; see the sketch after this list):
  `sudo mount -t efs -o tls fs-XXXXXXXX:/ /mnt/efs`
- Add an entry to /etc/fstab so the EFS is remounted on reboot:
  `echo "fs-XXXXXXXX:/ /mnt/efs efs _netdev,tls 0 0" | sudo tee -a /etc/fstab`
- Check that the EFS is mounted successfully:
  `df -h`
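Before the mount succeeds, the EFS mount target's security group has to allow inbound NFS (TCP 2049) from the EC2 instance. A minimal sketch with the AWS CLI, assuming hypothetical security group IDs for the mount target and the instance:

```bash
# Hypothetical IDs; replace with your EFS mount-target and EC2 security groups.
EFS_SG=sg-0aaa1111bbbb22222
EC2_SG=sg-0ccc3333dddd44444

# Allow NFS (TCP 2049) from the EC2 instance's security group to the EFS mount target.
aws ec2 authorize-security-group-ingress \
  --group-id "$EFS_SG" \
  --protocol tcp \
  --port 2049 \
  --source-group "$EC2_SG"
```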
Then set up Samba so that Windows devices can map the share directly as a network drive (a quick verification sketch follows the steps below).
- Install Samba on the EC2 instance:
  `sudo yum install -y samba samba-client samba-common`
- Back up the default Samba configuration file:
  `sudo cp /etc/samba/smb.conf /etc/samba/smb.conf.bak`
- Edit the Samba configuration:
  `sudo nano /etc/samba/smb.conf`
The following settings are configured under the [global] section:
[global]
workgroup = WORKGROUP
server string = Samba Server
netbios name = ftp-server
security = user
map to guest = bad user
The following share configuration is added to allow Windows clients to access the EFS directory:
[EFS_Share]
path = /mnt/efs
browseable = yes
writable = yes
guest ok = yes
create mask = 0755
directory mask = 0755
- Start the Samba services to apply the configuration:
  `sudo systemctl start smb nmb`
- Enable the Samba services to start on boot:
  `sudo systemctl enable smb nmb`
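To sanity-check the share before touching a Windows machine, you can list it locally with `smbclient` (installed above as part of samba-client); the IP placeholder below is illustrative:

```bash
# List the shares exposed by the local Samba server as a guest (no password).
smbclient -L localhost -N

# From a Windows client, the share can then be mapped as a drive, e.g.:
#   net use Z: \\<ec2-instance-ip>\EFS_Share
```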
Now, when a file is uploaded to the FTP server or written via Samba, we need a watcher loop that detects changes and sends the new files on for processing. The script below uses `inotifywait` (from the inotify-tools package) to monitor the share and upload stable files to S3; a sketch for running it as a service follows the script.
#!/bin/bash

# Set variables
SOURCE_DIR="/mnt/efs/fs1"
S3_BUCKET="s3://backup-efs-ftp-bucketffa/"
LOG_FILE="/home/ec2-user/upload_to_s3.log"
DEBOUNCE_DELAY=30 # Delay in seconds for file stability check

# Function to log messages
log_message() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" | tee -a "$LOG_FILE"
}

# Function to check if file size is stable and not locked
is_file_stable() {
    local file="$1"
    local prev_size=$(stat -c%s "$file")
    log_message "Initial size of '$file': $prev_size bytes"

    # Sleep for the debounce delay
    sleep "$DEBOUNCE_DELAY"
    local new_size=$(stat -c%s "$file")
    log_message "Size of '$file' after sleep: $new_size bytes"

    # Check if the file size is stable
    if [ "$prev_size" -eq "$new_size" ]; then
        log_message "Size of '$file' did not change during the stability check."
        # Now check if the file is locked
        if lsof "$file" &>/dev/null; then
            log_message "File '$file' is locked after stability check."
            return 1 # File is locked
        else
            log_message "File '$file' is stable and not locked."
            return 0 # File is stable and not locked
        fi
    else
        log_message "File '$file' size changed during stability check."
        return 1 # File is still changing
    fi
}

# Function to upload file to S3
upload_to_s3() {
    local file="$1"
    local full_path="$SOURCE_DIR/$file"

    # Check if the file exists
    if [ ! -f "$full_path" ]; then
        log_message "File '$full_path' does not exist. Skipping upload."
        return
    fi

    # Ensure the file size is stable and not locked
    if ! is_file_stable "$full_path"; then
        log_message "File '$full_path' is still changing or locked. Delaying processing."
        return
    fi

    # Create destination path for S3
    local s3_path="${S3_BUCKET}${file}"

    # Upload file to S3
    log_message "Attempting to upload '$full_path' to S3 path '$s3_path'..."
    if aws s3 cp "$full_path" "$s3_path" --acl bucket-owner-full-control; then
        log_message "Successfully uploaded '$file' to S3"
    else
        log_message "Failed to upload '$file' to S3. Error code: $?"
    fi
}

# Main loop to monitor directory recursively
log_message "Starting to monitor '$SOURCE_DIR' for new files..."
inotifywait -m -r --format '%w%f' -e close_write -e moved_to "$SOURCE_DIR" |
while read -r full_path; do
    # Clean up the filename to remove unwanted characters
    clean_filename=$(basename "$full_path")

    # Debugging information
    echo "Detected full path: '$full_path'"
    echo "Cleaned filename: '$clean_filename'"

    # Log detected file
    log_message "Detected new file: '$full_path'"

    # Ignore temporary or partial files
    if [[ "$clean_filename" != .* ]] && [[ "$clean_filename" != *.part ]] && [[ "$clean_filename" != *.tmp ]]; then
        # Wait for the debounce delay before uploading
        if is_file_stable "$full_path"; then
            upload_to_s3 "${full_path#$SOURCE_DIR/}" # Remove SOURCE_DIR from the path
        else
            log_message "File '$full_path' is still locked or changing. Ignoring this upload attempt."
        fi
    else
        log_message "Ignoring temporary or partial file: '$full_path'"
    fi
done
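For this loop to survive reboots, it needs to run as a service, and `inotifywait` must be installed. A minimal sketch, assuming the script above is saved as /home/ec2-user/upload_to_s3.sh (the unit name and paths are illustrative):

```bash
# inotifywait comes from inotify-tools (on Amazon Linux 2 you may need to enable EPEL first).
sudo yum install -y inotify-tools

# Register the watcher as a systemd service so it restarts on boot and on failure.
sudo tee /etc/systemd/system/efs-s3-watcher.service > /dev/null <<'EOF'
[Unit]
Description=Watch /mnt/efs and upload new files to S3
After=network-online.target remote-fs.target

[Service]
ExecStart=/home/ec2-user/upload_to_s3.sh
Restart=always
User=ec2-user

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl daemon-reload
sudo systemctl enable --now efs-s3-watcher.service
```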
Now let's move on to the event-driven part of the architecture.
In our case, we want to unzip uploaded files if they are zipped.
This is the code for the AWS Lambda function triggered on each file upload. If the uploaded file is a ZIP archive, the ECS task started by this function unzips it; otherwise the file is copied straight through to the target bucket. (A sketch for wiring the bucket notifications to this function follows the code.)
import json
import boto3

from handler_run_task import run_ecs_task

s3 = boto3.client('s3')

def lambda_handler(event, context):
    try:
        # Get bucket name and object key from the event
        source_bucket = event['Records'][0]['s3']['bucket']['name']
        object_key = event['Records'][0]['s3']['object']['key']

        # Define ECS cluster and task details
        cluster_name = 'unzip-test-cluster'  # Replace with your ECS cluster name
        task_family = 'ple-family'           # Replace with your ECS task family
        container_name = 'test-container'    # Replace with your container name

        # Define the overrides for the ECS task
        overrides = {
            'environment': [
                {
                    "name": "BUCKET_NAME",
                    "value": source_bucket
                },
                {
                    "name": "KEY",
                    "value": object_key
                }
            ]
        }

        # Run ECS Task
        ecs_response = run_ecs_task(
            cluster_name,
            task_family,
            container_name,
            overrides,
            source_bucket,
            object_key
        )

        return {
            'statusCode': 200,
            'body': json.dumps('ECS task triggered successfully!')
        }
    except Exception as e:
        print(f"Error triggering ECS task: {e}")
        return {
            'statusCode': 500,
            'body': json.dumps(f"Error triggering ECS task: {str(e)}")
        }
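The handler above parses the classic S3 event notification shape (`Records[0].s3`), so the source bucket has to be wired to the function. A minimal sketch with the AWS CLI, assuming a hypothetical function name `unzip-trigger` and the source bucket used by the watcher script:

```bash
# Hypothetical function name; adjust to your deployment.
FUNCTION_ARN=$(aws lambda get-function --function-name unzip-trigger \
  --query 'Configuration.FunctionArn' --output text)

# Allow S3 to invoke the function.
aws lambda add-permission \
  --function-name unzip-trigger \
  --statement-id s3-invoke \
  --action lambda:InvokeFunction \
  --principal s3.amazonaws.com \
  --source-arn arn:aws:s3:::backup-efs-ftp-bucketffa

# Send ObjectCreated events from the bucket to the function.
aws s3api put-bucket-notification-configuration \
  --bucket backup-efs-ftp-bucketffa \
  --notification-configuration "{
    \"LambdaFunctionConfigurations\": [{
      \"LambdaFunctionArn\": \"$FUNCTION_ARN\",
      \"Events\": [\"s3:ObjectCreated:*\"]
    }]
  }"
```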
**Let's create a handler for when the previous Lambda hands over the JSON describing the files to process. The JSON contains the bucket and keys to unzip, and the task runs the container image that does the unzipping.**
The ECS task created by the Lambda function handles the unzipping of files. If the uploaded object is a ZIP file, the task unzips it and uploads the extracted contents to a target S3 bucket. If an upload fails, the event is routed to a Dead Letter Queue (DLQ). Non-ZIP files are copied directly to the target bucket.
ECS Configuration:
- Cluster Name: unzip-test-cluster
- Task Family: ple-family
- Container: test-container
- Launch Type: Fargate
Task Execution:
- Retrieves the latest task definition for the given family.
- Executes the task with environment variables passed by the Lambda function.
handler_run_task.py
import boto3

def run_ecs_task(cluster_name, task_family, container_name, overrides, source_bucket, object_key):
    ecs_client = boto3.client('ecs')

    # Get the latest task definition for the given family
    response = ecs_client.list_task_definitions(
        familyPrefix=task_family,
        sort='DESC',
        maxResults=1
    )
    latest_task_definition = response['taskDefinitionArns'][0]
    print("Printing Latest task def")
    print(latest_task_definition)

    # Run the ECS task with the latest task definition
    response = ecs_client.run_task(
        cluster=cluster_name,
        taskDefinition=latest_task_definition,
        overrides={
            'containerOverrides': [
                {
                    'name': container_name,
                    # 'cpu': overrides.get('cpu', 512),        # Default CPU to 512
                    # 'memory': overrides.get('memory', 1024), # Default memory to 1024 MiB
                    'environment': overrides.get('environment', [])
                }
            ]
        },
        networkConfiguration={
            'awsvpcConfiguration': {
                'subnets': ['subnet-089f9162bd2913570', 'subnet-05591da28974513ee', 'subnet-0732585a95fcd1b64'],  # Replace with your subnet IDs
                'assignPublicIp': 'ENABLED'  # or 'DISABLED' depending on your network setup
            }
        },
        launchType='FARGATE',  # Or 'EC2', depending on your setup
        count=1
    )
    return response
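run_ecs_task only looks up the latest revision, so a task definition for the `ple-family` family must already exist. A minimal registration sketch with the AWS CLI, assuming a hypothetical execution role and the ECR image that is built and pushed later in this post:

```bash
# Hypothetical execution role and image URI; the image is built further below.
aws ecs register-task-definition \
  --family ple-family \
  --requires-compatibilities FARGATE \
  --network-mode awsvpc \
  --cpu 512 \
  --memory 1024 \
  --execution-role-arn arn:aws:iam::<account_id>:role/ecsTaskExecutionRole \
  --container-definitions '[
    {
      "name": "test-container",
      "image": "<account_id>.dkr.ecr.us-east-1.amazonaws.com/unzip-task:latest",
      "essential": true
    }
  ]'
```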
**Now let's create an IAM role.**
An IAM role is created to grant the Lambda function and ECS tasks the necessary permissions to access other AWS resources, such as S3, ECS, and CloudWatch Logs.
Key policies:
- Log creation and event publishing to CloudWatch
- S3 bucket access (GetObject, ListBucket, PutObject)
- ECS task execution and description
- IAM role passing

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "logs:CreateLogGroup",
      "Resource": "arn:aws:logs:<aws_region>:<account_id>:*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:<aws_region>:<account_id>:log-group:/aws/lambda/<lambda_function_name>:*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::<source_bucket_name>",
        "arn:aws:s3:::<source_bucket_name>/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": "s3:PutObject",
      "Resource": "arn:aws:s3:::<target_bucket_name>/*"
    },
    {
      "Effect": "Allow",
      "Action": "iam:PassRole",
      "Resource": "arn:aws:iam::<account_id>:role/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "ecs:RegisterTaskDefinition",
        "ecs:DescribeTaskDefinition"
      ],
      "Resource": "*"
    }
  ]
}

Note that the Lambda above also calls ecs:ListTaskDefinitions and ecs:RunTask, so those actions need to be granted as well (here or through an additional attached policy).
**Dead-letter Queue (DLQ) Mechanism**
The DLQ captures and logs events where the ECS task fails to process the uploaded file correctly. This mechanism ensures that errors are captured and stored for subsequent analysis or reprocessing.
Failure Handling
- Condition: A failure is identified if the ECS task returns an HTTP status code other than 200.
- DLQ Logging: The failed event, including file details and error messages, is sent to the DLQ. The DLQ serves as a reliable storage for these failed events, ensuring no data is lost.
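Messages that land in the DLQ can be pulled back out for inspection or manual reprocessing. A minimal sketch with the AWS CLI against the queue used by the unzip script shown below:

```bash
# Pull up to 10 failed events from the DLQ for inspection (long polling for 10 s).
aws sqs receive-message \
  --queue-url https://sqs.us-east-1.amazonaws.com/654654543848/unzip-failed-processing-queue \
  --max-number-of-messages 10 \
  --wait-time-seconds 10 \
  --output json
```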
Now let's create the ECS container image.
Here is the unzip script that runs inside it:
#!/bin/bash

# Environment variables
SRC_BUCKET_NAME="$BUCKET_NAME"
SRC_BUCKET_KEY="$KEY"
DEST_BUCKET_NAME="backup-efs-ftp-ple-unzipped"
DLQ_URL="https://sqs.us-east-1.amazonaws.com/654654543848/unzip-failed-processing-queue"
OUTPUT_DIR="./output"
LOCAL_FILE_PATH="./$(basename "$SRC_BUCKET_KEY")"

# Function to log messages
log_message() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') - $1"
}

# Function to download a file from S3
download_file() {
    log_message "Downloading $SRC_BUCKET_KEY from $SRC_BUCKET_NAME"
    aws s3 cp "s3://$SRC_BUCKET_NAME/$SRC_BUCKET_KEY" "$LOCAL_FILE_PATH"
    if [ $? -ne 0 ]; then
        log_message "Error downloading file from S3"
        send_to_dlq "Error downloading file from S3"
        exit 1
    fi
}

# Function to upload a file to S3
upload_to_s3() {
    local file_path="$1"
    local s3_key="$2"
    log_message "Uploading $file_path to s3://$DEST_BUCKET_NAME/$s3_key"
    aws s3 cp "$file_path" "s3://$DEST_BUCKET_NAME/$s3_key" --acl bucket-owner-full-control
    if [ $? -ne 0 ]; then
        log_message "Failed to upload $file_path to S3"
        send_to_dlq "Failed to upload $file_path to S3"
        exit 1
    fi
    invoke_load_balancer "$s3_key"
}

# Function to send a message to the DLQ
send_to_dlq() {
    local message="$1"
    log_message "Sending message to DLQ: $message"
    aws sqs send-message --queue-url "$DLQ_URL" --message-body "$message"
    if [ $? -ne 0 ]; then
        log_message "Failed to send message to DLQ"
        exit 1
    fi
}

# Function to invoke load balancer
invoke_load_balancer() {
    local s3_key="$1"
    log_message "Invoking load balancer for $s3_key"
    local payload=$(jq -n \
        --arg bucket "$DEST_BUCKET_NAME" \
        --arg key "$s3_key" \
        '{bucket: $bucket, key: $key, filePath: $key}')
    local response=$(curl -s -X POST "https://asdlb.mydomain.com/companyx/gateway/listenFTPWebhook" \
        -H "Content-Type: application/json" \
        -d "$payload")
    local status_code=$(echo "$response" | jq -r '.status')
    if [ "$status_code" != "200" ]; then
        log_message "Load balancer invocation failed with status code $status_code"
        send_to_dlq "Load balancer invocation failed with status code $status_code"
    else
        log_message "Load balancer invocation successful"
    fi
}

# Function to extract ZIP files
extract_and_process_files() {
    log_message "Extracting ZIP file $LOCAL_FILE_PATH"
    mkdir -p "$OUTPUT_DIR"
    unzip -o "$LOCAL_FILE_PATH" -d "$OUTPUT_DIR"
    if [ $? -ne 0 ]; then
        log_message "Failed to extract ZIP file"
        send_to_dlq "Failed to extract ZIP file"
        exit 1
    fi
    for file in "$OUTPUT_DIR"/*; do
        local s3_key=$(dirname "$SRC_BUCKET_KEY")/$(basename "$file")
        log_message "Processing file $file"
        upload_to_s3 "$file" "$s3_key"
    done
}

# Main process
main() {
    log_message "Starting processing for $SRC_BUCKET_KEY from $SRC_BUCKET_NAME"
    download_file
    if [[ "$LOCAL_FILE_PATH" == *.zip ]]; then
        extract_and_process_files
    else
        local s3_key=$(dirname "$SRC_BUCKET_KEY")/$(basename "$LOCAL_FILE_PATH")
        upload_to_s3 "$LOCAL_FILE_PATH" "$s3_key"
    fi
    log_message "Processing complete"
}

main
And here is the Dockerfile for the same image:

# Use the official lightweight Debian image as the base
FROM debian:bookworm-slim

# Set the working directory
WORKDIR /usr/src/app

# Set non-interactive mode to avoid prompts during package installation
ENV DEBIAN_FRONTEND=noninteractive

# Install necessary tools (jq is required by unzip.sh) with cache cleanup
RUN apt-get update && \
    apt-get install -y --no-install-recommends \
        bash \
        curl \
        unzip \
        jq \
        awscli && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# Copy the shell script into the container
COPY unzip.sh /usr/src/app/

# Make the shell script executable
RUN chmod +x /usr/src/app/unzip.sh

# Optional build-time arguments; at runtime, BUCKET_NAME and KEY are injected
# by the ECS container overrides
ARG BUCKET_NAME
ARG KEY

# Command to run the shell script
CMD ["/usr/src/app/unzip.sh"]
Finally, build and push the Docker image to Amazon ECR and point the task definition at the new image, as sketched below.
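A minimal build-and-push sketch, assuming a hypothetical repository named `unzip-task` in us-east-1 (adjust the region and repository name to your own setup):

```bash
ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
REGION=us-east-1
REPO=unzip-task

# Create the ECR repository if it does not exist yet.
aws ecr create-repository --repository-name "$REPO" --region "$REGION" || true

# Authenticate Docker against ECR.
aws ecr get-login-password --region "$REGION" | \
  docker login --username AWS --password-stdin "$ACCOUNT_ID.dkr.ecr.$REGION.amazonaws.com"

# Build, tag, and push the image referenced by the ECS task definition.
docker build -t "$REPO" .
docker tag "$REPO:latest" "$ACCOUNT_ID.dkr.ecr.$REGION.amazonaws.com/$REPO:latest"
docker push "$ACCOUNT_ID.dkr.ecr.$REGION.amazonaws.com/$REPO:latest"
```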
Conclusion
We now have a shared Samba server for Windows users, with event-driven, on-demand unzip functionality whenever a user uploads a file to the drive.
#aws #awscommunitybuilder #awscommunitynepal #communitybuilder