View Javadoc

1   /*
2      Copyright 2008 Ramon Servadei
3   
4      Licensed under the Apache License, Version 2.0 (the "License");
5      you may not use this file except in compliance with the License.
6      You may obtain a copy of the License at
7   
8          http://www.apache.org/licenses/LICENSE-2.0
9   
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15   */
16  package fulmine.event.subscription;
17  
18  import static fulmine.util.Utils.EMPTY_STRING;
19  import static fulmine.util.Utils.logException;
20  import static fulmine.util.Utils.nullCheck;
21  import static fulmine.util.Utils.safeToString;
22  
23  import java.util.Collection;
24  import java.util.Collections;
25  import java.util.List;
26  import java.util.Map;
27  
28  import fulmine.AbstractLifeCycle;
29  import fulmine.IAddressable;
30  import fulmine.context.IFrameworkContext;
31  import fulmine.event.IEventSource;
32  import fulmine.event.listener.IEventListener;
33  import fulmine.event.system.SubscribeEvent;
34  import fulmine.event.system.UnsubscribeEvent;
35  import fulmine.util.collection.CollectionFactory;
36  import fulmine.util.collection.CollectionUtils;
37  import fulmine.util.log.AsyncLog;
38  
39  /**
40   * Base implementation of a subscription manager.
41   * <p>
42   * This is thread safe. Access on the internal collection of
43   * {@link ISubscription} instances is synchronized just enough to get the
44   * subscription. The subscription implementations are assumed to be thread safe.
45   * 
46   * @author Ramon Servadei
47   */
48  public abstract class SubscriptionManager extends AbstractLifeCycle implements
49      ISubscriptionManager
50  {
51      private final static AsyncLog LOG = new AsyncLog(SubscriptionManager.class);
52  
53      /** The factory to build {@link ISubscription} instances */
54      private final ISubscriptionFactory factory;
55  
56      /** The subscriptions being managed */
57      private final Map<ISubscriptionParameters, ISubscription> subscriptions;
58  
59      /** The context */
60      private final IFrameworkContext context;
61  
62      /**
63       * Standard constructor
64       * 
65       * @param context
66       *            the context to use
67       * @param factory
68       *            the container subscription factory to use
69       */
70      public SubscriptionManager(IFrameworkContext context,
71          ISubscriptionFactory factory)
72      {
73          super();
74          this.subscriptions = CollectionFactory.newMap();
75          nullCheck(factory, "No subscription factory provided");
76          this.factory = factory;
77          this.context = context;
78      }
79  
80      public final boolean subscribe(ISubscriptionParameters parameters)
81      {
82          checkActive();
83          if (getLog().isInfoEnabled())
84          {
85              getLog().info("[subscribe] " + safeToString(parameters));
86          }
87          final ISubscription subscription;
88          synchronized (getSubscriptions())
89          {
90              if (getSubscriptions().containsKey(parameters))
91              {
92                  if (getLog().isInfoEnabled())
93                  {
94                      getLog().info(
95                          "[subscribe] already contains "
96                              + safeToString(parameters));
97                  }
98                  return false;
99              }
100             subscription = this.factory.createSubscription(parameters);
101             getSubscriptions().put(parameters, subscription);
102             // raise a new subscription event
103             doRaiseSubscribeEvent(parameters);
104         }
105         // now find any matches
106         Collection<? extends IEventSource> currentEntities =
107             doGetEventSources();
108         for (IEventSource source : currentEntities)
109         {
110             try
111             {
112                 if (subscription.includes(source))
113                 {
114                     subscription.addMatch(source);
115                 }
116             }
117             catch (Exception e)
118             {
119                 logException(getLog(), source, e);
120             }
121         }
122         return true;
123     }
124 
125     protected final ISubscription getSubscription(
126         ISubscriptionParameters parameters)
127     {
128         final ISubscription subscription;
129         synchronized (getSubscriptions())
130         {
131             subscription = getSubscriptions().get(parameters);
132         }
133         return subscription;
134     }
135 
136     public final boolean addListener(ISubscriptionParameters parameters,
137         IEventListener listener)
138     {
139         checkActive();
140         boolean added = false;
141         final ISubscription subscription;
142         subscription = getSubscription(parameters);
143         if (subscription != null)
144         {
145             added = subscription.addListener(listener);
146         }
147         else
148         {
149             if (getLog().isInfoEnabled())
150             {
151                 getLog().info(
152                     "[add listener] no subscription for "
153                         + safeToString(parameters));
154             }
155         }
156         if (added)
157         {
158             doAddListener(parameters, listener);
159         }
160         return added;
161     }
162 
163     public final boolean removeListener(ISubscriptionParameters parameters,
164         IEventListener listener)
165     {
166         checkActive();
167         boolean removed = false;
168         final ISubscription subscription = getSubscription(parameters);
169         if (subscription != null)
170         {
171             removed = subscription.removeListener(listener);
172         }
173         else
174         {
175             if (getLog().isInfoEnabled())
176             {
177                 getLog().info(
178                     "[remove listener] no subscription for "
179                         + safeToString(parameters));
180             }
181         }
182         return removed;
183     }
184 
185     public final List<IEventListener> getListeners(
186         ISubscriptionParameters parameters)
187     {
188         checkActive();
189         final ISubscription subscription = getSubscription(parameters);
190         if (subscription != null)
191         {
192             return CollectionFactory.newList(subscription.getListeners());
193         }
194         if (getLog().isInfoEnabled())
195         {
196             getLog().info(
197                 "[get listeners] no subscription for "
198                     + safeToString(parameters));
199         }
200         return Collections.emptyList();
201     }
202 
203     public final boolean unsubscribe(ISubscriptionParameters parameters)
204     {
205         if (getLog().isInfoEnabled())
206         {
207             getLog().info("[unsubscribe] " + safeToString(parameters));
208         }
209         // don't check for active because this is called during destroy!
210         // checkActive();
211         final ISubscription subscription;
212         synchronized (getSubscriptions())
213         {
214             subscription = getSubscriptions().remove(parameters);
215         }
216         if (subscription != null)
217         {
218             doRaiseUnsubscribeEvent(parameters);
219         }
220         else
221         {
222             if (getLog().isInfoEnabled())
223             {
224                 getLog().info(
225                     "[unsubscribe] does not contain "
226                         + safeToString(parameters));
227             }
228             return false;
229         }
230         try
231         {
232             subscription.destroy();
233         }
234         catch (Exception e)
235         {
236             logException(getLog(), subscription, e);
237         }
238         return true;
239     }
240 
241     public final boolean isSubscribed(IEventSource source)
242     {
243         checkActive();
244         synchronized (getSubscriptions())
245         {
246             for (ISubscription subscription : getSubscriptions().values())
247             {
248                 try
249                 {
250                     if (subscription.getMatches().contains(source))
251                     {
252                         return true;
253                     }
254                 }
255                 catch (Exception e)
256                 {
257                     logException(getLog(), subscription, e);
258                 }
259             }
260         }
261         return false;
262     }
263 
264     public final Collection<IEventSource> getSubscribedSources(
265         ISubscriptionParameters parameters)
266     {
267         checkActive();
268         synchronized (getSubscriptions())
269         {
270             Collection<IEventSource> matches = CollectionFactory.newSet();
271             for (ISubscription subscription : getSubscriptions().values())
272             {
273                 try
274                 {
275                     // check if the subscription would include the parameters
276                     if (subscription.includes(parameters))
277                     {
278                         // now check the internal matches - if the subscription
279                         // was more generalised than the parameters we need to
280                         // filter out non-matching sources for the parameters
281                         final Collection<IEventSource> potentialMatches =
282                             subscription.getMatches();
283                         for (IEventSource potentialMatch : potentialMatches)
284                         {
285                             if (parameters.includes(potentialMatch))
286                             {
287                                 matches.add(potentialMatch);
288                             }
289                         }
290                     }
291                 }
292                 catch (Exception e)
293                 {
294                     logException(getLog(), subscription, e);
295                 }
296             }
297             return matches;
298         }
299     }
300 
301     public final Collection<IEventSource> getSubscribedSources()
302     {
303         checkActive();
304         Collection<IEventSource> subscribed = CollectionFactory.newSet();
305         synchronized (getSubscriptions())
306         {
307             for (ISubscription subscription : getSubscriptions().values())
308             {
309                 try
310                 {
311                     subscribed.addAll(subscription.getMatches());
312                 }
313                 catch (Exception e)
314                 {
315                     logException(getLog(), subscription, e);
316                 }
317             }
318         }
319         return subscribed;
320     }
321 
322     public final boolean includes(IAddressable parameters)
323     {
324         checkActive();
325         synchronized (getSubscriptions())
326         {
327             for (ISubscription subscription : getSubscriptions().values())
328             {
329                 try
330                 {
331                     if (subscription.includes(parameters))
332                     {
333                         return true;
334                     }
335                 }
336                 catch (Exception e)
337                 {
338                     logException(getLog(), subscription, e);
339                 }
340             }
341         }
342         return false;
343     }
344 
345     public final void eventSourceCreated(IAddressable identity)
346     {
347         if (getLog().isDebugEnabled())
348         {
349             getLog().debug(
350                 "Checking subscriptions for CREATED event source "
351                     + safeToString(identity));
352         }
353         int subscriptionMatchCount = 0;
354         final IEventSource eventSource = doGetEventSource(identity);
355         synchronized (this.getSubscriptions())
356         {
357             // Note: this is rather long-running as it has a time scale
358             // proportional to n subscriptions
359             final SubscriptionParameters subscriptionParameters =
360                 new SubscriptionParameters(identity);
361             for (ISubscription subscription : this.getSubscriptions().values())
362             {
363                 try
364                 {
365                     if (subscription.includes(subscriptionParameters))
366                     {
367                         subscriptionMatchCount++;
368                         subscription.addMatch(eventSource);
369                     }
370                 }
371                 catch (Exception e)
372                 {
373                     logException(getLog(), subscription, e);
374                 }
375             }
376         }
377         if (subscriptionMatchCount > 0)
378         {
379             doEventSourceCreated(eventSource);
380             if (getLog().isDebugEnabled())
381             {
382                 getLog().debug(
383                     EMPTY_STRING + subscriptionMatchCount
384                         + " matches found for CREATED "
385                         + safeToString(identity));
386             }
387         }
388     }
389 
390     public void eventSourceDestroyed(IAddressable identity)
391     {
392         if (getLog().isDebugEnabled())
393         {
394             getLog().debug(
395                 "Checking subscriptions for DESTROYED "
396                     + safeToString(identity));
397         }
398         int subscriptionMatchCount = 0;
399         synchronized (this.getSubscriptions())
400         {
401             // Note: this is rather long-running as it has a time scale
402             // proportional to n subscriptions
403             for (ISubscription subscription : this.getSubscriptions().values())
404             {
405                 try
406                 {
407                     final SubscriptionParameters subscriptionParameters =
408                         new SubscriptionParameters(identity);
409                     if (subscription.includes(subscriptionParameters))
410                     {
411                         subscriptionMatchCount++;
412                         subscription.removeMatch(subscriptionParameters);
413                     }
414                 }
415                 catch (Exception e)
416                 {
417                     logException(getLog(), subscription, e);
418                 }
419             }
420         }
421         if (subscriptionMatchCount > 0)
422         {
423             if (getLog().isDebugEnabled())
424             {
425                 getLog().debug(
426                     EMPTY_STRING + subscriptionMatchCount
427                         + " matches found for DESTROYED "
428                         + safeToString(identity));
429             }
430         }
431     }
432 
433     protected void doRaiseSubscribeEvent(ISubscriptionParameters parameters)
434     {
435         this.context.queueEvent(new SubscribeEvent(this.context, parameters));
436     }
437 
438     protected void doRaiseUnsubscribeEvent(ISubscriptionParameters parameters)
439     {
440         this.context.queueEvent(new UnsubscribeEvent(this.context, parameters));
441     }
442 
443     /**
444      * Get the current subscriptions parameters to subscription. <b>Accessing
445      * this must be synchronized</b>
446      * 
447      * @return the map of subscriptions parameters to subscriptions.
448      */
449     protected final Map<ISubscriptionParameters, ISubscription> getSubscriptions()
450     {
451         return this.subscriptions;
452     }
453 
454     /**
455      * Template method to get the collection of {@link IEventSource} instances
456      * the manager will operate over.
457      * 
458      * @return a collection of {@link IEventSource} instances
459      */
460     protected abstract Collection<? extends IEventSource> doGetEventSources();
461 
462     /**
463      * Template method to get the {@link IEventSource} identified by the
464      * {@link IAddressable} argument.
465      * 
466      * @param id
467      *            identifies the event source
468      * @return the {@link IEventSource} implementation to use
469      */
470     protected abstract IEventSource doGetEventSource(IAddressable id);
471 
472     /**
473      * Template method to identify if the {@link IEventSource} identified by the
474      * {@link IAddressable} argument exists.
475      * 
476      * @param id
477      *            identifies the event source
478      * 
479      * @return <code>true</code> if the container exists in the current context
480      *         scope
481      */
482     protected abstract boolean doEventSourceExists(IAddressable id);
483 
484     /**
485      * Called as the final step in the {@link #eventSourceCreated(IAddressable)}
486      * method. This provides sub-classes with an opportunity to perform any post
487      * creation work.
488      * 
489      * @param identity
490      *            identifies the event source
491      */
492     protected abstract void doEventSourceCreated(IAddressable id);
493 
494     /**
495      * Called as the final step in
496      * {@link #addListener(ISubscriptionParameters, IEventListener)}.
497      * Sub-classes can perform necessary closure work when the listener is
498      * added.
499      * 
500      * @param parameters
501      *            the subscription parameters identifying the subscripion(s)
502      * @param listener
503      *            the listener added
504      */
505     protected abstract void doAddListener(ISubscriptionParameters parameters,
506         IEventListener listener);
507 
508     @Override
509     protected void doDestroy()
510     {
511         synchronized (getSubscriptions())
512         {
513             Map<ISubscriptionParameters, ISubscription> copy =
514                 CollectionFactory.newMap(getSubscriptions());
515             for (ISubscription subscription : copy.values())
516             {
517                 try
518                 {
519                     unsubscribe(subscription);
520                 }
521                 catch (Exception e)
522                 {
523                     logException(getLog(), subscription, e);
524                 }
525             }
526             getSubscriptions().clear();
527         }
528     }
529 
530     @Override
531     protected AsyncLog getLog()
532     {
533         return LOG;
534     }
535 
536     protected IFrameworkContext getContext()
537     {
538         return context;
539     }
540 
541     @Override
542     public final String toString()
543     {
544         StringBuilder sb;
545         synchronized (getSubscriptions())
546         {
547             sb = CollectionUtils.toFormattedString(getSubscriptions());
548         }
549         return getClass().getSimpleName() + " subscriptions=" + sb.toString();
550     }
551 }