Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added .DS_Store
Binary file not shown.
Binary file added Week 2/.DS_Store
Binary file not shown.
Binary file added Week 2/Day 2/.DS_Store
Binary file not shown.
Binary file added Week 2/Day 3/.DS_Store
Binary file not shown.
Binary file added Week 2/Day 3/Submissions/.DS_Store
Binary file not shown.
Empty file.
76 changes: 76 additions & 0 deletions Week 2/Day 3/Submissions/loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from __future__ import print_function

import os
import numpy as np
import pandas as pd
import tarfile
import urllib.request
import zipfile
from glob import glob
from sklearn.model_selection import train_test_split


def flights(url="https://storage.googleapis.com/dask-tutorial-data/nycflights.tar.gz",
            data_dir='data', rows_num=10000):
    """Download, extract, and JSON-convert the NYC Flights dataset.

    Each step is skipped if its output already exists, so the function is
    safe to call repeatedly.

    Parameters
    ----------
    url : str
        Location of the ``nycflights.tar.gz`` tarball.
    data_dir : str
        Directory where the raw tarball, extracted CSVs, and JSON copies
        are stored.
    rows_num : int
        Number of rows of each CSV kept when writing the JSON copies.
    """
    flights_raw = os.path.join(data_dir, 'nycflights.tar.gz')
    flightdir = os.path.join(data_dir, 'nycflights')
    jsondir = os.path.join(data_dir, 'flightjson')

    # exist_ok avoids the check-then-create race of the previous
    # `if not exists: mkdir` sequence.
    os.makedirs(data_dir, exist_ok=True)

    if not os.path.exists(flights_raw):
        print("- Downloading NYC Flights dataset... ", end='', flush=True)
        urllib.request.urlretrieve(url, flights_raw)
        print("done", flush=True)

    if not os.path.exists(flightdir):
        print("- Extracting flight data... ", end='', flush=True)
        # `flights_raw` already holds the tarball path; no need to rebuild it.
        # NOTE(review): extractall trusts the archive's member paths —
        # fine for this known Google-hosted tarball, unsafe for untrusted input.
        with tarfile.open(flights_raw, mode='r:gz') as tar:
            tar.extractall(data_dir)
        print("done", flush=True)

    if not os.path.exists(jsondir):
        print("- Creating json data... ", end='', flush=True)
        os.mkdir(jsondir)
        for path in glob(os.path.join(flightdir, '*.csv')):
            prefix = os.path.splitext(os.path.basename(path))[0]
            # Just take the first `rows_num` rows for the demo.
            df = pd.read_csv(path).iloc[:rows_num]
            df.to_json(os.path.join(jsondir, prefix + '.json'),
                       orient='records', lines=True)
        print("done", flush=True)

    print("** Finished! **")

def build_dataset(data_dir='data', fill_nan=False, one_hot_encod=False):
    """Load the extracted flight CSVs and return a shuffled train/test split.

    Parameters
    ----------
    data_dir : str
        Directory containing the ``nycflights`` CSV folder (as produced
        by :func:`flights`).
    fill_nan, one_hot_encod : bool
        Placeholders for future preprocessing options.

    Returns
    -------
    tuple
        ``(X_train, y_train, X_test, y_test)`` with ``DepDelay`` as target.

    Raises
    ------
    NotImplementedError
        If ``fill_nan`` or ``one_hot_encod`` is requested. Previously these
        flags only printed a message and then *skipped* the dropna /
        numeric-column filtering, silently yielding a frame with NaNs or
        non-numeric columns.
    """
    filenames = glob(os.path.join(data_dir, 'nycflights', '*.csv'))
    dataframes = [pd.read_csv(f) for f in filenames]

    frame = pd.concat(dataframes, axis=0, ignore_index=True)

    if fill_nan:
        raise NotImplementedError("fill_nan is not implemented yet")
    frame = frame.dropna()

    if one_hot_encod:
        raise NotImplementedError("one_hot_encod is not implemented yet")
    frame = frame.select_dtypes(['number'])

    # Shuffle; train_test_split shuffles again, but this preserves the
    # original behavior exactly.
    frame = frame.sample(frac=1)
    train, test = train_test_split(frame, test_size=0.2)

    # Separate features and target.
    X_train = train.drop(columns=['DepDelay'])
    X_test = test.drop(columns=['DepDelay'])

    y_train = train['DepDelay']
    y_test = test['DepDelay']

    return X_train, y_train, X_test, y_test
27 changes: 27 additions & 0 deletions Week 2/Day 3/Submissions/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import argparse
from loader import flights, build_dataset
from models import Classifiers

def set_args(parser):
    """Register the flights-demo command-line options on *parser* and return it."""
    option_specs = (
        ('--url', str,
         "https://storage.googleapis.com/dask-tutorial-data/nycflights.tar.gz",
         "URL to download dataset"),
        ('--data_dir', str, 'data', 'Directory to store dataset'),
        ('--rows_num', int, 10000, 'Number of rows to use'),
    )
    for flag, kind, default, description in option_specs:
        parser.add_argument(flag, default=default, type=kind, help=description)
    return parser

def main():
    """Entry point: fetch the data, build the dataset, and benchmark each model."""
    args = set_args(argparse.ArgumentParser()).parse_args()

    flights(args.url, args.data_dir, args.rows_num)

    dataset = build_dataset(args.data_dir)

    # Benchmark the dask-backed and the plain xgboost regressors in turn.
    for model_name in ("dask_xgboost", "xgboost"):
        Classifiers(model_name, dataset).run_clf()


if __name__ == '__main__':
    main()

103 changes: 103 additions & 0 deletions Week 2/Day 3/Submissions/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import xgboost as xgb
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import GridSearchCV
import dask_ml.xgboost
import pandas as pd
import numpy as np
import dask.dataframe as dd
from dask.distributed import Client
import dask_ml.model_selection
from scipy_utils import make_cluster
import time
import os
import csv

class Classifiers():
    """Train an XGBoost regressor either locally ("xgboost") or on a dask
    cluster ("dask_xgboost"), then append error/timing stats to stat_file.csv.
    """

    def __init__(self, name, dataset):
        # name: "xgboost" or "dask_xgboost"
        # dataset: (X_train, y_train, X_test, y_test) pandas objects
        self.name = name
        self.dataset = dataset

    def df2dd(self):
        """Convert the pandas dataset to single-partition dask arrays."""
        X_train, y_train, X_test, y_test = self.dataset
        X_train_df = dd.from_pandas(X_train, npartitions=1).to_dask_array()
        y_train_df = dd.from_pandas(y_train, npartitions=1).to_dask_array()
        X_test_df = dd.from_pandas(X_test, npartitions=1).to_dask_array()
        y_test_df = dd.from_pandas(y_test, npartitions=1).to_dask_array()

        return X_train_df, y_train_df, X_test_df, y_test_df

    def run_clf(self):
        """Run the selected model and append its stats to stat_file.csv."""
        if self.name == "xgboost":
            accuracy, training_time, grid_search_time = self.simple_model()
        elif self.name == "dask_xgboost":
            self.dataset = self.df2dd()
            accuracy, training_time, grid_search_time = self.dask_model()
        else:
            # Previously an unknown name fell through and crashed below
            # with UnboundLocalError on `accuracy`; fail loudly instead.
            raise ValueError("unknown model name: {}".format(self.name))

        # Append if the stats file already exists, otherwise create it.
        open_mode = 'a' if os.path.exists("stat_file.csv") else 'w'
        with open("stat_file.csv", open_mode, newline='') as f:
            writer = csv.writer(f)
            writer.writerow([self.name, " error: " + str(round(accuracy, 3)),
                             " training time: " + str(round(training_time, 3)),
                             " grid search time: " + str(round(grid_search_time, 3))])

    def dask_model(self):
        """Grid-search and train on a dask cluster; return (mae, train_s, grid_s)."""
        cluster = make_cluster()
        # The Client registers itself as the default scheduler for
        # subsequent dask operations.
        client = Client(cluster)

        X_train, y_train, X_test, y_test = self.dataset

        # Search parameters.
        grid_values = {'max_depth': [3, 4, 5], 'learning_rate': [0.1, 0.01, 0.05]}
        clf = xgb.XGBRegressor(objective='reg:squarederror', n_estimators=10)
        grid_clf = dask_ml.model_selection.GridSearchCV(clf, param_grid=grid_values, n_jobs=2)
        time_start = time.time()
        grid_clf.fit(X_train, y_train)
        grid_time_dif = time.time() - time_start

        best_params = grid_clf.best_params_

        # Retrain with the best parameters and time the final fit.
        time_start = time.time()
        clf = dask_ml.xgboost.XGBRegressor(objective='reg:squarederror', **best_params)
        best_clf = clf.fit(X_train, y_train)
        train_time_dif = time.time() - time_start
        predictions = best_clf.predict(X_test)
        mae = dask_ml.metrics.mean_absolute_error(y_test, predictions)
        # NOTE(review): client/cluster are left open, matching the original
        # script's behavior; consider closing them when run repeatedly.

        print("For dask_model mae is ", mae, ", train_time is ", train_time_dif)
        return mae, train_time_dif, grid_time_dif

    def simple_model(self):
        """Grid-search and train plain xgboost locally; return (mae, train_s, grid_s)."""
        X_train, y_train, X_test, y_test = self.dataset
        data_dmatrix = xgb.DMatrix(X_train, y_train)

        clf = xgb.XGBRegressor(objective='reg:squarederror', n_estimators=10)

        # Search parameters.
        grid_values = {'max_depth': [3, 4, 5], 'learning_rate': [0.1, 0.01, 0.05]}
        grid_clf = GridSearchCV(clf, param_grid=grid_values,
                                scoring="neg_mean_squared_error", cv=3, n_jobs=2)
        time_start = time.time()
        grid_clf.fit(X_train, y_train)
        grid_time_dif = time.time() - time_start

        # Train a model with the best parameters.
        time_start = time.time()
        best_clf = xgb.train(dtrain=data_dmatrix, params=grid_clf.best_params_)
        train_time_dif = time.time() - time_start

        # Predict on the held-out set and score with MAE.
        test_dmatrix = xgb.DMatrix(X_test, y_test)
        preds = best_clf.predict(test_dmatrix)
        mae = mean_absolute_error(y_test, preds)

        print("For simple_model mae is ", mae, ", train_time is ", train_time_dif)

        return mae, train_time_dif, grid_time_dif



4 changes: 4 additions & 0 deletions Week 2/Day 3/Submissions/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/bash
# Run the flights benchmark end-to-end (downloads the dataset on first run).
# `set -x` echoes each command before executing it, for easier debugging.
set -x

python3 main.py
9 changes: 9 additions & 0 deletions Week 2/Day 3/Submissions/scipy_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
def make_cluster(**kwargs):
    """Return a KubeCluster when dask_kubernetes is available, else a LocalCluster.

    Keyword arguments are forwarded to ``KubeCluster`` (with ``n_workers``
    defaulting to 9); they are ignored for the local fallback.
    """
    try:
        from dask_kubernetes import KubeCluster
    except ImportError:
        # No kubernetes support installed — fall back to an in-process cluster.
        from distributed.deploy.local import LocalCluster
        return LocalCluster()
    kwargs.setdefault('n_workers', 9)
    return KubeCluster(**kwargs)
2 changes: 2 additions & 0 deletions Week 2/Day 3/Submissions/stat_file.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
dask_xgboost, error: 4.227, training time: 120.383, grid search time: 59.537
xgboost, error: 7.454, training time: 7.004, grid search time: 96.15