Skip to content

Commit ee3d6ab

Browse files
authored
Issue release message after response. (#15)
Send release message after receiving query response. Must do this in order to reacquire agency and read an up-to-date version of the ledger. This fixes an issue where the ledger state tracked by the Query OTP process wouldn't be updated after establishing a connection. The result was that queries would always return the same value. This PR refactors Query states and events. The goal is to only acquire agency prior to submitting a query to the node. This ensures that the query will be performed against the most up-to-date version of the ledger.
1 parent 6b1b934 commit ee3d6ab

File tree

1 file changed

+74
-32
lines changed

1 file changed

+74
-32
lines changed

lib/xander/query.ex

+74-32
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
defmodule Xander.Query do
22
@moduledoc """
33
Issues ledger queries to a Cardano node using the Node-to-Client (n2c) protocol.
4-
This module implements the `gen_statem` behaviour.
4+
This module implements the `gen_statem` OTP behaviour.
55
"""
66

77
@behaviour :gen_statem
@@ -133,12 +133,12 @@ defmodule Xander.Query do
133133
Handshake.Proposal.version_message(@active_n2c_versions, network)
134134
)
135135

136-
case client.recv(socket, 0, 5_000) do
136+
case client.recv(socket, 0, _timeout = 5_000) do
137137
{:ok, full_response} ->
138138
{:ok, _handshake_response} = Handshake.Response.validate(full_response)
139139

140-
actions = [{:next_event, :internal, :acquire_agency}]
141-
{:next_state, :established_no_agency, data, actions}
140+
# Transitions to idle state
141+
{:next_state, :established_no_agency, data}
142142

143143
{:error, reason} ->
144144
Logger.error("Error establishing connection #{inspect(reason)}")
@@ -148,67 +148,109 @@ defmodule Xander.Query do
148148

149149
@doc """
150150
Emits events when in the `established_no_agency` state.
151+
This maps to the StIdle state in the Cardano Local State Query protocol.
152+
This is the state where the process waits for a query to be made.
151153
"""
152154
def established_no_agency(_event_type, _event_content, _data)
153155

154156
def established_no_agency(
155-
:internal,
156-
:acquire_agency,
157+
{:call, from},
158+
{:request, query_name},
157159
%__MODULE__{client: client, socket: socket} = data
158160
) do
161+
# Send acquire message to transition to Acquiring state
159162
:ok = client.send(socket, Messages.msg_acquire())
160-
{:ok, _acquire_response} = client.recv(socket, 0, 5_000)
161-
{:next_state, :established_has_agency, data}
163+
164+
# Handle acquire response (MsgAcquired) to transition to Acquired state
165+
case client.recv(socket, 0, _timeout = 5_000) do
166+
{:ok, _acquire_response} ->
167+
# Set socket to active mode to receive async messages from node
168+
:ok = setopts_lib(client).setopts(socket, active: :once)
169+
170+
# Track the caller and query_name, then transition to
171+
# established_has_agency state prior to sending the query.
172+
data = update_in(data.queue, &:queue.in({from, query_name}, &1))
173+
{:next_state, :established_has_agency, data, [{:next_event, :internal, :send_query}]}
174+
175+
{:error, reason} ->
176+
Logger.error("Failed to acquire state: #{inspect(reason)}")
177+
actions = [{:reply, from, {:error, :acquire_failed}}]
178+
{:keep_state, data, actions}
179+
end
162180
end
163181

164182
def established_no_agency(:info, {:tcp_closed, socket}, %__MODULE__{socket: socket} = data) do
165-
Logger.error("Connection closed")
183+
Logger.error("Connection closed while in established_no_agency")
166184
{:next_state, :disconnected, data}
167185
end
168186

169187
@doc """
170-
Emits events when in the `established_has_agency` state. Can send queries
171-
to the node when in this state.
188+
Emits events when in the `established_has_agency` state.
189+
This maps to the Querying state in the Cardano Local State Query protocol.
172190
"""
173191
def established_has_agency(_event_type, _event_content, _data)
174192

175193
def established_has_agency(
176-
{:call, from},
177-
{:request, request},
194+
:internal,
195+
:send_query,
178196
%__MODULE__{client: client, socket: socket} = data
179197
) do
180-
:ok = setopts_lib(client).setopts(socket, active: :once)
181-
182-
message =
183-
case request do
184-
:get_current_era -> Messages.get_current_era()
185-
:get_current_block_height -> Messages.get_current_block_height()
186-
:get_epoch_number -> Messages.get_epoch_number()
187-
:get_current_tip -> Messages.get_current_tip()
188-
end
189-
190-
:ok = client.send(socket, message)
191-
data = update_in(data.queue, &:queue.in(from, &1))
198+
# Get the current query_name from queue without removing it
199+
{:value, {_from, query_name}} = :queue.peek(data.queue)
200+
201+
# Send query to node and remain in established_has_agency state
202+
:ok = client.send(socket, build_query_message(query_name))
192203
{:keep_state, data}
193204
end
194205

206+
# This function is invoked due to the socket being set to active mode.
207+
# It receives a response from the node, sends a release message and
208+
# then transitions back to the established_no_agency state.
195209
def established_has_agency(
196210
:info,
197211
{_tcp_or_ssl, socket, bytes},
198-
%__MODULE__{socket: socket} = data
212+
%__MODULE__{client: client, socket: socket} = data
199213
) do
200-
{:ok, query_response} = Response.parse_response(bytes)
201-
{{:value, caller}, data} = get_and_update_in(data.queue, &:queue.out/1)
202-
# This action issues the response back to the client
203-
actions = [{:reply, caller, {:ok, query_response}}]
204-
{:keep_state, data, actions}
214+
# Parse query response (MsgResult)
215+
case Response.parse_response(bytes) do
216+
{:ok, query_response} ->
217+
# Get the caller from our queue
218+
{{:value, {caller, _query_name}}, new_data} = get_and_update_in(data.queue, &:queue.out/1)
219+
220+
# Send release message to transition back to StIdle
221+
:ok = client.send(socket, Messages.msg_release())
222+
223+
# Reply to caller and transition back to established_no_agency (StIdle)
224+
actions = [{:reply, caller, {:ok, query_response}}]
225+
{:next_state, :established_no_agency, new_data, actions}
226+
227+
{:error, reason} ->
228+
Logger.error("Failed to parse response: #{inspect(reason)}")
229+
{{:value, {caller, _query_name}}, new_data} = get_and_update_in(data.queue, &:queue.out/1)
230+
actions = [{:reply, caller, {:error, :parse_failed}}]
231+
{:next_state, :established_no_agency, new_data, actions}
232+
end
205233
end
206234

207235
def established_has_agency(:info, {:tcp_closed, socket}, %__MODULE__{socket: socket} = data) do
208-
Logger.error("Connection closed")
236+
Logger.error("Connection closed while querying")
209237
{:next_state, :disconnected, data}
210238
end
211239

240+
def established_has_agency(:timeout, _event_content, data) do
241+
Logger.error("Query timeout")
242+
{:next_state, :established_no_agency, data}
243+
end
244+
245+
defp build_query_message(query_name) do
246+
case query_name do
247+
:get_current_era -> Messages.get_current_era()
248+
:get_current_block_height -> Messages.get_current_block_height()
249+
:get_epoch_number -> Messages.get_epoch_number()
250+
:get_current_tip -> Messages.get_current_tip()
251+
end
252+
end
253+
212254
defp maybe_local_path(path, :socket), do: {:local, path}
213255
defp maybe_local_path(path, _), do: path
214256

0 commit comments

Comments
 (0)