Flink SQL Runner

Introduction

This project aims to simplify running SQL queries with Apache Flink on Kubernetes. Currently, running SQL queries with Flink requires either the Flink SQL client CLI or a REST request to a Flink SQL Gateway instance.

This project provides a convenient wrapper application and container image for use with the Flink Kubernetes Operator’s FlinkDeployment custom resource, allowing you to specify your SQL queries as arguments.

Installation

To deploy Flink SQL jobs, the Flink Kubernetes Operator must be installed.

CertManager Installation

The operator installs a webhook that requires CertManager to function, so this needs to be installed first:

kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.18.2/cert-manager.yaml
kubectl wait deployment --all  --for=condition=Available=True --timeout=300s -n cert-manager

To install the operator itself, first add the Helm repository, if you haven’t already:

helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.12.1/

Then install the Helm chart for operator 1.12.1:

helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator \
--set podSecurityContext=null \
--set defaultConfiguration."log4j-operator\.properties"=monitorInterval\=30 \
--set defaultConfiguration."log4j-console\.properties"=monitorInterval\=30 \
--set defaultConfiguration."flink-conf\.yaml"="kubernetes.operator.metrics.reporter.prom.factory.class\:\ org.apache.flink.metrics.prometheus.PrometheusReporterFactory
 kubernetes.operator.metrics.reporter.prom.port\:\ 9249 " \
-n <operator-namespace>

The Helm installation creates the service account, role, and role-binding required to run Flink jobs in the operator namespace. By default, the Flink Kubernetes Operator watches all namespaces in the Kubernetes cluster. To run Flink jobs in a namespace other than the operator’s namespace, create a service account with the required permissions in that namespace.

The Flink documentation covers the RBAC design and requirements for the Flink Kubernetes Operator in detail. However, you can find an example service account, role, and role-binding in the install/namespace-rbac directory, which you can use to set up a new application namespace:

kubectl apply -n <application-namespace> -f install/namespace-rbac

This creates a flink service account that must be referenced in the FlinkDeployment resource using the spec.serviceAccount property.

Writing Queries

For information on how Flink SQL supports writing queries, please refer to the upstream documentation.

Using secrets

You can use Kubernetes secrets with the Flink SQL Runner to provide the Flink job with security credentials for connecting to source or target systems. Secrets can be templated directly in the SQL statements with the following pattern:

{{secret:<NAMESPACE>/<SECRET NAME>/<DATA KEY>}}
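
As a sketch of how the pattern is filled in, the helper below assembles a placeholder from its three parts. The name secret_ref is hypothetical and used only for illustration (it is not part of the runner), and the namespace, Secret name, and data key are sample values:

```shell
# Hypothetical helper: prints a placeholder in the pattern shown above.
# Arguments: <namespace> <secret name> <data key>
secret_ref() {
  printf '{{secret:%s/%s/%s}}' "$1" "$2" "$3"
}

# Sample values only; substitute your own namespace, Secret, and key.
secret_ref flink test-user user.password
```

The printed placeholder can then be pasted into a SQL statement wherever the credential value is expected.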

Special Characters

Note that the semicolon (;) is a special character used as a statement delimiter. If a semicolon appears inside one of your SQL statements, escape it with \\. For example, this can be needed when specifying a properties.sasl.jaas.config value in a Kafka connector configuration, which would look something like this:

'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"test-user\" password=\"{{secret:flink/test-user/user.password}}\"\\;'
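
If you generate such values from a script, the escaping can be applied mechanically. The following is a minimal sketch only: escape_semicolons is a hypothetical helper name, and it assumes the two-backslash form shown above is what your manifest needs.

```shell
# Hypothetical helper: replaces every ';' in its argument with '\\;'
# (two backslashes followed by a semicolon), as in the example above.
escape_semicolons() {
  printf '%s' "$1" | sed 's/;/\\\\;/g'
}

# Sample value only; the trailing semicolon is the character being escaped.
escape_semicolons 'PlainLoginModule required username="test-user";'
```

Check the result against the example above before using it, since the number of backslashes needed can depend on how the surrounding string is quoted.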

Your SQL query can be submitted via either:

  • The spec.job.args field of the FlinkDeployment custom resource.

  • The SQL_STATEMENTS environment variable of the main FlinkDeployment container.

spec.job.args

Your SQL query should be a single string within an array literal ([ ]). Multi-line YAML strings (using the | or > characters) are not currently supported. However, newlines, tabs, and other whitespace characters within a single string are ignored, so queries can still be well-formatted. See the example below for an illustration of the formatting.

Example FlinkDeployment
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: standalone-etl
spec:
  image: quay.io/streamshub/flink-sql-runner:0.2.0
  flinkVersion: v2_0
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/streamshub/flink-sql-runner.jar
    args: ["
        CREATE TABLE orders (
          order_number BIGINT,
          price DECIMAL(32,2),
          buyer ROW<first_name STRING, last_name STRING>,
          last_name STRING,
          order_time TIMESTAMP(3)
        ) WITH (
          'connector' = 'datagen'
        );
        CREATE TABLE print_table
        WITH (
          'connector' = 'print'
        )
        LIKE orders
        ;
        INSERT INTO print_table
          SELECT *
          FROM orders;
        "]
    parallelism: 1
    upgradeMode: stateless

SQL_STATEMENTS

This environment variable is useful for providing an SQL query from a ConfigMap or Secret.

For example:

Example SQL file
-- standalone-etl.sql
CREATE TABLE orders (
  order_number BIGINT,
  price DECIMAL(32,2),
  buyer ROW<first_name STRING, last_name STRING>,
  last_name STRING,
  order_time TIMESTAMP(3)
) WITH (
  'connector' = 'datagen'
);

CREATE TABLE print_table
WITH (
  'connector' = 'print'
)
LIKE orders;

INSERT INTO print_table
  SELECT *
  FROM orders;

Example ConfigMap from SQL file
kubectl create configmap -n flink standalone-etl-sql \
  --from-file=SQL_STATEMENTS=standalone-etl.sql

Example FlinkDeployment using SQL from ConfigMap
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: standalone-etl
spec:
  image: quay.io/streamshub/flink-sql-runner:0.2.0
  flinkVersion: v2_0
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  podTemplate:
    kind: Pod
    spec:
      containers:
        - name: flink-main-container
          envFrom:
            - configMapRef:
                name: standalone-etl-sql
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/streamshub/flink-sql-runner.jar
    parallelism: 1
    upgradeMode: stateless

Monitoring metrics

Note

This section assumes the following:

Installing Prometheus

Depending on whether you’re using Kubernetes or OpenShift, you will have to follow different steps to set up Prometheus.

Kubernetes

After deploying a Flink cluster, you can deploy Prometheus to monitor metrics from the job manager and task manager by following these steps:

  1. Install the CoreOS Prometheus Operator:

    curl -s https://raw.githubusercontent.com/coreos/prometheus-operator/v0.84.0/bundle.yaml > prometheus-operator-deployment.yaml
    
    # Specify to install operator into 'flink' namespace
    sed -E -i '/[[:space:]]*namespace: [a-zA-Z0-9-]*$/s/namespace:[[:space:]]*[a-zA-Z0-9-]*$/namespace: flink/' prometheus-operator-deployment.yaml
    
    kubectl create -f prometheus-operator-deployment.yaml
  2. Install the following pre-configured CRs:

    kubectl apply -f prometheus-install/ -n flink
    1. Service and ServiceMonitor for the Flink Kubernetes Operator.

    2. PodMonitor for the recommendation-app Flink JobManager and TaskManager.

  3. Also, install the following Kubernetes-specific pre-configured CRs:

    1. Prometheus and Service for the Prometheus instance.

    2. ServiceAccount, Role, and RoleBinding for allowing Prometheus to scrape targets.

      kubectl apply -f prometheus-install/kubernetes/ -n flink
  4. Expose the Prometheus UI with a port-forward rule:

    kubectl port-forward svc/prometheus-service 9090 -n flink
  5. You can now monitor metrics from the Flink Kubernetes Operator, job manager, or task manager in the Prometheus UI at localhost:9090.

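The namespace rewrite from step 1 of the Kubernetes instructions can be tried on sample input before you edit the downloaded bundle. The sketch below wraps the same substitution in a function that reads from standard input. The name set_namespace_flink is hypothetical, and the two sample lines are made up for illustration:

```shell
# Hypothetical wrapper around the substitution from step 1; reads YAML on stdin.
set_namespace_flink() {
  sed -E '/[[:space:]]*namespace: [a-zA-Z0-9-]*$/s/namespace:[[:space:]]*[a-zA-Z0-9-]*$/namespace: flink/'
}

# The namespace line is rewritten to 'flink'; the name line is left unchanged.
printf '  namespace: default\n  name: prometheus-operator\n' | set_namespace_flink
```

Note that the in-place form in step 1 (sed -E -i '...') follows GNU sed syntax. BSD sed, as shipped with macOS, expects a backup suffix argument after -i (an empty string, -i '', is accepted).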

OpenShift
Note

Make sure you have enabled monitoring for user-defined projects, for example:

oc apply -f - <<EOF
apiVersion: v1
kind: ConfigMap
metadata:
  name: cluster-monitoring-config
  namespace: openshift-monitoring
data:
  config.yaml: |
    enableUserWorkload: true
EOF

Since OpenShift comes with Prometheus built-in and pre-configured, we can integrate with it by deploying a PodMonitor CR for the Flink cluster.

  1. Install the following pre-configured CRs:

    oc apply -f prometheus-install/ -n flink
    1. Service and ServiceMonitor for the Flink Kubernetes Operator.

    2. PodMonitor for the recommendation-app Flink JobManager and TaskManager.

  2. It takes around 5 minutes for the Prometheus operator to update the config for the Prometheus server. After that, you can query the metrics in the OpenShift UI by navigating to the "Observe" section in the left nav bar.


Installing Grafana

Depending on whether you’re using Kubernetes or OpenShift, you will have to follow different steps to set up Grafana.

Kubernetes

After deploying Prometheus, you can deploy Grafana to create dashboards and visualize the data.

  1. Install the pre-configured Grafana server, Service, data sources, and example Flink dashboard:

    kubectl apply -f grafana-install/kubernetes/grafana-datasources.yaml -n flink
    
    kubectl apply -f grafana-install/ -n flink
    Note
    grafana-datasources.yaml is pre-configured to connect to the Prometheus server from the previous section.

OpenShift

Since our Grafana deployment will need to fetch data from OpenShift’s Prometheus instance, we need to create a ServiceAccount with the right permissions and pass its token to Grafana.

  1. Create a ServiceAccount with permissions for monitoring:

    oc project openshift-user-workload-monitoring
    
    oc create sa grafana-sa
    
    oc adm policy add-cluster-role-to-user cluster-monitoring-view -z grafana-sa
  2. Create a token for the ServiceAccount, then apply a Grafana datasource containing the token:

    export PROMETHEUS_TOKEN=$(oc -n openshift-user-workload-monitoring create token grafana-sa)
    
    envsubst < grafana-install/openshift/grafana-datasources.yaml | oc apply -n flink -f -
    Note
    In production, bind the token to an object such as a Secret, so that the token is invalidated when that object is deleted.
  3. Install the pre-configured Grafana server, Service, and example Flink dashboard:

    oc apply -f grafana-install/ -n flink

Viewing the example dashboard

After confirming the Grafana deployment is running, we can expose it with a port-forward rule:

kubectl port-forward svc/grafana-service 3000 -n flink

Note
The default Grafana username and password are both admin.

Upon logging in to the Grafana UI, you should be greeted with the provided example Flink dashboard.
