Experiment to rebuild Diffuse using web applets.
1import * as Uint8 from "uint8arrays";
2import * as Comlink from "comlink";
3import { xxh32 } from "xxh32";
4import { getTransferables } from "@okikio/transferables";
5
6import type { Track } from "@applets/core/types";
7import type { DiffuseApplet } from "./applet/common";
8
/**
 * The `SharedWorker` constructor exposed by the current global scope.
 *
 * A polyfilled variant can be swapped in instead:
 * export { SharedWorkerPolyfill as SharedWorker } from "@okikio/sharedworker";
 */
export const { SharedWorker } = globalThis;
11
12////////////////////////////////////////////
13// 🌳
14////////////////////////////////////////////
15
/** Function produced by `_listen`; it receives a consumer's `MessagePort`. */
type ListenTask = ReturnType<typeof _listen>;

/** Function produced by `_manage`; it receives a connection id and a worker port. */
type ManageTask = ReturnType<typeof _manage>;

/** The built-in tasks that `provide` always exposes alongside user tasks. */
export type WorkerTasks = {
  _listen: ListenTask;
  _manage: ManageTask;
};
20
21////////////////////////////////////////////
22// 🛠️
23////////////////////////////////////////////
24
/**
 * Return a shuffled copy of `array`; the input itself is never modified.
 *
 * Walks the copy from its last position down to index 1 and swaps each
 * position with a randomly chosen position at or below it. Randomness comes
 * from `crypto.getRandomValues`.
 *
 * @param array Items to shuffle.
 * @returns A new array holding the same items in random order.
 */
export function arrayShuffle<T>(array: Array<T>): Array<T> {
  const shuffled = Array.from(array);

  // Float in [0, 1) derived from one fresh unsigned 32-bit value.
  const randomUnit = (): number => {
    const buffer = crypto.getRandomValues(new Uint32Array(1));
    return buffer[0] / 2 ** 32;
  };

  let cursor = shuffled.length;

  while (--cursor > 0) {
    const pick = Math.floor(randomUnit() * (cursor + 1));
    const held = shuffled[cursor];
    shuffled[cursor] = shuffled[pick];
    shuffled[pick] = held;
  }

  return shuffled;
}
41
/**
 * Return copies of `tracks` in which optional tag fields that are present but
 * set to `undefined` have been removed.
 *
 * The fields considered are `tags.album`, `tags.artist`, `tags.genre`,
 * `tags.year`, `tags.disc.of` and `tags.track.of`.
 *
 * Every object that gets edited (`tags`, `tags.disc`, `tags.track`) is copied
 * first, so the tracks passed in are never mutated. Tracks without `tags`,
 * or tags without `disc` / `track`, are passed through without error.
 *
 * @param tracks Tracks to clean.
 * @returns New track objects, in the same order as the input.
 */
export function cleanUndefinedValuesForTracks(tracks: Track[]): Track[] {
  return tracks.map((track) => {
    if (!track.tags) return { ...track };

    // A shallow copy of the track alone would still share `tags` with the
    // caller, so copy each nested level before deleting from it.
    const tags = { ...track.tags };

    if (tags.album === undefined) delete tags.album;
    if (tags.artist === undefined) delete tags.artist;
    if (tags.genre === undefined) delete tags.genre;
    if (tags.year === undefined) delete tags.year;

    if (tags.disc) {
      const disc = { ...tags.disc };
      if (disc.of === undefined) delete disc.of;
      tags.disc = disc;
    }

    if (tags.track) {
      const trackNumber = { ...tags.track };
      if (trackNumber.of === undefined) delete trackNumber.of;
      tags.track = trackNumber;
    }

    return { ...track, tags };
  });
}
59
/**
 * Reduce a value to an xxh32 hash of its JSON serialization, giving a compact
 * stand-in that can be compared instead of the value itself.
 *
 * @param value Anything `JSON.stringify` accepts.
 * @returns The result of `xxh32` applied to the JSON text.
 */
export function comparable(value: unknown) {
  const serialized = JSON.stringify(value);
  return xxh32(serialized);
}
63
/**
 * Wrap a Comlink endpoint in a remote proxy.
 *
 * After wrapping, the endpoint's `start` method is called when it has one
 * (message ports do; dedicated workers do not).
 *
 * @param ini Endpoint to wrap.
 * @returns The Comlink remote for `ini`.
 */
export function endpoint<T extends Record<string, any> = WorkerTasks>(ini: Comlink.Endpoint) {
  const remote = Comlink.wrap<T>(ini);

  if (typeof ini.start === "function") {
    ini.start();
  }

  return remote;
}
69
/**
 * Expose `tasks` through Comlink from the current worker scope.
 *
 * - Outside a shared worker, the tasks are exposed on `self` directly.
 * - Inside a shared worker, an `onconnect` handler is installed: the first
 *   port of each connection is appended to `opts.ports.applets` (when given),
 *   gets the tasks exposed on it, and is started. `self.connected` is then
 *   set to `true`.
 *
 * @param tasks Object whose members become callable remotely.
 * @param opts Optional holder in which connecting ports are recorded.
 * @returns The same `tasks` object that was passed in.
 */
export function expose<A extends Record<string, any>>(
  tasks: A,
  opts?: {
    ports?: {
      applets: MessagePort[];
      consumers: MessagePort[];
    };
  },
): A {
  if (!(globalThis.SharedWorkerGlobalScope && self instanceof SharedWorkerGlobalScope)) {
    Comlink.expose(tasks, self);
    return tasks;
  }

  self.onconnect = (event: MessageEvent) => {
    const [port] = event.ports;

    opts?.ports?.applets?.push(port);
    Comlink.expose(tasks, port);
    port.start();
  };

  (self as any).connected = true;

  return tasks;
}
94
/**
 * Group tracks by the scheme of their URI (the text before the first `:`).
 *
 * Groups already present in `initial` are extended; neither `initial` nor any
 * of its arrays is mutated. When `tracks` is empty, `initial` itself is
 * returned.
 *
 * Runs in a single pass: each group array is copied at most once and then
 * appended to, rather than rebuilding the whole result for every track.
 * Grouping goes through a `Map`, so schemes that collide with inherited
 * object members (e.g. `constructor`) are treated like any other key.
 *
 * @param tracks Tracks to group.
 * @param initial Existing groups to start from.
 * @returns A record from scheme to the tracks using that scheme.
 */
export function groupTracksPerScheme(
  tracks: Track[],
  initial: Record<string, Track[]> = {},
): Record<string, Track[]> {
  if (tracks.length === 0) return initial;

  const groups = new Map<string, Track[]>(Object.entries(initial));

  // Arrays created by this call; only these may be pushed to.
  const owned = new Map<string, Track[]>();

  for (const track of tracks) {
    const [scheme = ""] = track.uri.split(":", 1);

    let bucket = owned.get(scheme);

    if (bucket === undefined) {
      bucket = [...(groups.get(scheme) ?? [])];
      owned.set(scheme, bucket);
      groups.set(scheme, bucket);
    }

    bucket.push(track);
  }

  return Object.fromEntries(groups);
}
104
/**
 * Report whether the current window differs from the topmost window,
 * i.e. whether this code is running inside a frame.
 *
 * @returns `true` when `window.self` is not `window.top`.
 */
export function inIframe() {
  const { self: current, top: topmost } = window;
  return current !== topmost;
}
108
/**
 * Create one unresolved `Promise.withResolvers()` entry per connection id.
 *
 * @param ids Connection identifiers to prepare.
 * @returns A record keyed by id, each value holding a promise together with
 *   its `resolve` / `reject` functions.
 */
export function initialConnections<C extends Record<string, any>>(ids: string[]) {
  const connections: Record<string, PromiseWithResolvers<Comlink.Remote<C>>> = {};

  for (const id of ids) {
    connections[id] = Promise.withResolvers<Comlink.Remote<C>>();
  }

  return connections;
}
118
/**
 * Tell whether a value is a primitive (string, number, bigint, boolean,
 * symbol, `null` or `undefined`) rather than an object or function.
 *
 * @param test Value to inspect.
 * @returns `true` for primitives, `false` otherwise.
 */
export function isPrimitive(test: unknown) {
  // `Object(x)` hands back `x` itself only when `x` is already an object or
  // function; every primitive comes back as a fresh wrapper or empty object.
  const coerced: unknown = Object(test);
  return coerced !== test;
}
122
/**
 * Decode bytes to text with a default `TextDecoder` and parse the text as JSON.
 *
 * @param a Byte buffer accepted by `TextDecoder.decode`.
 * @returns The parsed value, typed as `T` without runtime validation.
 */
export function jsonDecode<T>(a: any): T {
  const text = new TextDecoder().decode(a);
  return JSON.parse(text);
}
126
/**
 * Serialize a value to JSON and encode the text as UTF-8 bytes.
 *
 * @param a Value to serialize.
 * @returns The encoded JSON text.
 */
export function jsonEncode<T>(a: T): Uint8Array {
  const text = JSON.stringify(a);
  return new TextEncoder().encode(text);
}
130
/**
 * Post the same message to every port in `ports`.
 *
 * @param data Message payload.
 * @param ports Ports to post to, in order.
 * @param transfer Optional transfer list handed to each `postMessage` call;
 *   an empty list is used when omitted.
 */
export function postMessages<D, T>({
  data,
  ports,
  transfer,
}: {
  data: D;
  ports: MessagePort[];
  transfer?: Transferable[];
}) {
  for (const port of ports) {
    port.postMessage(data, transfer ?? []);
  }
}
144
/**
 * Expose a worker's tasks together with the built-in `_listen` and `_manage`
 * tasks, and hand back the bookkeeping objects they share.
 *
 * User `tasks` are spread after the built-ins, so a task named `_listen` or
 * `_manage` replaces the built-in one.
 *
 * The connections record given to `_manage` is the same object that is
 * returned, also when no `connections` argument was supplied. Connections
 * registered later through `_manage` are therefore visible to the caller.
 *
 * @param actions Actions handed to `_listen`; defaults to an empty object.
 * @param connections Pre-created connection resolvers; defaults to an empty record.
 * @param tasks Additional tasks to expose.
 * @returns The connections record, the ports holder (`applets` / `consumers`)
 *   and the full set of exposed tasks.
 */
export function provide<
  C extends Record<string, any>,
  A extends Record<string, any>,
  T extends Record<string, any>,
>({
  actions,
  connections,
  tasks,
}: {
  actions?: A;
  connections?: Record<string, PromiseWithResolvers<Comlink.Remote<C>>>;
  tasks?: T;
}) {
  const portsHolder = {
    applets: [] as MessagePort[],
    consumers: [] as MessagePort[],
  };

  // Resolve the default once: `_manage` and the return value must refer to
  // the very same record, otherwise managed connections would be invisible.
  const sharedConnections: Record<string, PromiseWithResolvers<Comlink.Remote<C>>> =
    connections ?? {};

  const allTasks = expose<WorkerTasks & T>(
    {
      _listen: _listen<A>(actions || ({} as A), portsHolder),
      _manage: _manage<C>(sharedConnections),
      ...(tasks || ({} as T)),
    },
    {
      ports: portsHolder,
    },
  );

  return {
    connections: sharedConnections,
    ports: portsHolder,
    tasks: allTasks,
  };
}
180
/**
 * Keep `context.data` up to date with `"data"` messages arriving on `port`.
 *
 * Installs an `onmessage` handler on the port (replacing any handler set
 * before). A message is applied when its `type` is `"data"` and, if
 * `options.groupId` is set, its `groupId` matches; the message's `data`
 * field is then assigned to `context.data`.
 *
 * @param context Applet whose `data` is overwritten.
 * @param port Port or worker to listen on.
 * @param options Optional `groupId` filter.
 */
export function sync<DataType = unknown>(
  context: DiffuseApplet<DataType>,
  port: MessagePort | Worker,
  options: { groupId?: string } = {},
) {
  port.onmessage = (event) => {
    const message = event.data;

    if (message?.type !== "data") return;
    if (options.groupId && message?.groupId !== options.groupId) return;

    context.data = message.data;
  };
}
195
/**
 * Derive the artwork cache key for a track: the SHA-256 digest of the
 * track's URI, rendered as a base64url string.
 *
 * @param track Track whose `uri` is hashed.
 * @returns The encoded digest.
 */
export async function trackArtworkCacheId(track: Track): Promise<string> {
  const uriBytes = new TextEncoder().encode(track.uri);
  const digest = await crypto.subtle.digest("SHA-256", uriBytes);

  return Uint8.toString(new Uint8Array(digest), "base64url");
}
201
/**
 * Mark a value for Comlink transfer, using whatever transferables
 * `getTransferables` finds inside it as the transfer list.
 *
 * @param a Value about to be sent through Comlink.
 * @returns The result of `Comlink.transfer` for `a`.
 */
export function transfer<T = unknown>(a: T) {
  return Comlink.transfer(a, getTransferables(a));
}
206
207// PRIVATE
208
/**
 * Build the `_listen` task, which attaches a consumer port to `actions`.
 *
 * The returned function, given a port:
 * 1. exposes `actions` on the port through Comlink,
 * 2. records the port in `portsHolder.consumers`,
 * 3. sets the port's `onmessage` handler so that `{ type: "action" }`
 *    messages invoke `actions[actionId]` with the message's `arguments` and
 *    answer with `{ type: "actioncomplete", id, result }`, transferring any
 *    transferables found in the result.
 *
 * An unknown `actionId` still produces an answer, with `result` undefined.
 *
 * @param actions Callable actions, looked up by `actionId`.
 * @param portsHolder Holder whose `consumers` list receives each port.
 */
function _listen<A extends Record<string, any>>(
  actions: A,
  portsHolder: {
    applets: MessagePort[];
    consumers: MessagePort[];
  },
) {
  type ActionRequest = {
    type: "action";
    id: string;
    actionId: string;
    arguments: any;
  };

  // Send the outcome of an action back over the port it was requested on.
  const reply = <T>(port: MessagePort, id: string, result: T): void => {
    const completion = { type: "actioncomplete", id, result };
    port.postMessage(completion, { transfer: getTransferables(result) });
  };

  // Run the requested action (if it exists) and report its result.
  const runAction = async (port: MessagePort, request: ActionRequest): Promise<void> => {
    const result = await actions[request.actionId]?.(request.arguments);
    reply(port, request.id, result);
  };

  return (port: MessagePort) => {
    Comlink.expose(actions, port);
    portsHolder.consumers.push(port);

    port.onmessage = async (message) => {
      if (message.data?.type === "action") {
        await runAction(port, message.data);
      }
    };
  };
}
254
/**
 * Build the `_manage` task, which binds a worker port to a connection id.
 *
 * The returned function wraps `workerPort` with `endpoint`, makes sure a
 * resolver entry exists under `connectionId` (creating one when absent), and
 * resolves that entry's promise with the wrapped remote.
 *
 * @param connections Record of connection resolvers, keyed by connection id.
 */
function _manage<C extends Record<string, any>>(
  connections: Record<string, PromiseWithResolvers<Comlink.Remote<C>>>,
) {
  return (connectionId: string, workerPort: MessagePort) => {
    const remote = endpoint<C>(workerPort);

    const pending = (connections[connectionId] ??= Promise.withResolvers<Comlink.Remote<C>>());

    pending.resolve(remote);
  };
}
269}