package com.cablelabs.eventgen.analysis

import java.io.{BufferedReader, InputStreamReader}
import java.net.URI
import java.util.Date

import com.cablelabs.eventgen.model.{InputDefinition, SupervisedTraining}
import com.typesafe.scalalogging.slf4j.Logger
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.DenseVector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.slf4j.LoggerFactory

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

/**
 * Performs Spark-based utility operations required to analyze a dataset
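 *
 * A minimal usage sketch (the SparkConf settings, the `inputDef` InputDefinition instance and the
 * file path below are illustrative assumptions, not part of this object):
 * {{{
 *   val sc = new SparkContext(new SparkConf().setAppName("event-analysis"))
 *   val analyzer = SparkAnalyzer.analyze(sc, inputDef, "hdfs:///data/events", ',')
 *   println(s"Events to analyze: ${analyzer.eventCount()}")
 * }}}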
 */
object SparkAnalyzer {

  private[this] def logger = Logger(LoggerFactory.getLogger("Spark Analyzer factory"))

  /**
   * Creates a SparkAnalyzer
   * Use this method when submitting to a Spark cluster
   * @param inputDef - defines the data to be read in for analysis
   * @param fileUri - the URI to the file or directory
   * @param delim - the file delimiter for parsing
   * @return - the SparkAnalyzer
   */
  private[cablelabs] def analyze(inputDef: InputDefinition, fileUri: String, delim: Char): SparkAnalyzer = {
    logger.info(s"Analyzing data from $fileUri within a Spark cluster")
    analyzeData(new SparkContext(), inputDef, fileUri, delim)
  }

  /**
   * Creates a SparkAnalyzer for submitting as a Spark standalone application or for unit testing
   * @param sc - the SparkContext
   * @param inputDef - defines the data to be read in for analysis
   * @param fileUri - the URI to the file or directory
   * @param delim - the file delimiter for parsing
   * @return - the SparkAnalyzer
   */
  private[cablelabs] def analyze(sc: SparkContext, inputDef: InputDefinition, fileUri: String,
                                 delim: Char): SparkAnalyzer = {
    logger.info(s"Analyzing data from $fileUri within the local cluster")
    analyzeData(sc, inputDef, fileUri, delim)
  }

  /**
   * Creates a SparkAnalyzer
   * @param sc - the Spark context
   * @param inputDef - defines the data to be read in for analysis
   * @param fileUri - the URI to the file or directory
   * @param delim - the file delimiter for parsing
   * @return - the SparkAnalyzer
   */
  private[this] def analyzeData(sc: SparkContext, inputDef: InputDefinition, fileUri: String, delim: Char): SparkAnalyzer = {
    val hdrTuple = retrieveHeaders(inputDef, fileUri, delim)
    if (hdrTuple._2.length >= inputDef.fieldMap.size) {
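      // Drop any line equal to the header so repeated header rows across part files are not parsed as events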
      val rawRDD = sc.textFile(fileUri).filter(_ != hdrTuple._1)

      // Tuple containing (dimString: String, temporalValue: Date, event: Map[String, Any])
      val outRDD = rawRDD.map { line =>
        val fields = line.split(delim)
        val event = inputDef.fromStringArr(hdrTuple._2, fields)
        (inputDef.dimString(event), inputDef.temporal.eventValue[Date](event), event)
      }
      new SparkAnalyzer(outRDD, inputDef)
    } else {
      throw new RuntimeException(s"No file under $fileUri contains a valid header row")
    }
  }

  /**
   * Retrieves the header row from the first file found in a directory containing delimited flat text files
   * @param inputDef - the definition of the inbound events
   * @param fileUri - the URI from where the file/files can be read
   * @param delim - the column delimiter
   * @return - a tuple containing the raw header line (generally used for filtering) and an
   *         Array[String] of the parsed column names for mapping purposes
   */
  private[this] def retrieveHeaders(inputDef: InputDefinition, fileUri: String, delim: Char): (String, Array[String]) = {
    val fileSystem = FileSystem.get(new URI(fileUri), new Configuration())
    val fileEntries = fileSystem.listStatus(new Path(fileUri))
    logger.info(s"Searching ${fileEntries.length} files for the proper header with the delimiter '$delim'")
    // TODO - try and make more functional
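    // Read the first line of each file in turn until one yields at least as many columns as the input definition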
    var hdrs: Array[String] = null
    var headerLine = ""
    var ctr = 0
    if (fileEntries.length > 0) {
      do {
        logger.info(s"Checking file entry $ctr")
        val reader = new BufferedReader(new InputStreamReader(fileSystem.open(fileEntries(ctr).getPath)))
        try {
          headerLine = reader.readLine()
          hdrs = headerLine.split(delim)
        } finally {
          reader.close()
        }
        logger.info(s"Header line = \n$headerLine\n headers = ${hdrs.mkString(", ")}")
        ctr += 1
      } while ((hdrs == null || hdrs.length < inputDef.fieldMap.size) && ctr < fileEntries.length)
      fileSystem.close()
      (headerLine, hdrs)
    } else {
      (headerLine, hdrs)
    }
  }

  /**
   * Returns a tuple containing the sorted dates, the durations (in milliseconds) between consecutive
   * dates, and the average of all durations
   * @param dates - the dates to calculate (unsorted)
   * @return - the tuple
   */
  private[analysis] def getDurations(dates: Seq[Date]): (Seq[Date], Seq[Double], Double) = {
    val sorted = dates.sortBy(x => x.getTime)

    val durations = {
      def loop(dates: Seq[Date], prevDate: Date, acc: List[Double]): Seq[Double] =
        if (dates.isEmpty) acc
        else {
          val duration = if (prevDate.getTime == 0) 0d else (dates.head.getTime - prevDate.getTime).toDouble
          loop(dates.tail, dates.head, acc :+ duration)
        }
      loop(sorted, new Date(0), List())
    }
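    // The leading 0 duration (no predecessor) contributes nothing to the sum, so dividing by size - 1
    // yields the average of the real gaps between consecutive events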
    val avg = if (durations.size > 1) durations.sum / (durations.size - 1) else 0d
    assert(sorted.size == durations.size)
    (sorted, durations.toSeq, avg)
  }
}

/**
 * Performs analysis on a given group of CSV files
 * @param data - the data to analyze
 * @param inputDef - the event definition
 */
class SparkAnalyzer(val data: RDD[(String, Date, Map[String, Any])], val inputDef: InputDefinition)
  extends java.io.Serializable {

  private[this] def logger = Logger(LoggerFactory.getLogger("Spark Analyzer"))

  /**
   * Count of all events in the learning set
   */
  def eventCount(): Long = {
    logger.info("Retrieving the number of events that will be analyzed")
    data.count()
  }

  /**
   * Returns the last event for a given dimensionality
   * @return - an RDD where tuple._1 is the dimension string and tuple._2 is the event payload
   */
  def lastEventByDim(): RDD[(String, Map[String, Any])] = {
    logger.info("Retrieving the last known event for each dimensional set")
    data.groupBy(_._1).flatMap[(String, Map[String, Any])](p => {
      var outVals = ("", Map[String, Any]())
      var date = new Date(0)
      // TODO - try and make more functional
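      // Scan every event in this dimension group and keep the one with the latest temporal value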
      p._2.foreach(e => {
        if (e._2.getTime > date.getTime) {
          date = e._2
          outVals = (e._1, e._3)
        }
      })
      Seq(outVals)
    })
  }

  /**
   * Returns a map of all fact values by fact name
   * @return - a map keyed by fact name whose values are the sets of observed fact values
   */
  def factValues(): Map[String, Set[Any]] = {
    logger.info("Retrieving the set of fact values by fact name")
    val accum = data.context.accumulator(Map[String, Set[Any]]())(KeyValueSetAccumParam)
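    // Each event contributes its fact values as single-element sets; the accumulator merges them by fact name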
    data.foreach(p => {
      val factMap = inputDef.factValues(p._3)
      val map = factMap.map(f => f._1 -> Set(f._2))
      accum += map
    })
    accum.value
  }

  /**
   * Returns the number of events received by dimensionality
   * @return - a map where the key is the dimension string and the value is the number of events received
   */
  def dimEventsCount(): Map[String, Long] = {
    logger.info("Retrieving the number of events for each dimensional set")
    data.map(p => p._1).countByValue().toMap
  }

  /**
   * Returns all of the parsed and converted event payloads
   * @return - the events
   */
  def events(): RDD[Map[String, Any]] = {
    logger.info("Retrieving all parsed event payloads")
    data.map(p => p._3)
  }

  /**
   * Returns the training set required for the temporal prediction algorithm
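   *
   * A hedged sketch of how the result might be consumed (`analyzer` is an assumed SparkAnalyzer
   * instance; the choice of LinearRegressionWithSGD and the iteration count are illustrative only):
   * {{{
   *   val model = org.apache.spark.mllib.regression.LinearRegressionWithSGD
   *     .train(analyzer.temporalTrainingSet(), 100)
   * }}}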
   * @return - an RDD of LabeledPoints forming the training set
   */
  def temporalTrainingSet(): RDD[LabeledPoint] = {
    logger.info("Retrieving the training set for the temporal prediction algorithm")
    if (inputDef.temporal.algoDef.isInstanceOf[SupervisedTraining]) {
      val accum = data.context.accumulator(Map[String, List[Date]]())(TemporalDimAccumParam)
      data.foreach(p => accum += Map(p._1 -> List[Date](p._2)))

      // Determine the duration of each event by dimensionality
      // Key is the dimString and values contain the sorted dates and associated duration between events as well as the
      // average duration
      val accumMap = accum.value.map(p => {
        val d = SparkAnalyzer.getDurations(p._2)
        p._1 -> (d._1, d._2, d._3)
      }).toMap

      val dMetrics = durationMetrics(accumMap)

      // Build the training set
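      // Label selection: a zero-duration entry (the dimension's first event) with siblings is labelled with that
      // dimension's average duration; every other event is labelled with a randomly chosen global duration metric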
      val labeledPoints = data.map { p =>
        val datesOption = accumMap.get(p._1)
        val features = inputDef.temporalAlgoFeatures(p._3)
        val vector = new DenseVector(features.toArray)
        if (datesOption.isDefined) {
          val dates = datesOption.get
          val dateIndex = dates._1.indexOf(p._2)
          if (dates._2(dateIndex) == 0 && dates._2.size > 1) {
            LabeledPoint(dates._2.sum / (dates._2.size - 1), vector)
          } else {
            val rnd = Random.nextInt(8)
            if (rnd == 0) LabeledPoint(dMetrics._1, vector) // mean
            else if (rnd == 1) LabeledPoint(dMetrics._2, vector) // median
            else if (rnd == 2) LabeledPoint(dMetrics._3, vector) // first quartile
            else if (rnd == 3) LabeledPoint(dMetrics._4, vector) // third quartile
            else LabeledPoint(dMetrics._5, vector) // total duration
          }
        } else {
          // This should never happen
          LabeledPoint(0d, vector)
        }
      }
      labeledPoints
    } else {
      data.context.parallelize(List[LabeledPoint]())
    }
  }

  /**
   * Returns the fact training set for a given fact
   * @param name - the name of the fact
   * @return - an RDD of (LabeledPoint, raw fact value) training tuples
   */
  def factTrainingSet(name: String): RDD[(LabeledPoint, Any)] = {
    val fact = inputDef.facts.get(name).get
    fact.algoDef match {
      case algoDef: SupervisedTraining =>
        logger.info("Retrieving the training sets for each fact prediction algorithm")
        data.flatMap[(LabeledPoint, Any)](p => {
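          // Emit each labeled point together with the fact's raw event value from the same event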
          val features = inputDef.factAlgoFeatures(p._3, fact)
          Seq((LabeledPoint(inputDef.fieldMap.get(name).get.mlTrainingValue(p._3), new DenseVector(features.toArray)),
            fact.eventValue(p._3)))
        })
      case _ =>
        data.context.parallelize(List[(LabeledPoint, Any)]())
    }
  }

  /**
   * Computes the duration metrics generally required to assign a duration to the first event within
   * dimensional sets that contain very few events, as needed by the temporal prediction algorithm.
   *
   * @param durations - per-dimension tuple of the sorted dates, the durations between events and the average duration
   * @return - a tuple containing the mean, median, first and third quartiles, and total duration of all events
   */
  private[this] def durationMetrics(durations: Map[String, (Seq[Date], Seq[Double], Double)]) = {
    logger.info("Retrieving the metrics for durations generally required for determining the first event for the temporal prediction algorithm")
    var total = 0d
    var count = 0

    // TODO make more functional
    val list = new ArrayBuffer[Double]()

    var earliest = new Date()
    var latest = new Date(0)
    // TODO - try and make more functional
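    // Single pass over all dimensions: collect the non-zero gaps and track the earliest and latest event dates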
    durations.foreach(duration => {
      duration._2._2.filter(_ != 0).foreach(d => {
        total += d
        count += 1
        list += d
      })
      duration._2._1.foreach(date => {
        if (date.getTime < earliest.getTime) earliest = date
        if (date.getTime > latest.getTime) latest = date
      })
    })

    val totalDuration = latest.getTime - earliest.getTime
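    // With more than two recorded gaps, report order statistics from the sorted list (indices are approximate
    // quartile positions); otherwise fall back to fractions of the overall time span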
    if (list.size > 2) {
      val sortedDurations = list.sorted
      (
        (total / count).toLong, // mean
        sortedDurations(list.size / 2).toLong, // median
        sortedDurations(list.size / 2 / 2).toLong, // first quartile
        sortedDurations(list.size / 2 + list.size / 2 / 2).toLong, // third quartile
        totalDuration // total duration
      )
    } else {
      (
        totalDuration / 2, // mean
        totalDuration / 2, // median
        totalDuration / 4, // first quartile
        totalDuration / 4 * 3, // third quartile
        totalDuration // total duration
      )
    }
  }
}