package niocommunicate;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Server {
private Selector selector = getSelector();
private ServerSocketChannel ss = null;
private static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 500, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(20));
private static Map<Integer, SelectionKey> selectionKeyMap = new ConcurrentHashMap<>();
public Selector getSelector() {
try {
return Selector.open();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
/**
* 创建非阻塞服务器绑定5555端口
*/
public Server() {
try {
ss = ServerSocketChannel.open();
ss.bind(new InetSocketAddress(5555));
ss.configureBlocking(false);
if (selector == null) {
selector = Selector.open();
}
ss.register(selector, SelectionKey.OP_ACCEPT);
} catch (Exception e) {
e.printStackTrace();
close();
}
}
/**
* 关闭服务器
*/
private void close() {
threadPool.shutdown();
try {
if (ss != null) {
ss.close();
}
if (selector != null) {
selector.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 启动选择器监听客户端事件
*/
private void start() {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
while (true) {
if (selector.select(10) == 0) {
continue;
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectedKey = iterator.next();
iterator.remove();
try {
if (selectedKey.isReadable()) {
if (selectionKeyMap.get(selectedKey.hashCode()) != selectedKey) {
selectionKeyMap.put(selectedKey.hashCode(), selectedKey);
threadPool.execute(new ReadClientSocketHandler(selectedKey));
}
} else if (selectedKey.isWritable()) {
Object responseMessage = selectedKey.attachment();
SocketChannel serverSocketChannel = (SocketChannel) selectedKey.channel();
selectedKey.interestOps(SelectionKey.OP_READ);
if (responseMessage != null) {
threadPool.execute(new WriteClientSocketHandler(serverSocketChannel,
responseMessage));
}
} else if (selectedKey.isAcceptable()) {
ServerSocketChannel ssc = (ServerSocketChannel) selectedKey.channel();
SocketChannel clientSocket = ssc.accept();
if (clientSocket != null) {
clientSocket.configureBlocking(false);
clientSocket.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
}
} catch (CancelledKeyException cc) {
selectedKey.cancel();
selectionKeyMap.remove(selectedKey.hashCode());
}
}
}
} catch (Exception e) {
e.printStackTrace();
close();
}
}
});
}
/**
* 响应数据给客户端线程
* @author haoguo
*
*/
private class WriteClientSocketHandler implements Runnable {
SocketChannel client;
Object respnoseMessage;
WriteClientSocketHandler(SocketChannel client, Object respnoseMessage) {
this.client = client;
this.respnoseMessage = respnoseMessage;
}
@Override
public void run() {
byte[] responseByteData = null;
String logResponseString = "";
if (respnoseMessage instanceof byte[]) {
responseByteData = (byte[]) respnoseMessage;
logResponseString = new String(responseByteData);
} else if (respnoseMessage instanceof String) {
logResponseString = (String) respnoseMessage;
responseByteData = logResponseString.getBytes();
}
if (responseByteData == null || responseByteData.length == 0) {
System.out.println("响应的数据为空");
return;
}
try {
client.write(ByteBuffer.wrap(responseByteData));
System.out.println("server响应客户端[" + client.keyFor(selector).hashCode() + "]数据 :[" + logResponseString
+ "]");
} catch (IOException e) {
e.printStackTrace();
try {
client.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}
/**
* 读客户端发送数据线程
* @author haoguo
*
*/
private class ReadClientSocketHandler implements Runnable {
private SocketChannel client;
private ByteBuffer tmp = ByteBuffer.allocate(1024);
private SelectionKey selectionKey;
ReadClientSocketHandler(SelectionKey selectionKey) {
this.selectionKey = selectionKey;
this.client = (SocketChannel) selectionKey.channel();
}
@Override
public void run() {
try {
tmp.clear();
byte[] data = new byte[0];
int len = -1;
while ((len = client.read(tmp)) > 0) {
data = Arrays.copyOf(data, data.length + len);
System.arraycopy(tmp.array(), 0, data, data.length - len, len);
tmp.rewind();
}
if (data.length == 0) {
return;
}
System.out.println("接收到客户端[" + client.keyFor(selector).hashCode() + "]数据 :[" + new String(data) + "]");
// dosomthing
byte[] response = "response".getBytes();
client.register(selector, SelectionKey.OP_WRITE, response);
} catch (IOException e) {
System.out.println("客户端[" + selectionKey.hashCode() + "]关闭了连接");
try {
SelectionKey selectionKey = client.keyFor(selector);
selectionKey.cancel();
client.close();
} catch (IOException e1) {
e1.printStackTrace();
}
} finally {
selectionKeyMap.remove(selectionKey.hashCode());
}
}
}
public static void main(String[] args) {
Server server = new Server();
server.start();
}
}
package niocommunicate;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
public class Client {
SocketChannel client;
Selector selctor = getSelector();
private volatile boolean run = true;
private List<Object> messageQueue = new LinkedList<>();
public Selector getSelector() {
try {
return Selector.open();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
public Client() {
try {
client = SocketChannel.open();
client.configureBlocking(false);
client.connect(new InetSocketAddress(InetAddress.getLocalHost(), 5555));
client.register(selctor, SelectionKey.OP_CONNECT);
} catch (IOException e) {
e.printStackTrace();
}
new Thread(new Runnable() {
@Override
public void run() {
while (run) {
try {
if (selctor.select(20) == 0) {
continue;
}
Iterator<SelectionKey> iterator = selctor.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
if (selectionKey.isConnectable()) {
SocketChannel sc = (SocketChannel) selectionKey.channel();
sc.finishConnect();
sc.register(selctor, SelectionKey.OP_READ);
} else if (selectionKey.isWritable()) {
selectionKey.interestOps(SelectionKey.OP_READ);
Object requestMessage = selectionKey.attachment();
SocketChannel writeSocketChannel = (SocketChannel) selectionKey.channel();
byte[] requestByteData = null;
if (requestMessage instanceof byte[]) {
requestByteData = (byte[]) requestMessage;
} else if (requestMessage instanceof String) {
requestByteData = ((String) requestMessage).getBytes();
System.out.println("client send Message:[" + requestMessage + "]");
} else {
System.out.println("unsupport send Message Type" + requestMessage.getClass());
}
System.out.println("requestMessage:" + requestMessage);
if (requestByteData != null && requestByteData.length > 0) {
try {
writeSocketChannel.write(ByteBuffer.wrap(requestByteData));
} catch (IOException e) {
e.printStackTrace();
}
}
} else if (selectionKey.isReadable()) {
SocketChannel readSocketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer tmp = ByteBuffer.allocate(1024);
int len = -1;
byte[] data = new byte[0];
if ((len = readSocketChannel.read(tmp)) > 0) {
data = Arrays.copyOf(data, data.length + len);
System.arraycopy(tmp.array(), 0, data, data.length - len, len);
tmp.rewind();
}
if (data.length > 0) {
System.out.println("客户端接收到数据:[" + new String(data) + "]");
}
}
}
} catch (IOException e1) {
e1.printStackTrace();
close();
}
}
}
}).start();
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void close() {
try {
SelectionKey selectionKey = client.keyFor(selctor);
selectionKey.cancel();
client.close();
run = false;
} catch (IOException e) {
e.printStackTrace();
}
}
public void writeData(String data) {
messageQueue.add(data);
while (messageQueue.size() > 0) {
Object firstSendData = messageQueue.remove(0);
try {
client.register(selctor, SelectionKey.OP_WRITE, firstSendData);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
try {
Thread.sleep(40);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Client client = new Client();
long t1 = System.currentTimeMillis();
for (int i = 10; i < 200; i++) {
client.writeData(i + "nimddddddddddsssssssssssssssssssssssssssssssssssscccccccccccccccccccccccc"
+ "ccccccccccccccccccccccccccccccccccccccccccccccccccccccccdddddddddddd"
+ "dddddddddddddddddwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwaaaaaaaaaaaaaa"
+ "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaddddddddddddddddddddddddddddddd"
+ "ddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddrrrr"
+ "jjjjjjjjjjjjjjjjjjjjjjjjjjjjrrrrrrrrrrrrrrrrrrrrrrrrrrrkkkkkkkkkkkkkkkkkkkk"
+ "kkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkjjjjkkkkkklllllllllllllllllllllllllll"
+ "lllllllldddddddddddddmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmddaei"
+ "nimddddddddddsssssssssssssssssssssssssssssssssssscccccccccccccccccccccccc"
+ "ccccccccccccccccccccccccccccccccccccccccccccccccccccccccdddddddddddd"
+ "dddddddddddddddddwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwaaaaaaaaaaaaaa"
+ "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaddddddddddddddddddddddddddddddd"
+ "ddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddrrrr"
+ "jjjjjjjjjjjjjjjjjjjjjjjjjjjjrrrrrrrrrrrrrrrrrrrrrrrrrrrkkkkkkkkkkkkkkkkkkkk"
+ "kkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkjjjjkkkkkklllllllllllllllllllllllllll"
+ "lllllllldddddddddddddmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmddaei" + i);
}
long t2 = System.currentTimeMillis();
System.out.println("总共耗时:" + (t2 - t1) + "ms");
client.close();
}
}
package niocommunicate;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Server {
private Selector selector = getSelector();
private ServerSocketChannel ss = null;
private ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 500, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(20));
private Map<Integer, SelectionKey> selectionKeyMap = new ConcurrentHashMap<>();
private Map<Integer, List<Object>> responseMessageQueue = new ConcurrentHashMap<>();
private volatile boolean run = true;
private volatile boolean isClose = false;
public Selector getSelector() {
try {
return Selector.open();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
/**
* 创建非阻塞服务器绑定5555端口
*/
public Server() {
try {
ss = ServerSocketChannel.open();
ss.bind(new InetSocketAddress(5555));
ss.configureBlocking(false);
if (selector == null) {
selector = Selector.open();
}
ss.register(selector, SelectionKey.OP_ACCEPT);
} catch (Exception e) {
e.printStackTrace();
close();
}
}
public boolean isClose() {
return isClose;
}
/**
* 关闭服务器
*/
private void close() {
run = false;
isClose = true;
threadPool.shutdown();
try {
if (ss != null) {
ss.close();
}
if (selector != null) {
selector.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 启动选择器监听客户端事件
*/
private void start() {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
while (run) {
if (selector.select(10) == 0) {
continue;
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectedKey = iterator.next();
iterator.remove();
try {
if (selectedKey.isReadable()) {
if (selectionKeyMap.get(selectedKey.hashCode()) != selectedKey) {
selectionKeyMap.put(selectedKey.hashCode(), selectedKey);
threadPool.execute(new ReadClientSocketHandler(selectedKey));
}
} else if (selectedKey.isWritable()) {
SocketChannel serverSocketChannel = (SocketChannel) selectedKey.channel();
selectedKey.interestOps(SelectionKey.OP_READ);
List<Object> list = responseMessageQueue.get(selectedKey.hashCode());
if (list == null) {
list = new LinkedList<Object>();
responseMessageQueue.put(selectedKey.hashCode(), list);
}
while (list.size() > 0) {
Object responseMessage = list.remove(0);
if (responseMessage != null) {
threadPool.execute(new WriteClientSocketHandler(serverSocketChannel,
responseMessage));
}
}
} else if (selectedKey.isAcceptable()) {
ServerSocketChannel ssc = (ServerSocketChannel) selectedKey.channel();
SocketChannel clientSocket = ssc.accept();
if (clientSocket != null) {
clientSocket.configureBlocking(false);
clientSocket.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
}
} catch (CancelledKeyException cc) {
selectedKey.cancel();
int hashCode = selectedKey.hashCode();
selectionKeyMap.remove(hashCode);
responseMessageQueue.remove(hashCode);
}
}
}
} catch (Exception e) {
e.printStackTrace();
close();
}
}
});
}
/**
* 响应数据给客户端线程
*
* @author haoguo
*
*/
private class WriteClientSocketHandler implements Runnable {
SocketChannel client;
Object respnoseMessage;
WriteClientSocketHandler(SocketChannel client, Object respnoseMessage) {
this.client = client;
this.respnoseMessage = respnoseMessage;
}
@Override
public void run() {
byte[] responseByteData = null;
String logResponseString = "";
if (respnoseMessage instanceof byte[]) {
responseByteData = (byte[]) respnoseMessage;
logResponseString = new String(responseByteData);
} else if (respnoseMessage instanceof String) {
logResponseString = (String) respnoseMessage;
responseByteData = logResponseString.getBytes();
}
if (responseByteData == null || responseByteData.length == 0) {
System.out.println("响应的数据为空");
return;
}
try {
client.write(ByteBuffer.wrap(responseByteData));
System.out.println("server响应客户端[" + client.keyFor(selector).hashCode() + "]数据 :[" + logResponseString
+ "]");
} catch (IOException e) {
e.printStackTrace();
try {
SelectionKey selectionKey = client.keyFor(selector);
if (selectionKey != null) {
selectionKey.cancel();
int hashCode = selectionKey.hashCode();
responseMessageQueue.remove(hashCode);
}
if (client != null) {
client.close();
}
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}
/**
* 读客户端发送数据线程
*
* @author haoguo
*
*/
private class ReadClientSocketHandler implements Runnable {
private SocketChannel client;
private ByteBuffer tmp = ByteBuffer.allocate(1024);
private SelectionKey selectionKey;
int hashCode;
ReadClientSocketHandler(SelectionKey selectionKey) {
this.selectionKey = selectionKey;
this.client = (SocketChannel) selectionKey.channel();
this.hashCode = selectionKey.hashCode();
}
@Override
public void run() {
try {
tmp.clear();
byte[] data = new byte[0];
int len = -1;
while ((len = client.read(tmp)) > 0) {
data = Arrays.copyOf(data, data.length + len);
System.arraycopy(tmp.array(), 0, data, data.length - len, len);
tmp.rewind();
}
if (data.length == 0) {
return;
}
String readData = new String(data);
System.out.println("接收到客户端[" + hashCode + "]数据 :[" + readData.substring(0, 3) + "]");
// dosomthing
byte[] response = ("response" + readData.substring(0, 3)).getBytes();
List<Object> list = responseMessageQueue.get(hashCode);
list.add(response);
client.register(selector, SelectionKey.OP_WRITE);
// client.register(selector, SelectionKey.OP_WRITE, response);
} catch (IOException e) {
System.out.println("客户端[" + selectionKey.hashCode() + "]关闭了连接");
try {
SelectionKey selectionKey = client.keyFor(selector);
if (selectionKey != null) {
selectionKey.cancel();
}
if (client != null) {
client.close();
}
} catch (IOException e1) {
e1.printStackTrace();
}
} finally {
selectionKeyMap.remove(hashCode);
}
}
}
public static void main(String[] args) {
Server server = new Server();
server.start();
}
}
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/10535.html