Note: This tutorial is mainly focused on anomaly detection. For detailed information on working with Flink ETL Jobs and Session Clusters, look at the Interactive ETL example.
Flink CEP (Complex Event Processing) is a Flink library designed for finding patterns in data streams, e.g. detecting suspicious bank transactions.
It can be accessed in Flink SQL using the MATCH_RECOGNIZE clause.
In this tutorial, we will use Flink SQL and its MATCH_RECOGNIZE clause to detect and report suspicious sales across many users in real time.
The tutorial is based on the StreamsHub Flink SQL Examples repository, and the code can be found under the tutorials/anomaly-detection directory.
Setup #
Note: If you want more information on what the steps below are doing, look at the Interactive ETL example setup which is almost identical.
Spin up a minikube cluster:
minikube start --cpus 4 --memory 16G
From the main tutorials directory, run the data generator setup script:
./scripts/data-gen-setup.sh
(Optional) Verify that the test data is flowing correctly (wait a few seconds for messages to start flowing):
kubectl exec -it my-cluster-dual-role-0 -n flink -- /bin/bash \
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic flink.sales.records
Deploy a Flink session cluster:
kubectl -n flink apply -f anomaly-detection/flink-session-anomaly.yaml
Scenario #
Source Data Table #
The data generator application creates a topic (flink.sales.records) containing sales records. The schema for this topic can be seen in data-generator/src/main/resources/sales.avsc:
{
  "namespace": "com.github.streamshub.kafka.data.generator.schema",
  "type": "record",
  "name": "Sales",
  "fields": [
    {"name": "user_id", "type": "string"},
    {"name": "product_id", "type": "string"},
    {"name": "invoice_id", "type": "string"},
    {"name": "quantity", "type": "string"},
    {"name": "unit_cost", "type": "string"}
  ]
}
Note: All of the fields are of type string for simplicity. We’ll CAST() quantity to an INT later on.
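For example, here is a minimal sketch of that cast (a hypothetical illustration, runnable on its own in the Flink SQL CLI we set up later; the literal '2' stands in for a quantity value):
-- Cast a string value to an INT, as we will do for the quantity field
SELECT CAST('2' AS INT) AS quantity_int;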
Assuming you have the data generator up and running as per the instructions in the Setup section, you can verify this by running the following command:
$ kubectl exec -it my-cluster-dual-role-0 -n flink -- /bin/bash \
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic flink.sales.records
user-11188&30299767430689348882
£838
user-9467&63188787247555258843
£971
...
High Sale Quantities #
We want to detect when a user orders a higher quantity than they usually do. This could be a sign that they made a mistake or that their account has been hijacked, which could cause trouble for all parties involved if not dealt with promptly. It could also be a positive sign of increased interest in a particular product, which our sales team might want to be notified about. Regardless, it is a situation we would like to be able to spot and act on.
Data analysis #
Setting up the Flink SQL CLI #
We’re going to use the Flink SQL CLI to interactively try various queries against our data.
First, let’s port forward the Flink Job Manager pod so the Flink SQL CLI can access it:
kubectl -n flink port-forward <job-manager-pod> 8081:8081
The job manager pod will have the name format session-cluster-anomaly-<alphanumeric>; your kubectl should tab-complete the name. If it doesn’t, you can find the job manager pod name by running kubectl -n flink get pods.
Next, let’s get the Flink SQL CLI up and running:
podman run -it --rm --net=host \
quay.io/streamshub/flink-sql-runner:0.2.0 \
/opt/flink/bin/sql-client.sh embedded
Once we’re in, we can create a table for the sales records:
CREATE TABLE SalesRecordTable (
invoice_id STRING,
user_id STRING,
product_id STRING,
quantity STRING,
unit_cost STRING,
`purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp',
WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'flink.sales.records',
'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9092',
'properties.group.id' = 'sales-record-group',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = 'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6',
'scan.startup.mode' = 'latest-offset'
);
We can do a simple query to verify that the table was created correctly and that the data is flowing (give it a few seconds to start receiving data):
SELECT * FROM SalesRecordTable;
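If no rows appear, it can help to first confirm the table definition itself. As an optional check (not part of the original steps), Flink SQL supports SHOW TABLES and DESCRIBE:
-- Optional sanity checks on the table we just created
SHOW TABLES;
DESCRIBE SalesRecordTable;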
Classifying “unusual” sales #
There are many arbitrary ways to define an “unusual” or “suspicious” sale quantity.
By looking at the data in the SalesRecordTable, we can observe that users typically order quantities between 1 and 3 (inclusive):
Note: Don’t worry if the constantly changing output of the following queries is confusing. The main thing to take away is that the values in the avg_quantity column stay at 2 most of the time, which means that the average quantity for each user stays at 2 even as new sales are generated and cause the calculation to update.
Fetch all the sales and group them by user:
SELECT user_id, COUNT(user_id) AS total_sales_count FROM SalesRecordTable GROUP BY user_id;
For each user, take all of their sales and calculate the average quantity:
SELECT user_id, COUNT(user_id) AS total_sales_count, AVG(CAST(quantity AS INT)) AS avg_quantity FROM SalesRecordTable GROUP BY user_id;
However, we can occasionally see sales with much higher quantities, usually between 10 and 30.
If we wanted to, we could simply classify any quantity above 3 as “unusual”:
SELECT *
FROM SalesRecordTable
WHERE CAST(quantity AS INT) > 3;
Note: This query might take a couple of seconds to return results, since the sales with high quantities are unusual.
Of course, this wouldn’t be a particularly good measure. Specific users might always order higher quantities than the average user.
A more useful measure would involve calculating the average sale quantity of each user, and considering a quantity “unusual” if it is, for example, 3 times higher than that user’s average.
The following query will return sales that match that condition:
SELECT sales.*, user_average.avg_quantity
FROM SalesRecordTable sales
JOIN (
SELECT
user_id,
AVG(CAST(quantity AS INT)) AS avg_quantity
FROM SalesRecordTable
GROUP BY user_id
) user_average
ON sales.user_id = user_average.user_id
WHERE CAST(sales.quantity AS INT) > (user_average.avg_quantity * 3);
While useful, the query above has several flaws:
- The “unusual” quantities are included in the AVG() calculation and skew it.
  - Ideally, we would not only exclude these sales from the AVG(), but maybe even reset the AVG() after an “unusual” sale.
- The AVG() calculation uses all of a user’s sales, which might not be ideal.
  - This calculation could use a lot of computing resources, depending on the volume and frequency of sales.
  - Limiting the AVG() to sales made, for example, in the past week might be more useful.
- The query would become much more complex if, for example, we only wanted to return a match if two “unusual” sales occurred one after another.
  - In a typical database, we would likely have to use some combination of WITH TIES, OVER, and PARTITION BY, assuming those are even supported.
MATCH_RECOGNIZE lets us easily and concisely solve these problems.
Using MATCH_RECOGNIZE #
Simple pattern #
We can use MATCH_RECOGNIZE to easily look for both simple and complex patterns.
For example, you can match any sale with a quantity higher than 3 like this:
SELECT *
FROM SalesRecordTable
MATCH_RECOGNIZE (
ORDER BY purchase_time
MEASURES
UNUSUAL_SALE.quantity AS unusual_quantity,
UNUSUAL_SALE.purchase_time AS unusual_tstamp
PATTERN (UNUSUAL_SALE)
DEFINE
UNUSUAL_SALE AS
CAST(UNUSUAL_SALE.quantity AS INT) > 3
);
The ORDER BY clause is required; it allows us to search for patterns based on different notions of time (see the sketch after this list).
- We pass it the purchase_time field from our SalesRecordTable, which contains copies of the timestamps embedded in our source Kafka ConsumerRecords, as the event time.
- With streaming, this ensures the output of the query will be correct, even if some sales arrive late.
The DEFINE and MEASURES clauses are similar to the WHERE and SELECT SQL clauses respectively.
- We DEFINE a single UNUSUAL_SALE “pattern variable” with our condition, then include it in our pattern.
  - Note: The value of UNUSUAL_SALE is the row/sale which matches our DEFINEd condition.
- In MEASURES, we use the value of UNUSUAL_SALE to output both the quantity and timestamp of the “unusual” sale.
Pattern navigation #
Maybe instead, we want to look for two sales with the same quantity occurring one after another. This could be a sign that the user accidentally made the same order twice.
For now, let’s simply match two sales occurring one after another that both have a quantity of 2. The PATTERN syntax is similar to regular expression syntax, so we can easily extend our pattern from above using “quantifiers” to accomplish that:
SELECT *
FROM SalesRecordTable
MATCH_RECOGNIZE (
ORDER BY purchase_time
MEASURES
FIRST(UNUSUAL_SALE.quantity) AS first_unusual_quantity,
FIRST(UNUSUAL_SALE.purchase_time) AS first_unusual_tstamp,
LAST(UNUSUAL_SALE.quantity) AS last_unusual_quantity,
LAST(UNUSUAL_SALE.purchase_time) AS last_unusual_tstamp
PATTERN (UNUSUAL_SALE{2})
DEFINE
UNUSUAL_SALE AS
CAST(UNUSUAL_SALE.quantity AS INT) = 2
);
Notice how the UNUSUAL_SALE pattern variable doesn’t simply hold one value; it maps to multiple rows/sales/events. We’re able to pass it to the FIRST() and LAST() functions to output the quantities of the first and second matching sales respectively.
These functions are specifically referred to as “offset functions”, since we use “logical offsets” to navigate the events mapped to a particular pattern variable.
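To make the “logical offset” idea concrete, here is a hypothetical variation of the query above that passes an explicit offset of 1 (offsets are zero-based, so offset 1 refers to the row one step away from the first or last mapped sale):
SELECT *
FROM SalesRecordTable
MATCH_RECOGNIZE (
    ORDER BY purchase_time
    MEASURES
        -- FIRST() with offset 1 navigates forward to the second mapped sale
        FIRST(UNUSUAL_SALE.quantity, 1) AS second_quantity,
        -- LAST() with offset 1 navigates backward to the first mapped sale
        LAST(UNUSUAL_SALE.quantity, 1) AS first_quantity
    PATTERN (UNUSUAL_SALE{2})
    DEFINE
        UNUSUAL_SALE AS
            CAST(UNUSUAL_SALE.quantity AS INT) = 2
);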
Useful pattern #
Finally, let’s use some techniques from the useful measure/query in the “Classifying “unusual” sales” section in a MATCH_RECOGNIZE:
SELECT *
FROM SalesRecordTable
MATCH_RECOGNIZE (
PARTITION BY user_id
ORDER BY purchase_time
MEASURES
UNUSUAL_SALE.invoice_id AS unusual_invoice_id,
CAST(UNUSUAL_SALE.quantity AS INT) AS unusual_quantity,
UNUSUAL_SALE.purchase_time AS unusual_tstamp,
AVG(CAST(TYPICAL_SALE.quantity AS INT)) AS avg_quantity,
FIRST(TYPICAL_SALE.purchase_time) AS avg_first_sale_tstamp,
LAST(TYPICAL_SALE.purchase_time) AS avg_last_sale_tstamp
ONE ROW PER MATCH
AFTER MATCH SKIP PAST LAST ROW
PATTERN (TYPICAL_SALE+? UNUSUAL_SALE) WITHIN INTERVAL '10' SECOND
DEFINE
UNUSUAL_SALE AS
CAST(UNUSUAL_SALE.quantity AS INT) > AVG(CAST(TYPICAL_SALE.quantity AS INT)) * 3
);
This query might look intimidating at first, but becomes easier to understand once we break it down.
ORDER BY purchase_time #
As mentioned previously, this clause allows us to look for patterns based on time. In our case, the purchase time of the sale.
PARTITION BY user_id #
This is similar to GROUP BY user_id in the original query; it lets us calculate AVG() values and find matches for each user separately.
Unlike with a global user average, we won’t receive false positives because of a minority of users who often order in large quantities.
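As a minimal, hypothetical illustration of the clause on its own, here is the earlier simple pattern partitioned per user (the partition key, user_id, is automatically included in the output):
SELECT *
FROM SalesRecordTable
MATCH_RECOGNIZE (
    -- Each user's sales form their own stream of potential matches
    PARTITION BY user_id
    ORDER BY purchase_time
    MEASURES
        UNUSUAL_SALE.quantity AS unusual_quantity
    PATTERN (UNUSUAL_SALE)
    DEFINE
        UNUSUAL_SALE AS
            CAST(UNUSUAL_SALE.quantity AS INT) > 3
);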
PATTERN (TYPICAL_SALE+? UNUSUAL_SALE) #
We use two “pattern variables” in our pattern:
- TYPICAL_SALE: We use this to match all the sales made before an “unusual” sale.
  - By not specifying a condition for this variable in DEFINE, the default condition is used, which evaluates to true for every row/sale.
  - Notice how we use TYPICAL_SALE+? instead of just TYPICAL_SALE (see the sketch after this list).
    - + and ? are both “quantifiers”, just like the { n } in UNUSUAL_SALE{2} from one of our previous queries.
      - We use + to match “one or more” typical sales, since an AVG() of zero sales isn’t very useful.
    - By combining both of those “quantifiers” into +?, we make a “reluctant quantifier”.
      - This reduces memory consumption by specifying to match as few typical sales as possible.
      - Since TYPICAL_SALE matches every row/sale by default, we need to use a “reluctant quantifier”, or the query might match all rows and never finish.
- UNUSUAL_SALE: We use this to match an “unusual” sale.
  - Unlike in our original query, we’re able to easily calculate an AVG() of just typical sales by using AVG(TYPICAL_SALE.quantity).
    - This prevents “unusual” sales from skewing our AVG() and creating false positives.
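For comparison, a quantifier can also be bounded. The following is a hypothetical drop-in replacement for the PATTERN line of the query above that would cap the number of typical sales considered at 5, still reluctantly:
-- Hypothetical variation: match between 1 and 5 typical sales, as few as possible
PATTERN (TYPICAL_SALE{1,5}? UNUSUAL_SALE) WITHIN INTERVAL '10' SECOND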
We append WITHIN INTERVAL '10' SECOND after the pattern to set a “time constraint”.
Note: We use an INTERVAL of '10' SECOND for quick user feedback in this tutorial. In a real situation, you would probably use a much longer interval, for example: WITHIN INTERVAL '1' HOUR.
Setting a time interval provides several benefits (see the sketch after this list):
- Only the sales made within the specified period are used.
  - Memory use becomes more efficient, since we can prune sales older than this.
- Our AVG() calculation changes from a typical arithmetic mean to a simple moving average.
  - There are benefits and downsides to both approaches.
  - In our case, sales are frequent, so only using recent sales can be beneficial.
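For instance, switching to the hour-long interval suggested in the note above only requires changing the time constraint. This is a hypothetical drop-in replacement for the PATTERN line of the query above, not used in this tutorial:
-- Hypothetical variation: average over the past hour instead of 10 seconds
PATTERN (TYPICAL_SALE+? UNUSUAL_SALE) WITHIN INTERVAL '1' HOUR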
MEASURES #
Like in a typical SQL SELECT, we use this clause to specify what to output, and use values from the “pattern variables” to output information on the “unusual” sale and the AVG() of the typical sales.
We use FIRST() and LAST() to output timestamps for the first and last sales that were used to calculate our AVG(). This information lets us know exactly which previous sales were used to determine an “unusual” sale.
ONE ROW PER MATCH #
Currently, this is the only supported “output mode”.
As the name suggests, it outputs only one row when a match is found.
Once released, ALL ROWS PER MATCH will allow you to output multiple rows instead.
AFTER MATCH SKIP PAST LAST ROW #
This is pretty self-explanatory: we skip past the last row/sale of a match before looking for the next match.
Other “After Match Strategies” are available for skipping to different rows and pattern variable values inside the current match. However, they aren’t particularly useful in our scenario.
Our strategy skips past the “unusual” sale of the current match. This prevents the “unusual” sale from being wrongly used as the first “typical” sale of the next match and skewing the AVG().
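For example, one of those other strategies resumes scanning at the row right after the start of the current match, allowing matches to overlap. Shown here as a hypothetical drop-in replacement for the AFTER MATCH line of the query above:
-- Hypothetical alternative: allows overlapping matches, not useful in our scenario
AFTER MATCH SKIP TO NEXT ROW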
Persisting back to Kafka #
Just like in the Interactive ETL tutorial, we can create a new table to persist the output of our query back to Kafka (look at that tutorial for an explanation of the steps below). This way, we don’t have to run the query every time we want to find “unusual” sales.
First, let’s define the table, and specify csv as the format so we don’t have to provide a schema:
CREATE TABLE UnusualSalesRecordTable (
user_id STRING,
unusual_invoice_id STRING,
unusual_quantity INT,
unusual_tstamp TIMESTAMP(3),
avg_quantity INT,
avg_first_sale_tstamp TIMESTAMP(3),
avg_last_sale_tstamp TIMESTAMP(3),
PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'flink.unusual.sales.records.interactive',
'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9092',
'properties.client.id' = 'sql-cleaning-client',
'properties.transaction.timeout.ms' = '800000',
'key.format' = 'csv',
'value.format' = 'csv',
'value.fields-include' = 'ALL'
);
Next, let’s insert the results of our “unusual” sales pattern matching query into it:
INSERT INTO UnusualSalesRecordTable
SELECT *
FROM SalesRecordTable
MATCH_RECOGNIZE (
PARTITION BY user_id
ORDER BY purchase_time
MEASURES
UNUSUAL_SALE.invoice_id AS unusual_invoice_id,
CAST(UNUSUAL_SALE.quantity AS INT) AS unusual_quantity,
UNUSUAL_SALE.purchase_time AS unusual_tstamp,
AVG(CAST(TYPICAL_SALE.quantity AS INT)) AS avg_quantity,
FIRST(TYPICAL_SALE.purchase_time) AS avg_first_sale_tstamp,
LAST(TYPICAL_SALE.purchase_time) AS avg_last_sale_tstamp
ONE ROW PER MATCH
AFTER MATCH SKIP PAST LAST ROW
PATTERN (TYPICAL_SALE+? UNUSUAL_SALE) WITHIN INTERVAL '10' SECOND
DEFINE
UNUSUAL_SALE AS
CAST(UNUSUAL_SALE.quantity AS INT) > AVG(CAST(TYPICAL_SALE.quantity AS INT)) * 3
);
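Alternatively, while the INSERT job above is running, we can read the results back through the sink table itself from the SQL CLI (an optional check; upsert-kafka tables can be read as well as written):
-- Read the persisted results back through the upsert-kafka table
SELECT * FROM UnusualSalesRecordTable;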
Finally, we can verify the data is being written to the new topic by running the following command in a new terminal:
$ kubectl exec -it my-cluster-dual-role-0 -n flink -- /bin/bash \
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic flink.unusual.sales.records.interactive
user-67,7850595442358871117,30,"2025-07-10 14:07:02.697",1,"2025-07-10 14:06:54.566","2025-07-10 14:07:01.679"
user-77,787429984061010435,10,"2025-07-10 14:07:04.729",1,"2025-07-10 14:06:58.641","2025-07-10 14:07:01.672"
user-98,3476938040725302112,20,"2025-07-10 14:07:05.751",1,"2025-07-10 14:06:56.594","2025-07-10 14:07:05.749"
Converting to a standalone Flink job #
The anomaly detection query (deployed above) will have to compete for resources with other queries running in the same Flink session cluster.
Instead, like in the Interactive ETL example, we can use a FlinkDeployment CR to deploy our queries as a standalone Flink job.
There is an example FlinkDeployment CR (standalone-etl-anomaly-deployment.yaml) that we can use:
kubectl apply -n flink -f anomaly-detection/standalone-etl-anomaly-deployment.yaml
Finally, we can verify that data is being written to the new topic:
kubectl exec -it my-cluster-dual-role-0 -n flink -- /bin/bash \
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic flink.unusual.sales.records