Skip to content

Commit 9fca458

Browse files
committed
feat: add optional multi-key priority support for RedisSpider
- redis_key can now be a str (unchanged behavior) or a list[str].
- List mode: keys are priority-ordered (index 0 = highest).
- Drains the current queue for throughput.
- Optional redis_key_check_interval enables periodic preemption.
- Idle detection considers all configured keys.
- Keys are templated, deduplicated, and validated.
- Backward compatible: single-key behavior is unchanged.

Closes #310
1 parent f90f76e commit 9fca458

3 files changed

Lines changed: 393 additions & 29 deletions

File tree

src/scrapy_redis/defaults.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,5 @@
2626
START_URLS_KEY = "%(name)s:start_urls"
2727
START_URLS_AS_SET = False
2828
START_URLS_AS_ZSET = False
29+
REDIS_KEY_CHECK_INTERVAL = None
2930
MAX_IDLE_TIME = 0

src/scrapy_redis/spiders.py

Lines changed: 118 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,16 @@ class RedisMixin:
1919
redis_key = None
2020
redis_batch_size = None
2121
redis_encoding = None
22+
redis_key_check_interval = None
2223

2324
# Redis client placeholder.
2425
server = None
2526

2627
# Idle start time
2728
spider_idle_start_time = int(time.time())
2829
max_idle_time = None
30+
_redis_keys = None
31+
_last_priority_scan = 0
2932

3033
def start_requests(self):
3134
"""Returns a batch of start requests from redis."""
@@ -56,10 +59,7 @@ def setup_redis(self, crawler=None):
5659
defaults.START_URLS_KEY,
5760
)
5861

59-
self.redis_key = self.redis_key % {"name": self.name}
60-
61-
if not self.redis_key.strip():
62-
raise ValueError("redis_key must not be empty")
62+
self.redis_key, self._redis_keys = self._normalize_redis_key(self.redis_key)
6363

6464
if self.redis_batch_size is None:
6565
self.redis_batch_size = settings.getint(
@@ -76,12 +76,6 @@ def setup_redis(self, crawler=None):
7676
"REDIS_ENCODING", defaults.REDIS_ENCODING
7777
)
7878

79-
self.logger.info(
80-
"Reading start URLs from redis key '%(redis_key)s' "
81-
"(batch size: %(redis_batch_size)s, encoding: %(redis_encoding)s)",
82-
self.__dict__,
83-
)
84-
8579
self.server = connection.from_settings(crawler.settings)
8680

8781
if settings.getbool("REDIS_START_URLS_AS_SET", defaults.START_URLS_AS_SET):
@@ -94,6 +88,38 @@ def setup_redis(self, crawler=None):
9488
self.fetch_data = self.pop_list_queue
9589
self.count_size = self.server.llen
9690

91+
if self.redis_key_check_interval is None:
92+
self.redis_key_check_interval = settings.get(
93+
"REDIS_KEY_CHECK_INTERVAL", defaults.REDIS_KEY_CHECK_INTERVAL
94+
)
95+
96+
if self.redis_key_check_interval in (None, 0, "0"):
97+
self.redis_key_check_interval = 0
98+
else:
99+
try:
100+
self.redis_key_check_interval = int(self.redis_key_check_interval)
101+
except (TypeError, ValueError):
102+
raise ValueError("redis_key_check_interval must be an integer or None")
103+
104+
self._last_priority_scan = 0
105+
106+
if self._redis_keys:
107+
self.logger.info(
108+
"Reading start URLs from redis keys '%(redis_keys)s' "
109+
"(batch size: %(redis_batch_size)s, encoding: %(redis_encoding)s)",
110+
{
111+
"redis_keys": self._redis_keys,
112+
"redis_batch_size": self.redis_batch_size,
113+
"redis_encoding": self.redis_encoding,
114+
},
115+
)
116+
else:
117+
self.logger.info(
118+
"Reading start URLs from redis key '%(redis_key)s' "
119+
"(batch size: %(redis_batch_size)s, encoding: %(redis_encoding)s)",
120+
self.__dict__,
121+
)
122+
97123
if self.max_idle_time is None:
98124
self.max_idle_time = settings.get(
99125
"MAX_IDLE_TIME_BEFORE_CLOSE", defaults.MAX_IDLE_TIME
@@ -108,6 +134,83 @@ def setup_redis(self, crawler=None):
108134
# that's when we will schedule new requests from redis queue
109135
crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)
110136

137+
def _normalize_redis_key(self, redis_key):
    """Normalize ``redis_key`` into an active key and an optional key list.

    Every key is %-formatted with ``{"name": self.name}``.

    :param redis_key: a single key string, or a list/tuple of keys ordered
        from highest priority (index 0) to lowest.
    :returns: ``(active_key, keys)``. For a single string, ``keys`` is
        ``None``. For a list/tuple, ``keys`` is the formatted, stripped and
        de-duplicated list (first occurrence wins) and ``active_key`` is its
        first element. Any other value is returned untouched with ``None``.
    :raises ValueError: if a key is blank after formatting, or if the
        list/tuple yields no keys.
    """
    template_args = {"name": self.name}

    if isinstance(redis_key, str):
        redis_key = redis_key % template_args
        if not redis_key.strip():
            raise ValueError("redis_key must not be empty")
        # Single-key mode keeps the formatted key as-is (not stripped).
        return redis_key, None

    # Tuples are accepted alongside lists; otherwise a tuple would fall
    # through unvalidated and end up being used as one Redis key.
    if isinstance(redis_key, (list, tuple)):
        redis_keys = []
        seen = set()
        for key in redis_key:
            stripped_key = (key % template_args).strip()
            if not stripped_key:
                raise ValueError("redis_key must not be empty")
            # Keep only the first occurrence so priority order is stable.
            if stripped_key not in seen:
                seen.add(stripped_key)
                redis_keys.append(stripped_key)

        if not redis_keys:
            raise ValueError("redis_key list is empty after normalization")

        return redis_keys[0], redis_keys

    # Other types are passed through unchanged, as before.
    return redis_key, None
162+
163+
def _is_multi_key_mode(self):
    """Tell whether a non-empty list of prioritized keys is configured."""
    return True if self._redis_keys else False
165+
166+
def _any_key_has_items(self):
    """Return True if at least one configured key reports a size above zero.

    In single-key mode only ``self.redis_key`` is checked. In multi-key mode
    the keys in ``self._redis_keys`` are checked in order and the check stops
    at the first non-empty one, so no further size queries are issued.
    """
    if not self._is_multi_key_mode():
        return self.count_size(self.redis_key) > 0
    # any() short-circuits; summing every size would query all keys.
    return any(self.count_size(key) > 0 for key in self._redis_keys)
170+
171+
def _select_priority_key(self):
    """Pick the highest-priority key that currently has items.

    Without a key list this simply yields ``self.redis_key``. With one, the
    keys are probed in list order and the first whose size is above zero is
    returned; ``None`` means every listed key is empty.
    """
    prioritized = self._redis_keys
    if prioritized:
        non_empty = (name for name in prioritized if self.count_size(name) > 0)
        return next(non_empty, None)
    return self.redis_key
179+
180+
def _maybe_switch_to_higher_priority_key(self):
    """Switch ``self.redis_key`` to a non-empty key of strictly higher priority.

    Does nothing in single-key mode, or when no key ranked above the current
    one has items. Only keys that precede the current key in
    ``self._redis_keys`` are probed, because a key at or below the current
    priority can never cause a switch.
    """
    if not self._redis_keys:
        return

    try:
        current_index = self._redis_keys.index(self.redis_key)
    except ValueError:
        # The active key is not in the list: every listed key outranks it.
        current_index = len(self._redis_keys)

    for key in self._redis_keys[:current_index]:
        if self.count_size(key) > 0:
            self.redis_key = key
            return
196+
197+
def _maybe_check_priority_scan(self):
    """Refresh the active key in multi-key mode.

    Two things can happen, in this order:

    1. If the active key is empty, fall back to the first non-empty key
       returned by ``_select_priority_key`` (the active key is kept when
       every key is empty).
    2. If ``redis_key_check_interval`` is set and at least that many seconds
       have passed since ``_last_priority_scan``, look for a higher-priority
       key and record the scan time.

    Single-key mode returns immediately.
    """
    if not self._redis_keys:
        return

    scanned_all_keys = False
    if self.count_size(self.redis_key) == 0:
        selected_key = self._select_priority_key()
        if selected_key is not None:
            self.redis_key = selected_key
        scanned_all_keys = True

    if not self.redis_key_check_interval:
        return

    current_time = time.time()
    if current_time - self._last_priority_scan >= self.redis_key_check_interval:
        # The fallback above already probed the keys in priority order, so a
        # second scan in the same call would only repeat the size queries.
        if not scanned_all_keys:
            self._maybe_switch_to_higher_priority_key()
        self._last_priority_scan = current_time
213+
111214
def pop_list_queue(self, redis_key, batch_size):
112215
with self.server.pipeline() as pipe:
113216
pipe.lrange(redis_key, 0, batch_size - 1)
@@ -125,6 +228,7 @@ def pop_priority_queue(self, redis_key, batch_size):
125228
def next_requests(self):
126229
"""Returns a request to be scheduled or none."""
127230
# XXX: Do we need to use a timeout here?
231+
self._maybe_check_priority_scan()
128232
found = 0
129233
datas = self.fetch_data(self.redis_key, self.redis_batch_size)
130234
for data in datas:
@@ -208,6 +312,7 @@ def make_request_from_data(self, data):
208312
def schedule_next_requests(self):
209313
"""Schedules a request if available"""
210314
# TODO: While there is capacity, schedule a batch of redis requests.
315+
self._maybe_check_priority_scan()
211316
for req in self.next_requests():
212317
# see https://github.com/scrapy/scrapy/issues/5994
213318
if scrapy_version >= (2, 6):
@@ -221,7 +326,9 @@ def spider_idle(self):
221326
or close spider when waiting seconds > MAX_IDLE_TIME_BEFORE_CLOSE.
222327
MAX_IDLE_TIME_BEFORE_CLOSE will not affect SCHEDULER_IDLE_BEFORE_CLOSE.
223328
"""
224-
if self.server is not None and self.count_size(self.redis_key) > 0:
329+
self._maybe_check_priority_scan()
330+
331+
if self.server is not None and self._any_key_has_items():
225332
self.spider_idle_start_time = int(time.time())
226333

227334
self.schedule_next_requests()

0 commit comments

Comments
 (0)