-
Notifications
You must be signed in to change notification settings - Fork 125
Rewrite local association scenario for cleaner error handling and cle… #260
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
8c44351
f908f65
e0932ea
a6bfd00
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,6 @@ | ||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
| using NativeWebSocket; | ||
| using Newtonsoft.Json; | ||
|
|
@@ -11,137 +12,294 @@ | |
|
|
||
| // ReSharper disable once CheckNamespace | ||
|
|
||
| public class LocalAssociationScenario | ||
| public class LocalAssociationScenario : IDisposable | ||
| { | ||
| private readonly TimeSpan _clientTimeoutMs; | ||
| private readonly MobileWalletAdapterSession _session; | ||
| private readonly int _port; | ||
| private readonly IWebSocket _webSocket; | ||
| private AndroidJavaObject _nativeLocalAssociationScenario; | ||
| private TaskCompletionSource<Response<object>> _startAssociationTaskCompletionSource; | ||
| private readonly TimeSpan _overallTimeout = TimeSpan.FromSeconds(30); | ||
| private readonly TimeSpan _keyExchangeTimeout = TimeSpan.FromSeconds(20); | ||
|
|
||
| private bool _didConnect; | ||
| private bool _handledEncryptedMessage; | ||
| private MobileWalletAdapterClient _client; | ||
| private readonly AndroidJavaObject _currentActivity; | ||
| private Queue<Action<IAdapterOperations>> _actions; | ||
| private readonly int _port; | ||
| private readonly MobileWalletAdapterSession _session; | ||
| private IWebSocket _webSocket; | ||
| private MobileWalletAdapterClient _client; | ||
|
|
||
| private bool _isConnecting; | ||
| private bool _disposed; | ||
|
|
||
| public LocalAssociationScenario(int clientTimeoutMs = 9000) | ||
| private TaskCompletionSource<bool> _wsConnected; | ||
| private TaskCompletionSource<Response<object>> _responseTcs; | ||
| private TaskCompletionSource<Response<object>> _tcs; | ||
| private CancellationToken _cancellationToken; | ||
|
|
||
| public LocalAssociationScenario() | ||
| { | ||
| var unityPlayer = new AndroidJavaClass("com.unity3d.player.UnityPlayer"); | ||
| _currentActivity = unityPlayer.GetStatic<AndroidJavaObject>("currentActivity"); | ||
| _clientTimeoutMs = TimeSpan.FromSeconds(clientTimeoutMs); | ||
| _port = Random.Range(WebSocketsTransportContract.WebsocketsLocalPortMin, WebSocketsTransportContract.WebsocketsLocalPortMax + 1); | ||
| _currentActivity = GetCurrentActivity(); | ||
| _port = RandomPort(); | ||
| _session = new MobileWalletAdapterSession(); | ||
| var webSocketUri = WebSocketsTransportContract.WebsocketsLocalScheme + "://" + WebSocketsTransportContract.WebsocketsLocalHost + ":" + _port + WebSocketsTransportContract.WebsocketsLocalPath; | ||
| _webSocket = WebSocket.Create(webSocketUri, WebSocketsTransportContract.WebsocketsProtocol); | ||
| _webSocket.OnOpen += () => | ||
| { | ||
| if(_didConnect)return; | ||
| _didConnect = true; | ||
| var helloReq = _session.CreateHelloReq(); | ||
| _webSocket.Send(helloReq); | ||
| ListenKeyExchange(); | ||
| }; | ||
| _webSocket.OnClose += (e) => | ||
| { | ||
| if (!_didConnect) return; | ||
| _webSocket.Connect(awaitConnection: false); | ||
| }; | ||
| _webSocket.OnError += (e) => | ||
| { | ||
| Debug.Log("WebSocket Error: " + e); | ||
| }; | ||
| _webSocket.OnMessage += ReceivePublicKeyHandler; | ||
| } | ||
|
|
||
| private static AndroidJavaObject GetCurrentActivity() | ||
| { | ||
| var unityPlayer = new AndroidJavaClass("com.unity3d.player.UnityPlayer"); | ||
| return unityPlayer.GetStatic<AndroidJavaObject>("currentActivity"); | ||
| } | ||
|
|
||
| public Task<Response<object>> StartAndExecute(List<Action<IAdapterOperations>> actions) | ||
| public async Task<Response<object>> StartAndExecute(List<Action<IAdapterOperations>> actions, | ||
| CancellationToken ct = default) | ||
| { | ||
| if (actions == null || actions.Count == 0) | ||
| throw new ArgumentException("Actions must be non-null and non-empty"); | ||
| _actions = new Queue<Action<IAdapterOperations>>(actions); | ||
| var intent = LocalAssociationIntentCreator.CreateAssociationIntent( | ||
| _session.AssociationToken, | ||
| _port); | ||
| throw new ArgumentException("Actions required"); | ||
|
|
||
| using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct); | ||
| cts.CancelAfter(_overallTimeout); | ||
|
|
||
| _cancellationToken = ct; | ||
| _tcs = new TaskCompletionSource<Response<object>>(); | ||
|
|
||
| StartActivityForAssociation(_session.AssociationToken, _port); | ||
|
|
||
| Debug.Log("[MWA] Waiting for websocket connection"); | ||
| await Task.Run(async () => | ||
| { | ||
| try | ||
| { | ||
| Debug.Log("[MWA Connect Thread] Started"); | ||
| _isConnecting = true; | ||
|
|
||
| await ConnectWithBackoffAsync(); | ||
|
|
||
| Debug.Log("[MWA Connect Thread] Completed"); | ||
| _isConnecting = false; | ||
|
|
||
| var helloReq = _session.CreateHelloReq(); | ||
| await _webSocket.Send(helloReq); | ||
|
|
||
| Debug.Log("[MWA] Hello sent. Waiting for pubkey..."); | ||
|
|
||
| await WaitForKeyExchangeAsync(cts.Token); | ||
|
|
||
| Debug.Log("[MWA] Pubkey received, session is encrypted"); | ||
|
|
||
| var queue = new Queue<Action<IAdapterOperations>>(actions); | ||
| Response<object> lastResponse = null; | ||
|
|
||
| while (queue.Count > 0) | ||
| { | ||
| _responseTcs = new TaskCompletionSource<Response<object>>(); | ||
|
|
||
| var action = queue.Dequeue(); | ||
| Debug.Log($"[MWA] Invoking action {action.Method.Name}"); | ||
| action.Invoke(_client); | ||
|
|
||
| lastResponse = await _responseTcs.Task; | ||
|
|
||
| ct.ThrowIfCancellationRequested(); | ||
| } | ||
|
|
||
| _tcs.TrySetResult(lastResponse ?? new Response<object>()); | ||
| } | ||
| catch (OperationCanceledException) | ||
| { | ||
| _tcs.TrySetResult(new Response<object> | ||
| { | ||
| Error = new Response<object>.ResponseError { Message = "Timeout or cancelled" } | ||
| }); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| Debug.Log($"[MWA] Association failed: {ex}"); | ||
| _tcs.TrySetException(ex); | ||
| } | ||
| finally | ||
| { | ||
| await CleanupAsync(); | ||
| } | ||
| }, cts.Token); | ||
|
|
||
| return await _tcs.Task; | ||
| } | ||
|
Comment on lines
+40
to
+114
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion | 🟠 Major
The PR description calls out "cleaner error handling", but the delegate remains Migrating to ♻️ Suggested signature change- public async Task<Response<object>> StartAndExecute(List<Action<IAdapterOperations>> actions,
- CancellationToken ct = default)
+ public async Task<Response<object>> StartAndExecute(
+ IReadOnlyList<Func<IAdapterOperations, Task>> actions,
+ CancellationToken ct = default)
@@
- var action = queue.Dequeue();
- Debug.Log($"[MWA] Invoking action {action.Method.Name}");
- action.Invoke(_client);
-
- lastResponse = await _responseTcs.Task;
+ var action = queue.Dequeue();
+ Debug.Log($"[MWA] Invoking action {action.Method.Name}");
+ await action(_client).ConfigureAwait(false);
+
+ lastResponse = await _responseTcs.Task;Callers in Based on learnings: prior PR 🤖 Prompt for AI Agents |
||
|
|
||
| private static int RandomPort() | ||
| { | ||
| return Random.Range(WebSocketsTransportContract.WebsocketsLocalPortMin, | ||
| WebSocketsTransportContract.WebsocketsLocalPortMax + 1); | ||
| } | ||
|
|
||
| private static IWebSocket CreateWebSocket(int port) | ||
| { | ||
| var webSocketUri = WebSocketsTransportContract.WebsocketsLocalScheme + "://" + | ||
| WebSocketsTransportContract.WebsocketsLocalHost + ":" + port + | ||
| WebSocketsTransportContract.WebsocketsLocalPath; | ||
|
|
||
| Debug.Log($"[MWA] Websocket created with URI {webSocketUri}"); | ||
| return WebSocket.Create(webSocketUri, WebSocketsTransportContract.WebsocketsProtocol); | ||
| } | ||
|
|
||
| private void StartActivityForAssociation(string associationToken, int port) | ||
| { | ||
| var intent = LocalAssociationIntentCreator.CreateAssociationIntent(associationToken, port); | ||
| _currentActivity.Call("startActivityForResult", intent, 0); | ||
| _currentActivity.Call("runOnUiThread", new AndroidJavaRunnable(TryConnectWs)); | ||
| _startAssociationTaskCompletionSource = new TaskCompletionSource<Response<object>>(); | ||
| return _startAssociationTaskCompletionSource.Task; | ||
| Debug.Log($"[MWA] Launched intent for port {port}, token {associationToken}"); | ||
| } | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
| private async void TryConnectWs() | ||
|
|
||
| private async Task ConnectWithBackoffAsync() | ||
| { | ||
| var timeout = _clientTimeoutMs; | ||
| while (_webSocket.State != WebSocketState.Open && !_didConnect && timeout.TotalSeconds > 0) | ||
| { | ||
| await _webSocket.Connect(awaitConnection: false); | ||
| var timeDelta = TimeSpan.FromMilliseconds(500); | ||
| timeout -= timeDelta; | ||
| await Task.Delay(timeDelta); | ||
| } | ||
| if (_webSocket.State != WebSocketState.Open) | ||
| const int maxAttempts = 12; | ||
| const int delayStart = 400; | ||
| const int delayCap = 3000; | ||
|
|
||
| var attempt = 0; | ||
| var delayMs = delayStart; | ||
|
|
||
| // Short delay to give wallet time to start websocket | ||
| Debug.Log($"[MWA] Start delay"); | ||
| await Task.Delay(500, _cancellationToken); | ||
| Debug.Log($"[MWA] Delay over"); | ||
|
|
||
| do | ||
| { | ||
| Debug.Log("Error: timeout"); | ||
| } | ||
| if (_webSocket != null) | ||
| { | ||
| _webSocket.OnOpen -= OnWsOpen; | ||
| _webSocket.OnError -= OnWsError; | ||
| _webSocket.OnClose -= OnWsClose; | ||
| _webSocket.OnMessage -= OnWsMessage; | ||
| _webSocket = null; | ||
| } | ||
|
|
||
| _webSocket = CreateWebSocket(_port); | ||
| _webSocket.OnOpen += OnWsOpen; | ||
| _webSocket.OnError += OnWsError; | ||
| _webSocket.OnClose += OnWsClose; | ||
| _webSocket.OnMessage += OnWsMessage; | ||
|
|
||
| var startTime = DateTime.UtcNow; | ||
| _wsConnected = new TaskCompletionSource<bool>(); | ||
|
|
||
| attempt++; | ||
| Debug.Log($"[MWA] Connect attempt {attempt}, state: {_webSocket.State}"); | ||
| _webSocket.Connect(); | ||
|
|
||
| var success = await _wsConnected.Task; | ||
| Debug.Log($"[MWA] Connect attempt {attempt} result, state: {_webSocket.State}"); | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
|
|
||
| if (success) | ||
| return; | ||
|
|
||
| var duration = (int)(DateTime.UtcNow - startTime).TotalMilliseconds; | ||
| if (duration < delayMs) | ||
| { | ||
| await Task.Delay(delayMs - duration, _cancellationToken); | ||
| } | ||
|
|
||
| delayMs = Math.Min(delayMs * 2, delayCap); | ||
|
|
||
| } while (_webSocket.State != WebSocketState.Open && !_cancellationToken.IsCancellationRequested && | ||
| attempt < maxAttempts); | ||
|
|
||
| throw new TimeoutException("WebSocket connect timed out after max attempts"); | ||
| } | ||
|
|
||
| private async void ListenKeyExchange() | ||
| private void OnWsOpen() | ||
| { | ||
| while (!_handledEncryptedMessage) | ||
| Debug.Log("[MWA] WS Opened"); | ||
|
|
||
| if (_isConnecting) | ||
| { | ||
| var timeDelta = TimeSpan.FromMilliseconds(300); | ||
| await Task.Delay(timeDelta); | ||
| _wsConnected.TrySetResult(true); | ||
| } | ||
| } | ||
|
|
||
| private void HandleEncryptedSessionPayload(byte[] e) | ||
| private void OnWsClose(WebSocketCloseCode closeCode) | ||
| { | ||
| if (!_didConnect) | ||
| Debug.Log($"[MWA] WS Closed: {closeCode}"); | ||
| if (closeCode == WebSocketCloseCode.Normal) | ||
| return; | ||
|
|
||
| if (!_isConnecting) | ||
| { | ||
| throw new InvalidOperationException("Invalid message received; terminating session"); | ||
| _tcs?.TrySetException(new Exception($"WS closed unexpectedly: {closeCode}")); | ||
| } | ||
| else | ||
| { | ||
| _wsConnected.TrySetResult(false); | ||
| } | ||
|
|
||
| var de = _session.DecryptSessionPayload(e); | ||
| var message = System.Text.Encoding.UTF8.GetString(de); | ||
| _client.Receive(message); | ||
| var receivedResponse = JsonConvert.DeserializeObject<Response<object>>(message);; | ||
| ExecuteNextAction(receivedResponse); | ||
| } | ||
|
|
||
| private static void OnWsError(string message) | ||
| { | ||
| Debug.Log($"[MWA] WS Error: {message}"); | ||
| } | ||
|
|
||
| private void ReceivePublicKeyHandler(byte[] m) | ||
| private void OnWsMessage(byte[] bytes) | ||
| { | ||
| try | ||
| { | ||
| _session.GenerateSessionEcdhSecret(m); | ||
| var messageSender = new MobileWalletAdapterWebSocket(_webSocket, _session); | ||
| _client = new MobileWalletAdapterClient(messageSender); | ||
| _webSocket.OnMessage -= ReceivePublicKeyHandler; | ||
| _webSocket.OnMessage += HandleEncryptedSessionPayload; | ||
|
|
||
| // Executing the first action | ||
| ExecuteNextAction(); | ||
| // First message expected: raw pubkey for ECDH | ||
| if (_client == null) | ||
| { | ||
| _session.GenerateSessionEcdhSecret(bytes); | ||
| var messageSender = new MobileWalletAdapterWebSocket(_webSocket, _session); | ||
| _client = new MobileWalletAdapterClient(messageSender); | ||
|
|
||
| Debug.Log("[MWA] Key exchange complete → encrypted session ready"); | ||
| } | ||
| // All other should be encrypted messages | ||
| else | ||
| { | ||
| var decrypted = _session.DecryptSessionPayload(bytes); | ||
| var json = System.Text.Encoding.UTF8.GetString(decrypted); | ||
| _client.Receive(json); | ||
|
|
||
| Debug.Log($"[MWA] Received: {json}"); | ||
|
|
||
| var response = JsonConvert.DeserializeObject<Response<object>>(json); | ||
| _responseTcs.TrySetResult(response); | ||
| } | ||
| } | ||
| catch (Exception e) | ||
| catch (Exception ex) | ||
| { | ||
| Console.WriteLine(e); | ||
| Debug.Log($"[MWA] Message handler error: {ex}"); | ||
| _responseTcs.TrySetException(ex); | ||
| } | ||
| } | ||
|
Comment on lines
+233
to
264
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
At the point Line 223 already uses 🐛 Route failures to the active waiter- catch (Exception ex)
- {
- Debug.Log($"[MWA] Message handler error: {ex}");
- _responseTcs.TrySetException(ex);
- }
+ catch (Exception ex)
+ {
+ Debug.Log($"[MWA] Message handler error: {ex}");
+ if (_client == null)
+ {
+ // Key-exchange phase: fail the connect waiter and the outer scenario.
+ _wsConnected?.TrySetException(ex);
+ _tcs?.TrySetException(ex);
+ }
+ else
+ {
+ _responseTcs?.TrySetException(ex);
+ }
+ }Consider the same 🤖 Prompt for AI Agents |
||
|
|
||
| private void ExecuteNextAction(Response<object> response = null) | ||
| private Task WaitForKeyExchangeAsync(CancellationToken ct) | ||
| { | ||
| if (_actions.Count == 0 || response is { Failed: true }) | ||
| CloseAssociation(response); | ||
| var action = _actions.Dequeue(); | ||
| action.Invoke(_client); | ||
| return Task.Run(async () => | ||
| { | ||
| var start = DateTime.UtcNow; | ||
| while (_client == null) | ||
| { | ||
| if (ct.IsCancellationRequested || DateTime.UtcNow - start > _keyExchangeTimeout) | ||
| throw new TimeoutException("Key exchange timed out"); | ||
|
|
||
| await Task.Delay(200, ct); | ||
| } | ||
| }, ct); | ||
| } | ||
|
Comment on lines
+266
to
+279
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick | 🔵 Trivial Prefer a TCS signal over 200 ms polling for key exchange.
♻️ Signal-based wait- private Task WaitForKeyExchangeAsync(CancellationToken ct)
- {
- return Task.Run(async () =>
- {
- var start = DateTime.UtcNow;
- while (_client == null)
- {
- if (ct.IsCancellationRequested || DateTime.UtcNow - start > _keyExchangeTimeout)
- throw new TimeoutException("Key exchange timed out");
-
- await Task.Delay(200, ct);
- }
- }, ct);
- }
+ private readonly TaskCompletionSource<bool> _keyExchangeTcs = new();
+
+ private async Task WaitForKeyExchangeAsync(CancellationToken ct)
+ {
+ using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
+ timeoutCts.CancelAfter(_keyExchangeTimeout);
+ using (timeoutCts.Token.Register(() => _keyExchangeTcs.TrySetException(
+ new TimeoutException("Key exchange timed out"))))
+ {
+ await _keyExchangeTcs.Task;
+ }
+ }And in 🤖 Prompt for AI Agents |
||
|
|
||
| private async Task CleanupAsync() | ||
| { | ||
| if (_webSocket is { State: WebSocketState.Open }) | ||
| await _webSocket.Close(); | ||
|
|
||
| if (_webSocket != null) | ||
| { | ||
| _webSocket.OnOpen -= OnWsOpen; | ||
| _webSocket.OnMessage -= OnWsMessage; | ||
| _webSocket.OnError -= OnWsError; | ||
| _webSocket.OnClose -= OnWsClose; | ||
| _webSocket = null; | ||
| } | ||
|
|
||
| _client = null; | ||
| _disposed = true; | ||
| } | ||
|
|
||
| private async void CloseAssociation(Response<object> response) | ||
| void IDisposable.Dispose() | ||
| { | ||
| _webSocket.OnMessage -= HandleEncryptedSessionPayload; | ||
| _handledEncryptedMessage = true; | ||
| await _webSocket.Close(); | ||
| _startAssociationTaskCompletionSource.SetResult(response); | ||
| if (_disposed) return; | ||
| _ = CleanupAsync(); | ||
| } | ||
|
Comment on lines
+281
to
303
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Two viable fixes:
♻️ Option 1: IAsyncDisposable-public class LocalAssociationScenario : IDisposable
+public class LocalAssociationScenario : IAsyncDisposable
@@
- void IDisposable.Dispose()
- {
- if (_disposed) return;
- _ = CleanupAsync();
- }
+ public async ValueTask DisposeAsync()
+ {
+ if (_disposed) return;
+ _disposed = true;
+ await CleanupAsync();
+ }Then at each call site: 🤖 Prompt for AI Agents |
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall timeout isn’t enforced across the connection/action flow.
You create a linked CTS with
CancelAfter, but_cancellationTokenandThrowIfCancellationRequesteduse the originalct, so the 30s timeout won’t cancelConnectWithBackoffAsyncor the action loop when no external token is provided.✅ Minimal fix to honor the overall timeout
Also applies to: 94-97
🤖 Prompt for AI Agents