diff --git a/worker/action_job.go b/worker/action_job.go index 3a184d221..ccf5bad45 100644 --- a/worker/action_job.go +++ b/worker/action_job.go @@ -116,7 +116,7 @@ func (b *Builder) eventActionJobCreate(ctx *web.EventContext) (r web.EventRespon return r, fmt.Errorf("job %s not found", jobName) } - job, err := b.createJob(ctx, qorJob) + job, err := b.CreateJob(ctx, qorJob) if err != nil { return } @@ -177,7 +177,7 @@ func (b *Builder) eventActionJobInputParams(ctx *web.EventContext) (r web.EventR )). Attr("v-model", "vars.presetsDialog"). Width("600").Persistent(true), - ).VSlot("{ form }"), + ).VSlot("{ form, dash }").DashInit("{errorMessages:{},disabled:{}}"), }) r.RunScript = "setTimeout(function(){vars.presetsDialog = true; }, 100)" return @@ -278,15 +278,18 @@ func (b *Builder) eventActionJobProgressing(ctx *web.EventContext) (er web.Event ).ModelValue(int(inst.Progress)).Height(20)).Class("mb-5"), h.If(config.displayLog, actionJobLog(*config.b, inst)), h.If(inst.ProgressText != "", - h.Div().Class("mb-3").Children( + h.Div().Class("mb-3").Attr("v-pre", true).Children( h.RawHTML(inst.ProgressText), ), ), ) - if inst.Status == JobStatusDone || inst.Status == JobStatusException { + switch inst.Status { + case JobStatusDone: + er.RunScript = "vars.actionJobProgressingInterval = 0; setTimeout(function(){ vars.presetsDialog = false; location.reload(); }, 1000);" + case JobStatusException: er.RunScript = "vars.actionJobProgressingInterval = 0;" - } else { + default: er.RunScript = fmt.Sprintf("vars.actionJobProgressingInterval = %d;", config.progressingInterval) } return er, nil diff --git a/worker/builder.go b/worker/builder.go index 6da6af1c1..68e72c353 100644 --- a/worker/builder.go +++ b/worker/builder.go @@ -114,6 +114,10 @@ func (b *Builder) Queue(q Queue) *Builder { return b } +func (b *Builder) GetQueue() Queue { + return b.q +} + func (b *Builder) GetCurrentUserIDFunc(f func(r *http.Request) string) *Builder { b.getCurrentUserIDFunc = f return b @@ -148,6 +152,16 @@ func (b *Builder) getJobBuilder(name string) *JobBuilder { return nil } +// GetJobBuilder returns the job builder for the given name, or nil if not found. +func (b *Builder) GetJobBuilder(name string) *JobBuilder { + return b.getJobBuilder(name) +} + +// CreateJob creates and enqueues a job from an EventContext (requires HTTP request). +func (b *Builder) CreateJob(ctx *web.EventContext, qorJob *QorJob) (*QorJob, error) { + return b.createJob(ctx, qorJob) +} + func (b *Builder) mustGetJobBuilder(name string) *JobBuilder { jb := b.getJobBuilder(name) @@ -305,7 +319,7 @@ func (b *Builder) Install(pb *presets.Builder) error { if qorJob.Job == "" { return errors.New("job is required") } - j, err := b.createJob(ctx, qorJob) + j, err := b.CreateJob(ctx, qorJob) if err != nil { return err } @@ -367,27 +381,37 @@ func (b *Builder) Install(pb *presets.Builder) error { } } - // Set initial refresh interval based on job status - initialInterval := 0 - if inst.Status == JobStatusNew || inst.Status == JobStatusRunning { - initialInterval = 2000 + isTerminal := inst.Status == JobStatusDone || inst.Status == JobStatusException || + inst.Status == JobStatusKilled || inst.Status == JobStatusCancelled + + // For terminal statuses, render job progress directly (no async portal needed). + // This avoids componentByTemplate which can fail if progressText/logs contain + // content that breaks Vue's runtime template compiler. + var jobContent HTMLComponent + if inst.Status == JobStatusScheduled { + jobContent = Components(scheduledJobDetailing...) + } else if isTerminal { + canEdit := editIsAllowed(ctx.R, qorJob.Job) == nil + logs, hasMoreLogs, logErr := b.fetchJobLogs(inst.ID) + if logErr != nil { + return Text(logErr.Error()) + } + jobContent = b.jobProgressing(canEdit, msgr, qorJob.ID, qorJob.Job, inst.Status, inst.Progress, logs, hasMoreLogs, inst.ProgressText) + } else { + jobContent = web.Scope( + web.Portal(). + Loader(web.Plaid().EventFunc("worker_updateJobProgressing"). + URL(eURL). + Query("jobID", fmt.Sprintf("%d", qorJob.ID)). + Query("job", qorJob.Job), + ). + AutoReloadInterval("locals.worker_updateJobProgressingInterval"), + ).VSlot("{ locals }").Init("{worker_updateJobProgressingInterval: 2000}") } return Div( Div(Text(getTJob(ctx.R, qorJob.Job))).Class("mb-3 text-h6 font-weight-regular"), - web.Scope( - If(inst.Status == JobStatusScheduled, - scheduledJobDetailing..., - ).Else( - web.Portal(). - Loader(web.Plaid().EventFunc("worker_updateJobProgressing"). - URL(eURL). - Query("jobID", fmt.Sprintf("%d", qorJob.ID)). - Query("job", qorJob.Job), - ). - AutoReloadInterval("locals.worker_updateJobProgressingInterval"), - ), - ).VSlot("{ locals }").Init(fmt.Sprintf("{worker_updateJobProgressingInterval: %d}", initialInterval)), + jobContent, web.Portal().Name("worker_snackbar"), ) }) @@ -425,8 +449,9 @@ func (b *Builder) Listen() { var jds []*QorJobDefinition for _, jb := range b.jbs { jds = append(jds, &QorJobDefinition{ - Name: jb.name, - Handler: jb.h, + Name: jb.name, + Handler: jb.h, + Concurrency: jb.concurrency, }) } err := b.q.Listen(jds, func(qorJobID uint) (QueJobInterface, error) { @@ -501,6 +526,35 @@ func (b *Builder) createJob(ctx *web.EventContext, qorJob *QorJob) (j *QorJob, e return } +// CreateSystemJob creates a job from system background, without web context checks +func (b *Builder) CreateSystemJob(ctx context.Context, jobName string, args interface{}) (j *QorJob, err error) { + jb := b.mustGetJobBuilder(jobName) + + err = b.db.Transaction(func(tx *gorm.DB) error { + j = &QorJob{ + Job: jobName, + Status: JobStatusNew, + } + if s, ok := args.(Scheduler); ok { + if scheduleTime := s.GetScheduleTime(); scheduleTime != nil { + j.Status = JobStatusScheduled + } + } + + err = tx.Create(j).Error + if err != nil { + return err + } + var inst *QorJobInstance + inst, err = jb.newJobInstance(nil, j.ID, jobName, args, nil) + if err != nil { + return err + } + return b.q.Add(ctx, inst) + }) + return +} + func (b *Builder) eventSelectJob(ctx *web.EventContext) (er web.EventResponse, err error) { job := ctx.R.FormValue("jobName") er.UpdatePortals = append(er.UpdatePortals, @@ -705,6 +759,36 @@ func (b *Builder) eventUpdateJob(ctx *web.EventContext) (er web.EventResponse, e return er, nil } +func (b *Builder) fetchJobLogs(instanceID uint) (logs []string, hasMoreLogs bool, err error) { + var count int64 + err = b.db.Model(&QorJobLog{}). + Where("qor_job_instance_id = ?", instanceID). + Count(&count). + Error + if err != nil { + return nil, false, err + } + if count > 100 { + hasMoreLogs = true + } + if count > 0 { + var mLogs []*QorJobLog + err = b.db.Where("qor_job_instance_id = ?", instanceID). + Order("created_at desc"). + Limit(100). + Find(&mLogs). + Error + if err != nil { + return nil, false, err + } + logs = make([]string, 0, len(mLogs)) + for i := len(mLogs) - 1; i >= 0; i-- { + logs = append(logs, mLogs[i].Log) + } + } + return logs, hasMoreLogs, nil +} + func (b *Builder) eventUpdateJobProgressing(ctx *web.EventContext) (er web.EventResponse, err error) { msgr := i18n.MustGetModuleMessages(ctx.R, I18nWorkerKey, Messages_en_US).(*Messages) @@ -717,34 +801,9 @@ func (b *Builder) eventUpdateJobProgressing(ctx *web.EventContext) (er web.Event } canEdit := editIsAllowed(ctx.R, qorJobName) == nil - logs := make([]string, 0, 100) - hasMoreLogs := false - { - var count int64 - err = b.db.Model(&QorJobLog{}). - Where("qor_job_instance_id = ?", inst.ID). - Count(&count). - Error - if err != nil { - return er, err - } - if count > 100 { - hasMoreLogs = true - } - if count > 0 { - var mLogs []*QorJobLog - err = b.db.Where("qor_job_instance_id = ?", inst.ID). - Order("created_at desc"). - Limit(100). - Find(&mLogs). - Error - if err != nil { - return er, err - } - for i := len(mLogs) - 1; i >= 0; i-- { - logs = append(logs, mLogs[i].Log) - } - } + logs, hasMoreLogs, err := b.fetchJobLogs(inst.ID) + if err != nil { + return er, err } er.Body = b.jobProgressing(canEdit, msgr, qorJobID, qorJobName, inst.Status, inst.Progress, logs, hasMoreLogs, inst.ProgressText) return er, nil @@ -806,7 +865,7 @@ func (b *Builder) jobProgressing( ).Name("worker_hiddenLogs")) } for _, l := range logs { - logLines = append(logLines, P().Style(` + logLines = append(logLines, P().Attr("v-pre", true).Style(` margin: 0; margin-bottom: 4px;`).Children(Text(l))) } @@ -828,9 +887,11 @@ func (b *Builder) jobProgressing( } return Div( - // Portal passes parent Scope's locals to its body - // Use v-on-mounted to set interval when Portal body renders - Div().Style("display:none").Attr("v-on-mounted", fmt.Sprintf("() => { locals.worker_updateJobProgressingInterval = %d }", interval)), + // Portal passes parent Scope's locals to its body. + // Use v-on-mounted to update the reload interval when the portal body renders. + // Guard locals access because this may be rendered directly (not via portal) + // for terminal statuses, where locals is not in scope. + Div().Style("display:none").Attr("v-on-mounted", fmt.Sprintf("() => { if (locals) locals.worker_updateJobProgressingInterval = %d }", interval)), Div(Text(msgr.DetailTitleStatus)).Class("text-caption"), Div().Class("d-flex align-center mb-5").Children( @@ -857,7 +918,7 @@ func (b *Builder) jobProgressing( ), If(progressText != "", - Div().Class("mb-3").Children( + Div().Class("mb-3").Attr("v-pre", true).Children( RawHTML(progressText), ), ), @@ -874,7 +935,7 @@ func (b *Builder) jobProgressing( Query("job", job). Go()), ), - If(status == JobStatusDone, + If(status == JobStatusDone || status == JobStatusException, VBtn(msgr.ActionRerunJob).Color("primary"). Attr("@click", web.Plaid(). URL(eURL). diff --git a/worker/goque.go b/worker/goque.go index 3436fb098..e57751dfc 100644 --- a/worker/goque.go +++ b/worker/goque.go @@ -104,8 +104,8 @@ func (q *goque) Listen(jobDefs []*QorJobDefinition, getJob func(qorJobID uint) ( Mutex: q.q.Mutex(), MaxLockPerSecond: 10, MaxBufferJobsCount: 0, - MaxPerformPerSecond: 2, - MaxConcurrentPerformCount: 1, + MaxPerformPerSecond: float64(2 * jd.Concurrency), + MaxConcurrentPerformCount: jd.Concurrency, Perform: func(ctx context.Context, qj que.Job) (err error) { var job QueJobInterface { diff --git a/worker/job.go b/worker/job.go index a05397c5d..b96bb1dae 100644 --- a/worker/job.go +++ b/worker/job.go @@ -30,6 +30,7 @@ type JobBuilder struct { h JobHandler contextHandler func(*web.EventContext) map[string]interface{} // optional global bool + concurrency int // max concurrent instances (0 = unlimited) } func newJob(b *Builder, name string) *JobBuilder { @@ -41,14 +42,22 @@ func newJob(b *Builder, name string) *JobBuilder { } return &JobBuilder{ - b: b, - name: name, - global: true, + b: b, + name: name, + global: true, + concurrency: 1, } } type JobHandler func(context.Context, QorJobInterface) error +// Concurrency sets the maximum number of concurrent instances for this job. +// A value of 0 means unlimited (default). +func (jb *JobBuilder) Concurrency(n int) *JobBuilder { + jb.concurrency = n + return jb +} + // r should be ptr to struct func (jb *JobBuilder) Resource(r interface{}) *JobBuilder { { @@ -94,6 +103,11 @@ func (jb *JobBuilder) GetResourceBuilder() *presets.ModelBuilder { return jb.rmb } +// GetResource returns the resource prototype for this job. +func (jb *JobBuilder) GetResource() interface{} { + return jb.r +} + func (jb *JobBuilder) Handler(h JobHandler) *JobBuilder { jb.h = h return jb @@ -197,7 +211,7 @@ func (jb *JobBuilder) newJobInstance( Job: qorJobName, Status: JobStatusNew, } - if jb.b.getCurrentUserIDFunc != nil { + if r != nil && jb.b.getCurrentUserIDFunc != nil { inst.Operator = jb.b.getCurrentUserIDFunc(r) } err := jb.b.db.Create(&inst).Error @@ -260,6 +274,10 @@ func (job *QorJobInstance) GetJobInfo() (ji *JobInfo, err error) { }, nil } +func (job *QorJobInstance) SetJobBuilder(jb *JobBuilder) { + job.jb = jb +} + func (job *QorJobInstance) GetStatus() string { return job.Status } diff --git a/worker/queue.go b/worker/queue.go index 76d8db6af..901325511 100644 --- a/worker/queue.go +++ b/worker/queue.go @@ -5,8 +5,9 @@ import "context" //go:generate moq -pkg mock -out mock/queue.go . Queue type QorJobDefinition struct { - Name string - Handler JobHandler + Name string + Handler JobHandler + Concurrency int } type Queue interface {