Distributed System challenges in Rust (Gossip Glomers)

by Anirudh on 23 Sep '23

A few months ago, I stumbled upon these distributed system challenges put out by Fly.io. Going through them, I felt like they were a fantastic set of problems that would allow me to practice my theoretical knowledge of distributed systems and learn quite a lot along the way.


I (finally) got some time to sit down and dive into these problems. Along with the problem set, there is an executable/library (Maelstrom) which runs tests against your system to validate that its behaviour is correct. There is also a Golang library which handles the SDK-ish parts and lets you focus on solving the challenges. I decided to use Rust instead of Golang, since I wanted to get some more practice with Rust.


Going with Rust meant I would have had to implement their SDK in Rust myself, but thankfully, I came across this repository by Jon Gjengset (he has a fantastic YouTube channel btw, with lots of really educational Rust videos), where he has solved these challenges himself. I decided to pick the "library" part of his code and use it as a starting point for my own implementation. I still wanted to understand the decision-making behind the design of the SDK, so I'm spending some time going through the code.


Coming from a non-systems programming background, there were quite a few things in the library that were difficult for me to grok. I decided to start from the main exported function, main_loop, and dig into the code from there.

The first thing in the implementation that I did not understand was that the code takes locks on stdin and stdout. I had no idea why this was being done, so I decided to research a little bit.


STDIN/STDOUT locking


pub fn main_loop<S, N, P, IP>(init_state: S) -> anyhow::Result<()>
where
    P: DeserializeOwned + Send + 'static,
    N: Node<S, P, IP>,
    IP: Send + 'static,
{
    let (tx, rx) = std::sync::mpsc::channel();

    let stdin = std::io::stdin().lock();
    let mut stdin = stdin.lines();
    let mut stdout = std::io::stdout().lock();
    // ...
    // ...

}

Stdin from Rust's docs

Each handle is a shared reference to a global buffer of input data to this process.

A handle can be lock’d to gain full access to BufRead methods (e.g., .lines()). Reads to this handle are otherwise locked with respect to other reads.


What I understand from the docs and this very helpful SO comment is that since stdin is a global buffer, multiple threads could read from it at the same time. The reads would interleave, and some commands coming from the Maelstrom server could be missed. This is why the code takes a lock on stdin: only one thread can read from it at a time. Similar reasoning applies to the stdout lock.


Having an exclusive handle also allows us to use the BufRead methods, like .lines(), which lets us iterate over the lines of the input (better DX).
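To make the .lines() convenience concrete, here is a minimal sketch: the reading logic is written against any BufRead source so it can be exercised with an in-memory Cursor, but the exact same code works on a locked stdin handle (std::io::stdin().lock()).

```rust
use std::io::BufRead;

// Reads all lines from any BufRead source. With a locked stdin handle
// (std::io::stdin().lock()), the same function works on real process input.
fn read_lines<R: BufRead>(reader: R) -> Vec<String> {
    reader.lines().map(|l| l.expect("read failed")).collect()
}

fn main() {
    // Cursor stands in for stdin here so the sketch is self-contained.
    let input = std::io::Cursor::new("first\nsecond\n");
    let lines = read_lines(input);
    assert_eq!(lines, vec!["first", "second"]);
}
```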


Initialization


According to the spec, the first thing that happens when tests start running is that the Maelstrom server pushes an "init" message to our node. This message contains our node's id and the list of all the nodes in the cluster, including ours. This is the first thing we need to handle in our main_loop.


let init_msg: Message<InitPayload> = serde_json::from_str(
        &stdin
            .next()
            .expect("no init message received")
            .context("failed to read init message from stdin")?,
    )
    .context("init message could not be deserialized")?;
let InitPayload::Init(init) = init_msg.body.payload else {
    panic!("first message should be init");
};
let mut node: N =
    Node::from_init(init_state, init, tx.clone()).context("node initialization failed")?;

let reply = Message {
    src: init_msg.dest,
    dest: init_msg.src,
    body: Body {
        id: Some(0),
        in_reply_to: init_msg.body.id,
        payload: InitPayload::InitOk,
    },
};

reply.send(&mut stdout).context("send response to init")?;

drop(stdin);

Serde

serde is a very popular Rust library for serializing and deserializing data.


We pick the first line from stdin, and deserialize it into a Message<InitPayload>. We take this init message, and then initialize our "node" from the data provided in it. We then send a reply back to the Maelstrom server, acknowledging the creation of our node.


Then, we drop the stdin handle manually, so that a fresh lock can be taken on it later from the thread that reads subsequent messages.


Node


The node is the core of our implementation. There are many variants of a node, depending on the challenge we're solving: each variant receives a different kind of payload and has different behaviour. To support this, a Node trait is defined, which each variant implements. It is generic over the state it holds, the type of payload it receives, and an mpsc sender.
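To show the shape of such a trait, here is a hypothetical, much-simplified version along the lines described above, with a toy node implementing it. The trait, the Init struct, and CounterNode are all my own illustrative names; the library's actual trait has more methods and different bounds.

```rust
use std::sync::mpsc::Sender;

// Illustrative init data: our node's id and the ids of all cluster nodes.
struct Init {
    node_id: String,
    node_ids: Vec<String>,
}

// A simplified Node trait: generic over the state S it is built from
// and the payload type P it handles.
trait Node<S, P>: Sized {
    fn from_init(state: S, init: Init, tx: Sender<P>) -> Self;
    fn step(&mut self, payload: P);
}

// A toy variant that just counts the messages it has stepped through.
struct CounterNode {
    id: String,
    seen: usize,
}

impl Node<(), String> for CounterNode {
    fn from_init(_state: (), init: Init, _tx: Sender<String>) -> Self {
        CounterNode { id: init.node_id, seen: 0 }
    }
    fn step(&mut self, _payload: String) {
        self.seen += 1;
    }
}

fn main() {
    let (tx, _rx) = std::sync::mpsc::channel();
    let init = Init {
        node_id: "n1".into(),
        node_ids: vec!["n1".into(), "n2".into()],
    };
    let mut node = CounterNode::from_init((), init, tx);
    node.step("hello".into());
    assert_eq!(node.seen, 1);
    assert_eq!(node.id, "n1");
}
```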


sync::mpsc

Rust's std::sync::mpsc module (multi-producer, single-consumer) provides a way to communicate between threads. It allows us to create multiple producers (senders, which can be cloned and handed over to any number of threads) and a single consumer, which receives messages from all of them.


This can continue until one half of the channel hangs up (is dropped by going out of scope, or dropped manually). Once this happens, the other half receives an error the next time it tries to receive a message (consumer) or send one (producers).

We create a separate thread of execution, where we read lines from stdin, deserialize them into the Message struct, and pass them into the Node's step function to tell it to perform the required operation for the relevant challenge.


There are still a couple of things that are unclear to me -


  • Why do we send messages to the main thread from a separate thread which is reading stdin, instead of reading and then executing the step function from the main thread itself?
  • There are a few properties/functions that I don't understand the use of yet. I'm confident that once I get to the gnarlier problems, these things will make sense to me.
  • I still need to understand how the Event<Message> construct works - and how I would make use of it while implementing all the node variants.

In the next part of these blog posts, I'll be trying my hand at the first challenge of the series - Echo!