Skip to content

Commit f50e276

Browse files
committed
Use a special counter job.getCounters().findCounter("Derived triples",
"writeJustificationToMapReduceContext").getValue();
1 parent 095e5f2 commit f50e276

File tree

2 files changed

+155
-57
lines changed

2 files changed

+155
-57
lines changed

mrj-0.1/src/cn/edu/neu/mitt/mrj/io/dbs/CassandraDB.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -271,9 +271,12 @@ public Cassandra.Iface getDBClient(){
271271
* Get the row count according to the triple type.
272272
* @return row count.
273273
*/
274-
public long getRowCountAccordingTripleType(int tripletype){
274+
public long getRowCountAccordingTripleType(int tripletype, int limit){
275275
String query = "SELECT COUNT(*) FROM " + KEYSPACE + "." + COLUMNFAMILY_JUSTIFICATIONS +
276276
" WHERE " + COLUMN_TRIPLE_TYPE + " = " + tripletype;
277+
278+
if (limit > 0)
279+
query = query + " LIMIT " + limit;
277280

278281
long num = 0;
279282
try {
@@ -415,6 +418,8 @@ public static void writeJustificationToMapReduceContext(
415418
variables.add(ByteBufferUtil.bytes(source.getStep())); // It corresponds to COLUMN_INFERRED_STEPS where steps = 0 means an original triple
416419
context.write(keys, variables);
417420

421+
// Added by WuGang, 2015-06-04
422+
context.getCounter("Derived triples", "writeJustificationToMapReduceContext").increment(1);
418423
}
419424

420425
public boolean loadSetIntoMemory(Set<Long> schemaTriples, Set<Integer> filters, int previousStep) throws IOException, InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException {
@@ -582,7 +587,7 @@ public Map<Long, Collection<Long>> loadMapIntoMemory(Set<Integer> filters) throw
582587
return loadMapIntoMemory(filters, false);
583588
}
584589

585-
// 返回的key就是triple的subject,value是object
590+
// ���ص�key����triple��subject��value��object
586591
public Map<Long, Collection<Long>> loadMapIntoMemory(Set<Integer> filters, boolean inverted) throws IOException, InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException {
587592
long startTime = System.currentTimeMillis();
588593

@@ -665,7 +670,7 @@ public static void main(String[] args) {
665670
System.out.println(schemaTriples);
666671

667672
//modified 2015/5/19
668-
System.out.println("Transitive: " + db.getRowCountAccordingTripleType(TriplesUtils.TRANSITIVE_TRIPLE));
673+
System.out.println("Transitive: " + db.getRowCountAccordingTripleType(TriplesUtils.TRANSITIVE_TRIPLE, 0));
669674

670675
System.exit(0);
671676
} catch (TTransportException e) {

mrj-0.1/src/cn/edu/neu/mitt/mrj/reasoner/owl/OWLReasoner.java

Lines changed: 147 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public class OWLReasoner extends Configured implements Tool {
3737
public static final String OWL_PROP_INHERITANCE_TMP = "/dir-tmp-prop-inheritance/";
3838
public static final String OWL_PROP_INHERITANCE = "/dir-prop-inheritance/";
3939
public static final String OWL_TRANSITIVITY_BASE = OWL_PROP_INHERITANCE_TMP + "dir-transitivity-base/";
40-
public static final String OWL_TRANSITIVITY = "dir-transitivity/"; // Added by WuGang 2010-08-25,新加的目录
40+
public static final String OWL_TRANSITIVITY = "dir-transitivity/"; // Added by WuGang 2010-08-25���¼ӵ�Ŀ¼
4141

4242
public static final String OWL_SYNONYMS_TABLE = "dir-table-synonyms/";
4343
public static final String OWL_SYNONYMS_TABLE_NEW = "_table_synonyms_new/";
@@ -111,69 +111,132 @@ public static void main(String[] args) {
111111
System.exit(0);
112112
}
113113

114+
// public long launchClosure(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
115+
// parseArgs(args);
116+
// long derivedTriples = 0;
117+
// long totalTriple = 0;
118+
// long startTime = System.currentTimeMillis();
119+
//
120+
// boolean firstCycle = true;
121+
// int currentStep = 0;
122+
// int lastDerivationStep = 0;
123+
//
124+
// do {
125+
// if (!firstCycle && lastDerivationStep == (currentStep - 4))
126+
// break;
127+
// currentStep++;
128+
// System.out.println(">>>>>>>>>>> Start new OWL Reasoner loop <<<<<<<<<<<");
129+
// long propDerivation = inferPropertiesInheritance(args);
130+
// System.out.println("----------- End inferPropertiesInheritance");
131+
// derivedTriples = inferTransitivityStatements(args) + propDerivation;
132+
// System.out.println("----------- End inferTransitivityStatements");
133+
// if (derivedTriples > 0)
134+
// lastDerivationStep = currentStep;
135+
//
136+
// if (!firstCycle && lastDerivationStep == (currentStep - 4))
137+
// break;
138+
// currentStep++;
139+
// long sameAsDerivation = inferSameAsStatements(args);
140+
// System.out.println("----------- End inferSameAsStatements");
141+
// derivedTriples += sameAsDerivation;
142+
// if (sameAsDerivation > 0)
143+
// lastDerivationStep = currentStep;
144+
//
145+
// if (!firstCycle && lastDerivationStep == (currentStep - 4))
146+
// break;
147+
// currentStep++;
148+
// long equivalenceDerivation = inferEquivalenceStatements(args);
149+
// System.out.println("-----------inferEquivalenceStatements");
150+
// derivedTriples += equivalenceDerivation;
151+
// if (equivalenceDerivation > 0) lastDerivationStep = currentStep;
152+
//
153+
// if (!firstCycle && lastDerivationStep == (currentStep - 4))
154+
// break;
155+
// currentStep++;
156+
// long hasValueDerivation = inferHasValueStatements(args);
157+
// System.out.println("-----------inferHasValueStatements����");
158+
// derivedTriples += hasValueDerivation;
159+
// if (hasValueDerivation > 0) lastDerivationStep = currentStep;
160+
//
161+
// if (!firstCycle && lastDerivationStep == (currentStep - 4))
162+
// break;
163+
// currentStep++;
164+
// long someAllDerivation = inferSomeAndAllValuesStatements(args);
165+
// System.out.println("-----------inferSomeAndAllValuesStatements����");
166+
// derivedTriples += someAllDerivation;
167+
// if (someAllDerivation > 0) lastDerivationStep = currentStep;
168+
//
169+
// totalTriple += derivedTriples;
170+
// firstCycle = false;
171+
// } while (derivedTriples > 0);
172+
//
173+
// log.info("Time (in seconds): " + (System.currentTimeMillis() - startTime)/1000);
174+
// log.info("Number derived triples: " + totalTriple);
175+
//
176+
// return totalTriple;
177+
// }
178+
114179
public long launchClosure(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
115180
parseArgs(args);
116-
long derivedTriples = 0;
117-
long totalTriple = 0;
181+
long approxiDerivedTriples = 0;
182+
long approxiDerivedTriplesPrevious = 0;
183+
long approxiTotalTriple = 0;
118184
long startTime = System.currentTimeMillis();
119185

120186
boolean firstCycle = true;
121187
int currentStep = 0;
122188
int lastDerivationStep = 0;
123189

124190
do {
125-
if (!firstCycle && lastDerivationStep == (currentStep - 4))
126-
break;
191+
approxiDerivedTriplesPrevious = approxiDerivedTriples;
192+
approxiDerivedTriples = 0;
193+
127194
currentStep++;
128195
System.out.println(">>>>>>>>>>> Start new OWL Reasoner loop <<<<<<<<<<<");
129196
long propDerivation = inferPropertiesInheritance(args);
130197
System.out.println("----------- End inferPropertiesInheritance");
131-
derivedTriples = inferTransitivityStatements(args) + propDerivation;
198+
approxiDerivedTriples = inferTransitivityStatements(args) + propDerivation;
132199
System.out.println("----------- End inferTransitivityStatements");
133-
if (derivedTriples > 0)
200+
if (approxiDerivedTriples > 0)
134201
lastDerivationStep = currentStep;
135202

136-
if (!firstCycle && lastDerivationStep == (currentStep - 4))
137-
break;
138203
currentStep++;
139204
long sameAsDerivation = inferSameAsStatements(args);
140205
System.out.println("----------- End inferSameAsStatements");
141-
derivedTriples += sameAsDerivation;
206+
approxiDerivedTriples += sameAsDerivation;
142207
if (sameAsDerivation > 0)
143208
lastDerivationStep = currentStep;
144209

145-
if (!firstCycle && lastDerivationStep == (currentStep - 4))
146-
break;
147210
currentStep++;
148211
long equivalenceDerivation = inferEquivalenceStatements(args);
149212
System.out.println("-----------inferEquivalenceStatements");
150-
derivedTriples += equivalenceDerivation;
151-
if (equivalenceDerivation > 0) lastDerivationStep = currentStep;
213+
approxiDerivedTriples += equivalenceDerivation;
214+
if (equivalenceDerivation > 0)
215+
lastDerivationStep = currentStep;
152216

153-
if (!firstCycle && lastDerivationStep == (currentStep - 4))
154-
break;
155217
currentStep++;
156218
long hasValueDerivation = inferHasValueStatements(args);
157-
System.out.println("-----------inferHasValueStatements结束");
158-
derivedTriples += hasValueDerivation;
159-
if (hasValueDerivation > 0) lastDerivationStep = currentStep;
219+
System.out.println("-----------inferHasValueStatements����");
220+
approxiDerivedTriples += hasValueDerivation;
221+
if (hasValueDerivation > 0)
222+
lastDerivationStep = currentStep;
160223

161-
if (!firstCycle && lastDerivationStep == (currentStep - 4))
162-
break;
163224
currentStep++;
164225
long someAllDerivation = inferSomeAndAllValuesStatements(args);
165-
System.out.println("-----------inferSomeAndAllValuesStatements结束");
166-
derivedTriples += someAllDerivation;
167-
if (someAllDerivation > 0) lastDerivationStep = currentStep;
226+
System.out.println("-----------inferSomeAndAllValuesStatements����");
227+
approxiDerivedTriples += someAllDerivation;
228+
if (someAllDerivation > 0)
229+
lastDerivationStep = currentStep;
168230

169-
totalTriple += derivedTriples;
231+
// approxiTotalTriple += approxiDerivedTriples;
170232
firstCycle = false;
171-
} while (derivedTriples > 0);
233+
} while ((approxiDerivedTriples - approxiDerivedTriplesPrevious) > 0);
172234

173235
log.info("Time (in seconds): " + (System.currentTimeMillis() - startTime)/1000);
174-
log.info("Number derived triples: " + totalTriple);
236+
log.info("Approximate number derived triples: " + approxiDerivedTriples);
175237

176-
return totalTriple;
238+
// return approxiTotalTriple;
239+
return approxiDerivedTriples;
177240
}
178241

179242
@Override
@@ -205,7 +268,7 @@ private long inferPropertiesInheritance(String[] args) throws IOException, Inter
205268
job.setReducerClass(OWLNotRecursiveReducer.class);
206269

207270
job.waitForCompletion(true);
208-
271+
209272

210273
Counter derivedTriples = job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter","REDUCE_OUTPUT_RECORDS");
211274
//Remove the transitivity triples
@@ -220,7 +283,10 @@ private long inferPropertiesInheritance(String[] args) throws IOException, Inter
220283
else
221284
shouldInferTransitivity = false;
222285

223-
return totalDerivation;
286+
long approximateDerivedCount = job.getCounters().findCounter("Derived triples", "writeJustificationToMapReduceContext").getValue();
287+
return approximateDerivedCount;
288+
289+
// return totalDerivation;
224290
}
225291

226292

@@ -230,19 +296,20 @@ private long inferPropertiesInheritance(String[] args) throws IOException, Inter
230296
private long inferTransitivityStatements(String[] args)
231297
throws IOException, InterruptedException, ClassNotFoundException {
232298
boolean derivedNewStatements = true;
233-
// System.out.println("在inferTransitivityStatements里头。");
299+
// System.out.println("��inferTransitivityStatements��ͷ��");
234300

235301
// We'll not use filesystem but db.getTransitiveStatementsCount()
236302
long derivation = 0;
237303
int level = 0;
238304

239305
//modified 2015/5/19
240-
long beforeInferCount = db.getRowCountAccordingTripleType(TriplesUtils.TRANSITIVE_TRIPLE);
306+
// long beforeInferCount = db.getRowCountAccordingTripleType(TriplesUtils.TRANSITIVE_TRIPLE);
307+
long approximatePreviousDerivedCount = 0;
241308

242309
//modified 2015/5/19
243310
//for(int i = 0;i <= 3; i++){
244-
while ((beforeInferCount > 0) && derivedNewStatements && shouldInferTransitivity) {
245-
// System.out.println("开始在inferTransitivityStatements的while循环中寻找。");
311+
while (shouldInferTransitivity) {
312+
// System.out.println("��ʼ��inferTransitivityStatements��whileѭ����Ѱ�ҡ�");
246313
level++;
247314

248315
//Configure input. Take only the directories that are two levels below
@@ -266,15 +333,23 @@ private long inferTransitivityStatements(String[] args)
266333
// About duplication, we will modify the checkTransitivity to return transitive triple counts
267334
// and then do subtraction.
268335

269-
long afterInferCount = db.getRowCountAccordingTripleType(TriplesUtils.TRANSITIVE_TRIPLE);
270-
derivation = afterInferCount - beforeInferCount;
271-
derivedNewStatements = (derivation > 0);
272-
beforeInferCount = afterInferCount; // Update beforeInferCount
336+
// long afterInferCount = db.getRowCountAccordingTripleType(TriplesUtils.TRANSITIVE_TRIPLE);
337+
// derivation = afterInferCount - beforeInferCount;
338+
// derivedNewStatements = (derivation > 0);
339+
// beforeInferCount = afterInferCount; // Update beforeInferCount
340+
341+
long approximateDerivedCount = job.getCounters().findCounter("Derived triples", "writeJustificationToMapReduceContext").getValue();
342+
343+
if ((approximateDerivedCount - approximatePreviousDerivedCount) == 0)
344+
break;
345+
else
346+
approximatePreviousDerivedCount = approximateDerivedCount;
273347
}
274348

275349
previousTransitiveDerivation = step;
276350

277-
return derivation;
351+
// return derivation;
352+
return approximatePreviousDerivedCount;
278353
}
279354

280355

@@ -295,7 +370,7 @@ private long inferSameAsStatements(String[] args) {
295370
//i++;
296371
//if (i == 3)
297372
// return 0;
298-
if (db.getRowCountAccordingTripleType(TriplesUtils.DATA_TRIPLE_SAME_AS)==0) // We need not to infer on SameAs
373+
if (db.getRowCountAccordingTripleType(TriplesUtils.DATA_TRIPLE_SAME_AS, 1)==0) // We need not to infer on SameAs
299374
return 0;
300375

301376
Set<Integer> filters = new HashSet<Integer>();
@@ -326,14 +401,15 @@ private long inferSameAsStatements(String[] args) {
326401
//Filter the table.
327402

328403
//modified 2015/5/19
329-
long tableSize = db.getRowCountAccordingTripleType(TriplesUtils.SYNONYMS_TABLE);
404+
long testtableSize = db.getRowCountAccordingTripleType(TriplesUtils.SYNONYMS_TABLE, 1);
330405

331406
// System.out.println("tableSize Ϊ : " + tableSize);
332407
// System.out.println("sizeDictionary Ϊ : " + sizeDictionary);
333408
// System.out.println("derivedTriples Ϊ : " + derivedTriples);
334409

335410
//modified 2015/5/19
336-
if (tableSize > sizeDictionary || derivedTriples > 0) {
411+
// modified by WuGang, 2015-06-04
412+
if (testtableSize > 0 || derivedTriples > 0) {
337413
//for(int j =0 ;j <= 3 ; j++){
338414
//1) Calculate the URIs distribution and get the first 2M.
339415
job = MapReduceReasonerJobConfig.createNewJob(
@@ -418,7 +494,8 @@ private long inferSameAsStatements(String[] args) {
418494

419495
}
420496
//modified 2015/5/19
421-
sizeDictionary = tableSize;
497+
// Commented by WuGang, 2015-06-04
498+
// sizeDictionary = tableSize;
422499

423500
} catch (Exception e) {
424501
log.error("Job execution has failed", e);
@@ -469,8 +546,10 @@ private long inferSomeAndAllValuesStatements(String[] args) throws IOException,
469546
// Added by Wugang 20150111
470547
//long countRule15 = db.getRowCountAccordingRule((int)TriplesUtils.OWL_HORST_15); // see OWLAllSomeValuesReducer
471548
//long countRule16 = db.getRowCountAccordingRule((int)TriplesUtils.OWL_HORST_16); // see OWLAllSomeValuesReducer
549+
long approximatePreviousDerivedCount = 0;
472550

473-
while (derivedNewStatements) {
551+
while(true){
552+
// while (derivedNewStatements) {
474553
step++;
475554
Job job = MapReduceReasonerJobConfig.createNewJob(
476555
OWLReasoner.class,
@@ -495,14 +574,24 @@ private long inferSomeAndAllValuesStatements(String[] args) throws IOException,
495574
// totalDerivation = countRule15 + countRule16;
496575

497576
derivedNewStatements = (totalDerivation > 0);
577+
578+
579+
long approximateDerivedCount = job.getCounters().findCounter("Derived triples", "writeJustificationToMapReduceContext").getValue();
580+
581+
if ((approximateDerivedCount - approximatePreviousDerivedCount) == 0)
582+
break;
583+
else
584+
approximatePreviousDerivedCount = approximateDerivedCount;
585+
498586
}
499587

500588
// Added by Wugang 20150111
501589
//countRule15 = db.getRowCountAccordingRule((int)TriplesUtils.OWL_HORST_15) - countRule15; // see OWLAllSomeValuesReducer
502590
//countRule16 = db.getRowCountAccordingRule((int)TriplesUtils.OWL_HORST_16) - countRule16; // see OWLAllSomeValuesReducer
503591
//totalDerivation = countRule15 + countRule16;
504592

505-
return totalDerivation;
593+
// return totalDerivation;
594+
return approximatePreviousDerivedCount;
506595
}
507596

508597
/*
@@ -523,7 +612,7 @@ private long inferHasValueStatements(String[] args) throws IOException, Interrup
523612
numMapTasks,
524613
numReduceTasks, true, true);
525614

526-
long schemaOnPropertySize = db.getRowCountAccordingTripleType(TriplesUtils.SCHEMA_TRIPLE_ON_PROPERTY);
615+
long schemaOnPropertySize = db.getRowCountAccordingTripleType(TriplesUtils.SCHEMA_TRIPLE_ON_PROPERTY, 1);
527616
if (schemaOnPropertySize == 0)
528617
return 0;
529618

@@ -539,14 +628,18 @@ private long inferHasValueStatements(String[] args) throws IOException, Interrup
539628
job.waitForCompletion(true);
540629

541630
// Get inferred count
542-
if (job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "REDUCE_OUTPUT_RECORDS").getValue() > 0) {
543-
// countRule14a = db.getRowCountAccordingRule((int)TriplesUtils.OWL_HORST_14a) - countRule14a; // see OWLAllSomeValuesReducer
544-
// countRule14b = db.getRowCountAccordingRule((int)TriplesUtils.OWL_HORST_14b) - countRule14b; // see OWLAllSomeValuesReducer
545-
// return(countRule14a + countRule14b);
546-
return 0;
547-
} else {
548-
return 0;
549-
}
631+
// if (job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "REDUCE_OUTPUT_RECORDS").getValue() > 0) {
632+
// // countRule14a = db.getRowCountAccordingRule((int)TriplesUtils.OWL_HORST_14a) - countRule14a; // see OWLAllSomeValuesReducer
633+
// // countRule14b = db.getRowCountAccordingRule((int)TriplesUtils.OWL_HORST_14b) - countRule14b; // see OWLAllSomeValuesReducer
634+
// // return(countRule14a + countRule14b);
635+
// return 0;
636+
// } else {
637+
// return 0;
638+
// }
639+
640+
long approximateDerivedCount = job.getCounters().findCounter("Derived triples", "writeJustificationToMapReduceContext").getValue();
641+
return approximateDerivedCount;
642+
550643
}
551644

552645
}

0 commit comments

Comments
 (0)