Kafka Pentaho Data Integration ETL Implementation

This tutorial shows, in a few steps, how to configure access to a Kafka stream from PDI Spoon and how to write and read messages.

1. Access to Kafka stream

First you need a running Kafka cluster. If you want to set up a test (proof-of-concept) Kafka server, please read the "15 minutes Kafka setup in 5 steps" tutorial. Then check whether the queue is accessible from the Pentaho ETL machine.
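
One quick way to run this check from the ETL machine is to try opening a TCP connection to the broker port. The sketch below uses only the Python standard library. The helper name `broker_reachable` is made up for illustration, and the address localhost:9092 is the one used in this tutorial's example, so replace it with your own. Note that a successful connection only shows that the port is open, not that the broker is healthy.

```python
import socket


def broker_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution failures.
        return False


if __name__ == "__main__":
    # Assumed broker address; replace with the one of your cluster.
    print(broker_reachable("localhost", 9092))
```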

2. Set up Kafka components in Pentaho Data Integration

Open Spoon and go to Tools -> Marketplace. Select Apache Kafka Producer and Apache Kafka Consumer and install them. After restarting the client, two new steps should appear under Input and Output.

Pentaho Data Integration Kafka connectors:

3. PDI Transformation to produce sample data

Let's now create a PDI transformation that produces 100 test messages. It consists of a row generator, a step that adds a sequence, and a step that concatenates the static customer value with the sequence number. The transformation and source rows look as follows:

Pentaho Data Integration Kafka producer sample:
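
For readers who prefer to see the logic as code, the three steps of this transformation (generate rows, add a sequence, concatenate) can be approximated in a few lines of Python. This is only a sketch of the idea, not what PDI runs internally. The static value "customer", the absence of a separator, and the function name `build_messages` are assumptions, since the exact values are only visible in the screenshot.

```python
from typing import List


def build_messages(count: int = 100, prefix: str = "customer") -> List[str]:
    """Mimic row generator + sequence + concatenation of a static value."""
    # The sequence starts at 1, as a PDI sequence does by default.
    return [f"{prefix}{seq}" for seq in range(1, count + 1)]


if __name__ == "__main__":
    messages = build_messages()
    print(len(messages), messages[0], messages[-1])
```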

The Apache Kafka Producer is added as an output step. It's important to set the following parameters:

  • topic (customer_topic)
  • message field name (customer_name_seq)
  • metadata.broker.list (localhost:9092 in our example)
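
The metadata.broker.list value is a comma-separated list of host:port pairs, so a typo there is a common reason for a producer that cannot connect. The following sketch collects the three settings from this example in a dictionary and validates the broker list. The dictionary keys `topic` and `message_field` and the function `parse_broker_list` are illustrative names only, not PDI or Kafka property names.

```python
from typing import List, Tuple

# Values taken from this tutorial's example; the key names are hypothetical.
producer_settings = {
    "topic": "customer_topic",
    "message_field": "customer_name_seq",
    "metadata.broker.list": "localhost:9092",
}


def parse_broker_list(value: str) -> List[Tuple[str, int]]:
    """Split 'host1:port1,host2:port2' into (host, port) pairs."""
    brokers = []
    for entry in value.split(","):
        host, _, port = entry.strip().rpartition(":")
        if not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        brokers.append((host, int(port)))
    return brokers


if __name__ == "__main__":
    print(parse_broker_list(producer_settings["metadata.broker.list"]))
```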

The transformation is ready and can be executed. As the Kafka console consumer window shows, the messages were successfully produced and delivered to the stream.

Pentaho Data Integration Kafka producer sample:

4. PDI Transformation to consume sample data

The data is already in the queue, so let's now create a Kafka consumer. First drag the Apache Kafka Consumer step into a transformation, then double-click it to set a few options:

  • Topic name
  • Target message field name
  • Target key field name
  • zookeeper.connect

The sample transformation writes the messages to a CSV file (Text file output step). Note that once it is executed, the transformation keeps running and listening for new messages until the maximum duration of consumption is reached. This limit applies to the duration of the whole step, not to an individual read. As shown in the example below, the data is read correctly.
Pentaho Data Integration Kafka consumer example:
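
The difference between a limit on the whole step and a limit on a single read is easier to see in code. The sketch below simulates the stream with an in-memory `queue.Queue` rather than a real Kafka client. The function name `consume_for` and its parameters are hypothetical, and the loop only illustrates the semantics described above, not the plugin's actual implementation.

```python
import queue
import time
from typing import List


def consume_for(source: queue.Queue, max_duration_s: float,
                poll_timeout_s: float = 0.1) -> List[str]:
    """Read messages until the overall deadline passes.

    The deadline covers the whole consumption loop; each individual
    read waits at most poll_timeout_s and then tries again.
    """
    deadline = time.monotonic() + max_duration_s
    messages = []
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            messages.append(source.get(timeout=min(poll_timeout_s, remaining)))
        except queue.Empty:
            # No message yet: keep listening until the deadline.
            continue
    return messages


if __name__ == "__main__":
    stream = queue.Queue()
    for name in ("customer1", "customer2", "customer3"):
        stream.put(name)
    print(consume_for(stream, max_duration_s=0.5))
```

Even though all three messages are read almost immediately, the call returns only after the full duration has elapsed, which matches the behaviour of a consumer step that stays open for late messages.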

Next steps would be to produce and consume JSON messages instead of plain-text messages, implement an upsert mechanism for loading the data into the data warehouse or a NoSQL database, and make the process fault tolerant.
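
As a rough illustration of the first two of these steps, the sketch below parses a JSON message and upserts it into a table. It uses an in-memory SQLite database from the Python standard library as a stand-in for the data warehouse. The table `customer`, the fields `id` and `name`, and the function `upsert_customer` are all hypothetical. SQLite's INSERT OR REPLACE deletes the conflicting row and inserts a new one, which is a simplification of what a real warehouse upsert or merge would do.

```python
import json
import sqlite3


def upsert_customer(conn: sqlite3.Connection, raw_message: str) -> None:
    """Parse one JSON message and insert or overwrite the matching row."""
    record = json.loads(raw_message)
    conn.execute(
        "INSERT OR REPLACE INTO customer (id, name) VALUES (?, ?)",
        (record["id"], record["name"]),
    )
    conn.commit()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE customer (id INTEGER PRIMARY KEY, name TEXT)")
    upsert_customer(conn, json.dumps({"id": 1, "name": "customer1"}))
    # Same key again: the existing row is replaced, not duplicated.
    upsert_customer(conn, json.dumps({"id": 1, "name": "customer1_updated"}))
    print(conn.execute("SELECT id, name FROM customer").fetchall())
```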
