View Javadoc

1   /*
2    Copyright 2007 Ramon Servadei
3   
4    Licensed under the Apache License, Version 2.0 (the "License");
5    you may not use this file except in compliance with the License.
6    You may obtain a copy of the License at
7   
8    http://www.apache.org/licenses/LICENSE-2.0
9   
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15   */
16  package fulmine.distribution.connection.tcp;
17  
18  import static fulmine.distribution.connection.tcp.ProtocolMessageConstants.CONNECTION_MSG_TYPE;
19  import static fulmine.distribution.connection.tcp.ProtocolMessageConstants.DELIMITER;
20  import static fulmine.util.Utils.COLON;
21  import static fulmine.util.Utils.EMPTY_STRING;
22  import static fulmine.util.Utils.logException;
23  
24  import java.io.IOException;
25  import java.net.InetAddress;
26  import java.net.InetSocketAddress;
27  import java.net.UnknownHostException;
28  import java.nio.channels.SelectionKey;
29  import java.nio.channels.ServerSocketChannel;
30  import java.nio.channels.SocketChannel;
31  
32  import fulmine.AbstractLifeCycle;
33  import fulmine.context.IFrameworkContext;
34  import fulmine.distribution.connection.IConnectionBroker;
35  import fulmine.distribution.connection.IConnectionParameters;
36  import fulmine.event.EventFrameExecution;
37  import fulmine.model.container.IContainer;
38  import fulmine.model.field.IntegerField;
39  import fulmine.model.field.StringField;
40  import fulmine.protocol.specification.ByteWriter;
41  import fulmine.util.io.AbstractSelectionKeyTask;
42  import fulmine.util.io.ISelectorTasksStateListener;
43  import fulmine.util.io.SelectorTasks;
44  import fulmine.util.log.AsyncLog;
45  
46  /**
47   * A connection broker implementation based on TCP/IP sockets. This has a single
48   * thread that handles input and output. This should be adequate as a context is
49   * usually the source or sink of data.
50   * <p>
51   * The TCP parameters are controlled by:
52   * <ul>
53   * <li>{@link #CONTEXT_IP_ADDRESS}
54   * <li>{@link #CONTEXT_TCP_PORT}
55   * <li>{@link #CONTEXT_RCV_BUFFER}
56   * </ul>
57   * 
58   * @author Ramon Servadei
59   */
60  public final class TcpConnectionBroker extends AbstractLifeCycle implements
61      IConnectionBroker
62  {
63      final static AsyncLog LOG = new AsyncLog(TcpConnectionBroker.class);
64  
65      /**
66       * The system property for the IP address of the context server socket. On a
67       * multi-homed host, this is useful to control which address the server
68       * socket binds to. Use <code>null</code> to accept on all addresses the
69       * host uses.
70       */
71      public static final String CONTEXT_IP_ADDRESS = "context.ip.address";
72  
73      /** The system property for the TCP port */
74      public static final String CONTEXT_TCP_PORT = "context.tcp.port";
75  
76      /** The default for the TCP port */
77      public static final int DEFAULT_CONTEXT_TCP_PORT = 16207;
78  
79      /**
80       * The system property for overriding the default receive buffer size for
81       * the server port. This must be specified in units of bytes.
82       */
83      public static final String CONTEXT_RCV_BUFFER = "context.tcp.buffer.rcv";
84  
85      /** The host name */
86      public final static String HOST_NAME;
87      static
88      {
89          String name = null;
90          try
91          {
92              name = InetAddress.getLocalHost().getHostName();
93          }
94          catch (UnknownHostException e)
95          {
96              LOG.fatal("Could not get host name", e);
97              System.exit(1);
98          }
99          HOST_NAME = name;
100     }
101 
102     /** The IP address for the TCP/IP socket of this context */
103     private final String address;
104 
105     /** The port number for the TCP/IP socket of this context */
106     private final int port;
107 
108     /** The server socket channel of this context */
109     ServerSocketChannel serverSocketChannel;
110 
111     /** The selector tasks */
112     private final SelectorTasks selectorTasks;
113 
114     /** The task that handles new socket connections to the server socket */
115     private final ConnectionAcceptTask connectionAcceptTask;
116 
117     /** The thread that handles all the socket IO activity */
118     private Thread connectionThread;
119 
120     /** The context to use when subscribing for events */
121     final IFrameworkContext context;
122 
123     /**
124      * Construct the {@link TcpConnectionBroker} using parameters for the
125      * identity, IP address and TCP port.
126      * 
127      * @param context
128      *            The context to use when subscribing for events
129      * @param address
130      *            the IP address of the host of this {@link TcpConnectionBroker}
131      *            , <code>null</code> for the host address of the local host
132      * @param port
133      *            the TCP port for this{@link TcpConnectionBroker},
134      *            <code>0</code> for {@link #DEFAULT_CONTEXT_TCP_PORT}
135      */
136     public TcpConnectionBroker(IFrameworkContext context, String address,
137         int port)
138     {
139         super();
140         this.context = context;
141         this.address =
142             address == null ? TcpConnectionBroker.HOST_NAME : address;
143         this.port =
144             port <= 0 ? TcpConnectionBroker.DEFAULT_CONTEXT_TCP_PORT : port;
145         this.selectorTasks = new SelectorTasks();
146         this.connectionAcceptTask = new ConnectionAcceptTask();
147         this.selectorTasks.setStateListener(this.connectionAcceptTask);
148 
149         if (getLog().isInfoEnabled())
150         {
151             getLog().info("Created " + this);
152         }
153     }
154 
155     /**
156      * Default constructor that reads system properties for the IP address and
157      * TCP port to use.
158      * 
159      * @param context
160      *            The context to use when subscribing for events
161      * @see #CONTEXT_IP_ADDRESS
162      * @see #CONTEXT_TCP_PORT
163      * @see #DEFAULT_CONTEXT_TCP_PORT
164      */
165     public TcpConnectionBroker(IFrameworkContext context)
166     {
167         this(context, System.getProperty(
168             TcpConnectionBroker.CONTEXT_IP_ADDRESS,
169             TcpConnectionBroker.HOST_NAME),
170             Integer.parseInt(System.getProperty(
171                 TcpConnectionBroker.CONTEXT_TCP_PORT, EMPTY_STRING
172                     + TcpConnectionBroker.DEFAULT_CONTEXT_TCP_PORT)));
173     }
174 
175     public String getAddress()
176     {
177         return this.address;
178     }
179 
180     public int getPort()
181     {
182         return this.port;
183     }
184 
185     @Override
186     protected void doDestroy()
187     {
188         if (getLog().isInfoEnabled())
189         {
190             getLog().info("Destroying " + this);
191         }
192         if (this.serverSocketChannel != null)
193         {
194             try
195             {
196                 this.serverSocketChannel.close();
197             }
198             catch (IOException e)
199             {
200                 if (getLog().isDebugEnabled())
201                 {
202                     getLog().debug("Could not close " + this, e);
203                 }
204             }
205         }
206         this.selectorTasks.destroy();
207         if (getLog().isInfoEnabled())
208         {
209             getLog().info("Destroyed " + this);
210         }
211     }
212 
213     // synchronized to ensure only 1 thread creates a connection at a time
214     public synchronized void connect(IConnectionParameters connectionParameters)
215     {
216         TcpConnection connection = null;
217 
218         if (!isActive())
219         {
220             return;
221         }
222 
223         TcpConnectionParameters parameters =
224             (TcpConnectionParameters) connectionParameters;
225 
226         connection =
227             new TcpConnection(this.context, this, parameters.getIdentity(),
228                 parameters.getRemoteHostAddress(),
229                 parameters.getRemoteHostTcpPort(),
230                 parameters.getRemoteContextHashCode());
231         if (getLog().isInfoEnabled())
232         {
233             getLog().info("(->) Connecting to outbound " + connection);
234         }
235         connection.start();
236 
237         if (connection.getSocketChannel() != null)
238         {
239             registerConnection(connection, connection.getSocketChannel());
240             if (getLog().isInfoEnabled())
241             {
242                 getLog().info("(->) Registered outbound " + connection);
243             }
244             // send the connection 'handshake'
245             connection.send(ByteWriter.getBytes(CONNECTION_MSG_TYPE + DELIMITER
246                 + this.context.getIdentity() + DELIMITER
247                 + this.context.getContextHashCode()));
248         }
249         else
250         {
251             if (getLog().isInfoEnabled())
252             {
253                 getLog().info("No socket for " + connection);
254             }
255         }
256     }
257 
258     private void registerConnection(TcpConnection tcpConnection,
259         SocketChannel socketChannel)
260     {
261         this.selectorTasks.register(SelectionKey.OP_READ, socketChannel,
262             tcpConnection);
263     }
264 
265     @Override
266     protected AsyncLog getLog()
267     {
268         return LOG;
269     }
270 
271     @Override
272     protected void doStart()
273     {
274         try
275         {
276             if (getLog().isInfoEnabled())
277             {
278                 getLog().info("Starting " + this);
279             }
280             this.serverSocketChannel = ServerSocketChannel.open();
281             this.serverSocketChannel.configureBlocking(false);
282             if (System.getProperty(CONTEXT_RCV_BUFFER) != null)
283             {
284                 this.serverSocketChannel.socket().setReceiveBufferSize(
285                     Integer.parseInt(System.getProperty(CONTEXT_RCV_BUFFER)));
286             }
287             final InetSocketAddress endpoint =
288                 new InetSocketAddress(this.address, this.port);
289             this.serverSocketChannel.socket().bind(endpoint);
290             this.selectorTasks.openSelector();
291             this.connectionThread =
292                 new Thread(this.selectorTasks.getRunnable(), this.toString());
293             this.connectionThread.setDaemon(true);
294             this.connectionThread.start();
295         }
296         catch (IOException e)
297         {
298             getLog().fatal("Could not open channel for " + this, e);
299             // super fatal
300             System.exit(1);
301         }
302         if (getLog().isInfoEnabled())
303         {
304             getLog().info("Started " + this);
305         }
306         final IContainer systemInfo = this.context.getSystemInfo();
307         systemInfo.beginFrame(new EventFrameExecution());
308         try
309         {
310             systemInfo.add(new StringField(CONTEXT_IP_ADDRESS, getAddress()));
311             systemInfo.add(new IntegerField(CONTEXT_TCP_PORT, getPort()));
312         }
313         finally
314         {
315             systemInfo.endFrame();
316         }
317     }
318 
319     SelectorTasks getSelectorTasks()
320     {
321         return this.selectorTasks;
322     }
323 
324     /**
325      * Task that handles new socket connections to this context's server socket.
326      * It creates a {@link TcpConnection} and registers this against the
327      * {@link SocketChannel} that wraps the underlying new socket connection.
328      * 
329      * @author Ramon Servadei
330      * 
331      */
332     private final class ConnectionAcceptTask extends AbstractSelectionKeyTask
333         implements ISelectorTasksStateListener
334     {
335         public ConnectionAcceptTask()
336         {
337             super();
338         }
339 
340         public void run()
341         {
342             if (TcpConnectionBroker.this.isActive()
343                 && getSelectionKey().isAcceptable())
344             {
345                 try
346                 {
347                     SocketChannel conn =
348                         TcpConnectionBroker.this.serverSocketChannel.accept();
349 
350                     if (LOG.isInfoEnabled())
351                     {
352                         LOG.info("(<-) Accepted inbound " + conn);
353                     }
354                     conn.configureBlocking(false);
355                     final String address =
356                         conn.socket().getInetAddress().getHostName();
357                     final int port = conn.socket().getPort();
358                     final TcpConnection connection =
359                         new TcpConnection(TcpConnectionBroker.this.context,
360                             TcpConnectionBroker.this, "(pending...)", address,
361                             port, 0);
362                     connection.setOutbound(false);
363                     connection.setSocketChannel(conn);
364                     connection.start();
365                     registerConnection(connection,
366                         connection.getSocketChannel());
367                     if (LOG.isInfoEnabled())
368                     {
369                         LOG.info("(<-) Registered inbound " + conn);
370                     }
371                 }
372                 catch (Exception e)
373                 {
374                     logException(getLog(), this
375                         + " could not accept connection", e);
376                 }
377             }
378         }
379 
380         public void selectorOpened(SelectorTasks tasks)
381         {
382             if (TcpConnectionBroker.this.isActive())
383             {
384                 // (re)register
385                 TcpConnectionBroker.this.selectorTasks.register(
386                     SelectionKey.OP_ACCEPT,
387                     TcpConnectionBroker.this.serverSocketChannel,
388                     TcpConnectionBroker.this.connectionAcceptTask);
389             }
390         }
391     }
392 
393     @Override
394     public String toString()
395     {
396         return getClass().getSimpleName() + "<" + this.context.getIdentity()
397             + COLON + this.address + COLON + this.port + ">";
398     }
399 }