
<14> ELK Study Notes: Introducing a Kafka Cluster into the ELK Log Collection Pipeline


Once the log volume grows large enough, it becomes necessary to introduce Kafka. This post records the full configuration process.

The log collection flow, in brief, looks like this:

filebeat -> kafka -> logstash -> elasticsearch -> kibana

The versions used:

  • filebeat: 6.5.4
  • zookeeper: 3.4.10
  • kafka: kafka_2.12-2.0.0
  • logstash: 6.5.4
  • elasticsearch: 6.5.4
  • kibana: 6.5.4

It might look like filebeat should be deployed first. But once you are familiar with what each stage's component is responsible for, you can deploy in dependency order instead.

Host                 Components
192.168.3.3 (node1)  elk, zookeeper, kafka
192.168.3.4 (node2)  nginx, zookeeper, kafka
192.168.3.5 (node3)  zookeeper, kafka

1. Preparation.

  • Install some dependency packages and sync the clock.
yum -y install lrzsz vim curl wget java ntpdate && ntpdate -u cn.pool.ntp.org
  • Add a hosts entry.
echo "192.168.3.3 elk-node1" >> /etc/hosts

The Java environment is essential here. Installing from a tarball instead of yum also works, but make sure the environment variables are configured.
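
If the nodes should also resolve one another by name, the same pattern extends to all three hosts (elk-node2 and elk-node3 are assumed names following the entry above):

cat >> /etc/hosts << EOF
192.168.3.4 elk-node2
192.168.3.5 elk-node3
EOF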

  • Configure the yum repo.

Add the repo:

cat > /etc/yum.repos.d/elk.repo << EOF
[elasticsearch-6.x]
name=Elasticsearch repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF

Import the GPG key:

rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

If this step throws an error, it may be a host clock problem; sync the time and try again!

2. Kafka cluster.

Link the three hosts together to form a cluster.

1. Install ZooKeeper.

First install and configure ZooKeeper; run this on all three hosts:

wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz
tar xf zookeeper-3.4.10.tar.gz -C /usr/local
cd /usr/local
mv zookeeper-3.4.10/ zookeeper

Configure ZooKeeper; the configuration is identical on all three nodes, so it is listed only once:

$ egrep -v "^$|^#" conf/zoo.cfg

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/logs
clientPort=2181
maxClientCnxns=1000
autopurge.snapRetainCount=6
autopurge.purgeInterval=3
server.1=192.168.3.3:2888:3888
server.2=192.168.3.4:2888:3888
server.3=192.168.3.5:2888:3888

Create the corresponding directories on all three machines:

mkdir -p /usr/local/zookeeper/data
mkdir -p /usr/local/zookeeper/logs

Add the myid host identifier:

node1

echo "1" > /usr/local/zookeeper/data/myid

node2

echo "2" > /usr/local/zookeeper/data/myid

node3

echo "3" > /usr/local/zookeeper/data/myid

Add the systemd unit file on each node; the file is identical everywhere, so it is not repeated:

$ cat /lib/systemd/system/zookeeper.service

[Unit]
Description=zookeeper.service
After=network.target

[Service]
Type=simple
ExecStart=/usr/local/zookeeper/bin/zkServer.sh start-foreground
PrivateTmp=true

[Install]
WantedBy=multi-user.target

Start ZooKeeper:

systemctl start zookeeper
systemctl status zookeeper
systemctl enable zookeeper
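
Once all three nodes are up, a quick sanity check is to ask each node for its role; one should report leader and the other two follower:

/usr/local/zookeeper/bin/zkServer.sh status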

2. Install Kafka.

Then install Kafka, performing the following on all three hosts:

$ wget https://archive.apache.org/dist/kafka/2.0.0/kafka_2.12-2.0.0.tgz
$ tar xf kafka_2.12-2.0.0.tgz -C /usr/local
$ cd /usr/local
$ mv kafka_2.12-2.0.0/ kafka

The three configuration files differ only slightly, so node1's file is listed in full, and for node2 and node3 only the node-specific lines are shown:

node1

$ cat config/server.properties

#------------------------------------
# eryajf Kafka Config
#------------------------------------

# Common configuration
broker.id=19
port=9092
host.name=192.168.3.3
log.dirs=/usr/local/kafka/logs/kafka
listeners=PLAINTEXT://192.168.3.3:9092
advertised.host.name=192.168.3.3

# Log configuration
num.partitions=6
num.recovery.threads.per.data.dir=1
message.max.bytes=1000000
auto.create.topics.enable=true
auto.leader.rebalance.enable=true
compression.type=snappy
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
log.retention.hours=336
log.flush.interval.ms=10000
log.flush.interval.messages=20000
log.flush.scheduler.interval.ms=2000
log.roll.hours=336
log.retention.check.interval.ms=300000
log.segment.bytes=1073741824

# Replication configurations
num.replica.fetchers=2
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536
replica.lag.time.max.ms=10000
controller.socket.timeout.ms=30000
controller.message.queue.size=10

# Socket server configuration
num.io.threads=8
num.network.threads=4
socket.request.max.bytes=104857600
socket.receive.buffer.bytes=1048576
socket.send.buffer.bytes=1048576
queued.max.requests=100
fetch.purgatory.purge.interval.requests=100
producer.purgatory.purge.interval.requests=100

# Topic configuration
delete.topic.enable=true

# ZK configuration
zookeeper.connect=192.168.3.4:2181,192.168.3.5:2181,192.168.3.3:2181
zookeeper.connection.timeout.ms=6000
zookeeper.sync.time.ms=2000

max.poll.interval.ms=300000
session.timeout.ms=300000

node2 (only the node-specific lines are shown; everything else matches node1)

broker.id=20
host.name=192.168.3.4
listeners=PLAINTEXT://192.168.3.4:9092
advertised.host.name=192.168.3.4

node3 (likewise, only the differing lines)

broker.id=21
host.name=192.168.3.5
listeners=PLAINTEXT://192.168.3.5:9092
advertised.host.name=192.168.3.5

Create the log directory on each node:

$ mkdir -p /usr/local/kafka/logs/kafka

Configure the systemd unit file:

$ cat /lib/systemd/system/kafka.service

[Unit]
Description=kafka.service
After=network.target remote-fs.target zookeeper.service

[Service]
Type=simple
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties

[Install]
WantedBy=multi-user.target

Start:

systemctl start kafka
systemctl status kafka
systemctl enable kafka
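
To confirm that all three brokers registered, you can list the broker IDs in ZooKeeper; with the configs above this should return [19, 20, 21]:

/usr/local/kafka/bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids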

List the topics:

$ cat list.sh
bin/kafka-topics.sh --list --zookeeper localhost:2181

$ chmod +x list.sh

$ ./list.sh
__consumer_offsets
nginx-access

View a topic's contents:

$ cat topic.sh
bin/kafka-console-consumer.sh --bootstrap-server  localhost:9092 --topic $1  --from-beginning

$ chmod +x topic.sh

$ ./topic.sh nginx-access  # shows the messages in the given topic
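
To verify the produce path as well, a test message can be pushed from any broker (the topic name matches the one filebeat will use later):

echo "hello kafka" | /usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic nginx-access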

3. Configure a front proxy.

In production you would put a proper load balancer in front; since this is a test setup, NGINX handles the proxying. This is a layer-4 (TCP) proxy, so the following configuration must be placed outside the http block:

stream {
    upstream test_codis_proxy_1 {
        server 192.168.3.3:9092 max_fails=3 fail_timeout=2s weight=10;
        server 192.168.3.4:9092 max_fails=3 fail_timeout=2s weight=10;
        server 192.168.3.5:9092 max_fails=3 fail_timeout=2s weight=10;
    }
    server {
        listen 9095;
        proxy_connect_timeout 3s;
        proxy_timeout 60s;
        proxy_buffer_size 16k;
        proxy_pass test_codis_proxy_1;
    }
}

Once configured, it is ready to use.
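
A quick way to verify the proxy is to consume through it. Keep in mind that Kafka clients only bootstrap through this address and then connect to the brokers' advertised listeners directly, so the brokers themselves must stay reachable from the client:

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.3.4:9095 --topic nginx-access --from-beginning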

3. filebeat.

Install the package:

yum -y install filebeat-6.5.4

Minimal configuration:

$ cd /etc/filebeat
$ cat filebeat.yml

filebeat.config_dir: /etc/filebeat/conf.d/
filebeat.shutdown_timeout: 5s
fields_under_root: true
fields:
  ip: "10.3.2.12"
  groups: nginx

output.kafka:
  enabled: true
  hosts: ["192.168.3.4:9095"]
  topic: "%{[log_topic]}"
  worker: 2
  max_retries: 3
  bulk_max_size: 2048
  timeout: 30s
  broker_timeout: 10s
  channel_buffer_size: 256
  keep_alive: 30
  compression: gzip
  max_message_bytes: 1000000
  required_acks: 1

logging.level: warning
#logging.level: debug

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

All log-collection config files live under /etc/filebeat/conf.d/; as an example, here is one that collects NGINX logs (a matching NGINX log_format sketch follows the config).

$ mkdir conf.d

$ cat conf.d/nginx.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /data/log/tmp.log
  fields_under_root: true
  json.keys_under_root: true
  json.overwrite_keys: true
  json.message_key: message
  ignore_older: 4h
  scan_frequency: 10s
  clean_inactive: 5h
  close_inactive: 4h
  clean_removed: true
  close_removed: true
  close_renamed: true
  tail_files: true
  fields:
    type: log
    log_topic: nginx-access
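
The input above assumes the file already contains one JSON object per line (hence json.keys_under_root and friends). Here is a minimal NGINX log_format sketch that would produce such a file, assuming nginx 1.11.8+ for escape=json; the remote_addr field matches what the logstash geoip filter reads later:

log_format json escape=json '{"@timestamp":"$time_iso8601",'
                            '"remote_addr":"$remote_addr",'
                            '"request":"$request",'
                            '"status":"$status",'
                            '"body_bytes_sent":"$body_bytes_sent",'
                            '"http_user_agent":"$http_user_agent"}';
access_log /data/log/tmp.log json;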

Then start the service:

systemctl restart filebeat
systemctl status filebeat
systemctl enable filebeat
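
Filebeat can also validate its own configuration before a restart (a test output subcommand exists as well, though not every output type supports it):

filebeat test config -c /etc/filebeat/filebeat.yml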

4. logstash.

Install the package:

yum -y install logstash-6.5.4

Configure it:

$ cat /etc/logstash/logstash.yml

pipeline.workers: 2
pipeline.output.workers: 2
# number of events sent per batch
pipeline.batch.size: 800

http.host: "0.0.0.0"
log.level: warn
path.logs: /var/log/logstash
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.url: ["http://192.168.3.3:9200"]
xpack.monitoring.collection.interval: 10s
slowlog.threshold.warn: 2s
slowlog.threshold.info: 1s
slowlog.threshold.debug: 500ms
slowlog.threshold.trace: 100ms

Add the NGINX log pipeline config:

$ cat /etc/logstash/conf.d/nginx.yml

input {
  kafka {
      bootstrap_servers  => "192.168.3.4:9095"
      group_id          => "nginx"
      consumer_threads => 6
      topics            => "nginx-access"
      codec             => "json"
      client_id => "nginx"
   }
}

filter {
    geoip {
        source => "remote_addr"
        fields => ["city_name", "country_code2", "country_name", "latitude", "longitude", "region_name"]
        remove_field => ["[geoip][latitude]", "[geoip][longitude]"]
    }
    json {
        source => "message"
        target => "jsoncontent"
    }
}

output {
   elasticsearch {
      hosts => ["http://192.168.3.3:9200"]
      index => "logstash-nginx-%{+YYYY.MM}"
   }
}

Reference it from the pipelines file:

$ cat /etc/logstash/pipelines.yml

# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
#   https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html

- pipeline.id: nginx
  path.config: "/etc/logstash/conf.d/nginx.yml"
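
Before starting, the pipeline definitions can be syntax-checked; the path below assumes the standard RPM layout:

/usr/share/logstash/bin/logstash --path.settings /etc/logstash -t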

Start the service:

systemctl start logstash
systemctl status logstash
systemctl enable logstash

5. elasticsearch.

Install the package:

yum -y install elasticsearch-6.5.4

Configure it:

$ cat /etc/elasticsearch/elasticsearch.yml

cluster.name: my-application
node.name: node-1
path.data: /logs/elasticsearch6
path.logs: /logs/elasticsearch6/log
network.host: 0.0.0.0
http.port: 9200
discovery.zen.ping.unicast.hosts: ["elk-node1"]
discovery.zen.minimum_master_nodes: 1
xpack.security.enabled: false

Create the corresponding directories:

mkdir -p /logs/elasticsearch6/log
cd /logs
chown -R elasticsearch.elasticsearch elasticsearch6/

Start:

systemctl start elasticsearch
systemctl status elasticsearch
systemctl enable elasticsearch
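
A quick health check; with a single data node, a status of green or yellow both indicate the node is up:

curl -s http://192.168.3.3:9200/_cluster/health?pretty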

6. kibana.

Install the package:

yum -y install kibana-6.5.4

Configure it:

$ cat /etc/kibana/kibana.yml

server.port: 5601
server.host: "0.0.0.0"
elasticsearch.url: "http://192.168.3.3:9200"
kibana.index: ".kibana"

A note on the configuration: after writing the settings above into kibana.yml and starting the service, the status looked normal, but every page request returned "Kibana server is not ready yet". This seems to be a classic error, yet one that gives you little to go on. After some testing, I picked up a small lesson.

Namely: for this file, don't wipe the original config and replace it wholesale with the lines above. That approach caused no trouble for elasticsearch and logstash earlier, but it does not seem to sit well with Kibana. If you are already stuck on that error, my suggestion is to uninstall Kibana, reinstall it, and then edit just the four settings above in place in the stock config file. Start Kibana, wait two or three minutes before visiting it, and the problem magically disappears.

Start:

systemctl start kibana
systemctl status kibana
systemctl enable kibana

Then open Kibana in a browser, and you should see the corresponding index and its logs.
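
If the index does not appear, you can check on the elasticsearch side whether logstash has created it:

curl -s 'http://192.168.3.3:9200/_cat/indices?v'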

7. Two plugins.

In daily use, two plugins make management and maintenance easier: one for Kafka administration, the other for ES monitoring.

Both are deployed directly with Docker.

1. kafka-manager

$ cat docker-compose.yml
version: '2'
services:
  kafka-manager:
    image: sheepkiller/kafka-manager                ## open-source web UI for managing Kafka clusters
    environment:
        ZK_HOSTS: 192.168.3.3                   ## change to the host IP
    ports:
      - "9000:9000"                                 ## exposed port

Once it is running, access it in the browser.

2. ES monitoring.

docker run -d -p 9001:9000 lmenezes/cerebro

Then access it in the browser (port 9001).

Note:

  • Mind the Kafka version compatibility of the logstash kafka input plugin: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html
