Finishing Process State Handoff
Stashing state on game process termination
I’ve made slow but steady progress toward my goal of rolling deploys that preserve running game session state. I sometimes have only an hour or two of free time in a day for this project, but breaking the problems into smaller pieces and writing this dev journal has helped me stay focused and collect my thoughts.
Here is my task list:
- Launch the application as nodes in an Erlang Cluster on deploy.
- Start new game session processes on a new version node to replace the sessions of the old version node.
- LiveView processes continue to communicate with the game sessions after session processes have migrated to new nodes.
- State for each game session is stashed before the old session process terminates.
- New game session processes initialize with the stashed state for the respective session.
I create a new test case which will check that game session state is stashed in the replicated StateHandoff store:
describe "when a node shuts down while having an active game session" do
setup [:start_cluster, :start_game, :add_node2, :stop_node1]
test "game session state can be found by node2", ctx do
stash_key = "game_#{ctx.game.id}"
wait_until(fn ->
res = Cluster.call(ctx.cluster, ctx.node2, StateHandoff, :pickup, [stash_key])
assert {:ok, ctx.game} == res
end)
end
end
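The wait_until helper used by these cluster tests isn’t shown in this post. As a rough sketch, assuming it simply re-runs the assertion function until it passes or a time budget runs out, it might look like:

# Hypothetical sketch: the name is from the tests above, but the
# interval and timeout values are assumptions.
defp wait_until(fun, timeout \\ 2_000) do
  fun.()
rescue
  error in ExUnit.AssertionError ->
    # Give up once the budget is spent, otherwise retry shortly
    if timeout <= 0, do: reraise(error, __STACKTRACE__)

    :timer.sleep(50)
    wait_until(fun, timeout - 50)
end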
I plan to eventually delete this test case once I finish the last task on my list, which will cover picking up the state from the store. This new test knows about the internal implementation of how the game session process stashes state, which makes it potentially brittle if the mechanism for stashing and picking up state changes later. The behavior I care about testing is whether the game state was moved to the new process, not how it got there.
The test is obviously failing since I have not changed any code at this point.
I update the Minotaur.GameEngine.SessionServer module to trap exits and define a terminate/2 callback function.
def init(%Game{} = game) do
  Process.flag(:trap_exit, true)

  {:ok, game, {:continue, :continue_game}}
end

def terminate(reason, game) do
  IO.inspect("Terminating due to: #{inspect(reason)}")

  Minotaur.StateHandoff.stash("game_#{game.id}", game)
end
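For reference, the StateHandoff module itself isn’t shown in this post. A minimal sketch of its stash/pickup API, assuming the GenServer holds a DeltaCrdt.AWLWWMap pid as its state (these handle_call bodies are my assumption, not the actual implementation):

def stash(key, value), do: GenServer.call(__MODULE__, {:stash, key, value})
def pickup(key), do: GenServer.call(__MODULE__, {:pickup, key})

def handle_call({:stash, key, value}, _from, crdt_pid) do
  DeltaCrdt.put(crdt_pid, key, value)
  {:reply, :ok, crdt_pid}
end

def handle_call({:pickup, key}, _from, crdt_pid) do
  # Returns nil when nothing was stashed under this key
  value = crdt_pid |> DeltaCrdt.to_map() |> Map.get(key)
  {:reply, value, crdt_pid}
end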
I run the test to make sure the terminate callback is called, but the expected log message does not print.
I check the ClusterHelper module used by the test setup to manage clustered nodes to see how it triggers node termination. It uses :peer.stop, which does not perform a graceful shutdown.
In order for my test to more closely resemble the scenario of the node application receiving a shutdown signal from the OS, I’ll need to call :init.stop within the node instead.
I modify the stop_node function of the cluster helper module to support a :graceful_shutdown option and update the test setup to use it.
defp stop_node1(%{cluster: cluster, node1: node1}) do
  Cluster.stop_node(cluster, node1, graceful_shutdown: true)
end
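The change to the helper itself isn’t shown above. A rough sketch of what it could look like, assuming the helper can run code on the remote node and that peer_pid/2 is a hypothetical accessor for the peer’s controller pid:

def stop_node(cluster, node, opts \\ []) do
  if opts[:graceful_shutdown] do
    # Trigger the same shutdown path as an OS signal: :init.stop/0 stops
    # all applications in the reverse order they were started
    :erpc.cast(node, :init, :stop, [])
    wait_for_nodedown(node)
  else
    # Original behavior: abrupt stop through the :peer module
    :peer.stop(peer_pid(cluster, node))
  end
end

defp wait_for_nodedown(node) do
  :ok = :net_kernel.monitor_nodes(true)

  receive do
    {:nodedown, ^node} -> :ok
  after
    5_000 -> {:error, :shutdown_timeout}
  end
end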
Now running the test shows a logged error that the StateHandoff process is not alive.
** (stop) exited in: GenServer.call(Minotaur.StateHandoff, {:stash, "game_XYZA", <GAME_STRUCT>}, 5000)
    ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
The handoff process should not be terminated until the supervised game session processes have completed their cleanup. The Elixir docs for stopping applications state:
It will shut down every application in the opposite order they had been started.
I simply need to have StateHandoff started before the game session supervisor and registry in the application supervision tree so it isn’t stopped until game sessions are terminated.
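A sketch of the reordered children list in the application module (child specs abbreviated, and some of these names are assumptions):

# Children start top to bottom and stop in reverse order, so StateHandoff
# now outlives the game session processes during shutdown
children = [
  Minotaur.StateHandoff,
  {Registry, keys: :unique, name: Minotaur.GameEngine.SessionRegistry},
  {DynamicSupervisor, name: Minotaur.GameEngine.SessionSupervisor, strategy: :one_for_one}
]

Supervisor.start_link(children, strategy: :one_for_one, name: Minotaur.Supervisor)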
After this change, the test is still failing because the CRDT on node2 is empty for the expected key. I suspect the StateHandoff process is terminating before it completes its syncing to node2, so I update StateHandoff to trap exits and hard-code a 500ms sleep in the terminate callback.
def init(_opts) do
  :net_kernel.monitor_nodes(true, node_type: :visible)
  {:ok, crdt_pid} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap, name: crdt_name())
  set_neighbors(crdt_pid)
  Process.flag(:trap_exit, true)

  {:ok, crdt_pid}
end

def terminate(_reason, _state) do
  # Crude stopgap: give the CRDT time to sync its latest changes to other nodes
  :timer.sleep(500)
end
The test now passes!
Initialize game session with stashed state
The last task on my list is to have the game sessions pick up the stashed state when they are restarted on a new node.
As always, I start with a failing test.
I want to check that any game state changes are replicated to the new node’s process, so I’ll need to update the test setup to apply some game state changes.
The existing describe block applies the following setup functions:
describe "when a node shuts down while having an active game session" do
setup [:start_cluster, :start_game, :add_node2, :stop_node1]
# ...
end
I create a new update_game function and add it to the setup list after :start_game.
I also update the start_game function to create player references and define their starting positions.
defp update_game(ctx) do
  # TODO: modify game state
  :ok
end

defp start_game(%{cluster: cluster, node1: node1}) do
  player1 = %Player{id: 3, display_name: "Player1", user_id: 111}
  player2 = %Player{id: 6, display_name: "Player2", user_id: 222}
  coord = %Coord{q: -1, r: 0}

  game =
    game_fixture(
      players: [player1, player2],
      pc_coords: %{player1.id => coord, player2.id => coord}
    )

  {:ok, _pid} = Cluster.call(cluster, node1, GameEngine, :continue_game, [game])

  [game: game, player1: player1, player2: player2]
end
I apply a few game state changes on node1 through the GameEngine module interface and update the test context reference for :game.
defp update_game(ctx) do
  # Advance to round 4
  Cluster.call(ctx.cluster, ctx.node1, GameEngine, :end_round, [ctx.game.id])
  Cluster.call(ctx.cluster, ctx.node1, GameEngine, :end_round, [ctx.game.id])
  Cluster.call(ctx.cluster, ctx.node1, GameEngine, :end_round, [ctx.game.id])

  # Player1 attacks Player2
  Cluster.call(ctx.cluster, ctx.node1, GameEngine, :register_attack, [
    ctx.game.id,
    ctx.player1.user_id,
    ctx.player2.id
  ])

  # Player2 moves {1, 0}
  {:ok, game} =
    Cluster.call(ctx.cluster, ctx.node1, GameEngine, :register_move, [
      ctx.game.id,
      ctx.player2.user_id,
      %Vector{q: 1, r: 0}
    ])

  [game: game]
end
I then add a test case to verify the game state is resumed on the new node after it is replicated.
test "game session state is resumed on another node", ctx do
%{cluster: cluster, node2: node2, game: game} = ctx
wait_until(fn ->
assert {:ok, game_node2} = Cluster.call(cluster, node2, GameEngine, :get_game, [game.id])
assert game.id == game_node2.id
assert game.round == game_node2.round
assert game.registered_actions == game_node2.registered_actions
end)
end
The first two test assertions pass, but the game round on node2 still shows 1, which is expected at this point since the new process is using the original state defined in the child spec as the initial GenServer state.
I update the init callback for game session GenServers to use stashed state if it exists on startup.
def init(%Game{} = game) do
  Process.flag(:trap_exit, true)
  game_state = get_stashed_state(game.id, game)

  {:ok, game_state, {:continue, :continue_game}}
end

defp get_stashed_state(game_id, default_state) do
  stash_key = "game_#{game_id}"

  Minotaur.StateHandoff.pickup(stash_key) || default_state
end
The test surprisingly fails. I add a 200ms wait before calling pickup and this time the test passes.
It appears the game session supervisor is spinning up the game session process on the new node before that node’s CRDT store has received updates from the terminating node.
The simplest solution would be to leave the hard-coded sleep, but I opt to use a recursive retry loop that checks the stash until a value is found or a timeout is reached.
defp retry_until(0, _, fun), do: fun.()

defp retry_until(timeout, interval, fun) do
  case fun.() do
    nil ->
      :timer.sleep(interval)
      retry_until(max(0, timeout - interval), interval, fun)

    result ->
      result
  end
end
defp get_stashed_state(game_id, default_state) do
  stash_key = "game_#{game_id}"

  stashed_state =
    retry_until(250, 50, fn ->
      Minotaur.StateHandoff.pickup(stash_key)
    end)

  stashed_state || default_state
end
I add some additional logging to trace the flow of logic when running the test case:
"Starting game on node1"
"Starting game server XYZA"
"Checking stash for game_XYZA"
"Nope..."
"Nope..."
"Nope..."
"Nope..."
"Nope..."
"Stopping node1"
"Terminating server. Stashing state."
"Starting game server XYZA"
"Checking stash for game_XYZA"
"Nope..."
"Nope..."
"FOUND!"
When new game sessions are started as normal, they will still check for stashed state even though none will ever be found. This seems okay for now since it’s an in-memory query, but it may be something to optimize if it ever becomes a bottleneck. The startup delay will also affect feature tests that start game session processes, so this is another area to improve if test suite run times get too long.
The test now passes!
Unfortunately, several existing tests are failing because game state is being picked up from previous tests that use the same game id.
The existing tests that start game session GenServer processes have the line use Minotaur.GameEngine.SessionSandbox, which pulls in the following setup code to the test module:
defmacro __using__(_opts) do
  quote do
    setup do
      # Terminate all game sessions as they may share the same game id
      SessionSupervisor
      |> DynamicSupervisor.which_children()
      |> Enum.map(fn {_, pid, _, _} -> pid end)
      |> Enum.each(fn pid -> DynamicSupervisor.terminate_child(SessionSupervisor, pid) end)

      :ok
    end
  end
end
Now that I’ve implemented the process state handoff logic, I’ll need to also ensure that the stashed state is reset between tests.
The solution I end up with is to create a reset_all function in the StateHandoff module which will remove all data from the CRDT store.
def reset_all() do
  GenServer.call(__MODULE__, :reset_all)
end

def handle_call(:reset_all, _from, crdt_pid) do
  keys =
    crdt_pid
    |> DeltaCrdt.to_map()
    |> Map.keys()

  DeltaCrdt.drop(crdt_pid, keys)

  {:reply, :ok, crdt_pid}
end
I then update the SessionSandbox test helper module to call StateHandoff.reset_all() in the setup block.
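Building on the macro shown earlier, the updated sandbox setup might look like this (a sketch; only the reset_all call is new):

defmacro __using__(_opts) do
  quote do
    setup do
      # Terminate all game sessions as they may share the same game id
      SessionSupervisor
      |> DynamicSupervisor.which_children()
      |> Enum.map(fn {_, pid, _, _} -> pid end)
      |> Enum.each(fn pid -> DynamicSupervisor.terminate_child(SessionSupervisor, pid) end)

      # Clear any state stashed by sessions that terminated in earlier tests
      Minotaur.StateHandoff.reset_all()

      :ok
    end
  end
end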
All tests are now passing!
Cleaning up
Now that my latest test case covers the full behavior of ensuring existing game session state is resumed on the new node, it overlaps with the tests I wrote along the way.
I can now delete the tests for checking the session is started on another node and that the game state exists in the state handoff store.
describe "when a node shuts down while having an active game session" do
setup [:start_cluster, :start_game, :update_game, :add_node2, :stop_node1]
test "game session state is resumed on another node", ctx do
%{cluster: cluster, node2: node2, game: game} = ctx
wait_until(fn ->
assert {:ok, game_node2} = Cluster.call(cluster, node2, GameEngine, :get_game, [game.id])
assert game.id == game_node2.id
assert game.round == game_node2.round
assert game.registered_actions == game_node2.registered_actions
end)
end
-
- test "game session is restarted on another node", ctx do
- wait_until(fn ->
- res = Cluster.call(ctx.cluster, ctx.node2, GameEngine, :get_game, [ctx.game.id])
-
- assert {:ok, %{id: game_id}} = res
- assert ctx.game.id == game_id
- end)
- end
-
- test "game session state can be found by node2", ctx do
- stash_key = "game_#{ctx.game.id}"
-
- wait_until(fn ->
- res = Cluster.call(ctx.cluster, ctx.node2, StateHandoff, :pickup, [stash_key])
- assert ctx.game == res
- end)
- end
end