
Commit 6f9ae63

Use parallel processes when analyzing ledger snapshots
1 parent: 85f4205

2 files changed: +62 -48 lines

tests/test_analyze.py (+20 -25)
@@ -1,5 +1,5 @@
-from tokenomics_decentralization.analyze import analyze_snapshot, analyze, get_entries
-from unittest.mock import call
+from tokenomics_decentralization.analyze import analyze_snapshot, analyze, get_entries, analyze_ledger_snapshot
+from unittest.mock import call, Mock
 import pathlib
 
 
@@ -129,20 +129,25 @@ def test_get_entries(mocker):
 
 
 def test_analyze(mocker):
+    get_concurrency_mock = mocker.patch('tokenomics_decentralization.helper.get_concurrency_per_ledger')
+    get_concurrency_mock.return_value = {'bitcoin': 2, 'ethereum': 2}
+
+    write_csv_output_mock = mocker.patch('tokenomics_decentralization.helper.write_csv_output')
+
+    analyze(['bitcoin'], ['2010-01-01'])
+    assert len(write_csv_output_mock.call_args_list) == 1
+
+
+def test_analyze_ledger_snapshot(mocker):
     get_input_directories_mock = mocker.patch('tokenomics_decentralization.helper.get_input_directories')
     get_input_directories_mock.return_value = [pathlib.Path('/').resolve()]
 
     is_file_mock = mocker.patch('os.path.isfile')
     is_file_mock.side_effect = {
         pathlib.Path('/bitcoin_2010-01-01_raw_data.csv').resolve(): True,
-        pathlib.Path('/bitcoin_2011-01-01_raw_data.csv').resolve(): False,
         pathlib.Path('/ethereum_2010-01-01_raw_data.csv').resolve(): False,
-        pathlib.Path('/ethereum_2011-01-01_raw_data.csv').resolve(): True,
     }.get
 
-    get_db_connector_mock = mocker.patch('tokenomics_decentralization.db_helper.get_connector')
-    get_db_connector_mock.return_value = 'connector'
-
     get_entries_mock = mocker.patch('tokenomics_decentralization.analyze.get_entries')
     entries = [1, 2]
     get_entries_mock.return_value = entries
@@ -153,36 +158,26 @@ def test_analyze(mocker):
     get_output_row_mock = mocker.patch('tokenomics_decentralization.helper.get_output_row')
     get_output_row_mock.return_value = 'row'
 
-    write_csv_output_mock = mocker.patch('tokenomics_decentralization.helper.write_csv_output')
+    sema = Mock()
 
-    get_input_dirs_calls = []
     get_entries_calls = []
     analyze_snapshot_calls = []
     get_row_calls = []
-    write_output_calls = []
+    sema_release_calls = []
 
-    analyze(['bitcoin'], ['2010-01-01'])
-    get_input_dirs_calls.append(call())
-    assert get_input_directories_mock.call_args_list == get_input_dirs_calls
+    analyze_ledger_snapshot('bitcoin', '2010-01-01', [], sema)
     get_entries_calls.append(call('bitcoin', '2010-01-01', pathlib.Path('/bitcoin_2010-01-01_raw_data.csv').resolve()))
     assert get_entries_mock.call_args_list == get_entries_calls
     analyze_snapshot_calls.append(call(entries))
     assert analyze_snapshot_mock.call_args_list == analyze_snapshot_calls
     get_row_calls.append(call('bitcoin', '2010-01-01', {'hhi': 1}))
     assert get_output_row_mock.call_args_list == get_row_calls
-    write_output_calls.append(call(['row']))
-    assert write_csv_output_mock.call_args_list == write_output_calls
+    sema_release_calls.append(call())
+    assert sema.release.call_args_list == sema_release_calls
 
-    analyze(['bitcoin', 'ethereum'], ['2010-01-01', '2011-01-01'])
-    get_input_dirs_calls += 4 * [call()]
-    assert get_input_directories_mock.call_args_list == get_input_dirs_calls
-    get_entries_calls.append(call('bitcoin', '2010-01-01', pathlib.Path('/bitcoin_2010-01-01_raw_data.csv').resolve()))
-    get_entries_calls.append(call('ethereum', '2011-01-01', pathlib.Path('/ethereum_2011-01-01_raw_data.csv').resolve()))
+    analyze_ledger_snapshot('ethereum', '2010-01-01', [], sema)
    assert get_entries_mock.call_args_list == get_entries_calls
-    analyze_snapshot_calls += 2 * [call(entries)]
     assert analyze_snapshot_mock.call_args_list == analyze_snapshot_calls
-    get_row_calls.append(call('bitcoin', '2010-01-01', {'hhi': 1}))
-    get_row_calls.append(call('ethereum', '2011-01-01', {'hhi': 1}))
     assert get_output_row_mock.call_args_list == get_row_calls
-    write_output_calls.append(call(['row', 'row']))
-    assert write_csv_output_mock.call_args_list == write_output_calls
+    sema_release_calls.append(call())  # Test that semaphore release is called even if file does not exist
+    assert sema.release.call_args_list == sema_release_calls
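
Since analyze_ledger_snapshot now takes the semaphore as a parameter, the test can substitute a plain unittest.mock.Mock for it and assert on the recorded release() calls. A minimal standalone sketch of that pattern (the worker function here is hypothetical, not repository code):

from unittest.mock import Mock, call

def worker(sema):
    # Hypothetical stand-in for analyze_ledger_snapshot: do some work,
    # then release the semaphore so the spawning loop can continue.
    sema.release()

sema = Mock()
worker(sema)
worker(sema)
# A Mock records every call, so the test can assert the exact release count.
assert sema.release.call_args_list == [call(), call()]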

tokenomics_decentralization/analyze.py (+42 -23)
@@ -1,4 +1,5 @@
 import csv
+import multiprocessing
 import os.path
 import tokenomics_decentralization.helper as hlp
 import tokenomics_decentralization.db_helper as db_hlp
@@ -113,35 +114,53 @@ def get_entries(ledger, date, filename):
     return entries
 
 
+def analyze_ledger_snapshot(ledger, date, output_rows, sema):
+    """
+    Executes the analysis of a given ledger and snapshot date.
+    :param ledger: a ledger name
+    :param date: a string in YYYY-MM-DD format
+    :param output_rows: a list of strings in the form of csv output rows
+    :param sema: a multiprocessing semaphore
+    """
+    input_filename = None
+    input_paths = [input_dir / f'{ledger}_{date}_raw_data.csv' for input_dir in hlp.get_input_directories()]
+    for filename in input_paths:
+        if os.path.isfile(filename):
+            input_filename = filename
+            break
+    if input_filename:
+        logging.info(f'[*] {ledger} - {date}')
+
+        entries = get_entries(ledger, date, filename)
+        metrics_values = analyze_snapshot(entries)
+        del entries
+
+        row = hlp.get_output_row(ledger, date, metrics_values)
+        output_rows.append(row)
+
+    sema.release()  # Release the semaphore s.t. the loop in analyze() can continue
+
+
 def analyze(ledgers, snapshot_dates):
     """
     Executes the analysis of the given ledgers for the snapshot dates and writes the output
     to csv files.
     :param ledgers: a list of ledger names
     :param snapshot_dates: a list of strings in YYYY-MM-DD format
     """
-    output_rows = []
+    manager = multiprocessing.Manager()
+    output_rows = manager.list()  # output_rows is a shared list across all parallel processes
+
+    concurrency = hlp.get_concurrency_per_ledger()
     for ledger in ledgers:
-        logging.info(f'[*] {ledger} - Analyzing')
+        sema = multiprocessing.Semaphore(concurrency[ledger])
+        jobs = []
         for date in snapshot_dates:
-            logging.info(f'[*] {ledger} - {date}')
-
-            input_filename = None
-            input_paths = [input_dir / f'{ledger}_{date}_raw_data.csv' for input_dir in hlp.get_input_directories()]
-            for filename in input_paths:
-                if os.path.isfile(filename):
-                    input_filename = filename
-                    break
-            if not input_filename:
-                logging.error(f'{ledger} input data for {date} do not exist')
-                continue
-
-            entries = get_entries(ledger, date, filename)
-            metrics_values = analyze_snapshot(entries)
-            del entries
-
-            output_rows.append(hlp.get_output_row(ledger, date, metrics_values))
-            for metric, value in metrics_values.items():
-                logging.info(f'{metric}: {value}')
-
-    hlp.write_csv_output(output_rows)
+            sema.acquire()  # Loop blocks here while the active processes are as many as the semaphore's limit
+            p = multiprocessing.Process(target=analyze_ledger_snapshot, args=(ledger, date, output_rows, sema))
+            jobs.append(p)
+            p.start()
+        for proc in jobs:
+            proc.join()
+
+    hlp.write_csv_output(sorted(output_rows, key=lambda x: (x[0], x[1])))  # Csv rows ordered by ledger and date
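
The control flow introduced here is a semaphore-bounded fan-out: the parent acquires before spawning each process, every worker releases when it finishes (whether or not an input file existed), and the parent joins all jobs before writing the sorted output. A self-contained sketch of the same pattern, using hypothetical names (worker, the sleep stand-in for analysis) rather than repository code:

import multiprocessing
import time

def worker(ledger, date, output_rows, sema):
    time.sleep(0.1)  # stand-in for the real snapshot analysis
    output_rows.append((ledger, date))
    sema.release()  # let the spawning loop admit another process

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    output_rows = manager.list()  # proxy list shared across processes
    sema = multiprocessing.Semaphore(2)  # at most 2 live workers
    jobs = []
    for date in ['2012-01-01', '2010-01-01', '2011-01-01']:
        sema.acquire()  # blocks while 2 workers are already running
        p = multiprocessing.Process(target=worker, args=('bitcoin', date, output_rows, sema))
        jobs.append(p)
        p.start()
    for proc in jobs:
        proc.join()
    # Rows arrive in completion order; sort them the way analyze() does.
    print(sorted(output_rows, key=lambda x: (x[0], x[1])))

One design note: releasing inside the worker rather than the parent frees a slot as soon as a snapshot finishes, so a ledger with many dates keeps exactly its configured number of processes busy instead of proceeding in fixed-size batches.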
