Stashing state on game process termination

I’ve had slow but steady progress toward my goal of having rolling deploys that preserve running game session state. I sometimes only have an hour or two of free time in a day to spend time on this project, but breaking down the problems into smaller pieces and writing this dev journal has allowed me to be more focused and collect my thoughts.

Here is my task list:

  • Launch the application as nodes in an Erlang Cluster on deploy.
  • Start new game session processes on a new version node to replace the sessions of the old version node.
  • LiveView processes continue to communicate with the game sessions after session processes have migrated to new nodes.
  • State for each game session is stashed before the old session process terminates.
  • New game session processes initialize with the stashed state for the respective session.

I create a new test case which will check that game session state is stashed in the replicated StateHandoff store:

describe "when a node shuts down while having an active game session" do
  setup [:start_cluster, :start_game, :add_node2, :stop_node1]

  test "game session state can be found by node2", ctx do
    stash_key = "game_#{ctx.game.id}"
  
    wait_until(fn ->
      res = Cluster.call(ctx.cluster, ctx.node2, StateHandoff, :pickup, [stash_key])
      assert {:ok, ctx.game} == res
    end)
  end
end

I plan to eventually delete this test case once I finish the last task on my list which will cover picking up the state from the store. This new test knows about the internal implementation of how the game session process stashes state which makes this test potentially brittle if the mechanism for stashing and picking up state changes later. The behavior I care about testing is if the game state was moved to the new process, not how it does it.

The test is obviously failing since I have not changed any code at this point . I update the Minotaur.GameEngine.SessionServer module to trap exits and define a terminate/2 callback function.

def init(%Game{} = game) do
  Process.flag(:trap_exit, true)

  {:ok, game, {:continue, :continue_game}}
end

def terminate(reason, game) do
  IO.inspect("Terminating due to: #{reason}")
  Minotaur.StateHandoff.stash("game_#{game.id}", game)
end

I run the test to make sure the terminate callback is called, but the expected log message does not print. I check the ClusterHelper module used by the test setup to manage clustered nodes to see how it triggers the node to terminate. It uses :peer.stop which does not perform a graceful shutdown.

In order for my test to more closely resemble the scenario of the node application receiving a shutdown signal from the OS, I’ll need to call :init.stop within the node instead.

I modify the stop_node function of the cluster helper module to support a :graceful_shutdown option and update the test setup to use it.

defp stop_node1(%{cluster: cluster, node1: node1}) do
  Cluster.stop_node(cluster, node1, graceful_shutdown: true)
end

Now running the test shows a logged error that the StateHandoff process is not alive.

** (stop) exited in: GenServer.call(Minotaur.StateHandoff, {:stash, "game_XYZA", <GAME_STRUCT>, 5000)
    ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started

The handoff process should not be terminated until the supervised game session processes have completed their cleanup. The Elixir docs for stopping applications state

It will shut down every application in the opposite order they had been started.

I simply need to have StateHandoff started before the game session supervisor and registry in the application supervision tree so it isn’t stopped until game sessions are terminated.

After this change, the test is still failing due to the CRDT on node2 being empty for the expected key. I suspect the StateHandoff process is terminating before it completes its syncing to node2. I update StateHandoff to trap exits and hard code a 500ms sleep in the terminate callback.

def init(_opts) do
  :net_kernel.monitor_nodes(true, node_type: :visible)
  {:ok, crdt_pid} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap, name: crdt_name())

  set_neighbors(crdt_pid)
  Process.flag(:trap_exit, true)

  {:ok, crdt_pid}
end

def terminate(_reason, _state) do
  :timer.sleep(500)
end

The test now passes!

Initialize game session with stashed state

The last task on my list is to have the game sessions pick up the stashed state when they are restarted on a new node.

As always, I start with a failing test. I want to check that any game state changes are replicated on the new node process so I’ll need update the test setup to apply some game state changes. The existing describe block applies the following setup functions:

describe "when a node shuts down while having an active game session" do
  setup [:start_cluster, :start_game, :add_node2, :stop_node1]

  # ...
end

I create a new update_game function ann add it to the setup list after :start_game. I also update the start_game function to create player references and define their starting positions.

defp update_game(ctx) do
  # TODO: modify game state
  :ok
end

defp start_game(%{cluster: cluster, node1: node1}) do
  player1 = %Player{id: 3, display_name: "Player1", user_id: 111}
  player2 = %Player{id: 6, display_name: "Player2", user_id: 222}
  coord = %Coord{q: -1, r: 0}

  game =
    game_fixture(
      players: [player1, player2],
      pc_coords: %{player1.id => coord, player2.id => coord}
    )

  {:ok, _pid} = Cluster.call(cluster, node1, GameEngine, :continue_game, [game])

  [game: game, player1: player1, player2: player2]
end

I apply a few game state changes on node1 through the GameEngine module interface and update the test context reference for :game.

defp update_game(ctx) do
  # Advance to round 4
  Cluster.call(ctx.cluster, ctx.node1, GameEngine, :end_round, [ctx.game.id])
  Cluster.call(ctx.cluster, ctx.node1, GameEngine, :end_round, [ctx.game.id])
  Cluster.call(ctx.cluster, ctx.node1, GameEngine, :end_round, [ctx.game.id])

  # Player1 attacks Player2
  Cluster.call(ctx.cluster, ctx.node1, GameEngine, :register_attack, [
    ctx.game.id,
    ctx.player1.user_id,
    ctx.player2.id
  ])

  # Player2 moves {1, 0}
  {:ok, game} =
    Cluster.call(ctx.cluster, ctx.node1, GameEngine, :register_move, [
      ctx.game.id,
      ctx.player2.user_id,
      %Vector{q: 1, r: 0}
    ])

  [game: game]
end

I then add a test case to verify the game state is resumed on the new node after it is replicated.

test "game session state is resumed on another node", ctx do
  %{cluster: cluster, node2: node2, game: game} = ctx

  wait_until(fn ->
    assert {:ok, game_node2} = Cluster.call(cluster, node2, GameEngine, :get_game, [game.id])
    assert game.id == game_node2.id
    assert game.round == game_node2.round
    assert game.registered_actions == game_node2.registered_actions
  end)
end

The first 2 test assertions pass, but the game round on node2 is still showing 1 which is expected at this point since the new process is using the original state defined in the child spec as the intial GenServer state.

I update the init callback for game session GenServers to use stashed state if it exists on startup.

def init(%Game{} = game) do
  Process.flag(:trap_exit, true)

  game_state = get_stashed_state(game.id, game)

  {:ok, game_state, {:continue, :continue_game}}
end

defp get_stashed_state(game_id, default_state) do
  stash_key = "game_#{game_id}"
  Minotaur.StateHandoff.pickup(stash_key) || default_state
end

The test surprisingly fails. I add a 200ms wait before calling pickup and this time the test passes. It appears the game session supervisor is spinning up the game session process on the new node before that node’s CRDT store has received updates from the terminating node.

The simplest solution would be to leave the hard coded sleep, but I opt to use a recursive retry loop that checks the stash until a value is found or a timeout is reached.

defp retry_until(0, _, fun), do: fun.()

defp retry_until(timeout, interval, fun) do
  case fun.() do
    nil ->
      :timer.sleep(interval)
      retry_until(max(0, timeout - interval), interval, fun)

    result ->
      result
  end
end

defp get_stashed_state(game_id, default_state) do
  stash_key = "game_#{game_id}"

  stashed_state =
    retry_until(250, 50, fn ->
      Minotaur.StateHandoff.pickup(stash_key)
    end)

  stashed_state || default_state
end

I add some additional logging to trace the flow of logic when running the test case:

"Starting game on node1"
"Starting game server XYZA"
"Checking stash for game_XYZA"
"Nope..."
"Nope..."
"Nope..."
"Nope..."
"Nope..."
"Stopping node1"
"Terminating server. Stashing state."
"Starting game server XYZA"
"Checking stash for game_XYZA"
"Nope..."
"Nope..."
"FOUND!"

When new game sessions are started as normal, they will still check for stashed state when none will ever be found. This seems okay for now since it’s an in-memory query but may be something to optimize in the future if this becomes a bottleneck. The delay will also have an effect on feature tests that start game session processes so this will be a potential area to improve in the future if the test suite run times are getting too long.

Running the test now passes! Unfortunately, several existing tests are failing because game state is being picked up from previous tests that use the same game id. The existing tests that start game session GenServer processes have the line use Minotaur.GameEngine.SessionSandbox which pulls in the following setup code to the test module:

defmacro __using__(_opts) do
  quote do
    setup do
      # Terminate all game sessions as they may share the same game id
      SessionSupervisor
      |> DynamicSupervisor.which_children()
      |> Enum.map(fn {_, pid, _, _} -> pid end)
      |> Enum.each(fn pid -> DynamicSupervisor.terminate_child(SessionSupervisor, pid) end)

      :ok
    end
  end
end

Now that I’ve implemented the process state handoff logic, I’ll need to also ensure that the stashed state is reset between tests. The solution I end up with is to create a reset_all function in the StateHandoff module which will remove all data from the CRDT store.

def reset_all() do
  GenServer.call(__MODULE__, :reset_all)
end

def handle_call(:reset_all, _from, crdt_pid) do
  keys =
    crdt_pid
    |> DeltaCrdt.to_map()
    |> Map.keys()

  DeltaCrdt.drop(crdt_pid, keys)

  {:reply, :ok, crdt_pid}
end

And update SessionSandbox test helper module to call StateHandoff.reset_all() in the setup block.

All tests are now passing!

Cleaning up

Now that my latest test case covers the full behavior of making sure the existing game session state is resumed on the new node, there is now overlap with the tests I wrote along the way.

I can now delete the tests for checking the session is started on another node and that the game state exists in the state handoff store.

describe "when a node shuts down while having an active game session" do
  setup [:start_cluster, :start_game, :update_game, :add_node2, :stop_node1]

  test "game session state is resumed on another node", ctx do
    %{cluster: cluster, node2: node2, game: game} = ctx

    wait_until(fn ->
      assert {:ok, game_node2} = Cluster.call(cluster, node2, GameEngine, :get_game, [game.id])
      assert game.id == game_node2.id
      assert game.round == game_node2.round
      assert game.registered_actions == game_node2.registered_actions
    end)
  end
-
- test "game session is restarted on another node", ctx do
-   wait_until(fn ->
-     res = Cluster.call(ctx.cluster, ctx.node2, GameEngine, :get_game, [ctx.game.id])
-
-     assert {:ok, %{id: game_id}} = res
-     assert ctx.game.id == game_id
-   end)
- end
-
- test "game session state can be found by node2", ctx do
-   stash_key = "game_#{ctx.game.id}"
-
-   wait_until(fn ->
-     res = Cluster.call(ctx.cluster, ctx.node2, StateHandoff, :pickup, [stash_key])
-     assert ctx.game == res
-   end)
- end
end