Cook Processor is a processor component that customizes ("cooks") the data transformation using a script.

FQCN

com.datumize.ddc.component.cook.CookProcessor

Overview

The Cook Processor transforms pipeline data using a custom script. This is a very effective way of extending the product to support rich features and specific project requirements such as ad-hoc metric calculation, custom business rules or data formats. The Cook Processor uses scripting languages supported by the Java Virtual Machine, as defined in JSR 223. When writing a script in a compliant language, you have access to the entire standard Java library. See Using Java from Scripts for advanced documentation on how scripting languages interact with the JVM.

The following scripting languages are supported:

  • Groovy: this is the default scripting language and is fully supported. According to Apache Groovy, it is a "powerful, optionally typed and dynamic language, with static-typing and static compilation capabilities, for the Java platform aimed at improving developer productivity thanks to a concise, familiar and easy to learn syntax."
  • JavaScript: optional scripting language, not enabled by default, based on the Rhino engine.
  • Python: optional scripting language, not enabled by default, based on the Jython engine.

The following code excerpt represents a common use case for scripting: out of the data captured in different formats, the script selects and creates the relevant output variables that will be integrated into another system for further analytics. The example below comes from SNMP data extracted from a Wi-Fi device, including MAC addresses for connected clients. It works with a variable <input> that represents the input payload wrapped in a record, and a variable <output> that is automatically mapped to the output record. This simple mechanism, together with the capabilities of the scripting language, allows for very dynamic and powerful transformations.

// Wi-Fi example: from an SNMP endpoint we get a bunch of fields.
// But on a Kafka target we only need some fields, and only when the MAC is not null.
// The timestamp is set to the current time in milliseconds
if(input.ap_mac!=null) {
	output.ts = System.currentTimeMillis();
	output.ap_mac = input.ap_mac;
}
GROOVY

The scripting engine runs in the same JVM that executes DDC. This means you need to be careful with the script: poorly written code can degrade the performance of the whole pipeline. The script code runs inside an isolated ClassLoader, so only JRE classes and classes provided via dependencies can be used.

Properties


Common properties that apply to all source and processor components of a pipeline.

Common

| PROPERTY | ID | DESCRIPTION | REQUIRED | TYPE | DEFAULT | EXAMPLES |
|---|---|---|---|---|---|---|
| Identifier | ID | Component unique identifier within the pipeline, read only, only useful for advanced mode. | Yes | String | Auto | MyComponent_23423. This identifier is automatically generated by the system and you can't change it. Might be helpful for advanced pipeline configuration. |
| Description | description | A short description for the component, to be visualized inside the component representation, aimed at providing additional information to understand the pipeline at a glance. | No | String | | "Extract customer id and loyalty number." Short and sweet description. |
| Topic | topic | All Source and Processor components support a topic to tag the output records. If the record does not have a tag applied, this topic will be automatically applied. The topic may be used by later components to route, group or classify records. | No | String | | foo. All output records will be tagged using "foo", unless they have already been tagged during the execution of the step. |

Default

| PROPERTY | ID | DESCRIPTION | REQUIRED | TYPE | DEFAULT | EXAMPLES |
|---|---|---|---|---|---|---|
| Language | language | Scripting language to be used in all the operation* properties. | Yes | String | groovy | javascript |
| Script | operation | Script code to execute. This code will be executed at every iteration. | Yes | String | | |
| Bootstrap script | operation-bootstrap | Script code to initialize the resources. This code will be executed in the step initialization phase. If thread-safe is enabled, this code will be executed only once; otherwise, it will be executed once for each thread executing this step. | No | String | | |
| Shutdown script | operation-shutdown | Script executed in the DDC shutdown phase. | No | String | | |
| Parameters | operation-parameters | Operation parameters to use with the script. These values are accessible within the script using the binding "conf". | No | Parameter List | | param1=foo,bar;param2=zid |
| Dependencies | dependencies | List of external dependencies that have to be added to the script's classpath. Must be a list of paths. See also Bundles. | No | List | | |

Advanced

| PROPERTY | ID | DESCRIPTION | REQUIRED | TYPE | DEFAULT | EXAMPLES |
|---|---|---|---|---|---|---|
| Propagate input map | keep-input | Input keys are propagated to the output map. Only applies if the input object is a Map; otherwise this value is ignored. | No | Boolean | | |
| Thread safe | thread-safe | Tells the system whether the context can be shared across the threads executing this step. If enabled, all threads executing this step share the same context, which means that race conditions can occur if the classes stored there are not prepared for concurrent use. | No | Boolean | false | |


Scripting guidelines

Binding variables

The script communicates with the DDC pipeline outside the script via binding variables. Bindings can be accessed within the script like normal variables that are already defined.

| Global variable | Type | Description |
|---|---|---|
| input | Any | Represents the input data that is coming from the previous component in the pipeline. |
| output | Map or Any* | Used to return a value to the outside pipeline. |
| conf | Map | Contains static, non-updatable configuration provided via the pipeline configuration. Expressed as a map, e.g. mykey=value11. This configuration allows for additional flexibility in the script. |
| log | Log4j Logger | Allows writing to the DDC log file, e.g. log.info('something happened'). |
| context | Map | Context is a Map kept between script executions. More info in the Context chapter. |
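The idea of binding variables can be tried outside DDC with plain Groovy. The sketch below is only an illustration and makes no claim about how DDC wires scripts internally: it uses the standard groovy.lang.Binding and GroovyShell classes, and the script text, the field names and the conf key site are hypothetical.

```groovy
// Sketch only: emulates binding variables with standard Groovy classes.
// This is NOT DDC's internal implementation; names and values are made up.
import groovy.lang.Binding
import groovy.lang.GroovyShell

// Hypothetical script body, as it could be typed into the "operation" property.
def scriptText = '''
    if (input.ap_mac != null) {
        output.ap_mac = input.ap_mac
        output.site = conf.site
    }
'''

// Stand-ins for the variables that the pipeline would provide.
def scriptBinding = new Binding()
scriptBinding.setVariable('input', [ap_mac: '00:11:22:33:44:55', rssi: -60])
scriptBinding.setVariable('output', [:])
scriptBinding.setVariable('conf', [site: 'lobby'])

// The script reads and writes the bound variables as if they were already declared.
new GroovyShell(scriptBinding).evaluate(scriptText)

// Only the fields explicitly assigned by the script end up in the output map.
def result = scriptBinding.getVariable('output')
println result
```

Running a script this way is a convenient check of its logic against sample input before pasting it into the component.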

Context and initialization

Context is an in-memory key-value store that can be used to share instances and objects between executions. The context is created when the component is instantiated and initially holds only a minimal set of useful objects. Before any execution of the component, the context can be extended by code scripted in a property (for instance operation-bootstrap in cookProcessor or script-bootstrap in scriptCombiner).

When DDC is stopped, the context is destroyed. Just before that happens, the operation-shutdown script is executed, so that anything stored there can be released or gracefully destroyed. This is only available in cookProcessor.

The context is accessed with the keyword context, in the same way as a Map. In Groovy, value=context.key obtains the value corresponding to key, and context.key=value sets it.

A common use case of the context is to avoid repeatedly initializing a heavy class. Imagine the powerful, but slow to instantiate, class com.datumize.Powerful, which has a method doIt(x) that must be called at each Cook Processor execution. The way to tackle this is to instantiate com.datumize.Powerful in the operation-bootstrap property, keep it in the context, and read it from the context on every Cook Processor execution.

operation|script-bootstrap

import com.datumize.Powerful

def power = new Powerful() 
context.power=power;
GROOVY

operation|script

def result = context.power.doIt(input)

output.value = result
GROOVY

In this example com.datumize.Powerful is only instantiated once, so resources are not wasted.
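The full lifecycle (bootstrap, per-record execution, shutdown) can be sketched in plain Groovy as follows. This is a self-contained simulation, not DDC code: the context, input and output variables are local stand-ins for the bindings, and the three closures only mimic the operation-bootstrap, operation and operation-shutdown properties. It stores an AtomicLong in the context, a thread-safe class chosen on the assumption that thread-safe may be enabled and the context shared across threads.

```groovy
// Sketch only: 'context', 'input' and 'output' are local stand-ins for the
// variables that DDC would bind; the closures mimic the three script properties.
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

def context = new ConcurrentHashMap<String, Object>()

// operation-bootstrap: create shared objects once and keep them in the context.
def bootstrap = { ctx ->
    ctx.counter = new AtomicLong(0)
}

// operation: runs for every record and reuses what the bootstrap stored.
def operation = { ctx, input, output ->
    output.seq = ctx.counter.incrementAndGet()
    output.ap_mac = input.ap_mac
}

// operation-shutdown: release or clear whatever the bootstrap created.
def shutdown = { ctx ->
    ctx.clear()
}

bootstrap(context)
// Simulate three records flowing through the step (hypothetical values).
def outputs = ['aa:aa', 'bb:bb', 'cc:cc'].collect { mac ->
    def output = [:]
    operation(context, [ap_mac: mac], output)
    output
}
shutdown(context)

println outputs
```

Keeping only objects designed for concurrent use in a shared context avoids the race conditions mentioned for the thread-safe property.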

Classloader Isolation & Dependencies

Objects created in scripting components can be used in other steps through their methods.

Reflection will not work as expected because the class loaders will be different.

Every script runs in an isolated class loader, which means that the classes available to the script are limited to those of the JRE and those built into the scripting language.

This feature avoids conflicts between different versions of the same class. Different script components in the same pipeline can use different versions of the same library xyz without worrying about unexpected behavior caused by the version mismatch.

Since the JRE and built-in classes are not always enough, there is an easy way to provide external libraries to be used in the script. All jars provided via the dependencies property are loaded in the component's own script class loader, so the classes they contain can be used in the script.
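The effect of class loader isolation can be reproduced in plain Groovy. In the sketch below, two GroovyClassLoader instances stand in for two isolated script components (an assumption made for illustration, not a description of DDC internals), and the Greeter class is hypothetical. The same source is loaded twice, producing two classes with the same name that the JVM treats as unrelated, while calling a method by name on the object still works.

```groovy
// Sketch only: two GroovyClassLoader instances play the role of two isolated
// script components; Greeter is a made-up class used for illustration.
def source = 'class Greeter { String greet(String name) { "hello " + name } }'

def loaderA = new GroovyClassLoader()
def loaderB = new GroovyClassLoader()
Class greeterA = loaderA.parseClass(source)
Class greeterB = loaderB.parseClass(source)

def fromA = greeterA.getDeclaredConstructor().newInstance()

// Same class name, but each loader defined its own, unrelated class.
def sameName = greeterA.name == greeterB.name
def sameClass = greeterA.is(greeterB)
def isInstanceOfB = greeterB.isInstance(fromA)

// Calling a method by name still works, because Groovy dispatches dynamically.
def greeting = fromA.greet('DDC')

println "same name: ${sameName}, same class: ${sameClass}, instance of B: ${isInstanceOfB}"
println greeting
```

This is why type checks, casts and reflection across components can fail even though plain method calls on a shared object succeed.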


Examples

This example represents a simple computation that uses the input, output, conf and log global variables.

// Simple computation
// Compute the sum of input (in 'count') plus a configured value ('sum')
// The result to be stored in 'my_result'
// Configuration: sum=55
output.my_result = input.count + Integer.valueOf(conf.sum(0));
log.info('the value is ' + output.my_result);
GROOVY


This other example is slightly more complex because it uses standard Java classes (such as System) to enrich the output data. Please note that the output data fields need to be explicitly declared: the transformation does not copy input fields to output fields by default.

// Simple filtering + mapping + conversion
// Source data coming from SNMP Wi-Fi
// Target data structured for a Kafka message
if (input.ap_mac != null) {
	// Use standard Java classes and methods
	output.ts = System.currentTimeMillis();
	// Output fields are not automatically copied from input
	output.ap_mac = input.ap_mac;
	output.mac = input.mac;
	output.rssi = input.rssi;
	output.noise = input.noise;
	// Simple conversions: seconds to milliseconds
	output.lastSeen = ((long)input.last_seen*1000);
	output.latestAssocTime = ((long)input.latest_assoc_time*1000);
}
GROOVY


This is a fairly complex transformation example for a web service representing a booking in the Open Travel (OTA) format.

// OTA example: field extraction from an OTA booking WS
// Output will contain: 
//  - request content type
//  - ip src
//  - ip destination
//  - request date
//  - request hour
//  - request minute
//  - check in date
//  - checkout date
// The input has been previously parsed from a TCP/HTTP network communication
// Input contains:
//  - request: request parsed content in XML format
//  - response: response  parsed content in XML format
//  - httpcomm: HTTP communication metadata

// Variables created
// rq and rs are maps
// httpCom is an HTTPCommunication object
// trans is a TransportCommunication (this object contains OSI transport layer data)
// net is a NetworkCommunication (this object contains OSI network layer data)
rq = input.request; 
rs = input.response; 
httpCom = input.httpcomm; 
trans = httpCom.getTransport(); 
net = trans.getNetworkCommunication(); 

// Output network information
// Request HTTP header extracted for content type
// Origin and destination IP address available as IP/TCP/HTTP metadata.
output.contentType = httpCom.getRequest().getHeaders().get('content-type'); 
output.ipSrc = net.getSrcIp(); 
output.ipDst = net.getDstIp(); 

// Output timestamps
// Real time-stamping: extract the first packet timestamp and convert it into a UTC date-time
// java.time classes are not imported by default in Groovy, so ZoneId is fully qualified
rqDate = new Date(net.getRqIniTs()).toInstant().atZone(java.time.ZoneId.of('UTC'));
// Extract the date in ISO format (yyyy-MM-dd), plus the hour and minute
output.requestDate = rqDate.toLocalDate().toString();
output.requestHour = rqDate.getHour();
output.requestMinute = rqDate.getMinute();

// Uses Matryoshka (external library provided via dependencies) to query Java Maps using pseudo-XPath
// Not required, but it simplifies the code considerably
rqContent=new Matryoshka(rq); 
// Extract the fields from XML request content.
output.checkIn = rqContent.get('AvailabilityRequest/CheckInDate').value();  
output.checkOut = rqContent.get('AvailabilityRequest/CheckOutDate').value();
GROOVY