1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package fulmine.event.subscription;
17
18 import static fulmine.util.Utils.EMPTY_STRING;
19 import static fulmine.util.Utils.logException;
20 import static fulmine.util.Utils.nullCheck;
21 import static fulmine.util.Utils.safeToString;
22
23 import java.util.Collection;
24 import java.util.Collections;
25 import java.util.List;
26 import java.util.Map;
27
28 import fulmine.AbstractLifeCycle;
29 import fulmine.IAddressable;
30 import fulmine.context.IFrameworkContext;
31 import fulmine.event.IEventSource;
32 import fulmine.event.listener.IEventListener;
33 import fulmine.event.system.SubscribeEvent;
34 import fulmine.event.system.UnsubscribeEvent;
35 import fulmine.util.collection.CollectionFactory;
36 import fulmine.util.collection.CollectionUtils;
37 import fulmine.util.log.AsyncLog;
38
39
40
41
42
43
44
45
46
47
48 public abstract class SubscriptionManager extends AbstractLifeCycle implements
49 ISubscriptionManager
50 {
51 private final static AsyncLog LOG = new AsyncLog(SubscriptionManager.class);
52
53
54 private final ISubscriptionFactory factory;
55
56
57 private final Map<ISubscriptionParameters, ISubscription> subscriptions;
58
59
60 private final IFrameworkContext context;
61
62
63
64
65
66
67
68
69
70 public SubscriptionManager(IFrameworkContext context,
71 ISubscriptionFactory factory)
72 {
73 super();
74 this.subscriptions = CollectionFactory.newMap();
75 nullCheck(factory, "No subscription factory provided");
76 this.factory = factory;
77 this.context = context;
78 }
79
80 public final boolean subscribe(ISubscriptionParameters parameters)
81 {
82 checkActive();
83 if (getLog().isInfoEnabled())
84 {
85 getLog().info("[subscribe] " + safeToString(parameters));
86 }
87 final ISubscription subscription;
88 synchronized (getSubscriptions())
89 {
90 if (getSubscriptions().containsKey(parameters))
91 {
92 if (getLog().isInfoEnabled())
93 {
94 getLog().info(
95 "[subscribe] already contains "
96 + safeToString(parameters));
97 }
98 return false;
99 }
100 subscription = this.factory.createSubscription(parameters);
101 getSubscriptions().put(parameters, subscription);
102
103 doRaiseSubscribeEvent(parameters);
104 }
105
106 Collection<? extends IEventSource> currentEntities =
107 doGetEventSources();
108 for (IEventSource source : currentEntities)
109 {
110 try
111 {
112 if (subscription.includes(source))
113 {
114 subscription.addMatch(source);
115 }
116 }
117 catch (Exception e)
118 {
119 logException(getLog(), source, e);
120 }
121 }
122 return true;
123 }
124
125 protected final ISubscription getSubscription(
126 ISubscriptionParameters parameters)
127 {
128 final ISubscription subscription;
129 synchronized (getSubscriptions())
130 {
131 subscription = getSubscriptions().get(parameters);
132 }
133 return subscription;
134 }
135
136 public final boolean addListener(ISubscriptionParameters parameters,
137 IEventListener listener)
138 {
139 checkActive();
140 boolean added = false;
141 final ISubscription subscription;
142 subscription = getSubscription(parameters);
143 if (subscription != null)
144 {
145 added = subscription.addListener(listener);
146 }
147 else
148 {
149 if (getLog().isInfoEnabled())
150 {
151 getLog().info(
152 "[add listener] no subscription for "
153 + safeToString(parameters));
154 }
155 }
156 if (added)
157 {
158 doAddListener(parameters, listener);
159 }
160 return added;
161 }
162
163 public final boolean removeListener(ISubscriptionParameters parameters,
164 IEventListener listener)
165 {
166 checkActive();
167 boolean removed = false;
168 final ISubscription subscription = getSubscription(parameters);
169 if (subscription != null)
170 {
171 removed = subscription.removeListener(listener);
172 }
173 else
174 {
175 if (getLog().isInfoEnabled())
176 {
177 getLog().info(
178 "[remove listener] no subscription for "
179 + safeToString(parameters));
180 }
181 }
182 return removed;
183 }
184
185 public final List<IEventListener> getListeners(
186 ISubscriptionParameters parameters)
187 {
188 checkActive();
189 final ISubscription subscription = getSubscription(parameters);
190 if (subscription != null)
191 {
192 return CollectionFactory.newList(subscription.getListeners());
193 }
194 if (getLog().isInfoEnabled())
195 {
196 getLog().info(
197 "[get listeners] no subscription for "
198 + safeToString(parameters));
199 }
200 return Collections.emptyList();
201 }
202
203 public final boolean unsubscribe(ISubscriptionParameters parameters)
204 {
205 if (getLog().isInfoEnabled())
206 {
207 getLog().info("[unsubscribe] " + safeToString(parameters));
208 }
209
210
211 final ISubscription subscription;
212 synchronized (getSubscriptions())
213 {
214 subscription = getSubscriptions().remove(parameters);
215 }
216 if (subscription != null)
217 {
218 doRaiseUnsubscribeEvent(parameters);
219 }
220 else
221 {
222 if (getLog().isInfoEnabled())
223 {
224 getLog().info(
225 "[unsubscribe] does not contain "
226 + safeToString(parameters));
227 }
228 return false;
229 }
230 try
231 {
232 subscription.destroy();
233 }
234 catch (Exception e)
235 {
236 logException(getLog(), subscription, e);
237 }
238 return true;
239 }
240
241 public final boolean isSubscribed(IEventSource source)
242 {
243 checkActive();
244 synchronized (getSubscriptions())
245 {
246 for (ISubscription subscription : getSubscriptions().values())
247 {
248 try
249 {
250 if (subscription.getMatches().contains(source))
251 {
252 return true;
253 }
254 }
255 catch (Exception e)
256 {
257 logException(getLog(), subscription, e);
258 }
259 }
260 }
261 return false;
262 }
263
264 public final Collection<IEventSource> getSubscribedSources(
265 ISubscriptionParameters parameters)
266 {
267 checkActive();
268 synchronized (getSubscriptions())
269 {
270 Collection<IEventSource> matches = CollectionFactory.newSet();
271 for (ISubscription subscription : getSubscriptions().values())
272 {
273 try
274 {
275
276 if (subscription.includes(parameters))
277 {
278
279
280
281 final Collection<IEventSource> potentialMatches =
282 subscription.getMatches();
283 for (IEventSource potentialMatch : potentialMatches)
284 {
285 if (parameters.includes(potentialMatch))
286 {
287 matches.add(potentialMatch);
288 }
289 }
290 }
291 }
292 catch (Exception e)
293 {
294 logException(getLog(), subscription, e);
295 }
296 }
297 return matches;
298 }
299 }
300
301 public final Collection<IEventSource> getSubscribedSources()
302 {
303 checkActive();
304 Collection<IEventSource> subscribed = CollectionFactory.newSet();
305 synchronized (getSubscriptions())
306 {
307 for (ISubscription subscription : getSubscriptions().values())
308 {
309 try
310 {
311 subscribed.addAll(subscription.getMatches());
312 }
313 catch (Exception e)
314 {
315 logException(getLog(), subscription, e);
316 }
317 }
318 }
319 return subscribed;
320 }
321
322 public final boolean includes(IAddressable parameters)
323 {
324 checkActive();
325 synchronized (getSubscriptions())
326 {
327 for (ISubscription subscription : getSubscriptions().values())
328 {
329 try
330 {
331 if (subscription.includes(parameters))
332 {
333 return true;
334 }
335 }
336 catch (Exception e)
337 {
338 logException(getLog(), subscription, e);
339 }
340 }
341 }
342 return false;
343 }
344
345 public final void eventSourceCreated(IAddressable identity)
346 {
347 if (getLog().isDebugEnabled())
348 {
349 getLog().debug(
350 "Checking subscriptions for CREATED event source "
351 + safeToString(identity));
352 }
353 int subscriptionMatchCount = 0;
354 final IEventSource eventSource = doGetEventSource(identity);
355 synchronized (this.getSubscriptions())
356 {
357
358
359 final SubscriptionParameters subscriptionParameters =
360 new SubscriptionParameters(identity);
361 for (ISubscription subscription : this.getSubscriptions().values())
362 {
363 try
364 {
365 if (subscription.includes(subscriptionParameters))
366 {
367 subscriptionMatchCount++;
368 subscription.addMatch(eventSource);
369 }
370 }
371 catch (Exception e)
372 {
373 logException(getLog(), subscription, e);
374 }
375 }
376 }
377 if (subscriptionMatchCount > 0)
378 {
379 doEventSourceCreated(eventSource);
380 if (getLog().isDebugEnabled())
381 {
382 getLog().debug(
383 EMPTY_STRING + subscriptionMatchCount
384 + " matches found for CREATED "
385 + safeToString(identity));
386 }
387 }
388 }
389
390 public void eventSourceDestroyed(IAddressable identity)
391 {
392 if (getLog().isDebugEnabled())
393 {
394 getLog().debug(
395 "Checking subscriptions for DESTROYED "
396 + safeToString(identity));
397 }
398 int subscriptionMatchCount = 0;
399 synchronized (this.getSubscriptions())
400 {
401
402
403 for (ISubscription subscription : this.getSubscriptions().values())
404 {
405 try
406 {
407 final SubscriptionParameters subscriptionParameters =
408 new SubscriptionParameters(identity);
409 if (subscription.includes(subscriptionParameters))
410 {
411 subscriptionMatchCount++;
412 subscription.removeMatch(subscriptionParameters);
413 }
414 }
415 catch (Exception e)
416 {
417 logException(getLog(), subscription, e);
418 }
419 }
420 }
421 if (subscriptionMatchCount > 0)
422 {
423 if (getLog().isDebugEnabled())
424 {
425 getLog().debug(
426 EMPTY_STRING + subscriptionMatchCount
427 + " matches found for DESTROYED "
428 + safeToString(identity));
429 }
430 }
431 }
432
433 protected void doRaiseSubscribeEvent(ISubscriptionParameters parameters)
434 {
435 this.context.queueEvent(new SubscribeEvent(this.context, parameters));
436 }
437
438 protected void doRaiseUnsubscribeEvent(ISubscriptionParameters parameters)
439 {
440 this.context.queueEvent(new UnsubscribeEvent(this.context, parameters));
441 }
442
443
444
445
446
447
448
449 protected final Map<ISubscriptionParameters, ISubscription> getSubscriptions()
450 {
451 return this.subscriptions;
452 }
453
454
455
456
457
458
459
460 protected abstract Collection<? extends IEventSource> doGetEventSources();
461
462
463
464
465
466
467
468
469
470 protected abstract IEventSource doGetEventSource(IAddressable id);
471
472
473
474
475
476
477
478
479
480
481
482 protected abstract boolean doEventSourceExists(IAddressable id);
483
484
485
486
487
488
489
490
491
492 protected abstract void doEventSourceCreated(IAddressable id);
493
494
495
496
497
498
499
500
501
502
503
504
505 protected abstract void doAddListener(ISubscriptionParameters parameters,
506 IEventListener listener);
507
508 @Override
509 protected void doDestroy()
510 {
511 synchronized (getSubscriptions())
512 {
513 Map<ISubscriptionParameters, ISubscription> copy =
514 CollectionFactory.newMap(getSubscriptions());
515 for (ISubscription subscription : copy.values())
516 {
517 try
518 {
519 unsubscribe(subscription);
520 }
521 catch (Exception e)
522 {
523 logException(getLog(), subscription, e);
524 }
525 }
526 getSubscriptions().clear();
527 }
528 }
529
530 @Override
531 protected AsyncLog getLog()
532 {
533 return LOG;
534 }
535
536 protected IFrameworkContext getContext()
537 {
538 return context;
539 }
540
541 @Override
542 public final String toString()
543 {
544 StringBuilder sb;
545 synchronized (getSubscriptions())
546 {
547 sb = CollectionUtils.toFormattedString(getSubscriptions());
548 }
549 return getClass().getSimpleName() + " subscriptions=" + sb.toString();
550 }
551 }