The core idea behind configuring Kafka to support multiple message formats
Kafka itself does not restrict message formats. Its core mechanism is **serialization** on the producer side, which converts business objects into binary data for transport, and **deserialization** on the consumer side, which restores that binary data into business objects. The key to supporting multiple message formats is configuring a matching serializer/deserializer pair for each topic or message type, so that producers and consumers handle the data consistently.
JSON is one of the most commonly used text formats with Kafka, suited to scenarios that call for easy debugging or cross-language interoperability. Configure JsonSerializer on the producer side and JsonDeserializer on the consumer side; in Spring Boot the configuration can go straight into application.yml:
```yaml
# Producer configuration (Spring Boot)
spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer          # key serializer (string)
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer   # value serializer (JSON)
```

```yaml
# Consumer configuration (Spring Boot)
spring:
  kafka:
    consumer:
      group-id: test-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer      # key deserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer  # value deserializer
      properties:
        spring.json.trusted.packages: "com.example.model"   # trusted packages for deserialization (avoids arbitrary-class security issues)
```
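The JSON examples below assume a simple User POJO living in the trusted package; a minimal sketch (the class name matches the snippets in this section, but the exact fields are assumptions):

```java
package com.example.model;

// Hypothetical POJO for the JSON examples; Jackson needs a no-arg
// constructor plus getters/setters to (de)serialize it.
public class User {
    private Long id;
    private String name;
    private Integer age;
    private String gender;

    public User() { } // required by Jackson

    public User(Long id, String name, Integer age, String gender) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.gender = gender;
    }

    public Long getId() { return id; }
    public String getName() { return name; }
    public Integer getAge() { return age; }
    public String getGender() { return gender; }
    public void setId(Long id) { this.id = id; }
    public void setName(String name) { this.name = name; }
    public void setAge(Integer age) { this.age = age; }
    public void setGender(String gender) { this.gender = gender; }
}
```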
Producer example sending a JSON message:
```java
kafkaTemplate.send("json-topic", "key1", new User(1L, "Alice", 25, "Female")); // serialized to JSON automatically
```
Consumer example receiving a JSON message:
```java
@KafkaListener(topics = "json-topic", groupId = "test-group")
public void listen(User user) { // deserialized into a User object automatically
    System.out.println("Received user: " + user.getName());
}
```
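One detail worth noting: JsonDeserializer must know which class to instantiate. Spring's JsonSerializer writes type-info headers by default and the deserializer reads them back; if the producer is not a Spring application and no headers are present, you can pin a fallback target type (a sketch, assuming the User class above):

```yaml
spring:
  kafka:
    consumer:
      properties:
        spring.json.value.default.type: com.example.model.User   # fallback when no type headers are present
```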
Avro is a binary format with a compact encoding and schema-evolution support, making it a good fit for big-data scenarios. It requires the Confluent Avro library; configure KafkaAvroSerializer on the producer and KafkaAvroDeserializer on the consumer, with schemas managed through a Schema Registry:
```yaml
# Producer configuration (Spring Boot)
spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      properties:
        schema.registry.url: http://localhost:8081   # Schema Registry address
```

```yaml
# Consumer configuration (Spring Boot)
spring:
  kafka:
    consumer:
      group-id: avro-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      properties:
        schema.registry.url: http://localhost:8081
        specific.avro.reader: true   # deserialize to the generated specific class rather than GenericRecord
```
Producer example sending an Avro message (the schema must be defined in advance):
```java
// Avro schema definition (User.avsc)
// {"type":"record","name":"User","fields":[{"name":"id","type":"long"},{"name":"name","type":"string"}]}
User user = new User(1L, "Bob"); // User here is the class generated from the schema
kafkaTemplate.send("avro-topic", "key2", user); // serialized to Avro binary automatically
```
If the business demands maximum performance or a special data layout, you can write a custom serializer (implementing the Serializer interface) and deserializer (implementing the Deserializer interface). For example, the following serializes a User object into a ByteBuffer holding a fixed-length id, a length-prefixed variable-length name, and fixed-length sex/age fields (note this variant of User carries sex as an int):
```java
import org.apache.kafka.common.serialization.Serializer;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Custom serializer
public class UserSerializer implements Serializer<User> {
    @Override
    public byte[] serialize(String topic, User data) {
        if (data == null) {
            return null; // let null payloads (e.g. tombstones) pass through
        }
        byte[] nameBytes = data.getName().getBytes(StandardCharsets.UTF_8);
        // layout: id(8) + nameLen(4) + name + sex(4) + age(4)
        ByteBuffer buffer = ByteBuffer.allocate(8 + 4 + nameBytes.length + 4 + 4);
        buffer.putLong(data.getId());
        buffer.putInt(nameBytes.length);
        buffer.put(nameBytes);
        buffer.putInt(data.getSex());
        buffer.putInt(data.getAge());
        return buffer.array();
    }
}
```
```java
import org.apache.kafka.common.serialization.Deserializer;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Custom deserializer
public class UserDeserializer implements Deserializer<User> {
    @Override
    public User deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        ByteBuffer buffer = ByteBuffer.wrap(data);
        long id = buffer.getLong();
        int nameLen = buffer.getInt();
        byte[] nameBytes = new byte[nameLen];
        buffer.get(nameBytes);
        String name = new String(nameBytes, StandardCharsets.UTF_8);
        int sex = buffer.getInt();
        int age = buffer.getInt();
        return new User(id, name, sex, age);
    }
}
```
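A round-trip check is a cheap way to catch layout mistakes in a hand-rolled wire format; a minimal sketch (constructor arguments follow the sex-as-int User variant used above):

```java
// Serialize, then deserialize, and verify the fields survive the trip.
User original = new User(1L, "Bob", 1, 30);
byte[] bytes = new UserSerializer().serialize("test-topic", original);
User restored = new UserDeserializer().deserialize("test-topic", bytes);
assert restored.getId() == 1L && "Bob".equals(restored.getName());
assert restored.getSex() == 1 && restored.getAge() == 30;
```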
Configure the producer and consumer to use the custom (de)serializers:
```java
// Producer configuration
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "com.example.UserSerializer");
KafkaProducer<String, User> producer = new KafkaProducer<>(props);
```

```java
// Consumer configuration
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
props.put("group.id", "custom-group");            // consumers need a group id to subscribe
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "com.example.UserDeserializer");
KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props);
```
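With the clients built, sending and receiving works as with any other Kafka client; a minimal send/poll sketch (topic name and poll timeout are assumptions):

```java
// Send a User through the custom serializer.
producer.send(new ProducerRecord<>("user-topic", "key3", new User(1L, "Bob", 1, 30)));

// Read Users back through the custom deserializer.
consumer.subscribe(Collections.singletonList("user-topic"));
ConsumerRecords<String, User> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, User> rec : records) {
    System.out.println("Received user: " + rec.value().getName());
}
```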
If a single topic carries multiple message formats (say, some messages JSON and some Avro), you can distinguish them via message headers or a payload prefix and handle deserialization by hand (the consumer is configured with ByteArrayDeserializer so the listener sees the raw bytes):
@KafkaListener(topics = "mixed-topic")
public void listen(ConsumerRecord<String, byte[]> record) {
byte[] value = record.value();
if (value[0] == '{') { // 简单判断:以'{'开头的是JSON
String json = new String(value, StandardCharsets.UTF_8);
User user = objectMapper.readValue(json, User.class); // 使用Jackson解析JSON
System.out.println("Received JSON user: " + user.getName());
} else { // 否则假设是Avro(需配合Schema Registry)
User user = kafkaAvroDeserializer.deserialize(record.topic(), value);
System.out.println("Received Avro user: " + user.getName());
}
}
This approach needs an unambiguous marker that distinguishes the formats (a prefix or a magic byte, for instance) to avoid misclassification; the naive check above only works because Confluent-serialized Avro payloads begin with the magic byte 0x0 and therefore never start with '{'.
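A more robust alternative to sniffing payload bytes is to tag each message with an explicit record header on the producer side and branch on it when consuming; a minimal sketch (the header name "format" and its values are assumptions):

```java
// Producer side: tag the payload with its format.
ProducerRecord<String, byte[]> outgoing = new ProducerRecord<>("mixed-topic", "key4", jsonBytes);
outgoing.headers().add("format", "json".getBytes(StandardCharsets.UTF_8));
producer.send(outgoing);

// Consumer side: branch on the header instead of guessing from the bytes.
Header formatHeader = record.headers().lastHeader("format");
String format = formatHeader == null ? "unknown" : new String(formatHeader.value(), StandardCharsets.UTF_8);
if ("json".equals(format)) {
    // ... deserialize as JSON
} else if ("avro".equals(format)) {
    // ... deserialize as Avro
}
```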