Using SQL Stream Builder with IBM Cloud – IBM Developer


The Apache Flink project offers the ability to perform stateful computations over data streams. The SQL Stream Builder interface is used to create stateful stream processing jobs using SQL. The executed SQL queries run as jobs on Flink. Together, these components make up the Cloudera Streaming Analytics (CSA) package, which is available with Cloudera Data Platform Streaming Edition with IBM.

In this post, we'll cover how to use the streaming services available in Cloudera Data Platform to communicate with IBM services. Specifically, we'll cover how to read Kafka messages with SQL Stream Builder, sink messages to PostgreSQL on IBM Cloud, sink messages to IBM Cloud Object Storage, and read Kafka messages with IBM DataStage.

To accompany this, we've created a video demonstration. Check out the video below, which walks through the examples covered here.

Producing messages with Apache Kafka

Before we tackle consuming messages on IBM products, we need to produce messages. Typically, you'll get feeds of streaming data, such as access data used to detect fraud, or real-time financial data. There are many tutorials available online that cover these concepts extensively, like the Apache Kafka Quickstart document. We will be using Kafka brokers that are installed on the Cloudera Data Platform as part of the streaming bundle.

For all of the examples here, we will be relying on messages that were generated using the kafka-python library. The code snippet below is what we used to generate fake data and to send messages:

from kafka import KafkaProducer

import json
import time
import uuid

producer = KafkaProducer(bootstrap_servers="cid-vm-05.cdplab.local:9093",
                        value_serializer=lambda v: json.dumps(v).encode('utf-8'))

topic = 'stevemartest'

i = 0
while True:
    i = i + 1
    msg = {'uid': uuid.uuid4().hex, 'sdata': i % 2}
    meta = producer.send(topic, key=b'message', value=msg)
    time.sleep(1)

When run, the code will produce messages similar to those seen below:

{"uid": "ce1178d33abf4e6297b56b362270830e", "sdata": 0}
{"uid": "ed30f9904f8443ddaef189f1ba5bb02c", "sdata": 1}
{"uid": "d90c3e2bd9994e3e8e6e202ead6ea183", "sdata": 0}
{"uid": "8ce9270033454f54b9fc67b7ef43bc83", "sdata": 1}
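The records above are simply the producer's Python dicts run through the `value_serializer`; nothing broker-specific happens to the encoding. A minimal sketch of how one payload is built and serialized, with no Kafka connection needed (the `make_msg` helper is ours, for illustration only):

```python
import json
import uuid

def make_msg(i):
    # Mirrors the producer's payload: a random 32-char hex uid plus i % 2
    return {'uid': uuid.uuid4().hex, 'sdata': i % 2}

# The same lambda the producer passes as value_serializer
serialize = lambda v: json.dumps(v).encode('utf-8')

payload = serialize(make_msg(3))
# payload holds the raw bytes Kafka stores, e.g. b'{"uid": "...", "sdata": 1}'
```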

Run the application to start producing messages:

[root@cid-vm-05 ~]# kinit stevemar
Password for stevemar@CDPLAB.LOCAL:
[root@cid-vm-05 ~]# python3 kafka-tests/producer.py
{'uid': 'af2adbcf77bc45ed97339b669261f10b', 'sdata': 1}
{'uid': '61b777c774c64a788042f7c94e4950de', 'sdata': 0}
{'uid': '90e554b6f51d4952962530deca86f41b', 'sdata': 1}

Now that we're able to produce messages, we'll start consuming them. To begin with, let's read them using SQL Stream Builder.

Reading Kafka messages with SQL Stream Builder

Installing SQL Stream Builder (SSB) and Flink on a Cloudera cluster is documented on the CSA Quickstart page. Additionally, we found it helpful to enable Knox for SSB to authenticate more easily.

By default, the Kafka instance on the Cloudera Data Platform cluster will be added as a Data Provider. Choose to create a new Apache Kafka table; this will be our data source for all examples going forward. Note that some changes may be required in Ranger to your user or group access policies to be able to view all Kafka topics.

Adding a new Apache Kafka data source
Figure 1. Adding a new Apache Kafka data source

Viewing the Data Definition Language (DDL) of an Apache Kafka source should look similar to the configuration below:

CREATE TABLE `default_catalog`.`default_database`.`kafka` (
  `uid` VARCHAR(2147483647),
  `sdata` BIGINT
) COMMENT 'kafka'
WITH (
  'properties.bootstrap.servers' = 'cid-vm-05.cdplab.local:9093',
  'properties.auto.offset.reset' = 'earliest',
  'connector' = 'kafka',
  'properties.ssl.truststore.location' = '/opt/cloudera/security/pki/truststore.jks',
  'properties.request.timeout.ms' = '120000',
  'properties.transaction.timeout.ms' = '900000',
  'format' = 'json',
  'topic' = 'stevemartest',
  'properties.security.protocol' = 'SASL_SSL',
  'scan.startup.mode' = 'earliest-offset',
  'properties.sasl.kerberos.service.name' = 'kafka'
)

Now we can run a simple SQL query to see the data being produced by our Python application that's posting the messages to our Kafka instance:

select * from kafka

The Results tab in the UI shows the JSON payload being produced.

SSB executing a SQL query and displaying the data
Figure 2. SSB executing a SQL query and displaying the data

Now that we're able to confirm that SSB is working with our Kafka instance, we can go to the next step of pushing the messages to another system. That's done by creating and defining new sinks. There are many predefined sinks available out of the box.

Sinking messages to PostgreSQL on IBM Cloud with SQL Stream Builder

The first sink we'll look at uses the JDBC option that's supported by Flink. At the time of this writing, there is no support for Db2, so we decided to test with PostgreSQL on IBM Cloud.

Once an instance of PostgreSQL was created, we navigated to the Credentials page to find the relevant information to establish a connection, such as hostname, username, and password. Using the IBM Cloud Shell was handy since it already included the psql command-line interface.

To authenticate to the PostgreSQL instance, perform a step similar to the one below, replacing the credentials with your own:

cloudshell:~$ PGSSLROOTCERT=87ca6778-aaaa-1111-bbbb-2222-be1101c
cloudshell:~$ psql "host=aac4-1111-bbbb-2222-570065c.databases.appdomain.cloud port=32662 dbname=ibmclouddb user=ibm_cloud_111222333 password=ibm_pass_aaabbbccc"

Make sure a table is created by running the SQL statement below, since we'll be sinking messages to this table:

CREATE TABLE test_table_kafka (
    uid VARCHAR(1000),
    sdata BIGINT
);

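Under the covers, the JDBC sink simply issues INSERT statements against this table. As a rough, portable illustration of what ends up in the database, here is a sketch that uses Python's stdlib sqlite3 in place of PostgreSQL (an assumption made purely so the sketch runs anywhere; the DDL above is what you'd actually run against the IBM Cloud instance):

```python
import sqlite3
import uuid

conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE test_table_kafka (uid VARCHAR(1000), sdata BIGINT)")

# Simulate a handful of sunk Kafka messages, matching the producer's payload shape
for i in range(1, 5):
    conn.execute("INSERT INTO test_table_kafka VALUES (?, ?)",
                 (uuid.uuid4().hex, i % 2))

count = conn.execute("SELECT count(*) FROM test_table_kafka").fetchone()[0]
rows = conn.execute("SELECT * FROM test_table_kafka LIMIT 10").fetchall()
```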
Back in SSB, we create a new table, but choose Flink DDL and the JDBC template. Substitute in the connection information for the PostgreSQL instance.

CREATE TABLE test_table_kafka (
  `uid` VARCHAR(1000),
  `sdata` BIGINT
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://aac4-1111-bbbb-2222-570065c.databases.appdomain.cloud:32662/ibmclouddb',
    'table-name' = 'test_table_kafka',
    'username' = 'ibm_cloud_111222333',
    'password' = 'ibm_pass_aaabbbccc'
)

We'll once again run a simple SQL query, but this time we'll switch the Sink from None to the new PostgreSQL option.

The SQL statement will be executed, and you'll be able to see the data being placed into the PostgreSQL database. We've added a few psql commands below that can be used to view the data.

To list tables:

ibmclouddb=> \dt
                     List of relations
 Schema |        Name        | Type  |        Owner
 public | test_table_kafka   | table | ibm-cloud-base-user

To describe a table:

ibmclouddb=> \d test_table_kafka
                  Table "public.test_table_kafka"
 Column |          Type           | Collation | Nullable | Default
 uid    | character varying(1000) |           |          |
 sdata  | bigint                  |           |          |

To count the number of rows in the table:

ibmclouddb=> select count(*) from test_table_kafka;

To view the first 10 rows of the table:

ibmclouddb=> select * from test_table_kafka limit 10;
               uid                | sdata
 0ddc4ff5e95349cba8b7d012bb152c6e |     0
 eef42141040844b8a5bed7fbe6437ec3 |     1
 bb46c46197c9429f96a9139212482a88 |     0
 9543d995131c4921bfe11a754f94b0a6 |     1
 6e8e3180054241a3971d8d39bb5518ac |     0
 d90ff2d560484034a80a3eaf9a6c5abe |     1
 564b1361536e4ad58bc8857a27c9bf58 |     0
 e15011fc9c0748ea96bc4e4c9e824c40 |     1
 37bc09cec2e7415aac4e4f98116921f6 |     0
 3a29f48b3110421d91bdf74ec7f92862 |     1

We'll now move on to the next example, using IBM Cloud Object Storage.

Sinking messages to IBM Cloud Object Storage with SQL Stream Builder

The second sink we'll look at uses the S3 plugin that's supported by Flink. The IBM Cloud Object Storage service provides an S3-compatible API, so the plugin can be used without any modification.

Once an instance of Cloud Object Storage is created, navigate to the Credentials page to create a new API key and secret. Be sure to include the option for enabling HMAC credentials. Jot down the API key and secret since we'll need them later.

Create a bucket in Cloud Object Storage; in the examples below, we called it ssb-sink-test.

Back in SQL Stream Builder, we'll create a new table, but this time we'll choose Flink DDL. Substitute in information about your connection below:

CREATE TABLE cos_sink (  -- table name here is illustrative
  uid VARCHAR(64),
  sdata BIGINT
) WITH (
  'connector' = 'filesystem',
  'path' = 's3://ssb-sink-test/mydata',
  'format' = 'json',
  'sink.partition-commit.trigger' = 'process-time'
)

Additionally, Flink's configuration file will need to be updated. The configuration file is located at /etc/flink/conf/flink-conf.yaml. See the example below for which new properties are required:

s3.access-key: 6c5e41-access-key-136f8f
s3.connection.ssl.enabled: false
s3.endpoint: s3.us-east.cloud-object-storage.appdomain.cloud
s3.path.style.access: true
s3.secret-key: 6c5e41-secret-key-136f8f

We'll once again run a simple SQL query, but this time we'll switch the Sink from None to the new COS option.

The SQL statement will be executed, and you'll be able to see the data being placed as individual files in the bucket we created.

Data saved in an IBM Cloud Object Storage bucket
Figure 3. Data saved in an IBM Cloud Object Storage bucket

Opening a particular file shows us the payload.

The payloads of Kafka messages are seen within the file
Figure 4. The payloads of Kafka messages are seen within the file
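Each object written by the filesystem sink holds the JSON rows it committed, one document per line. A minimal sketch of reading one such downloaded file back into Python (the file contents below are illustrative, matching the messages produced earlier):

```python
import io
import json

# Stand-in for a part file downloaded from the ssb-sink-test bucket
part_file = io.StringIO(
    '{"uid": "ce1178d33abf4e6297b56b362270830e", "sdata": 0}\n'
    '{"uid": "ed30f9904f8443ddaef189f1ba5bb02c", "sdata": 1}\n'
)

# One JSON document per line; skip any blank lines
records = [json.loads(line) for line in part_file if line.strip()]
```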

Now that we're able to see messages on two IBM Cloud services, we'll turn our attention to IBM DataStage, an ETL offering available on both IBM Cloud Pak for Data and IBM Cloud Pak for Data-as-a-Service.

Reading Kafka messages with IBM DataStage

In this example, we aren't using SQL Stream Builder, but instead the built-in capabilities of IBM DataStage to read messages from a Kafka broker. It's worth mentioning that our Cloudera cluster had Kerberos enabled, so some configuration was required. The Configuring Hive with Kerberos documentation was helpful and could be adapted for IBM DataStage.

Once the required Kafka configuration files were moved over to the appropriate IBM DataStage container, we could test the connection. A simple job with a single Kafka source and a Peek target can test the connection. By default, the connection will read 100 messages at a time.

A simple DataStage job with a Kafka source and a Peek target
Figure 5. A simple DataStage job with a Kafka source and a Peek target

Looking at the logs will show the most recent messages from the Kafka broker.

The payloads of Kafka messages are seen in the job log
Figure 6. The payloads of Kafka messages are seen in the job log

Summary and next steps

We hope you learned more about how to integrate IBM products with Apache Flink and Cloudera SQL Stream Builder. Let's stay in touch by visiting the IBM Community to post questions and talk to our experts.

