Skip to content

Commit 7e10e4e

Browse files
committed
fix exit caused by race condition
1 parent b5e547e commit 7e10e4e

File tree

5 files changed

+37
-49
lines changed

5 files changed

+37
-49
lines changed

core/utils/redis_utils.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ def create_task_from_url(self, url, add_whitelist=True, valid_url_check=True):
153153
:param add_whitelist: for init scan task, disabled in task result produce
154154
:return:
155155
"""
156-
if valid_url_check and not self.valid_task_url(url): return
156+
if valid_url_check and not self.valid_task_url(url): return False
157157

158158
logger.info('add task: %s' % url.urlstring)
159159
self.redis_client.lpush(self.l_url_tasks, url.urlstring)
@@ -163,6 +163,7 @@ def create_task_from_url(self, url, add_whitelist=True, valid_url_check=True):
163163
self.set_url_scanned(url)
164164
# incr req count
165165
self.incr_hostname_reqcount(url.hostname)
166+
return True
166167

167168
def valid_task_url(self, url):
168169
"""

core/worker/consumer.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from core.spider.spider import SpiderPage
1111
from core.utils.log import logger
1212
from core.utils.redis_utils import RedisUtils
13+
from core.utils.url import URL
1314

1415

1516
class Consumer(object):
@@ -18,33 +19,38 @@ def __init__(self, **kwargs):
1819
:return: :class:Consumer object
1920
:rtype: Consumer
2021
"""
22+
self.context = kwargs.pop('context')
2123
self.__cookie_file = kwargs.pop('cookie_file')
2224
self.redis_handle = RedisUtils(db=kwargs.pop('redis_db'), tld=kwargs.pop('tld'))
2325

24-
def consume(self, tspider_context):
26+
def consume(self):
2527
if not self.redis_handle.connected:
2628
logger.error('no redis connection found in consumer! exit.')
2729
return
2830
while True:
2931
try:
3032
url = self.redis_handle.fetch_one_task()
33+
with self.context['lock']:
34+
self.context['live_spider_counts'].value += 1
35+
self.context['task_counts'].value -= 1
3136
logger.info('get task url: %s' % url)
32-
logger.info('%d tasks left' % self.redis_handle.task_counts)
33-
with tspider_context['lock']:
34-
tspider_context['live_spider_counts'].value += 1
35-
self.start_spider(url, self.__cookie_file)
37+
logger.info('%d tasks left' % self.context['task_counts'].value)
38+
if not self.redis_handle.is_blocked(URL(url)):
39+
self.start_spider(url, self.__cookie_file)
3640
except:
3741
logger.exception('consumer exception!')
3842
if not self.redis_handle.connected:
3943
logger.error('redis disconnected! reconnecting...')
4044
self.redis_handle.connect()
4145
time.sleep(10)
4246
finally:
43-
with tspider_context['lock']:
44-
tspider_context['live_spider_counts'].value -= 1
47+
with self.context['lock']:
48+
self.context['live_spider_counts'].value -= 1
4549

4650
def start_spider(self, url, cookie_file=None):
4751
results = SpiderPage(url, cookie_file=cookie_file).spider()
52+
with self.context['lock']:
53+
self.context['result_counts'].value += len(results)
4854
for _ in results:
4955
self.redis_handle.insert_result(_)
5056

core/worker/producer.py

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,12 @@ def __init__(self, **kwargs):
2626
:return: :class:Producer object
2727
:rtype: Producer
2828
"""
29+
self.context = kwargs.pop('context')
2930
self.__mongo_db = kwargs.pop('mongo_db')
3031
self.mongo_handle = None
3132
self.redis_handle = RedisUtils(db=kwargs.pop('redis_db'), tld=kwargs.pop('tld'))
3233

33-
def produce(self, tspider_context):
34+
def produce(self):
3435
# MongoDB with multiprocessing must be initialized after fork
3536
self.mongo_handle = MongoUtils(db=self.__mongo_db)
3637
if not self.redis_handle.connected or not self.mongo_handle.connected:
@@ -40,8 +41,9 @@ def produce(self, tspider_context):
4041
while True:
4142
try:
4243
_, req = self.redis_handle.fetch_one_result()
43-
remainder_result = self.redis_handle.result_counts
44-
logger.debug('got req, %d results left' % remainder_result)
44+
with self.context['lock']:
45+
self.context['result_counts'].value -= 1
46+
logger.debug('got req, %d results left' % self.context['result_counts'].value)
4547
self.proc_req(req)
4648
except:
4749
logger.exception('produce exception!')
@@ -53,11 +55,10 @@ def produce(self, tspider_context):
5355
self.mongo_handle.connect()
5456
time.sleep(10)
5557
finally:
56-
if remainder_result == 0 and self.redis_handle.task_counts == 0:
57-
with tspider_context['lock']:
58-
live_spider_counts = tspider_context['live_spider_counts'].value
59-
if live_spider_counts == 0:
60-
tspider_context['task_done'].set()
58+
with self.context['lock']:
59+
if self.context['result_counts'].value == 0:
60+
if self.context['live_spider_counts'].value == 0 and self.context['task_counts'].value == 0:
61+
self.context['task_done'].set()
6162

6263
def proc_req(self, req):
6364
try:
@@ -99,16 +100,18 @@ def proc_req(self, req):
99100
elif method == 'GET':
100101
# new host found, add index page to task queue
101102
if self.redis_handle.get_hostname_reqcount(url.hostname) == 0:
102-
self.redis_handle.create_task_from_url(URL(url.index_page), add_whitelist=False)
103+
self.create_task_from_url(URL(url.index_page), add_whitelist=False)
103104
# URL validation is checked inside create_task_from_url
104-
self.redis_handle.create_task_from_url(url, add_whitelist=False)
105+
self.create_task_from_url(url, add_whitelist=False)
105106
else:
106107
# not GET nor POST
107108
logger.error('HTTP Verb %s found!' % method)
108109
logger.debug(data)
109110

110-
def create_task_from_url(self, url):
111-
self.redis_handle.create_task_from_url(url)
111+
def create_task_from_url(self, url, **kwargs):
112+
with self.context['lock']:
113+
if self.redis_handle.create_task_from_url(url, **kwargs):
114+
self.context['task_counts'].value += 1
112115

113116
def create_task_from_file(self, fileobj):
114117
"""
@@ -121,7 +124,7 @@ def create_task_from_file(self, fileobj):
121124
line = line.strip()
122125
if not line: continue
123126
url = URL(line)
124-
self.redis_handle.create_task_from_url(url)
127+
self.create_task_from_url(url)
125128

126129

127130
if __name__ == '__main__':

tools/block_domain.py

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,28 +8,9 @@
88
See the file 'doc/COPYING' for copying permission
99
"""
1010
import sys
11-
import traceback
1211

13-
from core.utils.url import URL
1412
from core.utils.redis_utils import RedisUtils
1513

16-
17-
def remove_from_tasklist(domain):
18-
urls = []
19-
while True:
20-
try:
21-
urlstring = r.fetch_one_task(timeout=3)
22-
url = URL(urlstring)
23-
if r.is_blocked(url):
24-
continue
25-
urls.append(urlstring)
26-
except:
27-
break
28-
29-
for url in urls:
30-
r.create_task_from_url(URL(url), add_whitelist=False, valid_url_check=False)
31-
32-
3314
if __name__ == '__main__':
3415
if len(sys.argv) != 3:
3516
print 'usage: block_domain.py db target.com'
@@ -38,9 +19,4 @@ def remove_from_tasklist(domain):
3819
domain = sys.argv[2]
3920
r = RedisUtils(db=db)
4021
r.add_blocklist(domain)
41-
42-
if r.redis_client.hexists(r.h_blocklist, domain):
43-
remove_from_tasklist(domain)
44-
print 'add success!'
45-
else:
46-
print 'add failed!'
22+
print 'add success!'

tspider.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,18 +62,20 @@ def cmdparse():
6262
os.remove(os.path.join(TMPDIR_PATH, f))
6363
tspider_context = {
6464
'live_spider_counts': Value('i', 0),
65+
'task_counts': Value('i', 0),
66+
'result_counts': Value('i', 0),
6567
'task_done': Event(),
6668
'lock': Lock()}
67-
kwargs = {'tld': args.tld, 'cookie_file': args.cookie_file,
69+
kwargs = {'tld': args.tld, 'cookie_file': args.cookie_file, 'context': tspider_context,
6870
'redis_db': args.redis_db, 'mongo_db': args.mongo_db}
6971
for _ in range(args.consumer):
7072
worker = Consumer(**kwargs).consume
71-
proc = Process(name='consumer-%d' % _, target=worker, args=(tspider_context,))
73+
proc = Process(name='consumer-%d' % _, target=worker)
7274
proc.daemon = True
7375
proc.start()
7476
for _ in range(args.producer):
7577
worker = Producer(**kwargs).produce
76-
proc = Process(name='producer-%d' % _, target=worker, args=(tspider_context,))
78+
proc = Process(name='producer-%d' % _, target=worker)
7779
proc.daemon = True
7880
proc.start()
7981

0 commit comments

Comments
 (0)