/*
 * Copyright 2016 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "model/setup/async_manager.h"  // for AsyncManager

#include <errno.h>  // for errno

#include <atomic>              // for atomic_bool, atomic_e...
#include <condition_variable>  // for condition_variable
#include <cstring>             // for strerror
#include <limits>              // for numeric_limits
#include <map>                 // for map<>::value_type, map
#include <mutex>               // for unique_lock, mutex
#include <ratio>               // for ratio
#include <set>                 // for set
#include <thread>              // for thread
#include <type_traits>         // for remove_extent_t
#include <utility>             // for pair, make_pair, oper...
#include <vector>              // for vector

#include "aemu/base/EintrWrapper.h"  // for HANDLE_EINTR
#include "aemu/base/Log.h"           // for LogStreamVoidify, Log...
#include "aemu/base/logging/CLog.h"
#include "aemu/base/sockets/SocketUtils.h"   // for socketRecv, socketSet...
#include "aemu/base/sockets/SocketWaiter.h"  // for SocketWaiter, SocketW...

namespace rootcanal {
// The implementation of AsyncManager is divided between two helper classes
// (three, if AsyncManager itself is counted). AsyncManager's only
// responsibility, besides acting as a proxy for the other two classes, is to
// provide a global synchronization mechanism for callbacks and client code to
// use.
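//
// A minimal sketch of how client code might use that mechanism (illustrative
// only; `manager` and `shared_state` are hypothetical names, not part of this
// file):
//
//   AsyncManager manager;
//   manager.Synchronize([&shared_state]() {
//     // Runs while holding the manager's synchronization mutex, so it cannot
//     // interleave with scheduled task callbacks, which are also invoked
//     // through Synchronize() (see AsyncTaskManager::ThreadRoutine below).
//     shared_state.counter++;
//   });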

// The watching of file descriptors is done through AsyncFdWatcher. Several
// objects of this class may coexist simultaneously as they share no state.
// After construction of such an object nothing happens beyond some very
// simple member initialization. When the first FD is set up for watching, the
// object starts a new thread which watches the given (and later provided) FDs
// using select() inside a loop. A special FD (a pipe) is also watched; it is
// used to notify the thread of internal changes to the object's state (like
// the addition of new FDs to watch). Every access to internal state is
// synchronized using a single internal mutex. The thread is only stopped on
// destruction of the object, by modifying a flag, which is the only member
// variable accessed without acquiring the lock (this is safe because the
// thread is notified afterwards by writing to the pipe, so it will see the
// flag regardless of which phase of the loop it is in at that moment).
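//
// A minimal usage sketch of the FD watching API (illustrative only; `fd` is
// assumed to be a valid, readable file descriptor owned by the caller, and
// the body of the callback is just an example):
//
//   AsyncManager manager;
//   manager.WatchFdForNonBlockingReads(fd, [](int ready_fd) {
//     char buf[64];
//     // Invoked when ready_fd has data available; a read here is expected
//     // not to block.
//     android::base::socketRecv(ready_fd, buf, sizeof(buf));
//   });
//   // ... later, before closing fd:
//   manager.StopWatchingFileDescriptor(fd);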

// The scheduling of asynchronous tasks, periodic or not, is handled by the
// AsyncTaskManager class. Like the FD watcher, this class shares no internal
// state between different instances, so it is safe to use several objects of
// this class. Nothing interesting happens upon construction either; the work
// starts only after a Task has been scheduled, and access to internal state
// is synchronized using a single internal mutex. When the first task is
// scheduled, a thread is started which monitors a queue of tasks. The queue
// is peeked to see when the next task should be carried out, and then the
// thread performs an (absolute) timed wait on a condition variable. The wait
// ends because of a timeout or a notify on the cond var: the former means a
// task is due for execution, while the latter means there has been a change
// in internal state, such as a task being scheduled/canceled or the stop flag
// being set. Setting and querying the stop flag, or modifying the task queue
// and the subsequent notification of the cond var, is done atomically (i.e.
// while holding the lock on the internal mutex) to ensure that the thread
// never misses the notification, since notifying a cond var is not persistent
// the way writing to a pipe is. (If not done this way, the thread could query
// the stop flag and be put aside by the OS scheduler right after; the 'stop
// thread' procedure could then run, setting the flag, notifying a cond var
// that no one is waiting on, and joining the thread; the thread would then
// resume execution believing that it needs to continue and wait on the cond
// var, possibly forever if there are no tasks scheduled, effectively causing
// a deadlock.)
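//
// A minimal sketch of the scheduling API (illustrative only; the durations
// and the lambda bodies are hypothetical):
//
//   AsyncManager manager;
//   AsyncUserId user = manager.GetNextUserId();
//   // Run once, 100 ms from now.
//   manager.ExecAsync(user, std::chrono::milliseconds(100),
//                     []() { /* one-shot work */ });
//   // Run every 250 ms after no initial delay, until canceled.
//   AsyncTaskId tick = manager.ExecAsyncPeriodically(
//       user, std::chrono::milliseconds(0), std::chrono::milliseconds(250),
//       []() { /* periodic work */ });
//   // ...
//   manager.CancelAsyncTask(tick);           // cancel one task, or
//   manager.CancelAsyncTasksFromUser(user);  // cancel all tasks of this user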

// This number also states the maximum number of scheduled tasks we can handle
// at a given time.
static const uint16_t kMaxTaskId =
    -1; /* 2^16 - 1, permissible ids are {1..2^16-1} */
static inline AsyncTaskId NextAsyncTaskId(const AsyncTaskId id) {
  return (id == kMaxTaskId) ? 1 : id + 1;
}
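// For illustration: ids simply increment and wrap around within the
// permissible range, e.g. NextAsyncTaskId(1) == 2 and
// NextAsyncTaskId(kMaxTaskId) == 1, so kInvalidTaskId is never handed out.
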
// The buffer is only 10 bytes because the expected number of bytes
// written on this socket is 1. It is possible that the thread is notified
// more than once, but that is highly unlikely, so a buffer of size 10 seems
// enough, and the reads are performed inside a while loop just in case it
// isn't. From the thread routine's point of view it makes no difference
// whether it was notified once or 100 times, so it just tries to consume the
// entire buffer. In the case where an interrupt causes read to return without
// having read everything that was available, a new iteration of the thread
// loop will bring execution back to this point almost immediately, so there
// is no need to treat that case specially.
static const int kNotificationBufferSize = 10;

using android::base::SocketWaiter;

// Async File Descriptor Watcher Implementation:
class AsyncManager::AsyncFdWatcher {
 public:
  int WatchFdForNonBlockingReads(
      int file_descriptor, const ReadCallback &on_read_fd_ready_callback) {
    // add file descriptor and callback
    {
      std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
      watched_shared_fds_[file_descriptor] = on_read_fd_ready_callback;
    }

    // start the thread if not started yet
    int started = tryStartThread();
    if (started != 0) {
      derror("%s: Unable to start thread", __func__);
      return started;
    }

    // notify the thread so that it knows of the new FD
    notifyThread();

    return 0;
  }

  void StopWatchingFileDescriptor(int file_descriptor) {
    std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
    watched_shared_fds_.erase(file_descriptor);
  }

  AsyncFdWatcher() = default;
  AsyncFdWatcher(const AsyncFdWatcher &) = delete;
  AsyncFdWatcher &operator=(const AsyncFdWatcher &) = delete;

  ~AsyncFdWatcher() = default;

  int stopThread() {
    if (!std::atomic_exchange(&running_, false)) {
      return 0;  // if not running already
    }

    notifyThread();

    if (std::this_thread::get_id() != thread_.get_id()) {
      thread_.join();
    } else {
      dwarning("%s: Starting thread stop from inside the reading thread itself",
               __func__);
    }

    {
      std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
      watched_shared_fds_.clear();
    }

    return 0;
  }

 private:
  // Make sure to call this with at least one file descriptor ready to be
  // watched, or the thread routine will return immediately.
  int tryStartThread() {
    if (std::atomic_exchange(&running_, true)) {
      return 0;  // if already running
    }
    // set up the communication channel
    if (android::base::socketCreatePair(&notification_listen_fd_,
                                        &notification_write_fd_)) {
      derror(
          "%s: Unable to establish a communication channel to the reading "
          "thread",
          __func__);
      return -1;
    }
    android::base::socketSetNonBlocking(notification_listen_fd_);
    android::base::socketSetNonBlocking(notification_write_fd_);

    thread_ = std::thread([this]() { ThreadRoutine(); });
    if (!thread_.joinable()) {
      derror("%s: Unable to start reading thread", __func__);
      return -1;
    }
    return 0;
  }

  int notifyThread() {
    char buffer = '0';
    if (android::base::socketSend(notification_write_fd_, &buffer, 1) < 0) {
      derror("%s: Unable to send message to reading thread", __func__);
      return -1;
    }
    return 0;
  }

  void setUpFileDescriptorSet(SocketWaiter *read_fds) {
    // add comm channel to the set
    read_fds->update(notification_listen_fd_, SocketWaiter::Event::kEventRead);

    // add watched FDs to the set
    {
      std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
      for (auto &fdp : watched_shared_fds_) {
        read_fds->update(fdp.first, SocketWaiter::Event::kEventRead);
      }
    }
  }

  // check the comm channel and read everything there
  bool consumeThreadNotifications(SocketWaiter *read_fds) {
    if (read_fds->pendingEventsFor(notification_listen_fd_)) {
      char buffer[kNotificationBufferSize];
      while (HANDLE_EINTR(android::base::socketRecv(
                 notification_listen_fd_, buffer, kNotificationBufferSize)) ==
             kNotificationBufferSize) {
      }
      return true;
    }
    return false;
  }

  // check all file descriptors and call callbacks if necessary
  void runAppropriateCallbacks(SocketWaiter *read_fds) {
    // not a good idea to call a callback while holding the FD lock,
    // nor to release the lock while traversing the map
    std::vector<decltype(watched_shared_fds_)::value_type> fds;
    std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
    for (auto &fdc : watched_shared_fds_) {
      auto pending = read_fds->pendingEventsFor(fdc.first);
      if (pending == SocketWaiter::kEventRead) {
        fds.push_back(fdc);
      }
    }

    for (auto &p : fds) {
      p.second(p.first);
    }
  }

  void ThreadRoutine() {
    auto read_fds = std::unique_ptr<SocketWaiter>(SocketWaiter::create());
    while (running_) {
      read_fds->reset();
      setUpFileDescriptorSet(read_fds.get());

      // wait until there is data available to read on some FD
      int retval = read_fds->wait(std::numeric_limits<int64_t>::max());
      if (retval <= 0) {  // there was some error or a timeout
        derror(
            "%s: There was an error while waiting for data on the file "
            "descriptors: %s",
            __func__, strerror(errno));
        continue;
      }

      consumeThreadNotifications(read_fds.get());

      // Do not read if there was a call to stop running
      if (!running_) {
        break;
      }

      runAppropriateCallbacks(read_fds.get());
    }
  }

  std::atomic_bool running_{false};
  std::thread thread_;
  std::recursive_mutex internal_mutex_;

  // android::base::SocketWaiter socket_waiter_;
  std::map<int, ReadCallback> watched_shared_fds_;

  // A pair of FDs used to send information to the reading thread
  int notification_listen_fd_{};
  int notification_write_fd_{};
};

// Async task manager implementation
class AsyncManager::AsyncTaskManager {
 public:
  AsyncUserId GetNextUserId() { return lastUserId_++; }

  AsyncTaskId ExecAsync(AsyncUserId user_id, std::chrono::milliseconds delay,
                        const TaskCallback &callback) {
    return scheduleTask(std::make_shared<Task>(
        std::chrono::steady_clock::now() + delay, callback, user_id));
  }

  AsyncTaskId ExecAsyncPeriodically(AsyncUserId user_id,
                                    std::chrono::milliseconds delay,
                                    std::chrono::milliseconds period,
                                    const TaskCallback &callback) {
    return scheduleTask(std::make_shared<Task>(
        std::chrono::steady_clock::now() + delay, period, callback, user_id));
  }

  bool CancelAsyncTask(AsyncTaskId async_task_id) {
    // remove task from queue (and task id association) while holding lock
    std::unique_lock<std::mutex> guard(internal_mutex_);
    return cancel_task_with_lock_held(async_task_id);
  }

  bool CancelAsyncTasksFromUser(AsyncUserId user_id) {
    // remove tasks from queue (and task id association) while holding lock
    std::unique_lock<std::mutex> guard(internal_mutex_);
    if (tasks_by_user_id_.count(user_id) == 0) {
      return false;
    }
    for (auto task : tasks_by_user_id_[user_id]) {
      cancel_task_with_lock_held(task);
    }
    tasks_by_user_id_.erase(user_id);
    return true;
  }

  void Synchronize(const CriticalCallback &critical) {
    std::unique_lock<std::mutex> guard(synchronization_mutex_);
    critical();
  }

  AsyncTaskManager() = default;
  AsyncTaskManager(const AsyncTaskManager &) = delete;
  AsyncTaskManager &operator=(const AsyncTaskManager &) = delete;

  ~AsyncTaskManager() = default;

  int stopThread() {
    {
      std::unique_lock<std::mutex> guard(internal_mutex_);
      tasks_by_id_.clear();
      task_queue_.clear();
      if (!running_) {
        return 0;
      }
      running_ = false;
      // notify the thread
      internal_cond_var_.notify_one();
    }  // release the lock before joining a thread that is likely waiting for it
    if (std::this_thread::get_id() != thread_.get_id()) {
      thread_.join();
    } else {
      dwarning("%s: Starting thread stop from inside the task thread itself",
               __func__);
    }
    return 0;
  }

 private:
  // Holds the data for each task
  class Task {
   public:
    Task(std::chrono::steady_clock::time_point time,
         std::chrono::milliseconds period, const TaskCallback &callback,
         AsyncUserId user)
        : time(time),
          periodic(true),
          period(period),
          callback(callback),
          task_id(kInvalidTaskId),
          user_id(user) {}
    Task(std::chrono::steady_clock::time_point time,
         const TaskCallback &callback, AsyncUserId user)
        : time(time),
          periodic(false),
          callback(callback),
          task_id(kInvalidTaskId),
          user_id(user) {}

    // Operators needed to be in a collection
    bool operator<(const Task &another) const {
      return std::make_pair(time, task_id) <
             std::make_pair(another.time, another.task_id);
    }

    bool isPeriodic() const { return periodic; }

    // These fields should no longer be public if the class ever becomes
    // public or gets more complex
    std::chrono::steady_clock::time_point time;
    bool periodic;
    std::chrono::milliseconds period{};
    std::mutex in_callback;  // Taken when the callback is active
    TaskCallback callback;
    AsyncTaskId task_id;
    AsyncUserId user_id;
  };

  // A comparator class to put shared pointers to tasks in an ordered set
  struct task_p_comparator {
    bool operator()(const std::shared_ptr<Task> &t1,
                    const std::shared_ptr<Task> &t2) const {
      return *t1 < *t2;
    }
  };

  bool cancel_task_with_lock_held(AsyncTaskId async_task_id) {
    if (tasks_by_id_.count(async_task_id) == 0) {
      return false;
    }

    // Now make sure we are not running this task.
    // 2 cases
    // - This is called from thread_, this means a scheduled task is actually
    //   unregistering.
    // - Another thread is calling us, let's make sure the task is not active.
    if (thread_.get_id() != std::this_thread::get_id()) {
      auto task = tasks_by_id_[async_task_id];
      const std::lock_guard<std::mutex> lock(task->in_callback);
      task_queue_.erase(task);
      tasks_by_id_.erase(async_task_id);
    } else {
      task_queue_.erase(tasks_by_id_[async_task_id]);
      tasks_by_id_.erase(async_task_id);
    }

    return true;
  }

  AsyncTaskId scheduleTask(const std::shared_ptr<Task> &task) {
    {
      std::unique_lock<std::mutex> guard(internal_mutex_);
      // no more room for new tasks, we need a larger type for IDs
      if (tasks_by_id_.size() == kMaxTaskId)  // TODO potentially type unsafe
        return kInvalidTaskId;
      do {
        lastTaskId_ = NextAsyncTaskId(lastTaskId_);
      } while (isTaskIdInUse(lastTaskId_));
      task->task_id = lastTaskId_;
      // add task to the queue and map
      tasks_by_id_[lastTaskId_] = task;
      tasks_by_user_id_[task->user_id].insert(task->task_id);
      task_queue_.insert(task);
    }
    // start thread if necessary
    int started = tryStartThread();
    if (started != 0) {
      derror("%s: Unable to start thread", __func__);
      return kInvalidTaskId;
    }
    // notify the thread so that it knows of the new task
    internal_cond_var_.notify_one();
    // return task id
    return task->task_id;
  }

  bool isTaskIdInUse(const AsyncTaskId &task_id) const {
    return tasks_by_id_.count(task_id) != 0;
  }

  int tryStartThread() {
    // need the lock because of the running flag and the cond var
    std::unique_lock<std::mutex> guard(internal_mutex_);
    // check that the thread is not yet running
    if (running_) {
      return 0;
    }
    // start the thread
    running_ = true;
    thread_ = std::thread([this]() { ThreadRoutine(); });
    if (!thread_.joinable()) {
      derror("%s: Unable to start task thread", __func__);
      return -1;
    }
    return 0;
  }

  void ThreadRoutine() {
    while (running_) {
      TaskCallback callback;
      std::shared_ptr<Task> task_p;
      bool run_it = false;
      {
        std::unique_lock<std::mutex> guard(internal_mutex_);
        if (!task_queue_.empty()) {
          task_p = *(task_queue_.begin());
          if (task_p->time < std::chrono::steady_clock::now()) {
            run_it = true;
            callback = task_p->callback;
            task_queue_.erase(task_p);  // need to remove and add again if
                                        // periodic to update order
            if (task_p->isPeriodic()) {
              task_p->time += task_p->period;
              task_queue_.insert(task_p);
            } else {
              tasks_by_user_id_[task_p->user_id].erase(task_p->task_id);
              tasks_by_id_.erase(task_p->task_id);
            }
          }
        }
      }
      if (run_it) {
        const std::lock_guard<std::mutex> lock(task_p->in_callback);
        Synchronize(callback);
      }
      {
        std::unique_lock<std::mutex> guard(internal_mutex_);
        // check for termination right before waiting
        if (!running_) break;
        // wait until time for the next task (if any)
        if (task_queue_.size() > 0) {
          // Make a copy of the time_point because wait_until takes a reference
          // to it and may read it after waiting, by which time the task may
          // have been freed (e.g. via CancelAsyncTask).
          std::chrono::steady_clock::time_point time =
              (*task_queue_.begin())->time;
          internal_cond_var_.wait_until(guard, time);
        } else {
          internal_cond_var_.wait(guard);
        }
      }
    }
  }

  bool running_ = false;
  std::thread thread_;
  std::mutex internal_mutex_;
  std::mutex synchronization_mutex_;
  std::condition_variable internal_cond_var_;

  AsyncTaskId lastTaskId_ = kInvalidTaskId;
  AsyncUserId lastUserId_{1};
  std::map<AsyncTaskId, std::shared_ptr<Task>> tasks_by_id_;
  std::map<AsyncUserId, std::set<AsyncTaskId>> tasks_by_user_id_;
  std::set<std::shared_ptr<Task>, task_p_comparator> task_queue_;
};

// Async Manager Implementation:
AsyncManager::AsyncManager()
    : fdWatcher_p_(new AsyncFdWatcher()),
      taskManager_p_(new AsyncTaskManager()) {}

AsyncManager::~AsyncManager() {
  // Make sure the threads are stopped before destroying the object.
  // The threads need to be stopped here and not in each internal class'
  // destructor because unique_ptr's reset() first assigns nullptr to the
  // pointer and only then calls the destructor, so any callback running
  // on these threads would dereference a null pointer if it called a member
  // function of this class.
  fdWatcher_p_->stopThread();
  taskManager_p_->stopThread();
}

int AsyncManager::WatchFdForNonBlockingReads(
    int file_descriptor, const ReadCallback &on_read_fd_ready_callback) {
  return fdWatcher_p_->WatchFdForNonBlockingReads(file_descriptor,
                                                  on_read_fd_ready_callback);
}

void AsyncManager::StopWatchingFileDescriptor(int file_descriptor) {
  fdWatcher_p_->StopWatchingFileDescriptor(file_descriptor);
}

AsyncUserId AsyncManager::GetNextUserId() {
  return taskManager_p_->GetNextUserId();
}

AsyncTaskId AsyncManager::ExecAsync(AsyncUserId user_id,
                                    std::chrono::milliseconds delay,
                                    const TaskCallback &callback) {
  return taskManager_p_->ExecAsync(user_id, delay, callback);
}

AsyncTaskId AsyncManager::ExecAsyncPeriodically(
    AsyncUserId user_id, std::chrono::milliseconds delay,
    std::chrono::milliseconds period, const TaskCallback &callback) {
  return taskManager_p_->ExecAsyncPeriodically(user_id, delay, period,
                                               callback);
}

bool AsyncManager::CancelAsyncTask(AsyncTaskId async_task_id) {
  return taskManager_p_->CancelAsyncTask(async_task_id);
}

bool AsyncManager::CancelAsyncTasksFromUser(rootcanal::AsyncUserId user_id) {
  return taskManager_p_->CancelAsyncTasksFromUser(user_id);
}

void AsyncManager::Synchronize(const CriticalCallback &critical) {
  taskManager_p_->Synchronize(critical);
}
}  // namespace rootcanal