引用一段官网的说明:
Apache Kafka 是 一个分布式流处理平台 ,主要有以下三个特性:
- 可以让你发布和订阅流式的记录。这一方面与消息队列或者企业消息系统类似。
- 可以储存流式的记录,并且有较好的容错性。
- 可以在流式记录产生时就进行处理。
Kafka 的一些基本概念:
- Kafka 是以一个集群的方式运行在一台或者多台服务器上的。
- Kafka 的消息以 topic 来进行区分。
- 将向 Kafka 集群中指定的 topic 发布消息的服务称为 Producer。
- 将从 Kafka 集群中指定的一个或多个 topic 中获取消息并消费的服务称为 Consumer。
以上内容均来自:Kafka 中文文档 。有关 topic、producer、consumer、client 的详细讲解可阅读官方文档。
1. 搭建 Kafka 环境
由于个人较为倾向于使用 Docker 搭建服务环境,所以文中只介绍如何使用 Docker 搭建 Kafka 环境。
在 kafka 目录下创建 docker-compose.yml 文件。
docker-compose.yml 文件内容:
version: '3'
services:
zookeeper:
container_name: kafka_zookeeper
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
container_name: kafka
image: wurstmeister/kafka:latest
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
保存文件后,在 kafka 目录下执行 docker-compose 命令:
docker-compose up -d
执行过程中会自动下载 wurstmeister/zookeeper
和 wurstmeister/kafka
镜像。如果无法下载该镜像文件,可以手动执行 pull 命令:
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
至于如何通过 kafka 命令创建 topic,发布消息以及消费消息,可对照官方文档进行操作,本文重点在于如何在 Spring Boot 中使用 Kafka。
2. Spring Boot 整合 Kafka
搭建好了 Kafka 环境,我们开始着手 Spring Boot 的整合。
Demo 项目中 Spring Boot 的版本为 2.2.6.RELASE,并使用 lombok 工具。
2.1 添加依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2.2 文件配置
在 application.yml 文件中添加 kafka 配置:
spring:
kafka:
# kafka 地址,多个用 “,”隔开
bootstrap-servers: localhost:9092
template:
# 默认 topic
default-topic: kafka_test
# 生产者配置
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
retries: 3
# 消费者配置
consumer:
group-id: kafka-consumer
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
isolation-level: read_committed
enable-auto-commit: true
# listener 配置,作用于 @KafkaListener
listener:
type: batch
concurrency: 4
poll-timeout: 3000
2.3 JavaConfig 配置 Kafka
创建 KafkaConfig 配置类:
@Configuration
@EnableKafka
public class KafkaConfig{
@Autowired
private KafkaProperties kafkaProperties;
@Bean
public AdminClient adminClient() {
Map<String, Object> configs = new HashMap<>(2);
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
return AdminClient.create(configs);
}
}
在 appliction.yml 文件中添加的 kafka 配置均会配置到 KafkaProperties
中。
使用 @EnableKafka
注解开启 Kafka 支持,通过 Spring Boot 的自动装配自动将 ProducerFactory
、ConsumerFactory
添加至 Spring IOC 容器中。
在配置类中向 Spring IOC 容器中注册 AdminClient
,后面会用到。
2.4 创建 topic
新建一个 KafkaController
类:
@Slf4j
@RestController
@RequestMapping("kafka")
public class KafkaController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private AdminClient adminClient;
@PutMapping("topic")
public void buildTopic(@RequestParam("name") String name, @RequestParam("numPartitions") Integer numPartitions,
@RequestParam("replicationFactor") Short replicationFactor) {
NewTopic newTopic = new NewTopic(name, numPartitions, replicationFactor);
adminClient.createTopics(Collections.singletonList(newTopic));
}
}
新建或者更新 topic 需要三个参数:
- name: topic 名称。
- numPartitions: topic 分区数量。
- replicationFactor:topic 副本数量。
同样还可以通过 AdminClient
查询或删除 topic :
@GetMapping("topics")
public HttpEntity<?> getAllTopics() {
ListTopicsResult listTopicsResult = adminClient.listTopics();
try {
Set<String> names = listTopicsResult.names().get();
return ResponseEntity.ok(names);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("服务异常");
}
}
@DeleteMapping("topic/{topic}")
public void deleteTopic(@PathVariable String topic) {
adminClient.deleteTopics(Collections.singletonList(topic));
}
查询当前 topic 列表,返回如下结果:
[
"kafka_test2",
"kafka_test"
]
调用接口删除 kafka_test2
后,再次查询:
[
"kafka_test"
]
2.5 发布消息
@GetMapping("produce")
public void send(@RequestParam("message") String message) {
/// 发送消息至默认topic
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.sendDefault(message
buildCallBack(future, message);
}
@GetMapping("produce/{topic}")
public void sendToTopic(@PathVariable String topic, @RequestParam("message") String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
buildCallBack(future, message);
}
@SuppressWarnings("NullableProblems")
private void buildCallBack(ListenableFuture<SendResult<String, String>> future, String message) {
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure( Throwable throwable) {
log.info("消息 [{}] 发送失败,错误原因: {}", message, throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, String> result) {
log.info("消息 [{}] 发送成功,当前 partition: {},当前 offset: {}", message,
result.getRecordMetadata().partition(), result.getRecordMetadata().offset());
}
});
}
代码中我们通过 ListenableFutureCallback
回调的方式来确定消息是否发送成功,成功回调 onSuccess
方法,失败回调 onFailure
方法。
如此,现在就可以发布消息了,但是还没有消费者消费这些消息。
2.6 消费消息
新建一个 KafkaMessageListener
类:
@Component
@Slf4j
public class KafkaMessageListener {
@KafkaListener(topics = "kafka_test", groupId = "kafka-consumer")
public void listen(String message) {
log.info("收到消息: {}", message);
}
}
下面我们大致分析一下,@KafkaListener
是如何工作的:
通过 application.yml 配置,Spring Boot 会帮助我们创建一个 DefaultConsumerFactory
对象:
public class DefaultKafkaConsumerFactory<K, V> implements ConsumerFactory<K, V> {
private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(DefaultKafkaConsumerFactory.class));
private final Map<String, Object> configs;
private Supplier<Deserializer<K>> keyDeserializerSupplier;
private Supplier<Deserializer<V>> valueDeserializerSupplier;
// ......
}
而后将该 DefaultConsumerFactory
对象作为参数传递给 ConcurrentKafkaListenerContainerFactory
,以此创建消息监听容器:
public class ConcurrentKafkaListenerContainerFactory<K, V>
extends AbstractKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K, V>, K, V> {
// ......
@Override
protected ConcurrentMessageListenerContainer<K, V> createContainerInstance(KafkaListenerEndpoint endpoint) {
TopicPartitionOffset[] topicPartitions = endpoint.getTopicPartitionsToAssign();
if (topicPartitions != null && topicPartitions.length > 0) {
ContainerProperties properties = new ContainerProperties(topicPartitions);
return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
}
else {
Collection<String> topics = endpoint.getTopics();
if (!topics.isEmpty()) {
ContainerProperties properties = new ContainerProperties(topics.toArray(new String[0]));
return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
}
else {
ContainerProperties properties = new ContainerProperties(endpoint.getTopicPattern());
return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
}
}
}
// ......
}
getConsumerFactory
方法继承自 AbstractKafkaListenerContainerFactory
:
private ConsumerFactory<? super K, ? super V> consumerFactory;
public ConsumerFactory<? super K, ? super V> getConsumerFactory() {
return this.consumerFactory;
}
接着通过 @KafkaListener
注解指定监听的 topic 以及消费消息的 Consumer 消费组。
至此,我们已经配置好了 Producer、Consumer 以及配置好消息监听了,接下来看看效果。
3. 测试消息发送与接收
启动项目后,控制台有一段这样的输出:
2020-04-30 17:41:23.968 INFO 17284 --- [ main] o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
bootstrap.servers = [127.0.0.1:9092]
client.dns.lookup = default
client.id =
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
2020-04-30 17:41:24.007 INFO 17284 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.1
2020-04-30 17:41:24.008 INFO 17284 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 18a913733fb71c01
2020-04-30 17:41:24.008 INFO 17284 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1588239684006
输出了 AdminClient 的配置信息。
目前项目中有两个 topic : kafka_test、kafka_test2
启动后,我们分别访问:
- localhost:8080/kafka/produce?message=你好, 世界
- localhost:8080/kafka/produce/kafkast2?message=hello,kafka
控制台先输出如下结果:
2020-04-30 20:25:50.426 INFO 19376 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [47.105.106.3:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = kafka-consumer
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_committed
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2020-04-30 20:25:50.453 INFO 19376 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.1
2020-04-30 20:25:50.453 INFO 19376 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 18a913733fb71c01
2020-04-30 20:25:50.453 INFO 19376 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1588249550453
2020-04-30 20:25:50.454 INFO 19376 --- [ main] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-1, groupId=kafka-consumer] Subscribed to topic(s): kafka_test
这部分信息是 Consumer 的配置信息,不过类似的信息输出了 4 次是为什么呢?
因为我们监听容器中指定的 topic 有 4 个分区,而默认情况下,每一个分区就要对应一个 Consumer,所以创建了 4 个 Consumer 。
接着控制台输出了 Producer 的配置信息:
2020-04-30 20:26:05.284 INFO 19376 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [127.0.0.1:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id =
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 3
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2020-04-30 20:26:05.294 INFO 19376 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.1
2020-04-30 20:26:05.294 INFO 19376 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 18a913733fb71c01
2020-04-30 20:26:05.294 INFO 19376 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1588249565294
2020-04-30 20:26:05.413 INFO 19376 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: 2lYAT_NgRw28lGEvmJe_eg
在之后就是我们刚刚发送的消息:
[ad | producer-1] c.n.kafka.controller.KafkaController : 消息 [你好, 世界] 发送成功,当前 partition: 0,当前 offset: 6
[ntainer#0-0-C-1] c.n.kafka.listener.KafkaMessageListener : 收到消息: 你好, 世界
[ad | producer-1] c.n.kafka.controller.KafkaController : 消息 [hello kafka] 发送成功,当前 partition: 0,当前 offset: 1
可以看到 发送到 kafka_test2 中的消息 【hello kafka】 没有被消费。
4. @KafkaListener 的更多用法
我们还可以使用 @KafkaListener
同时监听多个 topic 消息:
@KafkaListener(topics = {"kafka_test", "kafka_test2"}, groupId = "kafka-consumer")
以及通过 @Header 注解当前消息的来源信息:
@KafkaListener(topics = {"kafka_test", "kafka_test2"}, groupId = "kafka-consumer")
public void listen(@Payload String message,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
log.info("收到来自 topic: {}, partition: {} 的消息: {}", topic, partition, message);
}
重新启动项目,控制台输出:
.... KafkaMessageListener : 收到来自 topic: kafka_test2, partition: 0 的消息: hello kafka
由于之前 kafka_test2 中有一条消息没有消费,在本次监听中首先对这条消息进行了消费。接着再推送两条消息:
c.n.kafka.controller.KafkaController : 消息 [ message from kafka_test] 发送成功,当前 partition: 0,当前 offset: 7
c.n.kafka.listener.KafkaMessageListener : 收到来自 topic: kafka_test, partition: 0 的消息: message from kafka_test
c.n.kafka.controller.KafkaController : 消息 [ message from kafka_test2] 发送成功,当前 partition: 0,当前 offset: 3
c.n.kafka.listener.KafkaMessageListener : 收到来自 topic: kafka_test2, partition: 0 的消息: message from kafka_test2
5. 发送对象内容
定义一个订单实体类:
@Data
public class Order implements Serializable {
private static final long serialVersionUID = -6330218506336380533L;
private Long id;
private String orderNo;
private List<ShopDetail> shopList;
private Integer amount;
}
我们接下来要向 kafka 中推送订单消息。
5.1 JSON 方式
通过自定义配置 JsonSerializer
和 JsonDeserializer
对消息进行序列化和反序列化:
@Bean
public ProducerFactory<String, Order> producerFactory() {
Map<String, Object> configs = new HashMap<>(8);
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configs);
}
@Bean
public ConsumerFactory<String, Order> consumerFactory() {
Map<String, Object> configs = new HashMap<>(4);
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
configs.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getConsumer().getGroupId());
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaProperties.getConsumer().getAutoOffsetReset());
return new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new JsonDeserializer<>(Order.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Order> factory() {
ConcurrentKafkaListenerContainerFactory<String, Order> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
同时还要修改 KafkaMessageListener
:
@KafkaListener(topics = {"kafka_test", "kafka_test2"}, groupId = "kafka-consumer")
public void listen(@Payload Order order,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
log.info("收到来自 topic: {}, partition: {} 的订单消息: {}", topic, partition, order);
}
这样配置完成后就可以发送和消费对象消息了。
5.2 String 方式(推荐)
相信大家都感觉到了,直接使用 JSON 序列化和反序列化的方式需要进行自定义配置。如果是针对多个不同的对象,就还需要更多的配置,总之做的就是吃力不讨好的事。
反正都是以 JSON 的形式传递消息,那为什么不传递 JSON 格式的 字符串 呢,同样的效果,只需要进行一步转换,无需更多的配置:
在 KafkaController
中添加接口:
@Autowired
private ObjectMapper objectMapper;
@PostMapping("produce/order/{topic}")
public void sendOrderToTopic(@PathVariable String topic, @RequestBody Order order) {
String message = null;
try {
message = objectMapper.writeValueAsString(order);
} catch (JsonProcessingException e) {
log.error("parse json error", e);
}
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
buildCallBack(future, message);
}
修改 KafkaMessageListener
:
@Autowired
private ObjectMapper objectMapper;
@KafkaListener(topics = {"kafka_test", "kafka_test2"}, groupId = "kafka-consumer")
public void listen(@Payload String message,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
try {
Order order = objectMapper.readValue(message, Order.class);
log.info("收到来自 topic: {}, partition: {} 的订单消息: {}", topic, partition, order);
} catch (JsonProcessingException e) {
log.error("parse json error, json string: {}", message, e);
}
}
发送一条这样的订单消息,查看控制台输出:
c.n.kafka.controller.KafkaController : 消息 [{"id":1,"orderNo":"20200430102348389","shopList":[{"id":1,"productNo":"A10001","productName":"Iphone 11 Pro","price":899900,"buyNum":"1"},{"id":1,"productNo":"A10002","productName":"MacBook Pro","price":1899900,"buyNum":"1"}],"amount":2799800}] 发送成功,当前 partition: 0,当前 offset: 8
c.n.kafka.listener.KafkaMessageListener : 收到来自 topic: kafka_test, partition: 0 的订单消息: Order(id=1, orderNo=20200430102348389, shopList=[ShopDetail(id=1, productNo=A10001, productName=Iphone 11 Pro, price=899900, buyNum=1), ShopDetail(id=1, productNo=A10002, productName=MacBook Pro, price=1899900, buyNum=1)], amount=2799800)
相比较而言,在没有特殊需求的情况下,直接使用 String 的方式传递消息无疑是最为方便的。极其不推荐使用 JSON 方式 或者 自定义序列化 方式来处理消息。
源码地址:https://github.com/NekoChips/SpringDemo/tree/master/23.springboot-Kafka
关于作者:NekoChips
本文地址:https://chenyangjie.com.cn/articles/2020/04/30/1588254024454.html
版权声明:本篇所有文章仅用于学习和技术交流,本作品采用 BY-NC-SA 4.0 许可协议,如需转载请注明出处!
许可协议:
