首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >FlinkKafkaException:未能将数据发送给Kafka:自批处理创建以来,“`topic`”:120001 ms的过期19条记录已过

FlinkKafkaException:未能将数据发送给Kafka:自批处理创建以来,“`topic`”:120001 ms的过期19条记录已过
EN

Stack Overflow用户
提问于 2022-04-11 02:59:40
回答 2查看 532关注 0票数 1

我正在使用Flink 1.13.1,并试图用10k记录的RPS(每秒速率)将数据写入kafka。我有一个由30个经纪人组成的kafka集群,我的flink工作做了一个过滤器操作员,只将数据汇到kafka。下面是我的生产者设置,最初我有5个接收器主题分区,但现在10个,我仍然得到同样的问题。另外,我尝试将request.timeout.ms设置为1分钟,而不是卡夫卡默认的30秒,但仍然得到了120001 ms has passed since batch creation。由于此错误,检查点失败。检查点大小仅为107 is,因为flink只向kafka提交偏移。作业并行性为36。这是纳卡的属性和完整的堆栈跟踪。

代码语言:javascript
复制
private static Properties kafkaSinkProperties() {
    val properties = new Properties();
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    properties.setProperty("bootstrap.servers", KafkaServers.SINK_KAFKA_BOOTSTRAP_SERVER);
    properties.setProperty("request.timeout.ms", "60000");
    return properties;
}

FlinkKafkaProducer<Map<String, Object>> kafkaProducer =
        new FlinkKafkaProducer<Map<String, Object>>(
            KafkaTopics.TOPIC, jsonSerde, kafkaSinkProperties());
filterStream.addSink(kafkaProducer);
代码语言:javascript
复制
java.lang.Exception: Could not perform checkpoint 6984 for operator Source: source -> Filter -> Sink: Unnamed (42/48)#3.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:1000)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$7(StreamTask.java:960)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 6984 for operator Source: source -> Filter -> Sink: Unnamed (42/48)#3. Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:264)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:169)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:706)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:627)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:590)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:312)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1086)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1070)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:988)
    ... 13 more
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring 20 record(s) for topic-1:120000 ms has passed since batch creation
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1095)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:1002)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:99)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:320)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:1100)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:218)
    ... 23 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 20 record(s) for topic-1:120000 ms has passed since batch creation
EN

回答 2

Stack Overflow用户

发布于 2022-08-17 10:05:28

卡夫卡负担过重,最简单的方法就是给你的卡夫卡增加更多的资源。

有些小事你可以做:

  • 减少并行性(将立即将较少的数据发送给kafka )
  • IO卡夫卡操作次数少吗?
  • 设置卡夫卡acks=1而不是acks=all
  • 增加检查点触发间隔
票数 0
EN

Stack Overflow用户

发布于 2022-08-23 09:04:02

这无疑是Flink-Kafka发送问题的一个迹象。它可能是卡夫卡超载,或者你有一些非常重要的连接问题,在你的设置。当然,您可以优化您的软件或硬件设置,但也可以考虑充分了解您的错误的本质。

我认为Flink-Kafka相互作用的哲学是,大多数配置应该在Flink操作符本身中完成。

因此,我认为卡夫卡司机的下一步选择是值得考虑的:

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71822260

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档