Clusters and membership: discovering the SWIM protocol

Recently, I have been playing with distributed systems in Go. I am toying around with a distributed key-value store. Because why not?

This store — again, that I am building just for the sake of learning — will be able to operate as a single instance on a server as well as inside a cluster. And instead of defining statically through a configuration file which nodes are part for the cluster, I want to be able to add or remove nodes dynamically.

Each node in the cluster must be able to answer a seemingly simple question: « Who are my (active) peers? » Or expressed in a more fancy manner: how can the cluster detect nodes addition, nodes removal, nodes failures and propagate information across an entire cluster?

Have you ever wondered how Cassandra, Redis, Riak, and many others could maintain a cluster? Well, I have. And in this blog post, we’ll see two of the protocols that this type of service relies upon for their failure detection and memberships.


Traditionally, distributed peer-to-peer applications rely on heart-beat style protocols to detect node failures. When a failure is detected, it is usually communicated to the cluster using some kind of broadcast or multicast subsystem.

We could summarize the process like this:

  • every node sends an “heartbeat signal” every T interval of time to all other nodes ;
  • if no heartbeat from a node N is received after T*limit, the non-responsive node is considered dead.
Heartbeat sent to a 4-nodes cluster

The regular heartbeats solve the failure detection problem and we can also use the broadcast communication channel to transmit information related to which nodes are in the cluster. So this protocol solves both the failure detection problem and the member-list one.

But when we look more closely at what happens in the network, we notice that the more the cluster grows, the more messages are sent. For instance, a four-nodes cluster will send twelve heartbeats every T interval, a five-nodes one will send twenty, a six-nodes one will send thirty, … The number of signals grows quadratically with heartbeat-style protocols and this means that a too large cluster could take down the whole network.

Protocol evaluation criteria

After noticing this first limitation in the heartbeat-based method, I realized that I actually needed a way to objectively evaluate protocols.

Luckily, a paper called « On scalable and efficient distributed failure detectors » published in 2001 by I. Gupta, T. D. Chandra, and G. S. Goldszmidt addresses this topic.

According to them, the key characteristics of the efficiency and scalability of distributed failure detector protocols are:

  • Strong Completeness: crash-failure of any group member is detected by all non-faulty members ;
  • Speed of failure detection: the time interval between a member failure and its detection by some non-faulty group member ;
  • Accuracy: the rate of false positives of failure detection (note: 100% accuracy is impossible on asynchronous networks) ;
  • Network Message Load, in bytes per second generated by the protocol.

Evaluating heartbeat

In the light of our evaluation criteria, what can we say about heartbeating?

  • Strong Completeness: every nodes receives heartbeats, so a node failure will be detected independently by every other node in the cluster ;
  • Speed of failure detection: this is tunable and is often set to a value looking like T * limit which means that a failure will be detected after limit intervals T of time ;
  • Network Message Load: O(n²), where n is the number of nodes in the cluster.
  • Accuracy: there is a trade-off to be made here. No network being perfect, there will always be a portion of the packets that will be lost. So if the limit value is set too low (to 1, for instance), a node might be considered dead just because the heartbeat was dropped by the network. On the other hand, if limit is set to a value that is too high, the speed of failure detection will increase accordingly. The trade-off is therefore between speed and accuracy. With a reliable-enough network, we can safely assume that a low limit will suffice and hence we consider heartbeating as having a “reasonably high” accuracy.

Back in 2002, a group of researchers from Cornell University (the same three as previously) devised a new protocol to address the network load imposed by traditional heart-beating protocols. They called their new protocol SWIM.

Enters SWIM

The new idea introduced by SWIM — or Scalable Weakly-consistent Infection-style Process Group Membership Protocol — is the separation of the failure detection and membership update dissemination functionalities.

Failure detection component

As the name suggests, the role of this component is to detect potential failures of nodes in the cluster.

Like in heartbeating protocols, each node has a list of the nodes that are in the cluster. But instead of brutally pinging all of them every T interval of time, only one of them will be checked.

On this schema, we see a node randomly choosing another one of its peers to check if it is still alive. It sends a PING message and expects an ACK in response, indicating that the target node is healthy.

SWIM: node successfully pinged

If no ACK from the node C is received by A in the pre-specified timeout period (determined by the message round-trip time, which is chosen smaller than the protocol period), then an indirect check is performed.

A will select k members at random from its list of nodes and send them a REQ-PING(C) message. Each of these nodes will in turn send a PING message to C and forward the ACK (if received) to A.

SWIM: indirect checks from A to C

At the end of the protocol period T, A checks if it has received any direct or indirect response from C. If not, it declares C as dead in its local membership list and hands this update off to the dissemination component.

Indirect probing is a great way to avoid any congested network path between two nodes and dropped packets, which might have caused the direct PING to have failed in the first place. That’s also why the ACK is relayed back instead of being sent directly to the probing node. The number of false-positives decreases and the accuracy is improved.

Note: the values T, k as well as the timeout period are parameters of the protocol and can be tuned to suit different needs and networks.

Dissemination component

Upon detecting the failure of another group member, the process multicasts this information to the rest of the cluster as failed(C) message. A member receiving this message deletes the offending node from its local membership list. Information about newly joined members or voluntarily leaving members are disseminated in a similar manner.

SWIM: failure of C, detected and multicast by A

Note: for a new node to join the cluster, it must know the address of at least one alive node member of the cluster. This way, it can introduce itself to cluster and the rest of the nodes will include it to the gossips, this allowing it to gradually receive information about all the nodes of the cluster.

Evaluating SWIM

If we sum up what SWIM does so far:

  • it splits the failure detection and the dissemination components ;
  • each node of the cluster periodically check the health of a random node of its memberlist (either directly or indirectly) ;
  • join() and failed() events are multicast.

So given our evaluation criteria, what can we say about the protocol?

  • Strong Completeness: as each node randomly selects at each protocol period a node to check, all nodes will eventually be checked and all faulty-nodes will be detected ;
  • Speed of failure detection: the expected time between a failure of a node and its detection by some other node is at most T’ * (1 / (1 - e^Qf)), where Qf is the fraction of non-faulty nodes in the cluster and T’ is the protocol period (in time units, should be at least three times the round-trip estimate) ;
  • False positives: the probability of direct or indirect probes being lost is (gross approximation here, the real equation is a bit larger) (1 - Delivery_%²)*(1 - Delivery_%⁴)^k — where Delivery_% is the probability of timely delivery of a packet and k is the number of members used to perform indirect probing). Notice how k is used as an exponent? It shows that increasing the number of nodes used for indirect probing will rapidly decrease the probability of having false positives ;
  • Accuracy: as the accuracy is directly correlated to the false positives, we can calculate that with 95% of the packets successfully delivered and 3 nodes used for indirect probing, the protocol will have an accuracy of 99.9%. Again, this value increases as we increase k ;
  • Network Message Load: as each node only pings one target each protocol period, we moved from an O(N²) to something in O(N). We roughly have two packets (PING and ACK) and an optional indirect probe (hence the 4 extra packets per relay node) (4*k + 2)*N.

Important: the speed of failure detection and the probability of false positives don’t depend on the number of nodes in the cluster (except asymptotically). Which means that the protocol will scale nicely, even on large clusters.

A more robust and efficient SWIM

The version of the SWIM protocol we described so far could be qualified as « basic SWIM ». It works and showcases the main ideas of the protocol, but it suffers from a few issues and limitations:

  • the dissemination component still relies on network multicasts. However, network multicast primitives such as IP multicast etc., are only best-effort and most of the time they aren’t even enabled on production systems ;
  • the failure detector doesn’t handle well nodes that are perturbed for small durations of time (overloaded host, unexpectedly long GC pause, …) ;
  • the basic SWIM failure detection protocol guarantees eventual detection of the failure of an arbitrary node. However, it gives no deterministic guarantees on the time between failure of an arbitrary node and its detection at another arbitrary member (in terms of the number of local protocol rounds).

Let’s dig in these limitations to better understand them and see what can be done to go around.

Infection-style dissemination

The basic protocol propagates membership updates (joins, leaves and failures) through the cluster using multicast primitives. Hardware multicast and IP multicast are available on most networks and operating systems, but are rarely enabled (for administrative or security reasons). The basic SWIM protocol would then have to use a costly broadcast, or an inefficient point-to-point messaging scheme, in order to disseminate the membership updates.

Furthermore, this type of multicast usually uses UDP and is a best-effort protocol. Reliably maintaining the memberlist would prove challenging.

Instead of multicast, the authors of the SWIM protocol suggested a different approach, eliminating the need for multicast altogether: piggybacking.

They took advantage of the fact that the protocol regularly sends messages to other nodes (PING, PING-REQ(), ACK) so they decided to include “gossip information” in these packets. This dissemination mechanism is called “infection-style”, as information spreads in a manner analogous to the spread of gossip in society, or epidemic in the general population.

Notice that this implementation of the dissemination component does not generate any extra packets (such as multicasts) - all “messages” handed to this component are propagated by piggybacking on the packets of the failure detection component. The network load remains the same as seen previously.

In the following example, we see how piggybacking can be used to disseminate information:

  1. A knows that C is dead ;
  2. D knows that E joined the cluster ;
  3. A directly probes B and includes a gossip indicating that C should be considered as failed. B acknowledges and updates its own memberlist to remove C ;
  4. A directly probes D and also indicates that C is dead. D acknowledges and piggybacks the ACK packet to indicate that a new node E has joined the cluster.
SWIM: infection-style dissemination

Infection-style dissemination implies that members of the cluster might have a different memberlist status as information takes more time to propagate. But it has been shown that such epidemic process spreads exponentially fast and all the nodes in the cluster will eventually (and rapidly) receive the gossips (it takes O(log(N)) time to reach every node).

To improve resiliency in case of nodes failures and packet losses, each node has its own prioritized list of “gossips” to disseminate. And each packet sent by the protocol can be used to piggyback a fixed amount of gossips.

Suspicion mechanism

The failure detector component can easily detect as dead a node due to packet losses or because it was asleep for some time (because of a temporary issue, an unusual load of work, GC pauses, …). The node would then be incorrectly considered dead and forced to leave the cluster.

To mitigate this effect, a “suspicion” sub-protocol was introduced.

It works as follows. Consider a node A that chooses another node B as ping target. If A receives no direct or indirect ACK from B, it does NOT declare it dead. Instead, it marks the unresponsive node as suspected in its local memberlist and it transfers this update to the dissemination component. Each node receiving the update also updates its local memberlist and keeps on disseminating the update.

That being said, suspected members do stay on the memberlists and are treated similarly to healthy members.

If any node receives an ACK from a suspected node, it un-suspects the node and disseminate the information. Similarly, if a node receive a gossip indicating that it is suspected, it can start propagating a “hey, I’m alive!” message to clarify the situation.

After a predefined timeout, suspected nodes are marked as dead and the information is propagated.

This mechanism reduces (but does not eliminate) the rate of failure detection false positives. Notice also that the Strong Completeness property of the original protocol continues to hold. Failures of processes suspecting a failed process may prolong detection time, but eventual detection is still guaranteed.

From the above discussion, Alive messages override Suspect messages, and Confirm messages override both Suspect and Alive messages, in their effect on the local membership list element corresponding to the suspected member.

However, a member might be suspected and unsuspected multiple times during its lifetime. And as no ordering of the event between the nodes is guaranteed, the following situation might happen:

Two nodes, receiving ambiguous updates

What’s the status of C? Which node is right? Without any additional information, A believes that C is alive whereas B believes it’s suspected.

To resolve this ambiguity, the updates need to be distinguished through unique identifiers: an incarnation number. Each node will maintain its own incarnation number. It’s number set to 0 when the node joins the cluster and can be incremented only by the node itself, when it receives information about itself being suspected in the current incarnation. The suspected node will then emit an Alive message with an incremented incarnation number and disseminate it.

The incarnation number must be included, along a node identifier, in every Alive, Suspect and Confirm messages. This number will be used to establish an “happens-before” relationship between events. For instance:

Two nodes, receiving unambiguous updates
  • receiving the message { Alive C, incarnation = 1 } would override the message { Suspect C, incarnation = 0 } as the incarnation number of the suspected C node is less than the alive C node.
  • on the other hand, the message { Alive C, incarnation = 1 } would not be overridden by { Suspect C, incarnation = 0 }

By comparing incarnation numbers, we can safely drop outdated messages and still end-up in a coherent state across the cluster.

Deterministic probe-target selection

In the first version of the protocol, the failure detector component chooses randomly the node to probe. Although each node failure is guaranteed to be detected eventually, a specially unlucky selection of nodes can lead to an important delay between the failure and its detection.

This can be fixed by using a round-robin selection instead of a random one. Joining nodes will be inserted in the member-list at random positions and when the list traversal is complete, it is randomly rearranged before being traversed again.

With this method, successive selections of the same target are at most (2n - 1) protocol periods apart. This bounds the worst case detection time of a node failure.


  • SWIM is a membership protocol designed to answer the question « Which are the alive nodes in a cluster? » in a scalable way
  • the protocol is divided in two components: failure detection and information dissemination
  • the failure detector works by directly or indirectly probing nodes, at fixed intervals
  • the suspicion sub-protocol improves the failure detector’s accuracy by giving time to nodes for recovery
  • information about the status of the cluster is disseminated using a infection-style method: events are included in the failure detection component and propagated as an infection or gossip would in real life, from peer to peer
  • selected nodes to probe using a randomized round-robin makes the failure detection deterministic

Going further

While we went pretty deep in the memberlist problem, there are still a lof of subjects we didn’t explore:

  • what happens in case of network partitions?
  • how does SWIM deal with nodes voluntarily leaving the cluster?
  • what about nodes re-joining?
  • we totally ignored security. Shouldn’t the communications between nodes at least be encrypted?
  • etc.

Some of these concerns — and more — are addressed in Serf. Two of HashiCorp’s products rely on a version of the SWIM protocol enhanced by their teams: Serf and Consul.

They provide a guide describing what modifications they introduced to the protocol. A convergence simulator can also be found on their website to visualize how fast a cluster will converge to a consistent state.

SWIM in Go: memberlist

And last but not least, the Go implementation of their modified version of SWIM is Open-Source and available on GitHub:

Here is how a node joins a cluster, using the memberlist library:

 1/* Create the initial memberlist from a safe configuration.
 2   Please reference the godoc for other default config types.
 5list, err := memberlist.Create(memberlist.DefaultLocalConfig())
 6if err != nil {
 7	panic("Failed to create memberlist: " + err.Error())
10// Join an existing cluster by specifying at least one known member.
11n, err := list.Join([]string{""})
12if err != nil {
13	panic("Failed to join cluster: " + err.Error())
16// Ask for members of the cluster
17for _, member := range list.Members() {
18	fmt.Printf("Member: %s %s\n", member.Name, member.Addr)
21// Continue doing whatever you need, memberlist will maintain membership
22// information in the background. Delegates can be used for receiving
23// events when members join or leave.

Wait… weren’t you initially talking about a key-value store?!

That’s right! The idea for this post came from the fact that I am building a key-value store prototype.

I wanted it to be distributed, so I documented myself about how to detect node failures and how to maintain a membership list in a peer-to-peer cluster.

Now that this step is done, I will have to take advantage of all of this to build the “store aspect” on top of it: how can I efficiently shard the data? How to replicate it? How to rebalance it when new nodes are added? Or when nodes fail?

But that’s for another day, in another post!