使用 Elastic Beats 搜集日志到 Pulsar

阅读本文需要约 5 分钟。

本文主要涉及 Beats 和 Pulsar 服务的搭建、测试、数据的发送及消费。通过本篇文章,能够对 Beats 如何发送数据到 Pulsar,以及如何使用 Pulsar 对数据进行消费有一个比较全面的了解。

关于 Apache Pulsar 和 Elastic Beats

Apache Pulsar 是一个分布式消息发布订阅系统。Elastic Beats 集合了多种单一用途数据采集器。本文针对各种 Beats 的使用做了简要介绍。

Elastic Beats 包括 Filebeat、Metricbeat、Functionbeat、Winlogbeat、Journalbeat 和 Auditbeat,本项目开发的 Output 适用于以上所有 Beat。具体使用方法,后面会逐步说明。本次试验以 Filebeat 的使用为例。

环境依赖

在进行本次试验之前,你需要在电脑上搭建以下环境依赖,本次试验测试是在 Mac 系统上进行的。  

  • Docker:根据 说明

    ( https://www.docker.com/get-started )完成 Docker 的安装。  

由于 Golang 的包在本地 build 会出现一些网络问题,故本次试验(包括 Build 和 Install 等)均在 Docker 中完成。

初始化网络

在测试中 Beats 到 Pulsar 的网络环境必须是相通的,因此需要提前初始化网络。

1docker network create pulsar-beat

该命令会创建一个名为 pulsar-beat 的网络,之后 Pulsar 服务和 Beats 都会连接到该网络。

使用 Docker 镜像编译本项目

1. 下载代码到本地

1git clone https://github.com/streamnative/pulsar-beat-output

由于多方面的原因,本项目把一些依赖如 Apache Pulsar Go Client Elastic Beats  都放在了 vendor 文件夹下,可直接在 Docker 中进行 Build。

Apache Pulsar Go Client:

https://github.com/apache/pulsar/pulsar-client-go/pulsar

Elastic Beats: 

https://github.com/elastic/beats

2. 下载 Docker 镜像并安装依赖

1docker pull golang:1.12.4
2docker run -it -d --network pulsar-beat --name pulsar-beat-golang golang:1.12.4 /bin/bash
3mkdir -p src/github.com/streamnative/pulsar-beat-output
4mv pulsar-beat-output/* src/github.com/streamnative/pulsar-beat-output/
5docker cp src pulsar-beat-golang:/go/

以上,首先拉取 golang 1.12.4 版本的镜像,然后使用 docker run 命令启动该镜像并命名为 pulsar-beat-golang-d 参数使该服务以后台模式运行, -it 以交互模式运行容器,并为容器分配一个伪输入终端。 --network 代表该容器使用 pulsar-beat 这个网络,之后 Pulsar 服务也会加入该网络。

3. 进入容器,安装依赖

1docker exec -it pulsar-beat-golang /bin/bash
2wget https://archive.apache.org/dist/pulsar/pulsar-2.3.0/DEB/apache-pulsar-client.deb
3wget https://archive.apache.org/dist/pulsar/pulsar-2.3.0/DEB/apache-pulsar-client-dev.deb
4dpkg -i apache-pulsar-client.deb
5dpkg -i apache-pulsar-client-dev.deb

由于本项目使用到了 Pulsar 的 Golang Client ,该 Client 需要依赖 deb 包,因此在编译之前需要提前装好这些包。其他 Linux 发行版可根据需要在这里

( https://archive.apache.org/dist/pulsar/pulsar-2.3.0/ )进行下载安装。

4. 开始编译

继续上面的步骤,使用如下命令编译本项目:

1cd src/github.com/streamnative/pulsar-beat-output/
2go build -o filebeat main.go

执行上述命令后,当前目录下会生成一个名为 filebeat 的可执行文件,之后可以使用这个文件进行数据搜集。

安装测试

单机模式下安装 Pulsar

为方便测试,本次使用单机模式安装 Pulsar,并使用 Docker 镜像 apachepulsar/pulsar:2.3.0 启动服务。新开一个窗口,拉取 Pulsar 的镜像并启动服务。这里测试使用的是 2.3.0 的镜像,当然也可以使用最新版 2.4.0 的镜像。

1docker pull apachepulsar/pulsar:2.3.0
2docker run -d -it --network pulsar-beat -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-beat-standalone apachepulsar/pulsar:2.3.0 bin/pulsar standalone

该服务暴露出 6650 和 8080 端口供外部服务使用,这个容器将 Pulsar 服务的 data 目录挂载在当前目录下,并且将其命名为 pulsar-beat-standalone-d 参数使该服务以后台模式运行, -it 是以交互模式运行该容器,并为容器分配一个伪输入终端。

1docker logs -f pulsar-flume-standalone
2
301:55:55.005 [pulsar-web-55-1] INFO  org.apache.pulsar.broker.web.PulsarWebResource - Successfully validated clusters on tenant [public]
401:55:55.013 [pulsar-ordered-OrderedExecutor-1-0-EventThread] INFO  org.apache.pulsar.zookeeper.ZooKeeperDataCache - [State:CONNECTED Timeout:30000 sessionid:0x10003995a81000a local:/127.0.0.1:54508 remoteserver:localhost/127.0.0.1:2181 lastZxid:156 xid:41 sent:41 recv:43 queuedpkts:0 pendingresp:0 queuedevents:0] Received ZooKeeper watch event: WatchedEvent state:SyncConnected type:NodeDataChanged path:/admin/policies/public/default
501:55:55.014 [pulsar-web-55-1] INFO  org.apache.pulsar.broker.admin.impl.NamespacesBase - [null] Successfully updated the replication clusters on namespace public/default
601:55:55.016 [pulsar-web-55-1] INFO  org.eclipse.jetty.server.RequestLog - 172.17.0.3 - - [08/May/2019:01:55:55 +0000] "POST /admin/v2/namespaces/public/default/replication HTTP/1.1" 204 0 "-" "Jersey/2.27 (HttpUrlConnection 1.8.0_181)" 15

使用上述命令查看 Docker 日志,若出现以上日志信息,说明 Pulsar 在单机模式下启动成功了。

启动消费者客户端

测试脚本文件放在这里

( https://github.com/streamnative/pulsar-beat-output/blob/master/pulsar-client.py ),内容如下:

 1import pulsar
 2
 3client = pulsar.Client('pulsar://localhost:6650')
 4consumer = client.subscribe('my_topic', subscription_name='test123')
 5
 6while True:
 7    msg = consumer.receive()
 8    print("Received message: '%s'" % msg.data())
 9    consumer.acknowledge(msg)
10
11client.close()

该脚本会连接到本地 Pulsar 服务的 6650 端口,因为是在 Pulsar 服务所在的容器中启动,因此服务地址是 localhost: 6650 。该脚本会在 my_topic 这一 Topic 上进行消费,该 Topic 是 Beats 中配置的 Topic ,并且指定订阅名称为 test123

1docker cp pulsar-client.py pulsar-beat-standalone:/pulsar
2docker exec -it pulsar-beat-standalone /bin/bash
3python pulsar-client.py

以上命令把消费者脚本复制到 pulsar-beat-standalone 容器中,然后进入容器,使用 python 命令启动该脚本。出现如下信息时,说明该消费者成功启动。

1root@1cda587d2694:/pulsar# python pulsar-client.py
22019-05-08 02:02:01.823 INFO  Client:88 | Subscribing on Topic :my_topic
32019-05-08 02:02:01.824 INFO  ConnectionPool:72 | Created connection for pulsar://localhost:6650
42019-05-08 02:02:01.825 INFO  ClientConnection:300 | [127.0.0.1:47872 -> 127.0.0.1:6650] Connected to broker
52019-05-08 02:02:01.845 INFO  HandlerBase:52 | [persistent://public/default/my_topic, test123, 0] Getting connection from pool
62019-05-08 02:02:01.870 INFO  ConnectionPool:72 | Created connection for pulsar://1cda587d2694:6650
72019-05-08 02:02:01.871 INFO  ClientConnection:302 | [127.0.0.1:47874 -> 127.0.0.1:6650] Connected to broker through proxy. Logical broker: pulsar://1cda587d2694:6650
82019-05-08 02:02:01.872 INFO  ConsumerImpl:169 | [persistent://public/default/my_topic, test123, 0] Created consumer on broker [127.0.0.1:47874 -> 127.0.0.1:6650]

配置并启动 Filebeat

本次试验使用 Filebeat 进行测试,对于其他 Beat ,除最开始的编译不同外,配置方式是类似的,后面会介绍其他 Beat 的编译方式。

配置 Filebeat

回到最开始编译的地方,继续向下进行。首先需对 Filebeat 进行配置,增加如下内容到 filebeat.yml 

( https://github.com/streamnative/pulsar-beat-output/blob/master/filebeat.yml ) 文件。

1output.pulsar:
2  url: "pulsar://pulsar-beat-standalone:6650"
3  topic: my_topic
4  name: test123

该配置的各项含义:

  • url: Pulsar 服务的地址。因为 Filebeat 和 Pulsar 都在同一个网络 pulsar-beat 中,因此可直接使用 hostname 进行连接。

  • topic: Pulsar 中接收数据的主题。

  • name: 生产者的名称。

有了上面几项配置,可成功进行本次试验。关于更多配置,参考这里

( https://github.com/streamnative/pulsar-beat-output )。

修改目录权限

1chown -R root:root filebeat.yml test_module/modules.d/system.yml test_module/module/system
2cp test_module/module/system/auth/test/test.log /var/log/messages.log
3cp filebeat filebeat.yml test_module

本次测试使用 Filebeat 搜集系统日志,发送到 Pulsar 服务。其中,应用了 Filebeat 中的 system 模块,修改权限后,复制测试日志到 /var/log 下,因为在 system 模块的配置文件

( https://github.com/streamnative/pulsar-beat-output/blob/master/test_module/module/system/syslog/manifest.yml )

中默认搜集该目录下日志的数据。最后,复制 Filebeat 的可执行文件 filebeat 和配置文件 filebeat.yml 到测试目录下。

1cd test_module
2./filebeat modules enable system
3./filebeat -c filebeat.yml -e

进入测试目录,打开 system 模块,启动 filebeat。 -c 用来指定配置文件, -e 开启日志到控制台。

这时会输出如下类似信息:

 12019-05-08T02:38:58.991Z    INFO    add_cloud_metadata/add_cloud_metadata.go:340    add_cloud_metadata: hosting provider type not detected.
 22019-05-08T02:39:05.976Z    INFO    log/input.go:138    Configured paths: [/var/log/auth.log* /var/log/secure*]
 32019-05-08T02:39:05.976Z    INFO    log/input.go:138    Configured paths: [/var/log/messages* /var/log/syslog*]
 42019-05-08T02:39:05.977Z    INFO    input/input.go:114  Starting input of type: log; ID: 967574352861831335
 52019-05-08T02:39:05.977Z    INFO    input/input.go:114  Starting input of type: log; ID: 16982476733437093780
 62019-05-08T02:39:05.978Z    INFO    log/harvester.go:254    Harvester started for file: /var/log/messages.log
 72019-05-08T02:39:06.979Z    INFO    pipeline/output.go:95   Connecting to file(pulsar://pulsar-beat-standalone:6650)
 82019-05-08T02:39:06.980Z    INFO    pulsar/client.go:64 start create pulsar client
 92019-05-08T02:39:06.980Z    INFO    pulsar/client.go:69 start create pulsar producer
102019/05/08 02:39:06.982 c_client.go:68: [info] INFO  | ConnectionPool:72 | Created connection for pulsar://pulsar-beat-standalone:6650
112019/05/08 02:39:06.986 c_client.go:68: [info] INFO  | ClientConnection:300 | [172.24.0.2:59696 -> 172.24.0.3:6650] Connected to broker
122019/05/08 02:39:07.003 c_client.go:68: [info] INFO  | BatchMessageContainer:43 | { BatchContainer [size = 0] [batchSizeInBytes_ = 0] [maxAllowedMessageBatchSizeInBytes_ = 131072] [maxAllowedNumMessagesInBatch_ = 1000] [topicName = persistent://public/default/my_topic] [producerName_ = test123] [batchSizeInBytes_ = 0] [numberOfBatchesSent = 0] [averageBatchSize = 0]} BatchMessageContainer constructed
132019/05/08 02:39:07.004 c_client.go:68: [info] INFO  | HandlerBase:52 | [persistent://public/default/my_topic, test123] Getting connection from pool
142019/05/08 02:39:07.005 c_client.go:68: [info] INFO  | ConnectionPool:72 | Created connection for pulsar://c37002ed8073:6650
152019/05/08 02:39:07.007 c_client.go:68: [info] INFO  | ClientConnection:302 | [172.24.0.2:59698 -> 172.24.0.3:6650] Connected to broker through proxy. Logical broker: pulsar://c37002ed8073:6650
162019/05/08 02:39:07.021 c_client.go:68: [info] INFO  | ProducerImpl:155 | [persistent://public/default/my_topic, test123] Created producer on broker [172.24.0.2:59698 -> 172.24.0.3:6650]

同时,在刚才开启的消费者端可看到如下信息:

1Received message: '{"@timestamp":"2019-05-08T02:39:05.978Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.0.0","pipeline":"filebeat-7.0.0-system-syslog-pipeline"},"input":{"type":"log"},"host":{"os":{"kernel":"4.9.125-linuxkit","codename":"stretch","platform":"debian","version":"9 (stretch)","family":"debian","name":"Debian GNU/Linux"},"name":"50dd65646ea7","id":"8b6f0bc66c81d2b9ef76a3fc9b52b452","containerized":true,"hostname":"50dd65646ea7","architecture":"x86_64"},"agent":{"id":"e74b6d2c-5ec3-40bb-a23c-d3610d8b40fa","version":"7.0.0","type":"filebeat","ephemeral_id":"3f7daa1b-2f74-4550-b19a-88bf75367c6a","hostname":"50dd65646ea7"},"log":{"offset":0,"file":{"path":"/var/log/messages.log"}},"service":{"type":"system"},"fileset":{"name":"syslog"},"ecs":{"version":"1.0.0"},"message":"Feb 21 21:54:44 localhost sshd[3402]: Accepted publickey for vagrant from 10.0.2.2 port 63673 ssh2: RSA 39:33:99:e9:a0:dc:f2:33:a3:e5:72:3b:7c:3a:56:84","event":{"module":"system","dataset":"system.syslog"}}'
2Received message: '{"@timestamp":"2019-05-08T02:39:05.978Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.0.0","pipeline":"filebeat-7.0.0-system-syslog-pipeline"},"event":{"module":"system","dataset":"system.syslog"},"agent":{"ephemeral_id":"3f7daa1b-2f74-4550-b19a-88bf75367c6a","hostname":"50dd65646ea7","id":"e74b6d2c-5ec3-40bb-a23c-d3610d8b40fa","version":"7.0.0","type":"filebeat"},"ecs":{"version":"1.0.0"},"service":{"type":"system"},"input":{"type":"log"},"host":{"name":"50dd65646ea7","os":{"name":"Debian GNU/Linux","kernel":"4.9.125-linuxkit","codename":"stretch","platform":"debian","version":"9 (stretch)","family":"debian"},"id":"8b6f0bc66c81d2b9ef76a3fc9b52b452","containerized":true,"hostname":"50dd65646ea7","architecture":"x86_64"},"log":{"file":{"path":"/var/log/messages.log"},"offset":152},"message":"Feb 23 00:13:35 localhost sshd[7483]: Accepted password for vagrant from 192.168.33.1 port 58803 ssh2","fileset":{"name":"syslog"}}'

至此,通过 Filebeat 搜集 system 的日志信息并发送到 Pulsar 的整个流程就完成了。

编译其他 Beat  code

1go build -o metricbeat metricbeat.go
2go build -o filebeat filebeat.go
3go build -o functionbeat functionbeat.go
4go build -o journalbeat journalbeat.go
5go build -o auditbeat auditbeat.go
6go build -o winlogbeat winlogbeat.go
7go build -o packetbeat packetbeat.go

其他 Beat 的使用方式同 Filebeat,具体可以参考 官方文档

( https://www.elastic.co/cn/products/beats )。

问题

使用中出现其他问题,可以参考 FAQ

( https://github.com/streamnative/pulsar-beat-output/blob/master/README.md )。

作者 | tuteng

审校 | Ying Zhan +  Jennifer Huang

编辑 | Susan

更多关于 Pulsar 的技术干货和产品动态,请关注 ApachePulsar 微信公众号。

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章