Shuttle
Posted on March 20, 2024
Notifications are a very helpful tool for notifying services. This is particularly relevant if you’re using microservice architecture. One notification mechanism we can use is AWS SNS (Simple Notification Service), which lets us create topics, subscribe to them and push messages to topics. By the end of this article, you’ll have two fully functioning web services with the following overall functionality:
- One web service will have a single endpoint that can receive a message from AWS SNS
- One web service will handle the job of sending messages
Interested in just deploying? You can find the repository here, complete with instructions on how to deploy.
Getting started
Pre-requisites
To get started, you will need two keys from AWS:
- Your access key ID
- Your secret access key
Receiving AWS SNS messages
To start with, let’s have a look at how we can receive SNS messages. We can do this with cargo shuttle init
, picking Axum as our framework. This service will have a single endpoint that takes the SNS messages and prints the message out.
To make this idiomatic, we’ll want to create a struct called SnsMessage
that implements axum::FromRequest
. This will allow us to use it as an extractor, rather than trying to parse the POST requests from SNS manually.
use axum::{routing::{get, post}, Router, response::{IntoResponse, Response},
http::StatusCode, extract::{Request, FromRequest},
Json, RequestExt
};
use serde::Deserialize;
#[derive(Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct SnsMessage {
#[serde(rename = "Type")]
kind: String,
message_id: String,
topic_arn: String,
subject: String,
message: String,
timestamp: String,
signature_version: String,
signature: String,
signing_cert_url: String,
unsubscribe_url: String
}
#[axum::async_trait]
impl<S> FromRequest<S> for SnsMessage {
type Rejection = Response;
async fn from_request(req: Request, state: &S) -> Result<Self, Self::Rejection> {
let headers = req.headers();
// if none of these headers are sent in the request
// automatically send a BAD_REQUEST status code
if !headers.contains_key("x-amz-sns-message-type")
| !headers.contains_key("x-amz-message-id")
| !headers.contains_key("x-amz-topic-arn")
| !headers.contains_key("x-amz-subscription-arn") {
return Err((StatusCode::BAD_REQUEST).into_response())
}
let Json(payload): axum::Json<SnsMessage> = req.extract()
.await.map_err(|_| (StatusCode::BAD_REQUEST).into_response())?;
Ok(payload)
}
}
Note that this FromRequest<S>
implementation is primarily to show the fundamentals. In production use cases, you would want to verify the signature sent from AWS to make sure it’s accurate. You can find out more about the basics of this here.
To use our new struct, you can use SnsMessage
as an extractor for a handler function like this:
async fn receive_sns(
sns_message: SnsMessage
) -> StatusCode {
println!("{}", sns_message.message);
StatusCode::OK
}
You can find out more about receiving POST requests from AWS SNS here. To extend this, you may want to think about verifying the signature sent by AWS here.
We’ll tie this all together by adding the handler function to our router:
use axum::routing::post;
#[shuttle_runtime::main]
async fn axum() -> shuttle_axum::ShuttleAxum {
let router = Router::new().route("/", get(hello_world))
.route("/sns", post(receive_sns));
Ok(router.into())
}
Now it’s complete!
Deploying
To deploy, you can use cargo shuttle deploy
(with --allow-dirty
if on a Git branch and uncommitted changes). Take note of the deployment URL! We will need this later on.
Sending AWS SNS messages
Now for the second part of our notification service: sending messages to our receiver!
Setup
We’ll get started by using cargo shuttle init
once again, picking Axum as our choice of framework. Make sure to follow the prompt until the end to finalise your project.
Once done, we can install our dependencies with this script:
cargo add aws-config@1.1.8 -F behavior-version-latest
cargo add aws-credential-types@1.1.8 -F hardcoded-credentials
cargo add aws-sdk-sns@1.18.0
cargo add anyhow@1.0.81
cargo add serde@1.0.197 -F derive
cargo add serde-json@1.0.114
cargo add thiserror@1.0.58
Remember your AWS keys from earlier? We’ll add them to Secrets.toml
file in the root of our project folder. It should look like this:
AWS_ACCESS_KEY_ID = "your-access-key-id-here"
AWS_SECRET_ACCESS_KEY = "your-secret-access-key-here"
Setting up our AWS config
To get started, we’ll want to add our resource annotations to our main function - which should look like this:
#[shuttle_runtime::main]
async fn main(
#[shuttle_runtime::Secrets] secrets: SecretStore,
) -> shuttle_axum::ShuttleAxum {
// .. your code here
}
When we use cargo shuttle run
, our secrets will now get provisioned to us without needing to do anything!
Before we start adding more code, let’s write our AppState
struct to hold Axum state. This will allow our handler functions to access our AWS SNS client whenever we want.
use aws_sdk_sns::Client;
#[derive(Clone)]
pub struct AppState {
sns: Client,
topic_arn: String
}
Next, we’ll want to add code to our main function that gets the secrets, creates aws_credential_types::Credentials
and creates an AWS config from the credentials, as well as adding a region. We’ll use eu-west-02
to reduce latency (because the Shuttle servers are hosted on eu-west-02
). We then create a new client, initialize our app state and then add it to the axum::Router
;
let access_key_id = secrets.get("AWS_ACCESS_KEY_ID").expect("AWS_ACCESS_KEY_ID not set in Secrets.toml");
let secret_access_key = secrets.get("AWS_SECRET_ACCESS_KEY").expect("AWS_ACCESS_KEY_ID not set in Secrets.toml");
let creds = Credentials::from_keys(
access_key_id,
secret_access_key,
None
);
let cfg = aws_config::from_env()
.region(Region::new("eu-west-02"))
.credentials_provider(creds)
.load()
.await;
let sns = Client::new(&cfg);
let state = AppState { sns };
let router = Router::new().route("/", get(hello_world)).with_state(state);
Error handling
Error handling for the ShuttleAxum type
The shuttle_axum::ShuttleAxum
return type defaults to anyhow::Error
for user errors. This doesn’t affect handler functions. For functions used in the main function however, they will need to either be able to convert to anyhow::Error
or be of the same type.
Error handling for API route functions
To create errors easily, we’ll use the thiserror
crate we installed to generate From<T>
implementations for our error type, as well as the error messages. Let’s have a look.
use thiserror::Error;
use aws_sdk_sns::error::SdkError;
use aws_sdk_sns::operation::publish::PublishError
#[derive(Debug, Error)]
pub enum ApiError {
#[error("Error while publishing message: {0}")]
PublishMessage(#[from] SdkError<PublishError>),
}
We use the thiserror::Error
derive macro to enable the attribute macros. A few things are happening here:
- The
#[from]
implementation automatically derivesFrom<T>
so that the given type will automatically turn into a given enum variant. Note however that if the error type is an enum, it will convert it regardless of the enum variant. If you want more custom error handling, you may want to implementFrom<T>
manually. -
#[error("..")]
automatically generates thestd::fmt::Display
implementation for our error type. Note that we need to use this on every enum variant, otherwise we will receive compile-time errors. - Using
thiserror::Error
automatically implementsstd::error::Error
forApiError
so there’s no need for us to re-implement it!
Next, we’ll want to implement axum::response::IntoResponse
so that we can return ApiError
from our Axum handler functions.
use axum::{
http::StatusCode,
response::{IntoResponse, Response},
};
impl IntoResponse for ApiError {
fn into_response(self) -> Response {
(StatusCode::INTERNAL_SERVER_ERROR, self.to_string().into_response())
}
}
Currently we just have all of our error variants return internal server errors. To go further into this, you could create a response depending on the internal error enum variant or source (via pattern matching).
Topics
To create a topic, we can use the AWS SNS client to create a builder object and then add tags or attributes to it depending on what we want.
For the endpoint, it is mostly about creating the builder, appending the attributes and using send()
to create it.
async fn create_topic(sns: &Client, name: &str) -> AnyhowResult<String> {
let topic = sns.create_topic().name(name);
let output = topic.send().await.unwrap();
println!("Topic created: {output:?}");
Ok(output.topic_arn.unwrap())
}
The output from sending the topic will contain the ARN. We need to store this to be able to retrieve a topic later on. Note that attempting to create two topics with the same name will return the current ARN instead of creating a new resource.
If you’re interested in customising this further, you can check out more about the CreateTopicFluentBuilder
here.
Subscriptions
Receiving published messages requires a topic subscription. Creating a subscription requires declaring an endpoint, the topic ARN as well as the protocol used. We can refer to the SubscribeFluentBuilder
for this here.
The function will look something like this:
async fn subscribe_to_topic(sns: &Client, url: &str, arn: &str) -> AnyhowResult<()> {
let sub = sns
.subscribe()
.protocol("https".to_string())
.endpoint(url)
.topic_arn(arn);
let output = sub
.send()
.await
.map_err(|e| anyhow::anyhow!("error: {e}"))?;
println!("New subscriber created: {output:?}");
Ok(())
}
There is not much to talk about here as it’s primarily just setting it up and then sending it. If we wanted to, we could also create multiple subscriptions to the same topic ARN, giving us the benefit of being able to fan-out our notifications.
Because we’re setting up an HTTPS endpoint, we need to confirm the subscription. When we create the subscription, SNS will send a subscription confirmation message to our receiver. This requires our receiver to already be deployed. Because we already print out the confirmation message in our logs, we can easily access and visit the URL to confirm the subscription. You can find out more about this here.
To check that the subscription exists, we can do this:
async fn subscription_exists(sns: &Client, url: &str, arn: &str) -> AnyhowResult<bool> {
let subscribers = sns
.list_subscriptions()
.send()
.await
.map_err(|e| anyhow::anyhow!("error: {e}"))?;
if let Some(subs) = subscribers.subscriptions {
if subs.iter().any(|sub| {
sub.clone().endpoint.unwrap() == *url && sub.clone().topic_arn.unwrap() == *arn
}) {
return Ok(true);
}
return Ok(false);
}
Ok(false)
}
Publishing messages
To be able to publish messages, we’ll need the topic ARN we want to subscribe to, the message as well as the subject.
#[derive(Deserialize)]
struct PublishMessageParams {
message: String,
subject: String
}
async fn publish_message(
State(state): State<AppState>,
Json(json): Json<PublishMessageParams>
) -> Result<impl IntoResponse, ApiError> {
let res = state.sns.publish()
.topic_arn(state.arn)
.message(json.message)
.subject(json.subject)
.send().await?;
Ok(StatusCode::OK)
}
All of the endpoints who are subscribed to the topic will now automatically receive the message!
Interested in customising this further? You can check out more about the PublishFluentBuilder
here.
Connecting it all together
To connect it all, let’s revisit our main function and fill it in with the new functions:
#[shuttle_runtime::main]
async fn main(#[shuttle_secrets::Secrets] secrets: SecretStore) -> shuttle_axum::ShuttleAxum {
let access_key_id = secrets
.get("AWS_ACCESS_KEY_ID")
.expect("AWS_ACCESS_KEY_ID not set in Secrets.toml");
let secret_access_key = secrets
.get("AWS_SECRET_ACCESS_KEY")
.expect("AWS_ACCESS_KEY_ID not set in Secrets.toml");
let creds = Credentials::from_keys(access_key_id, secret_access_key, None);
let cfg = aws_config::from_env()
.region(Region::new("eu-west-02"))
.credentials_provider(creds)
.load()
.await;
let sns = aws_sdk_sns::Client::new(&cfg);
let topic_arn = create_topic(&sns, "my_topic").await?;
// NOTE: Change this to your deployment URL for your receiver service!
// The SNS receiver route should be on `/sns` as instructed previously
let url = "https://sns-receiver.shuttleapp.rs/sns";
if !subscription_exists(&sns, url, &topic_arn).await? {
subscribe_to_topic(&sns, url, &topic_arn).await?;
}
let state = AppState { sns, topic_arn };
let router = Router::new().route("/", get(hello_world)).with_state(state);
Ok(router.into())
}
Deploying
To deploy, we just need to use cargo shuttle deploy
(with the --allow-dirty
flag if on an uncommitted Git branch) and watch the magic happen!
Finishing up
Thanks for reading! AWS SNS is a powerful tool to be able to get going with notifying services, particularly if you need fan-out from something like AWS SQS or other message queues.
Read more:
- Learn more about how to competently write an API with Axum
- Secure your API with JWT authentication
- Learn about getting started with logging in Rust
Posted on March 20, 2024
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.
Related
November 15, 2023