Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Week 2/Day 3/Submissions/aleksandr_safin/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

### Ipynb ###
.ipynb_checkpoints/
*.ipynb

### Misc ###
data/
dask-worker-space/
38 changes: 38 additions & 0 deletions Week 2/Day 3/Submissions/aleksandr_safin/benchmarking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from sklearn.model_selection import GridSearchCV
import joblib
import time
import numpy as np

class ModelFitter():
    """Wraps an sklearn-compatible estimator for timed grid-search benchmarking."""

    def __init__(self, model, static_array=False):
        """
        model: sklearn-compatible estimator to benchmark
        static_array (bool): whether to convert dask arrays to numpy, while using with sklearn model
        """
        self.model = model
        self.static_array = static_array

    def fit_cv(self, X, y, n_folds, param_grid):
        """Run a grid search over param_grid with n_folds-fold CV.

        X, y: input data (dask arrays; converted to numpy when static_array is set)
        n_folds (int): number of CV folds
        param_grid (dict): parameter grid passed to GridSearchCV
        Returns (grid_search_time_sec, best_estimator_refit_time_sec, best_cv_score).
        """
        if self.static_array:
            # plain sklearn estimators cannot consume dask arrays directly
            X_data = X.compute()
            y_data = y.compute().ravel()
        else:
            X_data = X
            y_data = y

        grid_search = GridSearchCV(self.model, param_grid, cv=n_folds, n_jobs=1)
        start_time = time.time()
        # scatter the data once so dask tasks reuse it instead of re-serializing per fit
        with joblib.parallel_backend("dask", scatter=[X_data, y_data]):
            grid_search.fit(X_data, y_data)
        elapsed_time = time.time() - start_time

        # BUG FIX: the second element used to be a duplicate of elapsed_time.
        # Callers (benchmark_models -> main) label it "training time", which is
        # the refit of the best estimator — exposed as GridSearchCV.refit_time_
        # (available because refit=True by default).
        return elapsed_time, grid_search.refit_time_, grid_search.best_score_

def benchmark_models(X, y, models, n_folds, param_grid):
    """Run the grid-search benchmark for every model wrapper in *models*.

    Each element of *models* must expose ``fit_cv(X, y, n_folds, param_grid)``;
    the per-model result tuples are stacked into a 2-D numpy array
    (one row per model).
    """
    return np.asarray([fitter.fit_cv(X, y, n_folds, param_grid)
                       for fitter in models])
19 changes: 19 additions & 0 deletions Week 2/Day 3/Submissions/aleksandr_safin/data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import dask.dataframe as dd
import numpy as np


def prepare_data(data_glob_path, target_columns=None):
    """Load the flights CSVs with dask and split them into input/target arrays.

    data_glob_path (str): glob pattern of the CSV files to read
    target_columns (list[str] | None): columns used as the prediction target;
        defaults to ["DepDelay"]
    Returns (X, y) as dask arrays with computed chunk lengths.
    """
    # avoid a mutable default argument
    if target_columns is None:
        target_columns = ["DepDelay"]

    # explicit dtypes are necessary to read the data correctly
    df = dd.read_csv(data_glob_path, dtype={"TailNum": object, "CRSElapsedTime": np.float64})

    # drop the columns with a large share of nan's
    # (the redundant axis=0 was removed: `columns=` already selects the axis)
    df = df.drop(columns=["UniqueCarrier", "TailNum", "Origin", "Dest"])
    df = df.dropna()

    # preparing target column and input dataframe
    df_target = df[target_columns]
    df = df.drop(columns=target_columns)

    # converting to dask_array to avoid problems with corrupted index and further related bugs
    X = df.to_dask_array(lengths=True)
    y = df_target.to_dask_array(lengths=True)
    return X, y
49 changes: 49 additions & 0 deletions Week 2/Day 3/Submissions/aleksandr_safin/data_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import os
import numpy as np
import pandas as pd
import tarfile
import urllib.request
import zipfile
from glob import glob

# Defaults for the NYC flights dataset: archive file name, extraction
# directory name, and the public download URL.
raw_fname = 'nycflights.tar.gz'
dir_name = 'nycflights'
url = "https://storage.googleapis.com/dask-tutorial-data/nycflights.tar.gz"


def get_flights_data(data_dir, raw_fname, dir_name, url, n_rows=10000):
    """Download, extract, and json-convert the NYC flights dataset (idempotent).

    data_dir (str): directory that receives the archive and extracted data
    raw_fname (str): file name for the downloaded tar.gz archive
    dir_name (str): directory name produced by extracting the archive
    url (str): download URL of the archive
    n_rows (int): rows kept per CSV when generating the json mirror
    """
    flights_raw = os.path.join(data_dir, raw_fname)
    flightdir = os.path.join(data_dir, dir_name)
    jsondir = os.path.join(data_dir, 'flightjson')

    os.makedirs(data_dir, exist_ok=True)

    if not os.path.exists(flights_raw):
        print("- Downloading NYC Flights dataset... ", end='', flush=True)
        urllib.request.urlretrieve(url, flights_raw)
        print("done", flush=True)

    if not os.path.exists(flightdir):
        print("- Extracting flight data... ", end='', flush=True)
        # reuse flights_raw instead of re-joining the same path components
        # NOTE(review): extractall trusts archive member paths; for untrusted
        # sources pass filter="data" (Python 3.12+) to block path traversal
        with tarfile.open(flights_raw, mode='r:gz') as flights:
            flights.extractall(data_dir)
        print("done", flush=True)

    if not os.path.exists(jsondir):
        print("- Creating json data... ", end='', flush=True)
        os.mkdir(jsondir)
        for path in glob(os.path.join(flightdir, '*.csv')):
            prefix = os.path.splitext(os.path.basename(path))[0]
            # keep only the first n_rows rows to bound the json mirror's size
            df = pd.read_csv(path).iloc[:n_rows]
            df.to_json(os.path.join(jsondir, prefix + '.json'),
                       orient='records', lines=True)
        print("done", flush=True)

def fetch_data(data_dir, dataset_name):
    """Set up the data directory for the requested dataset.

    Only "nycflights" is known; any other name just prints the banner and
    finishes without fetching anything (same as the original behavior).
    """
    print("Setting up data directory")
    print("-------------------------")
    fetchers = {
        "nycflights": lambda: get_flights_data(data_dir, raw_fname, dir_name, url),
    }
    fetcher = fetchers.get(dataset_name)
    if fetcher is not None:
        fetcher()
    print("** Finished! **")
37 changes: 37 additions & 0 deletions Week 2/Day 3/Submissions/aleksandr_safin/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import os
from data_utils import fetch_data
from data import prepare_data
from utils import make_cluster
from dask.distributed import Client
from sklearn.ensemble import GradientBoostingRegressor
from dask_ml.xgboost import XGBRegressor
import pandas as pd

# PEP 8: keep imports at the top of the module, before constants —
# this import was previously buried below param_grid.
from benchmarking import ModelFitter, benchmark_models

# hyper-parameter grid shared by every benchmarked model
param_grid = {"learning_rate": [0.05, 0.1],
              "n_estimators": [20, 50, 100],
              "max_depth": [3]}


def main(args):
    """End-to-end benchmark: fetch data, start a dask cluster, grid-search models.

    args: parsed CLI namespace with data_dir, dataset_name, n_workers, n_folds.
    Returns the result DataFrame (one row per benchmarked model).
    """
    fetch_data(args.data_dir, args.dataset_name)

    X, y = prepare_data(os.path.join(args.data_dir, args.dataset_name, '*.csv'))

    print("Starting dask cluster ...")
    cluster = make_cluster(n_workers=args.n_workers)
    # BUG FIX: the client must attach to the cluster created above;
    # a bare Client() spins up its own default LocalCluster and the
    # requested n_workers cluster sits unused.
    client = Client(cluster)

    try:
        # wrapping our target regressors for further benchmarking
        sk_xgb = ModelFitter(GradientBoostingRegressor(), True)
        dask_xgb = ModelFitter(XGBRegressor())

        print("Starting benchmarking process ...")
        results = benchmark_models(X, y, [sk_xgb, dask_xgb], args.n_folds, param_grid)

        res_df = pd.DataFrame(index=["sklearn", "dask"], data=results,
                              columns=["GS time", "training time", "Metric"])
        print(res_df)
        return res_df
    finally:
        # release cluster resources even if benchmarking raises
        client.close()
        cluster.close()


if __name__ == '__main__':
    # BUG FIX: main(args) requires a parsed namespace; the bare main() call
    # raised TypeError. Build a namespace with the same defaults run.py uses.
    import argparse

    main(argparse.Namespace(data_dir="data", dataset_name="nycflights",
                            n_workers=10, n_folds=3, random_state=42))
23 changes: 23 additions & 0 deletions Week 2/Day 3/Submissions/aleksandr_safin/run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import argparse
from main import main

def set_args(parser):
    """Register the benchmark CLI options on *parser* and return it.

    Options: --data_dir, --dataset_name, --n_workers, --n_folds,
    --random_state (same flags, types, defaults and help as before).
    """
    # (flag, type, default, help) — table-driven registration
    options = [
        ("--data_dir", str, "data", "data directory"),
        ("--dataset_name", str, "nycflights", "name of the dataset"),
        ("--n_workers", int, 10, "number of workers in cluster"),
        ("--n_folds", int, 3, "number of folds for CV"),
        ("--random_state", int, 42, "random state seed"),
    ]
    for flag, arg_type, default, help_text in options:
        parser.add_argument(flag, type=arg_type, default=default, help=help_text)
    return parser


if __name__ == "__main__":
    # build the parser, register the CLI options, parse argv, and run
    args = set_args(argparse.ArgumentParser()).parse_args()
    main(args)
3 changes: 3 additions & 0 deletions Week 2/Day 3/Submissions/aleksandr_safin/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/bash

# Forward any CLI flags (e.g. --n_workers, --data_dir) to the Python entry
# point; previously flags given to run.sh were silently dropped.
python3 run.py "$@"
12 changes: 12 additions & 0 deletions Week 2/Day 3/Submissions/aleksandr_safin/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import joblib
from dask.distributed import Client

def make_cluster(**kwargs):
    """Create a dask cluster, preferring Kubernetes when available.

    Tries dask_kubernetes' KubeCluster (defaulting n_workers to 9 when the
    caller did not supply one); falls back to a LocalCluster when
    dask_kubernetes is not installed. kwargs are forwarded to the cluster
    constructor either way.
    """
    try:
        from dask_kubernetes import KubeCluster
        kwargs.setdefault('n_workers', 9)
        return KubeCluster(**kwargs)
    except ImportError:
        from distributed.deploy.local import LocalCluster
        return LocalCluster(**kwargs)