-
Notifications
You must be signed in to change notification settings - Fork 124
Expand file tree
/
Copy pathtest_lsp_connection.py
More file actions
1093 lines (828 loc) · 36.8 KB
/
Copy pathtest_lsp_connection.py
File metadata and controls
1093 lines (828 loc) · 36.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""Unit tests for the LSP connection module."""
import asyncio
import contextlib
import socket
import subprocess
from collections.abc import Iterator
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from dbt_mcp.lsp.lsp_connection import (
SocketLSPConnection,
LspConnectionState,
LspEventName,
JsonRpcMessage,
event_name_from_string,
)
@contextlib.contextmanager
def _sync_test_event_loop() -> Iterator[asyncio.AbstractEventLoop]:
"""Bind a fresh loop so ``asyncio.Future()`` / ``create_future`` is safe in sync tests."""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
yield loop
finally:
loop.close()
asyncio.set_event_loop(None)
class TestJsonRpcMessage:
"""Test JsonRpcMessage dataclass."""
def test_to_dict_with_request(self):
"""Test converting a request message to dictionary."""
msg = JsonRpcMessage(id=1, method="initialize", params={"processId": None})
result = msg.to_dict()
assert result == {
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {"processId": None},
}
def test_to_dict_with_response(self):
"""Test converting a response message to dictionary."""
msg = JsonRpcMessage(id=1, result={"capabilities": {}})
result = msg.to_dict()
assert result == {"jsonrpc": "2.0", "id": 1, "result": {"capabilities": {}}}
def test_to_dict_with_error(self):
"""Test converting an error message to dictionary."""
msg = JsonRpcMessage(
id=1, error={"code": -32601, "message": "Method not found"}
)
result = msg.to_dict()
assert result == {
"jsonrpc": "2.0",
"id": 1,
"error": {"code": -32601, "message": "Method not found"},
}
def test_to_dict_notification(self):
"""Test converting a notification message to dictionary."""
msg = JsonRpcMessage(
method="window/logMessage", params={"type": 3, "message": "Server started"}
)
result = msg.to_dict()
assert result == {
"jsonrpc": "2.0",
"method": "window/logMessage",
"params": {"type": 3, "message": "Server started"},
}
def test_from_dict(self):
"""Test creating message from dictionary."""
data = {
"jsonrpc": "2.0",
"id": 42,
"method": "textDocument/completion",
"params": {"textDocument": {"uri": "file:///test.sql"}},
}
msg = JsonRpcMessage(**data)
assert msg.jsonrpc == "2.0"
assert msg.id == 42
assert msg.method == "textDocument/completion"
assert msg.params == {"textDocument": {"uri": "file:///test.sql"}}
class TestLspEventName:
"""Test LspEventName enum and helpers."""
def test_event_name_from_string_valid(self):
"""Test converting valid string to event name."""
assert (
event_name_from_string("dbt/lspCompileComplete")
== LspEventName.compileComplete
)
assert event_name_from_string("window/logMessage") == LspEventName.logMessage
assert event_name_from_string("$/progress") == LspEventName.progress
def test_event_name_from_string_invalid(self):
"""Test converting invalid string returns None."""
assert event_name_from_string("invalid/event") is None
assert event_name_from_string("") is None
class TestLspConnectionState:
"""Test LspConnectionState dataclass."""
def test_initial_state(self):
"""Test initial state values."""
state = LspConnectionState()
assert state.initialized is False
assert state.shutting_down is False
assert state.capabilities is not None
assert len(state.capabilities) == 0
assert state.pending_requests == {}
assert state.pending_notifications == {}
assert state.compiled is False
def test_get_next_request_id(self):
"""Test request ID generation."""
state = LspConnectionState()
# Should start at 20 to avoid collisions
id1 = state.get_next_request_id()
id2 = state.get_next_request_id()
id3 = state.get_next_request_id()
assert id1 == 20
assert id2 == 21
assert id3 == 22
class TestLSPConnectionInitialization:
"""Test LSP connection initialization and validation."""
def test_init_valid_binary(self, tmp_path):
"""Test initialization with valid binary path."""
# Create a dummy binary file
binary_path = tmp_path / "lsp-server"
binary_path.touch()
conn = SocketLSPConnection(
cmd=[str(binary_path)],
cwd="/test/dir",
args=["--arg1", "--arg2"],
connection_timeout=15,
default_request_timeout=60,
)
assert conn.cmd == [str(binary_path)]
assert conn.cwd == "/test/dir"
assert conn.args == ["--arg1", "--arg2"]
assert conn.host == "127.0.0.1"
assert conn.port == 0
assert conn.connection_timeout == 15
assert conn.default_request_timeout == 60
assert conn.process is None
assert isinstance(conn.state, LspConnectionState)
class TestSocketSetup:
"""Test socket setup and lifecycle."""
def test_setup_socket_success(self, tmp_path):
"""Test successful socket setup."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection([str(binary_path)], "/test")
with patch("socket.socket") as mock_socket_class:
mock_socket = MagicMock()
mock_socket.getsockname.return_value = ("127.0.0.1", 54321)
mock_socket_class.return_value = mock_socket
conn.setup_socket()
# Verify socket setup
mock_socket_class.assert_called_once_with(
socket.AF_INET, socket.SOCK_STREAM
)
mock_socket.setsockopt.assert_called_once_with(
socket.SOL_SOCKET, socket.SO_REUSEADDR, 1
)
mock_socket.bind.assert_called_once_with(("127.0.0.1", 0))
mock_socket.listen.assert_called_once_with(1)
assert conn.port == 54321
assert conn._socket == mock_socket
class TestProcessLaunching:
"""Test LSP process launching and termination."""
@pytest.mark.asyncio
async def test_launch_lsp_process_success(self, tmp_path):
"""Test successful process launch."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection([str(binary_path)], "/test/dir")
conn.port = 12345
with patch("asyncio.create_subprocess_exec") as mock_create_subprocess:
mock_process = MagicMock()
mock_process.pid = 9999
mock_create_subprocess.return_value = mock_process
await conn.launch_lsp_process()
# Verify process was started with correct arguments
mock_create_subprocess.assert_called_once_with(
str(binary_path), "--socket", "12345", "--project-dir", "/test/dir"
) # cmd=[str(binary_path)] is spread, so Fusion would add "lsp" here
assert conn.process == mock_process
class TestStartStop:
"""Test start/stop lifecycle."""
@pytest.mark.asyncio
async def test_start_success(self, tmp_path):
"""Test successful server start."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection([str(binary_path)], "/test")
# Mock socket setup
mock_socket = MagicMock()
mock_connection = MagicMock()
mock_socket.getsockname.return_value = ("127.0.0.1", 54321)
# Mock process
mock_process = MagicMock()
mock_process.pid = 9999
with (
patch("socket.socket", return_value=mock_socket),
patch("asyncio.create_subprocess_exec", return_value=mock_process),
patch.object(conn, "_read_loop", new_callable=AsyncMock),
patch.object(conn, "_write_loop", new_callable=AsyncMock),
):
# Mock socket accept
async def mock_accept_wrapper():
return mock_connection, ("127.0.0.1", 12345)
with patch("asyncio.get_running_loop") as mock_loop:
mock_loop.return_value.run_in_executor.return_value = (
mock_accept_wrapper()
)
mock_loop.return_value.create_task.side_effect = (
lambda coro: asyncio.create_task(coro)
)
await conn.start()
assert conn.process == mock_process
assert conn._connection == mock_connection
assert conn._reader_task is not None
assert conn._writer_task is not None
@pytest.mark.asyncio
async def test_start_already_running(self, tmp_path):
"""Test starting when already running."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection([str(binary_path)], "/test")
conn.process = MagicMock() # Simulate already running
with (
patch("socket.socket"),
patch("asyncio.create_subprocess_exec") as mock_create_subprocess,
):
await conn.start()
# Should not create a new process
mock_create_subprocess.assert_not_called()
@pytest.mark.asyncio
async def test_start_timeout(self, tmp_path):
"""Test start timeout when server doesn't connect."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection([str(binary_path)], "/test", connection_timeout=0.1)
mock_socket = MagicMock()
mock_socket.getsockname.return_value = ("127.0.0.1", 54321)
mock_process = MagicMock()
with (
patch("socket.socket", return_value=mock_socket),
patch("asyncio.create_subprocess_exec", return_value=mock_process),
):
# Simulate timeout in socket.accept
mock_socket.accept.side_effect = TimeoutError
with patch("asyncio.get_running_loop") as mock_loop:
mock_loop.return_value.run_in_executor.side_effect = TimeoutError
with pytest.raises(
RuntimeError, match="Timeout waiting for LSP server to connect"
):
await conn.start()
@pytest.mark.asyncio
async def test_stop_complete_cleanup(self, tmp_path):
"""Test complete cleanup on stop."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection([str(binary_path)], "/test")
# Setup mocks for running state
conn.process = MagicMock()
conn.process.terminate = MagicMock()
conn.process.wait = AsyncMock()
conn.process.kill = MagicMock()
conn._socket = MagicMock()
conn._connection = MagicMock()
# Create mock tasks with proper async behavior
async def mock_task():
pass
conn._reader_task = asyncio.create_task(mock_task())
conn._writer_task = asyncio.create_task(mock_task())
# Let tasks complete
await asyncio.sleep(0.01)
# Store references before they are set to None
mock_connection = conn._connection
mock_socket = conn._socket
mock_process = conn.process
with patch.object(conn, "_send_shutdown_request") as mock_shutdown:
await conn.stop()
# Verify cleanup methods were called
mock_shutdown.assert_called_once()
mock_connection.close.assert_called_once()
mock_socket.close.assert_called_once()
mock_process.terminate.assert_called_once()
# Verify everything was set to None
assert conn.process is None
assert conn._socket is None
assert conn._connection is None
@pytest.mark.asyncio
async def test_stop_force_kill(self, tmp_path):
"""Test force kill when process doesn't terminate."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection([str(binary_path)], "/test")
# Setup mock process that doesn't terminate
mock_process = MagicMock()
mock_process.terminate = MagicMock()
mock_process.wait = AsyncMock(
side_effect=[subprocess.TimeoutExpired("cmd", 1), None]
)
mock_process.kill = MagicMock()
conn.process = mock_process
await conn.stop()
# Verify force kill was called
mock_process.terminate.assert_called_once()
mock_process.kill.assert_called_once()
class TestInitializeMethod:
"""Test LSP initialization handshake."""
@pytest.mark.asyncio
async def test_initialize_success(self, tmp_path):
"""Test successful initialization."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection([str(binary_path)], "/test")
conn.process = MagicMock() # Simulate running
# Mock send_request to return capabilities
mock_result = {
"capabilities": {
"textDocumentSync": 2,
"completionProvider": {"triggerCharacters": [".", ":"]},
}
}
with (
patch.object(
conn, "send_request", new_callable=AsyncMock
) as mock_send_request,
patch.object(conn, "send_notification") as mock_send_notification,
):
mock_send_request.return_value = mock_result
await conn.initialize(timeout=5)
# Verify initialize request was sent
mock_send_request.assert_called_once()
call_args = mock_send_request.call_args
assert call_args[0][0] == "initialize"
assert call_args[1]["timeout"] == 5
params = call_args[0][1]
assert params["rootUri"] is None # currently not using cwd
assert params["clientInfo"]["name"] == "dbt-mcp"
# Verify initialized notification was sent
mock_send_notification.assert_called_once_with("initialized", {})
# Verify state was updated
assert conn.state.initialized is True
assert conn.state.capabilities == mock_result["capabilities"]
@pytest.mark.asyncio
async def test_initialize_already_initialized(self, tmp_path):
"""Test initialization when already initialized."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection([str(binary_path)], "/test")
conn.process = MagicMock()
conn.state.initialized = True
with pytest.raises(RuntimeError, match="LSP server is already initialized"):
await conn.initialize()
class TestMessageParsing:
"""Test JSON-RPC message parsing."""
def test_parse_message_complete(self, tmp_path):
"""Test parsing a complete message."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection([str(binary_path)], "/test")
# Create a valid LSP message
content = '{"jsonrpc":"2.0","id":1,"result":{"test":true}}'
header = f"Content-Length: {len(content)}\r\n\r\n"
buffer = (header + content).encode("utf-8")
message, remaining = conn._parse_message(buffer)
assert message is not None
assert message.id == 1
assert message.result == {"test": True}
assert remaining == b""
def test_parse_message_incomplete_header(self, tmp_path):
"""Test parsing with incomplete header."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection([str(binary_path)], "/test")
buffer = b"Content-Length: 50\r\n" # Missing \r\n\r\n
message, remaining = conn._parse_message(buffer)
assert message is None
assert remaining == buffer
def test_parse_message_incomplete_content(self, tmp_path):
"""Test parsing with incomplete content."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection([str(binary_path)], "/test")
content = '{"jsonrpc":"2.0","id":1,"result":{"test":true}}'
header = f"Content-Length: {len(content)}\r\n\r\n"
# Only include part of the content
buffer = (header + content[:10]).encode("utf-8")
message, remaining = conn._parse_message(buffer)
assert message is None
assert remaining == buffer
def test_parse_message_invalid_json(self, tmp_path):
"""Test parsing with invalid JSON content."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection([str(binary_path)], "/test")
content = '{"invalid json'
header = f"Content-Length: {len(content)}\r\n\r\n"
buffer = (header + content).encode("utf-8")
message, remaining = conn._parse_message(buffer)
assert message is None
assert remaining == b"" # Invalid message is discarded
def test_parse_message_missing_content_length(self, tmp_path):
"""Test parsing with missing Content-Length header."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection([str(binary_path)], "/test")
buffer = b'Some-Header: value\r\n\r\n{"test":true}'
message, remaining = conn._parse_message(buffer)
assert message is None
assert remaining == b'{"test":true}' # Header consumed, content remains
def test_parse_message_multiple_messages(self, tmp_path):
"""Test parsing multiple messages from buffer."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection([str(binary_path)], "/test")
# Create two messages
content1 = '{"jsonrpc":"2.0","id":1,"result":true}'
content2 = '{"jsonrpc":"2.0","id":2,"result":false}'
header1 = f"Content-Length: {len(content1)}\r\n\r\n"
header2 = f"Content-Length: {len(content2)}\r\n\r\n"
buffer = (header1 + content1 + header2 + content2).encode("utf-8")
# Parse first message
message1, remaining1 = conn._parse_message(buffer)
assert message1 is not None
assert message1.id == 1
assert message1.result is True
# Parse second message
message2, remaining2 = conn._parse_message(remaining1)
assert message2 is not None
assert message2.id == 2
assert message2.result is False
assert remaining2 == b""
class TestMessageHandling:
"""Test incoming message handling."""
def test_handle_response_message(self, tmp_path):
"""Test handling response to a request."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection([str(binary_path)], "/test")
with _sync_test_event_loop() as loop:
# Create a pending request
future = loop.create_future()
conn.state.pending_requests[42] = future
# Handle response message
message = JsonRpcMessage(id=42, result={"success": True})
with patch.object(future, "get_loop") as mock_get_loop:
mock_loop = MagicMock()
mock_get_loop.return_value = mock_loop
conn._handle_incoming_message(message)
# Verify future was resolved
mock_loop.call_soon_threadsafe.assert_called_once()
args = mock_loop.call_soon_threadsafe.call_args[0]
assert args[0] == future.set_result
assert args[1] == {"success": True}
# Verify request was removed from pending
assert 42 not in conn.state.pending_requests
def test_handle_error_response(self, tmp_path):
"""Test handling error response."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection([str(binary_path)], "/test")
with _sync_test_event_loop() as loop:
# Create a pending request
future = loop.create_future()
conn.state.pending_requests[42] = future
# Handle error response
message = JsonRpcMessage(
id=42, error={"code": -32601, "message": "Method not found"}
)
with patch.object(future, "get_loop") as mock_get_loop:
mock_loop = MagicMock()
mock_get_loop.return_value = mock_loop
conn._handle_incoming_message(message)
# Verify future was rejected
mock_loop.call_soon_threadsafe.assert_called_once()
args = mock_loop.call_soon_threadsafe.call_args[0]
assert args[0] == future.set_exception
def test_handle_unknown_response(self, tmp_path):
"""Test handling response for unknown request ID."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection([str(binary_path)], "/test")
# Handle response with unknown ID
message = JsonRpcMessage(id=999, result={"test": True})
with patch.object(conn, "_send_message") as mock_send:
conn._handle_incoming_message(message)
# Should send empty response back
mock_send.assert_called_once()
sent_msg = mock_send.call_args[0][0]
assert isinstance(sent_msg, JsonRpcMessage)
assert sent_msg.id == 999
assert sent_msg.result is None
def test_handle_notification(self, tmp_path):
"""Test handling notification messages."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection([str(binary_path)], "/test")
with _sync_test_event_loop() as loop:
# Create futures waiting for compile complete event
future1 = loop.create_future()
future2 = loop.create_future()
conn.state.pending_notifications[LspEventName.compileComplete] = [
future1,
future2,
]
# Handle compile complete notification
message = JsonRpcMessage(
method="dbt/lspCompileComplete", params={"success": True}
)
with (
patch.object(future1, "get_loop") as mock_get_loop1,
patch.object(future2, "get_loop") as mock_get_loop2,
):
mock_loop1 = MagicMock()
mock_loop2 = MagicMock()
mock_get_loop1.return_value = mock_loop1
mock_get_loop2.return_value = mock_loop2
conn._handle_incoming_message(message)
# Verify futures were resolved
mock_loop1.call_soon_threadsafe.assert_called_once_with(
future1.set_result, {"success": True}
)
mock_loop2.call_soon_threadsafe.assert_called_once_with(
future2.set_result, {"success": True}
)
# Verify compile state was set
assert conn.state.compiled is True
def test_handle_unknown_notification(self, tmp_path):
"""Test handling unknown notification."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection([str(binary_path)], "/test")
# Handle unknown notification
message = JsonRpcMessage(method="unknown/notification", params={"data": "test"})
# Should not raise, just log
conn._handle_incoming_message(message)
class TestSendRequest:
"""Test sending requests to LSP server."""
@pytest.mark.asyncio
async def test_send_request_success(self, tmp_path):
"""Test successful request sending."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection([str(binary_path)], "/test")
conn.process = MagicMock() # Simulate running
with (
patch.object(conn, "_send_message") as mock_send,
patch("asyncio.wait_for", new_callable=AsyncMock) as mock_wait_for,
):
mock_wait_for.return_value = {"result": "success"}
result = await conn.send_request(
"testMethod", {"param": "value"}, timeout=5
)
# Verify message was sent
mock_send.assert_called_once()
sent_msg = mock_send.call_args[0][0]
assert isinstance(sent_msg, JsonRpcMessage)
assert sent_msg.method == "testMethod"
assert sent_msg.params == {"param": "value"}
assert sent_msg.id is not None
# Verify result
assert result == {"result": "success"}
@pytest.mark.asyncio
async def test_send_request_not_running(self, tmp_path):
"""Test sending request when server not running."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection([str(binary_path)], "/test")
# process is None - not running
with pytest.raises(RuntimeError, match="LSP server is not running"):
await conn.send_request("testMethod")
@pytest.mark.asyncio
async def test_send_request_timeout(self, tmp_path):
"""Test request timeout."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection(str(binary_path), "/test", default_request_timeout=1)
conn.process = MagicMock()
with patch.object(conn, "_send_message"):
# Create a future that never resolves
future = asyncio.Future()
conn.state.pending_requests[20] = future
# Use real wait_for to test timeout
result = await conn.send_request("testMethod", timeout=0.01)
assert "error" in result
class TestSendNotification:
"""Test sending notifications to LSP server."""
def test_send_notification_success(self, tmp_path):
"""Test successful notification sending."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection([str(binary_path)], "/test")
conn.process = MagicMock()
with patch.object(conn, "_send_message") as mock_send:
conn.send_notification(
"window/showMessage", {"type": 3, "message": "Hello"}
)
# Verify message was sent
mock_send.assert_called_once()
sent_msg = mock_send.call_args[0][0]
assert isinstance(sent_msg, JsonRpcMessage)
assert sent_msg.method == "window/showMessage"
assert sent_msg.params == {"type": 3, "message": "Hello"}
assert sent_msg.id is None # Notifications have no ID
def test_send_notification_not_running(self, tmp_path):
"""Test sending notification when server not running."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection([str(binary_path)], "/test")
# process is None - not running
with pytest.raises(RuntimeError, match="LSP server is not running"):
conn.send_notification("testMethod")
class TestWaitForNotification:
"""Test waiting for notifications."""
def test_wait_for_notification(self, tmp_path):
"""Test registering to wait for a notification."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection([str(binary_path)], "/test")
with patch("asyncio.get_running_loop") as mock_get_loop:
mock_loop = MagicMock()
mock_future = MagicMock()
mock_loop.create_future.return_value = mock_future
mock_get_loop.return_value = mock_loop
result = conn.wait_for_notification(LspEventName.compileComplete)
# Verify future was created and registered
assert result == mock_future
assert LspEventName.compileComplete in conn.state.pending_notifications
assert (
mock_future
in conn.state.pending_notifications[LspEventName.compileComplete]
)
class TestSendMessage:
"""Test low-level message sending."""
def test_send_message_with_jsonrpc_message(self, tmp_path):
"""Test sending JsonRpcMessage."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection([str(binary_path)], "/test")
conn._outgoing_queue = MagicMock()
message = JsonRpcMessage(id=1, method="test", params={"key": "value"})
conn._send_message(message)
# Verify message was queued
conn._outgoing_queue.put_nowait.assert_called_once()
data = conn._outgoing_queue.put_nowait.call_args[0][0]
# Parse the data to verify format
assert b"Content-Length:" in data
assert b"\r\n\r\n" in data
# JSON might have spaces after colons, check for both variants
assert b'"jsonrpc"' in data and b'"2.0"' in data
assert b'"method"' in data and b'"test"' in data
class TestShutdown:
"""Test shutdown sequence."""
def test_send_shutdown_request(self, tmp_path):
"""Test sending shutdown and exit messages."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection([str(binary_path)], "/test")
with patch.object(conn, "_send_message") as mock_send:
conn._send_shutdown_request()
# Verify two messages were sent
assert mock_send.call_count == 2
# First should be shutdown request
shutdown_msg = mock_send.call_args_list[0][0][0]
assert isinstance(shutdown_msg, JsonRpcMessage)
assert shutdown_msg.method == "shutdown"
assert shutdown_msg.id is not None
# Second should be exit notification
exit_msg = mock_send.call_args_list[1][0][0]
assert isinstance(exit_msg, JsonRpcMessage)
assert exit_msg.method == "exit"
assert exit_msg.id is None
class TestIsRunning:
"""Test is_running method."""
def test_is_running_true(self, tmp_path):
"""Test when process is running."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection([str(binary_path)], "/test")
conn.process = MagicMock()
conn.process.returncode = None
assert conn.is_running() is True
def test_is_running_false_no_process(self, tmp_path):
"""Test when no process."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection([str(binary_path)], "/test")
assert conn.is_running() is False
def test_is_running_false_process_exited(self, tmp_path):
"""Test when process has exited."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection([str(binary_path)], "/test")
conn.process = MagicMock()
conn.process.returncode = 0
assert conn.is_running() is False
class TestReadWriteLoops:
"""Test async I/O loops."""
@pytest.mark.asyncio
async def test_read_loop_processes_messages(self, tmp_path):
"""Test read loop processes incoming messages."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection([str(binary_path)], "/test")
# Setup mock connection
mock_connection = MagicMock()
conn._connection = mock_connection
# Create test data
content = '{"jsonrpc":"2.0","id":1,"result":true}'
header = f"Content-Length: {len(content)}\r\n\r\n"
test_data = (header + content).encode("utf-8")
# Mock recv to return data once then empty
recv_calls = [test_data, b""]
async def mock_recv_wrapper(size):
if recv_calls:
return recv_calls.pop(0)
return b""
with (
patch("asyncio.get_running_loop") as mock_get_loop,
patch.object(conn, "_handle_incoming_message") as mock_handle,
):
mock_loop = MagicMock()
mock_get_loop.return_value = mock_loop
mock_loop.run_in_executor.side_effect = (
lambda _, func, *args: mock_recv_wrapper(*args)
)
# Run read loop (will exit when recv returns empty)
await conn._read_loop()
# Verify message was handled
mock_handle.assert_called_once()
handled_msg = mock_handle.call_args[0][0]
assert handled_msg.id == 1
assert handled_msg.result is True
@pytest.mark.asyncio
async def test_write_loop_sends_messages(self, tmp_path):
"""Test write loop sends queued messages."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = SocketLSPConnection([str(binary_path)], "/test")
# Setup mock connection
mock_connection = MagicMock()
conn._connection = mock_connection
# Queue test data
test_data = b"test message data"
conn._outgoing_queue.put_nowait(test_data)
# Set stop event after first iteration
async def stop_after_one():
await asyncio.sleep(0.01)
conn._stop_event.set()
with patch("asyncio.get_running_loop") as mock_get_loop:
mock_loop = MagicMock()
mock_get_loop.return_value = mock_loop
mock_loop.run_in_executor.return_value = asyncio.sleep(0)
# Run both coroutines
await asyncio.gather(
conn._write_loop(), stop_after_one(), return_exceptions=True
)
# Verify data was sent
mock_loop.run_in_executor.assert_called()
call_args = mock_loop.run_in_executor.call_args_list[-1]
assert call_args[0][1] == mock_connection.sendall