异步非阻塞IO - AIO

小龙 1,045 2021-07-23

简介

ASynchronous IO(异步IO处理),前面学习的BIO会出现不必要的线程性能损耗,同时处理效率低下;NIO是基于轮询的方式来实现IO处理的,那么就需要通过“while”循环不断的对其操作状态进行各种的判断处理,这样的操作过于繁琐,所以在此基础上,JDK1.7提供了AIO的概念;

BIO、NIO、AIO模型解释
BIO:客户端连接了就进行线程的分配,而后线程等待与客户端进行IO处理
NIO:客户端连接之后,暂时不进行线程的分配,当检测到用户需要进行IO处理(SelectorKey)了那么再为其进行线程资源的分配;
AIO:客户端连接之后暂时不分配线程,当客户端发生了IO处理,先进行IO处理,IO处理完成之后再分配线程与客户端进行数据发送或者是进行接收后的处理。

在AIO之中有一个回调操作,所有的回调操作都使用了同一个接口“CompletionHandle”,这个接口中有两个抽象方法:

方法名称类型描述
public void completed(V result, A attachment)方法当前操作以及处理完成后调用
public void failed(Throwable exc, A attachment)方法执行出现了异常时调用

从整体结构上来说,AIO和NIO最大的区别就在于回调机制上,正是因为有了回调机制,才可以进行处理完成后再通知的机制实现。

代码实现

AIO服务端

1、定义一个AIO的线程处理类

public class AIOEchoThread implements Runnable{
    private CountDownLatch latch ;
    private AsynchronousServerSocketChannel serverSocketChannel; // 异步服务端Socket
    public AIOEchoThread() throws IOException {
        this.serverSocketChannel = AsynchronousServerSocketChannel.open(); // 打开异步服务端Socket通道
        this.latch = new CountDownLatch(1);
        this.serverSocketChannel.bind(new InetSocketAddress(ServerInfo.PORT)); // 绑定服务器端口
    }
    @Override
    public void run() {
        this.serverSocketChannel.accept(this,new AcceptHandler()); // 连接回调,在建立连接时候就执行回调方法
        try {
            this.latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
    public AsynchronousServerSocketChannel getServerSocketChannel() {
        return serverSocketChannel;
    }
    public CountDownLatch getLatch() {
        return latch;
    }
}

2、进行连接的回调操作

public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AIOEchoThread> {
    @Override
    public void completed(AsynchronousSocketChannel channel, AIOEchoThread aioEchoThread) {
        aioEchoThread.getServerSocketChannel().accept(aioEchoThread,this); // 创建一个连接操作
        ByteBuffer buffer = ByteBuffer.allocate(128); // 定义一个缓冲区
        channel.read(buffer,buffer,new EchoHandler(channel)); // 创建处理回调
    }
    @Override
    public void failed(Throwable exc, AIOEchoThread attachment) {
        System.err.println("【AcceptHandler】服务端程序出现了错误" + exc.getMessage());
    }
}

3、实现Echo处理的回调操作

public class EchoHandler implements CompletionHandler<Integer, ByteBuffer> {
    private AsynchronousSocketChannel socketChannel;
    private boolean exit  = false; // 定义一个退出标记,如果等于true所有操作结束
    public EchoHandler(AsynchronousSocketChannel socketChannel) {
        this.socketChannel = socketChannel;
    }
    @Override
    public void completed(Integer result, ByteBuffer buffer) {
        /**
         * AIO的处理形式是先将数据接收,接收完之后再开启线程
         * 此时buffer中已经存在有客户端发送来的数据了,所有要将buffer重置
         */
        buffer.flip(); // 重置buffer
        String message = new String(buffer.array(), 0, buffer.remaining()); // 接收客户端发送来的数据
        System.err.println("【服务器端接收到的消息是】 " + message);
        String resultMessage = "【ECHO】" + message;
        if ("bye".equalsIgnoreCase(message) || "byebye".equalsIgnoreCase(message) || "exit".equalsIgnoreCase(message)) {
            resultMessage = "【ECHO】与服务断开连接...";
            exit = false;
        }
        echoWrite(resultMessage,buffer);
    }
    /**
     * 写入操作(给客户端回消息)
     * @param message 要返回的消息
     */
    public void echoWrite(String message,ByteBuffer buffer ) {
        buffer.clear();
        buffer.put(message.getBytes()); //  将要返回的数据写入缓冲区
        buffer.flip();
        this.socketChannel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                if (buffer.hasRemaining()) { // 如果缓冲区中有数据,就返回
                    EchoHandler.this.socketChannel.write(buffer,buffer,this);  // 进行回调处理
                }else { // 现在没有数据
                    if (EchoHandler.this.exit == false){ // 当前还会继续执行
                        ByteBuffer byteBuffer = ByteBuffer.allocate(128);
                        EchoHandler.this.socketChannel.read(byteBuffer,byteBuffer,new EchoHandler(socketChannel));
                    }
                }
            }
            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                closeClientChannel();
            }
        });

    }
    @Override
    public void failed(Throwable exc, ByteBuffer buffer) {
        closeClientChannel();
    }
    private void closeClientChannel(){
        try {
            System.err.println("【EchoHandler】应答操作失败,关闭客户端连接...");
            this.socketChannel.close();
        } catch (IOException e) {e.printStackTrace();}
    }
}

4、编写服务端启动程序

public class StartAIOServer {
    public static void main(String[] args) throws IOException {
        new Thread(new AIOEchoThread()).start();
    }
}

编写AIO客户端

1、创建一个AIOClientThread类

public class AIOClientThread implements Runnable{
    CountDownLatch latch;
    AsynchronousSocketChannel socketChannel ;
    public AIOClientThread() throws IOException {
        this.latch = new CountDownLatch(1);
        this.socketChannel = AsynchronousSocketChannel.open();
        this.socketChannel.connect(new InetSocketAddress(ServerInfo.HOST,ServerInfo.PORT));
    }
    @Override
    public void run() {
        try {
            this.latch.await();
            this.socketChannel.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * 定义一个消息发送的处理
     * @param message
     * @return
     */
    public boolean sendMessage(String message){
        ByteBuffer buffer = ByteBuffer.allocate(128);
        buffer.put(message.getBytes());
        buffer.flip();
        this.socketChannel.write(buffer,buffer,new ClientWriteHandler(this.socketChannel,this.latch));
        if ("bye".equalsIgnoreCase(message) || "byebye".equalsIgnoreCase(message) || "exit".equalsIgnoreCase(message)) {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            this.latch.countDown();
            return false;
        }
        return true;
    }
}

2、创建客户端写回调处理类

public class ClientWriteHandler implements CompletionHandler<Integer, ByteBuffer> {
    private AsynchronousSocketChannel socketChannel;
    private CountDownLatch latch;
    public ClientWriteHandler(AsynchronousSocketChannel socketChannel, CountDownLatch latch) {
        this.socketChannel = socketChannel;
        this.latch = latch;
    }
    @Override
    public void completed(Integer result, ByteBuffer buffer) {
        if (buffer.hasRemaining()){
            this.socketChannel.write(buffer,buffer,this);
        }else {
            ByteBuffer readBuffer = ByteBuffer.allocate(128);
            this.socketChannel.read(readBuffer,readBuffer,new ClientReadHandler(this.socketChannel,this.latch));
        }
    }
    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        System.err.println("【ClientWriteHandler】数据写入错误,请重新连接服务器...");
        try {
            this.socketChannel.close();
            this.latch.countDown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3、创建客户端读回调处理类

public class ClientReadHandler implements CompletionHandler<Integer, ByteBuffer> {
    private AsynchronousSocketChannel socketChannel;
    private CountDownLatch latch;
    public ClientReadHandler(AsynchronousSocketChannel socketChannel, CountDownLatch latch) {
        this.socketChannel = socketChannel;
        this.latch = latch;
    }
    @Override
    public void completed(Integer result, ByteBuffer buffer) {
        buffer.flip();
        System.out.println(new String(buffer.array(),0,buffer.remaining()).trim());
    }
    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        System.err.println("【ClientReadHandler】数据读取错误,请重新连接服务器...");
        try {
            this.socketChannel.close();
            this.latch.countDown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

4、编写客户端启动程序

public class StartAIOClient {
    public static void main(String[] args) throws IOException {
        AIOClientThread aioClientThread = new AIOClientThread();
        new Thread(aioClientThread).start();
        while (aioClientThread.sendMessage(InputUtil.getString("请输入要发送的消息:").trim())){
            ;
        }
    }
}

执行结果:客户端

请输入要发送的消息:happy
【ECHO】happy
请输入要发送的消息:

执行结果:服务器端

【服务器端接收到的消息是】 happy