Filebeat 工作原理
如官方文档所描述的那样,filebeat 包含两个主要的组件,inputs
和harvesters
.
harvesters 组件
harvester
负责读取单个文件的内容.它逐行读取每个文件,将内容发送到 output
.对于每个文件,filebeat 都会启动一个 harvester,负责打开和关闭文件.默认情况下 filebeat 会使文件处于打开状态,直到达到 close_inactive
配置的未读取到内容的超时时长.
需要注意的是: 如果 harvester 在读取文件过程中,对该文件删除或重命名,filebeat 会继续读取该文件,直到 harvester 关闭为止.
inputs 组件
input
负责管理 harvesters 并查找所有可读取的输入源文件.
如果 input 类型为 log
,则 input 将查找与定义的 glob 路径匹配的所有文件,并为每个文件启动 harvester.每个 intput 都在其自己的 goroutine 中运行.示例如下:
1 | filebeat.inputs: |
input 支持多种类型,详见input types
filebeat 如何保存文件状态
filebeat 会保存每个文件的读取状态,并将相关状态刷新到磁盘中的 registry file
(注册表文件)中,用于记住 harvester 正在读取的文件的偏移量,并确保发送了所有的日志内容.重新启动 filebeat 时,会使用注册表文件中的数据来重建读取状态,并且 harvester 会在上一次读取到的位置继续进行读取.
由于在 harvester 读取文件过程中,可以重命名或移动文件,因此文件名和路径不足以标识文件.对于每个文件,filebeat 都存储唯一的标识符已检测文件是否被读取过
如果每天创建大量新文件,则可能会发现注册表文件变得太大.有关可以设置为解决此问题的配置选项的详细信息,请参阅注册表文件太大相关内容.主要是使用 clean_removed
和clean_inactive
配置,分别对被忽略的旧文件和删除旧文件的注册表信息进行删除.
filebeat 如何保证事件至少一次被传递
filebeat 之所以能够实现此行为,是因为它在注册表文件中存储了每个事件的传递状态.在定义的 output 无法访问且尚未确认所有事件的情况下,filebeat 将继续尝试发送事件,直到 output 确认它已经收到事件为止.
重新启动 filebeat 时,将再次发送之前发送到 output 但在未确认的所有事件.这样可以确保每个事件至少发送一次,但是最终可能会将重复的事件发送到 output.您可以通过 shutdown_timeout
配置项设置 filebeat 在关闭之前等待的时间.
filebeat 配置
filebeat 配置文件默认为 filebeat.yml
.官方给出了不同平台下 filebeat 配置文件的位置.详见官方文档
filebeat 支持对 input 等配置进行拆分,并以文件形式包含到主配置文件中
- 可以使用
filebeat.config.inputs
将 inputs 相关配置拆分后,包含到主配置文件中.
1 | filebeat.config.inputs: |
- 可以使用
filebeat.config.modules
将 modules 相关配置拆分后,包含到主配置文件中.
1 | filebeat.config.modules: |
inputs
inputs 指定了 filebeat 如何查找和处理输入的数据.配置块为 filebeat.inputs
.
inputs 支持多种输入类型,常用的主要如下:
log
: 从日志文件中读取内容
inputs 包含一些通用的配置,主要如下:
通用配置 | 描述 | 默认值 |
---|---|---|
enabled | 是否启用该 input | true |
tags | filebeat 为每个事件添加的标签字段列表,主要用于过滤操作 | - |
fields | 添加额外的字段列表到 output.添加的额外字段默认会在 fields 字段下 |
- |
fields_under_root | 将额外添加的字段直接添加到 output 中,而不在 fields 字段下 |
false |
processors | 应用于输入数据的处理器列表 | - |
keep_null | 是否将值为 null 字段在 output 文档中发布 | false |
index | 设置来自此输入事件的索引 | 示例: %{[agent.name]}-myindex-%{+ yyyy.MM.dd} 替换为 filebeat-myindex-2019.11.01 |
log
log
类型的 input 用于从日志文件中读取行,它包含如下参数:
path
: input 读取文件的路径列表recursive_glob.enabled
: 是否将**
扩展为递归 glob 匹配.最多支持 8 级.默认为 true
以下为
log
,container
类型共有的参数
encoding
: 用于读取字符数据的文件编码.如:plain
,utf-8
,gbk
.exclude_lines
: 删除与列表中正则表达式匹配的所有行include_lines
: 仅保留与列表中正则表达式匹配的所有行harvester_buffer_size
: harvester 读取文件时的缓冲区大小,单位字节.默认为 16384(16KB)json
: 解码结构为 JSON 消息的日志.exclude_files
: 忽略列表中正则表达式匹配的文件ingore_older
: 忽略在指定时间之前修改的文件close_inactive
: 若在指定时间内文件内容未更新,则关闭文件句柄.一般设置为比日志文件更新频率大的值.默认值为 5m.close_renamed
: 是否在重命名文件后,关闭文件处理程序.默认为 falseclose_removed
: 是否在文件删除后,关闭文件处理程序.默认为 falseclose_eof
: 是否在达到文件末尾时,立即关闭文件.默认为 falseclose_timeout
: 在指定的超时时间后,harvester 关闭文件.若文件仍在更新,可以在scan_frequency
后在次启动新的 harvester.对于释放从磁盘上删除的文件特别有用.默认为 0,表示禁用clean_inactive
: 若在指定时间内文件内容未更新,则删除注册表中文件的状态信息clean_removed
: 是否在重命名文件后,则删除注册表中文件的状态信息.默认为 true.若文件在段时间内消息并再次出现,则将从头开始读取文件内容scan_frequency
: 指定每隔多久扫描一次 path 路径中新文件的频率.默认为 10sscan.sort
: 对扫描到的文件按指定字段进行排序.可选值为使用modtime
修改文件和filename
文件名称进行排序.默认为空值scan.order
: 对扫描到的文件进行升序或降序排序.可选值为asc
升序和desc
降序.tail_files
: 是否从文件末尾开始进行读取文件,默认值为 falsesymlinks
: 是否除常规文件外还读取符号链接文件.默认为 false.如果符号链接与源文件都在 path 路径下,则可能会发送重复的数据.在处理 Kubernetes 日志文件时很有用.harvester_limit
: 限制一个 input 并行启动 harvester 的数量.默认为 0,表示没有限制.
container
container
类型的 input 用于从容器日志文件中读取日志文件,它包含如下参数:
stream
: 仅从指定的数据流中读取,可选值为all
,stdout
或stderr
.默认值为all
.format
: 使用给定格式读取日志文件.auto
,docker
或cri
.默认为auto
,自动检测.
其余参数见 log
kafka
kafka
类型的 input 用于从 Kafka 集群中读取日志文件,主要支持的 kafka 集群版本为 0.11 - 2.1.0
.它包含如下参数:
hosts
: kafka 集群主机列表topics
: 从指定 kafka 主题中读取消息group_id
: kafka 消费者组的 idclient_id
: kafka 客户端 id.可选参数version
: kafka 协议版本.默认为1.0.0
initial_offset
: 从主题读取的初始位置.可选值为oldest
或newest
.默认为oldest
.connect_backoff
: 出现错误后,重新连接 kafka 集群的等待时间.默认为 30s.consume_backoff
: 读取失败后,重新读取数据的等待时间.默认为 2s.max_wait_time
: 读取时等待最小输入字节数的时间.默认值为 250ms.wait_close
: 关闭时,等待传输消息并确认的时间.isolation_level
: 设置 kafka 的组隔离级别.默认为read_uncommitted
read_uncommitted
: 返回消息通道中的所有消息read_committed
: 隐藏作为中止的事务的一部分的消息
fatch
: 从 kafka 获取数据的相关设置.min
,default
,max
分别设置获取数据的最小,默认,最大 bytes 数.默认分别为1,1MB,0.
rebalance
: kafka 负载均衡设置strategy
: 负载均衡策略.range
遍历或roundrobin
轮询.默认为range
timeout
: 负载均衡的超时时间.默认为 60smax_retries
: 负载均衡失败后重启多少次.默认为 4retry_backoff
: 负载均衡失败后要等待多长时间.默认为 2s.
redis
redis
类型的 input 用于从 redis 慢日志中读取数据.它包含如下参数:
hosts
: redis 服务列表password
: 连接 redis 的密码scan_frequency
: 从 redis 慢日志中读取数据的频率.默认为 10s.timeout
: 等待 redis 响应的超时时间.默认为 1s.network
: redis 连接的网络类型.可选值为tcp
,tcp4
,tcp6
,unix
.默认值为tcp
.maxconn
: redis 客户端的最大连接数.默认为 10.
syslog
syslog
类型的 input 用于从 TCP,UDP,Unix 套接字中读取事件.它包含如下参数:
参数 | 描述 | protocol.udp |
protocol.tcp |
protocol.unix |
---|---|---|---|---|
max_message_size |
接收数据的最大大小 | 10KiB | 20MiB | 20MiB |
host |
监听事件流的主机和端口 | - | - | 不支持 |
path |
unix 套节点地址 | 不支持 | 不支持 | - |
socket_type |
unix 套接字类型 | 不支持 | 不支持 | stream 或 datagram ,默认为 stream |
group |
unix 套接字的属组 | 不支持 | 不支持 | 运行 filebeat 程序的用户属组 |
mode |
unix 套接字的文件模式 | 不支持 | 不支持 | 默认为 0755 |
read_buffer |
套接字上读取缓冲区的大小 | - | - | - |
timeout |
套接字的读写超时时间 | - | - | - |
frame |
指定用于拆分传入事件的帧 | 不支持 | - | - |
line_delimiter |
指定分割传入事件的字符 | 不支持 | \n | \n |
max_connections |
指定任意时间点的最大连接数 | 不支持 | - | - |
timeout |
不活动的远程连接关闭之前的秒数 | 不支持 | 300s | 300s |
ssl |
ssl 参数配置 | 不支持 | - | - |
stdin
redis
类型的 input 用于从 stdin 中读取事件,它主要用于测试.
output
您可以通过设置 filebeat.yml
配置文件的 output
部分中的参数,将filebeat 中的事件写入到指定 output.filebeat 只能定义一个 output.
elasticsearch
elasticsearch
类型的 output 将事件输出到 elasticsearch.它包含以下参数:
api_key
: 使用 api_key 保护与 elasticsearch 的通信.username
: elasticsearch 用户名password
: elasticsearch 密码parameters
: HTTP 参数字典protocol
: elasticsearch 协议名称,http
或https
.默认为http
path
: HTTP API 调用之前的 HTTP 路径前缀.headers
: 自定义 HTTP 请求头
以下为
elasticsearch
,logstash
类型共有的参数
enabled
: 是否启用此输出,默认为true
hosts
: elasticsearch 的主机列表.compression_level
: 事件的 gzip 压缩级别,1-9
压缩级别逐级递增.默认为 0,表示不压缩escape_html
: 是否转义 html 为字符串.默认为 falseworker
: 向每个 elasticsearch 实例发送事件的工作程序数,最好在负载均衡模式下使用.默认为 1.proxy_url
: 连接 elasticsearch 的代理 URLindex
: 写入事件事的索引名称.默认值为filebeat-%{[agent.version]}-%{+yyyy.MM.dd}
indices
: 索引选择器规则的数组.支持条件判断等复杂条件index
: 索引名称mappings
: 一个字典,使用 index 返回的值,并将其映射到新名称default
: 如果 mapping 没有找到匹配项,则使用默认的字符串when
: 在何时使用此索引.见如下示例
1 | # when 条件判断示例 |
bulk_max_size
: 单个 elasticsearch 批量 API 索引请求中要批量处理的最大事件数.默认为 50.backoff.init
: 网络错误后尝试重新连接到 elasticsearch 的等待时长.backoff.max
: 网络错误后尝试重新连接到 elasticsearch 的最大等待时长.默认为 60stimeout
: elasticsearch 请求的超时时长,默认为 90s.ssl
: SSL 配置相关
logstash
logstash
类型的 output 将事件输出到 logstash.它包含以下参数:
其余参数见 elasticsearch
loadbalance
: 是否启用多个 logstash 的负载均衡.默认为 false,随机分发到主机上ttl
: filebeat 与 logstash 连接的生存时间,之后将重新建立连接.默认为 0,表示禁用此功能
kafka
kafka
类型的 output 将事件输出到 kafka.它包含以下参数:
enabled
: 是否启用此 outputhosts
: kafka 集群主机列表topic
: 从指定 kafka 主题中读取消息topics
: topic 选择器规则的数组.支持条件判断等复杂条件topic
: topic 名称mappings
: 一个字典,使用 topic 返回的值,并将其映射到新名称default
: 如果 mapping 没有找到匹配项,则使用默认的字符串when
: 在何时使用此 topic.见如下示例
1 | output.kafka: |
version
: kafka 协议版本.默认为1.0.0
username
: 连接 kafka 的用户名password
: 连接 kafka 的密码client_id
: kafka 客户端 id.可选参数worker
: 向每个 kafka 实例发送事件的工作程序数,最好在负载均衡模式下使用.默认为 1.codec
: 编码格式.默认为 jsonmetadata
: kafka 元数据信息设置.bulk_max_size
: 单个 kafka 请求中要批量处理的最大事件数.默认值为 2048.bulk_flush_frequency
: 向 kafka 发送请求之前需要等待的时间.默认为 0.表示没有延迟.timeout
: kafka 响应超时时间.默认值为 30schannel_buffer_size
: kafka 在 output 管道中缓冲的消息数.keep_alive
: 活动连接的保持活动时长.默认为 0.表示禁用连接.compression
: 压缩编码方式.可选值为none
,snappy
,lz4
或gzip
.默认值为gzip
.compression_level
: 压缩级别,0-9 依次升高.默认为 4.max_message_bytes
: json 消息的最大大小.默认为 100 万字节
redis
redis
类型的 output 用于将事件输出到 Redis list 或 channel 中.此插件与 logstash 的 redis 输入插件兼容.它包含如下参数:
enabled
: 是否启用此插件hosts
: redis 服务列表password
: 连接 redis 的密码db
: 连接 redis 的数据库.默认为 0.key
: 发送事件到 redis 的 list 或 channel 的 key 的名称.若为空,则使用index
设置的值.index
: 添加到事件元数据中的索引名称,以供 logstash 使用.默认值为filebeat
.keys
: key 选择器的规则的数组.支持条件判断等复杂条件key
: key 名称mappings
: 一个字典,使用 key 返回的值,并将其映射到新名称default
: 如果 mapping 没有找到匹配项,则使用默认的字符串when
: 在何时使用此 key.见如下示例
1 | output.redis: |
datatype
: 用于发布事件的 redis 数据类型.可选值为list
(使用rpush
命令)或channel
(使用publish
命令).默认为list
.codec
: 发送到 redis 数据的编码格式.默认为json
worker
: redis 连接的网络类型.可选值为tcp
,tcp4
,tcp6
,unix
.默认值为tcp
.loadbalance
: redis 客户端的最大连接数.默认为 10.timeout
: 连接 redis 的超时时间.默认为 5s.backoff.init
: 网络错误后尝试重新连接到 redis 的等待时长.默认为 1s.backoff.max
: 网络错误后尝试重新连接到 redis 的最大等待时长.默认为 60s.max_retries
: 设置重试次数bulk_max_size
: 单个 redis 请求或 pipeline 批量发送数据最大数据大小.默认为 2048.
file
file
类型的 output 用于将事件输出到文件中.每个事件采用 json 格式,主要用于测试.它包含如下参数:
enabled
: 是否启用此插件.默认为 truepath
: 文件位置.必选参数filename
: 生成文件名称.默认设置为 Beat 的名称.如filebeat
生成文件将是filebeat.1
rotate_every_kb
: 每个文件的最大大小,以 kb 为单位.超过此大小后,文件会自动滚动.默认值为 1024 kb.number_of_files
:path
下保存的最大文件数.最早的文件将被删除.默认为 7.permissions
: 指定创建文件的权限.默认值为0600
.codec
: 输出事件的编码格式.默认为 json.
console
console
类型的 output 用于将事件输出到标准输出.每个事件采用 json 格式,主要用于测试.它包含如下参数:
pretty
: 是否以漂亮格式输出.默认为 falseenabled
: 是否启用此插件.默认为 truebulk_max_size
: 输出缓冲区的最大事件数.默认为 1024.codec
: 输出的编码格式
Logging
配置文件 filebeat.yml
的 logging
部分包含用于配置日志输出的选项.日志系统可将日志写入 syslog 或滚动日志文件中.如果未显式配置,则使用滚动文件输出日志.它主要包含如下配置项:
logging.to_stderr
: 如果为 true,则将所有日志写入到标准输出中.与使用-e
选项相同.logging.to_syslog
: 如果为 true,则将所有日志写入到 syslog 中.logging.to_files
: 如果为 true,则将所有日志写入到滚动日志文件中,在日志文件大小到达一定大小后进行滚动logging.level
: 设置日志级别.可选值为debug
,info
,warning
,error
.默认值为info
logging.metrics.enabled
: 是否定期记录其内部数据指标.logging.metrics.period
: 日志记录内部指标的时间间隔.默认为 30s.logging.files.path
: 日志文件写入的目录.默认值为logs
logging.files.name
: 日志的文件名,默认值为filebeat
.logging.files.rotateeverybytes
: 日志文件的最大大小.默认值为 10485760 (10 MB).logging.files.keepfiles
: 日志保存的文件个数.默认值为 7.logging.files.permissions
: 滚动日志时要使用的权限掩码.默认值为 0600.logging.files.interval
: 除了基于大小的滚动外,filebeat 日志还支持基于日期的文件滚动.默认是禁用的logging.files.rotateonstartup
: 如果日志文件启动时已存在,则立即滚动并写入新的文件中.logging.json
: 是否以 json 格式写入文件.默认为 false.
HTTP endpoint
http.enabled
: 是否启用 HTTP endpoint.默认为 falsehttp.host
: 绑定地址,支持主机名,IP 地址, Unix 套接字.默认为localhost
http.port
: http endpoint 监听端口.默认为5066
1 | # /stats 暴露了内部指标,如下 |