Commit 4dbda7a1 authored by Steven Pisarski

Added finer-grained tuning controls for the existing machine learning predictive algorithms.

parent 767affa8
......@@ -287,12 +287,9 @@ object AnalyzeData extends App {
* @param name - the job name
*/
private[this] def scheduleNow(f: () => Unit, scheduler: Scheduler, name: String): Unit = {
val jobData = mutable.Map[String, () => Unit]()
jobData += "function" -> f
val job = JobBuilder.newJob(classOf[RouteJob])
.withIdentity(name)
.setJobData(new JobDataMap(mapAsJavaMap(jobData)))
.setJobData(new JobDataMap(mapAsJavaMap(Map("function" -> f))))
val trigger = newTrigger().withIdentity(name).startNow()
logger.trace(s"Scheduling job $name")
scheduler.scheduleJob(job.build(), trigger.build())
......
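Note: the function stored under the "function" key above is what the job executes when fired. The RouteJob implementation is not part of this diff; a minimal sketch of how it could pull the function back out of the JobDataMap (assuming it simply invokes it) is:

import org.quartz.{Job, JobExecutionContext}

class RouteJob extends Job {
  // Retrieve the scheduled function from the job's data map and invoke it
  override def execute(context: JobExecutionContext): Unit = {
    val f = context.getJobDetail.getJobDataMap.get("function").asInstanceOf[() => Unit]
    f()
  }
}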
......@@ -89,6 +89,13 @@ trait ClassificationModel extends SupervisedModel {
else label.get
}
/**
* Returns the label value most closely associated with the given numeric key
* @param key - the numeric value to search for
* @return - the associated label value; never Nothing or null
*/
def labelValue(key: Double): Any = labelMap(closestKey(key, labelKeys.size / 2, 0))
/**
* Returns the key in the classification value list that is closest to the predicted value
* TODO - write unit test for this private method
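closestKey itself is elided here; semantically it finds the label key nearest to the predicted value. A minimal equivalent sketch (a linear scan; the actual private method appears to do a recursive binary search starting from the middle index):

// Sketch only: the key in labelKeys whose distance to the target is smallest
def closestKeySketch(labelKeys: Seq[Double], target: Double): Double =
  labelKeys.minBy(k => math.abs(k - target))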
......@@ -147,7 +154,7 @@ class NaiveBayesModel(override val field: Field, override val trainingSet: RDD[L
* @param stepSize - the size of each step taken during gradient descent
*/
class LinearRegressionModel(override val field: Field, override val trainingSet: RDD[LabeledPoint],
val weights: Seq[Double] = Seq[Double](),val numIterations: Int, val stepSize: Double)
val weights: Seq[Double] = Seq[Double](), val numIterations: Int, val stepSize: Double)
extends RegressionModel {
val featuresSize = trainingSet.first().features.size
private[algorithm] val model = if (weights.isEmpty)
......
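The model construction is elided above; a plausible sketch of how the optional seed weights could feed into Spark 1.x MLlib training (the body below is an assumption, not shown in this diff):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LinearRegressionWithSGD

// Sketch: train from scratch when no weights were configured, otherwise
// start gradient descent from the configured initial weight vector
private[algorithm] val modelSketch =
  if (weights.isEmpty)
    LinearRegressionWithSGD.train(trainingSet, numIterations, stepSize)
  else
    LinearRegressionWithSGD.train(trainingSet, numIterations, stepSize, 1.0,
      Vectors.dense(weights.toArray))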
......@@ -254,7 +254,7 @@ class SparkAnalyzer(val data: RDD[(String, Date, Map[String, Any])], val inputDe
/**
* Returns the fact training set for a given fact
* @return - a map of the training sets
* @return - an RDD of the training sets
*/
def factTrainingSet(name: String): RDD[(LabeledPoint, Any)] = {
val fact = inputDef.facts.get(name).get
......@@ -264,7 +264,7 @@ class SparkAnalyzer(val data: RDD[(String, Date, Map[String, Any])], val inputDe
data.flatMap[(LabeledPoint, Any)](p => {
val features = inputDef.factAlgoFeatures(p._3, fact)
Seq((LabeledPoint(inputDef.fieldMap.get(name).get.mlTrainingValue(p._3), new DenseVector(features.toArray)),
fact.eventValue[Any](p._3)))
fact.eventValue(p._3)))
})
case _ =>
data.context.parallelize(List[(LabeledPoint, Any)]())
......
......@@ -26,74 +26,6 @@ object InputDefinition {
config.inputDef
}
/**
* Derived from https://github.com/adityashah30/multipolyfit/blob/master/src/polynomial/PolynomialRegression.java
* @param degree - the polynomial degree; if < 1, 1 will be used
* @param values - the values to convert
* @param weights - the weights to convert
* @return - a Tuple2 in which both lists (values/weights) have the polynomial applied;
* their sizes will equal values.size * degree
*/
private[model] def polynomialWithWeights(degree: Int, values: Seq[Double], weights: Seq[Double]): (Seq[Double], Seq[Double]) = {
val thisDegree = if (degree < 1) 1 else degree
if (thisDegree != 1) (polynomial(thisDegree, values), polynomialWeights(thisDegree, weights))
else (values, weights)
}
/**
* Applies a polynomial function to a ML feature set
* Derived from https://github.com/adityashah30/multipolyfit/blob/master/src/polynomial/PolynomialRegression.java
* @param degree - the polynomial degree. if < 1, 1 will be used
* @param values - the values to which to apply the polynomial function
* @return - a new list whose size equals values.size * degree
*/
private[model] def polynomial(degree: Int, values: Seq[Double]): Seq[Double] = {
val thisDegree = if (degree < 1) 1 else degree
if (thisDegree != 1) {
val out = Array.fill[Double](values.size * thisDegree)(0d)
for (col <- 0 to values.size - 1) {
for (deg <- 0 to thisDegree - 1) {
val index = col * thisDegree + deg
out(index) = Math.pow(values(col), deg + 1)
}
}
out.toSeq
} else values
}
/**
* Derives the field weights based on the polynomial degree parameter
* @param degree - the polynomial degree. if < 1, 1 will be used
* @param weights - the weights without any polynomial function applied
* @return - the weights with a polynomial function applied
*/
private[model] def polynomialWeights(degree: Int, weights: Seq[Double]): Seq[Double] = {
val thisDegree = if (degree < 1) 1 else degree
if (thisDegree != 1) {
val out = Array.fill[Double](weights.size * thisDegree)(0d)
for (col <- 0 to weights.size - 1) {
for (deg <- 0 to thisDegree - 1) {
val index = col * thisDegree + deg
out(index) = weights(col)
}
}
out.toSeq
} else weights
}
/**
* Applies Math.log() to the Math.abs() of each feature, numIter times.
* When the original value is 0, it remains 0.
* @param features - the features to flatten
* @param numIter - the number of times to perform the flattening function
* @return - a new list of flattened features
*/
private[model] def flatten(features: Seq[Double], numIter: Int): Seq[Double] =
if (numIter > 0) {
val flattened = features.map(num => {
if (num == 0) 0
else Math.log(Math.abs(num))
})
flatten(flattened, numIter - 1)
} else features
}
/**
......@@ -233,8 +165,7 @@ class InputDefinition(val temporal: Temporal, val dimensionSet: Set[Dimension],
*/
def temporalAlgoFeatures(event: Map[String, Any]): Seq[Double] =
temporal.algoDef match {
case algoDef:SupervisedTraining =>
algoFeatures(event, temporal.factPosition, algoDef)
case algoDef:SupervisedTraining => algoFeatures(event, temporal.factPosition, algoDef)
case _ => Seq[Double]()
}
......@@ -246,8 +177,7 @@ class InputDefinition(val temporal: Temporal, val dimensionSet: Set[Dimension],
*/
def factAlgoFeatures(event: Map[String, Any], fact: Fact): Seq[Double] =
fact.algoDef match {
case algoDef:SupervisedTraining =>
algoFeatures(event, fact.position, algoDef)
case algoDef:SupervisedTraining => algoFeatures(event, fact.position, algoDef)
case _ => Seq[Double]()
}
......@@ -259,10 +189,8 @@ class InputDefinition(val temporal: Temporal, val dimensionSet: Set[Dimension],
* @return - the feature set
*/
private def algoFeatures(event: Map[String, Any], factPosition: Int, algo: SupervisedTraining): Seq[Double] =
InputDefinition.polynomial(algo.polyDegree,
InputDefinition.flatten(
temporalFeatures(event, algo) ++: dimensionFeatures(event, algo) ++: factFeatures(event, factPosition, algo),
algo.flatten))
MachineLearning.polynomial(algo.polyDegree, algo.flatten(
temporalFeatures(event, algo) ++: dimensionFeatures(event, algo) ++: factFeatures(event, factPosition, algo)))
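The refactored pipeline applies the configured flatten function first and the polynomial expansion second. A worked example, assuming a root flattener with base 2 and polyDegree 2:

// Features Seq(9.0, 16.0)
// 1. flatten (root, base 2): Seq(3.0, 4.0)
// 2. polynomial (degree 2):  each feature x expands to (x, x^2)
val flattenFn = new AlgoFlattenYAML("root", 2, 1).flatten
val expanded = MachineLearning.polynomial(2, flattenFn(Seq(9.0, 16.0)))
// expanded ~= Seq(3.0, 9.0, 4.0, 16.0)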
/**
* Return the temporal training features for a SupervisedTraining algorithm
......@@ -301,13 +229,13 @@ class InputDefinition(val temporal: Temporal, val dimensionSet: Set[Dimension],
def algoWeights(field: AlgorithmRole): Seq[Double] = field match {
case temporal: Temporal => temporal.algoDef match {
case regression: RegressionDefinition =>
InputDefinition.polynomialWeights(regression.polyDegree,
MachineLearning.polynomialWeights(regression.polyDegree,
temporalWeights(regression) ++: dimensionWeights(regression) ++: factWeights(regression, temporal.factPosition))
case _ => Seq()
}
case fact: Fact => fact.algoDef match {
case regression: RegressionDefinition =>
InputDefinition.polynomialWeights(regression.polyDegree,
MachineLearning.polynomialWeights(regression.polyDegree,
temporalWeights(regression) ++: dimensionWeights(regression) ++: factWeights(regression, fact.position))
case _ => Seq()
}
......@@ -429,29 +357,30 @@ class AlgoYaml(@JsonProperty("name") name: String,
@JsonProperty("constType") constType: String,
@JsonProperty("constVal") constVal: String,
@JsonProperty("omitFields") jOmitFields: java.util.Set[String],
@JsonProperty("flatten") flatten: Int,
@JsonProperty("flatten") _flatten: AlgoFlattenYAML,
@JsonProperty("polyDegree") polyDegree: Int,
@JsonProperty("iterations") iterations: Int,
@JsonProperty("stepSize") stepSize: Float,
@JsonProperty("lambda") lambda: Float,
@JsonProperty("weights") jWeights: java.util.Set[AlgoWeights]) {
@JsonProperty("weights") jWeights: java.util.Set[AlgoWeightsYAML]) {
// Only these algorithm names are currently supported
require(name != null && (name == "linearRegression" || name == "naiveBayes" || name == "constant"))
val flatten = if (_flatten != null) _flatten else new AlgoFlattenYAML("", 0, 0)
private val omitFields =
if (jOmitFields == null) Set[String]()
else jOmitFields.asScala.toSet
val algorithm = {
def algorithm = {
name match {
case "linearRegression" =>
val weights =
if (jWeights == null) Map[String, Int]()
else jWeights.asScala.map(f => f.name -> f.weight).toMap
new LinearRegressionDefinition(omitFields, weights, flatten, polyDegree, iterations, stepSize)
new LinearRegressionDefinition(omitFields, weights, flatten.flatten, polyDegree, iterations, stepSize)
case "naiveBayes" =>
new NaiveBayesDefinition(omitFields, flatten, polyDegree, lambda)
new NaiveBayesDefinition(omitFields, flatten.flatten, polyDegree, lambda)
case "constant" =>
constType match {
case "string" => new ConstantStringDefinition(constVal)
......@@ -467,4 +396,63 @@ class AlgoYaml(@JsonProperty("name") name: String,
* @param name - the field name
* @param weight - the field's weight
*/
class AlgoWeights(@JsonProperty("name") val name: String, @JsonProperty("weight") val weight: Int)
\ No newline at end of file
class AlgoWeightsYAML(@JsonProperty("name") val name: String, @JsonProperty("weight") val weight: Int)
/**
* Responsible for creating the flatten function (built by partial application)
* @param mode - the flatten mode (currently supports "naturalLog", "log" with a base, and "root" with a base;
* any other value applies no change)
* @param base - the mode's base (log base or the root where 2 denotes sqrt, 3 cbrt, etc...)
* @param iterations - the number of times to apply the mode to the feature set
*/
class AlgoFlattenYAML(@JsonProperty("mode") val mode: String, @JsonProperty("base") val base: Int,
@JsonProperty("iterations") val iterations: Int) extends Serializable {
/**
* Returns the function responsible for flattening out the ML algorithm's feature set
* @return - the flattening function for the configured mode
*/
val flatten: (Seq[Double]) => Seq[Double] = {
mode match {
case "naturalLog" =>
require(iterations > 0)
def out(iter: Int, base: Int)(in: Seq[Double]): Seq[Double] = {
def loop(iter: Int, features: Seq[Double]): Seq[Double] =
if (iter < 1) features
else loop(iter - 1, features.map(f =>
if (f == 0) 0
else math.log(math.abs(f)) * (if (f < 0) -1 else 1)))
loop(iter, in)
}
out(iterations, base)
case "log" =>
require(iterations > 0)
require(base > 1)
def out(iter: Int, base: Int)(in: Seq[Double]): Seq[Double] = {
def loop(iter: Int, features: Seq[Double]): Seq[Double] =
if (iter < 1) features
else loop(iter - 1, features.map(f =>
if (f == 0) 0
else math.log10(math.abs(f)) / math.log10(base) * (if (f < 0) -1 else 1)))
loop(iter, in)
}
out(iterations, base)
case "root" =>
require(iterations > 0)
require(base > 1)
def out(iter: Int, base: Int)(in: Seq[Double]): Seq[Double] = {
def loop(iter: Int, features: Seq[Double]): Seq[Double] =
if (iter < 1) features
else loop(iter - 1, features.map(f =>
math.pow(Math.E, math.log(math.abs(f)) / base) * (if (f < 0) -1 else 1)))
loop(iter, in)
}
out(iterations, base)
case _ =>
def out(in: Seq[Double]): Seq[Double] = in
out
}
}
}
\ No newline at end of file
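A few worked values for the three modes above (single iteration; sign is preserved and zero stays zero):

// naturalLog: ln|f|, sign restored afterwards
new AlgoFlattenYAML("naturalLog", 0, 1).flatten(Seq(math.E, -math.E, 0.0))
// ~= Seq(1.0, -1.0, 0.0)

// log: log base `base` of |f|, here base 10
new AlgoFlattenYAML("log", 10, 1).flatten(Seq(100.0, -1000.0, 0.0))
// ~= Seq(2.0, -3.0, 0.0)

// root: the base-th root via e^(ln|f| / base), here the square root
new AlgoFlattenYAML("root", 2, 1).flatten(Seq(9.0, -16.0, 0.0))
// ~= Seq(3.0, -4.0, 0.0)

// any unrecognized mode (e.g. the "foo" used in the test YAML) is the identity
new AlgoFlattenYAML("foo", 0, 0).flatten(Seq(9.0)) // Seq(9.0)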
package com.cablelabs.eventgen.model
object MachineLearning {
/**
* Derived from https://github.com/adityashah30/multipolyfit/blob/master/src/polynomial/PolynomialRegression.java
* @param degree - the polynomial degree; if < 1, 1 will be used
* @param values - the values to convert
* @param weights - the weights to convert
* @return - a Tuple2 in which both lists (values/weights) have the polynomial applied;
* their sizes will equal values.size * degree
*/
// TODO - consider moving this method to the SupervisedModel trait
private[model] def polynomialWithWeights(degree: Int, values: Seq[Double], weights: Seq[Double]): (Seq[Double], Seq[Double]) = {
val thisDegree = if (degree < 1) 1 else degree
if (thisDegree != 1) (polynomial(thisDegree, values), polynomialWeights(thisDegree, weights))
else (values, weights)
}
/**
* Applies a polynomial function to a ML feature set
* Derived from https://github.com/adityashah30/multipolyfit/blob/master/src/polynomial/PolynomialRegression.java
* @param degree - the polynomial degree. if < 1, 1 will be used
* @param values - the values to which to apply the polynomial function
* @return - a new list whose size equals values.size * degree
*/
// TODO - consider moving this method to the SupervisedModel trait
def polynomial(degree: Int, values: Seq[Double]): Seq[Double] = {
val thisDegree = if (degree < 1) 1 else degree
if (thisDegree != 1) {
val out = Array.fill[Double](values.size * thisDegree)(0d)
for (col <- 0 to values.size - 1) {
for (deg <- 0 to thisDegree - 1) {
val index = col * thisDegree + deg
out(index) = Math.pow(values(col), deg + 1)
}
}
out.toSeq
} else values
}
/**
* Derives the field weights based on the polynomial degree parameter
* @param degree - the polynomial degree. if < 1, 1 will be used
* @param weights - the weights without any polynomial function applied
* @return - the weights with a polynomial function applied
*/
// TODO - consider moving this method to the SupervisedModel trait
def polynomialWeights(degree: Int, weights: Seq[Double]): Seq[Double] = {
val thisDegree = if (degree < 1) 1 else degree
if (thisDegree != 1) {
val out = Array.fill[Double](weights.size * thisDegree)(0d)
for (col <- 0 to weights.size - 1) {
for (deg <- 0 to thisDegree - 1) {
val index = col * thisDegree + deg
out(index) = weights(col)
}
}
out.toSeq
} else weights
}
}
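Worked example of the expansion above: with degree 2 every value x contributes the pair (x, x^2), while each weight is simply repeated degree times:

MachineLearning.polynomial(2, Seq(2.0, 3.0))
// Seq(2.0, 4.0, 3.0, 9.0) -> (2, 2^2, 3, 3^2)

MachineLearning.polynomialWeights(2, Seq(5.0, 7.0))
// Seq(5.0, 5.0, 7.0, 7.0) -> each weight duplicated per degree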
/**
* Algorithm trait - currently there is nothing to define, but it is kept for semantic and future extension reasons
*/
......@@ -17,11 +76,10 @@ trait SupervisedTraining extends AlgorithmDefinition {
def omitFields: Set[String]
/**
* Denotes how many times the feature set will be flattened by taking the original value and applying
* log10 to it
* Function (built by partial application) that flattens the feature set values
* @return
*/
def flatten: Int
def flatten: (Seq[Double]) => Seq[Double]
/**
* Denotes the polynomial degrees to apply to the machine learning feature set
......@@ -61,15 +119,17 @@ trait ConstantDefinition extends AlgorithmDefinition {
* @param stepSize - amount to move the point during gradient descent for each step iteration
*/
class LinearRegressionDefinition(override val omitFields: Set[String], override val weights: Map[String, Int],
override val flatten: Int = 0, override val polyDegree: Int = 1,
val iterations: Int, val stepSize: Double)
override val flatten: (Seq[Double]) => Seq[Double],
override val polyDegree: Int = 1, val iterations: Int,
val stepSize: Double)
extends RegressionDefinition
/**
* Attributes required for training a Naive Bayes classification algorithm
* @param lambda - the smoothing parameter used when training the predictive algorithm
*/
class NaiveBayesDefinition(override val omitFields: Set[String], override val flatten: Int = 0,
class NaiveBayesDefinition(override val omitFields: Set[String],
override val flatten: (Seq[Double]) => Seq[Double],
override val polyDegree: Int = 1, val lambda: Double)
extends ClassificationDefinition
......
......@@ -9,7 +9,8 @@ temporal:
- day_of_month
algo:
name: linearRegression
flatten: 2
flatten:
mode: foo
polyDegree: 3
iterations: 20
stepSize: 0.001
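The fixtures deliberately use the unrecognized mode "foo", which falls through to the identity function (no flattening). A hypothetical block exercising a real mode would look like:

algo:
  name: linearRegression
  flatten:
    mode: log        # naturalLog, log, or root
    base: 10         # log base / root (ignored by naturalLog)
    iterations: 2    # apply the mode twice
  polyDegree: 3
  iterations: 20
  stepSize: 0.001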
......@@ -46,7 +47,8 @@ facts:
position: 10
algo:
name: linearRegression
flatten: 2
flatten:
mode: foo
polyDegree: 3
iterations: 20
stepSize: 0.001
......@@ -56,7 +58,8 @@ facts:
position: 20
algo:
name: linearRegression
flatten: 2
flatten:
mode: foo
polyDegree: 3
iterations: 20
stepSize: 0.001
......@@ -66,7 +69,8 @@ facts:
position: 30
algo:
name: naiveBayes
flatten: 2
flatten:
mode: foo
polyDegree: 3
lambda: 0.001
- name: date_fact
......@@ -76,6 +80,7 @@ facts:
position: 40
algo:
name: naiveBayes
flatten: 2
flatten:
mode: foo
polyDegree: 3
lambda: 0.001
\ No newline at end of file
......@@ -9,7 +9,8 @@ temporal:
- day_of_month
algo:
name: naiveBayes
flatten: 2
flatten:
mode: foo
polyDegree: 3
lambda: 0.001
omitFields:
......@@ -27,7 +28,8 @@ facts:
position: 10
algo:
name: linearRegression
flatten: 2
flatten:
mode: foo
polyDegree: 3
iterations: 20
stepSize: 0.001
......@@ -37,7 +39,8 @@ facts:
position: 20
algo:
name: linearRegression
flatten: 2
flatten:
mode: foo
polyDegree: 3
iterations: 20
stepSize: 0.001
......@@ -47,7 +50,8 @@ facts:
position: 30
algo:
name: naiveBayes
flatten: 2
flatten:
mode: foo
polyDegree: 3
lambda: 0.001
- name: date_fact
......@@ -57,6 +61,7 @@ facts:
position: 40
algo:
name: naiveBayes
flatten: 2
flatten:
mode: foo
polyDegree: 3
lambda: 0.001
\ No newline at end of file
......@@ -27,8 +27,7 @@ class EngineTest extends EngineTester {
outDef.routeType, notifier)
val seed = analyzer.lastEventByDim().toLocalIterator.next()
val newSeed = collection.mutable.Map(seed._2.toSeq: _*)
newSeed += inputDef.temporal.name -> new Date()
val newSeed = seed._2 ++ Map(inputDef.temporal.name -> new Date())
val dimString = inputDef.dimString(newSeed.toMap)
engine.seed(dimString, newSeed.toMap)
......
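The rewrite above leans on immutable Map concatenation: ++ yields a new map in which right-hand entries win on key collision, so the temporal field is replaced rather than duplicated. For example:

// Right-hand entries override left-hand ones for duplicate keys
Map("time" -> "old", "dim" -> 1) ++ Map("time" -> "new")
// Map("time" -> "new", "dim" -> 1)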
......@@ -11,7 +11,7 @@ import com.cablelabs.eventgen.model.{InputDefinition, LinearRegressionDefinition
* Tests may fail during build; see https://issues.apache.org/jira/browse/SPARK-2243
* Set the Spark URI to local[1] to mitigate, though this will slow the tests down
*/
class LinearRegressionCmModelTest extends AnalyzerTester {
class LinearRegressionCmTemporalModelTest extends AnalyzerTester {
val dateFormat = "MM-dd-yyyy HH:mm:ss a"
val dateFormatter = new SimpleDateFormat(dateFormat)
......@@ -28,11 +28,7 @@ class LinearRegressionCmModelTest extends AnalyzerTester {
inputDef.temporal.algoDef.asInstanceOf[LinearRegressionDefinition].iterations,
inputDef.temporal.algoDef.asInstanceOf[LinearRegressionDefinition].stepSize)
assert(trainingSet != null && trainingSet.size == 1000)
// This is currently using 90% instead of 5%
// TODO - determine if we can significantly close this gap, however this training set has such a sparse number
// of events for each dimensionality that can be causing significant outliers
ModelValidator.validateModel(temporalModel, trainingSet, analyzer, 0.40, outputValues = false)
ModelValidator.validateModel(temporalModel, trainingSet, analyzer, .99, outputValues = false)
}
}
......@@ -11,7 +11,8 @@ import com.cablelabs.eventgen.model.{InputDefinition, LinearRegressionDefinition
* Tests may fail during build; see https://issues.apache.org/jira/browse/SPARK-2243
* Set the Spark URI to local[1] to mitigate, though this will slow the tests down
*/
class LinearRegressionIvrModelTest extends AnalyzerTester {
// TODO - Replace this test as the IVR data set is not conducive to temporal predictions
class LinearRegressionIvrTemporalModelTest extends AnalyzerTester {
val dateFormat = "MM-dd-yyyy HH:mm:ss a"
val dateFormatter = new SimpleDateFormat(dateFormat)
......@@ -28,11 +29,7 @@ class LinearRegressionIvrModelTest extends AnalyzerTester {
inputDef.temporal.algoDef.asInstanceOf[LinearRegressionDefinition].iterations,
inputDef.temporal.algoDef.asInstanceOf[LinearRegressionDefinition].stepSize)
assert(trainingSet != null && trainingSet.size == 1000)
// This is currently using 90% instead of 5%
// TODO - determine if we can significantly close this gap, however this training set has such a sparse number
// of events for each dimensionality that can be causing significant outliers
ModelValidator.validateModel(temporalModel, trainingSet, analyzer, 0.80, outputValues = false)
ModelValidator.validateModel(temporalModel, trainingSet, analyzer, .95, outputValues = false)
}
}
......@@ -8,67 +8,96 @@ import org.apache.spark.mllib.regression.LabeledPoint
*/
object ModelValidator {
def validateModel(model: Model, trainingSet: Array[LabeledPoint], analyzer: SparkAnalyzer, threshold: Double,
outputValues: Boolean): Unit = {
def validateModel(model: Model, trainingSet: Array[LabeledPoint], analyzer: SparkAnalyzer, accuracy: Double,
outputValues: Boolean): Unit = model match {
case model: LinearRegressionModel =>
// Predict based on the actual training set
var diff = 0d
var totalPred = 0d
var totalLabel = 0d
var predCount = 0
var minPred = 9999999999d
var maxPred = 0d
var minLabel = 9999999999d
var maxLabel = 0d
trainingSet.foreach(p => {
val pred = model.predictRaw(p.features.toArray.toList)
totalPred += pred
totalLabel += p.label
predCount += 1
diff += p.label - pred
assert(p.label > -1)
// TODO - This should always be > 0, but the current test data and algorithms are not working properly
// assert(pred > 0)
if (pred < minPred) minPred = pred
if (pred > maxPred) maxPred = pred
if (p.label < minLabel) minLabel = p.label
if (p.label > maxLabel) maxLabel = p.label
if (outputValues) {
println(s"label = ${p.label} - pred = $pred")
// if (pred < 1500000) println(s"pred < 1.5m label = ${p.label} - pred = $pred")
// if (pred > 10000000) println(s"pred > 10m label = ${p.label} - pred = $pred")
// assert(Math.abs(p.label - pred) < p.label * .1)
}
})
val averageLabel = totalLabel / predCount
val averagePred = totalPred / predCount
val thresholdVal = averageLabel * (1.0 - accuracy)
println (s"min pred = $minPred")
println (s"max pred = $maxPred")
println (s"average prediction = $averagePred")
println (s"min label = $minLabel")
println (s"max label = $maxLabel")
println (s"average label = $averageLabel")
assert(predCount == trainingSet.size)
// Check that the average prediction exceeds the accuracy-derived threshold
assert(thresholdVal < averagePred)
// Predict based on the actual events used for training