-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.go
More file actions
122 lines (103 loc) · 3.43 KB
/
main.go
File metadata and controls
122 lines (103 loc) · 3.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package main
import (
"context"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/RailForLess/tracky/api/config"
"github.com/RailForLess/tracky/api/db"
"github.com/RailForLess/tracky/api/drainer"
"github.com/RailForLess/tracky/api/middleware"
"github.com/RailForLess/tracky/api/realtime"
"github.com/RailForLess/tracky/api/routes"
"github.com/RailForLess/tracky/api/ws"
)
// drainInterval is the Interval value main configures on the R2 drainer and
// reports in the "drainer: enabled" log line.
const drainInterval = time.Minute
// main wires up and runs the API server. It loads environment configuration,
// starts the websocket hub, optionally opens the database (DATABASE_URL) and
// the R2 drainer (R2_* variables), registers the HTTP routes behind a per-IP
// rate limiter, and serves on PORT (default 8080) until SIGINT or SIGTERM.
// On a signal it gives in-flight requests up to five seconds to finish and
// only returns once that graceful shutdown has completed.
func main() {
	config.LoadEnv("cmd/server/.env")

	// ctx is cancelled on SIGINT/SIGTERM; the hub, the drainer and the
	// shutdown goroutine below all watch it.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
	}

	hub := ws.NewHub()
	go hub.Run(ctx)

	ingestSecret := os.Getenv("INGEST_SECRET")
	if ingestSecret == "" {
		log.Printf("WARNING: INGEST_SECRET unset — /ingest is open. Set it for any non-local deploy.")
	}

	// database stays nil when DATABASE_URL is unset; the nil value is still
	// handed to the processor and to routes.Setup below.
	var database *db.DB
	if dsn := os.Getenv("DATABASE_URL"); dsn != "" {
		var err error
		database, err = db.Open(ctx, dsn)
		if err != nil {
			log.Fatalf("db: %v", err)
		}
		defer database.Close()
		log.Printf("db: connected; /v1/* read endpoints enabled")
	} else {
		log.Printf("db: DATABASE_URL unset; /v1/* read endpoints disabled")
	}

	processor := realtime.NewProcessor(hub, database)

	if client, bucket := buildR2Client(ctx); client != nil {
		d := &drainer.Drainer{Client: client, Bucket: bucket, Processor: processor, Interval: drainInterval}
		go d.Run(ctx)
		log.Printf("drainer: enabled (bucket=%s, interval=%s)", bucket, drainInterval)
	} else {
		// buildR2Client needs all four variables, so name every one of them.
		log.Printf("drainer: disabled (set R2_BUCKET, R2_ENDPOINT, R2_ACCESS_KEY_ID and R2_SECRET_ACCESS_KEY to enable)")
	}

	mux := http.NewServeMux()
	mux.HandleFunc("GET /health", func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
		// A failed write means the client went away; there is nothing to do.
		_, _ = w.Write([]byte("ok"))
	})
	routes.Setup(mux, hub, processor, database, ingestSecret)
	mux.HandleFunc("GET /ws/realtime", ws.Handler(hub))

	// 30 req/s per IP, burst 60. Tweak via env if/when needed.
	handler := middleware.RateLimit(30, 60)(mux)

	srv := &http.Server{
		Addr:    ":" + port,
		Handler: handler,
		// Bound how long a client may take to send request headers so that
		// slow-header connections cannot be held open indefinitely. Only the
		// header read is limited; request bodies and responses are unaffected.
		ReadHeaderTimeout: 10 * time.Second,
	}

	// shutdownDone is closed once the graceful shutdown attempt has finished.
	shutdownDone := make(chan struct{})
	go func() {
		defer close(shutdownDone)
		<-ctx.Done()
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if err := srv.Shutdown(shutdownCtx); err != nil {
			log.Printf("server: graceful shutdown: %v", err)
		}
	}()

	log.Printf("starting server on :%s (collector ingest at POST /ingest)", port)
	if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
		log.Fatal(err)
	}

	// ListenAndServe returns ErrServerClosed as soon as Shutdown is called,
	// not when it finishes. Wait here so the process does not exit while
	// in-flight requests are still being drained.
	<-shutdownDone
}
// buildR2Client constructs an S3 client aimed at the R2 endpoint named in the
// environment and returns it together with the bucket name. All four
// variables below must be non-empty; if any is missing it returns (nil, "").
//
//	R2_BUCKET            - bucket name (e.g. tracky-backlog)
//	R2_ENDPOINT          - https://{account_id}.r2.cloudflarestorage.com
//	R2_ACCESS_KEY_ID     - R2 access key from the dashboard
//	R2_SECRET_ACCESS_KEY - secret paired with R2_ACCESS_KEY_ID
//
// A failure to load the AWS SDK configuration terminates the process via
// log.Fatalf.
func buildR2Client(ctx context.Context) (*s3.Client, string) {
	var (
		name   = os.Getenv("R2_BUCKET")
		url    = os.Getenv("R2_ENDPOINT")
		keyID  = os.Getenv("R2_ACCESS_KEY_ID")
		secret = os.Getenv("R2_SECRET_ACCESS_KEY")
	)

	// Every setting is mandatory: a single empty one means "not configured".
	for _, required := range []string{name, url, keyID, secret} {
		if required == "" {
			return nil, ""
		}
	}

	creds := credentials.NewStaticCredentialsProvider(keyID, secret, "")
	awsCfg, err := awsconfig.LoadDefaultConfig(
		ctx,
		awsconfig.WithRegion("auto"),
		awsconfig.WithCredentialsProvider(creds),
	)
	if err != nil {
		log.Fatalf("r2: load aws config: %v", err)
	}

	// Send requests to the configured endpoint using path-style addressing.
	withEndpoint := func(opts *s3.Options) {
		opts.BaseEndpoint = &url
		opts.UsePathStyle = true
	}
	return s3.NewFromConfig(awsCfg, withEndpoint), name
}