Commit 57bee902 authored by Steven Pisarski


Refactored the prediction model to ease testing and added more prediction tests. Will continue to tune existing prediction tests and create additional ones.
parent 4dbda7a1
......@@ -156,8 +156,7 @@ object AnalyzeData extends App {
val fileSystem = FileSystem.get(new URI(uri), new Configuration())
val stream = fileSystem.create(new Path(uri))
val model = Generator.buildFactModel(analyzer.inputDef, p)
val labeledPoints = p._2.map(_._1)
val predictions = gatherPredictionMetrics(model, labeledPoints)
val predictions = gatherPredictionMetrics(model, p._2)
outputPredictionMetrics(stream, predictions)
stream.close()
logger.info(s"Completed persisting fact training metrics to $uri")
......@@ -306,18 +305,19 @@ object AnalyzeData extends App {
* @param trainingSet - the set that could have been used to train the model
* @return - the metrics
*/
private[this] def gatherPredictionMetrics(model: Model, trainingSet: RDD[LabeledPoint]) = {
private[this] def gatherPredictionMetrics(model: Model, trainingSet: RDD[(LabeledPoint, Any)]) = {
logger.info("Gathering prediction metrics")
var totalLabels = 0d
var totalPredRaw = 0d
// TODO - consider creating a Spark accumulator so this can be done across the cluster
trainingSet.collect().map (p => {
val predValueRaw = model.predictRaw(p.features.toArray.toList)
val diffRaw = p.label - predValueRaw
val predValue = model.predict(p.features.toArray.toList)
totalLabels += p.label
val labeledPoint = p._1
val predValueRaw = model.predictRaw(labeledPoint.features.toArray.toList)
val diffRaw = labeledPoint.label - predValueRaw
val predValue = model.predict(labeledPoint.features.toArray.toList)
totalLabels += labeledPoint.label
totalPredRaw += predValueRaw
(p.label, predValueRaw, diffRaw, predValue)
(labeledPoint.label, predValueRaw, diffRaw, predValue)
})
}
}
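The accumulator TODO above could look like the following. A minimal sketch, assuming the Model is serializable to the executors and using the Spark 1.x accumulator API this codebase already uses elsewhere (see TemporalDimAccumParam); gatherPredictionMetricsAccum is a hypothetical name, not part of this commit:

private[this] def gatherPredictionMetricsAccum(model: Model, trainingSet: RDD[(LabeledPoint, Any)]) = {
  // accumulators are written on the executors and read back on the driver
  val labelTotal = trainingSet.context.accumulator(0d)
  val predRawTotal = trainingSet.context.accumulator(0d)
  val metrics = trainingSet.map { case (point, _) =>
    val predValueRaw = model.predictRaw(point.features.toArray.toList)
    labelTotal += point.label
    predRawTotal += predValueRaw
    (point.label, predValueRaw, point.label - predValueRaw, model.predict(point.features.toArray.toList))
  }
  metrics.cache().count() // force a single evaluation so each accumulator is updated exactly once
  (metrics, labelTotal.value, predRawTotal.value)
}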
......
......@@ -101,13 +101,12 @@ object Generator extends App {
* @return - the configured prediction model
*/
// TODO - test me
def buildTemporalModel(inputDef: InputDefinition, trainingSet: RDD[LabeledPoint]): Model =
def buildTemporalModel(inputDef: InputDefinition, trainingSet: RDD[(LabeledPoint, Any)]): Model =
inputDef.temporal.algoDef match {
case definition: SupervisedTraining =>
definition match {
case definition: NaiveBayesDefinition =>
val labelMap = trainingSet.map(p => p.label -> p.label).collect().toMap
new NaiveBayesModel(inputDef.temporal, trainingSet, labelMap, definition.lambda)
new NaiveBayesModel(inputDef.temporal, trainingSet, definition.lambda)
case definition: LinearRegressionDefinition =>
new LinearRegressionModel(inputDef.temporal, trainingSet, inputDef.algoWeights(inputDef.temporal),
definition.iterations, definition.stepSize)
......@@ -151,15 +150,11 @@ object Generator extends App {
val fact = inputDef.facts.get(data._1).get
fact.algoDef match {
case definition: SupervisedTraining =>
val trainingSet = data._2
// TODO - fix me when built from a cached RDD (works when it comes directly from the analyzer)
val features = trainingSet.map(s => s._1)
definition match {
case definition: NaiveBayesDefinition =>
val labelMap = trainingSet.map(s => s._1.label -> s._2).collect().toMap
new NaiveBayesModel(fact, features, labelMap, definition.lambda)
new NaiveBayesModel(fact, data._2, definition.lambda)
case definition: LinearRegressionDefinition =>
new LinearRegressionModel(fact, features, inputDef.algoWeights(fact), definition.iterations,
new LinearRegressionModel(fact, data._2, inputDef.algoWeights(fact), definition.iterations,
definition.stepSize)
}
case definition: ConstantDefinition =>
......@@ -191,7 +186,7 @@ object Generator extends App {
def engineFromCache(sc: SparkContext, inputDef: InputDefinition, temporalTrainingSetUri: String, factTrainingSetUri: String,
sendPastEvents: Boolean, outputDefs: Set[OutputDefinition], numSchedulerThreads: Int,
remoteAddr: Address, minActors: Int, maxActors: Int) = {
val temporalTrainingSet = sc.objectFile[LabeledPoint](temporalTrainingSetUri)
val temporalTrainingSet = sc.objectFile[(LabeledPoint, Any)](temporalTrainingSetUri)
val factTrainingSet = getFactTrainingSets(sc, inputDef, factTrainingSetUri)
val factTrainingList = factTrainingSet.toSeq
new Engine(inputDef,
......
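Since engineFromCache reloads the temporal training set via sc.objectFile, the set must first be persisted in that format. A minimal sketch of the write side, where analyzer and temporalTrainingSetUri are assumed names used only for illustration:

// hypothetical persist step; engineFromCache above reads it back with sc.objectFile
val temporalSet: RDD[(LabeledPoint, Any)] = analyzer.temporalTrainingSet()
temporalSet.saveAsObjectFile(temporalTrainingSetUri)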
......@@ -49,12 +49,12 @@ trait SupervisedModel extends Model {
* The number of features this supervised model has
* @return
*/
def featuresSize: Int
private[algorithm] def featuresSize: Int
/**
* The training set required for supervised training operations
*/
def trainingSet: RDD[LabeledPoint]
private[algorithm] def trainingSet: RDD[(LabeledPoint, Any)]
}
/**
......@@ -72,46 +72,60 @@ trait ClassificationModel extends SupervisedModel {
* Contains a map of labels and associated values
* @return
*/
def labelMap: Map[Double, Any]
private[algorithm] def labelMap: Map[Double, Any]
/**
* The sorted keys contained in labelMap
* @return
*/
def labelKeys: Seq[Double]
private[algorithm] def labelKeys: List[Double]
override def predict(features: Seq[Double]): Any = {
assert(featuresSize == features.size)
val pred = model.predict(new DenseVector(features.toArray))
val key = closestKey(pred, labelKeys.size / 2, 0)
val key = closestKey(pred)
val label = labelMap.get(key)
if (label == None) throw new RuntimeException("Predicted value not contained in the label map")
else label.get
}
override def predictRaw(features: Seq[Double]): Double = model.predict(new DenseVector(features.toArray))
/**
* Returns the value most closely associated with the numeric value
* @param key - the value to search
* @return - the mapped value; never Nothing or null
*/
def labelValue(key: Double): Any = labelMap(closestKey(key, labelKeys.size / 2, 0))
def labelValue(key: Double): Any = labelMap(closestKey(key))
/**
* Returns the closest key from the predicted to the list holding the classification values
* TODO - write unit test for this private method
* Returns the key whose value is closest to the value parameter
* @param value - the value to search
* @return - the key
*/
private[this] def closestKey(value: Double, index: Int, lastIndex: Int): Double =
if(index == lastIndex)
if (index < labelKeys.size && index >= 0 && labelKeys.size > 0) labelKeys(index)
else 0
else {
val newIndex =
if(value > labelKeys(index)) index / 2
else(labelKeys.size - index) / 2 + index
closestKey(value, newIndex, index)
}
def closestKey(value: Double): Double =
if(labelKeys.contains(value)) labelKeys(labelKeys.indexOf(value))
else closestKey(value, labelKeys.size / 2, 0)
override def predictRaw(features: Seq[Double]): Double = model.predict(new DenseVector(features.toArray))
/**
* Recursively binary-searches the ordered sequence of keys for the key closest to the given value
*/
private def closestKey(value: Double, index: Int, lastIndex: Int): Double =
if(index == lastIndex || value == labelKeys(index)) labelKeys(index)
else if (value < labelKeys(index))
if (index == 0) labelKeys(index)
else if (value == labelKeys(index - 1)) labelKeys(index - 1)
else if(value > labelKeys(index - 1))
if (labelKeys(index) - value > value - labelKeys(index -1)) labelKeys(index - 1) else labelKeys(index)
else closestKey(value, index / 2, index)
else
if (index + 1 == labelKeys.size) labelKeys(index)
else if (value == labelKeys(index + 1)) labelKeys(index + 1)
else if(value < labelKeys(index + 1))
if (labelKeys(index + 1) - value > value - labelKeys(index)) labelKeys(index) else labelKeys(index + 1)
else
if (lastIndex < index) closestKey(value, (labelKeys.size - index) / 2 + index + 1, index)
else closestKey(value, (lastIndex - index) / 2 + index + 1, index)
}
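Because labelKeys is sorted, the recursion above is effectively an O(log n) binary search. For documentation purposes only, a linear O(n) sketch with the same intent (ties may resolve to the other neighbor):

def closestKeyLinear(value: Double): Double =
  labelKeys.minBy(k => math.abs(k - value)) // nearest key by absolute distance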
/**
......@@ -135,15 +149,15 @@ trait RegressionModel extends SupervisedModel {
/**
* Implementation of a Naive Bayes classification predictive algorithm
* @param trainingSet - tuples where _1 is the LabeledPoint used to train the algorithm and _2 is the associated original value
* @param labelMap - used to map the predicted value to the actual
* @param lambda - the algorithm's smoothing parameter
*/
class NaiveBayesModel(override val field: Field, override val trainingSet: RDD[LabeledPoint],
val labelMap: Map[Double, Any], val lambda: Double) extends ClassificationModel {
val featuresSize = trainingSet.first().features.size
private[algorithm] val model = NaiveBayes.train(trainingSet, lambda)
val labelKeys = labelMap.keySet.toSeq.sortBy(x => x)
class NaiveBayesModel(override val field: Field, override val trainingSet: RDD[(LabeledPoint, Any)],
val lambda: Double) extends ClassificationModel {
private val actualTrainingSet = trainingSet.map(_._1)
private[algorithm] override val model = NaiveBayes.train(actualTrainingSet, lambda)
private[algorithm] override val labelMap = trainingSet.collect().map(p => p._1.label -> p._2).toMap
private[algorithm] override val labelKeys = labelMap.keySet.toSeq.sortBy(x => x).toList
private[algorithm] lazy override val featuresSize = actualTrainingSet.first().features.size
}
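For a tiny illustrative training set, the derived members work out as follows. A sketch with hypothetical values, where sc is an assumed SparkContext and someFact an assumed Field:

val ts: RDD[(LabeledPoint, Any)] = sc.parallelize(Seq(
  (LabeledPoint(1.0, new DenseVector(Array(0.0, 1.0))), "LOW"),
  (LabeledPoint(2.0, new DenseVector(Array(1.0, 0.0))), "HIGH")))
val nbm = new NaiveBayesModel(someFact, ts, lambda = 0.01) // someFact is an assumed Field
// nbm.labelMap  == Map(1.0 -> "LOW", 2.0 -> "HIGH")
// nbm.labelKeys == List(1.0, 2.0)
// nbm.predict(...) maps the raw numeric prediction back to "LOW" or "HIGH"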
/**
......@@ -153,16 +167,17 @@ class NaiveBayesModel(override val field: Field, override val trainingSet: RDD[L
* @param numIterations - the number of times gradient descent will run during training
* @param stepSize - the size of each step taken during gradient descent
*/
class LinearRegressionModel(override val field: Field, override val trainingSet: RDD[LabeledPoint],
class LinearRegressionModel(override val field: Field, override val trainingSet: RDD[(LabeledPoint, Any)],
val weights: Seq[Double] = Seq[Double](), val numIterations: Int, val stepSize: Double)
extends RegressionModel {
val featuresSize = trainingSet.first().features.size
private[algorithm] val model = if (weights.isEmpty)
LinearRegressionWithSGD.train(trainingSet, numIterations, stepSize, 1.0)
else {
require(weights.size == featuresSize)
LinearRegressionWithSGD.train(trainingSet, numIterations, stepSize, 1.0, new DenseVector(weights.toArray))
}
private val actualTrainingSet = trainingSet.map(_._1)
private[algorithm] lazy override val featuresSize = actualTrainingSet.first().features.size
private[algorithm] override val model = if (weights.isEmpty)
LinearRegressionWithSGD.train(trainingSet.map(_._1), numIterations, stepSize, 1.0)
else {
require(weights.size == featuresSize)
LinearRegressionWithSGD.train(actualTrainingSet, numIterations, stepSize, 1.0, new DenseVector(weights.toArray))
}
}
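A hedged construction sketch, reusing the assumed two-feature training set ts from the sketch above and an assumed fact field, showing that supplied weights must match featuresSize:

// weights.size must equal featuresSize (2 here) or the require(...) above fails
val lrModel = new LinearRegressionModel(fact, ts, Seq(0.5, 0.5),
  numIterations = 100, stepSize = 0.001)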
/**
......
......@@ -207,7 +207,7 @@ class SparkAnalyzer(val data: RDD[(String, Date, Map[String, Any])], val inputDe
* Returns the training set required for the temporal prediction algorithm
* @return
*/
def temporalTrainingSet(): RDD[LabeledPoint] = {
def temporalTrainingSet(): RDD[(LabeledPoint, Any)] = {
logger.info("Retrieving the training set for the temporal prediction algorithm")
if (inputDef.temporal.algoDef.isInstanceOf[SupervisedTraining]) {
val accum = data.context.accumulator(Map[String, List[Date]]())(TemporalDimAccumParam)
......@@ -224,7 +224,7 @@ class SparkAnalyzer(val data: RDD[(String, Date, Map[String, Any])], val inputDe
val dMetrics = durationMetrics(accumMap)
// Build the training set
val labeledPoints = data.map { p =>
data.map { p =>
val datesOption = accumMap.get(p._1)
val features = inputDef.temporalAlgoFeatures(p._3)
val vector = new DenseVector(features.toArray)
......@@ -232,23 +232,23 @@ class SparkAnalyzer(val data: RDD[(String, Date, Map[String, Any])], val inputDe
val dates = datesOption.get
val dateIndex = dates._1.indexOf(p._2)
if (dates._2(dateIndex) == 0 && dates._2.size > 1) {
LabeledPoint(dates._2.sum / (dates._2.size - 1), vector)
val value = dates._2.sum / (dates._2.size - 1)
(LabeledPoint(value, vector), value)
} else {
val rnd = Random.nextInt(8)
if (rnd == 0) LabeledPoint(dMetrics._1, vector) // median
else if (rnd == 1) LabeledPoint(dMetrics._2, vector) // first quartile
else if (rnd == 2) LabeledPoint(dMetrics._3, vector) // third quartile
else if (rnd == 3) LabeledPoint(dMetrics._4, vector) // third quartile
else LabeledPoint(dMetrics._5, vector) // event range
if (rnd == 0) (LabeledPoint(dMetrics._1, vector), dMetrics._1) // median
else if (rnd == 1) (LabeledPoint(dMetrics._2, vector), dMetrics._2) // first quartile
else if (rnd == 2) (LabeledPoint(dMetrics._3, vector), dMetrics._3) // third quartile
else if (rnd == 3) (LabeledPoint(dMetrics._4, vector), dMetrics._4) // third quartile
else (LabeledPoint(dMetrics._5, vector), dMetrics._5) // event range
}
} else {
// This should never happen
LabeledPoint(0d, vector)
(LabeledPoint(0d, vector), 0)
}
}
labeledPoints
} else {
data.context.parallelize(List[LabeledPoint]())
data.context.parallelize(List[(LabeledPoint, Any)]())
}
}
......
......@@ -121,16 +121,16 @@ trait ConstantDefinition extends AlgorithmDefinition {
class LinearRegressionDefinition(override val omitFields: Set[String], override val weights: Map[String, Int],
override val flatten: (Seq[Double]) => Seq[Double],
override val polyDegree: Int = 1, val iterations: Int,
val stepSize: Double)
val stepSize: Double = .001)
extends RegressionDefinition
/**
* Attributes required for training a Naive Bayes classification algorithm
* @param lambda - the smoothing parameter used when training the predictive algorithm
*/
class NaiveBayesDefinition(override val omitFields: Set[String],
class NaiveBayesDefinition(override val omitFields: Set[String] = Set(),
override val flatten: (Seq[Double]) => Seq[Double],
override val polyDegree: Int = 1,val lambda: Double)
override val polyDegree: Int = 1, val lambda: Double = .001)
extends ClassificationDefinition
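With the new defaults, only the flatten function is required to build a NaiveBayesDefinition; a minimal sketch mirroring the test fixtures below:

val flatten = new AlgoFlattenYAML("", 0, 0).flatten
// omitFields defaults to Set(), polyDegree to 1, lambda to .001
val nbDef = new NaiveBayesDefinition(flatten = flatten)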
/**
......
package com.cablelabs.eventgen.algorithm
import java.io.{File, FileInputStream}
import java.text.SimpleDateFormat
import com.cablelabs.eventgen.AnalyzerTester
import com.cablelabs.eventgen.model.{InputDefinition, LinearRegressionDefinition, NaiveBayesDefinition}
/**
* Tests the configured models against a known set of 10k events of IVR data
*/
class IvrFactPredictionsTest extends AnalyzerTester {
val dateFormat = "MM-dd-yyyy HH:mm:ss a"
val dateFormatter = new SimpleDateFormat(dateFormat)
val inputDef = InputDefinition.inputDefinition(
new FileInputStream(new File("testData/ivr/definition/ivr-fact-pred-input.yaml")))
val eventUri = new File("testData/ivr/events").toURI.toString
val delim = ','
analyzerTest("Analyze fact preditions to ensure the average prediction is within 95% of the average label") {
inputDef.positionalFacts.foreach(fact => {
val trainingSet = analyzer.factTrainingSet(fact.name)
assert(trainingSet != null && trainingSet.count() == 1000)
val model = fact.algoDef match {
case lrDef: LinearRegressionDefinition =>
new LinearRegressionModel(fact, analyzer.factTrainingSet(fact.name),
inputDef.algoWeights(fact), lrDef.iterations, lrDef.stepSize)
case nbDef: NaiveBayesDefinition =>
new NaiveBayesModel(fact, analyzer.factTrainingSet(fact.name), nbDef.lambda)
}
println(s"Validating model for fact with name - ${fact.name}")
ModelValidator.validateModel(model, trainingSet.collect(), analyzer, 0.95, outputValues = false)
})
}
}
package com.cablelabs.eventgen.algorithm
import com.cablelabs.eventgen.analysis.SparkAnalyzer
import com.cablelabs.eventgen.model.{Fact, Temporal}
import org.apache.spark.mllib.regression.LabeledPoint
/**
......@@ -8,7 +9,7 @@ import org.apache.spark.mllib.regression.LabeledPoint
*/
object ModelValidator {
def validateModel(model: Model, trainingSet: Array[LabeledPoint], analyzer: SparkAnalyzer, accuracy: Double,
def validateModel(model: Model, trainingSet: Array[(LabeledPoint, Any)], analyzer: SparkAnalyzer, accuracy: Double,
outputValues: Boolean): Unit = model match {
case model: LinearRegressionModel =>
......@@ -23,48 +24,46 @@ object ModelValidator {
var maxLabel = 0d
trainingSet.foreach(p => {
val pred = model.predictRaw(p.features.toArray)
val label = p._1.label
val pred = model.predictRaw(p._1.features.toArray)
totalPred += pred
totalLabel += p.label
totalLabel += label
predCount += 1
diff += p.label - pred
assert(p.label > -1)
// TODO - This should always be < 0 but the current test data and algorithms are currently not working properly
// assert(pred > 0)
diff += label - pred
if (pred < minPred) minPred = pred
if (pred > maxPred) maxPred = pred
if (p.label < minLabel) minLabel = p.label
if (p.label > maxLabel) maxLabel = p.label
if (label < minLabel) minLabel = label
if (label > maxLabel) maxLabel = label
if (outputValues) {
println(s"label = ${p.label} - pred = $pred")
// if (pred < 1500000) println(s"pred < 1.5m label = ${p.label} - pred = $pred")
// if (pred > 10000000) println(s"pred > 10m label = ${p.label} - pred = $pred")
// assert(Math.abs(p.label - pred) < p.label * .1)
}
if (outputValues) println(s"label = $label - pred = $pred")
})
val averageLabel = totalLabel / predCount
val averagePred = totalPred / predCount
val thresholdVal = averageLabel * accuracy
val actualAccuracy = 1 - math.abs(averageLabel - averagePred) / math.abs(averageLabel)
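// e.g. averageLabel = 100 and averagePred = 92 gives actualAccuracy = 1 - 8/100 = 0.92,
// which passes a 0.90 accuracy threshold but fails 0.95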
println (s"pred count = $predCount")
println (s"min pred = $minPred")
println (s"max pred = $maxPred")
println (s"average prediction = $averagePred")
println (s"min label = $minLabel")
println (s"max label = $maxLabel")
println (s"average label = $averageLabel")
println (s"actualAccuracy = $actualAccuracy")
assert(predCount == trainingSet.size)
// Check that the average prediction exceeds the accuracy threshold
assert(thresholdVal < averagePred)
assert(accuracy < actualAccuracy)
// Predict based on the actual events used for training
val events = analyzer.events().collect()
assert(events.size == trainingSet.size)
var totalPred2 = 0d
events.foreach(p => {
totalPred2 += model.predictRaw(analyzer.inputDef.temporalAlgoFeatures(p))
events.foreach(event => model.field match {
case temporal: Temporal =>
totalPred2 += model.predictRaw(analyzer.inputDef.temporalAlgoFeatures(event))
case fact: Fact =>
totalPred2 += model.predictRaw(analyzer.inputDef.factAlgoFeatures(event, fact))
})
// Assure that both the actual feature sets used for training and the re-calculated versions from the events
......@@ -79,9 +78,9 @@ object ModelValidator {
var numMatches = 0
trainingSet.foreach(p => {
val pred = model.predict(p.features.toArray)
val pred = model.predict(p._1.features.toArray)
predCount += 1
val labelValue = model.labelValue(p.label)
val labelValue = model.labelValue(p._1.label)
if (labelValue == pred) {
numMatches += 1
}
......
package com.cablelabs.eventgen.algorithm
import java.io.{File, FileInputStream}
import java.text.SimpleDateFormat
import com.cablelabs.eventgen.AnalyzerTester
import com.cablelabs.eventgen.model.{InputDefinition, NaiveBayesDefinition}
/**
* Tests the NaiveBayesModel against a known set of 10k events of IVR data
* Tests may fail during the build; see https://issues.apache.org/jira/browse/SPARK-2243
* Set the Spark URI to local[1] to mitigate, though this will slow the tests down
*/
class NaiveBayesIvrFactModelTest extends AnalyzerTester {
val dateFormat = "MM-dd-yyyy HH:mm:ss a"
val dateFormatter = new SimpleDateFormat(dateFormat)
val inputDef = InputDefinition.inputDefinition(
new FileInputStream(new File("testData/ivr/definition/ivr-naive-bayes-input.yaml")))
/* DO NOT SET THESE WHEN BUILDING - settings only here as sanity check for talking to the HDFS
val schemaUri = "hdfs://hadoop-01/tmp/inputDef/ivr.json"
val fileSystem = FileSystem.get(new URI(schemaUri), new Configuration())
val inputDef = EventUtil.parseSchema(new StreamInput(fileSystem.open(new Path(schemaUri))))
*/
val eventUri = new File("testData/ivr/events").toURI.toString
//DO NOT SET THESE WHEN BUILDING - settings only here as sanity check for talking to the HDFS
// val eventUri = "hdfs://hadoop-01/tmp/ivr_files_1"
val delim = ','
analyzerTest("Analyze Naive Bayes classification model should predict fact values minimal error") {
inputDef.facts.filter(p => p._2.algoDef.isInstanceOf[NaiveBayesDefinition]).map(_._2).foreach(fact => {
val trainingSet = analyzer.factTrainingSet(fact.name)
val temporalModel = new NaiveBayesModel(inputDef.temporal, trainingSet,
fact.algoDef.asInstanceOf[NaiveBayesDefinition].lambda)
assert(trainingSet != null && trainingSet.count() == 1000)
// This is currently using 40% accuracy, which is poor
// TODO - determine if we can significantly close this gap; however, this training set has such a sparse number
// of events for each dimensionality that it may be causing significant outliers
ModelValidator.validateModel(temporalModel, trainingSet.collect(), analyzer, 0.40, outputValues = false)
})
}
}
package com.cablelabs.eventgen.algorithm
import com.cablelabs.eventgen.SparkTestUtils
import com.cablelabs.eventgen.model.{AlgoFlattenYAML, NaiveBayesDefinition, StringFact}
import org.apache.spark.mllib.linalg.DenseVector
import org.apache.spark.mllib.regression.LabeledPoint
/**
* Tests the methods on the NaiveBayesModel class
*/
class NaiveBayesModelTest extends SparkTestUtils {
val flatten = new AlgoFlattenYAML("", 0, 0).flatten
val fact = new StringFact("foo", "", 1, new NaiveBayesDefinition(flatten = flatten, lambda = 5.0, polyDegree = 2))
val trainingSet: Seq[(LabeledPoint, Any)] = Seq(
(LabeledPoint(1.1, new DenseVector(Array(10.0, 20.0))), "ABC"),
(LabeledPoint(1.2, new DenseVector(Array(20.0, 30.0))), "DEF"),
(LabeledPoint(1.3, new DenseVector(Array(30.0, 40.0))), "GHI"),
(LabeledPoint(1.4, new DenseVector(Array(40.0, 50.0))), "JKL"),
(LabeledPoint(1.5, new DenseVector(Array(50.0, 60.0))), "MNO"),
(LabeledPoint(1.6, new DenseVector(Array(60.0, 70.0))), "PQR")
)
sparkTest("closestKey returns the key value that exactly corresponds to the input") {
val nbm = new NaiveBayesModel(fact, sc.parallelize(trainingSet), 0.01)
assert(1.1 == nbm.closestKey(1.1))
assert(1.2 == nbm.closestKey(1.2))
assert(1.3 == nbm.closestKey(1.3))
assert(1.4 == nbm.closestKey(1.4))
assert(1.5 == nbm.closestKey(1.5))
assert(1.6 == nbm.closestKey(1.6))
}
sparkTest("labelValue returns the key value that exactly corresponds to the input") {
val nbm = new NaiveBayesModel(fact, sc.parallelize(trainingSet), 0.01)
assert("ABC" == nbm.labelValue(1.1))
assert("DEF" == nbm.labelValue(1.2))
assert("GHI" == nbm.labelValue(1.3))
assert("JKL" == nbm.labelValue(1.4))
assert("MNO" == nbm.labelValue(1.5))
assert("PQR" == nbm.labelValue(1.6))
}
sparkTest("closestKey returns the key value that most closely corresponds to the input") {
val nbm = new NaiveBayesModel(fact, sc.parallelize(trainingSet), 0.01)
assert(1.1 == nbm.closestKey(-1.1499))
assert(1.1 == nbm.closestKey(1.1499))
assert(1.1 == nbm.closestKey(1.15))
assert(1.1 == nbm.closestKey(1))
assert(1.1 == nbm.closestKey(0))
assert(1.2 == nbm.closestKey(1.1501))
assert(1.2 == nbm.closestKey(1.2499))
assert(1.3 == nbm.closestKey(1.251))
assert(1.3 == nbm.closestKey(1.3499))
assert(1.4 == nbm.closestKey(1.351))
assert(1.4 == nbm.closestKey(1.4499))
assert(1.5 == nbm.closestKey(1.451))
assert(1.5 == nbm.closestKey(1.5499))
assert(1.6 == nbm.closestKey(1.551))
assert(1.6 == nbm.closestKey(10))
}
sparkTest("labelValue returns the key value that most closely corresponds to the input") {
val nbm = new NaiveBayesModel(fact, sc.parallelize(trainingSet), 0.01)
assert("ABC" == nbm.labelValue(1.1499))
assert("ABC" == nbm.labelValue(1.1499))
assert("ABC" == nbm.labelValue(1.15))
assert("ABC" == nbm.labelValue(1))
assert("ABC" == nbm.labelValue(0))
assert("DEF" == nbm.labelValue(1.1501))
assert("DEF" == nbm.labelValue(1.2499))
assert("GHI" == nbm.labelValue(1.251))
assert("GHI" == nbm.labelValue(1.3499))
assert("JKL" == nbm.labelValue(1.351))
assert("JKL" == nbm.labelValue(1.4499))
assert("MNO" == nbm.labelValue(1.451))
assert("MNO" == nbm.labelValue(1.5499))
assert("PQR" == nbm.labelValue(1.551))
assert("PQR" == nbm.labelValue(10))
}
sparkTest("prediction should return expected values") {
val nbm = new NaiveBayesModel(fact, sc.parallelize(trainingSet), 1)
assert("ABC" == nbm.predict(Seq(10.0, 20.0)))
assert("DEF" == nbm.predict(Seq(20.0, 30.0)))
assert("GHI" == nbm.predict(Seq(30.0, 40.0)))
assert("JKL" == nbm.predict(Seq(40.0, 50.0)))
assert("MNO" == nbm.predict(Seq(50.0, 60.0)))
assert("PQR" == nbm.predict(Seq(60.0, 70.0)))
}
}
......@@ -26,7 +26,7 @@ class SparkAnalyzerGenericTest extends AnalyzerTester {
assert(9 == trainingSet.size)
val rawFeatures = inputDef.temporal.denormFields.size + inputDef.dimensionSet.size + inputDef.factSet.size
val expectedNumFeatures = rawFeatures * inputDef.temporal.algoDef.asInstanceOf[SupervisedTraining].polyDegree
assert(expectedNumFeatures == trainingSet.iterator.next().features.size)
assert(expectedNumFeatures == trainingSet.iterator.next()._1.features.size)
}
analyzerTest("Fact training set should return the proper number of training sets and features for the first fact") {
......
......@@ -27,7 +27,7 @@ class SparkAnalyzerIvrTest extends AnalyzerTester {
assert(1000 == trainingSet.size)
val rawFeatures = inputDef.temporal.denormFields.size + inputDef.dimensionSet.size + inputDef.factSet.size
val expectedNumFeatures = rawFeatures * inputDef.temporal.algoDef.asInstanceOf[SupervisedTraining].polyDegree
assert(expectedNumFeatures == trainingSet.iterator.next().features.size)
assert(expectedNumFeatures == trainingSet.iterator.next()._1.features.size)
}
analyzerTest("Fact training set should return the proper number of training sets and features for the first fact") {
......
......@@ -11,7 +11,7 @@ class DateFactTest extends UnitSpec {
val dateFormat = "MM/dd/yyyy HH:mm:ss a"
val flatten: (Seq[Double]) => Seq[Double] = Seq[Double]
val flatten = new AlgoFlattenYAML("", 0, 0).flatten
test("DateFact construction all fields positionally should return proper values") {
val mlDef = new NaiveBayesDefinition(Set(), flatten, 1, .001)
......
......@@ -11,7 +11,7 @@ import org.joda.time.DateTime
class DateTemporalTest extends UnitSpec {
val dateFormat = "MM/dd/yyyy HH:mm:ss a"
val flatten: (Seq[Double]) => Seq[Double] = Seq[Double]
val flatten = new AlgoFlattenYAML("", 0, 0).flatten
test("DateTemporal construction all fields positionally should return proper values") {
val mlDef = new NaiveBayesDefinition(Set(), flatten, 3, .001)
......
......@@ -7,7 +7,7 @@ import com.cablelabs.eventgen.UnitSpec
*/
class FloatFactTest extends UnitSpec {
val flatten: (Seq[Double]) => Seq[Double] = Seq[Double]
val flatten = new AlgoFlattenYAML("", 0, 0).flatten
test("FloatFact construction all fields positionally should return proper values") {
val mlDef = new NaiveBayesDefinition(Set(), flatten, 1, .001)
......
......@@ -7,7 +7,7 @@ import com.cablelabs.eventgen.UnitSpec
*/
class IntFactTest extends UnitSpec {
val flatten: (Seq[Double]) => Seq[Double] = Seq[Double]
val flatten = new AlgoFlattenYAML("", 0, 0).flatten
test("IntegerFact construction all fields positionally should return proper values") {
val mlDef = new NaiveBayesDefinition(Set(), flatten, 1, .001)
......