···
189
189
190
190
log::trace!("rw: iterating newer rw items...");
191
191
192
192
+
for (i, pair) in partition.range(range.clone()).enumerate() {
193
193
+
log::trace!("rw: iterating {i}");
194
194
+
any_tasks_found = true;
192
195
193
193
-
//// ITER
196
196
+
if i >= MAX_BATCHED_RW_EVENTS {
197
197
+
break;
198
198
+
}
194
199
195
195
-
{
196
196
-
let iterator = partition.range(range.clone()).enumerate();
197
197
-
198
198
-
for (i, pair) in iterator {
199
199
-
log::trace!("rw: iterating {i}");
200
200
-
any_tasks_found = true;
201
201
-
202
202
-
if i >= MAX_BATCHED_RW_EVENTS {
203
203
-
break;
200
200
+
let (key_bytes, val_bytes) = pair?;
201
201
+
let mod_key = match db_complete::<ModQueueItemKey>(&key_bytes) {
202
202
+
Ok(k) => k,
203
203
+
Err(EncodingError::WrongStaticPrefix(_, _)) => {
204
204
+
panic!("wsp: mod queue empty.");
204
205
}
205
205
-
206
206
-
let (key_bytes, val_bytes) = pair?;
207
207
-
let mod_key = match db_complete::<ModQueueItemKey>(&key_bytes) {
208
208
-
Ok(k) => k,
209
209
-
Err(EncodingError::WrongStaticPrefix(_, _)) => {
210
210
-
panic!("wsp: mod queue empty.");
211
211
-
}
212
212
-
otherwise => otherwise?,
213
213
-
};
206
206
+
otherwise => otherwise?,
207
207
+
};
214
208
215
215
-
let mod_value: ModQueueItemValue =
216
216
-
db_complete::<ModQueueItemStringValue>(&val_bytes)?.try_into()?;
209
209
+
let mod_value: ModQueueItemValue =
210
210
+
db_complete::<ModQueueItemStringValue>(&val_bytes)?.try_into()?;
217
211
218
218
-
log::trace!("rw: iterating {i}: sending to batcher {mod_key:?} => {mod_value:?}");
219
219
-
batched_rw_items += DBWriter {
220
220
-
keyspace: keyspace.clone(),
221
221
-
partition: partition.clone(),
222
222
-
}
223
223
-
.write_rw(&mut db_batch, mod_key, mod_value)?;
224
224
-
log::trace!("rw: iterating {i}: back from batcher.");
212
212
+
log::trace!("rw: iterating {i}: sending to batcher {mod_key:?} => {mod_value:?}");
213
213
+
batched_rw_items += DBWriter {
214
214
+
keyspace: keyspace.clone(),
215
215
+
partition: partition.clone(),
216
216
+
}
217
217
+
.write_rw(&mut db_batch, mod_key, mod_value)?;
218
218
+
log::trace!("rw: iterating {i}: back from batcher.");
225
219
226
226
-
if batched_rw_items >= MAX_BATCHED_RW_ITEMS {
227
227
-
log::trace!("rw: iterating {i}: batch big enough, breaking out.");
228
228
-
break;
229
229
-
}
220
220
+
if batched_rw_items >= MAX_BATCHED_RW_ITEMS {
221
221
+
log::trace!("rw: iterating {i}: batch big enough, breaking out.");
222
222
+
break;
230
223
}
231
231
-
// drop(iterator); // moved -- must be dropped hopefully
232
224
}
233
225
234
226
if !any_tasks_found {
···
604
596
for pair in self.partition.prefix(&key_prefix_bytes) {
605
597
let (key_bytes, _) = pair?;
606
598
607
607
-
let (_, collection, _rkey, found_cursor) =
608
608
-
db_complete::<ByIdKey>(&key_bytes)?.into();
599
599
+
let (_, collection, _rkey, found_cursor) = db_complete::<ByIdKey>(&key_bytes)?.into();
609
600
if found_cursor > cursor {
610
601
log::trace!(
611
602
"delete account: found (and ignoring) newer records than the delete event??"