Expand on the Wi-Fi and IoT Data Tutorial by enriching the data with different processors and joining the result into a single sink.

Objectives

In this tutorial you will learn: 

  • How to enrich pipelines with processors.
  • How to join multiple data sources.



Import the Basic Tutorial

To complete this tutorial you will need access to our sandbox sources. Please contact us if you would like access.

The **username**, **password**, **db_username**, and **db_password** fields need to be replaced with your sandbox details.

We will start by importing Wi-Fi to Motion Intelligence.json from our downloads page: https://download.datumize.tech/dtz-templates/

The table below summarizes the components used in the pipeline.

| Component Type | Name | Description |
| --- | --- | --- |
| Source | HTTPApiPollingSource | Actively requests an API at intervals of time (polling). |
| Processor | ExtractorProcessor | Extracts a field from each record by path. |
| Processor | FlattenerAdapter | Flattens a list of records into individual records. |
| Source | DatabaseSource | Reads data from a database source. |
| Processor | JoinerProcessor | Joins multiple data sources. |
| Processor | ComposerProcessor | Composer with a configurable partitioner. |
| Sink | FileSink | Writes data to file. |

Modify the imported pipeline as described below.

The table below summarizes the properties to configure the HTTPApiPolling Source component.


| Field Name | Value | Required |
| --- | --- | --- |
| Auth Method | CookieAuthMethod | * |
| user | **username** | * |
| password | **password** | * |
| login-endpoint | https://sandbox.datumize.net/api/users/login | * |
| login-body | `{"username" : "{{username}}", "password" : "{{password}}"}` | * |
| login-content-type | application/json | * |
| cookie-domain | null | * |
| cookie-path | / | * |
| login-TTL | 5m | * |
| Request Builder | Request builder simple get | * |
| endpoint | https://sandbox.datumize.net/api | * |
| Deserializer | Json map deserializer | * |
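
Conceptually, the CookieAuthMethod logs in once to obtain a session cookie, and the source then polls the endpoint on a schedule. A minimal Python sketch of that behaviour using the `requests` library (the `poll` function and its structure are illustrative, not the actual DDC implementation):

```python
import time
import requests

LOGIN_ENDPOINT = "https://sandbox.datumize.net/api/users/login"
API_ENDPOINT = "https://sandbox.datumize.net/api"

def poll(username: str, password: str, interval_s: int = 30):
    """Log in once, then poll the API on a fixed interval."""
    session = requests.Session()
    # CookieAuthMethod: POST the login body as JSON; the session stores
    # the returned cookie and sends it on every subsequent request.
    session.post(
        LOGIN_ENDPOINT,
        json={"username": username, "password": password},
    )
    while True:
        # Json map deserializer: each response body becomes a Python dict.
        yield session.get(API_ENDPOINT).json()
        time.sleep(interval_s)
```

The real component also re-authenticates when the 5m login-TTL expires, which the sketch omits.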

The table below summarizes the properties to configure the Extractor Processor component.


| Field Name | Value | Required |
| --- | --- | --- |
| Extractor | Field to Map Extractor | * |
| Path | body/data | * |
| Default | null | |
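
The Field to Map Extractor walks a path into each record map; with Path set to body/data it pulls the device list out of the API response. A minimal sketch, assuming records are plain nested dicts (the `extract` helper is hypothetical):

```python
def extract(record: dict, path: str = "body/data", default=None):
    """Follow a '/'-separated path into a nested map, or return the default."""
    value = record
    for key in path.split("/"):
        if not isinstance(value, dict) or key not in value:
            return default  # Default: null in the component configuration
        value = value[key]
    return value
```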

The table below summarizes the properties to configure the Flattener Processor component.


| Field Name | Value | Required |
| --- | --- | --- |
| Topic | devices | |
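
The flattener then takes the extracted list and emits each element as its own record on the devices topic. A minimal sketch (the `emit` callback is a hypothetical stand-in for the pipeline's internal bus):

```python
def flatten(devices: list, emit):
    """Emit each device in the extracted list as an individual record."""
    for device in devices:
        emit("devices", device)
```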

The table below summarizes the properties to configure the Database Source component.


| Field Name | Value | Required |
| --- | --- | --- |
| Topic | aps | |
| Url | jdbc:postgresql://sandbox.datumize.net:5432/demo?reWriteBatchedInserts=true | * |
| Driver | org.postgresql.Driver | * |
| User | **db_username** | * |
| Password | **db_password** | * |
| Query | `select * from aps` | * |
| Converter | Jdbc row map converter | * |
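
In Python terms, the Database Source runs the query, and the Jdbc row map converter turns each row into a map published on the aps topic. A minimal sketch using `psycopg2` against the same sandbox database (the `read_aps` helper is illustrative):

```python
import psycopg2
import psycopg2.extras

def read_aps(db_username: str, db_password: str) -> list:
    """Run the configured query and return each row as a dict."""
    conn = psycopg2.connect(
        host="sandbox.datumize.net",
        port=5432,
        dbname="demo",
        user=db_username,
        password=db_password,
    )
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute("select * from aps")
        return [dict(row) for row in cur.fetchall()]
```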

The table below summarizes the properties to configure the Joiner Processor component.


| Field Name | Value | Required |
| --- | --- | --- |
| Topic A | devices | * |
| Id Extractor A | Field to Map Extractor | * |
| path | ap_mac | * |
| Topic B | aps | * |
| Id Extractor B | Field to Map Extractor | * |
| path | ap_mac | |
| Combiner | Flat List to map combiner | * |
| timeout | 1h | * |
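
The joiner matches records from the two topics by the ap_mac value extracted from each side and combines each matching pair into a single map; records that wait longer than the 1h timeout without a match are dropped. A minimal batch-style sketch of the same logic (the real component joins streams incrementally):

```python
def join_on_ap_mac(devices: list, aps: list) -> list:
    """Join device records with AP rows sharing the same ap_mac."""
    aps_by_mac = {ap["ap_mac"]: ap for ap in aps}
    joined = []
    for device in devices:
        ap = aps_by_mac.get(device["ap_mac"])
        if ap is not None:
            # Flat List to map combiner, approximated: merge both maps.
            joined.append({**ap, **device})
    return joined
```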

The table below summarizes the properties to configure the Composer Processor component.


| Field Name | Value | Required |
| --- | --- | --- |
| Rules | Records Rule | * |
| Max records | 300 | * |
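
With the Records Rule set to a maximum of 300 records, the composer accumulates joined records and closes a batch every 300. A minimal sketch:

```python
def batches(records, max_records: int = 300):
    """Group an iterable of records into batches of at most max_records."""
    batch = []
    for record in records:
        batch.append(record)
        if len(batch) == max_records:
            yield batch
            batch = []
    if batch:
        yield batch  # a final, possibly smaller, batch
```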

And finally, the table below summarizes the properties to configure the File Sink component.


| Field Name | Value | Required |
| --- | --- | --- |
| Directory Base | /opt/datumize/join | * |
| Serializer | Map to CSV Serializer | |
| Fields | Enter the fields in the order you would like them written to the CSV. | |
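
Putting the sink together: each batch is serialized to CSV with the configured field order and written under the directory base, in folders derived from write time. A minimal sketch using the field names from the expected output below (the folder naming scheme is an assumption):

```python
import csv
import os
import time

FIELDS = ["mac", "ap_mac", "ap_name", "ap_id", "ap_zone", "time_stamp"]

def write_batch(batch: list, base: str = "/opt/datumize/join") -> str:
    """Write one batch of joined records as a CSV file under the base dir."""
    folder = os.path.join(base, time.strftime("%Y-%m-%d_%H%M%S"))
    os.makedirs(folder, exist_ok=True)
    path = os.path.join(folder, "records.csv")
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=FIELDS, extrasaction="ignore")
        writer.writeheader()
        writer.writerows(batch)
    return path
```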






Deploy the Pipeline to a DDC Instance

In Zentral, you only need one machine, one instance, and one pipeline.

The table below summarizes the properties to be defined in the DDC Runtime Policy for the HTTPApiPolling Source component.

| Field Name | Value |
| --- | --- |
| **Execution Policy Type** | |
| Type | Always |
| Flush After Data | False |
| **Scheduling Policy Type** | |
| Type | SLEEP |
| Duration | 30s |
| Cron | `******` |
| Batch | 1 |
| Threads | 1 |
| **Error Policy Type** | |
| Type | DISCARD |
| Max-Retries | 0 |

We set the HTTPApiPollingSource policy to a SLEEP duration of 30s, with Batch and Threads both set to 1.
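
Under this policy, a single thread executes the source, sleeps 30s between runs, and discards any failed run without retrying (Max-Retries is 0). A minimal sketch of that loop, where `run_with_policy` is an illustrative wrapper rather than the actual DDC runtime (the Database Source below runs under the identical policy):

```python
import time

def run_with_policy(source_fn, duration_s: int = 30):
    """SLEEP scheduling with a DISCARD error policy and Max-Retries = 0."""
    while True:
        try:
            source_fn()
        except Exception:
            pass  # DISCARD: drop this cycle's error, no retries
        time.sleep(duration_s)
```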

The table below summarizes the properties to be defined in the DDC Runtime Policy for the Database Source component.

| Field Name | Value |
| --- | --- |
| **Execution Policy Type** | |
| Type | Always |
| Flush After Data | False |
| **Scheduling Policy Type** | |
| Type | SLEEP |
| Duration | 30s |
| Cron | `******` |
| Batch | 1 |
| Threads | 1 |
| **Error Policy Type** | |
| Type | DISCARD |
| Max-Retries | 0 |

We likewise set the Database Source policy to a duration of 30s, with Batch and Threads both set to 1.




Check Expected Output

If you browse the Directory Base you will find CSV files containing the fields mac, ap_mac, ap_name, ap_id, ap_zone, and time_stamp. Each file contains 300 records, and folders are organized by write time.