1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package fulmine.model.container;
17
18 import static fulmine.util.Utils.logException;
19
20 import java.util.Collection;
21 import java.util.Collections;
22 import java.util.List;
23 import java.util.Set;
24
25 import fulmine.IDomain;
26 import fulmine.IType;
27 import fulmine.context.FulmineContext;
28 import fulmine.context.IFrameworkContext;
29 import fulmine.event.EventProcessor;
30 import fulmine.event.IEvent;
31 import fulmine.event.IEventFrame;
32 import fulmine.event.ImageEvent;
33 import fulmine.event.listener.IEventListener;
34 import fulmine.event.system.ISystemEventListener;
35 import fulmine.event.system.TxEvent;
36 import fulmine.model.component.AbstractComponent;
37 import fulmine.model.field.IField;
38 import fulmine.protocol.wire.IWireIdentity;
39 import fulmine.protocol.wire.operation.IOperationScope;
40 import fulmine.util.collection.CoalescingCollection;
41 import fulmine.util.collection.CollectionFactory;
42 import fulmine.util.collection.OrderedCoalescingCollection;
43 import fulmine.util.log.AsyncLog;
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77 public abstract class AbstractEventProcessingContainer extends
78 AbstractContainer implements Cloneable
79 {
80 private final static AsyncLog LOG =
81 new AsyncLog(AbstractEventProcessingContainer.class);
82
83
84
85
86
87
88
89 protected volatile Collection<IEvent> events;
90
91
92
93
94
95
96
97 Collection<IField> changedFields;
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115 public AbstractEventProcessingContainer(String nativeContextIdentity,
116 String identity, IType type, IDomain domain,
117 IFrameworkContext hostContext, boolean local)
118 {
119 super(nativeContextIdentity, identity, type, domain, hostContext, local);
120 this.changedFields = new CoalescingCollection<IField>(2);
121 this.events = new OrderedCoalescingCollection<IEvent>(2);
122 }
123
124
125
126
127
128
129 protected final void doCommitEvents()
130 {
131
132 this.changedFields.clear();
133
134 Collection<IEvent> copy;
135 synchronized (this.events)
136 {
137 copy = this.events;
138 this.events = new OrderedCoalescingCollection<IEvent>(2);
139 }
140
141 if (copy.size() > 0)
142 {
143 boolean writeImage = false;
144 List<IEvent> notify = CollectionFactory.newList(copy.size());
145 for (IEvent event : copy)
146 {
147 try
148 {
149
150
151
152
153
154 if (event.getSource() instanceof IField)
155 {
156 final IField field = (IField) event.getSource();
157 if (this.equals(field.getContainer()))
158 {
159 this.changedFields.add(field);
160 }
161 }
162
163 if (event instanceof ImageEvent)
164 {
165 writeImage = true;
166
167 final Set<String> keySet = getFields().keySet();
168 for (String fieldName : keySet)
169 {
170 this.changedFields.add(getFields().get(fieldName));
171 }
172 continue;
173 }
174
175
176
177 if (event.getSource().getListeners().size() > 0)
178 {
179
180
181
182
183
184
185 if (event instanceof AbstractComponent)
186 {
187 ((AbstractComponent) event).setFrame(getFrameIdentifier().get());
188 }
189 notify.add(event);
190 }
191 }
192 catch (Exception e)
193 {
194 logException(getLog(), event, e);
195 }
196 }
197
198
199 try
200 {
201
202 AbstractEventProcessingContainer clone = null;
203 if (getListeners() != null)
204 {
205 clone = (AbstractEventProcessingContainer) clone();
206 notify.add(clone);
207 }
208
209 if (getRemoteSubscriptionCount() > 0)
210 {
211 final long start = System.nanoTime();
212 final byte[] frame =
213 writeImage
214 ? getContext().getFrameWriter().writeComplete(this)
215 : getContext().getFrameWriter().write(this);
216 final long createTime = (System.nanoTime() - start);
217 notify.add(new TxEvent(this, frame,
218 getFrameIdentifier().get(),
219 EventProcessor.getCurrentFrame(), createTime));
220 }
221 this.changedFields.clear();
222
223 if(getDefinition() != null)
224 {
225 getDefinition().resetChanges();
226 }
227 }
228 catch (CloneNotSupportedException e)
229 {
230 throw new RuntimeException("Could not clone " + this, e);
231 }
232
233
234
235
236
237
238
239
240 getContext().queueEvents(notify);
241 }
242 else
243 {
244 if (getLog().isDebugEnabled())
245 {
246 getLog().debug(
247 "processEvents() for " + this + " found no events");
248 }
249 }
250 }
251
252 public final void addEvent(IEvent event)
253 {
254 checkClone();
255
256
257
258
259
260 if (isActive() || !isLocal())
261 {
262 try
263 {
264
265
266
267
268
269 if (event != null)
270 {
271 final IEvent clone = (IEvent) event.clone();
272 synchronized (this.events)
273 {
274 this.events.add(clone);
275 }
276 }
277 else
278 {
279 if (getLog().isDebugEnabled())
280 {
281 getLog().debug(
282 toIdentityString() + " received null event");
283 }
284 }
285 }
286 catch (CloneNotSupportedException e)
287 {
288 throw new IllegalStateException("Could not clone " + event, e);
289 }
290 }
291 else
292 {
293 if (getLog().isTraceEnabled())
294 {
295 getLog().trace(
296 toIdentityString() + " is inactive, ignoring event "
297 + event.toIdentityString());
298 }
299 }
300 }
301
302 @Override
303 protected AsyncLog getLog()
304 {
305 return LOG;
306 }
307
308 @Override
309 protected void doComponentDestroy()
310 {
311 super.doComponentDestroy();
312 this.changedFields.clear();
313
314
315 }
316
317 @Override
318 protected void afterAdd(IField field)
319 {
320 super.afterAdd(field);
321 IField added = get(field.getIdentity());
322 addEvent(added);
323 }
324
325
326
327
328
329
330
331
332
333
334
335 public final Collection<IField> getChangedFields()
336 {
337 if (isClone())
338 {
339 return this.changedFields;
340 }
341 return Collections.emptyList();
342 }
343
344 @Override
345 protected Collection<IField> getFieldsToWrite(boolean completeState)
346 {
347 if (completeState)
348 {
349 return super.getFieldsToWrite(completeState);
350 }
351 final Collection<IField> toWrite =
352 CollectionFactory.newList(this.changedFields);
353 return toWrite;
354 }
355
356 @Override
357 protected void doStateChangeOp(DataState oldState)
358 {
359 super.doStateChangeOp(oldState);
360
361
362
363
364
365 if (oldState == DataState.STALE || !isLocal())
366 {
367 addEvent(new ImageEvent());
368 }
369 }
370
371 @Override
372 public Object clone() throws CloneNotSupportedException
373 {
374 final AbstractEventProcessingContainer clone =
375 (AbstractEventProcessingContainer) super.clone();
376
377 clone.changedFields = CollectionFactory.newList(clone.changedFields);
378 return clone;
379 }
380 }