Skip to content

Commit f3bab98

Browse files
authoredFeb 5, 2024
[0.6] Aggregation: repeat same request on retries. (#2556)
This is implemented by copying the necessary report information into the report aggregations at time of aggregation job creation. This means that client-report GC will not change the request that is made. Over & above 0.7 version of this functionality, the 0.6 version retains the ability to work with aggregation jobs which include client report data in the `client_reports` table.
1 parent adc16f4 commit f3bab98

13 files changed

+1914
-1280
lines changed
 

‎aggregator/src/aggregator.rs

+19-15
Original file line numberDiff line numberDiff line change
@@ -1653,10 +1653,12 @@ impl VdafOps {
16531653
C: Clock,
16541654
A::AggregationParam: Send + Sync + PartialEq,
16551655
A::AggregateShare: Send + Sync,
1656+
A::InputShare: Send + Sync,
16561657
A::PrepareMessage: Send + Sync + PartialEq,
16571658
A::PrepareShare: Send + Sync + PartialEq,
16581659
for<'a> A::PrepareState:
16591660
Send + Sync + Encode + ParameterizedDecode<(&'a A, usize)> + PartialEq,
1661+
A::PublicShare: Send + Sync,
16601662
A::OutputShare: Send + Sync + PartialEq,
16611663
{
16621664
// unwrap safety: SHA-256 computed by ring should always be 32 bytes
@@ -1892,12 +1894,12 @@ impl VdafOps {
18921894
});
18931895

18941896
let (report_aggregation_state, prepare_step_result) = match init_rslt {
1895-
Ok((PingPongState::Continued(prep_state), outgoing_message)) => {
1897+
Ok((PingPongState::Continued(prepare_state), outgoing_message)) => {
18961898
// Helper is not finished. Await the next message from the Leader to advance to
18971899
// the next step.
18981900
saw_continue = true;
18991901
(
1900-
ReportAggregationState::WaitingHelper(prep_state),
1902+
ReportAggregationState::WaitingHelper { prepare_state },
19011903
PrepareStepResult::Continue {
19021904
message: outgoing_message,
19031905
},
@@ -1920,7 +1922,7 @@ impl VdafOps {
19201922
)
19211923
}
19221924
Err(prepare_error) => (
1923-
ReportAggregationState::Failed(prepare_error),
1925+
ReportAggregationState::Failed { prepare_error },
19241926
PrepareStepResult::Reject(prepare_error),
19251927
),
19261928
};
@@ -2026,9 +2028,9 @@ impl VdafOps {
20262028
rsd.report_aggregation = rsd
20272029
.report_aggregation
20282030
.clone()
2029-
.with_state(ReportAggregationState::Failed(
2030-
PrepareError::ReportReplayed,
2031-
))
2031+
.with_state(ReportAggregationState::Failed {
2032+
prepare_error: PrepareError::ReportReplayed,
2033+
})
20322034
.with_last_prep_resp(Some(PrepareResp::new(
20332035
*rsd.report_share.metadata().id(),
20342036
PrepareStepResult::Reject(PrepareError::ReportReplayed),
@@ -2037,9 +2039,9 @@ impl VdafOps {
20372039
rsd.report_aggregation = rsd
20382040
.report_aggregation
20392041
.clone()
2040-
.with_state(ReportAggregationState::Failed(
2041-
PrepareError::BatchCollected,
2042-
))
2042+
.with_state(ReportAggregationState::Failed {
2043+
prepare_error: PrepareError::BatchCollected,
2044+
})
20432045
.with_last_prep_resp(Some(PrepareResp::new(
20442046
*rsd.report_share.metadata().id(),
20452047
PrepareStepResult::Reject(PrepareError::BatchCollected),
@@ -2096,9 +2098,9 @@ impl VdafOps {
20962098
rsd.report_aggregation = rsd
20972099
.report_aggregation
20982100
.clone()
2099-
.with_state(ReportAggregationState::Failed(
2100-
PrepareError::BatchCollected,
2101-
))
2101+
.with_state(ReportAggregationState::Failed {
2102+
prepare_error: PrepareError::BatchCollected,
2103+
})
21022104
.with_last_prep_resp(Some(PrepareResp::new(
21032105
*rsd.report_share.metadata().id(),
21042106
PrepareStepResult::Reject(PrepareError::BatchCollected),
@@ -2118,9 +2120,9 @@ impl VdafOps {
21182120
rsd.report_aggregation = rsd
21192121
.report_aggregation
21202122
.clone()
2121-
.with_state(ReportAggregationState::Failed(
2122-
PrepareError::ReportReplayed,
2123-
))
2123+
.with_state(ReportAggregationState::Failed {
2124+
prepare_error: PrepareError::ReportReplayed,
2125+
})
21242126
.with_last_prep_resp(Some(PrepareResp::new(
21252127
*rsd.report_share.metadata().id(),
21262128
PrepareStepResult::Reject(
@@ -2204,9 +2206,11 @@ impl VdafOps {
22042206
A: 'static + Send + Sync,
22052207
A::AggregationParam: Send + Sync,
22062208
A::AggregateShare: Send + Sync,
2209+
A::InputShare: Send + Sync,
22072210
for<'a> A::PrepareState: Send + Sync + Encode + ParameterizedDecode<(&'a A, usize)>,
22082211
A::PrepareShare: Send + Sync,
22092212
A::PrepareMessage: Send + Sync,
2213+
A::PublicShare: Send + Sync,
22102214
A::OutputShare: Send + Sync,
22112215
{
22122216
if leader_aggregation_job.step() == AggregationJobStep::from(0) {

‎aggregator/src/aggregator/aggregation_job_continue.rs

+28-14
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,15 @@ impl VdafOps {
7171
if report_agg.report_id() != prep_step.report_id() {
7272
// This report was omitted by the leader because of a prior failure. Note that
7373
// the report was dropped (if it's not already in an error state) and continue.
74-
if matches!(report_agg.state(), ReportAggregationState::WaitingHelper(_)) {
74+
if matches!(
75+
report_agg.state(),
76+
ReportAggregationState::WaitingHelper { .. }
77+
) {
7578
*report_agg = report_agg
7679
.clone()
77-
.with_state(ReportAggregationState::Failed(PrepareError::ReportDropped))
80+
.with_state(ReportAggregationState::Failed {
81+
prepare_error: PrepareError::ReportDropped,
82+
})
7883
.with_last_prep_resp(None);
7984
}
8085
continue;
@@ -96,7 +101,9 @@ impl VdafOps {
96101
if !conflicting_aggregate_share_jobs.is_empty() {
97102
*report_aggregation = report_aggregation
98103
.clone()
99-
.with_state(ReportAggregationState::Failed(PrepareError::BatchCollected))
104+
.with_state(ReportAggregationState::Failed {
105+
prepare_error: PrepareError::BatchCollected,
106+
})
100107
.with_last_prep_resp(Some(PrepareResp::new(
101108
*prep_step.report_id(),
102109
PrepareStepResult::Reject(PrepareError::BatchCollected),
@@ -105,8 +112,8 @@ impl VdafOps {
105112
}
106113

107114
let prep_state = match report_aggregation.state() {
108-
ReportAggregationState::WaitingHelper(prep_state) => prep_state,
109-
ReportAggregationState::WaitingLeader(_) => {
115+
ReportAggregationState::WaitingHelper { prepare_state } => prepare_state,
116+
ReportAggregationState::WaitingLeader { .. } => {
110117
return Err(datastore::Error::User(
111118
Error::Internal(
112119
"helper encountered unexpected ReportAggregationState::WaitingLeader"
@@ -143,8 +150,8 @@ impl VdafOps {
143150
let (report_aggregation_state, output_share) = match new_state {
144151
// Helper did not finish. Store the new state and await the
145152
// next message from the Leader to advance preparation.
146-
PingPongState::Continued(prep_state) => (
147-
ReportAggregationState::WaitingHelper(prep_state),
153+
PingPongState::Continued(prepare_state) => (
154+
ReportAggregationState::WaitingHelper { prepare_state },
148155
None,
149156
),
150157
// Helper finished. Commit the output share.
@@ -179,7 +186,7 @@ impl VdafOps {
179186
})
180187
.unwrap_or_else(|prepare_error| {
181188
(
182-
ReportAggregationState::Failed(prepare_error),
189+
ReportAggregationState::Failed { prepare_error },
183190
PrepareStepResult::Reject(prepare_error),
184191
None,
185192
)
@@ -205,10 +212,15 @@ impl VdafOps {
205212
for report_agg in report_aggregations_iter {
206213
// This report was omitted by the leader because of a prior failure. Note that the
207214
// report was dropped (if it's not already in an error state) and continue.
208-
if matches!(report_agg.state(), ReportAggregationState::WaitingHelper(_)) {
215+
if matches!(
216+
report_agg.state(),
217+
ReportAggregationState::WaitingHelper { .. }
218+
) {
209219
*report_agg = report_agg
210220
.clone()
211-
.with_state(ReportAggregationState::Failed(PrepareError::ReportDropped))
221+
.with_state(ReportAggregationState::Failed {
222+
prepare_error: PrepareError::ReportDropped,
223+
})
212224
.with_last_prep_resp(None);
213225
}
214226
}
@@ -220,7 +232,9 @@ impl VdafOps {
220232
if unwritable_reports.contains(report_aggregation.report_id()) {
221233
*report_aggregation = report_aggregation
222234
.clone()
223-
.with_state(ReportAggregationState::Failed(PrepareError::BatchCollected))
235+
.with_state(ReportAggregationState::Failed {
236+
prepare_error: PrepareError::BatchCollected,
237+
})
224238
.with_last_prep_resp(Some(PrepareResp::new(
225239
*report_aggregation.report_id(),
226240
PrepareStepResult::Reject(PrepareError::BatchCollected),
@@ -502,11 +516,11 @@ mod tests {
502516
*prepare_init.report_share().metadata().time(),
503517
0,
504518
None,
505-
ReportAggregationState::WaitingHelper(
506-
transcript.helper_prepare_transitions[0]
519+
ReportAggregationState::WaitingHelper {
520+
prepare_state: transcript.helper_prepare_transitions[0]
507521
.prepare_state()
508522
.clone(),
509-
),
523+
},
510524
),
511525
)
512526
.await

0 commit comments

Comments
 (0)