1   /*
2    Copyright 2007 Ramon Servadei
3   
4    Licensed under the Apache License, Version 2.0 (the "License");
5    you may not use this file except in compliance with the License.
6    You may obtain a copy of the License at
7   
8    http://www.apache.org/licenses/LICENSE-2.0
9   
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15   */
16  package fulmine.model.container;
17  
18  import static fulmine.util.Utils.logException;
19  
20  import java.util.Collection;
21  import java.util.Collections;
22  import java.util.List;
23  import java.util.Set;
24  
25  import fulmine.IDomain;
26  import fulmine.IType;
27  import fulmine.context.FulmineContext;
28  import fulmine.context.IFrameworkContext;
29  import fulmine.event.EventProcessor;
30  import fulmine.event.IEvent;
31  import fulmine.event.IEventFrame;
32  import fulmine.event.ImageEvent;
33  import fulmine.event.listener.IEventListener;
34  import fulmine.event.system.ISystemEventListener;
35  import fulmine.event.system.TxEvent;
36  import fulmine.model.component.AbstractComponent;
37  import fulmine.model.field.IField;
38  import fulmine.protocol.wire.IWireIdentity;
39  import fulmine.protocol.wire.operation.IOperationScope;
40  import fulmine.util.collection.CoalescingCollection;
41  import fulmine.util.collection.CollectionFactory;
42  import fulmine.util.collection.OrderedCoalescingCollection;
43  import fulmine.util.log.AsyncLog;
44  
45  /**
46   * An extended {@link AbstractContainer} that includes logic to handle event
47   * processing.
48   * <p>
49   * This implementation is thread safe. Events received by
50   * {@link #addEvent(IEvent)} are cloned (if they are an
51   * {@link AbstractCloneableComponent}) and cached in a local collection until
52   * the event frame is flushed (see {@link IEventFrame}). <u>The event sinking
53   * process is thread safe</u>. If any of the events are fields of this
54   * container, they are marked for writing when the
55   * {@link #writeState(IOperationScope, IWireIdentity, byte[][], int[], byte[][], int[], boolean)}
56   * method executes. This provides a delta change mechanism that overrides the
57   * {@link AbstractContainer} implementation that can only write the complete
58   * image.
59   * <p>
60   * Changes to the container are only notified to {@link IEventListener} objects
61   * when the events are processed via {@link #endFrame()}. When the events are
62   * processed, the container also raises itself as an event. However, the
63   * container raises a clone of itself. The cloned container will be a deep clone
64   * so will represent the state of the container at the moment in time when the
65   * event occurs.
66   * <p>
67   * The {@link #endFrame()} also raises a {@link TxEvent} containing the
68   * <code>byte[]</code> to with the changes send to remote context instances.
69   * This is a system event so only {@link ISystemEventListener} instances will
70   * receive these.
71   * <p>
72   * Events are raised via the {@link FulmineContext#queueEvents(Collection)}
73   * method.
74   * 
75   * @author Ramon Servadei
76   */
77  public abstract class AbstractEventProcessingContainer extends
78      AbstractContainer implements Cloneable
79  {
80      private final static AsyncLog LOG =
81          new AsyncLog(AbstractEventProcessingContainer.class);
82  
83      /**
84       * The events occurring in an event frame. The events are ordered and
85       * coalesced. Uses the <a
86       * href="http://www.ibm.com/developerworks/java/library/j-jtp06197.html"
87       * >'cheap read-write lock'</a>
88       */
89      protected volatile Collection<IEvent> events;
90  
91      /**
92       * The fields that have changed as a result of events. The collection is
93       * flushed when
94       * {@link #doWriteState(IOperationScope, byte[][], int[], byte[][], int[])}
95       * executes.
96       */
97      Collection<IField> changedFields;
98  
99      /**
100      * Standard constructor.
101      * 
102      * @param nativeContextIdentity
103      *            whether the container is local to this context
104      * @param identity
105      *            the identity of the container
106      * @param type
107      *            the type of the container
108      * @param domain
109      *            the domain for the container
110      * @param hostContext
111      *            the context hosting this container instance
112      * @param local
113      *            <code>true</code> the container is local to this context
114      */
115     public AbstractEventProcessingContainer(String nativeContextIdentity,
116         String identity, IType type, IDomain domain,
117         IFrameworkContext hostContext, boolean local)
118     {
119         super(nativeContextIdentity, identity, type, domain, hostContext, local);
120         this.changedFields = new CoalescingCollection<IField>(2);
121         this.events = new OrderedCoalescingCollection<IEvent>(2);
122     }
123 
124     /**
125      * This is thread safe - only the event frame can execute this.
126      * 
127      * @see IEventFrame#endFrame()
128      */
129     protected final void doCommitEvents()
130     {
131         // Clear the changes as these are added from the events.
132         this.changedFields.clear();
133 
134         Collection<IEvent> copy;
135         synchronized (this.events)
136         {
137             copy = this.events;
138             this.events = new OrderedCoalescingCollection<IEvent>(2);
139         }
140 
141         if (copy.size() > 0)
142         {
143             boolean writeImage = false;
144             List<IEvent> notify = CollectionFactory.newList(copy.size());
145             for (IEvent event : copy)
146             {
147                 try
148                 {
149                     /*
150                      * local containers write changes out and remote containers
151                      * get changes written in, so both need to be able to
152                      * indicate which fields have changed
153                      */
154                     if (event.getSource() instanceof IField)
155                     {
156                         final IField field = (IField) event.getSource();
157                         if (this.equals(field.getContainer()))
158                         {
159                             this.changedFields.add(field);
160                         }
161                     }
162                     // an image needs to re-write the fields as changes
163                     if (event instanceof ImageEvent)
164                     {
165                         writeImage = true;
166                         // add all fields
167                         final Set<String> keySet = getFields().keySet();
168                         for (String fieldName : keySet)
169                         {
170                             this.changedFields.add(getFields().get(fieldName));
171                         }
172                         continue;
173                     }
174                     // only add to the queue if there are listeners...
175                     // getListeners() is final on the AbstractComponent, so
176                     // no chance of an NPE here
177                     if (event.getSource().getListeners().size() > 0)
178                     {
179                         /*
180                          * the events are cloned fields so the field data value
181                          * in a notified event frame is isolated to the frame
182                          * i.e. its a clone so changing its value doesn't affect
183                          * the original
184                          */
185                         if (event instanceof AbstractComponent)
186                         {
187                             ((AbstractComponent) event).setFrame(getFrameIdentifier().get());
188                         }
189                         notify.add(event);
190                     }
191                 }
192                 catch (Exception e)
193                 {
194                     logException(getLog(), event, e);
195                 }
196             }
197             // notify event listeners with the clone of this so that the I/O
198             // framework can safely write the current state of this frame.
199             try
200             {
201                 // only add to the queue if there are listeners
202                 AbstractEventProcessingContainer clone = null;
203                 if (getListeners() != null)
204                 {
205                     clone = (AbstractEventProcessingContainer) clone();
206                     notify.add(clone);
207                 }
208                 // add the wire-state to transmit
209                 if (getRemoteSubscriptionCount() > 0)
210                 {
211                     final long start = System.nanoTime();
212                     final byte[] frame =
213                         writeImage
214                             ? getContext().getFrameWriter().writeComplete(this)
215                             : getContext().getFrameWriter().write(this);
216                     final long createTime = (System.nanoTime() - start);
217                     notify.add(new TxEvent(this, frame,
218                         getFrameIdentifier().get(),
219                         EventProcessor.getCurrentFrame(), createTime));
220                 }
221                 this.changedFields.clear();
222                 // will be null on destroy
223                 if(getDefinition() != null)
224                 {
225                     getDefinition().resetChanges();
226                 }
227             }
228             catch (CloneNotSupportedException e)
229             {
230                 throw new RuntimeException("Could not clone " + this, e);
231             }
232             /*
233              * If the processing itself generates events, these events will be
234              * processed in another frame. E.g. {@link LongField#set(long)} is
235              * invoked during processing of an event frame, this generates an
236              * event to notify the owning container that the field value has
237              * changed. This event will occur in the next, or subsequent, frame
238              * of that container.
239              */
240             getContext().queueEvents(notify);
241         }
242         else
243         {
244             if (getLog().isDebugEnabled())
245             {
246                 getLog().debug(
247                     "processEvents() for " + this + " found no events");
248             }
249         }
250     }
251 
252     public final void addEvent(IEvent event)
253     {
254         checkClone();
255         /*
256          * Remote instances still process events (they come from the reader
257          * thread) so that listeners can react. The events are not written to
258          * the wire (doWrite checks if this is local).
259          */
260         if (isActive() || !isLocal())
261         {
262             try
263             {
264                 /*
265                  * Note that even if the event is a clone, it still has the same
266                  * context identity so will actually be coalesced in the
267                  * collection. We thus get snapshot coalescing.
268                  */
269                 if (event != null)
270                 {
271                     final IEvent clone = (IEvent) event.clone();
272                     synchronized (this.events)
273                     {
274                         this.events.add(clone);
275                     }
276                 }
277                 else
278                 {
279                     if (getLog().isDebugEnabled())
280                     {
281                         getLog().debug(
282                             toIdentityString() + " received null event");
283                     }
284                 }
285             }
286             catch (CloneNotSupportedException e)
287             {
288                 throw new IllegalStateException("Could not clone " + event, e);
289             }
290         }
291         else
292         {
293             if (getLog().isTraceEnabled())
294             {
295                 getLog().trace(
296                     toIdentityString() + " is inactive, ignoring event "
297                         + event.toIdentityString());
298             }
299         }
300     }
301 
302     @Override
303     protected AsyncLog getLog()
304     {
305         return LOG;
306     }
307 
308     @Override
309     protected void doComponentDestroy()
310     {
311         super.doComponentDestroy();
312         this.changedFields.clear();
313         // DO NOT ENABLE THIS
314         // this.events.clear();
315     }
316 
317     @Override
318     protected void afterAdd(IField field)
319     {
320         super.afterAdd(field);
321         IField added = get(field.getIdentity());
322         addEvent(added);
323     }
324 
325     /**
326      * Get the fields that have changed in the previous event frame. This method
327      * should only be accessed from the {@link IEventListener#update(IEvent)}
328      * method when a listener receives notification that the {@link IContainer}
329      * has changed. The notification is a clone of the original and this method
330      * is only usable for cloned instances (hence only within the update
331      * method).
332      * 
333      * @return the fields that have changed in the previous event frame.
334      */
335     public final Collection<IField> getChangedFields()
336     {
337         if (isClone())
338         {
339             return this.changedFields;
340         }
341         return Collections.emptyList();
342     }
343 
344     @Override
345     protected Collection<IField> getFieldsToWrite(boolean completeState)
346     {
347         if (completeState)
348         {
349             return super.getFieldsToWrite(completeState);
350         }
351         final Collection<IField> toWrite =
352             CollectionFactory.newList(this.changedFields);
353         return toWrite;
354     }
355 
356     @Override
357     protected void doStateChangeOp(DataState oldState)
358     {
359         super.doStateChangeOp(oldState);
360         /*
361          * Trigger a full update on a valid data status change OR if this is a
362          * remote container; as it is remote, the image event will not be sent
363          * on-the-wire.
364          */
365         if (oldState == DataState.STALE || !isLocal())
366         {
367             addEvent(new ImageEvent());
368         }
369     }
370 
371     @Override
372     public Object clone() throws CloneNotSupportedException
373     {
374         final AbstractEventProcessingContainer clone =
375             (AbstractEventProcessingContainer) super.clone();
376         // get a deep copy of the changed fields
377         clone.changedFields = CollectionFactory.newList(clone.changedFields);
378         return clone;
379     }
380 }