Protohackers in Rust: Echo Server
Thanks to @whatyouhide, I discovered protohackers, a series of challenges focused on network protocols. For each challenge, participants are required to write a server that implements a protocol spec. Once implemented, the server should be hosted somewhere, and it is then automatically tested by the protohackers test suite.
I'm a big fan of network protocols, so I decided to participate in protohackers by solving the challenges in Rust and writing about it in this blog series.
All the code in this series will be available on GitHub.
# The Challenge
The first problem is to write a TCP Echo Service.
The basic requirements are to accept TCP connections, receive data from a client and send it back unmodified. We also need to handle at least 5 clients simultaneously.
# Getting Started
We are going to implement all the challenges using Tokio and the async ecosystem in Rust.
Now let's bootstrap the project and add some dependencies:
```shell
❯ cargo new --lib protohackers-rs
     Created library `protohackers-rs` package
❯ cd protohackers-rs
❯ mkdir src/bin
❯ touch src/bin/smoke_test.rs
```
I started out with a library package, and I will manually add one binary per challenge in the `src/bin` folder. The main reason for this setup is that we will probably need some common functions across the challenges, so a mixed library/binary crate seems like a good idea.
For the first challenge we are adding these dependencies:
- tokio (async runtime)
- anyhow (error handling)
- tracing (tracing and logging)
- tracing-subscriber (tracing utilities)
We can use the `cargo add` command for that:
```shell
❯ cargo add tokio --features full
❯ cargo add anyhow
❯ cargo add tracing
❯ cargo add tracing-subscriber
```
At this point the `Cargo.toml` will look like this:
```toml
[package]
name = "protohackers-rs"
version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1.0.68"
tokio = { version = "1.24.1", features = ["full"] }
tracing = "0.1.37"
tracing-subscriber = "0.3.16"
```
Now we create an empty `main` function in `smoke_test.rs`, where we will write our solution for the first challenge:
```rust
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    Ok(())
}
```
Rust does not support an `async` `main` function out of the box, but we can use the `tokio::main` attribute, which rewrites our `main` function into a regular synchronous one while bootstrapping the Tokio runtime.
You can check the generated code by running the `cargo expand` command (from the `cargo-expand` tool):
```shell
❯ cargo expand --bin smoke_test
    Checking protohackers-rs v0.1.0 (~/Coding/protohackers-rs)
    Finished dev [unoptimized + debuginfo] target(s) in 0.25s
```
```rust
#![feature(prelude_import)]
#[prelude_import]
use std::prelude::rust_2021::*;
#[macro_use]
extern crate std;
fn main() -> anyhow::Result<()> {
    let body = async { Ok(()) };
    #[allow(clippy::expect_used, clippy::diverging_sub_expression)]
    {
        return tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .expect("Failed building the Runtime")
            .block_on(body);
    }
}
```
The original `main` code is wrapped in an `async` block and passed to the runtime entry point method `block_on`, which runs it until completion.
An `async` block is a variant of a block expression that evaluates to a compiler-generated type implementing `std::future::Future`.
If you played with async in Rust in the pre-async/await era, you probably remember that creating a future was not so easy: most of the time you had to manually implement the `Future` trait while writing a custom state machine to track the future's progress.
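To make this concrete, here is a small std-only sketch (no Tokio) of what that used to look like. `Countdown` is a hypothetical hand-written future, an explicit state machine that returns `Poll::Pending` a few times before completing, and `block_on` is a deliberately naive executor that busy-polls with a no-op waker. An `async` block awaiting the same future runs on the same executor, because it is also just a value implementing `Future`. A real runtime like Tokio parks the thread and relies on wakers instead of spinning.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Hypothetical hand-written future: an explicit state machine that is
// "not ready yet" until it has been polled `remaining` more times.
struct Countdown {
    remaining: u32,
}

impl Future for Countdown {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            return Poll::Ready("done");
        }
        self.remaining -= 1;
        // Ask to be polled again. A real I/O future would instead hand the
        // waker to an event source and wake only when data is available.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

// A waker whose operations all do nothing; enough for a busy-polling executor.
fn noop_raw_waker() -> RawWaker {
    fn clone(_: *const ()) -> RawWaker {
        noop_raw_waker()
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    RawWaker::new(std::ptr::null(), &VTABLE)
}

// Naive executor: polls the future in a loop until it is ready and returns
// its output together with the number of polls it took.
fn block_on<F: Future>(fut: F) -> (F::Output, u32) {
    // SAFETY: the vtable functions never dereference the (null) data pointer.
    let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return (out, polls);
        }
    }
}

fn main() {
    // Hand-written state machine: 3 Pending polls, then Ready on the 4th.
    let (out, polls) = block_on(Countdown { remaining: 3 });
    println!("Countdown: {out} after {polls} polls");

    // Here the compiler generates the state machine for the async block.
    let (out, polls) = block_on(async { Countdown { remaining: 2 }.await });
    println!("async block: {out} after {polls} polls");
}
```

With async/await, the compiler writes the equivalent of `Countdown`'s bookkeeping for us, which is what makes the `async` block in our `main` so convenient.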
# Echo Service Implementation
Before starting our implementation, let's bootstrap the logging system in our main function.
For logging we will be using the `tracing` crate, together with `tracing-subscriber` and its default implementation for console logging.
```rust
tracing_subscriber::fmt::init(); // init the logging system
tracing::info!("Starting Echo service"); // log something
```
Here is the output when running the `smoke_test` binary:
```shell
❯ cargo run --bin smoke_test
    Finished dev [unoptimized + debuginfo] target(s) in 0.02s
     Running `target/debug/smoke_test`
2023-01-15T09:42:56.803587Z  INFO smoke_test: Starting Echo service
```
Now we are ready to write our Echo Service implementation. The server should first start accepting connections from clients that want to talk to us. Tokio provides this API through `TcpListener`, which creates a server socket bound to a `SocketAddr` (IP + port) and listens for incoming connections.
```rust
let listener = TcpListener::bind("127.0.0.1:8000").await?;
loop {
    let (socket, address) = listener.accept().await?;
    debug!("Got connection from {}", address);
}
```
Here we bind the `TcpListener` to a `SocketAddr` and, in a loop, wait for connections using the `TcpListener::accept` method.
Once a client is connected, the method returns the communication channel for reading and writing data (a `TcpStream`) and the `SocketAddr` of the remote peer.
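Then we can pass the returned `TcpStream` to the `handle_client` function, which contains the echo server logic:
```rust
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

async fn handle_client(mut socket: TcpStream) -> anyhow::Result<()> {
    let mut buf = [0; 1024];
    loop {
        // Read up to 1024 bytes; 0 means the client closed the connection.
        let read = socket.read(&mut buf).await?;
        if read == 0 {
            break;
        }
        // Echo back exactly the bytes we just received.
        socket.write_all(&buf[..read]).await?;
    }
    Ok(())
}
```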
In a loop we read data from the `TcpStream` into a buffer. If the number of bytes read is 0, we have reached end-of-file and can break out of the loop. Otherwise, we write back the buffer from index 0 up to `read`, i.e. exactly the bytes that were just read.
For reading and writing we have to import two extension traits, `tokio::io::{AsyncReadExt, AsyncWriteExt}`, which provide utility methods such as `read` and `write_all`.
In this function we are not explicitly handling errors: we just use `anyhow::Result` as the return type and forward all possible errors to the caller with the `?` operator.
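The same loop can be exercised without a network. Below is a std-only, blocking sketch where a hypothetical `echo_loop` helper is generic over `std::io::Read` and `std::io::Write`, so it runs against in-memory buffers. With a 3000-byte input and a 1024-byte buffer it needs several reads, which shows why we write `&buf[..read]` rather than the whole buffer: each call only fills the first `read` bytes.

```rust
use std::io::{self, Read, Write};

// Hypothetical blocking counterpart of `handle_client`, generic over any
// reader/writer pair so it can run against in-memory buffers.
fn echo_loop<R: Read, W: Write>(reader: &mut R, writer: &mut W) -> io::Result<u64> {
    let mut buf = [0u8; 1024];
    let mut total = 0u64;
    loop {
        let read = reader.read(&mut buf)?;
        if read == 0 {
            return Ok(total); // 0 bytes read means end-of-file
        }
        // Only the first `read` bytes were filled by this call; the rest of
        // `buf` may still hold stale data from a previous iteration.
        writer.write_all(&buf[..read])?;
        total += read as u64;
    }
}

fn main() -> io::Result<()> {
    // 3000 bytes take three reads through the 1024-byte buffer.
    let input: Vec<u8> = (0..3000u32).map(|i| (i % 251) as u8).collect();
    let mut output = Vec::new();
    let copied = echo_loop(&mut input.as_slice(), &mut output)?;
    assert_eq!(output, input);
    println!("echoed {copied} bytes");
    Ok(())
}
```

Keeping the loop generic over the I/O traits is one way to test protocol logic without opening sockets.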
The requirements also ask us to handle at least 5 connections simultaneously, hence a single connected client must not block the acceptor loop. In the sync world we would spawn a new thread to handle each client socket, while in the async Rust world there is the concept of a task: a unit of execution managed by the runtime's scheduler (an asynchronous green thread).
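For comparison, this is roughly what the thread-per-connection approach looks like with the standard library's blocking `std::net` types (not Tokio). It is only a sketch: every client costs a full OS thread, which is the overhead that lightweight tasks avoid.

```rust
use std::io::{self, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;

// Blocking echo loop for a single client (same logic as `handle_client`).
fn handle_client(mut socket: TcpStream) -> io::Result<()> {
    let mut buf = [0u8; 1024];
    loop {
        let read = socket.read(&mut buf)?;
        if read == 0 {
            return Ok(()); // the client closed its side: end-of-file
        }
        socket.write_all(&buf[..read])?;
    }
}

// Thread-per-connection acceptor: each client gets its own OS thread,
// so one slow client never blocks the `accept` loop.
fn serve(listener: TcpListener) -> io::Result<()> {
    for stream in listener.incoming() {
        let socket = stream?;
        thread::spawn(move || {
            if let Err(err) = handle_client(socket) {
                eprintln!("error handling client: {err}");
            }
        });
    }
    Ok(())
}

fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8000")?;
    serve(listener)
}
```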
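Just like threads, tasks can be spawned on the executor using `tokio::task::spawn`, which lets the supplied task run concurrently with the rest of the program. After a connection is accepted, we spawn an `async move` block that calls the previously defined `handle_client` function with the `TcpStream` as input. The `move` keyword transfers ownership of `socket` and `address` into the task, which `spawn` requires because the task may outlive the current loop iteration.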
Here's what the final implementation of the Echo Service looks like:
```rust
use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::{TcpListener, TcpStream},
};
use tracing::{debug, error, info};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    tracing_subscriber::fmt::init();
    let listener = TcpListener::bind("127.0.0.1:8000").await?;
    info!("Starting Echo service on 8000 port");
    loop {
        let (socket, address) = listener.accept().await?;
        debug!("Got connection from {}", address);
        tokio::task::spawn(async move {
            if let Err(err) = handle_client(socket).await {
                error!("Error handling client connection {}: {}", address, err);
            }
        });
    }
}

async fn handle_client(mut socket: TcpStream) -> anyhow::Result<()> {
    let mut buf = [0; 1024];
    loop {
        let read = socket.read(&mut buf).await?;
        if read == 0 {
            break;
        }
        socket.write_all(&buf[..read]).await?;
    }
    Ok(())
}
```
We don't want to miss errors bubbled up from `handle_client`, so we log them inside the async block with `tracing::error!`.
# Testing the solution
We can quickly test our solution locally with a tool like `netcat` or `socat`.
```shell
❯ cargo run --bin smoke_test
    Finished dev [unoptimized + debuginfo] target(s) in 0.02s
     Running `target/debug/smoke_test`
2023-01-15T11:04:45.698840Z  INFO smoke_test: Starting Echo service on 8000 port
```
```shell
❯ echo test | socat - tcp:localhost:8000
test
```
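The `socat` check above only exercises a single client, while the challenge asks for at least 5 simultaneous ones. Here is a small std-only client sketch with a hypothetical `check_concurrent_echo` helper: it opens all connections first and only then talks to them, newest first, with a read timeout. A server that handled clients one at a time would never answer the later connections, so the check would fail with a timeout error. It assumes the service is listening on `127.0.0.1:8000`.

```rust
use std::io::{self, Read, Write};
use std::net::{SocketAddr, TcpStream};
use std::time::Duration;

// Hypothetical helper: open `n` connections up front, then send a line on each
// (newest first) and expect it echoed back while all connections stay open.
fn check_concurrent_echo(addr: SocketAddr, n: usize) -> io::Result<()> {
    let mut clients = Vec::with_capacity(n);
    for _ in 0..n {
        let stream = TcpStream::connect(addr)?;
        // Without a timeout, a server that never answers would hang us forever.
        stream.set_read_timeout(Some(Duration::from_secs(2)))?;
        clients.push(stream);
    }
    for (i, client) in clients.iter_mut().enumerate().rev() {
        let msg = format!("hello from client {i}\n");
        client.write_all(msg.as_bytes())?;
        let mut echoed = vec![0u8; msg.len()];
        // Errors out (WouldBlock or TimedOut) if the echo does not arrive in time.
        client.read_exact(&mut echoed)?;
        if echoed != msg.as_bytes() {
            return Err(io::Error::new(io::ErrorKind::InvalidData, "echo mismatch"));
        }
    }
    Ok(())
}

fn main() -> io::Result<()> {
    // Assumes the echo service from this post is running locally.
    let addr: SocketAddr = "127.0.0.1:8000".parse().expect("valid socket address");
    check_concurrent_echo(addr, 5)?;
    println!("all 5 simultaneous clients were echoed correctly");
    Ok(())
}
```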
For the protohackers tests, we can either host our echo service on a public server or use some proxy software to make our machine reachable from the internet.
I didn't want to set up a deployment or manually deploy the binary, so I opted for the proxy solution using rathole.
# Shortcut solution
We just want to send all the bytes we read back to the connected client. Is there a utility that does that for us?
Yes there is!
Here's an example of how we could rewrite our `handle_client` function in a few lines of code:
```rust
async fn handle_client(mut socket: TcpStream) -> anyhow::Result<()> {
    let (mut read, mut write) = socket.split();
    tokio::io::copy(&mut read, &mut write).await?;
    Ok(())
}
```
Since `TcpStream` is a bidirectional channel, we can split it into its read and write halves and then call `tokio::io::copy` to copy the data from the read half to the write half of the same `TcpStream`.
Pretty cool!
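The `split` call is needed because `copy` takes a mutable reference to the reader and to the writer at the same time, and we cannot pass `&mut socket` as both arguments. For comparison only, here is a blocking sketch with the standard library (not Tokio): std implements `Read` and `Write` for `&TcpStream`, so two shared references can play the two roles. `echo_once` is a hypothetical helper that runs a single connection through it.

```rust
use std::io::{self, Read, Write};
use std::net::{Shutdown, TcpListener, TcpStream};
use std::thread;

// Blocking counterpart of the split + copy shortcut: `&TcpStream` implements
// both `Read` and `Write`, so two shared references act as the two halves.
fn handle_client(socket: TcpStream) -> io::Result<u64> {
    let (mut reader, mut writer) = (&socket, &socket);
    // Returns the total number of bytes copied once the client sends EOF.
    io::copy(&mut reader, &mut writer)
}

// Hypothetical helper: serve one connection in a background thread, send
// `payload` to it, and return what came back plus the server's byte count.
fn echo_once(payload: &[u8]) -> io::Result<(Vec<u8>, u64)> {
    let listener = TcpListener::bind("127.0.0.1:0")?; // port 0: any free port
    let addr = listener.local_addr()?;
    let server = thread::spawn(move || -> io::Result<u64> {
        let (socket, _) = listener.accept()?;
        handle_client(socket)
    });

    let mut client = TcpStream::connect(addr)?;
    client.write_all(payload)?;
    client.shutdown(Shutdown::Write)?; // EOF lets the server-side copy finish
    let mut echoed = Vec::new();
    client.read_to_end(&mut echoed)?;
    let copied = server.join().expect("server thread panicked")?;
    Ok((echoed, copied))
}

fn main() -> io::Result<()> {
    let (echoed, copied) = echo_once(b"test\n")?;
    println!("echoed {:?} ({copied} bytes)", String::from_utf8_lossy(&echoed));
    Ok(())
}
```

Tokio's `split` gives the same two-roles-from-one-socket effect for the async `TcpStream`, with the borrow checker guaranteeing that both halves stay tied to the original socket.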
# Wrapping Up
In this first chapter we have learned how to set up a TCP server that accepts connections and how to read/write data from/to a `TcpStream`.
We've also learned how to bootstrap a Tokio runtime with the `#[tokio::main]` attribute and how to spawn new tasks.
Even though it was just a simple echo service, it was fun getting back to working with raw TCP, and I'm sure the next challenges will be even more interesting and fun.
Comments and suggestions are always welcome. Please feel free to send an email or comment on @wolf4ood@hachyderm.io.
Thank you for reading, see you in the next chapter!