Do whatever you want with your files, and do it quickly
Anton
Posted on November 13, 2023
File processing is a very common task in software development and data management. During my engineering career, I've mostly been building software that runs on servers, and when it comes to files, let me tell you a little about what I've seen.
There was so much boilerplate code that validates uploaded files and proxies them to S3 storage. The same goes for media transformation/transcoding you do yourself: the first thing you do is find some library that wraps ImageMagick, Vips, or FFmpeg, and then you write code that feeds that library the parameters and the input to process. Another type of task you may run into is finding some files, doing some basic analysis, and, based on the results, moving/removing/archiving them. For some of these tasks there are one-liners you paste into your terminal, or there may be dedicated tools/utilities. If not, you probably end up writing a bash/python script that does what you need.
It's true that in some cases a custom solution is required. You may want the highest level of customization, it may be a business or compliance requirement, or you may simply want to write and maintain it yourself, which is sometimes totally fine too. But what if none of the above applies to you?
If what you want is a set of operations that you can run your files through, there may be a solution for you. It's open source, and I recently decided it was okay to push the v1.0.0 tag to its repository. So let me show you what I'm working on.
File processing pipeline
I called it Capyfile. This is a file-processing pipeline that you can construct yourself. A nice bonus is that it can process the files concurrently. Its source code is available on GitHub:
https://github.com/capyfile/capyfile
The overall architecture is quite simple. To start working with it you should know about two things.
Pipeline configuration file
First, you need a configuration file that defines your pipelines. The main purpose of this file is to group your pipelines and configure the operations that belong to them.
Besides the operations, the configuration file consists of two more entities. Their main purpose, for now, is to group the pipelines: the Service entity groups the processors, and the Processor entity groups the operations. So you can refer to your pipeline with a composite ID such as {Service}:{Processor} or {Service}/{Processor}. How you name these is up to you. For example, if you want a config file for the pipelines you use for image processing, the IDs could be images:compress, images:transform, images:archive, etc.
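To make the structure concrete, here is a minimal skeleton of such a configuration file. This is only a sketch: the service and processor names are the image-processing examples from above, the empty operation lists are placeholders, and the version value is copied from the full examples below.
---
version: '1.1'
name: images                # the Service
processors:
  - name: compress          # the Processor, so the pipeline ID is images:compress
    operations: []          # the operations described in the next section go here
  - name: archive           # another pipeline in the same file: images:archive
    operations: []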
There are a few things you want to know about the operations. One pipeline can consist of many operations, and you can put them in any order that makes sense to you. Here's the list of operations available at the moment:
- http_multipart_form_input_read - read the files from the HTTP request body as multipart/form-data
- http_octet_stream_input_read - read the files from the HTTP request body as application/octet-stream
- filesystem_input_read - read the files from the filesystem
- filesystem_input_write - write the files to the filesystem
- filesystem_input_remove - remove the files from the filesystem
- file_size_validate - check the file size
- file_type_validate - check the file MIME type
- file_time_validate - check the file time stat
- exiftool_metadata_cleanup - clear the file metadata if possible (requires exiftool)
- image_convert - convert an image to another format (requires libvips)
- s3_upload - upload the file to S3-compatible storage
Also, every operation has such a thing as a file target policy. It defines which files the operation should process. The targetFiles parameter can have one of these values:
- without_errors (default) - all files passed to the operation except the files that have errors
- with_errors - only the files passed to the operation that have errors
- all - all files passed to the operation
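For instance, in the log archive example further down, the cleanup step only touches the files that passed every previous operation. That fragment looks like this:
- name: filesystem_input_remove
  targetFiles: without_errors   # skip the files that failed validation or upload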
And, of course, the operations must be configured. Configuration values for the operations can be retrieved from the following sources:
- value - the parameter value will be retrieved directly from the configuration file
- env_var - the parameter value will be retrieved from an environment variable
- secret - the parameter value will be retrieved from a secret (Docker secret)
- file - the parameter value will be retrieved from a file
- http_get - the parameter value will be retrieved from an HTTP GET parameter
- http_post - the parameter value will be retrieved from an HTTP POST parameter
- http_header - the parameter value will be retrieved from an HTTP header
- etcd - the parameter value will be retrieved from the etcd key-value store
Now that you know all this, you are ready to write your own configuration file. So let's write a couple of them. The formats accepted right now are JSON and YAML.
Avatar upload example
Let's say you are building a messenger and want functionality that allows your users to upload their avatar photos. Here's what the configuration file for your pipeline may look like:
---
version: '1.1'
name: avatars
processors:
  - name: upload
    operations:
      - name: http_multipart_form_input_read
      - name: file_size_validate
        params:
          maxFileSize:
            sourceType: value
            source: 10485760
      - name: file_type_validate
        params:
          allowedMimeTypes:
            sourceType: value
            source:
              - image/jpeg
              - image/png
              - image/heif
      - name: image_convert
        params:
          toMimeType:
            sourceType: value
            source: image/jpeg
          quality:
            sourceType: value
            source: high
      - name: exiftool_metadata_cleanup
      - name: s3_upload
        params:
          accessKeyId:
            sourceType: secret
            source: aws_access_key_id
          secretAccessKey:
            sourceType: secret
            source: aws_secret_access_key
          endpoint:
            sourceType: etcd
            source: "/services/messenger/aws_endpoint"
          region:
            sourceType: etcd
            source: "/services/messenger/aws_region"
          bucket:
            sourceType: env_var
            source: AWS_AVATARS_BUCKET
You can see how this pipeline processes the files step by step:
- Retrieves the input from the HTTP request
- Ensures that the file size is less than 10MB
- Ensures that the file format is either png, jpg, or heif
- If necessary, converts the image to jpg
- Strips the image metadata
- Uploads the image to S3 storage
Log archive example
Let's say you have a bunch of log files and you want to archive the ones that are older than 1 month. Here's the configuration file you may use for this:
---
version: '1.1'
name: logs
processors:
  - name: archive
    operations:
      - name: filesystem_input_read
        params:
          target:
            sourceType: value
            source: "/var/log/rotated-logs/*"
      - name: file_time_validate
        params:
          maxMtime:
            sourceType: env_var
            source: MAX_LOG_FILE_AGE_RFC3339
      - name: s3_upload
        targetFiles: without_errors
        params:
          accessKeyId:
            sourceType: secret
            source: aws_access_key_id
          secretAccessKey:
            sourceType: secret
            source: aws_secret_access_key
          endpoint:
            sourceType: value
            source: s3.amazonaws.com
          region:
            sourceType: value
            source: us-east-1
          bucket:
            sourceType: env_var
            source: AWS_LOGS_BUCKET
      - name: filesystem_input_remove
        targetFiles: without_errors
From the config, you can see what it does:
- Reads the log files from the filesystem
- Checks max file mtime
- Uploads the files whose mtime is older than 1 month to S3 storage
- Removes the files whose mtime is older than 1 month
Pipeline runner
Now that we know how to configure the pipeline, we want to run it, right? Here you have two options:
- via the capycmd command line application
- via the capysvr HTTP server
Avatar upload example
Remember that avatar upload example? You probably already see that capysvr is the way to go here. So let's run it.
The file called service-definition.yml contains our pipeline configuration, and we can use the capysvr Docker image to run it:
docker run \
--name capyfile_server \
--mount type=bind,source=./service-definition.yml,target=/etc/capyfile/service-definition.yml \
--env CAPYFILE_SERVICE_DEFINITION_FILE=/etc/capyfile/service-definition.yml \
--env AWS_AVATARS_BUCKET=avatars \
--secret aws_access_key_id \
--secret aws_secret_access_key \
-p 8024:80 \
capyfile/capysvr:latest
And if you want to load parameters from etcd, you can provide the etcd connection parameters via environment variables:
ETCD_ENDPOINTS=["etcd1:2379","etcd2:22379","etcd3:32379"]
ETCD_USERNAME=etcd_user
ETCD_PASSWORD=etcd_password
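With Docker, that means a few more --env flags on the command from above. A sketch, reusing the placeholder etcd values:
docker run \
  --name capyfile_server \
  --mount type=bind,source=./service-definition.yml,target=/etc/capyfile/service-definition.yml \
  --env CAPYFILE_SERVICE_DEFINITION_FILE=/etc/capyfile/service-definition.yml \
  --env AWS_AVATARS_BUCKET=avatars \
  --env ETCD_ENDPOINTS='["etcd1:2379","etcd2:22379","etcd3:32379"]' \
  --env ETCD_USERNAME=etcd_user \
  --env ETCD_PASSWORD=etcd_password \
  --secret aws_access_key_id \
  --secret aws_secret_access_key \
  -p 8024:80 \
  capyfile/capysvr:latest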
Now it is ready to accept and process the files.
curl -F "file1=@$HOME/Pictures/avatar.png" http://127.0.0.1:8024/avatars/upload
The response you can expect looks like this:
{
  "status": "SUCCESS",
  "code": "SUCCESS",
  "message": "successfully uploaded 1 file(s)",
  "files": [
    {
      "url": "https://avatars.storage.example.com/avatars/abcdKDNJW_DDWse.jpg",
      "filename": "abcdKDNJW_DDWse.jpg",
      "originalFilename": "avatar.png",
      "mime": "image/jpeg",
      "size": 5892728,
      "status": "SUCCESS",
      "code": "FILE_SUCCESSFULLY_UPLOADED",
      "message": "file successfully uploaded"
    }
  ],
  "errors": [],
  "meta": {
    "totalUploads": 1,
    "successfulUploads": 1,
    "failedUploads": 0
  }
}
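If you are scripting against this endpoint, the uploaded file URLs are easy to pull out of that response, for example with jq (assuming jq is installed; this is just a convenience sketch, not part of Capyfile):
curl -s -F "file1=@$HOME/Pictures/avatar.png" http://127.0.0.1:8024/avatars/upload \
  | jq -r '.files[].url'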
Log archive example
To run our log archiver, we can use the capycmd command line application. Here's how you can do it with Docker:
docker run \
--name capyfile_server \
--mount type=bind,source=./service-definition.yml,target=/etc/capyfile/service-definition.yml \
--mount type=bind,source=/var/log/rotated-logs,target=/var/log/rotated-logs \
--env CAPYFILE_SERVICE_DEFINITION_FILE=/etc/capyfile/service-definition.yml \
--env MAX_LOG_FILE_AGE_RFC3339=$(date -d "30 days ago" -u +"%Y-%m-%dT%H:%M:%SZ") \
--env AWS_LOGS_BUCKET=logs \
--secret aws_access_key_id \
--secret aws_secret_access_key \
capyfile/capycmd:latest logs:archive
Right now the app produces output that does not look very nice, but it tells you what is happening and what the status of each file is. Remember that it processes the files concurrently, so the lines come out in a somewhat arbitrary order. You will see something like this:
Running logs:archive service processor...
[/var/log/rotated-logs/access-2023-08-27.log] filesystem_input_read FINISHED file read finished
[/var/log/rotated-logs/access-2023-08-28.log] filesystem_input_read FINISHED file read finished
[/var/log/rotated-logs/access-2023-09-27.log] filesystem_input_read FINISHED file read finished
[/var/log/rotated-logs/access-2023-09-28.log] filesystem_input_read FINISHED file read finished
[/var/log/rotated-logs/access-2023-09-29.log] filesystem_input_read FINISHED file read finished
[/var/log/rotated-logs/access-2023-08-28.log] file_time_validate STARTED file time validation started
[/var/log/rotated-logs/access-2023-08-28.log] file_time_validate FINISHED file time is valid
[/var/log/rotated-logs/access-2023-08-27.log] file_time_validate STARTED file time validation started
[/var/log/rotated-logs/access-2023-09-27.log] file_time_validate STARTED file time validation started
[/var/log/rotated-logs/access-2023-08-27.log] file_time_validate FINISHED file time is valid
[/var/log/rotated-logs/access-2023-09-27.log] file_time_validate FINISHED file mtime is too new
[/var/log/rotated-logs/access-2023-09-29.log] file_time_validate STARTED file time validation started
[/var/log/rotated-logs/access-2023-09-27.log] s3_upload SKIPPED skipped due to "without_errors" target files policy
[/var/log/rotated-logs/access-2023-09-28.log] file_time_validate STARTED file time validation started
[/var/log/rotated-logs/access-2023-09-29.log] file_time_validate FINISHED file mtime is too new
[/var/log/rotated-logs/access-2023-08-28.log] s3_upload STARTED S3 file upload has started
[/var/log/rotated-logs/access-2023-09-28.log] file_time_validate FINISHED file mtime is too new
[/var/log/rotated-logs/access-2023-09-29.log] s3_upload SKIPPED skipped due to "without_errors" target files policy
[/var/log/rotated-logs/access-2023-08-27.log] s3_upload STARTED S3 file upload has started
[/var/log/rotated-logs/access-2023-09-27.log] filesystem_input_remove SKIPPED skipped due to "without_errors" target files policy
[/var/log/rotated-logs/access-2023-09-29.log] filesystem_input_remove SKIPPED skipped due to "without_errors" target files policy
[/var/log/rotated-logs/access-2023-09-28.log] s3_upload SKIPPED skipped due to "without_errors" target files policy
[/var/log/rotated-logs/access-2023-09-28.log] filesystem_input_remove SKIPPED skipped due to "without_errors" target files policy
[/var/log/rotated-logs/access-2023-08-27.log] s3_upload FINISHED S3 file upload has finished
[/var/log/rotated-logs/access-2023-08-28.log] s3_upload FINISHED S3 file upload has finished
[/var/log/rotated-logs/access-2023-08-27.log] filesystem_input_remove STARTED file remove started
[/var/log/rotated-logs/access-2023-08-27.log] filesystem_input_remove FINISHED file remove finished
[/var/log/rotated-logs/access-2023-08-28.log] filesystem_input_remove STARTED file remove started
[/var/log/rotated-logs/access-2023-08-28.log] filesystem_input_remove FINISHED file remove finished
...
In the end
Capyfile is quite a new project and there is still a lot to do. Nevertheless, it is designed to give you as much flexibility as possible when it comes to building and configuring your pipelines, and it can already cover some file processing routines quite successfully.
My next plan is to implement a couple of new operations, such as one that runs external commands and one for some basic video transcoding (it will probably use FFmpeg as the backend). Another thing I think is missing is a worker that runs the pipeline every N units of time.
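Until that worker exists, a simple workaround is to schedule the capycmd container with plain cron. This is only a sketch: the script and config paths are assumptions, the docker command is the one from the log archive example, and the AWS credentials setup (the secrets) is omitted here.
#!/bin/sh
# /usr/local/bin/archive-logs.sh - run the logs:archive pipeline once
docker run --rm \
  --mount type=bind,source=/opt/capyfile/service-definition.yml,target=/etc/capyfile/service-definition.yml \
  --mount type=bind,source=/var/log/rotated-logs,target=/var/log/rotated-logs \
  --env CAPYFILE_SERVICE_DEFINITION_FILE=/etc/capyfile/service-definition.yml \
  --env MAX_LOG_FILE_AGE_RFC3339=$(date -d "30 days ago" -u +"%Y-%m-%dT%H:%M:%SZ") \
  --env AWS_LOGS_BUCKET=logs \
  capyfile/capycmd:latest logs:archive
And a crontab entry that runs it nightly:
0 3 * * * /usr/local/bin/archive-logs.sh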
If you think this is something useful, you are welcome to contribute. Your feedback, suggestions, ideas, and PRs – this is something I really appreciate!
Capyfile is maintained on GitHub. View the source, contribute, or report issues at: https://github.com/capyfile/capyfile