2011年9月18日星期日

一场风暴(Storm)即将来临:关于新版本的更多细节和计划

我们收到了很多问题,问我们,现在既然 BackType 已经被 Twitter 收购了,我们会在 Storm 上面做些什么。这里,我很高兴地宣布我们将在 9 月 19 日在 Strange Loop 发布 Storm!要想进一步了解情况,请查看讲座信息

在我关于 Storm 的预览贴里面,我探讨了 Storm 能够怎么应用在大量不同的实时计算问题上面。在这篇文章里面,我会给出关于 Storm 更多的细节和它用起来什么样子。

下面是 Storm 三大使用方法的总结:

  1. 流处理:Storm 可以用于处理数据流并实时更新数据库。和那些通过网络队列和工人(worker)处理流的标准方式不同,Storm 能够容错且易于扩展。
  2. 持续计算:Storm 可以处理持续查询并把结果以流的形式实时返回给客户端。一个例子就是把 Twitter 上的趋势话题以流的形式传回给浏览器。浏览器会趋势话题在发生的时候实时地看到它们。
  3. 分布式 RPC:Storm 可以用于在执行过程中并行化高强度的查询。这个意思就是说你的 Storm 拓扑是一个分布式函数,其等待调用消息。当它收到调用消息的时候,他对查询进行计算并送回结果。分布式 RPC 的例子有搜索查询的并行化和在很多大集合上面进行集合操作。
Storm 的美妙之处在于它可以用很简单的一组原语来解决多种多样的问题。

Storm 集群中的部件

一个 Storm 集群表面上看起来和一个 Hadoop 集群很像。在 Hadoop 上面你运行“MapReduce 工作”,在 Storm 你运行的是“拓扑(topologies)”。“工作”和“拓扑”他们本身非常不同——一个主要的不同点就是 MapReduce 工作终究会完成,而拓扑会永远的处理消息(除非你把它杀掉)。

Storm 集群上有了两类节点:主(master)节点和工人(worker)节点。主节点运行一个称为  “Nimbus” 的 daemon,类似于 Hadoop 里面的 JobTracker。Nimbus负责吧代码分布到集群里面的各个地方,给机器指定任务,并监测故障。

每个工人节点会运行一个叫做 “Supervisor(监督员)” 的 daemon。监督员监听分配到其机器的工作,并根据需要和 Nimbus 分配给它的任务启动和停止工人进程。每个工人进程执行拓扑的一部分;一个执行中的拓扑包括很多分布在很多机器上的很多工人进程。


Nimbus 和 Supervisor之间的协调是通过 ZooKeeper 集群来实现的。此外,Nimbus daemon 和 Supervisor daemon 都是快速失败的,且无状态;所有的状态都保存在 ZooKeeper 中或者本地磁盘上。这意味着你可以用 kill -9 命令杀死Nimbus 或者 Supervisor,然后他们会重启,像什么都没有发生过一样。这个设计让 Storm 集群变得难以置信的稳定。我们有一些运行了几个月从来没有维护过的拓扑。

运行一个 Storm 拓扑

运行一个拓扑很简单。首先,你把你所有的代码和他们依赖的库包装到一个 jar 里面去。然后,你运行像这样一个命令:
storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2
这个命运会用参数 arg1 和 arg2 来运行类 backtype.storm.MyTopology。类的 main 函数定义了拓扑并将其提交给 Nimbus。命令中的 storm jar 部分处理了和 Nimbus 的链接以及上传 jar 文件。

拓扑的定义就只是 Thrift 结构而已,Nimbus 也是一个 Thrift 服务,你可以用任何语言创建和提交拓扑。上面的就是一个最简单的用基于 JVM 的语言来实现的例子。

流和拓扑

现在让我们深入看看 Storm 提供的用于进行可扩展实时计算的抽象概念。在我介绍完主要的抽象概念之后,我会用一个具体的 Storm 拓扑的例子把所有的东西系在一起。

Storm 里面最核心的抽象概念是“流”。一个流是一个无边界的元组(tuple)序列。Storm 提供了一些列用于把一个流在分布式环境中可靠地转换成一个新流的原语(primitives)。比如,你可以把一个推文的流转换成一个趋势话题的流。

Storm 提供的进行流转换的基本原语是 “spout(喷口)” 和 “bolt(螺栓)”。Spout 和 bolt 都各有接口,你需要实现他们来执行你的应用程序的逻辑。

Spout 是一个流的来源。比如,一个 spout 可以从 Kestrel 队列中读取元组,并把他们输出为一个流。一个 spout 也可以连接到 Twitter API 并输出一个推文流。

Bolt 则用于进行一步流转换操作。它根据输入的流创建一个新流。复杂的流操作,比如从推文流计算趋势话题流,则可能需要多个步骤,也就是多个 bolt。

多步的流转换则被包装成为一个拓扑,拓扑是你可以提交给 Storm 集群执行的最高级别的抽象概念。一个拓扑是一个流转换的图,其每个节点都是一个 spout 或者 bolt。图中的边则表示哪个 bolt 订阅了哪个流。当 spout 或者 bolt 输出一个元组到一个流的时候,它也同时把这个元组发送到了该流所有的订阅者哪里。


Storm 里面的所有东西都是分布式并行运行的。Spout 和 bolt 以集群中的多个线程的形式执行,他们以一种分布式的方式把消息相互传递。消息永远也不会经过任何形式的中央路由节点,也没有任何中间队列。元组总是直接地从创建它的线程传递到消耗它的线程中去。

Storm 保证流过拓扑的每条消息都会被处理,即使某台机器死掉了且它上面的消息丢了。Storm 不用中间队列来实现这一切是它能够工作的关键和它这么快的原因。

让我们看一个具体的 spout、bolt 和拓扑的例子,来增强我们对概念的理解。

一个简单的拓扑

我要展示给你们的简单的拓扑是 “数单词数流”。这个拓扑包含了一个输出句子的 spout,最后的 bolt 输出每个单词在所有句中累积出现的次数。 每次一个单词的计数更新的时候,一个新的计数都会被输出。拓扑是这个样子:


在 Java 里面你这么定义拓扑:



这个拓扑的 spout 从位于 kestrel.backtype.com 端口 22133 的 Kestrel 服务器上的 “sentence_queue” 队列里面读出句子。

我们使用了 setSpout 方法来把一个Spout 和一个它独有的 id 加入到拓扑中去。拓扑中的每个节点必须有一个 id,这个 id 会被别的 bolt 用来订阅该节点的输出流。在这个拓扑里面,KestrelSpout 的 id 是 1。

setBolt 函数则被用于把 Bolt 插入到拓扑中去。这个拓扑里面定义的第一个 bolt 是 SplitSentence bolt。这个 bolt 把一个句子流转换成一个单词流。让我们看看 SplitSentence 的实现:

这里最核心的方法就是 execute 方法。如你所见,它把一个句子分拆成单词并把每个单词当成一个元组输出。另一个重要的方法是 declareOutputFields,其声明了 bolt 的输出元组的 schema(结构)。这里它声明了其会输出一个只含一个叫做 “word” 字段的1-元组。

你可以用任何语言实现 Bolt。下面是一个用 Python 实现的 bolt:

setBolt 的最后一个参数是你希望 bolt 使用的并行量。SplitSentence bolt 用了并行量 10,也就是说会有 10 个线程在 Storm 机群里面并行执行这个 bolt。要扩展一个拓扑,你需要做的就是增大里面成为瓶颈的 bolt 的并行量。

setBolt 方法返回一个对象,你用它来定义 bolt 的输入。继续看我们的例子,SplitSentence bolt 用乱序分组(shuffle grouping)的方式订阅了部件 “1” 的输出流。“1” 表示先前定义了的 KestrelSpout。我在后面会解释这个乱序分组。现在重要的是,SplitSentence bolt 已经会消耗 KestrelSpout 输出的每一个元组了。

一个 bolt 可以通过把输入定义串联起来的方式订阅多个输入流,比如:

你可以用这个功能来实现诸如流连接(Join)等操作。

这个数单词拓扑里面的最后一个 bolt,WordCount,从 SplitSentence 的输出里面读出单词,然后输出更新了的单词计数。下面是 WordCount 的实现:

WordCount 在内存里面存了一个从单词到计数映射的 map。当它看到一个词的时候,它就在内部 map 里面更新单词计数,然后把更新了的计数作为一个元组输出。最后,在 declareOutputFields 方法里面,bolt 声明了它会输出一个含有 “word” 和 “count” 两个字段的 2-元组。

如果任务死了,保存在内存里面的 map 也会丢失。如果在任务死去的时候保存 bolt 的内部状态对你来说很重要,你可以用像 Riak、Cassandra 或者 Memcached 之类的外部数据库来保存单词计数状态。这里我们用内存中的 HashMap 只是为了方便。

最后,WordCount bolt 声明了它的输入来自组件 2,也就是 SplitSentence bolt。它通过在 ”word“ 字段上进行“字段分组(field grouping)”消耗这个流的内容。

”字段分组“,和前面我稍微提到的“乱序分组”一样,都是一种“流分组“。”流分组“是最后一块把拓扑紧紧联系在一起的组件。

流分组(Stream Grouping)

流分组告诉拓扑如何在两个部件之间传递元组。请记住,spout 和 bolt 是作为很多任务在整个集群里面并行执行的。如果你看看拓扑在任务层次是怎么执行的,它应该看起来是这个样子:


当 Bolt A 的一个任务输出元组到 Bolt B 的时候,它应该输出给哪个任务呢?

”流分组“就可以回答这个问题,它告诉 Storm 如何在两组人物之间传递元组。有几种不同的流分组。

最简单的分组方式叫做”乱序分组(shuffle grouping)“,它只是把元组发送到一个随机的任务上。在数单词拓扑里面我们用了乱序分组,它把来自 KestrelSpout 的元组发送到 SplitSentence bolt。它可以均匀地在所有 SplitSentence bolt 任务上分布元组处理工作。

更有趣的一种分组叫做”字段分组(field grouping)“。在 SplitSentence bolt 和 WorldCount bolt 之间我们使用了字段分组。对于 WordCount bolt 而言,每次把同样的单词发送到同样的任务至关重要。不然,超过一个任务会看到同一个单词,它们每个都会输出一个不正确的计数值,因为它们没有谁有完整的信息。字段分组让你可以把多个流按照其字段的某个子集分组。这就导致了相同字段子集值都送到了同样的任务。因为 WordCount 通过 “word” 字段上的字段分组订阅了 SplitSentence 的输出流,同样的单词总是会被送到同样的任务中去,bolt 于是就是产生正确的输出。

字段分组是实现流连接(Join)、流聚集(aggregation)以及很多其他功能的基础。在底层,字段分组是通过一致散列函数(consistent hashing)实现的。

此外还有几种分组方式,但是他它们超过了本文讨论东西的范围。

有了这些,你现在应该有了所有你需要用以理解数单词拓扑的东西。拓扑并不需要太多的代码,而且它们是完全可扩展且容错的。不管你是每秒处理 10 条消息还是每秒 10 万条消息,你可以通过调整每个部件的并行量就可以增大或者减小拓扑的规模。

Storm 所隐藏的复杂性

Storm 提供的抽象非常简单。一个拓扑由 spout 和 bolt 组成,你用流分组把他们连接在一起使得数据流动。你只需指定每个部件需要的并行量,然后把所有东西放到 jar 文件里面去,把拓扑和代码都提交给 Nimbus,Storm 就会让你的拓扑永久运行。下面是 Storm 在底层非常鲁棒地实现这些抽象的方法一瞥:

  1. 消息处理保证:Storm 保证从 spout 出来的每个元组都会被拓扑完全处理。为了实现这项功能,Storm 跟踪一个元组引发的消息树。如果一个元组没有被完全处理,Storm 会从 spout 重演(reproduce)这个元组。Storm 集成了一些聪明的技术来高效地跟踪跟踪树。
  2. 鲁棒的进程管理:Storm 的主要任务之一是管理集群里面的进程。当一个新的工人(worker)被指定给一个 supervisor 之后,工人应该被尽快启动。当此工人不再被指定给这个 supervisor 时,它应该被杀掉且清除。

    一个把这个做得特别糟的系统的例子就是 Hadoop。当 Hadoop 启动一个任务的时候,任务退出的担子是放在任务自己身上的。不幸的是,任务有时候会退出失败然后成为孤儿进程,耗尽其它任务的内存和资源。

    在 Storm 里面,杀死一个工人进程的担子是由启动该进程的 supervisor 承担的。在 Storm 里面就根本不会有孤儿任务,不管你怎么增重机器负担或是发生多少错误。做到这一点有一些微妙之处,因为 Storm 不仅需要跟踪它启动的工人进程,他也需要跟踪工人进程启动的子进程(当 bolt 是用别的语言编写的时候,会有一个子进程被启动)。

    Nimbus daemon 和 supervisor daemon 都是无状态的和快速失败(fail-fast)的。如果它们死了,正在运行中的拓扑不会受到影响。这和 Hadoop 的工作方式也形成了鲜明的对比。
  3. 错误检测和自动任务重分配:一个运行中的拓扑里面的任务会向 Nimbus 发送心跳信息以示它们在正常运行。Nimbus 监测这些心跳信号,如果它们超时了,它会重新分配任务。此外,集群里面的所有正在给失败任务发送消息的任务都很快就会重新连结到新任务。
  4. 高效的消息传递:在任务之间的消息传递并没有使用任何中间队列,取而代之,消息是在任务之间通过 ZeroMQ 直接传递的。这比使用中间队列更简单且高效得多。ZeroMQ 是一个聪明的“超级套接字(super-socket)”库,其使用多种技巧来最大化消息传递带宽。比如,它会检测网络是否繁忙,并自动的把消息分批送往目的地。

    另一个进程间消息传递的重要部分是高效的消息序列化和反序列化。又一次地,Storm 把这一切为你自动化了。默认情况下,你可以在元组中使用任何原语类型、字符串、或者二进制记录。如果你想使用另一种类型,你只需要实现一个简单的接口告诉 Storm 你想如何序列化它。然后,每当 Storm 遇到这个类型的时候,它会自动使用这个序列化器。
  5. 本地模式和分布式模式:Storm 有一个“本地模式”,它可以在一个进程里面模拟一整个 Storm 集群。这可以让你快速迭代开发你的拓扑且为你的拓扑写单元测试。你可以把同样的代码在本地模式里运行,就像他们在集群模式里面运行一样。
Storm 非常易于使用、配置以及运营。对于处理每秒几百条消息的个人开发者,到每秒处理几十万条消息的大公司,Storm 都同样有效。

和“复杂事件处理(Complex Event Processing, CEP)”的关系

Storm 和诸如 EsperStreambase、和 S4 等“复杂事件处理”系统处于同一领域。在这些系统中,最接近且可以比较是 S4。Storm 和 S4 之间的最大区别是 Storm 在故障的情况下也可以保证消息会被处理,而 S4 有时会丢失消息。

有些复杂事件处理系统有内置的数据存储层。有了 Storm,你会用像 Cassandra 或者 Riak 之类的外部的数据库和你的拓扑一起工作。一个数据存储系统不可能满足所有的应用程序,因为不同的应用程序有不同的数据模型和访问模式。Storm 是一个计算系统,而不是一个存储系统。然而,Storm 有一些强大的工具让你实现数据本地化,即使你使用了一个外部数据库。

总结

我们现在只探讨了 Storm 的九牛一毛。Storm 核心的“流”概念有着比我所展示的更广泛的使用方法——我们还没有探讨诸如多路流、隐含流、或直接分组。我展示了 Storm 的两个主要抽象,spout 和 bolt,但是我们还没有探讨 Storm 的第三个,可能也是最强大的一个抽象,“状态 spout”。我们也没有探讨如何用 Storm 实现分布式 RPC,没有探讨 Storm 的强大的自动部署功能,你可以按一个按钮就在 EC2 (译者注:Amazon 的云计算平台)上面创建一个 Storm 集群。

所有的这些东西,你都得等到 9 月 19 号。在那以前,我会添加一些 Storm 文档以便你能在它发布之后快速地把它跑起来。我们为 Storm 的发布感到非常兴奋,我也希望能在 Strange Loop 上,在它发布之时结识各位。

——Nanthan Marz (@nathanmarz)

原文发布日期:2011 年 8 月 4 日

没有评论:

发表评论