Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds support for Counters inside the Execution Monad. #982

Merged
merged 4 commits into from
Jul 30, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions scalding-core/src/main/scala/com/twitter/scalding/Config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,24 @@ trait Config {
* This is *required* if you are using counters. You must use
* the same UniqueID as you used when defining your jobs.
*/
def setUniqueId(u: UniqueID): Config =
this + (UniqueID.UNIQUE_JOB_ID -> u.get)
/**
 * Appends this UniqueID to the comma-separated id set stored under
 * UniqueID.UNIQUE_JOB_ID, creating the entry if it is absent.
 * Adding the same id twice is a no-op (ids are kept as a Set).
 */
def addUniqueId(u: UniqueID): Config =
  update(UniqueID.UNIQUE_JOB_ID) {
    case Some(existing) =>
      val ids = existing.split(",").toSet + u.get
      (Some(ids.mkString(",")), ())
    case None => (Some(u.get), ())
  }._2

/**
 * Returns a UniqueID for this Config along with a Config guaranteed to
 * contain it: if no id is present a random one is allocated and stored,
 * otherwise the first id already stored is returned and the Config is
 * left unchanged.
 */
def ensureUniqueId: (UniqueID, Config) =
  update(UniqueID.UNIQUE_JOB_ID) {
    case existing @ Some(ids) =>
      (existing, UniqueID(ids.split(",").head))
    case None =>
      val fresh = UniqueID.getRandom
      (Some(fresh.get), fresh)
  }

/*
* Add this class name and the md5 hash of it into the config
*/
Expand Down
143 changes: 127 additions & 16 deletions scalding-core/src/main/scala/com/twitter/scalding/Execution.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ limitations under the License.
package com.twitter.scalding

import com.twitter.algebird.monad.Reader
import com.twitter.algebird.Monoid
import com.twitter.scalding.cascading_interop.FlowListenerPromise
import com.twitter.scalding.Dsl.flowDefToRichFlowDef

Expand All @@ -27,7 +28,7 @@ import cascading.flow.{ FlowDef, Flow }
* This is a Monad, that represents a computation and a result
*/
sealed trait Execution[+T] {
import Execution.{ Mapped, FactoryExecution, FlatMapped, Zipped }
import Execution.{ Mapped, MapCounters, FactoryExecution, FlatMapped, Zipped }

/*
* First run this Execution, then move to the result
Expand All @@ -42,7 +43,28 @@ sealed trait Execution[+T] {
def map[U](fn: T => U): Execution[U] =
Mapped(this, fn)

def run(conf: Config, mode: Mode)(implicit cec: ConcurrentExecutionContext): Future[T]
/**
 * Pairs the result with the counters accumulated so far, without
 * resetting them. If you want the counters zeroed afterwards, use
 * .getAndResetCounters.
 */
def getCounters: Execution[(T, ExecutionCounters)] =
  MapCounters[T, (T, ExecutionCounters)](this, { case (t, c) => ((t, c), c) })

/** Reads the counters into the value, then resets them to zero. */
def getAndResetCounters: Execution[(T, ExecutionCounters)] =
  getCounters.resetCounters

/**
 * Drops any counters accumulated so far, yielding the same value paired
 * with ExecutionCounters.empty downstream. Useful before a zip or a
 * flatMap when you only want counts from the later stages.
 */
def resetCounters: Execution[T] =
  MapCounters[T, T](this, { case (t, _) => (t, ExecutionCounters.empty) })

/** Runs this Execution and returns only the value, discarding the counters. */
def run(conf: Config, mode: Mode)(implicit cec: ConcurrentExecutionContext): Future[T] =
  runStats(conf, mode).map { case (t, _) => t }

/** Implemented by each node: run and return both the value and the counters it produced. */
protected def runStats(conf: Config, mode: Mode)(implicit cec: ConcurrentExecutionContext): Future[(T, ExecutionCounters)]

/**
* This is convenience for when we don't care about the result.
Expand All @@ -68,42 +90,55 @@ sealed trait Execution[+T] {
object Execution {

private case class Const[T](get: () => T) extends Execution[T] {
def run(conf: Config, mode: Mode)(implicit cec: ConcurrentExecutionContext) =
Future(get())
def runStats(conf: Config, mode: Mode)(implicit cec: ConcurrentExecutionContext) =
Future(get(), ExecutionCounters.empty)

override def unit = Const(() => ())
}
private case class FlatMapped[S, T](prev: Execution[S], fn: S => Execution[T]) extends Execution[T] {
def run(conf: Config, mode: Mode)(implicit cec: ConcurrentExecutionContext) = for {
s <- prev.run(conf, mode)
def runStats(conf: Config, mode: Mode)(implicit cec: ConcurrentExecutionContext) = for {
(s, st1) <- prev.runStats(conf, mode)
next = fn(s)
t <- next.run(conf, mode)
} yield t
(t, st2) <- next.runStats(conf, mode)
} yield (t, Monoid.plus(st1, st2))
}
private case class Mapped[S, T](prev: Execution[S], fn: S => T) extends Execution[T] {
def run(conf: Config, mode: Mode)(implicit cec: ConcurrentExecutionContext) =
prev.run(conf, mode).map(fn)
def runStats(conf: Config, mode: Mode)(implicit cec: ConcurrentExecutionContext) =
prev.runStats(conf, mode).map { case (s, stats) => (fn(s), stats) }

// Don't bother applying the function if we are mapped
override def unit = prev.unit
}
/**
 * Internal node that rewrites the (value, counters) pair produced by
 * prev. This is what backs getCounters / resetCounters above.
 */
private case class MapCounters[T, U](prev: Execution[T],
  fn: ((T, ExecutionCounters)) => (U, ExecutionCounters)) extends Execution[U] {
  def runStats(conf: Config, mode: Mode)(implicit cec: ConcurrentExecutionContext) =
    prev.runStats(conf, mode).map { pair => fn(pair) }
}

private case class Zipped[S, T](one: Execution[S], two: Execution[T]) extends Execution[(S, T)] {
def run(conf: Config, mode: Mode)(implicit cec: ConcurrentExecutionContext) =
one.run(conf, mode).zip(two.run(conf, mode))
def runStats(conf: Config, mode: Mode)(implicit cec: ConcurrentExecutionContext) =
one.runStats(conf, mode).zip(two.runStats(conf, mode))
.map { case ((s, ss), (t, st)) => ((s, t), Monoid.plus(ss, st)) }

// Make sure we remove any mapping functions on both sides
override def unit = one.unit.zip(two.unit).map(_ => ())
}
/**
 * Asks the Config for its UniqueID at run time (allocating and storing
 * one if absent) and runs the Execution built from that id.
 * This backs Execution.withId.
 */
private case class UniqueIdExecution[T](fn: UniqueID => Execution[T]) extends Execution[T] {
  def runStats(conf: Config, mode: Mode)(implicit cec: ConcurrentExecutionContext) = {
    val (id, confWithId) = conf.ensureUniqueId
    fn(id).runStats(confWithId, mode)
  }
}
/*
* This is the main class the represents a flow without any combinators
*/
private case class FlowDefExecution[T](result: (Config, Mode) => (FlowDef, (JobStats => Future[T]))) extends Execution[T] {
def run(conf: Config, mode: Mode)(implicit cec: ConcurrentExecutionContext) = {
def runStats(conf: Config, mode: Mode)(implicit cec: ConcurrentExecutionContext) = {
for {
(flowDef, fn) <- Future(result(conf, mode))
jobStats <- ExecutionContext.newContext(conf)(flowDef, mode).run
t <- fn(jobStats)
} yield t
} yield (t, ExecutionCounters.fromJobStats(jobStats))
}

/*
Expand All @@ -124,8 +159,8 @@ object Execution {
}
}
private case class FactoryExecution[T](result: (Config, Mode) => Execution[T]) extends Execution[T] {
def run(conf: Config, mode: Mode)(implicit cec: ConcurrentExecutionContext) =
unwrap(conf, mode, this).run(conf, mode)
def runStats(conf: Config, mode: Mode)(implicit cec: ConcurrentExecutionContext) =
unwrap(conf, mode, this).runStats(conf, mode)

@annotation.tailrec
private def unwrap[U](conf: Config, mode: Mode, that: Execution[U]): Execution[U] =
Expand Down Expand Up @@ -169,6 +204,20 @@ object Execution {
fn: (Config, Mode) => ((FlowDef, JobStats => Future[T]))): Execution[T] =
FlowDefExecution(fn)

/**
 * Use this to use counters/stats with Execution. You do this:
 * Execution.withId { implicit uid =>
 *   val myStat = Stat("myStat") // uid is implicitly pulled in
 *   pipe.map { t =>
 *     if(someCase(t)) myStat.inc
 *     fn(t)
 *   }
 *   .writeExecution(mySink)
 * }
 *
 * The UniqueID handed to fn comes from Config.ensureUniqueId at run
 * time, so the flows planned inside see the same id.
 */
def withId[T](fn: UniqueID => Execution[T]): Execution[T] = UniqueIdExecution(fn)

/**
* This creates a new ExecutionContext, passes to the reader, builds the flow
* and cleans up the state of the FlowDef
Expand Down Expand Up @@ -262,3 +311,65 @@ object Execution {
go(exs.toList, from(Nil)).map(_.reverse)
}
}

/**
 * Read-only view of counter values, keyed by StatKey.
 * Absent keys read as 0.
 */
trait ExecutionCounters {
  /** All keys that currently have a value. */
  def keys: Set[StatKey]
  /** The value for key, or 0 when the key is absent. */
  def apply(key: StatKey): Long = get(key) match {
    case Some(value) => value
    case None => 0L
  }
  def get(key: StatKey): Option[Long]
  /** Materialize all keys into an immutable Map (absent keys become 0). */
  def toMap: Map[StatKey, Long] =
    keys.iterator.map { k => k -> get(k).getOrElse(0L) }.toMap
}

object ExecutionCounters {
  /** An ExecutionCounters with no keys at all. */
  def empty: ExecutionCounters = new ExecutionCounters {
    def keys = Set.empty
    def get(key: StatKey) = None
    override def toMap = Map.empty
  }

  /** View the counters of a cascading stats object as ExecutionCounters. */
  def fromCascading(cs: cascading.stats.CascadingStats): ExecutionCounters = new ExecutionCounters {
    import scala.collection.JavaConverters._

    // Snapshot the key set once up front
    val keys = cs.getCounterGroups.asScala.flatMap { group =>
      cs.getCountersFor(group).asScala.map { counter => StatKey(counter, group) }
    }.toSet

    def get(k: StatKey) =
      if (!keys(k)) None
      // Yes, cascading is reversed from what we did in Stats. :/
      else Some(cs.getCounterValue(k.group, k.counter))
  }

  /** Extract the counters already materialized inside a JobStats. */
  def fromJobStats(js: JobStats): ExecutionCounters = {
    val counters = js.counters
    new ExecutionCounters {
      def keys = counters.keySet.flatMap { group =>
        counters(group).keys.map { counter => StatKey(counter, group) }
      }
      def get(k: StatKey) = for {
        inner <- counters.get(k.group)
        value <- inner.get(k.counter)
      } yield value
    }
  }

  /**
   * This allows us to merge the results of two computations
   * (values are summed key-by-key).
   */
  implicit def monoid: Monoid[ExecutionCounters] = new Monoid[ExecutionCounters] {
    override def isNonZero(that: ExecutionCounters) = that.keys.nonEmpty
    def zero = ExecutionCounters.empty
    def plus(left: ExecutionCounters, right: ExecutionCounters) = {
      val unionKeys = left.keys ++ right.keys
      // Eagerly sum into a plain Map so we don't retain left/right
      val summed = unionKeys.iterator.map { k => k -> (left(k) + right(k)) }.toMap
      new ExecutionCounters {
        def keys = unionKeys
        def get(k: StatKey) = summed.get(k)
        override def toMap = summed
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ trait ExecutionContext {
// [error] (resultT, Try(mode.newFlowConnector(finalConf).connect(newFlowDef)))
try {
// identify the flowDef
val withId = config.setUniqueId(UniqueID.getIDFor(flowDef))
val withId = config.addUniqueId(UniqueID.getIDFor(flowDef))
val flow = mode.newFlowConnector(withId).connect(flowDef)
Success(flow)
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class Job(val args: Args) extends FieldConversions with java.io.Serializable {
}

// Do this before the job is submitted, because the flowDef is transient
private[this] val uniqueId = UniqueID.getIDFor(flowDef).get
private[this] val uniqueId = UniqueID.getIDFor(flowDef)

/**
* Copy this job
Expand Down
56 changes: 42 additions & 14 deletions scalding-core/src/main/scala/com/twitter/scalding/JobStats.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import scala.collection.JavaConverters._
import cascading.flow.Flow
import cascading.stats.{ CascadeStats, CascadingStats, FlowStats }

import scala.util.Try
import scala.util.{ Failure, Try }

object JobStats {
def apply(stats: CascadingStats): JobStats = {
Expand All @@ -32,7 +32,7 @@ object JobStats {
})
}

private def counterMap(stats: CascadingStats): Map[String, Any] =
private def counterMap(stats: CascadingStats): Map[String, Map[String, Long]] =
stats.getCounterGroups.asScala.map { group =>
(group, stats.getCountersFor(group).asScala.map { counter =>
(counter, stats.getCounterValue(group, counter))
Expand All @@ -54,24 +54,52 @@ object JobStats {
"stopped" -> stats.isStopped,
"successful" -> stats.isSuccessful)

// TODO: this does not handle null
// https://github.com/twitter/scalding/issues/972
def toJsonValue(a: Any): String = {
Try(a.toString.toInt)
.recoverWith { case t: Throwable => Try(a.toString.toDouble) }
.recover {
case t: Throwable =>
val s = a.toString
"\"%s\"".format(s)
/**
 * Returns the counters with Group String -> Counter String -> Long.
 * The Try fails if cMap is not a Map, if any key is not a String, or
 * if any value is not itself a Map[String, Long].
 */
def toCounters(cMap: Any): Try[Map[String, Map[String, Long]]] =
  // This really sucks, but this is what happens when you let Map[String, Any] into your code
  cMap match {
    case m: Map[_, _] => Try {
      m.foldLeft(Map.empty[String, Map[String, Long]]) {
        case (acc, (k: String, v: Any)) => v match {
          case m: Map[_, _] =>
            acc + (k -> m.foldLeft(Map.empty[String, Long]) {
              case (acc2, (k: String, v: Long)) => acc2 + (k -> v)
              case (_, kv) => sys.error("inner k, v not (String, Long):" + kv)
            })
          case _ => sys.error("inner values are not Maps: " + v)
        }
        // Bug fix: bind only the offending entry, not the whole (acc, entry)
        // pair, so the message no longer leaks the partially-built accumulator
        // (this also matches the inner catch-all above).
        case (_, kv) => sys.error("Map does not contain string keys: " + kv)
      }
    }
    case _ => Failure(new Exception("%s not a Map[String, Any]".format(cMap)))
  }

/**
 * Render a single value as a JSON literal:
 * null -> null, booleans -> true/false, values whose toString parses as
 * an Int or Double are emitted unquoted, everything else becomes a
 * quoted string with '\' and '"' escaped.
 */
def toJsonValue(a: Any): String = a match {
  case null => "null"
  // Bug fix: booleans used to fall through to the string case and were
  // emitted as the JSON strings "true"/"false"; emit real JSON booleans.
  case b: Boolean => b.toString
  case _ =>
    val s = a.toString
    Try(s.toInt.toString)
      .orElse(Try(s.toDouble.toString))
      .getOrElse {
        // Bug fix: escape backslashes and quotes so the output stays valid JSON
        "\"%s\"".format(s.replace("\\", "\\\\").replace("\"", "\\\""))
      }
}
}

// Simple wrapper for a Map that contains the useful info from the job flow's stats
// If you want to write this, call toMap and use json, etc... to write it
case class JobStats(toMap: Map[String, Any]) {
/**
 * The counters stored under the "counters" entry of toMap, as
 * Group -> Counter -> value. Throws (via sys.error) when the entry is
 * missing, and rethrows the parse failure (the .get on the Try from
 * JobStats.toCounters) when the entry is present but malformed.
 */
def counters: Map[String, Map[String, Long]] =
  toMap.get("counters")
    .map(JobStats.toCounters(_))
    .getOrElse(sys.error("counters missing from: " + toMap))
    .get

/**
 * Render toMap as a single JSON object, one "key" : value pair per entry.
 * NOTE(review): keys are interpolated without escaping — assumes they
 * contain no '"' or '\'; confirm against what populates toMap.
 */
def toJson: String =
  toMap.map { case (k, v) => "\"%s\" : %s".format(k, JobStats.toJsonValue(v)) }
    .mkString("{", ",", "}")
Expand Down
Loading