1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package fulmine.distribution.connection.tcp;
17
18 import static fulmine.util.Utils.COLON;
19 import static fulmine.util.Utils.COMMA_SPACE;
20 import static fulmine.util.Utils.string;
21
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.nio.channels.SelectionKey;
25 import java.nio.channels.Selector;
26 import java.nio.channels.SocketChannel;
27 import java.util.List;
28
29 import fulmine.context.IFrameworkContext;
30 import fulmine.distribution.connection.IConnection;
31 import fulmine.distribution.connection.IConnectionBroker;
32 import fulmine.distribution.events.ConnectionDestroyedEvent;
33 import fulmine.distribution.events.MessageConsumedEvent;
34 import fulmine.event.EventSource;
35 import fulmine.event.IEvent;
36 import fulmine.event.IEventSource;
37 import fulmine.event.listener.AbstractEventHandler;
38 import fulmine.event.listener.IEventListener;
39 import fulmine.event.listener.MultiEventListener;
40 import fulmine.util.log.AsyncLog;
41
42
43
44
45
46
47
48 public abstract class AbstractSocketChannelConnection extends
49 TcpConnectionParameters implements IConnection
50 {
51
52
53
54
55
56
57
58
59
60 private final class MessageConsumedEventHandler extends
61 AbstractEventHandler<MessageConsumedEvent>
62 {
63 @Override
64 public void handle(MessageConsumedEvent event)
65 {
66 boolean destroy = false;
67 synchronized (AbstractSocketChannelConnection.this)
68 {
69 AbstractSocketChannelConnection.this.messagesToProcess--;
70 if (AbstractSocketChannelConnection.this.messagesToProcess == 0
71 && AbstractSocketChannelConnection.this.destroyWhenMessagesProcessed)
72 {
73 destroy = true;
74 }
75 }
76 if (destroy)
77 {
78
79 AbstractSocketChannelConnection.this.doComponentDestroy();
80 }
81 }
82
83 @Override
84 protected AsyncLog getLog()
85 {
86 return AbstractSocketChannelConnection.this.getLog();
87 }
88
89 @Override
90 public String toString()
91 {
92 return getClass().getSimpleName();
93 }
94 }
95
96
97 private SocketChannel socketChannel;
98
99
100 protected IEventSource eventSourceDelegate;
101
102
103
104
105
106 private boolean outbound = true;
107
108
109
110
111
112
113
114
115 private boolean connectionEstablished;
116
117
118 private final IEventListener messageConsumedEventHandler;
119
120
121
122
123
124
125
126 protected volatile boolean destroyWhenMessagesProcessed;
127
128
129 private final IFrameworkContext context;
130
131
132
133
134
135
136
137
138
139
140 protected int messagesToProcess;
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158 @SuppressWarnings("unchecked")
159 public AbstractSocketChannelConnection(IFrameworkContext context,
160 String identity, String address, int port, int connectionHashCode)
161 {
162 super(identity, connectionHashCode, address, port);
163 this.context = context;
164 this.messageConsumedEventHandler =
165 new MultiEventListener(
166 MessageConsumedEventHandler.class.getSimpleName(),
167 context,
168 AbstractEventHandler.getEventHandlerMappings(this.new MessageConsumedEventHandler()));
169 }
170
171
172 protected void doComponentDestroy()
173 {
174
175 if (getLog().isInfoEnabled())
176 {
177 getLog().info("Destroying[2] " + this + " (stacktrace follows)",
178 new Exception());
179 }
180 if (this.eventSourceDelegate != null)
181 {
182 this.eventSourceDelegate.destroy();
183 }
184 }
185
186 protected IFrameworkContext getContext()
187 {
188 return this.context;
189 }
190
191 @Override
192 protected void doStart()
193 {
194 try
195 {
196 if (isOutbound())
197 {
198 final SocketChannel localSocketChannel = SocketChannel.open();
199 localSocketChannel.configureBlocking(false);
200
201
202 if (!localSocketChannel.connect(new InetSocketAddress(
203 getRemoteHostAddress(), getRemoteHostTcpPort())))
204 {
205
206
207 final Selector selector = Selector.open();
208 final SelectionKey key =
209 localSocketChannel.register(selector,
210 SelectionKey.OP_CONNECT);
211 selector.select();
212 localSocketChannel.finishConnect();
213 key.cancel();
214 selector.close();
215 }
216 if (getLog().isDebugEnabled())
217 {
218 getLog().debug("(->) Connected to " + this);
219 }
220 setSocketChannel(localSocketChannel);
221 }
222 else
223 {
224 if (getLog().isDebugEnabled())
225 {
226 getLog().debug("(<-) Connected to " + this);
227 }
228 }
229 }
230 catch (Exception e)
231 {
232 this.socketChannel = null;
233 final String warning =
234 "Could not create socket to " + getAddress() + COLON
235 + getRemoteHostTcpPort() + ". Calling destroy().";
236 destroy();
237 throw new RuntimeException(warning, e);
238 }
239 }
240
241 public boolean isConnected()
242 {
243 return this.socketChannel != null
244 && this.socketChannel.socket() != null
245 && this.socketChannel.socket().isConnected();
246 }
247
248
249
250
251
252
253
254
255
256 void setOutbound(boolean outbound)
257 {
258 this.outbound = outbound;
259 }
260
261
262
263
264
265
266
267
268 public boolean isOutbound()
269 {
270 return outbound;
271 }
272
273
274
275
276 @Override
277 protected void doDestroy()
278 {
279 if (getLog().isInfoEnabled())
280 {
281 getLog().info("Destroying[1] " + this + " (stacktrace follows)",
282 new Exception());
283 }
284 if (this.socketChannel != null)
285 {
286 try
287 {
288 if (this.socketChannel.socket() != null)
289 {
290 this.socketChannel.socket().getOutputStream().flush();
291 this.socketChannel.socket().shutdownInput();
292 this.socketChannel.socket().shutdownOutput();
293 this.socketChannel.socket().close();
294 }
295 this.socketChannel.close();
296 }
297 catch (IOException e)
298 {
299 if (getLog().isDebugEnabled())
300 {
301 getLog().debug("Could not close " + this, e);
302 }
303 }
304 }
305 synchronized (this)
306 {
307 if (this.messagesToProcess == 0)
308 {
309 doComponentDestroy();
310 }
311 }
312 if (getContext() != null && getContext().isActive())
313 {
314 getContext().queueEvent(
315 new ConnectionDestroyedEvent(getContext(),
316 this.getRemoteContextIdentity()));
317 }
318 if (getLog().isInfoEnabled())
319 {
320 getLog().info("Destroyed " + this);
321 }
322 }
323
324 public void addEvent(IEvent event)
325 {
326 this.eventSourceDelegate.addEvent(event);
327 }
328
329 public boolean addListener(IEventListener listener)
330 {
331 return this.eventSourceDelegate.addListener(listener);
332 }
333
334 public byte getEventSourceGroupId()
335 {
336 return this.eventSourceDelegate.getEventSourceGroupId();
337 }
338
339 public List<IEventListener> getListeners()
340 {
341 return this.eventSourceDelegate.getListeners();
342 }
343
344 public boolean removeListener(IEventListener listener)
345 {
346 return this.eventSourceDelegate.removeListener(listener);
347 }
348
349 public List<IEventListener> removeListeners()
350 {
351 return this.eventSourceDelegate.removeListeners();
352 }
353
354 public String toDetailedString()
355 {
356 return toString();
357 }
358
359 public String toIdentityString()
360 {
361 return super.toString();
362 }
363
364 protected SocketChannel getSocketChannel()
365 {
366 return this.socketChannel;
367 }
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386 protected boolean isConnectionHandshakeComplete()
387 {
388 return this.connectionEstablished;
389 }
390
391
392
393
394
395
396
397 protected void connectionHandshakeComplete()
398 {
399 this.connectionEstablished = true;
400 this.eventSourceDelegate = new EventSource(toString());
401 this.eventSourceDelegate.start();
402 addListener(messageConsumedEventHandler);
403 }
404
405 void setSocketChannel(SocketChannel socketChannel)
406 {
407 this.socketChannel = socketChannel;
408 }
409
410 @Override
411 public String toString()
412 {
413 if (this.socketChannel != null)
414 {
415 return string(this, getIdentity() + COMMA_SPACE
416 + this.socketChannel.socket() + ", remoteContextHashcode="
417 + getRemoteContextHashCode());
418 }
419 return super.toString();
420 }
421 }