Implementing the Raft protocol in Rust
Introduction
I embarked on building the Raft consensus protocol to deepen my understanding of one of the most widely used algorithms in distributed systems, drawn by its reputation for clarity compared to the complex Paxos. I chose Cap’n Proto, a high-performance serialization library that’s a great fit for Rust, because I was eager to explore its capabilities.
Goals of the Project
Initially, I expected this to be a quick weekend project, but implementing Raft correctly, handling edge cases and operations beyond basic role switching, proved far more challenging than anticipated. My goals were to create a functional Raft implementation and master Cap’n Proto’s integration with Rust. (Note: I haven’t yet stress-tested the system with Jepsen, so there could be hidden bugs waiting to be uncovered!).
What to Expect in This Post
In this post, I’ll guide you through my implementation, covering the design, core components, election process, and lessons learned along the way. Whether you’re a Rust enthusiast, a distributed systems beginner, or simply passionate about coding, I aim to offer insights that spark your curiosity.
Understanding Raft
What Is Raft?
Raft is a consensus algorithm crafted to ensure reliability in distributed systems, allowing multiple nodes to agree on a shared state even when some nodes crash or network connections falter. Unlike its notoriously complex predecessor, Paxos, Raft emphasizes simplicity by breaking down the consensus process into three clear components: leader election, log replication, and safety mechanisms, making it easier to understand and implement.
Key Concepts: Leader Election, Log Replication, and Safety
In Raft, nodes operate in one of three roles, leader, follower, or candidate, communicating through heartbeats to maintain order and vote requests to drive elections. Leaders manage the cluster, followers replicate the leader’s logs, and candidates compete to become the new leader during elections. Raft ensures consistent logs across nodes using terms and log indices.
Raft in the Real World
Raft underpins critical systems like etcd, a backbone of Kubernetes, and Consul, where reliability is paramount. For a deeper understanding, I recommend exploring the Raft paper or test the interactive visualizations on this project, which vividly bring the algorithm to life and clarify its mechanics.

Why Cap’n Proto?
Overview of Cap’n Proto
Cap’n Proto is a high-speed serialization and RPC framework designed for simplicity and performance, distinguishing itself from JSON or Protocol Buffers by avoiding data copying during serialization. Its pipelining feature allows multiple requests to be sent in a single round trip, such as querying the leader and sending requests simultaneously.
Integrating Cap’n Proto with Rust
Getting started with Cap’n Proto in Rust was relatively straightforward: I defined schemas, followed the official documentation, and set up a build.rs
file to generate Rust code from those schemas. Using a Git submodule to share schemas across projects ensured consistency, but I’m still exploring how to optimize message passing between tasks or threads.
fn main() {
::capnpc::CompilerCommand::new()
.src_prefix("schema")
.file("schema/raft.capnp")
.file("schema/storage.capnp")
.run()
.expect("compiling schema");
}
[build-dependencies]
capnpc = "0.20.1"
Project Architecture
High-Level Design
Raft focuses on four core tasks: managing node states (leader, follower, or candidate), processing vote requests and heartbeats, supporting dynamic configuration changes, and snapshot replication for efficient state transfer. I didn’t implement the snapshot replication (yet).
Core Modules
My system is organized into four key modules:
node
: The central hub that orchestrates all consensus logic.server
: Handles incoming requests and routes them to the node.peer
: Manages peer connections to prevent blocking the main logic.storage
: Persists logs and state to disk for durability.
Using Tokio’s mpsc for Communication
I leveraged Tokio’s mpsc
(multi-producer, single-consumer) channels to facilitate communication between the client and server sides, enabling followers and candidates to run a server loop that listens for messages while leaders execute a client loop to send heartbeats. These loops operate as tasks within a single thread, using mpsc
to update node state without relying on Rc<RefCell<T>>
or Arc<Mutex<T>>
, maintaining a lock-free and decoupled system.
Core Components
The Node
Role as the System’s Heart
The node
serves as the beating heart of the system, where Raft’s consensus magic comes to life. It’s a central struct that holds the state, manages timeouts, and coordinates system-wide communication through channels, ensuring all components work in harmony to achieve consensus.

Main Loop with Tokio’s select!
At its core, the node runs a loop powered by Tokio’s select!
macro, which determines whether to process incoming messages, send heartbeats, or initiate elections based on the current context. I’m not entirely convinced that checking self.state.role()
with equality comparisons is the most elegant approach, perhaps there’s a cleaner, more idiomatic Rust solution. Suggestions are welcome!
pub async fn run(mut self) {
let mut heartbeat_interval = interval(Duration::from_secs_f64(self.heartbeat_interval));
let mut election_timeout = Box::pin(sleep(self.election_dur()));
let last_log_info = self.state.last_log_info();
tracing::info!(action="starting", term=%self.state.current_term(), last_log_index=last_log_info.last_log_index(), last_log_term=last_log_info.last_log_term());
loop {
tokio::select! {
// Calls to manage peers reconnection
Some(rpc) = self.peers_channel.recv() => {
self.state.add_peer(rpc);
}
// Incoming RPCs from external users
Some(rpc) = self.commands_channel.recv() => {
self.handle_command(rpc).await;
}
// RPCs from the cluster's leader
Some(rpc) = self.raft_channel.recv() => {
match rpc {
RaftMsg::AppendEntries(req) => {
let msg = req.msg;
let sender = req.sender;
let resp = self.state.handle_append_entries(
msg.term,
&msg.leader_id,
msg.prev_log_index as usize,
msg.prev_log_term,
msg.leader_commit,
msg.entries,
);
if sender.send(resp).is_err(){
tracing::error!("Failed to send response in channel for append entries");
}
}
RaftMsg::Vote(req) => {
let msg = req.msg;
let sender = req.sender;
let resp = self.state.handle_request_vote(
msg.term(),
msg.candidate_id(),
msg.last_log_index(),
msg.last_log_term(),
);
if sender.send(resp).is_err(){
tracing::error!("Failed to send response in channel for vote");
}
}
}
election_timeout.as_mut().reset(Instant::now() + self.election_dur());
}
//TODO: maybe the following functions could be driven by the role and a trait
// election timeout fires → start election
_ = &mut election_timeout, if matches!(self.state.role(), Role::Follower | Role::Candidate) => {
election_timeout.as_mut().reset(Instant::now() + self.election_dur());
self.state.become_candidate();
tracing::info!(action="sendVotes");
let result = vote(&self.state, self.state.peers()).await;
if self.state.is_majority(result.votes_granted()) {
self.state.become_leader();
}
self.disconnect_peers(result.failed_peers()).await;
}
// heartbeat tick → send heartbeats if leader
_ = heartbeat_interval.tick(), if matches!(self.state.role(), Role::Leader) => {
tracing::info!(action="sendHeartbeat");
//NOTE: for the heartbeat we don't really care about the error nor the number of successful append entries, or do we?
let _ = self.send_entries(&[]).await;
}
}
}
}
Managing State and Timeouts
The node tracks timeouts, which I’m considering defining as a range rather than a fixed value to add flexibility, and delegates critical decisions, like transitioning to a candidate state or handling votes, to the state
struct. This design keeps the node focused on high-level coordination, streamlining the consensus process.
The Server
Handling Incoming Requests
The server component is the simplest part of the system, listening on a port to capture incoming requests and forwarding them to the node for processing. Most of the server’s code consists of Cap’n Proto boilerplate, featuring a loop that processes incoming messages efficiently. Two primary functions handle vote requests and log entries, reading requests, querying the node, and returning responses.
#[derive(Debug, Clone)]
pub struct Server {
raft_channel: Sender<RaftMsg>,
commands_channel: Sender<CommandMsg>,
peers_channel: Sender<PeersDisconnected>,
}
impl Server {
pub fn new(
raft_channel: Sender<RaftMsg>,
commands_channel: Sender<CommandMsg>,
peers_channel: Sender<PeersDisconnected>,
) -> Self {
Self {
raft_channel,
commands_channel,
peers_channel,
}
}
pub async fn run(self, addr: SocketAddr) -> Result<(), Box<dyn std::error::Error>> {
let listener = tokio::net::TcpListener::bind(&addr).await?;
let client: raft_capnp::raft::Client = capnp_rpc::new_client(self);
loop {
let (stream, _) = listener.accept().await?;
stream.set_nodelay(true)?;
let (reader, writer) =
tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split();
let network = twoparty::VatNetwork::new(
futures::io::BufReader::new(reader),
futures::io::BufWriter::new(writer),
rpc_twoparty_capnp::Side::Server,
Default::default(),
);
let rpc_system = RpcSystem::new(Box::new(network), Some(client.clone().client));
tokio::task::spawn_local(rpc_system);
}
}
}
Processing Votes and Log Entries
The append_entries
function, responsible for handling log entries, currently creates unnecessary lists and struggles with log mismatches, an area I plan to optimize soon. Despite these shortcomings, it performs its core job of reading requests, forwarding them to the node, and sending responses back to clients, ensuring basic functionality.
fn append_entries(
&mut self,
params: raft::AppendEntriesParams,
mut results: raft::AppendEntriesResults,
) -> capnp::capability::Promise<(), capnp::Error> {
let request = pry!(pry!(params.get()).get_request());
let entries = pry!(request.get_entries());
//TODO: find a way to no use a list
let mut new_entries: Vec<LogEntry> = Vec::with_capacity(entries.len() as usize);
for e in entries {
new_entries.push(LogEntry::new(
e.get_index(),
e.get_term(),
pry!(e.get_command()).to_vec(),
));
}
let leader = pry!(pry!(request.get_leader_id()).to_str());
let (msg, rx) = RaftMsg::request_append_entries(
request.get_term(),
leader.into(),
request.get_prev_log_index(),
request.get_prev_log_term(),
new_entries,
request.get_leader_commit(),
);
let raft_channel = self.raft_channel.clone();
// let entries_client = pry!(request.get_handle_entries());
Promise::from_future(async move {
if let Err(err) = raft_channel.send(msg).await {
tracing::error!("error sending the append_entries to the state {err}");
return Err(capnp::Error::failed(
"error sending the append_entries to the state".into(),
));
};
match rx.await {
Ok(resp) => {
let mut response = results.get().get_response()?;
match resp {
AppendEntriesResult::Ok => {
response.set_ok(());
}
AppendEntriesResult::TermMismatch(term) => {
response.set_err(term);
}
AppendEntriesResult::LogEntriesMismatch {
last_index: _,
last_term: _,
} => {
// let mut entries_client = entries_client.get_request();
// entries_client.get().set_last_log_index(last_index);
// entries_client.get().set_last_log_term(last_term);
// let entries_up_to_date = entries_client.send().promise.await?;
//TODO: use the response to update its log's entries
response.set_ok(());
}
}
Ok(())
}
Err(err) => {
tracing::error!("error receiving the append_entries response {err}");
Err(capnp::Error::failed(
"error receiving the append_entries response".into(),
))
}
}
})
}
The State
Managing Node Data
The state
struct acts as the node’s single source of truth, storing essential data like the node’s role, last committed index and term, peer list, and IDs. This centralized data ensures all consensus decisions are based on accurate, up-to-date information.
#[derive(Default, Debug)]
pub struct State {
id: NodeId,
role: Role,
leader: Option<NodeId>,
leader_state: LeaderState,
hard_state: HardState,
soft_state: SoftState,
peers: Peers,
}
impl State {
pub fn new(id: NodeId, path: impl AsRef<Path>) -> Self {
let mut hard_state = HardState::new(path);
hard_state
.load_from_disk()
.expect("hard config load failed");
Self {
id,
hard_state,
..Default::default()
}
}
}
impl PeersManagement for State {
// more code
}
impl Consensus for State {
// more code
}
Consensus Logic
Beyond storing data, the state
also implements the consensus logic, managing role transitions, vote processing, and log entry handling. I’m still debating whether the state
should handle all this logic directly or if an intermediary struct would better separate concerns, any thoughts on this design choice? For now, keeping the logic close to the data simplifies implementation and testing, but I’m open to exploring alternatives.
Peer Manager
Reconnecting Peers
The peer
module manages a background task queue, populated via a channel, to handle peer reconnections efficiently. When a connection drops, a task is enqueued to reestablish it, and once complete, the task is consumed via select!
and passed to the node, ensuring smooth, non-blocking peer handling that keeps the system responsive.
pub struct PeersReconnectionTask {
rx: Receiver<PeersDisconnected>,
tx: Sender<NewPeer>,
initial_backoff_ms: u64,
max_backoff_ms: u64,
}
impl PeersReconnectionTask {
pub fn new(
rx: Receiver<PeersDisconnected>,
tx: Sender<NewPeer>,
initial_backoff_ms: u64,
max_backoff_ms: u64,
) -> Self {
Self {
rx,
tx,
initial_backoff_ms,
max_backoff_ms,
}
}
pub async fn run(mut self) {
let mut reconnection_tasks: JoinSet<NewPeer> = JoinSet::new();
loop {
tokio::select! {
// Handle new disconnected peers
Some(peers_disconnected) = self.rx.recv() => {
for peer_disconnected in peers_disconnected.0 {
reconnection_tasks.spawn_local(Self::reconnect_peer(
self.initial_backoff_ms,
self.max_backoff_ms,
peer_disconnected));
}
}
// Handle completed reconnections
Some(reconnection_result) = reconnection_tasks.join_next() => {
match reconnection_result {
Ok(new_peer) => {
// Send the reconnected peer back to the main node
if let Err(e) = self.tx.send(new_peer).await {
tracing::error!(action = "failed_to_send_reconnected_peer", error = ?e);
}
}
Err(join_error) => {
tracing::error!(action = "reconnection_task_failed", error = ?join_error);
}
}
}
// Yield control when no work is available
else => {
tokio::task::yield_now().await;
}
}
}
}
}
Avoiding Blocking in the Main Logic
By delegating reconnection tasks to the background, the node remains focused on consensus logic without being bogged down by network issues. This design choice has worked cleanly without major challenges.
Storage Engine
Persisting Logs and State
The storage
module currently uses a simple file-based approach to persist logs and state, ensuring durability across node restarts. To make it nicer, I plan to build a custom LSM tree. I might document this journey in a future blog post, diving deeper into storage optimization for distributed systems.
Raft in Action
Leader Election
Randomized Timers
To prevent multiple nodes from triggering elections simultaneously, I implemented random timers that can be modified passing the values as args when calling the binary. A dedicated task sleeps for this duration and checks whether an election is necessary, avoiding conflicts and ensuring smooth election processes.
Requesting and Validating Votes
When a node initiates an election, it increments its current_term
, transitions to the candidate state, votes for itself, and sends request_vote
calls to all servers in the cluster. Receivers validate three conditions: the candidate’s term must exceed their own, they must not have voted or have voted for this candidate, and the candidate’s last log must be up to date (with an index and term matching or exceeding theirs).
fn handle_request_vote(
&mut self,
term: u64,
candidate_id: &str,
last_log_index: u64,
last_log_term: u64,
) -> VoteResponse {
tracing::info!(
action = "receiveVote",
term = term,
candidate = candidate_id,
last_log_index = last_log_index,
last_log_term = last_log_term
);
if term < self.current_term() {
return VoteResponse::not_granted(self.current_term());
}
let candidate = NodeId::new(
SocketAddr::from_str(candidate_id)
.expect("why candidate_id isnt a correct socketaddrs?"),
);
if term > self.current_term() {
self.become_follower(None, term);
}
let condidate_id_matches = self.voted_for().is_none_or(|addr| addr.eq(&candidate));
let last_log_info = self.last_log_info();
//NOTE: in the paper we find it has "at least up to date" and in a presentation we find this formula
let logs_uptodate = last_log_term > last_log_info.last_log_term()
|| (last_log_index >= last_log_info.last_log_index()
&& last_log_term == last_log_info.last_log_term());
if condidate_id_matches && logs_uptodate {
self.vote_for(candidate);
VoteResponse::granted(self.current_term())
} else {
VoteResponse::not_granted(self.current_term())
}
}
Handling Election Outcomes
The candidate awaits either vote responses or a heartbeat; if it secures a majority of votes before receiving a heartbeat, it becomes the leader.
Note: Random timers are critical to avoid infinite election loops, a problem that can trap nodes if timeouts are fixed, especially in smaller clusters like a three-node setup.
Leader Responsibilities
Sending Heartbeats
The leader sends regular heartbeats to followers, signaling its active status to prevent them from initiating new elections. Upon receiving a heartbeat, followers should update their last_heartbeat
field, delaying their election timers. Currenlty I simply restart the elections timer.
Appending Log Entries
Leaders process client requests by appending new entries to the log and replicating them across followers, with the handle_append_entries
function driving this process. While functional, this function needs optimization to handle log mismatches more efficiently and avoid creating unnecessary lists, improvements I plan to tackle soon to enhance performance.
fn handle_append_entries(
&mut self,
term: u64,
leader_id: &str,
prev_log_index: usize,
prev_log_term: u64,
leader_commit: u64,
entries: Vec<LogEntry>,
) -> AppendEntriesResult {
tracing::info!(
action = "receiveAppendEntries",
term = term,
leader = leader_id,
prev_log_index = prev_log_index,
prev_log_term = prev_log_term,
leader_commit = leader_commit
);
//1
if term < self.current_term() {
return AppendEntriesResult::TermMismatch(self.current_term());
}
if self.role() != &Role::Follower {
self.become_follower(
Some(
SocketAddr::from_str(leader_id).expect("why leader_id isnt a correct socketaddrs?"),
),
term,
);
}
let current_entries = &mut *self.log_entries();
// 2
if !current_entries.previous_log_entry_is_up_to_date(prev_log_index, prev_log_term) {
//TODO: when we return this error, the leader needs to know the last index/term of the
//failing node so it can send the log entries to make him update
let result = self.last_log_info();
return AppendEntriesResult::LogEntriesMismatch {
last_index: result.last_log_index(),
last_term: result.last_log_term(),
};
}
// 3 and 4
// TODO: send this as a message to avoid blocking?
let result = current_entries.merge(entries);
//5
if leader_commit > self.commit_index() {
self.update_commit_index(std::cmp::min(leader_commit, result.last_log_index()));
}
AppendEntriesResult::Ok
}
Lessons Learned
Benefits of Message Passing
After experimenting with both Rc<RefCell<T>>
and mpsc
channels, I found channels to be a far better choice, as they decouple logic cleanly, create clear boundaries, and are a joy to implement. This approach simplified my code and made it easier to reason about compared to managing shared state with locks.
Importance of Random Timeouts
Random, frequently updated timeouts proved essential in preventing consensus stalls, particularly in three-node setups where a lagging node with a short timeout could trigger endless elections. Randomizing timeouts is a must for stable Raft clusters, ensuring robust operation under varying network conditions.
Conclusion
Project Achievements
I successfully built a working Raft implementation in Rust, integrated Cap’n Proto for efficient communication, and gained deep insights into distributed systems. The code is available on GitHub, and I invite you to explore it, try it out, and share your thoughts on how it can be improved.
Next Steps
My upcoming tasks involve conducting Jepsen stress tests to identify any hidden issues, adding snapshot replication for efficient state transfers, and exploring the development of an LSM tree storage engine to boost performance. Additionally, I need to refine some details. Currently, outdated followers (in terms of logs) are not being updated. I plan to enable followers to request missing entries directly from the leader, rather than having the leader track each follower’s log index and term to send the necessary updates. This approach will eliminate the need to store follower-specific state information.
Get Involved: Explore the Code
This project was a thrilling ride, blending Rust’s power with Raft’s elegance and Cap’n Proto’s speed. I’d love to hear your feedback or see how you build on it, fork the repo, experiment with the code, or share your own Raft adventures. Happy coding!