According to the Kafka documentation, the KafkaProducer.send() method is asynchronous, and that is what I had always assumed:
* The {@link #send(ProducerRecord) send()} method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.
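We recently switched to the Kafka service provided by Alibaba Cloud. After the migration, we noticed that in the development environment, methods that call KafkaProducer.send() sometimes took abnormally long to execute. After some investigation, the cause turned out to be KafkaProducer.send() blocking while it fetches metadata.
With our previous self-hosted Kafka cluster, connections were fast, so we never noticed this. After the switch to the Alibaba Cloud Kafka cluster, the development environment and the Kafka cluster were no longer in the same VPC, so we had to connect through the public endpoint. Connections there are slow (taking as long as 1 to 2 seconds), and that is what exposed the problem.
Looking at the KafkaProducer code, the very first step in doSend() is a call to waitOnMetadata() to get the metadata for the topic: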
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        KafkaProducer.ClusterAndWaitTime clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), this.maxBlockTimeMs);
        …………
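When the cached cluster information does not yet contain the topic (or the requested partition), waitOnMetadata() uses a do-while loop to fetch the metadata, and keeps looping until it succeeds or throws an exception. The calling thread therefore blocks here: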
private KafkaProducer.ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
    this.metadata.add(topic);
    Cluster cluster = this.metadata.fetch();
    Integer partitionsCount = cluster.partitionCountForTopic(topic);
    if (partitionsCount == null || partition != null && partition >= partitionsCount) {
        long begin = this.time.milliseconds();
        long remainingWaitMs = maxWaitMs;
        long elapsed;
        do {
            log.trace("Requesting metadata update for topic {}.", topic);
            int version = this.metadata.requestUpdate();
            this.sender.wakeup();
            try {
                this.metadata.awaitUpdate(version, remainingWaitMs);
            } catch (TimeoutException var15) {
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            }
            cluster = this.metadata.fetch();
            elapsed = this.time.milliseconds() - begin;
            if (elapsed >= maxWaitMs) {
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            }
            if (cluster.unauthorizedTopics().contains(topic)) {
                throw new TopicAuthorizationException(topic);
            }
            remainingWaitMs = maxWaitMs - elapsed;
            partitionsCount = cluster.partitionCountForTopic(topic);
        } while(partitionsCount == null);
        if (partition != null && partition >= partitionsCount) {
            throw new KafkaException(String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));
        } else {
            return new KafkaProducer.ClusterAndWaitTime(cluster, elapsed);
        }
    } else {
        return new KafkaProducer.ClusterAndWaitTime(cluster, 0L);
    }
}
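The maxWaitMs argument that bounds this loop is the this.maxBlockTimeMs value passed in from doSend(). As far as I know, in recent client versions that value comes from the producer setting max.block.ms, which defaults to 60000 ms. The sketch below only builds a java.util.Properties object with a lower limit. The class name, method name, broker address and the 3000 ms value are placeholders of mine, and a real producer would also need key.serializer and value.serializer set before the properties are handed to the KafkaProducer constructor.

```java
import java.util.Properties;

/** Hypothetical helper, not part of Kafka: builds producer properties with a bounded blocking time. */
final class ProducerConfigSketch {

    static Properties boundedBlockingProps(String bootstrapServers, long maxBlockMs) {
        Properties props = new Properties();
        // Address of the cluster; the value is supplied by the caller.
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Upper bound on how long send() may block, which includes the metadata wait.
        props.setProperty("max.block.ms", Long.toString(maxBlockMs));
        return props;
    }
}
```

Lowering the limit only caps the wait. send() can still block the caller for up to that long, and it throws a TimeoutException once the limit is reached.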
The fix is simple: call KafkaProducer.send() from a thread of your own. That said, I have to grumble about the Kafka documentation, which is really misleading on this point.
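One way to do this is with a small wrapper around a single-thread executor. This is a minimal sketch, not the exact code we run: OffloadedSender and its submit() method are hypothetical names, and the Callable stands in for the real producer.send(record) call so that the example needs nothing beyond the JDK.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical helper: runs a possibly blocking send call on a dedicated thread. */
final class OffloadedSender implements AutoCloseable {

    // A single worker thread keeps submissions in the order they were made.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "kafka-send-offload");
        t.setDaemon(true);
        return t;
    });

    /** Returns immediately; the blocking call runs on the worker thread. */
    <T> CompletableFuture<T> submit(Callable<T> blockingSend) {
        CompletableFuture<T> result = new CompletableFuture<>();
        executor.execute(() -> {
            try {
                result.complete(blockingSend.call());
            } catch (Exception e) {
                // Surfaces errors such as a metadata timeout to whoever holds the future.
                result.completeExceptionally(e);
            }
        });
        return result;
    }

    @Override
    public void close() {
        // Stops accepting new work; tasks already queued still run.
        executor.shutdown();
    }
}
```

With a real producer, the call would look something like sender.submit(() -> producer.send(record)), so any metadata wait happens on the worker thread rather than the caller. Note that the executor's queue is unbounded in this sketch, so if the broker stays unreachable for a long time, pending sends pile up in memory; a bounded queue may be a better fit for production use.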