1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package fulmine.context;
17
18 import static fulmine.util.Utils.COLON;
19 import static fulmine.util.Utils.SPACING_4_CHARS;
20 import static fulmine.util.Utils.logException;
21
22 import java.util.Collection;
23 import java.util.TimerTask;
24
25 import org.apache.commons.lang.SystemUtils;
26
27 import fulmine.AbstractLifeCycle;
28 import fulmine.event.EventProcessor;
29 import fulmine.event.IEvent;
30 import fulmine.event.IEventManager;
31 import fulmine.event.IEventSource;
32 import fulmine.event.system.ISystemEvent;
33 import fulmine.event.system.ISystemEventSource;
34 import fulmine.event.system.SystemEventSource;
35 import fulmine.model.container.IContainer;
36 import fulmine.model.field.IField;
37 import fulmine.util.concurrent.ITaskExecutor;
38 import fulmine.util.concurrent.TaskExecutor;
39 import fulmine.util.log.AsyncLog;
40 import fulmine.util.reference.AutoCreatingStore;
41 import fulmine.util.reference.IAutoCreatingStore;
42 import fulmine.util.reference.IObjectBuilder;
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63 final class EventManager extends AbstractLifeCycle implements IEventManager
64 {
65 private final static AsyncLog LOG = new AsyncLog(EventManager.class);
66
67
68 static final String PROCESSOR_BASENAME =
69 IFrameworkContext.NAME + "EventProcessor";
70
71
72
73
74
75
76
77 @SuppressWarnings("unchecked")
78 private static final class SystemEventSourceBuilder implements
79 IObjectBuilder<Class, ISystemEventSource>
80 {
81 public ISystemEventSource create(Class key)
82 {
83 final SystemEventSource systemEventSource =
84 new SystemEventSource(key.getSimpleName());
85 systemEventSource.start();
86 return systemEventSource;
87 }
88 }
89
90
91 final ThreadGroup eventProcessorThreadGroup;
92
93
94 protected final EventProcessor[] processors;
95
96
97 private final ITaskExecutor executor;
98
99 private final byte availableProcessors;
100
101
102 @SuppressWarnings("unchecked")
103 private final IAutoCreatingStore<Class, ISystemEventSource> eventSources;
104
105
106
107
108
109
110
111
112
113
114
115
116
117 @SuppressWarnings("unchecked")
118 EventManager(final String identity, byte eventProcessorCount,
119 IFulmineContext context)
120 {
121 super();
122 if (eventProcessorCount < 2)
123 {
124 throw new IllegalArgumentException(
125 "eventProcessorCount cannot be < 2");
126 }
127 this.availableProcessors = eventProcessorCount;
128
129 this.eventProcessorThreadGroup =
130 createThreadGroup(identity, Thread.NORM_PRIORITY);
131
132 this.processors = new EventProcessor[this.availableProcessors];
133 createProcessors(this.processors,
134 identity + COLON + PROCESSOR_BASENAME,
135 this.eventProcessorThreadGroup, context);
136
137 this.executor =
138 new TaskExecutor(identity + COLON + "AdhocProcessor", context);
139 this.eventSources =
140 new AutoCreatingStore<Class, ISystemEventSource>(
141 new SystemEventSourceBuilder());
142 if (getLog().isInfoEnabled())
143 {
144 getLog().info(
145 "Created " + this.availableProcessors + " event processors");
146 }
147 }
148
149
150
151
152
153
154
155
156
157
158 private ThreadGroup createThreadGroup(String identity, int priority)
159 {
160 String ps;
161 switch (priority)
162 {
163 case Thread.MIN_PRIORITY:
164 ps = "LowPriority";
165 break;
166 case Thread.MAX_PRIORITY:
167 ps = "HighPriority";
168 break;
169 case Thread.NORM_PRIORITY:
170 ps = "NormalPriority";
171 break;
172 default:
173 throw new RuntimeException("Unknown priority: " + priority);
174 }
175 final ThreadGroup threadGroup =
176 new ThreadGroup(identity + COLON + IFrameworkContext.NAME + ps);
177 threadGroup.setMaxPriority(priority);
178 return threadGroup;
179 }
180
181 public ThreadGroup getEventProcessorThreadGroup()
182 {
183 return this.eventProcessorThreadGroup;
184 }
185
186 @Override
187 protected AsyncLog getLog()
188 {
189 return LOG;
190 }
191
192 @Override
193 protected void doStart()
194 {
195 this.executor.start();
196 startProcessors(this.processors);
197 if (getLog().isInfoEnabled())
198 {
199 getLog().info("Started");
200 }
201 }
202
203 @Override
204 protected void doDestroy()
205 {
206 this.eventSources.destroy();
207
208 this.executor.destroy();
209 destroyProcessors(this.processors);
210
211 destroyThreadGroup(this.eventProcessorThreadGroup);
212 }
213
214 public void queueEvents(Collection<IEvent> events)
215 {
216 if (events != null)
217 {
218 for (IEvent event : events)
219 {
220 try
221 {
222 queueEvent(event);
223 }
224 catch (Exception e)
225 {
226 logException(getLog(), event, e);
227 }
228 }
229 }
230 }
231
232 public void queueEvent(IEvent event)
233 {
234 checkActive();
235 if (event != null)
236 {
237
238 final IEventSource source = event.getSource();
239 if (source != null)
240 {
241 int eventProcessorId = 0;
242 if (source instanceof IField)
243 {
244
245 final IContainer container =
246 ((IField) source).getContainer();
247 if (container == null)
248 {
249 return;
250 }
251 eventProcessorId = container.getEventSourceGroupId();
252 }
253 else
254 {
255 eventProcessorId = source.getEventSourceGroupId();
256 }
257 final EventProcessor processor =
258 this.processors[eventProcessorId];
259 if (processor.isActive())
260 {
261 if (getLog().isTraceEnabled())
262 {
263 getLog().trace(
264 "Adding " + SystemUtils.LINE_SEPARATOR
265 + SPACING_4_CHARS + "Event: "
266 + event.toString() + SystemUtils.LINE_SEPARATOR
267 + SPACING_4_CHARS + "Source: "
268 + event.getSource().toIdentityString()
269 + SystemUtils.LINE_SEPARATOR + SPACING_4_CHARS
270 + "Processor: " + processor, new Exception());
271 }
272 else
273 {
274 if (getLog().isDebugEnabled())
275 {
276 getLog().debug(
277 "Adding " + SystemUtils.LINE_SEPARATOR
278 + SPACING_4_CHARS + "Event: "
279 + event.toIdentityString()
280 + SystemUtils.LINE_SEPARATOR
281 + SPACING_4_CHARS + "Source: "
282 + event.getSource().toIdentityString()
283 + SystemUtils.LINE_SEPARATOR
284 + SPACING_4_CHARS + "Processor: "
285 + processor);
286 }
287 }
288 processor.queue(event);
289 }
290 }
291 }
292 }
293
294 public int getEventProcessorCount()
295 {
296 return this.availableProcessors;
297 }
298
299 private void createProcessors(EventProcessor[] processors,
300 String processorName, ThreadGroup threadGroup, IFulmineContext context)
301 {
302 for (int i = 0; i < processors.length; i++)
303 {
304 processors[i] =
305 new EventProcessor(processorName + i, threadGroup, context);
306 }
307 }
308
309 private void destroyThreadGroup(ThreadGroup threadGroup)
310 {
311
312 Thread[] threads = new Thread[threadGroup.activeCount()];
313 threadGroup.enumerate(threads);
314 for (Thread thread : threads)
315 {
316 try
317 {
318 if (!Thread.currentThread().equals(thread) && thread != null)
319 {
320 thread.interrupt();
321 }
322 }
323 catch (Exception e)
324 {
325 logException(getLog(), thread, e);
326 }
327 }
328 }
329
330 private void destroyProcessors(EventProcessor[] eventProcessors)
331 {
332 for (EventProcessor processor : eventProcessors)
333 {
334 try
335 {
336 processor.destroy();
337 }
338 catch (Exception e)
339 {
340 logException(getLog(), processor, e);
341 }
342 }
343 }
344
345 private void startProcessors(EventProcessor[] eventProcessors)
346 {
347 for (EventProcessor processor : eventProcessors)
348 {
349 processor.start();
350 }
351 }
352
353 public ISystemEventSource getSystemEventSource(
354 Class<? extends ISystemEvent> type)
355 {
356 return this.eventSources.get(type);
357 }
358
359 public void execute(Runnable task)
360 {
361 checkActive();
362 this.executor.execute(task);
363 }
364
365 public void schedule(TimerTask task, long delay, long period)
366 {
367 checkActive();
368 this.executor.schedule(task, delay, period);
369 }
370 }