Clusters and membership: discovering the SWIM protocol
Recently, I have been playing with distributed systems in Go. I am toying around with a distributed key-value store. Because why not?
This store — again, one that I am building just for the sake of learning — will be able to operate as a single instance on a server as well as inside a cluster. And instead of statically defining which nodes are part of the cluster in a configuration file, I want to be able to add or remove nodes dynamically.
Each node in the cluster must be able to answer a seemingly simple question: « Who are my (active) peers? » Or, expressed in a fancier manner: how can the cluster detect node additions, node removals and node failures, and propagate this information across the entire cluster?
Have you ever wondered how Cassandra, Redis, Riak, and many others maintain a cluster? Well, I have. And in this blog post, we’ll look at two of the protocols this type of system relies upon for failure detection and membership.
Heartbeat
Traditionally, distributed peer-to-peer applications rely on heart-beat style protocols to detect node failures. When a failure is detected, it is usually communicated to the cluster using some kind of broadcast or multicast subsystem.
We could summarize the process like this:
- every node sends a “heartbeat signal” every `T` interval of time to all other nodes;
- if no heartbeat from a node `N` is received after `T * limit`, the non-responsive node is considered dead (see the sketch below).
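To make this concrete, here is a toy sketch of such a heartbeating node in Go. The network and membership plumbing (`sendHeartbeat`, `markDead`) is left as hypothetical stubs; only the two rules above are illustrated.

```go
package heartbeat

import (
	"sync"
	"time"
)

// heartbeater is a toy illustration of the two rules above: broadcast a
// heartbeat every T, and declare dead any peer silent for more than T*limit.
type heartbeater struct {
	mu       sync.Mutex
	lastSeen map[string]time.Time // peer address -> time of last heartbeat received
	interval time.Duration        // T
	limit    int
}

// run sends heartbeats and checks for dead peers, once every T.
func (h *heartbeater) run(peers []string) {
	ticker := time.NewTicker(h.interval)
	defer ticker.Stop()

	for range ticker.C {
		// Rule 1: send a heartbeat to ALL other nodes. That is n-1 messages
		// per node, hence n*(n-1) messages per interval for the whole cluster.
		for _, peer := range peers {
			go h.sendHeartbeat(peer)
		}

		// Rule 2: any peer not heard from for more than T*limit is declared dead.
		deadline := time.Now().Add(-time.Duration(h.limit) * h.interval)
		h.mu.Lock()
		for peer, seen := range h.lastSeen {
			if seen.Before(deadline) {
				h.markDead(peer)
			}
		}
		h.mu.Unlock()
	}
}

// Hypothetical stubs: the real versions would do network I/O and update the
// local membership list.
func (h *heartbeater) sendHeartbeat(peer string) {}
func (h *heartbeater) markDead(peer string)     {}
```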
The regular heartbeats solve the failure detection problem, and we can also use the broadcast communication channel to transmit information about which nodes are in the cluster. So this protocol solves both the failure detection and the membership problems.
But when we look more closely at what happens in the network, we notice that the more the cluster grows, the more messages are sent. For instance, a four-node cluster will send twelve heartbeats every `T` interval, a five-node one will send twenty, a six-node one will send thirty, and so on. The number of signals grows quadratically with heartbeat-style protocols, which means that a cluster that is too large could take down the whole network.
Protocol evaluation criteria
After noticing this first limitation in the heartbeat-based method, I realized that I actually needed a way to objectively evaluate protocols.
Luckily, a paper called « On scalable and efficient distributed failure detectors » published in 2001 by I. Gupta, T. D. Chandra, and G. S. Goldszmidt addresses this topic.
According to them, the key characteristics of the efficiency and scalability of distributed failure detector protocols are:
- Strong Completeness: crash-failure of any group member is detected by all non-faulty members;
- Speed of failure detection: the time interval between a member failure and its detection by some non-faulty group member;
- Accuracy: the rate of false positives of failure detection (note: 100% accuracy is impossible on asynchronous networks);
- Network Message Load, in bytes per second generated by the protocol.
Evaluating heartbeat
In the light of our evaluation criteria, what can we say about heartbeating?
- Strong Completeness: every node receives heartbeats, so a node failure will be detected independently by every other node in the cluster;
- Speed of failure detection: this is tunable and is often set to a value like `T * limit`, which means that a failure will be detected after `limit` intervals of `T`;
- Network Message Load: `O(n²)`, where `n` is the number of nodes in the cluster;
- Accuracy: there is a trade-off to be made here. No network being perfect, a portion of the packets will always be lost. So if the `limit` value is set too low (to 1, for instance), a node might be considered dead just because its heartbeat was dropped by the network. On the other hand, if `limit` is set too high, the failure detection time grows accordingly. The trade-off is therefore between speed and accuracy. With a reliable-enough network, we can safely assume that a low `limit` will suffice and hence consider heartbeating as having a “reasonably high” accuracy.
Back in 2002, a group of researchers from Cornell University (including one of the authors of the paper mentioned above) devised a new protocol to address the network load imposed by traditional heartbeating protocols. They called their new protocol SWIM.
Enter SWIM
The new idea introduced by SWIM — or Scalable Weakly-consistent Infection-style Process Group Membership Protocol — is the separation of the failure detection and membership update dissemination functionalities.
Failure detection component
As the name suggests, the role of this component is to detect potential failures of nodes in the cluster.
Like in heartbeating protocols, each node has a list of the nodes that are in the cluster. But instead of brutally pinging all of them every `T` interval of time, only one of them will be checked.
On this diagram, we see a node randomly choosing another one of its peers to check whether it is still alive. It sends a `PING` message and expects an `ACK` in response, indicating that the target node is healthy.
If no `ACK` from node C is received by A within the pre-specified timeout period (determined by the message round-trip time, which is chosen smaller than the protocol period), an indirect check is performed.
A will select `k` members at random from its list of nodes and send them a `PING-REQ(C)` message. Each of these nodes will in turn send a `PING` message to C and forward the `ACK` (if received) back to A.
At the end of the protocol period `T`, A checks whether it has received any direct or indirect response from C. If not, it declares C dead in its local membership list and hands this update off to the dissemination component.
Indirect probing is a great way to work around a congested network path or dropped packets between two nodes, which might have caused the direct `PING` to fail in the first place. That is also why the `ACK` is relayed back through the intermediate node instead of being sent directly to the probing node. The number of false positives decreases and the accuracy improves.
Note: the values `T` and `k`, as well as the timeout period, are parameters of the protocol and can be tuned to suit different needs and networks.
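To make the two-step probing more concrete, here is a rough sketch of one protocol period as seen from node A. Every helper used here (`sendPing`, `sendPingReq`, `randomMembers`, `disseminate`) is a hypothetical stub, not the API of any real library.

```go
package swim

import "time"

// Node is a minimal stand-in for a cluster member; just enough for the sketch.
type Node struct{}

// Hypothetical stubs standing in for the real network and membership layers.
func (n *Node) sendPing(target string, timeout time.Duration) bool { return false } // true on ACK
func (n *Node) sendPingReq(relay, target string) bool              { return false } // true on relayed ACK
func (n *Node) randomMembers(k int, exclude string) []string       { return nil }
func (n *Node) disseminate(update string)                          {}

// probeOnce runs one protocol period against target: a direct PING first,
// then an indirect probe through k random members if no ACK arrived in time.
func (n *Node) probeOnce(target string, k int, timeout, period time.Duration) {
	// Direct probe: send PING and wait for an ACK for `timeout`
	// (chosen smaller than the protocol period T).
	if n.sendPing(target, timeout) {
		return // target answered, it is healthy
	}

	// Indirect probe: ask k random members to PING the target on our behalf
	// (PING-REQ); they forward the ACK back to us if they receive one.
	acked := make(chan struct{}, k)
	for _, relay := range n.randomMembers(k, target) {
		go func(relay string) {
			if n.sendPingReq(relay, target) {
				acked <- struct{}{}
			}
		}(relay)
	}

	// Wait until the end of the protocol period for any indirect ACK.
	select {
	case <-acked:
		// Someone reached the target: it is alive.
	case <-time.After(period - timeout):
		// No direct or indirect answer: declare the target failed locally and
		// hand the update over to the dissemination component.
		n.disseminate("failed(" + target + ")")
	}
}
```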
Dissemination component
Upon detecting the failure of another group member, a node multicasts this information to the rest of the cluster as a `failed(C)` message. A member receiving this message deletes the offending node from its local membership list. Information about newly joined members or members voluntarily leaving the cluster is disseminated in a similar manner.
Note: for a new node to join the cluster, it must know the address of at least one alive member of the cluster. This way, it can introduce itself to the cluster, the other nodes will include it in their gossip, and it will gradually receive information about all the nodes of the cluster.
Evaluating SWIM
If we sum up what SWIM does so far:
- it splits the failure detection and dissemination components;
- each node of the cluster periodically checks the health of a random node from its memberlist (either directly or indirectly);
- `join()` and `failed()` events are multicast.
So given our evaluation criteria, what can we say about the protocol?
- Strong Completeness: as each node randomly selects a node to check at each protocol period, every node will eventually be checked and every faulty node will be detected;
- Speed of failure detection: the expected time between the failure of a node and its detection by some non-faulty node is at most `T' * (1 / (1 - e^(-Qf)))`, where `Qf` is the fraction of non-faulty nodes in the cluster and `T'` is the protocol period (in time units, which should be at least three times the round-trip estimate);
- False positives: the probability of the direct probe and all indirect probes being lost is (gross approximation here, the real equation is a bit larger) `(1 - Delivery_%²) * (1 - Delivery_%⁴)^k`, where `Delivery_%` is the probability of timely delivery of a packet and `k` is the number of members used to perform indirect probing. Notice how `k` is used as an exponent? It shows that increasing the number of nodes used for indirect probing rapidly decreases the probability of false positives;
- Accuracy: as the accuracy is directly correlated to the false positives, we can calculate (see the quick check after this list) that with 95% of the packets successfully delivered and 3 nodes used for indirect probing, the protocol has an accuracy of 99.9%. Again, this value increases as we increase `k`;
- Network Message Load: as each node only pings one target each protocol period, we moved from `O(n²)` to `O(n)`. We roughly have two packets per probe (`PING` and `ACK`) plus an optional indirect probe (hence 4 extra packets per relay node), for a total of around `(4*k + 2) * n` packets per protocol period.
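The accuracy figure can be double-checked with a few lines of Go, simply by plugging the numbers into the approximate false-positive formula above:

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	delivery := 0.95 // probability of timely delivery of a packet
	k := 3.0         // number of members used for indirect probing

	// Probability that the direct probe AND all k indirect probes are lost:
	// (1 - p²) * (1 - p⁴)^k
	falsePositive := (1 - math.Pow(delivery, 2)) * math.Pow(1-math.Pow(delivery, 4), k)

	fmt.Printf("false positive probability ≈ %.4f\n", falsePositive) // ≈ 0.0006
	fmt.Printf("accuracy ≈ %.2f%%\n", 100*(1-falsePositive))         // ≈ 99.94%
}
```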
Important: the speed of failure detection and the probability of false positives do not depend on the number of nodes in the cluster (except asymptotically), which means that the protocol will scale nicely, even on large clusters.
A more robust and efficient SWIM
The version of the SWIM protocol we described so far could be qualified as « basic SWIM ». It works and showcases the main ideas of the protocol, but it suffers from a few issues and limitations:
- the dissemination component still relies on network multicasts. However, network multicast primitives such as IP multicast are only best-effort and, most of the time, they aren’t even enabled on production systems;
- the failure detector doesn’t handle well nodes that are perturbed for small durations of time (overloaded host, unexpectedly long GC pause, …);
- the basic SWIM failure detection protocol guarantees eventual detection of the failure of an arbitrary node. However, it gives no deterministic guarantees on the time between failure of an arbitrary node and its detection at another arbitrary member (in terms of the number of local protocol rounds).
Let’s dig into these limitations to better understand them and see what can be done to work around them.
Infection-style dissemination
The basic protocol propagates membership updates (joins, leaves and failures) through the cluster using multicast primitives. Hardware multicast and IP multicast are available on most networks and operating systems, but are rarely enabled (for administrative or security reasons). The basic SWIM protocol would then have to use a costly broadcast, or an inefficient point-to-point messaging scheme, in order to disseminate the membership updates.
Furthermore, this type of multicast usually uses UDP and is a best-effort protocol. Reliably maintaining the memberlist would prove challenging.
Instead of multicast, the authors of the SWIM protocol suggested a different approach, eliminating the need for multicast altogether: piggybacking.
They took advantage of the fact that the protocol regularly sends messages to other nodes (`PING`, `PING-REQ`, `ACK`), so they decided to include “gossip information” in these packets. This dissemination mechanism is called “infection-style”, as information spreads in a manner analogous to the spread of gossip in society, or of an epidemic in the general population.
Notice that this implementation of the dissemination component does not generate any extra packets (such as multicasts) - all “messages” handed to this component are propagated by piggybacking on the packets of the failure detection component. The network load remains the same as seen previously.
In the following example, we see how piggybacking can be used to disseminate information:
- A knows that C is dead;
- D knows that E joined the cluster;
- A directly probes B and includes a gossip indicating that C should be considered failed. B acknowledges and updates its own memberlist to remove C;
- A directly probes D and also indicates that C is dead. D acknowledges and piggybacks on the `ACK` packet the information that a new node E has joined the cluster.
Infection-style dissemination implies that members of the cluster might hold diverging views of the memberlist at any given time, as information takes longer to propagate. But it has been shown that such an epidemic process spreads exponentially fast, and all the nodes in the cluster will eventually (and rapidly) receive the gossip (it takes `O(log(N))` time to reach every node).
To improve resiliency in case of node failures and packet losses, each node maintains its own prioritized list of “gossips” to disseminate, and each packet sent by the protocol can piggyback a fixed number of them.
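Here is a minimal sketch of what such a piggyback queue could look like. The `gossip` type and the queue are invented for the example; real implementations (hashicorp/memberlist, for instance) additionally retransmit each gossip only a bounded number of times before dropping it.

```go
package swim

import (
	"sort"
	"sync"
)

// gossip is a membership update waiting to be piggybacked on outgoing packets
// (e.g. “C failed”, “E joined”).
type gossip struct {
	payload   []byte
	transmits int // how many times it has already been piggybacked
}

// broadcastQueue holds the gossips to disseminate; every outgoing PING,
// PING-REQ or ACK asks it for a fixed number of gossips to include.
type broadcastQueue struct {
	mu      sync.Mutex
	pending []*gossip
}

// nextPiggyback returns up to max gossip payloads to append to an outgoing
// packet, favoring the ones that have been transmitted the least so far.
func (q *broadcastQueue) nextPiggyback(max int) [][]byte {
	q.mu.Lock()
	defer q.mu.Unlock()

	// Least-transmitted first, so that fresh updates spread quickly.
	sort.Slice(q.pending, func(i, j int) bool {
		return q.pending[i].transmits < q.pending[j].transmits
	})

	var out [][]byte
	for _, g := range q.pending {
		if len(out) == max {
			break
		}
		out = append(out, g.payload)
		g.transmits++
	}
	return out
}
```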
Suspicion mechanism
The failure detector component can easily declare a node dead because of packet losses or because the node was unresponsive for a short time (a temporary issue, an unusual load of work, GC pauses, …). The node would then be incorrectly considered dead and forced to leave the cluster.
To mitigate this effect, a “suspicion” sub-protocol was introduced.
It works as follows. Consider a node A that chooses another node B as its `ping` target. If A receives no direct or indirect `ACK` from B, it does NOT declare it dead. Instead, it marks the unresponsive node as suspected in its local memberlist and transfers this update to the dissemination component.
Each node receiving the update also updates its local memberlist and keeps on
disseminating the update.
That being said, suspected members do stay on the memberlists and are treated similarly to healthy members.
If any node receives an `ACK` from a suspected node, it un-suspects the node and disseminates the information.
Similarly, if a node receives a gossip indicating that it is suspected, it can start propagating a “hey, I’m alive!” message to clarify the situation.
After a predefined timeout, suspected nodes are marked as dead and the information is propagated.
This mechanism reduces (but does not eliminate) the rate of failure detection false positives. Notice also that the Strong Completeness property of the original protocol continues to hold. Failures of processes suspecting a failed process may prolong detection time, but eventual detection is still guaranteed.
From the above discussion, `Alive` messages override `Suspect` messages, and `Confirm` messages (which declare a node dead) override both `Suspect` and `Alive` messages, in their effect on the local membership list entry corresponding to the suspected member.
However, a member might be suspected and un-suspected multiple times during its lifetime. And as no ordering of events between nodes is guaranteed, the following situation might happen:
What’s the status of C? Which node is right? Without any additional information, A believes that C is alive whereas B believes it’s suspected.
To resolve this ambiguity, updates need to be distinguished through a unique identifier: an incarnation number. Each node maintains its own incarnation number. It is set to 0 when the node joins the cluster and can be incremented only by the node itself, when it receives information about itself being suspected in the current incarnation. The suspected node then emits an `Alive` message with the incremented incarnation number and disseminates it.
The incarnation number must be included, along with a node identifier, in every `Alive`, `Suspect` and `Confirm` message. This number is used to establish a “happens-before” relationship between events.
For instance:
- receiving the message `{ Alive C, incarnation = 1 }` would override the message `{ Suspect C, incarnation = 0 }`, as the incarnation number of the suspect message is lower than that of the alive message;
- on the other hand, the message `{ Alive C, incarnation = 1 }` would not be overridden by a later `{ Suspect C, incarnation = 0 }`, for the same reason.
By comparing incarnation numbers, we can safely drop outdated messages and still end up in a coherent state across the cluster.
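To make these precedence rules concrete, here is a sketch of how a node could decide whether an incoming update supersedes what it already stores about a member. The states mirror the `Alive`, `Suspect` and `Confirm` messages; the types and the function are made up for the example.

```go
package swim

// state mirrors the three kinds of membership messages.
type state int

const (
	alive state = iota
	suspect
	confirmedDead
)

type memberStatus struct {
	state       state
	incarnation int
}

// overrides reports whether an incoming update should replace the current
// knowledge about a member:
//   - Confirm overrides Alive and Suspect, whatever the incarnation;
//   - Suspect(i) overrides Alive(j) if i >= j, and Suspect(j) if i > j;
//   - Alive(i) overrides Suspect(j) or Alive(j) only if i > j.
func overrides(incoming, current memberStatus) bool {
	// A member confirmed dead stays dead: Alive and Suspect never undo Confirm.
	if current.state == confirmedDead {
		return false
	}

	switch incoming.state {
	case confirmedDead:
		return true
	case suspect:
		if current.state == alive {
			return incoming.incarnation >= current.incarnation
		}
		return incoming.incarnation > current.incarnation
	default: // alive
		return incoming.incarnation > current.incarnation
	}
}
```

With such a rule, a node that already holds `{ Alive C, incarnation = 1 }` simply drops an incoming `{ Suspect C, incarnation = 0 }`, and every node converges to the same view of C.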
Deterministic probe-target selection
In the first version of the protocol, the failure detector component randomly chooses the node to probe. Although each node failure is guaranteed to be detected eventually, an especially unlucky selection of targets can lead to a long delay between a failure and its detection.
This can be fixed by using a round-robin selection instead of a random one. Joining nodes are inserted in the memberlist at random positions, and once the list traversal is complete, the list is randomly shuffled before being traversed again.
With this method, successive selections of the same target are at most `(2n - 1)` protocol periods apart. This bounds the worst-case detection time of a node failure.
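A possible sketch of this randomized round-robin selection, with an invented `memberList` type:

```go
package swim

import "math/rand"

// memberList keeps probe targets in a list that is traversed in order and
// reshuffled once exhausted; new members are inserted at random positions.
type memberList struct {
	members []string
	next    int // index of the next probe target
}

// add inserts a newly joined member at a random position, so it does not have
// to wait for a full shuffle before being probed for the first time.
func (m *memberList) add(member string) {
	i := rand.Intn(len(m.members) + 1)
	m.members = append(m.members, "")
	copy(m.members[i+1:], m.members[i:])
	m.members[i] = member
}

// nextTarget returns the next node to probe. Once the whole list has been
// traversed, it is shuffled again: successive selections of the same target
// are therefore at most 2n-1 protocol periods apart.
func (m *memberList) nextTarget() string {
	if len(m.members) == 0 {
		return ""
	}
	if m.next >= len(m.members) {
		rand.Shuffle(len(m.members), func(i, j int) {
			m.members[i], m.members[j] = m.members[j], m.members[i]
		})
		m.next = 0
	}
	target := m.members[m.next]
	m.next++
	return target
}
```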
Summary
- SWIM is a membership protocol designed to answer the question « Which are the alive nodes in a cluster? » in a scalable way
- the protocol is divided into two components: failure detection and information dissemination
- the failure detector works by directly or indirectly probing nodes, at fixed intervals
- the suspicion sub-protocol improves the failure detector’s accuracy by giving nodes time to recover
- information about the status of the cluster is disseminated using an infection-style method: events are piggybacked on the failure detection component’s messages and propagate from peer to peer, the way gossip or an infection would in real life
- selecting probe targets in a randomized round-robin fashion makes the worst-case failure detection time deterministic
Going further
While we went pretty deep into the memberlist problem, there are still a lot of subjects we didn’t explore:
- what happens in case of network partitions?
- how does SWIM deal with nodes voluntarily leaving the cluster?
- what about nodes re-joining?
- we totally ignored security. Shouldn’t the communications between nodes at least be encrypted?
- etc.
Some of these concerns — and more — are addressed in Serf. Two of HashiCorp’s products rely on a version of the SWIM protocol enhanced by their teams: Serf and Consul.
They provide a guide describing what modifications they introduced to the protocol. A convergence simulator can also be found on their website to visualize how fast a cluster will converge to a consistent state.
SWIM in Go: memberlist
And last but not least, the Go implementation of their modified version of SWIM is Open-Source and available on GitHub: https://github.com/hashicorp/memberlist
Here is how a node joins a cluster, using the memberlist library:
```go
package main

import (
	"fmt"

	"github.com/hashicorp/memberlist"
)

func main() {
	/* Create the initial memberlist from a safe configuration.
	   Please reference the godoc for other default config types.
	   http://godoc.org/github.com/hashicorp/memberlist#Config */
	list, err := memberlist.Create(memberlist.DefaultLocalConfig())
	if err != nil {
		panic("Failed to create memberlist: " + err.Error())
	}

	// Join an existing cluster by specifying at least one known member.
	n, err := list.Join([]string{"1.2.3.4"})
	if err != nil {
		panic("Failed to join cluster: " + err.Error())
	}
	fmt.Printf("Reached %d nodes while joining\n", n)

	// Ask for members of the cluster.
	for _, member := range list.Members() {
		fmt.Printf("Member: %s %s\n", member.Name, member.Addr)
	}

	// Continue doing whatever you need, memberlist will maintain membership
	// information in the background. Delegates can be used for receiving
	// events when members join or leave.
}
```
Wait… weren’t you initially talking about a key-value store?!
That’s right! The idea for this post came from the fact that I am building a key-value store prototype.
I wanted it to be distributed, so I read up on how to detect node failures and how to maintain a membership list in a peer-to-peer cluster.
Now that this step is done, I will have to take advantage of all of this to build the “store aspect” on top of it: how can I efficiently shard the data? How to replicate it? How to rebalance it when new nodes are added? Or when nodes fail?
But that’s for another day, in another post!