Commit da0dae9f authored by Steven Pisarski

Fixed issues reading in fact training set from HDFS and added optimization...

Fixed issues reading in fact training set from HDFS and added optimization that will significantly reduce the number of times the dimString is being derived.
parent 201d9176
......@@ -23,6 +23,7 @@ dependencies {
'com.typesafe.scala-logging:scala-logging-slf4j_2.10:2.1.2',
'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.4.2', // For YAML
'com.typesafe.akka:akka-camel_2.10:2.2.3',
'org.apache.activemq:activemq-all:5.10.0',
'org.apache.activemq:activemq-camel:5.10.0',
'org.quartz-scheduler:quartz:2.2.1',
'org.apache.velocity:velocity:1.7', // for templates
......
......@@ -32,9 +32,10 @@ class Engine(val inputDef: InputDefinition, val temporalAlgo: Model, val factAlg
/**
* Begins the event generation process indefinitely
* @param dimString - the dimensional set's unique identifier
* @param seedEvent - the data used to predict the next event
*/
def seed(seedEvent: Map[String, Any]): Unit = generatorActor ! new Event(seedEvent)
def seed(dimString: String, seedEvent: Map[String, Any]): Unit = generatorActor ! new Event(dimString, seedEvent)
/**
* Generates the next event with the same dimensionality as the seedEvent
......
......@@ -240,8 +240,8 @@ object Generator extends App {
new NaiveBayesModel(fact, features, labelMap.toMap,
definition.lambda)
case definition: LinearRegressionDefinition =>
new LinearRegressionModel(fact, features, inputDef.temporalAlgoWeights(),
definition.iterations, definition.stepSize)
new LinearRegressionModel(field = fact, trainingSet = features, numIterations = definition.iterations,
stepSize = definition.stepSize)
}
case definition: ConstantDefinition =>
definition match {
......@@ -307,7 +307,7 @@ object Generator extends App {
*/
def seedEngineFromAnalyzer(analyzer: SparkAnalyzer, engine: Engine): Unit = {
  logger.info("Seeding generator with last known events")
  // Seed one event per dimensional set: entry._1 is the dimString key,
  // entry._2 the last observed event values for that dimensionality
  analyzer.lastEventByDim().foreach(entry => engine.seed(entry._1, entry._2))
}
/**
......@@ -318,8 +318,8 @@ object Generator extends App {
*/
// TODO - Test me
def seedEngineFromCache(sc: SparkContext, engine: Engine, seedEventsUri: String): Unit = {
  // Seed events are persisted as (dimString, eventValues) pairs via objectFile
  val seedEvents = sc.objectFile[(String, Map[String, Any])](seedEventsUri).collect()
  seedEvents.foreach { case (dimString, event) => engine.seed(dimString, event) }
}
}
\ No newline at end of file
package com.cablelabs.eventgen.akka
import java.util.Date
import javax.jms.Session
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.camel.{CamelExtension, Oneway, Producer}
......@@ -8,7 +9,8 @@ import com.cablelabs.eventgen.Engine
import com.cablelabs.eventgen.akka.JMSRouteType.JMSRouteType
import com.cablelabs.eventgen.model.OutputDefinition
import com.typesafe.scalalogging.slf4j.Logger
import org.apache.activemq.camel.component.ActiveMQComponent
import org.apache.activemq.ActiveMQConnectionFactory
import org.apache.activemq.transport.stomp.StompConnection
import org.apache.spark.SparkEnv
import org.quartz._
import org.quartz.impl.StdSchedulerFactory
......@@ -117,8 +119,10 @@ object GeneratorActors {
* @param routeType - TOPIC|QUEUE (default TOPIC)
* @return - the actor reference
*/
// TODO - Try to make Camel work properly
private[eventgen] def activeMqRouter(protocol: String = "tcp", host: String, port: Int = 61616, name: String,
routeType: JMSRouteType = JMSRouteType.TOPIC): ActorRef = {
/*
val baseUrl = protocol match {
case "vm" => "vm://localhost?broker.persistent=false" // For testing
case _ => s"$protocol://$host:$port"
......@@ -130,14 +134,19 @@ object GeneratorActors {
else camelSystem.context.getComponent("activemq").asInstanceOf[ActiveMQComponent].setBrokerURL(baseUrl)
actorSystem.actorOf(Props(classOf[CamelProducer], amqUrl))
*/
protocol match {
case "stomp" => actorSystem.actorOf(Props(classOf[StompRouter], host, port, name, routeType))
case _ => actorSystem.actorOf(Props(classOf[AmqRouter], protocol, host, port, name, routeType))
}
}
}
/**
 * Contains a generated event which is being used for pattern matching in some of the actors
 * @param dimString - the dimensional set's unique identifier (precomputed so downstream
 *                    actors need not re-derive it from the event values)
 * @param values - the event values
 */
case class Event(dimString: String, values: Map[String, Any])
/**
* The event object used by the generator's downstream actors
......@@ -157,16 +166,16 @@ class GeneratorActor(val engine: Engine, val sendPastEvents: Boolean, val toForw
private[this] def logger = Logger(LoggerFactory.getLogger("GeneratorActor"))
def receive = {
case Event(event) =>
logger.debug(s"Predicting next event - $event")
var newEvent = event
case thisEvent: Event =>
logger.debug(s"Predicting next event - ${thisEvent.values}")
var newEvent = thisEvent.values
var newEventDate = new Date(0)
do {
newEvent = engine.nextEvent(event)
newEvent = engine.nextEvent(thisEvent.values)
newEventDate = engine.inputDef.temporal.eventValue[Date](newEvent)
} while (!sendPastEvents && newEventDate.getTime < System.currentTimeMillis())
toForward.foreach(_ ! new DimTimeEvent(engine.inputDef.dimString(newEvent), newEvent, newEventDate))
toForward.foreach(_ ! new DimTimeEvent(thisEvent.dimString, newEvent, newEventDate))
}
}
......@@ -191,7 +200,7 @@ import scala.collection.JavaConversions._
val thisSender = sender
val jobData = Map[String, () => Unit]("function" -> (() => {
actors.foreach(_ ! timeEvent)
if (echo) thisSender ! new Event(timeEvent.event)
if (echo) thisSender ! new Event(timeEvent.dimString, timeEvent.event)
}))
val job = JobBuilder.newJob(classOf[ExecuteFunctionJob])
.withIdentity(jobId)
......@@ -209,7 +218,7 @@ import scala.collection.JavaConversions._
*/
class FormatAndRoute(val outputDefs: Set[OutputDefinition], val actors: Map[String, ActorRef]) extends Actor {
require(actors.size > 0)
private[this] def logger = Logger(LoggerFactory.getLogger("PayloadFormatter"))
private[this] def logger = Logger(LoggerFactory.getLogger("FormatAndRoute"))
def receive = {
case timeEvent: DimTimeEvent =>
outputDefs.foreach(outDef => {
......@@ -224,6 +233,7 @@ class FormatAndRoute(val outputDefs: Set[OutputDefinition], val actors: Map[Stri
* akka-camel producer for use by the event generator to creating output streams
* @param endpointUri - the camel URI
*/
// TODO - Why is this not working in a Spark Cluster. Should be able to remove AmqRouter once that is resolved
// Fire-and-forget (Oneway) akka-camel producer that publishes messages to the configured Camel endpoint URI
class CamelProducer(val endpointUri: String) extends Producer with Oneway
/**
......@@ -235,6 +245,63 @@ object JMSRouteType extends Enumeration {
val QUEUE = Value("queue")
}
/**
 * Responsible for routing events to an AMQ broker
 * @param protocol - the AMQ protocol
 * @param host - the AMQ host
 * @param port - the AMQ port
 * @param name - the queue/topic name
 * @param routeType - TOPIC|QUEUE
 */
// TODO - add support for failover and figure out why Camel is not working in a Spark Cluster
class AmqRouter(val protocol: String, val host: String, val port: Int = 61616, val name: String, val routeType: JMSRouteType)
  extends Actor {
  private[this] def logger = Logger(LoggerFactory.getLogger("AmqRouter"))

  val url = s"$protocol://$host:$port"
  logger.info(s"Connecting to broker $url")
  val conn = new ActiveMQConnectionFactory(url).createConnection
  conn.start()

  // Non-transacted, auto-acknowledge session shared by every message this actor sends
  val session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)
  logger.info(s"Started JMS session to $routeType/$name")

  // Single producer bound once to the configured topic or queue
  private val messageProducer = routeType match {
    case JMSRouteType.TOPIC => session.createProducer(session.createTopic(name))
    case JMSRouteType.QUEUE => session.createProducer(session.createQueue(name))
  }

  override def receive = {
    case msg =>
      logger.debug(s"Sending $msg")
      val txtMsg = session.createTextMessage(msg.toString)
      messageProducer.send(txtMsg)
  }

  // Release JMS resources when the actor stops; previously the session/connection were leaked
  override def postStop(): Unit = {
    import scala.util.control.NonFatal
    try {
      session.close()
      conn.close()
    } catch {
      case NonFatal(e) => logger.debug(s"Error closing JMS connection - ${e.getMessage}")
    }
  }
}
/**
 * Responsible for routing events to an AMQ broker over the STOMP protocol
 * @param host - the AMQ host
 * @param port - the AMQ port (NOTE(review): default is 61616 but AMQ's stomp connector
 *               conventionally listens on 61613 - callers should pass the port explicitly)
 * @param name - the queue/topic name
 * @param routeType - TOPIC|QUEUE
 */
// TODO - add support for failover and figure out why Camel is not working in a Spark Cluster
class StompRouter(val host: String, val port: Int = 61616, val name: String, val routeType: JMSRouteType)
  extends Actor {
  private[this] def logger = Logger(LoggerFactory.getLogger("StompRouter"))

  logger.info(s"Connecting to stomp broker $host:$port")
  val conn = new StompConnection()
  conn.open(host, port)
  conn.connect("guest", "guest")
  logger.info(s"Opened stomp connection to $routeType/$name")

  override def receive = {
    case msg =>
      logger.debug(s"Sending $msg")
      // STOMP destinations take the form /topic/<name> or /queue/<name>
      conn.send(s"/$routeType/$name", msg.toString)
      logger.debug(s"Sent $msg")
  }

  // Disconnect cleanly when the actor stops; previously the stomp connection was leaked
  override def postStop(): Unit = {
    import scala.util.control.NonFatal
    try conn.disconnect() catch {
      case NonFatal(e) => logger.debug(s"Error closing stomp connection - ${e.getMessage}")
    }
  }
}
/**
* The Quartz job that simply executes the function
*/
......
......@@ -45,6 +45,12 @@ trait ConstantModel extends Model {
*/
trait SupervisedModel extends Model {
/**
* The number of features this supervised model has
* @return
*/
def featuresSize: Int
/**
* The required for supervised training operations
*/
......@@ -75,6 +81,7 @@ trait ClassificationModel extends SupervisedModel {
def labelKeys: List[Double]
override def predict(features: List[Double]): Any = {
assert(featuresSize == features.size)
val pred = model.predict(new DenseVector(features.toArray))
val key = closestKey(pred, labelKeys.size / 2, 0)
val label = labelMap.get(key)
......@@ -124,6 +131,7 @@ trait RegressionModel extends SupervisedModel {
private[algorithm] def model: SparkRegressionModel
override def predict(features: List[Double]): Any = {
assert(featuresSize == features.size)
field.convert(predictRaw(features))
}
......@@ -140,6 +148,7 @@ trait RegressionModel extends SupervisedModel {
*/
class NaiveBayesModel(override val field: Field, override val trainingSet: RDD[LabeledPoint],
val labelMap: Map[Double, Any], val lambda: Double) extends ClassificationModel {
// Feature vector width taken from the first training example; predict() asserts
// incoming feature vectors against this size
val featuresSize = trainingSet.first().features.size
// Trains eagerly at construction - assumes trainingSet is non-empty (TODO confirm callers guarantee this)
private[algorithm] val model = NaiveBayes.train(trainingSet, lambda)
// Label keys sorted ascending so a raw prediction can be resolved to the nearest known label
val labelKeys = labelMap.keySet.toList.sortBy(x => x)
}
......@@ -147,16 +156,18 @@ class NaiveBayesModel(override val field: Field, override val trainingSet: RDD[L
/**
 * Implementation of a Linear Regression predictive algorithm
 * @param field - the field whose raw predictions this model converts
 * @param trainingSet - contains the labels and associated features used to train the algorithm
 * @param weights - the weights for each individual column used during learning - the size must = the feature set size
 * @param numIterations - the number of times gradient descent will run during training
 * @param stepSize - the size of each step taken during gradient descent
 */
class LinearRegressionModel(override val field: Field, override val trainingSet: RDD[LabeledPoint], val weights: List[Double] = List[Double](),
val numIterations: Int, val stepSize: Double) extends RegressionModel {
// Feature vector width taken from the first training example; predict() asserts
// incoming feature vectors against this size
val featuresSize = trainingSet.first().features.size
// Train eagerly at construction. When initial column weights are supplied they must
// match the feature vector width and seed the SGD weight vector; otherwise SGD
// starts from its default initial weights. (val expression replaces the previous
// null-initialized var, which also carried a stray pre-commit conditional.)
private[algorithm] val model: SparkRegressionModel =
if (weights.isEmpty) {
LinearRegressionWithSGD.train(trainingSet, numIterations, stepSize, 1.0)
} else {
require(weights.size == featuresSize)
LinearRegressionWithSGD.train(trainingSet, numIterations, stepSize, 1.0, new DenseVector(weights.toArray))
}
}
......
......@@ -28,8 +28,9 @@ class EngineTest extends EngineTester {
val seed = analyzer.lastEventByDim().toLocalIterator.next()
val newSeed = collection.mutable.Map(seed._2.toSeq: _*)
newSeed += inputDef.temporal.name -> new Date()
val dimString = inputDef.dimString(newSeed.toMap)
engine.seed(newSeed.toMap)
engine.seed(dimString, newSeed.toMap)
Thread.sleep(5100)
// TODO - add more validation to ensure the output payload is as expected
assert(5 == notifier.list.size)
......
......@@ -2,8 +2,8 @@ package com.cablelabs.eventgen
import _root_.akka.actor.Props
import _root_.akka.camel.Consumer
import com.cablelabs.eventgen.akka.GeneratorActors
import com.cablelabs.eventgen.akka.JMSRouteType.JMSRouteType
import com.cablelabs.eventgen.akka.{GeneratorActors, JMSRouteType}
import org.apache.activemq.camel.component.ActiveMQComponent
import scala.collection.mutable
......@@ -22,7 +22,7 @@ object TestActors {
* @return - the actor reference
*/
def activeMqConsumer(protocol: String = "tcp", host: String, port: Int = 61616, name: String,
routeType: JMSRouteType = JMSRouteType.TOPIC, notifier: Notifier) = {
routeType: JMSRouteType, notifier: Notifier) = {
val baseUrl = protocol match {
case "vm" =>
"vm://localhost?broker.persistent=false" // For testing
......@@ -30,6 +30,7 @@ object TestActors {
}
if (GeneratorActors.camelSystem.context.getComponent("activemq") == null)
GeneratorActors.camelSystem.context.addComponent("activemq", ActiveMQComponent.activeMQComponent(baseUrl))
else GeneratorActors.camelSystem.context.getComponent("activemq").asInstanceOf[ActiveMQComponent].setBrokerURL(baseUrl)
val amqUrl = s"activemq:$routeType:$name"
GeneratorActors.actorSystem.actorOf(Props(classOf[CamelConsumer], amqUrl, notifier))
}
......
package com.cablelabs.eventgen.akka
/**
 * Tests for the StompRouter actor.
 */
class StompRouterTest { //extends SparkTestUtils with AkkaTestUtils {
// Connection settings for a locally running ActiveMQ broker with its STOMP connector
// on the conventional port 61613
val host = "localhost"
val port = 61613
val name = "testStompEndpoint"
val routeType = JMSRouteType.TOPIC
// The test below is intentionally disabled: it requires an external AMQ broker and
// performs no assertions - it only exercises the send path manually
/* Do not activate to run with the build as this requires an external AMQ broker
sparkTest("Stomp routing to AMQ") {
val notifier = new Notifier
TestActors.activeMqConsumer("tcp", "host", port, name, routeType, notifier)
val stompActor = GeneratorActors.actorSystem.actorOf(Props(classOf[StompRouter], host, port, name, routeType))
stompActor ! "fubar"
}
*/
}
# Output stream definitions for the event generator: each entry names a broker
# endpoint, a route type, a payload format, and the output-field -> input-field mapping.
outputDefs:
# ActiveMQ (OpenWire over TCP) topic emitting Velocity-templated XML CM health events
- protocol: tcp
host: localhost
port: 61616
routeType: topic
name: CMHealthXML
format: testData/cm/definition/amdocs-cm.vm
fields:
- name: EventTimeStamp
description: temporal field
inputFieldName: poll_date
dateFmtStr: MM/dd/yyyy HH:mm:ss
- name: MAC
description: The MAC address
inputFieldName: mac
- name: Region
description: CMTS Region
inputFieldName: cmts
- name: Node
description: the node name
inputFieldName: node
- name: DownstreamReceivePowerNum
description: downstream receive power
inputFieldName: downstream_receive_power_num
- name: UpstreamTransmitPowerNum
description: upstream transmit power
inputFieldName: upstream_transmit_power_num
- name: DownstreamSNRRt
description: downstream SNRRt
inputFieldName: downstream_snr_rt
# STOMP topic emitting the same CM health events as JSON (stomp connector port 61613)
- protocol: stomp
host: localhost
port: 61613
routeType: topic
name: CMHealthJson
format: json
fields:
- name: EventTimeStamp
description: temporal field
inputFieldName: poll_date
dateFmtStr: MM/dd/yyyy HH:mm:ss
- name: MAC
description: The MAC address
inputFieldName: mac
- name: Region
description: CMTS Region
inputFieldName: cmts
- name: Node
description: the node name
inputFieldName: node
- name: DownstreamReceivePowerNum
description: downstream receive power
inputFieldName: downstream_receive_power_num
- name: UpstreamTransmitPowerNum
description: upstream transmit power
inputFieldName: upstream_transmit_power_num
- name: DownstreamSNRRt
description: downstream SNRRt
inputFieldName: downstream_snr_rt
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment