Save This Page
Home » JBoss-5.1.0 » org » jboss » remoting » stream » [javadoc | source]
    1   /*
    2   * JBoss, Home of Professional Open Source
    3   * Copyright 2005, JBoss Inc., and individual contributors as indicated
    4   * by the @authors tag. See the copyright.txt in the distribution for a
    5   * full listing of individual contributors.
    6   *
    7   * This is free software; you can redistribute it and/or modify it
    8   * under the terms of the GNU Lesser General Public License as
    9   * published by the Free Software Foundation; either version 2.1 of
   10   * the License, or (at your option) any later version.
   11   *
   12   * This software is distributed in the hope that it will be useful,
   13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
   14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
   15   * Lesser General Public License for more details.
   16   *
   17   * You should have received a copy of the GNU Lesser General Public
   18   * License along with this software; if not, write to the Free
   19   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
   20   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
   21   */
   22   
   23   package org.jboss.remoting.stream;
   24   
   25   import org.jboss.logging.Logger;
   26   import org.jboss.remoting.InvocationRequest;
   27   import org.jboss.remoting.InvokerLocator;
   28   import org.jboss.remoting.ServerInvocationHandler;
   29   import org.jboss.remoting.ServerInvoker;
   30   import org.jboss.remoting.callback.InvokerCallbackHandler;
   31   import org.jboss.remoting.transport.Connector;
   32   import org.jboss.remoting.transport.PortUtil;
   33   import org.jboss.remoting.util.SecurityUtility;
   34   
   35   import javax.management.MBeanServer;
   36   import java.io.IOException;
   37   import java.io.InputStream;
   38   import java.net.InetAddress;
   39   import java.net.UnknownHostException;
   40   import java.security.AccessController;
   41   import java.security.PrivilegedActionException;
   42   import java.security.PrivilegedExceptionAction;
   43   
   44   /**
   45    * This is a helper class that runs internal to remoting on the
   46    * client side.  It contains a reference to a local input stream
   47    * and creates a remoting server to receive calls from a target
   48    * remoting server (via calls from a StreamHandler initiated by a
   49    * server invoker handler).
   50    * <p/>
   51    * NOTE: That once this class receives the close() method called
   52    * from the server, it will also stop and destroy the internal
   53    * remoting server, since is assumed there will be no more callbacks
   54    * (since the stream itself is closed).
   55    *
   56    * @author <a href="mailto:tom.elrod@jboss.com">Tom Elrod</a>
   57    */
   58   public class StreamServer
   59   {
   60      private InputStream streamSource = null;
   61   
   62      private String transport = "socket";
   63      private String host = "localhost";
   64      private int port = 5405;
   65   
   66      private Connector connector = null;
   67   
   68      private boolean internalConnector = true;
   69   
   70      private static final Logger log = Logger.getLogger(StreamServer.class);
   71   
   72      public static final String STREAM_TRANSPORT_KEY = "remoting.stream.transport";
   73      public static final String STREAM_HOST_KEY = "remoting.stream.host";
   74      public static final String STREAM_PORT_KEY = "remoting.stream.port";
   75   
   76   
   77      /**
   78       * Creates the server wrapped around the specified input stream.
   79       * This will create the remoting server as well.
   80       *
   81       * @param stream
   82       * @throws Exception
   83       */
   84      public StreamServer(InputStream stream) throws Exception
   85      {
   86         this.streamSource = stream;
   87         String locatorURI = getLocatorURI();
   88         setupServer(locatorURI);
   89      }
   90   
   91      public StreamServer(InputStream stream, InvokerLocator locator) throws Exception
   92      {
   93         this.streamSource = stream;
   94         setupServer(locator.getLocatorURI());
   95      }
   96   
   97      public StreamServer(InputStream stream, Connector connector) throws Exception
   98      {
   99         this.streamSource = stream;
  100         this.connector = connector;
  101         if(connector != null)
  102         {
  103            if(!connector.isStarted())
  104            {
  105               throw new IllegalStateException("Connector (" + connector + ") passed to act as stream server has not been started.");
  106            }
  107            ServerInvocationHandler invocationHandler = new Handler(connector);
  108            connector.addInvocationHandler("stream", invocationHandler);
  109            internalConnector = false;
  110         }
  111         else
  112         {
  113            throw new NullPointerException("Connector passed to act as stream server can not be null.");
  114         }
  115      }
  116   
  117      private String getLocatorURI() throws IOException
  118      {
  119         // check for system properties for locator values
  120         transport = getSystemProperty(STREAM_TRANSPORT_KEY, transport);
  121         try
  122         {
  123            host = getLocalHostName();
  124         }
  125         catch(UnknownHostException e)
  126         {
  127            try
  128            {
  129               InetAddress localAddress = getLocalHost();
  130               host = localAddress.getHostAddress();
  131            }
  132            catch(UnknownHostException e1)
  133            {
  134               log.error("Stream server could not determine local host or address.");
  135            }
  136         }
  137   
  138         host = getSystemProperty(STREAM_HOST_KEY, host);
  139         String defaultPort = "" + PortUtil.findFreePort(host);
  140         String sPort = getSystemProperty(STREAM_PORT_KEY, defaultPort);
  141         
  142         try
  143         {
  144            port = Integer.parseInt(sPort);
  145         }
  146         catch(NumberFormatException e)
  147         {
  148            log.error("Stream server could not convert specified port " + sPort + " to a number.");
  149         }
  150   
  151         return transport + "://" + host + ":" + port;
  152      }
  153   
  154      /**
  155       * Gets the locator to call back on this server to get the inputstream data.
  156       *
  157       * @return
  158       * @throws Exception
  159       */
  160      public String getInvokerLocator() throws Exception
  161      {
  162         String locator = null;
  163   
  164         if(connector != null)
  165         {
  166            locator = connector.getInvokerLocator();
  167         }
  168         return locator;
  169      }
  170   
  171      public void setupServer(String locatorURI) throws Exception
  172      {
  173         InvokerLocator locator = new InvokerLocator(locatorURI);
  174   
  175         connector = new Connector();
  176         connector.setInvokerLocator(locator.getLocatorURI());
  177         connector.create();
  178   
  179         ServerInvocationHandler invocationHandler = new Handler(connector);
  180         connector.addInvocationHandler("stream", invocationHandler);
  181   
  182         connector.start();
  183   
  184      }
  185   
  186      /**
  187       * Handler for accepting method calls on the input stream and perform the coresponding
  188       * method call on the original input stream and returning the data.
  189       */
  190      public class Handler implements ServerInvocationHandler
  191      {
  192         private Connector connector = null;
  193   
  194         public Handler(Connector connector)
  195         {
  196            this.connector = connector;
  197         }
  198   
  199         public Object invoke(InvocationRequest invocation) throws Throwable
  200         {
  201            Object obj = invocation.getParameter();
  202   
  203            // will expect the parameter to ALWAYS be of type StreamCallPaylod
  204            if(obj instanceof StreamCallPayload)
  205            {
  206               StreamCallPayload payload = (StreamCallPayload) obj;
  207               String method = payload.getMethod();
  208   
  209               if(StreamHandler.READ.equals(method))
  210               {
  211                  int i = streamSource.read();
  212                  return new Integer(i);
  213               }
  214               else if(StreamHandler.AVAILABLE.equals(method))
  215               {
  216                  int i = streamSource.available();
  217                  return new Integer(i);
  218               }
  219               else if(StreamHandler.CLOSE.equals(method))
  220               {
  221                  streamSource.close();
  222                  if(connector != null && internalConnector)
  223                  {
  224                     connector.stop();
  225                     connector.destroy();
  226                  }
  227               }
  228               else if(StreamHandler.RESET.equals(method))
  229               {
  230                  streamSource.reset();
  231               }
  232               else if(StreamHandler.MARKSUPPORTED.equals(method))
  233               {
  234                  boolean b = streamSource.markSupported();
  235                  return new Boolean(b);
  236               }
  237               else if(StreamHandler.MARKREADLIMIT.equals(method))
  238               {
  239                  Object[] param = payload.getParams();
  240                  Integer intr = (Integer) param[0];
  241                  int readLimit = intr.intValue();
  242                  streamSource.mark(readLimit);
  243               }
  244               else if(StreamHandler.SKIP.equals(method))
  245               {
  246                  Object[] param = payload.getParams();
  247                  Long lg = (Long) param[0];
  248                  long n = lg.longValue();
  249                  long ret = streamSource.skip(n);
  250                  return new Long(ret);
  251               }
  252               else if(StreamHandler.READBYTEARRAY.equals(method))
  253               {
  254                  Object[] param = payload.getParams();
  255                  byte[] byteParam = (byte[]) param[0];
  256                  int i = streamSource.read(byteParam);
  257                  StreamCallPayload ret = new StreamCallPayload(StreamHandler.READBYTEARRAY);
  258                  ret.setParams(new Object[]{byteParam, new Integer(i)});
  259                  return ret;
  260               }
  261               else
  262               {
  263                  throw new Exception("Unsupported method call - " + method);
  264               }
  265            }
  266            else
  267            {
  268               log.error("Can not process invocation request because is not of type StreamCallPayload.");
  269               throw new Exception("Invalid payload type.  Must be of type StreamCallPayload.");
  270            }
  271            return null;
  272         }
  273   
  274         /**
  275          * Adds a callback handler that will listen for callbacks from
  276          * the server invoker handler.
  277          *
  278          * @param callbackHandler
  279          */
  280         public void addListener(InvokerCallbackHandler callbackHandler)
  281         {
  282            // NO OP as do not handling callback listeners in this example
  283         }
  284   
  285         /**
  286          * Removes the callback handler that was listening for callbacks
  287          * from the server invoker handler.
  288          *
  289          * @param callbackHandler
  290          */
  291         public void removeListener(InvokerCallbackHandler callbackHandler)
  292         {
  293            // NO OP as do not handling callback listeners in this example
  294         }
  295   
  296         /**
  297          * set the mbean server that the handler can reference
  298          *
  299          * @param server
  300          */
  301         public void setMBeanServer(MBeanServer server)
  302         {
  303            // NO OP as do not need reference to MBeanServer for this handler
  304         }
  305   
  306         /**
  307          * set the invoker that owns this handler
  308          *
  309          * @param invoker
  310          */
  311         public void setInvoker(ServerInvoker invoker)
  312         {
  313            // NO OP as do not need reference back to the server invoker
  314         }
  315      }
  316   
  317      static private String getSystemProperty(final String name, final String defaultValue)
  318      {
  319         if (SecurityUtility.skipAccessControl())
  320            return System.getProperty(name, defaultValue);
  321            
  322         String value = null;
  323         try
  324         {
  325            value = (String)AccessController.doPrivileged( new PrivilegedExceptionAction()
  326            {
  327               public Object run() throws Exception
  328               {
  329                  return System.getProperty(name, defaultValue);
  330               }
  331            });
  332         }
  333         catch (PrivilegedActionException e)
  334         {
  335            throw (RuntimeException) e.getCause();
  336         }
  337         
  338         return value;
  339      }
  340      
  341      static private InetAddress getLocalHost() throws UnknownHostException
  342      {
  343         if (SecurityUtility.skipAccessControl())
  344         {
  345            try
  346            {
  347               return InetAddress.getLocalHost();
  348            }
  349            catch (IOException e)
  350            {
  351               return InetAddress.getByName("127.0.0.1");
  352            }
  353         }
  354   
  355         try
  356         {
  357            return (InetAddress) AccessController.doPrivileged( new PrivilegedExceptionAction()
  358            {
  359               public Object run() throws IOException
  360               {
  361                  try
  362                  {
  363                     return InetAddress.getLocalHost();
  364                  }
  365                  catch (IOException e)
  366                  {
  367                     return InetAddress.getByName("127.0.0.1");
  368                  }
  369               }
  370            });
  371         }
  372         catch (PrivilegedActionException e)
  373         {
  374            throw (UnknownHostException) e.getCause();
  375         }
  376      }
  377      
  378      static private String getLocalHostName() throws UnknownHostException
  379      {
  380         if (SecurityUtility.skipAccessControl())
  381         {
  382            return getLocalHost().getHostName();
  383         }
  384   
  385         try
  386         {
  387            return (String) AccessController.doPrivileged( new PrivilegedExceptionAction()
  388            {
  389               public Object run() throws IOException
  390               {
  391                  InetAddress address = null;
  392                  try
  393                  {
  394                     address = InetAddress.getLocalHost();
  395                  }
  396                  catch (IOException e)
  397                  {
  398                     address = InetAddress.getByName("127.0.0.1");
  399                  }
  400                  
  401                  return address.getHostName();
  402               }
  403            });
  404         }
  405         catch (PrivilegedActionException e)
  406         {
  407            throw (UnknownHostException) e.getCause();
  408         }
  409      }
  410   }

Save This Page
Home » JBoss-5.1.0 » org » jboss » remoting » stream » [javadoc | source]