1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package fulmine.context;
17
18 import static fulmine.util.Utils.CLOSE_BRACE;
19 import static fulmine.util.Utils.COLON;
20 import static fulmine.util.Utils.OPEN_BRACE;
21 import static fulmine.util.Utils.SPACE;
22 import static fulmine.util.Utils.logException;
23 import static fulmine.util.Utils.nullCheck;
24
25 import java.net.InetAddress;
26 import java.net.UnknownHostException;
27 import java.util.Collection;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Set;
31 import java.util.TimerTask;
32
33 import org.apache.commons.lang.SystemUtils;
34
35 import fulmine.AbstractLifeCycle;
36 import fulmine.Domain;
37 import fulmine.IDomain;
38 import fulmine.ILifeCycle;
39 import fulmine.IType;
40 import fulmine.Type;
41 import fulmine.distribution.IDistributionManager;
42 import fulmine.distribution.IHeartbeatMonitor;
43 import fulmine.distribution.channel.IChannel;
44 import fulmine.distribution.connection.IConnectionBroker;
45 import fulmine.distribution.connection.IConnectionDiscoverer;
46 import fulmine.distribution.connection.IConnectionParameters;
47 import fulmine.distribution.events.ContextDiscoveredEvent;
48 import fulmine.event.IEvent;
49 import fulmine.event.listener.AbstractEventHandler;
50 import fulmine.event.listener.IEventListener;
51 import fulmine.event.listener.MultiSystemEventListener;
52 import fulmine.event.subscription.ISubscriptionListener;
53 import fulmine.event.system.ISystemEvent;
54 import fulmine.event.system.ISystemEventListener;
55 import fulmine.event.system.ISystemEventSource;
56 import fulmine.model.container.IContainer;
57 import fulmine.model.container.IContainerFactory;
58 import fulmine.model.container.events.ContainerStateChangeEvent;
59 import fulmine.model.container.impl.Record;
60 import fulmine.model.field.BooleanField;
61 import fulmine.model.field.IField;
62 import fulmine.model.field.IntegerField;
63 import fulmine.model.field.LongField;
64 import fulmine.protocol.specification.IFrameReader;
65 import fulmine.protocol.specification.IFrameWriter;
66 import fulmine.rpc.IRpcDefinition;
67 import fulmine.rpc.IRpcHandler;
68 import fulmine.rpc.IRpcMarker;
69 import fulmine.rpc.IRpcPublicationListener;
70 import fulmine.rpc.IRpcResult;
71 import fulmine.rpc.IRpcResultHandler;
72 import fulmine.util.collection.CollectionFactory;
73 import fulmine.util.collection.CollectionUtils;
74 import fulmine.util.log.AsyncLog;
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176 public class FTContext extends AbstractLifeCycle implements IFrameworkContext
177 {
/** Logger shared by this class and by its nested handler and state classes. */
final static AsyncLog LOG = new AsyncLog(FTContext.class);

/** Separator placed between the components of the FT context identity. */
private static final String DELIMITER = COLON;

/**
 * Time in milliseconds that the startup logic waits for other FTContexts
 * to be discovered before requesting the provisional-active state.
 */
private final long activationDelay;

/**
 * Prefix prepended to the application context identity to form the
 * cluster identity.
 */
static final String FT_ID_PREFIX = "FTContext:";

/**
 * Name of the boolean field in a context-info record that holds the
 * confirmed-active flag.
 */
static final String CONFIRMED_ACTIVE_FIELD = "confirmedActive";

/**
 * Name of the boolean field in a context-info record that holds the
 * provisional-active flag.
 */
static final String PROVISIONAL_ACTIVE_FIELD = "provisionalActive";

/**
 * Name of the integer field in a context-info record that holds the FT
 * number of the context.
 */
static final String FT_NUMBER_FIELD = "FTNumber";

/**
 * Name of the long field in a context-info record that holds the value of
 * {@link System#currentTimeMillis()} taken when the record was created.
 */
static final String START_TIME_FIELD = "startTime";

/**
 * Multiplier of the FT heartbeat period that the start-up validation error
 * message quotes as the minimum activation delay.
 */
public static final long HEARTBEAT_MULTIPLIER = 4;
220
221
222
223
224
225
226
227 private final class ContextDiscoveredEventHandler extends
228 AbstractEventHandler<ContextDiscoveredEvent> implements
229 ISystemEventListener
230 {
231 @Override
232 protected AsyncLog getLog()
233 {
234 return LOG;
235 }
236
237 @Override
238 public void handle(ContextDiscoveredEvent event)
239 {
240 IConnectionParameters params = event.getConnectionParameters();
241 if (params.getRemoteContextIdentity().startsWith(
242 getClusterIdentity()))
243 {
244
245
246 FTContext.this.getFtWorkerContext().subscribe(
247 params.getRemoteContextIdentity(),
248 params.getRemoteContextIdentity(), Type.SYSTEM,
249 Domain.FRAMEWORK, FTContext.this.getFtLogicHandler());
250 }
251 }
252 }
253
254
255
256
257
258
259
260 private final class ContextInfoHandler extends AbstractEventHandler<Record>
261 {
262 @Override
263 protected AsyncLog getLog()
264 {
265 return LOG;
266 }
267
268 @Override
269 public void handle(Record event)
270 {
271
272 if (event.getIdentity().startsWith(getClusterIdentity()))
273 {
274 synchronized (FTContext.this.contextInfos)
275 {
276
277
278
279
280
281 FTContext.this.contextInfos.remove(event);
282 FTContext.this.contextInfos.add(event);
283 }
284 FTContext.this.getStateManager().revalidate();
285 }
286 }
287 }
288
289
290
291
292
293
294
295 private final class ContextInfoStateChangeHandler extends
296 AbstractEventHandler<ContainerStateChangeEvent>
297 {
298
299 @Override
300 protected AsyncLog getLog()
301 {
302 return LOG;
303 }
304
305 @Override
306 public void handle(ContainerStateChangeEvent event)
307 {
308
309 final IContainer container = event.getContainer();
310 handleContextInfo(container);
311 }
312
313 private void handleContextInfo(final IContainer container)
314 {
315 boolean run = false;
316
317 synchronized (FTContext.this.contextInfos)
318 {
319 if (FTContext.this.contextInfos.contains(container))
320 {
321
322
323 FTContext.this.contextInfos.remove(container);
324 FTContext.this.contextInfos.add(container);
325 run = true;
326 }
327 }
328 if (run)
329 {
330 FTContext.this.getStateManager().revalidate();
331 }
332 }
333
334 }
335
336
337
338
339
340
341
342
/**
 * The logic of a single FTContext state. It is a {@link Runnable} so that
 * an implementation can be handed to an executor; the abstract
 * implementation in this file submits itself via
 * {@link FTContext#execute(Runnable)}.
 */
private interface StateLogic extends Runnable
{
    /**
     * Requests a validation of the FTContext state by this logic.
     */
    void validate();
}
351
352
353
354
355
356
357
358
359
360
361
362 private abstract class AbstractStateLogic implements StateLogic
363 {
364 public final void run()
365 {
366 if (FTContext.this.activationPeriodActive)
367 {
368 if (LOG.isDebugEnabled())
369 {
370 LOG.debug(FTContext.this
371 + " in activation period, no state validation will occur.");
372 }
373 return;
374 }
375 synchronized (FTContext.this.contextInfos)
376 {
377 if (LOG.isDebugEnabled())
378 {
379 LOG.debug(FTContext.this + " start");
380 }
381 List<IContainer> confirmedActiveContexts =
382 CollectionFactory.newList();
383 List<IContainer> provisionalActiveContexts =
384 CollectionFactory.newList();
385 getContexts(confirmedActiveContexts, provisionalActiveContexts);
386 doValidate(confirmedActiveContexts, provisionalActiveContexts);
387 if (LOG.isDebugEnabled())
388 {
389 LOG.debug(FTContext.this + " finish");
390 }
391 }
392 }
393
394 public final void validate()
395 {
396
397 FTContext.this.execute(this);
398 }
399
400
401
402
403
404
405
406
407
408
409 private void getContexts(List<IContainer> confirmedActiveContexts,
410 List<IContainer> provisionalActiveContexts)
411 {
412 synchronized (FTContext.this.contextInfos)
413 {
414 FTContext.this.logFTContexts();
415
416 for (IContainer contextInfo : FTContext.this.contextInfos)
417 {
418 if (contextInfo.getDataState() == IContainer.DataState.LIVE)
419 {
420 if (contextInfo.getBooleanField(
421 FTContext.CONFIRMED_ACTIVE_FIELD).get())
422 {
423 confirmedActiveContexts.add(contextInfo);
424 }
425 else
426 {
427 if (contextInfo.getBooleanField(
428 FTContext.PROVISIONAL_ACTIVE_FIELD).get())
429 {
430 provisionalActiveContexts.add(contextInfo);
431 }
432 }
433 }
434 }
435 }
436 logActiveContexts(confirmedActiveContexts, "confirmed");
437 logActiveContexts(provisionalActiveContexts, "provisional");
438 }
439
440
441
442
443
444
445
446
447
448 boolean checkForPriority(List<IContainer> otherContexts)
449 {
450 for (IContainer record : otherContexts)
451 {
452 final int otherFTNumber =
453 record.getIntegerField(FT_NUMBER_FIELD).get();
454 if (otherFTNumber < FTContext.this.ftNumber)
455 {
456 return false;
457 }
458
459
460 if (otherFTNumber == FTContext.this.ftNumber)
461 {
462 if (record.getIdentity().compareTo(
463 FTContext.this.getClusterIdentity()) > 0)
464 {
465 return false;
466 }
467 }
468 }
469 return true;
470 }
471
472
473
474
475
476
477
478 void logActiveContexts(List<IContainer> contexts, String type)
479 {
480 if (LOG.isDebugEnabled())
481 {
482 if (contexts.size() > 0)
483 {
484 LOG.debug(FTContext.this + " found other " + type
485 + " active FTContexts: "
486 + CollectionUtils.toFormattedString(contexts));
487 }
488 else
489 {
490 LOG.debug(FTContext.this + " found no other " + type
491 + " active FTContexts.");
492 }
493 }
494 }
495
496 @Override
497 public String toString()
498 {
499 return getClass().getSimpleName();
500 }
501
502
503
504
505
506
507
508
509
510
511
512
513 abstract void doValidate(List<IContainer> confirmedActiveContexts,
514 List<IContainer> provisionalActiveContexts);
515 }
516
517
518
519
520
521
522
523 private final class Standby extends AbstractStateLogic
524 {
525 Standby()
526 {
527 FTContext.this.getStateListener().willStandby();
528 FTContext.this.setFTContextState(FTContext.CONFIRMED_ACTIVE_FIELD,
529 false);
530 logState();
531 logFTContexts();
532 FTContext.this.standby();
533 FTContext.this.getStateListener().didStandby();
534 }
535
536
537
538
539
540 void doValidate(List<IContainer> confirmedActiveContexts,
541 List<IContainer> provisionalActiveContexts)
542 {
543 boolean shouldBeProvisionallyActive = true;
544
545 if (confirmedActiveContexts.size() > 0)
546 {
547
548 shouldBeProvisionallyActive = false;
549 }
550 else
551 {
552
553 shouldBeProvisionallyActive =
554 checkForPriority(provisionalActiveContexts);
555 }
556 FTContext.this.logStateChange(shouldBeProvisionallyActive,
557 "provisional");
558 if (shouldBeProvisionallyActive)
559 {
560 FTContext.this.getStateManager().setState(
561 StateEnum.ProvisionalActive, FTContext.this);
562 }
563 else
564 {
565 if (LOG.isDebugEnabled())
566 {
567 LOG.debug(FTContext.this + " remaining in state " + this);
568 }
569
570 if (confirmedActiveContexts.size() == 1)
571 {
572
573 if (!confirmedActiveContexts.get(0).getIdentity().equals(
574 FTContext.this.activeFTContextIdentity))
575 {
576 if (LOG.isInfoEnabled())
577 {
578 LOG.info(FTContext.this
579 + ": the active FTContext has changed from "
580 + FTContext.this.activeFTContextIdentity
581 + " to "
582 + confirmedActiveContexts.get(0).getIdentity());
583 }
584
585 FTContext.this.activeFTContextIdentity =
586 confirmedActiveContexts.get(0).getIdentity();
587 }
588 }
589 else
590 {
591 if (LOG.isInfoEnabled())
592 {
593 LOG.info(FTContext.this
594 + " found "
595 + (confirmedActiveContexts.size() == 0 ? "zero"
596 : "multiple")
597 + " confirmed active FTContexts,"
598 + " setting current 'known' active"
599 + " FTContext to null."
600 + CollectionUtils.toFormattedString(confirmedActiveContexts));
601 }
602
603 FTContext.this.activeFTContextIdentity = null;
604 }
605 }
606 }
607 }
608
609
610
611
612
613
614
615
616
617
618
619
620 private final class ProvisionalActive extends AbstractStateLogic
621 {
622
623 ProvisionalActive()
624 {
625 FTContext.this.setFTContextState(
626 FTContext.PROVISIONAL_ACTIVE_FIELD, true);
627 }
628
629 @Override
630 void doValidate(List<IContainer> confirmedActiveContexts,
631 List<IContainer> provisionalActiveContexts)
632 {
633 boolean shouldBeConfirmedActive = true;
634
635 if (confirmedActiveContexts.size() > 0)
636 {
637 if (LOG.isInfoEnabled())
638 {
639 LOG.info(FTContext.this
640 + " found other confirmed active contexts,"
641 + " switching to standby state.");
642 }
643 FTContext.this.getStateManager().setState(StateEnum.Standby,
644 FTContext.this);
645 }
646 else
647 {
648
649 shouldBeConfirmedActive =
650 checkForPriority(provisionalActiveContexts);
651 }
652 FTContext.this.logStateChange(shouldBeConfirmedActive, "confirmed");
653 if (shouldBeConfirmedActive)
654 {
655 if (confirmedActiveContexts.size() == 0)
656 {
657
658 FTContext.this.getStateManager().setState(
659 StateEnum.Confirmed, FTContext.this);
660 }
661 else
662 {
663
664 if (LOG.isInfoEnabled())
665 {
666 LOG.info(this
667 + " waiting for other confirmed active"
668 + " FTContexts to become inactive: "
669 + CollectionUtils.toFormattedString(confirmedActiveContexts));
670 }
671 }
672 }
673 else
674 {
675 FTContext.this.getStateManager().setState(StateEnum.Standby,
676 FTContext.this);
677 }
678 }
679 }
680
681
682
683
684
685
686
687
688
689
690
/**
 * Logic for the startup state. Constructing it clears the provisional-active
 * flag of this context. Validation decides between starting the activation
 * period (after which the provisional-active state is requested) and going
 * to standby.
 */
private final class Startup extends AbstractStateLogic
{
    Startup()
    {
        FTContext.this.setFTContextState(PROVISIONAL_ACTIVE_FIELD, false);
    }

    /**
     * Scans the known context-info records directly rather than using the
     * two list arguments. If no LIVE record is confirmed active, no LIVE
     * record has a lower FT number and this context is not confirmed active
     * itself, the provisional flag is set and a thread is started that
     * waits for the activation delay and then requests the
     * provisional-active state. If a confirmed-active record exists or a
     * lower FT number was found, standby is requested.
     */
    @Override
    void doValidate(List<IContainer> confirmedActiveContexts,
        List<IContainer> provisionalActiveContexts)
    {
        FTContext.this.activeFTContextIdentity = null;
        boolean shouldBeProvisionalActive = true;
        IContainer currentActiveContext = null;
        FTContext.this.logFTContexts();
        // run() calls this method while holding the contextInfos lock
        for (IContainer contextInfo : FTContext.this.contextInfos)
        {
            if (contextInfo.getDataState() == IContainer.DataState.LIVE)
            {
                if (contextInfo.getBooleanField(
                    FTContext.CONFIRMED_ACTIVE_FIELD).get())
                {
                    currentActiveContext = contextInfo;
                }
                // any LIVE context with a lower FT number rules this one out
                if (contextInfo.getIntegerField(FTContext.FT_NUMBER_FIELD).get() < FTContext.this.ftNumber)
                {
                    shouldBeProvisionalActive = false;
                }
            }
        }
        FTContext.this.logStateChange(shouldBeProvisionalActive,
            "provisional");
        if (currentActiveContext == null && shouldBeProvisionalActive)
        {
            // when this context is already confirmed active nothing is done
            if (!FTContext.this.isFTContextActive())
            {
                // publish the provisional flag, then suppress validation
                // (see AbstractStateLogic.run) until the delay has elapsed
                FTContext.this.setFTContextState(
                    FTContext.PROVISIONAL_ACTIVE_FIELD, true);
                FTContext.this.activationPeriodActive = true;

                // the wait happens on a dedicated thread so that this
                // validation pass returns immediately
                new Thread(new Runnable()
                {
                    public void run()
                    {
                        // 'this' is the anonymous Runnable; nothing in this
                        // block notifies it, so the wait acts as a delay
                        synchronized (this)
                        {
                            if (LOG.isInfoEnabled())
                            {
                                LOG.info(FTContext.this + " waiting for "
                                    + FTContext.this.activationDelay
                                    + "ms for other FTContexts"
                                    + " to be discovered.");
                            }
                            try
                            {
                                this.wait(FTContext.this.activationDelay);
                            }
                            catch (InterruptedException e)
                            {
                                // NOTE(review): an interrupt is only logged;
                                // activation continues and the interrupt
                                // status is not restored
                                logException(LOG, getFtContextIdentity(), e);
                            }
                        }
                        FTContext.this.activationPeriodActive = false;

                        FTContext.this.getStateManager().setState(
                            StateEnum.ProvisionalActive, FTContext.this);
                    }

                }, "Provisional activation thread for " + FTContext.this).start();
            }
        }
        else
        {
            // a confirmed active context exists or another has priority
            FTContext.this.getStateManager().setState(StateEnum.Standby,
                FTContext.this);
        }
    }
}
774
775
776
777
778
779
780
781
782
783 private final class Confirmed extends AbstractStateLogic
784 {
785 Confirmed()
786 {
787 FTContext.this.getStateListener().willActivate();
788 setFTContextState(CONFIRMED_ACTIVE_FIELD, true);
789 logState();
790 logFTContexts();
791 FTContext.this.activeFTContextIdentity =
792 FTContext.this.getFtContextIdentity();
793 FTContext.this.activate();
794 FTContext.this.getStateListener().didActivate();
795 }
796
797 @Override
798 void doValidate(List<IContainer> confirmedActiveContexts,
799 List<IContainer> provisionalActiveContexts)
800 {
801
802
803
804 if (!checkForPriority(confirmedActiveContexts))
805 {
806 if (LOG.isInfoEnabled())
807 {
808 LOG.info(this
809 + " found other confirmed active"
810 + " FTContexts so restarting activation sequence: "
811 + CollectionUtils.toFormattedString(confirmedActiveContexts));
812 }
813
814 FTContext.this.getStateManager().setState(StateEnum.Startup,
815 FTContext.this);
816 }
817 else
818 {
819 if (LOG.isInfoEnabled())
820 {
821 LOG.info(FTContext.this
822 + " should be the confirmed active versus others: "
823 + CollectionUtils.toFormattedString(confirmedActiveContexts));
824 }
825 }
826 }
827 }
828
829
830
831
832
833
834
/**
 * Holds the current state of the FTContext together with the logic object
 * created for that state, and performs the state transitions.
 */
private final class StateManager
{
    /** The current state; <code>null</code> until the first transition. */
    StateEnum currentState;

    /** The logic created for the current state. */
    private FTContext.StateLogic stateLogic;

    /**
     * Switches to the new state if it differs from the current one. The
     * logic for the new state is created and then a revalidation is
     * requested. Nothing happens when the state is unchanged.
     *
     * @param newState
     *            the state to switch to
     * @param context
     *            the FTContext used to create the state logic
     */
    void setState(StateEnum newState, FTContext context)
    {
        boolean stateChanged = false;
        synchronized (FTContext.this.contextInfos)
        {
            if (newState != this.currentState)
            {
                if (FTContext.LOG.isDebugEnabled())
                {
                    FTContext.LOG.debug(context + " switching from "
                        + this.currentState + " to " + newState);
                }
                this.currentState = newState;
                stateChanged = true;
            }
        }
        if (stateChanged)
        {
            // the logic is created without holding the lock - presumably
            // because the state constructors invoke listener callbacks;
            // TODO confirm. Note that currentState is re-read unlocked here.
            final FTContext.StateLogic createdState =
                this.currentState.create(context);
            synchronized (FTContext.this.contextInfos)
            {
                this.stateLogic = createdState;
                revalidate();
            }
        }
    }

    /**
     * Asks the current state logic, if one exists, to validate. The request
     * is made while holding the lock on the known context-info records.
     */
    void revalidate()
    {
        synchronized (FTContext.this.contextInfos)
        {
            // no logic exists before the first state has been created
            if (this.stateLogic != null)
            {
                this.stateLogic.validate();
            }
        }
    }
}
899
900
901
902
903
904
905 private enum StateEnum
906 {
907 Startup
908 {
909 public FTContext.StateLogic create(FTContext context)
910 {
911 return context.new Startup();
912 }
913 },
914 Standby
915 {
916 public FTContext.StateLogic create(FTContext context)
917 {
918 return context.new Standby();
919 }
920 },
921 ProvisionalActive
922 {
923 public FTContext.StateLogic create(FTContext context)
924 {
925 return context.new ProvisionalActive();
926 }
927 },
928 Confirmed
929 {
930 public FTContext.StateLogic create(FTContext context)
931 {
932 return context.new Confirmed();
933 }
934 };
935
936 public abstract FTContext.StateLogic create(FTContext context);
937 }
938
939
/** The application context that the framework-context calls are delegated to. */
private final IFrameworkContext appContext;

/**
 * The context, created with the FT context identity and attached to the FT
 * network, that hosts this context's info record and subscribes to the
 * info records of the other contexts in the cluster.
 */
private final IFrameworkContext ftWorkerContext;

/** Listener notified before and after standby and activation transitions. */
private final IFTContextStateListener stateListener;

/**
 * Set while the startup logic is waiting for the activation delay to
 * elapse; state validation is skipped while it is <code>true</code>.
 */
private volatile boolean activationPeriodActive;

/** Holds the current state and performs the state transitions. */
private final StateManager stateManager;

/**
 * The listener, built from the three nested event handlers, that is used
 * when subscribing to the info records of discovered contexts.
 */
private final MultiSystemEventListener ftLogicHandler;

/**
 * The identity of this FTContext: cluster identity, FT number, local host
 * address, identity hash code of the application context and a nano-time
 * stamp, separated by the delimiter.
 */
private final String ftContextIdentity;

/**
 * The FT number of this context. A context with a lower number has priority
 * over one with a higher number.
 */
private final int ftNumber;

/**
 * The known context-info records of the cluster. Also used as the lock
 * guarding the state logic.
 */
private final Set<IContainer> contextInfos;

/**
 * The info record published by this context, holding the FT number, start
 * time and the provisional/confirmed active flags. Assigned when the
 * context starts.
 */
Record contextInfo;

/**
 * The identity of the FTContext currently believed to be the confirmed
 * active one, or <code>null</code> when that is not known.
 */
String activeFTContextIdentity;
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
/**
 * Constructs the FTContext around the application context. An
 * {@link FTDistributionManager} is installed on the application context and
 * started, the FT context identity is built, the FT worker context is
 * created on the FT network and the event handlers are wired into the FT
 * logic handler.
 *
 * @param appContext
 *            the application context to wrap
 * @param stateListener
 *            the listener for state transitions; checked with nullCheck
 * @param ftNetwork
 *            the network used by the FT worker context; checked with
 *            nullCheck
 * @param number
 *            the FT number of this context
 * @param activationDelayMillis
 *            the activation delay in milliseconds
 * @throws RuntimeException
 *             if the local host address cannot be resolved
 */
@SuppressWarnings("unchecked")
public FTContext(IFrameworkContext appContext,
    IFTContextStateListener stateListener, INetwork ftNetwork, int number,
    long activationDelayMillis)
{
    nullCheck(stateListener, "No FTContext state listener provided");
    nullCheck(ftNetwork, "No FT network provided");
    this.appContext = appContext;

    // NOTE(review): 'this' is handed out here before the remaining fields
    // have been assigned
    final FTDistributionManager distributionManager =
        new FTDistributionManager(appContext, this);
    getAppContext().setDistributionManager(distributionManager);
    distributionManager.start();

    this.stateListener = stateListener;
    this.ftNumber = number;
    this.activationDelay = activationDelayMillis;
    this.stateManager = new StateManager();
    try
    {
        // cluster identity + FT number + host address + identity hash code
        // of the application context + nano-time stamp
        this.ftContextIdentity =
            getClusterIdentity() + DELIMITER + this.ftNumber + DELIMITER
                + InetAddress.getLocalHost().getHostAddress() + DELIMITER
                + System.identityHashCode(getAppContext()) + DELIMITER
                + System.nanoTime();
    }
    catch (UnknownHostException e)
    {
        throw new RuntimeException(
            "Could not get the local host IP address for context: "
                + getAppContext().getIdentity(), e);
    }
    // presumably the second argument is the event processor count - TODO
    // confirm against FulmineContext
    this.ftWorkerContext = new FulmineContext(ftContextIdentity, 2);
    getFtWorkerContext().setNetwork(ftNetwork);

    this.contextInfos = CollectionFactory.newSet();
    Map<Class<? extends IEvent>, IEventListener> listeners =
        AbstractEventHandler.getEventHandlerMappings(
            new ContextInfoStateChangeHandler(),
            new ContextDiscoveredEventHandler(), new ContextInfoHandler());
    this.ftLogicHandler =
        new MultiSystemEventListener(getFtContextIdentity(),
            getFtWorkerContext(), listeners);
}
1077
1078
/** The watchdog held by this context; only stored and returned here. */
private IContextWatchdog watchdog;

/**
 * @return the watchdog held by this FTContext, <code>null</code> if none
 *         has been set
 */
public IContextWatchdog getContextWatchdog()
{
    return this.watchdog;
}

/**
 * Stores the watchdog on this FTContext. It is not passed on to the
 * application context.
 *
 * @param watchdog
 *            the watchdog to hold
 */
public void setContextWatchdog(IContextWatchdog watchdog)
{
    this.watchdog = watchdog;
}
1090
/**
 * Starts the FTContext: validates the configuration, prepares and starts
 * the application context, starts the FT worker context, creates the info
 * record of this context, enters the startup state and finally starts the
 * state listener and the FT logic handler.
 */
protected final void doStart()
{
    validate();

    prepareAppContextForStart();
    getAppContext().start();

    getFtWorkerContext().start();

    // the record must exist before the first state logic is constructed,
    // because the state constructors update its flags
    createContextInfoRecord();

    getStateManager().setState(StateEnum.Startup, this);

    getStateListener().start();
    getFtLogicHandler().start();
}
1107
1108
1109
1110
1111
1112
/**
 * Called by {@link #doStart()} before the application context is started.
 * This implementation puts the application context's network into
 * listening-only mode.
 */
protected void prepareAppContextForStart()
{

    getNetwork().setListeningOnlyMode(true);
}
1118
/**
 * Destroys the FTContext: removes the FT logic handler as a listener for
 * {@link ContextDiscoveredEvent}s on the FT worker context, destroys the
 * FT worker context, destroys the state listener and finally destroys the
 * application context. Exceptions from the listener removal and from the
 * state listener are logged and do not stop the sequence.
 */
protected final void doDestroy()
{
    try
    {
        getFtWorkerContext().getSystemEventSource(
            ContextDiscoveredEvent.class).removeListener(
            getFtLogicHandler());
    }
    catch (Exception e)
    {
        logException(LOG, "clusteringLogic", e);
    }
    // NOTE(review): not guarded - an exception here skips the remaining
    // destroy calls
    getFtWorkerContext().destroy();
    try
    {
        getStateListener().destroy();
    }
    catch (Exception e)
    {
        logException(LOG, "statelistener", e);
    }
    getAppContext().destroy();
}
1142
1143
1144
1145
1146
/**
 * Puts the application context into standby by calling
 * {@link #doStandby(IFrameworkContext, INetwork, IConnectionDiscoverer, IConnectionBroker)}
 * with the application context and its network, connection discoverer and
 * connection broker.
 */
protected void standby()
{
    doStandby(getAppContext(), getAppContext().getNetwork(),
        getAppContext().getConnectionDiscoverer(),
        getAppContext().getConnectionBroker());
}
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167 protected final void doStandby(IFrameworkContext context, INetwork network,
1168 IConnectionDiscoverer discoverer, IConnectionBroker connectionBroker)
1169 {
1170 nullCheck(discoverer, "no discoverer");
1171 nullCheck(network, "no network");
1172 nullCheck(context, "no context");
1173 discoverer.disablePulsing();
1174
1175 try
1176 {
1177 if (connectionBroker != null)
1178 {
1179 connectionBroker.destroy();
1180 }
1181
1182 context.setConnectionBroker(network.createBroker());
1183 context.getConnectionBroker().start();
1184 }
1185 catch (Exception e)
1186 {
1187 logException(LOG, FTContext.this, e);
1188 }
1189 }
1190
1191
1192
1193
1194
/**
 * Activates the application context. This implementation enables pulsing
 * on the application context's connection discoverer.
 */
protected void activate()
{

    getConnectionDiscoverer().enablePulsing();
}
1200
1201 private void validate()
1202 {
1203 final INetwork network = getNetwork();
1204 nullCheck(network, "No network provided for " + getAppContext());
1205
1206
1207 if (this.activationDelay < getNetwork().getNetworkHeartbeatPeriod() * 3)
1208 {
1209 throw new IllegalStateException(
1210 "The activation delay of an FTContext must be at "
1211 + "least "
1212 + FTContext.HEARTBEAT_MULTIPLIER
1213 + "x the FT heartbeat period. The activationDelay="
1214 + this.activationDelay
1215 + "ms, FT heartbeatPeriod="
1216 + getFtWorkerContext().getNetwork().getNetworkHeartbeatPeriod()
1217 + "ms. "
1218 + "Either increase the activationDelay or reduce the heartbeat period for the FT network.");
1219 }
1220 }
1221
1222 @Override
1223 public String toString()
1224 {
1225 return getFtContextIdentity() + OPEN_BRACE
1226 + getStateManager().currentState + CLOSE_BRACE;
1227 }
1228
/**
 * @return the log of this class
 */
@Override
protected AsyncLog getLog()
{
    return LOG;
}
1234
1235
1236
1237
1238
1239
1240
/**
 * @return the FT worker context created by the constructor with the FT
 *         context identity
 */
IFrameworkContext getFtWorkerContext()
{
    return this.ftWorkerContext;
}
1245
1246
1247
1248
/**
 * @return the application context wrapped by this FTContext
 */
protected IFrameworkContext getAppContext()
{
    return this.appContext;
}
1253
1254
1255
1256
/**
 * @return the listener that is notified of state transitions
 */
private IFTContextStateListener getStateListener()
{
    return this.stateListener;
}
1261
1262
1263
1264
1265
/**
 * @return the listener that wraps the nested event handlers of this
 *         FTContext
 */
MultiSystemEventListener getFtLogicHandler()
{
    return this.ftLogicHandler;
}
1270
/**
 * @return the state manager of this FTContext
 */
private StateManager getStateManager()
{
    return this.stateManager;
}
1275
1276
1277
1278
1279
1280
/**
 * @return the identity of this FTContext as built by the constructor; it
 *         begins with the cluster identity
 */
String getFtContextIdentity()
{
    return this.ftContextIdentity;
}
1285
1286
1287
1288
1289
1290
1291
/**
 * @return the cluster identity: {@link #FT_ID_PREFIX} followed by the
 *         identity of the application context
 */
private String getClusterIdentity()
{
    return FTContext.FT_ID_PREFIX + getIdentity();
}
1296
1297
1298
1299
1300 private void createContextInfoRecord()
1301 {
1302 this.contextInfo =
1303 (Record) getFtWorkerContext().getLocalContainer(
1304 this.getFtContextIdentity(), Type.SYSTEM, Domain.FRAMEWORK);
1305 IntegerField contextNumber =
1306 new IntegerField(FTContext.FT_NUMBER_FIELD);
1307 contextNumber.set(this.ftNumber);
1308 LongField startTime = new LongField(FTContext.START_TIME_FIELD);
1309 startTime.set(System.currentTimeMillis());
1310 BooleanField provisionalActive =
1311 new BooleanField(FTContext.PROVISIONAL_ACTIVE_FIELD);
1312 provisionalActive.set(false);
1313 BooleanField activeContext =
1314 new BooleanField(FTContext.CONFIRMED_ACTIVE_FIELD);
1315 activeContext.set(false);
1316 this.contextInfo.add(contextNumber);
1317 this.contextInfo.add(startTime);
1318 this.contextInfo.add(activeContext);
1319 this.contextInfo.add(provisionalActive);
1320 this.contextInfo.flushFrame();
1321 }
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337 private void setFTContextState(final String stateField,
1338 final boolean stateValue)
1339 {
1340 final BooleanField provisionalState =
1341 contextInfo.getBooleanField(FTContext.PROVISIONAL_ACTIVE_FIELD);
1342 boolean wasProvisionallyActive = provisionalState.get();
1343
1344 final BooleanField confirmedState =
1345 contextInfo.getBooleanField(FTContext.CONFIRMED_ACTIVE_FIELD);
1346 boolean wasConfirmedActive = confirmedState.get();
1347
1348 boolean provisionalActive = wasProvisionallyActive;
1349 boolean confirmedActive = wasConfirmedActive;
1350 if (PROVISIONAL_ACTIVE_FIELD.equals(stateField))
1351 {
1352 provisionalActive = stateValue;
1353 if (!provisionalActive)
1354 {
1355
1356 confirmedActive = false;
1357 }
1358 }
1359 else
1360 {
1361 confirmedActive = stateValue;
1362
1363 provisionalActive = false;
1364 }
1365
1366 if (LOG.isDebugEnabled())
1367 {
1368 LOG.debug(FTContext.this + " provisional: "
1369 + (wasProvisionallyActive ? "active" : "standby") + "->"
1370 + (provisionalActive ? "active" : "standby") + ", confirmed: "
1371 + (wasConfirmedActive ? "active" : "standby") + "->"
1372 + (confirmedActive ? "active" : "standby"));
1373 }
1374 provisionalState.set(provisionalActive);
1375 confirmedState.set(confirmedActive);
1376 this.contextInfo.flushFrame();
1377 }
1378
1379
1380
1381
1382
1383 private void logState()
1384 {
1385 if (LOG.isInfoEnabled())
1386 {
1387 LOG.info(SystemUtils.LINE_SEPARATOR
1388 + "********************************************************************************"
1389 + SystemUtils.LINE_SEPARATOR
1390 + "* "
1391 + FTContext.this
1392 + " CONTEXT IS "
1393 + (this.isFTContextActive() ? "ACTIVE" : "STANDBY")
1394 + SystemUtils.LINE_SEPARATOR
1395 + "********************************************************************************");
1396 }
1397 }
1398
1399
1400
1401
1402
1403
1404
1405
/**
 * Reports whether this context is confirmed active, as recorded by the
 * confirmed-active flag in its info record.
 *
 * @return the value of the confirmed-active flag
 */
public boolean isFTContextActive()
{
    final BooleanField activeField =
        this.contextInfo.getBooleanField(FTContext.CONFIRMED_ACTIVE_FIELD);
    return activeField.get();
}
1412
1413
1414
1415
1416 private void logFTContexts()
1417 {
1418 synchronized (this.contextInfos)
1419 {
1420 if (LOG.isDebugEnabled())
1421 {
1422 if (this.contextInfos.size() > 0)
1423 {
1424 LOG.debug(FTContext.this + " FTContexts in cluster "
1425 + getClusterIdentity() + " are: "
1426 + CollectionUtils.toFormattedString(this.contextInfos));
1427 }
1428 else
1429 {
1430 if (LOG.isDebugEnabled())
1431 {
1432 LOG.debug(FTContext.this
1433 + " No other FTContexts exist in cluster "
1434 + getClusterIdentity());
1435 }
1436 }
1437 }
1438 }
1439 }
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449 private void logStateChange(boolean shouldBeActive, String type)
1450 {
1451 if (LOG.isDebugEnabled())
1452 {
1453 LOG.debug(FTContext.this + " is confirmed "
1454 + (FTContext.this.isFTContextActive() ? "active" : "standby")
1455 + " and should be " + type + SPACE
1456 + (shouldBeActive ? "active" : "standby"));
1457 }
1458 }
1459
/**
 * Delegates to the application context.
 *
 * @param container
 *            the container to add
 */
public void addContainer(IContainer container)
{
    getAppContext().addContainer(container);
}
1464
/**
 * Delegates to the application context.
 *
 * @param listener
 *            the subscription listener to add
 * @return the result reported by the application context
 */
public boolean addSubscriptionListener(ISubscriptionListener listener)
{
    return getAppContext().addSubscriptionListener(listener);
}
1469
/**
 * Delegates to the application context.
 *
 * @return the result reported by the application context
 */
public boolean containsLocalContainer(String identityRegularExpression,
    IType type, IDomain domain)
{
    return getAppContext().containsLocalContainer(
        identityRegularExpression, type, domain);
}
1476
/**
 * Delegates to the application context.
 *
 * @return the result reported by the application context
 */
public boolean containsRemoteContainer(String remoteContextIdentity,
    String containerIdentity, IType type, IDomain domain)
{
    return getAppContext().containsRemoteContainer(remoteContextIdentity,
        containerIdentity, type, domain);
}
1483
/**
 * Hands the task to the application context for execution. The state
 * logic of this FTContext is validated through this method.
 *
 * @param task
 *            the task to execute
 */
public void execute(Runnable task)
{
    getAppContext().execute(task);
}
1488
/**
 * Delegates to the application context.
 *
 * @return the connected channels reported by the application context
 */
public IChannel[] getConnectedChannels()
{
    return getAppContext().getConnectedChannels();
}
1493
/**
 * Delegates to the application context.
 *
 * @return the connection broker of the application context
 */
public IConnectionBroker getConnectionBroker()
{
    return getAppContext().getConnectionBroker();
}
1498
/**
 * Delegates to the application context.
 *
 * @return the connection discoverer of the application context
 */
public IConnectionDiscoverer getConnectionDiscoverer()
{
    return getAppContext().getConnectionDiscoverer();
}
1503
/**
 * Delegates to the application context.
 *
 * @return the container factory of the application context
 */
public IContainerFactory getContainerFactory()
{
    return getAppContext().getContainerFactory();
}
1508
/**
 * Delegates to the application context.
 *
 * @return the context hash code of the application context
 */
public int getContextHashCode()
{
    return getAppContext().getContextHashCode();
}
1513
/**
 * Delegates to the application context.
 *
 * @return the event processor count of the application context
 */
public int getEventProcessorCount()
{
    return getAppContext().getEventProcessorCount();
}
1518
/**
 * Delegates to the application context.
 *
 * @return the frame reader of the application context
 */
public IFrameReader getFrameReader()
{
    return getAppContext().getFrameReader();
}
1523
/**
 * Delegates to the application context.
 *
 * @return the frame writer of the application context
 */
public IFrameWriter getFrameWriter()
{
    return getAppContext().getFrameWriter();
}
1528
/**
 * Delegates to the application context. This is the identity of the
 * application context, not the FT context identity.
 *
 * @return the identity of the application context
 */
public String getIdentity()
{
    return getAppContext().getIdentity();
}
1533
/**
 * Delegates to the application context.
 *
 * @return the local container returned by the application context
 */
public IContainer getLocalContainer(String identity, IType type,
    IDomain domain)
{
    return getAppContext().getLocalContainer(identity, type, domain);
}
1539
/**
 * Delegates to the application context.
 *
 * @return the local containers of the application context
 */
public Collection<IContainer> getLocalContainers()
{
    return getAppContext().getLocalContainers();
}
1544
/**
 * Delegates to the application context. This is the network of the
 * application context, not the FT network of the FT worker context.
 *
 * @return the network of the application context
 */
public INetwork getNetwork()
{
    return getAppContext().getNetwork();
}
1549
/**
 * Delegates to the application context.
 *
 * @return the event processor thread group of the application context
 */
public ThreadGroup getEventProcessorThreadGroup()
{
    return getAppContext().getEventProcessorThreadGroup();
}
1554
/**
 * Delegates to the application context.
 *
 * @return the remote container returned by the application context
 */
public IContainer getRemoteContainer(String remoteContextIdentity,
    String containerIdentity, IType type, IDomain domain)
{
    return getAppContext().getRemoteContainer(remoteContextIdentity,
        containerIdentity, type, domain);
}
1561
/**
 * Delegates to the application context.
 *
 * @return the remote containers the application context reports for the
 *         given remote context identity
 */
public Collection<IContainer> getRemoteContainers(
    String remoteContextIdentity)
{
    return getAppContext().getRemoteContainers(remoteContextIdentity);
}
1567
/**
 * Delegates to the application context.
 *
 * @param type
 *            the system event class
 * @return the system event source of the application context for the type
 */
public ISystemEventSource getSystemEventSource(
    Class<? extends ISystemEvent> type)
{
    return getAppContext().getSystemEventSource(type);
}
1573
/**
 * Delegates to the application context.
 *
 * @param event
 *            the event to queue
 */
public void queueEvent(IEvent event)
{
    getAppContext().queueEvent(event);
}
1578
/**
 * Delegates to the application context.
 *
 * @param events
 *            the events to queue
 */
public void queueEvents(Collection<IEvent> events)
{
    getAppContext().queueEvents(events);
}
1583
/**
 * Delegates to the application context.
 *
 * @param container
 *            the container to remove
 * @return the result reported by the application context
 */
public boolean removeContainer(IContainer container)
{
    return getAppContext().removeContainer(container);
}
1588
/**
 * Delegates to the application context.
 *
 * @param listener
 *            the subscription listener to remove
 * @return the result reported by the application context
 */
public boolean removeSubscriptionListener(ISubscriptionListener listener)
{
    return getAppContext().removeSubscriptionListener(listener);
}
1593
/**
 * Delegates to the application context, passing all arguments through
 * unchanged.
 */
public void requestRetransmit(String contextIdentity,
    String identityRegularExpression, IType type, IDomain domain)
{
    getAppContext().requestRetransmit(contextIdentity,
        identityRegularExpression, type, domain);
}
1600
/**
 * Delegates to the application context.
 *
 * @param contextIdentity
 *            passed through to the application context
 */
public void requestRetransmitAll(String contextIdentity)
{
    getAppContext().requestRetransmitAll(contextIdentity);
}
1605
1606 public void retransmit(String contextIdentity,
1607 String identityRegularExpression, IType type, IDomain domain)
1608 {
1609 getAppContext().retransmit(contextIdentity, identityRegularExpression,
1610 type, domain);
1611 }
1612
1613 public void retransmitAll(String contextIdentity)
1614 {
1615 getAppContext().retransmitAll(contextIdentity);
1616 }
1617
1618 public void retransmitAllToAll()
1619 {
1620 getAppContext().retransmitAllToAll();
1621 }
1622
1623 public void retransmitToAll(String identityRegularExpression, IType type,
1624 IDomain domain)
1625 {
1626 getAppContext().retransmitToAll(identityRegularExpression, type, domain);
1627 }
1628
1629 public void setConnectionBroker(IConnectionBroker broker)
1630 {
1631 getAppContext().setConnectionBroker(broker);
1632 }
1633
1634 public void setConnectionDiscoverer(IConnectionDiscoverer discoverer)
1635 {
1636 getAppContext().setConnectionDiscoverer(discoverer);
1637 }
1638
1639 public void setDistributionManager(IDistributionManager distributionManager)
1640 {
1641 getAppContext().setDistributionManager(distributionManager);
1642 }
1643
1644 public IDistributionManager getDistributionManager()
1645 {
1646 return getAppContext().getDistributionManager();
1647 }
1648
1649 public void setNetwork(INetwork network)
1650 {
1651 getAppContext().setNetwork(network);
1652 }
1653
1654 public boolean subscribe(String contextIdentity,
1655 String identityRegularExpression, IType type, IDomain domain,
1656 IEventListener listener)
1657 {
1658
1659
1660
1661
1662
1663 return getAppContext().subscribe(contextIdentity,
1664 identityRegularExpression, type, domain, listener);
1665 }
1666
1667 public boolean unsubscribe(String contextIdentity,
1668 String identityRegularExpression, IType type, IDomain domain,
1669 IEventListener listener)
1670 {
1671
1672
1673
1674
1675
1676 return getAppContext().unsubscribe(contextIdentity,
1677 identityRegularExpression, type, domain, listener);
1678 }
1679
1680 public String getEventProcessorIdentityPrefix()
1681 {
1682 return getAppContext().getEventProcessorIdentityPrefix();
1683 }
1684
1685 public void invokeRpc(String remoteContextIdentity, byte[] rpcData)
1686 {
1687 getAppContext().invokeRpc(remoteContextIdentity, rpcData);
1688 }
1689
1690 public boolean addRpcPublicationListener(String remoteContextIdentity,
1691 IRpcPublicationListener listener)
1692 {
1693 return getAppContext().addRpcPublicationListener(remoteContextIdentity,
1694 listener);
1695 }
1696
1697 public boolean unpublishRpcs(Class<?> definition, Object handler)
1698 {
1699 return getAppContext().unpublishRpcs(definition, handler);
1700 }
1701
1702 public boolean publishRpcs(Class<?> definition, Object handler)
1703 {
1704 return getAppContext().publishRpcs(definition, handler);
1705 }
1706
1707 public IRpcResult invoke(String remoteContextIdentity, String procedure,
1708 IField... args)
1709 {
1710 return getAppContext().invoke(remoteContextIdentity, procedure, args);
1711 }
1712
1713 public IRpcMarker invoke(IRpcResultHandler resultHandler,
1714 String remoteContextIdentity, String procedure, IField... args)
1715 {
1716 return getAppContext().invoke(resultHandler, remoteContextIdentity,
1717 procedure, args);
1718 }
1719
1720 public boolean removeRpcPublicationListener(String remoteContextIdentity,
1721 IRpcPublicationListener listener)
1722 {
1723 return getAppContext().removeRpcPublicationListener(
1724 remoteContextIdentity, listener);
1725 }
1726
1727 public boolean publishProdedure(IRpcHandler handler,
1728 IRpcDefinition rpcDefinition)
1729 {
1730 return getAppContext().publishProdedure(handler, rpcDefinition);
1731 }
1732
1733 public boolean unpublishProdedure(IRpcDefinition rpcDefinition)
1734 {
1735 return getAppContext().unpublishProdedure(rpcDefinition);
1736 }
1737
1738 public IPermissionProfile getPermissionProfile()
1739 {
1740 return getAppContext().getPermissionProfile();
1741 }
1742
1743 public void setPermissionProfile(IPermissionProfile profile)
1744 {
1745 getAppContext().setPermissionProfile(profile);
1746 }
1747
1748 public void setRemoteUpdateHandler(IRemoteUpdateHandler handler)
1749 {
1750 getAppContext().setRemoteUpdateHandler(handler);
1751 }
1752
1753 public String updateRemoteContainer(String remoteContextIdentity,
1754 String identity, IType type, IDomain domain, String fieldName,
1755 String fieldValueAsString)
1756 {
1757 return getAppContext().updateRemoteContainer(remoteContextIdentity,
1758 identity, type, domain, fieldName, fieldValueAsString);
1759 }
1760
1761 public IContainer getSystemInfo()
1762 {
1763 return getAppContext().getSystemInfo();
1764 }
1765
1766 public void schedule(TimerTask task, long delay, long period)
1767 {
1768 getAppContext().schedule(task, delay, period);
1769 }
1770 }