View Javadoc

1   /*
2      Copyright 2007 Ramon Servadei
3   
4      Licensed under the Apache License, Version 2.0 (the "License");
5      you may not use this file except in compliance with the License.
6      You may obtain a copy of the License at
7   
8          http://www.apache.org/licenses/LICENSE-2.0
9   
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15   */
16  package fulmine.distribution.channel;
17  
18  import static fulmine.util.Utils.CLOSE_BRACE;
19  import static fulmine.util.Utils.CLOSE_CURLY;
20  import static fulmine.util.Utils.COMMA_SPACE;
21  import static fulmine.util.Utils.OPEN_BRACE;
22  import static fulmine.util.Utils.OPEN_CURLY;
23  import static fulmine.util.Utils.SPACE;
24  import static fulmine.util.Utils.logException;
25  import static fulmine.util.Utils.nullCheck;
26  import static fulmine.util.Utils.safeToString;
27  
28  import java.util.Arrays;
29  import java.util.Collection;
30  import java.util.List;
31  import java.util.Map;
32  
33  import fulmine.AbstractLifeCycle;
34  import fulmine.Domain;
35  import fulmine.IAddressable;
36  import fulmine.IDomain;
37  import fulmine.IType;
38  import fulmine.Type;
39  import fulmine.context.IFrameworkContext;
40  import fulmine.context.FulmineContext.SystemInfoFields;
41  import fulmine.distribution.connection.IConnection;
42  import fulmine.distribution.events.MessageEvent;
43  import fulmine.event.IEvent;
44  import fulmine.event.IEventSource;
45  import fulmine.event.listener.AbstractEventHandler;
46  import fulmine.event.listener.IEventListener;
47  import fulmine.event.listener.ILifeCycleEventListener;
48  import fulmine.event.listener.IPriorityEventListener;
49  import fulmine.event.subscription.ISubscriptionManager;
50  import fulmine.event.subscription.ISubscriptionParameters;
51  import fulmine.event.subscription.SubscriptionParameters;
52  import fulmine.event.system.ISystemEventListener;
53  import fulmine.event.system.RxEvent;
54  import fulmine.event.system.TxEvent;
55  import fulmine.model.container.IContainer;
56  import fulmine.model.container.IContainer.DataState;
57  import fulmine.model.container.events.ContainerDestroyedEvent;
58  import fulmine.model.container.events.ContainerStateChangeEvent;
59  import fulmine.model.container.events.RemoteContainerCreatedEvent;
60  import fulmine.model.container.subscription.ContainerSubscriptionManager;
61  import fulmine.model.container.subscription.remote.RemoteContainerSubscriptionManager;
62  import fulmine.model.container.subscription.remote.RxSubscription;
63  import fulmine.model.container.subscription.remote.RxSubscriptionFactory;
64  import fulmine.model.container.subscription.remote.TxSubscription;
65  import fulmine.model.container.subscription.remote.TxSubscriptionFactory;
66  import fulmine.model.field.LongField;
67  import fulmine.protocol.specification.ByteConstants;
68  import fulmine.protocol.specification.ByteWriter;
69  import fulmine.protocol.specification.IFrameReader;
70  import fulmine.rpc.events.RpcInvokeEvent;
71  import fulmine.util.array.ArrayUtils;
72  import fulmine.util.collection.CollectionFactory;
73  import fulmine.util.collection.CollectionUtils;
74  import fulmine.util.log.AsyncLog;
75  
76  /**
77   * The default channel implementation. This is thread safe. This implementation
78   * is composed of multiple collaborator objects to handle:
79   * <ul>
80   * <li>subscriptions for remote containers
81   * <li>subscriptions for local containers from the remote peer channel
82   * <li>messages coming from the remote peer channel - see {@link IChannel} for a
83   * description of the possible messages
84   * </ul>
85   * <p>
86   * <h2>Subscription handling</h2>
87   * All subscribe and unsubscribe methods are handled by two collaborating
88   * {@link ISubscriptionManager} objects; a 'transmit' and 'receive' instance. A
89   * channel passes all remote container subscription requests to its remote peer.
90   * The remote channel hands this subscription request to its 'transmit'
91   * subscription manager. This 'transmit' subscription manager handles
92   * registering for {@link TxEvent}s generated by the subscribed container(s);
93   * the events are then sent to the peer channel. It also handles situations
94   * where the container(s) may not exist yet. The local channel hands the
95   * subscription request to its 'receive' subscription manager which handles
96   * adding the subscription listener to the remote container. The 'receive'
97   * subscription manager also handles situations where the remote container does
98   * not exist yet; it listens for {@link RemoteContainerCreatedEvent}.
99   * <p>
100  * When a channel connects to another channel, they exchange a sequence of
101  * messages to synchronise readiness with each other. A channel registers for
102  * the following system events:
103  * <ul>
104  * <li>{@link ContainerDestroyedEvent}
105  * </ul>
106  * When a channel is notified with this event it transmits it to the remote
107  * channel (if there is a subscription) and this allows remote containers to be
108  * marked as {@link DataState#STALE} and then destroyed.
109  * <p>
110  * The diagram below illustrates the subscription mechanics.
111  * 
112  * <pre>
113  *          context       local_channel          remote_channel
114  *             |               |                    |
115  *             |               |                    |&lt;---- subscribe for
116  *             |               |                    |      remote container A
117  *             |               |&lt;--- subscribe -----|
118  *             |               |                    |
119  *             |               |                    |
120  *             |               |                    |
121  *             |               |                    |
122  * container A |               |                    |
123  *  created --&gt;|               |                    |
124  *             |-- TxEvent ---&gt;|                    |
125  *             | container A   |----- TxEvent -----&gt;|
126  *             |               |                    |----&gt; create remote 
127  *             |               |                    |      container A
128  * </pre>
129  * 
130  * When a channel is destroyed, it sends a {@link #MSG_DESTROY_CONNECTION} to
131  * the other channel so the other channel can destroy the connection. This
132  * allows messages sent by this channel to be processed by the other end and
133  * only after the messages have been processed will the connection be severed.
134  * 
135  * @author Ramon Servadei
136  */
137 public class Channel extends AbstractLifeCycle implements IChannel,
138     IChannelOperations
139 {
140     final static AsyncLog LOG = new AsyncLog(Channel.class);
141 
142     /** The shared state for the channel */
143     private final IChannelState state;
144 
145     /**
146      * The standard length in bytes for the header portion of the messages
147      * exchanged.
148      */
149     static final int HEADER_LEN =
150         ByteWriter.getBytes(IChannel.MSG_SUBSCRIBE).length;
151 
152     /** The byte[] for {@link IChannel#MSG_SUBSCRIBE} */
153     static final byte[] MSG_SUBSCRIBE_BYTES =
154         ByteWriter.getBytes(IChannel.MSG_SUBSCRIBE);
155 
156     /** The byte[] for {@link IChannel#MSG_UNSUBSCRIBE} */
157     static final byte[] MSG_UNSUBSCRIBE_BYTES =
158         ByteWriter.getBytes(IChannel.MSG_UNSUBSCRIBE);
159 
160     /** The byte[] for {@link IChannel#MSG_DATA} */
161     static final byte[] MSG_DATA_BYTES = ByteWriter.getBytes(IChannel.MSG_DATA);
162 
163     /** The byte[] for {@link IChannel#MSG_CONTAINER_DESTROYED} */
164     static final byte[] MSG_CONTAINER_DESTROYED_BYTES =
165         ByteWriter.getBytes(IChannel.MSG_CONTAINER_DESTROYED);
166 
167     /** The byte[] for {@link IChannel#MSG_SYN} */
168     static final byte[] MSG_SYN_BYTES = ByteWriter.getBytes(IChannel.MSG_SYN);
169 
170     /** The byte[] for {@link IChannel#MSG_SYN_ACK} */
171     static final byte[] MSG_SYN_ACK_BYTES =
172         ByteWriter.getBytes(IChannel.MSG_SYN_ACK);
173 
174     /** The byte[] for {@link IChannel#MSG_RETRANSMIT} */
175     static final byte[] MSG_RETRANSMIT_BYTES =
176         ByteWriter.getBytes(IChannel.MSG_RETRANSMIT);
177 
178     /** The byte[] for {@link IChannel#MSG_RETRANSMIT_ALL} */
179     static final byte[] MSG_RETRANSMIT_ALL_BYTES =
180         ByteWriter.getBytes(IChannel.MSG_RETRANSMIT_ALL);
181 
182     /** The byte[] for {@link IChannel#MSG_DESTROY_CONNECTION} */
183     static final byte[] MSG_DESTROY_CONNECTION_BYTES =
184         ByteWriter.getBytes(IChannel.MSG_DESTROY_CONNECTION);
185 
186     /** The byte[] for {@link IChannel#MSG_INVOKE_RPC} */
187     static final byte[] MSG_INVOKE_RPC_BYTES =
188         ByteWriter.getBytes(IChannel.MSG_INVOKE_RPC);
189 
190     /**
191      * Utility method to create a {@link IChannel#MSG_DATA} message using the
192      * data
193      * 
194      * @param data
195      *            that data for the data message body
196      * @return the complete data message structure to send to a peer channel
197      */
198     final static byte[] createDataMessage(byte[] data)
199     {
200         // prepend the update message header to the data
201         final int headerLen = Channel.MSG_DATA_BYTES.length;
202         byte[] message = new byte[data.length + headerLen];
203         System.arraycopy(data, 0, message, headerLen, data.length);
204         System.arraycopy(Channel.MSG_DATA_BYTES, 0, message, 0, headerLen);
205         return message;
206     }
207 
208     /**
209      * Construct the channel
210      * 
211      * @param connection
212      *            the connection to the remote context
213      * @param context
214      *            the local context for the channel
215      */
216     public Channel(IConnection connection, IFrameworkContext context)
217     {
218         /*
219          * NOTE: TxEvents and RxEvents are bound to containers, not the event
220          * type (unlike the container created/destroyed events). TxEvent and
221          * RxEvents are directed to the ChannelState's eventListener by code in
222          * the channel.
223          */
224         this(new ChannelState(context, connection));
225     }
226 
227     /**
228      * Internally chained constructor with the state to use
229      * 
230      * @param state
231      *            the state for the channel
232      */
233     protected Channel(IChannelState state)
234     {
235         super();
236         this.state = state;
237     }
238 
239     public final boolean subscribe(ISubscriptionParameters parameters)
240     {
241         checkActive();
242         if (getLog().isDebugEnabled())
243         {
244             getLog().debug(this + "Subscribe for " + safeToString(parameters));
245         }
246         final boolean subscribed =
247             getState().getRxSubscriptionManager().subscribe(parameters);
248         // always try to re-subscribe - all these ops are idempotent anyway
249         doSubscribeOperation(parameters);
250         send(ByteWriter.getBytes(Channel.MSG_SUBSCRIBE
251             + parameters.getType().value() + Channel.DELIMITER
252             + parameters.getDomain().value() + Channel.DELIMITER
253             + parameters.getIdentity()));
254         return subscribed;
255     }
256 
257     /**
258      * Perform any operation required when a subscription occurs.
259      * 
260      * @param parameters
261      *            the parameters identifying the subscription
262      */
263     protected void doSubscribeOperation(ISubscriptionParameters parameters)
264     {
265         // direct any RxEvents to the ChannelTransmissionListener
266         getState().getRxSubscriptionManager().addListener(parameters,
267             getState().getEventHandler());
268     }
269 
270     /**
271      * if there are no requests from the remote context, we can destroy the
272      * channel.
273      */
274     private void checkIfConnectionCanBeClosed()
275     {
276         if (getState().getTxSubscriptionManager().getSubscribedSources().size() == 0
277             && getState().getRxSubscriptionManager().getSubscribedSources().size() == 0)
278         {
279             if (getLog().isInfoEnabled())
280             {
281                 getLog().info(
282                     "No RX or TX subscriptions so destroying "
283                         + safeToString(this));
284             }
285             destroy();
286         }
287     }
288 
289     public final boolean unsubscribe(ISubscriptionParameters parameters)
290     {
291         checkActive();
292         // this is only called by the distribution manager when there are no
293         // listeners registered against the subscription parameters when
294         // processing an unsubscription
295         if (getLog().isDebugEnabled())
296         {
297             getLog().debug(this + "Unsubscribe for " + safeToString(parameters));
298         }
299         final boolean unsubscribed =
300             getState().getRxSubscriptionManager().unsubscribe(parameters);
301         doUnsubscribeOperation(parameters);
302         // always send the unsubscribe to the remote context, regardless
303         send(ByteWriter.getBytes(Channel.MSG_UNSUBSCRIBE
304             + parameters.getType().value() + Channel.DELIMITER
305             + parameters.getDomain().value() + Channel.DELIMITER
306             + parameters.getIdentity()));
307         checkIfConnectionCanBeClosed();
308         return unsubscribed;
309     }
310 
311     /**
312      * Perform any operation required when an unsubscription occurs.
313      * 
314      * @param parameters
315      *            the parameters identifying subscription
316      */
317     protected void doUnsubscribeOperation(ISubscriptionParameters parameters)
318     {
319         // noop
320     }
321 
322     public final void send(byte[] bytes)
323     {
324         checkActive();
325         getState().getConnection().send(bytes);
326     }
327 
328     @Override
329     protected AsyncLog getLog()
330     {
331         return LOG;
332     }
333 
334     @Override
335     protected void doStart()
336     {
337         getState().init(this, this);
338         getState().start();
339         if (getLog().isDebugEnabled())
340         {
341             getLog().debug(this + " sending SYN");
342         }
343         send(Channel.MSG_SYN_BYTES);
344     }
345 
346     @Override
347     protected final void doDestroy()
348     {
349         if (getLog().isInfoEnabled())
350         {
351             getLog().info("Destroying " + this + " (stacktrace follows)",
352                 new Exception());
353         }
354         // process the remote subscriptions
355         final Collection<? extends IEventSource> rxSubscribedSources =
356             getState().getRxSubscriptionManager().getSubscribedSources();
357         if (getLog().isTraceEnabled())
358         {
359             getLog().trace(
360                 this
361                     + "[channel destroy] RX subscribed sources to process are "
362                     + CollectionUtils.toFormattedString(rxSubscribedSources));
363         }
364         final int size = rxSubscribedSources.size();
365         int count = 0;
366         for (IEventSource eventSource : rxSubscribedSources)
367         {
368             try
369             {
370                 doChannelDestroyedOperation(eventSource, ++count == size);
371             }
372             catch (Exception e)
373             {
374                 logException(getLog(), eventSource, e);
375             }
376         }
377         // stop receiving any messages from the remote context (via the
378         // connection)
379         getState().getConnection().removeListener(getState().getEventHandler());
380         // all TX subscriptions are handled by the TxManager destroy
381         getState().destroy();
382     }
383 
384     /**
385      * Perform the operation required on the remote {@link IEventSource} that
386      * originated from the remote context this channel connects to, when the
387      * channel is destroyed.
388      * 
389      * @param eventSource
390      *            a remote event source that is associated with the remote
391      *            context this channel connects to
392      * @param last
393      *            signals if this is the last event source
394      */
395     protected void doChannelDestroyedOperation(IEventSource eventSource,
396         boolean last)
397     {
398         eventSource.destroy();
399     }
400 
401     public final String getRemoteContextIdentity()
402     {
403         return getState().getConnection().getRemoteContextIdentity();
404     }
405 
406     public final String toString()
407     {
408         return getToString(getConnection());
409     }
410 
411     public final boolean isConnectionSyn()
412     {
413         return getState().isConnectionSyn();
414     }
415 
416     public final void setConnectionSyn()
417     {
418         getState().setConnectionSyn();
419     }
420 
421     public final void requestRetransmit(String identityRegularExpression,
422         IType type, IDomain domain)
423     {
424         checkActive();
425         try
426         {
427             send(ByteWriter.getBytes(Channel.MSG_RETRANSMIT + type.value()
428                 + Channel.DELIMITER + domain.value() + Channel.DELIMITER
429                 + identityRegularExpression));
430         }
431         catch (Exception e)
432         {
433             logException(getLog(), this + " idregex="
434                 + identityRegularExpression + COMMA_SPACE + type + COMMA_SPACE
435                 + domain, e);
436         }
437     }
438 
439     public final void requestRetransmitAll()
440     {
441         checkActive();
442         send(Channel.MSG_RETRANSMIT_ALL_BYTES);
443     }
444 
445     public final void retransmitAll()
446     {
447         checkActive();
448         if (getLog().isDebugEnabled())
449         {
450             getLog().debug(this + " retransmitting all subscribed containers");
451         }
452         final Collection<? extends IEventSource> subscribedContainers =
453             getState().getTxSubscriptionManager().getSubscribedSources();
454         doRetransmit(subscribedContainers);
455     }
456 
457     public final void retransmit(String identityRegularExpression, IType type,
458         IDomain domain)
459     {
460         checkActive();
461         if (getLog().isDebugEnabled())
462         {
463             getLog().debug(
464                 this + " retransmitting containers for identityPattern='"
465                     + identityRegularExpression + "'" + ", type=" + type
466                     + ", domain=" + domain);
467         }
468         final Collection<IEventSource> subscribedContainers =
469             getState().getTxSubscriptionManager().getSubscribedSources(
470                 new SubscriptionParameters(identityRegularExpression, type,
471                     domain));
472         if (subscribedContainers.size() == 0)
473         {
474             // try a single container
475             if (getState().getContext().containsLocalContainer(
476                 identityRegularExpression, type, domain))
477             {
478                 subscribedContainers.add(getState().getContext().getLocalContainer(
479                     identityRegularExpression, type, domain));
480             }
481         }
482         doRetransmit(subscribedContainers);
483     }
484 
485     void doRetransmit(
486         final Collection<? extends IEventSource> subscribedContainers)
487     {
488         getState().getContext().execute(new Runnable()
489         {
490             public void run()
491             {
492                 final long start = System.nanoTime();
493                 for (IEventSource container : subscribedContainers)
494                 {
495                     try
496                     {
497                         // Note: this may block if there is a frame waiting and
498                         // the container is bound to a different event
499                         // processor.
500                         send(Channel.createDataMessage(Channel.this.getState().getContext().getFrameWriter().writeComplete(
501                             (IContainer) container)));
502                     }
503                     catch (Exception e)
504                     {
505                         logException(getLog(), container, e);
506                     }
507                 }
508                 final long sendTime = (System.nanoTime() - start);
509                 if (Channel.LOG.isDebugEnabled())
510                 {
511                     Channel.LOG.debug("Time to retransmit "
512                         + subscribedContainers.size() + " containers: "
513                         + sendTime + " nanoseconds");
514                 }
515             }
516         });
517     }
518 
519     public final IConnection getConnection()
520     {
521         return getState().getConnection();
522     }
523 
524     private String getToString(IConnection connection)
525     {
526         return getClass().getSimpleName() + SPACE + OPEN_BRACE
527             + (isActive() ? "active" : "inactive") + CLOSE_BRACE + SPACE
528             + OPEN_CURLY + connection + OPEN_BRACE
529             + connection.getRemoteContextHashCode() + CLOSE_BRACE + CLOSE_CURLY
530             + SPACE;
531     }
532 
533     /** Holds the shared state of the channel */
534     IChannelState getState()
535     {
536         return this.state;
537     }
538 
539     public final boolean addListener(ISubscriptionParameters parameters,
540         IEventListener listener)
541     {
542         return getState().getRxSubscriptionManager().addListener(parameters,
543             listener);
544     }
545 
546     public final void eventSourceCreated(IAddressable identity)
547     {
548         getState().getRxSubscriptionManager().eventSourceCreated(identity);
549     }
550 
551     public final void eventSourceDestroyed(IAddressable identity)
552     {
553         getState().getRxSubscriptionManager().eventSourceDestroyed(identity);
554     }
555 
556     public final Collection<IEventSource> getSubscribedSources()
557     {
558         return getState().getRxSubscriptionManager().getSubscribedSources();
559     }
560 
561     public final Collection<IEventSource> getSubscribedSources(
562         ISubscriptionParameters parameters)
563     {
564         return getState().getRxSubscriptionManager().getSubscribedSources(
565             parameters);
566     }
567 
568     public final boolean isSubscribed(IEventSource source)
569     {
570         return getState().getRxSubscriptionManager().isSubscribed(source);
571     }
572 
573     public final boolean removeListener(ISubscriptionParameters parameters,
574         IEventListener listener)
575     {
576         return getState().getRxSubscriptionManager().removeListener(parameters,
577             listener);
578     }
579 
580     public final boolean includes(IAddressable parameters)
581     {
582         return getState().getRxSubscriptionManager().includes(parameters);
583     }
584 
585     public List<IEventListener> getListeners(ISubscriptionParameters parameters)
586     {
587         return getState().getRxSubscriptionManager().getListeners(parameters);
588     }
589 
590     public final byte[] getContainerDestroyedMessage(
591         ContainerDestroyedEvent event)
592     {
593         return ByteWriter.getBytes(Channel.MSG_CONTAINER_DESTROYED
594             + event.getType().value() + Channel.DELIMITER
595             + event.getDomain().value() + Channel.DELIMITER
596             + event.getIdentity());
597     }
598 
599     public void invokeRpc(String remoteContextIdentity, byte[] rpcData)
600     {
601         send(ArrayUtils.merge(ByteWriter.getBytes(Channel.MSG_INVOKE_RPC),
602             rpcData));
603     }
604 }
605 
606 /**
607  * The internal state for a {@link Channel}.
608  * 
609  * @author Ramon Servadei
610  * 
611  */
612 class ChannelState extends AbstractLifeCycle implements IChannelState
613 {
614     private final static AsyncLog LOG = new AsyncLog(ChannelState.class);
615 
616     private final IFrameworkContext context;
617 
618     /**
619      * The subscription manager for {@link TxSubscription} instances. This
620      * manager handles subscriptions for transmitting local containers to the
621      * remote context; these are subscriptions from the remote context for local
622      * containers.
623      */
624     private final ISubscriptionManager txSubscriptionManager;
625 
626     /**
627      * The subscription manager for {@link RxSubscription} instances. This
628      * manager handles subscriptions for receiving remote containers from remote
629      * contexts; these are subscriptions from the local context for remote
630      * containers.
631      */
632     private final ISubscriptionManager rxSubscriptionManager;
633 
634     private boolean connectionSyn;
635 
636     private final IConnection connection;
637 
638     private ILifeCycleEventListener eventHandler;
639 
640     /** Fields for tracking TX and RX counts */
641     private final LongField txCount, rxCount;
642 
643     /**
644      * Holds the RxEvent listener token against the application subscription
645      * token
646      */
647     private final Map<Integer, Integer> tokenMap;
648 
649     ChannelState(IFrameworkContext context, IConnection connection)
650     {
651         super();
652         this.context = context;
653         this.connection = connection;
654         // this is not a remote subscription manager because it transmits local
655         // containers to the remote channel
656         this.txSubscriptionManager =
657             new ContainerSubscriptionManager(context,
658                 new TxSubscriptionFactory());
659         this.rxSubscriptionManager =
660             new RemoteContainerSubscriptionManager(context,
661                 connection.getRemoteContextIdentity(),
662                 new RxSubscriptionFactory());
663         this.tokenMap = CollectionFactory.newMap();
664         this.txCount =
665             new LongField(getConnection().getRemoteContextIdentity() + ":"
666                 + SystemInfoFields.TX_COUNT);
667         this.rxCount =
668             new LongField(getConnection().getRemoteContextIdentity() + ":"
669                 + SystemInfoFields.RX_COUNT);
670         getContext().getSystemInfo().add(ChannelState.this.txCount);
671         getContext().getSystemInfo().add(ChannelState.this.rxCount);
672     }
673 
674     public Map<Integer, Integer> getTokenMap()
675     {
676         return this.tokenMap;
677     }
678 
679     /**
680      * Initialise the state
681      * 
682      * @param channel
683      * @param channelOps
684      */
685     @SuppressWarnings("unchecked")
686     public final void init(IChannel channel, IChannelOperations channelOps)
687     {
688         Map<Class<? extends IEvent>, IEventListener> listeners =
689             AbstractEventHandler.getEventHandlerMappings(createEventHandlers(
690                 context, channel, channelOps));
691         this.eventHandler =
692             new ChannelTransmissionListener(OPEN_CURLY
693                 + Channel.class.getSimpleName() + " to "
694                 + connection.toString() + CLOSE_CURLY, context, listeners);
695 
696         // get ready to process messages from the peer channel
697         getConnection().addListener(getEventHandler());
698     }
699 
700     /**
701      * Create the event handlers that will react to the events and system events
702      * that the channel should handle.
703      * 
704      * @param context
705      * @param channel
706      * @param channelOps
707      * @return an array of {@link AbstractEventHandler}
708      */
709     @SuppressWarnings("unchecked")
710     AbstractEventHandler[] createEventHandlers(IFrameworkContext context,
711         IChannel channel, IChannelOperations channelOps)
712     {
713         return new AbstractEventHandler[] {
714             new MessageEventHandler(channel, channelOps, this),
715             new RxEventHandler(channelOps, this),
716             new ContainerDestroyedEventHandler(channelOps, this),
717             new ContainerStateChangeEventHandler(channelOps, this),
718             new TxEventHandler(channelOps, this), };
719     }
720 
721     protected final void doDestroy()
722     {
723         // signal the other end to break the connection
724         if (getConnection().isActive())
725         {
726             try
727             {
728                 getConnection().send(Channel.MSG_DESTROY_CONNECTION_BYTES);
729             }
730             catch (Exception e)
731             {
732                 logException(getLog(), this, e);
733             }
734         }
735         getContext().getSystemInfo().remove(this.txCount);
736         getContext().getSystemInfo().remove(this.rxCount);
737         doComponentDestroy();
738     }
739 
740     /**
741      * Delegate method to perform the destroy operation on this component.
742      */
743     void doComponentDestroy()
744     {
745         this.txSubscriptionManager.destroy();
746         this.rxSubscriptionManager.destroy();
747         this.eventHandler.destroy();
748     }
749 
750     protected void doStart()
751     {
752         this.txSubscriptionManager.start();
753         this.rxSubscriptionManager.start();
754         this.eventHandler.start();
755         // chances are the connection is already started
756         this.connection.start();
757     }
758 
759     @Override
760     protected AsyncLog getLog()
761     {
762         return LOG;
763     }
764 
765     /**
766      * Helper to process events received from the peer channel and from the
767      * local context.
768      */
769     public final ILifeCycleEventListener getEventHandler()
770     {
771         return eventHandler;
772     }
773 
774     public final void setConnectionSyn()
775     {
776         this.connectionSyn = true;
777     }
778 
779     /** The local context */
780     public final IFrameworkContext getContext()
781     {
782         return this.context;
783     }
784 
785     /**
786      * The subscription manager for {@link TxSubscription} instances. This
787      * manager handles subscriptions for transmitting local containers to the
788      * remote context; these are subscriptions from the remote context for local
789      * containers.
790      */
791     public final ISubscriptionManager getTxSubscriptionManager()
792     {
793         return this.txSubscriptionManager;
794     }
795 
796     /**
797      * The subscription manager for {@link RxSubscription} instances. This
798      * manager handles subscriptions for receiving remote containers from remote
799      * contexts; these are subscriptions from the local context for remote
800      * containers.
801      */
802     public final ISubscriptionManager getRxSubscriptionManager()
803     {
804         return this.rxSubscriptionManager;
805     }
806 
807     /**
808      * Indicates if the connection to the peer {@link IChannel} has been
809      * synchronised. The peer channels in the peer contexts will be created at
810      * different times, so they must synchronise with each other to ensure no
811      * missed registrations occur.
812      */
813     public final boolean isConnectionSyn()
814     {
815         return this.connectionSyn;
816     }
817 
818     /** The connection to use */
819     public final IConnection getConnection()
820     {
821         return this.connection;
822     }
823 
824     public final LongField getTxCount()
825     {
826         return this.txCount;
827     }
828 
829     public final LongField getRxCount()
830     {
831         return this.rxCount;
832     }
833 }
834 
835 /**
836  * Base class for channel event handlers.
837  * <p>
838  * All sub-classes are thread safe.
839  * 
840  * @author Ramon Servadei
841  * @param <EVENT>
842  *            the event type the handler works with
843  */
844 abstract class ChannelEventHandler<EVENT extends IEvent> extends
845     AbstractEventHandler<EVENT> implements ISystemEventListener
846 {
847     /** The channel operations for the handler */
848     private final IChannelOperations channelOps;
849 
850     /** The channel state for the handler */
851     private final IChannelState state;
852 
853     ChannelEventHandler(IChannelOperations channelOps, IChannelState state)
854     {
855         super();
856         nullCheck(channelOps, "No channel operations provided");
857         nullCheck(state, "No state provided");
858         this.channelOps = channelOps;
859         this.state = state;
860     }
861 
862     IChannelState getState()
863     {
864         return state;
865     }
866 
867     IChannelOperations getChannelOps()
868     {
869         return channelOps;
870     }
871 }
872 
873 /**
874  * Handles {@link ContainerStateChangeEvent}s and sends them to any remote
875  * contexts that have subscribed for the identified container.
876  * 
877  * @author Ramon Servadei
878  */
879 final class ContainerStateChangeEventHandler extends
880     ChannelEventHandler<ContainerStateChangeEvent>
881 {
882 
883     private final static AsyncLog LOG =
884         new AsyncLog(ContainerStateChangeEventHandler.class);
885 
886     ContainerStateChangeEventHandler(IChannelOperations channelOps,
887         IChannelState state)
888     {
889         super(channelOps, state);
890     }
891 
892     @Override
893     protected AsyncLog getLog()
894     {
895         return LOG;
896     }
897 
898     @Override
899     public void handle(ContainerStateChangeEvent event)
900     {
901         if (getState().getTxSubscriptionManager().isSubscribed(
902             event.getContainer()))
903         {
904             getChannelOps().send(
905                 Channel.createDataMessage(getState().getContext().getFrameWriter().writeMeta(
906                     event.getContainer())));
907         }
908     }
909 
910 }
911 
912 /**
913  * Handler for {@link RxEvent}s. Extracts the data from the {@link RxEvent} and
914  * uses an {@link IFrameReader} to apply the data to the correct
915  * {@link IContainer}. This is the class that updates remote containers.
916  * 
917  * @author Ramon Servadei
918  * 
919  */
920 final class RxEventHandler extends ChannelEventHandler<RxEvent>
921 {
922 
923     private final static AsyncLog LOG = new AsyncLog(RxEventHandler.class);
924 
925     /** counter for the RX events */
926     private long count;
927 
928     RxEventHandler(IChannelOperations channelOps, IChannelState state)
929     {
930         super(channelOps, state);
931     }
932 
933     /**
934      * Extracts the data from the {@link RxEvent} and uses an
935      * {@link IFrameReader} to apply the data to the correct {@link IContainer}.
936      */
937     @Override
938     public void handle(RxEvent event)
939     {
940         getState().getRxCount().set(count++);
941         if (getLog().isTraceEnabled())
942         {
943             getLog().trace(event.getTraceString());
944         }
945         // this ensures the container's event processor thread runs this
946         getState().getContext().getFrameReader().read(event.getBuffer(),
947             getState().getConnection().getRemoteContextIdentity(),
948             getState().getContext());
949     }
950 
951     @Override
952     public AsyncLog getLog()
953     {
954         return LOG;
955     }
956 }
957 
958 /**
959  * Handler for {@link TxEvent}s. Extracts the data from the {@link TxEvent} and
960  * sends it to the peer channel.
961  * 
962  * @author Ramon Servadei
963  * 
964  */
965 final class TxEventHandler extends ChannelEventHandler<TxEvent>
966 {
967 
968     private final static AsyncLog LOG = new AsyncLog(TxEventHandler.class);
969 
970     /** counter for the TX events */
971     private long count;
972 
973     TxEventHandler(IChannelOperations channelOps, IChannelState state)
974     {
975         super(channelOps, state);
976     }
977 
978     /**
979      * Extracts the data from the {@link TxEvent} and sends it to the peer
980      * channel.
981      */
982     @Override
983     public void handle(TxEvent event)
984     {
985         getState().getTxCount().set(count++);
986         if (getLog().isTraceEnabled())
987         {
988             getLog().trace(event.getTraceString());
989         }
990         // if the source of the event has been destroyed, don't send
991         if (getState().getContext().containsLocalContainer(
992             event.getSource().getIdentity(), event.getSource().getType(),
993             event.getSource().getDomain()))
994         {
995             byte[] data = event.getBuffer();
996             getChannelOps().send(Channel.createDataMessage(data));
997         }
998         else
999         {
1000             if (getLog().isDebugEnabled())
1001             {
1002                 getLog().debug("Source is destroyed, not sending " + event);
1003             }
1004         }
1005     }
1006 
1007     @Override
1008     public AsyncLog getLog()
1009     {
1010         return LOG;
1011     }
1012 }
1013 
1014 /**
1015  * Handler for {@link ContainerDestroyedEvent}s. Passes the event to the peer
1016  * channel.
1017  * 
1018  * @author Ramon Servadei
1019  * 
1020  */
1021 final class ContainerDestroyedEventHandler extends
1022     ChannelEventHandler<ContainerDestroyedEvent> implements
1023     IPriorityEventListener
1024 {
1025 
1026     private final static AsyncLog LOG =
1027         new AsyncLog(ContainerDestroyedEventHandler.class);
1028 
1029     ContainerDestroyedEventHandler(IChannelOperations channelOps,
1030         IChannelState state)
1031     {
1032         super(channelOps, state);
1033     }
1034 
1035     /**
1036      * Passes the event to the peer channel.The peer channel's
1037      * {@link MessageEventHandler#doRemoteContainerDestroyed(String, IType, IDomain)}
1038      * will receive this.
1039      */
1040     @Override
1041     public void handle(ContainerDestroyedEvent event)
1042     {
1043         if (getState().getTxSubscriptionManager().includes(
1044             new SubscriptionParameters(event.getIdentity(), event.getType(),
1045                 event.getDomain())))
1046         {
1047             getChannelOps().send(
1048                 getChannelOps().getContainerDestroyedMessage(event));
1049         }
1050     }
1051 
1052     @Override
1053     public AsyncLog getLog()
1054     {
1055         return LOG;
1056     }
1057 }
1058 
1059 /**
1060  * A handler for {@link MessageEvent}s. The message event payload is one of the
1061  * messages declared in the {@link IChannel} so this class provides the logic to
1062  * handle each type of message. Generally, each message is handled by a doXXX
1063  * method.
1064  * 
1065  * @author Ramon Servadei
1066  * 
1067  */
1068 class MessageEventHandler extends ChannelEventHandler<MessageEvent>
1069 {
1070 
1071     private final static AsyncLog LOG = new AsyncLog(MessageEventHandler.class);
1072 
1073     /** The channel for this handler */
1074     private final IChannel channel;
1075 
1076     /**
1077      * Standard constructor
1078      * 
1079      * @param channel
1080      *            the channel
1081      * @param channelOps
1082      *            the channel operations
1083      * @param state
1084      *            the channel state
1085      */
1086     MessageEventHandler(IChannel channel, IChannelOperations channelOps,
1087         IChannelState state)
1088     {
1089         super(channelOps, state);
1090         this.channel = channel;
1091     }
1092 
1093     @Override
1094     public void handle(MessageEvent event)
1095     {
1096         if (getLog().isTraceEnabled())
1097         {
1098             getLog().trace(event.getTraceString());
1099         }
1100         final byte[] data = event.getData();
1101         byte[] arg = new byte[data.length - Channel.HEADER_LEN];
1102         System.arraycopy(data, Channel.HEADER_LEN, arg, 0, arg.length);
1103 
1104         if (ArrayUtils.startsWith(Channel.MSG_DATA_BYTES, data))
1105         {
1106             doMessageReceived(arg);
1107             return;
1108         }
1109         if (ArrayUtils.startsWith(Channel.MSG_INVOKE_RPC_BYTES, data))
1110         {
1111             doInvokeRpc(arg);
1112             return;
1113         }
1114         try
1115         {
1116             final String payload = new String(arg, ByteConstants.ENCODING);
1117             final String[] elements = splitTypeDomainAndIdentity(payload);
1118             if (ArrayUtils.startsWith(Channel.MSG_SUBSCRIBE_BYTES, data))
1119             {
1120                 // subscription request from peer
1121                 doTxSubscribe(elements[2],
1122                     Type.get(Byte.parseByte(elements[0])),
1123                     Domain.get(Byte.parseByte(elements[1])));
1124                 return;
1125             }
1126             if (ArrayUtils.startsWith(Channel.MSG_UNSUBSCRIBE_BYTES, data))
1127             {
1128                 // unsubscribe request from peer
1129                 doTxUnsubscribe(elements[2],
1130                     Type.get(Byte.parseByte(elements[0])),
1131                     Domain.get(Byte.parseByte(elements[1])));
1132                 return;
1133             }
1134             if (ArrayUtils.startsWith(Channel.MSG_CONTAINER_DESTROYED_BYTES,
1135                 data))
1136             {
1137                 // a container destroyed in the peer
1138                 doRemoteContainerDestroyed(elements[2],
1139                     Type.get(Byte.parseByte(elements[0])),
1140                     Domain.get(Byte.parseByte(elements[1])));
1141                 return;
1142             }
1143             if (ArrayUtils.startsWith(Channel.MSG_SYN_BYTES, data))
1144             {
1145                 // a syn from the peer
1146                 doSynReceived();
1147                 return;
1148             }
1149             if (ArrayUtils.startsWith(Channel.MSG_SYN_ACK_BYTES, data))
1150             {
1151                 // an ack from the peer
1152                 doSynAckReceived();
1153                 return;
1154             }
1155             if (ArrayUtils.startsWith(Channel.MSG_RETRANSMIT_BYTES, data))
1156             {
1157                 doRetransmit(elements[2],
1158                     Type.get(Byte.parseByte(elements[0])),
1159                     Domain.get(Byte.parseByte(elements[1])));
1160                 return;
1161             }
1162             if (ArrayUtils.startsWith(Channel.MSG_RETRANSMIT_ALL_BYTES, data))
1163             {
1164                 doRetransmitAll();
1165                 return;
1166             }
1167             if (ArrayUtils.startsWith(Channel.MSG_DESTROY_CONNECTION_BYTES,
1168                 data))
1169             {
1170                 doDestroyConnection();
1171                 return;
1172             }
1173             throw new RuntimeException("Unhandled message=" + payload);
1174         }
1175         catch (Exception e)
1176         {
1177             throw new RuntimeException(
1178                 "Could not handle message (unencoded): '"
1179                     + Arrays.toString(data) + "'", e);
1180         }
1181     }
1182 
1183     /**
1184      * Handle an RPC invoke event. This is encapsulated in an
1185      * {@link RpcInvokeEvent} and fired at the context for it to locate the
1186      * appropriate handler
1187      * 
1188      * @param rpcData
1189      *            the RPC data
1190      */
1191     void doInvokeRpc(byte[] rpcData)
1192     {
1193         RpcInvokeEvent event =
1194             new RpcInvokeEvent(getState().getContext(), rpcData);
1195         // the RPC manager should pick this up
1196         getState().getContext().queueEvent(event);
1197     }
1198 
1199     /**
1200      * Handle a data message. This is generally an update to a remote container.
1201      * 
1202      * @param arg
1203      *            the byte[] encapsulating the data message
1204      */
1205     void doMessageReceived(byte[] arg)
1206     {
1207         // data for a remote container from the peer
1208         // NOTE: this will raise a ContainerCreatedEvent if it creates the
1209         // container
1210         final IContainer remoteContainer =
1211             getState().getContext().getFrameReader().getRemoteContainerForFrame(
1212                 arg, getState().getConnection().getRemoteContextIdentity(),
1213                 getState().getContext());
1214         /*
1215          * encapsulate in an RxEvent so the container's event processor will
1216          * handle this - spreads the load across all event processors (depending
1217          * on how the containers are allocated) rather than a single thread
1218          * executing all remote container events
1219          */
1220         RxEvent rx = new RxEvent(remoteContainer, arg);
1221         // add the receive event to the context
1222         getState().getContext().queueEvent(rx);
1223     }
1224 
1225     /**
1226      * Handles the signal that the identified remote container has been
1227      * destroyed in its local context. The remote container instance in this
1228      * context needs to be destroyed.
1229      * 
1230      * @param identityRegex
1231      *            remote container identity
1232      * @param type
1233      *            remote container type
1234      * @param domain
1235      *            remote container domain
1236      */
1237     void doRemoteContainerDestroyed(String identity, IType type, IDomain domain)
1238     {
1239         final IContainer remoteContainer =
1240             getState().getContext().getRemoteContainer(
1241                 getState().getConnection().getRemoteContextIdentity(),
1242                 identity, type, domain);
1243         remoteContainer.destroy();
1244     }
1245 
1246     final void doSynAckReceived()
1247     {
1248         if (!getChannelOps().isConnectionSyn())
1249         {
1250             if (Channel.LOG.isDebugEnabled())
1251             {
1252                 Channel.LOG.debug(getChannel()
1253                     + " received SYN_ACK, channel is ready.");
1254             }
1255             getChannelOps().setConnectionSyn();
1256             ChannelReadyEvent event =
1257                 new ChannelReadyEvent(getState().getContext(), getChannel());
1258             getState().getContext().queueEvent(event);
1259         }
1260         else
1261         {
1262             if (getLog().isDebugEnabled())
1263             {
1264                 getLog().debug(
1265                     getChannel()
1266                         + " is already ready, ignoring duplicate SYN_ACK.");
1267             }
1268         }
1269     }
1270 
1271     final void doSynReceived()
1272     {
1273         // always send a SYN_ACK response, even if we are synchronised
1274         if (Channel.LOG.isDebugEnabled())
1275         {
1276             Channel.LOG.debug(getChannel() + " received SYN, sending SYN_ACK");
1277         }
1278         getChannelOps().send(Channel.MSG_SYN_ACK_BYTES);
1279         if (!getChannelOps().isConnectionSyn())
1280         {
1281             // The other channel may not have received our first SYN so,
1282             // for safety, re-transmit the SYN. This might cause the peer
1283             // channel to retransmit a second SYN_ACK which we will receive -
1284             // and ignore as it will be a duplicate.
1285             if (Channel.LOG.isDebugEnabled())
1286             {
1287                 Channel.LOG.debug(getChannel() + " re-sending SYN");
1288             }
1289             getChannelOps().send(Channel.MSG_SYN_BYTES);
1290         }
1291     }
1292 
1293     /**
1294      * Mark matching containers to transmit. TxEvents will be generated by the
1295      * container(s) and received by the channel's event handler. These are
1296      * subsequently sent to the remote end of this channel.
1297      * 
1298      * @param identityRegex
1299      *            container identity
1300      * @param type
1301      *            container type
1302      * @param domain
1303      *            container domain
1304      */
1305     void doTxSubscribe(String identityRegex, IType type, IDomain domain)
1306     {
1307         /*
1308          * Creates a TxSubscription that does the business of marking the
1309          * containers to transmit. This allows the channel to intercept the
1310          * TxEvents (see update method)
1311          */
1312         final SubscriptionParameters parameters =
1313             new SubscriptionParameters(identityRegex, type, domain);
1314         if (getLog().isInfoEnabled())
1315         {
1316             getLog().info("[subscribe] " + parameters);
1317         }
1318         // these operations are idempotent, so even though we may receive
1319         // duplicate subscriptions, we never handle the duplicates.
1320         getState().getTxSubscriptionManager().subscribe(parameters);
1321         getState().getTxSubscriptionManager().addListener(parameters,
1322             getState().getEventHandler());
1323     }
1324 
1325     /**
1326      * Unmark matching containers from transmission.
1327      * 
1328      * @param identityRegex
1329      *            container identity to match
1330      * @param type
1331      *            container type to match
1332      * @param domain
1333      *            container domain to match
1334      */
1335     void doTxUnsubscribe(String identityRegex, IType type, IDomain domain)
1336     {
1337         // The channel will no longer receive the TxEvents
1338         final SubscriptionParameters parameters =
1339             new SubscriptionParameters(identityRegex, type, domain);
1340         if (getLog().isInfoEnabled())
1341         {
1342             getLog().info("[unsubscribe] " + parameters);
1343         }
1344         getState().getTxSubscriptionManager().unsubscribe(parameters);
1345     }
1346 
1347     /**
1348      * Handle the request to retransmit the identified local container
1349      * 
1350      * @param identityRegex
1351      *            container identity to match
1352      * @param type
1353      *            container type to match
1354      * @param domain
1355      *            container domain to match
1356      */
1357     void doRetransmit(String identityRegex, IType type, IDomain domain)
1358     {
1359         getChannel().retransmit(identityRegex, type, domain);
1360     }
1361 
1362     /**
1363      * Handle the request to retransmit all local containers
1364      */
1365     void doRetransmitAll()
1366     {
1367         getChannel().retransmitAll();
1368     }
1369 
1370     /**
1371      * Handle the request to destroy the connection
1372      */
1373     void doDestroyConnection()
1374     {
1375         if (getLog().isInfoEnabled())
1376         {
1377             getLog().info(
1378                 "Received destroy connection message for " + getChannel());
1379         }
1380         final IConnection connection = getChannel().getConnection();
1381         if (connection != null)
1382         {
1383             connection.destroy();
1384         }
1385     }
1386 
1387     /**
1388      * Utility to split the payload on the '|'
1389      * 
1390      * @param payload
1391      *            the payload containing the type, domain and identity of a
1392      *            container
1393      * @return a 3 element String[]
1394      */
1395     String[] splitTypeDomainAndIdentity(String payload)
1396     {
1397         String[] split = new String[3];
1398         final int indexOf = payload.indexOf(Channel.DELIMITER);
1399         if (indexOf > -1)
1400         {
1401             split[0] = payload.substring(0, indexOf);
1402             final int index2 =
1403                 payload.indexOf(Channel.DELIMITER, indexOf
1404                     + Channel.DELIMITER.length());
1405             split[1] =
1406                 payload.substring(indexOf + Channel.DELIMITER.length(), index2);
1407             split[2] = payload.substring(index2 + Channel.DELIMITER.length());
1408         }
1409         return split;
1410     }
1411 
1412     @Override
1413     public AsyncLog getLog()
1414     {
1415         return LOG;
1416     }
1417 
1418     IChannel getChannel()
1419     {
1420         return channel;
1421     }
1422 }