This tutorial expands on the Joining Data Tutorial: it splits certain data out of a pipeline and sends it to a dedicated sink.

Objectives

In this tutorial you will learn: 

  • How to split a pipeline.
  • How to write to multiple sink components.

Import Basic Tutorial

To complete this tutorial, you will need access to our sandbox sources. Please contact us if you would like access.

The **username**, **password**, **db_username** and **db_password** fields must be replaced with your sandbox credentials.


We will start by importing join.json from our downloads page: https://download.datumize.tech/dtz-templates/

The table below summarizes the components used in the pipeline.

| Component Type | Name | Description |
|---|---|---|
| Source | HTTPApiPollingSource | Actively requests an API at regular time intervals (polling). |
| Processor | ExtractorProcessor | |
| Processor | FlattenerAdapter | |
| Source | DatabaseSource | Reads data from a database source. |
| Processor | JoinerProcessor | Joins multiple data sources. |
| Processor | TopicAdapterProcessor | Splits a pipeline. |
| Sink | LoggerSink | Logs the record into the info log, using the toString() method. |
| Processor | ComposerProcessor | Composer with configurable partitioner. |
| Sink | FileSink | Writes data to file. |

Modify the imported pipeline as follows.

The table below summarizes the properties to configure the HTTPApiPolling Source component.


| Field Name | Value | Required |
|---|---|---|
| Auth Method | CookieAuthMethod | * |
| user | **username** | * |
| password | **password** | * |
| login-endpoint | https://sandbox.datumize.net/api/users/login | * |
| login-body | {"username" : "{{username}}", "password" : "{{password}}"} | * |
| login-content-type | application/json | * |
| cookie-domain | null | * |
| cookie-path | / | * |
| login-TTL | 5m | * |
| Request Builder | Request builder simple get | * |
| endpoint | https://sandbox.datumize.net/api | * |
| Deserializer | Json map deserializer | * |
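To make the cookie login settings more concrete, here is a minimal Python sketch of what they describe. It assumes that the {{username}} and {{password}} placeholders in login-body are filled from the user and password fields, and that login-TTL (5m) is how long a session cookie is reused before logging in again. The helpers render_login_body and needs_relogin are hypothetical illustrations, not part of Datumize.

```python
import re

# Hypothetical helper (not Datumize code): replace every "{{name}}" placeholder
# in the login-body template with the matching value from `values`.
def render_login_body(template, values):
    return re.sub(r"\{\{(\w+)\}\}", lambda match: values[match.group(1)], template)

# Hypothetical helper: a cookie obtained at login_time is treated as stale once
# the TTL (5m = 300 s in the table above) has elapsed, triggering a new login.
def needs_relogin(login_time, ttl_seconds, now):
    return now - login_time >= ttl_seconds

template = '{"username" : "{{username}}", "password" : "{{password}}"}'
body = render_login_body(template, {"username": "alice", "password": "secret"})
print(body)  # {"username" : "alice", "password" : "secret"}
print(needs_relogin(login_time=0.0, ttl_seconds=300, now=301.0))  # True
```

This naive substitution does not escape JSON special characters, so a password containing a double quote would produce an invalid body in this sketch.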

The table below summarizes the properties to configure the Extractor Processor component.


| Field Name | Value | Required |
|---|---|---|
| Extractor | Field to Map Extractor | * |
| Path | body/data | * |
| Default | null | |
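As a rough illustration of the Path and Default settings, the sketch below walks a slash-separated path such as body/data through nested maps and falls back to the default (null, i.e. None) when a key is missing. We assume the deserialized API response nests the device list under body then data; the response shape and the extract_path name are hypothetical.

```python
from typing import Any

# Hypothetical sketch (not Datumize code): follow "body/data" key by key through
# nested dicts; return `default` as soon as a key is missing.
def extract_path(record: dict, path: str, default: Any = None) -> Any:
    current: Any = record
    for key in path.split("/"):
        if not isinstance(current, dict) or key not in current:
            return default
        current = current[key]
    return current

# Hypothetical deserialized response; the real API payload may differ.
response = {"status": 200, "body": {"data": [{"ap_mac": "14:57:9f:d7:bf:00"}]}}
print(extract_path(response, "body/data"))     # [{'ap_mac': '14:57:9f:d7:bf:00'}]
print(extract_path(response, "body/missing"))  # None
```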

The table below summarizes the properties to configure the Flattener Processor component.


| Field Name | Value | Required |
|---|---|---|
| Topic | devices | |
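We assume the Flattener turns the extracted list into one record per element, each published on the devices topic. The sketch below shows that idea in Python; the flatten function and the topic/payload record shape are hypothetical and only illustrate the behaviour we assume.

```python
# Hypothetical sketch (not Datumize code): emit one record per list element,
# each tagged with the configured topic.
def flatten(items, topic):
    if items is None:
        return []  # in this sketch, a null extraction result yields no records
    if not isinstance(items, list):
        items = [items]  # treat a single object as a one-element list
    return [{"topic": topic, "payload": item} for item in items]

devices = [{"ap_mac": "aa:aa:aa:aa:aa:01"}, {"ap_mac": "aa:aa:aa:aa:aa:02"}]
for record in flatten(devices, "devices"):
    print(record)
```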

The table below summarizes the properties to configure the Database Source component.


| Field Name | Value | Required |
|---|---|---|
| Topic | aps | |
| Url | jdbc:postgresql://sandbox.datumize.net:5432/demo?reWriteBatchedInserts=true | * |
| Driver | org.postgresql.Driver | * |
| User | **db_username** | * |
| Password | **db_password** | * |
| Query | `select * from aps` | * |
| Converter | Jdbc row map converter | * |
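The Jdbc row map converter presumably turns each result row into a map keyed by column name. The sketch below shows that conversion using Python's built-in sqlite3 module and an in-memory database as a stand-in for the sandbox PostgreSQL server; the aps columns and sample rows are hypothetical.

```python
import sqlite3

# Stand-in sketch (not Datumize code): run a query and convert every row into
# a dict keyed by column name, which we assume mirrors a "row map" converter.
def query_as_maps(conn, query):
    cursor = conn.execute(query)
    columns = [description[0] for description in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]

conn = sqlite3.connect(":memory:")
# Hypothetical schema and data; the real aps table may have other columns.
conn.execute("CREATE TABLE aps (ap_mac TEXT, ap_name TEXT, ap_zone TEXT)")
conn.executemany(
    "INSERT INTO aps VALUES (?, ?, ?)",
    [("aa:aa:aa:aa:aa:01", "AP A", "restaurante"),
     ("aa:aa:aa:aa:aa:02", "AP B", "lobby")],
)
aps = query_as_maps(conn, "select * from aps")
for ap in aps:
    print(ap)
```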

The table below summarizes the properties to configure the Joiner Processor component.


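| Field Name | Value | Required |
|---|---|---|
| Topic A | devices | * |
| Id Extractor A | Field to Map Extractor | * |
| path | ap_mac | * |
| Topic B | aps | * |
| Id Extractor B | Field to Map Extractor | * |
| path | ap_mac | |
| Combiner | Flat List to map combiner | * |
| timeout | 1h | * |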
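Conceptually, the Joiner matches records from the devices and aps topics on their ap_mac value and combines each pair into one flat map. The Python sketch below is a simple hash join under that assumption; join_on is a hypothetical name, and the 1h timeout (how long an unmatched record would wait for its partner in a streaming join) is not modelled: unmatched devices are simply dropped here.

```python
# Hypothetical sketch (not Datumize code): index aps by ap_mac, then merge each
# device with its matching ap into a single flat dict.
def join_on(devices, aps, key="ap_mac"):
    aps_by_key = {ap[key]: ap for ap in aps}
    joined = []
    for device in devices:
        ap = aps_by_key.get(device.get(key))
        if ap is not None:
            joined.append({**device, **ap})  # ap fields win on name clashes
    return joined

devices = [
    {"device_mac": "11:11:11:11:11:11", "ap_mac": "aa:aa:aa:aa:aa:01"},
    {"device_mac": "22:22:22:22:22:22", "ap_mac": "aa:aa:aa:aa:aa:09"},  # no matching ap
]
aps = [{"ap_mac": "aa:aa:aa:aa:aa:01", "ap_name": "AP A", "ap_zone": "restaurante"}]
print(join_on(devices, aps))
```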


The table below summarizes the properties to configure the Topic Adapter Processor component.


| Field Name | Value | Required |
|---|---|---|
| Extractor | Field to Map Extractor | * |
| path | ap_zone | * |
| Value Topic | Set the value to be extracted and its destination topic: value restaurante, topic restauranteTopic (any record with ap_zone = restaurante will be sent to topic restauranteTopic). | * |
| Default Topic | defaultTopic | * |
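The routing rule configured above can be summarized in a few lines of Python: read ap_zone, look it up in the value-to-topic table, and fall back to the default topic otherwise. This is a hypothetical sketch of the behaviour, not the Topic Adapter's actual code; in particular, sending records without an ap_zone to defaultTopic is this sketch's choice.

```python
# Hypothetical sketch (not Datumize code) of the split configured above.
VALUE_TOPICS = {"restaurante": "restauranteTopic"}
DEFAULT_TOPIC = "defaultTopic"

def route(record, field="ap_zone"):
    # Unknown or missing values fall back to the default topic.
    return VALUE_TOPICS.get(record.get(field), DEFAULT_TOPIC)

records = [
    {"ap_mac": "aa:aa:aa:aa:aa:01", "ap_zone": "restaurante"},
    {"ap_mac": "aa:aa:aa:aa:aa:02", "ap_zone": "lobby"},  # hypothetical zone
    {"ap_mac": "aa:aa:aa:aa:aa:03"},                      # no ap_zone at all
]
for r in records:
    print(r["ap_mac"], "->", route(r))
# aa:aa:aa:aa:aa:01 -> restauranteTopic
# aa:aa:aa:aa:aa:02 -> defaultTopic
# aa:aa:aa:aa:aa:03 -> defaultTopic
```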

We have now split the data into two topics, so we need to route each topic to the correct component.

Modify the stream going to the Composer Processor and set its topic to defaultTopic.

Modify the stream going to the Logger Sink and set its topic to restauranteTopic.

The table below summarizes the properties to configure the Composer Processor component.


| Field Name | Value | Required |
|---|---|---|
| Rules | Records Rule | * |
| Max records | 300 | * |
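A Records Rule with Max records = 300 suggests that the Composer groups incoming records into batches of at most 300 before passing them on. The Python generator below sketches that grouping under this assumption; compose is a hypothetical name, and neither the configurable partitioner nor any time-based flushing is modelled.

```python
from typing import Iterable, Iterator, List

# Hypothetical sketch (not Datumize code): yield batches of at most
# max_records records; a final partial batch is emitted at the end.
def compose(records: Iterable, max_records: int = 300) -> Iterator[List]:
    batch = []
    for record in records:
        batch.append(record)
        if len(batch) == max_records:
            yield batch
            batch = []
    if batch:
        yield batch

print([len(b) for b in compose(range(650))])  # [300, 300, 50]
```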

Finally, the table below summarizes the properties to configure the File Sink component.

| Field Name | Value | Required |
|---|---|---|
| Directory Base | /opt/datumize/split | * |
| Serializer | Map to CSV Serializer | |
| Fields | Enter the fields in the order in which you want them written to the CSV file. | |
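The effect of the Fields setting can be sketched with Python's csv module: each map is written as one header-less row in the configured field order, with missing fields left empty and unlisted fields ignored. The directory layout (base/yyyyMMdd/HHmm/random id) is inferred from the sample output path in Check Expected Output; the write_csv name and the field names are hypothetical, and a temporary directory stands in for /opt/datumize/split.

```python
import csv
import os
import tempfile
import uuid
from datetime import datetime, timezone

# Hypothetical sketch (not Datumize code) of a map-to-CSV file sink.
def write_csv(base, records, fields, now):
    directory = os.path.join(base, now.strftime("%Y%m%d"), now.strftime("%H%M"))
    os.makedirs(directory, exist_ok=True)
    path = os.path.join(directory, str(uuid.uuid4()))
    with open(path, "w", newline="") as handle:
        # Missing fields become empty columns; fields not listed are dropped.
        writer = csv.DictWriter(handle, fieldnames=fields, extrasaction="ignore")
        for record in records:
            writer.writerow(record)
    return path

fields = ["device_mac", "ap_mac", "ap_name", "timestamp"]  # hypothetical names
records = [
    {"device_mac": "11:11:11:11:11:11", "ap_mac": "aa:aa:aa:aa:aa:01",
     "ap_name": "AP A", "timestamp": 1612862165000, "ap_zone": "lobby"},
    {"device_mac": "22:22:22:22:22:22", "ap_mac": "aa:aa:aa:aa:aa:02",
     "timestamp": 1612862195000},  # no ap_name
]
base = tempfile.mkdtemp()  # stands in for /opt/datumize/split
path = write_csv(base, records, fields, datetime(2021, 2, 9, 9, 18, tzinfo=timezone.utc))
with open(path, newline="") as handle:
    print(handle.read())
```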






Deploy the Pipeline to a DDC Instance

In Zentral, you only need one machine, one instance and one pipeline.

The table below summarizes the properties to be defined in the DDC Runtime Policy for the HTTPApiPolling Source component.

| Field Name | Value |
|---|---|
| Execution Policy Type | |
| Type | Always |
| Flush After Data | False |
| Scheduling Policy Type | |
| Type | SLEEP |
| Duration | 30s |
| Cron | `******` |
| Batch | 1 |
| Threads | 1 |
| Error Policy Type | |
| Type | DISCARD |
| Max-Retries | 0 |

We set the HTTPApiPollingSource scheduling policy to SLEEP with a duration of 30s, and set both Batch and Threads to 1.
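To show how the SLEEP scheduling and DISCARD error settings above interact, here is a simplified single-threaded loop in Python: poll once (Batch = 1), hand the result on, drop it without retrying if anything fails (Max-Retries = 0), then sleep for the configured duration. This is a hypothetical model of our reading of these settings, not how DDC runs components; run_sleep_policy is an illustrative name, and the iterations bound only keeps the demo finite, whereas a real runtime keeps going.

```python
import time
from typing import Callable

# Hypothetical sketch (not DDC code) of SLEEP scheduling with a DISCARD
# error policy and zero retries.
def run_sleep_policy(poll: Callable, handle: Callable, duration_s: float, iterations: int) -> int:
    discarded = 0
    for _ in range(iterations):
        try:
            handle(poll())  # one poll per cycle (Batch = 1)
        except Exception:
            discarded += 1  # DISCARD: drop the failed batch, no retries
        time.sleep(duration_s)
    return discarded

values = iter(range(4))
handled = []

def handle(value):
    if value == 2:
        raise ValueError("simulated processing failure")
    handled.append(value)

# duration_s=0 keeps the demo instant; the tutorial's policy uses 30s.
discarded = run_sleep_policy(lambda: next(values), handle, duration_s=0, iterations=4)
print(handled, discarded)  # [0, 1, 3] 1
```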

The table below summarizes the properties to be defined in the DDC Runtime Policy for the Database Source component.

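| Field Name | Value |
|---|---|
| Execution Policy Type | |
| Type | Always |
| Flush After Data | False |
| Scheduling Policy Type | |
| Type | SLEEP |
| Duration | 30s |
| Cron | `******` |
| Batch | 1 |
| Threads | 1 |
| Error Policy Type | |
| Type | DISCARD |
| Max-Retries | 0 |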

We set the Database Source scheduling policy to SLEEP with a duration of 30s, and set both Batch and Threads to 1.




Check Expected Output

If you review the instance log, you will see all records with ap_zone = restaurante, written there by the Logger Sink.

The remaining records are written as CSV files under the directory base, /opt/datumize/split:

```
ubuntu@ip-172-31-35-173:~$ tail -100 /opt/datumize/split/20210209/0918/ebd11e7e-9af6-460a-80dd-506b946fc591
30:d9:d9:e6:54:88,14:57:9f:d7:bf:00,Access Point 8,7,,1612862165000
30:07:4d:96:22:ba,14:57:9f:d7:bb:60,Access Point 9,8,,1612862165000
28:ed:e0:14:85:ac,3c:e8:24:56:7a:c0,Access Point 10,9,,1612862195000
04:d6:aa:aa:59:a0,3c:e8:24:56:91:e0,Access Point 12,11,,1612862165000
30:07:4d:a3:cf:56,14:57:9f:d7:bd:60,Access Point 16,15,,1612862165000
40:98:ad:19:70:9b,14:57:9f:d8:2b:00,Access Point 17,16,,1612862165000
08:c5:e1:44:5e:9e,14:57:9f:d8:14:60,Access Point 19,18,,1612862165000
14:c2:13:88:d8:27,14:57:9f:d8:15:e0,Access Point 20,19,,1612862165000
3c:ab:8e:37:23:93,14:57:9f:d8:29:00,Access Point 21,20,,1612862165000
6c:4d:73:be:5b:0b,14:57:9f:d8:19:a0,Access Point 22,21,,1612862165000
00:f7:6f:f1:56:c1,14:57:9f:d7:c5:a0,Access Point 23,22,,1612862165000
68:e7:c2:48:4d:4b,14:57:9f:d7:b8:80,Access Point 24,23,,1612862165000
58:b1:0f:07:95:30,14:57:9f:d7:aa:a0,Access Point 25,24,,1612862165000
18:81:0e:30:7d:b2,50:5d:ac:b8:d9:00,Access Point 27,26,,1612862165000
00:db:70:b8:1a:0a,14:57:9f:d8:2a:40,Access Point 28,27,,1612862165000
08:c5:e1:90:44:a6,3c:e8:24:56:f5:c0,Access Point 29,28,,1612862165000
0c:51:01:88:96:e9,3c:e8:24:56:50:80,Access Point 30,29,,1612862195000
00:cd:fe:17:b0:02,3c:e8:24:56:68:40,Access Point 31,30,,1612862165000
1c:9e:46:95:2e:42,14:57:9f:d7:fa:40,Access Point 32,31,,1612862165000
24:fb:65:83:5b:b9,14:57:9f:d8:0c:a0,Access Point 33,32,,1612862165000
50:32:37:21:42:14,3c:e8:24:56:f6:00,Access Point 34,33,,1612862165000
34:ab:37:99:6c:a2,50:5d:ac:b8:f5:a0,Access Point 35,34,,1612862165000
58:e2:8f:b5:45:8c,3c:e8:24:56:4a:80,Access Point 36,35,,1612862195000
60:8c:4a:af:b1:32,3c:e8:24:56:7e:20,Access Point 38,37,,1612862195000
```
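If you want to inspect these files programmatically, the short Python sketch below parses such lines and converts the last column to a UTC datetime. It assumes the last column is an epoch timestamp in milliseconds; the meaning of the other columns depends on the Fields you configured in the File Sink, and parse_output is a hypothetical helper.

```python
import csv
import io
from datetime import datetime, timezone

# Hypothetical helper: parse sink output lines, assuming the last column is an
# epoch timestamp in milliseconds.
def parse_output(text):
    rows = []
    for row in csv.reader(io.StringIO(text)):
        if not row:
            continue  # skip blank lines
        ts = datetime.fromtimestamp(int(row[-1]) / 1000, tz=timezone.utc)
        rows.append((row, ts))
    return rows

sample = "30:d9:d9:e6:54:88,14:57:9f:d7:bf:00,Access Point 8,7,,1612862165000\n"
for row, ts in parse_output(sample):
    print(row[2], ts.isoformat())  # Access Point 8 2021-02-09T09:16:05+00:00
```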
