Skip to content

Commit 610f11d

Browse files
committed
[Tunnel] Add octobot_tunnel package — Python Piko client (TDD)
Implements a full Python equivalent of the Go github.com/andydunstall/piko/client SDK: - yamux/: Pure-Python yamux multiplexer protocol (frame encode/decode, session, stream with flow-control window management) - client/: Upstream (listen on endpoint), Listener (accept multiplexed streams, auto-reconnect with exponential backoff), Dialer (outbound TCP connections), Forwarder (bidirectional bridge to local address), WebSocket transport wrapper (aiohttp), Logger protocol, Backoff, typed exceptions 138 tests covering every behaviour: yamux framing, session lifecycle, ping/pong, GoAway, window updates, backoff jitter, URL construction (http→ws, https→wss), auth headers, retryable HTTP status codes, TLS/proxy forwarding, reconnect on session dropout, close vs shutdown semantics, bidirectional data copy. Wired into pants.toml source roots and root BUILD PACKAGE_SOURCES/PACKAGE_REQS. https://claude.ai/code/session_01QjvZSQCamcHU9NR5MxMzN1
1 parent 07bb389 commit 610f11d

35 files changed

Lines changed: 2817 additions & 0 deletions

BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ PACKAGE_SOURCES = [
4444
"packages/tentacles_manager:octobot_tentacles_manager",
4545
"packages/trading:octobot_trading",
4646
"packages/trading_backend:trading_backend",
47+
"packages/tunnel:octobot_tunnel",
4748
]
4849

4950
PACKAGE_REQS = [
@@ -55,6 +56,7 @@ PACKAGE_REQS = [
5556
"packages/tentacles_manager:reqs",
5657
"packages/trading:reqs",
5758
"packages/trading_backend:reqs",
59+
"packages/tunnel:reqs",
5860
]
5961

6062
PACKAGE_FULL_REQS = [

packages/tunnel/BUILD

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
python_requirements(name="reqs")
2+
3+
python_sources(name="octobot_tunnel", sources=["octobot_tunnel/**/*.py"])
4+
5+
python_tests(
6+
name="tests",
7+
sources=["tests/**/test_*.py"],
8+
dependencies=[
9+
":octobot_tunnel",
10+
":reqs",
11+
"//:dev_reqs",
12+
],
13+
)

packages/tunnel/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# Changelog
2+
3+
## [0.0.1]
4+
- Initial release: Python Piko client replicating the Go SDK

packages/tunnel/MANIFEST.in

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
include requirements.txt
2+
recursive-include octobot_tunnel *.py

packages/tunnel/README.md

Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
# OctoBot Tunnel
2+
[![Telegram](https://img.shields.io/badge/Telegram-grey.svg?logo=telegram)](https://t.me/OctoBot_Project)
3+
[![Twitter](https://img.shields.io/twitter/follow/DrakkarsOctobot.svg?label=twitter&style=social)](https://x.com/DrakkarsOctoBot)
4+
5+
Python client for [Piko](https://github.com/andydunstall/piko) — an open-source reverse proxy (Ngrok alternative) that lets services behind firewalls register outbound-only tunnels.
6+
7+
This package is a full Python reimplementation of the official Go SDK (`github.com/andydunstall/piko/client`), using `asyncio` and `aiohttp`. It reproduces every feature of the Go client with identical behaviour.
8+
9+
This project is related to [OctoBot](https://github.com/Drakkar-Software/OctoBot).
10+
11+
---
12+
13+
## How Piko works
14+
15+
Upstream services open an outbound WebSocket connection to the Piko server and declare which **endpoint** they are listening on. The Piko server multiplexes incoming external connections back through that tunnel using [yamux](https://github.com/hashicorp/yamux). Services never need a public IP or open inbound port.
16+
17+
```
18+
External client ──► Piko server (port 8000) ──► upstream tunnel ──► your service
19+
Piko server (port 8001) ◄── WebSocket (outbound-only)
20+
```
21+
22+
---
23+
24+
## Installation
25+
26+
```bash
27+
pip install octobot-tunnel
28+
```
29+
30+
Or add to your project's `requirements.txt`:
31+
32+
```
33+
aiohttp>=3.9.5
34+
```
35+
36+
---
37+
38+
## Usage
39+
40+
### Listen on an endpoint (Upstream)
41+
42+
Register a service as a Piko upstream and accept incoming connections as an `asyncio`-native stream:
43+
44+
```python
45+
import asyncio
46+
import octobot_tunnel
47+
48+
async def main():
49+
upstream = octobot_tunnel.Upstream(
50+
url="http://localhost:8001",
51+
token="my-api-token", # optional JWT token
52+
)
53+
listener = await upstream.listen("my-endpoint")
54+
print("Listening on endpoint:", listener.endpoint_id())
55+
56+
while True:
57+
conn = await listener.accept()
58+
asyncio.ensure_future(handle(conn))
59+
60+
async def handle(conn):
61+
data = await conn.read(4096)
62+
await conn.write(b"HTTP/1.1 200 OK\r\n\r\nHello from Piko!")
63+
await conn.close()
64+
65+
asyncio.run(main())
66+
```
67+
68+
### Forward to a local service (ListenAndForward)
69+
70+
Transparently bridge all incoming Piko connections to a locally-running service:
71+
72+
```python
73+
import asyncio
74+
import octobot_tunnel
75+
76+
async def main():
77+
upstream = octobot_tunnel.Upstream(
78+
url="http://localhost:8001",
79+
token="my-api-token",
80+
)
81+
# All connections to "my-endpoint" are forwarded to localhost:3000
82+
forwarder = await upstream.listen_and_forward("my-endpoint", "localhost:3000")
83+
await forwarder.wait()
84+
85+
asyncio.run(main())
86+
```
87+
88+
### Dial an endpoint (Dialer)
89+
90+
Open an outbound TCP connection to any registered Piko endpoint:
91+
92+
```python
93+
import asyncio
94+
import octobot_tunnel
95+
96+
async def main():
97+
dialer = octobot_tunnel.Dialer(
98+
url="http://localhost:8000",
99+
token="my-api-token",
100+
)
101+
conn = await dialer.dial("my-endpoint")
102+
await conn.write(b"hello")
103+
response = await conn.read(4096)
104+
print("Response:", response)
105+
await conn.close()
106+
107+
asyncio.run(main())
108+
```
109+
110+
---
111+
112+
## API Reference
113+
114+
### `Upstream`
115+
116+
Manages listening on Piko upstream endpoints.
117+
118+
| Parameter | Type | Default | Description |
119+
|---|---|---|---|
120+
| `url` | `str \| None` | `http://localhost:8001` | Piko upstream port URL |
121+
| `token` | `str \| None` | `None` | JWT bearer token for authentication |
122+
| `tenant_id` | `str \| None` | `None` | Tenant identifier (experimental) |
123+
| `tls_context` | `ssl.SSLContext \| None` | `None` | TLS configuration |
124+
| `proxy_url` | `str \| None` | `None` | HTTP proxy URL |
125+
| `min_reconnect_backoff` | `float` | `0.1` | Minimum reconnect delay in seconds |
126+
| `max_reconnect_backoff` | `float` | `15.0` | Maximum reconnect delay in seconds |
127+
| `logger` | `Logger \| None` | `None` | Optional structured logger |
128+
129+
**Methods:**
130+
- `async listen(endpoint_id: str) → Listener` — connect and return a listener
131+
- `async listen_and_forward(endpoint_id: str, addr: str) → Forwarder` — connect and start forwarding in the background
132+
133+
---
134+
135+
### `Listener`
136+
137+
Accepts incoming connections for a Piko endpoint.
138+
139+
**Methods:**
140+
- `async accept() → Stream` — wait for the next incoming connection
141+
- `async accept_with_context(stop_event: asyncio.Event) → Stream` — like `accept()`, stops when the event is set
142+
- `addr() → PikoAddr` — returns `PikoAddr(network="tcp", address=endpoint_id)`
143+
- `endpoint_id() → str` — the registered endpoint name
144+
- `async close()` — send GoAway (stop accepting new connections, keep established streams open)
145+
- `async shutdown()` — close the underlying yamux session (terminates all streams)
146+
147+
On transient disconnections, `accept()` automatically reconnects using the `Upstream`'s backoff configuration, identical to the Go client.
148+
149+
---
150+
151+
### `Dialer`
152+
153+
Opens TCP connections to Piko endpoints via the proxy port.
154+
155+
| Parameter | Type | Default | Description |
156+
|---|---|---|---|
157+
| `url` | `str \| None` | `http://localhost:8000` | Piko proxy port URL |
158+
| `token` | `str \| None` | `None` | JWT bearer token |
159+
| `tls_context` | `ssl.SSLContext \| None` | `None` | TLS configuration |
160+
161+
**Methods:**
162+
- `async dial(endpoint_id: str) → Stream` — open a connection to the endpoint
163+
164+
---
165+
166+
### `Forwarder`
167+
168+
Created by `Upstream.listen_and_forward()`. Runs in the background.
169+
170+
**Methods:**
171+
- `async wait()` — block until the forwarder stops
172+
- `async close()` — stop accepting new connections and cancel active forwards
173+
174+
---
175+
176+
### `Stream`
177+
178+
Returned by `Listener.accept()` and `Dialer.dial()`.
179+
180+
**Methods:**
181+
- `async read(n: int = -1) → bytes`
182+
- `async write(data: bytes) → None`
183+
- `async close() → None`
184+
185+
---
186+
187+
### Exceptions
188+
189+
| Exception | Description |
190+
|---|---|
191+
| `ConnectionClosed` | Operation on a closed connection or listener |
192+
| `RetryableError` | Transient server error (HTTP 408/429/500/502/503/504) |
193+
| `PikoError` | Base class for all tunnel exceptions |
194+
195+
---
196+
197+
## Protocol details
198+
199+
- **Transport:** WebSocket (via `aiohttp`)
200+
- Upstream connects to `ws(s)://host:8001/piko/v1/upstream/{endpoint_id}`
201+
- Dialer connects to `ws(s)://host:8000/_piko/v1/tcp/{endpoint_id}`
202+
- **Multiplexing:** [yamux](https://github.com/hashicorp/yamux) client session over the WebSocket stream
203+
- **Authentication:** `Authorization: Bearer <token>` header; optional `x-piko-tenant-id` header
204+
- **Reconnection:** exponential backoff with 10% jitter (default: 100 ms → 15 s), retrying on transient HTTP errors
205+
206+
---
207+
208+
## Development
209+
210+
```bash
211+
pip install -r requirements.txt
212+
pip install pytest pytest-asyncio
213+
214+
# Run all 138 tests
215+
pytest tests/
216+
```
217+
218+
### Test coverage
219+
220+
| Module | Tests |
221+
|---|---|
222+
| `yamux/frame.py` | 31 — encode/decode all frame types, flags, round-trips |
223+
| `yamux/stream.py` | 15 — read/write, flow-control window, FIN/RST |
224+
| `yamux/session.py` | 16 — accept, data dispatch, ping/pong, GoAway, concurrency |
225+
| `client/backoff.py` | 7 — jitter range, doubling, cap, infinite/limited retries |
226+
| `client/websocket.py` | 14 — auth headers, binary buffering, retryable status codes, TLS/proxy |
227+
| `client/upstream.py` | 13 — URL construction, retry logic, config forwarding |
228+
| `client/listener.py` | 14 — accept, reconnect, close vs shutdown, addr |
229+
| `client/dialer.py` | 13 — URL construction, dial, config forwarding |
230+
| `client/forwarder.py` | 11 — bidirectional copy, wait/close, connection refused |
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# This file is part of OctoBot Tunnel (https://github.com/Drakkar-Software/OctoBot-Tunnel)
2+
# Copyright (c) 2025 Drakkar-Software, All rights reserved.
3+
#
4+
# OctoBot is free software; you can redistribute it and/or
5+
# modify it under the terms of the GNU General Public License
6+
# as published by the Free Software Foundation; either
7+
# version 3.0 of the License, or (at your option) any later version.
8+
#
9+
# OctoBot is distributed in the hope that it will be useful,
10+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12+
# General Public License for more details.
13+
#
14+
# You should have received a copy of the GNU General Public
15+
# License along with OctoBot. If not, see <https://www.gnu.org/licenses/>.
16+
17+
PROJECT_NAME = "OctoBot-Tunnel"
18+
AUTHOR = "Drakkar-Software"
19+
VERSION = "0.0.1" # major.minor.revision
20+
21+
from octobot_tunnel.client import (
22+
Upstream,
23+
Listener,
24+
Dialer,
25+
Forwarder,
26+
Stream,
27+
PikoAddr,
28+
ConnectionClosed,
29+
RetryableError,
30+
Logger,
31+
)
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from octobot_tunnel.client.exceptions import (
2+
PikoError,
3+
ConnectionClosed,
4+
RetryableError,
5+
)
6+
from octobot_tunnel.client.logger import Logger
7+
from octobot_tunnel.client.backoff import Backoff
8+
from octobot_tunnel.client.dialer import Dialer, Stream
9+
from octobot_tunnel.client.listener import Listener, PikoAddr
10+
from octobot_tunnel.client.upstream import Upstream
11+
from octobot_tunnel.client.forwarder import Forwarder
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import random
2+
3+
4+
class Backoff:
5+
"""
6+
Exponential backoff with 10% random jitter, matching the Go client.
7+
8+
Set retries=0 to retry forever.
9+
"""
10+
11+
def __init__(self, retries: int, min_backoff: float, max_backoff: float):
12+
self._retries = retries
13+
self._min_backoff = min_backoff
14+
self._max_backoff = max_backoff
15+
self._attempts = 0
16+
self._last_backoff = 0.0
17+
18+
def backoff(self) -> tuple[float, bool]:
19+
"""Return (delay_seconds, should_retry)."""
20+
if self._retries != 0 and self._attempts > self._retries:
21+
return 0.0, False
22+
self._attempts += 1
23+
delay = self._next_wait()
24+
self._last_backoff = delay
25+
return delay, True
26+
27+
def _next_wait(self) -> float:
28+
if self._last_backoff == 0.0:
29+
b = self._min_backoff
30+
else:
31+
b = self._last_backoff * 2.0
32+
if b > self._max_backoff:
33+
b = self._max_backoff
34+
jitter = 1.0 + random.random() * 0.1
35+
return b * jitter

0 commit comments

Comments
 (0)