1 /*
2 Copyright 2007 Ramon Servadei
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16 package fulmine.event.listener;
17
18 import static fulmine.util.Utils.CLOSE_CURLY;
19 import static fulmine.util.Utils.CLOSE_SQUARE;
20 import static fulmine.util.Utils.COMMA_SPACE;
21 import static fulmine.util.Utils.OPEN_CURLY;
22 import static fulmine.util.Utils.OPEN_SQUARE;
23 import static fulmine.util.Utils.SPACING_4_CHARS;
24 import static fulmine.util.Utils.logException;
25 import static fulmine.util.Utils.nullCheck;
26 import static fulmine.util.Utils.string;
27
28 import java.util.List;
29 import java.util.Map;
30
31 import org.apache.commons.lang.SystemUtils;
32
33 import fulmine.IDescriptor;
34 import fulmine.ILifeCycle;
35 import fulmine.context.FulmineContext;
36 import fulmine.event.AbstractEvent;
37 import fulmine.event.EventProcessor;
38 import fulmine.event.EventSource;
39 import fulmine.event.IEvent;
40 import fulmine.event.IEventFrame;
41 import fulmine.event.IEventFrameExecution;
42 import fulmine.event.IEventManager;
43 import fulmine.event.IEventSource;
44 import fulmine.util.collection.CollectionFactory;
45 import fulmine.util.log.AsyncLog;
46
47 /**
48 * <b>This class must be started by calling {@link #start()}.</b>
49 * <p>
50 * Processes the events from multiple sources as a single unit of work. The
51 * sources must have a unidirectional relationship; there is 1 driver source and
52 * 1 or more driven sources. The driver source generates an event that is
53 * consumed by the driven sources that then produce events in response. The
54 * driver event and driven events will occur in separate frames; the 'asymmetry'
55 * is in the events being generated in these separate frames (see
56 * {@link IEventFrame} for a description of frames). The diagram below helps to
57 * illustrate this.
58 *
 * <pre>
 *                  Source A (generates event A1)
 *                        |
 *                        |
 *                  ______|_______
 *                 |              |
 *                 |              |
 *                 |              v
 *                 |          Source B (consumes A1 and generates event B1)
 *                 |              |
 *                 |              |
 *                  ---> this <---
 *    (consumes A1 and B1 only when they are both available)
 * </pre>
73 *
74 * This class handles the co-ordination such that event B1 from source B is
75 * processed with event A1 from source A even though event A1 and B1 occur in
76 * separate frames.
77 * <p>
78 * Source A is the 'driver source', source B is the 'driven source'. There can
79 * be multiple driven sources, but only 1 driver source. This driving
80 * relationship, coupled with:
81 * <ul>
82 * <li>the guaranteed event ordering of the {@link FulmineContext}
83 * <li>the guaranteed association of an {@link EventProcessor} to an
84 * {@link IEventSource}
85 * <li>the guarantee that {@link IPriorityEventListener} instances are processed
86 * before other listeners
87 * </ul>
88 * ensures that A1 is received before B1. This listener uses these mechanisms to
89 * cache the 'driver' events in preparation for receiving the 'driven' events.
90 * The events are associated with each other using the driver event's
91 * {@link IEvent#getFrame()} and the driven event's
92 * {@link IEvent#getDrivingFrame()}.
93 * <p>
94 * {@link AsymmetricEventProcessor} instances can be chained together to form
95 * arbitrarily complex event propagation listener matrices. This class was
96 * principally designed to handle the so-called "diamond" observer construct:
97 *
 * <pre>
 *                     Node A
 *                        |
 *                  ______|_______
 *                 |              |
 *                 |              |
 *                 v              v
 *              Node B         Node C
 *                 |              |
 *                 |              |
 *                  ---> this <---
 * </pre>
110 *
111 * @author Ramon Servadei
112 */
113 public abstract class AsymmetricEventProcessor extends EventSource implements
114 IEventSource, ILifeCycle
115 {
116 private final static AsyncLog LOG =
117 new AsyncLog(AsymmetricEventProcessor.class);
118
119 /**
120 * The maximum number of driver events that are awaiting driven events
121 * before a message is logged.
122 */
123 private static final int DRIVER_THRESHOLD = 3;
124
125 /** The driver source */
126 private final IEventSource driverSource;
127
128 /** The driven sources */
129 private final IEventSource[] drivenSources;
130
131 /**
132 * The driver events that are stored against their {@link IEvent#getFrame()}
133 * in preparation for a matching driven events (matched using the driven
134 * event's {@link IEvent#getDrivingFrame()}).
135 */
136 protected final Map<IEventFrameExecution, IEvent> driverEvents =
137 CollectionFactory.newMap();
138
139 /**
140 * The driven events stored per driving frame.
141 */
142 protected final Map<IEventFrameExecution, List<IEvent>> drivenEvents =
143 CollectionFactory.newMap();
144
145 /** The context this listener will use */
146 private final IEventManager context;
147
148 /**
149 * The event filter for driver events, defines the event types from the
150 * driver source that are processed.
151 */
152 private final Class<? extends IEvent>[] driverEventTypeFilter;
153
154 /**
155 * The event filter for driven events, defines the event types from the
156 * driven sources that are processed.
157 */
158 private final Class<? extends IEvent>[] drivenEventTypeFilter;
159
160 /** The listener for the events from the driver source */
161 final IEventListener driverEventListener;
162
163 /** The listener for the events from the driven sources */
164 final IEventListener drivenEventListener;
165
166 /**
167 * Handles the events from the driver sources or driven sources. This class
168 * provides a parameterisable event type filter.
169 *
170 * @author Ramon Servadei
171 */
172 private class EventListener implements IPriorityEventListener
173 {
174 private final Class<? extends IEvent>[] eventTypeFilter;
175
176 public EventListener(Class<? extends IEvent>[] eventTypeFilter)
177 {
178 super();
179 this.eventTypeFilter = eventTypeFilter;
180 }
181
182 public void addedAsListenerFor(IEventSource source)
183 {
184 // noop
185 }
186
187 public Class<? extends IEvent>[] getEventTypeFilter()
188 {
189 return this.eventTypeFilter;
190 }
191
192 public void removedAsListenerFrom(IEventSource source)
193 {
194 // noop
195 }
196
197 public void update(IEvent event)
198 {
199 AsymmetricEventProcessor.this.update(event);
200 }
201 }
202
/**
 * Construct the processor for the asymmetric event driving relationship.
 * <p>
 * A listener is registered with the driver source and with each driven
 * source during construction. A failure to register with an individual
 * driven source is logged and does not prevent registration with the
 * remaining driven sources.
 *
 * @param context
 *            the context this processor should use; result events are
 *            queued on it
 * @param driverEventTypeFilter
 *            The event filter for driver events, defines the event types
 *            from the driver source that are processed
 * @param driverSource
 *            the driver {@link IEventSource}, must not be
 *            <code>null</code>
 * @param drivenEventTypeFilter
 *            The event filter for driven events, defines the event types
 *            from the driven sources that are processed
 * @param drivenSources
 *            the driven {@link IEventSource} objects that will generate
 *            events as a result of the driver source events. A
 *            <code>null</code> array is treated as an empty one.
 */
public AsymmetricEventProcessor(IEventManager context,
    Class<? extends IEvent>[] driverEventTypeFilter,
    IEventSource driverSource,
    Class<? extends IEvent>[] drivenEventTypeFilter,
    IEventSource... drivenSources)
{
    // The null check has to run while the super-constructor argument is
    // being computed; placed after super(...) it could never fire because
    // driverSource.toIdentityString() would already have failed.
    super(buildIdentity(driverSource, drivenSources));
    this.driverEventTypeFilter = driverEventTypeFilter;
    this.drivenEventTypeFilter = drivenEventTypeFilter;
    this.driverSource = driverSource;
    // normalise null so that every later use of the array (length checks,
    // iteration on destroy) is safe
    this.drivenSources =
        drivenSources == null ? new IEventSource[0] : drivenSources;
    this.context = context;
    this.driverEventListener =
        new EventListener(this.driverEventTypeFilter);
    this.driverSource.addListener(this.driverEventListener);
    this.drivenEventListener =
        new EventListener(this.drivenEventTypeFilter);
    for (IEventSource eventSource : this.drivenSources)
    {
        try
        {
            eventSource.addListener(this.drivenEventListener);
        }
        catch (Exception e)
        {
            // best effort: keep registering with the other driven sources
            logException(getLog(), eventSource, e);
        }
    }
}

/**
 * Validate the driver source and build the identity string passed to the
 * super-constructor.
 *
 * @param driverSource
 *            the driver source, checked for <code>null</code>
 * @param drivenSources
 *            the driven sources, may be <code>null</code>
 * @return the identity string for the processor
 */
private static String buildIdentity(IEventSource driverSource,
    IEventSource[] drivenSources)
{
    nullCheck(driverSource, "Cannot have a null driver source");
    return OPEN_CURLY + AsymmetricEventProcessor.class.getSimpleName()
        + " driver=" + driverSource.toIdentityString() + ", driven="
        + toIdentityString(drivenSources) + CLOSE_CURLY;
}
252
253 /**
254 * Get the identity string for each element of the array
255 *
256 * @param descriptors
257 * the array of {@link IDescriptor} elements
258 * @return a string with the identity string of each element
259 */
260 private static String toIdentityString(IDescriptor[] descriptors)
261 {
262 StringBuilder sb = new StringBuilder();
263 sb.append(OPEN_SQUARE);
264 if (descriptors != null && descriptors.length > 0)
265 {
266 if (descriptors[0] != null)
267 {
268 sb.append(descriptors[0].toIdentityString());
269 }
270 for (int i = 1; i < descriptors.length; i++)
271 {
272 IDescriptor descriptor = descriptors[i];
273 if (descriptor != null)
274 {
275 sb.append(COMMA_SPACE).append(descriptor.toIdentityString());
276 }
277 }
278 }
279 sb.append(CLOSE_SQUARE);
280 return sb.toString();
281 }
282
283 private synchronized final void update(IEvent event)
284 {
285 if (isActive())
286 {
287 // compare object reference
288 if (this.driverSource == event.getSource())
289 {
290 this.driverEvents.put(event.getFrame(), event);
291 // log if there are too many driver events (this can indicate
292 // slow driven event sources)
293 if (this.driverEvents.size() > AsymmetricEventProcessor.DRIVER_THRESHOLD)
294 {
295 if (getLog().isInfoEnabled())
296 {
297 getLog().info(
298 "Driver events still awaiting driven events: "
299 + this.driverEvents + " from " + getIdentity());
300 }
301 }
302 }
303 else
304 {
305 if (!this.drivenEvents.containsKey(event.getDrivingFrame()))
306 {
307 final List<IEvent> newList =
308 CollectionFactory.newList(this.drivenSources.length);
309 this.drivenEvents.put(event.getDrivingFrame(), newList);
310 }
311 final List<IEvent> receivedDrivenEvents =
312 this.drivenEvents.get(event.getDrivingFrame());
313 receivedDrivenEvents.add(event);
314 // are all driven events available for the driver?
315 if (receivedDrivenEvents.size() == this.drivenSources.length)
316 {
317 this.drivenEvents.remove(event.getDrivingFrame());
318 final IEvent driverEvent =
319 this.driverEvents.remove(event.getDrivingFrame());
320 if (driverEvent == null)
321 {
322 if (getLog().isInfoEnabled())
323 {
324 getLog().info("No driver event for " + event);
325 }
326 }
327 else
328 {
329 triggerUpdate(event, receivedDrivenEvents, driverEvent);
330 }
331 }
332 }
333 }
334 else
335 {
336 throw new IllegalStateException("Inactive listener " + this
337 + " received event " + event);
338 }
339 }
340
341 void triggerUpdate(IEvent event, final List<IEvent> receivedDrivenEvents,
342 final IEvent driverEvent)
343 {
344 final Result result = update(driverEvent, receivedDrivenEvents);
345 if (result != null)
346 {
347 result.setDrivingFrame(event.getDrivingFrame());
348 result.setFrame(event.getFrame());
349 result.setSource(this);
350 /*
351 * Raise an event with the same driving frame configuration. This
352 * allows chained processors to process events within the same
353 * driving frame.
354 */
355 this.context.queueEvent(result);
356 }
357 }
358
359 /**
360 * Process the driver and driven {@link IEvent} objects generated by the
361 * driver and driven {@link IEventSource} objects. These events are linked
362 * by the driver event's frame.
363 * <p>
364 * The order of the driven events in this method <b>is not</b> guaranteed to
365 * be the same as that of the listeners used in the constructor; the order
366 * depends on which driven event arrives first.
367 *
368 * @param driverEvent
369 * the driver {@link IEvent}
370 * @param drivenEvents
371 * the list of driven {@link IEvent} objects caused by the driver
372 * event. The order of the events in the list <b>is not</b>
373 * guaranteed to be the same as the listeners used in the
374 * constructor; the order depends on which driven event arrived
375 * first.
376 * @return a result ({@link IEvent} subclass) that represents the output of
377 * this method. The result event is notified to all registered
378 * {@link IEventListener} instances. The result does not need to set
379 * the source, frame or driving frame. Use <code>null</code> for no
380 * event to be raised.
381 * @see IEvent#getDrivingFrame()
382 */
383 protected abstract Result update(IEvent driverEvent,
384 List<IEvent> drivenEvents);
385
386 /**
387 * Get the driver source.
388 *
389 * @return the driver {@link IEventSource}
390 */
391 public final IEventSource getDriverSource()
392 {
393 return this.driverSource;
394 }
395
396 @Override
397 protected AsyncLog getLog()
398 {
399 return LOG;
400 }
401
402 @Override
403 protected void doStart()
404 {
405 // noop
406 }
407
408 /**
409 * Get the driven source.
410 *
411 * @return the driven {@link IEventSource}
412 */
413 public final IEventSource[] getDrivenSources()
414 {
415 return this.drivenSources;
416 }
417
418 /**
419 * Unregisters listeners from the context.
420 */
421 @Override
422 protected void doComponentDestroy()
423 {
424 this.driverSource.removeListener(this.driverEventListener);
425 for (IEventSource eventSource : drivenSources)
426 {
427 eventSource.removeListener(this.drivenEventListener);
428 }
429 this.driverEvents.clear();
430 }
431
432 /**
433 * The output from the processing of the driving and driven events handled
434 * by an {@link AsymmetricEventProcessor}.
435 * <p>
436 * The {@link #getFrame()} and {@link #getDrivingFrame()} will return the
437 * same frames as those of the event that caused the result to be generated
438 * from the {@link AsymmetricEventProcessor}.
439 *
440 * @author Ramon Servadei
441 *
442 */
443 public static class Result extends AbstractEvent
444 {
445
446 private final AsymmetricEventProcessor driver;
447
448 /**
449 * Construct with the driver of this result.
450 *
451 * @param driver
452 * the originating {@link AsymmetricEventProcessor} of this
453 * result
454 */
455 public Result(AsymmetricEventProcessor driver)
456 {
457 super();
458 this.driver = driver;
459 }
460
461 /**
462 * Get the driver of this result; the originating
463 * {@link AsymmetricEventProcessor}.
464 *
465 * @return the originating {@link AsymmetricEventProcessor} of this
466 * result
467 */
468 public AsymmetricEventProcessor getDriver()
469 {
470 return this.driver;
471 }
472 }
473
474 @Override
475 public String toDetailedString()
476 {
477 return toFormattedString(0);
478 }
479
480 private String toFormattedString(int level)
481 {
482 StringBuilder sb = new StringBuilder();
483 StringBuilder spacing = new StringBuilder();
484 for (int i = 0; i < level; i++)
485 {
486 spacing.append(SPACING_4_CHARS);
487 }
488 sb.append(SystemUtils.LINE_SEPARATOR).append(spacing).append(OPEN_CURLY).append(
489 getClass().getSimpleName()).append(SystemUtils.LINE_SEPARATOR);
490 sb.append(spacing).append(SPACING_4_CHARS).append("driver: ");
491 if (this.driverSource instanceof AsymmetricEventProcessor)
492 {
493 sb.append(((AsymmetricEventProcessor) this.driverSource).toFormattedString(level + 3));
494 }
495 else
496 {
497 sb.append(SystemUtils.LINE_SEPARATOR).append(spacing).append(
498 SPACING_4_CHARS).append(SPACING_4_CHARS).append(SPACING_4_CHARS);
499 sb.append(this.driverSource);
500 }
501 sb.append(SystemUtils.LINE_SEPARATOR);
502 if (this.drivenSources != null)
503 {
504 int count = 0;
505 for (IEventSource source : this.drivenSources)
506 {
507 count++;
508 if (count == 1)
509 {
510 sb.append(spacing).append(SPACING_4_CHARS).append(
511 "driven: ");
512 }
513 if (source instanceof AsymmetricEventProcessor)
514 {
515 sb.append(((AsymmetricEventProcessor) source).toFormattedString(level + 3));
516 }
517 else
518 {
519 sb.append(SystemUtils.LINE_SEPARATOR).append(spacing).append(
520 SPACING_4_CHARS).append(SPACING_4_CHARS).append(
521 SPACING_4_CHARS);
522 sb.append(source);
523 }
524 }
525 }
526 sb.append(SystemUtils.LINE_SEPARATOR).append(spacing).append(
527 CLOSE_CURLY);
528 return sb.toString();
529 }
530
531 @Override
532 public final String toString()
533 {
534 return string(this, "driver=" + this.driverSource == null ? "null"
535 : this.driverSource.toIdentityString() + ", driven="
536 + this.drivenSources == null ? "null"
537 : toIdentityString(this.drivenSources));
538 }
539
540 }