温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

如何进行Java--Netty的入门

发布时间:2021-11-20 14:10:13 来源:亿速云 阅读:157 作者:柒染 栏目:大数据

这篇文章将为大家详细讲解有关如何进行Java--Netty的入门,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

Netty简介

Netty是一个高性能,高可扩展性的异步事件驱动的网络应用程序框架,它极大的简化了TCP和UDP客户端和服务器端网络开发。它是一个NIO框架,对Java NIO进行了良好的封装。作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。

Netty的特性

  • 统一的API,适用于不同的协议

  • 基于灵活、可扩展的事件驱动模型

  • 高度可定制的线程模型

  • 更好的吞吐量,低延迟

  • 更省资源,尽量减少不必要的内存拷贝

  • 完整的SSL/TLS和STARTTLS的支持

  • 能在Applet与Android的限制环境运行良好

  • 不再因过快、过慢或超负载连接导致OutOfMemoryError

  • 不再有在高速网络环境下NIO读写频率不一致的问题

Netty核心内容

Netty中最核心的内容主要有以下四个方面:

  • Reactor线程模型:一种高性能的多线程程序设计思路

  • Netty中自己定义的Channel概念:增强版的通道概念

  • ChannelPipeline职责链设计模式:事件处理机制

  • 内存管理:增强的ByteBuf缓冲区

Netty整体结构图

如何进行Java--Netty的入门

Netty核心组件

EventLoop:EventLoop维护了一个线程和任务队列,支持异步提交执行任务。EventLoop自身实现了Executor接口,当调用executor方法提交任务时,则判断是否启动,未启动则调用内置的executor创建新线程来触发run方法执行,其大致流程参考Netty源码SingleThreadEventExecutor如下:

如何进行Java--Netty的入门

EventLoopGroup:EventLoopGroup主要是管理eventLoop的生命周期,可以将其看作是一个线程池,其内部维护了一组EventLoop,每个eventLoop对应处理多个Channel,而一个Channel只能对应一个EventLoop

如何进行Java--Netty的入门

Bootstrap:BootStrap 是客户端的引导类,主要用于客户端连接远程主机,有1个EventLoopGroup。Bootstrap 在调用 bind()(连接UDP)和 connect()(连接TCP)方法时,会新创建一个单独的、没有父 Channel 的 Channel 来实现所有的网络交换。

ServerBootstrap:ServerBootstrap 是服务端的引导类,主要用户服务端绑定本地端口,有2个EventLoopGroup。ServerBootstarp 在调用 bind() 方法时会创建一个 ServerChannel 来接受来自客户端的连接,并且该 ServerChannel 管理了多个子 Channel 用于同客户端之间的通信。

Channel:Netty中的Channel是一个抽象的概念,可以理解为对Java NIO Channel的增强和扩展,增加了许多新的属性和方法,如bing方法等。

ChannelFuture:ChannelFuture能够注册一个或者多个ChannelFutureListener 实例,当操作完成时,不管成功还是失败,均会被通知。ChannelFuture存储了之后执行的操作的结果并且无法预测操作何时被执行,提交至Channel的操作按照被唤醒的顺序被执行。

ChannelHandler:ChannelHandler用来处理业务逻辑,分别有入站和出站的实现。

ChannelPipeline:ChannelPipeline 提供了 ChannelHandler链的容器,并定义了用于在该链上传播入站和出站事件流的API。

Netty线程模型

Netty的线程模型是基于Reactor模式的线程实现。关于Reactor模式可以参考Reactor模式 ,Netty中依据用户的配置可以支持单线程的Reactor模型,多线程的Reactor模型以及主从多Reactor的模型。在Netty中其大致流程如下如下:

如何进行Java--Netty的入门

Netty入门代码示例

服务端代码示例:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import java.nio.charset.Charset;
public class EchoServer {
 public static void main(String[] args) {

   // accept线程组,用来接受连接
   EventLoopGroup bossGroup = new NioEventLoopGroup(1);

   // I/O线程组, 用于处理业务逻辑
   EventLoopGroup workerGroup = new NioEventLoopGroup(1);

   try {
     // 服务端启动引导
     ServerBootstrap b = new ServerBootstrap();

     b.group(bossGroup, workerGroup) // 绑定两个线程组
         .channel(NioServerSocketChannel.class) // 指定通道类型
         .option(ChannelOption.SO_BACKLOG, 100) // 设置TCP连接的缓冲区
         .handler(new LoggingHandler(LogLevel.INFO)) // 设置日志级别
         .childHandler(
             new ChannelInitializer<SocketChannel>() {
               @Override
               protected void initChannel(SocketChannel socketChannel) throws Exception {
                 ChannelPipeline pipeline = socketChannel.pipeline(); // 获取处理器链
                 pipeline.addLast(new EchoServerHandler()); // 添加新的件处理器

               }

             });

     // 通过bind启动服务
     ChannelFuture f = b.bind(8080).sync();

     // 阻塞主线程,知道网络服务被关闭
     f.channel().closeFuture().sync();

   } catch (Exception e) {
     e.printStackTrace();
   } finally {
     workerGroup.shutdownGracefully();
     bossGroup.shutdownGracefully();
   }
 }

}

class EchoServerHandler extends ChannelInboundHandlerAdapter {

 // 每当从客户端收到新的数据时,这个方法会在收到消息时被调用
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
   System.out.println("收到数据:" + ((ByteBuf) msg).toString(Charset.defaultCharset()));
   ctx.write(Unpooled.wrappedBuffer("Server message".getBytes()));
   ctx.fireChannelRead(msg);
 }
 // 数据读取完后被调用  @Override
 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
   ctx.flush();
 }
 // 当Netty由于IO错误或者处理器在处理事件时抛出的异常时被调用  @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
   cause.printStackTrace();
   ctx.close();
 }
}

客户端代码示例:

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.nio.charset.Charset;
public class EchoClient {
 public static void main(String[] args) {
   EventLoopGroup group = new NioEventLoopGroup();
   try {
     Bootstrap b = new Bootstrap();
     b.group(group)
         .channel(NioSocketChannel.class)
         .option(ChannelOption.TCP_NODELAY, true)
         .handler(
             new ChannelInitializer<SocketChannel>() {
               @Override
               public void initChannel(SocketChannel ch) throws Exception {
                 ChannelPipeline p = ch.pipeline();
                 p.addLast(new EchoClientHandler());
               }
             });
     ChannelFuture f = b.connect("127.0.0.1", 8080).sync();
     f.channel().closeFuture().sync();
   } catch (Exception e) {
     e.printStackTrace();
   } finally {
     group.shutdownGracefully();
   }
 }

}

class EchoClientHandler extends ChannelInboundHandlerAdapter {
 private final ByteBuf firstMessage;
 public EchoClientHandler() {
   firstMessage = Unpooled.buffer(256);
   for (int i = 0; i < firstMessage.capacity(); i++) {
     firstMessage.writeByte((byte) i);
   }
 }
 @Override
 public void channelActive(ChannelHandlerContext ctx) {
   ctx.writeAndFlush(firstMessage);
 }
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) {
   System.out.println("收到数据:" + ((ByteBuf) msg).toString(Charset.defaultCharset()));
   ctx.write(Unpooled.wrappedBuffer("Client message".getBytes()));
 }
 @Override
 public void channelReadComplete(ChannelHandlerContext ctx) {
   ctx.flush();
 }
 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
   cause.printStackTrace();
   ctx.close();
 }
}

关于如何进行Java--Netty的入门就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI