通过JAVA NIO实现Socket服务器与客户端功能

package niocommunicate;
 
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
public class Server {
 
    private Selector selector = getSelector();
    private ServerSocketChannel ss = null;
    private static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 500, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(20));
 
    private static Map<Integer, SelectionKey> selectionKeyMap = new ConcurrentHashMap<>();
 
    public Selector getSelector() {
        try {
            return Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
 
    /**
     * 创建非阻塞服务器绑定5555端口
     */
    public Server() {
        try {
            ss = ServerSocketChannel.open();
            ss.bind(new InetSocketAddress(5555));
            ss.configureBlocking(false);
            if (selector == null) {
                selector = Selector.open();
            }
            ss.register(selector, SelectionKey.OP_ACCEPT);
        } catch (Exception e) {
            e.printStackTrace();
            close();
        }
    }
 
    /**
     * 关闭服务器
     */
    private void close() {
        threadPool.shutdown();
        try {
            if (ss != null) {
                ss.close();
            }
            if (selector != null) {
                selector.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
 
    /**
     * 启动选择器监听客户端事件
     */
    private void start() {
        threadPool.execute(new Runnable() {
 
            @Override
            public void run() {
                try {
                    while (true) {
                        if (selector.select(10) == 0) {
                            continue;
                        }
                        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                        while (iterator.hasNext()) {
                            SelectionKey selectedKey = iterator.next();
                            iterator.remove();
                            try {
                                if (selectedKey.isReadable()) {
 
                                    if (selectionKeyMap.get(selectedKey.hashCode()) != selectedKey) {
                                        selectionKeyMap.put(selectedKey.hashCode(), selectedKey);
                                        threadPool.execute(new ReadClientSocketHandler(selectedKey));
                                    }
 
                                } else if (selectedKey.isWritable()) {
                                    Object responseMessage = selectedKey.attachment();
                                    SocketChannel serverSocketChannel = (SocketChannel) selectedKey.channel();
                                    selectedKey.interestOps(SelectionKey.OP_READ);
                                    if (responseMessage != null) {
                                        threadPool.execute(new WriteClientSocketHandler(serverSocketChannel,
                                                responseMessage));
                                    }
                                } else if (selectedKey.isAcceptable()) {
                                    ServerSocketChannel ssc = (ServerSocketChannel) selectedKey.channel();
                                    SocketChannel clientSocket = ssc.accept();
                                    if (clientSocket != null) {
                                        clientSocket.configureBlocking(false);
                                        clientSocket.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                                    }
                                }
                            } catch (CancelledKeyException cc) {
                                selectedKey.cancel();
                                selectionKeyMap.remove(selectedKey.hashCode());
                            }
                        }
 
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    close();
                }
            }
 
        });
    }
 
    /**
     * 响应数据给客户端线程
     * @author haoguo
     *
     */
    private class WriteClientSocketHandler implements Runnable {
        SocketChannel client;
        Object respnoseMessage;
 
        WriteClientSocketHandler(SocketChannel client, Object respnoseMessage) {
            this.client = client;
            this.respnoseMessage = respnoseMessage;
        }
 
        @Override
        public void run() {
            byte[] responseByteData = null;
            String logResponseString = "";
            if (respnoseMessage instanceof byte[]) {
                responseByteData = (byte[]) respnoseMessage;
                logResponseString = new String(responseByteData);
            } else if (respnoseMessage instanceof String) {
                logResponseString = (String) respnoseMessage;
                responseByteData = logResponseString.getBytes();
            }
            if (responseByteData == null || responseByteData.length == 0) {
                System.out.println("响应的数据为空");
                return;
            }
            try {
                client.write(ByteBuffer.wrap(responseByteData));
                System.out.println("server响应客户端[" + client.keyFor(selector).hashCode() + "]数据 :[" + logResponseString
                        + "]");
            } catch (IOException e) {
                e.printStackTrace();
                try {
                    client.close();
                } catch (IOException e1) {
 
                    e1.printStackTrace();
                }
            }
        }
    }
 
    /**
     * 读客户端发送数据线程
     * @author haoguo
     *
     */
    private class ReadClientSocketHandler implements Runnable {
        private SocketChannel client;
        private ByteBuffer tmp = ByteBuffer.allocate(1024);
        private SelectionKey selectionKey;
 
        ReadClientSocketHandler(SelectionKey selectionKey) {
            this.selectionKey = selectionKey;
            this.client = (SocketChannel) selectionKey.channel();
        }
 
        @Override
        public void run() {
            try {
                tmp.clear();
                byte[] data = new byte[0];
                int len = -1;
                while ((len = client.read(tmp)) > 0) {
                    data = Arrays.copyOf(data, data.length + len);
                    System.arraycopy(tmp.array(), 0, data, data.length - len, len);
                    tmp.rewind();
                }
                if (data.length == 0) {
                    return;
                }
                System.out.println("接收到客户端[" + client.keyFor(selector).hashCode() + "]数据 :[" + new String(data) + "]");
                // dosomthing
                byte[] response = "response".getBytes();
                client.register(selector, SelectionKey.OP_WRITE, response);
            } catch (IOException e) {
 
                System.out.println("客户端[" + selectionKey.hashCode() + "]关闭了连接");
                try {
                    SelectionKey selectionKey = client.keyFor(selector);
                    selectionKey.cancel();
                    client.close();
                } catch (IOException e1) {
 
                    e1.printStackTrace();
                }
            } finally {
                selectionKeyMap.remove(selectionKey.hashCode());
            }
        }
    }
 
    public static void main(String[] args) {
        Server server = new Server();
        server.start();
    }
}

package niocommunicate;
 
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
 
public class Client {
    SocketChannel client;
    Selector selctor = getSelector();
 
    private volatile boolean run = true;
 
    private List<Object> messageQueue = new LinkedList<>();
 
    public Selector getSelector() {
        try {
            return Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
 
    public Client() {
        try {
            client = SocketChannel.open();
            client.configureBlocking(false);
            client.connect(new InetSocketAddress(InetAddress.getLocalHost(), 5555));
            client.register(selctor, SelectionKey.OP_CONNECT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        new Thread(new Runnable() {
 
            @Override
            public void run() {
                while (run) {
                    try {
                        if (selctor.select(20) == 0) {
                            continue;
                        }
                        Iterator<SelectionKey> iterator = selctor.selectedKeys().iterator();
                        while (iterator.hasNext()) {
                            SelectionKey selectionKey = iterator.next();
                            iterator.remove();
                            if (selectionKey.isConnectable()) {
                                SocketChannel sc = (SocketChannel) selectionKey.channel();
                                sc.finishConnect();
                                sc.register(selctor, SelectionKey.OP_READ);
                            } else if (selectionKey.isWritable()) {
                                selectionKey.interestOps(SelectionKey.OP_READ);
                                Object requestMessage = selectionKey.attachment();
                                SocketChannel writeSocketChannel = (SocketChannel) selectionKey.channel();
                                byte[] requestByteData = null;
                                if (requestMessage instanceof byte[]) {
                                    requestByteData = (byte[]) requestMessage;
                                } else if (requestMessage instanceof String) {
                                    requestByteData = ((String) requestMessage).getBytes();
                                    System.out.println("client send Message:[" + requestMessage + "]");
                                } else {
                                    System.out.println("unsupport send Message Type" + requestMessage.getClass());
                                }
                                System.out.println("requestMessage:" + requestMessage);
                                if (requestByteData != null && requestByteData.length > 0) {
                                    try {
                                        writeSocketChannel.write(ByteBuffer.wrap(requestByteData));
                                    } catch (IOException e) {
                                        e.printStackTrace();
                                    }
                                }
                            } else if (selectionKey.isReadable()) {
                                SocketChannel readSocketChannel = (SocketChannel) selectionKey.channel();
                                ByteBuffer tmp = ByteBuffer.allocate(1024);
                                int len = -1;
                                byte[] data = new byte[0];
                                if ((len = readSocketChannel.read(tmp)) > 0) {
                                    data = Arrays.copyOf(data, data.length + len);
                                    System.arraycopy(tmp.array(), 0, data, data.length - len, len);
                                    tmp.rewind();
                                }
                                if (data.length > 0) {
                                    System.out.println("客户端接收到数据:[" + new String(data) + "]");
                                }
                            }
                        }
                    } catch (IOException e1) {
                        e1.printStackTrace();
                        close();
                    }
                }
            }
        }).start();
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
 
            e.printStackTrace();
        }
    }
 
    public void close() {
        try {
            SelectionKey selectionKey = client.keyFor(selctor);
            selectionKey.cancel();
            client.close();
            run = false;
        } catch (IOException e) {
 
            e.printStackTrace();
        }
    }
 
    public void writeData(String data) {
        messageQueue.add(data);
        while (messageQueue.size() > 0) {
            Object firstSendData = messageQueue.remove(0);
            try {
                client.register(selctor, SelectionKey.OP_WRITE, firstSendData);
            } catch (ClosedChannelException e) {
                e.printStackTrace();
            }
            try {
                Thread.sleep(40);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
 
    public static void main(String[] args) {
 
        Client client = new Client();
        long t1 = System.currentTimeMillis();
        for (int i = 10; i < 200; i++) {
            client.writeData(i + "nimddddddddddsssssssssssssssssssssssssssssssssssscccccccccccccccccccccccc"
                    + "ccccccccccccccccccccccccccccccccccccccccccccccccccccccccdddddddddddd"
                    + "dddddddddddddddddwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwaaaaaaaaaaaaaa"
                    + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaddddddddddddddddddddddddddddddd"
                    + "ddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddrrrr"
                    + "jjjjjjjjjjjjjjjjjjjjjjjjjjjjrrrrrrrrrrrrrrrrrrrrrrrrrrrkkkkkkkkkkkkkkkkkkkk"
                    + "kkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkjjjjkkkkkklllllllllllllllllllllllllll"
                    + "lllllllldddddddddddddmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmddaei"
                    + "nimddddddddddsssssssssssssssssssssssssssssssssssscccccccccccccccccccccccc"
                    + "ccccccccccccccccccccccccccccccccccccccccccccccccccccccccdddddddddddd"
                    + "dddddddddddddddddwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwaaaaaaaaaaaaaa"
                    + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaddddddddddddddddddddddddddddddd"
                    + "ddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddrrrr"
                    + "jjjjjjjjjjjjjjjjjjjjjjjjjjjjrrrrrrrrrrrrrrrrrrrrrrrrrrrkkkkkkkkkkkkkkkkkkkk"
                    + "kkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkjjjjkkkkkklllllllllllllllllllllllllll"
                    + "lllllllldddddddddddddmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmddaei" + i);
        }
        long t2 = System.currentTimeMillis();
        System.out.println("总共耗时:" + (t2 - t1) + "ms");
        client.close();
    }
}

package niocommunicate;
 
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
public class Server {
 
    private Selector selector = getSelector();
    private ServerSocketChannel ss = null;
    private ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 500, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(20));
 
    private Map<Integer, SelectionKey> selectionKeyMap = new ConcurrentHashMap<>();
    private Map<Integer, List<Object>> responseMessageQueue = new ConcurrentHashMap<>();
    private volatile boolean run = true;
    private volatile boolean isClose = false;
 
    public Selector getSelector() {
        try {
            return Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
 
    /**
     * 创建非阻塞服务器绑定5555端口
     */
    public Server() {
        try {
            ss = ServerSocketChannel.open();
            ss.bind(new InetSocketAddress(5555));
            ss.configureBlocking(false);
            if (selector == null) {
                selector = Selector.open();
            }
            ss.register(selector, SelectionKey.OP_ACCEPT);
        } catch (Exception e) {
            e.printStackTrace();
            close();
        }
    }
 
    public boolean isClose() {
        return isClose;
    }
 
    /**
     * 关闭服务器
     */
    private void close() {
        run = false;
        isClose = true;
        threadPool.shutdown();
        try {
            if (ss != null) {
                ss.close();
            }
            if (selector != null) {
                selector.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
 
    /**
     * 启动选择器监听客户端事件
     */
    private void start() {
        threadPool.execute(new Runnable() {
 
            @Override
            public void run() {
                try {
                    while (run) {
                        if (selector.select(10) == 0) {
                            continue;
                        }
                        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                        while (iterator.hasNext()) {
                            SelectionKey selectedKey = iterator.next();
                            iterator.remove();
                            try {
                                if (selectedKey.isReadable()) {
 
                                    if (selectionKeyMap.get(selectedKey.hashCode()) != selectedKey) {
                                        selectionKeyMap.put(selectedKey.hashCode(), selectedKey);
                                        threadPool.execute(new ReadClientSocketHandler(selectedKey));
                                    }
 
                                } else if (selectedKey.isWritable()) {
                                    SocketChannel serverSocketChannel = (SocketChannel) selectedKey.channel();
                                    selectedKey.interestOps(SelectionKey.OP_READ);
                                    List<Object> list = responseMessageQueue.get(selectedKey.hashCode());
                                    if (list == null) {
                                        list = new LinkedList<Object>();
                                        responseMessageQueue.put(selectedKey.hashCode(), list);
                                    }
                                    while (list.size() > 0) {
                                        Object responseMessage = list.remove(0);
                                        if (responseMessage != null) {
                                            threadPool.execute(new WriteClientSocketHandler(serverSocketChannel,
                                                    responseMessage));
                                        }
                                    }
                                } else if (selectedKey.isAcceptable()) {
                                    ServerSocketChannel ssc = (ServerSocketChannel) selectedKey.channel();
                                    SocketChannel clientSocket = ssc.accept();
                                    if (clientSocket != null) {
                                        clientSocket.configureBlocking(false);
                                        clientSocket.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                                    }
                                }
                            } catch (CancelledKeyException cc) {
                                selectedKey.cancel();
                                int hashCode = selectedKey.hashCode();
                                selectionKeyMap.remove(hashCode);
                                responseMessageQueue.remove(hashCode);
                            }
                        }
 
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    close();
                }
            }
 
        });
    }
 
    /**
     * 响应数据给客户端线程
     *
     * @author haoguo
     *
     */
    private class WriteClientSocketHandler implements Runnable {
        SocketChannel client;
        Object respnoseMessage;
 
        WriteClientSocketHandler(SocketChannel client, Object respnoseMessage) {
            this.client = client;
            this.respnoseMessage = respnoseMessage;
        }
 
        @Override
        public void run() {
            byte[] responseByteData = null;
            String logResponseString = "";
            if (respnoseMessage instanceof byte[]) {
                responseByteData = (byte[]) respnoseMessage;
                logResponseString = new String(responseByteData);
            } else if (respnoseMessage instanceof String) {
                logResponseString = (String) respnoseMessage;
                responseByteData = logResponseString.getBytes();
            }
            if (responseByteData == null || responseByteData.length == 0) {
                System.out.println("响应的数据为空");
                return;
            }
            try {
                client.write(ByteBuffer.wrap(responseByteData));
                System.out.println("server响应客户端[" + client.keyFor(selector).hashCode() + "]数据 :[" + logResponseString
                        + "]");
            } catch (IOException e) {
                e.printStackTrace();
                try {
                    SelectionKey selectionKey = client.keyFor(selector);
                    if (selectionKey != null) {
                        selectionKey.cancel();
                        int hashCode = selectionKey.hashCode();
                        responseMessageQueue.remove(hashCode);
                    }
                    if (client != null) {
                        client.close();
                    }
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
        }
    }
 
    /**
     * 读客户端发送数据线程
     *
     * @author haoguo
     *
     */
    private class ReadClientSocketHandler implements Runnable {
        private SocketChannel client;
        private ByteBuffer tmp = ByteBuffer.allocate(1024);
        private SelectionKey selectionKey;
        int hashCode;
 
        ReadClientSocketHandler(SelectionKey selectionKey) {
            this.selectionKey = selectionKey;
            this.client = (SocketChannel) selectionKey.channel();
            this.hashCode = selectionKey.hashCode();
        }
 
        @Override
        public void run() {
            try {
                tmp.clear();
                byte[] data = new byte[0];
                int len = -1;
                while ((len = client.read(tmp)) > 0) {
                    data = Arrays.copyOf(data, data.length + len);
                    System.arraycopy(tmp.array(), 0, data, data.length - len, len);
                    tmp.rewind();
                }
                if (data.length == 0) {
                    return;
                }
                String readData = new String(data);
                System.out.println("接收到客户端[" + hashCode + "]数据 :[" + readData.substring(0, 3) + "]");
                // dosomthing
                byte[] response = ("response" + readData.substring(0, 3)).getBytes();
                List<Object> list = responseMessageQueue.get(hashCode);
                list.add(response);
                client.register(selector, SelectionKey.OP_WRITE);
                // client.register(selector, SelectionKey.OP_WRITE, response);
            } catch (IOException e) {
                System.out.println("客户端[" + selectionKey.hashCode() + "]关闭了连接");
                try {
                    SelectionKey selectionKey = client.keyFor(selector);
                    if (selectionKey != null) {
                        selectionKey.cancel();
                    }
                    if (client != null) {
                        client.close();
                    }
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            } finally {
                selectionKeyMap.remove(hashCode);
            }
        }
    }
 
    public static void main(String[] args) {
        Server server = new Server();
        server.start();
    }
}

编程技巧