Protohackers in Rust: Echo Server

Enrico Risa | 8 min | 1513 words

Thanks to @whatyouhide, I discovered Protohackers, a series of challenges focused on network protocols. For each challenge, participants write a server that implements a protocol spec. Once implemented, the server must be hosted somewhere reachable so that the Protohackers test suite can test it automatically.

I'm a big fan of network protocols, so I decided to participate in Protohackers by doing the challenges in Rust and writing about them in this blog series.

All the code in this series will be available on GitHub.

# The Challenge

The first problem is to write a TCP Echo Service.

The basic requirements are to accept TCP connections, receive data from a client, and send it back untouched. We also need to handle at least 5 clients simultaneously.

# Getting Started

We are going to implement all the challenges using Tokio and the async ecosystem in Rust.

Now let's bootstrap the project and add some dependencies:

 cargo new --lib protohackers-rs
     Created library `protohackers-rs` package

 cd protohackers-rs
 mkdir src/bin
 touch src/bin/smoke_test.rs

I started out with a library package, and I will manually add one binary per challenge in the src/bin folder. The main reason for this setup is that the challenges will probably share some common functions, so a mixed library/binary crate seems like a good idea.
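To make the mixed layout concrete, here is a minimal sketch of a helper that could live in src/lib.rs and be shared by the binaries. The function name listen_addr and the choice of binding to all interfaces are assumptions for illustration; they are not taken from the original project.

```rust
// src/lib.rs (hypothetical shared helper, not from the original repository)
use std::net::{Ipv4Addr, SocketAddr};

/// Builds the address a challenge server could listen on.
/// Binding to 0.0.0.0 (all interfaces) is an assumption made for this sketch.
pub fn listen_addr(port: u16) -> SocketAddr {
    SocketAddr::from((Ipv4Addr::UNSPECIFIED, port))
}
```

A binary such as src/bin/smoke_test.rs could then import it with `use protohackers_rs::listen_addr;`, since Cargo exposes the library under the package name with hyphens replaced by underscores.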

For the first challenge we add the following dependencies: tokio, anyhow, tracing, and tracing-subscriber. We can use the cargo add command for that:

 cargo add tokio --features full
 cargo add anyhow
 cargo add tracing
 cargo add tracing-subscriber

At this point the Cargo.toml will look like this:

[package]
name = "protohackers-rs"
version = "0.1.0"
edition = "2021"


[dependencies]
anyhow = "1.0.68"
tokio = { version = "1.24.1", features = ["full"] }
tracing = "0.1.37"
tracing-subscriber = "0.3.16"

Now we create an empty main function in the smoke_test.rs where we will write our solution for the first challenge:

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    Ok(())
}

Rust does not support an async main function out of the box, but we can use the tokio::main attribute, which rewrites our async main into a regular one that bootstraps the Tokio runtime.

You can check the generated code by running the cargo-expand command:

 cargo expand --bin smoke_test
    Checking protohackers-rs v0.1.0 (~/Coding/protohackers-rs)
    Finished dev [unoptimized + debuginfo] target(s) in 0.25s

#![feature(prelude_import)]
#[prelude_import]
use std::prelude::rust_2021::*;
#[macro_use]
extern crate std;
fn main() -> anyhow::Result<()> {
    let body = async { Ok(()) };
    #[allow(clippy::expect_used, clippy::diverging_sub_expression)]
    {
        return tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .expect("Failed building the Runtime")
            .block_on(body);
    }
}

The original main code is wrapped in an async block and passed to the runtime entry point method block_on, which will run it until completion.

An async block is a variant of a block expression that evaluates to a type that implements std::future::Future generated by the compiler.

If you played with async in Rust in the pre-async/await era, you probably remember that creating a future was not so easy: most of the time you had to implement the Future trait manually, wiring up a custom state machine to track the future's progress.
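To give a feel for what that manual work looked like, here is a small sketch of a hand-written future using only the standard library. The names Countdown, NoopWaker, and poll_to_completion are hypothetical, and the polling loop is a busy loop for demonstration only; a real executor such as Tokio's parks the task until it is woken.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hand-written state machine: returns Pending `remaining` times, then Ready.
struct Countdown {
    remaining: u32,
}

impl Future for Countdown {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            Poll::Ready("done")
        } else {
            // Advance the state and ask to be polled again.
            self.remaining -= 1;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Waker that does nothing; enough for a loop that polls unconditionally.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Toy driver: polls the future until it completes and reports
/// the output together with the number of polls it took.
fn poll_to_completion<F: Future>(fut: F) -> (F::Output, u32) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(output) = fut.as_mut().poll(&mut cx) {
            return (output, polls);
        }
    }
}
```

Calling `poll_to_completion(Countdown { remaining: 3 })` takes four polls to produce "done", while an async block such as `async { 1 + 1 }` goes through the same driver and completes on the first poll, with the compiler generating the state machine for us.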

# Echo Service Implementation

Before starting our implementation, let's bootstrap the logging system in our main function. For logging we will use the tracing crate together with tracing-subscriber and its default implementation for console logging.

tracing_subscriber::fmt::init(); // init the logging system
tracing::info!("Starting Echo service"); // log something

Here is the output if you run the smoke_test binary:

 cargo run --bin smoke_test
    Finished dev [unoptimized + debuginfo] target(s) in 0.02s
     Running `target/debug/smoke_test`
2023-01-15T09:42:56.803587Z  INFO smoke_test: Starting Echo service

Now we are ready to write our Echo Service implementation. The server should first start accepting connections from clients that want to talk with us. Tokio provides that API with TcpListener, which creates a server socket bound to a SocketAddr (IP + port) and listens for incoming connections.

let listener = TcpListener::bind("127.0.0.1:8000").await?;

loop {
    let (socket, address) = listener.accept().await?;
    debug!("Got connection from {}", address);
}

Here we bind the TcpListener to a SocketAddr and in a loop we wait for connections using the TcpListener::accept method. Once a client is connected, the method returns the communication channel for writing and reading data (TcpStream) and the SocketAddr of the remote connected peer.

Then we can use the returned TcpStream in the handle_client function for the echo server logic:

async fn handle_client(mut socket: TcpStream) -> anyhow::Result<()> {
    let mut buf = [0; 1024];
    loop {
        let read = socket.read(&mut buf).await?;

        if read == 0 {
            break;
        }
        socket.write_all(&buf[..read]).await?;
    }
    Ok(())
}

In a loop we read data from the TcpStream into a buffer. If the number of bytes read is 0, we have reached end-of-file (the client closed its side of the connection) and can break out of the loop. Otherwise, we write back the buffer from 0 to read (all the bytes just read). For reading and writing we have to import two extension traits, tokio::io::{AsyncReadExt, AsyncWriteExt}, which provide utility methods such as read and write_all. In this function we are not explicitly handling errors: we just use anyhow::Result as the return type and forward all possible errors to the caller with the ? operator.
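The same loop can be sketched with the blocking std::io traits, which makes it easy to exercise in memory without a socket or a runtime. This is an illustration under that assumption, not the Tokio code used in this post, and the function name echo is hypothetical.

```rust
use std::io::{self, Read, Write};

/// Blocking sketch of the echo loop: copies everything from `reader` to
/// `writer` and returns the total number of bytes echoed.
fn echo<R: Read, W: Write>(mut reader: R, mut writer: W) -> io::Result<u64> {
    let mut buf = [0u8; 1024];
    let mut total = 0u64;
    loop {
        // `read` may fill only part of the buffer.
        let read = reader.read(&mut buf)?;
        if read == 0 {
            // End-of-file: nothing more to echo.
            return Ok(total);
        }
        // Write back only the bytes just read, never the whole buffer.
        writer.write_all(&buf[..read])?;
        total += read as u64;
    }
}
```

Feeding it an input larger than the buffer (for example 3000 bytes) shows why the slice `&buf[..read]` matters: the data arrives in several chunks, and writing the full 1024-byte buffer each time would send stale bytes from earlier reads.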

The requirements also ask us to handle at least 5 connections simultaneously, hence a single connected client should not block the acceptor loop. In the sync world we would spawn a new thread to handle each client socket, while in the async Rust world there is the concept of a task, a unit of execution managed by the scheduler (an asynchronous green thread).
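For comparison, the thread-per-connection approach from the sync world might look like the following sketch. It uses only the standard library; the function names handle_client_blocking and spawn_threaded_echo are hypothetical, and running the accept loop on a background thread is a choice made here so the caller gets the bound address back.

```rust
use std::io::{self, Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::thread;

/// Blocking echo loop for one client (std::io instead of Tokio).
fn handle_client_blocking(mut socket: TcpStream) -> io::Result<()> {
    let mut buf = [0u8; 1024];
    loop {
        let read = socket.read(&mut buf)?;
        if read == 0 {
            return Ok(()); // the client closed its side
        }
        socket.write_all(&buf[..read])?;
    }
}

/// Binds `addr`, then accepts connections on a background thread,
/// spawning one OS thread per client. Returns the bound address.
fn spawn_threaded_echo(addr: &str) -> io::Result<SocketAddr> {
    let listener = TcpListener::bind(addr)?;
    let local_addr = listener.local_addr()?;
    thread::spawn(move || {
        for connection in listener.incoming() {
            match connection {
                Ok(socket) => {
                    // One thread per client keeps the accept loop free.
                    thread::spawn(move || {
                        if let Err(err) = handle_client_blocking(socket) {
                            eprintln!("Error handling client connection: {err}");
                        }
                    });
                }
                Err(err) => eprintln!("Error accepting connection: {err}"),
            }
        }
    });
    Ok(local_addr)
}
```

Passing "127.0.0.1:0" lets the operating system pick a free port, which is handy for local experiments. Each client costs a full OS thread here, which is the overhead that tasks avoid.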

Just like threads, tasks can be spawned on the executor using tokio::task::spawn, which enables the supplied task to run concurrently. After a connection is accepted, we spawn an async block that calls the previously defined handle_client function with the TcpStream as input.

Here's what the final implementation of the Echo Service looks like:

use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::{TcpListener, TcpStream},
};
use tracing::{debug, error, info};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    tracing_subscriber::fmt::init();

    let listener = TcpListener::bind("127.0.0.1:8000").await?;

    info!("Starting Echo service on 8000 port");

    loop {
        let (socket, address) = listener.accept().await?;
        debug!("Got connection from {}", address);
        tokio::task::spawn(async move {
            if let Err(err) = handle_client(socket).await {
                error!("Error handling client connection {}: {}", address, err);
            }
        });
    }
}

async fn handle_client(mut socket: TcpStream) -> anyhow::Result<()> {
    let mut buf = [0; 1024];
    loop {
        let read = socket.read(&mut buf).await?;

        if read == 0 {
            break;
        }
        socket.write_all(&buf[..read]).await?;
    }
    Ok(())
}

We don't want to miss possible errors bubbled up from the handle_client function, so we log them in the async block with tracing::error!.

# Testing the solution

We can quickly test our solution with a tool like netcat or socat locally.

 cargo run --bin smoke_test
    Finished dev [unoptimized + debuginfo] target(s) in 0.02s
     Running `target/debug/smoke_test`
2023-01-15T11:04:45.698840Z  INFO smoke_test: Starting Echo service on 8000 port

 echo test | socat - tcp:localhost:8000
test
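If neither tool is at hand, a small Rust client can perform the same check. The sketch below is an assumption-laden illustration (the function name check_echo is hypothetical): it sends a payload, closes the write side like a piped socat would, and compares what comes back.

```rust
use std::io::{self, Read, Write};
use std::net::{Shutdown, TcpStream};

/// Connects to `addr`, sends `payload`, and reports whether the server
/// echoed exactly the same bytes. Intended for small payloads only, since
/// everything is written before anything is read.
fn check_echo(addr: &str, payload: &[u8]) -> io::Result<bool> {
    let mut socket = TcpStream::connect(addr)?;
    socket.write_all(payload)?;
    // Signal end-of-input so the server sees a read of 0 bytes and closes.
    socket.shutdown(Shutdown::Write)?;
    let mut echoed = Vec::new();
    socket.read_to_end(&mut echoed)?;
    Ok(echoed == payload)
}
```

With the echo service running locally, `check_echo("127.0.0.1:8000", b"test\n")` should return `Ok(true)`.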

For the Protohackers tests we can either host our echo service on a public server or use some proxy software to make our machine accessible from the internet.

I didn't want to set up a deployment pipeline or deploy the binary manually, so I opted for the proxy solution using rathole.

# Shortcut solution

We just want to send all the bytes we read back to the connected client. Is there a utility that does that for us?

Yes there is!

Here's an example of how we could rewrite our handle_client function in a few lines of code:

async fn handle_client(mut socket: TcpStream) -> anyhow::Result<()> {
    let (mut read, mut write) = socket.split();
    tokio::io::copy(&mut read, &mut write).await?;
    Ok(())
}

Since a TcpStream is a bidirectional channel, we can split it into its read and write halves and then call tokio::io::copy to copy the data from the read half to the write half.
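The standard library offers a blocking counterpart, std::io::copy, which can be used to sketch the same idea without a runtime. The function names below are hypothetical, and using try_clone to obtain a second handle to the socket is an assumption standing in for Tokio's split.

```rust
use std::io::{self, Read, Write};
use std::net::TcpStream;

/// Generic version: copies everything from `reader` to `writer`
/// and returns the number of bytes copied.
fn echo_with_copy<R: Read, W: Write>(mut reader: R, mut writer: W) -> io::Result<u64> {
    io::copy(&mut reader, &mut writer)
}

/// Blocking TCP version: a cloned handle plays the role of the read half,
/// the original socket the role of the write half.
fn handle_client_with_copy(socket: TcpStream) -> io::Result<u64> {
    let reader = socket.try_clone()?;
    echo_with_copy(reader, socket)
}
```

As with the Tokio version, the copy runs until the reader reaches end-of-file, so the function returns once the client closes its side of the connection.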

Pretty cool!

# Wrapping Up

In this first chapter we have learned how to set up a TCP server that accepts connections and how to read/write data from/to a TcpStream. We've also learned how to bootstrap a Tokio runtime with the #[tokio::main] attribute and spawn new tasks.

Even though it was just a simple echo service, it was fun getting back to working with raw TCP. I'm sure it will get more interesting and fun in the next challenges.

Comments and suggestions are always welcome. Please feel free to send an email or comment on @wolf4ood@hachyderm.io.

Thank you for reading, see you in the next chapter!