···159159 // holds up all consumer progress until it can send to the channel
160160 // use this when the current batch is too full to add more to it
161161 async fn send_current_batch_now(&mut self, small: bool) -> anyhow::Result<()> {
162162+ let beginning = match self.current_batch.initial_cursor.map(|c| c.elapsed()) {
163163+ None => "unknown".to_string(),
164164+ Some(Ok(t)) => format!("{:?}", t),
165165+ Some(Err(e)) => format!("+{:?}", e.duration()),
166166+ };
162167 log::info!(
163163- "attempting to send batch now (small? {small}, capacity: {})",
164164- self.batch_sender.capacity()
168168+ "sending batch now from {beginning}, {}, queue capacity: {}",
169169+ if small { "small" } else { "full" },
170170+ self.batch_sender.capacity(),
165171 );
166172 let current = mem::take(&mut self.current_batch);
167173 self.batch_sender
+47
ufos/src/lib.rs
···227227 nsid_child_segments: HashMap<String, TopCollections>,
228228}
229229230230+// this is not safe from ~DOS
231231+// todo: remove this and just iterate the all-time rollups to get nsids? (or recent rollups?)
232232+impl From<TopCollections> for Vec<String> {
233233+ fn from(tc: TopCollections) -> Self {
234234+ let mut me = vec![];
235235+ for (segment, children) in tc.nsid_child_segments {
236236+ let child_segments: Self = children.into();
237237+ if child_segments.is_empty() {
238238+ me.push(segment);
239239+ } else {
240240+ for ch in child_segments {
241241+ let nsid = format!("{segment}.{ch}");
242242+ me.push(nsid);
243243+ }
244244+ }
245245+ }
246246+ me
247247+ }
248248+}
249249+230250#[cfg(test)]
231251mod tests {
232252 use super::*;
#[test]
fn test_top_collections_to_nsids() {
    // Builds a node that only populates its child-segment map.
    fn node(children: Vec<(&str, TopCollections)>) -> TopCollections {
        TopCollections {
            nsid_child_segments: children
                .into_iter()
                .map(|(segment, child)| (segment.to_string(), child))
                .collect::<HashMap<String, TopCollections>>(),
            ..Default::default()
        }
    }

    // An empty tree flattens to nothing.
    let empty_tc = TopCollections::default();
    assert_eq!(Vec::<String>::from(empty_tc), Vec::<String>::new());

    // Two levels: `a` has leaf children `b` and `c`; `z` is itself a leaf.
    let tc = node(vec![
        (
            "a",
            node(vec![
                ("b", TopCollections::default()),
                ("c", TopCollections::default()),
            ]),
        ),
        ("z", TopCollections::default()),
    ]);
    let mut nsids: Vec<String> = tc.into();
    // HashMap iteration order is arbitrary, so sort before comparing.
    nsids.sort();
    assert_eq!(nsids, ["a.b", "a.c", "z"]);

    // Deeper nesting: every level contributes one dotted segment, and only
    // leaves are emitted.
    let deep = node(vec![(
        "a",
        node(vec![
            (
                "b",
                node(vec![("c", node(vec![("d", TopCollections::default())]))]),
            ),
            ("x", TopCollections::default()),
        ]),
    )]);
    let mut deep_nsids: Vec<String> = deep.into();
    deep_nsids.sort();
    assert_eq!(deep_nsids, ["a.b.c.d", "a.x"]);
}
233280234281 #[test]
235282 fn test_truncating_insert_truncates() -> anyhow::Result<()> {
+46-26
ufos/src/server.rs
···6161 consumer,
6262 })
6363}
6464+fn to_multiple_nsids(s: &str) -> Result<Vec<Nsid>, String> {
6565+ let mut out = Vec::new();
6666+ for collection in s.split(',') {
6767+ let Ok(nsid) = Nsid::new(collection.to_string()) else {
6868+ return Err(format!("collection {collection:?} was not a valid NSID"));
6969+ };
7070+ out.push(nsid);
7171+ }
7272+ Ok(out)
7373+}
64746575#[derive(Debug, Deserialize, JsonSchema)]
6666-struct CollectionsQuery {
6767- collection: String, // JsonSchema not implemented for Nsid :(
6868-}
6969-impl CollectionsQuery {
7070- fn to_multiple_nsids(&self) -> Result<Vec<Nsid>, String> {
7171- let mut out = Vec::with_capacity(self.collection.len());
7272- for collection in self.collection.split(',') {
7373- let Ok(nsid) = Nsid::new(collection.to_string()) else {
7474- return Err(format!("collection {collection:?} was not a valid NSID"));
7575- };
7676- out.push(nsid);
7777- }
7878- Ok(out)
7979- }
7676+struct RecordsCollectionsQuery {
7777+ collection: Option<String>, // JsonSchema not implemented for Nsid :(
8078}
8179#[derive(Debug, Serialize, JsonSchema)]
8280struct ApiRecord {
···105103 method = GET,
106104 path = "/records",
107105}]
108108-async fn get_records_by_collection(
106106+async fn get_records_by_collections(
109107 ctx: RequestContext<Context>,
110110- collection_query: Query<CollectionsQuery>,
108108+ collection_query: Query<RecordsCollectionsQuery>,
111109) -> OkCorsResponse<Vec<ApiRecord>> {
112110 let Context { storage, .. } = ctx.context();
111111+ let mut limit = 42;
112112+ let query = collection_query.into_inner();
113113+ let collections = if let Some(provided_collection) = query.collection {
114114+ to_multiple_nsids(&provided_collection)
115115+ .map_err(|reason| HttpError::for_bad_request(None, reason))?
116116+ } else {
117117+ let all_collections_should_be_nsids: Vec<String> = storage
118118+ .get_top_collections()
119119+ .await
120120+ .map_err(|e| {
121121+ HttpError::for_internal_error(format!("failed to get top collections: {e:?}"))
122122+ })?
123123+ .into();
124124+ let mut all_collections = Vec::with_capacity(all_collections_should_be_nsids.len());
125125+ for raw_nsid in all_collections_should_be_nsids {
126126+ let nsid = Nsid::new(raw_nsid).map_err(|e| {
127127+ HttpError::for_internal_error(format!("failed to parse nsid: {e:?}"))
128128+ })?;
129129+ all_collections.push(nsid);
130130+ }
113131114114- let collections = collection_query
115115- .into_inner()
116116- .to_multiple_nsids()
117117- .map_err(|reason| HttpError::for_bad_request(None, reason))?;
132132+ limit = 12;
133133+ all_collections
134134+ };
118135119136 let records = storage
120120- .get_records_by_collections(&collections, 100)
137137+ .get_records_by_collections(&collections, limit, true)
121138 .await
122139 .map_err(|e| HttpError::for_internal_error(e.to_string()))?
123140 .into_iter()
···127144 ok_cors(records)
128145}
129146147147+#[derive(Debug, Deserialize, JsonSchema)]
148148+struct TotalSeenCollectionsQuery {
149149+ collection: String, // JsonSchema not implemented for Nsid :(
150150+}
130151#[derive(Debug, Serialize, JsonSchema)]
131152struct TotalCounts {
132153 total_records: u64,
···139160}]
140161async fn get_records_total_seen(
141162 ctx: RequestContext<Context>,
142142- collection_query: Query<CollectionsQuery>,
163163+ collection_query: Query<TotalSeenCollectionsQuery>,
143164) -> OkCorsResponse<HashMap<String, TotalCounts>> {
144165 let Context { storage, .. } = ctx.context();
145166146146- let collections = collection_query
147147- .into_inner()
148148- .to_multiple_nsids()
167167+ let query = collection_query.into_inner();
168168+ let collections = to_multiple_nsids(&query.collection)
149169 .map_err(|reason| HttpError::for_bad_request(None, reason))?;
150170151171 let mut seen_by_collection = HashMap::with_capacity(collections.len());
···194214195215 api.register(get_openapi).unwrap();
196216 api.register(get_meta_info).unwrap();
197197- api.register(get_records_by_collection).unwrap();
217217+ api.register(get_records_by_collections).unwrap();
198218 api.register(get_records_total_seen).unwrap();
199219 api.register(get_top_collections).unwrap();
200220