-
Notifications
You must be signed in to change notification settings - Fork 30
Expand file tree
/
Copy pathruntime.go
More file actions
534 lines (463 loc) · 19.5 KB
/
Copy pathruntime.go
File metadata and controls
534 lines (463 loc) · 19.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
package main
import (
"context"
"embed"
"fmt"
"os"
"path/filepath"
"strings"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ilyakaznacheev/cleanenv"
"github.com/joho/godotenv"
"github.com/prometheus/client_golang/prometheus"
"github.com/layer-3/nitrolite/nitronode/metrics"
"github.com/layer-3/nitrolite/nitronode/store/database"
"github.com/layer-3/nitrolite/nitronode/store/memory"
"github.com/layer-3/nitrolite/pkg/blockchain/evm"
"github.com/layer-3/nitrolite/pkg/core"
"github.com/layer-3/nitrolite/pkg/log"
"github.com/layer-3/nitrolite/pkg/rpc"
"github.com/layer-3/nitrolite/pkg/sign"
"github.com/layer-3/nitrolite/pkg/sign/kms/gcp"
)
// embedMigrations holds the SQL files matched by the go:embed pattern below;
// InitBackbone passes it to database.ConnectToDB.
//
//go:embed config/migrations/*/*.sql
var embedMigrations embed.FS
// Version is the node version string. InitBackbone logs it and stores it in
// Backbone.NodeVersion.
var Version = "v1.0.0" // set at build time with -ldflags "-X main.Version=x.y.z"
// Backbone bundles the shared components assembled by InitBackbone: validated
// configuration values, the stores, the RPC node, the signers, the logger and
// the metric exporters.
type Backbone struct {
// NodeVersion is the build version; InitBackbone sets it from Version.
NodeVersion string
// Channel challenge duration bounds, copied from the validated configuration.
ChannelMinChallengeDuration uint32
ChannelMaxChallengeDuration uint32
// BlockchainRPCs maps a blockchain ID to the RPC URL verified at startup.
BlockchainRPCs map[uint64]string
// BlockchainGasLimit is the configured fixed gas limit (0 = use the estimate).
BlockchainGasLimit uint64
ValidationLimits ValidationLimits
DbStore database.DatabaseStore
MemoryStore memory.MemoryStore
RpcNode rpc.Node
// StateSigner is built from TxSigner via sign.NewEthereumMsgSignerFromRaw.
StateSigner sign.Signer
TxSigner sign.Signer
Logger log.Logger
RuntimeMetrics metrics.RuntimeMetricExporter
StoreMetrics metrics.StoreMetricExporter
// closers are the cleanup functions that Close invokes.
closers []func() error
}
// Close releases resources held by the backbone (e.g., KMS client connections).
//
// Every registered closer is invoked, in registration order, even when an
// earlier one fails. The first non-nil error is returned; errors from later
// closers are discarded.
func (b *Backbone) Close() error {
	var result error
	for i := range b.closers {
		err := b.closers[i]()
		if err == nil || result != nil {
			continue
		}
		result = err
	}
	return result
}
// FullConfig is the node configuration. InitBackbone fills it from environment
// variables with cleanenv.ReadEnv and validates it before use.
type FullConfig struct {
Database database.DatabaseConfig
// Channel challenge duration bounds in seconds; checked by
// validateChannelChallengeConfig.
ChannelMinChallengeDuration uint32 `yaml:"channel_min_challenge_duration" env:"NITRONODE_CHANNEL_MIN_CHALLENGE_DURATION" env-default:"86400"` // 24 hours
ChannelMaxChallengeDuration uint32 `yaml:"channel_max_challenge_duration" env:"NITRONODE_CHANNEL_MAX_CHALLENGE_DURATION" env-default:"604800"` // 7 days
// Signer is the signer configuration that InitBackbone passes to initSigner.
Signer SignerConfig `yaml:"signer"`
// NOTE(review): SignerType, SignerKey and GCPKMSKeyName bind the same
// environment variables as the fields of SignerConfig, and InitBackbone reads
// only conf.Signer.*. They look like leftovers; confirm before removing.
SignerType string `yaml:"signer_type" env:"NITRONODE_SIGNER_TYPE" env-default:"key"` // "key" or "gcp-kms"
SignerKey string `yaml:"signer_key" env:"NITRONODE_SIGNER_KEY"` // required when signer_type=key
GCPKMSKeyName string `yaml:"gcp_kms_key_name" env:"NITRONODE_GCP_KMS_KEY_NAME"` // required when signer_type=gcp-kms
ValidationLimits ValidationLimits `yaml:"validation_limits"`
// RateLimitPerSec and RateLimitBurst configure the per-connection
// request-count budget. RateLimitPerSec <= 0 disables it; when it is enabled,
// InitBackbone requires RateLimitBurst >= 1.
RateLimitPerSec float64 `yaml:"rate_limit_per_sec" env:"NITRONODE_RATE_LIMIT_PER_SEC" env-default:"10"`
RateLimitBurst float64 `yaml:"rate_limit_burst" env:"NITRONODE_RATE_LIMIT_BURST" env-default:"20"`
// WsProcessBufferSize and WsWriteBufferSize are forwarded to the WebSocket
// node as its per-connection process and write buffer sizes.
WsProcessBufferSize int `yaml:"ws_process_buffer_size" env:"NITRONODE_WS_PROCESS_BUFFER_SIZE" env-default:"64"`
WsWriteBufferSize int `yaml:"ws_write_buffer_size" env:"NITRONODE_WS_WRITE_BUFFER_SIZE" env-default:"64"`
// WsMaxMessageSize caps inbound WebSocket frame size in bytes. Frames over the
// cap close the connection with WebSocket close code 1009 before allocation.
// 128 KiB fits any legitimate v1 RPC with substantial headroom.
// InitBackbone refuses to start when the value is <= 0.
WsMaxMessageSize int64 `yaml:"ws_max_message_size" env:"NITRONODE_WS_MAX_MESSAGE_SIZE" env-default:"131072"`
// WsBytesPerSec is the steady-state byte budget per connection. Set <=0 to
// disable (InitBackbone installs the byte limiter only when the value is > 0).
WsBytesPerSec float64 `yaml:"ws_bytes_per_sec" env:"NITRONODE_WS_BYTES_PER_SEC" env-default:"262144"`
// WsBytesBurst is the burst capacity of the per-connection byte bucket. When
// byte limiting is enabled it must be > 0 and >= WsMaxMessageSize.
WsBytesBurst float64 `yaml:"ws_bytes_burst" env:"NITRONODE_WS_BYTES_BURST" env-default:"1048576"`
// BlockchainGasLimit forces a fixed GasLimit on every blockchain transaction,
// bypassing eth_estimateGas. 0 = use estimate (default). Set when an RPC
// rejects estimateGas — e.g. XRPL EVM testnet ("gas cap cannot be lower than 21000").
// Non-zero values below intrinsicTxGas are rejected by validateBlockchainGasLimit.
BlockchainGasLimit uint64 `yaml:"blockchain_gas_limit" env:"NITRONODE_BLOCKCHAIN_GAS_LIMIT" env-default:"0"`
}
// SignerConfig selects and configures the transaction signer. InitBackbone
// hands its three fields to initSigner.
type SignerConfig struct {
// Type selects the signer backend; initSigner accepts "key" and "gcp-kms" and
// treats anything else as fatal.
Type string `yaml:"type" env:"NITRONODE_SIGNER_TYPE" env-default:"key"` // "key" or "gcp-kms"
// Key is the private key passed to sign.NewEthereumRawSigner.
Key string `yaml:"key" env:"NITRONODE_SIGNER_KEY"` // required when type=key
// GCPKMSKeyName is the key name passed to gcp.NewSigner.
GCPKMSKeyName string `yaml:"gcp_kms_key_name" env:"NITRONODE_GCP_KMS_KEY_NAME"` // required when type=gcp-kms
}
// ValidationLimits defines configurable upper bounds for dynamic-length request fields.
// InitBackbone validates it with validateValidationLimits and copies it into Backbone.
type ValidationLimits struct {
// MaxParticipants must not exceed maxParticipantsUint16Safe (257); larger
// values are rejected at startup by validateValidationLimits.
MaxParticipants int `yaml:"max_participants" env:"NITRONODE_MAX_PARTICIPANTS" env-default:"32"`
// NOTE(review): the unit of MaxSessionDataLen (bytes vs. characters) is not
// visible in this file; confirm at the point of use.
MaxSessionDataLen int `yaml:"max_session_data_len" env:"NITRONODE_MAX_SESSION_DATA_LEN" env-default:"1024"`
MaxSessionKeyIDs int `yaml:"max_session_key_ids" env:"NITRONODE_MAX_SESSION_KEY_IDS" env-default:"10"`
// MaxSessionKeysPerUser is a soft per-user cap on active session keys, a DoS/storage
// bound enforced when a submit activates a key (new registration or reactivation).
// A value <= 0 disables the cap entirely (unlimited). Default 100.
MaxSessionKeysPerUser int `yaml:"max_session_keys_per_user" env:"NITRONODE_MAX_SESSION_KEYS_PER_USER" env-default:"100"`
}
// intrinsicTxGas is the minimum gas required for any Ethereum transaction.
const intrinsicTxGas = 21000

// validateBlockchainGasLimit checks the configured fixed gas limit.
//
// Zero is accepted and means "auto-estimate". Any other value must be at least
// intrinsicTxGas; smaller values yield an error naming the offending setting.
func validateBlockchainGasLimit(gasLimit uint64) error {
	switch {
	case gasLimit == 0:
		// Fixed limit disabled: gas is estimated per transaction.
		return nil
	case gasLimit >= intrinsicTxGas:
		return nil
	}
	return fmt.Errorf(
		"NITRONODE_BLOCKCHAIN_GAS_LIMIT must be 0 (auto-estimate) or >= %d, got %d",
		intrinsicTxGas,
		gasLimit,
	)
}
// validateChannelChallengeConfig checks the configured challenge-duration
// window (both values in seconds).
//
// The checks run in this order and the first violation is returned:
//  1. minChallenge must not be below core.ChannelMinChallengeDuration;
//  2. maxChallenge must not be above core.ChannelMaxChallengeDuration;
//  3. minChallenge must not exceed maxChallenge.
func validateChannelChallengeConfig(minChallenge, maxChallenge uint32) error {
	switch {
	case minChallenge < core.ChannelMinChallengeDuration:
		return fmt.Errorf(
			"NITRONODE_CHANNEL_MIN_CHALLENGE_DURATION must be at least %d seconds, got %d",
			core.ChannelMinChallengeDuration,
			minChallenge,
		)
	case maxChallenge > core.ChannelMaxChallengeDuration:
		return fmt.Errorf(
			"NITRONODE_CHANNEL_MAX_CHALLENGE_DURATION must be at most %d seconds, got %d",
			core.ChannelMaxChallengeDuration,
			maxChallenge,
		)
	case minChallenge > maxChallenge:
		return fmt.Errorf(
			"NITRONODE_CHANNEL_MIN_CHALLENGE_DURATION must be <= NITRONODE_CHANNEL_MAX_CHALLENGE_DURATION, got min=%d max=%d",
			minChallenge,
			maxChallenge,
		)
	default:
		return nil
	}
}
// maxParticipantsUint16Safe is the largest MaxParticipants value that cannot overflow the
// uint16 quorum-weight accumulators: 257 × 255 = 65535 = math.MaxUint16.
const maxParticipantsUint16Safe = 257

// validateValidationLimits rejects a MaxParticipants setting above
// maxParticipantsUint16Safe. No other field of vl is inspected, and no lower
// bound is enforced.
func validateValidationLimits(vl ValidationLimits) error {
	if vl.MaxParticipants <= maxParticipantsUint16Safe {
		return nil
	}
	return fmt.Errorf(
		"NITRONODE_MAX_PARTICIPANTS must be ≤ %d to prevent quorum-weight overflow (uint16 ceiling), got %d",
		maxParticipantsUint16Safe,
		vl.MaxParticipants,
	)
}
// InitBackbone initializes the backbone components of the application.
//
// Steps, in order: logger, environment/.env preparation, configuration load
// and validation, database store, memory store, signers, metric exporters,
// WebSocket RPC node, and blockchain RPC verification.
//
// Every check that depends only on configuration runs before the database,
// the signer or the metric registry is touched, so a misconfigured node is
// rejected before it opens any external connection. Failures are reported
// through logger.Fatal (initLogger panics instead); the code after each Fatal
// call presumes Fatal does not return.
//
// The caller owns the returned Backbone and should call Close on it to release
// the resources registered during initialization.
func InitBackbone() *Backbone {
	var closers []func() error // cleanup functions for resources that need releasing

	// ------------------------------------------------
	// Logger
	// ------------------------------------------------

	logger := initLogger()

	// ------------------------------------------------
	// (Preparation)
	// ------------------------------------------------

	configDirPath := initBase(logger)

	var conf FullConfig
	if err := cleanenv.ReadEnv(&conf); err != nil {
		logger.Fatal("failed to read env", "error", err)
	}

	if err := validateChannelChallengeConfig(conf.ChannelMinChallengeDuration, conf.ChannelMaxChallengeDuration); err != nil {
		logger.Fatal("invalid channel challenge duration config", "error", err)
	}

	if err := validateBlockchainGasLimit(conf.BlockchainGasLimit); err != nil {
		logger.Fatal("invalid blockchain gas limit config", "error", err)
	}

	if err := validateValidationLimits(conf.ValidationLimits); err != nil {
		logger.Fatal("invalid validation limits config", "error", err)
	}

	// MF-C01 requires a hard frame-size cap; refuse to start without one even if
	// the operator zeroed the env. The library would substitute its own default,
	// but we want the misconfiguration surfaced rather than papered over.
	if conf.WsMaxMessageSize <= 0 {
		logger.Fatal(
			"NITRONODE_WS_MAX_MESSAGE_SIZE must be > 0; the WebSocket frame cap cannot be disabled",
			"ws_max_message_size", conf.WsMaxMessageSize,
		)
	}

	bytesPerSec := conf.WsBytesPerSec
	bytesBurst := conf.WsBytesBurst
	// When the per-connection byte budget is enabled, the burst must be at least
	// the max message size; otherwise every legitimate frame that passes
	// SetReadLimit would still be rejected by the bucket on arrival, taking the
	// node out via config alone. Fail fast at startup.
	if bytesPerSec > 0 {
		if bytesBurst <= 0 {
			logger.Fatal(
				"NITRONODE_WS_BYTES_BURST must be > 0 when NITRONODE_WS_BYTES_PER_SEC is enabled",
				"ws_bytes_burst", bytesBurst,
				"ws_bytes_per_sec", bytesPerSec,
			)
		}
		if bytesBurst < float64(conf.WsMaxMessageSize) {
			logger.Fatal(
				"NITRONODE_WS_BYTES_BURST must be >= NITRONODE_WS_MAX_MESSAGE_SIZE when byte limiting is enabled",
				"ws_bytes_burst", bytesBurst,
				"ws_max_message_size", conf.WsMaxMessageSize,
			)
		}
	}

	// Per-connection request-count budget. Enforced at the frame layer alongside
	// the byte budget so a flood of tiny frames — malformed or unknown-method
	// frames included, which never reach the handler chain — is throttled before
	// it can be parsed. Set <=0 to disable.
	reqPerSec := conf.RateLimitPerSec
	reqBurst := conf.RateLimitBurst
	if reqPerSec > 0 && reqBurst < 1 {
		logger.Fatal(
			"NITRONODE_RATE_LIMIT_BURST must be >= 1 when NITRONODE_RATE_LIMIT_PER_SEC is enabled",
			"rate_limit_burst", reqBurst,
			"rate_limit_per_sec", reqPerSec,
		)
	}

	logger.Info("config loaded", "version", Version)

	// ------------------------------------------------
	// Database Store
	// ------------------------------------------------

	db, err := database.ConnectToDB(conf.Database, embedMigrations)
	if err != nil {
		logger.Fatal("failed to load database store", "error", err)
	}
	dbStore := database.NewDBStore(db)

	// ------------------------------------------------
	// Memory Store
	// ------------------------------------------------

	memoryStore, err := memory.NewMemoryStoreV1FromConfig(configDirPath)
	if err != nil {
		logger.Fatal("failed to load blockchains", "error", err)
	}

	// ------------------------------------------------
	// Signer
	// ------------------------------------------------

	txSigner, closer := initSigner(logger, conf.Signer.Type, conf.Signer.Key, conf.Signer.GCPKMSKeyName)
	closers = append(closers, closer)

	stateSigner, err := sign.NewEthereumMsgSignerFromRaw(txSigner)
	if err != nil {
		logger.Fatal("failed to wrap tx signer as state signer", "error", err)
	}

	logger.Info("signer initialized", "type", conf.Signer.Type, "address", stateSigner.PublicKey().Address())

	// ------------------------------------------------
	// Metrics
	// ------------------------------------------------

	runtimeMetrics, err := metrics.NewRuntimeMetricExporter(prometheus.DefaultRegisterer)
	if err != nil {
		logger.Fatal("failed to initialize runtime metric exporter", "error", err)
	}

	storeMetrics, err := metrics.NewStoreMetricExporter(prometheus.DefaultRegisterer)
	if err != nil {
		logger.Fatal("failed to initialize store metric exporter", "error", err)
	}

	// Wire DB instrumentation now that both gorm.DB and the runtime exporter
	// exist: per-query histograms via gorm callbacks, plus the standard
	// go_sql_* pool-stats collector.
	if err := database.RegisterMetricsCallbacks(db, runtimeMetrics); err != nil {
		logger.Fatal("failed to register database metric callbacks", "error", err)
	}
	if err := database.RegisterDBStatsCollector(db, prometheus.DefaultRegisterer); err != nil {
		logger.Fatal("failed to register database stats collector", "error", err)
	}

	// ------------------------------------------------
	// RPC Node
	// ------------------------------------------------

	rpcNode, err := rpc.NewWebsocketNode(rpc.WebsocketNodeConfig{
		Logger:                  logger,
		ObserveConnections:      runtimeMetrics.SetRPCConnections,
		WsConnProcessBufferSize: conf.WsProcessBufferSize,
		WsConnWriteBufferSize:   conf.WsWriteBufferSize,
		WsConnMaxMessageSize:    conf.WsMaxMessageSize,
		// Builds the frame limiter from whichever budgets are enabled: none
		// (no-op), one (used directly), or both (composite).
		NewFrameRateLimiter: func() rpc.FrameRateLimiter {
			var limiters rpc.CompositeFrameRateLimiter
			if bytesPerSec > 0 {
				limiters = append(limiters, rpc.NewByteTokenBucket(bytesPerSec, bytesBurst))
			}
			if reqPerSec > 0 {
				limiters = append(limiters, rpc.NewRequestTokenBucket(reqPerSec, reqBurst))
			}
			switch len(limiters) {
			case 0:
				return rpc.NoopFrameRateLimiter{}
			case 1:
				return limiters[0]
			default:
				return limiters
			}
		},
	})
	if err != nil {
		logger.Fatal("failed to initialize RPC node", "error", err)
	}

	// ------------------------------------------------
	// Blockchain RPCs
	// ------------------------------------------------

	blockchainRPCs := initBlockchainRPCs(logger, memoryStore)

	// Seed per-chain event counters at 0 so a chain whose listener has not yet
	// emitted an event still appears in /metrics (lets stalled-listener alerts
	// based on absent()/rate() evaluate against defined series).
	blockchainIDs := make([]uint64, 0, len(blockchainRPCs))
	for id := range blockchainRPCs {
		blockchainIDs = append(blockchainIDs, id)
	}
	runtimeMetrics.SeedBlockchainEventMetrics(blockchainIDs)

	return &Backbone{
		NodeVersion:                 Version,
		ChannelMinChallengeDuration: conf.ChannelMinChallengeDuration,
		ChannelMaxChallengeDuration: conf.ChannelMaxChallengeDuration,
		BlockchainRPCs:              blockchainRPCs,
		BlockchainGasLimit:          conf.BlockchainGasLimit,
		ValidationLimits:            conf.ValidationLimits,

		DbStore:        dbStore,
		MemoryStore:    memoryStore,
		RpcNode:        rpcNode,
		StateSigner:    stateSigner,
		TxSigner:       txSigner,
		Logger:         logger,
		RuntimeMetrics: runtimeMetrics,
		StoreMetrics:   storeMetrics,

		closers: closers,
	}
}
// initBase prepares the process environment before configuration is parsed
// and returns the configuration directory path.
//
// It first migrates legacy CLEARNODE_* variables, then resolves the config
// directory from NITRONODE_CONFIG_DIR_PATH (defaulting to "."), and finally
// loads the ".env" file found in that directory.
//
// Loading the .env file is best-effort: a failure never aborts startup. A
// missing file is reported as such; any other failure (unreadable or malformed
// file) is logged with the underlying error so it is not mistaken for a
// missing file.
func initBase(logger log.Logger) string {
	migrateLegacyClearnodeEnv(logger)

	configDirPath := os.Getenv("NITRONODE_CONFIG_DIR_PATH")
	if configDirPath == "" {
		configDirPath = "."
	}

	configDotEnvPath := filepath.Join(configDirPath, ".env")
	logger.Info("loading .env file", "path", configDotEnvPath)

	switch err := godotenv.Load(configDotEnvPath); {
	case err == nil:
		// Loaded successfully; nothing to report.
	case os.IsNotExist(err):
		logger.Warn(".env file not found", "path", configDotEnvPath)
	default:
		logger.Warn("failed to load .env file", "path", configDotEnvPath, "error", err)
	}

	return configDirPath
}
// migrateLegacyClearnodeEnv mirrors every CLEARNODE_* environment variable
// onto the NITRONODE_* name with the same suffix, unless that NITRONODE_* name
// is already present in the environment (even with an empty value). The legacy
// prefix is deprecated and will be removed in a future release.
//
// Each migrated variable produces a deprecation warning; a failed os.Setenv is
// logged and skipped. The legacy variable itself is left in place.
func migrateLegacyClearnodeEnv(logger log.Logger) {
	const (
		legacyPrefix = "CLEARNODE_"
		newPrefix    = "NITRONODE_"
	)

	for _, entry := range os.Environ() {
		// Entries are "KEY=value"; anything without a separator is ignored.
		parts := strings.SplitN(entry, "=", 2)
		if len(parts) != 2 {
			continue
		}
		legacyKey, value := parts[0], parts[1]
		if !strings.HasPrefix(legacyKey, legacyPrefix) {
			continue
		}

		targetKey := newPrefix + legacyKey[len(legacyPrefix):]
		if _, exists := os.LookupEnv(targetKey); exists {
			// The new name wins whenever it is set.
			continue
		}

		setErr := os.Setenv(targetKey, value)
		if setErr != nil {
			logger.Warn("failed to migrate legacy env var", "from", legacyKey, "to", targetKey, "error", setErr)
			continue
		}
		logger.Warn("CLEARNODE_* env var is deprecated, use NITRONODE_*", "from", legacyKey, "to", targetKey)
	}
}
// initLogger builds the root logger, named "main", from a log.Config read out
// of the environment with cleanenv.ReadEnv. It panics when the configuration
// cannot be read.
//
// InitBackbone calls this before initBase, so values that exist only in the
// .env file have not been loaded into the environment at this point.
func initLogger() log.Logger {
	var cfg log.Config

	readErr := cleanenv.ReadEnv(&cfg)
	if readErr != nil {
		panic("failed to read logger config from env: " + readErr.Error())
	}

	root := log.NewZapLogger(cfg)
	return root.WithName("main")
}
// initSigner creates the transaction signer selected by signerType and returns
// it together with a function that releases whatever the signer holds.
//
//   - "key": signs with privateKey via sign.NewEthereumRawSigner; the returned
//     closer is a no-op.
//   - "gcp-kms": creates a GCP KMS signer for gcpKMSKeyName, allowing 10 seconds
//     for its construction; the returned closer is the signer's Close method.
//
// A missing required value, a construction failure or an unknown signerType is
// reported through logger.Fatal.
func initSigner(logger log.Logger, signerType, privateKey, gcpKMSKeyName string) (sign.Signer, func() error) {
	noopCloser := func() error { return nil }

	switch signerType {
	case "key":
		if privateKey == "" {
			logger.Fatal("NITRONODE_SIGNER_KEY is required when NITRONODE_SIGNER_TYPE=key")
		}
		rawSigner, rawErr := sign.NewEthereumRawSigner(privateKey)
		if rawErr != nil {
			logger.Fatal("failed to initialise tx signer", "error", rawErr)
		}
		return rawSigner, noopCloser

	case "gcp-kms":
		if gcpKMSKeyName == "" {
			logger.Fatal("NITRONODE_GCP_KMS_KEY_NAME is required when NITRONODE_SIGNER_TYPE=gcp-kms")
		}
		// The timeout covers signer construction only; it is released when this
		// function returns.
		setupCtx, cancelSetup := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancelSetup()

		kmsSigner, kmsErr := gcp.NewSigner(setupCtx, gcpKMSKeyName)
		if kmsErr != nil {
			logger.Fatal("failed to initialise GCP KMS signer", "error", kmsErr)
		}
		return kmsSigner, kmsSigner.Close

	default:
		logger.Fatal("unsupported NITRONODE_SIGNER_TYPE", "type", signerType)
	}

	return nil, noopCloser
}
// initBlockchainRPCs resolves and verifies one RPC URL for every blockchain
// known to memoryStore and returns them keyed by blockchain ID.
//
// For each blockchain the URL is read from the environment variable
// NITRONODE_BLOCKCHAIN_RPC_<NAME>, where <NAME> is the upper-cased blockchain
// name. The endpoint must report the expected chain ID, and the ChannelHub
// contract at the configured address must report core.ChannelHubVersion. Any
// missing URL or failed check is reported through logger.Fatal.
func initBlockchainRPCs(logger log.Logger, memoryStore memory.MemoryStore) map[uint64]string {
	chains, listErr := memoryStore.GetBlockchains()
	if listErr != nil {
		logger.Fatal("failed to get blockchains", "error", listErr)
	}

	rpcByChainID := make(map[uint64]string, len(chains))
	for _, chain := range chains {
		envVarName := "NITRONODE_BLOCKCHAIN_RPC_" + strings.ToUpper(chain.Name)
		endpoint := os.Getenv(envVarName)
		if endpoint == "" {
			logger.Fatal("blockchain RPC URL not set in env", "blockchainID", chain.ID, "env_var", envVarName)
		}

		// The endpoint must answer and belong to the expected network.
		chainErr := checkChainId(endpoint, chain.ID)
		if chainErr != nil {
			logger.Fatal("failed to verify blockchain RPC", "blockchainID", chain.ID, "error", chainErr)
		}

		// The deployed ChannelHub must match the version this node supports.
		hubAddress := common.HexToAddress(chain.ChannelHubAddress)
		versionErr := checkChannelHubVersion(endpoint, hubAddress, core.ChannelHubVersion)
		if versionErr != nil {
			logger.Fatal("failed to verify ChannelHub version", "blockchainID", chain.ID, "address", chain.ChannelHubAddress, "error", versionErr)
		}

		rpcByChainID[chain.ID] = endpoint
	}

	return rpcByChainID
}
// checkChainId connects to an RPC endpoint and verifies it returns the expected chain ID.
// This ensures the RPC URL points to the correct blockchain network.
// Both the dial and the chain ID query share one context bounded by
// evm.RpcCallTimeout.
//
// It returns an error when the endpoint cannot be dialed, when the chain ID
// query fails, or when the reported chain ID differs from expectedChainID. A
// reported chain ID that does not fit in a uint64 is always a mismatch.
func checkChainId(blockchainRPC string, expectedChainID uint64) error {
	ctx, cancel := context.WithTimeout(context.Background(), evm.RpcCallTimeout)
	defer cancel()

	client, err := ethclient.DialContext(ctx, blockchainRPC)
	if err != nil {
		return fmt.Errorf("failed to connect to blockchain RPC: %w", err)
	}
	defer client.Close()

	chainID, err := client.ChainID(ctx)
	if err != nil {
		return fmt.Errorf("failed to get chain ID from blockchain RPC: %w", err)
	}

	// big.Int.Uint64 is undefined for values outside the uint64 range, so rule
	// those out explicitly instead of comparing a possibly truncated value.
	if !chainID.IsUint64() || chainID.Uint64() != expectedChainID {
		return fmt.Errorf("unexpected chain ID from blockchain RPC: got %d, want %d", chainID, expectedChainID)
	}

	return nil
}
// checkChannelHubVersion verifies that the ChannelHub contract at the given address
// has the expected VERSION constant value.
//
// The dial is bounded by evm.RpcCallTimeout. It returns an error when the
// endpoint cannot be dialed, when the contract caller cannot be created, when
// the VERSION call fails, or when the fetched version differs from
// expectedVersion.
//
// NOTE(review): VERSION is invoked with nil call options, so the timeout
// context created here is never handed to the contract call; that call is
// presumably not bounded by evm.RpcCallTimeout. Confirm against the generated
// binding and pass the context through call options if so.
func checkChannelHubVersion(blockchainRPC string, channelHubAddress common.Address, expectedVersion uint8) error {
	dialCtx, release := context.WithTimeout(context.Background(), evm.RpcCallTimeout)
	defer release()

	rpcClient, dialErr := ethclient.DialContext(dialCtx, blockchainRPC)
	if dialErr != nil {
		return fmt.Errorf("failed to connect to blockchain RPC: %w", dialErr)
	}
	defer rpcClient.Close()

	hub, bindErr := evm.NewChannelHubCaller(channelHubAddress, rpcClient)
	if bindErr != nil {
		return fmt.Errorf("failed to create ChannelHub caller: %w", bindErr)
	}

	onChainVersion, callErr := hub.VERSION(nil)
	if callErr != nil {
		return fmt.Errorf("failed to get ChannelHub version: %w", callErr)
	}

	if onChainVersion == expectedVersion {
		return nil
	}
	return fmt.Errorf("configured and fetched ChannelHub version mismatch: got %d, want %d", onChainVersion, expectedVersion)
}