1 /*
2 * JBoss, Home of Professional Open Source
3 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4 * by the @authors tag. See the copyright.txt in the distribution for a
5 * full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */
22
23 package org.jboss.remoting.stream;
24
25 import org.jboss.logging.Logger;
26 import org.jboss.remoting.InvocationRequest;
27 import org.jboss.remoting.InvokerLocator;
28 import org.jboss.remoting.ServerInvocationHandler;
29 import org.jboss.remoting.ServerInvoker;
30 import org.jboss.remoting.callback.InvokerCallbackHandler;
31 import org.jboss.remoting.transport.Connector;
32 import org.jboss.remoting.transport.PortUtil;
33 import org.jboss.remoting.util.SecurityUtility;
34
35 import javax.management.MBeanServer;
36 import java.io.IOException;
37 import java.io.InputStream;
38 import java.net.InetAddress;
39 import java.net.UnknownHostException;
40 import java.security.AccessController;
41 import java.security.PrivilegedActionException;
42 import java.security.PrivilegedExceptionAction;
43
44 /**
45 * This is a helper class that runs internal to remoting on the
46 * client side. It contains a reference to a local input stream
47 * and creates a remoting server to receive calls from a target
48 * remoting server (via calls from a StreamHandler initiated by a
49 * server invoker handler).
50 * <p/>
51 * NOTE: That once this class receives the close() method called
52 * from the server, it will also stop and destroy the internal
53 * remoting server, since is assumed there will be no more callbacks
54 * (since the stream itself is closed).
55 *
56 * @author <a href="mailto:tom.elrod@jboss.com">Tom Elrod</a>
57 */
58 public class StreamServer
59 {
60 private InputStream streamSource = null;
61
62 private String transport = "socket";
63 private String host = "localhost";
64 private int port = 5405;
65
66 private Connector connector = null;
67
68 private boolean internalConnector = true;
69
70 private static final Logger log = Logger.getLogger(StreamServer.class);
71
72 public static final String STREAM_TRANSPORT_KEY = "remoting.stream.transport";
73 public static final String STREAM_HOST_KEY = "remoting.stream.host";
74 public static final String STREAM_PORT_KEY = "remoting.stream.port";
75
76
77 /**
78 * Creates the server wrapped around the specified input stream.
79 * This will create the remoting server as well.
80 *
81 * @param stream
82 * @throws Exception
83 */
84 public StreamServer(InputStream stream) throws Exception
85 {
86 this.streamSource = stream;
87 String locatorURI = getLocatorURI();
88 setupServer(locatorURI);
89 }
90
91 public StreamServer(InputStream stream, InvokerLocator locator) throws Exception
92 {
93 this.streamSource = stream;
94 setupServer(locator.getLocatorURI());
95 }
96
97 public StreamServer(InputStream stream, Connector connector) throws Exception
98 {
99 this.streamSource = stream;
100 this.connector = connector;
101 if(connector != null)
102 {
103 if(!connector.isStarted())
104 {
105 throw new IllegalStateException("Connector (" + connector + ") passed to act as stream server has not been started.");
106 }
107 ServerInvocationHandler invocationHandler = new Handler(connector);
108 connector.addInvocationHandler("stream", invocationHandler);
109 internalConnector = false;
110 }
111 else
112 {
113 throw new NullPointerException("Connector passed to act as stream server can not be null.");
114 }
115 }
116
117 private String getLocatorURI() throws IOException
118 {
119 // check for system properties for locator values
120 transport = getSystemProperty(STREAM_TRANSPORT_KEY, transport);
121 try
122 {
123 host = getLocalHostName();
124 }
125 catch(UnknownHostException e)
126 {
127 try
128 {
129 InetAddress localAddress = getLocalHost();
130 host = localAddress.getHostAddress();
131 }
132 catch(UnknownHostException e1)
133 {
134 log.error("Stream server could not determine local host or address.");
135 }
136 }
137
138 host = getSystemProperty(STREAM_HOST_KEY, host);
139 String defaultPort = "" + PortUtil.findFreePort(host);
140 String sPort = getSystemProperty(STREAM_PORT_KEY, defaultPort);
141
142 try
143 {
144 port = Integer.parseInt(sPort);
145 }
146 catch(NumberFormatException e)
147 {
148 log.error("Stream server could not convert specified port " + sPort + " to a number.");
149 }
150
151 return transport + "://" + host + ":" + port;
152 }
153
154 /**
155 * Gets the locator to call back on this server to get the inputstream data.
156 *
157 * @return
158 * @throws Exception
159 */
160 public String getInvokerLocator() throws Exception
161 {
162 String locator = null;
163
164 if(connector != null)
165 {
166 locator = connector.getInvokerLocator();
167 }
168 return locator;
169 }
170
171 public void setupServer(String locatorURI) throws Exception
172 {
173 InvokerLocator locator = new InvokerLocator(locatorURI);
174
175 connector = new Connector();
176 connector.setInvokerLocator(locator.getLocatorURI());
177 connector.create();
178
179 ServerInvocationHandler invocationHandler = new Handler(connector);
180 connector.addInvocationHandler("stream", invocationHandler);
181
182 connector.start();
183
184 }
185
186 /**
187 * Handler for accepting method calls on the input stream and perform the coresponding
188 * method call on the original input stream and returning the data.
189 */
190 public class Handler implements ServerInvocationHandler
191 {
192 private Connector connector = null;
193
194 public Handler(Connector connector)
195 {
196 this.connector = connector;
197 }
198
199 public Object invoke(InvocationRequest invocation) throws Throwable
200 {
201 Object obj = invocation.getParameter();
202
203 // will expect the parameter to ALWAYS be of type StreamCallPaylod
204 if(obj instanceof StreamCallPayload)
205 {
206 StreamCallPayload payload = (StreamCallPayload) obj;
207 String method = payload.getMethod();
208
209 if(StreamHandler.READ.equals(method))
210 {
211 int i = streamSource.read();
212 return new Integer(i);
213 }
214 else if(StreamHandler.AVAILABLE.equals(method))
215 {
216 int i = streamSource.available();
217 return new Integer(i);
218 }
219 else if(StreamHandler.CLOSE.equals(method))
220 {
221 streamSource.close();
222 if(connector != null && internalConnector)
223 {
224 connector.stop();
225 connector.destroy();
226 }
227 }
228 else if(StreamHandler.RESET.equals(method))
229 {
230 streamSource.reset();
231 }
232 else if(StreamHandler.MARKSUPPORTED.equals(method))
233 {
234 boolean b = streamSource.markSupported();
235 return new Boolean(b);
236 }
237 else if(StreamHandler.MARKREADLIMIT.equals(method))
238 {
239 Object[] param = payload.getParams();
240 Integer intr = (Integer) param[0];
241 int readLimit = intr.intValue();
242 streamSource.mark(readLimit);
243 }
244 else if(StreamHandler.SKIP.equals(method))
245 {
246 Object[] param = payload.getParams();
247 Long lg = (Long) param[0];
248 long n = lg.longValue();
249 long ret = streamSource.skip(n);
250 return new Long(ret);
251 }
252 else if(StreamHandler.READBYTEARRAY.equals(method))
253 {
254 Object[] param = payload.getParams();
255 byte[] byteParam = (byte[]) param[0];
256 int i = streamSource.read(byteParam);
257 StreamCallPayload ret = new StreamCallPayload(StreamHandler.READBYTEARRAY);
258 ret.setParams(new Object[]{byteParam, new Integer(i)});
259 return ret;
260 }
261 else
262 {
263 throw new Exception("Unsupported method call - " + method);
264 }
265 }
266 else
267 {
268 log.error("Can not process invocation request because is not of type StreamCallPayload.");
269 throw new Exception("Invalid payload type. Must be of type StreamCallPayload.");
270 }
271 return null;
272 }
273
274 /**
275 * Adds a callback handler that will listen for callbacks from
276 * the server invoker handler.
277 *
278 * @param callbackHandler
279 */
280 public void addListener(InvokerCallbackHandler callbackHandler)
281 {
282 // NO OP as do not handling callback listeners in this example
283 }
284
285 /**
286 * Removes the callback handler that was listening for callbacks
287 * from the server invoker handler.
288 *
289 * @param callbackHandler
290 */
291 public void removeListener(InvokerCallbackHandler callbackHandler)
292 {
293 // NO OP as do not handling callback listeners in this example
294 }
295
296 /**
297 * set the mbean server that the handler can reference
298 *
299 * @param server
300 */
301 public void setMBeanServer(MBeanServer server)
302 {
303 // NO OP as do not need reference to MBeanServer for this handler
304 }
305
306 /**
307 * set the invoker that owns this handler
308 *
309 * @param invoker
310 */
311 public void setInvoker(ServerInvoker invoker)
312 {
313 // NO OP as do not need reference back to the server invoker
314 }
315 }
316
317 static private String getSystemProperty(final String name, final String defaultValue)
318 {
319 if (SecurityUtility.skipAccessControl())
320 return System.getProperty(name, defaultValue);
321
322 String value = null;
323 try
324 {
325 value = (String)AccessController.doPrivileged( new PrivilegedExceptionAction()
326 {
327 public Object run() throws Exception
328 {
329 return System.getProperty(name, defaultValue);
330 }
331 });
332 }
333 catch (PrivilegedActionException e)
334 {
335 throw (RuntimeException) e.getCause();
336 }
337
338 return value;
339 }
340
341 static private InetAddress getLocalHost() throws UnknownHostException
342 {
343 if (SecurityUtility.skipAccessControl())
344 {
345 try
346 {
347 return InetAddress.getLocalHost();
348 }
349 catch (IOException e)
350 {
351 return InetAddress.getByName("127.0.0.1");
352 }
353 }
354
355 try
356 {
357 return (InetAddress) AccessController.doPrivileged( new PrivilegedExceptionAction()
358 {
359 public Object run() throws IOException
360 {
361 try
362 {
363 return InetAddress.getLocalHost();
364 }
365 catch (IOException e)
366 {
367 return InetAddress.getByName("127.0.0.1");
368 }
369 }
370 });
371 }
372 catch (PrivilegedActionException e)
373 {
374 throw (UnknownHostException) e.getCause();
375 }
376 }
377
378 static private String getLocalHostName() throws UnknownHostException
379 {
380 if (SecurityUtility.skipAccessControl())
381 {
382 return getLocalHost().getHostName();
383 }
384
385 try
386 {
387 return (String) AccessController.doPrivileged( new PrivilegedExceptionAction()
388 {
389 public Object run() throws IOException
390 {
391 InetAddress address = null;
392 try
393 {
394 address = InetAddress.getLocalHost();
395 }
396 catch (IOException e)
397 {
398 address = InetAddress.getByName("127.0.0.1");
399 }
400
401 return address.getHostName();
402 }
403 });
404 }
405 catch (PrivilegedActionException e)
406 {
407 throw (UnknownHostException) e.getCause();
408 }
409 }
410 }