DeepFlow 扩展协议解析实践
2023-12-13本文整理自 DeepFlow 社区 Commiter 郑志聪在「云原生 + 可观测性 Meetup 广州站 - DeepFlow 扩展协议解析实践」中的分享实录。回看链接,PPT 下载。
DeepFlow 对于云原生应用来说,完全具备零侵扰的形式提供应用可观测能力,这个能力得益于其基于 eBPF 以零侵扰的方式采集应用数据,并通过解析多种应用及业务协议来实现应用可观测能力,本文通过深入剖析代码流程,用实战的方法介绍 DeepFlow 中两种常见的协议扩展方法:如何从零开始增加支持一种全新的协议(MongoDB),以及如何利用 DeepFlow 的 Wasm Plugin 机制对已有协议(Kafka)的解析能力进行增强,此次分享的 MongoDB 协议及 Kafka 增加的字段都已经合 DeepFlow 6.4 版本,欢迎大家使用。
0x0: 从分析协议规范开始(MongoDB)
协议文档的分析思路
首先要从官方网站找到协议解析的文档,在协议文档《mongodb-wire-protocol#standard-message-header》中,可以看到 MongoDB 的协议头结构体描述如下:
1 | struct MsgHeader { |
上述结构代码理解为下图所示:
image
⚠️注意,在协议文档《mongodb-wire-protocol》有一段说明,MongoDB 协议是用了字节小端顺序:
Byte Ordering
All integers in the MongoDB wire protocol use little-endian byte order: that is, least-significant
接下来从实际的抓包看一下实际的数据是长什么样子的:
image
1 | 0000 a3 00 00 00 0a 50 88 48 23 00 00 00 dd 07 00 00 |
上述的抓包数据简单拆解到如下信息:
- 字段
messageLength
为a3 00 00 00
:即 消息长度为a3
- 字段
requestID
为0a 50 88 48
:即 请求ID为4888500a
- 字段
responseTo
为23 00 00 00
:即 对ID为23
的响应 - 字段
opCode
为dd 07 00 00
:即 命令号为7dd
,十进制是2013
,对应协议文档中的OP_MSG
指令
MongoDB 协议操作码说明表
操作码名称 | 操作码 | 操作码说明 | 额外说明 |
---|---|---|---|
OP_COMPRESSED | 2012 | 使用压缩 | |
OP_MSG | 2013 | Send a message using the standard format. Used for both client requests and database replies. | |
OP_REPLY | 1 | 通过 responseTo 指定响应客户端请求。 | Deprecated in MongoDB 5.0. Removed in MongoDB 5.1.| |
OP_UPDATE | 2001 | 更新文档 | Deprecated in MongoDB 5.0. Removed in MongoDB 5.1. |
OP_INSERT | 2002 | 插入文档 | Deprecated in MongoDB 5.0. Removed in MongoDB 5.1. |
RESERVED | 2003 | 略 | |
OP_QUERY | 2004 | 查询文档 | Deprecated in MongoDB 5.0. Removed in MongoDB 5.1. |
OP_GET_MORE | 2005 | 略 | Deprecated in MongoDB 5.0. Removed in MongoDB 5.1. |
OP_DELETE | 2006 | 删除文档 | Deprecated in MongoDB 5.0. Removed in MongoDB 5.1. |
OP_KILL_CURSORS | 2007 | 略 | Deprecated in MongoDB 5.0. Removed in MongoDB 5.1. |
对最常见的操作码 OP_MSG
分析
从协议文档 《mongodb-wire-protocol#op_msg》 查看 OP_MSG
的结构体:
1 | OP_MSG { |
OP_MSG
需要关注的解码内容在 Sections
,只需要判断 kind
为 0
和 1
的情况,其中:
- 0:后面直接用
BSON
解码 - 1:先偏移
int32
和c_string
占用的byte
后,用BSON
解码后面的内容
image
从实际抓包看一下原始数据。如下所示,MongoDB 协议的操作码 OP_MSG
内容从第十六(从 0 开始数,后续文档统一按此规律)字节开始:
image
1 | 0000 a3 00 00 00 0a 50 88 48 23 00 00 00 dd 07 00 00 |
不需要关心字段 flagBits
,偏移4个字节后从第四个字节判断字段 kind
类型。由此判断后面为 BSON
结构数据。
到这里我们已经基本了解到 MongoDB 协议的数据结构和解码思路了,接下来我们开始在 DeepFlow Agent 中尝试实现解码观察。
0x1: 在 DeepFlow 中扩展一种新协议(MongoDB)
DeepFlow Agent 的开发文档概要
前提, DeepFlow Agent 的原生开发需要掌握 Rust 语言的基础开发能力。
接下来先参考官方文档《HOW_TO_SUPPORT_YOUR_PROTOCOL_CN》了解几个关键信息:
L7Protocol
用于标识协议常量- 源码位置:
deepflow/agent/crates/public/src/l7_protocol.rs
- 源码位置:
L7ProtocolParser
主要用于协议判断和解析出L7ProtocolInfo
(七层协议的基础结构信息)- 源码位置:
deepflow/agent/src/common/l7_protocol_log.rs
- 源码位置:
L7ProtocolInfo
由L7ProtocolParser
解析出来,并且用于后续会话聚合- 源码位置:
deepflow/agent/src/common/l7_protocol_info.rs
- 源码位置:
L7ProtocolInfoInterface
七层协议结构L7ProtocolInfo
都需要实现这个接口来处理特征逻辑- 源码位置:
deepflow/agent/src/common/l7_protocol_info.rs
- 源码位置:
L7ProtocolSendLog
统一发送到deepflow-server
的结构- 源码位置:
deepflow/agent/src/flow_generator/protocol_logs/pb_adapter.rs
- 源码位置:
在 DeepFlow Agent 中开发的大致步骤:
- 在
deepflow/agent/crates/public/src/l7_protocol.rs
添加对应协议名称和协议号。 L7ProtocolParser::parse_payload()
需要返回L7ProtocolInfo
,所以需要先定义一个结构,实现L7ProtocolInfoInterface
接口并且添加到L7ProtocolInfo
这个枚举。- 实现
L7ProtocolParserInterface
接口,并添加到deepflow/agent/src/common/l7_protocol_log.rs
中的impl_protocol_parser!
宏。 - 在
deepflow-server
中只需增加一个常量用于搜索提示即可。
代码指引
定义一个协议,并用一个常量标识
源码位置:
deepflow/agent/crates/public/src/l7_protocol.rs
,DeepFlow Agent 通过遍历所有支持协议判断一个流的应用层协议。
这里说明一下,由于业界的通用应用协议没有一个约束字段来定义应用协议类型,所以在大量网络包是通过遍历已知协议解码逻辑来判断应用层协议的。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28pub enum L7Protocol {
Unknown = 0,
Other = 1,
// HTTP
Http1 = 20,
Http2 = 21,
Http1TLS = 22,
Http2TLS = 23,
// RPC
Dubbo = 40,
Grpc = 41,
SofaRPC = 43,
FastCGI = 44,
// SQL
MySQL = 60,
PostgreSQL = 61,
// NoSQL
Redis = 80,
+ MongoDB = 81,
// MQ
Kafka = 100,
MQTT = 101,
// INFRA
DNS = 120,
Custom = 127,
Max = 255,
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21impl From<String> for L7Protocol {
fn from(l7_protocol_str: String) -> Self {
let l7_protocol_str = l7_protocol_str.to_lowercase();
match l7_protocol_str.as_str() {
"http" | "https" => Self::Http1,
"dubbo" => Self::Dubbo,
"grpc" => Self::Grpc,
"fastcgi" => Self::FastCGI,
"custom" => Self::Custom,
"sofarpc" => Self::SofaRPC,
"mysql" => Self::MySQL,
+ "mongodb" => Self::MongoDB,
"postgresql" => Self::PostgreSQL,
"redis" => Self::Redis,
"kafka" => Self::Kafka,
"mqtt" => Self::MQTT,
"dns" => Self::DNS,
_ => Self::Unknown,
}
}
}为新协议准备解析逻辑
定义结构体:在
deepflow/agent/src/flow_generator/protocol_logs/
该路径下找一个目录建立相关的协议解析逻辑代码文件,该案例的代码文件放在上述目录下的sql/mongo.rs
。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30pub struct MongoDBInfo {
msg_type: LogMessageType,
pub req_len: u32,
pub resp_len: u32,
//// 参考“deepflow/agent/src/flow_generator/protocol_logs/pb_adapter.rs”
// 准备要处理的结构体。
// 其中“request_id”、“response_id”、“op_code”和“op_code_name”是
// 从mongodb header解析出来的关键信息。
pub request_id: u32,
pub response_id: u32,
pub op_code: u32,
pub op_code_name: String,
//// “request”、“response”和“response_code”是
// 从mongodb协议主体内容解析出来的所需信息。
pub request: String,
pub response: String,
pub response_code: i32,
////
pub status: L7ResponseStatus,
}实现
L7ProtocolParserInterface
先看源码结构逻辑(以下只显示需处理函数,不需处理的保留默认逻辑即可)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
pub trait L7ProtocolParserInterface {
fn check_payload(&mut self, payload: &[u8], param: &ParseParam) -> bool;
// 协议解析
fn parse_payload(&mut self, payload: &[u8], param: &ParseParam) -> Result<L7ParseResult>;
// 返回协议号和协议名称,由于的bitmap使用u128,所以协议号不能超过128.
// 其中 crates/public/src/l7_protocol.rs 里面的 pub const L7_PROTOCOL_xxx 是已实现的协议号.
// ===========================================================================================
// return protocol number and protocol string. because of bitmap use u128, so the max protocol number can not exceed 128
// crates/public/src/l7_protocol.rs, pub const L7_PROTOCOL_xxx is the implemented protocol.
fn protocol(&self) -> L7Protocol;
// l4是tcp时是否解析,用于快速过滤协议
// ==============================
// whether l4 is parsed when tcp, use for quickly protocol filter
fn parsable_on_tcp(&self) -> bool {
true
}
// l4是udp是是否解析,用于快速过滤协议
// ==============================
// whether l4 is parsed when udp, use for quickly protocol filter
fn parsable_on_udp(&self) -> bool {
true
}
// return perf data
fn perf_stats(&mut self) -> Option<L7PerfStats>;
}解码协议的第一步是如何识别协议,代码中需处理
L7ProtocolParserInterface::check_payload()
逻辑定义
MongoDB
协议头并解码1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21// 定义MongoDB协议头结构体,并对必要信息字段一一解码
pub struct MongoDBHeader {
length: u32,
request_id: u32,
response_to: u32,
op_code: u32,
op_code_name: String,
}
impl MongoDBHeader {
fn decode(&mut self, payload: &[u8]) -> isize {
// 对payload前16位以MongoDBHeader结构解码,判断是否符合MongoDB的协议
}
fn is_request(&self) -> bool {
// 解码op_code判断是否request
}
pub fn get_op_str(&self) -> &'static str {
// 解码op_code出对应文本描述
}
}在
L7ProtocolParserInterface::check_payload()
调用MongoDB
协议头解码逻辑
在此过程,把
protocol(&self)
和parsable_on_udp(&self)
也一并处理。
1
2
3
4
5
6
7
8
9
10
11
12impl L7ProtocolParserInterface for MongoDBLog {
fn check_payload(&mut self, payload: &[u8], param: &ParseParam) -> bool {
let mut header = MongoDBHeader::default();
header.decode(payload);
return header.is_request();
}
fn protocol(&self) -> L7Protocol {
L7Protocol::MongoDB
}
// udp协议的跳过解码
fn parsable_on_udp(&self) -> bool {false}
}第一步的效果展示
到这一步的解码将会得到如下展示效果,接下来还需要对具体的协议操作码做进一步解码。
image
解码协议的第二步是对关键指令定义结构体和解码接口逻辑实现,对应处理是
L7ProtocolParserInterface::parse_payload()
代码实现,这里以OP_MSG
为例定义
OP_MSG
操作码的结构体并解码1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
pub struct MongoOpMsg {
flag: u32,
sections: Sections,
checksum: Option<u32>,
}
impl MongoOpMsg {
fn decode(&mut self, payload: &[u8]) -> Result<bool> {
// 略过偏移逻辑
let _ = sections.decode(&payload);
self.sections = sections;
Ok(true)
}
}对
OP_MSG
操作码中业务需要关注的字段Sections
做进一步解码1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
struct Sections {
kind: u8,
kind_name: String,
// kind: 0 mean doc
doc: Document,
// kind: 1 mean body
size: Option<i32>,
c_string: Option<String>,
}
impl Sections {
pub fn decode(&mut self, payload: &[u8]) -> Result<bool> {
match self.kind {
0 => {// Body}
1 => {// Doc}
2 => {// Internal}
_ => {// Unknown}
}
Ok(true)
}
}处理
L7ProtocolParserInterface::parse_payload
,返回L7ProtocolInfo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
pub struct MongoDBLog {
info: MongoDBInfo,
perf_stats: Option<L7PerfStats>,
}
impl L7ProtocolParserInterface for MongoDBLog {
fn parse_payload(&mut self, payload: &[u8], param: &ParseParam) -> Result<L7ParseResult> {
let mut info = MongoDBInfo::default();
self.parse(payload, param.l4_protocol, param.direction, &mut info)?; // 解码得到L7ProtocolInfo
}
}
impl MongoDBLog {
fn parse(&mut self,payload:&[u8],proto:IpProtocol,dir:PacketDirection,info:&mut MongoDBInfo,)-> Result<bool> { // 解码指令获取请求和响应等信息}
// command decode
match info.op_code {
_OP_MSG if payload.len() > _MSG_DOC_SECTION_OFFSET => {
// OP_MSG
let mut msg_body = MongoOpMsg::default();
// TODO: Message Flags
msg_body.decode(&payload[_MSG_DOC_SECTION_OFFSET..])?;
}
}
}
}为
MongoDBInfo
实现L7ProtocolInfoInterface
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21impl L7ProtocolInfoInterface for MongoDBInfo {
fn session_id(&self) -> Option<u32> {
// 这里返回流标识id,例如 http2 返回 streamid,dns 返回 transaction id,如果没有就返回 None
}
fn merge_log(&mut self, other: L7ProtocolInfo) -> Result<()> {
// 这里的self必定是请求,other必定是响应
if let L7ProtocolInfo::MongoDBInfo(other) = other {
self.merge(other);
}
Ok(())
}
fn app_proto_head(&self) -> Option<AppProtoHead> {
// 这里返回一个 AppProtoHead 结构,返回 None 直接丢弃这段数据
Some(AppProtoHead {
proto: L7Protocol::MongoDB,
})
}
fn is_tls(&self) -> bool {
self.is_tls
}
}为
MongoDBInfo
实现L7ProtocolSendLog
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20impl From<MongoDBInfo> for L7ProtocolSendLog {
fn from(f: MongoDBInfo) -> Self {
let log = L7ProtocolSendLog {
// 这里需要把 info 转换成统一的发送结构 L7ProtocolSendLog
};
return log;
}
}
// 参考源码来自:deepflow/agent/src/flow_generator/protocol_logs/pb_adapter.rs
pub struct L7ProtocolSendLog {
pub req_len: Option<u32>,
pub resp_len: Option<u32>,
pub row_effect: u32,
pub req: L7Request,
pub resp: L7Response,
pub version: Option<String>,
pub trace_info: Option<TraceInfo>,
pub ext_info: Option<ExtendedInfo>,
}把实现
L7ProtocolParserInterface
的接口,添加到deepflow/agent/src/common/l7_protocol_log.rs
中的impl_protocol_parser!
宏。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17impl_protocol_parser! {
pub enum L7ProtocolParser {
// http have two version but one parser, can not place in macro param.
// custom must in frist so can not place in macro
DNS(DnsLog),
SofaRPC(SofaRpcLog),
MySQL(MysqlLog),
Kafka(KafkaLog),
Redis(RedisLog),
+ MongoDB(MongoDBLog),
PostgreSQL(PostgresqlLog),
Dubbo(DubboLog),
FastCGI(FastCGILog),
MQTT(MqttLog),
// add protocol below
}
}第二步的效果
image
通过
perf_states
统计记录QPS
、耗时
和异常
情况1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27impl L7ProtocolParserInterface for MongoDBLog {
fn parse_payload(&mut self, payload: &[u8], param: &ParseParam) -> Result<L7ParseResult> {
let mut info = MongoDBInfo::default();
self.parse(payload, param.l4_protocol, param.direction, &mut info)?; // 解码得到L7ProtocolInfo
info.cal_rrt(param, None).map(|rrt| {
info.rrt = rrt;
+ self.perf_stats.as_mut().map(|p| p.update_rrt(rrt)); // 耗时
});
}
impl MongoDBLog {
fn parse(&mut self,payload:&[u8],proto:IpProtocol,dir:PacketDirection,info:&mut MongoDBInfo,) -> Result<bool> { // 解码指令获取请求和响应等信息
if header.is_request() {
+ self.perf_stats.as_mut().map(|p: &mut L7PerfStats| p.inc_req()); // 请求记录
} else {
+ self.perf_stats.as_mut().map(|p| p.inc_resp()); // 响应记录
}
match info.op_code {
_OP_REPLY if payload.len() > _HEADER_SIZE => {
let mut msg_body = MongoOpReply::default();
msg_body.decode(&payload[_HEADER_SIZE..])?;
if !msg_body.reply_ok {
+ self.perf_stats.as_mut().map(|p| p.inc_resp_err());// 异常记录
}
}
}
}
}效果如图: ![image](http://yunshan-guangzhou.oss-cn-beijing.aliyuncs.com/yunshan-ticket/png/d2b5ca33bd970f64a6301fa75ae2eb22_20231205160823.png)
最后在 deepflow-server 补充服务端的协议识别
以下两部分内容在代码文件server/libs/datatype/flow.go
中1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21type L7Protocol uint8
const (
L7_PROTOCOL_UNKNOWN L7Protocol = 0
L7_PROTOCOL_OTHER L7Protocol = 1
L7_PROTOCOL_HTTP_1 L7Protocol = 20
L7_PROTOCOL_HTTP_2 L7Protocol = 21
L7_PROTOCOL_HTTP_1_TLS L7Protocol = 22
L7_PROTOCOL_HTTP_2_TLS L7Protocol = 23
L7_PROTOCOL_DUBBO L7Protocol = 40
L7_PROTOCOL_GRPC L7Protocol = 41
L7_PROTOCOL_SOFARPC L7Protocol = 43
L7_PROTOCOL_FASTCGI L7Protocol = 44
L7_PROTOCOL_MYSQL L7Protocol = 60
L7_PROTOCOL_POSTGRE L7Protocol = 61
L7_PROTOCOL_REDIS L7Protocol = 80
+ L7_PROTOCOL_MONGODB L7Protocol = 81
L7_PROTOCOL_KAFKA L7Protocol = 100
L7_PROTOCOL_MQTT L7Protocol = 101
L7_PROTOCOL_DNS L7Protocol = 120
L7_PROTOCOL_CUSTOM L7Protocol = 127
)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28func (p L7Protocol) String() string {
formatted := ""
switch p {
case L7_PROTOCOL_HTTP_1:
formatted = "HTTP"
case L7_PROTOCOL_DNS:
formatted = "DNS"
case L7_PROTOCOL_MYSQL:
formatted = "MySQL"
case L7_PROTOCOL_POSTGRE:
formatted = "PostgreSQL"
case L7_PROTOCOL_REDIS:
formatted = "Redis"
+ case L7_PROTOCOL_MONGODB:
+ formatted = "MongoDB"
case L7_PROTOCOL_DUBBO:
formatted = "Dubbo"
case L7_PROTOCOL_GRPC:
formatted = "gRPC"
case L7_PROTOCOL_CUSTOM:
formatted = "Custom"
case L7_PROTOCOL_OTHER:
formatted = "Others"
default:
formatted = "N/A"
}
return formatted
}server/querier/db_descriptions/clickhouse/tag/enum/l7_protocol
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19# Value , DisplayName , Description
0 , N/A ,
1 , Others ,
20 , HTTP ,
21 , HTTP2 ,
22 , HTTP1_TLS ,
23 , HTTP2_TLS ,
40 , Dubbo ,
41 , gRPC ,
43 , SOFARPC ,
44 , FastCGI ,
60 , MySQL ,
61 , PostgreSQL ,
80 , Redis ,
+ 81 , MongoDB ,
100 , Kafka ,
101 , MQTT ,
120 , DNS ,
127 , Custom ,
到这里已经完成 DeepFlow Agent 的原生协议扩展了,参考《# 完整指南:如何编译、打包和部署二次开发的 DeepFlow 》编译程序发布即可。
如果想快速实现一个协议采集解析,或者不熟悉Rust语言呢?我们还有一个选择,就是利用Wasm插件快速扩展协议解码。
0x2: 用 Wasm 插件增强 DeepFlow 协议采集能力(Kafka)
该案例是用 Wasm 扩展 Kafka 协议支持 Topic
的实践。
首先还是参考Kafka的官方文档对 Kafka协议 做一个简单的分析
Kafka协议分析
Kafka
的 Header
和 Data
概览
image
Kafka 的 Fetch API
image
Kafka 的 Produce API
image
Kafka 协议 DeepFlow Agent 原生解码:
截止到 v6.3.x 版本,DeepFlow Agent 对 Kafka的原生解码如下图所示,还不支持 Topic 字段的解码,
且 API 的解码还没有版本号。
接下来的插件开发主要解决 Topic 字段的解码放在 resource 展示,同时把 API 的版本号也解析出来。
image
DeepFlow Agent 的 Wasm 插件
参考官方插件文档《 wasm-plugin》,需要注意两点:
DeepFlow Agent 通过遍历所有支持协议判断一个流的应用层协议,顺序是:
HTTP
->Wasm Hook
->DNS
-> …需要使用
Go
版本不低于1.21
并且tinygo
版本需要不低于0.29
Wasm Go SDK 的框架
先对框架有一个大概的认识,如下代码所示,整个框架逻辑都在以下五个接口函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20package main
import "github.com/deepflowio/deepflow-wasm-go-sdk/sdk"
// 定义结构,需要实现 sdk.Parser 接口
type plugin struct {}
func (p plugin) HookIn() []sdk.HookBitmap {return []sdk.HookBitmap{}}
// HookIn() 包含 HOOK_POINT_HTTP_REQ 时,http 请求解析完成返回之前会调用。
// HttpReqCtx 包含了 BaseCtx 和已经解析出来的一些 http 头部
func (p plugin) OnHttpReq(ctx *sdk.HttpReqCtx) sdk.Action {
return sdk.HttpReqActionAbortWithResult(nil, trace, attr)
}
func (p plugin) OnHttpResp(ctx *sdk.HttpRespCtx) sdk.Action {return sdk.ActionNext()}
func (p plugin) OnCheckPayload(baseCtx *sdk.ParseCtx) (uint8, string) {return 0, "ownwasm"}
func (p plugin) OnParsePayload(baseCtx *sdk.ParseCtx) sdk.ParseAction {
return sdk.ParseActionAbortWithL7Info([]*sdk.L7ProtocolInfo{})
}
// main 需要注册解析器
func main() {
sdk.SetParser(plugin{})
}DeepFlow Agent 会遍历所有插件调用对应的 Export 函数,但是遍历的行为可以通过返回值控制
返回值 说明 sdk.ActionNext() 停止当前插件,直接执行下一个插件 sdk.ActionAbort() 停止当前插件并且停止遍历 sdk.ActionAbortWithErr(err) 停止当前插件,打印错误日志并且停止遍历 sdk.HttpActionAbortWithResult() Agent 停止遍历并且提取相应返回结果 sdk.ParseActionAbortWithL7Info() Agent 停止遍历并且提取相应返回结果 ⚠️注意:
因为该案例不涉及
HTTP
协议的处理,所以OnHttpReq()
和OnHttpResp()
直接使用sdk.ActionNext()
跳过即可。该案例也不会用到
sdk.HttpActionAbortWithResult()
。HookBitmap
的三个hook
点hook点 说明 HOOK_POINT_HTTP_REQ 表示 http 请求解析完成返回之前 HOOK_POINT_HTTP_RESP 表示 http 响应解析完成返回之前 HOOK_POINT_PAYLOAD_PARSE 表示协议的判断和解析 ⚠️注意:因为该案例不涉及
HTTP
协议的处理,所以HOOK_POINT_HTTP_REQ
和HOOK_POINT_HTTP_RESP
在该案例也不会用到。
插件代码指引
梳理后的 Kafka 协议的 Wasm 插件代码框架
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19package main
import "github.com/deepflowio/deepflow-wasm-go-sdk/sdk"
// 定义结构,需要实现 sdk.Parser 接口
type kafkaParser struct {}
func (p kafkaParser) HookIn() []sdk.HookBitmap {
return []sdk.HookBitmap{sdk.HOOK_POINT_PAYLOAD_PARSE}
}
// 跳过HTTP协议处理
func (p kafkaParser) OnHttpReq(ctx *sdk.HttpReqCtx) sdk.Action {return sdk.ActionNext()}
func (p kafkaParser) OnHttpResp(ctx *sdk.HttpRespCtx) sdk.Action {return sdk.ActionNext()}
// 协议判断检查
func (p kafkaParser) OnCheckPayload(baseCtx *sdk.ParseCtx) (uint8, string) {return 100, "kafka"}
// 协议解码
func (p kafkaParser) OnParsePayload(baseCtx *sdk.ParseCtx) sdk.ParseAction {
return sdk.ParseActionAbortWithL7Info([]*sdk.L7ProtocolInfo{})
}
// main 需要注册解析器
func main() {sdk.SetParser(plugin{})}协议识别
⚠️注意:以下代码注释
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27func (p kafkaParser) OnCheckPayload(ctx *sdk.ParseCtx) (uint8, string) {
// 跳过UDP协议数据
if ctx.L4 != sdk.TCP {
return 0, ""
}
// 如果环境有标准规范的端口约定,插件中指定端口会减少协议数据的遍历,优化解码时cpu等资源消耗
if ctx.DstPort < 9092 || ctx.DstPort > 9093 {
return 0, ""
}
// 读取抓包数据
payload, err := ctx.GetPayload()
if err != nil {
sdk.Error("get payload fail: %v", err)
return 0, ""
}
// 引用"github.com/segmentio/kafka-go/protocol"来解码
bl, err := protocol.ReadAll(protocol.NewBytes(payload))
if err != nil {
sdk.Error("read payload fail: %v", err)
return 0, ""
}
b, _ := decodeHeader(bl)
if !b {
return 0, ""
}
return WASM_KAFKA_PROTOCOL, "kafka"
}协议 API 解码
官方代码框架
OnParsePayload()
的逻辑如下1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31func (p plugin) OnParsePayload(baseCtx *sdk.ParseCtx) sdk.ParseAction {
// ctx.L7 就是 OnCheckPayload 返回的协议号,可以先根据4层协议或协议号过滤。
if ctx.L4 != sdk.TCP {return sdk.ActionNext()}
payload, err := ctx.GetPayload()
if err != nil {return sdk.ActionAbortWithErr(err)}
// the parse logic here
// ...
/* 关于 L7ProtocolInfo 结构:
type L7ProtocolInfo struct {
ReqLen *int // 请求长度 例如 http 的 content-length
RespLen *int // 响应长度 例如 http 的 content-length
RequestID *uint32 // 子流的id标识,例如 http2 的 stream id,dns 的 transaction id
Req *Request
Resp *Response
Trace *Trace // 跟踪信息
Kv []KeyVal // 对应 attribute
}
type Request struct {
ReqType string // 对应请求类型
Domain string // 对应请求域名
Resource string // 对应请求资源
Endpoint string // 对应 endpoint
}
type Response struct {
Status RespStatus // 对应响应状态
Code *int32 // 对应响应码
Result string // 对应响应结果
Exception string // 对应响应异常
}*/
return sdk.ParseActionAbortWithL7Info([]*sdk.L7ProtocolInfo{})
}Topic 字段解码的代码逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31func (p kafkaParser) OnParsePayload(ctx *sdk.ParseCtx) sdk.Action {
// the parse logic here
// ...
// 解码 header base size :
// req_len(int32) + api_key(int16) + api_ver(int16) + c_id(int32) + client_len(int16)
// = 14
var header_offset = 14 + header.clientLen
var topic_size int16 = 0
var topic_name = ""
switch protocol.ApiKey(header.apikey) {
case protocol.Produce:
topic_size, topic_name = decodeProduce(header.apiversion, payload[header_offset:])
case protocol.Fetch:
topic_size, topic_name = decodeFetch(header.apiversion, payload[header_offset:])
}
if topic_size == 0 {
return sdk.ActionNext()
}
req = &sdk.Request{
ReqType: protocol.ApiKey(header.apikey).String() + "_v" + strconv.Itoa(int(header.apiversion)),
Resource: topic_name,
}
return sdk.ParseActionAbortWithL7Info([]*sdk.L7ProtocolInfo{
{
RequestID: &id,
ReqLen: &length,
Req: req,
},
})
}
加载插件和效果展示
执行如下命令编译插件,通过CTL方式加载插件
1 | tinygo build -o build/topic.wasm -target wasi -panic=trap -scheduler=none -no-debug ./wasm/kafka/topic.go |
准备好 DeepFlow Agent 的配置文件增加如下配置。注意,DeepFlow Agent 可以加载多个 Wasm 插件。
1 | ############ |
执行命令更新配置
1 | deepflow-plugin git:(main) ✗ deepflow-ctl agent-group-config update -f g-d2d06af17e.yaml |
当 DeepFlow Agent 日志出现如下图黄字体内容,即加载成功。
image
在 Grafana 上,可以看到原生的 Kafka 协议被覆盖,出现了几个变化:
Protocol
字段从Kafka
变成Custom
Request type
字段的API
多了版本号Request resource
字段出现了Topic
信息
image
0x3: 两种扩展方法的总结对比
最后对比一下两个协议扩展的方式,要注意⚠️的是:
- 两者都存在一个共性问题,就是每增加一个协议,识别协议解码的效率相对降低
- 可以通过配置的方式减少需解码的协议数量
原生Rust扩展:
- 优点:
- 运行时的资源占用比插件低
- 支持的功能比插件的丰富,且定制性更灵活
- 缺点:
- 在语言方面的开发难度比插件的大
- 相对插件开发而言,新增协议需要改动的地方较多,还涉及到 Server 的一小部分代码
Wasm插件扩展:
- 优点:
- 用 Golang 开发相对 Rust 语言难度较低
- 可在运行时通过 CLI 方式加载
- 扩展性强
- 缺点:
- Go 的标准库和第三方库有一定的限制,且调试难度大,导致插件异常较难排除
- 由于Wasm本身限制等问题,导致功能相对 Rust 原生开发较弱
- 资源增加,特别是内存方面。
0x4: 参考资料
0x05: 什么是 DeepFlow
DeepFlow 是云杉网络开发的一款可观测性产品,旨在为复杂的云基础设施及云原生应用提供深度可观测性。DeepFlow 基于 eBPF 实现了应用性能指标、分布式追踪、持续性能剖析等观测信号的零侵扰(Zero Code
)采集,并结合智能标签(SmartEncoding
)技术实现了所有观测信号的全栈(Full Stack
)关联和高效存取。使用 DeepFlow,可以让云原生应用自动具有深度可观测性,从而消除开发者不断插桩的沉重负担,并为 DevOps/SRE 团队提供从代码到基础设施的监控及诊断能力。
GitHub 地址:https://github.com/deepflowio/deepflow
访问 DeepFlow Demo,体验零插桩、全覆盖、全关联的可观测性。