1 /*
2  * Copyright (C) 2020 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.server.location.listeners;
18 
19 import android.annotation.NonNull;
20 import android.annotation.Nullable;
21 import android.util.ArrayMap;
22 import android.util.ArraySet;
23 
24 import com.android.internal.annotations.GuardedBy;
25 import com.android.internal.listeners.ListenerExecutor.ListenerOperation;
26 import com.android.internal.util.Preconditions;
27 
28 import java.io.FileDescriptor;
29 import java.io.PrintWriter;
30 import java.util.AbstractMap;
31 import java.util.ArrayList;
32 import java.util.Collection;
33 import java.util.Map.Entry;
34 import java.util.Objects;
35 import java.util.function.Function;
36 import java.util.function.Predicate;
37 
38 /**
39  * A base class to multiplex some event source to multiple listener registrations. Every listener is
40  * represented by a registration object which stores all required state for a listener. Keys are
41  * used to uniquely identify every registration. Listener operations may be executed on
42  * registrations in order to invoke the represented listener.
43  *
44  * <p>Registrations are divided into two categories, active registrations and inactive
45  * registrations, as defined by {@link #isActive(ListenerRegistration)}. The set of active
46  * registrations is combined into a single merged registration, which is submitted to the backing
47  * event source when necessary in order to register with the event source. The merged registration
 * is updated whenever the set of active registrations changes. Listeners will only be invoked
 * for active registrations.
50  *
51  * <p>In order to inform the multiplexer of state changes, if a registration's active state changes,
52  * or if the merged registration changes, {@link #updateRegistrations(Predicate)} or {@link
53  * #updateRegistration(Object, Predicate)} must be invoked and return true for any registration
54  * whose state may have changed in such a way that the active state or merged registration state has
55  * changed. It is acceptable to return true from a predicate even if nothing has changed, though
56  * this may result in extra pointless work.
57  *
58  * <p>Callbacks invoked for various changes will always be ordered according to this lifecycle list:
59  *
60  * <ul>
61  *   <li>{@link #onRegister()}
62  *   <li>{@link ListenerRegistration#onRegister(Object)}
63  *   <li>{@link #onRegistrationAdded(Object, ListenerRegistration)}
64  *   <li>{@link #onActive()}
65  *   <li>{@link ListenerRegistration#onActive()}
66  *   <li>{@link ListenerRegistration#onInactive()}
67  *   <li>{@link #onInactive()}
68  *   <li>{@link #onRegistrationRemoved(Object, ListenerRegistration)}
69  *   <li>{@link ListenerRegistration#onUnregister()}
70  *   <li>{@link #onUnregister()}
71  * </ul>
72  *
73  * <p>If one registration replaces another, then {@link #onRegistrationReplaced(Object,
74  * ListenerRegistration, Object, ListenerRegistration)} is invoked instead of {@link
75  * #onRegistrationRemoved(Object, ListenerRegistration)} and {@link #onRegistrationAdded(Object,
76  * ListenerRegistration)}.
77  *
78  * <p>Adding registrations is not allowed to be called re-entrantly (ie, while in the middle of some
79  * other operation or callback). Removal is allowed re-entrantly, however only via {@link
80  * #removeRegistration(Object, ListenerRegistration)}, not via any other removal method. This
81  * ensures re-entrant removal does not accidentally remove the incorrect registration.
82  *
83  * @param <TKey>                key type
84  * @param <TListener>           listener type
85  * @param <TRegistration>       registration type
86  * @param <TMergedRegistration> merged registration type
87  */
88 public abstract class ListenerMultiplexer<TKey, TListener,
89         TRegistration extends ListenerRegistration<TListener>, TMergedRegistration> {
90 
    /**
     * The lock object used by the multiplexer. Acquiring this lock allows for multiple operations
     * on the multiplexer to be completed atomically. Otherwise, it is not required to hold this
     * lock. This lock is held while invoking all lifecycle callbacks on both the multiplexer and
     * any registrations.
     */
    protected final Object mMultiplexerLock = new Object();

    // every current registration (active or not), indexed by its key
    @GuardedBy("mMultiplexerLock")
    private final ArrayMap<TKey, TRegistration> mRegistrations = new ArrayMap<>();

    // while this buffer is acquired, updateService() only records that an update is required
    // instead of performing it immediately
    private final UpdateServiceBuffer mUpdateServiceBuffer = new UpdateServiceBuffer();

    // acquired around code that invokes callbacks or iterates over registrations. while it is
    // held, adding registrations is rejected and re-entrant removals are marked for removal
    // rather than being removed from the map immediately
    private final ReentrancyGuard mReentrancyGuard = new ReentrancyGuard();

    // number of registrations currently in the active state, used to decide when onActive() and
    // onInactive() must be invoked
    @GuardedBy("mMultiplexerLock")
    private int mActiveRegistrationsCount = 0;

    // result of the most recent (re)registration with the service, reset to false whenever the
    // multiplexer unregisters from the service
    @GuardedBy("mMultiplexerLock")
    private boolean mServiceRegistered = false;

    // the merged registration the service is currently registered with, or null if the service is
    // not registered
    @GuardedBy("mMultiplexerLock")
    @Nullable private TMergedRegistration mMerged;
114 
    /**
     * Should be implemented to register with the backing service with the given merged
     * registration, and should return true if a matching call to {@link #unregisterWithService()}
     * is required to unregister (ie, if registration succeeds). The set of registrations passed in
     * is the same set passed into {@link #mergeRegistrations(Collection)} to generate the merged
     * registration.
     *
     * <p class="note">It may seem redundant to pass in the set of active registrations when they
     * have already been used to generate the merged request, and indeed, for many implementations
     * this parameter can likely simply be ignored. However, some implementations may require access
     * to the set of registrations used to generate the merged request for further logic even
     * after the merged registration has been generated.
     *
     * @param merged        the merged registration generated by
     *                      {@link #mergeRegistrations(Collection)}
     * @param registrations the active registrations the merged registration was generated from
     * @return true if registration succeeded and {@link #unregisterWithService()} must later be
     *         invoked to undo it, false otherwise
     * @see #mergeRegistrations(Collection)
     * @see #reregisterWithService(Object, Object, Collection)
     */
    @GuardedBy("mMultiplexerLock")
    protected abstract boolean registerWithService(TMergedRegistration merged,
            @NonNull Collection<TRegistration> registrations);
134 
    /**
     * Invoked when the service has already been registered with some merged registration, and is
     * now being registered with a different merged registration. The default implementation simply
     * invokes {@link #registerWithService(Object, Collection)}.
     *
     * @param oldMerged     the merged registration the service is currently registered with
     * @param newMerged     the merged registration the service should now be registered with
     * @param registrations the active registrations the new merged registration was generated from
     * @return true if the service is registered after this call, with the same meaning as the
     *         return value of {@link #registerWithService(Object, Collection)}
     * @see #registerWithService(Object, Collection)
     */
    @GuardedBy("mMultiplexerLock")
    protected boolean reregisterWithService(TMergedRegistration oldMerged,
            TMergedRegistration newMerged, @NonNull Collection<TRegistration> registrations) {
        // by default the old merged registration is ignored and this is handled exactly like a
        // fresh registration
        return registerWithService(newMerged, registrations);
    }
147 
    /**
     * Should be implemented to unregister from the backing service. Invoked by the multiplexer
     * only while the service is considered registered, ie, after a prior
     * {@link #registerWithService(Object, Collection)} or
     * {@link #reregisterWithService(Object, Object, Collection)} returned true.
     */
    @GuardedBy("mMultiplexerLock")
    protected abstract void unregisterWithService();
153 
    /**
     * Defines whether a registration is currently active or not. Only active registrations will be
     * forwarded to {@link #registerWithService(Object, Collection)}, and listener invocations will
     * only be delivered to active requests. If a registration's active state changes,
     * {@link #updateRegistrations(Predicate)} must be invoked with a function that returns true for
     * any registrations that may have changed their active state.
     *
     * @param registration the registration to evaluate
     * @return true if the registration should currently be considered active
     */
    @GuardedBy("mMultiplexerLock")
    protected abstract boolean isActive(@NonNull TRegistration registration);
163 
    /**
     * Called in order to generate a merged registration from the given set of active registrations.
     * The list of registrations will never be empty. If the resulting merged registration is equal
     * to the currently registered merged registration, nothing further will happen. If the merged
     * registration differs, {@link #registerWithService(Object, Collection)} or
     * {@link #reregisterWithService(Object, Object, Collection)} will be invoked with the new
     * merged registration so that the backing service can be updated.
     *
     * @param registrations the non-empty collection of currently active registrations
     * @return the merged registration; it is compared against the currently registered merged
     *         registration with {@link Objects#equals(Object, Object)}
     */
    @GuardedBy("mMultiplexerLock")
    protected abstract TMergedRegistration mergeRegistrations(
            @NonNull Collection<TRegistration> registrations);
175 
    /**
     * Invoked when the multiplexer goes from having no registrations to having some registrations.
     * This is a convenient entry point for registering listeners, etc, which only need to be
     * present while there are any registrations. Invoked while holding the multiplexer's internal
     * lock.
     *
     * <p>The default implementation does nothing.
     */
    @GuardedBy("mMultiplexerLock")
    protected void onRegister() {}
184 
    /**
     * Invoked when the multiplexer goes from having some registrations to having no registrations.
     * This is a convenient entry point for unregistering listeners, etc, which only need to be
     * present while there are any registrations. Invoked while holding the multiplexer's internal
     * lock.
     *
     * <p>The default implementation does nothing.
     */
    @GuardedBy("mMultiplexerLock")
    protected void onUnregister() {}
193 
    /**
     * Invoked when a registration is added. Invoked while holding the multiplexer's internal lock.
     *
     * <p>The default implementation does nothing.
     *
     * @param key          the key the registration was added under
     * @param registration the registration that was added
     */
    @GuardedBy("mMultiplexerLock")
    protected void onRegistrationAdded(@NonNull TKey key, @NonNull TRegistration registration) {}
199 
    /**
     * Invoked when one registration replaces another (through {@link #replaceRegistration(Object,
     * Object, ListenerRegistration)}). The old registration has already been unregistered at this
     * point. Invoked while holding the multiplexer's internal lock.
     *
     * <p>The default behavior is simply to call first {@link #onRegistrationRemoved(Object,
     * ListenerRegistration)} and then {@link #onRegistrationAdded(Object, ListenerRegistration)}.
     *
     * @param oldKey          the key of the registration that was replaced
     * @param oldRegistration the registration that was replaced
     * @param newKey          the key of the replacing registration (may be the same as the old key)
     * @param newRegistration the replacing registration
     */
    @GuardedBy("mMultiplexerLock")
    protected void onRegistrationReplaced(
            @NonNull TKey oldKey,
            @NonNull TRegistration oldRegistration,
            @NonNull TKey newKey,
            @NonNull TRegistration newRegistration) {
        // removal is reported before addition so that subclasses which only implement the
        // added/removed callbacks observe a replacement as a remove followed by an add
        onRegistrationRemoved(oldKey, oldRegistration);
        onRegistrationAdded(newKey, newRegistration);
    }
217 
    /**
     * Invoked when a registration is removed. Invoked while holding the multiplexer's internal
     * lock.
     *
     * <p>The default implementation does nothing.
     *
     * @param key          the key the registration was stored under
     * @param registration the registration that was removed
     */
    @GuardedBy("mMultiplexerLock")
    protected void onRegistrationRemoved(@NonNull TKey key, @NonNull TRegistration registration) {}
224 
    /**
     * Invoked when the multiplexer goes from having no active registrations to having some active
     * registrations. This is a convenient entry point for registering listeners, etc, which only
     * need to be present while there are active registrations. Invoked while holding the
     * multiplexer's internal lock.
     *
     * <p>The default implementation does nothing.
     */
    @GuardedBy("mMultiplexerLock")
    protected void onActive() {}
233 
    /**
     * Invoked when the multiplexer goes from having some active registrations to having no active
     * registrations. This is a convenient entry point for unregistering listeners, etc, which only
     * need to be present while there are active registrations. Invoked while holding the
     * multiplexer's internal lock.
     *
     * <p>The default implementation does nothing.
     */
    @GuardedBy("mMultiplexerLock")
    protected void onInactive() {}
242 
    /**
     * Puts a new registration with the given key, replacing any previous registration under the
     * same key. This method cannot be called to put a registration re-entrantly.
     *
     * @param key          the key to store the registration under
     * @param registration the registration to store
     */
    protected final void putRegistration(@NonNull TKey key, @NonNull TRegistration registration) {
        // a put is a replacement where the old key and the new key are the same
        replaceRegistration(key, key, registration);
    }
250 
251     /**
252      * Atomically removes the registration with the old key and adds a new registration with the
253      * given key. If there was a registration for the old key, {@link
254      * #onRegistrationReplaced(Object, ListenerRegistration, Object, ListenerRegistration)} will be
255      * invoked instead of {@link #onRegistrationAdded(Object, ListenerRegistration)}, even if they
256      * share the same key. The old key may be the same value as the new key, in which case this
257      * function is equivalent to {@link #putRegistration(Object, ListenerRegistration)}. This method
258      * cannot be called to add a registration re-entrantly.
259      */
replaceRegistration(@onNull TKey oldKey, @NonNull TKey key, @NonNull TRegistration registration)260     protected final void replaceRegistration(@NonNull TKey oldKey, @NonNull TKey key,
261             @NonNull TRegistration registration) {
262         Objects.requireNonNull(oldKey);
263         Objects.requireNonNull(key);
264         Objects.requireNonNull(registration);
265 
266         synchronized (mMultiplexerLock) {
267             // adding listeners reentrantly is not supported
268             Preconditions.checkState(!mReentrancyGuard.isReentrant());
269 
270             // new key may only have a prior registration if the oldKey is the same as the key
271             Preconditions.checkArgument(oldKey == key || !mRegistrations.containsKey(key));
272 
273             // since adding a registration can invoke a variety of callbacks, we need to ensure
274             // those callbacks themselves do not re-enter, as this could lead to out-of-order
275             // callbacks. further, we buffer service updates since adding a registration may
276             // involve removing a prior registration. note that try-with-resources ordering is
277             // meaningful here as well. we want to close the reentrancy guard first, as this may
278             // generate additional service updates, then close the update service buffer.
279             try (UpdateServiceBuffer ignored1 = mUpdateServiceBuffer.acquire();
280                  ReentrancyGuard ignored2 = mReentrancyGuard.acquire()) {
281 
282                 boolean wasEmpty = mRegistrations.isEmpty();
283 
284                 TRegistration oldRegistration = null;
285                 int oldIndex = mRegistrations.indexOfKey(oldKey);
286                 if (oldIndex >= 0) {
287                     // remove ourselves instead of using remove(), to balance registration callbacks
288                     oldRegistration = mRegistrations.valueAt(oldIndex);
289                     unregister(oldRegistration);
290                     oldRegistration.onUnregister();
291                     if (oldKey != key) {
292                         mRegistrations.removeAt(oldIndex);
293                     }
294                 }
295                 if (oldKey == key && oldIndex >= 0) {
296                     mRegistrations.setValueAt(oldIndex, registration);
297                 } else {
298                     mRegistrations.put(key, registration);
299                 }
300 
301                 if (wasEmpty) {
302                     onRegister();
303                 }
304                 registration.onRegister(key);
305                 if (oldRegistration == null) {
306                     onRegistrationAdded(key, registration);
307                 } else {
308                     onRegistrationReplaced(oldKey, oldRegistration, key, registration);
309                 }
310                 onRegistrationActiveChanged(registration);
311             }
312         }
313     }
314 
315     /**
316      * Removes all registrations with keys that satisfy the given predicate. This method cannot be
317      * called to remove a registration re-entrantly.
318      */
removeRegistrationIf(@onNull Predicate<TKey> predicate)319     protected final void removeRegistrationIf(@NonNull Predicate<TKey> predicate) {
320         synchronized (mMultiplexerLock) {
321             // this method does not support removing listeners reentrantly
322             Preconditions.checkState(!mReentrancyGuard.isReentrant());
323 
324             // since removing a registration can invoke a variety of callbacks, we need to ensure
325             // those callbacks themselves do not re-enter, as this could lead to out-of-order
326             // callbacks. further, we buffer service updates since chains of removeLater()
327             // invocations could result in multiple service updates. note that try-with-resources
328             // ordering is meaningful here as well. we want to close the reentrancy guard first, as
329             // this may generate additional service updates, then close the update service buffer.
330             try (UpdateServiceBuffer ignored1 = mUpdateServiceBuffer.acquire();
331                  ReentrancyGuard ignored2 = mReentrancyGuard.acquire()) {
332 
333                 final int size = mRegistrations.size();
334                 for (int i = 0; i < size; i++) {
335                     TKey key = mRegistrations.keyAt(i);
336                     if (predicate.test(key)) {
337                         removeRegistration(key, mRegistrations.valueAt(i));
338                     }
339                 }
340             }
341         }
342     }
343 
344     /**
345      * Removes the registration with the given key. This method cannot be called to remove a
346      * registration re-entrantly.
347      */
removeRegistration(TKey key)348     protected final void removeRegistration(TKey key) {
349         synchronized (mMultiplexerLock) {
350             // this method does not support removing listeners reentrantly
351             Preconditions.checkState(!mReentrancyGuard.isReentrant());
352 
353             int index = mRegistrations.indexOfKey(key);
354             if (index < 0) {
355                 return;
356             }
357 
358             removeRegistration(index);
359         }
360     }
361 
362     /**
363      * Removes the given registration with the given key. If the given key has a different
364      * registration at the time this method is called, nothing happens. This method allows for
365      * re-entrancy, and may be called to remove a registration re-entrantly.
366      */
removeRegistration(@onNull TKey key, @NonNull ListenerRegistration<?> registration)367     protected final void removeRegistration(@NonNull TKey key,
368             @NonNull ListenerRegistration<?> registration) {
369         synchronized (mMultiplexerLock) {
370             int index = mRegistrations.indexOfKey(key);
371             if (index < 0) {
372                 return;
373             }
374 
375             TRegistration typedRegistration = mRegistrations.valueAt(index);
376             if (typedRegistration != registration) {
377                 return;
378             }
379 
380             if (mReentrancyGuard.isReentrant()) {
381                 unregister(typedRegistration);
382                 mReentrancyGuard.markForRemoval(key, typedRegistration);
383             } else {
384                 removeRegistration(index);
385             }
386         }
387     }
388 
    /**
     * Removes the registration stored at the given index of the registration map, running the
     * removal half of the lifecycle callbacks in order: the registration is unregistered (which
     * re-evaluates its active state), then {@link #onRegistrationRemoved(Object,
     * ListenerRegistration)} and {@link ListenerRegistration#onUnregister()} are invoked, then the
     * entry is removed, and finally {@link #onUnregister()} is invoked if no registrations remain.
     *
     * @param index index into the registration map; callers in this class only pass indices
     *              obtained from a successful key lookup
     */
    @GuardedBy("mMultiplexerLock")
    private void removeRegistration(int index) {
        TKey key = mRegistrations.keyAt(index);
        TRegistration registration = mRegistrations.valueAt(index);

        // since removing a registration can invoke a variety of callbacks, we need to ensure those
        // callbacks themselves do not re-enter, as this could lead to out-of-order callbacks.
        // further, we buffer service updates since chains of removeLater() invocations could result
        // in multiple service updates. note that try-with-resources ordering is meaningful here as
        // well. we want to close the reentrancy guard first, as this may generate additional
        // service updates, then close the update service buffer.
        try (UpdateServiceBuffer ignored1 = mUpdateServiceBuffer.acquire();
             ReentrancyGuard ignored2 = mReentrancyGuard.acquire()) {

            unregister(registration);
            onRegistrationRemoved(key, registration);
            registration.onUnregister();
            // the entry is only dropped from the map after all per-registration callbacks have run
            mRegistrations.removeAt(index);
            if (mRegistrations.isEmpty()) {
                onUnregister();
            }
        }
    }
412 
413     /**
414      * Forces a re-evalution of the merged request for all active registrations and updates service
415      * registration accordingly.
416      */
updateService()417     protected final void updateService() {
418         synchronized (mMultiplexerLock) {
419             if (mUpdateServiceBuffer.isBuffered()) {
420                 mUpdateServiceBuffer.markUpdateServiceRequired();
421                 return;
422             }
423 
424             final int size = mRegistrations.size();
425             ArrayList<TRegistration> actives = new ArrayList<>(size);
426             for (int i = 0; i < size; i++) {
427                 TRegistration registration = mRegistrations.valueAt(i);
428                 if (registration.isActive()) {
429                     actives.add(registration);
430                 }
431             }
432 
433             if (actives.isEmpty()) {
434                 if (mServiceRegistered) {
435                     mMerged = null;
436                     mServiceRegistered = false;
437                     unregisterWithService();
438                 }
439             } else {
440                 TMergedRegistration merged = mergeRegistrations(actives);
441                 if (mServiceRegistered) {
442                     if (!Objects.equals(merged, mMerged)) {
443                         mServiceRegistered = reregisterWithService(mMerged, merged, actives);
444                         mMerged = mServiceRegistered ? merged : null;
445                     }
446                 } else {
447                     mServiceRegistered = registerWithService(merged, actives);
448                     mMerged = mServiceRegistered ? merged : null;
449                 }
450             }
451         }
452     }
453 
454     /**
455      * If the service is currently registered, unregisters it and then calls
456      * {@link #updateService()} so that {@link #registerWithService(Object, Collection)} will be
457      * re-invoked. This is useful, for instance, if the backing service has crashed or otherwise
458      * lost state, and needs to be re-initialized. Because this unregisters first, this is safe to
459      * use even if there is a possibility the backing server has not crashed, or has already been
460      * reinitialized.
461      */
resetService()462     protected final void resetService() {
463         synchronized (mMultiplexerLock) {
464             if (mServiceRegistered) {
465                 mMerged = null;
466                 mServiceRegistered = false;
467                 unregisterWithService();
468                 updateService();
469             }
470         }
471     }
472 
    /**
     * Begins buffering calls to {@link #updateService()} until {@link UpdateServiceLock#close()}
     * is called. This is useful to prevent extra work when combining multiple calls (for example,
     * buffering {@code updateService()} until after multiple adds/removes/updates occur).
     *
     * @return a new lock object wrapping this multiplexer's update service buffer
     */
    public UpdateServiceLock newUpdateServiceLock() {
        return new UpdateServiceLock(mUpdateServiceBuffer);
    }
481 
482     /**
483      * Evaluates the predicate on all registrations until the predicate returns true, at which point
484      * evaluation will cease. Returns true if the predicate ever returned true, and returns false
485      * otherwise.
486      */
findRegistration(Predicate<TRegistration> predicate)487     protected final boolean findRegistration(Predicate<TRegistration> predicate) {
488         synchronized (mMultiplexerLock) {
489             // we only acquire a reentrancy guard in case of removal while iterating. this method
490             // does not directly affect active state or merged state, so there is no advantage to
491             // acquiring an update source buffer.
492             try (ReentrancyGuard ignored = mReentrancyGuard.acquire()) {
493                 final int size = mRegistrations.size();
494                 for (int i = 0; i < size; i++) {
495                     TRegistration registration = mRegistrations.valueAt(i);
496                     if (predicate.test(registration)) {
497                         return true;
498                     }
499                 }
500             }
501 
502             return false;
503         }
504     }
505 
506     /**
507      * Evaluates the predicate on all registrations. The predicate should return true if the active
508      * state of the registration may have changed as a result. If the active state of any
509      * registration has changed, {@link #updateService()} will automatically be invoked to handle
510      * the resulting changes.
511      */
updateRegistrations(@onNull Predicate<TRegistration> predicate)512     protected final void updateRegistrations(@NonNull Predicate<TRegistration> predicate) {
513         synchronized (mMultiplexerLock) {
514             // since updating a registration can invoke a variety of callbacks, we need to ensure
515             // those callbacks themselves do not re-enter, as this could lead to out-of-order
516             // callbacks. note that try-with-resources ordering is meaningful here as well. we want
517             // to close the reentrancy guard first, as this may generate additional service updates,
518             // then close the update service buffer.
519             try (UpdateServiceBuffer ignored1 = mUpdateServiceBuffer.acquire();
520                  ReentrancyGuard ignored2 = mReentrancyGuard.acquire()) {
521 
522                 final int size = mRegistrations.size();
523                 for (int i = 0; i < size; i++) {
524                     TRegistration registration = mRegistrations.valueAt(i);
525                     if (predicate.test(registration)) {
526                         onRegistrationActiveChanged(registration);
527                     }
528                 }
529             }
530         }
531     }
532 
533     /**
534      * Evaluates the predicate on a registration with the given key. The predicate should return
535      * true if the active state of the registration may have changed as a result. If the active
536      * state of the registration has changed, {@link #updateService()} will automatically be invoked
537      * to handle the resulting changes. Returns true if there is a registration with the given key
538      * (and thus the predicate was invoked), and false otherwise.
539      */
updateRegistration(@onNull Object key, @NonNull Predicate<TRegistration> predicate)540     protected final boolean updateRegistration(@NonNull Object key,
541             @NonNull Predicate<TRegistration> predicate) {
542         synchronized (mMultiplexerLock) {
543             // since updating a registration can invoke a variety of callbacks, we need to ensure
544             // those callbacks themselves do not re-enter, as this could lead to out-of-order
545             // callbacks. note that try-with-resources ordering is meaningful here as well. we want
546             // to close the reentrancy guard first, as this may generate additional service updates,
547             // then close the update service buffer.
548             try (UpdateServiceBuffer ignored1 = mUpdateServiceBuffer.acquire();
549                  ReentrancyGuard ignored2 = mReentrancyGuard.acquire()) {
550 
551                 int index = mRegistrations.indexOfKey(key);
552                 if (index < 0) {
553                     return false;
554                 }
555 
556                 TRegistration registration = mRegistrations.valueAt(index);
557                 if (predicate.test(registration)) {
558                     onRegistrationActiveChanged(registration);
559                 }
560                 return true;
561             }
562         }
563     }
564 
565     @GuardedBy("mMultiplexerLock")
onRegistrationActiveChanged(TRegistration registration)566     private void onRegistrationActiveChanged(TRegistration registration) {
567         boolean active = registration.isRegistered() && isActive(registration);
568         boolean changed = registration.setActive(active);
569         if (changed) {
570             if (active) {
571                 if (++mActiveRegistrationsCount == 1) {
572                     onActive();
573                 }
574                 registration.onActive();
575             } else {
576                 registration.onInactive();
577                 if (--mActiveRegistrationsCount == 0) {
578                     onInactive();
579                 }
580             }
581 
582             updateService();
583         }
584     }
585 
586     /**
587      * Executes the given function for all active registrations. If the function returns a non-null
588      * operation, that operation will be invoked with the associated listener. The function may not
589      * change the active state of the registration.
590      */
deliverToListeners( @onNull Function<TRegistration, ListenerOperation<TListener>> function)591     protected final void deliverToListeners(
592             @NonNull Function<TRegistration, ListenerOperation<TListener>> function) {
593         synchronized (mMultiplexerLock) {
594             try (ReentrancyGuard ignored = mReentrancyGuard.acquire()) {
595                 final int size = mRegistrations.size();
596                 for (int i = 0; i < size; i++) {
597                     TRegistration registration = mRegistrations.valueAt(i);
598                     if (registration.isActive()) {
599                         ListenerOperation<TListener> operation = function.apply(registration);
600                         if (operation != null) {
601                             registration.executeOperation(operation);
602                         }
603                     }
604                 }
605             }
606         }
607     }
608 
609     /**
610      * Executes the given operation for all active listeners. This is a convenience function
611      * equivalent to:
612      * <pre>
613      * deliverToListeners(registration -> operation);
614      * </pre>
615      */
deliverToListeners(@onNull ListenerOperation<TListener> operation)616     protected final void deliverToListeners(@NonNull ListenerOperation<TListener> operation) {
617         synchronized (mMultiplexerLock) {
618             try (ReentrancyGuard ignored = mReentrancyGuard.acquire()) {
619                 final int size = mRegistrations.size();
620                 for (int i = 0; i < size; i++) {
621                     TRegistration registration = mRegistrations.valueAt(i);
622                     if (registration.isActive()) {
623                         registration.executeOperation(operation);
624                     }
625                 }
626             }
627         }
628     }
629 
630     @GuardedBy("mMultiplexerLock")
unregister(TRegistration registration)631     private void unregister(TRegistration registration) {
632         registration.unregisterInternal();
633         onRegistrationActiveChanged(registration);
634     }
635 
636     /**
637      * Dumps debug information.
638      */
dump(FileDescriptor fd, PrintWriter pw, String[] args)639     public void dump(FileDescriptor fd, PrintWriter pw, String[] args) {
640         synchronized (mMultiplexerLock) {
641             pw.print("service: ");
642             pw.print(getServiceState());
643             pw.println();
644 
645             if (!mRegistrations.isEmpty()) {
646                 pw.println("listeners:");
647 
648                 final int size = mRegistrations.size();
649                 for (int i = 0; i < size; i++) {
650                     TRegistration registration = mRegistrations.valueAt(i);
651                     pw.print("  ");
652                     pw.print(registration);
653                     if (!registration.isActive()) {
654                         pw.println(" (inactive)");
655                     } else {
656                         pw.println();
657                     }
658                 }
659             }
660         }
661     }
662 
663     /**
664      * May be overridden to provide additional details on service state when dumping the manager
665      * state. Invoked while holding the multiplexer's internal lock.
666      */
667     @GuardedBy("mMultiplexerLock")
getServiceState()668     protected String getServiceState() {
669         if (mServiceRegistered) {
670             if (mMerged != null) {
671                 return mMerged.toString();
672             } else {
673                 return "registered";
674             }
675         } else {
676             return "unregistered";
677         }
678     }
679 
680     /**
681      * A reference counted helper class that guards against re-entrancy, and also helps implement
682      * registration removal during reentrancy. When this class is {@link #acquire()}d, it increments
683      * the reference count. To check whether re-entrancy is occurring, clients may use
684      * {@link #isReentrant()}, and modify their behavior (such as by failing the call, or calling
685      * {@link #markForRemoval(Object, ListenerRegistration)}). When this class is {@link #close()}d,
686      * any key/registration pairs that were marked for removal prior to the close operation will
687      * then be removed - which is safe since the operation will no longer be re-entrant.
688      */
689     private final class ReentrancyGuard implements AutoCloseable {
690 
691         @GuardedBy("mMultiplexerLock")
692         private int mGuardCount;
693 
694         @GuardedBy("mMultiplexerLock")
695         @Nullable private ArraySet<Entry<TKey, ListenerRegistration<?>>> mScheduledRemovals;
696 
ReentrancyGuard()697         ReentrancyGuard() {
698             mGuardCount = 0;
699             mScheduledRemovals = null;
700         }
701 
isReentrant()702         boolean isReentrant() {
703             synchronized (mMultiplexerLock) {
704                 return mGuardCount != 0;
705             }
706         }
707 
markForRemoval(TKey key, ListenerRegistration<?> registration)708         void markForRemoval(TKey key, ListenerRegistration<?> registration) {
709             synchronized (mMultiplexerLock) {
710                 Preconditions.checkState(isReentrant());
711 
712                 if (mScheduledRemovals == null) {
713                     mScheduledRemovals = new ArraySet<>(mRegistrations.size());
714                 }
715                 mScheduledRemovals.add(new AbstractMap.SimpleImmutableEntry<>(key, registration));
716             }
717         }
718 
acquire()719         ReentrancyGuard acquire() {
720             synchronized (mMultiplexerLock) {
721                 ++mGuardCount;
722                 return this;
723             }
724         }
725 
726         @Override
close()727         public void close() {
728             synchronized (mMultiplexerLock) {
729                 Preconditions.checkState(mGuardCount > 0);
730 
731                 ArraySet<Entry<TKey, ListenerRegistration<?>>> scheduledRemovals = null;
732 
733                 if (--mGuardCount == 0) {
734                     scheduledRemovals = mScheduledRemovals;
735                     mScheduledRemovals = null;
736                 }
737 
738                 if (scheduledRemovals == null) {
739                     return;
740                 }
741 
742                 try (UpdateServiceBuffer ignored = mUpdateServiceBuffer.acquire()) {
743                     final int size = scheduledRemovals.size();
744                     for (int i = 0; i < size; i++) {
745                         Entry<TKey, ListenerRegistration<?>> entry = scheduledRemovals.valueAt(i);
746                         removeRegistration(entry.getKey(), entry.getValue());
747                     }
748                 }
749             }
750         }
751     }
752 
753     /**
754      * A reference counted helper class that buffers class to {@link #updateService()}. Since
755      * {@link #updateService()} iterates through every registration and performs request merging
756      * work, it can often be the most expensive part of any update to the multiplexer. This means
757      * that if multiple calls to updateService() can be buffered, work will be saved. This class
758      * allows clients to begin buffering calls after {@link #acquire()}ing this class, and when
759      * {@link #close()} is called, any buffered calls to {@link #updateService()} will be combined
760      * into a single final call. Clients should acquire this class when they are doing work that
761      * could potentially result in multiple calls to updateService(), and close when they are done
762      * with that work.
763      */
764     private final class UpdateServiceBuffer implements AutoCloseable {
765 
766         // requires internal locking because close() may be exposed externally and could be called
767         // from any thread
768 
769         @GuardedBy("this")
770         private int mBufferCount;
771 
772         @GuardedBy("this")
773         private boolean mUpdateServiceRequired;
774 
UpdateServiceBuffer()775         UpdateServiceBuffer() {
776             mBufferCount = 0;
777             mUpdateServiceRequired = false;
778         }
779 
isBuffered()780         synchronized boolean isBuffered() {
781             return mBufferCount != 0;
782         }
783 
markUpdateServiceRequired()784         synchronized void markUpdateServiceRequired() {
785             Preconditions.checkState(isBuffered());
786             mUpdateServiceRequired = true;
787         }
788 
acquire()789         synchronized UpdateServiceBuffer acquire() {
790             ++mBufferCount;
791             return this;
792         }
793 
794         @Override
close()795         public void close() {
796             boolean updateServiceRequired = false;
797             synchronized (this) {
798                 Preconditions.checkState(mBufferCount > 0);
799                 if (--mBufferCount == 0) {
800                     updateServiceRequired = mUpdateServiceRequired;
801                     mUpdateServiceRequired = false;
802                 }
803             }
804 
805             if (updateServiceRequired) {
806                 updateService();
807             }
808         }
809     }
810 
811     /**
812      * Acquiring this lock will buffer all calls to {@link #updateService()} until the lock is
813      * {@link #close()}ed. This can be used to save work by acquiring the lock before multiple calls
814      * to updateService() are expected, and closing the lock after.
815      */
816     public static final class UpdateServiceLock implements AutoCloseable {
817 
818         @Nullable private ListenerMultiplexer<?, ?, ?, ?>.UpdateServiceBuffer mUpdateServiceBuffer;
819 
UpdateServiceLock(ListenerMultiplexer<?, ?, ?, ?>.UpdateServiceBuffer updateServiceBuffer)820         UpdateServiceLock(ListenerMultiplexer<?, ?, ?, ?>.UpdateServiceBuffer updateServiceBuffer) {
821             mUpdateServiceBuffer = updateServiceBuffer.acquire();
822         }
823 
824         @Override
close()825         public void close() {
826             if (mUpdateServiceBuffer != null) {
827                 ListenerMultiplexer<?, ?, ?, ?>.UpdateServiceBuffer buffer = mUpdateServiceBuffer;
828                 mUpdateServiceBuffer = null;
829                 buffer.close();
830             }
831         }
832     }
833 }
834