Commit 61f81814 authored by Steven Pisarski's avatar Steven Pisarski

Added HDFS resource loader for retrieving velocity templates from HDFS

parent da0dae9f
......@@ -5,6 +5,8 @@ import java.io.InputStream
import com.cablelabs.eventgen.akka.JMSRouteType
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import org.apache.velocity.VelocityContext
import org.apache.velocity.app.Velocity
import org.json4s.JsonInput
import org.json4s.native.JsonMethods._
......@@ -187,8 +189,13 @@ object EventUtil {
out += new OutputDefinition(config.prototcol, config.host, config.port, routeType, config.name,
fieldSet.toSet, OutputEventFormatters.jsonFormat)
case _ =>
Velocity.init(OutputEventFormatters.velocityProps)
val ctx = new VelocityContext()
val template = Velocity.getTemplate(config.format)
val fmt = OutputEventFormatters.velocityFormat(ctx, template)
out += new OutputDefinition(config.prototcol, config.host, config.port, routeType, config.name,
fieldSet.toSet, OutputEventFormatters.velocityFormat(config.format))
fieldSet.toSet, fmt)
}
})
out.toSet
......
......@@ -3,8 +3,7 @@ package com.cablelabs.eventgen.model
import java.io.StringWriter
import java.util.Properties
import org.apache.velocity.VelocityContext
import org.apache.velocity.app.Velocity
import org.apache.velocity.{Template, VelocityContext}
import scala.util.parsing.json.JSONObject
......@@ -21,7 +20,8 @@ object OutputEventFormatters {
def velocityProps = {
val props = new Properties()
props.put("resource.loader", "class, file")
props.put("resource.loader", "hdfs, file, class")
props.put("hdfs.resource.loader.class", "com.cablelabs.eventgen.velocity.HdfsResourceLoader")
props.put("class.resource.loader.class", "org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader")
props.put("file.resource.loader.class", "org.apache.velocity.runtime.resource.loader.FileResourceLoader")
props.put("file.resource.loader.path", ".")
......@@ -32,21 +32,18 @@ object OutputEventFormatters {
props
}
Velocity.init(velocityProps)
val ctx = new VelocityContext()
/**
* Applies the data to a velocity template
* @param templateUri - Velocity template either located in the classpath or filesystem
* @param ctx - Velocity context
* @param template - Vela
* @return - the data values applied to the template
*/
// TODO - figure out how to allow for templates to be retrieved from HDFS
def velocityFormat(templateUri: String): Map[String, Any] => Any = { event =>
def velocityFormat(ctx: VelocityContext, template: Template): Map[String, Any] => Any = { event =>
event.foreach(e => {
ctx.put(e._1, e._2)
})
val writer = new StringWriter()
val template = Velocity.getTemplate(templateUri)
template.merge(ctx, writer)
writer.toString
}
......
package com.cablelabs.eventgen.velocity
import java.io.InputStream
import java.net.URI
import org.apache.commons.collections.ExtendedProperties
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.velocity.runtime.resource.Resource
import org.apache.velocity.runtime.resource.loader.ResourceLoader
/**
* For loading Velocity templates from HDFS.
*/
class HdfsResourceLoader extends ResourceLoader {
var lastModified = System.currentTimeMillis()
override def init(configuration: ExtendedProperties): Unit = {}
override def isSourceModified(resource: Resource): Boolean = false
override def getResourceStream(source: String): InputStream = {
try {
val fs = FileSystem.get(new URI(source), new Configuration())
lastModified = fs.getFileStatus(new Path(source)).getAccessTime
fs.open(new Path(source))
}
catch {
case e:Exception => null
}
}
override def getLastModified(resource: Resource): Long = lastModified
}
package com.cablelabs.eventgen.model
import com.cablelabs.eventgen.UnitSpec
import org.apache.velocity.VelocityContext
import org.apache.velocity.app.Velocity
import org.json4s.StreamInput
/**
......@@ -148,7 +150,10 @@ class EventUtilTest extends UnitSpec {
assert(61616 == outputDef.port)
assert("changeme1" == outputDef.name)
val events = Map[String, Any]("one" -> 2)
assert(OutputEventFormatters.velocityFormat("test1.vm")(events) == outputDef.formatter(events))
Velocity.init(OutputEventFormatters.velocityProps)
val ctx = new VelocityContext()
val template = Velocity.getTemplate("test1.vm")
assert(OutputEventFormatters.velocityFormat(ctx, template)(events) == outputDef.formatter(events))
outputDef.fields.foreach(f => {
f.name match {
case "eventDate" =>
......
......@@ -6,6 +6,8 @@ import java.util.Date
import com.cablelabs.eventgen.UnitSpec
import com.cablelabs.eventgen.akka.JMSRouteType
import org.apache.velocity.VelocityContext
import org.apache.velocity.app.Velocity
import org.json4s.StreamInput
import org.json4s.native.JsonMethods._
......@@ -132,7 +134,10 @@ class OutputDefinitionTest extends UnitSpec {
}
test("generate method should apply the data to a velocity template") {
val od = new OutputDefinition(protocol, host, port, routeTypeQueue, destName, fields, OutputEventFormatters.velocityFormat("OutputDefSpec.vm"))
val ctx = new VelocityContext()
val template = Velocity.getTemplate("OutputDefSpec.vm")
val od = new OutputDefinition(protocol, host, port, routeTypeQueue, destName, fields,
OutputEventFormatters.velocityFormat(ctx, template))
val convertedValue = od.convert(event.toMap).asInstanceOf[String]
assert(convertedValue != null)
println(convertedValue)
......
......@@ -4,7 +4,7 @@ outputDefs:
port: 61616
routeType: topic
name: CMHealthXML
format: testData/cm/definition/amdocs-cm.vm
format: hdfs://bda-hdfs01/tmp/cm/definitions/amdocs-cm.vm
fields:
- name: EventTimeStamp
description: temporal field
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment