温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

flink中怎么使用自定义聚合函数统计网站TP指标

发布时间:2021-12-31 10:37:34 来源:亿速云 阅读:118 作者:iii 栏目:大数据

这篇文章主要讲解了“flink中怎么使用自定义聚合函数统计网站TP指标”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“flink中怎么使用自定义聚合函数统计网站TP指标”吧!

背景

在网站性能测试中,我们经常会选择 TP50、TP95 或者 TP99 等作为性能指标。接下来我们讲讲这些指标的含义、以及在flink中如何实时统计:

  • TP50,top percent 50,即 50% 的数据都满足某一条件;

  • TP95,top percent 95,即 95% 的数据都满足某一条件;

  • TP99,top percent 99,即 99% 的数据都满足某一条件;

我们举一个例子,我们要统计网站一分钟之内的的响应时间的TP90,正常的处理逻辑就是把这一分钟之内所有的网站的响应时间从小到大排序,然后计算出总条数count,然后计算出排名在90%的响应时间是多少(count*0.9),就是我们要的值。

自定义聚合函数

这个需求很明显就是一个使用聚合函数来做的案例,Flink中提供了大量的聚合函数,比如count,max,min等等,但是对于这个需求,却无法满足,所以我们需要自定义一个聚合函数来实现我们的需求。

在前段时间,我们聊了聊flink的聚合算子,具体可参考: flink实战-聊一聊flink中的聚合算子 , 聚合算子是我们在写代码的时候用来实现一个聚合功能,聚合函数其实和聚合算子类似,只不过聚合函数用于在写sql的时候使用。

自定义聚合函数需要继承抽象类org.apache.flink.table.functions.AggregateFunction。并实现下面几个方法。

  • createAccumulator():这个方法会在一次聚合操作的开始调用一次,主要用于构造一个Accumulator,用于存储在聚合过程中的临时对象。

  • accumulate() 这个方法,每来一条数据会调用一次这个方法,我们就在这个方法里实现我们的聚合函数的具体逻辑。

  • getValue() 这个方法是在聚合结束以后,对中间结果做处理,然后将结果返回,最终sql中得到的结果数据就是这个值。

实例讲解

对于TP指标,正常的思路我们可以先创建一个临时变量,里面有一个list,每来一个数据,就放到这个list里面,在getValue方法里,进行排序,取相应的TP值。

但是这种思路会有一个问题,就是如果要聚合的时间范围内,数据过多的话。就会在list存储大量的数据,会造成checkpoint过大,时间过长,最后导致程序失败。得不到正确的结果。

所以我们需要换一个思路,既然最后我们想要的是一个有序列表,那么我们是不是可以把这个list结构优化一下,使用Treemap来存储,map的key就是指标,比如响应时间。value就是对应的指标出现的次数。这样getValue方法里,只需要将map的value值累加,就能得到总数count,然后计算出来相应的tp值的位置position,最后我们再从头累加map的value,直到累加结果大于相应的位置position,则map的key即为所求。

示例如下:我们先构建一个source,只是随机生成一个变量,网站的相应时间response_time。

	String sql = "CREATE TABLE source (\n" +
" response_time INT,\n" +
" ts AS localtimestamp,\n" +
" WATERMARK FOR ts AS ts," +
"proctime as proctime()\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second'='1000',\n" +
" 'fields.response_time.min'='1',\n" +
" 'fields.response_time.max'='1000'" +
")";

定义一个聚合函数用的临时变量:

public static class TPAccum{
public Integer tp;
public Map<Integer,Integer> map = new HashMap<>();
}

实现自定义聚合函数类

public static class TP extends AggregateFunction<Integer,TPAccum>{

@Override
public TPAccum createAccumulator(){
return new TPAccum();
}

@Override
public Integer getValue(TPAccum acc){
if (acc.map.size() == 0){
return null;
} else {
Map<Integer,Integer> map = new TreeMap<>(acc.map);
int sum = map.values().stream().reduce(0, Integer::sum);

int tp = acc.tp;
int responseTime = 0;
int p = 0;
Double d = sum * (tp / 100D);
for (Map.Entry<Integer,Integer> entry: map.entrySet()){
p += entry.getValue();
int position = d.intValue() - 1;
if (p >= position){
responseTime = entry.getKey();
break;
}

}
return responseTime;
}
}

public void accumulate(TPAccum acc, Integer iValue, Integer tp){
acc.tp = tp;
if (acc.map.containsKey(iValue)){
acc.map.put(iValue, acc.map.get(iValue) + 1);
} else {
acc.map.put(iValue, 1);
}
}

}

实际的查询sql如下:

String sqlSelect =
"select TUMBLE_START(proctime,INTERVAL '1' SECOND)  as starttime,mytp(response_time,50) from source" +
" group by TUMBLE(proctime,INTERVAL '1' SECOND)";

感谢各位的阅读,以上就是“flink中怎么使用自定义聚合函数统计网站TP指标”的内容了,经过本文的学习后,相信大家对flink中怎么使用自定义聚合函数统计网站TP指标这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI