package com.cablelabs.eventgen.model

import java.io.InputStream
import java.security.MessageDigest

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import org.json4s.JsonAST.{JObject, JValue}
import org.json4s.StringInput
import org.json4s.native.JsonMethods
import org.json4s.native.JsonMethods._

/**
 * Object used to create InputDefinition objects
 */
object InputDefinition {
17

18
  /**
19 20 21
   * Retrieves the InputDefinition by parsing a YAML configuration file
   * @param is - the stream to the YAML file
   * @return - the input definition object
22
   */
23 24 25 26
  def inputDefinition(is: InputStream): InputDefinition = {
    val mapper = new ObjectMapper(new YAMLFactory())
    val config = mapper.readValue(is, classOf[InputDefinitionYaml])
    config.inputDef
27
  }
28

29
}

/**
32
 * Class defining the input fields used for initializing the event Generator
33 34 35 36
 * @param temporal - the field that drives the temporal aspect of the stream
 * @param dimensionSet - the event's dimension fields
 * @param factSet - the event's fact fields
 */
37
class InputDefinition(val temporal: Temporal, val dimensionSet: Set[Dimension], val factSet: Set[Fact])
38 39
  extends Serializable {

40 41 42 43
  require(temporal != null)
  require(dimensionSet != null && dimensionSet.size > 0)
  require(factSet != null && factSet.size > 0)

44 45 46
  /**
   * The Dimensions by name
   */
47
  val dimensions = dimensionSet.map(d => d.name -> d).toMap
48

49
  /**
50
   * Dimensions in priority order
51
   */
52
  val positionalDims = dimensionSet.map(d => d.name -> d.position).toSeq.sortBy( _._2 ).map(d => dimensions(d._1))
53

54
  /**
55
   * Contains all fields where the key is the field's name
56
   */
Steven Pisarski's avatar
Steven Pisarski committed
57 58
  val fieldMap =
    dimensionSet.map(d => d.name -> d).toMap ++ factSet.map(f => f.name -> f).toMap + (temporal.name -> temporal)
59

60 61 62
  /**
   * The Facts by name
   */
63 64 65 66 67 68
  val facts = factSet.map(f => f.name -> f).toMap

  /**
   * Facts in priority order
   */
  val positionalFacts = factSet.map(f => f.name -> f.position).toSeq.sortBy( _._2 ).map(f => facts(f._1))
69 70 71 72 73 74

  /**
   * Returns all of the fact values of an event in order by dependencies
   * @param event - the event to parse
   * @return - an ordered map of all fact values
   */
75
  def factValues(event: Map[String, Any]): Map[String, Any] = facts.map(f => f._1 -> f._2.eventValue(event))
76 77 78 79 80 81

  /**
   * Returns all of the dimension values from an event in priority order
   * @param event - the event to parse
   * @return - an ordered map of all dimension values
   */
82 83
  def dimensionValues(event: Map[String, Any]): Seq[(String, Any)] =
    positionalDims.map(d => d.name -> d.eventValue[Any](event))
84 85 86 87 88 89

  /**
   * Returns an encoded String containing each dimension key/value in priority order
   * @param event - the event to parse
   * @return - a big long String
   */
90 91
  def dimString(event: Map[String, Any]): String =
    positionalDims.map(d => d.name + "::" + d.eventStringValue(event)).filter(_.nonEmpty).mkString("|")
92 93 94 95 96 97

  /**
   * Hashing function below plagurized from http://code-redefined.blogspot.com/2009/05/md5-sum-in-scala.html
   * @param event - the event
   * @return - hopefully a unique value for any unique set of dimension values
   */
98
  def dimHash(event: Map[String, Any]): String = {
99 100 101 102 103 104 105 106
    // TODO - keep an eye on the performance of this algorithm but it is as accurate as using the entire string but
    // 1/2 the size or so (32 character string from >40 characters)
    val md5 = MessageDigest.getInstance("MD5")
    md5.reset()
    md5.update(dimString(event).getBytes)
    md5.digest().map(0xFF & _).map { "%02x".format(_) }.foldLeft(""){_ + _}
  }

107 108 109 110 111 112
  /**
   * Creates an event Map from an array of strings and values converted to the proper data type
   * @param headers - the header values
   * @param values - the event values to parse and convert
   * @return - the mapped event
   */
113
  def fromStringArr(headers: Array[String], values: Array[String]): Map[String, Any] = {
Steven Pisarski's avatar
Steven Pisarski committed
114 115
    require(headers.length == values.length)
    (for (i <- 0 until headers.length) yield headers(i) -> getFieldValue(headers(i), values(i))).toMap
116 117
  }

118 119 120 121 122 123
  /**
   * Returns the field's value of the required type parsed from a String
   * @param name - the name of the field
   * @param value - the field's String value to parse
   * @return - the reference of the proper type only if the field is located else None
   */
Steven Pisarski's avatar
Steven Pisarski committed
124 125 126
  def getFieldValue(name: String, value: String):Any =
    if (fieldMap.get(name) == None) None
    else fieldMap.get(name).get.convert(value)
127 128 129 130 131 132

  /**
   * Converts an event to a json4s JValue JSON object
   * @param event - the event to convert
   * @return - the JSON object
   */
133
  def toJson(event: Map[String, Any]): JValue = new JObject(fieldMap.map(f => f._2.jField(event)).toList)
134 135 136 137 138 139

  /**
   * Converts an event into a JSON String on a single line (not prety)
   * @param event - the event to convert
   * @return - the JSON String
   */
Steven Pisarski's avatar
Steven Pisarski committed
140
  def toJsonString(event: Map[String, Any]): String = JsonMethods.compact(JsonMethods.render(toJson(event)))
141

142
  /**
143
   * Converts a JSON object to a standard event Map[String, Any]
144 145 146
   * @param json - the JSON object to parse
   * @return - the Map
   */
Steven Pisarski's avatar
Steven Pisarski committed
147
  def fromJson(json: JValue): Map[String, Any] = fromJsonStr(JsonMethods.compact(JsonMethods.render(json)))
148

149
  /**
150
   * Converts a JSON payload to a standard event Map[String, Any]
151 152 153
   * @param jsonStr - the JSON document
   * @return - the Map
   */
154
  def fromJsonStr(jsonStr: String): Map[String, Any] = {
Steven Pisarski's avatar
Steven Pisarski committed
155 156 157 158
    // TODO determine how to get rid of this asInstanceOf call
    val values = parse(new StringInput(jsonStr)).values.asInstanceOf[Map[String, Any]]
    val filtered = values.filter(e => fieldMap.get(e._1) != None)
    filtered.map(e => e._1 -> fieldMap.get(e._1).get.convert(e._2.toString))
159
  }
160 161

  /**
162
   * Return the training features for the temporal ML algorithm in the first tuple position and the algo weights
163 164 165
   * @param event - the event to parse
   * @return - the feature set
   */
Steven Pisarski's avatar
Steven Pisarski committed
166
  def temporalAlgoFeatures(event: Map[String, Any]): Seq[Double] =
167
    temporal.algoDef match {
168
      case algoDef:SupervisedTraining => algoFeatures(event, temporal.factPosition, algoDef)
169
      case _ => Seq[Double]()
170
    }
171 172 173 174

  /**
   * Return the training features for a given event and fact attribute by name
   * @param event - the event to parse
175
   * @param fact - the fact field for which the training set will be generated
176 177
   * @return - the features
   */
Steven Pisarski's avatar
Steven Pisarski committed
178
  def factAlgoFeatures(event: Map[String, Any], fact: Fact): Seq[Double] =
179
    fact.algoDef match {
180
      case algoDef:SupervisedTraining => algoFeatures(event, fact.position, algoDef)
181
      case _ => Seq[Double]()
182
    }
183 184

  /**
185
   * Return the training features for a SupervisedTraining algorithm
186
   * @param event - the event to parse
187
   * @param factPosition - generates the feature set up to the index of the fact requested.
Steven Pisarski's avatar
Steven Pisarski committed
188
   *                    When 0, no facts will be included. When < 0, all will be generated
189 190
   * @return - the feature set
   */
191
  private def algoFeatures(event: Map[String, Any], factPosition: Int, algo: SupervisedTraining): Seq[Double] =
192 193
    MachineLearning.polynomial(algo.polyDegree, algo.flatten(
        temporalFeatures(event, algo) ++: dimensionFeatures(event, algo) ++: factFeatures(event, factPosition, algo)))
194

195
  /**
196 197 198
   * Return the temporal training features for a SupervisedTraining algorithm
   * @param event - the event to parse
   * @return - the feature set
199
   */
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231
  private def temporalFeatures(event: Map[String, Any], algo: SupervisedTraining): Seq[Double] =
    temporal.denormalize(event).filter(p => !algo.omitFields.contains(p._1)).map(f => f._2.toDouble)

  /**
   * Return the dimensional training features for a SupervisedTraining algorithm
   * @param event - the event to parse
   * @return - the feature set
   */
  private def dimensionFeatures(event: Map[String, Any], algo: SupervisedTraining): Seq[Double] =
    positionalDims.filter(p => !algo.omitFields.contains(p.name)).map(_.mlTrainingValue(event))

  /**
   * Return the fact training features for a SupervisedTraining algorithm
   * @param event - the event to parse
   * @param factPosition - generates the feature set up to the index of the fact requested.
   *                    When 0, no facts will be included. When < 0, all will be generated
   * @return - the feature set
   */
  private def factFeatures(event: Map[String, Any], factPosition: Int, algo: SupervisedTraining): Seq[Double] =
    positionalFacts.filter(p =>
      if (factPosition < 0) !algo.omitFields.contains(p.name)
      else p.position < factPosition && !algo.omitFields.contains(p.name)
    ).map(_.mlTrainingValue(event))

  /**
   * Returns the machine learning training weights as configured
   * @param field - the field to process
   * @return - the weights only for temporal or fact RegressionDefinitions else and empty Seq will be returned
   */
  def algoWeights(field: AlgorithmRole): Seq[Double] = field match {
    case temporal: Temporal => temporal.algoDef match {
      case regression: RegressionDefinition =>
232
        MachineLearning.polynomialWeights(regression.polyDegree,
233 234 235 236 237
          temporalWeights(regression) ++: dimensionWeights(regression) ++: factWeights(regression, temporal.factPosition))
      case _ => Seq()
    }
    case fact: Fact => fact.algoDef match {
      case regression: RegressionDefinition =>
238
        MachineLearning.polynomialWeights(regression.polyDegree,
239 240
          temporalWeights(regression) ++: dimensionWeights(regression) ++: factWeights(regression, fact.position))
      case _ => Seq()
241
    }
242
    case _ => Seq()
243
  }
244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272

  /**
   * Returns the weights for the temporal features
   * @param regression - the regression definition
   * @return - the weights
   */
  private def temporalWeights(regression: RegressionDefinition): Seq[Double] =
    temporal.denormFields.filter(f => !regression.omitFields.contains(f))
      .map(f => if (regression.weights.get(f) == None) 0d else regression.weights(f))

  /**
   * Returns the weights for the dimension features
   * @param regression - the regression definition
   * @return - the weights
   */
  private def dimensionWeights(regression: RegressionDefinition): Seq[Double] =
    positionalDims.filter(p => !regression.omitFields.contains(p.name))
      .map(f => if (regression.weights.get(f.name) == None) 0d else regression.weights(f.name))

  /**
   * Returns the weights for the fact features
   * @param regression - the regression definition
   * @return - the weights
   */
  private def factWeights(regression: RegressionDefinition, factPosition: Int): Seq[Double] =
    positionalFacts.filter(p =>
      if (factPosition < 0) !regression.omitFields.contains(p.name)
      else p.position < factPosition && !regression.omitFields.contains(p.name))
      .map(f => if (regression.weights.get(f.name) == None) 0d else regression.weights(f.name))
273 274 275 276 277 278 279 280 281 282 283 284 285 286 287
}

import com.fasterxml.jackson.annotation.JsonProperty

import scala.collection.JavaConverters._

/**
 * Used by jackson when parsing the OutputDefinition YAML file
 */
class InputDefinitionYaml(@JsonProperty("temporal") temporal: TemporalFieldYaml,
                          @JsonProperty("dimensions") jDimensions: java.util.List[DimensionFieldYaml],
                          @JsonProperty("facts") jFacts: java.util.List[FactFieldYaml]) {
  require(temporal != null)
  require(jDimensions != null && jDimensions.size() > 0)
  require(jFacts != null && jFacts.size() > 0)
Steven Pisarski's avatar
Steven Pisarski committed
288 289 290
  val inputDef =
    new InputDefinition(temporal.temporalField, jDimensions.asScala.map(f => f.dimField).toSet,
      jFacts.asScala.map(f => f.factField).toSet)
291 292 293 294 295 296 297 298 299
}

/**
 * For parsing of the YAML temporal field definition
 */
class TemporalFieldYaml(@JsonProperty("name") name: String,
                      @JsonProperty("description") description: String = "",
                      @JsonProperty("type") fieldType: String,
                      @JsonProperty("dateFormat") dateFormat: String,
300
                      @JsonProperty("factPosition") factPosition: Int,
301 302 303 304 305
                      @JsonProperty("denormFields") jDenormFields: java.util.List[String],
                      @JsonProperty("algo") algo: AlgoYaml) {
  require(name != null)
  require(fieldType != null && fieldType == "date") // Currently only supports type Date
  if (fieldType == "date") require(dateFormat != null)
Steven Pisarski's avatar
Steven Pisarski committed
306 307
  val temporalField =
    new DateTemporal(name, description, jDenormFields.asScala, algo.algorithm, dateFormat, factPosition)
308 309 310 311 312 313 314 315 316 317 318 319 320 321
}

/**
 * For parsing of the YAML dimension field definition
 */
class DimensionFieldYaml(@JsonProperty("name") name: String,
                        @JsonProperty("description") description: String = "",
                        @JsonProperty("type") fieldType: String,
                        @JsonProperty("position") position: Int,
                        @JsonProperty("dateFormat") dateFormat: String) {
  require(name != null)
  require(fieldType != null &&
    (fieldType == "integer" || fieldType == "float" || fieldType == "string" || fieldType == "date"))
  if (fieldType == "date") require(dateFormat != null)
Steven Pisarski's avatar
Steven Pisarski committed
322
  val dimField = fieldType match {
323 324
    case "date" => new DateDimension(name, description, position, dateFormat)
    case "string" => new StringDimension(name, description, position)
325
    case "integer" => new IntDimension(name, description, position)
326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344
    case "float" => new FloatDimension(name, description, position)
  }
}

/**
 * For parsing of the YAML fact field definition
 */
class FactFieldYaml(@JsonProperty("name") name: String,
                    @JsonProperty("description") description: String = "",
                    @JsonProperty("type") fieldType: String,
                    @JsonProperty("dateFormat") dateFormat: String,
                    @JsonProperty("position") position: Int,
                    @JsonProperty("algo") algo: AlgoYaml) {

  require(name != null)
  require(fieldType != null &&
    (fieldType == "integer" || fieldType == "float" || fieldType == "string" || fieldType == "date"))
  if (fieldType == "date") require(dateFormat != null)

Steven Pisarski's avatar
Steven Pisarski committed
345
  val factField = fieldType match {
346 347
    case "date" => new DateFact(name, description, position, algo.algorithm, dateFormat)
    case "string" => new StringFact(name, description, position, algo.algorithm)
348
    case "integer" => new IntFact(name, description, position, algo.algorithm)
349 350 351 352 353 354 355 356 357 358
    case "float" => new FloatFact(name, description, position, algo.algorithm)
  }
}

/**
 * For parsing of the YAML algorithm section of a field definition
 */
class AlgoYaml(@JsonProperty("name") name: String,
               @JsonProperty("constType") constType: String,
               @JsonProperty("constVal") constVal: String,
359
               @JsonProperty("omitFields") jOmitFields: java.util.Set[String],
360
               @JsonProperty("flatten") _flatten: AlgoFlattenYAML,
361 362 363
               @JsonProperty("polyDegree") polyDegree: Int,
               @JsonProperty("iterations") iterations: Int,
               @JsonProperty("stepSize") stepSize: Float,
364
               @JsonProperty("lambda") lambda: Float,
365
               @JsonProperty("weights") jWeights: java.util.Set[AlgoWeightsYAML]) {
366 367 368

  // Currently only support these
  require(name != null && (name == "linearRegression" || name == "naiveBayes" || name == "constant"))
369
  val flatten = if (_flatten != null) _flatten else new AlgoFlattenYAML("", 0, 0)
370 371 372 373

  private val omitFields =
    if (jOmitFields == null) Set[String]()
    else jOmitFields.asScala.toSet
374

375
  def algorithm = {
376 377
    name match {
      case "linearRegression" =>
378 379 380
        val weights =
          if (jWeights == null) Map[String, Int]()
          else jWeights.asScala.map(f => f.name -> f.weight).toMap
381
        new LinearRegressionDefinition(omitFields, weights, flatten.flatten, polyDegree, iterations, stepSize)
382
      case "naiveBayes" =>
383
        new NaiveBayesDefinition(omitFields, flatten.flatten, polyDegree, lambda)
384 385 386 387 388 389 390 391 392
      case "constant" =>
        constType match {
          case "string" => new ConstantStringDefinition(constVal)
          case "float" => new ConstantFloatDefinition(constVal.toDouble)
          case _ => new ConstantIntDefinition(constVal.toInt)
        }
    }
  }
}
393 394 395 396 397 398

/**
 * Used for adding a weight to a training set feature for supervised algorithm training
 * @param name - the field name
 * @param weight - the field's weight
 */
399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458
class AlgoWeightsYAML(@JsonProperty("name") val name: String, @JsonProperty("weight") val weight: Int)

/**
 * Responsible for creating the flatten partial function
 * @param mode - the flatten mode (currently only support "naturalLog", "log" with base, and "root" with base or
 *               no change)
 * @param base - the mode's base (log base or the root where 2 denotes sqrt, 3 cbrt, etc...)
 * @param iterations - the number of times to apply the mode to the feature set
 */
class AlgoFlattenYAML(@JsonProperty("mode") val mode: String, @JsonProperty("base") val base: Int,
                     @JsonProperty("iterations") val iterations: Int) extends Serializable {
  /**
   * Returns the function responsible for flattening out the ML algorithm's feature set
   * @return
   */
  val flatten: (Seq[Double]) => Seq[Double] = {
    mode match {
      case "naturalLog" =>
        require(iterations > 0)

        def out(iter: Int, base: Int)(in: Seq[Double]): Seq[Double] = {
          def loop(iter: Int, features: Seq[Double]): Seq[Double] =
            if (iter < 1) features
            else loop(iter - 1, features.map(f =>
              if (f == 0) 0
              else math.log(math.abs(f)) * (if (f < 0) -1 else 1)))
          loop(iter, in)
        }
        out(iterations, base)
      case "log" =>
        require(iterations > 0)
        require(base > 1)

        def out(iter: Int, base: Int)(in: Seq[Double]): Seq[Double] = {
          def loop(iter: Int, features: Seq[Double]): Seq[Double] =
            if (iter < 1) features
            else loop(iter - 1, features.map(f =>
              if (f == 0) 0
              else math.log10(math.abs(f)) / math.log10(base) * (if (f < 0) -1 else 1)))
          loop(iter, in)
        }
        out(iterations, base)
      case "root" =>
        require(iterations > 0)
        require(base > 1)

        def out(iter: Int, base: Int)(in: Seq[Double]): Seq[Double] = {
          def loop(iter: Int, features: Seq[Double]): Seq[Double] =
            if (iter < 1) features
            else loop(iter - 1, features.map(f =>
              math.pow(Math.E, math.log(math.abs(f)) / base) * (if (f < 0) -1 else 1)))
          loop(iter, in)
        }
        out(iterations, base)
      case _ =>
        def out(in: Seq[Double]): Seq[Double] = in
        out
    }
  }
}