1 // SPDX-License-Identifier: Apache-2.0
2 // ----------------------------------------------------------------------------
3 // Copyright 2011-2022 Arm Limited
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
6 // use this file except in compliance with the License. You may obtain a copy
7 // of the License at:
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 // License for the specific language governing permissions and limitations
15 // under the License.
16 // ----------------------------------------------------------------------------
17 
18 /**
19  * @brief Functions and data declarations for the outer context.
20  *
21  * The outer context includes thread-pool management, which is slower to
22  * compile due to increased use of C++ stdlib. The inner context used in the
23  * majority of the codec library does not include this.
24  */
25 
26 #ifndef ASTCENC_INTERNAL_ENTRY_INCLUDED
27 #define ASTCENC_INTERNAL_ENTRY_INCLUDED
28 
29 #include <atomic>
30 #include <condition_variable>
31 #include <functional>
32 #include <mutex>
33 
34 #include "astcenc_internal.h"
35 
36 /* ============================================================================
37   Parallel execution control
38 ============================================================================ */
39 
40 /**
41  * @brief A simple counter-based manager for parallel task execution.
42  *
43  * The task processing execution consists of:
44  *
45  *     * A single-threaded init stage.
46  *     * A multi-threaded processing stage.
47  *     * A condition variable so threads can wait for processing completion.
48  *
 * The init stage will be executed by the first thread to arrive in the critical section; there is
 * no main thread in the thread pool.
51  *
 * The processing stage uses dynamic dispatch to assign task tickets to threads on an on-demand
 * basis. Threads may each therefore execute different numbers of tasks, depending on their
 * processing complexity. The task queue and the task tickets are just counters; the caller must map
 * these integers to an actual processing partition in a specific problem domain.
56  *
57  * The exit wait condition is needed to ensure processing has finished before a worker thread can
58  * progress to the next stage of the pipeline. Specifically a worker may exit the processing stage
59  * because there are no new tasks to assign to it while other worker threads are still processing.
 * Calling @c wait() will ensure that all other workers have finished before the thread can proceed.
61  *
62  * The basic usage model:
63  *
64  *     // --------- From single-threaded code ---------
65  *
66  *     // Reset the tracker state
67  *     manager->reset()
68  *
69  *     // --------- From multi-threaded code ---------
70  *
71  *     // Run the stage init; only first thread actually runs the lambda
72  *     manager->init(<lambda>)
73  *
74  *     do
75  *     {
76  *         // Request a task assignment
77  *         uint task_count;
 *         uint base_index = manager->get_task_assignment(<granule>, task_count);
79  *
80  *         // Process any tasks we were given (task_count <= granule size)
81  *         if (task_count)
82  *         {
83  *             // Run the user task processing code for N tasks here
84  *             ...
85  *
86  *             // Flag these tasks as complete
 *             manager->complete_task_assignment(task_count);
88  *         }
89  *     } while (task_count);
90  *
91  *     // Wait for all threads to complete tasks before progressing
92  *     manager->wait()
93  *
 *     // Run the stage term; only first thread actually runs the lambda
95  *     manager->term(<lambda>)
96  */
class ParallelManager
{
private:
	/** @brief Mutex protecting the counters, flags, and completion signal. */
	std::mutex m_lock;

	/** @brief Set once the first thread has executed the stage init() step. */
	bool m_init_done;

	/** @brief Set once the first thread has executed the stage term() step. */
	bool m_term_done;

	/** @brief Signaled when the final task of the stage is completed. */
	std::condition_variable m_complete;

	/** @brief Count of task tickets handed out so far, not necessarily finished. */
	std::atomic<unsigned int> m_start_count;

	/** @brief Count of tasks reported as finished; guarded by @c m_lock. */
	unsigned int m_done_count;

	/** @brief Total number of tasks this stage must process; set during init. */
	unsigned int m_task_count;

public:
	/** @brief Create a new ParallelManager, ready for a processing batch. */
	ParallelManager()
	{
		reset();
	}

	/**
	 * @brief Reset the tracker for a new processing batch.
	 *
	 * This must be called from single-threaded code before starting the multi-threaded processing
	 * operations.
	 */
	void reset()
	{
		m_task_count = 0;
		m_done_count = 0;
		m_start_count = 0;
		m_term_done = false;
		m_init_done = false;
	}

	/**
	 * @brief Trigger the pipeline stage init step.
	 *
	 * This can be called from multi-threaded code. The first thread to hit this will process the
	 * initialization. Other threads will block and wait for it to complete.
	 *
	 * @param init_func   Callable which executes the stage initialization. It must return the
	 *                    total number of tasks in the stage.
	 */
	void init(std::function<unsigned int(void)> init_func)
	{
		std::lock_guard<std::mutex> guard(m_lock);
		if (m_init_done)
		{
			return;
		}

		m_task_count = init_func();
		m_init_done = true;
	}

	/**
	 * @brief Trigger the pipeline stage init step with a known task count.
	 *
	 * This can be called from multi-threaded code. The first thread to hit this will store the
	 * task count; later arrivals see init already done and return immediately.
	 *
	 * @param task_count   Total number of tasks needing processing.
	 */
	void init(unsigned int task_count)
	{
		std::lock_guard<std::mutex> guard(m_lock);
		if (m_init_done)
		{
			return;
		}

		m_task_count = task_count;
		m_init_done = true;
	}

	/**
	 * @brief Request a task assignment.
	 *
	 * Assign up to @c granule tasks to the caller for processing.
	 *
	 * @param      granule   Maximum number of tasks that can be assigned.
	 * @param[out] count     Actual number of tasks assigned, or zero if no tasks were assigned.
	 *
	 * @return Index of the first assigned task; the assignment covers the contiguous range
	 *         [return value, return value + count).
	 */
	unsigned int get_task_assignment(unsigned int granule, unsigned int& count)
	{
		// Atomically claim a batch of tickets; no mutex needed for dispatch
		unsigned int first = m_start_count.fetch_add(granule, std::memory_order_relaxed);
		if (first >= m_task_count)
		{
			count = 0;
			return 0;
		}

		// Clamp the final batch so assignments never run past the task count
		unsigned int remaining = m_task_count - first;
		count = (remaining < granule) ? remaining : granule;
		return first;
	}

	/**
	 * @brief Complete a task assignment.
	 *
	 * Mark @c count tasks as complete. This will notify all threads blocked on @c wait() if this
	 * completes the processing of the stage.
	 *
	 * @param count   The number of completed tasks.
	 */
	void complete_task_assignment(unsigned int count)
	{
		// m_done_count must be updated under the mutex; an atomic alone would race against the
		// predicate evaluation performed inside wait() on other threads
		std::unique_lock<std::mutex> guard(m_lock);
		m_done_count += count;
		bool all_done = m_done_count == m_task_count;
		guard.unlock();

		if (all_done)
		{
			m_complete.notify_all();
		}
	}

	/**
	 * @brief Wait for stage processing to complete.
	 */
	void wait()
	{
		std::unique_lock<std::mutex> guard(m_lock);
		// Explicit loop guards against spurious wakeups
		while (m_done_count != m_task_count)
		{
			m_complete.wait(guard);
		}
	}

	/**
	 * @brief Trigger the pipeline stage term step.
	 *
	 * This can be called from multi-threaded code. The first thread to hit this will process the
	 * work pool termination. Caller must have called @c wait() prior to calling this function to
	 * ensure that processing is complete.
	 *
	 * @param term_func   Callable which executes the stage termination.
	 */
	void term(std::function<void(void)> term_func)
	{
		std::lock_guard<std::mutex> guard(m_lock);
		if (m_term_done)
		{
			return;
		}

		term_func();
		m_term_done = true;
	}
};
252 
253 /**
254  * @brief The astcenc compression context.
255  */
256 struct astcenc_context
257 {
258 	/** @brief The context internal state. */
259 	astcenc_contexti context;
260 
261 #if !defined(ASTCENC_DECOMPRESS_ONLY)
262 	/** @brief The parallel manager for averages computation. */
263 	ParallelManager manage_avg;
264 
265 	/** @brief The parallel manager for compression. */
266 	ParallelManager manage_compress;
267 #endif
268 
269 	/** @brief The parallel manager for decompression. */
270 	ParallelManager manage_decompress;
271 };
272 
273 #endif
274