According to Kafka's documentation, KafkaProducer.send() is asynchronous, and I had always taken that at face value:
* The {@link #send(ProducerRecord) send()} method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.
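Here is a toy model of what the documented contract promises: append to a buffer and return immediately. It is plain Java with no Kafka dependency. BufferedSenderModel and everything in it are hypothetical. Kafka's real buffering (RecordAccumulator plus the Sender I/O thread) is considerably more involved.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model of the documented contract (hypothetical, not Kafka's RecordAccumulator):
// send() only appends to an in-memory buffer and returns a future at once,
// while a background thread drains the buffer and completes the futures.
class BufferedSenderModel implements AutoCloseable {
    private static final class Pending {
        final String record;
        final CompletableFuture<Long> ack = new CompletableFuture<>();
        Pending(String record) { this.record = record; }
    }

    private final BlockingQueue<Pending> buffer = new LinkedBlockingQueue<>();
    private final Thread ioThread;

    BufferedSenderModel() {
        ioThread = new Thread(() -> {
            long nextOffset = 0;                       // pretend broker-assigned offsets
            try {
                while (true) {
                    Pending p = buffer.take();         // wait for the next buffered record
                    p.ack.complete(nextOffset++);      // "acknowledge" it
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();    // close() stops the loop
            }
        }, "model-io-thread");
        ioThread.setDaemon(true);
        ioThread.start();
    }

    // Never waits on the network: enqueue and return.
    CompletableFuture<Long> send(String record) {
        Pending p = new Pending(record);
        buffer.add(p);
        return p.ack;
    }

    @Override
    public void close() { ioThread.interrupt(); }
}
```

In this model the caller's cost is a single queue insertion, which is exactly the expectation the rest of this post shows to be incomplete.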
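Recently we moved to the Kafka service offered by Alibaba Cloud. After the migration we noticed that, in the development environment, methods calling KafkaProducer.send() sometimes took abnormally long to return. Digging in, we traced the delay to KafkaProducer.send() fetching metadata.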
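With our self-hosted Kafka cluster, connections were fast, so we never noticed this. Alibaba Cloud's Kafka cluster is not in the same VPC as our development environment. We therefore have to reach it through a public address, and connecting is slow, taking as long as 1–2 seconds. That is what exposed the problem.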
Looking at the KafkaProducer source, the very first thing doSend() does is call waitOnMetadata() to obtain the topic's metadata:
```java
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // Step 1: wait (up to maxBlockTimeMs) for the topic's metadata, on the caller's thread
        KafkaProducer.ClusterAndWaitTime clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), this.maxBlockTimeMs);
        …………
```
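The ordering in doSend() is what matters. The metadata wait happens on the caller's thread before the record ever reaches the buffer.

The sketch below models that assumption. MetadataFirstSendModel is hypothetical, and a sleep stands in for a slow metadata round trip over the public network. In this model, only the first send for a topic pays the full wait, because later calls hit the cache. That is one plausible reason the slowdown showed up only some of the time.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical model of doSend()'s call order (not Kafka code): the metadata wait
// runs synchronously on the caller's thread, and only the hand-off afterwards is async.
class MetadataFirstSendModel {
    private final long metadataLatencyMs;      // simulated public-network metadata round trip
    private volatile boolean metadataCached = false;

    MetadataFirstSendModel(long metadataLatencyMs) {
        this.metadataLatencyMs = metadataLatencyMs;
    }

    // Stand-in for waitOnMetadata(): blocks only while the topic's metadata is unknown.
    private void waitOnMetadata() {
        if (metadataCached) {
            return;                             // fast path: metadata already cached
        }
        try {
            TimeUnit.MILLISECONDS.sleep(metadataLatencyMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for metadata", e);
        }
        metadataCached = true;
    }

    CompletableFuture<String> send(String record) {
        waitOnMetadata();                                              // synchronous: caller pays this
        return CompletableFuture.supplyAsync(() -> "acked:" + record); // asynchronous part
    }
}
```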
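If the topic's partition count is already cached, waitOnMetadata() returns immediately. Otherwise it enters a do-while loop that keeps requesting metadata updates. The loop ends only when the partition count becomes known or an exception is thrown. The time limit, maxWaitMs, is the producer's maxBlockTimeMs, normally configured via max.block.ms. The calling thread is blocked for the entire loop: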
```java
private KafkaProducer.ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
    this.metadata.add(topic);
    Cluster cluster = this.metadata.fetch();
    Integer partitionsCount = cluster.partitionCountForTopic(topic);
    // Slow path only if the partition count is unknown (or the given partition is out of range)
    if (partitionsCount == null || partition != null && partition >= partitionsCount) {
        long begin = this.time.milliseconds();
        long remainingWaitMs = maxWaitMs;
        long elapsed;
        // Keep requesting metadata updates until the topic's partition count is known
        do {
            log.trace("Requesting metadata update for topic {}.", topic);
            int version = this.metadata.requestUpdate();
            this.sender.wakeup();   // wake the Sender I/O thread so it issues the metadata request
            try {
                // The calling thread blocks here until the update arrives or remainingWaitMs elapses
                this.metadata.awaitUpdate(version, remainingWaitMs);
            } catch (TimeoutException var15) {
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            }
            cluster = this.metadata.fetch();
            elapsed = this.time.milliseconds() - begin;
            if (elapsed >= maxWaitMs) {
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            }
            if (cluster.unauthorizedTopics().contains(topic)) {
                throw new TopicAuthorizationException(topic);
            }
            remainingWaitMs = maxWaitMs - elapsed;
            partitionsCount = cluster.partitionCountForTopic(topic);
        } while (partitionsCount == null);
        if (partition != null && partition >= partitionsCount) {
            throw new KafkaException(String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));
        } else {
            return new KafkaProducer.ClusterAndWaitTime(cluster, elapsed);
        }
    } else {
        // Fast path: metadata already cached, no waiting at all
        return new KafkaProducer.ClusterAndWaitTime(cluster, 0L);
    }
}
```
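Stripped of Kafka's internals, the loop above is a deadline-bounded wait. Each round blocks for at most the remaining time, then recomputes the elapsed time, and the loop gives up once maxWaitMs is used up.

The sketch below restates that pattern with injected functions (partitionCount, awaitUpdate, clock) so it runs without a broker. These names and the MetadataTimeoutException type are illustrative stand-ins, not Kafka APIs. The model also omits the version tracking, the sender wakeup and the authorization check.

```java
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Simplified restatement of the waitOnMetadata() loop (hypothetical names, not Kafka APIs):
//   partitionCount - cached partition count for the topic, or null while unknown
//   awaitUpdate    - blocks for at most the given ms waiting for a metadata refresh
//   clock          - current time in ms, injected so the loop can be run deterministically
class MetadataWaitLoop {
    // Unchecked stand-in for org.apache.kafka.common.errors.TimeoutException.
    static class MetadataTimeoutException extends RuntimeException {
        MetadataTimeoutException(String msg) { super(msg); }
    }

    static int waitForPartitions(Supplier<Integer> partitionCount, LongConsumer awaitUpdate,
                                 LongSupplier clock, long maxWaitMs) {
        Integer count = partitionCount.get();
        if (count != null) {
            return count;                               // cached: returns without blocking
        }
        long begin = clock.getAsLong();
        long remainingMs = maxWaitMs;
        do {
            awaitUpdate.accept(remainingMs);            // caller's thread is parked here
            long elapsed = clock.getAsLong() - begin;
            if (elapsed >= maxWaitMs) {
                throw new MetadataTimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            }
            remainingMs = maxWaitMs - elapsed;          // never wait past the overall deadline
            count = partitionCount.get();
        } while (count == null);
        return count;
    }
}
```

Injecting the clock keeps the model deterministic. Each simulated refresh can advance a fake clock by a fixed amount instead of really sleeping. That makes it easy to see that a slow network multiplies directly into time spent inside send().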
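The fix is simple: call KafkaProducer.send() from a thread of your own, so the business thread never waits on the metadata fetch. I do have to gripe about Kafka's documentation, though. On this point it is genuinely misleading.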
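Below is a minimal sketch of that workaround, assuming all you need is for the caller to stop waiting. The possibly blocking call is handed to a single-thread executor, which also keeps sends in submission order.

OffloadedSender and its blockingSend function are hypothetical. In real code, blockingSend would wrap something like producer.send(record, callback). The KafkaProducer Javadoc states that the producer is thread safe, so calling it from this extra thread should be fine.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Sketch of the workaround: hand the (possibly blocking) send() call to a dedicated
// single-thread executor so the business thread never waits on metadata.
// `blockingSend` is a hypothetical stand-in for e.g. record -> producer.send(record, callback).
class OffloadedSender<R, T> implements AutoCloseable {
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "kafka-send-offload");
        t.setDaemon(true);
        return t;
    });
    private final Function<R, T> blockingSend;

    OffloadedSender(Function<R, T> blockingSend) { this.blockingSend = blockingSend; }

    // Returns immediately. Anything blockingSend throws completes the returned
    // future exceptionally instead of propagating to the caller.
    CompletableFuture<T> send(R record) {
        return CompletableFuture.supplyAsync(() -> blockingSend.apply(record), executor);
    }

    @Override
    public void close() { executor.shutdown(); }
}
```

The trade-off is that errors move off the call site. Callers now have to inspect the returned future, via join(), whenComplete() or similar, rather than relying on send() to throw.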