The Computing Series

Sharding and Partitioning

Introduction

In 2015, Discord stored every message in a single MongoDB replica set. By November 2015, the database held roughly 100 million messages, and the data and indexes no longer fit in RAM. Read latency became unpredictable. The on-call team spent nights nursing the database through traffic spikes. The single replica set had reached its ceiling.

Discord migrated to Cassandra — a distributed database designed for horizontal scaling. Messages were partitioned by channel ID and bucketed by time. Each channel’s messages lived on a predictable set of nodes. Adding capacity meant adding nodes; Cassandra redistributed data automatically. The migration took months. The result was a system that scaled linearly with message volume instead of degrading under it.

Partitioning — dividing a single large dataset into smaller, independently stored and queried subsets — is the architectural act that makes databases scale horizontally. Without it, every byte of data must eventually fit on a single machine, and every query must go through a single CPU. With it, data is distributed across dozens or hundreds of machines, and queries can be answered in parallel.

The problem, as Discord’s engineers documented, is that partitioning is not free. The choice of partition key — channel ID, user ID, timestamp — determines which queries are fast and which require cross-partition fan-out. Choosing the wrong scheme does not just reduce performance — it moves the bottleneck from one place to another, often invisibly, until production traffic reveals it catastrophically.

This chapter covers the three dominant partitioning strategies, the hot shard problem, and the rebalancing problem that appears whenever a cluster grows. Each introduces a different failure surface.


Thread Activation

This chapter activates T1 (Hashing) and T8 (Divide & Conquer) simultaneously.

T1 originated in Book 1, Chapters 9–15 with sets, functions, and sequences — the mathematical structures that describe how data is organized. In Book 2, Chapter 15, consistent hashing was introduced as a technique for mapping keys to nodes with minimal disruption when the set of nodes changes. Sharding is T1 at database scale: the choice of how to partition a key space determines query expressiveness, rebalancing cost, and hotspot risk.

T8 (Divide & Conquer) makes its first full appearance in this chapter. Every partition decision is a distributed systems decision: which machine holds which data, how requests are routed, and what happens when a machine fails. The routing map that tells a client which shard owns a given key is a coordination problem. The rebalancing that happens when a new node joins is a data movement problem. Both threads run through every chapter in the second half of this book.


The Concept

Partitioning (also called sharding) divides a dataset into disjoint subsets, each stored on a separate node. Every record belongs to exactly one partition. A routing layer determines which partition owns a given record.

Three strategies dominate production systems:

| Strategy | Mechanism | Strengths | Weaknesses |
|---|---|---|---|
| Range partitioning | Split by key range (A–M on shard 1, N–Z on shard 2) | Range queries are efficient; data locality | Sequential writes create hot shards |
| Hash partitioning | shard = hash(key) % N | Even distribution of random keys | No efficient range queries; changing N remaps most keys |
| Consistent hashing | Keys and nodes on a ring; key goes to next clockwise node | Only about 1/N of keys move when a node joins | Requires virtual nodes for balance |

The routing map — the data structure that tells a client which partition owns a given key — is itself a scaling concern. It can be stored centrally (a single configuration server) or distributed (embedded in the client library). Centralised routing is simpler but is a single point of failure. Distributed routing is resilient but requires clients to handle routing table updates.


How It Works

Range Partitioning

Range partitioning assigns contiguous ranges of the key space to each shard. A routing table maps ranges to nodes.

// Range partition routing table. Ranges are half-open: [min, max).
// KEY_MAX is a sentinel that sorts after every valid key.
routing_table = [
    {min: "A", max: "N", node: shard_1},       // keys beginning A–M
    {min: "N", max: KEY_MAX, node: shard_2},   // keys beginning N–Z
]

function route(key):
    for entry in routing_table:
        if entry.min <= key < entry.max:
            return entry.node
    raise RoutingError("key out of range")

// Range scan: efficient because only the shards whose ranges overlap
// [start_key, end_key] are contacted
function range_scan(start_key, end_key):
    affected = routing_table.filter(entry => overlaps(entry, start_key, end_key))
    results = []
    for entry in affected:
        results += entry.node.scan(start_key, end_key)
    return merge_sorted(results)

The problem: if keys are generated sequentially (timestamps, auto-increment IDs), all writes go to the shard that owns the current key range. That shard receives 100% of write traffic while the others sit idle.
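
The effect of sequential keys can be seen in a small Python sketch. The split points, shard names, and timestamp keys here are hypothetical, chosen only for illustration; a real system would derive its split points from observed data.

```python
import bisect
from collections import Counter
from datetime import datetime, timedelta

# Hypothetical split points: shard i owns keys in
# [LOWER_BOUNDS[i], LOWER_BOUNDS[i + 1]); the last shard is open-ended.
LOWER_BOUNDS = ["", "2024-01-01", "2024-07-01"]   # "" sorts before every key
SHARDS = ["shard_1", "shard_2", "shard_3"]


def route(key: str) -> str:
    """Return the shard whose half-open range contains key."""
    index = bisect.bisect_right(LOWER_BOUNDS, key) - 1
    return SHARDS[index]


# Sequential keys: one event per second starting 1 September 2024.
start = datetime(2024, 9, 1)
sequential_keys = [(start + timedelta(seconds=i)).isoformat() for i in range(1000)]

# Count how many writes each shard would receive.
writes_per_shard = Counter(route(key) for key in sequential_keys)
print(writes_per_shard)
```

Every one of the 1,000 keys sorts after the last split point, so all writes land on shard_3 and the other two shards receive none.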

Hash Partitioning

Hash partitioning maps each key to a shard by applying a hash function.

num_shards = 4

function route(key):
    shard_index = hash(key) % num_shards
    return shards[shard_index]

// Write: distributed evenly for random keys
function write(key, value):
    shard = route(key)
    shard.put(key, value)

// Point lookup: O(1) — hash the key, go directly to the shard
function read(key):
    shard = route(key)
    return shard.get(key)

// Range scan: NOT possible without scanning all shards
// Adjacent keys hash to arbitrary shards — there is no locality
function range_scan(start_key, end_key):
    results = []
    for shard in shards:                // must scan ALL shards
        results += shard.scan_range(start_key, end_key)
    return merge_sorted(results)

The rebalancing problem: when num_shards changes from 4 to 5, the formula hash(key) % num_shards gives a different result for about 80% of keys. In general, growing from N to N+1 shards remaps about N/(N+1) of keys, so most of the data must be moved.
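
The size of the modulo shift can be measured directly. The following Python sketch is illustrative: `stable_hash`, `fraction_moved`, and the `user:` key format are assumptions, not part of any particular database. It uses SHA-256 rather than Python's built-in `hash()`, which is randomised per process for strings and so cannot be used for routing.

```python
import hashlib


def stable_hash(key: str) -> int:
    """Hash a key to a 64-bit integer that is identical across processes."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big")


def fraction_moved(keys: list[str], old_shards: int, new_shards: int) -> float:
    """Fraction of keys whose shard index changes when the shard count changes."""
    moved = sum(
        1 for key in keys
        if stable_hash(key) % old_shards != stable_hash(key) % new_shards
    )
    return moved / len(keys)


keys = [f"user:{i}" for i in range(100_000)]
moved_4_to_5 = fraction_moved(keys, 4, 5)
moved_8_to_9 = fraction_moved(keys, 8, 9)
print(f"4 -> 5 shards: {moved_4_to_5:.3f} of keys move")
print(f"8 -> 9 shards: {moved_8_to_9:.3f} of keys move")
```

With a uniform hash, both fractions come out close to N/(N+1): roughly 0.80 for 4 to 5 shards and roughly 0.89 for 8 to 9.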

Consistent Hashing

Consistent hashing places both keys and nodes on a circular ring (hash space 0 to 2^32 − 1). A key is assigned to the first node clockwise from its hash position.

// Ring with virtual nodes for balance
ring = SortedMap()   // maps hash positions to node identifiers

function add_node(node, virtual_node_count=150):
    for i in 0..virtual_node_count:
        position = hash(node.id + ":" + i)
        ring[position] = node

function route(key):
    key_position = hash(key)
    // Find the first node clockwise from key_position
    node_position = ring.find_first_ge(key_position)
    if node_position is None:
        node_position = ring.first()   // wrap around
    return ring[node_position]

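// When a new node joins, each of its virtual positions takes over the
// keys in (predecessor position, own position] from the node that owned
// that arc before the join. No other keys move.
// Expected fraction of keys that move = 1 / (num_nodes + 1),
// where num_nodes is the node count before the join.
function add_node_and_rebalance(new_node, virtual_node_count=150):
    for i in 0..virtual_node_count:
        position = hash(new_node.id + ":" + i)
        previous_owner = owner_of(position)   // same lookup as route(), without hashing
        predecessor = ring.predecessor(position)
        ring[position] = new_node
        if previous_owner != new_node:
            migrate_keys(from: previous_owner,
                         to: new_node,
                         key_range: (predecessor, position])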

When a node joins and brings the cluster to N nodes, only about 1/N of keys move on average, compared with most keys for modulo hashing. Cassandra, Amazon DynamoDB, and Riak use consistent hashing.
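
A runnable Python sketch of the ring makes the difference from modulo hashing measurable. It is a minimal illustration, not any database's implementation: the `HashRing` class, the node names, and the `message:` key format are assumptions, and SHA-256 truncated to 32 bits stands in for the ring's hash function.

```python
import bisect
import hashlib


def stable_hash(value: str) -> int:
    """Map a string to a position on a 32-bit ring."""
    digest = hashlib.sha256(value.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big")


class HashRing:
    def __init__(self, virtual_nodes: int = 150) -> None:
        self.virtual_nodes = virtual_nodes
        self._positions: list[int] = []    # sorted ring positions
        self._owner: dict[int, str] = {}   # ring position -> node name

    def add_node(self, node: str) -> None:
        for i in range(self.virtual_nodes):
            position = stable_hash(f"{node}:{i}")
            if position in self._owner:
                continue   # rare collision: keep the existing owner
            bisect.insort(self._positions, position)
            self._owner[position] = node

    def route(self, key: str) -> str:
        # First position at or after the key's hash, wrapping to the start.
        index = bisect.bisect_left(self._positions, stable_hash(key))
        if index == len(self._positions):
            index = 0
        return self._owner[self._positions[index]]


ring = HashRing()
for name in ["node_a", "node_b", "node_c", "node_d"]:
    ring.add_node(name)

keys = [f"message:{i}" for i in range(50_000)]
before = {key: ring.route(key) for key in keys}
ring.add_node("node_e")
after = {key: ring.route(key) for key in keys}

moved = [key for key in keys if before[key] != after[key]]
fraction_moved = len(moved) / len(keys)
print(f"{fraction_moved:.3f} of keys moved")
```

Going from four nodes to five, the moved fraction should land near 1/5, and every key that moves goes to the new node. Existing nodes never exchange keys with each other.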

The Hot Shard Problem

A hot shard occurs when one partition receives disproportionate traffic — reads, writes, or both.

// Scenario: celebrity user with 50 million followers
// All reads for that user's feed route to the same shard
function get_feed(user_id):
    shard = route(user_id)   // always shard_3 for user "celebrity_A"
    return shard.read_feed(user_id)

// Shard 3 receives millions of requests/second
// Other shards sit at 5% utilisation

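// Mitigation 1: key splitting by appending a random suffix.
// Writes spread across up to 10 shards; reads must then query
// all 10 suffixed keys and merge the results.
function write_celebrity_data(user_id, data):
    suffix = random_int(0, 9)            // inclusive bounds: 10 possible suffixes
    shard_key = user_id + ":" + suffix
    route(shard_key).write(data)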

// Mitigation 2: read replicas for hot keys
function read_hot_key(key):
    replicas = hot_key_registry.get_replicas(key)
    chosen = replicas[random_int(0, len(replicas) - 1)]
    return chosen.read(key)
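
Key splitting has a cost on the read path that is easy to overlook. The Python sketch below shows both halves under stated assumptions: `NUM_SALTS`, `write_hot`, `read_hot`, and the in-memory `store` are hypothetical stand-ins for real shard storage.

```python
import random
from collections import defaultdict

NUM_SALTS = 10   # assumed number of suffixes, 0 through 9

# Stand-in for shard storage: salted key -> items written under it.
store: dict[str, list[str]] = defaultdict(list)


def salted_keys(user_id: str) -> list[str]:
    """All salted variants of a hot key."""
    return [f"{user_id}:{salt}" for salt in range(NUM_SALTS)]


def write_hot(user_id: str, item: str, rng=random) -> str:
    """Write under one randomly chosen salted key and return that key."""
    shard_key = f"{user_id}:{rng.randrange(NUM_SALTS)}"
    store[shard_key].append(item)
    return shard_key


def read_hot(user_id: str) -> list[str]:
    """Read every salted key and combine the results (a fan-out read)."""
    items: list[str] = []
    for shard_key in salted_keys(user_id):
        items.extend(store.get(shard_key, []))
    return items
```

Each write touches one salted key, but each read touches all ten. Salting therefore suits write-heavy hot keys, while read-heavy hot keys are usually better served by replicas or caching.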

Tradeoffs

AT1 — Consistency/Availability: Hash partitioning distributes load evenly but gives up efficient range queries. To answer “find all users created between date A and date B” on a hash-partitioned table, you must fan out to every shard and merge results. Range partitioning preserves query expressiveness at the cost of potential hot shards. The choice is not between good and bad. It is between two constraints: query flexibility and write distribution cannot both be maximised with a single partitioning key.

AT5 — Centralisation/Distribution: The routing map that directs each request to the correct shard is itself a scaled component. Centralising it (ZooKeeper, a config server, a dedicated routing service) provides a single authoritative source of truth but introduces a coordination bottleneck and a single point of failure. Distributing it (embedding the routing table in client libraries, gossip protocols) removes the bottleneck but means routing tables can diverge temporarily across clients. DynamoDB uses a centralised routing tier. Cassandra gossips the ring state to all nodes and embeds routing in the client driver. Neither approach is wrong — the tradeoff is operational simplicity versus resilience to coordinator failure.


Where It Fails

FM6 — Hotspotting: Range partitioning on a monotonically increasing key (a timestamp, an auto-increment ID) sends all new writes to the partition that owns the current key range. That shard receives 100% of write traffic and exhausts its CPU and I/O while other shards sit idle. The system appears to have plenty of capacity in aggregate while one partition is saturated. Aggregate monitoring hides the problem: metrics such as average CPU and average query latency look healthy because they average across hot and cold shards. Only per-shard metrics reveal it.
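
The gap between aggregate and per-shard metrics can be shown with a few lines of Python. This is a sketch under assumptions: the CPU figures are invented, and the 80% and 20% thresholds are illustrative values rather than recommendations.

```python
from statistics import mean

HOT_THRESHOLD = 80.0    # assumed: a shard above this is a hot candidate
COLD_THRESHOLD = 20.0   # assumed: a shard below this is mostly idle


def hot_shards(cpu_by_shard: dict[str, float]) -> list[str]:
    """Return shards that are hot while at least one other shard is cold.

    Uniformly high load is a capacity problem, not a skew problem,
    so it yields an empty list.
    """
    hot = [shard for shard, cpu in cpu_by_shard.items() if cpu > HOT_THRESHOLD]
    has_cold = any(cpu < COLD_THRESHOLD for cpu in cpu_by_shard.values())
    return sorted(hot) if has_cold else []


# Invented sample: seven idle shards and one saturated shard.
cpu_by_shard = {f"shard_{i}": 10.0 for i in range(8)}
cpu_by_shard["shard_7"] = 95.0

average_cpu = mean(cpu_by_shard.values())
print(f"average CPU: {average_cpu}%")
print(f"hot shards: {hot_shards(cpu_by_shard)}")
```

The cluster-wide average is about 21%, which looks healthy on a dashboard, while the per-shard check flags shard_7.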

FM3 — Unbounded Resource Consumption during Rebalancing: When a new node joins a hash-partitioned cluster, the modulo shift forces most of the data to move at once. The rebalancing traffic competes with live production traffic for network bandwidth, disk I/O, and CPU. On a large cluster, rebalancing can last hours. During that window, a second node failure, which is more likely while the cluster is under stress, can leave the cluster in an inconsistent state. Consistent hashing reduces the volume of data movement to about 1/N of total data, but even 1/N of a petabyte-scale dataset is substantial. Rebalancing must be rate-limited, and the cluster must continue serving traffic throughout.
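
One common way to rate-limit migration traffic is a token bucket. The Python sketch below is an illustration under assumptions: `TokenBucket`, `migrate`, and the `send` callback are hypothetical names, and a real cluster would set the rate from measured headroom rather than a constant.

```python
import time
from typing import Callable, Iterable


class TokenBucket:
    """Allows at most `rate` bytes per second, with bursts up to `capacity`."""

    def __init__(self, rate: float, capacity: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.clock = clock
        self.last = clock()

    def try_consume(self, amount: float) -> bool:
        now = self.clock()
        # Refill in proportion to elapsed time, never above capacity.
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last) * self.rate)
        self.last = now
        if amount <= self.tokens:
            self.tokens -= amount
            return True
        return False


def migrate(batches: Iterable[bytes], bucket: TokenBucket,
            send: Callable[[bytes], None],
            sleep: Callable[[float], None] = time.sleep) -> None:
    """Send each batch only once the bucket holds enough tokens for it."""
    for batch in batches:
        if len(batch) > bucket.capacity:
            raise ValueError("batch is larger than the bucket capacity")
        while not bucket.try_consume(len(batch)):
            sleep(0.01)   # back off and let the bucket refill
        send(batch)
```

The bucket caps migration bandwidth at `rate` bytes per second regardless of how much data is queued, which leaves the remaining capacity for production traffic. The `clock` and `sleep` parameters exist so the throttle can be tested without waiting in real time.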


Real Systems

Cassandra uses consistent hashing with virtual nodes. Each physical node owns many virtual nodes (tokens) on the ring: the default was 256 per node through the 3.x releases and was lowered to 16 in Cassandra 4.0. Assigning more tokens to larger machines lets load follow capacity. When a node joins, only the key ranges it takes over migrate to it. Cassandra does not use a centralised routing coordinator: nodes share ring state via gossip, and token-aware client drivers learn the ring from the cluster and route requests directly to a replica.

Amazon DynamoDB uses a centralised routing layer backed by consistent hashing. Partition keys are hashed and mapped to partitions. DynamoDB automatically splits hot partitions — if a partition exceeds its provisioned throughput, the system detects the hotspot and subdivides the partition transparently. This automatic mitigation is not instant; brief hotspot spikes still cause throttling visible to clients as ProvisionedThroughputExceededException.

Google Spanner uses range partitioning: each table is divided into contiguous key ranges called splits, and each split is replicated by its own Paxos group. The system monitors hot ranges and automatically splits them into smaller ranges that can be moved to less-loaded nodes. Because Spanner’s data model supports distributed transactions across partitions (via two-phase commit), range partitioning does not trade away transaction semantics. The cost is latency: cross-partition transactions require coordination across multiple Paxos groups.


Concept: Sharding and Partitioning

Thread: T8 (Divide & Conquer) ← Book 2, Ch 15 (Consistent Hashing) → Book 5, Ch 8 (Distributed Database Design)

Core Idea: Partitioning divides a dataset across machines to scale beyond single-node capacity, with the partitioning strategy determining query expressiveness, load distribution, and rebalancing cost.

Tradeoff: AT5 — Centralisation/Distribution (central routing map vs. gossip-distributed ring state)

Failure Mode: FM6 — Hotspotting (sequential keys on range partitions saturate one shard while others sit idle)

Signal: When one shard’s CPU or I/O is consistently above 80% while other shards are below 20%, the partition key is creating a hot shard.

Maps to: Book 0, Framework 3 (Threads) and Framework 5 (Infrastructure Components)


Exercises

Level 2 — Apply

A social network stores user profiles in a single PostgreSQL table with 200 million rows. The primary key is an auto-incrementing integer. The team decides to shard across 8 PostgreSQL nodes using modulo hash partitioning (shard = user_id % 8).

  1. The team needs to add a ninth shard. Estimate what fraction of rows must move. Write pseudocode for the migration process that keeps the system available during the move.

  2. The team discovers that 80% of reads are for users created in the last 30 days. Design an alternative partitioning scheme that exploits this access pattern. What query patterns does your scheme support that modulo hashing cannot?

  3. A “power user” account generates 10,000 API requests per minute. All requests hash to shard 4. Describe two mitigations. For each, specify what additional data structure or coordination mechanism is required.

  4. The team wants to support the query “find all users registered between 2023-01-01 and 2023-06-30” efficiently. Which partitioning scheme would you choose? What is the tradeoff you accept?

Level 3 — Design

Design the partitioning strategy for a real-time analytics system that ingests 1 million events per second and must answer two types of queries: (1) “what happened to user X in the last 5 minutes?” and (2) “how many events of type Y occurred across all users in the last hour?”

  1. Explain why no single partitioning key satisfies both query types simultaneously. What does this tell you about the system design?

  2. Propose an architecture that handles both query types without requiring a full-cluster scan for either. Your design may use multiple physical data stores with different partitioning strategies. Specify the partition key, the routing mechanism, and the synchronisation mechanism for each store.

  3. The system must add nodes during peak traffic without pausing ingest. Specify the rebalancing protocol. What invariants must hold during the rebalancing window? How would you verify them?

  4. Three months after launch, the team notices that events of type “purchase” are 50× more frequent than any other event type, and all purchase events partition to the same shard under your original design. Describe the hotspot this creates and propose a modification to your design that addresses it without changing the query API.

A complete answer will: (1) correctly explain why user_id and event_type cannot both be partition keys for the two query types and what dual-store architecture this necessitates, (2) identify FM6 (hotspot cascade on the purchase-type shard) as the failure mode and quantify the imbalance (50× normal load), (3) address the AT5 tradeoff between sub-partitioning purchase events (distributes load, requires query-time fan-out) and routing all queries through a central aggregation tier (simpler, creates a bottleneck), and (4) propose a concrete modification — such as salting the partition key for high-frequency event types — that spreads load without requiring the query API to change.
