Build your own Dynamo-like key/value database - Part 1 - TCP Server
Rafael Camargo Leite
Posted on November 12, 2024
Intro
This is the second post in the Build your own Dynamo-like key/value database series. You can find part one here.
Part 1 - Exposing APIs to clients
1. Background
Every database needs to somehow vend its APIs for clients to consume.
This is not strictly important for our goal of studying the internals of a database system, but we still need an entry point for clients to interact with, even if only to let us write proper end-to-end tests.
2. Requirements
2.1. Non-functional
- Readability
  - This is going to be a requirement for every component, but since this is our first one, better to state it clearly. If, at any point, going through the code base yields too many "wtf"s, we messed up.
- Minimal amount of dependencies
  - This is another cornerstone of every component: our goal is to learn how all of these data structures and algorithms are implemented. If we just include dependencies for all of them, what's the point of even building this database!?
- Simplicity and extensibility
  - Similar to requirement 1: making components as simple as possible definitely helps with readability.
  - Let's make sure adding new commands doesn't require lots of changes/boilerplate.
2.2. Functional
The only functional requirement I'll add is the ability to trace requests based on request_ids, i.e., for every request our database receives, we have to be able to trace it for debugging purposes.
3. Design
Different databases provide different interfaces for clients to interact with. A few examples are:
- SQLite - This is a sql database that runs embedded in the process that consumes it. For this reason, there's a C client library that exposes all necessary APIs for database management and query execution.
- Redis - Redis is an in-memory database that provides a wide variety of APIs to access different data structures through TCP. It uses a custom protocol on top of TCP called RESP, so clients that want to integrate with Redis have to implement RESP on top of TCP to be able to issue commands against it.
- CouchDB - exposes a RESTful HTTP API for clients to consume.
As previously stated, in our case, the actual interface is less relevant: our goal is to study the internals of storage systems, not to write a production-ready, performant database for broad consumption. For that reason, I'll use non-functional requirements 2 (as few dependencies as possible) and 3 (simplicity) to justify exposing our APIs via TCP using a thin serialization protocol based on JSON.
Our serialization protocol will be based on Message, a struct composed of a small binary header followed by an optional JSON payload that represents the Command we want to execute.
After parsing a Message from the network, we will then build a Command which, in turn, can be executed.
4. Implementation
We will start by defining what a Command is.
Think of a Command as a Controller in an MVC application. A command basically interprets the request a client has issued and generates the appropriate response by calling internal methods where the business logic is.
Let's get a bit more concrete by implementing Ping.
From a client's perspective, Ping is as simple as:
1. Client sends a Ping request over TCP
2. Cluster node parses the received bytes into a Message
3. Cluster node constructs a Ping command from the Message parsed in step 2
4. Cluster node replies to the client with a "PONG" message
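To make steps 2 to 4 a bit more tangible, here is a minimal, synchronous sketch that uses only the standard library. Everything in it is a simplified stand-in and an assumption on my part: the Command enum, try_from_message and the Vec<u8> payload are hypothetical and are not rldb's actual definitions (rldb is async and uses Bytes). The only values taken from this post are cmd_id 1 for Ping and the "PONG" response body.

```rust
use std::convert::TryFrom;

// Hypothetical, simplified stand-ins for rldb's types (synchronous, std-only).
#[derive(Debug, PartialEq)]
pub enum CommandId {
    Ping = 1,
}

impl TryFrom<u8> for CommandId {
    type Error = String;

    // Maps the first byte of the header to a known command.
    fn try_from(byte: u8) -> Result<Self, Self::Error> {
        match byte {
            1 => Ok(CommandId::Ping),
            other => Err(format!("unknown command id: {}", other)),
        }
    }
}

pub struct Message {
    pub cmd_id: CommandId,
    pub request_id: String,
    pub payload: Option<Vec<u8>>,
}

pub enum Command {
    Ping,
}

impl Command {
    // Step 3: pick the command based on the message header.
    pub fn try_from_message(message: &Message) -> Result<Self, String> {
        match message.cmd_id {
            CommandId::Ping => Ok(Command::Ping),
        }
    }

    // Step 4: run the command and build the response message,
    // echoing the request_id so the request can be traced.
    pub fn execute(self, request_id: String) -> Message {
        match self {
            Command::Ping => Message {
                cmd_id: CommandId::Ping,
                request_id,
                payload: Some(br#"{"message":"PONG"}"#.to_vec()),
            },
        }
    }
}
```

With this shape, adding a new command means adding one CommandId variant, one Command variant and one match arm per function, which is the kind of low-boilerplate extension the requirements ask for.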
The ping command can be found here in rldb. The core of its implementation is shown below:
...
#[derive(Debug)]
pub struct Ping;

#[derive(Serialize, Deserialize)]
pub struct PingResponse {
    pub message: String,
}

impl Ping {
    pub async fn execute(self) -> Result<PingResponse> {
        Ok(PingResponse {
            message: "PONG".to_string(),
        })
    }
}
...
Now, how do we construct a Ping Command from a TCP connection?
That's where the Message definition comes into play. Message is our serialization protocol on top of TCP.
It defines the format/frame of the bytes a client has to send in order for our server to be able to interpret them.
Think of it as an intermediate abstraction that enables our database server to properly route requests to the specific Command based on the arguments provided.
A Message is defined as follows:
pub struct Message {
    /// Used as a way of identifying the format of the payload for deserialization
    pub cmd_id: CommandId,
    /// A unique request identifier - used for request tracing and debugging
    /// Note that this has to be encoded as utf8 otherwise parsing the message will fail
    pub request_id: String,
    /// the Request payload
    pub payload: Option<Bytes>,
}
Every field prior to the payload can be thought of as the header of the message, while payload is what actually encodes the Command we are trying to execute. Many other fields could be included as part of the header. Two fields will most likely be added in the future:
- a checksum of the payload (maybe one including the overall message as well?)
- the timestamp of the message (just for debugging purposes)
Now let's construct a Message from a TCP connection:
impl Message {
    pub async fn try_from_async_read(tcp_connection: &mut TcpStream) -> Result<Self> {
        let cmd_id = CommandId::try_from(tcp_connection.read_u8().await?)?;
        let request_id_length = tcp_connection.read_u32().await?;
        let request_id = {
            let mut buf = vec![0u8; request_id_length as usize];
            tcp_connection.read_exact(&mut buf).await?;
            String::from_utf8(buf).map_err(|_| {
                Error::InvalidRequest(InvalidRequest::MessageRequestIdMustBeUtf8Encoded)
            })?
        };
        let payload_length = tcp_connection.read_u32().await?;
        let payload = if payload_length > 0 {
            let mut buf = vec![0u8; payload_length as usize];
            tcp_connection.read_exact(&mut buf).await?;
            Some(buf.into())
        } else {
            None
        };
        Ok(Self {
            cmd_id,
            request_id,
            payload,
        })
    }
}
Three notes:
- I removed most of the error handling from this snippet to make it more readable. Read the actual implementation here if curious.
- If you check the real implementation, you will see that the signature of the function is slightly different. Instead of
pub async fn try_from_async_read(tcp_connection: &mut TcpStream) -> Result<Message>
we have
pub async fn try_from_async_read<R: AsyncRead + Unpin>(reader: &mut R) -> Result<Self>
This is an important distinction that is worth discussing. If we had TcpStream as the argument type, every test would require us to set up a real TCP server, create TCP connections and send/receive bytes via localhost.
Not only would this make tests slow, it would also make writing them much more complicated than necessary.
Here is an example of a test that we can write without setting up any TCP connection:
#[tokio::test]
async fn test_max_message_size_exceeded() {
    let mut reader = MaxMessageSizeExceededAsyncRead::default();
    let err = Message::try_from_async_read(&mut reader)
        .await
        .err()
        .unwrap();
    match err {
        Error::InvalidRequest(InvalidRequest::MaxMessageSizeExceeded { max, got }) => {
            assert_eq!(max, MAX_MESSAGE_SIZE);
            assert_eq!(got, MAX_MESSAGE_SIZE + 1);
        }
        _ => {
            panic!("Unexpected error: {}", err);
        }
    }
}
In this example, I chose to implement AsyncRead for my custom struct MaxMessageSizeExceededAsyncRead. But we could've used UnixStreams or many other types that impl AsyncRead to inject the bytes we are interested in.
- You will likely notice that no timeouts are set on any of the TCP connection interactions. This has to be included in a production-ready service, but I chose to skip it at this point as it is not the focus of our work. Still, it has to be stated: if you plan on deploying anything that interacts with the network to a production environment, handling timeouts/slow requests is mandatory.
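Going back to the second note, the benefit of being generic over the reader can be sketched with only the standard library. The block below is a synchronous analog of the parser, written against std::io::Read instead of tokio's AsyncRead, so any in-memory byte slice can act as the "connection". The names try_from_read and parse_bytes, the u8 cmd_id, the value of MAX_MESSAGE_SIZE and the way the limit is checked (per length field) are all assumptions made for illustration; rldb's real implementation may differ on each of them.

```rust
use std::io::{self, Cursor, Read};

// Hypothetical limit; rldb's real MAX_MESSAGE_SIZE may differ.
pub const MAX_MESSAGE_SIZE: u32 = 1024;

#[derive(Debug, PartialEq)]
pub struct Message {
    pub cmd_id: u8,
    pub request_id: String,
    pub payload: Option<Vec<u8>>,
}

fn read_u8<R: Read>(reader: &mut R) -> io::Result<u8> {
    let mut buf = [0u8; 1];
    reader.read_exact(&mut buf)?;
    Ok(buf[0])
}

// Big-endian, which is also what tokio's read_u32 and bytes' put_u32 use.
fn read_u32<R: Read>(reader: &mut R) -> io::Result<u32> {
    let mut buf = [0u8; 4];
    reader.read_exact(&mut buf)?;
    Ok(u32::from_be_bytes(buf))
}

// Rejects oversized lengths before allocating (assumed per-field check).
fn read_bytes<R: Read>(reader: &mut R, len: u32) -> io::Result<Vec<u8>> {
    if len > MAX_MESSAGE_SIZE {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "max message size exceeded",
        ));
    }
    let mut buf = vec![0u8; len as usize];
    reader.read_exact(&mut buf)?;
    Ok(buf)
}

// Works with a TcpStream, a Cursor, a byte slice or anything else that impls Read.
pub fn try_from_read<R: Read>(reader: &mut R) -> io::Result<Message> {
    let cmd_id = read_u8(reader)?;
    let request_id_length = read_u32(reader)?;
    let request_id = String::from_utf8(read_bytes(reader, request_id_length)?)
        .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "request_id must be utf8"))?;
    let payload_length = read_u32(reader)?;
    let payload = if payload_length > 0 {
        Some(read_bytes(reader, payload_length)?)
    } else {
        None
    };
    Ok(Message { cmd_id, request_id, payload })
}

// Test helper: turns an in-memory byte slice into a reader.
pub fn parse_bytes(bytes: &[u8]) -> io::Result<Message> {
    try_from_read(&mut Cursor::new(bytes))
}
```

A test can now feed parse_bytes a hand-written frame, a truncated frame or a frame that declares an oversized length, without opening a single socket.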
Now that we can build a Message out of a TcpStream, we must be able to serialize a Message into bytes so that it can be sent over the network. The snippet below shows how this can be done (without error handling again):
impl Message {
    pub fn serialize(self) -> Bytes {
        let payload = self.payload.clone();
        // Borrow the payload to compute its length instead of cloning it again
        let payload_len = payload.as_ref().map_or(0, |payload| payload.len());
        let mut buf = BytesMut::with_capacity(
            self.request_id.len() + payload_len + 2 * size_of::<u32>() + size_of::<u8>(),
        );
        buf.put_u8(self.cmd_id as u8);
        buf.put_u32(self.request_id.len() as u32);
        buf.put(self.request_id.as_bytes());
        buf.put_u32(payload_len as u32);
        if let Some(payload) = payload {
            buf.put(payload);
        }
        assert_eq!(buf.capacity(), buf.len());
        buf.freeze()
    }
}
Let's go over some notes here as well:
- You will quickly see that I rely heavily on the bytes crate throughout this project. Bytes is a very useful tool for writing network-related applications: it lets you work with contiguous byte buffers while avoiding memory copies almost entirely. It also provides some neat traits like BufMut, which gives us methods like put_u32. Please refer to its documentation for more information.
- I included an assertion about buf's len and capacity to make sure an important invariant holds: if we allocate a buffer of size X, we must completely fill it without ever resizing it. This guarantees that we compute the buffer size correctly before filling it (which matters if we care about performance, for example).
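To see what actually goes on the wire, here is a std-only sketch of the same frame layout that uses a Vec<u8> instead of BytesMut. The function name encode_frame and the raw u8 cmd_id are hypothetical simplifications. One deliberate difference: Vec::with_capacity is allowed to allocate more than requested, so this version asserts on the expected length rather than on the capacity.

```rust
// Hypothetical, std-only version of the frame encoder (Vec<u8> instead of BytesMut).
pub fn encode_frame(cmd_id: u8, request_id: &str, payload: Option<&[u8]>) -> Vec<u8> {
    let payload_len = payload.map_or(0, |p| p.len());
    // 1 byte cmd_id + 4 bytes request_id length + request_id
    // + 4 bytes payload length + payload
    let expected_len = 1 + 4 + request_id.len() + 4 + payload_len;
    let mut buf = Vec::with_capacity(expected_len);

    buf.push(cmd_id);
    // Lengths are written big-endian, like bytes' put_u32.
    buf.extend_from_slice(&(request_id.len() as u32).to_be_bytes());
    buf.extend_from_slice(request_id.as_bytes());
    buf.extend_from_slice(&(payload_len as u32).to_be_bytes());
    if let Some(payload) = payload {
        buf.extend_from_slice(payload);
    }

    // Same idea as the invariant above: we filled exactly what we sized for.
    assert_eq!(buf.len(), expected_len);
    buf
}
```

For example, a Ping request (cmd_id 1, no payload) with request_id "abc" encodes to 12 bytes: 01, then 00 00 00 03, then 61 62 63, then 00 00 00 00.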
Finally, given that we have a cmd_id field in Message, we can easily choose how to serialize the payload field for each specific Command and vice versa.
For Ping, a request Message would look like
Message {
    cmd_id: 1,
    request_id: <some string>,
    payload: None
}
and a response message would look like
Message {
    cmd_id: 1,
    request_id: <some string>,
    payload: Some(Bytes::from(serde_json::to_string(&PingResponse { message: "PONG".to_string() })?))
}
And that's it: a client needs to know only about Message in order to be able to interact with our database.
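As an illustration of that claim, here is a rough sketch of what a tiny synchronous client could look like using only the standard library. It is generic over Read + Write, so it would accept a std::net::TcpStream, but the block also includes a hypothetical InMemoryStream so it can be exercised without a server. The ping function, InMemoryStream and read_length_prefixed are names I made up for this sketch, and it assumes the response echoes cmd_id 1 as shown above. It also skips the maximum message size check and timeouts, both of which a real client would need.

```rust
use std::io::{self, Cursor, Read, Write};

const PING_CMD_ID: u8 = 1;

/// Hypothetical in-memory stand-in for a TcpStream: reads come from a canned
/// response, writes are captured so they can be inspected.
pub struct InMemoryStream {
    pub incoming: Cursor<Vec<u8>>,
    pub written: Vec<u8>,
}

impl InMemoryStream {
    pub fn new(canned_response: Vec<u8>) -> Self {
        InMemoryStream {
            incoming: Cursor::new(canned_response),
            written: Vec::new(),
        }
    }
}

impl Read for InMemoryStream {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.incoming.read(buf)
    }
}

impl Write for InMemoryStream {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.written.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Reads a big-endian u32 length followed by that many bytes.
fn read_length_prefixed<R: Read>(reader: &mut R) -> io::Result<Vec<u8>> {
    let mut len = [0u8; 4];
    reader.read_exact(&mut len)?;
    let mut buf = vec![0u8; u32::from_be_bytes(len) as usize];
    reader.read_exact(&mut buf)?;
    Ok(buf)
}

/// Sends a Ping request and returns the response's request_id and raw JSON payload.
pub fn ping<S: Read + Write>(stream: &mut S, request_id: &str) -> io::Result<(String, Vec<u8>)> {
    // Request frame: cmd_id, request_id length, request_id, payload length (0 for Ping).
    let mut frame = vec![PING_CMD_ID];
    frame.extend_from_slice(&(request_id.len() as u32).to_be_bytes());
    frame.extend_from_slice(request_id.as_bytes());
    frame.extend_from_slice(&0u32.to_be_bytes());
    stream.write_all(&frame)?;
    stream.flush()?;

    // The response frame uses the same layout as the request.
    let mut cmd_id = [0u8; 1];
    stream.read_exact(&mut cmd_id)?;
    if cmd_id[0] != PING_CMD_ID {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "unexpected cmd_id"));
    }
    let response_request_id = String::from_utf8(read_length_prefixed(stream)?)
        .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "request_id must be utf8"))?;
    let payload = read_length_prefixed(stream)?;
    Ok((response_request_id, payload))
}
```

The returned payload is the raw JSON body, which a client would then deserialize into whatever response type matches the cmd_id it sent.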
If you want to walk through the code yourself, here are the pointers to the 4 larger components
The IntoMessage trait and request_id (covers the functional requirement around tracing requests)
For any type that we want to be able to convert into a Message, we can implement the IntoMessage trait.
pub trait IntoMessage {
    /// Same as [`Message::cmd_id`]
    fn cmd_id(&self) -> CommandId;
    /// Same as [`Message::payload`]
    fn payload(&self) -> Option<Bytes> {
        None
    }
    fn request_id(&self) -> String {
        REQUEST_ID
            .try_with(|rid| rid.clone())
            .unwrap_or("NOT_SET".to_string())
    }
}
You will see that this trait has 2 default implementations:
- payload -> Commands like Ping don't have a payload, so Ping doesn't have to implement this function and can just rely on the default behavior.
- request_id -> This is the more interesting one: every message has to have a request_id associated with it. This is going to be extremely important once we have to analyze logs/traces for requests received by the database.
The way request_id is handled by rldb at the time of this writing is: request_id is injected either by the client or by the server into tokio::task_local as soon as the Message is parsed from the network.
If you really think about it, it's a bit strange that we let the client set the request_id that is internal to the database. We allow this (at least for now) because rldb nodes talk to each other. In requests like Get, an rldb cluster node will have to talk to at least 2 other nodes to issue internal GET requests. In order to connect all of the logs generated for a client request, we have to provide a mechanism for an rldb node to inject the request_id being executed when issuing requests to other nodes in the cluster.
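The propagation idea can be sketched without tokio by swapping the task-local for a standard library thread_local. This is only an analog: a thread-local is scoped to a thread, while tokio's task-local is scoped to an async task, so this is not how rldb does it. The helpers with_request_id and current_request_id, as well as the InternalGet type, are hypothetical names used purely for illustration.

```rust
use std::cell::RefCell;

thread_local! {
    // Hypothetical std analog of rldb's tokio task-local REQUEST_ID.
    static REQUEST_ID: RefCell<Option<String>> = RefCell::new(None);
}

/// Runs `f` with `request_id` set for the current thread and restores the
/// previous value afterwards (not restored if `f` panics).
pub fn with_request_id<T>(request_id: String, f: impl FnOnce() -> T) -> T {
    let previous = REQUEST_ID.with(|rid| rid.borrow_mut().replace(request_id));
    let result = f();
    REQUEST_ID.with(|rid| *rid.borrow_mut() = previous);
    result
}

/// Returns the request_id currently in scope, or "NOT_SET" when there is none.
pub fn current_request_id() -> String {
    REQUEST_ID
        .with(|rid| rid.borrow().clone())
        .unwrap_or_else(|| "NOT_SET".to_string())
}

pub trait IntoMessage {
    // Default implementation: pick up whatever request_id is in scope.
    fn request_id(&self) -> String {
        current_request_id()
    }
}

/// Simplified internal request a node would send to its peers.
pub struct InternalGet {
    pub key: String,
}

impl IntoMessage for InternalGet {}
```

A node handling a client request would wrap the handler in with_request_id using the id parsed from the incoming Message. Any internal request built inside that scope then carries the same id, which is what lets us stitch together the logs from every node involved.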
Final thoughts
This post covered how rldb handles incoming client requests. It described how requests are serialized/deserialized and how request ids are propagated throughout the request lifecycle.
Next chapter
- Part 3 - Bootstrapping our cluster: Node discovery and failure detection, which will cover how gossip protocols work and go over the rldb gossip implementation.