-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathsimulate.py
More file actions
217 lines (170 loc) · 8.59 KB
/
simulate.py
File metadata and controls
217 lines (170 loc) · 8.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
from multiprocessing import Pool
from typing import List, Iterator, Tuple
import numpy as np
import pandas as pd
import simpy
import os
from datetime import timedelta
from leaf.infrastructure import Infrastructure, Node
from leaf.power import PowerMeter, PowerModelNode
from strategy import Strategy, BidirectionalStrategy, AdHocStrategy
MEASUREMENT_INTERVAL = 30 # mins
ERROR_REPETITIONS = 10 # Repetitions for each run with random noise that simulates forecast errors
COUNTRIES = ["cal", "gb", "ger", "fr"]
Job = Tuple[int, int, int] # id, arrival time, duration
def main(node, ci, jobs: List[Job], strategy: Strategy):
    """Simulate the datacenter executing `jobs` under `strategy`.

    Runs for one timestep per entry in `ci` and returns the node's power
    draw (W) sampled every MEASUREMENT_INTERVAL minutes as a numpy array.
    """
    sim = simpy.Environment()
    sim.process(datacenter_process(sim, jobs, strategy))
    meter = PowerMeter(node, measurement_interval=MEASUREMENT_INTERVAL)
    # Small delay so the meter samples just after state changes
    sim.process(meter.run(sim, delay=0.01))
    sim.run(until=MEASUREMENT_INTERVAL * len(ci))
    # One Watt reading per measurement step
    return np.array([float(sample) for sample in meter.measurements])
def _print_analysis(consumption_timeline, ci):
    """Report total energy use, CO2 emissions and average CO2 intensity.

    `consumption_timeline` holds one power value (W) per timestep; `ci`
    holds the matching carbon intensity series (gCO2e/kWh).
    """
    # W per step -> kWh over the whole run
    consumed_kwh = sum(consumption_timeline) * MEASUREMENT_INTERVAL / 60 / 1000
    # gCO2/timestep = gCO2/kWh * W / 1000 / steps_per_hour
    per_step_gco2 = np.multiply(ci.values, consumption_timeline) * MEASUREMENT_INTERVAL / 60 / 1000
    emitted_gco2 = sum(per_step_gco2)
    print(f"- Consumed {consumed_kwh:.2f} kWh and emitted {emitted_gco2 / 1000:.2f} kg CO2e.")
    print(f"- Average CO2 intensity of used energy was {emitted_gco2 / consumed_kwh:.2f} gCO2e/kWh.")
def datacenter_process(env: simpy.Environment, jobs: Iterator[Job], strategy: Strategy):
    """Submit each job to `strategy` at its (possibly shifted) arrival time.

    Bidirectional strategies may start work up to `window_in_minutes` before
    the nominal arrival, so submission is moved forward by that amount.
    Jobs whose shifted submission time already lies in the past are skipped.
    """
    for _, arrival_time, duration in jobs:
        wait = arrival_time - env.now
        if isinstance(strategy, BidirectionalStrategy):
            wait -= strategy.window_in_minutes
        if wait >= 0:
            yield env.timeout(wait)
            env.process(strategy.run(env, duration))
def adhoc_worker(node, jobs, ci, error, seed, forecast_method, interruptible):
    """Run one ad-hoc simulation; forecast noise is applied only to the
    strategy's CI signal, while the returned timeline uses the true CI."""
    noisy_ci = _apply_error(ci, error, seed)
    strategy = AdHocStrategy(
        node=node,
        ci=noisy_ci,
        interval=MEASUREMENT_INTERVAL,
        forecast=forecast_method,
        interruptible=interruptible,
    )
    return main(node, ci, jobs, strategy)
def bidirectional_worker(node, jobs, ci, error, seed, window):
    """Run one periodic simulation and return the mean daily emissions.

    The strategy sees a noisy CI forecast; the emission score sums the true
    CI at every step where the node was active, averaged over the year.
    """
    noisy_ci = _apply_error(ci, error, seed)
    strategy = BidirectionalStrategy(
        node=node,
        ci=noisy_ci,
        window_in_minutes=window * 30,
        interval=MEASUREMENT_INTERVAL,
    )
    activity = main(node, ci, jobs, strategy)
    return ci[activity == 1].sum() / 365
def periodic_experiment(error, max_steps_window: int = 17):
    """Scenario I: a daily 1am job, shifted within growing flexibility windows.

    Args:
        error: Relative std of forecast noise (falsy disables noise); when
            set, each window is averaged over ERROR_REPETITIONS seeded runs.
        max_steps_window: Number of windows evaluated; window w permits
            shifting by +-w * 30 minutes.

    Writes results/periodic_{error}.csv with one column per country.
    """
    result = {}
    for country in COUNTRIES:
        print(f"Running periodic experiment for {country} with error {error}.")
        ci = _load_dataset(f"data/{country}_ci.csv")
        # Downsample CI to the measurement interval
        ci = ci[ci.index.minute % MEASUREMENT_INTERVAL == 0]
        infrastructure = Infrastructure()
        node = Node("dc", power_model=PowerModelNode(power_per_cu=1))
        infrastructure.add_node(node)
        # One job per day of the year, arriving at 1am (minute 60)
        jobs: List[Job] = [(i, time + 60, MEASUREMENT_INTERVAL) for i, time in enumerate(range(0, 1440 * 365, 1440))]
        window_results = []
        for window in range(max_steps_window):
            if error:
                # Average several runs with different random forecast errors
                bidirectional_args = ((node, jobs, ci, error, seed, window) for seed in range(ERROR_REPETITIONS))
                with Pool(ERROR_REPETITIONS) as pool:
                    repeat_results = pool.starmap(bidirectional_worker, bidirectional_args)
                window_results.append(np.array(repeat_results).mean())
            else:
                window_results.append(bidirectional_worker(node, jobs, ci, error, None, window))
        result[country] = window_results
    # exist_ok avoids the check-then-create race of exists() + makedirs()
    os.makedirs("results", exist_ok=True)
    with open(f"results/periodic_{error}.csv", "w") as csvfile:
        pd.DataFrame(result).to_csv(csvfile, index=False)
def ml_experiment():
    """Scenario II: ad-hoc ML training jobs under different shifting policies."""
    ml_jobs = generate_batch_jobs(
        n=3387,
        min_duration=4 * 60,
        max_duration=90 * 60,
        workday_start=9 * 60,
        workday_end=17 * 60,
        workdays_only=True,
    )
    for country in COUNTRIES:
        # Carbon intensity data, downsampled to the measurement interval
        ci = _load_dataset(f"data/{country}_ci.csv")
        ci = ci[ci.index.minute % MEASUREMENT_INTERVAL == 0]

        # Baseline: run every job immediately, uninterrupted, no forecast
        ad_hoc_experiment("ml", country, ml_jobs, ci, forecast_method=0, interruptible=False, error=False)

        # Grid of interruptibility x forecast error x forecast method
        for interruptible in [True, False]:
            for error in [0, 0.05, 0.1]:
                for forecast_method in ["next_workday", "semi_weekly"]:
                    ad_hoc_experiment("ml", country, ml_jobs, ci,
                                      forecast_method=forecast_method,
                                      interruptible=interruptible,
                                      error=error)
def ad_hoc_experiment(experiment_name: str,
                      country: str,
                      jobs: List,
                      ci,
                      forecast_method,
                      interruptible: bool,
                      error: float):
    """Run one ad-hoc shifting experiment and store its timeline as CSV.

    Args:
        experiment_name: Prefix for the result file name.
        country: Country code (only used in the result file name).
        jobs: List of (id, arrival time, duration) tuples.
        ci: Carbon intensity series; its index defines the simulated timeline.
        forecast_method: Forecast setting passed through to AdHocStrategy.
        interruptible: Whether running jobs may be paused and resumed.
        error: Relative std of forecast noise; falsy means a single noiseless
            run, otherwise ERROR_REPETITIONS seeded runs are averaged.
    """
    # Fix: the log message previously lacked the closing parenthesis
    print(f"ad_hoc_experiment({experiment_name}, forecast_method={forecast_method}, "
          f"interruptible={interruptible}, error={error})")
    # Build infrastructure
    infrastructure = Infrastructure()
    node = Node("dc", power_model=PowerModelNode(power_per_cu=1))
    infrastructure.add_node(node)
    # Run experiment(s)
    if error:
        # Average several runs with different random forecast errors
        adhoc_args = ((node, jobs, ci, error, seed, forecast_method, interruptible) for seed in range(ERROR_REPETITIONS))
        with Pool(ERROR_REPETITIONS) as pool:
            worker_results = pool.starmap(adhoc_worker, adhoc_args)
        timeline = np.mean(worker_results, axis=0)
    else:
        timeline = adhoc_worker(node, jobs, ci, error, None, forecast_method, interruptible)
    # Store results; ensure the output directory exists even when this runs
    # standalone (previously only periodic_experiment created it)
    os.makedirs("results", exist_ok=True)
    df = pd.DataFrame({"active_jobs": timeline, "ci": ci, "emissions": ci * timeline}, index=ci.index)
    i = "_i" if interruptible else ""
    e = f"_{error}" if error else ""
    with open(f"results/{experiment_name}_{forecast_method}{e}{i}_{country}.csv", "w") as csvfile:
        df.to_csv(csvfile)
def generate_batch_jobs(n: int,
                        min_duration: int,
                        max_duration: int,
                        workday_start: int,
                        workday_end: int,
                        workdays_only: bool) -> List[Job]:
    """Generate `n` randomly placed batch jobs across the year 2020.

    Start times fall within [workday_start, workday_end) minutes of a day,
    durations within [min_duration, max_duration), both in
    MEASUREMENT_INTERVAL steps. Returns (id, arrival minute, duration) tuples.
    """
    rng = np.random.default_rng(0)  # fixed seed -> reproducible workload
    steps = pd.date_range("2020-01-01", "2021-01-01", freq="30min")
    possible_durations = range(min_duration, max_duration, MEASUREMENT_INTERVAL)
    possible_start_times = range(workday_start, workday_end, MEASUREMENT_INTERVAL)

    days = pd.date_range("2020-01-01", "2020-12-31", freq="D")
    if workdays_only:
        usable_days = [day for day in days if day.weekday() < 5]
    else:
        usable_days = days

    # Spread the n jobs uniformly at random over the usable days
    jobs_per_day = rng.multinomial(n, [1 / len(usable_days)] * len(usable_days))
    daily_counts = pd.Series(jobs_per_day, index=usable_days).reindex(days, fill_value=0)

    arrival_times = []
    for day, count in daily_counts.items():
        # Sorted so job ids increase with arrival time within each day
        start_minutes = np.sort(rng.choice(possible_start_times, size=count))
        for minute in start_minutes:
            timestamp = day + timedelta(minutes=int(minute))
            arrival_times.append(steps.get_loc(timestamp) * MEASUREMENT_INTERVAL)

    durations = rng.choice(possible_durations, size=n)
    return [(job_id, arrival, duration)
            for job_id, (arrival, duration) in enumerate(zip(arrival_times, durations))]
def _load_dataset(filename):
with open(filename, "r") as csvfile:
ci = pd.read_csv(csvfile, index_col=0, parse_dates=True)["Carbon Intensity"]
# print(f"Average CO2 intensity of the year is {ci.mean():.2f} gCO2e/kWh.")
return ci["2020-01-01 00:00:00":]
def _apply_error(ci, error, seed):
if error is None:
return ci
rng = np.random.default_rng(seed)
return ci + rng.normal(0, error * ci.mean(), size=len(ci))
if __name__ == '__main__':
    # Scenario I: periodic daily jobs, without and with forecast errors
    print("Starting Scenario 1...")
    for scenario_error in (0, 0.05):
        periodic_experiment(error=scenario_error)
    # Scenario II: ad-hoc ML training jobs
    print("Starting Scenario 2...")
    ml_experiment()