1 /*
2 Copyright 2007 Ramon Servadei
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16 package fulmine.model.container;
17
18 import static fulmine.util.Utils.logException;
19
20 import java.util.Collection;
21 import java.util.Collections;
22 import java.util.List;
23 import java.util.Set;
24
25 import fulmine.IDomain;
26 import fulmine.IType;
27 import fulmine.context.FulmineContext;
28 import fulmine.context.IFrameworkContext;
29 import fulmine.event.EventProcessor;
30 import fulmine.event.IEvent;
31 import fulmine.event.IEventFrame;
32 import fulmine.event.ImageEvent;
33 import fulmine.event.listener.IEventListener;
34 import fulmine.event.system.ISystemEventListener;
35 import fulmine.event.system.TxEvent;
36 import fulmine.model.component.AbstractComponent;
37 import fulmine.model.field.IField;
38 import fulmine.protocol.wire.IWireIdentity;
39 import fulmine.protocol.wire.operation.IOperationScope;
40 import fulmine.util.collection.CoalescingCollection;
41 import fulmine.util.collection.CollectionFactory;
42 import fulmine.util.collection.OrderedCoalescingCollection;
43 import fulmine.util.log.AsyncLog;
44
45 /**
46 * An extended {@link AbstractContainer} that includes logic to handle event
47 * processing.
48 * <p>
49 * This implementation is thread safe. Events received by
50 * {@link #addEvent(IEvent)} are cloned (if they are an
51 * {@link AbstractCloneableComponent}) and cached in a local collection until
52 * the event frame is flushed (see {@link IEventFrame}). <u>The event sinking
53 * process is thread safe</u>. If any of the events are fields of this
54 * container, they are marked for writing when the
55 * {@link #writeState(IOperationScope, IWireIdentity, byte[][], int[], byte[][], int[], boolean)}
56 * method executes. This provides a delta change mechanism that overrides the
57 * {@link AbstractContainer} implementation that can only write the complete
58 * image.
59 * <p>
60 * Changes to the container are only notified to {@link IEventListener} objects
61 * when the events are processed via {@link #endFrame()}. When the events are
62 * processed, the container also raises itself as an event. However, the
63 * container raises a clone of itself. The cloned container will be a deep clone
64 * so will represent the state of the container at the moment in time when the
65 * event occurs.
66 * <p>
 * The {@link #endFrame()} also raises a {@link TxEvent} containing the
 * <code>byte[]</code> with the changes to send to remote context instances.
69 * This is a system event so only {@link ISystemEventListener} instances will
70 * receive these.
71 * <p>
72 * Events are raised via the {@link FulmineContext#queueEvents(Collection)}
73 * method.
74 *
75 * @author Ramon Servadei
76 */
77 public abstract class AbstractEventProcessingContainer extends
78 AbstractContainer implements Cloneable
79 {
80 private final static AsyncLog LOG =
81 new AsyncLog(AbstractEventProcessingContainer.class);
82
83 /**
84 * The events occurring in an event frame. The events are ordered and
85 * coalesced. Uses the <a
86 * href="http://www.ibm.com/developerworks/java/library/j-jtp06197.html"
87 * >'cheap read-write lock'</a>
88 */
89 protected volatile Collection<IEvent> events;
90
91 /**
92 * The fields that have changed as a result of events. The collection is
93 * flushed when
94 * {@link #doWriteState(IOperationScope, byte[][], int[], byte[][], int[])}
95 * executes.
96 */
97 Collection<IField> changedFields;
98
99 /**
100 * Standard constructor.
101 *
102 * @param nativeContextIdentity
103 * whether the container is local to this context
104 * @param identity
105 * the identity of the container
106 * @param type
107 * the type of the container
108 * @param domain
109 * the domain for the container
110 * @param hostContext
111 * the context hosting this container instance
112 * @param local
113 * <code>true</code> the container is local to this context
114 */
115 public AbstractEventProcessingContainer(String nativeContextIdentity,
116 String identity, IType type, IDomain domain,
117 IFrameworkContext hostContext, boolean local)
118 {
119 super(nativeContextIdentity, identity, type, domain, hostContext, local);
120 this.changedFields = new CoalescingCollection<IField>(2);
121 this.events = new OrderedCoalescingCollection<IEvent>(2);
122 }
123
124 /**
125 * This is thread safe - only the event frame can execute this.
126 *
127 * @see IEventFrame#endFrame()
128 */
129 protected final void doCommitEvents()
130 {
131 // Clear the changes as these are added from the events.
132 this.changedFields.clear();
133
134 Collection<IEvent> copy;
135 synchronized (this.events)
136 {
137 copy = this.events;
138 this.events = new OrderedCoalescingCollection<IEvent>(2);
139 }
140
141 if (copy.size() > 0)
142 {
143 boolean writeImage = false;
144 List<IEvent> notify = CollectionFactory.newList(copy.size());
145 for (IEvent event : copy)
146 {
147 try
148 {
149 /*
150 * local containers write changes out and remote containers
151 * get changes written in, so both need to be able to
152 * indicate which fields have changed
153 */
154 if (event.getSource() instanceof IField)
155 {
156 final IField field = (IField) event.getSource();
157 if (this.equals(field.getContainer()))
158 {
159 this.changedFields.add(field);
160 }
161 }
162 // an image needs to re-write the fields as changes
163 if (event instanceof ImageEvent)
164 {
165 writeImage = true;
166 // add all fields
167 final Set<String> keySet = getFields().keySet();
168 for (String fieldName : keySet)
169 {
170 this.changedFields.add(getFields().get(fieldName));
171 }
172 continue;
173 }
174 // only add to the queue if there are listeners...
175 // getListeners() is final on the AbstractComponent, so
176 // no chance of an NPE here
177 if (event.getSource().getListeners().size() > 0)
178 {
179 /*
180 * the events are cloned fields so the field data value
181 * in a notified event frame is isolated to the frame
182 * i.e. its a clone so changing its value doesn't affect
183 * the original
184 */
185 if (event instanceof AbstractComponent)
186 {
187 ((AbstractComponent) event).setFrame(getFrameIdentifier().get());
188 }
189 notify.add(event);
190 }
191 }
192 catch (Exception e)
193 {
194 logException(getLog(), event, e);
195 }
196 }
197 // notify event listeners with the clone of this so that the I/O
198 // framework can safely write the current state of this frame.
199 try
200 {
201 // only add to the queue if there are listeners
202 AbstractEventProcessingContainer clone = null;
203 if (getListeners() != null)
204 {
205 clone = (AbstractEventProcessingContainer) clone();
206 notify.add(clone);
207 }
208 // add the wire-state to transmit
209 if (getRemoteSubscriptionCount() > 0)
210 {
211 final long start = System.nanoTime();
212 final byte[] frame =
213 writeImage
214 ? getContext().getFrameWriter().writeComplete(this)
215 : getContext().getFrameWriter().write(this);
216 final long createTime = (System.nanoTime() - start);
217 notify.add(new TxEvent(this, frame,
218 getFrameIdentifier().get(),
219 EventProcessor.getCurrentFrame(), createTime));
220 }
221 this.changedFields.clear();
222 // will be null on destroy
223 if(getDefinition() != null)
224 {
225 getDefinition().resetChanges();
226 }
227 }
228 catch (CloneNotSupportedException e)
229 {
230 throw new RuntimeException("Could not clone " + this, e);
231 }
232 /*
233 * If the processing itself generates events, these events will be
234 * processed in another frame. E.g. {@link LongField#set(long)} is
235 * invoked during processing of an event frame, this generates an
236 * event to notify the owning container that the field value has
237 * changed. This event will occur in the next, or subsequent, frame
238 * of that container.
239 */
240 getContext().queueEvents(notify);
241 }
242 else
243 {
244 if (getLog().isDebugEnabled())
245 {
246 getLog().debug(
247 "processEvents() for " + this + " found no events");
248 }
249 }
250 }
251
252 public final void addEvent(IEvent event)
253 {
254 checkClone();
255 /*
256 * Remote instances still process events (they come from the reader
257 * thread) so that listeners can react. The events are not written to
258 * the wire (doWrite checks if this is local).
259 */
260 if (isActive() || !isLocal())
261 {
262 try
263 {
264 /*
265 * Note that even if the event is a clone, it still has the same
266 * context identity so will actually be coalesced in the
267 * collection. We thus get snapshot coalescing.
268 */
269 if (event != null)
270 {
271 final IEvent clone = (IEvent) event.clone();
272 synchronized (this.events)
273 {
274 this.events.add(clone);
275 }
276 }
277 else
278 {
279 if (getLog().isDebugEnabled())
280 {
281 getLog().debug(
282 toIdentityString() + " received null event");
283 }
284 }
285 }
286 catch (CloneNotSupportedException e)
287 {
288 throw new IllegalStateException("Could not clone " + event, e);
289 }
290 }
291 else
292 {
293 if (getLog().isTraceEnabled())
294 {
295 getLog().trace(
296 toIdentityString() + " is inactive, ignoring event "
297 + event.toIdentityString());
298 }
299 }
300 }
301
302 @Override
303 protected AsyncLog getLog()
304 {
305 return LOG;
306 }
307
308 @Override
309 protected void doComponentDestroy()
310 {
311 super.doComponentDestroy();
312 this.changedFields.clear();
313 // DO NOT ENABLE THIS
314 // this.events.clear();
315 }
316
317 @Override
318 protected void afterAdd(IField field)
319 {
320 super.afterAdd(field);
321 IField added = get(field.getIdentity());
322 addEvent(added);
323 }
324
325 /**
326 * Get the fields that have changed in the previous event frame. This method
327 * should only be accessed from the {@link IEventListener#update(IEvent)}
328 * method when a listener receives notification that the {@link IContainer}
329 * has changed. The notification is a clone of the original and this method
330 * is only usable for cloned instances (hence only within the update
331 * method).
332 *
333 * @return the fields that have changed in the previous event frame.
334 */
335 public final Collection<IField> getChangedFields()
336 {
337 if (isClone())
338 {
339 return this.changedFields;
340 }
341 return Collections.emptyList();
342 }
343
344 @Override
345 protected Collection<IField> getFieldsToWrite(boolean completeState)
346 {
347 if (completeState)
348 {
349 return super.getFieldsToWrite(completeState);
350 }
351 final Collection<IField> toWrite =
352 CollectionFactory.newList(this.changedFields);
353 return toWrite;
354 }
355
356 @Override
357 protected void doStateChangeOp(DataState oldState)
358 {
359 super.doStateChangeOp(oldState);
360 /*
361 * Trigger a full update on a valid data status change OR if this is a
362 * remote container; as it is remote, the image event will not be sent
363 * on-the-wire.
364 */
365 if (oldState == DataState.STALE || !isLocal())
366 {
367 addEvent(new ImageEvent());
368 }
369 }
370
371 @Override
372 public Object clone() throws CloneNotSupportedException
373 {
374 final AbstractEventProcessingContainer clone =
375 (AbstractEventProcessingContainer) super.clone();
376 // get a deep copy of the changed fields
377 clone.changedFields = CollectionFactory.newList(clone.changedFields);
378 return clone;
379 }
380 }