Commit cbbe00ac authored by Steven Pisarski's avatar Steven Pisarski

Comments

parent b1ac017b
......@@ -41,7 +41,7 @@ object GeneratorActors {
private[eventgen] def camelSystem(inActorSystem: ActorSystem) = CamelExtension(inActorSystem)
/**
* Responsible for creating the necessary actors required for event generation
* Responsible for creating the necessary actors required for event generation when running in a Spark runtime
* @param engine - the generator engine object
* @param outDefs - the output definitions
* @param sendPastEvents - denotes whether or not to send back-dated events
......@@ -51,7 +51,18 @@ object GeneratorActors {
def generateAndSchedule(engine: Engine, outDefs: Set[OutputDefinition], sendPastEvents: Boolean,
numSchedThreads: Int): ActorRef =
generateAndSchedule(actorSystem, engine, outDefs, sendPastEvents, numSchedThreads)
def generateAndSchedule(inActorSystem: ActorSystem, engine: Engine, outDefs: Set[OutputDefinition],
/**
* Responsible for creating the necessary actors required for event generation generally when not running in a Spark
* runtime. Designed primarily for unit testing of the actor system.
* @param inActorSystem - the actor system to use
* @param engine - the generator engine object
* @param outDefs - the output definitions
* @param sendPastEvents - denotes whether or not to send back-dated events
* @param numSchedThreads - number of threads for each actor's quartz scheduler
* @return - the main actor to which to send the seed events
*/
private[akka] def generateAndSchedule(inActorSystem: ActorSystem, engine: Engine, outDefs: Set[OutputDefinition],
sendPastEvents: Boolean, numSchedThreads: Int): ActorRef = {
val fmtActor = formatter(inActorSystem, outDefs)
val schedActor = scheduler(inActorSystem, numSchedThreads, fmtActor)
......@@ -60,6 +71,7 @@ object GeneratorActors {
/**
* The main generator actor
* @param inActorSystem - the actor system to use
* @param engine - used for generating events
* @param sendPastEvents - denotes whether or not to send back-dated events
* @param scheduler - the actor responsible for scheduling the conversion and routing of the event data
......@@ -69,7 +81,8 @@ object GeneratorActors {
inActorSystem.actorOf(Props(classOf[GeneratorActor], engine, sendPastEvents, Set(scheduler)))
/**
* Returns an ActorRef that is implemented with the QuartzConsumer Akka Actor
* Returns an ActorRef that is implemented with the QuartzConsumer Akka Actor for scheduling events emission
* @param inActorSystem - the actor system to use
* @param numThreads - the number of threads to allocate to each actor
* @param formatter - the actor responsible for formatting the output and sending it to the router actor
* @return - the scheduler ActorRef
......@@ -79,6 +92,7 @@ object GeneratorActors {
/**
* The actor responsible for formatting the Map[String, Any] to the desired event payload
* @param inActorSystem - the actor system to use
* @param outputDefs - the definition for each output format/location
* @return - the formatter ActorRef
*/
......
......@@ -19,7 +19,7 @@ class GeneratorActorsTest extends SparkTestUtils with AkkaTestUtils with BeforeA
observer = new Observer
}
akkaTest("QuartzConsumer one event should fire when requested") {
akkaTest("Shceduler should fire one event at the time requested") {
val sleepTime = 3000
val date = new Date(System.currentTimeMillis() + sleepTime)
val observerActor = system.actorOf(Props(classOf[ObservationActor], observer))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment