Commit 767affa8 authored by Steven Pisarski's avatar Steven Pisarski

Added support for output to RabbitMQ.

parent 59d01199
......@@ -27,6 +27,7 @@ dependencies {
'org.apache.activemq:activemq-camel:5.10.0',
'org.quartz-scheduler:quartz:2.2.1',
'org.apache.velocity:velocity:1.7', // for templates
'com.rabbitmq:amqp-client:2.8.1', // for RabbitMQ
)
// Cloudera
......
......@@ -235,8 +235,8 @@ object Generator extends App {
def seedEngineFromAnalyzer(analyzer: SparkAnalyzer, engine: Engine, timeOffset: Long, useNow: Boolean,
filter: (((String, Map[String, Any])) => Boolean)): Unit = {
logger.info("Seeding generator with last know events")
analyzer.lastEventByDim().filter(filter).foreach(
entry => seedEngine(engine, entry._1, entry._2, timeOffset, useNow))
val events = analyzer.lastEventByDim().filter(filter).collect()
events.foreach(entry => seedEngine(engine, entry._1, entry._2, timeOffset, useNow))
}
/**
......
......@@ -9,6 +9,7 @@ import akka.remote.RemoteScope
import akka.routing.{DefaultResizer, SmallestMailboxRouter}
import com.cablelabs.eventgen.Engine
import com.cablelabs.eventgen.model.OutputDefinition
import com.rabbitmq.client.{Channel, ConnectionFactory}
import com.typesafe.scalalogging.slf4j.Logger
import org.apache.activemq.ActiveMQConnectionFactory
import org.apache.activemq.transport.stomp.StompConnection
......@@ -112,7 +113,11 @@ case class DimTimeEvent(dimString: String, event: Map[String, Any], time: Date)
class SeedActor(val actor: ActorRef) extends Actor {
private[this] val logger = Logger(LoggerFactory.getLogger("SeedActor"))
private[this] var logger: Logger = _
override def preStart() = {
logger = Logger(LoggerFactory.getLogger("SeedActor"))
}
def receive = {
case thisEvent: Event =>
......@@ -133,11 +138,15 @@ class SeedActor(val actor: ActorRef) extends Actor {
class GeneratorActor(val engine: Engine, outDefs: Set[OutputDefinition],
startTime: Date, sendPastEvents: Boolean, numSchedThreads: Int) extends Actor {
private[this] val logger = Logger(LoggerFactory.getLogger("GeneratorActor"))
private[this] var logger: Logger = _
val schedActor = context.actorOf(
Props(classOf[ScheduleAndNotify], numSchedThreads, startTime, sendPastEvents,true, outDefs), "ScheduleAndNotify")
override def preStart() = {
logger = Logger(LoggerFactory.getLogger("GeneratorActor"))
}
def receive = {
case thisEvent: Event =>
logger.debug(s"Predicting next event - ${thisEvent.values}")
......@@ -169,7 +178,7 @@ class ScheduleAndNotify(val numThreads: Int, val startTime: Date,
import scala.collection.JavaConversions._
private[this] val logger = Logger(LoggerFactory.getLogger("ScheduleAndNotify"))
private[this] var logger: Logger = _
val formatAndRoute = context.actorOf(Props(classOf[FormatAndRoute], outDefs), "FormatAndRoute")
......@@ -177,6 +186,7 @@ import scala.collection.JavaConversions._
var scheduler: Scheduler = _
override def preStart() = {
logger = Logger(LoggerFactory.getLogger("ScheduleAndNotify"))
if (scheduler == null) {
logger.info(s"Starting Quartz for scheduling events")
val props = new Properties()
......@@ -225,7 +235,11 @@ import scala.collection.JavaConversions._
*/
class FormatAndRoute(val outputDefs: Set[OutputDefinition]) extends Actor {
private[this] val logger = Logger(LoggerFactory.getLogger("FormatAndRoute"))
private[this] var logger: Logger = _
override def preStart() = {
logger = Logger(LoggerFactory.getLogger("FormatAndRoute"))
}
val routerMap = outputDefs.map(outDef => {
val actor = outDef.protocol match {
......@@ -233,6 +247,10 @@ class FormatAndRoute(val outputDefs: Set[OutputDefinition]) extends Actor {
context.actorOf(
Props(classOf[StompRouter], outDef.host, outDef.port, outDef.name, outDef.routeType),
s"StompRouter-${outDef.name}")
case "rabbitMq" =>
context.actorOf(
Props(classOf[RabbitMqRouter], outDef.host, outDef.user, outDef.pass, outDef.name),
s"RabbitMqRouter-${outDef.name}")
case _ =>
context.actorOf(
Props(classOf[AmqRouter], outDef.protocol, outDef.host, outDef.port, outDef.name, outDef.routeType),
......@@ -273,7 +291,7 @@ class CamelProducer(val endpointUri: String) extends Producer with Oneway
class AmqRouter(val protocol: String, val host: String, val port: Int = 61616, val name: String, val routeType: String)
extends Actor {
private[this] val logger = Logger(LoggerFactory.getLogger("AmqRouter"))
private[this] var logger: Logger = _
/**
* The AMQ broker URL
......@@ -286,6 +304,7 @@ class AmqRouter(val protocol: String, val host: String, val port: Int = 61616, v
var messageProducer: MessageProducer = _
override def preStart() = {
logger = Logger(LoggerFactory.getLogger("AmqRouter"))
logger.info(s"Connecting to broker $url")
conn = new ActiveMQConnectionFactory(url).createConnection
conn.start()
......@@ -321,19 +340,22 @@ class AmqRouter(val protocol: String, val host: String, val port: Int = 61616, v
* @param name - the queue/topic name
*/
// TODO - add support for failover and figure out why Camel is not working in a Spark Cluster
class StompRouter(val host: String, val port: Int = 61616, val name: String, val routeType: String)
// TODO - make sure stomp routing requires configuration of user and pass so we can take out the default values
class StompRouter(val host: String, val port: Int = 61616, val name: String, val routeType: String,
val user: String = "guest", val pass: String = "guest")
extends Actor {
private[this] val logger = Logger(LoggerFactory.getLogger("StompRouter"))
private[this] var logger: Logger = _
// Stomp object that should be instantiated on startup
var conn: StompConnection = _
override def preStart() = {
logger = Logger(LoggerFactory.getLogger("StompRouter"))
logger.info(s"Connecting to stomp broker $host:$port")
conn = new StompConnection()
conn.open(host, port)
conn.connect("guest", "guest")
conn.connect(user, pass)
logger.info(s"Opened stomp connection to $routeType/$name")
}
......@@ -347,6 +369,46 @@ class StompRouter(val host: String, val port: Int = 61616, val name: String, val
}
}
/**
* Responsible for routing events to a RabbitMQ broker
* @param host - the RabbitMQ host
* @param user - the RabbitMQ user
* @param pass - the RabbitMQ password
* @param name - the queue name
*/
class RabbitMqRouter(val host: String, val user: String, val pass: String, val name: String)
extends Actor {
require(host != null && host.nonEmpty)
require(user != null && user.nonEmpty)
require(pass != null && pass.nonEmpty)
require(name != null && name.nonEmpty)
private[this] var logger: Logger = _
// Stomp object that should be instantiated on startup
var channel: Channel = _
override def preStart() = {
logger = Logger(LoggerFactory.getLogger("RabbitMqRouter"))
val factory = new ConnectionFactory()
val uri = s"amqp://$user:$pass@$host/$user"
factory.setUri(uri)
val connection = factory.newConnection()
channel = connection.createChannel()
channel.queueDeclare(name, false, false, false, null)
logger.info(s"Opened RabbitMQ connection to URI - $uri and queue $name")
}
override def postStop() = channel.close()
override def receive = {
case msg =>
logger.debug(s"Sending $msg")
channel.basicPublish("", "hello", null, msg.toString.getBytes)
logger.debug(s"Sent $msg")
}
}
/**
* The Quartz job that simply executes the function
*/
......
......@@ -36,16 +36,16 @@ object OutputDefinition {
config.format match {
case "json" =>
new OutputDefinition(config.prototcol, config.host, config.port, config.routeType, config.name,
fieldSet.toSet, OutputEventFormatters.jsonFormat)
new OutputDefinition(config.protocol, config.host, config.port, config.user, config.pass, config.routeType,
config.name, fieldSet.toSet, OutputEventFormatters.jsonFormat)
case _ =>
Velocity.init(OutputEventFormatters.velocityProps)
val ctx = new VelocityContext()
val template = Velocity.getTemplate(config.format)
val fmt = OutputEventFormatters.velocityFormat(ctx, template)
new OutputDefinition(config.prototcol, config.host, config.port, config.routeType, config.name,
fieldSet.toSet, fmt)
new OutputDefinition(config.protocol, config.host, config.port, config.user, config.pass, config.routeType,
config.name, fieldSet.toSet, fmt)
}
}).toSet
}
......@@ -61,14 +61,21 @@ object OutputDefinition {
* @param fields - the fields and how they will be mapped - must have at least two fields (one should always be a date)
* @param formatter - the function responsible for transforming the data to the required format
*/
class OutputDefinition(val protocol: String, val host: String, val port: Int, val routeType: String,
val name: String, val fields: Set[OutputField],
class OutputDefinition(val protocol: String, val host: String, val port: Int, val user: String, val pass: String,
val routeType: String, val name: String, val fields: Set[OutputField],
private[model] val formatter: Map[String, Any] => Any) extends Serializable {
require(protocol != null && !protocol.isEmpty)
require(host != null && !host.isEmpty)
require(port > 0)
require(routeType != null && (routeType == "topic" || routeType == "queue"))
require(name != null && !name.isEmpty)
require(protocol != null && protocol.nonEmpty)
require(host != null && host.nonEmpty)
require(
if (protocol != null && (protocol != "stomp" && protocol != "rabbitMq"))
routeType != null && (routeType == "topic" || routeType == "queue")
else true)
require(
if (protocol != "stomp" && protocol != "rabbitMq")
port > 0
else true)
require(name != null && name.nonEmpty)
require(fields != null && fields.size > 1)
require(formatter != null)
......@@ -120,17 +127,25 @@ class OutputDefsYaml(@JsonProperty("outputDefs") jDefs: java.util.List[OutputDef
val definitions = jDefs.asScala.toList
}
class OutputDefinitionYaml(@JsonProperty("protocol") val prototcol: String,
class OutputDefinitionYaml(@JsonProperty("protocol") val protocol: String,
@JsonProperty("host") val host: String,
@JsonProperty("port") val port: Int,
@JsonProperty("user") val user: String,
@JsonProperty("pass") val pass: String,
@JsonProperty("routeType") val routeType: String,
@JsonProperty("name") val name: String,
@JsonProperty("format") val format: String,
@JsonProperty("fields") jFields: java.util.List[OutputFieldYaml]) {
require(prototcol != null)
require(host != null)
require(port > 0)
require(routeType != null && (routeType == "topic" || routeType == "queue"))
require(protocol != null && protocol.nonEmpty)
require(host != null && host.nonEmpty)
require(
if (protocol != null && (protocol != "stomp" && protocol != "rabbitMq"))
routeType != null && (routeType == "topic" || routeType == "queue")
else true)
require(
if (protocol != "stomp" && protocol != "rabbitMq")
port > 0
else true)
require(name != null)
require(format != null)
require(jFields != null && jFields.size() > 1)
......@@ -141,6 +156,6 @@ class OutputFieldYaml(@JsonProperty("name") val name: String,
@JsonProperty("description") val description: String = "",
@JsonProperty("inputFieldName") val inputFieldName: String,
@JsonProperty("dateFmtStr") val dateFmtStr: String) {
require(name != null)
require(inputFieldName != null)
require(name != null && name.nonEmpty)
require(inputFieldName != null && inputFieldName.nonEmpty)
}
......@@ -22,7 +22,7 @@ trait AkkaTestUtils extends UnitSpec {
*/
def akkaTest(name: String)(body: => Unit) {
test(name, AkkaTest){
system = ActorSystem("my-quartz-system")
system = ActorSystem("test-system")
try {
body
}
......
......@@ -19,7 +19,7 @@ class ScheduleAndNotifyTest extends SparkTestUtils with BeforeAndAfter {
val dateFmtStr = "MM-dd-yyyy HH:mm:ss"
val dateFormat = new SimpleDateFormat(dateFmtStr)
val od = new OutputDefinition("vm", "localhost", 61616, "topic", "test",
val od = new OutputDefinition("vm", "localhost", 61616, "admin", "admin", "topic", "test",
Set[OutputField](
new IntOutput(name = "intOutput", inputField = new IntDimension(name = "intDim", position = 4)),
new DateOutput(name = "dateOutput", inputField = new DateTemporal(name = "dateTemporal",
......
......@@ -51,57 +51,57 @@ class OutputDefinitionTest extends UnitSpec {
test("OutputDefinition constructor should throw an IllegalArgumentException parameters are not correct") {
an [IllegalArgumentException] should be thrownBy {
new OutputDefinition(null, null, 0, null, null, null, null)
new OutputDefinition(null, null, 0, null, null, null, null, null, null)
}
an [IllegalArgumentException] should be thrownBy {
new OutputDefinition(null, "test", 1, routeTypeTopic, "test", fields, OutputEventFormatters.jsonFormat)
new OutputDefinition(null, "test", 1, null, null, routeTypeTopic, "test", fields, OutputEventFormatters.jsonFormat)
}
an [IllegalArgumentException] should be thrownBy {
new OutputDefinition("", "test", 1, routeTypeQueue, "test", fields, OutputEventFormatters.jsonFormat)
new OutputDefinition("", "test", 1, null, null, routeTypeQueue, "test", fields, OutputEventFormatters.jsonFormat)
}
an [IllegalArgumentException] should be thrownBy {
new OutputDefinition("tcp", null, 1, routeTypeQueue, "test", fields, OutputEventFormatters.jsonFormat)
new OutputDefinition("tcp", null, 1, null, null, routeTypeQueue, "test", fields, OutputEventFormatters.jsonFormat)
}
an [IllegalArgumentException] should be thrownBy {
new OutputDefinition("nio", "", 1, routeTypeTopic, "test", fields, OutputEventFormatters.jsonFormat)
new OutputDefinition("nio", "", 1, null, null, routeTypeTopic, "test", fields, OutputEventFormatters.jsonFormat)
}
an [IllegalArgumentException] should be thrownBy {
new OutputDefinition("vm", "localhost", 0, routeTypeQueue, "test", fields, OutputEventFormatters.jsonFormat)
new OutputDefinition("vm", "localhost", 0, null, null, routeTypeQueue, "test", fields, OutputEventFormatters.jsonFormat)
}
an [IllegalArgumentException] should be thrownBy {
new OutputDefinition("vm", "localhost", 1, null, "test", fields, OutputEventFormatters.jsonFormat)
new OutputDefinition("vm", "localhost", 1, null, null, null, "test", fields, OutputEventFormatters.jsonFormat)
}
an [IllegalArgumentException] should be thrownBy {
new OutputDefinition("tcp", "localhost", 1000, routeTypeTopic, null, fields, OutputEventFormatters.jsonFormat)
new OutputDefinition("tcp", "localhost", 1000, null, null, routeTypeTopic, null, fields, OutputEventFormatters.jsonFormat)
}
an [IllegalArgumentException] should be thrownBy {
new OutputDefinition("tcp", "localhost", 1000, routeTypeQueue, "", fields, OutputEventFormatters.jsonFormat)
new OutputDefinition("tcp", "localhost", 1000, null, null, routeTypeQueue, "", fields, OutputEventFormatters.jsonFormat)
}
an [IllegalArgumentException] should be thrownBy {
new OutputDefinition("foo", "localhost", 1000, routeTypeTopic, "test", null, OutputEventFormatters.jsonFormat)
new OutputDefinition("foo", "localhost", 1000, null, null, routeTypeTopic, "test", null, OutputEventFormatters.jsonFormat)
}
an [IllegalArgumentException] should be thrownBy {
new OutputDefinition("foo", "localhost", 1000, routeTypeQueue, "test", Set[OutputField](), OutputEventFormatters.jsonFormat)
new OutputDefinition("foo", "localhost", 1000, null, null, routeTypeQueue, "test", Set[OutputField](), OutputEventFormatters.jsonFormat)
}
an [IllegalArgumentException] should be thrownBy {
new OutputDefinition("bar", "localhost", 1000, routeTypeTopic, "test", fields, null)
new OutputDefinition("bar", "localhost", 1000, null, null, routeTypeTopic, "test", fields, null)
}
an [IllegalArgumentException] should be thrownBy {
// Input has two identical input keys
new OutputDefinition("tcp", "localhost", 1000, routeTypeQueue, "test", badFields1, OutputEventFormatters.jsonFormat)
new OutputDefinition("tcp", "localhost", 1000, null, null, routeTypeQueue, "test", badFields1, OutputEventFormatters.jsonFormat)
}
an [IllegalArgumentException] should be thrownBy {
// Input has two identical output keys
new OutputDefinition("tcp", "localhost", 1000, routeTypeTopic, "test", badFields2, OutputEventFormatters.jsonFormat)
new OutputDefinition("tcp", "localhost", 1000, null, null, routeTypeTopic, "test", badFields2, OutputEventFormatters.jsonFormat)
}
an [IllegalArgumentException] should be thrownBy {
// Input has two identical output keys
new OutputDefinition("tcp", "localhost", 1000, routeTypeQueue, "test", badFields3, OutputEventFormatters.jsonFormat)
new OutputDefinition("tcp", "localhost", 1000, null, null, routeTypeQueue, "test", badFields3, OutputEventFormatters.jsonFormat)
}
}
test("OutputDefinition constructor should create an object") {
val od = new OutputDefinition(protocol, host, port, routeTypeTopic, destName, fields, OutputEventFormatters.jsonFormat)
val od = new OutputDefinition(protocol, host, port, null, null, routeTypeTopic, destName, fields, OutputEventFormatters.jsonFormat)
assert(protocol == od.protocol)
assert(host == od.host)
assert(port == od.port)
......@@ -111,7 +111,7 @@ class OutputDefinitionTest extends UnitSpec {
}
test("remap method should take elements common to a & b and remap to b's keys") {
val od = new OutputDefinition(protocol, host, port, routeTypeQueue, destName, fields, OutputEventFormatters.jsonFormat)
val od = new OutputDefinition(protocol, host, port, null, null, routeTypeQueue, destName, fields, OutputEventFormatters.jsonFormat)
val remapped = od.remap(event.toMap)
assert(4 == remapped.size)
assert(event.get(f1.inputField.name).get == remapped.get(f1.name).get)
......@@ -121,7 +121,7 @@ class OutputDefinitionTest extends UnitSpec {
}
test("generate method should create a JSON string") {
val od = new OutputDefinition(protocol, host, port, routeTypeTopic, destName, fields, OutputEventFormatters.jsonFormat)
val od = new OutputDefinition(protocol, host, port, null, null, routeTypeTopic, destName, fields, OutputEventFormatters.jsonFormat)
val strVal = od.convert(event.toMap).asInstanceOf[String]
assert(strVal != null)
val jsonInput = new StreamInput(new ByteArrayInputStream(strVal.getBytes))
......@@ -136,7 +136,7 @@ class OutputDefinitionTest extends UnitSpec {
Velocity.init(OutputEventFormatters.velocityProps)
val ctx = new VelocityContext()
val template = Velocity.getTemplate("OutputDefSpec.vm")
val od = new OutputDefinition(protocol, host, port, routeTypeQueue, destName, fields,
val od = new OutputDefinition(protocol, host, port, null, null, routeTypeQueue, destName, fields,
OutputEventFormatters.velocityFormat(ctx, template))
val convertedValue = od.convert(event.toMap).asInstanceOf[String]
assert(convertedValue != null)
......@@ -244,4 +244,6 @@ class OutputDefinitionTest extends UnitSpec {
}
})
}
// TODO - add in tests for RabbitMQ and Stomp configurations
}
......@@ -25,7 +25,7 @@ class OutputDefsYamlTest extends UnitSpec {
assert(config.definitions.size == 2)
val def1 = config.definitions(0)
assert(def1.prototcol == "vm")
assert(def1.protocol == "vm")
assert(def1.host == "localhost")
assert(def1.port == 61616)
assert(def1.routeType == "topic")
......@@ -62,7 +62,7 @@ class OutputDefsYamlTest extends UnitSpec {
assert(def1.fields(6).dateFmtStr == null)
val def2 = config.definitions(1)
assert(def2.prototcol == "vm")
assert(def2.protocol == "vm")
assert(def2.host == "localhost")
assert(def2.port == 61616)
assert(def2.routeType == "topic")
......@@ -99,7 +99,7 @@ class OutputDefsYamlTest extends UnitSpec {
assert(def2.fields(6).dateFmtStr == null)
}
test("Missing protocol") {
test("Not stomp or RMQ - Missing protocol") {
val yaml =
"""outputDefs:
- host: localhost
......@@ -122,7 +122,7 @@ class OutputDefsYamlTest extends UnitSpec {
}
}
test("Missing host") {
test("Not stomp or RMQ - Missing host") {
val yaml =
"""outputDefs:
- protocol: vm
......@@ -145,7 +145,7 @@ class OutputDefsYamlTest extends UnitSpec {
}
}
test("Missing port") {
test("Not stomp or RMQ - Missing port") {
val yaml =
"""outputDefs:
- protocol: vm
......@@ -169,7 +169,7 @@ class OutputDefsYamlTest extends UnitSpec {
}
test("Zero port") {
test("Not stomp or RMQ - Zero port") {
val yaml =
"""outputDefs:
- protocol: vm
......@@ -192,7 +192,7 @@ class OutputDefsYamlTest extends UnitSpec {
mapper.readValue(is, classOf[OutputDefsYaml])
}
}
test("Missing routeType") {
test("Not stomp or RMQ - Missing routeType") {
val yaml =
"""outputDefs:
- protocol: vm
......@@ -215,7 +215,7 @@ class OutputDefsYamlTest extends UnitSpec {
}
}
test("Incorrect route type (not 'topic' || 'queue')") {
test("Not stomp or RMQ - Incorrect route type (not 'topic' || 'queue')") {
val yaml =
"""outputDefs:
- protocol: vm
......@@ -352,4 +352,6 @@ class OutputDefsYamlTest extends UnitSpec {
mapper.readValue(is, classOf[OutputDefsYaml])
}
}
// TODO - add in tests for RabbitMQ and Stomp configurations
}
package com.cablelabs.eventgen.model
import com.cablelabs.eventgen.akka.AkkaTestUtils
import com.rabbitmq.client.{ConnectionFactory, QueueingConsumer}
/**
* Not a real test. Originally used to test against a Heroku cloud based broker hence the cryptic endpoints
* If this test is not extended (only if RMQ supports running in JVM) then it can be removed
*/
class RabbitMqRouterTest extends AkkaTestUtils {
// val uri = "amqp://zutmawnt:ywqbdohUdNAUsX6Ej4ZwCnD9ShZRm4Ns@tiger.cloudamqp.com/zutmawnt"
// val uri = "amqp://zutmawnt:ywqbdohUdNAUsX6Ej4ZwCnD9ShZRm4Ns@tiger.cloudamqp.com/zutmawnt"
// val uri = "amqp://localhost"
/*
akkaTest("RMQ Actor") {
val channelName = "hello-test-RMQ-Actor"
val host = "tiger.cloudamqp.com"
val user = "zutmawnt"
val pass = "ywqbdohUdNAUsX6Ej4ZwCnD9ShZRm4Ns"
val queueName = "RabbitMqRouterTest"
val actor = system.actorOf(Props(classOf[RabbitMqRouter], host, user, pass, queueName))
val uri = s"amqp://$user:$pass@$host/$user"
val channel = getChannel(uri, queueName)
val consumer = new QueueingConsumer(channel)
channel.basicConsume(channelName, true, consumer)
Thread.sleep(1000)
actor ! s"hello-test2a - ${System.currentTimeMillis()}"
actor ! s"hello-test2b - ${System.currentTimeMillis()}"
actor ! s"hello-test2c - ${System.currentTimeMillis()}"
Thread.sleep(3000)
}
*/
def getChannel(uri: String, queueName: String) = {
val factory = new ConnectionFactory()
factory.setUri(uri)
val connection = factory.newConnection()
val channel = connection.createChannel()
channel.queueDeclare(queueName, false, false, false, null)
channel
}
}
object Pub extends App {
val uri = "amqp://zutmawnt:ywqbdohUdNAUsX6Ej4ZwCnD9ShZRm4Ns@tiger.cloudamqp.com/zutmawnt"
// val uri = "amqp://localhost"
val factory = new ConnectionFactory()
factory.setUri(uri)
val connection = factory.newConnection()
val channel = connection.createChannel()
channel.queueDeclare("RabbitMqRouterTest", false, false, false, null)
val message = s"Hello from Pub ${System.currentTimeMillis()}!"
channel.basicPublish("", "hello", null, message.getBytes)
System.out.println(" [x] Sent '" + message + "'")
}
object Sub extends App {
val uri = "amqp://zutmawnt:ywqbdohUdNAUsX6Ej4ZwCnD9ShZRm4Ns@tiger.cloudamqp.com/zutmawnt"
// val uri = "amqp://admin:admin@tiger.cloudamqp.com"
// val uri = "amqp://localhost"
val factory = new ConnectionFactory()
factory.setUri(uri)
val connection = factory.newConnection()
val channel = connection.createChannel()
// channel.queueDeclare("RabbitMqRouterTest", false, false, false, null)
// channel.queueDeclare("CMHealthXML", false, false, false, null)
channel.queueDeclare("CMHealthJson", false, false, false, null)
val consumer = new QueueingConsumer(channel)
channel.basicConsume("hello", true, consumer)
while (true) {
val delivery = consumer.nextDelivery()
val message = new String(delivery.getBody)
System.out.println(s" [x] Received '$message' at ${System.currentTimeMillis()}")
}
}
sparkUri: local[8]
appName: CM-Analyzer-small
schemaUri: testData/cm/definition/cm-constant-input-fact-pred-test.yaml
outputDefUri: testData/cm/definition/cm-out-integration.yaml
#outputDefUri: testData/cm/definition/cm-out-integration-RMQ.yaml
fileDelim: "|"
eventsUri: testData/cm/events/cm_1a.txt
eventTimeOffset: 123
sendPastEvents: false
useNow: true
numSchedulerThreads: 10
seedFilters:
- 99:2b:b2:11:4k:k4
- 99:46:8g:b9:10:9z
\ No newline at end of file
outputDefs:
- protocol: rabbitMq
host: tiger.cloudamqp.com
user: zutmawnt
pass: ywqbdohUdNAUsX6Ej4ZwCnD9ShZRm4Ns
name: CMHealthXML
format: testData/cm/definition/amdocs-cm.vm