1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  package fulmine.distribution.connection.tcp;
17  
18  import static fulmine.util.Utils.COLON;
19  import static fulmine.util.Utils.COMMA_SPACE;
20  import static fulmine.util.Utils.string;
21  
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.nio.channels.SelectionKey;
25  import java.nio.channels.Selector;
26  import java.nio.channels.SocketChannel;
27  import java.util.List;
28  
29  import fulmine.context.IFrameworkContext;
30  import fulmine.distribution.connection.IConnection;
31  import fulmine.distribution.connection.IConnectionBroker;
32  import fulmine.distribution.events.ConnectionDestroyedEvent;
33  import fulmine.distribution.events.MessageConsumedEvent;
34  import fulmine.event.EventSource;
35  import fulmine.event.IEvent;
36  import fulmine.event.IEventSource;
37  import fulmine.event.listener.AbstractEventHandler;
38  import fulmine.event.listener.IEventListener;
39  import fulmine.event.listener.MultiEventListener;
40  import fulmine.util.log.AsyncLog;
41  
42  
43  
44  
45  
46  
47  
48  public abstract class AbstractSocketChannelConnection extends
49      TcpConnectionParameters implements IConnection
50  {
51      
52  
53  
54  
55  
56  
57  
58  
59  
60      private final class MessageConsumedEventHandler extends
61          AbstractEventHandler<MessageConsumedEvent>
62      {
63          @Override
64          public void handle(MessageConsumedEvent event)
65          {
66              boolean destroy = false;
67              synchronized (AbstractSocketChannelConnection.this)
68              {
69                  AbstractSocketChannelConnection.this.messagesToProcess--;
70                  if (AbstractSocketChannelConnection.this.messagesToProcess == 0
71                      && AbstractSocketChannelConnection.this.destroyWhenMessagesProcessed)
72                  {
73                      destroy = true;
74                  }
75              }
76              if (destroy)
77              {
78                  
79                  AbstractSocketChannelConnection.this.doComponentDestroy();
80              }
81          }
82  
83          @Override
84          protected AsyncLog getLog()
85          {
86              return AbstractSocketChannelConnection.this.getLog();
87          }
88  
89          @Override
90          public String toString()
91          {
92              return getClass().getSimpleName();
93          }
94      }
95  
96      
97      private SocketChannel socketChannel;
98  
99      
100     protected IEventSource eventSourceDelegate;
101 
102     
103 
104 
105 
106     private boolean outbound = true;
107 
108     
109 
110 
111 
112 
113 
114 
115     private boolean connectionEstablished;
116 
117     
118     private final IEventListener messageConsumedEventHandler;
119 
120     
121 
122 
123 
124 
125 
126     protected volatile boolean destroyWhenMessagesProcessed;
127 
128     
129     private final IFrameworkContext context;
130 
131     
132 
133 
134 
135 
136 
137 
138 
139 
140     protected int messagesToProcess;
141 
142     
143 
144 
145 
146 
147 
148 
149 
150 
151 
152 
153 
154 
155 
156 
157 
158     @SuppressWarnings("unchecked")
159     public AbstractSocketChannelConnection(IFrameworkContext context,
160         String identity, String address, int port, int connectionHashCode)
161     {
162         super(identity, connectionHashCode, address, port);
163         this.context = context;
164         this.messageConsumedEventHandler =
165             new MultiEventListener(
166                 MessageConsumedEventHandler.class.getSimpleName(),
167                 context,
168                 AbstractEventHandler.getEventHandlerMappings(this.new MessageConsumedEventHandler()));
169     }
170 
171     
172     protected void doComponentDestroy()
173     {
174         
175         if (getLog().isInfoEnabled())
176         {
177             getLog().info("Destroying[2] " + this + " (stacktrace follows)",
178                 new Exception());
179         }
180         if (this.eventSourceDelegate != null)
181         {
182             this.eventSourceDelegate.destroy();
183         }
184     }
185 
186     protected IFrameworkContext getContext()
187     {
188         return this.context;
189     }
190 
191     @Override
192     protected void doStart()
193     {
194         try
195         {
196             if (isOutbound())
197             {
198                 final SocketChannel localSocketChannel = SocketChannel.open();
199                 localSocketChannel.configureBlocking(false);
200                 
201                 
202                 if (!localSocketChannel.connect(new InetSocketAddress(
203                     getRemoteHostAddress(), getRemoteHostTcpPort())))
204                 {
205                     
206                     
207                     final Selector selector = Selector.open();
208                     final SelectionKey key =
209                         localSocketChannel.register(selector,
210                             SelectionKey.OP_CONNECT);
211                     selector.select();
212                     localSocketChannel.finishConnect();
213                     key.cancel();
214                     selector.close();
215                 }
216                 if (getLog().isDebugEnabled())
217                 {
218                     getLog().debug("(->) Connected to " + this);
219                 }
220                 setSocketChannel(localSocketChannel);
221             }
222             else
223             {
224                 if (getLog().isDebugEnabled())
225                 {
226                     getLog().debug("(<-) Connected to " + this);
227                 }
228             }
229         }
230         catch (Exception e)
231         {
232             this.socketChannel = null;
233             final String warning =
234                 "Could not create socket to " + getAddress() + COLON
235                     + getRemoteHostTcpPort() + ". Calling destroy().";
236             destroy();
237             throw new RuntimeException(warning, e);
238         }
239     }
240 
241     public boolean isConnected()
242     {
243         return this.socketChannel != null
244             && this.socketChannel.socket() != null
245             && this.socketChannel.socket().isConnected();
246     }
247 
248     
249 
250 
251 
252 
253 
254 
255 
256     void setOutbound(boolean outbound)
257     {
258         this.outbound = outbound;
259     }
260 
261     
262 
263 
264 
265 
266 
267 
268     public boolean isOutbound()
269     {
270         return outbound;
271     }
272 
273     
274 
275 
276     @Override
277     protected void doDestroy()
278     {
279         if (getLog().isInfoEnabled())
280         {
281             getLog().info("Destroying[1] " + this + " (stacktrace follows)",
282                 new Exception());
283         }
284         if (this.socketChannel != null)
285         {
286             try
287             {
288                 if (this.socketChannel.socket() != null)
289                 {
290                     this.socketChannel.socket().getOutputStream().flush();
291                     this.socketChannel.socket().shutdownInput();
292                     this.socketChannel.socket().shutdownOutput();
293                     this.socketChannel.socket().close();
294                 }
295                 this.socketChannel.close();
296             }
297             catch (IOException e)
298             {
299                 if (getLog().isDebugEnabled())
300                 {
301                     getLog().debug("Could not close " + this, e);
302                 }
303             }
304         }
305         synchronized (this)
306         {
307             if (this.messagesToProcess == 0)
308             {
309                 doComponentDestroy();
310             }
311         }
312         if (getContext() != null && getContext().isActive())
313         {
314             getContext().queueEvent(
315                 new ConnectionDestroyedEvent(getContext(),
316                     this.getRemoteContextIdentity()));
317         }
318         if (getLog().isInfoEnabled())
319         {
320             getLog().info("Destroyed " + this);
321         }
322     }
323 
324     public void addEvent(IEvent event)
325     {
326         this.eventSourceDelegate.addEvent(event);
327     }
328 
329     public boolean addListener(IEventListener listener)
330     {
331         return this.eventSourceDelegate.addListener(listener);
332     }
333 
334     public byte getEventSourceGroupId()
335     {
336         return this.eventSourceDelegate.getEventSourceGroupId();
337     }
338 
339     public List<IEventListener> getListeners()
340     {
341         return this.eventSourceDelegate.getListeners();
342     }
343 
344     public boolean removeListener(IEventListener listener)
345     {
346         return this.eventSourceDelegate.removeListener(listener);
347     }
348 
349     public List<IEventListener> removeListeners()
350     {
351         return this.eventSourceDelegate.removeListeners();
352     }
353 
354     public String toDetailedString()
355     {
356         return toString();
357     }
358 
359     public String toIdentityString()
360     {
361         return super.toString();
362     }
363 
364     protected SocketChannel getSocketChannel()
365     {
366         return this.socketChannel;
367     }
368 
369     
370 
371 
372 
373 
374 
375 
376 
377 
378 
379 
380 
381 
382 
383 
384 
385 
386     protected boolean isConnectionHandshakeComplete()
387     {
388         return this.connectionEstablished;
389     }
390 
391     
392 
393 
394 
395 
396 
397     protected void connectionHandshakeComplete()
398     {
399         this.connectionEstablished = true;
400         this.eventSourceDelegate = new EventSource(toString());
401         this.eventSourceDelegate.start();
402         addListener(messageConsumedEventHandler);
403     }
404 
405     void setSocketChannel(SocketChannel socketChannel)
406     {
407         this.socketChannel = socketChannel;
408     }
409 
410     @Override
411     public String toString()
412     {
413         if (this.socketChannel != null)
414         {
415             return string(this, getIdentity() + COMMA_SPACE
416                 + this.socketChannel.socket() + ", remoteContextHashcode="
417                 + getRemoteContextHashCode());
418         }
419         return super.toString();
420     }
421 }