44from dataclasses import dataclass
55from typing import Any , AsyncIterator , Optional , Union
66
7+ from neomodel ._async_compat .util import AsyncUtil
78from neomodel .async_ import relationship_manager
89from neomodel .async_ .database import adb
910from neomodel .async_ .node import AsyncStructuredNode
@@ -1198,25 +1199,77 @@ async def _execute(self, lazy: bool = False, dict_output: bool = False) -> Any:
11981199 for item in self ._ast .additional_return
11991200 ]
12001201 query = self .build_query ()
1201- results , prop_names = await adb .cypher_query (
1202- query ,
1203- self ._query_params ,
1204- resolve_objects = True ,
1205- )
1206- if dict_output :
1207- for item in results :
1208- yield dict (zip (prop_names , item ))
1209- return
1210- # The following is not as elegant as it could be but had to be copied from the
1211- # version prior to cypher_query with the resolve_objects capability.
1212- # It seems that certain calls are only supposed to be focusing to the first
1213- # result item returned (?)
1214- if results and len (results [0 ]) == 1 :
1215- for n in results :
1216- yield n [0 ]
1202+
1203+ # Use streaming for async code to avoid loading all results into memory
1204+ if AsyncUtil .is_async_code :
1205+ # Helper to process streaming results
1206+ async def process_stream (stream_iterator ):
1207+ first_result = True
1208+ result_has_single_column = False
1209+ async for values , prop_names in stream_iterator :
1210+ if first_result :
1211+ # Determine format on first result
1212+ result_has_single_column = len (values ) == 1
1213+ first_result = False
1214+
1215+ if dict_output :
1216+ yield dict (zip (prop_names , values ))
1217+ elif result_has_single_column :
1218+ yield values [0 ]
1219+ else :
1220+ yield values
1221+
1222+ # Stream results one by one from the database
1223+ if adb ._active_transaction :
1224+ # Use current transaction if active
1225+ stream = adb ._stream_cypher_query (
1226+ adb ._active_transaction ,
1227+ query ,
1228+ self ._query_params ,
1229+ handle_unique = True ,
1230+ resolve_objects = True ,
1231+ )
1232+ async for item in process_stream (stream ):
1233+ yield item
1234+ return
1235+ else :
1236+ # Create a session for streaming
1237+ # Note: We need to keep the session open during iteration
1238+ async with adb .driver .session (
1239+ database = adb ._database_name ,
1240+ impersonated_user = adb .impersonated_user ,
1241+ ) as session :
1242+ stream = adb ._stream_cypher_query (
1243+ session ,
1244+ query ,
1245+ self ._query_params ,
1246+ handle_unique = True ,
1247+ resolve_objects = True ,
1248+ )
1249+ async for item in process_stream (stream ):
1250+ yield item
1251+ return
12171252 else :
1218- for result in results :
1219- yield result
1253+ # Sync code path: use traditional approach (fetch all results)
1254+ results , prop_names = await adb .cypher_query (
1255+ query ,
1256+ self ._query_params ,
1257+ resolve_objects = True ,
1258+ )
1259+ if dict_output :
1260+ for item in results :
1261+ yield dict (zip (prop_names , item ))
1262+ return
1263+ # The following is not as elegant as it could be but had to be copied from the
1264+ # version prior to cypher_query with the resolve_objects capability.
1265+ # It seems that certain calls are only supposed to be focusing to the first
1266+ # result item returned (?)
1267+ if results and len (results [0 ]) == 1 :
1268+ for n in results :
1269+ yield n [0 ]
1270+ else :
1271+ for result in results :
1272+ yield result
12201273
12211274
12221275class AsyncBaseSet :
@@ -1243,6 +1296,16 @@ async def all(self, lazy: bool = False) -> list:
12431296 return results
12441297
12451298 async def __aiter__ (self ) -> AsyncIterator :
1299+ """
1300+ Async iterator that streams results from the database one at a time.
1301+
1302+ This provides true async iteration without loading all results into memory first.
1303+ For large result sets, this is much more memory efficient than using all().
1304+
1305+ Example:
1306+ async for node in Coffee.nodes:
1307+ print(node.name) # Process each node as it arrives
1308+ """
12461309 ast = await self .query_cls (self ).build_ast ()
12471310 async for item in ast ._execute ():
12481311 yield item
0 commit comments