Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions Week 2/Day 3/Submissions/alexey_boyko/data_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from __future__ import print_function

import os
import numpy as np
import pandas as pd
import tarfile
import urllib.request
import zipfile
import argparse
import json
from glob import glob

def download_flights(
        data_dir='data',
        flights_targz_url="https://storage.googleapis.com/dask-tutorial-data/nycflights.tar.gz"):
    '''Download the NYC Flights tarball into *data_dir* if not already present.

    Parameters
    ----------
    data_dir : str
        Directory that will hold the downloaded archive; created if missing.
    flights_targz_url : str
        URL of the nycflights.tar.gz archive to fetch.
    '''
    print("Setting up data directory")
    print("-------------------------")

    # makedirs handles nested paths and is a no-op when the directory exists
    # (os.mkdir would raise for a nested or pre-existing data_dir)
    os.makedirs(data_dir, exist_ok=True)

    flights_raw = os.path.join(data_dir, 'nycflights.tar.gz')
    if not os.path.exists(flights_raw):
        print("- Downloading NYC Flights dataset... ", end='', flush=True)
        urllib.request.urlretrieve(flights_targz_url, flights_raw)
        print("done", flush=True)
    print("** Finished! **")

def untar(data_dir='data'):
    '''Extract nycflights.tar.gz inside *data_dir* unless already extracted.

    Skips extraction when ``<data_dir>/nycflights`` already exists.
    '''
    flightdir = os.path.join(data_dir, 'nycflights')
    if not os.path.exists(flightdir):
        print("- Extracting flight data... ", end='', flush=True)
        # BUG FIX: the archive path and extraction destination were
        # hard-coded to 'data', silently ignoring the data_dir argument
        tar_path = os.path.join(data_dir, 'nycflights.tar.gz')
        with tarfile.open(tar_path, mode='r:gz') as flights:
            flights.extractall(data_dir)
        print("done", flush=True)


def jsonize(data_dir='data', n_rows=10000):
    '''Convert each nycflights CSV into line-delimited JSON.

    Reads ``<data_dir>/nycflights/*.csv``, keeps only the first *n_rows*
    rows of each file, and writes ``<data_dir>/flightjson/<name>.json``.
    Skips the whole step when the output directory already exists.
    '''
    jsondir = os.path.join(data_dir, 'flightjson')
    if not os.path.exists(jsondir):
        print("- Creating json data... ", end='', flush=True)
        os.mkdir(jsondir)
        # BUG FIX: the input glob and output path were hard-coded to
        # 'data', silently ignoring the data_dir argument
        for path in glob(os.path.join(data_dir, 'nycflights', '*.csv')):
            prefix = os.path.splitext(os.path.basename(path))[0]
            # only the first n_rows rows are kept for the demo
            df = pd.read_csv(path).iloc[:n_rows]
            df.to_json(os.path.join(jsondir, prefix + '.json'),
                       orient='records', lines=True)
        print("done", flush=True)


def load_dfs_from_jsons(data_dir='data', useDask=False, n_json_files=100):
    '''Load up to *n_json_files* line-delimited JSON files into one dataframe.

    Parameters
    ----------
    data_dir : str
        Root directory; files are read from ``<data_dir>/flightjson``.
    useDask : bool
        When True, return a lazy dask DataFrame instead of a pandas one.
    n_json_files : int
        Maximum number of JSON files to load.

    Returns
    -------
    pandas.DataFrame or dask.dataframe.DataFrame
    '''
    paths = glob(os.path.join(data_dir, 'flightjson/*json'))

    if not useDask:
        dfs = [pd.read_json(path, lines=True) for path in paths[:n_json_files]]
        dfs = pd.concat(dfs)
    else:
        import dask.dataframe as dd
        # BUG FIX (dead code removed): a delayed dask.bag pipeline was
        # built here (delayed(db.read_text(...).map(json.loads)) followed
        # by .to_dataframe()) but its result was never computed or used
        dfs = [dd.read_json(path, lines=True) for path in paths[:n_json_files]]
        dfs = dd.concat(dfs)

    return dfs

def prepareData(dfs,
                activeTextFeatures=['UniqueCarrier', 'Origin', 'Dest'],
                features2drop=['CRSElapsedTime'],
                useDask=False):
    '''Split a flights dataframe into a feature matrix X and target y.

    Parameters
    ----------
    dfs : pandas.DataFrame or dask.dataframe.DataFrame
        Flights data; must contain the DepDelay, TaxiIn/TaxiOut and the
        text-feature columns listed below.
    activeTextFeatures : list of str
        Categorical columns to one-hot encode (pandas path only).
    features2drop : list of str
        Numeric columns to exclude from X.
    useDask : bool
        When True, skip one-hot encoding and use only numeric features.

    Returns
    -------
    (X, y) : feature matrix and the DepDelay target column.
    '''
    # note: a plain list — pandas Index.drop expects list-like labels
    allTextFeatures = ['UniqueCarrier', 'Origin', 'Dest', 'TailNum']

    # BUG FIX: the taxi columns were "dropped" via dfs.columns.drop(...)
    # whose return value was discarded, leaving the columns in place; their
    # NaNs then made dropna() below discard extra rows
    dfs = dfs[dfs.columns.drop(['TaxiIn', 'TaxiOut'])]
    dfs = dfs.dropna()
    y = dfs['DepDelay']

    dfs = dfs[dfs.columns.drop('DepDelay')]
    activeNonTextFeatures = dfs.columns.drop(allTextFeatures)
    activeNonTextFeatures = activeNonTextFeatures.drop(features2drop)

    if not useDask:
        # one-hot encode the categorical columns, keep numeric ones as-is
        X = pd.concat([pd.get_dummies(dfs[col]) for col in activeTextFeatures], axis=1)
        X = pd.concat([X, dfs[activeNonTextFeatures]], axis=1)
        X = X.dropna(axis=1)
    else:
        # NOTE(review): categorical one-hot encoding was not implemented for
        # the dask path; only numeric features are used there
        X = dfs[activeNonTextFeatures]

    return X, y

def grid_search2pd(grid_search):
    '''Flatten a fitted grid search's ``cv_results_`` into one DataFrame.

    The per-candidate ``params`` dicts become the leading columns, followed
    by every metric array (scores, fit times, ...) in its original order.
    '''
    cv_results = grid_search.cv_results_
    metric_keys = [key for key in cv_results if key != 'params']
    metrics = pd.concat(
        [pd.DataFrame({key: cv_results[key]}) for key in metric_keys],
        axis=1)
    params = pd.DataFrame(cv_results['params'])
    return pd.concat([params, metrics], axis=1)
106 changes: 106 additions & 0 deletions Week 2/Day 3/Submissions/alexey_boyko/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
from __future__ import print_function

import os
import numpy as np
import pandas as pd
import tarfile
import urllib.request
import zipfile
import argparse
import joblib
import argparse



from sklearn.externals import joblib
from sklearn.metrics import mean_squared_error, r2_score


from dask.distributed import Client
from glob import glob
from data_utils import download_flights, untar, jsonize, load_dfs_from_jsons, prepareData, grid_search2pd


def make_cluster(**kwargs):
    '''Return a dask cluster.

    Prefers a ``KubeCluster`` (defaulting to 8 workers) when
    ``dask_kubernetes`` is installed; otherwise falls back to a
    single-machine ``LocalCluster``. Extra keyword arguments are
    forwarded to ``KubeCluster``.
    '''
    try:
        from dask_kubernetes import KubeCluster
        kwargs.setdefault('n_workers', 8)
        return KubeCluster(**kwargs)
    except ImportError:
        # dask_kubernetes unavailable: run everything on this machine
        from distributed.deploy.local import LocalCluster
        return LocalCluster()


def set_args(parser):
    '''Register the pipeline's command-line arguments on *parser*.

    Returns the same parser to allow chaining.
    '''
    parser.add_argument('--dask', action='store_true', help='use dask')
    parser.add_argument('--flights_url',
                        default="https://storage.googleapis.com/dask-tutorial-data/nycflights.tar.gz",
                        type=str, help="URL to download Flights data")
    parser.add_argument('--data_dir', default='data', type=str,
                        help='root of the dataset')
    parser.add_argument('--n_rows', default=10000, type=int,
                        help='number of rows extracted to jsons')
    # BUG FIX: the default was the integer 10000 (copy-paste from --n_rows);
    # `args.out_file + '.csv'` then raised TypeError whenever the flag was
    # omitted. type= does not convert defaults, so it must be a string.
    parser.add_argument('--out_file', default='results', type=str,
                        help='name of file to save results')

    return parser


def main(args):
    '''Run the full pipeline: fetch and prepare the flights data, grid-search
    a regression model for DepDelay, and write the CV results to
    ``<out_file>.csv``.
    '''
    # download and prepare data
    download_flights(data_dir=args.data_dir, flights_targz_url=args.flights_url)
    untar(data_dir=args.data_dir)
    jsonize(data_dir=args.data_dir, n_rows=args.n_rows)

    # BUG FIX: args.data_dir was not forwarded, so a non-default --data_dir
    # prepared data in one place and loaded it from another
    dfs = load_dfs_from_jsons(data_dir=args.data_dir, useDask=args.dask)
    print(dfs)
    X, y = prepareData(dfs=dfs, useDask=args.dask)

    if not args.dask:
        from sklearn import linear_model
        from sklearn.model_selection import train_test_split
        from sklearn.model_selection import GridSearchCV
        param_grid = {
            'alpha': [1e-3, 1e-2, .5, 1],
            'normalize': [True, False]
        }
        regr = linear_model.Lasso(max_iter=10000)
        grid_search = GridSearchCV(regr, param_grid, cv=2, n_jobs=-1, verbose=2)
    else:
        from dask_ml.xgboost import XGBRegressor
        from dask_ml.model_selection import train_test_split
        from dask_ml.model_selection import GridSearchCV
        param_grid = {
            'max_depth': [3, 5],
            'learning_rate': [1e-2, 1e-1],
            'n_estimators': [10, 100],
        }
        regr = XGBRegressor()
        # dask-ml's GridSearchCV does not support verbose
        grid_search = GridSearchCV(regr, param_grid, cv=2, n_jobs=-1)

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33)
    cluster = make_cluster()
    client = Client(cluster)

    with joblib.parallel_backend("dask"):
        print('Doing gridsearch...')
        # BUG FIX: grid_search was rebuilt here, discarding the object
        # configured above (and losing verbose=2 in the sklearn path)
        grid_search.fit(X_train, y_train)

    resultCV = grid_search2pd(grid_search)
    resultCV.to_csv(args.out_file + '.csv')
    print('gridsearch finished')
    print('mean tests scores: ', resultCV['mean_test_score'])
    print('mean fit times: ', resultCV['mean_fit_time'])
    return


if __name__ == '__main__':
#init parser
parser = argparse.ArgumentParser()
set_args(parser)

#parse
args = parser.parse_args()
print('==============')
print(args)
print('==============')
main(args)
4 changes: 4 additions & 0 deletions Week 2/Day 3/Submissions/alexey_boyko/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/sh
# Run the flight-delay pipeline twice: plain scikit-learn, then dask.
n_rows=9999

echo nodask
python main.py --n_rows="$n_rows" --out_file nodask_results

echo dask
python main.py --n_rows="$n_rows" --dask --out_file dask_results