1 /*
2 Copyright 2007 Ramon Servadei
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16 package fulmine.util.io;
17
18 import static fulmine.util.Utils.logException;
19 import static fulmine.util.Utils.safeToString;
20
21 import java.io.IOException;
22 import java.nio.channels.Channel;
23 import java.nio.channels.ClosedChannelException;
24 import java.nio.channels.SelectableChannel;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.Selector;
27 import java.util.Iterator;
28 import java.util.Map;
29 import java.util.Set;
30 import java.util.concurrent.ConcurrentHashMap;
31
32 import fulmine.AbstractLifeCycle;
33 import fulmine.ILifeCycle;
34 import fulmine.util.log.AsyncLog;
35
36 /**
37 * A collection of {@link ISelectionKeyTask} objects that handle the logic for
38 * the ready operations raised from registered NIO {@link Channel} objects. This
39 * class exists to provide a mechanism to manage the tasks that should be run
40 * for each channel that is registered against a single {@link Selector}. Each
41 * instance has an internal {@link Selector} that is used to register with the
42 * {@link Channel} supplied in the
43 * {@link #register(int, SelectableChannel, ISelectionKeyTask)} method.
44 * <p>
45 * This object invokes the {@link Selector#select()} method and blocks until a
46 * channel is available for one of the registered operations. When this happens
47 * the appropriate task is located and executed.
48 *
49 * <pre>
50 * // create and register
51 * SelectorTasks tasks = new SelectorTasks();
52 * // create an optional listener to get notified when the selector is opened
53 * ISelectorTasksStateListener = ...
54 * tasks.setStateListener(listener);
55 *
56 * // OPEN THE CHANNEL - no other operation is valid until this is run
57 * tasks.openSelector();
58 *
59 * SelectableChannel channel = ...;
60 * ISelectionKeyTask task = ...;
61 * tasks.register(SelectionKey.OP_READ, channel, task);
62 *
63 * // process - this blocks until a ready operation occurs,
64 * // in the real-world, this would be called in a separate thread
65 * while(tasks.process());
66 * </pre>
67 *
68 * There should only be 1 thread running the process method. The
69 * {@link #getRunnable()} method returns an {@link Runnable} object for a
70 * dedicated processing thread. e.g.
71 *
72 * <pre>
73 * // create and register
74 * SelectorTasks tasks = new SelectorTasks();
75 * ...
76 * // kick off a thread to run the process() method
77 * new Thread(tasks.getRunnable()).start();
78 * </pre>
79 *
80 * <b>This is not thread aware.</b>
81 *
82 * @author Ramon Servadei
83 *
84 */
85 public final class SelectorTasks extends AbstractLifeCycle implements
86 ILifeCycle
87 {
88 private final static AsyncLog LOG = new AsyncLog(SelectorTasks.class);
89
90 /**
91 * The {@link ISelectionKeyTask} objects registered via
92 * {@link #register(int, SelectableChannel, ISelectionKeyTask)}
93 */
94 private final Map<SelectionKey, ISelectionKeyTask> tasks;
95
96 /** The selector */
97 private Selector bo;
98
99 /** The state listener */
100 private ISelectorTasksStateListener listener;
101
102 /**
103 * Standard constructor
104 */
105 public SelectorTasks()
106 {
107 super();
108 this.tasks = new ConcurrentHashMap<SelectionKey, ISelectionKeyTask>();
109 start();
110 }
111
112 /**
113 * Register the internal {@link Selector} with the channel and map the
114 * supplied task to the {@link SelectionKey} that is returned from the
115 * {@link SelectableChannel#register(Selector, int)} method.
116 *
117 * @param op
118 * the operation to register for
119 * @param channel
120 * the channel that will register with the internal selector
121 * @param task
122 * the task to run when the operation occurs in the channel. This
123 * task should be efficient; if it does any blocking this affects
124 * processing of other registered {@link ISelectionKeyTask}
125 * objects.
126 * @see SelectionKey
127 */
128 @SuppressWarnings("boxing")
129 public void register(int op, SelectableChannel channel,
130 ISelectionKeyTask task)
131 {
132 checkSelector();
133 try
134 {
135 synchronized (this)
136 {
137 // register blocks until the selector is available
138 // so wake it up
139 this.bo.wakeup();
140 final SelectionKey selectionKey = channel.register(this.bo, op);
141 if (getLog().isDebugEnabled())
142 {
143 getLog().debug(
144 "Registered " + safeToString(task) + " against "
145 + safeToString(channel));
146 }
147 task.setSelectionKey(selectionKey);
148 this.tasks.put(selectionKey, task);
149 }
150 }
151 catch (ClosedChannelException e)
152 {
153 logException(getLog(), "Could not register operation "
154 + safeToString(op) + " for " + safeToString(channel) + " with "
155 + safeToString(task), e);
156 }
157 }
158
159 /**
160 * Unregister the selection key and remove the associated
161 * {@link ISelectionKeyTask}. The key is also cancelled via the
162 * {@link SelectionKey#cancel()} method.
163 *
164 * @param key
165 * the selection key to unregister and cancel
166 * @return the previously registered task
167 */
168 public ISelectionKeyTask unregister(SelectionKey key)
169 {
170 key.cancel();
171 final ISelectionKeyTask task = this.tasks.remove(key);
172 if (getLog().isDebugEnabled())
173 {
174 getLog().debug("Unregistered " + safeToString(task));
175 }
176 return task;
177 }
178
179 /**
180 * Call {@link Selector#select()} and wait for a one of the operations in
181 * any of the registered channels to occur. When one does, the appropriate
182 * {@link ISelectionKeyTask} is run.
183 *
184 * @return <code>false</code> if this becomes inactive
185 */
186 public boolean process()
187 {
188 checkSelector();
189 try
190 {
191 this.bo.select();
192 if (this.bo.isOpen())
193 {
194 Set<SelectionKey> selectedKeys = null;
195 synchronized (this)
196 {
197 selectedKeys = this.bo.selectedKeys();
198 }
199 for (Iterator<SelectionKey> iterator = selectedKeys.iterator(); iterator.hasNext();)
200 {
201 SelectionKey selectionKey = iterator.next();
202 iterator.remove();
203 try
204 {
205 if (selectionKey.isValid())
206 {
207 final ISelectionKeyTask selectionKeyTask =
208 this.tasks.get(selectionKey);
209 if (selectionKeyTask != null)
210 {
211 selectionKeyTask.run();
212 }
213 else
214 {
215 if (getLog().isInfoEnabled())
216 {
217 getLog().info(
218 "No registered task for "
219 + safeToString(selectionKey));
220 }
221 }
222 }
223 else
224 {
225 final ISelectionKeyTask selectionKeyTask =
226 this.tasks.remove(selectionKey);
227 if (selectionKeyTask != null)
228 {
229 selectionKeyTask.destroy();
230 }
231 }
232 }
233 catch (Exception e)
234 {
235 logException(getLog(), "Could not process "
236 + safeToString(selectionKey), e);
237 }
238 }
239
240 }
241 else
242 {
243 openSelector();
244 }
245 }
246 catch (Exception e)
247 {
248 logException(getLog(), "Error processing selector "
249 + safeToString(this.bo), e);
250 if (!this.bo.isOpen())
251 {
252 openSelector();
253 }
254 else
255 {
256 destroyTasks();
257 }
258 }
259 return isActive();
260 }
261
262 /**
263 * Open the internal {@link Selector}
264 */
265 public void openSelector()
266 {
267 if (isActive())
268 {
269 if (getLog().isInfoEnabled())
270 {
271 if (this.bo != null)
272 {
273 getLog().info(
274 "Re-opening selector and destroying "
275 + safeToString(this.tasks.values()));
276 try
277 {
278 this.bo.close();
279 }
280 catch (IOException e)
281 {
282 logException(getLog(), "Error closing selector "
283 + safeToString(this.bo), e);
284 }
285 }
286 else
287 {
288 getLog().info("Opening selector");
289 }
290 }
291 destroyTasks();
292 try
293 {
294 this.bo = Selector.open();
295 }
296 catch (IOException e)
297 {
298 throw new IllegalStateException("Could not open selector", e);
299 }
300 if (this.listener != null)
301 {
302 this.listener.selectorOpened(this);
303 }
304 }
305 }
306
307 @Override
308 protected AsyncLog getLog()
309 {
310 return LOG;
311 }
312
313 @Override
314 protected final void doStart()
315 {
316
317 }
318
319 @Override
320 protected final void doDestroy()
321 {
322 try
323 {
324 destroyTasks();
325 if (this.bo != null)
326 {
327 this.bo.close();
328 }
329 }
330 catch (Exception e)
331 {
332 logException(getLog(), "Error destroying selector " + this.bo, e);
333 }
334 }
335
336 /**
337 * Set the state listener
338 *
339 * @param listener
340 */
341 public void setStateListener(ISelectorTasksStateListener listener)
342 {
343 this.listener = listener;
344 }
345
346 /**
347 * Get a {@link Runnable} to continually execute the {@link #process()}
348 * method.
349 *
350 * @return a runnable to execute this
351 */
352 public Runnable getRunnable()
353 {
354 checkSelector();
355 return this.new SelectorTaskProcessor();
356 }
357
358 private void checkSelector()
359 {
360 if (this.bo == null)
361 {
362 throw new IllegalStateException("Selector not open");
363 }
364 }
365
366 /**
367 * Destroy all registered {@link ISelectionKeyTask} objects and cancel their
368 * associated {@link SelectionKey}
369 */
370 private void destroyTasks()
371 {
372 if (this.bo != null)
373 {
374 Set<SelectionKey> keys = null;
375 keys = this.bo.keys();
376 for (SelectionKey selectionKey : keys)
377 {
378 if (!selectionKey.isValid())
379 {
380 selectionKey.cancel();
381 final ISelectionKeyTask task =
382 this.tasks.remove(selectionKey);
383 if (task != null)
384 {
385 task.destroy();
386 }
387 }
388 }
389 }
390 }
391
392 /**
393 * A {@link Runnable} implementation to execute the
394 * {@link SelectorTasks#process()} method.
395 *
396 * @author Ramon Servadei
397 *
398 */
399 private final class SelectorTaskProcessor implements Runnable
400 {
401 public void run()
402 {
403 while (SelectorTasks.this.process())
404 {
405
406 }
407 if (getLog().isInfoEnabled())
408 {
409 getLog().info("Finished");
410 }
411 }
412 }
413 }