KafkaProducer.send() blocks the calling thread while fetching metadata

According to the Kafka documentation, KafkaProducer.send() is asynchronous, and I had always taken that at face value:

"The send() method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency."

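Recently we moved to the Kafka service provided by Alibaba Cloud. After the migration, we noticed that in our development environment, methods that call KafkaProducer.send() sometimes took abnormally long to run. After some digging, we found that the time was being spent inside KafkaProducer.send() while it fetched metadata.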

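With our self-hosted Kafka cluster, connections were fast, so we never noticed this. With the Alibaba Cloud cluster, our development environment is not in the same VPC as the Kafka cluster, so it has to connect through the public endpoint. Connections there are slow (up to 1 to 2 seconds), and that is what exposed the problem.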

Looking at the KafkaProducer source, the very first thing doSend() does is call waitOnMetadata() to get the topic's metadata:

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;

    try {
        KafkaProducer.ClusterAndWaitTime clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), this.maxBlockTimeMs);
        …………

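waitOnMetadata() returns immediately only when the topic's partition count is already cached. Otherwise it enters a do-while loop that requests a metadata update and waits for it, repeating until the partition count is known or an exception is thrown (for example a TimeoutException once maxWaitMs, which comes from the producer's max.block.ms setting, has elapsed). The thread that called send() is blocked for that whole time: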

private KafkaProducer.ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
    this.metadata.add(topic);
    Cluster cluster = this.metadata.fetch();
    Integer partitionsCount = cluster.partitionCountForTopic(topic);
    if (partitionsCount == null || partition != null && partition >= partitionsCount) {
        long begin = this.time.milliseconds();
        long remainingWaitMs = maxWaitMs;

        long elapsed;
        do {
            log.trace("Requesting metadata update for topic {}.", topic);
            int version = this.metadata.requestUpdate();
            this.sender.wakeup();

            try {
                this.metadata.awaitUpdate(version, remainingWaitMs);
            } catch (TimeoutException var15) {
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            }

            cluster = this.metadata.fetch();
            elapsed = this.time.milliseconds() - begin;
            if (elapsed >= maxWaitMs) {
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            }

            if (cluster.unauthorizedTopics().contains(topic)) {
                throw new TopicAuthorizationException(topic);
            }

            remainingWaitMs = maxWaitMs - elapsed;
            partitionsCount = cluster.partitionCountForTopic(topic);
        } while(partitionsCount == null);

        if (partition != null && partition >= partitionsCount) {
            throw new KafkaException(String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));
        } else {
            return new KafkaProducer.ClusterAndWaitTime(cluster, elapsed);
        }
    } else {
        return new KafkaProducer.ClusterAndWaitTime(cluster, 0L);
    }
}
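To see the blocking outside Kafka, here is a minimal self-contained sketch of the same wait-with-deadline pattern. FakeMetadata and MetadataWaitDemo are hypothetical classes invented for illustration, not Kafka APIs: a background thread stands in for the producer's network thread, and the 500 ms delay is an assumed stand-in for a slow metadata fetch over a public endpoint.

```java
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the producer's metadata cache (not a Kafka class).
class FakeMetadata {
    private Integer partitionCount; // null until metadata for the topic arrives
    private int version;            // bumped on every update

    synchronized void update(int partitions) {
        partitionCount = partitions;
        version++;
        notifyAll(); // wake every thread blocked in awaitUpdate()
    }

    synchronized Integer partitionCount() { return partitionCount; }

    synchronized int version() { return version; }

    // Blocks the calling thread until version > lastVersion, or throws after maxWaitMs.
    synchronized void awaitUpdate(int lastVersion, long maxWaitMs)
            throws InterruptedException, TimeoutException {
        long deadline = MetadataWaitDemo.nowMs() + maxWaitMs;
        while (version <= lastVersion) {
            long remaining = deadline - MetadataWaitDemo.nowMs();
            if (remaining <= 0) {
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            }
            wait(remaining); // releases the monitor while waiting
        }
    }
}

class MetadataWaitDemo {
    static long nowMs() { return System.nanoTime() / 1_000_000L; }

    // Same shape as waitOnMetadata(): return at once if the partition count is cached,
    // otherwise keep waiting for updates until it is known or maxWaitMs is used up.
    // Returns how long the caller was blocked, in ms.
    static long waitOnMetadata(FakeMetadata metadata, long maxWaitMs)
            throws InterruptedException, TimeoutException {
        long begin = nowMs();
        int version = metadata.version(); // read before the check so no update is missed
        Integer partitions = metadata.partitionCount();
        long elapsed = 0L;
        while (partitions == null) {
            metadata.awaitUpdate(version, maxWaitMs - elapsed);
            elapsed = nowMs() - begin;
            if (elapsed >= maxWaitMs) {
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            }
            version = metadata.version();
            partitions = metadata.partitionCount();
        }
        return elapsed;
    }

    public static void main(String[] args) throws Exception {
        FakeMetadata metadata = new FakeMetadata();
        // Simulate a slow metadata fetch on a separate "network" thread.
        Thread fetcher = new Thread(() -> {
            try { Thread.sleep(500); } catch (InterruptedException e) { return; }
            metadata.update(3);
        });
        fetcher.start();
        long blocked = waitOnMetadata(metadata, 60_000L);
        System.out.println("caller blocked for ~" + blocked + " ms");
        fetcher.join();
    }
}
```

Running main should report a blocked time of roughly 500 ms: the thread that asked for metadata does nothing else until it arrives, which is what happens to the thread calling send() when the topic's metadata is not cached yet. Once it is cached, the fast path returns 0 without waiting.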

The fix is simple: call KafkaProducer.send() from a thread of your own. Still, I can't help griping about Kafka's documentation here: on this point it is genuinely misleading.
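A minimal sketch of that workaround, assuming one dedicated thread is acceptable. RecordSender and OffloadingSender are hypothetical names; RecordSender.send(topic, value) stands in for a call such as KafkaProducer.send(), so the example runs without kafka-clients on the classpath. The caller gets a CompletableFuture back immediately, and any blocking on metadata happens on the "kafka-send" thread instead.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for KafkaProducer.send(): may block (e.g. on metadata) before returning.
interface RecordSender {
    void send(String topic, String value) throws Exception;
}

// Runs every send on one dedicated thread, so the caller never waits on metadata.
class OffloadingSender implements AutoCloseable {
    private final RecordSender delegate;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "kafka-send");
        t.setDaemon(true); // do not keep the JVM alive just for this thread
        return t;
    });

    OffloadingSender(RecordSender delegate) { this.delegate = delegate; }

    // Returns immediately; the future completes when the delegate's send() returns or fails.
    CompletableFuture<Void> sendAsync(String topic, String value) {
        return CompletableFuture.runAsync(() -> {
            try {
                delegate.send(topic, value);
            } catch (Exception e) {
                throw new CompletionException(e); // surfaces as the future's failure cause
            }
        }, executor);
    }

    @Override
    public void close() throws InterruptedException {
        executor.shutdown(); // stop accepting work, let queued sends finish
        executor.awaitTermination(30, TimeUnit.SECONDS);
    }
}

class OffloadDemo {
    public static void main(String[] args) throws Exception {
        // Simulated slow sender: sleeps 500 ms, as a metadata fetch over a slow link might.
        RecordSender slow = (topic, value) -> Thread.sleep(500);
        try (OffloadingSender sender = new OffloadingSender(slow)) {
            long start = System.nanoTime();
            CompletableFuture<Void> f = sender.sendAsync("demo-topic", "hello");
            long callerMs = (System.nanoTime() - start) / 1_000_000L;
            System.out.println("sendAsync returned after " + callerMs + " ms");
            f.get(); // waiting here only because this is a demo
        }
    }
}
```

A single-thread executor keeps records in submission order, at the cost that one send stuck waiting for metadata delays the ones queued behind it. Its queue is also unbounded, so under sustained slowness you may want a bounded queue or a rejection policy. KafkaProducer is documented as thread-safe, so handing the calls to another thread this way is fine.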
