Kafka Avro使用架构注册表失败将序列化/反序列化为具体类型

2020-02-22 java apache-kafka avro confluent confluent-schema-registry

我似乎无法在其具体的avro实现中使用消息,但出现以下异常:

class org.apache.avro.generic.GenericData$Record cannot be cast to class my.package.MyConcreteClass

这是代码(我使用Spring Boot

MyProducer.java

private final KafkaTemplate<String, MyConcreteClass> kafkaTemplate;

public PositionProducer(KafkaTemplate<String, MyConcreteClass> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
}

public void sendMessage(MyConcreteClass myConcreteClass) {
    this.kafkaTemplate.send(topic, myConcreteClass);
}

MyConsumer.java

@KafkaListener(topics = "#{'${consumer.topic.name}'}", groupId = "#{'${spring.kafka.consumer.group-id}'}")
public void listen(MyConcreteClass incomingMsg) {
    //handle
}

请注意,如果我将所有内容都更改为GenericRecord ,则反序列化将正常工作,因此我知道所有配置(未粘贴)均已正确配置。

还可能需要注意的一点很重要,我没有亲自注册架构,而是让我的客户代码替我完成。

有任何想法吗?

编辑:

配置:

@Bean
public ConsumerFactory<String, MyConcreteClass> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
    return new DefaultKafkaConsumerFactory<>(props);
}

Answers

您需要自定义使用者配置。 ContentDeserializer必须是具有对架构注册表的引用的KafkaAvroDeserializer。

MyConcreteClass需要扩展SpecificRecord

您可以使用Avro maven插件从架构生成它

然后,您必须配置序列化程序以知道要使用特定记录

props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true") ;

Related