/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define STATSD_DEBUG false  // STOPSHIP if true

#include "Log.h"

#include "ValueMetricProducer.h"

#include <limits.h>
#include <stdlib.h>

#include "FieldValue.h"
#include "HashableDimensionKey.h"
#include "guardrail/StatsdStats.h"
#include "metrics/parsing_utils/metrics_manager_util.h"
#include "stats_log_util.h"
#include "stats_util.h"

using android::util::FIELD_COUNT_REPEATED;
using android::util::FIELD_TYPE_BOOL;
using android::util::FIELD_TYPE_INT32;
using android::util::FIELD_TYPE_INT64;
using android::util::FIELD_TYPE_MESSAGE;
using android::util::ProtoOutputStream;
using dist_proc::aggregation::KllQuantile;
using std::optional;
using std::shared_ptr;
using std::unique_ptr;
using std::unordered_map;
using std::vector;

namespace android {
namespace os {
namespace statsd {

// for StatsLogReport
const int FIELD_ID_ID = 1;
const int FIELD_ID_TIME_BASE = 9;
const int FIELD_ID_BUCKET_SIZE = 10;
const int FIELD_ID_DIMENSION_PATH_IN_WHAT = 11;
const int FIELD_ID_IS_ACTIVE = 14;
const int FIELD_ID_DIMENSION_GUARDRAIL_HIT = 17;
const int FIELD_ID_ESTIMATED_MEMORY_BYTES = 18;
// for *MetricDataWrapper
const int FIELD_ID_DATA = 1;
const int FIELD_ID_SKIPPED = 2;
// for SkippedBuckets
const int FIELD_ID_SKIPPED_START_MILLIS = 3;
const int FIELD_ID_SKIPPED_END_MILLIS = 4;
const int FIELD_ID_SKIPPED_DROP_EVENT = 5;
// for DumpEvent Proto
const int FIELD_ID_BUCKET_DROP_REASON = 1;
const int FIELD_ID_DROP_TIME = 2;
// for *MetricData
const int FIELD_ID_DIMENSION_IN_WHAT = 1;
const int FIELD_ID_BUCKET_INFO = 3;
const int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4;
const int FIELD_ID_SLICE_BY_STATE = 6;

template <typename AggregatedValue, typename DimExtras>
ValueMetricProducer<AggregatedValue, DimExtras>::ValueMetricProducer(
        const int64_t metricId, const ConfigKey& key, const uint64_t protoHash,
        const PullOptions& pullOptions, const BucketOptions& bucketOptions,
        const WhatOptions& whatOptions, const ConditionOptions& conditionOptions,
        const StateOptions& stateOptions, const ActivationOptions& activationOptions,
        const GuardrailOptions& guardrailOptions,
        const wp<ConfigMetadataProvider> configMetadataProvider)
    : MetricProducer(metricId, key, bucketOptions.timeBaseNs, conditionOptions.conditionIndex,
                     conditionOptions.initialConditionCache, conditionOptions.conditionWizard,
                     protoHash, activationOptions.eventActivationMap,
                     activationOptions.eventDeactivationMap, stateOptions.slicedStateAtoms,
                     stateOptions.stateGroupMap, bucketOptions.splitBucketForAppUpgrade,
                     configMetadataProvider),
      mWhatMatcherIndex(whatOptions.whatMatcherIndex),
      mEventMatcherWizard(whatOptions.matcherWizard),
      mPullerManager(pullOptions.pullerManager),
      mFieldMatchers(whatOptions.fieldMatchers),
      mPullAtomId(pullOptions.pullAtomId),
      mMinBucketSizeNs(bucketOptions.minBucketSizeNs),
      mDimensionSoftLimit(guardrailOptions.dimensionSoftLimit),
      mDimensionHardLimit(guardrailOptions.dimensionHardLimit),
      mCurrentBucketIsSkipped(false),
      mConditionCorrectionThresholdNs(bucketOptions.conditionCorrectionThresholdNs) {
    // TODO(b/185722221): inject directly via initializer list in MetricProducer.
    mBucketSizeNs = bucketOptions.bucketSizeNs;

    // TODO(b/185770171): inject dimensionsInWhat related fields via constructor.
    if (whatOptions.dimensionsInWhat.field() > 0) {
        translateFieldMatcher(whatOptions.dimensionsInWhat, &mDimensionsInWhat);
    }
    mContainANYPositionInDimensionsInWhat = whatOptions.containsAnyPositionInDimensionsInWhat;
    mShouldUseNestedDimensions = whatOptions.shouldUseNestedDimensions;

    if (conditionOptions.conditionLinks.size() > 0) {
        for (const auto& link : conditionOptions.conditionLinks) {
            Metric2Condition mc;
            mc.conditionId = link.condition();
            translateFieldMatcher(link.fields_in_what(), &mc.metricFields);
            translateFieldMatcher(link.fields_in_condition(), &mc.conditionFields);
            mMetric2ConditionLinks.push_back(mc);
        }

        // TODO(b/185770739): use !mMetric2ConditionLinks.empty() instead
        mConditionSliced = true;
    }

    for (const auto& stateLink : stateOptions.stateLinks) {
        Metric2State ms;
        ms.stateAtomId = stateLink.state_atom_id();
        translateFieldMatcher(stateLink.fields_in_what(), &ms.metricFields);
        translateFieldMatcher(stateLink.fields_in_state(), &ms.stateFields);
        mMetric2StateLinks.push_back(ms);
    }

    const int64_t numBucketsForward = calcBucketsForwardCount(bucketOptions.startTimeNs);
    mCurrentBucketNum = numBucketsForward;
    flushIfNeededLocked(bucketOptions.startTimeNs);

    if (isPulled()) {
        mPullerManager->RegisterReceiver(mPullAtomId, mConfigKey, this, getCurrentBucketEndTimeNs(),
                                         mBucketSizeNs);
    }

    // Only do this for partial buckets like first bucket. All other buckets should use
    // flushIfNeeded to adjust start and end to bucket boundaries.
    // Adjust start for partial bucket
    mCurrentBucketStartTimeNs = bucketOptions.startTimeNs;
    mConditionTimer.newBucketStart(mCurrentBucketStartTimeNs, mCurrentBucketStartTimeNs);

    // Now that activations are processed, start the condition timer if needed.
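    // Note: the timer is keyed on the AND of the active state and the condition, mirroring how
    // onActiveStateChangedLocked and onConditionChangedLocked drive the ConditionTimer below.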
    mConditionTimer.onConditionChanged(mIsActive && mCondition == ConditionState::kTrue,
                                       mCurrentBucketStartTimeNs);
}

template <typename AggregatedValue, typename DimExtras>
ValueMetricProducer<AggregatedValue, DimExtras>::~ValueMetricProducer() {
    VLOG("~ValueMetricProducer() called");
    if (isPulled()) {
        mPullerManager->UnRegisterReceiver(mPullAtomId, mConfigKey, this);
    }
}

template <typename AggregatedValue, typename DimExtras>
void ValueMetricProducer<AggregatedValue, DimExtras>::onStatsdInitCompleted(
        const int64_t eventTimeNs) {
    ATRACE_CALL();
    lock_guard<std::mutex> lock(mMutex);

    if (isPulled() && mCondition == ConditionState::kTrue && mIsActive) {
        pullAndMatchEventsLocked(eventTimeNs);
    }
    flushCurrentBucketLocked(eventTimeNs, eventTimeNs);
}

template <typename AggregatedValue, typename DimExtras>
void ValueMetricProducer<AggregatedValue, DimExtras>::notifyAppUpgradeInternalLocked(
        const int64_t eventTimeNs) {
    if (isPulled() && mCondition == ConditionState::kTrue && mIsActive) {
        pullAndMatchEventsLocked(eventTimeNs);
    }
    flushCurrentBucketLocked(eventTimeNs, eventTimeNs);
}

template <typename AggregatedValue, typename DimExtras>
optional<InvalidConfigReason>
ValueMetricProducer<AggregatedValue, DimExtras>::onConfigUpdatedLocked(
        const StatsdConfig& config, const int configIndex, const int metricIndex,
        const vector<sp<AtomMatchingTracker>>& allAtomMatchingTrackers,
        const unordered_map<int64_t, int>& oldAtomMatchingTrackerMap,
        const unordered_map<int64_t, int>& newAtomMatchingTrackerMap,
        const sp<EventMatcherWizard>& matcherWizard,
        const vector<sp<ConditionTracker>>& allConditionTrackers,
        const unordered_map<int64_t, int>& conditionTrackerMap, const sp<ConditionWizard>& wizard,
        const unordered_map<int64_t, int>& metricToActivationMap,
        unordered_map<int, vector<int>>& trackerToMetricMap,
        unordered_map<int, vector<int>>& conditionToMetricMap,
        unordered_map<int, vector<int>>& activationAtomTrackerToMetricMap,
        unordered_map<int, vector<int>>& deactivationAtomTrackerToMetricMap,
        vector<int>& metricsWithActivation) {
    optional<InvalidConfigReason> invalidConfigReason = MetricProducer::onConfigUpdatedLocked(
            config, configIndex, metricIndex, allAtomMatchingTrackers, oldAtomMatchingTrackerMap,
            newAtomMatchingTrackerMap, matcherWizard, allConditionTrackers, conditionTrackerMap,
            wizard, metricToActivationMap, trackerToMetricMap, conditionToMetricMap,
            activationAtomTrackerToMetricMap, deactivationAtomTrackerToMetricMap,
            metricsWithActivation);
    if (invalidConfigReason.has_value()) {
        return invalidConfigReason;
    }

    // Update appropriate indices: mWhatMatcherIndex, mConditionIndex and MetricsManager maps.
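    // The trackers were rebuilt for the updated config, so their positions in the tracker vectors
    // may have shifted; the indices are re-resolved by matcher/condition id below rather than
    // reusing the indices from the old config.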
    const int64_t atomMatcherId = getWhatAtomMatcherIdForMetric(config, configIndex);
    invalidConfigReason = handleMetricWithAtomMatchingTrackers(
            atomMatcherId, mMetricId, metricIndex, /*enforceOneAtom=*/false,
            allAtomMatchingTrackers, newAtomMatchingTrackerMap, trackerToMetricMap,
            mWhatMatcherIndex);
    if (invalidConfigReason.has_value()) {
        return invalidConfigReason;
    }

    const optional<int64_t>& conditionIdOpt = getConditionIdForMetric(config, configIndex);
    const ConditionLinks& conditionLinks = getConditionLinksForMetric(config, configIndex);
    if (conditionIdOpt.has_value()) {
        invalidConfigReason = handleMetricWithConditions(
                conditionIdOpt.value(), mMetricId, metricIndex, conditionTrackerMap, conditionLinks,
                allConditionTrackers, mConditionTrackerIndex, conditionToMetricMap);
        if (invalidConfigReason.has_value()) {
            return invalidConfigReason;
        }
    }

    sp<EventMatcherWizard> tmpEventWizard = mEventMatcherWizard;
    mEventMatcherWizard = matcherWizard;
    return nullopt;
}

template <typename AggregatedValue, typename DimExtras>
size_t ValueMetricProducer<AggregatedValue, DimExtras>::computeValueBucketSizeLocked(
        const bool isFullBucket, const MetricDimensionKey& dimKey, const bool isFirstBucket,
        const PastBucket<AggregatedValue>& bucket) const {
    size_t bucketSize =
            MetricProducer::computeBucketSizeLocked(isFullBucket, dimKey, isFirstBucket);

    for (const auto& value : bucket.aggregates) {
        bucketSize += getAggregatedValueSize(value);
    }

    // ConditionTrueNanos
    if (mConditionTrackerIndex >= 0 || !mSlicedStateAtoms.empty()) {
        bucketSize += sizeof(int64_t);
    }

    // ConditionCorrectionNanos
    if (getDumpProtoFields().conditionCorrectionNsFieldId.has_value() && isPulled() &&
        mConditionCorrectionThresholdNs &&
        (abs(bucket.mConditionCorrectionNs) >= mConditionCorrectionThresholdNs)) {
        bucketSize += sizeof(int64_t);
    }
    return bucketSize;
}

template <typename AggregatedValue, typename DimExtras>
void ValueMetricProducer<AggregatedValue, DimExtras>::onStateChanged(
        int64_t eventTimeNs, int32_t atomId, const HashableDimensionKey& primaryKey,
        const FieldValue& oldState, const FieldValue& newState) {
    std::lock_guard<std::mutex> lock(mMutex);
    VLOG("ValueMetricProducer %lld onStateChanged time %lld, State %d, key %s, %d -> %d",
         (long long)mMetricId, (long long)eventTimeNs, atomId, primaryKey.toString().c_str(),
         oldState.mValue.int_value, newState.mValue.int_value);

    FieldValue oldStateCopy = oldState;
    FieldValue newStateCopy = newState;
    mapStateValue(atomId, &oldStateCopy);
    mapStateValue(atomId, &newStateCopy);

    // If old and new states are in the same StateGroup, then we do not need to
    // pull for this state change.
    if (oldStateCopy == newStateCopy) {
        return;
    }

    // If condition is not true or metric is not active, we do not need to pull
    // for this state change.
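    // (Illustrative: pulling on a state change lets the data aggregated so far be attributed to
    // the outgoing state before new data starts accruing under the incoming state.)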
    if (mCondition != ConditionState::kTrue || !mIsActive) {
        return;
    }

    if (isEventLateLocked(eventTimeNs)) {
        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
             (long long)mCurrentBucketStartTimeNs);
        invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
        return;
    }

    if (isPulled()) {
        mStateChangePrimaryKey.first = atomId;
        mStateChangePrimaryKey.second = primaryKey;
        // TODO(b/185796114): pass mStateChangePrimaryKey as an argument to
        // pullAndMatchEventsLocked
        pullAndMatchEventsLocked(eventTimeNs);
        mStateChangePrimaryKey.first = 0;
        mStateChangePrimaryKey.second = DEFAULT_DIMENSION_KEY;
    }
    flushIfNeededLocked(eventTimeNs);
}

template <typename AggregatedValue, typename DimExtras>
void ValueMetricProducer<AggregatedValue, DimExtras>::onSlicedConditionMayChangeLocked(
        bool overallCondition, const int64_t eventTime) {
    VLOG("Metric %lld onSlicedConditionMayChange", (long long)mMetricId);
}

template <typename AggregatedValue, typename DimExtras>
void ValueMetricProducer<AggregatedValue, DimExtras>::dropDataLocked(const int64_t dropTimeNs) {
    StatsdStats::getInstance().noteBucketDropped(mMetricId);

    // The current partial bucket is not flushed and does not require a pull,
    // so the data is still valid.
    flushIfNeededLocked(dropTimeNs);
    clearPastBucketsLocked(dropTimeNs);
}

template <typename AggregatedValue, typename DimExtras>
void ValueMetricProducer<AggregatedValue, DimExtras>::clearPastBucketsLocked(
        const int64_t dumpTimeNs) {
    mPastBuckets.clear();
    mSkippedBuckets.clear();
    mTotalDataSize = 0;
}

template <typename AggregatedValue, typename DimExtras>
void ValueMetricProducer<AggregatedValue, DimExtras>::onDumpReportLocked(
        const int64_t dumpTimeNs, const bool includeCurrentPartialBucket, const bool eraseData,
        const DumpLatency dumpLatency, set<string>* strSet, ProtoOutputStream* protoOutput) {
    VLOG("metric %lld dump report now...", (long long)mMetricId);

    // Pulled metrics need to pull before flushing, which is why they do not call flushIfNeeded.
    // TODO: b/249823426 see if we can pull and call flushIfNeeded for pulled value metrics.
    if (!isPulled()) {
        flushIfNeededLocked(dumpTimeNs);
    }
    if (includeCurrentPartialBucket) {
        // For pull metrics, we need to do a pull at bucket boundaries. If we do not do that the
        // current bucket will have incomplete data and the next will have the wrong snapshot to do
        // a diff against. If the condition is false, we are fine since the base data is reset and
        // we are not tracking anything.
        if (isPulled() && mCondition == ConditionState::kTrue && mIsActive) {
            switch (dumpLatency) {
                case FAST:
                    invalidateCurrentBucket(dumpTimeNs, BucketDropReason::DUMP_REPORT_REQUESTED);
                    break;
                case NO_TIME_CONSTRAINTS:
                    pullAndMatchEventsLocked(dumpTimeNs);
                    break;
            }
        }
        flushCurrentBucketLocked(dumpTimeNs, dumpTimeNs);
    }

    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
    protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_IS_ACTIVE, isActiveLocked());

    if (mPastBuckets.empty() && mSkippedBuckets.empty()) {
        return;
    }

    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ESTIMATED_MEMORY_BYTES,
                       (long long)byteSizeLocked());

    if (StatsdStats::getInstance().hasHitDimensionGuardrail(mMetricId)) {
        protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_DIMENSION_GUARDRAIL_HIT, true);
    }
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_TIME_BASE, (long long)mTimeBaseNs);
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_SIZE, (long long)mBucketSizeNs);

    // Fills the dimension path if not slicing by a primitive repeated field or position ALL.
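    // When nested dimensions are used instead, the full dimension tree is written with each data
    // entry further below, so no shared dimension path is emitted here.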
    if (!mShouldUseNestedDimensions) {
        if (!mDimensionsInWhat.empty()) {
            uint64_t dimenPathToken =
                    protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT);
            writeDimensionPathToProto(mDimensionsInWhat, protoOutput);
            protoOutput->end(dimenPathToken);
        }
    }

    const auto& [metricTypeFieldId, bucketNumFieldId, startBucketMsFieldId, endBucketMsFieldId,
                 conditionTrueNsFieldId, conditionCorrectionNsFieldId] = getDumpProtoFields();

    uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | metricTypeFieldId);

    for (const auto& skippedBucket : mSkippedBuckets) {
        uint64_t wrapperToken =
                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED);
        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START_MILLIS,
                           (long long)(NanoToMillis(skippedBucket.bucketStartTimeNs)));
        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END_MILLIS,
                           (long long)(NanoToMillis(skippedBucket.bucketEndTimeNs)));
        for (const auto& dropEvent : skippedBucket.dropEvents) {
            uint64_t dropEventToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
                                                         FIELD_ID_SKIPPED_DROP_EVENT);
            protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_BUCKET_DROP_REASON, dropEvent.reason);
            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_DROP_TIME,
                               (long long)(NanoToMillis(dropEvent.dropTimeNs)));
            protoOutput->end(dropEventToken);
        }
        protoOutput->end(wrapperToken);
    }

    for (const auto& [metricDimensionKey, buckets] : mPastBuckets) {
        VLOG("  dimension key %s", metricDimensionKey.toString().c_str());

        uint64_t wrapperToken =
                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);

        // First fill dimension.
        if (mShouldUseNestedDimensions) {
            uint64_t dimensionToken =
                    protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
            writeDimensionToProto(metricDimensionKey.getDimensionKeyInWhat(), strSet, protoOutput);
            protoOutput->end(dimensionToken);
        } else {
            writeDimensionLeafNodesToProto(metricDimensionKey.getDimensionKeyInWhat(),
                                           FIELD_ID_DIMENSION_LEAF_IN_WHAT, strSet, protoOutput);
        }

        // Then fill slice_by_state.
        for (auto state : metricDimensionKey.getStateValuesKey().getValues()) {
            uint64_t stateToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
                                                     FIELD_ID_SLICE_BY_STATE);
            writeStateToProto(state, protoOutput);
            protoOutput->end(stateToken);
        }

        // Then fill bucket_info (*BucketInfo).
        for (const auto& bucket : buckets) {
            uint64_t bucketInfoToken = protoOutput->start(
                    FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);

            if (bucket.mBucketEndNs - bucket.mBucketStartNs != mBucketSizeNs) {
                protoOutput->write(FIELD_TYPE_INT64 | startBucketMsFieldId,
                                   (long long)NanoToMillis(bucket.mBucketStartNs));
                protoOutput->write(FIELD_TYPE_INT64 | endBucketMsFieldId,
                                   (long long)NanoToMillis(bucket.mBucketEndNs));
            } else {
                protoOutput->write(FIELD_TYPE_INT64 | bucketNumFieldId,
                                   (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs)));
            }
            // We only write the condition timer value if the metric has a
            // condition and/or is sliced by state.
            // If the metric is sliced by state, the condition timer value is
            // also sliced by state to reflect time spent in that state.
            if (mConditionTrackerIndex >= 0 || !mSlicedStateAtoms.empty()) {
                protoOutput->write(FIELD_TYPE_INT64 | conditionTrueNsFieldId,
                                   (long long)bucket.mConditionTrueNs);
            }

            if (conditionCorrectionNsFieldId) {
                // We write the condition correction value when the below conditions are true:
                // - the metric is pulled
                // - it is enabled by the metric configuration via the dedicated field,
                //   see condition_correction_threshold_nanos
                // - abs(value) >= condition_correction_threshold_nanos
                if (isPulled() && mConditionCorrectionThresholdNs &&
                    (abs(bucket.mConditionCorrectionNs) >= mConditionCorrectionThresholdNs)) {
                    protoOutput->write(FIELD_TYPE_INT64 | conditionCorrectionNsFieldId.value(),
                                       (long long)bucket.mConditionCorrectionNs);
                }
            }

            for (int i = 0; i < (int)bucket.aggIndex.size(); i++) {
                VLOG("\t bucket [%lld - %lld]", (long long)bucket.mBucketStartNs,
                     (long long)bucket.mBucketEndNs);
                int sampleSize = !bucket.sampleSizes.empty() ? bucket.sampleSizes[i] : 0;
                writePastBucketAggregateToProto(bucket.aggIndex[i], bucket.aggregates[i],
                                                sampleSize, protoOutput);
            }
            protoOutput->end(bucketInfoToken);
        }
        protoOutput->end(wrapperToken);
    }
    protoOutput->end(protoToken);

    VLOG("metric %lld done with dump report...", (long long)mMetricId);
    if (eraseData) {
        mPastBuckets.clear();
        mSkippedBuckets.clear();
        mTotalDataSize = 0;
    }
}

template <typename AggregatedValue, typename DimExtras>
void ValueMetricProducer<AggregatedValue, DimExtras>::invalidateCurrentBucket(
        const int64_t dropTimeNs, const BucketDropReason reason) {
    if (!mCurrentBucketIsSkipped) {
        // Only report to StatsdStats once per invalid bucket.
        StatsdStats::getInstance().noteInvalidatedBucket(mMetricId);
    }

    skipCurrentBucket(dropTimeNs, reason);
}

template <typename AggregatedValue, typename DimExtras>
void ValueMetricProducer<AggregatedValue, DimExtras>::skipCurrentBucket(
        const int64_t dropTimeNs, const BucketDropReason reason) {
    if (!mIsActive) {
        // Don't keep track of skipped buckets if the metric is not active.
        return;
    }

    if (!maxDropEventsReached()) {
        mCurrentSkippedBucket.dropEvents.push_back(buildDropEvent(dropTimeNs, reason));
    }
    mCurrentBucketIsSkipped = true;
}

// Handle active state change. Active state change is *mostly* treated like a condition change:
// - drop bucket if active state change event arrives too late
// - if condition is true, pull data on active state changes
// - ConditionTimer tracks changes based on AND of condition and active state.
template <typename AggregatedValue, typename DimExtras>
void ValueMetricProducer<AggregatedValue, DimExtras>::onActiveStateChangedLocked(
        const int64_t eventTimeNs, const bool isActive) {
    const bool eventLate = isEventLateLocked(eventTimeNs);
    if (eventLate) {
        // Drop bucket because the event arrived too late, i.e. we are missing data for this bucket.
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
        invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
    }

    if (ConditionState::kTrue != mCondition) {
        // Call parent method before early return.
        MetricProducer::onActiveStateChangedLocked(eventTimeNs, isActive);
        return;
    }

    // Pull on active state changes.
    if (!eventLate) {
        if (isPulled()) {
            pullAndMatchEventsLocked(eventTimeNs);
        }

        onActiveStateChangedInternalLocked(eventTimeNs, isActive);
    }

    // Once any pulls are processed, call through to parent method which might flush the current
    // bucket.
    MetricProducer::onActiveStateChangedLocked(eventTimeNs, isActive);

    // Let condition timer know of new active state.
    mConditionTimer.onConditionChanged(isActive, eventTimeNs);

    updateCurrentSlicedBucketConditionTimers(isActive, eventTimeNs);
}

template <typename AggregatedValue, typename DimExtras>
void ValueMetricProducer<AggregatedValue, DimExtras>::onConditionChangedLocked(
        const bool condition, const int64_t eventTimeNs) {
    const ConditionState newCondition =
            condition ? ConditionState::kTrue : ConditionState::kFalse;
    const ConditionState oldCondition = mCondition;

    if (!mIsActive) {
        mCondition = newCondition;
        return;
    }

    // If the event arrived late, mark the bucket as invalid and skip the event.
    if (isEventLateLocked(eventTimeNs)) {
        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
             (long long)mCurrentBucketStartTimeNs);
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
        StatsdStats::getInstance().noteConditionChangeInNextBucket(mMetricId);
        invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
        mCondition = newCondition;
        mConditionTimer.onConditionChanged(newCondition, eventTimeNs);
        updateCurrentSlicedBucketConditionTimers(newCondition, eventTimeNs);
        return;
    }

    // If the previous condition was unknown, mark the bucket as invalid
    // because the bucket will contain partial data. For example, the condition
    // change might happen close to the end of the bucket and we might miss a
    // lot of data.
    // We still want to pull to set the base for diffed metrics.
    if (oldCondition == ConditionState::kUnknown) {
        invalidateCurrentBucket(eventTimeNs, BucketDropReason::CONDITION_UNKNOWN);
    }

    // Pull and match for the following condition change cases:
    // unknown/false -> true  - condition changed
    // true -> false          - condition changed
    // true -> true           - old condition was true so we can flush the bucket at the
    //                          end if needed.
    //
    // We don't need to pull for unknown -> false or false -> false.
    //
    // onConditionChangedLocked might happen on bucket boundaries if this is
    // called before #onDataPulled.
    if (isPulled() &&
        (newCondition == ConditionState::kTrue || oldCondition == ConditionState::kTrue)) {
        pullAndMatchEventsLocked(eventTimeNs);
    }

    onConditionChangedInternalLocked(oldCondition, newCondition, eventTimeNs);

    // Update condition state after pulling.
    mCondition = newCondition;

    flushIfNeededLocked(eventTimeNs);

    mConditionTimer.onConditionChanged(newCondition, eventTimeNs);
    updateCurrentSlicedBucketConditionTimers(newCondition, eventTimeNs);
}

template <typename AggregatedValue, typename DimExtras>
void ValueMetricProducer<AggregatedValue, DimExtras>::updateCurrentSlicedBucketConditionTimers(
        bool newCondition, int64_t eventTimeNs) {
    if (mSlicedStateAtoms.empty()) {
        return;
    }

    // Utilize the current state key of each DimensionsInWhat key to determine
    // which condition timers to update.
    //
    // Assumes that the MetricDimensionKey exists in `mCurrentSlicedBucket`.
    for (const auto& [dimensionInWhatKey, dimensionInWhatInfo] : mDimInfos) {
        // If the new condition is true, turn ON the condition timer only if
        // the DimensionInWhat key was present in the data.
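        // Keys whose dimension was absent from the latest data keep their timer off
        // (hasCurrentState is false) until a new event or pull reports them again.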
        mCurrentSlicedBucket[MetricDimensionKey(dimensionInWhatKey,
                                                dimensionInWhatInfo.currentState)]
                .conditionTimer.onConditionChanged(
                        newCondition && dimensionInWhatInfo.hasCurrentState, eventTimeNs);
    }
}

template <typename AggregatedValue, typename DimExtras>
void ValueMetricProducer<AggregatedValue, DimExtras>::dumpStatesLocked(int out,
                                                                       bool verbose) const {
    if (mCurrentSlicedBucket.size() == 0) {
        return;
    }

    dprintf(out, "ValueMetricProducer %lld dimension size %lu\n", (long long)mMetricId,
            (unsigned long)mCurrentSlicedBucket.size());
    if (verbose) {
        for (const auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
            for (const Interval& interval : currentBucket.intervals) {
                dprintf(out, "\t(what)%s\t(states)%s (aggregate)%s\n",
                        metricDimensionKey.getDimensionKeyInWhat().toString().c_str(),
                        metricDimensionKey.getStateValuesKey().toString().c_str(),
                        aggregatedValueToString(interval.aggregate).c_str());
            }
        }
    }
}

template <typename AggregatedValue, typename DimExtras>
bool ValueMetricProducer<AggregatedValue, DimExtras>::hasReachedGuardRailLimit() const {
    return mCurrentSlicedBucket.size() >= mDimensionHardLimit;
}

template <typename AggregatedValue, typename DimExtras>
bool ValueMetricProducer<AggregatedValue, DimExtras>::hitGuardRailLocked(
        const MetricDimensionKey& newKey) const {
    // ===========GuardRail==============
    // 1. Report the tuple count if the tuple count > soft limit
    if (mCurrentSlicedBucket.find(newKey) != mCurrentSlicedBucket.end()) {
        return false;
    }
    if (mCurrentSlicedBucket.size() > mDimensionSoftLimit - 1) {
        size_t newTupleCount = mCurrentSlicedBucket.size() + 1;
        StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount);
        // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
        if (hasReachedGuardRailLimit()) {
            if (!mHasHitGuardrail) {
                ALOGE("ValueMetricProducer %lld dropping data for dimension key %s",
                      (long long)mMetricId, newKey.toString().c_str());
                mHasHitGuardrail = true;
            }
            StatsdStats::getInstance().noteHardDimensionLimitReached(mMetricId);
            return true;
        }
    }

    return false;
}

template <typename AggregatedValue, typename DimExtras>
void ValueMetricProducer<AggregatedValue, DimExtras>::onMatchedLogEventInternalLocked(
        const size_t matcherIndex, const MetricDimensionKey& eventKey,
        const ConditionKey& conditionKey, bool condition, const LogEvent& event,
        const map<int, HashableDimensionKey>& statePrimaryKeys) {
    // Skip this event if a state change occurred for a different primary key.
    auto it = statePrimaryKeys.find(mStateChangePrimaryKey.first);
    // Check that both the atom id and the primary key are equal.
    if (it != statePrimaryKeys.end() && it->second != mStateChangePrimaryKey.second) {
        VLOG("ValueMetric skip event with primary key %s because state change primary key "
             "is %s",
             it->second.toString().c_str(), mStateChangePrimaryKey.second.toString().c_str());
        return;
    }

    const int64_t eventTimeNs = event.GetElapsedTimestampNs();
    if (isEventLateLocked(eventTimeNs)) {
        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
             (long long)mCurrentBucketStartTimeNs);
        return;
    }

    const auto whatKey = eventKey.getDimensionKeyInWhat();
    mMatchedMetricDimensionKeys.insert(whatKey);

    if (!isPulled()) {
        // Only flushing for pushed because for pulled metrics, we need to do a pull first.
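        // Pulled metrics are flushed after pullAndMatchEventsLocked completes instead
        // (see onConditionChangedLocked and onStatsdInitCompleted).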
        flushIfNeededLocked(eventTimeNs);
    }

    if (canSkipLogEventLocked(eventKey, condition, eventTimeNs, statePrimaryKeys)) {
        return;
    }

    if (hitGuardRailLocked(eventKey)) {
        return;
    }

    const auto& returnVal =
            mDimInfos.emplace(whatKey, DimensionsInWhatInfo(getUnknownStateKey()));
    DimensionsInWhatInfo& dimensionsInWhatInfo = returnVal.first->second;
    const HashableDimensionKey& oldStateKey = dimensionsInWhatInfo.currentState;
    CurrentBucket& currentBucket = mCurrentSlicedBucket[MetricDimensionKey(whatKey, oldStateKey)];

    // Ensure we turn on the condition timer in the case where dimensions
    // were missing on a previous pull due to a state change.
    const auto stateKey = eventKey.getStateValuesKey();
    const bool stateChange = oldStateKey != stateKey || !dimensionsInWhatInfo.hasCurrentState;

    // We need to get the intervals stored with the previous state key so we can
    // close these value intervals.
    vector<Interval>& intervals = currentBucket.intervals;
    if (intervals.size() < mFieldMatchers.size()) {
        VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size());
        intervals.resize(mFieldMatchers.size());
    }

    dimensionsInWhatInfo.hasCurrentState = true;
    dimensionsInWhatInfo.currentState = stateKey;

    dimensionsInWhatInfo.seenNewData |= aggregateFields(eventTimeNs, eventKey, event, intervals,
                                                        dimensionsInWhatInfo.dimExtras);

    // State change.
    if (!mSlicedStateAtoms.empty() && stateChange) {
        // Turn OFF the condition timer for the previous state key.
        currentBucket.conditionTimer.onConditionChanged(false, eventTimeNs);

        // Turn ON the condition timer for the new state key.
        mCurrentSlicedBucket[MetricDimensionKey(whatKey, stateKey)]
                .conditionTimer.onConditionChanged(true, eventTimeNs);
    }
}

// For pulled metrics, we always need to make sure we do a pull before flushing the bucket
// if mCondition and mIsActive are true!
template <typename AggregatedValue, typename DimExtras>
void ValueMetricProducer<AggregatedValue, DimExtras>::flushIfNeededLocked(
        const int64_t eventTimeNs) {
    const int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
    if (eventTimeNs < currentBucketEndTimeNs) {
        VLOG("eventTime is %lld, less than current bucket end time %lld", (long long)eventTimeNs,
             (long long)(currentBucketEndTimeNs));
        return;
    }
    int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);
    int64_t nextBucketStartTimeNs =
            currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
    flushCurrentBucketLocked(eventTimeNs, nextBucketStartTimeNs);
}
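
// Example (illustrative numbers): with a 10 minute bucket, an event arriving 25 minutes after the
// current bucket end yields 1 + 25 / 10 = 3 buckets forward, so flushIfNeededLocked above places
// the next bucket start at currentBucketEndTimeNs + 20 minutes and the late event falls inside the
// newly started bucket.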

template <typename AggregatedValue, typename DimExtras>
int64_t ValueMetricProducer<AggregatedValue, DimExtras>::calcBucketsForwardCount(
        const int64_t eventTimeNs) const {
    int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
    if (eventTimeNs < currentBucketEndTimeNs) {
        return 0;
    }
    return 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
}

template <typename AggregatedValue, typename DimExtras>
void ValueMetricProducer<AggregatedValue, DimExtras>::flushCurrentBucketLocked(
        const int64_t eventTimeNs, const int64_t nextBucketStartTimeNs) {
    if (mCondition == ConditionState::kUnknown) {
        StatsdStats::getInstance().noteBucketUnknownCondition(mMetricId);
        invalidateCurrentBucket(eventTimeNs, BucketDropReason::CONDITION_UNKNOWN);
    }

    VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
         (int)mCurrentSlicedBucket.size());

    closeCurrentBucket(eventTimeNs, nextBucketStartTimeNs);
    initNextSlicedBucket(nextBucketStartTimeNs);

    // Update the condition timer again, in case we skipped buckets.
    mConditionTimer.newBucketStart(eventTimeNs, nextBucketStartTimeNs);

    // NOTE: Update the condition timers in `mCurrentSlicedBucket` only when slicing
    // by state. Otherwise, the "global" condition timer will be used.
    if (!mSlicedStateAtoms.empty()) {
        for (auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
            currentBucket.conditionTimer.newBucketStart(eventTimeNs, nextBucketStartTimeNs);
        }
    }
    mCurrentBucketNum += calcBucketsForwardCount(eventTimeNs);
}

template <typename AggregatedValue, typename DimExtras>
void ValueMetricProducer<AggregatedValue, DimExtras>::closeCurrentBucket(
        const int64_t eventTimeNs, const int64_t nextBucketStartTimeNs) {
    const int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
    int64_t bucketEndTimeNs = fullBucketEndTimeNs;
    int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);

    if (multipleBucketsSkipped(numBucketsForward)) {
        VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
        StatsdStats::getInstance().noteSkippedForwardBuckets(mMetricId);
        // Something went wrong. Maybe the device was sleeping for a long time. It is better
        // to mark the current bucket as invalid. The last pull might have been successful though.
        invalidateCurrentBucket(eventTimeNs, BucketDropReason::MULTIPLE_BUCKETS_SKIPPED);
        // End the bucket at the next bucket start time so the entire interval is skipped.
        bucketEndTimeNs = nextBucketStartTimeNs;
    } else if (eventTimeNs < fullBucketEndTimeNs) {
        bucketEndTimeNs = eventTimeNs;
    }

    // Close the current bucket.
    const auto [globalConditionDurationNs, globalConditionCorrectionNs] =
            mConditionTimer.newBucketStart(eventTimeNs, bucketEndTimeNs);

    bool isBucketLargeEnough = bucketEndTimeNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs;
    if (!isBucketLargeEnough) {
        skipCurrentBucket(eventTimeNs, BucketDropReason::BUCKET_TOO_SMALL);
    }
    if (!mCurrentBucketIsSkipped) {
        bool bucketHasData = false;
        // The current bucket is large enough to keep.
        for (auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
            PastBucket<AggregatedValue> bucket =
                    buildPartialBucket(bucketEndTimeNs, currentBucket.intervals);
            if (bucket.aggIndex.empty()) {
                continue;
            }
            bucketHasData = true;
            if (!mSlicedStateAtoms.empty()) {
                const auto [conditionDurationNs, conditionCorrectionNs] =
                        currentBucket.conditionTimer.newBucketStart(eventTimeNs, bucketEndTimeNs);
                bucket.mConditionTrueNs = conditionDurationNs;
                bucket.mConditionCorrectionNs = conditionCorrectionNs;
            } else {
                bucket.mConditionTrueNs = globalConditionDurationNs;
                bucket.mConditionCorrectionNs = globalConditionCorrectionNs;
            }

            auto& bucketList = mPastBuckets[metricDimensionKey];
            const bool isFirstBucket = bucketList.empty();
            mTotalDataSize += computeValueBucketSizeLocked(eventTimeNs >= fullBucketEndTimeNs,
                                                           metricDimensionKey, isFirstBucket,
                                                           bucket);
            bucketList.push_back(std::move(bucket));
        }
        if (!bucketHasData) {
            skipCurrentBucket(eventTimeNs, BucketDropReason::NO_DATA);
        }
    }

    if (mCurrentBucketIsSkipped) {
        mCurrentSkippedBucket.bucketStartTimeNs = mCurrentBucketStartTimeNs;
        mCurrentSkippedBucket.bucketEndTimeNs = bucketEndTimeNs;
        mSkippedBuckets.push_back(mCurrentSkippedBucket);
        mTotalDataSize += computeSkippedBucketSizeLocked(mCurrentSkippedBucket);
    }

    // This means that the current bucket was not flushed before a forced bucket split.
    // This can happen if an app update or a dump report with includeCurrentPartialBucket is
    // requested before we get a chance to flush the bucket due to receiving new data, either from
    // the statsd socket or the StatsPullerManager.
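    // Example (illustrative): if the natural bucket end has already passed when the forced split
    // arrives, bucketEndTimeNs stays at the natural end while nextBucketStartTimeNs is the split
    // time, and the [bucketEndTimeNs, nextBucketStartTimeNs) gap is recorded below as skipped.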
    if (bucketEndTimeNs < nextBucketStartTimeNs) {
        SkippedBucket bucketInGap;
        bucketInGap.bucketStartTimeNs = bucketEndTimeNs;
        bucketInGap.bucketEndTimeNs = nextBucketStartTimeNs;
        bucketInGap.dropEvents.emplace_back(buildDropEvent(eventTimeNs, BucketDropReason::NO_DATA));
        mSkippedBuckets.emplace_back(bucketInGap);
    }
}

template <typename AggregatedValue, typename DimExtras>
void ValueMetricProducer<AggregatedValue, DimExtras>::initNextSlicedBucket(
        int64_t nextBucketStartTimeNs) {
    StatsdStats::getInstance().noteBucketCount(mMetricId);
    if (mSlicedStateAtoms.empty()) {
        mCurrentSlicedBucket.clear();
    } else {
        for (auto it = mCurrentSlicedBucket.begin(); it != mCurrentSlicedBucket.end();) {
            bool obsolete = true;
            for (auto& interval : it->second.intervals) {
                interval.sampleSize = 0;
            }

            // When slicing by state, only delete the MetricDimensionKey when the
            // state key in the MetricDimensionKey is not the current state key.
            const HashableDimensionKey& dimensionInWhatKey = it->first.getDimensionKeyInWhat();
            const auto& currentDimInfoItr = mDimInfos.find(dimensionInWhatKey);

            if ((currentDimInfoItr != mDimInfos.end()) &&
                (it->first.getStateValuesKey() == currentDimInfoItr->second.currentState)) {
                obsolete = false;
            }
            if (obsolete) {
                it = mCurrentSlicedBucket.erase(it);
            } else {
                it++;
            }
        }
    }
    for (auto it = mDimInfos.begin(); it != mDimInfos.end();) {
        if (!it->second.seenNewData) {
            it = mDimInfos.erase(it);
        } else {
            it->second.seenNewData = false;
            it++;
        }
    }

    mCurrentBucketIsSkipped = false;
    mCurrentSkippedBucket.reset();

    mCurrentBucketStartTimeNs = nextBucketStartTimeNs;
    // Reset mHasHitGuardrail boolean since the bucket was reset.
    mHasHitGuardrail = false;
    VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
         (long long)mCurrentBucketStartTimeNs);
}

// Explicit template instantiations
template class ValueMetricProducer<NumericValue, vector<optional<NumericValue>>>;
template class ValueMetricProducer<unique_ptr<KllQuantile>, Empty>;

}  // namespace statsd
}  // namespace os
}  // namespace android