1 /*
2 Copyright 2008 Ramon Servadei
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16 package fulmine.util.concurrent;
17
18 import static fulmine.util.Utils.COLON;
19 import static fulmine.util.Utils.EMPTY_STRING;
20 import static fulmine.util.Utils.logException;
21
22 import java.util.Queue;
23 import java.util.Timer;
24 import java.util.TimerTask;
25 import java.util.concurrent.Executor;
26
27 import fulmine.ILifeCycle;
28 import fulmine.context.IFulmineContext;
29 import fulmine.event.EventQueueItem;
30 import fulmine.util.collection.CollectionFactory;
31 import fulmine.util.log.IStatistics;
32 import fulmine.util.log.Statistics;
33
34 /**
35 * A single threaded {@link Executor} for tasks. This has an unbounded task
36 * queue. There are 2 modes of operation for this;
37 * <ul>
38 * <li>Event driven; when a task is added the executor will be notified
39 * <li>Polling; the executor will periodically poll the task queue
40 * </ul>
41 * These 2 modes provide the necessary behaviour to meet different application
42 * processing requirements. A polling behaviour is suitable for high volume
43 * non-critical task submission; the threads submitting the tasks do not wake up
44 * the executing thread. The executing thread will periodically poll the task
45 * queue and consume all tasks. The trade-off for the submitting thread not
46 * having to notify the executing thread is greater context switching regardless
47 * of whether there are any tasks to process for the executing thread.
48 * <p>
49 * There is a single global {@link Timer} used by all executors for handling the
50 * scheduling of tasks via the {@link #schedule(TimerTask, long, long)} method.
51 * The actual task will be run by the {@link TaskExecutor} - the timer is merely
52 * used to schedule a call to {@link #execute(Runnable)} when the task is due
53 * for (re)execution.
54 *
55 * @author Ramon Servadei
56 */
57 public final class TaskExecutor implements ITaskExecutor, ILifeCycle
58 {
59 /**
60 * A 'global' timer used simply for scheduling when tasks should be
61 * executed. The execution takes place on the {@link TaskExecutor} that had
62 * the {@link TaskExecutor#schedule(TimerTask, long, long)} method called.
63 */
64 private static final Timer timer;
65 static
66 {
67 timer = new Timer(TaskExecutor.class.getSimpleName() + "Timer", true);
68 }
69
70 /** The thread to execute the tasks */
71 private final Thread executor;
72
73 /** The task queue */
74 private final Queue<EventQueueItem<Runnable>> tasks;
75
76 /** The polling period, for event driven this will be 0 */
77 private final long pollingPeriod;
78
79 /** indicates if the executor is active */
80 private volatile boolean active = true;
81
82 /** The statistics for this executor */
83 private final IStatistics stats;
84
85 /**
86 * The logic for de-queueing the task queue of the {@link ITaskExecutor}
87 *
88 * @author Ramon Servadei
89 */
90 private final class DequeueLogic implements Runnable
91 {
92 public DequeueLogic()
93 {
94 super();
95 }
96
97 public void run()
98 {
99 EventQueueItem<Runnable> task = null;
100 while (isActive())
101 {
102 try
103 {
104 task = null;
105 int queueSize = 0;
106 synchronized (TaskExecutor.this.getTasks())
107 {
108 if (TaskExecutor.this.getTasks().size() == 0)
109 {
110 try
111 {
112 TaskExecutor.this.getTasks().wait(
113 TaskExecutor.this.getPollingPeriod());
114 }
115 catch (InterruptedException e)
116 {
117 if (!isActive())
118 {
119 return;
120 }
121 }
122 }
123 final int size = TaskExecutor.this.getTasks().size();
124 if (size > 0)
125 {
126 task = TaskExecutor.this.getTasks().poll();
127 queueSize = size - 1;
128 }
129 }
130 try
131 {
132 if (task != null)
133 {
134 // do not log statistics of statistics events
135 if (!task.isStatisticsEvent())
136 {
137 TaskExecutor.this.stats.processEvent(task.getElapsedTimeMicro());
138 }
139 task.getEvent().run();
140 if (!task.isStatisticsEvent())
141 {
142 // todo could log the execution time
143 TaskExecutor.this.stats.intervalFinished(
144 queueSize, 1);
145 }
146 }
147 }
148 catch (Exception e)
149 {
150 logException(null, task, e);
151 }
152 }
153 catch (Exception e)
154 {
155 logException(null, EMPTY_STRING, e);
156 }
157 }
158 }
159 }
160
161 /**
162 * Construct the executor in event driven mode in the same thread group as
163 * the calling thread.
164 *
165 * @param identity
166 * the identity of the executor
167 * @param context
168 * required to construct the information container record, this
169 * is done lazily
170 */
171 public TaskExecutor(String identity, IFulmineContext context)
172 {
173 this(identity, 0, context);
174 }
175
176 /**
177 * Construct the executor in polling or event driven mode in the same thread
178 * group as the calling thread.
179 *
180 * @param identity
181 * the identity of the executor
182 * @param pollingPeriod
183 * the polling period, in milliseconds or <code>0</code> or less
184 * for event driven mode
185 * @param context
186 * required to construct the information container record, this
187 * is done lazily
188 */
189 public TaskExecutor(String identity, long pollingPeriod,
190 IFulmineContext context)
191 {
192 this(Thread.currentThread().getThreadGroup(), identity, pollingPeriod,
193 context);
194 }
195
196 /**
197 * Construct the executor in event driven mode with a specific priority
198 *
199 * @param threadGroup
200 * the thread group and priority
201 * @param identity
202 * the identity of the executor
203 * @param context
204 * required to construct the information container record, this
205 * is done lazily
206 */
207 public TaskExecutor(ThreadGroup threadGroup, String identity,
208 IFulmineContext context)
209 {
210 this(threadGroup, identity, 0, context);
211 }
212
213 /**
214 * Internal constructor that handles the thread group and polling paradigm
215 *
216 * @param threadGroup
217 * the thread group and priority
218 * @param identity
219 * the identity of the executor
220 * @param pollingPeriod
221 * the polling period, in milliseconds or <code>0</code> or less
222 * for event driven mode
223 * @param context
224 * required to construct the information container record, this
225 * is done lazily
226 */
227 private TaskExecutor(ThreadGroup threadGroup, String identity,
228 long pollingPeriod, IFulmineContext context)
229 {
230
231 this.tasks = CollectionFactory.newQueue();
232 this.stats = new Statistics(identity, context);
233 this.executor =
234 new Thread(threadGroup, new DequeueLogic(), identity + COLON
235 + getClass().getSimpleName());
236 this.executor.setDaemon(true);
237 this.executor.setPriority(threadGroup.getMaxPriority());
238 this.pollingPeriod = pollingPeriod < 0 ? 0 : pollingPeriod;
239 }
240
241 public void execute(Runnable command)
242 {
243 synchronized (getTasks())
244 {
245 getTasks().add(new EventQueueItem<Runnable>(command));
246 if (getPollingPeriod() == 0)
247 {
248 getTasks().notify();
249 }
250 }
251 }
252
253 public boolean isActive()
254 {
255 return this.executor.isAlive() && active;
256 }
257
258 public void start()
259 {
260 this.executor.start();
261 // schedule the stats logging
262 schedule(new TimerTask()
263 {
264 @Override
265 public void run()
266 {
267 EventQueueItem.isStatisticsEvent.set(Boolean.TRUE);
268 try
269 {
270 TaskExecutor.this.stats.run();
271 }
272 finally
273 {
274 EventQueueItem.isStatisticsEvent.remove();
275 }
276 }
277 }, Statistics.TIME_PERIOD, Statistics.TIME_PERIOD);
278 }
279
280 public void destroy()
281 {
282 this.active = false;
283 this.executor.interrupt();
284 }
285
286 private long getPollingPeriod()
287 {
288 return this.pollingPeriod;
289 }
290
291 private Queue<EventQueueItem<Runnable>> getTasks()
292 {
293 return this.tasks;
294 }
295
296 public StackTraceElement[] getStackTrace()
297 {
298 return this.executor.getStackTrace();
299 }
300
301 public void schedule(final TimerTask task, long delay, long period)
302 {
303 timer.schedule(new TimerTask()
304 {
305 @Override
306 public void run()
307 {
308 // execute on the task executor thread
309 execute(task);
310 }
311 }, delay, period);
312 }
313 }