-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDistanceFinder.java
147 lines (100 loc) · 4.87 KB
/
DistanceFinder.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
public class DistanceFinder {
private static class DFMapper1 extends Mapper<LongWritable, Text, LongWritable, Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] vals = value.toString().split("\\t", 2);
context.write(new LongWritable(Long.parseLong(vals[0])), new Text(vals[1]));
}
}
private static class DFReducer1 extends Reducer<LongWritable, Text, LongWritable, Text> {
@Override
protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
int m = context.getConfiguration().getInt("mw", -1);
double[] ai = new double[m];
double[] bi = new double[m];
int i = (int)key.get() - 1;
for (Text value : values) {
if (value.toString().contains("\t")) {
String[] keyVal = value.toString().split("\\t");
ai[Integer.parseInt(keyVal[0]) - 1] = Double.parseDouble(keyVal[1]);
} else {
String[] vals = value.toString().split(",");
for (int j = 0; j < m; j++) {
bi[j] = Double.parseDouble(vals[j]);
}
}
}
double difference = 0d;
for (int j = 0; j < m; j++) {
difference += Math.pow(ai[j] - bi[j], 2d);
}
context.write(new LongWritable(i + 1), new Text(difference + ""));
}
}
private static class DFMapper2 extends Mapper<LongWritable, Text, LongWritable, Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] vals = value.toString().split("\\t", 2);
context.write(new LongWritable(1), new Text(vals[1]));
}
}
private static class DFReducer2 extends Reducer<LongWritable, Text, LongWritable, Text> {
@Override
protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
double sum = 0d;
for (Text value : values) {
sum += Double.parseDouble(value.toString());
}
double result = Math.sqrt(sum);
context.write(new LongWritable(1), new Text(result + ""));
}
}
private Configuration configuration;
private String Xpath;
private String FGPath;
private String outputDir;
private String resultFileName;
public DistanceFinder(Configuration configuration, String XPath, String X1Path, String outputDir, String resultFileName) {
this.configuration = configuration;
this.Xpath = XPath;
this.FGPath = X1Path;
this.outputDir = outputDir;
this.resultFileName = resultFileName;
}
private Double getResult(FileSystem fileSystem, Path file) throws IOException {
FSDataInputStream input = fileSystem.open(file);
String line = new BufferedReader(new InputStreamReader(input)).readLine();
return Double.parseDouble(line.split("\t")[1]);
}
public Double run() throws IOException, ClassNotFoundException, InterruptedException {
String wd = configuration.get("wd");
Path wdPath = new Path(wd);
Job job = Job.getInstance(configuration, "DF1");
Path tmpPath = new Path(configuration.get("wd"), "dftmp");
job.setJarByClass(MRNMF.class);
FileInputFormat.addInputPath(job, new Path(Xpath));
MatrixUpdater.addInpuPath(job, new Path(FGPath));
FileOutputFormat.setOutputPath(job, tmpPath);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setMapperClass(DFMapper1.class);
job.setReducerClass(DFReducer1.class);
job.waitForCompletion(true);
job = Job.getInstance(configuration, "DF2");
job.setJarByClass(MRNMF.class);
MatrixUpdater.addInpuPath(job, tmpPath);
FileOutputFormat.setOutputPath(job, new Path(wdPath, outputDir));
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setMapperClass(DFMapper2.class);
job.setReducerClass(DFReducer2.class);
job.waitForCompletion(true);
FileSystem wdfs = wdPath.getFileSystem(new Configuration());
FileUtil.copyMerge(wdfs, new Path(wd, outputDir), wdfs, new Path(wd, resultFileName), false, new Configuration(), "");
return getResult(wdfs, new Path(wd, resultFileName));
}
}