Commit 7fd11211 authored by Steven Pisarski's avatar Steven Pisarski

Added the ability to control the seed date to avoid the latency of event...

Added the ability to control the seed date to avoid the latency of event generation, as the last known event could be very old relative to today.
parent 61f81814
package com.cablelabs.eventgen
import java.net.URI
import java.util.Date
import com.cablelabs.eventgen.algorithm._
import com.cablelabs.eventgen.analysis.SparkAnalyzer
......@@ -43,6 +44,8 @@ object Generator extends App {
| [-sparkUri] location of spark master (only required when not running via spark-deploy like in your IDE or with and want to run locally - e.g. "local[4]" [OPTIONAL]
| [-appName] the application name to be registered with the Spark cluster (only optionally used in conjunction with -sparkUri)
| QUARTZ SCHEDULING
| [-eventTimeOffset] the number of milliseconds to offset the time
| [-useNow] when true, the generator converts the seed event time to now
| [-sendPastEvents] when true, the generator creates events from the past (default false)
| [-numSchedulerThreads] the number of quartz scheduler threads to allocate to each actor (DEFAULT 1000)
|"""
......@@ -62,13 +65,17 @@ object Generator extends App {
val outputDefFs = FileSystem.get(new URI(outputDefUri), new Configuration())
val outputDefs = EventUtil.outputDefinition(outputDefFs.open(new Path(outputDefUri)), inputDef)
val sendPastEvents = {
val eventTimeOffset =
if (opts.get('eventTimeOffset) == None) 0l else opts.get('eventTimeOffset).get.toString.toLong
val useNow =
if (opts.get('useNow) == None) false else opts.get('useNow).get.toString.toBoolean
val sendPastEvents =
if (opts.get('sendPastEvents) == None) false else opts.get('sendPastEvents).get.toString.toBoolean
}
val numSchedulerThreads = {
val numSchedulerThreads =
if (opts.get('numSchedulerThreads) == None) 1000 else opts.get('numSchedulerThreads).get.toString.toInt
}
if (opts.get('temporalTrainingSetUri) != None && opts.get('factTrainingSetUri)
!= None && opts.get('seedEventsUri) != None) {
......@@ -91,7 +98,7 @@ object Generator extends App {
outputDefs = outputDefs, numSchedulerThreads = numSchedulerThreads
)
seedEngineFromCache(sc, eventGen, opts.get('seedEventsUri).get.asInstanceOf[String])
seedEngineFromCache(sc, eventGen, opts.get('seedEventsUri).get.asInstanceOf[String], eventTimeOffset, useNow)
} else {
......@@ -114,7 +121,7 @@ object Generator extends App {
val eventGen = engineFromAnalyzer(analyzer, sendPastEvents = sendPastEvents,
outputDefs = outputDefs, numSchedulerThreads = numSchedulerThreads)
seedEngineFromAnalyzer(analyzer, eventGen)
seedEngineFromAnalyzer(analyzer, eventGen, eventTimeOffset, useNow)
}
} catch {
case e: Exception => throw new RuntimeException(USAGE, e)
......@@ -150,6 +157,10 @@ object Generator extends App {
generatorOptions(theMap ++ Map('seedEventsUri -> value.toString), tail)
case "-sendPastEvents" :: value :: tail =>
generatorOptions(theMap ++ Map('sendPastEvents -> value.toString), tail)
case "-eventTimeOffset" :: value :: tail =>
generatorOptions(theMap ++ Map('eventTimeOffset -> value.toString), tail)
case "-useNow" :: value :: tail =>
generatorOptions(theMap ++ Map('useNow -> value.toString), tail)
case "-numSchedulerThreads" :: value :: tail =>
generatorOptions(theMap ++ Map('numSchedulerThreads -> value.toString), tail)
case _ => theMap
......@@ -304,10 +315,13 @@ object Generator extends App {
* Starts the generator with the last known event from each dimensionality set
* @param analyzer - the analyzer for retrieval of the necessary data from Spark
* @param engine - the event generator's engine
* @param timeOffset - the number of ms for altering the event date
* @param useNow - alter the event date to now
*/
def seedEngineFromAnalyzer(analyzer: SparkAnalyzer, engine: Engine): Unit = {
// TODO - Unit test me
def seedEngineFromAnalyzer(analyzer: SparkAnalyzer, engine: Engine, timeOffset: Long, useNow: Boolean): Unit = {
logger.info("Seeding generator with last known events")
analyzer.lastEventByDim().foreach(entry => engine.seed(entry._1, entry._2))
analyzer.lastEventByDim().foreach(entry => seedEngine(engine, entry._1, entry._2, timeOffset, useNow))
}
/**
......@@ -315,11 +329,31 @@ object Generator extends App {
* @param sc - the SparkContext
* @param engine - the event generator's engine
* @param seedEventsUri - the event file location
* @param timeOffset - the number of ms for altering the event date
* @param useNow - alter the event date to now
*/
// TODO - Test me
def seedEngineFromCache(sc: SparkContext, engine: Engine, seedEventsUri: String): Unit = {
// TODO - Unit test me
def seedEngineFromCache(sc: SparkContext, engine: Engine, seedEventsUri: String, timeOffset: Long,
useNow: Boolean): Unit = {
val seedEvents = sc.objectFile[(String, Map[String, Any])](seedEventsUri).collect()
seedEvents.foreach(entry => engine.seed(entry._1, entry._2))
seedEvents.foreach(entry => seedEngine(engine, entry._1, entry._2, timeOffset, useNow))
logger.info(s"Seeded ${seedEvents.length} events from cache with useNow = $useNow and timeOffset = $timeOffset")
}
/**
 * Seeds the generator engine with an event whose temporal field has been shifted.
 * The base time is either the event's own date or, when `useNow` is set, the
 * current wall-clock time; `timeOffset` milliseconds are then added to it.
 * @param engine - the generator engine
 * @param dimString - the dimension string identifying the event's dimensionality
 * @param event - the event to seed
 * @param timeOffset - the number of ms added to the base date
 * @param useNow - when true, base the new date on "now" instead of the event's date
 */
private def seedEngine(engine: Engine, dimString: String, event: Map[String, Any], timeOffset: Long,
                       useNow: Boolean) = {
  val temporal = engine.inputDef.temporal
  // read the event's own timestamp unconditionally (preserves any validation it performs)
  val eventMillis = temporal.eventValue[Date](event).getTime
  val baseMillis = if (useNow) System.currentTimeMillis() else eventMillis
  val shiftedEvent = event + (temporal.name -> new Date(baseMillis + timeOffset))
  engine.seed(dimString, shiftedEvent)
}
}
\ No newline at end of file
......@@ -171,7 +171,7 @@ class GeneratorActor(val engine: Engine, val sendPastEvents: Boolean, val toForw
var newEvent = thisEvent.values
var newEventDate = new Date(0)
do {
newEvent = engine.nextEvent(thisEvent.values)
newEvent = engine.nextEvent(newEvent)
newEventDate = engine.inputDef.temporal.eventValue[Date](newEvent)
} while (!sendPastEvents && newEventDate.getTime < System.currentTimeMillis())
......
......@@ -52,6 +52,9 @@ class CamelConsumer(val endpointUri: String, val notifier: Notifier) extends Con
*/
/**
 * Accumulates messages delivered by a consumer so tests can inspect them later.
 * NOTE(review): two `notify` definitions with identical signatures appear below —
 * this looks like unmerged diff residue (the old one-liner plus the newer version
 * that also prints). A double definition will not compile; the first should be removed.
 */
class Notifier {
  // messages received so far; mutable buffer appended to as messages arrive
  var list = new mutable.ArrayBuffer[Any]()
  def notify(msg: Any) = list += msg
  // newer version: records the message and echoes it to stdout for debugging
  def notify(msg: Any) = {
    list += msg
    println(msg)
  }
}
package com.cablelabs.eventgen.akka
import com.cablelabs.eventgen.{Notifier, TestActors}
import org.apache.spark.SparkContext
/**
 * Simple JMS consumer for testing purposes: starts a local Spark context,
 * subscribes to the CMHealthXML ActiveMQ topic, and collects whatever arrives
 * for ~10 seconds into a [[Notifier]].
 */
object JmsConsumer extends App {
  // created for its side effects only; the SparkContext reference is not retained
  new SparkContext("local[4]", "test")
  val notifier = new Notifier
  // NOTE(review): broker host/port and topic name are hard-coded test-environment values
  TestActors.activeMqConsumer("tcp", "bda-active01", 61616, "CMHealthXML", JMSRouteType.TOPIC, notifier)
  // give the consumer time to receive messages before the App body finishes
  Thread.sleep(10000)
}
{
"poll_date": {
"desc": "Date of CM poll",
"type": "date",
"dateFormat": "MM-dd-yyyy-HH:mm:ss",
"role": {
"type": "temporal",
"denormFields": ["day_of_week", "day_of_month", "day_of_year", "hour_of_day"],
"algo": {
"name" : "constant",
"flatten": 2,
"polyDegree": 3,
"factIndex": -1,
"value": 1000
}
}
},
"cmts": {
"desc": "The CMTS name",
"type": "string",
"role": {
"type": "dimension",
"position": 10
}
},
"node": {
"desc": "The Node name",
"type": "string",
"role": {
"type": "dimension",
"position": 20
}
},
"mac": {
"desc": "The MAC address",
"type": "string",
"role": {
"type": "dimension",
"position": 30
}
},
"lat": {
"desc": "The geo latitude",
"type": "float",
"role": {
"type": "dimension",
"position": 40
}
},
"lng": {
"desc": "The geo longitude",
"type": "float",
"role": {
"type": "dimension",
"position": 50
}
},
"downstream_receive_power_num": {
"desc": "fact 1-out",
"type": "float",
"role": {
"type": "fact",
"position": 100,
"algo": {
"name" : "linearRegression",
"flatten": 2,
"polyDegree": 3,
"includeFacts": true,
"iterations": 20,
"stepSize": 0.001
}
}
},
"upstream_transmit_power_num": {
"desc": "fact 2-out",
"type": "float",
"role": {
"type": "fact",
"position": 110,
"algo": {
"name" : "linearRegression",
"flatten": 2,
"polyDegree": 3,
"includeFacts": true,
"iterations": 20,
"stepSize": 0.001
}
}
},
"downstream_snr_rt": {
"desc": "fact 3-out",
"type": "float",
"role": {
"type": "fact",
"position": 130,
"algo": {
"name" : "linearRegression",
"flatten": 2,
"polyDegree": 3,
"includeFacts": true,
"iterations": 20,
"stepSize": 0.001
}
}
},
"t3_timeouts_cnt": {
"desc": "fact 4",
"type": "integer",
"role": {
"type": "fact",
"position": 40,
"algo": {
"name" : "linearRegression",
"flatten": 2,
"polyDegree": 3,
"includeFacts": true,
"iterations": 20,
"stepSize": 0.001
}
}
},
"t4_timeouts_cnt": {
"desc": "fact 4",
"type": "integer",
"role": {
"type": "fact",
"position": 40,
"algo": {
"name" : "linearRegression",
"flatten": 2,
"polyDegree": 3,
"includeFacts": true,
"iterations": 20,
"stepSize": 0.001
}
}
},
"lost_syncs_cnt": {
"desc": "fact 5",
"type": "integer",
"role": {
"type": "fact",
"position": 50,
"algo": {
"name" : "linearRegression",
"flatten": 2,
"polyDegree": 3,
"includeFacts": true,
"iterations": 20,
"stepSize": 0.001
}
}
},
"resets_cnt": {
"desc": "fact 6",
"type": "integer",
"role": {
"type": "fact",
"position": 60,
"algo": {
"name" : "linearRegression",
"flatten": 2,
"polyDegree": 3,
"includeFacts": true,
"iterations": 20,
"stepSize": 0.001
}
}
},
"ds_fec_corrected_cnt": {
"desc": "fact 7",
"type": "integer",
"role": {
"type": "fact",
"position": 70,
"algo": {
"name" : "linearRegression",
"flatten": 2,
"polyDegree": 3,
"includeFacts": true,
"iterations": 20,
"stepSize": 0.001
}
}
},
"ds_fec_uncorrected_cnt": {
"desc": "fact 8",
"type": "integer",
"role": {
"type": "fact",
"position": 80,
"algo": {
"name" : "linearRegression",
"flatten": 2,
"polyDegree": 3,
"includeFacts": true,
"iterations": 20,
"stepSize": 0.001
}
}
},
"ds_fec_unerrored_cnt": {
"desc": "fact 9",
"type": "integer",
"role": {
"type": "fact",
"position": 90,
"algo": {
"name" : "linearRegression",
"flatten": 2,
"polyDegree": 3,
"includeFacts": true,
"iterations": 20,
"stepSize": 0.001
}
}
}
}
\ No newline at end of file
......@@ -4,7 +4,7 @@ outputDefs:
port: 61616
routeType: topic
name: CMHealthXML
format: hdfs://bda-hdfs01/tmp/cm/definitions/amdocs-cm.vm
format: testData/cm/definition/amdocs-cm.vm
fields:
- name: EventTimeStamp
description: temporal field
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment