View Javadoc

1   /*
2      Copyright 2007 Ramon Servadei
3   
4      Licensed under the Apache License, Version 2.0 (the "License");
5      you may not use this file except in compliance with the License.
6      You may obtain a copy of the License at
7   
8          http://www.apache.org/licenses/LICENSE-2.0
9   
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15   */
16  package fulmine.event.listener;
17  
18  import static fulmine.util.Utils.CLOSE_CURLY;
19  import static fulmine.util.Utils.CLOSE_SQUARE;
20  import static fulmine.util.Utils.COMMA_SPACE;
21  import static fulmine.util.Utils.OPEN_CURLY;
22  import static fulmine.util.Utils.OPEN_SQUARE;
23  import static fulmine.util.Utils.SPACING_4_CHARS;
24  import static fulmine.util.Utils.logException;
25  import static fulmine.util.Utils.nullCheck;
26  import static fulmine.util.Utils.string;
27  
28  import java.util.List;
29  import java.util.Map;
30  
31  import org.apache.commons.lang.SystemUtils;
32  
33  import fulmine.IDescriptor;
34  import fulmine.ILifeCycle;
35  import fulmine.context.FulmineContext;
36  import fulmine.event.AbstractEvent;
37  import fulmine.event.EventProcessor;
38  import fulmine.event.EventSource;
39  import fulmine.event.IEvent;
40  import fulmine.event.IEventFrame;
41  import fulmine.event.IEventFrameExecution;
42  import fulmine.event.IEventManager;
43  import fulmine.event.IEventSource;
44  import fulmine.util.collection.CollectionFactory;
45  import fulmine.util.log.AsyncLog;
46  
47  /**
48   * <b>This class must be started by calling {@link #start()}.</b>
49   * <p>
50   * Processes the events from multiple sources as a single unit of work. The
51   * sources must have a unidirectional relationship; there is 1 driver source and
52   * 1 or more driven sources. The driver source generates an event that is
53   * consumed by the driven sources that then produce events in response. The
54   * driver event and driven events will occur in separate frames; the 'asymmetry'
55   * is in the events being generated in these separate frames (see
56   * {@link IEventFrame} for a description of frames). The diagram below helps to
57   * illustrate this.
58   * 
59   * <pre>
60   *       Source A (generates event A1)
61   *          |           
62   *          |
63   *    ______|_______
64   *    |            |
65   *    |            |
66   *    |            v    
67   *    |         Source B (consumes A1 and generates event B1)
68   *    |            |    
69   *    |            |
70   *    ---&gt; this &lt;---
71   *    (consumes A1 and B1 only when they are both available)
72   * </pre>
73   * 
74   * This class handles the co-ordination such that event B1 from source B is
75   * processed with event A1 from source A even though event A1 and B1 occur in
76   * separate frames.
77   * <p>
78   * Source A is the 'driver source', source B is the 'driven source'. There can
79   * be multiple driven sources, but only 1 driver source. This driving
80   * relationship, coupled with:
81   * <ul>
82   * <li>the guaranteed event ordering of the {@link FulmineContext}
83   * <li>the guaranteed association of an {@link EventProcessor} to an
84   * {@link IEventSource}
85   * <li>the guarantee that {@link IPriorityEventListener} instances are processed
86   * before other listeners
87   * </ul>
88   * ensures that A1 is received before B1. This listener uses these mechanisms to
89   * cache the 'driver' events in preparation for receiving the 'driven' events.
90   * The events are associated with each other using the driver event's
91   * {@link IEvent#getFrame()} and the driven event's
92   * {@link IEvent#getDrivingFrame()}.
93   * <p>
94   * {@link AsymmetricEventProcessor} instances can be chained together to form
95   * arbitrarily complex event propagation listener matrices. This class was
96   * principally designed to handle the so-called "diamond" observer construct:
97   * 
98   * <pre>
99   *        Node A
100  *          |           
101  *    ______|_______
102  *    |            |
103  *    |            |
104  *    v            v    
105  *  Node B       Node C
106  *    |            |
107  *    |            |
108  *    ---&gt; this &lt;---
109  * </pre>
110  * 
111  * @author Ramon Servadei
112  */
113 public abstract class AsymmetricEventProcessor extends EventSource implements
114     IEventSource, ILifeCycle
115 {
116     private final static AsyncLog LOG =
117         new AsyncLog(AsymmetricEventProcessor.class);
118 
119     /**
120      * The maximum number of driver events that are awaiting driven events
121      * before a message is logged.
122      */
123     private static final int DRIVER_THRESHOLD = 3;
124 
125     /** The driver source */
126     private final IEventSource driverSource;
127 
128     /** The driven sources */
129     private final IEventSource[] drivenSources;
130 
131     /**
132      * The driver events that are stored against their {@link IEvent#getFrame()}
133      * in preparation for a matching driven events (matched using the driven
134      * event's {@link IEvent#getDrivingFrame()}).
135      */
136     protected final Map<IEventFrameExecution, IEvent> driverEvents =
137         CollectionFactory.newMap();
138 
139     /**
140      * The driven events stored per driving frame.
141      */
142     protected final Map<IEventFrameExecution, List<IEvent>> drivenEvents =
143         CollectionFactory.newMap();
144 
145     /** The context this listener will use */
146     private final IEventManager context;
147 
148     /**
149      * The event filter for driver events, defines the event types from the
150      * driver source that are processed.
151      */
152     private final Class<? extends IEvent>[] driverEventTypeFilter;
153 
154     /**
155      * The event filter for driven events, defines the event types from the
156      * driven sources that are processed.
157      */
158     private final Class<? extends IEvent>[] drivenEventTypeFilter;
159 
160     /** The listener for the events from the driver source */
161     final IEventListener driverEventListener;
162 
163     /** The listener for the events from the driven sources */
164     final IEventListener drivenEventListener;
165 
166     /**
167      * Handles the events from the driver sources or driven sources. This class
168      * provides a parameterisable event type filter.
169      * 
170      * @author Ramon Servadei
171      */
172     private class EventListener implements IPriorityEventListener
173     {
174         private final Class<? extends IEvent>[] eventTypeFilter;
175 
176         public EventListener(Class<? extends IEvent>[] eventTypeFilter)
177         {
178             super();
179             this.eventTypeFilter = eventTypeFilter;
180         }
181 
182         public void addedAsListenerFor(IEventSource source)
183         {
184             // noop
185         }
186 
187         public Class<? extends IEvent>[] getEventTypeFilter()
188         {
189             return this.eventTypeFilter;
190         }
191 
192         public void removedAsListenerFrom(IEventSource source)
193         {
194             // noop
195         }
196 
197         public void update(IEvent event)
198         {
199             AsymmetricEventProcessor.this.update(event);
200         }
201     }
202 
203     /**
204      * Construct the processor for the asymmetric event driving relationship.
205      * 
206      * @param context
207      *            the context this processor should use
208      * @param driverEventTypeFilter
209      *            The event filter for driver events, defines the event types
210      *            from the driver source that are processed
211      * @param driverSource
212      *            the driver {@link IEventSource}
213      * @param drivenEventTypeFilter
214      *            The event filter for driven events, defines the event types
215      *            from the driven sources that are processed
216      * @param drivenSources
217      *            the driven {@link IEventSource} objects that will generate
218      *            events as a result of the driver source events.
219      */
220     public AsymmetricEventProcessor(IEventManager context,
221         Class<? extends IEvent>[] driverEventTypeFilter,
222         IEventSource driverSource,
223         Class<? extends IEvent>[] drivenEventTypeFilter,
224         IEventSource... drivenSources)
225     {
226         super(OPEN_CURLY + AsymmetricEventProcessor.class.getSimpleName()
227             + " driver=" + driverSource.toIdentityString() + ", driven="
228             + toIdentityString(drivenSources) + CLOSE_CURLY);
229         nullCheck(driverSource, "Cannot have a null driver source");
230         this.driverEventTypeFilter = driverEventTypeFilter;
231         this.drivenEventTypeFilter = drivenEventTypeFilter;
232         this.driverSource = driverSource;
233         this.drivenSources = drivenSources;
234         this.context = context;
235         this.driverEventListener =
236             new EventListener(this.driverEventTypeFilter);
237         this.driverSource.addListener(this.driverEventListener);
238         this.drivenEventListener =
239             new EventListener(this.drivenEventTypeFilter);
240         for (IEventSource eventSource : this.drivenSources)
241         {
242             try
243             {
244                 eventSource.addListener(this.drivenEventListener);
245             }
246             catch (Exception e)
247             {
248                 logException(getLog(), eventSource, e);
249             }
250         }
251     }
252 
253     /**
254      * Get the identity string for each element of the array
255      * 
256      * @param descriptors
257      *            the array of {@link IDescriptor} elements
258      * @return a string with the identity string of each element
259      */
260     private static String toIdentityString(IDescriptor[] descriptors)
261     {
262         StringBuilder sb = new StringBuilder();
263         sb.append(OPEN_SQUARE);
264         if (descriptors != null && descriptors.length > 0)
265         {
266             if (descriptors[0] != null)
267             {
268                 sb.append(descriptors[0].toIdentityString());
269             }
270             for (int i = 1; i < descriptors.length; i++)
271             {
272                 IDescriptor descriptor = descriptors[i];
273                 if (descriptor != null)
274                 {
275                     sb.append(COMMA_SPACE).append(descriptor.toIdentityString());
276                 }
277             }
278         }
279         sb.append(CLOSE_SQUARE);
280         return sb.toString();
281     }
282 
283     private synchronized final void update(IEvent event)
284     {
285         if (isActive())
286         {
287             // compare object reference
288             if (this.driverSource == event.getSource())
289             {
290                 this.driverEvents.put(event.getFrame(), event);
291                 // log if there are too many driver events (this can indicate
292                 // slow driven event sources)
293                 if (this.driverEvents.size() > AsymmetricEventProcessor.DRIVER_THRESHOLD)
294                 {
295                     if (getLog().isInfoEnabled())
296                     {
297                         getLog().info(
298                             "Driver events still awaiting driven events: "
299                                 + this.driverEvents + " from " + getIdentity());
300                     }
301                 }
302             }
303             else
304             {
305                 if (!this.drivenEvents.containsKey(event.getDrivingFrame()))
306                 {
307                     final List<IEvent> newList =
308                         CollectionFactory.newList(this.drivenSources.length);
309                     this.drivenEvents.put(event.getDrivingFrame(), newList);
310                 }
311                 final List<IEvent> receivedDrivenEvents =
312                     this.drivenEvents.get(event.getDrivingFrame());
313                 receivedDrivenEvents.add(event);
314                 // are all driven events available for the driver?
315                 if (receivedDrivenEvents.size() == this.drivenSources.length)
316                 {
317                     this.drivenEvents.remove(event.getDrivingFrame());
318                     final IEvent driverEvent =
319                         this.driverEvents.remove(event.getDrivingFrame());
320                     if (driverEvent == null)
321                     {
322                         if (getLog().isInfoEnabled())
323                         {
324                             getLog().info("No driver event for " + event);
325                         }
326                     }
327                     else
328                     {
329                         triggerUpdate(event, receivedDrivenEvents, driverEvent);
330                     }
331                 }
332             }
333         }
334         else
335         {
336             throw new IllegalStateException("Inactive listener " + this
337                 + " received event " + event);
338         }
339     }
340 
341     void triggerUpdate(IEvent event, final List<IEvent> receivedDrivenEvents,
342         final IEvent driverEvent)
343     {
344         final Result result = update(driverEvent, receivedDrivenEvents);
345         if (result != null)
346         {
347             result.setDrivingFrame(event.getDrivingFrame());
348             result.setFrame(event.getFrame());
349             result.setSource(this);
350             /*
351              * Raise an event with the same driving frame configuration. This
352              * allows chained processors to process events within the same
353              * driving frame.
354              */
355             this.context.queueEvent(result);
356         }
357     }
358 
359     /**
360      * Process the driver and driven {@link IEvent} objects generated by the
361      * driver and driven {@link IEventSource} objects. These events are linked
362      * by the driver event's frame.
363      * <p>
364      * The order of the driven events in this method <b>is not</b> guaranteed to
365      * be the same as that of the listeners used in the constructor; the order
366      * depends on which driven event arrives first.
367      * 
368      * @param driverEvent
369      *            the driver {@link IEvent}
370      * @param drivenEvents
371      *            the list of driven {@link IEvent} objects caused by the driver
372      *            event. The order of the events in the list <b>is not</b>
373      *            guaranteed to be the same as the listeners used in the
374      *            constructor; the order depends on which driven event arrived
375      *            first.
376      * @return a result ({@link IEvent} subclass) that represents the output of
377      *         this method. The result event is notified to all registered
378      *         {@link IEventListener} instances. The result does not need to set
379      *         the source, frame or driving frame. Use <code>null</code> for no
380      *         event to be raised.
381      * @see IEvent#getDrivingFrame()
382      */
383     protected abstract Result update(IEvent driverEvent,
384         List<IEvent> drivenEvents);
385 
386     /**
387      * Get the driver source.
388      * 
389      * @return the driver {@link IEventSource}
390      */
391     public final IEventSource getDriverSource()
392     {
393         return this.driverSource;
394     }
395 
396     @Override
397     protected AsyncLog getLog()
398     {
399         return LOG;
400     }
401 
402     @Override
403     protected void doStart()
404     {
405         // noop
406     }
407 
408     /**
409      * Get the driven source.
410      * 
411      * @return the driven {@link IEventSource}
412      */
413     public final IEventSource[] getDrivenSources()
414     {
415         return this.drivenSources;
416     }
417 
418     /**
419      * Unregisters listeners from the context.
420      */
421     @Override
422     protected void doComponentDestroy()
423     {
424         this.driverSource.removeListener(this.driverEventListener);
425         for (IEventSource eventSource : drivenSources)
426         {
427             eventSource.removeListener(this.drivenEventListener);
428         }
429         this.driverEvents.clear();
430     }
431 
432     /**
433      * The output from the processing of the driving and driven events handled
434      * by an {@link AsymmetricEventProcessor}.
435      * <p>
436      * The {@link #getFrame()} and {@link #getDrivingFrame()} will return the
437      * same frames as those of the event that caused the result to be generated
438      * from the {@link AsymmetricEventProcessor}.
439      * 
440      * @author Ramon Servadei
441      * 
442      */
443     public static class Result extends AbstractEvent
444     {
445 
446         private final AsymmetricEventProcessor driver;
447 
448         /**
449          * Construct with the driver of this result.
450          * 
451          * @param driver
452          *            the originating {@link AsymmetricEventProcessor} of this
453          *            result
454          */
455         public Result(AsymmetricEventProcessor driver)
456         {
457             super();
458             this.driver = driver;
459         }
460 
461         /**
462          * Get the driver of this result; the originating
463          * {@link AsymmetricEventProcessor}.
464          * 
465          * @return the originating {@link AsymmetricEventProcessor} of this
466          *         result
467          */
468         public AsymmetricEventProcessor getDriver()
469         {
470             return this.driver;
471         }
472     }
473 
474     @Override
475     public String toDetailedString()
476     {
477         return toFormattedString(0);
478     }
479 
480     private String toFormattedString(int level)
481     {
482         StringBuilder sb = new StringBuilder();
483         StringBuilder spacing = new StringBuilder();
484         for (int i = 0; i < level; i++)
485         {
486             spacing.append(SPACING_4_CHARS);
487         }
488         sb.append(SystemUtils.LINE_SEPARATOR).append(spacing).append(OPEN_CURLY).append(
489             getClass().getSimpleName()).append(SystemUtils.LINE_SEPARATOR);
490         sb.append(spacing).append(SPACING_4_CHARS).append("driver: ");
491         if (this.driverSource instanceof AsymmetricEventProcessor)
492         {
493             sb.append(((AsymmetricEventProcessor) this.driverSource).toFormattedString(level + 3));
494         }
495         else
496         {
497             sb.append(SystemUtils.LINE_SEPARATOR).append(spacing).append(
498                 SPACING_4_CHARS).append(SPACING_4_CHARS).append(SPACING_4_CHARS);
499             sb.append(this.driverSource);
500         }
501         sb.append(SystemUtils.LINE_SEPARATOR);
502         if (this.drivenSources != null)
503         {
504             int count = 0;
505             for (IEventSource source : this.drivenSources)
506             {
507                 count++;
508                 if (count == 1)
509                 {
510                     sb.append(spacing).append(SPACING_4_CHARS).append(
511                         "driven: ");
512                 }
513                 if (source instanceof AsymmetricEventProcessor)
514                 {
515                     sb.append(((AsymmetricEventProcessor) source).toFormattedString(level + 3));
516                 }
517                 else
518                 {
519                     sb.append(SystemUtils.LINE_SEPARATOR).append(spacing).append(
520                         SPACING_4_CHARS).append(SPACING_4_CHARS).append(
521                         SPACING_4_CHARS);
522                     sb.append(source);
523                 }
524             }
525         }
526         sb.append(SystemUtils.LINE_SEPARATOR).append(spacing).append(
527             CLOSE_CURLY);
528         return sb.toString();
529     }
530 
531     @Override
532     public final String toString()
533     {
534         return string(this, "driver=" + this.driverSource == null ? "null"
535             : this.driverSource.toIdentityString() + ", driven="
536                 + this.drivenSources == null ? "null"
537                 : toIdentityString(this.drivenSources));
538     }
539 
540 }