View Javadoc

1   /*
2      Copyright 2008 Ramon Servadei
3   
4      Licensed under the Apache License, Version 2.0 (the "License");
5      you may not use this file except in compliance with the License.
6      You may obtain a copy of the License at
7   
8          http://www.apache.org/licenses/LICENSE-2.0
9   
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15   */
16  package fulmine.util.concurrent;
17  
18  import static fulmine.util.Utils.COLON;
19  import static fulmine.util.Utils.EMPTY_STRING;
20  import static fulmine.util.Utils.logException;
21  
22  import java.util.Queue;
23  import java.util.Timer;
24  import java.util.TimerTask;
25  import java.util.concurrent.Executor;
26  
27  import fulmine.ILifeCycle;
28  import fulmine.context.IFulmineContext;
29  import fulmine.event.EventQueueItem;
30  import fulmine.util.collection.CollectionFactory;
31  import fulmine.util.log.IStatistics;
32  import fulmine.util.log.Statistics;
33  
34  /**
35   * A single threaded {@link Executor} for tasks. This has an unbounded task
36   * queue. There are 2 modes of operation for this;
37   * <ul>
38   * <li>Event driven; when a task is added the executor will be notified
39   * <li>Polling; the executor will periodically poll the task queue
40   * </ul>
41   * These 2 modes provide the necessary behaviour to meet different application
42   * processing requirements. A polling behaviour is suitable for high volume
43   * non-critical task submission; the threads submitting the tasks do not wake up
44   * the executing thread. The executing thread will periodically poll the task
45   * queue and consume all tasks. The trade-off for the submitting thread not
46   * having to notify the executing thread is greater context switching regardless
47   * of whether there are any tasks to process for the executing thread.
48   * <p>
49   * There is a single global {@link Timer} used by all executors for handling the
50   * scheduling of tasks via the {@link #schedule(TimerTask, long, long)} method.
51   * The actual task will be run by the {@link TaskExecutor} - the timer is merely
52   * used to schedule a call to {@link #execute(Runnable)} when the task is due
53   * for (re)execution.
54   * 
55   * @author Ramon Servadei
56   */
57  public final class TaskExecutor implements ITaskExecutor, ILifeCycle
58  {
59      /**
60       * A 'global' timer used simply for scheduling when tasks should be
61       * executed. The execution takes place on the {@link TaskExecutor} that had
62       * the {@link TaskExecutor#schedule(TimerTask, long, long)} method called.
63       */
64      private static final Timer timer;
65      static
66      {
67          timer = new Timer(TaskExecutor.class.getSimpleName() + "Timer", true);
68      }
69  
70      /** The thread to execute the tasks */
71      private final Thread executor;
72  
73      /** The task queue */
74      private final Queue<EventQueueItem<Runnable>> tasks;
75  
76      /** The polling period, for event driven this will be 0 */
77      private final long pollingPeriod;
78  
79      /** indicates if the executor is active */
80      private volatile boolean active = true;
81  
82      /** The statistics for this executor */
83      private final IStatistics stats;
84  
85      /**
86       * The logic for de-queueing the task queue of the {@link ITaskExecutor}
87       * 
88       * @author Ramon Servadei
89       */
90      private final class DequeueLogic implements Runnable
91      {
92          public DequeueLogic()
93          {
94              super();
95          }
96  
97          public void run()
98          {
99              EventQueueItem<Runnable> task = null;
100             while (isActive())
101             {
102                 try
103                 {
104                     task = null;
105                     int queueSize = 0;
106                     synchronized (TaskExecutor.this.getTasks())
107                     {
108                         if (TaskExecutor.this.getTasks().size() == 0)
109                         {
110                             try
111                             {
112                                 TaskExecutor.this.getTasks().wait(
113                                     TaskExecutor.this.getPollingPeriod());
114                             }
115                             catch (InterruptedException e)
116                             {
117                                 if (!isActive())
118                                 {
119                                     return;
120                                 }
121                             }
122                         }
123                         final int size = TaskExecutor.this.getTasks().size();
124                         if (size > 0)
125                         {
126                             task = TaskExecutor.this.getTasks().poll();
127                             queueSize = size - 1;
128                         }
129                     }
130                     try
131                     {
132                         if (task != null)
133                         {
134                             // do not log statistics of statistics events
135                             if (!task.isStatisticsEvent())
136                             {
137                                 TaskExecutor.this.stats.processEvent(task.getElapsedTimeMicro());
138                             }
139                             task.getEvent().run();
140                             if (!task.isStatisticsEvent())
141                             {
142                                 // todo could log the execution time
143                                 TaskExecutor.this.stats.intervalFinished(
144                                     queueSize, 1);
145                             }
146                         }
147                     }
148                     catch (Exception e)
149                     {
150                         logException(null, task, e);
151                     }
152                 }
153                 catch (Exception e)
154                 {
155                     logException(null, EMPTY_STRING, e);
156                 }
157             }
158         }
159     }
160 
161     /**
162      * Construct the executor in event driven mode in the same thread group as
163      * the calling thread.
164      * 
165      * @param identity
166      *            the identity of the executor
167      * @param context
168      *            required to construct the information container record, this
169      *            is done lazily
170      */
171     public TaskExecutor(String identity, IFulmineContext context)
172     {
173         this(identity, 0, context);
174     }
175 
176     /**
177      * Construct the executor in polling or event driven mode in the same thread
178      * group as the calling thread.
179      * 
180      * @param identity
181      *            the identity of the executor
182      * @param pollingPeriod
183      *            the polling period, in milliseconds or <code>0</code> or less
184      *            for event driven mode
185      * @param context
186      *            required to construct the information container record, this
187      *            is done lazily
188      */
189     public TaskExecutor(String identity, long pollingPeriod,
190         IFulmineContext context)
191     {
192         this(Thread.currentThread().getThreadGroup(), identity, pollingPeriod,
193             context);
194     }
195 
196     /**
197      * Construct the executor in event driven mode with a specific priority
198      * 
199      * @param threadGroup
200      *            the thread group and priority
201      * @param identity
202      *            the identity of the executor
203      * @param context
204      *            required to construct the information container record, this
205      *            is done lazily
206      */
207     public TaskExecutor(ThreadGroup threadGroup, String identity,
208         IFulmineContext context)
209     {
210         this(threadGroup, identity, 0, context);
211     }
212 
213     /**
214      * Internal constructor that handles the thread group and polling paradigm
215      * 
216      * @param threadGroup
217      *            the thread group and priority
218      * @param identity
219      *            the identity of the executor
220      * @param pollingPeriod
221      *            the polling period, in milliseconds or <code>0</code> or less
222      *            for event driven mode
223      * @param context
224      *            required to construct the information container record, this
225      *            is done lazily
226      */
227     private TaskExecutor(ThreadGroup threadGroup, String identity,
228         long pollingPeriod, IFulmineContext context)
229     {
230 
231         this.tasks = CollectionFactory.newQueue();
232         this.stats = new Statistics(identity, context);
233         this.executor =
234             new Thread(threadGroup, new DequeueLogic(), identity + COLON
235                 + getClass().getSimpleName());
236         this.executor.setDaemon(true);
237         this.executor.setPriority(threadGroup.getMaxPriority());
238         this.pollingPeriod = pollingPeriod < 0 ? 0 : pollingPeriod;
239     }
240 
241     public void execute(Runnable command)
242     {
243         synchronized (getTasks())
244         {
245             getTasks().add(new EventQueueItem<Runnable>(command));
246             if (getPollingPeriod() == 0)
247             {
248                 getTasks().notify();
249             }
250         }
251     }
252 
253     public boolean isActive()
254     {
255         return this.executor.isAlive() && active;
256     }
257 
258     public void start()
259     {
260         this.executor.start();
261         // schedule the stats logging
262         schedule(new TimerTask()
263         {
264             @Override
265             public void run()
266             {
267                 EventQueueItem.isStatisticsEvent.set(Boolean.TRUE);
268                 try
269                 {
270                     TaskExecutor.this.stats.run();
271                 }
272                 finally
273                 {
274                     EventQueueItem.isStatisticsEvent.remove();
275                 }
276             }
277         }, Statistics.TIME_PERIOD, Statistics.TIME_PERIOD);
278     }
279 
280     public void destroy()
281     {
282         this.active = false;
283         this.executor.interrupt();
284     }
285 
286     private long getPollingPeriod()
287     {
288         return this.pollingPeriod;
289     }
290 
291     private Queue<EventQueueItem<Runnable>> getTasks()
292     {
293         return this.tasks;
294     }
295 
296     public StackTraceElement[] getStackTrace()
297     {
298         return this.executor.getStackTrace();
299     }
300 
301     public void schedule(final TimerTask task, long delay, long period)
302     {
303         timer.schedule(new TimerTask()
304         {
305             @Override
306             public void run()
307             {
308                 // execute on the task executor thread
309                 execute(task);
310             }
311         }, delay, period);
312     }
313 }