Overview

Composer Processor is a processor component aimed at composing and grouping records into richer records, using a number of different streaming and windowing strategies.

FQCN

com.datumize.ddc.component.composer.ComposerProcessor


A composer is used to "accumulate" or group records in a single step to produce richer data.

Two records are accumulated in the same "bag" if they have the same key.

This component is very versatile and supports a rich set of use cases, such as grouping records to be written to the same file, eliminating duplicates in a stream for a period of time, or, in a more advanced case, calculating averages over streaming data for a period of time.

Properties


Common properties that apply to all source and processor components of a pipeline.

| Property | ID | Description | Required | Type | Default | Examples |
|----------|----|-------------|----------|------|---------|----------|
| Identifier | ID | Component unique identifier within the pipeline. Read only; only useful for advanced mode. | Yes | String | Auto | MyComponent_23423 — this identifier is automatically generated by the system and cannot be changed. It might be helpful for advanced pipeline configuration. |
| Description | description | A short description for the component, visualized inside the component representation, aimed at providing additional information to understand the pipeline at a glance. | No | String | | Extract customer id and loyalty number. — short and sweet description. |
| Topic | topic | All Source and Processor components support a topic to tag the output records. If a record does not already have a tag applied, this topic is applied automatically. The topic may be used by downstream components to route, group or classify records. | No | String | | foo — all output records will be tagged with "foo", unless they have been tagged during the execution of the step. |

| Property | ID | Description | Required | Type | Default |
|----------|----|-------------|----------|------|---------|
| Rules | rules | Streaming rules to group records | Yes | List | |
| Window | window | Window definition | No | Window | |
| Time-rate | time-rate | Time rate definition. Useful to slow down the stream under certain limits, e.g. no more than 10 records per minute. | No | Time-Rated | |
| Key extractor | key-extractor | Defines the key used to group records. All records with the same key will be grouped and combined. See Extractor section. | No | Extractor | FixedExtractor |
| Value extractor | value-extractor | Values that will be grouped. See Extractor section. | No | Extractor | |
| Header | header | Asset that will be added at the beginning of the output record | No | Asset | |
| Footer | footer | Asset that will be added at the end of the output record | No | Asset | |
| Combiner | combiner | Combines all the records that have been grouped together. See Combiner section. | No | Combiner | |

Rule

A rule defines whether a group of records can be considered closed, so it can be "propagated" to the next steps in the pipeline.

It also decides whether the new element has to be kept or discarded. In advanced usage the value can be manipulated as well, to store calculated values.

Streaming vs Windowing

Composer tries to group the incoming records in two independent ways: in streaming mode and in windows.


Streaming

Every record in the composer is checked against all rules defined in the rules property.

Rules are evaluated in order, and evaluation stops at the first rule that "decides" the group must be closed; rules after that one are not executed under any circumstance.


Windowing

A window is a period of time plus a rule that defines whether the group of elements must be closed and propagated, or discarded.

The rule within a window is not executed for every incoming record; it is executed based on time, independently of whether data is streaming or not.


There are 3 types:

Sliding: Fixed-size windows that work on differences between record timestamps

Tumbling: Fixed-size, non-overlapping, gap-less windows

Session: Dynamically-sized, non-overlapping, data-driven windows


Tumbling

Tumbling time windows are  windows based on time intervals. They model fixed-size, non-overlapping, gap-less windows. A tumbling window is defined by a single property: the window’s size. Since tumbling windows never overlap, a data record will belong to one and only one window.



Sliding

Sliding windows are fixed-time windows that start when a new key (an element with that key) arrives; all elements from that moment until the window is closed (when the fixed time is reached) go in the same "bag". Since sliding windows never overlap, a data record will belong to one and only one window.


Session

Session windows aren't fixed in time. A session window is closed after a period of time without new elements with the same key. Since session windows never overlap, a data record will belong to one and only one window.

Streaming rules

The following compatible implementations are provided with the DDC.

By Size

When the content reaches a certain number of bytes, the group is closed and propagated.

| Property | ID | Description | Required | Type | Default |
|----------|----|-------------|----------|------|---------|
| max-size | max-size | Maximum size of the content, as a capacity (e.g. "100k") | Yes | Integer | |
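
For reference, a hedged sketch of a single-rule configuration using this rule, mirroring the class name and capacity format used in the combined example later in this page:

	"rules" : [
		{
			"class" : "com.datumize.ddc.component.composer.rules.streaming.BySize",
			"max-size" : "100k"
		}
	]
JS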

By Records

When the content contains a certain number of records, the group is closed and propagated.

| Property | ID | Description | Required | Type | Default |
|----------|----|-------------|----------|------|---------|
| max-records | max-records | Maximum number of records to compose a group | Yes | Integer | |


No Duplicates


New elements are discarded while an element with the same key is already present. This rule needs to work together with a Window.

Script

Totally customizable streaming rule. Behavior is defined by the provided script. See the SDK chapter and the Rules & scripting section below for more detailed information.

Window

Defines the window behavior when it is closed.

| Property | ID | Description | Required | Type | Default |
|----------|----|-------------|----------|------|---------|
| Interval | interval | Size of the window | Yes | Duration | |
| Delay | delay | Extra time to accept delayed messages | No | Duration | 0 |
| Type | type | Type of the window. Accepted values: TUMBLING, SESSION, SLIDING | No | WindowType | TUMBLING |
| Group | group | Whether the grouped elements are propagated when the window is closed. Set to false to discard them instead. | No | Boolean | true |
| Precision | precision | Minimum precision. More precision means more resources consumed, but more accuracy. The real interval = interval + precision. | No | Duration | 1s |
| Custom rule | rule | When a custom rule is provided, the group property is ignored: the responsibility to group or not goes to the script. | No | ScriptRuleWindowImpl | |
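
As a minimal sketch, a fully specified window using only the properties documented above might look like this (the property values are illustrative; the class is the one used in the examples below):

	"window" : {
		"class" : "com.datumize.ddc.component.composer.ComposerProcessor$Window",
		"interval" : "5m",
		"delay" : "10s",
		"type" : "TUMBLING",
		"group" : true,
		"precision" : "1s"
	}
JS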


Window rules

Script

Totally customizable window rule. Behavior is defined by the provided script. See the SDK chapter and the Rules & scripting section below for more detailed information.

Rules & scripting

See the SDK chapter for more information and context.


| Property | ID | Description | Required | Type | Default |
|----------|----|-------------|----------|------|---------|
| Parameters | parameters | Parameters made available to the script | No | List | |
| Language | language | Script language | Yes | String | groovy |
| Script | script | Script to decide if the group must be closed | Yes | String | |
| Script bootstrap | script-bootstrap | Script executed when the rule is initialized | No | String | |
| Dependencies | dependencies | Dependency libraries to be used in the scripts | No | List | |
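
A hedged sketch of a script rule using these properties follows. The script bodies are illustrative placeholders, and the exact entry format of parameters and dependencies is not documented on this page, so they are left empty:

	{
		"class" : "com.datumize.ddc.component.composer.rules.streaming.ScriptRule",
		"language" : "groovy",
		"script-bootstrap" : "log.info('rule initialized');",
		"script" : "return factory.notGroupAndAppend();",
		"parameters" : [],
		"dependencies" : []
	}
JS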

Special bindings

Apart from the normal bindings, rules have some more.

| Global variable | Type | Description |
|-----------------|------|-------------|
| metadata | Map | Contains the metadata related to this group of elements: the group key, the timestamp of the last element and the timestamp of the first element. It is writable, so a rule can write custom values on it. |
| lazyValue | Lazy List | Access to the elements that are currently in the group. Values are not directly provided; to get them, lazyValue.get() is needed. |
| factory | RuleResultFactory | Factory to produce rule results. See the next chapter for more information. |
| isWindowsExpired | Boolean | (Window rules only) whether the window has expired or not. |


How it works

When a rule is executed, several behaviors can be defined: whether to group or not, whether to discard the currently grouped elements, and whether to append the new element (streaming rules only). The order of those actions also matters: append-and-group is not the same as group-and-append. In the first case the new element is included in the group being produced; in the second case it is not, but it is kept for the next window.

A rule must return a RuleResult value. This object specifies what has to be done with the current group and, if it is a streaming rule, with the new incoming element.


The RuleResult is created using the factory provided as a binding.

The factory has a bunch of methods to specify what to do:

| Method | Parameters | Description |
|--------|------------|-------------|
| groupAndKeep | group, toKeep | Elements provided in the first argument are grouped and propagated; objects provided in the second parameter are kept (toKeep must be a list) |
| groupAndDiscard | group | Elements provided as argument are grouped. All other elements are discarded |
| notGroupAndKeep | toKeep | Do not group, but keep the elements provided (toKeep must be a list) |
| groupAndDiscardAll | - | Group all elements and then remove them |
| groupAndAppend | - | Close the current group and create a new group with the new element |
| discardAndAppendAndGroup | - | Discard the current elements and close the group with the new element in it |
| appendAndGroup | - | Add the new element to the group and close it, but keep all of the elements for the next group |
| appendAndGroupAndDiscard | - | Add the new element to the group, close it, and do not keep the elements for the next group |
| groupAndDiscardAndAppend | - | Close the current group of elements, discard them, and add the new one to the next group |
| notGroupAndAppend | - | Do not close the group; append the new element |
| notGroup | - | Do not group; discard the new element |
| notGroupAndDiscardAll | - | Do not group; discard all of the current group |


Streaming rule that groups elements with the same key in groups of 2:

def count = metadata.getElementsCount(); // elements already in the open group
if (count == 1) {
	// one element is already waiting: append the new one, close the group of 2 and do not keep it
	return factory.appendAndGroupAndDiscard();
} else {
	// the group is empty: just append the incoming element and keep the group open
	return factory.notGroupAndAppend();
}
GROOVY

Assets

The following compatible Asset implementations are provided with the DDC.

StringAsset

Asset as a plain String.

| Name | Description | Type | Required | Default |
|------|-------------|------|----------|---------|
| content | Content of the asset as a String | String | yes | |
| charset | Charset | Charset | | UTF-8 |
| add-pre-eol | Add an end-of-line character before the content | Boolean | | false |
| add-post-eol | Add an end-of-line character after the content | Boolean | | true |
| end-of-line | End-of-line character, as decimal unicode (e.g. 10) or the character itself ('\n') | Character | | System line separator |

FileAsset

Extract the asset content from an existing file.

| Name | Description | Type | Required | Default |
|------|-------------|------|----------|---------|
| path | Path of the file from which to extract the content | Path | yes | |
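
Neither asset type appears in the examples below, so here is an illustrative, hedged sketch of a composer that wraps every propagated group with a header and a footer. The asset "class" values are placeholders: the exact FQCNs of the asset implementations are not listed on this page and must be checked in the DDC reference.

{
	"step-type": "processor",
	"id": "composer",
	"class" : "com.datumize.ddc.component.composer.ComposerProcessor",
	"rules" : [
		{
			"class" : "com.datumize.ddc.component.composer.rules.streaming.ByRecords",
			"max-records" : 10
		}
	],
	"header" : {
		"class" : "StringAsset",
		"content" : "BEGIN BATCH",
		"add-post-eol" : true
	},
	"footer" : {
		"class" : "StringAsset",
		"content" : "END BATCH"
	}
}
JS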

Examples

Group everything in groups of 2

This groups everything in groups of 2. It groups everything because key-extractor is not set, so the default value is used, which means that FixedExtractor will be used. FixedExtractor gives all objects the same key.

{
	"step-type": "processor",
	"id": "composer",
	"class" : "com.datumize.ddc.component.composer.ComposerProcessor",
	"rules" : [
    	{
        	"class" : "com.datumize.ddc.component.composer.rules.streaming.ByRecords",
			"max-records" : 2
		}
	]
}
JS

All groups will always be of 2. If the stream of elements stops and there is 1 element pending to be grouped, it will wait for a group forever.

Group everything in groups of 2 but no more than 5min

Similar to the previous example, but we don't want an element getting stuck for more than 5 minutes.

To avoid stuck elements, a window must be introduced.

{
	"step-type": "processor",
	"id": "composer",
	"class" : "com.datumize.ddc.component.composer.ComposerProcessor",
	"rules" : [
    	{
        	"class" : "com.datumize.ddc.component.composer.rules.streaming.ByRecords",
			"max-records" : 2
		}
	],
	"window" : {
		"class" : "com.datumize.ddc.component.composer.ComposerProcessor$Window",
		"interval" : "5m",
		"type" : "SLIDING"
	}
}
JS

The difference from the previous example is that if the stream stops and no more elements come, the last element, stuck waiting for its partner, will wait no more than 5 minutes: the window closes 5 minutes after the first element arrived.

Group everything in 5min windows

In this example, let's do groups of N elements in 5-minute windows.

{
	"step-type": "processor",
	"id": "composer",
	"class" : "com.datumize.ddc.component.composer.ComposerProcessor",
	"rules" : [
		{
        	"class" : "com.datumize.ddc.component.composer.rules.streaming.Append"
		}
	],
	"window" : {
		"class" : "com.datumize.ddc.component.composer.ComposerProcessor$Window",
		"interval" : "5m",
		"type" : "SLIDING"
	}
}
JS

All elements are appended unconditionally to the currently open group. Five minutes after the first element of the open group was added, the group is closed and propagated.


Group everything in 5min windows started at "o'clock"

Let's do groups of N elements in 5-minute windows, but "beautiful" windows. In the previous examples a window started when the first element arrived and closed 5 minutes later.

If the first element arrives at 13:02:55, the window will close at 13:07:55. To close windows at "o'clock" times, use TUMBLING windows, which close every 5 minutes but starting at 13:00:00, 13:05:00, 13:10:00...

{
	"step-type": "processor",
	"id": "composer",
	"class" : "com.datumize.ddc.component.composer.ComposerProcessor",
	"rules" : [
		{
        	"class" : "com.datumize.ddc.component.composer.rules.streaming.Append"
		}
	],
	"window" : {
		"class" : "com.datumize.ddc.component.composer.ComposerProcessor$Window",
		"interval" : "5m",
		"type" : "THUMBLING"
	}
}
JS

Group everything after 5min without new elements

This is the concept of a session. Whereas a sliding window counts from the first element added to the group, a session is closed when some time has passed since the last element was added to the group.

{
	"step-type": "processor",
	"id": "composer",
	"class" : "com.datumize.ddc.component.composer.ComposerProcessor",
	"rules" : [
		{
        	"class" : "com.datumize.ddc.component.composer.rules.streaming.Append"
		}
	],
	"window" : {
		"class" : "com.datumize.ddc.component.composer.ComposerProcessor$Window",
		"interval" : "5m",
		"type" : "SESSION"
	}
}
JS


Remove duplicates during 1min

If a source is producing repeated values and they are polluting the data stream, the best solution is to use the NoDuplicates rule plus a suitable window.

That rule will propagate the first occurrence of an element and just ignore the rest until the window removes it; the next element will then be propagated again.


{
	"step-type": "processor",
	"id": "composer",
	"class" : "com.datumize.ddc.component.composer.ComposerProcessor",
	"rules" : [
		{
        	"class" : "com.datumize.ddc.component.composer.rules.streaming.NoDuplicates"
		}
	],
	"window" : {
		"class" : "com.datumize.ddc.component.composer.ComposerProcessor$Window",
		"interval" : "1m",
		"window-close-strategy" : "DISCARD",
		"type" : "SLIDING"
	}
}
JS

Note the "group" : false, indicating that we do not want to propagate the groups when the window is closed.

No more than 10 elements per group AND no more than 100k AND not older than 1 minute

More than one streaming rule can be defined.

{
	"step-type": "processor",
	"id": "composer",
	"class" : "com.datumize.ddc.component.composer.ComposerProcessor",
	"rules" : [
		{
        	"class" : "com.datumize.ddc.component.composer.rules.streaming.ByRecords",
			"max-records": 10
		},
		{
			"class" : "com.datumize.ddc.component.composer.rules.streaming.BySize",
			"max-size" : "100k"
		}
	],
	"window" : {
		"class" : "com.datumize.ddc.component.composer.ComposerProcessor$Window",
		"interval" : "1m",
		"type" : "SLIDING"
	}
}
JS

Custom streaming rule

It is possible to write a totally custom rule.

The example below is a custom streaming rule that groups elements in groups of 3.

{
	"step-type": "processor",
	"id": "composer",
	"class" : "com.datumize.ddc.component.composer.ComposerProcessor",
	"rules" : [
		{
 			"class" : "com.datumize.ddc.component.composer.rules.streaming.ScriptRule",
			"script" : "def count = metadata.getElementsCount();
						if (count!=2) { //if there aren't 2 element just append the incoming element to the open group
							return factory.notGroupAndAppend(); 
						} else { //count ==2, we append the incoming element to the group, that already has 2 elements. We group, so it has 3, and we do not keep it. So next group will be opened empty
							return factory.appendAndGroupAndDiscard();
						}"
		}
	]
}
CODE

Note: this is not valid JSON; line breaks are added inside the script to improve readability.

Note 2: This could be combined with a Window, like the other examples.

Grouping elements by key

An important concept for not mixing elements is the key. All examples up to this point have used the default key extractor, which puts all elements in the same bag.

Imagine you want to group events from different sources in groups of 5; without setting the key extractor, events from different sources would be grouped together, which is not the desired behavior.

Each event is a map with the keys "origin", "v" and "ts": origin identifies the source that generated the value in "v" at the timestamp stored in "ts".

{
	"step-type": "processor",
	"id": "composer",
	"class" : "com.datumize.ddc.component.composer.ComposerProcessor",
	"key-extractor" : {
		"class" : "com.datumize.core.converter.extractors.KeyMapExtractor",
		"path" : "origin"
	},
	"rules" : [
    	{
        	"class" : "com.datumize.ddc.component.composer.rules.streaming.ByRecords",
			"max-records" : 5
		}
	]
}
JS

In this example, maps are represented in "JSON" format. The event { origin : sensorA, v : 44, ts: 20201255145522 } and the event { origin : sensorB, v : -5, ts: 20201255145525 } won't be grouped in the same bag, because they come from different origins. The event { origin : sensorA, v : 45, ts: 20201255145642 } will be grouped with the first event.

Calculating a 5min average 

Imagine a stream of Map elements with the keys "k" and "v". "k" represents the key value and "v" an integer value. 

The example below calculates the 5-minute AVG of all elements with the same "k".

The approach consists in accumulating the sum of all elements with the same key in the streaming rule; then, when the window closes 5 minutes after the first element arrived, the average is calculated and propagated as the sum computed by the streaming rule divided by the number of elements.

{
	"step-type": "processor",
	"id": "composer",
	"class" : "com.datumize.ddc.component.composer.ComposerProcessor",
	"key-extractor" : {
		"class" : "com.datumize.core.converter.extractors.KeyMapExtractor",
		"path" : "k"
	},
	"value-extractor" : {
		"class" : "com.datumize.core.converter.extractors.KeyMapExtractor",
		"path" : "v"
	},
	"rules" : [
    	{
        	"class" : "com.datumize.ddc.component.composer.rules.streaming.ScriptRule",
			"script" : "def current = 0;
						def list = lazyValues.get(); //get the current value of the sum. Remember that lazyValues is always a list
						if (list != null) { 
							current = list.get(0);
						}
						metadata.incElementsCount(); //increase the number of elements processed in the current group
						def sum = current + input; //add the new incoming element to the running sum
						log.info('sum = ' + sum); // just log this temporary value in the DDC log file
						return factory.notGroupAndKeep([sum]);" //we don't want to group, as we are just accumulating values. toKeep values must be a List always. [sum] creates a list with sum
		}
	],
	"window": {
		"class" : "com.datumize.ddc.component.composer.ComposerProcessor$Window",
		"interval" : "5m",
		"type" : "SLIDING",
		"rule" : 
			{
				"class" : "com.datumize.ddc.component.composer.rules.window.ScriptRuleWindowImpl",
				"script" : "if (!isWindowsExpired) { //windows rule is called every shor time. binding isWindowsExpired tells you if window has expired or not.
								log.info('no expired');
								return factory.notGroup(); //as window is still open, we do not group.
							}
							log.info('expired');
							def current = lazyValues.get(); //get the accumulated value. Remember it is a lazy value, so get() is needed. Remember too that current is a list.
							if (current == null) {
								return factory.notGroup(); 
							}
							def count = metadata.getElementsCount(); //get the total number of elements that have been grouped
							log.info('count ' + count);
							return factory.groupAndDiscard([current.get(0)/count]);" //group the sum of all elements divided by the number of them
			}
	}
}
JS

Note: this is not valid JSON; line breaks are added inside the scripts to improve readability.

The AVG could also be tackled with another approach: use Append as the streaming rule, then implement a custom window rule that, when isWindowsExpired is true, gets all the stored values, loops over them accumulating the sum, and at the end divides it by the number of elements. This is a correct approach as well, but for a large number of elements it is less performant. A hedged sketch of this alternative follows below.
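
This sketch reuses only classes and bindings shown above; as with the example before it, this is not valid JSON, since line breaks are added inside the script for readability:

{
	"step-type": "processor",
	"id": "composer",
	"class" : "com.datumize.ddc.component.composer.ComposerProcessor",
	"key-extractor" : {
		"class" : "com.datumize.core.converter.extractors.KeyMapExtractor",
		"path" : "k"
	},
	"value-extractor" : {
		"class" : "com.datumize.core.converter.extractors.KeyMapExtractor",
		"path" : "v"
	},
	"rules" : [
		{
			"class" : "com.datumize.ddc.component.composer.rules.streaming.Append"
		}
	],
	"window": {
		"class" : "com.datumize.ddc.component.composer.ComposerProcessor$Window",
		"interval" : "5m",
		"type" : "SLIDING",
		"rule" :
			{
				"class" : "com.datumize.ddc.component.composer.rules.window.ScriptRuleWindowImpl",
				"script" : "if (!isWindowsExpired) {
								return factory.notGroup(); //window still open: keep accumulating raw elements
							}
							def values = lazyValues.get(); //with Append, the group holds every raw element
							if (values == null || values.isEmpty()) {
								return factory.notGroup();
							}
							def sum = 0;
							values.each { sum += it }; //loop over all stored values, accumulating the sum
							return factory.groupAndDiscard([sum / values.size()]);" //propagate the average and discard the group
			}
	}
}
JS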

In this example the average is calculated, but with a few more changes we could calculate the MAX, the MIN...