View Javadoc

1   /*
2      Copyright 2008 Ramon Servadei
3   
4      Licensed under the Apache License, Version 2.0 (the "License");
5      you may not use this file except in compliance with the License.
6      You may obtain a copy of the License at
7   
8          http://www.apache.org/licenses/LICENSE-2.0
9   
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15   */
16  package fulmine.context;
17  
18  import static fulmine.util.Utils.CLOSE_BRACE;
19  import static fulmine.util.Utils.COLON;
20  import static fulmine.util.Utils.OPEN_BRACE;
21  import static fulmine.util.Utils.SPACE;
22  import static fulmine.util.Utils.logException;
23  import static fulmine.util.Utils.nullCheck;
24  
25  import java.net.InetAddress;
26  import java.net.UnknownHostException;
27  import java.util.Collection;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Set;
31  import java.util.TimerTask;
32  
33  import org.apache.commons.lang.SystemUtils;
34  
35  import fulmine.AbstractLifeCycle;
36  import fulmine.Domain;
37  import fulmine.IDomain;
38  import fulmine.ILifeCycle;
39  import fulmine.IType;
40  import fulmine.Type;
41  import fulmine.distribution.IDistributionManager;
42  import fulmine.distribution.IHeartbeatMonitor;
43  import fulmine.distribution.channel.IChannel;
44  import fulmine.distribution.connection.IConnectionBroker;
45  import fulmine.distribution.connection.IConnectionDiscoverer;
46  import fulmine.distribution.connection.IConnectionParameters;
47  import fulmine.distribution.events.ContextDiscoveredEvent;
48  import fulmine.event.IEvent;
49  import fulmine.event.listener.AbstractEventHandler;
50  import fulmine.event.listener.IEventListener;
51  import fulmine.event.listener.MultiSystemEventListener;
52  import fulmine.event.subscription.ISubscriptionListener;
53  import fulmine.event.system.ISystemEvent;
54  import fulmine.event.system.ISystemEventListener;
55  import fulmine.event.system.ISystemEventSource;
56  import fulmine.model.container.IContainer;
57  import fulmine.model.container.IContainerFactory;
58  import fulmine.model.container.events.ContainerStateChangeEvent;
59  import fulmine.model.container.impl.Record;
60  import fulmine.model.field.BooleanField;
61  import fulmine.model.field.IField;
62  import fulmine.model.field.IntegerField;
63  import fulmine.model.field.LongField;
64  import fulmine.protocol.specification.IFrameReader;
65  import fulmine.protocol.specification.IFrameWriter;
66  import fulmine.rpc.IRpcDefinition;
67  import fulmine.rpc.IRpcHandler;
68  import fulmine.rpc.IRpcMarker;
69  import fulmine.rpc.IRpcPublicationListener;
70  import fulmine.rpc.IRpcResult;
71  import fulmine.rpc.IRpcResultHandler;
72  import fulmine.util.collection.CollectionFactory;
73  import fulmine.util.collection.CollectionUtils;
74  import fulmine.util.log.AsyncLog;
75  
76  /**
77   * A context that exists within a cluster of peer contexts. These all exist to
78   * provide fault tolerance for a single logical application context identity. An
79   * FT cluster is identified by a 'cluster identity'. Each FT context within the
80   * cluster has the same logical application context identity but a unique FT
81   * context identity. The FT context identity always starts with the cluster
82   * identity. Only one FT context within the cluster will be 'active'. This
83   * active context is the context servicing all remote subscriptions for the
84   * logical application context. The other contexts will be in a standby mode.
85   * <p>
86   * The FT context actually hosts a second context that is the application
87   * context; both the FT context and application contexts are separate
88   * {@link IFrameworkContext} instances. The FT context has a separate network
89   * for its FT activities. When the FT context determines it should be active, it
90   * activates the application context; the application context is always started
91   * at the same time as the FT context but has its network set to 'listening
92   * mode' ({@link INetwork#setListeningOnlyMode(boolean)}). When the application
93   * context is activated, its network {@link IConnectionDiscoverer} is also
94   * activated ({@link IConnectionDiscoverer#enablePulsing()}); this will then
95   * cause the application context to be discovered by other application contexts.
96   * Should the FT context go into standby mode after this, the application
97   * context will have its network {@link IConnectionDiscoverer} disabled (
98   * {@link IConnectionDiscoverer#disablePulsing()}).
99   * <h2>Fault tolerance algorithm</h2>
100  * Each context is constructed with a fault tolerant number. This indicates the
101  * ordering, relative to the other contexts in the FT cluster, for becoming
102  * active. The context with the lowest FT number in the cluster is the active
103  * context, unless there is already an active context in the cluster. This
104  * numbering also provides the mechanism for all contexts to organise themselves
105  * on startup so that there is only 1 active context.
106  * <p>
107  * A context has 4 states;
108  * <ol>
109  * <li>Startup - The initial state of the context.
110  * <li>Provisional active - the period when a context marks itself as
111  * 'provisionally active' and awaits any conflicts. This phase is used to
112  * marshal conflicts between contexts and ensure only 1 progresses to the active
113  * state.
114  * <li>Active - the period when the context becomes the active context in the
115  * cluster. This phase is when the context can start servicing remote
116  * subscriptions for the cluster as a logical application context.
117  * <li>Standby - the context is in a waiting state, prepared for if the current
118  * active context leaves the cluster. The standby state is a 'warm' state.
119  * </ol>
120  * <h3>Startup</h3>
121  * On startup, an FT context uses its FT network {@link IConnectionDiscoverer}
122  * to find any other FT contexts within the same cluster. When an FT context
123  * finds another within the cluster, each subscribes for a 'context info' record
124  * from the other. This record contains the following attributes:
125  * <ul>
126  * <li>FT canonical ID (string)
127  * <li>FT number (int)
128  * <li>start time (long)
129  * <li>provisionally active (boolean)
130  * <li>active (boolean)
131  * </ul>
132  * If the context finds any other active context, it switches to standby.
133  * Otherwise, it updates its context info record to indicate it is provisionally
134  * active and waits for a period of time to allow discovery of other contexts.
135  * If no contexts are found it switches to the provisional active state.
136  * <h3>Provisional active</h3>
137  * In this state it checks that there are no other active FT contexts and if
138  * none are found, it switches to the active state. Should there be more than
139  * one provisionally active FT context discovered in this state, a conflict
140  * resolution protocol ensues;
141  * <ol>
142  * <li>The FT context with the lesser context number assumes priority, the
143  * greater numbered context should yield to the lesser one.
144  * <li>Should both contexts have the same number, then the context with the
145  * greater FT canonical ID (as per string comparison) assumes active state, the
146  * other will yield and go into stand-by mode.
147  * </ol>
148  * <h3>Active</h3>
149  * Once an FT context has confirmed it is the active context for the cluster, it
150  * activates its hosted application context's {@link IConnectionDiscoverer} that
151  * will broadcast the identity of the logical application context to other
152  * contexts (outside the FT cluster).
153  * <h3>Loss of active FT context</h3>
154  * If the active FT context is lost, all remaining FT contexts compare their
155  * number with each other. The lowest becomes the new active FT context and
156  * enters the provisional active state and following that (assuming no conflicts
157  * arise) the active state.
158  * <p>
159  * Application code can supply an {@link IFTContextStateListener} to react to
160  * whether the FT context is active or stand-by.
161  * <h2>FT standby scope</h2>
162  * An FT context only has a standby scope of 'warm'. In this state, the FT
163  * context is running but does not replicate the state of the current active FT
164  * context. If a standby FT context becomes active it will need to perform all
165  * necessary work to create the expected records to service any existing remote
166  * subscriptions (the subscriptions will be received when the context broadcasts
167  * its logical context identity).
168  * <p>
169  * From a 'client' perspective, whenever an FT context takes over as being the
170  * active context in a cluster, the client will experience a network interrupt.
171  * When the new FT context is re-discovered, the client will re-connect and
172  * re-subscribe for the containers.
173  * 
174  * @author Ramon Servadei
175  */
176 public class FTContext extends AbstractLifeCycle implements IFrameworkContext
177 {
178     final static AsyncLog LOG = new AsyncLog(FTContext.class);
179 
180     /** Delimiter used within the FT context identity */
181     private static final String DELIMITER = COLON;
182 
183     /**
184      * The delay to wait between start and provisional activation. This provides
185      * time for other FT context instances to be discovered prior to entering
186      * the provisional activation state.
187      */
188     private final long activationDelay;
189 
190     /** The prefix for all FT context identities */
191     static final String FT_ID_PREFIX = "FTContext:";
192 
193     /**
194      * The field name in a context info record for the active flag; the flag
195      * that indicates if the FT context is the active FT context in the cluster.
196      * This is set during the confirmed phase.
197      */
198     static final String CONFIRMED_ACTIVE_FIELD = "confirmedActive";
199 
200     /**
201      * The field name in a context info record for the provisional active flag.
202      * This is set during the activation phase.
203      */
204     static final String PROVISIONAL_ACTIVE_FIELD = "provisionalActive";
205 
206     /**
207      * The field name in a context info record for the FT number within the FT
208      * cluster.
209      */
210     static final String FT_NUMBER_FIELD = "FTNumber";
211 
212     /**
213      * The field name in the context info record for when the FT context was
214      * started.
215      */
216     static final String START_TIME_FIELD = "startTime";
217 
218     /** The number of heartbeats that the FTContext activation period should be. */
219     public static final long HEARTBEAT_MULTIPLIER = 4;
220 
221     /**
222      * Responds to other discovered FT contexts within the cluster and registers
223      * a listener for the context info record of the discovered peer FT context.
224      * 
225      * @author Ramon Servadei
226      */
227     private final class ContextDiscoveredEventHandler extends
228         AbstractEventHandler<ContextDiscoveredEvent> implements
229         ISystemEventListener
230     {
231         @Override
232         protected AsyncLog getLog()
233         {
234             return LOG;
235         }
236 
237         @Override
238         public void handle(ContextDiscoveredEvent event)
239         {
240             IConnectionParameters params = event.getConnectionParameters();
241             if (params.getRemoteContextIdentity().startsWith(
242                 getClusterIdentity()))
243             {
244                 // FT context peer located, subscribe for its info record -
245                 // the record name is the same as the FT context ID
246                 FTContext.this.getFtWorkerContext().subscribe(
247                     params.getRemoteContextIdentity(),
248                     params.getRemoteContextIdentity(), Type.SYSTEM,
249                     Domain.FRAMEWORK, FTContext.this.getFtLogicHandler());
250             }
251         }
252     }
253 
254     /**
255      * Handles changes to the context info record of other peer FT contexts
256      * within the cluster.
257      * 
258      * @author Ramon Servadei
259      */
260     private final class ContextInfoHandler extends AbstractEventHandler<Record>
261     {
262         @Override
263         protected AsyncLog getLog()
264         {
265             return LOG;
266         }
267 
268         @Override
269         public void handle(Record event)
270         {
271             // should be a context info record
272             if (event.getIdentity().startsWith(getClusterIdentity()))
273             {
274                 synchronized (FTContext.this.contextInfos)
275                 {
276                     /*
277                      * The event is a CLONE so remove its 'equal' and add this
278                      * cloned instance - a HashSet will not replace an equal
279                      * object.
280                      */
281                     FTContext.this.contextInfos.remove(event);
282                     FTContext.this.contextInfos.add(event);
283                 }
284                 FTContext.this.getStateManager().revalidate();
285             }
286         }
287     }
288 
289     /**
290      * Handles changes to the state of the context info record of other FT
291      * contexts. This state change can indicate the loss of the FT context.
292      * 
293      * @author Ramon Servadei
294      */
295     private final class ContextInfoStateChangeHandler extends
296         AbstractEventHandler<ContainerStateChangeEvent>
297     {
298 
299         @Override
300         protected AsyncLog getLog()
301         {
302             return LOG;
303         }
304 
305         @Override
306         public void handle(ContainerStateChangeEvent event)
307         {
308 
309             final IContainer container = event.getContainer();
310             handleContextInfo(container);
311         }
312 
313         private void handleContextInfo(final IContainer container)
314         {
315             boolean run = false;
316             // need to check if this is one of the other FT contexts
317             synchronized (FTContext.this.contextInfos)
318             {
319                 if (FTContext.this.contextInfos.contains(container))
320                 {
321                     // due to cloning, remove the 'equal' and add the new
322                     // instance
323                     FTContext.this.contextInfos.remove(container);
324                     FTContext.this.contextInfos.add(container);
325                     run = true;
326                 }
327             }
328             if (run)
329             {
330                 FTContext.this.getStateManager().revalidate();
331             }
332         }
333 
334     }
335 
336     /**
337      * Interface for a state of the {@link FTContext}. This provides the logic
338      * for validating the current state against existing conditions and for
339      * transitioning to the next state when necessary.
340      * 
341      * @author Ramon Servadei
342      */
343     private interface StateLogic extends Runnable
344     {
345         /**
346          * Verify that all conditions are valid for the state. If they are not,
347          * switch to the next valid state.
348          */
349         void validate();
350     }
351 
352     /**
353      * Base class for one of the states of the context. A state is responsible
354      * for invoking the {@link FTContext#setFTContextState(String, boolean)}
355      * method that sets the state of the context info record.
356      * <p>
357      * This is <b>always</b> executed by the {@link FTContext#execute(Runnable)}
358      * method so is inherently thread-safe.
359      * 
360      * @author Ramon Servadei
361      */
362     private abstract class AbstractStateLogic implements StateLogic
363     {
364         public final void run()
365         {
366             if (FTContext.this.activationPeriodActive)
367             {
368                 if (LOG.isDebugEnabled())
369                 {
370                     LOG.debug(FTContext.this
371                         + " in activation period, no state validation will occur.");
372                 }
373                 return;
374             }
375             synchronized (FTContext.this.contextInfos)
376             {
377                 if (LOG.isDebugEnabled())
378                 {
379                     LOG.debug(FTContext.this + " start");
380                 }
381                 List<IContainer> confirmedActiveContexts =
382                     CollectionFactory.newList();
383                 List<IContainer> provisionalActiveContexts =
384                     CollectionFactory.newList();
385                 getContexts(confirmedActiveContexts, provisionalActiveContexts);
386                 doValidate(confirmedActiveContexts, provisionalActiveContexts);
387                 if (LOG.isDebugEnabled())
388                 {
389                     LOG.debug(FTContext.this + " finish");
390                 }
391             }
392         }
393 
394         public final void validate()
395         {
396             // queue a command to run the current state
397             FTContext.this.execute(this);
398         }
399 
400         /**
401          * Find the provisionally active and confirmed active contexts. Add them
402          * to the relevant list argument.
403          * 
404          * @param confirmedActiveContexts
405          *            the list of confirmed active contexts to populate.
406          * @param provisionalActiveContexts
407          *            the list of provisionally active contexts to populate.
408          */
409         private void getContexts(List<IContainer> confirmedActiveContexts,
410             List<IContainer> provisionalActiveContexts)
411         {
412             synchronized (FTContext.this.contextInfos)
413             {
414                 FTContext.this.logFTContexts();
415                 // find any other contexts that are active
416                 for (IContainer contextInfo : FTContext.this.contextInfos)
417                 {
418                     if (contextInfo.getDataState() == IContainer.DataState.LIVE)
419                     {
420                         if (contextInfo.getBooleanField(
421                             FTContext.CONFIRMED_ACTIVE_FIELD).get())
422                         {
423                             confirmedActiveContexts.add(contextInfo);
424                         }
425                         else
426                         {
427                             if (contextInfo.getBooleanField(
428                                 FTContext.PROVISIONAL_ACTIVE_FIELD).get())
429                             {
430                                 provisionalActiveContexts.add(contextInfo);
431                             }
432                         }
433                     }
434                 }
435             }
436             logActiveContexts(confirmedActiveContexts, "confirmed");
437             logActiveContexts(provisionalActiveContexts, "provisional");
438         }
439 
440         /**
441          * Helper that compares this FTContext against those in the collection.
442          * 
443          * @param otherContexts
444          *            the contexts to compare
445          * @return <code>true</code> if this FTContext takes priority over all
446          *         in the collection, <code>false</code> otherwise
447          */
448         boolean checkForPriority(List<IContainer> otherContexts)
449         {
450             for (IContainer record : otherContexts)
451             {
452                 final int otherFTNumber =
453                     record.getIntegerField(FT_NUMBER_FIELD).get();
454                 if (otherFTNumber < FTContext.this.ftNumber)
455                 {
456                     return false;
457                 }
458                 // for duplicate cluster numbers, the one with the
459                 // greater cluster context ID should be active
460                 if (otherFTNumber == FTContext.this.ftNumber)
461                 {
462                     if (record.getIdentity().compareTo(
463                         FTContext.this.getClusterIdentity()) > 0)
464                     {
465                         return false;
466                     }
467                 }
468             }
469             return true;
470         }
471 
472         /**
473          * Helper method to log active contexts
474          * 
475          * @param contexts
476          * @param type
477          */
478         void logActiveContexts(List<IContainer> contexts, String type)
479         {
480             if (LOG.isDebugEnabled())
481             {
482                 if (contexts.size() > 0)
483                 {
484                     LOG.debug(FTContext.this + " found other " + type
485                         + " active FTContexts: "
486                         + CollectionUtils.toFormattedString(contexts));
487                 }
488                 else
489                 {
490                     LOG.debug(FTContext.this + " found no other " + type
491                         + " active FTContexts.");
492                 }
493             }
494         }
495 
496         @Override
497         public String toString()
498         {
499             return getClass().getSimpleName();
500         }
501 
502         /**
503          * Verify that all conditions are valid for the state. If they are not,
504          * switch to the next valid state.
505          * <p>
506          * <b>This is executed in a thread-safe context.</b>
507          * 
508          * @param confirmedActiveContexts
509          *            the confirmed active FTContext instances
510          * @param provisionalActiveContexts
511          *            the provisional active FTContext instances
512          */
513         abstract void doValidate(List<IContainer> confirmedActiveContexts,
514             List<IContainer> provisionalActiveContexts);
515     }
516 
517     /**
518      * The standby state of the context. May transition to the
519      * {@link ProvisionalActive} state.
520      * 
521      * @author Ramon Servadei
522      */
523     private final class Standby extends AbstractStateLogic
524     {
525         Standby()
526         {
527             FTContext.this.getStateListener().willStandby();
528             FTContext.this.setFTContextState(FTContext.CONFIRMED_ACTIVE_FIELD,
529                 false);
530             logState();
531             logFTContexts();
532             FTContext.this.standby();
533             FTContext.this.getStateListener().didStandby();
534         }
535 
536         /**
537          * Validate that the state should remain in standby. If not, transitions
538          * to {@link ProvisionalActive}.
539          */
540         void doValidate(List<IContainer> confirmedActiveContexts,
541             List<IContainer> provisionalActiveContexts)
542         {
543             boolean shouldBeProvisionallyActive = true;
544             // if there are confirmed active contexts, then abort
545             if (confirmedActiveContexts.size() > 0)
546             {
547                 // need to continue to set the active context
548                 shouldBeProvisionallyActive = false;
549             }
550             else
551             {
552                 // check against provisional
553                 shouldBeProvisionallyActive =
554                     checkForPriority(provisionalActiveContexts);
555             }
556             FTContext.this.logStateChange(shouldBeProvisionallyActive,
557                 "provisional");
558             if (shouldBeProvisionallyActive)
559             {
560                 FTContext.this.getStateManager().setState(
561                     StateEnum.ProvisionalActive, FTContext.this);
562             }
563             else
564             {
565                 if (LOG.isDebugEnabled())
566                 {
567                     LOG.debug(FTContext.this + " remaining in state " + this);
568                 }
569                 // if there is only 1 known active
570                 if (confirmedActiveContexts.size() == 1)
571                 {
572                     // if there has been a change in active FT context
573                     if (!confirmedActiveContexts.get(0).getIdentity().equals(
574                         FTContext.this.activeFTContextIdentity))
575                     {
576                         if (LOG.isInfoEnabled())
577                         {
578                             LOG.info(FTContext.this
579                                 + ": the active FTContext has changed from "
580                                 + FTContext.this.activeFTContextIdentity
581                                 + " to "
582                                 + confirmedActiveContexts.get(0).getIdentity());
583                         }
584                         // save the current active FT context identity
585                         FTContext.this.activeFTContextIdentity =
586                             confirmedActiveContexts.get(0).getIdentity();
587                     }
588                 }
589                 else
590                 {
591                     if (LOG.isInfoEnabled())
592                     {
593                         LOG.info(FTContext.this
594                             + " found "
595                             + (confirmedActiveContexts.size() == 0 ? "zero"
596                                 : "multiple")
597                             + " confirmed active FTContexts,"
598                             + " setting current 'known' active"
599                             + " FTContext to null."
600                             + CollectionUtils.toFormattedString(confirmedActiveContexts));
601                     }
602                     // we may not know the true active FTContext
603                     FTContext.this.activeFTContextIdentity = null;
604                 }
605             }
606         }
607     }
608 
609     /**
610      * Encapsulates the provisional active state of the FT context. Transitions
611      * to the {@link Confirmed} state or the {@link Standby} state.
612      * <p>
613      * This state resolves conflicts when there are multiple provisional active
614      * FT contexts within the cluster. If this FT context is the only
615      * provisional active context in the cluster, transitions to the the
616      * {@link Confirmed} state
617      * 
618      * @author Ramon Servadei
619      */
620     private final class ProvisionalActive extends AbstractStateLogic
621     {
622 
623         ProvisionalActive()
624         {
625             FTContext.this.setFTContextState(
626                 FTContext.PROVISIONAL_ACTIVE_FIELD, true);
627         }
628 
629         @Override
630         void doValidate(List<IContainer> confirmedActiveContexts,
631             List<IContainer> provisionalActiveContexts)
632         {
633             boolean shouldBeConfirmedActive = true;
634             // if there are confirmed active contexts, then abort
635             if (confirmedActiveContexts.size() > 0)
636             {
637                 if (LOG.isInfoEnabled())
638                 {
639                     LOG.info(FTContext.this
640                         + " found other confirmed active contexts,"
641                         + " switching to standby state.");
642                 }
643                 FTContext.this.getStateManager().setState(StateEnum.Standby,
644                     FTContext.this);
645             }
646             else
647             {
648                 // check against provisional
649                 shouldBeConfirmedActive =
650                     checkForPriority(provisionalActiveContexts);
651             }
652             FTContext.this.logStateChange(shouldBeConfirmedActive, "confirmed");
653             if (shouldBeConfirmedActive)
654             {
655                 if (confirmedActiveContexts.size() == 0)
656                 {
657                     // no other active contexts
658                     FTContext.this.getStateManager().setState(
659                         StateEnum.Confirmed, FTContext.this);
660                 }
661                 else
662                 {
663                     // there are multiple confirmed active contexts...
664                     if (LOG.isInfoEnabled())
665                     {
666                         LOG.info(this
667                             + " waiting for other confirmed active"
668                             + " FTContexts to become inactive: "
669                             + CollectionUtils.toFormattedString(confirmedActiveContexts));
670                     }
671                 }
672             }
673             else
674             {
675                 FTContext.this.getStateManager().setState(StateEnum.Standby,
676                     FTContext.this);
677             }
678         }
679     }
680 
681     /**
682      * The startup state for the context. Transitions to the
683      * {@link ProvisionalActive} or {@link Standby} state.
684      * <p>
685      * This will either leave the context in stand-by mode if there is already a
686      * confirmed active context in the cluster or transition to the provisional
687      * active state.
688      * 
689      * @author Ramon Servadei
690      */
691     private final class Startup extends AbstractStateLogic
692     {
693         Startup()
694         {
695             FTContext.this.setFTContextState(PROVISIONAL_ACTIVE_FIELD, false);
696         }
697 
698         @Override
699         void doValidate(List<IContainer> confirmedActiveContexts,
700             List<IContainer> provisionalActiveContexts)
701         {
702             FTContext.this.activeFTContextIdentity = null;
703             boolean shouldBeProvisionalActive = true;
704             IContainer currentActiveContext = null;
705             FTContext.this.logFTContexts();
706             for (IContainer contextInfo : FTContext.this.contextInfos)
707             {
708                 if (contextInfo.getDataState() == IContainer.DataState.LIVE)
709                 {
710                     if (contextInfo.getBooleanField(
711                         FTContext.CONFIRMED_ACTIVE_FIELD).get())
712                     {
713                         currentActiveContext = contextInfo;
714                     }
715                     if (contextInfo.getIntegerField(FTContext.FT_NUMBER_FIELD).get() < FTContext.this.ftNumber)
716                     {
717                         shouldBeProvisionalActive = false;
718                     }
719                 }
720             }
721             FTContext.this.logStateChange(shouldBeProvisionalActive,
722                 "provisional");
723             if (currentActiveContext == null && shouldBeProvisionalActive)
724             {
725                 if (!FTContext.this.isFTContextActive())
726                 {
727                     // mark provisional active, let others know...
728                     // technically we're not in the ProvisionalState until the
729                     // activation period expires
730                     FTContext.this.setFTContextState(
731                         FTContext.PROVISIONAL_ACTIVE_FIELD, true);
732                     FTContext.this.activationPeriodActive = true;
733                     // create a new thread because this blocks for the
734                     // activation delay
735                     new Thread(new Runnable()
736                     {
737                         public void run()
738                         {
739                             synchronized (this)
740                             {
741                                 if (LOG.isInfoEnabled())
742                                 {
743                                     LOG.info(FTContext.this + " waiting for "
744                                         + FTContext.this.activationDelay
745                                         + "ms for other FTContexts"
746                                         + " to be discovered.");
747                                 }
748                                 try
749                                 {
750                                     this.wait(FTContext.this.activationDelay);
751                                 }
752                                 catch (InterruptedException e)
753                                 {
754                                     logException(LOG, getFtContextIdentity(), e);
755                                 }
756                             }
757                             FTContext.this.activationPeriodActive = false;
758                             // transition to the provisional active state
759                             FTContext.this.getStateManager().setState(
760                                 StateEnum.ProvisionalActive, FTContext.this);
761                         }
762 
763                     }, "Provisional activation thread for " + FTContext.this).start();
764                 }
765             }
766             else
767             {
768                 // transition to the standby state
769                 FTContext.this.getStateManager().setState(StateEnum.Standby,
770                     FTContext.this);
771             }
772         }
773     }
774 
775     /**
776      * Starts the standard context discoverer and connection broker and notifies
777      * the {@link IFTContextStateListener} that this FT context is now active.
778      * This is idempotent; if the FT context is already active, validating this
779      * multiple times has no effect.
780      * 
781      * @author Ramon Servadei
782      */
783     private final class Confirmed extends AbstractStateLogic
784     {
785         Confirmed()
786         {
787             FTContext.this.getStateListener().willActivate();
788             setFTContextState(CONFIRMED_ACTIVE_FIELD, true);
789             logState();
790             logFTContexts();
791             FTContext.this.activeFTContextIdentity =
792                 FTContext.this.getFtContextIdentity();
793             FTContext.this.activate();
794             FTContext.this.getStateListener().didActivate();
795         }
796 
797         @Override
798         void doValidate(List<IContainer> confirmedActiveContexts,
799             List<IContainer> provisionalActiveContexts)
800         {
801             // the confirmedActiveContexts will not have this FT context so
802             // should be 0 size but if it is not, check if this FT context
803             // does have priority
804             if (!checkForPriority(confirmedActiveContexts))
805             {
806                 if (LOG.isInfoEnabled())
807                 {
808                     LOG.info(this
809                         + " found other confirmed active"
810                         + " FTContexts so restarting activation sequence: "
811                         + CollectionUtils.toFormattedString(confirmedActiveContexts));
812                 }
813                 // transition to the startup state
814                 FTContext.this.getStateManager().setState(StateEnum.Startup,
815                     FTContext.this);
816             }
817             else
818             {
819                 if (LOG.isInfoEnabled())
820                 {
821                     LOG.info(FTContext.this
822                         + " should be the confirmed active versus others: "
823                         + CollectionUtils.toFormattedString(confirmedActiveContexts));
824                 }
825             }
826         }
827     }
828 
829     /**
830      * Manages the state transitions for the FT context. This class is thread
831      * safe.
832      * 
833      * @author Ramon Servadei
834      */
835     private final class StateManager
836     {
837         /** The current state */
838         StateEnum currentState;
839 
840         /** The state logic for the current state */
841         private FTContext.StateLogic stateLogic;
842 
843         /**
844          * Set the state for the state manager.
845          * <p>
846          * <b>This is thread safe.</b>
847          * 
848          * @param newState
849          *            the new state for the manager
850          * @param context
851          *            the FT context
852          */
853         void setState(StateEnum newState, FTContext context)
854         {
855             boolean stateChanged = false;
856             synchronized (FTContext.this.contextInfos)
857             {
858                 if (newState != this.currentState)
859                 {
860                     if (FTContext.LOG.isDebugEnabled())
861                     {
862                         FTContext.LOG.debug(context + " switching from "
863                             + this.currentState + " to " + newState);
864                     }
865                     this.currentState = newState;
866                     stateChanged = true;
867                 }
868             }
869             if (stateChanged)
870             {
871                 // create the logic class outside of the synchronized block
872                 final FTContext.StateLogic createdState =
873                     this.currentState.create(context);
874                 synchronized (FTContext.this.contextInfos)
875                 {
876                     this.stateLogic = createdState;
877                     revalidate();
878                 }
879             }
880         }
881 
882         /**
883          * Re-validate the current state
884          * <p>
885          * <b>This is thread safe.</b>
886          */
887         void revalidate()
888         {
889             synchronized (FTContext.this.contextInfos)
890             {
891                 // only if we are not in the activation period...
892                 if (this.stateLogic != null)
893                 {
894                     this.stateLogic.validate();
895                 }
896             }
897         }
898     }
899 
900     /**
901      * Enumeration for {@link StateEnum} instances
902      * 
903      * @author Ramon Servadei
904      */
905     private enum StateEnum
906     {
907         Startup
908         {
909             public FTContext.StateLogic create(FTContext context)
910             {
911                 return context.new Startup();
912             }
913         },
914         Standby
915         {
916             public FTContext.StateLogic create(FTContext context)
917             {
918                 return context.new Standby();
919             }
920         },
921         ProvisionalActive
922         {
923             public FTContext.StateLogic create(FTContext context)
924             {
925                 return context.new ProvisionalActive();
926             }
927         },
928         Confirmed
929         {
930             public FTContext.StateLogic create(FTContext context)
931             {
932                 return context.new Confirmed();
933             }
934         };
935         /** Create the {@link StateLogic} that validates the state */
936         public abstract FTContext.StateLogic create(FTContext context);
937     }
938 
939     /** The application context that this FT context hosts */
940     private final IFrameworkContext appContext;
941 
942     /**
943      * The context that manages the FT behaviour. This connects to the FT
944      * network and negotiates with any other FT contexts on that network to
945      * determine if this is the active one. If it is the active one, it will
946      * activate its application context.
947      */
948     private final IFrameworkContext ftWorkerContext;
949 
950     /** The listener to react to state changes of the clustered context */
951     private final IFTContextStateListener stateListener;
952 
953     /**
954      * A flag to indicate if the activation period is in effect. During this
955      * time, no state processing occurs.
956      */
957     private volatile boolean activationPeriodActive;
958 
959     /** The state manager that holds the current state and its logic */
960     private final StateManager stateManager;
961 
962     /**
963      * The listener to handle events that influence whether this FT context is
964      * active or standby in the cluster.
965      */
966     private final MultiSystemEventListener ftLogicHandler;
967 
968     /**
969      * The unique identity of this FT context within the cluster
970      * "{clusterIdentity}:{FT number}:{host IP}:{object identityHash}:{time of
971      * creation in nanoseconds}"
972      */
973     private final String ftContextIdentity;
974 
975     /**
976      * The FT number of this FT context within the cluster; determines the
977      * priority for becoming active relative to the other peer contexts within
978      * the cluster.
979      */
980     private final int ftNumber;
981 
982     /**
983      * The context info records of all other FT contexts within the cluster.
984      * Also doubles up as the lock object guarding state transitions.
985      */
986     private final Set<IContainer> contextInfos;
987 
988     /**
989      * The context info for this FT context. The context info contains the
990      * following
991      * <ul>
992      * <li>FT canonical ID (string) <li>FT number (int) <li>start time (long)
993      * <li>provisionally active (boolean) <li>confirmed active (boolean)
994      * </ul>
995      */
996     Record contextInfo;
997 
998     /** The identity of the active FT context in the cluster; may be null */
999     String activeFTContextIdentity;
1000 
1001     /**
1002      * Standard constructor for an FT context that provides FT capability to a
1003      * hosted application context.
1004      * 
1005      * @param appContext
1006      *            the application context instance that is hosted by the
1007      *            clustering ability of this FT context. The FT cluster identity
1008      *            comes from the {@link IFrameworkContext#getIdentity()} of the
1009      *            context argument.
1010      * @param stateListener
1011      *            the listener that will react to the FT context being active or
1012      *            stand-by
1013      * @param ftNetwork
1014      *            the network that is used for FT activities; other FTContexts
1015      *            joining the same network will coordinate between themselves to
1016      *            identify which one should be the active context
1017      * @param number
1018      *            the FT number for the context within the cluster; determines
1019      *            relative priority for becoming active between all peer FT
1020      *            contexts within the same cluster
1021      * @param activationDelayMillis
1022      *            the delay to wait between start and provisional activation
1023      *            states. This provides time for other FT context instances to
1024      *            be discovered prior to entering the {@link ProvisionalActive}
1025      *            state. The value must be at least equal to the ftNetwork's
1026      *            {@link IHeartbeatMonitor#getNetworkHeartbeatPeriod heartbeat
1027      *            period} multiplied by {@link FTContext#HEARTBEAT_MULTIPLIER}.
1028      *            Choosing a number too low can result in newly started contexts
1029      *            assuming confirmed active state before detecting any other
1030      *            confirmed active instances.
1031      */
1032     @SuppressWarnings("unchecked")
1033     public FTContext(IFrameworkContext appContext,
1034         IFTContextStateListener stateListener, INetwork ftNetwork, int number,
1035         long activationDelayMillis)
1036     {
1037         nullCheck(stateListener, "No FTContext state listener provided");
1038         nullCheck(ftNetwork, "No FT network provided");
1039         this.appContext = appContext;
1040 
1041         // swap in a distribution manager that is FT aware
1042         final FTDistributionManager distributionManager =
1043             new FTDistributionManager(appContext, this);
1044         getAppContext().setDistributionManager(distributionManager);
1045         distributionManager.start();
1046 
1047         this.stateListener = stateListener;
1048         this.ftNumber = number;
1049         this.activationDelay = activationDelayMillis;
1050         this.stateManager = new StateManager();
1051         try
1052         {
1053             this.ftContextIdentity =
1054                 getClusterIdentity() + DELIMITER + this.ftNumber + DELIMITER
1055                     + InetAddress.getLocalHost().getHostAddress() + DELIMITER
1056                     + System.identityHashCode(getAppContext()) + DELIMITER
1057                     + System.nanoTime();
1058         }
1059         catch (UnknownHostException e)
1060         {
1061             throw new RuntimeException(
1062                 "Could not get the local host IP address for context: "
1063                     + getAppContext().getIdentity(), e);
1064         }
1065         this.ftWorkerContext = new FulmineContext(ftContextIdentity, 2);
1066         getFtWorkerContext().setNetwork(ftNetwork);
1067 
1068         this.contextInfos = CollectionFactory.newSet();
1069         Map<Class<? extends IEvent>, IEventListener> listeners =
1070             AbstractEventHandler.getEventHandlerMappings(
1071                 new ContextInfoStateChangeHandler(),
1072                 new ContextDiscoveredEventHandler(), new ContextInfoHandler());
1073         this.ftLogicHandler =
1074             new MultiSystemEventListener(getFtContextIdentity(),
1075                 getFtWorkerContext(), listeners);
1076     }
1077 
1078     /** The component that can report on the state of this context; not assigned by the constructor, see {@link #setContextWatchdog(IContextWatchdog)} */
1079     private IContextWatchdog watchdog;
1080 
1081     public IContextWatchdog getContextWatchdog()
1082     {
1083         return this.watchdog;
1084     }
1085 
1086     public void setContextWatchdog(IContextWatchdog watchdog)
1087     {
1088         this.watchdog = watchdog;
1089     }
1090 
1091     protected final void doStart()
1092     {
1093         validate();
1094 
1095         prepareAppContextForStart();
1096         getAppContext().start();
1097 
1098         getFtWorkerContext().start();
1099 
1100         createContextInfoRecord();
1101 
1102         getStateManager().setState(StateEnum.Startup, this);
1103 
1104         getStateListener().start();
1105         getFtLogicHandler().start();
1106     }
1107 
1108     /**
1109      * Prepare the application context before calling {@link ILifeCycle#start()
1110      * start}. Default implementation sets the application context network into
1111      * listen-only mode.
1112      */
1113     protected void prepareAppContextForStart()
1114     {
1115         // start the application context but with no pulsing
1116         getNetwork().setListeningOnlyMode(true);
1117     }
1118 
1119     protected final void doDestroy()
1120     {
1121         try
1122         {
1123             getFtWorkerContext().getSystemEventSource(
1124                 ContextDiscoveredEvent.class).removeListener(
1125                 getFtLogicHandler());
1126         }
1127         catch (Exception e)
1128         {
1129             logException(LOG, "clusteringLogic", e);
1130         }
1131         getFtWorkerContext().destroy();
1132         try
1133         {
1134             getStateListener().destroy();
1135         }
1136         catch (Exception e)
1137         {
1138             logException(LOG, "statelistener", e);
1139         }
1140         getAppContext().destroy();
1141     }
1142 
1143     /**
1144      * Invoked by the {@link Standby} state and encapsulates the logic the
1145      * {@link FTContext} needs to perform to go into standby state.
1146      */
1147     protected void standby()
1148     {
1149         doStandby(getAppContext(), getAppContext().getNetwork(),
1150             getAppContext().getConnectionDiscoverer(),
1151             getAppContext().getConnectionBroker());
1152     }
1153 
1154     /**
1155      * Standard operation for standby that stops heartbeat pulsing and recreates
1156      * the {@link IConnectionBroker}
1157      * 
1158      * @param context
1159      *            the context for the connection broker
1160      * @param network
1161      *            the network instance for the new connection broker
1162      * @param discoverer
1163      *            the discoverer to stop heartbeats for
1164      * @param connectionBroker
1165      *            the connection broker to stop
1166      */
1167     protected final void doStandby(IFrameworkContext context, INetwork network,
1168         IConnectionDiscoverer discoverer, IConnectionBroker connectionBroker)
1169     {
1170         nullCheck(discoverer, "no discoverer");
1171         nullCheck(network, "no network");
1172         nullCheck(context, "no context");
1173         discoverer.disablePulsing();
1174         // bounce the connection broker
1175         try
1176         {
1177             if (connectionBroker != null)
1178             {
1179                 connectionBroker.destroy();
1180             }
1181             // now restart
1182             context.setConnectionBroker(network.createBroker());
1183             context.getConnectionBroker().start();
1184         }
1185         catch (Exception e)
1186         {
1187             logException(LOG, FTContext.this, e);
1188         }
1189     }
1190 
1191     /**
1192      * Invoked by the {@link Confirmed} state and encapsulates the logic the
1193      * {@link FTContext} needs to perform to go into the active state.
1194      */
1195     protected void activate()
1196     {
1197         // moving into confirmed active, switch on the application network
1198         getConnectionDiscoverer().enablePulsing();
1199     }
1200 
1201     private void validate()
1202     {
1203         final INetwork network = getNetwork();
1204         nullCheck(network, "No network provided for " + getAppContext());
1205 
1206         // check the activation delay is less than the heartbeat period
1207         if (this.activationDelay < getNetwork().getNetworkHeartbeatPeriod() * 3)
1208         {
1209             throw new IllegalStateException(
1210                 "The activation delay of an FTContext must be at "
1211                     + "least "
1212                     + FTContext.HEARTBEAT_MULTIPLIER
1213                     + "x the FT heartbeat period. The activationDelay="
1214                     + this.activationDelay
1215                     + "ms, FT heartbeatPeriod="
1216                     + getFtWorkerContext().getNetwork().getNetworkHeartbeatPeriod()
1217                     + "ms. "
1218                     + "Either increase the activationDelay or reduce the heartbeat period for the FT network.");
1219         }
1220     }
1221 
1222     @Override
1223     public String toString()
1224     {
1225         return getFtContextIdentity() + OPEN_BRACE
1226             + getStateManager().currentState + CLOSE_BRACE;
1227     }
1228 
1229     @Override
1230     protected AsyncLog getLog()
1231     {
1232         return LOG;
1233     }
1234 
1235     /**
1236      * @return The context that manages the FT behaviour. This connects to the
1237      *         FT network and negotiates with any other FT contexts on that
1238      *         network to determine if this is the active one. If it is the
1239      *         active one, it will activate its application context.
1240      */
1241     IFrameworkContext getFtWorkerContext()
1242     {
1243         return this.ftWorkerContext;
1244     }
1245 
1246     /**
1247      * @return the application context this FTContext hosts
1248      */
1249     protected IFrameworkContext getAppContext()
1250     {
1251         return this.appContext;
1252     }
1253 
1254     /**
1255      * @return The listener to react to state changes of the clustered context
1256      */
1257     private IFTContextStateListener getStateListener()
1258     {
1259         return this.stateListener;
1260     }
1261 
1262     /**
1263      * @return the listener to handle events that influence whether this FT
1264      *         context is active or standby in the cluster.
1265      */
1266     MultiSystemEventListener getFtLogicHandler()
1267     {
1268         return this.ftLogicHandler;
1269     }
1270 
1271     private StateManager getStateManager()
1272     {
1273         return this.stateManager;
1274     }
1275 
1276     /**
1277      * @return the unique identity of this FT context within the cluster
1278      *         "{clusterIdentity}:{FT number}:{host IP}:{object
1279      *         identityHash}:{time of creation in nanoseconds}"
1280      */
1281     String getFtContextIdentity()
1282     {
1283         return this.ftContextIdentity;
1284     }
1285 
1286     /**
1287      * Get the identity of the cluster this FT context is a part of:
1288      * "FTContext:{context-identity}"
1289      * 
1290      * @return the identity of the cluster this FT context is a part of
1291      */
1292     private String getClusterIdentity()
1293     {
1294         return FTContext.FT_ID_PREFIX + getIdentity();
1295     }
1296 
1297     /**
1298      * Create the context info record for this context
1299      */
1300     private void createContextInfoRecord()
1301     {
1302         this.contextInfo =
1303             (Record) getFtWorkerContext().getLocalContainer(
1304                 this.getFtContextIdentity(), Type.SYSTEM, Domain.FRAMEWORK);
1305         IntegerField contextNumber =
1306             new IntegerField(FTContext.FT_NUMBER_FIELD);
1307         contextNumber.set(this.ftNumber);
1308         LongField startTime = new LongField(FTContext.START_TIME_FIELD);
1309         startTime.set(System.currentTimeMillis());
1310         BooleanField provisionalActive =
1311             new BooleanField(FTContext.PROVISIONAL_ACTIVE_FIELD);
1312         provisionalActive.set(false);
1313         BooleanField activeContext =
1314             new BooleanField(FTContext.CONFIRMED_ACTIVE_FIELD);
1315         activeContext.set(false);
1316         this.contextInfo.add(contextNumber);
1317         this.contextInfo.add(startTime);
1318         this.contextInfo.add(activeContext);
1319         this.contextInfo.add(provisionalActive);
1320         this.contextInfo.flushFrame();
1321     }
1322 
1323     /**
1324      * Set the FT context provisional or confirmed active state to the specified
1325      * state. This method updates the context info record and flushes it; other
1326      * FT contexts within the cluster will receive the state change. The
1327      * {@link #stateListener} will be notified only on a change in state.
1328      * 
1329      * @param stateField
1330      *            the state of the FT context to set; either provisional or
1331      *            confirmed
1332      * @param stateValue
1333      *            the state for the FT context provisional or confirmed state
1334      * 
1335      * @see #isFTContextActive()
1336      */
1337     private void setFTContextState(final String stateField,
1338         final boolean stateValue)
1339     {
1340         final BooleanField provisionalState =
1341             contextInfo.getBooleanField(FTContext.PROVISIONAL_ACTIVE_FIELD);
1342         boolean wasProvisionallyActive = provisionalState.get();
1343 
1344         final BooleanField confirmedState =
1345             contextInfo.getBooleanField(FTContext.CONFIRMED_ACTIVE_FIELD);
1346         boolean wasConfirmedActive = confirmedState.get();
1347 
1348         boolean provisionalActive = wasProvisionallyActive;
1349         boolean confirmedActive = wasConfirmedActive;
1350         if (PROVISIONAL_ACTIVE_FIELD.equals(stateField))
1351         {
1352             provisionalActive = stateValue;
1353             if (!provisionalActive)
1354             {
1355                 // if not provisional, neither can we be confirmed
1356                 confirmedActive = false;
1357             }
1358         }
1359         else
1360         {
1361             confirmedActive = stateValue;
1362             // regardless of the confirmed state, set provisional to false
1363             provisionalActive = false;
1364         }
1365 
1366         if (LOG.isDebugEnabled())
1367         {
1368             LOG.debug(FTContext.this + " provisional: "
1369                 + (wasProvisionallyActive ? "active" : "standby") + "->"
1370                 + (provisionalActive ? "active" : "standby") + ", confirmed: "
1371                 + (wasConfirmedActive ? "active" : "standby") + "->"
1372                 + (confirmedActive ? "active" : "standby"));
1373         }
1374         provisionalState.set(provisionalActive);
1375         confirmedState.set(confirmedActive);
1376         this.contextInfo.flushFrame();
1377     }
1378 
1379     /**
1380      * Log a banner message to display the state of the context - either ACTIVE
1381      * or STANDBY
1382      */
1383     private void logState()
1384     {
1385         if (LOG.isInfoEnabled())
1386         {
1387             LOG.info(SystemUtils.LINE_SEPARATOR
1388                 + "********************************************************************************"
1389                 + SystemUtils.LINE_SEPARATOR
1390                 + "* "
1391                 + FTContext.this
1392                 + " CONTEXT IS "
1393                 + (this.isFTContextActive() ? "ACTIVE" : "STANDBY")
1394                 + SystemUtils.LINE_SEPARATOR
1395                 + "********************************************************************************");
1396         }
1397     }
1398 
1399     /**
1400      * Determine if this FT context is active. This examines the confirmed
1401      * active state.
1402      * 
1403      * @see #setFTContextState(String, boolean)
1404      * @return <code>true</code> if this FT context is active
1405      */
1406     public boolean isFTContextActive()
1407     {
1408         final BooleanField activeField =
1409             this.contextInfo.getBooleanField(FTContext.CONFIRMED_ACTIVE_FIELD);
1410         return activeField.get();
1411     }
1412 
1413     /**
1414      * Log the FTContexts in this cluster.
1415      */
1416     private void logFTContexts()
1417     {
1418         synchronized (this.contextInfos)
1419         {
1420             if (LOG.isDebugEnabled())
1421             {
1422                 if (this.contextInfos.size() > 0)
1423                 {
1424                     LOG.debug(FTContext.this + " FTContexts in cluster "
1425                         + getClusterIdentity() + " are: "
1426                         + CollectionUtils.toFormattedString(this.contextInfos));
1427                 }
1428                 else
1429                 {
1430                     if (LOG.isDebugEnabled())
1431                     {
1432                         LOG.debug(FTContext.this
1433                             + " No other FTContexts exist in cluster "
1434                             + getClusterIdentity());
1435                     }
1436                 }
1437             }
1438         }
1439     }
1440 
1441     /**
1442      * Helper method to log the state change type for the context
1443      * 
1444      * @param shouldBeActive
1445      *            whether the context should become active
1446      * @param type
1447      *            provisional or confirmed
1448      */
1449     private void logStateChange(boolean shouldBeActive, String type)
1450     {
1451         if (LOG.isDebugEnabled())
1452         {
1453             LOG.debug(FTContext.this + " is confirmed "
1454                 + (FTContext.this.isFTContextActive() ? "active" : "standby")
1455                 + " and should be " + type + SPACE
1456                 + (shouldBeActive ? "active" : "standby"));
1457         }
1458     }
1459 
1460     public void addContainer(IContainer container)
1461     {
1462         getAppContext().addContainer(container);
1463     }
1464 
1465     public boolean addSubscriptionListener(ISubscriptionListener listener)
1466     {
1467         return getAppContext().addSubscriptionListener(listener);
1468     }
1469 
1470     public boolean containsLocalContainer(String identityRegularExpression,
1471         IType type, IDomain domain)
1472     {
1473         return getAppContext().containsLocalContainer(
1474             identityRegularExpression, type, domain);
1475     }
1476 
1477     public boolean containsRemoteContainer(String remoteContextIdentity,
1478         String containerIdentity, IType type, IDomain domain)
1479     {
1480         return getAppContext().containsRemoteContainer(remoteContextIdentity,
1481             containerIdentity, type, domain);
1482     }
1483 
1484     public void execute(Runnable task)
1485     {
1486         getAppContext().execute(task);
1487     }
1488 
1489     public IChannel[] getConnectedChannels()
1490     {
1491         return getAppContext().getConnectedChannels();
1492     }
1493 
1494     public IConnectionBroker getConnectionBroker()
1495     {
1496         return getAppContext().getConnectionBroker();
1497     }
1498 
1499     public IConnectionDiscoverer getConnectionDiscoverer()
1500     {
1501         return getAppContext().getConnectionDiscoverer();
1502     }
1503 
1504     public IContainerFactory getContainerFactory()
1505     {
1506         return getAppContext().getContainerFactory();
1507     }
1508 
1509     public int getContextHashCode()
1510     {
1511         return getAppContext().getContextHashCode();
1512     }
1513 
1514     public int getEventProcessorCount()
1515     {
1516         return getAppContext().getEventProcessorCount();
1517     }
1518 
1519     public IFrameReader getFrameReader()
1520     {
1521         return getAppContext().getFrameReader();
1522     }
1523 
1524     public IFrameWriter getFrameWriter()
1525     {
1526         return getAppContext().getFrameWriter();
1527     }
1528 
1529     public String getIdentity()
1530     {
1531         return getAppContext().getIdentity();
1532     }
1533 
1534     public IContainer getLocalContainer(String identity, IType type,
1535         IDomain domain)
1536     {
1537         return getAppContext().getLocalContainer(identity, type, domain);
1538     }
1539 
1540     public Collection<IContainer> getLocalContainers()
1541     {
1542         return getAppContext().getLocalContainers();
1543     }
1544 
1545     public INetwork getNetwork()
1546     {
1547         return getAppContext().getNetwork();
1548     }
1549 
1550     public ThreadGroup getEventProcessorThreadGroup()
1551     {
1552         return getAppContext().getEventProcessorThreadGroup();
1553     }
1554 
1555     public IContainer getRemoteContainer(String remoteContextIdentity,
1556         String containerIdentity, IType type, IDomain domain)
1557     {
1558         return getAppContext().getRemoteContainer(remoteContextIdentity,
1559             containerIdentity, type, domain);
1560     }
1561 
1562     public Collection<IContainer> getRemoteContainers(
1563         String remoteContextIdentity)
1564     {
1565         return getAppContext().getRemoteContainers(remoteContextIdentity);
1566     }
1567 
1568     public ISystemEventSource getSystemEventSource(
1569         Class<? extends ISystemEvent> type)
1570     {
1571         return getAppContext().getSystemEventSource(type);
1572     }
1573 
1574     public void queueEvent(IEvent event)
1575     {
1576         getAppContext().queueEvent(event);
1577     }
1578 
1579     public void queueEvents(Collection<IEvent> events)
1580     {
1581         getAppContext().queueEvents(events);
1582     }
1583 
1584     public boolean removeContainer(IContainer container)
1585     {
1586         return getAppContext().removeContainer(container);
1587     }
1588 
1589     public boolean removeSubscriptionListener(ISubscriptionListener listener)
1590     {
1591         return getAppContext().removeSubscriptionListener(listener);
1592     }
1593 
1594     public void requestRetransmit(String contextIdentity,
1595         String identityRegularExpression, IType type, IDomain domain)
1596     {
1597         getAppContext().requestRetransmit(contextIdentity,
1598             identityRegularExpression, type, domain);
1599     }
1600 
1601     public void requestRetransmitAll(String contextIdentity)
1602     {
1603         getAppContext().requestRetransmitAll(contextIdentity);
1604     }
1605 
1606     public void retransmit(String contextIdentity,
1607         String identityRegularExpression, IType type, IDomain domain)
1608     {
1609         getAppContext().retransmit(contextIdentity, identityRegularExpression,
1610             type, domain);
1611     }
1612 
1613     public void retransmitAll(String contextIdentity)
1614     {
1615         getAppContext().retransmitAll(contextIdentity);
1616     }
1617 
1618     public void retransmitAllToAll()
1619     {
1620         getAppContext().retransmitAllToAll();
1621     }
1622 
1623     public void retransmitToAll(String identityRegularExpression, IType type,
1624         IDomain domain)
1625     {
1626         getAppContext().retransmitToAll(identityRegularExpression, type, domain);
1627     }
1628 
1629     public void setConnectionBroker(IConnectionBroker broker)
1630     {
1631         getAppContext().setConnectionBroker(broker);
1632     }
1633 
1634     public void setConnectionDiscoverer(IConnectionDiscoverer discoverer)
1635     {
1636         getAppContext().setConnectionDiscoverer(discoverer);
1637     }
1638 
1639     public void setDistributionManager(IDistributionManager distributionManager)
1640     {
1641         getAppContext().setDistributionManager(distributionManager);
1642     }
1643 
1644     public IDistributionManager getDistributionManager()
1645     {
1646         return getAppContext().getDistributionManager();
1647     }
1648 
1649     public void setNetwork(INetwork network)
1650     {
1651         getAppContext().setNetwork(network);
1652     }
1653 
1654     public boolean subscribe(String contextIdentity,
1655         String identityRegularExpression, IType type, IDomain domain,
1656         IEventListener listener)
1657     {
1658         /*
1659          * If we subscribe for the same context identity, but we're disabled,
1660          * then we do a remote subscribe - this is handled by a specialised
1661          * distribution manager, the FTDistributionManager.
1662          */
1663         return getAppContext().subscribe(contextIdentity,
1664             identityRegularExpression, type, domain, listener);
1665     }
1666 
1667     public boolean unsubscribe(String contextIdentity,
1668         String identityRegularExpression, IType type, IDomain domain,
1669         IEventListener listener)
1670     {
1671         /*
1672          * If we unsubscribe for the same context identity, but we're disabled,
1673          * then do a remote subscribe - this is handled by a specialised
1674          * distribution manager, the FTDistributionManager.
1675          */
1676         return getAppContext().unsubscribe(contextIdentity,
1677             identityRegularExpression, type, domain, listener);
1678     }
1679 
1680     public String getEventProcessorIdentityPrefix()
1681     {
1682         return getAppContext().getEventProcessorIdentityPrefix();
1683     }
1684 
1685     public void invokeRpc(String remoteContextIdentity, byte[] rpcData)
1686     {
1687         getAppContext().invokeRpc(remoteContextIdentity, rpcData);
1688     }
1689 
1690     public boolean addRpcPublicationListener(String remoteContextIdentity,
1691         IRpcPublicationListener listener)
1692     {
1693         return getAppContext().addRpcPublicationListener(remoteContextIdentity,
1694             listener);
1695     }
1696 
1697     public boolean unpublishRpcs(Class<?> definition, Object handler)
1698     {
1699         return getAppContext().unpublishRpcs(definition, handler);
1700     }
1701 
1702     public boolean publishRpcs(Class<?> definition, Object handler)
1703     {
1704         return getAppContext().publishRpcs(definition, handler);
1705     }
1706 
1707     public IRpcResult invoke(String remoteContextIdentity, String procedure,
1708         IField... args)
1709     {
1710         return getAppContext().invoke(remoteContextIdentity, procedure, args);
1711     }
1712 
1713     public IRpcMarker invoke(IRpcResultHandler resultHandler,
1714         String remoteContextIdentity, String procedure, IField... args)
1715     {
1716         return getAppContext().invoke(resultHandler, remoteContextIdentity,
1717             procedure, args);
1718     }
1719 
1720     public boolean removeRpcPublicationListener(String remoteContextIdentity,
1721         IRpcPublicationListener listener)
1722     {
1723         return getAppContext().removeRpcPublicationListener(
1724             remoteContextIdentity, listener);
1725     }
1726 
1727     public boolean publishProdedure(IRpcHandler handler,
1728         IRpcDefinition rpcDefinition)
1729     {
1730         return getAppContext().publishProdedure(handler, rpcDefinition);
1731     }
1732 
1733     public boolean unpublishProdedure(IRpcDefinition rpcDefinition)
1734     {
1735         return getAppContext().unpublishProdedure(rpcDefinition);
1736     }
1737 
1738     public IPermissionProfile getPermissionProfile()
1739     {
1740         return getAppContext().getPermissionProfile();
1741     }
1742 
1743     public void setPermissionProfile(IPermissionProfile profile)
1744     {
1745         getAppContext().setPermissionProfile(profile);
1746     }
1747 
1748     public void setRemoteUpdateHandler(IRemoteUpdateHandler handler)
1749     {
1750         getAppContext().setRemoteUpdateHandler(handler);
1751     }
1752 
1753     public String updateRemoteContainer(String remoteContextIdentity,
1754         String identity, IType type, IDomain domain, String fieldName,
1755         String fieldValueAsString)
1756     {
1757         return getAppContext().updateRemoteContainer(remoteContextIdentity,
1758             identity, type, domain, fieldName, fieldValueAsString);
1759     }
1760 
1761     public IContainer getSystemInfo()
1762     {
1763         return getAppContext().getSystemInfo();
1764     }
1765 
1766     public void schedule(TimerTask task, long delay, long period)
1767     {
1768         getAppContext().schedule(task, delay, period);
1769     }
1770 }