MapReduce读取每个统计好的文件 MapTask任务中,设置一个公共堆,大小为100,将每个ip的频率进行插入堆中,最小的将被丢弃掉,结束后统计出每个文件的top100 使用一个ReduceTask进行一次求topN
前言 使用 flink 很长一段时间了,突然发现竟然没有计算过 topN,这可是 flink 常见的计算场景了, 故自己想了一个场景来计算一下。
思路1:快速选择算法 可以采用快速选择算法,借助快排,设mid为每次划分中间结果,每次划分完之后如果mid==k,则说明序列刚刚好,第k位置和他前面的位置都是前K大的数,如果mid < k,则说明第K大的元素在后半部分,则前半部分肯定是前K大的数,只需从后半部分找k – mid大的数即可,否则如果mid > k,则说明第K大的数在前半部分,只需从前半部分找前K大的数字即可。 时间复杂度:假设每次划分的mid都在中间,每层都只是对一半做划分,所以每次划分的数据量为 n,n/2,n/4,n/8…一共有logn层,根据等比数列可以算出来时间复杂度为O(n) C++代码演示
2 TopN ? TopN不带X,但是TopN达到的效果与RankX有异曲同工之妙。它的特别之处是返回的不是值,是前N行的表,所以需要与Calculate或其他计算类函数结合起来使用。 [前5名销售量] = Calculate([销售量],TopN(5,all('区域负责人名单'),[销售量])) ? TopN返回的表更改了矩阵表中的初始上下文,所以每一行的结果都为136。 这个时候,如果老板想要看前10名的情况,你只需要把TopN公式里的5改成10;如果想要按季度分析,只要把日历表中的年份月份换成年份季度,如果想要计算销售额而不是销售量,那就把销售量度量值都替换成销售额。
Kylin在1.6.0版本中提到了TopN的性能提升非常大:https://issues.apache.org/jira/browse/KYLIN-1917 TopN相关的类存放路径:/core-metadata /src/main/java/org/apache/kylin/measure/topn 经过测试之后发现,确实有了很大的提升。 在Kylin1.5.4版本中,TopN的性能非常差,有时候cube中如果定义了TopN度量,那么build任务就会需要非常久的时间,甚至直接ERROR。 我们主要是为了了解TopN的实现,因此这里不多做介绍。 Kylin为TopN度量也实现了一个专门的类TopNMeasureType,这个类包括了TopN度量的一些基本信息,主要跟Kylin的度量实现框架有关系,我们也不多做介绍。
场景描述:TopN 是统计报表和大屏非常常见的功能,主要用来实时计算排行榜。流式的TopN可以使业务方在内存中按照某个统计指标(如出现次数)计算排名并快速出发出更新后的排行榜。 我们以统计词频为例展示一下如何快速开发一个计算TopN的flink程序。 关键词:Flink TopN TopN 是统计报表和大屏非常常见的功能,主要用来实时计算排行榜。 嵌套TopN 全局topN的缺陷是,由于windowall是一个全局并发为1的操作,所有的数据只能汇集到一个节点进行 TopN 的计算,那么计算能力就会受限于单台机器,容易产生数据热点问题。 解决思路就是使用嵌套 TopN,或者说两层 TopN。在原先的 TopN 前面,再加一层 TopN,用于分散热点。 例如可以先加一层分组 TopN,第一层会计算出每一组的 TopN,而后在第二层中进行合并汇总,得到最终的全网TopN。第二层虽然仍是单点,但是大量的计算量由第一层分担了,而第一层是可以水平扩展的。
[开发技巧]·TopN指标计算方法 ? 1.概念介绍 在图片分类的中经常可以看到Top-1,Top-5等TopN准确率(或者时错误率)。 那这个TopN是什么意思呢? 最终得到[3 1 2 0] 这个怎么应用到TopN计算中呢?
这也是在TopN问题中,能始终保持N个元素,并且很高效的一个原因. 删除最小节点过程是用树的最后一个节点替换为根节点,并重新调整为小顶堆. 在java中,解决TopN问题,可以直接使用优先队列类(PriorityBlockingQueue),这个类已经替我们实现了添加和删除操作,并且能通过扩展Comparator能自定义排序方法.有兴趣的可以看看源码 siftUpComparable() siftUpUsingComparator() 删除节点方法: siftDownComparable() siftDownUsingComparator() 我们通过TopN 问题,发现小顶堆是最优解,并详细了解了节点的添加和删除过程.最后附上Java版的小顶堆(优先队列)如何解决TopN问题的. 附上代码: static void topN(int[] data, int N) { PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue
这个时候就需要TOPN函数了: 语法= TOPN(<n_value>,