View Javadoc

1   /*
2      Copyright 2007 Ramon Servadei
3   
4      Licensed under the Apache License, Version 2.0 (the "License");
5      you may not use this file except in compliance with the License.
6      You may obtain a copy of the License at
7   
8          http://www.apache.org/licenses/LICENSE-2.0
9   
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15   */
16  package fulmine.model.container;
17  
18  import static fulmine.util.Utils.CLOSE_SQUARE;
19  import static fulmine.util.Utils.COMMA;
20  import static fulmine.util.Utils.EMPTY_STRING;
21  import static fulmine.util.Utils.EQUALS;
22  import static fulmine.util.Utils.OPEN_SQUARE;
23  import static fulmine.util.Utils.SPACE;
24  import static fulmine.util.Utils.SPACING_4_CHARS;
25  import static fulmine.util.Utils.logException;
26  import static fulmine.util.Utils.nullCheck;
27  import static fulmine.util.Utils.safeToString;
28  
29  import java.util.Collection;
30  import java.util.Collections;
31  import java.util.List;
32  import java.util.Map;
33  import java.util.Set;
34  import java.util.Map.Entry;
35  
36  import org.apache.commons.lang.SystemUtils;
37  
38  import fulmine.IDomain;
39  import fulmine.IType;
40  import fulmine.context.IFrameworkContext;
41  import fulmine.event.EventFrameExecution;
42  import fulmine.event.IEventFrameExecution;
43  import fulmine.event.listener.IEventListener;
44  import fulmine.event.system.EventSourceNotObservedEvent;
45  import fulmine.event.system.EventSourceObservedEvent;
46  import fulmine.model.component.AbstractComponent;
47  import fulmine.model.container.events.ContainerCreatedEvent;
48  import fulmine.model.container.events.ContainerDestroyedEvent;
49  import fulmine.model.container.events.ContainerStateChangeEvent;
50  import fulmine.model.container.events.RemoteContainerCreatedEvent;
51  import fulmine.model.container.events.RemoteContainerDestroyedEvent;
52  import fulmine.model.field.BooleanField;
53  import fulmine.model.field.DoubleField;
54  import fulmine.model.field.FloatField;
55  import fulmine.model.field.IField;
56  import fulmine.model.field.IntegerField;
57  import fulmine.model.field.LongField;
58  import fulmine.model.field.StringField;
59  import fulmine.model.field.containerdefinition.IContainerDefinitionField;
60  import fulmine.protocol.specification.FieldReader;
61  import fulmine.protocol.specification.FrameReader;
62  import fulmine.protocol.specification.FrameWriter;
63  import fulmine.protocol.wire.IWireIdentity;
64  import fulmine.protocol.wire.operation.IOperationScope;
65  import fulmine.util.Utils;
66  import fulmine.util.collection.CollectionFactory;
67  import fulmine.util.log.AsyncLog;
68  
69  /**
70   * The base class for containers.
71   * <p>
72   * This implementation is thread safe. The the {@link #add(IField)} and
73   * {@link #remove(IField)} methods perform copy-on-write operations. The
74   * {@link Collection} for the fields uses the <a
75   * href="http://www.ibm.com/developerworks/java/library/j-jtp06197.html" >'cheap
76   * read-write lock'</a>
77   * <p>
78   * This implementation can read and write itself from and to a wire frame.
79   * However, when writing, it writes a complete image of itself (i.e. all its
80   * internal {@link IField} objects are written, regardless of any changes that
81   * have occurred). Sub-classes may override this to in a more efficient
82   * mechanism that only writes changed internal fields (deltas).
83   * 
84   * @author Ramon Servadei
85   * 
86   */
87  public abstract class AbstractContainer extends AbstractComponent implements
88      IContainer, Cloneable
89  {
90      private final static AsyncLog LOG = new AsyncLog(AbstractContainer.class);
91  
92      /**
93       * The reader task executed by the {@link FieldReader} when reading messages
94       */
95      protected class ReaderTask implements FieldReader.IFieldReaderTask
96      {
97          public void read(IOperationScope scope, int fieldId, byte[] dataBuffer,
98              int dataStart, int dataLen)
99          {
100             final AbstractContainer outer = AbstractContainer.this;
101             try
102             {
103                 IField field =
104                     outer.fields.get(getDefinition().getIdentityForWireCode(
105                         fieldId));
106                 boolean addToContainer = false;
107                 if (field == null)
108                 {
109                     // try to create it
110                     if (getDefinition().containsDefinition(fieldId))
111                     {
112                         field = getDefinition().createField(fieldId);
113                         addToContainer = true;
114                     }
115                 }
116                 if (field != null)
117                 {
118                     field.readState(scope, dataBuffer, dataStart, dataLen);
119                     if (addToContainer)
120                     {
121                         /*
122                          * Only add the field once it has read its state,
123                          * otherwise if we added the field simply when it was
124                          * created from the definition, it would have no value
125                          * and we'd raise a ContainerFieldAddedEvent with the
126                          * field but no value - pretty useless really.
127                          */
128                         add(field);
129                     }
130                 }
131                 else
132                 {
133                     /*
134                      * If this is a remote container and has been destroyed, we
135                      * may receive delayed wire messages
136                      */
137                     if (getLog().isInfoEnabled())
138                     {
139                         getLog().info(
140                             "No field found for field id="
141                                 + fieldId
142                                 + ", container="
143                                 + outer.toIdentityString()
144                                 + ", fields="
145                                 + outer.fields
146                                 + ". This might be a delayed wire frame received "
147                                 + "AFTER the remote container was destroyed.");
148                     }
149                 }
150             }
151             catch (Exception e)
152             {
153                 // todo request a retransmission
154                 outer.getContext().requestRetransmit(
155                     outer.getNativeContextIdentity(), outer.getIdentity(),
156                     outer.getType(), outer.getDomain());
157                 throw new IllegalStateException(
158                     "Could not read state for field id=" + fieldId
159                         + ", container=" + outer.toIdentityString()
160                         + ", fields=" + outer.fields
161                         + ". Has an image message been missed? "
162                         + "Automatically requesting a retransmission.", e);
163             }
164         }
165 
166         public void read(IOperationScope scope, String fieldId,
167             byte[] dataBuffer, int dataStart, int dataLen)
168         {
169             throw new IllegalStateException(
170                 "reading SWF fields is invalid for a container");
171         }
172 
173     }
174 
175     /**
176      * Tracks the count of remote subscriptions to this instance. Uses the <a
177      * href="http://www.ibm.com/developerworks/java/library/j-jtp06197.html"
178      * >'cheap read-write lock'</a>
179      */
180     private volatile byte remoteSubscriptionCount;
181 
182     /**
183      * A thread local variable to hold the {@link IEventFrameExecution} of the
184      * current event frame. This is bound to the thread executing the event
185      * frame.
186      */
187     private final ThreadLocal<IEventFrameExecution> frameIdentifier =
188         new ThreadLocal<IEventFrameExecution>();
189 
190     /**
191      * The current thread executing an event frame. Uses the <a
192      * href="http://www.ibm.com/developerworks/java/library/j-jtp06197.html"
193      * >'cheap read-write lock'</a>
194      */
195     volatile Thread eventFrameThread;
196 
197     /** Tracks multiple calls to {@link #lockFrame()} */
198     private byte eventFrameLockCount;
199 
200     /**
201      * The {@link IField} objects this container holds, indexed by their
202      * {@link String} identity. Uses the <a
203      * href="http://www.ibm.com/developerworks/java/library/j-jtp06197.html"
204      * >'cheap read-write lock'</a>
205      */
206     private volatile Map<String, IField> fields;
207 
208     /**
209      * The state (validity) of the container. Uses the <a
210      * href="http://www.ibm.com/developerworks/java/library/j-jtp06197.html"
211      * >'cheap read-write lock'</a>
212      */
213     private volatile DataState dataState;
214 
215     /**
216      * The name of the context this container is native to. If the container is
217      * local, its native context will be the same as the hosted context
218      * identity.
219      */
220     private final String nativeContextIdentity;
221 
222     /** The context hosting this container instance */
223     private final IFrameworkContext hostContext;
224 
225     /** Whether the container is local to the host context */
226     private final boolean local;
227 
228     /**
229      * Standard constructor. The container is marked as {@link IContainer#LIVE
230      * LIVE}. If a {@link EventThread} is constructing this then the container
231      * is a remote container.
232      * 
233      * @param nativeContextIdentity
234      *            the name of the context this container is native to - the name
235      *            of its local context
236      * @param identity
237      *            the identity of the container
238      * @param type
239      *            the type of the container
240      * @param domain
241      *            the domain for the container
242      * @param hostContext
243      *            the context hosting this container instance
244      * @param local
245      *            <code>true</code> the container is local to this context
246      * @see ContainerFactory
247      */
248     public AbstractContainer(String nativeContextIdentity, String identity,
249         IType type, IDomain domain, IFrameworkContext hostContext, boolean local)
250     {
251         // only an abstract container is an event source
252         // note: we do not use the 0th (zeroth) processor as this is used for
253         // the system events
254         super(
255             identity,
256             type,
257             domain,
258             // bind the container type to an event processor
259             (byte) ((type.value() % (hostContext.getEventProcessorCount() - 1)) + 1));
260         this.fields = CollectionFactory.newMap(1);
261         this.dataState = DataState.LIVE;
262         nullCheck(hostContext, "A hosted context is required");
263         Utils.nullCheck(nativeContextIdentity, "The native context is required");
264         this.hostContext = hostContext;
265         this.nativeContextIdentity = nativeContextIdentity;
266         this.local = local;
267         // is this a local container
268         if (!isLocal())
269         {
270             this.dataState = DataState.STALE;
271         }
272         getContext().addContainer(this);
273     }
274 
275     @Override
276     protected AsyncLog getLog()
277     {
278         return LOG;
279     }
280 
281     @Override
282     protected final void doStart()
283     {
284         super.doStart();
285         getDefinition().populate(this);
286         if (isLocal())
287         {
288             getContext().queueEvent(
289                 new ContainerCreatedEvent(getContext(), this));
290         }
291         else
292         {
293             /*
294              * for a remote container, notify the listeners synchronously...
295              * this is so that any subscriptions are registered against the
296              * remote container BEFORE any remote changes are then processed.
297              */
298             final List<IEventListener> rcceListeners =
299                 getContext().getSystemEventSource(
300                     RemoteContainerCreatedEvent.class).getListeners();
301             for (IEventListener eventListener : rcceListeners)
302             {
303                 eventListener.update(new RemoteContainerCreatedEvent(
304                     getNativeContextIdentity(), getContext(), this));
305             }
306         }
307     }
308 
309     @Override
310     protected void doComponentDestroy()
311     {
312         if (getLog().isInfoEnabled())
313         {
314             getLog().info("Destroying " + getAddressable());
315         }
316         // ensure we don't interfere with another thread doing some work
317         boolean clearFrameReaderContext = false;
318         if (!FrameReader.inContext())
319         {
320             clearFrameReaderContext = true;
321             FrameReader.inContext.set(Boolean.TRUE);
322         }
323         try
324         {
325             beginFrame(new EventFrameExecution("destroy " + getAddress()));
326             try
327             {
328                 super.doComponentDestroy();
329                 setState(DataState.STALE);
330                 final Collection<IField> values;
331                 // copy-on-write to 'clear' the field map
332                 synchronized (this)
333                 {
334                     values = CollectionFactory.newList(this.fields.values());
335                     this.fields = Collections.emptyMap();
336                 }
337                 for (IField field : values)
338                 {
339                     try
340                     {
341                         field.destroy();
342                     }
343                     catch (Exception e)
344                     {
345                         logException(getLog(), field, e);
346                     }
347                 }
348                 if (getContext() != null && getContext().isActive())
349                 {
350                     if (isLocal())
351                     {
352                         getContext().queueEvent(
353                             new ContainerDestroyedEvent(getContext(), this));
354                     }
355                     else
356                     {
357                         getContext().queueEvent(
358                             new RemoteContainerDestroyedEvent(
359                                 getNativeContextIdentity(), getContext(), this));
360                     }
361                     // finally remove the container from the context
362                     getContext().removeContainer(this);
363                 }
364             }
365             finally
366             {
367                 endFrame();
368             }
369         }
370         finally
371         {
372             if (clearFrameReaderContext)
373             {
374                 FrameReader.inContext.remove();
375             }
376         }
377     }
378 
379     public final boolean isRemote()
380     {
381         return !isLocal();
382     }
383 
384     public final IFrameworkContext getContext()
385     {
386         return this.hostContext;
387     }
388 
389     public final String getNativeContextIdentity()
390     {
391         return this.nativeContextIdentity;
392     }
393 
394     public final void add(IField field)
395     {
396         checkIsLocalOrInFrameReaderContext();
397         doAdd(field);
398     }
399 
400     void doAdd(IField field)
401     {
402         // copy-on-add and ensure atomic operation
403         synchronized (this)
404         {
405             beforeAdd(field);
406             final Map<String, IField> copy =
407                 CollectionFactory.newMap(this.fields);
408             copy.put(field.getIdentity(), field);
409             this.fields = copy;
410             afterAdd(field);
411         }
412         field.addedToContainer(this);
413     }
414 
415     /**
416      * Called before the field is added. This is called whilst holding the
417      * monitor lock for adding/removing fields.
418      * 
419      * @param field
420      *            the field to add
421      */
422     protected void beforeAdd(IField field)
423     {
424     }
425 
426     /**
427      * Called after the field is added and the copy-on-write operation has
428      * completed. This is called whilst holding the monitor lock for
429      * adding/removing fields.
430      * 
431      * @param field
432      *            the field to add
433      */
434     protected void afterAdd(IField field)
435     {
436     }
437 
438     public final <T extends IField> T remove(T field)
439     {
440         checkIsLocalOrInFrameReaderContext();
441         return doRemove(field);
442     }
443 
444     @SuppressWarnings("unchecked")
445     <T extends IField> T doRemove(T field)
446     {
447         IField removed = null;
448         // copy-on-remove and ensure atomic operation
449         synchronized (this)
450         {
451             beforeRemove(field);
452             final Map<String, IField> copy =
453                 CollectionFactory.newMap(this.fields);
454             removed = copy.remove(field.getIdentity());
455             this.fields = copy;
456             afterRemove(field, removed);
457         }
458         field.removedFromContainer(this);
459         return (T) removed;
460     }
461 
462     /**
463      * Called before the field is removed. This is called whilst holding the
464      * monitor lock for adding/removing fields.
465      * 
466      * @param field
467      *            the field to remove
468      */
469     protected void beforeRemove(IField field)
470     {
471     }
472 
473     /**
474      * Called after the field is removed and the copy-on-write operation has
475      * completed. This is called whilst holding the monitor lock for
476      * adding/removing fields.
477      * 
478      * @param field
479      *            the field to remove
480      * @param removed
481      *            the field that has been removed - if <code>null</code> then
482      *            the field was not removed (possibly because it was not in the
483      *            fields)
484      */
485     protected void afterRemove(IField field, IField removed)
486     {
487 
488     }
489 
490     public final boolean isEmpty()
491     {
492         return this.fields.isEmpty();
493     }
494 
495     public final boolean contains(String key)
496     {
497         return this.fields.containsKey(key);
498     }
499 
500     public final int size()
501     {
502         return this.fields.size();
503     }
504 
505     @SuppressWarnings("unchecked")
506     public final <T extends IField> T get(String identity)
507     {
508         final IField field = this.fields.get(identity);
509         return (T) field;
510     }
511 
512     public final String[] getComponentIdentities()
513     {
514         final Set<String> keySet = this.fields.keySet();
515         return keySet.toArray(new String[keySet.size()]);
516     }
517 
518     public final void beginFrame(IEventFrameExecution frame)
519     {
520         if (isEventFrameThread())
521         {
522             throw new IllegalStateException(
523                 "Cannot start a new frame before completing an existing one");
524         }
525         checkIsLocalOrInFrameReaderContext();
526         lockFrame();
527         // set the frame identifier
528         this.frameIdentifier.set(frame);
529     }
530 
531     public final void endFrame()
532     {
533         checkIsLocalOrInFrameReaderContext();
534         if (isEventFrameThread())
535         {
536             try
537             {
538                 doCommitEvents();
539             }
540             finally
541             {
542                 unlockFrame();
543             }
544         }
545         else
546         {
547             logNotTheEventFrameThread();
548         }
549     }
550 
551     public final void flushFrame()
552     {
553         if (!isEventFrameThread())
554         {
555             beginFrame(new EventFrameExecution());
556         }
557         if (isEventFrameThread())
558         {
559             endFrame();
560         }
561         else
562         {
563             logNotTheEventFrameThread();
564         }
565     }
566 
567     public final boolean isFrameActive()
568     {
569         return this.eventFrameThread != null;
570     }
571 
572     public final void setState(DataState stateCode)
573     {
574         DataState oldState = this.dataState;
575         boolean update = oldState != stateCode;
576         synchronized (this)
577         {
578             this.dataState = stateCode;
579         }
580         if (update)
581         {
582             doStateChangeOp(oldState);
583             /*
584              * When triggering an update, we need to masquerade as being in a
585              * frame reader context if this is a remote container
586              */
587             if (!isEventFrameThread())
588             {
589                 boolean clearFrameReaderContext = false;
590                 if (!FrameReader.inContext())
591                 {
592                     clearFrameReaderContext = true;
593                     FrameReader.inContext.set(Boolean.TRUE);
594                 }
595                 try
596                 {
597                     // trigger the update
598                     flushFrame();
599                 }
600                 finally
601                 {
602                     if (clearFrameReaderContext)
603                     {
604                         FrameReader.inContext.remove();
605                     }
606                 }
607             }
608         }
609     }
610 
611     /**
612      * Perform the operation when the {@link #dataState} changes.
613      * 
614      * @param oldState
615      *            the old state
616      */
617     protected void doStateChangeOp(DataState oldState)
618     {
619         if (getContext() != null && getContext().isActive())
620         {
621             getContext().queueEvent(
622                 new ContainerStateChangeEvent(getContext(), this,
623                     this.dataState, oldState));
624         }
625     }
626 
627     public final DataState getDataState()
628     {
629         return this.dataState;
630     }
631 
632     public boolean isDynamic()
633     {
634         return false;
635     }
636 
637     public final boolean isLocal()
638     {
639         return this.local;
640     }
641 
642     public int markForRemoteSubscription()
643     {
644         int count = 0;
645         // uses the 'cheap read-write lock'
646         // http://www.ibm.com/developerworks/java/library/j-jtp06197.html
647         synchronized (this)
648         {
649             this.remoteSubscriptionCount++;
650             count = this.remoteSubscriptionCount;
651         }
652         if (getLog().isInfoEnabled())
653         {
654             getLog().info(
655                 "[mark] remote subscription count is " + count + " for "
656                     + toIdentityString());
657 
658         }
659         return count;
660     }
661 
662     public int unmarkForRemoteSubscription()
663     {
664         int count = 0;
665         // uses the 'cheap read-write lock'
666         // http://www.ibm.com/developerworks/java/library/j-jtp06197.html
667         synchronized (this)
668         {
669             this.remoteSubscriptionCount--;
670             if (this.remoteSubscriptionCount < 0)
671             {
672                 this.remoteSubscriptionCount = 0;
673             }
674             count = this.remoteSubscriptionCount;
675         }
676         if (getLog().isInfoEnabled())
677         {
678             getLog().info(
679                 "[unmark] remote subscription count is " + count + " for "
680                     + toIdentityString());
681         }
682         return count;
683     }
684 
685     public int getRemoteSubscriptionCount()
686     {
687         return this.remoteSubscriptionCount;
688     }
689 
690     /**
691      * Get the {@link BooleanField} identified by the identity
692      * 
693      * @param identity
694      *            the identity of the field to get
695      * @return the {@link BooleanField} for the identity
696      * @throws ClassCastException
697      *             if the field is not of this type
698      */
699     public BooleanField getBooleanField(String identity)
700     {
701         return get(identity);
702     }
703 
704     /**
705      * Get the {@link StringField} identified by the identity
706      * 
707      * @param identity
708      *            the identity of the field to get
709      * @return the {@link StringField} for the identity
710      * @throws ClassCastException
711      *             if the field is not of this type
712      */
713     public StringField getStringField(String identity)
714     {
715         return get(identity);
716     }
717 
718     /**
719      * Get the {@link IntegerField} identified by the identity
720      * 
721      * @param identity
722      *            the identity of the field to get
723      * @return the {@link IntegerField} for the identity
724      * @throws ClassCastException
725      *             if the field is not of this type
726      */
727     public IntegerField getIntegerField(String identity)
728     {
729         return get(identity);
730     }
731 
732     /**
733      * Get the {@link LongField} identified by the identity
734      * 
735      * @param identity
736      *            the identity of the field to get
737      * @return the {@link LongField} for the identity
738      * @throws ClassCastException
739      *             if the field is not of this type
740      */
741     public LongField getLongField(String identity)
742     {
743         return get(identity);
744     }
745 
746     /**
747      * Get the {@link FloatField} identified by the identity
748      * 
749      * @param identity
750      *            the identity of the field to get
751      * @return the {@link FloatField} for the identity
752      * @throws ClassCastException
753      *             if the field is not of this type
754      */
755     public FloatField getFloatField(String identity)
756     {
757         return get(identity);
758     }
759 
760     /**
761      * Get the {@link DoubleField} identified by the identity
762      * 
763      * @param identity
764      *            the identity of the field to get
765      * @return the {@link DoubleField} for the identity
766      * @throws ClassCastException
767      *             if the field is not of this type
768      */
769     public DoubleField getDoubleField(String identity)
770     {
771         return get(identity);
772     }
773 
774     @Override
775     protected final void doPostAddListener(IEventListener listener)
776     {
777         super.doPostAddListener(listener);
778         // trigger the event source observed event when the first listener is
779         // added
780         if (getListeners().size() == 1)
781         {
782             getContext().queueEvent(
783                 new EventSourceObservedEvent(getContext(), this));
784         }
785     }
786 
787     @Override
788     protected final void doPostRemoveListener(IEventListener listener)
789     {
790         super.doPostRemoveListener(listener);
791         // the context can be null if this container has been removed
792         if (getListeners().size() == 0 && getContext() != null)
793         {
794             getContext().queueEvent(
795                 new EventSourceNotObservedEvent(getNativeContextIdentity(),
796                     getContext(), this));
797         }
798     }
799 
800     /**
801      * Lock the event frame. This sets the {@link #eventFrameThread} to the
802      * current thread. If there already is an event frame thread, this method
803      * loops until the frame is unlocked.
804      * 
805      * @see #unlockFrame()
806      */
807     protected final void lockFrame()
808     {
809         synchronized (this)
810         {
811             while (this.eventFrameThread != null && !isEventFrameThread())
812             {
813                 if (getLog().isDebugEnabled())
814                 {
815                     getLog().debug(
816                         "Waiting for " + this.eventFrameThread
817                             + " to complete event frame for " + this,
818                         new Exception());
819                 }
820                 else
821                 {
822                     if (getLog().isInfoEnabled())
823                     {
824                         getLog().info(
825                             "Waiting for " + this.eventFrameThread
826                                 + " to complete event frame for "
827                                 + toIdentityString());
828                     }
829                 }
830                 try
831                 {
832                     this.wait(500);
833                 }
834                 catch (InterruptedException e)
835                 {
836                     getLog().debug(e);
837                 }
838             }
839             this.eventFrameThread = Thread.currentThread();
840             this.eventFrameLockCount++;
841         }
842     }
843 
844     /**
845      * Get the definition for this container. This effectively provides the
846      * 'type' for the this container.
847      * 
848      * @return the {@link IContainerDefinitionField} that defines the
849      *         {@link IField} objects (fields) of this container.
850      * 
851      */
852     protected IContainerDefinitionField getDefinition()
853     {
854         return getContext().getContainerFactory().getDefinition(getType());
855     }
856 
857     @Override
858     protected boolean doReadState(IOperationScope scope, byte[] buffer,
859         int start, int numberOfBytes) throws Exception
860     {
861         lockFrame();
862         try
863         {
864             int[] headerStart = new int[1];
865             int[] headerLen = new int[1];
866             int[] dataStart = new int[1];
867             int[] dataLen = new int[1];
868 
869             // find out how big the header buffer and data buffer are
870             FrameReader.findHeaderAndDataBufferPositions(buffer, start,
871                 headerStart, headerLen, dataStart, dataLen);
872 
873             if (headerLen[0] == 0)
874             {
875                 // an empty frame or meta data for the container
876                 return true;
877             }
878             FieldReader.readIWFFieldSpecs(scope, buffer, headerStart[0],
879                 headerLen[0], dataStart[0], dataLen[0], newReaderTask());
880         }
881         finally
882         {
883             unlockFrame();
884         }
885         return true;
886     }
887 
888     /**
889      * Get the reader task to use during the
890      * {@link #doReadState(IOperationScope, byte[], int, int)} method. The
891      * reader task is executed for each field spec found during the read state
892      * operation. The task implementation is responsible for
893      * constructing/reading the {@link IField} expressed in the field spec.
894      * 
895      * @return the {@link ReaderTask} to invoke for each field spec found during
896      *         the read state operation.
897      */
898     protected ReaderTask newReaderTask()
899     {
900         return this.new ReaderTask();
901     }
902 
903     @Override
904     protected boolean doWriteState(IOperationScope scope, IWireIdentity wireId,
905         byte[][] headerBuffer, int[] headerBufferPosition, byte[][] dataBuffer,
906         int[] dataBufferPosition, boolean completeState) throws Exception
907     {
908         // only local entities are written to the wire (unidirectional changes)
909         // remote entities are read-only
910         if (isLocal())
911         {
912             lockFrame();
913             try
914             {
915                 final Collection<IField> fieldsToWrite;
916                 if (getDataState() == DataState.STALE)
917                 {
918                     // no changes should be sent in this state
919                     fieldsToWrite = Collections.emptySet();
920                 }
921                 else
922                 {
923                     fieldsToWrite = getFieldsToWrite(completeState);
924                 }
925                 FrameWriter.writeHeaderAndData(this, fieldsToWrite,
926                     getDefinition(), scope, headerBuffer, headerBufferPosition,
927                     dataBuffer, dataBufferPosition, completeState);
928             }
929             catch (IllegalArgumentException e)
930             {
931                 throw new IllegalArgumentException("Could not write field: "
932                     + this, e);
933             }
934             finally
935             {
936                 unlockFrame();
937             }
938         }
939         return true;
940     }
941 
942     /**
943      * Get the fields to write to a frame. This occurs whilst the event frame is
944      * locked. This method allows sub-classes to inspect and mutate the
945      * {@link #changedFields} field prior to being written to a frame.
946      * 
947      * @param completeState
948      * 
949      * @return the fields to write into a frame
950      */
951     protected Collection<IField> getFieldsToWrite(boolean completeState)
952     {
953         return this.fields.values();
954     }
955 
956     /**
957      * Is the executing thread the event frame thread.
958      * 
959      * @return <code>true</code> if the executing thread is the event frame
960      *         thread.
961      */
962     protected final boolean isEventFrameThread()
963     {
964         return this.eventFrameThread == Thread.currentThread();
965     }
966 
967     /**
968      * Logs an exception and the current event frame thread.
969      */
970     protected void logNotTheEventFrameThread()
971     {
972         logException(getLog(), "The current thread is not the "
973             + "event frame thread, the event frame thread is "
974             + this.eventFrameThread + ". The current thread cannot execute "
975             + safeToString(this), new IllegalStateException());
976     }
977 
978     /**
979      * Unlock the event frame. This clears the {@link #eventFrameThread} and
980      * {@link #frameIdentifier}.
981      * 
982      * @see #lockFrame()
983      */
984     final void unlockFrame()
985     {
986         synchronized (this)
987         {
988             if (isEventFrameThread())
989             {
990                 this.eventFrameLockCount--;
991                 if (this.eventFrameLockCount < 0)
992                 {
993                     this.eventFrameLockCount = 0;
994                 }
995                 if (this.eventFrameLockCount == 0)
996                 {
997                     this.eventFrameThread = null;
998                     this.frameIdentifier.remove();
999                 }
1000                 this.notify();
1001             }
1002             else
1003             {
1004                 logNotTheEventFrameThread();
1005             }
1006         }
1007     }
1008 
1009     /**
1010      * Check the container is local or the thread is in a {@link FrameReader}
1011      * context.
1012      */
1013     private final void checkIsLocalOrInFrameReaderContext()
1014     {
1015         if (!isLocal() && !FrameReader.inContext())
1016         {
1017             throw new IllegalStateException(
1018                 "Only a thread running in a FrameReader context can run this method on remote container "
1019                     + this);
1020         }
1021         checkClone();
1022     }
1023 
1024     @Override
1025     public String toDetailedString()
1026     {
1027         return containerToFormattedString(super.toDetailedString(), true);
1028     }
1029 
1030     /**
1031      * Return a nicely formatted string with the container fields printed using
1032      * their {@link AbstractComponent#toSimpleString()}
1033      * 
1034      * @param containerIdentity
1035      *            the identity string to use for the container
1036      * @param linePerField
1037      *            whether each field should be on a new line
1038      * @return a formatted string
1039      */
1040     private String containerToFormattedString(String containerIdentity,
1041         boolean linePerField)
1042     {
1043         StringBuilder sb = new StringBuilder();
1044         List<String> identities = CollectionFactory.newList(this.fields.size());
1045         // iterate over a copy
1046         Map<String, IField> copy = Collections.emptyMap();
1047         synchronized (this)
1048         {
1049             copy = CollectionFactory.newMap(this.fields);
1050         }
1051         if (isActive())
1052         {
1053             for (Entry<String, IField> entry : copy.entrySet())
1054             {
1055                 try
1056                 {
1057                     if (!getDefinition().equals(entry.getValue()))
1058                     {
1059                         identities.add(entry.getKey());
1060                     }
1061                 }
1062                 catch (Exception e)
1063                 {
1064                     logException(getLog(), entry, e);
1065                 }
1066             }
1067         }
1068         Collections.sort(identities);
1069         if (linePerField)
1070         {
1071             sb.append(SystemUtils.LINE_SEPARATOR);
1072         }
1073         sb.append(OPEN_SQUARE);
1074         if (linePerField)
1075         {
1076             sb.append(SystemUtils.LINE_SEPARATOR).append(SPACING_4_CHARS);
1077         }
1078         sb.append("nativeContext=").append(getNativeContextIdentity());
1079         sb.append(", state=").append(getDataState());
1080         if (linePerField)
1081         {
1082             sb.append(SystemUtils.LINE_SEPARATOR);
1083         }
1084         if (identities.size() > 0)
1085         {
1086             for (String string : identities)
1087             {
1088                 try
1089                 {
1090                     if (!linePerField)
1091                     {
1092                         sb.append(COMMA).append(SPACE);
1093                     }
1094                     else
1095                     {
1096                         sb.append(SPACING_4_CHARS);
1097                     }
1098                     sb.append(toString(copy.get(string)));
1099                     if (linePerField)
1100                     {
1101                         sb.append(SystemUtils.LINE_SEPARATOR);
1102                     }
1103                 }
1104                 catch (Exception e)
1105                 {
1106                     logException(getLog(), string, e);
1107                 }
1108             }
1109         }
1110         sb.append(CLOSE_SQUARE);
1111         if (linePerField)
1112         {
1113             sb.append(SystemUtils.LINE_SEPARATOR);
1114         }
1115         return (linePerField ? SystemUtils.LINE_SEPARATOR : EMPTY_STRING)
1116             + containerIdentity + EQUALS + sb.toString();
1117     }
1118 
1119     /**
1120      * @return A thread local variable to hold the {@link IEventFrameExecution}
1121      *         of the current event frame. This is bound to the thread executing
1122      *         the event frame.
1123      */
1124     protected ThreadLocal<IEventFrameExecution> getFrameIdentifier()
1125     {
1126         return this.frameIdentifier;
1127     }
1128 
1129     /**
1130      * @return The {@link IField} objects this container holds, indexed by their
1131      *         {@link String} identity. Uses the <a
1132      *         href="http://www.ibm.com/developerworks/java/library/j-jtp06197.html"
1133      *         >'cheap read-write lock'</a>
1134      */
1135     protected Map<String, IField> getFields()
1136     {
1137         return this.fields;
1138     }
1139 
1140     /**
1141      * @see #getFields()
1142      * @param fields
1143      */
1144     void setFields(Map<String, IField> fields)
1145     {
1146         this.fields = fields;
1147     }
1148 
1149     private String toString(IField field)
1150     {
1151         return field.getIdentity() + EQUALS + field.getValueAsString();
1152     }
1153 
1154     @Override
1155     public Object clone() throws CloneNotSupportedException
1156     {
1157         final AbstractContainer clone = (AbstractContainer) super.clone();
1158         clone.eventFrameThread = null;
1159         clone.fields = CollectionFactory.newMap(this.fields.size());
1160         // clone the fields
1161         final Set<Entry<String, IField>> entrySet = this.fields.entrySet();
1162         for (Entry<String, IField> entry : entrySet)
1163         {
1164             try
1165             {
1166                 clone.fields.put(entry.getKey(),
1167                     ((IField) entry.getValue().clone()));
1168             }
1169             catch (Exception e)
1170             {
1171                 logException(getLog(), entry, e);
1172             }
1173         }
1174         return clone;
1175     }
1176 
1177     @Override
1178     public final String toString()
1179     {
1180         return containerToFormattedString(super.toString(), false);
1181     }
1182 
1183     @Override
1184     public final int hashCode()
1185     {
1186         return super.hashCode();
1187     }
1188 
1189     @Override
1190     public final boolean equals(Object obj)
1191     {
1192         return super.equals(obj);
1193     }
1194 
1195     /**
1196      * Perform the operation to commit all events
1197      */
1198     protected void doCommitEvents()
1199     {
1200         // noop
1201     }
1202 
1203 }