1. storm是什么?
- Storm是Twitter开源的一个分布式的实时计算系统
- 使用场景:数据的实时分析,持续计算,分布式RPC等等
2.storm 有哪些优点
- 分布式
- 可扩展
- 高可靠性
- 编程模型简单
- 高效实时
3. storm 常用网址
- 官网地址:http://storm-project.net/
- 源码地址:https://github.com/nathanmarz/storm
- 技术论坛:https://groups.google.com/forum/#!forum/storm-user
4. storm 中常用的类
- BaseRichSpout(消息生产者)(从外部文件或者消息队列里面读取数据,用来产生数据)
- BaseBasicBolt(消息处理者)(用来处理数据)
- TopologyBuilder(拓扑的构建器)(通过拓扑构建器 将spout和bolt组合起来)
- Values(将数据存放到values,发送到下一个组件)(将bolt产生的数据放在values里面,可以看作一个集合)
- Tuple(发送的数据被封装到Tuple,可以通tuple接收上个组件发送的消息)(可以看作一个集合,封装上一个组件发送的消息)
- Config(配置)
- StoemSubmitter/LocalCluster(拓扑提交器)
5.Storm 做wordcount计算
1 | // TopologyBuilder |
6. Storm 集群结构
Nimbus 和 supervisor
- 在Storm 的集群里面有两种节点:控制节点和工作节点。
- 控制节点上面运行一个叫Nimbus进程,Nimbus负责在集群里面分发代码,分配计算任务,并且监控状态。
- 每一个工作节点上面运行 SuperVisor进程。每个Supervisor负责监听从Nimbus分配给它执行的任务,据此启动或者停止执行任务的工作进程
- Nimbus和SuperVisor之间的所有协调工作都是通过Zookepper集群完成的
7. Storm 集群搭建
7.1 集群规划
| 主机名 | Storm角色 | Zookeeper |
|---|---|---|
| MASTER | Nimbus | 单节点Zookeeper |
| SLAVE-1 | Supervisor | |
| SLAVE-2 | Supervisor |
7.2 linux基本配置:
1 | 修改主机名 |
7.3 安装jdk
安装步骤:
1 | 1. 上传jdk安装包 |
7.4 搭建Zookeeper集群
1 | 1.解压Zookeeper |
7.5 . 安装Storm依赖
7.5.1 安装 zeromq
1 | 1. 解压zeromq |
7.5.2 编译安装
1 | 1. 解压文件 |
7.5.3 安装python
1 | 1. 解压 |
7.6 下载并解压Storm发布版本
1 | 1. 下载Storm发行版本 |
7.7 . 启动Storm各个后台进程
最后一步,启动Storm的所有后台进程。和Zookeeper一样,Storm也是快速失败(fail-fast)的系统,这样Storm才能在任意时刻被停止,并且当进程重启后被正确地恢复执行。这也是为什么Storm不在进程内保存状态的原因,即使Nimbus或Supervisors被重启,运行中的Topologies不会受到影响。
以下是启动Storm各个后台进程的方式:
- Nimbus: 在Storm主控节点上运行
"bin/storm nimbus >/dev/stormlog 2>&1 &"启动Nimbus后台程序,并放到后台执行将错误信息与标准信息都打印到文件中; - Supervisor: 在Storm各个工作节点上运行
"bin/storm supervisor >/dev/null 2>&1 &"启动Supervisor后台程序,并放到后台执行将错误信息与标准信息都打印到文件中; - UI: 在Storm主控节点上运行
"bin/storm ui >/dev/null 2>&1 &"启动UI后台程序,并放到后台执行,启动后可以通过http://{nimbus host}:8080观察集群的worker资源使用情况、Topologies的运行状态等信息将错误信息与标准信息都打印到文件中。
注意事项:
Storm后台进程被启动后,将在Storm安装部署目录下的logs/子目录下生成各个进程的日志文件。
经测试,Storm UI必须和Storm Nimbus部署在同一台机器上,否则UI无法正常工作,因为UI进程会检查本机是否存在Nimbus链接。
为了方便使用,可以将bin/storm加入到系统环境变量中。
至此,Storm集群已经部署、配置完毕,可以向集群提交拓扑运行了。
7.8 向集群提交任务
- 启动Storm Topology:
storm jar allmycode.jar org.me.MyTopology arg1 arg2 arg3
其中,allmycode.jar是包含Topology实现代码的jar包,org.me.MyTopology的main方法是Topology的入口,arg1、arg2和arg3为org.me.MyTopology执行时需要传入的参数。
- 停止Storm Topology:
storm kill {toponame}
其中,{toponame}为Topology提交到Storm集群时指定的Topology任务名称。
浏览器访问http://192.168.10.98:8080
- 代码示例
1 | // Topology |
启动一定数量的worker
worker中有具体的某个对象 spout bolt实例化之后的对象,我门可以指定spout和bolt的数量
7.9 关闭任务
1 | 关掉任务 |
8. Storm 内部运行关系
Nimbus 得到一个任务之后,首先将任务写到Zookeeper里面,supervisor 到Zookeeper里面领任务,SuperVisor领到任务(工作节点)之后会启动一定数量worker,worker 里面有task ++>spout/bolt的实例对象可以设置多个并行度,
9. 消息分发策略:Stream groupings
| 分发策略 | 说明 |
|---|---|
| Shuffer Grouping 随机分组 |
随机派发stream里面的tuple,保证每个bolt接收到的tuple数目相同 |
| Fields Grouping 按字段userid来分组 |
具有同样的userid的tuple会被分到相同的Bolts,而不同的userid则会分配到不同的Bolts. |
| All Grouping 广播发送 |
对于每一个tuple,所有的bolts都会收到。 |
| Global Grouping 全局分组 |
这个tuple被分配到storm中的一个bolt的其中的一个task。 再具体一点就是分配给id值最低的那个task. |
| non Grouping 不分组 |
这个分组的意思就是说stream不关心到底谁会收到tuple。 目前这种分组和shuffle grouping 是一样的效果, 有一点不同的是storm会把这个bolt放到这个bole的订阅者同一个线程里面去执行。 |
| direct Grouping 直接分组 |
这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接受者的 哪个task处理这个消息。 只有被生命为Direct Storm的消息流可以通过TopologyContext来获取处理它的消息的taskid(OutputCollector.emit方法也会返回taskid) |
| Local or shuffle Grouping | 如果目标bolt有一个或者多个task在同一工作进程中, tuple将会随机发送给这些tasks.否则,和普通的shuffle Grouping 行为一致。 |
10. Storm 中对象的生命周期
10.1 spout方法调用顺序
- declareOutputFields()(声明发送的字段。字段名称)
- Open()第一个执行
- Activate()将spout激活 如果没激活就不执行spout
- naxtTuple()(循环调用)
- Deactivate()当发送命令deactivate 时候执行
10.2 bolt方法调用顺序
- dectareOutputFields()(声明发送的字段。字段名称)
- Prepare()执行一次
- Execute()(循环执行)
11. 高可靠性
设置storm 支持的slots 数量
Vim conf
Supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
- 6704
- 6705
前面建议使用空格占位
重新平衡worker web界面 rebalance
在什么条件下,storm 才会认为一个从spout发送出来的消息被完全处理呢?
- Tuple tree不再生长
- 树中的任何消息被标识为“已处理”
使用strom提供的可靠处理特性:
- 无论何时在tuple tree中创建了一个新的节点,我们需要明确的通知storm
- 当处理完一个单独的消息时,我们需要告诉storm这颗tuple的变化状态
通过上面的两步,storm就可以检测到一个tuple tree何时被完全处理了,并且会调用相关的ack或fail方法、
锚定(anchoring) 把tuple传递进来 对tuple进行跟踪
怎么实现高可靠性
实现IRichSpout 接口
重写fail和ack方法
其他bolt对消息进行锚定
1 | if (count == 5) { |
1 | // Topology |