Commit d0131c0d authored by Steven Pisarski's avatar Steven Pisarski

Named actors

parent 43505220
......@@ -66,9 +66,10 @@ object GeneratorActors {
private[akka] def generateAndSchedule(inActorSystem: ActorSystem, engine: Engine, outDefs: Set[OutputDefinition],
startTime: Date, sendPastEvents: Boolean, numSchedThreads: Int): ActorRef =
inActorSystem.actorOf(Props(classOf[GeneratorActor], engine, outDefs, startTime, sendPastEvents, numSchedThreads)
.withRouter(new SmallestMailboxRouter(resizer = new DefaultResizer(1, 50))))
// .withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("akka.tcp://sparkMaster@bda-storm01.cablelabs.com:7077")))))
// .withMailbox("akka.actor.mailbox.unbounded-queue-based")
.withRouter(new SmallestMailboxRouter(resizer = new DefaultResizer(1, 50)))
// .withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("akka.tcp://sparkMaster@bda-storm01.cablelabs.com:7077"))))
.withMailbox("akka.actor.mailbox.unbounded-queue-based")
, "GeneratorActorHierarchy")
/**
* Contains a generated event which is being used for pattern matching in some of the actors
......@@ -98,7 +99,7 @@ object GeneratorActors {
private[this] val logger = Logger(LoggerFactory.getLogger("GeneratorActor"))
val schedActor = context.actorOf(
Props(classOf[ScheduleAndNotify], numSchedThreads, startTime, sendPastEvents,true, outDefs))
Props(classOf[ScheduleAndNotify], numSchedThreads, startTime, sendPastEvents,true, outDefs), "ScheduleAndNotify")
def receive = {
case thisEvent: Event =>
......@@ -107,7 +108,8 @@ object GeneratorActors {
val newEventDate = engine.inputDef.temporal.eventValue[Date](newEvent)
if (newEventDate.getTime - System.currentTimeMillis() < -10000)
logger.warn(s"Scheduling ${thisEvent.dimString} for $newEventDate ${newEventDate.getTime - System.currentTimeMillis()}ms late")
logger.warn(s"Scheduling ${thisEvent.dimString} for $newEventDate "
+ s"${newEventDate.getTime - System.currentTimeMillis()}ms late")
schedActor ! new DimTimeEvent(thisEvent.dimString, newEvent, newEventDate)
}
......@@ -123,16 +125,15 @@ object GeneratorActors {
* @param outDefs - the output definitions
*/
class ScheduleAndNotify(val numThreads: Int, val startTime: Date,
val sendPastEvents: Boolean, val echo: Boolean, outDefs: Set[OutputDefinition])
extends Actor {
val sendPastEvents: Boolean, val echo: Boolean,
outDefs: Set[OutputDefinition]) extends Actor {
import org.quartz.TriggerBuilder._
import scala.collection.JavaConversions._
private[this] val logger = Logger(LoggerFactory.getLogger("ScheduleAndNotify"))
val formatAndRoute = context.actorOf(Props(classOf[FormatAndRoute], outDefs))
val formatAndRoute = context.actorOf(Props(classOf[FormatAndRoute], outDefs), "FormatAndRoute")
// Non-serializable member who should be instantiated on startup
var scheduler: Scheduler = null
......@@ -186,9 +187,13 @@ import scala.collection.JavaConversions._
val routerMap = outputDefs.map(outDef => {
val actor = outDef.protocol match {
case "stomp" =>
context.actorOf(Props(classOf[StompRouter], outDef.host, outDef.port, outDef.name, outDef.routeType))
context.actorOf(
Props(classOf[StompRouter], outDef.host, outDef.port, outDef.name, outDef.routeType),
s"StompRouter-${outDef.name}")
case _ =>
context.actorOf(Props(classOf[AmqRouter], outDef.protocol, outDef.host, outDef.port, outDef.name, outDef.routeType))
context.actorOf(
Props(classOf[AmqRouter], outDef.protocol, outDef.host, outDef.port, outDef.name, outDef.routeType),
s"AmqRouter-${outDef.name}")
}
outDef.id -> actor
}).toMap
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment