1 /*
2  * Copyright 2021 Google LLC
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     https://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
#include "compactor_stack.h"

#include <algorithm>
#include <cmath>
#include <cstdint>
#include <limits>
#include <random>
#include <type_traits>
#include <vector>

#include "gmock/gmock.h"
#include "random_generator.h"
25 
26 namespace dist_proc {
27 namespace aggregation {
28 namespace internal {
29 
30 namespace {
31 
32 using ::testing::AnyOf;
33 using ::testing::Contains;
34 
// One AddsToCompactorsTest case: the number of unit-weight items pushed
// through the sampler, and the minimum sampler capacity to grow to first.
struct CompactorsTestParams {
    int num_items;
    int capacity;
};

// Builds the cross product of item counts and sampler capacities.
//
// Item counts start at 1 and advance by ceil(ln(1 + n)) while below 1000, so
// small counts are covered densely and the step widens for larger ones.
// Capacities are the powers of two from 2 through 1024. Entries are ordered by
// item count first, then by capacity.
std::vector<CompactorsTestParams> GenCompactorsTestParams() {
    std::vector<CompactorsTestParams> cases;
    int items = 1;
    while (items < 1000) {
        for (int cap = 2; cap < 1030; cap <<= 1) {
            cases.push_back(CompactorsTestParams{items, cap});
        }
        items += static_cast<int>(std::ceil(std::log(1 + items)));
    }
    return cases;
}
49 
50 class AddsToCompactorsTest : public ::testing::TestWithParam<CompactorsTestParams> {
51 protected:
AddsToCompactorsTest()52     AddsToCompactorsTest() : random_() {
53     }
54     MTRandomGenerator random_;
55 };
56 
TEST_P(AddsToCompactorsTest,AddsToCompactorsTestWeightOne)57 TEST_P(AddsToCompactorsTest, AddsToCompactorsTestWeightOne) {
58     // This test should not depend on the seed, since we only test
59     // num_stored_items, capacity and sampler weight.
60     std::random_device rd;
61     std::mt19937 gen(rd());
62     std::uniform_int_distribution<uint64_t> dis(0, std::numeric_limits<uint64_t>::max());
63     const CompactorsTestParams params = GetParam();
64     CompactorStack compactor_stack(1000, 100000, &random_);
65     KllSampler sampler(&compactor_stack);
66     while (sampler.capacity() < params.capacity) {
67         sampler.DoubleCapacity();
68     }
69     for (int i = 0; i < params.num_items; i++) {
70         sampler.Add(dis(gen));
71     }
72     EXPECT_EQ(compactor_stack.num_stored_items(), params.num_items / sampler.capacity());
73     if (sampler.sampled_item_and_weight().has_value()) {
74         EXPECT_EQ(sampler.sampled_item_and_weight().value().second,
75                   params.num_items % sampler.capacity());
76     } else {
77         EXPECT_EQ(0, params.num_items % sampler.capacity());
78     }
79 }
80 
TEST_P(AddsToCompactorsTest,AddWithWeightToCompactorsTest)81 TEST_P(AddsToCompactorsTest, AddWithWeightToCompactorsTest) {
82     // This test should not depend on the seed, since we only test
83     // num_stored_items, capacity and sampler weight.
84     std::random_device rd;
85     std::mt19937 gen(rd());
86     std::uniform_int_distribution<uint64_t> dis(0, std::numeric_limits<uint64_t>::max());
87     const CompactorsTestParams params = GetParam();
88     CompactorStack compactor_stack(1000, 100000, &random_);
89     KllSampler sampler(&compactor_stack);
90     while (sampler.capacity() < params.capacity) {
91         sampler.DoubleCapacity();
92     }
93     int remaining_num_items = params.num_items;
94     while (remaining_num_items > 0) {
95         // Add +1 to weight to avoid adding values with 0 weight.
96         int weight = random_.UnbiasedUniform(remaining_num_items) + 1;
97         sampler.AddWithWeight(dis(gen), weight);
98         remaining_num_items -= weight;
99     }
100     EXPECT_EQ(compactor_stack.num_stored_items(), params.num_items / sampler.capacity());
101     if (sampler.sampled_item_and_weight().has_value()) {
102         EXPECT_EQ(sampler.sampled_item_and_weight().value().second,
103                   params.num_items % sampler.capacity());
104     } else {
105         EXPECT_EQ(0, params.num_items % sampler.capacity());
106     }
107 }
108 
109 INSTANTIATE_TEST_SUITE_P(AddsToCompactorsTestCases, AddsToCompactorsTest,
110                          ::testing::ValuesIn(GenCompactorsTestParams()));
111 class KllQuantileUseSamplerTest : public ::testing::Test {
112 protected:
KllQuantileUseSamplerTest()113     KllQuantileUseSamplerTest() {
114     }
~KllQuantileUseSamplerTest()115     ~KllQuantileUseSamplerTest() override {
116     }
117     MTRandomGenerator random_;
118 };
119 
// Once enough items have been added for the sampler to be on, every level below
// the lowest active one must report zero capacity.
TEST_F(KllQuantileUseSamplerTest, ZeroCapacityAfterReplacedWithSampler) {
    constexpr int kNumItems = 200000;
    CompactorStack compactor_stack(10, 10, &random_);
    for (int n = 0; n < kNumItems; ++n) {
        compactor_stack.Add(random_.UnbiasedUniform(std::numeric_limits<uint64_t>::max()));
    }
    EXPECT_TRUE(compactor_stack.IsSamplerOn());

    const int lowest_active = compactor_stack.lowest_active_level();
    for (int level = 0; level < lowest_active; ++level) {
        EXPECT_EQ(compactor_stack.compactors()[level].capacity(), 0u);
    }
}
131 
// With the sampler on, num_stored_items() must equal the number of items held
// by compactors with positive capacity, plus one for a pending sampled item.
TEST_F(KllQuantileUseSamplerTest, NumStoredItemsWithSampledItem) {
    CompactorStack compactor_stack(10, 10, &random_);
    for (int n = 0; n < 2000; ++n) {
        compactor_stack.Add(random_.UnbiasedUniform(std::numeric_limits<uint64_t>::max()));
    }
    EXPECT_TRUE(compactor_stack.IsSamplerOn());

    int items_in_active_compactors = 0;
    for (const auto& compactor : compactor_stack.compactors()) {
        if (compactor.capacity() <= 0) {
            continue;  // Levels replaced by the sampler hold nothing that counts.
        }
        items_in_active_compactors += compactor.size();
    }

    if (!compactor_stack.sampled_item_and_weight().has_value()) {
        // No pending sampled item: the count matches the compactors exactly,
        // and one more Add() is expected to raise it by one.
        EXPECT_EQ(compactor_stack.num_stored_items(), items_in_active_compactors);
        compactor_stack.Add(random_.UnbiasedUniform(std::numeric_limits<uint64_t>::max()));
    }
    EXPECT_EQ(compactor_stack.num_stored_items(), items_in_active_compactors + 1);
}
153 
// Reset() must bring a stack in sampler mode back to a single empty compactor
// with the sampler off. After that, refilling must reach sampler mode again.
TEST_F(KllQuantileUseSamplerTest, ResetWithSampler) {
    // Set a fixed seed for the compactor stack, as it is not given that there
    // are 40 items in the compactor stack after 2000 insertions.
    // NOTE(review): the seeded generator only drives the stack's internal
    // decisions; the inserted values are still drawn from the fixture's
    // random_.
    MTRandomGenerator random(10);
    CompactorStack compactor_stack(10, 10, &random);

    // Inserts 2000 values and checks that the stack ended up in sampler mode.
    const auto fill_and_expect_sampler_mode = [&]() {
        for (int n = 0; n < 2000; ++n) {
            compactor_stack.Add(random_.UnbiasedUniform(std::numeric_limits<uint64_t>::max()));
        }
        EXPECT_TRUE(compactor_stack.IsSamplerOn());
        EXPECT_GE(compactor_stack.num_stored_items(), 40);
        EXPECT_LE(compactor_stack.num_stored_items(), 2000);
        EXPECT_GE(compactor_stack.compactors().size(), 2u);
    };

    fill_and_expect_sampler_mode();

    compactor_stack.Reset();
    EXPECT_FALSE(compactor_stack.IsSamplerOn());
    EXPECT_FALSE(compactor_stack.sampled_item_and_weight().has_value());
    EXPECT_EQ(compactor_stack.num_stored_items(), 0);
    EXPECT_EQ(compactor_stack.compactors().size(), 1u);

    fill_and_expect_sampler_mode();
}
181 
// One AddWithSamplerTest configuration: the first two CompactorStack
// constructor arguments and the base number of items the tests insert.
struct AddWithSamplerParam {
    int64_t inv_eps;    // First CompactorStack constructor argument.
    int64_t inv_delta;  // Second CompactorStack constructor argument.
    int64_t num_items;  // Base number of items inserted before the checks.
};
187 
188 class AddWithSamplerTest : public ::testing::TestWithParam<AddWithSamplerParam> {};
189 
// Once the sampler is on, adding one full sampler cycle of a marker value must
// grow the lowest active compactor by exactly one item. That item must be
// either the marker or the item that was pending in the sampler beforehand.
TEST_P(AddWithSamplerTest, AddWithSampler) {
    const AddWithSamplerParam params = GetParam();
    std::random_device rd;
    MTRandomGenerator random(rd());
    CompactorStack compactor_stack(params.inv_eps, params.inv_delta, &random);

    // Insert (UnbiasedUniform(3) + 1) * num_items + 1 random items. 64-bit
    // arithmetic keeps the product exact for the largest configurations.
    const int64_t num_initial_items =
            static_cast<int64_t>(random.UnbiasedUniform(3) + 1) * params.num_items + 1;
    for (int64_t i = 0; i < num_initial_items; ++i) {
        compactor_stack.Add(random.UnbiasedUniform(std::numeric_limits<uint64_t>::max()));
    }
    // Everything below inspects the lowest active level and the sampler, which
    // is only meaningful in sampler mode, so stop here if it is off.
    ASSERT_TRUE(compactor_stack.IsSamplerOn());

    const int lowest_active_level = compactor_stack.lowest_active_level();
    const int size_before_add = compactor_stack.compactors()[lowest_active_level].size();
    // Sentinel used when no item is pending in the sampler.
    uint64_t previous_sampled_item = std::numeric_limits<uint64_t>::max();
    if (compactor_stack.sampled_item_and_weight().has_value()) {
        previous_sampled_item = compactor_stack.sampled_item_and_weight()->first;
    }

    // Add the marker value sufficiently many times to complete a full sampler
    // cycle.
    constexpr uint64_t kAddedItem = 10;
    for (int i = 0; i < compactor_stack.sampler_capacity(); i++) {
        compactor_stack.Add(kAddedItem);
    }

    const int size_after_add = compactor_stack.compactors()[lowest_active_level].size();
    // > 0 to make sure we are not checking the compact case.
    if (size_after_add > 0) {
        EXPECT_EQ(size_after_add, size_before_add + 1);
        EXPECT_THAT(compactor_stack.compactors()[lowest_active_level],
                    AnyOf(Contains(kAddedItem), Contains(previous_sampled_item)));
    }
}
221 
TEST_P(AddWithSamplerTest,AddWithWeightWithSampler)222 TEST_P(AddWithSamplerTest, AddWithWeightWithSampler) {
223     // Set a fixed seed, since the tests depends on which level is propagated
224     // in the compactor stack.
225     uint64_t seed = 3;
226     const AddWithSamplerParam params = GetParam();
227     MTRandomGenerator random = MTRandomGenerator(seed);
228     CompactorStack compactor_stack(params.inv_eps, params.inv_delta, &random);
229 
230     for (int i = 0; i < params.num_items; i++) {
231         compactor_stack.Add(random.UnbiasedUniform(std::numeric_limits<uint32_t>::max()));
232     }
233     ASSERT_TRUE(compactor_stack.IsSamplerOn());
234     if (!compactor_stack.sampled_item_and_weight().has_value()) {
235         compactor_stack.Add(random.UnbiasedUniform(std::numeric_limits<uint64_t>::max()));
236     }
237     int lowest_active_level = compactor_stack.lowest_active_level();
238     int size_before_add_lowest_level = compactor_stack.compactors()[lowest_active_level].size();
239     int size_before_add_level_plus2 = compactor_stack.compactors()[lowest_active_level + 2].size();
240     int size_before_add_level_plus3 = compactor_stack.compactors()[lowest_active_level + 3].size();
241     uint64_t previous_sampled_item = -1;
242     if (compactor_stack.sampled_item_and_weight().has_value()) {
243         previous_sampled_item = compactor_stack.sampled_item_and_weight()->first;
244     }
245 
246     // Expected additions: one item from the sampler to the lowest active level;
247     // one item to the lowest active level + 2.
248     int weight = 5 * (1 << lowest_active_level) - 1;
249     compactor_stack.AddWithWeight(10.0, weight);
250 
251     int size_after_add_lowest_level = compactor_stack.compactors()[lowest_active_level].size();
252     // > 0 to make sure we are not checking compact case
253     if (size_after_add_lowest_level > 0) {
254         EXPECT_EQ(size_after_add_lowest_level, size_before_add_lowest_level + 1);
255         EXPECT_THAT(compactor_stack.compactors()[lowest_active_level],
256                     AnyOf(Contains(10.0), Contains(previous_sampled_item)));
257     }
258 
259     // Level+2 is expected to change.
260     // Exact behavior also subject to seed - a lower level might be compacted
261     // into this one, or this one might be compacted up.
262     // Test only checks the uncompacted case. (Different from Java version)
263     int size_after_add_level_plus2 = compactor_stack.compactors()[lowest_active_level + 2].size();
264     if (size_after_add_level_plus2 >
265         // make sure we are not checking comact case
266         size_before_add_level_plus2) {
267         EXPECT_EQ(size_after_add_level_plus2, size_before_add_level_plus2 + 1);
268         EXPECT_THAT(compactor_stack.compactors()[lowest_active_level + 2], Contains(10.0));
269         int size_after_add_level_plus3 =
270                 compactor_stack.compactors()[lowest_active_level + 3].size();
271         EXPECT_EQ(size_after_add_level_plus3, size_before_add_level_plus3);
272     }
273 }
274 
// After every insertion exactly one condition must hold: either the level-0
// target capacity is still above 2, or the sampler is on.
TEST_P(AddWithSamplerTest, CompactorsStillBigXorSamplerIsOn) {
    const AddWithSamplerParam& params = GetParam();
    const uint64_t seed = std::random_device{}();
    MTRandomGenerator random(seed);
    CompactorStack compactor_stack(params.inv_eps, params.inv_delta, &random);

    for (int added = 0; added < params.num_items; ++added) {
        compactor_stack.Add(random.UnbiasedUniform(std::numeric_limits<uint64_t>::max()));
        const int level0_capacity = compactor_stack.TargetCapacityAtLevel(0);
        const bool compactors_still_big = level0_capacity > 2;
        EXPECT_NE(compactors_still_big, compactor_stack.IsSamplerOn());
    }
}
287 
// Around one extra insertion, the sampler's pending weight must be positive at
// least once, and never above 2^(lowest active level).
TEST_P(AddWithSamplerTest, SampledItemIsPresent) {
    const AddWithSamplerParam& params = GetParam();
    std::random_device rd;
    const uint64_t seed = rd();
    MTRandomGenerator random(seed);
    CompactorStack compactor_stack(params.inv_eps, params.inv_delta, &random);

    for (int added = 0; added < params.num_items; ++added) {
        compactor_stack.Add(random.UnbiasedUniform(std::numeric_limits<uint64_t>::max()));
    }

    // Weight of the pending sampled item, or 0 when the sampler holds nothing.
    const auto pending_weight = [&compactor_stack]() -> int {
        if (!compactor_stack.sampled_item_and_weight().has_value()) {
            return 0;
        }
        return compactor_stack.sampled_item_and_weight().value().second;
    };

    const int weight_before_add = pending_weight();
    compactor_stack.Add(random.UnbiasedUniform(std::numeric_limits<uint64_t>::max()));
    const int weight_after_add = pending_weight();

    const int sampled_item_weight = std::max(weight_before_add, weight_after_add);
    EXPECT_GT(sampled_item_weight, 0);
    EXPECT_LE(sampled_item_weight, (1 << compactor_stack.lowest_active_level()));
}
310 
311 INSTANTIATE_TEST_SUITE_P(AddWithSamplerTestCases, AddWithSamplerTest,
312                          ::testing::ValuesIn(std::vector<AddWithSamplerParam>{
313                                  {10, 10, 2400},
314                                  {10, 100, 4600},
315                                  {10, 1000, 5000},
316                                  {50, 10000, 5000000},
317                                  {100, 100, 1250000},
318                                  {100, 1000, 2000000}}));
319 
320 }  // namespace
321 
322 }  // namespace internal
323 }  // namespace aggregation
324 }  // namespace dist_proc
325