Skip to content

Commit 83b1a96

Browse files
authored
Merge pull request #7 from laisee/codex/review-code-and-suggest-improvements
Implement project improvements
2 parents b20b7cb + 14bd440 commit 83b1a96

7 files changed

Lines changed: 156 additions & 282 deletions

File tree

.pre-commit-config.yaml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
repos:
2+
- repo: https://github.com/astral-sh/ruff-pre-commit
3+
rev: v0.2.2
4+
hooks:
5+
- id: ruff
6+
- repo: https://github.com/PyCQA/bandit
7+
rev: 1.7.6
8+
hooks:
9+
- id: bandit
10+
args: ["-ll"]

README.md

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ The client is capable of:
2929
- **FIX Message Handling:** Constructs and sends FIX messages to the server.
3030
- **Secure Communication:** Establishes a secure WebSocket connection to send and receive messages.
3131
- **Message Validation:** Validates the presence of necessary fields in messages.
32-
- **Environment Configuration:** Uses environment variables loaded from a .env file for configuration.
32+
- **Environment Configuration:** Uses environment variables loaded from a .env file for configuration and fails fast when required values are missing.
33+
- **Async Operation:** Uses `websockets` with `asyncio` for non-blocking communication and heartbeat handling.
3334

3435
## Prerequisites
3536

@@ -61,7 +62,10 @@ The client is capable of:
6162

6263
2. **Install required python libraries:**
6364
```sh
64-
pip install -r requirements.txt
65+
pip install -r requirements.txt
66+
# or install in editable mode using pyproject
67+
pip install -e .
68+
```
6569

6670
3. **Generate API Keys**
6771
This is done at Power Trade UI under URL 'https://app.power.trade/api-keys'
@@ -97,3 +101,10 @@ The client is capable of:
97101
Review client actions as it executes logon to server, adds a new order, cancels the order while awaiting response(s).
98102
99103
A sleep action allows time to review the new order on system via API or UI before it's cancelled.
104+
105+
## Development
106+
107+
Run pre-commit locally to lint and scan before committing:
108+
```sh
109+
pre-commit run --all-files
110+
```

client.py

Lines changed: 41 additions & 261 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,20 @@
11
import asyncio
22
import logging
33
import os
4-
import socket
54
import ssl
65
import sys
7-
import threading
8-
import time
6+
from websockets import connect
97

108
from dotenv import load_dotenv
119

1210
from messages import (
13-
checkMsg,
1411
checkLogonMsg,
1512
getMsgCancel,
1613
getMsgHeartbeat,
1714
getMsgLogon,
1815
getMsgNewOrder,
19-
translateFix,
2016
)
21-
from utils import get_attr, get_log_filename
17+
from utils import get_log_filename
2218

2319
# Common settings
2420
SEPARATOR = "\x01"
@@ -47,277 +43,61 @@
4743
# Add file handler to the logger
4844
logger.addHandler(file_handler)
4945

50-
# first seqnum taken by LOGON message, this var is incremented for new orders, heartbeat
46+
# first seqnum taken by LOGON message, incremented for new orders and heartbeats
5147
seqnum = 2
52-
53-
# Event object to signal the heartbeat thread to stop
54-
stop_event = threading.Event()
55-
56-
57-
def send_heartbeat(apikey, conn):
48+
async def send_heartbeat(ws, apikey: str) -> None:
49+
"""Periodically send FIX heartbeat messages over the websocket."""
5850
global seqnum
59-
seqnum += 1
60-
try:
51+
init_sleep = int(os.getenv("INIT_SLEEP", 60))
52+
await asyncio.sleep(init_sleep)
53+
heartbeat_sleep = int(os.getenv("HEARTBEAT_SLEEP", 90))
54+
while True:
55+
await asyncio.sleep(heartbeat_sleep)
56+
seqnum += 1
6157
msg = getMsgHeartbeat(apikey, seqnum)
62-
conn.sendall(msg)
63-
logger.info(f"Sending Heartbeat Msg: {msg}")
64-
logger.info(f"Sent heartbeat message with Success @ seqnum {seqnum}")
65-
except Exception as e:
66-
logger.error(f"Failed to send Heartbeat message: error was '{e}'")
67-
68-
69-
def heartbeat_thread(apikey, conn, stop_event):
70-
try:
71-
INIT_SLEEP = os.getenv(
72-
"INIT_SLEEP", 60
73-
) # SLEEP for X seconds while client is starting up, default to 60 seconds
74-
time.sleep(INIT_SLEEP)
75-
HEARTBEAT_SLEEP = int(os.getenv("HEARTBEAT_SLEEP", 90)) # defaults to 90 secs
76-
while not stop_event.is_set():
77-
# delay start of thread by 20 secs
78-
send_heartbeat(apikey, conn)
79-
time.sleep(
80-
HEARTBEAT_SLEEP
81-
) # Send heartbeat every `HEARTBEAT_SLEEP` seconds
82-
except Exception as e:
83-
print(f"Heartbeat thread exception: {e}")
58+
await ws.send(msg)
59+
logger.info("Sent heartbeat message")
8460

8561

86-
async def main(server: str, port: int, apikey: str):
62+
async def main(server: str, port: int, apikey: str) -> None:
63+
"""Connect to the FIX endpoint and submit a sample order."""
8764
global seqnum
8865

89-
#
90-
# Trade test values
91-
# N.B. Not designed for PRODUCTION trading
92-
#
93-
RESP_SENDER = "PT-OE"
94-
SYMBOL: str = "ETH-USD"
95-
PRICE: float = 2508.08 #3090.00 + randint(1, 8)
96-
QUANTITY: float = .1
97-
98-
# Define server address w/ port
99-
server_addr = f"{server}:{port}"
100-
logger.info(f"server: {server_addr}")
101-
102-
# Create context for the TLS connection
103-
context = ssl.create_default_context()
104-
105-
# Wrap the socket with SSL
106-
context.load_verify_locations(cafile=os.getenv("CERTFILE_LOCATION", "cert.crt"))
107-
logger.info("Context created")
66+
SYMBOL = "ETH-USD"
67+
PRICE = 2508.08
68+
QUANTITY = 0.1
10869

109-
context.check_hostname = True
110-
context.verify_mode = ssl.CERT_REQUIRED
70+
uri = f"wss://{server}:{port}"
71+
ssl_context = ssl.create_default_context()
72+
ssl_context.load_verify_locations(cafile=os.getenv("CERTFILE_LOCATION", "cert.crt"))
11173

112-
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
113-
114-
# wait up to X secs for receiving responses
115-
logger.info( f"Assigning WAIT for Fix response messages of {os.getenv('MSG_RESPONSE_WAIT', 5)} seconds")
116-
sock.settimeout(int(os.getenv("MSG_RESPONSE_WAIT", 5)))
117-
118-
print(f"connecting to {server} on port {port} ...")
119-
sock.connect((server, port))
120-
121-
conn = context.wrap_socket(sock, server_hostname=server)
122-
123-
try:
124-
print("Handshaking Fix SSL/TLS connection ...")
125-
conn.do_handshake()
126-
127-
# Check Fix API connection with Logon message
74+
async with connect(uri, ssl=ssl_context) as ws:
12875
msg = getMsgLogon(apikey)
129-
error_msg = ""
130-
try:
131-
print(f"Sending Logon request {msg} to server {server} ...")
132-
logger.debug(f"Sending Logon msg {msg} to server {server} ...")
133-
# send Fix Logon message
134-
conn.sendall(msg)
135-
136-
print(f"Reading Logon response from server {server} ...")
137-
logger.debug(f"Reading Logon response from server {server} ...")
138-
response = conn.recv(1024)
76+
await ws.send(msg)
77+
response = await ws.recv()
78+
valid, error_msg = checkLogonMsg(response)
79+
if not valid:
80+
logger.error(f"Invalid Logon response: {error_msg}")
81+
return
13982

140-
print(f"Checking Logon response from server {response} ...")
141-
valid, error_msg = checkLogonMsg(response)
142-
if valid:
143-
logger.info("Received valid Logon response")
144-
# Start heartbeat thread
145-
threading.Thread(
146-
target=heartbeat_thread,
147-
args=(
148-
apikey,
149-
conn,
150-
stop_event,
151-
),
152-
).start()
153-
else:
154-
logger.error(f"Invalid Logon response: error was '{error_msg}'")
155-
sys.exit(1)
83+
heartbeat_task = asyncio.create_task(send_heartbeat(ws, apikey))
15684

157-
clOrdID, msg = getMsgNewOrder(SYMBOL, PRICE, QUANTITY, apikey, seqnum)
158-
decoded_msg = msg.decode("utf-8")
159-
print(
160-
"Sending new order [%s] with order details: {%s}"
161-
% (clOrdID, decoded_msg)
162-
)
163-
logger.debug(
164-
"Sending new order [%s] with order details: {%s}"
165-
% (clOrdID, decoded_msg)
166-
)
167-
conn.sendall(msg)
85+
clOrdID, order_msg = getMsgNewOrder(SYMBOL, PRICE, QUANTITY, apikey, seqnum)
86+
await ws.send(order_msg)
87+
logger.info("Sent new order")
16888

169-
print("Reading New Order response from server ...")
170-
response = conn.recv(1024)
89+
resp = await ws.recv()
90+
logger.info(f"Order response: {resp}")
17191

172-
logger.debug(f"Received(decoded): {response.decode('utf-8')}")
173-
valid = checkMsg(response, RESP_SENDER, apikey)
174-
(
175-
print("Received valid New Order response")
176-
if valid
177-
else print(f"Received invalid New Order response -> {response}")
178-
)
92+
cancel_id = 11111
93+
seqnum += 1
94+
_, cancel_msg = getMsgCancel(clOrdID, cancel_id, SYMBOL, apikey, seqnum)
95+
await ws.send(cancel_msg)
96+
logger.info("Sent cancel request")
17997

180-
#
181-
# iterate few times with sleep to allow trading messages from Limit Order to arrive
182-
#
183-
count = 0
184-
POLL_SLEEP = int(
185-
os.getenv("POLL_SLEEP", 5)
186-
) # seconds to sleep between iterations
187-
POLL_LIMIT = int(os.getenv("POLL_LIMIT", 10)) # iteration count
98+
await asyncio.sleep(int(os.getenv("FINAL_SLEEP", 20)))
99+
heartbeat_task.cancel()
188100

189-
logger.info(
190-
f"Waiting for New Order [{clOrdID}] confirmation response from server [{count}] ..."
191-
)
192-
193-
while count < POLL_LIMIT:
194-
time.sleep(POLL_SLEEP)
195-
try:
196-
logger.info("Waiting for new message ...")
197-
response = conn.recv(1024)
198-
# response = await asyncio.get_event_loop().sock_recv(conn, 1024)
199-
msg_str = response.decode("utf-8").replace(SEPARATOR, VERTLINE)
200-
if msg_str is not None:
201-
logger.info(f"Received(decoded):\n {msg_str}")
202-
msg_list = msg_str.split("8=FIX.4.4")
203-
for i, msg in enumerate(msg_list):
204-
logger.debug(
205-
"Recd msg: Ord '%s' Type [%s] Sts [%s]"
206-
% (
207-
get_attr(msg_str, "11"),
208-
translateFix("35", get_attr(msg_str, "35")),
209-
translateFix("39", get_attr(msg_str, "39")),
210-
)
211-
)
212-
if (
213-
get_attr(msg, "35") == "8"
214-
and translateFix("39", get_attr(msg, "39")) == "New"
215-
):
216-
logger.info(
217-
"Exit Wait loop for order confirmation as received order status == 'New'"
218-
)
219-
count = POLL_LIMIT
220-
break
221-
except Exception as e:
222-
logger.error("Error while waiting for new message -> %s" % e)
223-
count += 1
224-
225-
# setup cancel order to remove new order added above
226-
cancelOrderID = 11111 # clOrdID
227-
228-
print(f"Sleep {POLL_SLEEP*5} secs before starting to Cancel orders")
229-
logger.info("Sleep before starting to Cancel orders")
230-
time.sleep(POLL_SLEEP * 5)
231-
#
232-
# Cancel Order can be done if the New Limit Order above is not filled
233-
#
234-
logger.debug("Building Cancel Order Message for order %s" % cancelOrderID)
235-
seqnum += 1
236-
now, msg = getMsgCancel(clOrdID, cancelOrderID, SYMBOL, apikey, seqnum)
237-
logger.debug(
238-
"Sending Cancel Order Message %s for order %s with Seqnum {seqnum}"
239-
% (msg, cancelOrderID)
240-
)
241-
conn.sendall(msg)
242-
243-
#
244-
# Await response from order cancel message
245-
#
246-
count = 0
247-
POLL = True
248-
while POLL and count < POLL_LIMIT:
249-
time.sleep(POLL_SLEEP)
250-
logger.debug("Awaiting Cancel order response from server ...")
251-
response = conn.recv(1024)
252-
msg = response.decode("utf-8").replace(SEPARATOR, VERTLINE)
253-
logger.debug(
254-
"Received msg from server with type [%s] status [%s]"
255-
% (
256-
translateFix("35", get_attr(msg, "35")),
257-
translateFix("39", get_attr(msg, "39")),
258-
)
259-
)
260-
261-
#
262-
# was received message a 'heartbeat' [Msg Type = '0']
263-
#
264-
if get_attr(msg, "35") == "0":
265-
logger.info("Heartbeat msg received ...")
266-
#
267-
# received message an 'execution report' [Msg Type = '8']
268-
#
269-
elif (
270-
get_attr(msg, "35") == "8"
271-
and translateFix("39", get_attr(msg, "39")) == "Cancelled"
272-
):
273-
logger.info(
274-
"Received Order Cancel response with order status == 'Cancelled'"
275-
)
276-
POLL = False
277-
#
278-
# Check status of the order i.e. '2' for Filled, '8' for Rejected
279-
elif (
280-
get_attr(msg, "35") == "9"
281-
and translateFix("39", get_attr(msg, "39")) == "Rejected"
282-
):
283-
logger.info(
284-
"Received Order Cancel response with order status == 'Rejected'"
285-
)
286-
POLL = False
287-
else:
288-
logger.debug(
289-
f"Received(decoded): {response.decode('utf-8').replace(SEPARATOR,VERTLINE)}"
290-
)
291-
logger.debug(
292-
"Recd msg with type [%s] status [%s] for order %s"
293-
% (
294-
translateFix("35", get_attr(msg, "35")),
295-
translateFix("39", get_attr(msg, "39")),
296-
cancelOrderID,
297-
)
298-
)
299-
count += 1
300-
except socket.timeout:
301-
wait_time = os.getenv("MSG_RESPONSE_WAIT", 5)
302-
logger.info(f"Receive operation timed out after {wait_time} seconds.")
303-
except Exception as e:
304-
logger.error(f"Error while processing send/receive Fix messages: {e}")
305-
306-
except Exception as e:
307-
logger.error(f"Failed to make Fix connection and send Order message: {e}")
308-
finally:
309-
#
310-
# Allow 'FINAL_SLEEP' seconds to pass so we can check account balance / possition changes / open orders before closing connection which will remove open orders
311-
#
312-
FINAL_SLEEP = int(os.getenv("FINAL_SLEEP", 20))
313-
logger.info(f"\nWaiting {FINAL_SLEEP} secs to close connection")
314-
stop_event.set() # Signal the heartbeat thread to stop
315-
time.sleep(FINAL_SLEEP)
316-
sock.close()
317-
conn.close()
318-
logger.info(
319-
"\n**************************************************************************\n"
320-
)
321101

322102

323103
if __name__ == "__main__":

0 commit comments

Comments
 (0)