Delta CRDT

I need to decide on how game session state will be handed off to a process on a new node during a rolling deploy. I want to delay using external tools like a database or Redis for as long as possible mostly to reduce infrastructure costs for this initial prototype. After learning more about how Horde implements Delta CRDTs to sync state across cluster nodes, I want to apply the same concept to process state handoffs in Minotaur.

The Horde library has a module, Horde.NodeListener, that watches for changes in the Elixir/Erlang cluster and updates its internal Horde.Cluster memberships. NodeListener relies on the monitor_nodes function in Erlang's :net_kernel module, which subscribes the calling process to node status changes as nodes connect to or disconnect from the cluster. When a NodeListener receives these status events, it triggers an associated DeltaCrdt process to update its own membership list of nodes with which to synchronize state.

Creating my own implementation of distributed state looks fairly straightforward using these core tools. DeltaCrdt is authored by Derek Kraan, who also created Horde, and his documentation is very helpful.
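
The core idea from the DeltaCrdt docs boils down to something like the following sketch, where two CRDT processes on a single node sync with each other (the exact read/write functions, like DeltaCrdt.put/3 and DeltaCrdt.get/2 here, vary a bit between library versions):

# Two replicas of an add-wins last-write-wins map
{:ok, crdt1} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap)
{:ok, crdt2} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap)

# Neighbours are one-directional, so set both for two-way sync
DeltaCrdt.set_neighbours(crdt1, [crdt2])
DeltaCrdt.set_neighbours(crdt2, [crdt1])

# A write to one replica shows up on the other after the sync interval
DeltaCrdt.put(crdt1, "key", "value")
Process.sleep(300)
DeltaCrdt.get(crdt2, "key")
#=> "value"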

My initial idea is to build a GenServer that:

  • Starts a linked DeltaCrdt process to store state
  • Sets the initial “neighbor” nodes for syncing state
  • Subscribes to cluster node connect and disconnect events, updating the neighbor list in response
  • Provides an interface for reading and writing state (sketched below)
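
To make the last point concrete, the read/write interface I have in mind looks roughly like the sketch below. The stash/2 and pickup/1 names are placeholders, not a settled API:

  # Stash a process's state under a key before its node shuts down
  def stash(key, state) do
    GenServer.call(__MODULE__, {:stash, key, state})
  end

  # Pick up previously stashed state on the new node
  def pickup(key) do
    GenServer.call(__MODULE__, {:pickup, key})
  end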

Handling node status events

I create a new module and add the standard GenServer boilerplate:

defmodule Minotaur.StateHandoff do
  @moduledoc """
  This module defines a `GenServer` that manages a distributed data store which
  is synced to nodes across the cluster. A process can stash its state to the
  store for another process to pick up later.
  """

  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def init(_) do
    {:ok, nil}
  end
end
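
Since the module calls use GenServer, it gets a default child_spec/1, so it should be able to drop straight into the application's supervision tree. A minimal sketch (the rest of the child list is elided):

children = [
  # ...other children
  Minotaur.StateHandoff
]

Supervisor.start_link(children, strategy: :one_for_one)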

Looking at the docs for the Erlang net_kernel module, I can call monitor_nodes from my GenServer to subscribe it to nodeup and nodedown messages when nodes connect to or disconnect from the cluster. The GenServer will need to handle these node status messages, which are documented to have the format {nodedown, Node, Info}. I'm not sure exactly how this will translate from Erlang syntax to Elixir, so I update the GenServer to experiment with the interface.

  def init(_opts) do
    :net_kernel.monitor_nodes(true, node_type: :visible)

    {:ok, nil}
  end

  def handle_info(event, state) do
    IO.inspect(event)
    {:noreply, state}
  end

Since I already have a cluster testing framework set up, I add a temporary test to my existing cluster test module to find the event message signatures in the log output.

describe "tmp cluster test" do
  setup [:start_cluster]

  test "tmp case", ctx do
    Cluster.call(ctx.cluster, ctx.node1, Minotaur.StateHandoff, :start_link, [])
    [node2: node2] = add_node2(ctx)
    Cluster.stop_node(ctx.cluster, node2)

    # Force the test to keep retrying until timeout so the inspected
    # node events have time to show up in the log output
    wait_until(fn ->
      assert false
    end)
  end
end

Running the test shows the node status event tuples in the output:

{:nodeup,
 :"Elixir_Minotaur_Cluster_GameSessionTest_test_tmp_tmp-523-73164@127.0.0.1",
 [node_type: :visible]}
{:nodedown,
 :"Elixir_Minotaur_Cluster_GameSessionTest_test_tmp_tmp-523-73164@127.0.0.1",
 [node_type: :visible]}

Now that I know the message format for node status events, I can write callback handlers in the GenServer to act on them.

  def handle_info({:nodeup, node, _opts}, state) do
    IO.inspect("Node connected: #{node}")
    {:noreply, state}
  end

  def handle_info({:nodedown, node, _opts}, state) do
    IO.inspect("Node disconnected: #{node}")
    {:noreply, state}
  end

And I can see the callbacks are being handled as expected.

"Node connected: Elixir_Minotaur_Cluster_GameSessionTest_test_tmp_cluster_test_tmp_case-142-83433@127.0.0.1"
"Node disconnected: Elixir_Minotaur_Cluster_GameSessionTest_test_tmp_cluster_test_tmp_case-142-83433@127.0.0.1"

Next, I’ll look at adding DeltaCrdt to the project to implement a data store that syncs across all nodes in the cluster.
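
As a rough preview of where that is heading (not a final implementation), I expect the node status handlers to end up feeding DeltaCrdt.set_neighbours/2 so the local CRDT syncs with a same-named CRDT process on every connected node. The __MODULE__.Crdt name and the state shape here are assumptions for the sketch:

  def init(_opts) do
    :net_kernel.monitor_nodes(true, node_type: :visible)

    # AWLWWMap is DeltaCrdt's add-wins last-write-wins key/value map
    {:ok, crdt} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap, name: __MODULE__.Crdt)

    {:ok, %{crdt: crdt}}
  end

  def handle_info({:nodeup, _node, _opts}, state) do
    update_neighbours(state.crdt)
    {:noreply, state}
  end

  def handle_info({:nodedown, _node, _opts}, state) do
    update_neighbours(state.crdt)
    {:noreply, state}
  end

  # Point the local CRDT at the same-named CRDT process on every other
  # connected node; each node does the same, so syncing ends up two-way
  defp update_neighbours(crdt) do
    neighbours = Enum.map(Node.list(), &{__MODULE__.Crdt, &1})
    DeltaCrdt.set_neighbours(crdt, neighbours)
  end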