alpha
Login
or
Join now
microcosm.blue
/
microcosm-rs
Star
0
Fork
3
Atom
Configure Feed
Issues
Pull Requests
Commits
Tags
Feed URL
Select the types of activity you want to include in your feed.
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
Star
0
Fork
3
Atom
Configure Feed
Issues
Pull Requests
Commits
Tags
Feed URL
Select the types of activity you want to include in your feed.
Overview
Issues
Pulls
Pipelines
consolidate filter into just an arg
author
phil
date
1 year ago
(Apr 25, 2025, 4:07 PM -0400)
commit
56662c08
56662c081df2e6b0c921a739f58f359d6b80000b
parent
0510778d
0510778dad5b51959294c87b3e8ef7ed4ba6d3ef
+94
-166
4 changed files
Expand all
Collapse all
Unified
Split
constellation
src
server
mod.rs
storage
mem_store.rs
mod.rs
rocks_store.rs
+8
-1
constellation/src/server/mod.rs
Reviewed
···
271
271
}
272
272
273
273
let paged = store
274
274
-
.get_links(&query.target, &query.collection, &query.path, limit, until)
274
274
+
.get_links(
275
275
+
&query.target,
276
276
+
&query.collection,
277
277
+
&query.path,
278
278
+
limit,
279
279
+
until,
280
280
+
None,
281
281
+
)
275
282
.map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
276
283
277
284
let cursor = paged.next.map(|next| {
+15
-59
constellation/src/storage/mem_store.rs
Reviewed
···
166
166
path: &str,
167
167
limit: u64,
168
168
until: Option<u64>,
169
169
+
filter_dids: Option<&HashSet<Did>>,
169
170
) -> Result<PagedAppendingCollection<RecordId>> {
170
171
let data = self.0.lock().unwrap();
171
172
let Some(paths) = data.targets.get(&Target::new(target)) else {
···
181
182
items: Vec::new(),
182
183
next: None,
183
184
});
185
185
+
};
186
186
+
187
187
+
let did_rkeys: Vec<_> = if let Some(dids) = filter_dids {
188
188
+
did_rkeys
189
189
+
.into_iter()
190
190
+
.cloned()
191
191
+
.filter(|m| {
192
192
+
m.clone()
193
193
+
.map(|(did, _)| dids.contains(&did))
194
194
+
.unwrap_or(false)
195
195
+
})
196
196
+
.collect()
197
197
+
} else {
198
198
+
did_rkeys.to_vec()
184
199
};
185
200
186
201
let total = did_rkeys.len();
···
269
284
.flatten()
270
285
.filter(|did| *data.dids.get(did).expect("did must be in dids"))
271
286
.cloned()
272
272
-
.collect();
273
273
-
274
274
-
Ok(PagedAppendingCollection {
275
275
-
version: (total as u64, gone as u64),
276
276
-
items,
277
277
-
next,
278
278
-
})
279
279
-
}
280
280
-
281
281
-
fn get_links_from_dids(
282
282
-
&self,
283
283
-
target: &str,
284
284
-
collection: &str,
285
285
-
path: &str,
286
286
-
limit: u64,
287
287
-
until: Option<u64>,
288
288
-
dids: &HashSet<Did>,
289
289
-
) -> Result<PagedAppendingCollection<RecordId>> {
290
290
-
let data = self.0.lock().unwrap();
291
291
-
let Some(paths) = data.targets.get(&Target::new(target)) else {
292
292
-
return Ok(PagedAppendingCollection {
293
293
-
version: (0, 0),
294
294
-
items: Vec::new(),
295
295
-
next: None,
296
296
-
});
297
297
-
};
298
298
-
let Some(did_rkeys) = paths.get(&Source::new(collection, path)) else {
299
299
-
return Ok(PagedAppendingCollection {
300
300
-
version: (0, 0),
301
301
-
items: Vec::new(),
302
302
-
next: None,
303
303
-
});
304
304
-
};
305
305
-
306
306
-
let did_rkeys: Vec<_> = did_rkeys
307
307
-
.into_iter()
308
308
-
.flatten()
309
309
-
.filter(|(did, _)| dids.contains(did))
310
310
-
.collect();
311
311
-
312
312
-
let total = did_rkeys.len();
313
313
-
let end = until
314
314
-
.map(|u| std::cmp::min(u as usize, total))
315
315
-
.unwrap_or(total);
316
316
-
let begin = end.saturating_sub(limit as usize);
317
317
-
let next = if begin == 0 { None } else { Some(begin as u64) };
318
318
-
319
319
-
let alive = did_rkeys.iter().count();
320
320
-
let gone = total - alive;
321
321
-
322
322
-
let items: Vec<_> = did_rkeys[begin..end]
323
323
-
.iter()
324
324
-
.rev()
325
325
-
.filter(|(did, _)| *data.dids.get(did).expect("did must be in dids"))
326
326
-
.map(|(did, rkey)| RecordId {
327
327
-
did: did.clone(),
328
328
-
rkey: rkey.0.clone(),
329
329
-
collection: collection.to_string(),
330
330
-
})
331
287
.collect();
332
288
333
289
Ok(PagedAppendingCollection {
+70
-41
constellation/src/storage/mod.rs
Reviewed
···
58
58
path: &str,
59
59
limit: u64,
60
60
until: Option<u64>,
61
61
+
filter_dids: Option<&HashSet<Did>>,
61
62
) -> Result<PagedAppendingCollection<RecordId>>;
62
63
63
64
fn get_distinct_dids(
···
68
69
limit: u64,
69
70
until: Option<u64>,
70
71
) -> Result<PagedAppendingCollection<Did>>; // TODO: reflect dedups in cursor
71
71
-
72
72
-
fn get_links_from_dids(
73
73
-
&self,
74
74
-
target: &str,
75
75
-
collection: &str,
76
76
-
path: &str,
77
77
-
limit: u64,
78
78
-
until: Option<u64>,
79
79
-
dids: &HashSet<Did>,
80
80
-
) -> Result<PagedAppendingCollection<RecordId>>;
81
72
82
73
fn get_all_record_counts(&self, _target: &str)
83
74
-> Result<HashMap<String, HashMap<String, u64>>>;
···
154
145
);
155
146
assert_eq!(storage.get_distinct_did_count("", "", "")?, 0);
156
147
assert_eq!(
157
157
-
storage.get_links("a.com", "app.t.c", ".abc.uri", 100, None)?,
148
148
+
storage.get_links("a.com", "app.t.c", ".abc.uri", 100, None, None)?,
158
149
PagedAppendingCollection {
159
150
version: (0, 0),
160
151
items: vec![],
···
648
639
0,
649
640
)?;
650
641
assert_eq!(
651
651
-
storage.get_links("a.com", "app.t.c", ".abc.uri", 100, None)?,
642
642
+
storage.get_links("a.com", "app.t.c", ".abc.uri", 100, None, None)?,
652
643
PagedAppendingCollection {
653
644
version: (1, 0),
654
645
items: vec![RecordId {
···
687
678
0,
688
679
)?;
689
680
}
690
690
-
let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None)?;
681
681
+
let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None, None)?;
691
682
let dids = storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 2, None)?;
692
683
assert_eq!(
693
684
links,
···
716
707
next: Some(3),
717
708
}
718
709
);
719
719
-
let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, links.next)?;
710
710
+
let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, links.next, None)?;
720
711
let dids = storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 2, dids.next)?;
721
712
assert_eq!(
722
713
links,
···
745
736
next: Some(1),
746
737
}
747
738
);
748
748
-
let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, links.next)?;
739
739
+
let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, links.next, None)?;
749
740
let dids = storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 2, dids.next)?;
750
741
assert_eq!(
751
742
links,
···
771
762
});
772
763
773
764
test_each_storage!(get_filtered_links, |storage| {
774
774
-
let links = storage.get_links_from_dids("a.com", "app.t.c", ".abc.uri", 2, None, &HashSet::from([Did("did:plc:linker".to_string())]))?;
765
765
+
let links = storage.get_links(
766
766
+
"a.com",
767
767
+
"app.t.c",
768
768
+
".abc.uri",
769
769
+
2,
770
770
+
None,
771
771
+
Some(&HashSet::from([Did("did:plc:linker".to_string())])),
772
772
+
)?;
775
773
assert_eq!(
776
774
links,
777
775
PagedAppendingCollection {
···
796
794
0,
797
795
)?;
798
796
799
799
-
let links = storage.get_links_from_dids("a.com", "app.t.c", ".abc.uri", 2, None, &HashSet::from([Did("did:plc:linker".to_string())]))?;
797
797
+
let links = storage.get_links(
798
798
+
"a.com",
799
799
+
"app.t.c",
800
800
+
".abc.uri",
801
801
+
2,
802
802
+
None,
803
803
+
Some(&HashSet::from([Did("did:plc:linker".to_string())])),
804
804
+
)?;
800
805
assert_eq!(
801
806
links,
802
807
PagedAppendingCollection {
803
808
version: (1, 0),
804
804
-
items: vec![
805
805
-
RecordId {
806
806
-
did: "did:plc:linker".into(),
807
807
-
collection: "app.t.c".into(),
808
808
-
rkey: "asdf".into(),
809
809
-
},
810
810
-
],
809
809
+
items: vec![RecordId {
810
810
+
did: "did:plc:linker".into(),
811
811
+
collection: "app.t.c".into(),
812
812
+
rkey: "asdf".into(),
813
813
+
},],
811
814
next: None,
812
815
}
813
816
);
814
817
815
815
-
let links = storage.get_links_from_dids("a.com", "app.t.c", ".abc.uri", 2, None, &HashSet::from([Did("did:plc:someone-else".to_string())]))?;
818
818
+
let links = storage.get_links(
819
819
+
"a.com",
820
820
+
"app.t.c",
821
821
+
".abc.uri",
822
822
+
2,
823
823
+
None,
824
824
+
Some(&HashSet::from([Did("did:plc:someone-else".to_string())])),
825
825
+
)?;
816
826
assert_eq!(
817
827
links,
818
828
PagedAppendingCollection {
···
851
861
0,
852
862
)?;
853
863
854
854
-
let links = storage.get_links_from_dids("a.com", "app.t.c", ".abc.uri", 2, None, &HashSet::from([Did("did:plc:linker".to_string())]))?;
864
864
+
let links = storage.get_links(
865
865
+
"a.com",
866
866
+
"app.t.c",
867
867
+
".abc.uri",
868
868
+
2,
869
869
+
None,
870
870
+
Some(&HashSet::from([Did("did:plc:linker".to_string())])),
871
871
+
)?;
855
872
assert_eq!(
856
873
links,
857
874
PagedAppendingCollection {
···
872
889
}
873
890
);
874
891
875
875
-
let links = storage.get_links_from_dids("a.com", "app.t.c", ".abc.uri", 2, None, &HashSet::from([
876
876
-
Did("did:plc:linker".to_string()),
877
877
-
Did("did:plc:someone-else".to_string()),
878
878
-
]))?;
892
892
+
let links = storage.get_links(
893
893
+
"a.com",
894
894
+
"app.t.c",
895
895
+
".abc.uri",
896
896
+
2,
897
897
+
None,
898
898
+
Some(&HashSet::from([
899
899
+
Did("did:plc:linker".to_string()),
900
900
+
Did("did:plc:someone-else".to_string()),
901
901
+
])),
902
902
+
)?;
879
903
assert_eq!(
880
904
links,
881
905
PagedAppendingCollection {
···
896
920
}
897
921
);
898
922
899
899
-
let links = storage.get_links_from_dids("a.com", "app.t.c", ".abc.uri", 2, None, &HashSet::from([
900
900
-
Did("did:plc:someone-unknown".to_string()),
901
901
-
]))?;
923
923
+
let links = storage.get_links(
924
924
+
"a.com",
925
925
+
"app.t.c",
926
926
+
".abc.uri",
927
927
+
2,
928
928
+
None,
929
929
+
Some(&HashSet::from([Did("did:plc:someone-unknown".to_string())])),
930
930
+
)?;
902
931
assert_eq!(
903
932
links,
904
933
PagedAppendingCollection {
···
926
955
0,
927
956
)?;
928
957
}
929
929
-
let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None)?;
958
958
+
let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None, None)?;
930
959
assert_eq!(
931
960
links,
932
961
PagedAppendingCollection {
···
946
975
next: Some(2),
947
976
}
948
977
);
949
949
-
let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, links.next)?;
978
978
+
let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, links.next, None)?;
950
979
assert_eq!(
951
980
links,
952
981
PagedAppendingCollection {
···
986
1015
0,
987
1016
)?;
988
1017
}
989
989
-
let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None)?;
1018
1018
+
let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None, None)?;
990
1019
assert_eq!(
991
1020
links,
992
1021
PagedAppendingCollection {
···
1020
1049
},
1021
1050
0,
1022
1051
)?;
1023
1023
-
let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, links.next)?;
1052
1052
+
let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, links.next, None)?;
1024
1053
assert_eq!(
1025
1054
links,
1026
1055
PagedAppendingCollection {
···
1060
1089
0,
1061
1090
)?;
1062
1091
}
1063
1063
-
let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None)?;
1092
1092
+
let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None, None)?;
1064
1093
assert_eq!(
1065
1094
links,
1066
1095
PagedAppendingCollection {
···
1088
1117
}),
1089
1118
0,
1090
1119
)?;
1091
1091
-
let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, links.next)?;
1120
1120
+
let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, links.next, None)?;
1092
1121
assert_eq!(
1093
1122
links,
1094
1123
PagedAppendingCollection {
···
1121
1150
0,
1122
1151
)?;
1123
1152
}
1124
1124
-
let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None)?;
1153
1153
+
let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None, None)?;
1125
1154
assert_eq!(
1126
1155
links,
1127
1156
PagedAppendingCollection {
···
1145
1174
&ActionableEvent::DeactivateAccount("did:plc:asdf-1".into()),
1146
1175
0,
1147
1176
)?;
1148
1148
-
let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, links.next)?;
1177
1177
+
let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, links.next, None)?;
1149
1178
assert_eq!(
1150
1179
links,
1151
1180
PagedAppendingCollection {
+1
-65
constellation/src/storage/rocks_store.rs
Reviewed
···
860
860
path: &str,
861
861
limit: u64,
862
862
until: Option<u64>,
863
863
+
filter_dids: Option<&HashSet<Did>>,
863
864
) -> Result<PagedAppendingCollection<RecordId>> {
864
865
let target_key = TargetKey(
865
866
Target(target.to_string()),
···
965
966
continue;
966
967
}
967
968
items.push(did);
968
968
-
} else {
969
969
-
eprintln!("failed to look up did from did_id {did_id:?}");
970
970
-
}
971
971
-
}
972
972
-
973
973
-
Ok(PagedAppendingCollection {
974
974
-
version: (total, gone),
975
975
-
items,
976
976
-
next,
977
977
-
})
978
978
-
}
979
979
-
980
980
-
fn get_links_from_dids(
981
981
-
&self,
982
982
-
target: &str,
983
983
-
collection: &str,
984
984
-
path: &str,
985
985
-
limit: u64,
986
986
-
until: Option<u64>,
987
987
-
dids: &HashSet<Did>,
988
988
-
) -> Result<PagedAppendingCollection<RecordId>> {
989
989
-
let target_key = TargetKey(
990
990
-
Target(target.to_string()),
991
991
-
Collection(collection.to_string()),
992
992
-
RPath(path.to_string()),
993
993
-
);
994
994
-
995
995
-
let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else {
996
996
-
return Ok(PagedAppendingCollection {
997
997
-
version: (0, 0),
998
998
-
items: Vec::new(),
999
999
-
next: None,
1000
1000
-
});
1001
1001
-
};
1002
1002
-
1003
1003
-
let linkers = self.get_target_linkers(&target_id)?;
1004
1004
-
1005
1005
-
let (alive, gone) = linkers.count();
1006
1006
-
let total = alive + gone;
1007
1007
-
let end = until.map(|u| std::cmp::min(u, total)).unwrap_or(total) as usize;
1008
1008
-
let begin = end.saturating_sub(limit as usize);
1009
1009
-
let next = if begin == 0 { None } else { Some(begin as u64) };
1010
1010
-
1011
1011
-
let did_id_rkeys = linkers.0[begin..end].iter().rev().collect::<Vec<_>>();
1012
1012
-
1013
1013
-
let mut items = Vec::with_capacity(did_id_rkeys.len());
1014
1014
-
// TODO: use get-many (or multi-get or whatever it's called)
1015
1015
-
for (did_id, rkey) in did_id_rkeys {
1016
1016
-
if did_id.is_empty() {
1017
1017
-
continue;
1018
1018
-
}
1019
1019
-
if let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? {
1020
1020
-
let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)?
1021
1021
-
else {
1022
1022
-
eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?");
1023
1023
-
continue;
1024
1024
-
};
1025
1025
-
if !active {
1026
1026
-
continue;
1027
1027
-
}
1028
1028
-
items.push(RecordId {
1029
1029
-
did,
1030
1030
-
collection: collection.to_string(),
1031
1031
-
rkey: rkey.0.clone(),
1032
1032
-
});
1033
969
} else {
1034
970
eprintln!("failed to look up did from did_id {did_id:?}");
1035
971
}