Commit eca53522 authored by Steven Pisarski's avatar Steven Pisarski

Adding stop and start to actors requiring access to external and...

Adding stop and start to actors requiring access to external and non-serializable resources such as the Quartz scheduler or JMS connection.
parent 31978dd8
package com.cablelabs.eventgen.akka
import java.util.{Date, Properties}
import javax.jms.Session
import javax.jms.{Connection, MessageProducer, Session}
import akka.actor._
import akka.camel.{CamelExtension, Oneway, Producer}
......@@ -12,7 +12,7 @@ import com.typesafe.scalalogging.slf4j.Logger
import org.apache.activemq.ActiveMQConnectionFactory
import org.apache.activemq.transport.stomp.StompConnection
import org.apache.spark.SparkEnv
import org.quartz._
import org.quartz.{Scheduler, _}
import org.quartz.impl.StdSchedulerFactory
import org.slf4j.LoggerFactory
......@@ -169,7 +169,8 @@ case class DimTimeEvent(dimString: String, event: Map[String, Any], time: Date)
* @param toForward - the actors to forward message
*/
class GeneratorActor(val engine: Engine, val toForward: Set[ActorRef]) extends Actor {
private[this] def logger = Logger(LoggerFactory.getLogger("GeneratorActor"))
private[this] val logger = Logger(LoggerFactory.getLogger("GeneratorActor"))
def receive = {
case thisEvent: Event =>
......@@ -177,7 +178,7 @@ class GeneratorActor(val engine: Engine, val toForward: Set[ActorRef]) extends A
val newEvent = engine.nextEvent(thisEvent.values)
val newEventDate = engine.inputDef.temporal.eventValue[Date](newEvent)
if (newEventDate.getTime - System.currentTimeMillis() > 10000)
if (newEventDate.getTime - System.currentTimeMillis() < -10000)
logger.warn(s"Scheduling ${thisEvent.dimString} for $newEventDate ${newEventDate.getTime - System.currentTimeMillis()}ms late")
toForward.foreach(_ ! new DimTimeEvent(thisEvent.dimString, newEvent, newEventDate))
......@@ -199,13 +200,20 @@ class ScheduleAndNotify(val numThreads: Int, val actors: Set[ActorRef], val star
import scala.collection.JavaConversions._
private[this] def logger = Logger(LoggerFactory.getLogger("ScheduleAndNotify"))
private[this] val logger = Logger(LoggerFactory.getLogger("ScheduleAndNotify"))
// Non-serializable member who should be instantiated on startup
var scheduler: Scheduler = null
override def preStart() = {
val props = new Properties()
props.put("org.quartz.threadPool.threadCount", numThreads.toString)
props.put("org.quartz.threadPool.threadPriority", String.valueOf(Thread.MAX_PRIORITY - 1))
val scheduler = new StdSchedulerFactory(props).getScheduler
scheduler = new StdSchedulerFactory(props).getScheduler
scheduler.start()
}
override def postStop() = scheduler.shutdown()
def receive = {
case timeEvent: DimTimeEvent =>
......@@ -225,7 +233,7 @@ import scala.collection.JavaConversions._
val trigger = newTrigger().withIdentity(jobId).startAt(timeEvent.time).endAt(timeEvent.time)
.withSchedule(SimpleScheduleBuilder.repeatMinutelyForTotalCount(1))
if (timeEvent.time.getTime < System.currentTimeMillis())
if (timeEvent.time.getTime - System.currentTimeMillis() < -10000)
logger.warn(s"Scheduling ${timeEvent.dimString} ${timeEvent.time.getTime < System.currentTimeMillis()}ms late")
scheduler.scheduleJob(job.build(), trigger.build())
......@@ -241,7 +249,9 @@ import scala.collection.JavaConversions._
*/
class FormatAndRoute(val outputDefs: Set[OutputDefinition], val actors: Map[String, ActorRef]) extends Actor {
require(actors.size > 0)
private[this] def logger = Logger(LoggerFactory.getLogger("FormatAndRoute"))
private[this] val logger = Logger(LoggerFactory.getLogger("FormatAndRoute"))
def receive = {
case timeEvent: DimTimeEvent =>
outputDefs.foreach(outDef => {
......@@ -281,13 +291,36 @@ object JMSRouteType extends Enumeration {
// TODO - add support for failover and figure out why Camel is not working in a Spark Cluster
class AmqRouter(val protocol: String, val host: String, val port: Int = 61616, val name: String, val routeType: JMSRouteType)
extends Actor {
private[this] def logger = Logger(LoggerFactory.getLogger("AmqRouter"))
private[this] val logger = Logger(LoggerFactory.getLogger("AmqRouter"))
/**
* The AMQ broker URL
*/
val url = s"$protocol://$host:$port"
// JMS objects who should be instantiated on startup
var conn: Connection = null
var session: Session = null
var messageProducer: MessageProducer = null
override def preStart() = {
logger.info(s"Connecting to broker $url")
val conn = new ActiveMQConnectionFactory(url).createConnection
conn = new ActiveMQConnectionFactory(url).createConnection
conn.start()
val session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)
logger.info(s"Started JMS session to $routeType/$name")
messageProducer = routeType match {
case JMSRouteType.TOPIC => session.createProducer(session.createTopic(name))
case JMSRouteType.QUEUE => session.createProducer(session.createQueue(name))
}
}
override def postStop() = {
messageProducer.close()
session.close()
conn.close()
}
override def receive = {
case msg =>
......@@ -295,11 +328,6 @@ class AmqRouter(val protocol: String, val host: String, val port: Int = 61616, v
val txtMsg = session.createTextMessage(msg.toString)
messageProducer.send(txtMsg)
}
private val messageProducer = routeType match {
case JMSRouteType.TOPIC => session.createProducer(session.createTopic(name))
case JMSRouteType.QUEUE => session.createProducer(session.createQueue(name))
}
}
/**
......@@ -312,12 +340,21 @@ class AmqRouter(val protocol: String, val host: String, val port: Int = 61616, v
// TODO - add support for failover and figure out why Camel is not working in a Spark Cluster
class StompRouter(val host: String, val port: Int = 61616, val name: String, val routeType: JMSRouteType)
extends Actor {
private[this] def logger = Logger(LoggerFactory.getLogger("StompRouter"))
private[this] val logger = Logger(LoggerFactory.getLogger("StompRouter"))
// Stomp object that should be instantiated on startup
var conn: StompConnection = null
override def preStart() = {
logger.info(s"Connecting to stomp broker $host:$port")
val conn = new StompConnection()
conn = new StompConnection()
conn.open(host, port)
conn.connect("guest", "guest")
logger.info(s"Opened stomp connection to $routeType/$name")
}
override def postStop() = conn.close()
override def receive = {
case msg =>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment