1 /*
2 * Copyright 2021 Google LLC
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
#include "compactor_stack.h"

#include <algorithm>
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <random>
#include <type_traits>
#include <vector>

#include "gmock/gmock.h"
#include "random_generator.h"
25
26 namespace dist_proc {
27 namespace aggregation {
28 namespace internal {
29
30 namespace {
31
32 using ::testing::AnyOf;
33 using ::testing::Contains;
34
// One parameter combination for the AddsToCompactorsTest suite.
struct CompactorsTestParams {
    // Total number (or total weight) of items fed to the sampler.
    int num_items;
    // The sampler's capacity is doubled until it reaches at least this value.
    int capacity;
};

// Builds the cross product of item counts and capacities used to instantiate
// AddsToCompactorsTest. Item counts start at 1 and advance by
// ceil(ln(1 + count)) while they stay below 1000; for every count the
// capacities are the powers of two from 2 up to 1024.
std::vector<CompactorsTestParams> GenCompactorsTestParams() {
    std::vector<CompactorsTestParams> all_params;
    int item_count = 1;
    while (item_count < 1000) {
        for (int cap = 2; cap < 1030; cap <<= 1) {
            all_params.push_back(CompactorsTestParams{item_count, cap});
        }
        // The step is computed in double precision and truncated back to int.
        item_count = static_cast<int>(item_count + std::ceil(std::log(1 + item_count)));
    }
    return all_params;
}
49
50 class AddsToCompactorsTest : public ::testing::TestWithParam<CompactorsTestParams> {
51 protected:
AddsToCompactorsTest()52 AddsToCompactorsTest() : random_() {
53 }
54 MTRandomGenerator random_;
55 };
56
// Adds num_items values one at a time through a KllSampler and checks how many
// items reached the compactor stack and what weight is left in the sampler.
TEST_P(AddsToCompactorsTest, AddsToCompactorsTestWeightOne) {
    // The inserted values come from a randomly seeded engine: only
    // num_stored_items, capacity and sampler weight are checked, and those
    // should not depend on the seed.
    std::random_device seed_source;
    std::mt19937 engine(seed_source());
    std::uniform_int_distribution<uint64_t> any_value(0, std::numeric_limits<uint64_t>::max());

    const CompactorsTestParams params = GetParam();
    CompactorStack compactor_stack(1000, 100000, &random_);
    KllSampler sampler(&compactor_stack);
    // Grow the sampler until it has at least the requested capacity.
    while (sampler.capacity() < params.capacity) {
        sampler.DoubleCapacity();
    }

    for (int added = 0; added < params.num_items; ++added) {
        sampler.Add(any_value(engine));
    }

    // One item is expected in the stack per full sampler capacity; the
    // remainder is expected to still sit in the sampler as accumulated weight.
    EXPECT_EQ(compactor_stack.num_stored_items(), params.num_items / sampler.capacity());
    const auto leftover_weight = params.num_items % sampler.capacity();
    if (!sampler.sampled_item_and_weight().has_value()) {
        EXPECT_EQ(0, leftover_weight);
    } else {
        EXPECT_EQ(sampler.sampled_item_and_weight()->second, leftover_weight);
    }
}
80
// Same expectations as the weight-one test, but the total of num_items is fed
// to the sampler in randomly sized weighted chunks via AddWithWeight.
TEST_P(AddsToCompactorsTest, AddWithWeightToCompactorsTest) {
    // The inserted values come from a randomly seeded engine: only
    // num_stored_items, capacity and sampler weight are checked, and those
    // should not depend on the seed.
    std::random_device seed_source;
    std::mt19937 engine(seed_source());
    std::uniform_int_distribution<uint64_t> any_value(0, std::numeric_limits<uint64_t>::max());

    const CompactorsTestParams params = GetParam();
    CompactorStack compactor_stack(1000, 100000, &random_);
    KllSampler sampler(&compactor_stack);
    // Grow the sampler until it has at least the requested capacity.
    while (sampler.capacity() < params.capacity) {
        sampler.DoubleCapacity();
    }

    // Keep adding randomly weighted items until the weights have used up
    // num_items.
    for (int items_left = params.num_items; items_left > 0;) {
        // The +1 avoids adding values with 0 weight.
        int weight = random_.UnbiasedUniform(items_left) + 1;
        sampler.AddWithWeight(any_value(engine), weight);
        items_left -= weight;
    }

    EXPECT_EQ(compactor_stack.num_stored_items(), params.num_items / sampler.capacity());
    const auto leftover_weight = params.num_items % sampler.capacity();
    if (!sampler.sampled_item_and_weight().has_value()) {
        EXPECT_EQ(0, leftover_weight);
    } else {
        EXPECT_EQ(sampler.sampled_item_and_weight()->second, leftover_weight);
    }
}
108
109 INSTANTIATE_TEST_SUITE_P(AddsToCompactorsTestCases, AddsToCompactorsTest,
110 ::testing::ValuesIn(GenCompactorsTestParams()));
111 class KllQuantileUseSamplerTest : public ::testing::Test {
112 protected:
KllQuantileUseSamplerTest()113 KllQuantileUseSamplerTest() {
114 }
~KllQuantileUseSamplerTest()115 ~KllQuantileUseSamplerTest() override {
116 }
117 MTRandomGenerator random_;
118 };
119
// After enough insertions to switch the sampler on, every compactor below the
// lowest active level is expected to report a capacity of zero.
TEST_F(KllQuantileUseSamplerTest, ZeroCapacityAfterReplacedWithSampler) {
    constexpr int kNumInsertions = 200000;

    CompactorStack compactor_stack(10, 10, &random_);
    for (int inserted = 0; inserted < kNumInsertions; ++inserted) {
        compactor_stack.Add(random_.UnbiasedUniform(std::numeric_limits<uint64_t>::max()));
    }
    EXPECT_TRUE(compactor_stack.IsSamplerOn());

    const auto& compactors = compactor_stack.compactors();
    const auto lowest_active = compactor_stack.lowest_active_level();
    for (int level = 0; level < lowest_active; ++level) {
        EXPECT_EQ(compactors[level].capacity(), 0u);
    }
}
131
// num_stored_items() is expected to count the items held by compactors with
// non-zero capacity plus one for the sampled item, when there is one.
TEST_F(KllQuantileUseSamplerTest, NumStoredItemsWithSampledItem) {
    constexpr int kNumInsertions = 2000;

    CompactorStack compactor_stack(10, 10, &random_);
    for (int inserted = 0; inserted < kNumInsertions; ++inserted) {
        compactor_stack.Add(random_.UnbiasedUniform(std::numeric_limits<uint64_t>::max()));
    }
    EXPECT_TRUE(compactor_stack.IsSamplerOn());

    // Only compactors with a positive capacity contribute to the count.
    int num_items_in_compactors = 0;
    for (const auto& compactor : compactor_stack.compactors()) {
        num_items_in_compactors += (compactor.capacity() > 0) ? compactor.size() : 0;
    }

    if (!compactor_stack.sampled_item_and_weight().has_value()) {
        // No sampled item yet: the counts must match exactly, and one more
        // insertion is then expected to raise the stored count by one.
        EXPECT_EQ(compactor_stack.num_stored_items(), num_items_in_compactors);
        compactor_stack.Add(random_.UnbiasedUniform(std::numeric_limits<uint64_t>::max()));
    }
    EXPECT_EQ(compactor_stack.num_stored_items(), num_items_in_compactors + 1);
}
153
// Fills a CompactorStack until its sampler is on, resets it, checks that the
// reset state is empty, and then verifies the stack can be filled again.
TEST_F(KllQuantileUseSamplerTest, ResetWithSampler) {
    constexpr int kNumInsertions = 2000;
    constexpr int kMinStoredItems = 40;

    // The stack gets a generator with a fixed seed, as it is not given that
    // there are 40 items in the compactor stack after 2000 insertions.
    // Note: the inserted values themselves are drawn from the fixture's
    // random_, not from this seeded generator.
    MTRandomGenerator random(10);
    CompactorStack compactor_stack(10, 10, &random);

    // First fill.
    for (int inserted = 0; inserted < kNumInsertions; ++inserted) {
        compactor_stack.Add(random_.UnbiasedUniform(std::numeric_limits<uint64_t>::max()));
    }
    EXPECT_TRUE(compactor_stack.IsSamplerOn());
    EXPECT_GE(compactor_stack.num_stored_items(), kMinStoredItems);
    EXPECT_LE(compactor_stack.num_stored_items(), kNumInsertions);
    EXPECT_GE(compactor_stack.compactors().size(), 2u);

    // Reset is expected to drop everything and leave a single compactor.
    compactor_stack.Reset();
    EXPECT_FALSE(compactor_stack.IsSamplerOn());
    EXPECT_FALSE(compactor_stack.sampled_item_and_weight().has_value());
    EXPECT_EQ(compactor_stack.num_stored_items(), 0);
    EXPECT_EQ(compactor_stack.compactors().size(), 1u);

    // Second fill: the stack must behave like a fresh one.
    for (int inserted = 0; inserted < kNumInsertions; ++inserted) {
        compactor_stack.Add(random_.UnbiasedUniform(std::numeric_limits<uint64_t>::max()));
    }
    EXPECT_TRUE(compactor_stack.IsSamplerOn());
    EXPECT_GE(compactor_stack.num_stored_items(), kMinStoredItems);
    EXPECT_LE(compactor_stack.num_stored_items(), kNumInsertions);
    EXPECT_GE(compactor_stack.compactors().size(), 2u);
}
181
// One parameter combination for the AddWithSamplerTest suite.
struct AddWithSamplerParam {
    // First constructor argument of the CompactorStack under test.
    int64_t inv_eps;
    // Second constructor argument of the CompactorStack under test.
    int64_t inv_delta;
    // Number of items the tests insert (or scale their insertion count by).
    int64_t num_items;
};
187
188 class AddWithSamplerTest : public ::testing::TestWithParam<AddWithSamplerParam> {};
189
// With the sampler on, adding sampler_capacity() further items is expected to
// push exactly one item into the lowest active compactor: either the newly
// added value or the item the sampler was holding before.
TEST_P(AddWithSamplerTest, AddWithSampler) {
    const AddWithSamplerParam params = GetParam();
    // The test is randomized; record the seed so that a failure can be
    // reproduced by plugging the reported value back in.
    std::random_device rd;
    const uint64_t seed = rd();
    SCOPED_TRACE(::testing::Message() << "MTRandomGenerator seed: " << seed);
    MTRandomGenerator random(seed);
    CompactorStack compactor_stack(params.inv_eps, params.inv_delta, &random);

    // Insert a random multiple of num_items, plus one.
    int upbound = (random.UnbiasedUniform(3) + 1) * params.num_items + 1;
    for (int i = 0; i < upbound; i++) {
        compactor_stack.Add(random.UnbiasedUniform(std::numeric_limits<uint64_t>::max()));
    }
    EXPECT_TRUE(compactor_stack.IsSamplerOn());

    int lowest_active_level = compactor_stack.lowest_active_level();
    int size_before_add = compactor_stack.compactors()[lowest_active_level].size();
    // Placeholder (all bits set) used when the sampler holds no item.
    uint64_t previous_sampled_item = std::numeric_limits<uint64_t>::max();
    if (compactor_stack.sampled_item_and_weight().has_value()) {
        previous_sampled_item = compactor_stack.sampled_item_and_weight()->first;
    }
    // Add additional value sufficiently many times to complete full sampler
    // cycle. sampler_capacity() is re-read on every iteration on purpose, as
    // in the original loop.
    for (int i = 0; i < compactor_stack.sampler_capacity(); i++) {
        compactor_stack.Add(10);
    }

    int size_after_add = compactor_stack.compactors()[lowest_active_level].size();
    // A size of 0 means the level was compacted; that case is not checked.
    if (size_after_add > 0) {
        EXPECT_EQ(size_after_add, size_before_add + 1);
        EXPECT_THAT(compactor_stack.compactors()[lowest_active_level],
                    AnyOf(Contains(10), Contains(previous_sampled_item)));
    }
}
221
// With the sampler on, adding one item of weight 5 * 2^level - 1 (level being
// the lowest active level) is expected to put one item into the lowest active
// level (coming from the sampler) and one item into level + 2.
TEST_P(AddWithSamplerTest, AddWithWeightWithSampler) {
    // Use a fixed seed, since the test depends on which level is propagated
    // in the compactor stack.
    constexpr uint64_t kSeed = 3;
    // Value of the weighted item; typed like the items stored in the stack.
    constexpr uint64_t kAddedItem = 10;
    const AddWithSamplerParam params = GetParam();
    MTRandomGenerator random(kSeed);
    CompactorStack compactor_stack(params.inv_eps, params.inv_delta, &random);

    // NOTE(review): the bound here is the uint32_t maximum, unlike the
    // uint64_t bound used below. It is kept unchanged because the seeded
    // generator is shared with the stack, so altering the draws may alter
    // which branch of this test is exercised.
    for (int i = 0; i < params.num_items; i++) {
        compactor_stack.Add(random.UnbiasedUniform(std::numeric_limits<uint32_t>::max()));
    }
    ASSERT_TRUE(compactor_stack.IsSamplerOn());
    if (!compactor_stack.sampled_item_and_weight().has_value()) {
        compactor_stack.Add(random.UnbiasedUniform(std::numeric_limits<uint64_t>::max()));
    }

    const int lowest_active_level = compactor_stack.lowest_active_level();
    // The checks below index the compactors up to lowest_active_level + 3;
    // fail cleanly instead of reading past the end of the container.
    ASSERT_GT(compactor_stack.compactors().size(),
              static_cast<std::size_t>(lowest_active_level + 3));

    int size_before_add_lowest_level = compactor_stack.compactors()[lowest_active_level].size();
    int size_before_add_level_plus2 = compactor_stack.compactors()[lowest_active_level + 2].size();
    int size_before_add_level_plus3 = compactor_stack.compactors()[lowest_active_level + 3].size();
    // Placeholder (all bits set) used when the sampler holds no item.
    uint64_t previous_sampled_item = std::numeric_limits<uint64_t>::max();
    if (compactor_stack.sampled_item_and_weight().has_value()) {
        previous_sampled_item = compactor_stack.sampled_item_and_weight()->first;
    }

    // Expected additions: one item from the sampler to the lowest active level;
    // one item to the lowest active level + 2.
    int weight = 5 * (1 << lowest_active_level) - 1;
    compactor_stack.AddWithWeight(kAddedItem, weight);

    int size_after_add_lowest_level = compactor_stack.compactors()[lowest_active_level].size();
    // > 0 to make sure we are not checking the compacted case.
    if (size_after_add_lowest_level > 0) {
        EXPECT_EQ(size_after_add_lowest_level, size_before_add_lowest_level + 1);
        EXPECT_THAT(compactor_stack.compactors()[lowest_active_level],
                    AnyOf(Contains(kAddedItem), Contains(previous_sampled_item)));
    }

    // Level+2 is expected to change.
    // Exact behavior is also subject to the seed - a lower level might be
    // compacted into this one, or this one might be compacted up.
    // The test only checks the uncompacted case. (Different from Java version)
    int size_after_add_level_plus2 = compactor_stack.compactors()[lowest_active_level + 2].size();
    // Growth of the level means it was not compacted by this addition.
    if (size_after_add_level_plus2 > size_before_add_level_plus2) {
        EXPECT_EQ(size_after_add_level_plus2, size_before_add_level_plus2 + 1);
        EXPECT_THAT(compactor_stack.compactors()[lowest_active_level + 2], Contains(kAddedItem));
        int size_after_add_level_plus3 =
                compactor_stack.compactors()[lowest_active_level + 3].size();
        EXPECT_EQ(size_after_add_level_plus3, size_before_add_level_plus3);
    }
}
274
// After every insertion exactly one of the following must hold: the target
// capacity of level 0 is still above 2, or the sampler is on.
TEST_P(AddWithSamplerTest, CompactorsStillBigXorSamplerIsOn) {
    // The test is randomized; record the seed so that a failure can be
    // reproduced by plugging the reported value back in.
    std::random_device rd;
    const uint64_t seed = rd();
    SCOPED_TRACE(::testing::Message() << "MTRandomGenerator seed: " << seed);
    const AddWithSamplerParam params = GetParam();
    MTRandomGenerator random(seed);
    CompactorStack compactor_stack(params.inv_eps, params.inv_delta, &random);
    for (int i = 0; i < params.num_items; i++) {
        compactor_stack.Add(random.UnbiasedUniform(std::numeric_limits<uint64_t>::max()));
        const int lowest_compactor_capacity = compactor_stack.TargetCapacityAtLevel(0);
        const bool compactor_still_big = lowest_compactor_capacity > 2;
        const bool sampler_on = compactor_stack.IsSamplerOn();
        // Exclusive or: the two conditions must never agree. EXPECT_NE prints
        // both values when the expectation fails.
        EXPECT_NE(compactor_still_big, sampler_on);
    }
}
287
// After num_items insertions, the sampler is expected to hold an item either
// before or after one extra insertion, and the weight of that item must not
// exceed 2^lowest_active_level.
TEST_P(AddWithSamplerTest, SampledItemIsPresent) {
    // The test is randomized; record the seed so that a failure can be
    // reproduced by plugging the reported value back in.
    std::random_device rd;
    const uint64_t seed = rd();
    SCOPED_TRACE(::testing::Message() << "MTRandomGenerator seed: " << seed);
    const AddWithSamplerParam params = GetParam();
    MTRandomGenerator random(seed);
    CompactorStack compactor_stack(params.inv_eps, params.inv_delta, &random);

    for (int i = 0; i < params.num_items; i++) {
        compactor_stack.Add(random.UnbiasedUniform(std::numeric_limits<uint64_t>::max()));
    }

    // Weight of the currently sampled item, or 0 when the sampler holds none.
    const auto current_sampled_weight = [&compactor_stack]() -> int {
        const auto& sampled = compactor_stack.sampled_item_and_weight();
        return sampled.has_value() ? sampled->second : 0;
    };

    const int weight_before_add = current_sampled_weight();
    compactor_stack.Add(random.UnbiasedUniform(std::numeric_limits<uint64_t>::max()));
    const int weight_after_add = current_sampled_weight();

    // Taking the larger of the two observations tolerates the sampler being
    // empty at one of them.
    const int sampled_item_weight = std::max(weight_before_add, weight_after_add);
    EXPECT_GT(sampled_item_weight, 0);
    EXPECT_LE(sampled_item_weight, (1 << compactor_stack.lowest_active_level()));
}
310
311 INSTANTIATE_TEST_SUITE_P(AddWithSamplerTestCases, AddWithSamplerTest,
312 ::testing::ValuesIn(std::vector<AddWithSamplerParam>{
313 {10, 10, 2400},
314 {10, 100, 4600},
315 {10, 1000, 5000},
316 {50, 10000, 5000000},
317 {100, 100, 1250000},
318 {100, 1000, 2000000}}));
319
320 } // namespace
321
322 } // namespace internal
323 } // namespace aggregation
324 } // namespace dist_proc
325