1
1
import asyncio
2
- from abc import abstractmethod
2
+ from abc import ABC , abstractmethod
3
3
from concurrent .futures import ThreadPoolExecutor
4
4
from logging import getLogger
5
5
from typing import Any , AsyncGenerator , Dict , Optional
@@ -34,7 +34,7 @@ async def transform_record(self, record: Any) -> Any:
34
34
raise NotImplementedError
35
35
36
36
37
- class ConcurrentTransformer (Transformer ):
37
+ class ConcurrentTransformer (Transformer , ABC ):
38
38
"""A `ConcurrentTransformer` can process multiple records concurrently.
39
39
40
40
`ConcurrentTransformer`s are useful when the transformation work is IO
@@ -60,7 +60,7 @@ def __init__(
60
60
self .pending_tasks = []
61
61
62
62
def on_worker_thread_start (self ):
63
- pass
63
+ pass # do nothing
64
64
65
65
async def drain_completed_tasks (self ):
66
66
tasks_drained = 0
@@ -107,7 +107,7 @@ def do_work_on_record(self, record: Any) -> Any:
107
107
async def yield_processor (self ):
108
108
await asyncio .sleep (0 )
109
109
110
- async def emit_outstanding_records (self ):
110
+ async def emit_outstanding_records (self , context : StepContext ):
111
111
while self .pending_tasks :
112
112
async for result in self .drain_completed_tasks ():
113
113
yield result
0 commit comments