You could also watch my video on the same topic: part 1 and part 2
Concept of Data Replication
This section is about replication in general. All other sections are specifically about leader-follower replication.
Replication in distributed systems is essentially a technique that allows storing the same piece of data on multiple machines.

Replicated Agent Smith
Replication provides the following benefits:
- Scalability
  - increase the number of replicas and distribute operations between them; the system can serve more clients than with only one replica.
- Availability
  - when one replica fails, clients can use the other replicas, so the system remains available.
- Performance
  - the system works faster by spreading load across multiple replicas.
- Keeping data close to clients
  - avoids delays caused by the physical limit of the speed of light.
- Caching
  - data can be placed very close to clients to minimize costly remote calls.
- Fault tolerance / reliability
  - when a replica fails, data is not lost.
- Durability
  - once data is written, it is durable because it is copied to multiple places.
That is an impressive list of benefits, but they do not come for free. The fundamental problem that arises is how to keep data consistent across replicas. There is no easy solution, and multiple trade-offs have to be made. Even the definition of consistency is not straightforward, and there are in fact different consistency models.
There are a number of different approaches to implementing replication, and in this article we will focus specifically on the leader-follower replication technique.
Key Idea of Leader-Follower Replication
Probably the easiest replication technique, from the perspectives of both understanding and implementation, is leader-follower replication.
There is no standard terminology, and there are a few names for this technique. All of them mean basically the same thing:
- leader-follower
- primary-secondary
- master-slave
The main idea is that there is a single node (leader, master, primary) which accepts writes and sends a stream of changes to the other nodes (followers, read replicas, secondaries, backups, hot standbys), which can be used for reads only.
Leader-follower replication solves two major challenges of replication: global ordering and conflict resolution. The leader node enforces an order on every write, and conflicts are avoided because all writes go through a single leader.
Conceptual Diagram of Leader-Follower Replication
When a client writes data to the leader, the leader is responsible for replicating it to the followers.
Reads can be handled by both the leader and the followers. Reading from followers lets read queries scale horizontally, but it might introduce inconsistencies due to replication lag.
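To make replication lag concrete, here is a minimal Python sketch. It is only an illustration and not how any particular database implements replication: the `Node` and `Leader` classes, the in-memory log and the explicit `replicate` call are all assumptions made for this example.

```python
class Node:
    """A replica: a key-value store plus the number of log entries applied."""

    def __init__(self, name):
        self.name = name
        self.data = {}
        self.applied = 0


class Leader(Node):
    """The only node that accepts writes; it keeps an ordered log of changes."""

    def __init__(self, name):
        super().__init__(name)
        self.log = []

    def write(self, key, value):
        # The leader fixes the order of writes by appending to its log.
        self.log.append((key, value))
        self.data[key] = value
        self.applied = len(self.log)

    def replicate(self, follower):
        # Ship every change the follower has not applied yet, in log order.
        for key, value in self.log[follower.applied:]:
            follower.data[key] = value
        follower.applied = len(self.log)


leader = Leader("leader")
follower = Node("follower-1")

leader.write("x", 1)
leader.replicate(follower)
leader.write("x", 2)                 # acknowledged, but not replicated yet

stale_read = follower.data["x"]      # the follower still sees the old value
fresh_read = leader.data["x"]
leader.replicate(follower)           # the follower catches up
caught_up_read = follower.data["x"]
print(stale_read, fresh_read, caught_up_read)  # 1 2 2
```

Between the second write and the second `replicate` call, a read served by the follower returns the old value. That window is the replication lag.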
Leader-follower replication is used by a host of well-known systems:
- MySQL
- MariaDB
- PostgreSQL
- Oracle Database
- Microsoft SQL Server
- MongoDB
- Redis
- Elasticsearch
- IBM DB2
- Apache Kafka
- RabbitMQ
This scheme has several variations, which are described below.
Replication Modes and Topologies
Synchronous Followers
When a client sends a write request, the leader sends the data change to the followers synchronously. Only when all synchronous followers have responded does the leader consider the operation completed and acknowledge the client’s request.
Sequence Diagram for Synchronous Replication
As a result, followers always have the same state as the leader. If the leader fails, no data is lost, because exactly the same data is available on the followers. Also, a read from a follower always returns the same data as a read from the leader.
The trade-off here is performance and availability. Because the leader has to wait until all followers acknowledge the request, the response time is determined by the slowest follower. Also, if even one synchronous follower is unavailable, the leader will not be able to acknowledge the client’s write request at all.
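The effect on latency and availability can be sketched with a few lines of Python. This is a simplified model under stated assumptions, not a real replication protocol: the function name, the millisecond figures, and the convention that `None` means "unreachable" are all made up for illustration, and followers are assumed to be contacted in parallel.

```python
def synchronous_ack_time_ms(leader_write_ms, follower_rtt_ms):
    """Time until the client is acknowledged under synchronous replication.

    follower_rtt_ms maps a follower name to its round-trip time in ms,
    or to None when that follower is unreachable.
    """
    if any(rtt is None for rtt in follower_rtt_ms.values()):
        # A single missing acknowledgement blocks the write.
        raise TimeoutError("a synchronous follower is unavailable")
    # Followers are contacted in parallel, so the slowest one dominates.
    return leader_write_ms + max(follower_rtt_ms.values(), default=0)


healthy = synchronous_ack_time_ms(2, {"f1": 5, "f2": 40})   # 2 + 40 = 42

try:
    synchronous_ack_time_ms(2, {"f1": 5, "f2": None})
    write_blocked = False
except TimeoutError:
    write_blocked = True
```

In this model the fast follower `f1` does not help: the client waits for `f2`, and once `f2` is unreachable the write cannot be acknowledged at all.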
Asynchronous Followers
To mitigate the drawbacks of synchronous followers, an asynchronous scheme can be used. With asynchronous replication, the leader acknowledges the client’s request immediately after writing it to its own storage, and then updates the followers asynchronously.
Sequence Diagram for Asynchronous Replication
This improves performance, as the time to acknowledge the request depends only on the leader. Also, a follower’s failure doesn’t prevent writes on the leader.
The trade-offs are consistency and durability. Because the leader acknowledges the request before it is sent to any follower, if the leader crashes right after processing the request, the change will be lost. Additionally, because updates propagate asynchronously, followers lag behind the leader, and reads from them can return stale data. Thus such a system tends to provide a weaker consistency model than a system with synchronous followers.
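The lost-write scenario can be reproduced in a small Python sketch. The `AsyncLeader` class and its `ship_to` method are hypothetical names used only for this illustration; real systems ship changes through a replication log or stream rather than a method call.

```python
class AsyncLeader:
    """Leader that acknowledges a write before any follower has seen it."""

    def __init__(self):
        self.log = []        # stored on the leader only
        self.shipped = 0     # how many log entries were sent to the follower

    def write(self, value):
        self.log.append(value)
        return "ack"         # the client is acknowledged immediately

    def ship_to(self, follower_log):
        # Background replication: send whatever has not been shipped yet.
        follower_log.extend(self.log[self.shipped:])
        self.shipped = len(self.log)


leader = AsyncLeader()
follower_log = []

for value in ["w1", "w2", "w3"]:
    leader.write(value)
leader.ship_to(follower_log)

ack = leader.write("w4")     # acknowledged to the client...
# ...and the leader crashes here, before the next ship_to() call.

lost_writes = leader.log[len(follower_log):]
print(ack, follower_log, lost_writes)  # ack ['w1', 'w2', 'w3'] ['w4']
```

The client received an acknowledgement for `w4`, yet after the crash the follower, which is the only surviving copy, does not contain it.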
Synchronous vs Asynchronous Replication
The trade-offs between synchronous and asynchronous replication are summarized in the table below.

| Property | Synchronous Replication | Asynchronous Replication |
|---|---|---|
| Consistency | All replicas are always in sync. | Replicas can lag behind the leader. |
| Durability | Once the data is acknowledged by the leader, it is durable, because it is available on all followers. | If the leader fails after acknowledgment, the data can be lost. |
| Performance | Worse write performance, as writes require synchronous calls to each follower. | Good write performance, because writes don’t depend on followers. |
| Availability | If a single follower fails, the whole system becomes unavailable for writes, as the leader will not be able to acknowledge write requests. | Failure of any number of followers has no impact on the leader’s ability to accept write requests. |
Combined Replication
In practice, a combined scheme is often used [1], where one follower is synchronous and the others are asynchronous. In that case the data is guaranteed to be consistent on at least two nodes: the leader and the synchronous follower. If the synchronous follower fails, one of the asynchronous followers is made synchronous.
Sequence Diagram for Combined Replication
Such an approach provides a balance between all the properties, and by selecting the number of synchronous and asynchronous followers we can control the properties of the system.
Most systems that support leader-follower replication support this combined mode and let you configure the number of synchronous followers.
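A rough Python sketch of the combined mode is shown below. The `rebalance` and `ack_time_ms` functions, the follower names and the latency numbers are assumptions made for illustration; real systems expose this through their own configuration rather than through code like this.

```python
def rebalance(sync, asynchronous, healthy, sync_count=1):
    """Return new (sync, asynchronous) follower lists.

    Unhealthy followers are dropped, and asynchronous followers are promoted
    in list order until `sync_count` synchronous followers exist again.
    """
    sync = [f for f in sync if f in healthy]
    asynchronous = [f for f in asynchronous if f in healthy]
    while len(sync) < sync_count and asynchronous:
        sync.append(asynchronous.pop(0))
    return sync, asynchronous


def ack_time_ms(leader_write_ms, rtt_ms, sync):
    # Only the synchronous followers are on the client's critical path.
    return leader_write_ms + max((rtt_ms[f] for f in sync), default=0)


rtt_ms = {"f1": 5, "f2": 12, "f3": 90}
sync, asynchronous = ["f1"], ["f2", "f3"]
before = ack_time_ms(2, rtt_ms, sync)                      # 2 + 5

# f1 fails, so f2 is promoted to be the synchronous follower.
sync, asynchronous = rebalance(sync, asynchronous, healthy={"f2", "f3"})
after = ack_time_ms(2, rtt_ms, sync)                       # 2 + 12
```

The slow follower `f3` never affects write latency while it stays asynchronous, and the write path survives the loss of `f1` because another follower takes over the synchronous role.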
Leader per Shard Approach
There is a variation of this scheme in which the data is split into partitions and each partition has its own independent leader and set of followers.
Conceptual Diagram for Leader Per Shard
Although there are multiple leaders overall, each partition has only one leader, so the scheme keeps almost all the properties of conventional leader-follower replication.
Partitions (shards) are distributed across different physical nodes, so each node can hold several shards and can be the leader of one shard and a follower of another.
This approach allows horizontal scaling: it distributes the write load across multiple nodes and reduces the blast radius of a single node outage. The downside is that data changes across partitions are no longer totally ordered, i.e. we can’t reliably say which of two operations on different shards was executed earlier. This means that the ordering guarantees provided by a single leader hold only within a shard, not across shards. Also, there can be restrictions or difficulties in executing transactions that involve data from multiple shards.
The best-known examples of real systems that use this design are Elasticsearch and Kafka. In Kafka terminology, nodes are called brokers and shards are called partitions.
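The following Python sketch shows one way a key could be mapped to the leader of its shard. Everything here is hypothetical: the `PLACEMENT` table, the node names and the use of CRC32 as the hash are assumptions for the example and are not the partitioning schemes that Kafka or Elasticsearch actually use.

```python
import zlib

NUM_SHARDS = 4

# Hypothetical placement: shard id -> (leader node, follower nodes).
# Every node leads some shards and follows others.
PLACEMENT = {
    0: ("node-a", ["node-b", "node-c"]),
    1: ("node-b", ["node-c", "node-a"]),
    2: ("node-c", ["node-a", "node-b"]),
    3: ("node-a", ["node-c", "node-b"]),
}


def shard_for(key):
    # A stable hash keeps a given key on the same shard across processes.
    return zlib.crc32(key.encode("utf-8")) % NUM_SHARDS


def leader_for(key):
    # Writes for a key go to the single leader of that key's shard.
    leader, _followers = PLACEMENT[shard_for(key)]
    return leader


write_target = leader_for("user:42")
```

Each key still has exactly one leader, which is why ordering and conflict avoidance are preserved per shard, while different keys can be written on different nodes at the same time.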
Local vs Remote Write
In [2], leader-follower systems are classified into systems with local writes and systems with remote writes. All examples presented in the article so far used remote writes. The name comes from the fact that to make a write, we need to send the request to the remote leader. Such an approach introduces overhead on writes, especially if a single client needs to make a lot of consecutive changes.
Systems with local writes solve this by temporarily transferring leadership to the node, or even the client, which needs to make a series of writes. The system remains a single-leader system; the leader is just moved to the place where the writes are done.
In the illustrations below we depict just a single leader, but in reality it could be the leader of a single partition or of a single piece of data which is going to be updated.
For example, let’s assume that we want to make a heavy write, or a series of writes, so that connecting to the leader for each write would be too slow. The leader could, for instance, be located in another region, so that each request to it incurs considerable latency. In that case the client can just send the request to a nearby follower.
Local Write: Follower receives write request
After receiving the request, the follower initiates a leader switchover procedure to transfer the leadership role locally, to the node which wants to make the write.
Local Write: Procedure to switch leader is initiated
After reconfiguration, the system looks like the figure below. The follower has become the leader, and the previous leader has stepped down to become a follower.
Local Write: Leader is switched over
And now the new leader can perform the write and distribute all the changes across followers.
Local Write: write is performed locally on a new leader
This technique is not used very often in practice with conventional leader-follower database systems and is mostly of theoretical interest, because switchover and re-election of the leader is a very complex process with a lot of pitfalls and edge cases.
It makes sense to implement it when the overhead of the leader switchover is less than the overhead of sending lots of remote write requests.
A similar technique is, however, implemented in NFS v4, where it is called delegation. When a file or a directory needs to be updated, leadership over that specific file or directory can be temporarily delegated to the client which wants to make the write. If another client wants to update the same piece of data, the temporary leadership can be revoked from the first client. This is described in more detail in the RFC describing NFSv4.
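The break-even condition for local writes (switchover overhead versus the overhead of many remote writes) can be written down as simple arithmetic. The Python sketch below assumes a fixed cost per remote write, a fixed cost per local write and a one-off switchover cost; the function name and all numbers are made up for illustration.

```python
import math


def min_writes_to_justify_switchover(remote_write_ms, local_write_ms, switchover_ms):
    """Smallest number of consecutive writes for which moving the leader pays off.

    Remote cost of n writes: n * remote_write_ms
    Local cost of n writes:  switchover_ms + n * local_write_ms
    """
    saving_per_write_ms = remote_write_ms - local_write_ms
    if saving_per_write_ms <= 0:
        return None  # local writes are not cheaper, so switching never pays off
    # Need n * saving > switchover, i.e. the smallest integer n above the ratio.
    return math.floor(switchover_ms / saving_per_write_ms) + 1


# Made-up numbers: 80 ms per cross-region write, 2 ms locally, 5 s switchover.
break_even = min_writes_to_justify_switchover(80, 2, 5000)
print(break_even)  # 65
```

With these numbers, moving the leader only pays off for a burst of at least 65 writes, which illustrates why the technique is attractive mainly for long series of writes from one place.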
Cascading Replication
Sometimes cascading replication is used, in which there are several tiers of followers. The first tier gets the replication stream directly from the leader, and each subsequent tier gets updates from a follower in the previous tier.
Cascading Replication
This approach can be used when there are hundreds or thousands of followers, as it reduces the load on the leader node. In a geographically distributed system, there could be one follower per data center which fans out the changes to the other followers in the same data center.
The problems with such an architecture are the complexity of maintaining the structure and the possibility that downstream followers lag significantly behind the leader. Also, if one of the top-level followers fails, it can leave the whole tree beneath it without updates.
Examples of this approach (just some of them) are PostgreSQL cascading replication and Oracle cascaded standby.
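The properties of a cascading topology can be explored with a small Python sketch. The `UPSTREAM` map and the node names are hypothetical; the sketch only models who replicates from whom and does not move any data.

```python
# Hypothetical topology: each follower maps to the node it replicates from.
UPSTREAM = {
    "dc1-relay": "leader",
    "dc2-relay": "leader",
    "dc1-f1": "dc1-relay",
    "dc1-f2": "dc1-relay",
    "dc2-f1": "dc2-relay",
}


def direct_streams(node):
    """Followers that receive their replication stream directly from `node`."""
    return sorted(name for name, up in UPSTREAM.items() if up == node)


def hops_from_leader(node):
    """Number of replication hops a change travels to reach `node`."""
    hops = 0
    while node != "leader":
        node = UPSTREAM[node]
        hops += 1
    return hops


def cut_off_by(failed):
    """All followers that stop receiving updates when `failed` goes down."""
    affected, frontier = [], [failed]
    while frontier:
        children = direct_streams(frontier.pop())
        affected.extend(children)
        frontier.extend(children)
    return sorted(affected)


leader_load = direct_streams("leader")    # only two streams leave the leader
depth = hops_from_leader("dc1-f2")        # every extra hop can add lag
orphans = cut_off_by("dc1-relay")         # the subtree below a failed relay
```

The leader serves two streams instead of five, at the price of two hops to the leaf followers and of losing the whole `dc1` subtree if its relay fails.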
Discovery and Routing
Obviously, to send writes to the leader, something has to know where that leader is. Depending on which component has that knowledge, there are several ways in which client requests can be routed.
There is no best or worst method; each of them has its pros and cons. In most cases the choice is dictated by the selected database.
Client
Here the client must always know the topology of the cluster and route each request to the appropriate node. Writes must always be sent to the leader. Where to send read requests depends on the application logic.
Client Responsible for routing requests
Examples of this approach are MongoDB and Kafka.
This scheme is the simplest from the server implementation point of view, because it moves all routing logic to the client itself. Because the client establishes a connection directly with the required node, latency is low. Also, there is no need for extra infrastructure components and the costs associated with them.
The main drawback comes from the fact that this logic has moved to the client side, which makes the client more complex. It also leads to potential version compatibility issues between client libraries and the server. In some cases it could lead to security issues, because the cluster topology is exposed to the client.
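A minimal Python sketch of client-side routing is shown below. The `RoutingClient` class is hypothetical and only returns the name of the node a request would be sent to; real client libraries additionally discover the topology from the cluster and manage connections.

```python
import itertools


class RoutingClient:
    """Client that knows the cluster topology and picks the target node itself."""

    def __init__(self, leader, followers):
        self.update_topology(leader, followers)

    def update_topology(self, leader, followers):
        # Called at start-up and whenever the client learns about a failover.
        self.leader = leader
        self.followers = list(followers)
        # Reads rotate over followers; fall back to the leader if there are none.
        self._read_targets = itertools.cycle(self.followers or [leader])

    def route(self, operation):
        if operation == "write":
            return self.leader          # writes always go to the leader
        return next(self._read_targets)


client = RoutingClient("n1", ["n2", "n3"])
targets = [client.route(op) for op in ["write", "read", "read", "read"]]

client.update_topology("n2", ["n3"])    # n1 failed and n2 was promoted
after_failover = client.route("write")
```

The sketch also shows where the complexity mentioned above comes from: every client has to be told about each topology change, otherwise it keeps sending writes to a node that is no longer the leader.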
Router
Here there is a routing layer which has the knowledge about the topology.
Router Responsible for routing requests
Examples include HAProxy, Patroni and MySQL Router.
This relieves the client of all the complexity related to discovery and routing, allows centralized configuration management, and makes it possible to implement complex routing logic controlled on the server side. Additionally, the router can keep pools of connections to the nodes and thus save the time needed to establish connections.
The trade-offs are a potential single point of failure, the need to scale the router component, added latency due to an additional network hop for each request, and the potential complexity of the router component. Apart from that, it is an extra component which brings costs for maintenance, infrastructure, etc.
DNS
In this approach we avoid having a central component which sits between the client and the nodes. Instead, we reuse the DNS infrastructure to map a domain name, which the client uses, to the current leader. For this to work we need an additional component which monitors the cluster state and updates DNS according to changes in the cluster.
Although this approach uses two components, DNS and the server which configures it, they are not as heavily loaded as the router in the previous approach. DNS reconfiguration happens only when the topology changes, which usually does not happen very often, and the client caches DNS information, so there is no DNS call for every request.
DNS Based Routing
An example is Microsoft SQL Server Always On replication. It has an abstraction called the Availability Group Listener, which consists of a DNS name and a virtual IP which points to the current leader.
The main advantage is that it does not require an additional component on the critical path, and caching is built in. In general, that simplifies client and server configuration.
But it has downsides as well: there are delays in the propagation of topology updates, the TTLs of the caches have to be managed, and it is impossible to implement complex routing for requests.
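The propagation delay caused by cached DNS answers can be illustrated with a short Python sketch. It does not perform real DNS lookups: the `CachingResolver` class, the record name, the addresses and the explicit `now_s` clock are all assumptions chosen to keep the example deterministic.

```python
class CachingResolver:
    """Client-side name cache with a TTL, driven by an explicit clock."""

    def __init__(self, dns_records, ttl_s):
        self.dns = dns_records   # shared dict: name -> address, updated by the monitor
        self.ttl_s = ttl_s
        self.cache = {}          # name -> (address, expires_at)

    def resolve(self, name, now_s):
        cached = self.cache.get(name)
        if cached and now_s < cached[1]:
            return cached[0]     # still within the TTL: no DNS call is made
        address = self.dns[name]
        self.cache[name] = (address, now_s + self.ttl_s)
        return address


NAME = "db-leader.example.internal"      # hypothetical record for the leader
dns = {NAME: "10.0.0.1"}
resolver = CachingResolver(dns, ttl_s=30)

first = resolver.resolve(NAME, now_s=0)       # cached until t = 30
dns[NAME] = "10.0.0.2"                        # failover at t = 5: the monitor updates DNS
stale = resolver.resolve(NAME, now_s=10)      # the client still targets the old leader
refreshed = resolver.resolve(NAME, now_s=30)  # the TTL has expired, new leader is seen
```

A shorter TTL narrows the window in which clients keep using the old leader, at the price of more DNS queries. That is the TTL management problem mentioned above.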
Node
Each node knows the topology and forwards client requests to the right node. The client in turn connects to any node (possibly the closest one) and always sends requests to that specific node.
Node Based Routing
This approach is chosen by Elasticsearch.
Here we keep the client implementation simple, and there are no extra costs for a routing component and no single point of failure. We can also implement complex routing on the nodes.
The drawbacks are that it makes the node implementation more complex, that there are additional network hops on each request, and that there is a lot of node-to-node communication, which can be hard to debug and monitor and which consumes a lot of network bandwidth between the nodes.
Addressing Common Challenges
Bootstrapping New Followers
Since the data is constantly updated, how do we set up a new follower?
Setting Up New Follower: Initial State
One option is to make the leader read-only (or stop it), copy the data, and then start both nodes. This approach works in some situations; for example, some systems are stopped for maintenance every weekend. But in general it is not practical.
So instead, a slightly more complex sequence of steps is performed. First we take a consistent snapshot of the leader at some point in time and transfer this snapshot to the follower.
Setting Up New Follower: Transfer Consistent Snapshot To New Follower
While the consistent snapshot is being taken and transferred there are ongoing writes, which need to be applied later incrementally. So after the snapshot is applied on the follower, the follower connects to the leader and requests all writes made since the snapshot was taken. Usually the leader has some kind of replication log, and the snapshot is associated with an exact position in that log rather than just a timestamp.
Setting Up New Follower: Catching Up
When the follower has processed all the incremental data from the leader, we say that it has caught up. It can now process the regular replication flow from the leader and can be used for queries.
Setting Up New Follower: Completed
Followers Failure and Catch-Up Recovery
During ongoing replication the follower keeps a log of data changes, which is in sync with the leader’s log in the case of synchronous replication or lags behind it in the case of asynchronous replication.
Catch-Up Recovery: Initial State
If the follower fails, or there is a problem with the replication process or the connection, the follower falls behind the leader and misses the latest writes.
Catch-Up Recovery: Failure
When the follower is back online, it sends the leader the last position in the replication log which it processed, and the leader sends all changes since that position. The process is essentially the same as the catch-up step when setting up a new follower.
Catch-Up Recovery: Catch-Up
Once all missing changes have been processed, we say that the follower has caught up. The leader continues sending it all current data changes, and the follower can be used again.
Catch-Up Recovery: Caught-Up
Leader Failures and Failover Process
While handling a follower failure is fairly simple, handling a leader failure is a much more complex and tricky procedure. The process of detecting that the leader has failed and switching to a new leader is called failover.
Failover can be manual or automatic. Some systems support only manual failover. For example, Postgres and MySQL do not have automatic failover functionality by default and need additional tools to perform the failover. Below is an illustrative example of how automatic failover could happen. It does not describe any specific system but rather gives a general idea of the failover process.
First, let’s assume that we have a system with one leader and three followers, as shown in the figure below. Clients constantly write to the leader, and those writes are asynchronously replicated to the followers.
Because the process is asynchronous, followers may lag behind. In the picture the leader has writes up to write 6, follower S2 has all 6 writes, S3 has only 3 writes, and S4 has 5 writes.
Failover: System before failure
Then the leader fails. Clients can no longer write, and the replication process stops.
Failover: Leader Crashed
Then the followers detect the failure. Usually this happens by timeout: either the leader has not sent heartbeat messages or it has not responded to periodic pings from the followers.
Failover: Followers detected the failure
After the followers have detected the leader’s failure, they initiate a process called leader election. Leader election is a complex process, and there are quite a few algorithms for it.

| System | Leader Election Algorithm |
|---|---|
| Oracle with Oracle Data Guard | There is no leader election in the usual sense. Instead, a separate Observer process monitors the nodes and chooses the leader. |
| Microsoft SQL Server with Always On Availability Groups | Quorum-based leader election |
| MongoDB | Raft-based algorithm |
| Redis with Redis Sentinel | Quorum-based leader election algorithm |
| Elasticsearch | Paxos consensus |
| IBM DB2 | Its own protocol called HADR |
| Apache Kafka | ZooKeeper was used in older versions; newer versions use a Raft-based protocol called KRaft. |
| RabbitMQ | Raft consensus |

A description of leader election algorithms is out of scope for this article; it’s a big separate topic.
Failover: Leader Election
As a result of the election, in our example S2 was promoted to leader. Often the follower with the most up-to-date state has priority for promotion. In our example S2 has all the updates that the leader had before it failed.
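The rule of preferring the most up-to-date follower can be sketched in a few lines of Python. This is not a leader election algorithm: it only shows how candidates could be ranked by log position, using the positions from the example, and it ignores the agreement between nodes that a real election requires. The function name and the tie-breaking rule are assumptions.

```python
def pick_new_leader(log_positions):
    """Return the follower with the highest replication-log position.

    Names are sorted first so that ties are broken deterministically.
    """
    return max(sorted(log_positions), key=lambda name: log_positions[name])


# Positions from the example: the failed leader had writes up to write 6.
positions = {"S2": 6, "S3": 3, "S4": 5}
new_leader = pick_new_leader(positions)
missing_on_new_leader = 6 - positions[new_leader]
print(new_leader, missing_on_new_leader)  # S2 0
```

Had S2 been unavailable, S4 would have been chosen and write 6 would be missing on the new leader.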
After the new leader is elected, all other followers need to be reconfigured to receive the replication stream from the new leader, and clients need to be reconfigured to use the new leader for writes (and potentially reads).
Failover: System Uses New Leader
But this is not the end of the process yet, because the old leader might recover: it could be brought back up, or its network connection could be re-established. Since the old leader was down during the election, it does not know that there is a new leader. So it needs to discover this and step down.
Failover: Old Leader Detects The New Leader
Next, the old leader becomes a follower and starts processing updates from the new leader.
Failover: Old leader stepped down
So far we have focused on the positive scenario. The following section considers several common problems.
Potential Pitfalls
Failure Detection
First of all, the failure of the leader has to be detected reliably.
Failures in general can be classified into several types. According to [8], there are the following types:
| Type of Failure | Description of server’s behaviour |
|---|---|
| Crash failure | Halts, but is working correctly until it halts |
| Omission failure | Fails to respond to incoming requests |
| Timing failure | Response lies outside a specified time interval |
| Response failure | Response is incorrect |
| Arbitrary failure | May produce arbitrary responses at arbitrary times |
Normally, in a leader-follower system we are able to handle only the first three types of failures from the table above, and we are not able to distinguish between them. Basically, the only known failure detectors are based on timeouts: either the leader sends heartbeats, or the followers actively “ping” the leader to check its liveness. If the followers are not able to confirm that the leader is alive within some timeout, they consider it failed and start the failover procedure.
Selecting the timeout after which the leader is considered dead is a hard question:
- A short timeout gives a shorter recovery time when the leader has truly failed. But if the leader is just experiencing a temporary glitch, there is overhead from an unnecessary failover.
- A longer timeout prolongs recovery time, but at the same time prevents failover in the case of short temporary glitches.
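The timeout trade-off can be demonstrated with a small heartbeat-based detector in Python. The `HeartbeatDetector` class, the timeout values and the timeline are assumptions for illustration; time is passed in explicitly so that the example is deterministic.

```python
class HeartbeatDetector:
    """Suspects the leader when no heartbeat arrived within `timeout_s`."""

    def __init__(self, timeout_s):
        self.timeout_s = timeout_s
        self.last_heartbeat_s = 0.0

    def heartbeat(self, now_s):
        self.last_heartbeat_s = now_s

    def suspects_leader(self, now_s):
        return now_s - self.last_heartbeat_s > self.timeout_s


short = HeartbeatDetector(timeout_s=3)
long = HeartbeatDetector(timeout_s=10)
for detector in (short, long):
    detector.heartbeat(now_s=2.0)       # last heartbeat seen at t = 2

# Scenario A: the leader is only paused and would resume at t = 6.
short_at_5_5 = short.suspects_leader(now_s=5.5)   # True: an unnecessary failover starts
long_at_5_5 = long.suspects_leader(now_s=5.5)     # False: the glitch is tolerated

# Scenario B: the leader really crashed at t = 2.
long_at_11 = long.suspects_leader(now_s=11.0)     # False: the crash is not noticed yet
long_at_12_5 = long.suspects_leader(now_s=12.5)   # True: detected only after 10 s
```

The same detector logic gives a false alarm with the short timeout and a slow reaction with the long one, so the timeout has to be tuned to the glitches that are typical for a given environment.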
Durability
Durability is the property of a system that acknowledged writes are not lost, even if the server fails.
If synchronous replication is used and the leader fails, no data is lost: the leader does not acknowledge client requests until it has received confirmation from the follower, so there is a follower which has exactly the same data as the leader.
With asynchronous followers, however, the new leader might not have received all the writes from the old leader before it failed. For example, in the illustration below, write 6 had not been replicated at the moment when leader S1 failed. If the old leader recovers, it will detect that it has a write which is unknown to the new leader. By that time the new leader may already have processed new writes, and those writes might conflict with the unreplicated write.
Durability Violation
Because of this, such writes are usually just discarded, which means that the system violates the durability property. The problem of discarded writes is especially dangerous when data outside the database depends on those writes.
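The discarding of unreplicated writes can be sketched as follows. This Python example assumes that log entries can be compared by value and that the old leader simply adopts the new leader’s log; the function names are hypothetical, and real systems compare log positions or sequence numbers instead.

```python
def common_prefix_length(a, b):
    """Number of leading entries that two logs have in common."""
    length = 0
    for x, y in zip(a, b):
        if x != y:
            break
        length += 1
    return length


def rejoin_as_follower(old_leader_log, new_leader_log):
    """Return (log after rejoining, writes that were discarded)."""
    keep = common_prefix_length(old_leader_log, new_leader_log)
    discarded = old_leader_log[keep:]
    # The old leader drops its divergent tail and follows the new leader's log.
    return list(new_leader_log), discarded


# Write 6 was acknowledged by the old leader but never replicated.
old_leader_log = ["w1", "w2", "w3", "w4", "w5", "w6"]
# The new leader was promoted with writes 1-5 and has accepted new writes since.
new_leader_log = ["w1", "w2", "w3", "w4", "w5", "w7", "w8"]

repaired_log, discarded = rejoin_as_follower(old_leader_log, new_leader_log)
print(discarded)  # ['w6']
```

The client that wrote `w6` received an acknowledgement, yet the write no longer exists anywhere in the cluster after the old leader rejoins.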
Split Brain
Assume that leader S1 has crashed in the system.
Split-brain: Crash of the first leader
According to the failover process, the remaining followers detected the failure of the leader, and as a result S2 was promoted to become the new leader.
Split Brain: Election of the new leader
But the first leader could recover from the failure without detecting that there is now a new leader. In this situation we have two active leaders functioning at the same time. This is called split-brain, and it is a dangerous situation: both leaders accept writes, which violates the main idea of a single-leader system. This can cause conflicts between writes and data corruption.
Split-brain: Two Active Leaders
Sometimes, as a safety catch, if the system detects that there are two leaders, it shuts one of them down. But if this mechanism is not carefully designed, it can end up shutting down both of them.
Split-brain: System Without A Leader
There are no easy and straightforward solutions to these problems, and because of that manual failover is sometimes preferred.
References
[1] Martin Kleppmann. Designing Data-Intensive Applications. 2017
[2] Maarten van Steen, Andrew S. Tanenbaum, Distributed Systems, 4th edition, 2023
[3] https://www.elastic.co/guide/en/elasticsearch/reference/current/scalability.html
[4] https://docs.confluent.io/kafka/design/replication.html
[5] https://www.rfc-editor.org/rfc/rfc5661#section-10
[6] https://www.postgresql.org/docs/current/warm-standby.html#CASCADING-REPLICATION
[7] https://docs.oracle.com/html/E10700_01/cascade_appx.htm
[9] https://kafka.apache.org/protocol.html#protocol_partitioning
[10] https://docs.haproxy.org/
[11] https://patroni.readthedocs.io/en/latest/
[12] https://dev.mysql.com/doc/mysql-router/8.0/en/mysql-router-innodb-cluster.html
[14] https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-node.html
