Skip to content

Commit 8c22aff

Browse files
committed
exit when task done
1 parent dc90dc5 commit 8c22aff

File tree

4 files changed

+36
-14
lines changed

4 files changed

+36
-14
lines changed

core/worker/consumer.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,23 +21,27 @@ def __init__(self, **kwargs):
2121
self.__cookie_file = kwargs.pop('cookie_file')
2222
self.redis_handle = RedisUtils(db=kwargs.pop('redis_db'), tld=kwargs.pop('tld'))
2323

24-
def consume(self):
24+
def consume(self, tspider_context):
2525
if not self.redis_handle.connected:
2626
logger.error('no redis connection found in consumer! exit.')
2727
return
28-
2928
while True:
3029
try:
3130
url = self.redis_handle.fetch_one_task()
3231
logger.info('get task url: %s' % url)
3332
logger.info('%d tasks left' % self.redis_handle.task_counts)
33+
with tspider_context['lock']:
34+
tspider_context['live_spider_counts'].value += 1
3435
self.start_spider(url, self.__cookie_file)
3536
except:
3637
logger.exception('consumer exception!')
3738
if not self.redis_handle.connected:
3839
logger.error('redis disconnected! reconnecting...')
3940
self.redis_handle.connect()
4041
time.sleep(10)
42+
finally:
43+
with tspider_context['lock']:
44+
tspider_context['live_spider_counts'].value -= 1
4145

4246
def start_spider(self, url, cookie_file=None):
4347
results = SpiderPage(url, cookie_file=cookie_file).spider()

core/worker/producer.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ def __init__(self, **kwargs):
3030
self.mongo_handle = None
3131
self.redis_handle = RedisUtils(db=kwargs.pop('redis_db'), tld=kwargs.pop('tld'))
3232

33-
def produce(self):
33+
def produce(self, tspider_context):
3434
# mongodb with multiprocessing must be initialized after fork
3535
self.mongo_handle = MongoUtils(db=self.__mongo_db)
3636
if not self.redis_handle.connected or not self.mongo_handle.connected:
@@ -40,7 +40,8 @@ def produce(self):
4040
while True:
4141
try:
4242
_, req = self.redis_handle.fetch_one_result()
43-
logger.debug('got req, %d results left' % self.redis_handle.result_counts)
43+
remainder_result = self.redis_handle.result_counts
44+
logger.debug('got req, %d results left' % remainder_result)
4445
self.proc_req(req)
4546
except:
4647
logger.exception('produce exception!')
@@ -51,6 +52,12 @@ def produce(self):
5152
logger.error('mongodb disconnected! reconnecting...')
5253
self.mongo_handle.connect()
5354
time.sleep(10)
55+
finally:
56+
if remainder_result == 0 and self.redis_handle.task_counts == 0:
57+
with tspider_context['lock']:
58+
live_spider_counts = tspider_context['live_spider_counts'].value
59+
if live_spider_counts == 0:
60+
tspider_context['task_done'].set()
5461

5562
def proc_req(self, req):
5663
try:

doc/CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
### (2017-04-11)
2+
3+
- 修复自适应协议(//开头的url)导致的异常
4+
- 修复urlparse后用urlunsplit重新组合url导致的错误
5+
- 修复request捕获的url数据未加type字段
6+
- 新增任务扫描完成后退出功能
7+
18
### (2017-04-05)
29

310
- 添加MutationObserver支持

tspider.py

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,17 @@
77
Copyright (c) 2016-2017 [email protected] (http://twi1ight.com/)
88
See the file 'doc/COPYING' for copying permission
99
"""
10+
import os
1011
import sys
1112
import argparse
12-
from multiprocessing import Process
13+
from multiprocessing import Process, Value, Lock, Event
1314

1415
from core.utils.log import logger
1516
from core.utils.redis_utils import RedisUtils
1617
from core.utils.url import URL
1718
from core.worker.consumer import Consumer
1819
from core.worker.producer import Producer
19-
from settings import VERSION, RedisConf, MongoConf
20+
from settings import VERSION, RedisConf, MongoConf, TMPDIR_PATH
2021

2122

2223
def cmdparse():
@@ -52,25 +53,29 @@ def cmdparse():
5253

5354
if __name__ == '__main__':
5455
args = cmdparse()
55-
producer_pool = []
56-
consumer_pool = []
5756
redis_handle = RedisUtils(db=args.redis_db)
5857
if args.keepon:
5958
redis_handle.restore_startup_params(args)
6059
logger.info(args)
6160

61+
for f in os.listdir(TMPDIR_PATH):
62+
os.remove(os.path.join(TMPDIR_PATH, f))
63+
tspider_context = {}
64+
tspider_context['live_spider_counts'] = Value('i', 0)
65+
tspider_context['task_done'] = Event()
66+
tspider_context['lock'] = Lock()
6267
kwargs = {'tld': args.tld, 'cookie_file': args.cookie_file,
6368
'redis_db': args.redis_db, 'mongo_db': args.mongo_db}
6469
for _ in range(args.consumer):
6570
worker = Consumer(**kwargs).consume
66-
proc = Process(name='consumer-%d' % _, target=worker)
71+
proc = Process(name='consumer-%d' % _, target=worker, args=(tspider_context,))
72+
proc.daemon = True
6773
proc.start()
68-
consumer_pool.append(proc)
6974
for _ in range(args.producer):
7075
worker = Producer(**kwargs).produce
71-
proc = Process(name='producer-%d' % _, target=worker)
76+
proc = Process(name='producer-%d' % _, target=worker, args=(tspider_context,))
77+
proc.daemon = True
7278
proc.start()
73-
producer_pool.append(proc)
7479

7580
if not args.keepon:
7681
redis_handle.flushdb()
@@ -88,5 +93,4 @@ def cmdparse():
8893
producer.create_task_from_file(target)
8994

9095
redis_handle.close()
91-
map(lambda x: x.join(), consumer_pool)
92-
map(lambda x: x.join(), producer_pool)
96+
tspider_context['task_done'].wait()

0 commit comments

Comments
 (0)