Kafka Tutorial (4): Using Spring-Kafka and How It Works

Create Topics

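Define a KafkaAdmin bean. For each NewTopic bean in the application context, it checks whether the topic already exists in the cluster and creates it if it does not.

    // kafkaEmbedded() is assumed to be a bean defined elsewhere that exposes the broker addresses
    @Bean
    public KafkaAdmin admin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                StringUtils.arrayToCommaDelimitedString(kafkaEmbedded().getBrokerAddresses()));
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic topic1() {
        // topic name, number of partitions, replication factor
        return new NewTopic("foo", 10, (short) 2);
    }

    @Bean
    public NewTopic topic2() {
        return new NewTopic("bar", 10, (short) 2);
    }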

Send Message

Sending only requires a KafkaTemplate:

    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // The key serializer must match the Integer key type of the template
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // See <https://kafka.apache.org/documentation/#producerconfigs> for more properties
        return props;
    }

    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
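The configuration above defines the template but does not show a call site. The following is a minimal sketch of sending one record, assuming the `kafkaTemplate` bean is injected and that the configured key serializer matches the template's `Integer` key type (for example `IntegerSerializer`). The class name `GreetingSender` and its method are hypothetical; the topic `foo` is the one created earlier.

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Hypothetical sender component, for illustration only
@Component
public class GreetingSender {

    private final KafkaTemplate<Integer, String> kafkaTemplate;

    public GreetingSender(KafkaTemplate<Integer, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(int key, String message) {
        // send(topic, key, value) is asynchronous; the record is handed to the producer's buffer
        kafkaTemplate.send("foo", key, message);
        // flush() blocks until buffered records have been sent; optional, shown for completeness
        kafkaTemplate.flush();
    }
}
```

The return value of `send` is ignored here because its type differs between spring-kafka versions; inspect it if you need the send result.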

Receive Message: Consumer

There are two ways to receive messages (Consumer): use the @KafkaListener annotation, or implement the MessageListener<K, V> interface.

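Easily Confused Classes

    // Interface. Wraps the native KafkaConsumer; one container wraps one consumer.
    interface MessageListenerContainer;

    // Single-threaded container implementation; starts only one consumer.
    class KafkaMessageListenerContainer implements MessageListenerContainer;

    // Multi-threaded container implementation; creates several KafkaMessageListenerContainer instances.
    class ConcurrentMessageListenerContainer implements MessageListenerContainer;

    // Interface, factory pattern. Creates containers; must be provided when using @KafkaListener.
    interface KafkaListenerContainerFactory<C extends MessageListenerContainer>;

    // The only implementation of the container factory; it produces the multi-threaded container.
    // For single-threaded consumption call setConcurrency(null), which is also the default.
    class ConcurrentKafkaListenerContainerFactory<K, V>
            implements KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K, V>>;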

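Implementing the Interface (Non-Annotation Mode)

You need to provide:
  • MessageListener
  • MessageListenerContainer
The former is the consumer's business logic (the message-handling method); the latter is the object provided by spring-kafka that is responsible for creating the consumer.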

MessageListenerContainer

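The container creates the consumer and invokes the MessageListener. There are two kinds of MessageListenerContainer:
  • KafkaMessageListenerContainer
  • ConcurrentMessageListenerContainer
The former is for single-threaded consumption; the latter is for multi-threaded consumption and creates multiple consumers by delegation. Their constructors are similar:

    public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
            ContainerProperties containerProperties)

    public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
            ContainerProperties containerProperties,
            TopicPartitionInitialOffset... topicPartitions)

  • ConsumerFactory: holds the Consumer configuration
  • ContainerProperties: the constructor sets the topics to listen to; ContainerProperties.setMessageListener sets the MessageListener
  • TopicPartitionInitialOffset: specifies the Topic/Partition to consume and the corresponding offset. It can also be set in ContainerProperties. Once partitions are specified, this is low-level consumption.
In the multi-threaded case, ConcurrentMessageListenerContainer::setConcurrency(3) creates 3 KafkaMessageListenerContainer instances. A single-threaded example follows: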
    @Configuration
    @EnableKafka
    public class KafkaConfig {

        // topic and servers are assumed to be fields defined elsewhere (for example injected properties)

        @Bean
        public KafkaMessageListenerContainer<String, String> kafkaContainer() {
            // Use the TopicPartitionInitialOffset constructor instead for low-level consumption
            ContainerProperties properties = new ContainerProperties(topic);
            KafkaMessageListenerContainer<String, String> container =
                    new KafkaMessageListenerContainer<>(consumerFactory(), properties);
            // Placeholder listener; replace with your own MessageListener implementation
            container.getContainerProperties().setMessageListener(
                    (MessageListener<String, String>) record -> { /* handle the record */ });
            // BATCH is the default ack mode; see the official documentation for the others
            container.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
            return container;
        }

        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }

        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
            // put all configs in map
            ...
            return props;
        }
    }
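Note: for high-level consumption, set partition.assignment.strategy to RoundRobinAssignor so that partitions are spread evenly across threads; without it, many threads may sit idle. (Reminder: within a consumer group, a partition is consumed by only one thread, so there is no concurrent access to a single partition. This is one of the guarantees behind Kafka's high throughput.) When partitions are assigned explicitly and there are more threads than partitions, Spring automatically lowers the thread count to the number of partitions to reduce overhead (see doStart() in the last section). The strategy is set as follows:

    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
            "org.apache.kafka.clients.consumer.RoundRobinAssignor");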
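To see why the assignment strategy matters, here is a simplified simulation in plain Java. It is not Kafka's actual assignor code: it assumes every consumer subscribes to all topics, and the class and method names (`AssignmentSketch`, `range`, `roundRobin`) are made up for illustration. The range-style method divides each topic independently, while the round-robin method deals all partitions out in turn.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of two partition assignment strategies (illustration only)
class AssignmentSketch {

    // Range style: each topic is split on its own, so the first consumers are served first for every topic
    static Map<String, List<String>> range(List<String> consumers, Map<String, Integer> topics) {
        Map<String, List<String>> result = emptyAssignment(consumers);
        for (Map.Entry<String, Integer> topic : topics.entrySet()) {
            int partitions = topic.getValue();
            int perConsumer = partitions / consumers.size();
            int extra = partitions % consumers.size();
            int next = 0;
            for (int c = 0; c < consumers.size(); c++) {
                int count = perConsumer + (c < extra ? 1 : 0);
                for (int p = 0; p < count; p++) {
                    result.get(consumers.get(c)).add(topic.getKey() + "-" + next++);
                }
            }
        }
        return result;
    }

    // Round-robin style: all partitions of all topics are dealt out to the consumers in turn
    static Map<String, List<String>> roundRobin(List<String> consumers, Map<String, Integer> topics) {
        Map<String, List<String>> result = emptyAssignment(consumers);
        int turn = 0;
        for (Map.Entry<String, Integer> topic : topics.entrySet()) {
            for (int p = 0; p < topic.getValue(); p++) {
                result.get(consumers.get(turn++ % consumers.size())).add(topic.getKey() + "-" + p);
            }
        }
        return result;
    }

    private static Map<String, List<String>> emptyAssignment(List<String> consumers) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String consumer : consumers) {
            result.put(consumer, new ArrayList<>());
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("c0", "c1", "c2");
        Map<String, Integer> topics = new LinkedHashMap<>();
        topics.put("foo", 1);
        topics.put("bar", 1);
        topics.put("baz", 1);
        System.out.println("range:      " + range(consumers, topics));
        System.out.println("roundRobin: " + roundRobin(consumers, topics));
    }
}
```

With three single-partition topics and three consumers, the range-style split hands all three partitions to `c0` and leaves `c1` and `c2` idle, while the round-robin split gives each consumer one partition.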

MessageListener<K, V>

Several listener interfaces are available:

    public interface MessageListener<K, V> {
        void onMessage(ConsumerRecord<K, V> data);
    }

    public interface AcknowledgingMessageListener<K, V> {
        void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
    }

    public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
        void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
    }

    public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
        void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
    }

    public interface BatchMessageListener<K, V> {
        void onMessage(List<ConsumerRecord<K, V>> data);
    }

    public interface BatchAcknowledgingMessageListener<K, V> {
        void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
    }

    public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
        void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
    }

    public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
        void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
    }
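In summary, the interfaces differ along three axes: whether the listener receives a single record or the whole polled batch, whether it receives an Acknowledgment for manual commits, and whether it receives the Consumer object.

Choose the MessageListener that matches the Kafka configuration of the container. Notes:
  1. Consumer is not thread-safe; call its methods only on the thread that invokes the listener.
  1. When using acks, set enable.auto.commit to false in the consumer property map, and set the ack mode to MANUAL or MANUAL_IMMEDIATE via ContainerProperties::setAckMode.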
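As an illustration of the second note, here is a minimal sketch of a listener that commits manually. It assumes auto commit is disabled and the container's ack mode is MANUAL or MANUAL_IMMEDIATE; the class name `ManualAckListener` is hypothetical.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;

// Hypothetical listener, for illustration only
public class ManualAckListener implements AcknowledgingMessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        // Business logic goes here; this sketch only prints the record
        System.out.println(record.topic() + "-" + record.partition()
                + "@" + record.offset() + ": " + record.value());
        // Acknowledge only after processing succeeded, so a failure leaves the offset uncommitted
        acknowledgment.acknowledge();
    }
}
```

An instance of this class would be passed to ContainerProperties.setMessageListener in place of a plain MessageListener.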

@KafkaListener

First create consumerFactory() as described above, then create a KafkaListenerContainerFactory. For example:

    @Configuration
    @EnableKafka
    public class KafkaConfig {

        @Bean
        KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setConcurrency(3);      // set to null to use a single thread
            factory.setBatchListener(true); // enable only when the listener method takes a List
            factory.getContainerProperties().setPollTimeout(3000);
            return factory;
        }

        @Bean
        public ConsumerFactory<Integer, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }

        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            ...
            return props;
        }
    }
Then put the annotation directly on a method:

    @Component
    public class Listener {

        @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
        public void listen(String data) {
            ...
        }
    }
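Notes:
  1. By default a bean named **"kafkaListenerContainerFactory"** is required. If your bean has a different name, specify it in the containerFactory attribute of @KafkaListener.
  1. The configuration above is for multi-threaded consumption. For single-threaded consumption set concurrency to null: setConcurrency(null).
  1. If @KafkaListener specifies only topics, it is high-level consumption; if it specifies @TopicPartition, it is low-level consumption. Remember to ***set the thread assignment strategy***.
  1. If the annotated method takes a List parameter, remember to set batchListener to true.
  1. Pay attention to the ack mode here as well.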

Commit Offset

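Offsets are committed automatically by default. If auto commit is set to false in the configuration, set the ack mode with ContainerProperties.setAckMode(AbstractMessageListenerContainer.AckMode ackMode):
  • RECORD - commit after each record is processed
  • BATCH - commit after the batch returned by poll is processed
  • TIME - commit after the batch returned by poll is processed, provided that more than ackTime has passed since the last commit
  • COUNT - commit after the batch returned by poll is processed, provided that more than ackCount records have been processed since the last commit
  • COUNT_TIME - commit when either the TIME or the COUNT condition is met
  • MANUAL - commit after Acknowledgment.acknowledge() has been called manually and the batch returned by poll has been processed
  • MANUAL_IMMEDIATE - commit immediately when Acknowledgment.acknowledge() is called manually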
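The batch-level modes in the list above can be read as a simple decision made at the end of each polled batch. The sketch below models only that decision in plain Java; it is not the container's actual code. The names `AckModeSketch` and `commitAfterBatch` are made up, and the exact boundary handling (`>=` for the count, `>` for the time) is an assumption.

```java
// Simplified model of the commit decision at the end of a polled batch (illustration only)
class AckModeSketch {

    enum AckMode { BATCH, TIME, COUNT, COUNT_TIME }

    static boolean commitAfterBatch(AckMode mode, int recordsSinceCommit, long millisSinceCommit,
                                    int ackCount, long ackTime) {
        boolean countReached = recordsSinceCommit >= ackCount; // assumed boundary
        boolean timeElapsed = millisSinceCommit > ackTime;     // assumed boundary
        switch (mode) {
            case BATCH:
                return true;                        // always commit once the batch is processed
            case COUNT:
                return countReached;
            case TIME:
                return timeElapsed;
            case COUNT_TIME:
                return countReached || timeElapsed; // either condition is enough
            default:
                throw new IllegalArgumentException("unsupported mode: " + mode);
        }
    }

    public static void main(String[] args) {
        // 50 records and 6 seconds since the last commit; ackCount = 100, ackTime = 5000 ms
        for (AckMode mode : AckMode.values()) {
            System.out.println(mode + " -> " + commitAfterBatch(mode, 50, 6000L, 100, 5000L));
        }
    }
}
```

In the sample call the count threshold is not reached but the time threshold is, so COUNT alone would skip the commit while TIME and COUNT_TIME would commit.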

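Hints

Multiple consumer configurations in one application?

  1. With the interface approach: define a new KafkaMessageListenerContainer that carries the new configuration and bind a new MessageListener to it.
  1. With the @KafkaListener approach: define a new kafkaListenerContainerFactory that carries the new configuration (mainly ContainerProperties), define a new method, annotate it with @KafkaListener, and point the annotation at the ContainerFactory you just created.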

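Which one to choose?

  1. Both support low-level consumption, and both support committing offsets manually.
  1. With the annotation it is not convenient to bind topic, partition, and similar settings when these are decided at runtime.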

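Other

  1. The @KafkaListener annotation can be placed on a class; methods annotated with @KafkaHandler then handle different message types.
  1. An annotated method can use @Header to obtain more information about the record.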
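The two points above can be combined in one sketch. It assumes the consumer's value deserializer or message converter can produce both String and Integer payloads; the class name, listener id, and method names are hypothetical, and the topic name is reused from the earlier example.

```java
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

// Hypothetical class-level listener, for illustration only
@Component
@KafkaListener(id = "multiType", topics = "myTopic")
public class MultiTypeListener {

    // Chosen when the converted payload is a String; headers expose record metadata
    @KafkaHandler
    public void handleText(String text,
                           @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                           @Header(KafkaHeaders.OFFSET) long offset) {
        System.out.println("text from " + topic + "@" + offset + ": " + text);
    }

    // Chosen when the converted payload is an Integer
    @KafkaHandler
    public void handleNumber(Integer number) {
        System.out.println("number: " + number);
    }
}
```

The handler method is selected by payload type, so each payload type should match exactly one @KafkaHandler method.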

What Spring Does

  1. KafkaListenerContainerFactory creates the ConcurrentMessageListenerContainer (only when annotations are used).
  1. ConcurrentMessageListenerContainer<K, V> is initialized and its doStart method is called (multi-threaded case only). It creates KafkaMessageListenerContainer instances according to the thread count (when partitions are assigned explicitly, the number is min[thread count, partition count]) and assigns the partitions:

        protected void doStart() {
            // Only run when the container is not already running
            if (!isRunning()) {
                checkTopics();
                ContainerProperties containerProperties = getContainerProperties();
                TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
                // topicPartitions specified (low-level consumption): if concurrency exceeds
                // the number of partitions, reduce it to the number of partitions
                if (topicPartitions != null && this.concurrency > topicPartitions.length) {
                    this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
                            + "equal to the number of partitions; reduced from " + this.concurrency + " to "
                            + topicPartitions.length);
                    this.concurrency = topicPartitions.length;
                }
                // Set the running flag
                setRunning(true);

                // Create the KafkaMessageListenerContainer instances in a loop
                for (int i = 0; i < this.concurrency; i++) {
                    KafkaMessageListenerContainer<K, V> container;
                    // High-level consumption
                    if (topicPartitions == null) {
                        container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,
                                containerProperties);
                    }
                    // Low-level consumption: partitions are assigned by partitionSubset
                    else {
                        container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,
                                containerProperties, partitionSubset(containerProperties, i));
                    }
                    String beanName = getBeanName();
                    container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);
                    if (getApplicationEventPublisher() != null) {
                        container.setApplicationEventPublisher(getApplicationEventPublisher());
                    }
                    container.setClientIdSuffix("-" + i);
                    container.setGenericErrorHandler(getGenericErrorHandler());
                    container.setAfterRollbackProcessor(getAfterRollbackProcessor());
                    // Call the start method of KafkaMessageListenerContainer
                    container.start();
                    this.containers.add(container);
                }
            }
        }

        // Returns the partitions that the given thread has to process
        private TopicPartitionInitialOffset[] partitionSubset(ContainerProperties containerProperties, int i) {
            TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
            // One thread: return all partitions
            if (this.concurrency == 1) {
                return topicPartitions;
            }
            else {
                int numPartitions = topicPartitions.length;
                // Thread count equals partition count: return the partition at the matching index
                if (numPartitions == this.concurrency) {
                    return new TopicPartitionInitialOffset[] { topicPartitions[i] };
                }
                else {
                    // Average number of partitions per thread (integer division)
                    int perContainer = numPartitions / this.concurrency;
                    TopicPartitionInitialOffset[] subset;
                    // The last thread gets every partition from i * perContainer onward.
                    // For low-level consumption Spring's distribution is not very even,
                    // so setting thread count = partition count is recommended.
                    if (i == this.concurrency - 1) {
                        subset = Arrays.copyOfRange(topicPartitions, i * perContainer, topicPartitions.length);
                    }
                    // Every other thread gets perContainer consecutive partitions
                    else {
                        subset = Arrays.copyOfRange(topicPartitions, i * perContainer, (i + 1) * perContainer);
                    }
                    return subset;
                }
            }
        }
  1. The start method of AbstractMessageListenerContainer is called. It calls the doStart() method of KafkaMessageListenerContainer, which initializes the container and creates the ListenerConsumer. Part of the ListenerConsumer constructor is shown below. Depending on whether partitions were assigned in the previous step, it calls subscribe or assign on the Consumer object it creates:

        final Consumer<K, V> consumer = KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
                this.consumerGroupId, this.containerProperties.getClientId(),
                KafkaMessageListenerContainer.this.clientIdSuffix);
        this.consumer = consumer;

        ConsumerRebalanceListener rebalanceListener = createRebalanceListener(consumer);

        // No topic partitions specified: call subscribe (high-level API)
        if (KafkaMessageListenerContainer.this.topicPartitions == null) {
            if (this.containerProperties.getTopicPattern() != null) {
                consumer.subscribe(this.containerProperties.getTopicPattern(), rebalanceListener);
            }
            else {
                consumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), rebalanceListener);
            }
        }
        else {
            // Topic partitions specified: call assign (low-level API)
            List<TopicPartitionInitialOffset> topicPartitions =
                    Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
            this.definedPartitions = new HashMap<>(topicPartitions.size());
            for (TopicPartitionInitialOffset topicPartition : topicPartitions) {
                this.definedPartitions.put(topicPartition.topicPartition(),
                        new OffsetMetadata(topicPartition.initialOffset(),
                                topicPartition.isRelativeToCurrent(), topicPartition.getPosition()));
            }
            consumer.assign(new ArrayList<>(this.definedPartitions.keySet()));
        }
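The uneven split produced by partitionSubset is easier to see with concrete numbers. The sketch below reproduces the same index arithmetic in plain Java, using int partition ids instead of TopicPartitionInitialOffset objects; the class name `PartitionSubsetSketch` is made up for illustration.

```java
import java.util.Arrays;

// Stand-alone copy of the partitionSubset index arithmetic (illustration only)
class PartitionSubsetSketch {

    static int[] partitionSubset(int[] partitions, int concurrency, int i) {
        if (concurrency == 1) {
            return partitions; // one container takes everything
        }
        int numPartitions = partitions.length;
        if (numPartitions == concurrency) {
            return new int[] { partitions[i] }; // one partition per container
        }
        int perContainer = numPartitions / concurrency; // integer division
        if (i == concurrency - 1) {
            // the last container also takes the remainder
            return Arrays.copyOfRange(partitions, i * perContainer, numPartitions);
        }
        return Arrays.copyOfRange(partitions, i * perContainer, (i + 1) * perContainer);
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
        // Same clamp as doStart(): never more containers than partitions
        int concurrency = Math.min(4, partitions.length);
        for (int i = 0; i < concurrency; i++) {
            System.out.println("container-" + i + " -> "
                    + Arrays.toString(partitionSubset(partitions, concurrency, i)));
        }
    }
}
```

With 10 partitions and 4 containers, the first three containers receive 2 partitions each and the last one receives 4 (partitions 6 to 9), which is why matching the thread count to the partition count is the safer choice for low-level consumption.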

© Song 2015 - 2024