1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package fulmine.util.io;
17
18 import static fulmine.util.Utils.logException;
19 import static fulmine.util.Utils.safeToString;
20
21 import java.io.IOException;
22 import java.nio.channels.Channel;
23 import java.nio.channels.ClosedChannelException;
24 import java.nio.channels.SelectableChannel;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.Selector;
27 import java.util.Iterator;
28 import java.util.Map;
29 import java.util.Set;
30 import java.util.concurrent.ConcurrentHashMap;
31
32 import fulmine.AbstractLifeCycle;
33 import fulmine.ILifeCycle;
34 import fulmine.util.log.AsyncLog;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85 public final class SelectorTasks extends AbstractLifeCycle implements
86 ILifeCycle
87 {
88 private final static AsyncLog LOG = new AsyncLog(SelectorTasks.class);
89
90
91
92
93
94 private final Map<SelectionKey, ISelectionKeyTask> tasks;
95
96
97 private Selector bo;
98
99
100 private ISelectorTasksStateListener listener;
101
102
103
104
105 public SelectorTasks()
106 {
107 super();
108 this.tasks = new ConcurrentHashMap<SelectionKey, ISelectionKeyTask>();
109 start();
110 }
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128 @SuppressWarnings("boxing")
129 public void register(int op, SelectableChannel channel,
130 ISelectionKeyTask task)
131 {
132 checkSelector();
133 try
134 {
135 synchronized (this)
136 {
137
138
139 this.bo.wakeup();
140 final SelectionKey selectionKey = channel.register(this.bo, op);
141 if (getLog().isDebugEnabled())
142 {
143 getLog().debug(
144 "Registered " + safeToString(task) + " against "
145 + safeToString(channel));
146 }
147 task.setSelectionKey(selectionKey);
148 this.tasks.put(selectionKey, task);
149 }
150 }
151 catch (ClosedChannelException e)
152 {
153 logException(getLog(), "Could not register operation "
154 + safeToString(op) + " for " + safeToString(channel) + " with "
155 + safeToString(task), e);
156 }
157 }
158
159
160
161
162
163
164
165
166
167
168 public ISelectionKeyTask unregister(SelectionKey key)
169 {
170 key.cancel();
171 final ISelectionKeyTask task = this.tasks.remove(key);
172 if (getLog().isDebugEnabled())
173 {
174 getLog().debug("Unregistered " + safeToString(task));
175 }
176 return task;
177 }
178
179
180
181
182
183
184
185
186 public boolean process()
187 {
188 checkSelector();
189 try
190 {
191 this.bo.select();
192 if (this.bo.isOpen())
193 {
194 Set<SelectionKey> selectedKeys = null;
195 synchronized (this)
196 {
197 selectedKeys = this.bo.selectedKeys();
198 }
199 for (Iterator<SelectionKey> iterator = selectedKeys.iterator(); iterator.hasNext();)
200 {
201 SelectionKey selectionKey = iterator.next();
202 iterator.remove();
203 try
204 {
205 if (selectionKey.isValid())
206 {
207 final ISelectionKeyTask selectionKeyTask =
208 this.tasks.get(selectionKey);
209 if (selectionKeyTask != null)
210 {
211 selectionKeyTask.run();
212 }
213 else
214 {
215 if (getLog().isInfoEnabled())
216 {
217 getLog().info(
218 "No registered task for "
219 + safeToString(selectionKey));
220 }
221 }
222 }
223 else
224 {
225 final ISelectionKeyTask selectionKeyTask =
226 this.tasks.remove(selectionKey);
227 if (selectionKeyTask != null)
228 {
229 selectionKeyTask.destroy();
230 }
231 }
232 }
233 catch (Exception e)
234 {
235 logException(getLog(), "Could not process "
236 + safeToString(selectionKey), e);
237 }
238 }
239
240 }
241 else
242 {
243 openSelector();
244 }
245 }
246 catch (Exception e)
247 {
248 logException(getLog(), "Error processing selector "
249 + safeToString(this.bo), e);
250 if (!this.bo.isOpen())
251 {
252 openSelector();
253 }
254 else
255 {
256 destroyTasks();
257 }
258 }
259 return isActive();
260 }
261
262
263
264
265 public void openSelector()
266 {
267 if (isActive())
268 {
269 if (getLog().isInfoEnabled())
270 {
271 if (this.bo != null)
272 {
273 getLog().info(
274 "Re-opening selector and destroying "
275 + safeToString(this.tasks.values()));
276 try
277 {
278 this.bo.close();
279 }
280 catch (IOException e)
281 {
282 logException(getLog(), "Error closing selector "
283 + safeToString(this.bo), e);
284 }
285 }
286 else
287 {
288 getLog().info("Opening selector");
289 }
290 }
291 destroyTasks();
292 try
293 {
294 this.bo = Selector.open();
295 }
296 catch (IOException e)
297 {
298 throw new IllegalStateException("Could not open selector", e);
299 }
300 if (this.listener != null)
301 {
302 this.listener.selectorOpened(this);
303 }
304 }
305 }
306
307 @Override
308 protected AsyncLog getLog()
309 {
310 return LOG;
311 }
312
313 @Override
314 protected final void doStart()
315 {
316
317 }
318
319 @Override
320 protected final void doDestroy()
321 {
322 try
323 {
324 destroyTasks();
325 if (this.bo != null)
326 {
327 this.bo.close();
328 }
329 }
330 catch (Exception e)
331 {
332 logException(getLog(), "Error destroying selector " + this.bo, e);
333 }
334 }
335
336
337
338
339
340
341 public void setStateListener(ISelectorTasksStateListener listener)
342 {
343 this.listener = listener;
344 }
345
346
347
348
349
350
351
352 public Runnable getRunnable()
353 {
354 checkSelector();
355 return this.new SelectorTaskProcessor();
356 }
357
358 private void checkSelector()
359 {
360 if (this.bo == null)
361 {
362 throw new IllegalStateException("Selector not open");
363 }
364 }
365
366
367
368
369
370 private void destroyTasks()
371 {
372 if (this.bo != null)
373 {
374 Set<SelectionKey> keys = null;
375 keys = this.bo.keys();
376 for (SelectionKey selectionKey : keys)
377 {
378 if (!selectionKey.isValid())
379 {
380 selectionKey.cancel();
381 final ISelectionKeyTask task =
382 this.tasks.remove(selectionKey);
383 if (task != null)
384 {
385 task.destroy();
386 }
387 }
388 }
389 }
390 }
391
392
393
394
395
396
397
398
399 private final class SelectorTaskProcessor implements Runnable
400 {
401 public void run()
402 {
403 while (SelectorTasks.this.process())
404 {
405
406 }
407 if (getLog().isInfoEnabled())
408 {
409 getLog().info("Finished");
410 }
411 }
412 }
413 }