安装Kafka
运行Kafka和ELK Stack需要Java,所以让我们从安装Java开始:sudo apt-get update
sudo apt-get install default-jre接下来,Apache Kafka使用ZooKeeper来维护配置信息和同步,因此我们需要在设置Kafka之前安装ZooKeeper:sudo apt-get install zookeeperd
默认情况下,ZooKeeper侦听端口2181.您可以通过运行以下命令来检查:netstat -nlpt | grep ':2181'
接下来,让我们下载并解压Kafka:wget
tar -xvzf kafka_2.12-2.1.0.tgzsudo cp -r kafka_2.11-2.1.0 / opt / kafka我们现在准备运行Kafka,我们将使用此脚本:sudo /opt/kafka/bin/kafka-server-start.sh
/opt/kafka/config/server.properties您应该会看到一长列INFO消息,最后会显示一条消息,通知您Kafka已成功启动:[2018-12-30 08:57:45,714] INFO Kafka版本:2.1.0(org.apache.kafka.common.utils.AppInfoParser)
[2018-12-30 08:57:45,714] INFO Kafka commitId:809be928f1ae004e(org.apache.kafka.common.utils.AppInfoParser)[2018-12-30 08:57:45,716] INFO [KafkaServer id = 0]已启动(kafka.server.KafkaServer)恭喜,你已经启动并运行Kafka,并在端口9092上进行侦听。测试您的Kafka服务器
让我们来看看Kafka进行简单的测试。首先,使用以下命令使用单个分区和一个副本(我们只有一个Kafka服务器)创建您的第一个主题:
/opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181
--replication -因子 1 --partitions 1 --topic danielTest您应该看到以下输出:创建主题“danielTest”
使用控制台生成器,我们现在将一些示例消息发布到我们新创建的Kafka主题:/opt/kafka/bin/kafka-console-producer.sh --broker -list
localhost:9092 - 主题 danielTest在提示中,输入主题的一些消息:这只是一个考验 键入消息 OK 在单独的选项卡中,我们现在将运行Kafka使用者命令以从Kafka读取数据并将我们提交给主题的消息显示到stdout
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap -server
本地主机:9092 --topic danielTest --from -beginning您应该看到提交给主题的相同消息:这只是一个考验
输入信息好安装ELK堆栈现在我们确定发布/订阅机制已经启动,让我们安装用于记录它的组件 - Elasticsearch,Kibana和Filebeat。首先下载并安装Elastic公共签名密钥:
wget -qO - | 须藤
apt-key add -添加存储库定义:echo “deb stable main” |
sudo tee -a /etc/apt/sources.list.d/elastic-6.x.list更新系统,然后安装Elasticsearch:sudo apt-get update && sudo apt-get install elasticsearch
使用以下命令运行Elasticsearch:sudo 服务 elasticsearch 开始
您可以使用以下cURL确保Elasticsearch正在运行:卷曲 “http:// localhost:9200”
您应该看到类似于此的输出:{
“名字”:“6YVkfM0”, “cluster_name”:“elasticsearch”, “cluster_uuid”:“8d8-GCYiQoOQMJdDrzugdg”, “版本”:{“数字”:“6.5.4”,“build_flavor”:“默认”,“build_type”:“deb”,“build_hash”:“d2ef93d”,“build_date”:“2018-12-17T21:17:40.758843Z”,“build_snapshot”:false,“lucene_version”:“7.5.0”,“minimum_wire_compatibility_version”:“5.6.0”,“minimum_index_compatibility_version”:“5.0.0”
},
“标语”:“你知道,搜索”}接下来,我们将安装Kibana:sudo apt-get install kibana
打开Kibana配置文件: /etc/kibana/kibana.yml,并确保定义了以下配置:server.port:5601
elasticsearch.url:“http:// localhost:9200”并且,启动Kibana:sudo 服务 kibana 开始
要安装Filebeat,请使用:sudo apt install filebeat
配置管道我将介绍两种将Kafka日志发送到ELK堆栈的方法 - 一种是使用Logz.io,另一种是将它们运送到您自己的ELK部署中。运到Logz.io
要将数据发送到Logz.io,Filebeat配置文件中需要进行一些调整。由于我们的侦听器处理解析,因此在这种情况下不需要使用Logstash。首先,您需要下载SSL证书才能使用加密:
wget
COMODORSADomainValidationSecureServerCA.crtsudo mkdir -p / etc / pki / tls / certs
sudo cp COMODORSADomainValidationSecureServerCA.crt
的/ etc / PKI / TLS /证书/配置文件应如下所示:filebeat.inputs :
-
类型:日志
路径:- /opt/kafka/logs/server.log
领域:
logzio_codec : plaintoken :输入: kafka_server环境:开发
fields_under_root :true
编码: utf-8 ignore_older : 3h 多线:模式:'\ [[0-9] {4} - [0-9] {2} - [0-9] {2} [0-9] {2}:[0-9] {2}:[ 0-9] {2},[0-9] {3} \]([Aa] lert | ALERT | [T | t]种族| TRACE | [D | d] ebug | DEBUG | [N | n] otice |注意| [I | I] NFO | INFO | [W | W] ARN(?:荷兰国际集团)|?WARN(?: ING)|?[E | E] RR(?:或)|?ERR ?(?:OR)|?[C | C] RIT(?:的iCal)|?CRIT(?: ICAL)|?[F | F]阿塔尔贝哈|致命| [S | S] EVERE |严重| EMERG (?:eNCY)|?[EE] MERG?(?:ency))”否定:是的匹配:之后
registry_file : / var / lib / filebeat / registry
输出:
logstash :主持人:[ “listener.logz.io:5015” ] ssl : certificate_authorities :[ '/etc/pki/tls/certs/COMODORSADomainValidationSecureServerCA.crt' ]
处理器:
- add_host_metadata :〜
- add_cloud_metadata :〜
关于配置的一些注意事项:
您可以从Logz.io中的常规设置页面检索Logz.io帐户令牌(单击右上角的齿轮)。
请务必使用 kafka_server 作为日志类型来应用自动解析。我建议在启动Filebeat之前验证YAML。您可以使用 此在线工具。或者,您可以使用Filebeat向导自动生成YAML文件(在Filebeat部分的UI中的Log Shipping下可用)。 保存文件并启动Filebeat:sudo service filebeat start
您应该在一两分钟后开始看到您的Kafka服务器日志出现在Logz.io中:卡夫卡服务器
运送到ELK
要将Kafka服务器日志发送到您自己的ELK,您可以使用Kafka Filebeat模块。该模块收集数据,解析它并在Kibana中定义Elasticsearch索引模式。要使用该模块,首先要定义日志文件的路径:
sudo vim /etc/filebeat/modules.d/kafka.yml.disabled
-
模块: kafka
日志: 启用:true #var.kafka_home: var.paths :- “/opt/kafka/logs/server.log”
启用模块并使用以下设置环境:
sudo filebeat模块启用kafka
sudo filebeat setup -e最后但同样重要的是,重启Filebeat:sudo service filebeat restart
一两分钟后,打开Kibana,您会发现定义了“filebeat- *”索引,并在Discover页面上显示Kafka服务器日志:Kibana发现页面
分析数据
那么 - 我们在寻找什么?使用Kafka服务器日志可以做些什么?应用于日志的解析会解析出一些重要的字段 - 特别是日志级别以及生成日志的Kafka类和日志组件。我们可以使用这些字段以各种方式监视和解决Kafka问题。
例如,我们可以创建一个简单的可视化来显示我们正在运行的Kafka服务器的数量:
数据可视化
或者我们可以创建一个可视化,根据级别为我们提供不同日志的细分:
数据可视化
同样,我们可以创建一个可视化,显示更详细的Kafka组件的细分:
数据可视化
最后,您将这些可视化和其他可视化放入一个仪表板中,以监控您的Kafka实例:
数据可视化
尾注
就像堆栈中的任何其他组件一样,Kafka应该被记录和监控。在Logz.io,我们使用一个多层监控系统,其中包括指标和日志,以确保我们的数据管道按预期运行。如前所述,Kafka服务器日志只是Kafka生成的一种日志类型,因此您可能希望探索将其他类型的数据发送到ELK进行分析。无论哪种方式,ELK都是一款功能强大的分析工具,可以帮助您解决问题。
上面的仪表板可用于ELK应用程序 - Logz.io的仪表板和可视化库。要部署它,只需打开ELK应用程序并搜索“Kafka”。ELK应用程序