-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathes3_map_reduce.cpp
93 lines (77 loc) · 2.66 KB
/
es3_map_reduce.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
#include <fstream>
#include <iostream>
#include <map>
#include <regex>
#include <string>
#include <utility>
#include <vector>
/// Input record handed to a mapper function: wraps a single line of text.
class SimpleMapperInput{
    std::string line;
public:
    /// Stores `line`; the argument is taken by value and moved into the member.
    /// Left implicit (non-explicit) so existing call sites keep compiling.
    SimpleMapperInput(std::string line):line(std::move(line)){}

    /// Returns a copy of the stored line.
    /// Const-qualified so it can be called through const objects and const references.
    std::string getLine() const {return line;}
};
/// Key/value pair produced by a mapper or reducer.
///
/// @tparam T type of the value associated with the key.
template<typename T>
class SimpleResult{
    std::string key;
    // Value-initialized so that the default and key-only constructors never
    // leave a built-in T (e.g. int) indeterminate; getValue() is read on such
    // objects when they serve as the initial accumulator for a new key.
    T value{};
public:
    /// Empty key, value-initialized value.
    SimpleResult() = default;

    /// Result for `key` with a value-initialized value (0 for arithmetic T).
    SimpleResult(std::string key) :key(std::move(key)){}

    /// Result for `key` carrying `value`.
    SimpleResult(std::string key, T value):
        key(std::move(key)),value(std::move(value)){}

    /// Returns a copy of the key.
    std::string getKey() const {return key;}

    /// Returns a copy of the value.
    T getValue() const {return value;}
};
/// Input record handed to a reducer function: a key, the newly mapped value
/// for that key, and the accumulator built so far for that key.
///
/// @tparam V type of the mapped value.
/// @tparam A type of the accumulator.
template<typename V, typename A>
class SimpleReducerInput {
    std::string key;
    V value;
    A accumulator;
public:
    /// Stores all three components; each argument is taken by value and moved in.
    SimpleReducerInput(std::string key, V value, A accumulator):
        key(std::move(key)),value(std::move(value)), accumulator(std::move(accumulator)){}

    // Getters are const-qualified so they work on const objects and const references.

    /// Returns a copy of the key.
    std::string getKey() const {return key;}

    /// Returns a copy of the mapped value.
    V getValue() const {return value;}

    /// Returns a copy of the accumulator.
    A getAccumulator() const { return accumulator;}
};
/// Mapper: emits one (key, 1) result per input line, where the key is the
/// leading run of non-space characters of the line.
/// NOTE(review): the name suggests that leading field is a client IP address
/// in an access log — confirm against the actual input format.
///
/// @tparam MapperInputT type exposing `getLine()` returning the line text.
/// @tparam ResultT      type constructible from (std::string, int).
/// @param input  record wrapping the line to process.
/// @return a single-element vector for a matching line; an empty vector when
///         the line is empty or starts with a space, so that no result with
///         an empty key is produced.
template<typename MapperInputT, typename ResultT>
std::vector<ResultT> mymap_ip(MapperInputT input){
    // Compiled once per instantiation instead of on every call: building a
    // std::regex is far more expensive than running this simple pattern.
    static const std::regex first_field("^[^ ]+");

    const std::string line = input.getLine();
    std::smatch ip_match;
    if(!std::regex_search(line, ip_match, first_field)){
        return {};
    }

    std::vector<ResultT> mapped;
    mapped.emplace_back(ip_match.str(0), 1);
    return mapped;
}
/// Reducer: folds one mapped value into the running accumulator of its key.
///
/// @tparam ReducerInput type exposing `getKey()`, `getValue()` and `getAccumulator()`.
/// @tparam ResultT      type constructible from (key, value + accumulator).
/// @param input  key, new value and current accumulator.
/// @return a result for the same key whose value is `value + accumulator`.
template<typename ReducerInput, typename ResultT>
ResultT myreduce_ip(ReducerInput input){
    auto result_key = input.getKey();
    auto total = input.getValue() + input.getAccumulator();
    return ResultT(std::move(result_key), std::move(total));
}
/// Runs a sequential map/reduce over the lines of a text file.
///
/// Each line is wrapped in a MapperInputT and passed to `mapfun`. Every result
/// it returns is combined by `reducefun` with the accumulator currently stored
/// for that result's key, and the reducer's output replaces the stored entry.
/// For a key seen for the first time the accumulator is `ResultT(key).getValue()`.
///
/// @tparam MapperInputT  constructible from a std::string (one line).
/// @tparam ResultT       default-constructible, constructible from a key,
///                       exposing `getKey()` and `getValue()`.
/// @tparam ReducerInputT constructible from (key, value, accumulator).
/// @param inputf     path of the file to read.
/// @param mapfun     callable: MapperInputT -> std::vector<ResultT>.
/// @param reducefun  callable: ReducerInputT -> ResultT.
/// @return map from key to the final reduced result; empty when the file
///         cannot be opened (a diagnostic is written to stderr in that case).
template<typename MapperInputT, typename ResultT, typename ReducerInputT, typename M, typename R>
std::map<std::string, ResultT> mapreduce(const std::string& inputf, M &mapfun, R &reducefun){
    std::map<std::string, ResultT> results;

    std::ifstream ifs(inputf);
    if(!ifs.is_open()){
        // Diagnostics go to stderr so they never mix with results that the
        // caller prints on stdout; there is nothing to read, so stop here.
        std::cerr<<"unable to open file"<<std::endl;
        return results;
    }

    std::string line;
    while(std::getline(ifs, line)){
        std::vector<ResultT> m_results = mapfun(MapperInputT(line));
        // Non-const reference: avoids copying each result while still
        // allowing ResultT getters that are not const-qualified.
        for(auto &r: m_results){
            const auto key = r.getKey();
            auto it = results.find(key);
            ResultT acc = it == results.end() ? ResultT(key) : it->second;
            ResultT new_acc = reducefun(ReducerInputT(key, r.getValue(), acc.getValue()));
            // The reducer decides under which key its output is stored.
            auto new_key = new_acc.getKey();
            results[new_key] = std::move(new_acc);
        }
    }
    return results;
}
/// Entry point: runs the map/reduce with `mymap_ip` / `myreduce_ip` over a log
/// file and prints one "key: count" line per distinct key to stdout.
///
/// The log path may be passed as the first command-line argument; when it is
/// omitted the original default "../input.log" is used.
int main(int argc, char* argv[]) {
    using ResultT = SimpleResult<int>;
    using ReducerInputT = SimpleReducerInput<int, int>;

    const std::string input_path = argc > 1 ? argv[1] : "../input.log";

    auto results = mapreduce<SimpleMapperInput, ResultT , ReducerInputT >(
            input_path,
            mymap_ip<SimpleMapperInput, ResultT>,
            myreduce_ip<ReducerInputT, ResultT >);

    // Non-const reference so this works whether or not getValue() is const.
    for(auto &x: results){
        // '\n' instead of std::endl: no need to flush after every line.
        std::cout<<x.first<<": "<<x.second.getValue()<<'\n';
    }
    return 0;
}