温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

再看 Kafka Lag

发布时间:2020-07-14 07:03:17 来源:网络 阅读:583 作者:Java_老男孩 栏目:编程语言

在《Kafka的Lag计算误区及正确实现》一文中提及了kafka.admin.ConsumerGroupCommand.PartitionAssignmentState无法被外部访问,故要将PartitionAssignmentState前的protected修饰符去掉

可以直接将describeGroup返回的结果转换成JSON然后传至监控页面(supported by YANGliiN oba)。代码如下:

String[] agrs = {"--describe", "--bootstrap-server", brokers, "--group", groupId};
ConsumerGroupCommand.ConsumerGroupCommandOptions options =
        new ConsumerGroupCommand.ConsumerGroupCommandOptions(agrs);
ConsumerGroupCommand.KafkaConsumerGroupService kafkaConsumerGroupService =
        new ConsumerGroupCommand.KafkaConsumerGroupService(options);
ObjectMapper mapper = new ObjectMapper();
//1. 使用jackson-module-scala_2.12
mapper.registerModule(new DefaultScalaModule());
//2. 反序列化时忽略对象不存在的属性
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
//3. 将Scala对象序列化成JSON字符串
String source = mapper.writeValueAsString(kafkaConsumerGroupService.describeGroup()._2.get());

这里需要采用的是jackson-module-scala的包实现,如果直接用普通的JSON序列化方式那么会达不到想要的效果,jackson以及jackson-module-scala对应的Maven库如下:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.9.4</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.module</groupId>
    <artifactId>jackson-module-scala_2.12</artifactId>
    <version>2.9.5</version>
</dependency>

注意如果本地安装的Scala版本与所配置的jackson-module-scala版本不一致的话会报出一些异常。发散一下思维:既然可以序列化为JSON,那么完全可以通过JSON再反序列化会对象,只不过通过JSON作为中间媒介,将访问受限的Scala对象转变为Java对象,上面剩余代码如下:

//4. 将JSON字符串反序列化成Java对象
List<PartitionAssignmentState> target = mapper.readValue(source,
        getCollectionType(mapper,List.class,PartitionAssignmentState.class));
//5. 排序
target.sort((o1, o2) -> o1.getPartition() - o2.getPartition());
//6. 打印
printPasList(target);

如此就可以达到与前面几篇文章中关于获取消费者详情功能同样的效果。这里有两个注意要点:

  1. PartitionAssignmentState中的coordinator是Node类型,这个类型需要自定义,Kafka原生的会报错。
  2. 反序列化时Node会有一个empty的属性不识别,解决方案参考代码中的步骤2.

代码更多细节请参考:代码

通过JSON的序列化和反序列化操作实现了原本不能为之的事情,那么思维再发散一下,也可以序列化成字节流,比如通过ByteBuffer进行转换,只不过编程逻辑变得复杂了。

上面这段陈述有可能会让人觉得Scala与Java之间的互操作起来不容易,其实不然,上面这段陈述只是用来补充一下如何获取消费者详情的另一种方法,Scala与Java之间的互操作还是比较简单的,一般情况下都可以直接使用对方的类。对于集合而言,Scala中还有用于Scala与Java集合的互转的scala.collection.JavaConverters(scala2.8.1开始引入),与此雷同的scala.collection.JavaConversions已被标注为@Deprecated(since 2.12.0)。在scala代码中如果需要集合转换,首先引入scala.collection.JavaConverters._,进而显示调用asJava或者asScala方法完成转型。关于Scala与Java集合互转的介绍会在下一篇文章中呈现。


本文的重点是你有没有收获与成长,其余的都不重要,希望读者们能谨记这一点。同时我经过多年的收藏目前也算收集到了一套完整的学习资料,包括但不限于:分布式架构、高可扩展、高性能、高并发、Jvm性能调优、Spring,MyBatis,Nginx源码分析,Redis,ActiveMQ、、Mycat、Netty、Kafka、Mysql、Zookeeper、Tomcat、Docker、Dubbo、Nginx等多个知识点高级进阶干货,希望对想成为架构师的朋友有一定的参考和帮助

需要更详细思维导图和以下资料的可以加一下技术交流分享群:“708 701 457”免费获取

再看 Kafka Lag
再看 Kafka Lag
再看 Kafka Lag
再看 Kafka Lag

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI