diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/CoGrouped.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/CoGrouped.scala index fed2756fb6..1b999e58f5 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/CoGrouped.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/CoGrouped.scala @@ -163,7 +163,14 @@ trait CoGrouped[K, +R] extends KeyedListLike[K, R, CoGrouped] with CoGroupable[K def reducers = self.reducers def keyOrdering = self.keyOrdering def joinFunction = { (k: K, leftMost: Iterator[CTuple], joins: Seq[Iterable[CTuple]]) => - fn(k, joinF(k, leftMost, joins)) + val joined = joinF(k, leftMost, joins) + /* + * After the join, if the key has no values, don't present it to the mapGroup + * function. Doing so would break the invariant: + * + * a.join(b).toTypedPipe.group.mapGroup(fn) == a.join(b).mapGroup(fn) + */ + if (joined.nonEmpty) fn(k, joined) else Iterator.empty } } } diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/Grouped.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/Grouped.scala index 54434982c2..3b422afb59 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/Grouped.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/Grouped.scala @@ -260,7 +260,11 @@ case class ValueSortedReduce[K, V1, V2]( override def mapGroup[V3](fn: (K, Iterator[V2]) => Iterator[V3]) = { // don't make a closure val localRed = reduceFn - val newReduce = { (k: K, iter: Iterator[V1]) => fn(k, localRed(k, iter)) } + val newReduce = { (k: K, iter: Iterator[V1]) => + val step1 = localRed(k, iter) + // Only pass non-Empty iterators to subsequent functions + if (step1.nonEmpty) fn(k, step1) else Iterator.empty + } ValueSortedReduce[K, V1, V3]( keyOrdering, mapped, valueSort, newReduce, reducers) } @@ -293,7 +297,11 @@ case class IteratorMappedReduce[K, V1, V2]( override def mapGroup[V3](fn: (K, Iterator[V2]) => Iterator[V3]) = { // don't make a closure val localRed = reduceFn - val newReduce = { (k: K, iter: Iterator[V1]) => fn(k, localRed(k, iter)) } + val newReduce = { (k: K, iter: Iterator[V1]) => + val step1 = localRed(k, iter) + // Only pass non-Empty iterators to subsequent functions + if (step1.nonEmpty) fn(k, step1) else Iterator.empty + } copy(reduceFn = newReduce) } diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/KeyedList.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/KeyedList.scala index 7b7f3f8e9c..2ac3e64aae 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/KeyedList.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/KeyedList.scala @@ -70,6 +70,10 @@ trait KeyedListLike[K, +T, +This[K, +T] <: KeyedListLike[K, T, This]] * Prefer this to toList, when you can avoid accumulating the whole list in memory. * Prefer sum, which is partially executed map-side by default. * Use mapValueStream when you don't care about the key for the group. + * + * Iterator is always Non-empty. + * Note, any key that has all values removed will not appear in subsequent + * .mapGroup/mapValueStream */ def mapGroup[V](smfn: (K, Iterator[T]) => Iterator[V]): This[K, V] diff --git a/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala index e8e8a6181c..f9c5afedbf 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala @@ -1103,6 +1103,29 @@ class TypedSelfLeftCrossTest extends Specification { } } +class JoinMapGroupJob(args: Args) extends Job(args) { + def r1 = TypedPipe.from(Seq((1, 10))) + def r2 = TypedPipe.from(Seq((1, 1), (2, 2), (3, 3))) + r1.groupBy(_._1).join(r2.groupBy(_._1)) + .mapGroup { case (a, b) => Iterator("a") } + .write(TypedTsv("output")) +} +class JoinMapGroupJobTest extends Specification { + import Dsl._ + noDetailedDiffs() + + "A JoinMapGroupJob" should { + JobTest(new JoinMapGroupJob(_)) + .sink[(Int, String)](TypedTsv[(Int, String)]("output")) { outBuf => + "not duplicate keys" in { + outBuf.toList must be_==(List((1, "a"))) + } + } + .run + .finish + } +} + class TypedSketchJoinJob(args: Args) extends Job(args) { val zero = TypedPipe.from(TypedTsv[(Int, Int)]("input0")) val one = TypedPipe.from(TypedTsv[(Int, Int)]("input1"))