DDC policies define the runtime behaviour for pipelines and streams. Concurrency, scheduling, execution, batching, buffering and error handling are some of the runtime aspects to be configured through policies.

Overview

Configuring DDC policies is an advanced topic. It offers maximum versatility for shaping pipelines and streams for differing usage contexts.  

You can define policies that control the behaviour of components and streams that compose the pipeline. Policies are very granular to support a wide variety of use cases and deployment models: underlying hardware can be a tiny device, desktop computer where minimal resources should be used up, or a powerful server with multiple cores and vast memory. Also, you should keep in mind that DDC runs in a Java Virtual Machine (JVM): the JVM configuration is modeled after a policy as well, for maximum control over the runtime behaviour. 

Each running pipeline needs a runtime configuration. A pipeline can not be run without previously creating a named Pipeline policy. The Pipeline Policy editor represents the designed pipeline and allows the configuration of DDC Policies. You can't change the shape of a pipeline in terms of components and streams however you can select each component and stream and assign runtime properties. These policies impact the behavior of the runtime behaviors and are important for the proper execution of the pipeline. These Pipeline policies can be saved and used later for reoccurring pipeline needs by selecting them after saving them, by selecting them in the DDC pipeline drop down. Please keep in mind as of version 6.1,  properties related to a policy are copied when the policy itself is copied.

Properties

Properties flexibility comes when overriding in a deployment plan.

To override a property the way to do so is to use the DDC Pipeline Policy  in the deployment plan. You will need to publish the previous pipeline version in order to assign it to a new policy.

 In “More actions”  you will find “Properties” that will list all the available properties.

This will open a Properties sidebar with all the global and component properties that were assigned previously in our published pipeline version.

The properties will contain the default value, that can be overridden. This is then assigned to the to a previously created target instance allowing you to reuse a pipeline, but adjusting its policy by including the new property value which you have selected for the new pipeline with only the minor properties having to be changed, and deployed to the machine.This is because when a DDC runs a pipeline, it will always have a DDC Pipeline Policy associated to it:

  • If it’s a default policy, then the properties will not be overridden because the policy does not contain any override for the properties. So the configured value for the property is used without any replacement.

  • If it’s a custom policy, then for each property:

    • If the policy overrides, then DDC will instantiate the pipeline with the overridden value.

    • If it is not overridden, so the configured value is used.


Types of Policies

There are several types of policies depending on what you want to define as an additional consideration. Please see the table below for more information.


Type

Has default policy?

Depends on a version?

Description


Machine policy

Yes

No, machines have no versioning.

It defines whatever applies to the machine/OS or the Chef agent.

Currently, we will only define 2 parameters for the agent (interval and splay). Interval settings refer to the frequency of the policy execution and Splay . The interval and Splay settings are set to give the machine instructions on how often to execute the policy, where Interval is the frequency of the event, and splay is the offset value used to add the desired randomization of the trigger to reduce the likelihood of the machine policies triggering at the same time across the machine and creating unnecessary load. Note:  it is set to a default null value.

Is useful for setting specific parameters that are more related to the machine, for example, the user to execute the chef agent, or the level of patching we want for the OS.

DDD policy

Yes

No, instances have no versioning.

Configuration parameters for the DDD, such as ramfile size and so on.

DDC policy

Yes

No, instances have no versioning.

Defines the policy that affects a DDC instance, such as:

  • JVM options

  • Logging options (log4j.xml)

  • Anything else that affects the whole instance, for example DDC port.

The reason for this IS to keep everything in the same policy to prevent having lots of micro-policies. 

DDC pipeline policy

YES

However using a default policy may need some level of fine tuning based on use case.

Yes, it depends on a DDC Pipeline version.

Specific policy for a DDC pipeline. You can have as many as necessary. Each running combination (instance-pipeline) might have a different policy, or all instances can share the same one if hardware resources are similar. See Pipeline policies for creating a saved and name Pipeline policy. Polices can be reused.


Pipeline Policies

Execution policy

The execution policy specifies how to execute the component. The following table explains the available properties.

Property

ID

Description

Type

Default

Execution policytype

Execution policy type.

  • ONDATA: the component will only be executed if any of the input streams have any data available.
  • ALWAYS: the componen will always be executed, independently of the data available in input streams.

ONDATA

ALWAYS

ONDATA
Flushflush-after-dataStates whether the component should be executed once more (flush) after input data is exhausted.Booleanfalse


Scheduling policy

The scheduling policy specifies when to execute the component, how much data to process at once, and how many threads will be associated. The following table explains the available properties.

Property

ID

Description

Type

Default value

Scheduling policytype

Scheduling policy type:

  • SLEEP: after executing the component, the next execution will wait until the duration parameter.
  • FIXED_RATE: the component is always executed at intervals defined by duration. It could happen that previous component execution has not finished yet.
  • CRON: use a Unix cron expression. Similar to FIXED_RATE, but supports a richer set of events. 

SLEEP

FIXED_RATE

CRON

SLEEP
DurationdurationScheduling duration, used only for SLEEP and FIXED_RATE schedulings.Duration0s
Cron expression cron

Cron scheduling expression. Similar to FIXED_RATE but supports richer activation options. The cron expression is evaluated in local time zone.

Cron Expression* * * * *
Batch sizebatchNumber of records that will be processed at every activation.Integer (>0)1
ThreadsthreadsNumber of concurrent threads that will be created for a component.Integer (>0)1


Error policy

The error policy specifies what to do when an error occurs while executing the component. The following table explains the available properties.

Property

ID

Description

Type

Default value

Error policytype

Error policy type:

  • DISCARD: the record (or batch of records) are just discarded and logged.
  • RETRY: the record (or batch of records) are retried a number of times specified in max-retries.

DISCARD

RETRY

DISCARD
Maximum retriesmax-retriesMaximum number of retries for RETRY error policy.Integer0


Stream Policies

Streams are buffers with a limited capacity and read/write behaviour depending on the kind of access. A stream write/ready policy will affect all components writing/reading that stream. The following table explains the available properties.

Property

ID

Description

Type

Default value

Maximum capacitycapacityStream maximum capacity.Integer (>0)10240
Read timeouttimeout-readRead policy for readers.Timeout policyFOREVER / 0s
Write timeouttimeout-writeWrite policy for writers.Timeout policyFOREVER / 0s

A stream has both a read policy and a write policy that defines the runtime behaviour towards the waiting time. The Timeout policy has following properties:

Property

Parameter

Description

Type

Default value

TimeouttypeTimeout type. 

IMMEDIATE

FOREVER

TIMED

FOREVER
Wait timeouttimeoutTimeout duration. Only valid for TIMED type.Duration0s

Read timeout behaviour:

  • IMMEDIATE: records are read from the stream immediately. If the stream is empty, no records will be returned and the component will be executed.
  • FOREVER: blocking version. Retrieves records from the stream, waiting up forever for records to become available.
  • TIMED: blocking version with a maximum timeout. Retrieves the records from the stream, waiting up to the specific wait time for records to become available. If after that time no records are available, no records will be returned and the component will be executed.

Write timeout behaviour:

  • IMMEDIATE: write records into the stream regardless of how many records are already stored in the stream. If the maximum capacity is exceeded when writing, an exception will be thrown and records will be discarded.
  • FOREVER: blocking version. Write the records into the stream, waiting up for space to become available.
  • TIMED: blocking version with timeout. Write the records into the stream, waiting up to the specific wait time for space to become available. If after that time no space is available, an exception will be thrown and records will be discarded.


DDC Instance Policy

The Java Virtual Machine (JVM) that runs DDC is managed through a policy.

This policy is formatted according the JVM options rules:

  • A line starting with a dash (-) is treated as a JVM option that applies to all versions.
  • A line starting with a number followed by a colon (8:) is treated as a JVM option that applies to the matching Java major version only.
  • A line starting with a number followed by a dash followed by a colon (9-:) is treated as a JVM option that applies to the matching Java specified major version and all larger Java major versions.
  • A line starting with a number followed by a dash followed by a number followed by a colon (8-10:) is treated as a JVM option that applies to the specified range of matching Java major versions.
  • A line starting with the number sign character (#) will be treated as a comment and will be ignored.

For example, if the specified Java major version is 8, the following JVM options will be accepted:

-XX:+PrintGCDateStamps
8:-XX:+PrintGCDateStamps
8-:-XX:+PrintGCDateStamps
7-8:-XX:+PrintGCDateStamps

And the following options will be ignored because the Java version does not match with the one defined in the lines:

9:-Xlog:age*=trace,gc*,safepoint:file=logs/gc.log:utctime,pid,tags:filecount=32,filesize=64m
9-:-Xlog:age*=trace,gc*,safepoint:file=logs/gc.log:utctime,pid,tags:filecount=32,filesize=64m}
9-10:-Xlog:age*=trace,gc*,safepoint:file=logs/gc.log:utctime,pid,tags:filecount=32,filesize=64m}

Here is a sample policy file that contains all the JVM options. The uncommented ones are those that are normally used in a DDC setup.

JVM Option File for DDC

## JVM configuration
 
################################################################
## IMPORTANT: JVM heap size
################################################################
##
## You should always set the min and max JVM heap
## size to the same value. For example, to set
## the heap to 4 GB, set:
##
## -Xms4g
## -Xmx4g
##
################################################################
 
# Xms represents the initial size of total heap space
# Xmx represents the maximum size of total heap space
 
-Xms1g
-Xmx1g
 
################################################################
## Debug settings
################################################################
 
## -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8888
 
 
################################################################
## Flight Recorder settings.
################################################################
 
## -XX:+UnlockCommercialFeatures
## -XX:+FlightRecorder
## -XX:StartFlightRecording=duration=285s,filename=$DATUMIZE_DDC_HOME/log/jfr.jfr,settings=mem
 
################################################################
## HPROF Settings
################################################################
## -agentlib:hprof=heap=dump,file=${DATUMIZE_DDC_HOME}/log/hprof.bin,format=b,depth=10,doe=y
 
################################################################
## Expert settings
################################################################
 
## JVM Mode (server or client)
-server
 
## GC configuration
-XX:MaxGCPauseMillis=150
-XX:+UseG1GC
#-XX:PermSize=512m
#-XX:ParallelGCThreads=4
#-XX:ConcGCThreads=2
#-XX:InitiatingHeapOccupancyPercent=70
#-XX:+UseConcMarkSweepGC
#-XX:CMSInitiatingOccupancyFraction=75
#-XX:+UseCMSInitiatingOccupancyOnly
 
## optimizations
 
# pre-touch memory pages used by the JVM during initialization
-XX:+AlwaysPreTouch
 
## basic
 
# explicitly set the stack size
#-Xss1m
 
# ensure UTF-8 encoding by default (e.g. filenames)
#-Dfile.encoding=UTF-8
 
# use our provided JNA always versus the system one
#-Djna.nosys=true
 
# turn off a JDK optimization that throws away stack traces for common
# exceptions because stack traces are important for debugging
-XX:-OmitStackTraceInFastThrow
 
## JDK 8 GC logging
8:-Xloggc:"$DATUMIZE_DDC
_HOME"/log/"$DATUMIZE_DDC_INSTANCE".gc8
#8:-XX:+HeapDumpOnOutOfMemoryError
#8:-XX:HeapDumpPath=${DATUMIZE_DDC_HOME}/log
#8:-XX:+PrintGCDateStamps
#8:-XX:+PrintTenuringDistribution
#8:-XX:+PrintGCApplicationStoppedTime
#8:-XX:+UseGCLogFileRotation
#8:-XX:NumberOfGCLogFiles=32
#8:-XX:GCLogFileSize=64m
 
# JDK 9+ GC logging
9-:-Xlog:gc*,gc+age=trace,safepoint:file=${loggc}:utctime,pid,tags:filecount=32,filesize=64m
# due to internationalization enhancements in JDK 9 Elasticsearch need to set the provider to COMPAT otherwise
# time/date parsing will break in an incompatible way for some date patterns and locals
9-:-Djava.locale.providers=COMPAT
 
# temporary workaround for C2 bug with JDK 10 on hardware with AVX-512
10-:-XX:UseAVX=2
BASH