44import asyncio
55import json
66import logging
7+ import multiprocessing
78import os
89import re
910import sys
@@ -1868,6 +1869,18 @@ def _process_run_crawlers(crawler_inputs: list[_CrawlerInput], storage_dir: str)
18681869 ]
18691870
18701871
def _run_in_subprocess(func: Any, *args: Any, **kwargs: Any) -> Any:
    """Execute *func* in a freshly spawned child process and hand back its return value.

    A dedicated single-worker ``ProcessPoolExecutor`` is created for every call and shut down before
    returning, so consecutive calls never share a worker process.

    The ``spawn`` start method is requested explicitly. With ``fork`` the child would inherit the parent's
    tokio runtime thread state (created by ``pyo3-async-runtimes``), which causes the child to hang on
    exit on Linux. See pyo3-async-runtimes#40 / #64.

    Args:
        func: Callable to run in the child process.
        *args: Positional arguments forwarded to *func*.
        **kwargs: Keyword arguments forwarded to *func*.

    Returns:
        Whatever *func* returned in the child process.
    """
    spawn_context = multiprocessing.get_context('spawn')
    pool = ProcessPoolExecutor(max_workers=1, mp_context=spawn_context)
    with pool:
        # `result()` blocks until the child finishes and re-raises any exception raised by *func*.
        pending = pool.submit(func, *args, **kwargs)
        return pending.result()
1882+
1883+
18711884async def test_crawler_state_persistence (tmp_path : Path ) -> None :
18721885 """Test that crawler statistics and state persist and are loaded correctly.
18731886
@@ -1877,37 +1890,34 @@ async def test_crawler_state_persistence(tmp_path: Path) -> None:
18771890 storage_client = FileSystemStorageClient (), configuration = Configuration (storage_dir = str (tmp_path ))
18781891 )
18791892
1880- with ProcessPoolExecutor () as executor :
1881- # Crawl 2 requests in the first run and automatically persist the state.
1882- first_run_state = executor .submit (
1883- _process_run_crawlers ,
1884- crawler_inputs = [_CrawlerInput (requests = ['https://a.placeholder.com' , 'https://b.placeholder.com' ])],
1885- storage_dir = str (tmp_path ),
1886- ).result ()[0 ]
1887- # Expected state after first crawler run
1888- assert first_run_state .requests_finished == 2
1889- state = await state_kvs .get_value (f'{ BasicCrawler ._CRAWLEE_STATE_KEY } _0' )
1890- assert state .get ('urls' ) == ['https://a.placeholder.com' , 'https://b.placeholder.com' ]
1891-
1892- # Do not reuse the executor to simulate a fresh process to avoid modified class attributes.
1893- with ProcessPoolExecutor () as executor :
1894- # Crawl 1 additional requests in the second run, but use previously automatically persisted state.
1895- second_run_state = executor .submit (
1896- _process_run_crawlers ,
1897- crawler_inputs = [_CrawlerInput (requests = ['https://c.placeholder.com' ])],
1898- storage_dir = str (tmp_path ),
1899- ).result ()[0 ]
1900-
1901- # Expected state after second crawler run
1902- # 2 requests from first run and 1 request from second run.
1903- assert second_run_state .requests_finished == 3
1904-
1905- state = await state_kvs .get_value (f'{ BasicCrawler ._CRAWLEE_STATE_KEY } _0' )
1906- assert state .get ('urls' ) == [
1907- 'https://a.placeholder.com' ,
1908- 'https://b.placeholder.com' ,
1909- 'https://c.placeholder.com' ,
1910- ]
1893+ # Crawl 2 requests in the first run and automatically persist the state.
1894+ first_run_state = _run_in_subprocess (
1895+ _process_run_crawlers ,
1896+ crawler_inputs = [_CrawlerInput (requests = ['https://a.placeholder.com' , 'https://b.placeholder.com' ])],
1897+ storage_dir = str (tmp_path ),
1898+ )[0 ]
1899+ # Expected state after first crawler run
1900+ assert first_run_state .requests_finished == 2
1901+ state = await state_kvs .get_value (f'{ BasicCrawler ._CRAWLEE_STATE_KEY } _0' )
1902+ assert state .get ('urls' ) == ['https://a.placeholder.com' , 'https://b.placeholder.com' ]
1903+
1904+ # Crawl 1 additional request in the second run, but use previously automatically persisted state.
1905+ second_run_state = _run_in_subprocess (
1906+ _process_run_crawlers ,
1907+ crawler_inputs = [_CrawlerInput (requests = ['https://c.placeholder.com' ])],
1908+ storage_dir = str (tmp_path ),
1909+ )[0 ]
1910+
1911+ # Expected state after second crawler run
1912+ # 2 requests from first run and 1 request from second run.
1913+ assert second_run_state .requests_finished == 3
1914+
1915+ state = await state_kvs .get_value (f'{ BasicCrawler ._CRAWLEE_STATE_KEY } _0' )
1916+ assert state .get ('urls' ) == [
1917+ 'https://a.placeholder.com' ,
1918+ 'https://b.placeholder.com' ,
1919+ 'https://c.placeholder.com' ,
1920+ ]
19111921
19121922 assert first_run_state .crawler_started_at == second_run_state .crawler_started_at
19131923 assert first_run_state .crawler_finished_at
@@ -1926,41 +1936,39 @@ async def test_crawler_state_persistence_2_crawlers_with_migration(tmp_path: Pat
19261936 storage_client = FileSystemStorageClient (), configuration = Configuration (storage_dir = str (tmp_path ))
19271937 )
19281938
1929- with ProcessPoolExecutor () as executor :
1930- # Run 2 crawler, each crawl 1 request in and automatically persist the state.
1931- first_run_states = executor .submit (
1932- _process_run_crawlers ,
1933- crawler_inputs = [
1934- _CrawlerInput (requests = ['https://a.placeholder.com' ]),
1935- _CrawlerInput (requests = ['https://c.placeholder.com' ]),
1936- ],
1937- storage_dir = str (tmp_path ),
1938- ).result ()
1939- # Expected state after first crawler run
1940- assert first_run_states [0 ].requests_finished == 1
1941- assert first_run_states [1 ].requests_finished == 1
1942- state_0 = await state_kvs .get_value (f'{ BasicCrawler ._CRAWLEE_STATE_KEY } _0' )
1943- assert state_0 .get ('urls' ) == ['https://a.placeholder.com' ]
1944- state_1 = await state_kvs .get_value (f'{ BasicCrawler ._CRAWLEE_STATE_KEY } _1' )
1945- assert state_1 .get ('urls' ) == ['https://c.placeholder.com' ]
1946-
1947- with ProcessPoolExecutor () as executor :
1948- # Run 2 crawler, each crawl 1 request in and automatically persist the state.
1949- second_run_states = executor .submit (
1950- _process_run_crawlers ,
1951- crawler_inputs = [
1952- _CrawlerInput (requests = ['https://b.placeholder.com' ]),
1953- _CrawlerInput (requests = ['https://d.placeholder.com' ]),
1954- ],
1955- storage_dir = str (tmp_path ),
1956- ).result ()
1957- # Expected state after first crawler run
1958- assert second_run_states [0 ].requests_finished == 2
1959- assert second_run_states [1 ].requests_finished == 2
1960- state_0 = await state_kvs .get_value (f'{ BasicCrawler ._CRAWLEE_STATE_KEY } _0' )
1961- assert state_0 .get ('urls' ) == ['https://a.placeholder.com' , 'https://b.placeholder.com' ]
1962- state_1 = await state_kvs .get_value (f'{ BasicCrawler ._CRAWLEE_STATE_KEY } _1' )
1963- assert state_1 .get ('urls' ) == ['https://c.placeholder.com' , 'https://d.placeholder.com' ]
1939+ # Run 2 crawlers, each crawl 1 request and automatically persist the state.
1940+ first_run_states = _run_in_subprocess (
1941+ _process_run_crawlers ,
1942+ crawler_inputs = [
1943+ _CrawlerInput (requests = ['https://a.placeholder.com' ]),
1944+ _CrawlerInput (requests = ['https://c.placeholder.com' ]),
1945+ ],
1946+ storage_dir = str (tmp_path ),
1947+ )
1948+ # Expected state after first crawler run
1949+ assert first_run_states [0 ].requests_finished == 1
1950+ assert first_run_states [1 ].requests_finished == 1
1951+ state_0 = await state_kvs .get_value (f'{ BasicCrawler ._CRAWLEE_STATE_KEY } _0' )
1952+ assert state_0 .get ('urls' ) == ['https://a.placeholder.com' ]
1953+ state_1 = await state_kvs .get_value (f'{ BasicCrawler ._CRAWLEE_STATE_KEY } _1' )
1954+ assert state_1 .get ('urls' ) == ['https://c.placeholder.com' ]
1955+
1956+ # Run 2 crawlers again, each crawl 1 more request.
1957+ second_run_states = _run_in_subprocess (
1958+ _process_run_crawlers ,
1959+ crawler_inputs = [
1960+ _CrawlerInput (requests = ['https://b.placeholder.com' ]),
1961+ _CrawlerInput (requests = ['https://d.placeholder.com' ]),
1962+ ],
1963+ storage_dir = str (tmp_path ),
1964+ )
1965+ # Expected state after second crawler run
1966+ assert second_run_states [0 ].requests_finished == 2
1967+ assert second_run_states [1 ].requests_finished == 2
1968+ state_0 = await state_kvs .get_value (f'{ BasicCrawler ._CRAWLEE_STATE_KEY } _0' )
1969+ assert state_0 .get ('urls' ) == ['https://a.placeholder.com' , 'https://b.placeholder.com' ]
1970+ state_1 = await state_kvs .get_value (f'{ BasicCrawler ._CRAWLEE_STATE_KEY } _1' )
1971+ assert state_1 .get ('urls' ) == ['https://c.placeholder.com' , 'https://d.placeholder.com' ]
19641972
19651973
19661974async def test_crawler_intermediate_statistics () -> None :
0 commit comments