Java NIO编程示例

神圣兽国游尾郡窝窝乡独行侠 / 2024-09-21 / 原文

服务端:

package org.example;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NIOServer {
    static ExecutorService executorService = Executors.newFixedThreadPool(10);

    public static void main(String[] args) {

        try {
            // 打开一个选择器
            Selector selector = SelectorProvider.provider().openSelector();

            // 打开一个服务器套接字通道
            ServerSocketChannel serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);

            // 绑定到指定端口
            serverChannel.socket().bind(new InetSocketAddress(8081));

            // 将服务器通道注册到选择器上,并指定关注的事件为接收连接
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);

            System.out.println("服务器已启动,监听端口8080...");

            while (true) {
                // 选择器等待就绪的通道
                selector.select();

                // 获取就绪的选键集合
                Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();

                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();
                    keyIterator.remove();

                    // 检查选键的状态并进行相应的处理
                    if (key.isAcceptable()) {
                        handleAccept(key);
                    } else if (key.isReadable()) {
                        processMsg(key);
                    } else if (key.isWritable()) {
                        // 通常这里不会处理写就绪,因为写操作通常由应用逻辑触发
                    } else if (key.isConnectable()) {
                        // 这是客户端连接的情况,对于服务器来说通常不会遇到
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
            System.err.println("服务器发生异常,已关闭。");
        } finally {
            executorService.shutdown();
        }
    }

    private static void handleAccept(SelectionKey key) throws IOException {
        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
        SocketChannel clientChannel = serverChannel.accept();
        clientChannel.configureBlocking(false);

        // 注册到选择器并关注读事件
        clientChannel.register(key.selector(), SelectionKey.OP_READ);
        System.out.println("接受到新的客户端连接:" + clientChannel.getRemoteAddress());
    }

    private static void sendData(SocketChannel socketChannel, String message) {
        ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
        while (buffer.hasRemaining()) {
            try {
                socketChannel.write(buffer);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static void processMsg(SelectionKey key) {
        SocketChannel clientChannel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(2);
        StringBuilder sb = new StringBuilder();
        try {
            while (true) {
                int bytesRead = clientChannel.read(buffer);
                if (bytesRead == -1) { // 客户端关闭连接
                    clientChannel.close();
                    break;
                } else if (bytesRead == 0) { // 消息读取结束
                    break;
                }
                buffer.flip();
                byte[] bytes = new byte[buffer.remaining()];
                buffer.get(bytes);
                sb.append(new String(bytes, StandardCharsets.UTF_8));
                buffer.clear();
            }
            if (!clientChannel.isOpen()) return;
            executorService.submit(() -> {
                System.out.println("收到消息:" + sb);
                sendData(clientChannel, "abcdaaaaaaaaaaaaaaabbbbbbbbccccccccccccccccccccccccccccd");
            });
        } catch (Exception e) {
            // 发生读错误,关闭连接并打印错误信息
            System.err.println("读错误:" + e.getMessage());
            try {
                clientChannel.close();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
}

客户端:

package org.example;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class NIOClient {

    public static void main(String[] args) {
        // 服务器的地址和端口
        String serverAddress = "localhost";
        int serverPort = 8081;

        try {
            // 打开一个套接字通道
            SocketChannel socketChannel = SocketChannel.open();
            // 配置为非阻塞模式
            socketChannel.configureBlocking(false);

            // 尝试连接到服务器
            if (socketChannel.connect(new InetSocketAddress(serverAddress, serverPort))) {
                // 连接成功,可以直接发送数据(但在非阻塞模式下,这通常不会发生)
                sendData(socketChannel, "Hello, Server!");
            } else {
                // 连接可能需要时间,等待连接完成(在实际应用中,这通常是在一个循环中完成的)
                while (!socketChannel.finishConnect()) {
                    // 可以在这里执行其他任务,比如更新用户界面等
                    // 但在这个简单的例子中,我们只是空转等待连接完成
                }
                // 连接完成,发送数据
                for (int index = 1; index <= 5; index++) {
                    sendData(socketChannel, "Hello, Server!");
                }
            }

            // 读取服务器的响应(在非阻塞模式下,这可能需要多次尝试)
            StringBuilder sb = new StringBuilder();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            while (socketChannel.read(buffer) <= 0) {
                // 如果没有数据可读,可以在这里执行其他任务
                // 但在这个简单的例子中,我们只是空转等待数据到来
            }

            // 服务器发送过来的所有数据
            buffer.flip();
            String response = new String(buffer.array(), 0, buffer.limit());
            sb.append(response);
            while (socketChannel.read(buffer) > 0) {
                buffer.flip();
                response = new String(buffer.array(), 0, buffer.limit());
                sb.append(response);
            }
            System.out.println("Received from server: " + sb);
            // 关闭连接
            socketChannel.close();
        } catch (Exception e) {
            e.printStackTrace();
            System.err.println("Client failed to connect to server or read response.");
        }
        System.out.println("finish");
    }

    private static void sendData(SocketChannel socketChannel, String message) throws Exception {
        ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
        while (buffer.hasRemaining()) {
            socketChannel.write(buffer);
        }
    }
}

执行效果: