Published

Rusty Redis: The Architecture of an Async Key-Value Store

January 20, 202628 min read

"Rust is just C++ with safety rails."

That is what everyone tells you. They say it's for people who can't handle pointers.
But honestly, they are WRONG. Rust is not just about safety. It is about Fearless Concurrency.

NOTE: This is a technical deep-dive into the architecture of Rusty Redis, a high-performance Redis clone written in Rust. We will dissect the Tokio runtime, the physics of lock contention, and the memory model required to handle 1.5 million operations per second.


what this post covers

performance snapshot

hardware:

  • cpu: amd ryzen 7 7840hs (8 cores, 16 threads)
  • ram: 16gb ddr5
  • os: linux

benchmark results (100k operations):

throughput: 1,355,691 ops/sec
set operations: ~677,845 ops/sec
get operations: ~677,846 ops/sec
p99 latency: <1ms

peak performance exceeds 1.5 million operations per second under optimal conditions

test constraints: single-node, localhost benchmark via custom python script using 10 concurrent workers performing 100,000 set/get operations over raw tcp sockets


Prologue: The Concurrency Wall

In my previous project, Lithos, I built a database in C.
It was a lesson in Absolute Control. I managed every byte of memory, every disk flush, every thread. I used pthreads and mutexes. I manually aligned memory to CPU cache lines to avoid false sharing.

And to be very honest with you? I felt like a god.

Unfortunately for me, even gods hit ceilings (sometimes).
And in my case, the ceiling was OS Threads.

In C (and standard Java/C++), a "Thread" is a kernel object. It is heavy. It is expensive.
When you spawn a thread in Linux, you aren't just creating a while loop. You are asking the Kernel to reserve real physical resources.

  • Stack Size: The kernel typically reserves 8MB of virtual memory per thread (on x86_64 Linux).
  • Context Switch: Switching between threads requires a trap into Kernel Mode (Ring 0). The CPU has to save all registers (General Purpose, SSE, AVX). It has to flush the TLB (Translation Lookaside Buffer). It has to pollute the L1/L2 caches with kernel code.
  • The Cost: This "dance" costs ~5-10 microseconds. That sounds fast, but if you do it 100,000 times a second, your CPU spends 100% of its time switching contexts and 0% of its time running your database logic.

If you want to handle 10,000 concurrent connections in C, you have two tragic choices:

  1. The Apache Model (Thread-per-Request):
    Spawn 10,000 threads.

    • The Reality: Your server crashes. 10,000 threads * 8MB stack = 80GB of RAM. Just for empty stacks. Even if you tune the stack size down, the scheduler thrashing will kill your latency.
  2. The NGINX Model (Event Loops):
    Use epoll (Linux), kqueue (BSD), or IOCP (Windows).

    • The Reality: Performance is excellent. But your code becomes a nightmare. You have to break your logic into state machines. You are manually managing callback pointers and void* user_data contexts. You lose linear control flow. "Read this, then write that" becomes "Register read callback, return, wait for callback, register write callback..."

Me being me, I wanted the raw performance of NGINX with the readability of blocking code.

I wanted Async Rust.

This is the story of Rusty Redis.
More importantly tho, this is a manual on how to build your very own high-concurrency systems without losing your sanity.


PART I: THE PHYSICS OF THE WIRE

Chapter 1: The Protocol (RESP)

Before we talk about Rust, we must talk about the Language of the Wire.
A database is useless if clients cannot talk to it. And I didn't want to write a custom client. I wanted to use redis-cli itself since it is the industry standard.

Rusty Redis implements RESP (Redis Serialization Protocol).
It is a study in pragmatic engineering. It balances Human Readability with Parsing Efficiency.

The Anatomy of a Frame

Every request is a "Frame". A Frame is a type-length-value encoding, but text-based. It uses the Control Characters \r (CR) and \n (LF) as delimiters.

The protocol defines 6 basic types:

  • + : Simple String (Status, e.g., "OK")
  • - : Error (e.g., "ERR unknown command")
  • : : Integer (e.g., "1000")
  • $ : Bulk String (Binary Safe data)
  • * : Array (Aggregate type)
  • Null : Represents absence of value

If I want to run SET key val, the client sends an Array of 3 Bulk Strings:

*3\r\n      (Array length 3)
$3\r\n      (Next string is 3 bytes)
SET\r\n     (The bytes)
$3\r\n      (Next string is 3 bytes)
key\r\n     (The bytes)
$3\r\n      (Next string is 3 bytes)
val\r\n     (The bytes)

Why specific delimiters?
Why not just use a binary length prefix? Why the \r\n overhead?

Because of Debuggability.
You can literally type *3 into a raw TCP socket via telnet or netcat, and the server will understand you. This debuggability is worth the slight overhead compared to a compact binary protocol (like Protobuf or MsgPack). Say hypothetically, when you are debugging a production outage at 3 AM, being able to read the raw traffic is gonna be a superpower.

Chapter 2: The Zero-Copy Parser

Moving to the harder stuff now. How do we read this from the network efficiently?

In C, parsing this is dangerous. You risk buffer overflows if you miss a bound check.
In naive Rust, you might be tempted to use String::from_utf8.

Do not do this.

The Allocation Problem (The Silent Killer)

Every time you allocate memory on the Heap (malloc or String::new), you are paying a tax.

  1. The Lock: The memory allocator (jemalloc/glibc) must acquire a lock on the heap arena to prevent corruption.
  2. The Search: It must traverse a free list to find a block of the correct size.
  3. The Copy: If you read data from the TCP buffer into a temporary buffer, and then copy it into a String, you are burning memory bandwidth.

If your server handles 100,000 requests per second, and each request allocates 3 Strings (Command, Key, Value), that is 300,000 allocations per second.
Your CPU will spend more time managing the Heap than running your database.

What's the solution to this, you may wonder?

The Solution: The Bytes Crate

Rusty Redis makes use of the bytes crate (part of the Tokio ecosystem).
A Bytes object is a specialized, reference-counted slice of memory. It acts like a shared pointer (Arc) to a region of memory, but with O(1) slicing.

When we read 4KB from the TCP socket, we read it into one massive BytesMut buffer.

Scenario: We receive a packet containing SET key value.

  1. We have one buffer A containing [S, E, T, _, k, e, y, _, v, a, l, u, e].
  2. We parse the command "SET". We do NOT create a new String "SET".
  3. We create a Bytes handle that points to offset 0-2 of Buffer A.
  4. We parse the key "key". We create a Bytes handle pointing to offset 4-6 of Buffer A.

We now have 3 "objects" in our code (Cmd, Key, Value), but zero new memory allocations for the data itself. They all point to the same underlying chunk of RAM.
The memory is automatically freed only when the last reference (e.g., the stored Value in the database) is dropped.

The Rust Implementation

This is how we define the Frame structure to be zero-copy. Notice we store Bytes, not String.

use bytes::Bytes;

/// A Frame enum representing the parsed data
/// Note: We use Bytes, not String, for bulk data.
#[derive(Clone, Debug)]
pub enum Frame {
    Simple(String),
    Error(String),
    Integer(i64),    // Signed integer for RESP compliance
    Bulk(Bytes),     // <--- Shared ownership via Arc internally
    Null,
    Array(Vec<Frame>),
}

And here is the parsing logic using a Cursor. A Cursor allows us to track our position in the byte stream without modifying the underlying data.

use std::io::Cursor;
use bytes::{Buf, Bytes};

// The Parse Function: Extracts a Frame from the buffer without copying data
pub fn parse(src: &mut Cursor<&[u8]>) -> Result<Frame, Error> {
    match get_u8(src)? {
        b'+' => {
            // Simple strings are usually small (e.g., "OK"), so copying 
            // to a String here is acceptable for convenience.
            let line = get_line(src)?;
            let string = String::from_utf8(line.to_vec())?;
            Ok(Frame::Simple(string))
        }
        b'$' => {
            // Bulk Strings are where the heavy data lives.

            // 1. Read the length prefix (e.g., $3 -> len = 3)
            let len = get_decimal(src)? as usize;

            // 2. Extract the data WITHOUT copying
            // `copy_to_bytes` increments the ref-count of the underlying buffer
            // and returns a specialized slice. This is an O(1) operation.
            let data = src.copy_to_bytes(len);

            // 3. Skip the trailing \r\n (Protocol compliance)
            if get_u8(src)? != b'\r' || get_u8(src)? != b'\n' {
                 return Err("Invalid protocol".into());
            }

            Ok(Frame::Bulk(data))
        }
        // ... (Handling Arrays and Integers follows similar logic)
    }
}

The Buffered IO Loop (The Fragmentation Problem)

Parsing is tricky because TCP is a Stream, not a packet queue.

When you call socket.read(), you get a chunk of bytes. You have no guarantee that this chunk contains a complete message.

We might get: *3\r\n$3\r\nSET\r\n$3\r\nke ... and the y is missing because it's stuck in a router buffer somewhere.

If we try to parse this incomplete frame, we fail.
We need a Buffering Strategy. We need to accumulate bytes until we have a full Frame.

pub struct Connection {
    stream: TcpStream,
    buffer: BytesMut,
}

impl Connection {
    pub fn new(stream: TcpStream) -> Self {
        Self {
            stream,
            buffer: BytesMut::with_capacity(4096),
        }
    }

    pub async fn read_frame(&mut self) -> Result<Option<Frame>, std::io::Error> {
        loop {
            // 1. Try to parse a frame from buffered data
            if let Some(frame) = parse_frame(&mut self.buffer)? {
                return Ok(Some(frame));
            }

            // 2. Read more data from socket
            let n = self.stream.read_buf(&mut self.buffer).await?;

            // 3. Check for connection close
            if n == 0 {
                if self.buffer.is_empty() {
                    return Ok(None);
                } else {
                    return Err(std::io::Error::new(
                        std::io::ErrorKind::UnexpectedEof,
                        "connection reset by peer",
                    ));
                }
            }
        }
    }

    pub async fn write_frame(&mut self, frame: &Frame) -> Result<(), std::io::Error> {
        let mut buf = BytesMut::new();
        serialize_frame(frame, &mut buf);
        self.stream.write_all(&buf).await?;
        self.stream.flush().await?;
        Ok(())
    }
}

This loop is the "heartbeat" of the network layer.
It aggressively fills the BytesMut buffer (which grows dynamically) until a valid RESP frame can be extracted.

Combined with the zero-copy slicing, this allows Rusty Redis to saturate a 10Gbps network link while keeping CPU usage remarkably low.

This was the physics of the wire. We have successfully taken raw electrons from the network card and turned them into Rust types without blocking the CPU and without burning the heap.

Now, let's discuss where this data goes. Time to discuss State.
And that brings us to the most dangerous word in concurrent programming: Locks (necessary, but often catastrophic at scale).


PART II: THE ENGINE (STATE)

Chapter 3: The Fallacy of the Global Lock

The heart of any database is the Key-Value Store.

In a single-threaded world (like standard Redis or Node.js), this is trivial. You create a HashMap. You read from it. You write to it. It's simple. It's easy. Life is good.

But Rusty Redis is multi-threaded. We have 16 cores available on a modern server. We want to use all of them.
This introduces one of the most dangerous problem in Computer Science: Shared Mutable State.

If Thread A is writing to the map, and Thread B is reading from it simultaneously, you get a Data Race. The memory becomes corrupted. The program crashes (or worse, returns garbage data).

To stop this, we need synchronization. We need Locks.

Attempt 1: The Mutex (The Naive Approach)

The first instinct of every Rust developer is to wrap the data in a Mutex (short for Mutual Exclusion).

struct Db {
    // A standard HashMap protected by a Mutex
    store: Mutex<HashMap<String, Bytes>>,
}

The Theory:
When a thread wants to access the data, it "locks" the Mutex. If another thread holds the lock, the new thread goes to sleep (blocks) until the lock is released.

The Reality:
A Mutex allows only one thread to access the map at a time.
Imagine a superhighway or an expressway with 16 lanes (your 16 CPU cores).
A Mutex puts a single toll booth in the middle of the highway that only lets one car pass at a time.

  • Scenario: 10,000 requests come in simultaneously.
  • Result: 1 thread does work. 9,999 threads are asleep, waiting in a queue.
  • Performance: Your 16-core server effectively becomes a 1-core server, but slower, because the OS has to waste time waking up and putting to sleep thousands of threads.

This is called Lock Contention. And it sucks because it kills performance.
And we don't like things that kill performance.

Chapter 4: The Physics of Cache Coherence (Why RwLock Also Fails)

"Okay," you say. "Mutexes are too aggressive. Databases are mostly read-heavy. Let's use a Reader-Writer Lock."

Let's understand that approach.

Attempt 2: The RwLock

struct Db {
    store: RwLock<HashMap<String, Bytes>>,
}

The Theory:
An RwLock allows:

  • Infinite Readers: Multiple threads can read simultaneously.
  • One Writer: If a thread wants to write, it waits for all readers to finish, then gets exclusive access.

And since 90% of Redis traffic is GET (Reads), this actually sounds perfect, doesn't it?

The Reality Check:
It fails due to a phenomenon known as Cache Line Bouncing.

To implement an RwLock, the lock must maintain a "Reader Count" in memory (an integer).
Every time a thread wants to read, it must:

  1. Fetch the Reader Count.
  2. Increment it atomically (fetch_add).
  3. Read the data.
  4. Decrement it atomically.

The Hardware Problem:
Modern CPUs have L1/L2/L3 caches.
If Core 1 increments the counter, that memory address is marked as "Modified" in Core 1's L1 cache.
If Core 2 wants to increment the counter, it cannot just do it. The CPU must physically flush Core 1's cache line to main RAM, and then Core 2 must pull it into its own L1 cache.

This is the MESI Protocol (Modified, Exclusive, Shared, Invalid) in action.

When you have 16 cores all trying to increment the same integer (the RwLock read count) at the same time, the cache line bounces between cores like a crazy ball.
The memory bus becomes saturated. The CPUs spend more time waiting for the cache line than executing your code.

Even if you are only reading, global locks do not scale.

Chapter 5: The Sharded Solution (DashMap)

To solve contention, we must destroy the Global Lock.
If having one lock is the bottleneck, the solution is to have many locks.

We use Sharding (or Lock Striping).
We do not use a standard HashMap. We use a specialized concurrent map called DashMap.

Internally, DashMap is not one map. It is a collection of N smaller maps (called Shards or Buckets).
N is typically determined by a hash function.

The Algorithm

When a request for GET "user:100" comes in:

  1. Hash the Key: We run a hash function on "user:100". Let's say the result is 0x12345678.
  2. Determine the Shard: We mod the hash by the number of shards (e.g., 64). ShardID = 0x12345678 % 64 = 7.
  3. Lock Only Shard 7: The thread acquires a lock only for that specific shard.

The Concurrency Win:

  • Thread A accesses "user:100" -> Locks Shard 7.
  • Thread B accesses "user:200" -> Locks Shard 12.
  • Thread C accesses "settings" -> Locks Shard 3.

All three threads run in parallel. They do not fight over the same lock. They do not bounce cache lines.
Unless two threads try to access keys that hash to the exact same shard (a collision), there is Zero Contention.

The Rust Implementation

This is how we structure the Db state in Rusty Redis to achieve 1.5M+ ops/sec.

use dashmap::DashMap;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use tokio::sync::broadcast;

/// The main Database state.
/// We use Arc to share it across threads safely.
#[derive(Clone)]
pub struct Db {
    // The "Sharded" KV Store
    // Key: String
    // Value: Bytes (Zero-copy ref counted)
    // DashMap handles the locking internally.
    pub entries: Arc<DashMap<String, Bytes>>,

    // We also shard the expirations to prevent contention on TTL checks
    // Key: String, Value: Instant (When it dies)
    expirations: Arc<DashMap<String, Instant>>,

    // Pub/Sub channels: each channel name has its own broadcast sender
    pub_sub: Arc<DashMap<String, broadcast::Sender<Bytes>>>,

    // Change tracking for auto-snapshot optimization
    changed: Arc<AtomicBool>,
}

impl Db {
    pub fn new() -> Db {
        let db = Db {
            entries: Arc::new(DashMap::new()),
            expirations: Arc::new(DashMap::new()),
            pub_sub: Arc::new(DashMap::new()),
            changed: Arc::new(AtomicBool::new(false)),
        };
        // Start background tasks
        db.start_eviction_task();
        db.start_snapshot_task();
        db
    }

    /// Optimized SET implementation
    pub fn set(&self, key: String, value: Bytes, expire: Option<Duration>) {
        // 1. Insert into the main store
        // This only locks ONE shard inside the DashMap
        self.entries.insert(key.clone(), value);
        
        // 2. Mark database as changed for auto-snapshot
        self.changed.store(true, Ordering::Relaxed);

        // 3. Handle Expiry
        if let Some(duration) = expire {
            let when = Instant::now() + duration;
            self.expirations.insert(key, when);
        } else {
            // If no expiry, ensure we remove any old expiry logic
            self.expirations.remove(&key);
        }
    }
}

Notice that we use DashMap directly. We do not wrap it in a Mutex.
The locking is handled inside the insert and get methods, hidden from the user, and extremely granular.

Chapter 6: Lifetime Management (The Arc)

You will notice Arc<...> wrapping everything in the struct above.
Arc stands for Atomic Reference Counting.

In Rust, every value has a single owner. When the owner goes out of scope, the value is dropped (freed).
But in a database server, the Db struct is shared by:

  1. The TCP Listener loop.
  2. Thousands of connection handler threads.
  3. The Background Janitor task (expiry).
  4. The Snapshot task (persistence).

Who owns the Db?
Everyone owns it.

We cannot simply pass a reference (&Db) because the compiler cannot prove that the main thread will live longer than the connection threads.
(What if the main thread crashes but a connection thread keeps running? Use-after-free!)

The Solution?
Arc allows multiple owners.

  • When we call db.clone(), we do NOT copy the data. We typically just copy a pointer (8 bytes) and atomically increment a generic reference counter.
  • All threads point to the exact same heap memory location.
  • When a thread finishes (connection closes), Arc drops the handle and decrements the counter.
  • When the counter hits 0, the memory is freed.
// In main.rs

// 1. Create the Database State (Ref count = 1)
let db = Db::new();

loop {
    let (socket, _) = listener.accept().await.unwrap();

    // 2. Clone the handle for the new connection (Ref count++)
    // This is extremely cheap (nanoseconds).
    let db_handle = db.clone(); 

    // 3. Move the handle into the async task
    tokio::spawn(async move {
        process_connection(socket, db_handle).await;
        // When this task finishes, db_handle is dropped (Ref count--)
    });
}

This combination, that is, Sharding for Concurrency and Arc for Lifetime Management, is the secret of high-performance Rust services.

We have eliminated the bottlenecks of global locking and the dangers of manual memory management in one swoop.

Now that we have a fast storage engine, we need to execute commands. But simply running code isn't enough. We need to handle thousands of connections efficiently.

Let's move onto the Runtime.


PART III: THE RUNTIME (ASYNC PHYSICS)

Chapter 7: The Illusion of Multitasking (Async/Await)

In Lithos (my C database), I managed threads manually. I knew exactly which line of code was executing on which CPU core.
In Rusty Redis, I use async/.await.

To the uninitiated, async looks like syntactic sugar. It looks like "normal" code that just pauses.
But under the hood, the Rust compiler is doing something radical. It is fundamentally rewriting your code into a State Machine.

The Stackless Coroutine

In languages like Go or Java (Project Loom), "Green Threads" have their own stack. When they pause, the runtime saves their stack pointer. This uses memory (2KB - 4KB per goroutine).
Rust is zero-cost. It does not allocate a stack for async tasks.

Consider this simple handler in Rusty Redis:

async fn handle_connection(mut socket: TcpStream, db: Db) {
    let mut connection = Connection::new(socket);
    
    // State 1: Wait for data
    let frame = connection.read_frame().await.unwrap();

    // State 2: Process data
    let command = Command::from_frame(frame).unwrap();
    let response = command.apply(&db);

    // State 3: Write response
    connection.write_frame(&response).await.unwrap();
}

When you compile this, Rust generates a hidden Enum that represents the state of this function. It somewhat looks like this (simplified):

// The Compiler-Generated State Machine
enum HandleConnectionFuture {
    // State 0: Just started
    Start { socket: TcpStream, db: Db },

    // State 1: We are waiting for read_frame() to complete
    WaitingForRead { 
        socket: TcpStream, 
        db: Db, 
        internal_read_future: ReadFrameFuture 
    },

    // State 2: We have the frame, waiting to write
    WaitingForWrite { 
        socket: TcpStream, 
        response: Frame, 
        internal_write_future: WriteFrameFuture 
    },

    // State 3: Done
    Terminated,
}

This Enum is roughly 200 bytes. It lives on the heap.

When the code hits .await, it doesn't "block" the thread.

  1. It checks: "Is the I/O ready?"
  2. If No: It saves the current variables into the Enum variant (WaitingForRead) and returns Pending.
  3. The thread is now free to go do something else.

This is why Async Rust handles 10,000 connections easily. We don't have 10,000 threads. We have 10,000 tiny state machines sitting in RAM, waiting for a signal to advance to the next state.

Chapter 8: The Tokio Scheduler (M:N Threading)

We have the state machines (the Tasks). But who pushes them forward? Who calls poll()?
This is the job of the Runtime. Rusty Redis uses Tokio.

Tokio implements an M:N Threading Model.

  • M Tasks: Millions of async futures (our connections).
  • N OS Threads: Exactly equal to the number of CPU cores (e.g., 16).

The Scheduler Algorithm

If you have 16 cores, Tokio spawns 16 Worker Threads.
Each Worker Thread has its own Local Run Queue.
When a socket receives a packet, the kernel wakes up the task. The task is pushed into the Local Run Queue of the thread that caught the event.

The Problem: What if Core 1 is hammered with 1,000 active connections, but Core 2 is idle?
If we do nothing, Core 1 will choke while Core 2 sleeps. This is the "Head-of-Line Blocking" problem.

The Solution: Work Stealing

Tokio solves this with a Work Stealing Strategy.

  1. Core 2 is idle. It finishes its last task.
  2. It looks at its own queue. Empty.
  3. It looks at the Global Queue. Empty.
  4. The Heist: It looks at Core 1's queue. It sees 1000 tasks.
  5. It atomically steals half of them (500 tasks) and moves them to its own queue.

This algorithm is mathematically beautiful. It ensures that the system automatically load-balances itself.

  • Under low load, threads stick to their own tasks (maximizing CPU cache locality).
  • Under high load or imbalance, threads help each other out.

This is why Rusty Redis' CPU usage graph looks like a flat line across all cores during the benchmark. The scheduler ensures no core is left behind.

Chapter 9: The Event Loop (Select!)

One of the hardest problems in distributed systems is waiting for multiple things at once.
In a database, a connection isn't just "Request -> Response".

We have Pub/Sub.
A client might subscribe to a channel (SUBSCRIBE chat).
Now, the server needs to listen for TWO things simultaneously:

  1. The Network: Did the client send an UNSUBSCRIBE command?
  2. The Message Bus: Did someone PUBLISH a message to the "chat" channel?

The Old Way (C/Go)

In C, you need epoll. You register both the socket FD and the event FD with the kernel. It is verbose and error-prone.
In Go, you use select over channels. It is elegant, but channels have overhead.

The Rust Way: tokio::select!

Rust solves this with the select! macro. It allows us to race multiple futures against each other.

// The Pub/Sub Loop
async fn subscribe_loop(mut socket: Connection, mut rx: broadcast::Receiver<Bytes>) {
    loop {
        tokio::select! {
            // Branch A: Listen for internal messages
            Ok(msg) = rx.recv() => {
                // We got a message from the bus! Write it to the client.
                let frame = Frame::Array(vec![
                    Frame::Bulk("message".into()),
                    Frame::Bulk(channel_name.clone()),
                    Frame::Bulk(msg),
                ]);
                socket.write_frame(frame).await.unwrap();
            }

            // Branch B: Listen for client commands (e.g. Unsubscribe)
            res = socket.read_frame() => {
                match res {
                    Ok(Some(frame)) => handle_command(frame),
                    // If socket closes, we break the loop
                    Ok(None) => break, 
                    Err(_) => break, 
                }
            }
        }
    }
}

How it works:

  1. select! polls both Branch A and Branch B.
  2. It says to the runtime: "Wake me up if either of these makes progress."
  3. If a message arrives on rx (Branch A):
  • It executes the code block for Branch A.
  • Crucially: It DROPS the future for Branch B.

The Drop Semantics:
In Rust, "canceling" a task is trivial. You just stop polling it and drop the struct.
When socket.read_frame() is dropped, its destructor runs. It automatically deregisters interest from the TCP socket.
There is no "thread cleanup" or "zombie processes." The state machine just evaporates.

This allows us to write complex concurrent logic (timeouts, cancellation, racing) in a declarative way.

Chapter 10: The Nervous System (Broadcast Channels)

To implement Pub/Sub, we need a way to send messages between tasks.
Thread A (Publisher) needs to talk to Thread B, C, and D (Subscribers).
We use tokio::sync::broadcast.

Why not Mutex<Vec<Sender>>?
A naive implementation would look like this:

struct Channel {
    subscribers: Mutex<Vec<TcpStream>>, // Bad!
}

If we publish to 10k subscribers, we would have to iterate the vector and write to each socket inside the lock.
The Publisher would block for seconds. The entire database would freeze.

The Linked-Chunk Buffer

Tokio's broadcast channel is lock-free (mostly).
It does not copy the message 10,000 times.
It uses a Linked List of Chunks.

  1. The Publisher writes the message "Hello" into a shared buffer (Arc).
  2. The channel stores a pointer to this buffer in the Tail of the list.
  3. Every Subscriber holds a simple integer index (cursor).

When a Subscriber calls recv(), it just checks:
if channel.tail > my.cursor { return buffer[my.cursor++]; }

The Zero-Copy Win:
The message Bytes are reference counted.
Even if we have 10k subscribers, the actual data payload ("Hello") exists in memory only once.
Each subscriber gets a reference to it.
When the slowest subscriber finally reads it, the memory is freed.

This architecture handles "Slow Consumers" gracefully. If a subscriber falls too far behind (the buffer wraps around), they get a Lagged error, but they do not slow down the Publisher.

Chapter 11: The Cooperative Caveat

There is one danger in Async Rust.
Blocking the Loop.

Remember, we only have N OS threads (typically matching your CPU core count).
If one of our state machines decides to calculate the 1,000,000th Fibonacci number, or calls std::thread::sleep() instead of tokio::time::sleep(), that specific OS thread is held hostage.
It cannot switch to another task.
If you block one thread, you starve 1/16th of your users.

The Rule of Rusty Redis:

"Never block the thread. If you must do heavy CPU work or synchronous File I/O, banish it to spawn_blocking."

This is why, in the Persistence layer (saving to disk), we do not just call fs::write. Even though modern NVMe drives are fast, the syscalls are blocking.
Instead, we use tokio::fs.

tokio::fs is actually a clever abstraction. Traditional Unix doesn't support true async file I/O (though io_uring on Linux 5.1+ does).
So Tokio uses a separate Thread Pool (spawn_blocking) dedicated solely to blocking file operations.
When we save a snapshot, the heavy lifting happens on a side-thread, leaving the main worker cores free to serve GET and SET requests without blocking.

By now, we have mastered the Wire. We have mastered the State. We have mastered the Runtime.
Finally, it is time to face what was the hardest part for me: Time and Persistence.

The question now is how exactly do we handle data that must die (TTL), and data that must live forever (Snapshots)?


PART IV: MEMORY & TIME

Chapter 12: The Dimension of Time (TTL)

A key-value store without expiration is just a fancy hash map.
Real-world caches need TTL (Time To Live).
SET session:123 "user_data" EX 60.

This introduces a new dimension to our data. It is no longer just Space (RAM); it is Time.
The key session:123 must effectively vanish from the universe at T + 60s.
How do we implement this efficiently when we have 10 million keys?

Attempt 1: The Timer Heap (The Naive Approach)

You might think: "Just spawn a timer."

// Don't do this
tokio::spawn(async move {
    sleep(Duration::from_secs(60)).await;
    db.remove("session:123");
});

The Problem:
If you insert 1 million keys, you spawn 1 million async tasks.
The Tokio scheduler is amazing, but 1 million tasks waiting in a Priority Queue (the underlying timer wheel) consumes massive CPU just to tick the clock. It bloats the runtime overhead significantly.

The Hybrid Strategy (Passive + Active)

Rusty Redis, like Redis, uses a two-pronged strategy. We cheat. We don't actually delete the key at the exact microsecond it expires.
We just make sure it is unreachable.

1. Passive Expiry (Lazy Collection)
This happens on the Read path.
When a user calls GET session:123:

  • We look up the key.
  • We check the expirations map.
  • if now() > expiry_time:
  • We secretly delete the key.
  • We return (nil) to the user.

To the user, it looks like the key is gone. But there's a problem.

The Flaw: What if the user never accesses the key again?
The data sits in RAM forever. This is a Memory Leak.

2. Active Expiry (The Janitor)
We need a garbage collector. But scanning the entire map (10 million keys) every second would freeze the database.
We use a Probabilistic Algorithm.

We spawn one dedicated background task: The Janitor.

fn start_eviction_task(&self) {
    let entries = Arc::clone(&self.entries);
    let expirations = Arc::clone(&self.expirations);

    tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_millis(100));
        
        loop {
            interval.tick().await;
            
            let now = Instant::now();
            let mut evicted = 0;

            // Step 1: Sample up to 20 keys from the expiration map
            let keys_to_check: Vec<String> = expirations
                .iter()
                .take(20)
                .map(|entry| entry.key().clone())
                .collect();

            for key in keys_to_check {
                if let Some(expiry_entry) = expirations.get(&key) {
                    if now > *expiry_entry.value() {
                        // Step 2: Evict dead keys from both maps
                        drop(expiry_entry);
                        entries.remove(&key);
                        expirations.remove(&key);
                        evicted += 1;
                    }
                }
            }

            // Step 3: The Adaptive Loop
            // If more than 25% of our sample was garbage, we assume 
            // there is A LOT of garbage left. Do not sleep. Repeat immediately.
            if evicted > 5 {
                continue; // Skip the tick(), run again
            }
        }
    });
}

Here's why this is brilliant:
It creates a feedback loop.

  • If memory is clean, the loop sleeps (low CPU).
  • If a massive wave of keys expires (e.g., session dump), the loop spins up aggressively (high CPU) to reclaim RAM quickly.

It balances CPU usage against Memory pressure automatically.

Chapter 13: The Vault (Async Persistence)

An In-Memory DB is useless if a power outage wipes your data.
We need Persistence.
We implement RDB Snapshotting: saving the state to disk periodically.

Here's the tough part:
The database is mutating 100,000 times a second.
The question arises as to how do we save a consistent snapshot without locking the entire DB?

If we iterate slowly, the data at the start of the file will be from T=0 and the data at the end will be from T=5s. That is a corrupted state.

The Magic of Copy-On-Write (Arc + Bytes)

In a traditional language, you would need fork() (like Redis does) to let the OS handle the memory snapshotting.
In Rust, we have Arc.

When the persistence task wakes up, it does something surprisingly simple:
It Clones the Database.

async fn save_snapshot(db: Db) {
    // 1. Clone the Arc handles
    // This does NOT copy the data. It increments ref-counts.
    // This is atomic and near-instant.
    let snapshot_entries = db.entries.clone();

    // 2. Spawn a blocking task to write to disk
    tokio::task::spawn_blocking(move || {
        write_to_disk(snapshot_entries);
    });
}

Oops. Guess what? Thats a race condition.

If the main thread updates a key after we clone the map, does the snapshot see it?

  • DashMap Semantics: The snapshot_entries handle points to the same map. If we just iterated it, we would see new writes. That's bad.
  • The Fix: We actually rely on DashMap's iteration behavior or, for stricter consistency, we can utilize the underlying Bytes immutability.

Since Bytes are immutable, if the main thread performs a SET, it replaces the entry with a new Bytes object. The old Bytes object (held by our snapshot iterator) remains untouched because its reference count is > 0.
We effectively get a consistent view of the data for the cost of an integer increment.

The Atomic Swap

Writing to a file is dangerous. If the server crashes halfway through writing dump.rdb, you are left with a corrupted file.

For this reason, we use a Write-Replace Pattern.

  1. Open dump.rdb.tmp.
  2. Stream all data into it (using bincode or JSON).
  3. Call fsync (flush to hardware).
  4. Rename dump.rdb.tmp -> dump.rdb.
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tokio::fs;

#[derive(Serialize, Deserialize)]
pub struct Snapshot {
    pub entries: HashMap<String, Vec<u8>>,
}

pub async fn save(db: &Db, filename: &str) -> io::Result<()> {
    // Collect snapshot data
    let mut entries = HashMap::new();
    
    for entry in db.entries.iter() {
        let key = entry.key().clone();
        let value = entry.value().to_vec();
        entries.insert(key, value);
    }

    let snapshot = Snapshot { entries };
    // Serialize to bytes
    let serialized = bincode::serialize(&snapshot)
        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

    // Write to temporary file
    let temp_file = format!("{}.tmp", filename);
    fs::write(&temp_file, serialized).await?;

    // The Atomic Commit
    // This is a syscall that the OS guarantees is atomic.
    fs::rename(&temp_file, filename).await?;

    Ok(())
}

By using rename, we guarantee that dump.rdb is always valid. It either contains the old snapshot or the new one. Never a half-written mess.

Chapter 14: The Code Architecture (Clean Architecture)

We have discussed the components. Now, how do we organize 830 lines of Rust without creating spaghetti?
Rusty Redis follows a strict separation of concerns, heavily inspired by "Hexagonal Architecture".

1. The Interface Adapter (connection.rs)

  • Responsibility: Knows about TCP, Bytes, and Frames.
  • Ignorance: Knows nothing about the database or commands. It just parses bytes into Enums.

2. The Application Core (cmd.rs)

  • Responsibility: Business logic. "What does SET do?"
  • Ignorance: Knows nothing about TCP. It receives a Frame, executes it against Db, and returns a Frame.
  • Benefit: We can test this without a network. Unit tests just pass Frame::Array(...) and assert the result.

3. The Infrastructure (db.rs, persistence.rs)

  • Responsibility: The dirty work. Managing HashMaps, File Systems, and Time.

The Dependency Injection
We wire it all up in main.rs.

#[tokio::main]
async fn main() {
    // 1. Init Infrastructure
    let db = Db::new(); // Background tasks start automatically

    // 2. Load existing data
    let dump_file = "dump.rdb";
    match tokio::fs::try_exists(dump_file).await {
        Ok(true) => {
            match persistence::load(dump_file).await {
                Ok(entries) => {
                    let count = entries.len();
                    db.bulk_insert(entries);
                    info!("loaded {} keys from disk", count);
                }
                Err(e) => error!("failed to load dump file: {}", e),
            }
        }
        Ok(false) => info!("no dump file found, starting with empty database"),
        Err(e) => error!("failed to check for dump file: {}", e),
    }

    // 3. Register Shutdown Handler
    let db_for_shutdown = db.clone();
    tokio::spawn(async move {
        match tokio::signal::ctrl_c().await {
            Ok(()) => {
                info!("received shutdown signal, saving database...");
                match persistence::save(&db_for_shutdown, "dump.rdb").await {
                    Ok(_) => {
                        let count = db_for_shutdown.entries.len();
                        info!("saved {} keys to disk", count);
                    }
                    Err(e) => error!("failed to save on shutdown: {}", e),
                }
                std::process::exit(0);
            }
            Err(e) => error!("failed to listen for shutdown signal: {}", e),
        }
    });

    // 4. Start Network Listener
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    loop {
        let (socket, _) = listener.accept().await.unwrap();
        let db_handle = db.clone();

        // 5. Inject Dependencies per Connection
        tokio::spawn(async move {
            process(socket, db_handle).await;
        });
    }
}

This is readable. It is testable. It is safe.


Epilogue: The Rust Advantage

Building Lithos in C taught me how the machine works.
Building Rusty Redis in Rust taught me how to scale.

In C, I spent 50% of my time debugging Segfaults, Double Frees, and Race Conditions.
In Rust, I spent 0% of my time on memory errors.

The compiler forced me to handle concurrency correctly.

  • It wouldn't let me share a mutable variable across threads without a Lock or a Channel.
  • It wouldn't let me write to a file from an async thread without acknowledging the blocking nature.
  • It would genuinely scream at me until I architected the system correctly.

Oh, a fun fact: Rusty Redis is not just a clone. It is a testament to the fact that you do not need to choose between Safety and Speed.
You can have both.

You just need to fight the Borrow Checker first.
And once you win that fight, you have built something that can handle over 1.5 million requests per second, and sleep soundly at 3 AM without being afraid of getting segfault nightmares.

The Code:
https://github.com/bit2swaz/rusty-redis