Co2y's Blog

kafka相关操作记录

某个项目需要使用kafka与hbase phoenix等结合使用,故在此记录一些关于kafka的操作

简介

Kafka是一个分布式、可分区、可复制的消息系统。Kafka将消息以topic为单位进行归纳;Kafka发布消息的程序称为producer,也叫生产者;Kafka预订topics并消费消息的程序称为consumer,也叫消费者;当Kafka以集群的方式运行时,可以由一个服务或者多个服务组成,每个服务叫做一个broker,运行过程中producer通过网络将消息发送到Kafka集群,集群向消费者提供消息。

部分概念

  • Broker:消息中间件处理结点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。
  • Topic:一类消息,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发。
  • Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。
  • duplication:冗余,每个partition可以设置冗余数来保证高可用。
  • Segment:partition物理上由多个segment组成,下面2.2和2.3有详细说明。
  • offset:每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息。

kafka的数据文件存在kafka-logs目录下,子目录由topic名称和partition ID两部分组成,例如:

1
2
3
# ls
my-replicated-topic5-3 recovery-point-offset-checkpoint
my-replicated-topic5-0 my-replicated-topic5-4 replication-offset-checkpoint

子目录内由索引文件.index 和数据文件.log 两部分组成。

使用

安装

kafka的安装很简单,下载解压之后,主要修改server.properties配置文件

1
2
3
4
5
修改
broker.id=1
log.dirs=/usr/kafka-logs默认为/tmp/kafka-log
设置log.cleaner.enable=true 让kafka自动清理日志
设置zookeeper.connect=linux1:2181,linux2:2181,linux3:2181/kafka

然后拷贝到多个节点上,每个节点运行bin/kafka-server-start.sh config/server.properties &

脚本

kafka相关的脚本都在bin目录下,具体使用查看相关命令即可。

因为kafka是利用zookeeper来管理集群的,所以很多元数据存在zookeeper上,默认是/kafka目录,相关信息可以利用zkCli.sh进入ZK查看

开发

关于生产者和消费者线程的设置

默认情况下,一条线程只对一个partition进行操作。在生产者开发时,可以通过制定partition的策略(根据key进行分发),利用多个线程往多个partition中写数据,以此提高吞吐量。 在消费者端,也可以利用多个线程同时消费多个partition内的数据,kafka不保证消息的有序,但是可以保证每个partition内消费的数据是有序的。topic下的一个分区只能被同一个consumergroup下的一个consumer线程来消费,但反之并不成立,即一个consumer线程可以消费多个分区的数据,比如Kafka提供的ConsoleConsumer,默认就只是一个线程来消费所有分区的数据。——其实ConsoleConsumer可以使用通配符的功能实现同时消费多个topic数据,

确定线程数的方法

创建一个只有1个分区的topic,然后测试这个topic的producer吞吐量和consumer吞吐量。假设它们的值分别是Tp和Tc,单位可以是MB/s。然后假设总的目标吞吐量是Tt,那么分区数 = Tt / max(Tp, Tc)
Tp表示producer的吞吐量。测试producer通常是很容易的,因为它的逻辑非常简单,就是直接发送消息到Kafka就好了。Tc表示consumer的吞吐量。测试Tc通常与应用的关系更大, 因为Tc的值取决于你拿到消息之后执行什么操作,因此Tc的测试通常也要麻烦一些。
我个人的观点,如果你的分区数是N,那么最好线程数也保持为N,这样通常能够达到最大的吞吐量。超过N的配置只是浪费系统资源,因为多出的线程不会被分配到任何分区。