Samba Diallo
Posted on August 15, 2019
The coding world is rich with new languages and frameworks, and one growing in popularity is Rust. You’ve probably heard about it, somewhere between Stack Overflow and programming-focused Twitter accounts.
I won't put you through a sales pitch on Rust itself; after all, this Stack Overflow poll speaks for itself. However, I do want to show how to integrate Pub/Sub messaging into your Rust project by creating a Rust chat app.
Follow along with the tutorial or check out the full source code on GitHub.
What is Publish-Subscribe?
Publish-Subscribe, or Pub/Sub, is a way for services and clients to communicate with each other in realtime. One common use case of Pub/Sub in Rust is to connect multiple devices or servers and reflect any changes or updates across them simultaneously. You can use these payloads to update systems, send chat app messages, track live geolocation, and more. Another use is networking in WebAssembly, a binary instruction format that runs in the browser alongside JavaScript. It lets developers build website front ends in languages other than JavaScript: instead of shipping source code to be parsed and interpreted at runtime, other programming languages are compiled ahead of time to a compact binary that the browser executes.
There are a few Pub/Sub APIs out there; I’m going to use PubNub. The reason why I use it with Rust is that it has a simple REST API that I can leverage. It also provides pre-written, open-source API integrations and a solid infrastructure for my use case.
Take the first step in creating this project by obtaining free PubNub API keys.
Setting Up a Rust Environment
When starting to develop with Rust, I found the compiler to be extremely helpful. Aside from compiling your code, it tells you where and what you’re doing wrong, and it will tell you why if you ask it! Let’s get it installed by running this command in your terminal:
curl https://sh.rustup.rs -sSf | sh
Once that finishes, navigate to where you want your new Rust project to live and create it:
cargo new rustychat
Now navigate into your project and run it:
cargo run
Now that you have a project set up, let's go over some of the files inside of your project.
The file “Cargo.toml” is similar to a package.json from Node.js. This is where you’ll put all your crates (libraries), and when you build your project, they’ll be installed. If you’ve installed iOS frameworks with Cocoapods, listing these dependencies will feel similar.
Inside here, under dependencies, you should include the six crates that our app uses. The libraries will allow us to:
- Send GET requests
- Define custom error types more easily
- Construct and destruct JSON objects
- Derive custom structs from JSON
- Create a custom Terminal UI
- URL encode our requests
[dependencies]
reqwest = "0.9.18"
custom_error = "1.6.0"
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
cursive = { version = "0.12.0", default-features = false, features = ["pancurses-backend"] }
percent-encoding = "1.0.1"
Your Cargo.toml should now have all of the crates we plan on using. When we build our project, Rust will automatically install these dependencies.
The next file is src/main.rs, your main Rust code file. This is where your main function lives, along with the “imports” of crates and your struct definitions.
Above your main function, insert these lines to gain access to the crates you just listed as dependencies, along with a few modules from Rust’s standard library.
extern crate reqwest;
use percent_encoding::{percent_encode, PATH_SEGMENT_ENCODE_SET};
use serde::{Deserialize, Serialize};
use std::sync::mpsc::{channel, Sender};
use std::thread;
use cursive::align::HAlign;
use cursive::traits::*;
use cursive::Cursive;
use cursive::view::ScrollStrategy;
use cursive::views::{BoxView, Dialog, DummyView, EditView, LinearLayout, ScrollView, TextView};
use custom_error::custom_error;
If you run your code again, it should build all the crates and then output “Hello, world!”
Create a Custom Error
When taking risky actions in code, you should write error handlers to gracefully handle code execution gone awry. This is why I used custom_error, a crate that reduces the boilerplate needed in plain Rust for creating errors. We define the type, and for each kind of error we expect, we can return a description.
In our code, we’ll be turning an object into a JSON string (and vice-versa), making a GET request to a URL, and accessing what comes back as a response. All three of these are risky and might fail, but we can recover. Once we detect an error, create a description to output in the terminal.
Look below for an example of the errors that I cover and how I define them.
custom_error! {ChatError
JSONError{source: serde_json::error::Error} = @{
source.to_string()
},
ReqwestError{source: reqwest::Error } = @{
source.to_string().split(": ").collect::<Vec<&str>>()[1]
},
Unknown = "unknown error"
}
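To get a feel for the boilerplate that custom_error saves us, here is a minimal hand-written sketch of roughly the same idea, using only the standard library. DemoError and parse_count are hypothetical names made up for this illustration, and std's integer parser stands in for serde_json and reqwest as the source of errors.

```rust
use std::fmt;
use std::num::ParseIntError;

// Hypothetical error type for illustration; not part of the chat app.
#[derive(Debug)]
enum DemoError {
    // Wraps an error produced elsewhere (here, std's integer parser).
    Parse { source: ParseIntError },
    // A catch-all variant with a fixed description.
    Unknown,
}

// Display supplies the human-readable description for each variant.
impl fmt::Display for DemoError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            DemoError::Parse { source } => write!(f, "{}", source),
            DemoError::Unknown => write!(f, "unknown error"),
        }
    }
}

impl std::error::Error for DemoError {}

// From lets the question mark operator turn a ParseIntError into a DemoError.
impl From<ParseIntError> for DemoError {
    fn from(source: ParseIntError) -> Self {
        DemoError::Parse { source }
    }
}

// Parses a number; a parse failure is converted to DemoError automatically.
fn parse_count(input: &str) -> Result<u32, DemoError> {
    let n: u32 = input.trim().parse()?;
    Ok(n)
}
```

Every variant needs a Display arm and, if it wraps another error, a From impl. That is the repetitive part the macro writes for us.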
Define Structs for Rust JSON Response
In this section, we will define a few structs that represent a JSON response that we get from PubNub. We’ll have four structs that each represent one layer of the response object we receive.
Response holds a Time and a vector of MessageResp. Time holds a time token string, and MessageResp holds a Message, which has a UUID string field and a text string field. Each of these structs needs to be deserializable. This allows us to throw the JSON response into a Response struct, and it’ll take care of organizing the data. Message also needs to be serializable so we can turn it into JSON data when publishing.
#[derive(Deserialize)]
struct Response {
t: Time,
m: Vec<MessageResp>,
}
#[derive(Deserialize)]
struct MessageResp {
d: Message,
}
#[derive(Deserialize)]
struct Time {
t: String,
}
//Message is a sub object of MessageResp
#[derive(Serialize, Deserialize)]
struct Message {
uuid: String,
text: String,
}
Asynchronous Rust: How to Multithread
Our main function in this project will create two threads: a thread that searches for new messages arriving and a thread for our UI. Let’s start with making our program use multithreading. Making our program run on multiple threads allows for us to block one thread waiting for messages while the user can still type new messages in the chat input.
The app runs on one thread, going line by line, by default. We can create a second thread that runs the subscribe loop at the same time. Before we create this second thread, how would we access its data on the main thread?
That’s where the channel comes in. It creates a sender and a receiver, also known as a producer and consumer. The sender can be cloned but the receiver cannot, allowing the receiver to access messages in the order they were sent.
Create two separate channels: one sends the PubNub channel name from the UI to the subscribe thread, and the other sends messages from the subscribe thread to the UI.
fn main() {
//We create two channels, one to pass the channel name to the subscribe function
//Another to send new messages from the subscribe function to the UI
let (channel_sender, channel_receiver) = channel();
let (mut msg_sender, msg_receiver) = channel();
//...
//REST OF THE MAIN FUNCTION
}
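If channels are new to you, this small standalone sketch may help before we move on. It uses only the standard library, and the function names are made up for this illustration. It shows the two properties we rely on: a sender can be cloned so several threads can feed one receiver, and messages from a single sender come out in the order they went in.

```rust
use std::sync::mpsc::channel;
use std::thread;

// Spawns `workers` producer threads that each send one labelled message,
// then collects everything on the single receiver.
fn collect_from_workers(workers: usize) -> Vec<String> {
    let (sender, receiver) = channel();

    let mut handles = Vec::new();
    for id in 0..workers {
        // Each thread gets its own clone of the sender.
        let worker_sender = sender.clone();
        handles.push(thread::spawn(move || {
            worker_sender
                .send(format!("hello from worker {}", id))
                .unwrap();
        }));
    }
    // Drop the original sender so the receiver's iterator ends once
    // every clone has been dropped as well.
    drop(sender);

    for handle in handles {
        handle.join().unwrap();
    }

    let mut received: Vec<String> = receiver.iter().collect();
    received.sort(); // arrival order across threads is not deterministic
    received
}

// Messages from a single sender arrive in the order they were sent.
fn send_in_order() -> Vec<u32> {
    let (sender, receiver) = channel();
    for n in 1..=3 {
        sender.send(n).unwrap();
    }
    drop(sender);
    receiver.iter().collect()
}
```

Our chat app only needs one sender per channel, so we won't clone anything, but the ordering guarantee is what keeps chat messages in sequence.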
Now create a thread using spawn, which is the simplest way to create new threads in Rust. We put the keyword “move” before the closure so it takes ownership of the channel receiver and message sender. Inside the new thread, create an empty string that we’ll use for our initial time token. To subscribe, we need a channel name from our UI. Our UI will send one once the user submits it, but until then, we should halt our thread.
The great thing about using channels is that we have the option of waiting for information to be sent. Let’s use the “recv()” function on our receiver, which blocks until a single value is passed through the channel.
//...
//INSIDE MAIN
//Create a separate thread, this allows us to have a subscribe loop that won't stop the UI from updating
let _handle1 = thread::spawn(move || {
    let mut time_token = "".to_string();
    //We wait for the UI to send us the channel name
    let test_channel = channel_receiver.recv();
    //...
    //REST OF THREAD
});
//...
//REST OF MAIN
When we receive a value from another thread, we don’t know if it is an error or not. If the value is “Ok,” unwrap it into a string. Let’s create our subscribe loop! Use the “loop” keyword to create an infinite loop.
//...
//INSIDE THREAD
if test_channel.is_ok() {
let channel_name: String = test_channel.unwrap();
loop {
//...
//REST OF LOOP
}
}
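Here is a small standalone sketch of that waiting behavior, using only the standard library. The function wait_for_channel and its strings are made up for this illustration. It shows that recv() blocks until a value arrives and returns an Err once every sender is gone, which is why we check the result before unwrapping.

```rust
use std::sync::mpsc::channel;
use std::thread;

// Starts a worker that blocks on recv() until a channel name arrives.
// Pass None to simulate the UI going away before the user submits anything.
fn wait_for_channel(name: Option<&str>) -> String {
    let (name_sender, name_receiver) = channel::<String>();

    // `move` hands ownership of the receiver to the new thread.
    let handle = thread::spawn(move || match name_receiver.recv() {
        Ok(channel_name) => format!("subscribing to {}", channel_name),
        // recv() returns Err once every sender has been dropped.
        Err(_) => "no channel received".to_string(),
    });

    match name {
        Some(n) => name_sender.send(n.to_string()).unwrap(),
        None => drop(name_sender),
    }

    // Wait for the worker and hand back what it reported.
    handle.join().unwrap()
}
```

In the chat app the thread keeps running after it receives the name, so we never join it the way this sketch does.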
The next step will be to call our subscribe function. It’ll return a Result enum of String, and ChatError. Results contain values wrapped in either an “Ok” or an “Err.” If the action went well, then an “Ok” will return, but “Err” will return if there was an error. You can check if the result is an error or a success, and additionally see what the value is by unwrapping it. The subscribe function will borrow the time token, a mutable version of the message sender, and the channel name.
//...
//Loop
let result: Result<String, ChatError> = subscribe(&time_token, &mut msg_sender, &channel_name);
We haven’t created this function yet, but I’ll walk you through it soon. All that we need to know right now is that the function might return an error. Let’s handle both cases.
If our subscribe call went smoothly, “result.is_ok()” should return true. If so, unwrap it and assign your time token to its value.
if result.is_ok() {
//We update the time_token var to get all messages that happened after that specific time.
time_token = result.ok().unwrap();
}
If it did not go well, don’t start worrying about alerting the user yet; it might be a ‘timed out’ error. This is expected every so often if there were no new messages.
Because PubNub’s REST API uses HTTP long polling, a timeout means that nothing new happened. If the error was not a timeout, then we should print it and break out of the loop, which ends the subscribe thread. That’s the end of our loop: if we receive a time token or a timeout error from our subscribe function, we start the loop over.
else if result.is_err() {
let err = result.unwrap_err();
//If the request times out, that's okay, we just restart it with that same time token, looking for new messages.
if err.to_string() != "timed out" {
println!(
"Error: {:?} \nPlease restart application to try again.",
err.to_string()
);
break;
}
}
//...
//END LOOP
//END IF
//END THREAD
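The same control flow can also be written with a single match instead of is_ok() and is_err(). The sketch below is one possible shape, not the code from this project: PollError and run_poll_loop are hypothetical names, the closure stands in for our subscribe function, and I swapped the string comparison for an enum variant. The max_rounds parameter exists only so the sketch terminates.

```rust
// Hypothetical stand-in for the tutorial's error type.
#[derive(Debug)]
enum PollError {
    TimedOut,
    Other(String),
}

// Drives a long-poll loop. `poll` receives the current time token and
// returns the next one or an error. Returns the last good token and,
// if the loop stopped on a fatal error, the reason.
fn run_poll_loop<F>(mut poll: F, max_rounds: usize) -> (String, Option<String>)
where
    F: FnMut(&str) -> Result<String, PollError>,
{
    let mut time_token = String::new();
    for _ in 0..max_rounds {
        match poll(&time_token) {
            // Success: remember the new token so the next poll starts after it.
            Ok(next_token) => time_token = next_token,
            // A timeout only means nothing new arrived; poll again with the same token.
            Err(PollError::TimedOut) => continue,
            // Anything else is fatal for the loop: report it and stop.
            Err(PollError::Other(reason)) => return (time_token, Some(reason)),
        }
    }
    (time_token, None)
}
```

Matching on a variant avoids depending on the exact wording of an error message, at the cost of defining a timeout variant yourself.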
Now that we’ve finished our subscribe loop, let’s pause on the main function and delve deeper into subscribing by actually creating the function.
PubNub REST API
Since we already obtained our keys, let’s learn how we’ll use PubNub in Rust.
PubNub has a REST API, which allows us to access Pub/Sub and more with just HTTP GET requests. Check out the REST API with HTTP GET docs for more information.
The PubNub REST API allows developers to publish messages and to subscribe to channels using HTTP requests. Having low-level control over requests lets you decide how often to make the subscribe calls and whether calls will be asynchronous or not. For this chat app, we’ll be making subscribe calls as soon as our previous one ends so users can still interact with other parts of our app.
Check out the diagram below to understand the flow of information in our app.
We’ll be using the Publish and Subscribe URLs in our app, each in its own function. I’ll first give instructions on subscribing, then we can use some similar concepts to publish.
Subscribing to a Channel in Rust with a REST API
This section will be a guide for creating a request to PubNub for Pub/Sub Subscribe. Through this, we’ll have a better understanding of how PubNub works with Rust. We’ll also learn how to pass data from this thread to the UI.
Function Definition
As we saw earlier, our function borrows a time token, a sender to a channel, and a channel name. It returns a Result that either contains a String or a ChatError, which we defined at the beginning of our project. This allows us to define what might go wrong in our code, and what to do in each situation.
fn subscribe(time: &str, msg_sender: &mut Sender<String>, channel: &str ) -> Result<String, ChatError> {
//...
//Subscribe Function
}
Create the URL String
The first step in these functions will be to create the URL string. To do this, we use the format! macro and the percent-encoding crate. Using format! makes it easier to create URLs with variables. The percent-encoding crate lets us convert our strings into URL-encoded strings.
In each request, there is a spot to put your subscribe key and channel name. We include a time token at the end, even though the very first call passes an empty one. After that, we always receive a fresh one in a successful response from PubNub.
//...
//In subscribe
//Format the URL
let url = format!(
"https://{host}/v2/subscribe/{subkey}/{channel}/0/{time}",
host = "ps.pndsn.com",
subkey = "INSERT_SUB_KEY_HERE",
channel = percent_encode(channel.as_bytes(), PATH_SEGMENT_ENCODE_SET),
time = percent_encode(time.as_bytes(), PATH_SEGMENT_ENCODE_SET),
);
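If you are curious what URL encoding actually does to a string, here is a hand-rolled sketch using only the standard library. It is a stand-in for illustration, not the percent-encoding crate: encode_segment and subscribe_url are hypothetical names, and this version escapes everything except the unreserved characters from RFC 3986, which is stricter than PATH_SEGMENT_ENCODE_SET.

```rust
// Percent-encodes every byte except the RFC 3986 "unreserved" characters.
fn encode_segment(input: &str) -> String {
    let mut out = String::with_capacity(input.len());
    for byte in input.bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
                out.push(byte as char)
            }
            // Everything else becomes a percent sign plus two hex digits.
            _ => out.push_str(&format!("%{:02X}", byte)),
        }
    }
    out
}

// Builds a subscribe URL in the same shape as the format! call above.
fn subscribe_url(sub_key: &str, channel: &str, time_token: &str) -> String {
    format!(
        "https://{host}/v2/subscribe/{subkey}/{channel}/0/{time}",
        host = "ps.pndsn.com",
        subkey = sub_key,
        channel = encode_segment(channel),
        time = encode_segment(time_token),
    )
}
```

Without encoding, a channel name containing a space or a slash would change the structure of the URL path.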
Call the reqwest::get function
Once we have the URL we want to request messages from, we call it using “Reqwest.” Reqwest is an abstraction over Hyper, a lower-level Rust crate for making network requests. This function may return the information we want, but it also may give an error. This is where our function’s Result return type plays a role. If something goes wrong, it’ll return an error.
let mut resp = reqwest::get(&url)?;
When using reqwest::get, we leave a question mark at the end. This means that if this call fails, the subscribe function will return an "Err" value. Instead of having to tell our functions to return an "Err" with custom error information on the inside, they will do it automatically.
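To make the question mark less magical, here is a standalone sketch that writes the same function twice: once with the operator and once with roughly what it expands to. SumError and both function names are hypothetical, and std's integer parser plays the role of the fallible call.

```rust
use std::num::ParseIntError;

// Hypothetical error type; the From impl is what the question mark relies on.
#[derive(Debug, PartialEq)]
enum SumError {
    BadNumber(String),
}

impl From<ParseIntError> for SumError {
    fn from(err: ParseIntError) -> Self {
        SumError::BadNumber(err.to_string())
    }
}

// Version 1: the operator returns early with the converted error.
fn sum_with_question_mark(a: &str, b: &str) -> Result<i32, SumError> {
    let x = a.parse::<i32>()?;
    let y = b.parse::<i32>()?;
    Ok(x + y)
}

// Version 2: roughly the same logic written out by hand.
fn sum_with_match(a: &str, b: &str) -> Result<i32, SumError> {
    let x = match a.parse::<i32>() {
        Ok(value) => value,
        Err(err) => return Err(SumError::from(err)),
    };
    let y = match b.parse::<i32>() {
        Ok(value) => value,
        Err(err) => return Err(SumError::from(err)),
    };
    Ok(x + y)
}
```

Both versions behave the same way. The first one just keeps the happy path readable.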
Successful Response – JSON to Object
If the status of the response is successful, we can dive into what we received. We defined a few structs at the beginning of the file that will help us access the information inside! One of the crates we included makes this deserialization process painless: we use serde_json to turn our response’s text into a struct whose fields are easy to access. This process also might fail, but ChatError already covers that error, so we can use a question mark here too.
if resp.status().is_success() {
//A JSON parsing failure becomes a ChatError through the question mark
let deserialized: Response = serde_json::from_str(&resp.text()?)?;
//...
//Rest of If
}
A response can contain zero, one, or several messages, so let’s iterate through the vector inside of it. For each message, we’ll use our Sender to pass its information along. I create a string from the two values each message carries, a UUID and text. Be sure to unwrap the result of the send call. After iterating through the messages, we can return the new time token we received in an "Ok".
//...
//In if statement
for m in deserialized.m {
//Send the new message to the UI above.
msg_sender
.send(format!("{} : {}", m.d.uuid, m.d.text))
.unwrap();
}
return Ok(deserialized.t.t);
//End of if
If the response did not succeed, put an “Ok” Result at the end of the subscribe function. Give it the original time that was passed in, which is converted to a string.
Ok(time.to_string())
//End of subscribe
Errors
Did you notice that we didn’t include any Err statements? All our errors are handled by ChatError. We do include some filled “Ok” statements, which allow us to pass some information back to the calling function and say “Hey, things went fine!”
Publishing Messages in Rust
After creating our subscribe function, let’s create another function to publish to PubNub. This function will follow most of the same format as the last one, but there are a few small changes.
Function Definition
This function will accept 3 strings as parameters. They are the text we want to send, the UUID of the sender, and the channel name. Just like the previous function, we return a Result, but this time it’s an empty “Ok”, or a ChatError Err.
fn publish(text: String, uuid: String, channel: String) -> Result<(), ChatError> {
//...
//PUBLISH FUNCTION
}
Object to JSON
At the beginning of this function, let’s create a Message with a couple of the parameters provided. We’re going to do the opposite of what the subscribe function did: this time we’ll turn our object into a JSON string. This might return an error, so leave a question mark at the end and our ChatError will take care of it.
//...
//In publish
let message = Message { uuid, text };
let m_json = serde_json::to_string(&message)?;
Create URL String
This will be quite similar to the subscribe function, just adding and swapping some parameters. Replace ‘subscribe’ with ‘publish’, include your publish key, and replace time with a message. Both the channel and the message need to be URL encoded as well.
let url = format!(
"https://{host}/publish/{pubkey}/{subkey}/0/{channel}/0/{message}",
host = "ps.pndsn.com",
pubkey = "INSERT_PUB_KEY_HERE",
subkey = "INSERT_SUB_KEY_HERE",
channel = percent_encode(channel.as_bytes(), PATH_SEGMENT_ENCODE_SET),
message = percent_encode(m_json.as_bytes(), PATH_SEGMENT_ENCODE_SET),
);
Create GET Request
Make a GET request and put a question mark after the call. Put an “Ok” Result on the next line. Once these are complete, we’ve finished our publish function!
let _resp = reqwest::get(&url)?;
Ok(())
//End of publish
Create a Terminal UI using Cursive
What is a TUI?
We’ve learned how to use PubNub’s REST API and we also created a separate thread to long poll PubNub for new messages. So far, we look for messages on a channel and we send those new messages somewhere. We haven’t defined where we get the channel from and where the new messages go. Both of these have much to do with the user interface.
While it is possible to create a command-line based chat, using a Terminal User Interface makes the experience much cleaner and smoother. I used Cursive, a simple and feature-rich TUI library that lets us separate the input from the output.
Channel and Username Input
To initialize Cursive, create a mutable instance with its default constructor.
//Below end of _handle1 thread
let mut siv = Cursive::default();
Cursive uses a format that is reminiscent of other methods of UI creation, namely using Views. Views can hold other views and have different uses. Views at the very base are contained by layers. These are the windows that can be added or popped.
Create a layer, and pass it a Dialog. A Dialog is a type of window that can hold another view. It should wrap a LinearLayout view, which we can set to a vertical orientation. LinearLayouts are great for organizing child views into stacks or rows. You can add children to it with the “child” function, which can be chained any number of times. You can also use “with” to dynamically add in children. We’ll use that later, but for now, let’s design our connection layer inside the LinearLayout.
siv.add_layer(
Dialog::around(
LinearLayout::vertical()
//...
//LinearLayout's children
)
//...
//Title and buttons of Dialog
);
We are going to use a few different views to create this layer. We use TextViews for short lines of text, like a label. EditViews let users enter information, which we can later access with a specific ID. DummyViews are useful to space out our other views. You can design these how you want in your space, but I center the TextViews and set each EditView’s width to 20. In addition to setting the styles, make sure you set an ID on each EditView AFTER “new()” but before “fixed_width(20)”. If the ID is not in that spot, it ends up attached to the sized wrapper rather than to the EditView itself, and we will not be able to reference the value later on.
//...
//LinearLayout's children
.child(DummyView.fixed_height(1))
.child(TextView::new("Enter Username").h_align(HAlign::Center))
.child(EditView::new().with_id("username").fixed_width(20))
.child(DummyView.fixed_height(1))
.child(TextView::new("Enter Channel").h_align(HAlign::Center))
.child(EditView::new().with_id("channel").fixed_width(20))
After the LinearLayout is complete, finish up creating the Dialog. Add a title to the Dialog, create an “Okay” button with a callback, and a quit button. Finally, align it in the center. We’ll go into the “Okay” button’s callback next.
//...
//Attached to Dialog
.title("PubNub Chat")
.button("Okay", CALLBACK )
.button("Quit", | s | s.quit())
.h_align(HAlign::Center),
Connect to a Channel
The “Okay” button takes a callback. It runs when the user either clicks the button or presses enter on it. We want to use the “move” keyword again to give this callback ownership of all the variables it needs.
.button("Okay", move |s| {
    //...
    //Okay callback
})
Let’s grab the values that the user entered into the EditViews earlier.
//Inside Okay
let channel = s
.call_on_id("channel", |view: &mut EditView| view.get_content())
.unwrap();
let username = s
.call_on_id("username", |view: &mut EditView| view.get_content())
.unwrap();
Check if the username is empty, and if so, create a layer telling the user to enter a username. If it is not, then check if the channel name is empty. If it is, set it to “global” as a default.
//Checking if username input is empty.
if username.is_empty() {
s.add_layer(Dialog::info("Please enter a username!".to_string()));
} else {
let new_channel =
if channel.is_empty() {
"global".to_string()
} else {
channel.to_string()
};
//...
//The rest of connecting to PubNub
}
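We will need this same fallback again when publishing, so one option is to pull it into a small helper. The function below is a hypothetical refactor, not part of the original project, and it keeps exactly the same behavior as the if/else above.

```rust
// Falls back to "global" when the user leaves the channel field blank.
fn resolve_channel(input: &str) -> String {
    if input.is_empty() {
        "global".to_string()
    } else {
        input.to_string()
    }
}
```

Both callbacks could then pass the channel they read from the EditView through this helper, which keeps the default name in one place.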
Continuing in that same else statement, let’s send the channel that we have (either “global” or a user-defined channel) to the subscribe loop. Once we do this, our loop will be able to proceed, and request messages from PubNub. Before we load the next screen, pop the initial layer.
channel_sender.send(new_channel).unwrap();
s.pop_layer();
Create the Chat Layer
Now create another layer: a BoxView with a fixed size of 40 by 20. Inside of that box will be a Dialog with a title and a content field. It’ll be aligned in the center, with a send and a quit button.
//...
//Still in else statement
s.add_layer(BoxView::with_fixed_size((40, 20),
Dialog::new()
.title("PubNub Chat")
.content(
//...
//Chat view: Includes list of messages and EditView
)
.h_align(HAlign::Center)
.button("Send", CALLBACK)
.button("Quit", | s | s.quit()),
))
//End of UI design
I'll first show how to design the content, then I'll show how to send a message. Inside the content function of the Dialog, insert a LinearLayout. This stacks a ScrollView and an EditView on top of each other. Inside of the ScrollView, we insert another LinearLayout. This extra LinearLayout lets us remove individual lines later, which some other views do not support. Make the ScrollView stick to the bottom when new messages are received, using scroll_strategy.
//Inside Content
LinearLayout::vertical()
.child(
ScrollView::new(
LinearLayout::vertical()
//Children of the LinearLayout
)
.scroll_strategy(ScrollStrategy::StickToBottom),
)
//Next Child
Inside of this second LinearLayout, we have DummyViews on the top and bottom, with a “with” call between them. It provides a reference to the LinearLayout so we can add children dynamically. Add some DummyViews; I chose 13, a number that works well with our layer’s height. We add these empty lines so that when new messages come in, they appear at the bottom first. Set the LinearLayout’s ID to “messages” or whatever you want.
//Children of inner LinearLayout
.child(DummyView.fixed_height(1))
//Add in a certain amount of dummy views, to make the new messages appear at the bottom
.with(|messages| {
    for _ in 0..13 {
        messages.add_child(DummyView.fixed_height(1));
    }
})
.child(DummyView.fixed_height(1))
.with_id("messages"),
At the end of the first LinearLayout add an EditView child with the ID of “message.”
//Child of first LinearLayout
.child(EditView::new().with_id("message")),
//End of content
Publishing a Message
Now that we’ve designed our chat view, let’s work on sending some messages! Just like our “Okay” button from the previous layer, we want a callback and to move all the required variables into it.
.button("Send", move |s| {
    //...
    //Rest of the callback
})
Take the message from the EditView, as we did before, and check if it’s empty. If it is, then alert the user and tell them to type a message. If it is not empty, then we check if the channel that they entered is empty or filled, like we did above.
//Inside callback
let message = s
.call_on_id("message", |view: &mut EditView| view.get_content())
.unwrap();
if message.is_empty() {
s.add_layer(Dialog::info("Please enter a message!".to_string()))
} else {
let new_channel_2 = if channel.is_empty() {
"global".to_string()
} else {
channel.to_string()
};
//...
//Will handle publishing messages
}
//End of callback
Call our publish function with the message, the username, and the channel name that the user entered. If the result was an error, then we tell the user that we encountered an error while publishing. If it went well, then we clear the text from the EditView.
//In the else statement
let result = publish(message.to_string(), username.to_string(), new_channel_2 );
if result.is_err() {
//If there was an error then we say that there is one, and don't do anything.
s.add_layer(Dialog::info("Error Publishing".to_string()));
} else {
//Clear out the EditView.
s.call_on_id("message", |view: &mut EditView| {
view.set_content("")
}).unwrap();
}
//End of else + callback
Inserting New Messages Into UI
Usually, we would use the command “siv.run()” to have the UI start, but since we want to control when the UI changes, we use “siv.refresh()” instead. Create a new variable to count how many messages we receive, and draw the UI once with a first refresh.
//...
//After creating the UI design/layers
let mut message_count = 0;
siv.refresh();
Create a loop that first calls “step” to increment through the UI event loop. Check if cursive is running at that point, and if it’s not, break out of the loop. Create a variable to track whether there needs to be a UI refresh and set it to false.
loop {
siv.step();
if !siv.is_running() {
break;
}
let mut needs_refresh = false;
//...
//Handle new messages
}
Now for the final requirement: we need to check if any messages are waiting for us in the channel queue. We can use the non-blocking “try_iter()” method on the “msg_receiver” to see if anything is waiting. It returns an iterator over the messages that are already queued and never waits for new ones, so it yields nothing when the queue is empty. We can use a Rust for loop to iterate through the messages.
for m in msg_receiver.try_iter() {
//...
//Adding each message "m" in
}
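To see the non-blocking behavior in isolation, here is a small sketch using only the standard library. The function names and sample messages are made up for this illustration. It polls an empty queue, then a queue holding two messages, then the drained queue again.

```rust
use std::sync::mpsc::{channel, Receiver};

// Collects whatever is already queued without waiting for more.
fn drain_pending(receiver: &Receiver<String>) -> Vec<String> {
    receiver.try_iter().collect()
}

// Polls three times: before anything is sent, after two sends,
// and once more after the queue has been drained.
fn poll_three_times() -> (usize, Vec<String>, usize) {
    let (sender, receiver) = channel();
    let before = drain_pending(&receiver).len();
    sender.send("alice : hi".to_string()).unwrap();
    sender.send("bob : hello".to_string()).unwrap();
    let queued = drain_pending(&receiver);
    let after = drain_pending(&receiver).len();
    (before, queued, after)
}
```

Because an empty queue simply produces an empty iterator, the UI loop can call this on every pass without ever freezing.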
Inside of this for loop, we access the LinearLayout with the ID “messages”. We can then set our refresh boolean to true, add one to the message count, and add the message as a child to the LinearLayout. In that closure, you can also remove the first child. This keeps a scroll bar from appearing until the messages go past the screen! Be sure to only do so while the message count is less than or equal to your DummyView count plus one.
//Inside of for loop
siv.call_on_id("messages", |messages: &mut LinearLayout| {
needs_refresh = true;
message_count += 1;
messages.add_child(TextView::new(m));
if message_count <= 14 {
messages.remove_child(0);
}
});
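The spacer trick is easier to follow without any UI code around it. The sketch below models the message list as plain strings, where an empty string plays the role of a DummyView. MessageList is a hypothetical type for this illustration, and unlike the layout above it removes every spacer rather than keeping one.

```rust
// Models the message list; an empty string is a blank spacer line.
struct MessageList {
    lines: Vec<String>,
    message_count: usize,
    spacer_count: usize,
}

impl MessageList {
    // Starts with `spacer_count` blank lines, like the DummyViews in the layout.
    fn new(spacer_count: usize) -> Self {
        MessageList {
            lines: vec![String::new(); spacer_count],
            message_count: 0,
            spacer_count,
        }
    }

    // Appends a message. While spacers remain, one is dropped from the top
    // so the total number of lines stays constant and nothing scrolls.
    fn push(&mut self, message: &str) {
        self.message_count += 1;
        self.lines.push(message.to_string());
        if self.message_count <= self.spacer_count {
            self.lines.remove(0);
        }
    }
}
```

Once the count passes the number of spacers, the list grows by one line per message, and that is the point where the ScrollView starts scrolling.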
If our refresh boolean is true at the end of the loop, then we can refresh the UI and show the changes.
if needs_refresh {
siv.refresh();
}
//End of loop
//End of Main
After completing the main function, the chat app is finished. If you enter cargo run into your terminal, a UI will pop up asking for a username and channel name. Entering at least a username will allow you to connect to either the channel you entered or "global" as a default. If you enter a message and click "Send", the message will appear on your screen. Open up multiple command lines and chat between them, or chat with others if they have access to the same keys as you.
Next Steps
In this tutorial, we created a terminal chat app in Rust. It uses PubNub’s Pub/Sub to send and receive messages through a Cursive Terminal UI. This is only an example of what is possible with Rust and PubNub. If you’re using Rust to compute an algorithm, or run a game, and you need some I/O around the world, PubNub can help.
Want to try this in another language? PubNub has over 75 SDKs and hundreds of tutorials.