Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
321 changes: 316 additions & 5 deletions nautilus_trader/adapters/polymarket/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from py_clob_client.client import PartialCreateOrderOptions
from py_clob_client.client import TradeParams
from py_clob_client.clob_types import AssetType
from py_clob_client.clob_types import PostOrdersArgs
from py_clob_client.exceptions import PolyApiException

from nautilus_trader.adapters.polymarket.common.cache import get_polymarket_trades_key
Expand Down Expand Up @@ -77,6 +78,7 @@
from nautilus_trader.execution.messages import GeneratePositionStatusReports
from nautilus_trader.execution.messages import QueryAccount
from nautilus_trader.execution.messages import SubmitOrder
from nautilus_trader.execution.messages import SubmitOrderList
from nautilus_trader.execution.reports import FillReport
from nautilus_trader.execution.reports import OrderStatusReport
from nautilus_trader.execution.reports import PositionStatusReport
Expand Down Expand Up @@ -956,6 +958,92 @@ async def _cancel_all_orders(self, command: CancelAllOrders) -> None:
finally:
await self._retry_manager_pool.release(retry_manager)

async def _cancel_all_global(self) -> None:
"""
Cancel all orders for this API key using Polymarket's cancel_all endpoint.

This cancels ALL orders across all markets and strategies. Use with caution as
it cannot be filtered by instrument or strategy.

"""
self._log.info("Canceling ALL orders globally via Polymarket cancel_all endpoint")

retry_manager = await self._retry_manager_pool.acquire()
try:
response: JSON | None = await retry_manager.run(
"cancel_all_global",
[],
asyncio.to_thread,
self._http_client.cancel_all,
)
if not response or not retry_manager.result:
self._log.error(f"Failed to cancel all orders: {retry_manager.message}")
else:
canceled = response.get("canceled", [])
not_canceled = response.get("not_canceled", {})
self._log.info(
f"Cancel all result: {len(canceled)} canceled, "
f"{len(not_canceled)} not canceled",
)
for order_id, reason in not_canceled.items():
self._log.warning(f"Order {order_id} not canceled: {reason}")
finally:
await self._retry_manager_pool.release(retry_manager)

async def _cancel_market_orders(
self,
instrument_id: InstrumentId | None = None,
asset_id: str = "",
) -> None:
"""
Cancel orders for a specific market using Polymarket's cancel_market_orders
endpoint.

Parameters
----------
instrument_id : InstrumentId, optional
The instrument ID to derive the market (condition_id).
asset_id : str, optional
The specific asset ID (token_id) to cancel orders for.

"""
from nautilus_trader.adapters.polymarket.common.symbol import get_polymarket_condition_id
from nautilus_trader.adapters.polymarket.common.symbol import get_polymarket_token_id

market = ""
if instrument_id is not None:
market = get_polymarket_condition_id(instrument_id)
if not asset_id:
asset_id = get_polymarket_token_id(instrument_id)

self._log.info(
f"Canceling orders for market={market or 'ALL'}, asset_id={asset_id or 'ALL'}",
)

retry_manager = await self._retry_manager_pool.acquire()
try:
response: JSON | None = await retry_manager.run(
"cancel_market_orders",
[instrument_id] if instrument_id else [],
asyncio.to_thread,
self._http_client.cancel_market_orders,
market,
asset_id,
)
if not response or not retry_manager.result:
self._log.error(f"Failed to cancel market orders: {retry_manager.message}")
else:
canceled = response.get("canceled", [])
not_canceled = response.get("not_canceled", {})
self._log.info(
f"Cancel market orders result: {len(canceled)} canceled, "
f"{len(not_canceled)} not canceled",
)
for order_id, reason in not_canceled.items():
self._log.warning(f"Order {order_id} not canceled: {reason}")
finally:
await self._retry_manager_pool.release(retry_manager)

async def _submit_order(self, command: SubmitOrder) -> None:
await self._maintain_active_market(command.instrument_id)

Expand All @@ -979,17 +1067,18 @@ async def _submit_order(self, command: SubmitOrder) -> None:
)
return

if order.is_post_only:
# post_only orders only supported with GTC or GTD time_in_force
if order.is_post_only and order.time_in_force not in (TimeInForce.GTC, TimeInForce.GTD):
self._log.error(
f"Cannot submit order {order.client_order_id}: "
"Post-only orders not supported on Polymarket",
"Post-only orders require GTC or GTD time in force",
LogColor.RED,
)
self.generate_order_denied(
strategy_id=order.strategy_id,
instrument_id=order.instrument_id,
client_order_id=order.client_order_id,
reason="POST_ONLY_NOT_SUPPORTED",
reason="POST_ONLY_REQUIRES_GTC_OR_GTD",
ts_event=self._clock.timestamp_ns(),
)
return
Expand Down Expand Up @@ -1031,6 +1120,222 @@ async def _submit_order(self, command: SubmitOrder) -> None:
ts_event=self._clock.timestamp_ns(),
)

def _validate_order_for_batch(self, order: Order) -> str | None:
"""
Validate an order for batch submission.

Returns None if valid, or an error reason string if invalid.

"""
if order.is_reduce_only:
return "REDUCE_ONLY_NOT_SUPPORTED"

if order.is_post_only and order.time_in_force not in (TimeInForce.GTC, TimeInForce.GTD):
return "POST_ONLY_REQUIRES_GTC_OR_GTD"

if order.time_in_force not in VALID_POLYMARKET_TIME_IN_FORCE:
return "UNSUPPORTED_TIME_IN_FORCE"

if order.order_type != OrderType.LIMIT:
return "BATCH_ONLY_SUPPORTS_LIMIT_ORDERS"

if order.is_quote_quantity:
return "UNSUPPORTED_QUOTE_QUANTITY"

return None

async def _submit_order_list(self, command: SubmitOrderList) -> None:
"""
Submit a batch of orders to Polymarket using the post_orders endpoint.

Parameters
----------
command : SubmitOrderList
The command containing the list of orders to submit.

"""
order_list = command.order_list
orders = order_list.orders

if not orders:
self._log.warning("Order list is empty, nothing to submit")
return

# Filter out closed orders
orders = [order for order in orders if not order.is_closed]
if not orders:
return

# Validate all orders before processing
valid_orders = []
for order in orders:
denial_reason = self._validate_order_for_batch(order)
if denial_reason:
self._log.error(
f"Cannot submit order {order.client_order_id}: {denial_reason}",
LogColor.RED,
)
self.generate_order_denied(
strategy_id=order.strategy_id,
instrument_id=order.instrument_id,
client_order_id=order.client_order_id,
reason=denial_reason,
ts_event=self._clock.timestamp_ns(),
)
continue
valid_orders.append(order)

if not valid_orders:
self._log.warning("No valid orders to submit after validation")
return

self._log.info(f"Submitting batch of {len(valid_orders)} orders to Polymarket")

# Maintain active markets for all orders
for order in valid_orders:
await self._maintain_active_market(order.instrument_id)

# Sign all orders
signed_orders_args = await self._sign_orders_for_batch(valid_orders)

# Generate submitted events for all orders
now_ns = self._clock.timestamp_ns()
for order in valid_orders:
self.generate_order_submitted(
strategy_id=order.strategy_id,
instrument_id=order.instrument_id,
client_order_id=order.client_order_id,
ts_event=now_ns,
)

# Submit batch
await self._post_signed_orders_batch(valid_orders, signed_orders_args)

async def _sign_orders_for_batch(
self,
orders: list[Order],
) -> list[PostOrdersArgs]:
"""
Sign multiple orders for batch submission.
"""
signed_orders_args: list[PostOrdersArgs] = []
signing_start = self._clock.timestamp()

for order in orders:
instrument = self._cache.instrument(order.instrument_id)

order_args = OrderArgs(
price=float(order.price),
token_id=get_polymarket_token_id(order.instrument_id),
size=float(order.quantity),
side=order_side_to_str(order.side),
expiration=int(nanos_to_secs(order.expire_time_ns)),
)

neg_risk = self._get_neg_risk_for_instrument(instrument)
options = PartialCreateOrderOptions(neg_risk=neg_risk)

signed_order = await asyncio.to_thread(
self._http_client.create_order,
order_args,
options=options,
)

order_type = convert_tif_to_polymarket_order_type(order.time_in_force)
signed_orders_args.append(
PostOrdersArgs(
order=signed_order,
orderType=order_type,
postOnly=order.is_post_only,
),
)

interval = self._clock.timestamp() - signing_start
self._log.info(
f"Signed {len(orders)} Polymarket orders in {interval:.3f}s",
LogColor.BLUE,
)

return signed_orders_args

async def _post_signed_orders_batch(
self,
orders: list[Order],
signed_orders_args: list[PostOrdersArgs],
) -> None:
"""
Post a batch of signed orders to Polymarket.
"""
retry_manager = await self._retry_manager_pool.acquire()
try:
client_order_ids = [order.client_order_id for order in orders]
response = await retry_manager.run(
"submit_orders_batch",
client_order_ids,
asyncio.to_thread,
self._http_client.post_orders,
signed_orders_args,
)

if not response:
self._reject_all_orders(orders, str(retry_manager.message))
return

self._process_batch_response(orders, response)

except Exception as e:
self._log.error(f"Error submitting order batch: {e}")
self._reject_all_orders(orders, str(e))
finally:
await self._retry_manager_pool.release(retry_manager)

def _reject_all_orders(self, orders: list[Order], reason: str) -> None:
"""
Generate rejection events for all orders.
"""
for order in orders:
self.generate_order_rejected(
strategy_id=order.strategy_id,
instrument_id=order.instrument_id,
client_order_id=order.client_order_id,
reason=reason,
ts_event=self._clock.timestamp_ns(),
)

def _process_batch_response(self, orders: list[Order], response: list) -> None:
"""
Process the response from a batch order submission.
"""
for i, result in enumerate(response):
order = orders[i]
if result.get("success"):
venue_order_id = VenueOrderId(result["orderID"])
self._cache.add_venue_order_id(order.client_order_id, venue_order_id)

# Signal order event
event = self._ack_events_order.get(venue_order_id)
if event:
event.set()

# Signal trade event
trade_event = self._ack_events_trade.get(venue_order_id)
if trade_event:
trade_event.set()

self._log.debug(
f"Order {order.client_order_id} accepted, venue_order_id={venue_order_id}",
)
else:
reason = result.get("errorMsg", "Unknown error")
self.generate_order_rejected(
strategy_id=order.strategy_id,
instrument_id=order.instrument_id,
client_order_id=order.client_order_id,
reason=reason,
ts_event=self._clock.timestamp_ns(),
)
self._log.warning(f"Order {order.client_order_id} rejected: {reason}")

def _deny_market_order_quantity(self, order: Order, reason: str) -> None:
self._log.error(
f"Cannot submit market order {order.client_order_id}: {reason}",
Expand Down Expand Up @@ -1142,9 +1447,14 @@ async def _submit_limit_order(self, command: SubmitOrder, instrument) -> None:
ts_event=self._clock.timestamp_ns(),
)

await self._post_signed_order(order, signed_order)
await self._post_signed_order(order, signed_order, post_only=order.is_post_only)

async def _post_signed_order(self, order: Order, signed_order) -> None:
async def _post_signed_order(
self,
order: Order,
signed_order,
post_only: bool = False,
) -> None:
retry_manager = await self._retry_manager_pool.acquire()
try:
response: JSON | None = await retry_manager.run(
Expand All @@ -1154,6 +1464,7 @@ async def _post_signed_order(self, order: Order, signed_order) -> None:
self._http_client.post_order,
signed_order,
convert_tif_to_polymarket_order_type(order.time_in_force),
post_only,
)
if not response or not response.get("success"):
self.generate_order_rejected(
Expand Down
Loading
Loading