

Pattern Recognition


Searching for a set of event patterns is a common use case, especially for data streams. Flink comes with a complex event processing (CEP) library that allows for pattern detection in event streams. Furthermore, Flink’s SQL API provides a relational way of expressing queries, with a large set of built-in functions and rule-based optimizations that can be used out of the box.

In December 2016, the International Organization for Standardization (ISO) released a new version of the SQL standard that includes Row Pattern Recognition in SQL (ISO/IEC TR 19075-5:2016). It allows Flink to consolidate the CEP and SQL APIs by using the MATCH_RECOGNIZE clause for complex event processing in SQL.

A MATCH_RECOGNIZE clause enables the following tasks:

  • Logically partition and order the data using the PARTITION BY and ORDER BY clauses.
  • Define patterns of rows to seek using the PATTERN clause. These patterns use a syntax similar to that of regular expressions.
  • Specify the logical conditions that a row must satisfy to be mapped to a row pattern variable in the DEFINE clause.
  • Define measures, which are expressions usable in other parts of the SQL query, in the MEASURES clause.

The following example illustrates the syntax for basic pattern recognition:

SELECT T.aid, T.bid, T.cid
FROM MyTable
    MATCH_RECOGNIZE (
      PARTITION BY userid
      ORDER BY proctime
      MEASURES
        A.id AS aid,
        B.id AS bid,
        C.id AS cid
      PATTERN (A B C)
      DEFINE
        A AS name = 'a',
        B AS name = 'b',
        C AS name = 'c'
    ) AS T
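
To make the semantics of the query above more concrete, the following is a minimal plain-Python sketch of what it computes. It is not Flink code and does not use any Flink API. It assumes that each row is a dictionary with the hypothetical keys userid, proctime, id and name, and that the three pattern variables must match directly adjacent rows within a partition.

```python
from itertools import groupby
from operator import itemgetter


def match_abc(rows):
    """Return one (aid, bid, cid) tuple per match of PATTERN (A B C)."""
    results = []
    # PARTITION BY userid, ORDER BY proctime
    ordered = sorted(rows, key=itemgetter("userid", "proctime"))
    for _, group in groupby(ordered, key=itemgetter("userid")):
        part = list(group)
        # Slide over every window of three adjacent rows in the partition.
        for i in range(len(part) - 2):
            a, b, c = part[i:i + 3]
            # DEFINE: A AS name = 'a', B AS name = 'b', C AS name = 'c'
            if (a["name"], b["name"], c["name"]) == ("a", "b", "c"):
                # MEASURES: expose the id of each matched row
                results.append((a["id"], b["id"], c["id"]))
    return results


# Hypothetical sample rows, made up for illustration only.
rows = [
    {"userid": 1, "proctime": 1, "id": 10, "name": "a"},
    {"userid": 2, "proctime": 2, "id": 20, "name": "a"},
    {"userid": 1, "proctime": 3, "id": 11, "name": "b"},
    {"userid": 1, "proctime": 4, "id": 12, "name": "c"},
    {"userid": 2, "proctime": 5, "id": 21, "name": "c"},
]
print(match_abc(rows))
```

In this sample only user 1 produces a match, because the rows of user 2 go from 'a' directly to 'c'. Partitioning matters here: without it, the row of user 2 at proctime 2 would sit between the 'a' and 'b' rows of user 1 and break the match.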

This page will explain each keyword in more detail and will illustrate more complex examples.

Flink’s implementation of the MATCH_RECOGNIZE clause is a subset of the full standard. Only those features documented in the following sections are supported. Additional features may be supported based on community feedback. Please also take a look at the known limitations.

Introduction and Examples

Installation Guide

The pattern recognition feature uses Apache Flink’s CEP library internally. To use the MATCH_RECOGNIZE clause, the library needs to be added as a dependency to your Maven project.


Alternatively, you can also add the dependency to the cluster classpath (see the dependency section for more information).

If you want to use the MATCH_RECOGNIZE clause in the SQL Client, you don’t have to do anything as all the dependencies are included by default.

SQL Semantics

Every MATCH_RECOGNIZE query consists of the following clauses:

  • PARTITION BY - defines the logical partitioning of the table; similar to a GROUP BY operation.
  • ORDER BY - specifies how the incoming rows should be ordered; this is essential as patterns depend on an order.
  • MEASURES - defines output of the clause; similar to a SELECT clause.
  • ONE ROW PER MATCH - output mode which defines how many rows per match should be produced.
  • AFTER MATCH SKIP - specifies where the next match should start; this is also a way to control how many distinct matches a single event can belong to.
  • PATTERN - allows constructing patterns that will be searched for using a regular expression-like syntax.
  • DEFINE - this section defines the conditions that the pattern variables must satisfy.
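
The effect of AFTER MATCH SKIP on how many matches a single event can belong to can be illustrated with a small plain-Python sketch. It is not Flink code. It assumes a made-up two-row pattern (A A), where A is any row with a price above 10, and compares two skip strategies: resuming at the row after the first row of the match (SKIP TO NEXT ROW) and resuming after the last row of the match (SKIP PAST LAST ROW).

```python
def match_pairs(prices, skip_past_last_row):
    """Find matches of a two-row pattern (A A), where A is any price above 10.

    Returns the (first_index, second_index) of every match.
    """
    matches = []
    start = 0
    while start + 1 < len(prices):
        if prices[start] > 10 and prices[start + 1] > 10:
            matches.append((start, start + 1))
            # Resume after the whole match, or at the row right after its first row.
            start = start + 2 if skip_past_last_row else start + 1
        else:
            start += 1
    return matches


prices = [11, 12, 13]
print(match_pairs(prices, skip_past_last_row=False))  # [(0, 1), (1, 2)]
print(match_pairs(prices, skip_past_last_row=True))   # [(0, 1)]
```

With the first strategy the middle row takes part in two matches. With the second strategy every row belongs to at most one match.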

Attention: Currently, the MATCH_RECOGNIZE clause can only be applied to an append table. It also always produces an append table.


For our examples, we assume that a table Ticker has been registered. The table contains prices of stocks at a particular point in time.

The table has the following schema:

     |-- symbol: String                           # symbol of the stock
     |-- price: Long                              # price of the stock
     |-- tax: Long                                # tax liability of the stock
     |-- rowtime: TimeIndicatorTypeInfo(rowtime)  # point in time when the change to those values happened

For simplification, we only consider the incoming data for a single stock ACME. A ticker could look similar to the following table where rows are continuously appended.

symbol         rowtime         price    tax
======  ====================  ======= =======
'ACME'  '01-Apr-11 10:00:00'   12      1
'ACME'  '01-Apr-11 10:00:01'   17      2
'ACME'  '01-Apr-11 10:00:02'   19      1
'ACME'  '01-Apr-11 10:00:03'   21      3
'ACME'  '01-Apr-11 10:00:04'   25      2
'ACME'  '01-Apr-11 10:00:05'   18      1
'ACME'  '01-Apr-11 10:00:06'   15      1
'ACME'  '01-Apr-11 10:00:07'   14      2
'ACME'  '01-Apr-11 10:00:08'   24      2
'ACME'  '01-Apr-11 10:00:09'   25      2
'ACME'  '01-Apr-11 10:00:10'   19      1

The task is now to find periods of a constantly decreasing price of a single ticker. For this, one could write a query like:

SELECT *
FROM Ticker
    MATCH_RECOGNIZE (
        PARTITION BY symbol
        ORDER BY rowtime
        MEASURES
            START_ROW.rowtime AS start_tstamp,
            LAST(PRICE_DOWN.rowtime) AS bottom_tstamp,
            LAST(PRICE_UP.rowtime) AS end_tstamp
        ONE ROW PER MATCH
        AFTER MATCH SKIP TO LAST PRICE_UP
        PATTERN (START_ROW PRICE_DOWN+ PRICE_UP)
        DEFINE
            PRICE_DOWN AS
                (LAST(PRICE_DOWN.price, 1) IS NULL AND PRICE_DOWN.price < START_ROW.price) OR
                    PRICE_DOWN.price < LAST(PRICE_DOWN.price, 1),
            PRICE_UP AS
                PRICE_UP.price > LAST(PRICE_DOWN.price, 1)
    ) MR;

The query partitions the Ticker table by the symbol column and orders it by the rowtime time attribute.

The PATTERN clause specifies that we are interested in a pattern with a starting event START_ROW that is followed by one or more PRICE_DOWN events and concluded with a PRICE_UP event. If such a pattern is found, the search for the next match starts at the last PRICE_UP event, as indicated by the AFTER MATCH SKIP TO LAST clause.

The DEFINE clause specifies the conditions that need to be met for a PRICE_DOWN and a PRICE_UP event. Although the START_ROW pattern variable is not present in the DEFINE clause, it has an implicit condition that always evaluates to TRUE.

A pattern variable PRICE_DOWN is defined as a row with a price that is smaller than the price of the last row that met the PRICE_DOWN condition. For the initial case or when there is no last row that met the PRICE_DOWN condition, the price of the row should be smaller than the price of the preceding row in the pattern (referenced by START_ROW).

A pattern variable PRICE_UP is defined as a row with a price that is larger than the price of the last row that met the PRICE_DOWN condition.

This query produces a summary row for each period in which the price of a stock was continuously decreasing.
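
The matching behaviour described above can be traced with a plain-Python sketch over the sample Ticker rows. It is not Flink code and uses no Flink API. The function name find_price_dips is hypothetical, and the sketch follows the prose description of the conditions: PRICE_DOWN rows form a strictly decreasing run after START_ROW, and PRICE_UP is compared against the most recent PRICE_DOWN row.

```python
from itertools import groupby
from operator import itemgetter

# (symbol, rowtime, price, tax) rows taken from the Ticker sample.
TICKER = [
    ("ACME", "01-Apr-11 10:00:00", 12, 1),
    ("ACME", "01-Apr-11 10:00:01", 17, 2),
    ("ACME", "01-Apr-11 10:00:02", 19, 1),
    ("ACME", "01-Apr-11 10:00:03", 21, 3),
    ("ACME", "01-Apr-11 10:00:04", 25, 2),
    ("ACME", "01-Apr-11 10:00:05", 18, 1),
    ("ACME", "01-Apr-11 10:00:06", 15, 1),
    ("ACME", "01-Apr-11 10:00:07", 14, 2),
    ("ACME", "01-Apr-11 10:00:08", 24, 2),
    ("ACME", "01-Apr-11 10:00:09", 25, 2),
    ("ACME", "01-Apr-11 10:00:10", 19, 1),
]


def find_price_dips(rows):
    """Return (symbol, start_tstamp, bottom_tstamp, end_tstamp), one per match."""
    matches = []
    # PARTITION BY symbol, ORDER BY rowtime
    ordered = sorted(rows, key=itemgetter(0, 1))
    for symbol, group in groupby(ordered, key=itemgetter(0)):
        part = list(group)
        start = 0
        while start < len(part):
            last_price = part[start][2]  # START_ROW accepts any row
            i = start + 1
            # PRICE_DOWN+: greedily consume strictly decreasing prices
            while i < len(part) and part[i][2] < last_price:
                last_price = part[i][2]
                i += 1
            down_count = i - start - 1
            # PRICE_UP: the next row must be priced above the last PRICE_DOWN row
            if down_count >= 1 and i < len(part) and part[i][2] > last_price:
                # ONE ROW PER MATCH: emit a single summary row
                matches.append((symbol, part[start][1], part[i - 1][1], part[i][1]))
                start = i  # AFTER MATCH SKIP TO LAST PRICE_UP
            else:
                start += 1
    return matches


for row in find_price_dips(TICKER):
    print(row)
```

On the sample rows this sketch finds a single match that starts at 10:00:04, reaches its bottom at 10:00:07, and ends with the price increase at 10:00:08. The trailing drop at 10:00:10 does not produce a match because no later row with a higher price has arrived yet.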

The exact representation of the output rows is defined in the MEASURES part of the query. The number of output rows is defined by the ONE ROW PER MATCH output mode.

 symbol       start_tstamp       bottom_tstamp         end_tstamp
=========  ==================  ==================  ==================
ACME       01-APR-11 10:00:04  01-APR-11 10:00:07  01-APR-1

