1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package fulmine.model.container;
17
18 import static fulmine.util.Utils.CLOSE_SQUARE;
19 import static fulmine.util.Utils.COMMA;
20 import static fulmine.util.Utils.EMPTY_STRING;
21 import static fulmine.util.Utils.EQUALS;
22 import static fulmine.util.Utils.OPEN_SQUARE;
23 import static fulmine.util.Utils.SPACE;
24 import static fulmine.util.Utils.SPACING_4_CHARS;
25 import static fulmine.util.Utils.logException;
26 import static fulmine.util.Utils.nullCheck;
27 import static fulmine.util.Utils.safeToString;
28
29 import java.util.Collection;
30 import java.util.Collections;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Set;
34 import java.util.Map.Entry;
35
36 import org.apache.commons.lang.SystemUtils;
37
38 import fulmine.IDomain;
39 import fulmine.IType;
40 import fulmine.context.IFrameworkContext;
41 import fulmine.event.EventFrameExecution;
42 import fulmine.event.IEventFrameExecution;
43 import fulmine.event.listener.IEventListener;
44 import fulmine.event.system.EventSourceNotObservedEvent;
45 import fulmine.event.system.EventSourceObservedEvent;
46 import fulmine.model.component.AbstractComponent;
47 import fulmine.model.container.events.ContainerCreatedEvent;
48 import fulmine.model.container.events.ContainerDestroyedEvent;
49 import fulmine.model.container.events.ContainerStateChangeEvent;
50 import fulmine.model.container.events.RemoteContainerCreatedEvent;
51 import fulmine.model.container.events.RemoteContainerDestroyedEvent;
52 import fulmine.model.field.BooleanField;
53 import fulmine.model.field.DoubleField;
54 import fulmine.model.field.FloatField;
55 import fulmine.model.field.IField;
56 import fulmine.model.field.IntegerField;
57 import fulmine.model.field.LongField;
58 import fulmine.model.field.StringField;
59 import fulmine.model.field.containerdefinition.IContainerDefinitionField;
60 import fulmine.protocol.specification.FieldReader;
61 import fulmine.protocol.specification.FrameReader;
62 import fulmine.protocol.specification.FrameWriter;
63 import fulmine.protocol.wire.IWireIdentity;
64 import fulmine.protocol.wire.operation.IOperationScope;
65 import fulmine.util.Utils;
66 import fulmine.util.collection.CollectionFactory;
67 import fulmine.util.log.AsyncLog;
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87 public abstract class AbstractContainer extends AbstractComponent implements
88 IContainer, Cloneable
89 {
90 private final static AsyncLog LOG = new AsyncLog(AbstractContainer.class);
91
92
93
94
95 protected class ReaderTask implements FieldReader.IFieldReaderTask
96 {
97 public void read(IOperationScope scope, int fieldId, byte[] dataBuffer,
98 int dataStart, int dataLen)
99 {
100 final AbstractContainer outer = AbstractContainer.this;
101 try
102 {
103 IField field =
104 outer.fields.get(getDefinition().getIdentityForWireCode(
105 fieldId));
106 boolean addToContainer = false;
107 if (field == null)
108 {
109
110 if (getDefinition().containsDefinition(fieldId))
111 {
112 field = getDefinition().createField(fieldId);
113 addToContainer = true;
114 }
115 }
116 if (field != null)
117 {
118 field.readState(scope, dataBuffer, dataStart, dataLen);
119 if (addToContainer)
120 {
121
122
123
124
125
126
127
128 add(field);
129 }
130 }
131 else
132 {
133
134
135
136
137 if (getLog().isInfoEnabled())
138 {
139 getLog().info(
140 "No field found for field id="
141 + fieldId
142 + ", container="
143 + outer.toIdentityString()
144 + ", fields="
145 + outer.fields
146 + ". This might be a delayed wire frame received "
147 + "AFTER the remote container was destroyed.");
148 }
149 }
150 }
151 catch (Exception e)
152 {
153
154 outer.getContext().requestRetransmit(
155 outer.getNativeContextIdentity(), outer.getIdentity(),
156 outer.getType(), outer.getDomain());
157 throw new IllegalStateException(
158 "Could not read state for field id=" + fieldId
159 + ", container=" + outer.toIdentityString()
160 + ", fields=" + outer.fields
161 + ". Has an image message been missed? "
162 + "Automatically requesting a retransmission.", e);
163 }
164 }
165
166 public void read(IOperationScope scope, String fieldId,
167 byte[] dataBuffer, int dataStart, int dataLen)
168 {
169 throw new IllegalStateException(
170 "reading SWF fields is invalid for a container");
171 }
172
173 }
174
175
176
177
178
179
180 private volatile byte remoteSubscriptionCount;
181
182
183
184
185
186
187 private final ThreadLocal<IEventFrameExecution> frameIdentifier =
188 new ThreadLocal<IEventFrameExecution>();
189
190
191
192
193
194
195 volatile Thread eventFrameThread;
196
197
198 private byte eventFrameLockCount;
199
200
201
202
203
204
205
206 private volatile Map<String, IField> fields;
207
208
209
210
211
212
213 private volatile DataState dataState;
214
215
216
217
218
219
220 private final String nativeContextIdentity;
221
222
223 private final IFrameworkContext hostContext;
224
225
226 private final boolean local;
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
/**
 * Constructs the container and registers it with the host context via
 * {@link IFrameworkContext#addContainer}.
 * <p>
 * A local container starts in {@link DataState#LIVE}; a remote one starts
 * in {@link DataState#STALE}.
 *
 * @param nativeContextIdentity
 *            the identity of the native context, must not be
 *            <code>null</code>
 * @param identity
 *            the identity of the container
 * @param type
 *            the type of the container
 * @param domain
 *            the domain of the container
 * @param hostContext
 *            the context hosting this instance, must not be
 *            <code>null</code>
 * @param local
 *            <code>true</code> for a local container, <code>false</code>
 *            for a remote one
 */
public AbstractContainer(String nativeContextIdentity, String identity,
    IType type, IDomain domain, IFrameworkContext hostContext, boolean local)
{
    super(identity, type, domain,
        resolveEventProcessorIdentity(type, hostContext));
    Utils.nullCheck(nativeContextIdentity, "The native context is required");
    this.fields = CollectionFactory.newMap(1);
    this.hostContext = hostContext;
    this.nativeContextIdentity = nativeContextIdentity;
    this.local = local;
    this.dataState = local ? DataState.LIVE : DataState.STALE;
    getContext().addContainer(this);
}

/**
 * Computes the event processor identity passed to the super constructor.
 * <p>
 * The host context is validated here because this runs before any
 * statement in the constructor body can; checking it afterwards would be
 * too late as the context has already been dereferenced by then.
 * <p>
 * NOTE(review): when the context reports exactly one event processor the
 * divisor is zero and an {@link ArithmeticException} results - confirm that
 * contexts always report at least two.
 *
 * @param type
 *            the container type whose value selects the processor
 * @param hostContext
 *            the hosting context, must not be <code>null</code>
 * @return <code>(type.value() % (processorCount - 1)) + 1</code> narrowed
 *         to a byte
 */
private static byte resolveEventProcessorIdentity(IType type,
    IFrameworkContext hostContext)
{
    nullCheck(hostContext, "A hosted context is required");
    return (byte) ((type.value() % (hostContext.getEventProcessorCount() - 1)) + 1);
}
274
275 @Override
276 protected AsyncLog getLog()
277 {
278 return LOG;
279 }
280
281 @Override
282 protected final void doStart()
283 {
284 super.doStart();
285 getDefinition().populate(this);
286 if (isLocal())
287 {
288 getContext().queueEvent(
289 new ContainerCreatedEvent(getContext(), this));
290 }
291 else
292 {
293
294
295
296
297
298 final List<IEventListener> rcceListeners =
299 getContext().getSystemEventSource(
300 RemoteContainerCreatedEvent.class).getListeners();
301 for (IEventListener eventListener : rcceListeners)
302 {
303 eventListener.update(new RemoteContainerCreatedEvent(
304 getNativeContextIdentity(), getContext(), this));
305 }
306 }
307 }
308
309 @Override
310 protected void doComponentDestroy()
311 {
312 if (getLog().isInfoEnabled())
313 {
314 getLog().info("Destroying " + getAddressable());
315 }
316
317 boolean clearFrameReaderContext = false;
318 if (!FrameReader.inContext())
319 {
320 clearFrameReaderContext = true;
321 FrameReader.inContext.set(Boolean.TRUE);
322 }
323 try
324 {
325 beginFrame(new EventFrameExecution("destroy " + getAddress()));
326 try
327 {
328 super.doComponentDestroy();
329 setState(DataState.STALE);
330 final Collection<IField> values;
331
332 synchronized (this)
333 {
334 values = CollectionFactory.newList(this.fields.values());
335 this.fields = Collections.emptyMap();
336 }
337 for (IField field : values)
338 {
339 try
340 {
341 field.destroy();
342 }
343 catch (Exception e)
344 {
345 logException(getLog(), field, e);
346 }
347 }
348 if (getContext() != null && getContext().isActive())
349 {
350 if (isLocal())
351 {
352 getContext().queueEvent(
353 new ContainerDestroyedEvent(getContext(), this));
354 }
355 else
356 {
357 getContext().queueEvent(
358 new RemoteContainerDestroyedEvent(
359 getNativeContextIdentity(), getContext(), this));
360 }
361
362 getContext().removeContainer(this);
363 }
364 }
365 finally
366 {
367 endFrame();
368 }
369 }
370 finally
371 {
372 if (clearFrameReaderContext)
373 {
374 FrameReader.inContext.remove();
375 }
376 }
377 }
378
379 public final boolean isRemote()
380 {
381 return !isLocal();
382 }
383
384 public final IFrameworkContext getContext()
385 {
386 return this.hostContext;
387 }
388
389 public final String getNativeContextIdentity()
390 {
391 return this.nativeContextIdentity;
392 }
393
394 public final void add(IField field)
395 {
396 checkIsLocalOrInFrameReaderContext();
397 doAdd(field);
398 }
399
400 void doAdd(IField field)
401 {
402
403 synchronized (this)
404 {
405 beforeAdd(field);
406 final Map<String, IField> copy =
407 CollectionFactory.newMap(this.fields);
408 copy.put(field.getIdentity(), field);
409 this.fields = copy;
410 afterAdd(field);
411 }
412 field.addedToContainer(this);
413 }
414
415
416
417
418
419
420
421
422 protected void beforeAdd(IField field)
423 {
424 }
425
426
427
428
429
430
431
432
433
434 protected void afterAdd(IField field)
435 {
436 }
437
438 public final <T extends IField> T remove(T field)
439 {
440 checkIsLocalOrInFrameReaderContext();
441 return doRemove(field);
442 }
443
444 @SuppressWarnings("unchecked")
445 <T extends IField> T doRemove(T field)
446 {
447 IField removed = null;
448
449 synchronized (this)
450 {
451 beforeRemove(field);
452 final Map<String, IField> copy =
453 CollectionFactory.newMap(this.fields);
454 removed = copy.remove(field.getIdentity());
455 this.fields = copy;
456 afterRemove(field, removed);
457 }
458 field.removedFromContainer(this);
459 return (T) removed;
460 }
461
462
463
464
465
466
467
468
469 protected void beforeRemove(IField field)
470 {
471 }
472
473
474
475
476
477
478
479
480
481
482
483
484
485 protected void afterRemove(IField field, IField removed)
486 {
487
488 }
489
490 public final boolean isEmpty()
491 {
492 return this.fields.isEmpty();
493 }
494
495 public final boolean contains(String key)
496 {
497 return this.fields.containsKey(key);
498 }
499
500 public final int size()
501 {
502 return this.fields.size();
503 }
504
505 @SuppressWarnings("unchecked")
506 public final <T extends IField> T get(String identity)
507 {
508 final IField field = this.fields.get(identity);
509 return (T) field;
510 }
511
512 public final String[] getComponentIdentities()
513 {
514 final Set<String> keySet = this.fields.keySet();
515 return keySet.toArray(new String[keySet.size()]);
516 }
517
518 public final void beginFrame(IEventFrameExecution frame)
519 {
520 if (isEventFrameThread())
521 {
522 throw new IllegalStateException(
523 "Cannot start a new frame before completing an existing one");
524 }
525 checkIsLocalOrInFrameReaderContext();
526 lockFrame();
527
528 this.frameIdentifier.set(frame);
529 }
530
531 public final void endFrame()
532 {
533 checkIsLocalOrInFrameReaderContext();
534 if (isEventFrameThread())
535 {
536 try
537 {
538 doCommitEvents();
539 }
540 finally
541 {
542 unlockFrame();
543 }
544 }
545 else
546 {
547 logNotTheEventFrameThread();
548 }
549 }
550
551 public final void flushFrame()
552 {
553 if (!isEventFrameThread())
554 {
555 beginFrame(new EventFrameExecution());
556 }
557 if (isEventFrameThread())
558 {
559 endFrame();
560 }
561 else
562 {
563 logNotTheEventFrameThread();
564 }
565 }
566
567 public final boolean isFrameActive()
568 {
569 return this.eventFrameThread != null;
570 }
571
572 public final void setState(DataState stateCode)
573 {
574 DataState oldState = this.dataState;
575 boolean update = oldState != stateCode;
576 synchronized (this)
577 {
578 this.dataState = stateCode;
579 }
580 if (update)
581 {
582 doStateChangeOp(oldState);
583
584
585
586
587 if (!isEventFrameThread())
588 {
589 boolean clearFrameReaderContext = false;
590 if (!FrameReader.inContext())
591 {
592 clearFrameReaderContext = true;
593 FrameReader.inContext.set(Boolean.TRUE);
594 }
595 try
596 {
597
598 flushFrame();
599 }
600 finally
601 {
602 if (clearFrameReaderContext)
603 {
604 FrameReader.inContext.remove();
605 }
606 }
607 }
608 }
609 }
610
611
612
613
614
615
616
617 protected void doStateChangeOp(DataState oldState)
618 {
619 if (getContext() != null && getContext().isActive())
620 {
621 getContext().queueEvent(
622 new ContainerStateChangeEvent(getContext(), this,
623 this.dataState, oldState));
624 }
625 }
626
627 public final DataState getDataState()
628 {
629 return this.dataState;
630 }
631
632 public boolean isDynamic()
633 {
634 return false;
635 }
636
637 public final boolean isLocal()
638 {
639 return this.local;
640 }
641
642 public int markForRemoteSubscription()
643 {
644 int count = 0;
645
646
647 synchronized (this)
648 {
649 this.remoteSubscriptionCount++;
650 count = this.remoteSubscriptionCount;
651 }
652 if (getLog().isInfoEnabled())
653 {
654 getLog().info(
655 "[mark] remote subscription count is " + count + " for "
656 + toIdentityString());
657
658 }
659 return count;
660 }
661
662 public int unmarkForRemoteSubscription()
663 {
664 int count = 0;
665
666
667 synchronized (this)
668 {
669 this.remoteSubscriptionCount--;
670 if (this.remoteSubscriptionCount < 0)
671 {
672 this.remoteSubscriptionCount = 0;
673 }
674 count = this.remoteSubscriptionCount;
675 }
676 if (getLog().isInfoEnabled())
677 {
678 getLog().info(
679 "[unmark] remote subscription count is " + count + " for "
680 + toIdentityString());
681 }
682 return count;
683 }
684
685 public int getRemoteSubscriptionCount()
686 {
687 return this.remoteSubscriptionCount;
688 }
689
690
691
692
693
694
695
696
697
698
699 public BooleanField getBooleanField(String identity)
700 {
701 return get(identity);
702 }
703
704
705
706
707
708
709
710
711
712
713 public StringField getStringField(String identity)
714 {
715 return get(identity);
716 }
717
718
719
720
721
722
723
724
725
726
727 public IntegerField getIntegerField(String identity)
728 {
729 return get(identity);
730 }
731
732
733
734
735
736
737
738
739
740
741 public LongField getLongField(String identity)
742 {
743 return get(identity);
744 }
745
746
747
748
749
750
751
752
753
754
755 public FloatField getFloatField(String identity)
756 {
757 return get(identity);
758 }
759
760
761
762
763
764
765
766
767
768
769 public DoubleField getDoubleField(String identity)
770 {
771 return get(identity);
772 }
773
774 @Override
775 protected final void doPostAddListener(IEventListener listener)
776 {
777 super.doPostAddListener(listener);
778
779
780 if (getListeners().size() == 1)
781 {
782 getContext().queueEvent(
783 new EventSourceObservedEvent(getContext(), this));
784 }
785 }
786
787 @Override
788 protected final void doPostRemoveListener(IEventListener listener)
789 {
790 super.doPostRemoveListener(listener);
791
792 if (getListeners().size() == 0 && getContext() != null)
793 {
794 getContext().queueEvent(
795 new EventSourceNotObservedEvent(getNativeContextIdentity(),
796 getContext(), this));
797 }
798 }
799
800
801
802
803
804
805
806
807 protected final void lockFrame()
808 {
809 synchronized (this)
810 {
811 while (this.eventFrameThread != null && !isEventFrameThread())
812 {
813 if (getLog().isDebugEnabled())
814 {
815 getLog().debug(
816 "Waiting for " + this.eventFrameThread
817 + " to complete event frame for " + this,
818 new Exception());
819 }
820 else
821 {
822 if (getLog().isInfoEnabled())
823 {
824 getLog().info(
825 "Waiting for " + this.eventFrameThread
826 + " to complete event frame for "
827 + toIdentityString());
828 }
829 }
830 try
831 {
832 this.wait(500);
833 }
834 catch (InterruptedException e)
835 {
836 getLog().debug(e);
837 }
838 }
839 this.eventFrameThread = Thread.currentThread();
840 this.eventFrameLockCount++;
841 }
842 }
843
844
845
846
847
848
849
850
851
852 protected IContainerDefinitionField getDefinition()
853 {
854 return getContext().getContainerFactory().getDefinition(getType());
855 }
856
857 @Override
858 protected boolean doReadState(IOperationScope scope, byte[] buffer,
859 int start, int numberOfBytes) throws Exception
860 {
861 lockFrame();
862 try
863 {
864 int[] headerStart = new int[1];
865 int[] headerLen = new int[1];
866 int[] dataStart = new int[1];
867 int[] dataLen = new int[1];
868
869
870 FrameReader.findHeaderAndDataBufferPositions(buffer, start,
871 headerStart, headerLen, dataStart, dataLen);
872
873 if (headerLen[0] == 0)
874 {
875
876 return true;
877 }
878 FieldReader.readIWFFieldSpecs(scope, buffer, headerStart[0],
879 headerLen[0], dataStart[0], dataLen[0], newReaderTask());
880 }
881 finally
882 {
883 unlockFrame();
884 }
885 return true;
886 }
887
888
889
890
891
892
893
894
895
896
897
898 protected ReaderTask newReaderTask()
899 {
900 return this.new ReaderTask();
901 }
902
903 @Override
904 protected boolean doWriteState(IOperationScope scope, IWireIdentity wireId,
905 byte[][] headerBuffer, int[] headerBufferPosition, byte[][] dataBuffer,
906 int[] dataBufferPosition, boolean completeState) throws Exception
907 {
908
909
910 if (isLocal())
911 {
912 lockFrame();
913 try
914 {
915 final Collection<IField> fieldsToWrite;
916 if (getDataState() == DataState.STALE)
917 {
918
919 fieldsToWrite = Collections.emptySet();
920 }
921 else
922 {
923 fieldsToWrite = getFieldsToWrite(completeState);
924 }
925 FrameWriter.writeHeaderAndData(this, fieldsToWrite,
926 getDefinition(), scope, headerBuffer, headerBufferPosition,
927 dataBuffer, dataBufferPosition, completeState);
928 }
929 catch (IllegalArgumentException e)
930 {
931 throw new IllegalArgumentException("Could not write field: "
932 + this, e);
933 }
934 finally
935 {
936 unlockFrame();
937 }
938 }
939 return true;
940 }
941
942
943
944
945
946
947
948
949
950
951 protected Collection<IField> getFieldsToWrite(boolean completeState)
952 {
953 return this.fields.values();
954 }
955
956
957
958
959
960
961
962 protected final boolean isEventFrameThread()
963 {
964 return this.eventFrameThread == Thread.currentThread();
965 }
966
967
968
969
970 protected void logNotTheEventFrameThread()
971 {
972 logException(getLog(), "The current thread is not the "
973 + "event frame thread, the event frame thread is "
974 + this.eventFrameThread + ". The current thread cannot execute "
975 + safeToString(this), new IllegalStateException());
976 }
977
978
979
980
981
982
983
984 final void unlockFrame()
985 {
986 synchronized (this)
987 {
988 if (isEventFrameThread())
989 {
990 this.eventFrameLockCount--;
991 if (this.eventFrameLockCount < 0)
992 {
993 this.eventFrameLockCount = 0;
994 }
995 if (this.eventFrameLockCount == 0)
996 {
997 this.eventFrameThread = null;
998 this.frameIdentifier.remove();
999 }
1000 this.notify();
1001 }
1002 else
1003 {
1004 logNotTheEventFrameThread();
1005 }
1006 }
1007 }
1008
1009
1010
1011
1012
1013 private final void checkIsLocalOrInFrameReaderContext()
1014 {
1015 if (!isLocal() && !FrameReader.inContext())
1016 {
1017 throw new IllegalStateException(
1018 "Only a thread running in a FrameReader context can run this method on remote container "
1019 + this);
1020 }
1021 checkClone();
1022 }
1023
1024 @Override
1025 public String toDetailedString()
1026 {
1027 return containerToFormattedString(super.toDetailedString(), true);
1028 }
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040 private String containerToFormattedString(String containerIdentity,
1041 boolean linePerField)
1042 {
1043 StringBuilder sb = new StringBuilder();
1044 List<String> identities = CollectionFactory.newList(this.fields.size());
1045
1046 Map<String, IField> copy = Collections.emptyMap();
1047 synchronized (this)
1048 {
1049 copy = CollectionFactory.newMap(this.fields);
1050 }
1051 if (isActive())
1052 {
1053 for (Entry<String, IField> entry : copy.entrySet())
1054 {
1055 try
1056 {
1057 if (!getDefinition().equals(entry.getValue()))
1058 {
1059 identities.add(entry.getKey());
1060 }
1061 }
1062 catch (Exception e)
1063 {
1064 logException(getLog(), entry, e);
1065 }
1066 }
1067 }
1068 Collections.sort(identities);
1069 if (linePerField)
1070 {
1071 sb.append(SystemUtils.LINE_SEPARATOR);
1072 }
1073 sb.append(OPEN_SQUARE);
1074 if (linePerField)
1075 {
1076 sb.append(SystemUtils.LINE_SEPARATOR).append(SPACING_4_CHARS);
1077 }
1078 sb.append("nativeContext=").append(getNativeContextIdentity());
1079 sb.append(", state=").append(getDataState());
1080 if (linePerField)
1081 {
1082 sb.append(SystemUtils.LINE_SEPARATOR);
1083 }
1084 if (identities.size() > 0)
1085 {
1086 for (String string : identities)
1087 {
1088 try
1089 {
1090 if (!linePerField)
1091 {
1092 sb.append(COMMA).append(SPACE);
1093 }
1094 else
1095 {
1096 sb.append(SPACING_4_CHARS);
1097 }
1098 sb.append(toString(copy.get(string)));
1099 if (linePerField)
1100 {
1101 sb.append(SystemUtils.LINE_SEPARATOR);
1102 }
1103 }
1104 catch (Exception e)
1105 {
1106 logException(getLog(), string, e);
1107 }
1108 }
1109 }
1110 sb.append(CLOSE_SQUARE);
1111 if (linePerField)
1112 {
1113 sb.append(SystemUtils.LINE_SEPARATOR);
1114 }
1115 return (linePerField ? SystemUtils.LINE_SEPARATOR : EMPTY_STRING)
1116 + containerIdentity + EQUALS + sb.toString();
1117 }
1118
1119
1120
1121
1122
1123
1124 protected ThreadLocal<IEventFrameExecution> getFrameIdentifier()
1125 {
1126 return this.frameIdentifier;
1127 }
1128
1129
1130
1131
1132
1133
1134
1135 protected Map<String, IField> getFields()
1136 {
1137 return this.fields;
1138 }
1139
1140
1141
1142
1143
1144 void setFields(Map<String, IField> fields)
1145 {
1146 this.fields = fields;
1147 }
1148
1149 private String toString(IField field)
1150 {
1151 return field.getIdentity() + EQUALS + field.getValueAsString();
1152 }
1153
1154 @Override
1155 public Object clone() throws CloneNotSupportedException
1156 {
1157 final AbstractContainer clone = (AbstractContainer) super.clone();
1158 clone.eventFrameThread = null;
1159 clone.fields = CollectionFactory.newMap(this.fields.size());
1160
1161 final Set<Entry<String, IField>> entrySet = this.fields.entrySet();
1162 for (Entry<String, IField> entry : entrySet)
1163 {
1164 try
1165 {
1166 clone.fields.put(entry.getKey(),
1167 ((IField) entry.getValue().clone()));
1168 }
1169 catch (Exception e)
1170 {
1171 logException(getLog(), entry, e);
1172 }
1173 }
1174 return clone;
1175 }
1176
1177 @Override
1178 public final String toString()
1179 {
1180 return containerToFormattedString(super.toString(), false);
1181 }
1182
1183 @Override
1184 public final int hashCode()
1185 {
1186 return super.hashCode();
1187 }
1188
1189 @Override
1190 public final boolean equals(Object obj)
1191 {
1192 return super.equals(obj);
1193 }
1194
1195
1196
1197
1198 protected void doCommitEvents()
1199 {
1200
1201 }
1202
1203 }