我有一个多线程应用程序,它使用producer类来产生消息,之前我使用下面的代码为每个新构建的request.where KafkaProducer创建producer,如下所示:
KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(prop);
ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, objBytes);
producer.send(data, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
isValidMsg[0] = false;
exception.printStackTrace();
saveOrUpdateLog(msgBean, producerType, exception);
logger.error("ERROR:Unable to produce message.",exception);
}
}
});
producer.close();然后我读了关于producer的Kafka文档,了解到我们应该使用单个producer实例来获得良好的性能。
然后,我在一个单例类中创建了KafkaProducer的单个实例。
现在,我们应该在何时何地关闭生产者。显然,如果我们在第一次发送请求后关闭生产者,它将找不到生产者来重新发送消息,因此抛出:
java.lang.IllegalStateException: Cannot send after the producer is closed.或者我们如何在关闭后重新连接到producer。问题是,如果程序崩溃或有异常,那么?
发布于 2016-10-07 20:22:25
通常,在KafkaProducer上调用close()就足以确保所有飞行记录都已完成:
/**
* Close this producer. This method blocks until all previously sent requests complete.
* This method is equivalent to <code>close(Long.MAX_VALUE, TimeUnit.MILLISECONDS)</code>.
* <p>
* <strong>If close() is called from {@link Callback}, a warning message will be logged and close(0, TimeUnit.MILLISECONDS)
* will be called instead. We do this because the sender thread would otherwise try to join itself and
* block forever.</strong>
* <p>
*
* @throws InterruptException If the thread is interrupted while blocked
*/如果您的生产者在应用程序的整个生命周期中都在使用,那么在收到终止信号之前不要关闭它,然后调用close()。正如文档中所说,在多线程环境中使用生产者是安全的,因此您应该重用相同的实例。
如果您在多个线程中共享KafkaProducer,您有两个选择:
Runtime.getRuntime().addShutdownHook从主执行线程注册关闭回调时调用close() 2的粗略草图可能如下所示:
object KafkaOwner {
private var producer: KafkaProducer = ???
@volatile private var isClosed = false
def close(): Unit = {
if (!isClosed) {
kafkaProducer.close()
isClosed = true
}
}
def instance: KafkaProducer = {
this.synchronized {
if (!isClosed) producer
else {
producer = new KafkaProducer()
isClosed = false
}
}
}
}发布于 2017-07-11 19:26:29
如javadoc for KafkaProducer中所述。
public void close()
Close this producer. This method blocks until all previously sent requests complete.
This method is equivalent to close(Long.MAX_VALUE, TimeUnit.MILLISECONDS).源:https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#close()
所以你不必担心你的消息不会被发送,即使你在发送后立即调用close。
如果您计划多次使用某个KafkaProducer,则仅在使用完它之后才将其关闭。如果你仍然想保证你的消息在你的方法完成之前被发送,而不是在缓冲区中等待,那么使用KafkaProducer#flush(),它将阻塞直到当前缓冲区被发送。如果你愿意,你也可以在Future#get()上阻塞。
如果你不打算关闭你的KafkaProducer (例如,在短命的应用程序中,你只发送一些数据,应用程序在发送后立即终止),也有一个警告要注意。JVM线程是一个守护线程,这意味着KafkaProducer不会等到这个线程结束后才终止VM。因此,为了确保您的消息实际发送,请在Future#get()上使用KafkaProducer#flush()、no-arg KafkaProducer#close()或block。
发布于 2018-05-24 13:30:07
Kafka producer应该是线程安全和节俭的线程池。您可能想要使用
producer.flush();而不是
producer.close();让生产者打开,直到程序终止,或者直到你确定你不再需要它。
如果您仍然想要关闭生成器,则按需重新创建它。
producer = new KafkaProducer<String, byte[]>(prop);https://stackoverflow.com/questions/39917046
复制相似问题