通过 Java NIO 实现 Socket 服务器与客户端功能详解

package niocommunicate; 
  
import java.io.IOException; 
import java.net.InetSocketAddress; 
import java.nio.ByteBuffer; 
import java.nio.channels.CancelledKeyException; 
import java.nio.channels.SelectionKey; 
import java.nio.channels.Selector; 
import java.nio.channels.ServerSocketChannel; 
import java.nio.channels.SocketChannel; 
import java.util.Arrays; 
import java.util.Iterator; 
import java.util.Map; 
import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 
  
/**
 * Non-blocking TCP server built on a single {@link Selector}.
 *
 * <p>One pool thread runs the selector loop. Readable connections are handed to
 * {@link ReadClientSocketHandler} tasks, which answer every request with the fixed payload
 * {@code "response"} by attaching it to the key and asking for {@code OP_WRITE}; the selector
 * loop then hands the attachment to a {@link WriteClientSocketHandler}.
 */
public class Server {

    /** TCP port the server listens on. */
    private static final int PORT = 5555;

    /** Charset used for every text/byte conversion so the wire format does not depend on the platform. */
    private static final java.nio.charset.Charset CHARSET = java.nio.charset.StandardCharsets.UTF_8;

    private Selector selector = getSelector();
    private ServerSocketChannel ss = null;

    /**
     * Worker pool shared by the selector loop and the read/write handlers. When the bounded
     * queue is full the submitting thread runs the task itself instead of throwing
     * {@code RejectedExecutionException}, which would otherwise abort the selector loop.
     */
    private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 500, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(20), new ThreadPoolExecutor.CallerRunsPolicy());

    /**
     * Keys that currently have a read task in flight. Keyed by the key object itself (not its
     * hash code, which may collide) so that {@code putIfAbsent} is an exact, atomic guard.
     */
    private static final ConcurrentHashMap<SelectionKey, Boolean> selectionKeyMap = new ConcurrentHashMap<>();

    /**
     * Opens a new selector.
     *
     * @return the selector, or {@code null} when it could not be opened
     */
    public Selector getSelector() {
        try {
            return Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Creates a non-blocking server channel bound to port 5555 and registers it for accepts.
     * On failure the error is printed and all resources are released.
     */
    public Server() {
        try {
            ss = ServerSocketChannel.open();
            ss.bind(new InetSocketAddress(PORT));
            ss.configureBlocking(false);
            if (selector == null) {
                selector = Selector.open();
            }
            ss.register(selector, SelectionKey.OP_ACCEPT);
        } catch (Exception e) {
            e.printStackTrace();
            close();
        }
    }

    /**
     * Shuts the server down. Each resource is closed independently so that a failure on the
     * server channel does not leave the selector open.
     */
    private void close() {
        threadPool.shutdown();
        try {
            if (ss != null) {
                ss.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            if (selector != null) {
                selector.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Starts the selector loop on a pool thread. The loop ends when the selector is closed or
     * an unexpected error occurs, in which case the server is shut down.
     */
    private void start() {
        threadPool.execute(new Runnable() {

            @Override
            public void run() {
                try {
                    while (selector.isOpen()) {
                        if (selector.select(10) == 0) {
                            continue;
                        }
                        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                        while (iterator.hasNext()) {
                            SelectionKey selectedKey = iterator.next();
                            iterator.remove();
                            try {
                                dispatch(selectedKey);
                            } catch (CancelledKeyException cc) {
                                // The connection went away between select() and dispatch.
                                selectedKey.cancel();
                                selectionKeyMap.remove(selectedKey);
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    close();
                }
            }

        });
    }

    /**
     * Handles one selected key: schedules a read, schedules a write of the attached response,
     * or accepts a new connection.
     *
     * @param selectedKey the key reported ready by the selector
     * @throws IOException if accepting or configuring a new connection fails
     */
    private void dispatch(SelectionKey selectedKey) throws IOException {
        if (selectedKey.isReadable()) {
            // Level-triggered readiness keeps reporting the key while a reader is still busy;
            // only the first report may start a task.
            if (selectionKeyMap.putIfAbsent(selectedKey, Boolean.TRUE) == null) {
                threadPool.execute(new ReadClientSocketHandler(selectedKey));
            }
        } else if (selectedKey.isWritable()) {
            Object responseMessage = selectedKey.attachment();
            SocketChannel clientChannel = (SocketChannel) selectedKey.channel();
            selectedKey.interestOps(SelectionKey.OP_READ);
            if (responseMessage != null) {
                threadPool.execute(new WriteClientSocketHandler(clientChannel, responseMessage));
            }
        } else if (selectedKey.isAcceptable()) {
            ServerSocketChannel ssc = (ServerSocketChannel) selectedKey.channel();
            SocketChannel clientSocket = ssc.accept();
            if (clientSocket != null) {
                clientSocket.configureBlocking(false);
                // Only OP_READ: a fresh socket is always writable, and a spurious write event
                // with no attachment could reset the interest set right after a reader has
                // requested OP_WRITE, losing that response.
                clientSocket.register(selector, SelectionKey.OP_READ);
            }
        }
    }

    /**
     * Converts a message object to the bytes that go on the wire.
     *
     * @param message a {@code byte[]} (returned as is) or a {@code String} (UTF-8 encoded)
     * @return the bytes, or {@code null} when the message is {@code null} or of another type
     */
    static byte[] toBytes(Object message) {
        if (message instanceof byte[]) {
            return (byte[]) message;
        }
        if (message instanceof String) {
            return ((String) message).getBytes(CHARSET);
        }
        return null;
    }

    /** Returns a printable id for a client channel; safe when the channel has no key anymore. */
    private String clientId(SocketChannel channel) {
        SelectionKey key = channel.keyFor(selector);
        return key == null ? "unregistered" : String.valueOf(key.hashCode());
    }

    /** Cancels the channel's registration (if any) and closes the channel. */
    private void closeClient(SocketChannel channel) {
        SelectionKey key = channel.keyFor(selector);
        if (key != null) {
            key.cancel();
        }
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Task that writes one response to a client.
     *
     * @author haoguo
     */
    private class WriteClientSocketHandler implements Runnable {
        SocketChannel client;
        Object respnoseMessage;

        WriteClientSocketHandler(SocketChannel client, Object respnoseMessage) {
            this.client = client;
            this.respnoseMessage = respnoseMessage;
        }

        @Override
        public void run() {
            byte[] responseByteData = toBytes(respnoseMessage);
            if (responseByteData == null || responseByteData.length == 0) {
                System.out.println("响应的数据为空");
                return;
            }
            try {
                ByteBuffer buffer = ByteBuffer.wrap(responseByteData);
                // A non-blocking write may accept only part of the buffer, so keep going until
                // everything is sent. The lock keeps two writers for the same channel from
                // interleaving their chunks. NOTE: this spins while the peer's receive window
                // is full; acceptable for the small payloads used here.
                synchronized (client) {
                    while (buffer.hasRemaining()) {
                        if (client.write(buffer) == 0) {
                            Thread.yield();
                        }
                    }
                }
                System.out.println("server响应客户端[" + clientId(client) + "]数据 :["
                        + new String(responseByteData, CHARSET) + "]");
            } catch (IOException e) {
                e.printStackTrace();
                closeClient(client);
            }
        }
    }

    /**
     * Task that drains the bytes currently available from a client and schedules the response.
     *
     * @author haoguo
     */
    private class ReadClientSocketHandler implements Runnable {
        private SocketChannel client;
        private ByteBuffer tmp = ByteBuffer.allocate(1024);
        private SelectionKey selectionKey;

        ReadClientSocketHandler(SelectionKey selectionKey) {
            this.selectionKey = selectionKey;
            this.client = (SocketChannel) selectionKey.channel();
        }

        @Override
        public void run() {
            try {
                byte[] data = new byte[0];
                int len;
                tmp.clear();
                while ((len = client.read(tmp)) > 0) {
                    data = Arrays.copyOf(data, data.length + len);
                    System.arraycopy(tmp.array(), 0, data, data.length - len, len);
                    tmp.clear();
                }
                if (data.length == 0) {
                    if (len < 0) {
                        // read() == -1 means the peer closed the connection. Without closing
                        // our side the key would stay readable forever.
                        System.out.println("客户端[" + selectionKey.hashCode() + "]关闭了连接");
                        closeClient(client);
                    }
                    return;
                }
                System.out.println("接收到客户端[" + selectionKey.hashCode() + "]数据 :["
                        + new String(data, CHARSET) + "]");
                // dosomthing
                byte[] response = "response".getBytes(CHARSET);
                // Re-registering updates the existing key: interest becomes OP_WRITE and the
                // response travels as the attachment.
                client.register(selector, SelectionKey.OP_WRITE, response);
            } catch (IOException | CancelledKeyException e) {
                System.out.println("客户端[" + selectionKey.hashCode() + "]关闭了连接");
                closeClient(client);
            } finally {
                selectionKeyMap.remove(selectionKey);
            }
        }
    }

    public static void main(String[] args) {
        Server server = new Server();
        server.start();
    }
}

package niocommunicate; 
  
import java.io.IOException; 
import java.net.InetAddress; 
import java.net.InetSocketAddress; 
import java.nio.ByteBuffer; 
import java.nio.channels.ClosedChannelException; 
import java.nio.channels.SelectionKey; 
import java.nio.channels.Selector; 
import java.nio.channels.SocketChannel; 
import java.util.Arrays; 
import java.util.Iterator; 
import java.util.LinkedList; 
import java.util.List; 
  
/**
 * Non-blocking TCP client for the NIO server on port 5555.
 *
 * <p>A background thread runs the selector loop: it completes the connection, sends whatever
 * message is attached to the key when the channel is writable, and prints whatever the
 * server sends back. {@link #writeData(String)} hands one message at a time to that loop.
 */
public class Client {

    /** TCP port of the server on the local host. */
    private static final int PORT = 5555;

    /** Charset used for every text/byte conversion so the wire format does not depend on the platform. */
    private static final java.nio.charset.Charset CHARSET = java.nio.charset.StandardCharsets.UTF_8;

    SocketChannel client;
    Selector selctor = getSelector();

    /** Cleared by {@link #close()} to stop the selector loop. */
    private volatile boolean run = true;

    /**
     * Opens a new selector.
     *
     * @return the selector, or {@code null} when it could not be opened
     */
    public Selector getSelector() {
        try {
            return Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Starts a non-blocking connect to the local server, launches the selector thread and
     * waits briefly so the connection is normally established before the first send.
     */
    public Client() {
        try {
            client = SocketChannel.open();
            client.configureBlocking(false);
            client.connect(new InetSocketAddress(InetAddress.getLocalHost(), PORT));
            client.register(selctor, SelectionKey.OP_CONNECT);
        } catch (IOException e) {
            e.printStackTrace();
            // Nothing to select on; make sure the loop below terminates instead of spinning.
            close();
        }
        new Thread(new Runnable() {

            @Override
            public void run() {
                runEventLoop();
            }
        }).start();
        pause(200);
    }

    /** Selector loop; owns the selector and closes it when the loop ends. */
    private void runEventLoop() {
        try {
            while (run) {
                try {
                    if (selctor.select(20) == 0) {
                        continue;
                    }
                    Iterator<SelectionKey> iterator = selctor.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        iterator.remove();
                        handleKey(selectionKey);
                    }
                } catch (IOException e1) {
                    e1.printStackTrace();
                    close();
                } catch (java.nio.channels.CancelledKeyException cancelled) {
                    // close() ran on another thread while this key was being processed.
                    close();
                }
            }
        } finally {
            try {
                selctor.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Routes one ready key to connect completion, sending or receiving. */
    private void handleKey(SelectionKey selectionKey) throws IOException {
        if (selectionKey.isConnectable()) {
            SocketChannel sc = (SocketChannel) selectionKey.channel();
            // Only switch to OP_READ once the connection is really established.
            if (sc.finishConnect()) {
                sc.register(selctor, SelectionKey.OP_READ);
            }
        } else if (selectionKey.isWritable()) {
            sendAttachment(selectionKey);
        } else if (selectionKey.isReadable()) {
            receive(selectionKey);
        }
    }

    /** Sends the message attached to the key and switches the key back to reading. */
    private void sendAttachment(SelectionKey selectionKey) {
        selectionKey.interestOps(SelectionKey.OP_READ);
        Object requestMessage = selectionKey.attachment();
        SocketChannel writeSocketChannel = (SocketChannel) selectionKey.channel();
        byte[] requestByteData = toBytes(requestMessage);
        if (requestMessage instanceof String) {
            System.out.println("client send Message:[" + requestMessage + "]");
        } else if (!(requestMessage instanceof byte[])) {
            // The attachment may be null, so do not call getClass() unconditionally.
            System.out.println("unsupport send Message Type"
                    + (requestMessage == null ? null : requestMessage.getClass()));
        }
        System.out.println("requestMessage:" + requestMessage);
        if (requestByteData != null && requestByteData.length > 0) {
            try {
                ByteBuffer buffer = ByteBuffer.wrap(requestByteData);
                // A non-blocking write may accept only part of the buffer; keep writing until
                // the whole message is out.
                while (buffer.hasRemaining()) {
                    if (writeSocketChannel.write(buffer) == 0) {
                        Thread.yield();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Reads and prints everything currently available; closes the client on end of stream. */
    private void receive(SelectionKey selectionKey) throws IOException {
        SocketChannel readSocketChannel = (SocketChannel) selectionKey.channel();
        ByteBuffer tmp = ByteBuffer.allocate(1024);
        byte[] data = new byte[0];
        int len;
        while ((len = readSocketChannel.read(tmp)) > 0) {
            data = Arrays.copyOf(data, data.length + len);
            System.arraycopy(tmp.array(), 0, data, data.length - len, len);
            tmp.clear();
        }
        if (data.length > 0) {
            System.out.println("客户端接收到数据:[" + new String(data, CHARSET) + "]");
        }
        if (len < 0) {
            // The server closed the connection; without this the key stays readable forever
            // and the loop spins.
            close();
        }
    }

    /**
     * Converts a message object to the bytes that go on the wire.
     *
     * @param message a {@code byte[]} (returned as is) or a {@code String} (UTF-8 encoded)
     * @return the bytes, or {@code null} when the message is {@code null} or of another type
     */
    static byte[] toBytes(Object message) {
        if (message instanceof byte[]) {
            return (byte[]) message;
        }
        if (message instanceof String) {
            return ((String) message).getBytes(CHARSET);
        }
        return null;
    }

    /** Sleeps for the given time, restoring the interrupt flag if interrupted. */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Stops the selector loop and closes the connection. Safe to call more than once and
     * from any thread; the selector itself is closed by the loop thread when it exits.
     */
    public void close() {
        // Stop the loop first so that a failure below cannot leave it running.
        run = false;
        try {
            if (client != null) {
                SelectionKey selectionKey = client.keyFor(selctor);
                if (selectionKey != null) {
                    selectionKey.cancel();
                }
                client.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        if (selctor != null) {
            selctor.wakeup();
        }
    }

    /**
     * Hands one message to the selector loop and waits 40 ms. The message travels as the key
     * attachment, so a second call overwrites an unsent first one; the pause gives the loop
     * time to send it.
     *
     * @param data the text to send
     */
    public void writeData(String data) {
        try {
            client.register(selctor, SelectionKey.OP_WRITE, data);
            // Let a blocked select() notice the new interest set right away.
            selctor.wakeup();
        } catch (ClosedChannelException e) {
            e.printStackTrace();
        }
        pause(40);
    }

    public static void main(String[] args) {

        Client client = new Client();
        long t1 = System.currentTimeMillis();
        for (int i = 10; i < 200; i++) {
            client.writeData(i + "nimddddddddddsssssssssssssssssssssssssssssssssssscccccccccccccccccccccccc"
                    + "ccccccccccccccccccccccccccccccccccccccccccccccccccccccccdddddddddddd"
                    + "dddddddddddddddddwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwaaaaaaaaaaaaaa"
                    + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaddddddddddddddddddddddddddddddd"
                    + "ddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddrrrr"
                    + "jjjjjjjjjjjjjjjjjjjjjjjjjjjjrrrrrrrrrrrrrrrrrrrrrrrrrrrkkkkkkkkkkkkkkkkkkkk"
                    + "kkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkjjjjkkkkkklllllllllllllllllllllllllll"
                    + "lllllllldddddddddddddmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmddaei"
                    + "nimddddddddddsssssssssssssssssssssssssssssssssssscccccccccccccccccccccccc"
                    + "ccccccccccccccccccccccccccccccccccccccccccccccccccccccccdddddddddddd"
                    + "dddddddddddddddddwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwaaaaaaaaaaaaaa"
                    + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaddddddddddddddddddddddddddddddd"
                    + "ddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddrrrr"
                    + "jjjjjjjjjjjjjjjjjjjjjjjjjjjjrrrrrrrrrrrrrrrrrrrrrrrrrrrkkkkkkkkkkkkkkkkkkkk"
                    + "kkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkjjjjkkkkkklllllllllllllllllllllllllll"
                    + "lllllllldddddddddddddmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmddaei" + i);
        }
        long t2 = System.currentTimeMillis();
        System.out.println("总共耗时:" + (t2 - t1) + "ms");
        client.close();
    }
}

package niocommunicate; 
  
import java.io.IOException; 
import java.net.InetSocketAddress; 
import java.nio.ByteBuffer; 
import java.nio.channels.CancelledKeyException; 
import java.nio.channels.SelectionKey; 
import java.nio.channels.Selector; 
import java.nio.channels.ServerSocketChannel; 
import java.nio.channels.SocketChannel; 
import java.util.Arrays; 
import java.util.Iterator; 
import java.util.LinkedList; 
import java.util.List; 
import java.util.Map; 
import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 
  
/**
 * Non-blocking TCP server built on a single {@link Selector}, with a per-connection queue of
 * pending responses.
 *
 * <p>One pool thread runs the selector loop. Readable connections are handed to
 * {@link ReadClientSocketHandler} tasks, which queue a response ({@code "response"} followed
 * by the first characters of the request) and ask for {@code OP_WRITE}; the selector loop
 * then drains the queue into {@link WriteClientSocketHandler} tasks.
 */
public class Server {

    /** TCP port the server listens on. */
    private static final int PORT = 5555;

    /** Number of leading request characters echoed in the log and in the response. */
    private static final int PREFIX_LENGTH = 3;

    /** Charset used for every text/byte conversion so the wire format does not depend on the platform. */
    private static final java.nio.charset.Charset CHARSET = java.nio.charset.StandardCharsets.UTF_8;

    private Selector selector = getSelector();
    private ServerSocketChannel ss = null;

    /**
     * Worker pool shared by the selector loop and the read/write handlers. When the bounded
     * queue is full the submitting thread runs the task itself instead of throwing
     * {@code RejectedExecutionException}, which would otherwise abort the selector loop.
     */
    private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 500, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(20), new ThreadPoolExecutor.CallerRunsPolicy());

    /**
     * Keys that currently have a read task in flight. Keyed by the key object itself (not its
     * hash code, which may collide) so that {@code putIfAbsent} is an exact, atomic guard.
     */
    private final ConcurrentHashMap<SelectionKey, Boolean> selectionKeyMap = new ConcurrentHashMap<>();

    /**
     * Responses waiting to be written, per connection. The queues are thread-safe because
     * reader tasks add to them while the selector thread drains them.
     */
    private final ConcurrentHashMap<SelectionKey, java.util.Queue<Object>> responseMessageQueue =
            new ConcurrentHashMap<>();

    private volatile boolean run = true;
    private volatile boolean isClose = false;

    /**
     * Opens a new selector.
     *
     * @return the selector, or {@code null} when it could not be opened
     */
    public Selector getSelector() {
        try {
            return Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Creates a non-blocking server channel bound to port 5555 and registers it for accepts.
     * On failure the error is printed and all resources are released.
     */
    public Server() {
        try {
            ss = ServerSocketChannel.open();
            ss.bind(new InetSocketAddress(PORT));
            ss.configureBlocking(false);
            if (selector == null) {
                selector = Selector.open();
            }
            ss.register(selector, SelectionKey.OP_ACCEPT);
        } catch (Exception e) {
            e.printStackTrace();
            close();
        }
    }

    /**
     * @return {@code true} once the server has been shut down
     */
    public boolean isClose() {
        return isClose;
    }

    /**
     * Shuts the server down. Each resource is closed independently so that a failure on the
     * server channel does not leave the selector open.
     */
    private void close() {
        run = false;
        isClose = true;
        threadPool.shutdown();
        try {
            if (ss != null) {
                ss.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            if (selector != null) {
                selector.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Starts the selector loop on a pool thread. The loop ends when the server is closed or
     * an unexpected error occurs, in which case the server is shut down.
     */
    private void start() {
        threadPool.execute(new Runnable() {

            @Override
            public void run() {
                try {
                    while (run) {
                        if (selector.select(10) == 0) {
                            continue;
                        }
                        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                        while (iterator.hasNext()) {
                            SelectionKey selectedKey = iterator.next();
                            iterator.remove();
                            try {
                                dispatch(selectedKey);
                            } catch (CancelledKeyException cc) {
                                // The connection went away between select() and dispatch.
                                selectedKey.cancel();
                                selectionKeyMap.remove(selectedKey);
                                responseMessageQueue.remove(selectedKey);
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    close();
                }
            }

        });
    }

    /**
     * Handles one selected key: schedules a read, schedules writes for all queued responses,
     * or accepts a new connection.
     *
     * @param selectedKey the key reported ready by the selector
     * @throws IOException if accepting or configuring a new connection fails
     */
    private void dispatch(SelectionKey selectedKey) throws IOException {
        if (selectedKey.isReadable()) {
            // Level-triggered readiness keeps reporting the key while a reader is still busy;
            // only the first report may start a task.
            if (selectionKeyMap.putIfAbsent(selectedKey, Boolean.TRUE) == null) {
                threadPool.execute(new ReadClientSocketHandler(selectedKey));
            }
        } else if (selectedKey.isWritable()) {
            SocketChannel clientChannel = (SocketChannel) selectedKey.channel();
            // Switch back to reading BEFORE draining: a response queued after this point is
            // followed by a fresh OP_WRITE request from its reader, so nothing is lost.
            selectedKey.interestOps(SelectionKey.OP_READ);
            java.util.Queue<Object> pending = responseMessageQueue.get(selectedKey);
            if (pending != null) {
                Object responseMessage;
                while ((responseMessage = pending.poll()) != null) {
                    threadPool.execute(new WriteClientSocketHandler(clientChannel, responseMessage));
                }
            }
        } else if (selectedKey.isAcceptable()) {
            ServerSocketChannel ssc = (ServerSocketChannel) selectedKey.channel();
            SocketChannel clientSocket = ssc.accept();
            if (clientSocket != null) {
                clientSocket.configureBlocking(false);
                // Queues are created on demand, so no initial write event is needed.
                clientSocket.register(selector, SelectionKey.OP_READ);
            }
        }
    }

    /** Returns the response queue of a connection, creating it atomically on first use. */
    private java.util.Queue<Object> queueFor(SelectionKey key) {
        java.util.Queue<Object> queue = responseMessageQueue.get(key);
        if (queue == null) {
            java.util.Queue<Object> fresh = new java.util.concurrent.ConcurrentLinkedQueue<Object>();
            queue = responseMessageQueue.putIfAbsent(key, fresh);
            if (queue == null) {
                queue = fresh;
            }
        }
        return queue;
    }

    /**
     * Converts a message object to the bytes that go on the wire.
     *
     * @param message a {@code byte[]} (returned as is) or a {@code String} (UTF-8 encoded)
     * @return the bytes, or {@code null} when the message is {@code null} or of another type
     */
    static byte[] toBytes(Object message) {
        if (message instanceof byte[]) {
            return (byte[]) message;
        }
        if (message instanceof String) {
            return ((String) message).getBytes(CHARSET);
        }
        return null;
    }

    /**
     * Returns the leading characters of a request that are echoed back to the client.
     *
     * @param message the decoded request, not {@code null}
     * @return the first three characters, or the whole message when it is shorter
     */
    static String messagePrefix(String message) {
        return message.length() <= PREFIX_LENGTH ? message : message.substring(0, PREFIX_LENGTH);
    }

    /** Returns a printable id for a client channel; safe when the channel has no key anymore. */
    private String clientId(SocketChannel channel) {
        SelectionKey key = channel.keyFor(selector);
        return key == null ? "unregistered" : String.valueOf(key.hashCode());
    }

    /** Cancels the channel's registration (if any), drops its queued responses and closes it. */
    private void closeClient(SocketChannel channel) {
        SelectionKey key = channel.keyFor(selector);
        if (key != null) {
            key.cancel();
            responseMessageQueue.remove(key);
        }
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Task that writes one response to a client.
     *
     * @author haoguo
     */
    private class WriteClientSocketHandler implements Runnable {
        SocketChannel client;
        Object respnoseMessage;

        WriteClientSocketHandler(SocketChannel client, Object respnoseMessage) {
            this.client = client;
            this.respnoseMessage = respnoseMessage;
        }

        @Override
        public void run() {
            byte[] responseByteData = toBytes(respnoseMessage);
            if (responseByteData == null || responseByteData.length == 0) {
                System.out.println("响应的数据为空");
                return;
            }
            try {
                ByteBuffer buffer = ByteBuffer.wrap(responseByteData);
                // A non-blocking write may accept only part of the buffer, so keep going until
                // everything is sent. The lock keeps two writers for the same channel from
                // interleaving their chunks. NOTE: this spins while the peer's receive window
                // is full; acceptable for the small payloads used here.
                synchronized (client) {
                    while (buffer.hasRemaining()) {
                        if (client.write(buffer) == 0) {
                            Thread.yield();
                        }
                    }
                }
                System.out.println("server响应客户端[" + clientId(client) + "]数据 :["
                        + new String(responseByteData, CHARSET) + "]");
            } catch (IOException e) {
                e.printStackTrace();
                closeClient(client);
            }
        }
    }

    /**
     * Task that drains the bytes currently available from a client and queues the response.
     *
     * @author haoguo
     */
    private class ReadClientSocketHandler implements Runnable {
        private SocketChannel client;
        private ByteBuffer tmp = ByteBuffer.allocate(1024);
        private SelectionKey selectionKey;
        int hashCode;

        ReadClientSocketHandler(SelectionKey selectionKey) {
            this.selectionKey = selectionKey;
            this.client = (SocketChannel) selectionKey.channel();
            this.hashCode = selectionKey.hashCode();
        }

        @Override
        public void run() {
            try {
                byte[] data = new byte[0];
                int len;
                tmp.clear();
                while ((len = client.read(tmp)) > 0) {
                    data = Arrays.copyOf(data, data.length + len);
                    System.arraycopy(tmp.array(), 0, data, data.length - len, len);
                    tmp.clear();
                }
                if (data.length == 0) {
                    if (len < 0) {
                        // read() == -1 means the peer closed the connection. Without closing
                        // our side the key would stay readable forever.
                        disconnect();
                    }
                    return;
                }
                // Requests shorter than the prefix length are echoed whole instead of
                // failing in substring().
                String prefix = messagePrefix(new String(data, CHARSET));
                System.out.println("接收到客户端[" + hashCode + "]数据 :[" + prefix + "]");
                // dosomthing
                byte[] response = ("response" + prefix).getBytes(CHARSET);
                queueFor(selectionKey).add(response);
                // Re-registering updates the existing key's interest set to OP_WRITE.
                client.register(selector, SelectionKey.OP_WRITE);
            } catch (IOException | CancelledKeyException e) {
                disconnect();
            } finally {
                selectionKeyMap.remove(selectionKey);
            }
        }

        /** Logs the disconnect and releases everything held for this connection. */
        private void disconnect() {
            System.out.println("客户端[" + hashCode + "]关闭了连接");
            responseMessageQueue.remove(selectionKey);
            closeClient(client);
        }
    }

    public static void main(String[] args) {
        Server server = new Server();
        server.start();
    }
}

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/10535.html

(0)
上一篇 2021年7月19日 10:19
下一篇 2021年7月19日 10:19

相关推荐

发表回复

登录后才能评论