
Kafka Study Notes and Quick-Start Code


Why do we need message queues?

Decoupling: either side of an interaction can be extended or modified independently, as long as both keep to the same interface contract.
Redundancy: data is kept in multiple copies, guarding against loss.
Scalability: extra processing stages can be slotted in as needed.
Flexibility & peak handling: the system keeps running stably through sudden traffic spikes.
Recoverability: a queue lowers the coupling between processes; if one consuming process dies, the queued messages can still be processed after the system recovers.
Ordering guarantees: most message queues guarantee ordered processing (Kafka guarantees ordering within a single partition).
Buffering: smooths out mismatches between how fast data is produced and how fast it is processed.
Asynchronous communication: callers need not wait for processing to finish, which speeds the program up.

What is Kafka

Kafka is an open-source messaging system written in Scala and developed under the Apache foundation.
Kafka is a distributed message queue: it organizes the messages it stores by topic; message senders are called producers and message receivers consumers. A Kafka cluster consists of multiple Kafka instances, each instance (server) being called a broker.
The Kafka cluster depends on meta information kept in ZooKeeper.

Kafka architecture

Roles in the data flow
producer: the message producer, the client that sends data to Kafka brokers
consumer: the message consumer, the client that fetches data from Kafka brokers
topic: a topic, which can be thought of as a queue
consumer group: several consumers that jointly consume one topic's messages
broker: a single Kafka server; a cluster holds multiple brokers, and one broker hosts multiple topics
partition: a topic can be split into multiple partitions; each partition is an ordered queue whose messages carry a sequential id (the offset). Kafka guarantees ordering only within one partition, not across a whole topic or across partitions
offset: the storage files are named after offsets (offset.kafka), which makes locating a message by its offset convenient

Steps in the Kafka processing flow

The producer pushes data into the Kafka cluster; each message lands in the appropriate topic and partition on one of the brokers.
Each topic partition has replicas on different brokers, and among the brokers holding a partition there is a leader and there are followers. Kafka reads and writes only on the leader; the followers merely keep replicas and can step in as the new leader when the current one dies.
Consumers consume the cluster's data. When the data volume grows, several consumers consume it together to raise throughput, which is what consumer groups are for: within one group the topic's data is shared, with each consumer handling part of it.
ZooKeeper stores the Kafka cluster's meta information and the consumers' positions in the queues (the consumption offsets).

Kafka download: http://kafka.apache.org/downloads

Installing a Kafka cluster

  1. Download Kafka and unpack it into a directory of your choice; here it lives under /opt/module/kafka
  2. Create a logs directory (the log data Kafka stores there can be put to other uses)
  3. Edit config/server.properties under the install directory
  4. Add Kafka to the profile environment variables (optional; it lets you run the Kafka commands from any path):
#kafka home 
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
  5. Distribute the install directory to the other nodes (e.g. with xsync)
  6. Start the cluster: bin/kafka-server-start.sh config/server.properties &
    To start in the background as a daemon without console logging: bin/kafka-server-start.sh -daemon config/server.properties

The server.properties configuration file

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker. (unique ID of this server)
broker.id=0

# Switch to enable topic deletion or not, default value is false (toggle for the topic-deletion feature)
#delete.topic.enable=true

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network (threads handling network requests)
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O (threads handling disk I/O)
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server (send-buffer size of the socket server)
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server (receive-buffer size of the socket server)
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM) (upper bound on a single request's size)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files (where Kafka keeps its log data)
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers. (default partition count per topic on this broker)
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array. (threads for recovering and flushing data under the data dirs)
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1, such as 3, is recommended to ensure availability.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age (maximum time a segment file is kept; older segments are deleted)
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes. (connection string for the ZooKeeper cluster)
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

Kafka command-line operations

Stop the cluster

bin/kafka-server-stop.sh

List all topics on the server

bin/kafka-topics.sh --zookeeper hadoop1:2181 --list

Create a topic

bin/kafka-topics.sh --zookeeper hadoop1:2181 --create --topic hadoop1 --partitions 3 --replication-factor 3
Parameters (the same operation via the Java client is sketched below):
--topic  the topic name
--replication-factor  the number of replicas
--partitions  the number of partitions
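
For reference, the same topic can also be created from code. A minimal sketch with the Java AdminClient (added in Kafka 0.11, so treat its availability as an assumption for the ZooKeeper-era release these notes use); hadoop1:9092 is an assumed broker address:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop1:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // topic name, partition count, and replication factor mirror the CLI flags above
            NewTopic topic = new NewTopic("hadoop1", 3, (short) 3);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}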

Describe a topic

bin/kafka-topics.sh --zookeeper hadoop1:2181 --describe --topic hadoop1

Delete a topic

bin/kafka-topics.sh --zookeeper hadoop1:2181 --delete --topic hadoop1

Send messages (console producer)

bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic hadoop1
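
The console producer is handy for smoke tests; from code, a minimal Java producer sketch looks like this (assuming the kafka-clients dependency on the classpath and a broker at hadoop1:9092):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                // messages are appended to the "hadoop1" topic created above
                producer.send(new ProducerRecord<>("hadoop1", "key-" + i, "message-" + i));
            }
        }
    }
}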

Start a single consumer

bin/kafka-console-consumer.sh --zookeeper hadoop1:2181 --from-beginning --topic hadoop1
To consume as part of a consumer group, set group.id=test-consumer-group in config/consumer.properties, then (a Java sketch of the same follows):
bin/kafka-console-consumer.sh --zookeeper hadoop1:2181 --topic hadoop1 --consumer.config config/consumer.properties
bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic hadoop1 --consumer.config config/consumer.properties
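
The same consumer-group behaviour from code, as a minimal sketch (assuming a kafka-clients version of 2.0 or later, where poll() takes a Duration):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop1:9092");
        props.put("group.id", "test-consumer-group"); // plays the role of consumer.properties above
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest"); // roughly what --from-beginning does for a new group
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("hadoop1"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    // each record carries its topic, partition, and offset, as described earlier
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}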

Inspect consumer-group information

bin/kafka-consumer-offset-checker.sh --zookeeper hadoop1:2181 --group kafka-group

Replica rebalancing (preferred replica election)

bin/kafka-preferred-replica-election.sh --zookeeper hadoop1:2181

Kafka production process

The producer pushes messages to the broker; each message is appended to a partition, i.e. written to disk sequentially (sequential disk writes are more efficient than random memory writes; together with zero-copy, segmented logs, read-ahead, and write-behind they sustain Kafka's throughput).

Partitions: every message is sent to a topic, which is in essence a directory; a topic is made up of partition logs. Messages within each partition are ordered: produced messages are continually appended to the partition log, and each one is assigned an offset.

Why partition?
It makes cluster scaling easy;
It raises concurrency, since reads and writes happen per partition.
How the partition is chosen (a sketch in code follows this list):
If a partition is specified explicitly, it is used;
If no partition is given but a key is, the key's value is hashed to pick a partition;
If neither partition nor key is given, a partition is picked round-robin.
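
A minimal sketch of the three cases, assuming the KafkaProducer<String, String> named producer from the producer sketch earlier; the ProducerRecord constructors are from the standard Java client:

// Case 1: explicit partition - the record goes to partition 0 no matter what the key is
producer.send(new ProducerRecord<>("hadoop1", 0, "some-key", "to partition 0"));
// Case 2: key only - the default partitioner hashes the key, so the same key always lands in the same partition
producer.send(new ProducerRecord<>("hadoop1", "user-42", "keyed message"));
// Case 3: neither - the default partitioner spreads records across partitions (round-robin in the older clients these notes assume)
producer.send(new ProducerRecord<>("hadoop1", "unkeyed message"));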

Replication
A partition can have multiple replicas. Without them, once a broker goes down, none of the partitions on it can be consumed, and producers can no longer write to those partitions either. With replication, a leader is elected among each partition's replicas; producers and consumers talk only to the leader, and the remaining replicas act as followers that copy data from it.
The ack mechanism (a configuration sketch follows this list):
0: the producer sends data to the leader without waiting to learn whether the leader received it; highest throughput.
1 (Kafka's default): the producer sends the next message as soon as the leader has written the current one to its log, regardless of whether the followers have replicated it yet; reasonably safe.
-1 (all): the producer sends the next message only after all followers have replicated the current one; safest, but much slower.
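
A minimal producer-configuration sketch for the acks trade-off, using the standard Java-client property names; hadoop1:9092 is an assumed broker address:

Properties props = new Properties();
props.put("bootstrap.servers", "hadoop1:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// "0"   - fire and forget: fastest, may lose data
// "1"   - wait for the leader's ack only (the default described above)
// "all" - equivalent to -1: wait for all in-sync replicas, safest and slowest
props.put("acks", "all");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);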

Kafka consumption process

The consumer connects to ZooKeeper to obtain the cluster's information (chiefly the consumption offsets, which are kept in ZooKeeper; in later versions offsets can be stored inside the cluster instead of in ZooKeeper).
Consumers pull data from the assigned partitions of a topic, fetching in batches (buffering several messages at a time). Within a group, a partition is consumed by at most one consumer, though one consumer may consume several partitions; the offsets of all the group's members are managed together. A sketch of taking over offset commits manually follows below.
When consumers are added later, the cluster can rebalance: the consumer group re-reads the cluster information from ZooKeeper, and the rebalance itself is carried out inside the Kafka cluster.
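
A minimal sketch of manual offset management with the Java client: auto-commit is turned off and the offset is committed only after a batch has been fully processed, which narrows the window for message loss. process() is a hypothetical handler, and the props/consumer setup is as in the consumer sketch earlier:

props.put("enable.auto.commit", "false"); // take offset commits away from the client's timer
consumer.subscribe(Collections.singletonList("hadoop1"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record); // hypothetical business logic
    }
    consumer.commitSync(); // commit only after the whole batch is processed
}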

Main Kafka znodes stored in ZooKeeper
cluster: cluster information (cluster id)
controller: the cluster controller (the broker that acts as controller for the whole cluster)
controller_epoch: controller generation (the number of controller elections so far)
brokers: per-server information (ids, topics, seqid)
consumers: consumer-group information (ids, owners)

Kafka interview notes

  1. Why Kafka's throughput is high: sequential log writes, zero-copy, segmented logs, read-ahead, write-behind, batching, and compression.
  2. Where offsets are stored: in ZooKeeper in early versions; in later versions inside the cluster (the __consumer_offsets topic); offsets can also be maintained by hand (e.g. in Redis or a database).
  3. How Kafka is consumed: by pulling (poll).
  4. Preventing data loss: send synchronously and set acks=-1 (all).
  5. Avoiding duplicate consumption: maintain offsets yourself (low-level API); see the sketch after this list.
  6. Where Kafka's metadata lives: ZooKeeper (/cluster, /controller, /controller_epoch, /brokers, /consumers).
  7. Controller election: brokers race to register the /controller znode in ZooKeeper and the first one wins; partition leaders are then chosen from the ISR list, and if every replica is down the partition waits for a replica to come back.
  8. Raising consumption speed: add partitions and consumers, poll more records at a time, and enlarge batch sizes.
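
A minimal sketch of point 5: offsets are kept in an external store and the consumer seeks back to them on startup, so reprocessing after a restart can be avoided. loadOffsetFromStore() is a hypothetical helper (backed by Redis, a database, etc.); assign/seek are standard Java-client calls, and the consumer setup is as in the earlier sketches:

TopicPartition tp = new TopicPartition("hadoop1", 0);
consumer.assign(Collections.singletonList(tp)); // low-level API: assign a partition directly instead of subscribing
long storedOffset = loadOffsetFromStore(tp);    // hypothetical: read the last processed position from Redis/DB
consumer.seek(tp, storedOffset);                // resume exactly where processing left off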

Hands-on code: https://gitee.com/ArnoldSu/kafka