Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions downstreamadapter/sink/cloudstorage/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,12 +230,12 @@ func (s *sink) writeFile(v *commonEvent.DDLEvent, def cloudstorage.TableDefiniti
}
log.Debug("write ddl event to external storage",
zap.String("path", path), zap.Any("ddl", v))
return s.statistics.RecordDDLExecution(func() error {
return s.statistics.RecordDDLExecution(func() (string, error) {
err = s.storage.WriteFile(s.ctx, path, encodedDef)
if err != nil {
return err
return "", err
}
return nil
return v.GetDDLType().String(), nil
})
}

Expand Down
9 changes: 5 additions & 4 deletions downstreamadapter/sink/kafka/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,13 +429,14 @@ func (s *sink) sendDDLEvent(event *commonEvent.DDLEvent) error {
if err != nil {
return err
}
ddlType := e.GetDDLType().String()
if s.partitionRule == helper.PartitionAll {
err = s.statistics.RecordDDLExecution(func() error {
return s.ddlProducer.SendMessages(topic, partitionNum, message)
err = s.statistics.RecordDDLExecution(func() (string, error) {
return ddlType, s.ddlProducer.SendMessages(topic, partitionNum, message)
})
} else {
err = s.statistics.RecordDDLExecution(func() error {
return s.ddlProducer.SendMessage(topic, 0, message)
err = s.statistics.RecordDDLExecution(func() (string, error) {
return ddlType, s.ddlProducer.SendMessage(topic, 0, message)
})
}
if err != nil {
Expand Down
9 changes: 5 additions & 4 deletions downstreamadapter/sink/pulsar/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,14 @@ func (s *sink) sendDDLEvent(event *commonEvent.DDLEvent) error {
if err != nil {
return err
}
ddlType := e.GetDDLType().String()
if s.partitionRule == helper.PartitionAll {
err = s.statistics.RecordDDLExecution(func() error {
return s.ddlProducer.syncBroadcastMessage(s.ctx, topic, message)
err = s.statistics.RecordDDLExecution(func() (string, error) {
return ddlType, s.ddlProducer.syncBroadcastMessage(s.ctx, topic, message)
})
} else {
err = s.statistics.RecordDDLExecution(func() error {
return s.ddlProducer.syncSendMessage(s.ctx, topic, message)
err = s.statistics.RecordDDLExecution(func() (string, error) {
return ddlType, s.ddlProducer.syncSendMessage(s.ctx, topic, message)
})
}
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions downstreamadapter/sink/redo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,9 @@ func (s *Sink) Run(ctx context.Context) error {
func (s *Sink) WriteBlockEvent(event commonEvent.BlockEvent) error {
switch e := event.(type) {
case *commonEvent.DDLEvent:
err := s.statistics.RecordDDLExecution(func() error {
return s.ddlWriter.WriteEvents(s.ctx, e)
err := s.statistics.RecordDDLExecution(func() (string, error) {
ddlType := e.GetDDLType().String()
return ddlType, s.ddlWriter.WriteEvents(s.ctx, e)
})
if err != nil {
s.isNormal.Store(false)
Expand Down
2 changes: 1 addition & 1 deletion maintainer/barrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (b *Barrier) Resend() []*messaging.TargetMessage {
// because on the complete checkpointTs calculation should consider the new dispatcher.
func (b *Barrier) ShouldBlockCheckpointTs() bool {
flag := false
b.blockedEvents.RangeWoLock(func(key eventKey, barrierEvent *BarrierEvent) bool {
b.blockedEvents.RangeWithoutLock(func(key eventKey, barrierEvent *BarrierEvent) bool {
if barrierEvent.hasNewTable {
flag = true
return false
Expand Down
8 changes: 7 additions & 1 deletion maintainer/barrier_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (b *BlockedEventMap) Range(f func(key eventKey, value *BarrierEvent) bool)
}
}

func (b *BlockedEventMap) RangeWoLock(f func(key eventKey, value *BarrierEvent) bool) {
func (b *BlockedEventMap) RangeWithoutLock(f func(key eventKey, value *BarrierEvent) bool) {
for k, v := range b.m {
if !f(k, v) {
break
Expand All @@ -151,6 +151,12 @@ func (b *BlockedEventMap) Delete(key eventKey) {
delete(b.m, key)
}

func (b *BlockedEventMap) Len() int {
b.mutex.Lock()
defer b.mutex.Unlock()
return len(b.m)
}

// eventKey is the key of the block event,
// the ddl and sync point are identified by the blockTs and isSyncPoint since they can share the same blockTs
type eventKey struct {
Expand Down
4 changes: 4 additions & 0 deletions maintainer/maintainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1103,15 +1103,19 @@ func (m *Maintainer) collectMetrics() {
working := spanController.GetReplicatingSize()
absent := spanController.GetAbsentSize()

var blockingLen int
if common.IsDefaultMode(mode) {
m.spanCountGauge.Set(float64(totalSpanCount))
m.tableCountGauge.Set(float64(totalTableCount))
m.scheduledTaskGauge.Set(float64(scheduling))
blockingLen = m.controller.barrier.blockedEvents.Len()
} else {
m.redoSpanCountGauge.Set(float64(totalSpanCount))
m.redoTableCountGauge.Set(float64(totalTableCount))
m.redoScheduledTaskGauge.Set(float64(scheduling))
blockingLen = m.controller.redoBarrier.blockedEvents.Len()
}
metrics.ExecDDLBlockingGauge.WithLabelValues(m.changefeedID.Keyspace(), m.changefeedID.Name(), common.StringMode(mode)).Set(float64(blockingLen))
metrics.TableStateGauge.WithLabelValues(m.changefeedID.Keyspace(), m.changefeedID.Name(), "Absent", common.StringMode(mode)).Set(float64(absent))
metrics.TableStateGauge.WithLabelValues(m.changefeedID.Keyspace(), m.changefeedID.Name(), "Working", common.StringMode(mode)).Set(float64(working))
}
Expand Down
Loading