Commit af6e9c1e authored by Steven Pisarski's avatar Steven Pisarski

Analyzer and generator production configuration files.

parent 88cba7cd
......@@ -263,8 +263,9 @@ class SparkAnalyzer(val data: RDD[(String, Date, Map[String, Any])], val inputDe
logger.info("Retrieving the training sets for each fact prediction algorithm")
data.flatMap[(LabeledPoint, Any)](p => {
val features = inputDef.factAlgoFeatures(p._3, fact)
Seq((LabeledPoint(inputDef.fieldMap.get(name).get.mlTrainingValue(p._3), new DenseVector(features.toArray)),
fact.eventValue(p._3)))
Seq((LabeledPoint(inputDef.fieldMap.get(name).get.mlTrainingValue(p._3),
new DenseVector(features.toArray)),
fact.eventValue(p._3)))
})
case _ =>
data.context.parallelize(List[(LabeledPoint, Any)]())
......
......@@ -3,7 +3,9 @@ package com.cablelabs.eventgen.model
import java.text.SimpleDateFormat
import java.util.Date
import com.typesafe.scalalogging.slf4j.Logger
import org.joda.time.DateTime
import org.slf4j.LoggerFactory
/**
* Abstract class for all event fields
......@@ -48,6 +50,7 @@ class DateTemporal(override val name: String, override val description: String =
dateFmtStr: String, override val factPosition: Int)
extends Temporal(name, description, denormFields, algoDef, factPosition) with DateRole {
val dateFormat = new SimpleDateFormat(dateFmtStr)
private[this] def logger = Logger(LoggerFactory.getLogger("DateTemporal"))
/**
* Returns an ordered map where the key contains the name of the denormalized field
......@@ -55,7 +58,29 @@ class DateTemporal(override val name: String, override val description: String =
* @return - the map
*/
override def denormalize(event: Map[String, Any]): Seq[(String, Long)] = {
val date = eventValue[Date](event)
/* TODO FIXME
This should never occur but I have seen a few exceptions where None is being returned from the Map but very
sporadically only when running within a Spark cluster. Have never observed in dev
"""ERROR DateTemporal: Unable to find date field with name poll_date in payload
Map(mac -> 99:2b:b2:11:4k:k4, lng -> -94.413, resets_cnt -> 2, cmts -> ziu92.chbglghbghig.qy.chbglgh,
downstream_snr_rt -> 37.0, ds_fec_uncorrected_cnt -> 175539, lost_syncs_cnt -> 36,
ds_fec_unerrored_cnt -> 4200663303, upstream_transmit_power_num -> 45.5, poll_date -> None,
downstream_receive_power_num -> -2.9, t3_timeouts_cnt -> 67, node -> 40I, t4_timeouts_cnt -> 0, lat -> 39.111,
ds_fec_corrected_cnt -> 66683)}. Using now for the date"""
Where the line in the file looks like:
ziu92.chbglghbghig.qy.chbglgh|40I|99:2b:b2:11:4k:k4|39.111|-94.413|-2.9|45.5|37|67|0|36|2|66683|175539|4200663303|07-23-2014-06:04:59
and the last field contains the properly formatted date string
See InputDefinition.fromStringArr() & InputDefinition.getFieldValue which may shed some light on the problem
*/
val date = try eventValue[Date](event) catch {
case e: Exception =>
logger.error(s"Unable to find date field with name $name in payload $event}. Using now for the date", e)
new Date()
}
val dt = new DateTime(date)
// TODO - try and make more functional
......
......@@ -9,6 +9,6 @@ import org.apache.spark.SparkContext
object JmsConsumer extends App {
new SparkContext("local[4]", "test")
val notifier = new Notifier
TestActors.activeMqConsumer("tcp", "bda-active01", 61616, "CMHealthXML", "topic", notifier)
TestActors.activeMqConsumer("tcp", "bda-active01", 61616, "CMConnectionInfoXML", "topic", notifier)
Thread.sleep(10000)
}
......@@ -29,7 +29,7 @@ class OutputDefsYamlTest extends UnitSpec {
assert(def1.host == "localhost")
assert(def1.port == 61616)
assert(def1.routeType == "topic")
assert(def1.name == "CMHealthXML")
assert(def1.name == "CMConnectionInfoXML")
assert(def1.format == "testData/cm/definition/amdocs-cm.vm")
assert(def1.fields.size == 7)
assert(def1.fields(0).name == "EventTimeStamp")
......@@ -66,7 +66,7 @@ class OutputDefsYamlTest extends UnitSpec {
assert(def2.host == "localhost")
assert(def2.port == 61616)
assert(def2.routeType == "topic")
assert(def2.name == "CMHealthJson")
assert(def2.name == "CMConnectionInfoJson")
assert(def2.format == "json")
assert(def2.fields.size == 7)
assert(def2.fields(0).name == "EventTimeStamp")
......
......@@ -3,7 +3,7 @@ outputDefs:
host: tiger.cloudamqp.com
user: zutmawnt
pass: ywqbdohUdNAUsX6Ej4ZwCnD9ShZRm4Ns
name: CMHealthXML
name: CMConnectionInfoXML
format: testData/cm/definition/amdocs-cm.vm
fields:
- name: EventTimeStamp
......@@ -32,7 +32,7 @@ outputDefs:
host: tiger.cloudamqp.com
user: zutmawnt
pass: ywqbdohUdNAUsX6Ej4ZwCnD9ShZRm4Ns
name: CMHealthJson
name: CMConnectionInfoJson
format: json
fields:
- name: EventTimeStamp
......
......@@ -3,7 +3,7 @@ outputDefs:
host: localhost
port: 61616
routeType: topic
name: CMHealthXML
name: CMConnectionInfoXML
format: testData/cm/definition/amdocs-cm.vm
fields:
- name: EventTimeStamp
......@@ -34,7 +34,7 @@ outputDefs:
routeType: topic
user: guest
pass: guest
name: CMHealthJson
name: CMConnectionInfoJson
format: json
fields:
- name: EventTimeStamp
......
......@@ -3,7 +3,7 @@ outputDefs:
host: localhost
port: 61616
routeType: topic
name: CMHealthXML
name: CMConnectionInfoXML
format: testData/cm/definition/amdocs-cm.vm
fields:
- name: EventTimeStamp
......@@ -32,7 +32,7 @@ outputDefs:
host: localhost
port: 61616
routeType: topic
name: CMHealthJson
name: CMConnectionInfoJson
format: json
fields:
- name: EventTimeStamp
......
schemaUri: hdfs://bda-hdfs01/tmp/cm/conf/input/cm-constant-prod-input.yaml
fileDelim: "|"
eventsUri: hdfs://bda-hdfs01/tmp/cm/data
temporalTrainingSetUri: hdfs://bda-hdfs01/tmp/cm/analysis/temporalTrainingSet.rdd
factTrainingSetUri: hdfs://bda-hdfs01/tmp/cm/analysis/factTrainingSets
seedEventsUri: hdfs://bda-hdfs01/tmp/cm/analysis/seedEvents.rdd
eventCountUri: hdfs://bda-hdfs01/tmp/cm/analysis/eventCount.txt
dimCountUri: hdfs://bda-hdfs01/tmp/cm/analysis/dimCount.txt
dimEventsCountUri: hdfs://bda-hdfs01/tmp/cm/analysis/dimEventsCount.txt
factValuesUri: hdfs://bda-hdfs01/tmp/cm/analysis/factValues.txt
temporalTrainingMetricsUri: hdfs://bda-hdfs01/tmp/cm/analysis/temporalTrainingMetrics.txt
factTrainingMetricsUri: hdfs://bda-hdfs01/tmp/cm/analysis/factTrainingMetrics
numThreads: 20
\ No newline at end of file
schemaUri: hdfs://bda-hdfs01/tmp/cm/conf/input/cm-constant-prod-input.yaml
fileDelim: "|"
eventsUri: hdfs://bda-hdfs01/tmp/cm/data
factTrainingMetricsUri: hdfs://bda-hdfs01/tmp/cm/analysis/factTrainingMetrics-all
numThreads: 20
\ No newline at end of file
schemaUri: hdfs://bda-hdfs01/tmp/cm/conf/input/cm-constant-prod-input.yaml
fileDelim: "|"
eventsUri: hdfs://bda-hdfs01/tmp/cm/data/cm_1.txt
factTrainingMetricsUri: hdfs://bda-hdfs01/tmp/cm/analysis/factTrainingMetrics-small
numThreads: 20
\ No newline at end of file
schemaUri: hdfs://bda-hdfs01/tmp/cm/conf/input/cm-constant-prod-input.yaml
fileDelim: "|"
eventsUri: hdfs://bda-hdfs01/tmp/cm/data
temporalTrainingSetUri: hdfs://bda-hdfs01/tmp/cm/analysis/temporalTrainingSet.rdd
factTrainingSetUri: hdfs://bda-hdfs01/tmp/cm/analysis/factTrainingSets
seedEventsUri: hdfs://bda-hdfs01/tmp/cm/analysis/seedEvents.rdd
numThreads: 20
\ No newline at end of file
schemaUri: hdfs://bda-hdfs01/tmp/cm/conf/input/cm-constant-prod-input.yaml
fileDelim: "|"
eventsUri: hdfs://bda-hdfs01/tmp/cm/data/cm_1.txt
temporalTrainingSetUri: hdfs://bda-hdfs01/tmp/cm/analysis-small/temporalTrainingSet.rdd
factTrainingSetUri: hdfs://bda-hdfs01/tmp/cm/analysis-small/factTrainingSets
seedEventsUri: hdfs://bda-hdfs01/tmp/cm/analysis-small/seedEvents.rdd
numThreads: 20
\ No newline at end of file
schemaUri: hdfs://bda-hdfs01/tmp/cm/conf/input/cm-constant-prod-input.yaml
outputDefUri: hdfs://bda-hdfs01/tmp/cm/conf/output/cm-out.yaml
temporalTrainingSetUri: hdfs://bda-hdfs01/tmp/cm/analysis/temporalTrainingSet.rdd
factTrainingSetUri: hdfs://bda-hdfs01/tmp/cm/analysis/factTrainingSet.rdd
seedEventsUri: hdfs://bda-hdfs01/tmp/cm/analysis/seedEvents.rdd
sendPastEvents: true
useNow: true
numSchedulerThreads: 10
seedFilters:
- 99:2b:b2:11:4k:k4
- 99:46:8g:b9:10:9z
- 28:zk:f9:22:22:54
- ii:z8:34:20:06:z4
- ii:31:zb:i5:90:98
\ No newline at end of file
schemaUri: hdfs://bda-hdfs01/tmp/cm/conf/input/cm-constant-prod-input.yaml
outputDefUri: hdfs://bda-hdfs01/tmp/cm/conf/output/cm-out.yaml
temporalTrainingSetUri: hdfs://bda-hdfs01/tmp/cm/analysis/temporalTrainingSet.rdd
factTrainingSetUri: hdfs://bda-hdfs01/tmp/cm/analysis/factTrainingSet.rdd
seedEventsUri: hdfs://bda-hdfs01/tmp/cm/analysis/seedEvents.rdd
sendPastEvents: true
useNow: true
numSchedulerThreads: 10000
schemaUri: hdfs://bda-hdfs01/tmp/cm/conf/input/cm-constant-short-input.yaml
outputDefUri: hdfs://bda-hdfs01/tmp/cm/conf/output/cm-out.yaml
temporalTrainingSetUri: hdfs://bda-hdfs01/tmp/cm/analysis/temporalTrainingSet.rdd
factTrainingSetUri: hdfs://bda-hdfs01/tmp/cm/analysis/factTrainingSet.rdd
seedEventsUri: hdfs://bda-hdfs01/tmp/cm/analysis/seedEvents.rdd
sendPastEvents: true
useNow: true
numSchedulerThreads: 10
seedFilters:
- 99:2b:b2:11:4k:k4
- 99:46:8g:b9:10:9z
- 28:zk:f9:22:22:54
- ii:z8:34:20:06:z4
- ii:31:zb:i5:90:98
\ No newline at end of file
schemaUri: hdfs://bda-hdfs01/tmp/cm/conf/input/cm-constant-short-input.yaml
outputDefUri: hdfs://bda-hdfs01/tmp/cm/conf/output/cm-out.yaml
temporalTrainingSetUri: hdfs://bda-hdfs01/tmp/cm/analysis/temporalTrainingSet.rdd
factTrainingSetUri: hdfs://bda-hdfs01/tmp/cm/analysis/factTrainingSet.rdd
seedEventsUri: hdfs://bda-hdfs01/tmp/cm/analysis/seedEvents.rdd
sendPastEvents: true
useNow: true
numSchedulerThreads: 10000
schemaUri: hdfs://bda-hdfs01/tmp/cm/conf/input/cm-constant-short-input.yaml
outputDefUri: hdfs://bda-hdfs01/tmp/cm/conf/output/cm-out.yaml
temporalTrainingSetUri: hdfs://bda-hdfs01/tmp/cm/analysis-small/temporalTrainingSet.rdd
factTrainingSetUri: hdfs://bda-hdfs01/tmp/cm/analysis-small/factTrainingSets
seedEventsUri: hdfs://bda-hdfs01/tmp/cm/analysis-small/seedEvents.rdd
sendPastEvents: true
useNow: true
numSchedulerThreads: 10
seedFilters:
- 99:2b:b2:11:4k:k4
- 99:46:8g:b9:10:9z
- 28:zk:f9:22:22:54
- ii:z8:34:20:06:z4
- ii:31:zb:i5:90:98
\ No newline at end of file
temporal:
name: poll_date
description: Date of CM poll
type: date
dateFormat: MM-dd-yyyy-HH:mm:ss
factPosition: -1
denormFields:
- day_of_week
- day_of_month
- day_of_year
- hour_of_day
algo:
name: constant
constType: integer
constVal: 28800000
dimensions:
- name: cmts
description: The CMTS name
type: string
position: 10
- name: node
description: The Node name
type: string
position: 20
- name: mac
description: The MAC address
type: string
position: 30
- name: lat
description: The geo latitude
type: float
position: 40
- name: lng
description: The geo longitude
type: float
position: 50
facts:
- name: downstream_receive_power_num
description: fact 1-out
type: float
position: 140
algo:
name: linearRegression
flatten:
mode: log
base: 2
iterations: 3
polyDegree: 3
iterations: 75
stepSize: 0.002
weights:
- name: day_of_week
weight: 10
- name: day_of_month
weight: 10
- name: day_of_year
weight: 10
- name: hour_of_day
weight: 50
- name: month_of_year
weight: 25
- name: year
weight: 10
- name: cmts
weight: 150
- name: node
weight: 200
- name: mac
weight: 250
- name: lat
weight: 250
- name: lng
weight: 250
- name: upstream_transmit_power_num
description: fact 2-out
type: float
position: 110
algo:
name: linearRegression
flatten:
mode: log
base: 5
iterations: 2
polyDegree: 2
iterations: 75
stepSize: 0.04
weights:
- name: day_of_week
weight: 10
- name: day_of_month
weight: 10
- name: day_of_year
weight: 10
- name: hour_of_day
weight: 100
- name: month_of_year
weight: 50
- name: year
weight: 10
- name: cmts
weight: 100
- name: node
weight: 100
- name: mac
weight: 150
- name: lat
weight: 150
- name: lng
weight: 150
- name: downstream_snr_rt
description: fact 3-out
type: float
position: 130
algo:
name: linearRegression
flatten:
mode: log
base: 5
iterations: 2
polyDegree: 2
iterations: 75
stepSize: 0.05
weights:
- name: day_of_week
weight: 10
- name: day_of_month
weight: 10
- name: day_of_year
weight: 10
- name: hour_of_day
weight: 100
- name: month_of_year
weight: 50
- name: year
weight: 10
- name: cmts
weight: 100
- name: node
weight: 100
- name: mac
weight: 150
- name: lat
weight: 150
- name: lng
weight: 150
- name: t3_timeouts_cnt
description: fact 4
type: integer
position: 40
algo:
name: linearRegression
flatten:
mode: log
base: 5
iterations: 2
polyDegree: 4
iterations: 30
stepSize: 0.02
weights:
- name: day_of_week
weight: 10
- name: day_of_month
weight: 10
- name: day_of_year
weight: 10
- name: hour_of_day
weight: 100
- name: month_of_year
weight: 50
- name: year
weight: 10
- name: cmts
weight: 100
- name: node
weight: 100
- name: mac
weight: 150
- name: lat
weight: 150
- name: lng
weight: 150
- name: t4_timeouts_cnt
description: fact 4b
type: integer
position: 41
algo:
name: linearRegression
flatten:
mode: log
base: 5
iterations: 2
polyDegree: 4
iterations: 30
stepSize: 0.02
weights:
- name: day_of_week
weight: 10
- name: day_of_month
weight: 10
- name: day_of_year
weight: 10
- name: hour_of_day
weight: 100
- name: month_of_year
weight: 50
- name: year
weight: 10
- name: cmts
weight: 100
- name: node
weight: 100
- name: mac
weight: 150
- name: lat
weight: 150
- name: lng
weight: 150
- name: lost_syncs_cnt
description: fact 5
type: integer
position: 50
algo:
name: linearRegression
flatten:
mode: log
base: 5
iterations: 3
polyDegree: 3
iterations: 50
stepSize: 0.02
weights:
- name: day_of_week
weight: 10
- name: day_of_month
weight: 10
- name: day_of_year
weight: 10
- name: hour_of_day
weight: 100
- name: month_of_year
weight: 50
- name: year
weight: 10
- name: cmts
weight: 100
- name: node
weight: 100
- name: mac
weight: 150
- name: lat
weight: 150
- name: lng
weight: 150
- name: resets_cnt
description: fact 6
type: integer
position: 60
algo:
name: linearRegression
flatten:
mode: log
base: 5
iterations: 2
polyDegree: 2
iterations: 70
stepSize: 0.025
weights:
- name: day_of_week
weight: 10
- name: day_of_month
weight: 10
- name: day_of_year
weight: 10
- name: hour_of_day
weight: 100
- name: month_of_year
weight: 50
- name: year
weight: 10
- name: cmts
weight: 100
- name: node
weight: 100
- name: mac
weight: 150
- name: lat
weight: 150
- name: lng
weight: 150
- name: ds_fec_corrected_cnt
description: fact 7
type: integer
position: 70
algo:
name: linearRegression
flatten:
mode: log
base: 5
iterations: 2
polyDegree: 4
iterations: 30
stepSize: 0.02
weights:
- name: day_of_week
weight: 10
- name: day_of_month
weight: 10
- name: day_of_year
weight: 10
- name: hour_of_day
weight: 100
- name: month_of_year
weight: 50
- name: year
weight: 10
- name: cmts
weight: 100
- name: node
weight: 100
- name: mac
weight: 150
- name: lat
weight: 150
- name: lng
weight: 150
- name: ds_fec_uncorrected_cnt
description: fact 8
type: integer
position: 80
algo:
name: linearRegression
flatten:
mode: log
base: 5
iterations: 2
polyDegree: 4
iterations: 30
stepSize: 0.02
weights:
- name: day_of_week
weight: 10
- name: day_of_month
weight: 10
- name: day_of_year
weight: 10
- name: hour_of_day
weight: 100
- name: month_of_year
weight: 50
- name: year
weight: 10
- name: cmts
weight: 100
- name: node
weight: 100
- name: mac
weight: 150
- name: lat
weight: 150
- name: lng
weight: 150
- name: ds_fec_unerrored_cnt
description: fact 9
type: integer
position: 90
algo:
name: linearRegression
flatten:
mode: log
base: 5
iterations: 2
polyDegree: 4
iterations: 30
stepSize: 0.02
weights:
- name: day_of_week
weight: 10
- name: day_of_month
weight: 10
- name: day_of_year
weight: 10
- name: hour_of_day
weight: 100
- name: month_of_year
weight: 50
- name: year
weight: 10
- name: cmts
weight: 100
- name: node
weight: 100
- name: mac
weight: 150
- name: lat
weight: 150
- name: lng
weight: 150
temporal:
name: poll_date
description: Date of CM poll
type: date<