Understanding Leader-Follower Replication

You could also watch my video on the same topic: part 1 and part 2

Concept of Data Replication

This section is about replication in general. All other sections are specifically about leader-follower replication.

Replication in distributed systems is essentially a technique that allows storing the same piece of data on multiple machines.

Replication provides the following benefits:

  • Scalability
    • increasing the number of replicas and distributing operations between them lets the system serve more clients than a single replica could.
  • Availability
    • when one replica fails, clients can use the other replicas, so the system remains available.
  • Performance
    • the system works faster because load is spread across multiple replicas.
  • Keeping data close to clients
    • avoids delays caused by the physical limit of the speed of light.
  • Caching
    • data can be placed very close to clients to minimize costly remote calls.
  • Fault tolerance/Reliability
    • when a replica fails, data is not lost.
  • Durability
    • once data is written, it is durable because it is copied to multiple places.

That is an impressive list of benefits, but they do not come for free. The fundamental problem that arises is how to keep data consistent across replicas. There is no easy solution, and multiple trade-offs have to be made. Even the definition of consistency is not straightforward, and there are in fact several different consistency models.

There are a number of different approaches to implementing replication. This article focuses specifically on the leader-follower replication technique.

Key Idea of Leader-Follower Replication

Probably the easiest replication technique, from the perspectives of both understanding and implementation, is leader-follower replication.

There is no standard terminology, and the technique goes by a few names that all mean basically the same thing:

  • leader-follower
  • primary-secondary
  • master-slave

The main idea is that there is a single node (leader, master, primary) which accepts writes and sends a stream of changes to the other nodes (read replicas, secondaries, backups, hot standbys), which can be used for reads only.

Leader-follower replication solves two major challenges of replication: global ordering and conflict resolution. The leader node enforces an order on every write, and conflicts are avoided because all writes go through a single leader.

When a client writes data to the leader, the leader is responsible for replicating it to the followers.

Reads can be handled by both the leader and the followers. Reading from followers allows read queries to scale horizontally, but it might introduce inconsistencies due to replication lag.
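
The flow described above can be sketched in a few lines of Python. This is a minimal in-memory illustration, not how any of the databases listed below implement it: the class names `Leader` and `Follower`, the per-write sequence number and the explicit `replicate()` step are all assumptions made for the example.

```python
from dataclasses import dataclass, field


@dataclass
class Follower:
    """Read-only replica that applies the leader's change stream in order."""
    data: dict = field(default_factory=dict)
    applied: int = 0  # sequence number of the last applied change

    def apply(self, seq: int, key: str, value: str) -> None:
        # Changes must arrive in exactly the order the leader assigned.
        assert seq == self.applied + 1, "gap or reordering in the stream"
        self.data[key] = value
        self.applied = seq

    def read(self, key: str):
        return self.data.get(key)


@dataclass
class Leader:
    """The only node that accepts writes; it defines their global order."""
    followers: list
    data: dict = field(default_factory=dict)
    log: list = field(default_factory=list)  # entries: (seq, key, value)

    def write(self, key: str, value: str) -> int:
        seq = len(self.log) + 1  # the single leader orders every write
        self.log.append((seq, key, value))
        self.data[key] = value
        return seq

    def replicate(self) -> None:
        # Send each follower the part of the log it has not applied yet.
        for follower in self.followers:
            for seq, key, value in self.log[follower.applied:]:
                follower.apply(seq, key, value)


follower = Follower()
leader = Leader(followers=[follower])
leader.write("x", "1")
stale = follower.read("x")  # the change has not been replicated yet
leader.replicate()
fresh = follower.read("x")  # the follower has caught up
```

Between `write` and `replicate` the follower returns an outdated answer, which is the replication lag mentioned above.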

Leader-follower replication is used by a host of well-known systems:

  • MySQL
  • MariaDB
  • PostgreSQL
  • Oracle Database
  • Microsoft SQL Server
  • MongoDB
  • Redis
  • Elasticsearch
  • IBM DB2
  • Apache Kafka
  • RabbitMQ

This scheme has several variations, which are described below.

Replication Modes and Topologies

Synchronous Followers

When a client sends a write request, the leader sends the data change to its followers synchronously. Only when all synchronous followers have responded does the leader consider the operation completed and acknowledge the client’s request.

As a result, followers always have the same state as the leader. If the leader fails, no data is lost, because exactly the same data is available on the followers. Also, a read from a follower always returns the same data as a read from the leader.

The trade-off here is performance and availability. Because the leader has to wait until all followers acknowledge the request, the response time is bounded by the slowest follower. Also, if even one synchronous follower is unavailable, the leader cannot acknowledge the client’s write request at all.
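
Both effects can be shown with a small calculation. The function below is a sketch under two assumptions that are not part of the text above: the leader contacts its followers in parallel, and an unavailable follower is modelled as `None`. The name `synchronous_write` is made up for the example.

```python
def synchronous_write(leader_ms: float, follower_ms: list) -> float:
    """Return the time (ms) until the client gets its acknowledgment.

    follower_ms holds one entry per synchronous follower: its response
    time in milliseconds, or None if the follower is unavailable.
    """
    if any(t is None for t in follower_ms):
        # The leader cannot collect every acknowledgment, so it can
        # never confirm the write to the client.
        raise TimeoutError("a synchronous follower is unavailable")
    # Followers are contacted in parallel, so the slowest one decides.
    return leader_ms + max(follower_ms, default=0.0)
```

With a leader that needs 2 ms and followers that need 5, 40 and 8 ms, the client waits 42 ms: the 40 ms follower dominates. If any entry is `None`, the write cannot be acknowledged at all.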

Asynchronous Followers

To mitigate the drawbacks of synchronous followers, an asynchronous scheme can be used. With asynchronous replication the leader acknowledges the client’s request immediately after writing it to its own storage, and then updates the followers asynchronously.

This improves performance, as the time to acknowledge the request depends only on the leader. Also, a follower’s failure does not prevent writes on the leader.

The trade-offs are consistency and durability. Because the leader acknowledges the request before it is sent to any follower, the change can be lost if the leader crashes right after processing the request. Additionally, because updates propagate asynchronously, followers lag behind the leader, and reads from them can return stale data. Such a system therefore tends to provide a weaker consistency model than a system with synchronous followers.
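
The window in which an acknowledged write exists only on the leader can be made visible with a small model. This is only a sketch: the class `AsyncLeader`, the `pending` queue and the manual `replicate_one()` step stand in for whatever background mechanism a real system uses.

```python
from collections import deque


class AsyncLeader:
    """Acknowledges a write after storing it locally; replicates later."""

    def __init__(self, follower_logs):
        self.log = []                       # writes stored on the leader
        self.follower_logs = follower_logs  # one list per follower
        self.pending = deque()              # acknowledged, not yet sent

    def write(self, value) -> str:
        self.log.append(value)
        self.pending.append(value)
        return "ack"                        # no follower has been contacted

    def replicate_one(self) -> None:
        # Background step: ship the oldest pending write to every follower.
        if self.pending:
            value = self.pending.popleft()
            for log in self.follower_logs:
                log.append(value)


follower_log = []
leader = AsyncLeader([follower_log])
for v in ("w1", "w2", "w3"):
    leader.write(v)
leader.replicate_one()
leader.replicate_one()
# If the leader crashes now, everything still in `pending` is lost even
# though the client already received an acknowledgment for it.
lost_on_crash = list(leader.pending)
```

At the end of the script the follower holds only the first two writes, so a read from it is stale, and `lost_on_crash` contains the third write.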

Synchronous vs Asynchronous Replication

The trade-offs between synchronous and asynchronous replication can be summarized in the table below.

| Property | Synchronous Replication | Asynchronous Replication |
|---|---|---|
| Consistency | All replicas are always in sync. It is potentially possible to provide linearizability or external consistency (which is basically the analog of linearizability for transactions). | Replicas can lag behind the leader. It is possible to achieve eventual consistency, causal consistency and consistent prefix reads. |
| Durability | Once data is acknowledged by the leader, it is durable, because it is available on all followers. | If the leader fails after acknowledging a write but before replicating it, the write is lost. |
| Performance | Worse write performance, as writes require synchronous calls to each follower. | Good write performance, because writes don’t depend on followers. |
| Availability | If a single follower fails, the whole system is unavailable for writing, as the leader is not able to acknowledge write requests. | The failure of any number of followers has no impact on the leader’s ability to accept write requests. |

Combined Replication

In practice, a combined scheme is often used [1], where one follower is synchronous and the others are asynchronous. In that case the data is guaranteed to be consistent on at least two nodes: the leader and the synchronous follower. If the synchronous follower fails, one of the asynchronous followers is made synchronous.

This approach provides a balance between all the properties. By choosing the number of synchronous and asynchronous followers, we can control the properties of the system.

Most systems that support leader-follower replication support this combined mode and let you configure the number of synchronous followers.
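
The bookkeeping behind the combined mode can be sketched as follows. The class `ReplicaSet` and the parameter `sync_count` are hypothetical names for this example and do not correspond to the configuration of any particular database.

```python
class ReplicaSet:
    """Tracks which followers are synchronous and which are asynchronous."""

    def __init__(self, followers, sync_count=1):
        self.sync_count = sync_count
        self.sync = list(followers[:sync_count])
        self.async_ = list(followers[sync_count:])

    def acks_required(self):
        # The leader waits only for these followers before acknowledging.
        return list(self.sync)

    def on_failure(self, name):
        """Remove a failed follower; keep the number of synchronous ones."""
        if name in self.async_:
            self.async_.remove(name)
        elif name in self.sync:
            self.sync.remove(name)
            if self.async_:
                # Promote an asynchronous follower to take its place.
                self.sync.append(self.async_.pop(0))


replicas = ReplicaSet(["f1", "f2", "f3"], sync_count=1)
replicas.on_failure("f1")
```

After the synchronous follower `f1` fails, `f2` takes over its role and `f3` stays asynchronous. The sketch ignores one practical detail: a promoted follower first has to catch up with the leader before its acknowledgments mean anything.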

Leader per Shard Approach

There is a variation of this scheme in which the data is split into partitions and each partition has its own independent leader and set of followers.

Although the system as a whole has multiple leaders, each partition has only one, and overall the scheme keeps almost all the properties of conventional leader-follower replication.

Partitions (shards) are distributed across different physical nodes, so each node can hold several shards and can be the leader for one shard and a follower for another.

This approach allows horizontal scaling: it distributes the write load across multiple nodes and reduces the blast radius of a single node outage. The downside is that data changes across partitions are no longer serializable, i.e. we can’t reliably say which of two operations on different shards was executed earlier. This means that linearizability degrades towards sequential consistency. There can also be restrictions or difficulties in executing transactions that involve data from multiple shards.

The best-known examples of real systems that use this design are Elasticsearch [3] and Kafka [4]. In Kafka terminology the nodes are called brokers and the shards are called partitions.
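
A sketch of how keys, shards and leaders could relate to each other is shown below. The hash-based key-to-shard mapping, the round-robin placement and the node names are assumptions chosen for the illustration; Elasticsearch and Kafka each have their own partitioning and placement rules.

```python
import zlib

NODES = ["node-a", "node-b", "node-c"]
NUM_SHARDS = 6


def shard_of(key: str) -> int:
    # A stable hash, so every client maps a key to the same shard.
    return zlib.crc32(key.encode("utf-8")) % NUM_SHARDS


def placement(shard: int) -> dict:
    """Leader and followers of one shard, spread round-robin over nodes."""
    leader = NODES[shard % len(NODES)]
    followers = [n for n in NODES if n != leader]
    return {"leader": leader, "followers": followers}


def route_write(key: str) -> str:
    # A write goes to the leader of the shard that owns the key.
    return placement(shard_of(key))["leader"]


leaders_per_node = {n: 0 for n in NODES}
for s in range(NUM_SHARDS):
    leaders_per_node[placement(s)["leader"]] += 1
```

Every node ends up leading two of the six shards and following the other four, so the write load is spread evenly and the loss of one node affects the leaders of only a third of the shards.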

Local vs Remote Write

In [2], leader-follower systems are classified into systems with local write and systems with remote write. All the examples presented so far in this article use remote write. The name comes from the fact that to make a write, we need to send the request to a remote leader. This approach introduces overhead on writes, especially when a single client needs to make many consecutive changes.

Systems with local write solve this by temporarily transferring leadership to the node, or even the client, that needs to make a series of writes. The system remains a single-leader system; the leader is just moved to the place where the writes are made.

The illustrations below depict a single leader, but in reality it could be the leader of just a single partition or of a single piece of data that is going to be updated.

For example, let’s assume that we want to make a heavy write, or a series of writes, and that connecting to the leader for each write would be too slow. The leader could be located in another region, so that every request to it has considerable latency. In that case the client could just send the request to a follower.

Local Write: Follower receives write request

After receiving the request, the follower initiates a leader switchover procedure to transfer the leadership role to itself, the node that wants to make the write.

Local Write: Procedure to switch leader is initiated

After the reconfiguration, the system looks as shown in the figure below. The follower has become the leader, and the previous leader has stepped down to become a follower.

Local Write: Leader is switched over

And now the new leader can perform the write and distribute all the changes across followers.

Local Write: write is performed locally on a new leader

This technique is not used very often in practice with conventional leader-follower database systems and is mostly of theoretical interest, because switching over and re-electing the leader is a very complex process with a lot of pitfalls and edge cases.

It makes sense to implement it when the overhead of the leader switchover is less than the overhead of sending lots of remote write requests.
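
That break-even condition can be written down directly. The function and its parameters are illustrative assumptions: one fixed switchover cost, a constant latency per remote write and a constant latency per local write.

```python
def local_write_pays_off(n_writes: int, remote_ms: float,
                         local_ms: float, switchover_ms: float) -> bool:
    """True if moving the leader is cheaper than n remote writes."""
    remote_total = n_writes * remote_ms
    local_total = switchover_ms + n_writes * local_ms
    return local_total < remote_total
```

With an 80 ms remote write, a 1 ms local write and a 5000 ms switchover, ten writes are far cheaper to send remotely (800 ms against 5010 ms), while a thousand writes favour the switchover (80000 ms against 6000 ms).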

A similar technique is, however, implemented in NFSv4, where it is called delegation. When a file or a directory needs to be updated, leadership over that specific file or directory can be temporarily delegated to the client that wants to make the write. If another client wants to update the same piece of data, the temporary leadership can be revoked from the first client. This is described in more detail in the RFC describing NFSv4 [5].

Cascading Replication

Sometimes cascading replication is used, in which there are several tiers of followers. The first tier gets the replication stream directly from the leader, and each subsequent tier gets its updates from a follower in the previous tier.

Cascading Replication

This approach can be used when there are hundreds or thousands of followers, as it reduces the load on the leader node. In a geographically distributed system there could be one follower per data center that fans the changes out to the other followers in the same data center.

The problems with this architecture are the complexity of maintaining the structure and the possibility that downstream followers lag significantly behind the leader. Also, if one of the top-level followers fails, the whole tree beneath it can be left without updates.

Examples of this approach (just some of them) are PostgreSQL cascading replication [6] and Oracle cascaded standby [7].
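
The properties of such a tree are easy to compute once the topology is written down. The node names and the `UPSTREAM` mapping below are hypothetical; the sketch only illustrates the load reduction on the leader, the extra hops for lower tiers and the effect of a failed relay.

```python
# Hypothetical topology: each node maps to the node it replicates from.
UPSTREAM = {
    "dc1-relay": "leader",
    "dc2-relay": "leader",
    "dc1-f1": "dc1-relay",
    "dc1-f2": "dc1-relay",
    "dc2-f1": "dc2-relay",
}


def hops_from_leader(node: str) -> int:
    """Number of replication hops between the leader and `node`."""
    hops = 0
    while node != "leader":
        node = UPSTREAM[node]
        hops += 1
    return hops


def direct_downstreams(node: str) -> list:
    """Nodes that receive their replication stream directly from `node`."""
    return sorted(n for n, up in UPSTREAM.items() if up == node)


def cut_off_by_failure(failed: str) -> list:
    """Followers that stop receiving updates if `failed` goes down."""
    affected = []
    frontier = [failed]
    while frontier:
        current = frontier.pop()
        for child in direct_downstreams(current):
            affected.append(child)
            frontier.append(child)
    return sorted(affected)
```

Here the leader streams to only two relays instead of five followers, each second-tier follower is two hops away from the leader, and losing `dc1-relay` cuts off both followers in that data center.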

Discovery and Routing

To send writes to the leader, something has to know where that leader is. Depending on who has that knowledge, there are several ways in which client requests can be routed.

No method is the best or the worst; each of them has its pros and cons. In most cases the choice is dictated by the selected database.

Client

Here the client must always know the topology of the cluster and route each request to the appropriate node. Writes must be sent to the leader. Where to send read requests depends on the application logic.

Client Responsible for routing requests

Examples of this approach are MongoDB [8] and Kafka [9].

This scheme is the simplest from the server implementation point of view, because it moves all the routing logic to the client itself. Because the client establishes a connection directly with the required node, latency is low. There is also no need for extra infrastructure components or the costs associated with them.

The main drawback comes from the fact that this logic has moved to the client side. It makes the client more complex, and it can lead to version compatibility issues between client libraries and the server. In some cases it could also lead to security issues, because the cluster topology is exposed to the client.
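
To make the client-side responsibility concrete, here is a minimal sketch of a topology-aware client. The class `RoutingClient`, its method names and the round-robin read policy are assumptions for the example; real drivers such as the MongoDB and Kafka clients are considerably more involved.

```python
import itertools


class RoutingClient:
    """Client that knows the cluster topology and picks the node itself."""

    def __init__(self, leader: str, followers: list):
        self.set_topology(leader, followers)

    def set_topology(self, leader: str, followers: list) -> None:
        # Called at startup and again after every failover: the client
        # itself has to keep its view of the cluster up to date.
        self.leader = leader
        self.followers = list(followers)
        self._round_robin = itertools.cycle(self.followers)

    def node_for_write(self) -> str:
        return self.leader  # writes must go to the leader

    def node_for_read(self, need_latest: bool = False) -> str:
        # Application logic decides: the leader for up-to-date reads,
        # otherwise the followers in round-robin order.
        if need_latest or not self.followers:
            return self.leader
        return next(self._round_robin)
```

The need to call `set_topology` after every change in the cluster is exactly the complexity that this scheme pushes onto the client.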

Router

There is some routing layer which has the knowledge about the topology.

Router Responsible for routing requests

Examples are HAProxy [10], Patroni [11] and MySQL Router [12].

This relieves the client of all the complexity related to discovery and routing, allows centralized configuration management, and makes it possible to implement complex routing logic controlled by the server. Additionally, the router can keep pools of connections to the nodes and thus save the time needed to establish connections.

The trade-offs are a potential single point of failure, the need to scale the router component, added latency due to an extra network hop for each request, and the potential complexity of the router itself. Apart from that, it is an extra component with its own costs for maintenance, infrastructure, etc.

DNS

In this approach we avoid having a central component that sits between the client and the nodes. Instead we reuse the DNS infrastructure to map a domain name, which the client uses, to the current leader. For this to work we need an additional component that monitors the cluster state and updates DNS according to changes in the cluster.

Although this approach uses two components, DNS and the server that configures it, they are not as heavily loaded as the router in the previous approach. DNS is reconfigured only when the topology changes, which usually does not happen very often, and the client caches DNS information, so there is no DNS call for every request.

DNS Based Routing

An example is Microsoft SQL Server Always On replication [13]. It has an abstraction called the Availability Group Listener, which consists of a DNS name and a virtual IP that points to the current leader.

The main advantages are that no additional component is required on the critical path and that caching is built in. In general this simplifies client and server configuration.

But there are downsides as well: topology updates propagate with a delay, the TTL of the caches has to be managed, and it is impossible to implement complex routing for requests.
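
The propagation delay follows directly from caching, as the following sketch shows. It models only a client-side cache with a fixed TTL; the class `CachingResolver`, the host name and the addresses are made up for the example, and time is passed in explicitly to keep the behaviour deterministic.

```python
class CachingResolver:
    """Client-side DNS cache that re-resolves only after the TTL expires."""

    def __init__(self, lookup, ttl_s: float):
        self.lookup = lookup  # callable returning the current record
        self.ttl_s = ttl_s
        self.cached = None
        self.expires_at = float("-inf")

    def resolve(self, now_s: float) -> str:
        if now_s >= self.expires_at:
            self.cached = self.lookup()
            self.expires_at = now_s + self.ttl_s
        return self.cached


dns_record = {"db-leader.example.internal": "10.0.0.1"}  # hypothetical name
resolver = CachingResolver(
    lambda: dns_record["db-leader.example.internal"], ttl_s=30)

before = resolver.resolve(now_s=0)
# Failover at t=10: the monitoring component repoints the name.
dns_record["db-leader.example.internal"] = "10.0.0.2"
stale = resolver.resolve(now_s=20)  # still the cached old leader
fresh = resolver.resolve(now_s=30)  # TTL expired, new leader is seen
```

For up to one TTL after the failover the client keeps talking to the old leader's address. A shorter TTL narrows that window at the price of more DNS lookups.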

Node

Each node knows the topology and forwards client requests to the right node. The client, in turn, connects to any node (possibly the closest one) and always sends its requests to that specific node.

Node Based Routing

This approach is chosen by Elasticsearch [14].

Here we keep the client implementation simple, avoid the extra cost of a routing component, and have no single point of failure. We can also implement complex routing on the nodes.

The drawbacks are that it complicates the node implementation, adds network hops to each request, and causes a lot of node-to-node communication, which can be hard to debug and monitor and consumes a lot of network bandwidth between nodes.
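
A toy version of node-based forwarding is sketched below. The shared `cluster` dictionary stands in for whatever mechanism the nodes use to agree on the topology, and a direct method call stands in for a network request; both are simplifications made for the example.

```python
class Node:
    """A node that serves a write itself or forwards it to the leader."""

    def __init__(self, name: str, cluster: dict):
        self.name = name
        self.cluster = cluster  # shared topology view (an assumption)
        self.data = {}
        cluster["nodes"][name] = self

    def handle_write(self, key, value, hops: int = 0) -> dict:
        leader = self.cluster["leader"]
        if leader != self.name:
            # Not the leader: forward over one extra node-to-node hop.
            target = self.cluster["nodes"][leader]
            return target.handle_write(key, value, hops + 1)
        self.data[key] = value
        return {"stored_on": self.name, "extra_hops": hops}


cluster = {"leader": "n1", "nodes": {}}  # hypothetical in-memory topology
n1, n2 = Node("n1", cluster), Node("n2", cluster)
via_follower = n2.handle_write("k", "v")  # client happens to talk to n2
via_leader = n1.handle_write("k2", "v2")
```

The client never needs to know who the leader is, but a write sent to `n2` costs one more hop than a write sent to `n1`.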

Addressing Common Challenges

Bootstrapping New Followers

Since the data is constantly being updated, how do we set up a new follower?

Setting Up New Follower: Initial State

One option is to make the leader read-only (or stop it), copy the data, and then start both nodes. This approach would do in some situations; for example, some systems are stopped for maintenance every weekend. But in general it is not practical.

Instead, a slightly more complex sequence of steps is performed. First we take a consistent snapshot of the leader at some point in time and transfer this snapshot to the follower.

Setting Up New Follower: Transfer Consistent Snapshot To New Follower

While the consistent snapshot is being taken and transferred, writes keep arriving, and they need to be applied later, incrementally. So after the snapshot has been applied on the follower, the follower connects to the leader and requests all writes made since the snapshot was taken. Usually the leader has some kind of replication log, and the snapshot is associated with an exact position in that log rather than just with a point in time.

Setting Up New Follower: Catching Up

When the follower has processed all the incremental data from the leader, we say that it has caught up. It can now process the regular replication flow from the leader and can be used for queries.

Setting Up New Follower: Caught Up

Followers Failure and Catch-Up Recovery

During ongoing replication the follower keeps a log of data changes, which is in sync with the leader’s log in the case of synchronous replication or lags behind it in the case of asynchronous replication.

Catch-Up Recovery: Initial State

If the follower fails, or there is a problem with the replication process or the connection, the follower falls behind the leader and misses the latest writes.

Catch-Up Recovery: Failure

When the follower is back online, it sends the leader the last position in the replication log that it processed, and the leader sends all changes since that position. The process is essentially the same as the catch-up phase of setting up a new follower.

Catch-Up Recovery: Catch-Up

Once all the missing changes have been processed, we say that the follower has caught up. The leader continues to send it all current changes, and the follower can be used again.

Catch-Up Recovery: Caught-Up

Leader Failures and Failover Process

While handling a follower failure is fairly simple, handling a leader failure is a much more complex and tricky procedure. The process of detecting that the leader has failed and switching to a new leader is called failover.

Failover can be manual or automatic. Some systems support only manual failover. For example, Postgres and MySQL do not have automatic failover functionality by default and need additional tools to perform it. Below is an illustrative example of how automatic failover could happen. It does not describe any specific system but rather gives a general idea of the failover process.

First, let’s assume that we have a system with one leader and three followers, as shown in the figure below. Clients constantly write to the leader, and those writes are replicated asynchronously to the followers.

Because replication is asynchronous, followers can lag slightly behind. In the picture the leader has writes up to write 6, follower S2 has all 6 writes, S3 has only 3 writes, and S4 has 5 writes.

Failover: System before failure

Then the leader fails. Clients can no longer write, and the replication process stops.

Failover: Leader Crashed

Then the followers detect the failure. Usually this happens by timeout: either the leader has not sent its heartbeat messages or it has not responded to a periodic ping from a follower.

Failover: Followers detected the failure

After the followers have detected the leader’s failure, they initiate a process called leader election. Leader election is a complex process, and there are quite a few algorithms for it.

| System | Leader Election Algorithm |
|---|---|
| Oracle with Oracle Data Guard | There is no leader election in the usual sense. Instead, a separate Observer process monitors the nodes and chooses the leader. |
| Microsoft SQL Server with Always On Availability Groups | Quorum-based leader election |
| MongoDB | Raft-based algorithm |
| Redis with Redis Sentinel | Quorum-based leader election algorithm |
| Elasticsearch | Paxos consensus |
| IBM DB2 | Its own protocol called HADR |
| Apache Kafka | ZooKeeper in older versions; a Raft-based protocol called KRaft in newer versions |
| RabbitMQ | Raft consensus |

A description of leader-election algorithms is out of the scope of this article; it is a big separate topic.

Failover: Leader Election

As a result of the election, in our example S2 was promoted to leader. Often the follower with the most up-to-date state has priority for promotion. In our example S2 has all the updates that the leader had before it failed.
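
The preference for the most up-to-date follower can be expressed as a simple selection rule. Note that this is only the ranking step and not an election algorithm: there is no voting, no quorum and no handling of nodes that disagree about the input. The function name and the tie-break by node name are assumptions for the example.

```python
def pick_new_leader(last_applied: dict) -> str:
    """Choose the follower that has applied the most writes.

    Ties are broken by the smallest node name so that every node
    evaluating the same input reaches the same decision.
    """
    return min(last_applied, key=lambda node: (-last_applied[node], node))


# State from the example: S2 has 6 writes, S3 has 3, S4 has 5.
new_leader = pick_new_leader({"S2": 6, "S3": 3, "S4": 5})
```

For the state in the example this selects S2, which is the follower that lost nothing when the leader failed.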

After the new leader is elected, all other followers need to be reconfigured to receive the replication stream from the new leader, and clients need to be reconfigured to use the new leader for writes (and potentially reads).

Failover: System Uses New Leader

But this is not the end of the process yet, because the old leader might recover. It could be brought back up, or its network connection could be re-established. Since the old leader was down during the election, it does not know that there is a new leader. So it needs to discover this and step down.

Failover: Old Leader Detects The New Leader

Next, the old leader becomes a follower and starts processing updates from the new leader.

Failover: Old leader stepped down

So far we have focused on the positive scenario. The following section considers several common problems.

Potential Pitfalls

Failure Detection

First of all, the failure of the leader has to be detected reliably.

Failures in general can be classified into several types. According to [2], there are the following types:

| Type of Failure | Description of server’s behaviour |
|---|---|
| Crash failure | Halts, but working correctly until it halts |
| Omission failure | Fails to respond to incoming requests |
| • Receive omission | Fails to receive incoming messages |
| • Send omission | Fails to send messages |
| Timing failure | Response lies outside of specified time interval |
| Response failure | Response is incorrect |
| • Value failure | The value of the response is wrong |
| • State-transition failure | Deviates from the correct flow of control |
| Arbitrary failure | May produce arbitrary responses at arbitrary times |

Normally, in a leader-follower system we are able to handle only the first 3 types of failure from the table above, and we are not able to distinguish between them. Basically, the only known failure detectors are based on timeouts. Either the leader sends heartbeats, or the followers actively “ping” the leader and check its liveness. If the followers cannot confirm that the leader is alive within some timeout, they consider it failed and start the failover procedure.

Selecting the timeout after which the leader is considered dead is a hard question.

  • A short timeout gives a shorter recovery time if the leader has truly failed. But if the leader is just experiencing a temporary glitch, an unnecessary failover procedure adds overhead.
  • A longer timeout prolongs the recovery time, but it prevents failover in the case of short temporary glitches.
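
The trade-off can be observed by running the same heartbeat history through a detector with two different timeouts. The function below is a sketch: the heartbeat times are invented, and a real detector would run continuously instead of analysing a recorded list.

```python
def suspected_failures(heartbeat_times: list, timeout_s: float,
                       observe_until_s: float) -> list:
    """Return the moments at which a follower would declare the leader dead.

    heartbeat_times: sorted arrival times (seconds) of leader heartbeats.
    A failure is suspected whenever `timeout_s` passes without a heartbeat.
    """
    suspicions = []
    last_seen = heartbeat_times[0]
    for t in heartbeat_times[1:] + [observe_until_s]:
        if t - last_seen > timeout_s:
            suspicions.append(last_seen + timeout_s)
        last_seen = t
    return suspicions


# Heartbeats every second, a 3-second glitch after t=3, a real crash after t=8.
beats = [0, 1, 2, 3, 6, 7, 8]
short = suspected_failures(beats, timeout_s=2, observe_until_s=20)
long_ = suspected_failures(beats, timeout_s=5, observe_until_s=20)
```

With the 2-second timeout the follower reacts to the real crash at t=10, but it has already raised a false alarm at t=5 during the glitch. With the 5-second timeout there is no false alarm, and the real crash is noticed only at t=13.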

Durability

Durability is the property of a system that acknowledged writes are not lost even if the server fails.

If synchronous replication is used and the leader fails, no data is lost: the leader does not acknowledge a client request until it has confirmation from the follower, so there is a follower with exactly the same data as the leader.

With asynchronous followers, however, the new leader might not have received all the writes from the old leader before it failed. For example, in the illustration below write 6 had not been replicated at the moment when leader S1 failed. If the old leader recovers, it will find that it has a write that is unknown to the new leader. By that time the new leader could already have processed new writes, and those writes might conflict with the unreplicated one.

Durability Violation

Because of this, such writes are usually just discarded, which means that the system violates the durability property. Discarded writes are especially dangerous when data outside the database depends on them.
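
What "discarded" means for the recovered leader can be sketched with two logs. The function name and the log contents are assumptions for the example; the logs are compared entry by entry to find where they diverge.

```python
def rejoin_as_follower(old_leader_log: list, new_leader_log: list):
    """Make the old leader's log match the new leader's.

    Returns (log the old leader ends up with, writes that were discarded).
    """
    # Find the length of the common prefix of the two logs.
    common = 0
    for mine, theirs in zip(old_leader_log, new_leader_log):
        if mine != theirs:
            break
        common += 1
    discarded = old_leader_log[common:]  # acknowledged but never replicated
    return list(new_leader_log), discarded


old_log = ["w1", "w2", "w3", "w4", "w5", "w6"]        # S1 before the crash
new_log = ["w1", "w2", "w3", "w4", "w5", "w7", "w8"]  # new leader moved on
log, discarded = rejoin_as_follower(old_log, new_log)
```

Write `w6` was acknowledged to a client by the old leader, yet it does not appear in the final log. That is the durability violation described above.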

Split Brain

Assume that leader S1 has crashed.

Split-brain: Crash of the first leader

Following the failover process, the remaining followers detected the failure of the leader, and as a result S2 was promoted to become the new leader.

Split Brain: Election of the new leader

But the first leader could recover from the failure and fail to detect that there is now a new leader. In this situation we have two active leaders that could operate side by side for some time. This is called split-brain, and it is a dangerous situation: both leaders accept writes, which violates the main idea of a single-leader system. This might cause conflicts between writes and data corruption.

Split-brain: Two Active Leaders

Sometimes, as a safety catch, the system shuts down one of the leaders if it detects that there are two. But if this mechanism is not carefully designed, it could end up shutting down both of them.

Split-brain: System Without A Leader
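
The difference between a careless and a careful safety catch can be sketched in a few lines. Both functions are illustrations only. The second one relies on an assumption that is not part of the scenario above: every leader knows the number of the election round in which it was promoted, so all nodes can agree on which leader is the newer one.

```python
def naive_resolution(leaders: set) -> set:
    """Every leader that sees another leader shuts that other leader down."""
    shut_down = set()
    for me in leaders:
        for other in leaders:
            if other != me:
                shut_down.add(other)  # decisions are made concurrently
    return leaders - shut_down


def deterministic_resolution(leaders: set, elected_in_round: dict) -> set:
    """Only the leader from the most recent election round stays active."""
    if not leaders:
        return set()
    winner = max(leaders, key=lambda node: elected_in_round[node])
    return {winner}
```

With two active leaders S1 and S2, the naive rule leaves the system without any leader, because each one shuts down the other. The deterministic rule keeps S2, provided S2 was promoted in a later round than S1.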

There are no easy and straightforward solutions to these problems, and because of that manual failover is sometimes preferred.

References

[1] Martin Kleppmann. Designing Data-Intensive Applications. 2017

[2] Maarten van Steen, Andrew S. Tanenbaum, Distributed Systems, 4th edition, 2023

[3] https://www.elastic.co/guide/en/elasticsearch/reference/current/scalability.html

[4] https://docs.confluent.io/kafka/design/replication.html

[5] https://www.rfc-editor.org/rfc/rfc5661#section-10

[6] https://www.postgresql.org/docs/current/warm-standby.html#CASCADING-REPLICATION

[7] https://docs.oracle.com/html/E10700_01/cascade_appx.htm

[8] https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.md

[9] https://kafka.apache.org/protocol.html#protocol_partitioning

[10] https://docs.haproxy.org/

[11] https://patroni.readthedocs.io/en/latest/

[12] https://dev.mysql.com/doc/mysql-router/8.0/en/mysql-router-innodb-cluster.html

[13] https://learn.microsoft.com/en-us/sql/database-engine/availability-groups/windows/availability-group-listener-overview

[14] https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-node.html
