Commit 8163d639 authored by Steven Pisarski

Condensing and refactoring of input and output definitions. Also working...

Condensing and refactoring of input and output definitions. Also working towards making the change from input definition being defined in JSON to YAML.
parent d24d0cc6
......@@ -6,7 +6,7 @@ import java.util.Properties
import com.cablelabs.eventgen.algorithm.Model
import com.cablelabs.eventgen.analysis.SparkAnalyzer
import com.cablelabs.eventgen.model.{EventUtil, SupervisedTraining}
import com.cablelabs.eventgen.model.{InputDefinition, SupervisedTraining}
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
......@@ -379,7 +379,7 @@ class AnalyzeDataYaml(@JsonProperty("schemaUri") _schemaUri: String,
require(_schemaUri != null)
private val schemaFs = FileSystem.get(new URI(_schemaUri), new Configuration())
val inputDef = EventUtil.inputDefinition(new StreamInput(schemaFs.open(new Path(_schemaUri))))
val inputDef = InputDefinition.inputDefinition(new StreamInput(schemaFs.open(new Path(_schemaUri))))
val appName = if (_appName == null) "DataAnalyzer" else _appName
......
......@@ -112,9 +112,12 @@ object Generator extends App {
definition.iterations, definition.stepSize)
}
case definition: ConstantDefinition => definition match {
case definition: ConstantIntDefinition => new ConstantIntModel(inputDef.temporal, definition.value.asInstanceOf[Long])
case definition: ConstantFloatDefinition => new ConstantFloatModel(inputDef.temporal, definition.asInstanceOf[Double])
case definition: ConstantStringDefinition => new ConstantStringModel(inputDef.temporal, definition.asInstanceOf[String])
case definition: ConstantIntDefinition =>
new ConstantIntModel(inputDef.temporal, definition.value.asInstanceOf[Long])
case definition: ConstantFloatDefinition =>
new ConstantFloatModel(inputDef.temporal, definition.asInstanceOf[Double])
case definition: ConstantStringDefinition =>
new ConstantStringModel(inputDef.temporal, definition.asInstanceOf[String])
}
}
......@@ -319,11 +322,11 @@ class GeneratorYaml(@JsonProperty("schemaUri") _schemaUri: String,
require(_schemaUri != null)
private val schemaFs = FileSystem.get(new URI(_schemaUri), new Configuration())
val inputDef = EventUtil.inputDefinition(new StreamInput(schemaFs.open(new Path(_schemaUri))))
val inputDef = InputDefinition.inputDefinition(new StreamInput(schemaFs.open(new Path(_schemaUri))))
require(_outputDefUri != null)
private val outputDefFs = FileSystem.get(new URI(_outputDefUri), new Configuration())
val outputDefs = EventUtil.outputDefinition(outputDefFs.open(new Path(_outputDefUri)), inputDef)
val outputDefs = OutputDefinition.outputDefinition(outputDefFs.open(new Path(_outputDefUri)), inputDef)
val appName = if (_appName == null) "EventGenerator" else _appName
......
package com.cablelabs.eventgen.model
import java.io.InputStream
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import org.apache.velocity.VelocityContext
import org.apache.velocity.app.Velocity
import org.json4s.JsonInput
import org.json4s.native.JsonMethods._
import scala.collection.immutable.HashSet
/**
 * Utilities for events.
 *
 * The two parsers exposed here duplicate the implementations found on the InputDefinition and
 * OutputDefinition objects of this package, so they simply forward to them. This keeps a single
 * copy of the parsing logic while existing EventUtil callers continue to compile unchanged.
 */
object EventUtil {

  /**
   * Parses the file that defines the inbound data from which to learn (aka schema)
   * by forwarding to InputDefinition.inputDefinition.
   * @param input - the JsonInput
   * @return - returns an InputDefinition
   */
  // TODO - use YAML instead and maybe support a single input/output definition and it will be much more functional
  def inputDefinition(input: JsonInput): InputDefinition = InputDefinition.inputDefinition(input)

  /**
   * Parses the file that defines the outbound data by forwarding to
   * OutputDefinition.outputDefinition.
   * @param is - the InputStream to the output definition
   * @param inputDef - the input definition
   * @return - returns the set of OutputDefinition objects described by the stream
   */
  def outputDefinition(is: InputStream, inputDef: InputDefinition): Set[OutputDefinition] =
    OutputDefinition.outputDefinition(is, inputDef)
}
......@@ -4,9 +4,145 @@ import java.security.MessageDigest
import com.cablelabs.eventgen.algorithm.AlgorithmUtil
import org.json4s.JsonAST.{JObject, JValue}
import org.json4s.StringInput
import org.json4s.native.JsonMethods
import org.json4s.native.JsonMethods._
import org.json4s.{JsonInput, StringInput}
/**
 * Object used to create InputDefinition objects
 */
object InputDefinition {

  /**
   * Parses the file that defines the inbound data from which to learn (aka schema)
   *
   * The document is read as a map of field name to field attributes. Every field must carry
   * "type", "desc" and a "role" map; the role's own "type" selects how the field is used:
   *  - "temporal": needs "denormFields", an "algo" map (with "factIndex") and a "dateFormat"
   *  - "dimension": needs "position" (plus "dateFormat" when the data type is "date")
   *  - "fact": needs "position" and an "algo" map (plus "dateFormat" when the type is "date")
   *
   * @param input - the JsonInput
   * @return - returns an InputDefinition
   * @throws NoSuchElementException - when a required attribute is absent
   * @throws RuntimeException - when a role type or algorithm name is not supported, when more
   *                            than one temporal field is declared, or when the schema lacks a
   *                            temporal field, a dimension field or a fact field
   */
  // TODO - use YAML instead and maybe support a single input/output definition and it will be much more functional
  def inputDefinition(input: JsonInput): InputDefinition = {
    val schema = parse(input).values.asInstanceOf[Map[String, Map[String, Any]]]

    // Accumulators filled during the single pass over the schema entries below
    var temporal: Option[Temporal] = None
    var dimensions = Set[Dimension]()
    var facts = Set[Fact]()

    schema.foreach { case (name, attrs) =>
      val dataType = attr(name, attrs, "type").asInstanceOf[String]
      val description = attr(name, attrs, "desc").asInstanceOf[String]
      val role = attr(name, attrs, "role").asInstanceOf[Map[String, Any]]

      // Looked up only by the branches that need them, so a field is never forced to declare
      // attributes that its role/type combination does not use
      def dateFormat: String = attr(name, attrs, "dateFormat").asInstanceOf[String]
      def position: Int = attr(name, role, "position").asInstanceOf[BigInt].toInt

      attr(name, role, "type").asInstanceOf[String] match {
        case "temporal" =>
          if (temporal.isDefined) throw new RuntimeException("Can only have one temporal field")
          val denormFields = attr(name, role, "denormFields").asInstanceOf[List[String]]
          val algoAttrs = attr(name, role, "algo").asInstanceOf[Map[String, Any]]
          val factIndex = attr(name, algoAttrs, "factIndex").toString.toInt
          // Date is currently the only temporal type
          temporal = Some(
            new DateTemporal(name, description, denormFields, algorithm(name, role), dateFormat, factIndex))

        case "dimension" =>
          val p = position
          val dimension: Dimension = dataType match {
            case "date" => new DateDimension(name, description, p, dateFormat)
            case "integer" => new IntegerDimension(name, description, p)
            case "float" => new FloatDimension(name, description, p)
            // "string" and any unrecognised data type are treated as strings
            case _ => new StringDimension(name, description, p)
          }
          dimensions += dimension

        case "fact" =>
          val priority = position
          val fact: Fact = dataType match {
            case "date" => new DateFact(name, description, priority, algorithm(name, role), dateFormat)
            case "integer" => new IntegerFact(name, description, priority, algorithm(name, role))
            case "float" => new FloatFact(name, description, priority, algorithm(name, role))
            // "string" and any unrecognised data type are treated as strings
            case _ => new StringFact(name, description, priority, algorithm(name, role))
          }
          facts += fact

        case other =>
          throw new RuntimeException(s"Unsupported role type '$other' for field '$name'")
      }
    }

    val temporalField = temporal.getOrElse(throw new RuntimeException("Must have one temporal field"))
    if (dimensions.isEmpty) throw new RuntimeException("Must have at least one dimension field")
    if (facts.isEmpty) throw new RuntimeException("Must have at least one fact field")
    new InputDefinition(temporalField, dimensions.toSet, facts.toSet)
  }

  /**
   * Builds the AlgorithmDefinition described by the "algo" entry of a field's role
   * @param field - the name of the field being parsed (used in error messages only)
   * @param role - the field's role attributes
   * @return - the algorithm definition
   */
  private def algorithm(field: String, role: Map[String, Any]): AlgorithmDefinition = {
    val algoAttrs = attr(field, role, "algo").asInstanceOf[Map[String, Any]]
    val flatten = attr(field, algoAttrs, "flatten").toString.toInt
    val polyDegree = attr(field, algoAttrs, "polyDegree").toString.toInt
    attr(field, algoAttrs, "name").asInstanceOf[String] match {
      case "linearRegression" =>
        new LinearRegressionDefinition(flatten, polyDegree,
          attr(field, algoAttrs, "iterations").toString.toInt, doubleAttr(field, algoAttrs, "stepSize"))
      case "naiveBayes" =>
        new NaiveBayesDefinition(flatten, polyDegree, doubleAttr(field, algoAttrs, "lambda"))
      case "constant" =>
        // TODO - test me
        new ConstantIntDefinition(attr(field, algoAttrs, "value").asInstanceOf[BigInt].toLong)
      case other =>
        throw new RuntimeException(s"Unsupported algorithm '$other' for field '$field'")
    }
  }

  /**
   * Retrieves a mandatory attribute
   * @param field - the name of the field being parsed (used in error messages only)
   * @param attrs - the attributes to search
   * @param key - the attribute's key
   * @return - the raw attribute value
   * @throws NoSuchElementException - when the key is absent
   */
  private def attr(field: String, attrs: Map[String, Any], key: String): Any =
    attrs.getOrElse(key,
      throw new NoSuchElementException(s"Field '$field' is missing required attribute '$key'"))

  /**
   * Retrieves a mandatory numeric attribute as a Double. Accepts any numeric value so that an
   * integer literal in the schema (parsed as BigInt) is not rejected by a cast to Double.
   * @param field - the name of the field being parsed (used in error messages only)
   * @param attrs - the attributes to search
   * @param key - the attribute's key
   * @return - the attribute value as a Double
   */
  private def doubleAttr(field: String, attrs: Map[String, Any], key: String): Double =
    attr(field, attrs, key) match {
      case number: java.lang.Number => number.doubleValue()
      case other =>
        throw new RuntimeException(s"Attribute '$key' of field '$field' must be numeric but was '$other'")
    }
}
/**
* Class defining the input fields used for initializing the event Generator
......
package com.cablelabs.eventgen.model
import java.io.InputStream
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import org.apache.velocity.VelocityContext
import org.apache.velocity.app.Velocity
/**
 * Factory for the sets of OutputDefinition objects described by a YAML document
 */
object OutputDefinition {

  /**
   * Reads the YAML document that describes the outbound data and builds one OutputDefinition
   * per entry found in it.
   *
   * Output fields whose "inputFieldName" is not present in the input definition's field map
   * are left out of the resulting definition. A format of "json" selects the JSON formatter;
   * any other format value is handed to Velocity as the name of the template to load.
   *
   * @param is - the InputStream to the output definition
   * @param inputDef - the input definition
   * @return - the set of parsed OutputDefinition objects
   */
  def outputDefinition(is: InputStream, inputDef: InputDefinition): Set[OutputDefinition] = {
    val yamlMapper = new ObjectMapper(new YAMLFactory())
    val parsed = yamlMapper.readValue(is, classOf[OutputDefsYaml])

    parsed.definitions.map { defYaml =>
      // Pair each configured output field with its input field, dropping the unknown ones
      val outputFields: List[OutputField] = for {
        fieldYaml <- defYaml.fields
        inputField <- inputDef.fieldMap.get(fieldYaml.inputFieldName).toList
      } yield inputField match {
        case _: IntegerRole =>
          new IntOutput(fieldYaml.name, fieldYaml.description, inputField)
        case _: FloatRole =>
          new FloatOutput(fieldYaml.name, fieldYaml.description, inputField)
        case _: StringRole =>
          new StringOutput(fieldYaml.name, fieldYaml.description, inputField)
        case _: DateRole =>
          new DateOutput(fieldYaml.name, fieldYaml.description, fieldYaml.dateFmtStr, inputField)
      }

      defYaml.format match {
        case "json" =>
          new OutputDefinition(defYaml.prototcol, defYaml.host, defYaml.port, defYaml.routeType,
            defYaml.name, outputFields.toSet, OutputEventFormatters.jsonFormat)
        case templateName =>
          // Anything other than "json" names a Velocity template
          Velocity.init(OutputEventFormatters.velocityProps)
          val velocityCtx = new VelocityContext()
          val velocityTemplate = Velocity.getTemplate(templateName)
          val fmt = OutputEventFormatters.velocityFormat(velocityCtx, velocityTemplate)
          new OutputDefinition(defYaml.prototcol, defYaml.host, defYaml.port, defYaml.routeType,
            defYaml.name, outputFields.toSet, fmt)
      }
    }.toSet
  }
}
/**
* Defines how the generated data will be routed
* @param protocol - the AMQ protocol
......@@ -56,3 +117,40 @@ class OutputDefinition(val protocol: String, val host: String, val port: Int, va
def convert(rawData: Map[String, Any]): Any = formatter(remap(rawData))
}
import com.fasterxml.jackson.annotation.JsonProperty
import scala.collection.JavaConverters._
/**
 * Used by jackson when parsing the OutputDefinition YAML file. Holds the list bound to the
 * document's "outputDefs" attribute.
 * @param jDefs - the output definitions as delivered by jackson; must be non-null and non-empty
 * @throws IllegalArgumentException - when no output definition is supplied
 */
class OutputDefsYaml(@JsonProperty("outputDefs") jDefs: java.util.List[OutputDefinitionYaml]) {
  require(jDefs != null && !jDefs.isEmpty, "'outputDefs' must contain at least one output definition")

  /** The output definitions as an immutable Scala list */
  val definitions: List[OutputDefinitionYaml] = jDefs.asScala.toList
}
/**
 * Used by jackson to hold a single entry of the "outputDefs" list.
 * Every constraint below raises an IllegalArgumentException naming the offending attribute.
 * @param prototcol - bound to the "protocol" attribute (the misspelt name is kept because callers read it)
 * @param host - bound to the "host" attribute; required
 * @param port - bound to the "port" attribute; must be greater than zero
 * @param routeType - bound to the "routeType" attribute; must be "topic" or "queue"
 * @param name - bound to the "name" attribute; required
 * @param format - bound to the "format" attribute; required
 * @param jFields - bound to the "fields" attribute; must hold more than one entry
 */
class OutputDefinitionYaml(@JsonProperty("protocol") val prototcol: String,
                           @JsonProperty("host") val host: String,
                           @JsonProperty("port") val port: Int,
                           @JsonProperty("routeType") val routeType: String,
                           @JsonProperty("name") val name: String,
                           @JsonProperty("format") val format: String,
                           @JsonProperty("fields") jFields: java.util.List[OutputFieldYaml]) {
  require(prototcol != null, "'protocol' is required")
  require(host != null, "'host' is required")
  require(port > 0, s"'port' must be greater than zero but was: $port")
  // A null routeType equals neither literal, so no separate null check is needed
  require(routeType == "topic" || routeType == "queue",
    s"'routeType' must be 'topic' or 'queue' but was: $routeType")
  require(name != null, "'name' is required")
  require(format != null, "'format' is required")
  // NOTE(review): a definition with exactly one field is rejected here, whereas OutputDefsYaml
  // only demands a non-empty list - confirm that "more than one" is intended
  require(jFields != null && jFields.size() > 1, "'fields' must contain at least two entries")

  /** The output fields as an immutable Scala list */
  val fields: List[OutputFieldYaml] = jFields.asScala.toList
}
/**
 * Used by jackson to hold a single entry of an output definition's "fields" list.
 * @param name - bound to the "name" attribute; required
 * @param description - bound to the "description" attribute; defaults to an empty string when
 *                      the class is constructed directly from Scala
 * @param inputFieldName - bound to the "inputFieldName" attribute; required
 * @param dateFmtStr - bound to the "dateFmtStr" attribute; not validated here
 * @throws IllegalArgumentException - when name or inputFieldName is null
 */
class OutputFieldYaml(@JsonProperty("name") val name: String,
                      @JsonProperty("description") val description: String = "",
                      @JsonProperty("inputFieldName") val inputFieldName: String,
                      @JsonProperty("dateFmtStr") val dateFmtStr: String) {
  require(name != null, "'name' is required for an output field")
  require(inputFieldName != null, s"'inputFieldName' is required for output field '$name'")
}
package com.cablelabs.eventgen.model
import com.fasterxml.jackson.annotation.JsonProperty
import scala.collection.JavaConverters._
/**
 * Used by jackson when parsing the OutputDefinition YAML file. Holds the list bound to the
 * document's "outputDefs" attribute.
 * @param jDefs - the output definitions as delivered by jackson; must be non-null and non-empty
 * @throws IllegalArgumentException - when no output definition is supplied
 */
class OutputDefsYaml(@JsonProperty("outputDefs") jDefs: java.util.List[OutputDefinitionYaml]) {
  require(jDefs != null && !jDefs.isEmpty, "'outputDefs' must contain at least one output definition")

  /** The output definitions as an immutable Scala list */
  val definitions: List[OutputDefinitionYaml] = jDefs.asScala.toList
}
/**
 * Used by jackson to hold a single entry of the "outputDefs" list.
 * Every constraint below raises an IllegalArgumentException naming the offending attribute.
 * @param prototcol - bound to the "protocol" attribute (the misspelt name is kept because callers read it)
 * @param host - bound to the "host" attribute; required
 * @param port - bound to the "port" attribute; must be greater than zero
 * @param routeType - bound to the "routeType" attribute; must be "topic" or "queue"
 * @param name - bound to the "name" attribute; required
 * @param format - bound to the "format" attribute; required
 * @param jFields - bound to the "fields" attribute; must hold more than one entry
 */
class OutputDefinitionYaml(@JsonProperty("protocol") val prototcol: String,
                           @JsonProperty("host") val host: String,
                           @JsonProperty("port") val port: Int,
                           @JsonProperty("routeType") val routeType: String,
                           @JsonProperty("name") val name: String,
                           @JsonProperty("format") val format: String,
                           @JsonProperty("fields") jFields: java.util.List[OutputFieldYaml]) {
  require(prototcol != null, "'protocol' is required")
  require(host != null, "'host' is required")
  require(port > 0, s"'port' must be greater than zero but was: $port")
  // A null routeType equals neither literal, so no separate null check is needed
  require(routeType == "topic" || routeType == "queue",
    s"'routeType' must be 'topic' or 'queue' but was: $routeType")
  require(name != null, "'name' is required")
  require(format != null, "'format' is required")
  // NOTE(review): a definition with exactly one field is rejected here, whereas OutputDefsYaml
  // only demands a non-empty list - confirm that "more than one" is intended
  require(jFields != null && jFields.size() > 1, "'fields' must contain at least two entries")

  /** The output fields as an immutable Scala list */
  val fields: List[OutputFieldYaml] = jFields.asScala.toList
}
/**
 * Used by jackson to hold a single entry of an output definition's "fields" list.
 * @param name - bound to the "name" attribute; required
 * @param description - bound to the "description" attribute; defaults to an empty string when
 *                      the class is constructed directly from Scala
 * @param inputFieldName - bound to the "inputFieldName" attribute; required
 * @param dateFmtStr - bound to the "dateFmtStr" attribute; not validated here
 * @throws IllegalArgumentException - when name or inputFieldName is null
 */
class OutputFieldYaml(@JsonProperty("name") val name: String,
                      @JsonProperty("description") val description: String = "",
                      @JsonProperty("inputFieldName") val inputFieldName: String,
                      @JsonProperty("dateFmtStr") val dateFmtStr: String) {
  require(name != null, "'name' is required for an output field")
  require(inputFieldName != null, s"'inputFieldName' is required for output field '$name'")
}
......@@ -4,7 +4,7 @@ import java.io.{File, FileInputStream}
import java.text.SimpleDateFormat
import java.util.Date
import com.cablelabs.eventgen.model.EventUtil
import com.cablelabs.eventgen.model.{InputDefinition, OutputDefinition}
import org.json4s.FileInput
/**
......@@ -14,8 +14,10 @@ class EngineTest extends EngineTester {
val dateFormat = "MM-dd-yyyy HH:mm:ss a"
val dateFormatter = new SimpleDateFormat(dateFormat)
var inputDef = EventUtil.inputDefinition(new FileInput(new File("testData/generator/definition/cm-constant-short.json")))
val outputDefs = EventUtil.outputDefinition(new FileInputStream(new File("testData/cm/definition/cm-out.yaml")), inputDef)
var inputDef = InputDefinition.inputDefinition(
new FileInput(new File("testData/generator/definition/cm-constant-short.json")))
val outputDefs = OutputDefinition.outputDefinition(
new FileInputStream(new File("testData/cm/definition/cm-out.yaml")), inputDef)
val eventUri = new File("testData/generator/events/cm.txt").toURI.toString
val delim = '|'
val outDef = outputDefs.iterator.next()
......
......@@ -4,7 +4,7 @@ import java.io.{File, FileInputStream}
import java.text.SimpleDateFormat
import java.util.Date
import com.cablelabs.eventgen.model.{ConstantIntDefinition, EventUtil}
import com.cablelabs.eventgen.model.{ConstantIntDefinition, InputDefinition, OutputDefinition}
import org.json4s.FileInput
/**
......@@ -14,8 +14,9 @@ class GeneratorCmConstantTest extends EngineTester {
val dateFormat = "MM-dd-yyyy HH:mm:ss a"
val dateFormatter = new SimpleDateFormat(dateFormat)
val inputDef = EventUtil.inputDefinition(new FileInput(new File("testData/cm/definition/cm-constant.json")))
val outputDefs = EventUtil.outputDefinition(new FileInputStream(new File("testData/cm/definition/cm-out.yaml")), inputDef)
val inputDef = InputDefinition.inputDefinition(new FileInput(new File("testData/cm/definition/cm-constant.json")))
val outputDefs = OutputDefinition.outputDefinition(
new FileInputStream(new File("testData/cm/definition/cm-out.yaml")), inputDef)
val eventUri = new File("testData/cm/events/cm_1a.txt").toURI.toString
val delim = '|'
......