1 /*
2 Copyright 2007 Ramon Servadei
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16 package fulmine.distribution.connection;
17
18 import static fulmine.distribution.connection.tcp.ProtocolMessageConstants.DELIMITER;
19 import static fulmine.distribution.connection.tcp.ProtocolMessageConstants.PULSE_MSG;
20 import static fulmine.util.Utils.COLON;
21 import static fulmine.util.Utils.SPACE;
22 import static fulmine.util.Utils.logException;
23 import static fulmine.util.Utils.safeToString;
24
25 import java.io.IOException;
26 import java.net.DatagramPacket;
27 import java.net.InetAddress;
28 import java.net.MulticastSocket;
29 import java.net.NetworkInterface;
30 import java.net.SocketException;
31 import java.util.Iterator;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Set;
35
36 import fulmine.AbstractLifeCycle;
37 import fulmine.context.IContextWatchdog;
38 import fulmine.context.IFrameworkContext;
39 import fulmine.distribution.IHeartbeatMonitor;
40 import fulmine.distribution.connection.tcp.ProtocolMessageConstants;
41 import fulmine.distribution.events.ConnectionDestroyedEvent;
42 import fulmine.distribution.events.ContextDiscoveredEvent;
43 import fulmine.distribution.events.ContextNotAvailableEvent;
44 import fulmine.event.EventFrameExecution;
45 import fulmine.model.container.IContainer;
46 import fulmine.model.field.BooleanField;
47 import fulmine.model.field.IntegerField;
48 import fulmine.model.field.LongField;
49 import fulmine.model.field.StringField;
50 import fulmine.protocol.specification.ByteConstants;
51 import fulmine.protocol.specification.ByteWriter;
52 import fulmine.util.Utils;
53 import fulmine.util.collection.CollectionFactory;
54 import fulmine.util.collection.CollectionUtils;
55 import fulmine.util.collection.TtlSet;
56 import fulmine.util.concurrent.ThreadUtils;
57 import fulmine.util.log.AsyncLog;
58
59 /**
60 * A base-class for {@link IConnectionDiscoverer} instances. The implementation
61 * uses a UDP discovery process.
62 * <p>
63 * This discoverer sends out a heartbeat pulse message at a period specified by
64 * the {@link #heartbeatPeriod} variable. This period can be changed dynamically
65 * at runtime using {@link #setNetworkHeartbeatPeriod(long)}. The pulse message
66 * encapsulates the parameters to make a connection to this local context. Once
67 * a pulse message is received by a remote context, the heartbeat pulses are
68 * expected periodically by the remote context. The expected period for
69 * receiving heartbeat pulses is determined by the remote context; it equals the
70 * remote context's heartbeat period multiplied by an allowed heartbeat miss
71 * counter. The count of missed heartbeats can also be set dynamically at
72 * runtime using {@link #setAllowableNetworkHeartbeatMissCount(int)}. This
73 * mechanism allows for a difference in the heartbeat periods between contexts
74 * but with a certain tolerance.
75 * <p>
76 * The following events are raised by this:
77 * <ul>
78 * <li>{@link ContextDiscoveredEvent} when a new heartbeat pulse is received.
79 * <li>{@link ContextNotAvailableEvent} when the heartbeat pulse from a
80 * previously found context is not received for more than the allowed number of
81 * missed pulses multiplied by this discoverer's heartbeat period
82 * </ul>
83 * <p>
84 * The discoverer uses an {@link IContextWatchdog} component to determine if
85 * heartbeat pulses should be sent out. The {@link IContextWatchdog} component
86 * is also informed if the discoverer encounters any delay between sending out
87 * consecutive pulses; this would generally indicate some form of CPU time
88 * starvation for the heartbeat generator thread.
89 * <p>
90 * The parameters for the UDP network and port can be configured by setting
91 * system properties {@link AbstractConnectionDiscoverer#UDP_DISCOVERY_NETWORK
92 * UDP_DISCOVERY_NETWORK} and
93 * {@link AbstractConnectionDiscoverer#UDP_DISCOVERY_PORT UDP_DISCOVERY_PORT}.
94 * These have defaults of
95 * {@link AbstractConnectionDiscoverer#DEFAULT_UDP_DISCOVERY_NETWORK
96 * DEFAULT_UDP_DISCOVERY_NETWORK} and
97 * {@link AbstractConnectionDiscoverer#DEFAULT_UDP_DISCOVERY_PORT
98 * DEFAULT_UDP_DISCOVERY_PORT} respectively. The network interface to bind to
99 * can be specified using the system property
100 * {@link AbstractConnectionDiscoverer#NETWORK_INTERFACE_NAME
101 * NETWORK_INTERFACE_NAME}. This must specify the name of the NIC to bind to.
102 *
103 * @author Ramon Servadei
104 */
105 public abstract class AbstractConnectionDiscoverer extends AbstractLifeCycle
106 implements IConnectionDiscoverer
107 {
108
109 private static final String HEARTBEAT_PERIOD = "heartbeatPeriod";
110
111 private static final String ALLOWED_MISSED_HEARTBEATS =
112 "allowedMissedHeartbeats";
113
114 private static final String SENDING_HEARTBEATS = "sendingHeartbeats";
115
116 private final static AsyncLog LOG =
117 new AsyncLog(AbstractConnectionDiscoverer.class);
118
119 /**
120 * The system property to override the default UDP multicast network for
121 * discovery.
122 *
123 * @see AbstractConnectionDiscoverer#DEFAULT_UDP_DISCOVERY_NETWORK
124 */
125 public static final String UDP_DISCOVERY_NETWORK = "discover.udp.network";
126
127 /** The default UDP multicast network for discovery */
128 public static final String DEFAULT_UDP_DISCOVERY_NETWORK = "228.160.2.5";
129
130 /**
131 * The system property to use to set the NIC. This is an optional setting
132 * overriding the default. The value must be the NIC name.
133 */
134 public static final String NETWORK_INTERFACE_NAME = "discover.udp.nic";
135
136 /**
137 * The system property to override the default UDP multicast port for
138 * discovery.
139 *
140 * @see AbstractConnectionDiscoverer#DEFAULT_UDP_DISCOVERY_PORT
141 */
142 public static final String UDP_DISCOVERY_PORT = "discover.udp.port";
143
144 /** The default UDP multicast port for discovery */
145 public static final String DEFAULT_UDP_DISCOVERY_PORT = "16025";
146
147 /** The default time window to use to ignore duplicate pulse messages; 5sec */
148 public static final int DEFAULT_DUPLICATE_PING_WINDOW = 5000;
149
150 /** The multicast socket to use for discovering other connections */
151 protected MulticastSocket socket;
152
153 /** Listens for heartbeats messages from other contexts */
154 protected final HeartbeatListener heartbeatListener;
155
156 /** The thread running the {@link #heartbeatListener} */
157 protected final Thread heartbeatListenerThread;
158
159 /** Processes heartbeats from other contexts */
160 protected final HeartbeatProcessor heartbeatProcessor;
161
162 /** The thread running the {@link #heartbeatProcessor} */
163 protected final Thread heartbeatProcessorThread;
164
165 /**
166 * Handles sending out heartbeats and listening for heartbeats from other
167 * contexts
168 */
169 protected final HeartbeatGenerator heartbeatGenerator;
170
171 /** The thread running the {@link #heartbeatGenerator} */
172 protected final Thread heartbeatGeneratorThread;
173
174 /**
175 * The pulse message this context sends out.
176 */
177 protected String pulse;
178
179 /** The UDP discovery port */
180 protected final int port;
181
182 /** The UDP discovery network */
183 protected final InetAddress network;
184
185 /** The string description */
186 protected String description;
187
188 /** The context to use to raise {@link ContextDiscoveredEvent} events */
189 private final IFrameworkContext context;
190
191 /** The heartbeat period */
192 private long heartbeatPeriod = IHeartbeatMonitor.DEFAULT_HEARTBEAT_PERIOD;
193
194 /** The number of allowed missed heartbeats */
195 private int allowedMissedHeartbeats =
196 IHeartbeatMonitor.DEFAULT_ALLOWED_MISSED_COUNT;
197
198 /**
199 * The heartbeats received waiting to be processed by the
200 * {@link HeartbeatProcessor}
201 */
202 private List<String> receivedHeartbeats;
203
204 /**
205 * Holds the set of known dead contexts found during the heartbeat
206 * processing period.
207 */
208 private Set<String> deadContexts;
209
210 /**
211 * The heartbeats received in the current heartbeat window. Access must be
212 * synchronized on the map.
213 */
214 private final Map<String, IConnectionParameters> current;
215
216 /**
217 * The heartbeats received in the previous heartbeat window. Access must be
218 * synchronized on the {@link #current} map.
219 */
220 private final Map<String, IConnectionParameters> previous;
221
222 /** Indicates if the {@link HeartbeatGenerator} should send out a pulse */
223 private boolean pulsingEnabled = true;
224
225 /**
226 * Processes the heartbeat messages received by the
227 * {@link HeartbeatListener}.
228 * <p>
229 * This raises {@link ContextDiscoveredEvent}s when a newly discovered
230 * context heartbeat is received.
231 * <p>
232 * If the heartbeat from a remote context is missed for more than
233 * {@link AbstractConnectionDiscoverer#allowedMissedHeartbeats
234 * allowedMissedHeartbeats} then a {@link ContextNotAvailableEvent} is
235 * raised.
236 *
237 * @author Ramon Servadei
238 */
239 final class HeartbeatProcessor extends AbstractLifeCycle implements
240 Runnable
241 {
242
243 private HeartbeatProcessor()
244 {
245 super();
246 }
247
248 public void run()
249 {
250 int count = 0;
251 while (AbstractConnectionDiscoverer.this.isActive())
252 {
253 count++;
254 try
255 {
256 Thread.sleep(AbstractConnectionDiscoverer.this.heartbeatPeriod);
257 scanForNewContexts();
258
259 if (count
260 % AbstractConnectionDiscoverer.this.allowedMissedHeartbeats != 0)
261 {
262 continue;
263 }
264 count = 0;
265 scanForLostContexts();
266 }
267 catch (Exception e)
268 {
269 logException(getLog(), this, e);
270 }
271 }
272 }
273
274 @Override
275 protected void doDestroy()
276 {
277 AbstractConnectionDiscoverer.this.heartbeatProcessorThread.interrupt();
278 }
279
280 @Override
281 protected void doStart()
282 {
283 }
284
285 @Override
286 protected AsyncLog getLog()
287 {
288 return AbstractConnectionDiscoverer.this.getLog();
289 }
290
291 /**
292 * Helper method that checks for new contexts.
293 * <p>
294 * Raises {@link ContextDiscoveredEvent}.
295 */
296 private void scanForNewContexts()
297 {
298 List<String> rxCopy;
299 Set<String> deadContextsCopy;
300 synchronized (AbstractConnectionDiscoverer.this.receivedHeartbeats)
301 {
302 rxCopy = AbstractConnectionDiscoverer.this.receivedHeartbeats;
303 AbstractConnectionDiscoverer.this.receivedHeartbeats =
304 CollectionFactory.newList();
305 }
306 synchronized (AbstractConnectionDiscoverer.this.deadContexts)
307 {
308 deadContextsCopy =
309 CollectionFactory.newSet(AbstractConnectionDiscoverer.this.deadContexts);
310 AbstractConnectionDiscoverer.this.deadContexts.clear();
311 }
312
313 // remove any dead contexts from the received heartbeats...
314 for (String deadContext : deadContextsCopy)
315 {
316 for (Iterator<String> iterator = deadContextsCopy.iterator(); iterator.hasNext();)
317 {
318 String pulse = iterator.next();
319 if (AbstractConnectionDiscoverer.this.isPulseFromContext(
320 deadContext, pulse))
321 {
322 iterator.remove();
323 }
324 }
325 }
326
327 for (String pulse : rxCopy)
328 {
329 // is this another context's pulse?
330 if (!AbstractConnectionDiscoverer.this.pulse.equals(pulse))
331 {
332 IConnectionParameters params =
333 AbstractConnectionDiscoverer.this.getConnectionParameters(pulse);
334 if (params != null)
335 {
336 boolean raiseDiscoveredEvent = false;
337 final IConnectionParameters putResult;
338 synchronized (AbstractConnectionDiscoverer.this.current)
339 {
340 putResult =
341 AbstractConnectionDiscoverer.this.current.put(
342 params.getRemoteContextIdentity(), params);
343 /*
344 * check if this is a newly found context or the
345 * params have changed - the current is cleared down
346 * by the heartbeat logic so checking for a null
347 * doesn't mean its an undiscovered context
348 */
349 if (putResult == null)
350 {
351 // check if the context is not in previous
352 final IConnectionParameters instanceInPrevious =
353 AbstractConnectionDiscoverer.this.previous.get(params.getRemoteContextIdentity());
354 /*
355 * raise the event if there is no instance in
356 * previous or the parameters have changed
357 */
358 raiseDiscoveredEvent =
359 ((instanceInPrevious == null) || (!instanceInPrevious.equals(params)));
360 }
361 else
362 {
363 /*
364 * this covers the rare situation that the
365 * parameters change within a few heartbeats
366 */
367 raiseDiscoveredEvent =
368 !putResult.equals(params);
369 }
370 }
371 /*
372 * the distribution manager handles what to do if the
373 * parameters change
374 */
375 if (raiseDiscoveredEvent)
376 {
377 if (getLog().isInfoEnabled())
378 {
379 getLog().info(
380 "Context discovered: "
381 + safeToString(params));
382 }
383 if (AbstractConnectionDiscoverer.this.context.isActive())
384 {
385 AbstractConnectionDiscoverer.this.context.queueEvent(new ContextDiscoveredEvent(
386 AbstractConnectionDiscoverer.this.context,
387 params));
388 }
389 }
390 }
391 }
392 }
393 }
394
395 /**
396 * Helper method to check for any contexts that have missed the alotted
397 * number of heartbeats.
398 * <p>
399 * Raises {@link ContextNotAvailableEvent}.
400 */
401 private void scanForLostContexts()
402 {
403 // check the current versus previous pulses only after we
404 // have sent out heartbeats for the allowed miss count
405 final Set<String> notAvailable;
406 synchronized (AbstractConnectionDiscoverer.this.current)
407 {
408 if (getLog().isDebugEnabled())
409 {
410 getLog().debug(
411 "Contexts found in this "
412 + (AbstractConnectionDiscoverer.this.allowedMissedHeartbeats * AbstractConnectionDiscoverer.this.heartbeatPeriod)
413 + "ms period:"
414 + CollectionUtils.toFormattedString(AbstractConnectionDiscoverer.this.current)
415 + "Contexts found in the previous period:"
416 + CollectionUtils.toFormattedString(AbstractConnectionDiscoverer.this.previous));
417 }
418 notAvailable =
419 CollectionFactory.newSet(AbstractConnectionDiscoverer.this.previous.keySet());
420 notAvailable.removeAll(AbstractConnectionDiscoverer.this.current.keySet());
421 // now save the current as the previous
422 AbstractConnectionDiscoverer.this.previous.clear();
423 AbstractConnectionDiscoverer.this.previous.putAll(AbstractConnectionDiscoverer.this.current);
424 AbstractConnectionDiscoverer.this.current.clear();
425 }
426 for (String remoteContextId : notAvailable)
427 {
428 if (getLog().isInfoEnabled())
429 {
430 getLog().info("Context not available: " + remoteContextId);
431 }
432 AbstractConnectionDiscoverer.this.context.queueEvent(new ContextNotAvailableEvent(
433 AbstractConnectionDiscoverer.this.context, remoteContextId));
434 }
435 }
436 }
437
438 /**
439 * Listens for heartbeat pulse messages on the discovery network and adds
440 * them to the {@link AbstractConnectionDiscoverer#receivedHeartbeats
441 * receivedHeartbeats}. These are processed by the
442 * {@link HeartbeatProcessor}.
443 *
444 * @author Ramon Servadei
445 */
446 final class HeartbeatListener extends AbstractLifeCycle implements Runnable
447 {
448 private HeartbeatListener()
449 {
450 super();
451 }
452
453 public void run()
454 {
455 while (AbstractConnectionDiscoverer.this.isActive())
456 {
457 byte[] buf = new byte[1024];
458 DatagramPacket recv = new DatagramPacket(buf, buf.length);
459 try
460 {
461 socket.receive(recv);
462 String data =
463 new String(recv.getData(), ByteConstants.ENCODING).trim();
464 synchronized (AbstractConnectionDiscoverer.this.receivedHeartbeats)
465 {
466 AbstractConnectionDiscoverer.this.receivedHeartbeats.add(data);
467 }
468 }
469 catch (Exception e)
470 {
471 logException(getLog(), this, e);
472 }
473 }
474 }
475
476 @Override
477 protected void doDestroy()
478 {
479 AbstractConnectionDiscoverer.this.heartbeatListenerThread.interrupt();
480 }
481
482 @Override
483 protected void doStart()
484 {
485 }
486
487 @Override
488 protected AsyncLog getLog()
489 {
490 return AbstractConnectionDiscoverer.this.getLog();
491 }
492
493 }
494
495 /**
496 * Sends out the heartbeat pulse for this context.
497 * <p>
498 * This class uses an {@link IContextWatchdog} instance that is queried
499 * using the {@link IContextWatchdog#isContextHealthy()} method before
500 * sending out the heartbeat. If the context is not healthy, the heartbeat
501 * pulse is not sent out.
502 * <p>
503 * This class also provides a mechanism to detect performance issue in the
504 * current runtime by checking that the thread running this is activated at
505 * the required heartbeat period. If the time between activations exceeds
506 * the heartbeat period plus a tolerance, the
507 * {@link IContextWatchdog#cpuResourcesLow()} method is invoked, otherwise,
508 * if the activation time is within the period plus tolerance the
509 * {@link IContextWatchdog#systemNormal()} method is invoked.
510 *
511 * @author Ramon Servadei
512 */
513 final class HeartbeatGenerator extends AbstractLifeCycle implements
514 Runnable
515 {
516
517 private HeartbeatGenerator()
518 {
519 super();
520 }
521
522 @Override
523 protected void doStart()
524 {
525 }
526
527 @Override
528 protected void doDestroy()
529 {
530 AbstractConnectionDiscoverer.this.heartbeatGeneratorThread.interrupt();
531 }
532
533 @Override
534 protected AsyncLog getLog()
535 {
536 return AbstractConnectionDiscoverer.this.getLog();
537 }
538
539 public void run()
540 {
541 while (AbstractConnectionDiscoverer.this.isActive())
542 {
543 try
544 {
545 final IContextWatchdog contextWatchdog =
546 AbstractConnectionDiscoverer.this.context.getContextWatchdog();
547 boolean pulse = true;
548 if (contextWatchdog != null)
549 {
550 if (!contextWatchdog.isContextHealthy())
551 {
552 if (getLog().isWarnEnabled())
553 {
554 getLog().warn(
555 AbstractConnectionDiscoverer.this.context
556 + " is not healthy, not sending out heartbeat");
557 }
558 pulse = false;
559 }
560 }
561 // check for any deadlocks
562 if (ThreadUtils.findDeadlocks())
563 {
564 if (getLog().isWarnEnabled())
565 {
566 getLog().warn(
567 "Deadlock found, not sending out heartbeat");
568 }
569 pulse = false;
570 }
571 if (AbstractConnectionDiscoverer.this.isPulsingEnabled()
572 && pulse)
573 {
574 AbstractConnectionDiscoverer.this.pulse();
575 }
576 long now = System.currentTimeMillis();
577 Thread.sleep(AbstractConnectionDiscoverer.this.heartbeatPeriod);
578 final long actualPeriod = System.currentTimeMillis() - now;
579
580 // compare the actual period with the expected period to
581 // determine any CPU time starvation - allow for 20ms
582 final int maxTolerance = 20;
583 if (actualPeriod > AbstractConnectionDiscoverer.this.heartbeatPeriod
584 + (maxTolerance))
585 {
586 if (getLog().isWarnEnabled())
587 {
588 getLog().warn(
589 AbstractConnectionDiscoverer.this
590 + " low CPU resources; heartbeatPeriod "
591 + AbstractConnectionDiscoverer.this.heartbeatPeriod
592 + "ms but actually took " + actualPeriod
593 + "ms");
594 }
595 if (contextWatchdog != null)
596 {
597 contextWatchdog.cpuResourcesLow();
598 }
599 }
600 else
601 {
602 if (contextWatchdog != null)
603 {
604 contextWatchdog.systemNormal();
605 }
606 }
607 }
608 catch (Exception e)
609 {
610 logException(getLog(), this, e);
611 }
612 }
613 }
614 }
615
616 /**
617 * Constructor that retrieves network values from system properties or use
618 * default values if no system properties are set.
619 *
620 * @param context
621 * the local context
622 */
623 public AbstractConnectionDiscoverer(IFrameworkContext context)
624 {
625 this(context, System.getProperty(UDP_DISCOVERY_NETWORK,
626 DEFAULT_UDP_DISCOVERY_NETWORK),
627 Integer.parseInt(System.getProperty(UDP_DISCOVERY_PORT,
628 DEFAULT_UDP_DISCOVERY_PORT)),
629 System.getProperty(NETWORK_INTERFACE_NAME));
630 }
631
632 /**
633 * Constructor that uses parameters for the network values.
634 *
635 * @param context
636 * the local context
637 * @param udpNetwork
638 * the UDP network
639 * @param udpPort
640 * the UDP port
641 * @param udpNic
642 * the network interface card name to bind to, <code>null</code>
643 * for default
644 */
645 public AbstractConnectionDiscoverer(IFrameworkContext context,
646 String udpNetwork, int udpPort, String udpNic)
647 {
648 super();
649 this.port = udpPort;
650 this.context = context;
651 try
652 {
653 this.network = InetAddress.getByName(udpNetwork);
654 this.socket = new MulticastSocket(this.port);
655 if (udpNic != null)
656 {
657 this.socket.setNetworkInterface(NetworkInterface.getByName(udpNic));
658 }
659 this.socket.joinGroup(this.network);
660 }
661 catch (Exception e)
662 {
663 throw new IllegalStateException("Could not create " + this, e);
664 }
665 this.deadContexts = new TtlSet<String>();
666 this.receivedHeartbeats = CollectionFactory.newList();
667 this.current = CollectionFactory.newMap();
668 this.previous = CollectionFactory.newMap();
669
670 final String threadDetails =
671 this.context.getEventProcessorIdentityPrefix() + COLON
672 + this.network + COLON + this.port;
673
674 this.heartbeatListener = new HeartbeatListener();
675 this.heartbeatListenerThread =
676 new Thread(this.heartbeatListener,
677 this.heartbeatListener.getClass().getSimpleName() + SPACE
678 + threadDetails);
679 this.heartbeatListenerThread.setDaemon(true);
680
681 this.heartbeatProcessor = new HeartbeatProcessor();
682 this.heartbeatProcessorThread =
683 new Thread(this.heartbeatProcessor,
684 this.heartbeatProcessor.getClass().getSimpleName() + SPACE
685 + threadDetails);
686 this.heartbeatProcessorThread.setDaemon(true);
687
688 this.heartbeatGenerator = new HeartbeatGenerator();
689 this.heartbeatGeneratorThread =
690 new Thread(this.heartbeatGenerator,
691 this.heartbeatGenerator.getClass().getSimpleName() + SPACE
692 + threadDetails);
693 this.heartbeatGeneratorThread.setPriority(Thread.MAX_PRIORITY);
694 this.heartbeatGeneratorThread.setDaemon(true);
695 }
696
697 /**
698 * Get the UDP port used for connection discovery
699 *
700 * @return the UDP port for connection discovery
701 */
702 public final int getPort()
703 {
704 return this.port;
705 }
706
707 /**
708 * Get the UDP network for connection discovery
709 *
710 * @return the UDP network for connection discovery
711 */
712 public final InetAddress getNetwork()
713 {
714 return this.network;
715 }
716
717 /**
718 * Get the multicast socket used for connection discovery
719 *
720 * @return the multicast socket used for connection discovery
721 */
722 public final MulticastSocket getSocket()
723 {
724 return this.socket;
725 }
726
727 /**
728 * Get the pulse message sent out to other {@link IConnectionDiscoverer}
729 * instances over the discovery network
730 *
731 * @return the pulse message sent
732 */
733 public final String getPulse()
734 {
735 return this.pulse;
736 }
737
738 /**
739 * This removes the connection details in the
740 * {@link ConnectionDestroyedEvent} from the
741 * {@link AbstractConnectionDiscoverer#current} map of known connections.
742 */
743 public final void connectionDestroyed(String remoteContextIdentity)
744 {
745 synchronized (AbstractConnectionDiscoverer.this.current)
746 {
747 AbstractConnectionDiscoverer.this.current.remove(remoteContextIdentity);
748 AbstractConnectionDiscoverer.this.previous.remove(remoteContextIdentity);
749 }
750 // store the dead context for processing by the HeartbeatProcessor
751 synchronized (AbstractConnectionDiscoverer.this.deadContexts)
752 {
753 AbstractConnectionDiscoverer.this.deadContexts.add(remoteContextIdentity);
754 }
755 }
756
757 public final long getNetworkHeartbeatPeriod()
758 {
759 return heartbeatPeriod;
760 }
761
762 public final int getAllowableNetworkHeartbeatMissCount()
763 {
764 return allowedMissedHeartbeats;
765 }
766
767 public final void setAllowableNetworkHeartbeatMissCount(
768 int allowedHeartbeatMissCount)
769 {
770 this.allowedMissedHeartbeats = allowedHeartbeatMissCount;
771 }
772
773 public final void setNetworkHeartbeatPeriod(long periodInMillis)
774 {
775 this.heartbeatPeriod = periodInMillis;
776 }
777
778 public void disablePulsing()
779 {
780 this.pulsingEnabled = false;
781 updateSystemInfo();
782 }
783
784 public void enablePulsing()
785 {
786 this.pulsingEnabled = true;
787 updateSystemInfo();
788 }
789
790 private void updateSystemInfo()
791 {
792 final IContainer systemInfo = this.context.getSystemInfo();
793 systemInfo.beginFrame(new EventFrameExecution());
794 try
795 {
796 systemInfo.add(new BooleanField(SENDING_HEARTBEATS,
797 isPulsingEnabled()));
798 }
799 finally
800 {
801 systemInfo.endFrame();
802 }
803 }
804
805 @Override
806 protected final void doDestroy()
807 {
808 if (getLog().isInfoEnabled())
809 {
810 getLog().info("Destroying " + this);
811 }
812 this.heartbeatListener.destroy();
813 this.socket.close();
814 this.heartbeatProcessor.destroy();
815 this.heartbeatGenerator.destroy();
816 if (getLog().isInfoEnabled())
817 {
818 getLog().info("Destroyed " + this);
819 }
820 }
821
822 @Override
823 protected void doStart()
824 {
825 if (getLog().isInfoEnabled())
826 {
827 getLog().info("Starting " + this);
828 }
829 this.pulse =
830 PULSE_MSG + DELIMITER + this.context.getIdentity() + DELIMITER
831 + this.context.getContextHashCode() + DELIMITER
832 + getProtocolConnectionParameters();
833 this.description = getClass().getSimpleName() + " for " + this.pulse;
834 this.heartbeatListenerThread.start();
835 this.heartbeatProcessorThread.start();
836 this.heartbeatGeneratorThread.start();
837 final IContainer systemInfo = this.context.getSystemInfo();
838 systemInfo.beginFrame(new EventFrameExecution());
839 try
840 {
841 systemInfo.add(new StringField(UDP_DISCOVERY_NETWORK,
842 getNetwork().toString()));
843 systemInfo.add(new IntegerField(UDP_DISCOVERY_PORT, getPort()));
844 try
845 {
846 systemInfo.add(new StringField(NETWORK_INTERFACE_NAME,
847 this.socket.getNetworkInterface().getDisplayName()));
848 }
849 catch (SocketException e)
850 {
851 Utils.logException(getLog(),
852 "Could not update systemInfo record with UDP NIC", e);
853 }
854 systemInfo.add(new LongField(HEARTBEAT_PERIOD,
855 getNetworkHeartbeatPeriod()));
856 systemInfo.add(new LongField(ALLOWED_MISSED_HEARTBEATS,
857 getAllowableNetworkHeartbeatMissCount()));
858 systemInfo.add(new BooleanField(SENDING_HEARTBEATS,
859 isPulsingEnabled()));
860 }
861 finally
862 {
863 systemInfo.endFrame();
864 }
865 }
866
867 /**
868 * Determine if the pulse message is from the named context
869 *
870 * @param contextIdentity
871 * the context to compare against the pulse message
872 * @param pulseMessage
873 * the pulse message
874 * @return <code>true</code> if the pulse message is from the identified
875 * context
876 */
877 protected boolean isPulseFromContext(String contextIdentity,
878 String pulseMessage)
879 {
880 return pulseMessage.startsWith(PULSE_MSG + DELIMITER + contextIdentity
881 + DELIMITER);
882 }
883
884 /**
885 * Send the message to the multicast socket
886 *
887 * @param msg
888 */
889 protected void send(String msg)
890 {
891 DatagramPacket data =
892 new DatagramPacket(ByteWriter.getBytes(msg), msg.length(),
893 this.network, this.port);
894 try
895 {
896 this.socket.send(data);
897 }
898 catch (IOException e)
899 {
900 logException(getLog(), "Could not send message '"
901 + safeToString(msg) + "'", e);
902 }
903 }
904
905 @Override
906 public String toString()
907 {
908 return this.description;
909 }
910
911 @Override
912 protected AsyncLog getLog()
913 {
914 return LOG;
915 }
916
917 public final void pulse()
918 {
919 if (getLog().isDebugEnabled())
920 {
921 getLog().debug(this + " pulse");
922 }
923 send(this.pulse);
924 }
925
926 /**
927 * Template method for sub-classes to provide the implementation of how to
928 * handle the 'pulse' message received from a peer
929 * {@link IConnectionDiscoverer}. This method inspects the data string and
930 * creates an appropriate {@link IConnectionParameters} instance
931 * representing it.
932 *
933 * @param data
934 * the pulse message from a peer {@link IConnectionDiscoverer}
935 * @return the {@link IConnectionParameters} extracted from the pulse
936 * message or <code>null</code> if the message could not be
937 * deciphered.
938 */
939 protected abstract IConnectionParameters getConnectionParameters(String data);
940
941 protected boolean isPulsingEnabled()
942 {
943 return this.pulsingEnabled;
944 }
945
946 /**
947 * Overridden by subclasses to provide the protocol specific parameters for
948 * connecting to the connection broker of this context. The parameters must
949 * be delimited using {@link ProtocolMessageConstants#DELIMITER}. There is
950 * no need to prepend the delimiter.
951 *
952 * @return a string of parameters for the connection protocol of the
953 * connection broker, delimited using
954 * {@link ProtocolMessageConstants#DELIMITER}
955 */
956 protected abstract String getProtocolConnectionParameters();
957
958 }