Commit 43505220 authored by Steven Pisarski's avatar Steven Pisarski

Changed actor creation from standalone actors to an actor hierarchy...

Changed actor creation from standalone actors to an actor hierarchy (instantiating other actors directly from another actor)
parent eca53522
......@@ -2,7 +2,8 @@ package com.cablelabs.eventgen
import java.util.Date
import com.cablelabs.eventgen.akka.{Event, GeneratorActors}
import com.cablelabs.eventgen.akka.GeneratorActors
import com.cablelabs.eventgen.akka.GeneratorActors.Event
import com.cablelabs.eventgen.algorithm.Model
import com.cablelabs.eventgen.model.{InputDefinition, OutputDefinition}
import com.typesafe.scalalogging.slf4j.Logger
......
......@@ -2,7 +2,6 @@ package com.cablelabs.eventgen.model
import java.io.InputStream
import com.cablelabs.eventgen.akka.JMSRouteType
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import org.apache.velocity.VelocityContext
......@@ -180,13 +179,9 @@ object EventUtil {
}
})
val routeType = config.routeType match {
case "queue" => JMSRouteType.QUEUE
case _ => JMSRouteType.TOPIC
}
config.format match {
case "json" =>
out += new OutputDefinition(config.prototcol, config.host, config.port, routeType, config.name,
out += new OutputDefinition(config.prototcol, config.host, config.port, config.routeType, config.name,
fieldSet.toSet, OutputEventFormatters.jsonFormat)
case _ =>
Velocity.init(OutputEventFormatters.velocityProps)
......@@ -194,7 +189,7 @@ object EventUtil {
val template = Velocity.getTemplate(config.format)
val fmt = OutputEventFormatters.velocityFormat(ctx, template)
out += new OutputDefinition(config.prototcol, config.host, config.port, routeType, config.name,
out += new OutputDefinition(config.prototcol, config.host, config.port, config.routeType, config.name,
fieldSet.toSet, fmt)
}
})
......
package com.cablelabs.eventgen.model
import com.cablelabs.eventgen.akka.JMSRouteType.JMSRouteType
/**
* Defines how the generated data will be routed
* @param protocol - the AMQ protocol
......@@ -12,13 +10,13 @@ import com.cablelabs.eventgen.akka.JMSRouteType.JMSRouteType
* @param fields - the fields and how they will be mapped - must have at least two fields (one should always be a date)
* @param formatter - the function responsible for transforming the data to the required format
*/
class OutputDefinition(val protocol: String, val host: String, val port: Int, val routeType: JMSRouteType,
class OutputDefinition(val protocol: String, val host: String, val port: Int, val routeType: String,
val name: String, val fields: Set[OutputField],
private[model] val formatter: Map[String, Any] => Any) extends Serializable {
require(protocol != null && !protocol.isEmpty)
require(host != null && !host.isEmpty)
require(port > 0)
require(routeType != null)
require(routeType != null && (routeType == "topic" || routeType == "queue"))
require(name != null && !name.isEmpty)
require(fields != null && fields.size > 1)
require(formatter != null)
......
......@@ -3,7 +3,6 @@ package com.cablelabs.eventgen
import _root_.akka.actor.Props
import _root_.akka.camel.Consumer
import com.cablelabs.eventgen.akka.GeneratorActors
import com.cablelabs.eventgen.akka.JMSRouteType.JMSRouteType
import org.apache.activemq.camel.component.ActiveMQComponent
import scala.collection.mutable
......@@ -21,8 +20,8 @@ object TestActors {
* @param routeType - TOPIC|QUEUE (default TOPIC)
* @return - the actor reference
*/
def activeMqConsumer(protocol: String = "tcp", host: String, port: Int = 61616, name: String,
routeType: JMSRouteType, notifier: Notifier) = {
def activeMqConsumer(protocol: String = "tcp", host: String, port: Int = 61616, name: String, routeType: String,
notifier: Notifier) = {
val baseUrl = protocol match {
case "vm" =>
"vm://localhost?broker.persistent=false" // For testing
......
package com.cablelabs.eventgen.akka
import java.util.Date
import akka.actor.{Actor, ActorSystem}
import akka.actor.ActorSystem
import com.cablelabs.eventgen.UnitSpec
import org.scalatest.Tag
import scala.collection.mutable
/**
* Utilitys for running test for Akka actors
*/
......@@ -38,21 +34,3 @@ trait AkkaTestUtils extends UnitSpec {
}
}
/**
* For use in the ObservationActor to intercept the events being emitted from the actor under test.
*/
class Observer {
val events = new mutable.ListBuffer[(Any, Date)]()
def addEvent(event: Any) = events += new Tuple2[Any, Date](event, new Date())
}
/**
* The test actor for intercepting messages sent to it
* @param observer - the observer object
*/
class ObservationActor(val observer: Observer) extends Actor {
override def receive = {
case msg => observer.addEvent(msg)
}
}
......@@ -9,6 +9,6 @@ import org.apache.spark.SparkContext
object JmsConsumer extends App {
new SparkContext("local[4]", "test")
val notifier = new Notifier
TestActors.activeMqConsumer("tcp", "bda-active01", 61616, "CMHealthXML", JMSRouteType.TOPIC, notifier)
TestActors.activeMqConsumer("tcp", "bda-active01", 61616, "CMHealthXML", "topic", notifier)
Thread.sleep(10000)
}
package com.cablelabs.eventgen.akka
import java.text.SimpleDateFormat
import java.util.Date
import akka.actor.Props
import com.cablelabs.eventgen.SparkTestUtils
import akka.camel.CamelMessage
import com.cablelabs.eventgen.akka.GeneratorActors.{DimTimeEvent, ScheduleAndNotify}
import com.cablelabs.eventgen.model._
import com.cablelabs.eventgen.{Notifier, SparkTestUtils, TestActors}
import org.apache.spark.SparkEnv
import org.scalatest.BeforeAndAfter
import scala.util.parsing.json.JSON
/**
* Tests the actor classes contained in the GeneratorActors.scala file
*/
class ScheduleAndNotifyTest extends SparkTestUtils with AkkaTestUtils with BeforeAndAfter {
var observer: Observer = null
class ScheduleAndNotifyTest extends SparkTestUtils with BeforeAndAfter {
val dateFmtStr = "MM-dd-yyyy HH:mm:ss"
val dateFormat = new SimpleDateFormat(dateFmtStr)
val od = new OutputDefinition("vm", "localhost", 61616, "topic", "test",
Set[OutputField](
new IntOutput(name = "intOutput", inputField = new IntegerDimension(name = "intDim", position = 4)),
new DateOutput(name = "dateOutput", inputField = new DateTemporal(name = "dateTemporal",
denormFields = List[String](), algoDef = new ConstantIntDefinition(1000), factIndex = 1, dateFmtStr = dateFmtStr),
dateFmtStr = dateFmtStr)),
OutputEventFormatters.jsonFormat)
var notifier: Notifier = null
before {
observer = new Observer
notifier = new Notifier
}
akkaTest("Scheduler should fire one event at the time requested") {
sparkTest("Scheduler should fire one event at the time requested") {
TestActors.activeMqConsumer(od.protocol, od.host, od.port, od.name,
od.routeType, notifier)
val timer = SparkEnv.get.actorSystem.actorOf(Props(classOf[ScheduleAndNotify], 100, new Date(), true, true, Set(od)))
val sleepTime = 1000
val date = new Date(System.currentTimeMillis() + sleepTime)
val observerActor = system.actorOf(Props(classOf[ObservationActor], observer))
val timer = GeneratorActors.scheduleAndNotify(system, 100, new Date(), sendPastEvents = true, observerActor)
val event = new DimTimeEvent("dimString", Map[String, Any]("foo" -> "bar"), date)
val event = new DimTimeEvent("dimString", Map[String, Any]("intDim" -> 99, "dateTemporal" -> date), date)
timer ! event
Thread.sleep(10)
assert(0 == observer.events.size)
Thread.sleep(sleepTime + 10)
assert(1 == observer.events.size)
assert(event == observer.events(0)._1)
assert(date.getTime <= observer.events(0)._2.getTime)
assert(0 == notifier.list.size)
Thread.sleep(sleepTime + 1000)
assert(1 == notifier.list.size)
val outEvent1 = notifier.list(0).asInstanceOf[CamelMessage].body.toString
val eventMap = JSON.parseFull(outEvent1).get.asInstanceOf[Map[String, Any]]
assert(99 == eventMap("intOutput"))
assert(date.toString == dateFormat.parse(eventMap("dateOutput").toString).toString)
}
akkaTest("Scheduler should fire two events requested in sequence") {
val observerActor = system.actorOf(Props(classOf[ObservationActor], observer))
val timer = GeneratorActors.scheduleAndNotify(system, 100, new Date(), sendPastEvents = true, observerActor)
sparkTest("Scheduler should fire two events requested in sequence") {
TestActors.activeMqConsumer(od.protocol, od.host, od.port, od.name,
od.routeType, notifier)
val timer = SparkEnv.get.actorSystem.actorOf(Props(classOf[ScheduleAndNotify], 100, new Date(), true, true, Set(od)))
val sleepTime = 1000
val date1 = new Date(System.currentTimeMillis() + sleepTime)
val date2 = new Date(System.currentTimeMillis() + sleepTime * 2)
val event1 = new DimTimeEvent("dimString1", Map[String, Any]("foo" -> "bar1"), date1)
val event1 = new DimTimeEvent("dimString1", Map[String, Any]("intDim" -> 88, "dateTemporal" -> date1), date1)
timer ! event1
Thread.sleep(10)
val event2 = new DimTimeEvent("dimString2", Map[String, Any]("foo" -> "bar2"), date2)
val event2 = new DimTimeEvent("dimString2", Map[String, Any]("intDim" -> 99, "dateTemporal" -> date2), date2)
timer ! event2
Thread.sleep(10)
assert(0 == observer.events.size)
assert(0 == notifier.list.size)
Thread.sleep(sleepTime + 10)
assert(1 == observer.events.size)
assert(event1 == observer.events(0)._1)
assert(date1.getTime <= observer.events(0)._2.getTime)
assert(1 == notifier.list.size)
val outEvent1 = notifier.list(0).asInstanceOf[CamelMessage].body.toString
val eventMap1 = JSON.parseFull(outEvent1).get.asInstanceOf[Map[String, Any]]
assert(88 == eventMap1("intOutput"))
assert(date1.toString == dateFormat.parse(eventMap1("dateOutput").toString).toString)
Thread.sleep(sleepTime + 10)
assert(2 == observer.events.size)
assert(event2 == observer.events(1)._1)
assert(date2.getTime <= observer.events(1)._2.getTime)
assert(2 == notifier.list.size)
val outEvent2 = notifier.list(1).asInstanceOf[CamelMessage].body.toString
val eventMap2 = JSON.parseFull(outEvent2).get.asInstanceOf[Map[String, Any]]
assert(99 == eventMap2("intOutput"))
assert(date2.toString == dateFormat.parse(eventMap2("dateOutput").toString).toString)
}
akkaTest("Scheduler should fire two events requested out of sequence") {
val observerActor = system.actorOf(Props(classOf[ObservationActor], observer))
val timer = GeneratorActors.scheduleAndNotify(system, 100, new Date(), sendPastEvents = true, observerActor)
sparkTest("Scheduler should fire two events requested out of sequence") {
TestActors.activeMqConsumer(od.protocol, od.host, od.port, od.name,
od.routeType, notifier)
val timer = SparkEnv.get.actorSystem.actorOf(Props(classOf[ScheduleAndNotify], 100, new Date(), true, true, Set(od)))
val sleepTime = 1000
val date1 = new Date(System.currentTimeMillis() + sleepTime)
val date2 = new Date(System.currentTimeMillis() + sleepTime * 2)
val event1 = new DimTimeEvent("dimString1", Map[String, Any]("foo" -> "bar1"), date2)
val event1 = new DimTimeEvent("dimString1", Map[String, Any]("intDim" -> 99, "dateTemporal" -> date2), date2)
timer ! event1
Thread.sleep(10)
val event2 = new DimTimeEvent("dimString2", Map[String, Any]("foo" -> "bar2"), date1)
val event2 = new DimTimeEvent("dimString2", Map[String, Any]("intDim" -> 88, "dateTemporal" -> date1), date1)
timer ! event2
Thread.sleep(10)
assert(0 == observer.events.size)
assert(0 == notifier.list.size)
Thread.sleep(sleepTime + 10)
assert(1 == observer.events.size)
assert(event2 == observer.events(0)._1)
assert(date1.getTime <= observer.events(0)._2.getTime)
assert(1 == notifier.list.size)
val outEvent2 = notifier.list(0).asInstanceOf[CamelMessage].body.toString
val eventMap2 = JSON.parseFull(outEvent2).get.asInstanceOf[Map[String, Any]]
assert(88 == eventMap2("intOutput"))
assert(date1.toString == dateFormat.parse(eventMap2("dateOutput").toString).toString)
Thread.sleep(sleepTime + 10)
assert(2 == observer.events.size)
assert(event1 == observer.events(1)._1)
assert(date2.getTime <= observer.events(1)._2.getTime)
assert(2 == notifier.list.size)
val outEvent1 = notifier.list(1).asInstanceOf[CamelMessage].body.toString
val eventMap1 = JSON.parseFull(outEvent1).get.asInstanceOf[Map[String, Any]]
assert(99 == eventMap1("intOutput"))
assert(date2.toString == dateFormat.parse(eventMap1("dateOutput").toString).toString)
}
}
\ No newline at end of file
......@@ -7,7 +7,7 @@ class StompRouterTest { //extends SparkTestUtils with AkkaTestUtils {
val host = "localhost"
val port = 61613
val name = "testStompEndpoint"
val routeType = JMSRouteType.TOPIC
val routeType = "topic"
/* Do not activate to run with the build as this requires an external AMQ broker
sparkTest("Stomp routing to AMQ") {
......
......@@ -5,7 +5,6 @@ import java.text.SimpleDateFormat
import java.util.Date
import com.cablelabs.eventgen.UnitSpec
import com.cablelabs.eventgen.akka.JMSRouteType
import org.apache.velocity.VelocityContext
import org.apache.velocity.app.Velocity
import org.json4s.StreamInput
......@@ -41,8 +40,8 @@ class OutputDefinitionTest extends UnitSpec {
val protocol = "vm"
val host = "localhost"
val port = 61616
val routeTypeTopic = JMSRouteType.TOPIC
val routeTypeQueue = JMSRouteType.QUEUE
val routeTypeTopic = "topic"
val routeTypeQueue = "queue"
val destName = "test"
event += f1.inputField.name -> 10
event += f2.inputField.name -> 10.25d
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment