Commit 8181f786 authored by Steven Pisarski's avatar Steven Pisarski

Removed command line arguments and replaced with YAML configuration for...

Removed command line arguments and replaced with YAML configuration for starting the analyzer. Still requires integration testing
parent be14bdff
......@@ -22,7 +22,7 @@ import scala.collection.mutable
/**
* Responsible for starting the Event Generator
*
* spark-submit --class com.cablelabs.eventgen.Generator {path to generator config yaml file or HDFS}
* spark-submit --class com.cablelabs.eventgen.Generator {path to generator config yaml local or HDFS file}
*/
object Generator extends App {
......@@ -323,7 +323,7 @@ class GeneratorYaml(@JsonProperty("schemaUri") _schemaUri: String,
@JsonProperty("eventsUri") val eventsUri: String,
@JsonProperty("fileDelim") _fileDelim: String,
@JsonProperty("sparkUri") val sparkUri: String,
@JsonProperty("appName") val appName: String = "EventGenerator",
@JsonProperty("appName") _appName: String,
@JsonProperty("eventTimeOffset") val eventTimeOffset: Long = 0,
@JsonProperty("useNow") val useNow: Boolean = false,
@JsonProperty("sendPastEvents") val sendPastEvents: Boolean = false,
......@@ -339,6 +339,8 @@ class GeneratorYaml(@JsonProperty("schemaUri") _schemaUri: String,
private val outputDefFs = FileSystem.get(new URI(_outputDefUri), new Configuration())
val outputDefs = EventUtil.outputDefinition(outputDefFs.open(new Path(_outputDefUri)), inputDef)
val appName = if (_appName == null) "EventGenerator" else _appName
require((temporalTrainingSetUri != null && factTrainingSetUri != null && seedEventsUri != null)
|| (eventsUri != null && (_fileDelim != null || _fileDelim != "")))
......
package com.cablelabs.eventgen
import java.io.{ByteArrayInputStream, File, FileInputStream}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
/**
* Tests for the AnalyzeDataYaml class responsible for configuring the event generator
*/
class AnalyzeDataYamlTest extends UnitSpec {
test("Empty YAML") {
val is = new ByteArrayInputStream("".getBytes)
val mapper = new ObjectMapper(new YAMLFactory())
intercept[Exception] {
mapper.readValue(is, classOf[GeneratorYaml])
}
}
test("Standard from file for generator without analysis") {
val is = new FileInputStream(new File("testData/cm/conf/cm-analyzer.yaml"))
val mapper = new ObjectMapper(new YAMLFactory())
val config = mapper.readValue(is, classOf[AnalyzeDataYaml])
assert("local[8]" == config.sparkUri)
assert("CM-Analyzer-small" == config.appName)
assert(config.inputDef != null)
assert(16 == config.inputDef.fieldMap.size)
assert('|' == config.fileDelim)
assert("testData/cm/events/cm_1a.txt" == config.eventsUri)
assert("/tmp/cm/analysis/temporalTrainingSet.rdd" == config.temporalTrainingSetUri)
assert("/tmp/cm/analysis/factTrainingSet.rdd" == config.factTrainingSetUri)
assert("/tmp/cm/analysis/seedEvents.rdd" == config.seedEventsUri)
assert("/tmp/cm/analysis/eventCount.txt" == config.eventCountUri)
assert("/tmp/cm/analysis/dimCount.txt" == config.dimCountUri)
assert("/tmp/cm/analysis/dimEventsCount.txt" == config.dimEventsCountUri)
assert("/tmp/cm/analysis/factValues.txt" == config.factValuesUri)
assert("/tmp/cm/analysis/temporalTrainingMetrics.txt" == config.temporalTrainingMetricsUri)
assert("/tmp/cm/analysis/factTrainingMetrics.txt" == config.factTrainingMetricsUri)
assert(9 == config.numThreads)
}
test("missing schemaUri") {
val yaml =
"""sparkUri: local[8]
appName: CM-Analyzer-small
fileDelim: "|"
eventsUri: testData/cm/events/cm_1a.txt
temporalTrainingSetUri: /tmp/cm/analysis/temporalTrainingSet.rdd
factTrainingSetUri: /tmp/cm/analysis/factTrainingSet.rdd
seedEventsUri: /tmp/cm/analysis/seedEvents.rdd
eventCountUri: /tmp/cm/analysis/eventCount.txt
dimCountUri: /tmp/cm/analysis/dimCount.txt
dimEventsCountUri: /tmp/cm/analysis/dimEventsCount.txt
factValuesUri: /tmp/cm/analysis/factValues.txt
temporalTrainingMetricsUri: /tmp/cm/analysis/temporalTrainingMetrics.txt
factTrainingMetricsUri: /tmp/cm/analysis/factTrainingMetrics.txt
numThreads: 9"""
val is = new ByteArrayInputStream(yaml.getBytes)
val mapper = new ObjectMapper(new YAMLFactory())
intercept[Exception] {
mapper.readValue(is, classOf[GeneratorYaml])
}
}
test("missing eventsUri") {
val yaml =
"""sparkUri: local[8]
appName: CM-Analyzer-small
fileDelim: "|"
schemaUri: testData/cm/definition/cm-constant-short.json
temporalTrainingSetUri: /tmp/cm/analysis/temporalTrainingSet.rdd
factTrainingSetUri: /tmp/cm/analysis/factTrainingSet.rdd
seedEventsUri: /tmp/cm/analysis/seedEvents.rdd
eventCountUri: /tmp/cm/analysis/eventCount.txt
dimCountUri: /tmp/cm/analysis/dimCount.txt
dimEventsCountUri: /tmp/cm/analysis/dimEventsCount.txt
factValuesUri: /tmp/cm/analysis/factValues.txt
temporalTrainingMetricsUri: /tmp/cm/analysis/temporalTrainingMetrics.txt
factTrainingMetricsUri: /tmp/cm/analysis/factTrainingMetrics.txt
numThreads: 9"""
val is = new ByteArrayInputStream(yaml.getBytes)
val mapper = new ObjectMapper(new YAMLFactory())
intercept[Exception] {
mapper.readValue(is, classOf[GeneratorYaml])
}
}
test("minimum settings") {
val yaml =
"""schemaUri: testData/cm/definition/cm-constant-short.json
eventsUri: testData/cm/events/cm_1a.txt"""
val is = new ByteArrayInputStream(yaml.getBytes)
val mapper = new ObjectMapper(new YAMLFactory())
val config = mapper.readValue(is, classOf[AnalyzeDataYaml])
assert(null == config.sparkUri)
assert("DataAnalyzer" == config.appName)
assert(config.inputDef != null)
assert(16 == config.inputDef.fieldMap.size)
assert(',' == config.fileDelim)
assert("testData/cm/events/cm_1a.txt" == config.eventsUri)
assert(null == config.temporalTrainingSetUri)
assert(null == config.factTrainingSetUri)
assert(null == config.seedEventsUri)
assert(null == config.eventCountUri)
assert(null == config.dimCountUri)
assert(null == config.dimEventsCountUri)
assert(null == config.factValuesUri)
assert(null == config.temporalTrainingMetricsUri)
assert(null == config.factTrainingMetricsUri)
assert(5 == config.numThreads)
}
}
......@@ -166,7 +166,7 @@ seedEventsUri: seedEvents.rdd"""
val mapper = new ObjectMapper(new YAMLFactory())
val config = mapper.readValue(is, classOf[GeneratorYaml])
assert(null == config.sparkUri)
assert(null == config.appName)
assert("EventGenerator" == config.appName)
assert("temporalTrainingSet.rdd" == config.temporalTrainingSetUri)
assert("factTrainingSet.rdd" == config.factTrainingSetUri)
assert("seedEvents.rdd" == config.seedEventsUri)
......@@ -238,7 +238,7 @@ fileDelim: "|"
val mapper = new ObjectMapper(new YAMLFactory())
val config = mapper.readValue(is, classOf[GeneratorYaml])
assert(null == config.sparkUri)
assert(null == config.appName)
assert("EventGenerator" == config.appName)
assert(null == config.temporalTrainingSetUri)
assert(null == config.factTrainingSetUri)
assert(null == config.seedEventsUri)
......@@ -267,7 +267,7 @@ eventsUri: events.file
val mapper = new ObjectMapper(new YAMLFactory())
val config = mapper.readValue(is, classOf[GeneratorYaml])
assert(null == config.sparkUri)
assert(null == config.appName)
assert("EventGenerator" == config.appName)
assert(null == config.temporalTrainingSetUri)
assert(null == config.factTrainingSetUri)
assert(null == config.seedEventsUri)
......
sparkUri: local[8]
appName: CM-Analyzer-small
schemaUri: testData/cm/definition/cm-constant-short.json
fileDelim: "|"
eventsUri: testData/cm/events/cm_1a.txt
temporalTrainingSetUri: /tmp/cm/analysis/temporalTrainingSet.rdd
factTrainingSetUri: /tmp/cm/analysis/factTrainingSet.rdd
seedEventsUri: /tmp/cm/analysis/seedEvents.rdd
eventCountUri: /tmp/cm/analysis/eventCount.txt
dimCountUri: /tmp/cm/analysis/dimCount.txt
dimEventsCountUri: /tmp/cm/analysis/dimEventsCount.txt
factValuesUri: /tmp/cm/analysis/factValues.txt
temporalTrainingMetricsUri: /tmp/cm/analysis/temporalTrainingMetrics.txt
factTrainingMetricsUri: /tmp/cm/analysis/factTrainingMetrics.txt
numThreads: 9
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment