通过JAVA NIO实现Socket服务器与客户端功能详解编程语言

package niocommunicate; 
import java.io.IOException; 
import java.net.InetSocketAddress; 
import java.nio.ByteBuffer; 
import java.nio.channels.CancelledKeyException; 
import java.nio.channels.SelectionKey; 
import java.nio.channels.Selector; 
import java.nio.channels.ServerSocketChannel; 
import java.nio.channels.SocketChannel; 
import java.util.Arrays; 
import java.util.Iterator; 
import java.util.Map; 
import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 
public class Server { 
private Selector selector = getSelector(); 
private ServerSocketChannel ss = null; 
private static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 500, TimeUnit.MILLISECONDS, 
new ArrayBlockingQueue<Runnable>(20)); 
private static Map<Integer, SelectionKey> selectionKeyMap = new ConcurrentHashMap<>(); 
public Selector getSelector() { 
try { 
return Selector.open(); 
} catch (IOException e) { 
e.printStackTrace(); 
} 
return null; 
} 
/** 
* 创建非阻塞服务器绑定5555端口 
*/ 
public Server() { 
try { 
ss = ServerSocketChannel.open(); 
ss.bind(new InetSocketAddress(5555)); 
ss.configureBlocking(false); 
if (selector == null) { 
selector = Selector.open(); 
} 
ss.register(selector, SelectionKey.OP_ACCEPT); 
} catch (Exception e) { 
e.printStackTrace(); 
close(); 
} 
} 
/** 
* 关闭服务器 
*/ 
private void close() { 
threadPool.shutdown(); 
try { 
if (ss != null) { 
ss.close(); 
} 
if (selector != null) { 
selector.close(); 
} 
} catch (IOException e) { 
e.printStackTrace(); 
} 
} 
/** 
* 启动选择器监听客户端事件 
*/ 
private void start() { 
threadPool.execute(new Runnable() { 
@Override 
public void run() { 
try { 
while (true) { 
if (selector.select(10) == 0) { 
continue; 
} 
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); 
while (iterator.hasNext()) { 
SelectionKey selectedKey = iterator.next(); 
iterator.remove(); 
try { 
if (selectedKey.isReadable()) { 
if (selectionKeyMap.get(selectedKey.hashCode()) != selectedKey) { 
selectionKeyMap.put(selectedKey.hashCode(), selectedKey); 
threadPool.execute(new ReadClientSocketHandler(selectedKey)); 
} 
} else if (selectedKey.isWritable()) { 
Object responseMessage = selectedKey.attachment(); 
SocketChannel serverSocketChannel = (SocketChannel) selectedKey.channel(); 
selectedKey.interestOps(SelectionKey.OP_READ); 
if (responseMessage != null) { 
threadPool.execute(new WriteClientSocketHandler(serverSocketChannel, 
responseMessage)); 
} 
} else if (selectedKey.isAcceptable()) { 
ServerSocketChannel ssc = (ServerSocketChannel) selectedKey.channel(); 
SocketChannel clientSocket = ssc.accept(); 
if (clientSocket != null) { 
clientSocket.configureBlocking(false); 
clientSocket.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); 
} 
} 
} catch (CancelledKeyException cc) { 
selectedKey.cancel(); 
selectionKeyMap.remove(selectedKey.hashCode()); 
} 
} 
} 
} catch (Exception e) { 
e.printStackTrace(); 
close(); 
} 
} 
}); 
} 
/** 
* 响应数据给客户端线程 
* @author haoguo 
* 
*/ 
private class WriteClientSocketHandler implements Runnable { 
SocketChannel client; 
Object respnoseMessage; 
WriteClientSocketHandler(SocketChannel client, Object respnoseMessage) { 
this.client = client; 
this.respnoseMessage = respnoseMessage; 
} 
@Override 
public void run() { 
byte[] responseByteData = null; 
String logResponseString = ""; 
if (respnoseMessage instanceof byte[]) { 
responseByteData = (byte[]) respnoseMessage; 
logResponseString = new String(responseByteData); 
} else if (respnoseMessage instanceof String) { 
logResponseString = (String) respnoseMessage; 
responseByteData = logResponseString.getBytes(); 
} 
if (responseByteData == null || responseByteData.length == 0) { 
System.out.println("响应的数据为空"); 
return; 
} 
try { 
client.write(ByteBuffer.wrap(responseByteData)); 
System.out.println("server响应客户端[" + client.keyFor(selector).hashCode() + "]数据 :[" + logResponseString 
+ "]"); 
} catch (IOException e) { 
e.printStackTrace(); 
try { 
client.close(); 
} catch (IOException e1) { 
e1.printStackTrace(); 
} 
} 
} 
} 
/** 
* 读客户端发送数据线程 
* @author haoguo 
* 
*/ 
private class ReadClientSocketHandler implements Runnable { 
private SocketChannel client; 
private ByteBuffer tmp = ByteBuffer.allocate(1024); 
private SelectionKey selectionKey; 
ReadClientSocketHandler(SelectionKey selectionKey) { 
this.selectionKey = selectionKey; 
this.client = (SocketChannel) selectionKey.channel(); 
} 
@Override 
public void run() { 
try { 
tmp.clear(); 
byte[] data = new byte[0]; 
int len = -1; 
while ((len = client.read(tmp)) > 0) { 
data = Arrays.copyOf(data, data.length + len); 
System.arraycopy(tmp.array(), 0, data, data.length - len, len); 
tmp.rewind(); 
} 
if (data.length == 0) { 
return; 
} 
System.out.println("接收到客户端[" + client.keyFor(selector).hashCode() + "]数据 :[" + new String(data) + "]"); 
// dosomthing 
byte[] response = "response".getBytes(); 
client.register(selector, SelectionKey.OP_WRITE, response); 
} catch (IOException e) { 
System.out.println("客户端[" + selectionKey.hashCode() + "]关闭了连接"); 
try { 
SelectionKey selectionKey = client.keyFor(selector); 
selectionKey.cancel(); 
client.close(); 
} catch (IOException e1) { 
e1.printStackTrace(); 
} 
} finally { 
selectionKeyMap.remove(selectionKey.hashCode()); 
} 
} 
} 
public static void main(String[] args) { 
Server server = new Server(); 
server.start(); 
} 
}

package niocommunicate; 
import java.io.IOException; 
import java.net.InetAddress; 
import java.net.InetSocketAddress; 
import java.nio.ByteBuffer; 
import java.nio.channels.ClosedChannelException; 
import java.nio.channels.SelectionKey; 
import java.nio.channels.Selector; 
import java.nio.channels.SocketChannel; 
import java.util.Arrays; 
import java.util.Iterator; 
import java.util.LinkedList; 
import java.util.List; 
public class Client { 
SocketChannel client; 
Selector selctor = getSelector(); 
private volatile boolean run = true; 
private List<Object> messageQueue = new LinkedList<>(); 
public Selector getSelector() { 
try { 
return Selector.open(); 
} catch (IOException e) { 
e.printStackTrace(); 
} 
return null; 
} 
public Client() { 
try { 
client = SocketChannel.open(); 
client.configureBlocking(false); 
client.connect(new InetSocketAddress(InetAddress.getLocalHost(), 5555)); 
client.register(selctor, SelectionKey.OP_CONNECT); 
} catch (IOException e) { 
e.printStackTrace(); 
} 
new Thread(new Runnable() { 
@Override 
public void run() { 
while (run) { 
try { 
if (selctor.select(20) == 0) { 
continue; 
} 
Iterator<SelectionKey> iterator = selctor.selectedKeys().iterator(); 
while (iterator.hasNext()) { 
SelectionKey selectionKey = iterator.next(); 
iterator.remove(); 
if (selectionKey.isConnectable()) { 
SocketChannel sc = (SocketChannel) selectionKey.channel(); 
sc.finishConnect(); 
sc.register(selctor, SelectionKey.OP_READ); 
} else if (selectionKey.isWritable()) { 
selectionKey.interestOps(SelectionKey.OP_READ); 
Object requestMessage = selectionKey.attachment(); 
SocketChannel writeSocketChannel = (SocketChannel) selectionKey.channel(); 
byte[] requestByteData = null; 
if (requestMessage instanceof byte[]) { 
requestByteData = (byte[]) requestMessage; 
} else if (requestMessage instanceof String) { 
requestByteData = ((String) requestMessage).getBytes(); 
System.out.println("client send Message:[" + requestMessage + "]"); 
} else { 
System.out.println("unsupport send Message Type" + requestMessage.getClass()); 
} 
System.out.println("requestMessage:" + requestMessage); 
if (requestByteData != null && requestByteData.length > 0) { 
try { 
writeSocketChannel.write(ByteBuffer.wrap(requestByteData)); 
} catch (IOException e) { 
e.printStackTrace(); 
} 
} 
} else if (selectionKey.isReadable()) { 
SocketChannel readSocketChannel = (SocketChannel) selectionKey.channel(); 
ByteBuffer tmp = ByteBuffer.allocate(1024); 
int len = -1; 
byte[] data = new byte[0]; 
if ((len = readSocketChannel.read(tmp)) > 0) { 
data = Arrays.copyOf(data, data.length + len); 
System.arraycopy(tmp.array(), 0, data, data.length - len, len); 
tmp.rewind(); 
} 
if (data.length > 0) { 
System.out.println("客户端接收到数据:[" + new String(data) + "]"); 
} 
} 
} 
} catch (IOException e1) { 
e1.printStackTrace(); 
close(); 
} 
} 
} 
}).start(); 
try { 
Thread.sleep(200); 
} catch (InterruptedException e) { 
e.printStackTrace(); 
} 
} 
public void close() { 
try { 
SelectionKey selectionKey = client.keyFor(selctor); 
selectionKey.cancel(); 
client.close(); 
run = false; 
} catch (IOException e) { 
e.printStackTrace(); 
} 
} 
public void writeData(String data) { 
messageQueue.add(data); 
while (messageQueue.size() > 0) { 
Object firstSendData = messageQueue.remove(0); 
try { 
client.register(selctor, SelectionKey.OP_WRITE, firstSendData); 
} catch (ClosedChannelException e) { 
e.printStackTrace(); 
} 
try { 
Thread.sleep(40); 
} catch (InterruptedException e) { 
e.printStackTrace(); 
} 
} 
} 
public static void main(String[] args) { 
Client client = new Client(); 
long t1 = System.currentTimeMillis(); 
for (int i = 10; i < 200; i++) { 
client.writeData(i + "nimddddddddddsssssssssssssssssssssssssssssssssssscccccccccccccccccccccccc" 
+ "ccccccccccccccccccccccccccccccccccccccccccccccccccccccccdddddddddddd" 
+ "dddddddddddddddddwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwaaaaaaaaaaaaaa" 
+ "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaddddddddddddddddddddddddddddddd" 
+ "ddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddrrrr" 
+ "jjjjjjjjjjjjjjjjjjjjjjjjjjjjrrrrrrrrrrrrrrrrrrrrrrrrrrrkkkkkkkkkkkkkkkkkkkk" 
+ "kkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkjjjjkkkkkklllllllllllllllllllllllllll" 
+ "lllllllldddddddddddddmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmddaei" 
+ "nimddddddddddsssssssssssssssssssssssssssssssssssscccccccccccccccccccccccc" 
+ "ccccccccccccccccccccccccccccccccccccccccccccccccccccccccdddddddddddd" 
+ "dddddddddddddddddwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwaaaaaaaaaaaaaa" 
+ "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaddddddddddddddddddddddddddddddd" 
+ "ddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddrrrr" 
+ "jjjjjjjjjjjjjjjjjjjjjjjjjjjjrrrrrrrrrrrrrrrrrrrrrrrrrrrkkkkkkkkkkkkkkkkkkkk" 
+ "kkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkjjjjkkkkkklllllllllllllllllllllllllll" 
+ "lllllllldddddddddddddmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmddaei" + i); 
} 
long t2 = System.currentTimeMillis(); 
System.out.println("总共耗时:" + (t2 - t1) + "ms"); 
client.close(); 
} 
}

package niocommunicate; 
import java.io.IOException; 
import java.net.InetSocketAddress; 
import java.nio.ByteBuffer; 
import java.nio.channels.CancelledKeyException; 
import java.nio.channels.SelectionKey; 
import java.nio.channels.Selector; 
import java.nio.channels.ServerSocketChannel; 
import java.nio.channels.SocketChannel; 
import java.util.Arrays; 
import java.util.Iterator; 
import java.util.LinkedList; 
import java.util.List; 
import java.util.Map; 
import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 
public class Server { 
private Selector selector = getSelector(); 
private ServerSocketChannel ss = null; 
private ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 500, TimeUnit.MILLISECONDS, 
new ArrayBlockingQueue<Runnable>(20)); 
private Map<Integer, SelectionKey> selectionKeyMap = new ConcurrentHashMap<>(); 
private Map<Integer, List<Object>> responseMessageQueue = new ConcurrentHashMap<>(); 
private volatile boolean run = true; 
private volatile boolean isClose = false; 
public Selector getSelector() { 
try { 
return Selector.open(); 
} catch (IOException e) { 
e.printStackTrace(); 
} 
return null; 
} 
/** 
* 创建非阻塞服务器绑定5555端口 
*/ 
public Server() { 
try { 
ss = ServerSocketChannel.open(); 
ss.bind(new InetSocketAddress(5555)); 
ss.configureBlocking(false); 
if (selector == null) { 
selector = Selector.open(); 
} 
ss.register(selector, SelectionKey.OP_ACCEPT); 
} catch (Exception e) { 
e.printStackTrace(); 
close(); 
} 
} 
public boolean isClose() { 
return isClose; 
} 
/** 
* 关闭服务器 
*/ 
private void close() { 
run = false; 
isClose = true; 
threadPool.shutdown(); 
try { 
if (ss != null) { 
ss.close(); 
} 
if (selector != null) { 
selector.close(); 
} 
} catch (IOException e) { 
e.printStackTrace(); 
} 
} 
/** 
* 启动选择器监听客户端事件 
*/ 
private void start() { 
threadPool.execute(new Runnable() { 
@Override 
public void run() { 
try { 
while (run) { 
if (selector.select(10) == 0) { 
continue; 
} 
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); 
while (iterator.hasNext()) { 
SelectionKey selectedKey = iterator.next(); 
iterator.remove(); 
try { 
if (selectedKey.isReadable()) { 
if (selectionKeyMap.get(selectedKey.hashCode()) != selectedKey) { 
selectionKeyMap.put(selectedKey.hashCode(), selectedKey); 
threadPool.execute(new ReadClientSocketHandler(selectedKey)); 
} 
} else if (selectedKey.isWritable()) { 
SocketChannel serverSocketChannel = (SocketChannel) selectedKey.channel(); 
selectedKey.interestOps(SelectionKey.OP_READ); 
List<Object> list = responseMessageQueue.get(selectedKey.hashCode()); 
if (list == null) { 
list = new LinkedList<Object>(); 
responseMessageQueue.put(selectedKey.hashCode(), list); 
} 
while (list.size() > 0) { 
Object responseMessage = list.remove(0); 
if (responseMessage != null) { 
threadPool.execute(new WriteClientSocketHandler(serverSocketChannel, 
responseMessage)); 
} 
} 
} else if (selectedKey.isAcceptable()) { 
ServerSocketChannel ssc = (ServerSocketChannel) selectedKey.channel(); 
SocketChannel clientSocket = ssc.accept(); 
if (clientSocket != null) { 
clientSocket.configureBlocking(false); 
clientSocket.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); 
} 
} 
} catch (CancelledKeyException cc) { 
selectedKey.cancel(); 
int hashCode = selectedKey.hashCode(); 
selectionKeyMap.remove(hashCode); 
responseMessageQueue.remove(hashCode); 
} 
} 
} 
} catch (Exception e) { 
e.printStackTrace(); 
close(); 
} 
} 
}); 
} 
/** 
* 响应数据给客户端线程 
* 
* @author haoguo 
* 
*/ 
private class WriteClientSocketHandler implements Runnable { 
SocketChannel client; 
Object respnoseMessage; 
WriteClientSocketHandler(SocketChannel client, Object respnoseMessage) { 
this.client = client; 
this.respnoseMessage = respnoseMessage; 
} 
@Override 
public void run() { 
byte[] responseByteData = null; 
String logResponseString = ""; 
if (respnoseMessage instanceof byte[]) { 
responseByteData = (byte[]) respnoseMessage; 
logResponseString = new String(responseByteData); 
} else if (respnoseMessage instanceof String) { 
logResponseString = (String) respnoseMessage; 
responseByteData = logResponseString.getBytes(); 
} 
if (responseByteData == null || responseByteData.length == 0) { 
System.out.println("响应的数据为空"); 
return; 
} 
try { 
client.write(ByteBuffer.wrap(responseByteData)); 
System.out.println("server响应客户端[" + client.keyFor(selector).hashCode() + "]数据 :[" + logResponseString 
+ "]"); 
} catch (IOException e) { 
e.printStackTrace(); 
try { 
SelectionKey selectionKey = client.keyFor(selector); 
if (selectionKey != null) { 
selectionKey.cancel(); 
int hashCode = selectionKey.hashCode(); 
responseMessageQueue.remove(hashCode); 
} 
if (client != null) { 
client.close(); 
} 
} catch (IOException e1) { 
e1.printStackTrace(); 
} 
} 
} 
} 
/** 
* 读客户端发送数据线程 
* 
* @author haoguo 
* 
*/ 
private class ReadClientSocketHandler implements Runnable { 
private SocketChannel client; 
private ByteBuffer tmp = ByteBuffer.allocate(1024); 
private SelectionKey selectionKey; 
int hashCode; 
ReadClientSocketHandler(SelectionKey selectionKey) { 
this.selectionKey = selectionKey; 
this.client = (SocketChannel) selectionKey.channel(); 
this.hashCode = selectionKey.hashCode(); 
} 
@Override 
public void run() { 
try { 
tmp.clear(); 
byte[] data = new byte[0]; 
int len = -1; 
while ((len = client.read(tmp)) > 0) { 
data = Arrays.copyOf(data, data.length + len); 
System.arraycopy(tmp.array(), 0, data, data.length - len, len); 
tmp.rewind(); 
} 
if (data.length == 0) { 
return; 
} 
String readData = new String(data); 
System.out.println("接收到客户端[" + hashCode + "]数据 :[" + readData.substring(0, 3) + "]"); 
// dosomthing 
byte[] response = ("response" + readData.substring(0, 3)).getBytes(); 
List<Object> list = responseMessageQueue.get(hashCode); 
list.add(response); 
client.register(selector, SelectionKey.OP_WRITE); 
// client.register(selector, SelectionKey.OP_WRITE, response); 
} catch (IOException e) { 
System.out.println("客户端[" + selectionKey.hashCode() + "]关闭了连接"); 
try { 
SelectionKey selectionKey = client.keyFor(selector); 
if (selectionKey != null) { 
selectionKey.cancel(); 
} 
if (client != null) { 
client.close(); 
} 
} catch (IOException e1) { 
e1.printStackTrace(); 
} 
} finally { 
selectionKeyMap.remove(hashCode); 
} 
} 
} 
public static void main(String[] args) { 
Server server = new Server(); 
server.start(); 
} 
}

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/10535.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论