1 // Copyright (C) 2023 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <ditto/multiprocessing.h>
16 
17 #include <ditto/logger.h>
18 
19 #include <sys/mman.h>
20 #include <sys/prctl.h>
21 #include <sys/types.h>
22 #include <unistd.h>
23 
24 namespace dittosuite {
25 
Multiprocessing(const Instruction::Params & params,std::vector<std::unique_ptr<Instruction>> instructions,std::vector<MultithreadingParams> thread_params)26 Multiprocessing::Multiprocessing(const Instruction::Params& params,
27                                  std::vector<std::unique_ptr<Instruction>> instructions,
28                                  std::vector<MultithreadingParams> thread_params)
29     : Instruction(kName, params),
30       instructions_(std::move(instructions)),
31       thread_params_(std::move(thread_params)) {}
32 
SetUpSingle()33 void Multiprocessing::SetUpSingle() {
34   pthread_barrierattr_t barrier_attr;
35   pthread_barrierattr_init(&barrier_attr);
36   pthread_barrierattr_setpshared(&barrier_attr, PTHREAD_PROCESS_SHARED);
37   barrier_execution_ = static_cast<pthread_barrier_t*>(mmap(nullptr, sizeof(*barrier_execution_),
38                                                             PROT_READ | PROT_WRITE,
39                                                             MAP_SHARED | MAP_ANONYMOUS, -1, 0));
40   if (barrier_execution_ == MAP_FAILED) {
41     LOGF("mmap() failed");
42   }
43   pthread_barrier_init(barrier_execution_, &barrier_attr, instructions_.size());
44 
45   barrier_execution_end_ = static_cast<pthread_barrier_t*>(
46       mmap(nullptr, sizeof(*barrier_execution_end_), PROT_READ | PROT_WRITE,
47            MAP_SHARED | MAP_ANONYMOUS, -1, 0));
48   if (barrier_execution_end_ == MAP_FAILED) {
49     LOGF("mmap() failed");
50   }
51   pthread_barrier_init(barrier_execution_end_, &barrier_attr, instructions_.size());
52 
53   initialization_mutex_ = static_cast<pthread_mutex_t*>(
54       mmap(nullptr, sizeof(*initialization_mutex_), PROT_READ | PROT_WRITE,
55            MAP_SHARED | MAP_ANONYMOUS, -1, 0));
56   if (initialization_mutex_ == MAP_FAILED) {
57     LOGF("mmap() failed");
58   }
59   pthread_mutexattr_t mutex_attr;
60   pthread_mutexattr_init(&mutex_attr);
61   pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED);
62   pthread_mutex_init(initialization_mutex_, &mutex_attr);
63 
64   pipe_fds_.resize(instructions_.size());
65   for (unsigned int i = 0; i < instructions_.size(); i++) {
66     if (pipe(pipe_fds_[i].data()) == -1) {
67       LOGF("Pipe Failed");
68     } else {
69       LOGD("Created pipe for: " + std::to_string(i));
70     }
71   }
72 
73   instruction_id_ = 0;
74   for (unsigned int i = 0; i < instructions_.size(); i++) {
75     // LOGD("Not the last... Can fork");
76     pid_t child_pid_ = fork();
77     if (child_pid_) {
78       // I'm the manager, will continue spawning processes
79       is_manager_ = true;
80       continue;
81     }
82     is_manager_ = false;
83     instruction_id_ = i;
84     LOGD("Child process created: " + std::to_string(instruction_id_) +
85          " pid: " + std::to_string(getpid()));
86 
87     break;
88   }
89 
90   // Close the write pipes that do not belong to this process
91   for (unsigned int p = 0; p < instructions_.size(); p++) {
92     if (is_manager_ || p != instruction_id_) {
93       close(pipe_fds_[p][1]);
94     }
95   }
96 
97   if (!is_manager_) {
98     pthread_mutex_lock(initialization_mutex_);
99     LOGD("Trying to set the name for instruction: " + std::to_string(instruction_id_) +
100          "; process: " + std::to_string(getpid()) +
101          "; new name: " + thread_params_[instruction_id_].name_);
102     setproctitle(argc_, argv_, (thread_params_[instruction_id_].name_.c_str()));
103 
104     if (thread_params_[instruction_id_].sched_attr_.IsSet()) {
105       thread_params_[instruction_id_].sched_attr_.Set();
106     }
107     if (thread_params_[instruction_id_].sched_affinity_.IsSet()) {
108       thread_params_[instruction_id_].sched_affinity_.Set();
109     }
110 
111     LOGD("Process initializing instruction: " + std::to_string(instruction_id_) +
112          " pid: " + std::to_string(getpid()));
113     instructions_[instruction_id_]->SetUp();
114     LOGD("Process initializing done, unlocking: " + std::to_string(instruction_id_) +
115          " pid: " + std::to_string(getpid()));
116     pthread_mutex_unlock(initialization_mutex_);
117   }
118 
119   Instruction::SetUpSingle();
120 }
121 
RunSingle()122 void Multiprocessing::RunSingle() {
123   if (!is_manager_) {
124     LOGD("Waiting for the barrier... " + std::to_string(getpid()));
125     pthread_barrier_wait(barrier_execution_);
126     LOGD("Barrier passed! Executing: " + std::to_string(getpid()));
127     instructions_[instruction_id_]->Run();
128     LOGD("Waiting for all the processes to finish... " + std::to_string(getpid()));
129     pthread_barrier_wait(barrier_execution_end_);
130     LOGD("All the processes finished... " + std::to_string(getpid()));
131   }
132 }
133 
TearDownSingle(bool is_last)134 void Multiprocessing::TearDownSingle(bool is_last) {
135   Instruction::TearDownSingle(is_last);
136 
137   if (!is_manager_) {
138     instructions_[instruction_id_]->TearDown();
139   }
140 }
141 
CollectResults(const std::string & prefix)142 std::unique_ptr<Result> Multiprocessing::CollectResults(const std::string& prefix) {
143   // TODO this was copied from Multithreading and should be adapted with some
144   // shared memory solution.
145   auto result = std::make_unique<Result>(prefix + name_, repeat_);
146   dittosuiteproto::Result result_pb;
147 
148   LOGD("Collecting results... " + std::to_string(getpid()));
149 
150   result->AddMeasurement("duration", TimespecToDoubleNanos(time_sampler_.GetSamples()));
151 
152   if (is_manager_) {
153     LOGD("Parent reading... " + std::to_string(getpid()));
154     for (unsigned int i = 0; i < instructions_.size(); i++) {
155       LOGD("Parent reading from pipe: " + std::to_string(i));
156       std::array<char, 100> buffer;
157       dittosuiteproto::Result sub_result_pb;
158       std::string serialized;
159 
160       while (true) {
161         ssize_t chars_read = read(pipe_fds_[i][0], buffer.data(), buffer.size());
162         if (chars_read == -1) {
163           PLOGF("Error reading from pipe");
164         } else if (chars_read == 0) {
165           // Finished reading from pipe
166           LOGD("Finished reading, time to decode the PB");
167           if (!sub_result_pb.ParseFromString(serialized)) {
168             LOGF("Failed decoding PB from pipe");
169           }
170           //PrintPb(sub_result_pb);
171           result->AddSubResult(Result::FromPb(sub_result_pb));
172           break;
173         }
174         LOGD("Parent received: " + std::to_string(chars_read) + " bytes: \"" +
175              std::string(buffer.data()) + "\"");
176         serialized.append(buffer.data(), chars_read);
177         LOGD("PB so far: \"" + serialized + "\"");
178       }
179     }
180   } else {
181     LOGD("Child writing... " + std::to_string(getpid()));
182     std::string child_name = thread_params_[instruction_id_].name_;
183     result->AddSubResult(instructions_[instruction_id_]->CollectResults(child_name + "/"));
184 
185     result_pb = result->ToPb();
186 
187     std::string serialized;
188     // It is safe to pick result_pb.sub_result()[0] because it will be the "multiprocessing"
189     // instruction.
190     if (!result_pb.sub_result()[0].SerializeToString(&serialized)) {
191       LOGF("Failed decoding PB from pipe");
192     }
193     ssize_t chars_sent = 0;
194     while (chars_sent < static_cast<ssize_t>(serialized.size())) {
195       ssize_t chars_written = write(pipe_fds_[instruction_id_][1], &serialized.data()[chars_sent],
196                                     serialized.size() - chars_sent);
197       if (chars_written == -1) {
198         PLOGF("Error writing to pipe");
199       }
200       chars_sent += chars_written;
201     }
202     LOGD("Child closing pipe: " + std::to_string(getpid()));
203     close(pipe_fds_[instruction_id_][1]);
204   }
205 
206   if (!is_manager_) {
207     // Stop every child
208     LOGD("Child stopping: " + std::to_string(getpid()));
209     exit(0);
210   } else {
211     pthread_mutex_destroy(initialization_mutex_);
212     pthread_barrier_destroy(barrier_execution_);
213     pthread_barrier_destroy(barrier_execution_end_);
214     LOGD("Parent finished: " + std::to_string(getpid()));
215   }
216   return result;
217 }
218 
219 }  // namespace dittosuite
220