1 /*
2  * Copyright 2021 Google LLC
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     https://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <algorithm>
20 #include <cmath>
21 #include <cstdint>
22 #include <memory>
23 #include <utility>
24 #include <vector>
25 
26 #include "random_generator.h"
27 #include "sampler.h"
28 
29 namespace dist_proc {
30 namespace aggregation {
31 namespace internal {
32 
33 class KllSampler;
34 
35 // Hierarchy of compactors, which store items from the stream and 'compact'
36 // them when necessary (i.e., keep every second item in a sorted compactor)
37 // and add them to the compactor one level up.
38 class CompactorStack {
39 public:
40     CompactorStack(int64_t inv_eps, int64_t inv_delta, RandomGenerator* random);
41     CompactorStack(int64_t inv_eps, int64_t inv_delta, int k, RandomGenerator* random);
42     ~CompactorStack();
43 
44     // Initialize or reset the compactor stack and all counters and thresholds.
45     void Reset();
46 
47     void Add(const int64_t value);
48 
49     // Adds an item to the compactor stack with weight >= 1.
50     // Does nothing if weight <= 0.
51     void AddWithWeight(int64_t value, int weight);
52 
53     // Ensures that the contents of each compactor are sorted.
54     void SortCompactorContents();
55 
56     // Target capacity of compactor with index h. If this capacity is exceeded,
57     // the compactor will be lazily compacted in one of the next CompactStack()
58     // runs. I.e., this capacity can be temorarily exceeded.
59     int TargetCapacityAtLevel(int h) const;
60 
61     void DoubleSamplerCapacity();
62 
63     int num_stored_items() const;
64 
65     std::optional<std::pair<const int64_t, int64_t>> sampled_item_and_weight() const;
66 
67     // Returns the lowest active level in the compactor stack, which is identical
68     // with the number of replaced levels, or log2(sampler_capacity()).
69     int lowest_active_level() const;
70 
71     int64_t sampler_capacity() const;
72 
73     // For testing
IsSamplerOn()74     bool IsSamplerOn() const {
75         return sampler_ != nullptr;
76     }
77 
compactors()78     const std::vector<std::vector<int64_t>>& compactors() const {
79         return compactors_;
80     }
81 
random()82     RandomGenerator* random() {
83         return random_;
84     }
85 
k()86     int k() const {
87         return k_;
88     }
89 
90 private:
91     void ClearCompactors();
92 
93     // Adds a new compactor at the highest level. To be called when the currently
94     // topmost compactor is full.
95     void AddLevel();
96 
97     // Called when at least one level in the compactor stack is above capacity.
98     // Iterates from bottom to top through the compactors and compacts the
99     // first one that is over its capacity by halving its contents and adding
100     // them to the compactor one level higher.
101     void CompactStack();
102 
103     void CompactLevel(int level);
104 
105     // To compact the items in a compactor to roughly half the size,
106     // sorts the items and adds every even or odd item (determined randomly)
107     // to the up_compactor.
108     void Halve(std::vector<int64_t>* down_compactor, std::vector<int64_t>* up_compactor);
109 
110     std::vector<std::vector<int64_t>> compactors_;
111     int k_;
112     const double c_ = 2.0 / 3.0;
113     int overall_capacity_;
114     int num_items_in_compactors_;
115     RandomGenerator* random_;
116     std::unique_ptr<KllSampler> sampler_;
117 };
118 
119 }  // namespace internal
120 }  // namespace aggregation
121 }  // namespace dist_proc
122