/* GothMemberD.java * * Andy Goth * * This pile of bits is available under the GNU General Public License. * * Share and enjoy. */ import java.io.*; import java.net.*; import java.nio.*; import java.nio.channels.*; import java.nio.charset.*; import java.text.*; import java.util.*; /** This is the class that does the things. */ public class GothMemberD { /** The possible connection states. */ private enum State { /** TCP connection established, initializing User object. */ INITIALIZING, /** Awaiting login. */ AUTHENTICATING, /** User logged in, awaiting EXIT. */ CHATTING, /** User sent EXIT, awaiting TCP disconnect. */ DISCONNECTING, /** User has disconnected. */ FINISHED } /** A chat room. */ private class Room { List users; ServerSocketChannel tcp_server_channel; SocketAddress tcp_server_address; SelectionKey tcp_server_key; /** Listens on 'tcp_server_address'. */ Room(SocketAddress tcp_server_address) throws IOException { this.tcp_server_address = tcp_server_address; users = new LinkedList(); /* Listen. */ tcp_server_channel = ServerSocketChannel.open(); tcp_server_channel.configureBlocking(false); tcp_server_key = tcp_server_channel.register(selector, SelectionKey.OP_ACCEPT, this); tcp_server_channel.socket().bind(tcp_server_address); } /** Accepts an incoming connection. */ void tcp_server_acceptable() throws IOException { SocketChannel channel; User user; channel = tcp_server_channel.accept(); channel.configureBlocking(false); user = new User(this, channel); connect(user); } /** Handles 'user' connecting to the Room server. */ void connect(User user) throws IOException { users.add(user); user.state = State.AUTHENTICATING; } /** Handles 'user' attempting to join the Room. */ void join(User user) throws IOException { InetSocketAddress address; String welcome; /* Check for nickname collisions. */ for (User other : users) { if (other.state == State.CHATTING) { if (user.nickname.equals(other.nickname)) { /* Uh oh, nickname collision. */ user.send_tcp("RJCT " + user.nickname); user.state = State.DISCONNECTING; return; } } } /* No collision. Announce this new User to others. */ address = (InetSocketAddress)user.udp_address; broadcast_udp("JOIN " + user.nickname + " " + address.getAddress().getHostAddress() + " " + address.getPort()); /* Admit this new User to the room. */ user.state = State.CHATTING; /* Tell this new User who all is in the room. */ welcome = null; for (User other : users) { if (other.state == State.CHATTING) { address = (InetSocketAddress)other.udp_address; if (welcome == null) { welcome = "ACPT "; } else { welcome += ":"; } welcome += other.nickname + " " + address.getAddress().getHostAddress() + " " + address.getPort(); } } user.send_tcp(welcome); } /** Handles 'user' attempting to depart from the Room. */ void depart(User user) throws IOException { /* Tell all Users, including the departing User, of the User's * departure. */ broadcast_udp("EXIT " + user.nickname); user.state = State.DISCONNECTING; } /** Handles 'user' disconnecting from the Room server. */ void disconnect(User user) throws IOException { if (user.state == State.CHATTING) { /* Abnormal connection termination! Notify. */ user.state = State.FINISHED; depart(user); } /* Okie, done. */ users.remove(user); } /** Sends 'message' to all CHATTING Users. */ void broadcast_udp(String message) throws IOException { for (User user : users) { if (user.state == State.CHATTING) { user.send_udp(message); } } } } /** A user in the room. */ private class User { Room room; String nickname; SocketAddress udp_address; State state; SocketChannel tcp_channel; ByteBuffer tcp_recv_buf; ByteBuffer tcp_send_buf; SelectionKey tcp_key; DatagramChannel udp_channel; Queue udp_send_buf; SelectionKey udp_key; /** Creates a new User in 'room' who just connected on 'tcp_channel'. */ User(Room room, SocketChannel tcp_channel) throws IOException { this.room = room; this.nickname = null; this.udp_address = null; this.state = State.INITIALIZING; this.tcp_channel = tcp_channel; this.tcp_recv_buf = ByteBuffer.allocate(BUFFER_SIZE); this.tcp_send_buf = ByteBuffer.allocate(BUFFER_SIZE); this.tcp_key = tcp_channel.register(selector, SelectionKey.OP_READ, this); this.udp_channel = null; this.udp_send_buf = new LinkedList(); this.udp_key = null; log(tcp_channel, "connect"); } /** Handles incoming TCP data. */ void tcp_readable() throws IOException { int old_pos, pos; boolean seen_cr; old_pos = tcp_recv_buf.position(); if (old_pos != 0) { /* Back up one position to catch a possible CR. */ --old_pos; } /* Get the available bits. */ if (tcp_channel.read(tcp_recv_buf) == -1) { /* Disconnect? */ recv(null); return; } /* Prepare to process read bytes. */ tcp_recv_buf.flip(); /* Scan for line/packet breaks. */ seen_cr = false; for (pos = old_pos; pos < tcp_recv_buf.limit(); ++pos) { byte cur_byte; ByteBuffer slice; cur_byte = tcp_recv_buf.get(pos); if (cur_byte == (byte)'\r') { /* Remember to skip this CR if followed by LF. */ seen_cr = true; } else { if (cur_byte == (byte)'\n') { /* Got a line! Create a slice consisting of only this * line (not including \r\n). */ tcp_recv_buf.rewind(); slice = tcp_recv_buf.slice(); slice.limit(pos - (seen_cr ? 1 : 0)); /* Now decode into a String and process. */ recv(decoder.decode(slice).toString()); /* Remove the line from the buffer. */ tcp_recv_buf.position(pos + 1); tcp_recv_buf.compact(); /* We're still draining the buffer, so re-flip. */ pos = 0; tcp_recv_buf.flip(); } seen_cr = false; } } /* Prepare for further writes. */ tcp_recv_buf.position(tcp_recv_buf.limit()); tcp_recv_buf.limit(tcp_recv_buf.capacity()); } /** Writes queued outgoing TCP data. */ void tcp_writable() throws IOException { /* Write whatever can be written, and discard the written data from * the buffer. */ tcp_send_buf.flip(); tcp_channel.write(tcp_send_buf); tcp_send_buf.compact(); if (tcp_send_buf.position() != 0) { /* There are still bytes to write. */ tcp_key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } else { /* The buffer has been drained. */ tcp_key.interestOps(SelectionKey.OP_READ); } } /** Writes queued outgoing UDP data. */ void udp_writable() throws IOException { ByteBuffer buffer; /* Send to UDP server. */ buffer = udp_send_buf.remove(); buffer.flip(); udp_channel.write(buffer); if (udp_send_buf.isEmpty()) { /* The buffer has been drained. */ udp_key.interestOps(0); } } /** Sends 'message' to the User's TCP port. */ void send_tcp(String message) throws IOException, CharacterCodingException { CoderResult result; log(tcp_channel, "tcp tx: " + message); /* Convert to bytes. */ result = encoder.encode(CharBuffer.wrap(message), tcp_send_buf, true); if (result != CoderResult.UNDERFLOW) { result.throwException(); } /* Packet delimiter. */ tcp_send_buf.put((byte)'\r').put((byte)'\n'); /* Enqueue send to TCP server. */ tcp_key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } /** Sends 'message' to the User's UDP port. */ void send_udp(String message) throws IOException, CharacterCodingException { CoderResult result; ByteBuffer buffer; log(tcp_channel, "udp tx: " + message); /* Convert to bytes. */ buffer = ByteBuffer.allocate(BUFFER_SIZE); result = encoder.encode(CharBuffer.wrap(message), buffer, true); if (result != CoderResult.UNDERFLOW) { result.throwException(); } /* Packet delimiter. */ buffer.put((byte)'\r').put((byte)'\n'); /* Enqueue send to UDP server. */ udp_send_buf.add(buffer); udp_key.interestOps(SelectionKey.OP_WRITE); } /** Handles the receipt of 'message' from the User's TCP port; a null * 'messsage' indicates disconnection. */ void recv(String message) throws IOException { String cmd; String arg; int index; /* Divide the message into command and argument. */ if (message == null) { cmd = null; arg = null; log(tcp_channel, "disconnect"); } else { String parts[] = message.split(" ", 2); if (parts.length == 1) { cmd = parts[0]; arg = null; } else { cmd = parts[0]; arg = parts[1]; } log(tcp_channel, "tcp rx: " + message); } if (cmd == null) { /* The User disconnected. */ tcp_key.cancel(); if (udp_key != null) { udp_key.cancel(); } room.disconnect(this); } else if (cmd.equals("HELO")) { if (state == State.AUTHENTICATING) { String parts[] = arg.split(" "); if (parts.length == 3) { /* The User is logging in. */ nickname = parts[0]; udp_address = new InetSocketAddress(parts[1], Integer.parseInt(parts[2])); udp_channel = DatagramChannel.open(); udp_channel.configureBlocking(false); udp_channel.connect(udp_address); udp_send_buf = new LinkedList(); udp_key = udp_channel.register(selector, 0, this); room.join(this); } } } else if (cmd.equals("EXIT")) { if (state == State.CHATTING) { if (arg == null) { /* The User is leaving. */ room.depart(this); } } } } } /** Main room server. */ private Room room; /** Channel multiplexer. */ private Selector selector; private static Charset charset; private static CharsetDecoder decoder; private static CharsetEncoder encoder; private static final int BUFFER_SIZE = 1024; private static final String CHARSET_NAME = "utf-8"; private static final boolean LOGGING = true; static { /* Initialize charset junk. */ charset = Charset.forName(CHARSET_NAME); decoder = charset.newDecoder(); encoder = charset.newEncoder(); } private static void log(String who, String message) { if (LOGGING) { System.out.println( (new SimpleDateFormat("HH:mm:ss")).format(new Date()) + " [" + who + "] " + message); } } private static void log(InetSocketAddress address, String message) { log(address.getAddress().getHostAddress() + ":" + address.getPort(), message); } private static void log(SocketChannel channel, String message) { log((InetSocketAddress)channel.socket().getRemoteSocketAddress(), message); } /** Constructor. */ public GothMemberD(SocketAddress address) throws IOException { /* Construct the channel multiplexer. */ selector = Selector.open(); /* Create the Room server object. */ room = new Room(address); } /** One iteration of the event loop. */ public void next() throws IOException { /* Wait for an event, then process all events. */ selector.select(); /* Process all selected keys. */ for (SelectionKey key : selector.selectedKeys()) { /* Incoming TCP connection. */ if (key.isAcceptable()) { ((Room)key.attachment()).tcp_server_acceptable(); continue; } /* Incoming TCP data. */ if (key.isReadable()) { ((User)key.attachment()).tcp_readable(); continue; } /* Able to write outgoing TCP or UDP data. */ if (key.isWritable()) { if (key.channel() instanceof SocketChannel) { ((User)key.attachment()).tcp_writable(); } else if (key.channel() instanceof DatagramChannel) { ((User)key.attachment()).udp_writable(); } continue; } } /* Clear all processed keys. */ selector.selectedKeys().clear(); } /** Event loop. */ public void run() throws IOException { while (true) { next(); } } /** This function is not important enough to deserve a comment. */ public static void main(String[] args) { try { int port; GothMemberD memd; /* Check for proper number of command-line arguments. */ if (args.length != 1) { System.out.println("Usage: java GothMemberD "); System.exit(1); } /* Construct the application from the command-line arguments. */ port = Integer.parseInt(args[0]); memd = new GothMemberD(new InetSocketAddress(port)); /* Run forever. */ log("system", "tcp listen: " + port); memd.run(); } catch (Exception e) { /* Uh oh. */ e.printStackTrace(); System.exit(1); } } } /* vim: set ts=4 sts=4 sw=4 tw=80 et: */