View Javadoc

1   /*
2      Copyright 2008 Ramon Servadei
3   
4      Licensed under the Apache License, Version 2.0 (the "License");
5      you may not use this file except in compliance with the License.
6      You may obtain a copy of the License at
7   
8          http://www.apache.org/licenses/LICENSE-2.0
9   
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15   */
16  package fulmine.context;
17  
18  import static fulmine.util.Utils.COLON;
19  import static fulmine.util.Utils.logException;
20  import static fulmine.util.Utils.safeToString;
21  
22  import java.util.Arrays;
23  import java.util.Iterator;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Set;
27  import java.util.concurrent.atomic.AtomicInteger;
28  
29  import fulmine.AbstractLifeCycle;
30  import fulmine.Domain;
31  import fulmine.Type;
32  import fulmine.distribution.channel.ChannelReadyEvent;
33  import fulmine.distribution.events.ConnectionDestroyedEvent;
34  import fulmine.event.EventFrameExecution;
35  import fulmine.event.IEvent;
36  import fulmine.event.listener.AbstractEventHandler;
37  import fulmine.event.listener.IEventListener;
38  import fulmine.event.listener.MultiEventListener;
39  import fulmine.event.listener.MultiSystemEventListener;
40  import fulmine.event.system.EventSourceNotObservedEvent;
41  import fulmine.event.system.EventSourceObservedEvent;
42  import fulmine.event.system.ISystemEventListener;
43  import fulmine.model.container.IContainer;
44  import fulmine.model.container.IContainer.DataState;
45  import fulmine.model.container.events.AbstractContainerFieldEvent;
46  import fulmine.model.container.events.ContainerFieldAddedEvent;
47  import fulmine.model.container.events.ContainerFieldRemovedEvent;
48  import fulmine.model.field.IField;
49  import fulmine.model.field.IntegerField;
50  import fulmine.model.field.StringField;
51  import fulmine.rpc.IRpcDefinition;
52  import fulmine.rpc.IRpcHandler;
53  import fulmine.rpc.IRpcManager;
54  import fulmine.rpc.IRpcMarker;
55  import fulmine.rpc.IRpcPublicationListener;
56  import fulmine.rpc.IRpcRegistry;
57  import fulmine.rpc.IRpcResult;
58  import fulmine.rpc.IRpcResultHandler;
59  import fulmine.rpc.RpcCodec;
60  import fulmine.rpc.RpcDefinition;
61  import fulmine.rpc.RpcMarker;
62  import fulmine.rpc.RpcRegistry;
63  import fulmine.rpc.RpcResult;
64  import fulmine.rpc.RpcUtils;
65  import fulmine.rpc.events.RpcInvokeEvent;
66  import fulmine.rpc.events.SendRpcEvent;
67  import fulmine.util.Utils;
68  import fulmine.util.collection.CollectionFactory;
69  import fulmine.util.collection.MapList;
70  import fulmine.util.collection.MapSet;
71  import fulmine.util.concurrent.ITaskExecutor;
72  import fulmine.util.concurrent.ITaskHandler;
73  import fulmine.util.concurrent.Task;
74  import fulmine.util.concurrent.TaskExecutor;
75  import fulmine.util.log.AsyncLog;
76  import fulmine.util.reference.DualValue;
77  import fulmine.util.reference.IReferenceCounter;
78  import fulmine.util.reference.QuadValue;
79  import fulmine.util.reference.ReferenceCounter;
80  import fulmine.util.reference.Value;
81  
82  /**
83   * The standard implementation of an {@link IRpcManager}.
84   * <p>
85   * RPC definitions can be invoked in a multi-threaded context. However,
86   * simultaneous calls to the same RPC definition are handled sequentially.
87   * <p>
88   * Every RPC definition has an associated 'result record'. When a local context
89   * handles an RPC invocation from a remote context, it also creates a result
90   * record that will contain the result from the RPC invocation. The result
91   * record has a specific naming convention, in ABNF form:
92   * 
93   * <pre>
94   * result-record            = remote-context-identity &quot;:&quot; RPC-registry-key
95   * 
96   * remote-context-identity  = 1*(ALPHA / DIGIT)
97   * RPC-registry-key         = &quot;RpcKey&quot; 1*(DIGIT)
98   * </pre>
99   * 
100  * When invoking an RPC, the local context first subscribes for the result
101  * record then issues the RPC. On the receiving end, the invocation is packaged
102  * up into an event. The remote context's RpcManager registers an
103  * {@link IEventListener} that will respond to these {@link RpcInvokeEvent}s.
104  * This listener will be responsible for locating the appropriate
105  * {@link IRpcHandler} and invoking it with the arguments encapsulated in the
106  * event. The result record attached to the remote context for the RPC
107  * definition is then updated with the result and the invoking context will
108  * receive this result and return it to the application caller.
109  * <p>
110  * The sequence diagram below helps to illustrate the operation.
111  * 
112  * <pre>
113  * Application   IRpcManager    RpcResultHandler      RpcInvokeHandler   IRpcHandler   ResultRecord
114  *  |                 |                |                     |                |             |
115  *  |   invoke        |                |                     |                |             |
116  *  |----------------&gt;|                |                     |                |             |
117  *  |                 |            RpcInvokeEvent            |                |             |
118  *  |                 |-------------------------------------&gt;|                |             |
119  *  |                 |            (remote call)             |                |             |
120  *  |                 |                |                     |                |             |
121  *  |                 |                |                     |    handle      |             |
122  *  |                 |                |                     |---------------&gt;|             |
123  *  |                 |                |                     |                |             |
124  *  |                 |                |                     |    result      |             |
125  *  |                 |                |                     |&lt;---------------|             |
126  *  |                 |                |                     |                |             |
127  *  |                 |                |                     |     update with result       |
128  *  |                 |                |                     |-----------------------------&gt;|
129  *  |                 |                |                     |                |             |
130  *  |                 |                |   get result details (after remote transmission)   |
131  *  |                 |                |---------------------------------------------------&gt;|
132  *  |                 |                |                     |                |             |
133  *  |              result              |                     |                |             |
134  *  |&lt;---------------------------------|                     |                |             |
135  *  |                 |                |                     |                |             |
136  * </pre>
137  * 
138  * The timeout for RPC calls is defined by the system property
139  * {@link IRpcManager#RPC_TIMEOUT}. If the timeout expires, an exception is
140  * printed and a null value will be returned from
141  * {@link #invoke(String, String, IField...)}.
142  * <p>
143  * Every context has a special component called the 'RPC registry'. This holds
144  * every RPC that a local context exposes. Remote contexts must subscribe for
145  * this via
146  * {@link IFulmineContext#addRpcPublicationListener(String, IRpcPublicationListener)}
147  * in order for the remote context to receive the RPC definitions of the
148  * context. Calling {@link IFulmineContext#invoke(String, String, IField...)}
149  * before the RPC registry is received is safe; when the RPC definition is
150  * received from the target context, the RPC will be invoked.
151  * 
152  * @see RpcRegistry
153  * @author Ramon Servadei
154  */
155 public final class RpcManager extends AbstractLifeCycle implements IRpcManager,
156     IRpcManagerOperations
157 {
158     private final static AsyncLog LOG = new AsyncLog(RpcManager.class);
159 
160     /**
161      * Defines the timeout in use - specified by the setting of
162      * {@link IRpcManager#RPC_TIMEOUT}
163      */
164     private final int TIMEOUT =
165         Integer.parseInt(System.getProperty(IRpcManager.RPC_TIMEOUT,
166             IRpcManager.DEFAULT_RPC_TIMEOUT));
167 
168     /**
169      * Internal class to handle results for the synchronous version of
170      * {@link RpcManager#invoke(String, String, IField...)}. All RPC invocations
171      * are asynchronous, this result handler notifies all threads waiting on
172      * {@link #resultValue} when the result comes in for the correct RPC marker.
173      * 
174      * @author Ramon Servadei
175      */
176     private final class ResultHandler implements IRpcResultHandler
177     {
178         private final Value<IRpcMarker> markerValue;
179 
180         private final Value<IRpcResult> resultValue;
181 
182         private ResultHandler(Value<IRpcMarker> markerValue,
183             Value<IRpcResult> resultValue)
184         {
185             this.markerValue = markerValue;
186             this.resultValue = resultValue;
187         }
188 
189         public void resultReceived(IRpcResult result, IRpcMarker marker)
190         {
191             if (marker.equals(this.markerValue.get()))
192             {
193                 this.resultValue.set(result);
194                 synchronized (this.resultValue)
195                 {
196                     this.resultValue.notifyAll();
197                 }
198             }
199         }
200     }
201 
202     /** Shared state for the manager */
203     private final IRpcManagerState state;
204 
205     /**
206      * A multi-event listener that handles signalling RPC published/unpublished
207      * events when remote RPC registry instances have RPC definitions
208      * added/removed.
209      */
210     private final MultiEventListener rpcPublicationListener;
211 
212     /**
213      * Construct the RPC manager.
214      * 
215      * @param context
216      *            the local context for the RPC manager
217      */
218     RpcManager(IFrameworkContext context)
219     {
220         this(new RpcManagerState(context));
221     }
222 
223     /**
224      * Internally chained constructor
225      * 
226      * @param state
227      *            the state for the RPC manager
228      */
229     @SuppressWarnings("unchecked")
230     RpcManager(IRpcManagerState state)
231     {
232         super();
233         this.state = state;
234         this.rpcPublicationListener =
235             new MultiEventListener(RpcManager.class.getSimpleName(),
236                 getState().getContext(),
237                 AbstractEventHandler.getEventHandlerMappings(
238                     new RpcPublishedHandler(getState(), this),
239                     new RpcUnpublishedHandler(getState(), this)));
240     }
241 
242     public IRpcResult invoke(String remoteContextIdentity, String procedure,
243         IField... args)
244     {
245         final Value<IRpcResult> resultValue = new Value<IRpcResult>();
246         final Value<IRpcMarker> markerValue = new Value<IRpcMarker>();
247         synchronized (resultValue)
248         {
249             final IRpcMarker marker =
250                 invoke(new ResultHandler(markerValue, resultValue),
251                     remoteContextIdentity, procedure, args);
252             markerValue.set(marker);
253             if (resultValue.get() == null)
254             {
255                 try
256                 {
257                     // now block until the result handler receives the result
258                     resultValue.wait(TIMEOUT);
259                 }
260                 catch (InterruptedException e)
261                 {
262                     logException(getLog(), "RPC name=" + procedure + ", args="
263                         + Arrays.deepToString(args) + ", context="
264                         + remoteContextIdentity, e);
265                 }
266             }
267             if (resultValue.get() == null)
268             {
269                 Utils.logException(getLog(), "TIMEOUT for RPC name="
270                     + procedure + ", args=" + Arrays.deepToString(args)
271                     + ", context=" + remoteContextIdentity, new Exception());
272             }
273         }
274         return resultValue.get();
275     }
276 
277     public IRpcMarker invoke(IRpcResultHandler resultHandler,
278         String remoteContextIdentity, String procedure, IField... args)
279     {
280         IRpcMarker rpcMarker =
281             new RpcMarker(getState().getMarkerCounter().getAndIncrement());
282         DualValue<String, IRpcDefinition> keyAndDefinition =
283             getRegistryKeyAndDefinition(remoteContextIdentity, procedure, args);
284         /*
285          * Try to see if the RPC can be invoked, if not, it will be placed onto
286          * a pending RPC queue for invoking when the RPC/context become
287          * available.
288          */
289         if (keyAndDefinition == null)
290         {
291             // context OR RPC is not available
292             final Set<String> connectedContexts =
293                 getState().getConnectedContexts();
294             synchronized (connectedContexts)
295             {
296                 // this variable is used purely for logging purposes
297                 boolean contextExists = false;
298                 if (connectedContexts.contains(remoteContextIdentity))
299                 {
300                     contextExists = true;
301                 }
302                 getState().getPendingRpcInvocations().get(remoteContextIdentity).add(
303                     new QuadValue<String, IField[], IRpcResultHandler, IRpcMarker>(
304                         procedure, args, resultHandler, rpcMarker));
305                 if (getLog().isDebugEnabled())
306                 {
307                     getLog().debug(
308                         (contextExists ? "RPC for " + remoteContextIdentity
309                             : "Remote context " + remoteContextIdentity)
310                             + " is not yet available so placing RPC {name="
311                             + safeToString(procedure)
312                             + ", args="
313                             + Arrays.deepToString(args)
314                             + "} onto the pending RPC queue.");
315                 }
316             }
317             return rpcMarker;
318         }
319 
320         // now invoke
321         invoke(remoteContextIdentity, keyAndDefinition.getFirst(),
322             keyAndDefinition.getSecond(), args, resultHandler, rpcMarker);
323 
324         return rpcMarker;
325     }
326 
327     @SuppressWarnings("boxing")
328     public void invoke(String remoteContextIdentity, String rpcKey,
329         IRpcDefinition definition, IField[] args,
330         IRpcResultHandler resultHandler, IRpcMarker marker)
331     {
332         if (getLog().isDebugEnabled())
333         {
334             getLog().debug(
335                 "Invoking " + safeToString(definition) + " with args="
336                     + Arrays.deepToString(args) + ", "
337                     + safeToString(resultHandler) + ", " + safeToString(marker));
338         }
339 
340         // increment the reference count for this result record, if it is 1,
341         // then subscribe for the record from the remote context
342         final String resultRecordName =
343             getState().getResultRecordName(
344                 getState().getContext().getIdentity(), rpcKey);
345         final int count;
346         final IReferenceCounter<DualValue<String, String>> counter =
347             getState().getResultRecordSubscriptionCounter();
348         synchronized (counter)
349         {
350             count =
351                 counter.adjustCount(new DualValue<String, String>(
352                     remoteContextIdentity, resultRecordName), 1);
353         }
354         if (count == 1)
355         {
356             // subscribe for the result record
357             getState().getContext().subscribe(remoteContextIdentity,
358                 resultRecordName, Type.SYSTEM, Domain.FRAMEWORK,
359                 getState().getEventHandler());
360         }
361 
362         // prepare the data buffer that represents the RPC call
363         byte[] data =
364             new RpcCodec(getState().getRegistry()).encode(marker, rpcKey,
365                 getState().getContext().getIdentity(), args);
366         final Map<Integer, DualValue<IRpcResultHandler, IRpcDefinition>> resultHandlers =
367             getState().getResultHandlers();
368         synchronized (resultHandlers)
369         {
370             resultHandlers.put(marker.getId(),
371                 new DualValue<IRpcResultHandler, IRpcDefinition>(resultHandler,
372                     definition));
373         }
374         /*
375          * Send this off in an event so that the ordering of subscribing for the
376          * record occurs before the RPC invoke
377          */
378         getState().getContext().queueEvent(
379             new SendRpcEvent(getState().getContext(), remoteContextIdentity,
380                 data));
381     }
382 
383     public boolean publishProdedure(IRpcHandler handler,
384         IRpcDefinition rpcDefinition)
385     {
386         return getState().getRegistry().publishProdedure(handler, rpcDefinition);
387     }
388 
389     public boolean unpublishProdedure(IRpcDefinition rpcDefinition)
390     {
391         return getState().getRegistry().unpublishProdedure(rpcDefinition);
392     }
393 
394     public boolean addRpcPublicationListener(String remoteContextIdentity,
395         IRpcPublicationListener listener)
396     {
397         boolean subscribe = false;
398         final boolean added;
399         final MapSet<String, IRpcPublicationListener> rpcPublicationListeners =
400             getState().getRpcPublicationListeners();
401         synchronized (rpcPublicationListeners)
402         {
403             final Set<IRpcPublicationListener> listenersForContext =
404                 rpcPublicationListeners.get(remoteContextIdentity);
405             added = listenersForContext.add(listener);
406             if (added)
407             {
408                 // perform copy-on-write
409                 rpcPublicationListeners.put(remoteContextIdentity,
410                     CollectionFactory.newSet(listenersForContext));
411                 subscribe = listenersForContext.size() == 1;
412             }
413         }
414         if (subscribe)
415         {
416             // todo do we really want to subscribe/unsubscribe?
417             getState().getContext().subscribe(remoteContextIdentity,
418                 IRpcRegistry.RPC_REGISTRY, Type.SYSTEM, Domain.FRAMEWORK,
419                 getRpcPublicationListener());
420         }
421         /*
422          * at this point, the record may already exist so we need to go through
423          * the record and notify the new listener with all the available RPCs
424          */
425         final IContainer remoteContainer =
426             getState().getContext().getRemoteContainer(remoteContextIdentity,
427                 IRpcRegistry.RPC_REGISTRY, Type.SYSTEM, Domain.FRAMEWORK);
428         final String[] componentIdentities =
429             remoteContainer.getComponentIdentities();
430         for (String identity : componentIdentities)
431         {
432             if (identity.indexOf(IRpcRegistry.RPC_KEY) > -1)
433             {
434                 final IField field = remoteContainer.get(identity);
435                 if (field instanceof StringField)
436                 {
437                     final String definitionAsString =
438                         ((StringField) field).get();
439                     IRpcDefinition rpcDefinition =
440                         new RpcDefinition(definitionAsString);
441                     listener.procedureAvailable(remoteContextIdentity,
442                         rpcDefinition);
443                 }
444             }
445         }
446         return added;
447     }
448 
449     public boolean removeRpcPublicationListener(String remoteContextIdentity,
450         IRpcPublicationListener listener)
451     {
452         boolean unsubscribe;
453         boolean removed;
454         final MapSet<String, IRpcPublicationListener> rpcPublicationListeners =
455             getState().getRpcPublicationListeners();
456         synchronized (rpcPublicationListeners)
457         {
458             unsubscribe = false;
459             final Set<IRpcPublicationListener> listenersForContext =
460                 rpcPublicationListeners.get(remoteContextIdentity);
461             removed = listenersForContext.remove(listener);
462             if (removed)
463             {
464                 // perform copy-on-write for the set of listeners
465                 rpcPublicationListeners.put(remoteContextIdentity,
466                     CollectionFactory.newSet(listenersForContext));
467             }
468             unsubscribe = listenersForContext.size() == 0;
469         }
470         if (unsubscribe)
471         {
472             getState().getContext().unsubscribe(remoteContextIdentity,
473                 IRpcRegistry.RPC_REGISTRY, Type.SYSTEM, Domain.FRAMEWORK,
474                 getRpcPublicationListener());
475         }
476         return removed;
477     }
478 
479     @Override
480     protected void doDestroy()
481     {
482         getState().destroy();
483     }
484 
485     @Override
486     protected void doStart()
487     {
488         getState().init(this);
489         getState().start();
490     }
491 
492     @Override
493     protected AsyncLog getLog()
494     {
495         return LOG;
496     }
497 
498     /**
499      * Get the RPC registry key and {@link IRpcDefinition} for the procedure.
500      * This checks the RPC registry record of the remote context to find the key
501      * for an RPC with matching name and arguments.
502      * 
503      * @param remoteContextIdentity
504      *            the remote context
505      * @param procedure
506      *            the RPC name
507      * @param args
508      *            the RPC arguments
509      * @return the RPC registry key and {@link IRpcDefinition} for the RPC in
510      *         the remote context or <code>null</code> if not found
511      */
512     public DualValue<String, IRpcDefinition> getRegistryKeyAndDefinition(
513         String remoteContextIdentity, String procedure, IField[] args)
514     {
515         if (!getState().getContext().containsRemoteContainer(
516             remoteContextIdentity, IRpcRegistry.RPC_REGISTRY, Type.SYSTEM,
517             Domain.FRAMEWORK))
518         {
519             return null;
520         }
521         // prepare the signature we're looking for
522         String rpcSignature = RpcUtils.getSignature(procedure, args);
523         final IContainer registry =
524             getState().getContext().getRemoteContainer(remoteContextIdentity,
525                 IRpcRegistry.RPC_REGISTRY, Type.SYSTEM, Domain.FRAMEWORK);
526         final String[] componentIdentities = registry.getComponentIdentities();
527         for (String fieldId : componentIdentities)
528         {
529             if (fieldId.startsWith(IRpcRegistry.RPC_KEY))
530             {
531                 // if the RPC definition 'toString' contains the signature,
532                 // we've found the match so return the key
533                 final IField field = registry.get(fieldId);
534                 if (field instanceof StringField)
535                 {
536                     final String definitionAsString =
537                         ((StringField) field).get();
538                     if (definitionAsString.contains(rpcSignature))
539                     {
540                         return new DualValue<String, IRpcDefinition>(fieldId,
541                             new RpcDefinition(definitionAsString));
542                     }
543                 }
544             }
545         }
546         return null;
547     }
548 
549     /**
550      * Get the shared state for the manager
551      * 
552      * @return the shared state
553      */
554     private IRpcManagerState getState()
555     {
556         return this.state;
557     }
558 
559     private MultiEventListener getRpcPublicationListener()
560     {
561         return this.rpcPublicationListener;
562     }
563 
564     public boolean unpublishRpcs(Class<?> definition, Object handler)
565     {
566         return getState().getRegistry().unpublishRpcs(definition, handler);
567     }
568 
569     public boolean publishRpcs(Class<?> definition, Object handler)
570     {
571         return getState().getRegistry().publishRpcs(definition, handler);
572     }
573 }
574 
575 /**
576  * State for the {@link RpcManager}
577  * 
578  * @author Ramon Servadei
579  */
580 final class RpcManagerState extends AbstractLifeCycle implements
581     IRpcManagerState
582 {
583     private final IReferenceCounter<DualValue<String, String>> resultRecordSubscriptionCounter;
584 
585     /** The task executor */
586     private final ITaskExecutor executor;
587 
588     /** The local context */
589     private final IFrameworkContext context;
590 
591     /**
592      * Sub-component that manages {@link RpcDefinition} instances registered by
593      * local application code. These are the RPCs for the local context.
594      */
595     private final IRpcRegistry registry;
596 
597     /**
598      * Holds the {@link IRpcResultHandler} to invoke with the {@link IRpcResult}
599      * for the matching RPC marker ID. The {@link IRpcDefinition} is also held
600      * with the result handler.
601      */
602     private final Map<Integer, DualValue<IRpcResultHandler, IRpcDefinition>> resultHandlers;
603 
604     /** Used to generate the marker ID for each successive RPC invocation */
605     private final AtomicInteger markerCounter;
606 
607     /** Holds RPC invocations for contexts not yet available */
608     private final MapList<String, QuadValue<String, IField[], IRpcResultHandler, IRpcMarker>> pendingRpcInvocations;
609 
610     /** The collection of result records keyed by remote context identity */
611     private final MapSet<String, IContainer> resultRecords;
612 
613     /** The currently connected contexts */
614     private final Set<String> connectedContexts;
615 
616     /** Holds the event handlers used for the RpcManager */
617     private MultiSystemEventListener eventHandler;
618 
619     /** The RPC publication listeners per remote context identity */
620     private final MapSet<String, IRpcPublicationListener> rpcPublicationListeners;
621 
622     /** The known collection of observed result records */
623     private final Set<String> observedResultRecords;
624 
625     /**
626      * Construct the state
627      * 
628      * @param context
629      *            the context
630      */
631     public RpcManagerState(IFrameworkContext context)
632     {
633         super();
634         this.context = context;
635         this.markerCounter = new AtomicInteger(0);
636         this.pendingRpcInvocations =
637             new MapList<String, QuadValue<String, IField[], IRpcResultHandler, IRpcMarker>>();
638         this.resultHandlers = CollectionFactory.newMap();
639         this.registry = new RpcRegistry(context);
640         this.resultRecords = new MapSet<String, IContainer>(1);
641         this.connectedContexts = CollectionFactory.newSet();
642         this.rpcPublicationListeners =
643             new MapSet<String, IRpcPublicationListener>();
644         this.resultRecordSubscriptionCounter =
645             new ReferenceCounter<DualValue<String, String>>(1);
646         this.observedResultRecords = CollectionFactory.newSet();
647         final String identity = context.getIdentity() + COLON + "RpcProcessor";
648         /*
649          * Note: we can't use any multi-threading for RPC processing because
650          * that might cause RPCs to be run out-of-order.
651          */
652         this.executor = new TaskExecutor(identity, context);
653     }
654 
655     @SuppressWarnings("unchecked")
656     public void init(IRpcManagerOperations operations)
657     {
658         this.eventHandler =
659             new MultiSystemEventListener(RpcManager.class.getSimpleName()
660                 + "EventHandler", getContext(),
661                 AbstractEventHandler.getEventHandlerMappings(
662                     new RpcConnectionDestroyedEventHandler(this),
663                     new RpcChannelReadyEventHandler(this, operations),
664                     new RpcResultHandler(this),
665                     new ResultRecordObservedHandler(this),
666                     new SendRpcEventHandler(this),
667                     new ResultRecordNotObservedHandler(this),
668                     new RpcInvokeHandler(this)));
669     }
670 
671     public IFrameworkContext getContext()
672     {
673         return this.context;
674     }
675 
676     public IRpcRegistry getRegistry()
677     {
678         return this.registry;
679     }
680 
681     public Map<Integer, DualValue<IRpcResultHandler, IRpcDefinition>> getResultHandlers()
682     {
683         return this.resultHandlers;
684     }
685 
686     public AtomicInteger getMarkerCounter()
687     {
688         return this.markerCounter;
689     }
690 
691     public MapList<String, QuadValue<String, IField[], IRpcResultHandler, IRpcMarker>> getPendingRpcInvocations()
692     {
693         return this.pendingRpcInvocations;
694     }
695 
696     public MapSet<String, IContainer> getResultRecords()
697     {
698         return this.resultRecords;
699     }
700 
701     public Set<String> getConnectedContexts()
702     {
703         return this.connectedContexts;
704     }
705 
706     public String getResultRecordName(String remoteContextIdentity,
707         String rpcKey)
708     {
709         return remoteContextIdentity + COLON + rpcKey;
710     }
711 
712     public MultiSystemEventListener getEventHandler()
713     {
714         return this.eventHandler;
715     }
716 
717     public MapSet<String, IRpcPublicationListener> getRpcPublicationListeners()
718     {
719         return this.rpcPublicationListeners;
720     }
721 
722     @Override
723     protected void doDestroy()
724     {
725         getEventHandler().destroy();
726         getRegistry().destroy();
727         getTaskExecutor().destroy();
728     }
729 
730     @Override
731     protected void doStart()
732     {
733         getTaskExecutor().start();
734         getEventHandler().start();
735         getRegistry().start();
736     }
737 
738     public IReferenceCounter<DualValue<String, String>> getResultRecordSubscriptionCounter()
739     {
740         return this.resultRecordSubscriptionCounter;
741     }
742 
743     public ITaskExecutor getTaskExecutor()
744     {
745         return this.executor;
746     }
747 
748     public Set<String> getObservedResultRecords()
749     {
750         return this.observedResultRecords;
751     }
752 }
753 
754 /**
755  * Base class for RPC event handlers used by the {@link RpcManager}
756  * 
757  * @author Ramon Servadei
758  * @param <EVENT>
759  *            the event type the handler works with
760  */
761 abstract class RpcEventHandler<EVENT extends IEvent> extends
762     AbstractEventHandler<EVENT> implements ISystemEventListener
763 {
764 
765     /** The state */
766     private final IRpcManagerState state;
767 
768     /**
769      * Standard constructor
770      * 
771      * @param state
772      *            the {@link RpcManager} state
773      */
774     RpcEventHandler(IRpcManagerState state)
775     {
776         super();
777         this.state = state;
778     }
779 
780     IRpcManagerState getState()
781     {
782         return state;
783     }
784 }
785 
786 /**
787  * Handler for {@link RpcInvokeEvent}s. This locates the appropriate
788  * {@link IRpcHandler} and calls it with the arguments encapsulated in the
789  * invoke event. The result from the handler is used to update the result
790  * record. The update triggers the record to be transmitted to the remote
791  * context that issued the RPC and the remote context will thus get the result
792  * from the RPC invocation.
793  * 
794  * @author Ramon Servadei
795  */
796 final class RpcInvokeHandler extends RpcEventHandler<RpcInvokeEvent> implements
797     ITaskHandler<RpcInvokeEvent>
798 {
799     private final static AsyncLog LOG = new AsyncLog(RpcInvokeHandler.class);
800 
801     RpcInvokeHandler(IRpcManagerState state)
802     {
803         super(state);
804     }
805 
806     /**
807      * Locates the {@link IRpcHandler} for the {@link RpcDefinition} identified
808      * by the RPC registry key in the event. The handler is executed and the
809      * result record attached to the remote context for the RPC definition is
810      * updated with the {@link IRpcResult} from the handler.
811      */
812     @Override
813     public void handle(RpcInvokeEvent event)
814     {
815         getState().getTaskExecutor().execute(
816             new Task<RpcInvokeEvent>(this, event));
817     }
818 
819     public void handleTask(RpcInvokeEvent event)
820     {
821         // decode first
822         event.decode(getState().getRegistry());
823 
824         // now check to see if the result record is being observed
825         final String resultRecordName =
826             getState().getResultRecordName(event.getRemoteContextIdentity(),
827                 event.getRpcKey());
828         final IContainer resultRecord =
829             getState().getContext().getLocalContainer(resultRecordName,
830                 Type.SYSTEM, Domain.FRAMEWORK);
831         /*
832          * We have to block until we get notified that the result record is
833          * being observed. We can't execute any other RPCs during this time
834          * because we must preserve the RPC invocation order.
835          */
836         synchronized (getState().getObservedResultRecords())
837         {
838             if (!getState().getObservedResultRecords().contains(
839                 resultRecordName))
840             {
841                 if (getLog().isInfoEnabled())
842                 {
843                     getLog().info(
844                         "Waiting for result record " + resultRecordName
845                             + " to be observed");
846                 }
847                 try
848                 {
849                     getState().getObservedResultRecords().wait();
850                 }
851                 catch (InterruptedException e)
852                 {
853                     logException(getLog(), "Waiting for " + resultRecordName, e);
854                 }
855             }
856         }
857 
858         IRpcResult result = null;
859         try
860         {
861             String rpcKey = event.getRpcKey();
862             final IRpcHandler handler =
863                 getState().getRegistry().getHandler(rpcKey);
864             if (handler == null)
865             {
866                 final String message = "No hander for " + safeToString(event);
867                 if (getLog().isDebugEnabled())
868                 {
869                     getLog().debug(message);
870                 }
871                 result = new RpcResult(false, null, message);
872             }
873             else
874             {
875                 IRpcDefinition definition =
876                     getState().getRegistry().getDefinition(rpcKey);
877                 if (definition == null)
878                 {
879                     final String message =
880                         "No definition for " + safeToString(event);
881                     if (getLog().isDebugEnabled())
882                     {
883                         getLog().debug(message);
884                     }
885                     result = new RpcResult(false, null, message);
886                 }
887                 else
888                 {
889                     try
890                     {
891                         if (getLog().isDebugEnabled())
892                         {
893                             getLog().debug(
894                                 "Invoking " + safeToString(definition)
895                                     + " with arguments "
896                                     + Arrays.deepToString(event.getArguments()));
897                         }
898                         result =
899                             handler.handle(definition, event.getArguments());
900                     }
901                     catch (Exception e)
902                     {
903                         logException(getLog(), "Handling RPC invoke for "
904                             + safeToString(definition) + " with "
905                             + Arrays.deepToString(event.getArguments())
906                             + " from " + event.getRemoteContextIdentity(), e);
907                         result = new RpcResult(false, null, e.toString());
908                     }
909 
910                 }
911             }
912         }
913         catch (Exception e)
914         {
915             logException(getLog(), "Could not handle " + safeToString(event), e);
916             result = new RpcResult(false, null, e.toString());
917         }
918 
919         // send the result to the invoker, ensure the result record is held
920         // against the remote context
921         getState().getResultRecords().get(event.getRemoteContextIdentity()).add(
922             resultRecord);
923 
924         // now update the result record
925         resultRecord.beginFrame(new EventFrameExecution());
926         try
927         {
928             result.updateResultRecord(event.getMarker(), resultRecord);
929         }
930         finally
931         {
932             // this will trigger the update to the remote context
933             resultRecord.endFrame();
934         }
935     }
936 
937     @Override
938     protected AsyncLog getLog()
939     {
940         return LOG;
941     }
942 }
943 
944 /**
945  * Handler for result records (represented as {@link IContainer} instances).
946  * When an update event occurs for a result record, the marker ID for the RPC
947  * invocation is extracted from the result record and this is used to locate the
948  * {@link IRpcResultHandler}. An {@link IRpcResult} is then constructed from the
949  * result record and is passed to the result handler.
950  * 
951  * @author Ramon Servadei
952  */
953 final class RpcResultHandler extends RpcEventHandler<IContainer> implements
954     ITaskHandler<IContainer>
955 {
956     private final static AsyncLog LOG = new AsyncLog(RpcResultHandler.class);
957 
958     RpcResultHandler(IRpcManagerState state)
959     {
960         super(state);
961     }
962 
963     /**
964      * Finds the {@link IRpcResultHandler} from {@link #getResultHandlers()} and
965      * notifies it with the result contained within the result record (the
966      * event).
967      */
968     @Override
969     public void handle(IContainer event)
970     {
971         getState().getTaskExecutor().execute(new Task<IContainer>(this, event));
972     }
973 
974     @Override
975     protected AsyncLog getLog()
976     {
977         return LOG;
978     }
979 
980     @SuppressWarnings("boxing")
981     public void handleTask(IContainer event)
982     {
983         if (event.getDataState() != DataState.LIVE)
984         {
985             if (getLog().isDebugEnabled())
986             {
987                 getLog().debug(
988                     "Waiting for result record to become active " + event);
989             }
990             return;
991         }
992         final IntegerField markerIdField =
993             event.getIntegerField(IRpcResult.MARKER);
994         if (markerIdField == null)
995         {
996             if (getLog().isDebugEnabled())
997             {
998                 getLog().debug(
999                     "Waiting for result record to be populated " + event);
1000             }
1001             return;
1002         }
1003         final int markerId = markerIdField.get();
1004         final DualValue<IRpcResultHandler, IRpcDefinition> result;
1005         final Map<Integer, DualValue<IRpcResultHandler, IRpcDefinition>> resultHandlers =
1006             getState().getResultHandlers();
1007         synchronized (resultHandlers)
1008         {
1009             result = resultHandlers.remove(markerId);
1010         }
1011         if (result == null)
1012         {
1013             if (getLog().isDebugEnabled())
1014             {
1015                 getLog().debug("No result handler found for " + event);
1016             }
1017             return;
1018         }
1019         IRpcMarker marker = new RpcMarker(markerId);
1020         if (result.getSecond() == null)
1021         {
1022             throw new IllegalStateException("No definition found for "
1023                 + safeToString(marker) + ", " + safeToString(result));
1024         }
1025         result.getFirst().resultReceived(
1026             new RpcResult(result.getSecond(), event), marker);
1027 
1028         // decrement the reference count for this result record, if it is 0,
1029         // then unsubscribe
1030 
1031         final int count;
1032         final IReferenceCounter<DualValue<String, String>> counter =
1033             getState().getResultRecordSubscriptionCounter();
1034         synchronized (counter)
1035         {
1036             count =
1037                 counter.adjustCount(new DualValue<String, String>(
1038                     event.getNativeContextIdentity(), event.getIdentity()), -1);
1039         }
1040         if (count == 0)
1041         {
1042             // todo should we just leave subscribed?
1043             getState().getContext().unsubscribe(
1044                 event.getNativeContextIdentity(), event.getIdentity(),
1045                 Type.SYSTEM, Domain.FRAMEWORK, getState().getEventHandler());
1046         }
1047     }
1048 }
1049 
1050 /**
1051  * Handles {@link ConnectionDestroyedEvent}s. This will destroy any result
1052  * records associated with the remote context the connection was for. The reason
1053  * for this handler is to prevent memory leaks occurring from stale result
1054  * records not being purged when remote contexts are destroyed.
1055  * 
1056  * @author Ramon Servadei
1057  */
1058 final class RpcConnectionDestroyedEventHandler extends
1059     RpcEventHandler<ConnectionDestroyedEvent>
1060 {
1061     private final static AsyncLog LOG =
1062         new AsyncLog(RpcConnectionDestroyedEventHandler.class);
1063 
1064     RpcConnectionDestroyedEventHandler(IRpcManagerState state)
1065     {
1066         super(state);
1067     }
1068 
1069     /**
1070      * Find all result records associated with the remote context the connection
1071      * was for and destroy them.
1072      */
1073     @Override
1074     public void handle(ConnectionDestroyedEvent event)
1075     {
1076         if (getLog().isDebugEnabled())
1077         {
1078             getLog().debug(
1079                 "Destroying result records attached to remote context "
1080                     + event.getRemoteContextIdentity());
1081         }
1082         final Set<IContainer> resultRecords;
1083         final Set<String> connectedContexts = getState().getConnectedContexts();
1084         synchronized (connectedContexts)
1085         {
1086             resultRecords =
1087                 getState().getResultRecords().remove(
1088                     event.getRemoteContextIdentity());
1089             connectedContexts.remove(event.getRemoteContextIdentity());
1090         }
1091         if (resultRecords != null)
1092         {
1093             for (IContainer container : resultRecords)
1094             {
1095                 container.destroy();
1096             }
1097         }
1098     }
1099 
1100     @Override
1101     protected AsyncLog getLog()
1102     {
1103         return LOG;
1104     }
1105 }
1106 
1107 /**
1108  * Handler for {@link ChannelReadyEvent}s. This keeps track of available
1109  * contexts and triggers any pending RPC invocations for available contexts.
1110  * 
1111  * @author Ramon Servadei
1112  */
1113 final class RpcChannelReadyEventHandler extends
1114     RpcEventHandler<ChannelReadyEvent>
1115 {
1116     private final static AsyncLog LOG =
1117         new AsyncLog(RpcChannelReadyEventHandler.class);
1118 
1119     final IRpcManagerOperations operations;
1120 
1121     RpcChannelReadyEventHandler(IRpcManagerState state,
1122         IRpcManagerOperations operations)
1123     {
1124         super(state);
1125         this.operations = operations;
1126     }
1127 
1128     /**
1129      * This keeps track of available contexts and triggers any pending RPC
1130      * invocations for available contexts.
1131      */
1132     @Override
1133     public void handle(ChannelReadyEvent event)
1134     {
1135         final String remoteContextIdentity =
1136             event.getChannel().getRemoteContextIdentity();
1137         final List<QuadValue<String, IField[], IRpcResultHandler, IRpcMarker>> pendingRpcs;
1138         final Set<String> connectedContexts = getState().getConnectedContexts();
1139         // remove any RPCs destined for the new context and re-invoke them
1140         synchronized (connectedContexts)
1141         {
1142             connectedContexts.add(remoteContextIdentity);
1143             pendingRpcs =
1144                 getState().getPendingRpcInvocations().remove(
1145                     remoteContextIdentity);
1146         }
1147         if (pendingRpcs == null)
1148         {
1149             return;
1150         }
1151         if (getLog().isDebugEnabled())
1152         {
1153             getLog().debug(
1154                 "Invoking " + pendingRpcs.size() + " RPCs for "
1155                     + remoteContextIdentity);
1156         }
1157         for (QuadValue<String, IField[], IRpcResultHandler, IRpcMarker> rpc : pendingRpcs)
1158         {
1159             final DualValue<String, IRpcDefinition> keyAndDefinition =
1160                 getOperations().getRegistryKeyAndDefinition(
1161                     remoteContextIdentity, rpc.getFirst(), rpc.getSecond());
1162             getOperations().invoke(remoteContextIdentity,
1163                 keyAndDefinition.getFirst(), keyAndDefinition.getSecond(),
1164                 rpc.getSecond(), rpc.getThird(), rpc.getFourth());
1165         }
1166     }
1167 
1168     @Override
1169     protected AsyncLog getLog()
1170     {
1171         return LOG;
1172     }
1173 
1174     private IRpcManagerOperations getOperations()
1175     {
1176         return this.operations;
1177     }
1178 
1179 }
1180 
1181 /**
1182  * Base class for handling {@link ContainerFieldAddedEvent} and
1183  * {@link ContainerFieldRemovedEvent}s. This holds the standard logic required
1184  * for each sub-class.
1185  * 
1186  * @author Ramon Servadei
1187  */
1188 abstract class AbstractRpcPublicationHandler<T extends AbstractContainerFieldEvent>
1189     extends RpcEventHandler<T>
1190 {
1191     private final IRpcManagerOperations operations;
1192 
1193     AbstractRpcPublicationHandler(IRpcManagerState state,
1194         IRpcManagerOperations operations)
1195     {
1196         super(state);
1197         this.operations = operations;
1198     }
1199 
1200     @Override
1201     public void handle(T event)
1202     {
1203         final IField field = event.getField();
1204         final String identity = field.getIdentity();
1205         if (identity.indexOf(IRpcRegistry.RPC_KEY) > -1)
1206         {
1207             if (!(field instanceof StringField))
1208             {
1209                 return;
1210             }
1211             IContainer container = (IContainer) event.getSource();
1212             final Set<IRpcPublicationListener> listeners;
1213             // get the listeners for this remote context
1214             final MapSet<String, IRpcPublicationListener> rpcPublicationListeners =
1215                 getState().getRpcPublicationListeners();
1216             synchronized (rpcPublicationListeners)
1217             {
1218                 listeners =
1219                     rpcPublicationListeners.get(container.getNativeContextIdentity());
1220             }
1221             final String definitionAsString = ((StringField) field).get();
1222             IRpcDefinition rpcDefinition =
1223                 new RpcDefinition(definitionAsString);
1224             for (IRpcPublicationListener listener : listeners)
1225             {
1226                 doAction(container, rpcDefinition, listener);
1227             }
1228         }
1229     }
1230 
1231     IRpcManagerOperations getOperations()
1232     {
1233         return this.operations;
1234     }
1235 
1236     /**
1237      * Perform the action for a change in an {@link IRpcDefinition}
1238      * 
1239      * @param container
1240      *            the remote RPC registry
1241      * @param rpcDefinition
1242      *            the RPC definition
1243      * @param listener
1244      *            the {@link IRpcPublicationListener} to notify
1245      */
1246     abstract void doAction(IContainer container, IRpcDefinition rpcDefinition,
1247         IRpcPublicationListener listener);
1248 }
1249 
1250 /**
1251  * Handler for {@link ContainerFieldAddedEvent}s. Checks if the field is an RPC
1252  * definition and if it is this will notify all {@link IRpcPublicationListener}
1253  * instances with the published RPC.
1254  * 
1255  * @author Ramon Servadei
1256  */
1257 final class RpcPublishedHandler extends
1258     AbstractRpcPublicationHandler<ContainerFieldAddedEvent>
1259 {
1260     private final static AsyncLog LOG = new AsyncLog(RpcPublishedHandler.class);
1261 
1262     RpcPublishedHandler(IRpcManagerState state, IRpcManagerOperations operations)
1263     {
1264         super(state, operations);
1265     }
1266 
1267     @Override
1268     protected AsyncLog getLog()
1269     {
1270         return LOG;
1271     }
1272 
1273     @Override
1274     void doAction(IContainer container, IRpcDefinition rpcDefinition,
1275         IRpcPublicationListener listener)
1276     {
1277         final String remoteContextIdentity =
1278             container.getNativeContextIdentity();
1279         listener.procedureAvailable(remoteContextIdentity, rpcDefinition);
1280 
1281         final List<QuadValue<String, IField[], IRpcResultHandler, IRpcMarker>> pendingRpcs;
1282         final Set<String> connectedContexts = getState().getConnectedContexts();
1283         final List<QuadValue<String, IField[], IRpcResultHandler, IRpcMarker>> toInvoke =
1284             CollectionFactory.newList();
1285         // find any invocations for this RPC that was put onto the pending queue
1286         synchronized (connectedContexts)
1287         {
1288             pendingRpcs =
1289                 getState().getPendingRpcInvocations().get(remoteContextIdentity);
1290             // none found, so return
1291             if (pendingRpcs == null)
1292             {
1293                 return;
1294             }
1295             for (Iterator<QuadValue<String, IField[], IRpcResultHandler, IRpcMarker>> iterator =
1296                 pendingRpcs.iterator(); iterator.hasNext();)
1297             {
1298                 QuadValue<String, IField[], IRpcResultHandler, IRpcMarker> rpc =
1299                     iterator.next();
1300                 if (rpcDefinition.getName().equals(rpc.getFirst())
1301                     && Arrays.equals(rpcDefinition.getArgumentTypes(),
1302                         RpcUtils.getArgumentTypes(rpc.getSecond())))
1303                 {
1304                     iterator.remove();
1305                     toInvoke.add(rpc);
1306                 }
1307             }
1308         }
1309         if (getLog().isDebugEnabled())
1310         {
1311             getLog().debug(
1312                 "Invoking " + toInvoke.size()
1313                     + " awaiting calls for newly available RPC "
1314                     + rpcDefinition + " from " + remoteContextIdentity);
1315         }
1316         // now re-invoke the found ones
1317         for (QuadValue<String, IField[], IRpcResultHandler, IRpcMarker> rpc : toInvoke)
1318         {
1319             final DualValue<String, IRpcDefinition> keyAndDefinition =
1320                 getOperations().getRegistryKeyAndDefinition(
1321                     remoteContextIdentity, rpc.getFirst(), rpc.getSecond());
1322             getOperations().invoke(remoteContextIdentity,
1323                 keyAndDefinition.getFirst(), keyAndDefinition.getSecond(),
1324                 rpc.getSecond(), rpc.getThird(), rpc.getFourth());
1325         }
1326     }
1327 }
1328 
1329 /**
1330  * Handler for {@link ContainerFieldRemovedEvent}s. Checks if the field is an
1331  * RPC definition and if it is this will notify all
1332  * {@link IRpcPublicationListener} instances with the unpublished RPC.
1333  * 
1334  * @author Ramon Servadei
1335  */
1336 final class RpcUnpublishedHandler extends
1337     AbstractRpcPublicationHandler<ContainerFieldRemovedEvent>
1338 {
1339     private final static AsyncLog LOG = new AsyncLog(RpcPublishedHandler.class);
1340 
1341     RpcUnpublishedHandler(IRpcManagerState state,
1342         IRpcManagerOperations operations)
1343     {
1344         super(state, operations);
1345     }
1346 
1347     @Override
1348     protected AsyncLog getLog()
1349     {
1350         return LOG;
1351     }
1352 
1353     void doAction(IContainer container, IRpcDefinition rpcDefinition,
1354         IRpcPublicationListener listener)
1355     {
1356         listener.procedureUnavailable(container.getNativeContextIdentity(),
1357             rpcDefinition);
1358     }
1359 }
1360 
1361 /**
1362  * Handles the {@link SendRpcEvent}s
1363  * 
1364  * @author Ramon Servadei
1365  */
1366 final class SendRpcEventHandler extends RpcEventHandler<SendRpcEvent>
1367 {
1368     private final static AsyncLog LOG = new AsyncLog(SendRpcEventHandler.class);
1369 
1370     SendRpcEventHandler(IRpcManagerState state)
1371     {
1372         super(state);
1373     }
1374 
1375     @Override
1376     protected AsyncLog getLog()
1377     {
1378         return LOG;
1379     }
1380 
1381     /**
1382      * Invoke the RPC by calling
1383      * {@link IFulmineContext#invokeRpc(String, byte[])}
1384      */
1385     @Override
1386     public void handle(SendRpcEvent event)
1387     {
1388         if (LOG.isDebugEnabled())
1389         {
1390             LOG.debug("invoking " + event);
1391         }
1392         getState().getContext().invokeRpc(event.getRemoteContextIdentity(),
1393             event.getRpcData());
1394     }
1395 }
1396 
1397 /**
1398  * If a result record is observed, adds the record name to a list of known
1399  * active result records.
1400  * 
1401  * @see RpcInvokeHandler#handleTask(RpcInvokeEvent)
1402  * @author Ramon Servadei
1403  */
1404 final class ResultRecordObservedHandler extends
1405     RpcEventHandler<EventSourceObservedEvent>
1406 {
1407     private final static AsyncLog LOG =
1408         new AsyncLog(ResultRecordObservedHandler.class);
1409 
1410     ResultRecordObservedHandler(IRpcManagerState state)
1411     {
1412         super(state);
1413     }
1414 
1415     @Override
1416     protected AsyncLog getLog()
1417     {
1418         return LOG;
1419     }
1420 
1421     @Override
1422     public void handle(final EventSourceObservedEvent event)
1423     {
1424         if (event.getIdentity().contains(IRpcRegistry.RPC_KEY))
1425         {
1426             synchronized (getState().getObservedResultRecords())
1427             {
1428                 getState().getObservedResultRecords().add(event.getIdentity());
1429                 getState().getObservedResultRecords().notify();
1430             }
1431         }
1432     }
1433 }
1434 
1435 /**
1436  * Complimentary to {@link ResultRecordObservedHandler}. This removes the result
1437  * record from the collection of observed result records.
1438  * 
1439  * @author Ramon Servadei
1440  */
1441 final class ResultRecordNotObservedHandler extends
1442     RpcEventHandler<EventSourceNotObservedEvent>
1443 {
1444     private final static AsyncLog LOG =
1445         new AsyncLog(ResultRecordNotObservedHandler.class);
1446 
1447     ResultRecordNotObservedHandler(IRpcManagerState state)
1448     {
1449         super(state);
1450     }
1451 
1452     @Override
1453     protected AsyncLog getLog()
1454     {
1455         return LOG;
1456     }
1457 
1458     @Override
1459     public void handle(EventSourceNotObservedEvent event)
1460     {
1461         if (event.getSource().getIdentity().contains(IRpcRegistry.RPC_KEY))
1462         {
1463             synchronized (getState().getObservedResultRecords())
1464             {
1465                 getState().getObservedResultRecords().remove(
1466                     event.getSource().getIdentity());
1467             }
1468         }
1469     }
1470 }