View Javadoc

1   /*
2      Copyright 2007 Ramon Servadei
3   
4      Licensed under the Apache License, Version 2.0 (the "License");
5      you may not use this file except in compliance with the License.
6      You may obtain a copy of the License at
7   
8          http://www.apache.org/licenses/LICENSE-2.0
9   
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15   */
16  package fulmine.event;
17  
18  import static fulmine.util.Utils.SPACING_4_CHARS;
19  import static fulmine.util.Utils.logException;
20  import static fulmine.util.Utils.safeToString;
21  import static fulmine.util.Utils.string;
22  
23  import java.util.Arrays;
24  import java.util.List;
25  
26  import org.apache.commons.lang.SystemUtils;
27  
28  import fulmine.AbstractLifeCycle;
29  import fulmine.ILifeCycle;
30  import fulmine.context.FulmineContext;
31  import fulmine.context.IFulmineContext;
32  import fulmine.event.listener.IEventListener;
33  import fulmine.event.system.ISystemEvent;
34  import fulmine.event.system.ISystemEventListener;
35  import fulmine.event.system.UnsubscribeEvent;
36  import fulmine.util.concurrent.ITaskExecutor;
37  import fulmine.util.concurrent.ITaskHandler;
38  import fulmine.util.concurrent.Task;
39  import fulmine.util.concurrent.TaskExecutor;
40  import fulmine.util.log.AsyncLog;
41  
42  /**
43   * A single-thread execution engine that takes {@link IEvent} objects off a task
44   * queue, finds the {@link IEventListener} instances registered against the
45   * {@link IEventSource} that generated the event, determines if the event is
46   * compatible with the events the listener can accept (see
47   * {@link IEventListener#getEventTypeFilter()}) and calls
48   * {@link IEventListener#update(IEvent)} if the event is compatible.
49   * <p>
50   * On shutdown via the {@link #destroy()} method, a special event (
51   * {@link #shutdownEvent}) is added to the processor's {@link #events}. The
52   * processor continues to process its event queue after the destroy method but
53   * when the processor encounters the shutdown event, it shuts down and the
54   * processor thread ceases execution. This technique allows the event queue to
55   * be drained before the processor terminates. There may be some situations
56   * where events that are generated during the context shutdown need to be
57   * processed (e.g. {@link UnsubscribeEvent}s may need to be handled by closing
58   * external resources gracefully) and this delayed shutdown allows this to be
59   * achieved.
60   * 
61   * @author Ramon Servadei
62   */
63  public final class EventProcessor extends AbstractLifeCycle implements
64      ILifeCycle, ITaskHandler<IEvent>
65  {
66      private final static AsyncLog LOG = new AsyncLog(EventProcessor.class);
67  
68      /** The active component that performs the work */
69      private final ITaskExecutor executor;
70  
71      /** The context this processor is associated with */
72      private final IFulmineContext context;
73  
74      /**
75       * Provides thread-bound access to the frame identifier of the event the
76       * processor is handling.
77       */
78      private final static ThreadLocal<IEventFrameExecution> currentFrame =
79          new ThreadLocal<IEventFrameExecution>();
80  
81      /**
82       * A marker event to destroy an {@link EventProcessor}
83       * 
84       * @author Ramon Servadei
85       */
86      private final static class DestroyProcessorEvent extends AbstractEvent
87          implements ISystemEvent
88      {
89          public DestroyProcessorEvent()
90          {
91              super();
92              setSource(new EventSource("DestroyProcessorEventSource"));
93          }
94  
95      }
96  
97      /** The event that signals an {@link EventProcessor} to shutdown */
98      private final static DestroyProcessorEvent shutdownEvent =
99          new DestroyProcessorEvent();
100 
101     /** The string description */
102     private final String toString;
103 
104     /**
105      * Get the frame identifier of the event the processor is currently
106      * handling. This is thread-bound.
107      * 
108      * @return the {@link IEventFrameExecution} of the event that is being
109      *         processed.
110      */
111     public static final IEventFrameExecution getCurrentFrame()
112     {
113         return currentFrame.get();
114     }
115 
116     /**
117      * Standard constructor for an event processor backed by a standard priority
118      * thread. The thread is held in the {@link FulmineContext} normal priority
119      * thread group.
120      * 
121      * @param name
122      *            the name of the thread for the processor
123      * @param context
124      *            the context this processor is associated with
125      */
126     public EventProcessor(String name, IFulmineContext context)
127     {
128         this(name, context.getEventProcessorThreadGroup(), context);
129     }
130 
131     /**
132      * Constructor for an event processor backed by a thread of priority equal
133      * to the maximum priority of the thread group argument
134      * 
135      * @param name
136      *            the name of the thread for the processor
137      * @param threadGroup
138      *            the thread group for the thread, this also defines the
139      *            priority of the thread
140      * @param context
141      *            the context this processor is associated with
142      */
143     public EventProcessor(String name, ThreadGroup threadGroup,
144         IFulmineContext context)
145     {
146         super();
147         this.context = context;
148         this.executor = new TaskExecutor(threadGroup, name, context);
149         this.toString = string(this, name);
150     }
151 
152     public void handleTask(IEvent task)
153     {
154         processEvent(task);
155     }
156 
157     @Override
158     protected AsyncLog getLog()
159     {
160         return LOG;
161     }
162 
163     /**
164      * Destroy the processor.
165      */
166     @Override
167     protected void doDestroy()
168     {
169         queue(shutdownEvent);
170     }
171 
172     @Override
173     protected void doStart()
174     {
175         this.executor.start();
176     }
177 
178     /**
179      * Process the event by notifying all {@link IEventListener} instances
180      * registered against the {@link IEventSource} of the event.
181      * 
182      * @param event
183      *            the event
184      */
185     private void processEvent(IEvent event)
186     {
187         try
188         {
189             if (event == null)
190             {
191                 return;
192             }
193             if (event == EventProcessor.shutdownEvent)
194             {
195                 this.executor.destroy();
196             }
197             currentFrame.set(event.getFrame());
198             List<IEventListener> listeners = event.getSource().getListeners();
199             if (listeners == null || listeners.size() == 0)
200             {
201                 logNoListeners(event);
202             }
203             else
204             {
205                 for (IEventListener eventListener : listeners)
206                 {
207                     if (eventListener == null)
208                     {
209                         continue;
210                     }
211                     try
212                     {
213                         if (event instanceof ISystemEvent)
214                         {
215                             // only system event listeners can consume these
216                             if (eventListener instanceof ISystemEventListener)
217                             {
218                                 filterEvent(event, eventListener);
219                             }
220                             else
221                             {
222                                 if (getLog().isTraceEnabled())
223                                 {
224                                     getLog().trace(
225                                         "System event " + safeToString(event)
226                                             + " not being handled by "
227                                             + safeToString(eventListener));
228                                 }
229                             }
230                         }
231                         else
232                         {
233                             filterEvent(event, eventListener);
234                         }
235                     }
236                     catch (Exception e)
237                     {
238                         logException(getLog(), safeToString(eventListener)
239                             + " handling " + safeToString(event), e);
240                     }
241                 }
242             }
243             // now queue any trigger event
244             if (event.getTriggerEvent() != null)
245             {
246                 this.context.queueEvent(event.getTriggerEvent());
247             }
248         }
249         catch (Exception e)
250         {
251             logException(getLog(), safeToString(event), e);
252         }
253         finally
254         {
255             currentFrame.remove();
256         }
257     }
258 
259     /**
260      * Helper method to log the fact that there are no listeners for the event
261      * 
262      * @param event
263      *            the event for which there are no listeners
264      */
265     private void logNoListeners(IEvent event)
266     {
267         if (getLog().isDebugEnabled())
268         {
269             getLog().debug("No listeners for " + safeToString(event));
270         }
271     }
272 
273     /**
274      * Check if the event listener can handle the event, if it can then let the
275      * listener handle it.
276      * 
277      * @param event
278      *            the event, never <code>null</code>
279      * @param eventListener
280      *            the event listener
281      */
282     private void filterEvent(IEvent event, IEventListener eventListener)
283     {
284         final Class<? extends IEvent>[] eventFilter =
285             eventListener.getEventTypeFilter();
286         if (getLog().isTraceEnabled())
287         {
288             getLog().trace(
289                 "Filtering event=" + safeToString(event) + " for listener="
290                     + safeToString(eventListener) + " with filter="
291                     + Arrays.deepToString(eventFilter));
292         }
293         if (eventFilter == null)
294         {
295             if (getLog().isInfoEnabled())
296             {
297                 getLog().info(
298                     "No event filter from " + safeToString(eventListener)
299                         + ", this listener will not receive "
300                         + event.toIdentityString());
301             }
302             return;
303         }
304         for (int i = 0; i < eventFilter.length; i++)
305         {
306             Class<? extends IEvent> eventType = eventFilter[i];
307             if (eventType.isInstance(event))
308             {
309                 eventListener.update(event);
310                 return;
311             }
312         }
313     }
314 
315     /**
316      * Add the event onto the processor's queue
317      * 
318      * @param event
319      *            the event to add the queue
320      */
321     public void queue(final IEvent event)
322     {
323         this.executor.execute(new Task<IEvent>(this, event));
324     }
325 
326     /**
327      * Get the current stack trace for the {@link EventProcessor}
328      * 
329      * @return a formatted {@link String} of the current stack trace with each
330      *         stack call on a new line
331      */
332     public String getStackTrace()
333     {
334         final StackTraceElement[] stackTrace = this.executor.getStackTrace();
335         StringBuilder sb = new StringBuilder();
336         for (int i = 0; i < stackTrace.length; i++)
337         {
338             sb.append(SystemUtils.LINE_SEPARATOR);
339             sb.append(SPACING_4_CHARS).append(stackTrace[i]);
340         }
341         return sb.toString();
342     }
343 
344     @Override
345     public String toString()
346     {
347         return toString;
348     }
349 }