2011年11月23日星期三

SpiderDuck:Twitter 的实时 URL 抓取器

推文中常常含有指向 web 各种内容的链接,包括图像、视频、新闻文章以及博客帖。SpiderDuck 是Twitter 的一项用于对这些链接进行实时抓取、解析下载内容并提取有趣的元数据的服务,并且,它使得其它的 Twitter 服务能够在数秒钟内即可使用这些数据。

Twitter 的许多的团队都需要访问链接的内容,尤其是需要用实时内容来改进 Twitter 的产品。比如:
  • 搜索:需要对解析后的 URL 建立索引并改进搜索质量;
  • 客户端:需要在推文旁边同时显示某些类型的媒体内容,比如照片;
  • 发推按钮:用以统计每条 URL 到底被共享了多少次;
  • 信任与安全部门:用以帮助检测恶意软件和垃圾信息;
  • 分析:发掘关于 Twitter 上被共享的链接的统计汇总信息。
背景

SpiderDuck 出现之前,Twitter 有一个用发送 HEAD 请求和跟踪跳转来解析所有推文中的 URL 的服务。这个服务很简单,也满足了公司当时的需求,但是它有几项限制:
  • 它解析 URL,但是并不真正的下载内容。解析信息被存在内存中缓存里但是并没有永久地保存在磁盘上。这意味着如果内存中的缓存实例被重启的话,数据就都会丢失。
  • 它没有实现现代抓取机器人的“礼貌性”,比如访问频率限制和遵守 robots.txt 的指示等等。
显然,我们需要建造一个能够克服上述限制并满足公司的长远目标的真正的 URL 抓取器。我们最初的想法使用某个开源的抓取代码,或以其为基础,但是我们意识到几所有能用的开源抓取器都有两个我们不需要的特征:
  • 它们都是递归抓取器。也就是说,它们是被设计来抓取页面并递归抓取从页面里面提取出来的所有链接的。递归抓取给爬虫的调度和长期队列的维护带来了很多复杂性,在我们情况下并不必要。
  • 它们被优化来进行大批量抓取。我们需要的是快速的、实时的 URL 抓取。
因此,我们决定设计一个能够满足 Twitter 的实时需要的新系统,并能够随其增长水平扩展。为了避免重新发明轮子,我们把新系统大部分建立在了开源的模块之上,从而可以继续利用开源社区的贡献。

这是 Twitter 的工程问题里面很典型的一个——它们和其它大型互联网公司的问题很类似,但是要求所有东西都能够实时工作又带来了独特而有趣的挑战。

系统概览

这里是讲述 SpiderDuck 如何工作的概览。下图画出了它的主要部件。


Kestrel:这是一个在 Twitter 广泛使用的,用以对新进推文进行排队的消息队列系统

Scheduler:这些工作单元决定是否要抓取一个 URL,计划抓取时间,并跟踪重定向跳转(如果有的话)。抓取之后,它会解析下载的内容,提取元数据(metadata),并把元数据写回 Metadata Store,把原始数据写入 Content Store。每个 scheduler 都独立工作;也就是说,我们可以把任意数量的 scheduler 加入到系统中,随推文和 URL 的数量增加水平地扩展系统。

Fetcher:这些是用于维护短期 URL 抓取队列的 Thrift 服务器,它们发送实际的 HTTP 抓取请求并实现速率控制和处理 robots.txt。就像 scheduler 一样,它们可以随抓取速率水平扩展。

Memcached:这是 fetcher 使用的分布式缓存,用以临时存储 robots.txt 文件。

Metadata Store:这是一个基于 Cassandra 的分布式散列表,用以存储网页的元数据和以 URL 索引的解析信息,以及系统最近遇到的每个 URL 的抓取状态。这个存储为 Twitter 所有的需要实时访问 URL 数据的客户服务。

Content Store:这是一个 HDFS 集群,用以存储下载的内容和所有的抓取信息。

现在我们将更详细地介绍 SpiderDuck 的两个主要部件——URL Scheduler 和 URL Fetcher。

URL Scheduler (URL 调度器)

下面的图表画出了 SpiderDuck Scheduler 里面的几个处理阶段。


就像 SpiderDuck 的大部分一样,Scheduler 也是建立在一个 Twitter 开发的开叫做 Finagle 的开源异步 RPC 框架之上。(实际上,这是最早的一个利用 Finagle 的项目)。上图里面的每一个方块,除了 Kestrel Reader,都是一个 Finagle Filter —— 一个允许把一系列处理阶段连接成一个完全异步流水线的抽象概念。完全异步则允许 SpiderDuck 以较少的、固定数量的线程处理很高的流量。

Kestrel Reader 会不断地询问是否有新的推文出现。当推文进来时,它们被发送到 Tweet Processor,它从其中提取 URL。每条 URL 然后就会被送到 Crawl Decider 阶段。该阶段从 Metadata Store 读取 URL 的抓取状态,以确定 SpiderDuck 是否之前已经见过了这个 URL。Crawl Decider 然后根据一个预先制定的抓取策略(就是如果 SpiderDuck 在过去 X 天内已经见过了此 URL 则不再重复抓取)来决定是否该 URL 应该被抓取。如果 Decider 决定不抓取该 URL,它会记录状态以表示处理完成。如果它决定要抓取这个 URL,它就会把 URL 送到 Fetcher Client 阶段。

Fetcher Client 阶段使用客户端库和 Fetcher 交谈。客户端库实现了逻辑用以够决定哪个 Fetcher 会被用来抓取该 URL;它也能够处理重定向跳转。(重定向跳转链非常普遍,因为 Twitter 上的贴的 URL 多数都被缩短了)经过 Scheduler 的每个 URL 都有一个相关的上下文对象。Fetcher Client 会把包括状态、下载的头以及内容的抓取信息添加到上下文对象中,并将其传递给 Post Processor。Post Processor 把下载的数据交给元数据提取器,它会检测页面的编码,并使用一个开源的 HTML5 解析器解析页面的内容。提取库实现了一系列启发式算法,用于提取诸如标题、简介、以及代表图片等元数据。Post Processor 然后把所有的元数据和抓取信息写入 Metadata Store。如果需要的话,Post Processor 还会调度一系列相关抓取。相关抓取的例子之一就是嵌入的媒体内容,比如图片。

后期处理(post processing)结束之后,URL 上下文对象被交给下一个阶段,其会使用一个叫做 Scribe 的开源日志聚集器在 Content Store (HDFS)的日志中记录所有信息,包括完整的内容。该阶段还通知所有的感兴趣的监听者 URL 处理结束了。通知使用了一个简单的发布者-订阅者模型,用 Kestrel 的分散队列实现。

所有的处理阶段都是异步运行的 —— 没有任何线程会等待一个阶段完成。和每个正在处理中的 URL 相关的状态都保存在相关的上下文对象中,所以线程模型也非常简单。异步实现也受益于 Finagle 和 Twitter Util 库提供的方便的抽象和构件。

URL Fetcher (URL 抓取器)

让我们来看看 Fetcher 如何处理一条 URL。


Fetcher 通过 Thrift 界面接收到 URL。经过一些简单的确认之后,Thrift 处理器把 URL 传递给 Request Queue Manager (请求队列管理器),其把 URL 指定给某个合适的请求队列。一个调度了的任务会按照固定的速率从请求队列中读取。一旦 URL 被从队列中取出来了,它就会被送到 HTTP Service 处理。建造在 Finagle 上面的 HTTP Service 首先检查 URL 相关的主机是否已经在缓存中了。如果没有,那么它会为它创建一个 Finagle 客户,并调度好 robots.txt 文件的抓取。在 robots.txt 被下载之后,HTTP Service 会抓取许可的 URL。robots.txt 文件本身是被缓存的,在进程中的 Host Cache 和 Memcached 里面各一份,以防止每次有该主机新的 URL 进来时重复抓取。

一些叫做 Vulture (秃鹫)的任务周期性地检查 Request Queue (请求队列)和 Host Cache (主机缓存)以寻找有一段时间都没有被使用的队列和主机;如果找到了,它们就会被删除。Vulture 还会通过日志和 Twitter Commons 状态输出库报告有用的统计信息。

Fetcher 的 Request Queue 还有一个重要的目标:速率限制。SpiderDuck 限制对每个域名发出的 HTTP 抓取请求,以保证不会使得 web 服务器过载。为了准确地限制速率,SpiderDuck 保证每一个 Request Queue 在任一时刻都被指定到刚好一个 Fetcher,并且能够在 Fetcher 失效的时候自动重新指定到另一个 Fetcher 上。一个叫做 Pacemaker 的机群软件包会把 Request Queue 指定给 Fetcher 并管理失效转移。Fetcher 客户库根据 URL 的域名把它们分配到不同的 Request Queue。对于整个 web 设置的默认速率限制也能够根据需要被对于具体的域名设置的速率限制取代。也就是说,如果 URL 进来的速度比处理它们的速度还要快,它们就会拒绝请求,以告诉客户端应该收敛,或者采取其它合适措施。

为了安全,Fetcher 被部署到了 Twitter 数据中心里面的一个特殊区域 DMZ。这意味着 Fetcher 不能访问 Twitter 的产品机群和服务。所以,确保它们的轻量级设计和自力更生非常重要,这也是一条指导很多方面的设计的原则。

Twitter 如何使用 SpiderDuck

Twitter 服务以很多方式使用 SpiderDuck 的数据。大部分会直接查询 Metadata Store 以获取 URL 的元数据(比如,标题)以及解析信息(所有重定向跳转之后的最终规范化 URL)。Metadata Store 是实时填充的,一般是在 URL 在推文中发布后的几秒钟内。这些服务并不直接和 Cassandra 交谈,而是通过一个代理这些请求的 Spiderduck Thrift 服务器。这个中间层为 SpiderDuck 提供了灵活性,使其能够透明地切换存储系统,如有需有。它同时也支持了比直接访问 Cassandra 更高级的 API 抽象。

其它服务会周期性的处理 SpiderDuck 在 HDFS 上的日志以生成聚合统计信息,用以 Twitter 的内部测量仪表板或者进行其它批量分析。仪表板帮助我们回答诸如“每天 Twitter 上有多少图片被共享?”、“Twitter 用户最经常链接到什么新闻网站?”以及“我们昨天从某个网站抓取了多少网页?”之类的问题。

需要注意的是,这些服务一般不会告诉 SpiderDuck 需要抓取什么东西;SpiderDuck 已经抓取了进入 Twitter 的所有 URL。取而代之,这些服务在 URL 可用之后询问它们的相关信息。SpiderDuck 也允许这些服务直接请求 Fetcher 通过 HTTP 抓取任意内容,(这样它们就能受益于我们的数据中心设置、速率限制、robot.txt 支持等功能),但这种用法并不普遍。

性能数据

SpiderDuck 每秒处理数百条 URL。这中间的大部分都是在 SpiderDuck 的抓取策略所定义的时间窗口里独一无二(unique)的,所以它们会被抓取。对于抓取了的 URL,SpiderDuck 处理延迟中值在 2 秒以下,99% 的处理延迟低于 5 秒。该延迟是基于推问发布时间测量的,也就是说在用户点击“发推”按钮后 5 秒内,推文中的 URL 就被提取出来,做好了抓取准备,获取了所有的重定向跳转,下载并解析了内容,并提取了元数据,并且它们已经通过 Metadata Store 对于客户可用了。这中间大部分的时间要么花在了 Fetcher Request Queue (因为速率限制)中,或者花在了从外部 web 服务器实际获取该 URL 上。SpiderDuck 本身只增加了几百毫秒的额外处理时间,大部分都花在 HTML 解析上。

SpiderDuck 的基于 Cassandra 的 Metadata Store 能够处理接近每秒 10,000 个请求。这些请求一般是针对单独或者小批次(小于 20 个)URL 的,但是它也能够处理大批次(200~300 个 URL)的请求。这个存储系统的读取延迟中值在 4 ~ 5 秒左右,第 99 百分区间在 50 ~ 60 毫秒左右。

致谢

SpiderDuck 的核心团队包括以下成员:Abhi Khune,Michael Busch,Paul Burstein,Raghavendra Prabhu,Tian Wang 以及 Yi Zhuang。此外,我们希望对遍布全公司的以下人员表示感谢,他们要么直接为该项目做出了贡献,帮助设置了 SpiderDuck 直接依赖的部件(比如 Cassandra、Finagle、Pacemaker 以及 Scribe),要么帮助建立了 SpiderDuck 独特的数据中心设置: Alan Liang, Brady Catherman, Chris Goffinet, Dmitriy Ryaboy, Gilad Mishne, John Corwin, John Sirois, Jonathan Boulle, Jonathan Reichhold, Marius Eriksen, Nick Kallen, Ryan King, Samuel Luckenbill, Steve Jiang, Stu Hood and Travis Crawford。我们也要感谢整个 Twitter 搜索团队提供的宝贵的设计反馈和支持。如果你也想参与这样的项目,和我们一起飞吧

原文发布时间:2011 年 11 月 14 日
翻译:王天

2011年9月18日星期日

一场风暴(Storm)即将来临:关于新版本的更多细节和计划

我们收到了很多问题,问我们,现在既然 BackType 已经被 Twitter 收购了,我们会在 Storm 上面做些什么。这里,我很高兴地宣布我们将在 9 月 19 日在 Strange Loop 发布 Storm!要想进一步了解情况,请查看讲座信息

在我关于 Storm 的预览贴里面,我探讨了 Storm 能够怎么应用在大量不同的实时计算问题上面。在这篇文章里面,我会给出关于 Storm 更多的细节和它用起来什么样子。

下面是 Storm 三大使用方法的总结:

  1. 流处理:Storm 可以用于处理数据流并实时更新数据库。和那些通过网络队列和工人(worker)处理流的标准方式不同,Storm 能够容错且易于扩展。
  2. 持续计算:Storm 可以处理持续查询并把结果以流的形式实时返回给客户端。一个例子就是把 Twitter 上的趋势话题以流的形式传回给浏览器。浏览器会趋势话题在发生的时候实时地看到它们。
  3. 分布式 RPC:Storm 可以用于在执行过程中并行化高强度的查询。这个意思就是说你的 Storm 拓扑是一个分布式函数,其等待调用消息。当它收到调用消息的时候,他对查询进行计算并送回结果。分布式 RPC 的例子有搜索查询的并行化和在很多大集合上面进行集合操作。
Storm 的美妙之处在于它可以用很简单的一组原语来解决多种多样的问题。

Storm 集群中的部件

一个 Storm 集群表面上看起来和一个 Hadoop 集群很像。在 Hadoop 上面你运行“MapReduce 工作”,在 Storm 你运行的是“拓扑(topologies)”。“工作”和“拓扑”他们本身非常不同——一个主要的不同点就是 MapReduce 工作终究会完成,而拓扑会永远的处理消息(除非你把它杀掉)。

Storm 集群上有了两类节点:主(master)节点和工人(worker)节点。主节点运行一个称为  “Nimbus” 的 daemon,类似于 Hadoop 里面的 JobTracker。Nimbus负责吧代码分布到集群里面的各个地方,给机器指定任务,并监测故障。

每个工人节点会运行一个叫做 “Supervisor(监督员)” 的 daemon。监督员监听分配到其机器的工作,并根据需要和 Nimbus 分配给它的任务启动和停止工人进程。每个工人进程执行拓扑的一部分;一个执行中的拓扑包括很多分布在很多机器上的很多工人进程。


Nimbus 和 Supervisor之间的协调是通过 ZooKeeper 集群来实现的。此外,Nimbus daemon 和 Supervisor daemon 都是快速失败的,且无状态;所有的状态都保存在 ZooKeeper 中或者本地磁盘上。这意味着你可以用 kill -9 命令杀死Nimbus 或者 Supervisor,然后他们会重启,像什么都没有发生过一样。这个设计让 Storm 集群变得难以置信的稳定。我们有一些运行了几个月从来没有维护过的拓扑。

运行一个 Storm 拓扑

运行一个拓扑很简单。首先,你把你所有的代码和他们依赖的库包装到一个 jar 里面去。然后,你运行像这样一个命令:
storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2
这个命运会用参数 arg1 和 arg2 来运行类 backtype.storm.MyTopology。类的 main 函数定义了拓扑并将其提交给 Nimbus。命令中的 storm jar 部分处理了和 Nimbus 的链接以及上传 jar 文件。

拓扑的定义就只是 Thrift 结构而已,Nimbus 也是一个 Thrift 服务,你可以用任何语言创建和提交拓扑。上面的就是一个最简单的用基于 JVM 的语言来实现的例子。

流和拓扑

现在让我们深入看看 Storm 提供的用于进行可扩展实时计算的抽象概念。在我介绍完主要的抽象概念之后,我会用一个具体的 Storm 拓扑的例子把所有的东西系在一起。

Storm 里面最核心的抽象概念是“流”。一个流是一个无边界的元组(tuple)序列。Storm 提供了一些列用于把一个流在分布式环境中可靠地转换成一个新流的原语(primitives)。比如,你可以把一个推文的流转换成一个趋势话题的流。

Storm 提供的进行流转换的基本原语是 “spout(喷口)” 和 “bolt(螺栓)”。Spout 和 bolt 都各有接口,你需要实现他们来执行你的应用程序的逻辑。

Spout 是一个流的来源。比如,一个 spout 可以从 Kestrel 队列中读取元组,并把他们输出为一个流。一个 spout 也可以连接到 Twitter API 并输出一个推文流。

Bolt 则用于进行一步流转换操作。它根据输入的流创建一个新流。复杂的流操作,比如从推文流计算趋势话题流,则可能需要多个步骤,也就是多个 bolt。

多步的流转换则被包装成为一个拓扑,拓扑是你可以提交给 Storm 集群执行的最高级别的抽象概念。一个拓扑是一个流转换的图,其每个节点都是一个 spout 或者 bolt。图中的边则表示哪个 bolt 订阅了哪个流。当 spout 或者 bolt 输出一个元组到一个流的时候,它也同时把这个元组发送到了该流所有的订阅者哪里。


Storm 里面的所有东西都是分布式并行运行的。Spout 和 bolt 以集群中的多个线程的形式执行,他们以一种分布式的方式把消息相互传递。消息永远也不会经过任何形式的中央路由节点,也没有任何中间队列。元组总是直接地从创建它的线程传递到消耗它的线程中去。

Storm 保证流过拓扑的每条消息都会被处理,即使某台机器死掉了且它上面的消息丢了。Storm 不用中间队列来实现这一切是它能够工作的关键和它这么快的原因。

让我们看一个具体的 spout、bolt 和拓扑的例子,来增强我们对概念的理解。

一个简单的拓扑

我要展示给你们的简单的拓扑是 “数单词数流”。这个拓扑包含了一个输出句子的 spout,最后的 bolt 输出每个单词在所有句中累积出现的次数。 每次一个单词的计数更新的时候,一个新的计数都会被输出。拓扑是这个样子:


在 Java 里面你这么定义拓扑:



这个拓扑的 spout 从位于 kestrel.backtype.com 端口 22133 的 Kestrel 服务器上的 “sentence_queue” 队列里面读出句子。

我们使用了 setSpout 方法来把一个Spout 和一个它独有的 id 加入到拓扑中去。拓扑中的每个节点必须有一个 id,这个 id 会被别的 bolt 用来订阅该节点的输出流。在这个拓扑里面,KestrelSpout 的 id 是 1。

setBolt 函数则被用于把 Bolt 插入到拓扑中去。这个拓扑里面定义的第一个 bolt 是 SplitSentence bolt。这个 bolt 把一个句子流转换成一个单词流。让我们看看 SplitSentence 的实现:

这里最核心的方法就是 execute 方法。如你所见,它把一个句子分拆成单词并把每个单词当成一个元组输出。另一个重要的方法是 declareOutputFields,其声明了 bolt 的输出元组的 schema(结构)。这里它声明了其会输出一个只含一个叫做 “word” 字段的1-元组。

你可以用任何语言实现 Bolt。下面是一个用 Python 实现的 bolt:

setBolt 的最后一个参数是你希望 bolt 使用的并行量。SplitSentence bolt 用了并行量 10,也就是说会有 10 个线程在 Storm 机群里面并行执行这个 bolt。要扩展一个拓扑,你需要做的就是增大里面成为瓶颈的 bolt 的并行量。

setBolt 方法返回一个对象,你用它来定义 bolt 的输入。继续看我们的例子,SplitSentence bolt 用乱序分组(shuffle grouping)的方式订阅了部件 “1” 的输出流。“1” 表示先前定义了的 KestrelSpout。我在后面会解释这个乱序分组。现在重要的是,SplitSentence bolt 已经会消耗 KestrelSpout 输出的每一个元组了。

一个 bolt 可以通过把输入定义串联起来的方式订阅多个输入流,比如:

你可以用这个功能来实现诸如流连接(Join)等操作。

这个数单词拓扑里面的最后一个 bolt,WordCount,从 SplitSentence 的输出里面读出单词,然后输出更新了的单词计数。下面是 WordCount 的实现:

WordCount 在内存里面存了一个从单词到计数映射的 map。当它看到一个词的时候,它就在内部 map 里面更新单词计数,然后把更新了的计数作为一个元组输出。最后,在 declareOutputFields 方法里面,bolt 声明了它会输出一个含有 “word” 和 “count” 两个字段的 2-元组。

如果任务死了,保存在内存里面的 map 也会丢失。如果在任务死去的时候保存 bolt 的内部状态对你来说很重要,你可以用像 Riak、Cassandra 或者 Memcached 之类的外部数据库来保存单词计数状态。这里我们用内存中的 HashMap 只是为了方便。

最后,WordCount bolt 声明了它的输入来自组件 2,也就是 SplitSentence bolt。它通过在 ”word“ 字段上进行“字段分组(field grouping)”消耗这个流的内容。

”字段分组“,和前面我稍微提到的“乱序分组”一样,都是一种“流分组“。”流分组“是最后一块把拓扑紧紧联系在一起的组件。

流分组(Stream Grouping)

流分组告诉拓扑如何在两个部件之间传递元组。请记住,spout 和 bolt 是作为很多任务在整个集群里面并行执行的。如果你看看拓扑在任务层次是怎么执行的,它应该看起来是这个样子:


当 Bolt A 的一个任务输出元组到 Bolt B 的时候,它应该输出给哪个任务呢?

”流分组“就可以回答这个问题,它告诉 Storm 如何在两组人物之间传递元组。有几种不同的流分组。

最简单的分组方式叫做”乱序分组(shuffle grouping)“,它只是把元组发送到一个随机的任务上。在数单词拓扑里面我们用了乱序分组,它把来自 KestrelSpout 的元组发送到 SplitSentence bolt。它可以均匀地在所有 SplitSentence bolt 任务上分布元组处理工作。

更有趣的一种分组叫做”字段分组(field grouping)“。在 SplitSentence bolt 和 WorldCount bolt 之间我们使用了字段分组。对于 WordCount bolt 而言,每次把同样的单词发送到同样的任务至关重要。不然,超过一个任务会看到同一个单词,它们每个都会输出一个不正确的计数值,因为它们没有谁有完整的信息。字段分组让你可以把多个流按照其字段的某个子集分组。这就导致了相同字段子集值都送到了同样的任务。因为 WordCount 通过 “word” 字段上的字段分组订阅了 SplitSentence 的输出流,同样的单词总是会被送到同样的任务中去,bolt 于是就是产生正确的输出。

字段分组是实现流连接(Join)、流聚集(aggregation)以及很多其他功能的基础。在底层,字段分组是通过一致散列函数(consistent hashing)实现的。

此外还有几种分组方式,但是他它们超过了本文讨论东西的范围。

有了这些,你现在应该有了所有你需要用以理解数单词拓扑的东西。拓扑并不需要太多的代码,而且它们是完全可扩展且容错的。不管你是每秒处理 10 条消息还是每秒 10 万条消息,你可以通过调整每个部件的并行量就可以增大或者减小拓扑的规模。

Storm 所隐藏的复杂性

Storm 提供的抽象非常简单。一个拓扑由 spout 和 bolt 组成,你用流分组把他们连接在一起使得数据流动。你只需指定每个部件需要的并行量,然后把所有东西放到 jar 文件里面去,把拓扑和代码都提交给 Nimbus,Storm 就会让你的拓扑永久运行。下面是 Storm 在底层非常鲁棒地实现这些抽象的方法一瞥:

  1. 消息处理保证:Storm 保证从 spout 出来的每个元组都会被拓扑完全处理。为了实现这项功能,Storm 跟踪一个元组引发的消息树。如果一个元组没有被完全处理,Storm 会从 spout 重演(reproduce)这个元组。Storm 集成了一些聪明的技术来高效地跟踪跟踪树。
  2. 鲁棒的进程管理:Storm 的主要任务之一是管理集群里面的进程。当一个新的工人(worker)被指定给一个 supervisor 之后,工人应该被尽快启动。当此工人不再被指定给这个 supervisor 时,它应该被杀掉且清除。

    一个把这个做得特别糟的系统的例子就是 Hadoop。当 Hadoop 启动一个任务的时候,任务退出的担子是放在任务自己身上的。不幸的是,任务有时候会退出失败然后成为孤儿进程,耗尽其它任务的内存和资源。

    在 Storm 里面,杀死一个工人进程的担子是由启动该进程的 supervisor 承担的。在 Storm 里面就根本不会有孤儿任务,不管你怎么增重机器负担或是发生多少错误。做到这一点有一些微妙之处,因为 Storm 不仅需要跟踪它启动的工人进程,他也需要跟踪工人进程启动的子进程(当 bolt 是用别的语言编写的时候,会有一个子进程被启动)。

    Nimbus daemon 和 supervisor daemon 都是无状态的和快速失败(fail-fast)的。如果它们死了,正在运行中的拓扑不会受到影响。这和 Hadoop 的工作方式也形成了鲜明的对比。
  3. 错误检测和自动任务重分配:一个运行中的拓扑里面的任务会向 Nimbus 发送心跳信息以示它们在正常运行。Nimbus 监测这些心跳信号,如果它们超时了,它会重新分配任务。此外,集群里面的所有正在给失败任务发送消息的任务都很快就会重新连结到新任务。
  4. 高效的消息传递:在任务之间的消息传递并没有使用任何中间队列,取而代之,消息是在任务之间通过 ZeroMQ 直接传递的。这比使用中间队列更简单且高效得多。ZeroMQ 是一个聪明的“超级套接字(super-socket)”库,其使用多种技巧来最大化消息传递带宽。比如,它会检测网络是否繁忙,并自动的把消息分批送往目的地。

    另一个进程间消息传递的重要部分是高效的消息序列化和反序列化。又一次地,Storm 把这一切为你自动化了。默认情况下,你可以在元组中使用任何原语类型、字符串、或者二进制记录。如果你想使用另一种类型,你只需要实现一个简单的接口告诉 Storm 你想如何序列化它。然后,每当 Storm 遇到这个类型的时候,它会自动使用这个序列化器。
  5. 本地模式和分布式模式:Storm 有一个“本地模式”,它可以在一个进程里面模拟一整个 Storm 集群。这可以让你快速迭代开发你的拓扑且为你的拓扑写单元测试。你可以把同样的代码在本地模式里运行,就像他们在集群模式里面运行一样。
Storm 非常易于使用、配置以及运营。对于处理每秒几百条消息的个人开发者,到每秒处理几十万条消息的大公司,Storm 都同样有效。

和“复杂事件处理(Complex Event Processing, CEP)”的关系

Storm 和诸如 EsperStreambase、和 S4 等“复杂事件处理”系统处于同一领域。在这些系统中,最接近且可以比较是 S4。Storm 和 S4 之间的最大区别是 Storm 在故障的情况下也可以保证消息会被处理,而 S4 有时会丢失消息。

有些复杂事件处理系统有内置的数据存储层。有了 Storm,你会用像 Cassandra 或者 Riak 之类的外部的数据库和你的拓扑一起工作。一个数据存储系统不可能满足所有的应用程序,因为不同的应用程序有不同的数据模型和访问模式。Storm 是一个计算系统,而不是一个存储系统。然而,Storm 有一些强大的工具让你实现数据本地化,即使你使用了一个外部数据库。

总结

我们现在只探讨了 Storm 的九牛一毛。Storm 核心的“流”概念有着比我所展示的更广泛的使用方法——我们还没有探讨诸如多路流、隐含流、或直接分组。我展示了 Storm 的两个主要抽象,spout 和 bolt,但是我们还没有探讨 Storm 的第三个,可能也是最强大的一个抽象,“状态 spout”。我们也没有探讨如何用 Storm 实现分布式 RPC,没有探讨 Storm 的强大的自动部署功能,你可以按一个按钮就在 EC2 (译者注:Amazon 的云计算平台)上面创建一个 Storm 集群。

所有的这些东西,你都得等到 9 月 19 号。在那以前,我会添加一些 Storm 文档以便你能在它发布之后快速地把它跑起来。我们为 Storm 的发布感到非常兴奋,我也希望能在 Strange Loop 上,在它发布之时结识各位。

——Nanthan Marz (@nathanmarz)

原文发布日期:2011 年 8 月 4 日

Finagle:一个协议不可知的 RPC 系统

Finagle是一个协议不可知的,异步的,用于 JVM 的 RPC 系统,它使得在 Java、Scala 或任何基于 JVM 的语言重构建鲁棒的客户端和服务器非常容易。


Twitter.com 上面即使是渲染最简单的网页也需要十多个说着不同协议的网络服务的合作。比如,为了渲染首页,应用程序需要向社交网络图(Social Graph)服务、Memcached、数据库、以及许多其它网络服务发出请求。他们每个都使用不同的协议:Thrift、Memcached、MySQL等等。此外,这些服务之间还相互交谈——他们既是服务器又是客户端。比如,社交网络图服务就提供了一个 Thrift 接口,但是它也从一个 MySQL 集群里面获取信息。

在这样一个系统里面,服务中断最常见的原因就是这些部件之间在发生故障的时候糟糕的交互;常见的故障包括崩溃的主机和极高的时延差异。这些故障可以通过让工作队列任务堆积、TCP 连接搅动(churn)、耗光内存和文件描述符等方式在系统里面叠加起来。在最糟的情况下,用户就会看到失败鲸

构造一个稳定的分布式系统的挑战

复杂的网络服务器和客户端有很多活动部件:故障检测器、负载平衡器、失效备援策略(failover strategy)等等。这些部件之间需要达到一种精致的平衡,以便对大型产品系统里面的故障有足够的弹性。

故障检测器、负载平衡器等部件的不同协议的很多不同实现使得这个任务变得尤其困难。比如,Thrift 的背压(back pressure)策略就和 HTTP 的不同。在事故的时候确保在这种异构系统上的覆盖率非常具有挑战性。

我们的方法

我们设计了一个能够用于我们所有协议的基本网络服务器和客户端组件的单一实现Finagle 是一个协议不可知的、异步的、用于 Java 虚拟机的远程过程调用(RPC)系统,它可能让在 Java、Scala或任何基于 JVM 的语言上构建鲁棒的客户端和服务器变得很容易。Finagle 支持广泛的基于请求/答复的 RPC 协议和很多类型的流协议。

Finagle 提供了以下功能的鲁棒实现:

  • 连接池(connection pool):带有限流(throttling)支持以防止 TCP 连接搅动(churn);
  • 故障检测器(failure detector),用于识别太慢或者崩溃了的主机;
  • 失效备援策略(failover strategies),用于把流量从不健康的主机上引开;
  • 平衡负载器(load-balancer),包括“最少连接”和其它策略;以及
  • 背压(back-pressure)技术,用于保护服务器免受客户端滥用或者叠罗汉(或DoS攻击)。
此外,Finagle 还让构造和部署下列服务变得容易:
  • 发布标准统计信息、日志和异常报告;
  • 支持跨协议的分布式追踪(以 Dapper 形式);
  • 选择性地使用 ZooKeeper 用于集群管理;以及
  • 支持常见切分(sharding)策略。
我们相信我们的工作是卓有成效的——我们现在能够非常轻松、安全地编写和部署一个网络服务了。

Twitter 里的 Finagle

今天,Finagle 已经部署到了 Twitter 多个前端和后端的运行产品中,包括我们的 URL 爬虫(crawler)和 HTTP 代理。我们计划更广泛地部署 Finagle。

一个基于 Finagle 的体系结构 (开发中)

上图展示了一个全面使用 Finagle 的未来体系结构。比如,User Service 是一个使用 Finalge Memcached 客户端的 Finagle 服务器,并和 Finagle Kestrel Service 交谈。

Finagle 如何工作

Finagle 非常灵活且易于使用,因为它是构造在几个简单的、可组合的基本元素上:Future,Services,以及 Filters。

Future 对象

在 Finagle 中,Future 对象是对于所有异步计算的统一抽象。一个 Future 表示了一个尚未完成的计算,其可能成功也可能失败。使用 Future 两个最基本的方法是:
  • 阻塞并等待计算结束返回
  • 注册一个回调函数,在计算最终成功或失败时 Future 回调
如果任务需要在计算结束之后继续异步执行,你可以指定一个成功回调函数和一个失败回调函数。回调函数通过 onSuccess 和 onFailure 函数注册:

组合 Future

Future 可以以有趣的方式组合或者转换,从而做到一些常常在函数式程序设计里面看到的组合行为。比如,你可以通过 map 把一个 Future[String] 转换成 Future[Int]:

类似地,你还可以用 flatMap 把一系列 Future 串成一个流水线:


在这个例子里面,User.authenticate() 是异步执行的;Tweet.findAllByUser() 在最终结果上被调用。在 Scala 里面这可以用另一种方式表达,用 for 语句:

当用 flatMap 或者 for 语句串联 Future 的时候,处理错误和异常也非常简单。在上面的例子中,ifUser.authenticate() 异步地抛出了一个异常,接下来对于 Tweet.findAllByUser() 的调用永远也不会发生。取而代之,流水线的结果表达式仍然是 Future[Seq[Tweet]] 类型,但是它含有异常值而不是推文。你可以使用 onFailure 回调函数或者其他组合计数来处理异常。

和其它异步编程技术(比如 CPS:Continuation-Passing Style)相比,Future 有一个很好的性质,就是你可以更容易的编写出清楚且鲁棒的异步代码,即使是带有复杂的散布/收集(scatter/gather)操作:


Service 对象

Service 是一个函数,其接受一个请求,返回一个 Future 对象作为答复。注意客户端和服务器都是用 Service 对象表示的。

要创建一个 Service 对象,你需要继承抽象的 Service 类并监听一个端口。下面是一个简单的 HTTP 服务器,监听端口 10000:

建立一个 HTTP 客户端就更简单了:



Filter 对象

Filter 是一种把你的应用程序中不同的阶段的孤立出来组成一个流水线的有用的方式。比如,你可能需要在你的 Service 开始接受请求前处理异常、授权等问题。

一个 Filter 包裹了一个 Service,且潜在地,把 Service 的输入和输出类型转换成其它类型。换一句话说,Filter 是一个转换器。下面是一个用来确保一个 HTTP 请求有合法的 OAuth 证书的、且使用一个异步的认证服务的 filter。

下面是一个修饰了 Service 的Filter:

Finagle 是一个开源项目,使用 Apache License, Version 2.0。源代码和文档都可以在 GitHub 上找到。

鸣谢

Finagle 最早构思来自 Marius Eriksen 和 Nick Kallen。其它主要贡献人员有 Arya Asemanfar, David Helder, Evan Meagher, Gary McCue, Glen Sanford, Grant Monroe, Ian Ownbey, Jake Donham, James Waldrop, Jeremy Cloud, Johan Oskarsson, Justin Zhu, Raghavendra Prabhu, Robey Pointer, Ryan King, Sam Whitlock, Steve Jenson, Wanli Yang, Wilhelm Bierbaum, William Morgan, Abhi Khune, and Srini Rajagopal。

原文链接:http://engineering.twitter.com/2011/08/finagle-protocol-agnostic-rpc-system.html
原文发表日期:2011 年 8 月 19 日

2011年9月17日星期六

Twitter 的新搜索体验背后的工程开发

今天(2011年5月31日),Twitter 发布了能够帮助我们的用户找到最相关的推文、图片和视频的个人化搜索体验。要创造这个产品,我们的基础架构需要支持两项主要的功能:搜索结果的相关性过滤和对于相关图片以及视频的识别。两项功能都需要我们完全重写我们搜索架构,他们的核心就是 BlenderEarlybird

搜索上的投入

自从我们在2008年收购了 Summize 以来,Twitter 不断在搜索上面加重了投入。我们的搜索团队从 3 人增长到了 15 人,并且把实时搜索引擎的可扩展性提高了两个数量级——这一切对于搜索基础架构的替换,都是在对既有服务没有主要任何干扰的情况下做到的。

搜索的进化背后的工程开发故事非常激动人心。原来的 Summize 基础结构使用 Ruby on Rails 作为前端以及 MySQL 作为后端(Twitter 和很多别的 startup 公司都用了同样的架构)。同时,Lucene 以及其他开源搜索技术并不支持实时搜索。结果就是,我们在 MySQL 里面构造了我们的倒排索引,利用其并发事务和 B-tree 数据结构来支持我们的并发索引和搜索。通过把索引分拆到多个数据库中以及使用多个前端副本(replication),这个基于 MySQL 的解决方案走得意想不到地远。2008 年的时候,Twitter 搜索的流量大概是 20 TPS(每秒推文)以及 200 QPS(每秒搜索)。在 2010 年 10 月的时候,当我们把 MySQL 换成了 Earlybird,这个系统平均能处理 1,000 TPS 以及 12,000 QPS。

Earlybird 是一个基于 Lucene 的实时倒排索引,在实时搜索上,它不仅给了我们比MySQL高一个数量级的性能,而且倍增了我们的内存使用效率,并提供了增加相关性过滤的灵活性。然而,我们还是需要换掉 Ruby on Rails 前端,因为它只能对 Earlybird 进行同步调用,并且在多年的扩张和向 Earlybird 的转移中,欠了一大堆技术债。

2011 年 4 月,我们发布了一个替代品,叫做 Blender。他把我们的搜索时延减到了原来的三分之一,带宽提高了 10 倍,并让我们终于能够从搜索基础架构里面去掉 Ruby on Rails。今天(2011 年 5 月)我们每秒大概索引 2,200 条推文,并提供 18,000 QPS 的搜索服务 (每天 16 亿次搜索!)。更重要的是,Blender 填补我们基础架构里面的一个空白,我们正需要这个东西来作一项自收购 Summize 以来最重要的面向用户的改动。

从 Hack-week 到产品化

当我们的团队发布 Earlybird 的时候,我们为其潜力而兴奋——它速度惊人,并且代码非常干净,且便于扩展。我们的一位工程师,Michael Busch,在德国休假的时候,实现了一个图片和视频搜索的演示。一周后,在 Twitter 的第一次 Hack-week,搜索团队以及其它团队的一些成员完成了我们新的搜索体验的第一个演示。来自公司内部的反响非常积极,这个演示就成为了我们的产品规划的一部分。

寻找相关推文

Twitter 上有很多信息——平均而言,每秒钟有 2,200 条新推文产生!在重大事件发生的时候,比如日本海啸时,这个数字还会增加二到三倍。一般来说,我们只对最值得回忆的或是那些用户互动最多的推文感兴趣。在我们的新的搜索体验中,我们只把与某个特定的用户最相关的结果显示出来。所以,搜索结果被个人化了,我们也滤掉那些和其他用户不产生共鸣的推文。

为了支持这项相关性过滤和个人化功能,我们需要三类信号:
  • 静态信号,在索引时加入
  • 共鸣信号,随时间变化动态更新
  • 关于搜索者的信息,搜索时提供
把所有的这些信号都放到我们的索引里面去意味着我们要改动我们的摄取(ingestion)流水线、Earlybird(我们的倒排索引)和 Blender(我们的前端)。我们还创建了一个新的更新器(updater)组件,其不断地把共鸣信号推送到 Earlybird。在摄取流水线中,我们加入了可以把静态信息标记到推文上一个新的流水线阶段。静态信息包括用户相关信息、推文文字的语言等等。然后,推文被复制到 Earlybird 的索引中(实时地)。我们扩展了 Lucene 的内部数据结构用以支持对于任意注解(annotation)的动态更新。动态更新(比如用户和推文的互动)会从更新器源源不断的送来。所有这些放在一起,Earlybird 和更新器就能够支持高速且非均匀地索引更新,且不需要使用并发锁或者拖慢搜索。

在搜索的时候,一台 Blender 服务器会解析用户的搜索词并将其和该用户的社交网络图一起发至多台 Earlybird 服务器。这些服务器会使用专门的排名函数合并多个相关性信号和社交网络图,为每一条推文计算出一个个人化的分数。分数最高且最新的推文会被送回到 Blender,然后它把它们合并起来、重新排名,然后最终返回给用户。

带有相关性支持的 Twitter 搜索基础架构
去除重复结果

重复和接近重复(类似)的推文在 Twitter 的搜索结果里面一般都不怎么有用。在流行或者重大事件发生的时候,也正是搜索应该对我们的用户最有帮助的时候,基本相同的推文就变得很多。即使这些重复的推文质量本身都很高,搜索者还是能获益于一个更多样化的结果集。我们使用一种基于 MinHashing 的技术来去除重复结果。对于每一条推文,我们计算多个签名(signature),两条推文如果有相同的签名集合则会被认为是相同。麻烦在哪儿?和 Twitter 所有的东西一样,短小才是关键:我们只有很少的内存预算能够用于存储签名。我们的算法把每一条推文都压缩到了 4 个字节,但是仍然能够使用极少的计算需求找到绝大多数的重复推文。

个人化

只有你通过选择关注感兴趣的账户的方式来个人化你的账户的时候,Twitter 才是最强大的。为什么你的搜索结果不该是个人化的呢?他们现在就是了!我们的排名函数会查看社交网络图和使用搜索者和推文作者的关系来计算分数。虽然社交网络图非常打,我们把每个用户最有用的信息都压缩到了一个布隆过滤器(Bloom Filter)里面,它提供给我们了空间上非常高效、常数时间的集合成员判断操作。在 Earlybird 扫描候选搜索结果的时候,它会看推文的作者在搜索者的社交网络图里面存在与否,并用此作为排名函数中的一个信号。

那些没有关注任何账号的用户也能从个人化机制中获益;比如,我们现在自动检测搜索者的偏好语言和地理位置。

图像和视频搜索

图像和视频有着惊人的描述人物、地点、和正在发生的实况事件的能力。比如,@jkrums 的 US Airways 1549 航班在哈德逊河迫降的照片和 @stefmara 关于奋进号航天飞机的最后一次发射的照片。

搜索推文和搜索推文中的实体(比如图片和视频)有着本质的不同。在前者中,判断一条推文是否和一个搜索匹配只需看推文的文字本身,无需别的信息。此外,每条推文的相关性信息也能用于排名和比较匹配的推文以找到最好结果。在搜索图片和视频的时候,情况就不一样了。比如,同样的图片可能被推了很多次,每次还都用了不同的关键字来描述图片。比如下面两条推文:

  • "This is my Australian Shepherd: http://bit.ly/kQvYGp" (这是我的澳大利亚牧羊犬)
  • "What a cute dog! RT This is my Australian Shepherd: http://bit.ly/kQvYGp". (真可爱的一条狗!RT 这是我的澳大利亚牧羊犬)
对于该图片的一种描述方法是,通过在推文中使用一系列的关键词,这里是“dog",“Australia”,以及“sheperd”,它们都描述了图片。如果一幅图片反复地在推文里面被一个关键词描述,那么这个图片多半和这个关键词有关。

哪到底什么让图片搜索成为了一个麻烦的问题呢?Twitter 允许你搜索几秒钟前的推文,里面的图片和视频也应该能够实时可用!Earlybird 使用倒排索引来搜索。这额数据结构虽然非常高效,它们却不支持嵌入式(inline)更新,于是,这使得添加额外的关键字到已经索引好了的文档末尾几乎不可能。

如果即时性不重要的话,我们可以用 MapReduce 程序周期性地累积关键词集合并生成倒排索引。在这些离线索引中,每条指向图片或者视频的链接都会是一个文档,文档的文本就是聚集的关键字。然而,要满足我们的时延(latency)目标,我们需要没几秒钟就运行一次这些 MapReduce 程序,这个解决方案是不现实的。

取而代之,我们扩展了 Earlybird 的数据结构以支持高效的推文内实体(entities)查询。在搜索的时候,我们会寻找匹配推文的图片和视频,并把它们存在一个自定制的哈希表(hash map)里面。每次有同样的 URL 加到哈希表里面的时候,它对应的计数器会增加。这个累计结束之后,我们将哈希表排序,并返回最佳的图片和视频用于显示。

下一步是什么?

我们搜索团队为能够创造驱动发现和帮助用户的创新性的搜索产品而感到兴奋。虽然新的搜索体验相比以前纯粹的实时搜索已经是重大的改进,我们才刚刚起步。在未来的几个月里,我们会改进搜索质量,扩张我们的基础架构,扩大我们的索引,并把相关性搜索带到移动设备上去。

如果你是一个富有才干的工程师,并且想为世界上最大的实时搜索引擎而工作,Twitter 搜索质量搜索基础架构团队正在招聘!

鸣谢

下列人员对本次产品发布有所贡献:Abhi Khune, Abdur Chowdhury, Aneesh Sharma, Ashok Banerjee, Ben Cherry, Brian Larson, Coleen Baik, David Chen, Frost Li, Gilad Mishne, Isaac Hepworth, Jon Boulle, Josh Brewer, Krishna Gade, Michael Busch, Mike Hayes, Nate Agrin, Patrick Lok, Raghavendra Prabu, Sarah Brown, Sam Luckenbill, Stephen Fedele, Tian Wang, Yi Zhuang, Zhenghua Li。

——@twittersearch


原文链接:http://engineering.twitter.com/2011/05/engineering-behind-twitters-new-search.html
原文发布时间:2011 年 5 月 31 日

Twitter Engineering 中文版

此博客将用于存放我在业余时间翻译的Twitter工程博客(Twitter Engineering Blog, http://engineering.twitter.com/)条目,希望对大家了解Twitter内部的工程技术有所帮助。