我正在使用Flink 1.13.1,并试图用10k记录的RPS(每秒速率)将数据写入kafka。我有一个由30个经纪人组成的kafka集群,我的flink工作做了一个过滤器操作员,只将数据汇到kafka。下面是我的生产者设置,最初我有5个接收器主题分区,但现在10个,我仍然得到同样的问题。另外,我尝试将request.timeout.ms设置为1分钟,而不是卡夫卡默认的30秒,但仍然得到了120001 ms has passed since batch creation。由于此错误,检查点失败。检查点大小仅为107 is,因为flink只向kafka提交偏移。作业并行性为36。这是纳卡的属性和完整的堆栈跟踪。
private static Properties kafkaSinkProperties() {
val properties = new Properties();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
properties.setProperty("bootstrap.servers", KafkaServers.SINK_KAFKA_BOOTSTRAP_SERVER);
properties.setProperty("request.timeout.ms", "60000");
return properties;
}
FlinkKafkaProducer<Map<String, Object>> kafkaProducer =
new FlinkKafkaProducer<Map<String, Object>>(
KafkaTopics.TOPIC, jsonSerde, kafkaSinkProperties());
filterStream.addSink(kafkaProducer);java.lang.Exception: Could not perform checkpoint 6984 for operator Source: source -> Filter -> Sink: Unnamed (42/48)#3.
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:1000)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$7(StreamTask.java:960)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 6984 for operator Source: source -> Filter -> Sink: Unnamed (42/48)#3. Failure reason: Checkpoint was declined.
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:264)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:169)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:706)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:627)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:590)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:312)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1086)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1070)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:988)
... 13 more
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring 20 record(s) for topic-1:120000 ms has passed since batch creation
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1095)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:1002)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:99)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:320)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:1100)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:218)
... 23 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 20 record(s) for topic-1:120000 ms has passed since batch creation发布于 2022-08-17 10:05:28
卡夫卡负担过重,最简单的方法就是给你的卡夫卡增加更多的资源。
有些小事你可以做:
acks=1而不是acks=all发布于 2022-08-23 09:04:02
这无疑是Flink-Kafka发送问题的一个迹象。它可能是卡夫卡超载,或者你有一些非常重要的连接问题,在你的设置。当然,您可以优化您的软件或硬件设置,但也可以考虑充分了解您的错误的本质。
我认为Flink-Kafka相互作用的哲学是,大多数配置应该在Flink操作符本身中完成。
因此,我认为卡夫卡司机的下一步选择是值得考虑的:
https://stackoverflow.com/questions/71822260
复制相似问题