Skip to content

Commit bcbbff5

Browse files
committed
Use a consistent mechanism of record keeping and record fetching across models
1 parent c94ebe9 commit bcbbff5

18 files changed

Lines changed: 669 additions & 1102 deletions

sim/src/models/batcher.rs

Lines changed: 113 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::f64::INFINITY;
33
use serde::{Deserialize, Serialize};
44

55
use super::model_trait::{DevsModel, Reportable, ReportableModel, SerializableModel};
6-
use super::ModelMessage;
6+
use super::{ModelMessage, ModelRecord};
77
use crate::simulator::Services;
88
use crate::utils::errors::SimulationError;
99

@@ -26,6 +26,8 @@ pub struct Batcher {
2626
max_batch_time: f64,
2727
max_batch_size: usize,
2828
#[serde(default)]
29+
store_records: bool,
30+
#[serde(default)]
2931
state: State,
3032
}
3133

@@ -47,6 +49,7 @@ struct State {
4749
phase: Phase,
4850
until_next_event: f64,
4951
jobs: Vec<String>,
52+
records: Vec<ModelRecord>,
5053
}
5154

5255
impl Default for State {
@@ -55,6 +58,7 @@ impl Default for State {
5558
phase: Phase::Passive,
5659
until_next_event: INFINITY,
5760
jobs: Vec::new(),
61+
records: Vec::new(),
5862
}
5963
}
6064
}
@@ -72,83 +76,168 @@ impl Batcher {
7276
job_out_port: String,
7377
max_batch_time: f64,
7478
max_batch_size: usize,
79+
store_records: bool,
7580
) -> Self {
7681
Self {
7782
ports_in: PortsIn { job: job_in_port },
7883
ports_out: PortsOut { job: job_out_port },
7984
max_batch_time,
8085
max_batch_size,
86+
store_records,
8187
state: State::default(),
8288
}
8389
}
8490

85-
fn add_to_batch(&mut self, incoming_message: &ModelMessage) -> Result<(), SimulationError> {
91+
fn add_to_batch(
92+
&mut self,
93+
incoming_message: &ModelMessage,
94+
services: &mut Services,
95+
) -> Result<(), SimulationError> {
8696
self.state.phase = Phase::Batching;
8797
self.state.jobs.push(incoming_message.content.clone());
98+
self.record(
99+
services.global_time(),
100+
String::from("Arrival"),
101+
incoming_message.content.clone(),
102+
);
88103
Ok(())
89104
}
90105

91-
fn start_batch(&mut self, incoming_message: &ModelMessage) -> Result<(), SimulationError> {
106+
fn start_batch(
107+
&mut self,
108+
incoming_message: &ModelMessage,
109+
services: &mut Services,
110+
) -> Result<(), SimulationError> {
92111
self.state.phase = Phase::Batching;
93112
self.state.until_next_event = self.max_batch_time;
94113
self.state.jobs.push(incoming_message.content.clone());
114+
self.record(
115+
services.global_time(),
116+
String::from("Arrival"),
117+
incoming_message.content.clone(),
118+
);
95119
Ok(())
96120
}
97121

98-
fn fill_batch(&mut self, incoming_message: &ModelMessage) -> Result<(), SimulationError> {
122+
fn fill_batch(
123+
&mut self,
124+
incoming_message: &ModelMessage,
125+
services: &mut Services,
126+
) -> Result<(), SimulationError> {
99127
self.state.phase = Phase::Release;
100128
self.state.until_next_event = 0.0;
101129
self.state.jobs.push(incoming_message.content.clone());
130+
self.record(
131+
services.global_time(),
132+
String::from("Arrival"),
133+
incoming_message.content.clone(),
134+
);
102135
Ok(())
103136
}
104137

105-
fn release_full_queue(&mut self) -> Result<Vec<ModelMessage>, SimulationError> {
138+
fn release_full_queue(
139+
&mut self,
140+
services: &mut Services,
141+
) -> Result<Vec<ModelMessage>, SimulationError> {
106142
self.state.phase = Phase::Passive;
107143
self.state.until_next_event = INFINITY;
108144
Ok((0..self.state.jobs.len())
109-
.map(|_| ModelMessage {
110-
port_name: self.ports_out.job.clone(),
111-
content: self.state.jobs.remove(0),
145+
.map(|_| {
146+
self.record(
147+
services.global_time(),
148+
String::from("Departure"),
149+
self.state.jobs[0].clone(),
150+
);
151+
ModelMessage {
152+
port_name: self.ports_out.job.clone(),
153+
content: self.state.jobs.remove(0),
154+
}
112155
})
113156
.collect())
114157
}
115158

116-
fn release_partial_queue(&mut self) -> Result<Vec<ModelMessage>, SimulationError> {
159+
fn release_partial_queue(
160+
&mut self,
161+
services: &mut Services,
162+
) -> Result<Vec<ModelMessage>, SimulationError> {
117163
self.state.phase = Phase::Batching;
118164
self.state.until_next_event = self.max_batch_time;
119165
Ok((0..self.max_batch_size)
120-
.map(|_| ModelMessage {
121-
port_name: self.ports_out.job.clone(),
122-
content: self.state.jobs.remove(0),
166+
.map(|_| {
167+
self.record(
168+
services.global_time(),
169+
String::from("Departure"),
170+
self.state.jobs[0].clone(),
171+
);
172+
ModelMessage {
173+
port_name: self.ports_out.job.clone(),
174+
content: self.state.jobs.remove(0),
175+
}
123176
})
124177
.collect())
125178
}
179+
180+
fn release_multiple(
181+
&mut self,
182+
services: &mut Services,
183+
) -> Result<Vec<ModelMessage>, SimulationError> {
184+
self.state.phase = Phase::Release;
185+
self.state.until_next_event = 0.0;
186+
Ok((0..self.max_batch_size)
187+
.map(|_| {
188+
self.record(
189+
services.global_time(),
190+
String::from("Departure"),
191+
self.state.jobs[0].clone(),
192+
);
193+
ModelMessage {
194+
port_name: self.ports_out.job.clone(),
195+
content: self.state.jobs.remove(0),
196+
}
197+
})
198+
.collect())
199+
}
200+
201+
fn record(&mut self, time: f64, action: String, subject: String) {
202+
if self.store_records {
203+
self.state.records.push(ModelRecord {
204+
time,
205+
action,
206+
subject,
207+
})
208+
}
209+
}
126210
}
127211

128212
impl DevsModel for Batcher {
129213
fn events_ext(
130214
&mut self,
131215
incoming_message: &ModelMessage,
132-
_services: &mut Services,
216+
services: &mut Services,
133217
) -> Result<(), SimulationError> {
134218
match (
135219
&self.state.phase,
136220
self.state.jobs.len() + 1 < self.max_batch_size,
137221
) {
138-
(Phase::Batching, true) => self.add_to_batch(incoming_message),
139-
(Phase::Passive, true) => self.start_batch(incoming_message),
222+
(Phase::Batching, true) => self.add_to_batch(incoming_message, services),
223+
(Phase::Passive, true) => self.start_batch(incoming_message, services),
140224
(Phase::Release, true) => Err(SimulationError::InvalidModelState),
141-
(_, false) => self.fill_batch(incoming_message),
225+
(_, false) => self.fill_batch(incoming_message, services),
142226
}
143227
}
144228

145229
fn events_int(
146230
&mut self,
147-
_services: &mut Services,
231+
services: &mut Services,
148232
) -> Result<Vec<ModelMessage>, SimulationError> {
149-
match self.state.jobs.len() <= self.max_batch_size {
150-
true => self.release_full_queue(),
151-
false => self.release_partial_queue(),
233+
match (
234+
self.state.jobs.len() <= self.max_batch_size,
235+
self.state.jobs.len() >= 2 * self.max_batch_size,
236+
) {
237+
(true, false) => self.release_full_queue(services),
238+
(false, true) => self.release_multiple(services),
239+
(false, false) => self.release_partial_queue(services),
240+
(true, true) => Err(SimulationError::InvalidModelState),
152241
}
153242
}
154243

@@ -169,6 +258,10 @@ impl Reportable for Batcher {
169258
Phase::Release => String::from("Releasing batch"),
170259
}
171260
}
261+
262+
fn records(&self) -> &Vec<ModelRecord> {
263+
&self.state.records
264+
}
172265
}
173266

174267
impl ReportableModel for Batcher {}

0 commit comments

Comments
 (0)