Monorepo for Tangled
tangled.org
1package spindle
2
3import (
4 "context"
5 "crypto/rand"
6 "encoding/hex"
7 "errors"
8 "fmt"
9 "log/slog"
10 "net"
11 "net/http"
12 "strings"
13 "time"
14
15 "github.com/bluesky-social/indigo/service/tap"
16 "tangled.org/core/api/tangled"
17 "tangled.org/core/spindle/config"
18)
19
// randomAdminPassword returns a freshly generated admin password for tap:
// 32 bytes read from crypto/rand, rendered as a 64-character lowercase hex
// string. A failure to read random bytes is returned wrapped, together with
// an empty password.
func randomAdminPassword() (string, error) {
	secret := make([]byte, 32)
	_, readErr := rand.Read(secret)
	if readErr != nil {
		return "", fmt.Errorf("generate tap admin password: %w", readErr)
	}
	encoded := hex.EncodeToString(secret)
	return encoded, nil
}
27
// assertLoopbackBind checks that bind is a host:port address whose host is a
// loopback target. It returns nil when the host is "localhost" (compared
// case-insensitively) or an IP literal for which IsLoopback reports true.
//
// An error is returned when bind cannot be split into host and port, when the
// host part is empty (for example ":8080"), or when the host is anything
// else, including non-loopback IPs and other hostnames.
func assertLoopbackBind(bind string) error {
	host, _, splitErr := net.SplitHostPort(bind)
	switch {
	case splitErr != nil:
		return fmt.Errorf("parse tap bind %q: %w", bind, splitErr)
	case host == "":
		// An empty host is rejected outright rather than treated as loopback.
		return fmt.Errorf("embedded mode requires loopback host in tap bind %q", bind)
	case strings.EqualFold(host, "localhost"):
		return nil
	}

	if addr := net.ParseIP(host); addr != nil && addr.IsLoopback() {
		return nil
	}
	return fmt.Errorf("embedded tap bind %q must be loopback like 127.0.0.1 or ::1", bind)
}
45
46type embeddedTap struct {
47 tap *tap.Tap
48 logger *slog.Logger
49}
50
51func startEmbeddedTap(ctx context.Context, cfg *config.Config, logger *slog.Logger) (*embeddedTap, error) {
52 if err := assertLoopbackBind(cfg.Server.Tap.Bind); err != nil {
53 return nil, err
54 }
55
56 tcfg := tap.Config{
57 DatabaseURL: "sqlite://" + cfg.Server.Tap.DBPath,
58 DBMaxConns: 32,
59 PLCURL: cfg.Server.PlcUrl,
60 RelayUrl: cfg.Server.Tap.RelayUrl,
61 FirehoseParallelism: 4,
62 ResyncParallelism: 2,
63 OutboxParallelism: 1,
64 FirehoseCursorSaveInterval: time.Second,
65 RepoFetchTimeout: 5 * time.Minute,
66 IdentityCacheSize: 50_000,
67 EventCacheSize: 10_000,
68 SignalCollection: tangled.RepoNSID,
69 CollectionFilters: []string{tangled.RepoNSID, tangled.RepoCollaboratorNSID},
70 AdminPassword: cfg.Server.Tap.AdminPassword,
71 RetryTimeout: 60 * time.Second,
72 }
73
74 t, err := tap.New(tcfg)
75 if err != nil {
76 return nil, fmt.Errorf("tap.New: %w", err)
77 }
78
79 go func() {
80 if err := t.Firehose.Run(ctx); err != nil && !errors.Is(err, context.Canceled) {
81 logger.Error("firehose terminated", "err", err)
82 }
83 }()
84 t.Run(ctx)
85 go func() {
86 logger.Info("tap http server listening", "bind", cfg.Server.Tap.Bind)
87 if err := t.Server.Start(cfg.Server.Tap.Bind); err != nil && !errors.Is(err, http.ErrServerClosed) {
88 logger.Error("tap http server terminated", "err", err)
89 }
90 }()
91
92 if err := waitForListener(ctx, cfg.Server.Tap.Bind, time.Now().Add(10*time.Second)); err != nil {
93 logger.Warn("tap http server unreachable before timeout", "bind", cfg.Server.Tap.Bind, "err", err)
94 }
95
96 return &embeddedTap{tap: t, logger: logger}, nil
97}
98
// waitForListener polls addr until a TCP connection can be established.
//
// It returns nil as soon as one dial succeeds (the probe connection is closed
// immediately), ctx.Err() if ctx is cancelled, or a "timed out waiting for"
// error once the current time is past deadline. The context and the deadline
// are checked, in that order, before every dial attempt.
//
// Each dial attempt is limited to 100ms and failed attempts are retried after
// a 50ms pause. Both the dial and the pause observe ctx, so cancellation is
// noticed promptly instead of only between attempts.
func waitForListener(ctx context.Context, addr string, deadline time.Time) error {
	const (
		dialTimeout   = 100 * time.Millisecond
		retryInterval = 50 * time.Millisecond
	)
	dialer := net.Dialer{Timeout: dialTimeout}

	// A loop replaces the original self-recursion so the retry count never
	// translates into stack depth.
	for {
		if err := ctx.Err(); err != nil {
			return err
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("timed out waiting for %s", addr)
		}

		conn, err := dialer.DialContext(ctx, "tcp", addr)
		if err == nil {
			// Only reachability matters; a close error on the probe
			// connection carries no useful information.
			_ = conn.Close()
			return nil
		}

		// Pause before retrying, but give up immediately on cancellation.
		pause := time.NewTimer(retryInterval)
		select {
		case <-ctx.Done():
			pause.Stop()
			return ctx.Err()
		case <-pause.C:
		}
	}
}
114
115func (e *embeddedTap) Shutdown() {
116 if e == nil || e.tap == nil {
117 return
118 }
119 shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
120 defer cancel()
121 if err := e.tap.Server.Shutdown(shutdownCtx); err != nil {
122 e.logger.Error("tap server shutdown failed", "err", err)
123 }
124 if err := e.tap.CloseDb(shutdownCtx); err != nil {
125 e.logger.Error("tap db close failed", "err", err)
126 }
127}