-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDriverInputT.cpp
60 lines (46 loc) · 1.93 KB
/
DriverInputT.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
//
// Created by Matilde Pulidori on 24/05/2020.
//
#include "DriverInputT.h"

#include <utility>
// Builds a driver that will read its input from the file named `input`.
// `input` is taken by value and moved into the member so that callers passing
// a temporary pay for no extra string copy.
DriverInputT::DriverInputT(std::string input): filename(std::move(input)) {}
// Nothing to release by hand here: the destructor body is empty, so the
// compiler-generated member cleanup is all that runs.
DriverInputT::~DriverInputT() = default;
std::string DriverInputT::getFilename() {
return this->filename;
}
// Runs a sequential map-reduce over the file returned by getFilename().
//
// Every line of the file is wrapped in a MapperInputT and passed to
// `mapfunct`. Each ResultT it produces is then handed to `reducefunct` as a
// ReducerInputT built from the result's key, the result's value and the value
// currently accumulated under that key. The reducer's output replaces the
// accumulator stored under the key that the output itself reports.
//
// Returns the map of accumulators, keyed by string. If the file cannot be
// opened, a diagnostic is written to std::cerr and an empty map is returned.
std::map<std::string, ResultT> DriverInputT::mapreduce(DriverInputT::mapfun<MapperInputT, ResultT> mapfunct,
                                                       DriverInputT::reducefun<ReducerInputT, ResultT> reducefunct) {
    std::map<std::string, ResultT> accumulators;

    std::ifstream ifs(this->getFilename());
    if (!ifs.is_open()) {
        // Diagnostics belong on stderr so they never mix with program output;
        // bail out now instead of running the read loop on a failed stream.
        std::cerr << "unable to open file" << std::endl;
        return accumulators;
    }

    std::string line;
    while (std::getline(ifs, line)) {
        std::vector<ResultT> results = mapfunct(MapperInputT(line));
        // Bind by reference: the results are only read, no per-element copy needed.
        for (ResultT &t : results) {
            // operator[] default-constructs the accumulator the first time a
            // key is seen, so the reducer always receives a "current" value.
            ResultT newResult = reducefunct(
                    ReducerInputT(t.getKey(), t.getValue(), accumulators[t.getKey()].getValue()));
            accumulators[newResult.getKey()] = newResult;
        }
    }
    return accumulators;
}
// Pipe-based variant of the map-reduce driver over the file returned by
// getFilename().
//
// For every line of the file a MapperInputT is written to `mapper_pipe` and a
// vector of ResultT is read back from it. For each of those results a
// ReducerInputT (result key, result value, value currently accumulated under
// that key) is written to `reducer_pipe`, and the ResultT read back replaces
// the accumulator stored under the key that it reports.
//
// NOTE(review): `mapfunct` and `reducefunct` are not invoked in this body;
// presumably the processes behind the pipes run them -- confirm.
// NOTE(review): write_pipe, read_from_pipe, mapper_pipe and reducer_pipe are
// declared outside this file; their exact types and blocking behaviour cannot
// be seen from here.
//
// Returns the map of accumulators, keyed by string. If the file cannot be
// opened, a diagnostic is written to std::cerr and an empty map is returned.
std::map<std::string, ResultT> DriverInputT::mapreducePipe(DriverInputT::mapfun<MapperInputT, ResultT> mapfunct,
                                                           DriverInputT::reducefun<ReducerInputT, ResultT> reducefunct) {
    std::map<std::string, ResultT> accumulators;

    std::ifstream ifs(this->getFilename());
    if (!ifs.is_open()) {
        // Diagnostics belong on stderr so they never mix with program output;
        // bail out now instead of running the read loop on a failed stream.
        std::cerr << "unable to open file" << std::endl;
        return accumulators;
    }

    std::string line;
    while (std::getline(ifs, line)) {
        // Each write is immediately followed by the matching read; keep this
        // order, the exchange over the pipe depends on it.
        write_pipe(mapper_pipe, MapperInputT(line));
        std::vector<ResultT> results = read_from_pipe(mapper_pipe);
        // Bind by reference: the results are only read, no per-element copy needed.
        for (ResultT &t : results) {
            // operator[] default-constructs the accumulator the first time a
            // key is seen, so the reducer always receives a "current" value.
            write_pipe(reducer_pipe, ReducerInputT(t.getKey(), t.getValue(), accumulators[t.getKey()].getValue()));
            ResultT newResult = read_from_pipe(reducer_pipe);
            accumulators[newResult.getKey()] = newResult;
        }
    }
    return accumulators;
}