Java NIO编程示例
服务端:
package org.example; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.channels.spi.SelectorProvider; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class NIOServer { static ExecutorService executorService = Executors.newFixedThreadPool(10); public static void main(String[] args) { try { // 打开一个选择器 Selector selector = SelectorProvider.provider().openSelector(); // 打开一个服务器套接字通道 ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); // 绑定到指定端口 serverChannel.socket().bind(new InetSocketAddress(8081)); // 将服务器通道注册到选择器上,并指定关注的事件为接收连接 serverChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("服务器已启动,监听端口8080..."); while (true) { // 选择器等待就绪的通道 selector.select(); // 获取就绪的选键集合 Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); keyIterator.remove(); // 检查选键的状态并进行相应的处理 if (key.isAcceptable()) { handleAccept(key); } else if (key.isReadable()) { processMsg(key); } else if (key.isWritable()) { // 通常这里不会处理写就绪,因为写操作通常由应用逻辑触发 } else if (key.isConnectable()) { // 这是客户端连接的情况,对于服务器来说通常不会遇到 } } } } catch (IOException e) { e.printStackTrace(); System.err.println("服务器发生异常,已关闭。"); } finally { executorService.shutdown(); } } private static void handleAccept(SelectionKey key) throws IOException { ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); SocketChannel clientChannel = serverChannel.accept(); clientChannel.configureBlocking(false); // 注册到选择器并关注读事件 clientChannel.register(key.selector(), SelectionKey.OP_READ); System.out.println("接受到新的客户端连接:" + clientChannel.getRemoteAddress()); } private static void sendData(SocketChannel socketChannel, String message) { ByteBuffer buffer = ByteBuffer.wrap(message.getBytes()); while (buffer.hasRemaining()) { try { socketChannel.write(buffer); } catch (IOException e) { throw new RuntimeException(e); } } } public static void processMsg(SelectionKey key) { SocketChannel clientChannel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(2); StringBuilder sb = new StringBuilder(); try { while (true) { int bytesRead = clientChannel.read(buffer); if (bytesRead == -1) { // 客户端关闭连接 clientChannel.close(); break; } else if (bytesRead == 0) { // 消息读取结束 break; } buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); sb.append(new String(bytes, StandardCharsets.UTF_8)); buffer.clear(); } if (!clientChannel.isOpen()) return; executorService.submit(() -> { System.out.println("收到消息:" + sb); sendData(clientChannel, "abcdaaaaaaaaaaaaaaabbbbbbbbccccccccccccccccccccccccccccd"); }); } catch (Exception e) { // 发生读错误,关闭连接并打印错误信息 System.err.println("读错误:" + e.getMessage()); try { clientChannel.close(); } catch (IOException ex) { ex.printStackTrace(); } } } }
客户端:
package org.example; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; public class NIOClient { public static void main(String[] args) { // 服务器的地址和端口 String serverAddress = "localhost"; int serverPort = 8081; try { // 打开一个套接字通道 SocketChannel socketChannel = SocketChannel.open(); // 配置为非阻塞模式 socketChannel.configureBlocking(false); // 尝试连接到服务器 if (socketChannel.connect(new InetSocketAddress(serverAddress, serverPort))) { // 连接成功,可以直接发送数据(但在非阻塞模式下,这通常不会发生) sendData(socketChannel, "Hello, Server!"); } else { // 连接可能需要时间,等待连接完成(在实际应用中,这通常是在一个循环中完成的) while (!socketChannel.finishConnect()) { // 可以在这里执行其他任务,比如更新用户界面等 // 但在这个简单的例子中,我们只是空转等待连接完成 } // 连接完成,发送数据 for (int index = 1; index <= 5; index++) { sendData(socketChannel, "Hello, Server!"); } } // 读取服务器的响应(在非阻塞模式下,这可能需要多次尝试) StringBuilder sb = new StringBuilder(); ByteBuffer buffer = ByteBuffer.allocate(1024); while (socketChannel.read(buffer) <= 0) { // 如果没有数据可读,可以在这里执行其他任务 // 但在这个简单的例子中,我们只是空转等待数据到来 } // 服务器发送过来的所有数据 buffer.flip(); String response = new String(buffer.array(), 0, buffer.limit()); sb.append(response); while (socketChannel.read(buffer) > 0) { buffer.flip(); response = new String(buffer.array(), 0, buffer.limit()); sb.append(response); } System.out.println("Received from server: " + sb); // 关闭连接 socketChannel.close(); } catch (Exception e) { e.printStackTrace(); System.err.println("Client failed to connect to server or read response."); } System.out.println("finish"); } private static void sendData(SocketChannel socketChannel, String message) throws Exception { ByteBuffer buffer = ByteBuffer.wrap(message.getBytes()); while (buffer.hasRemaining()) { socketChannel.write(buffer); } } }
执行效果: