Commit 8145df3a authored by Steven Pisarski

Changed input definition configuration from JSON to YAML.

parent 68206c90
......@@ -16,7 +16,6 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.json4s.StreamInput
import org.quartz.TriggerBuilder._
import org.quartz._
import org.quartz.impl.StdSchedulerFactory
......@@ -379,7 +378,7 @@ class AnalyzeDataYaml(@JsonProperty("schemaUri") _schemaUri: String,
require(_schemaUri != null)
private val schemaFs = FileSystem.get(new URI(_schemaUri), new Configuration())
val inputDef = InputDefinition.inputDefinition(new StreamInput(schemaFs.open(new Path(_schemaUri))))
val inputDef = InputDefinition.inputDefinition(schemaFs.open(new Path(_schemaUri)))
val appName = if (_appName == null) "DataAnalyzer" else _appName
......
......@@ -15,7 +15,6 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.json4s.StreamInput
import org.slf4j.LoggerFactory
/**
......@@ -44,11 +43,13 @@ object Generator extends App {
val seedFilter: ((String, Map[String, Any])) => Boolean =
if (config.seedFilters.isEmpty) {
def filter(filters: Seq[String]): ((String, Map[String, Any])) => Boolean = {entry => true}
filter(config.seedFilters)
logger.info(s"Not using a seed filter")
def filter(filters: List[String]): ((String, Map[String, Any])) => Boolean = {entry => true}
filter(config.seedFilters.toList)
} else {
def filter(filters: Seq[String]): ((String, Map[String, Any])) => Boolean = { entry =>
def loop(leftovers: Seq[String]): Boolean = leftovers match {
logger.info(s"Adding ${config.seedFilters.size} seed filter values - ${config.seedFilters}")
def filter(filters: List[String]): ((String, Map[String, Any])) => Boolean = { entry =>
def loop(leftovers: List[String]): Boolean = leftovers match {
case x :: xs =>
if (entry._1.contains(x)) true
else loop(leftovers.tail)
......@@ -56,7 +57,7 @@ object Generator extends App {
}
loop(filters)
}
filter(config.seedFilters)
filter(config.seedFilters.toList)
}
val sc = if (config.sparkUri != null)
......@@ -113,11 +114,11 @@ object Generator extends App {
}
case definition: ConstantDefinition => definition match {
case definition: ConstantIntDefinition =>
new ConstantIntModel(inputDef.temporal, definition.value.asInstanceOf[Long])
new ConstantIntModel(inputDef.temporal, definition.value.toString.toLong)
case definition: ConstantFloatDefinition =>
new ConstantFloatModel(inputDef.temporal, definition.asInstanceOf[Double])
new ConstantFloatModel(inputDef.temporal, definition.value.toString.toDouble)
case definition: ConstantStringDefinition =>
new ConstantStringModel(inputDef.temporal, definition.asInstanceOf[String])
new ConstantStringModel(inputDef.temporal, definition.value.toString)
}
}
......@@ -322,7 +323,7 @@ class GeneratorYaml(@JsonProperty("schemaUri") _schemaUri: String,
require(_schemaUri != null)
private val schemaFs = FileSystem.get(new URI(_schemaUri), new Configuration())
val inputDef = InputDefinition.inputDefinition(new StreamInput(schemaFs.open(new Path(_schemaUri))))
val inputDef = InputDefinition.inputDefinition(schemaFs.open(new Path(_schemaUri)))
require(_outputDefUri != null)
private val outputDefFs = FileSystem.get(new URI(_outputDefUri), new Configuration())
......
......@@ -33,7 +33,7 @@ abstract class InputField(override val name: String, override val description: S
* @param algoDef - the algorithm that determines the event frequency
* @param factIndex - the last fact index to be used in the temporal training set
*/
abstract class Temporal(override val name: String, override val description: String = "", val denormFields: List[String],
abstract class Temporal(override val name: String, override val description: String = "", val denormFields: Seq[String],
val algoDef: AlgorithmDefinition, override val factIndex: Int)
extends InputField(name, description) with TemporalRole {
def denormalize(event: Map[String, Any]): Map[String, Long]
......@@ -46,7 +46,7 @@ abstract class Temporal(override val name: String, override val description: Str
* @param factIndex - the last fact index to be used in the temporal training set
*/
class DateTemporal(override val name: String, override val description: String = "",
override val denormFields: List[String], override val algoDef: AlgorithmDefinition,
override val denormFields: Seq[String], override val algoDef: AlgorithmDefinition,
dateFmtStr: String, override val factIndex: Int)
extends Temporal(name, description, denormFields, algoDef, factIndex) with DateRole {
val dateFormat = new SimpleDateFormat(dateFmtStr)
......
......@@ -19,33 +19,24 @@ object OutputDefinition {
* @return - the set of configured OutputDefinitions
*/
def outputDefinitions(is: InputStream, inputDef: InputDefinition): Set[OutputDefinition] = {
val out = scala.collection.mutable.Set[OutputDefinition]()
val mapper = new ObjectMapper(new YAMLFactory())
val config = mapper.readValue(is, classOf[OutputDefsYaml])
config.definitions.foreach(config => {
val fieldSet = scala.collection.mutable.Set[OutputField]()
config.fields.foreach(field => {
val inputFieldItem = inputDef.fieldMap.get(field.inputFieldName)
if (inputFieldItem != None) {
val inputField = inputFieldItem.get
inputField match {
case role: IntegerRole =>
fieldSet += new IntOutput(field.name, field.description, inputField)
case role: FloatRole =>
fieldSet += new FloatOutput(field.name, field.description, inputField)
case role: StringRole =>
fieldSet += new StringOutput(field.name, field.description, inputField)
case role: DateRole =>
fieldSet += new DateOutput(field.name, field.description, field.dateFmtStr, inputField)
}
val config = new ObjectMapper(new YAMLFactory()).readValue(is, classOf[OutputDefsYaml])
config.definitions.map(config => {
val fieldSet = config.fields.filter(f => inputDef.fieldMap.get(f.inputFieldName) != None).map(field => {
inputDef.fieldMap(field.inputFieldName) match {
case role: IntegerRole =>
new IntOutput(field.name, field.description, inputDef.fieldMap(field.inputFieldName))
case role: FloatRole =>
new FloatOutput(field.name, field.description, inputDef.fieldMap(field.inputFieldName))
case role: StringRole =>
new StringOutput(field.name, field.description, inputDef.fieldMap(field.inputFieldName))
case role: DateRole =>
new DateOutput(field.name, field.description, field.dateFmtStr, inputDef.fieldMap(field.inputFieldName))
}
})
config.format match {
case "json" =>
out += new OutputDefinition(config.prototcol, config.host, config.port, config.routeType, config.name,
new OutputDefinition(config.prototcol, config.host, config.port, config.routeType, config.name,
fieldSet.toSet, OutputEventFormatters.jsonFormat)
case _ =>
Velocity.init(OutputEventFormatters.velocityProps)
......@@ -53,11 +44,10 @@ object OutputDefinition {
val template = Velocity.getTemplate(config.format)
val fmt = OutputEventFormatters.velocityFormat(ctx, template)
out += new OutputDefinition(config.prototcol, config.host, config.port, config.routeType, config.name,
new OutputDefinition(config.prototcol, config.host, config.port, config.routeType, config.name,
fieldSet.toSet, fmt)
}
})
out.toSet
}).toSet
}
}
......
{
"temporal_field": {
"desc": "Temporal 1",
"type": "date",
"dateFormat": "MM/dd/yyyy HH:mm:ss a",
"role": {
"type": "temporal",
"denormFields": ["hour_of_day", "day_of_month"],
"algo": {
"name" : "linearRegression",
"flatten": 2,
"polyDegree": 3,
"factIndex": -1,
"iterations": 20,
"stepSize": 0.001
}
}
},
"int_dim": {
"desc": "Integer Dimension",
"type": "integer",
"role": {
"type": "dimension",
"position": 10
}
},
"float_dim": {
"desc": "Float Dimension",
"type": "float",
"role": {
"type": "dimension",
"position": 20
}
},
"string_dim": {
"desc": "String Dimension",
"type": "string",
"role": {
"type": "dimension",
"position": 30
}
},
"date_dim": {
"desc": "Date Dimension",
"type": "date",
"dateFormat": "MM/dd/yyyy HH:mm:ss a",
"role": {
"type": "dimension",
"position": 40
}
},
"int_fact": {
"desc": "Integer Fact",
"type": "integer",
"role": {
"type": "fact",
"position": 10,
"algo": {
"name" : "linearRegression",
"flatten": 2,
"polyDegree": 3,
"iterations": 20,
"stepSize": 0.001
}
}
},
"float_fact": {
"desc": "Float Fact",
"type": "float",
"role": {
"type": "fact",
"position": 20,
"algo": {
"name" : "linearRegression",
"flatten": 2,
"polyDegree": 3,
"iterations": 20,
"stepSize": 0.001
}
}
},
"string_fact": {
"desc": "String Fact",
"type": "string",
"role": {
"type": "fact",
"position": 30,
"algo": {
"name" : "naiveBayes",
"flatten": 2,
"polyDegree": 3,
"lambda": 0.001
}
}
},
"date_fact": {
"desc": "Date Fact",
"type": "date",
"dateFormat": "MM/dd/yyyy HH:mm:ss a",
"role": {
"type": "fact",
"position": 40,
"algo": {
"name" : "naiveBayes",
"flatten": 2,
"polyDegree": 3,
"lambda": 0.001
}
}
}
}
\ No newline at end of file
temporal:
name: temporal_field
description: Temporal 1
type: date
dateFormat: MM/dd/yyyy HH:mm:ss a
factIndex: -1
denormFields:
- hour_of_day
- day_of_month
algo:
name: linearRegression
flatten: 2
polyDegree: 3
iterations: 20
stepSize: 0.001
dimensions:
- name: int_dim
description: Integer Dimension
type: integer
position: 10
- name: float_dim
description: Float Dimension
type: float
position: 20
- name: string_dim
description: String Dimension
type: string
position: 30
- name: date_dim
description: Date Dimension
type: date
position: 40
dateFormat: MM/dd/yyyy HH:mm:ss a
facts:
- name: int_fact
description: Integer Fact
type: integer
position: 10
algo:
name: linearRegression
flatten: 2
polyDegree: 3
iterations: 20
stepSize: 0.001
- name: float_fact
description: Float Fact
type: float
position: 20
algo:
name: linearRegression
flatten: 2
polyDegree: 3
iterations: 20
stepSize: 0.001
- name: string_fact
description: String Fact
type: string
position: 30
algo:
name: naiveBayes
flatten: 2
polyDegree: 3
lambda: 0.001
- name: date_fact
description: Date Fact
type: date
dateFormat: MM/dd/yyyy HH:mm:ss a
position: 40
algo:
name: naiveBayes
flatten: 2
polyDegree: 3
lambda: 0.001
\ No newline at end of file
{
"temporal_field": {
"desc": "Temporal 1",
"type": "date",
"dateFormat": "MM/dd/yyyy HH:mm:ss a",
"role": {
"type": "temporal",
"denormFields": ["hour_of_day", "day_of_month"],
"algo": {
"name" : "naiveBayes",
"flatten": 2,
"polyDegree": 3,
"factIndex": -1,
"lambda": 0.001
}
}
},
"string_dim": {
"desc": "String Dimension",
"type": "string",
"role": {
"type": "dimension",
"position": 10
}
},
"int_fact": {
"desc": "Integer Fact",
"type": "integer",
"role": {
"type": "fact",
"position": 10,
"algo": {
"name" : "linearRegression",
"flatten": 2,
"polyDegree": 3,
"iterations": 20,
"stepSize": 0.001
}
}
},
"float_fact": {
"desc": "Float Fact",
"type": "float",
"role": {
"type": "fact",
"position": 20,
"algo": {
"name" : "linearRegression",
"flatten": 2,
"polyDegree": 3,
"iterations": 20,
"stepSize": 0.001
}
}
},
"string_fact": {
"desc": "String Fact",
"type": "string",
"role": {
"type": "fact",
"position": 30,
"algo": {
"name" : "naiveBayes",
"flatten": 2,
"polyDegree": 3,
"lambda": 0.001
}
}
},
"date_fact": {
"desc": "Date Fact",
"type": "date",
"dateFormat": "MM/dd/yyyy HH:mm:ss a",
"role": {
"type": "fact",
"position": 40,
"algo": {
"name" : "naiveBayes",
"flatten": 2,
"polyDegree": 3,
"lambda": 0.001
}
}
}
}
\ No newline at end of file
temporal:
name: temporal_field
description: Temporal 1
type: date
dateFormat: MM/dd/yyyy HH:mm:ss a
factIndex: -1
denormFields:
- hour_of_day
- day_of_month
algo:
name: naiveBayes
flatten: 2
polyDegree: 3
lambda: 0.001
dimensions:
- name: string_dim
description: String Dimension
type: string
position: 10
facts:
- name: int_fact
description: Integer Fact
type: integer
position: 10
algo:
name: linearRegression
flatten: 2
polyDegree: 3
iterations: 20
stepSize: 0.001
- name: float_fact
description: Float Fact
type: float
position: 20
algo:
name: linearRegression
flatten: 2
polyDegree: 3
iterations: 20
stepSize: 0.001
- name: string_fact
description: String Fact
type: string
position: 30
algo:
name: naiveBayes
flatten: 2
polyDegree: 3
lambda: 0.001
- name: date_fact
description: Date Fact
type: date
dateFormat: MM/dd/yyyy HH:mm:ss a
position: 40
algo:
name: naiveBayes
flatten: 2
polyDegree: 3
lambda: 0.001
\ No newline at end of file
{
"string_dim": {
"desc": "String Dimension",
"type": "string",
"role": {
"type": "dimension",
"position": 10
}
},
"int_fact": {
"desc": "Integer Fact",
"type": "integer",
"role": {
"type": "fact",
"position": 10,
"algo": {
"name" : "linearRegression",
"flatten": 2,
"polyDegree": 3,
"iterations": 20,
"stepSize": 0.001
}
}
}
}
\ No newline at end of file
dimensions:
- name: string_dim
description: String Dimension
type: string
position: 10
facts:
- name: int_fact
description: Integer Fact
type: integer
position: 10
algo:
name: linearRegression
flatten: 2
polyDegree: 3
iterations: 20
stepSize: 0.001
{
"temporal_field": {
"desc": "Temporal 1",
"type": "date",
"dateFormat": "MM/dd/yyyy HH:mm:ss a",
"role": {
"type": "temporal",
"denormFields": ["hour_of_day", "day_of_month"],
"algo": {
"name" : "linearRegression",
"flatten": 2,
"polyDegree": 3,
"factIndex": -1,
"iterations": 20,
"stepSize": 0.001
}
}
},
"int_fact": {
"desc": "Integer Fact",
"type": "integer",
"role": {
"type": "fact",
"position": 40,
"machineLearning": {
"flatten": 2,
"polyDegree": 3,
"algo": {
"name" : "linearRegression",
"iterations": 20,
"stepSize": 0.001
}
}
}
}
}
\ No newline at end of file
temporal:
name: temporal_field
description: Temporal 1
type: date
dateFormat: MM/dd/yyyy HH:mm:ss a
factIndex: -1
denormFields:
- hour_of_day
- day_of_month
algo:
name: linearRegression
flatten: 2
polyDegree: 3
iterations: 20
stepSize: 0.001
facts:
- name: int_fact
description: Integer Fact
type: integer
position: 10
algo:
name: linearRegression
flatten: 2
polyDegree: 3
iterations: 20
stepSize: 0.001
{
"string_dim": {
"desc": "String Dimension",
"type": "string",
"role": {
"type": "dimension",
"position": 10
}
},
"temporal_field": {
"desc": "Temporal 1",
"type": "date",
"dateFormat": "MM/dd/yyyy HH:mm:ss a",
"role": {
"type": "temporal",
"denormFields": ["hour_of_day", "day_of_month"],
"algo": {
"name" : "linearRegression",
"flatten": 2,
"polyDegree": 3,
"includeFacts": true,
"iterations": 20,
"stepSize": 0.001
}
}
}
}
\ No newline at end of file
temporal:
name: temporal_field
description: Temporal 1
type: date
dateFormat: MM/dd/yyyy HH:mm:ss a
factIndex: -1
denormFields:
- hour_of_day
- day_of_month
algo: