Replicated Data Store
Replicating data with CRDTs
To replicate state across nodes, the StateHandoff GenServer on each node will need to add references to the CRDT instances of all other nodes in the cluster.
The DeltaCrdt library provides the function set_neighbours/2, which configures a CRDT on a node with a list of DeltaCrdt processes on other nodes with which it can sync state. The docs state this is a unidirectional sync, so a call to set_neighbours/2 will need to be made on each node in the cluster in order to fully sync data across all nodes.
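As a rough illustration of what that means in practice, the sketch below starts two CRDT processes on a single node and points each one at the other. It is modeled on the example in the DeltaCrdt README as I understand it. The put/3 and to_map/1 calls, the sync_interval option, the version requirement passed to Mix.install, and the sleep duration are all assumptions and may not match the version used in this project (older releases exposed mutate and read instead).

```elixir
# Assumes network access and a DeltaCrdt release matching "~> 0.6".
Mix.install([{:delta_crdt, "~> 0.6"}])

{:ok, crdt1} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap, sync_interval: 50)
{:ok, crdt2} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap, sync_interval: 50)

# Each call only tells one CRDT which processes to sync with,
# so both directions are configured explicitly.
DeltaCrdt.set_neighbours(crdt1, [crdt2])
DeltaCrdt.set_neighbours(crdt2, [crdt1])

# Write on one side only.
DeltaCrdt.put(crdt1, "key", "value")

# Give the background sync a moment to run (the duration is a guess).
Process.sleep(500)

IO.inspect(DeltaCrdt.to_map(crdt2), label: "crdt2")
```

The same idea carries over to a cluster: every node has to make its own set_neighbours/2 call for data to flow in all directions.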
Looking at the internals of the Horde library, I follow the distributed registry logic to the point where it calls send/2 to the local DeltaCrdt process with a list of neighbors with which to sync. The argument for neighbors is a list of tuples in the format of {crdt_name, node}, where crdt_name is the registered name for the CRDT process used by the associated registry and node is the node name where the CRDT process lives.
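The {name, node} tuple is the standard OTP way to address a registered process on a particular node, and it is not specific to DeltaCrdt. A minimal sketch using only the standard library is below. The DemoCrdt name and the Agent standing in for the CRDT process are hypothetical, and the local node() is used so the snippet runs without a cluster.

```elixir
# An Agent stands in for the CRDT process; DemoCrdt is a made-up name.
{:ok, _pid} = Agent.start_link(fn -> %{greeting: "hello"} end, name: DemoCrdt)

# Address the process by registered name plus node name.
# In a real cluster, node() would be replaced by a remote node name.
target = {DemoCrdt, node()}

state = Agent.get(target, fn state -> state end)
IO.inspect(state, label: "state read via {name, node}")
```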
I continue following the logic for this send call to the DeltaCrdt.CausalCrdt module, which defines the GenServer callback to handle the :set_neighbours event.
It seems odd to me that the Horde library makes an undocumented, direct send call to the internal GenServer for DeltaCrdt instead of going through the public interface. This is likely because the author of Horde also created DeltaCrdt and has a deep understanding of the internals of both libraries.
I check the source code for the DeltaCrdt module’s set_neighbours/2 function and it is basically a proxy for the same send call that Horde is making for the :set_neighbours event.
I now have a good idea of how to use DeltaCrdt to sync state across nodes in the cluster using set_neighbours/2.
I already have a failing test in place to validate the cross-node sync behavior once it is implemented, so I can jump right into the StateHandoff code to add the required logic.
I start by adding a private function to handle setting the neighbors for the CRDT process and make a call to it in the init callback function.
defmodule Minotaur.StateHandoff do
  # ...

  def init(_opts) do
    :net_kernel.monitor_nodes(true, node_type: :visible)
    {:ok, crdt_pid} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap)
    set_neighbors(crdt_pid)
    {:ok, crdt_pid}
  end

  defp set_neighbors(crdt_pid) do
    # TODO: Add neighbor nodes
    neighbors = []
    DeltaCrdt.set_neighbours(crdt_pid, neighbors)
  end
end
For the second argument to set_neighbours/2, I will need to create a list of references to the CRDT process on each other node in the cluster. I can get a list of other node names with Node.list(), which I can map over to create my reference list.
To reference the CRDT process of each node, I’ll need to register a name for the process when it is initialized on each node. I update the init callback to register a name when calling DeltaCrdt.start_link and then use the registered name to create the list of CRDT neighbors.
def init(_opts) do
  :net_kernel.monitor_nodes(true, node_type: :visible)
  {:ok, crdt_pid} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap, name: crdt_name())
  set_neighbors(crdt_pid)
  {:ok, crdt_pid}
end

defp set_neighbors(crdt_pid) do
  # Build a {registered_name, node} reference for every other connected node
  neighbors =
    Node.list()
    |> Enum.map(fn node -> {crdt_name(), node} end)

  DeltaCrdt.set_neighbours(crdt_pid, neighbors)
end

defp crdt_name() do
  __MODULE__.Crdt
end
That should do it for setting the initial neighbors when StateHandoff initializes! However, the test will still fail with only these changes. Let’s look at the test again to see what is happening:
describe "when state is stashed by one node" do
  setup [:start_cluster, :stash_state]

  test "state can be picked up from another node", ctx do
    res = Cluster.call(ctx.cluster, ctx.node2, StateHandoff, :pickup, [@stash_key])
    assert {:ok, @stash_value} == res
  end
end
In the test setup function start_cluster, node1 is initialized first, so it won’t have any neighbors set when its StateHandoff starts. When node2 is started, it will set node1 as a neighbor, but this will only sync data in the direction of node2->node1. In order for node2 to receive the synced change of node1 stashing data, node1 will need to respond to node2 joining the cluster and update its neighbor list.
The StateHandoff GenServer is already subscribed to node events from monitor_nodes in the :net_kernel Erlang module, so a simple change to the handlers will give me the behavior I need. set_neighbors is now called whenever a node connects to or disconnects from the cluster.
def handle_info({:nodeup, _node, _opts}, crdt_pid) do
  set_neighbors(crdt_pid)
  {:noreply, crdt_pid}
end

def handle_info({:nodedown, _node, _opts}, crdt_pid) do
  set_neighbors(crdt_pid)
  {:noreply, crdt_pid}
end
The test is now passing… sometimes. There is going to be a slight delay for the state to sync across nodes, so I update the test case with a simple wait helper I created the other day. The default wait time is 500ms, which seems to be sufficient to make the test pass consistently.
test "state can be picked up from another node", ctx do
  wait_until(fn ->
    res = Cluster.call(ctx.cluster, ctx.node2, StateHandoff, :pickup, [@stash_key])
    assert {:ok, @stash_value} == res
  end)
end
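The wait_until helper itself is not shown in this post. For readers who want something similar, here is one possible sketch of such a helper. It is not the actual implementation: the WaitHelper module name, the interval argument, and the choice to retry only on ExUnit.AssertionError are all assumptions. Only the 500ms default comes from the text above.

```elixir
defmodule WaitHelper do
  # Hypothetical helper: re-runs `fun` until its assertions pass
  # or `timeout` milliseconds have elapsed.
  def wait_until(fun, timeout \\ 500, interval \\ 50) do
    deadline = System.monotonic_time(:millisecond) + timeout
    do_wait(fun, deadline, interval)
  end

  defp do_wait(fun, deadline, interval) do
    fun.()
  rescue
    error in [ExUnit.AssertionError] ->
      if System.monotonic_time(:millisecond) >= deadline do
        # Out of time: surface the last assertion failure to the test.
        reraise error, __STACKTRACE__
      else
        Process.sleep(interval)
        do_wait(fun, deadline, interval)
      end
  end
end
```

Polling like this keeps the test fast when the sync happens quickly, while still failing with the original assertion message if the state never arrives.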