package fr.umlv.tcp;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Locale;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * A server that replies to everything it receives from a client with the
 * uppercase version of the same text. This server uses two threads:
 * the caller of {@link #launchAcceptBlocking()} accepts new client
 * connections (blocking), and a second thread, running {@link #run()},
 * serves all accepted clients through a selector (non-blocking).
 * <p>
 * Accepted channels are handed from the accepting thread to the selector
 * thread through a concurrent queue; only the selector thread registers
 * channels with the selector.
 *
 * @author duris
 *
 */
public class BetterUpperCaseAcceptBlockingServer implements Runnable {
  final static int DEFAULT_PORT = 1234;
  final static int SIZE = 1024;

  /** Charset used to decode requests and encode replies, independent of the platform default. */
  private static final Charset CHARSET = Charset.forName("UTF-8");

  /**
   * Ratio between the reply buffer and the request buffer. A malformed input
   * byte is decoded to U+FFFD, which is re-encoded as three UTF-8 bytes, so
   * the reply can be up to three times longer than the request. A reply that
   * would still not fit is rejected by {@link #serve(ByteBuffer, ByteBuffer)}.
   */
  private static final int MAX_EXPANSION = 3;

  private final ServerSocketChannel serverChannel;

  /** Accepted channels waiting to be registered by the selector thread. */
  private final Queue<SocketChannel> channelQueue = new ConcurrentLinkedQueue<SocketChannel>();

  private final Selector selector = Selector.open();

  /**
   * Constructs a server bound to the given port.
   * @param port the port to bind the server socket.
   * @throws IOException If an I/O error occurs,
   * if the bind operation fails, or if the socket is already bound.
   */
  public BetterUpperCaseAcceptBlockingServer(int port) throws IOException {
    serverChannel = ServerSocketChannel.open();
    try {
      serverChannel.socket().bind(new InetSocketAddress(port));
    } catch (IOException bindFailure) {
      // do not leak the channel and the selector when the server cannot be built
      try {
        serverChannel.close();
      } finally {
        selector.close();
      }
      throw bindFailure;
    }
  }

  /**
   * Constructs a server bound to the default port.
   * @throws IOException If an I/O error occurs,
   * if the bind operation fails, or if the socket is already bound.
   */
  public BetterUpperCaseAcceptBlockingServer() throws IOException {
    this(DEFAULT_PORT);
  }

  /**
   * Writes in buffer <code>out</code> the uppercase
   * version of the string found in buffer <code>in</code>.
   * <p>
   * The remaining bytes of <code>in</code> are consumed, decoded as UTF-8,
   * upper-cased with locale-independent rules and re-encoded as UTF-8.
   * This is a crude version: a multi-byte character that is split between
   * two calls is not reassembled and is decoded as malformed input.
   *
   * @param in the buffer holding the bytes to convert, ready to be read.
   * @param out the buffer that receives the converted bytes.
   * @throws IOException if the converted text does not fit in the
   * remaining space of <code>out</code>.
   */
  void serve(ByteBuffer in, ByteBuffer out) throws IOException {
    byte[] request = new byte[in.remaining()];
    in.get(request);
    // Locale.ROOT keeps the result independent of the server's default locale
    byte[] reply = new String(request, CHARSET).toUpperCase(Locale.ROOT).getBytes(CHARSET);
    if (reply.length > out.remaining()) {
      // report the problem as an I/O failure of this exchange instead of
      // letting out.put() throw an unchecked BufferOverflowException
      throw new IOException("uppercase reply of " + reply.length
          + " bytes does not fit in the " + out.remaining()
          + " bytes left in the output buffer");
    }
    out.put(reply);
  }

  /**
   * This version implements blocking server-socket
   * accept but non-blocking socket read from connections.
   * <p>
   * Starts the selector thread, then accepts connections forever in the
   * calling thread.
   *
   * @throws IOException if accepting a connection fails.
   */
  public void launchAcceptBlocking() throws IOException {
    // starts another thread (this.run()) to handle
    // the channel selector that deals with accepted clients
    Thread thread = new Thread(this,"selectorThread");
    thread.start();

    while (true) {
      // accepts a new client connection
      SocketChannel clientChannel = serverChannel.accept();
      System.err.println("New connection accepted from " + clientChannel.socket().getRemoteSocketAddress());

      // put this client channel into a queue in order
      // to be registered by the selector thread, then wake that
      // thread up so that it leaves select() and drains the queue
      channelQueue.add(clientChannel);
      selector.wakeup();
    }
  }

  /**
   * Handles the selection of ready channels and serves them.
   * A failure on one client only closes that client's connection.
   */
  @Override
  public void run() {
    Set<SelectionKey> selectedKeys = selector.selectedKeys();

    // listen to the selector forever
    while (true) {
      try {
        // blocks until something is readable on a channel
        // or until the accepting thread calls wakeup()
        selector.select();
      } catch (IOException ioe) {
        ioe.printStackTrace();
      }

      // deals with each ready channel
      for (Iterator<SelectionKey> iKeys = selectedKeys.iterator(); iKeys.hasNext(); ) {
        SelectionKey key = iKeys.next();
        // remove first, so that the key never stays in the selected-key set,
        // even when serving this client fails
        iKeys.remove();
        serveReadyChannel(key);
      }

      // takes into account newly accepted channels
      registerPendingChannels();
    }
  }

  /**
   * Serves the client whose key has been selected and closes its
   * connection at end of stream or on I/O error.
   */
  private void serveReadyChannel(SelectionKey key) {
    SocketChannel clientChannel = (SocketChannel) key.channel();
    boolean keepOpen = false;
    try {
      System.out.println("channel for "
          + clientChannel.socket().getRemoteSocketAddress()
          + " is ready to read");
      keepOpen = useConnection(clientChannel);
    } catch (IOException ioe) {
      ioe.printStackTrace();
    }
    if (!keepOpen) {
      key.cancel(); // remove the key from the whole selector
      closeQuietly(clientChannel);
    }
  }

  /**
   * Registers with the selector, for read operations, every channel
   * queued by the accepting thread.
   */
  private void registerPendingChannels() {
    SocketChannel pending;
    while ((pending = channelQueue.poll()) != null) {
      try {
        pending.configureBlocking(false);
        pending.register(selector, SelectionKey.OP_READ);
      } catch (IOException ioe) {
        ioe.printStackTrace();
        // this channel will never be served: do not leak it
        closeQuietly(pending);
      }
    }
  }

  /** Closes the given channel, only reporting a failure to do so. */
  private static void closeQuietly(SocketChannel channel) {
    try {
      channel.close();
    } catch (IOException ioe) {
      ioe.printStackTrace();
    }
  }

  /**
   * Reads and writes bytes on clientChannel.
   * Returns false if connection is terminated.
   * <p>
   * Each chunk that is read is converted by
   * {@link #serve(ByteBuffer, ByteBuffer)} and written back entirely before
   * the next read. Note that the write loop retries until the reply is
   * fully sent, so a client that stops reading delays the other clients.
   *
   * @param clientChannel the channel to read from and to reply to.
   * @return false at end of stream, true when there is nothing more
   * to read for now.
   * @throws IOException if reading, converting or writing fails.
   */
  boolean useConnection(SocketChannel clientChannel) throws IOException {
    ByteBuffer bbIn = ByteBuffer.allocate(SIZE);
    ByteBuffer bbOut = ByteBuffer.allocate(MAX_EXPANSION * SIZE);
    int bytes;
    while ((bytes = clientChannel.read(bbIn)) > 0) {
      bbIn.flip();
      serve(bbIn,bbOut);
      bbOut.flip();
      // a single write() may send only a part of the buffer;
      // loop so that the rest of the reply is not discarded
      while (bbOut.hasRemaining()) {
        clientChannel.write(bbOut);
      }
      bbIn.clear();
      bbOut.clear();
    }
    if (bytes==-1) {
      System.out.println("End of connection...");
      return false; // end of stream
    }
    return true; // nothing more to read by now
  }

  /**
   * Main for test. The optional first argument is the port to bind;
   * without argument the default port is used.
   */
  public static void main(String[] args) throws IOException {
    BetterUpperCaseAcceptBlockingServer server;
    if(args.length == 0) {
      server = new BetterUpperCaseAcceptBlockingServer();
    } else {
      server = new BetterUpperCaseAcceptBlockingServer(Integer.parseInt(args[0]));
    }
    server.launchAcceptBlocking();
  }
}
