Calvin Databases

My notes on Calvin Databases.

Introduction

Calvin is a layer that sits above a storage system and provides fast ACID transactions distributed across multiple nodes. These nodes don't share any resources (shared-nothing nodes), so scaling is almost linear (as per the paper). The key insight of Calvin is that:

Once all transactions have been replicated to a majority of nodes, we don't need a separate commit protocol as long as execution is deterministic. Just having a global order of transactions is enough to ensure consistency.

Also note that this global order can be formed by each node by itself without any coordination, provided that they all follow the same rules after receiving the same transactions.
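
To make that concrete, here is a tiny Python sketch (my own illustration, not from the paper): if every replica applies the same globally ordered log with deterministic logic, their states cannot diverge, so no agreement on outcomes is needed.

```python
# Minimal sketch: two replicas apply the same globally ordered transaction log
# with deterministic logic and end up in identical states without coordinating.

def apply_log(log):
    state = {}
    for txn in log:                      # same order on every replica
        for key, delta in txn:           # deterministic effect of each txn
            state[key] = state.get(key, 0) + delta
    return state

global_log = [[("x", 5)], [("x", -2), ("y", 7)], [("y", 1)]]

replica_a = apply_log(global_log)
replica_b = apply_log(global_log)
assert replica_a == replica_b            # states never diverge
```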

Nodes, Replicas & Partitions

In Calvin, nodes are the physical machines that run the Calvin system. Meanwhile, a single replica can span multiple nodes, and each replica is a full copy of the entire data.

The entire data set is split into partitions. In the paper, each partition lives on a single node. Each replica has a copy of every partition, allowing it to reconstruct the entire data set. So if, say, the data is split into 3 partitions and we have 2 replicas, each replica will have a copy of all 3 partitions, giving us a total of 6 nodes.

A group of the same partition across different replicas is referred to as a replication group (I think a better term would be partition group). So partition 1 in replica 1 and partition 1 in replica 2 together form a replication group.
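
A small Python sketch of this layout, with my own illustrative naming (the paper doesn't prescribe any such API):

```python
# 3 partitions x 2 replicas = 6 nodes; a "replication group" is the set of
# nodes holding the same partition across replicas.

NUM_PARTITIONS = 3
NUM_REPLICAS = 2

# a node is identified by (replica, partition)
nodes = [(r, p) for r in range(NUM_REPLICAS) for p in range(NUM_PARTITIONS)]

def replication_group(partition):
    """All nodes that hold `partition`, one per replica."""
    return [(r, partition) for r in range(NUM_REPLICAS)]

def replica_members(replica):
    """All nodes that together hold a full copy of the data."""
    return [(replica, p) for p in range(NUM_PARTITIONS)]

assert len(nodes) == 6
assert replication_group(1) == [(0, 1), (1, 1)]
assert replica_members(0) == [(0, 0), (0, 1), (0, 2)]
```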

Architecture

There are 3 layers in Calvin which exist on each node:

  1. Sequencer: It receives transactions, batches them, replicates them to other nodes in its replication group, and sends them to the scheduler on all nodes within the replica.
  2. Scheduler: Schedules transactions for execution on its node. Even with a fixed global order, we still need scheduling (deterministic locking) so that transactions can execute quickly and concurrently across nodes.
  3. Storage: Storage layer which supports basic CRUD. For simplicity, it is assumed to be a key-value store.
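
As a rough mental model, the three layers on a node could be sketched like this in Python (my own structuring and naming, not the paper's code; replication and locking are omitted here and covered below):

```python
class Storage:                       # basic CRUD, modeled as a dict
    def __init__(self):
        self.kv = {}
    def read(self, key):
        return self.kv.get(key)
    def write(self, key, value):
        self.kv[key] = value

class Scheduler:                     # executes batches against local storage
    def __init__(self, storage):
        self.storage = storage
    def execute_batch(self, batch):
        for txn in batch:
            txn(self.storage)        # txn = deterministic function of state

class Sequencer:                     # batches client transactions per epoch
    def __init__(self, schedulers):
        self.schedulers = schedulers
        self.pending = []
    def submit(self, txn):
        self.pending.append(txn)
    def close_epoch(self):           # replication omitted in this sketch
        batch, self.pending = self.pending, []
        for s in self.schedulers:
            s.execute_batch(batch)

storage = Storage()
seq = Sequencer([Scheduler(storage)])
seq.submit(lambda s: s.write("x", 1))
seq.close_epoch()
assert storage.read("x") == 1
```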

Sequencer

The sequencer receives transactions from clients and is responsible for batching them within a single epoch. The paper uses 10 ms as the epoch length. After batching, it needs to replicate the batch to all other nodes within its replication group. Epochs are synchronized across the entire system.
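
A hedged sketch of one epoch of batching, assuming transactions arrive on an in-process queue (an assumption of mine; the paper doesn't prescribe this interface):

```python
import queue
import time

EPOCH_MS = 10

def run_epoch(incoming, epoch):
    """Collect transactions for one epoch and return the labeled batch."""
    deadline = time.monotonic() + EPOCH_MS / 1000.0
    batch = []
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            batch.append(incoming.get(timeout=remaining))
        except queue.Empty:
            break
    return (epoch, batch)            # the batch is labeled with its epoch number

# usage sketch:
q = queue.Queue()
for i in range(3):
    q.put(("txn", i))
print(run_epoch(q, epoch=0))         # -> (0, [('txn', 0), ('txn', 1), ('txn', 2)])
```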

This replication can be done in 2 ways (a rough sketch of both follows the list):

  1. Asynchronous Replication: Select a master replica, to which all transactions are forwarded. After compiling the batch, each node inside the master replica sends it to the other nodes in its own replication group. This gives better performance, at the cost of a more complex recovery strategy. From the paper:

    This has the advantage of extremely low latency before a transaction can begin being executed at the master replica, at the cost of significant complexity in failover. On the failure of a master sequencer, agreement has to be reached between all nodes in the same replica and all members of the failed node’s replication group regarding (a) which batch was the last valid batch sent out by the failed sequencer and (b) exactly what transactions that batch contained

  2. Synchronous Replication: Synchronize the batch across all nodes in the replication group using a consensus protocol like Paxos or Raft.
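
Here is an illustrative-only contrast of the two modes, with in-process calls standing in for the network and a trivial ack function standing in for Paxos/Raft (all names are mine, not Calvin's API):

```python
def replicate_async(batch, peers):
    """Asynchronous mode: the master replica fires the batch at its
    replication-group peers and can start executing immediately."""
    for peer in peers:
        peer.append(batch)            # no wait for acknowledgements
    return True                       # execution begins right away

def replicate_sync(batch, peers, send_and_ack):
    """Synchronous mode: stand-in for a consensus round -- only proceed once
    a majority of the replication group has durably accepted the batch."""
    acks = 1                          # this node's own copy counts
    for peer in peers:
        if send_and_ack(peer, batch): # in reality: Paxos/Raft log replication
            acks += 1
    return acks > (len(peers) + 1) // 2

def deliver(peer, batch):
    peer.append(batch)
    return True                       # simulated durable acknowledgement

peer_logs = [[], []]
print(replicate_async(["t1", "t2"], peer_logs))      # True immediately
print(replicate_sync(["t3"], peer_logs, deliver))    # True once a majority acks
```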

Within a replica, all the sequencers, after compiling their batch, will share their batch with all schedulers in the same replica.

Each scheduler will merge the batches in a deterministic round-robin fashion, so we don't need to synchronize the global order of transactions, just the order within each partition's batch. Note that this is independent of the replication mode. To merge batches in this round-robin fashion, there needs to be some coordination between the sequencers to figure out which batch belongs to which epoch.
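
A sketch of the deterministic round-robin interleaving as I understand it (my reconstruction, not the paper's code): given each sequencer's batch for one epoch, every node derives the same global order without talking to anyone.

```python
def interleave_epoch(batches_by_sequencer):
    """batches_by_sequencer: dict {sequencer_id: [txn, ...]} for one epoch."""
    order = []
    # iterate sequencers in a fixed (sorted) order, taking one txn at a time
    batches = {sid: list(b) for sid, b in sorted(batches_by_sequencer.items())}
    while any(batches.values()):
        for sid in sorted(batches):
            if batches[sid]:
                order.append(batches[sid].pop(0))
    return order

epoch_batches = {
    "seq-0": ["a1", "a2", "a3"],
    "seq-1": ["b1"],
    "seq-2": ["c1", "c2"],
}
# every node computes the same order: a1, b1, c1, a2, c2, a3
print(interleave_epoch(epoch_batches))
```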

Another thing to note is that, for replication, there will be one master sequencer per replication group/partition. So, for example, in synchronous mode there will be a consensus protocol running per partition, each with a master sequencer. In asynchronous mode, all sequencers within the master replica will be "master sequencers" (the term master sequencer is not mentioned in the paper, but I think it is implied).

Scheduler

Each scheduler is sent only a partial view of each epoch's batches - only the transactions that run on its partition - and so it will only handle locking for keys in its own partition. For transactions that run on a single partition, we don't need to do anything special.

But if a transaction spans multiple partitions, we send it to all the partitions it touches. For such transactions, each scheduler will again only deal with locking for keys in its own partition. Since the transaction is sent to every participating node, each node can independently handle the part of the transaction that runs on its partition. So each node applies the transaction's writes to its own partition, and for reads, each participating node sends the data it read to all other participating nodes. This way, schedulers need to do minimal communication.
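
A hedged, in-process sketch of this execution scheme (the names, the toy partition map, and the two-phase structure are mine, not Calvin's API):

```python
# partition assignment assumed fixed and known to every node (toy map here)
PARTITION_OF = {"a": 0, "b": 1}

def local_read_phase(my_partition, txn, local_store):
    """Phase 1: read the part of the transaction's read set stored locally."""
    return {k: local_store[k] for k in txn["read_set"]
            if PARTITION_OF[k] == my_partition}

def apply_phase(my_partition, txn, local_store, full_view):
    """Phase 2: after all participants have exchanged their reads, every node
    deterministically computes the writes and applies only its local ones."""
    for key, value in txn["logic"](full_view).items():
        if PARTITION_OF[key] == my_partition:
            local_store[key] = value

stores = {0: {"a": 10}, 1: {"b": 5}}                        # one dict per partition
txn = {"read_set": ["a", "b"],
       "logic": lambda view: {"a": view["a"] + view["b"]}}  # a := a + b

shared_reads = {}
for p in (0, 1):                       # each participant broadcasts its reads
    shared_reads.update(local_read_phase(p, txn, stores[p]))
for p in (0, 1):                       # each participant applies local writes
    apply_phase(p, txn, stores[p], shared_reads)
assert stores[0]["a"] == 15 and stores[1]["b"] == 5
```

Because every participant receives the same transaction and ends up with the same full read view, they all compute the same writes deterministically, so no agreement on the outcome is needed.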

Within each node, the scheduler only allows a transaction to acquire a lock on a key once every earlier transaction in the global order that touches that key has acquired and released its lock on it. This way, we guarantee deterministic execution. But this also means that we need to know the read/write set of each transaction within a partition and request locks in the order of the global sequence. This does not need to happen for the whole batch before executing the first transaction, though; we can process lock requests transaction by transaction as we go.
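
A simplified sketch of such a lock manager (my own simplification): lock requests are enqueued strictly in global order, and a transaction runs only once it is at the head of the queue for every key it touches.

```python
from collections import defaultdict, deque

class DeterministicLockManager:
    def __init__(self):
        self.queues = defaultdict(deque)     # key -> FIFO of txn ids

    def request(self, txn_id, keys):
        """Called once per transaction, in global order."""
        for key in keys:
            self.queues[key].append(txn_id)

    def can_run(self, txn_id, keys):
        return all(self.queues[key][0] == txn_id for key in keys)

    def release(self, txn_id, keys):
        for key in keys:
            assert self.queues[key][0] == txn_id
            self.queues[key].popleft()

lm = DeterministicLockManager()
lm.request("t1", ["x", "y"])     # t1 precedes t2 in the global order
lm.request("t2", ["y"])
assert lm.can_run("t1", ["x", "y"]) and not lm.can_run("t2", ["y"])
lm.release("t1", ["x", "y"])
assert lm.can_run("t2", ["y"])
```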

If a transaction needs to perform a read before it can determine its full read/write set, Calvin first runs it as a cheap read-only reconnaissance transaction, determines the full read/write set, and then sends the transaction to the sequencer again. If your use case depends a lot on such queries, Calvin isn't for you.
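
A rough sketch of this reconnaissance step (structure and names are mine): a read-only pass discovers the read/write set, and at execution time the transaction checks that the set is still valid, restarting (resubmitting) if the data it depends on changed in between.

```python
store = {"ptr": "x", "x": 1, "y": 100}

def keys_needed(s):                    # read/write set depends on the data
    return ["ptr", s["ptr"]]

predicted = keys_needed(store)         # reconnaissance pass: ["ptr", "x"]

def dependent_txn(s):
    if keys_needed(s) != predicted:    # set changed since reconnaissance
        return "restart"               # resubmit to the sequencer
    s[s["ptr"]] += 1                   # the actual write
    return "done"

print(dependent_txn(store))            # "done"; store["x"] is now 2
```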

Checkpointing

Naïvely, you'd freeze the entire node and save its state to disk. To avoid this, Calvin instead relies on the global serial order.

Calvin picks a point within the global order of transactions and maintains a copy of the partition in memory. All transactions before this point are allowed to modify the copy, while transactions after this point can only read from the copy and not modify it. The checkpoint can then be taken asynchronously from this copy, after which the copy is discarded. If you look at the code, you'll see that it instead uses per-key versioning, where old versions point to the next version in a linked list. This could probably be improved further with some form of auto-checkpointing when new versions are created, but that is not discussed in the paper and is only mentioned as a comment in the code.
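
A hedged sketch of the idea, simplified to two versions per key rather than the linked list of versions used in the code (all naming is mine):

```python
class VersionedStore:
    """Two-version simplification of per-key versioning for checkpointing."""
    def __init__(self):
        self.stable = {}              # versions as of the checkpoint point
        self.fresh = {}               # versions written after that point
        self.in_checkpoint = False

    def write(self, key, value):
        if self.in_checkpoint:
            self.fresh[key] = value   # leave the frozen versions untouched
        else:
            self.stable[key] = value

    def read(self, key):
        return self.fresh.get(key, self.stable.get(key))

    def begin_checkpoint(self):
        self.in_checkpoint = True
        return dict(self.stable)      # frozen view; write it out asynchronously

    def end_checkpoint(self):
        self.stable.update(self.fresh)  # fold post-point versions back in
        self.fresh.clear()
        self.in_checkpoint = False

store = VersionedStore()
store.write("x", 1)
snapshot = store.begin_checkpoint()   # checkpointer reads this frozen view
store.write("x", 2)                   # post-point write creates a new version
assert snapshot["x"] == 1 and store.read("x") == 2
store.end_checkpoint()
```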

Further Reading & References