常见的实时计算方面有两种方案:
用阿里云的ADS做OLAP还是很不错的,但是消耗的资源比较大,而流计算的方案则相对比较廉价。因为工作中用JSTORM做实时计算比较多,而且去年JSTORM也正式加入了APACHE(说明其实力不一般啊),所以最近打算总结一下。
消息从源头(spout)流出来以后,流入处理节点(bolt),在处理节点完成之后有可能产生新的消息并流到后面的处理节点,如下图:
+-------+ +------+ +------+ message--->| spout |--->| bolt |--->| bolt |---> +-------+ +------+ +------+
在spout节点需要实现的方法(以IRichSpout为例)如下:
public class MySpout implements IRichSpout{ public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector){ // 初始化 } public void nextTuple(){ // 发送消息 } public void ack(Object messageId){ // 消息消费成功 } public void fail(Object messageId){ // 消息消费失败 } }
相应的bolt节点需要实现的方法(以IRichBolt为例)就要少一些:
public class MyBolt implements IRichBolt { public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { // 初始化 } public void execute(Tuple tuple) { // 处理消息 } }
在框架中通过调用指定的main方法来创建任务(Topology),下面来看个在本地运行的例子:
public static void main(String[] args) throws Exception { // 开始构建任务结构 TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("my_spout", new MySpout(), 10); // 设置源头节点及其并发度 builder.setBolt("my_bolt", new MyBolt(), 10).shuffleGrouping("my_spout");// 设置处理节点及其并发度 // 伪集群方式运行 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("my_topology", config, builder.createTopology()); // 执行一段时间后停止 Thread.sleep(6000000); cluster.shutdown(); }
到这里就明白最简单的JSTORM的任务的在编程时的结构了,下面可以来看其整体上的架构。