Introduction
This project aims to simplify the process of running SQL queries using Apache Flink on Kubernetes. Currently, in order to run SQL queries with Flink, you would need to run the Flink SQL client CLI or submit queries via a REST request to a Flink SQL Gateway instance.
This project provides a convenient wrapper application and container image for use with the Flink Kubernetes Operator's `FlinkDeployment` custom resource, allowing you to specify your SQL queries as arguments.
Installation
In order to deploy Flink SQL jobs, the Flink Kubernetes Operator must be installed.
CertManager Installation
The operator installs a webhook that requires CertManager to function, so this needs to be installed first:
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.18.2/cert-manager.yaml
kubectl wait deployment --all --for=condition=Available=True --timeout=300s -n cert-manager
Flink Kubernetes Operator Installation
First, add the helm repository, if you haven't already:
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.12.1/
Then install the helm chart for operator 1.12.1:
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator \
  --set podSecurityContext=null \
  --set defaultConfiguration."log4j-operator\.properties"=monitorInterval\=30 \
  --set defaultConfiguration."log4j-console\.properties"=monitorInterval\=30 \
  --set defaultConfiguration."flink-conf\.yaml"="kubernetes.operator.metrics.reporter.prom.factory.class\:\ org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    kubernetes.operator.metrics.reporter.prom.port\:\ 9249 " \
  -n <operator-namespace>
Deploying Flink jobs in other namespaces
The helm installation will create the required service, role, and role-binding to run Flink jobs in the defined operator namespace. The Flink Kubernetes Operator, by default, watches all namespaces in the Kubernetes cluster. To run Flink jobs in a namespace other than the operator’s namespace, create a service account with the required permissions in that namespace.
The Flink documentation covers the RBAC design and requirements for the Flink Kubernetes Operator in detail.
However, you can find an example service, role, and role-binding in the install/namespace-rbac directory, which you can use to set up a new application namespace:

kubectl -n <application-namespace> apply -f install/namespace-rbac
This creates a `flink` service account that must be referenced in the `FlinkDeployment` resource using the `spec.serviceAccount` property.
Writing Queries
For information on how Flink SQL supports writing queries, please refer to the upstream documentation.
Using secrets
You can use Kubernetes secrets with the Flink SQL Runner to provide security credentials to a Flink job for connecting to source or target systems. Secrets can be templated directly in the SQL statements with the following pattern:
{{secret:<NAMESPACE>/<SECRET NAME>/<DATA KEY>}}
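To make the placeholder syntax concrete, here is a minimal Python sketch of how such a pattern can be matched and substituted. This is only an illustration of the templating convention, not the runner's actual implementation; the `fake_store` lookup stands in for a real Kubernetes Secret read.

```python
import re

# Pattern matching the {{secret:<NAMESPACE>/<SECRET NAME>/<DATA KEY>}} placeholder.
SECRET_PATTERN = re.compile(r"\{\{secret:([^/}]+)/([^/}]+)/([^}]+)\}\}")

def resolve_secrets(statement: str, lookup) -> str:
    """Replace each placeholder using lookup(namespace, secret_name, data_key)."""
    return SECRET_PATTERN.sub(
        lambda m: lookup(m.group(1), m.group(2), m.group(3)), statement
    )

# Hypothetical in-memory store standing in for the Kubernetes API.
fake_store = {("flink", "test-user", "user.password"): "s3cr3t"}
sql = "'password' = '{{secret:flink/test-user/user.password}}'"
resolved = resolve_secrets(sql, lambda ns, name, key: fake_store[(ns, name, key)])
print(resolved)  # 'password' = 's3cr3t'
```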
Special Characters
Note that the semicolon (`;`) is a special character used as a statement delimiter. If it appears within one of your SQL statements, make sure it is escaped with `\\`.
For example, this might be needed when specifying a `properties.sasl.jaas.config` value for a Kafka connector configuration. In this case, it would look something like this:
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"test-user\" password=\"{{secret:flink/test-user/user.password}}\"\\;'
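The effect of this escaping convention can be sketched in a few lines of Python: split on semicolons that are not preceded by a backslash, then unescape the remainder. This is an illustration of the delimiter rule only, not the runner's actual parsing code.

```python
import re

def split_statements(sql: str) -> list[str]:
    """Split on ';' delimiters, treating a backslash-escaped ';' as literal."""
    # Split on semicolons not preceded by a backslash, then unescape them.
    parts = re.split(r"(?<!\\);", sql)
    return [p.replace("\\;", ";").strip() for p in parts if p.strip()]

stmts = split_statements(
    "CREATE TABLE t (x INT) WITH ('opt' = 'a=b\\;'); INSERT INTO t SELECT 1;"
)
print(stmts)
# The escaped ';' inside the option value survives as part of the first statement.
```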
Deploying a Flink SQL Query
Your SQL query can be submitted via either:
- The `spec.job.args` field of the `FlinkDeployment` custom resource.
- The `SQL_STATEMENTS` environment variable of the main `FlinkDeployment` container.
spec.job.args
Your SQL query should be formed of a single string within an array literal (`[ ]`). Multi-line YAML strings (using the `|` or `>` characters) are not currently supported. However, newlines, tabs, and other whitespace characters within a single string are ignored, so queries can still be well formatted. See the example below for an illustration of the formatting.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: standalone-etl
spec:
  image: quay.io/streamshub/flink-sql-runner:0.2.0
  flinkVersion: v2_0
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/streamshub/flink-sql-runner.jar
    args: ["
        CREATE TABLE orders (
            order_number BIGINT,
            price DECIMAL(32,2),
            buyer ROW<first_name STRING, last_name STRING>,
            order_time TIMESTAMP(3)
        ) WITH (
            'connector' = 'datagen'
        );
        CREATE TABLE print_table
        WITH (
            'connector' = 'print'
        )
        LIKE orders;
        INSERT INTO print_table
        SELECT *
        FROM orders;
        "]
    parallelism: 1
    upgradeMode: stateless
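The whitespace-insensitivity described above can be checked with a small sketch: collapsing all runs of whitespace in a formatted query yields the single-line statement the SQL parser effectively sees. This is an illustration only, not the runner's actual code.

```python
# A query laid out readably across several lines, as it would appear
# inside the single string in spec.job.args.
query = """
    INSERT INTO print_table
    SELECT *
    FROM orders;
"""

# Newlines and tabs are just whitespace to the SQL parser.
normalized = " ".join(query.split())
print(normalized)  # INSERT INTO print_table SELECT * FROM orders;
```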
SQL_STATEMENTS
This environment variable is useful for providing an SQL query from a `ConfigMap` or `Secret`. For example:
-- standalone-etl.sql
CREATE TABLE orders (
    order_number BIGINT,
    price DECIMAL(32,2),
    buyer ROW<first_name STRING, last_name STRING>,
    order_time TIMESTAMP(3)
) WITH (
    'connector' = 'datagen'
);

CREATE TABLE print_table
WITH (
    'connector' = 'print'
)
LIKE orders;

INSERT INTO print_table
SELECT *
FROM orders;
kubectl create configmap -n flink standalone-etl-sql \
--from-file=SQL_STATEMENTS=standalone-etl.sql
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: standalone-etl
spec:
  image: quay.io/streamshub/flink-sql-runner:0.2.0
  flinkVersion: v2_0
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  podTemplate:
    kind: Pod
    spec:
      containers:
        - name: flink-main-container
          envFrom:
            - configMapRef:
                name: standalone-etl-sql
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/streamshub/flink-sql-runner.jar
    parallelism: 1
    upgradeMode: stateless
Monitoring metrics
Note: This section assumes that the Flink Kubernetes Operator and a Flink deployment are already running in the `flink` namespace.
Integrating Prometheus into Flink cluster
Installing Prometheus
Depending on whether you’re using Kubernetes or OpenShift, you will have to follow different steps to set up Prometheus.
Kubernetes
After deploying a Flink cluster, you can deploy Prometheus to monitor metrics from the job manager and task manager by following these steps:
Install the CoreOS Prometheus Operator:
curl -s https://raw.githubusercontent.com/coreos/prometheus-operator/v0.84.0/bundle.yaml > prometheus-operator-deployment.yaml
# Specify to install operator into 'flink' namespace
sed -E -i '/[[:space:]]*namespace: [a-zA-Z0-9-]*$/s/namespace:[[:space:]]*[a-zA-Z0-9-]*$/namespace: flink/' prometheus-operator-deployment.yaml
kubectl create -f prometheus-operator-deployment.yaml
Install the following pre-configured CRs:
kubectl apply -f prometheus-install/ -n flink
- `Service` and `ServiceMonitor` for the Flink Kubernetes operator.
- `PodMonitor` for the `recommendation-app` Flink JobManager and TaskManager.
Also, install the following Kubernetes-specific pre-configured CRs:
- `Prometheus` and `Service` for the Prometheus instance.
- `ServiceAccount`, `Role`, and `RoleBinding` to allow Prometheus to scrape targets.

kubectl apply -f prometheus-install/kubernetes/ -n flink
Expose the Prometheus UI with a port-forward rule:
kubectl port-forward svc/prometheus-service 9090 -n flink
Now, you can monitor the metrics of the Flink Kubernetes operator, job manager, and task manager via the Prometheus UI, accessible at `localhost:9090`.
OpenShift
Note: Make sure you have enabled monitoring for user-defined projects in your OpenShift cluster.
Since OpenShift comes with Prometheus built in and pre-configured, we can integrate with it by deploying a `PodMonitor` CR for the Flink cluster.
Install the following pre-configured CRs:
oc apply -f prometheus-install/ -n flink
- `Service` and `ServiceMonitor` for the Flink Kubernetes operator.
- `PodMonitor` for the `recommendation-app` Flink JobManager and TaskManager.
It takes around 5 minutes for the Prometheus operator to update the config for the Prometheus server. After that, you can query the metrics in the OpenShift UI by navigating to the "Observe" section in the left nav bar.
Integrating Grafana into Flink cluster
Depending on whether you’re using Kubernetes or OpenShift, you will have to follow different steps to set up Grafana.
Installing Grafana
Kubernetes
After deploying Prometheus, you can deploy Grafana to create dashboards and visualize the data.
Install the pre-configured Grafana server, `Service`, data sources, and example Flink dashboard:

kubectl apply -f grafana-install/kubernetes/grafana-datasources.yaml -n flink
kubectl apply -f grafana-install/ -n flink

Note: `grafana-datasources.yaml` is pre-configured to connect to the Prometheus server from the previous section.
OpenShift
Since our Grafana deployment will need to fetch data from OpenShift's Prometheus instance, we need to create a `ServiceAccount` with the right permissions and pass its token to Grafana.
Create a `ServiceAccount` with permissions for monitoring:

oc project openshift-user-workload-monitoring
oc create sa grafana-sa
oc adm policy add-cluster-role-to-user cluster-monitoring-view -z grafana-sa
Create a token for the `ServiceAccount`, then apply a Grafana datasource containing the token:

export PROMETHEUS_TOKEN=$(oc -n openshift-user-workload-monitoring create token grafana-sa)
cat grafana-install/openshift/grafana-datasources.yaml | envsubst | oc apply -n flink -f -
Note: In production, you should bind the token to an object.

Install the pre-configured Grafana server, `Service`, and example Flink dashboard:

oc apply -f grafana-install/ -n flink
Viewing the example dashboard
After confirming the Grafana deployment is running, we can expose it with a port-forward rule:
kubectl port-forward svc/grafana-service 3000 -n flink
Note: The default Grafana username and password are both `admin`.
Upon logging in to the Grafana UI, you should be greeted with the provided example Flink dashboard.