同步非阻塞IO

小龙 789 2021-07-22

简介

从JDK1.4开始提出了NIO的概念,目的是希望可以避免传统IO处理的阻塞行为(就是如果数据没有读取完或者没有发送完,那么操作将一直处于停滞状态),解决了堵塞的问题实际上就提升了处理的性能,NIO中最大的特点就是进行网络同步非阻塞IO的程序实现

传统的IO为阻塞IO,也称为BIO;使用传统的IO客户单连接服务器端之后,服务器端就会一直等着客户端发消息来,如果没有消息发来,那么处理的线程将会出现阻塞,一直等着;这样就会造成资源的浪费。

解决资源浪费最好的做法是使用同步非阻塞IO,服务器实现的模式为一个用户一个线程,所有客户端发送的连接请求都会注册到一个多路复用器上,多路复用器通过轮询的机制,发现当前有IO操作请求之后才会启动一个线程来处理对应的操作

简单模型解释:烧开水实验
BIO:烧水的人会一直观察着水壶的状态,如果水烧好了,那么直接作出反应,如果水没烧好,则静静等待;
NIO:烧水的人不会一直区盯着水的状态,而是采用轮询的形式,每隔一段时间查看一次水是否开了,水开了就处理,没开就不理,继续保存隔一段时间观察的状态。

NIO实现架构

在NIO之中所有用户的请求依然是通过多线程的方式来完成的,但是在任何一个服务器中线程都是稀缺资源,应该充分的发挥出内核线程的优势,所以最佳的解决方法就是使用线程池来完成线程的管理,可用线程数一定和当前的硬件环境相当。

在NIO中最为关键的部分就是需要进行一个轮询的机制,它提供了一个多路复用器的概念,利用此复用器可以实现所有连接的管理,这些连接可能还未发送任何的IO操作,在进行所有通道管理的过程中,都是通过一个筛选器完成用户IO操作请求判断的,这个筛选器使用“java.nio.chanels.selector”来描述

代码实现NIO服务端

首先建立一个公共的接口,保存Host和Port

public interface ServerInfo {
    public static final int PORT = 80;
    public static final String HOST = "127.0.0.1";


创建处理的线程类

public class EchoServerThread implements Runnable {
    private SocketChannel socketChannel;
    private boolean flag = true;
    public EchoServerThread(SocketChannel socketChannel) {
        try {
            this.socketChannel = socketChannel;
            System.out.println("【客户端连接成功】连接的客户端地址为:" + socketChannel.getRemoteAddress());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void run() {
        ByteBuffer buffer = ByteBuffer.allocate(128); // 开辟缓冲区
        try {
            while (this.flag) {
                buffer.clear(); // 在读取通道数据之前先清空缓冲区
                int read = this.socketChannel.read(buffer); // 将通道的数据读取到缓冲区中
                String readMessage = new String(buffer.array(), 0, read).trim(); // 将缓冲区的数消息转为字符串
                System.out.println("【服务端收到的消息】 : " + readMessage);
                String writeMessage = "【ECHO】" + readMessage; // 设置返回的消息
                if ("bye".equalsIgnoreCase(readMessage) || "byebye".equalsIgnoreCase(readMessage) || "exit".equalsIgnoreCase(readMessage)) {
                    writeMessage = "【ECHO】与服务断开连接...";
                    flag = false;
                }
                buffer.clear();// 上面已经读过一次了,再次使用需要清空缓冲区
                buffer.put(writeMessage.getBytes());
                buffer.flip(); // 重置缓冲区
                this.socketChannel.write(buffer);  // 往通道写入数据
            }
            this.socketChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

创建服务器类

public class EchoServer {
    public void start() throws Exception{
        // 服务器的线程是稀缺资源,所以首先创建一个线程池管理线程
        ExecutorService threadPool = Executors.newFixedThreadPool(8); // 创建线程池
        // 要进行服务端的开发,首先要创建一个服务端的socket通道
        ServerSocketChannel channel = ServerSocketChannel.open(); // 开启服务端socket通道
        channel.configureBlocking(false); // 将通道设置为非阻塞
        channel.bind(new InetSocketAddress(ServerInfo.PORT)); //  通道绑定一个端口,Host绑定本机默认host
        // 打开一个选择器
        Selector selector = Selector.open();
        channel.register(selector, SelectionKey.OP_ACCEPT); // 将通道注册的到选择器中,并设置可接受事件为感兴趣事件
        // 所有的用户的连接都要注册到选择器上(多路复用器上),那么就利选择器的循环模式来判断请求的状态
        int SelectorState = 0; // 保存用户的状态
        while ((SelectorState = selector.select()) > 0){
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); // 获取所有的连接,并获取iterator,方便遍历
            while (iterator.hasNext()){
                SelectionKey selectionKey = iterator.next(); //  获取当前处理的状态
                if (selectionKey.isAcceptable()){ // 如果现在是一个连接请求状态
                    SocketChannel socketChannel = channel.accept(); // 创建客户端通道
                    if (socketChannel != null){
                        threadPool.submit(new EchoServerThread(socketChannel)); // 往线程池中提交一个任务
                    }
                }
                iterator.remove(); // 以及完成处理,所以不再需要做后续的重复轮询
            }
        }
        threadPool.shutdown(); // 关闭线程池
        channel.close(); // 关闭通道
    }
    public static void main(String[] args) throws Exception {
        new EchoServer().start(); // main方法只做启动
    }
}

编写NIO客户端类

public class EchoClient {
    public static void main(String[] args) throws Exception{
        SocketChannel channel = SocketChannel.open(); // 打开客户端通道
        channel.connect(new InetSocketAddress(ServerInfo.HOST,ServerInfo.PORT)); // 连接指定ip和端口的服务器
        ByteBuffer buffer = ByteBuffer.allocate(128); // 创建缓冲区
        boolean flag = true;
        while (flag){
            buffer.clear();
            String message = InputUtil.getString("请输入要发送的消息:");
            buffer.put(message.getBytes());
            buffer.flip();
            channel.write(buffer);
            buffer.clear();
            int read = channel.read(buffer); // 读取服务器端返回的消息
            System.err.println(new String(buffer.array(), 0 , read));
            if ("bye".equalsIgnoreCase(message) || "byebye".equalsIgnoreCase(message) || "exit".equalsIgnoreCase(message)) {
                flag = false; // 结束循环
            }
        }
    }
}