Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions scalding-core/src/main/scala/com/twitter/scalding/Operations.scala
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,21 @@ package com.twitter.scalding {
}
}

/** In the typed API every reduce operation is handled by this Buffer */
private[scalding] object SkewMonitorCounters {
// Strangely, if group name and key name are different, then the counter would be zero
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is bug by itself. Can you confirm this is still true and not due to another issue? We don't want to make a bunch of groups.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be a bug. if you simply put

flowProcess.increment("TestGroup", "TestKey", 1)

in def operate(flowProcess: FlowProcess[_], call: BufferCall[Any]) {

and this should print 3 in the test

ReduceValueCounterTest. However it prints

PRINTING KEY AND GROUP! 0

val KeyCount = "scalding.debug.reduce.key.count"
val ValuesCountSum = "scalding.debug.reduce.value.count.sum"
val ValuesCountSquareSum = "scalding.debug.reduce.value.count.sum.square"
}

class CountingIterator[T](wraps: Iterator[T]) extends Iterator[T] {
// This Wrapper lets us know how many values have been iterated in an Iterator
private[this] var nextCalls = 0L
def hasNext = wraps.hasNext
def next = { nextCalls += 1; wraps.next }
def seen: Long = nextCalls
}

class TypedBufferOp[K, V, U](
conv: TupleConverter[K],
@transient reduceFn: (K, Iterator[V]) => Iterator[U],
Expand All @@ -487,9 +501,9 @@ package com.twitter.scalding {
def operate(flowProcess: FlowProcess[_], call: BufferCall[Any]) {
val oc = call.getOutputCollector
val key = conv(call.getGroup)
val values = call.getArgumentsIterator
val values = new CountingIterator(call.getArgumentsIterator
.asScala
.map(_.getObject(0).asInstanceOf[V])
.map(_.getObject(0).asInstanceOf[V]))

// Avoiding a lambda here
val resIter = reduceFnSer.get(key, values)
Expand All @@ -498,6 +512,17 @@ package com.twitter.scalding {
tup.set(0, resIter.next)
oc.add(tup)
}

val numValuesPerKey = values.seen

flowProcess.increment(SkewMonitorCounters.KeyCount, SkewMonitorCounters.KeyCount, 1L)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably better to use a group like "scalding.debug" since I think we want to group them up so they are easy to see next to each other in the job tracker UI.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we want the group and counter to be the same string. We want the group to be something like "scalding debug" and the counter value is fine.

flowProcess.increment(SkewMonitorCounters.ValuesCountSum, SkewMonitorCounters.ValuesCountSum, numValuesPerKey)
flowProcess.increment(SkewMonitorCounters.ValuesCountSquareSum, SkewMonitorCounters.ValuesCountSquareSum, numValuesPerKey * numValuesPerKey)

// Uncomment the following to trigger the bug
//flowProcess.increment("TestGroup", "TestKey", 1)

}

}
}
44 changes: 44 additions & 0 deletions scalding-core/src/test/scala/com/twitter/scalding/CoreTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1817,6 +1817,50 @@ class CounterJobTest extends WordSpec with Matchers {
}
}

class ReduceValueCountJob(args: Args) extends Job(args) {
TypedPipe.from(List((1, (1, 1)), (2, (2, 2)), (1, (3, 3)), (3, (3, 3))))
.group
.forceToReducers
.sum
.toTypedPipe
.map{
case (a: Int, (b: Int, c: Int)) =>
(a, b, c)
}
.write(TypedTsv[(Int, Int, Int)](args("output")))
}

class ReduceValueCounterTest extends WordSpec with Matchers {
"Reduce Values Count" should {
JobTest(new com.twitter.scalding.ReduceValueCountJob(_))
.arg("output", "output0")
.counter("TestGroup", "TestKey"){
x => println("PRINTING KEY AND GROUP! " + x)
}
.counter(SkewMonitorCounters.KeyCount, SkewMonitorCounters.KeyCount){
x =>
x should be(3)
}
.counter(SkewMonitorCounters.ValuesCountSquareSum, SkewMonitorCounters.ValuesCountSquareSum) {
x =>
// key 1 has two values, thus 2^2 = 4. key 2 and 3 has only one respectively
x should be(4 + 1 + 1)
}
.counter(SkewMonitorCounters.ValuesCountSum, SkewMonitorCounters.ValuesCountSum) {
x =>
// sum of keys = 2 (for key 1) + 1 (for key 2) + 1 (for key 3)
x should be (2 + 1 + 1)
}
.sink[(Int, Int, Int)](TypedTsv[(Int, Int, Int)]("output0")){
tuples =>

}
.runHadoop
.finish
}

}

object DailySuffixTsvJob {
val strd1 = "2014-05-01"
val strd2 = "2014-05-02"
Expand Down