View Javadoc

1   package fulmine.distribution.connection.tcp;
2   
3   import static fulmine.distribution.connection.tcp.ProtocolMessageConstants.CONNECTION_MSG_TYPE;
4   import static fulmine.distribution.connection.tcp.ProtocolMessageConstants.CONN_ACK_MSG_TYPE;
5   import static fulmine.distribution.connection.tcp.ProtocolMessageConstants.DELIMITER;
6   import static fulmine.util.Utils.logException;
7   
8   import java.io.IOException;
9   import java.nio.ByteBuffer;
10  import java.nio.channels.SelectionKey;
11  import java.nio.channels.SocketChannel;
12  import java.util.Arrays;
13  import java.util.Iterator;
14  import java.util.List;
15  
16  import fulmine.context.IFrameworkContext;
17  import fulmine.distribution.connection.IConnection;
18  import fulmine.distribution.events.ConnectionAvailableEvent;
19  import fulmine.distribution.events.ConnectionDestroyedEvent;
20  import fulmine.distribution.events.MessageConsumedEvent;
21  import fulmine.distribution.events.MessageEvent;
22  import fulmine.event.IEventSource;
23  import fulmine.event.listener.IEventListener;
24  import fulmine.protocol.specification.ByteConstants;
25  import fulmine.protocol.specification.ByteReader;
26  import fulmine.protocol.specification.ByteWriter;
27  import fulmine.util.array.ArrayUtils;
28  import fulmine.util.collection.CollectionFactory;
29  import fulmine.util.io.DelegatingSelectionKeyTask;
30  import fulmine.util.io.ISelectionKeyTask;
31  import fulmine.util.log.AsyncLog;
32  
33  /**
34   * Handles the socket processing for a single connection to a remote
35   * {@link IFrameworkContext}.
36   * <p>
37   * Received TCP messages from the remote context may be concatenated together;
38   * e.g. the remote context sends "conn:lasers" then "syn". These may be received
39   * by this connection as a single message "conn:laserssyn". To be able to
40   * distinguish these messages, each message is prefixed with the byte length of
41   * the application message. This prefix is always 4 bytes, big-endian; e.g. the
42   * above example would resolve to "11conn:lasers3syn".
43   * <p>
44   * This generates the following events:
45   * <ul>
46   * <li>{@link MessageEvent}
47   * <li>{@link ConnectionAvailableEvent}
48   * <li>{@link ConnectionDestroyedEvent}
49   * </ul>
50   * <p>
51   * When this object is destroyed via the {@link #destroy()} method, this only
52   * closes down the TCP IO resources and signals that only when all pending
53   * {@link MessageEvent}s have been processed should this object's
54   * {@link IEventSource} functions be terminated; this will then remove any
55   * {@link IEventListener}s that would normally receive messages from this
56   * connection.
57   * 
58   * @author Ramon Servadei
59   */
60  public final class TcpConnection extends AbstractSocketChannelConnection
61      implements IEventSource, IConnection, ISelectionKeyTask
62  {
63      private final static AsyncLog LOG = new AsyncLog(TcpConnection.class);
64  
65      /**
66       * The byte representation of the {@link CONNECTION_MSG_TYPE}
67       */
68      private final static byte[] CONNECTION_MSG_TYPE_BYTES =
69          ByteWriter.getBytes(CONNECTION_MSG_TYPE);
70  
71      /**
72       * The byte representation of the {@link CONN_ACK_MSG_TYPE}
73       */
74      private final static byte[] CONN_ACK_MSG_TYPE_BYTES =
75          ByteWriter.getBytes(CONN_ACK_MSG_TYPE);
76  
77      /** Delegate {@link ISelectionKeyTask} */
78      private DelegatingSelectionKeyTask selectionKeyTask;
79  
80      /** The pending messages to send */
81      final List<ByteBuffer> messages = CollectionFactory.newList();
82  
83      /** The receive buffer, 65535 bytes */
84      private final static byte[] recvBuffer = new byte[65535];
85  
86      /** The TCP connection broker */
87      private final TcpConnectionBroker broker;
88  
89      /**
90       * Standard constructor
91       * 
92       * @param broker
93       *            the {@link TcpConnectionBroker}
94       * @param remoteContextIdentity
95       *            the identity of the remote context this connection connects to
96       * @param address
97       *            the TCP address of the server port the context exposes to
98       *            other remote contexts
99       * @param port
100      *            the TCP server port the context exposes to other remote
101      *            contexts
102      * @param connectionHashCode
103      *            the hashcode of the connection to the remote context
104      */
105     public TcpConnection(IFrameworkContext context, TcpConnectionBroker broker,
106         String remoteContextIdentity, String address, int port,
107         int connectionHashCode)
108     {
109         super(context, remoteContextIdentity, address, port, connectionHashCode);
110         this.broker = broker;
111         this.selectionKeyTask = new DelegatingSelectionKeyTask(this);
112     }
113 
114     /**
115      * Add the data to the pending messages list
116      * 
117      * @param data
118      *            the data to add to the pending messages
119      */
120     public void send(byte[] data)
121     {
122         synchronized (this)
123         {
124             byte[] actual = encode(data);
125 
126             final ByteBuffer wrapped = ByteBuffer.wrap(actual);
127             messages.add(wrapped);
128             // we're interested in writing now
129             getSelectionKey().interestOps(
130                 getSelectionKey().interestOps() | SelectionKey.OP_WRITE);
131             getSelectionKey().selector().wakeup();
132         }
133     }
134 
135     /**
136      * Add a 4 byte prefix to the data that includes the length of the data as a
137      * big-endian integer.
138      * 
139      * @param data
140      *            the data
141      * @return an array equal in length to <code>4 + data.length</code> with a
142      *         copy of data starting from element 4
143      */
144     static byte[] encode(byte[] data)
145     {
146         // add the integer length of the data, always the first 4 bytes
147         byte[] actual = new byte[data.length + 4];
148         System.arraycopy(data, 0, actual, 4, data.length);
149         int startFrom = 0;
150         for (int i = 4; i > 0; i--)
151         {
152             actual[startFrom++] =
153                 (byte) (data.length >>> ByteConstants.bitShiftForByteOrdinal[i]);
154         }
155         return actual;
156     }
157 
158     /**
159      * Split the data into its sub-messages. Messages have a 4 byte prefix that
160      * is the message length in bytes, excluding the 4 bytes, as a big-endian
161      * integer. This allows multiple messages to be received as a single TCP
162      * frame.
163      * 
164      * @param message
165      *            the full TCP frame data
166      * @return a list of the individual messages contained in the message
167      *         argument
168      */
169     static List<byte[]> decode(byte[] message)
170     {
171         // the TCP message could be a concatenation of multiple ones
172         // the format is: <4 bytes len><subdata><4 bytes len><subdata>
173         List<byte[]> values = CollectionFactory.newList();
174         int lenPtr = 0;
175         int len = 0;
176         while (lenPtr < message.length)
177         {
178             len = ByteReader.readInteger(message, lenPtr, 4);
179             // copy the sub data into the new array
180             byte[] subData = new byte[len];
181             lenPtr += 4;
182             System.arraycopy(message, lenPtr, subData, 0, len);
183             lenPtr += len;
184             values.add(subData);
185         }
186         return values;
187     }
188 
189     public void run()
190     {
191         if (isActive())
192         {
193             if (getSelectionKey().isWritable())
194             {
195                 write();
196             }
197             if (getSelectionKey().isReadable())
198             {
199                 read();
200             }
201         }
202     }
203 
204     /**
205      * Reads data from the socket
206      */
207     private void read()
208     {
209         try
210         {
211             final ByteBuffer recv = ByteBuffer.wrap(recvBuffer);
212             final SocketChannel socketChannel =
213                 ((SocketChannel) getSelectionKey().channel());
214             final int size = socketChannel.read(recv);
215             if (size == -1)
216             {
217                 /*
218                  * end of stream reached (see
219                  * ReadableByteChannel.read(ByteBuffer dst)
220                  */
221                 throw new IOException("end-of-stream");
222             }
223             final byte[] bufferData = new byte[recv.position()];
224             System.arraycopy(recv.array(), 0, bufferData, 0, recv.position());
225             // find any sub-messages
226             final List<byte[]> decoded = decode(bufferData);
227             for (byte[] byteData : decoded)
228             {
229                 if (getLog().isTraceEnabled())
230                 {
231                     final String stringData =
232                         new String(byteData, ByteConstants.ENCODING).trim();
233                     getLog().trace(
234                         this + " received '"
235                             + Arrays.toString(stringData.getBytes()) + "'");
236                 }
237                 if (!isConnectionHandshakeComplete())
238                 {
239                     if (ArrayUtils.startsWith(CONNECTION_MSG_TYPE_BYTES,
240                         byteData))
241                     {
242                         final String data =
243                             new String(byteData, ByteConstants.ENCODING).trim();
244                         // this is where we find out the identity of the
245                         // connecting remote context
246                         final String[] fields = data.split(DELIMITER);
247                         setIdentity(fields[1]);
248                         setRemoteContextHashCode(Integer.parseInt(fields[2]));
249                         connectionHandshakeComplete();
250                         // send the ack
251                         send(ByteWriter.getBytes(CONN_ACK_MSG_TYPE + DELIMITER
252                             + getContext().getIdentity() + DELIMITER
253                             + getContext().getContextHashCode()));
254 
255                         // notify with the new connection
256                         getContext().queueEvent(
257                             new ConnectionAvailableEvent(getContext(), this));
258                     }
259                     else
260                     {
261                         if (ArrayUtils.startsWith(CONN_ACK_MSG_TYPE_BYTES,
262                             byteData))
263                         {
264                             /*
265                              * retrieve the identity and remote context hashcode
266                              * - if we reconnect to a remote context that has
267                              * bounced, it will have a new hashcode but this
268                              * local context may not have received the new
269                              * connection details with the new hashcode. To
270                              * prevent the situation where the newly made
271                              * connection will be severed because the hashcodes
272                              * would be different, we always set the connections
273                              * remote context hashcode from the 'ack' message.
274                              */
275                             final String data =
276                                 new String(byteData, ByteConstants.ENCODING).trim();
277                             final String[] fields = data.split(DELIMITER);
278                             setIdentity(fields[1]);
279                             setRemoteContextHashCode(Integer.parseInt(fields[2]));
280                             connectionHandshakeComplete();
281                             // notify with the new connection
282                             getContext().queueEvent(
283                                 new ConnectionAvailableEvent(getContext(), this));
284                         }
285                         else
286                         {
287                             if (getLog().isWarnEnabled())
288                             {
289                                 getLog().warn(
290                                     "Connection not properly established"
291                                         + " so could not process message '"
292                                         + new String(byteData,
293                                             ByteConstants.ENCODING).trim()
294                                         + "'");
295                             }
296                             // undetermined state, so shutdown
297                             destroy();
298                         }
299                     }
300                 }
301                 else
302                 {
303                     final MessageEvent messageEvent =
304                         new MessageEvent(this, byteData);
305                     messageEvent.setTriggerEvent(new MessageConsumedEvent(this,
306                         messageEvent));
307                     synchronized (this)
308                     {
309                         this.messagesToProcess++;
310                     }
311                     getContext().queueEvent(messageEvent);
312                 }
313             }
314         }
315         catch (IOException e)
316         {
317             logException(getLog(), "Destroying " + this
318                 + " because data could not be read from "
319                 + getSelectionKey().channel(), e);
320             this.destroyWhenMessagesProcessed = true;
321             destroy();
322         }
323     }
324 
325     /**
326      * Writes any data in the message buffer to the socket
327      */
328     private void write()
329     {
330         List<ByteBuffer> copy = CollectionFactory.newList(this.messages.size());
331         synchronized (this)
332         {
333             copy.addAll(this.messages);
334             this.messages.clear();
335         }
336         for (Iterator<ByteBuffer> iterator = copy.iterator(); iterator.hasNext();)
337         {
338             final ByteBuffer data = iterator.next();
339             try
340             {
341                 if (getLog().isTraceEnabled())
342                 {
343                     getLog().trace(
344                         this + " sending '" + Arrays.toString(data.array())
345                             + "'");
346                 }
347                 ((SocketChannel) getSelectionKey().channel()).write(data);
348             }
349             catch (IOException e)
350             {
351                 logException(getLog(), this + " could not write data '"
352                     + new String(data.array()) + "'", e);
353             }
354         }
355         synchronized (this)
356         {
357             if (this.messages.size() == 0)
358             {
359                 // nothing more to write, so remove write interest
360                 getSelectionKey().interestOps(
361                     getSelectionKey().interestOps() ^ SelectionKey.OP_WRITE);
362             }
363         }
364     }
365 
366     @Override
367     protected void doDestroy()
368     {
369         super.doDestroy();
370         try
371         {
372             if (getSelectionKey() != null
373                 && getSelectionKey().channel() != null)
374             {
375                 getSelectionKey().channel().close();
376             }
377         }
378         catch (Exception e)
379         {
380             logException(getLog(), "Could not close channel for " + this, e);
381         }
382     }
383 
384     public SelectionKey getSelectionKey()
385     {
386         return selectionKeyTask.getSelectionKey();
387     }
388 
389     public void setSelectionKey(SelectionKey selectionKey)
390     {
391         selectionKeyTask.setSelectionKey(selectionKey);
392     }
393 
394     protected AsyncLog getLog()
395     {
396         return LOG;
397     }
398 
399     @Override
400     protected void doComponentDestroy()
401     {
402         super.doComponentDestroy();
403         this.broker.getSelectorTasks().unregister(
404             this.selectionKeyTask.getSelectionKey());
405     }
406 }