View Javadoc

/*
   Copyright 2008 Ramon Servadei

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/
16  package fulmine.context;
17  
18  import static fulmine.util.Utils.COLON;
19  import static fulmine.util.Utils.SPACING_4_CHARS;
20  import static fulmine.util.Utils.logException;
21  
22  import java.util.Collection;
23  import java.util.TimerTask;
24  
25  import org.apache.commons.lang.SystemUtils;
26  
27  import fulmine.AbstractLifeCycle;
28  import fulmine.event.EventProcessor;
29  import fulmine.event.IEvent;
30  import fulmine.event.IEventManager;
31  import fulmine.event.IEventSource;
32  import fulmine.event.system.ISystemEvent;
33  import fulmine.event.system.ISystemEventSource;
34  import fulmine.event.system.SystemEventSource;
35  import fulmine.model.container.IContainer;
36  import fulmine.model.field.IField;
37  import fulmine.util.concurrent.ITaskExecutor;
38  import fulmine.util.concurrent.TaskExecutor;
39  import fulmine.util.log.AsyncLog;
40  import fulmine.util.reference.AutoCreatingStore;
41  import fulmine.util.reference.IAutoCreatingStore;
42  import fulmine.util.reference.IObjectBuilder;
43  
44  /**
45   * A manager for the event framework of a context. This constructs a number of
46   * internal {@link EventProcessor} instances that handle {@link IEvent}
47   * distribution.
48   * <p>
49   * The allocation of an event to an event processor is based on the
50   * {@link IEventSource#getEventSourceGroupId} of the event's
51   * {@link IEventSource} <code>modulo</code> the number of processors. This
52   * ensures events from the same source are processed by the same processor and
53   * thus listeners registered with a source are guaranteed to receive events
54   * originating from an event source in the order they occurred.
55   * <p>
56   * This class also has a single threaded task executor that is used to service
57   * requests via the {@link #execute(Runnable)} method. This executor is meant to
58   * be used as a utility for executing "one-shot" asynchronous tasks.
59   * 
60   * @see IFulmineContext
61   * @author Ramon Servadei
62   */
63  final class EventManager extends AbstractLifeCycle implements IEventManager
64  {
65      private final static AsyncLog LOG = new AsyncLog(EventManager.class);
66  
67      /** The basename for an event processor */
68      static final String PROCESSOR_BASENAME =
69          IFrameworkContext.NAME + "EventProcessor";
70  
71      /**
72       * Builder for {@link SystemEventSource} instances
73       * 
74       * @author Ramon Servadei
75       * 
76       */
77      @SuppressWarnings("unchecked")
78      private static final class SystemEventSourceBuilder implements
79          IObjectBuilder<Class, ISystemEventSource>
80      {
81          public ISystemEventSource create(Class key)
82          {
83              final SystemEventSource systemEventSource =
84                  new SystemEventSource(key.getSimpleName());
85              systemEventSource.start();
86              return systemEventSource;
87          }
88      }
89  
90      /** The event processor thread group */
91      final ThreadGroup eventProcessorThreadGroup;
92  
93      /** The {@link EventProcessor} instances in the context */
94      protected final EventProcessor[] processors;
95  
96      /** Executor for 'one-shot' tasks */
97      private final ITaskExecutor executor;
98  
99      private final byte availableProcessors;
100 
101     /** The typed event sources */
102     @SuppressWarnings("unchecked")
103     private final IAutoCreatingStore<Class, ISystemEventSource> eventSources;
104 
105     /**
106      * Standard constructor
107      * 
108      * @param identity
109      *            the identity for the threads
110      * @param eventProcessorCount
111      *            the number of processors available
112      * @param context
113      *            required to construct the information container records for
114      *            the {@link EventProcessor} instances. The containers are
115      *            created lazily.
116      */
117     @SuppressWarnings("unchecked")
118     EventManager(final String identity, byte eventProcessorCount,
119         IFulmineContext context)
120     {
121         super();
122         if (eventProcessorCount < 2)
123         {
124             throw new IllegalArgumentException(
125                 "eventProcessorCount cannot be < 2");
126         }
127         this.availableProcessors = eventProcessorCount;
128 
129         this.eventProcessorThreadGroup =
130             createThreadGroup(identity, Thread.NORM_PRIORITY);
131 
132         this.processors = new EventProcessor[this.availableProcessors];
133         createProcessors(this.processors,
134             identity + COLON + PROCESSOR_BASENAME,
135             this.eventProcessorThreadGroup, context);
136 
137         this.executor =
138             new TaskExecutor(identity + COLON + "AdhocProcessor", context);
139         this.eventSources =
140             new AutoCreatingStore<Class, ISystemEventSource>(
141                 new SystemEventSourceBuilder());
142         if (getLog().isInfoEnabled())
143         {
144             getLog().info(
145                 "Created " + this.availableProcessors + " event processors");
146         }
147     }
148 
149     /**
150      * Utility method to create a thread group
151      * 
152      * @param identity
153      *            the identity for the thread group
154      * @param priority
155      *            the priority of the thread group
156      * @return the {@link ThreadGroup}
157      */
158     private ThreadGroup createThreadGroup(String identity, int priority)
159     {
160         String ps;
161         switch (priority)
162         {
163             case Thread.MIN_PRIORITY:
164                 ps = "LowPriority";
165                 break;
166             case Thread.MAX_PRIORITY:
167                 ps = "HighPriority";
168                 break;
169             case Thread.NORM_PRIORITY:
170                 ps = "NormalPriority";
171                 break;
172             default:
173                 throw new RuntimeException("Unknown priority: " + priority);
174         }
175         final ThreadGroup threadGroup =
176             new ThreadGroup(identity + COLON + IFrameworkContext.NAME + ps);
177         threadGroup.setMaxPriority(priority);
178         return threadGroup;
179     }
180 
181     public ThreadGroup getEventProcessorThreadGroup()
182     {
183         return this.eventProcessorThreadGroup;
184     }
185 
186     @Override
187     protected AsyncLog getLog()
188     {
189         return LOG;
190     }
191 
192     @Override
193     protected void doStart()
194     {
195         this.executor.start();
196         startProcessors(this.processors);
197         if (getLog().isInfoEnabled())
198         {
199             getLog().info("Started");
200         }
201     }
202 
203     @Override
204     protected void doDestroy()
205     {
206         this.eventSources.destroy();
207         // destroy the processors
208         this.executor.destroy();
209         destroyProcessors(this.processors);
210         // destroy the thread groups
211         destroyThreadGroup(this.eventProcessorThreadGroup);
212     }
213 
214     public void queueEvents(Collection<IEvent> events)
215     {
216         if (events != null)
217         {
218             for (IEvent event : events)
219             {
220                 try
221                 {
222                     queueEvent(event);
223                 }
224                 catch (Exception e)
225                 {
226                     logException(getLog(), event, e);
227                 }
228             }
229         }
230     }
231 
232     public void queueEvent(IEvent event)
233     {
234         checkActive();
235         if (event != null)
236         {
237             // find the processor for the event SOURCE
238             final IEventSource source = event.getSource();
239             if (source != null)
240             {
241                 int eventProcessorId = 0;
242                 if (source instanceof IField)
243                 {
244                     // ensures fields use the same processor as their container
245                     final IContainer container =
246                         ((IField) source).getContainer();
247                     if (container == null)
248                     {
249                         return;
250                     }
251                     eventProcessorId = container.getEventSourceGroupId();
252                 }
253                 else
254                 {
255                     eventProcessorId = source.getEventSourceGroupId();
256                 }
257                 final EventProcessor processor =
258                     this.processors[eventProcessorId];
259                 if (processor.isActive())
260                 {
261                     if (getLog().isTraceEnabled())
262                     {
263                         getLog().trace(
264                             "Adding " + SystemUtils.LINE_SEPARATOR
265                                 + SPACING_4_CHARS + "Event: "
266                                 + event.toString() + SystemUtils.LINE_SEPARATOR
267                                 + SPACING_4_CHARS + "Source: "
268                                 + event.getSource().toIdentityString()
269                                 + SystemUtils.LINE_SEPARATOR + SPACING_4_CHARS
270                                 + "Processor: " + processor, new Exception());
271                     }
272                     else
273                     {
274                         if (getLog().isDebugEnabled())
275                         {
276                             getLog().debug(
277                                 "Adding " + SystemUtils.LINE_SEPARATOR
278                                     + SPACING_4_CHARS + "Event: "
279                                     + event.toIdentityString()
280                                     + SystemUtils.LINE_SEPARATOR
281                                     + SPACING_4_CHARS + "Source: "
282                                     + event.getSource().toIdentityString()
283                                     + SystemUtils.LINE_SEPARATOR
284                                     + SPACING_4_CHARS + "Processor: "
285                                     + processor);
286                         }
287                     }
288                     processor.queue(event);
289                 }
290             }
291         }
292     }
293 
294     public int getEventProcessorCount()
295     {
296         return this.availableProcessors;
297     }
298 
299     private void createProcessors(EventProcessor[] processors,
300         String processorName, ThreadGroup threadGroup, IFulmineContext context)
301     {
302         for (int i = 0; i < processors.length; i++)
303         {
304             processors[i] =
305                 new EventProcessor(processorName + i, threadGroup, context);
306         }
307     }
308 
309     private void destroyThreadGroup(ThreadGroup threadGroup)
310     {
311         // interrupt all threads we know about
312         Thread[] threads = new Thread[threadGroup.activeCount()];
313         threadGroup.enumerate(threads);
314         for (Thread thread : threads)
315         {
316             try
317             {
318                 if (!Thread.currentThread().equals(thread) && thread != null)
319                 {
320                     thread.interrupt();
321                 }
322             }
323             catch (Exception e)
324             {
325                 logException(getLog(), thread, e);
326             }
327         }
328     }
329 
330     private void destroyProcessors(EventProcessor[] eventProcessors)
331     {
332         for (EventProcessor processor : eventProcessors)
333         {
334             try
335             {
336                 processor.destroy();
337             }
338             catch (Exception e)
339             {
340                 logException(getLog(), processor, e);
341             }
342         }
343     }
344 
345     private void startProcessors(EventProcessor[] eventProcessors)
346     {
347         for (EventProcessor processor : eventProcessors)
348         {
349             processor.start();
350         }
351     }
352 
353     public ISystemEventSource getSystemEventSource(
354         Class<? extends ISystemEvent> type)
355     {
356         return this.eventSources.get(type);
357     }
358 
359     public void execute(Runnable task)
360     {
361         checkActive();
362         this.executor.execute(task);
363     }
364 
365     public void schedule(TimerTask task, long delay, long period)
366     {
367         checkActive();
368         this.executor.schedule(task, delay, period);
369     }
370 }