Connecting to Apache Kafka securely using Flink SQL

Note: This tutorial focuses on securing connections between Flink SQL and Kafka. For detailed information on working with Flink ETL Jobs and Session Clusters, see the Interactive ETL example.

Flink SQL is a powerful tool for data exploration, manipulation and interconnection. It lets you tap into Flink’s distributed stream processing capabilities through a familiar SQL interface. In this tutorial, we go over ways to securely connect to Kafka from Flink SQL.

In the sections below, we cover all the supported secure connection options when running Kafka with the Strimzi Kubernetes operator. In each case, we go over how to configure the data generator script so that it sets up an example Kafka cluster with the appropriate secure listener, and then we show the configuration needed to connect to that secure listener in Flink SQL.

The tutorial is based on the StreamsHub Flink SQL Examples repository and the code can be found under the tutorials/secure-kafka directory.
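
To follow along locally, you can clone the repository and work from the directory that contains the scripts and tutorial manifests. A minimal sketch, assuming the repository URL and layout below:

# Assumed repository URL and layout; adjust to match where the examples actually live
git clone https://github.com/streamshub/flink-sql-examples.git
cd flink-sql-examples/tutorials   # assumed to contain scripts/, recommendation-app/ and secure-kafka/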

Note: The examples in this tutorial use the StreamsHub Flink SQL Runner distribution, which ships with Strimzi’s OAuth 2.0 callback handler included.

Insecure #

PLAINTEXT #

  • No encryption.
  • No authentication.

Set up the demo application:

# Note: PLAINTEXT is the default option if you don't pass SECURE_KAFKA.
# Note: This sets up Kafka, Flink, recommendation-app (generates example data), etc.
SECURE_KAFKA=PLAINTEXT ./scripts/data-gen-setup.sh

# This creates a standalone Flink job
kubectl -n flink apply -f recommendation-app/flink-deployment.yaml

The commands above apply the following:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false      # Plain listener with no encryption
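
Before creating any tables, it can be worth waiting until Strimzi reports the Kafka cluster as ready; a small sketch:

# Wait for the Strimzi-managed Kafka cluster to report the Ready condition
kubectl -n flink wait kafka/my-cluster --for=condition=Ready --timeout=300s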

We can connect to the plain listener, like in the other Flink SQL tutorials, using the query below:

CREATE TABLE SalesRecordTable ( 
    invoice_id STRING, 
    user_id STRING, 
    product_id STRING, 
    quantity STRING, 
    unit_cost STRING, 
    `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp', 
    WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND 
) WITH ( 
    'connector' = 'kafka',
    'topic' = 'flink.sales.records',
    
    -- Connect over PLAINTEXT (commented out because it is the default)
    -- 'properties.security.protocol' = 'PLAINTEXT',
    
    -- Point to our plain listener
    'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9092',
    
    'properties.group.id' = 'sales-record-group', 
    'value.format' = 'avro-confluent', 
    'value.avro-confluent.url' = 'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', 
    'scan.startup.mode' = 'latest-offset'
); 

You can verify that the test data is flowing correctly by querying the Kafka topic using the console consumer:

kubectl -n flink exec -it my-cluster-dual-role-0 -- \
  ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink.sales.records

Secure #

Note:

  • Each example code block is diffed against the PLAINTEXT example above.
    • The mTLS example below provides a KafkaUser. Each KafkaUser after that is diffed against it.
  • A TLS listener is always included to allow the recommendation-app to send example data.
    • A recommendation-app-kafka-user KafkaUser is also created and is always added to the superUsers list in the Kafka cluster, if authorization is enabled.

Testing (preamble) #

You can verify that each of the authentication methods shown below is working by doing the following:

  • Run the commands in the example, as instructed.

  • Port forward the Flink Job Manager pod so we can access it:

    kubectl -n flink port-forward <job-manager-pod> 8081:8081

    The job manager pod name has the format standalone-etl-secure-<alphanumeric>; your shell should tab-complete it. If it doesn’t, you can find the name by running kubectl -n flink get pods (a sketch that grabs the pod name automatically follows this list).

  • Make an API request to the JobManager REST API and parse the JSON response with jq like so:

    # Note: There should only be one continuously running job
    RUNNING_JOB_ID=$(curl -s localhost:8081/jobs/ | \
      jq -r '.jobs[] | select(.status == "RUNNING") | .id')
    
    # Get the number of records written to the output table/topic
    curl -s localhost:8081/jobs/$RUNNING_JOB_ID | \
      jq '.vertices[] | select(.name | contains("Writer")) |
          {
            "write-records": .metrics."write-records",
            "write-records-complete": .metrics."write-records-complete"
          }'

    This should output the following result:

    {
      "write-records": 10,
      "write-records-complete": true
    }

    Note: If the result is different, you might’ve made the API request too quickly. Try again in a few seconds.
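
If you prefer not to rely on tab completion for the pod name, here is a rough sketch that grabs the JobManager pod automatically (the grep filters assume the standalone-etl-secure naming shown above):

# Pick the first pod whose name matches the FlinkDeployment, excluding TaskManager pods
JM_POD=$(kubectl -n flink get pods -o name | grep standalone-etl-secure | grep -v taskmanager | head -n 1)
kubectl -n flink port-forward "$JM_POD" 8081:8081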

TLS #

  • Encrypted using TLS (e.g. TLSv1.3 and TLS_AES_256_GCM_SHA384).
  • Server is authenticated by client.

Set up the demo application:

SECURE_KAFKA=TLS ./scripts/data-gen-setup.sh

# Note: This creates a standalone Flink job that connects to the Kafka listener
#       below and copies 10 records from an existing topic to a newly created one
kubectl -n flink apply -f secure-kafka/TLS/standalone-etl-secure-deployment.yaml

The commands above apply the following:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    listeners:
-     - name: plain
-       port: 9092
-       type: internal
-       tls: false

+     - name: tls
+       port: 9093
+       type: internal
+       tls: true       # Enable TLS encryption on listener

To securely connect to a listener with TLS enabled, we need to mount the my-cluster-cluster-ca-cert secret generated by Strimzi into our Flink pod, so that we can instruct Flink to trust the Kafka cluster’s TLS certificate.

Note: During the CA certificate renewal period, the secret contains more than one CA certificate, and all of them must be added to the truststore. Make sure you handle this case in production.
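
If you want to check how many CA certificates the secret currently holds, a minimal sketch:

# Count the certificates in the cluster CA secret (expect more than one during renewal)
kubectl -n flink get secret my-cluster-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' \
  | base64 -d | grep -c 'BEGIN CERTIFICATE'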

In standalone-etl-secure-deployment.yaml, we do this in the following way (secrets in other examples are mounted similarly):

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: standalone-etl-secure
spec:
  image: ...
  podTemplate:
    kind: Pod
    spec:
      volumes:
        - name: my-cluster-cluster-ca-cert
          secret:
            secretName: my-cluster-cluster-ca-cert
            items:
              - key: ca.crt   # We only need the current cluster CA
                path: ca.crt  # certificate for the examples.
      containers:
        - name: flink-main-container
          volumeMounts:
            - name: my-cluster-cluster-ca-cert
              mountPath: "/opt/my-cluster-cluster-ca-cert"
              readOnly: true
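
Once the deployment is running, you can optionally confirm that the certificate was mounted; a small sketch, replacing <job-manager-pod> with the actual pod name:

# The first line of the mounted file should read -----BEGIN CERTIFICATE-----
kubectl -n flink exec -it <job-manager-pod> -- head -n 1 /opt/my-cluster-cluster-ca-cert/ca.crt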

We can connect to the listener using the query below:

-- NOTE: This statement is included in standalone-etl-secure-deployment.yaml
CREATE TABLE SalesRecordTable ( 
    invoice_id STRING, 
    user_id STRING, 
    product_id STRING, 
    quantity STRING, 
    unit_cost STRING, 
    `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp', 
    WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND 
) WITH ( 
    'connector' = 'kafka',
    'topic' = 'flink.sales.records',
 
-   'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9092',

+     -- Change to secure listener
+   'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9093',

+     -- Connect over SSL
+   'properties.security.protocol' = 'SSL',

+     -- Provide path to mounted secret containing the
+     -- Kafka cluster CA certificate generated by Strimzi
+   'properties.ssl.truststore.location' = '/opt/my-cluster-cluster-ca-cert/ca.crt',

+     -- Indicate certificate is of type PEM (as opposed to JKS or PKCS12)
+   'properties.ssl.truststore.type' = 'PEM',

    'properties.group.id' = 'sales-record-group',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6',
    'scan.startup.mode' = 'latest-offset'
);

mTLS #

  • Encrypted using TLS (e.g. TLSv1.3 and TLS_AES_256_GCM_SHA384).
  • Both server and client are authenticated.

Set up the demo application:

SECURE_KAFKA=mTLS ./scripts/data-gen-setup.sh

kubectl -n flink apply -f secure-kafka/mTLS/standalone-etl-secure-deployment.yaml

The commands above apply the following:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    listeners:
-     - name: plain
-       port: 9092
-       type: internal
-       tls: false

+     - name: mtls
+       port: 9094
+       type: internal
+       tls: true
+       authentication:
+         type: tls     # Enable TLS client authentication
# KafkaUser added for client authentication examples,
# not necessary for Kafka listeners without
# 'authentication' property.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls       # This will generate a 'my-user' Secret containing credentials

To securely connect to a listener with mTLS enabled, we need to mount the my-cluster-cluster-ca-cert secret onto our Flink pod and pass the contents of the certificate and key from the my-user secret generated by Strimzi into our Flink SQL statements. By doing this, we instruct Flink to trust the Kafka cluster’s TLS certificate and to present our own client credentials.
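
You can check which credential keys the Strimzi User Operator generated before wiring them into the query; a minimal sketch (assumes jq is installed):

# List the keys in the 'my-user' secret; expect ca.crt, user.crt, user.key, user.p12 and user.password
kubectl -n flink get secret my-user -o json | jq '.data | keys'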

Note: The FlinkDeployment volumes are the same as before.

We can connect to the listener using the query below:

-- NOTE: This statement is included in standalone-etl-secure-deployment.yaml
CREATE TABLE SalesRecordTable ( 
    invoice_id STRING, 
    user_id STRING, 
    product_id STRING, 
    quantity STRING, 
    unit_cost STRING, 
    `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp', 
    WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND 
) WITH ( 
    'connector' = 'kafka',
    'topic' = 'flink.sales.records',
-   'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9092',
+   'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9094',
+   'properties.security.protocol' = 'SSL',
+   'properties.ssl.truststore.location' = '/opt/my-cluster-cluster-ca-cert/ca.crt',
+   'properties.ssl.truststore.type' = 'PEM',

+     -- Provide contents of the user certificate and key from the
+     -- 'my-user' secret generated by the Strimzi User Operator
+     -- for the 'my-user' KafkaUser
+   'properties.ssl.keystore.certificate.chain' = '{{secret:flink/my-user/user.crt}}',
+   'properties.ssl.keystore.key' = '{{secret:flink/my-user/user.key}}',
+   'properties.ssl.keystore.type' = 'PEM',

    'properties.group.id' = 'sales-record-group',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6',
    'scan.startup.mode' = 'latest-offset'
);

SCRAM-SHA-512 #

TLS client authentication is only as strong as the Certificate Authority that issued the certificates: if the CA is compromised, so is the authentication. SCRAM (Salted Challenge Response Authentication Mechanism) is a family of password-based authentication mechanisms that avoid relying on certificates for client authentication by combining challenge-response exchanges with salting, hashing and, optionally, channel binding.

Set up the demo application:

SECURE_KAFKA=SCRAM ./scripts/data-gen-setup.sh

kubectl -n flink apply -f secure-kafka/SCRAM/standalone-etl-secure-deployment.yaml

The commands above apply the following:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    listeners:
-     - name: plain
-       port: 9092
-       type: internal
-       tls: false

+     - name: scram
+       port: 9094
+       type: internal
+       tls: true
+       authentication:
+         type: scram-sha-512   # Specify SCRAM authentication
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
- authentication:
-   type: tls
+ authentication:
+   type: scram-sha-512     # Specify SCRAM authentication
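
Once the user exists, you can read the SCRAM password that Strimzi generated for it; a minimal sketch:

# Decode the password from the generated 'my-user' secret
kubectl -n flink get secret my-user -o jsonpath='{.data.password}' | base64 -d; echo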

We can connect to the listener using the query below:

-- NOTE: This statement is included in standalone-etl-secure-deployment.yaml
CREATE TABLE SalesRecordTable ( 
    invoice_id STRING, 
    user_id STRING, 
    product_id STRING, 
    quantity STRING, 
    unit_cost STRING, 
    `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp', 
    WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND 
) WITH ( 
    'connector' = 'kafka',
    'topic' = 'flink.sales.records',
-   'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9092',
+   'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9094',
+   'properties.ssl.truststore.location' = '/opt/my-cluster-cluster-ca-cert/ca.crt',
+   'properties.ssl.truststore.type' = 'PEM',

+     -- Connect over SASL_SSL, this allows us to specify a SASL mechanism
+   'properties.security.protocol' = 'SASL_SSL',

+     -- Connect using SCRAM mechanism
+   'properties.sasl.mechanism' = 'SCRAM-SHA-512',

+     -- Connect using Kafka's ScramLoginModule
+     -- Provide `user.password` from the generated `my-user` secret.
+   'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule
+       required
+           username="my-user"
+           password="{{secret:flink/my-user/password}}"
+       ;',

    'properties.group.id' = 'sales-record-group',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6',
    'scan.startup.mode' = 'latest-offset'
);

OAuth 2.0 #

OAuth 2.0 is a standardized protocol for authorization, but it is also commonly used as the foundation of authentication protocols such as OpenID Connect.

In this example, the data-gen-setup.sh script creates a secure Keycloak deployment as our OAuth 2.0 provider. Keycloak is open-source Identity and Access Management software built on top of the OAuth 2.0 specification. The script automatically creates a self-signed TLS certificate for Keycloak’s HTTPS endpoints.

Since our data-gen-setup.sh script sets up a Kafka cluster using Strimzi, this example uses the Keycloak config and realm from the strimzi-kafka-operator Keycloak example.

Note:

  • Keycloak Web UI credentials:
    • Username: admin
    • Password: admin
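
Once the setup script below has deployed Keycloak, you can open its admin console with the credentials above to inspect the kafka-authz realm; a minimal sketch, assuming the Keycloak Service is named keycloak in the flink namespace:

# Port-forward the Keycloak service, then browse to https://localhost:8443
# (the certificate is self-signed, so accept the browser warning)
kubectl -n flink port-forward svc/keycloak 8443:8443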

Set up the demo application:

# Note: For OAuth 2.0, a secure Keycloak deployment with
#       a self-signed HTTPS TLS certificate is generated
SECURE_KAFKA=OAuth2 ./scripts/data-gen-setup.sh

kubectl -n flink apply -f secure-kafka/OAuth2/standalone-etl-secure-deployment.yaml

The commands above apply the following:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    listeners:
-     - name: plain
-       port: 9092
-       type: internal
-       tls: false

+     - name: oauth2
+       port: 9094
+       type: internal
+       tls: true
+       authentication:
+         type: oauth       # Specify OAuth 2.0 authentication

+         # Specify OAuth 2.0 JWKS/JWT details
+         validIssuerUri: https://keycloak.flink.svc:8443/realms/kafka-authz
+         jwksEndpointUri: https://keycloak.flink.svc:8443/realms/kafka-authz/protocol/openid-connect/certs
+         userNameClaim: preferred_username

+         # Trust self-signed TLS certificate used by Keycloak HTTPS endpoint
+         tlsTrustedCertificates:
+           - secretName: keycloak-cert
+             certificate: tls.crt
# This KafkaUser is only needed if the Kafka listener is using simple authorization
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
- name: my-user
+ name: service-account-kafka   # Create KafkaUser for user in the Keycloak realm
  labels:
    strimzi.io/cluster: my-cluster
spec:
- authentication:               # Remove authentication
-   type: tls

With the SASL OAUTHBEARER mechanism, the Kafka client must first obtain an access token from the authorization server and then present it to the broker during the SASL handshake. Since we’re using Strimzi, we instruct Flink to use Strimzi’s OAuth 2.0 login callback handler, which takes care of fetching the token from Keycloak using the client credentials in the JAAS configuration.
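
If authentication fails, it can help to request a token from Keycloak by hand using the same client credentials; a rough sketch, assuming the curlimages/curl image is pullable in your cluster:

# Read the client secret referenced in the JAAS config below and request a token directly
CLIENT_SECRET=$(kubectl -n flink get secret keycloak-kafka-client-secret -o jsonpath='{.data.secret}' | base64 -d)
kubectl -n flink run token-check --rm -it --restart=Never --image=curlimages/curl --command -- \
  curl -sk -X POST https://keycloak.flink.svc:8443/realms/kafka-authz/protocol/openid-connect/token \
  -d grant_type=client_credentials -d client_id=kafka -d "client_secret=$CLIENT_SECRET"

A successful response contains an access_token field; anything else points at the client credentials or the Keycloak realm configuration.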

Note: As mentioned at the top of this tutorial, the StreamsHub Flink SQL Runner distribution comes with Strimzi’s OAuth 2.0 callback handler included.

We can connect to the listener using the query below:

-- NOTE: This statement is included in standalone-etl-secure-deployment.yaml
CREATE TABLE SalesRecordTable (
    invoice_id STRING,
    user_id STRING,
    product_id STRING,
    quantity STRING,
    unit_cost STRING,
    `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp',
    WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND
) WITH ( 
    'connector' = 'kafka',
    'topic' = 'flink.sales.records',
-   'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9092',
+   'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9094',
+   'properties.security.protocol' = 'SASL_SSL',
+   'properties.ssl.truststore.location' = '/opt/my-cluster-cluster-ca-cert/ca.crt',
+   'properties.ssl.truststore.type' = 'PEM',

+     -- Connect using OAUTHBEARER mechanism (OAuth 2.0 with Bearer/access token)
+   'properties.sasl.mechanism' = 'OAUTHBEARER',

+     -- Connect using Kafka's OAuthBearerLoginModule (shaded in our Flink distribution to prevent dependency conflicts)
+     -- Provide path to mounted secret containing Keycloak's self-signed TLS certificate
+     -- Provide OAuth 2.0 client and endpoint details for/from Keycloak realm
+   'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
+       required
+           oauth.ssl.truststore.location="/opt/keycloak-ca-cert/ca.crt"
+           oauth.ssl.truststore.type="PEM"
+           oauth.client.id="kafka"
+           oauth.client.secret="{{secret:flink/keycloak-kafka-client-secret/secret}}"
+           oauth.token.endpoint.uri="https://keycloak.flink.svc:8443/realms/kafka-authz/protocol/openid-connect/token"
+       ;',

+     -- Use Strimzi's OAuth 2.0 callback handler
+   'properties.sasl.login.callback.handler.class' = 'io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler',

    'properties.group.id' = 'sales-record-group',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6',
    'scan.startup.mode' = 'latest-offset'
);

Custom #

Custom authentication gives you wide flexibility in how authentication is carried out. For the sake of simplicity, this example configures the listener with a custom truststore for TLS client authentication.
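
After the setup script below has run, you can inspect the self-signed client certificate it created; a minimal sketch (assumes openssl is available locally):

# Print the subject and issuer of the generated client certificate
kubectl -n flink get secret my-user-custom-cert -o jsonpath='{.data.tls\.crt}' \
  | base64 -d | openssl x509 -noout -subject -issuer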

Set up the demo application:

# Note: For custom authentication, a self-signed
# 'my-user-custom-cert' TLS certificate is created
SECURE_KAFKA=custom ./scripts/data-gen-setup.sh

kubectl -n flink apply -f secure-kafka/custom/standalone-etl-secure-deployment.yaml

The commands above apply the following:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    listeners:
-     - name: plain
-       port: 9092
-       type: internal
-       tls: false

+     - name: custom
+       port: 9094
+       type: internal
+       tls: true
+       authentication:
+         type: custom      # Specify custom authentication
+         sasl: false       # Disable SASL since it is unnecessary
+         listenerConfig:
+           ssl.client.auth: required       # Require TLS client authentication

+             # Provide path to self-signed client TLS truststore
+           ssl.truststore.location: /mnt/my-user-custom-cert/ca.crt
+           ssl.truststore.type: PEM
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
- authentication:
-   type: tls
+ authentication:
+   type: tls-external  # Specify external TLS authentication
+                       # (we use our own self-signed certificate)

We can connect to the listener using the query below:

-- NOTE: This statement is included in standalone-etl-secure-deployment.yaml
CREATE TABLE SalesRecordTable (
    invoice_id STRING,
    user_id STRING,
    product_id STRING,
    quantity STRING,
    unit_cost STRING,
    `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp',
    WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND
) WITH ( 
    'connector' = 'kafka',
    'topic' = 'flink.sales.records',
-   'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9092',
+   'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9094',
+   'properties.security.protocol' = 'SSL',
+   'properties.ssl.truststore.location' = '/opt/my-cluster-cluster-ca-cert/ca.crt',
+   'properties.ssl.truststore.type' = 'PEM',

+     -- Provide contents of the self-signed TLS certificate and key from the
+     -- 'my-user-custom-cert' secret generated by the Cert-Manager Operator
+     -- for our 'my-user-custom-cert' Certificate
+   'properties.ssl.keystore.certificate.chain' = '{{secret:flink/my-user-custom-cert/tls.crt}}',
+   'properties.ssl.keystore.key' = '{{secret:flink/my-user-custom-cert/tls.key}}',
+   'properties.ssl.keystore.type' = 'PEM',

    'properties.group.id' = 'sales-record-group',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6',
    'scan.startup.mode' = 'latest-offset'
);