Kafka 是一款具备高吞吐量、高可靠性和高可扩展性的分布式消息队列,而 GreptimeDB 是专门用于存储时间序列数据的开源时序数据库。两者在各自的领域都表现出色,但如何高效地连接它们以实现数据的无缝传输和处理?
Vector 作为一个高速且可扩展的数据管道工具发挥了作用。它能够从多个来源(如应用日志、系统指标)收集、转换并传输数据,并将这些数据发送到不同的目标(如数据库、监控系统)。
而随着 GreptimeDB 现已全面支持日志数据的存储与分析,日志接收功能 greptime log sink 已被集成到 Vector 中,使用户可以通过 greptime_logs
sink 将来自 Vector 的各种数据源轻松写入 GreptimeDB。详情和示例代码可参考此文:《Vector 增加 GreptimeDB 日志写入支持,连接数十种数据源》。
接下来,本文将详细介绍如何使用 Vector 从 Kafka 读取日志数据并将其写入 GreptimeDB,包括具体的实现步骤与示例代码。
准备工作
假设我们已经有一个 Kafka 集群,其中有一个名为 test_topic
的 topic,里面存储了日志数据。Kafka 中的示例数据内容如下:
127.0.0.1 - - [04/Sep/2024:15:46:13 -0700] "GET / HTTP/1.1" 200 615 "-" "Mozilla/5.0 (X11; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0"
接下来,我们需要安装 Vector 和 GreptimeDB。
安装 & 配置 Vector
Vector 是一个开源的数据收集工具,支持从多种数据源读取数据,并将数据写入多种数据目的地。我们可以使用 Vector 从 Kafka 读取数据,并将数据写入 GreptimeDB。
安装 Vector 非常简单,可通过二进制容器等进行安装,具体安装步骤请参考 Vector 官方文档。
安装完成后,我们需要配置 Vector,使其能够从 Kafka 读取数据并写入 GreptimeDB。下面是一个简单的 Vector 配置文件:
[sources.mq]
type = "kafka"
group_id = "vector0"
topics = ["test_topic"]
bootstrap_servers = "kafka:9092"
[sinks.console]
type = "console"
inputs = [ "mq" ]
encoding.codec = "text"
[sinks.sink_greptime_logs]
type = "greptimedb_logs"
table = "demo_logs"
pipeline_name = "demo_pipeline"
compression = "gzip"
inputs = [ "mq" ]
endpoint = "http://greptimedb:4000"
上面的配置文件中,我们定义了一个名为 mq
的 source,用于从 Kafka 读取数据。我们还定义了一个名为 sink_greptime_logs
的 sink,用于将数据写入 GreptimeDB。
安装 & 配置 GreptimeDB
GreptimeDB 是一个开源的时序数据库,专门用于存储时间序列数据。我们可以使用 GreptimeDB 存储从 Kafka 读取的日志数据。
安装 GreptimeDB 同样非常简单,可通过二进制、容器等进行安装。具体安装步骤请参考 GreptimeDB 官方文档。
安装完成后,我们使用默认配置即可。因为日志数据多种多样,我们提供了 Pipeline 功能来处理和过滤日志数据,只保留日志中我们关心的数据,我们将在后续技术博客中分享 Pipeline 引擎的实现原理和方案步骤,敬请期待。
如下例子对我们提供的 nginx 日志格式进行了解析,我们使用如下所示的 Pipeline 配置文件。
processors:
- dissect:
fields:
- message
patterns:
- '%{ip} - - [%{datetime}] "%{method} %{path} %{protocol}" %{status} %{size} "-" "%{user_agent}"'
- date:
fields:
- datetime
formats:
- "%d/%b/%Y:%H:%M:%S %z"
- date:
fields:
- timestamp
formats:
- "%Y-%m-%dT%H:%M:%S%.3fZ"
transform:
- fields:
- ip
- path
type: string
- fields:
- method
- protocol
type: string
index: tag
- fields:
- user_agent
type: string
index: fulltext
- fields:
- status
type: uint32
index: tag
- fields:
- size
type: uint32
- fields:
- datetime
type: timestamp
index: timestamp
- fields:
- timestamp
type: timestamp
在上面的 Pipeline 配置文件中,我们使用 dissect
processor 对日志数据进行解析。本来非结构化的日志数据,被拆分并进行格式转化后,获得了一个结构化的数据,包含 ip
、datetime
、 method
、path
、 protocol
、 status
、size
和 user_agent
。然后使用 date
processor 对时间两个不同格式的时间字段进行解析。最后使用 transform
对字段进行转换,并设置 index
。
关于 index
,我们指定了 method
、protocol
、status
为 tag 字段,主要用于高效的查询,一些不确定值的数量的字段,或者值的数量特别多的,不建议设置为 tag,这会导致高基问题。所以 ip
和 size
均没有被设置为 tag 字段。
在 path
和 user_agent
字段,我们增加了全文索引。以便可以使用模糊搜索来快速找到的关心的内容。详细的查询语法可参考此处。
上述配置文件可通过 HTTP 接口上传到 GreptimeDB 中,以创建一个名为 demo_pipeline
的 Pipeline 用于日志的解析与修剪,然后存入 GreptimeDB 中。
curl -X 'POST' 'http://greptimedb:4000/v1/events/pipelines/demo_pipeline' -F 'file=@/config_data/pipeline.yaml' -v
运行 Vector & GreptimeDB
现在,我们已经准备好了 Vector 和 GreptimeDB,现在就可以运行它们了。成功后,Vector 将从 Kafka 读取数据,并将数据写入 GreptimeDB。
我们可以通过 MySQL 协议连接 GreptimeDB,查看数据。
mysql> show tables;
+-------------+
| Tables |
+-------------+
| demo_logs |
| numbers |
+-------------+
3 rows in set (0.00 sec)
mysql> select * from demo_logs order by timestamp desc limit 10;
+-----------------+--------+----------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+
| ip | method | protocol | path | user_agent | status | size | datetime | timestamp |
+-----------------+--------+----------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+
| 37.254.223.207 | DELETE | HTTP/2.0 | /about | Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; The World) | 201 | 495 | 2024-10-28 03:39:29 | 2024-10-28 03:39:29.982000 |
| 113.26.47.170 | PUT | HTTP/2.0 | /contact | Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; LBBROWSER) | 404 | 183 | 2024-10-28 03:39:26 | 2024-10-28 03:39:26.977000 |
| 33.80.49.13 | PUT | HTTP/2.0 | /about | Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_0) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11 | 500 | 150 | 2024-10-28 03:39:23 | 2024-10-28 03:39:23.973000 |
| 240.14.156.37 | DELETE | HTTP/1.1 | /contact | Mozilla/5.0 (Linux; U; Android 3.0; en-us; Xoom Build/HRI39) AppleWebKit/534.13 (KHTML, like Gecko) Version/4.0 Safari/534.13 | 200 | 155 | 2024-10-28 03:39:20 | 2024-10-28 03:39:20.969000 |
| 210.90.39.41 | POST | HTTP/2.0 | /about | Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; LBBROWSER) | 201 | 188 | 2024-10-28 03:39:17 | 2024-10-28 03:39:17.964000 |
| 219.88.194.150 | DELETE | HTTP/1.1 | /contact | Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.0; Trident/4.0) | 404 | 704 | 2024-10-28 03:39:14 | 2024-10-28 03:39:14.963000 |
| 130.255.0.241 | DELETE | HTTP/1.1 | /contact | Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1) | 500 | 816 | 2024-10-28 03:39:11 | 2024-10-28 03:39:11.959000 |
| 168.144.155.215 | POST | HTTP/1.1 | / | Mozilla/5.0 (iPhone; U; CPU iPhone OS 4_3_3 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8J2 Safari/6533.18.5 | 500 | 511 | 2024-10-28 03:39:08 | 2024-10-28 03:39:08.954000 |
| 28.112.30.158 | GET | HTTP/1.1 | /about | Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; en) Opera 9.50 | 200 | 842 | 2024-10-28 03:39:05 | 2024-10-28 03:39:05.950000 |
| 166.9.187.104 | GET | HTTP/2.0 | /blog | Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.71 Safari/537.36 | 201 | 970 | 2024-10-28 03:39:02 | 2024-10-28 03:39:02.946000 |
+-----------------+--------+----------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+
10 rows in set (0.00 sec)
mysql> desc demo_logs;
+------------+---------------------+------+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+------------+---------------------+------+------+---------+---------------+
| ip | String | | YES | | FIELD |
| method | String | PRI | YES | | TAG |
| protocol | String | PRI | YES | | TAG |
| path | String | | YES | | FIELD |
| user_agent | String | | YES | | FIELD |
| status | UInt32 | PRI | YES | | TAG |
| size | UInt32 | | YES | | FIELD |
| datetime | TimestampNanosecond | PRI | NO | | TIMESTAMP |
| timestamp | TimestampNanosecond | | YES | | FIELD |
+------------+---------------------+------+------+---------+---------------+
9 rows in set (0.00 sec)
mysql> show create table demo_logs;
+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table |
+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| demo_logs | CREATE TABLE IF NOT EXISTS `demo_logs` (
`ip` STRING NULL,
`method` STRING NULL,
`protocol` STRING NULL,
`path` STRING NULL FULLTEXT WITH(analyzer = 'English', case_sensitive = 'false'),
`user_agent` STRING NULL FULLTEXT WITH(analyzer = 'English', case_sensitive = 'false'),
`status` INT UNSIGNED NULL,
`size` INT UNSIGNED NULL,
`datetime` TIMESTAMP(9) NOT NULL,
`timestamp` TIMESTAMP(9) NULL,
TIME INDEX (`datetime`),
PRIMARY KEY (`method`, `protocol`, `status`)
)
ENGINE=mito
WITH(
append_mode = 'true'
) |
+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)
现在我们的数据已经入库,可以利用 GreptimeDB 提供的一些功能来快速过滤我们关心的数据,比如通过全文搜索我们可以对 UA 进行模糊匹配,快速找到 UA
包含 Android
的数据。
mysql> SELECT * FROM demo_logs WHERE MATCHES(user_agent, 'Android') limit 10;
+-----------------+--------+----------+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+
| ip | method | protocol | path | user_agent | status | size | datetime | timestamp |
+-----------------+--------+----------+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+
| 240.14.156.37 | DELETE | HTTP/1.1 | /contact | Mozilla/5.0 (Linux; U; Android 3.0; en-us; Xoom Build/HRI39) AppleWebKit/534.13 (KHTML, like Gecko) Version/4.0 Safari/534.13 | 200 | 155 | 2024-10-28 03:39:20 | 2024-10-28 03:39:20.969000 |
| 186.44.204.29 | DELETE | HTTP/1.1 | / | Opera/9.80 (Android 2.3.4; Linux; Opera Mobi/build-1107180945; U; en-GB) Presto/2.8.149 Version/11.10 | 201 | 343 | 2024-10-28 03:45:33 | 2024-10-28 03:45:33.459000 |
| 75.246.111.167 | DELETE | HTTP/1.1 | /contact | Mozilla/5.0 (Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1 | 404 | 869 | 2024-10-28 03:38:59 | 2024-10-28 03:38:59.942000 |
| 236.239.192.109 | DELETE | HTTP/1.1 | /contact | Mozilla/5.0 (Linux; U; Android 2.2.1; zh-cn; HTC_Wildfire_A3333 Build/FRG83D) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1 | 500 | 892 | 2024-10-28 03:38:53 | 2024-10-28 03:38:53.934000 |
| 232.232.14.176 | DELETE | HTTP/1.1 | /contact | Mozilla/5.0 (Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1 | 500 | 644 | 2024-10-28 03:46:42 | 2024-10-28 03:46:42.550000 |
| 135.16.130.172 | DELETE | HTTP/2.0 | / | MQQBrowser/26 Mozilla/5.0 (Linux; U; Android 2.3.7; zh-cn; MB200 Build/GRJ22; CyanogenMod-7) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1 | 404 | 177 | 2024-10-28 03:47:27 | 2024-10-28 03:47:27.613000 |
| 69.23.7.123 | GET | HTTP/1.1 | /blog | Mozilla/5.0 (Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1 | 201 | 770 | 2024-10-28 03:45:09 | 2024-10-28 03:45:09.425000 |
| 37.61.6.211 | GET | HTTP/1.1 | /blog | MQQBrowser/26 Mozilla/5.0 (Linux; U; Android 2.3.7; zh-cn; MB200 Build/GRJ22; CyanogenMod-7) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1 | 404 | 298 | 2024-10-28 03:45:21 | 2024-10-28 03:45:21.442000 |
| 244.166.255.46 | GET | HTTP/2.0 | /blog | Mozilla/5.0 (Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1 | 201 | 963 | 2024-10-28 03:45:48 | 2024-10-28 03:45:48.478000 |
| 35.169.107.238 | GET | HTTP/2.0 | /blog | Mozilla/5.0 (Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1 | 404 | 249 | 2024-10-28 03:46:48 | 2024-10-28 03:46:48.558000 |
+-----------------+--------+----------+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+
10 rows in set (0.01 sec)
在进行一些统计工作或者问题排查的时候,经常性的需要区分用户的渠道和 url。例如,我们可能需要筛选出 Android 渠道的用户,并查看访问 blog 页面时的 HTTP 状态码分布。通过以下 SQL 查询,可以快速获取所需结果,显著减少数据处理时间。
mysql> SELECT method,status,count(*) FROM demo_logs WHERE MATCHES(user_agent, 'Android') and MATCHES(path, 'blog') group by method,status;
+--------+--------+----------+
| method | status | COUNT(*) |
+--------+--------+----------+
| GET | 404 | 2 |
| GET | 201 | 3 |
| PUT | 500 | 2 |
| POST | 404 | 1 |
+--------+--------+----------+
4 rows in set (0.01 sec)
我们已经将此过程打包成一个 docker compose 文件,欢迎前往 GitHub demo-scene repo 获取相关源码和指南:
https://github.com/GreptimeTeam/demo-scene/tree/main/kafka-ingestion
总结
本文介绍了如何利用 Vector 从 Kafka 读取日志数据并写入 GreptimeDB。Vector 是一个开源的数据收集工具,支持从多种数据源读取数据,并将数据写入多种数据目的地。目前已支持 GreptimeDB 的 sink,可以很方便的将原先系统中的监控数据导入 GreptimeDB 中。
本文介绍了如何利用 Vector 工具将 Kafka 中的日志数据无缝传输至 GreptimeDB 中,充分利用 GreptimeDB 在存储和分析时序数据上的优势,以及 Vector 的灵活性让数据处理更加高效。
GreptimeDB 强大的日志存储和查询功能为日志分析提供了可靠保障,无论是构建日志管理系统,还是进行实时监控与分析,Kafka + Vector + GreptimeDB 的组合能够帮助用户实现高效的数据流转与处理。
未来我们将进一步介绍如何通过 GreptimeDB 的 Pipeline 引擎实现更加复杂的日志处理和数据过滤,敬请期待!
11 月 9 日我们将在深圳举办「云平台及 AI 时代下的可观测性技术演进」线下沙龙,欢迎报名!
无法线下参加的朋友欢迎预约线上直播:
关于 Greptime
Greptime 格睿科技专注于为可观测、物联网及车联网等领域提供实时、高效的数据存储和分析服务,帮助客户挖掘数据的深层价值。目前基于云原生的时序数据库 GreptimeDB 已经衍生出多款适合不同用户的解决方案,更多信息或 demo 展示请联系下方小助手(微信号:greptime)。
欢迎对开源感兴趣的朋友们参与贡献和讨论,从带有 good first issue 标签的 issue 开始你的开源之旅吧~期待在开源社群里遇见你!添加小助手微信即可加入“技术交流群”与志同道合的朋友们面对面交流哦~
Star us on GitHub Now: https://github.com/GreptimeTeam/greptimedb
Twitter: https://twitter.com/Greptime
Slack: https://greptime.com/slack