Flink SQL 知其所以然:SQL 的时区问题!

SQL 时区问题

1.SQL 时区解决的问题

首先说一下这个问题的背景:

大家想一下离线 Hive 环境中,有遇到过时区时区相关的问题吗?

至少博主目前没有碰到过,因为这个问题在底层的数据集成系统都已经给解决了,小伙伴萌拿到手的 ODS 层表都是已经按照所在地区的时区给格式化好的了。

举个例子:小伙伴萌看到日期分区为 2022-01-01 的 Hive 表时,可以默认认为该分区中的数据就对应到你所在地区的时区的 2022-01-01 日的数据。

但是 Flink 中时区问题要特别引起关注,不加小心就会误用。

而本节 SQL 时区旨在帮助大家了解到以下两个场景的问题:

  • 在 1.13 之前,DDL create table 中使用 PROCTIME() 指定处理时间列时,返回值类型为 TIMESTAMP(3) 类型,而 TIMESTAMP(3) 是不带任何时区信息的,默认为 UTC 时间(0 时区)。
  • 使用 StreamTableEnvironment::createTemporaryView 将 DataStream 转为 Table 时,注册处理时间(proctime.proctime)、事件时间列(rowtime.rowtime)时,两列时间类型也为 TIMESTAMP(3) 类型,不带时区信息。

而以上两个场景就会导致:

  • 在北京时区的用户使用 TIMESTAMP(3) 类型的时间列开最常用的 1 天的窗口时,划分出来的窗口范围是北京时间的 [2022-01-01 08:00:00, 2022-01-02 08:00:00],而不是北京时间的 [2022-01-01 00:00:00, 2022-01-02 00:00:00]。因为 TIMESTAMP(3) 是默认的 UTC 时间,即 0 时区。
  • 北京时区的用户将 TIMESTAMP(3) 类型时间属性列转为 STRING 类型的数据展示时,也是 UTC 时区的,而不是北京时间的。

因此充分了解本节的知识内容可以很好的帮你避免时区问题错误。

2.SQL 时间类型

  • Flink SQL 支持 TIMESTAMP(不带时区信息的时间)、TIMESTAMP_LTZ(带时区信息的时间)
  • TIMESTAMP(不带时区信息的时间):是通过一个 年, 月, 日, 小时, 分钟, 秒 和 小数秒 的字符串来指定。举例:1970-01-01 00:00:04.001。
  • 为什么要使用字符串来指定呢?因为此种类型不带时区信息,所以直接用一个字符串指定就好了?
  • 那 TIMESTAMP 字符串的时间代表的是什么时区的时间呢?UTC 时区,也就是默认 0 时区,对应中国北京是东八区。
  • TIMESTAMP_LTZ(带时区信息的时间):没有字符串来指定,而是通过 java 标准 epoch 时间 1970-01-01T00:00:00Z 开始计算的毫秒数。举例:1640966400000。
  • 其时区信息是怎么指定的呢?是通过本次任务中的时区配置参数 table.local-time-zone 设置的。
  • 时间戳本身也不带有时区信息,为什么要使用时间戳来指定呢?就是因为时间戳不带有时区信息,所以我们通过配置 table.local-time-zone 时区参数之后,就能将一个不带有时区信息的时间戳转换为带有时区信息的字符串了。举例:table.local-time-zone 为 Asia/Shanghai 时,4001 时间戳转化为字符串的效果是 1970-01-01 08:00:04.001。

3.时区参数生效的 SQL 时间函数

以下 SQL 中的时间函数都会受到时区参数的影响,从而做到最后显示给用户的时间、窗口的划分都按照用户设置时区之内的时间。

  • LOCALTIME;
  • LOCALTIMESTAMP;
  • CURRENT_DATE;
  • CURRENT_TIME;
  • CURRENT_TIMESTAMP;
  • CURRENT_ROW_TIMESTAMP();
  • NOW();
  • PROCTIME():其中 PROCTIME() 在 1.13 版本及之后版本,返回值类型是 TIMESTAMP_LTZ(3)。

在 Flink SQL client 中执行结果如下:

Flink SQL> SET sql-client.execution.result-mode=tableau;
Flink SQL> CREATE VIEW MyView1 AS SELECT LOCALTIME, LOCALTIMESTAMP, CURRENT_DATE, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_ROW_TIMESTAMP(), NOW(), PROCTIME();
Flink SQL> DESC MyView1;

+------------------------+-----------------------------+-------+-----+--------+-----------+
| name | type | null | key | extras | watermark |
+------------------------+-----------------------------+-------+-----+--------+-----------+
| LOCALTIME | TIME(0) | false | | | |
| LOCALTIMESTAMP | TIMESTAMP(3) | false | | | |
| CURRENT_DATE | DATE | false | | | |
| CURRENT_TIME | TIME(0) | false | | | |
| CURRENT_TIMESTAMP | TIMESTAMP_LTZ(3) | false | | | |
|CURRENT_ROW_TIMESTAMP() | TIMESTAMP_LTZ(3) | false | | | |
| NOW() | TIMESTAMP_LTZ(3) | false | | | |
| PROCTIME() | TIMESTAMP_LTZ(3) *PROCTIME* | false | | | |
+------------------------+-----------------------------+-------+-----+--------+-----------+

Flink SQL> SET table.local-time-zone=UTC;
Flink SQL> SELECT * FROM MyView1;

+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| LOCALTIME | LOCALTIMESTAMP | CURRENT_DATE | CURRENT_TIME | CURRENT_TIMESTAMP | CURRENT_ROW_TIMESTAMP() | NOW() | PROCTIME() |
+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| 15:18:36 | 2021-04-15 15:18:36.384 | 2021-04-15 | 15:18:36 | 2021-04-15 15:18:36.384 | 2021-04-15 15:18:36.384 | 2021-04-15 15:18:36.384 | 2021-04-15 15:18:36.384 |
+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+

Flink SQL> SET table.local-time-zone=Asia/Shanghai;
Flink SQL> SELECT * FROM MyView1;

+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| LOCALTIME | LOCALTIMESTAMP | CURRENT_DATE | CURRENT_TIME | CURRENT_TIMESTAMP | CURRENT_ROW_TIMESTAMP() | NOW() | PROCTIME() |
+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| 23:18:36 | 2021-04-15 23:18:36.384 | 2021-04-15 | 23:18:36 | 2021-04-15 23:18:36.384 | 2021-04-15 23:18:36.384 | 2021-04-15 23:18:36.384 | 2021-04-15 23:18:36.384 |
+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+


Flink SQL> CREATE VIEW MyView2 AS SELECT TO_TIMESTAMP_LTZ(4001, 3) AS ltz, TIMESTAMP '1970-01-01 00:00:01.001' AS ntz;
Flink SQL> DESC MyView2;

+------+------------------+-------+-----+--------+-----------+
| name | type | null | key | extras | watermark |
+------+------------------+-------+-----+--------+-----------+
| ltz | TIMESTAMP_LTZ(3) | true | | | |
| ntz | TIMESTAMP(3) | false | | | |
+------+------------------+-------+-----+--------+-----------+

Flink SQL> SET table.local-time-zone=UTC;
Flink SQL> SELECT * FROM MyView2;

+-------------------------+-------------------------+
| ltz | ntz |
+-------------------------+-------------------------+
| 1970-01-01 00:00:04.001 | 1970-01-01 00:00:01.001 |
+-------------------------+-------------------------+

Flink SQL> SET table.local-time-zone=Asia/Shanghai;
Flink SQL> SELECT * FROM MyView2;

+-------------------------+-------------------------+
| ltz | ntz |
+-------------------------+-------------------------+
| 1970-01-01 08:00:04.001 | 1970-01-01 00:00:01.001 |
+-------------------------+-------------------------+

Flink SQL> CREATE VIEW MyView3 AS SELECT ltz, CAST(ltz AS TIMESTAMP(3)), CAST(ltz AS STRING), ntz, CAST(ntz AS TIMESTAMP_LTZ(3)) FROM MyView2;
Flink SQL> DESC MyView3;
+-------------------------------+------------------+-------+-----+--------+-----------+
| name | type | null | key | extras | watermark |
+-------------------------------+------------------+-------+-----+--------+-----------+
| ltz | TIMESTAMP_LTZ(3) | true | | | |
| CAST(ltz AS TIMESTAMP(3)) | TIMESTAMP(3) | true | | | |
| CAST(ltz AS STRING) | STRING | true | | | |
| ntz | TIMESTAMP(3) | false | | | |
| CAST(ntz AS TIMESTAMP_LTZ(3)) | TIMESTAMP_LTZ(3) | false | | | |
+-------------------------------+------------------+-------+-----+--------+-----------+

Flink SQL> SELECT * FROM MyView3;

+-------------------------+---------------------------+-------------------------+-------------------------+-------------------------------+
| ltz | CAST(ltz AS TIMESTAMP(3)) | CAST(ltz AS STRING) | ntz | CAST(ntz AS TIMESTAMP_LTZ(3)) |
+-------------------------+---------------------------+-------------------------+-------------------------+-------------------------------+
| 1970-01-01 08:00:04.001 | 1970-01-01 08:00:04.001 | 1970-01-01 08:00:04.001 | 1970-01-01 00:00:01.001 | 1970-01-01 00:00:01.001 |
+-------------------------+---------------------------+-------------------------+-------------------------+-------------------------------+

4.事件时间和时区应用案例

这里分两类,分别是 TIMESTAMP(不带时区信息的时间)、TIMESTAMP_LTZ(带时区信息的时间) 的事件时间 Flink SQL 任务。

  • TIMESTAMP(不带时区信息的时间)
Flink SQL> CREATE TABLE MyTable2 (
item STRING,
price DOUBLE,
ts TIMESTAMP(3), -- TIMESTAMP 类型的时间戳
WATERMARK FOR ts AS ts - INTERVAL '10' SECOND
) WITH (
'connector' = 'socket',
'hostname' = '127.0.0.1',
'port' = '9999',
'format' = 'csv'
);

Flink SQL> CREATE VIEW MyView4 AS
SELECT
TUMBLE_START(ts, INTERVAL '10' MINUTES) AS window_start,
TUMBLE_END(ts, INTERVAL '10' MINUTES) AS window_end,
TUMBLE_ROWTIME(ts, INTERVAL '10' MINUTES) as window_rowtime,
item,
MAX(price) as max_price
FROM MyTable2
GROUP BY TUMBLE(ts, INTERVAL '10' MINUTES), item;

Flink SQL> DESC MyView4;

+----------------+------------------------+------+-----+--------+-----------+
| name | type | null | key | extras | watermark |
+----------------+------------------------+------+-----+--------+-----------+
| window_start | TIMESTAMP(3) | true | | | |
| window_end | TIMESTAMP(3) | true | | | |
| window_rowtime | TIMESTAMP(3) *ROWTIME* | true | | | |
| item | STRING | true | | | |
| max_price | DOUBLE | true | | | |
+----------------+------------------------+------+-----+--------+-----------+

将数据写入到 MyTable2 中:

> nc -lk 9999
A,1.1,2021-04-15 14:01:00
B,1.2,2021-04-15 14:02:00
A,1.8,2021-04-15 14:03:00
B,2.5,2021-04-15 14:04:00
C,3.8,2021-04-15 14:05:00
C,3.8,2021-04-15 14:11:00

最终结果如下:

Flink SQL> SET table.local-time-zone=UTC; 
Flink SQL> SELECT * FROM MyView4;

+-------------------------+-------------------------+-------------------------+------+-----------+
| window_start | window_end | window_rowtime | item | max_price |
+-------------------------+-------------------------+-------------------------+------+-----------+
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | A | 1.8 |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | B | 2.5 |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | C | 3.8 |
+-------------------------+-------------------------+-------------------------+------+-----------+

Flink SQL> SET table.local-time-zone=Asia/Shanghai;
Flink SQL> SELECT * FROM MyView4;

+-------------------------+-------------------------+-------------------------+------+-----------+
| window_start | window_end | window_rowtime | item | max_price |
+-------------------------+-------------------------+-------------------------+------+-----------+
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | A | 1.8 |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | B | 2.5 |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | C | 3.8 |
+-------------------------+-------------------------+-------------------------+------+-----------+

通过上述结果可见,使用 TIMESTAMP(不带时区信息的时间) 进开窗,在 UTC 时区下的计算结果与在 Asia/Shanghai 时区下计算的窗口开始时间,窗口结束时间和窗口的时间是相同的。

  • TIMESTAMP_LTZ(带时区信息的时间)
Flink SQL> CREATE TABLE MyTable3 (
item STRING,
price DOUBLE,
ts BIGINT, -- long 类型的时间戳
ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3), -- 转为 TIMESTAMP_LTZ 类型的时间戳
WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '10' SECOND
) WITH (
'connector' = 'socket',
'hostname' = '127.0.0.1',
'port' = '9999',
'format' = 'csv'
);

Flink SQL> CREATE VIEW MyView5 AS
SELECT
TUMBLE_START(ts_ltz, INTERVAL '10' MINUTES) AS window_start,
TUMBLE_END(ts_ltz, INTERVAL '10' MINUTES) AS window_end,
TUMBLE_ROWTIME(ts_ltz, INTERVAL '10' MINUTES) as window_rowtime,
item,
MAX(price) as max_price
FROM MyTable3
GROUP BY TUMBLE(ts_ltz, INTERVAL '10' MINUTES), item;

Flink SQL> DESC MyView5;

+----------------+----------------------------+-------+-----+--------+-----------+
| name | type | null | key | extras | watermark |
+----------------+----------------------------+-------+-----+--------+-----------+
| window_start | TIMESTAMP(3) | false | | | |
| window_end | TIMESTAMP(3) | false | | | |
| window_rowtime | TIMESTAMP_LTZ(3) *ROWTIME* | true | | | |
| item | STRING | true | | | |
| max_price | DOUBLE | true | | | |
+----------------+----------------------------+-------+-----+--------+-----------+

将数据写入 MyTable3:

A,1.1,1618495260000  # 对应到 UTC 时区的时间为 2021-04-15 14:01:00
B,1.2,1618495320000 # 对应到 UTC 时区的时间为 2021-04-15 14:02:00
A,1.8,1618495380000 # 对应到 UTC 时区的时间为 2021-04-15 14:03:00
B,2.5,1618495440000 # 对应到 UTC 时区的时间为 2021-04-15 14:04:00
C,3.8,1618495500000 # 对应到 UTC 时区的时间为 2021-04-15 14:05:00
C,3.8,1618495860000 # 对应到 UTC 时区的时间为 2021-04-15 14:11:00

最终结果如下:

Flink SQL> SET table.local-time-zone=UTC; 
Flink SQL> SELECT * FROM MyView5;

+-------------------------+-------------------------+-------------------------+------+-----------+
| window_start | window_end | window_rowtime | item | max_price |
+-------------------------+-------------------------+-------------------------+------+-----------+
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | A | 1.8 |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | B | 2.5 |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | C | 3.8 |
+-------------------------+-------------------------+-------------------------+------+-----------+

Flink SQL> SET table.local-time-zone=Asia/Shanghai;
Flink SQL> SELECT * FROM MyView5;

+-------------------------+-------------------------+-------------------------+------+-----------+
| window_start | window_end | window_rowtime | item | max_price |
+-------------------------+-------------------------+-------------------------+------+-----------+
| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:09:59.999 | A | 1.8 |
| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:09:59.999 | B | 2.5 |
| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:09:59.999 | C | 3.8 |
+-------------------------+-------------------------+-------------------------+------+-----------+

通过上述结果可见,使用 TIMESTAMP_LTZ(带时区信息的时间) 进开窗,在 UTC 时区下的计算结果与在 Asia/Shanghai 时区下计算的窗口开始时间,窗口结束时间和窗口的时间是不同的,都是按照时区进行格式化的。

5.处理时间和时区应用案例

Flink SQL 定义处理时间属性列是通过 PROCTIME() 函数来指定的,其返回值类型是 TIMESTAMP_LTZ。

注意:

在 Flink 1.13 之前,PROCTIME() 函数返回类型是 TIMESTAMP,返回值是 UTC 时区的时间戳,例如,上海时间显示为 2021-03-01 12:00:00 时,PROCTIME() 返回值显示 2021-03-01 04:00:00,我们进行使用是错误的。Flink 1.13 修复了这个问题,使用 TIMESTAMP_LTZ 作为 PROCTIME() 的返回类型,这样 Flink 就会自动获取当前时区信息,然后进行处理,不需要用户再进行时区的格式化处理了。

如下案例:

Flink SQL> SET table.local-time-zone=UTC;
Flink SQL> SELECT PROCTIME();

+-------------------------+
| PROCTIME() |
+-------------------------+
| 2021-04-15 14:48:31.387 |
+-------------------------+

Flink SQL> SET table.local-time-zone=Asia/Shanghai;
Flink SQL> SELECT PROCTIME();

+-------------------------+
| PROCTIME() |
+-------------------------+
| 2021-04-15 22:48:31.387 |
+-------------------------+

Flink SQL> CREATE TABLE MyTable1 (
item STRING,
price DOUBLE,
proctime as PROCTIME()
) WITH (
'connector' = 'socket',
'hostname' = '127.0.0.1',
'port' = '9999',
'format' = 'csv'
);

Flink SQL> CREATE VIEW MyView3 AS
SELECT
TUMBLE_START(proctime, INTERVAL '10' MINUTES) AS window_start,
TUMBLE_END(proctime, INTERVAL '10' MINUTES) AS window_end,
TUMBLE_PROCTIME(proctime, INTERVAL '10' MINUTES) as window_proctime,
item,
MAX(price) as max_price
FROM MyTable1
GROUP BY TUMBLE(proctime, INTERVAL '10' MINUTES), item;

Flink SQL> DESC MyView3;

+-----------------+-----------------------------+-------+-----+--------+-----------+
| name | type | null | key | extras | watermark |
+-----------------+-----------------------------+-------+-----+--------+-----------+
| window_start | TIMESTAMP(3) | false | | | |
| window_end | TIMESTAMP(3) | false | | | |
| window_proctime | TIMESTAMP_LTZ(3) *PROCTIME* | false | | | |
| item | STRING | true | | | |
| max_price | DOUBLE | true | | | |
+-----------------+-----------------------------+-------+-----+--------+-----------+

将数据写入到 MyTable1 中:

> nc -lk 9999
A,1.1
B,1.2
A,1.8
B,2.5
C,3.8

其输出结果如下:

Flink SQL> SET table.local-time-zone=UTC;
Flink SQL> SELECT * FROM MyView3;

+-------------------------+-------------------------+-------------------------+------+-----------+
| window_start | window_end | window_procime | item | max_price |
+-------------------------+-------------------------+-------------------------+------+-----------+
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.005 | A | 1.8 |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.007 | B | 2.5 |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.007 | C | 3.8 |
+-------------------------+-------------------------+-------------------------+------+-----------+

Flink SQL> SET table.local-time-zone=Asia/Shanghai;
Flink SQL> SELECT * FROM MyView3;

+-------------------------+-------------------------+-------------------------+------+-----------+
| window_start | window_end | window_procime | item | max_price |
+-------------------------+-------------------------+-------------------------+------+-----------+
| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.005 | A | 1.8 |
| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.007 | B | 2.5 |
| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.007 | C | 3.8 |
+-------------------------+-------------------------+-------------------------+------+-----------+

通过上述结果可见,使用处理时间进行开窗,在 UTC 时区下的计算结果与在 Asia/Shanghai 时区下计算的窗口开始时间,窗口结束时间和窗口的时间是不同的,都是按照时区进行格式化的。

6.SQL 时间函数返回在流批任务中的异同

以下函数:

  • LOCALTIME
  • LOCALTIMESTAMP
  • CURRENT_DATE
  • CURRENT_TIME
  • CURRENT_TIMESTAMP
  • NOW()

在 Streaming 模式下这些函数是每条记录都会计算一次,但在 Batch 模式下,只会在 query 开始时计算一次,所有记录都使用相同的时间结果。

以下时间函数无论是在 Streaming 模式还是 Batch 模式下,都会为每条记录计算一次结果:

  • CURRENT_ROW_TIMESTAMP()
  • PROCTIME()
 
友情链接
鄂ICP备19019357号-22