首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何将Faust中的消费者设置为特定的偏移量

如何将Faust中的消费者设置为特定的偏移量
EN

Stack Overflow用户
提问于 2020-02-18 01:07:48
回答 2查看 1.5K关注 0票数 3

从Faust文档中,我找不到如何将消费者设置为特定的偏移量。

使用confluent-kafka时,我使用consumer.offsets_for_times查找start_offset,然后将TopicPartition分配给该特定偏移量,如下所示:

代码语言:javascript
复制
start_offset = consumer.offsets_for_times([
    TopicPartition("prediction.OfferPredictionCheckpoint", 0, int(start_date)),
    TopicPartition("prediction.OfferPredictionCheckpoint", 1, int(start_date)),
])

consumer.assign([
    TopicPartition("prediction.OfferPredictionCheckpoint", partition_number, pos)
])

关于浮士德,我找不到更多:

代码语言:javascript
复制
consumer_auto_offset_reset

它只允许您设置最早或最晚。我该如何从特定的小时或一天的开始开始阅读呢?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-02-20 02:25:52

我想这可能就是你要找的:https://faust.readthedocs.io/en/latest/reference/faust.transport.consumer.html#faust.transport.consumer.Consumer.seek

它可以转到特定的偏移量,但是我不认为有一种简单的方法可以告诉它转到特定的时间或日期,而不需要一些额外的逻辑(也许可以使用偏移量进行二进制搜索?)。

票数 1
EN

Stack Overflow用户

发布于 2021-05-28 17:27:18

要将偏移量设置为特定值,可以使用以下示例。这里我将偏移量设置为50000。每次我启动我的应用程序时,代理程序从偏移量50000开始读取。为此,我使用app.consumer.seek

这里,tp接受两个参数,在本例中为topic - test,分区号为0。有关更多信息,请访问faust.types

代码语言:javascript
复制
from faust.types import TP, Message

tp = TP("test", 0)
topic = app.topic(tp.topic)

@app.task()
async def on_start():
    await app.consumer.seek(tp, 50000)
    print("App startet")

@app.agent(topic)
async def receive(stream):
    async for event in stream.events():
        print((event.message.offset, event.value))
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60267358

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档