import java.io.*; import java.nio.*; import java.nio.channels.*; import java.net.*; import java.util.*; import java.nio.charset.*; import java.lang.*; public class NonBlockingServer { public Selector sel = null; public ServerSocketChannel server = null; public SocketChannel socket = null; public int port = 4900; String result = null; public NonBlockingServer() { System.out.println("Inside default ctor"); } public NonBlockingServer(int port) { System.out.println("Inside the other ctor"); port = port; } public void initializeOperations() throws IOException,UnknownHostException { System.out.println("Inside initialization"); sel = Selector.open(); server = ServerSocketChannel.open(); server.configureBlocking(false); InetAddress ia = InetAddress.getLocalHost(); InetSocketAddress isa = new InetSocketAddress(ia,port); server.socket().bind(isa); } public void startServer() throws IOException { System.out.println("Inside startserver"); initializeOperations(); System.out.println("Abt to block on select()"); SelectionKey acceptKey = server.register(sel, SelectionKey.OP_ACCEPT ); while (acceptKey.selector().select() > 0 ) { Set readyKeys = sel.selectedKeys(); Iterator it = readyKeys.iterator(); while (it.hasNext()) { SelectionKey key = (SelectionKey)it.next(); it.remove(); if (key.isAcceptable()) { System.out.println("Key is Acceptable"); ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); socket = (SocketChannel) ssc.accept(); socket.configureBlocking(false); SelectionKey another = socket.register(sel,SelectionKey.OP_READ|SelectionKey.OP_WRITE); } if (key.isReadable()) { System.out.println("Key is readable"); String ret = readMessage(key); if (ret.length() > 0) { writeMessage(socket,ret); } } if (key.isWritable()) { //System.out.println("The key is writable"); String ret = readMessage(key); socket = (SocketChannel)key.channel(); if (result.length() > 0 ) { writeMessage(socket,ret); } } } } } public void writeMessage(SocketChannel socket,String ret) { System.out.println("Inside the loop"); if (ret.equals("quit") || ret.equals("shutdown")) { return; } File file = new File(ret); try { RandomAccessFile rdm = new RandomAccessFile(file,"r"); FileChannel fc = rdm.getChannel(); ByteBuffer buffer = ByteBuffer.allocate(1024); fc.read(buffer); buffer.flip(); Charset set = Charset.forName("us-ascii"); CharsetDecoder dec = set.newDecoder(); CharBuffer charBuf = dec.decode(buffer); System.out.println(charBuf.toString()); buffer = ByteBuffer.wrap((charBuf.toString()).getBytes()); int nBytes = socket.write(buffer); System.out.println("nBytes = "+nBytes); result = null; } catch(Exception e) { e.printStackTrace(); } } public String readMessage(SelectionKey key) { int nBytes = 0; socket = (SocketChannel)key.channel(); ByteBuffer buf = ByteBuffer.allocate(1024); try { nBytes = socket.read(buf); buf.flip(); Charset charset = Charset.forName("us-ascii"); CharsetDecoder decoder = charset.newDecoder(); CharBuffer charBuffer = decoder.decode(buf); result = charBuffer.toString(); } catch(IOException e) { e.printStackTrace(); } return result; } public static void main(String args[]) { NonBlockingServer nb = new NonBlockingServer(); try { nb.startServer(); } catch (IOException e) { e.printStackTrace(); System.exit(-1); } } }
import java.nio.*; import java.nio.channels.*; import java.net.*; import java.io.*; import java.nio.channels.spi.*; import java.nio.charset.*; import java.lang.*; public class Client { public SocketChannel client = null; public InetSocketAddress isa = null; public RecvThread rt = null; public Client() { } public void makeConnection() { int result = 0; try { client = SocketChannel.open(); isa = new InetSocketAddress("liudong",4900); client.connect(isa); client.configureBlocking(false); receiveMessage(); } catch(UnknownHostException e) { e.printStackTrace(); } catch(IOException e) { e.printStackTrace(); } while ((result = sendMessage()) != -1) { } try { client.close(); System.exit(0); } catch(IOException e) { e.printStackTrace(); } } public int sendMessage() { System.out.println("Inside SendMessage"); BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); String msg = null; ByteBuffer bytebuf = ByteBuffer.allocate(1024); int nBytes = 0; try { msg = in.readLine(); System.out.println("msg is "+msg); bytebuf = ByteBuffer.wrap(msg.getBytes()); nBytes = client.write(bytebuf); System.out.println("nBytes is "+nBytes); if (msg.equals("quit") || msg.equals("shutdown")) { System.out.println("time to stop the client"); interruptThread(); try { Thread.sleep(5000); } catch(Exception e) { e.printStackTrace(); } client.close(); return -1; } } catch(IOException e) { e.printStackTrace(); } System.out.println("Wrote "+nBytes +" bytes to the server"); return nBytes; } public void receiveMessage() { rt = new RecvThread("Receive THread",client); rt.start(); } public void interruptThread() { rt.val = false; } public static void main(String args[]) { Client cl = new Client(); cl.makeConnection(); } public class RecvThread extends Thread { public SocketChannel sc = null; public boolean val = true; public RecvThread(String str,SocketChannel client) { super(str); sc = client; } public void run() { System.out.println("Inside receivemsg"); int nBytes = 0; ByteBuffer buf = ByteBuffer.allocate(2048); try { while (val) { while ( (nBytes = nBytes = client.read(buf)) > 0){ buf.flip(); Charset charset = Charset.forName("us-ascii"); CharsetDecoder decoder = charset.newDecoder(); CharBuffer charBuffer = decoder.decode(buf); String result = charBuffer.toString(); System.out.println(result); buf.flip(); } } } catch(IOException e) { e.printStackTrace(); } } } }