Overview

Kafka Sink is a sink component aimed at storing records into a Kafka messaging system.

FQCN

com.datumize.ddc.component.kafka.KafkaSink

Properties


Common properties that apply to all sink components of a pipeline.

PROPERTYIDDESCRIPTION

REQUIRED

TYPE

DEFAULT

EXAMPLES

Common
IdentifierIDComponent unique Identifier within the pipeline, read only, only useful for advanced mode.YesStringAuto

MyComponent_23423

This Identifier is automatically generated by the system and you can't change it. Might be helpful for advanced pipeline configuration.

DescriptiondescriptionA short description for the component, to be visualized inside the component representation, aimed at providing additional information to understand the pipeline at a glance.NoString

Extract customer id and loyalty number.

Short and sweet description.

PROPERTYIDDESCRIPTIONREQUIREDTYPEDEFAULTEXAMPLES
Default
URLurl

List of Kafka URLs to connect to.

YesURL List

myserver:3456,otherserver:3434

A list of valid Kafka URL separated by comma.

Topickafka-topicKafka topic to publish the dataYesString

customers

Connect to a Kafka topic named "customers".

Key serializer

key-serializer

Key serializer.

No

See Serializers section.


See the list of supported Serializers
Value serializervalue-serializerValue serializerYesSee Serializers section.
See the list of supported Serializers
Key extractorkey-extractorExtract from the record data to be send as a Kafka keyNoSee Extractors section.
See the list of supported Extractors
Value extractorvalue-extractorExtract from the record data to be send as  Kafka value

No

See Extractors section.


See the list of supported Extractors
Authenticationkafka-authAuthentication details.NoSee Authenticaton methodsNo AuthSee the list of supported Authentication methods
Advanced
Client Idclient-idIdentifier for the Kafka client.NoStringRandom UUID
Asynchronous

async

Whether the push of data will be done in async mode. If not, the sink will be blocked until Kafka acknowledges the writing of data.

No

Boolean

false (sync mode)


Batch size

batch

Kafka batch size, determines how many records are sent at once.

No

Integer

10000


Acknowledge

ack

Controls the ACK parameter of Kafka.

No

String

all (wait for all acks)


Retry count

retries

Send retries to Kafka server.

No

Integer

1


Linger timeout

linger-ms

Time waiting before send. See linger.ms

No

Duration

1ms


Send block timeout

max-block-ms

Timeout to block send operation when producer buffer is full. See max.block.ms

No

Duration

60s


Maximum buffer size

buffer-size

Maximum message size.

No

Capacity

32MB


Automatic commitauto-commitEnable auto commit.NoBooleantrue
Commit intervalauto-commit-intervalKafka auto commit interval.NoDuration100ms
Delivery timeoutdelivery-timeoutTotal time that a message will be delayed prior to sending. See delivery.timeout.msNoDuration5m
Request timeoutrequest-timeoutMaximum amount of time the client will wait for the response of a request. See request.timeout.msNoDuration30s
Advanced parameters

advanced-parameters

Kafka advanced client parameters, according to Kafka documentation

No

JSON List


{"key1":"value1","key2":"value2"}

Authentication methods

Kafka allows using different authentication mechanism to verify the identity of users and applications.

DDC supports Simple Authentication and Security Layer (SASL) and mutual TLS (mTLS) using X.509 certificates.

No Auth

It establish an unauthenticated connection with the Kafka broker.

SALS

It uses Simple Authentication and Security Layer (SASL) and can be enabled concurrently with TLS encryption.

DDC only supports PLAIN authentication with or without TLS:

  • PLAINTEXT: Un-authenticated, non-encrypted channel.
  • SSL: SSL channel.
  • SALS_PLAINTEXT: SASL authenticated, non-encrypted channel.
  • SASL_SSL: SASL authenticated, SSL channel
PROPERTYIDDESCRIPTIONREQUIREDTYPEDEFAULTEXAMPLES
Default
Security protocolsecurity-protocolSASL security protocol.No

PLAINTEXT

SSL

SALS_PLAINTEXT

SASL_SSL



UseruserUsername.YesString

PasswordpasswordPassword.YesString

X509

It uses SSL (TLS) client authentication through X.509 private key / certificate pairs.

PROPERTYIDDESCRIPTIONREQUIREDTYPEDEFAULTEXAMPLES
Default
CertificatecertificateContent of the client's certificate.Yes

String



Private Keycertificate-private-keyContent of the client's private key.YesString

CA certificateca-certificateContent of a trusted certificate of a Certification Authority (CA) with which the client's certificate has been signed.NoString

Advanced
Key Algorithmkey-type-algorithmAlgorithm used to generate the key pair.NoStringRSA
TLS protocoltls-protocolSSL/TLS protocol used to generate the SSL context. See ssl.protocolNoStringTLSv1.3

TLSv1.2

TLSv1.3

Enabled protocolsenabled-protocolsThe list of protocols enabled for SSL/TLS connections. See ssl.enabled.protocols NoList
TLSv1.2, TLSv1.3
Endpoint id algorithmendpoint-id-algorithmThe endpoint identification algorithm to validate server hostname using server certificate. See ssl.endpoint.identification.algorithmNoStringhttps
Key manager algorithmkey-manager-algorithmThe algorithm used by key manager factory for SSL/TLS connections. See ssl.keymanager.algorithmNoStringSunX509
Trust manager algorithmtrust-manager-algorithmThe algorithm used by trust manager factory for SSL/TLS connections. See ssl.trustmanager.algorithmNoStringSunX509