欢迎参与 8 月 1 日中午 11 点的线上分享,了解 GreptimeDB 联合处理指标和日志的最新方案! 👉🏻 点击加入

Skip to content
On this page
教程
2024-06-04

用户指南 | 如何使用 Flow 功能实现持续聚合,赋能实时计算和查询

在最新发布的版本 v0.8 中,GreptimeDB 实现了 Flow Engine 来支持持续聚合功能,用户可以实时计算和查询数据的总和、平均值或使用其他聚合计算功能。本文介绍了 GreptimeDB 中持续聚合功能的基本用法和特性,并且举例说明了创建、使用和删除 Flow 任务的流程。

在最新发布的版本 v0.8 中,GreptimeDB 实现了 Flow Engine 来支持持续聚合功能,用户可以实时计算和查询数据的总和、平均值或使用其他聚合计算功能。

本文介绍了 GreptimeDB 中持续聚合功能的基本用法和特性,并且举例说明了创建、使用和删除 Flow 任务的流程。

什么是持续聚合

持续聚合功能在实际应用中有许多落地场景,比如 Streaming ETL、实时分析、监控报警等。其中一个最常见的应用是降采样(Downsampling),使用窗口函数,可以把一个毫秒级输出频率的信号降采样到秒级(比如通过计算一秒内的平均值),这样就可以节省存储和计算成本。

进一步地,例如一个速度传感器高频输入大量数据,持续聚合功能可以对这些输入数据进行过滤,过滤掉速度低于或高于一定数值的数据点,并且计算每五分钟内的平均速度,最后将结果输出到结果表中。

持续聚合功能由 Flow Engine 提供。Flow 是 GreptimeDB 内置的一个轻量级流处理引擎,为用户提供了持续聚合、窗口计算等功能。用户可以直接使用 SQL 语句来创建一个 Flow 任务进行持续聚合,无需额外编写业务代码。Flow 任务可用于实时数据处理、实时计算等场景。

应用示例

持续聚合功能可以完全通过 SQL 来定义和使用,本文将演示从创建 Flow 任务,接受数据进行流处理,到删除该 Flow 任务的全部流程

我们以一个速度传感器读入左右轮的瞬时速度,并且过滤掉较高或较低的异常值,并计算五秒内的平均速度为例。

首先,创建一个源数据表作为输入:

sql
CREATE TABLE velocity (
    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    left_wheel FLOAT,
    right_wheel FLOAT,
    TIME INDEX(ts)
);

以及作为 Flow 任务输出的结果表:

sql
CREATE TABLE avg_speed (
    avg_speed FLOAT,
    start_window TIMESTAMP TIME INDEX,
    end_window TIMESTAMP,
    update_at TIMESTAMP,
);

接下来就可以创建 Flow 任务了,这里需要使用我们提供的 SQL 方言语法 CREATE FLOW,可以参考如下例子:

sql
CREATE FLOW calc_avg_speed
SINK TO avg_speed
AS
SELECT avg((left_wheel+right_wheel)/2)
FROM velocity
WHERE left_wheel > 0.5 AND right_wheel > 0.5 AND left_wheel < 60 AND right_wheel < 60
GROUP BY tumble(ts, '5 second');

上述 SQL 语句的意思是:创建一个名为 calc_avg_speed 的 Flow 任务,它会把结果输出到 avg_speed 表上。其上运行的查询由 AS 之后的 SELECT 语句定义。

首先,过滤掉左右轮速度值中过小或过大的值(小于等于 0.5 或大于等于 60);然后,基于输入表 velocity,以 ts 列的每五秒的间隔作为窗口,计算窗口内的平均速度。Flow 作业当中的查询完全基于 SQL 语法,并根据需求实现了相关扩展。

现在 Flow 任务已经创建,想要观察 avg_speed 中的持续聚合的结果,只需要向源数据表 velocity 中插入数据:

sql
INSERT INTO velocity 
VALUES
    ("2021-07-01 00:00:00.200", 0.0, 0.7),
    ("2021-07-01 00:00:00.200", 0.0, 61.0),
    ("2021-07-01 00:00:02.500", 2.0, 1.0,);

注意其中前两行都因为不符合条件被过滤掉了,只留下第三行被用于计算,查询输出表就可以得到计算结果:

sql
SELECT * FROM avg_speed;
sql
 avg_speed |        start_window        |         end_window         |         update_at          
-----------+----------------------------+----------------------------+----------------------------
       1.5 | 2021-07-01 00:00:00.000000 | 2021-07-01 00:00:05.000000 | 2024-06-04 03:35:20.670000
(1 row)

尝试向 velocity 表中插入更多数据:

sql
INSERT INTO velocity 
VALUES
    ("2021-07-01 00:00:05.100", 5.0, 4.0),
    ("2021-07-01 00:00:09.600", 2.3, 2.1);

结果表 avg_speed 现在包含两行:分别表示两个 5 秒窗口的平均值,1.5 和 3.35(=(4.5+2.2)/2)

sql
SELECT * FROM avg_speed;
sql
 avg_speed |        start_window        |         end_window         |         update_at          
-----------+----------------------------+----------------------------+----------------------------
       1.5 | 2021-07-01 00:00:00.000000 | 2021-07-01 00:00:05.000000 | 2024-06-04 03:35:20.670000
      3.35 | 2021-07-01 00:00:05.000000 | 2021-07-01 00:00:10.000000 | 2024-06-04 03:35:34.693000

avg_speed 表中的列解释如下:

  • avg_speed:窗口中计算得到的平均速度;

  • start_window:窗口的开始时间;

  • end_window:窗口的结束时间;

  • update_at:更新行数据的时间。

其中 start_windowend_window 是 Flow 引擎的时间窗口函数 tumble 自动添加的。update_at 则是 Flow 引擎对 Flow 任务输出表自动添加的一列,用于标记这一行数据的最新更新时间,以便了解 Flow 任务的运行情况。

最后,使用 DROP FLOW 删除这个 Flow 任务:

sql
DROP FLOW calc_avg_speed;

Flow 管理及高级特性

创建或更新 Flow

创建 Flow 的语法是:

sql
CREATE FLOW [ IF NOT EXISTS ] <flow-name>
SINK TO <sink-table-name>
[ EXPIRE AFTER <expr> ]
[ COMMENT = "<string>" ]
AS 
<sql>;

上述创建 Flow 任务的语法的解释如下:

  • flow-name 是全局唯一的标识符。

  • sink-table-name 是存储聚合数据的表名。它可以是一个现有的表或一个新表。如果目标表不存在,Flow 将自动创建目标表。

  • EXPIRE AFTER 是一个可选的时间间隔(使用 SQL的INTERVAL语法表示),用于从 Flow 引擎中清除过期的中间状态。

  • COMMENT 是 Flow 任务的注释性描述。

  • <sql>部分对应具体的持续聚合查询。Flow 计算引擎会从中提取引用到的表名并且作为 Flow 任务的源表。

一个简单的示例:

sql
CREATE FLOW IF NOT EXISTS my_flow
SINK TO my_sink_table
EXPIRE AFTER INTERVAL '1 hour'
COMMENT = "My first flow in GreptimeDB"
AS
SELECT count(item) from my_source_table GROUP BY tumble(time_index, '5 minutes');

其中 EXPIRE AFTER 项可能需要进一步解释。简单来说,像所有现代的流处理系统一样,Flow 计算引擎有两个重要概念:系统时间和事件时间

  • 系统时间:也叫处理时间,就是进行流处理计算的机器的系统时间。

  • 事件时间:某一行数据代表的事件发生的时间,一般也会记录在该行数据的某一列中,Flow 将 TIME INDEX 列视为事件时间。

EXPIRE AFTER 过期机制利用系统时间和事件时间之间的差值,清除掉 Flow 中间状态中过于古老的行。,上面示例 SQL 中,事件时间老于系统时间一小时以上的行就会被清除掉,不再参与运算。

注意,EXPIRE AFTER 只作用于新到达的数据。因此输出表中的结果不会单纯因为时间流逝而产生变化,只是不再会有更老的数据被更新到结果表上了。

另外, Flow 的中间状态目前也没有进行任何持久化,而是纯内存的,之后会添加持久化功能以使其可以在重启后也能保证数据正确。

删除 Flow

使用如下语句即可删除一个 Flow 任务:

sql
DROP FLOW [IF EXISTS] <name>

Flow 目前支持的聚合函数

除了 countsumavgminmax 这几种聚合函数,Flow 目前还支持了加减乘除、比较和逻辑运算几种标量函数,以及固定窗口的 tumble 函数。

未来,我们计划在持续聚合中支持更多的聚合函数、标量函数和窗口函数。

总结

本文介绍了 GreptimeDB 中持续聚合功能的基本用法和特性,并且举例说明了创建、使用和删除 Flow 任务的流程。使用持续聚合可以随时、低延时(秒级/亚秒级)地获取用户关心的信息,同时避免了额外的内存和计算上的开销。

未来,除了支持更多函数之外,我们还会支持流处理中间状态的持久化和诸如 Temporal Filter 等高级功能,更详细的信息可以参考相关的用户文档开发指南

关于 Greptime

Greptime 格睿科技专注于为可观测、物联网及车联网等领域提供实时、高效的数据存储和分析服务,帮助客户挖掘数据的深层价值。目前基于云原生的时序数据库 GreptimeDB 已经衍生出多款适合不同用户的解决方案,更多信息或 demo 展示请联系下方小助手(微信号:greptime)。

欢迎对开源感兴趣的朋友们参与贡献和讨论,从带有 good first issue 标签的 issue 开始你的开源之旅吧~期待在开源社群里遇见你!添加小助手微信即可加入“技术交流群”与志同道合的朋友们面对面交流哦~

Star us on GitHub Now: https://github.com/GreptimeTeam/greptimedb

官网:https://greptime.cn/

文档:https://docs.greptime.cn/

Twitter: https://twitter.com/Greptime

Slack: https://greptime.com/slack

LinkedIn: https://www.linkedin.com/company/greptime/

加入我们的社区

获取 Greptime 最新更新,并与其他用户讨论。