1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package fulmine.distribution.connection.tcp;
17
18 import static fulmine.distribution.connection.tcp.ProtocolMessageConstants.CONNECTION_MSG_TYPE;
19 import static fulmine.distribution.connection.tcp.ProtocolMessageConstants.DELIMITER;
20 import static fulmine.util.Utils.COLON;
21 import static fulmine.util.Utils.EMPTY_STRING;
22 import static fulmine.util.Utils.logException;
23
24 import java.io.IOException;
25 import java.net.InetAddress;
26 import java.net.InetSocketAddress;
27 import java.net.UnknownHostException;
28 import java.nio.channels.SelectionKey;
29 import java.nio.channels.ServerSocketChannel;
30 import java.nio.channels.SocketChannel;
31
32 import fulmine.AbstractLifeCycle;
33 import fulmine.context.IFrameworkContext;
34 import fulmine.distribution.connection.IConnectionBroker;
35 import fulmine.distribution.connection.IConnectionParameters;
36 import fulmine.event.EventFrameExecution;
37 import fulmine.model.container.IContainer;
38 import fulmine.model.field.IntegerField;
39 import fulmine.model.field.StringField;
40 import fulmine.protocol.specification.ByteWriter;
41 import fulmine.util.io.AbstractSelectionKeyTask;
42 import fulmine.util.io.ISelectorTasksStateListener;
43 import fulmine.util.io.SelectorTasks;
44 import fulmine.util.log.AsyncLog;
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 public final class TcpConnectionBroker extends AbstractLifeCycle implements
61 IConnectionBroker
62 {
63 final static AsyncLog LOG = new AsyncLog(TcpConnectionBroker.class);
64
65
66
67
68
69
70
71 public static final String CONTEXT_IP_ADDRESS = "context.ip.address";
72
73
74 public static final String CONTEXT_TCP_PORT = "context.tcp.port";
75
76
77 public static final int DEFAULT_CONTEXT_TCP_PORT = 16207;
78
79
80
81
82
83 public static final String CONTEXT_RCV_BUFFER = "context.tcp.buffer.rcv";
84
85
86 public final static String HOST_NAME;
87 static
88 {
89 String name = null;
90 try
91 {
92 name = InetAddress.getLocalHost().getHostName();
93 }
94 catch (UnknownHostException e)
95 {
96 LOG.fatal("Could not get host name", e);
97 System.exit(1);
98 }
99 HOST_NAME = name;
100 }
101
102
103 private final String address;
104
105
106 private final int port;
107
108
109 ServerSocketChannel serverSocketChannel;
110
111
112 private final SelectorTasks selectorTasks;
113
114
115 private final ConnectionAcceptTask connectionAcceptTask;
116
117
118 private Thread connectionThread;
119
120
121 final IFrameworkContext context;
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136 public TcpConnectionBroker(IFrameworkContext context, String address,
137 int port)
138 {
139 super();
140 this.context = context;
141 this.address =
142 address == null ? TcpConnectionBroker.HOST_NAME : address;
143 this.port =
144 port <= 0 ? TcpConnectionBroker.DEFAULT_CONTEXT_TCP_PORT : port;
145 this.selectorTasks = new SelectorTasks();
146 this.connectionAcceptTask = new ConnectionAcceptTask();
147 this.selectorTasks.setStateListener(this.connectionAcceptTask);
148
149 if (getLog().isInfoEnabled())
150 {
151 getLog().info("Created " + this);
152 }
153 }
154
155
156
157
158
159
160
161
162
163
164
165 public TcpConnectionBroker(IFrameworkContext context)
166 {
167 this(context, System.getProperty(
168 TcpConnectionBroker.CONTEXT_IP_ADDRESS,
169 TcpConnectionBroker.HOST_NAME),
170 Integer.parseInt(System.getProperty(
171 TcpConnectionBroker.CONTEXT_TCP_PORT, EMPTY_STRING
172 + TcpConnectionBroker.DEFAULT_CONTEXT_TCP_PORT)));
173 }
174
175 public String getAddress()
176 {
177 return this.address;
178 }
179
180 public int getPort()
181 {
182 return this.port;
183 }
184
185 @Override
186 protected void doDestroy()
187 {
188 if (getLog().isInfoEnabled())
189 {
190 getLog().info("Destroying " + this);
191 }
192 if (this.serverSocketChannel != null)
193 {
194 try
195 {
196 this.serverSocketChannel.close();
197 }
198 catch (IOException e)
199 {
200 if (getLog().isDebugEnabled())
201 {
202 getLog().debug("Could not close " + this, e);
203 }
204 }
205 }
206 this.selectorTasks.destroy();
207 if (getLog().isInfoEnabled())
208 {
209 getLog().info("Destroyed " + this);
210 }
211 }
212
213
214 public synchronized void connect(IConnectionParameters connectionParameters)
215 {
216 TcpConnection connection = null;
217
218 if (!isActive())
219 {
220 return;
221 }
222
223 TcpConnectionParameters parameters =
224 (TcpConnectionParameters) connectionParameters;
225
226 connection =
227 new TcpConnection(this.context, this, parameters.getIdentity(),
228 parameters.getRemoteHostAddress(),
229 parameters.getRemoteHostTcpPort(),
230 parameters.getRemoteContextHashCode());
231 if (getLog().isInfoEnabled())
232 {
233 getLog().info("(->) Connecting to outbound " + connection);
234 }
235 connection.start();
236
237 if (connection.getSocketChannel() != null)
238 {
239 registerConnection(connection, connection.getSocketChannel());
240 if (getLog().isInfoEnabled())
241 {
242 getLog().info("(->) Registered outbound " + connection);
243 }
244
245 connection.send(ByteWriter.getBytes(CONNECTION_MSG_TYPE + DELIMITER
246 + this.context.getIdentity() + DELIMITER
247 + this.context.getContextHashCode()));
248 }
249 else
250 {
251 if (getLog().isInfoEnabled())
252 {
253 getLog().info("No socket for " + connection);
254 }
255 }
256 }
257
258 private void registerConnection(TcpConnection tcpConnection,
259 SocketChannel socketChannel)
260 {
261 this.selectorTasks.register(SelectionKey.OP_READ, socketChannel,
262 tcpConnection);
263 }
264
265 @Override
266 protected AsyncLog getLog()
267 {
268 return LOG;
269 }
270
271 @Override
272 protected void doStart()
273 {
274 try
275 {
276 if (getLog().isInfoEnabled())
277 {
278 getLog().info("Starting " + this);
279 }
280 this.serverSocketChannel = ServerSocketChannel.open();
281 this.serverSocketChannel.configureBlocking(false);
282 if (System.getProperty(CONTEXT_RCV_BUFFER) != null)
283 {
284 this.serverSocketChannel.socket().setReceiveBufferSize(
285 Integer.parseInt(System.getProperty(CONTEXT_RCV_BUFFER)));
286 }
287 final InetSocketAddress endpoint =
288 new InetSocketAddress(this.address, this.port);
289 this.serverSocketChannel.socket().bind(endpoint);
290 this.selectorTasks.openSelector();
291 this.connectionThread =
292 new Thread(this.selectorTasks.getRunnable(), this.toString());
293 this.connectionThread.setDaemon(true);
294 this.connectionThread.start();
295 }
296 catch (IOException e)
297 {
298 getLog().fatal("Could not open channel for " + this, e);
299
300 System.exit(1);
301 }
302 if (getLog().isInfoEnabled())
303 {
304 getLog().info("Started " + this);
305 }
306 final IContainer systemInfo = this.context.getSystemInfo();
307 systemInfo.beginFrame(new EventFrameExecution());
308 try
309 {
310 systemInfo.add(new StringField(CONTEXT_IP_ADDRESS, getAddress()));
311 systemInfo.add(new IntegerField(CONTEXT_TCP_PORT, getPort()));
312 }
313 finally
314 {
315 systemInfo.endFrame();
316 }
317 }
318
319 SelectorTasks getSelectorTasks()
320 {
321 return this.selectorTasks;
322 }
323
324
325
326
327
328
329
330
331
332 private final class ConnectionAcceptTask extends AbstractSelectionKeyTask
333 implements ISelectorTasksStateListener
334 {
335 public ConnectionAcceptTask()
336 {
337 super();
338 }
339
340 public void run()
341 {
342 if (TcpConnectionBroker.this.isActive()
343 && getSelectionKey().isAcceptable())
344 {
345 try
346 {
347 SocketChannel conn =
348 TcpConnectionBroker.this.serverSocketChannel.accept();
349
350 if (LOG.isInfoEnabled())
351 {
352 LOG.info("(<-) Accepted inbound " + conn);
353 }
354 conn.configureBlocking(false);
355 final String address =
356 conn.socket().getInetAddress().getHostName();
357 final int port = conn.socket().getPort();
358 final TcpConnection connection =
359 new TcpConnection(TcpConnectionBroker.this.context,
360 TcpConnectionBroker.this, "(pending...)", address,
361 port, 0);
362 connection.setOutbound(false);
363 connection.setSocketChannel(conn);
364 connection.start();
365 registerConnection(connection,
366 connection.getSocketChannel());
367 if (LOG.isInfoEnabled())
368 {
369 LOG.info("(<-) Registered inbound " + conn);
370 }
371 }
372 catch (Exception e)
373 {
374 logException(getLog(), this
375 + " could not accept connection", e);
376 }
377 }
378 }
379
380 public void selectorOpened(SelectorTasks tasks)
381 {
382 if (TcpConnectionBroker.this.isActive())
383 {
384
385 TcpConnectionBroker.this.selectorTasks.register(
386 SelectionKey.OP_ACCEPT,
387 TcpConnectionBroker.this.serverSocketChannel,
388 TcpConnectionBroker.this.connectionAcceptTask);
389 }
390 }
391 }
392
393 @Override
394 public String toString()
395 {
396 return getClass().getSimpleName() + "<" + this.context.getIdentity()
397 + COLON + this.address + COLON + this.port + ">";
398 }
399 }