1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package fulmine.event;
17
18 import static fulmine.util.Utils.SPACING_4_CHARS;
19 import static fulmine.util.Utils.logException;
20 import static fulmine.util.Utils.safeToString;
21 import static fulmine.util.Utils.string;
22
23 import java.util.Arrays;
24 import java.util.List;
25
26 import org.apache.commons.lang.SystemUtils;
27
28 import fulmine.AbstractLifeCycle;
29 import fulmine.ILifeCycle;
30 import fulmine.context.FulmineContext;
31 import fulmine.context.IFulmineContext;
32 import fulmine.event.listener.IEventListener;
33 import fulmine.event.system.ISystemEvent;
34 import fulmine.event.system.ISystemEventListener;
35 import fulmine.event.system.UnsubscribeEvent;
36 import fulmine.util.concurrent.ITaskExecutor;
37 import fulmine.util.concurrent.ITaskHandler;
38 import fulmine.util.concurrent.Task;
39 import fulmine.util.concurrent.TaskExecutor;
40 import fulmine.util.log.AsyncLog;
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63 public final class EventProcessor extends AbstractLifeCycle implements
64 ILifeCycle, ITaskHandler<IEvent>
65 {
66 private final static AsyncLog LOG = new AsyncLog(EventProcessor.class);
67
68
69 private final ITaskExecutor executor;
70
71
72 private final IFulmineContext context;
73
74
75
76
77
78 private final static ThreadLocal<IEventFrameExecution> currentFrame =
79 new ThreadLocal<IEventFrameExecution>();
80
81
82
83
84
85
86 private final static class DestroyProcessorEvent extends AbstractEvent
87 implements ISystemEvent
88 {
89 public DestroyProcessorEvent()
90 {
91 super();
92 setSource(new EventSource("DestroyProcessorEventSource"));
93 }
94
95 }
96
97
98 private final static DestroyProcessorEvent shutdownEvent =
99 new DestroyProcessorEvent();
100
101
102 private final String toString;
103
104
105
106
107
108
109
110
111 public static final IEventFrameExecution getCurrentFrame()
112 {
113 return currentFrame.get();
114 }
115
116
117
118
119
120
121
122
123
124
125
126 public EventProcessor(String name, IFulmineContext context)
127 {
128 this(name, context.getEventProcessorThreadGroup(), context);
129 }
130
131
132
133
134
135
136
137
138
139
140
141
142
143 public EventProcessor(String name, ThreadGroup threadGroup,
144 IFulmineContext context)
145 {
146 super();
147 this.context = context;
148 this.executor = new TaskExecutor(threadGroup, name, context);
149 this.toString = string(this, name);
150 }
151
152 public void handleTask(IEvent task)
153 {
154 processEvent(task);
155 }
156
157 @Override
158 protected AsyncLog getLog()
159 {
160 return LOG;
161 }
162
163
164
165
166 @Override
167 protected void doDestroy()
168 {
169 queue(shutdownEvent);
170 }
171
172 @Override
173 protected void doStart()
174 {
175 this.executor.start();
176 }
177
178
179
180
181
182
183
184
185 private void processEvent(IEvent event)
186 {
187 try
188 {
189 if (event == null)
190 {
191 return;
192 }
193 if (event == EventProcessor.shutdownEvent)
194 {
195 this.executor.destroy();
196 }
197 currentFrame.set(event.getFrame());
198 List<IEventListener> listeners = event.getSource().getListeners();
199 if (listeners == null || listeners.size() == 0)
200 {
201 logNoListeners(event);
202 }
203 else
204 {
205 for (IEventListener eventListener : listeners)
206 {
207 if (eventListener == null)
208 {
209 continue;
210 }
211 try
212 {
213 if (event instanceof ISystemEvent)
214 {
215
216 if (eventListener instanceof ISystemEventListener)
217 {
218 filterEvent(event, eventListener);
219 }
220 else
221 {
222 if (getLog().isTraceEnabled())
223 {
224 getLog().trace(
225 "System event " + safeToString(event)
226 + " not being handled by "
227 + safeToString(eventListener));
228 }
229 }
230 }
231 else
232 {
233 filterEvent(event, eventListener);
234 }
235 }
236 catch (Exception e)
237 {
238 logException(getLog(), safeToString(eventListener)
239 + " handling " + safeToString(event), e);
240 }
241 }
242 }
243
244 if (event.getTriggerEvent() != null)
245 {
246 this.context.queueEvent(event.getTriggerEvent());
247 }
248 }
249 catch (Exception e)
250 {
251 logException(getLog(), safeToString(event), e);
252 }
253 finally
254 {
255 currentFrame.remove();
256 }
257 }
258
259
260
261
262
263
264
265 private void logNoListeners(IEvent event)
266 {
267 if (getLog().isDebugEnabled())
268 {
269 getLog().debug("No listeners for " + safeToString(event));
270 }
271 }
272
273
274
275
276
277
278
279
280
281
282 private void filterEvent(IEvent event, IEventListener eventListener)
283 {
284 final Class<? extends IEvent>[] eventFilter =
285 eventListener.getEventTypeFilter();
286 if (getLog().isTraceEnabled())
287 {
288 getLog().trace(
289 "Filtering event=" + safeToString(event) + " for listener="
290 + safeToString(eventListener) + " with filter="
291 + Arrays.deepToString(eventFilter));
292 }
293 if (eventFilter == null)
294 {
295 if (getLog().isInfoEnabled())
296 {
297 getLog().info(
298 "No event filter from " + safeToString(eventListener)
299 + ", this listener will not receive "
300 + event.toIdentityString());
301 }
302 return;
303 }
304 for (int i = 0; i < eventFilter.length; i++)
305 {
306 Class<? extends IEvent> eventType = eventFilter[i];
307 if (eventType.isInstance(event))
308 {
309 eventListener.update(event);
310 return;
311 }
312 }
313 }
314
315
316
317
318
319
320
321 public void queue(final IEvent event)
322 {
323 this.executor.execute(new Task<IEvent>(this, event));
324 }
325
326
327
328
329
330
331
332 public String getStackTrace()
333 {
334 final StackTraceElement[] stackTrace = this.executor.getStackTrace();
335 StringBuilder sb = new StringBuilder();
336 for (int i = 0; i < stackTrace.length; i++)
337 {
338 sb.append(SystemUtils.LINE_SEPARATOR);
339 sb.append(SPACING_4_CHARS).append(stackTrace[i]);
340 }
341 return sb.toString();
342 }
343
344 @Override
345 public String toString()
346 {
347 return toString;
348 }
349 }