从Faust文档中,我找不到如何将消费者设置为特定的偏移量。
使用confluent-kafka时,我使用consumer.offsets_for_times查找start_offset,然后将TopicPartition分配给该特定偏移量,如下所示:
start_offset = consumer.offsets_for_times([
TopicPartition("prediction.OfferPredictionCheckpoint", 0, int(start_date)),
TopicPartition("prediction.OfferPredictionCheckpoint", 1, int(start_date)),
])
consumer.assign([
TopicPartition("prediction.OfferPredictionCheckpoint", partition_number, pos)
])关于浮士德,我找不到更多:
consumer_auto_offset_reset它只允许您设置最早或最晚。我该如何从特定的小时或一天的开始开始阅读呢?
发布于 2020-02-20 02:25:52
它可以转到特定的偏移量,但是我不认为有一种简单的方法可以告诉它转到特定的时间或日期,而不需要一些额外的逻辑(也许可以使用偏移量进行二进制搜索?)。
发布于 2021-05-28 17:27:18
要将偏移量设置为特定值,可以使用以下示例。这里我将偏移量设置为50000。每次我启动我的应用程序时,代理程序从偏移量50000开始读取。为此,我使用app.consumer.seek
这里,tp接受两个参数,在本例中为topic - test,分区号为0。有关更多信息,请访问faust.types
from faust.types import TP, Message
tp = TP("test", 0)
topic = app.topic(tp.topic)
@app.task()
async def on_start():
await app.consumer.seek(tp, 50000)
print("App startet")
@app.agent(topic)
async def receive(stream):
async for event in stream.events():
print((event.message.offset, event.value))https://stackoverflow.com/questions/60267358
复制相似问题