Commit fd446c81 authored by Steven Pisarski

Moved the AlgorithmUtil methods into InputDefinition and SparkAnalyzer.

parent 13882b62
package com.cablelabs.eventgen.algorithm
import java.util.Date
/**
* Helpful methods when working with Machine Learning predictive algorithms
*/
object AlgorithmUtil {

  /**
   * Applies the same polynomial expansion to a feature set and to its weights.
   * Derived from https://github.com/adityashah30/multipolyfit/blob/master/src/polynomial/PolynomialRegression.java
   * @param degree - the polynomial degree. if < 1, 1 will be used
   * @param values - the values to convert
   * @param weights - the weights to convert
   * @return - tuple2 (values, weights) where both have the polynomial applied;
   *           their sizes will equal values.size * degree and weights.size * degree
   */
  def polynomialWithWeights(degree: Int, values: Seq[Double], weights: Seq[Double]): (Seq[Double], Seq[Double]) =
    // Both delegates clamp the degree themselves and hand back their input untouched for a degree of 1
    (polynomial(degree, values), polynomialWeights(degree, weights))

  /**
   * Applies a polynomial function to a ML feature set
   * Derived from https://github.com/adityashah30/multipolyfit/blob/master/src/polynomial/PolynomialRegression.java
   * @param degree - the polynomial degree. if < 1, 1 will be used
   * @param values - the values to which to apply the polynomial function
   * @return - a new list whose size is = values.size * degree; each value is replaced by
   *           value^1, value^2 ... value^degree, in the original value order
   */
  def polynomial(degree: Int, values: Seq[Double]): Seq[Double] = {
    val thisDegree = math.max(degree, 1)
    if (thisDegree == 1) values
    else
      // A single pass over the values avoids positional access, which is O(n) per lookup on a List
      values.iterator
        .flatMap(value => Iterator.tabulate(thisDegree)(power => Math.pow(value, power + 1)))
        .toArray
        .toSeq
  }

  /**
   * Derives the field weights based on the polynomial degree parameter
   * @param degree - the polynomial degree. if < 1, 1 will be used
   * @param weights - the weights without any polynomial function applied
   * @return - the weights with a polynomial function applied: each weight repeated degree times,
   *           lining up with the output of polynomial()
   */
  def polynomialWeights(degree: Int, weights: Seq[Double]): Seq[Double] = {
    val thisDegree = math.max(degree, 1)
    if (thisDegree == 1) weights
    else
      // A single pass over the weights avoids positional access, which is O(n) per lookup on a List
      weights.iterator
        .flatMap(weight => Iterator.fill(thisDegree)(weight))
        .toArray
        .toSeq
  }

  /**
   * Applies Math.log() to the Math.abs() value of each feature numIter number of times.
   * When the original value is 0, it remains.
   * @param features - the features to flatten
   * @param numIter - the number of times to perform the flattening function; values <= 0 return the input as is
   * @return - a new list of flattened features
   */
  def flatten(features: Seq[Double], numIter: Int): Seq[Double] =
    (0 until numIter).foldLeft(features) { (current, _) =>
      current.map(num => if (num == 0) 0d else Math.log(Math.abs(num)))
    }

  /**
   * Sorts the dates in ascending order and measures the time between each date and its predecessor.
   * @param dates - the dates to calculate (unsorted)
   * @return - a tuple containing the sorted dates, the duration in milliseconds between each date and the
   *           previous one (always 0 for the first date) and the average duration, which is
   *           sum / (size - 1), or 0 when there are fewer than two dates
   */
  def getDurations(dates: Seq[Date]): (Seq[Date], Seq[Double], Double) = {
    val sorted = dates.sortBy(_.getTime)
    val times = sorted.map(_.getTime)
    // Pair each timestamp with its successor rather than using an epoch-0 sentinel as "no previous date",
    // which wrongly produced a 0 duration after any date whose time really was 0
    val durations =
      if (times.isEmpty) Seq.empty[Double]
      else 0d +: times.zip(times.tail).map { case (prev, next) => (next - prev).toDouble }
    // The first duration is always 0, so the average is taken over the size - 1 real gaps
    val avg = if (durations.size > 1) durations.sum / (durations.size - 1) else 0d
    (sorted, durations, avg)
  }
}
......@@ -4,7 +4,6 @@ import java.io.{BufferedReader, InputStreamReader}
import java.net.URI
import java.util.Date
import com.cablelabs.eventgen.algorithm.AlgorithmUtil
import com.cablelabs.eventgen.model.{InputDefinition, SupervisedTraining}
import com.typesafe.scalalogging.slf4j.Logger
import org.apache.hadoop.conf.Configuration
......@@ -108,6 +107,29 @@ object SparkAnalyzer {
(headerLine, hdrs)
}
}
/**
 * Sorts the dates in ascending order and measures the time between each date and its predecessor.
 * @param dates - the dates to calculate (unsorted)
 * @return - a tuple containing the sorted dates, the duration in milliseconds between each date and the
 *           previous one (always 0 for the first date) and the average duration, which is
 *           sum / (size - 1), or 0 when there are fewer than two dates
 */
private[analysis] def getDurations(dates: Seq[Date]): (Seq[Date], Seq[Double], Double) = {
  val sorted = dates.sortBy(_.getTime)
  val times = sorted.map(_.getTime)
  // Pair each timestamp with its successor rather than using an epoch-0 sentinel as "no previous date",
  // which wrongly produced a 0 duration after any date whose time really was 0
  val durations =
    if (times.isEmpty) Seq.empty[Double]
    else 0d +: times.zip(times.tail).map { case (prev, next) => (next - prev).toDouble }
  // The first duration is always 0, so the average is taken over the size - 1 real gaps
  val avg = if (durations.size > 1) durations.sum / (durations.size - 1) else 0d
  (sorted, durations, avg)
}
}
/**
......@@ -195,7 +217,7 @@ class SparkAnalyzer(val data: RDD[(String, Date, Map[String, Any])], val inputDe
// Key is the dimString and values contain the sorted dates and associated duration between events as well as the
// average duration
val accumMap = accum.value.map(p => {
val d = AlgorithmUtil.getDurations(p._2)
val d = SparkAnalyzer.getDurations(p._2)
p._1 -> (d._1, d._2, d._3)
}).toMap
......
......@@ -3,7 +3,6 @@ package com.cablelabs.eventgen.model
import java.io.InputStream
import java.security.MessageDigest
import com.cablelabs.eventgen.algorithm.AlgorithmUtil
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import org.json4s.JsonAST.{JObject, JValue}
......@@ -26,6 +25,75 @@ object InputDefinition {
val config = mapper.readValue(is, classOf[InputDefinitionYaml])
config.inputDef
}
/**
 * Expands a feature set together with its weights using a single polynomial degree.
 * Derived from https://github.com/adityashah30/multipolyfit/blob/master/src/polynomial/PolynomialRegression.java
 * @param degree - the polynomial degree. anything below 1 is treated as 1
 * @param values - the values to convert
 * @param weights - the weights to convert
 * @return - tuple2 holding the expanded values and the expanded weights; with a degree of 1 both
 *           inputs are handed back as they were received
 */
private[model] def polynomialWithWeights(degree: Int, values: Seq[Double], weights: Seq[Double]): (Seq[Double], Seq[Double]) = {
  val effectiveDegree = math.max(degree, 1)
  effectiveDegree match {
    case 1 => (values, weights)
    case expanded => (polynomial(expanded, values), polynomialWeights(expanded, weights))
  }
}
/**
 * Applies a polynomial function to a ML feature set
 * Derived from https://github.com/adityashah30/multipolyfit/blob/master/src/polynomial/PolynomialRegression.java
 * @param degree - the polynomial degree. if < 1, 1 will be used
 * @param values - the values to which to apply the polynomial function
 * @return - a new list whose size is = values.size * degree; each value is replaced by
 *           value^1, value^2 ... value^degree, in the original value order
 */
private[model] def polynomial(degree: Int, values: Seq[Double]): Seq[Double] = {
  val thisDegree = math.max(degree, 1)
  if (thisDegree == 1) values
  else
    // A single pass over the values avoids positional access, which is O(n) per lookup on a List
    values.iterator
      .flatMap(value => Iterator.tabulate(thisDegree)(power => Math.pow(value, power + 1)))
      .toArray
      .toSeq
}
/**
 * Derives the field weights based on the polynomial degree parameter
 * @param degree - the polynomial degree. if < 1, 1 will be used
 * @param weights - the weights without any polynomial function applied
 * @return - the weights with a polynomial function applied: each weight repeated degree times,
 *           in the original weight order
 */
private[model] def polynomialWeights(degree: Int, weights: Seq[Double]): Seq[Double] = {
  val thisDegree = math.max(degree, 1)
  if (thisDegree == 1) weights
  else
    // A single pass over the weights avoids positional access, which is O(n) per lookup on a List
    weights.iterator
      .flatMap(weight => Iterator.fill(thisDegree)(weight))
      .toArray
      .toSeq
}
/**
 * Repeatedly replaces every feature with Math.log() of its Math.abs() value.
 * A feature equal to 0 is kept as 0 instead of being passed to Math.log().
 * @param features - the features to flatten
 * @param numIter - how many rounds of flattening to perform; with 0 or fewer the input is returned as is
 * @return - a new list of flattened features
 */
private[model] def flatten(features: Seq[Double], numIter: Int): Seq[Double] =
  (0 until numIter).foldLeft(features) { (current, _) =>
    current.map { value =>
      if (value == 0) 0d
      else Math.log(Math.abs(value))
    }
  }
}
/**
......@@ -192,8 +260,8 @@ class InputDefinition(val temporal: Temporal, val dimensionSet: Set[Dimension],
* @return - the feature set
*/
private def algoFeatures(event: Map[String, Any], factPosition: Int, algo: SupervisedTraining): Seq[Double] =
AlgorithmUtil.polynomial(algo.polyDegree,
AlgorithmUtil.flatten(
InputDefinition.polynomial(algo.polyDegree,
InputDefinition.flatten(
temporalFeatures(event, algo) ++: dimensionFeatures(event, algo) ++: factFeatures(event, factPosition, algo),
algo.flatten))
......@@ -234,13 +302,13 @@ class InputDefinition(val temporal: Temporal, val dimensionSet: Set[Dimension],
def algoWeights(field: AlgorithmRole): Seq[Double] = field match {
case temporal: Temporal => temporal.algoDef match {
case regression: RegressionDefinition =>
AlgorithmUtil.polynomialWeights(regression.polyDegree,
InputDefinition.polynomialWeights(regression.polyDegree,
temporalWeights(regression) ++: dimensionWeights(regression) ++: factWeights(regression, temporal.factPosition))
case _ => Seq()
}
case fact: Fact => fact.algoDef match {
case regression: RegressionDefinition =>
AlgorithmUtil.polynomialWeights(regression.polyDegree,
InputDefinition.polynomialWeights(regression.polyDegree,
temporalWeights(regression) ++: dimensionWeights(regression) ++: factWeights(regression, fact.position))
case _ => Seq()
}
......
package com.cablelabs.eventgen.algorithm
import java.util.Date
import com.cablelabs.eventgen.UnitSpec
/**
* Tests the methods in AlgorithmUtil
*/
class AlgorithmUtilTest extends UnitSpec {

  /** Expected polynomial expansion: every value raised to the powers 1..degree, in input order. */
  private def powers(values: Seq[Double], degree: Int): Seq[Double] =
    values.flatMap(value => (1 to degree).map(power => Math.pow(value, power)))

  /** Expected weight expansion: every weight repeated degree times, in input order. */
  private def repeated(weights: Seq[Double], degree: Int): Seq[Double] =
    weights.flatMap(weight => Seq.fill(degree)(weight))

  test("polynomial method should return the same list with a degree of 1") {
    val list = List[Double](1, 2, 3)
    assert(list == AlgorithmUtil.polynomial(1, list))
  }

  test("polynomial method should return the same list with a degree < 1") {
    val list = List[Double](1, 2, 3)
    assert(list == AlgorithmUtil.polynomial(0, list))
  }

  test("polynomial method should return a larger list with a degree of 2") {
    val list = List[Double](1, 2, 3)
    val result = AlgorithmUtil.polynomial(2, list)
    assert(list.size * 2 == result.size)
    assert(powers(list, 2) == result)
  }

  test("polynomial method should return a larger list with a degree of 5") {
    val list = List[Double](10, 20, 30)
    val result = AlgorithmUtil.polynomial(5, list)
    assert(list.size * 5 == result.size)
    assert(powers(list, 5) == result)
  }

  test("polynomialWeights method should return the same list with a degree of 1") {
    val list = List[Double](1, 2, 3)
    assert(list == AlgorithmUtil.polynomialWeights(1, list))
  }

  test("polynomialWeights method should return the same list with a degree < 1") {
    val list = List[Double](1, 2, 3)
    assert(list == AlgorithmUtil.polynomialWeights(0, list))
  }

  test("polynomialWeights method should return a larger list with a degree of 2") {
    val list = List[Double](1, 2, 3)
    val result = AlgorithmUtil.polynomialWeights(2, list)
    assert(list.size * 2 == result.size)
    // Compares the whole expansion so the last weight is verified too
    assert(repeated(list, 2) == result)
  }

  test("polynomialWeights method should return a larger list with a degree of 5") {
    val list = List[Double](1, 2, 3)
    val result = AlgorithmUtil.polynomialWeights(5, list)
    assert(list.size * 5 == result.size)
    assert(repeated(list, 5) == result)
  }

  test("polynomialWithWeights method should return the same lists with a degree of 1") {
    val list1 = List[Double](1, 2, 3)
    val list2 = List[Double](4, 5, 6)
    val (values, weights) = AlgorithmUtil.polynomialWithWeights(1, list1, list2)
    assert(list1 == values)
    assert(list2 == weights)
  }

  test("polynomialWithWeights method should return the same lists with a degree < 1") {
    val list1 = List[Double](1, 2, 3)
    val list2 = List[Double](4, 5, 6)
    val (values, weights) = AlgorithmUtil.polynomialWithWeights(0, list1, list2)
    assert(list1 == values)
    assert(list2 == weights)
  }

  test("polynomialWithWeights method should return the larger lists with a degree of 2") {
    val list1 = List[Double](1, 2, 3)
    val list2 = List[Double](4, 5, 6)
    val (values, weights) = AlgorithmUtil.polynomialWithWeights(2, list1, list2)
    assert(list1.size * 2 == values.size)
    assert(AlgorithmUtil.polynomial(2, list1) == values)
    assert(list2.size * 2 == weights.size)
    assert(AlgorithmUtil.polynomialWeights(2, list2) == weights)
  }

  test("polynomialWithWeights method should return the larger lists with a degree of 5") {
    val list1 = List[Double](1, 2, 3)
    val list2 = List[Double](4, 5, 6)
    val (values, weights) = AlgorithmUtil.polynomialWithWeights(5, list1, list2)
    assert(list1.size * 5 == values.size)
    assert(AlgorithmUtil.polynomial(5, list1) == values)
    assert(list2.size * 5 == weights.size)
    assert(AlgorithmUtil.polynomialWeights(5, list2) == weights)
  }

  test("flatten method should apply a log function once") {
    val list = List[Double](10, 20, 30)
    val result = AlgorithmUtil.flatten(list, 1)
    assert(list.map(value => Math.log(value)) == result)
  }

  test("flatten method should apply a log function 2x") {
    val list = List[Double](10, 20, 30)
    val result = AlgorithmUtil.flatten(list, 2)
    assert(list.map(value => Math.log(Math.abs(Math.log(value)))) == result)
  }

  test("getDurations method should return the length of time between dates") {
    val dates = List[Date](new Date(1000), new Date(3000), new Date(2000))
    val (sorted, durations, _) = AlgorithmUtil.getDurations(dates)
    // Dates come back in ascending order; the first duration is always 0
    assert(List(new Date(1000), new Date(2000), new Date(3000)) == sorted)
    assert(List(0d, 1000d, 1000d) == durations)
  }
}
package com.cablelabs.eventgen.analysis
import java.util.Date
import com.cablelabs.eventgen.UnitSpec
/**
* Tests the SparkAnalyzer object methods.
*/
class SparkAnalyzerTest extends UnitSpec {

  test("getDurations method should return the length of time between dates") {
    val unsorted = List[Date](new Date(1000), new Date(3000), new Date(2000))
    val (sortedDates, gaps, _) = SparkAnalyzer.getDurations(unsorted)

    // Dates are expected back in ascending order
    assert(sortedDates == List(new Date(1000), new Date(2000), new Date(3000)))
    // The first entry has no predecessor, hence a duration of 0
    assert(gaps == List(0d, 1000d, 1000d))
  }
}
......@@ -461,5 +461,141 @@ class InputDefinitionTest extends UnitSpec {
}
}
test("polynomial method should return the same list with a degree of 1") {
  // A degree of 1 must leave the features unchanged
  val input = List(1d, 2d, 3d)
  val output = InputDefinition.polynomial(1, input)
  assert(output == input)
}
test("polynomial method should return the same list with a degree < 1") {
  // A degree below 1 is treated as 1, so the features must be unchanged
  val input = List(1d, 2d, 3d)
  val output = InputDefinition.polynomial(0, input)
  assert(output == input)
}
test("polynomial method should return a larger list with a degree of 2") {
  val input = List(1d, 2d, 3d)
  val expanded = InputDefinition.polynomial(2, input)
  // Each value contributes its 1st and 2nd power, in input order
  val expected = input.flatMap(value => Seq(Math.pow(value, 1), Math.pow(value, 2)))
  assert(expanded.size == input.size * 2)
  assert(expanded == expected)
}
test("polynomial method should return a larger list with a degree of 5") {
  val input = List(10d, 20d, 30d)
  val expanded = InputDefinition.polynomial(5, input)
  // Each value contributes its powers 1 through 5, in input order
  val expected = input.flatMap(value => (1 to 5).map(power => Math.pow(value, power)))
  assert(expanded.size == input.size * 5)
  assert(expanded == expected)
}
test("polynomialWeights method should return the same list with a degree of 1") {
  // A degree of 1 must leave the weights unchanged
  val input = List(1d, 2d, 3d)
  val output = InputDefinition.polynomialWeights(1, input)
  assert(output == input)
}
test("polynomialWeights method should return the same list with a degree < 1") {
  // A degree below 1 is treated as 1, so the weights must be unchanged
  val input = List(1d, 2d, 3d)
  val output = InputDefinition.polynomialWeights(0, input)
  assert(output == input)
}
test("polynomialWeights method should return a larger list with a degree of 2") {
  val list = List[Double](1, 2, 3)
  val result = InputDefinition.polynomialWeights(2, list)
  assert(list.size * 2 == result.size)
  // Compares the whole expansion so the last weight (indices 4 and 5) is verified too
  assert(list.flatMap(weight => Seq.fill(2)(weight)) == result)
}
test("polynomialWeights method should return a larger list with a degree of 5") {
  val input = List(1d, 2d, 3d)
  val expanded = InputDefinition.polynomialWeights(5, input)
  // Each weight is expected five times in a row, in input order
  val expected = input.flatMap(weight => Seq.fill(5)(weight))
  assert(expanded.size == input.size * 5)
  assert(expanded == expected)
}
test("polynomialWithWeights method should return the same lists with a degree of 1") {
  val features = List(1d, 2d, 3d)
  val featureWeights = List(4d, 5d, 6d)
  // A degree of 1 must leave both lists unchanged
  val (values, weights) = InputDefinition.polynomialWithWeights(1, features, featureWeights)
  assert(values == features)
  assert(weights == featureWeights)
}
test("polynomialWithWeights method should return the same lists with a degree < 1") {
  val features = List(1d, 2d, 3d)
  val featureWeights = List(4d, 5d, 6d)
  // A degree below 1 is treated as 1, so both lists must be unchanged
  val (values, weights) = InputDefinition.polynomialWithWeights(0, features, featureWeights)
  assert(values == features)
  assert(weights == featureWeights)
}
test("polynomialWithWeights method should return the larger lists with a degree of 2") {
  val features = List(1d, 2d, 3d)
  val featureWeights = List(4d, 5d, 6d)
  val (values, weights) = InputDefinition.polynomialWithWeights(2, features, featureWeights)
  // The combined call must agree with the two individual expansions
  assert(values.size == features.size * 2)
  assert(values == InputDefinition.polynomial(2, features))
  assert(weights.size == featureWeights.size * 2)
  assert(weights == InputDefinition.polynomialWeights(2, featureWeights))
}
test("polynomialWithWeights method should return the larger lists with a degree of 5") {
  val features = List(1d, 2d, 3d)
  val featureWeights = List(4d, 5d, 6d)
  val (values, weights) = InputDefinition.polynomialWithWeights(5, features, featureWeights)
  // The combined call must agree with the two individual expansions
  assert(values.size == features.size * 5)
  assert(values == InputDefinition.polynomial(5, features))
  assert(weights.size == featureWeights.size * 5)
  assert(weights == InputDefinition.polynomialWeights(5, featureWeights))
}
test("flatten method should apply a log function once") {
val list = List[Double](10, 20, 30)