Commit ca778471 authored by Steven Pisarski's avatar Steven Pisarski

Refactored some names for clarity, general housekeeping, and addition to tests...

Refactored some names for clarity, general housekeeping, and addition to tests for the ScheduleAndNotify actor
parent cbbe00ac
......@@ -64,8 +64,8 @@ object GeneratorActors {
*/
private[akka] def generateAndSchedule(inActorSystem: ActorSystem, engine: Engine, outDefs: Set[OutputDefinition],
sendPastEvents: Boolean, numSchedThreads: Int): ActorRef = {
val fmtActor = formatter(inActorSystem, outDefs)
val schedActor = scheduler(inActorSystem, numSchedThreads, fmtActor)
val fmtActor = formatAndRoute(inActorSystem, outDefs)
val schedActor = scheduleAndNotify(inActorSystem, numSchedThreads, fmtActor)
generate(inActorSystem, engine, sendPastEvents, schedActor)
}
......@@ -81,28 +81,30 @@ object GeneratorActors {
inActorSystem.actorOf(Props(classOf[GeneratorActor], engine, sendPastEvents, Set(scheduler)))
/**
* Returns an ActorRef that is implemented with the QuartzConsumer Akka Actor for scheduling events emission
* Returns an ActorRef that is implemented with the QuartzConsumer Akka Actor for scheduling event delivery.
* This actor will also echo the original message back to its sender
* @param inActorSystem - the actor system to use
* @param numThreads - the number of threads to allocate to each actor
* @param formatter - the actor responsible for formatting the output and sending it to the router actor
* @return - the scheduler ActorRef
*/
private[eventgen] def scheduler(inActorSystem: ActorSystem, numThreads: Int, formatter: ActorRef): ActorRef =
inActorSystem.actorOf(Props(classOf[QuartzConsumer], numThreads, Set(formatter)))
private[eventgen] def scheduleAndNotify(inActorSystem: ActorSystem, numThreads: Int, formatter: ActorRef): ActorRef =
inActorSystem.actorOf(Props(classOf[ScheduleAndNotify], numThreads, Set(formatter), true))
/**
* The actor responsible for formatting the Map[String, Any] to the desired event payload
* The actor responsible for formatting the Map[String, Any] to the desired event payload and routing to the endpoints
* described in each OutputDefintion
* @param inActorSystem - the actor system to use
* @param outputDefs - the definition for each output format/location
* @return - the formatter ActorRef
*/
private def formatter(inActorSystem: ActorSystem, outputDefs: Set[OutputDefinition]): ActorRef = {
private[eventgen] def formatAndRoute(inActorSystem: ActorSystem, outputDefs: Set[OutputDefinition]): ActorRef = {
val routerMap = new mutable.HashMap[String, ActorRef]()
outputDefs.foreach(outDef => {
if (routerMap.get(outDef.id) == None)
routerMap += outDef.id -> activeMqRouter(outDef.protocol, outDef.host, outDef.port, outDef.name, outDef.routeType)
})
val props = Props(classOf[PayloadFormatter], outputDefs, routerMap.toMap)
val props = Props(classOf[FormatAndRoute], outputDefs, routerMap.toMap)
inActorSystem.actorOf(props)
}
......@@ -115,7 +117,7 @@ object GeneratorActors {
* @param routeType - TOPIC|QUEUE (default TOPIC)
* @return - the actor reference
*/
private def activeMqRouter(protocol: String = "tcp", host: String, port: Int = 61616, name: String,
private[eventgen] def activeMqRouter(protocol: String = "tcp", host: String, port: Int = 61616, name: String,
routeType: JMSRouteType = JMSRouteType.TOPIC): ActorRef = {
val baseUrl = protocol match {
case "vm" => "vm://localhost?broker.persistent=false" // For testing
......@@ -168,12 +170,13 @@ class GeneratorActor(val engine: Engine, val sendPastEvents: Boolean, val toForw
}
/**
* Akka actor who owns a quartz scheduler. It sends messages to the actors member then sends the event back to
* the generator actor to seed the next event
* Akka actor who has a quartz scheduler. It sends messages to the actors member then sends the event back to
* the generator actor to seed the next event when echo is true
* @param numThreads - the number of quartz scheduler threads
* @param actors - the downstream actors that can accept the DimTimeEvent
* @param echo - when true, sender will be notified with the original event
*/
class QuartzConsumer(val numThreads: Int, val actors: Set[ActorRef]) extends Actor {
class ScheduleAndNotify(val numThreads: Int, val actors: Set[ActorRef], val echo: Boolean) extends Actor {
import org.quartz.TriggerBuilder._
import scala.collection.JavaConversions._
......@@ -187,7 +190,7 @@ import scala.collection.JavaConversions._
val thisSender = sender
val jobData = Map[String, () => Unit]("function" -> (() => {
actors.foreach(_ ! timeEvent)
thisSender ! new Event(timeEvent.event)
if (echo) thisSender ! new Event(timeEvent.event)
}))
val job = JobBuilder.newJob(classOf[ExecuteFunctionJob])
.withIdentity(jobId)
......@@ -203,7 +206,7 @@ import scala.collection.JavaConversions._
* @param outputDefs - describes how to convert and where to send the events
* @param actors - the actors to invoke for each given endpoint
*/
class PayloadFormatter(val outputDefs: Set[OutputDefinition], val actors: Map[String, ActorRef]) extends Actor {
class FormatAndRoute(val outputDefs: Set[OutputDefinition], val actors: Map[String, ActorRef]) extends Actor {
require(actors.size > 0)
private[this] def logger = Logger(LoggerFactory.getLogger("PayloadFormatter"))
def receive = {
......@@ -235,7 +238,6 @@ object JMSRouteType extends Enumeration {
* The Quartz job that simply executes the function
*/
private class ExecuteFunctionJob extends Job {
private[this] def logger = Logger(LoggerFactory.getLogger("GeneratorActors Quartz Job"))
override def execute(context: JobExecutionContext): Unit = {
val data = context.getMergedJobDataMap.getWrappedMap
data.get("function").asInstanceOf[() => Unit]()
......
package com.cablelabs.eventgen.akka
import akka.actor.ActorSystem
import java.util.Date
import akka.actor.{Actor, ActorSystem}
import com.cablelabs.eventgen.UnitSpec
import org.scalatest.Tag
import scala.collection.mutable
/**
* Utilitys for running test for Akka actors
*/
......@@ -34,3 +38,21 @@ trait AkkaTestUtils extends UnitSpec {
}
}
/**
* For use in the ObservationActor to intercept the events being emitted from the actor under test.
*/
class Observer {
val events = new mutable.ListBuffer[(Any, Date)]()
def addEvent(event: Any) = events += new Tuple2[Any, Date](event, new Date())
}
/**
* The test actor for intercepting messages sent to it
* @param observer - the observer object
*/
class ObservationActor(val observer: Observer) extends Actor {
override def receive = {
case msg => observer.addEvent(msg)
}
}
package com.cablelabs.eventgen.akka
import java.util.Date
import akka.actor.{Actor, Props}
import com.cablelabs.eventgen.SparkTestUtils
import org.scalatest.BeforeAndAfter
import scala.collection.mutable
/**
* Tests the actor classes contained in the GeneratorActors.scala file
*/
class GeneratorActorsTest extends SparkTestUtils with AkkaTestUtils with BeforeAndAfter {
var observer: Observer = null
before {
observer = new Observer
}
akkaTest("Shceduler should fire one event at the time requested") {
val sleepTime = 3000
val date = new Date(System.currentTimeMillis() + sleepTime)
val observerActor = system.actorOf(Props(classOf[ObservationActor], observer))
val timer = GeneratorActors.scheduler(system, 100, observerActor)
val event = new DimTimeEvent("dimString", Map[String, Any]("foo" -> "bar"), date)
timer ! event
Thread.sleep(10)
assert(0 == observer.events.size)
Thread.sleep(sleepTime + 10)
assert(1 == observer.events.size)
assert(event == observer.events(0)._1)
assert(date.getTime <= observer.events(0)._2.getTime)
}
}
case class Message(value: String)
class Observer {
val events = new mutable.ListBuffer[(Any, Date)]()
def addEvent(event: Any) = events += new Tuple2[Any, Date](event, new Date())
}
class ObservationActor(val observer: Observer) extends Actor {
override def receive = {
case msg => observer.addEvent(msg)
}
}
\ No newline at end of file
package com.cablelabs.eventgen.akka
import java.util.Date
import akka.actor.Props
import com.cablelabs.eventgen.SparkTestUtils
import org.scalatest.BeforeAndAfter
/**
* Tests the actor classes contained in the GeneratorActors.scala file
*/
class ScheduleAndNotifyTest extends SparkTestUtils with AkkaTestUtils with BeforeAndAfter {
var observer: Observer = null
before {
observer = new Observer
}
akkaTest("Scheduler should fire one event at the time requested") {
val sleepTime = 1000
val date = new Date(System.currentTimeMillis() + sleepTime)
val observerActor = system.actorOf(Props(classOf[ObservationActor], observer))
val timer = GeneratorActors.scheduleAndNotify(system, 100, observerActor)
val event = new DimTimeEvent("dimString", Map[String, Any]("foo" -> "bar"), date)
timer ! event
Thread.sleep(10)
assert(0 == observer.events.size)
Thread.sleep(sleepTime + 10)
assert(1 == observer.events.size)
assert(event == observer.events(0)._1)
assert(date.getTime <= observer.events(0)._2.getTime)
}
akkaTest("Scheduler should fire two events requested in sequence") {
val observerActor = system.actorOf(Props(classOf[ObservationActor], observer))
val timer = GeneratorActors.scheduleAndNotify(system, 100, observerActor)
val sleepTime = 1000
val date1 = new Date(System.currentTimeMillis() + sleepTime)
val date2 = new Date(System.currentTimeMillis() + sleepTime * 2)
val event1 = new DimTimeEvent("dimString1", Map[String, Any]("foo" -> "bar1"), date1)
timer ! event1
Thread.sleep(10)
val event2 = new DimTimeEvent("dimString2", Map[String, Any]("foo" -> "bar2"), date2)
timer ! event2
Thread.sleep(10)
assert(0 == observer.events.size)
Thread.sleep(sleepTime + 10)
assert(1 == observer.events.size)
assert(event1 == observer.events(0)._1)
assert(date1.getTime <= observer.events(0)._2.getTime)
Thread.sleep(sleepTime + 10)
assert(2 == observer.events.size)
assert(event2 == observer.events(1)._1)
assert(date2.getTime <= observer.events(1)._2.getTime)
}
akkaTest("Scheduler should fire two events requested out of sequence") {
val observerActor = system.actorOf(Props(classOf[ObservationActor], observer))
val timer = GeneratorActors.scheduleAndNotify(system, 100, observerActor)
val sleepTime = 1000
val date1 = new Date(System.currentTimeMillis() + sleepTime)
val date2 = new Date(System.currentTimeMillis() + sleepTime * 2)
val event1 = new DimTimeEvent("dimString1", Map[String, Any]("foo" -> "bar1"), date2)
timer ! event1
Thread.sleep(10)
val event2 = new DimTimeEvent("dimString2", Map[String, Any]("foo" -> "bar2"), date1)
timer ! event2
Thread.sleep(10)
assert(0 == observer.events.size)
Thread.sleep(sleepTime + 10)
assert(1 == observer.events.size)
assert(event2 == observer.events(0)._1)
assert(date1.getTime <= observer.events(0)._2.getTime)
Thread.sleep(sleepTime + 10)
assert(2 == observer.events.size)
assert(event1 == observer.events(1)._1)
assert(date2.getTime <= observer.events(1)._2.getTime)
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment