GeneratorActors.scala 15.3 KB
Newer Older
1
package com.cablelabs.eventgen.akka
2

3
import java.util.{Date, Properties}
4
import javax.jms.{Connection, MessageProducer, Session}
5

6
import akka.actor._
7
import akka.camel.{CamelExtension, Oneway, Producer}
8 9
import com.cablelabs.eventgen.Engine
import com.cablelabs.eventgen.akka.JMSRouteType.JMSRouteType
10 11
import com.cablelabs.eventgen.model.OutputDefinition
import com.typesafe.scalalogging.slf4j.Logger
12 13
import org.apache.activemq.ActiveMQConnectionFactory
import org.apache.activemq.transport.stomp.StompConnection
14
import org.apache.spark.SparkEnv
15
import org.quartz.{Scheduler, _}
16
import org.quartz.impl.StdSchedulerFactory
17 18
import org.slf4j.LoggerFactory

19 20
import scala.collection.mutable

21 22 23 24 25 26 27 28 29 30 31
/**
 * Object contains methods required for creating the Akka Actor objects required for event generation
 */
object GeneratorActors {

  /**
   * Returns the Spark actor system
   * @return - the actor system owned by the Spark environment
   */
  private[eventgen] def actorSystem = SparkEnv.get.actorSystem

  /**
   * Returns the camel extension for the default actor system
   * @return - the camel system object
   */
  private[eventgen] def camelSystem = CamelExtension(actorSystem)

  /**
   * Returns the camel system for the given actor system
   * @param inActorSystem - the actor system to use
   * @return - the camel system object
   */
  private[eventgen] def camelSystem(inActorSystem: ActorSystem) = CamelExtension(inActorSystem)

  /**
   * Responsible for creating the necessary actors required for event generation when running in a Spark runtime
   * @param engine - the generator engine object
   * @param outDefs - the output definitions
   * @param startTime - the time when the generator is starting
   * @param sendPastEvents - denotes whether or not to send back-dated events
   * @param numSchedThreads - number of threads for each actor's quartz scheduler
   * @return - the main actor to which to send the seed events
   */
  def generateAndSchedule(engine: Engine, outDefs: Set[OutputDefinition], startTime: Date, sendPastEvents: Boolean,
                          numSchedThreads: Int): ActorRef =
    generateAndSchedule(actorSystem, engine, outDefs, startTime, sendPastEvents, numSchedThreads)

  /**
   * Responsible for creating the necessary actors required for event generation generally when not running in a Spark
   * runtime. Designed primarily for unit testing of the actor system.
   * @param inActorSystem - the actor system to use
   * @param engine - the generator engine object
   * @param outDefs - the output definitions
   * @param startTime - the time when the generator is starting
   * @param sendPastEvents - denotes whether or not to send back-dated events
   * @param numSchedThreads - number of threads for each actor's quartz scheduler
   * @return - the main actor to which to send the seed events
   */
  private[akka] def generateAndSchedule(inActorSystem: ActorSystem, engine: Engine, outDefs: Set[OutputDefinition],
                                        startTime: Date, sendPastEvents: Boolean, numSchedThreads: Int): ActorRef = {
    // Wire the pipeline back-to-front: formatter/router first, then the scheduler that feeds
    // it, then the generator that feeds the scheduler
    val fmtActor = formatAndRoute(inActorSystem, outDefs)
    val schedActor = scheduleAndNotify(inActorSystem, numSchedThreads, startTime, sendPastEvents, fmtActor)
    generateAndSchedule(inActorSystem, engine, schedActor)
  }

  /**
   * The main generator actor
   * @param inActorSystem - the actor system to use
   * @param engine - used for generating events
   * @param scheduler - the actor responsible for scheduling the conversion and routing of the event data
   * @return - the generator ActorRef
   */
  private def generateAndSchedule(inActorSystem: ActorSystem, engine: Engine, scheduler: ActorRef): ActorRef =
    inActorSystem.actorOf(Props(classOf[GeneratorActor], engine, Set(scheduler)))
//      .withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("akka.tcp://sparkMaster@bda-storm01.cablelabs.com:7077")))))

  /**
   * Returns an ActorRef that is implemented with the QuartzConsumer Akka Actor for scheduling event delivery.
   * This actor will also echo the original message back to its sender
   * @param inActorSystem - the actor system to use
   * @param numThreads - the number of threads to allocate to each actor
   * @param startTime - the time when the generator is starting
   * @param sendPastEvents - denotes whether or not to send back-dated events
   * @param formatter - the actor responsible for formatting the output and sending it to the router actor
   * @return - the scheduler ActorRef
   */
  private[eventgen] def scheduleAndNotify(inActorSystem: ActorSystem, numThreads: Int, startTime: Date,
                                          sendPastEvents: Boolean, formatter: ActorRef): ActorRef =
    inActorSystem.actorOf(Props(classOf[ScheduleAndNotify], numThreads, Set(formatter), startTime, sendPastEvents, true))
//      .withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("akka.tcp://sparkMaster@bda-storm01.cablelabs.com:7077")))))

  /**
   * The actor responsible for formatting the Map[String, Any] to the desired event payload and routing to the endpoints
   * described in each OutputDefinition
   * @param inActorSystem - the actor system to use
   * @param outputDefs - the definition for each output format/location
   * @return - the formatter ActorRef
   */
  private[eventgen] def formatAndRoute(inActorSystem: ActorSystem, outputDefs: Set[OutputDefinition]): ActorRef = {
    val routerMap = new mutable.HashMap[String, ActorRef]()
    outputDefs.foreach(outDef => {
      // Fix: test key presence with contains rather than comparing an Option to None
      if (!routerMap.contains(outDef.id))
        routerMap += outDef.id -> activeMqRouter(outDef.protocol, outDef.host, outDef.port, outDef.name, outDef.routeType)
    })
    val props = Props(classOf[FormatAndRoute], outputDefs, routerMap.toMap)
//      .withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("akka.tcp://sparkMaster@bda-storm01.cablelabs.com:7077"))))
    inActorSystem.actorOf(props)
  }

  /**
   * Returns the actor that is responsible for sending events to the topic
   * @param protocol - the AMQ protocol (usually tcp|nio)
   * @param host - the AMQ host
   * @param port - the AMQ port (default 61616)
   * @param name - the topic or queue name
   * @param routeType - TOPIC|QUEUE (default TOPIC)
   * @return - the actor reference
   */
  // TODO - Try to make Camel work properly
  private[eventgen] def activeMqRouter(protocol: String = "tcp", host: String, port: Int = 61616, name: String,
                                       routeType: JMSRouteType = JMSRouteType.TOPIC): ActorRef = {
    protocol match {
      // Stomp has its own text-based router; everything else goes through JMS
      case "stomp" => actorSystem.actorOf(Props(classOf[StompRouter], host, port, name, routeType))
//        .withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("akka.tcp://sparkMaster@bda-storm01.cablelabs.com:7077")))))
      case _ => actorSystem.actorOf(Props(classOf[AmqRouter], protocol, host, port, name, routeType))
//        .withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("akka.tcp://sparkMaster@bda-storm01.cablelabs.com:7077")))))
    }
  }
}

/* Fragment for using camel for sending out jms but is not functioning in the integration environment
    val baseUrl = protocol match {
      case "vm" => "vm://localhost?broker.persistent=false"  // For testing
      case _ => s"$protocol://$host:$port"
    }
    val amqUrl = s"activemq:$routeType:$name"

    if (camelSystem.context.getComponent("activemq") == null)
      camelSystem.context.addComponent("activemq", ActiveMQComponent.activeMQComponent(baseUrl))
    else camelSystem.context.getComponent("activemq").asInstanceOf[ActiveMQComponent].setBrokerURL(baseUrl)

    actorSystem.actorOf(Props(classOf[CamelProducer], amqUrl))
*/
151

152 153
/**
 * Contains a generated event which is being used for pattern matching in some of the actors
 * @param dimString - the dimensional set identifier
 * @param values - the event values
 */
case class Event(dimString: String, values: Map[String, Any])
157

158 159 160 161 162 163 164 165
/**
 * The event object used by the generator's downstream actors
 * @param dimString - the dimensional set identifier
 * @param event - the event payload
 * @param time - the time the event should be or have been sent
 */
case class DimTimeEvent(dimString: String, event: Map[String, Any], time: Date)

166 167 168
/**
 * The main actor responsible for obtaining the next event and scheduling its output
 * @param engine - required for generating the next event
 * @param toForward - the actors to forward the generated DimTimeEvent to
 */
class GeneratorActor(val engine: Engine, val toForward: Set[ActorRef]) extends Actor {

  private[this] val logger = Logger(LoggerFactory.getLogger("GeneratorActor"))

  def receive = {
    case thisEvent: Event =>
      logger.debug(s"Predicting next event - ${thisEvent.values}")
      val newEvent = engine.nextEvent(thisEvent.values)
      val newEventDate = engine.inputDef.temporal.eventValue[Date](newEvent)

      // Warn when the predicted event time is already more than 10 seconds in the past
      if (newEventDate.getTime - System.currentTimeMillis() < -10000)
        logger.warn(s"Scheduling ${thisEvent.dimString} for $newEventDate ${newEventDate.getTime - System.currentTimeMillis()}ms late")

      // Case-class apply; no 'new' required
      toForward.foreach(_ ! DimTimeEvent(thisEvent.dimString, newEvent, newEventDate))
  }
}

188
/**
 * Akka actor who has a quartz scheduler. It sends messages to the actors member then sends the event back to
 * the generator actor to seed the next event when echo is true
 * @param numThreads - the number of quartz scheduler threads
 * @param actors - the downstream actors that can accept the DimTimeEvent
 * @param startTime - the time when the generator is starting
 * @param sendPastEvents - when true, all events from the past will be output
 * @param echo - when true, sender will be notified with the original event
 */
class ScheduleAndNotify(val numThreads: Int, val actors: Set[ActorRef], val startTime: Date,
                        val sendPastEvents: Boolean, val echo: Boolean) extends Actor {

  import org.quartz.TriggerBuilder._

  import scala.collection.JavaConversions._

  private[this] val logger = Logger(LoggerFactory.getLogger("ScheduleAndNotify"))

  // Non-serializable member who should be instantiated on startup (preStart); null until then
  var scheduler: Scheduler = null

  override def preStart() = {
    val props = new Properties()
    props.put("org.quartz.threadPool.threadCount", numThreads.toString)
    // Run just below max priority so scheduled sends fire close to their trigger time
    props.put("org.quartz.threadPool.threadPriority", String.valueOf(Thread.MAX_PRIORITY - 1))

    scheduler = new StdSchedulerFactory(props).getScheduler
    scheduler.start()
  }

  override def postStop() = scheduler.shutdown()

  def receive = {
    case timeEvent: DimTimeEvent =>
      // Only schedule events at/after startTime unless back-dated events were requested
      if (sendPastEvents || startTime.getTime <= timeEvent.time.getTime) {
        val jobId = timeEvent.dimString + '|' + timeEvent.time.getTime + "|scheduled"
        // Capture sender now; 'sender' is not stable inside the async Quartz callback
        val thisSender = sender

        // Closure to send into the Quartz scheduler
        val jobData = Map[String, () => Unit]("function" -> (() => {
          actors.foreach(_ ! timeEvent)
          if (echo) thisSender ! Event(timeEvent.dimString, timeEvent.event)
        }))

        val job = JobBuilder.newJob(classOf[ExecuteFunctionJob])
          .withIdentity(jobId)
          .setJobData(new JobDataMap(mapAsJavaMap(jobData)))
        // Fire exactly once at the event time
        val trigger = newTrigger().withIdentity(jobId).startAt(timeEvent.time).endAt(timeEvent.time)
          .withSchedule(SimpleScheduleBuilder.repeatMinutelyForTotalCount(1))

        // Fix: log the lateness delta in ms (previously interpolated a Boolean comparison)
        if (timeEvent.time.getTime - System.currentTimeMillis() < -10000)
          logger.warn(s"Scheduling ${timeEvent.dimString} ${timeEvent.time.getTime - System.currentTimeMillis()}ms late")

        scheduler.scheduleJob(job.build(), trigger.build())
      } else if (echo)
        // Past event suppressed; still echo so the generator can seed the next one
        sender ! Event(timeEvent.dimString, timeEvent.event)
  }
}

245 246 247 248 249
/**
 * Converts and routes events to each defined output destination
 * @param outputDefs - describes how to convert and where to send the events
 * @param actors - the actors to invoke for each given endpoint, keyed by OutputDefinition id
 */
class FormatAndRoute(val outputDefs: Set[OutputDefinition], val actors: Map[String, ActorRef]) extends Actor {
  require(actors.nonEmpty)

  private[this] val logger = Logger(LoggerFactory.getLogger("FormatAndRoute"))

  def receive = {
    case timeEvent: DimTimeEvent =>
      outputDefs.foreach(outDef => {
        val out = outDef.convert(timeEvent.event)
        logger.debug(s"Sending event - $out")
        // NOTE(review): this warns when the event time is >10s in the FUTURE; the analogous
        // lateness checks in GeneratorActor/ScheduleAndNotify use `< -10000` (past) — confirm
        // the intended sign of this comparison
        if (timeEvent.time.getTime - System.currentTimeMillis() > 10000)
          logger.warn(s"Formatting ${timeEvent.dimString} ${timeEvent.time.getTime - System.currentTimeMillis()}ms late")
        // Fix: direct map apply instead of Option#get on the lookup
        actors(outDef.id) ! out
      })
  }
}

267 268 269 270
/**
 * akka-camel producer for use by the event generator for creating output streams (one-way: no reply expected)
 * @param endpointUri - the camel URI to produce to
 */
// TODO - Why is this not working in a Spark Cluster. Should be able to remove AmqRouter once that is resolved
class CamelProducer(val endpointUri: String) extends Producer with Oneway
273

274 275 276
/**
 * Used for the CamelProducer when it is sending its data out via JMS.
 * Enumerates the JMS destination kinds; the value names ("topic"/"queue") are also used
 * directly in router destination strings (see StompRouter's send path).
 */
object JMSRouteType extends Enumeration {
  type JMSRouteType = Value
  val TOPIC = Value("topic")
  val QUEUE = Value("queue")
}
282

283 284 285 286 287 288 289 290 291 292 293
/**
 * Responsible for routing events to an AMQ broker
 * @param protocol - the AMQ protocol
 * @param host - the AMQ host
 * @param port - the AMQ port
 * @param name - the queue/topic name
 * @param routeType - TOPIC|QUEUE
 */
// TODO - add support for failover and figure out why Camel is not working in a Spark Cluster
class AmqRouter(val protocol: String, val host: String, val port: Int = 61616, val name: String, val routeType: JMSRouteType)
  extends Actor {

  private[this] val logger = Logger(LoggerFactory.getLogger("AmqRouter"))

  /**
   * The AMQ broker URL
   */
  val url = s"$protocol://$host:$port"

  // JMS objects who should be instantiated on startup (preStart); null until then
  var conn: Connection = null
  var session: Session = null
  var messageProducer: MessageProducer = null

  override def preStart() = {
    logger.info(s"Connecting to broker $url")
    conn = new ActiveMQConnectionFactory(url).createConnection
    conn.start()
    session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)
    logger.info(s"Started JMS session to $routeType/$name")
    messageProducer = routeType match {
      case JMSRouteType.TOPIC => session.createProducer(session.createTopic(name))
      case JMSRouteType.QUEUE => session.createProducer(session.createQueue(name))
    }
  }

  override def postStop() = {
    // Fix: guard each close - if preStart failed part-way these are still null and the
    // resulting NPE would mask the original connection error
    if (messageProducer != null) messageProducer.close()
    if (session != null) session.close()
    if (conn != null) conn.close()
  }

  override def receive = {
    case msg =>
      logger.debug(s"Sending $msg")
      val txtMsg = session.createTextMessage(msg.toString)
      messageProducer.send(txtMsg)
  }
}

/**
 * Responsible for routing events to an AMQ broker via the Stomp protocol
 * @param host - the AMQ host
 * @param port - the AMQ port
 * @param name - the queue/topic name
 * @param routeType - TOPIC|QUEUE
 */
// TODO - add support for failover and figure out why Camel is not working in a Spark Cluster
class StompRouter(val host: String, val port: Int = 61616, val name: String, val routeType: JMSRouteType)
  extends Actor {

  private[this] val logger = Logger(LoggerFactory.getLogger("StompRouter"))

  // Stomp object that should be instantiated on startup (preStart); null until then
  var conn: StompConnection = null

  override def preStart() = {
    logger.info(s"Connecting to stomp broker $host:$port")
    conn = new StompConnection()
    conn.open(host, port)
    // NOTE(review): credentials are hard-coded to guest/guest - consider making configurable
    conn.connect("guest", "guest")
    logger.info(s"Opened stomp connection to $routeType/$name")
  }

  // Fix: guard the close - if preStart failed before open, conn is still null and the
  // resulting NPE would mask the original connection error
  override def postStop() = if (conn != null) conn.close()

  override def receive = {
    case msg =>
      logger.debug(s"Sending $msg")
      // Destination path uses the enum's value name, e.g. /topic/name or /queue/name
      conn.send(s"/$routeType/$name", msg.toString)
      logger.debug(s"Sent $msg")
  }
}

367 368 369 370 371 372 373 374 375
/**
 * The Quartz job that simply executes the zero-argument function stored under the
 * "function" key of the job's data map (placed there by ScheduleAndNotify).
 */
private class ExecuteFunctionJob extends Job {
  override def execute(context: JobExecutionContext): Unit = {
    // Retrieve the closure from the merged job data and invoke it
    val fn = context.getMergedJobDataMap.getWrappedMap.get("function").asInstanceOf[() => Unit]
    fn()
  }
}