Kafka总结(一)kafka部署与架构介绍


1. Apache Kafka介绍

Kafka 是最初由Linkedin公司开发,是一个分布式、支持分区(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统。它的最大特性就是可以实时的处理大量数据,以满足各种需求场景:比如基于Hadoop的批处理系统、低延迟的实时系统、Storm/Spark Streaming流式引擎、web/Nginx日志、访问日志、消息服务等等,Kafka使用 Scala 语言编写,Linkedin于2010年将Kafka贡献给 Apache 基金会并称为顶级开源项目。

2.消息系统分类

一个消息系统负责将数据从一个应用传递到另外一个应用,应用只需关注于数据,无需关注数据在两个或多个应用间是如何传递的。分布式消息传递基于可靠的消息队列,在客户端应用和消息系统之间异步传递消息。有两种主要的消息传递模式:

  1. 点对点传递模式
  2. 发布-订阅模式

大部分的消息系统选用发布-订阅模式。

2.1 点对点消息介绍

2.1.1 介绍

在点对点消息系统中,消息持久化到一个队列中。此时,将有一个或多个消费者消费队列中的数据。但是一条消息只能被消费一次。当一个消费者消费了队列中的某条数据之后,该条数据则从消息队列中删除。该模式即使有多个消费者同时消费数据,也能保证数据处理的顺序。这种架构描述示意图如下:

点对点消息系统图示

2.1.2 特点

  1. 一般基于Pull(拉取) 或者Polling(轮询) 接收消息。

  2. 发送到队列的消息被一个且仅一个接收者接收,即使多个接收者在同一个队列中监听同一消息。

  3. 既支持异步 “即发即弃” 的消息传送方式,也支持同步请求/应答传送方式

    • 即发即弃:发送消息之后就返回不用关心数据是否被处理。
    • 同步请求/应答:在发送的消息被处理之后才会返回

2.2 发布-订阅消息系统

2.2.1 介绍

在发布-订阅消息系统中,消息被持久化到一个主题(topic)中。与点对点消息系统不同的是,消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者。该模式的示例图如下:
发布-订阅消息系统图示

2.2.2 特点

  1. 发布到一个主题的消息可以被多个订阅者所接收。
  2. 发布/订阅既可基于push消费数据,也可基于Pull或者Polling消费数据
  3. 解耦能力比点对点消息系统更强
  4. 支持多播

2.3 消息系统适用场景

场景 说明
系统解耦 各个系统之间通过消息系统这个统一的接口交换数据,无需了解彼此的存在
数据冗余 部分消息系统具有消息持久化能力,可规避消息处理前丢失的风险
系统扩展 消息系统是统一的数据接口,各系统可独立扩展
峰值处理 消息系统可顶住峰值流量,业务系统可根据处理能力从消息系统中获取并处理对应量的请求
可恢复性 系统中部分组件失效并不影响整个系统,它回复后扔可从消息系统中获取并处理数据
异步通信 在不需要立即处理请求的场景下,可以将请求放入消息系统,适合的时候再处理

2.4 常见消息系统对比

消息系统
RabbitMQ ActiveMQ RocketMQ Kafka
社区/公司 Mozilla Publish License Apache Ali Apache
授权方式 开源 开源 开源 开源
开发语言 Erlang Java Java Scala&Java
批量操作 不支持 支持 支持 支持
部署方式 单机/集群 单机/集群 单机/集群 单机/集群
HA Master/slave模式,master提供服务,slave仅备份 基于Zookeeper + LevelDB的Master Slave实现方式 支持多Master模式、多Master多Slave模式、异步复制模式、多Master多Slave同步双写模式 支持Relica机制,leader宕机后备份自动顶替,并重新选举leader(基于zk)
数据可靠性 仅保证数据不丢失,有slave用作备份 master/slave 支持异步实时刷盘,同步刷盘,同步复制,异步复制 数据可靠,并且有replica机制,有容错容灾能力
有序性 只有使用一个client才能保证有序 有序 有序 多Client保证有序
管理界面 较好 一般 命令行 官方只提供命令行,Yahoo开源了自己的Kafka管理界面,Kafka Manager
负载均衡 支持 支持 支持 支持

3. kafka 部署

3.1 单机部署,使用kafka自带zookeeper

  1. 启动zookeeper
[root@node1 kafka_2.12-1.0.2]# cd /opt/kafka_2.12-1.0.2
# 启动zookeeper
[root@node1 kafka_2.12-1.0.2]# bin/zookeeper-server-start.sh config/zookeeper.properties # 前台启动
[2020-02-09 00:24:25,127] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
[2020-02-09 00:24:25,178] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2020-02-09 00:24:25,187] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

  1. 检测zookeeper是否启动成功
# 在新的命令行窗口执行
[root@node1 ~]# lsof -i:2181
COMMAND  PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
java    5108 root   90u  IPv6  74947      0t0  TCP *:eforward (LISTEN)
  1. 启动kafka broker
# 在新的命令行窗口执行
# bin/kafka-server-start.sh [-daemon] server.properties [--override property=value]* 设置-daemon 则会后台运行 否则前台运行
[root@node1 kafka_2.12-1.0.2]# bin/kafka-server-start.sh config/server.properties
[2020-02-09 00:30:39,912] INFO KafkaConfig values:
    advertised.host.name = null
    advertised.listeners = null
    advertised.port = null
    alter.config.policy.class.name = null
    authorizer.class.name =
    auto.create.topics.enable = true
    auto.leader.rebalance.enable = true
    background.threads = 10
    ......
[2020-02-09 00:30:42,259] INFO Kafka version : 1.0.2 (org.apache.kafka.common.utils.AppInfoParser)
[2020-02-09 00:30:42,259] INFO Kafka commitId : 2a121f7b1d402825 (org.apache.kafka.common.utils.AppInfoParser)
[2020-02-09 00:30:42,262] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
  1. 检查kafka broker 是否启动成功
# 在新的命令行窗口执行
[root@node1 ~]# lsof -i:2181
COMMAND  PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
java    5108 root   90u  IPv6  74947      0t0  TCP *:eforward (LISTEN)
java    5108 root   91u  IPv6  75799      0t0  TCP localhost:eforward->localhost:58961 (ESTABLISHED)
java    5392 root   89u  IPv6  75798      0t0  TCP localhost:58961->localhost:eforward (ESTABLISHED)
[root@node1 ~]#
[root@node1 ~]# lsof -i:9092
COMMAND  PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
java    5392 root  104u  IPv6  75813      0t0  TCP *:XmlIpcRegSvc (LISTEN)
java    5392 root  111u  IPv6  75818      0t0  TCP node1:40656->node1:XmlIpcRegSvc (ESTABLISHED)
java    5392 root  112u  IPv6  75819      0t0  TCP node1:XmlIpcRegSvc->node1:40656 (ESTABLISHED)

这时kafka已经部署完成,接下来检测kafka是否能够正常使用

  1. 创建topic
# 在新的命令行窗口执行
# 测试创建topic 
[root@node1 kafka_2.12-1.0.2]# bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test1 --partitions 3 --replication-factor 1
Created topic "test1".

# 查看topic 列表

# 检查topic 状态
[root@node1 kafka_2.12-1.0.2]# bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test1
Topic:test1    PartitionCount:3    ReplicationFactor:1    Configs:
    Topic: test1    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
    Topic: test1    Partition: 1    Leader: 0    Replicas: 0    Isr: 0
    Topic: test1    Partition: 2    Leader: 0    Replicas: 0    Isr: 0

# 这里的leader是指 broker的id
  1. 启动producer 并发送数据
# 在新的命令行窗口执行
# 启动producer发送消息
[root@node1 kafka_2.12-1.0.2]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1
>1
>2
>3
>4
>5
  1. 启动consumer 并消费数据
# 在新的命令行窗口执行
# 启动consumer消费消息
[root@node1 kafka_2.12-1.0.2]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1
1
2
3
4
5

3.2 部署kafka集群,使用外部zookeeper

  1. 修改配置文件
# 需要修改的配置项有四个 broker.id、listeners、log.dirs、zookeeper.connect
# 每个borker的id是唯一的,多个broker要设置不同的id

vim config/server.properties

broker.id=0

# listeners一定要配置成为IP地址;如果配置为localhost或服务器的hostname,
# 在使用java发送数据时就会抛出异 常:org.apache.kafka.common.errors.TimeoutException: Batch Expired 。
# 因为在没有配置advertised.host.name 的情况下,Kafka并没有像官方文档宣称的那样改为广播我们配置的host.name,
# 而是广播了主机配置的hostname。远端的客户端并没有配置 hosts,所以自然是连接不上这个hostname的
listeners=PLAINTEXT://172.16.72.150:9092

# 存储数据路径,默认是在/tmp目录下,需要修改 首先需要创建对应文件夹
log.dirs=/opt/kafka_2.12-1.0.2/kafka-logs

# zookeeper地址
zookeeper.connect=node1:2181,node2:2181,node3:2181
  1. 将kafka 安装文件拷贝到其他节点并修改broker.id 与listeners 中的IP地址
scp -r kafka_2.12-1.0.2/ root@node2:/opt
scp -r kafka_2.12-1.0.2/ root@node3:/opt

#在node2中修改 server.properties 配置文件

broker.id=1
listeners=PLAINTEXT://172.16.72.151:9092

#在node3中修改

broker.id=2
listeners=PLAINTEXT://172.16.72.152:9092
  1. 启动kafka 服务
#分别在每台服务器上执行启动命令
#前台运行
bin/kafka-server-start.sh config/server.properties

#后台运行
bin/kafka-server-start.sh -daemon config/server.properties

#日志运行 KAFKA_HOME/logs
-rw-r--r--. 1 root root  172 2月  10 15:47 log-cleaner.log #Kafka日志清理操作相关统计信息
-rw-r--r--. 1 root root  17K 2月  10 15:47 kafkaServer.out #KafkaServer运行日志
-rw-r--r--. 1 root root  17K 2月  10 15:47 server.log #KafkaServer运行日志
-rw-r--r--. 1 root root 4.5K 2月  10 15:47 kafkaServer-gc.log.0.current # Kafka运行过程,进行GC操作时的日志
-rw-r--r--. 1 root root  180 2月  10 15:47 state-change.log #Kafka分区角色切换等状态转换日志
-rw-r--r--. 1 root root 4.3K 2月  10 15:47 controller.log  #KafkaController运行时日志

-rw-r--r--. 1 root root      0 2月   9 21:27 kafka-authorizer.log #Kafka权限认证相应操作日志
-rw-r--r--. 1 root root      0 2月   9 21:27 kafka-request.log #Kafka相应网络请求日志
-rw-r--r--. 1 root root   1976 2月   9 21:27 zookeeper-gc.log.0.current # kafka 自带的Zookeeper GC 日志


#然后查看端口使用情况

# node1
[root@node1 ~]# lsof -i:2181
COMMAND  PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
java    7728 root   46u  IPv6  80681      0t0  TCP *:eforward (LISTEN)
java    7728 root   59u  IPv6  84114      0t0  TCP node1:eforward->node1:43554 (ESTABLISHED)
java    8177 root   89u  IPv6  84113      0t0  TCP node1:43554->node1:eforward (ESTABLISHED)
[root@node1 ~]# lsof -i:9092
COMMAND  PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
java    8177 root  104u  IPv6  84127      0t0  TCP node1:XmlIpcRegSvc (LISTEN)
java    8177 root  108u  IPv6  84133      0t0  TCP node1:40682->node1:XmlIpcRegSvc (ESTABLISHED)
java    8177 root  112u  IPv6  84134      0t0  TCP node1:XmlIpcRegSvc->node1:40682 (ESTABLISHED)
java    8177 root  116u  IPv6  84169      0t0  TCP node1:52665->node2:XmlIpcRegSvc (ESTABLISHED)
java    8177 root  120u  IPv6  84171      0t0  TCP node1:46187->node3:XmlIpcRegSvc (ESTABLISHED)

#node2
[root@node2 ~]# lsof -i:2181
COMMAND   PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
java    25279 root   46u  IPv6  50238      0t0  TCP *:eforward (LISTEN)
java    48812 root   89u  IPv6 164762      0t0  TCP node2:34171->node3:eforward (ESTABLISHED)
[root@node2 ~]# lsof -i:9092
COMMAND   PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
java    48812 root  104u  IPv6 164773      0t0  TCP node2:XmlIpcRegSvc (LISTEN)
java    48812 root  108u  IPv6 164777      0t0  TCP node2:XmlIpcRegSvc->node1:52665 (ESTABLISHED)

#node3
[root@node3 ~]# lsof -i:2181
COMMAND   PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
java    24879 root   46u  IPv6  49041      0t0  TCP *:eforward (LISTEN)
java    24879 root   60u  IPv6 160074      0t0  TCP node3:eforward->node3:56261 (ESTABLISHED)
java    24879 root   61u  IPv6 159881      0t0  TCP node3:eforward->node2:34171 (ESTABLISHED)
java    47833 root   89u  IPv6 160073      0t0  TCP node3:56261->node3:eforward (ESTABLISHED)
[root@node3 ~]# lsof -i:9092
COMMAND   PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
java    47833 root  104u  IPv6 160116      0t0  TCP node3:XmlIpcRegSvc (LISTEN)
java    47833 root  109u  IPv6 160120      0t0  TCP node3:XmlIpcRegSvc->node1:46187 (ESTABLISHED)
  1. 创建topic
[root@node1 kafka_2.12-1.0.2]# bin/kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --create --topic test1 --partitions 3 --replication-factor 1
Created topic "test1".
[root@node2 kafka_2.12-1.0.2]#
#查看所有topic
[root@node1 kafka_2.12-1.0.2]# bin/kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181  --list
test1
# 查看topic 状态
[root@node1 kafka_2.12-1.0.2]# bin/kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --describe --topic test1
Topic:test1    PartitionCount:3    ReplicationFactor:1    Configs:
    Topic: test1    Partition: 0    Leader: 2    Replicas: 2    Isr: 2
    Topic: test1    Partition: 1    Leader: 0    Replicas: 0    Isr: 0
    Topic: test1    Partition: 2    Leader: 1    Replicas: 1    Isr: 1
  1. 启动消费者
#待生产者发送数据之后收到以下消息
[root@node2 kafka_2.12-1.0.2]# bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic test1
hello kafka
1
4
2
3
5
  1. 启动生产者并发送数据
[root@node1 kafka_2.12-1.0.2]# bin/kafka-console-producer.sh --broker-list node1:9092 --topic test1
>hello kafka
>1
>2
>3
>4
>5

4.kafka 架构介绍

Kafka发布订阅的对象是Topic。我们可以为每一类数据创建一个Topic,把向Topic发送消息的客户端称作为producer,把从Topic订阅消息的客户端称作是consumer。Producer和Consumer可以同时从多个Topic读写数据。一个Kafka集群由一个或多个broker 服务器组成,它负责持久化和备份具体的Kafka消息。Kafka架构如下图

Kafka架构图

  • Zookeeper:可以是Zookeeper 单节点或者集群,也可以是Kafka自带的Zookeeper,做集群主要是为了Zookeeper的HA。
  • Kafka Server: 也叫Broker,需要先启动Zookeeper,然后在启动Kafka Server,这两个都启动之后,kafka服务就可用了。
  • Producer :也称为生产者或发布者,从外部拿数据,然后将数据发送到kafka topic当中。
  • Consumer:也称为消费者或订阅者,从kafka topic 中消费数据。

消息系统通常都会由生产者、消费者和broker三大部分组成,生产者将消息写入broker,消费者从broker中读取数消息数据,具体步骤如下:

  • 生产者客户端应用程序产生消息:

1.客户端连接对象将消息包装到请求发送到服务端

2.服务端的入口也有一个连接对象负责接收请求,并将消息以文件的形式存储起来。

3.服务端返回相应结果给生产者客户端。

  • 消费者客户端应用消费数据:

1.客户端连接对象将消费信息包装到请求发送给服务端。

2.服务端从文件存储系统中取出消息数据。

3.服务端返回相应结果给消费者客户端。

4.客户端将响应结果还原成消息并开始处理消息数据。

Kafka中的Producer和Consumer采用的是push-and-pull模式,即Producer只管向Broker push消息,Consumer只管从Broker pull消息,两者对消息的生产和消费是异步的。

  • kafka设计目标
    • 高吞吐率:在廉价的商用机器上单机可支持每秒100w条消息的读写
    • 消息持久化:所有消息均被持久化到磁盘(不会造成性能低下),无消息丢失(单节点磁盘问题支持多节点间的消息复制),支持消息重放
    • 完全分布式:Producer,broker,consumer均支持水平扩展
    • 同时满足适应在线流处理和离线批处理

文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
Kafka总结(二)常见组件上 Broker 、Producer和 Consumer 介绍 Kafka总结(二)常见组件上 Broker 、Producer和 Consumer 介绍
1. Broker1.1 Broker 介绍 Broker 没有副本机制,一旦 Broker 宕机,该 Broker 的消息都将不可用。 Broker 不会保存 Consumer 消费 topic partition offset 的状态
2020-02-09
下一篇 
Zookeeper 命令介绍 Zookeeper 命令介绍
1. 创建节点 create 功能:创建普通节点,如果创建节点中的内容中有空格需要用双引号包起来。 命令格式:create [-s] [-e] [-c] [-t ttl] path [data] [acl] -s:创建带编号的节点 -e
2020-02-05
  目录