java网络编程模型之BIO、NIO、AIO

AIO模型采用“订阅-通知”模式:即应用程序向操作系统注册IO监听,然后继续做自己的事情 。当操作系统发生IO事件,并且准备好数据后,再主动通知应用程序,触发相应的函数 。流程图如下
AIO框架简析
下面的图片展示了AIO模型中各个类与接口之间的关系
基本执行流程
首先来介绍一下aio模型的基于客户端的基本执行流程
通过roup线程组创建一个对象,用于监听和接受客户端的连接请求 。调用的bind()方法,将其绑定到指定的IP地址和端口号 。调用的()方法,开始监听客户端的连接请求 。一旦监听到客户端的连接请求,在()方法中传入实例,将服务端的通道初始化 。在通道初始化时会创建一个实例作为参数传入,继承了(通用消息处理器),在类中定义了三个抽象类,分别是(通道激活前)、(通道断开前)、(读取消息抽象类前),在中对上述方法进行重写,在这些方法可以增加相应的业务逻辑 。中的方法会被触发,该方法是连接建立后进行异步读取数据的回调函数,在该回调函数中首先会读取数据进行判断是否关闭通道(在关闭通道之前会调用),然后利用缓冲流进行数据读取(),然后调用异步通道对象的read方法注册下一次的异步读取事件,进行循环读取 。由于该模型的通信方式是全双工通信模式,因此客户端和服务端可以同时互相发送消息,服务端要进行消息发送时会通过中的方法进行消息推送 。代码实现
客户端
package com.kjz.NettyDemo.Aio.client;import java.nio.channels.AsynchronousSocketChannel; // 异步Socket通道import java.net.InetSocketAddress; // InetSocketAddress类用于封装IP地址和端口号import java.nio.ByteBuffer; // ByteBuffer类用于读写操作import java.nio.charset.Charset; // Charset类用于指定字符编码方式import java.util.concurrent.Future; // Future接口用于获取异步操作的结果public class AioClient {public static void main(String[] args) throws Exception {// 创建异步Socket通道AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();// 建立连接并返回Future对象Future future = socketChannel.connect(new InetSocketAddress("192.168.1.116", 7397));// 打印启动提示信息System.out.println("kjz-demo-netty client start done.");// 等待连接完成future.get();// 通过通道进行数据读取操作//参数一:一个容量为1024的字节缓冲区对象,用于存储服务器发送的数据 。//参数二:是一个附件对象,可以在异步操作完成后传递给回调函数 。//参数三:回调函数,用于在异步读取完成后处理读取的结果(消息处理器)socketChannel.read(ByteBuffer.allocate(1024),null,new AioClientHandler(socketChannel, Charset.forName("GBK")));// 睡眠100秒,保持客户端运行//通过调用 Thread.sleep 方法可以暂停当前线程的执行,确保客户端的事件循环(Event Loop)持续执行,// 从而保持与服务器的通信 。Thread.sleep(100000);}}
客户端消息处理器
package com.kjz.NettyDemo.Aio.client;import com.kjz.NettyDemo.Aio.ChannelAdapter;import com.kjz.NettyDemo.Aio.ChannelHandler;import java.io.IOException;import java.nio.channels.AsynchronousSocketChannel;import java.nio.charset.Charset;import java.util.Date;//客户端消息处理器public class AioClientHandler extends ChannelAdapter {public AioClientHandler(AsynchronousSocketChannel channel, Charset charset) {super(channel, charset);}//channelActive方法在AIO客户端与服务器建立连接后被调用,输出连接信息 。@Overridepublic void channelActive(ChannelHandler ctx) {try {System.out.println("连接远程服务端:" +//ctx通道处理器,从通道处理器中获取当前连接的异步通道对象ctx.channel().getRemoteAddress()+"成功");//通知客户端链接建立成功} catch (IOException e) {e.printStackTrace();}}//断开连接@Overridepublic void channelInactive(ChannelHandler ctx) {}/*channelRead方法在AIO客户端接收到服务器发送的消息时被调用,首先输出接收到的消息,然后通过ctx.writeAndFlush方法向服务器发送响应消息,告知服务器已经成功处理该消息 。全双共通信方式*/@Overridepublic void channelRead(ChannelHandler ctx, Object msg) {System.out.println("服务端收到:" + new Date() + " " + msg + "\r\n");ctx.writeAndFlush("客户端信息处理Success!\r\n");}}
服务端
package com.kjz.NettyDemo.Aio.server;import java.net.InetSocketAddress;import java.nio.channels.AsynchronousChannelGroup;import java.nio.channels.AsynchronousServerSocketChannel;import java.util.concurrent.CountDownLatch;import java.util.concurrent.Executors;//服务端public class AioServer extendsThread{//异步服务端socket通道对象private AsynchronousServerSocketChannel serverSocketChannel;@Overridepublic void run() {try {//使用AsynchronousChannelGroup是为了管理异步通道资源,它可以将多个异步通道共享同一线程池 。// 在这里,使用固定大小的线程池来处理异步请求,避免了每次请求都创建新线程的开销 。serverSocketChannel = AsynchronousServerSocketChannel.open(AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(),10));//绑定端口serverSocketChannel.bind(new InetSocketAddress(7397));System.out.println("kjz-demo-netty server start done.");//通过CountDownLatch等待客户端连接CountDownLatch latch = new CountDownLatch(1);//监听客户端连接serverSocketChannel.accept(this, new AioServerChannelInitializer());latch.await();} catch (Exception e) {e.printStackTrace();}}public AsynchronousServerSocketChannel serverSocketChannel() {return serverSocketChannel;}public static void main(String[] args) {new AioServer().start();}}
服务端初始化通道
package com.kjz.NettyDemo.Aio.server;import com.kjz.NettyDemo.Aio.ChannelInitializer;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousSocketChannel;import java.nio.charset.Charset;import java.util.concurrent.TimeUnit;//服务端初始化通道public class AioServerChannelInitializer extends ChannelInitializer {@Overrideprotected void initChannel(AsynchronousSocketChannel channel) throws Exception {channel.read(ByteBuffer.allocate(1024), 10, TimeUnit.SECONDS,null, new AioServerHandler(channel, Charset.forName("GBK")));}}
服务端消息处理器
package com.kjz.NettyDemo.Aio.server;import com.kjz.NettyDemo.Aio.ChannelAdapter;import com.kjz.NettyDemo.Aio.ChannelHandler;import java.io.IOException;import java.nio.channels.AsynchronousSocketChannel;import java.nio.charset.Charset;import java.util.Date;public class AioServerHandler extends ChannelAdapter {public AioServerHandler(AsynchronousSocketChannel channel, Charset charset) {super(channel, charset);}@Overridepublic void channelActive(ChannelHandler ctx) {try {System.out.println("连接远程客户端:" +ctx.channel().getRemoteAddress()+"成功");//通知客户端链接建立成功ctx.writeAndFlush("服务端连接建立成功" + " " + new Date() + " " +ctx.channel().getRemoteAddress() + "\r\n");} catch (IOException e) {e.printStackTrace();}}@Overridepublic void channelInactive(ChannelHandler ctx) {}@Overridepublic void channelRead(ChannelHandler ctx, Object msg) {System.out.println("服务端收到:" + new Date() + " " + msg + "\r\n");ctx.writeAndFlush("服务端信息处理Success!\r\n");}}
通用消息处理器
package com.kjz.NettyDemo.Aio;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousSocketChannel;import java.nio.channels.CompletionHandler;import java.nio.charset.Charset;import java.util.concurrent.TimeUnit;//通用消息处理器public abstract class ChannelAdapter implements CompletionHandler {//异步socket通道private AsynchronousSocketChannel channel;//字符集private Charset charset;//构造方法public ChannelAdapter(AsynchronousSocketChannel channel, Charset charset) {this.channel = channel;this.charset = charset;//通道打开时if (channel.isOpen()) {channelActive(new ChannelHandler(channel, charset));}}//异步读取到数据后的回调函数,该方法根据异步读取结果判断是否需要关闭通道,// 然后将缓冲区中的字节数组转换为字符串,并通过channelRead方法处理读取到的消息 。@Overridepublic void completed(Integer result, Object attachment) {try {//创建一个大小为1024的ByteBuffer对象,用于存储读取到的数据final ByteBuffer buffer = ByteBuffer.allocate(1024);//定义一个超时时间,单位为秒,默认为1小时 。final long timeout = 60 * 60L;//调用channel.read方法进行异步读取//buffer参数是要读取的缓冲区//timeout是读取超时时间//TimeUnit.SECONDS表示超时时间的单位//attachment;附件//new CompletionHandler()返回一个新的CompletionHandler对象用于处理读取结果 。channel.read(buffer, timeout, TimeUnit.SECONDS, null,//匿名类new CompletionHandler() {@Overridepublic void completed(Integer result, Object attachment) {//判断是否需要关闭通道if (result == -1) {try {//首先判断result是否等于-1,如果是则表示通道已关闭,// 需要执行关闭通道的逻辑 。在关闭通道之前,会调用channelInactive方法表示通道已断开,// 并关闭通道 。channelInactive(new ChannelHandler(channel, charset));channel.close();} catch (IOException e) {e.printStackTrace();}return;}//使用buffer.flip()反转缓冲区,将读取到的数据准备为读取状态 。buffer.flip();//创建一个新的ChannelHandler对象,并通过channelRead方法处理读取到的消息,// 同时传入通道和字符集 。channelRead(new ChannelHandler(channel, charset), charset.decode(buffer));//调用buffer.clear()清空缓冲区,以便重新写入数据 。buffer.clear();//注册下一次的异步读取事件,使用之前定义的超时时间和时间单位,null表示附件,// this表示当前的CompletionHandler对象 。循环读取channel.read(buffer, timeout, TimeUnit.SECONDS, null, this);}//异步读取出现异常时的回调函数,将异常信息输出到控制台 。@Overridepublic void failed(Throwable exc, Object attachment) {exc.printStackTrace();}});} catch (Exception e) {e.printStackTrace();}}@Overridepublic void failed(Throwable exc, Object attachment) {exc.getStackTrace();}//通道激活public abstract void channelActive(ChannelHandler ctx);//通道断开public abstract void channelInactive(ChannelHandler ctx);// 读取消息抽象类public abstract void channelRead(ChannelHandler ctx, Object msg);}
通道处理器
package com.kjz.NettyDemo.Aio;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousSocketChannel;import java.nio.charset.Charset;//通道处理器public class ChannelHandler {private AsynchronousSocketChannel channel;private Charset charset;public ChannelHandler(AsynchronousSocketChannel channel, Charset charset) {this.channel = channel;this.charset = charset;}//用于将数据写入异步Socket通道中,并发送到远程节点public void writeAndFlush(Object msg) {//方法接收一个Object类型的参数msg,首先将其转换为字节数组byte[] bytes = msg.toString().getBytes(charset);//根据该字节数组的长度创建一个新的ByteBuffer对象writeBuffer 。ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);//字节数组写入缓冲区中writeBuffer.put(bytes);//将缓冲区准备为写入状态writeBuffer.flip();channel.write(writeBuffer);}public AsynchronousSocketChannel channel() {return channel;}public void setChannel(AsynchronousSocketChannel channel) {this.channel = channel;}}
初始化处理通道的回调方法
package com.kjz.NettyDemo.Aio;import com.kjz.NettyDemo.Aio.server.AioServer;import java.nio.channels.AsynchronousSocketChannel;import java.nio.channels.CompletionHandler;//初始化处理通道的回调方法public abstract class ChannelInitializer implementsCompletionHandler {@Overridepublic void completed(AsynchronousSocketChannel channel, AioServer attachment) {try {//调用initChannel(channel)方法对通道进行初始化操作initChannel(channel);} catch (Exception e) {e.printStackTrace();} finally {//都会执行// 再此接收客户端连接,保持持续监听attachment.serverSocketChannel().accept(attachment, this);}}@Overridepublic void failed(Throwable exc, AioServer attachment) {exc.getStackTrace();}//初始化通道方法,具体的初始化逻辑由子类实现,因此该方法被声明为抽象方法 。protected abstract void initChannel(AsynchronousSocketChannel channel) throws Exception;}
BIO模型 基础补充
BIO模型是基于实现的,对象是实现网络通信的基础类之一,用于在客户端和服务器之间建立可靠的双向通信连接 。下面首先先介绍一下该对象
和:
类用于客户端,可以与服务端建立连接,发送请求和接收响应 。
类用于服务器端,监听指定的端口,接受客户端的连接请求,并创建相应的对象进行通信 。
构造方法:
类的常用构造方法有以下几种:
( host, int port):根据主机名和端口号创建对象 。
( , int port):根据IP地址和端口号创建对象 。
( host, int port,, int ):根据主机名、端口号、本地IP地址和本地端口号创建对象 。
类的常用构造方法有以下几种:
(int port):创建一个绑定到指定端口号的对象 。
(int port, int ):创建一个绑定到指定端口号,并指定连接请求队列长度的对象 。
(int port, int ,):创建一个绑定到指定端口号和本地IP地址,并指定连接请求队列长度的对象 。
【java网络编程模型之BIO、NIO、AIO】常用方法:
类的常用方法:
():获取与关联的输入流,用于接收服务器发送的数据 。
():获取与关联的输出流,用于向服务器发送数据 。
():判断是否连接到远程主机 。
():判断是否已关闭 。
close():关闭连接 。
类的常用方法:
():监听并接受客户端的连接请求,并返回一个新的对象供通信使用 。
():判断是否已绑定到指定的端口 。
():判断是否已关闭 。
close():关闭 。
Java中的对象是网络通信的基础类,用于建立客户端与服务器之间的连接并进行数据交换 。它提供了丰富的方法和功能,方便开发者在网络编程中进行数据传输和操作 。
执行基本流程
从服务端来分析:
创建对象,绑定监听端口 。进入循环等待客户端连接请求 。在循环中调用方法接受客户端的连接请求,一旦有客户端连接成功,就创建一个新的对象来处理与该客户端的通信 。BIO模型的特点是采用阻塞式I/O,即当服务器执行方法时,如果没有客户端连接到来,服务器会一直阻塞在这一步,直到有新的连接请求到达才会继续执行 。创建线程处理客户端的请求和数据传输 。每个客户端连接都会占用一个独立的线程来处理通信 。服务器持续监听客户端的连接请求 。代码实现
客户端
package com.kjz.NettyDemo.Bio.client;import java.io.IOException;import java.net.Socket;import java.nio.charset.Charset;public class BioClient {public static void main(String[] args) {try {Socket socket = new Socket("192.168.1.116", 7397);System.out.println("kjz-demo-netty client start done.");BioClientHandler bioClientHandler = new BioClientHandler(socket,Charset.forName("utf-8"));bioClientHandler.start();} catch (IOException e) {e.printStackTrace();}}}
客户端消息处理器
package com.kjz.NettyDemo.Bio.client;import com.kjz.NettyDemo.Bio.ChannelAdapter;import com.kjz.NettyDemo.Bio.ChannelHandler;import java.net.Socket;import java.nio.charset.Charset;import java.text.SimpleDateFormat;import java.util.Date;//客户端消息处理器public class BioClientHandler extends ChannelAdapter {public BioClientHandler(Socket socket, Charset charset) {super(socket, charset);}@Overridepublic void channelActive(ChannelHandler ctx) {System.out.println("连接报告LocalAddress:" + ctx.socket().getLocalAddress());ctx.writeAndFlush("hi! My name is KJZ BioClient to msg for you \r\n");}@Overridepublic void channelRead(ChannelHandler ctx, Object msg) {System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg);ctx.writeAndFlush("hi 我已经收到你的消息Success!\r\n");}}
服务端
package com.kjz.NettyDemo.Bio.server;import com.kjz.NettyDemo.Bio.server.BioServerHandler;import java.io.IOException;import java.net.InetSocketAddress;import java.net.ServerSocket;import java.net.Socket;import java.nio.charset.Charset;public class BioServer extends Thread {private ServerSocket serverSocket = null;public static void main(String[] args) {BioServer bioServer = new BioServer();bioServer.start();}@Overridepublic void run() {try {serverSocket = new ServerSocket();serverSocket.bind(new InetSocketAddress(7397));System.out.println("kjz-demo-netty bio server start done. ");//进入循环等待客户端连接请求:while (true) {//在循环中调用accept方法接受客户端的连接请求,一旦有客户端连接成功,// 就创建一个新的Socket对象来处理与该客户端的通信 。Socket socket = serverSocket.accept();//创建BioServerHandler线程处理客户端的请求和数据传输BioServerHandler handler = newBioServerHandler(socket, Charset.forName("GBK"));handler.start();}} catch (IOException e) {e.printStackTrace();}}}
服务端消息处理器
package com.kjz.NettyDemo.Bio.server;import com.kjz.NettyDemo.Bio.ChannelAdapter;import com.kjz.NettyDemo.Bio.ChannelHandler;import java.net.Socket;import java.nio.charset.Charset;import java.text.SimpleDateFormat;import java.util.Date;//服务端消息处理器public class BioServerHandler extends ChannelAdapter {public BioServerHandler(Socket socket, Charset charset) {super(socket, charset);}@Overridepublic void channelActive(ChannelHandler ctx) {System.out.println("连接LocalAddress:" + ctx.socket().getLocalAddress());ctx.writeAndFlush("hi! My name is KJZ BioServer to msg for you \r\n");}@Overridepublic void channelRead(ChannelHandler ctx, Object msg) {System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg);ctx.writeAndFlush("hi 我已经收到你的消息Success!\r\n");}}
通用消息处理器
package com.kjz.NettyDemo.Bio;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.net.Socket;import java.nio.charset.Charset;//通用消息处理器public abstract class ChannelAdapter extends Thread {private Socket socket;private ChannelHandler channelHandler;private Charset charset;public ChannelAdapter(Socket socket, Charset charset) {this.socket = socket;this.charset = charset;while (!socket.isConnected()) {break;}channelHandler = new ChannelHandler(this.socket, charset);channelActive(channelHandler);}@Overridepublic void run() {try {BufferedReader input = new BufferedReader(new InputStreamReader(this.socket.getInputStream(), charset));String str = null;while ((str = input.readLine()) != null) {channelRead(channelHandler, str);}} catch (IOException e) {e.printStackTrace();}}// 连接通知抽象类public abstract void channelActive(ChannelHandler ctx);// 读取消息抽象类public abstract void channelRead(ChannelHandler ctx, Object msg);}
通道处理器
package com.kjz.NettyDemo.Bio;import java.io.IOException;import java.io.OutputStream;import java.net.Socket;import java.nio.charset.Charset;//通道处理器publicclass ChannelHandler {private Socket socket;private Charset charset;public ChannelHandler(Socket socket, Charset charset) {this.socket = socket;this.charset = charset;}//消息写入public void writeAndFlush(Object msg) {OutputStream out = null;try {out = socket.getOutputStream();out.write((msg.toString()).getBytes(charset));out.flush();} catch (IOException e) {e.printStackTrace();}}public Socket socket() {return socket;}}
NIO模型 基础补充

是Java NIO中的一个重要组件,用于实现非阻塞IO操作 。它可以通过一个线程同时监听多个的IO事件,从而实现高效的IO多路复用 。
的主要作用是管理多个,并监听这些上的IO事件 。它可以通过调用()方法阻塞等待就绪的IO事件,然后返回就绪的IO事件的数量 。通过()方法可以获取到已经就绪的IO事件的集合,然后可以遍历这个集合进行相应的处理 。
的常用方法包括:

java网络编程模型之BIO、NIO、AIO

文章插图
open():创建一个新的对象 。
close():关闭对象 。
():阻塞等待就绪的IO事件,返回就绪的IO事件的数量 。
(long ):阻塞等待就绪的IO事件,最多等待毫秒,返回就绪的IO事件的数量 。
():非阻塞立即返回就绪的IO事件的数量 。
():唤醒阻塞在()方法上的线程 。
keys():返回当前注册在上的所有的 。
():返回已经就绪的IO事件的集合 。
在使用时,需要将注册到上,并指定感兴趣的IO事件,如读、写、连接、接收等事件 。通过可以获取到注册的以及感兴趣的IO事件 。
执行基本流程
从服务端的角度进行分析:
启动服务器时,创建一个实例,并打开一个通道,并对进行响应配置 。
使用绑定指定的端口号,并设置最大连接数(.().bind(new (port), 1024)) 。
将注册到上,设置关注的事件为.,表示对客户端连接事件感兴趣(.(, .)) 。
创建一个实例,并将和字符集传递给它 。
开始进入事件循环(while(true)),不断轮询上发生的事件 。
调用的()方法,阻塞等待事件发生 。
当某个事件发生时,()方法返回,并返回一组发生事件的集合 。
遍历处理每个,判断其对应的事件类型 。
如果是事件,表示有客户端连接请求,调用的()方法处理连接请求 。
如果是事件,表示有数据可读,调用的()方法处理读取的数据 。
在()和()方法中,可以通过的()方法将要发送的数据写入通道 。
继续循环执行步骤6-12,处理下一个事件 。
代码实现
客户端
package com.kjz.NettyDemo.Nio.client;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.nio.charset.Charset;public class NioClient {public static void main(String[] args) throws IOException {//创建Selector对象Selector selector = Selector.open();//创建SocketChannel对象SocketChannel socketChannel = SocketChannel.open();//配置为非阻塞模式socketChannel.configureBlocking(false);//尝试建立连接boolean isConnect = socketChannel.connect(new InetSocketAddress("192.168.1.116", 7397));if (isConnect) {//如果连接成功,则表示该SocketChannel已经可以进行读操作(OP_READ),// 因此将其注册到Selector上,等待IO事件的发生 。socketChannel.register(selector, SelectionKey.OP_READ);} else {//如果连接失败,则需要等待连接建立完成(OP_CONNECT)的IO事件 。// 同样将该SocketChannel注册到Selector上,等待IO事件的发生 。socketChannel.register(selector, SelectionKey.OP_CONNECT);}System.out.println("kjz-demo-netty nio client start done.");//创建客户端消息处理器对象,,并传入Selector和字符集参数,然后启动该对象的线程 。new NioClientHandler(selector, Charset.forName("GBK")).start();}}
客户端消息处理器
package com.kjz.NettyDemo.Nio.client;import com.kjz.NettyDemo.Nio.ChannelAdapter;import com.kjz.NettyDemo.Nio.ChannelHandler;import java.io.IOException;import java.nio.channels.Selector;import java.nio.charset.Charset;import java.text.SimpleDateFormat;import java.util.Date;//客户端消息处理器public class NioClientHandler extends ChannelAdapter {public NioClientHandler(Selector selector, Charset charset) {super(selector, charset);}@Overridepublic void channelActive(ChannelHandler ctx) {try {System.out.println("连接报告LocalAddress:" + ctx.channel().getLocalAddress());//向服务端响应连接成功的信息ctx.writeAndFlush("hi! My name is KJZ NioClient to msg for you \r\n");} catch (IOException e) {e.printStackTrace();}}@Overridepublic void channelRead(ChannelHandler ctx, Object msg) {System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+ " 接收到消息:" + msg);ctx.writeAndFlush("hi 我已经收到你的消息Success!\r\n");}}
服务端
package com.kjz.NettyDemo.Nio.server;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.charset.Charset;public class NioServer {private Selector selector;private ServerSocketChannel socketChannel;public static void main(String[] args) throws IOException {new NioServer().bind(7397);}public void bind(int port) {try {selector = Selector.open();socketChannel = ServerSocketChannel.open();socketChannel.configureBlocking(false);socketChannel.socket().bind(new InetSocketAddress(port), 1024);socketChannel.register(selector, SelectionKey.OP_ACCEPT);System.out.println("itstack-demo-netty nio server start done. ");new NioServerHandler(selector, Charset.forName("GBK")).start();} catch (IOException e) {e.printStackTrace();}}}
服务端消息处理器
package com.kjz.NettyDemo.Nio.server;import com.kjz.NettyDemo.Nio.ChannelAdapter;import com.kjz.NettyDemo.Nio.ChannelHandler;import java.io.IOException;import java.nio.channels.Selector;import java.nio.charset.Charset;import java.text.SimpleDateFormat;import java.util.Date;//服务端消息处理器public class NioServerHandler extends ChannelAdapter {public NioServerHandler(Selector selector, Charset charset) {super(selector, charset);}@Overridepublic void channelActive(ChannelHandler ctx) {try {System.out.println("连接报告LocalAddress:" + ctx.channel().getLocalAddress());ctx.writeAndFlush("hi! My name is KJZ NioServer to msg for you \r\n");} catch (IOException e) {e.printStackTrace();}}@Overridepublic void channelRead(ChannelHandler ctx, Object msg) {System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg);ctx.writeAndFlush("hi 我已经收到你的消息Success!\r\n");}}
通用消息处理器
package com.kjz.NettyDemo.Nio;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.nio.charset.Charset;import java.util.Iterator;import java.util.Set;//通用消息处理类public abstract class ChannelAdapter extends Thread {private Selector selector;private ChannelHandler channelHandler;private Charset charset;public ChannelAdapter(Selector selector, Charset charset) {this.selector = selector;this.charset = charset;}//线程执行逻辑@Overridepublic void run() {while (true) {try {//阻塞等待就绪的IO事件/*** 常见的等待就绪的IO事件包括:* 可读事件(OP_READ):表示SocketChannel中有数据可读取 。* 可写事件(OP_WRITE):表示SocketChannel可以写入数据 。* 连接建立完成事件(OP_CONNECT):表示SocketChannel的连接已经建立完成 。* 新的客户端连接事件(OP_ACCEPT):表示ServerSocketChannel有新的客户端连接请求 。*/selector.select(1000);//遍历等待的IO事件Set selectedKeys = selector.selectedKeys();Iterator it = selectedKeys.iterator();SelectionKey key = null;while (it.hasNext()) {key = it.next();it.remove();handleInput(key);}} catch (Exception ignore) {}}}//处理IO事件private void handleInput(SelectionKey key) throws IOException {if (!key.isValid()) return;// 获取IO事件类型Class superclass = key.channel().getClass().getSuperclass();//根据不同的IO类型进行处理//客户端SocketChannelif (superclass == SocketChannel.class){SocketChannel socketChannel = (SocketChannel) key.channel();if (key.isConnectable()) {//判断是否连接成功if (socketChannel.finishConnect()) {channelHandler = newChannelHandler(socketChannel, charset);channelActive(channelHandler);//事件注册socketChannel.register(selector,SelectionKey.OP_READ);} else {System.exit(1);}}}// 服务端ServerSocketChannelif (superclass == ServerSocketChannel.class){if (key.isAcceptable()) {ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();//通过accept方法获取SocketChannelSocketChannel socketChannel = serverSocketChannel.accept();//配置为非阻塞模式socketChannel.configureBlocking(false);//注册读事件(OP_READ)socketChannel.register(selector, SelectionKey.OP_READ);//创建ChannelHandler对象,并调用channelActive方法进行连接通知 。channelHandler = new ChannelHandler(socketChannel, charset);channelActive(channelHandler);}}//如果是读事件(isReadableif (key.isReadable()) {//从key中获取SocketChannel对象SocketChannel socketChannel = (SocketChannel) key.channel();ByteBuffer readBuffer = ByteBuffer.allocate(1024);//从SocketChannel中读取数据到ByteBuffer中int readBytes = socketChannel.read(readBuffer);if (readBytes > 0) {//调用flip()方法将Buffer从写模式切换为读模式readBuffer.flip();//根据剩余可读数据的长度,创建一个字节数组 。byte[] bytes = new byte[readBuffer.remaining()];//将缓冲区中的数据读取到字节数组中readBuffer.get(bytes);channelRead(channelHandler, new String(bytes, charset));} else if (readBytes < 0) {key.cancel();socketChannel.close();}}}// 连接通知抽象类public abstract void channelActive(ChannelHandler ctx);// 读取消息抽象类public abstract void channelRead(ChannelHandler ctx, Object msg);}
通道处理器
package com.kjz.NettyDemo.Nio;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.SocketChannel;import java.nio.charset.Charset;//通道处理器public class ChannelHandler {private SocketChannel channel;private Charset charset;public ChannelHandler(SocketChannel channel, Charset charset) {this.channel = channel;this.charset = charset;}//将数据写入通道public void writeAndFlush(Object msg) {try {byte[] bytes = msg.toString().getBytes(charset);ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);writeBuffer.put(bytes);writeBuffer.flip();channel.write(writeBuffer);} catch (IOException e) {e.printStackTrace();}}public SocketChannel channel() {return channel;}}