Commit 41eace3c authored by Steven Pisarski's avatar Steven Pisarski

Housekeeping and code fragments possibly helpful for distributing actors...

Housekeeping and code fragments possibly helpful for distributing actors across the spark cluster workers
parent 8181f786
......@@ -60,49 +60,37 @@ object Generator extends App {
filter(config.seedFilters)
}
val sc =
if (config.sparkUri != null)
new SparkContext(config.sparkUri, config.appName, sparkConf)
else new SparkContext(sparkConf)
if (!config.analyzeFirst) {
val sc = {
if (config.sparkUri != null)
new SparkContext(config.sparkUri, config.appName, sparkConf)
else new SparkContext(sparkConf)
}
// Load from cached data
val eventGen = engineFromCache(sc, config.inputDef, config.temporalTrainingSetUri, config.factTrainingSetUri,
sendPastEvents = config.sendPastEvents, config.outputDefs, config.numSchedulerThreads)
// TODO - add in filter configuration
seedEngineFromCache(sc, eventGen, config.seedEventsUri, config.eventTimeOffset, config.useNow, seedFilter)
} else {
// Load from raw data
val analyzer = {
if (config.sparkUri == null) {
SparkAnalyzer.analyze(config.inputDef, config.eventsUri, config.fileDelim)
} else {
val sc = {
if (config.sparkUri == null)
new SparkContext(config.sparkUri, config.appName, sparkConf)
else new SparkContext(sparkConf)
}
SparkAnalyzer.analyze(sc, config.inputDef, config.eventsUri, config.fileDelim)
}
}
val analyzer =
if (config.sparkUri == null) SparkAnalyzer.analyze(config.inputDef, config.eventsUri, config.fileDelim)
else SparkAnalyzer.analyze(sc, config.inputDef, config.eventsUri, config.fileDelim)
val eventGen = engineFromAnalyzer(analyzer, sendPastEvents = config.sendPastEvents,
outputDefs = config.outputDefs, numSchedulerThreads = config.numSchedulerThreads)
seedEngineFromAnalyzer(analyzer, eventGen, config.eventTimeOffset, config.useNow, seedFilter)
}
seedEngineFromAnalyzer(analyzer, eventGen, config.eventTimeOffset, config.useNow, seedFilter)
}
}
/**
* Trains temporal prediction model to be ready for performing predictions
* @param analyzer - used to retrieve the temporal training set
* @return - the configured prediction model
*/
def buildTemporalModel(analyzer: SparkAnalyzer): Model = {
def buildTemporalModel(analyzer: SparkAnalyzer): Model =
buildTemporalModel(analyzer.inputDef, analyzer.temporalTrainingSet())
}
/**
* Trains the temporal prediction model for event frequency predictions
......@@ -111,7 +99,7 @@ object Generator extends App {
* @return - the configured prediction model
*/
// TODO - test me
def buildTemporalModel(inputDef: InputDefinition, trainingSet: RDD[LabeledPoint]): Model = {
def buildTemporalModel(inputDef: InputDefinition, trainingSet: RDD[LabeledPoint]): Model =
inputDef.temporal.algoDef match {
case definition: SupervisedTraining =>
definition match {
......@@ -132,15 +120,13 @@ object Generator extends App {
case definition: ConstantStringDefinition => new ConstantStringModel(inputDef.temporal, definition.asInstanceOf[String])
}
}
}
/**
* Trains each fact prediction model and returns in priority order to begin performing predictions
* @return - the configured prediction model
*/
def buildFactModels(analyzer: SparkAnalyzer): List[Model] = {
val inputDef = analyzer.inputDef
val input = inputDef.positionalFacts.map(fact => (fact.name, analyzer.factTrainingSet(fact.name)))
val input = analyzer.inputDef.positionalFacts.map(fact => (fact.name, analyzer.factTrainingSet(fact.name)))
buildFactModels(analyzer.inputDef, input)
}
......@@ -151,9 +137,8 @@ object Generator extends App {
* @return - a list of trained models
*/
// TODO - test me especially to ensure that the models are being returned in the proper order
def buildFactModels(inputDef: InputDefinition, trainingSets: List[(String, RDD[(LabeledPoint, Any)])]): List[Model] = {
def buildFactModels(inputDef: InputDefinition, trainingSets: List[(String, RDD[(LabeledPoint, Any)])]): List[Model] =
trainingSets.map(buildFactModel(inputDef, _))
}
/**
* Returns the proper model for the given fact
......@@ -216,14 +201,12 @@ object Generator extends App {
outputDefs, sendPastEvents, numSchedulerThreads)
}
private[this] def getFactTrainingSets(sc: SparkContext, inputDef: InputDefinition, factTrainingSetUri: String): Map[String, RDD[(LabeledPoint, Any)]] = {
val out = new mutable.HashMap[String, RDD[(LabeledPoint, Any)]]()
inputDef.positionalFacts.foreach(f => {
private[this] def getFactTrainingSets(sc: SparkContext, inputDef: InputDefinition,
factTrainingSetUri: String): Map[String, RDD[(LabeledPoint, Any)]] =
inputDef.positionalFacts.map(f => {
val rdd = sc.objectFile[(LabeledPoint, Any)](factTrainingSetUri + '/' + f.name + ".rdd")
out += f.name -> rdd
})
out.toMap
}
f.name -> rdd
}).toMap
/**
* Factory type method that creates a Generator object
......@@ -286,7 +269,6 @@ object Generator extends App {
val newEvent = event.+(engine.inputDef.temporal.name -> newDate)
engine.seed(dimString, newEvent)
}
}
/**
......
......@@ -3,7 +3,7 @@ package com.cablelabs.eventgen.akka
import java.util.{Date, Properties}
import javax.jms.Session
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.actor._
import akka.camel.{CamelExtension, Oneway, Producer}
import com.cablelabs.eventgen.Engine
import com.cablelabs.eventgen.akka.JMSRouteType.JMSRouteType
......@@ -80,6 +80,7 @@ object GeneratorActors {
*/
private def generateAndSchedule(inActorSystem: ActorSystem, engine: Engine, scheduler: ActorRef): ActorRef =
inActorSystem.actorOf(Props(classOf[GeneratorActor], engine, Set(scheduler)))
// .withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("akka.tcp://sparkMaster@bda-storm01.cablelabs.com:7077")))))
/**
* Returns an ActorRef that is implemented with the QuartzConsumer Akka Actor for scheduling event delivery.
......@@ -93,6 +94,7 @@ object GeneratorActors {
private[eventgen] def scheduleAndNotify(inActorSystem: ActorSystem, numThreads: Int, startTime: Date,
sendPastEvents: Boolean, formatter: ActorRef): ActorRef =
inActorSystem.actorOf(Props(classOf[ScheduleAndNotify], numThreads, Set(formatter), startTime, sendPastEvents, true))
// .withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("akka.tcp://sparkMaster@bda-storm01.cablelabs.com:7077")))))
/**
* The actor responsible for formatting the Map[String, Any] to the desired event payload and routing to the endpoints
......@@ -108,6 +110,7 @@ object GeneratorActors {
routerMap += outDef.id -> activeMqRouter(outDef.protocol, outDef.host, outDef.port, outDef.name, outDef.routeType)
})
val props = Props(classOf[FormatAndRoute], outputDefs, routerMap.toMap)
// .withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("akka.tcp://sparkMaster@bda-storm01.cablelabs.com:7077"))))
inActorSystem.actorOf(props)
}
......@@ -123,7 +126,16 @@ object GeneratorActors {
// TODO - Try to make Camel work properly
private[eventgen] def activeMqRouter(protocol: String = "tcp", host: String, port: Int = 61616, name: String,
routeType: JMSRouteType = JMSRouteType.TOPIC): ActorRef = {
/*
protocol match {
case "stomp" => actorSystem.actorOf(Props(classOf[StompRouter], host, port, name, routeType))
// .withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("akka.tcp://sparkMaster@bda-storm01.cablelabs.com:7077")))))
case _ => actorSystem.actorOf(Props(classOf[AmqRouter], protocol, host, port, name, routeType))
// .withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("akka.tcp://sparkMaster@bda-storm01.cablelabs.com:7077")))))
}
}
}
/* Fragment for using camel for sending out jms but is not functioning in the integration environment
val baseUrl = protocol match {
case "vm" => "vm://localhost?broker.persistent=false" // For testing
case _ => s"$protocol://$host:$port"
......@@ -136,12 +148,6 @@ object GeneratorActors {
actorSystem.actorOf(Props(classOf[CamelProducer], amqUrl))
*/
protocol match {
case "stomp" => actorSystem.actorOf(Props(classOf[StompRouter], host, port, name, routeType))
case _ => actorSystem.actorOf(Props(classOf[AmqRouter], protocol, host, port, name, routeType))
}
}
}
/**
* Contains a generated event which is being used for pattern matching in some of the actors
......@@ -223,9 +229,8 @@ import scala.collection.JavaConversions._
logger.warn(s"Scheduling ${timeEvent.dimString} ${timeEvent.time.getTime < System.currentTimeMillis()}ms late")
scheduler.scheduleJob(job.build(), trigger.build())
} else {
if (echo) sender ! new Event(timeEvent.dimString, timeEvent.event)
}
} else if (echo)
sender ! new Event(timeEvent.dimString, timeEvent.event)
}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment