1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package fulmine.context;
17
18 import static fulmine.util.Utils.COLON;
19 import static fulmine.util.Utils.logException;
20 import static fulmine.util.Utils.safeToString;
21
22 import java.util.Arrays;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.concurrent.atomic.AtomicInteger;
28
29 import fulmine.AbstractLifeCycle;
30 import fulmine.Domain;
31 import fulmine.Type;
32 import fulmine.distribution.channel.ChannelReadyEvent;
33 import fulmine.distribution.events.ConnectionDestroyedEvent;
34 import fulmine.event.EventFrameExecution;
35 import fulmine.event.IEvent;
36 import fulmine.event.listener.AbstractEventHandler;
37 import fulmine.event.listener.IEventListener;
38 import fulmine.event.listener.MultiEventListener;
39 import fulmine.event.listener.MultiSystemEventListener;
40 import fulmine.event.system.EventSourceNotObservedEvent;
41 import fulmine.event.system.EventSourceObservedEvent;
42 import fulmine.event.system.ISystemEventListener;
43 import fulmine.model.container.IContainer;
44 import fulmine.model.container.IContainer.DataState;
45 import fulmine.model.container.events.AbstractContainerFieldEvent;
46 import fulmine.model.container.events.ContainerFieldAddedEvent;
47 import fulmine.model.container.events.ContainerFieldRemovedEvent;
48 import fulmine.model.field.IField;
49 import fulmine.model.field.IntegerField;
50 import fulmine.model.field.StringField;
51 import fulmine.rpc.IRpcDefinition;
52 import fulmine.rpc.IRpcHandler;
53 import fulmine.rpc.IRpcManager;
54 import fulmine.rpc.IRpcMarker;
55 import fulmine.rpc.IRpcPublicationListener;
56 import fulmine.rpc.IRpcRegistry;
57 import fulmine.rpc.IRpcResult;
58 import fulmine.rpc.IRpcResultHandler;
59 import fulmine.rpc.RpcCodec;
60 import fulmine.rpc.RpcDefinition;
61 import fulmine.rpc.RpcMarker;
62 import fulmine.rpc.RpcRegistry;
63 import fulmine.rpc.RpcResult;
64 import fulmine.rpc.RpcUtils;
65 import fulmine.rpc.events.RpcInvokeEvent;
66 import fulmine.rpc.events.SendRpcEvent;
67 import fulmine.util.Utils;
68 import fulmine.util.collection.CollectionFactory;
69 import fulmine.util.collection.MapList;
70 import fulmine.util.collection.MapSet;
71 import fulmine.util.concurrent.ITaskExecutor;
72 import fulmine.util.concurrent.ITaskHandler;
73 import fulmine.util.concurrent.Task;
74 import fulmine.util.concurrent.TaskExecutor;
75 import fulmine.util.log.AsyncLog;
76 import fulmine.util.reference.DualValue;
77 import fulmine.util.reference.IReferenceCounter;
78 import fulmine.util.reference.QuadValue;
79 import fulmine.util.reference.ReferenceCounter;
80 import fulmine.util.reference.Value;
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155 public final class RpcManager extends AbstractLifeCycle implements IRpcManager,
156 IRpcManagerOperations
157 {
158 private final static AsyncLog LOG = new AsyncLog(RpcManager.class);
159
160
161
162
163
164 private final int TIMEOUT =
165 Integer.parseInt(System.getProperty(IRpcManager.RPC_TIMEOUT,
166 IRpcManager.DEFAULT_RPC_TIMEOUT));
167
168
169
170
171
172
173
174
175
176 private final class ResultHandler implements IRpcResultHandler
177 {
178 private final Value<IRpcMarker> markerValue;
179
180 private final Value<IRpcResult> resultValue;
181
182 private ResultHandler(Value<IRpcMarker> markerValue,
183 Value<IRpcResult> resultValue)
184 {
185 this.markerValue = markerValue;
186 this.resultValue = resultValue;
187 }
188
189 public void resultReceived(IRpcResult result, IRpcMarker marker)
190 {
191 if (marker.equals(this.markerValue.get()))
192 {
193 this.resultValue.set(result);
194 synchronized (this.resultValue)
195 {
196 this.resultValue.notifyAll();
197 }
198 }
199 }
200 }
201
202
203 private final IRpcManagerState state;
204
205
206
207
208
209
210 private final MultiEventListener rpcPublicationListener;
211
212
213
214
215
216
217
218 RpcManager(IFrameworkContext context)
219 {
220 this(new RpcManagerState(context));
221 }
222
223
224
225
226
227
228
229 @SuppressWarnings("unchecked")
230 RpcManager(IRpcManagerState state)
231 {
232 super();
233 this.state = state;
234 this.rpcPublicationListener =
235 new MultiEventListener(RpcManager.class.getSimpleName(),
236 getState().getContext(),
237 AbstractEventHandler.getEventHandlerMappings(
238 new RpcPublishedHandler(getState(), this),
239 new RpcUnpublishedHandler(getState(), this)));
240 }
241
242 public IRpcResult invoke(String remoteContextIdentity, String procedure,
243 IField... args)
244 {
245 final Value<IRpcResult> resultValue = new Value<IRpcResult>();
246 final Value<IRpcMarker> markerValue = new Value<IRpcMarker>();
247 synchronized (resultValue)
248 {
249 final IRpcMarker marker =
250 invoke(new ResultHandler(markerValue, resultValue),
251 remoteContextIdentity, procedure, args);
252 markerValue.set(marker);
253 if (resultValue.get() == null)
254 {
255 try
256 {
257
258 resultValue.wait(TIMEOUT);
259 }
260 catch (InterruptedException e)
261 {
262 logException(getLog(), "RPC name=" + procedure + ", args="
263 + Arrays.deepToString(args) + ", context="
264 + remoteContextIdentity, e);
265 }
266 }
267 if (resultValue.get() == null)
268 {
269 Utils.logException(getLog(), "TIMEOUT for RPC name="
270 + procedure + ", args=" + Arrays.deepToString(args)
271 + ", context=" + remoteContextIdentity, new Exception());
272 }
273 }
274 return resultValue.get();
275 }
276
277 public IRpcMarker invoke(IRpcResultHandler resultHandler,
278 String remoteContextIdentity, String procedure, IField... args)
279 {
280 IRpcMarker rpcMarker =
281 new RpcMarker(getState().getMarkerCounter().getAndIncrement());
282 DualValue<String, IRpcDefinition> keyAndDefinition =
283 getRegistryKeyAndDefinition(remoteContextIdentity, procedure, args);
284
285
286
287
288
289 if (keyAndDefinition == null)
290 {
291
292 final Set<String> connectedContexts =
293 getState().getConnectedContexts();
294 synchronized (connectedContexts)
295 {
296
297 boolean contextExists = false;
298 if (connectedContexts.contains(remoteContextIdentity))
299 {
300 contextExists = true;
301 }
302 getState().getPendingRpcInvocations().get(remoteContextIdentity).add(
303 new QuadValue<String, IField[], IRpcResultHandler, IRpcMarker>(
304 procedure, args, resultHandler, rpcMarker));
305 if (getLog().isDebugEnabled())
306 {
307 getLog().debug(
308 (contextExists ? "RPC for " + remoteContextIdentity
309 : "Remote context " + remoteContextIdentity)
310 + " is not yet available so placing RPC {name="
311 + safeToString(procedure)
312 + ", args="
313 + Arrays.deepToString(args)
314 + "} onto the pending RPC queue.");
315 }
316 }
317 return rpcMarker;
318 }
319
320
321 invoke(remoteContextIdentity, keyAndDefinition.getFirst(),
322 keyAndDefinition.getSecond(), args, resultHandler, rpcMarker);
323
324 return rpcMarker;
325 }
326
327 @SuppressWarnings("boxing")
328 public void invoke(String remoteContextIdentity, String rpcKey,
329 IRpcDefinition definition, IField[] args,
330 IRpcResultHandler resultHandler, IRpcMarker marker)
331 {
332 if (getLog().isDebugEnabled())
333 {
334 getLog().debug(
335 "Invoking " + safeToString(definition) + " with args="
336 + Arrays.deepToString(args) + ", "
337 + safeToString(resultHandler) + ", " + safeToString(marker));
338 }
339
340
341
342 final String resultRecordName =
343 getState().getResultRecordName(
344 getState().getContext().getIdentity(), rpcKey);
345 final int count;
346 final IReferenceCounter<DualValue<String, String>> counter =
347 getState().getResultRecordSubscriptionCounter();
348 synchronized (counter)
349 {
350 count =
351 counter.adjustCount(new DualValue<String, String>(
352 remoteContextIdentity, resultRecordName), 1);
353 }
354 if (count == 1)
355 {
356
357 getState().getContext().subscribe(remoteContextIdentity,
358 resultRecordName, Type.SYSTEM, Domain.FRAMEWORK,
359 getState().getEventHandler());
360 }
361
362
363 byte[] data =
364 new RpcCodec(getState().getRegistry()).encode(marker, rpcKey,
365 getState().getContext().getIdentity(), args);
366 final Map<Integer, DualValue<IRpcResultHandler, IRpcDefinition>> resultHandlers =
367 getState().getResultHandlers();
368 synchronized (resultHandlers)
369 {
370 resultHandlers.put(marker.getId(),
371 new DualValue<IRpcResultHandler, IRpcDefinition>(resultHandler,
372 definition));
373 }
374
375
376
377
378 getState().getContext().queueEvent(
379 new SendRpcEvent(getState().getContext(), remoteContextIdentity,
380 data));
381 }
382
383 public boolean publishProdedure(IRpcHandler handler,
384 IRpcDefinition rpcDefinition)
385 {
386 return getState().getRegistry().publishProdedure(handler, rpcDefinition);
387 }
388
389 public boolean unpublishProdedure(IRpcDefinition rpcDefinition)
390 {
391 return getState().getRegistry().unpublishProdedure(rpcDefinition);
392 }
393
394 public boolean addRpcPublicationListener(String remoteContextIdentity,
395 IRpcPublicationListener listener)
396 {
397 boolean subscribe = false;
398 final boolean added;
399 final MapSet<String, IRpcPublicationListener> rpcPublicationListeners =
400 getState().getRpcPublicationListeners();
401 synchronized (rpcPublicationListeners)
402 {
403 final Set<IRpcPublicationListener> listenersForContext =
404 rpcPublicationListeners.get(remoteContextIdentity);
405 added = listenersForContext.add(listener);
406 if (added)
407 {
408
409 rpcPublicationListeners.put(remoteContextIdentity,
410 CollectionFactory.newSet(listenersForContext));
411 subscribe = listenersForContext.size() == 1;
412 }
413 }
414 if (subscribe)
415 {
416
417 getState().getContext().subscribe(remoteContextIdentity,
418 IRpcRegistry.RPC_REGISTRY, Type.SYSTEM, Domain.FRAMEWORK,
419 getRpcPublicationListener());
420 }
421
422
423
424
425 final IContainer remoteContainer =
426 getState().getContext().getRemoteContainer(remoteContextIdentity,
427 IRpcRegistry.RPC_REGISTRY, Type.SYSTEM, Domain.FRAMEWORK);
428 final String[] componentIdentities =
429 remoteContainer.getComponentIdentities();
430 for (String identity : componentIdentities)
431 {
432 if (identity.indexOf(IRpcRegistry.RPC_KEY) > -1)
433 {
434 final IField field = remoteContainer.get(identity);
435 if (field instanceof StringField)
436 {
437 final String definitionAsString =
438 ((StringField) field).get();
439 IRpcDefinition rpcDefinition =
440 new RpcDefinition(definitionAsString);
441 listener.procedureAvailable(remoteContextIdentity,
442 rpcDefinition);
443 }
444 }
445 }
446 return added;
447 }
448
449 public boolean removeRpcPublicationListener(String remoteContextIdentity,
450 IRpcPublicationListener listener)
451 {
452 boolean unsubscribe;
453 boolean removed;
454 final MapSet<String, IRpcPublicationListener> rpcPublicationListeners =
455 getState().getRpcPublicationListeners();
456 synchronized (rpcPublicationListeners)
457 {
458 unsubscribe = false;
459 final Set<IRpcPublicationListener> listenersForContext =
460 rpcPublicationListeners.get(remoteContextIdentity);
461 removed = listenersForContext.remove(listener);
462 if (removed)
463 {
464
465 rpcPublicationListeners.put(remoteContextIdentity,
466 CollectionFactory.newSet(listenersForContext));
467 }
468 unsubscribe = listenersForContext.size() == 0;
469 }
470 if (unsubscribe)
471 {
472 getState().getContext().unsubscribe(remoteContextIdentity,
473 IRpcRegistry.RPC_REGISTRY, Type.SYSTEM, Domain.FRAMEWORK,
474 getRpcPublicationListener());
475 }
476 return removed;
477 }
478
479 @Override
480 protected void doDestroy()
481 {
482 getState().destroy();
483 }
484
485 @Override
486 protected void doStart()
487 {
488 getState().init(this);
489 getState().start();
490 }
491
492 @Override
493 protected AsyncLog getLog()
494 {
495 return LOG;
496 }
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512 public DualValue<String, IRpcDefinition> getRegistryKeyAndDefinition(
513 String remoteContextIdentity, String procedure, IField[] args)
514 {
515 if (!getState().getContext().containsRemoteContainer(
516 remoteContextIdentity, IRpcRegistry.RPC_REGISTRY, Type.SYSTEM,
517 Domain.FRAMEWORK))
518 {
519 return null;
520 }
521
522 String rpcSignature = RpcUtils.getSignature(procedure, args);
523 final IContainer registry =
524 getState().getContext().getRemoteContainer(remoteContextIdentity,
525 IRpcRegistry.RPC_REGISTRY, Type.SYSTEM, Domain.FRAMEWORK);
526 final String[] componentIdentities = registry.getComponentIdentities();
527 for (String fieldId : componentIdentities)
528 {
529 if (fieldId.startsWith(IRpcRegistry.RPC_KEY))
530 {
531
532
533 final IField field = registry.get(fieldId);
534 if (field instanceof StringField)
535 {
536 final String definitionAsString =
537 ((StringField) field).get();
538 if (definitionAsString.contains(rpcSignature))
539 {
540 return new DualValue<String, IRpcDefinition>(fieldId,
541 new RpcDefinition(definitionAsString));
542 }
543 }
544 }
545 }
546 return null;
547 }
548
549
550
551
552
553
554 private IRpcManagerState getState()
555 {
556 return this.state;
557 }
558
559 private MultiEventListener getRpcPublicationListener()
560 {
561 return this.rpcPublicationListener;
562 }
563
564 public boolean unpublishRpcs(Class<?> definition, Object handler)
565 {
566 return getState().getRegistry().unpublishRpcs(definition, handler);
567 }
568
569 public boolean publishRpcs(Class<?> definition, Object handler)
570 {
571 return getState().getRegistry().publishRpcs(definition, handler);
572 }
573 }
574
575
576
577
578
579
580 final class RpcManagerState extends AbstractLifeCycle implements
581 IRpcManagerState
582 {
583 private final IReferenceCounter<DualValue<String, String>> resultRecordSubscriptionCounter;
584
585
586 private final ITaskExecutor executor;
587
588
589 private final IFrameworkContext context;
590
591
592
593
594
595 private final IRpcRegistry registry;
596
597
598
599
600
601
602 private final Map<Integer, DualValue<IRpcResultHandler, IRpcDefinition>> resultHandlers;
603
604
605 private final AtomicInteger markerCounter;
606
607
608 private final MapList<String, QuadValue<String, IField[], IRpcResultHandler, IRpcMarker>> pendingRpcInvocations;
609
610
611 private final MapSet<String, IContainer> resultRecords;
612
613
614 private final Set<String> connectedContexts;
615
616
617 private MultiSystemEventListener eventHandler;
618
619
620 private final MapSet<String, IRpcPublicationListener> rpcPublicationListeners;
621
622
623 private final Set<String> observedResultRecords;
624
625
626
627
628
629
630
631 public RpcManagerState(IFrameworkContext context)
632 {
633 super();
634 this.context = context;
635 this.markerCounter = new AtomicInteger(0);
636 this.pendingRpcInvocations =
637 new MapList<String, QuadValue<String, IField[], IRpcResultHandler, IRpcMarker>>();
638 this.resultHandlers = CollectionFactory.newMap();
639 this.registry = new RpcRegistry(context);
640 this.resultRecords = new MapSet<String, IContainer>(1);
641 this.connectedContexts = CollectionFactory.newSet();
642 this.rpcPublicationListeners =
643 new MapSet<String, IRpcPublicationListener>();
644 this.resultRecordSubscriptionCounter =
645 new ReferenceCounter<DualValue<String, String>>(1);
646 this.observedResultRecords = CollectionFactory.newSet();
647 final String identity = context.getIdentity() + COLON + "RpcProcessor";
648
649
650
651
652 this.executor = new TaskExecutor(identity, context);
653 }
654
655 @SuppressWarnings("unchecked")
656 public void init(IRpcManagerOperations operations)
657 {
658 this.eventHandler =
659 new MultiSystemEventListener(RpcManager.class.getSimpleName()
660 + "EventHandler", getContext(),
661 AbstractEventHandler.getEventHandlerMappings(
662 new RpcConnectionDestroyedEventHandler(this),
663 new RpcChannelReadyEventHandler(this, operations),
664 new RpcResultHandler(this),
665 new ResultRecordObservedHandler(this),
666 new SendRpcEventHandler(this),
667 new ResultRecordNotObservedHandler(this),
668 new RpcInvokeHandler(this)));
669 }
670
671 public IFrameworkContext getContext()
672 {
673 return this.context;
674 }
675
676 public IRpcRegistry getRegistry()
677 {
678 return this.registry;
679 }
680
681 public Map<Integer, DualValue<IRpcResultHandler, IRpcDefinition>> getResultHandlers()
682 {
683 return this.resultHandlers;
684 }
685
686 public AtomicInteger getMarkerCounter()
687 {
688 return this.markerCounter;
689 }
690
691 public MapList<String, QuadValue<String, IField[], IRpcResultHandler, IRpcMarker>> getPendingRpcInvocations()
692 {
693 return this.pendingRpcInvocations;
694 }
695
696 public MapSet<String, IContainer> getResultRecords()
697 {
698 return this.resultRecords;
699 }
700
701 public Set<String> getConnectedContexts()
702 {
703 return this.connectedContexts;
704 }
705
706 public String getResultRecordName(String remoteContextIdentity,
707 String rpcKey)
708 {
709 return remoteContextIdentity + COLON + rpcKey;
710 }
711
712 public MultiSystemEventListener getEventHandler()
713 {
714 return this.eventHandler;
715 }
716
717 public MapSet<String, IRpcPublicationListener> getRpcPublicationListeners()
718 {
719 return this.rpcPublicationListeners;
720 }
721
722 @Override
723 protected void doDestroy()
724 {
725 getEventHandler().destroy();
726 getRegistry().destroy();
727 getTaskExecutor().destroy();
728 }
729
730 @Override
731 protected void doStart()
732 {
733 getTaskExecutor().start();
734 getEventHandler().start();
735 getRegistry().start();
736 }
737
738 public IReferenceCounter<DualValue<String, String>> getResultRecordSubscriptionCounter()
739 {
740 return this.resultRecordSubscriptionCounter;
741 }
742
743 public ITaskExecutor getTaskExecutor()
744 {
745 return this.executor;
746 }
747
748 public Set<String> getObservedResultRecords()
749 {
750 return this.observedResultRecords;
751 }
752 }
753
754
755
756
757
758
759
760
761 abstract class RpcEventHandler<EVENT extends IEvent> extends
762 AbstractEventHandler<EVENT> implements ISystemEventListener
763 {
764
765
766 private final IRpcManagerState state;
767
768
769
770
771
772
773
774 RpcEventHandler(IRpcManagerState state)
775 {
776 super();
777 this.state = state;
778 }
779
780 IRpcManagerState getState()
781 {
782 return state;
783 }
784 }
785
786
787
788
789
790
791
792
793
794
795
796 final class RpcInvokeHandler extends RpcEventHandler<RpcInvokeEvent> implements
797 ITaskHandler<RpcInvokeEvent>
798 {
799 private final static AsyncLog LOG = new AsyncLog(RpcInvokeHandler.class);
800
801 RpcInvokeHandler(IRpcManagerState state)
802 {
803 super(state);
804 }
805
806
807
808
809
810
811
812 @Override
813 public void handle(RpcInvokeEvent event)
814 {
815 getState().getTaskExecutor().execute(
816 new Task<RpcInvokeEvent>(this, event));
817 }
818
819 public void handleTask(RpcInvokeEvent event)
820 {
821
822 event.decode(getState().getRegistry());
823
824
825 final String resultRecordName =
826 getState().getResultRecordName(event.getRemoteContextIdentity(),
827 event.getRpcKey());
828 final IContainer resultRecord =
829 getState().getContext().getLocalContainer(resultRecordName,
830 Type.SYSTEM, Domain.FRAMEWORK);
831
832
833
834
835
836 synchronized (getState().getObservedResultRecords())
837 {
838 if (!getState().getObservedResultRecords().contains(
839 resultRecordName))
840 {
841 if (getLog().isInfoEnabled())
842 {
843 getLog().info(
844 "Waiting for result record " + resultRecordName
845 + " to be observed");
846 }
847 try
848 {
849 getState().getObservedResultRecords().wait();
850 }
851 catch (InterruptedException e)
852 {
853 logException(getLog(), "Waiting for " + resultRecordName, e);
854 }
855 }
856 }
857
858 IRpcResult result = null;
859 try
860 {
861 String rpcKey = event.getRpcKey();
862 final IRpcHandler handler =
863 getState().getRegistry().getHandler(rpcKey);
864 if (handler == null)
865 {
866 final String message = "No hander for " + safeToString(event);
867 if (getLog().isDebugEnabled())
868 {
869 getLog().debug(message);
870 }
871 result = new RpcResult(false, null, message);
872 }
873 else
874 {
875 IRpcDefinition definition =
876 getState().getRegistry().getDefinition(rpcKey);
877 if (definition == null)
878 {
879 final String message =
880 "No definition for " + safeToString(event);
881 if (getLog().isDebugEnabled())
882 {
883 getLog().debug(message);
884 }
885 result = new RpcResult(false, null, message);
886 }
887 else
888 {
889 try
890 {
891 if (getLog().isDebugEnabled())
892 {
893 getLog().debug(
894 "Invoking " + safeToString(definition)
895 + " with arguments "
896 + Arrays.deepToString(event.getArguments()));
897 }
898 result =
899 handler.handle(definition, event.getArguments());
900 }
901 catch (Exception e)
902 {
903 logException(getLog(), "Handling RPC invoke for "
904 + safeToString(definition) + " with "
905 + Arrays.deepToString(event.getArguments())
906 + " from " + event.getRemoteContextIdentity(), e);
907 result = new RpcResult(false, null, e.toString());
908 }
909
910 }
911 }
912 }
913 catch (Exception e)
914 {
915 logException(getLog(), "Could not handle " + safeToString(event), e);
916 result = new RpcResult(false, null, e.toString());
917 }
918
919
920
921 getState().getResultRecords().get(event.getRemoteContextIdentity()).add(
922 resultRecord);
923
924
925 resultRecord.beginFrame(new EventFrameExecution());
926 try
927 {
928 result.updateResultRecord(event.getMarker(), resultRecord);
929 }
930 finally
931 {
932
933 resultRecord.endFrame();
934 }
935 }
936
937 @Override
938 protected AsyncLog getLog()
939 {
940 return LOG;
941 }
942 }
943
944
945
946
947
948
949
950
951
952
953 final class RpcResultHandler extends RpcEventHandler<IContainer> implements
954 ITaskHandler<IContainer>
955 {
956 private final static AsyncLog LOG = new AsyncLog(RpcResultHandler.class);
957
958 RpcResultHandler(IRpcManagerState state)
959 {
960 super(state);
961 }
962
963
964
965
966
967
968 @Override
969 public void handle(IContainer event)
970 {
971 getState().getTaskExecutor().execute(new Task<IContainer>(this, event));
972 }
973
974 @Override
975 protected AsyncLog getLog()
976 {
977 return LOG;
978 }
979
980 @SuppressWarnings("boxing")
981 public void handleTask(IContainer event)
982 {
983 if (event.getDataState() != DataState.LIVE)
984 {
985 if (getLog().isDebugEnabled())
986 {
987 getLog().debug(
988 "Waiting for result record to become active " + event);
989 }
990 return;
991 }
992 final IntegerField markerIdField =
993 event.getIntegerField(IRpcResult.MARKER);
994 if (markerIdField == null)
995 {
996 if (getLog().isDebugEnabled())
997 {
998 getLog().debug(
999 "Waiting for result record to be populated " + event);
1000 }
1001 return;
1002 }
1003 final int markerId = markerIdField.get();
1004 final DualValue<IRpcResultHandler, IRpcDefinition> result;
1005 final Map<Integer, DualValue<IRpcResultHandler, IRpcDefinition>> resultHandlers =
1006 getState().getResultHandlers();
1007 synchronized (resultHandlers)
1008 {
1009 result = resultHandlers.remove(markerId);
1010 }
1011 if (result == null)
1012 {
1013 if (getLog().isDebugEnabled())
1014 {
1015 getLog().debug("No result handler found for " + event);
1016 }
1017 return;
1018 }
1019 IRpcMarker marker = new RpcMarker(markerId);
1020 if (result.getSecond() == null)
1021 {
1022 throw new IllegalStateException("No definition found for "
1023 + safeToString(marker) + ", " + safeToString(result));
1024 }
1025 result.getFirst().resultReceived(
1026 new RpcResult(result.getSecond(), event), marker);
1027
1028
1029
1030
1031 final int count;
1032 final IReferenceCounter<DualValue<String, String>> counter =
1033 getState().getResultRecordSubscriptionCounter();
1034 synchronized (counter)
1035 {
1036 count =
1037 counter.adjustCount(new DualValue<String, String>(
1038 event.getNativeContextIdentity(), event.getIdentity()), -1);
1039 }
1040 if (count == 0)
1041 {
1042
1043 getState().getContext().unsubscribe(
1044 event.getNativeContextIdentity(), event.getIdentity(),
1045 Type.SYSTEM, Domain.FRAMEWORK, getState().getEventHandler());
1046 }
1047 }
1048 }
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058 final class RpcConnectionDestroyedEventHandler extends
1059 RpcEventHandler<ConnectionDestroyedEvent>
1060 {
1061 private final static AsyncLog LOG =
1062 new AsyncLog(RpcConnectionDestroyedEventHandler.class);
1063
1064 RpcConnectionDestroyedEventHandler(IRpcManagerState state)
1065 {
1066 super(state);
1067 }
1068
1069
1070
1071
1072
1073 @Override
1074 public void handle(ConnectionDestroyedEvent event)
1075 {
1076 if (getLog().isDebugEnabled())
1077 {
1078 getLog().debug(
1079 "Destroying result records attached to remote context "
1080 + event.getRemoteContextIdentity());
1081 }
1082 final Set<IContainer> resultRecords;
1083 final Set<String> connectedContexts = getState().getConnectedContexts();
1084 synchronized (connectedContexts)
1085 {
1086 resultRecords =
1087 getState().getResultRecords().remove(
1088 event.getRemoteContextIdentity());
1089 connectedContexts.remove(event.getRemoteContextIdentity());
1090 }
1091 if (resultRecords != null)
1092 {
1093 for (IContainer container : resultRecords)
1094 {
1095 container.destroy();
1096 }
1097 }
1098 }
1099
1100 @Override
1101 protected AsyncLog getLog()
1102 {
1103 return LOG;
1104 }
1105 }
1106
1107
1108
1109
1110
1111
1112
1113 final class RpcChannelReadyEventHandler extends
1114 RpcEventHandler<ChannelReadyEvent>
1115 {
1116 private final static AsyncLog LOG =
1117 new AsyncLog(RpcChannelReadyEventHandler.class);
1118
1119 final IRpcManagerOperations operations;
1120
1121 RpcChannelReadyEventHandler(IRpcManagerState state,
1122 IRpcManagerOperations operations)
1123 {
1124 super(state);
1125 this.operations = operations;
1126 }
1127
1128
1129
1130
1131
1132 @Override
1133 public void handle(ChannelReadyEvent event)
1134 {
1135 final String remoteContextIdentity =
1136 event.getChannel().getRemoteContextIdentity();
1137 final List<QuadValue<String, IField[], IRpcResultHandler, IRpcMarker>> pendingRpcs;
1138 final Set<String> connectedContexts = getState().getConnectedContexts();
1139
1140 synchronized (connectedContexts)
1141 {
1142 connectedContexts.add(remoteContextIdentity);
1143 pendingRpcs =
1144 getState().getPendingRpcInvocations().remove(
1145 remoteContextIdentity);
1146 }
1147 if (pendingRpcs == null)
1148 {
1149 return;
1150 }
1151 if (getLog().isDebugEnabled())
1152 {
1153 getLog().debug(
1154 "Invoking " + pendingRpcs.size() + " RPCs for "
1155 + remoteContextIdentity);
1156 }
1157 for (QuadValue<String, IField[], IRpcResultHandler, IRpcMarker> rpc : pendingRpcs)
1158 {
1159 final DualValue<String, IRpcDefinition> keyAndDefinition =
1160 getOperations().getRegistryKeyAndDefinition(
1161 remoteContextIdentity, rpc.getFirst(), rpc.getSecond());
1162 getOperations().invoke(remoteContextIdentity,
1163 keyAndDefinition.getFirst(), keyAndDefinition.getSecond(),
1164 rpc.getSecond(), rpc.getThird(), rpc.getFourth());
1165 }
1166 }
1167
1168 @Override
1169 protected AsyncLog getLog()
1170 {
1171 return LOG;
1172 }
1173
1174 private IRpcManagerOperations getOperations()
1175 {
1176 return this.operations;
1177 }
1178
1179 }
1180
1181
1182
1183
1184
1185
1186
1187
1188 abstract class AbstractRpcPublicationHandler<T extends AbstractContainerFieldEvent>
1189 extends RpcEventHandler<T>
1190 {
1191 private final IRpcManagerOperations operations;
1192
1193 AbstractRpcPublicationHandler(IRpcManagerState state,
1194 IRpcManagerOperations operations)
1195 {
1196 super(state);
1197 this.operations = operations;
1198 }
1199
1200 @Override
1201 public void handle(T event)
1202 {
1203 final IField field = event.getField();
1204 final String identity = field.getIdentity();
1205 if (identity.indexOf(IRpcRegistry.RPC_KEY) > -1)
1206 {
1207 if (!(field instanceof StringField))
1208 {
1209 return;
1210 }
1211 IContainer container = (IContainer) event.getSource();
1212 final Set<IRpcPublicationListener> listeners;
1213
1214 final MapSet<String, IRpcPublicationListener> rpcPublicationListeners =
1215 getState().getRpcPublicationListeners();
1216 synchronized (rpcPublicationListeners)
1217 {
1218 listeners =
1219 rpcPublicationListeners.get(container.getNativeContextIdentity());
1220 }
1221 final String definitionAsString = ((StringField) field).get();
1222 IRpcDefinition rpcDefinition =
1223 new RpcDefinition(definitionAsString);
1224 for (IRpcPublicationListener listener : listeners)
1225 {
1226 doAction(container, rpcDefinition, listener);
1227 }
1228 }
1229 }
1230
1231 IRpcManagerOperations getOperations()
1232 {
1233 return this.operations;
1234 }
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246 abstract void doAction(IContainer container, IRpcDefinition rpcDefinition,
1247 IRpcPublicationListener listener);
1248 }
1249
1250
1251
1252
1253
1254
1255
1256
1257 final class RpcPublishedHandler extends
1258 AbstractRpcPublicationHandler<ContainerFieldAddedEvent>
1259 {
1260 private final static AsyncLog LOG = new AsyncLog(RpcPublishedHandler.class);
1261
1262 RpcPublishedHandler(IRpcManagerState state, IRpcManagerOperations operations)
1263 {
1264 super(state, operations);
1265 }
1266
1267 @Override
1268 protected AsyncLog getLog()
1269 {
1270 return LOG;
1271 }
1272
1273 @Override
1274 void doAction(IContainer container, IRpcDefinition rpcDefinition,
1275 IRpcPublicationListener listener)
1276 {
1277 final String remoteContextIdentity =
1278 container.getNativeContextIdentity();
1279 listener.procedureAvailable(remoteContextIdentity, rpcDefinition);
1280
1281 final List<QuadValue<String, IField[], IRpcResultHandler, IRpcMarker>> pendingRpcs;
1282 final Set<String> connectedContexts = getState().getConnectedContexts();
1283 final List<QuadValue<String, IField[], IRpcResultHandler, IRpcMarker>> toInvoke =
1284 CollectionFactory.newList();
1285
1286 synchronized (connectedContexts)
1287 {
1288 pendingRpcs =
1289 getState().getPendingRpcInvocations().get(remoteContextIdentity);
1290
1291 if (pendingRpcs == null)
1292 {
1293 return;
1294 }
1295 for (Iterator<QuadValue<String, IField[], IRpcResultHandler, IRpcMarker>> iterator =
1296 pendingRpcs.iterator(); iterator.hasNext();)
1297 {
1298 QuadValue<String, IField[], IRpcResultHandler, IRpcMarker> rpc =
1299 iterator.next();
1300 if (rpcDefinition.getName().equals(rpc.getFirst())
1301 && Arrays.equals(rpcDefinition.getArgumentTypes(),
1302 RpcUtils.getArgumentTypes(rpc.getSecond())))
1303 {
1304 iterator.remove();
1305 toInvoke.add(rpc);
1306 }
1307 }
1308 }
1309 if (getLog().isDebugEnabled())
1310 {
1311 getLog().debug(
1312 "Invoking " + toInvoke.size()
1313 + " awaiting calls for newly available RPC "
1314 + rpcDefinition + " from " + remoteContextIdentity);
1315 }
1316
1317 for (QuadValue<String, IField[], IRpcResultHandler, IRpcMarker> rpc : toInvoke)
1318 {
1319 final DualValue<String, IRpcDefinition> keyAndDefinition =
1320 getOperations().getRegistryKeyAndDefinition(
1321 remoteContextIdentity, rpc.getFirst(), rpc.getSecond());
1322 getOperations().invoke(remoteContextIdentity,
1323 keyAndDefinition.getFirst(), keyAndDefinition.getSecond(),
1324 rpc.getSecond(), rpc.getThird(), rpc.getFourth());
1325 }
1326 }
1327 }
1328
1329
1330
1331
1332
1333
1334
1335
1336 final class RpcUnpublishedHandler extends
1337 AbstractRpcPublicationHandler<ContainerFieldRemovedEvent>
1338 {
1339 private final static AsyncLog LOG = new AsyncLog(RpcPublishedHandler.class);
1340
1341 RpcUnpublishedHandler(IRpcManagerState state,
1342 IRpcManagerOperations operations)
1343 {
1344 super(state, operations);
1345 }
1346
1347 @Override
1348 protected AsyncLog getLog()
1349 {
1350 return LOG;
1351 }
1352
1353 void doAction(IContainer container, IRpcDefinition rpcDefinition,
1354 IRpcPublicationListener listener)
1355 {
1356 listener.procedureUnavailable(container.getNativeContextIdentity(),
1357 rpcDefinition);
1358 }
1359 }
1360
1361
1362
1363
1364
1365
1366 final class SendRpcEventHandler extends RpcEventHandler<SendRpcEvent>
1367 {
1368 private final static AsyncLog LOG = new AsyncLog(SendRpcEventHandler.class);
1369
1370 SendRpcEventHandler(IRpcManagerState state)
1371 {
1372 super(state);
1373 }
1374
1375 @Override
1376 protected AsyncLog getLog()
1377 {
1378 return LOG;
1379 }
1380
1381
1382
1383
1384
1385 @Override
1386 public void handle(SendRpcEvent event)
1387 {
1388 if (LOG.isDebugEnabled())
1389 {
1390 LOG.debug("invoking " + event);
1391 }
1392 getState().getContext().invokeRpc(event.getRemoteContextIdentity(),
1393 event.getRpcData());
1394 }
1395 }
1396
1397
1398
1399
1400
1401
1402
1403
1404 final class ResultRecordObservedHandler extends
1405 RpcEventHandler<EventSourceObservedEvent>
1406 {
1407 private final static AsyncLog LOG =
1408 new AsyncLog(ResultRecordObservedHandler.class);
1409
1410 ResultRecordObservedHandler(IRpcManagerState state)
1411 {
1412 super(state);
1413 }
1414
1415 @Override
1416 protected AsyncLog getLog()
1417 {
1418 return LOG;
1419 }
1420
1421 @Override
1422 public void handle(final EventSourceObservedEvent event)
1423 {
1424 if (event.getIdentity().contains(IRpcRegistry.RPC_KEY))
1425 {
1426 synchronized (getState().getObservedResultRecords())
1427 {
1428 getState().getObservedResultRecords().add(event.getIdentity());
1429 getState().getObservedResultRecords().notify();
1430 }
1431 }
1432 }
1433 }
1434
1435
1436
1437
1438
1439
1440
1441 final class ResultRecordNotObservedHandler extends
1442 RpcEventHandler<EventSourceNotObservedEvent>
1443 {
1444 private final static AsyncLog LOG =
1445 new AsyncLog(ResultRecordNotObservedHandler.class);
1446
1447 ResultRecordNotObservedHandler(IRpcManagerState state)
1448 {
1449 super(state);
1450 }
1451
1452 @Override
1453 protected AsyncLog getLog()
1454 {
1455 return LOG;
1456 }
1457
1458 @Override
1459 public void handle(EventSourceNotObservedEvent event)
1460 {
1461 if (event.getSource().getIdentity().contains(IRpcRegistry.RPC_KEY))
1462 {
1463 synchronized (getState().getObservedResultRecords())
1464 {
1465 getState().getObservedResultRecords().remove(
1466 event.getSource().getIdentity());
1467 }
1468 }
1469 }
1470 }