Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

1package spindle 2 3import ( 4 "context" 5 "crypto/rand" 6 "encoding/hex" 7 "errors" 8 "fmt" 9 "log/slog" 10 "net" 11 "net/http" 12 "strings" 13 "sync/atomic" 14 "time" 15 16 "github.com/bluesky-social/indigo/service/tap" 17 "tangled.org/core/api/tangled" 18 "tangled.org/core/spindle/config" 19) 20 21func randomAdminPassword() (string, error) { 22 var b [32]byte 23 if _, err := rand.Read(b[:]); err != nil { 24 return "", fmt.Errorf("generate tap admin password: %w", err) 25 } 26 return hex.EncodeToString(b[:]), nil 27} 28 29func assertLoopbackBind(bind string) error { 30 host, _, err := net.SplitHostPort(bind) 31 if err != nil { 32 return fmt.Errorf("parse tap bind %q: %w", bind, err) 33 } 34 if host == "" { 35 return fmt.Errorf("embedded mode requires loopback host in tap bind %q", bind) 36 } 37 if strings.EqualFold(host, "localhost") { 38 return nil 39 } 40 ip := net.ParseIP(host) 41 if ip == nil || !ip.IsLoopback() { 42 return fmt.Errorf("embedded tap bind %q must be loopback like 127.0.0.1 or ::1", bind) 43 } 44 return nil 45} 46 47type embeddedTap struct { 48 tap *tap.Tap 49 logger *slog.Logger 50 closed atomic.Bool 51} 52 53func startEmbeddedTap(ctx context.Context, cfg *config.Config, logger *slog.Logger) (*embeddedTap, error) { 54 if err := assertLoopbackBind(cfg.Server.Tap.Bind); err != nil { 55 return nil, err 56 } 57 58 tcfg := tap.Config{ 59 DatabaseURL: "sqlite://" + cfg.Server.Tap.DBPath, 60 DBMaxConns: 32, 61 PLCURL: cfg.Server.PlcUrl, 62 RelayUrl: cfg.Server.Tap.RelayUrl, 63 FirehoseParallelism: 4, 64 ResyncParallelism: 2, 65 OutboxParallelism: 1, 66 FirehoseCursorSaveInterval: time.Second, 67 RepoFetchTimeout: 5 * time.Minute, 68 IdentityCacheSize: 50_000, 69 EventCacheSize: 10_000, 70 SignalCollection: tangled.RepoNSID, 71 CollectionFilters: []string{tangled.RepoNSID, tangled.RepoCollaboratorNSID}, 72 AdminPassword: cfg.Server.Tap.AdminPassword, 73 RetryTimeout: 60 * time.Second, 74 } 75 76 t, err := tap.New(tcfg) 77 if err != nil { 78 return nil, fmt.Errorf("tap.New: %w", err) 79 } 80 81 go func() { 82 if err := t.Firehose.Run(ctx); err != nil && !errors.Is(err, context.Canceled) { 83 logger.Error("firehose terminated", "err", err) 84 } 85 }() 86 t.Run(ctx) 87 go func() { 88 logger.Info("tap http server listening", "bind", cfg.Server.Tap.Bind) 89 if err := t.Server.Start(cfg.Server.Tap.Bind); err != nil && !errors.Is(err, http.ErrServerClosed) { 90 logger.Error("tap http server terminated", "err", err) 91 } 92 }() 93 94 if err := waitForListener(ctx, cfg.Server.Tap.Bind, time.Now().Add(10*time.Second)); err != nil { 95 logger.Warn("tap http server unreachable before timeout", "bind", cfg.Server.Tap.Bind, "err", err) 96 } 97 98 return &embeddedTap{tap: t, logger: logger}, nil 99} 100 101func waitForListener(ctx context.Context, addr string, deadline time.Time) error { 102 if ctx.Err() != nil { 103 return ctx.Err() 104 } 105 if time.Now().After(deadline) { 106 return fmt.Errorf("timed out waiting for %s", addr) 107 } 108 c, err := net.DialTimeout("tcp", addr, 100*time.Millisecond) 109 if err == nil { 110 c.Close() 111 return nil 112 } 113 time.Sleep(50 * time.Millisecond) 114 return waitForListener(ctx, addr, deadline) 115} 116 117func (e *embeddedTap) Shutdown() { 118 if e == nil || e.tap == nil { 119 return 120 } 121 if e.closed.Swap(true) { 122 return 123 } 124 shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 125 defer cancel() 126 if err := e.tap.Server.Shutdown(shutdownCtx); err != nil { 127 e.logger.Error("tap server shutdown failed", "err", err) 128 } 129 if err := e.tap.CloseDb(shutdownCtx); err != nil { 130 e.logger.Error("tap db close failed", "err", err) 131 } 132}