package niocommunicate;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Non-blocking TCP server listening on port 5555.
 *
 * <p>One pool thread runs the selector loop; reads and writes are handed to the
 * remaining pool threads. Every non-empty request is answered with the bytes of
 * {@code "response"}.
 */
public class Server {

    private Selector selector = getSelector();
    private ServerSocketChannel ss = null;

    /**
     * Worker pool. {@code CallerRunsPolicy} makes a saturated pool apply back-pressure on the
     * selector thread instead of throwing {@code RejectedExecutionException}, which the selector
     * loop would otherwise treat as fatal and shut the whole server down.
     */
    private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 500, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(20), new ThreadPoolExecutor.CallerRunsPolicy());

    /**
     * Keys that currently have a read task in flight. Keyed by the key object itself (identity),
     * not by its hash code, so two keys can never be mistaken for one another.
     */
    private final Set<SelectionKey> keysBeingRead =
            Collections.newSetFromMap(new ConcurrentHashMap<SelectionKey, Boolean>());

    /** Cleared by {@link #close()} so the selector loop terminates. */
    private volatile boolean running = true;

    /**
     * Opens a new selector.
     *
     * @return the new selector, or {@code null} if it could not be opened
     */
    public Selector getSelector() {
        try {
            return Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Creates the non-blocking server socket, binds it to port 5555 and registers it for
     * {@code OP_ACCEPT}. On any failure the server is closed.
     */
    public Server() {
        try {
            ss = ServerSocketChannel.open();
            ss.bind(new InetSocketAddress(5555));
            ss.configureBlocking(false);
            if (selector == null) {
                selector = Selector.open();
            }
            ss.register(selector, SelectionKey.OP_ACCEPT);
        } catch (Exception e) {
            e.printStackTrace();
            close();
        }
    }

    /**
     * Shuts the server down: stops the selector loop, shuts down the pool and closes the
     * server socket and the selector. Each resource is closed independently so that a failure
     * closing one does not leak the other.
     */
    private void close() {
        running = false;
        threadPool.shutdown();
        try {
            if (ss != null) {
                ss.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            if (selector != null) {
                selector.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Starts the selector loop on a pool thread. The loop runs until {@link #close()} is called
     * or an unexpected exception escapes, in which case the server closes itself.
     */
    private void start() {
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    while (running) {
                        if (selector.select(10) == 0) {
                            continue;
                        }
                        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                        while (iterator.hasNext()) {
                            SelectionKey selectedKey = iterator.next();
                            iterator.remove();
                            try {
                                dispatch(selectedKey);
                            } catch (CancelledKeyException cc) {
                                selectedKey.cancel();
                                keysBeingRead.remove(selectedKey);
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    close();
                }
            }
        });
    }

    /**
     * Handles one ready key: schedules a read, schedules the pending write, or accepts a new
     * connection.
     *
     * @param selectedKey the key reported ready by the selector
     * @throws IOException if accepting or configuring a new connection fails
     */
    private void dispatch(SelectionKey selectedKey) throws IOException {
        if (selectedKey.isReadable()) {
            // add() is atomic, so at most one read task per key is in flight.
            if (keysBeingRead.add(selectedKey)) {
                threadPool.execute(new ReadClientSocketHandler(selectedKey));
            }
        } else if (selectedKey.isWritable()) {
            Object responseMessage = selectedKey.attachment();
            SocketChannel channel = (SocketChannel) selectedKey.channel();
            selectedKey.interestOps(SelectionKey.OP_READ);
            if (responseMessage != null) {
                threadPool.execute(new WriteClientSocketHandler(channel, responseMessage));
            }
        } else if (selectedKey.isAcceptable()) {
            ServerSocketChannel ssc = (ServerSocketChannel) selectedKey.channel();
            SocketChannel clientSocket = ssc.accept();
            if (clientSocket != null) {
                clientSocket.configureBlocking(false);
                // Only OP_READ: there is nothing to write yet, and OP_WRITE is almost always
                // ready, so registering it up front just wakes the selector for nothing.
                clientSocket.register(selector, SelectionKey.OP_READ);
            }
        }
    }

    /**
     * Cancels the client's key (if it still has one) and closes its channel.
     *
     * @param client the client channel to close
     */
    private void closeClient(SocketChannel client) {
        SelectionKey key = client.keyFor(selector);
        if (key != null) {
            key.cancel();
        }
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Task that writes one response to a client.
     *
     * @author haoguo
     */
    private class WriteClientSocketHandler implements Runnable {
        private final SocketChannel client;
        private final Object responseMessage;

        WriteClientSocketHandler(SocketChannel client, Object responseMessage) {
            this.client = client;
            this.responseMessage = responseMessage;
        }

        @Override
        public void run() {
            byte[] responseByteData = null;
            String logResponseString = "";
            if (responseMessage instanceof byte[]) {
                responseByteData = (byte[]) responseMessage;
                logResponseString = new String(responseByteData, StandardCharsets.UTF_8);
            } else if (responseMessage instanceof String) {
                logResponseString = (String) responseMessage;
                responseByteData = logResponseString.getBytes(StandardCharsets.UTF_8);
            }
            if (responseByteData == null || responseByteData.length == 0) {
                System.out.println("响应的数据为空");
                return;
            }
            try {
                ByteBuffer out = ByteBuffer.wrap(responseByteData);
                // A non-blocking write may accept only part of the buffer, so keep going until
                // it is drained. Holding the channel's monitor keeps two responses for the same
                // client from interleaving their partial writes.
                synchronized (client) {
                    while (out.hasRemaining()) {
                        if (client.write(out) == 0) {
                            Thread.yield();
                        }
                    }
                }
                // The key may already be gone if the connection was closed concurrently.
                SelectionKey key = client.keyFor(selector);
                String clientId = key == null ? "unknown" : String.valueOf(key.hashCode());
                System.out.println("server响应客户端[" + clientId + "]数据 :[" + logResponseString + "]");
            } catch (IOException e) {
                e.printStackTrace();
                closeClient(client);
            }
        }
    }

    /**
     * Task that reads everything currently available from a client and schedules the response.
     *
     * @author haoguo
     */
    private class ReadClientSocketHandler implements Runnable {
        private final SocketChannel client;
        private final ByteBuffer tmp = ByteBuffer.allocate(1024);
        private final SelectionKey selectionKey;

        ReadClientSocketHandler(SelectionKey selectionKey) {
            this.selectionKey = selectionKey;
            this.client = (SocketChannel) selectionKey.channel();
        }

        @Override
        public void run() {
            try {
                byte[] data = new byte[0];
                int len;
                tmp.clear();
                while ((len = client.read(tmp)) > 0) {
                    data = Arrays.copyOf(data, data.length + len);
                    System.arraycopy(tmp.array(), 0, data, data.length - len, len);
                    tmp.clear();
                }
                if (data.length == 0) {
                    if (len < 0) {
                        // read() returned -1: the peer closed the connection. Without closing
                        // our side the key would stay readable forever.
                        System.out.println("客户端[" + selectionKey.hashCode() + "]关闭了连接");
                        closeClient(client);
                    }
                    return;
                }
                System.out.println("接收到客户端[" + selectionKey.hashCode() + "]数据 :["
                        + new String(data, StandardCharsets.UTF_8) + "]");
                // dosomthing
                byte[] response = "response".getBytes(StandardCharsets.UTF_8);
                // Attach first, then arm OP_WRITE, so the selector thread can never observe
                // OP_WRITE with a stale attachment.
                selectionKey.attach(response);
                selectionKey.interestOps(SelectionKey.OP_WRITE);
                selector.wakeup();
            } catch (IOException | CancelledKeyException e) {
                System.out.println("客户端[" + selectionKey.hashCode() + "]关闭了连接");
                closeClient(client);
            } finally {
                keysBeingRead.remove(selectionKey);
            }
        }
    }

    public static void main(String[] args) {
        Server server = new Server();
        server.start();
    }
}
package niocommunicate;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Non-blocking TCP client that connects to port 5555 on the local host.
 *
 * <p>A dedicated thread runs the selector loop: it completes the connection, sends the
 * messages queued by {@link #writeData(String)} and prints whatever the server sends back.
 */
public class Client {

    SocketChannel client;
    Selector selctor = getSelector();

    /** Cleared by {@link #close()} so the selector thread terminates. */
    private volatile boolean run = true;

    /**
     * Messages waiting to be sent. Filled by the caller of {@link #writeData(String)} and
     * drained by the selector thread, hence a concurrent queue.
     */
    private final Queue<Object> messageQueue = new ConcurrentLinkedQueue<Object>();

    /**
     * Opens a new selector.
     *
     * @return the new selector, or {@code null} if it could not be opened
     */
    public Selector getSelector() {
        try {
            return Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Starts connecting to the server, starts the selector thread, and then waits 200 ms to
     * give the connection time to complete.
     */
    public Client() {
        try {
            client = SocketChannel.open();
            client.configureBlocking(false);
            // A local connection may complete immediately, in which case OP_CONNECT is not
            // guaranteed to fire; go straight to OP_READ then.
            boolean connected = client.connect(new InetSocketAddress(InetAddress.getLocalHost(), 5555));
            client.register(selctor, connected ? SelectionKey.OP_READ : SelectionKey.OP_CONNECT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        new Thread(new Runnable() {
            @Override
            public void run() {
                selectLoop();
            }
        }).start();
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Selector loop, run on the client's own thread until {@link #close()} is called. The
     * selector is closed here, by the thread that uses it, once the loop ends.
     */
    private void selectLoop() {
        try {
            while (run) {
                try {
                    if (selctor.select(20) == 0) {
                        continue;
                    }
                    Iterator<SelectionKey> iterator = selctor.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        iterator.remove();
                        handleKey(selectionKey);
                    }
                } catch (IOException e1) {
                    e1.printStackTrace();
                    close();
                } catch (CancelledKeyException ignored) {
                    // The key was cancelled (e.g. by close()) between select() and handling;
                    // the loop condition decides whether to keep going.
                }
            }
        } finally {
            if (selctor != null) {
                try {
                    selctor.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Handles one ready key: finishes the connection, sends the queued messages, or reads and
     * prints data from the server.
     *
     * @param selectionKey the key reported ready by the selector
     * @throws IOException if finishing the connection or reading fails
     */
    private void handleKey(SelectionKey selectionKey) throws IOException {
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        if (selectionKey.isConnectable()) {
            channel.finishConnect();
            selectionKey.interestOps(messageQueue.isEmpty() ? SelectionKey.OP_READ : SelectionKey.OP_WRITE);
        } else if (selectionKey.isWritable()) {
            // Drop back to OP_READ before draining: a writeData() call that enqueues after the
            // drain re-arms OP_WRITE afterwards, so no message is lost or sent twice.
            selectionKey.interestOps(SelectionKey.OP_READ);
            Object requestMessage;
            while ((requestMessage = messageQueue.poll()) != null) {
                send(channel, requestMessage);
            }
        } else if (selectionKey.isReadable()) {
            ByteBuffer tmp = ByteBuffer.allocate(1024);
            int len = channel.read(tmp);
            if (len > 0) {
                System.out.println("客户端接收到数据:[" + new String(tmp.array(), 0, len, StandardCharsets.UTF_8) + "]");
            } else if (len < 0) {
                // read() returned -1: the server closed the connection. Without closing our
                // side the key would stay readable forever and the loop would spin.
                close();
            }
        }
    }

    /**
     * Writes one message to the channel. Supports {@code byte[]} and {@code String} payloads;
     * anything else is reported and skipped.
     *
     * @param channel        the connected channel
     * @param requestMessage the message to send
     */
    private void send(SocketChannel channel, Object requestMessage) {
        byte[] requestByteData = null;
        if (requestMessage instanceof byte[]) {
            requestByteData = (byte[]) requestMessage;
        } else if (requestMessage instanceof String) {
            requestByteData = ((String) requestMessage).getBytes(StandardCharsets.UTF_8);
            System.out.println("client send Message:[" + requestMessage + "]");
        } else {
            System.out.println("unsupport send Message Type" + requestMessage.getClass());
        }
        System.out.println("requestMessage:" + requestMessage);
        if (requestByteData != null && requestByteData.length > 0) {
            try {
                ByteBuffer out = ByteBuffer.wrap(requestByteData);
                // A non-blocking write may accept only part of the buffer.
                while (out.hasRemaining()) {
                    if (channel.write(out) == 0) {
                        Thread.yield();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Stops the selector thread and closes the connection. Safe to call when the channel was
     * never registered or has already been closed.
     */
    public void close() {
        // Set first so the selector thread stops even if closing the channel fails.
        run = false;
        if (client == null) {
            return;
        }
        SelectionKey selectionKey = selctor == null ? null : client.keyFor(selctor);
        if (selectionKey != null) {
            selectionKey.cancel();
        }
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Queues {@code data} for sending, asks the selector thread to write it, and then pauses
     * 40 ms before returning.
     *
     * @param data the message to send
     */
    public void writeData(String data) {
        messageQueue.add(data);
        try {
            SelectionKey key = client.keyFor(selctor);
            if (key == null) {
                client.register(selctor, SelectionKey.OP_WRITE);
            } else {
                key.interestOps(SelectionKey.OP_WRITE);
            }
            selctor.wakeup();
        } catch (ClosedChannelException | CancelledKeyException e) {
            // The connection is gone; the message can never be delivered.
            messageQueue.remove(data);
            e.printStackTrace();
        }
        try {
            Thread.sleep(40);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Client client = new Client();
        long t1 = System.currentTimeMillis();
        for (int i = 10; i < 200; i++) {
            client.writeData(i + "nimddddddddddsssssssssssssssssssssssssssssssssssscccccccccccccccccccccccc"
                    + "ccccccccccccccccccccccccccccccccccccccccccccccccccccccccdddddddddddd"
                    + "dddddddddddddddddwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwaaaaaaaaaaaaaa"
                    + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaddddddddddddddddddddddddddddddd"
                    + "ddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddrrrr"
                    + "jjjjjjjjjjjjjjjjjjjjjjjjjjjjrrrrrrrrrrrrrrrrrrrrrrrrrrrkkkkkkkkkkkkkkkkkkkk"
                    + "kkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkjjjjkkkkkklllllllllllllllllllllllllll"
                    + "lllllllldddddddddddddmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmddaei"
                    + "nimddddddddddsssssssssssssssssssssssssssssssssssscccccccccccccccccccccccc"
                    + "ccccccccccccccccccccccccccccccccccccccccccccccccccccccccdddddddddddd"
                    + "dddddddddddddddddwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwaaaaaaaaaaaaaa"
                    + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaddddddddddddddddddddddddddddddd"
                    + "ddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddrrrr"
                    + "jjjjjjjjjjjjjjjjjjjjjjjjjjjjrrrrrrrrrrrrrrrrrrrrrrrrrrrkkkkkkkkkkkkkkkkkkkk"
                    + "kkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkjjjjkkkkkklllllllllllllllllllllllllll"
                    + "lllllllldddddddddddddmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmddaei" + i);
        }
        long t2 = System.currentTimeMillis();
        System.out.println("总共耗时:" + (t2 - t1) + "ms");
        client.close();
    }
}
package niocommunicate;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Non-blocking TCP server listening on port 5555.
 *
 * <p>One pool thread runs the selector loop; reads and writes are handed to the remaining
 * pool threads. Every non-empty request is answered with {@code "response"} followed by the
 * first (up to) three characters of the request. Responses are queued per connection and
 * flushed when the connection becomes writable.
 */
public class Server {

    private Selector selector = getSelector();
    private ServerSocketChannel ss = null;

    /**
     * Worker pool. {@code CallerRunsPolicy} makes a saturated pool apply back-pressure on the
     * selector thread instead of throwing {@code RejectedExecutionException}, which the selector
     * loop would otherwise treat as fatal and shut the whole server down.
     */
    private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 500, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(20), new ThreadPoolExecutor.CallerRunsPolicy());

    /**
     * Keys that currently have a read task in flight. Keyed by the key object itself (identity),
     * not by its hash code, so two keys can never be mistaken for one another.
     */
    private final Set<SelectionKey> keysBeingRead =
            Collections.newSetFromMap(new ConcurrentHashMap<SelectionKey, Boolean>());

    /**
     * Pending responses per connection. Queues are filled by reader tasks and drained by the
     * selector thread, hence concurrent collections throughout.
     */
    private final ConcurrentMap<SelectionKey, Queue<Object>> responseMessageQueue =
            new ConcurrentHashMap<SelectionKey, Queue<Object>>();

    private volatile boolean run = true;
    private volatile boolean isClose = false;

    /**
     * Opens a new selector.
     *
     * @return the new selector, or {@code null} if it could not be opened
     */
    public Selector getSelector() {
        try {
            return Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Creates the non-blocking server socket, binds it to port 5555 and registers it for
     * {@code OP_ACCEPT}. On any failure the server is closed.
     */
    public Server() {
        try {
            ss = ServerSocketChannel.open();
            ss.bind(new InetSocketAddress(5555));
            ss.configureBlocking(false);
            if (selector == null) {
                selector = Selector.open();
            }
            ss.register(selector, SelectionKey.OP_ACCEPT);
        } catch (Exception e) {
            e.printStackTrace();
            close();
        }
    }

    /**
     * Reports whether the server has been closed.
     *
     * @return {@code true} once {@code close()} has run
     */
    public boolean isClose() {
        return isClose;
    }

    /**
     * Shuts the server down: stops the selector loop, shuts down the pool and closes the
     * server socket and the selector. Each resource is closed independently so that a failure
     * closing one does not leak the other.
     */
    private void close() {
        run = false;
        isClose = true;
        threadPool.shutdown();
        try {
            if (ss != null) {
                ss.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            if (selector != null) {
                selector.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Starts the selector loop on a pool thread. The loop runs until {@link #close()} is called
     * or an unexpected exception escapes, in which case the server closes itself.
     */
    private void start() {
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    while (run) {
                        if (selector.select(10) == 0) {
                            continue;
                        }
                        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                        while (iterator.hasNext()) {
                            SelectionKey selectedKey = iterator.next();
                            iterator.remove();
                            try {
                                dispatch(selectedKey);
                            } catch (CancelledKeyException cc) {
                                selectedKey.cancel();
                                keysBeingRead.remove(selectedKey);
                                responseMessageQueue.remove(selectedKey);
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    close();
                }
            }
        });
    }

    /**
     * Handles one ready key: schedules a read, schedules the queued writes, or accepts a new
     * connection.
     *
     * @param selectedKey the key reported ready by the selector
     * @throws IOException if accepting or configuring a new connection fails
     */
    private void dispatch(SelectionKey selectedKey) throws IOException {
        if (selectedKey.isReadable()) {
            // add() is atomic, so at most one read task per key is in flight.
            if (keysBeingRead.add(selectedKey)) {
                threadPool.execute(new ReadClientSocketHandler(selectedKey));
            }
        } else if (selectedKey.isWritable()) {
            SocketChannel channel = (SocketChannel) selectedKey.channel();
            // Drop back to OP_READ before draining: a reader that enqueues after the drain
            // re-arms OP_WRITE afterwards, so no response is lost.
            selectedKey.interestOps(SelectionKey.OP_READ);
            Queue<Object> pending = responseMessageQueue.get(selectedKey);
            if (pending != null) {
                Object responseMessage;
                while ((responseMessage = pending.poll()) != null) {
                    threadPool.execute(new WriteClientSocketHandler(channel, responseMessage));
                }
            }
        } else if (selectedKey.isAcceptable()) {
            ServerSocketChannel ssc = (ServerSocketChannel) selectedKey.channel();
            SocketChannel clientSocket = ssc.accept();
            if (clientSocket != null) {
                clientSocket.configureBlocking(false);
                // Only OP_READ: there is nothing to write yet, and OP_WRITE is almost always
                // ready, so registering it up front just wakes the selector for nothing.
                clientSocket.register(selector, SelectionKey.OP_READ);
            }
        }
    }

    /**
     * Returns the response queue for a connection, creating it atomically on first use so that
     * a reader can never observe a missing queue.
     *
     * @param key the connection's selection key
     * @return the queue of pending responses for that connection
     */
    private Queue<Object> pendingResponses(SelectionKey key) {
        Queue<Object> queue = responseMessageQueue.get(key);
        if (queue == null) {
            Queue<Object> fresh = new ConcurrentLinkedQueue<Object>();
            queue = responseMessageQueue.putIfAbsent(key, fresh);
            if (queue == null) {
                queue = fresh;
            }
        }
        return queue;
    }

    /**
     * Cancels the client's key (if it still has one), discards its pending responses and
     * closes its channel.
     *
     * @param client the client channel to close
     */
    private void closeClient(SocketChannel client) {
        SelectionKey key = client.keyFor(selector);
        if (key != null) {
            key.cancel();
            responseMessageQueue.remove(key);
        }
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Task that writes one response to a client.
     *
     * @author haoguo
     */
    private class WriteClientSocketHandler implements Runnable {
        private final SocketChannel client;
        private final Object responseMessage;

        WriteClientSocketHandler(SocketChannel client, Object responseMessage) {
            this.client = client;
            this.responseMessage = responseMessage;
        }

        @Override
        public void run() {
            byte[] responseByteData = null;
            String logResponseString = "";
            if (responseMessage instanceof byte[]) {
                responseByteData = (byte[]) responseMessage;
                logResponseString = new String(responseByteData, StandardCharsets.UTF_8);
            } else if (responseMessage instanceof String) {
                logResponseString = (String) responseMessage;
                responseByteData = logResponseString.getBytes(StandardCharsets.UTF_8);
            }
            if (responseByteData == null || responseByteData.length == 0) {
                System.out.println("响应的数据为空");
                return;
            }
            try {
                ByteBuffer out = ByteBuffer.wrap(responseByteData);
                // A non-blocking write may accept only part of the buffer, so keep going until
                // it is drained. Holding the channel's monitor keeps several queued responses
                // for the same client from interleaving their partial writes.
                synchronized (client) {
                    while (out.hasRemaining()) {
                        if (client.write(out) == 0) {
                            Thread.yield();
                        }
                    }
                }
                // The key may already be gone if the connection was closed concurrently.
                SelectionKey key = client.keyFor(selector);
                String clientId = key == null ? "unknown" : String.valueOf(key.hashCode());
                System.out.println("server响应客户端[" + clientId + "]数据 :[" + logResponseString + "]");
            } catch (IOException e) {
                e.printStackTrace();
                closeClient(client);
            }
        }
    }

    /**
     * Task that reads everything currently available from a client and queues the response.
     *
     * @author haoguo
     */
    private class ReadClientSocketHandler implements Runnable {
        private final SocketChannel client;
        private final ByteBuffer tmp = ByteBuffer.allocate(1024);
        private final SelectionKey selectionKey;
        private final int hashCode;

        ReadClientSocketHandler(SelectionKey selectionKey) {
            this.selectionKey = selectionKey;
            this.client = (SocketChannel) selectionKey.channel();
            this.hashCode = selectionKey.hashCode();
        }

        @Override
        public void run() {
            try {
                byte[] data = new byte[0];
                int len;
                tmp.clear();
                while ((len = client.read(tmp)) > 0) {
                    data = Arrays.copyOf(data, data.length + len);
                    System.arraycopy(tmp.array(), 0, data, data.length - len, len);
                    tmp.clear();
                }
                if (data.length == 0) {
                    if (len < 0) {
                        // read() returned -1: the peer closed the connection. Without closing
                        // our side the key would stay readable forever.
                        System.out.println("客户端[" + hashCode + "]关闭了连接");
                        closeClient(client);
                    }
                    return;
                }
                String readData = new String(data, StandardCharsets.UTF_8);
                // Requests shorter than three characters are echoed in full instead of
                // failing with StringIndexOutOfBoundsException.
                String prefix = readData.substring(0, Math.min(3, readData.length()));
                System.out.println("接收到客户端[" + hashCode + "]数据 :[" + prefix + "]");
                // dosomthing
                byte[] response = ("response" + prefix).getBytes(StandardCharsets.UTF_8);
                // Enqueue first, then arm OP_WRITE, so the selector thread always finds the
                // response when it sees the key writable.
                pendingResponses(selectionKey).add(response);
                selectionKey.interestOps(SelectionKey.OP_WRITE);
                selector.wakeup();
            } catch (IOException | CancelledKeyException e) {
                System.out.println("客户端[" + hashCode + "]关闭了连接");
                closeClient(client);
                // Also covers a key cancelled elsewhere, where keyFor() no longer finds it.
                responseMessageQueue.remove(selectionKey);
            } finally {
                keysBeingRead.remove(selectionKey);
            }
        }
    }

    public static void main(String[] args) {
        Server server = new Server();
        server.start();
    }
}