1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package fulmine.distribution.channel;
17
18 import static fulmine.util.Utils.CLOSE_BRACE;
19 import static fulmine.util.Utils.CLOSE_CURLY;
20 import static fulmine.util.Utils.COMMA_SPACE;
21 import static fulmine.util.Utils.OPEN_BRACE;
22 import static fulmine.util.Utils.OPEN_CURLY;
23 import static fulmine.util.Utils.SPACE;
24 import static fulmine.util.Utils.logException;
25 import static fulmine.util.Utils.nullCheck;
26 import static fulmine.util.Utils.safeToString;
27
28 import java.util.Arrays;
29 import java.util.Collection;
30 import java.util.List;
31 import java.util.Map;
32
33 import fulmine.AbstractLifeCycle;
34 import fulmine.Domain;
35 import fulmine.IAddressable;
36 import fulmine.IDomain;
37 import fulmine.IType;
38 import fulmine.Type;
39 import fulmine.context.IFrameworkContext;
40 import fulmine.context.FulmineContext.SystemInfoFields;
41 import fulmine.distribution.connection.IConnection;
42 import fulmine.distribution.events.MessageEvent;
43 import fulmine.event.IEvent;
44 import fulmine.event.IEventSource;
45 import fulmine.event.listener.AbstractEventHandler;
46 import fulmine.event.listener.IEventListener;
47 import fulmine.event.listener.ILifeCycleEventListener;
48 import fulmine.event.listener.IPriorityEventListener;
49 import fulmine.event.subscription.ISubscriptionManager;
50 import fulmine.event.subscription.ISubscriptionParameters;
51 import fulmine.event.subscription.SubscriptionParameters;
52 import fulmine.event.system.ISystemEventListener;
53 import fulmine.event.system.RxEvent;
54 import fulmine.event.system.TxEvent;
55 import fulmine.model.container.IContainer;
56 import fulmine.model.container.IContainer.DataState;
57 import fulmine.model.container.events.ContainerDestroyedEvent;
58 import fulmine.model.container.events.ContainerStateChangeEvent;
59 import fulmine.model.container.events.RemoteContainerCreatedEvent;
60 import fulmine.model.container.subscription.ContainerSubscriptionManager;
61 import fulmine.model.container.subscription.remote.RemoteContainerSubscriptionManager;
62 import fulmine.model.container.subscription.remote.RxSubscription;
63 import fulmine.model.container.subscription.remote.RxSubscriptionFactory;
64 import fulmine.model.container.subscription.remote.TxSubscription;
65 import fulmine.model.container.subscription.remote.TxSubscriptionFactory;
66 import fulmine.model.field.LongField;
67 import fulmine.protocol.specification.ByteConstants;
68 import fulmine.protocol.specification.ByteWriter;
69 import fulmine.protocol.specification.IFrameReader;
70 import fulmine.rpc.events.RpcInvokeEvent;
71 import fulmine.util.array.ArrayUtils;
72 import fulmine.util.collection.CollectionFactory;
73 import fulmine.util.collection.CollectionUtils;
74 import fulmine.util.log.AsyncLog;
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137 public class Channel extends AbstractLifeCycle implements IChannel,
138 IChannelOperations
139 {
140 final static AsyncLog LOG = new AsyncLog(Channel.class);
141
142
143 private final IChannelState state;
144
145
146
147
148
149 static final int HEADER_LEN =
150 ByteWriter.getBytes(IChannel.MSG_SUBSCRIBE).length;
151
152
153 static final byte[] MSG_SUBSCRIBE_BYTES =
154 ByteWriter.getBytes(IChannel.MSG_SUBSCRIBE);
155
156
157 static final byte[] MSG_UNSUBSCRIBE_BYTES =
158 ByteWriter.getBytes(IChannel.MSG_UNSUBSCRIBE);
159
160
161 static final byte[] MSG_DATA_BYTES = ByteWriter.getBytes(IChannel.MSG_DATA);
162
163
164 static final byte[] MSG_CONTAINER_DESTROYED_BYTES =
165 ByteWriter.getBytes(IChannel.MSG_CONTAINER_DESTROYED);
166
167
168 static final byte[] MSG_SYN_BYTES = ByteWriter.getBytes(IChannel.MSG_SYN);
169
170
171 static final byte[] MSG_SYN_ACK_BYTES =
172 ByteWriter.getBytes(IChannel.MSG_SYN_ACK);
173
174
175 static final byte[] MSG_RETRANSMIT_BYTES =
176 ByteWriter.getBytes(IChannel.MSG_RETRANSMIT);
177
178
179 static final byte[] MSG_RETRANSMIT_ALL_BYTES =
180 ByteWriter.getBytes(IChannel.MSG_RETRANSMIT_ALL);
181
182
183 static final byte[] MSG_DESTROY_CONNECTION_BYTES =
184 ByteWriter.getBytes(IChannel.MSG_DESTROY_CONNECTION);
185
186
187 static final byte[] MSG_INVOKE_RPC_BYTES =
188 ByteWriter.getBytes(IChannel.MSG_INVOKE_RPC);
189
190
191
192
193
194
195
196
197
198 final static byte[] createDataMessage(byte[] data)
199 {
200
201 final int headerLen = Channel.MSG_DATA_BYTES.length;
202 byte[] message = new byte[data.length + headerLen];
203 System.arraycopy(data, 0, message, headerLen, data.length);
204 System.arraycopy(Channel.MSG_DATA_BYTES, 0, message, 0, headerLen);
205 return message;
206 }
207
208
209
210
211
212
213
214
215
216 public Channel(IConnection connection, IFrameworkContext context)
217 {
218
219
220
221
222
223
224 this(new ChannelState(context, connection));
225 }
226
227
228
229
230
231
232
233 protected Channel(IChannelState state)
234 {
235 super();
236 this.state = state;
237 }
238
239 public final boolean subscribe(ISubscriptionParameters parameters)
240 {
241 checkActive();
242 if (getLog().isDebugEnabled())
243 {
244 getLog().debug(this + "Subscribe for " + safeToString(parameters));
245 }
246 final boolean subscribed =
247 getState().getRxSubscriptionManager().subscribe(parameters);
248
249 doSubscribeOperation(parameters);
250 send(ByteWriter.getBytes(Channel.MSG_SUBSCRIBE
251 + parameters.getType().value() + Channel.DELIMITER
252 + parameters.getDomain().value() + Channel.DELIMITER
253 + parameters.getIdentity()));
254 return subscribed;
255 }
256
257
258
259
260
261
262
263 protected void doSubscribeOperation(ISubscriptionParameters parameters)
264 {
265
266 getState().getRxSubscriptionManager().addListener(parameters,
267 getState().getEventHandler());
268 }
269
270
271
272
273
274 private void checkIfConnectionCanBeClosed()
275 {
276 if (getState().getTxSubscriptionManager().getSubscribedSources().size() == 0
277 && getState().getRxSubscriptionManager().getSubscribedSources().size() == 0)
278 {
279 if (getLog().isInfoEnabled())
280 {
281 getLog().info(
282 "No RX or TX subscriptions so destroying "
283 + safeToString(this));
284 }
285 destroy();
286 }
287 }
288
289 public final boolean unsubscribe(ISubscriptionParameters parameters)
290 {
291 checkActive();
292
293
294
295 if (getLog().isDebugEnabled())
296 {
297 getLog().debug(this + "Unsubscribe for " + safeToString(parameters));
298 }
299 final boolean unsubscribed =
300 getState().getRxSubscriptionManager().unsubscribe(parameters);
301 doUnsubscribeOperation(parameters);
302
303 send(ByteWriter.getBytes(Channel.MSG_UNSUBSCRIBE
304 + parameters.getType().value() + Channel.DELIMITER
305 + parameters.getDomain().value() + Channel.DELIMITER
306 + parameters.getIdentity()));
307 checkIfConnectionCanBeClosed();
308 return unsubscribed;
309 }
310
311
312
313
314
315
316
317 protected void doUnsubscribeOperation(ISubscriptionParameters parameters)
318 {
319
320 }
321
322 public final void send(byte[] bytes)
323 {
324 checkActive();
325 getState().getConnection().send(bytes);
326 }
327
328 @Override
329 protected AsyncLog getLog()
330 {
331 return LOG;
332 }
333
334 @Override
335 protected void doStart()
336 {
337 getState().init(this, this);
338 getState().start();
339 if (getLog().isDebugEnabled())
340 {
341 getLog().debug(this + " sending SYN");
342 }
343 send(Channel.MSG_SYN_BYTES);
344 }
345
346 @Override
347 protected final void doDestroy()
348 {
349 if (getLog().isInfoEnabled())
350 {
351 getLog().info("Destroying " + this + " (stacktrace follows)",
352 new Exception());
353 }
354
355 final Collection<? extends IEventSource> rxSubscribedSources =
356 getState().getRxSubscriptionManager().getSubscribedSources();
357 if (getLog().isTraceEnabled())
358 {
359 getLog().trace(
360 this
361 + "[channel destroy] RX subscribed sources to process are "
362 + CollectionUtils.toFormattedString(rxSubscribedSources));
363 }
364 final int size = rxSubscribedSources.size();
365 int count = 0;
366 for (IEventSource eventSource : rxSubscribedSources)
367 {
368 try
369 {
370 doChannelDestroyedOperation(eventSource, ++count == size);
371 }
372 catch (Exception e)
373 {
374 logException(getLog(), eventSource, e);
375 }
376 }
377
378
379 getState().getConnection().removeListener(getState().getEventHandler());
380
381 getState().destroy();
382 }
383
384
385
386
387
388
389
390
391
392
393
394
395 protected void doChannelDestroyedOperation(IEventSource eventSource,
396 boolean last)
397 {
398 eventSource.destroy();
399 }
400
401 public final String getRemoteContextIdentity()
402 {
403 return getState().getConnection().getRemoteContextIdentity();
404 }
405
406 public final String toString()
407 {
408 return getToString(getConnection());
409 }
410
411 public final boolean isConnectionSyn()
412 {
413 return getState().isConnectionSyn();
414 }
415
416 public final void setConnectionSyn()
417 {
418 getState().setConnectionSyn();
419 }
420
421 public final void requestRetransmit(String identityRegularExpression,
422 IType type, IDomain domain)
423 {
424 checkActive();
425 try
426 {
427 send(ByteWriter.getBytes(Channel.MSG_RETRANSMIT + type.value()
428 + Channel.DELIMITER + domain.value() + Channel.DELIMITER
429 + identityRegularExpression));
430 }
431 catch (Exception e)
432 {
433 logException(getLog(), this + " idregex="
434 + identityRegularExpression + COMMA_SPACE + type + COMMA_SPACE
435 + domain, e);
436 }
437 }
438
439 public final void requestRetransmitAll()
440 {
441 checkActive();
442 send(Channel.MSG_RETRANSMIT_ALL_BYTES);
443 }
444
445 public final void retransmitAll()
446 {
447 checkActive();
448 if (getLog().isDebugEnabled())
449 {
450 getLog().debug(this + " retransmitting all subscribed containers");
451 }
452 final Collection<? extends IEventSource> subscribedContainers =
453 getState().getTxSubscriptionManager().getSubscribedSources();
454 doRetransmit(subscribedContainers);
455 }
456
457 public final void retransmit(String identityRegularExpression, IType type,
458 IDomain domain)
459 {
460 checkActive();
461 if (getLog().isDebugEnabled())
462 {
463 getLog().debug(
464 this + " retransmitting containers for identityPattern='"
465 + identityRegularExpression + "'" + ", type=" + type
466 + ", domain=" + domain);
467 }
468 final Collection<IEventSource> subscribedContainers =
469 getState().getTxSubscriptionManager().getSubscribedSources(
470 new SubscriptionParameters(identityRegularExpression, type,
471 domain));
472 if (subscribedContainers.size() == 0)
473 {
474
475 if (getState().getContext().containsLocalContainer(
476 identityRegularExpression, type, domain))
477 {
478 subscribedContainers.add(getState().getContext().getLocalContainer(
479 identityRegularExpression, type, domain));
480 }
481 }
482 doRetransmit(subscribedContainers);
483 }
484
485 void doRetransmit(
486 final Collection<? extends IEventSource> subscribedContainers)
487 {
488 getState().getContext().execute(new Runnable()
489 {
490 public void run()
491 {
492 final long start = System.nanoTime();
493 for (IEventSource container : subscribedContainers)
494 {
495 try
496 {
497
498
499
500 send(Channel.createDataMessage(Channel.this.getState().getContext().getFrameWriter().writeComplete(
501 (IContainer) container)));
502 }
503 catch (Exception e)
504 {
505 logException(getLog(), container, e);
506 }
507 }
508 final long sendTime = (System.nanoTime() - start);
509 if (Channel.LOG.isDebugEnabled())
510 {
511 Channel.LOG.debug("Time to retransmit "
512 + subscribedContainers.size() + " containers: "
513 + sendTime + " nanoseconds");
514 }
515 }
516 });
517 }
518
519 public final IConnection getConnection()
520 {
521 return getState().getConnection();
522 }
523
524 private String getToString(IConnection connection)
525 {
526 return getClass().getSimpleName() + SPACE + OPEN_BRACE
527 + (isActive() ? "active" : "inactive") + CLOSE_BRACE + SPACE
528 + OPEN_CURLY + connection + OPEN_BRACE
529 + connection.getRemoteContextHashCode() + CLOSE_BRACE + CLOSE_CURLY
530 + SPACE;
531 }
532
533
534 IChannelState getState()
535 {
536 return this.state;
537 }
538
539 public final boolean addListener(ISubscriptionParameters parameters,
540 IEventListener listener)
541 {
542 return getState().getRxSubscriptionManager().addListener(parameters,
543 listener);
544 }
545
546 public final void eventSourceCreated(IAddressable identity)
547 {
548 getState().getRxSubscriptionManager().eventSourceCreated(identity);
549 }
550
551 public final void eventSourceDestroyed(IAddressable identity)
552 {
553 getState().getRxSubscriptionManager().eventSourceDestroyed(identity);
554 }
555
556 public final Collection<IEventSource> getSubscribedSources()
557 {
558 return getState().getRxSubscriptionManager().getSubscribedSources();
559 }
560
561 public final Collection<IEventSource> getSubscribedSources(
562 ISubscriptionParameters parameters)
563 {
564 return getState().getRxSubscriptionManager().getSubscribedSources(
565 parameters);
566 }
567
568 public final boolean isSubscribed(IEventSource source)
569 {
570 return getState().getRxSubscriptionManager().isSubscribed(source);
571 }
572
573 public final boolean removeListener(ISubscriptionParameters parameters,
574 IEventListener listener)
575 {
576 return getState().getRxSubscriptionManager().removeListener(parameters,
577 listener);
578 }
579
580 public final boolean includes(IAddressable parameters)
581 {
582 return getState().getRxSubscriptionManager().includes(parameters);
583 }
584
585 public List<IEventListener> getListeners(ISubscriptionParameters parameters)
586 {
587 return getState().getRxSubscriptionManager().getListeners(parameters);
588 }
589
590 public final byte[] getContainerDestroyedMessage(
591 ContainerDestroyedEvent event)
592 {
593 return ByteWriter.getBytes(Channel.MSG_CONTAINER_DESTROYED
594 + event.getType().value() + Channel.DELIMITER
595 + event.getDomain().value() + Channel.DELIMITER
596 + event.getIdentity());
597 }
598
599 public void invokeRpc(String remoteContextIdentity, byte[] rpcData)
600 {
601 send(ArrayUtils.merge(ByteWriter.getBytes(Channel.MSG_INVOKE_RPC),
602 rpcData));
603 }
604 }
605
606
607
608
609
610
611
612 class ChannelState extends AbstractLifeCycle implements IChannelState
613 {
614 private final static AsyncLog LOG = new AsyncLog(ChannelState.class);
615
616 private final IFrameworkContext context;
617
618
619
620
621
622
623
624 private final ISubscriptionManager txSubscriptionManager;
625
626
627
628
629
630
631
632 private final ISubscriptionManager rxSubscriptionManager;
633
634 private boolean connectionSyn;
635
636 private final IConnection connection;
637
638 private ILifeCycleEventListener eventHandler;
639
640
641 private final LongField txCount, rxCount;
642
643
644
645
646
647 private final Map<Integer, Integer> tokenMap;
648
649 ChannelState(IFrameworkContext context, IConnection connection)
650 {
651 super();
652 this.context = context;
653 this.connection = connection;
654
655
656 this.txSubscriptionManager =
657 new ContainerSubscriptionManager(context,
658 new TxSubscriptionFactory());
659 this.rxSubscriptionManager =
660 new RemoteContainerSubscriptionManager(context,
661 connection.getRemoteContextIdentity(),
662 new RxSubscriptionFactory());
663 this.tokenMap = CollectionFactory.newMap();
664 this.txCount =
665 new LongField(getConnection().getRemoteContextIdentity() + ":"
666 + SystemInfoFields.TX_COUNT);
667 this.rxCount =
668 new LongField(getConnection().getRemoteContextIdentity() + ":"
669 + SystemInfoFields.RX_COUNT);
670 getContext().getSystemInfo().add(ChannelState.this.txCount);
671 getContext().getSystemInfo().add(ChannelState.this.rxCount);
672 }
673
674 public Map<Integer, Integer> getTokenMap()
675 {
676 return this.tokenMap;
677 }
678
679
680
681
682
683
684
685 @SuppressWarnings("unchecked")
686 public final void init(IChannel channel, IChannelOperations channelOps)
687 {
688 Map<Class<? extends IEvent>, IEventListener> listeners =
689 AbstractEventHandler.getEventHandlerMappings(createEventHandlers(
690 context, channel, channelOps));
691 this.eventHandler =
692 new ChannelTransmissionListener(OPEN_CURLY
693 + Channel.class.getSimpleName() + " to "
694 + connection.toString() + CLOSE_CURLY, context, listeners);
695
696
697 getConnection().addListener(getEventHandler());
698 }
699
700
701
702
703
704
705
706
707
708
709 @SuppressWarnings("unchecked")
710 AbstractEventHandler[] createEventHandlers(IFrameworkContext context,
711 IChannel channel, IChannelOperations channelOps)
712 {
713 return new AbstractEventHandler[] {
714 new MessageEventHandler(channel, channelOps, this),
715 new RxEventHandler(channelOps, this),
716 new ContainerDestroyedEventHandler(channelOps, this),
717 new ContainerStateChangeEventHandler(channelOps, this),
718 new TxEventHandler(channelOps, this), };
719 }
720
721 protected final void doDestroy()
722 {
723
724 if (getConnection().isActive())
725 {
726 try
727 {
728 getConnection().send(Channel.MSG_DESTROY_CONNECTION_BYTES);
729 }
730 catch (Exception e)
731 {
732 logException(getLog(), this, e);
733 }
734 }
735 getContext().getSystemInfo().remove(this.txCount);
736 getContext().getSystemInfo().remove(this.rxCount);
737 doComponentDestroy();
738 }
739
740
741
742
743 void doComponentDestroy()
744 {
745 this.txSubscriptionManager.destroy();
746 this.rxSubscriptionManager.destroy();
747 this.eventHandler.destroy();
748 }
749
750 protected void doStart()
751 {
752 this.txSubscriptionManager.start();
753 this.rxSubscriptionManager.start();
754 this.eventHandler.start();
755
756 this.connection.start();
757 }
758
759 @Override
760 protected AsyncLog getLog()
761 {
762 return LOG;
763 }
764
765
766
767
768
769 public final ILifeCycleEventListener getEventHandler()
770 {
771 return eventHandler;
772 }
773
774 public final void setConnectionSyn()
775 {
776 this.connectionSyn = true;
777 }
778
779
780 public final IFrameworkContext getContext()
781 {
782 return this.context;
783 }
784
785
786
787
788
789
790
791 public final ISubscriptionManager getTxSubscriptionManager()
792 {
793 return this.txSubscriptionManager;
794 }
795
796
797
798
799
800
801
802 public final ISubscriptionManager getRxSubscriptionManager()
803 {
804 return this.rxSubscriptionManager;
805 }
806
807
808
809
810
811
812
813 public final boolean isConnectionSyn()
814 {
815 return this.connectionSyn;
816 }
817
818
819 public final IConnection getConnection()
820 {
821 return this.connection;
822 }
823
824 public final LongField getTxCount()
825 {
826 return this.txCount;
827 }
828
829 public final LongField getRxCount()
830 {
831 return this.rxCount;
832 }
833 }
834
835
836
837
838
839
840
841
842
843
844 abstract class ChannelEventHandler<EVENT extends IEvent> extends
845 AbstractEventHandler<EVENT> implements ISystemEventListener
846 {
847
848 private final IChannelOperations channelOps;
849
850
851 private final IChannelState state;
852
853 ChannelEventHandler(IChannelOperations channelOps, IChannelState state)
854 {
855 super();
856 nullCheck(channelOps, "No channel operations provided");
857 nullCheck(state, "No state provided");
858 this.channelOps = channelOps;
859 this.state = state;
860 }
861
862 IChannelState getState()
863 {
864 return state;
865 }
866
867 IChannelOperations getChannelOps()
868 {
869 return channelOps;
870 }
871 }
872
873
874
875
876
877
878
879 final class ContainerStateChangeEventHandler extends
880 ChannelEventHandler<ContainerStateChangeEvent>
881 {
882
883 private final static AsyncLog LOG =
884 new AsyncLog(ContainerStateChangeEventHandler.class);
885
886 ContainerStateChangeEventHandler(IChannelOperations channelOps,
887 IChannelState state)
888 {
889 super(channelOps, state);
890 }
891
892 @Override
893 protected AsyncLog getLog()
894 {
895 return LOG;
896 }
897
898 @Override
899 public void handle(ContainerStateChangeEvent event)
900 {
901 if (getState().getTxSubscriptionManager().isSubscribed(
902 event.getContainer()))
903 {
904 getChannelOps().send(
905 Channel.createDataMessage(getState().getContext().getFrameWriter().writeMeta(
906 event.getContainer())));
907 }
908 }
909
910 }
911
912
913
914
915
916
917
918
919
920 final class RxEventHandler extends ChannelEventHandler<RxEvent>
921 {
922
923 private final static AsyncLog LOG = new AsyncLog(RxEventHandler.class);
924
925
926 private long count;
927
928 RxEventHandler(IChannelOperations channelOps, IChannelState state)
929 {
930 super(channelOps, state);
931 }
932
933
934
935
936
937 @Override
938 public void handle(RxEvent event)
939 {
940 getState().getRxCount().set(count++);
941 if (getLog().isTraceEnabled())
942 {
943 getLog().trace(event.getTraceString());
944 }
945
946 getState().getContext().getFrameReader().read(event.getBuffer(),
947 getState().getConnection().getRemoteContextIdentity(),
948 getState().getContext());
949 }
950
951 @Override
952 public AsyncLog getLog()
953 {
954 return LOG;
955 }
956 }
957
958
959
960
961
962
963
964
965 final class TxEventHandler extends ChannelEventHandler<TxEvent>
966 {
967
968 private final static AsyncLog LOG = new AsyncLog(TxEventHandler.class);
969
970
971 private long count;
972
973 TxEventHandler(IChannelOperations channelOps, IChannelState state)
974 {
975 super(channelOps, state);
976 }
977
978
979
980
981
982 @Override
983 public void handle(TxEvent event)
984 {
985 getState().getTxCount().set(count++);
986 if (getLog().isTraceEnabled())
987 {
988 getLog().trace(event.getTraceString());
989 }
990
991 if (getState().getContext().containsLocalContainer(
992 event.getSource().getIdentity(), event.getSource().getType(),
993 event.getSource().getDomain()))
994 {
995 byte[] data = event.getBuffer();
996 getChannelOps().send(Channel.createDataMessage(data));
997 }
998 else
999 {
1000 if (getLog().isDebugEnabled())
1001 {
1002 getLog().debug("Source is destroyed, not sending " + event);
1003 }
1004 }
1005 }
1006
1007 @Override
1008 public AsyncLog getLog()
1009 {
1010 return LOG;
1011 }
1012 }
1013
1014
1015
1016
1017
1018
1019
1020
1021 final class ContainerDestroyedEventHandler extends
1022 ChannelEventHandler<ContainerDestroyedEvent> implements
1023 IPriorityEventListener
1024 {
1025
1026 private final static AsyncLog LOG =
1027 new AsyncLog(ContainerDestroyedEventHandler.class);
1028
1029 ContainerDestroyedEventHandler(IChannelOperations channelOps,
1030 IChannelState state)
1031 {
1032 super(channelOps, state);
1033 }
1034
1035
1036
1037
1038
1039
1040 @Override
1041 public void handle(ContainerDestroyedEvent event)
1042 {
1043 if (getState().getTxSubscriptionManager().includes(
1044 new SubscriptionParameters(event.getIdentity(), event.getType(),
1045 event.getDomain())))
1046 {
1047 getChannelOps().send(
1048 getChannelOps().getContainerDestroyedMessage(event));
1049 }
1050 }
1051
1052 @Override
1053 public AsyncLog getLog()
1054 {
1055 return LOG;
1056 }
1057 }
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068 class MessageEventHandler extends ChannelEventHandler<MessageEvent>
1069 {
1070
1071 private final static AsyncLog LOG = new AsyncLog(MessageEventHandler.class);
1072
1073
1074 private final IChannel channel;
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086 MessageEventHandler(IChannel channel, IChannelOperations channelOps,
1087 IChannelState state)
1088 {
1089 super(channelOps, state);
1090 this.channel = channel;
1091 }
1092
1093 @Override
1094 public void handle(MessageEvent event)
1095 {
1096 if (getLog().isTraceEnabled())
1097 {
1098 getLog().trace(event.getTraceString());
1099 }
1100 final byte[] data = event.getData();
1101 byte[] arg = new byte[data.length - Channel.HEADER_LEN];
1102 System.arraycopy(data, Channel.HEADER_LEN, arg, 0, arg.length);
1103
1104 if (ArrayUtils.startsWith(Channel.MSG_DATA_BYTES, data))
1105 {
1106 doMessageReceived(arg);
1107 return;
1108 }
1109 if (ArrayUtils.startsWith(Channel.MSG_INVOKE_RPC_BYTES, data))
1110 {
1111 doInvokeRpc(arg);
1112 return;
1113 }
1114 try
1115 {
1116 final String payload = new String(arg, ByteConstants.ENCODING);
1117 final String[] elements = splitTypeDomainAndIdentity(payload);
1118 if (ArrayUtils.startsWith(Channel.MSG_SUBSCRIBE_BYTES, data))
1119 {
1120
1121 doTxSubscribe(elements[2],
1122 Type.get(Byte.parseByte(elements[0])),
1123 Domain.get(Byte.parseByte(elements[1])));
1124 return;
1125 }
1126 if (ArrayUtils.startsWith(Channel.MSG_UNSUBSCRIBE_BYTES, data))
1127 {
1128
1129 doTxUnsubscribe(elements[2],
1130 Type.get(Byte.parseByte(elements[0])),
1131 Domain.get(Byte.parseByte(elements[1])));
1132 return;
1133 }
1134 if (ArrayUtils.startsWith(Channel.MSG_CONTAINER_DESTROYED_BYTES,
1135 data))
1136 {
1137
1138 doRemoteContainerDestroyed(elements[2],
1139 Type.get(Byte.parseByte(elements[0])),
1140 Domain.get(Byte.parseByte(elements[1])));
1141 return;
1142 }
1143 if (ArrayUtils.startsWith(Channel.MSG_SYN_BYTES, data))
1144 {
1145
1146 doSynReceived();
1147 return;
1148 }
1149 if (ArrayUtils.startsWith(Channel.MSG_SYN_ACK_BYTES, data))
1150 {
1151
1152 doSynAckReceived();
1153 return;
1154 }
1155 if (ArrayUtils.startsWith(Channel.MSG_RETRANSMIT_BYTES, data))
1156 {
1157 doRetransmit(elements[2],
1158 Type.get(Byte.parseByte(elements[0])),
1159 Domain.get(Byte.parseByte(elements[1])));
1160 return;
1161 }
1162 if (ArrayUtils.startsWith(Channel.MSG_RETRANSMIT_ALL_BYTES, data))
1163 {
1164 doRetransmitAll();
1165 return;
1166 }
1167 if (ArrayUtils.startsWith(Channel.MSG_DESTROY_CONNECTION_BYTES,
1168 data))
1169 {
1170 doDestroyConnection();
1171 return;
1172 }
1173 throw new RuntimeException("Unhandled message=" + payload);
1174 }
1175 catch (Exception e)
1176 {
1177 throw new RuntimeException(
1178 "Could not handle message (unencoded): '"
1179 + Arrays.toString(data) + "'", e);
1180 }
1181 }
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191 void doInvokeRpc(byte[] rpcData)
1192 {
1193 RpcInvokeEvent event =
1194 new RpcInvokeEvent(getState().getContext(), rpcData);
1195
1196 getState().getContext().queueEvent(event);
1197 }
1198
1199
1200
1201
1202
1203
1204
1205 void doMessageReceived(byte[] arg)
1206 {
1207
1208
1209
1210 final IContainer remoteContainer =
1211 getState().getContext().getFrameReader().getRemoteContainerForFrame(
1212 arg, getState().getConnection().getRemoteContextIdentity(),
1213 getState().getContext());
1214
1215
1216
1217
1218
1219
1220 RxEvent rx = new RxEvent(remoteContainer, arg);
1221
1222 getState().getContext().queueEvent(rx);
1223 }
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237 void doRemoteContainerDestroyed(String identity, IType type, IDomain domain)
1238 {
1239 final IContainer remoteContainer =
1240 getState().getContext().getRemoteContainer(
1241 getState().getConnection().getRemoteContextIdentity(),
1242 identity, type, domain);
1243 remoteContainer.destroy();
1244 }
1245
1246 final void doSynAckReceived()
1247 {
1248 if (!getChannelOps().isConnectionSyn())
1249 {
1250 if (Channel.LOG.isDebugEnabled())
1251 {
1252 Channel.LOG.debug(getChannel()
1253 + " received SYN_ACK, channel is ready.");
1254 }
1255 getChannelOps().setConnectionSyn();
1256 ChannelReadyEvent event =
1257 new ChannelReadyEvent(getState().getContext(), getChannel());
1258 getState().getContext().queueEvent(event);
1259 }
1260 else
1261 {
1262 if (getLog().isDebugEnabled())
1263 {
1264 getLog().debug(
1265 getChannel()
1266 + " is already ready, ignoring duplicate SYN_ACK.");
1267 }
1268 }
1269 }
1270
1271 final void doSynReceived()
1272 {
1273
1274 if (Channel.LOG.isDebugEnabled())
1275 {
1276 Channel.LOG.debug(getChannel() + " received SYN, sending SYN_ACK");
1277 }
1278 getChannelOps().send(Channel.MSG_SYN_ACK_BYTES);
1279 if (!getChannelOps().isConnectionSyn())
1280 {
1281
1282
1283
1284
1285 if (Channel.LOG.isDebugEnabled())
1286 {
1287 Channel.LOG.debug(getChannel() + " re-sending SYN");
1288 }
1289 getChannelOps().send(Channel.MSG_SYN_BYTES);
1290 }
1291 }
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305 void doTxSubscribe(String identityRegex, IType type, IDomain domain)
1306 {
1307
1308
1309
1310
1311
1312 final SubscriptionParameters parameters =
1313 new SubscriptionParameters(identityRegex, type, domain);
1314 if (getLog().isInfoEnabled())
1315 {
1316 getLog().info("[subscribe] " + parameters);
1317 }
1318
1319
1320 getState().getTxSubscriptionManager().subscribe(parameters);
1321 getState().getTxSubscriptionManager().addListener(parameters,
1322 getState().getEventHandler());
1323 }
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335 void doTxUnsubscribe(String identityRegex, IType type, IDomain domain)
1336 {
1337
1338 final SubscriptionParameters parameters =
1339 new SubscriptionParameters(identityRegex, type, domain);
1340 if (getLog().isInfoEnabled())
1341 {
1342 getLog().info("[unsubscribe] " + parameters);
1343 }
1344 getState().getTxSubscriptionManager().unsubscribe(parameters);
1345 }
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357 void doRetransmit(String identityRegex, IType type, IDomain domain)
1358 {
1359 getChannel().retransmit(identityRegex, type, domain);
1360 }
1361
1362
1363
1364
1365 void doRetransmitAll()
1366 {
1367 getChannel().retransmitAll();
1368 }
1369
1370
1371
1372
1373 void doDestroyConnection()
1374 {
1375 if (getLog().isInfoEnabled())
1376 {
1377 getLog().info(
1378 "Received destroy connection message for " + getChannel());
1379 }
1380 final IConnection connection = getChannel().getConnection();
1381 if (connection != null)
1382 {
1383 connection.destroy();
1384 }
1385 }
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395 String[] splitTypeDomainAndIdentity(String payload)
1396 {
1397 String[] split = new String[3];
1398 final int indexOf = payload.indexOf(Channel.DELIMITER);
1399 if (indexOf > -1)
1400 {
1401 split[0] = payload.substring(0, indexOf);
1402 final int index2 =
1403 payload.indexOf(Channel.DELIMITER, indexOf
1404 + Channel.DELIMITER.length());
1405 split[1] =
1406 payload.substring(indexOf + Channel.DELIMITER.length(), index2);
1407 split[2] = payload.substring(index2 + Channel.DELIMITER.length());
1408 }
1409 return split;
1410 }
1411
1412 @Override
1413 public AsyncLog getLog()
1414 {
1415 return LOG;
1416 }
1417
1418 IChannel getChannel()
1419 {
1420 return channel;
1421 }
1422 }