首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >无法在Flink新Kafka消费者api的检查点上向Kafka提交消费抵消(1.14)

无法在Flink新Kafka消费者api的检查点上向Kafka提交消费抵消(1.14)
EN

Stack Overflow用户
提问于 2021-12-16 07:29:52
回答 1查看 1.2K关注 0票数 2

我是指Flink 1.14版本的Kafka源代码连接器下面的代码。

我期待以下要求。

  • 在应用程序非常新的开始时,必须从检查点上卡夫卡主题
  • 的最新偏移量中读取,它必须在重启后(当应用程序手动/系统错误终止时)将所消耗的偏移量提交给Kafka
  • ,它必须从上次提交的偏移量中选择,并且必须使用消费者延迟,从此以后还要使用新的事件提要。

使用新的KafkaConsumer API (KafkaSource),我面临以下问题

  • 能够执行上述要求,但不能在检查点(500 on )上提交所消耗的偏移量。而是在2s或3s.

之后提交。

当您在2s/3s内手动关闭应用程序并重新启动时。由于上次使用的消息未提交,因此将读取两次(重复)。

为了反复检查这个特性,我尝试过使用Flink的旧消费者API (FlinkKafkaConsumer)。它在那里工作得很好。当一条信息被立即消耗时,它就会被送回卡夫卡。

遵循的步骤

  • 设置了Kafka
  • ,下面的flink代码用于消费。代码包括旧的和新的API。这两个API都将使用卡夫卡主题,并在控制台打印,
  • 将一些消息推送到Kafka主题。在推送一些消息之后,在控制台中可见的
  • 将终止Flink作业。
  • 检查卡夫卡的两个API的消费者组。新的flink消费者API的组-id(Test1)消费延迟大于0,与旧的消费者API的重新启动Flink作业相比,您可以看到那些未提交的消息在控制台中从新的Flink kafka可见--消费API导致重复消息。

如果我遗漏了什么或任何财产需要添加,请提出建议。

代码语言:javascript
复制
 @Test
    public void test() throws Exception {

        System.out.println("FlinkKafkaStreamsTest started ..");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.enableCheckpointing(500);
        env.setParallelism(4);

        Properties propertiesOld = new Properties();
        Properties properties = new Properties();
        String inputTopic = "input_topic";
        String bootStrapServers = "localhost:29092";
        String groupId_older = "older_test1";
        String groupId = "test1";

        propertiesOld.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
        propertiesOld.put(ConsumerConfig.GROUP_ID_CONFIG, groupId_older);
        propertiesOld.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);


        /******************** Old Kafka API **************/
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(inputTopic,
                new KRecordDes(),
                propertiesOld);
        flinkKafkaConsumer.setStartFromGroupOffsets();
        env.addSource(flinkKafkaConsumer).print("old-api");


        /******************** New Kafka API **************/
        KafkaSourceBuilder<String> sourceBuilder = KafkaSource.<String>builder()
                .setBootstrapServers(bootStrapServers)
                .setTopics(inputTopic)
                .setGroupId(groupId)
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setProperty("enable.auto.commit", "false")
                .setProperty("commit.offsets.on.checkpoint", "true")
                .setProperties(properties)
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST));

        KafkaSource<String> kafkaSource = sourceBuilder.build();

        SingleOutputStreamOperator<String> source = env
                .fromSource(kafkaSource, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");

        source.print("new-api");

        env.execute();
    }
    static class KRecordDes implements  KafkaDeserializationSchema<String>{
        @Override
        public TypeInformation<String> getProducedType() {
            return TypeInformation.of(String.class);
        }
        @Override
        public boolean isEndOfStream(String nextElement) {
            return false;
        }
        @Override
        public String deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
            return new String(consumerRecord.value());
        }
    }

注意:我需要在相同代码中使用Flink有界源代码阅读器的其他需求,这些代码可以在新API(KafkaSource)中获得。

EN

回答 1

Stack Overflow用户

发布于 2021-12-16 16:43:07

资料来源:

注意到,卡夫卡源代码做而不是依赖于提交的偏移量来实现容错。提交偏移仅用于公开使用者和消费群体的进度以进行监视。

当Flink作业从失败中恢复时,而不是在代理上使用提交的偏移量,它将从最近的成功检查点恢复状态,并从存储在该检查点中的偏移量恢复消耗,因此检查点之后的记录将被“重放”一点。因为您使用的是打印接收器,这并不完全支持-一旦语义,您将看到重复的记录,实际上是记录后,最近成功的检查点。

对于您提到的偏移提交的2-3秒延迟,这是因为SourceReaderBase的实现。简而言之,SplitFetcher管理一个任务队列,当一个偏移提交任务被推入队列时,它将在调用KafkaConsumer#poll()超时的正在运行的提取任务超时之前不会被执行。如果交通流量很小,延误可能会更长。但是请注意,这不会影响正确性: KafkaSource不会使用提交的偏移量来进行容错。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70375310

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档