Skip to content

Commit e3a2930

Browse files
authored
fix onedrive connector (#362)
1 parent 7c0cffa commit e3a2930

File tree

4 files changed

+14
-33
lines changed

4 files changed

+14
-33
lines changed

CHANGELOG.md

+6
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
## 0.4.4
2+
3+
### Fixes
4+
5+
* **Fix AsyncIO support for OneDrive connector**
6+
17
## 0.4.3
28

39
### Enhancements

unstructured_ingest/__version__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "0.4.3" # pragma: no cover
1+
__version__ = "0.4.4" # pragma: no cover

unstructured_ingest/v2/interfaces/indexer.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from abc import ABC, abstractmethod
1+
from abc import ABC
22
from typing import Any, AsyncGenerator, Generator, Optional, TypeVar
33

44
from pydantic import BaseModel
@@ -22,9 +22,8 @@ class Indexer(BaseProcess, BaseConnector, ABC):
2222
def is_async(self) -> bool:
2323
return False
2424

25-
@abstractmethod
2625
def run(self, **kwargs: Any) -> Generator[FileData, None, None]:
27-
pass
26+
raise NotImplementedError()
2827

2928
async def run_async(self, **kwargs: Any) -> AsyncGenerator[FileData, None]:
3029
raise NotImplementedError()

unstructured_ingest/v2/processes/connectors/onedrive.py

+5-29
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from dataclasses import dataclass
66
from pathlib import Path
77
from time import time
8-
from typing import TYPE_CHECKING, Any, AsyncIterator, Generator, Iterator, Optional, TypeVar
8+
from typing import TYPE_CHECKING, Any, AsyncIterator, Optional
99

1010
from dateutil import parser
1111
from pydantic import Field, Secret
@@ -101,27 +101,6 @@ class OnedriveIndexerConfig(IndexerConfig):
101101
recursive: bool = False
102102

103103

104-
T = TypeVar("T")
105-
106-
107-
def async_iterable_to_sync_iterable(iterator: AsyncIterator[T]) -> Iterator[T]:
108-
# This version works on Python 3.9 by manually handling the async iteration.
109-
loop = asyncio.new_event_loop()
110-
asyncio.set_event_loop(loop)
111-
try:
112-
while True:
113-
try:
114-
# Instead of anext(iterator), we directly call __anext__().
115-
# __anext__ returns a coroutine that we must run until complete.
116-
future = iterator.__anext__()
117-
result = loop.run_until_complete(future)
118-
yield result
119-
except StopAsyncIteration:
120-
break
121-
finally:
122-
loop.close()
123-
124-
125104
@dataclass
126105
class OnedriveIndexer(Indexer):
127106
connection_config: OnedriveConnectionConfig
@@ -215,7 +194,10 @@ async def drive_item_to_file_data(self, drive_item: "DriveItem") -> FileData:
215194
# Offload the file data creation if it's not guaranteed async
216195
return await asyncio.to_thread(self.drive_item_to_file_data_sync, drive_item)
217196

218-
async def _run_async(self, **kwargs: Any) -> AsyncIterator[FileData]:
197+
def is_async(self) -> bool:
198+
return True
199+
200+
async def run_async(self, **kwargs: Any) -> AsyncIterator[FileData]:
219201
token_resp = await asyncio.to_thread(self.connection_config.get_token)
220202
if "error" in token_resp:
221203
raise SourceConnectionError(
@@ -230,12 +212,6 @@ async def _run_async(self, **kwargs: Any) -> AsyncIterator[FileData]:
230212
file_data = await self.drive_item_to_file_data(drive_item=drive_item)
231213
yield file_data
232214

233-
def run(self, **kwargs: Any) -> Generator[FileData, None, None]:
234-
# Convert the async generator to a sync generator without loading all data into memory
235-
async_gen = self._run_async(**kwargs)
236-
for item in async_iterable_to_sync_iterable(async_gen):
237-
yield item
238-
239215

240216
class OnedriveDownloaderConfig(DownloaderConfig):
241217
pass

0 commit comments

Comments
 (0)