Commit be14bdff authored by Steven Pisarski's avatar Steven Pisarski

Removed command line arguments and replaced with YAML configuration for starting the engine.

parent 7fd11211
......@@ -28,7 +28,7 @@ class Engine(val inputDef: InputDefinition, val temporalAlgo: Model, val factAlg
/**
* The main actor responsible for calling the engine for retrieving and to schedule the event to be emitted
*/
val generatorActor = GeneratorActors.generateAndSchedule(this, outDefs, sendPastEvents, numSchedulerThreads)
val generatorActor = GeneratorActors.generateAndSchedule(this, outDefs, new Date(), sendPastEvents, numSchedulerThreads)
/**
* Begins the event generation process indefinitely
......
package com.cablelabs.eventgen.akka
import java.util.Date
import java.util.{Date, Properties}
import javax.jms.Session
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
......@@ -50,9 +50,9 @@ object GeneratorActors {
* @param numSchedThreads - number of threads for each actor's quartz scheduler
* @return - the main actor to which to send the seed events
*/
def generateAndSchedule(engine: Engine, outDefs: Set[OutputDefinition], sendPastEvents: Boolean,
def generateAndSchedule(engine: Engine, outDefs: Set[OutputDefinition], startTime: Date, sendPastEvents: Boolean,
numSchedThreads: Int): ActorRef =
generateAndSchedule(actorSystem, engine, outDefs, sendPastEvents, numSchedThreads)
generateAndSchedule(actorSystem, engine, outDefs, startTime, sendPastEvents, numSchedThreads)
/**
* Responsible for creating the necessary actors required for event generation generally when not running in a Spark
......@@ -65,33 +65,34 @@ object GeneratorActors {
* @return - the main actor to which to send the seed events
*/
private[akka] def generateAndSchedule(inActorSystem: ActorSystem, engine: Engine, outDefs: Set[OutputDefinition],
sendPastEvents: Boolean, numSchedThreads: Int): ActorRef = {
startTime: Date, sendPastEvents: Boolean, numSchedThreads: Int): ActorRef = {
val fmtActor = formatAndRoute(inActorSystem, outDefs)
val schedActor = scheduleAndNotify(inActorSystem, numSchedThreads, fmtActor)
generate(inActorSystem, engine, sendPastEvents, schedActor)
val schedActor = scheduleAndNotify(inActorSystem, numSchedThreads, startTime, sendPastEvents, fmtActor)
generateAndSchedule(inActorSystem, engine, schedActor)
}
/**
* The main generator actor
* @param inActorSystem - the actor system to use
* @param engine - used for generating events
* @param sendPastEvents - denotes whether or not to send back-dated events
* @param scheduler - the actor responsible for scheduling the conversion and routing of the event data
* @return - the generator ActorRef
*/
private def generate(inActorSystem: ActorSystem, engine: Engine, sendPastEvents: Boolean, scheduler: ActorRef): ActorRef =
inActorSystem.actorOf(Props(classOf[GeneratorActor], engine, sendPastEvents, Set(scheduler)))
private def generateAndSchedule(inActorSystem: ActorSystem, engine: Engine, scheduler: ActorRef): ActorRef =
inActorSystem.actorOf(Props(classOf[GeneratorActor], engine, Set(scheduler)))
/**
* Returns an ActorRef that is implemented with the QuartzConsumer Akka Actor for scheduling event delivery.
* This actor will also echo the original message back to its sender
* @param inActorSystem - the actor system to use
* @param numThreads - the number of threads to allocate to each actor
* @param sendPastEvents - denotes whether or not to send back-dated events
* @param formatter - the actor responsible for formatting the output and sending it to the router actor
* @return - the scheduler ActorRef
*/
private[eventgen] def scheduleAndNotify(inActorSystem: ActorSystem, numThreads: Int, formatter: ActorRef): ActorRef =
inActorSystem.actorOf(Props(classOf[ScheduleAndNotify], numThreads, Set(formatter), true))
private[eventgen] def scheduleAndNotify(inActorSystem: ActorSystem, numThreads: Int, startTime: Date,
sendPastEvents: Boolean, formatter: ActorRef): ActorRef =
inActorSystem.actorOf(Props(classOf[ScheduleAndNotify], numThreads, Set(formatter), startTime, sendPastEvents, true))
/**
* The actor responsible for formatting the Map[String, Any] to the desired event payload and routing to the endpoints
......@@ -159,21 +160,19 @@ case class DimTimeEvent(dimString: String, event: Map[String, Any], time: Date)
/**
* The main actor responsible for obtaining the next event and scheduling it output
* @param engine - required for generating the next event
* @param sendPastEvents - when true, all generated events from the past will be output
* @param toForward - the actors to forward message
*/
class GeneratorActor(val engine: Engine, val sendPastEvents: Boolean, val toForward: Set[ActorRef]) extends Actor {
class GeneratorActor(val engine: Engine, val toForward: Set[ActorRef]) extends Actor {
private[this] def logger = Logger(LoggerFactory.getLogger("GeneratorActor"))
def receive = {
case thisEvent: Event =>
logger.debug(s"Predicting next event - ${thisEvent.values}")
var newEvent = thisEvent.values
var newEventDate = new Date(0)
do {
newEvent = engine.nextEvent(newEvent)
newEventDate = engine.inputDef.temporal.eventValue[Date](newEvent)
} while (!sendPastEvents && newEventDate.getTime < System.currentTimeMillis())
val newEvent = engine.nextEvent(thisEvent.values)
val newEventDate = engine.inputDef.temporal.eventValue[Date](newEvent)
if (newEventDate.getTime - System.currentTimeMillis() > 10000)
logger.warn(s"Scheduling ${thisEvent.dimString} for $newEventDate ${newEventDate.getTime - System.currentTimeMillis()}ms late")
toForward.foreach(_ ! new DimTimeEvent(thisEvent.dimString, newEvent, newEventDate))
}
......@@ -184,30 +183,49 @@ class GeneratorActor(val engine: Engine, val sendPastEvents: Boolean, val toForw
* the generator actor to seed the next event when echo is true
* @param numThreads - the number of quartz scheduler threads
* @param actors - the downstream actors that can accept the DimTimeEvent
* @param startTime - the time when the generator is starting
* @param sendPastEvents - when true, all events from the past will be output
* @param echo - when true, sender will be notified with the original event
*/
class ScheduleAndNotify(val numThreads: Int, val actors: Set[ActorRef], val echo: Boolean) extends Actor {
class ScheduleAndNotify(val numThreads: Int, val actors: Set[ActorRef], val startTime: Date,
val sendPastEvents: Boolean, val echo: Boolean) extends Actor {
import org.quartz.TriggerBuilder._
import scala.collection.JavaConversions._
val scheduler = new StdSchedulerFactory().getScheduler
private[this] def logger = Logger(LoggerFactory.getLogger("ScheduleAndNotify"))
val props = new Properties()
props.put("org.quartz.threadPool.threadCount", numThreads.toString)
props.put("org.quartz.threadPool.threadPriority", String.valueOf(Thread.MAX_PRIORITY - 1))
val scheduler = new StdSchedulerFactory(props).getScheduler
scheduler.start()
def receive = {
case timeEvent: DimTimeEvent =>
if (sendPastEvents || startTime.getTime <= timeEvent.time.getTime) {
val jobId = timeEvent.dimString + '|' + timeEvent.time.getTime + "|scheduled"
val thisSender = sender
// Closure to send into the Quartz scheduler
val jobData = Map[String, () => Unit]("function" -> (() => {
actors.foreach(_ ! timeEvent)
if (echo) thisSender ! new Event(timeEvent.dimString, timeEvent.event)
}))
val job = JobBuilder.newJob(classOf[ExecuteFunctionJob])
.withIdentity(jobId)
.setJobData(new JobDataMap(mapAsJavaMap(jobData)))
val trigger = newTrigger().withIdentity(jobId).startAt(timeEvent.time).endAt(timeEvent.time)
.withSchedule(SimpleScheduleBuilder.repeatMinutelyForTotalCount(1))
if (timeEvent.time.getTime < System.currentTimeMillis())
logger.warn(s"Scheduling ${timeEvent.dimString} ${timeEvent.time.getTime < System.currentTimeMillis()}ms late")
scheduler.scheduleJob(job.build(), trigger.build())
} else {
if (echo) sender ! new Event(timeEvent.dimString, timeEvent.event)
}
}
}
......@@ -224,6 +242,8 @@ class FormatAndRoute(val outputDefs: Set[OutputDefinition], val actors: Map[Stri
outputDefs.foreach(outDef => {
val out = outDef.convert(timeEvent.event)
logger.debug(s"Sending event - $out")
if (timeEvent.time.getTime - System.currentTimeMillis() > 10000)
logger.warn(s"Formatting ${timeEvent.dimString} ${timeEvent.time.getTime - System.currentTimeMillis()}ms late")
actors.get(outDef.id).get ! out
})
}
......
......@@ -21,7 +21,7 @@ class ScheduleAndNotifyTest extends SparkTestUtils with AkkaTestUtils with Befor
val sleepTime = 1000
val date = new Date(System.currentTimeMillis() + sleepTime)
val observerActor = system.actorOf(Props(classOf[ObservationActor], observer))
val timer = GeneratorActors.scheduleAndNotify(system, 100, observerActor)
val timer = GeneratorActors.scheduleAndNotify(system, 100, new Date(), sendPastEvents = true, observerActor)
val event = new DimTimeEvent("dimString", Map[String, Any]("foo" -> "bar"), date)
timer ! event
Thread.sleep(10)
......@@ -34,7 +34,7 @@ class ScheduleAndNotifyTest extends SparkTestUtils with AkkaTestUtils with Befor
akkaTest("Scheduler should fire two events requested in sequence") {
val observerActor = system.actorOf(Props(classOf[ObservationActor], observer))
val timer = GeneratorActors.scheduleAndNotify(system, 100, observerActor)
val timer = GeneratorActors.scheduleAndNotify(system, 100, new Date(), sendPastEvents = true, observerActor)
val sleepTime = 1000
val date1 = new Date(System.currentTimeMillis() + sleepTime)
......@@ -63,7 +63,7 @@ class ScheduleAndNotifyTest extends SparkTestUtils with AkkaTestUtils with Befor
akkaTest("Scheduler should fire two events requested out of sequence") {
val observerActor = system.actorOf(Props(classOf[ObservationActor], observer))
val timer = GeneratorActors.scheduleAndNotify(system, 100, observerActor)
val timer = GeneratorActors.scheduleAndNotify(system, 100, new Date(), sendPastEvents = true, observerActor)
val sleepTime = 1000
val date1 = new Date(System.currentTimeMillis() + sleepTime)
......
schemaUri: hdfs://bda-hdfs01/tmp/cm/definitions/cm-constant-short.json
outputDefUri: hdfs://bda-hdfs01/tmp/cm/definitions/cm-out.yaml
eventsUri: hdfs://bda-hdfs01/tmp/cm/data
fileDelim: "|"
seedFilters:
- 99:2b:b2:11:4k:k4
- 99:46:8g:b9:10:9z
- 28:zk:f9:22:22:54
- ii:z8:34:20:06:z4
- ii:31:zb:i5:90:98
\ No newline at end of file
schemaUri: hdfs://bda-hdfs01/tmp/cm/definitions/cm-constant-short.json
outputDefUri: hdfs://bda-hdfs01/tmp/cm/definitions/cm-out.yaml
eventsUri: hdfs://bda-hdfs01/tmp/cm/data
fileDelim: "|"
schemaUri: hdfs://bda-hdfs01/tmp/cm/definitions/cm-constant-short.json
outputDefUri: hdfs://bda-hdfs01/tmp/cm/definitions/cm-out.yaml
temporalTrainingSetUri: hdfs://bda-hdfs01/tmp/cm/analysis/ml/temporalTrainingSet.rdd
factTrainingSetUri: hdfs://bda-hdfs01/tmp/cm/analysis/ml/factTrainingSet.rdd
seedEventsUri: hdfs://bda-hdfs01/tmp/cm/analysis/ml/seedEvents.rdd
sendPastEvents: true
useNow: true
numSchedulerThreads: 10000
seedFilters:
- 99:2b:b2:11:4k:k4
- 99:46:8g:b9:10:9z
- 28:zk:f9:22:22:54
- ii:z8:34:20:06:z4
- ii:31:zb:i5:90:98
\ No newline at end of file
schemaUri: hdfs://bda-hdfs01/tmp/cm/definitions/cm-constant-short.json
outputDefUri: hdfs://bda-hdfs01/tmp/cm/definitions/cm-out.yaml
temporalTrainingSetUri: hdfs://bda-hdfs01/tmp/cm/analysis/ml/temporalTrainingSet.rdd
factTrainingSetUri: hdfs://bda-hdfs01/tmp/cm/analysis/ml/factTrainingSet.rdd
seedEventsUri: hdfs://bda-hdfs01/tmp/cm/analysis/ml/seedEvents.rdd
sendPastEvents: true
useNow: true
numSchedulerThreads: 10000
sparkUri: local[8]
appName: CM-Generator-small
schemaUri: testData/cm/definition/cm-constant-short.json
outputDefUri: testData/cm/definition/cm-out-integration.yaml
temporalTrainingSetUri: testData/cm/analysis/temporalTrainingSet.rdd
factTrainingSetUri: testData/cm/analysis/factTrainingSet.rdd
seedEventsUri: testData/cm/analysis/seedEvents.rdd
eventTimeOffset: 123
sendPastEvents: false
useNow: true
numSchedulerThreads: 999
seedFilters:
- 99:2b:b2:11:4k:k4
- 99:46:8g:b9:10:9z
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment