View Javadoc

1   /*
2      Copyright 2007 Ramon Servadei
3   
4      Licensed under the Apache License, Version 2.0 (the "License");
5      you may not use this file except in compliance with the License.
6      You may obtain a copy of the License at
7   
8          http://www.apache.org/licenses/LICENSE-2.0
9   
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15   */
16  package fulmine.distribution.connection.tcp;
17  
18  import static fulmine.util.Utils.COLON;
19  import static fulmine.util.Utils.COMMA_SPACE;
20  import static fulmine.util.Utils.string;
21  
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.nio.channels.SelectionKey;
25  import java.nio.channels.Selector;
26  import java.nio.channels.SocketChannel;
27  import java.util.List;
28  
29  import fulmine.context.IFrameworkContext;
30  import fulmine.distribution.connection.IConnection;
31  import fulmine.distribution.connection.IConnectionBroker;
32  import fulmine.distribution.events.ConnectionDestroyedEvent;
33  import fulmine.distribution.events.MessageConsumedEvent;
34  import fulmine.event.EventSource;
35  import fulmine.event.IEvent;
36  import fulmine.event.IEventSource;
37  import fulmine.event.listener.AbstractEventHandler;
38  import fulmine.event.listener.IEventListener;
39  import fulmine.event.listener.MultiEventListener;
40  import fulmine.util.log.AsyncLog;
41  
42  /**
43   * A {@link SocketChannel} connection to a remote {@link IFrameworkContext}
44   * instance. This uses the Java NIO framework.
45   * 
46   * @author Ramon Servadei
47   */
48  public abstract class AbstractSocketChannelConnection extends
49      TcpConnectionParameters implements IConnection
50  {
51      /**
52       * Handles the {@link MessageConsumedEvent}s and decrements the count of
53       * messages that are awaiting processing. If
54       * {@link TcpConnection#destroyWhenMessagesProcessed} is set and the count
55       * of {@link TcpConnection#messagesToProcess} is zero, the handler will
56       * finish the destroy sequence of the connection.
57       * 
58       * @author Ramon Servadei
59       */
60      private final class MessageConsumedEventHandler extends
61          AbstractEventHandler<MessageConsumedEvent>
62      {
63          @Override
64          public void handle(MessageConsumedEvent event)
65          {
66              boolean destroy = false;
67              synchronized (AbstractSocketChannelConnection.this)
68              {
69                  AbstractSocketChannelConnection.this.messagesToProcess--;
70                  if (AbstractSocketChannelConnection.this.messagesToProcess == 0
71                      && AbstractSocketChannelConnection.this.destroyWhenMessagesProcessed)
72                  {
73                      destroy = true;
74                  }
75              }
76              if (destroy)
77              {
78                  // finish the destroy sequence
79                  AbstractSocketChannelConnection.this.doComponentDestroy();
80              }
81          }
82  
83          @Override
84          protected AsyncLog getLog()
85          {
86              return AbstractSocketChannelConnection.this.getLog();
87          }
88  
89          @Override
90          public String toString()
91          {
92              return getClass().getSimpleName();
93          }
94      }
95  
96      /** The socket channel to the remote {@link IConnectionBroker} */
97      private SocketChannel socketChannel;
98  
99      /** The delegate for the {@link IEventSource} operations of this */
100     protected IEventSource eventSourceDelegate;
101 
102     /**
103      * Flag to indicate the socket direction, by default connections are
104      * outbound
105      */
106     private boolean outbound = true;
107 
108     /**
109      * Set when the connection is established; this happens when both contexts
110      * know each others identity. The context making the outbound connection
111      * waits for the connection 'ack' from the other context. The context
112      * receiving the connection sets this when it receives the 'con' message
113      * from the initiating context.
114      */
115     private boolean connectionEstablished;
116 
117     /** The handler for {@link MessageConsumedEvent}s */
118     private final IEventListener messageConsumedEventHandler;
119 
120     /**
121      * A flag to inform the {@link MessageConsumedEventHandler} that the
122      * connection should be destroyed when all messages have been consumed.
123      * 
124      * @see #messagesToProcess
125      */
126     protected volatile boolean destroyWhenMessagesProcessed;
127 
128     /** The context to use for raising events */
129     private final IFrameworkContext context;
130 
131     /**
132      * Tracks the number of messages received and not yet processed. When the
133      * message is processed, a {@link MessageConsumedEvent} is raised that is
134      * intercepted by the {@link MessageConsumedEventHandler} which decrements
135      * this.
136      * <p>
137      * Access must be synchronized on the
138      * {@link AbstractSocketChannelConnection} instance.
139      */
140     protected int messagesToProcess;
141 
142     /**
143      * Standard constructor for the parameters of the connection.
144      * 
145      * @param context
146      *            the context this connection services
147      * @param identity
148      *            the connection identity
149      * @param address
150      *            the IP address or resolvable host name of the remote context
151      * @param port
152      *            the port for the socket connection to the remote context
153      * @param connectionHashCode
154      *            the hashcode of the connection to the remote context
155      * @throws IllegalStateException
156      *             if the socket could not be constructed
157      */
158     @SuppressWarnings("unchecked")
159     public AbstractSocketChannelConnection(IFrameworkContext context,
160         String identity, String address, int port, int connectionHashCode)
161     {
162         super(identity, connectionHashCode, address, port);
163         this.context = context;
164         this.messageConsumedEventHandler =
165             new MultiEventListener(
166                 MessageConsumedEventHandler.class.getSimpleName(),
167                 context,
168                 AbstractEventHandler.getEventHandlerMappings(this.new MessageConsumedEventHandler()));
169     }
170 
171     // no messages to process...
172     protected void doComponentDestroy()
173     {
174         // log the final part of the destroy sequence
175         if (getLog().isInfoEnabled())
176         {
177             getLog().info("Destroying[2] " + this + " (stacktrace follows)",
178                 new Exception());
179         }
180         if (this.eventSourceDelegate != null)
181         {
182             this.eventSourceDelegate.destroy();
183         }
184     }
185 
186     protected IFrameworkContext getContext()
187     {
188         return this.context;
189     }
190 
191     @Override
192     protected void doStart()
193     {
194         try
195         {
196             if (isOutbound())
197             {
198                 final SocketChannel localSocketChannel = SocketChannel.open();
199                 localSocketChannel.configureBlocking(false);
200                 // as the channel is in non-blocking mode, we need some extra
201                 // work to connect
202                 if (!localSocketChannel.connect(new InetSocketAddress(
203                     getRemoteHostAddress(), getRemoteHostTcpPort())))
204                 {
205                     // the connection has not completed, so wait on the selector
206                     // until we can complete the connection
207                     final Selector selector = Selector.open();
208                     final SelectionKey key =
209                         localSocketChannel.register(selector,
210                             SelectionKey.OP_CONNECT);
211                     selector.select();
212                     localSocketChannel.finishConnect();
213                     key.cancel();
214                     selector.close();
215                 }
216                 if (getLog().isDebugEnabled())
217                 {
218                     getLog().debug("(->) Connected to " + this);
219                 }
220                 setSocketChannel(localSocketChannel);
221             }
222             else
223             {
224                 if (getLog().isDebugEnabled())
225                 {
226                     getLog().debug("(<-) Connected to " + this);
227                 }
228             }
229         }
230         catch (Exception e)
231         {
232             this.socketChannel = null;
233             final String warning =
234                 "Could not create socket to " + getAddress() + COLON
235                     + getRemoteHostTcpPort() + ". Calling destroy().";
236             destroy();
237             throw new RuntimeException(warning, e);
238         }
239     }
240 
241     public boolean isConnected()
242     {
243         return this.socketChannel != null
244             && this.socketChannel.socket() != null
245             && this.socketChannel.socket().isConnected();
246     }
247 
248     /**
249      * Set the direction of the socket connection.
250      * 
251      * @param outbound
252      *            <code>true</code> for an outbound connecting socket (connects
253      *            to a server socket of a remote context), <code>false</code>
254      *            for an inbound connecting one
255      */
256     void setOutbound(boolean outbound)
257     {
258         this.outbound = outbound;
259     }
260 
261     /**
262      * Indicates the connecting direction.
263      * 
264      * @return <code>true</code> if this socket connects to a server socket,
265      *         <code>false</code> if it was an inbound connection from another
266      *         context
267      */
268     public boolean isOutbound()
269     {
270         return outbound;
271     }
272 
273     /**
274      * Destroy the internal TCP I/O components.
275      */
276     @Override
277     protected void doDestroy()
278     {
279         if (getLog().isInfoEnabled())
280         {
281             getLog().info("Destroying[1] " + this + " (stacktrace follows)",
282                 new Exception());
283         }
284         if (this.socketChannel != null)
285         {
286             try
287             {
288                 if (this.socketChannel.socket() != null)
289                 {
290                     this.socketChannel.socket().getOutputStream().flush();
291                     this.socketChannel.socket().shutdownInput();
292                     this.socketChannel.socket().shutdownOutput();
293                     this.socketChannel.socket().close();
294                 }
295                 this.socketChannel.close();
296             }
297             catch (IOException e)
298             {
299                 if (getLog().isDebugEnabled())
300                 {
301                     getLog().debug("Could not close " + this, e);
302                 }
303             }
304         }
305         synchronized (this)
306         {
307             if (this.messagesToProcess == 0)
308             {
309                 doComponentDestroy();
310             }
311         }
312         if (getContext() != null && getContext().isActive())
313         {
314             getContext().queueEvent(
315                 new ConnectionDestroyedEvent(getContext(),
316                     this.getRemoteContextIdentity()));
317         }
318         if (getLog().isInfoEnabled())
319         {
320             getLog().info("Destroyed " + this);
321         }
322     }
323 
324     public void addEvent(IEvent event)
325     {
326         this.eventSourceDelegate.addEvent(event);
327     }
328 
329     public boolean addListener(IEventListener listener)
330     {
331         return this.eventSourceDelegate.addListener(listener);
332     }
333 
334     public byte getEventSourceGroupId()
335     {
336         return this.eventSourceDelegate.getEventSourceGroupId();
337     }
338 
339     public List<IEventListener> getListeners()
340     {
341         return this.eventSourceDelegate.getListeners();
342     }
343 
344     public boolean removeListener(IEventListener listener)
345     {
346         return this.eventSourceDelegate.removeListener(listener);
347     }
348 
349     public List<IEventListener> removeListeners()
350     {
351         return this.eventSourceDelegate.removeListeners();
352     }
353 
354     public String toDetailedString()
355     {
356         return toString();
357     }
358 
359     public String toIdentityString()
360     {
361         return super.toString();
362     }
363 
364     protected SocketChannel getSocketChannel()
365     {
366         return this.socketChannel;
367     }
368 
369     /**
370      * Determine if the application level connection handshake has been
371      * established. When a transport connection is made, the context making the
372      * connection sends out a 'con' message which the other context responds to
373      * with an 'ack'. This sequence completes the application level handshake
374      * for the connection and exchanges details about each context.
375      * <p>
376      * Each context marks its end of the connection as established as per the
377      * following;
378      * <ul>
379      * <li>when the context making the outbound connection receives the 'ack'
380      * <li>when the context receiving the inbound connection receives the 'con'
381      * </ul>
382      * 
383      * @return <code>true</code> if the connection handshake has been completed
384      *         and the details about the remote context are known.
385      */
386     protected boolean isConnectionHandshakeComplete()
387     {
388         return this.connectionEstablished;
389     }
390 
391     /**
392      * This method is called when all relevant details of the connection have
393      * been established.
394      * 
395      * @see #isConnectionHandshakeComplete()
396      */
397     protected void connectionHandshakeComplete()
398     {
399         this.connectionEstablished = true;
400         this.eventSourceDelegate = new EventSource(toString());
401         this.eventSourceDelegate.start();
402         addListener(messageConsumedEventHandler);
403     }
404 
405     void setSocketChannel(SocketChannel socketChannel)
406     {
407         this.socketChannel = socketChannel;
408     }
409 
410     @Override
411     public String toString()
412     {
413         if (this.socketChannel != null)
414         {
415             return string(this, getIdentity() + COMMA_SPACE
416                 + this.socketChannel.socket() + ", remoteContextHashcode="
417                 + getRemoteContextHashCode());
418         }
419         return super.toString();
420     }
421 }