A step-by-step tutorial to capture data from syslog and publish it to AWS IoT Core via the MQTT Sink. We will use a Cook Processor to filter messages from a specific process.

Objectives

This tutorial contains different resources that you can reuse and extend for additional learning. Feel free to experiment!


You can download our tutorial templates from https://download.datumize.tech/dtz-templates/



Below is a snippet of the syslog we will be reading. We will only extract messages from the systemd process; all other messages will be discarded.


Nov 25 15:03:11 ip-172-31-38-145 systemd[1]: Created slice User Slice of UID 1001.
Nov 25 15:03:11 ip-172-31-38-145 systemd[1]: Starting User Runtime Directory /run/user/1001...
Nov 25 15:03:11 ip-172-31-38-145 systemd[1]: Finished User Runtime Directory /run/user/1001.
Nov 25 15:03:11 ip-172-31-38-145 systemd[1]: Starting User Manager for UID 1001...
Nov 25 15:03:11 ip-172-31-38-145 systemd[32996]: Reached target Paths.
Nov 25 15:03:11 ip-172-31-38-145 systemd[32996]: Reached target Timers.
Nov 25 15:03:11 ip-172-31-38-145 systemd[32996]: Starting D-Bus User Message Bus Socket.
Nov 25 15:03:11 ip-172-31-38-145 systemd[32996]: Listening on GnuPG network certificate management daemon.
Nov 25 15:03:11 ip-172-31-38-145 systemd[32996]: Listening on GnuPG cryptographic agent and passphrase cache (access for web browsers).
Nov 25 15:03:11 ip-172-31-38-145 systemd[32996]: Listening on GnuPG cryptographic agent and passphrase cache (restricted).
Nov 25 15:03:11 ip-172-31-38-145 systemd[32996]: Listening on GnuPG cryptographic agent (ssh-agent emulation).
Nov 25 15:03:11 ip-172-31-38-145 systemd[32996]: Listening on GnuPG cryptographic agent and passphrase cache.
Nov 25 15:03:11 ip-172-31-38-145 systemd[32996]: Listening on debconf communication socket.
Nov 25 15:03:11 ip-172-31-38-145 systemd[32996]: Listening on REST API socket for snapd user session agent.
Nov 25 15:03:11 ip-172-31-38-145 systemd[32996]: Listening on D-Bus User Message Bus Socket.
Nov 25 15:03:11 ip-172-31-38-145 systemd[32996]: Reached target Sockets.
Nov 25 15:03:11 ip-172-31-38-145 systemd[32996]: Reached target Basic System.
Nov 25 15:03:11 ip-172-31-38-145 systemd[32996]: Reached target Main User Target.
Nov 25 15:03:11 ip-172-31-38-145 systemd[32996]: Startup finished in 63ms.
Nov 25 15:03:11 ip-172-31-38-145 systemd[1]: Started User Manager for UID 1001.
Nov 25 15:03:11 ip-172-31-38-145 systemd[1]: Started Session 13 of user dtz.
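Before building the pipeline, it helps to see the filtering rule in isolation. The sketch below is a plain-Python illustration (not Datumize's API) of the selection this tutorial performs: keep only syslog lines whose process field is systemd and discard everything else. The regular expression is an assumption based on the classic syslog line layout shown above.

```python
import re

# Hypothetical pattern for the classic syslog layout shown in the snippet:
# "<Mon DD HH:MM:SS> <hostname> <process>[<pid>]: <message>"
SYSLOG_LINE = re.compile(
    r"^(?P<timestamp>\w{3}\s+\d+ \d{2}:\d{2}:\d{2}) "
    r"(?P<hostname>\S+) "
    r"(?P<process>[^\[\s]+)\[(?P<pid>\d+)\]: "
    r"(?P<message>.*)$"
)

def keep_line(line: str) -> bool:
    """Return True only for messages emitted by systemd."""
    m = SYSLOG_LINE.match(line)
    return bool(m) and m.group("process") == "systemd"
```

Running `keep_line` over the snippet above would keep every line, since all of them come from systemd; a chef-client line, for example, would be dropped.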

Build the IoT Core Pipeline


The table below summarizes the components used in the pipeline.

Component Type | Name          | Description
Source         | TailfSource   | Reads data from a rotated log file.
Processor      | CookProcessor | Provides processing at the edge.
Sink           | MQTTSink      | Publishes to an MQTT message broker.


Drag the required components from the Palette to the Workbench and join them with a Single Memory Stream.


The table below summarizes the properties to configure the Tailf Source component.

Field Name   | Value                   | Required
File         | /var/log/syslog         | *
Resume       | Enabled                 | *
Deserializer | Syslog map deserializer | *
Buffer Size  | 1.00k                   | *
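The Resume property is what distinguishes the Tailf Source from a plain file read: after a restart it continues from the last position instead of re-reading the whole file, and it copes with log rotation. A minimal Python sketch of that behaviour, assuming a simple byte-offset bookmark (the function name and rotation heuristic are illustrative, not Datumize's implementation):

```python
import os

def read_new_lines(path: str, offset: int):
    """Read only the lines appended since `offset`, and return the new
    offset so a restart can resume where it left off. If the file is now
    smaller than the bookmark, assume it was rotated and start over."""
    size = os.path.getsize(path)
    if size < offset:   # file was rotated/truncated
        offset = 0
    with open(path, "r") as f:
        f.seek(offset)
        lines = f.readlines()
        return lines, f.tell()
```

Calling this periodically with the previously returned offset yields the same "tail -F with a bookmark" behaviour the source provides.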


The Syslog map deserializer will output data in the following format:

severity=null
hostname=ip-172-31-38-145
logMessage=chef-client[33819]: [2021-11-25T15:43:15+00:00] INFO: Report handlers complete
facility=null
timestamp=Nov 25 15:43:15
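To make the map above concrete, here is a rough Python sketch of the deserialization step (not the product's actual code): split the raw line into timestamp, hostname, and the remaining log message. severity and facility stay null because plain /var/log/syslog lines carry no `<PRI>` header.

```python
def deserialize(line: str) -> dict:
    """Sketch of a syslog map deserializer for 'Mon DD HH:MM:SS host rest'
    lines. Assumes a double-digit day; real parsers also handle ' 5'."""
    timestamp, hostname, log_message = None, None, line
    parts = line.split(" ", 4)   # "Nov" "25" "15:43:15" hostname rest
    if len(parts) == 5:
        timestamp = " ".join(parts[:3])
        hostname = parts[3]
        log_message = parts[4]
    return {
        "severity": None,
        "facility": None,
        "timestamp": timestamp,
        "hostname": hostname,
        "logMessage": log_message,
    }
```

Applied to the chef-client line shown above, this yields exactly the five keys of the sample map.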


The table below summarizes the properties to configure the Cook Processor component.

Field Name | Value            | Required
Operation  | See script below | *

strProcess = input.logMessage.split("\\[", 2);

if (strProcess[0] == 'systemd') {
    strProcessId = strProcess[1].split("\\]: ", 2);
    output.process = strProcess[0];
    output.processid = strProcessId[0];
    output.message = strProcessId[1];

    input.each { key, value ->
        output[key] = value;
    }
}
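The script splits logMessage on the first "[" to isolate the process name, keeps the record only when that name is systemd, and then extracts the process id and message before copying the original fields across. A Python rendering of the same logic (for checking expected output on a sample record, not Datumize's API):

```python
def cook(record: dict) -> dict:
    """Mirror of the Cook Processor script: filter on systemd and enrich
    the record with process, processid and message fields."""
    output = {}
    process, _, rest = record["logMessage"].partition("[")
    if process == "systemd":
        processid, _, message = rest.partition("]: ")
        output["process"] = process
        output["processid"] = processid
        output["message"] = message
        output.update(record)   # copy original fields, as input.each does
    return output               # an empty map means the record is discarded
```

Note that a non-systemd record produces an empty output map, which is how "all other messages will be discarded" is achieved.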


And finally, the table below summarizes the properties to configure the MQTT Sink component.

Field Name | Value          | Required
Host       | {IoT endpoint} | *
Port       | 8883           |
Serializer | Map to JSON Serializer |

Connection

Field Name | Value   | Required
Mqtt Topic | iot_dtz |

Security

Field Name              | Value                     | Required
Secure                  | Enabled                   |
Certificate             | {certificate}             |
Certificate Private Key | {certificate private key} |



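What the sink does with each record can be sketched as: serialize the output map to JSON, then publish it over TLS (port 8883) to the AWS IoT endpoint using your device certificate and private key. The sketch below uses the third-party paho-mqtt client as a stand-in; the endpoint, topic, and certificate file names are placeholders, not values from this tutorial.

```python
import json
import ssl

# Placeholders: substitute your own IoT endpoint and certificate files.
IOT_ENDPOINT = "xxxxxxxx-ats.iot.eu-west-1.amazonaws.com"
TOPIC = "iot_dtz"

def to_json_payload(record: dict) -> bytes:
    """Map to JSON serializer: one JSON object per record."""
    return json.dumps(record, sort_keys=True).encode("utf-8")

def publish(record: dict) -> None:
    """Publish one record to AWS IoT Core over mutual TLS (sketch only)."""
    # paho-mqtt is a third-party package; imported lazily so the
    # serializer above stays usable without it.
    import paho.mqtt.client as mqtt
    client = mqtt.Client()
    client.tls_set(ca_certs="AmazonRootCA1.pem",
                   certfile="certificate.pem.crt",
                   keyfile="private.pem.key",
                   tls_version=ssl.PROTOCOL_TLS_CLIENT)
    client.connect(IOT_ENDPOINT, 8883)
    client.publish(TOPIC, to_json_payload(record), qos=1)
    client.disconnect()
```

AWS IoT Core requires mutual TLS on port 8883, which is why the sink's Security section asks for both the certificate and its private key.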
Deploy the Pipeline to DDC Instance

In Zentral, you only need one machine, one instance, and one pipeline.

The pipeline can be deployed with the default runtime policy.



Check Expected Output

From the AWS IoT console, you can subscribe to the iot_dtz topic (for example with the MQTT test client) to view the data stream.