44from dataclasses import dataclass
55from typing import Any , AsyncIterator , Optional , Union
66
7+ from neomodel ._async_compat .util import AsyncUtil
78from neomodel .async_ import relationship_manager
89from neomodel .async_ .database import adb
910from neomodel .async_ .node import AsyncStructuredNode
@@ -1218,25 +1219,87 @@ async def _execute(self, lazy: bool = False, dict_output: bool = False) -> Any:
12181219 for item in self ._ast .additional_return
12191220 ]
12201221 query = self .build_query ()
1221- results , prop_names = await adb .cypher_query (
1222- query ,
1223- self ._query_params ,
1224- resolve_objects = True ,
1225- )
1226- if dict_output :
1227- for item in results :
1228- yield dict (zip (prop_names , item ))
1229- return
1230- # The following is not as elegant as it could be but had to be copied from the
1231- # version prior to cypher_query with the resolve_objects capability.
1232- # It seems that certain calls are only supposed to be focusing to the first
1233- # result item returned (?)
1234- if results and len (results [0 ]) == 1 :
1235- for n in results :
1236- yield n [0 ]
1222+
1223+ # Use streaming for async code to avoid loading all results into memory
1224+ if AsyncUtil .is_async_code :
1225+ # Helper to process streaming results
1226+ async def process_stream (stream_iterator ):
1227+ first_result = True
1228+ result_has_single_column = False
1229+ async for values , prop_names in stream_iterator :
1230+ if first_result :
1231+ # Determine format on first result
1232+ result_has_single_column = len (values ) == 1
1233+ first_result = False
1234+
1235+ if dict_output :
1236+ yield dict (zip (prop_names , values ))
1237+ elif result_has_single_column :
1238+ yield values [0 ]
1239+ else :
1240+ yield values
1241+
1242+ # Stream results one by one from the database
1243+ if adb ._active_transaction :
1244+ # Use current transaction if active
1245+ stream = adb ._stream_cypher_query (
1246+ adb ._active_transaction ,
1247+ query ,
1248+ self ._query_params ,
1249+ handle_unique = True ,
1250+ resolve_objects = True ,
1251+ )
1252+ async for item in process_stream (stream ):
1253+ yield item
1254+ else :
1255+ # Create a session for streaming
1256+ # Note: We need to keep the session open during iteration
1257+ async with adb .driver .session (
1258+ database = adb ._database_name ,
1259+ impersonated_user = adb .impersonated_user ,
1260+ ) as session :
1261+ stream = adb ._stream_cypher_query (
1262+ session ,
1263+ query ,
1264+ self ._query_params ,
1265+ handle_unique = True ,
1266+ resolve_objects = True ,
1267+ )
1268+ async for item in process_stream (stream ):
1269+ yield item
12371270 else :
1238- for result in results :
1239- yield result
1271+ # Sync code path: use traditional approach (fetch all results)
1272+ results , prop_names = await adb .cypher_query (
1273+ query ,
1274+ self ._query_params ,
1275+ resolve_objects = True ,
1276+ )
1277+ if dict_output :
1278+ for item in results :
1279+ yield dict (zip (prop_names , item ))
1280+ return
1281+ # The following is not as elegant as it could be but had to be copied from the
1282+ # version prior to cypher_query with the resolve_objects capability.
1283+ # It seems that certain calls are only supposed to be focusing to the first
1284+ # result item returned (?)
1285+ if results and len (results [0 ]) == 1 :
1286+ for n in results :
1287+ yield n [0 ]
1288+ else :
1289+ for result in results :
1290+ yield result
1291+
1292+
1293+ @dataclass
1294+ class Path :
1295+ """Path traversal definition."""
1296+
1297+ value : str
1298+ optional : bool = False
1299+ include_nodes_in_return : bool = True
1300+ include_rels_in_return : bool = True
1301+ relation_filtering : bool = False
1302+ alias : str | None = None
12401303
12411304
12421305class AsyncBaseSet :
@@ -1249,6 +1312,10 @@ class AsyncBaseSet:
12491312 query_cls = AsyncQueryBuilder
12501313 source_class : type [AsyncStructuredNode ]
12511314
1315+ # Attributes defined in subclasses (AsyncNodeSet)
1316+ _unique_variables : list [str ]
1317+ relations_to_fetch : list [Path ]
1318+
12521319 async def all (self , lazy : bool = False ) -> list :
12531320 """
12541321 Return all nodes belonging to the set
@@ -1263,6 +1330,16 @@ async def all(self, lazy: bool = False) -> list:
12631330 return results
12641331
12651332 async def __aiter__ (self ) -> AsyncIterator :
1333+ """
1334+ Async iterator that streams results from the database one at a time.
1335+
1336+ This provides true async iteration without loading all results into memory first.
1337+ For large result sets, this is much more memory efficient than using all().
1338+
1339+ Example:
1340+ async for node in Coffee.nodes:
1341+ print(node.name) # Process each node as it arrives
1342+ """
12661343 ast = await self .query_cls (self ).build_ast ()
12671344 async for item in ast ._execute ():
12681345 yield item
@@ -1320,18 +1397,6 @@ async def get_item(self, key: int | slice) -> Optional["AsyncBaseSet"]:
13201397 return _first_item
13211398
13221399
1323- @dataclass
1324- class Path :
1325- """Path traversal definition."""
1326-
1327- value : str
1328- optional : bool = False
1329- include_nodes_in_return : bool = True
1330- include_rels_in_return : bool = True
1331- relation_filtering : bool = False
1332- alias : str | None = None
1333-
1334-
13351400@dataclass
13361401class RelationNameResolver :
13371402 """Helper to refer to a relation variable name.
0 commit comments