WsztRush

Jstorm

实际上做过的大部分的系统很难算是分布式系统,处理的流程一般是:

  1. 将请求随机交给一台机器;
  2. 在这台机器上面进行处理,如果处理量比较大就开多线程;

在一台机器上面去开多线程处理感觉又回到了古老的单机作战的时代,扩展和性能都是有上限的,那么能不能将一个任务交给不同的机器来协同完成?

数据处理方面根据不同的场景大致可以分为下面两种场景:

  1. 离线分析:特点是查询单一、数据加工过程也单一,用Hadoop来解决;
  2. 在线分析:特点是查询随机,玩的就是索引,那么QPS必然不会很高,而且需要的都是好机器,可以用阿里云的ADS来解决;

看是天下太平,但真正用的时候还是会有问题:

像ADS这样的产品,可以在一份静态的数据上面方便地进行各种查询,但是新增、更新操作特别频繁的情况下,索引的重构可能会存在瓶颈。

实时性也是数据处理的一个目标,记得之前在压测的时候在Solr实时更新索引上死得很惨,所以需要考虑更加有效、低成本地解决方式。

基本概念

集群架构如下:

以及:

其中:

  1. nimbus:分发任务、任务、监控集群运行状态;
  2. supervisor:监听nimbus的指令,接受分发代码、任务并执行;
  3. worker:真正执行的进程;
  4. task:任务;
  5. executor:一个线程,用来轮询task中的接口;

在jstorm中运行的程序就是一个topology,说白了就是一个jar包,可以在nimbus节点使用命名把jar包提交给集群,那么这个jar包中的程序的结构如下:

其中:

  1. spout:源头;
  2. bolt:处理器;
  3. tuple:数据;

注:任务分发、启动的流程可以看这里

在分布式系统中数据都准确性是最难的,通常在检查消息消费失败的时候重试。而在Jstorm里面中就更为复杂,一个消息从spout发给bolt,而一个bolt可能发给多个bolt,这样就构成了一棵树形结构,在jstorm中用acker机制来检测消息是否失败:

显然这是一个随机算法,但是发生错误的概率非常小!但是,如果真的出错那就蛋疼了,做个开关或者接口让程序开发者来自己实现acker不好么?

在并发编程中经常要碰到去处理资源竞争,处理的方式基本上两种:

  1. 根据所需要的资源进行分组

用第2种方式,将不同的任务分发给不同的节点来处理,这样就可以减少很多的竞争、简化处理的代码、容易提高性能,在jstorm提供了丰富grouping方式:

  1. 随机分组
  2. 根据某个字段分组
  3. 广播
  4. 直接分组

说到消息分组,可能看起来是在emit的时候就直接发出去了,但事实上并不是这样:

  1. 创建tuple;
  2. worker将目标taskId+tuple放到待发送队列;
  3. 由一个单独的线程来负责将消息发送给对应的任务处理;

另外在jstorm中提供了一些更高级的抽象:

  1. CoordinatedBolt
  2. Transactional Topology
  3. DRPC
  4. Trident

为了做到处理且仅处理一次的目的,并且不能牺牲掉并发性,那么Transactional Topology的做法是将一个batch的计算分为两个阶段来完成:

  1. processing:多个batch并行处理;
  2. commit:一个一个地提交,batch之间保持强有序;

而Trident则是一个更方便、可靠的接口~

附jstorm中常用参数设置:

参数 含义
topology.message.timeout.secs 超时时间,如果超时则认为失败
topology.max.task.parallelism 最大并行度
topology.state.synchronization.timeout.secs 组件同步状态源的最大超时时间
topology.max.spout.pending 缓存spout发送的tuple数,超出会阻塞
topology.executor.receive.buffer.size executor线程的接收队列大小
topology.executor.send.buffer.size executor线程的发送队列大小
topology.receiver.buffer.size worker接收线程缓存消息的大小
topology.transfer.buffer.size worker进程中向外发送消息的缓存大小
storm.messaging.netty.max_wait_ms 最大等待时间
storm.messaging.netty.min_wait_ms 最小等待时间
topology.ackers ackaer任务数

DRPC

互联网中用户的操作通常是轻量级的,插一下数据库、更新一下缓存就搞定了,但是对于仓库管理这种系统很多操作比较重,有些任务可能需要遍历数据库中的表才能完成,而Distributed RPC可能是提高性能、降低风险的一个途径。

用Jstorm来搞DRPC的过程如下:

DRPCClient client = new DRPCClient("drpc-host", 3772);
String result = client.execute("reach", "http://twitter.com");

客户端给DRPC服务器发送要执行的方法名称以及参数,实现了这个函数的topology使用DRPCSpout从DRPC服务器接收函数调用流。每个函数调用被DRPC服务器标记了一个唯一的id。这个topology然后计算结果,在topology的最后一个叫做ReturnResults的bolt会将调用的结果发送给DRPC服务器。

之前想过可能用一些更简单的方法来试下:

  1. 在spout的节点提供一个rpc服务;
  2. 在调用这个服务的时候发送消息给bolt,然后在bolt中完成任务的分发及处理;
  3. 在spout中统计是否所有的消息都成功消费;
  4. 返回数据;

这样弄会有一些问题,但是感觉没有强依赖于jstorm,而仅仅是用它的消息分发机制就可以了。

数据流式处理

在流式处理的过程中,每次面对的都是单个的记录,而事实上统计都是有状态的,那么:

  1. 可以把状态保存在本地;
  2. 可以把状态保存在hbase、ldb等外部的存储;

保存在本地的内存中速度会很快,也没有序列化等开销,而且通常流处理中时效性要求是比价强的,过去很久的数据基本上就不会再次处理。所以:流玩的好不好,就看本地缓存用的好不好了。

总结

很多语言从一开始就在考虑如何简单地去实现并发编程:

  1. erlang
  2. golang

而我们这些作为老Java程序猿只能是自己去东拼西凑地搞出一个来简化分布式系统中的开发成本。现在考虑下来jstorm应该是个不错的选择:

  1. 运行一个固定结构的topology;
  2. 在topology上面可以动态地加载、编译我们要执行的脚本;
  3. 在spout、bolt中调用脚本中的方法;

这样也许我们可以更低的成本来享受分布式带来的好处~~