我是指Flink 1.14版本的Kafka源代码连接器下面的代码。
我期待以下要求。
使用新的KafkaConsumer API (KafkaSource),我面临以下问题
之后提交。
当您在2s/3s内手动关闭应用程序并重新启动时。由于上次使用的消息未提交,因此将读取两次(重复)。
为了反复检查这个特性,我尝试过使用Flink的旧消费者API (FlinkKafkaConsumer)。它在那里工作得很好。当一条信息被立即消耗时,它就会被送回卡夫卡。
遵循的步骤
如果我遗漏了什么或任何财产需要添加,请提出建议。
@Test
public void test() throws Exception {
System.out.println("FlinkKafkaStreamsTest started ..");
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.enableCheckpointing(500);
env.setParallelism(4);
Properties propertiesOld = new Properties();
Properties properties = new Properties();
String inputTopic = "input_topic";
String bootStrapServers = "localhost:29092";
String groupId_older = "older_test1";
String groupId = "test1";
propertiesOld.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
propertiesOld.put(ConsumerConfig.GROUP_ID_CONFIG, groupId_older);
propertiesOld.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
/******************** Old Kafka API **************/
FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(inputTopic,
new KRecordDes(),
propertiesOld);
flinkKafkaConsumer.setStartFromGroupOffsets();
env.addSource(flinkKafkaConsumer).print("old-api");
/******************** New Kafka API **************/
KafkaSourceBuilder<String> sourceBuilder = KafkaSource.<String>builder()
.setBootstrapServers(bootStrapServers)
.setTopics(inputTopic)
.setGroupId(groupId)
.setValueOnlyDeserializer(new SimpleStringSchema())
.setProperty("enable.auto.commit", "false")
.setProperty("commit.offsets.on.checkpoint", "true")
.setProperties(properties)
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST));
KafkaSource<String> kafkaSource = sourceBuilder.build();
SingleOutputStreamOperator<String> source = env
.fromSource(kafkaSource, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");
source.print("new-api");
env.execute();
}
static class KRecordDes implements KafkaDeserializationSchema<String>{
@Override
public TypeInformation<String> getProducedType() {
return TypeInformation.of(String.class);
}
@Override
public boolean isEndOfStream(String nextElement) {
return false;
}
@Override
public String deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
return new String(consumerRecord.value());
}
}注意:我需要在相同代码中使用Flink有界源代码阅读器的其他需求,这些代码可以在新API(KafkaSource)中获得。
发布于 2021-12-16 16:43:07
资料来源:
注意到,卡夫卡源代码做而不是依赖于提交的偏移量来实现容错。提交偏移仅用于公开使用者和消费群体的进度以进行监视。
当Flink作业从失败中恢复时,而不是在代理上使用提交的偏移量,它将从最近的成功检查点恢复状态,并从存储在该检查点中的偏移量恢复消耗,因此检查点之后的记录将被“重放”一点。因为您使用的是打印接收器,这并不完全支持-一旦语义,您将看到重复的记录,实际上是记录后,最近成功的检查点。
对于您提到的偏移提交的2-3秒延迟,这是因为SourceReaderBase的实现。简而言之,SplitFetcher管理一个任务队列,当一个偏移提交任务被推入队列时,它将在调用KafkaConsumer#poll()超时的正在运行的提取任务超时之前不会被执行。如果交通流量很小,延误可能会更长。但是请注意,这不会影响正确性: KafkaSource不会使用提交的偏移量来进行容错。
https://stackoverflow.com/questions/70375310
复制相似问题