View Javadoc

1   /*
2      Copyright 2007 Ramon Servadei
3   
4      Licensed under the Apache License, Version 2.0 (the "License");
5      you may not use this file except in compliance with the License.
6      You may obtain a copy of the License at
7   
8          http://www.apache.org/licenses/LICENSE-2.0
9   
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15   */
16  package fulmine.distribution.connection;
17  
18  import static fulmine.distribution.connection.tcp.ProtocolMessageConstants.DELIMITER;
19  import static fulmine.distribution.connection.tcp.ProtocolMessageConstants.PULSE_MSG;
20  import static fulmine.util.Utils.COLON;
21  import static fulmine.util.Utils.SPACE;
22  import static fulmine.util.Utils.logException;
23  import static fulmine.util.Utils.safeToString;
24  
25  import java.io.IOException;
26  import java.net.DatagramPacket;
27  import java.net.InetAddress;
28  import java.net.MulticastSocket;
29  import java.net.NetworkInterface;
30  import java.net.SocketException;
31  import java.util.Iterator;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.Set;
35  
36  import fulmine.AbstractLifeCycle;
37  import fulmine.context.IContextWatchdog;
38  import fulmine.context.IFrameworkContext;
39  import fulmine.distribution.IHeartbeatMonitor;
40  import fulmine.distribution.connection.tcp.ProtocolMessageConstants;
41  import fulmine.distribution.events.ConnectionDestroyedEvent;
42  import fulmine.distribution.events.ContextDiscoveredEvent;
43  import fulmine.distribution.events.ContextNotAvailableEvent;
44  import fulmine.event.EventFrameExecution;
45  import fulmine.model.container.IContainer;
46  import fulmine.model.field.BooleanField;
47  import fulmine.model.field.IntegerField;
48  import fulmine.model.field.LongField;
49  import fulmine.model.field.StringField;
50  import fulmine.protocol.specification.ByteConstants;
51  import fulmine.protocol.specification.ByteWriter;
52  import fulmine.util.Utils;
53  import fulmine.util.collection.CollectionFactory;
54  import fulmine.util.collection.CollectionUtils;
55  import fulmine.util.collection.TtlSet;
56  import fulmine.util.concurrent.ThreadUtils;
57  import fulmine.util.log.AsyncLog;
58  
59  /**
60   * A base-class for {@link IConnectionDiscoverer} instances. The implementation
61   * uses a UDP discovery process.
62   * <p>
63   * This discoverer sends out a heartbeat pulse message at a period specified by
64   * the {@link #heartbeatPeriod} variable. This period can be changed dynamically
65   * at runtime using {@link #setNetworkHeartbeatPeriod(long)}. The pulse message
66   * encapsulates the parameters to make a connection to this local context. Once
67   * a pulse message is received by a remote context, the heartbeat pulses are
68   * expected periodically by the remote context. The expected period for
69   * receiving heartbeat pulses is determined by the remote context; it equals the
70   * remote context's heartbeat period multiplied by an allowed heartbeat miss
71   * counter. The count of missed heartbeats can also be set dynamically at
72   * runtime using {@link #setAllowableNetworkHeartbeatMissCount(int)}. This
73   * mechanism allows for a difference in the heartbeat periods between contexts
74   * but with a certain tolerance.
75   * <p>
76   * The following events are raised by this:
77   * <ul>
78   * <li>{@link ContextDiscoveredEvent} when a new heartbeat pulse is received.
79   * <li>{@link ContextNotAvailableEvent} when the heartbeat pulse from a
80   * previously found context is not received for more than the allowed number of
81   * missed pulses multiplied by this discoverer's heartbeat period
82   * </ul>
83   * <p>
84   * The discoverer uses an {@link IContextWatchdog} component to determine if
85   * heartbeat pulses should be sent out. The {@link IContextWatchdog} component
86   * is also informed if the discoverer encounters any delay between sending out
87   * consecutive pulses; this would generally indicate some form of CPU time
88   * starvation for the heartbeat generator thread.
89   * <p>
90   * The parameters for the UDP network and port can be configured by setting
91   * system properties {@link AbstractConnectionDiscoverer#UDP_DISCOVERY_NETWORK
92   * UDP_DISCOVERY_NETWORK} and
93   * {@link AbstractConnectionDiscoverer#UDP_DISCOVERY_PORT UDP_DISCOVERY_PORT}.
94   * These have defaults of
95   * {@link AbstractConnectionDiscoverer#DEFAULT_UDP_DISCOVERY_NETWORK
96   * DEFAULT_UDP_DISCOVERY_NETWORK} and
97   * {@link AbstractConnectionDiscoverer#DEFAULT_UDP_DISCOVERY_PORT
98   * DEFAULT_UDP_DISCOVERY_PORT} respectively. The network interface to bind to
99   * can be specified using the system property
100  * {@link AbstractConnectionDiscoverer#NETWORK_INTERFACE_NAME
101  * NETWORK_INTERFACE_NAME}. This must specify the name of the NIC to bind to.
102  * 
103  * @author Ramon Servadei
104  */
105 public abstract class AbstractConnectionDiscoverer extends AbstractLifeCycle
106     implements IConnectionDiscoverer
107 {
108 
109     private static final String HEARTBEAT_PERIOD = "heartbeatPeriod";
110 
111     private static final String ALLOWED_MISSED_HEARTBEATS =
112         "allowedMissedHeartbeats";
113 
114     private static final String SENDING_HEARTBEATS = "sendingHeartbeats";
115 
116     private final static AsyncLog LOG =
117         new AsyncLog(AbstractConnectionDiscoverer.class);
118 
    /**
     * The system property to override the default UDP multicast network for
     * discovery. The value is resolved with
     * {@link InetAddress#getByName(String)} and used as the multicast group
     * the discovery socket joins.
     * 
     * @see AbstractConnectionDiscoverer#DEFAULT_UDP_DISCOVERY_NETWORK
     */
    public static final String UDP_DISCOVERY_NETWORK = "discover.udp.network";

    /**
     * The default UDP multicast network (group address) for discovery, used
     * when {@link #UDP_DISCOVERY_NETWORK} is not set.
     */
    public static final String DEFAULT_UDP_DISCOVERY_NETWORK = "228.160.2.5";

    /**
     * The system property to use to set the NIC. This is an optional setting
     * overriding the default. The value must be the NIC name as understood by
     * {@link NetworkInterface#getByName(String)}. When the property is not
     * set, no network interface is explicitly selected on the socket.
     */
    public static final String NETWORK_INTERFACE_NAME = "discover.udp.nic";

    /**
     * The system property to override the default UDP multicast port for
     * discovery. The value is parsed with {@link Integer#parseInt(String)}.
     * 
     * @see AbstractConnectionDiscoverer#DEFAULT_UDP_DISCOVERY_PORT
     */
    public static final String UDP_DISCOVERY_PORT = "discover.udp.port";

    /**
     * The default UDP multicast port for discovery. Held as a string because
     * it is the fallback value of the {@link #UDP_DISCOVERY_PORT} system
     * property lookup.
     */
    public static final String DEFAULT_UDP_DISCOVERY_PORT = "16025";

    /**
     * The default time window to use to ignore duplicate pulse messages; 5sec
     * expressed in milliseconds. Not referenced in the visible part of this
     * class.
     */
    public static final int DEFAULT_DUPLICATE_PING_WINDOW = 5000;
149 
    /**
     * The multicast socket to use for discovering other connections. Created
     * in the constructor, bound to {@link #port} and joined to the
     * {@link #network} group; closed when this discoverer is destroyed.
     */
    protected MulticastSocket socket;

    /** Listens for heartbeats messages from other contexts */
    protected final HeartbeatListener heartbeatListener;

    /** The (daemon) thread running the {@link #heartbeatListener} */
    protected final Thread heartbeatListenerThread;

    /** Processes heartbeats from other contexts */
    protected final HeartbeatProcessor heartbeatProcessor;

    /** The (daemon) thread running the {@link #heartbeatProcessor} */
    protected final Thread heartbeatProcessorThread;

    /**
     * Handles sending out the heartbeat pulse for this context. Listening for
     * heartbeats from other contexts is done by the
     * {@link #heartbeatListener}.
     */
    protected final HeartbeatGenerator heartbeatGenerator;

    /**
     * The (daemon) thread running the {@link #heartbeatGenerator}; created
     * with {@link Thread#MAX_PRIORITY}.
     */
    protected final Thread heartbeatGeneratorThread;

    /**
     * The pulse message this context sends out. It is <code>null</code> until
     * it is built when this discoverer is started.
     */
    protected String pulse;

    /** The UDP discovery port */
    protected final int port;

    /** The UDP discovery network (the multicast group that is joined) */
    protected final InetAddress network;

    /** The string description; assigned when this discoverer is started */
    protected String description;

    /**
     * The context to use to raise {@link ContextDiscoveredEvent} and
     * {@link ContextNotAvailableEvent} events. It also supplies the
     * {@link IContextWatchdog} and the system info container.
     */
    private final IFrameworkContext context;
190 
191     /** The heartbeat period */
192     private long heartbeatPeriod = IHeartbeatMonitor.DEFAULT_HEARTBEAT_PERIOD;
193 
194     /** The number of allowed missed heartbeats */
195     private int allowedMissedHeartbeats =
196         IHeartbeatMonitor.DEFAULT_ALLOWED_MISSED_COUNT;
197 
198     /**
199      * The heartbeats received waiting to be processed by the
200      * {@link HeartbeatProcessor}
201      */
202     private List<String> receivedHeartbeats;
203 
204     /**
205      * Holds the set of known dead contexts found during the heartbeat
206      * processing period.
207      */
208     private Set<String> deadContexts;
209 
210     /**
211      * The heartbeats received in the current heartbeat window. Access must be
212      * synchronized on the map.
213      */
214     private final Map<String, IConnectionParameters> current;
215 
216     /**
217      * The heartbeats received in the previous heartbeat window. Access must be
218      * synchronized on the {@link #current} map.
219      */
220     private final Map<String, IConnectionParameters> previous;
221 
222     /** Indicates if the {@link HeartbeatGenerator} should send out a pulse */
223     private boolean pulsingEnabled = true;
224 
225     /**
226      * Processes the heartbeat messages received by the
227      * {@link HeartbeatListener}.
228      * <p>
229      * This raises {@link ContextDiscoveredEvent}s when a newly discovered
230      * context heartbeat is received.
231      * <p>
232      * If the heartbeat from a remote context is missed for more than
233      * {@link AbstractConnectionDiscoverer#allowedMissedHeartbeats
234      * allowedMissedHeartbeats} then a {@link ContextNotAvailableEvent} is
235      * raised.
236      * 
237      * @author Ramon Servadei
238      */
239     final class HeartbeatProcessor extends AbstractLifeCycle implements
240         Runnable
241     {
242 
243         private HeartbeatProcessor()
244         {
245             super();
246         }
247 
248         public void run()
249         {
250             int count = 0;
251             while (AbstractConnectionDiscoverer.this.isActive())
252             {
253                 count++;
254                 try
255                 {
256                     Thread.sleep(AbstractConnectionDiscoverer.this.heartbeatPeriod);
257                     scanForNewContexts();
258 
259                     if (count
260                         % AbstractConnectionDiscoverer.this.allowedMissedHeartbeats != 0)
261                     {
262                         continue;
263                     }
264                     count = 0;
265                     scanForLostContexts();
266                 }
267                 catch (Exception e)
268                 {
269                     logException(getLog(), this, e);
270                 }
271             }
272         }
273 
274         @Override
275         protected void doDestroy()
276         {
277             AbstractConnectionDiscoverer.this.heartbeatProcessorThread.interrupt();
278         }
279 
280         @Override
281         protected void doStart()
282         {
283         }
284 
285         @Override
286         protected AsyncLog getLog()
287         {
288             return AbstractConnectionDiscoverer.this.getLog();
289         }
290 
291         /**
292          * Helper method that checks for new contexts.
293          * <p>
294          * Raises {@link ContextDiscoveredEvent}.
295          */
296         private void scanForNewContexts()
297         {
298             List<String> rxCopy;
299             Set<String> deadContextsCopy;
300             synchronized (AbstractConnectionDiscoverer.this.receivedHeartbeats)
301             {
302                 rxCopy = AbstractConnectionDiscoverer.this.receivedHeartbeats;
303                 AbstractConnectionDiscoverer.this.receivedHeartbeats =
304                     CollectionFactory.newList();
305             }
306             synchronized (AbstractConnectionDiscoverer.this.deadContexts)
307             {
308                 deadContextsCopy =
309                     CollectionFactory.newSet(AbstractConnectionDiscoverer.this.deadContexts);
310                 AbstractConnectionDiscoverer.this.deadContexts.clear();
311             }
312 
313             // remove any dead contexts from the received heartbeats...
314             for (String deadContext : deadContextsCopy)
315             {
316                 for (Iterator<String> iterator = deadContextsCopy.iterator(); iterator.hasNext();)
317                 {
318                     String pulse = iterator.next();
319                     if (AbstractConnectionDiscoverer.this.isPulseFromContext(
320                         deadContext, pulse))
321                     {
322                         iterator.remove();
323                     }
324                 }
325             }
326 
327             for (String pulse : rxCopy)
328             {
329                 // is this another context's pulse?
330                 if (!AbstractConnectionDiscoverer.this.pulse.equals(pulse))
331                 {
332                     IConnectionParameters params =
333                         AbstractConnectionDiscoverer.this.getConnectionParameters(pulse);
334                     if (params != null)
335                     {
336                         boolean raiseDiscoveredEvent = false;
337                         final IConnectionParameters putResult;
338                         synchronized (AbstractConnectionDiscoverer.this.current)
339                         {
340                             putResult =
341                                 AbstractConnectionDiscoverer.this.current.put(
342                                     params.getRemoteContextIdentity(), params);
343                             /*
344                              * check if this is a newly found context or the
345                              * params have changed - the current is cleared down
346                              * by the heartbeat logic so checking for a null
347                              * doesn't mean its an undiscovered context
348                              */
349                             if (putResult == null)
350                             {
351                                 // check if the context is not in previous
352                                 final IConnectionParameters instanceInPrevious =
353                                     AbstractConnectionDiscoverer.this.previous.get(params.getRemoteContextIdentity());
354                                 /*
355                                  * raise the event if there is no instance in
356                                  * previous or the parameters have changed
357                                  */
358                                 raiseDiscoveredEvent =
359                                     ((instanceInPrevious == null) || (!instanceInPrevious.equals(params)));
360                             }
361                             else
362                             {
363                                 /*
364                                  * this covers the rare situation that the
365                                  * parameters change within a few heartbeats
366                                  */
367                                 raiseDiscoveredEvent =
368                                     !putResult.equals(params);
369                             }
370                         }
371                         /*
372                          * the distribution manager handles what to do if the
373                          * parameters change
374                          */
375                         if (raiseDiscoveredEvent)
376                         {
377                             if (getLog().isInfoEnabled())
378                             {
379                                 getLog().info(
380                                     "Context discovered: "
381                                         + safeToString(params));
382                             }
383                             if (AbstractConnectionDiscoverer.this.context.isActive())
384                             {
385                                 AbstractConnectionDiscoverer.this.context.queueEvent(new ContextDiscoveredEvent(
386                                     AbstractConnectionDiscoverer.this.context,
387                                     params));
388                             }
389                         }
390                     }
391                 }
392             }
393         }
394 
395         /**
396          * Helper method to check for any contexts that have missed the alotted
397          * number of heartbeats.
398          * <p>
399          * Raises {@link ContextNotAvailableEvent}.
400          */
401         private void scanForLostContexts()
402         {
403             // check the current versus previous pulses only after we
404             // have sent out heartbeats for the allowed miss count
405             final Set<String> notAvailable;
406             synchronized (AbstractConnectionDiscoverer.this.current)
407             {
408                 if (getLog().isDebugEnabled())
409                 {
410                     getLog().debug(
411                         "Contexts found in this "
412                             + (AbstractConnectionDiscoverer.this.allowedMissedHeartbeats * AbstractConnectionDiscoverer.this.heartbeatPeriod)
413                             + "ms period:"
414                             + CollectionUtils.toFormattedString(AbstractConnectionDiscoverer.this.current)
415                             + "Contexts found in the previous period:"
416                             + CollectionUtils.toFormattedString(AbstractConnectionDiscoverer.this.previous));
417                 }
418                 notAvailable =
419                     CollectionFactory.newSet(AbstractConnectionDiscoverer.this.previous.keySet());
420                 notAvailable.removeAll(AbstractConnectionDiscoverer.this.current.keySet());
421                 // now save the current as the previous
422                 AbstractConnectionDiscoverer.this.previous.clear();
423                 AbstractConnectionDiscoverer.this.previous.putAll(AbstractConnectionDiscoverer.this.current);
424                 AbstractConnectionDiscoverer.this.current.clear();
425             }
426             for (String remoteContextId : notAvailable)
427             {
428                 if (getLog().isInfoEnabled())
429                 {
430                     getLog().info("Context not available: " + remoteContextId);
431                 }
432                 AbstractConnectionDiscoverer.this.context.queueEvent(new ContextNotAvailableEvent(
433                     AbstractConnectionDiscoverer.this.context, remoteContextId));
434             }
435         }
436     }
437 
438     /**
439      * Listens for heartbeat pulse messages on the discovery network and adds
440      * them to the {@link AbstractConnectionDiscoverer#receivedHeartbeats
441      * receivedHeartbeats}. These are processed by the
442      * {@link HeartbeatProcessor}.
443      * 
444      * @author Ramon Servadei
445      */
446     final class HeartbeatListener extends AbstractLifeCycle implements Runnable
447     {
448         private HeartbeatListener()
449         {
450             super();
451         }
452 
453         public void run()
454         {
455             while (AbstractConnectionDiscoverer.this.isActive())
456             {
457                 byte[] buf = new byte[1024];
458                 DatagramPacket recv = new DatagramPacket(buf, buf.length);
459                 try
460                 {
461                     socket.receive(recv);
462                     String data =
463                         new String(recv.getData(), ByteConstants.ENCODING).trim();
464                     synchronized (AbstractConnectionDiscoverer.this.receivedHeartbeats)
465                     {
466                         AbstractConnectionDiscoverer.this.receivedHeartbeats.add(data);
467                     }
468                 }
469                 catch (Exception e)
470                 {
471                     logException(getLog(), this, e);
472                 }
473             }
474         }
475 
476         @Override
477         protected void doDestroy()
478         {
479             AbstractConnectionDiscoverer.this.heartbeatListenerThread.interrupt();
480         }
481 
482         @Override
483         protected void doStart()
484         {
485         }
486 
487         @Override
488         protected AsyncLog getLog()
489         {
490             return AbstractConnectionDiscoverer.this.getLog();
491         }
492 
493     }
494 
495     /**
496      * Sends out the heartbeat pulse for this context.
497      * <p>
498      * This class uses an {@link IContextWatchdog} instance that is queried
499      * using the {@link IContextWatchdog#isContextHealthy()} method before
500      * sending out the heartbeat. If the context is not healthy, the heartbeat
501      * pulse is not sent out.
502      * <p>
503      * This class also provides a mechanism to detect performance issue in the
504      * current runtime by checking that the thread running this is activated at
505      * the required heartbeat period. If the time between activations exceeds
506      * the heartbeat period plus a tolerance, the
507      * {@link IContextWatchdog#cpuResourcesLow()} method is invoked, otherwise,
508      * if the activation time is within the period plus tolerance the
509      * {@link IContextWatchdog#systemNormal()} method is invoked.
510      * 
511      * @author Ramon Servadei
512      */
513     final class HeartbeatGenerator extends AbstractLifeCycle implements
514         Runnable
515     {
516 
517         private HeartbeatGenerator()
518         {
519             super();
520         }
521 
522         @Override
523         protected void doStart()
524         {
525         }
526 
527         @Override
528         protected void doDestroy()
529         {
530             AbstractConnectionDiscoverer.this.heartbeatGeneratorThread.interrupt();
531         }
532 
533         @Override
534         protected AsyncLog getLog()
535         {
536             return AbstractConnectionDiscoverer.this.getLog();
537         }
538 
539         public void run()
540         {
541             while (AbstractConnectionDiscoverer.this.isActive())
542             {
543                 try
544                 {
545                     final IContextWatchdog contextWatchdog =
546                         AbstractConnectionDiscoverer.this.context.getContextWatchdog();
547                     boolean pulse = true;
548                     if (contextWatchdog != null)
549                     {
550                         if (!contextWatchdog.isContextHealthy())
551                         {
552                             if (getLog().isWarnEnabled())
553                             {
554                                 getLog().warn(
555                                     AbstractConnectionDiscoverer.this.context
556                                         + " is not healthy, not sending out heartbeat");
557                             }
558                             pulse = false;
559                         }
560                     }
561                     // check for any deadlocks
562                     if (ThreadUtils.findDeadlocks())
563                     {
564                         if (getLog().isWarnEnabled())
565                         {
566                             getLog().warn(
567                                 "Deadlock found, not sending out heartbeat");
568                         }
569                         pulse = false;
570                     }
571                     if (AbstractConnectionDiscoverer.this.isPulsingEnabled()
572                         && pulse)
573                     {
574                         AbstractConnectionDiscoverer.this.pulse();
575                     }
576                     long now = System.currentTimeMillis();
577                     Thread.sleep(AbstractConnectionDiscoverer.this.heartbeatPeriod);
578                     final long actualPeriod = System.currentTimeMillis() - now;
579 
580                     // compare the actual period with the expected period to
581                     // determine any CPU time starvation - allow for 20ms
582                     final int maxTolerance = 20;
583                     if (actualPeriod > AbstractConnectionDiscoverer.this.heartbeatPeriod
584                         + (maxTolerance))
585                     {
586                         if (getLog().isWarnEnabled())
587                         {
588                             getLog().warn(
589                                 AbstractConnectionDiscoverer.this
590                                     + " low CPU resources; heartbeatPeriod "
591                                     + AbstractConnectionDiscoverer.this.heartbeatPeriod
592                                     + "ms but actually took " + actualPeriod
593                                     + "ms");
594                         }
595                         if (contextWatchdog != null)
596                         {
597                             contextWatchdog.cpuResourcesLow();
598                         }
599                     }
600                     else
601                     {
602                         if (contextWatchdog != null)
603                         {
604                             contextWatchdog.systemNormal();
605                         }
606                     }
607                 }
608                 catch (Exception e)
609                 {
610                     logException(getLog(), this, e);
611                 }
612             }
613         }
614     }
615 
616     /**
617      * Constructor that retrieves network values from system properties or use
618      * default values if no system properties are set.
619      * 
620      * @param context
621      *            the local context
622      */
623     public AbstractConnectionDiscoverer(IFrameworkContext context)
624     {
625         this(context, System.getProperty(UDP_DISCOVERY_NETWORK,
626             DEFAULT_UDP_DISCOVERY_NETWORK),
627             Integer.parseInt(System.getProperty(UDP_DISCOVERY_PORT,
628                 DEFAULT_UDP_DISCOVERY_PORT)),
629             System.getProperty(NETWORK_INTERFACE_NAME));
630     }
631 
632     /**
633      * Constructor that uses parameters for the network values.
634      * 
635      * @param context
636      *            the local context
637      * @param udpNetwork
638      *            the UDP network
639      * @param udpPort
640      *            the UDP port
641      * @param udpNic
642      *            the network interface card name to bind to, <code>null</code>
643      *            for default
644      */
645     public AbstractConnectionDiscoverer(IFrameworkContext context,
646         String udpNetwork, int udpPort, String udpNic)
647     {
648         super();
649         this.port = udpPort;
650         this.context = context;
651         try
652         {
653             this.network = InetAddress.getByName(udpNetwork);
654             this.socket = new MulticastSocket(this.port);
655             if (udpNic != null)
656             {
657                 this.socket.setNetworkInterface(NetworkInterface.getByName(udpNic));
658             }
659             this.socket.joinGroup(this.network);
660         }
661         catch (Exception e)
662         {
663             throw new IllegalStateException("Could not create " + this, e);
664         }
665         this.deadContexts = new TtlSet<String>();
666         this.receivedHeartbeats = CollectionFactory.newList();
667         this.current = CollectionFactory.newMap();
668         this.previous = CollectionFactory.newMap();
669 
670         final String threadDetails =
671             this.context.getEventProcessorIdentityPrefix() + COLON
672                 + this.network + COLON + this.port;
673 
674         this.heartbeatListener = new HeartbeatListener();
675         this.heartbeatListenerThread =
676             new Thread(this.heartbeatListener,
677                 this.heartbeatListener.getClass().getSimpleName() + SPACE
678                     + threadDetails);
679         this.heartbeatListenerThread.setDaemon(true);
680 
681         this.heartbeatProcessor = new HeartbeatProcessor();
682         this.heartbeatProcessorThread =
683             new Thread(this.heartbeatProcessor,
684                 this.heartbeatProcessor.getClass().getSimpleName() + SPACE
685                     + threadDetails);
686         this.heartbeatProcessorThread.setDaemon(true);
687 
688         this.heartbeatGenerator = new HeartbeatGenerator();
689         this.heartbeatGeneratorThread =
690             new Thread(this.heartbeatGenerator,
691                 this.heartbeatGenerator.getClass().getSimpleName() + SPACE
692                     + threadDetails);
693         this.heartbeatGeneratorThread.setPriority(Thread.MAX_PRIORITY);
694         this.heartbeatGeneratorThread.setDaemon(true);
695     }
696 
697     /**
698      * Get the UDP port used for connection discovery
699      * 
700      * @return the UDP port for connection discovery
701      */
702     public final int getPort()
703     {
704         return this.port;
705     }
706 
707     /**
708      * Get the UDP network for connection discovery
709      * 
710      * @return the UDP network for connection discovery
711      */
712     public final InetAddress getNetwork()
713     {
714         return this.network;
715     }
716 
717     /**
718      * Get the multicast socket used for connection discovery
719      * 
720      * @return the multicast socket used for connection discovery
721      */
722     public final MulticastSocket getSocket()
723     {
724         return this.socket;
725     }
726 
727     /**
728      * Get the pulse message sent out to other {@link IConnectionDiscoverer}
729      * instances over the discovery network
730      * 
731      * @return the pulse message sent
732      */
733     public final String getPulse()
734     {
735         return this.pulse;
736     }
737 
738     /**
739      * This removes the connection details in the
740      * {@link ConnectionDestroyedEvent} from the
741      * {@link AbstractConnectionDiscoverer#current} map of known connections.
742      */
743     public final void connectionDestroyed(String remoteContextIdentity)
744     {
745         synchronized (AbstractConnectionDiscoverer.this.current)
746         {
747             AbstractConnectionDiscoverer.this.current.remove(remoteContextIdentity);
748             AbstractConnectionDiscoverer.this.previous.remove(remoteContextIdentity);
749         }
750         // store the dead context for processing by the HeartbeatProcessor
751         synchronized (AbstractConnectionDiscoverer.this.deadContexts)
752         {
753             AbstractConnectionDiscoverer.this.deadContexts.add(remoteContextIdentity);
754         }
755     }
756 
757     public final long getNetworkHeartbeatPeriod()
758     {
759         return heartbeatPeriod;
760     }
761 
762     public final int getAllowableNetworkHeartbeatMissCount()
763     {
764         return allowedMissedHeartbeats;
765     }
766 
767     public final void setAllowableNetworkHeartbeatMissCount(
768         int allowedHeartbeatMissCount)
769     {
770         this.allowedMissedHeartbeats = allowedHeartbeatMissCount;
771     }
772 
773     public final void setNetworkHeartbeatPeriod(long periodInMillis)
774     {
775         this.heartbeatPeriod = periodInMillis;
776     }
777 
778     public void disablePulsing()
779     {
780         this.pulsingEnabled = false;
781         updateSystemInfo();
782     }
783 
784     public void enablePulsing()
785     {
786         this.pulsingEnabled = true;
787         updateSystemInfo();
788     }
789 
790     private void updateSystemInfo()
791     {
792         final IContainer systemInfo = this.context.getSystemInfo();
793         systemInfo.beginFrame(new EventFrameExecution());
794         try
795         {
796             systemInfo.add(new BooleanField(SENDING_HEARTBEATS,
797                 isPulsingEnabled()));
798         }
799         finally
800         {
801             systemInfo.endFrame();
802         }
803     }
804 
805     @Override
806     protected final void doDestroy()
807     {
808         if (getLog().isInfoEnabled())
809         {
810             getLog().info("Destroying " + this);
811         }
812         this.heartbeatListener.destroy();
813         this.socket.close();
814         this.heartbeatProcessor.destroy();
815         this.heartbeatGenerator.destroy();
816         if (getLog().isInfoEnabled())
817         {
818             getLog().info("Destroyed " + this);
819         }
820     }
821 
822     @Override
823     protected void doStart()
824     {
825         if (getLog().isInfoEnabled())
826         {
827             getLog().info("Starting " + this);
828         }
829         this.pulse =
830             PULSE_MSG + DELIMITER + this.context.getIdentity() + DELIMITER
831                 + this.context.getContextHashCode() + DELIMITER
832                 + getProtocolConnectionParameters();
833         this.description = getClass().getSimpleName() + " for " + this.pulse;
834         this.heartbeatListenerThread.start();
835         this.heartbeatProcessorThread.start();
836         this.heartbeatGeneratorThread.start();
837         final IContainer systemInfo = this.context.getSystemInfo();
838         systemInfo.beginFrame(new EventFrameExecution());
839         try
840         {
841             systemInfo.add(new StringField(UDP_DISCOVERY_NETWORK,
842                 getNetwork().toString()));
843             systemInfo.add(new IntegerField(UDP_DISCOVERY_PORT, getPort()));
844             try
845             {
846                 systemInfo.add(new StringField(NETWORK_INTERFACE_NAME,
847                     this.socket.getNetworkInterface().getDisplayName()));
848             }
849             catch (SocketException e)
850             {
851                 Utils.logException(getLog(),
852                     "Could not update systemInfo record with UDP NIC", e);
853             }
854             systemInfo.add(new LongField(HEARTBEAT_PERIOD,
855                 getNetworkHeartbeatPeriod()));
856             systemInfo.add(new LongField(ALLOWED_MISSED_HEARTBEATS,
857                 getAllowableNetworkHeartbeatMissCount()));
858             systemInfo.add(new BooleanField(SENDING_HEARTBEATS,
859                 isPulsingEnabled()));
860         }
861         finally
862         {
863             systemInfo.endFrame();
864         }
865     }
866 
867     /**
868      * Determine if the pulse message is from the named context
869      * 
870      * @param contextIdentity
871      *            the context to compare against the pulse message
872      * @param pulseMessage
873      *            the pulse message
874      * @return <code>true</code> if the pulse message is from the identified
875      *         context
876      */
877     protected boolean isPulseFromContext(String contextIdentity,
878         String pulseMessage)
879     {
880         return pulseMessage.startsWith(PULSE_MSG + DELIMITER + contextIdentity
881             + DELIMITER);
882     }
883 
884     /**
885      * Send the message to the multicast socket
886      * 
887      * @param msg
888      */
889     protected void send(String msg)
890     {
891         DatagramPacket data =
892             new DatagramPacket(ByteWriter.getBytes(msg), msg.length(),
893                 this.network, this.port);
894         try
895         {
896             this.socket.send(data);
897         }
898         catch (IOException e)
899         {
900             logException(getLog(), "Could not send message '"
901                 + safeToString(msg) + "'", e);
902         }
903     }
904 
905     @Override
906     public String toString()
907     {
908         return this.description;
909     }
910 
911     @Override
912     protected AsyncLog getLog()
913     {
914         return LOG;
915     }
916 
917     public final void pulse()
918     {
919         if (getLog().isDebugEnabled())
920         {
921             getLog().debug(this + " pulse");
922         }
923         send(this.pulse);
924     }
925 
926     /**
927      * Template method for sub-classes to provide the implementation of how to
928      * handle the 'pulse' message received from a peer
929      * {@link IConnectionDiscoverer}. This method inspects the data string and
930      * creates an appropriate {@link IConnectionParameters} instance
931      * representing it.
932      * 
933      * @param data
934      *            the pulse message from a peer {@link IConnectionDiscoverer}
935      * @return the {@link IConnectionParameters} extracted from the pulse
936      *         message or <code>null</code> if the message could not be
937      *         deciphered.
938      */
939     protected abstract IConnectionParameters getConnectionParameters(String data);
940 
941     protected boolean isPulsingEnabled()
942     {
943         return this.pulsingEnabled;
944     }
945 
946     /**
947      * Overridden by subclasses to provide the protocol specific parameters for
948      * connecting to the connection broker of this context. The parameters must
949      * be delimited using {@link ProtocolMessageConstants#DELIMITER}. There is
950      * no need to prepend the delimiter.
951      * 
952      * @return a string of parameters for the connection protocol of the
953      *         connection broker, delimited using
954      *         {@link ProtocolMessageConstants#DELIMITER}
955      */
956     protected abstract String getProtocolConnectionParameters();
957 
958 }