1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package fulmine.distribution.connection;
17
18 import static fulmine.distribution.connection.tcp.ProtocolMessageConstants.DELIMITER;
19 import static fulmine.distribution.connection.tcp.ProtocolMessageConstants.PULSE_MSG;
20 import static fulmine.util.Utils.COLON;
21 import static fulmine.util.Utils.SPACE;
22 import static fulmine.util.Utils.logException;
23 import static fulmine.util.Utils.safeToString;
24
25 import java.io.IOException;
26 import java.net.DatagramPacket;
27 import java.net.InetAddress;
28 import java.net.MulticastSocket;
29 import java.net.NetworkInterface;
30 import java.net.SocketException;
31 import java.util.Iterator;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Set;
35
36 import fulmine.AbstractLifeCycle;
37 import fulmine.context.IContextWatchdog;
38 import fulmine.context.IFrameworkContext;
39 import fulmine.distribution.IHeartbeatMonitor;
40 import fulmine.distribution.connection.tcp.ProtocolMessageConstants;
41 import fulmine.distribution.events.ConnectionDestroyedEvent;
42 import fulmine.distribution.events.ContextDiscoveredEvent;
43 import fulmine.distribution.events.ContextNotAvailableEvent;
44 import fulmine.event.EventFrameExecution;
45 import fulmine.model.container.IContainer;
46 import fulmine.model.field.BooleanField;
47 import fulmine.model.field.IntegerField;
48 import fulmine.model.field.LongField;
49 import fulmine.model.field.StringField;
50 import fulmine.protocol.specification.ByteConstants;
51 import fulmine.protocol.specification.ByteWriter;
52 import fulmine.util.Utils;
53 import fulmine.util.collection.CollectionFactory;
54 import fulmine.util.collection.CollectionUtils;
55 import fulmine.util.collection.TtlSet;
56 import fulmine.util.concurrent.ThreadUtils;
57 import fulmine.util.log.AsyncLog;
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105 public abstract class AbstractConnectionDiscoverer extends AbstractLifeCycle
106 implements IConnectionDiscoverer
107 {
108
109 private static final String HEARTBEAT_PERIOD = "heartbeatPeriod";
110
111 private static final String ALLOWED_MISSED_HEARTBEATS =
112 "allowedMissedHeartbeats";
113
114 private static final String SENDING_HEARTBEATS = "sendingHeartbeats";
115
116 private final static AsyncLog LOG =
117 new AsyncLog(AbstractConnectionDiscoverer.class);
118
119
120
121
122
123
124
125 public static final String UDP_DISCOVERY_NETWORK = "discover.udp.network";
126
127
128 public static final String DEFAULT_UDP_DISCOVERY_NETWORK = "228.160.2.5";
129
130
131
132
133
134 public static final String NETWORK_INTERFACE_NAME = "discover.udp.nic";
135
136
137
138
139
140
141
142 public static final String UDP_DISCOVERY_PORT = "discover.udp.port";
143
144
145 public static final String DEFAULT_UDP_DISCOVERY_PORT = "16025";
146
147
148 public static final int DEFAULT_DUPLICATE_PING_WINDOW = 5000;
149
150
151 protected MulticastSocket socket;
152
153
154 protected final HeartbeatListener heartbeatListener;
155
156
157 protected final Thread heartbeatListenerThread;
158
159
160 protected final HeartbeatProcessor heartbeatProcessor;
161
162
163 protected final Thread heartbeatProcessorThread;
164
165
166
167
168
169 protected final HeartbeatGenerator heartbeatGenerator;
170
171
172 protected final Thread heartbeatGeneratorThread;
173
174
175
176
177 protected String pulse;
178
179
180 protected final int port;
181
182
183 protected final InetAddress network;
184
185
186 protected String description;
187
188
189 private final IFrameworkContext context;
190
191
192 private long heartbeatPeriod = IHeartbeatMonitor.DEFAULT_HEARTBEAT_PERIOD;
193
194
195 private int allowedMissedHeartbeats =
196 IHeartbeatMonitor.DEFAULT_ALLOWED_MISSED_COUNT;
197
198
199
200
201
202 private List<String> receivedHeartbeats;
203
204
205
206
207
208 private Set<String> deadContexts;
209
210
211
212
213
214 private final Map<String, IConnectionParameters> current;
215
216
217
218
219
220 private final Map<String, IConnectionParameters> previous;
221
222
223 private boolean pulsingEnabled = true;
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239 final class HeartbeatProcessor extends AbstractLifeCycle implements
240 Runnable
241 {
242
243 private HeartbeatProcessor()
244 {
245 super();
246 }
247
248 public void run()
249 {
250 int count = 0;
251 while (AbstractConnectionDiscoverer.this.isActive())
252 {
253 count++;
254 try
255 {
256 Thread.sleep(AbstractConnectionDiscoverer.this.heartbeatPeriod);
257 scanForNewContexts();
258
259 if (count
260 % AbstractConnectionDiscoverer.this.allowedMissedHeartbeats != 0)
261 {
262 continue;
263 }
264 count = 0;
265 scanForLostContexts();
266 }
267 catch (Exception e)
268 {
269 logException(getLog(), this, e);
270 }
271 }
272 }
273
274 @Override
275 protected void doDestroy()
276 {
277 AbstractConnectionDiscoverer.this.heartbeatProcessorThread.interrupt();
278 }
279
280 @Override
281 protected void doStart()
282 {
283 }
284
285 @Override
286 protected AsyncLog getLog()
287 {
288 return AbstractConnectionDiscoverer.this.getLog();
289 }
290
291
292
293
294
295
296 private void scanForNewContexts()
297 {
298 List<String> rxCopy;
299 Set<String> deadContextsCopy;
300 synchronized (AbstractConnectionDiscoverer.this.receivedHeartbeats)
301 {
302 rxCopy = AbstractConnectionDiscoverer.this.receivedHeartbeats;
303 AbstractConnectionDiscoverer.this.receivedHeartbeats =
304 CollectionFactory.newList();
305 }
306 synchronized (AbstractConnectionDiscoverer.this.deadContexts)
307 {
308 deadContextsCopy =
309 CollectionFactory.newSet(AbstractConnectionDiscoverer.this.deadContexts);
310 AbstractConnectionDiscoverer.this.deadContexts.clear();
311 }
312
313
314 for (String deadContext : deadContextsCopy)
315 {
316 for (Iterator<String> iterator = deadContextsCopy.iterator(); iterator.hasNext();)
317 {
318 String pulse = iterator.next();
319 if (AbstractConnectionDiscoverer.this.isPulseFromContext(
320 deadContext, pulse))
321 {
322 iterator.remove();
323 }
324 }
325 }
326
327 for (String pulse : rxCopy)
328 {
329
330 if (!AbstractConnectionDiscoverer.this.pulse.equals(pulse))
331 {
332 IConnectionParameters params =
333 AbstractConnectionDiscoverer.this.getConnectionParameters(pulse);
334 if (params != null)
335 {
336 boolean raiseDiscoveredEvent = false;
337 final IConnectionParameters putResult;
338 synchronized (AbstractConnectionDiscoverer.this.current)
339 {
340 putResult =
341 AbstractConnectionDiscoverer.this.current.put(
342 params.getRemoteContextIdentity(), params);
343
344
345
346
347
348
349 if (putResult == null)
350 {
351
352 final IConnectionParameters instanceInPrevious =
353 AbstractConnectionDiscoverer.this.previous.get(params.getRemoteContextIdentity());
354
355
356
357
358 raiseDiscoveredEvent =
359 ((instanceInPrevious == null) || (!instanceInPrevious.equals(params)));
360 }
361 else
362 {
363
364
365
366
367 raiseDiscoveredEvent =
368 !putResult.equals(params);
369 }
370 }
371
372
373
374
375 if (raiseDiscoveredEvent)
376 {
377 if (getLog().isInfoEnabled())
378 {
379 getLog().info(
380 "Context discovered: "
381 + safeToString(params));
382 }
383 if (AbstractConnectionDiscoverer.this.context.isActive())
384 {
385 AbstractConnectionDiscoverer.this.context.queueEvent(new ContextDiscoveredEvent(
386 AbstractConnectionDiscoverer.this.context,
387 params));
388 }
389 }
390 }
391 }
392 }
393 }
394
395
396
397
398
399
400
401 private void scanForLostContexts()
402 {
403
404
405 final Set<String> notAvailable;
406 synchronized (AbstractConnectionDiscoverer.this.current)
407 {
408 if (getLog().isDebugEnabled())
409 {
410 getLog().debug(
411 "Contexts found in this "
412 + (AbstractConnectionDiscoverer.this.allowedMissedHeartbeats * AbstractConnectionDiscoverer.this.heartbeatPeriod)
413 + "ms period:"
414 + CollectionUtils.toFormattedString(AbstractConnectionDiscoverer.this.current)
415 + "Contexts found in the previous period:"
416 + CollectionUtils.toFormattedString(AbstractConnectionDiscoverer.this.previous));
417 }
418 notAvailable =
419 CollectionFactory.newSet(AbstractConnectionDiscoverer.this.previous.keySet());
420 notAvailable.removeAll(AbstractConnectionDiscoverer.this.current.keySet());
421
422 AbstractConnectionDiscoverer.this.previous.clear();
423 AbstractConnectionDiscoverer.this.previous.putAll(AbstractConnectionDiscoverer.this.current);
424 AbstractConnectionDiscoverer.this.current.clear();
425 }
426 for (String remoteContextId : notAvailable)
427 {
428 if (getLog().isInfoEnabled())
429 {
430 getLog().info("Context not available: " + remoteContextId);
431 }
432 AbstractConnectionDiscoverer.this.context.queueEvent(new ContextNotAvailableEvent(
433 AbstractConnectionDiscoverer.this.context, remoteContextId));
434 }
435 }
436 }
437
438
439
440
441
442
443
444
445
446 final class HeartbeatListener extends AbstractLifeCycle implements Runnable
447 {
448 private HeartbeatListener()
449 {
450 super();
451 }
452
453 public void run()
454 {
455 while (AbstractConnectionDiscoverer.this.isActive())
456 {
457 byte[] buf = new byte[1024];
458 DatagramPacket recv = new DatagramPacket(buf, buf.length);
459 try
460 {
461 socket.receive(recv);
462 String data =
463 new String(recv.getData(), ByteConstants.ENCODING).trim();
464 synchronized (AbstractConnectionDiscoverer.this.receivedHeartbeats)
465 {
466 AbstractConnectionDiscoverer.this.receivedHeartbeats.add(data);
467 }
468 }
469 catch (Exception e)
470 {
471 logException(getLog(), this, e);
472 }
473 }
474 }
475
476 @Override
477 protected void doDestroy()
478 {
479 AbstractConnectionDiscoverer.this.heartbeatListenerThread.interrupt();
480 }
481
482 @Override
483 protected void doStart()
484 {
485 }
486
487 @Override
488 protected AsyncLog getLog()
489 {
490 return AbstractConnectionDiscoverer.this.getLog();
491 }
492
493 }
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513 final class HeartbeatGenerator extends AbstractLifeCycle implements
514 Runnable
515 {
516
517 private HeartbeatGenerator()
518 {
519 super();
520 }
521
522 @Override
523 protected void doStart()
524 {
525 }
526
527 @Override
528 protected void doDestroy()
529 {
530 AbstractConnectionDiscoverer.this.heartbeatGeneratorThread.interrupt();
531 }
532
533 @Override
534 protected AsyncLog getLog()
535 {
536 return AbstractConnectionDiscoverer.this.getLog();
537 }
538
539 public void run()
540 {
541 while (AbstractConnectionDiscoverer.this.isActive())
542 {
543 try
544 {
545 final IContextWatchdog contextWatchdog =
546 AbstractConnectionDiscoverer.this.context.getContextWatchdog();
547 boolean pulse = true;
548 if (contextWatchdog != null)
549 {
550 if (!contextWatchdog.isContextHealthy())
551 {
552 if (getLog().isWarnEnabled())
553 {
554 getLog().warn(
555 AbstractConnectionDiscoverer.this.context
556 + " is not healthy, not sending out heartbeat");
557 }
558 pulse = false;
559 }
560 }
561
562 if (ThreadUtils.findDeadlocks())
563 {
564 if (getLog().isWarnEnabled())
565 {
566 getLog().warn(
567 "Deadlock found, not sending out heartbeat");
568 }
569 pulse = false;
570 }
571 if (AbstractConnectionDiscoverer.this.isPulsingEnabled()
572 && pulse)
573 {
574 AbstractConnectionDiscoverer.this.pulse();
575 }
576 long now = System.currentTimeMillis();
577 Thread.sleep(AbstractConnectionDiscoverer.this.heartbeatPeriod);
578 final long actualPeriod = System.currentTimeMillis() - now;
579
580
581
582 final int maxTolerance = 20;
583 if (actualPeriod > AbstractConnectionDiscoverer.this.heartbeatPeriod
584 + (maxTolerance))
585 {
586 if (getLog().isWarnEnabled())
587 {
588 getLog().warn(
589 AbstractConnectionDiscoverer.this
590 + " low CPU resources; heartbeatPeriod "
591 + AbstractConnectionDiscoverer.this.heartbeatPeriod
592 + "ms but actually took " + actualPeriod
593 + "ms");
594 }
595 if (contextWatchdog != null)
596 {
597 contextWatchdog.cpuResourcesLow();
598 }
599 }
600 else
601 {
602 if (contextWatchdog != null)
603 {
604 contextWatchdog.systemNormal();
605 }
606 }
607 }
608 catch (Exception e)
609 {
610 logException(getLog(), this, e);
611 }
612 }
613 }
614 }
615
616
617
618
619
620
621
622
/**
 * Creates a discoverer configured from system properties:
 * {@link #UDP_DISCOVERY_NETWORK} and {@link #UDP_DISCOVERY_PORT} (each
 * falling back to its default) and {@link #NETWORK_INTERFACE_NAME} (no
 * default).
 *
 * @param context
 *            the local context
 */
public AbstractConnectionDiscoverer(IFrameworkContext context)
{
    this(
        context,
        System.getProperty(UDP_DISCOVERY_NETWORK,
            DEFAULT_UDP_DISCOVERY_NETWORK),
        Integer.parseInt(
            System.getProperty(UDP_DISCOVERY_PORT,
                DEFAULT_UDP_DISCOVERY_PORT)),
        System.getProperty(NETWORK_INTERFACE_NAME));
}
631
632
633
634
635
636
637
638
639
640
641
642
643
644
/**
 * Creates a discoverer: opens a multicast socket on the port, optionally
 * binds it to the named network interface, joins the multicast group and
 * creates (but does not start) the three daemon heartbeat threads.
 *
 * @param context
 *            the local context
 * @param udpNetwork
 *            the multicast group address
 * @param udpPort
 *            the UDP port
 * @param udpNic
 *            the network interface name, or <code>null</code> to leave
 *            the interface unset
 * @throws IllegalStateException
 *             if the socket could not be created or configured
 */
public AbstractConnectionDiscoverer(IFrameworkContext context,
    String udpNetwork, int udpPort, String udpNic)
{
    super();
    this.port = udpPort;
    this.context = context;
    try
    {
        this.network = InetAddress.getByName(udpNetwork);
        this.socket = new MulticastSocket(this.port);
        if (udpNic != null)
        {
            final NetworkInterface nic = NetworkInterface.getByName(udpNic);
            if (nic == null)
            {
                throw new SocketException("No network interface named '"
                    + udpNic + "'");
            }
            this.socket.setNetworkInterface(nic);
        }
        this.socket.joinGroup(this.network);
    }
    catch (Exception e)
    {
        // do not leak the socket if it was opened before the failure
        if (this.socket != null)
        {
            this.socket.close();
        }
        // spell out the arguments: toString() has nothing to report yet
        throw new IllegalStateException("Could not create "
            + getClass().getSimpleName() + " for network " + udpNetwork
            + ", port " + udpPort + ", nic " + udpNic, e);
    }
    this.deadContexts = new TtlSet<String>();
    this.receivedHeartbeats = CollectionFactory.newList();
    this.current = CollectionFactory.newMap();
    this.previous = CollectionFactory.newMap();

    final String threadDetails =
        this.context.getEventProcessorIdentityPrefix() + COLON
            + this.network + COLON + this.port;

    this.heartbeatListener = new HeartbeatListener();
    this.heartbeatListenerThread =
        new Thread(this.heartbeatListener,
            this.heartbeatListener.getClass().getSimpleName() + SPACE
                + threadDetails);
    this.heartbeatListenerThread.setDaemon(true);

    this.heartbeatProcessor = new HeartbeatProcessor();
    this.heartbeatProcessorThread =
        new Thread(this.heartbeatProcessor,
            this.heartbeatProcessor.getClass().getSimpleName() + SPACE
                + threadDetails);
    this.heartbeatProcessorThread.setDaemon(true);

    this.heartbeatGenerator = new HeartbeatGenerator();
    this.heartbeatGeneratorThread =
        new Thread(this.heartbeatGenerator,
            this.heartbeatGenerator.getClass().getSimpleName() + SPACE
                + threadDetails);
    this.heartbeatGeneratorThread.setPriority(Thread.MAX_PRIORITY);
    this.heartbeatGeneratorThread.setDaemon(true);
}
696
697
698
699
700
701
702 public final int getPort()
703 {
704 return this.port;
705 }
706
707
708
709
710
711
712 public final InetAddress getNetwork()
713 {
714 return this.network;
715 }
716
717
718
719
720
721
722 public final MulticastSocket getSocket()
723 {
724 return this.socket;
725 }
726
727
728
729
730
731
732
733 public final String getPulse()
734 {
735 return this.pulse;
736 }
737
738
739
740
741
742
743 public final void connectionDestroyed(String remoteContextIdentity)
744 {
745 synchronized (AbstractConnectionDiscoverer.this.current)
746 {
747 AbstractConnectionDiscoverer.this.current.remove(remoteContextIdentity);
748 AbstractConnectionDiscoverer.this.previous.remove(remoteContextIdentity);
749 }
750
751 synchronized (AbstractConnectionDiscoverer.this.deadContexts)
752 {
753 AbstractConnectionDiscoverer.this.deadContexts.add(remoteContextIdentity);
754 }
755 }
756
757 public final long getNetworkHeartbeatPeriod()
758 {
759 return heartbeatPeriod;
760 }
761
762 public final int getAllowableNetworkHeartbeatMissCount()
763 {
764 return allowedMissedHeartbeats;
765 }
766
767 public final void setAllowableNetworkHeartbeatMissCount(
768 int allowedHeartbeatMissCount)
769 {
770 this.allowedMissedHeartbeats = allowedHeartbeatMissCount;
771 }
772
773 public final void setNetworkHeartbeatPeriod(long periodInMillis)
774 {
775 this.heartbeatPeriod = periodInMillis;
776 }
777
778 public void disablePulsing()
779 {
780 this.pulsingEnabled = false;
781 updateSystemInfo();
782 }
783
784 public void enablePulsing()
785 {
786 this.pulsingEnabled = true;
787 updateSystemInfo();
788 }
789
790 private void updateSystemInfo()
791 {
792 final IContainer systemInfo = this.context.getSystemInfo();
793 systemInfo.beginFrame(new EventFrameExecution());
794 try
795 {
796 systemInfo.add(new BooleanField(SENDING_HEARTBEATS,
797 isPulsingEnabled()));
798 }
799 finally
800 {
801 systemInfo.endFrame();
802 }
803 }
804
805 @Override
806 protected final void doDestroy()
807 {
808 if (getLog().isInfoEnabled())
809 {
810 getLog().info("Destroying " + this);
811 }
812 this.heartbeatListener.destroy();
813 this.socket.close();
814 this.heartbeatProcessor.destroy();
815 this.heartbeatGenerator.destroy();
816 if (getLog().isInfoEnabled())
817 {
818 getLog().info("Destroyed " + this);
819 }
820 }
821
822 @Override
823 protected void doStart()
824 {
825 if (getLog().isInfoEnabled())
826 {
827 getLog().info("Starting " + this);
828 }
829 this.pulse =
830 PULSE_MSG + DELIMITER + this.context.getIdentity() + DELIMITER
831 + this.context.getContextHashCode() + DELIMITER
832 + getProtocolConnectionParameters();
833 this.description = getClass().getSimpleName() + " for " + this.pulse;
834 this.heartbeatListenerThread.start();
835 this.heartbeatProcessorThread.start();
836 this.heartbeatGeneratorThread.start();
837 final IContainer systemInfo = this.context.getSystemInfo();
838 systemInfo.beginFrame(new EventFrameExecution());
839 try
840 {
841 systemInfo.add(new StringField(UDP_DISCOVERY_NETWORK,
842 getNetwork().toString()));
843 systemInfo.add(new IntegerField(UDP_DISCOVERY_PORT, getPort()));
844 try
845 {
846 systemInfo.add(new StringField(NETWORK_INTERFACE_NAME,
847 this.socket.getNetworkInterface().getDisplayName()));
848 }
849 catch (SocketException e)
850 {
851 Utils.logException(getLog(),
852 "Could not update systemInfo record with UDP NIC", e);
853 }
854 systemInfo.add(new LongField(HEARTBEAT_PERIOD,
855 getNetworkHeartbeatPeriod()));
856 systemInfo.add(new LongField(ALLOWED_MISSED_HEARTBEATS,
857 getAllowableNetworkHeartbeatMissCount()));
858 systemInfo.add(new BooleanField(SENDING_HEARTBEATS,
859 isPulsingEnabled()));
860 }
861 finally
862 {
863 systemInfo.endFrame();
864 }
865 }
866
867
868
869
870
871
872
873
874
875
876
877 protected boolean isPulseFromContext(String contextIdentity,
878 String pulseMessage)
879 {
880 return pulseMessage.startsWith(PULSE_MSG + DELIMITER + contextIdentity
881 + DELIMITER);
882 }
883
884
885
886
887
888
889 protected void send(String msg)
890 {
891 DatagramPacket data =
892 new DatagramPacket(ByteWriter.getBytes(msg), msg.length(),
893 this.network, this.port);
894 try
895 {
896 this.socket.send(data);
897 }
898 catch (IOException e)
899 {
900 logException(getLog(), "Could not send message '"
901 + safeToString(msg) + "'", e);
902 }
903 }
904
905 @Override
906 public String toString()
907 {
908 return this.description;
909 }
910
911 @Override
912 protected AsyncLog getLog()
913 {
914 return LOG;
915 }
916
917 public final void pulse()
918 {
919 if (getLog().isDebugEnabled())
920 {
921 getLog().debug(this + " pulse");
922 }
923 send(this.pulse);
924 }
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939 protected abstract IConnectionParameters getConnectionParameters(String data);
940
941 protected boolean isPulsingEnabled()
942 {
943 return this.pulsingEnabled;
944 }
945
946
947
948
949
950
951
952
953
954
955
956 protected abstract String getProtocolConnectionParameters();
957
958 }