1 /*
2 Copyright 2007 Ramon Servadei
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16 package fulmine.event;
17
18 import static fulmine.util.Utils.SPACING_4_CHARS;
19 import static fulmine.util.Utils.logException;
20 import static fulmine.util.Utils.safeToString;
21 import static fulmine.util.Utils.string;
22
23 import java.util.Arrays;
24 import java.util.List;
25
26 import org.apache.commons.lang.SystemUtils;
27
28 import fulmine.AbstractLifeCycle;
29 import fulmine.ILifeCycle;
30 import fulmine.context.FulmineContext;
31 import fulmine.context.IFulmineContext;
32 import fulmine.event.listener.IEventListener;
33 import fulmine.event.system.ISystemEvent;
34 import fulmine.event.system.ISystemEventListener;
35 import fulmine.event.system.UnsubscribeEvent;
36 import fulmine.util.concurrent.ITaskExecutor;
37 import fulmine.util.concurrent.ITaskHandler;
38 import fulmine.util.concurrent.Task;
39 import fulmine.util.concurrent.TaskExecutor;
40 import fulmine.util.log.AsyncLog;
41
42 /**
43 * A single-thread execution engine that takes {@link IEvent} objects off a task
44 * queue, finds the {@link IEventListener} instances registered against the
45 * {@link IEventSource} that generated the event, determines if the event is
46 * compatible with the events the listener can accept (see
47 * {@link IEventListener#getEventTypeFilter()}) and calls
48 * {@link IEventListener#update(IEvent)} if the event is compatible.
49 * <p>
50 * On shutdown via the {@link #destroy()} method, a special event (
51 * {@link #shutdownEvent}) is added to the processor's {@link #events}. The
52 * processor continues to process its event queue after the destroy method but
53 * when the processor encounters the shutdown event, it shuts down and the
54 * processor thread ceases execution. This technique allows the event queue to
55 * be drained before the processor terminates. There may be some situations
56 * where events that are generated during the context shutdown need to be
57 * processed (e.g. {@link UnsubscribeEvent}s may need to be handled by closing
58 * external resources gracefully) and this delayed shutdown allows this to be
59 * achieved.
60 *
61 * @author Ramon Servadei
62 */
63 public final class EventProcessor extends AbstractLifeCycle implements
64 ILifeCycle, ITaskHandler<IEvent>
65 {
66 private final static AsyncLog LOG = new AsyncLog(EventProcessor.class);
67
68 /** The active component that performs the work */
69 private final ITaskExecutor executor;
70
71 /** The context this processor is associated with */
72 private final IFulmineContext context;
73
74 /**
75 * Provides thread-bound access to the frame identifier of the event the
76 * processor is handling.
77 */
78 private final static ThreadLocal<IEventFrameExecution> currentFrame =
79 new ThreadLocal<IEventFrameExecution>();
80
81 /**
82 * A marker event to destroy an {@link EventProcessor}
83 *
84 * @author Ramon Servadei
85 */
86 private final static class DestroyProcessorEvent extends AbstractEvent
87 implements ISystemEvent
88 {
89 public DestroyProcessorEvent()
90 {
91 super();
92 setSource(new EventSource("DestroyProcessorEventSource"));
93 }
94
95 }
96
97 /** The event that signals an {@link EventProcessor} to shutdown */
98 private final static DestroyProcessorEvent shutdownEvent =
99 new DestroyProcessorEvent();
100
101 /** The string description */
102 private final String toString;
103
104 /**
105 * Get the frame identifier of the event the processor is currently
106 * handling. This is thread-bound.
107 *
108 * @return the {@link IEventFrameExecution} of the event that is being
109 * processed.
110 */
111 public static final IEventFrameExecution getCurrentFrame()
112 {
113 return currentFrame.get();
114 }
115
116 /**
117 * Standard constructor for an event processor backed by a standard priority
118 * thread. The thread is held in the {@link FulmineContext} normal priority
119 * thread group.
120 *
121 * @param name
122 * the name of the thread for the processor
123 * @param context
124 * the context this processor is associated with
125 */
126 public EventProcessor(String name, IFulmineContext context)
127 {
128 this(name, context.getEventProcessorThreadGroup(), context);
129 }
130
131 /**
132 * Constructor for an event processor backed by a thread of priority equal
133 * to the maximum priority of the thread group argument
134 *
135 * @param name
136 * the name of the thread for the processor
137 * @param threadGroup
138 * the thread group for the thread, this also defines the
139 * priority of the thread
140 * @param context
141 * the context this processor is associated with
142 */
143 public EventProcessor(String name, ThreadGroup threadGroup,
144 IFulmineContext context)
145 {
146 super();
147 this.context = context;
148 this.executor = new TaskExecutor(threadGroup, name, context);
149 this.toString = string(this, name);
150 }
151
152 public void handleTask(IEvent task)
153 {
154 processEvent(task);
155 }
156
157 @Override
158 protected AsyncLog getLog()
159 {
160 return LOG;
161 }
162
163 /**
164 * Destroy the processor.
165 */
166 @Override
167 protected void doDestroy()
168 {
169 queue(shutdownEvent);
170 }
171
172 @Override
173 protected void doStart()
174 {
175 this.executor.start();
176 }
177
178 /**
179 * Process the event by notifying all {@link IEventListener} instances
180 * registered against the {@link IEventSource} of the event.
181 *
182 * @param event
183 * the event
184 */
185 private void processEvent(IEvent event)
186 {
187 try
188 {
189 if (event == null)
190 {
191 return;
192 }
193 if (event == EventProcessor.shutdownEvent)
194 {
195 this.executor.destroy();
196 }
197 currentFrame.set(event.getFrame());
198 List<IEventListener> listeners = event.getSource().getListeners();
199 if (listeners == null || listeners.size() == 0)
200 {
201 logNoListeners(event);
202 }
203 else
204 {
205 for (IEventListener eventListener : listeners)
206 {
207 if (eventListener == null)
208 {
209 continue;
210 }
211 try
212 {
213 if (event instanceof ISystemEvent)
214 {
215 // only system event listeners can consume these
216 if (eventListener instanceof ISystemEventListener)
217 {
218 filterEvent(event, eventListener);
219 }
220 else
221 {
222 if (getLog().isTraceEnabled())
223 {
224 getLog().trace(
225 "System event " + safeToString(event)
226 + " not being handled by "
227 + safeToString(eventListener));
228 }
229 }
230 }
231 else
232 {
233 filterEvent(event, eventListener);
234 }
235 }
236 catch (Exception e)
237 {
238 logException(getLog(), safeToString(eventListener)
239 + " handling " + safeToString(event), e);
240 }
241 }
242 }
243 // now queue any trigger event
244 if (event.getTriggerEvent() != null)
245 {
246 this.context.queueEvent(event.getTriggerEvent());
247 }
248 }
249 catch (Exception e)
250 {
251 logException(getLog(), safeToString(event), e);
252 }
253 finally
254 {
255 currentFrame.remove();
256 }
257 }
258
259 /**
260 * Helper method to log the fact that there are no listeners for the event
261 *
262 * @param event
263 * the event for which there are no listeners
264 */
265 private void logNoListeners(IEvent event)
266 {
267 if (getLog().isDebugEnabled())
268 {
269 getLog().debug("No listeners for " + safeToString(event));
270 }
271 }
272
273 /**
274 * Check if the event listener can handle the event, if it can then let the
275 * listener handle it.
276 *
277 * @param event
278 * the event, never <code>null</code>
279 * @param eventListener
280 * the event listener
281 */
282 private void filterEvent(IEvent event, IEventListener eventListener)
283 {
284 final Class<? extends IEvent>[] eventFilter =
285 eventListener.getEventTypeFilter();
286 if (getLog().isTraceEnabled())
287 {
288 getLog().trace(
289 "Filtering event=" + safeToString(event) + " for listener="
290 + safeToString(eventListener) + " with filter="
291 + Arrays.deepToString(eventFilter));
292 }
293 if (eventFilter == null)
294 {
295 if (getLog().isInfoEnabled())
296 {
297 getLog().info(
298 "No event filter from " + safeToString(eventListener)
299 + ", this listener will not receive "
300 + event.toIdentityString());
301 }
302 return;
303 }
304 for (int i = 0; i < eventFilter.length; i++)
305 {
306 Class<? extends IEvent> eventType = eventFilter[i];
307 if (eventType.isInstance(event))
308 {
309 eventListener.update(event);
310 return;
311 }
312 }
313 }
314
315 /**
316 * Add the event onto the processor's queue
317 *
318 * @param event
319 * the event to add the queue
320 */
321 public void queue(final IEvent event)
322 {
323 this.executor.execute(new Task<IEvent>(this, event));
324 }
325
326 /**
327 * Get the current stack trace for the {@link EventProcessor}
328 *
329 * @return a formatted {@link String} of the current stack trace with each
330 * stack call on a new line
331 */
332 public String getStackTrace()
333 {
334 final StackTraceElement[] stackTrace = this.executor.getStackTrace();
335 StringBuilder sb = new StringBuilder();
336 for (int i = 0; i < stackTrace.length; i++)
337 {
338 sb.append(SystemUtils.LINE_SEPARATOR);
339 sb.append(SPACING_4_CHARS).append(stackTrace[i]);
340 }
341 return sb.toString();
342 }
343
344 @Override
345 public String toString()
346 {
347 return toString;
348 }
349 }