View Javadoc

1   /*
2      Copyright 2007 Ramon Servadei
3   
4      Licensed under the Apache License, Version 2.0 (the "License");
5      you may not use this file except in compliance with the License.
6      You may obtain a copy of the License at
7   
8          http://www.apache.org/licenses/LICENSE-2.0
9   
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15   */
16  package fulmine.util.io;
17  
18  import static fulmine.util.Utils.logException;
19  import static fulmine.util.Utils.safeToString;
20  
21  import java.io.IOException;
22  import java.nio.channels.Channel;
23  import java.nio.channels.ClosedChannelException;
24  import java.nio.channels.SelectableChannel;
25  import java.nio.channels.SelectionKey;
26  import java.nio.channels.Selector;
27  import java.util.Iterator;
28  import java.util.Map;
29  import java.util.Set;
30  import java.util.concurrent.ConcurrentHashMap;
31  
32  import fulmine.AbstractLifeCycle;
33  import fulmine.ILifeCycle;
34  import fulmine.util.log.AsyncLog;
35  
36  /**
37   * A collection of {@link ISelectionKeyTask} objects that handle the logic for
38   * the ready operations raised from registered NIO {@link Channel} objects. This
39   * class exists to provide a mechanism to manage the tasks that should be run
40   * for each channel that is registered against a single {@link Selector}. Each
41   * instance has an internal {@link Selector} that is used to register with the
42   * {@link Channel} supplied in the
43   * {@link #register(int, SelectableChannel, ISelectionKeyTask)} method.
44   * <p>
45   * This object invokes the {@link Selector#select()} method and blocks until a
46   * channel is available for one of the registered operations. When this happens
47   * the appropriate task is located and executed.
48   * 
49   * <pre>
50   *  // create and register
51   *  SelectorTasks tasks = new SelectorTasks();
52   *  // create an optional listener to get notified when the selector is opened
53   *  ISelectorTasksStateListener = ...
54   *  tasks.setStateListener(listener);
55   *  
56   *  // OPEN THE CHANNEL - no other operation is valid until this is run
57   *  tasks.openSelector();
58   *  
59   *  SelectableChannel channel = ...;
60   *  ISelectionKeyTask task = ...;
61   *  tasks.register(SelectionKey.OP_READ, channel, task);
62   *  
63   *  // process - this blocks until a ready operation occurs, 
64   *  // in the real-world, this would be called in a separate thread
65   *  while(tasks.process());
66   * </pre>
67   * 
68   * There should only be 1 thread running the process method. The
69   * {@link #getRunnable()} method returns an {@link Runnable} object for a
70   * dedicated processing thread. e.g.
71   * 
72   * <pre>
73   *  // create and register
74   *  SelectorTasks tasks = new SelectorTasks();
75   *  ...
76   *  // kick off a thread to run the process() method 
77   *  new Thread(tasks.getRunnable()).start();
78   * </pre>
79   * 
80   * <b>This is not thread aware.</b>
81   * 
82   * @author Ramon Servadei
83   * 
84   */
85  public final class SelectorTasks extends AbstractLifeCycle implements
86      ILifeCycle
87  {
88      private final static AsyncLog LOG = new AsyncLog(SelectorTasks.class);
89  
90      /**
91       * The {@link ISelectionKeyTask} objects registered via
92       * {@link #register(int, SelectableChannel, ISelectionKeyTask)}
93       */
94      private final Map<SelectionKey, ISelectionKeyTask> tasks;
95  
96      /** The selector */
97      private Selector bo;
98  
99      /** The state listener */
100     private ISelectorTasksStateListener listener;
101 
102     /**
103      * Standard constructor
104      */
105     public SelectorTasks()
106     {
107         super();
108         this.tasks = new ConcurrentHashMap<SelectionKey, ISelectionKeyTask>();
109         start();
110     }
111 
112     /**
113      * Register the internal {@link Selector} with the channel and map the
114      * supplied task to the {@link SelectionKey} that is returned from the
115      * {@link SelectableChannel#register(Selector, int)} method.
116      * 
117      * @param op
118      *            the operation to register for
119      * @param channel
120      *            the channel that will register with the internal selector
121      * @param task
122      *            the task to run when the operation occurs in the channel. This
123      *            task should be efficient; if it does any blocking this affects
124      *            processing of other registered {@link ISelectionKeyTask}
125      *            objects.
126      * @see SelectionKey
127      */
128     @SuppressWarnings("boxing")
129     public void register(int op, SelectableChannel channel,
130         ISelectionKeyTask task)
131     {
132         checkSelector();
133         try
134         {
135             synchronized (this)
136             {
137                 // register blocks until the selector is available
138                 // so wake it up
139                 this.bo.wakeup();
140                 final SelectionKey selectionKey = channel.register(this.bo, op);
141                 if (getLog().isDebugEnabled())
142                 {
143                     getLog().debug(
144                         "Registered " + safeToString(task) + " against "
145                             + safeToString(channel));
146                 }
147                 task.setSelectionKey(selectionKey);
148                 this.tasks.put(selectionKey, task);
149             }
150         }
151         catch (ClosedChannelException e)
152         {
153             logException(getLog(), "Could not register operation "
154                 + safeToString(op) + " for " + safeToString(channel) + " with "
155                 + safeToString(task), e);
156         }
157     }
158 
159     /**
160      * Unregister the selection key and remove the associated
161      * {@link ISelectionKeyTask}. The key is also cancelled via the
162      * {@link SelectionKey#cancel()} method.
163      * 
164      * @param key
165      *            the selection key to unregister and cancel
166      * @return the previously registered task
167      */
168     public ISelectionKeyTask unregister(SelectionKey key)
169     {
170         key.cancel();
171         final ISelectionKeyTask task = this.tasks.remove(key);
172         if (getLog().isDebugEnabled())
173         {
174             getLog().debug("Unregistered " + safeToString(task));
175         }
176         return task;
177     }
178 
179     /**
180      * Call {@link Selector#select()} and wait for a one of the operations in
181      * any of the registered channels to occur. When one does, the appropriate
182      * {@link ISelectionKeyTask} is run.
183      * 
184      * @return <code>false</code> if this becomes inactive
185      */
186     public boolean process()
187     {
188         checkSelector();
189         try
190         {
191             this.bo.select();
192             if (this.bo.isOpen())
193             {
194                 Set<SelectionKey> selectedKeys = null;
195                 synchronized (this)
196                 {
197                     selectedKeys = this.bo.selectedKeys();
198                 }
199                 for (Iterator<SelectionKey> iterator = selectedKeys.iterator(); iterator.hasNext();)
200                 {
201                     SelectionKey selectionKey = iterator.next();
202                     iterator.remove();
203                     try
204                     {
205                         if (selectionKey.isValid())
206                         {
207                             final ISelectionKeyTask selectionKeyTask =
208                                 this.tasks.get(selectionKey);
209                             if (selectionKeyTask != null)
210                             {
211                                 selectionKeyTask.run();
212                             }
213                             else
214                             {
215                                 if (getLog().isInfoEnabled())
216                                 {
217                                     getLog().info(
218                                         "No registered task for "
219                                             + safeToString(selectionKey));
220                                 }
221                             }
222                         }
223                         else
224                         {
225                             final ISelectionKeyTask selectionKeyTask =
226                                 this.tasks.remove(selectionKey);
227                             if (selectionKeyTask != null)
228                             {
229                                 selectionKeyTask.destroy();
230                             }
231                         }
232                     }
233                     catch (Exception e)
234                     {
235                         logException(getLog(), "Could not process "
236                             + safeToString(selectionKey), e);
237                     }
238                 }
239 
240             }
241             else
242             {
243                 openSelector();
244             }
245         }
246         catch (Exception e)
247         {
248             logException(getLog(), "Error processing selector "
249                 + safeToString(this.bo), e);
250             if (!this.bo.isOpen())
251             {
252                 openSelector();
253             }
254             else
255             {
256                 destroyTasks();
257             }
258         }
259         return isActive();
260     }
261 
262     /**
263      * Open the internal {@link Selector}
264      */
265     public void openSelector()
266     {
267         if (isActive())
268         {
269             if (getLog().isInfoEnabled())
270             {
271                 if (this.bo != null)
272                 {
273                     getLog().info(
274                         "Re-opening selector and destroying "
275                             + safeToString(this.tasks.values()));
276                     try
277                     {
278                         this.bo.close();
279                     }
280                     catch (IOException e)
281                     {
282                         logException(getLog(), "Error closing selector "
283                             + safeToString(this.bo), e);
284                     }
285                 }
286                 else
287                 {
288                     getLog().info("Opening selector");
289                 }
290             }
291             destroyTasks();
292             try
293             {
294                 this.bo = Selector.open();
295             }
296             catch (IOException e)
297             {
298                 throw new IllegalStateException("Could not open selector", e);
299             }
300             if (this.listener != null)
301             {
302                 this.listener.selectorOpened(this);
303             }
304         }
305     }
306 
307     @Override
308     protected AsyncLog getLog()
309     {
310         return LOG;
311     }
312 
313     @Override
314     protected final void doStart()
315     {
316 
317     }
318 
319     @Override
320     protected final void doDestroy()
321     {
322         try
323         {
324             destroyTasks();
325             if (this.bo != null)
326             {
327                 this.bo.close();
328             }
329         }
330         catch (Exception e)
331         {
332             logException(getLog(), "Error destroying selector " + this.bo, e);
333         }
334     }
335 
336     /**
337      * Set the state listener
338      * 
339      * @param listener
340      */
341     public void setStateListener(ISelectorTasksStateListener listener)
342     {
343         this.listener = listener;
344     }
345 
346     /**
347      * Get a {@link Runnable} to continually execute the {@link #process()}
348      * method.
349      * 
350      * @return a runnable to execute this
351      */
352     public Runnable getRunnable()
353     {
354         checkSelector();
355         return this.new SelectorTaskProcessor();
356     }
357 
358     private void checkSelector()
359     {
360         if (this.bo == null)
361         {
362             throw new IllegalStateException("Selector not open");
363         }
364     }
365 
366     /**
367      * Destroy all registered {@link ISelectionKeyTask} objects and cancel their
368      * associated {@link SelectionKey}
369      */
370     private void destroyTasks()
371     {
372         if (this.bo != null)
373         {
374             Set<SelectionKey> keys = null;
375             keys = this.bo.keys();
376             for (SelectionKey selectionKey : keys)
377             {
378                 if (!selectionKey.isValid())
379                 {
380                     selectionKey.cancel();
381                     final ISelectionKeyTask task =
382                         this.tasks.remove(selectionKey);
383                     if (task != null)
384                     {
385                         task.destroy();
386                     }
387                 }
388             }
389         }
390     }
391 
392     /**
393      * A {@link Runnable} implementation to execute the
394      * {@link SelectorTasks#process()} method.
395      * 
396      * @author Ramon Servadei
397      * 
398      */
399     private final class SelectorTaskProcessor implements Runnable
400     {
401         public void run()
402         {
403             while (SelectorTasks.this.process())
404             {
405 
406             }
407             if (getLog().isInfoEnabled())
408             {
409                 getLog().info("Finished");
410             }
411         }
412     }
413 }