首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >关闭后如何重新连接kafka producer?

关闭后如何重新连接kafka producer?
EN

Stack Overflow用户
提问于 2016-10-07 20:14:07
回答 3查看 13.3K关注 0票数 4

我有一个多线程应用程序,它使用producer类来产生消息,之前我使用下面的代码为每个新构建的request.where KafkaProducer创建producer,如下所示:

代码语言:javascript
复制
KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(prop);

ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, objBytes);
producer.send(data, new Callback() {

                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        isValidMsg[0] = false;
                        exception.printStackTrace();
                        saveOrUpdateLog(msgBean, producerType, exception);
                        logger.error("ERROR:Unable to produce message.",exception);
                    }
                }
            });
producer.close();

然后我读了关于producer的Kafka文档,了解到我们应该使用单个producer实例来获得良好的性能。

然后,我在一个单例类中创建了KafkaProducer的单个实例。

现在,我们应该在何时何地关闭生产者。显然,如果我们在第一次发送请求后关闭生产者,它将找不到生产者来重新发送消息,因此抛出:

代码语言:javascript
复制
java.lang.IllegalStateException: Cannot send after the producer is closed.

或者我们如何在关闭后重新连接到producer。问题是,如果程序崩溃或有异常,那么?

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2016-10-07 20:22:25

通常,在KafkaProducer上调用close()就足以确保所有飞行记录都已完成:

代码语言:javascript
复制
/**
 * Close this producer. This method blocks until all previously sent requests complete.
 * This method is equivalent to <code>close(Long.MAX_VALUE, TimeUnit.MILLISECONDS)</code>.
 * <p>
 * <strong>If close() is called from {@link Callback}, a warning message will be logged and close(0, TimeUnit.MILLISECONDS)
 * will be called instead. We do this because the sender thread would otherwise try to join itself and
 * block forever.</strong>
 * <p>
 *
 * @throws InterruptException If the thread is interrupted while blocked
 */

如果您的生产者在应用程序的整个生命周期中都在使用,那么在收到终止信号之前不要关闭它,然后调用close()。正如文档中所说,在多线程环境中使用生产者是安全的,因此您应该重用相同的实例。

如果您在多个线程中共享KafkaProducer,您有两个选择:

  1. 在通过Runtime.getRuntime().addShutdownHook从主执行线程注册关闭回调时调用close()
  2. 让您的多线程方法竞相关闭,只允许一个方法获胜。

2的粗略草图可能如下所示:

代码语言:javascript
复制
object KafkaOwner {
  private var producer: KafkaProducer = ???
  @volatile private var isClosed = false
     
  def close(): Unit = {
    if (!isClosed) {
      kafkaProducer.close()
      isClosed = true
    }
  }
    
  def instance: KafkaProducer = {
    this.synchronized {
      if (!isClosed) producer 
      else {
        producer = new KafkaProducer()
        isClosed = false
      }
    }
  }
}
票数 5
EN

Stack Overflow用户

发布于 2017-07-11 19:26:29

如javadoc for KafkaProducer中所述。

代码语言:javascript
复制
public void close()

Close this producer. This method blocks until all previously sent requests complete.
This method is equivalent to close(Long.MAX_VALUE, TimeUnit.MILLISECONDS).

源:https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#close()

所以你不必担心你的消息不会被发送,即使你在发送后立即调用close。

如果您计划多次使用某个KafkaProducer,则仅在使用完它之后才将其关闭。如果你仍然想保证你的消息在你的方法完成之前被发送,而不是在缓冲区中等待,那么使用KafkaProducer#flush(),它将阻塞直到当前缓冲区被发送。如果你愿意,你也可以在Future#get()上阻塞。

如果你不打算关闭你的KafkaProducer (例如,在短命的应用程序中,你只发送一些数据,应用程序在发送后立即终止),也有一个警告要注意。JVM线程是一个守护线程,这意味着KafkaProducer不会等到这个线程结束后才终止VM。因此,为了确保您的消息实际发送,请在Future#get()上使用KafkaProducer#flush()、no-arg KafkaProducer#close()或block。

票数 3
EN

Stack Overflow用户

发布于 2018-05-24 13:30:07

Kafka producer应该是线程安全和节俭的线程池。您可能想要使用

代码语言:javascript
复制
producer.flush();

而不是

代码语言:javascript
复制
producer.close();

让生产者打开,直到程序终止,或者直到你确定你不再需要它。

如果您仍然想要关闭生成器,则按需重新创建它。

代码语言:javascript
复制
producer = new KafkaProducer<String, byte[]>(prop);
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/39917046

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档