1 // Copyright (C) 2023 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <ditto/multiprocessing.h>
16
17 #include <ditto/logger.h>
18
19 #include <sys/mman.h>
20 #include <sys/prctl.h>
21 #include <sys/types.h>
22 #include <unistd.h>
23
24 namespace dittosuite {
25
Multiprocessing(const Instruction::Params & params,std::vector<std::unique_ptr<Instruction>> instructions,std::vector<MultithreadingParams> thread_params)26 Multiprocessing::Multiprocessing(const Instruction::Params& params,
27 std::vector<std::unique_ptr<Instruction>> instructions,
28 std::vector<MultithreadingParams> thread_params)
29 : Instruction(kName, params),
30 instructions_(std::move(instructions)),
31 thread_params_(std::move(thread_params)) {}
32
SetUpSingle()33 void Multiprocessing::SetUpSingle() {
34 pthread_barrierattr_t barrier_attr;
35 pthread_barrierattr_init(&barrier_attr);
36 pthread_barrierattr_setpshared(&barrier_attr, PTHREAD_PROCESS_SHARED);
37 barrier_execution_ = static_cast<pthread_barrier_t*>(mmap(nullptr, sizeof(*barrier_execution_),
38 PROT_READ | PROT_WRITE,
39 MAP_SHARED | MAP_ANONYMOUS, -1, 0));
40 if (barrier_execution_ == MAP_FAILED) {
41 LOGF("mmap() failed");
42 }
43 pthread_barrier_init(barrier_execution_, &barrier_attr, instructions_.size());
44
45 barrier_execution_end_ = static_cast<pthread_barrier_t*>(
46 mmap(nullptr, sizeof(*barrier_execution_end_), PROT_READ | PROT_WRITE,
47 MAP_SHARED | MAP_ANONYMOUS, -1, 0));
48 if (barrier_execution_end_ == MAP_FAILED) {
49 LOGF("mmap() failed");
50 }
51 pthread_barrier_init(barrier_execution_end_, &barrier_attr, instructions_.size());
52
53 initialization_mutex_ = static_cast<pthread_mutex_t*>(
54 mmap(nullptr, sizeof(*initialization_mutex_), PROT_READ | PROT_WRITE,
55 MAP_SHARED | MAP_ANONYMOUS, -1, 0));
56 if (initialization_mutex_ == MAP_FAILED) {
57 LOGF("mmap() failed");
58 }
59 pthread_mutexattr_t mutex_attr;
60 pthread_mutexattr_init(&mutex_attr);
61 pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED);
62 pthread_mutex_init(initialization_mutex_, &mutex_attr);
63
64 pipe_fds_.resize(instructions_.size());
65 for (unsigned int i = 0; i < instructions_.size(); i++) {
66 if (pipe(pipe_fds_[i].data()) == -1) {
67 LOGF("Pipe Failed");
68 } else {
69 LOGD("Created pipe for: " + std::to_string(i));
70 }
71 }
72
73 instruction_id_ = 0;
74 for (unsigned int i = 0; i < instructions_.size(); i++) {
75 // LOGD("Not the last... Can fork");
76 pid_t child_pid_ = fork();
77 if (child_pid_) {
78 // I'm the manager, will continue spawning processes
79 is_manager_ = true;
80 continue;
81 }
82 is_manager_ = false;
83 instruction_id_ = i;
84 LOGD("Child process created: " + std::to_string(instruction_id_) +
85 " pid: " + std::to_string(getpid()));
86
87 break;
88 }
89
90 // Close the write pipes that do not belong to this process
91 for (unsigned int p = 0; p < instructions_.size(); p++) {
92 if (is_manager_ || p != instruction_id_) {
93 close(pipe_fds_[p][1]);
94 }
95 }
96
97 if (!is_manager_) {
98 pthread_mutex_lock(initialization_mutex_);
99 LOGD("Trying to set the name for instruction: " + std::to_string(instruction_id_) +
100 "; process: " + std::to_string(getpid()) +
101 "; new name: " + thread_params_[instruction_id_].name_);
102 setproctitle(argc_, argv_, (thread_params_[instruction_id_].name_.c_str()));
103
104 if (thread_params_[instruction_id_].sched_attr_.IsSet()) {
105 thread_params_[instruction_id_].sched_attr_.Set();
106 }
107 if (thread_params_[instruction_id_].sched_affinity_.IsSet()) {
108 thread_params_[instruction_id_].sched_affinity_.Set();
109 }
110
111 LOGD("Process initializing instruction: " + std::to_string(instruction_id_) +
112 " pid: " + std::to_string(getpid()));
113 instructions_[instruction_id_]->SetUp();
114 LOGD("Process initializing done, unlocking: " + std::to_string(instruction_id_) +
115 " pid: " + std::to_string(getpid()));
116 pthread_mutex_unlock(initialization_mutex_);
117 }
118
119 Instruction::SetUpSingle();
120 }
121
RunSingle()122 void Multiprocessing::RunSingle() {
123 if (!is_manager_) {
124 LOGD("Waiting for the barrier... " + std::to_string(getpid()));
125 pthread_barrier_wait(barrier_execution_);
126 LOGD("Barrier passed! Executing: " + std::to_string(getpid()));
127 instructions_[instruction_id_]->Run();
128 LOGD("Waiting for all the processes to finish... " + std::to_string(getpid()));
129 pthread_barrier_wait(barrier_execution_end_);
130 LOGD("All the processes finished... " + std::to_string(getpid()));
131 }
132 }
133
TearDownSingle(bool is_last)134 void Multiprocessing::TearDownSingle(bool is_last) {
135 Instruction::TearDownSingle(is_last);
136
137 if (!is_manager_) {
138 instructions_[instruction_id_]->TearDown();
139 }
140 }
141
CollectResults(const std::string & prefix)142 std::unique_ptr<Result> Multiprocessing::CollectResults(const std::string& prefix) {
143 // TODO this was copied from Multithreading and should be adapted with some
144 // shared memory solution.
145 auto result = std::make_unique<Result>(prefix + name_, repeat_);
146 dittosuiteproto::Result result_pb;
147
148 LOGD("Collecting results... " + std::to_string(getpid()));
149
150 result->AddMeasurement("duration", TimespecToDoubleNanos(time_sampler_.GetSamples()));
151
152 if (is_manager_) {
153 LOGD("Parent reading... " + std::to_string(getpid()));
154 for (unsigned int i = 0; i < instructions_.size(); i++) {
155 LOGD("Parent reading from pipe: " + std::to_string(i));
156 std::array<char, 100> buffer;
157 dittosuiteproto::Result sub_result_pb;
158 std::string serialized;
159
160 while (true) {
161 ssize_t chars_read = read(pipe_fds_[i][0], buffer.data(), buffer.size());
162 if (chars_read == -1) {
163 PLOGF("Error reading from pipe");
164 } else if (chars_read == 0) {
165 // Finished reading from pipe
166 LOGD("Finished reading, time to decode the PB");
167 if (!sub_result_pb.ParseFromString(serialized)) {
168 LOGF("Failed decoding PB from pipe");
169 }
170 //PrintPb(sub_result_pb);
171 result->AddSubResult(Result::FromPb(sub_result_pb));
172 break;
173 }
174 LOGD("Parent received: " + std::to_string(chars_read) + " bytes: \"" +
175 std::string(buffer.data()) + "\"");
176 serialized.append(buffer.data(), chars_read);
177 LOGD("PB so far: \"" + serialized + "\"");
178 }
179 }
180 } else {
181 LOGD("Child writing... " + std::to_string(getpid()));
182 std::string child_name = thread_params_[instruction_id_].name_;
183 result->AddSubResult(instructions_[instruction_id_]->CollectResults(child_name + "/"));
184
185 result_pb = result->ToPb();
186
187 std::string serialized;
188 // It is safe to pick result_pb.sub_result()[0] because it will be the "multiprocessing"
189 // instruction.
190 if (!result_pb.sub_result()[0].SerializeToString(&serialized)) {
191 LOGF("Failed decoding PB from pipe");
192 }
193 ssize_t chars_sent = 0;
194 while (chars_sent < static_cast<ssize_t>(serialized.size())) {
195 ssize_t chars_written = write(pipe_fds_[instruction_id_][1], &serialized.data()[chars_sent],
196 serialized.size() - chars_sent);
197 if (chars_written == -1) {
198 PLOGF("Error writing to pipe");
199 }
200 chars_sent += chars_written;
201 }
202 LOGD("Child closing pipe: " + std::to_string(getpid()));
203 close(pipe_fds_[instruction_id_][1]);
204 }
205
206 if (!is_manager_) {
207 // Stop every child
208 LOGD("Child stopping: " + std::to_string(getpid()));
209 exit(0);
210 } else {
211 pthread_mutex_destroy(initialization_mutex_);
212 pthread_barrier_destroy(barrier_execution_);
213 pthread_barrier_destroy(barrier_execution_end_);
214 LOGD("Parent finished: " + std::to_string(getpid()));
215 }
216 return result;
217 }
218
219 } // namespace dittosuite
220