···
4
4
use std::collections::HashMap;
5
5
use std::path::PathBuf;
6
6
7
7
+
use tokio_util::sync::CancellationToken;
8
8
+
7
9
use constellation::storage::rocks_store::{
8
10
Collection, DidId, RKey, RPath, Target, TargetKey, TargetLinkers, _bincode_opts,
9
11
};
···
108
110
eprintln!("rocks ready.");
109
111
110
112
let RocksStorage { ref db, .. } = rocks;
113
113
+
114
114
+
let stay_alive = CancellationToken::new();
115
115
+
ctrlc::set_handler({
116
116
+
let mut desperation: u8 = 0;
117
117
+
let stay_alive = stay_alive.clone();
118
118
+
move || match desperation {
119
119
+
0 => {
120
120
+
eprintln!("ok, shutting down...");
121
121
+
stay_alive.cancel();
122
122
+
desperation += 1;
123
123
+
}
124
124
+
1.. => panic!("fine, panicking!"),
125
125
+
}
126
126
+
})
127
127
+
.unwrap();
111
128
112
129
let mut stats = Stats::new();
113
130
let mut err_stats: ErrStats = Default::default();
···
121
138
122
139
let mut i = 0;
123
140
for item in db.iterator_cf(&target_id_cf, IteratorMode::Start) {
141
141
+
if stay_alive.is_cancelled() {
142
142
+
break;
143
143
+
}
144
144
+
124
145
if let Some(ref limiter) = limit {
125
146
if let Err(dur) = limiter.try_wait() {
126
147
std::thread::sleep(dur)
···
205
226
}
206
227
}
207
228
208
208
-
if i >= 40_000 {
209
209
-
break;
210
210
-
}
229
229
+
// if i >= 40_000 {
230
230
+
// break;
231
231
+
// }
211
232
}
212
233
213
234
eprintln!(
214
214
-
"FINISHED summarizing {} link targets in {:.1}s",
235
235
+
"{} summarizing {} link targets in {:.1}s",
236
236
+
if stay_alive.is_cancelled() {
237
237
+
"STOPPED"
238
238
+
} else {
239
239
+
"FINISHED"
240
240
+
},
215
241
thousands(i),
216
242
t0.elapsed().as_secs_f32()
217
243
);