To quote the official documentation:

Apache Kafka is a distributed streaming platform with three key capabilities:

  1. It lets you publish and subscribe to streams of records, which is similar to a message queue or enterprise messaging system.
  2. It stores streams of records in a fault-tolerant way.
  3. It lets you process streams of records as they are produced.

Some basic Kafka concepts:

  • Kafka runs as a cluster on one or more servers.
  • Kafka messages are organized by topic.
  • A service that publishes messages to a given topic in the Kafka cluster is called a Producer.
  • A service that fetches and consumes messages from one or more given topics in the Kafka cluster is called a Consumer.


The above is taken from the Kafka Chinese documentation. For detailed explanations of topic, producer, consumer, and client, see the official documentation.

1. Setting up the Kafka Environment

Since I personally prefer Docker for setting up service environments, this article only covers how to set up Kafka with Docker.

Create a docker-compose.yml file in the kafka directory.

Contents of docker-compose.yml:

version: '3'
services:
  zookeeper:
    container_name: kafka_zookeeper
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    container_name: kafka
    image: wurstmeister/kafka:latest
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

After saving the file, run docker-compose in the kafka directory:

docker-compose up -d

This will automatically download the wurstmeister/zookeeper and wurstmeister/kafka images. If they cannot be downloaded automatically, pull them manually:

docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka

As for creating topics, publishing messages, and consuming messages with the Kafka command-line tools, you can follow the official documentation; this article focuses on using Kafka in Spring Boot.

2. Integrating Kafka with Spring Boot

With the Kafka environment in place, let's start the Spring Boot integration.

The demo project uses Spring Boot 2.2.6.RELEASE and Lombok.

2.1 Adding the Dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2.2 Configuration File

Add the Kafka configuration to application.yml:

spring:
  kafka:
    # Kafka broker addresses; separate multiple addresses with commas
    bootstrap-servers: localhost:9092
    template:
      # default topic
      default-topic: kafka_test
    # producer configuration
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3
    # consumer configuration
    consumer:
      group-id: kafka-consumer
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      isolation-level: read_committed
      enable-auto-commit: true
    # listener configuration, applied to @KafkaListener
    listener:
      type: batch
      concurrency: 4
      poll-timeout: 3000

2.3 Configuring Kafka with JavaConfig

Create a KafkaConfig configuration class:

@Configuration
@EnableKafka
public class KafkaConfig {

    @Autowired
    private KafkaProperties kafkaProperties;
  
    @Bean
    public AdminClient adminClient() {
        Map<String, Object> configs = new HashMap<>(2);
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        return AdminClient.create(configs);
    }
}

All Kafka settings added to application.yml are bound to KafkaProperties.

The @EnableKafka annotation turns on Kafka support, and Spring Boot's auto-configuration registers the ProducerFactory and ConsumerFactory in the Spring IoC container.


In the configuration class we also register an AdminClient with the Spring IoC container; it will be used later.

2.4 Creating Topics

Create a new KafkaController class:

@Slf4j
@RestController
@RequestMapping("kafka")
public class KafkaController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private AdminClient adminClient;

    @PutMapping("topic")
    public void buildTopic(@RequestParam("name") String name, @RequestParam("numPartitions") Integer numPartitions,
                           @RequestParam("replicationFactor") Short replicationFactor) {
        NewTopic newTopic = new NewTopic(name, numPartitions, replicationFactor);
        adminClient.createTopics(Collections.singletonList(newTopic));
    }
}

Creating (or updating) a topic requires three parameters:

  • name: the topic name.
  • numPartitions: the number of partitions for the topic.
  • replicationFactor: the number of replicas for the topic.
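
Note that AdminClient.createTopics is asynchronous: the buildTopic method above returns as soon as the request is issued, without waiting for the broker to confirm. If you want to block until the topic actually exists, a minimal sketch inside the same method could look like this (the 10-second timeout is an arbitrary choice, not from the original):

    CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(newTopic));
    try {
        // all() returns a KafkaFuture<Void> that completes once every requested topic has been created
        result.all().get(10, TimeUnit.SECONDS);
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        log.error("failed to create topic [{}]", name, e);
    }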

You can also list or delete topics through the AdminClient:

@GetMapping("topics")
public HttpEntity<?> getAllTopics() {
    ListTopicsResult listTopicsResult = adminClient.listTopics();
    try {
        Set<String> names = listTopicsResult.names().get();
        return ResponseEntity.ok(names);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("internal server error");
    }
}

@DeleteMapping("topic/{topic}")
public void deleteTopic(@PathVariable String topic) {
    adminClient.deleteTopics(Collections.singletonList(topic));
}

Querying the current topic list returns the following:

[
    "kafka_test2",
    "kafka_test"
]

After calling the delete endpoint to remove kafka_test2, query again:

[
    "kafka_test"
]

2.5 Publishing Messages

@GetMapping("produce")
public void send(@RequestParam("message") String message) {
    // send the message to the default topic
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.sendDefault(message);
    buildCallBack(future, message);
}

@GetMapping("produce/{topic}")
public void sendToTopic(@PathVariable String topic, @RequestParam("message") String message) {
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
    buildCallBack(future, message);
}

@SuppressWarnings("NullableProblems")
private void buildCallBack(ListenableFuture<SendResult<String, String>> future, String message) {
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
  
        @Override
        public void onFailure(Throwable throwable) {
            log.info("Message [{}] failed to send, reason: {}", message, throwable.getMessage());
        }
        @Override
        public void onSuccess(SendResult<String, String> result) {
            log.info("消息 [{}] 发送成功,当前 partition: {},当前 offset: {}", message,
                    result.getRecordMetadata().partition(), result.getRecordMetadata().offset());
        }
    });
}

In the code we use a ListenableFutureCallback to determine whether the message was sent successfully: onSuccess is invoked on success and onFailure on failure.

We can now publish messages, but there is no consumer yet to consume them.

2.6 Consuming Messages

Create a new KafkaMessageListener class:

@Component
@Slf4j
public class KafkaMessageListener {

    @KafkaListener(topics = "kafka_test", groupId = "kafka-consumer")
    public void listen(String message) {
        log.info("收到消息: {}", message);  
    }
}

Let's briefly look at how @KafkaListener works.

Driven by the application.yml configuration, Spring Boot creates a DefaultKafkaConsumerFactory for us:

public class DefaultKafkaConsumerFactory<K, V> implements ConsumerFactory<K, V> {

	private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(DefaultKafkaConsumerFactory.class));

	private final Map<String, Object> configs;

	private Supplier<Deserializer<K>> keyDeserializerSupplier;

	private Supplier<Deserializer<V>> valueDeserializerSupplier;

	// ......

}

This DefaultKafkaConsumerFactory is then passed as an argument to the ConcurrentKafkaListenerContainerFactory, which uses it to create the message listener container:

public class ConcurrentKafkaListenerContainerFactory<K, V>
		extends AbstractKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K, V>, K, V> {
	// ......

	@Override
	protected ConcurrentMessageListenerContainer<K, V> createContainerInstance(KafkaListenerEndpoint endpoint) {
		TopicPartitionOffset[] topicPartitions = endpoint.getTopicPartitionsToAssign();
		if (topicPartitions != null && topicPartitions.length > 0) {
			ContainerProperties properties = new ContainerProperties(topicPartitions);
			return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
		}
		else {
			Collection<String> topics = endpoint.getTopics();
			if (!topics.isEmpty()) {
				ContainerProperties properties = new ContainerProperties(topics.toArray(new String[0]));
				return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
			}
			else {
				ContainerProperties properties = new ContainerProperties(endpoint.getTopicPattern());
				return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
			}
		}
	}

	// ......

}

The getConsumerFactory method is inherited from AbstractKafkaListenerContainerFactory:

private ConsumerFactory<? super K, ? super V> consumerFactory;

public ConsumerFactory<? super K, ? super V> getConsumerFactory() {
	return this.consumerFactory;
}

The @KafkaListener annotation then specifies the topic(s) to listen to and the consumer group that consumes the messages.

At this point the Producer, Consumer, and message listener are all configured; let's see it in action.

3. Testing Message Sending and Receiving

After starting the project, the console prints output like this:

2020-04-30 17:41:23.968  INFO 17284 --- [           main] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
	bootstrap.servers = [127.0.0.1:9092]
	client.dns.lookup = default
	client.id = 
	connections.max.idle.ms = 300000
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 120000
	retries = 5
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS

2020-04-30 17:41:24.007  INFO 17284 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.1
2020-04-30 17:41:24.008  INFO 17284 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 18a913733fb71c01
2020-04-30 17:41:24.008  INFO 17284 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1588239684006

This is the AdminClient configuration being printed.


There are currently two topics in the project: kafka_test and kafka_test2.

After startup, we call the following endpoints:

  1. localhost:8080/kafka/produce?message=你好, 世界
  2. localhost:8080/kafka/produce/kafka_test2?message=hello,kafka

The console first prints the following:

2020-04-30 20:25:50.426  INFO 19376 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.offset.reset = latest
	bootstrap.servers = [47.105.106.3:9092]
	check.crcs = true
	client.dns.lookup = default
	client.id = 
	client.rack = 
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = true
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = kafka-consumer
	group.instance.id = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	isolation.level = read_committed
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2020-04-30 20:25:50.453  INFO 19376 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.1
2020-04-30 20:25:50.453  INFO 19376 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 18a913733fb71c01
2020-04-30 20:25:50.453  INFO 19376 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1588249550453
2020-04-30 20:25:50.454  INFO 19376 --- [           main] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-1, groupId=kafka-consumer] Subscribed to topic(s): kafka_test

This is the Consumer configuration, but why is similar output printed 4 times?

Because the topic our listener container subscribes to has 4 partitions, and we set concurrency: 4 in the listener configuration, one Consumer is created per partition, hence the 4 Consumer instances.
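
If you want to verify the partition count yourself, the AdminClient registered earlier can be used to describe the topic. The following is only a minimal sketch, not part of the original project:

DescribeTopicsResult result = adminClient.describeTopics(Collections.singletonList("kafka_test"));
try {
    // each TopicPartitionInfo in the description corresponds to one partition
    TopicDescription description = result.all().get().get("kafka_test");
    log.info("kafka_test has {} partitions", description.partitions().size());
} catch (InterruptedException | ExecutionException e) {
    log.error("failed to describe topic", e);
}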


Next the console prints the Producer configuration:

2020-04-30 20:26:05.284  INFO 19376 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = [127.0.0.1:9092]
	buffer.memory = 33554432
	client.dns.lookup = default
	client.id = 
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = false
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 3
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2020-04-30 20:26:05.294  INFO 19376 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.1
2020-04-30 20:26:05.294  INFO 19376 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 18a913733fb71c01
2020-04-30 20:26:05.294  INFO 19376 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1588249565294
2020-04-30 20:26:05.413  INFO 19376 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: 2lYAT_NgRw28lGEvmJe_eg

After that come the messages we just sent:

[ad | producer-1] c.n.kafka.controller.KafkaController     : Message [你好, 世界] sent successfully, partition: 0, offset: 6
[ntainer#0-0-C-1] c.n.kafka.listener.KafkaMessageListener  : Received message: 你好, 世界
[ad | producer-1] c.n.kafka.controller.KafkaController     : Message [hello kafka] sent successfully, partition: 0, offset: 1

Notice that the message [hello kafka] sent to kafka_test2 was not consumed, because the listener only subscribes to kafka_test.

4. More Uses of @KafkaListener

We can also use @KafkaListener to listen to multiple topics at once:

@KafkaListener(topics = {"kafka_test", "kafka_test2"}, groupId = "kafka-consumer")

And use the @Header annotation to obtain metadata about where the current message came from:

@KafkaListener(topics = {"kafka_test", "kafka_test2"}, groupId = "kafka-consumer")
public void listen(@Payload String message,
                   @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                   @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
    log.info("收到来自 topic: {}, partition: {} 的消息: {}", topic, partition, message);
}

Restart the project and the console prints:

.... KafkaMessageListener  : Received message from topic: kafka_test2, partition: 0: hello kafka

Since one message in kafka_test2 was not consumed earlier, the listener consumes it first this time. Then we push two more messages:

c.n.kafka.controller.KafkaController     : Message [ message from kafka_test] sent successfully, partition: 0, offset: 7
c.n.kafka.listener.KafkaMessageListener  : Received message from topic: kafka_test, partition: 0:  message from kafka_test
c.n.kafka.controller.KafkaController     : Message [ message from kafka_test2] sent successfully, partition: 0, offset: 3
c.n.kafka.listener.KafkaMessageListener  : Received message from topic: kafka_test2, partition: 0:  message from kafka_test2
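
As an aside, instead of pulling individual headers with @Header, a listener method can also receive the whole ConsumerRecord, which carries the same metadata. This variant is not used in the demo project; it is only a sketch:

@KafkaListener(topics = {"kafka_test", "kafka_test2"}, groupId = "kafka-consumer")
public void listenRecord(ConsumerRecord<String, String> record) {
    // topic, partition, and offset are all available on the record itself
    log.info("Received message from topic: {}, partition: {}: {}", record.topic(), record.partition(), record.value());
}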

5. Sending Object Content

Define an order entity class:

@Data
public class Order implements Serializable {

    private static final long serialVersionUID = -6330218506336380533L;

    private Long id;

    private String orderNo;

    private List<ShopDetail> shopList;

    private Integer amount;
}
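
The Order class references a ShopDetail class that is not shown in the original; based on the fields visible in the log output later (id, productNo, productName, price, buyNum), a minimal sketch could look like this:

@Data
public class ShopDetail implements Serializable {

    private static final long serialVersionUID = 1L; // placeholder value, an assumption

    private Long id;

    private String productNo;

    private String productName;

    // the sample log shows values such as 899900, presumably a price in cents; the exact type is an assumption
    private Integer price;

    private String buyNum;
}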

Next we will push order messages to Kafka.

5.1 The JSON Approach

Configure a JsonSerializer and JsonDeserializer to serialize and deserialize the messages:

@Bean
public ProducerFactory<String, Order> producerFactory() {
    Map<String, Object> configs = new HashMap<>(8);
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(configs);
}

@Bean
public ConsumerFactory<String, Order> consumerFactory() {
    Map<String, Object> configs = new HashMap<>(4);
    configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
    configs.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getConsumer().getGroupId());
    configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaProperties.getConsumer().getAutoOffsetReset());
    return new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new JsonDeserializer<>(Order.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Order> factory() {
    ConcurrentKafkaListenerContainerFactory<String, Order> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

We also need to modify the KafkaMessageListener:

@KafkaListener(topics = {"kafka_test", "kafka_test2"}, groupId = "kafka-consumer")
public void listen(@Payload Order order,
                   @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                   @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
    log.info("收到来自 topic: {}, partition: {} 的订单消息: {}", topic, partition, order);
}

With this configuration in place, object messages can be sent and consumed.
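
One thing to note: actually sending Order objects also requires a KafkaTemplate<String, Order> built on the ProducerFactory above. The original does not show this bean, so the following is only a minimal sketch:

@Bean
public KafkaTemplate<String, Order> orderKafkaTemplate() {
    // reuse the producerFactory() bean defined above, which is configured with the JsonSerializer
    return new KafkaTemplate<>(producerFactory());
}

This template could then be injected into the controller and used as orderKafkaTemplate.send(topic, order).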

5.2 The String Approach (Recommended)

As you have probably noticed, using JSON serialization and deserialization directly requires custom configuration. With multiple different object types, even more configuration is needed; in short, it is a lot of work for little reward.

Since the messages are transferred as JSON anyway, why not just pass a JSON-formatted string? The effect is the same, only one conversion step is needed, and no extra configuration is required:

Add an endpoint to KafkaController:

@Autowired
private ObjectMapper objectMapper;

@PostMapping("produce/order/{topic}")
public void sendOrderToTopic(@PathVariable String topic, @RequestBody Order order) {
    String message = null;
    try {
        message = objectMapper.writeValueAsString(order);
    } catch (JsonProcessingException e) {
        log.error("parse json error", e);
    }
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
    buildCallBack(future, message);
}

Modify the KafkaMessageListener:

@Autowired
private ObjectMapper objectMapper;

@KafkaListener(topics = {"kafka_test", "kafka_test2"}, groupId = "kafka-consumer")
public void listen(@Payload String message,
                   @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                   @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
    try {
        Order order = objectMapper.readValue(message, Order.class);
        log.info("收到来自 topic: {}, partition: {} 的订单消息: {}", topic, partition, order);
    } catch (JsonProcessingException e) {
        log.error("parse json error, json string: {}", message, e);
    }
}

Send an order message like the one shown in the log below and check the console output:


c.n.kafka.controller.KafkaController     : Message [{"id":1,"orderNo":"20200430102348389","shopList":[{"id":1,"productNo":"A10001","productName":"Iphone 11 Pro","price":899900,"buyNum":"1"},{"id":1,"productNo":"A10002","productName":"MacBook Pro","price":1899900,"buyNum":"1"}],"amount":2799800}] sent successfully, partition: 0, offset: 8
c.n.kafka.listener.KafkaMessageListener  : Received order message from topic: kafka_test, partition: 0: Order(id=1, orderNo=20200430102348389, shopList=[ShopDetail(id=1, productNo=A10001, productName=Iphone 11 Pro, price=899900, buyNum=1), ShopDetail(id=1, productNo=A10002, productName=MacBook Pro, price=1899900, buyNum=1)], amount=2799800)

By comparison, when there are no special requirements, passing messages as plain Strings is by far the most convenient. I strongly advise against handling messages with the JSON approach or custom serialization.

Source code: https://github.com/NekoChips/SpringDemo/tree/master/23.springboot-Kafka


About the author: NekoChips
Article URL: https://chenyangjie.com.cn/articles/2020/04/30/1588254024454.html
Copyright: All articles here are for learning and technical exchange only. This work is licensed under the CC BY-NC-SA 4.0 license; please credit the source when reposting!
License: Creative Commons License