Mustafa Balila
Posted on January 22, 2022
This article focuses mainly on the Go side of the work, so if that interests you, please keep reading.
The problem:
I'm working at a real estate company. We allow our users to upload videos showing their properties, and we used AWS MediaConvert to handle compressing and transcoding, but things got out of hand. With MediaConvert, a single minute of video encoding costs $0.03 for each output quality at 720p and above, and $0.015 for each output quality below 720p. So if you encode at 720p, 480p, and 360p, you pay $0.06 per minute, which is $3.60 per hour.
A screenshot from the AWS MediaConvert pricing page
When you don't have many videos and the videos themselves aren't very long, this is fine, but it starts to cost more as your user base grows.
The solution:
I used Go and ffmpeg to create a service that lets us replace MediaConvert. Go makes it easy to write concurrent code, and it's really fast compared to other high-level languages like Python, Java, etc. With that in hand you can build really powerful things.
I've used go-fluent-ffmpeg for the ffmpeg integration with Go (I'm planning to switch to cgo bindings for more performance). I got this idea from a former colleague of mine, a really talented engineer, phr3nzy.
Here's the flow: a user uploads a video to S3, the upload event triggers a Lambda, and the Lambda queues a message on SQS for the Go service to consume. For reference:
- S3 is AWS simple storage service.
- Lambda is a serverless, event-driven compute service.
- SQS is a managed message queuing service.
Here's what the Lambda code looks like:
const aws = require("aws-sdk");
const sqs = new aws.SQS({ apiVersion: "2012-11-05" });
const s3 = new aws.S3({ apiVersion: "2006-03-01" });
exports.handler = async (event, context) => {
const bucket = event.Records[0].s3.bucket.name;
const key = decodeURIComponent(
event.Records[0].s3.object.key.replace(/\+/g, " ")
);
try {
const { ContentType, ContentLength } = await s3
.headObject({ Bucket: bucket, Key: key })
.promise();
const [type] = ContentType.split("/"); // ["image", "jpeg"], ["video", "mp4"], ...
if (type === "video") {
// convert ContentLength from bytes to megabytes
const size = ContentLength / (1024 * 1024);
const messageBody = {
originBucketName: bucket,
originalFilePath: key,
destinationBucketName: "",
destinationBucketFolder: "",
orientation: "landscape", // "landscape" || "portrait"
resolution: "480", // 360 || 480 || 720
size,
};
await sqs
.sendMessage({
MessageBody: JSON.stringify(messageBody),
QueueUrl: process.env.VIDEO_QUEUE_URL,
})
.promise();
}
} catch (err) {
console.log(err);
throw err;
}
};
After capturing the video metadata, the Lambda sends it to SQS, and the Go service polls the queue and processes the video. I used the AWS SDK for Go to interact with AWS, and for the actual processing I implemented a pipeline with three stages.
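The polling side isn't shown in this post, but roughly it looks like the sketch below. This is a minimal version assuming aws-sdk-go-v2; the config.Message struct is hypothetical, with JSON tags mirroring the Lambda's messageBody and field names matching what the pipeline stages use.

import (
	"context"
	"encoding/json"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
)

// Hypothetical config.Message; JSON tags mirror the Lambda's messageBody.
type Message struct {
	OriginBucketName        string  `json:"originBucketName"`
	Filename                string  `json:"originalFilePath"`
	DestinationBucketName   string  `json:"destinationBucketName"`
	DestinationBucketFolder string  `json:"destinationBucketFolder"`
	Orientation             string  `json:"orientation"`
	Resolution              string  `json:"resolution"`
	Size                    float64 `json:"size"`
}

// poll does one long-poll against the queue and decodes each message body.
// NOTE: a real service also needs to DeleteMessage after successful processing,
// otherwise the message becomes visible again and gets reprocessed.
func poll(ctx context.Context, client *sqs.Client, queueURL string) ([]Message, error) {
	out, err := client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
		QueueUrl:            aws.String(queueURL),
		MaxNumberOfMessages: 10, // SQS maximum per receive
		WaitTimeSeconds:     20, // long polling
	})
	if err != nil {
		return nil, err
	}
	messages := make([]Message, 0, len(out.Messages))
	for _, m := range out.Messages {
		var msg Message
		if err := json.Unmarshal([]byte(*m.Body), &msg); err != nil {
			continue // skip malformed bodies
		}
		messages = append(messages, msg)
	}
	return messages, nil
}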
Stage 1 - Downloading
func download(s3 *s3.Client, messages ...config.Message) <-chan config.DownloadedFile {
	out := make(chan config.DownloadedFile)
	go func() {
		defer close(out)
		for _, msg := range messages {
			fullpath, err := storage.DownloadObject(s3, msg.OriginBucketName, msg.Filename)
			if err != nil {
				log.Error(err.Error())
				continue // skip this message instead of killing the whole batch
			}
			// derive the container format from the file extension
			name := strings.Split(msg.Filename, ".")
			format := strings.ToLower(name[len(name)-1])
			out <- config.DownloadedFile{
				Fullpath:                fullpath,
				Filename:                msg.Filename,
				DestinationBucketName:   msg.DestinationBucketName,
				DestinationBucketFolder: msg.DestinationBucketFolder,
				Resolution:              msg.Resolution,
				Orientation:             msg.Orientation,
				Format:                  format,
			}
		}
	}()
	return out
}
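storage.DownloadObject does the actual S3 fetch; it isn't shown in this post, so here's a minimal sketch of how it could look using aws-sdk-go-v2's download manager (the function body and temp-dir layout are assumptions, not the service's real code):

package storage

import (
	"context"
	"os"
	"path/filepath"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

// DownloadObject fetches bucket/key into the temp directory and
// returns the full local path of the downloaded file.
func DownloadObject(client *s3.Client, bucket, key string) (string, error) {
	fullpath := filepath.Join(os.TempDir(), filepath.Base(key))
	f, err := os.Create(fullpath)
	if err != nil {
		return "", err
	}
	defer f.Close()

	downloader := manager.NewDownloader(client)
	_, err = downloader.Download(context.TODO(), f, &s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	})
	return fullpath, err
}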
Stage 2 - Transcoding
func transcode(paths <-chan config.DownloadedFile) <-chan config.TranscodedFile {
	out := make(chan config.TranscodedFile)
	go func() {
		defer close(out)
		for file := range paths {
			// prefix the output name so variants of the same video don't collide
			unique := fmt.Sprintf("%s_%s_%s", file.Orientation, file.Resolution, file.Filename)
			outputPath := fmt.Sprintf("%s/%s", config.RootVideosDir, unique)
			ffmpeg := fluentffmpeg.NewCommand("")
			vError := ffmpeg.
				InputPath(file.Fullpath).
				FromFormat(file.Format).
				VideoBitRate(4 * 1024). // cap the video bitrate
				FrameRate(30).
				AspectRatio(AspectRatios[file.Orientation]).
				Resolution(VideoResolutions[file.Resolution]).
				VideoCodec("libx265").
				ConstantRateFactor(VideoQualities[file.Resolution]).
				Preset("ultrafast").
				OutputFormat(file.Format).
				OutputPath(outputPath).
				Overwrite(true).
				Run()
			if vError != nil {
				log.Error(vError.Error())
				continue // skip this video, keep the pipeline alive
			}
			out <- config.TranscodedFile{
				DestinationBucketName:   file.DestinationBucketName,
				DestinationBucketFolder: file.DestinationBucketFolder,
				Fullpath:                file.Fullpath,
				LocalDiskPath:           outputPath,
				Orientation:             file.Orientation,
				Filename:                file.Filename,
			}
		}
	}()
	return out
}
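The AspectRatios, VideoResolutions, and VideoQualities lookup maps aren't shown above. Here's a plausible sketch of their shape; the concrete values are assumptions, not the service's real settings:

// Hypothetical lookup tables used by the transcode stage.
var (
	AspectRatios = map[string]string{
		"landscape": "16:9",
		"portrait":  "9:16",
	}
	VideoResolutions = map[string]string{
		"360": "640x360",
		"480": "854x480",
		"720": "1280x720",
	}
	// CRF per target resolution: higher CRF = smaller file, lower quality.
	VideoQualities = map[string]int{
		"360": 28,
		"480": 26,
		"720": 24,
	}
)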
Stage 3 - Uploading
func upload(s3 *s3.Client, videos <-chan config.TranscodedFile) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done) // signal completion once every video is handled
		for video := range videos {
			dest := fmt.Sprintf("%s/%s", video.DestinationBucketFolder, video.Filename)
			// upload the transcoded output, not the original download
			if err := storage.UploadVideo(s3, video.LocalDiskPath, video.DestinationBucketName, dest); err != nil {
				log.Error(err.Error())
			}
			// clean up both the source download and the transcoded copy
			deleteFiles(video.Fullpath, video.LocalDiskPath)
		}
	}()
	return done
}
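storage.UploadVideo is the mirror image of the download helper. It isn't shown in the post either; a minimal sketch assuming aws-sdk-go-v2's upload manager (same imports as the DownloadObject sketch above):

// UploadVideo streams a local file to bucket/key.
func UploadVideo(client *s3.Client, localPath, bucket, key string) error {
	f, err := os.Open(localPath)
	if err != nil {
		return err
	}
	defer f.Close()

	uploader := manager.NewUploader(client)
	_, err = uploader.Upload(context.TODO(), &s3.PutObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
		Body:   f,
	})
	return err
}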
Finally, I'd run the pipeline and block until it drains:
<-upload(s3Client, transcode(download(s3Client, messages...)))
Of course you may need to add more, such as retries and deleting the SQS message once a video is processed, but this is a showcase.
The code above uses goroutines and channels, which are how you write concurrent code in Go: each stage runs in its own goroutine and hands results to the next stage over a channel.
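One nice consequence is that scaling out is just fan-out/fan-in: you can run several transcode workers off the same download channel and merge their outputs before uploading. This merge helper is not part of the original service, just a sketch of the pattern (it needs the sync package):

// merge fans in the output of several transcode workers into one channel.
func merge(channels ...<-chan config.TranscodedFile) <-chan config.TranscodedFile {
	out := make(chan config.TranscodedFile)
	var wg sync.WaitGroup
	wg.Add(len(channels))
	for _, ch := range channels {
		go func(c <-chan config.TranscodedFile) {
			defer wg.Done()
			for v := range c {
				out <- v // forward until this worker's channel closes
			}
		}(ch)
	}
	go func() {
		wg.Wait() // close out only after every worker is drained
		close(out)
	}()
	return out
}

Used like this, with two transcode workers reading from the same download channel:

paths := download(s3Client, messages...)
<-upload(s3Client, merge(transcode(paths), transcode(paths)))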
Thanks for sticking around to the end of the article! I hope it helps.