Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions socksio/socks4.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ def loads(cls, data: bytes) -> "SOCKS4Reply":
Raises:
ProtocolError: If the data does not match the spec.
"""
if isinstance(data, bytearray):
data = bytes(data)

if len(data) != 8 or data[0:1] != b"\x00":
raise ProtocolError("Malformed reply")

Expand Down
3 changes: 3 additions & 0 deletions socksio/socks5.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,9 @@ def loads(cls, data: bytes) -> "SOCKS5Reply":
Raises:
ProtocolError: If the data does not match the spec.
"""
if isinstance(data, bytearray):
data = bytes(data)

if data[0:1] != b"\x05":
raise ProtocolError("Malformed reply")

Expand Down
24 changes: 13 additions & 11 deletions tests/test_socks4.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from typing import Type, Union
import pytest

from socksio import (
Expand Down Expand Up @@ -99,19 +100,20 @@ def test_socks4_connection_request(command: SOCKS4Command) -> None:


@pytest.mark.parametrize("request_reply_code", list(SOCKS4ReplyCode))
def test_socks4_receive_data(request_reply_code: bytes) -> None:
@pytest.mark.parametrize("data_type", [bytes, bytearray])
def test_socks4_receive_data(
request_reply_code: bytes, data_type: Union[Type[bytearray], Type[bytes]]
) -> None:
conn = SOCKS4Connection(user_id=b"socks")

reply = conn.receive_data(
b"".join(
[
b"\x00",
request_reply_code,
(8080).to_bytes(2, byteorder="big"),
b"\x7f\x00\x00\x01",
]
)
data = b"".join(
[
b"\x00",
request_reply_code,
(8080).to_bytes(2, byteorder="big"),
b"\x7f\x00\x00\x01",
]
)
reply = conn.receive_data(data_type(data))

assert reply == SOCKS4Reply(
reply_code=SOCKS4ReplyCode(request_reply_code), port=8080, addr="127.0.0.1"
Expand Down
9 changes: 7 additions & 2 deletions tests/test_socks5.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from typing import Type, Union
import pytest

from socksio import (
Expand Down Expand Up @@ -285,12 +286,14 @@ def test_socks5_request_ipv6(
),
],
)
@pytest.mark.parametrize("data_type", [bytes, bytearray])
def test_socks5_reply_success(
authenticated_conn: SOCKS5Connection,
atype: bytes,
addr: bytes,
expected_atype: SOCKS5AType,
expected_addr: str,
data_type: Union[Type[bytes], Type[bytearray]],
) -> None:
data = b"".join(
[
Expand All @@ -302,7 +305,7 @@ def test_socks5_reply_success(
(1080).to_bytes(2, byteorder="big"), # port
]
)
reply = authenticated_conn.receive_data(data)
reply = authenticated_conn.receive_data(data_type(data))

assert authenticated_conn.state == SOCKS5State.TUNNEL_READY
assert reply == SOCKS5Reply(
Expand Down Expand Up @@ -342,13 +345,15 @@ def test_socks5_receive_malformed_data(
),
],
)
@pytest.mark.parametrize("data_type", [bytes, bytearray])
def test_socks5_reply_error(
error_code: SOCKS5ReplyCode,
authenticated_conn: SOCKS5Connection,
atype: bytes,
addr: bytes,
expected_atype: SOCKS5AType,
expected_addr: str,
data_type: Union[Type[bytes], Type[bytearray]],
) -> None:
data = b"".join(
[
Expand All @@ -360,7 +365,7 @@ def test_socks5_reply_error(
(1080).to_bytes(2, byteorder="big"), # port
]
)
reply = authenticated_conn.receive_data(data)
reply = authenticated_conn.receive_data(data_type(data))

assert authenticated_conn.state == SOCKS5State.MUST_CLOSE
assert reply == SOCKS5Reply(
Expand Down