Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changes/unreleased/Under the Hood-20260513-085513.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
kind: Under the Hood
body: Silence asyncio DeprecationWarning in LSP connection unit tests by binding a fresh event loop and using loop.create_future() instead of bare asyncio.Future().
time: 2026-05-13T08:55:13.549958-05:00
139 changes: 78 additions & 61 deletions tests/unit/lsp/test_lsp_connection.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
"""Unit tests for the LSP connection module."""

import asyncio
import contextlib
import socket
import subprocess
from collections.abc import Iterator
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
Expand All @@ -16,6 +18,18 @@
)


@contextlib.contextmanager
def _sync_test_event_loop() -> Iterator[asyncio.AbstractEventLoop]:
"""Bind a fresh loop so ``asyncio.Future()`` / ``create_future`` is safe in sync tests."""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
yield loop
finally:
loop.close()
asyncio.set_event_loop(None)
Comment thread
DevonFulcher marked this conversation as resolved.


class TestJsonRpcMessage:
"""Test JsonRpcMessage dataclass."""

Expand Down Expand Up @@ -555,27 +569,28 @@ def test_handle_response_message(self, tmp_path):

conn = SocketLSPConnection([str(binary_path)], "/test")

# Create a pending request
future = asyncio.Future()
conn.state.pending_requests[42] = future
with _sync_test_event_loop() as loop:
# Create a pending request
future = loop.create_future()
conn.state.pending_requests[42] = future

# Handle response message
message = JsonRpcMessage(id=42, result={"success": True})
# Handle response message
message = JsonRpcMessage(id=42, result={"success": True})

with patch.object(future, "get_loop") as mock_get_loop:
mock_loop = MagicMock()
mock_get_loop.return_value = mock_loop
with patch.object(future, "get_loop") as mock_get_loop:
mock_loop = MagicMock()
mock_get_loop.return_value = mock_loop

conn._handle_incoming_message(message)
conn._handle_incoming_message(message)

# Verify future was resolved
mock_loop.call_soon_threadsafe.assert_called_once()
args = mock_loop.call_soon_threadsafe.call_args[0]
assert args[0] == future.set_result
assert args[1] == {"success": True}
# Verify future was resolved
mock_loop.call_soon_threadsafe.assert_called_once()
args = mock_loop.call_soon_threadsafe.call_args[0]
assert args[0] == future.set_result
assert args[1] == {"success": True}

# Verify request was removed from pending
assert 42 not in conn.state.pending_requests
# Verify request was removed from pending
assert 42 not in conn.state.pending_requests

def test_handle_error_response(self, tmp_path):
"""Test handling error response."""
Expand All @@ -584,25 +599,26 @@ def test_handle_error_response(self, tmp_path):

conn = SocketLSPConnection([str(binary_path)], "/test")

# Create a pending request
future = asyncio.Future()
conn.state.pending_requests[42] = future
with _sync_test_event_loop() as loop:
# Create a pending request
future = loop.create_future()
conn.state.pending_requests[42] = future

# Handle error response
message = JsonRpcMessage(
id=42, error={"code": -32601, "message": "Method not found"}
)
# Handle error response
message = JsonRpcMessage(
id=42, error={"code": -32601, "message": "Method not found"}
)

with patch.object(future, "get_loop") as mock_get_loop:
mock_loop = MagicMock()
mock_get_loop.return_value = mock_loop
with patch.object(future, "get_loop") as mock_get_loop:
mock_loop = MagicMock()
mock_get_loop.return_value = mock_loop

conn._handle_incoming_message(message)
conn._handle_incoming_message(message)

# Verify future was rejected
mock_loop.call_soon_threadsafe.assert_called_once()
args = mock_loop.call_soon_threadsafe.call_args[0]
assert args[0] == future.set_exception
# Verify future was rejected
mock_loop.call_soon_threadsafe.assert_called_once()
args = mock_loop.call_soon_threadsafe.call_args[0]
assert args[0] == future.set_exception

def test_handle_unknown_response(self, tmp_path):
"""Test handling response for unknown request ID."""
Expand Down Expand Up @@ -631,40 +647,41 @@ def test_handle_notification(self, tmp_path):

conn = SocketLSPConnection([str(binary_path)], "/test")

# Create futures waiting for compile complete event
future1 = asyncio.Future()
future2 = asyncio.Future()
conn.state.pending_notifications[LspEventName.compileComplete] = [
future1,
future2,
]

# Handle compile complete notification
message = JsonRpcMessage(
method="dbt/lspCompileComplete", params={"success": True}
)
with _sync_test_event_loop() as loop:
# Create futures waiting for compile complete event
future1 = loop.create_future()
future2 = loop.create_future()
conn.state.pending_notifications[LspEventName.compileComplete] = [
future1,
future2,
]

# Handle compile complete notification
message = JsonRpcMessage(
method="dbt/lspCompileComplete", params={"success": True}
)

with (
patch.object(future1, "get_loop") as mock_get_loop1,
patch.object(future2, "get_loop") as mock_get_loop2,
):
mock_loop1 = MagicMock()
mock_loop2 = MagicMock()
mock_get_loop1.return_value = mock_loop1
mock_get_loop2.return_value = mock_loop2
with (
patch.object(future1, "get_loop") as mock_get_loop1,
patch.object(future2, "get_loop") as mock_get_loop2,
):
mock_loop1 = MagicMock()
mock_loop2 = MagicMock()
mock_get_loop1.return_value = mock_loop1
mock_get_loop2.return_value = mock_loop2

conn._handle_incoming_message(message)
conn._handle_incoming_message(message)

# Verify futures were resolved
mock_loop1.call_soon_threadsafe.assert_called_once_with(
future1.set_result, {"success": True}
)
mock_loop2.call_soon_threadsafe.assert_called_once_with(
future2.set_result, {"success": True}
)
# Verify futures were resolved
mock_loop1.call_soon_threadsafe.assert_called_once_with(
future1.set_result, {"success": True}
)
mock_loop2.call_soon_threadsafe.assert_called_once_with(
future2.set_result, {"success": True}
)

# Verify compile state was set
assert conn.state.compiled is True
# Verify compile state was set
assert conn.state.compiled is True

def test_handle_unknown_notification(self, tmp_path):
"""Test handling unknown notification."""
Expand Down
Loading