Skip to content

Concurrency #75

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ matplotlib~=3.4.3
pandas~=1.3.4
python-dateutil~=2.8.2
pytest-mock~=3.12.0
psutil~=6.0.0
45 changes: 20 additions & 25 deletions tests/test_analyze.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from tokenomics_decentralization.analyze import analyze_snapshot, analyze, get_entries
from unittest.mock import call
from tokenomics_decentralization.analyze import analyze_snapshot, analyze, get_entries, analyze_ledger_snapshot
from unittest.mock import call, Mock
import pathlib


Expand Down Expand Up @@ -129,20 +129,25 @@ def test_get_entries(mocker):


def test_analyze(mocker):
    # Stub the per-ledger concurrency computation so no real system inspection happens.
    concurrency_mock = mocker.patch('tokenomics_decentralization.helper.get_concurrency_per_ledger')
    concurrency_mock.return_value = {'bitcoin': 2, 'ethereum': 2}

    # Capture csv output instead of writing real files.
    csv_output_mock = mocker.patch('tokenomics_decentralization.helper.write_csv_output')

    analyze(['bitcoin'], ['2010-01-01'])
    # The output must be written exactly once per analyze() invocation.
    assert csv_output_mock.call_count == 1


def test_analyze_ledger_snapshot(mocker):
get_input_directories_mock = mocker.patch('tokenomics_decentralization.helper.get_input_directories')
get_input_directories_mock.return_value = [pathlib.Path('/').resolve()]

is_file_mock = mocker.patch('os.path.isfile')
is_file_mock.side_effect = {
pathlib.Path('/bitcoin_2010-01-01_raw_data.csv').resolve(): True,
pathlib.Path('/bitcoin_2011-01-01_raw_data.csv').resolve(): False,
pathlib.Path('/ethereum_2010-01-01_raw_data.csv').resolve(): False,
pathlib.Path('/ethereum_2011-01-01_raw_data.csv').resolve(): True,
}.get

get_db_connector_mock = mocker.patch('tokenomics_decentralization.db_helper.get_connector')
get_db_connector_mock.return_value = 'connector'

get_entries_mock = mocker.patch('tokenomics_decentralization.analyze.get_entries')
entries = [1, 2]
get_entries_mock.return_value = entries
Expand All @@ -153,36 +158,26 @@ def test_analyze(mocker):
get_output_row_mock = mocker.patch('tokenomics_decentralization.helper.get_output_row')
get_output_row_mock.return_value = 'row'

write_csv_output_mock = mocker.patch('tokenomics_decentralization.helper.write_csv_output')
sema = Mock()

get_input_dirs_calls = []
get_entries_calls = []
analyze_snapshot_calls = []
get_row_calls = []
write_output_calls = []
sema_release_calls = []

analyze(['bitcoin'], ['2010-01-01'])
get_input_dirs_calls.append(call())
assert get_input_directories_mock.call_args_list == get_input_dirs_calls
analyze_ledger_snapshot('bitcoin', '2010-01-01', [], sema)
get_entries_calls.append(call('bitcoin', '2010-01-01', pathlib.Path('/bitcoin_2010-01-01_raw_data.csv').resolve()))
assert get_entries_mock.call_args_list == get_entries_calls
analyze_snapshot_calls.append(call(entries))
assert analyze_snapshot_mock.call_args_list == analyze_snapshot_calls
get_row_calls.append(call('bitcoin', '2010-01-01', {'hhi': 1}))
assert get_output_row_mock.call_args_list == get_row_calls
write_output_calls.append(call(['row']))
assert write_csv_output_mock.call_args_list == write_output_calls
sema_release_calls.append(call())
assert sema.release.call_args_list == sema_release_calls

analyze(['bitcoin', 'ethereum'], ['2010-01-01', '2011-01-01'])
get_input_dirs_calls += 4 * [call()]
assert get_input_directories_mock.call_args_list == get_input_dirs_calls
get_entries_calls.append(call('bitcoin', '2010-01-01', pathlib.Path('/bitcoin_2010-01-01_raw_data.csv').resolve()))
get_entries_calls.append(call('ethereum', '2011-01-01', pathlib.Path('/ethereum_2011-01-01_raw_data.csv').resolve()))
analyze_ledger_snapshot('ethereum', '2010-01-01', [], sema)
assert get_entries_mock.call_args_list == get_entries_calls
analyze_snapshot_calls += 2 * [call(entries)]
assert analyze_snapshot_mock.call_args_list == analyze_snapshot_calls
get_row_calls.append(call('bitcoin', '2010-01-01', {'hhi': 1}))
get_row_calls.append(call('ethereum', '2011-01-01', {'hhi': 1}))
assert get_output_row_mock.call_args_list == get_row_calls
write_output_calls.append(call(['row', 'row']))
assert write_csv_output_mock.call_args_list == write_output_calls
sema_release_calls.append(call()) # Test that semaphore release is called even if file does not exist
assert sema.release.call_args_list == sema_release_calls
31 changes: 28 additions & 3 deletions tests/test_helper.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import tokenomics_decentralization.helper as hlp
from collections import namedtuple
import pathlib
import os
import argparse
import datetime
import pytest

Expand All @@ -12,9 +12,9 @@ def test_valid_date():

for d in ['2022/1/01', '2022/01/1', '2022/1', '2022/01', '2022.1.01', '2022.01.1', '2022.1', '2022.01', 'blah',
'2022-', '2022-1-1', '2022-1-01', '2022-1', '2022-01-1', '2022-02-29']:
with pytest.raises(argparse.ArgumentTypeError) as e_info:
with pytest.raises(ValueError) as e_info:
hlp.valid_date(d)
assert e_info.type == argparse.ArgumentTypeError
assert e_info.type == ValueError


def test_get_date_beginning():
Expand Down Expand Up @@ -486,3 +486,28 @@ def test_get_clusters(mocker):
assert clusters['entity1'] == clusters['entity3']
assert clusters['entity4'] == clusters['entity5']
assert 'entity7' not in clusters.keys()


def test_get_concurrency_per_ledger(mocker):
    # Fake system memory: 10 GB total (1 GB is reserved by the helper, leaving 9 GB usable).
    VirtualMemory = namedtuple('VM', 'total')
    mem_mock = mocker.patch('psutil.virtual_memory')
    mem_mock.return_value = VirtualMemory(10 * 10**9)

    dirs_mock = mocker.patch('tokenomics_decentralization.helper.get_input_directories')
    dirs_mock.return_value = [pathlib.Path('/').resolve()]

    ledgers_mock = mocker.patch('tokenomics_decentralization.helper.get_ledgers')
    ledgers_mock.return_value = ['bitcoin', 'ethereum']

    # Only a single bitcoin input file is present on the fake filesystem.
    walk_mock = mocker.patch('os.walk')
    walk_mock.return_value = [('/', 'foo', ['bitcoin_2010-01-01_raw_data.csv'])]

    StatResult = namedtuple('ST', 'st_size')
    stat_mock = mocker.patch('os.stat')
    stat_mock.return_value = StatResult(10 * 10**8)

    # 9 GB usable / (2.5 * 1 GB file) -> 3 processes for bitcoin; ethereum has no files -> 1.
    assert hlp.get_concurrency_per_ledger() == {'bitcoin': 3, 'ethereum': 1}

    # A 5 GB file cannot fit in the available memory, so the helper must raise.
    stat_mock.return_value = StatResult(5 * 10**9)
    with pytest.raises(ValueError):
        hlp.get_concurrency_per_ledger()
65 changes: 42 additions & 23 deletions tokenomics_decentralization/analyze.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import csv
import multiprocessing
import os.path
import tokenomics_decentralization.helper as hlp
import tokenomics_decentralization.db_helper as db_hlp
Expand Down Expand Up @@ -113,35 +114,53 @@ def get_entries(ledger, date, filename):
return entries


def analyze_ledger_snapshot(ledger, date, output_rows, sema):
    """
    Executes the analysis of a given ledger and snapshot date.
    Intended to run as a multiprocessing.Process target; appends its result to
    the shared output_rows list and always releases the semaphore on exit.
    :param ledger: a ledger name
    :param date: a string in YYYY-MM-DD format
    :param output_rows: a list of strings in the form of csv output rows
    :param sema: a multiprocessing semaphore
    """
    try:
        input_filename = None
        input_paths = [input_dir / f'{ledger}_{date}_raw_data.csv' for input_dir in hlp.get_input_directories()]
        for filename in input_paths:
            if os.path.isfile(filename):
                input_filename = filename
                break
        if input_filename:
            logging.info(f'[*] {ledger} - {date}')

            # Use input_filename explicitly instead of relying on the leaked loop variable.
            entries = get_entries(ledger, date, input_filename)
            metrics_values = analyze_snapshot(entries)
            del entries  # Free the (potentially very large) entries dict as early as possible

            row = hlp.get_output_row(ledger, date, metrics_values)
            output_rows.append(row)
        else:
            logging.error(f'{ledger} input data for {date} do not exist')
    finally:
        # Always release the semaphore, even on failure; otherwise the
        # sema.acquire() loop in analyze() would deadlock.
        sema.release()


def analyze(ledgers, snapshot_dates):
    """
    Executes the analysis of the given ledgers for the snapshot dates and writes the output
    to csv files. Snapshots of each ledger are analyzed in parallel processes, bounded per
    ledger by a semaphore sized from the system's available memory.
    :param ledgers: a list of ledger names
    :param snapshot_dates: a list of strings in YYYY-MM-DD format
    """
    manager = multiprocessing.Manager()
    output_rows = manager.list()  # output_rows is a shared list across all parallel processes

    concurrency = hlp.get_concurrency_per_ledger()
    for ledger in ledgers:
        logging.info(f'[*] {ledger} - Analyzing')
        sema = multiprocessing.Semaphore(concurrency[ledger])
        jobs = []
        for date in snapshot_dates:
            sema.acquire()  # Loop blocks here while the active processes are as many as the semaphore's limit
            p = multiprocessing.Process(target=analyze_ledger_snapshot, args=(ledger, date, output_rows, sema))
            jobs.append(p)
            p.start()
        # Wait for all of this ledger's workers before moving to the next ledger.
        for proc in jobs:
            proc.join()

    hlp.write_csv_output(sorted(output_rows, key=lambda x: (x[0], x[1])))  # Csv rows ordered by ledger and date
44 changes: 41 additions & 3 deletions tokenomics_decentralization/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import os
import datetime
import calendar
import argparse
import psutil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs to be added to requirements.txt

import json
from collections import defaultdict
import logging
Expand All @@ -33,8 +33,8 @@ def valid_date(date_string):
try:
get_date_beginning(date_string)
except ValueError:
raise argparse.ArgumentTypeError("Please use the format YYYY-MM-DD for the timeframe argument "
"(day and month can be omitted).")
raise ValueError("Please use the format YYYY-MM-DD for the timeframe argument "
"(day and month can be omitted).")
return date_string


Expand Down Expand Up @@ -607,3 +607,41 @@ def get_clusters(ledger):
cluster_mapping[item[0]] = cluster_name

return cluster_mapping


def get_concurrency_per_ledger():
    """
    Computes the maximum number of parallel processes that can run per ledger,
    based on the system's available memory.
    :returns: a dictionary where the keys are ledger names and values are integers
    :raises ValueError: if some ledger's largest input file is too large to fit
        in the system's available memory
    """
    system_memory_total = psutil.virtual_memory().total  # Get the system's total memory
    system_memory_total -= 10**9  # Leave 1GB of memory to be used by other processes

    concurrency = {}
    too_large_ledgers = set()
    input_dirs = get_input_directories()
    for ledger in get_ledgers():
        # Find the size of the largest input file per ledger
        max_file_size = 0
        for input_dir in input_dirs:
            for folder, _, files in os.walk(input_dir):
                for file in files:
                    if file.startswith(ledger):
                        max_file_size = max(max_file_size, os.stat(os.path.join(folder, file)).st_size)
        # Compute the max number of processes that can open the largest ledger file
        # and run in parallel without exhausting the system's memory.
        if max_file_size > 0:
            # When loaded in (a dict in) memory, each file consumes approx. 2.5x space compared to storage.
            concurrency[ledger] = int(system_memory_total / (2.5 * max_file_size))
            # Find if some ledger files are too large to fit in the system's available memory.
            if concurrency[ledger] == 0:
                too_large_ledgers.add(ledger)
        else:
            concurrency[ledger] = 1  # No input files found; allow a single (no-op) process

    if too_large_ledgers:
        # Fixed message: the original concatenated 'too' + 'large' ("toolarge") and
        # fused the ledger list onto the sentence with no separator.
        raise ValueError('The max input files of the following ledgers are too '
                         'large to load in memory: ' + ', '.join(sorted(too_large_ledgers)))

    return concurrency
Loading