Kafka Beginner Tutorial (4): Using Spring-Kafka and How It Works

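Most articles online about this topic are either disorganized or superficial, so this post summarizes the official Spring-Kafka documentation. Some descriptions may be imprecise or wrong; corrections are welcome.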

Create Topics

Define a KafkaAdmin bean. It checks whether each topic declared as a NewTopic bean already exists in the cluster and creates it if it does not.

@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    // kafkaEmbedded() refers to an embedded test broker defined elsewhere;
    // point this at your own broker list in a real deployment
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
            StringUtils.arrayToCommaDelimitedString(kafkaEmbedded().getBrokerAddresses()));
    return new KafkaAdmin(configs);
}

@Bean
public NewTopic topic1() {
    // topic name, number of partitions, replication factor
    return new NewTopic("foo", 10, (short) 2);
}

@Bean
public NewTopic topic2() {
    return new NewTopic("bar", 10, (short) 2);
}

Send Message

Sending only requires a KafkaTemplate:

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // The key serializer must match the Integer key type of the factory
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

Receive Message: Consumer

There are two ways to receive messages (consume): annotate a method with @KafkaListener, or implement the MessageListener interface.

Classes that are easy to confuse

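// Interface that wraps the native KafkaConsumer; one container wraps one consumer
interface MessageListenerContainer;
// Single-threaded container implementation; starts exactly one consumer
class KafkaMessageListenerContainer implements MessageListenerContainer;
// Multi-threaded container implementation; creates several KafkaMessageListenerContainer instances
class ConcurrentMessageListenerContainer implements MessageListenerContainer;

// Factory interface that creates containers; required when using @KafkaListener
interface KafkaListenerContainerFactory<C extends MessageListenerContainer>;
// The only implementation of the container factory; it produces the multi-threaded container.
// For single-threaded consumption call setConcurrency(null), which is also the default
class ConcurrentKafkaListenerContainerFactory<K, V> implements KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K, V>>;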

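Implementing the interface (without annotations)

You need to provide two things: a MessageListener and a MessageListenerContainer.

The former is the consumer's business logic (the method that handles messages); the latter is the object provided by spring-kafka that creates the consumer.

MessageListenerContainer

The container creates the consumer and delivers records to the MessageListener. There are two implementations:

  • KafkaMessageListenerContainer
  • ConcurrentMessageListenerContainer

The former consumes on a single thread; the latter consumes on multiple threads by delegating to several consumers that it creates. Their constructors are similar: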

```java
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
        ContainerProperties containerProperties)

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
        ContainerProperties containerProperties,
        TopicPartitionInitialOffset... topicPartitions)
```

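- ConsumerFactory: holds the consumer configuration.
- ContainerProperties: the constructor sets the topics to listen to; ContainerProperties.setMessageListener sets the MessageListener.
- TopicPartitionInitialOffset: specifies the topic/partition to consume and the initial offset. It can also be passed to ContainerProperties; once specified, consumption is low-level (partitions are assigned manually).


For multi-threaded consumption, ConcurrentMessageListenerContainer::setConcurrency(3) creates 3 KafkaMessageListenerContainer instances.
A single-threaded example: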
```java
@Configuration
@EnableKafka
public class KafkaConfig {

    // Topic to consume and broker list; supply these from your own configuration
    private String topic;
    private String servers;

    @Bean
    public KafkaMessageListenerContainer<Integer, String> kafkaContainer() {
        // Use the TopicPartitionInitialOffset constructor instead for low-level consumption
        ContainerProperties properties = new ContainerProperties(topic);
        // Replace this example listener with your own MessageListener
        properties.setMessageListener((MessageListener<Integer, String>) record ->
                System.out.println(record.value()));
        // properties.setAckMode(...); // choose an ack mode, see the official documentation
        return new KafkaMessageListenerContainer<>(consumerFactory(), properties);
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        // ... put all other consumer configs in the map
        return props;
    }
}
```

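Note: with high-level consumption that listens to multiple topics, set partition.assignment.strategy to RoundRobinAssignor so that partitions are spread evenly across threads; with the default strategy many threads may sit idle. (Reminder: within a consumer group, a partition is consumed by only one thread, so there is no concurrent access to a single partition; this is one of the things that underpins Kafka's high throughput.) Also, when partitions are assigned explicitly and there are more threads than partitions, Spring reduces the number of threads to the partition count. The strategy is set as follows:

props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");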
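To make the idle-thread problem concrete, here is a minimal sketch in plain Java that models the two strategies. It is a simplified model, not Kafka's actual assignor code: the class AssignorSketch and its methods are hypothetical names, and it assumes every consumer subscribes to the same topics and every topic has the same number of partitions.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of range vs. round-robin assignment; not Kafka's actual code. */
class AssignorSketch {

    /** Range style: every topic is divided separately, starting from the first consumer. */
    static List<List<String>> range(int topics, int partitionsPerTopic, int consumers) {
        List<List<String>> assignment = emptyAssignment(consumers);
        for (int t = 0; t < topics; t++) {
            int perConsumer = partitionsPerTopic / consumers;
            int extra = partitionsPerTopic % consumers; // the first `extra` consumers get one more
            int next = 0;
            for (int c = 0; c < consumers; c++) {
                int take = perConsumer + (c < extra ? 1 : 0);
                for (int k = 0; k < take; k++) {
                    assignment.get(c).add("t" + t + "-p" + next++);
                }
            }
        }
        return assignment;
    }

    /** Round-robin style: all partitions of all topics are dealt out one by one. */
    static List<List<String>> roundRobin(int topics, int partitionsPerTopic, int consumers) {
        List<List<String>> assignment = emptyAssignment(consumers);
        int next = 0;
        for (int t = 0; t < topics; t++) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                assignment.get(next++ % consumers).add("t" + t + "-p" + p);
            }
        }
        return assignment;
    }

    /** Number of consumers that received no partition at all. */
    static int idleConsumers(List<List<String>> assignment) {
        int idle = 0;
        for (List<String> partitions : assignment) {
            if (partitions.isEmpty()) {
                idle++;
            }
        }
        return idle;
    }

    private static List<List<String>> emptyAssignment(int consumers) {
        List<List<String>> assignment = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            assignment.add(new ArrayList<>());
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 3 topics with 5 partitions each, consumed with concurrency 15
        System.out.println("range idle: " + idleConsumers(range(3, 5, 15)));
        System.out.println("round-robin idle: " + idleConsumers(roundRobin(3, 5, 15)));
    }
}
```

In this model, 3 topics with 5 partitions each and 15 consumers leave 10 consumers idle under the range style, because each topic is handed to the first 5 consumers again; the round-robin style gives every consumer exactly one partition.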

MessageListener

Several listener interfaces are available:

public interface MessageListener<K, V> { // (1)

    void onMessage(ConsumerRecord<K, V> data);

}

public interface AcknowledgingMessageListener<K, V> { // (2)

    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);

}

public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { // (3)

    void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);

}

public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { // (4)

    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);

}

public interface BatchMessageListener<K, V> { // (5)

    void onMessage(List<ConsumerRecord<K, V>> data);

}

public interface BatchAcknowledgingMessageListener<K, V> { // (6)

    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);

}

public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { // (7)

    void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);

}

public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { // (8)

    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);

}

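In summary (a listener that receives an Acknowledgment commits manually; the others leave committing to the container):

No.  Consumption  Auto commit  Provides Consumer
1    Single       Yes          No
2    Single       No           No
3    Single       Yes          Yes
4    Single       No           Yes
5    Batch        Yes          No
6    Batch        No           No
7    Batch        Yes          Yes
8    Batch        No           Yes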

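Choose the MessageListener that matches the Kafka configuration of the container.
Notes

  1. The Consumer is not thread-safe; call its methods only on the thread that invokes the listener.
  2. When using acks, set enable.auto.commit to false in the consumer properties map, and set the ack mode to MANUAL or MANUAL_IMMEDIATE via ContainerProperties::setAckMode.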

@KafkaListener

First create consumerFactory() as described above, then create a KafkaListenerContainerFactory, for example:

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
            kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3); // set to null to use a single thread
        factory.setBatchListener(true); // enable this when using a batch listener
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        ...
        return props;
    }
}

Then add the annotation directly to a method:

@Component
public class Listener {

    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
    public void listen(String data) {
        ...
    }
}

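Notes

  1. By default a bean named "kafkaListenerContainerFactory" is required; if yours has a different name, specify it with the containerFactory attribute of @KafkaListener.
  2. The configuration above is multi-threaded; for single-threaded consumption set the concurrency to null with setConcurrency(null).
  3. If @KafkaListener specifies only topics, consumption is high-level; if it specifies @TopicPartition, consumption is low-level. Pay attention to the partition assignment strategy.
  4. If the annotated method takes a List parameter, remember to set batchListener to true.
  5. Pay attention to the ack mode here as well.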

Commit Offset

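Offsets are committed automatically by default.
If auto commit is set to false in the configuration, you need to set the ack mode:
ContainerProperties.setAckMode(AbstractMessageListenerContainer.AckMode ackMode)

  • RECORD - commit after each record has been processed.
  • BATCH - commit after the whole batch returned by a poll has been processed.
  • TIME - commit after the batch returned by a poll has been processed, provided more than ackTime has passed since the last commit.
  • COUNT - commit after the batch returned by a poll has been processed, provided at least ackCount records have been processed since the last commit.
  • COUNT_TIME - commit when either the TIME or the COUNT condition is met.
  • MANUAL - commit after Acknowledgment.acknowledge() has been called and the batch returned by the poll has been processed.
  • MANUAL_IMMEDIATE - commit immediately when Acknowledgment.acknowledge() is called.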
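The difference between the container-managed modes is easier to see with numbers. The following is a minimal sketch in plain Java that follows the descriptions in the list above. It is a simplified model and not Spring Kafka's implementation: the class AckModeSketch, the method countCommits, and the idea of a clock that starts at 0 are all assumptions made for illustration, and the two MANUAL modes are left out because they depend on when the listener calls acknowledge().

```java
/** Simplified model of the container's commit decision; not Spring Kafka's real code. */
class AckModeSketch {

    enum Mode { RECORD, BATCH, TIME, COUNT, COUNT_TIME }

    /**
     * Counts the commits issued for a sequence of polls.
     * pollSizes[i] is the number of records returned by poll i, and finishedAt[i]
     * is the time (ms since start) at which all of them have been processed.
     */
    static int countCommits(Mode mode, int[] pollSizes, long[] finishedAt,
            int ackCount, long ackTimeMillis) {
        int commits = 0;
        int recordsSinceCommit = 0;
        long lastCommitAt = 0L; // assumption: the clock starts at 0 with no earlier commit
        for (int i = 0; i < pollSizes.length; i++) {
            if (pollSizes[i] == 0) {
                continue; // nothing was processed, so there is nothing to commit
            }
            if (mode == Mode.RECORD) {
                commits += pollSizes[i]; // one commit per record
                continue;
            }
            recordsSinceCommit += pollSizes[i];
            boolean countReached = recordsSinceCommit >= ackCount;
            boolean timeReached = finishedAt[i] - lastCommitAt > ackTimeMillis;
            boolean commit;
            switch (mode) {
                case BATCH:
                    commit = true;
                    break;
                case COUNT:
                    commit = countReached;
                    break;
                case TIME:
                    commit = timeReached;
                    break;
                default: // COUNT_TIME: either condition is enough
                    commit = countReached || timeReached;
            }
            if (commit) {
                commits++;
                recordsSinceCommit = 0;
                lastCommitAt = finishedAt[i];
            }
        }
        return commits;
    }

    public static void main(String[] args) {
        int[] sizes = {3, 5, 1};
        long[] finishedAt = {1200L, 1300L, 2500L};
        for (Mode mode : Mode.values()) {
            System.out.println(mode + ": " + countCommits(mode, sizes, finishedAt, 4, 1000L));
        }
    }
}
```

With three polls of 3, 5, and 1 records, ackCount = 4 and ackTime = 1000 ms, this model issues 9 commits for RECORD, 3 for BATCH, 2 for TIME, 1 for COUNT, and 3 for COUNT_TIME, which shows how the choice of mode trades commit overhead against the number of records that may be redelivered after a failure.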

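Hints

Does one application need several consumer configurations?

  1. Interface approach: define a new KafkaMessageListenerContainer with the new configuration and bind a new MessageListener to it.
  2. @KafkaListener approach: define a new kafkaListenerContainerFactory with the new configuration (mainly the ContainerProperties), define a new method, annotate it with @KafkaListener, and point the annotation at the factory you just created.

Which one to choose?

  1. Both support low-level consumption, and both support committing offsets manually.
  2. The annotation cannot conveniently bind the topic, partition, and similar settings when these are decided at runtime.

Other

  1. @KafkaListener can also be placed on a class; annotating methods with @KafkaHandler then lets different methods handle different message types.
  2. An annotated method can use @Header to obtain more information about the record.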

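What Spring does

  1. KafkaListenerContainerFactory creates a ConcurrentMessageListenerContainer (only when the annotation is used).
  2. ConcurrentMessageListenerContainer initializes and calls doStart (only in the multi-threaded case). It creates one KafkaMessageListenerContainer per thread (when partitions are specified explicitly, the count is min[number of threads, number of partitions]) and distributes the partitions.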
    protected void doStart() {
        // Only runs when the container is not already running
        if (!isRunning()) {
            checkTopics();
            ContainerProperties containerProperties = getContainerProperties();
            TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
            // Low-level consumption: if topicPartitions is specified and the concurrency
            // exceeds the number of partitions, reduce it to the number of partitions
            if (topicPartitions != null
                    && this.concurrency > topicPartitions.length) {
                this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
                        + "equal to the number of partitions; reduced from " + this.concurrency + " to "
                        + topicPartitions.length);
                this.concurrency = topicPartitions.length;
            }
            // Set the running flag
            setRunning(true);

            // Create the KafkaMessageListenerContainer instances in a loop
            for (int i = 0; i < this.concurrency; i++) {
                KafkaMessageListenerContainer<K, V> container;
                // High-level consumption
                if (topicPartitions == null) {
                    container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,
                            containerProperties);
                }
                // Low-level consumption: partitions are distributed by partitionSubset
                else {
                    container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,
                            containerProperties, partitionSubset(containerProperties, i));
                }
                String beanName = getBeanName();
                container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);
                if (getApplicationEventPublisher() != null) {
                    container.setApplicationEventPublisher(getApplicationEventPublisher());
                }
                container.setClientIdSuffix("-" + i);
                container.setGenericErrorHandler(getGenericErrorHandler());
                container.setAfterRollbackProcessor(getAfterRollbackProcessor());
                // Call the start method of KafkaMessageListenerContainer
                container.start();
                this.containers.add(container);
            }
        }
    }
// Returns the partitions that the current thread (container i) has to handle
private TopicPartitionInitialOffset[] partitionSubset(ContainerProperties containerProperties, int i) {
    TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
    // A single thread gets all partitions
    if (this.concurrency == 1) {
        return topicPartitions;
    }
    else {
        int numPartitions = topicPartitions.length;
        // Thread count equals partition count: return the partition at the matching index
        if (numPartitions == this.concurrency) {
            return new TopicPartitionInitialOffset[] { topicPartitions[i] };
        }
        else {
            // Average number of partitions per thread (integer division)
            int perContainer = numPartitions / this.concurrency;
            TopicPartitionInitialOffset[] subset;
            // The last thread gets every partition from i * perContainer onwards.
            // For low-level consumption this split is not very even, so it is
            // advisable to set the thread count equal to the partition count
            if (i == this.concurrency - 1) {
                subset = Arrays.copyOfRange(topicPartitions, i * perContainer, topicPartitions.length);
            }
            // Every other thread gets perContainer consecutive partitions
            else {
                subset = Arrays.copyOfRange(topicPartitions, i * perContainer, (i + 1) * perContainer);
            }
            return subset;
        }
    }
}
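  3. The start method of AbstractMessageListenerContainer is called, which calls KafkaMessageListenerContainer's doStart() method. That method initializes the container and creates a ListenerConsumer. Below is part of the ListenerConsumer constructor; depending on whether partitions were assigned in the previous step, it calls subscribe or assign on the Consumer it creates.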
    final Consumer<K, V> consumer =
            KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
                    this.consumerGroupId,
                    this.containerProperties.getClientId(),
                    KafkaMessageListenerContainer.this.clientIdSuffix);
    this.consumer = consumer;

    ConsumerRebalanceListener rebalanceListener = createRebalanceListener(consumer);

    // No topic partitions specified: call subscribe (high-level API)
    if (KafkaMessageListenerContainer.this.topicPartitions == null) {
        if (this.containerProperties.getTopicPattern() != null) {
            consumer.subscribe(this.containerProperties.getTopicPattern(), rebalanceListener);
        }
        else {
            consumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), rebalanceListener);
        }
    }
    else {
        // Topic partitions specified: call assign (low-level API)
        List<TopicPartitionInitialOffset> topicPartitions =
                Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
        this.definedPartitions = new HashMap<>(topicPartitions.size());
        for (TopicPartitionInitialOffset topicPartition : topicPartitions) {
            this.definedPartitions.put(topicPartition.topicPartition(),
                    new OffsetMetadata(topicPartition.initialOffset(), topicPartition.isRelativeToCurrent(),
                            topicPartition.getPosition()));
        }
        consumer.assign(new ArrayList<>(this.definedPartitions.keySet()));
    }
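
To check how uneven the partitionSubset split from step 2 can get, the index arithmetic can be replayed on plain partition numbers. This is a minimal sketch: PartitionSubsetSketch and its methods are hypothetical names, and int values stand in for the TopicPartitionInitialOffset objects. The cap on the concurrency mirrors what doStart() does before the loop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Replays the partitionSubset index arithmetic on plain partition numbers. */
class PartitionSubsetSketch {

    /** Partitions handed to container i out of `concurrency` containers. */
    static int[] subset(int[] partitions, int concurrency, int i) {
        if (concurrency == 1) {
            return partitions; // a single container takes everything
        }
        int numPartitions = partitions.length;
        if (numPartitions == concurrency) {
            return new int[] { partitions[i] }; // exactly one partition each
        }
        int perContainer = numPartitions / concurrency; // integer division
        if (i == concurrency - 1) {
            // the last container also takes the remainder
            return Arrays.copyOfRange(partitions, i * perContainer, numPartitions);
        }
        return Arrays.copyOfRange(partitions, i * perContainer, (i + 1) * perContainer);
    }

    /** Lists the subset of every container for the given partition count and concurrency. */
    static List<String> describe(int numPartitions, int concurrency) {
        int[] partitions = new int[numPartitions];
        for (int p = 0; p < numPartitions; p++) {
            partitions[p] = p;
        }
        // doStart() reduces the concurrency to the partition count before splitting
        int effective = Math.min(concurrency, numPartitions);
        List<String> result = new ArrayList<>();
        for (int i = 0; i < effective; i++) {
            result.add(Arrays.toString(subset(partitions, effective, i)));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(describe(11, 4));
        System.out.println(describe(2, 5));
    }
}
```

With 11 partitions and a concurrency of 4, the first three containers get 2 partitions each and the last one gets 5, which is why matching the thread count to the partition count is the safer choice for low-level consumption.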