1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package fulmine.event.listener;
17
18 import static fulmine.util.Utils.CLOSE_CURLY;
19 import static fulmine.util.Utils.CLOSE_SQUARE;
20 import static fulmine.util.Utils.COMMA_SPACE;
21 import static fulmine.util.Utils.OPEN_CURLY;
22 import static fulmine.util.Utils.OPEN_SQUARE;
23 import static fulmine.util.Utils.SPACING_4_CHARS;
24 import static fulmine.util.Utils.logException;
25 import static fulmine.util.Utils.nullCheck;
26 import static fulmine.util.Utils.string;
27
28 import java.util.List;
29 import java.util.Map;
30
31 import org.apache.commons.lang.SystemUtils;
32
33 import fulmine.IDescriptor;
34 import fulmine.ILifeCycle;
35 import fulmine.context.FulmineContext;
36 import fulmine.event.AbstractEvent;
37 import fulmine.event.EventProcessor;
38 import fulmine.event.EventSource;
39 import fulmine.event.IEvent;
40 import fulmine.event.IEventFrame;
41 import fulmine.event.IEventFrameExecution;
42 import fulmine.event.IEventManager;
43 import fulmine.event.IEventSource;
44 import fulmine.util.collection.CollectionFactory;
45 import fulmine.util.log.AsyncLog;
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113 public abstract class AsymmetricEventProcessor extends EventSource implements
114 IEventSource, ILifeCycle
115 {
116 private final static AsyncLog LOG =
117 new AsyncLog(AsymmetricEventProcessor.class);
118
119
120
121
122
123 private static final int DRIVER_THRESHOLD = 3;
124
125
126 private final IEventSource driverSource;
127
128
129 private final IEventSource[] drivenSources;
130
131
132
133
134
135
136 protected final Map<IEventFrameExecution, IEvent> driverEvents =
137 CollectionFactory.newMap();
138
139
140
141
142 protected final Map<IEventFrameExecution, List<IEvent>> drivenEvents =
143 CollectionFactory.newMap();
144
145
146 private final IEventManager context;
147
148
149
150
151
152 private final Class<? extends IEvent>[] driverEventTypeFilter;
153
154
155
156
157
158 private final Class<? extends IEvent>[] drivenEventTypeFilter;
159
160
161 final IEventListener driverEventListener;
162
163
164 final IEventListener drivenEventListener;
165
166
167
168
169
170
171
172 private class EventListener implements IPriorityEventListener
173 {
174 private final Class<? extends IEvent>[] eventTypeFilter;
175
176 public EventListener(Class<? extends IEvent>[] eventTypeFilter)
177 {
178 super();
179 this.eventTypeFilter = eventTypeFilter;
180 }
181
182 public void addedAsListenerFor(IEventSource source)
183 {
184
185 }
186
187 public Class<? extends IEvent>[] getEventTypeFilter()
188 {
189 return this.eventTypeFilter;
190 }
191
192 public void removedAsListenerFrom(IEventSource source)
193 {
194
195 }
196
197 public void update(IEvent event)
198 {
199 AsymmetricEventProcessor.this.update(event);
200 }
201 }
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220 public AsymmetricEventProcessor(IEventManager context,
221 Class<? extends IEvent>[] driverEventTypeFilter,
222 IEventSource driverSource,
223 Class<? extends IEvent>[] drivenEventTypeFilter,
224 IEventSource... drivenSources)
225 {
226 super(OPEN_CURLY + AsymmetricEventProcessor.class.getSimpleName()
227 + " driver=" + driverSource.toIdentityString() + ", driven="
228 + toIdentityString(drivenSources) + CLOSE_CURLY);
229 nullCheck(driverSource, "Cannot have a null driver source");
230 this.driverEventTypeFilter = driverEventTypeFilter;
231 this.drivenEventTypeFilter = drivenEventTypeFilter;
232 this.driverSource = driverSource;
233 this.drivenSources = drivenSources;
234 this.context = context;
235 this.driverEventListener =
236 new EventListener(this.driverEventTypeFilter);
237 this.driverSource.addListener(this.driverEventListener);
238 this.drivenEventListener =
239 new EventListener(this.drivenEventTypeFilter);
240 for (IEventSource eventSource : this.drivenSources)
241 {
242 try
243 {
244 eventSource.addListener(this.drivenEventListener);
245 }
246 catch (Exception e)
247 {
248 logException(getLog(), eventSource, e);
249 }
250 }
251 }
252
253
254
255
256
257
258
259
260 private static String toIdentityString(IDescriptor[] descriptors)
261 {
262 StringBuilder sb = new StringBuilder();
263 sb.append(OPEN_SQUARE);
264 if (descriptors != null && descriptors.length > 0)
265 {
266 if (descriptors[0] != null)
267 {
268 sb.append(descriptors[0].toIdentityString());
269 }
270 for (int i = 1; i < descriptors.length; i++)
271 {
272 IDescriptor descriptor = descriptors[i];
273 if (descriptor != null)
274 {
275 sb.append(COMMA_SPACE).append(descriptor.toIdentityString());
276 }
277 }
278 }
279 sb.append(CLOSE_SQUARE);
280 return sb.toString();
281 }
282
283 private synchronized final void update(IEvent event)
284 {
285 if (isActive())
286 {
287
288 if (this.driverSource == event.getSource())
289 {
290 this.driverEvents.put(event.getFrame(), event);
291
292
293 if (this.driverEvents.size() > AsymmetricEventProcessor.DRIVER_THRESHOLD)
294 {
295 if (getLog().isInfoEnabled())
296 {
297 getLog().info(
298 "Driver events still awaiting driven events: "
299 + this.driverEvents + " from " + getIdentity());
300 }
301 }
302 }
303 else
304 {
305 if (!this.drivenEvents.containsKey(event.getDrivingFrame()))
306 {
307 final List<IEvent> newList =
308 CollectionFactory.newList(this.drivenSources.length);
309 this.drivenEvents.put(event.getDrivingFrame(), newList);
310 }
311 final List<IEvent> receivedDrivenEvents =
312 this.drivenEvents.get(event.getDrivingFrame());
313 receivedDrivenEvents.add(event);
314
315 if (receivedDrivenEvents.size() == this.drivenSources.length)
316 {
317 this.drivenEvents.remove(event.getDrivingFrame());
318 final IEvent driverEvent =
319 this.driverEvents.remove(event.getDrivingFrame());
320 if (driverEvent == null)
321 {
322 if (getLog().isInfoEnabled())
323 {
324 getLog().info("No driver event for " + event);
325 }
326 }
327 else
328 {
329 triggerUpdate(event, receivedDrivenEvents, driverEvent);
330 }
331 }
332 }
333 }
334 else
335 {
336 throw new IllegalStateException("Inactive listener " + this
337 + " received event " + event);
338 }
339 }
340
341 void triggerUpdate(IEvent event, final List<IEvent> receivedDrivenEvents,
342 final IEvent driverEvent)
343 {
344 final Result result = update(driverEvent, receivedDrivenEvents);
345 if (result != null)
346 {
347 result.setDrivingFrame(event.getDrivingFrame());
348 result.setFrame(event.getFrame());
349 result.setSource(this);
350
351
352
353
354
355 this.context.queueEvent(result);
356 }
357 }
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383 protected abstract Result update(IEvent driverEvent,
384 List<IEvent> drivenEvents);
385
386
387
388
389
390
391 public final IEventSource getDriverSource()
392 {
393 return this.driverSource;
394 }
395
396 @Override
397 protected AsyncLog getLog()
398 {
399 return LOG;
400 }
401
402 @Override
403 protected void doStart()
404 {
405
406 }
407
408
409
410
411
412
413 public final IEventSource[] getDrivenSources()
414 {
415 return this.drivenSources;
416 }
417
418
419
420
421 @Override
422 protected void doComponentDestroy()
423 {
424 this.driverSource.removeListener(this.driverEventListener);
425 for (IEventSource eventSource : drivenSources)
426 {
427 eventSource.removeListener(this.drivenEventListener);
428 }
429 this.driverEvents.clear();
430 }
431
432
433
434
435
436
437
438
439
440
441
442
443 public static class Result extends AbstractEvent
444 {
445
446 private final AsymmetricEventProcessor driver;
447
448
449
450
451
452
453
454
455 public Result(AsymmetricEventProcessor driver)
456 {
457 super();
458 this.driver = driver;
459 }
460
461
462
463
464
465
466
467
468 public AsymmetricEventProcessor getDriver()
469 {
470 return this.driver;
471 }
472 }
473
474 @Override
475 public String toDetailedString()
476 {
477 return toFormattedString(0);
478 }
479
480 private String toFormattedString(int level)
481 {
482 StringBuilder sb = new StringBuilder();
483 StringBuilder spacing = new StringBuilder();
484 for (int i = 0; i < level; i++)
485 {
486 spacing.append(SPACING_4_CHARS);
487 }
488 sb.append(SystemUtils.LINE_SEPARATOR).append(spacing).append(OPEN_CURLY).append(
489 getClass().getSimpleName()).append(SystemUtils.LINE_SEPARATOR);
490 sb.append(spacing).append(SPACING_4_CHARS).append("driver: ");
491 if (this.driverSource instanceof AsymmetricEventProcessor)
492 {
493 sb.append(((AsymmetricEventProcessor) this.driverSource).toFormattedString(level + 3));
494 }
495 else
496 {
497 sb.append(SystemUtils.LINE_SEPARATOR).append(spacing).append(
498 SPACING_4_CHARS).append(SPACING_4_CHARS).append(SPACING_4_CHARS);
499 sb.append(this.driverSource);
500 }
501 sb.append(SystemUtils.LINE_SEPARATOR);
502 if (this.drivenSources != null)
503 {
504 int count = 0;
505 for (IEventSource source : this.drivenSources)
506 {
507 count++;
508 if (count == 1)
509 {
510 sb.append(spacing).append(SPACING_4_CHARS).append(
511 "driven: ");
512 }
513 if (source instanceof AsymmetricEventProcessor)
514 {
515 sb.append(((AsymmetricEventProcessor) source).toFormattedString(level + 3));
516 }
517 else
518 {
519 sb.append(SystemUtils.LINE_SEPARATOR).append(spacing).append(
520 SPACING_4_CHARS).append(SPACING_4_CHARS).append(
521 SPACING_4_CHARS);
522 sb.append(source);
523 }
524 }
525 }
526 sb.append(SystemUtils.LINE_SEPARATOR).append(spacing).append(
527 CLOSE_CURLY);
528 return sb.toString();
529 }
530
531 @Override
532 public final String toString()
533 {
534 return string(this, "driver=" + this.driverSource == null ? "null"
535 : this.driverSource.toIdentityString() + ", driven="
536 + this.drivenSources == null ? "null"
537 : toIdentityString(this.drivenSources));
538 }
539
540 }