View Javadoc

1   /*
2      Copyright 2008 Ramon Servadei
3   
4      Licensed under the Apache License, Version 2.0 (the "License");
5      you may not use this file except in compliance with the License.
6      You may obtain a copy of the License at
7   
8          http://www.apache.org/licenses/LICENSE-2.0
9   
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15   */
16  package fulmine.context;
17  
18  import static fulmine.util.Utils.COLON;
19  import static fulmine.util.Utils.COMMA_SPACE;
20  import static fulmine.util.Utils.EMPTY_STRING;
21  import static fulmine.util.Utils.SPACING_4_CHARS;
22  import static fulmine.util.Utils.logException;
23  import static fulmine.util.Utils.nullCheck;
24  import static fulmine.util.Utils.safeToString;
25  import static fulmine.util.Utils.string;
26  
27  import java.util.Collection;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Set;
31  
32  import org.apache.commons.lang.SystemUtils;
33  
34  import fulmine.AbstractLifeCycle;
35  import fulmine.IDomain;
36  import fulmine.IType;
37  import fulmine.distribution.IDistributionManager;
38  import fulmine.distribution.IDistributionState;
39  import fulmine.distribution.IRemoteUpdateInvoker;
40  import fulmine.distribution.RemoteUpdateInvoker;
41  import fulmine.distribution.channel.Channel;
42  import fulmine.distribution.channel.ChannelFactory;
43  import fulmine.distribution.channel.ChannelReadyEvent;
44  import fulmine.distribution.channel.ChannelTransmissionListener;
45  import fulmine.distribution.channel.IChannel;
46  import fulmine.distribution.channel.IChannelFactory;
47  import fulmine.distribution.connection.IConnection;
48  import fulmine.distribution.connection.IConnectionDiscoverer;
49  import fulmine.distribution.connection.IConnectionParameters;
50  import fulmine.distribution.events.ConnectionAvailableEvent;
51  import fulmine.distribution.events.ConnectionDestroyedEvent;
52  import fulmine.distribution.events.ContextDiscoveredEvent;
53  import fulmine.distribution.events.ContextNotAvailableEvent;
54  import fulmine.event.IEvent;
55  import fulmine.event.IEventManager;
56  import fulmine.event.listener.AbstractEventHandler;
57  import fulmine.event.listener.IEventListener;
58  import fulmine.event.listener.ILifeCycleEventListener;
59  import fulmine.event.listener.MultiSystemEventListener;
60  import fulmine.event.subscription.ISubscriptionListener;
61  import fulmine.event.subscription.ISubscriptionManager;
62  import fulmine.event.subscription.ISubscriptionParameters;
63  import fulmine.event.subscription.SubscriptionParameters;
64  import fulmine.event.system.AbstractSystemEvent;
65  import fulmine.event.system.EventSourceNotObservedEvent;
66  import fulmine.event.system.EventSourceObservedEvent;
67  import fulmine.event.system.ISystemEventListener;
68  import fulmine.event.system.SubscribeEvent;
69  import fulmine.event.system.UnsubscribeEvent;
70  import fulmine.model.container.IContainer;
71  import fulmine.model.container.subscription.ContainerSubscriptionManager;
72  import fulmine.model.field.IntegerField;
73  import fulmine.model.field.StringField;
74  import fulmine.protocol.specification.FrameReader;
75  import fulmine.protocol.specification.FrameWriter;
76  import fulmine.protocol.specification.IFrameReader;
77  import fulmine.protocol.specification.IFrameWriter;
78  import fulmine.rpc.IRpcResult;
79  import fulmine.util.collection.CollectionFactory;
80  import fulmine.util.collection.CollectionUtils;
81  import fulmine.util.log.AsyncLog;
82  import fulmine.util.reference.AutoCreatingStore;
83  import fulmine.util.reference.DualValue;
84  import fulmine.util.reference.IAutoCreatingStore;
85  import fulmine.util.reference.IObjectBuilder;
86  import fulmine.util.reference.IReferenceCounter;
87  import fulmine.util.reference.ReferenceCounter;
88  import fulmine.util.reference.Values;
89  
90  /**
91   * The manager of subscriptions for distribution of events within the local
92   * context and from remote contexts. The manager also provides retransmission
93   * and retransmission-request operations.
94   * <p>
95   * This registers for the following events:
96   * <ul>
97   * <li>{@link ContextDiscoveredEvent} - when a remote context is discovered, the
98   * manager will initiate creation of an {@link IConnection} to the remote
99   * context if there is a subscription waiting for a remote container in that
100  * remote context, otherwise it will simply cache the connection parameters
101  * against the remote context's identity for future use.
102  * <li>{@link ContextNotAvailableEvent} - raised when a remote context is no
103  * longer available. Any connection to the remote context (and by association
104  * the channel) is destroyed after this event is received.
105  * <li>{@link ConnectionAvailableEvent} - this is raised when an
106  * {@link IConnection} has been created between the local and a remote context.
107  * After this, an {@link IChannel} instance will be created using the
108  * connection. When 2 contexts decide to connect to each other, duplicate
109  * connections can occur between the contexts; only 1 connection is required.
110  * The manager handles this scenario as follows:
111  * <ul>
112  * <li>If the name of this context is semantically greater than the other
113  * <u>and</u> there is already a connection between the 2 contexts, the new
114  * connection is destroyed (if the names are equal, the hashcodes of the
115  * contexts are compared, see {@link IFrameworkContext#getContextHashCode()}).
116  * This will cause the {@link IChannel} in the remote context to shutdown (for a
117  * brief period the remote context may have had 2 channels, but only one will
118  * have completed its initialisation sequence).
119  * <li>Else a new channel is created using the connection. The channel will
120  * begin its synchronisation with the remote peer channel - this may not
121  * complete if the remote context destroys the connection as per the previous
122  * point.
123  * </ul>
124  * <li>{@link ConnectionDestroyedEvent} - when an {@link IConnection} is
125  * destroyed, the corresponding {@link IChannel} is also destroyed.
126  * <li>{@link ChannelReadyEvent} - this is received when a {@link Channel} has
127  * been created over an {@link IConnection} to a remote context and the channel
128  * has synchronised its readiness with the other remote peer channel. At this
129  * point the channel is available for use to issue subscriptions. Any
130  * subscriptions that have been received by the manager for remote containers in
131  * the remote context this channel connects to can now be issued.
132  * </ul>
133  * <p>
134  * The synchronisation rule-of-thumb for this class is that any operations
135  * involving I/O should not be executed whilst holding any object monitors.
136  * 
137  * @see IFulmineContext
138  * @author Ramon Servadei
139  */
140 class DistributionManager extends AbstractLifeCycle implements
141     IDistributionManager
142 {
143     final static AsyncLog LOG = new AsyncLog(DistributionManager.class);
144 
145     /** The shared state */
146     private IDistributionState state;
147 
148     /**
149      * Standard constructor
150      * 
151      * @param context
152      *            the context this is associated with
153      */
154     DistributionManager(IFrameworkContext context)
155     {
156         this(new DistributionState(context));
157     }
158 
159     /**
160      * Internally chained constructor
161      * 
162      * @param state
163      *            the state for the manager
164      */
165     DistributionManager(IDistributionState state)
166     {
167         super();
168         this.state = state;
169     }
170 
171     @Override
172     protected AsyncLog getLog()
173     {
174         return LOG;
175     }
176 
177     @Override
178     protected void doStart()
179     {
180         getState().init();
181         getState().start();
182         if (getLog().isInfoEnabled())
183         {
184             getLog().info("Started");
185         }
186     }
187 
188     public boolean subscribe(String contextIdentity, String identityRegex,
189         IType type, IDomain domain, IEventListener listener)
190     {
191         final SubscriptionParameters parameters =
192             new SubscriptionParameters(identityRegex, type, domain);
193         if (getLog().isInfoEnabled())
194         {
195             getLog().info(
196                 "[subscribe] context=" + contextIdentity + COMMA_SPACE
197                     + parameters + ", listener=" + safeToString(listener));
198         }
199         if (getState().getContext().getIdentity().equals(contextIdentity))
200         {
201             boolean subscribed = false;
202             // local container
203             final ISubscriptionManager subscriptionManager =
204                 getState().getSubscriptionManager();
205             subscriptionManager.subscribe(parameters);
206             subscribed = subscriptionManager.addListener(parameters, listener);
207             return subscribed;
208         }
209         return doRemoteSubscribe(contextIdentity, parameters, listener);
210     }
211 
212     /**
213      * Performs the actions to service a remote subscription
214      * 
215      * @param remoteContextIdentity
216      *            the remote context for the subscription
217      * @param parameters
218      *            the parameters for the subscription
219      * @param listener
220      *            the listener for the subscription
221      * @return <code>true</code> if the subscription was created,
222      *         <code>false</code> if it already existed
223      */
224     protected boolean doRemoteSubscribe(String remoteContextIdentity,
225         ISubscriptionParameters parameters, IEventListener listener)
226     {
227         boolean subscribed = false;
228         nullCheck(remoteContextIdentity, "No remote context identity");
229         IConnectionParameters connectionParameters = null;
230         DualValue<ISubscriptionParameters, IEventListener> values =
231             new DualValue<ISubscriptionParameters, IEventListener>(parameters,
232                 listener);
233         boolean connect = false;
234         final IChannel channel;
235         // single lock
236         synchronized (getState())
237         {
238             final Set<DualValue<ISubscriptionParameters, IEventListener>> remoteSubscriptions =
239                 getState().getRemoteSubscriptions().get(remoteContextIdentity);
240             // if the subscription is already contained, return false
241             // but still attempt all the connection logic
242             subscribed = !remoteSubscriptions.contains(values);
243             remoteSubscriptions.add(values);
244             if (getLog().isTraceEnabled())
245             {
246                 getLog().trace(
247                     "Remote subscriptions for remote context "
248                         + remoteContextIdentity
249                         + " are"
250                         + CollectionUtils.toFormattedString(remoteSubscriptions));
251             }
252             connectionParameters =
253                 getState().getDiscoveredContexts().get(remoteContextIdentity);
254 
255             /*
256              * pass the subscription on to the channel, if there is no channel,
257              * the remoteSubscription will be triggered when the channel comes
258              * online
259              */
260             channel = getState().getChannels().get(remoteContextIdentity);
261             if (channel == null)
262             {
263                 connect =
264                     shouldConnect(remoteContextIdentity, connectionParameters);
265             }
266         }
267         if (channel != null)
268         {
269             /*
270              * spin-off onto an event so that its handled by the same thread -
271              * subscription can also occur from the ChannelAvailableEventHandler
272              */
273             getState().getContext().queueEvent(
274                 new RemoteContainerSubscriptionEvent(getState().getContext(),
275                     remoteContextIdentity, values));
276         }
277         /*
278          * trigger the connection that will create the channel
279          * 
280          * although duplicate connections may arise from this, duplicates are
281          * handled by the ConnectionAvailableEventHandler...
282          */
283         if (connect)
284         {
285             getState().getContext().getConnectionBroker().connect(
286                 connectionParameters);
287         }
288         return subscribed;
289     }
290 
291     /**
292      * Helper method to determine whether a connection should be made to the
293      * remote context to issue the subscription. This method checks that there
294      * is no connection currently existing or being made and that the context is
295      * available.
296      * 
297      * @param remoteContextIdentity
298      *            the remote context that the connection would be for
299      * @param connectionParameters
300      *            the connection parameters
301      * @return <code>true</code> if a connection should be attempted
302      */
303     private boolean shouldConnect(String remoteContextIdentity,
304         IConnectionParameters connectionParameters)
305     {
306         synchronized (getState())
307         {
308             boolean connect = false;
309             if (connectionParameters != null)
310             {
311                 // are we already connected?
312                 final String connectionIdentity =
313                     connectionParameters.getRemoteContextIdentity();
314                 if (getState().getConnectedContexts().getCount(
315                     connectionIdentity) == 0)
316                 {
317                     /*
318                      * this operation must be idempotent otherwise we get
319                      * multiple connection attempts...only potential problem is
320                      * that we can't force a re-connection if a connection
321                      * attempt fails...
322                      */
323                     if (getState().getCONNECTINGContexts().getCount(
324                         connectionIdentity) == 0)
325                     {
326                         getState().getCONNECTINGContexts().adjustCount(
327                             connectionIdentity, 1);
328                         if (getLog().isTraceEnabled())
329                         {
330                             getLog().trace(
331                                 "CONNECTING contexts are "
332                                     + safeToString(getState().getCONNECTINGContexts()));
333                         }
334                         connect = true;
335                     }
336                     else
337                     {
338                         if (getLog().isTraceEnabled())
339                         {
340                             getLog().trace(
341                                 "Already trying to connect to "
342                                     + safeToString(connectionParameters));
343                         }
344                     }
345                 }
346                 else
347                 {
348                     if (getLog().isTraceEnabled())
349                     {
350                         getLog().trace(
351                             "Already connected to "
352                                 + safeToString(connectionParameters));
353                     }
354                 }
355             }
356             else
357             {
358                 if (getLog().isDebugEnabled())
359                 {
360                     getLog().debug(
361                         "Remote context " + remoteContextIdentity
362                             + " has not been discovered yet");
363                 }
364             }
365             return connect;
366         }
367     }
368 
369     public boolean unsubscribe(String contextIdentity, String identityRegex,
370         IType type, IDomain domain, IEventListener listener)
371     {
372         final SubscriptionParameters parameters =
373             new SubscriptionParameters(identityRegex, type, domain);
374         if (getLog().isInfoEnabled())
375         {
376             getLog().info(
377                 "[unsubscribe] context=" + contextIdentity + COMMA_SPACE
378                     + parameters + ", listener=" + safeToString(listener));
379         }
380         if (getState().getContext().getIdentity().equals(contextIdentity))
381         {
382             boolean unsubscribed = false;
383             // remove the listener, if there are no more, then unsubscribe
384             if (canUnsubscribe(getState().getSubscriptionManager(), parameters,
385                 listener, false))
386             {
387                 unsubscribed =
388                     getState().getSubscriptionManager().unsubscribe(parameters);
389             }
390             return unsubscribed;
391         }
392         return doRemoteUnsubscribe(contextIdentity, parameters, listener);
393     }
394 
395     /**
396      * Helper method to remove the listener from the subscription managed by the
397      * manager that is passed in. It returns whether the subscription can be
398      * removed because there are no more application listeners. This works for
399      * both local and remote unsubscribe operations.
400      * 
401      * @param subscriptionManager
402      *            the subscription manager to operate on
403      * @param parameters
404      *            the parameters identifying the subscription
405      * @param listener
406      *            the listener to remove
407      * @param unsubscribeRemote
408      *            whether this is an unsubscribe for a remote container
409      * @return <code>true</code> if there are no more application listeners
410      *         associated with this subscription and thus the subscription can
411      *         be removed. For a remote subscription, if there is only the
412      *         {@link ChannelTransmissionListener} remaining as a listener, this
413      *         effectively means there are no more application listeners.
414      */
415     private final boolean canUnsubscribe(
416         ISubscriptionManager subscriptionManager,
417         final ISubscriptionParameters parameters, IEventListener listener,
418         boolean unsubscribeRemote)
419     {
420         if (getLog().isTraceEnabled())
421         {
422             getLog().trace(
423                 "removing listener "
424                     + safeToString(listener)
425                     + " for "
426                     + safeToString(parameters)
427                     + " from subscription listeners "
428                     + CollectionUtils.toFormattedString(subscriptionManager.getListeners(parameters)));
429         }
430         subscriptionManager.removeListener(parameters, listener);
431         final List<IEventListener> listeners =
432             subscriptionManager.getListeners(parameters);
433         boolean canUnsubscribe = false;
434         if (listeners.size() == 0)
435         {
436             canUnsubscribe = true;
437         }
438         else
439         {
440             if (unsubscribeRemote)
441             {
442                 canUnsubscribe = applicationListenersExist(listeners);
443             }
444         }
445 
446         if (getLog().isTraceEnabled())
447         {
448             getLog().trace(
449                 "Can"
450                     + (canUnsubscribe ? EMPTY_STRING : "not")
451                     + " unsubscribe "
452                     + safeToString(parameters)
453                     + ", subscribed sources: "
454                     + CollectionUtils.toFormattedString(subscriptionManager.getSubscribedSources(parameters))
455                     + ", listeners: "
456                     + CollectionUtils.toFormattedString(subscriptionManager.getListeners(parameters)));
457         }
458         return canUnsubscribe;
459     }
460 
461     /**
462      * Identify if there are any application listeners in the list of listeners.
463      * Any application listeners existing mean that unsubscription cannot
464      * continue.
465      * 
466      * @param listeners
467      *            the list of listeners to examine for any application listeners
468      * @return <code>false</code> if there are any application listeners and
469      *         unsubscription should not continue
470      */
471     protected boolean applicationListenersExist(
472         final List<IEventListener> listeners)
473     {
474         boolean canUnsubscribe = false;
475         /*
476          * if the only listener is the channel, then there are no more
477          * application listeners for the remote subscription, so we can
478          * unsubscribe
479          */
480         if (listeners.size() == 1
481             && listeners.get(0) instanceof ChannelTransmissionListener)
482         {
483             canUnsubscribe = true;
484         }
485         return canUnsubscribe;
486     }
487 
488     /**
489      * Performs the actions to service an unsubscribe operation for a remote
490      * container.
491      * 
492      * @param remoteContextIdentity
493      *            the remote context to unsubscribe from
494      * @param parameters
495      *            the subscription parameters
496      * @param listener
497      *            the listener to unsubscribe
498      * @return <code>true</code> if the subscription was found and removed,
499      *         <code>false</code> otherwise
500      */
501     protected boolean doRemoteUnsubscribe(String remoteContextIdentity,
502         ISubscriptionParameters parameters, IEventListener listener)
503     {
504         IChannel channel = null;
505         if (remoteContextIdentity != null)
506         {
507             final DualValue<ISubscriptionParameters, IEventListener> values =
508                 new DualValue<ISubscriptionParameters, IEventListener>(
509                     parameters, listener);
510             // single lock
511             synchronized (getState())
512             {
513                 final Set<DualValue<ISubscriptionParameters, IEventListener>> remoteSubscriptions =
514                     getState().getRemoteSubscriptions().get(
515                         remoteContextIdentity);
516                 remoteSubscriptions.remove(values);
517                 if (getLog().isTraceEnabled())
518                 {
519                     getLog().trace(
520                         "Remote subscriptions for remote context "
521                             + remoteContextIdentity
522                             + " are now "
523                             + CollectionUtils.toFormattedString(remoteSubscriptions));
524                 }
525             }
526             channel = getState().getChannels().get(remoteContextIdentity);
527         }
528         if (channel != null)
529         {
530             if (canUnsubscribe(channel, parameters, listener, true))
531             {
532                 // this might cause the connection to be destroyed if there are
533                 // no more tx/rx subscriptions
534                 return channel.unsubscribe(parameters);
535             }
536         }
537         return false;
538     }
539 
540     @Override
541     protected void doDestroy()
542     {
543         getState().destroy();
544         for (IChannel channel : getState().getChannels().values())
545         {
546             try
547             {
548                 channel.destroy();
549             }
550             catch (Exception e)
551             {
552                 logException(getLog(), channel, e);
553             }
554         }
555         getState().getChannels().clear();
556     }
557 
558     public IFrameReader getFrameReader()
559     {
560         return getState().getFrameReader();
561     }
562 
563     public IFrameWriter getFrameWriter()
564     {
565         return getState().getFrameWriter();
566     }
567 
568     public void requestRetransmit(String contextIdentity,
569         String identityRegularExpression, IType type, IDomain domain)
570     {
571         if (getLog().isDebugEnabled())
572         {
573             getLog().debug(
574                 "requestRetransmit for context=" + contextIdentity
575                     + ", identity=" + identityRegularExpression + ", type="
576                     + type + ", domain=" + domain);
577         }
578         final IChannel channel = getChannel(contextIdentity);
579         if (channel != null)
580         {
581             channel.requestRetransmit(identityRegularExpression, type, domain);
582         }
583     }
584 
585     public void requestRetransmitAll(String contextIdentity)
586     {
587         if (getLog().isDebugEnabled())
588         {
589             getLog().debug(
590                 "requestRetransmitAll for context=" + contextIdentity);
591         }
592         final IChannel channel = getChannel(contextIdentity);
593         if (channel != null)
594         {
595             channel.requestRetransmitAll();
596         }
597     }
598 
599     public void retransmit(String contextIdentity,
600         String identityRegularExpression, IType type, IDomain domain)
601     {
602         if (getLog().isDebugEnabled())
603         {
604             getLog().debug(
605                 "retransmit for context=" + contextIdentity + ", identity="
606                     + identityRegularExpression + ", type=" + type
607                     + ", domain=" + domain);
608         }
609         final IChannel channel = getChannel(contextIdentity);
610         if (channel != null)
611         {
612             channel.retransmit(identityRegularExpression, type, domain);
613         }
614     }
615 
616     public void retransmitAll(String contextIdentity)
617     {
618         if (getLog().isDebugEnabled())
619         {
620             getLog().debug("requestAll for context=" + contextIdentity);
621         }
622         final IChannel channel = getChannel(contextIdentity);
623         if (channel != null)
624         {
625             channel.retransmitAll();
626         }
627     }
628 
629     public void retransmitAllToAll()
630     {
631         if (getLog().isDebugEnabled())
632         {
633             getLog().debug("requestAllToAll");
634         }
635         for (IChannel channel : getState().getChannels().values())
636         {
637             try
638             {
639                 channel.retransmitAll();
640             }
641             catch (Exception e)
642             {
643                 logException(getLog(), channel, e);
644             }
645         }
646     }
647 
648     public void retransmitToAll(String identityRegularExpression, IType type,
649         IDomain domain)
650     {
651         if (getLog().isDebugEnabled())
652         {
653             getLog().debug(
654                 "retransmitToAll for identity=" + identityRegularExpression
655                     + ", type=" + type + ", domain=" + domain);
656         }
657         for (IChannel channel : getState().getChannels().values())
658         {
659             try
660             {
661                 channel.retransmit(identityRegularExpression, type, domain);
662             }
663             catch (Exception e)
664             {
665                 logException(getLog(), channel, e);
666             }
667         }
668     }
669 
670     private IChannel getChannel(String contextIdentity)
671     {
672         return getState().getChannels().get(contextIdentity);
673     }
674 
675     public IChannel[] getConnectedChannels()
676     {
677         final Collection<IChannel> values = getState().getChannels().values();
678         return values.toArray(new IChannel[values.size()]);
679     }
680 
681     /**
682      * Log the current connected channels
683      */
684     public void logConnectedChannels()
685     {
686         StringBuilder sb = new StringBuilder();
687         final IChannel[] connectedChannels = getConnectedChannels();
688         for (IChannel channel : connectedChannels)
689         {
690             sb.append(SystemUtils.LINE_SEPARATOR).append(SPACING_4_CHARS).append(
691                 safeToString(channel.toString()));
692         }
693         if (connectedChannels.length == 0)
694         {
695             sb.append(SystemUtils.LINE_SEPARATOR).append(SPACING_4_CHARS).append(
696                 "<UNCONNECTED>");
697         }
698         if (getLog().isInfoEnabled())
699         {
700             getLog().info(
701                 SystemUtils.LINE_SEPARATOR + this + " connected to:" + sb);
702         }
703     }
704 
705     public boolean addSubscriptionListener(ISubscriptionListener listener)
706     {
707         boolean addListener =
708             getState().getContext().getSystemEventSource(
709                 EventSourceObservedEvent.class).addListener(listener);
710         addListener |=
711             getState().getContext().getSystemEventSource(
712                 EventSourceNotObservedEvent.class).addListener(listener);
713         addListener |=
714             getState().getContext().getSystemEventSource(SubscribeEvent.class).addListener(
715                 listener);
716         addListener |=
717             getState().getContext().getSystemEventSource(UnsubscribeEvent.class).addListener(
718                 listener);
719         return addListener;
720     }
721 
722     public boolean removeSubscriptionListener(ISubscriptionListener listener)
723     {
724         boolean removeListener =
725             getState().getContext().getSystemEventSource(
726                 EventSourceNotObservedEvent.class).removeListener(listener);
727         removeListener |=
728             getState().getContext().getSystemEventSource(
729                 EventSourceObservedEvent.class).removeListener(listener);
730         removeListener |=
731             getState().getContext().getSystemEventSource(SubscribeEvent.class).removeListener(
732                 listener);
733         removeListener |=
734             getState().getContext().getSystemEventSource(UnsubscribeEvent.class).removeListener(
735                 listener);
736         return removeListener;
737     }
738 
739     public String updateRemoteContainer(String remoteContextIdentity,
740         String identity, IType type, IDomain domain, String fieldName,
741         String fieldValueAsString)
742     {
743         // register the RPC publication listener so we can pick up
744         // the procedure (RPC key) to invoke (this is idempotent)
745         getState().getContext().addRpcPublicationListener(
746             remoteContextIdentity,
747             getState().getRemoteUpdateInvoker(remoteContextIdentity));
748         final IRpcResult result =
749             getState().getRemoteUpdateInvoker(remoteContextIdentity).invoke(
750                 remoteContextIdentity,
751                 new StringField("identity", identity),
752                 new IntegerField("type", type.value()),
753                 new IntegerField("domain", domain.value()),
754                 new StringField("fieldName", fieldName),
755                 new StringField("fieldValueString", fieldValueAsString),
756                 new IntegerField(
757                     "permissionApp",
758                     getState().getContext().getPermissionProfile().getApplicationCode()),
759                 new IntegerField(
760                     "permissionCode",
761                     getState().getContext().getPermissionProfile().getPermissionCode()),
762                 new StringField("remoteContextIdentity",
763                     getState().getContext().getIdentity()));
764         if (result == null)
765         {
766             return "Null result, RPC definition not available";
767         }
768         if (result.isSuccessful())
769         {
770             return result.getResult().getValue().toString();
771         }
772         return result.getExceptionMessage();
773     }
774 
775     public void invokeRpc(String remoteContextIdentity, byte[] rpcData)
776     {
777         final IChannel channel =
778             getState().getChannels().get(remoteContextIdentity);
779         if (channel == null)
780         {
781             /*
782              * It is the responsibility of the RpcManager to only invoke the RPC
783              * when the channel is ready.
784              */
785             throw new RuntimeException("There is no channel available for "
786                 + remoteContextIdentity);
787         }
788         channel.invokeRpc(remoteContextIdentity, rpcData);
789     }
790 
791     IDistributionState getState()
792     {
793         return this.state;
794     }
795 
796     void setState(IDistributionState state)
797     {
798         this.state = state;
799     }
800 }
801 
802 /**
803  * Internal state for the {@link DistributionManager}.
804  * 
805  * @author Ramon Servadei
806  * 
807  */
808 class DistributionState extends AbstractLifeCycle implements IDistributionState
809 {
810     /**
811      * A builder for {@link RemoteUpdateInvoker} objects, keyed by their remote
812      * context identity
813      * 
814      * @author Ramon Servadei
815      */
816     final class RemoteUpdateInvokerBuilder implements
817         IObjectBuilder<String, IRemoteUpdateInvoker>
818     {
819         public IRemoteUpdateInvoker create(String key)
820         {
821             return new RemoteUpdateInvoker(key, getContext());
822         }
823     }
824 
825     /**
826      * A builder of a {@link List} of {@link Values}
827      * 
828      * @author Ramon Servadei
829      * 
830      */
831     static final class ValuesBuilder
832         implements
833         IObjectBuilder<String, Set<DualValue<ISubscriptionParameters, IEventListener>>>
834     {
835         public Set<DualValue<ISubscriptionParameters, IEventListener>> create(
836             String key)
837         {
838             return CollectionFactory.newSet(1);
839         }
840     }
841 
842     /**
843      * The channels the manager has. Uses the <a
844      * href="http://www.ibm.com/developerworks/java/library/j-jtp06197.html"
845      * >'cheap read-write lock'</a>
846      */
847     private volatile Map<String, IChannel> channels;
848 
849     private final Map<String, IConnectionParameters> discoveredConnections;
850 
851     private final ISubscriptionManager subscriptionManager;
852 
853     private final IAutoCreatingStore<String, Set<DualValue<ISubscriptionParameters, IEventListener>>> remoteSubscriptions;
854 
855     private final IReferenceCounter<String> connectedContexts;
856 
857     private final IReferenceCounter<String> CONNECTINGContexts;
858 
859     private final IFrameworkContext context;
860 
861     private final IFrameReader frameReader;
862 
863     private final IFrameWriter frameWriter;
864 
865     private ILifeCycleEventListener eventHandler;
866 
867     private IChannelFactory channelFactory;
868 
869     private IAutoCreatingStore<String, IRemoteUpdateInvoker> remoteUpdateInvokers;
870 
871     DistributionState(IFrameworkContext context)
872     {
873         super();
874         this.context = context;
875         this.discoveredConnections = CollectionFactory.newMap(2);
876         this.channels = CollectionFactory.newMap(2);
877         this.subscriptionManager =
878             new ContainerSubscriptionManager(this.context, null);
879         this.remoteSubscriptions =
880             new AutoCreatingStore<String, Set<DualValue<ISubscriptionParameters, IEventListener>>>(
881                 new ValuesBuilder());
882         this.connectedContexts = new ReferenceCounter<String>();
883         this.CONNECTINGContexts = new ReferenceCounter<String>();
884         this.frameReader = new FrameReader();
885         this.frameWriter = new FrameWriter();
886 
887         // NOTE: this must be created AFTER the context is set
888         this.remoteUpdateInvokers =
889             new AutoCreatingStore<String, IRemoteUpdateInvoker>(
890                 new RemoteUpdateInvokerBuilder());
891     }
892 
893     /**
894      * Initialise the state
895      */
896     public void init()
897     {
898         this.channelFactory = createChannelFactory();
899         this.eventHandler =
900             new MultiSystemEventListener(
901                 string(this, context.toString()),
902                 context,
903                 AbstractEventHandler.getEventHandlerMappings(createEventHandlers()));
904     }
905 
906     public IChannelFactory createChannelFactory()
907     {
908         return new ChannelFactory();
909     }
910 
911     @SuppressWarnings("unchecked")
912     AbstractEventHandler<? extends IEvent>[] createEventHandlers()
913     {
914         return new AbstractEventHandler[] { new ChannelReadyEventHandler(this),
915             new ConnectionAvailableEventHandler(this),
916             new ConnectionDestroyedEventHandler(this),
917             new RemoteContainerSubscriptionEventHandler(this),
918             new RemoteEventSourceNotObservedEventHandler(this),
919             new ContextDiscoveredEventHandler(this),
920             new ContextNotAvailableEventHandler(this), };
921     }
922 
923     protected void doDestroy()
924     {
925         this.eventHandler.destroy();
926         this.subscriptionManager.destroy();
927     }
928 
929     protected void doStart()
930     {
931         this.eventHandler.start();
932         this.subscriptionManager.start();
933     }
934 
935     public final IChannelFactory getChannelFactory()
936     {
937         return this.channelFactory;
938     }
939 
940     public final ILifeCycleEventListener getEventHandler()
941     {
942         return this.eventHandler;
943     }
944 
945     public final Map<String, IChannel> getChannels()
946     {
947         return this.channels;
948     }
949 
950     public final Map<String, IConnectionParameters> getDiscoveredContexts()
951     {
952         return this.discoveredConnections;
953     }
954 
955     public final ISubscriptionManager getSubscriptionManager()
956     {
957         return this.subscriptionManager;
958     }
959 
960     public final IAutoCreatingStore<String, Set<DualValue<ISubscriptionParameters, IEventListener>>> getRemoteSubscriptions()
961     {
962         return this.remoteSubscriptions;
963     }
964 
965     public final IReferenceCounter<String> getConnectedContexts()
966     {
967         return this.connectedContexts;
968     }
969 
970     public final IFrameworkContext getContext()
971     {
972         return this.context;
973     }
974 
975     public final IFrameReader getFrameReader()
976     {
977         return this.frameReader;
978     }
979 
980     public final IFrameWriter getFrameWriter()
981     {
982         return this.frameWriter;
983     }
984 
985     public final void setChannels(Map<String, IChannel> channels)
986     {
987         this.channels = channels;
988     }
989 
990     public final IReferenceCounter<String> getCONNECTINGContexts()
991     {
992         return this.CONNECTINGContexts;
993     }
994 
995     public IRemoteUpdateInvoker getRemoteUpdateInvoker(
996         String remoteContextIdentity)
997     {
998         return this.remoteUpdateInvokers.get(remoteContextIdentity);
999     }
1000 }
1001 
1002 /**
1003  * Base class for distribution manager event handlers
1004  * 
1005  * @author Ramon Servadei
1006  * 
1007  * @param <T>
1008  *            the type of event
1009  */
1010 abstract class DistributionEventHandler<T extends IEvent> extends
1011     AbstractEventHandler<T> implements ISystemEventListener
1012 {
1013     /** The distribution state reference */
1014     private final IDistributionState state;
1015 
1016     DistributionEventHandler(IDistributionState state)
1017     {
1018         super();
1019         nullCheck(state, "No state provided");
1020         this.state = state;
1021     }
1022 
1023     /** Get the state */
1024     IDistributionState getState()
1025     {
1026         return state;
1027     }
1028 
1029     /**
1030      * Compares the {@link IConnection} of the {@link IChannel} against the
1031      * {@link IConnectionParameters}. If the channel's connection is different
1032      * to the connection parameters, this method destroys the channel's
1033      * connection.
1034      * 
1035      * @param channel
1036      *            the channel
1037      * @param connectionParameters
1038      *            the connection parameters to compare with the channel's
1039      *            connection
1040      * @return <code>true</code> if the channel's connection parameters are
1041      *         equal to the connection parameters argument
1042      */
1043     final boolean validateChannelConnection(final IChannel channel,
1044         final IConnectionParameters connectionParameters)
1045     {
1046         // we can only verify outbound connections
1047         if (channel.getConnection().isOutbound()
1048             && (connectionParameters != null && !connectionParameters.isEqual(channel.getConnection())))
1049         {
1050             if (getLog().isInfoEnabled())
1051             {
1052                 getLog().info(
1053                     "Destroying " + safeToString(channel)
1054                         + " because remote context"
1055                         + " connection parameters have changed to "
1056                         + safeToString(connectionParameters));
1057             }
1058             /*
1059              * chances are the channel will already be destroyed...for the
1060              * connection to change and the channel to still exist can only
1061              * occur during the transition from one connection to another for
1062              * the same remote context
1063              * 
1064              * destroying the channel will cause reconnect when the connection
1065              * destroyed event is handled
1066              */
1067             channel.getConnection().destroy();
1068             return false;
1069         }
1070         return true;
1071     }
1072 }
1073 
1074 /**
1075  * Handles the {@link ContextNotAvailableEvent}s raised when a remote context is
1076  * not available anymore. Removes the context {@link IConnectionParameters} from
1077  * the discovered contexts collection and destroys the active {@link IChannel}
1078  * connection if it exists. The channel is not destroyed, only the connection -
1079  * the {@link ConnectionDestroyedEventHandler} will destroy the channel.
1080  * 
1081  * @author Ramon Servadei
1082  */
1083 class ContextNotAvailableEventHandler extends
1084     DistributionEventHandler<ContextNotAvailableEvent>
1085 {
1086     private final static AsyncLog LOG =
1087         new AsyncLog(ContextNotAvailableEventHandler.class);
1088 
1089     ContextNotAvailableEventHandler(IDistributionState state)
1090     {
1091         super(state);
1092     }
1093 
1094     /**
1095      * Removes the context {@link IConnectionParameters} from the discovered
1096      * contexts collection and destroys the active {@link IChannel} connection
1097      * if it exists. The channel is not destroyed, only the connection - the
1098      * {@link ConnectionDestroyedEventHandler} will destroy the channel.
1099      */
1100     @Override
1101     public void handle(ContextNotAvailableEvent event)
1102     {
1103         IChannel channel;
1104         // 1. remove the context from discovered contexts
1105         // 2. find any active channel and destroy it
1106         synchronized (getState())
1107         {
1108             getState().getDiscoveredContexts().remove(
1109                 event.getRemoteContextIdentity());
1110             // don't remove the channel, this is done in the
1111             // ConnectionDestroyedEventHandler
1112             channel =
1113                 getState().getChannels().get(event.getRemoteContextIdentity());
1114         }
1115         if (channel != null)
1116         {
1117             // destroy the connection, this will destroy the channel
1118             final IConnection connection = channel.getConnection();
1119             if (connection != null)
1120             {
1121                 connection.destroy();
1122             }
1123         }
1124     }
1125 
1126     @Override
1127     public AsyncLog getLog()
1128     {
1129         return LOG;
1130     }
1131 }
1132 
1133 /**
1134  * Handler for {@link ContextDiscoveredEvent} events. Saves the
1135  * {@link ContextDiscoveredEvent} and connects to the remote context if there is
1136  * a remote container subscription for the remote context. This will also
1137  * destroy any connection to the remote context if the connection parameters
1138  * have changed.
1139  * 
1140  * @author Ramon Servadei
1141  */
1142 class ContextDiscoveredEventHandler extends
1143     DistributionEventHandler<ContextDiscoveredEvent>
1144 {
1145 
1146     private final static AsyncLog LOG =
1147         new AsyncLog(ContextDiscoveredEventHandler.class);
1148 
1149     ContextDiscoveredEventHandler(IDistributionState state)
1150     {
1151         super(state);
1152     }
1153 
1154     /**
1155      * Saves the {@link ContextDiscoveredEvent} and connects to the remote
1156      * context if there is a remote container subscription for the remote
1157      * context. This will also destroy any connection to the remote context if
1158      * the connection parameters have changed.
1159      */
1160     @Override
1161     public void handle(ContextDiscoveredEvent event)
1162     {
1163         if (!getState().getContext().isActive())
1164         {
1165             if (getLog().isInfoEnabled())
1166             {
1167                 getLog().info(
1168                     "Context is not active, ignoring " + safeToString(event));
1169             }
1170             return;
1171         }
1172         final IConnectionParameters connectionParameters =
1173             event.getConnectionParameters();
1174         final String remoteContextIdentity =
1175             connectionParameters.getRemoteContextIdentity();
1176         nullCheck(remoteContextIdentity, "No remote context identity");
1177         boolean connect = false;
1178         // single lock
1179         synchronized (getState())
1180         {
1181             connect =
1182                 getState().getRemoteSubscriptions().get(remoteContextIdentity).size() > 0;
1183             if (connect)
1184             {
1185                 if (getLog().isDebugEnabled())
1186                 {
1187                     getLog().debug(
1188                         "Connect pending because there are remote subscriptions to "
1189                             + remoteContextIdentity
1190                             + CollectionUtils.toFormattedString(getState().getRemoteSubscriptions().get(
1191                                 remoteContextIdentity)));
1192                 }
1193                 // are we already connected?
1194                 connect =
1195                     getState().getConnectedContexts().getCount(
1196                         remoteContextIdentity) == 0;
1197                 if (getLog().isDebugEnabled())
1198                 {
1199                     getLog().debug(
1200                         "Connected contexts are "
1201                             + safeToString(getState().getConnectedContexts()));
1202                 }
1203                 if (connect)
1204                 {
1205                     if (getLog().isDebugEnabled())
1206                     {
1207                         getLog().debug(
1208                             "Connect still pending because there is no connection, connecting contexts are "
1209                                 + safeToString(getState().getCONNECTINGContexts()));
1210                     }
1211                     // are we already trying to connect...
1212                     connect =
1213                         getState().getCONNECTINGContexts().getCount(
1214                             remoteContextIdentity) == 0;
1215                     // if we are not trying to connect, increment a counter for
1216                     if (connect)
1217                     {
1218                         getState().getCONNECTINGContexts().adjustCount(
1219                             remoteContextIdentity, 1);
1220                     }
1221                     else
1222                     {
1223                         if (getLog().isDebugEnabled())
1224                         {
1225                             getLog().debug(
1226                                 "Already trying to connect to "
1227                                     + safeToString(event.getConnectionParameters()));
1228                         }
1229                     }
1230                 }
1231                 else
1232                 {
1233                     if (getLog().isDebugEnabled())
1234                     {
1235                         getLog().debug(
1236                             "Already connected to "
1237                                 + safeToString(event.getConnectionParameters())
1238                                 + ", current connections are: "
1239                                 + safeToString(getState().getConnectedContexts()));
1240                     }
1241                 }
1242             }
1243             getState().getDiscoveredContexts().put(remoteContextIdentity,
1244                 event.getConnectionParameters());
1245             if (getLog().isInfoEnabled())
1246             {
1247                 getLog().info(
1248                     "Discovered contexts are: "
1249                         + CollectionUtils.toFormattedString(getState().getDiscoveredContexts()));
1250             }
1251         } // synchronized block
1252 
1253         final IChannel channel =
1254             getState().getChannels().get(remoteContextIdentity);
1255         if (channel != null)
1256         {
1257             // if the connection parameters have changed, we need to re-broker
1258             // the connection and resubmit all remote subscriptions...
1259             validateChannelConnection(channel, event.getConnectionParameters());
1260         }
1261         if (connect)
1262         {
1263             try
1264             {
1265                 // execute the connect - when the connection is available, we'll
1266                 // get a ConnectionAvailableEvent
1267                 getState().getContext().getConnectionBroker().connect(
1268                     event.getConnectionParameters());
1269             }
1270             catch (RuntimeException e)
1271             {
1272                 synchronized (getState())
1273                 {
1274                     // if something goes wrong, reset the connecting count
1275                     getState().getCONNECTINGContexts().adjustCount(
1276                         remoteContextIdentity, -1);
1277                 }
1278                 throw e;
1279             }
1280         }
1281         else
1282         {
1283             if (getLog().isDebugEnabled())
1284             {
1285                 getLog().debug(
1286                     "Not connecting to (already connected or not required) "
1287                         + safeToString(event.getConnectionParameters()));
1288             }
1289         }
1290     }
1291 
1292     @Override
1293     public AsyncLog getLog()
1294     {
1295         return LOG;
1296     }
1297 }
1298 
1299 /**
1300  * Handler for {@link ConnectionAvailableEvent} events. Creates an
1301  * {@link IChannel} using the {@link IConnection} encapsulated in the event.
1302  * Also deals with the situation where two contexts have both connected to each
1303  * other; the context with the semantically greater identity will severe/close
1304  * the latest connection thus leaving only one connection between the two
1305  * contexts.
1306  * 
1307  * @author Ramon Servadei
1308  * 
1309  */
1310 final class ConnectionAvailableEventHandler extends
1311     DistributionEventHandler<ConnectionAvailableEvent>
1312 {
1313     private final static AsyncLog LOG =
1314         new AsyncLog(ConnectionAvailableEventHandler.class);
1315 
1316     ConnectionAvailableEventHandler(IDistributionState state)
1317     {
1318         super(state);
1319     }
1320 
1321     /**
1322      * Creates an {@link IChannel} using the {@link IConnection} encapsulated in
1323      * the event. Also deals with the situation where two contexts have both
1324      * connected to each other; the context with the semantically greater
1325      * identity will severe/close the latest connection thus leaving only one
1326      * connection between the two contexts.
1327      */
1328     @Override
1329     public void handle(ConnectionAvailableEvent event)
1330     {
1331         boolean destroyConnection = false;
1332         final IConnection connection = event.getConnection();
1333         synchronized (getState())
1334         {
1335             // check for a simultaneous connection
1336             final String remoteContextIdentity =
1337                 connection.getRemoteContextIdentity();
1338             if (getState().getConnectedContexts().getCount(
1339                 remoteContextIdentity) > 0)
1340             {
1341                 // the 'superior' context will close the duplicate connection
1342                 final int compareTo =
1343                     getState().getContext().getIdentity().compareTo(
1344                         connection.getRemoteContextIdentity());
1345                 if (compareTo > 0)
1346                 {
1347                     destroyConnection = true;
1348                 }
1349                 else
1350                 {
1351                     if (compareTo == 0)
1352                     {
1353                         // FT clustering requires duplicate context identities
1354                         // so we compare on the context hashcodes
1355                         if (getState().getContext().getContextHashCode() > connection.getRemoteContextHashCode())
1356                         {
1357                             destroyConnection = true;
1358                         }
1359                     }
1360                 }
1361             }
1362             /*
1363              * If the connection is outbound, check if the connection parameters
1364              * are still valid - i.e. the remote context has changed its
1365              * connection settings during the connection period - this can
1366              * happen with FT contexts switching
1367              */
1368             final IConnectionParameters connectionParameters =
1369                 getState().getDiscoveredContexts().get(
1370                     connection.getRemoteContextIdentity());
1371             if (connection.isOutbound() && connectionParameters != null
1372                 && !connectionParameters.isEqual(connection))
1373             {
1374                 destroyConnection = true;
1375                 if (getLog().isDebugEnabled())
1376                 {
1377                     getLog().debug(
1378                         "Destroying connection " + safeToString(connection)
1379                             + ", parameters have changed to "
1380                             + safeToString(connectionParameters));
1381                 }
1382             }
1383             if (!destroyConnection)
1384             {
1385                 // increment connected context count
1386                 getState().getConnectedContexts().adjustCount(
1387                     remoteContextIdentity, 1);
1388                 // we only store the channel when the ChannelReadyEvent occurs
1389                 getState().getChannelFactory().createChannel(connection,
1390                     getState().getContext()).start();
1391             }
1392             // decrement connecting contexts
1393             getState().getCONNECTINGContexts().adjustCount(
1394                 connection.getRemoteContextIdentity(), -1);
1395         }
1396         if (destroyConnection)
1397         {
1398             if (getLog().isTraceEnabled())
1399             {
1400                 getLog().trace(
1401                     "Destroying duplicate new connection "
1402                         + safeToString(connection)
1403                         + ", current connections are "
1404                         + CollectionUtils.toFormattedString(getState().getChannels()));
1405             }
1406             /*
1407              * destroy the connection, the other channel will be destroyed when
1408              * the connection is destroyed and will not have handled any
1409              * subscriptions as it will not have completed the start cycle.
1410              */
1411             connection.destroy();
1412         }
1413     }
1414 
1415     @Override
1416     public AsyncLog getLog()
1417     {
1418         return LOG;
1419     }
1420 }
1421 
1422 /**
1423  * Handler for {@link ConnectionDestroyedEvent} events. Destroys the
1424  * {@link IChannel} associated with the {@link IConnection}. If there are
1425  * pending subscriptions for the remote context then when the context is
1426  * re-discovered (when a {@link ContextDiscoveredEvent} is raised) the
1427  * subscriptions will be re-serviced.
1428  * 
1429  * @author Ramon Servadei
1430  * 
1431  */
1432 class ConnectionDestroyedEventHandler extends
1433     DistributionEventHandler<ConnectionDestroyedEvent>
1434 {
1435     private final static AsyncLog LOG =
1436         new AsyncLog(ConnectionDestroyedEventHandler.class);
1437 
1438     ConnectionDestroyedEventHandler(IDistributionState state)
1439     {
1440         super(state);
1441     }
1442 
1443     /**
1444      * Destroys the {@link IChannel} associated with the {@link IConnection}. If
1445      * there are pending subscriptions for the remote context then when the
1446      * context is re-discovered (when a {@link ContextDiscoveredEvent} is
1447      * raised) the subscriptions will be re-serviced.
1448      */
1449     @Override
1450     public void handle(ConnectionDestroyedEvent event)
1451     {
1452         // tell the connection discoverer to remove the context
1453         final IConnectionDiscoverer connectionDiscoverer =
1454             getState().getContext().getConnectionDiscoverer();
1455         if (connectionDiscoverer != null)
1456         {
1457             connectionDiscoverer.connectionDestroyed(event.getRemoteContextIdentity());
1458         }
1459         IChannel channel = null;
1460         final String remoteContextIdentity = event.getRemoteContextIdentity();
1461         synchronized (getState())
1462         {
1463             // copy-on-write
1464             final Map<String, IChannel> copy =
1465                 CollectionFactory.newMap(getState().getChannels());
1466             channel = copy.remove(remoteContextIdentity);
1467             getState().setChannels(copy);
1468             getState().getConnectedContexts().adjustCount(
1469                 remoteContextIdentity, -1);
1470         }
1471         if (channel != null)
1472         {
1473             if (getLog().isInfoEnabled())
1474             {
1475                 getLog().info("Destroying " + safeToString(channel));
1476             }
1477             // this will destroy any remote containers associated with the
1478             // remote context the channel connects to
1479             channel.destroy();
1480         }
1481         /*
1482          * Now we wait for the connection discoverer to 'rediscover' the
1483          * context. If there are pending subscriptions (some may have come in
1484          * between the unsubscribe that caused the connection to be destroyed)
1485          * then we need to re-broker the connection. This also handles the
1486          * situation where the connection parameters have changed.
1487          */
1488     }
1489 
1490     @Override
1491     public AsyncLog getLog()
1492     {
1493         return LOG;
1494     }
1495 }
1496 
1497 /**
1498  * Handler for {@link ChannelReadyEvent} events. Saves the ready
1499  * {@link IChannel} and issues any pending subscriptions for remote containers
1500  * for the remote context the channel connects to.
1501  * 
1502  * @author Ramon Servadei
1503  * 
1504  */
1505 class ChannelReadyEventHandler extends
1506     DistributionEventHandler<ChannelReadyEvent>
1507 {
1508 
1509     private final static AsyncLog LOG =
1510         new AsyncLog(ChannelReadyEventHandler.class);
1511 
1512     ChannelReadyEventHandler(IDistributionState state)
1513     {
1514         super(state);
1515     }
1516 
1517     /**
1518      * Saves the ready {@link IChannel} and issues any pending subscriptions for
1519      * remote containers for the remote context the channel connects to.
1520      */
1521     @Override
1522     public void handle(ChannelReadyEvent event)
1523     {
1524         final IChannel channel = event.getChannel();
1525 
1526         final List<DualValue<ISubscriptionParameters, IEventListener>> list;
1527         synchronized (getState())
1528         {
1529 
1530             final IConnectionParameters connectionParameters =
1531                 getState().getDiscoveredContexts().get(
1532                     channel.getRemoteContextIdentity());
1533 
1534             // if the connection parameters have changed, we need to re-broker
1535             // the connection and all remote subscriptions...
1536             if (!validateChannelConnection(channel, connectionParameters))
1537             {
1538                 return;
1539             }
1540 
1541             // copy-on-write
1542             final Map<String, IChannel> copy =
1543                 CollectionFactory.newMap(getState().getChannels());
1544             copy.put(channel.getRemoteContextIdentity(), channel);
1545             getState().setChannels(copy);
1546 
1547             // the list of values in the remote Subscriptions is not thread safe
1548             list =
1549                 CollectionFactory.newList(getState().getRemoteSubscriptions().get(
1550                     channel.getRemoteContextIdentity()));
1551         }
1552         if (list.size() > 0)
1553         {
1554             if (getLog().isInfoEnabled())
1555             {
1556                 getLog().info(
1557                     "Sending subscriptions to new channel "
1558                         + safeToString(channel) + COLON
1559                         + CollectionUtils.toFormattedString(list));
1560             }
1561             for (DualValue<ISubscriptionParameters, IEventListener> values : list)
1562             {
1563                 try
1564                 {
1565                     // spin-off onto an event so that its handled by the same
1566                     // thread - subscription can also occur from the subscribe
1567                     // method
1568                     getState().getContext().queueEvent(
1569                         new RemoteContainerSubscriptionEvent(
1570                             getState().getContext(),
1571                             channel.getRemoteContextIdentity(), values));
1572                 }
1573                 catch (Exception e)
1574                 {
1575                     logException(getLog(), values, e);
1576                 }
1577             }
1578         }
1579 
1580         // now log the channels
1581         StringBuilder sb = new StringBuilder();
1582         final IChannel[] connectedChannels =
1583             getState().getContext().getConnectedChannels();
1584         for (IChannel channel2 : connectedChannels)
1585         {
1586             sb.append(SystemUtils.LINE_SEPARATOR).append(SPACING_4_CHARS).append(
1587                 safeToString(channel2.toString()));
1588         }
1589         if (connectedChannels.length == 0)
1590         {
1591             sb.append(SystemUtils.LINE_SEPARATOR).append(SPACING_4_CHARS).append(
1592                 "<UNCONNECTED>");
1593         }
1594         if (getLog().isInfoEnabled())
1595         {
1596             getLog().info(
1597                 SystemUtils.LINE_SEPARATOR + getState().getContext()
1598                     + " connected to:" + sb);
1599         }
1600     }
1601 
1602     @Override
1603     public AsyncLog getLog()
1604     {
1605         return LOG;
1606     }
1607 }
1608 
1609 /**
1610  * Encapsulates all necessary information for a remote container subscription
1611  * into an event to queue onto the context event framework. The event will be
1612  * intercepted by the {@link DistributionManager}. This ensures subscriptions
1613  * are handled on the same thread.
1614  * 
1615  * @author Ramon Servadei
1616  */
1617 final class RemoteContainerSubscriptionEvent extends AbstractSystemEvent
1618 {
1619     /** The identity of the remote context for this subscription */
1620     final String remoteContextIdentity;
1621 
1622     /** The subscription parameters */
1623     final ISubscriptionParameters parameters;
1624 
1625     /** The listener the subscription should use */
1626     final IEventListener listener;
1627 
1628     /**
1629      * Standard constructor to encapsulate the parameters for a remote
1630      * subscription.
1631      * 
1632      * @param context
1633      *            the context for event operations
1634      * @param remoteContextIdentity
1635      *            the identity of the remote context for the subscription
1636      * @param values
1637      *            an {@link ISubscriptionParameters} and {@link IEventListener}
1638      *            instance, in that order
1639      */
1640     public RemoteContainerSubscriptionEvent(IEventManager context,
1641         String remoteContextIdentity,
1642         DualValue<ISubscriptionParameters, IEventListener> values)
1643     {
1644         super(context);
1645         this.remoteContextIdentity = remoteContextIdentity;
1646         this.parameters = values.getFirst();
1647         this.listener = values.getSecond();
1648     }
1649 
1650     /**
1651      * The identity of the remote context for this subscription
1652      * 
1653      * @return the remote context identity
1654      */
1655     public String getRemoteContextIdentity()
1656     {
1657         return remoteContextIdentity;
1658     }
1659 
1660     /**
1661      * Get the subscription parameters
1662      * 
1663      * @return the subscription parameters
1664      */
1665     public ISubscriptionParameters getParameters()
1666     {
1667         return parameters;
1668     }
1669 
1670     /**
1671      * Get the listener the subscription should use
1672      * 
1673      * @return the listener the subscription should use
1674      */
1675     public IEventListener getListener()
1676     {
1677         return listener;
1678     }
1679 
1680     protected String getAdditionalToString()
1681     {
1682         return "subcriptionParameters=" + getParameters();
1683     }
1684 }
1685 
1686 /**
1687  * Handler for {@link RemoteContainerSubscriptionEvent} events. Locates the
1688  * appropriate {@link IChannel} and issues the remote container subscription
1689  * encapsulated in the event.
1690  * 
1691  * @author Ramon Servadei
1692  */
1693 final class RemoteContainerSubscriptionEventHandler extends
1694     DistributionEventHandler<RemoteContainerSubscriptionEvent>
1695 {
1696     private final static AsyncLog LOG =
1697         new AsyncLog(RemoteContainerSubscriptionEventHandler.class);
1698 
1699     RemoteContainerSubscriptionEventHandler(IDistributionState state)
1700     {
1701         super(state);
1702     }
1703 
1704     /**
1705      * Locates the appropriate {@link IChannel} and issues the remote container
1706      * subscription encapsulated in the event.
1707      */
1708     @Override
1709     public void handle(RemoteContainerSubscriptionEvent event)
1710     {
1711         final IChannel channel =
1712             getState().getChannels().get(event.getRemoteContextIdentity());
1713         if (channel != null)
1714         {
1715             // subscribe is idempotent
1716             channel.subscribe(event.getParameters());
1717             // add the listener to the subscription
1718             channel.addListener(event.getParameters(), event.getListener());
1719         }
1720     }
1721 
1722     @Override
1723     public AsyncLog getLog()
1724     {
1725         return LOG;
1726     }
1727 }
1728 
1729 /**
1730  * Handler for {@link EventSourceNotObservedEvent} events. Removes remote
1731  * {@link IContainer} instances from the context. This cleans up the process
1732  * space when a remote {@link IContainer} has no more listeners (it will no
1733  * longer be subscribed for). The {@link EventSourceNotObservedEvent} is
1734  * actually raised by the {@link IContainer} itself when it has no more
1735  * {@link IEventListener} instances attached to it.
1736  * 
1737  * @author Ramon Servadei
1738  */
1739 final class RemoteEventSourceNotObservedEventHandler extends
1740     DistributionEventHandler<EventSourceNotObservedEvent>
1741 {
1742     private final static AsyncLog LOG =
1743         new AsyncLog(RemoteEventSourceNotObservedEventHandler.class);
1744 
1745     RemoteEventSourceNotObservedEventHandler(IDistributionState state)
1746     {
1747         super(state);
1748     }
1749 
1750     /**
1751      * Removes remote {@link IContainer} instances from the context.
1752      */
1753     @Override
1754     public void handle(EventSourceNotObservedEvent event)
1755     {
1756         final IFrameworkContext context = getState().getContext();
1757         if (context.containsRemoteContainer(event.getNativeContextIdentity(),
1758             event.getIdentity(), event.getType(), event.getDomain()))
1759         {
1760             final IContainer remoteContainer =
1761                 context.getRemoteContainer(event.getNativeContextIdentity(),
1762                     event.getIdentity(), event.getType(), event.getDomain());
1763             // only remove the container if it is a remote one
1764             if (!remoteContainer.isLocal())
1765             {
1766                 if (getLog().isDebugEnabled())
1767                 {
1768                     getLog().debug(
1769                         "Removing " + remoteContainer.toIdentityString());
1770                 }
1771                 context.removeContainer(remoteContainer);
1772             }
1773         }
1774     }
1775 
1776     @Override
1777     protected AsyncLog getLog()
1778     {
1779         return LOG;
1780     }
1781 }