简介
ELK 套装包括 ElasticSearch、LogStash 和 Kibana。 其中,ElasticSearch 是一个数据搜索引擎(基于 Apache Lucene)+分布式 NoSQL 数据库;LogStash 是一个消息采集转换器,类似 Syslog,可以接收包括日志消息在内的多种数据格式,然后进行格式转换,发送给后端继续处理;Kibana 是一个 Web 前段,带有强大的数据分析、整理和可视化功能。
是不是听起来就觉得十分强大!
一般情况下,ELK 套装的工作流程为
原始数据 -> LogStash -> ElasticSearch -> Kibana
。
网络流量信息采集协议常见包括 sFlow 和 NetFlow,两种协议都支持采集需要的网络信息,发送到指定的采集器(例如 netflow-analyzer、sFlowTrend 和 softflowd)。其中,sFlow 一般基于采样,NetFlow 则可以基于所有网包信息获取更为精确的信息。
这里展示如何使用 ELK 套装 + NetFlow 来实时可视化网络流量,基于 sFlow 的操作是类似的,也给出了关键步骤。
安装 ELK 套装
安装 ElasticSearch
步骤参考 http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/setup-repositories.html。可以选择源码编译或安装包安装。
以 CentOS 下使用安装包为例,首先添加证书。
$ rpm --import https://packages.elasticsearch.org/GPG-KEY-elasticsearch
添加源文件 /etc/yum.repos.d/elasticsearch.repo,内容为
[elasticsearch-1.4] name=Elasticsearch repository for 1.4.x packages baseurl=http://packages.elasticsearch.org/elasticsearch/1.4/centos gpgcheck=1 gpgkey=http://packages.elasticsearch.org/GPG-KEY-elasticsearch enabled=1
执行安装命令
$ yum update; yum install -y elasticsearch
如果开启了 iptables,需要打开相关的端口。
-A INPUT -p tcp -m state --state NEW -m tcp -m multiport --dports 9200:9300 -j ACCEPT -A INPUT -m pkttype --pkt-type multicast -j ACCEPT
启动 elasticsearch 服务,默认监听在 9200 端口,配置文件在
/etc/elasticsearch/elasticsearch.yml
。$ /etc/init.d/elasticsearch restart
配置 elasticsearch 服务随系统自动启动
$ chkconfig --add elasticsearch
此时,查询本地 9200 端口,会获取类似下面的反馈信息,表示服务启动成功。
$ curl -XGet localhost:9200 { "status" : 200, "name" : "Thunderball", "cluster_name" : "elasticsearch", "version" : { "number" : "1.4.4", "build_hash" : "c88f77ffc81301dfa9dfd81ca2232f09588bd512", "build_timestamp" : "2015-02-19T13:05:36Z", "build_snapshot" : false, "lucene_version" : "4.10.3"} , "tagline" : "You Know, for Search"}
为了让采集到的数据的类型能被 elasticsearch 正确处理,添加如下模板,自动对所有 logstash_netflow- 开头的索引采取指定的类型解析:
curl -XPUT http://localhost:9200/_template/logstash_netflow -d '{ "template" : "logstash_netflow-*", "settings": { "index.cache.field.type": "soft", "index.store.compress.stored": true }, "mappings" : { "_default_" : { "_all" : {"enabled" : false}, "properties" : { "@message": { "index": "analyzed", "type": "string" }, "@source": { "index": "not_analyzed", "type": "string" }, "@source_host": { "index": "not_analyzed", "type": "string" }, "@source_path": { "index": "not_analyzed", "type": "string" }, "@tags": { "index": "not_analyzed", "type": "string" }, "@timestamp": { "index": "not_analyzed", "type": "date" }, "@type": { "index": "not_analyzed", "type": "string" }, "netflow": { "dynamic": true, "path": "full", "properties": { "version": { "index": "analyzed", "type": "integer" }, "first_switched": { "index": "not_analyzed", "type": "date" }, "last_switched": { "index": "not_analyzed", "type": "date" }, "direction": { "index": "not_analyzed", "type": "integer" }, "flowset_id": { "index": "not_analyzed", "type": "integer" }, "flow_sampler_id": { "index": "not_analyzed", "type": "integer" }, "flow_seq_num": { "index": "not_analyzed", "type": "long" }, "src_tos": { "index": "not_analyzed", "type": "integer" }, "tcp_flags": { "index": "not_analyzed", "type": "integer" }, "protocol": { "index": "not_analyzed", "type": "integer" }, "ipv4_next_hop": { "index": "analyzed", "type": "ip" }, "in_bytes": { "index": "not_analyzed", "type": "long" }, "in_pkts": { "index": "not_analyzed", "type": "long" }, "out_bytes": { "index": "not_analyzed", "type": "long" }, "out_pkts": { "index": "not_analyzed", "type": "long" }, "input_snmp": { "index": "not_analyzed", "type": "long" }, "output_snmp": { "index": "not_analyzed", "type": "long" }, "ipv4_dst_addr": { "index": "analyzed", "type": "ip" }, "ipv4_src_addr": { "index": "analyzed", "type": "ip" }, "dst_mask": { "index": "analyzed", "type": "integer" }, "src_mask": { "index": "analyzed", "type": "integer" }, "dst_as": { "index": "analyzed", "type": "integer" }, "src_as": { "index": "analyzed", "type": "integer" }, "l4_dst_port": { "index": "not_analyzed", "type": "long" }, "l4_src_port": { "index": "not_analyzed", "type": "long" } }, "type": "object" } } } } }'
安装 LogStash
从
http://www.elasticsearch.org/overview/logstash/download/
下载源码包或安装包。
以 CentOS 下使用安装包为例:
$ yum install https://download.elasticsearch.org/logstash/logstash/packages/centos/logstash-1.4.2-1_2c0f5a1.noarch.rpm
默认安装到
/opt/logstash/
目录下,配置文件在 /etc/logstash/
目录下。
也可以通过添加 yum 源的方式。
$cat /etc/yum.repos.d/logstash.repo [logstash-1.4] name=logstash repository for 1.4.x packages baseurl=http://packages.elasticsearch.org/logstash/1.4/centos gpgcheck=1 gpgkey=http://packages.elasticsearch.org/GPG-KEY-elasticsearch enabled=1
测试安装是否成功
$ /opt/logstash/bin/logstash -e 'input { stdin { }} output { stdout {}} '
此时,随意从控制台输入内容后回车,会作为一条 log,显示出来。
LogStash 支持配置文件,下面通过配置文件配置 LogStash 将收到的 NetFlow 消息进行格式转换后发送到本机的 ElasticSearch。
假定 NetFlow 消息会被采集器发送到本地的 2055 端口,将消息转换后发送给本地的 elastic search 主机,索引名称为 "logstash_netflow-%{+YYYY.MM.dd}"。
$ cat /etc/logstash/conf.d/netflow.conf input { udp { port => 2055 codec => netflow { definitions => "/opt/logstash/lib/logstash/codecs/netflow/netflow.yaml"}} } output { stdout { codec => rubydebug} elasticsearch { index => "logstash_netflow-%{+YYYY.MM.dd}" host => localhost}}
之后重启 logstash 服务。
安装 Kibana
下载压缩包,并解压。
$ axel https://download.elasticsearch.org/kibana/kibana/kibana-4.0.0-beta3.tar.gz $ tar xzvf kibana-4.0.0-beta3.tar.gz $ cd kibana-4.0.0-beta3
修改 config/kibana.yml,指定 elasticsearch 所在地址,默认认为在本地的 9200 端口。
运行 kibana。
$ ./bin/kibana
启动成功后,会打印如下的信息
The Kibana Backend is starting up... be patient {"@timestamp":"2015-01-08T13:54:43+08:00","level":"INFO","name":"Kibana","message":"Kibana server started on tcp://0.0.0.0:5601 in production mode."}
通过浏览器访问本地的 5601 端口,因为没有数据,所以这个时候看不到任何数据信息。如果有了采集到的数据后,选择
logstash_netflow-*
格式的索引,会查询到所有的 NetFlow 数据信息。配置数据采集
配置 OpenvSwitch
利用下面的命令,创建一个新的网桥 ovs-br,作为要进行抓包的网桥,也可以使用已有网桥。
$ ovs-vsctl add-br ovs-br
并配置一个 IP 地址给网桥内部接口,否则后面启动 Docker 服务会报错(检查避免与 Docker 默认网桥冲突)。
$ ifconfig ovs-br 172.17.42.1/24
下面可以选择使用 NetFlow 采集还是 sFlow 采集,需要对应调整 logstash 中的配置。
打开 NetFlow
COLLECTOR_IP=127.0.0.1 COLLECTOR_PORT=2055 TIMEOUT=10 sudo ovs-vsctl -- set Bridge ovs-br netflow=@nf \ -- --id=@nf create NetFlow targets=\"${COLLECTOR_IP}:${COLLECTOR_PORT}\" active-timeout=${TIMEOUT}
清除 netflow 规则。
$ sudo ovs-vsctl clear Bridge ovsbr netflow
打开 sFlow
$ COLLECTOR_IP=127.0.0.1 $ COLLECTOR_PORT=6343 $ AGENT=eth0 $ HEADER=128 $ SAMPLING=512 $ POLLING=10
其中,AGENT 是从本地的哪个网卡发出流量到采集器。
$ sudo ovs-vsctl -- \ --id=@sflow create sflow agent=${AGENT} target=\"${COLLECTOR_IP}:${COLLECTOR_PORT}\" header=${HEADER} \ sampling=${SAMPLING} polling=${POLLING} \ -- set bridge ovs-br sflow=@sflow
清除 sflow 规则。
$ sudo ovs-vsctl clear Bridge ovsbr sflow
使用 Docker 容器生成流量
这里可以使用 Docker 容器来模拟主机发送流量。
在启动 Docker 服务的时候,使用 -b BRIDGE 或 --bridge=BRIDGE 来指定使用的网桥。或者修改 Docker 配置文件来添加参数并重启服务。
Ubuntu 下配置文件为 /etc/default/docker,重启服务命令为
service docker restart
。
CentOS 下配置文件为 /etc/sysconfig/docker,重启服务命令为
/etc/init.d/docker restart
。
搜索带有 iperf 的镜像。
$ docker search iperf NAME DESCRIPTION STARS OFFICIAL AUTOMATED moutten/iperf Dockerfile to setup an iperf server for ne... 0 [OK] xeor/iperf-server 0 [OK] gdanii/iperf Ubuntu 14.04 with iperf 0 chengping/iperf docker test with iperf 0 m2i3/iperf 0 kiratomato/iperf 0 brendanburns/iperf 0 mbennett/iperf_client for internal testing 0 mbennett/iperf_server 0 mbennett/iperfbase 0 mbennett/iperfserver 0
这里可以任意选择一个,例如下载 gdanii/iperf 镜像。
$ docker pull gdanii/iperf
启动两个容器,一个当作服务端(地址为 172.17.0.2)
$ iperf -s
一个当作客户端(地址为 172.17.0.3),对服务端发起请求。
$ iperf -c 172.17.0.2
这个时候刷新 kibana 页面,就可以看到收集到的每条 flow 的信息了。一条 NetFlow 的信息可能长成下面的样子。
{ "_index": "logstash_netflow-2015.04.28", "_type": "logs", "_id": "tnUt34s8QCyN-E1mc46wqQ", "_score": 1, "_source": { "@version": "1", "@timestamp": "2015-04-28T03:02:27.760Z", "netflow": { "version": "5", "flow_seq_num": "190", "engine_type": "9", "engine_id": "9", "sampling_algorithm": "0", "sampling_interval": "0", "flow_records": "2", "ipv4_src_addr": "10.0.0.2", "ipv4_dst_addr": "10.0.0.4", "ipv4_next_hop": "0.0.0.0", "input_snmp": "2", "output_snmp": "3", "in_pkts": "10", "in_bytes": "980", "first_switched": "2015-04-28T02:46:57.760Z", "last_switched": "2015-04-28T03:02:27.759Z", "l4_src_port": "0", "l4_dst_port": "2048", "tcp_flags": "0", "protocol": "1", "src_tos": "0", "src_as": "0", "dst_as": "0", "src_mask": "0", "dst_mask": "0" }, "host": "9.186.100.88" }, "fields": { "netflow.last_switched": [ 1430190147759 ], "@timestamp": [ 1430190147760 ], "netflow.first_switched": [ 1430189217760 ] } }
这些信息作为记录被存放到了 elastic search 中。可以通过自定义可视化集合来实时监控各个主机的进出流量情况、流量中各种应用的占比情况等等。
也可以使用 kvm 启动两个 vm 挂载到 ovs-br 上进行测试或者利用本地主机作为客户端或服务端之一。这里不再赘述。
当然, ELK 的功能要远大于此,关键在于如何灵活运用,可以通过阅读官方文档进行学习。
参考
- ElasticSearch: http://elasticsearch.org
- http://www.areteix.net/blog/2013/08/network-flow-monitoring-with-open-vswitch/
- http://blogs.cisco.com/security/step-by-step-setup-of-elk-for-netflow-analytics
- netflow-analyzer: http://www.solarwinds.com/products/freetools/netflow-analyzer.aspx
- sFlowTrend: http://www.inmon.com/products/sFlowTrend.php
- softflowd: https://code.google.com/p/softflowd