流量分析的瑞士军刀:Zeek


介绍

概览

Zeek是一个开源的、被动网络流量分析软件。它主要被用作安全监测设备来检查链路上的所有流量中是否有恶意活动的痕迹。但更普遍地,Zeek支持大量安全领域外的流量分析任务,包括性能测量和帮助排查问题。

在一个地方部署Zeek,将会得到大量不同类型的日志文件,它们详细记录了当前网络中的活动信息。这些日志不仅包含了当前网络中每一个连接的详细记录,而且还包含了应用层信息,比如,所有的HTTP会话以及他们的请求URI、关键头部字段、MIME类型和服务器响应;DNS请求和响应;SSL证书;SMTP会话的关键信息;诸如此类。默认情况下,Zeek把这些信息写入以tab分隔的日志文件中以便其它软件进行后续的处理。但用户也可以选择其它的输出格式和后端存储,比如数据库。

除了日志外,Zeek还内置了许多用于一系列分析和检测任务的功能,包括从HTTP会话中提取文件,通过外部接口检测恶意软件,报告网络上看到的易受攻击的软件版本,识别流行的web应用,检测SSH爆破,验证SSL证书链,等等。

但是,理解Zeek的关键在于意识到即使系统已经内置了如此强大的功能,它本质上还是一个用于流量分析的完全可自定义、可修改的平台:Zeek提供给用户一种特定领域的、图灵完全的脚本语言用于表达任意的分析任务。概念上,你可以认为Zeek是“特定领域的Python”(或者Perl):就像Python一样,系统内置了大量的功能(“标准库”),但你并没有被局限于系统附带的功能,而是可以通过编写你自己的代码来使用Zeek。事实上,所有Zeek的默认分析,包括所有的日志记录,是这些脚本的结果;没有硬编码的特定分析在系统的核心代码中。

Zeek可以运行在标准硬件上,因此提供了相对于昂贵的专有方案的低成本替代方案。除了价格外,Zeek在能力方面也远远地走在了其它网络监测工具的前面,它们始终都只能执行一些硬编码的分析任务。我们尤其要强调Zeek并不是传统意义上的基于特征的入侵检测系统(IDS)。尽管它确实支持这样的功能,但Zeek的脚本语言实际上极大地 丰富了寻找恶意活动的方法,包括语义误用、异常检测以及行为分析。

许多地方部署Zeek是为了保护它们的网络基础设施,包括许多大学、研究机构、超算中心、开放科学组织以及大公司。Zeek主要关注于高速率、大流量的网络监测,而且越来越多的地方正在使用系统来监测它们的10GE网络,其中一些甚至已经开始使用100GE的链路。Zeek通过可扩展的负载均衡来达到如此高的性能:这些地方通常运行“Zeek集群”——一个高速率的前端负载均衡设备分发流量到多个后端PC,每个PC运行Zeek实例来分析它们那份流量。中央管理系统负责协调进程,在后端之间同步状态以及向操作人员提供一个中央管理界面用配置和查看聚合后的日志。Zeek的管理框架——ZeekControl——支持开箱即用的集群配置。

特性

Zeek通过它的脚本语言支持一系列的分析。然而即使不做修改它也自带了一整套功能。

  • 部署

    • 运行在标准硬件上的类Unix系统中(包括Linux、FreeBSD和MacOS)。
    • 连接网络分流器或者监控端口的完全被动式的分析。
    • 使用标准的libpcap接口用于捕获数据包。
    • 实时和离线分析。
    • 对于大规模部署的集群支持。
    • 对于单独和集群部署的统一管理框架。
    • 使用BSD开源协议。
  • 分析

    • 用于离线分析和取证的全面的活动日志。
    • 端口无关的应用层协议分析。
    • 支持许多应用层协议(包括DNS、FTP、HTTP、IRC、SMTP、SSH、SSL)。
    • 通过应用层协议传输的文件内容分析,包括MD5/SHA1指纹计算。
    • 全面的IPv6支持。
    • 隧道检测和分析(包括Ayiya、Teredo、GTPv1)。Zeek解开隧道的封装然后继续分析它们的内容,就好像没有隧道一样。
    • 协议分析过程中广泛的完整性检查。
    • 支持IDS形式的模式匹配。
  • 脚本语言

    • 能够表达任意分析任务的图灵完全语言。
    • 基于事件的编程模型。
    • 特地领域的数据类型,比如IP地址(透明地处理IPv4和IPv6)、端口和计时器。
    • 广泛支持跟踪和管理随着时间变化的网络状态。
  • 接口

    • 默认输出到结构化的ASCII日志中。
    • 可选的后端有ElasticSearch和DataSeries。其它的数据库接口还在计划中。
    • 外部输入实时集成到分析中。数据库的实时输入还在计划中。
    • 和外部程序交换Zeek事件的C函数库。实现了Perl、Python和Ruby的绑定。
    • 能够在脚本语言中触发任意的外部进程。

历史

Zeek History Timeline

Zeek的历史时间线(点击以放大)

Zeek的历史比大多数人意识到的还要悠久(以前叫做“Bro”,不是“Zeek”)。Vern Paxson 于20年前设计和实现了最初的版本。Vern在1995年开始编写代码,当时他还是 Lawrence Berkeley National Laboratory (LBNL) 的一个研究员。伯克利实验室在1996年开始了实际的部署,然后USENIX Security Symposium杂志在1998年发表了Bro的最初的论文(后来在2003年的 期刊论文 中进行了完善改进)。2003年, National Science Foundation (NSF) 开始支持 International Computer Science Institute (ICSI) Bro的研究和开发,现在Vern正在领导ICSI下的 Networking and Security 小组 。在过去的这些年里,一个由ICSI研究者和学生组成的不断增长的团队不断为Zeek添加新的功能,而且LBNL仍然在 Department of Energy (DOE) 的资助下继续支持。

许多Zeek的能力都起源于学术研究项目,研究结果经常发表在顶级会议上。但是,Zeek成功的关键是从一开始就弥合了传统的学术研究和实际应用之间的裂隙,这为研究提供了坚实的基础来保证构想的方案代表了真实世界的挑战。甚至,随着Zeek用户群体的增长,以研究为中心的开发模式最终成为了系统演进的瓶颈:研究经费并不愿意支持软件开发和维护这样平凡的事情,即使这些事情对于用户体验非常重要。虽然Zeek的能力总是远在传统的系统之上,但是一次成功的部署过去常常需要杰出的专业技术,而且还要投入大量时间与Zeek陡峭的学习曲线做斗争。在2010年,NSF开始着手解决这个问题,通过授予ICSI权限,让它从SDCI项目脱离出来以完全致力于Zeek的开发工作。在NSF的支持下, National Center for Supercomputing Applications (NCSA) 加入团队成为了主要的伙伴,然后项目从2.0版本开始修复许多用户反映的问题。自从这个版本发布以来,Zeek在不同环境下的部署数量经历了巨大的增长,现在Zeek团队正在此基础上继续努力增强系统的能力以面对未来网络的挑战。

架构

Zeek Architecture

Zeek的内部架构。

架构上,Zeek分为两层。它的 事件引擎 (或者 core )把进入的数据包流简化为一系列高级的 事件 。这些事件以策略无关的形式反映了网络活动,比如,他们描述了 什么 已经出现,而不是 为什么 ,或者是否它是特别的。举个例子,网络中的每一个HTTP请求会生成一个相应的 http_request 事件,它包含了涉及到的IP地址和端口、请求的URL以及使用的HTTP版本。但是这个事件并没有传递出更进一步的 解释 ,比如,URI是否和恶意网站有关系。

更高级的语义实际上是由Zeek的第二个主要组件实现的,它负责执行一系列使用Zeek脚本语言编写的 事件处理 程序。这些脚本能够表达安全策略,比如,当监测设备检测到不同类型的活动时该采取什么活动。更普遍地,它们能够从输入流量中得到任何想要的属性和统计数据。Zeek语言内置了许多特定领域的类型和支持功能;而且,更重要的是,允许脚本维护时间状态,从而使得它们能够对不同会话和主机之间的通信进行追踪和关联。Zeek脚本能够生成实时的警报和执行任意的外部程序,比如,触发对一次攻击的积极响应。

安装

Zeek 的安装还是比较简单的,笔者主要是在 Mac 上以及 Linux 上安装。这两个操作系统的安装方式还是比较类似的。对于 Linux 而言,需要安装一些依赖包:

sudo yum install cmake make gcc gcc-c++ flex bison libpcap-devel openssl-devel python-devel swig zlib-devel

这里我有遇到一个问题就是可能你的 Redhat 镜像源里面没有包含 libpcap-devel,因为这个包在可选的范围内,而内网的服务器又没有互联网连接。可以通过手工下载相应版本的 libpcap 以及 libpcap-devel 即可。

Mac 上需要的依赖更少一点,首先需要确保安装了 xcode-select,如果没有安装,可以通过 xcode-select --install 来进行安装。Mac 上只需要安装依赖 cmake, swig, openssl, bison 即可,可以通过 Homebrew 来进行安装。

依赖包安装完毕之后就可以安装 Zeek,其实是可以通过包管理工具来进行安装的,不过这里我推荐使用基于源码的安装方式,安装比较简单而且还容易排查问题。从 Zeek 的 Github Release 即可下载源码包,目前我安装的是 3.0.0 版本,注意一点是,如果使用最新的版本,可能需要 7.0 以上版本的 cmake,因为需要 C++17 的语言特性。而一般镜像源默认的 cmake 版本是4+版本,所以如果你的服务器也无法上互联网,建议可以安装 3.0.0 版本。

./configure & make & make install

安装使用上面的命令就可以了,不过 make 的时间还是比较长的,这个取决于你机器的性能,不过一般安装还是需要半个小时到一个小时,这也是因为 C++ 编译速度比较慢的原因。

集群安装

集群安装的方式和单机的方式不太一样。之前在测试环境使用的都是单机模式,集群则可以管理多个实例,后来我也尝试了通过集群的方式来进行安装。如果需要配置集群的话,建议安装 PF_RING,PF_RING 可以加速网络包的速度。对于 Zeek 集群上的每个 worker 都是需要安装 PF_RING,但只需要在 manager 上安装 Zeek 就可以了,可以通过 zeekctl 在其它 worker 上安装 Zeek。不过需要确保可以通过 ssh 到其它 woker 机器上,可以通过公钥的形式来实现,将 manager 的公钥放到其它 worker 的 authorized_keys 中。

PF_RING 的安装步骤相对来说多了一些,但也是按照说明安装即可。和上面的单机安装方式不同的是集群安装的方式的时候,安装 Zeek 需要配置前缀。

安装 PF_RING

tar xvzf PF_RING-5.6.2.tar.gz
cd PF_RING-5.6.2/userland/lib
./configure --prefix=/opt/pfring
make install
cd ../libpcap
./configure --prefix=/opt/pfring
make install
cd ../tcpdump-4.1.1
./configure --prefix=/opt/pfring
make install
cd ../../kernel
make
make install
modprobe pf_ring enable_tx_capture=0 min_num_slots=32768

安装 Zeek

./configure --with-pcap=/opt/pfring
make 
make install

确保 Zeek 正确关联到了 PF_RING 中的 libpcap 库中

ldd /usr/local/zeek/bin/zeek | grep pcap
      libpcap.so.1 => /opt/pfring/lib/libpcap.so.1 (0x00007fa6d7d24000)

接着就是通过 PF_RING 来进行 Zeekctl 的配置,Zeek 的安装路径一般都在 /usr/local/zeek。通过 /usr/local/zeek/etc/node.cfg 来进行集群结点的配置,在集群配置中,manager, proxy 以及 worker 是必须的,如果不设置 logger,默认将 manager 作为 logger。

[worker-1]
type=worker
host=10.0.0.50
interface=eth0
lb_method=pf_ring
lb_procs=10
pin_cpus=2,3,4,5,6,7,8,9,10,11

接下来只需要通过 zeekctl install 就会在其它实例上来进行安装了。如果安装过程中出现了问题,可以通过 zeekctl diag woker-1 来排查具体的原因。

Zeek 结合被动扫描器的玩法

上面讲的都是 Zeek 的安装,下面聊一下 Zeek 和被动扫描器的结合。被动扫描器的效果往往取决于流量的质量和数量,在我们的实际实践中,发现通过 Zeek 获取的流量占我们被动扫描器测试流量的绝大一部分。Zeek 对于 http 解析的日志都会存储在 /usr/local/zeek/logs 中。如果 Zeek 是启动状态,那么 http.log 的路径会在 /usr/local/zeel/logs/current 中,而历史日志则会被打包。如果使用 Zeek 去捕获流量的时候,日志往往会占很大的存储,所以要记得修改 Zeek 日志的存储路径,否则很容易就把系统盘塞满。

通过脚本自定义 http.log

http.log 中其实已经包含了丰富的字段,常见的一些字段如下:

# ts          uid          orig_h        orig_p  resp_h         resp_p
1311627961.8  HSH4uV8KVJg  192.168.1.100 52303   192.150.187.43 80

不过里面还有一些信息是缺失的,比如一些 http 请求头以及 POST 请求的请求体,为了添加这些字段,可以通过自定义 Zeek 脚本来实现,Zeek 脚本的能力真的非常强大,通过脚本其实有很多更高级的玩法。

添加请求头

@load base/protocols/http/main
module HTTP;
export {
    redef record Info += {
            header_host:    string  &log    &optional;
            header_accept:  string  &log    &optional;
            header_accept_charset:  string  &log    &optional;
            header_accept_encoding:  string  &log    &optional;
            header_accept_language:  string  &log    &optional;
            header_accept_ranges:  string  &log    &optional;
            header_authorization:  string  &log    &optional;
            header_connection:  string  &log    &optional;
            header_cookie:  string  &log    &optional;
            header_content_length:  string  &log    &optional;
            header_content_type:  string  &log    &optional;
    };
}
event http_header(c: connection, is_orig: bool, name: string, value: string) &priority=3
        {
        if ( ! c?$http )
                return;
        if ( is_orig )
                {
                if ( log_client_header_names )
                        {
                switch ( name ) {
                                case "HOST":
                                    c$http$header_host = value;
                                    break;
                                case "ACCEPT":
                                    c$http$header_accept = value;
                                    break;
                                case "ACCEPT-CHARSET":
                                    c$http$header_accept_charset = value;
                                    break;
                                case "ACCEPT-ENCODING":
                                    c$http$header_accept_encoding = value;
                                    break;
                                case "ACCEPT-LANGUAGE":
                                    c$http$header_accept_language = value;
                                    break;
                                case "ACCEPT-RANGES":
                                    c$http$header_accept_ranges = value;
                                    break;
                                case "AUTHORIZATION":
                                    c$http$header_authorization = value;
                                    break;
                                case "CONNECTION":
                                    c$http$header_connection = value;
                                    break;
                                case "COOKIE":
                                    c$http$header_cookie = value;
                                    break;
                                case "CONTENT-LENGTH":
                                    c$http$header_content_length = value;
                                    break;
                                case "CONTENT-TYPE":
                                    c$http$header_content_type = value;
                                    break;
                                }
            }
                }
        }

添加 POST 请求体

export {
    ## The length of POST bodies to extract.
    const http_post_body_length = 200 &redef;
}
redef record HTTP::Info += {
    postdata: string &log &optional;
};
event log_post_bodies(f: fa_file, data: string)
    {
    for ( cid in f$conns )
        {
        local c: connection = f$conns[cid];
        if ( ! c$http?$postdata )
            c$http$postdata = "";
        # If we are already above the captured size here, just return.
        if ( |c$http$postdata| > http_post_body_length )
            return;
        c$http$postdata = c$http$postdata + data;
        if ( |c$http$postdata| > http_post_body_length )
            {
            c$http$postdata = c$http$postdata[0:http_post_body_length] + "...";
            }
        }
    }
event file_over_new_connection(f: fa_file, c: connection, is_orig: bool)
    {
    if ( is_orig && c?$http && c$http?$method && c$http$method == "POST" )
        {
        Files::add_analyzer(f, Files::ANALYZER_DATA_EVENT, [$stream_event=log_post_bodies]);
        }
    }

通过上述的脚本就可以添加一些请求头以及 POST 请求的请求体,完整的脚本可以参考 http-custom。脚本编写完毕,需要通过 zeekctl 部署才能生效,步骤也非常简单。

mv http-custom /usr/local/bro/share/bro/base/protocols
echo '@load base/protocols/http-custom' >> /usr/local/bro/share/bro/site/local.bro
zeekctl deploy

对于被动扫描器,我们目前的方案是通过 Filebeat 去采集日志然后输出给 Logstash 做处理,处理完毕之后再输出到 Kafka。

img

Filebeat 加 Logstash 适用于多种场景,在日常的各种日志采集场景都能派上用场。通过 Logstash 可以完成日志灵活的处理,因为 Logstash 里面包含了各种丰富的插件,几乎可以完成对于日志的任何操作。比如为了保证 POST 请求体保证传输的正确性,可以通过 base64 来进行编码。通过 logstash-filter-base64 可以遍历地实现字段的编码或者解码。通过 filter 中的 mutate 插件可以增加字段或者删除字段。

base64 {
     field => "postdata"
     action => "encode"
   }

通过这种方案还有一个优势就是我们还可以将我们的日志输出到别的地方,比如 es,这个也可以方便后续排查日志问题。

不过我在后面又发现了一种新的方案,可以通过 Zeek 的插件,将 http.log 直接输出到 Kafka,这个方案的优点主要是更高效,同时也节省了一些成本,毕竟 Logstash 需要的机器性能还是比较大的。对于这个方案主要是两个问题,第一个问题是首先需要处理好日志的格式,这样保证后续处理地便利性;第二个问题是如何将日志直接从 Zeek 输出到 Kafka。其实我是先解决了第一个问题再解决第二个问题的,因为第二个问题的处理的方式更灵活,得益于 Zeek 脚本的便利性,肯定是可以实现的。

img

metron-bro-plugin-kafka 是 Apache 官方的一个 Bro 的插件,不过因为 Zeek3.0.0 是可以兼容的,所以这个插件是可以使用的。这个插件有两种安装方式,一种是通过 bro-pkg (Bro 的官方包管理工具)来进行安装,另外一种则是通过手工安装。由于网络的原因,我更推荐使用手工安装的方式,我尝试通过 bro-pkg 的方式来进行安装,速度特别慢。

安装 librdkafka

curl -L https://github.com/edenhill/librdkafka/archive/v0.11.5.tar.gz | tar xvzcd librdkafka-0.11.5/
./configure --enable-sasl
make
sudo make install

安装插件

./configure --bro-dist=$BRO_SRC
make
sudo make install

这里有一个坑就是安装文档根本就没有说 $BRO_SRC 是哪个路径,所以安装的时候总是报错,后来才弄清楚这个路径其实就是当初 Zeek 解压后的路径,即 Zeek 安装包的路径。

验证结果

zeek -N Apache::Kafka
Apache::Kafka - Writes logs to Kafka (dynamic, version 0.3)

接着就是将 http 的日志进行处理,因为在原始的 http.log 中有还多字段是我们并不需要的。在研究了官方文档之后,可以通过 Filters 可以定义一个新的日志文件,可以拷贝其它的日志输出到新的文件,可以自定义字段,方式比较灵活。另外还可以通过 Writer 可以将日志写入到 sqlite 数据库中。不过,这里我们主要是通过插件将日志写入到 Kafka。

我们的目标是获取 http.log 中的部分字段,所以可以通过 Filters 来实现日志文件的复制并且对日志字段进行过滤,基于 KafkaWriter 将日志文件直接写入到 Kafka 中。为了定义 Filter,在 /usr/local/zeek/share/zeek/base/protocols/http/main.zeek 的 zeek_init 函数中进行定义:

event zeek_init() &priority=5
     {
     Log::create_stream(HTTP::LOG, [$columns=Info, $ev=log_http, $path="http"]);
     Analyzer::register_for_ports(Analyzer::ANALYZER_HTTP, ports);
     local filter: Log::Filter =
         [
         $name="kafka-http",
         $include=set("host","id.resp_p","uri"),
         $writer=Log::WRITER_KAFKAWRITER
         ];
      Log::add_filter(HTTP::LOG, filter);
     }

另外,记得在 /usr/local/zeek/share/zeek/site/local.zeek 中定义 Kafka 的 topic 和 Broker:

redef Kafka::topic_name = "bro-test";
redef Kafka::kafka_conf = table(
    ["metadata.broker.list"] = "127.0.0.1:9092"
);

最后记得使用 zeekctl deploy 重新部署一下,这样脚本就生效了,日志就可以直接写入到 Kafka 中,大大提高效率。


文章作者: sfc9982
版权声明: 本博客所有文章除特別声明外,均采用 CC BY-NC-ND 4.0 许可协议。转载请注明来源 sfc9982 !
  目录