DDC policies define the runtime behaviour of pipelines and streams. Concurrency, scheduling, execution, batching, buffering and error handling are among the runtime aspects that can be configured through policies.

Overview

Configuring DDC policies is an advanced topic. Policies offer maximum flexibility for adapting pipelines and streams to different usage contexts.

You can define policies that control the behaviour of the components and streams that make up a pipeline. Policies are very granular in order to support a wide variety of use cases and deployment models. The underlying hardware may be a tiny device or a desktop computer where resource usage must be kept to a minimum, or it may be a powerful server with multiple cores and plenty of memory. Also keep in mind that DDC runs in a Java Virtual Machine (JVM). The JVM configuration is expressed as a policy too, which gives you full control over runtime behaviour.

Pipeline Policies

Execution policy

The execution policy specifies how to execute the component. The following table explains the available properties.

Property | ID | Description | Type | Default value
--- | --- | --- | --- | ---
Execution policy | type | Execution policy type (see the values below). | ONDATA, ALWAYS | ONDATA
Flush | flush-after-data | States whether the component should be executed once more (flush) after input data is exhausted. | Boolean | false

Execution policy types:

  • ONDATA: the component is executed only if at least one of its input streams has data available.
  • ALWAYS: the component is always executed, regardless of the data available in its input streams.
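The following minimal Java sketch makes the interplay between type and flush-after-data concrete by modelling the decision made in each cycle. ExecutionDecider and shouldExecute are hypothetical names used only for illustration. They are not part of any documented DDC API. The sketch also assumes that a flush run happens once each time the inputs run dry after data was processed.

```java
/**
 * Hypothetical model of the execution policy; class and method names are
 * illustrative and are not part of any documented DDC API.
 */
final class ExecutionDecider {
    enum Type { ONDATA, ALWAYS }

    private final Type type;
    private final boolean flushAfterData;
    // Assumption: a flush run is owed after any cycle in which input data was processed.
    private boolean flushPending = false;

    ExecutionDecider(Type type, boolean flushAfterData) {
        this.type = type;
        this.flushAfterData = flushAfterData;
    }

    /** Returns true if the component should be executed in this cycle. */
    boolean shouldExecute(boolean anyInputHasData) {
        if (anyInputHasData) {
            // Both types run when at least one input stream has data.
            flushPending = flushAfterData;
            return true;
        }
        if (type == Type.ALWAYS) {
            // ALWAYS runs even with empty inputs, so no separate flush run is needed.
            flushPending = false;
            return true;
        }
        if (flushPending) {
            // ONDATA with flush-after-data: run exactly once more after the data runs out.
            flushPending = false;
            return true;
        }
        return false; // ONDATA and no data: skip this cycle
    }
}
```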


Scheduling policy

The scheduling policy specifies when to execute the component, how much data to process at once, and how many threads are assigned to it. The following table explains the available properties.

Property | ID | Description | Type | Default value
--- | --- | --- | --- | ---
Scheduling policy | type | Scheduling policy type (see the values below). | SLEEP, FIXED_RATE, CRON | SLEEP
Duration | duration | Scheduling duration, used only by the SLEEP and FIXED_RATE types. | Duration | 0s
Cron expression | cron | Cron scheduling expression. Similar to FIXED_RATE, but supports richer activation options. The expression is evaluated in the local time zone. | Cron expression | * * * * * (every minute)
Batch size | batch | Number of records processed at every activation. | Integer (>0) | 1
Threads | threads | Number of concurrent threads created for the component. | Integer (>0) | 1

Scheduling policy types:

  • SLEEP: after each execution, the scheduler waits for the time set in duration before executing the component again.
  • FIXED_RATE: the component is executed at fixed intervals set by duration, so a new execution may start before the previous one has finished.
  • CRON: executions are triggered by a Unix cron expression (cron). Similar to FIXED_RATE, but supports a richer set of activation times.
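The difference between SLEEP and FIXED_RATE is easiest to see on a timeline. The following Java sketch computes activation start times under the simplifying assumption that every execution takes the same amount of time. ScheduleTimeline and its methods are hypothetical names, and the sketch leaves CRON out. For comparison, the JDK's ScheduledExecutorService.scheduleWithFixedDelay behaves like SLEEP. Its scheduleAtFixedRate, however, never runs executions of the same task concurrently, so it is not an exact analogue of FIXED_RATE as described above.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical timeline model of the SLEEP and FIXED_RATE scheduling types
 * (CRON is omitted). Times are in milliseconds from the first activation.
 */
final class ScheduleTimeline {
    enum Type { SLEEP, FIXED_RATE }

    /**
     * Start times of the first n activations, assuming every execution takes runMs.
     * SLEEP:      next start = previous end + duration (runs never overlap).
     * FIXED_RATE: next start = previous start + duration, even if the previous run is still going.
     */
    static List<Long> startTimes(Type type, long durationMs, long runMs, int n) {
        List<Long> starts = new ArrayList<>();
        long next = 0;
        for (int i = 0; i < n; i++) {
            starts.add(next);
            next += (type == Type.SLEEP) ? runMs + durationMs : durationMs;
        }
        return starts;
    }

    /** Activations needed to drain a backlog when each one processes up to batch records. */
    static long activationsToDrain(long backlog, int batch) {
        return (backlog + batch - 1) / batch; // ceiling division
    }
}
```

Suppose the duration is 100 ms and each execution takes 150 ms. FIXED_RATE starts runs at 0, 100 and 200 ms, so consecutive runs overlap, while SLEEP starts them at 0, 250 and 500 ms. The batch helper shows that draining a backlog of 10 records with batch set to 3 takes 4 activations.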


Error policy

The error policy specifies what to do when an error occurs while executing the component. The following table explains the available properties.

Property | ID | Description | Type | Default value
--- | --- | --- | --- | ---
Error policy | type | Error policy type (see the values below). | DISCARD, RETRY | DISCARD
Maximum retries | max-retries | Maximum number of retries for the RETRY error policy. | Integer | 0

Error policy types:

  • DISCARD: the record (or batch of records) is discarded and logged.
  • RETRY: processing of the record (or batch of records) is retried up to the number of times specified in max-retries.
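The following minimal Java sketch shows how DISCARD and RETRY could wrap a component's processing step. ErrorPolicySketch is a hypothetical name, and the sketch models the component as a Consumer that signals failure by throwing. It also assumes that max-retries counts the attempts made after the first one. This documentation does not say what DDC does once retries are exhausted, so the sketch simply discards and logs the batch, as DISCARD does.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.logging.Logger;

/**
 * Hypothetical sketch of the DISCARD and RETRY error policies; not DDC's implementation.
 */
final class ErrorPolicySketch {
    enum Type { DISCARD, RETRY }

    private static final Logger LOG = Logger.getLogger(ErrorPolicySketch.class.getName());

    /**
     * Runs the component on a batch. Returns true if it eventually succeeded,
     * false if the batch was discarded (and logged).
     * Assumption: maxRetries counts attempts made after the first failure.
     */
    static <T> boolean process(List<T> batch, Consumer<List<T>> component, Type type, int maxRetries) {
        int retriesLeft = (type == Type.RETRY) ? maxRetries : 0; // DISCARD never retries
        while (true) {
            try {
                component.accept(batch);
                return true;
            } catch (RuntimeException e) {
                if (retriesLeft > 0) {
                    retriesLeft--;
                    continue; // RETRY: try the same batch again
                }
                LOG.warning("Discarding " + batch.size() + " record(s): " + e.getMessage());
                return false;
            }
        }
    }
}
```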


Stream Policies

Streams are buffers with a limited capacity, and their read/write behaviour depends on the kind of access. A stream's write policy affects all components writing to that stream, and its read policy affects all components reading from it. The following table explains the available properties.

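Property | ID | Description | Type | Default value
--- | --- | --- | --- | ---
Maximum capacity | capacity | Maximum capacity of the stream. | Integer (>0) | 10240
Read timeout | timeout-read | Timeout policy applied to components reading from the stream. | Timeout policy | FOREVER / 0s
Write timeout | timeout-write | Timeout policy applied to components writing to the stream. | Timeout policy | FOREVER / 0s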

A stream has both a read policy and a write policy, which define how long readers and writers wait on the stream. Both are timeout policies with the following properties:

Property | ID | Description | Type | Default value
--- | --- | --- | --- | ---
Timeout | type | Timeout type. | IMMEDIATE, FOREVER, TIMED | FOREVER
Wait timeout | timeout | Timeout duration. Only valid for the TIMED type. | Duration | 0s

Read timeout behaviour:

  • IMMEDIATE: records are read from the stream without waiting. If the stream is empty, no records are returned and the component is still executed.
  • FOREVER: blocking read. Retrieves records from the stream, waiting indefinitely for records to become available.
  • TIMED: blocking read with a maximum wait. Retrieves records from the stream, waiting up to the configured timeout for records to become available. If no records are available after that time, none are returned and the component is still executed.
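The three read behaviours map naturally onto the JDK's BlockingQueue methods poll(), take() and poll(timeout, unit). The following sketch uses that mapping purely as an analogy. StreamReadSketch is a hypothetical name, and this documentation does not describe DDC's actual stream implementation. Reading up to a batch of records with drainTo is also an assumption about how batch size interacts with reads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical mapping of the read timeout types onto java.util.concurrent.BlockingQueue.
 * This is an analogy for illustration only, not DDC's stream implementation.
 */
final class StreamReadSketch {
    enum Timeout { IMMEDIATE, FOREVER, TIMED }

    /** Reads up to batch records; an empty result means "no records, but execute the component anyway". */
    static <T> List<T> read(BlockingQueue<T> stream, Timeout type, long timeoutMs, int batch)
            throws InterruptedException {
        T first = null;
        switch (type) {
            case IMMEDIATE:
                first = stream.poll();                                  // returns null at once if empty
                break;
            case FOREVER:
                first = stream.take();                                  // blocks until a record arrives
                break;
            case TIMED:
                first = stream.poll(timeoutMs, TimeUnit.MILLISECONDS);  // null if the wait times out
                break;
        }
        List<T> records = new ArrayList<>();
        if (first != null) {
            records.add(first);
            stream.drainTo(records, batch - 1); // take what is already buffered, up to the batch size
        }
        return records;
    }
}
```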

Write timeout behaviour:

  • IMMEDIATE: records are written to the stream without waiting, regardless of how many records it already holds. If writing exceeds the maximum capacity, an exception is thrown and the records are discarded.
  • FOREVER: blocking write. Writes records to the stream, waiting indefinitely for space to become available.
  • TIMED: blocking write with a maximum wait. Writes records to the stream, waiting up to the configured timeout for space to become available. If no space is available after that time, an exception is thrown and the records are discarded.
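The write behaviours can also be pictured with the JDK's BlockingQueue methods add(), put() and offer(e, timeout, unit). In this analogy, the queue's fixed capacity stands in for the stream's capacity property. StreamWriteSketch is a hypothetical name. The sketch writes records one at a time, so when a write fails, the remaining records of the batch are dropped. That is an assumption, since DDC may handle a partially written batch differently.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical mapping of the write timeout types onto java.util.concurrent.BlockingQueue,
 * whose fixed capacity plays the role of the stream's capacity property.
 */
final class StreamWriteSketch {
    enum Timeout { IMMEDIATE, FOREVER, TIMED }

    /**
     * Writes records in order. If a write fails, an exception is thrown and the
     * remaining records are not written (assumption: DDC may treat partial batches differently).
     */
    static <T> void write(BlockingQueue<T> stream, List<T> records, Timeout type, long timeoutMs)
            throws InterruptedException {
        for (T record : records) {
            switch (type) {
                case IMMEDIATE:
                    stream.add(record); // no waiting: throws IllegalStateException if the stream is full
                    break;
                case FOREVER:
                    stream.put(record); // blocks until space is available
                    break;
                case TIMED:
                    if (!stream.offer(record, timeoutMs, TimeUnit.MILLISECONDS)) {
                        throw new IllegalStateException("Stream still full after " + timeoutMs + " ms");
                    }
                    break;
            }
        }
    }
}
```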


DDC Instance Policy

The Java Virtual Machine (JVM) that runs DDC is managed through a policy.

This policy uses the JVM options file format, with the following rules:

  • A line starting with a dash (-) is treated as a JVM option that applies to all Java versions.
  • A line starting with a number followed by a colon (8:) is treated as a JVM option that applies only to that Java major version.
  • A line starting with a number followed by a dash and a colon (9-:) is treated as a JVM option that applies to that Java major version and all later ones.
  • A line starting with a number, a dash, another number and a colon (8-10:) is treated as a JVM option that applies to that range of Java major versions, inclusive.
  • A line starting with the number sign (#) is treated as a comment and ignored.

For example, if the Java major version is 8, the following JVM options are accepted:

-XX:+PrintGCDateStamps
8:-XX:+PrintGCDateStamps
8-:-XX:+PrintGCDateStamps
7-8:-XX:+PrintGCDateStamps

The following options, on the other hand, are ignored because the Java versions in their prefixes do not match version 8:

9:-Xlog:age*=trace,gc*,safepoint:file=logs/gc.log:utctime,pid,tags:filecount=32,filesize=64m
9-:-Xlog:age*=trace,gc*,safepoint:file=logs/gc.log:utctime,pid,tags:filecount=32,filesize=64m
9-10:-Xlog:age*=trace,gc*,safepoint:file=logs/gc.log:utctime,pid,tags:filecount=32,filesize=64m
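The prefix rules can be expressed as a small filter. The following Java sketch is hypothetical, and JvmOptionsFilter is not a DDC class. It also makes two assumptions that the rules above do not state: blank lines are skipped, and a line that matches none of the rules is an error.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical filter for version-prefixed JVM option lines; not DDC's own parser.
 */
final class JvmOptionsFilter {
    // Version prefix "8:", "9-:" or "8-10:", followed by the option itself (which starts with '-').
    private static final Pattern PREFIXED = Pattern.compile("^(\\d+)(?:-(\\d*))?:(-.*)$");

    /** Returns the options that apply to the given Java major version, in file order. */
    static List<String> optionsFor(List<String> lines, int javaMajor) {
        List<String> accepted = new ArrayList<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue; // comments (and, by assumption, blank lines) are ignored
            }
            if (line.startsWith("-")) {
                accepted.add(line); // no prefix: applies to all versions
                continue;
            }
            Matcher m = PREFIXED.matcher(line);
            if (!m.matches()) {
                // Assumption: how DDC reports malformed lines is not documented.
                throw new IllegalArgumentException("Malformed JVM option line: " + line);
            }
            int lower = Integer.parseInt(m.group(1));
            int upper;
            if (m.group(2) == null) {
                upper = lower;                        // "8:"    exactly this version
            } else if (m.group(2).isEmpty()) {
                upper = Integer.MAX_VALUE;            // "9-:"   this version and later
            } else {
                upper = Integer.parseInt(m.group(2)); // "8-10:" inclusive range
            }
            if (javaMajor >= lower && javaMajor <= upper) {
                accepted.add(m.group(3)); // keep the option without its version prefix
            }
        }
        return accepted;
    }
}
```

With Java 8, the filter keeps all four lines of the first example and drops all three lines of the second. With Java 10, it would keep the 9-: and 9-10: lines.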

Here is a sample policy file covering a range of JVM options. The uncommented ones are those normally used in a DDC setup.

JVM Option File for DDC

## JVM configuration
 
################################################################
## IMPORTANT: JVM heap size
################################################################
##
## You should always set the min and max JVM heap
## size to the same value. For example, to set
## the heap to 4 GB, set:
##
## -Xms4g
## -Xmx4g
##
################################################################
 
# Xms represents the initial size of total heap space
# Xmx represents the maximum size of total heap space
 
-Xms1g
-Xmx1g
 
################################################################
## Debug settings
################################################################
 
## -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8888
 
 
################################################################
## Flight Recorder settings.
################################################################
 
## -XX:+UnlockCommercialFeatures
## -XX:+FlightRecorder
## -XX:StartFlightRecording=duration=285s,filename=$DATUMIZE_DDC_HOME/log/jfr.jfr,settings=mem
 
################################################################
## HPROF Settings
################################################################
## -agentlib:hprof=heap=dump,file=${DATUMIZE_DDC_HOME}/log/hprof.bin,format=b,depth=10,doe=y
 
################################################################
## Expert settings
################################################################
 
## JVM Mode (server or client)
-server
 
## GC configuration
-XX:MaxGCPauseMillis=150
-XX:+UseG1GC
#-XX:PermSize=512m
#-XX:ParallelGCThreads=4
#-XX:ConcGCThreads=2
#-XX:InitiatingHeapOccupancyPercent=70
#-XX:+UseConcMarkSweepGC
#-XX:CMSInitiatingOccupancyFraction=75
#-XX:+UseCMSInitiatingOccupancyOnly
 
## optimizations
 
# pre-touch memory pages used by the JVM during initialization
-XX:+AlwaysPreTouch
 
## basic
 
# explicitly set the stack size
#-Xss1m
 
# ensure UTF-8 encoding by default (e.g. filenames)
#-Dfile.encoding=UTF-8
 
# always use the provided JNA rather than the system one
#-Djna.nosys=true
 
# turn off a JDK optimization that throws away stack traces for common
# exceptions because stack traces are important for debugging
-XX:-OmitStackTraceInFastThrow
 
## JDK 8 GC logging
8:-Xloggc:"$DATUMIZE_DDC_HOME"/log/"$DATUMIZE_DDC_INSTANCE".gc8
#8:-XX:+HeapDumpOnOutOfMemoryError
#8:-XX:HeapDumpPath=${DATUMIZE_DDC_HOME}/log
#8:-XX:+PrintGCDateStamps
#8:-XX:+PrintTenuringDistribution
#8:-XX:+PrintGCApplicationStoppedTime
#8:-XX:+UseGCLogFileRotation
#8:-XX:NumberOfGCLogFiles=32
#8:-XX:GCLogFileSize=64m
 
# JDK 9+ GC logging
9-:-Xlog:gc*,gc+age=trace,safepoint:file=${loggc}:utctime,pid,tags:filecount=32,filesize=64m
# due to internationalization enhancements in JDK 9, the locale provider is set to COMPAT; otherwise
# time/date parsing will break in an incompatible way for some date patterns and locales
9-:-Djava.locale.providers=COMPAT
 
# temporary workaround for C2 bug with JDK 10 on hardware with AVX-512
10-:-XX:UseAVX=2