Commit 1068eb4b authored by Steven Pisarski's avatar Steven Pisarski

Added akka remote and parallelism config, changed List references to...

Added akka remote and parallelism config, changed List references to Seq where possible, and functionalized most of the remaining routines.
parent d0131c0d
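A minimal sketch of how the new constructor parameters might be wired up; the Address values mirror the scaladoc example below, the actor bounds are illustrative, and the remaining arguments come from the existing generator setup:

import akka.actor.Address
import com.cablelabs.eventgen.Engine

// Pass null for remoteAddr to keep the actors in the local actor system
val remoteAddr = new Address("akka", "sparkWorker", "bda-storm02", 7078)
val engine = new Engine(inputDef, temporalAlgo, factAlgos, outDefs,
  sendPastEvents = false, numSchedulerThreads = 5,
  remoteAddr = remoteAddr, minActors = 1, maxActors = 50)
engine.generatorActor // routed actor hierarchy resized between minActors and maxActors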
......@@ -2,6 +2,7 @@ package com.cablelabs.eventgen
import java.util.Date
import _root_.akka.actor.Address
import com.cablelabs.eventgen.akka.GeneratorActors
import com.cablelabs.eventgen.akka.GeneratorActors.Event
import com.cablelabs.eventgen.algorithm.Model
......@@ -9,8 +10,6 @@ import com.cablelabs.eventgen.model.{InputDefinition, OutputDefinition}
import com.typesafe.scalalogging.slf4j.Logger
import org.slf4j.LoggerFactory
import scala.collection.mutable
/**
* Responsible for generating and routing event payloads
* @param inputDef - the inbound event definition
......@@ -19,9 +18,13 @@ import scala.collection.mutable
* @param outDefs - defines how and where to send the events
* @param sendPastEvents - when false, events dated before now will not be routed
* @param numSchedulerThreads - the number of threads to allocate to quartz for each Akka quartz scheduler actor
* @param remoteAddr - the address to the remote actor system (new Address("akka", "sparkWorker", "bda-storm02", 7078))
* @param minActors - the minimum number of actors for parallelism
* @param maxActors - the maximum number of actors for parallelism
*/
class Engine(val inputDef: InputDefinition, val temporalAlgo: Model, val factAlgos: List[Model],
outDefs: Set[OutputDefinition], sendPastEvents: Boolean, numSchedulerThreads: Int)
class Engine(val inputDef: InputDefinition, val temporalAlgo: Model, val factAlgos: Seq[Model],
outDefs: Set[OutputDefinition], sendPastEvents: Boolean, numSchedulerThreads: Int,
remoteAddr: Address, minActors: Int, maxActors: Int)
extends Serializable {
private[this] def logger = Logger(LoggerFactory.getLogger("GeneratorEngine"))
......@@ -29,7 +32,8 @@ class Engine(val inputDef: InputDefinition, val temporalAlgo: Model, val factAlg
/**
* The main actor responsible for calling the engine to retrieve the next event and schedule it for emission
*/
val generatorActor = GeneratorActors.generateAndSchedule(this, outDefs, new Date(), sendPastEvents, numSchedulerThreads)
val generatorActor = GeneratorActors.generateAndSchedule(this, outDefs, new Date(), sendPastEvents,
numSchedulerThreads, remoteAddr, minActors, maxActors)
/**
* Begins the event generation process indefinitely
......@@ -44,15 +48,14 @@ class Engine(val inputDef: InputDefinition, val temporalAlgo: Model, val factAlg
* @return - the newly generated event
*/
def nextEvent(seedEvent: Map[String, Any]): Map[String, Any] = {
val out = mutable.HashMap[String, Any]()
val dateOffset = temporalAlgo.predictRaw(inputDef.temporalAlgoFeatures(seedEvent)).toLong
val seedEventDate = inputDef.temporal.eventValue[Date](seedEvent)
out += inputDef.temporal.name -> new Date(dateOffset + seedEventDate.getTime)
inputDef.dimensionValues(seedEvent).foreach(p => out+= p._1 -> p._2)
factAlgos.foreach(p => {
out += p.field.name -> p.predict(inputDef.factAlgoFeatures(seedEvent, p.field.name))
})
val out1 = inputDef.temporal.name -> new Date(dateOffset + seedEventDate.getTime)
val out2 = inputDef.dimensionValues(seedEvent).map(p => p._1 -> p._2).toMap
val out3 = factAlgos.map(p => p.field.name -> p.predict(inputDef.factAlgoFeatures(seedEvent, p.field.name))).toMap
val out = out2 ++ out3 + out1
logger.debug(s"Generated next event $out")
out.toMap
out
}
}
\ No newline at end of file
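For readers skimming the rewritten nextEvent: out2 ++ out3 + out1 builds one immutable Map from the dimension map, the fact map and the temporal tuple, and later bindings win on key collisions, so the temporal field always lands in the result. A self-contained illustration (field names are made up):

val out2 = Map("region" -> "east")                   // dimensions
val out3 = Map("usage" -> 42)                        // facts
val out1 = "timestamp" -> new java.util.Date(0L)     // temporal tuple
val out  = out2 ++ out3 + out1
// out == Map(region -> east, usage -> 42, timestamp -> Thu Jan 01 ...)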
......@@ -5,6 +5,7 @@ import javax.jms.{Connection, MessageProducer, Session}
import akka.actor._
import akka.camel.{CamelExtension, Oneway, Producer}
import akka.remote.RemoteScope
import akka.routing.{DefaultResizer, SmallestMailboxRouter}
import com.cablelabs.eventgen.Engine
import com.cablelabs.eventgen.model.OutputDefinition
......@@ -21,6 +22,8 @@ import org.slf4j.LoggerFactory
*/
object GeneratorActors {
private[this] def logger = Logger(LoggerFactory.getLogger("GeneratorActors"))
/**
* Returns the Spark actor system
* @return
......@@ -47,11 +50,14 @@ object GeneratorActors {
* @param startTime - the time when the generator is starting
* @param sendPastEvents - denotes whether or not to send back-dated events
* @param numSchedThreads - number of threads for each actor's quartz scheduler
* @param minActors - the minimum number of actors for parallelism
* @param maxActors - the maximum number of actors for parallelism
* @return - the main actor to which to send the seed events
*/
def generateAndSchedule(engine: Engine, outDefs: Set[OutputDefinition], startTime: Date, sendPastEvents: Boolean,
numSchedThreads: Int): ActorRef =
generateAndSchedule(actorSystem, engine, outDefs, startTime, sendPastEvents, numSchedThreads)
numSchedThreads: Int, remoteAddr: Address, minActors: Int, maxActors: Int): ActorRef =
generateAndSchedule(actorSystem, engine, outDefs, startTime, sendPastEvents, numSchedThreads, remoteAddr,
minActors, maxActors)
/**
* Creates the generator actor hierarchy generally when not running in a Spark
......@@ -61,21 +67,34 @@ object GeneratorActors {
* @param startTime - the time when the generator is starting
* @param sendPastEvents - denotes whether or not to send back-dated events
* @param numSchedThreads - number of threads for each actor's quartz scheduler
* @param remoteAddr - the address to the remote actor system
* @param minActors - the minimum number of actors for parallelism
* @param maxActors - the maximum number of actors for parallelism
* @return - the main actor to which to send the seed events
*/
private[akka] def generateAndSchedule(inActorSystem: ActorSystem, engine: Engine, outDefs: Set[OutputDefinition],
startTime: Date, sendPastEvents: Boolean, numSchedThreads: Int): ActorRef =
inActorSystem.actorOf(Props(classOf[GeneratorActor], engine, outDefs, startTime, sendPastEvents, numSchedThreads)
.withRouter(new SmallestMailboxRouter(resizer = new DefaultResizer(1, 50)))
// .withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("akka.tcp://sparkMaster@bda-storm01.cablelabs.com:7077"))))
.withMailbox("akka.actor.mailbox.unbounded-queue-based")
, "GeneratorActorHierarchy")
startTime: Date, sendPastEvents: Boolean, numSchedThreads: Int,
remoteAddr: Address, minActors: Int, maxActors: Int): ActorRef =
if(remoteAddr != null) {
logger.info(s"Deploying actors to remote actor system @ $remoteAddr")
inActorSystem.actorOf(Props(classOf[GeneratorActor], engine, outDefs, startTime, sendPastEvents, numSchedThreads)
.withRouter(new SmallestMailboxRouter(resizer = new DefaultResizer(minActors, maxActors)))
.withDeploy(Deploy(scope = RemoteScope(remoteAddr)))
.withMailbox("akka.actor.mailbox.unbounded-queue-based")
, "GeneratorActorHierarchy")
} else {
logger.info("Deploying actors to local actor system")
inActorSystem.actorOf(Props(classOf[GeneratorActor], engine, outDefs, startTime, sendPastEvents, numSchedThreads)
.withRouter(new SmallestMailboxRouter(resizer = new DefaultResizer(minActors, maxActors)))
.withMailbox("akka.actor.mailbox.unbounded-queue-based")
, "GeneratorActorHierarchy")
}
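For context, the remoteAddr passed in here can be built either field by field or parsed from an actor-system URI; both forms below are standard Akka and should be equivalent, with the host and port purely illustrative (the target system must also have remoting enabled):

import akka.actor.{Address, AddressFromURIString}

val byParts: Address = new Address("akka.tcp", "sparkWorker", "bda-storm02", 7078)
val byUri: Address   = AddressFromURIString("akka.tcp://sparkWorker@bda-storm02:7078")
// byParts == byUri; pass either as remoteAddr, or null to stay local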
/**
* Contains a generated event which is being used for pattern matching in some of the actors
* @param values - the event values
*/
case class Event(dimString: String, values: Map[String, Any])
case class Event(dimString: String, values: Map[String, Any]) extends Serializable
/**
* The event object used by the generator's downstream actors
......@@ -83,7 +102,7 @@ object GeneratorActors {
* @param event - the event payload
* @param time - the time the event should be or have been sent
*/
case class DimTimeEvent(dimString: String, event: Map[String, Any], time: Date)
case class DimTimeEvent(dimString: String, event: Map[String, Any], time: Date) extends Serializable
/**
* The main actor responsible for obtaining the next event and scheduling its output
......@@ -127,6 +146,7 @@ object GeneratorActors {
class ScheduleAndNotify(val numThreads: Int, val startTime: Date,
val sendPastEvents: Boolean, val echo: Boolean,
outDefs: Set[OutputDefinition]) extends Actor {
import org.quartz.TriggerBuilder._
import scala.collection.JavaConversions._
......
......@@ -2,8 +2,6 @@ package com.cablelabs.eventgen.algorithm
import java.util.Date
import scala.collection.mutable.ArrayBuffer
/**
* Helpful methods when working with Machine Learning predictive algorithms
*/
......@@ -14,7 +12,7 @@ object AlgorithmUtil {
* @return - tuple2 where the two Seqs (values/weights) have the polynomial applied
* their sizes will equal values.size * degree
*/
def polynomialWithWeights(degree: Int, values: List[Double], weights: List[Double]): (List[Double], List[Double]) = {
def polynomialWithWeights(degree: Int, values: Seq[Double], weights: Seq[Double]): (Seq[Double], Seq[Double]) = {
val thisDegree = if (degree < 1) 1 else degree
if (thisDegree != 1) (polynomial(thisDegree, values), polynomialWeights(thisDegree, weights))
else (values, weights)
......@@ -27,7 +25,7 @@ object AlgorithmUtil {
* @param values - the values to which to apply the polynomial function
* @return - a new Seq whose size = values.size * degree
*/
def polynomial(degree: Int, values: List[Double]): List[Double] = {
def polynomial(degree: Int, values: Seq[Double]): Seq[Double] = {
val thisDegree = if (degree < 1) 1 else degree
if (thisDegree != 1) {
val out = Array.fill[Double](values.size * thisDegree)(0d)
......@@ -37,7 +35,7 @@ object AlgorithmUtil {
out(index) = Math.pow(values(col), deg + 1)
}
}
out.toList
out.toSeq
} else values
}
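A quick sanity check on the expansion; only the output size and the degree < 1 guard are asserted, since the index arithmetic that orders the powers sits in the elided lines above:

val expanded = AlgorithmUtil.polynomial(3, Seq(2.0, 5.0))
assert(expanded.size == 6)                                          // values.size * degree
assert(AlgorithmUtil.polynomial(0, Seq(2.0, 5.0)) == Seq(2.0, 5.0)) // degree < 1 is treated as 1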
......@@ -47,7 +45,7 @@ object AlgorithmUtil {
* @param weights - the weights without any polynomial function applied
* @return - the weights with a polynomial function applied
*/
def polynomialWeights(degree: Int, weights: List[Double]): List[Double] = {
def polynomialWeights(degree: Int, weights: Seq[Double]): Seq[Double] = {
val thisDegree = if (degree < 1) 1 else degree
if (thisDegree != 1) {
val out = Array.fill[Double](weights.size * thisDegree)(0d)
......@@ -57,7 +55,7 @@ object AlgorithmUtil {
out(index) = weights(col)
}
}
out.toList
out.toSeq
} else weights
}
......@@ -68,7 +66,7 @@ object AlgorithmUtil {
* @param numIter - the number of times to perform the flattening function
* @return - a new list of flattened features
*/
def flatten(features: List[Double], numIter: Int): List[Double] =
def flatten(features: Seq[Double], numIter: Int): Seq[Double] =
if (numIter > 0) {
val flattened = features.map(num => {
if (num == 0) 0
......@@ -83,22 +81,21 @@ object AlgorithmUtil {
* @param dates - the dates to calculate (unsorted)
* @return - the tuple
*/
def getDurations(dates: List[Date]): (List[Date], List[Double], Double) = {
def getDurations(dates: Seq[Date]): (Seq[Date], Seq[Double], Double) = {
val sorted = dates.sortBy(x => x.getTime)
val durations = new ArrayBuffer[Double]()
var prevDate = new Date(0)
// TODO - try to achieve same with a higher order function or recursion
sorted.foreach(p => {
if (prevDate.getTime == 0) {
durations += 0d
} else {
durations += (p.getTime - prevDate.getTime).toDouble
}
prevDate = p
})
val durations = {
def loop(dates: Seq[Date], prevDate: Date, acc: List[Double]): Seq[Double] =
if (dates.isEmpty) acc
else {
val duration = if (prevDate.getTime == 0) 0d else (dates.head.getTime - prevDate.getTime).toDouble
loop(dates.tail, dates.head, acc.:+(duration))
}
loop(sorted, new Date(0), List())
}
val avg = if (durations.size > 1) durations.sum / (durations.size - 1) else 0d
assert(sorted.size == durations.size)
(sorted, durations.toList, avg)
(sorted, durations.toSeq, avg)
}
}
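A worked example of the rewritten getDurations; the input order does not matter because the dates are sorted first, the first event always gets a duration of 0, and the average divides by size - 1 since only the gaps between events count:

import java.util.Date

val dates = Seq(new Date(7000L), new Date(1000L), new Date(3000L))
val (sorted, durations, avg) = AlgorithmUtil.getDurations(dates)
// sorted    == the dates ordered by time: 1000, 3000, 7000
// durations == Seq(0.0, 2000.0, 4000.0)
// avg       == 3000.0  (6000.0 / (3 - 1))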
......@@ -22,14 +22,14 @@ trait Model extends Serializable {
* @param features - the feature set
* @return - the predicted value
*/
def predict(features: List[Double]): Any
def predict(features: Seq[Double]): Any
/**
* Returns the predicted value of double back to the client for translation
* @param features - the feature set
* @return - the predicted value
*/
def predictRaw(features: List[Double]): Double
def predictRaw(features: Seq[Double]): Double
}
/**
......@@ -37,7 +37,7 @@ trait Model extends Serializable {
*/
trait ConstantModel extends Model {
def value: Any
override def predict(features: List[Double]): Any = value
override def predict(features: Seq[Double]): Any = value
}
/**
......@@ -78,9 +78,9 @@ trait ClassificationModel extends SupervisedModel {
* The sorted keys contained in labelMap
* @return
*/
def labelKeys: List[Double]
def labelKeys: Seq[Double]
override def predict(features: List[Double]): Any = {
override def predict(features: Seq[Double]): Any = {
assert(featuresSize == features.size)
val pred = model.predict(new DenseVector(features.toArray))
val key = closestKey(pred, labelKeys.size / 2, 0)
......@@ -104,7 +104,7 @@ trait ClassificationModel extends SupervisedModel {
closestKey(value, newIndex, index)
}
override def predictRaw(features: List[Double]): Double = model.predict(new DenseVector(features.toArray))
override def predictRaw(features: Seq[Double]): Double = model.predict(new DenseVector(features.toArray))
}
/**
......@@ -118,12 +118,12 @@ trait RegressionModel extends SupervisedModel {
*/
private[algorithm] def model: SparkRegressionModel
override def predict(features: List[Double]): Any = {
override def predict(features: Seq[Double]): Any = {
assert(featuresSize == features.size)
field.convert(predictRaw(features))
}
override def predictRaw(features: List[Double]): Double = model.predict(new DenseVector(features.toArray))
override def predictRaw(features: Seq[Double]): Double = model.predict(new DenseVector(features.toArray))
}
/**
......@@ -136,7 +136,7 @@ class NaiveBayesModel(override val field: Field, override val trainingSet: RDD[L
val labelMap: Map[Double, Any], val lambda: Double) extends ClassificationModel {
val featuresSize = trainingSet.first().features.size
private[algorithm] val model = NaiveBayes.train(trainingSet, lambda)
val labelKeys = labelMap.keySet.toList.sortBy(x => x)
val labelKeys = labelMap.keySet.toSeq.sortBy(x => x)
}
/**
......@@ -147,7 +147,7 @@ class NaiveBayesModel(override val field: Field, override val trainingSet: RDD[L
* @param stepSize - the size of each step taken during gradient descent
*/
class LinearRegressionModel(override val field: Field, override val trainingSet: RDD[LabeledPoint],
val weights: List[Double] = List[Double](),val numIterations: Int, val stepSize: Double)
val weights: Seq[Double] = Seq[Double](),val numIterations: Int, val stepSize: Double)
extends RegressionModel {
val featuresSize = trainingSet.first().features.size
private[algorithm] val model = if (weights.isEmpty)
......@@ -162,14 +162,14 @@ class LinearRegressionModel(override val field: Field, override val trainingSet:
* Constant model that will return the longVal for each predict() & predictRaw() method call
*/
class ConstantIntModel(override val field: Field, override val value: Long) extends ConstantModel {
override def predictRaw(features: List[Double]): Double = value
override def predictRaw(features: Seq[Double]): Double = value
}
/**
* Constant model that will return the floatVal for each predict() & predictRaw() method call
*/
class ConstantFloatModel(override val field: Field, override val value: Double) extends ConstantModel {
override def predictRaw(features: List[Double]): Double = value
override def predictRaw(features: Seq[Double]): Double = value
}
/**
......@@ -177,5 +177,5 @@ class ConstantFloatModel(override val field: Field, override val value: Double)
* hash code value as a Double
*/
class ConstantStringModel(override val field: Field, override val value: String) extends ConstantModel {
override def predictRaw(features: List[Double]): Double = value.hashCode.toDouble
override def predictRaw(features: Seq[Double]): Double = value.hashCode.toDouble
}
......@@ -18,6 +18,7 @@ object TemporalDimAccumParam extends AccumulatorParam[Map[String, List[Date]]] {
* @return - the newly combined map
*/
override def addInPlace(r1: Map[String, List[Date]], r2: Map[String, List[Date]]): Map[String, List[Date]] = {
// TODO - make more functional
val out = new collection.mutable.HashMap[String, List[Date]]()
r1.foreach(p1 =>
out += (p1._1 -> p1._2)
......@@ -55,6 +56,7 @@ object KeyValueSetAccumParam extends AccumulatorParam[Map[String, Set[Any]]] {
* @return - the combined
*/
override def addInPlace(r1: Map[String, Set[Any]], r2: Map[String, Set[Any]]): Map[String, Set[Any]] = {
// TODO - make more functional
val out = new collection.mutable.HashMap[String, Set[Any]]()
r1.foreach(p1 =>
out += (p1._1 -> p1._2)
......
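The // TODO - make more functional comments added to both accumulator params point at the same imperative merge pattern; one possible functional form for KeyValueSetAccumParam is sketched below, assuming values for shared keys should be unioned (a hypothetical rewrite, not part of this commit):

override def addInPlace(r1: Map[String, Set[Any]], r2: Map[String, Set[Any]]): Map[String, Set[Any]] =
  (r1.keySet ++ r2.keySet).map { k =>
    k -> (r1.getOrElse(k, Set.empty[Any]) ++ r2.getOrElse(k, Set.empty[Any]))
  }.toMap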
......@@ -15,7 +15,6 @@ import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.slf4j.LoggerFactory
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
......@@ -158,11 +157,8 @@ class SparkAnalyzer(val data: RDD[(String, Date, Map[String, Any])], val inputDe
val accum = data.context.accumulator(Map[String, Set[Any]]())(KeyValueSetAccumParam)
data.foreach(p => {
val factMap = inputDef.factValues(p._3)
val map = new mutable.HashMap[String, Set[Any]]()
factMap.foreach(f => {
map += (f._1 -> Set(f._2))
})
accum += map.toMap
val map = factMap.map(f => f._1 -> Set(f._2))
accum += map
})
accum.value
}
......@@ -173,10 +169,7 @@ class SparkAnalyzer(val data: RDD[(String, Date, Map[String, Any])], val inputDe
*/
def dimEventsCount(): Map[String, Long] = {
logger.info("Retrieving the number of events for each dimensional set")
val out = data.map { p =>
p._1
}.countByValue()
out.toMap
data.map(p => p._1).countByValue().toMap
}
/**
......@@ -185,9 +178,7 @@ class SparkAnalyzer(val data: RDD[(String, Date, Map[String, Any])], val inputDe
*/
def events(): RDD[Map[String, Any]] = {
logger.info("Retrieving all parsed event payloads")
data.map(p => {
p._3
})
data.map(p => p._3)
}
/**
......@@ -198,22 +189,17 @@ class SparkAnalyzer(val data: RDD[(String, Date, Map[String, Any])], val inputDe
logger.info("Retrieving the training set for the temporal prediction algorithm")
if (inputDef.temporal.algoDef.isInstanceOf[SupervisedTraining]) {
val accum = data.context.accumulator(Map[String, List[Date]]())(TemporalDimAccumParam)
data.foreach(p => {
val map = new mutable.HashMap[String, List[Date]]()
map += (p._1 -> List[Date](p._2))
accum += map.toMap
})
data.foreach(p => accum += Map(p._1 -> List[Date](p._2)))
// Determine the duration of each event by dimensionality
// Key is the dimString and values contain the sorted dates and associated duration between events as well as the
// average duration
val accumMap = new mutable.HashMap[String, (List[Date], List[Double], Double)]()
accum.value.foreach(p => {
val durations = AlgorithmUtil.getDurations(p._2)
accumMap += (p._1 ->(durations._1, durations._2, durations._3))
})
val accumMap = accum.value.map(p => {
val d = AlgorithmUtil.getDurations(p._2)
p._1 -> (d._1, d._2, d._3)
}).toMap
val dMetrics = durationMetrics(accumMap.toMap)
val dMetrics = durationMetrics(accumMap)
// Build the training set
val labeledPoints = data.map { p =>
......@@ -271,11 +257,14 @@ class SparkAnalyzer(val data: RDD[(String, Date, Map[String, Any])], val inputDe
*
* Tuple containing the mean, median, first and third quartiles, and total duration of all events
*/
private[this] def durationMetrics(durations: Map[String, (List[Date], List[Double], Double)]) = {
private[this] def durationMetrics(durations: Map[String, (Seq[Date], Seq[Double], Double)]) = {
logger.info("Retrieving the metrics for durations generally required for determining the first event for the temporal prediction algorithm")
var total = 0d
var count = 0
// TODO make more functional
var list = new ArrayBuffer[Double]()
var earliest = new Date()
var latest = new Date(0)
// TODO - try and make more functional
......
......@@ -4,12 +4,9 @@ import java.security.MessageDigest
import com.cablelabs.eventgen.algorithm.AlgorithmUtil
import org.json4s.JsonAST.{JObject, JValue}
import org.json4s.StringInput
import org.json4s.native.JsonMethods
import org.json4s.native.JsonMethods._
import org.json4s.{JField, StringInput}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
/**
* Class defining the input fields used for initializing the event Generator
......@@ -24,94 +21,57 @@ class InputDefinition(val temporal: Temporal, val dimensionSet: Set[Dimension],
require(dimensionSet != null && dimensionSet.size > 0)
require(factSet != null && factSet.size > 0)
private[this] val tmpDims = mutable.Map[String, Dimension]()
private[this] val tmpFacts = mutable.Map[String, Fact]()
private[this] val dimPos = mutable.Map[String, Int]()
private[this] val tmpPosDims = ArrayBuffer[Dimension]()
private[this] val factPos = mutable.Map[String, Int]()
private[this] val tmpPosFacts = ArrayBuffer[Fact]()
private[this] val tmpFieldMap = mutable.Map[String, InputField]()
tmpFieldMap += (temporal.name -> temporal)
// TODO - make functional, this is ugly
/* Setup dimension data structures */
dimensionSet.foreach((dim: Dimension) => {
tmpFieldMap += (dim.name -> dim)
tmpDims += (dim.name -> dim)
dimPos += (dim.name -> dim.position)
})
dimPos.toList sortBy ( _._2 ) foreach {
case (key, value) => tmpPosDims += tmpDims.get(key).get
}
/**
* Dimensions in priority order
*/
val positionalDims = tmpPosDims.toList
/**
* The Dimensions by name
*/
val dimensions = tmpDims.toMap
val dimensions = dimensionSet.map(d => d.name -> d).toMap
/* Setup fact data structures */
factSet.foreach((fact: Fact) => {
tmpFieldMap += (fact.name -> fact)
tmpFacts += (fact.name -> fact)
factPos += (fact.name -> fact.position)
})
private[this] val dimPos = dimensionSet.map(d => d.name -> d.position).toSeq.sortBy( _._2 )
/**
* Contains all fields where the key is the field's name
* Dimensions in priority order
*/
val fieldMap = tmpFieldMap.toMap
val positionalDims = dimPos.map(d => dimensions(d._1))
factPos.toList sortBy ( _._2 ) foreach {
case (key, value) => tmpPosFacts += tmpFacts.get(key).get
}
/**
* Facts in priority order
* Contains all fields where the key is the field's name
*/
val positionalFacts = tmpPosFacts.toList
val fieldMap = dimensionSet.map(d => d.name -> d).toMap ++ factSet.map(f => f.name -> f).toMap + (temporal.name -> temporal)
/**
* The Facts by name
*/
val facts = tmpFacts.toMap
val facts = factSet.map(f => f.name -> f).toMap
/**
* Facts in priority order
*/
val positionalFacts = factSet.map(f => f.name -> f.position).toSeq.sortBy( _._2 ).map(f => facts(f._1))
/**
* Returns all of the fact values of an event in order by dependencies
* @param event - the event to parse
* @return - an ordered map of all fact values
*/
def factValues(event: Map[String, Any]): collection.mutable.LinkedHashMap[String, Any] = {
val out = new collection.mutable.LinkedHashMap[String, Any]()
facts.foreach((factEntry: (String, Fact)) => {
val key = factEntry._1
out += (key -> factEntry._2.eventValue[Any](event))
})
out
}
def factValues(event: Map[String, Any]): Map[String, Any] = facts.map(f => f._1 -> f._2.eventValue(event))
/**
* Returns all of the dimension values from an event in priority order
* @param event - the event to parse
* @return - an ordered map of all dimension values
*/
def dimensionValues(event: Map[String, Any]): collection.mutable.LinkedHashMap[String, Any] = {
val out = new collection.mutable.LinkedHashMap[String, Any]()
positionalDims.foreach((dim: Dimension) => {
out += (dim.name -> dim.eventValue[Any](event))
})
out
}
def dimensionValues(event: Map[String, Any]): Seq[(String, Any)] =
positionalDims.map(d => d.name -> d.eventValue[Any](event))
/**
* Returns an encoded String containing each dimension key/value in priority order
* @param event - the event to parse
* @return - the encoded String
*/
def dimString(event: Map[String, Any]): String = {
val dimIdList = positionalDims.map(d => d.name + "::" + d.eventStringValue(event))
dimIdList filter (_.nonEmpty) mkString "|"
}
def dimString(event: Map[String, Any]): String =
positionalDims.map(d => d.name + "::" + d.eventStringValue(event)).filter(_.nonEmpty).mkString("|")
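The encoding produced by dimString is just name::value pairs in priority order joined with |; a standalone illustration with hypothetical dimension names:

val dims = Seq("region" -> "east", "device" -> "cm-123")
val dimString = dims.map { case (n, v) => n + "::" + v }.filter(_.nonEmpty).mkString("|")
// dimString == "region::east|device::cm-123"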
/**
* Hashing function below plagiarized from http://code-redefined.blogspot.com/2009/05/md5-sum-in-scala.html
......@@ -153,13 +113,7 @@ class InputDefinition(val temporal: Temporal, val dimensionSet: Set[Dimension],
* @param event - the event to convert
* @return - the JSON object
*/
def toJson(event: Map[String, Any]): JValue = {
val fieldValueList = ArrayBuffer[JField]()
fieldMap.foreach((field: (String, InputField)) => {
fieldValueList += field._2.jField(event)
})
new JObject(fieldValueList.toList)
}
def toJson(event: Map[String, Any]): JValue = new JObject(fieldMap.map(f => f._2.jField(event)).toList)
/**
* Converts an event into a JSON String on a single line (not pretty)
......@@ -192,11 +146,11 @@ class InputDefinition(val temporal: Temporal, val dimensionSet: Set[Dimension],
* @param event - the event to parse
* @return - the feature set
*/
def temporalAlgoFeatures(event: Map[String, Any]): List[Double] = {
def temporalAlgoFeatures(event: Map[String, Any]): Seq[Double] = {
temporal.algoDef match {
case algoDef:SupervisedTraining =>
algoFeatures(event, temporal.factIndex, algoDef.polyDegree, algoDef.flatten)
case _ => List[Double]()
case _ => Seq[Double]()
}
}
......@@ -206,12 +160,12 @@ class InputDefinition(val temporal: Temporal, val dimensionSet: Set[Dimension],
* @param name - the fact name for which the training set will be generated
* @return - the features
*/
def factAlgoFeatures(event: Map[String, Any], name: String): List[Double] = {
def factAlgoFeatures(event: Map[String, Any], name: String): Seq[Double] = {
val fact = facts.get(name).get
fact.algoDef match {
case algoDef:SupervisedTraining =>
algoFeatures(event, positionalFacts.indexOf(fact), algoDef.polyDegree, algoDef.flatten)
case _ => List[Double]()
case _ => Seq[Double]()