1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package fulmine.context;
17
18 import static fulmine.util.Utils.COLON;
19 import static fulmine.util.Utils.COMMA_SPACE;
20 import static fulmine.util.Utils.EMPTY_STRING;
21 import static fulmine.util.Utils.SPACING_4_CHARS;
22 import static fulmine.util.Utils.logException;
23 import static fulmine.util.Utils.nullCheck;
24 import static fulmine.util.Utils.safeToString;
25 import static fulmine.util.Utils.string;
26
27 import java.util.Collection;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Set;
31
32 import org.apache.commons.lang.SystemUtils;
33
34 import fulmine.AbstractLifeCycle;
35 import fulmine.IDomain;
36 import fulmine.IType;
37 import fulmine.distribution.IDistributionManager;
38 import fulmine.distribution.IDistributionState;
39 import fulmine.distribution.IRemoteUpdateInvoker;
40 import fulmine.distribution.RemoteUpdateInvoker;
41 import fulmine.distribution.channel.Channel;
42 import fulmine.distribution.channel.ChannelFactory;
43 import fulmine.distribution.channel.ChannelReadyEvent;
44 import fulmine.distribution.channel.ChannelTransmissionListener;
45 import fulmine.distribution.channel.IChannel;
46 import fulmine.distribution.channel.IChannelFactory;
47 import fulmine.distribution.connection.IConnection;
48 import fulmine.distribution.connection.IConnectionDiscoverer;
49 import fulmine.distribution.connection.IConnectionParameters;
50 import fulmine.distribution.events.ConnectionAvailableEvent;
51 import fulmine.distribution.events.ConnectionDestroyedEvent;
52 import fulmine.distribution.events.ContextDiscoveredEvent;
53 import fulmine.distribution.events.ContextNotAvailableEvent;
54 import fulmine.event.IEvent;
55 import fulmine.event.IEventManager;
56 import fulmine.event.listener.AbstractEventHandler;
57 import fulmine.event.listener.IEventListener;
58 import fulmine.event.listener.ILifeCycleEventListener;
59 import fulmine.event.listener.MultiSystemEventListener;
60 import fulmine.event.subscription.ISubscriptionListener;
61 import fulmine.event.subscription.ISubscriptionManager;
62 import fulmine.event.subscription.ISubscriptionParameters;
63 import fulmine.event.subscription.SubscriptionParameters;
64 import fulmine.event.system.AbstractSystemEvent;
65 import fulmine.event.system.EventSourceNotObservedEvent;
66 import fulmine.event.system.EventSourceObservedEvent;
67 import fulmine.event.system.ISystemEventListener;
68 import fulmine.event.system.SubscribeEvent;
69 import fulmine.event.system.UnsubscribeEvent;
70 import fulmine.model.container.IContainer;
71 import fulmine.model.container.subscription.ContainerSubscriptionManager;
72 import fulmine.model.field.IntegerField;
73 import fulmine.model.field.StringField;
74 import fulmine.protocol.specification.FrameReader;
75 import fulmine.protocol.specification.FrameWriter;
76 import fulmine.protocol.specification.IFrameReader;
77 import fulmine.protocol.specification.IFrameWriter;
78 import fulmine.rpc.IRpcResult;
79 import fulmine.util.collection.CollectionFactory;
80 import fulmine.util.collection.CollectionUtils;
81 import fulmine.util.log.AsyncLog;
82 import fulmine.util.reference.AutoCreatingStore;
83 import fulmine.util.reference.DualValue;
84 import fulmine.util.reference.IAutoCreatingStore;
85 import fulmine.util.reference.IObjectBuilder;
86 import fulmine.util.reference.IReferenceCounter;
87 import fulmine.util.reference.ReferenceCounter;
88 import fulmine.util.reference.Values;
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140 class DistributionManager extends AbstractLifeCycle implements
141 IDistributionManager
142 {
143 final static AsyncLog LOG = new AsyncLog(DistributionManager.class);
144
145
146 private IDistributionState state;
147
148
149
150
151
152
153
154 DistributionManager(IFrameworkContext context)
155 {
156 this(new DistributionState(context));
157 }
158
159
160
161
162
163
164
165 DistributionManager(IDistributionState state)
166 {
167 super();
168 this.state = state;
169 }
170
171 @Override
172 protected AsyncLog getLog()
173 {
174 return LOG;
175 }
176
177 @Override
178 protected void doStart()
179 {
180 getState().init();
181 getState().start();
182 if (getLog().isInfoEnabled())
183 {
184 getLog().info("Started");
185 }
186 }
187
188 public boolean subscribe(String contextIdentity, String identityRegex,
189 IType type, IDomain domain, IEventListener listener)
190 {
191 final SubscriptionParameters parameters =
192 new SubscriptionParameters(identityRegex, type, domain);
193 if (getLog().isInfoEnabled())
194 {
195 getLog().info(
196 "[subscribe] context=" + contextIdentity + COMMA_SPACE
197 + parameters + ", listener=" + safeToString(listener));
198 }
199 if (getState().getContext().getIdentity().equals(contextIdentity))
200 {
201 boolean subscribed = false;
202
203 final ISubscriptionManager subscriptionManager =
204 getState().getSubscriptionManager();
205 subscriptionManager.subscribe(parameters);
206 subscribed = subscriptionManager.addListener(parameters, listener);
207 return subscribed;
208 }
209 return doRemoteSubscribe(contextIdentity, parameters, listener);
210 }
211
212
213
214
215
216
217
218
219
220
221
222
223
224 protected boolean doRemoteSubscribe(String remoteContextIdentity,
225 ISubscriptionParameters parameters, IEventListener listener)
226 {
227 boolean subscribed = false;
228 nullCheck(remoteContextIdentity, "No remote context identity");
229 IConnectionParameters connectionParameters = null;
230 DualValue<ISubscriptionParameters, IEventListener> values =
231 new DualValue<ISubscriptionParameters, IEventListener>(parameters,
232 listener);
233 boolean connect = false;
234 final IChannel channel;
235
236 synchronized (getState())
237 {
238 final Set<DualValue<ISubscriptionParameters, IEventListener>> remoteSubscriptions =
239 getState().getRemoteSubscriptions().get(remoteContextIdentity);
240
241
242 subscribed = !remoteSubscriptions.contains(values);
243 remoteSubscriptions.add(values);
244 if (getLog().isTraceEnabled())
245 {
246 getLog().trace(
247 "Remote subscriptions for remote context "
248 + remoteContextIdentity
249 + " are"
250 + CollectionUtils.toFormattedString(remoteSubscriptions));
251 }
252 connectionParameters =
253 getState().getDiscoveredContexts().get(remoteContextIdentity);
254
255
256
257
258
259
260 channel = getState().getChannels().get(remoteContextIdentity);
261 if (channel == null)
262 {
263 connect =
264 shouldConnect(remoteContextIdentity, connectionParameters);
265 }
266 }
267 if (channel != null)
268 {
269
270
271
272
273 getState().getContext().queueEvent(
274 new RemoteContainerSubscriptionEvent(getState().getContext(),
275 remoteContextIdentity, values));
276 }
277
278
279
280
281
282
283 if (connect)
284 {
285 getState().getContext().getConnectionBroker().connect(
286 connectionParameters);
287 }
288 return subscribed;
289 }
290
291
292
293
294
295
296
297
298
299
300
301
302
303 private boolean shouldConnect(String remoteContextIdentity,
304 IConnectionParameters connectionParameters)
305 {
306 synchronized (getState())
307 {
308 boolean connect = false;
309 if (connectionParameters != null)
310 {
311
312 final String connectionIdentity =
313 connectionParameters.getRemoteContextIdentity();
314 if (getState().getConnectedContexts().getCount(
315 connectionIdentity) == 0)
316 {
317
318
319
320
321
322
323 if (getState().getCONNECTINGContexts().getCount(
324 connectionIdentity) == 0)
325 {
326 getState().getCONNECTINGContexts().adjustCount(
327 connectionIdentity, 1);
328 if (getLog().isTraceEnabled())
329 {
330 getLog().trace(
331 "CONNECTING contexts are "
332 + safeToString(getState().getCONNECTINGContexts()));
333 }
334 connect = true;
335 }
336 else
337 {
338 if (getLog().isTraceEnabled())
339 {
340 getLog().trace(
341 "Already trying to connect to "
342 + safeToString(connectionParameters));
343 }
344 }
345 }
346 else
347 {
348 if (getLog().isTraceEnabled())
349 {
350 getLog().trace(
351 "Already connected to "
352 + safeToString(connectionParameters));
353 }
354 }
355 }
356 else
357 {
358 if (getLog().isDebugEnabled())
359 {
360 getLog().debug(
361 "Remote context " + remoteContextIdentity
362 + " has not been discovered yet");
363 }
364 }
365 return connect;
366 }
367 }
368
369 public boolean unsubscribe(String contextIdentity, String identityRegex,
370 IType type, IDomain domain, IEventListener listener)
371 {
372 final SubscriptionParameters parameters =
373 new SubscriptionParameters(identityRegex, type, domain);
374 if (getLog().isInfoEnabled())
375 {
376 getLog().info(
377 "[unsubscribe] context=" + contextIdentity + COMMA_SPACE
378 + parameters + ", listener=" + safeToString(listener));
379 }
380 if (getState().getContext().getIdentity().equals(contextIdentity))
381 {
382 boolean unsubscribed = false;
383
384 if (canUnsubscribe(getState().getSubscriptionManager(), parameters,
385 listener, false))
386 {
387 unsubscribed =
388 getState().getSubscriptionManager().unsubscribe(parameters);
389 }
390 return unsubscribed;
391 }
392 return doRemoteUnsubscribe(contextIdentity, parameters, listener);
393 }
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415 private final boolean canUnsubscribe(
416 ISubscriptionManager subscriptionManager,
417 final ISubscriptionParameters parameters, IEventListener listener,
418 boolean unsubscribeRemote)
419 {
420 if (getLog().isTraceEnabled())
421 {
422 getLog().trace(
423 "removing listener "
424 + safeToString(listener)
425 + " for "
426 + safeToString(parameters)
427 + " from subscription listeners "
428 + CollectionUtils.toFormattedString(subscriptionManager.getListeners(parameters)));
429 }
430 subscriptionManager.removeListener(parameters, listener);
431 final List<IEventListener> listeners =
432 subscriptionManager.getListeners(parameters);
433 boolean canUnsubscribe = false;
434 if (listeners.size() == 0)
435 {
436 canUnsubscribe = true;
437 }
438 else
439 {
440 if (unsubscribeRemote)
441 {
442 canUnsubscribe = applicationListenersExist(listeners);
443 }
444 }
445
446 if (getLog().isTraceEnabled())
447 {
448 getLog().trace(
449 "Can"
450 + (canUnsubscribe ? EMPTY_STRING : "not")
451 + " unsubscribe "
452 + safeToString(parameters)
453 + ", subscribed sources: "
454 + CollectionUtils.toFormattedString(subscriptionManager.getSubscribedSources(parameters))
455 + ", listeners: "
456 + CollectionUtils.toFormattedString(subscriptionManager.getListeners(parameters)));
457 }
458 return canUnsubscribe;
459 }
460
461
462
463
464
465
466
467
468
469
470
471 protected boolean applicationListenersExist(
472 final List<IEventListener> listeners)
473 {
474 boolean canUnsubscribe = false;
475
476
477
478
479
480 if (listeners.size() == 1
481 && listeners.get(0) instanceof ChannelTransmissionListener)
482 {
483 canUnsubscribe = true;
484 }
485 return canUnsubscribe;
486 }
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501 protected boolean doRemoteUnsubscribe(String remoteContextIdentity,
502 ISubscriptionParameters parameters, IEventListener listener)
503 {
504 IChannel channel = null;
505 if (remoteContextIdentity != null)
506 {
507 final DualValue<ISubscriptionParameters, IEventListener> values =
508 new DualValue<ISubscriptionParameters, IEventListener>(
509 parameters, listener);
510
511 synchronized (getState())
512 {
513 final Set<DualValue<ISubscriptionParameters, IEventListener>> remoteSubscriptions =
514 getState().getRemoteSubscriptions().get(
515 remoteContextIdentity);
516 remoteSubscriptions.remove(values);
517 if (getLog().isTraceEnabled())
518 {
519 getLog().trace(
520 "Remote subscriptions for remote context "
521 + remoteContextIdentity
522 + " are now "
523 + CollectionUtils.toFormattedString(remoteSubscriptions));
524 }
525 }
526 channel = getState().getChannels().get(remoteContextIdentity);
527 }
528 if (channel != null)
529 {
530 if (canUnsubscribe(channel, parameters, listener, true))
531 {
532
533
534 return channel.unsubscribe(parameters);
535 }
536 }
537 return false;
538 }
539
540 @Override
541 protected void doDestroy()
542 {
543 getState().destroy();
544 for (IChannel channel : getState().getChannels().values())
545 {
546 try
547 {
548 channel.destroy();
549 }
550 catch (Exception e)
551 {
552 logException(getLog(), channel, e);
553 }
554 }
555 getState().getChannels().clear();
556 }
557
558 public IFrameReader getFrameReader()
559 {
560 return getState().getFrameReader();
561 }
562
563 public IFrameWriter getFrameWriter()
564 {
565 return getState().getFrameWriter();
566 }
567
568 public void requestRetransmit(String contextIdentity,
569 String identityRegularExpression, IType type, IDomain domain)
570 {
571 if (getLog().isDebugEnabled())
572 {
573 getLog().debug(
574 "requestRetransmit for context=" + contextIdentity
575 + ", identity=" + identityRegularExpression + ", type="
576 + type + ", domain=" + domain);
577 }
578 final IChannel channel = getChannel(contextIdentity);
579 if (channel != null)
580 {
581 channel.requestRetransmit(identityRegularExpression, type, domain);
582 }
583 }
584
585 public void requestRetransmitAll(String contextIdentity)
586 {
587 if (getLog().isDebugEnabled())
588 {
589 getLog().debug(
590 "requestRetransmitAll for context=" + contextIdentity);
591 }
592 final IChannel channel = getChannel(contextIdentity);
593 if (channel != null)
594 {
595 channel.requestRetransmitAll();
596 }
597 }
598
599 public void retransmit(String contextIdentity,
600 String identityRegularExpression, IType type, IDomain domain)
601 {
602 if (getLog().isDebugEnabled())
603 {
604 getLog().debug(
605 "retransmit for context=" + contextIdentity + ", identity="
606 + identityRegularExpression + ", type=" + type
607 + ", domain=" + domain);
608 }
609 final IChannel channel = getChannel(contextIdentity);
610 if (channel != null)
611 {
612 channel.retransmit(identityRegularExpression, type, domain);
613 }
614 }
615
616 public void retransmitAll(String contextIdentity)
617 {
618 if (getLog().isDebugEnabled())
619 {
620 getLog().debug("requestAll for context=" + contextIdentity);
621 }
622 final IChannel channel = getChannel(contextIdentity);
623 if (channel != null)
624 {
625 channel.retransmitAll();
626 }
627 }
628
629 public void retransmitAllToAll()
630 {
631 if (getLog().isDebugEnabled())
632 {
633 getLog().debug("requestAllToAll");
634 }
635 for (IChannel channel : getState().getChannels().values())
636 {
637 try
638 {
639 channel.retransmitAll();
640 }
641 catch (Exception e)
642 {
643 logException(getLog(), channel, e);
644 }
645 }
646 }
647
648 public void retransmitToAll(String identityRegularExpression, IType type,
649 IDomain domain)
650 {
651 if (getLog().isDebugEnabled())
652 {
653 getLog().debug(
654 "retransmitToAll for identity=" + identityRegularExpression
655 + ", type=" + type + ", domain=" + domain);
656 }
657 for (IChannel channel : getState().getChannels().values())
658 {
659 try
660 {
661 channel.retransmit(identityRegularExpression, type, domain);
662 }
663 catch (Exception e)
664 {
665 logException(getLog(), channel, e);
666 }
667 }
668 }
669
670 private IChannel getChannel(String contextIdentity)
671 {
672 return getState().getChannels().get(contextIdentity);
673 }
674
675 public IChannel[] getConnectedChannels()
676 {
677 final Collection<IChannel> values = getState().getChannels().values();
678 return values.toArray(new IChannel[values.size()]);
679 }
680
681
682
683
684 public void logConnectedChannels()
685 {
686 StringBuilder sb = new StringBuilder();
687 final IChannel[] connectedChannels = getConnectedChannels();
688 for (IChannel channel : connectedChannels)
689 {
690 sb.append(SystemUtils.LINE_SEPARATOR).append(SPACING_4_CHARS).append(
691 safeToString(channel.toString()));
692 }
693 if (connectedChannels.length == 0)
694 {
695 sb.append(SystemUtils.LINE_SEPARATOR).append(SPACING_4_CHARS).append(
696 "<UNCONNECTED>");
697 }
698 if (getLog().isInfoEnabled())
699 {
700 getLog().info(
701 SystemUtils.LINE_SEPARATOR + this + " connected to:" + sb);
702 }
703 }
704
705 public boolean addSubscriptionListener(ISubscriptionListener listener)
706 {
707 boolean addListener =
708 getState().getContext().getSystemEventSource(
709 EventSourceObservedEvent.class).addListener(listener);
710 addListener |=
711 getState().getContext().getSystemEventSource(
712 EventSourceNotObservedEvent.class).addListener(listener);
713 addListener |=
714 getState().getContext().getSystemEventSource(SubscribeEvent.class).addListener(
715 listener);
716 addListener |=
717 getState().getContext().getSystemEventSource(UnsubscribeEvent.class).addListener(
718 listener);
719 return addListener;
720 }
721
722 public boolean removeSubscriptionListener(ISubscriptionListener listener)
723 {
724 boolean removeListener =
725 getState().getContext().getSystemEventSource(
726 EventSourceNotObservedEvent.class).removeListener(listener);
727 removeListener |=
728 getState().getContext().getSystemEventSource(
729 EventSourceObservedEvent.class).removeListener(listener);
730 removeListener |=
731 getState().getContext().getSystemEventSource(SubscribeEvent.class).removeListener(
732 listener);
733 removeListener |=
734 getState().getContext().getSystemEventSource(UnsubscribeEvent.class).removeListener(
735 listener);
736 return removeListener;
737 }
738
739 public String updateRemoteContainer(String remoteContextIdentity,
740 String identity, IType type, IDomain domain, String fieldName,
741 String fieldValueAsString)
742 {
743
744
745 getState().getContext().addRpcPublicationListener(
746 remoteContextIdentity,
747 getState().getRemoteUpdateInvoker(remoteContextIdentity));
748 final IRpcResult result =
749 getState().getRemoteUpdateInvoker(remoteContextIdentity).invoke(
750 remoteContextIdentity,
751 new StringField("identity", identity),
752 new IntegerField("type", type.value()),
753 new IntegerField("domain", domain.value()),
754 new StringField("fieldName", fieldName),
755 new StringField("fieldValueString", fieldValueAsString),
756 new IntegerField(
757 "permissionApp",
758 getState().getContext().getPermissionProfile().getApplicationCode()),
759 new IntegerField(
760 "permissionCode",
761 getState().getContext().getPermissionProfile().getPermissionCode()),
762 new StringField("remoteContextIdentity",
763 getState().getContext().getIdentity()));
764 if (result == null)
765 {
766 return "Null result, RPC definition not available";
767 }
768 if (result.isSuccessful())
769 {
770 return result.getResult().getValue().toString();
771 }
772 return result.getExceptionMessage();
773 }
774
775 public void invokeRpc(String remoteContextIdentity, byte[] rpcData)
776 {
777 final IChannel channel =
778 getState().getChannels().get(remoteContextIdentity);
779 if (channel == null)
780 {
781
782
783
784
785 throw new RuntimeException("There is no channel available for "
786 + remoteContextIdentity);
787 }
788 channel.invokeRpc(remoteContextIdentity, rpcData);
789 }
790
791 IDistributionState getState()
792 {
793 return this.state;
794 }
795
796 void setState(IDistributionState state)
797 {
798 this.state = state;
799 }
800 }
801
802
803
804
805
806
807
808 class DistributionState extends AbstractLifeCycle implements IDistributionState
809 {
810
811
812
813
814
815
816 final class RemoteUpdateInvokerBuilder implements
817 IObjectBuilder<String, IRemoteUpdateInvoker>
818 {
819 public IRemoteUpdateInvoker create(String key)
820 {
821 return new RemoteUpdateInvoker(key, getContext());
822 }
823 }
824
825
826
827
828
829
830
831 static final class ValuesBuilder
832 implements
833 IObjectBuilder<String, Set<DualValue<ISubscriptionParameters, IEventListener>>>
834 {
835 public Set<DualValue<ISubscriptionParameters, IEventListener>> create(
836 String key)
837 {
838 return CollectionFactory.newSet(1);
839 }
840 }
841
842
843
844
845
846
847 private volatile Map<String, IChannel> channels;
848
849 private final Map<String, IConnectionParameters> discoveredConnections;
850
851 private final ISubscriptionManager subscriptionManager;
852
853 private final IAutoCreatingStore<String, Set<DualValue<ISubscriptionParameters, IEventListener>>> remoteSubscriptions;
854
855 private final IReferenceCounter<String> connectedContexts;
856
857 private final IReferenceCounter<String> CONNECTINGContexts;
858
859 private final IFrameworkContext context;
860
861 private final IFrameReader frameReader;
862
863 private final IFrameWriter frameWriter;
864
865 private ILifeCycleEventListener eventHandler;
866
867 private IChannelFactory channelFactory;
868
869 private IAutoCreatingStore<String, IRemoteUpdateInvoker> remoteUpdateInvokers;
870
871 DistributionState(IFrameworkContext context)
872 {
873 super();
874 this.context = context;
875 this.discoveredConnections = CollectionFactory.newMap(2);
876 this.channels = CollectionFactory.newMap(2);
877 this.subscriptionManager =
878 new ContainerSubscriptionManager(this.context, null);
879 this.remoteSubscriptions =
880 new AutoCreatingStore<String, Set<DualValue<ISubscriptionParameters, IEventListener>>>(
881 new ValuesBuilder());
882 this.connectedContexts = new ReferenceCounter<String>();
883 this.CONNECTINGContexts = new ReferenceCounter<String>();
884 this.frameReader = new FrameReader();
885 this.frameWriter = new FrameWriter();
886
887
888 this.remoteUpdateInvokers =
889 new AutoCreatingStore<String, IRemoteUpdateInvoker>(
890 new RemoteUpdateInvokerBuilder());
891 }
892
893
894
895
896 public void init()
897 {
898 this.channelFactory = createChannelFactory();
899 this.eventHandler =
900 new MultiSystemEventListener(
901 string(this, context.toString()),
902 context,
903 AbstractEventHandler.getEventHandlerMappings(createEventHandlers()));
904 }
905
906 public IChannelFactory createChannelFactory()
907 {
908 return new ChannelFactory();
909 }
910
911 @SuppressWarnings("unchecked")
912 AbstractEventHandler<? extends IEvent>[] createEventHandlers()
913 {
914 return new AbstractEventHandler[] { new ChannelReadyEventHandler(this),
915 new ConnectionAvailableEventHandler(this),
916 new ConnectionDestroyedEventHandler(this),
917 new RemoteContainerSubscriptionEventHandler(this),
918 new RemoteEventSourceNotObservedEventHandler(this),
919 new ContextDiscoveredEventHandler(this),
920 new ContextNotAvailableEventHandler(this), };
921 }
922
923 protected void doDestroy()
924 {
925 this.eventHandler.destroy();
926 this.subscriptionManager.destroy();
927 }
928
929 protected void doStart()
930 {
931 this.eventHandler.start();
932 this.subscriptionManager.start();
933 }
934
935 public final IChannelFactory getChannelFactory()
936 {
937 return this.channelFactory;
938 }
939
940 public final ILifeCycleEventListener getEventHandler()
941 {
942 return this.eventHandler;
943 }
944
945 public final Map<String, IChannel> getChannels()
946 {
947 return this.channels;
948 }
949
950 public final Map<String, IConnectionParameters> getDiscoveredContexts()
951 {
952 return this.discoveredConnections;
953 }
954
955 public final ISubscriptionManager getSubscriptionManager()
956 {
957 return this.subscriptionManager;
958 }
959
960 public final IAutoCreatingStore<String, Set<DualValue<ISubscriptionParameters, IEventListener>>> getRemoteSubscriptions()
961 {
962 return this.remoteSubscriptions;
963 }
964
965 public final IReferenceCounter<String> getConnectedContexts()
966 {
967 return this.connectedContexts;
968 }
969
970 public final IFrameworkContext getContext()
971 {
972 return this.context;
973 }
974
975 public final IFrameReader getFrameReader()
976 {
977 return this.frameReader;
978 }
979
980 public final IFrameWriter getFrameWriter()
981 {
982 return this.frameWriter;
983 }
984
985 public final void setChannels(Map<String, IChannel> channels)
986 {
987 this.channels = channels;
988 }
989
990 public final IReferenceCounter<String> getCONNECTINGContexts()
991 {
992 return this.CONNECTINGContexts;
993 }
994
995 public IRemoteUpdateInvoker getRemoteUpdateInvoker(
996 String remoteContextIdentity)
997 {
998 return this.remoteUpdateInvokers.get(remoteContextIdentity);
999 }
1000 }
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010 abstract class DistributionEventHandler<T extends IEvent> extends
1011 AbstractEventHandler<T> implements ISystemEventListener
1012 {
1013
1014 private final IDistributionState state;
1015
1016 DistributionEventHandler(IDistributionState state)
1017 {
1018 super();
1019 nullCheck(state, "No state provided");
1020 this.state = state;
1021 }
1022
1023
1024 IDistributionState getState()
1025 {
1026 return state;
1027 }
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043 final boolean validateChannelConnection(final IChannel channel,
1044 final IConnectionParameters connectionParameters)
1045 {
1046
1047 if (channel.getConnection().isOutbound()
1048 && (connectionParameters != null && !connectionParameters.isEqual(channel.getConnection())))
1049 {
1050 if (getLog().isInfoEnabled())
1051 {
1052 getLog().info(
1053 "Destroying " + safeToString(channel)
1054 + " because remote context"
1055 + " connection parameters have changed to "
1056 + safeToString(connectionParameters));
1057 }
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067 channel.getConnection().destroy();
1068 return false;
1069 }
1070 return true;
1071 }
1072 }
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083 class ContextNotAvailableEventHandler extends
1084 DistributionEventHandler<ContextNotAvailableEvent>
1085 {
1086 private final static AsyncLog LOG =
1087 new AsyncLog(ContextNotAvailableEventHandler.class);
1088
1089 ContextNotAvailableEventHandler(IDistributionState state)
1090 {
1091 super(state);
1092 }
1093
1094
1095
1096
1097
1098
1099
1100 @Override
1101 public void handle(ContextNotAvailableEvent event)
1102 {
1103 IChannel channel;
1104
1105
1106 synchronized (getState())
1107 {
1108 getState().getDiscoveredContexts().remove(
1109 event.getRemoteContextIdentity());
1110
1111
1112 channel =
1113 getState().getChannels().get(event.getRemoteContextIdentity());
1114 }
1115 if (channel != null)
1116 {
1117
1118 final IConnection connection = channel.getConnection();
1119 if (connection != null)
1120 {
1121 connection.destroy();
1122 }
1123 }
1124 }
1125
1126 @Override
1127 public AsyncLog getLog()
1128 {
1129 return LOG;
1130 }
1131 }
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142 class ContextDiscoveredEventHandler extends
1143 DistributionEventHandler<ContextDiscoveredEvent>
1144 {
1145
1146 private final static AsyncLog LOG =
1147 new AsyncLog(ContextDiscoveredEventHandler.class);
1148
1149 ContextDiscoveredEventHandler(IDistributionState state)
1150 {
1151 super(state);
1152 }
1153
1154
1155
1156
1157
1158
1159
1160 @Override
1161 public void handle(ContextDiscoveredEvent event)
1162 {
1163 if (!getState().getContext().isActive())
1164 {
1165 if (getLog().isInfoEnabled())
1166 {
1167 getLog().info(
1168 "Context is not active, ignoring " + safeToString(event));
1169 }
1170 return;
1171 }
1172 final IConnectionParameters connectionParameters =
1173 event.getConnectionParameters();
1174 final String remoteContextIdentity =
1175 connectionParameters.getRemoteContextIdentity();
1176 nullCheck(remoteContextIdentity, "No remote context identity");
1177 boolean connect = false;
1178
1179 synchronized (getState())
1180 {
1181 connect =
1182 getState().getRemoteSubscriptions().get(remoteContextIdentity).size() > 0;
1183 if (connect)
1184 {
1185 if (getLog().isDebugEnabled())
1186 {
1187 getLog().debug(
1188 "Connect pending because there are remote subscriptions to "
1189 + remoteContextIdentity
1190 + CollectionUtils.toFormattedString(getState().getRemoteSubscriptions().get(
1191 remoteContextIdentity)));
1192 }
1193
1194 connect =
1195 getState().getConnectedContexts().getCount(
1196 remoteContextIdentity) == 0;
1197 if (getLog().isDebugEnabled())
1198 {
1199 getLog().debug(
1200 "Connected contexts are "
1201 + safeToString(getState().getConnectedContexts()));
1202 }
1203 if (connect)
1204 {
1205 if (getLog().isDebugEnabled())
1206 {
1207 getLog().debug(
1208 "Connect still pending because there is no connection, connecting contexts are "
1209 + safeToString(getState().getCONNECTINGContexts()));
1210 }
1211
1212 connect =
1213 getState().getCONNECTINGContexts().getCount(
1214 remoteContextIdentity) == 0;
1215
1216 if (connect)
1217 {
1218 getState().getCONNECTINGContexts().adjustCount(
1219 remoteContextIdentity, 1);
1220 }
1221 else
1222 {
1223 if (getLog().isDebugEnabled())
1224 {
1225 getLog().debug(
1226 "Already trying to connect to "
1227 + safeToString(event.getConnectionParameters()));
1228 }
1229 }
1230 }
1231 else
1232 {
1233 if (getLog().isDebugEnabled())
1234 {
1235 getLog().debug(
1236 "Already connected to "
1237 + safeToString(event.getConnectionParameters())
1238 + ", current connections are: "
1239 + safeToString(getState().getConnectedContexts()));
1240 }
1241 }
1242 }
1243 getState().getDiscoveredContexts().put(remoteContextIdentity,
1244 event.getConnectionParameters());
1245 if (getLog().isInfoEnabled())
1246 {
1247 getLog().info(
1248 "Discovered contexts are: "
1249 + CollectionUtils.toFormattedString(getState().getDiscoveredContexts()));
1250 }
1251 }
1252
1253 final IChannel channel =
1254 getState().getChannels().get(remoteContextIdentity);
1255 if (channel != null)
1256 {
1257
1258
1259 validateChannelConnection(channel, event.getConnectionParameters());
1260 }
1261 if (connect)
1262 {
1263 try
1264 {
1265
1266
1267 getState().getContext().getConnectionBroker().connect(
1268 event.getConnectionParameters());
1269 }
1270 catch (RuntimeException e)
1271 {
1272 synchronized (getState())
1273 {
1274
1275 getState().getCONNECTINGContexts().adjustCount(
1276 remoteContextIdentity, -1);
1277 }
1278 throw e;
1279 }
1280 }
1281 else
1282 {
1283 if (getLog().isDebugEnabled())
1284 {
1285 getLog().debug(
1286 "Not connecting to (already connected or not required) "
1287 + safeToString(event.getConnectionParameters()));
1288 }
1289 }
1290 }
1291
1292 @Override
1293 public AsyncLog getLog()
1294 {
1295 return LOG;
1296 }
1297 }
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310 final class ConnectionAvailableEventHandler extends
1311 DistributionEventHandler<ConnectionAvailableEvent>
1312 {
1313 private final static AsyncLog LOG =
1314 new AsyncLog(ConnectionAvailableEventHandler.class);
1315
1316 ConnectionAvailableEventHandler(IDistributionState state)
1317 {
1318 super(state);
1319 }
1320
1321
1322
1323
1324
1325
1326
1327
1328 @Override
1329 public void handle(ConnectionAvailableEvent event)
1330 {
1331 boolean destroyConnection = false;
1332 final IConnection connection = event.getConnection();
1333 synchronized (getState())
1334 {
1335
1336 final String remoteContextIdentity =
1337 connection.getRemoteContextIdentity();
1338 if (getState().getConnectedContexts().getCount(
1339 remoteContextIdentity) > 0)
1340 {
1341
1342 final int compareTo =
1343 getState().getContext().getIdentity().compareTo(
1344 connection.getRemoteContextIdentity());
1345 if (compareTo > 0)
1346 {
1347 destroyConnection = true;
1348 }
1349 else
1350 {
1351 if (compareTo == 0)
1352 {
1353
1354
1355 if (getState().getContext().getContextHashCode() > connection.getRemoteContextHashCode())
1356 {
1357 destroyConnection = true;
1358 }
1359 }
1360 }
1361 }
1362
1363
1364
1365
1366
1367
1368 final IConnectionParameters connectionParameters =
1369 getState().getDiscoveredContexts().get(
1370 connection.getRemoteContextIdentity());
1371 if (connection.isOutbound() && connectionParameters != null
1372 && !connectionParameters.isEqual(connection))
1373 {
1374 destroyConnection = true;
1375 if (getLog().isDebugEnabled())
1376 {
1377 getLog().debug(
1378 "Destroying connection " + safeToString(connection)
1379 + ", parameters have changed to "
1380 + safeToString(connectionParameters));
1381 }
1382 }
1383 if (!destroyConnection)
1384 {
1385
1386 getState().getConnectedContexts().adjustCount(
1387 remoteContextIdentity, 1);
1388
1389 getState().getChannelFactory().createChannel(connection,
1390 getState().getContext()).start();
1391 }
1392
1393 getState().getCONNECTINGContexts().adjustCount(
1394 connection.getRemoteContextIdentity(), -1);
1395 }
1396 if (destroyConnection)
1397 {
1398 if (getLog().isTraceEnabled())
1399 {
1400 getLog().trace(
1401 "Destroying duplicate new connection "
1402 + safeToString(connection)
1403 + ", current connections are "
1404 + CollectionUtils.toFormattedString(getState().getChannels()));
1405 }
1406
1407
1408
1409
1410
1411 connection.destroy();
1412 }
1413 }
1414
1415 @Override
1416 public AsyncLog getLog()
1417 {
1418 return LOG;
1419 }
1420 }
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432 class ConnectionDestroyedEventHandler extends
1433 DistributionEventHandler<ConnectionDestroyedEvent>
1434 {
1435 private final static AsyncLog LOG =
1436 new AsyncLog(ConnectionDestroyedEventHandler.class);
1437
1438 ConnectionDestroyedEventHandler(IDistributionState state)
1439 {
1440 super(state);
1441 }
1442
1443
1444
1445
1446
1447
1448
1449 @Override
1450 public void handle(ConnectionDestroyedEvent event)
1451 {
1452
1453 final IConnectionDiscoverer connectionDiscoverer =
1454 getState().getContext().getConnectionDiscoverer();
1455 if (connectionDiscoverer != null)
1456 {
1457 connectionDiscoverer.connectionDestroyed(event.getRemoteContextIdentity());
1458 }
1459 IChannel channel = null;
1460 final String remoteContextIdentity = event.getRemoteContextIdentity();
1461 synchronized (getState())
1462 {
1463
1464 final Map<String, IChannel> copy =
1465 CollectionFactory.newMap(getState().getChannels());
1466 channel = copy.remove(remoteContextIdentity);
1467 getState().setChannels(copy);
1468 getState().getConnectedContexts().adjustCount(
1469 remoteContextIdentity, -1);
1470 }
1471 if (channel != null)
1472 {
1473 if (getLog().isInfoEnabled())
1474 {
1475 getLog().info("Destroying " + safeToString(channel));
1476 }
1477
1478
1479 channel.destroy();
1480 }
1481
1482
1483
1484
1485
1486
1487
1488 }
1489
1490 @Override
1491 public AsyncLog getLog()
1492 {
1493 return LOG;
1494 }
1495 }
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505 class ChannelReadyEventHandler extends
1506 DistributionEventHandler<ChannelReadyEvent>
1507 {
1508
1509 private final static AsyncLog LOG =
1510 new AsyncLog(ChannelReadyEventHandler.class);
1511
1512 ChannelReadyEventHandler(IDistributionState state)
1513 {
1514 super(state);
1515 }
1516
1517
1518
1519
1520
1521 @Override
1522 public void handle(ChannelReadyEvent event)
1523 {
1524 final IChannel channel = event.getChannel();
1525
1526 final List<DualValue<ISubscriptionParameters, IEventListener>> list;
1527 synchronized (getState())
1528 {
1529
1530 final IConnectionParameters connectionParameters =
1531 getState().getDiscoveredContexts().get(
1532 channel.getRemoteContextIdentity());
1533
1534
1535
1536 if (!validateChannelConnection(channel, connectionParameters))
1537 {
1538 return;
1539 }
1540
1541
1542 final Map<String, IChannel> copy =
1543 CollectionFactory.newMap(getState().getChannels());
1544 copy.put(channel.getRemoteContextIdentity(), channel);
1545 getState().setChannels(copy);
1546
1547
1548 list =
1549 CollectionFactory.newList(getState().getRemoteSubscriptions().get(
1550 channel.getRemoteContextIdentity()));
1551 }
1552 if (list.size() > 0)
1553 {
1554 if (getLog().isInfoEnabled())
1555 {
1556 getLog().info(
1557 "Sending subscriptions to new channel "
1558 + safeToString(channel) + COLON
1559 + CollectionUtils.toFormattedString(list));
1560 }
1561 for (DualValue<ISubscriptionParameters, IEventListener> values : list)
1562 {
1563 try
1564 {
1565
1566
1567
1568 getState().getContext().queueEvent(
1569 new RemoteContainerSubscriptionEvent(
1570 getState().getContext(),
1571 channel.getRemoteContextIdentity(), values));
1572 }
1573 catch (Exception e)
1574 {
1575 logException(getLog(), values, e);
1576 }
1577 }
1578 }
1579
1580
1581 StringBuilder sb = new StringBuilder();
1582 final IChannel[] connectedChannels =
1583 getState().getContext().getConnectedChannels();
1584 for (IChannel channel2 : connectedChannels)
1585 {
1586 sb.append(SystemUtils.LINE_SEPARATOR).append(SPACING_4_CHARS).append(
1587 safeToString(channel2.toString()));
1588 }
1589 if (connectedChannels.length == 0)
1590 {
1591 sb.append(SystemUtils.LINE_SEPARATOR).append(SPACING_4_CHARS).append(
1592 "<UNCONNECTED>");
1593 }
1594 if (getLog().isInfoEnabled())
1595 {
1596 getLog().info(
1597 SystemUtils.LINE_SEPARATOR + getState().getContext()
1598 + " connected to:" + sb);
1599 }
1600 }
1601
1602 @Override
1603 public AsyncLog getLog()
1604 {
1605 return LOG;
1606 }
1607 }
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617 final class RemoteContainerSubscriptionEvent extends AbstractSystemEvent
1618 {
1619
1620 final String remoteContextIdentity;
1621
1622
1623 final ISubscriptionParameters parameters;
1624
1625
1626 final IEventListener listener;
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640 public RemoteContainerSubscriptionEvent(IEventManager context,
1641 String remoteContextIdentity,
1642 DualValue<ISubscriptionParameters, IEventListener> values)
1643 {
1644 super(context);
1645 this.remoteContextIdentity = remoteContextIdentity;
1646 this.parameters = values.getFirst();
1647 this.listener = values.getSecond();
1648 }
1649
1650
1651
1652
1653
1654
1655 public String getRemoteContextIdentity()
1656 {
1657 return remoteContextIdentity;
1658 }
1659
1660
1661
1662
1663
1664
1665 public ISubscriptionParameters getParameters()
1666 {
1667 return parameters;
1668 }
1669
1670
1671
1672
1673
1674
1675 public IEventListener getListener()
1676 {
1677 return listener;
1678 }
1679
1680 protected String getAdditionalToString()
1681 {
1682 return "subcriptionParameters=" + getParameters();
1683 }
1684 }
1685
1686
1687
1688
1689
1690
1691
1692
1693 final class RemoteContainerSubscriptionEventHandler extends
1694 DistributionEventHandler<RemoteContainerSubscriptionEvent>
1695 {
1696 private final static AsyncLog LOG =
1697 new AsyncLog(RemoteContainerSubscriptionEventHandler.class);
1698
1699 RemoteContainerSubscriptionEventHandler(IDistributionState state)
1700 {
1701 super(state);
1702 }
1703
1704
1705
1706
1707
1708 @Override
1709 public void handle(RemoteContainerSubscriptionEvent event)
1710 {
1711 final IChannel channel =
1712 getState().getChannels().get(event.getRemoteContextIdentity());
1713 if (channel != null)
1714 {
1715
1716 channel.subscribe(event.getParameters());
1717
1718 channel.addListener(event.getParameters(), event.getListener());
1719 }
1720 }
1721
1722 @Override
1723 public AsyncLog getLog()
1724 {
1725 return LOG;
1726 }
1727 }
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739 final class RemoteEventSourceNotObservedEventHandler extends
1740 DistributionEventHandler<EventSourceNotObservedEvent>
1741 {
1742 private final static AsyncLog LOG =
1743 new AsyncLog(RemoteEventSourceNotObservedEventHandler.class);
1744
1745 RemoteEventSourceNotObservedEventHandler(IDistributionState state)
1746 {
1747 super(state);
1748 }
1749
1750
1751
1752
1753 @Override
1754 public void handle(EventSourceNotObservedEvent event)
1755 {
1756 final IFrameworkContext context = getState().getContext();
1757 if (context.containsRemoteContainer(event.getNativeContextIdentity(),
1758 event.getIdentity(), event.getType(), event.getDomain()))
1759 {
1760 final IContainer remoteContainer =
1761 context.getRemoteContainer(event.getNativeContextIdentity(),
1762 event.getIdentity(), event.getType(), event.getDomain());
1763
1764 if (!remoteContainer.isLocal())
1765 {
1766 if (getLog().isDebugEnabled())
1767 {
1768 getLog().debug(
1769 "Removing " + remoteContainer.toIdentityString());
1770 }
1771 context.removeContainer(remoteContainer);
1772 }
1773 }
1774 }
1775
1776 @Override
1777 protected AsyncLog getLog()
1778 {
1779 return LOG;
1780 }
1781 }