Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create starlette subscriptions example #7

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
Empty file.
24 changes: 24 additions & 0 deletions startlette-subscriptions/api/defintions/job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from enum import Enum
import strawberry

from jobs import JobModel


@strawberry.enum
class JobStatus(Enum):
PENDING = "pending"
SUCCESS = "success"
FAILURE = "FAILURE"


@strawberry.type
class Job:
id: str
status: JobStatus

@classmethod
def from_instance(cls, instance: JobModel):
return cls(
id=instance.id,
status=JobStatus(instance.status),
)
12 changes: 12 additions & 0 deletions startlette-subscriptions/api/mutation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from strawberry.tools import create_type

from .mutations.create_job import create_job
from .mutations.update_job_status import update_job_status

Mutation = create_type(
"Mutation",
[
create_job,
update_job_status,
],
)
Empty file.
11 changes: 11 additions & 0 deletions startlette-subscriptions/api/mutations/create_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import strawberry


from jobs import controller
from ..defintions.job import Job


@strawberry.mutation
def create_job() -> Job:
model = controller.create_job()
return Job.from_instance(model)
11 changes: 11 additions & 0 deletions startlette-subscriptions/api/mutations/update_job_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import strawberry


from jobs import controller
from ..defintions.job import Job, JobStatus


@strawberry.mutation
def update_job_status(id: str, status: JobStatus) -> Job:
model = controller.update_status(id, status.value)
return Job.from_instance(model)
21 changes: 21 additions & 0 deletions startlette-subscriptions/api/schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import strawberry

from jobs import controller

from .mutation import Mutation
from .subscription import Subscription
from .defintions.job import Job


@strawberry.type
class Query:
@strawberry.field
def ping(self) -> str:
return "pong"

@strawberry.field
def get_job_by_id(self, id: str) -> Job:
return Job.from_instance(controller.get_job(id))


schema = strawberry.Schema(Query, mutation=Mutation, subscription=Subscription)
12 changes: 12 additions & 0 deletions startlette-subscriptions/api/subscription.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from strawberry.tools import create_type

from .subscriptions.latest_crypto_price import latest_crypto_price
from .subscriptions.get_job_status import get_job_status

Subscription = create_type(
"Subscription",
[
latest_crypto_price,
get_job_status,
],
)
Empty file.
24 changes: 24 additions & 0 deletions startlette-subscriptions/api/subscriptions/get_job_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from typing import AsyncGenerator

import strawberry


from jobs import controller
from jobs.subscription_hub import hub

from ..defintions.job import Job


@strawberry.subscription
async def get_job_status(id: str) -> AsyncGenerator[Job, None]:
# Look up job id
job = controller.get_job(id)

yield Job.from_instance(job)

try:
queue = hub.subscribe_to_job(id)
while (val := await queue.get()) is not None:
yield Job.from_instance(val)
finally:
hub.unsubscribe(id, queue)
45 changes: 45 additions & 0 deletions startlette-subscriptions/api/subscriptions/latest_crypto_price.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from typing import AsyncGenerator
import json
from datetime import datetime

import strawberry
import websockets


@strawberry.type
class Ticker:
pair: str
best_ask_price: str
best_bid_price: str
close_price: str
timestamp: datetime


@strawberry.subscription
async def latest_crypto_price(pair: str = "ETH/EUR") -> AsyncGenerator[Ticker, None]:
async with websockets.connect("wss://ws.kraken.com") as websocket:
await websocket.send(
json.dumps(
{
"event": "subscribe",
"pair": [pair],
"subscription": {"name": "ticker"},
}
)
)

while True:
data = await websocket.recv()
data = json.loads(data)
if "event" in data:
if "status" in data and data["status"] == "error":
raise Exception(data["errorMessage"])

if isinstance(data, list):
yield Ticker(
pair=pair,
best_ask_price=data[1]["a"][0],
best_bid_price=data[1]["b"][0],
close_price=data[1]["c"][0],
timestamp=datetime.now(),
)
12 changes: 12 additions & 0 deletions startlette-subscriptions/jobs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from typing import Dict
from dataclasses import dataclass


@dataclass
class JobModel:
id: str
status: str


# Map of created jobs
JOBS: Dict[str, JobModel] = {}
28 changes: 28 additions & 0 deletions startlette-subscriptions/jobs/controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import uuid


from . import JOBS, JobModel
from .subscription_hub import hub


def get_job(id: str) -> JobModel:
if id not in JOBS:
raise Exception(f"Can't find job: {id}")

return JOBS[id]


def create_job() -> JobModel:
id = str(uuid.uuid4())
model = JobModel(id=id, status="pending")
JOBS[id] = model
return model


def update_status(id: str, status: str) -> JobModel:
job = get_job(id)
job.status = status

hub.update_job(id, job)

return job
30 changes: 30 additions & 0 deletions startlette-subscriptions/jobs/subscription_hub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from collections import defaultdict
from asyncio import Queue


class SubscriptionHub:
def __init__(self):
self.subscriptions = defaultdict(list)

def subscribe_to_job(self, job_id):
queue = Queue()

self.subscriptions[job_id].append(queue)

return queue

def update_job(self, job_id, job):
# No one is listening to the this job
if job_id not in self.subscriptions:
return

queues = self.subscriptions[job_id]
for queue in queues:
queue.put_nowait(job)

def unsubscribe(self, job_id, queue):
queues = self.subscriptions[job_id]
queues.remove(queue)


hub = SubscriptionHub()
5 changes: 5 additions & 0 deletions startlette-subscriptions/main/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from strawberry.asgi import GraphQL

from api.schema import schema

app = GraphQL(schema)
Loading