View Javadoc

1   /*
2      Copyright 2008 Ramon Servadei
3   
4      Licensed under the Apache License, Version 2.0 (the "License");
5      you may not use this file except in compliance with the License.
6      You may obtain a copy of the License at
7   
8          http://www.apache.org/licenses/LICENSE-2.0
9   
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15   */
16  package fulmine.event.subscription;
17  
18  import static fulmine.util.Utils.logException;
19  import static fulmine.util.Utils.nullCheck;
20  import static fulmine.util.Utils.string;
21  
22  import java.util.Collection;
23  import java.util.Iterator;
24  import java.util.List;
25  
26  import org.apache.commons.logging.Log;
27  
28  import fulmine.IAddressable;
29  import fulmine.IDomain;
30  import fulmine.IType;
31  import fulmine.event.IEventSource;
32  import fulmine.event.listener.IEventListener;
33  import fulmine.util.collection.CollectionFactory;
34  import fulmine.util.log.AsyncLog;
35  
36  /**
37   * The base implementation of a subscription.
38   * <p>
39   * Thread safe.
40   * 
41   * @author Ramon Servadei
42   */
43  public class Subscription implements ISubscription
44  {
45      private final static AsyncLog LOG = new AsyncLog(Subscription.class);
46  
47      /** The source instances this subscription has matched */
48      final Collection<IEventSource> matches;
49  
50      /**
51       * The listeners used by the subscription to register against matching event
52       * sources
53       */
54      final List<IEventListener> listeners;
55  
56      /** The parameters for the subscription */
57      private final ISubscriptionParameters parameters;
58  
59      /**
60       * Construct the subscription to match the subscription passed in.
61       * 
62       * @param subscription
63       *            the subscription subscription for this
64       */
65      protected Subscription(ISubscriptionParameters parameters)
66      {
67          super();
68          nullCheck(parameters, "no subscription parameter provided");
69          this.parameters = parameters;
70          this.listeners = CollectionFactory.newList(1);
71          this.matches = CollectionFactory.newList(1);
72      }
73  
74      public synchronized final boolean addMatch(IEventSource source)
75      {
76          if (!this.matches.contains(source))
77          {
78              if (getLog().isTraceEnabled())
79              {
80                  getLog().trace(
81                      "Adding match " + source.toIdentityString() + " to " + this);
82              }
83              this.matches.add(source);
84              if (source.isActive())
85              {
86                  for (IEventListener eventListener : this.listeners)
87                  {
88                      try
89                      {
90                          source.addListener(eventListener);
91                      }
92                      catch (Exception e)
93                      {
94                          logException(getLog(), source, e);
95                      }
96                  }
97                  doAddOperation(source);
98                  return true;
99              }
100             logException(getLog(), this + " is attempting to subscribe"
101                 + " to inactive event source: " + source, new Exception());
102         }
103         return false;
104     }
105 
106     public synchronized final boolean removeMatch(
107         ISubscriptionParameters parameters)
108     {
109         if (parameters == null)
110         {
111             return false;
112         }
113         for (Iterator<? extends IEventSource> iterator =
114             this.matches.iterator(); iterator.hasNext();)
115         {
116             final IEventSource source = iterator.next();
117             try
118             {
119                 if (parameters.includes(source))
120                 {
121                     if (getLog().isTraceEnabled())
122                     {
123                         getLog().trace(
124                             "Removing identity=" + source.getIdentity()
125                                 + ", type=" + source.getType() + ", domain="
126                                 + source.getDomain() + " from " + this);
127                     }
128                     iterator.remove();
129                     // remove the listeners of this subscription from the source
130                     for (IEventListener eventListener : this.listeners)
131                     {
132                         try
133                         {
134                             source.removeListener(eventListener);
135                         }
136                         catch (Exception e)
137                         {
138                             logException(LOG, source, e);
139                         }
140                     }
141                     doRemoveOperation(source);
142                     return true;
143                 }
144             }
145             catch (Exception e)
146             {
147                 logException(LOG, source, e);
148             }
149         }
150         return false;
151     }
152 
153     public synchronized final Collection<IEventSource> getMatches()
154     {
155         return CollectionFactory.newList(this.matches);
156     }
157 
158     public synchronized final List<IEventListener> getListeners()
159     {
160         return CollectionFactory.newList(this.listeners);
161     }
162 
163     public synchronized final boolean addListener(IEventListener listener)
164     {
165         return doAddListener(listener);
166     }
167 
168     public synchronized final boolean removeListener(IEventListener listener)
169     {
170         if (this.listeners.contains(listener))
171         {
172             this.listeners.remove(listener);
173             // remove the listener to currently matched sources
174             for (IEventSource source : this.matches)
175             {
176                 try
177                 {
178                     source.removeListener(listener);
179                 }
180                 catch (Exception e)
181                 {
182                     logException(getLog(), source, e);
183                 }
184             }
185             if (getLog().isInfoEnabled())
186             {
187                 getLog().info("Removed listener " + listener + " from " + this);
188             }
189             return true;
190         }
191         return false;
192     }
193 
194     public final IDomain getDomain()
195     {
196         return this.parameters.getDomain();
197     }
198 
199     public final String getIdentity()
200     {
201         return this.parameters.getIdentity();
202     }
203 
204     public final IType getType()
205     {
206         return this.parameters.getType();
207     }
208 
209     public final String getAddress()
210     {
211         return this.parameters.getAddress();
212     }
213 
214     public final boolean includes(IAddressable parameters)
215     {
216         return this.parameters.includes(parameters);
217     }
218 
219     public synchronized void destroy()
220     {
221         if (getLog().isTraceEnabled())
222         {
223             getLog().trace("Destroying " + this);
224         }
225         // do not destroy the listener!
226         for (IEventSource source : this.matches)
227         {
228             try
229             {
230                 for (IEventListener listener : this.listeners)
231                 {
232                     try
233                     {
234                         source.removeListener(listener);
235                     }
236                     catch (Exception e)
237                     {
238                         logException(LOG, source, e);
239                     }
240                 }
241                 doRemoveOperation(source);
242             }
243             catch (Exception e)
244             {
245                 logException(LOG, source, e);
246             }
247         }
248         this.matches.clear();
249         this.listeners.clear();
250     }
251 
252     /**
253      * Template method for the operation to perform when a source match is
254      * added.
255      * 
256      * @param source
257      *            the source the operation applies to
258      * @return <code>true</code> if the operation did something
259      */
260     protected boolean doAddOperation(IEventSource source)
261     {
262         return true;
263     }
264 
265     /**
266      * Template method for the operation to perform when a source match is
267      * removed.
268      * 
269      * @param source
270      *            the source the operation applies to
271      * @return <code>true</code> if the operation did something
272      */
273     protected boolean doRemoveOperation(IEventSource source)
274     {
275         return true;
276     }
277 
278     private boolean doAddListener(IEventListener listener)
279     {
280         if (!this.listeners.contains(listener))
281         {
282             this.listeners.add(listener);
283             if (getLog().isTraceEnabled())
284             {
285                 getLog().trace("[adding listener] " + listener + " to " + this);
286             }
287             // add the listener to currently matched sources
288             for (IEventSource source : this.matches)
289             {
290                 try
291                 {
292                     source.addListener(listener);
293                 }
294                 catch (Exception e)
295                 {
296                     logException(getLog(), source, e);
297                 }
298             }
299             if (getLog().isInfoEnabled())
300             {
301                 getLog().info("Added listener " + listener + " to " + this);
302             }
303             return true;
304         }
305         if (getLog().isTraceEnabled())
306         {
307             getLog().trace(this + " already contains listener " + listener);
308         }
309         return false;
310     }
311 
312     protected Log getLog()
313     {
314         return LOG;
315     }
316 
317     @Override
318     public synchronized final String toString()
319     {
320         return string(this, this.parameters + ", matches="
321             + this.matches.size());
322     }
323 
324     @Override
325     public final int hashCode()
326     {
327         return ((parameters == null) ? 0 : parameters.hashCode());
328     }
329 
330     @Override
331     public final boolean equals(Object obj)
332     {
333         if (this == obj)
334         {
335             return true;
336         }
337         if (obj == null)
338         {
339             return false;
340         }
341         if (!(obj instanceof ISubscriptionParameters))
342         {
343             return false;
344         }
345         final ISubscriptionParameters other = (ISubscriptionParameters) obj;
346         if (this.parameters == null)
347         {
348             return false;
349         }
350         return this.parameters.equals(other);
351     }
352 }