Commit 31978dd8 authored by Steven Pisarski's avatar Steven Pisarski

Making code more functional

parent 41eace3c
......@@ -15,14 +15,9 @@ object AlgorithmUtil {
* their sizes will equal values.size * degree
*/
def polynomialWithWeights(degree: Int, values: List[Double], weights: List[Double]): (List[Double], List[Double]) = {
  // degrees below 1 make no sense - clamp to the identity degree of 1
  val thisDegree = if (degree < 1) 1 else degree
  // degree 1 is the identity expansion, so the inputs pass through untouched
  if (thisDegree != 1) (polynomial(thisDegree, values), polynomialWeights(thisDegree, weights))
  else (values, weights)
}
/**
......@@ -33,10 +28,8 @@ object AlgorithmUtil {
* @return - a new list whose size is = values.size * degree
*/
def polynomial(degree: Int, values: List[Double]): List[Double] = {
var thisDegree = degree
if (degree < 1) thisDegree = 1
if (degree != 1) {
val thisDegree = if (degree < 1) 1 else degree
if (thisDegree != 1) {
val out = Array.fill[Double](values.size * thisDegree)(0d)
for (col <- 0 to values.size - 1) {
for (deg <- 0 to thisDegree - 1) {
......@@ -45,9 +38,7 @@ object AlgorithmUtil {
}
}
out.toList
} else {
values
}
} else values
}
/**
......@@ -57,8 +48,7 @@ object AlgorithmUtil {
* @return - the weights with a polynomial function applied
*/
def polynomialWeights(degree: Int, weights: List[Double]): List[Double] = {
var thisDegree = degree
if (degree < 1) thisDegree = 1
val thisDegree = if (degree < 1) 1 else degree
if (thisDegree != 1) {
val out = Array.fill[Double](weights.size * thisDegree)(0d)
for (col <- 0 to weights.size - 1) {
......@@ -68,9 +58,7 @@ object AlgorithmUtil {
}
}
out.toList
} else {
weights
}
} else weights
}
/**
......@@ -80,17 +68,14 @@ object AlgorithmUtil {
* @param numIter - the number of times to perform the flattening function
* @return - a new list of flattened features
*/
def flatten(features: List[Double], numIter: Int): List[Double] =
  if (numIter > 0) {
    // dampen each value: zero stays zero, everything else becomes ln(|value|)
    val flattened = features.map(num => if (num == 0) 0 else Math.log(Math.abs(num)))
    // tail-recurse until the requested number of iterations has been applied
    flatten(flattened, numIter - 1)
  } else features
/**
* Returns a tuple containing the list of sorted dates, a corresponding list of durations
......@@ -102,6 +87,7 @@ object AlgorithmUtil {
val sorted = dates.sortBy(x => x.getTime)
val durations = new ArrayBuffer[Double]()
var prevDate = new Date(0)
// TODO - try to achieve same with a higher order function or recursion
sorted.foreach(p => {
if (prevDate.getTime == 0) {
durations += 0d
......@@ -110,11 +96,7 @@ object AlgorithmUtil {
}
prevDate = p
})
var avg = 0d
if (durations.size > 1) {
avg = durations.sum / (durations.size - 1)
}
val avg = if (durations.size > 1) durations.sum / (durations.size - 1) else 0d
assert(sorted.size == durations.size)
(sorted, durations.toList, avg)
}
......
......@@ -85,38 +85,26 @@ trait ClassificationModel extends SupervisedModel {
val pred = model.predict(new DenseVector(features.toArray))
val key = closestKey(pred, labelKeys.size / 2, 0)
val label = labelMap.get(key)
if (label == None) {
throw new RuntimeException("Predicted value not contained in the label map")
} else {
label.get
}
if (label == None) throw new RuntimeException("Predicted value not contained in the label map")
else label.get
}
/**
* Returns the closest key from the predicted to the list holding the classification values
* TODO - write unit test for this private method
*/
private[this] def closestKey(value: Double, index: Int, lastIndex: Int): Double =
  if (index == lastIndex) {
    // terminal case - the search converged; return the key when the index is
    // inside the key range, otherwise fall back to 0
    if (index < labelKeys.size && index >= 0 && labelKeys.size > 0) labelKeys(index)
    else 0
  } else {
    // NOTE(review): halving direction assumes a particular labelKeys ordering — confirm
    val newIndex =
      if (value > labelKeys(index)) index / 2
      else (labelKeys.size - index) / 2 + index
    closestKey(value, newIndex, index)
  }
/** Runs the underlying Spark model directly against the raw feature vector. */
override def predictRaw(features: List[Double]): Double = model.predict(new DenseVector(features.toArray))
}
/**
......@@ -135,9 +123,7 @@ trait RegressionModel extends SupervisedModel {
field.convert(predictRaw(features))
}
/** Runs the underlying Spark model directly against the raw feature vector. */
override def predictRaw(features: List[Double]): Double = model.predict(new DenseVector(features.toArray))
}
/**
......@@ -160,15 +146,15 @@ class NaiveBayesModel(override val field: Field, override val trainingSet: RDD[L
* @param numIterations - the number of times gradient descent will run during training
* @param stepSize - the size of each step taken during gradient descent
*/
class LinearRegressionModel(override val field: Field, override val trainingSet: RDD[LabeledPoint],
                            val weights: List[Double] = List[Double](), val numIterations: Int, val stepSize: Double)
  extends RegressionModel {

  // number of features the model expects, taken from the first training example
  val featuresSize = trainingSet.first().features.size

  // train with the seed weights when provided, otherwise let SGD start from scratch
  private[algorithm] val model = if (weights.isEmpty)
    LinearRegressionWithSGD.train(trainingSet, numIterations, stepSize, 1.0)
  else {
    // seed weights must line up one-to-one with the training features
    require(weights.size == featuresSize)
    LinearRegressionWithSGD.train(trainingSet, numIterations, stepSize, 1.0, new DenseVector(weights.toArray))
  }
}
......
......@@ -90,6 +90,7 @@ object SparkAnalyzer {
val fileSystem = FileSystem.get(new URI(fileUri), new Configuration())
val fileEntries = fileSystem.listStatus(new Path(fileUri))
logger.info(s"Searching ${fileEntries.length} files for the proper header with the delimiter '$delim'")
// TODO - try and make more functional
var hdrs: Array[String] = null
var headerLine = ""
var ctr = 0
......@@ -137,6 +138,7 @@ class SparkAnalyzer(val data: RDD[(String, Date, Map[String, Any])], val inputDe
data.groupBy(_._1).flatMap[(String, Map[String, Any])](p => {
var outVals = ("", Map[String, Any]())
var date = new Date(0)
// TODO - try and make more functional
p._2.foreach(e => {
if (e._2.getTime > date.getTime) {
date = e._2
......@@ -276,6 +278,7 @@ class SparkAnalyzer(val data: RDD[(String, Date, Map[String, Any])], val inputDe
var list = new ArrayBuffer[Double]()
var earliest = new Date()
var latest = new Date(0)
// TODO - try and make more functional
durations.foreach(duration => {
duration._2._2.filter(_ != 0).foreach(d => {
total += d
......
......@@ -36,7 +36,7 @@ abstract class InputField(override val name: String, override val description: S
abstract class Temporal(override val name: String, override val description: String = "", val denormFields: List[String],
                        val algoDef: AlgorithmDefinition, override val factIndex: Int)
  extends InputField(name, description) with TemporalRole {

  /** Expands the event's temporal value into named numeric components. */
  def denormalize(event: Map[String, Any]): Map[String, Long]
}
/**
......@@ -56,9 +56,11 @@ class DateTemporal(override val name: String, override val description: String =
* @param event - the event to parse
* @return - the map
*/
override def denormalize(event: Map[String, Any]): mutable.LinkedHashMap[String, Long] = {
override def denormalize(event: Map[String, Any]): Map[String, Long] = {
val date = eventValue[Date](event)
val dt = new DateTime(date)
// TODO - try and make more functional
var outMap = new mutable.LinkedHashMap[String, Long]()
/**
......@@ -97,14 +99,12 @@ class DateTemporal(override val name: String, override val description: String =
// do nothing
}
if (denormFields.size > 0) {
if (denormFields.size > 0)
denormFields.foreach((field: String) => {
mapField(field)
})
} else {
mapField("timestamp")
}
outMap
else mapField("timestamp")
outMap.toMap
}
}
......
......@@ -22,7 +22,7 @@ object EventUtil {
* @param input - the JsonInput
* @return - returns an InputDefinition
*/
// TODO - use YAML instead and maybe support a single input/output definition
// TODO - use YAML instead and maybe support a single input/output definition and it will be much more functional
def inputDefinition(input: JsonInput): InputDefinition = {
val values = parse(input).values.asInstanceOf[Map[String, Map[String, Any]]]
......
......@@ -109,11 +109,8 @@ class InputDefinition(val temporal: Temporal, val dimensionSet: Set[Dimension],
* @return - a big long String
*/
def dimString(event: Map[String, Any]): String = {
  // one "name::value" token per positional dimension; empty tokens are dropped
  val dimIdList = positionalDims.map(d => d.name + "::" + d.eventStringValue(event))
  dimIdList.filter(_.nonEmpty).mkString("|")
}
/**
......@@ -137,16 +134,8 @@ class InputDefinition(val temporal: Temporal, val dimensionSet: Set[Dimension],
* @return - the mapped event
*/
def fromStringArr(headers: Array[String], values: Array[String]): Map[String, Any] = {
  // headers and values are positional pairs and must line up exactly
  require(headers.length == values.length)
  headers.zip(values).map { case (header, value) => header -> getFieldValue(header, value) }.toMap
}
/**
......@@ -155,13 +144,9 @@ class InputDefinition(val temporal: Temporal, val dimensionSet: Set[Dimension],
* @param value - the field's String value to parse
* @return - the reference of the proper type only if the field is located else None
*/
def getFieldValue(name: String, value: String): Any =
  // single lookup with a pattern match replaces the double lookup + unsafe Option#get
  fieldMap.get(name) match {
    case Some(field) => field.convert(value)
    case None => None
  }
/**
* Converts an event to a json4s JValue JSON object
......@@ -181,18 +166,14 @@ class InputDefinition(val temporal: Temporal, val dimensionSet: Set[Dimension],
* @param event - the event to convert
* @return - the JSON String
*/
def toJsonString(event: Map[String, Any]): String = JsonMethods.compact(JsonMethods.render(toJson(event)))
/**
* Converts a JSON object to a standard event Map[String, Any]
* @param json - the JSON object to parse
* @return - the Map
*/
def fromJson(json: JValue): Map[String, Any] = fromJsonStr(JsonMethods.compact(JsonMethods.render(json)))
/**
* Converts a JSON payload to a standard event Map[String, Any]
......@@ -200,17 +181,10 @@ class InputDefinition(val temporal: Temporal, val dimensionSet: Set[Dimension],
* @return - the Map
*/
def fromJsonStr(jsonStr: String): Map[String, Any] = {
  // TODO determine how to get rid of this asInstanceOf call
  val values = parse(new StringInput(jsonStr)).values.asInstanceOf[Map[String, Any]]
  // keep only entries backed by a known field and convert each to its proper type;
  // collect avoids the filter-then-unsafe-get double lookup
  values.collect {
    case (name, raw) if fieldMap.contains(name) => name -> fieldMap(name).convert(raw.toString)
  }
}
/**
......@@ -221,10 +195,8 @@ class InputDefinition(val temporal: Temporal, val dimensionSet: Set[Dimension],
def temporalAlgoFeatures(event: Map[String, Any]): List[Double] = {
  // only supervised algorithm definitions yield features for the temporal field
  temporal.algoDef match {
    case algoDef: SupervisedTraining =>
      algoFeatures(event, temporal.factIndex, algoDef.polyDegree, algoDef.flatten)
    case _ => List[Double]()
  }
}
......@@ -239,8 +211,7 @@ class InputDefinition(val temporal: Temporal, val dimensionSet: Set[Dimension],
fact.algoDef match {
case algoDef:SupervisedTraining =>
algoFeatures(event, positionalFacts.indexOf(fact), algoDef.polyDegree, algoDef.flatten)
case _ =>
List[Double]()
case _ => List[Double]()
}
}
......@@ -248,31 +219,17 @@ class InputDefinition(val temporal: Temporal, val dimensionSet: Set[Dimension],
* Return the training features for the ML algorithms
* @param event - the event to parse
* @param factIndex - generates the feature set up to the index of the fact requested.
* When 0, no facts will be included. When -1, all will be generated
* When 0, no facts will be included. When < 0, all will be generated
* @return - the feature set
*/
def algoFeatures(event: Map[String, Any], factIndex: Int, polyDegree: Int, flatten: Int): List[Double] = {
  // feature order is temporal components, then dimensions, then the requested facts
  val temporalFeatures = temporal.denormalize(event).map(_._2.toDouble).toList
  val dimFeatures = positionalDims.map(dim => dim.mlTrainingValue(event)).toList
  // factIndex < 0 means include every fact
  val numFacts = if (factIndex < 0) facts.size else factIndex
  val factFeatures = (0 until numFacts).map(i => positionalFacts(i).mlTrainingValue(event)).toList
  val featureList = temporalFeatures ++ dimFeatures ++ factFeatures
  AlgorithmUtil.polynomial(polyDegree, AlgorithmUtil.flatten(featureList, flatten))
}
/**
......@@ -282,34 +239,23 @@ class InputDefinition(val temporal: Temporal, val dimensionSet: Set[Dimension],
def temporalAlgoWeights(): List[Double] = {
  temporal.algoDef match {
    case algoDef: SupervisedTraining =>
      // TODO - add these weights to the temporal definition
      // Weight order must mirror the feature order produced by algoFeatures
      // (temporal, dimensions, facts); building with prepends reversed it.
      val temporalWeights = List.fill(temporal.denormFields.size)(100d)
      val dimWeights = List.fill(positionalDims.size)(2000d)
      // factIndex < 0 means weight every fact
      val numFacts = if (temporal.factIndex < 0) positionalFacts.size else temporal.factIndex
      val factWeights = List.fill(numFacts)(10d)
      AlgorithmUtil.polynomialWeights(algoDef.polyDegree, temporalWeights ++ dimWeights ++ factWeights)
    case _ =>
      List[Double]()
  }
}
}
\ No newline at end of file
......@@ -2,8 +2,6 @@ package com.cablelabs.eventgen.model
import com.cablelabs.eventgen.akka.JMSRouteType.JMSRouteType
import scala.collection.mutable
/**
* Defines how the generated data will be routed
* @param protocol - the AMQ protocol
......@@ -28,14 +26,11 @@ class OutputDefinition(val protocol: String, val host: String, val port: Int, va
val id = s"$host:$port/$routeType/$name"
// Ensure each field name is unique
private val inNameSet = mutable.Set[String]()
private val outNameSet = mutable.Set[String]()
fields.foreach(f => {
inNameSet += f.inputField.name
outNameSet += f.name
})
private val inNameSet = fields.map(f => f.inputField.name)
private val outNameSet = fields.map(f => f.name)
require(inNameSet.size == outNameSet.size && outNameSet.size == fields.size)
// TODO make more functional
var hasTemporal = false
fields.foreach(p => {
if (p.inputField.isInstanceOf[Temporal]) hasTemporal = true
......@@ -49,16 +44,11 @@ class OutputDefinition(val protocol: String, val host: String, val port: Int, va
* @param rawData - the raw generated event data using the input definition keys
* @return - new map with keys corresponding to each OutputField.name
*/
private[model] def remap(rawData: Map[String, Any]): Map[String, Any] =
  // only fields present in the raw event are remapped; format() yields the
  // (outputName -> value) pair directly
  fields.filter(f => rawData.contains(f.inputField.name)).map(f => f.format(rawData)).toMap
/**
* Applies the format() function on the remapped event payload for serialization
......
......@@ -134,6 +134,7 @@ class OutputDefinitionTest extends UnitSpec {
}
test("generate method should apply the data to a velocity template") {
Velocity.init(OutputEventFormatters.velocityProps)
val ctx = new VelocityContext()
val template = Velocity.getTemplate("OutputDefSpec.vm")
val od = new OutputDefinition(protocol, host, port, routeTypeQueue, destName, fields,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment