我想要解决的问题
所以我有一个Observable股票的来源。这只会给我感兴趣的股票带来麻烦。我需要做的是接收这些股票价格,在每一次x秒之后(例如,假设每3秒)发送一个价格快照进行处理。如果在3秒内,我收到两个滴答为同一股票,我只需要最新的滴答。这种处理计算量很大,因此,如果可能的话,我希望避免将相同的股票价格发送给处理两次。
举个例子。
假设在序列开始的时候,我收到了两个滴答声:->,,MSFT:1美元,GOOG:2美元。
在接下来的3秒内,我什么也没收到,所以MSFT & GOOG滴答应该被发送到处理。
现在,我收到新的滴答声:-> :1美元,GOOG:3美元,INTL:3美元
再一次,让我们假设在接下来的3秒内什么都不会出现。
在这里,由于MSFT的价格没有变化(仍然是1美元),只有GOOG & INTL应该被发送去处理。
这会在一天内重复。
现在,我认为Rx有助于以简单而优雅的方式解决这类问题。但我有问题要问正确的问题。到目前为止,这就是我所拥有的,我将试图解释它的作用以及它的问题所在
var finalQuery =
from priceUpdate in **Observable<StockTick>**
group priceUpdate by priceUpdate.Stock into grouped
from combined in Observable.Interval(TimeSpan.FromSeconds(3))
.CombineLatest(grouped, (t, pei) => new { PEI = pei, Interval = t })
group combined by new { combined.Interval } into combined
select new
{
Interval = combined.Key.Interval,
PEI = combined.Select(c => new StockTick(c.PEI.Stock, c.PEI.Price))
};
finalQuery
.SelectMany(combined => combined.PEI)
.Distinct(pu => new { pu.Stock, pu.Price })
.Subscribe(priceUpdate =>
{
Process(priceUpdate);
});
public class StockTick
{
public StockTick(string stock, decimal price)
{
Stock = stock;
Price = price;
}
public string Stock {get;set;}
public decimal Price {get;set;}
}因此,这得到了股票价格,按股票分组,然后将这个分组序列中的最新信息与Observable.Interval组合起来。通过这种方式,我试图确保只处理股票的最新滴答,并且每3秒启动一次。
然后,它再一次将它按间隔分组,因此,每隔3秒我就有一组序列。
作为最后一步,我使用SelectMany将这个序列简化为股票价格更新序列,并使用Distinct来确保同一股票的相同价格不会被处理两次。
这个查询有两个问题,我不喜欢。首先,我真的不喜欢双组赛-有什么办法可以避免吗?第二,使用这种方法,我必须一个接一个地处理价格,我真正想要的是快照--也就是在3秒内,我将打包并发送给处理,但不知道如何捆绑处理。
我很乐意以其他方式来解决这个问题,但是我更愿意呆在Rx中,除非真的有更好的东西。
发布于 2017-07-02 23:26:14
几件事:
Sample操作符:DistinctUntilChanged而不是Distinct。如果您使用Distinct,那么如果MSFT从1美元降到2美元,然后返回到1美元,那么您将不会在第三个勾号上得到一个事件。我想你的解决方案会是这样的:
IObservable<StockTick> source;
source
.GroupBy(st => st.Stock)
.Select(stockObservable => stockObservable
.Sample(TimeSpan.FromSeconds(3))
.DistinctUntilChanged(st => st.Price)
)
.Merge()
.Subscribe(st => Process(st));编辑 (Distinct性能问题):
每个Distinct操作符都必须在其中维护完整的不同历史记录。如果你有一只高价的股票,比如AMZN,到目前为止,它的价格从958美元到974美元不等,那么你可能会得到很多数据。这是大约1600个可能的数据点,它们必须位于内存中,直到您取消订阅Distinct为止。它也将最终降低性能,因为每个AMZN滴答必须与目前1600点的数据点进行比较,然后才能通过。如果这是一个长期运行的过程(跨越多个交易日),那么您将得到更多的数据点。
给定N个股票,就有N个Distinct运算符需要相应地操作。将这种行为乘以N个股票,你就会遇到一个不断增加的问题。
https://stackoverflow.com/questions/44873667
复制相似问题