1 package fulmine.distribution.connection.tcp;
2
3 import static fulmine.distribution.connection.tcp.ProtocolMessageConstants.CONNECTION_MSG_TYPE;
4 import static fulmine.distribution.connection.tcp.ProtocolMessageConstants.CONN_ACK_MSG_TYPE;
5 import static fulmine.distribution.connection.tcp.ProtocolMessageConstants.DELIMITER;
6 import static fulmine.util.Utils.logException;
7
8 import java.io.IOException;
9 import java.nio.ByteBuffer;
10 import java.nio.channels.SelectionKey;
11 import java.nio.channels.SocketChannel;
12 import java.util.Arrays;
13 import java.util.Iterator;
14 import java.util.List;
15
16 import fulmine.context.IFrameworkContext;
17 import fulmine.distribution.connection.IConnection;
18 import fulmine.distribution.events.ConnectionAvailableEvent;
19 import fulmine.distribution.events.ConnectionDestroyedEvent;
20 import fulmine.distribution.events.MessageConsumedEvent;
21 import fulmine.distribution.events.MessageEvent;
22 import fulmine.event.IEventSource;
23 import fulmine.event.listener.IEventListener;
24 import fulmine.protocol.specification.ByteConstants;
25 import fulmine.protocol.specification.ByteReader;
26 import fulmine.protocol.specification.ByteWriter;
27 import fulmine.util.array.ArrayUtils;
28 import fulmine.util.collection.CollectionFactory;
29 import fulmine.util.io.DelegatingSelectionKeyTask;
30 import fulmine.util.io.ISelectionKeyTask;
31 import fulmine.util.log.AsyncLog;
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 public final class TcpConnection extends AbstractSocketChannelConnection
61 implements IEventSource, IConnection, ISelectionKeyTask
62 {
63 private final static AsyncLog LOG = new AsyncLog(TcpConnection.class);
64
65
66
67
68 private final static byte[] CONNECTION_MSG_TYPE_BYTES =
69 ByteWriter.getBytes(CONNECTION_MSG_TYPE);
70
71
72
73
74 private final static byte[] CONN_ACK_MSG_TYPE_BYTES =
75 ByteWriter.getBytes(CONN_ACK_MSG_TYPE);
76
77
78 private DelegatingSelectionKeyTask selectionKeyTask;
79
80
81 final List<ByteBuffer> messages = CollectionFactory.newList();
82
83
84 private final static byte[] recvBuffer = new byte[65535];
85
86
87 private final TcpConnectionBroker broker;
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105 public TcpConnection(IFrameworkContext context, TcpConnectionBroker broker,
106 String remoteContextIdentity, String address, int port,
107 int connectionHashCode)
108 {
109 super(context, remoteContextIdentity, address, port, connectionHashCode);
110 this.broker = broker;
111 this.selectionKeyTask = new DelegatingSelectionKeyTask(this);
112 }
113
114
115
116
117
118
119
120 public void send(byte[] data)
121 {
122 synchronized (this)
123 {
124 byte[] actual = encode(data);
125
126 final ByteBuffer wrapped = ByteBuffer.wrap(actual);
127 messages.add(wrapped);
128
129 getSelectionKey().interestOps(
130 getSelectionKey().interestOps() | SelectionKey.OP_WRITE);
131 getSelectionKey().selector().wakeup();
132 }
133 }
134
135
136
137
138
139
140
141
142
143
144 static byte[] encode(byte[] data)
145 {
146
147 byte[] actual = new byte[data.length + 4];
148 System.arraycopy(data, 0, actual, 4, data.length);
149 int startFrom = 0;
150 for (int i = 4; i > 0; i--)
151 {
152 actual[startFrom++] =
153 (byte) (data.length >>> ByteConstants.bitShiftForByteOrdinal[i]);
154 }
155 return actual;
156 }
157
158
159
160
161
162
163
164
165
166
167
168
169 static List<byte[]> decode(byte[] message)
170 {
171
172
173 List<byte[]> values = CollectionFactory.newList();
174 int lenPtr = 0;
175 int len = 0;
176 while (lenPtr < message.length)
177 {
178 len = ByteReader.readInteger(message, lenPtr, 4);
179
180 byte[] subData = new byte[len];
181 lenPtr += 4;
182 System.arraycopy(message, lenPtr, subData, 0, len);
183 lenPtr += len;
184 values.add(subData);
185 }
186 return values;
187 }
188
189 public void run()
190 {
191 if (isActive())
192 {
193 if (getSelectionKey().isWritable())
194 {
195 write();
196 }
197 if (getSelectionKey().isReadable())
198 {
199 read();
200 }
201 }
202 }
203
204
205
206
207 private void read()
208 {
209 try
210 {
211 final ByteBuffer recv = ByteBuffer.wrap(recvBuffer);
212 final SocketChannel socketChannel =
213 ((SocketChannel) getSelectionKey().channel());
214 final int size = socketChannel.read(recv);
215 if (size == -1)
216 {
217
218
219
220
221 throw new IOException("end-of-stream");
222 }
223 final byte[] bufferData = new byte[recv.position()];
224 System.arraycopy(recv.array(), 0, bufferData, 0, recv.position());
225
226 final List<byte[]> decoded = decode(bufferData);
227 for (byte[] byteData : decoded)
228 {
229 if (getLog().isTraceEnabled())
230 {
231 final String stringData =
232 new String(byteData, ByteConstants.ENCODING).trim();
233 getLog().trace(
234 this + " received '"
235 + Arrays.toString(stringData.getBytes()) + "'");
236 }
237 if (!isConnectionHandshakeComplete())
238 {
239 if (ArrayUtils.startsWith(CONNECTION_MSG_TYPE_BYTES,
240 byteData))
241 {
242 final String data =
243 new String(byteData, ByteConstants.ENCODING).trim();
244
245
246 final String[] fields = data.split(DELIMITER);
247 setIdentity(fields[1]);
248 setRemoteContextHashCode(Integer.parseInt(fields[2]));
249 connectionHandshakeComplete();
250
251 send(ByteWriter.getBytes(CONN_ACK_MSG_TYPE + DELIMITER
252 + getContext().getIdentity() + DELIMITER
253 + getContext().getContextHashCode()));
254
255
256 getContext().queueEvent(
257 new ConnectionAvailableEvent(getContext(), this));
258 }
259 else
260 {
261 if (ArrayUtils.startsWith(CONN_ACK_MSG_TYPE_BYTES,
262 byteData))
263 {
264
265
266
267
268
269
270
271
272
273
274
275 final String data =
276 new String(byteData, ByteConstants.ENCODING).trim();
277 final String[] fields = data.split(DELIMITER);
278 setIdentity(fields[1]);
279 setRemoteContextHashCode(Integer.parseInt(fields[2]));
280 connectionHandshakeComplete();
281
282 getContext().queueEvent(
283 new ConnectionAvailableEvent(getContext(), this));
284 }
285 else
286 {
287 if (getLog().isWarnEnabled())
288 {
289 getLog().warn(
290 "Connection not properly established"
291 + " so could not process message '"
292 + new String(byteData,
293 ByteConstants.ENCODING).trim()
294 + "'");
295 }
296
297 destroy();
298 }
299 }
300 }
301 else
302 {
303 final MessageEvent messageEvent =
304 new MessageEvent(this, byteData);
305 messageEvent.setTriggerEvent(new MessageConsumedEvent(this,
306 messageEvent));
307 synchronized (this)
308 {
309 this.messagesToProcess++;
310 }
311 getContext().queueEvent(messageEvent);
312 }
313 }
314 }
315 catch (IOException e)
316 {
317 logException(getLog(), "Destroying " + this
318 + " because data could not be read from "
319 + getSelectionKey().channel(), e);
320 this.destroyWhenMessagesProcessed = true;
321 destroy();
322 }
323 }
324
325
326
327
328 private void write()
329 {
330 List<ByteBuffer> copy = CollectionFactory.newList(this.messages.size());
331 synchronized (this)
332 {
333 copy.addAll(this.messages);
334 this.messages.clear();
335 }
336 for (Iterator<ByteBuffer> iterator = copy.iterator(); iterator.hasNext();)
337 {
338 final ByteBuffer data = iterator.next();
339 try
340 {
341 if (getLog().isTraceEnabled())
342 {
343 getLog().trace(
344 this + " sending '" + Arrays.toString(data.array())
345 + "'");
346 }
347 ((SocketChannel) getSelectionKey().channel()).write(data);
348 }
349 catch (IOException e)
350 {
351 logException(getLog(), this + " could not write data '"
352 + new String(data.array()) + "'", e);
353 }
354 }
355 synchronized (this)
356 {
357 if (this.messages.size() == 0)
358 {
359
360 getSelectionKey().interestOps(
361 getSelectionKey().interestOps() ^ SelectionKey.OP_WRITE);
362 }
363 }
364 }
365
366 @Override
367 protected void doDestroy()
368 {
369 super.doDestroy();
370 try
371 {
372 if (getSelectionKey() != null
373 && getSelectionKey().channel() != null)
374 {
375 getSelectionKey().channel().close();
376 }
377 }
378 catch (Exception e)
379 {
380 logException(getLog(), "Could not close channel for " + this, e);
381 }
382 }
383
384 public SelectionKey getSelectionKey()
385 {
386 return selectionKeyTask.getSelectionKey();
387 }
388
389 public void setSelectionKey(SelectionKey selectionKey)
390 {
391 selectionKeyTask.setSelectionKey(selectionKey);
392 }
393
394 protected AsyncLog getLog()
395 {
396 return LOG;
397 }
398
399 @Override
400 protected void doComponentDestroy()
401 {
402 super.doComponentDestroy();
403 this.broker.getSelectorTasks().unregister(
404 this.selectionKeyTask.getSelectionKey());
405 }
406 }