Replicating data with CRDTs

To replicate state across nodes, the StateHandoff GenServer on each node needs references to the CRDT instances on all other nodes in the cluster. The DeltaCrdt library provides the function set_neighbours/2, which configures a CRDT with a list of DeltaCrdt processes on other nodes with which it can sync state. The docs state this is a unidirectional sync, so set_neighbours must be called on every node in the cluster to fully sync data across all nodes.

Looking at the internals of the Horde library, I follow the distributed registry logic to the point where it calls send/2 to pass the local DeltaCrdt process a list of neighbors with which to sync. The neighbors argument is a list of tuples in the format {crdt_name, node}, where crdt_name is the registered name of the CRDT process used by the associated registry and node is the name of the node where that CRDT process lives. I continue following the logic for this send call to the DeltaCrdt.CausalCrdt module, which defines the GenServer callback that handles the :set_neighbours event. It seems odd to me that Horde makes an undocumented direct send call to DeltaCrdt's internal GenServer instead of going through the public interface, but this is likely because the author of Horde also created DeltaCrdt and has a deep understanding of the internals of both libraries. I check the source code for the DeltaCrdt module’s set_neighbours/2 function, and it is basically a proxy for the same send call that Horde makes for the :set_neighbours event.
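The {crdt_name, node} tuples work as process references because send/2 accepts a {registered_name, node} tuple as its destination. Here is a minimal sketch of that addressing, run on a single node. The name :crdt_sketch and the node name in the neighbor list are made up for illustration, the current process stands in for the CRDT process, and the {:set_neighbours, list} message shape is an assumption based on the description above, not a documented DeltaCrdt interface.

```elixir
# Register the current process under a hypothetical name so it can be
# addressed the same way a named CRDT process would be.
Process.register(self(), :crdt_sketch)

# Hypothetical neighbor list in the {registered_name, node} format.
neighbours = [{:crdt_sketch, :"node2@127.0.0.1"}]

# Address the process by {name, node}. Using node() keeps this local,
# but the same tuple shape reaches a named process on a remote node.
send({:crdt_sketch, node()}, {:set_neighbours, neighbours})

received =
  receive do
    {:set_neighbours, list} -> list
  after
    1_000 -> :timeout
  end

IO.inspect(received)
```

In real code the public DeltaCrdt.set_neighbours/2 function is the safer choice, since it does not depend on the library's internal message format.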

I now have a good idea of how to use DeltaCrdt to sync state across nodes in the cluster using set_neighbours/2. I already have a failing test in place to validate when the cross-node sync behavior is implemented so I can jump right into the StateHandoff code to add the required logic.

I start by adding a private function to handle setting the neighbors for the CRDT process and make a call to it in the init callback function.

defmodule Minotaur.StateHandoff do
  # ...

  def init(_opts) do
    :net_kernel.monitor_nodes(true, node_type: :visible)
    {:ok, crdt_pid} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap)

    set_neighbors(crdt_pid)

    {:ok, crdt_pid}
  end

  defp set_neighbors(crdt_pid) do
    # TODO: Add neighbor nodes
    neighbors = []

    DeltaCrdt.set_neighbours(crdt_pid, neighbors)
  end
end

For the second argument to set_neighbours I will need to create a list of references to the CRDT process for each other node in the cluster. I can get a list of other node names with Node.list() which I can map over to create my reference list. To reference the CRDT process of each node, I’ll need to register a name for the process when it is initialized on each node.

I update the init callback to register a name when calling DeltaCrdt.start_link and then use the registered name to create the list of CRDT neighbors.

def init(_opts) do
  :net_kernel.monitor_nodes(true, node_type: :visible)
  {:ok, crdt_pid} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap, name: crdt_name())

  set_neighbors(crdt_pid)

  {:ok, crdt_pid}
end

defp set_neighbors(crdt_pid) do
  neighbors =
    Node.list()
    |> Enum.map(fn node -> {crdt_name(), node} end)

  DeltaCrdt.set_neighbours(crdt_pid, neighbors)
end

defp crdt_name() do
  __MODULE__.Crdt
end

That should do it for setting the initial neighbors when StateHandoff initializes! However, the test will still fail with only these changes.

Let’s look at the test again to see what is happening:

describe "when state is stashed by one node" do
  setup [:start_cluster, :stash_state]

  test "state can be picked up from another node", ctx do
    res = Cluster.call(ctx.cluster, ctx.node2, StateHandoff, :pickup, [@stash_key])
    assert {:ok, @stash_value} == res
  end
end

In the test setup function start_cluster, node1 is initialized first so it won’t have any neighbors set when its StateHandoff starts. When node2 is started, it will set node1 as a neighbor, but this will only sync data in the direction of node2->node1. In order for node2 to receive the synced change of node1 stashing data, node1 will need to respond to node2 joining the cluster and update its neighbor list.
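To make the direction problem concrete, here is a toy model of the neighbor lists before and after node1 reacts to node2 joining. It does not use DeltaCrdt at all: the module name, the pushes_to?/3 function, and the :node1/:node2 atoms are all invented for illustration, and it assumes (as described above) that a node only sends its changes to the nodes in its own neighbor list.

```elixir
defmodule SyncDirectionSketch do
  # `neighbors` maps each node to the list of nodes it sends changes to.
  # Returns true when changes made on `from` would be sent to `to`.
  def pushes_to?(neighbors, from, to) do
    to in Map.get(neighbors, from, [])
  end
end

# node1 started first with no neighbors; node2 then set node1 as a neighbor.
before_nodeup = %{node1: [], node2: [:node1]}

# After node1 handles node2 joining, it refreshes its own neighbor list.
after_nodeup = %{node1: [:node2], node2: [:node1]}

# Data stashed on node1 never reaches node2 in the first configuration.
IO.inspect(SyncDirectionSketch.pushes_to?(before_nodeup, :node1, :node2))
# => false

IO.inspect(SyncDirectionSketch.pushes_to?(after_nodeup, :node1, :node2))
# => true
```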

The StateHandoff GenServer is already subscribed to node events via monitor_nodes in the :net_kernel Erlang module, so a simple change to the handlers will give me the behavior I need. set_neighbors is now called whenever a node connects to or disconnects from the cluster.

def handle_info({:nodeup, _node, _opts}, crdt_pid) do
  set_neighbors(crdt_pid)
  {:noreply, crdt_pid}
end

def handle_info({:nodedown, _node, _opts}, crdt_pid) do
  set_neighbors(crdt_pid)
  {:noreply, crdt_pid}
end
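Because both handlers do the same thing, they could also be collapsed into a single clause with a guard. The sketch below shows that shape in a standalone GenServer that can be exercised without a real cluster by sending it hand-built node events. NodeEventSketch and refresh_count/1 are hypothetical names, and the counter only stands in for the real set_neighbors call.

```elixir
defmodule NodeEventSketch do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Returns how many node events have been handled so far.
  def refresh_count(server), do: GenServer.call(server, :refresh_count)

  @impl true
  def init(:ok), do: {:ok, 0}

  @impl true
  def handle_call(:refresh_count, _from, count), do: {:reply, count, count}

  # One clause covers both events; the real handler would refresh the
  # CRDT neighbor list here instead of incrementing a counter.
  @impl true
  def handle_info({event, _node, _info}, count) when event in [:nodeup, :nodedown] do
    {:noreply, count + 1}
  end

  # Ignore any unrelated messages instead of crashing.
  def handle_info(_other, count), do: {:noreply, count}
end

{:ok, pid} = NodeEventSketch.start_link()

# Simulate the three-element messages delivered when monitor_nodes is
# called with options such as node_type: :visible.
send(pid, {:nodeup, :"node2@127.0.0.1", [node_type: :visible]})
send(pid, {:nodedown, :"node2@127.0.0.1", [node_type: :visible]})

IO.inspect(NodeEventSketch.refresh_count(pid))
# => 2
```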

The test is now passing… sometimes. There is a slight delay before state syncs across nodes, so I update the test case with a simple wait helper I created the other day. The default wait time is 500ms, which seems to be sufficient to make the test pass consistently.

test "state can be picked up from another node", ctx do
  wait_until(fn ->
    res = Cluster.call(ctx.cluster, ctx.node2, StateHandoff, :pickup, [@stash_key])
    assert {:ok, @stash_value} == res
  end)
end
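The wait_until helper itself isn't shown in this post. One possible version is sketched below, assuming it retries the given function until it stops raising or the timeout runs out. The module name, the retry interval, and the exact behavior are guesses; only the 500ms default comes from the description above.

```elixir
defmodule WaitHelperSketch do
  # Calls `fun` repeatedly until it returns without raising, or re-raises
  # the last error once `timeout` milliseconds have passed.
  def wait_until(fun, timeout \\ 500, interval \\ 50) do
    deadline = System.monotonic_time(:millisecond) + timeout
    do_wait(fun, deadline, interval)
  end

  defp do_wait(fun, deadline, interval) do
    fun.()
  rescue
    error ->
      if System.monotonic_time(:millisecond) >= deadline do
        # Out of time: surface the original failure to the caller.
        reraise error, __STACKTRACE__
      else
        Process.sleep(interval)
        do_wait(fun, deadline, interval)
      end
  end
end

# Usage: the function fails on its first two calls and succeeds on the third.
{:ok, counter} = Agent.start_link(fn -> 0 end)

result =
  WaitHelperSketch.wait_until(fn ->
    n = Agent.get_and_update(counter, fn n -> {n + 1, n + 1} end)
    if n < 3, do: raise("not yet")
    n
  end)

IO.inspect(result)
# => 3
```

Since a failed ExUnit assertion raises an exception, wrapping the assert in a helper like this retries it until the sync has had time to complete.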