
Commit c765887 (merge of 2 parents: 0a4821d + ccb829e)

Merge pull request #75 from Blockchain-Technology-Lab/concurrency

Concurrency

5 files changed: +132 -54 lines changed

requirements.txt (+1)

@@ -7,3 +7,4 @@ matplotlib~=3.4.3
 pandas~=1.3.4
 python-dateutil~=2.8.2
 pytest-mock~=3.12.0
+psutil~=6.0.0
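
The new psutil dependency backs the memory-aware concurrency logic added in helper.py below; the only API the PR relies on is the total-memory query. A minimal check, not part of the PR:

import psutil
print(psutil.virtual_memory().total)  # total physical memory of the machine, in bytes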

tests/test_analyze.py (+20 -25)

@@ -1,5 +1,5 @@
-from tokenomics_decentralization.analyze import analyze_snapshot, analyze, get_entries
-from unittest.mock import call
+from tokenomics_decentralization.analyze import analyze_snapshot, analyze, get_entries, analyze_ledger_snapshot
+from unittest.mock import call, Mock
 import pathlib
 
 
@@ -129,20 +129,25 @@ def test_get_entries(mocker):
 
 
 def test_analyze(mocker):
+    get_concurrency_mock = mocker.patch('tokenomics_decentralization.helper.get_concurrency_per_ledger')
+    get_concurrency_mock.return_value = {'bitcoin': 2, 'ethereum': 2}
+
+    write_csv_output_mock = mocker.patch('tokenomics_decentralization.helper.write_csv_output')
+
+    analyze(['bitcoin'], ['2010-01-01'])
+    assert len(write_csv_output_mock.call_args_list) == 1
+
+
+def test_analyze_ledger_snapshot(mocker):
     get_input_directories_mock = mocker.patch('tokenomics_decentralization.helper.get_input_directories')
     get_input_directories_mock.return_value = [pathlib.Path('/').resolve()]
 
     is_file_mock = mocker.patch('os.path.isfile')
     is_file_mock.side_effect = {
         pathlib.Path('/bitcoin_2010-01-01_raw_data.csv').resolve(): True,
-        pathlib.Path('/bitcoin_2011-01-01_raw_data.csv').resolve(): False,
         pathlib.Path('/ethereum_2010-01-01_raw_data.csv').resolve(): False,
-        pathlib.Path('/ethereum_2011-01-01_raw_data.csv').resolve(): True,
     }.get
 
-    get_db_connector_mock = mocker.patch('tokenomics_decentralization.db_helper.get_connector')
-    get_db_connector_mock.return_value = 'connector'
-
     get_entries_mock = mocker.patch('tokenomics_decentralization.analyze.get_entries')
     entries = [1, 2]
     get_entries_mock.return_value = entries
@@ -153,36 +158,26 @@ def test_analyze(mocker):
     get_output_row_mock = mocker.patch('tokenomics_decentralization.helper.get_output_row')
     get_output_row_mock.return_value = 'row'
 
-    write_csv_output_mock = mocker.patch('tokenomics_decentralization.helper.write_csv_output')
+    sema = Mock()
 
-    get_input_dirs_calls = []
     get_entries_calls = []
     analyze_snapshot_calls = []
     get_row_calls = []
-    write_output_calls = []
+    sema_release_calls = []
 
-    analyze(['bitcoin'], ['2010-01-01'])
-    get_input_dirs_calls.append(call())
-    assert get_input_directories_mock.call_args_list == get_input_dirs_calls
+    analyze_ledger_snapshot('bitcoin', '2010-01-01', [], sema)
     get_entries_calls.append(call('bitcoin', '2010-01-01', pathlib.Path('/bitcoin_2010-01-01_raw_data.csv').resolve()))
     assert get_entries_mock.call_args_list == get_entries_calls
     analyze_snapshot_calls.append(call(entries))
     assert analyze_snapshot_mock.call_args_list == analyze_snapshot_calls
     get_row_calls.append(call('bitcoin', '2010-01-01', {'hhi': 1}))
     assert get_output_row_mock.call_args_list == get_row_calls
-    write_output_calls.append(call(['row']))
-    assert write_csv_output_mock.call_args_list == write_output_calls
+    sema_release_calls.append(call())
+    assert sema.release.call_args_list == sema_release_calls
 
-    analyze(['bitcoin', 'ethereum'], ['2010-01-01', '2011-01-01'])
-    get_input_dirs_calls += 4 * [call()]
-    assert get_input_directories_mock.call_args_list == get_input_dirs_calls
-    get_entries_calls.append(call('bitcoin', '2010-01-01', pathlib.Path('/bitcoin_2010-01-01_raw_data.csv').resolve()))
-    get_entries_calls.append(call('ethereum', '2011-01-01', pathlib.Path('/ethereum_2011-01-01_raw_data.csv').resolve()))
+    analyze_ledger_snapshot('ethereum', '2010-01-01', [], sema)
     assert get_entries_mock.call_args_list == get_entries_calls
-    analyze_snapshot_calls += 2 * [call(entries)]
     assert analyze_snapshot_mock.call_args_list == analyze_snapshot_calls
-    get_row_calls.append(call('bitcoin', '2010-01-01', {'hhi': 1}))
-    get_row_calls.append(call('ethereum', '2011-01-01', {'hhi': 1}))
     assert get_output_row_mock.call_args_list == get_row_calls
-    write_output_calls.append(call(['row', 'row']))
-    assert write_csv_output_mock.call_args_list == write_output_calls
+    sema_release_calls.append(call())  # Test that semaphore release is called even if file does not exist
+    assert sema.release.call_args_list == sema_release_calls
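
The final assertion pins down an important invariant: analyze_ledger_snapshot must release its semaphore slot even when no input file exists, or the acquire() loop in analyze() would eventually deadlock. A minimal standalone sketch of that invariant (toy worker, not from the PR):

import multiprocessing

def worker(has_input, sema):
    if has_input:
        pass  # the per-snapshot analysis would happen here
    sema.release()  # released unconditionally, mirroring analyze_ledger_snapshot

if __name__ == '__main__':
    sema = multiprocessing.Semaphore(1)
    for has_input in [True, False, True]:
        sema.acquire()  # would block forever if a previous worker skipped release()
        p = multiprocessing.Process(target=worker, args=(has_input, sema))
        p.start()
        p.join()
    print('no deadlock')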

tests/test_helper.py (+28 -3)

@@ -1,7 +1,7 @@
 import tokenomics_decentralization.helper as hlp
+from collections import namedtuple
 import pathlib
 import os
-import argparse
 import datetime
 import pytest
 
@@ -12,9 +12,9 @@ def test_valid_date():
 
     for d in ['2022/1/01', '2022/01/1', '2022/1', '2022/01', '2022.1.01', '2022.01.1', '2022.1', '2022.01', 'blah',
               '2022-', '2022-1-1', '2022-1-01', '2022-1', '2022-01-1', '2022-02-29']:
-        with pytest.raises(argparse.ArgumentTypeError) as e_info:
+        with pytest.raises(ValueError) as e_info:
             hlp.valid_date(d)
-        assert e_info.type == argparse.ArgumentTypeError
+        assert e_info.type == ValueError
 
 
 def test_get_date_beginning():
@@ -486,3 +486,28 @@ def test_get_clusters(mocker):
     assert clusters['entity1'] == clusters['entity3']
     assert clusters['entity4'] == clusters['entity5']
     assert 'entity7' not in clusters.keys()
+
+
+def test_get_concurrency_per_ledger(mocker):
+    psutil_memory_mock = mocker.patch('psutil.virtual_memory')
+    psutil_memory_mock.return_value = namedtuple('VM', 'total')(10*10**9)
+
+    get_input_directories_mock = mocker.patch('tokenomics_decentralization.helper.get_input_directories')
+    get_input_directories_mock.return_value = [pathlib.Path('/').resolve()]
+
+    get_ledgers_mock = mocker.patch('tokenomics_decentralization.helper.get_ledgers')
+    get_ledgers_mock.return_value = ['bitcoin', 'ethereum']
+
+    os_walk_mock = mocker.patch('os.walk')
+    os_walk_mock.return_value = [('/', 'foo', ['bitcoin_2010-01-01_raw_data.csv'])]
+
+    os_stat_mock = mocker.patch('os.stat')
+    os_stat_mock.return_value = namedtuple('ST', 'st_size')(10*10**8)
+
+    concurrency = hlp.get_concurrency_per_ledger()
+    assert concurrency == {'bitcoin': 3, 'ethereum': 1}
+
+    os_stat_mock.return_value = namedtuple('ST', 'st_size')(5*10**9)
+
+    with pytest.raises(ValueError):
+        hlp.get_concurrency_per_ledger()
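
The asserted values follow from the formula in get_concurrency_per_ledger (see the helper.py diff below); a quick recomputation with the mocked numbers:

budget = 10 * 10**9 - 10**9              # mocked total memory minus the reserved 1GB -> 9GB
print(int(budget / (2.5 * 10 * 10**8)))  # 3: bitcoin's only input file is 1GB on disk
# ethereum matches no input files, so it falls back to a concurrency of 1;
# with the 5GB file the quotient drops below 1 (i.e. to 0), which raises the ValueError.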

tokenomics_decentralization/analyze.py (+42 -23)

@@ -1,4 +1,5 @@
 import csv
+import multiprocessing
 import os.path
 import tokenomics_decentralization.helper as hlp
 import tokenomics_decentralization.db_helper as db_hlp
@@ -113,35 +114,53 @@ def get_entries(ledger, date, filename):
     return entries
 
 
+def analyze_ledger_snapshot(ledger, date, output_rows, sema):
+    """
+    Executes the analysis of a given ledger and snapshot date.
+    :param ledger: a ledger name
+    :param date: a string in YYYY-MM-DD format
+    :param output_rows: a list of strings in the form of csv output rows
+    :param sema: a multiprocessing semaphore
+    """
+    input_filename = None
+    input_paths = [input_dir / f'{ledger}_{date}_raw_data.csv' for input_dir in hlp.get_input_directories()]
+    for filename in input_paths:
+        if os.path.isfile(filename):
+            input_filename = filename
+            break
+    if input_filename:
+        logging.info(f'[*] {ledger} - {date}')
+
+        entries = get_entries(ledger, date, filename)
+        metrics_values = analyze_snapshot(entries)
+        del entries
+
+        row = hlp.get_output_row(ledger, date, metrics_values)
+        output_rows.append(row)
+
+    sema.release()  # Release the semaphore s.t. the loop in analyze() can continue
+
+
 def analyze(ledgers, snapshot_dates):
     """
     Executes the analysis of the given ledgers for the snapshot dates and writes the output
     to csv files.
     :param ledgers: a list of ledger names
     :param snapshot_dates: a list of strings in YYYY-MM-DD format
     """
-    output_rows = []
+    manager = multiprocessing.Manager()
+    output_rows = manager.list()  # output_rows is a shared list across all parallel processes
+
+    concurrency = hlp.get_concurrency_per_ledger()
     for ledger in ledgers:
-        logging.info(f'[*] {ledger} - Analyzing')
+        sema = multiprocessing.Semaphore(concurrency[ledger])
+        jobs = []
         for date in snapshot_dates:
-            logging.info(f'[*] {ledger} - {date}')
-
-            input_filename = None
-            input_paths = [input_dir / f'{ledger}_{date}_raw_data.csv' for input_dir in hlp.get_input_directories()]
-            for filename in input_paths:
-                if os.path.isfile(filename):
-                    input_filename = filename
-                    break
-            if not input_filename:
-                logging.error(f'{ledger} input data for {date} do not exist')
-                continue
-
-            entries = get_entries(ledger, date, filename)
-            metrics_values = analyze_snapshot(entries)
-            del entries
-
-            output_rows.append(hlp.get_output_row(ledger, date, metrics_values))
-            for metric, value in metrics_values.items():
-                logging.info(f'{metric}: {value}')
-
-    hlp.write_csv_output(output_rows)
+            sema.acquire()  # Loop blocks here while the active processes are as many as the semaphore's limit
+            p = multiprocessing.Process(target=analyze_ledger_snapshot, args=(ledger, date, output_rows, sema))
+            jobs.append(p)
+            p.start()
+        for proc in jobs:
+            proc.join()
+
+    hlp.write_csv_output(sorted(output_rows, key=lambda x: (x[0], x[1])))  # Csv rows ordered by ledger and date
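The rewritten analyze() combines three multiprocessing pieces: a Manager list that collects rows across process boundaries, a per-ledger Semaphore that caps the number of in-flight processes, and one Process per snapshot. A minimal self-contained sketch of the same pattern, with a toy work() standing in for the snapshot analysis:

import multiprocessing

def work(item, results, sema):
    results.append(item * item)  # stand-in for analyze_ledger_snapshot's real work
    sema.release()               # free a slot so the spawning loop can continue

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    results = manager.list()             # shared across all worker processes
    sema = multiprocessing.Semaphore(2)  # at most 2 workers run concurrently
    jobs = []
    for item in range(5):
        sema.acquire()  # blocks while 2 workers are already running
        p = multiprocessing.Process(target=work, args=(item, results, sema))
        jobs.append(p)
        p.start()
    for p in jobs:
        p.join()
    print(sorted(results))  # [0, 1, 4, 9, 16]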

tokenomics_decentralization/helper.py (+41 -3)

@@ -6,7 +6,7 @@
 import os
 import datetime
 import calendar
-import argparse
+import psutil
 import json
 from collections import defaultdict
 import logging
@@ -33,8 +33,8 @@ def valid_date(date_string):
     try:
         get_date_beginning(date_string)
     except ValueError:
-        raise argparse.ArgumentTypeError("Please use the format YYYY-MM-DD for the timeframe argument "
-                                         "(day and month can be omitted).")
+        raise ValueError("Please use the format YYYY-MM-DD for the timeframe argument "
+                         "(day and month can be omitted).")
     return date_string
 
 
@@ -607,3 +607,41 @@ def get_clusters(ledger):
         cluster_mapping[item[0]] = cluster_name
 
     return cluster_mapping
+
+
+def get_concurrency_per_ledger():
+    """
+    Computes the maximum number of parallel processes that can run per ledger,
+    based on the system's available memory.
+    :returns: a dictionary where the keys are ledger names and values are integers
+    """
+    system_memory_total = psutil.virtual_memory().total  # Get the system's total memory
+    system_memory_total -= 10**9  # Leave 1GB of memory to be used by other processes
+
+    concurrency = {}
+    too_large_ledgers = set()
+    input_dirs = get_input_directories()
+    for ledger in get_ledgers():
+        # Find the size of the largest input file per ledger
+        max_file_size = 0
+        for input_dir in input_dirs:
+            for folder, _, files in os.walk(input_dir):
+                for file in files:
+                    if file.startswith(ledger):
+                        max_file_size = max(max_file_size, os.stat(os.path.join(folder, file)).st_size)
+        # Compute the max number of processes that can open the largest ledger file
+        # and run in parallel without exhausting the system's memory.
+        if max_file_size > 0:
+            # When loaded in (a dict in) memory, each file consumes approx. 2.5x space compared to storage.
+            concurrency[ledger] = int(system_memory_total / (2.5 * max_file_size))
+            # Find if some ledger files are too large to fit in the system's available memory.
+            if concurrency[ledger] == 0:
+                too_large_ledgers.add(ledger)
+        else:
+            concurrency[ledger] = 1
+
+    if too_large_ledgers:
+        raise ValueError('The max input files of the following ledgers are too '
+                         'large to load in memory: ' + ','.join(too_large_ledgers))
+
+    return concurrency
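
One side effect of dropping argparse here: valid_date now raises a plain ValueError. If the function is wired up as an argparse type= callable (a hypothetical sketch below, the flag name is illustrative), bad input is still rejected cleanly, because argparse converts ValueError and TypeError raised by type callables into its standard "invalid ... value" error; only the custom message is replaced by that generic report:

import argparse
import tokenomics_decentralization.helper as hlp

parser = argparse.ArgumentParser()
# Hypothetical flag; argparse catches the ValueError that valid_date raises
parser.add_argument('--snapshot_dates', nargs='*', type=hlp.valid_date)
parser.parse_args(['--snapshot_dates', '2022-02-29'])
# -> exits with: argument --snapshot_dates: invalid valid_date value: '2022-02-29'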
