1. Apache Kafka介绍 Kafka 是最初由Linkedin公司开发,是一个分布式、支持分区(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统。它的最大特性就是可以实时的处理大量数据,以满足各种需求场景:比如基于Hadoop的批处理系统、低延迟的实时系统、Storm/Spark Streaming流式引擎、web/Nginx日志、访问日志、消息服务等等,Kafka使用 Scala 语言编写,Linkedin于2010年将Kafka贡献给 Apache 基金会并称为顶级开源项目。
2.消息系统分类 一个消息系统负责将数据从一个应用传递到另外一个应用,应用只需关注于数据,无需关注数据在两个或多个应用间是如何传递的。分布式消息传递基于可靠的消息队列,在客户端应用和消息系统之间异步传递消息。有两种主要的消息传递模式:
点对点传递模式
发布-订阅模式
大部分的消息系统选用发布-订阅模式。
2.1 点对点消息介绍 2.1.1 介绍 在点对点消息系统中,消息持久化到一个队列中。此时,将有一个或多个消费者消费队列中的数据。但是一条消息只能被消费一次。当一个消费者消费了队列中的某条数据之后,该条数据则从消息队列中删除。该模式即使有多个消费者同时消费数据,也能保证数据处理的顺序。这种架构描述示意图如下:
2.1.2 特点
一般基于Pull(拉取) 或者Polling(轮询) 接收消息。
发送到队列的消息被一个且仅一个接收者接收,即使多个接收者在同一个队列中监听同一消息。
既支持异步 “即发即弃” 的消息传送方式,也支持同步请求/应答传送方式
即发即弃:发送消息之后就返回不用关心数据是否被处理。
同步请求/应答:在发送的消息被处理之后才会返回
2.2 发布-订阅消息系统 2.2.1 介绍 在发布-订阅消息系统中,消息被持久化到一个主题(topic)中。与点对点消息系统不同的是,消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者。该模式的示例图如下:
2.2.2 特点
发布到一个主题的消息可以被多个订阅者所接收。
发布/订阅既可基于push消费数据,也可基于Pull或者Polling消费数据
解耦能力比点对点消息系统更强
支持多播
2.3 消息系统适用场景
场景
说明
系统解耦
各个系统之间通过消息系统这个统一的接口交换数据,无需了解彼此的存在
数据冗余
部分消息系统具有消息持久化能力,可规避消息处理前丢失的风险
系统扩展
消息系统是统一的数据接口,各系统可独立扩展
峰值处理
消息系统可顶住峰值流量,业务系统可根据处理能力从消息系统中获取并处理对应量的请求
可恢复性
系统中部分组件失效并不影响整个系统,它回复后扔可从消息系统中获取并处理数据
异步通信
在不需要立即处理请求的场景下,可以将请求放入消息系统,适合的时候再处理
2.4 常见消息系统对比
消息系统
RabbitMQ
ActiveMQ
RocketMQ
Kafka
社区/公司
Mozilla Publish License
Apache
Ali
Apache
授权方式
开源
开源
开源
开源
开发语言
Erlang
Java
Java
Scala&Java
批量操作
不支持
支持
支持
支持
部署方式
单机/集群
单机/集群
单机/集群
单机/集群
HA
Master/slave模式,master提供服务,slave仅备份
基于Zookeeper + LevelDB的Master Slave实现方式
支持多Master模式、多Master多Slave模式、异步复制模式、多Master多Slave同步双写模式
支持Relica机制,leader宕机后备份自动顶替,并重新选举leader(基于zk)
数据可靠性
仅保证数据不丢失,有slave用作备份
master/slave
支持异步实时刷盘,同步刷盘,同步复制,异步复制
数据可靠,并且有replica机制,有容错容灾能力
有序性
只有使用一个client才能保证有序
有序
有序
多Client保证有序
管理界面
较好
一般
命令行
官方只提供命令行,Yahoo开源了自己的Kafka管理界面,Kafka Manager
负载均衡
支持
支持
支持
支持
3. kafka 部署 3.1 单机部署,使用kafka自带zookeeper
启动zookeeper
1 2 3 4 5 6 7 8 9 [root@node1 kafka_2.12-1.0.2]# cd /opt/kafka_2.12-1.0.2 [root@node1 kafka_2.12-1.0.2]# bin/zookeeper-server-start.sh config/zookeeper.properties [2020-02-09 00:24:25,127] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) ... [2020-02-09 00:24:25,178] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer) [2020-02-09 00:24:25,187] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
检测zookeeper是否启动成功
1 2 3 4 [root@node1 ~]# lsof -i:2181 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 5108 root 90u IPv6 74947 0t0 TCP *:eforward (LISTEN)
启动kafka broker
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 [root@node1 kafka_2.12-1.0.2]# bin/kafka-server-start.sh config/server.properties [2020-02-09 00:30:39,912] INFO KafkaConfig values: advertised.host.name = null advertised.listeners = null advertised.port = null alter.config.policy.class.name = null authorizer.class.name = auto.create.topics.enable = true auto.leader.rebalance.enable = true background.threads = 10 ...... [2020-02-09 00:30:42,259] INFO Kafka version : 1.0.2 (org.apache.kafka.common.utils.AppInfoParser) [2020-02-09 00:30:42,259] INFO Kafka commitId : 2a121f7b1d402825 (org.apache.kafka.common.utils.AppInfoParser) [2020-02-09 00:30:42,262] INFO [KafkaServer id =0] started (kafka.server.KafkaServer)
检查kafka broker 是否启动成功
1 2 3 4 5 6 7 8 9 10 11 12 13 14 [root@node1 ~]# lsof -i:2181 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 5108 root 90u IPv6 74947 0t0 TCP *:eforward (LISTEN) java 5108 root 91u IPv6 75799 0t0 TCP localhost:eforward->localhost:58961 (ESTABLISHED) java 5392 root 89u IPv6 75798 0t0 TCP localhost:58961->localhost:eforward (ESTABLISHED) [root@node1 ~]# [root@node1 ~]# lsof -i:9092 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 5392 root 104u IPv6 75813 0t0 TCP *:XmlIpcRegSvc (LISTEN) java 5392 root 111u IPv6 75818 0t0 TCP node1:40656->node1:XmlIpcRegSvc (ESTABLISHED) java 5392 root 112u IPv6 75819 0t0 TCP node1:XmlIpcRegSvc->node1:40656 (ESTABLISHED)
这时kafka已经部署完成,接下来检测kafka是否能够正常使用
创建topic
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 [root@node1 kafka_2.12-1.0.2]# bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test1 --partitions 3 --replication-factor 1 Created topic "test1" . [root@node1 kafka_2.12-1.0.2]# bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test1 Topic:test1 PartitionCount:3 ReplicationFactor:1 Configs: Topic: test1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: test1 Partition: 1 Leader: 0 Replicas: 0 Isr: 0 Topic: test1 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
启动producer 并发送数据
1 2 3 4 5 6 7 8 9 [root@node1 kafka_2.12-1.0.2]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1 >1 >2 >3 >4 >5
启动consumer 并消费数据
1 2 3 4 5 6 7 8 [root@node1 kafka_2.12-1.0.2]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 1 2 3 4 5
3.2 部署kafka集群,使用外部zookeeper
修改配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 vim config/server.properties broker.id=0 listeners=PLAINTEXT://172.16.72.150:9092 log.dirs=/opt/kafka_2.12-1.0.2/kafka-logs zookeeper.connect=node1:2181,node2:2181,node3:2181
将kafka 安装文件拷贝到其他节点并修改broker.id 与listeners 中的IP地址
1 2 3 4 5 6 7 8 9 10 11 12 13 scp -r kafka_2.12-1.0.2/ root@node2:/opt scp -r kafka_2.12-1.0.2/ root@node3:/opt broker.id=1 listeners=PLAINTEXT://172.16.72.151:9092 broker.id=2 listeners=PLAINTEXT://172.16.72.152:9092
启动kafka 服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 bin/kafka-server-start.sh config/server.properties bin/kafka-server-start.sh -daemon config/server.properties -rw-r--r--. 1 root root 172 2月 10 15:47 log-cleaner.log -rw-r--r--. 1 root root 17K 2月 10 15:47 kafkaServer.out -rw-r--r--. 1 root root 17K 2月 10 15:47 server.log -rw-r--r--. 1 root root 4.5K 2月 10 15:47 kafkaServer-gc.log.0.current -rw-r--r--. 1 root root 180 2月 10 15:47 state-change.log -rw-r--r--. 1 root root 4.3K 2月 10 15:47 controller.log -rw-r--r--. 1 root root 0 2月 9 21:27 kafka-authorizer.log -rw-r--r--. 1 root root 0 2月 9 21:27 kafka-request.log -rw-r--r--. 1 root root 1976 2月 9 21:27 zookeeper-gc.log.0.current [root@node1 ~]# lsof -i:2181 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 7728 root 46u IPv6 80681 0t0 TCP *:eforward (LISTEN) java 7728 root 59u IPv6 84114 0t0 TCP node1:eforward->node1:43554 (ESTABLISHED) java 8177 root 89u IPv6 84113 0t0 TCP node1:43554->node1:eforward (ESTABLISHED) [root@node1 ~]# lsof -i:9092 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 8177 root 104u IPv6 84127 0t0 TCP node1:XmlIpcRegSvc (LISTEN) java 8177 root 108u IPv6 84133 0t0 TCP node1:40682->node1:XmlIpcRegSvc (ESTABLISHED) java 8177 root 112u IPv6 84134 0t0 TCP node1:XmlIpcRegSvc->node1:40682 (ESTABLISHED) java 8177 root 116u IPv6 84169 0t0 TCP node1:52665->node2:XmlIpcRegSvc (ESTABLISHED) java 8177 root 120u IPv6 84171 0t0 TCP node1:46187->node3:XmlIpcRegSvc (ESTABLISHED) [root@node2 ~]# lsof -i:2181 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 25279 root 46u IPv6 50238 0t0 TCP *:eforward (LISTEN) java 48812 root 89u IPv6 164762 0t0 TCP node2:34171->node3:eforward (ESTABLISHED) [root@node2 ~]# lsof -i:9092 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 48812 root 104u IPv6 164773 0t0 TCP node2:XmlIpcRegSvc (LISTEN) java 48812 root 108u IPv6 164777 0t0 TCP node2:XmlIpcRegSvc->node1:52665 (ESTABLISHED) [root@node3 ~]# lsof -i:2181 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 24879 root 46u IPv6 49041 0t0 TCP *:eforward (LISTEN) java 24879 root 60u IPv6 160074 0t0 TCP node3:eforward->node3:56261 (ESTABLISHED) java 24879 root 61u IPv6 159881 0t0 TCP node3:eforward->node2:34171 (ESTABLISHED) java 47833 root 89u IPv6 160073 0t0 TCP node3:56261->node3:eforward (ESTABLISHED) [root@node3 ~]# lsof -i:9092 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 47833 root 104u IPv6 160116 0t0 TCP node3:XmlIpcRegSvc (LISTEN) java 47833 root 109u IPv6 160120 0t0 TCP node3:XmlIpcRegSvc->node1:46187 (ESTABLISHED)
创建topic
1 2 3 4 5 6 7 8 9 10 11 12 [root@node1 kafka_2.12-1.0.2]# bin/kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --create --topic test1 --partitions 3 --replication-factor 1 Created topic "test1" . [root@node2 kafka_2.12-1.0.2]# [root@node1 kafka_2.12-1.0.2]# bin/kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --list test1 [root@node1 kafka_2.12-1.0.2]# bin/kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --describe --topic test1 Topic:test1 PartitionCount:3 ReplicationFactor:1 Configs: Topic: test1 Partition: 0 Leader: 2 Replicas: 2 Isr: 2 Topic: test1 Partition: 1 Leader: 0 Replicas: 0 Isr: 0 Topic: test1 Partition: 2 Leader: 1 Replicas: 1 Isr: 1
启动消费者
1 2 3 4 5 6 7 8 [root@node2 kafka_2.12-1.0.2]# bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic test1 hello kafka 1 4 2 3 5
启动生产者并发送数据
1 2 3 4 5 6 7 [root@node1 kafka_2.12-1.0.2]# bin/kafka-console-producer.sh --broker-list node1:9092 --topic test1 >hello kafka >1 >2 >3 >4 >5
4.kafka 架构介绍 Kafka发布订阅的对象是Topic。我们可以为每一类数据创建一个Topic,把向Topic发送消息的客户端称作为producer,把从Topic订阅消息的客户端称作是consumer。Producer和Consumer可以同时从多个Topic读写数据。一个Kafka集群由一个或多个broker 服务器组成,它负责持久化和备份具体的Kafka消息。Kafka架构如下图
Zookeeper:可以是Zookeeper 单节点或者集群,也可以是Kafka自带的Zookeeper,做集群主要是为了Zookeeper的HA。
Kafka Server: 也叫Broker,需要先启动Zookeeper,然后在启动Kafka Server,这两个都启动之后,kafka服务就可用了。
Producer :也称为生产者或发布者,从外部拿数据,然后将数据发送到kafka topic当中。
Consumer:也称为消费者或订阅者,从kafka topic 中消费数据。
消息系统通常都会由生产者、消费者和broker三大部分组成,生产者将消息写入broker,消费者从broker中读取数消息数据,具体步骤如下:
1.客户端连接对象将消息包装到请求发送到服务端
2.服务端的入口也有一个连接对象负责接收请求,并将消息以文件的形式存储起来。
3.服务端返回相应结果给生产者客户端。
1.客户端连接对象将消费信息包装到请求发送给服务端。
2.服务端从文件存储系统中取出消息数据。
3.服务端返回相应结果给消费者客户端。
4.客户端将响应结果还原成消息并开始处理消息数据。
Kafka中的Producer和Consumer采用的是push-and-pull模式,即Producer只管向Broker push消息,Consumer只管从Broker pull消息,两者对消息的生产和消费是异步的。
kafka设计目标
高吞吐率:在廉价的商用机器上单机可支持每秒100w条消息的读写
消息持久化:所有消息均被持久化到磁盘(不会造成性能低下),无消息丢失(单节点磁盘问题支持多节点间的消息复制),支持消息重放
完全分布式:Producer,broker,consumer均支持水平扩展
同时满足适应在线流处理和离线批处理