Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions worker/action_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (b *Builder) eventActionJobCreate(ctx *web.EventContext) (r web.EventRespon
return r, fmt.Errorf("job %s not found", jobName)
}

job, err := b.createJob(ctx, qorJob)
job, err := b.CreateJob(ctx, qorJob)
if err != nil {
return
}
Expand Down Expand Up @@ -177,7 +177,7 @@ func (b *Builder) eventActionJobInputParams(ctx *web.EventContext) (r web.EventR
)).
Attr("v-model", "vars.presetsDialog").
Width("600").Persistent(true),
).VSlot("{ form }"),
).VSlot("{ form, dash }").DashInit("{errorMessages:{},disabled:{}}"),
})
r.RunScript = "setTimeout(function(){vars.presetsDialog = true; }, 100)"
return
Expand Down Expand Up @@ -278,15 +278,18 @@ func (b *Builder) eventActionJobProgressing(ctx *web.EventContext) (er web.Event
).ModelValue(int(inst.Progress)).Height(20)).Class("mb-5"),
h.If(config.displayLog, actionJobLog(*config.b, inst)),
h.If(inst.ProgressText != "",
h.Div().Class("mb-3").Children(
h.Div().Class("mb-3").Attr("v-pre", true).Children(
h.RawHTML(inst.ProgressText),
),
),
)

if inst.Status == JobStatusDone || inst.Status == JobStatusException {
switch inst.Status {
case JobStatusDone:
er.RunScript = "vars.actionJobProgressingInterval = 0; setTimeout(function(){ vars.presetsDialog = false; location.reload(); }, 1000);"
case JobStatusException:
er.RunScript = "vars.actionJobProgressingInterval = 0;"
} else {
default:
er.RunScript = fmt.Sprintf("vars.actionJobProgressingInterval = %d;", config.progressingInterval)
}
return er, nil
Expand Down
169 changes: 115 additions & 54 deletions worker/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ func (b *Builder) Queue(q Queue) *Builder {
return b
}

func (b *Builder) GetQueue() Queue {
return b.q
}

func (b *Builder) GetCurrentUserIDFunc(f func(r *http.Request) string) *Builder {
b.getCurrentUserIDFunc = f
return b
Expand Down Expand Up @@ -148,6 +152,16 @@ func (b *Builder) getJobBuilder(name string) *JobBuilder {
return nil
}

// GetJobBuilder returns the job builder for the given name, or nil if not found.
func (b *Builder) GetJobBuilder(name string) *JobBuilder {
return b.getJobBuilder(name)
}

// CreateJob creates and enqueues a job from an EventContext (requires HTTP request).
func (b *Builder) CreateJob(ctx *web.EventContext, qorJob *QorJob) (*QorJob, error) {
return b.createJob(ctx, qorJob)
}

func (b *Builder) mustGetJobBuilder(name string) *JobBuilder {
jb := b.getJobBuilder(name)

Expand Down Expand Up @@ -305,7 +319,7 @@ func (b *Builder) Install(pb *presets.Builder) error {
if qorJob.Job == "" {
return errors.New("job is required")
}
j, err := b.createJob(ctx, qorJob)
j, err := b.CreateJob(ctx, qorJob)
if err != nil {
return err
}
Expand Down Expand Up @@ -367,27 +381,37 @@ func (b *Builder) Install(pb *presets.Builder) error {
}
}

// Set initial refresh interval based on job status
initialInterval := 0
if inst.Status == JobStatusNew || inst.Status == JobStatusRunning {
initialInterval = 2000
isTerminal := inst.Status == JobStatusDone || inst.Status == JobStatusException ||
inst.Status == JobStatusKilled || inst.Status == JobStatusCancelled

// For terminal statuses, render job progress directly (no async portal needed).
// This avoids componentByTemplate which can fail if progressText/logs contain
// content that breaks Vue's runtime template compiler.
var jobContent HTMLComponent
if inst.Status == JobStatusScheduled {
jobContent = Components(scheduledJobDetailing...)
} else if isTerminal {
canEdit := editIsAllowed(ctx.R, qorJob.Job) == nil
logs, hasMoreLogs, logErr := b.fetchJobLogs(inst.ID)
if logErr != nil {
return Text(logErr.Error())
}
jobContent = b.jobProgressing(canEdit, msgr, qorJob.ID, qorJob.Job, inst.Status, inst.Progress, logs, hasMoreLogs, inst.ProgressText)
} else {
jobContent = web.Scope(
web.Portal().
Loader(web.Plaid().EventFunc("worker_updateJobProgressing").
URL(eURL).
Query("jobID", fmt.Sprintf("%d", qorJob.ID)).
Query("job", qorJob.Job),
).
AutoReloadInterval("locals.worker_updateJobProgressingInterval"),
).VSlot("{ locals }").Init("{worker_updateJobProgressingInterval: 2000}")
}

return Div(
Div(Text(getTJob(ctx.R, qorJob.Job))).Class("mb-3 text-h6 font-weight-regular"),
web.Scope(
If(inst.Status == JobStatusScheduled,
scheduledJobDetailing...,
).Else(
web.Portal().
Loader(web.Plaid().EventFunc("worker_updateJobProgressing").
URL(eURL).
Query("jobID", fmt.Sprintf("%d", qorJob.ID)).
Query("job", qorJob.Job),
).
AutoReloadInterval("locals.worker_updateJobProgressingInterval"),
),
).VSlot("{ locals }").Init(fmt.Sprintf("{worker_updateJobProgressingInterval: %d}", initialInterval)),
jobContent,
web.Portal().Name("worker_snackbar"),
)
})
Expand Down Expand Up @@ -425,8 +449,9 @@ func (b *Builder) Listen() {
var jds []*QorJobDefinition
for _, jb := range b.jbs {
jds = append(jds, &QorJobDefinition{
Name: jb.name,
Handler: jb.h,
Name: jb.name,
Handler: jb.h,
Concurrency: jb.concurrency,
})
}
err := b.q.Listen(jds, func(qorJobID uint) (QueJobInterface, error) {
Expand Down Expand Up @@ -501,6 +526,35 @@ func (b *Builder) createJob(ctx *web.EventContext, qorJob *QorJob) (j *QorJob, e
return
}

// CreateSystemJob creates a job from system background, without web context checks
func (b *Builder) CreateSystemJob(ctx context.Context, jobName string, args interface{}) (j *QorJob, err error) {
jb := b.mustGetJobBuilder(jobName)

err = b.db.Transaction(func(tx *gorm.DB) error {
j = &QorJob{
Job: jobName,
Status: JobStatusNew,
}
if s, ok := args.(Scheduler); ok {
if scheduleTime := s.GetScheduleTime(); scheduleTime != nil {
j.Status = JobStatusScheduled
}
}

err = tx.Create(j).Error
if err != nil {
return err
}
var inst *QorJobInstance
inst, err = jb.newJobInstance(nil, j.ID, jobName, args, nil)
if err != nil {
return err
}
return b.q.Add(ctx, inst)
})
return
}

func (b *Builder) eventSelectJob(ctx *web.EventContext) (er web.EventResponse, err error) {
job := ctx.R.FormValue("jobName")
er.UpdatePortals = append(er.UpdatePortals,
Expand Down Expand Up @@ -705,6 +759,36 @@ func (b *Builder) eventUpdateJob(ctx *web.EventContext) (er web.EventResponse, e
return er, nil
}

func (b *Builder) fetchJobLogs(instanceID uint) (logs []string, hasMoreLogs bool, err error) {
var count int64
err = b.db.Model(&QorJobLog{}).
Where("qor_job_instance_id = ?", instanceID).
Count(&count).
Error
if err != nil {
return nil, false, err
}
if count > 100 {
hasMoreLogs = true
}
if count > 0 {
var mLogs []*QorJobLog
err = b.db.Where("qor_job_instance_id = ?", instanceID).
Order("created_at desc").
Limit(100).
Find(&mLogs).
Error
if err != nil {
return nil, false, err
}
logs = make([]string, 0, len(mLogs))
for i := len(mLogs) - 1; i >= 0; i-- {
logs = append(logs, mLogs[i].Log)
}
}
return logs, hasMoreLogs, nil
}

func (b *Builder) eventUpdateJobProgressing(ctx *web.EventContext) (er web.EventResponse, err error) {
msgr := i18n.MustGetModuleMessages(ctx.R, I18nWorkerKey, Messages_en_US).(*Messages)

Expand All @@ -717,34 +801,9 @@ func (b *Builder) eventUpdateJobProgressing(ctx *web.EventContext) (er web.Event
}

canEdit := editIsAllowed(ctx.R, qorJobName) == nil
logs := make([]string, 0, 100)
hasMoreLogs := false
{
var count int64
err = b.db.Model(&QorJobLog{}).
Where("qor_job_instance_id = ?", inst.ID).
Count(&count).
Error
if err != nil {
return er, err
}
if count > 100 {
hasMoreLogs = true
}
if count > 0 {
var mLogs []*QorJobLog
err = b.db.Where("qor_job_instance_id = ?", inst.ID).
Order("created_at desc").
Limit(100).
Find(&mLogs).
Error
if err != nil {
return er, err
}
for i := len(mLogs) - 1; i >= 0; i-- {
logs = append(logs, mLogs[i].Log)
}
}
logs, hasMoreLogs, err := b.fetchJobLogs(inst.ID)
if err != nil {
return er, err
}
er.Body = b.jobProgressing(canEdit, msgr, qorJobID, qorJobName, inst.Status, inst.Progress, logs, hasMoreLogs, inst.ProgressText)
return er, nil
Expand Down Expand Up @@ -806,7 +865,7 @@ func (b *Builder) jobProgressing(
).Name("worker_hiddenLogs"))
}
for _, l := range logs {
logLines = append(logLines, P().Style(`
logLines = append(logLines, P().Attr("v-pre", true).Style(`
margin: 0;
margin-bottom: 4px;`).Children(Text(l)))
}
Expand All @@ -828,9 +887,11 @@ func (b *Builder) jobProgressing(
}

return Div(
// Portal passes parent Scope's locals to its body
// Use v-on-mounted to set interval when Portal body renders
Div().Style("display:none").Attr("v-on-mounted", fmt.Sprintf("() => { locals.worker_updateJobProgressingInterval = %d }", interval)),
// Portal passes parent Scope's locals to its body.
// Use v-on-mounted to update the reload interval when the portal body renders.
// Guard locals access because this may be rendered directly (not via portal)
// for terminal statuses, where locals is not in scope.
Div().Style("display:none").Attr("v-on-mounted", fmt.Sprintf("() => { if (locals) locals.worker_updateJobProgressingInterval = %d }", interval)),

Div(Text(msgr.DetailTitleStatus)).Class("text-caption"),
Div().Class("d-flex align-center mb-5").Children(
Expand All @@ -857,7 +918,7 @@ func (b *Builder) jobProgressing(
),

If(progressText != "",
Div().Class("mb-3").Children(
Div().Class("mb-3").Attr("v-pre", true).Children(
RawHTML(progressText),
),
),
Expand All @@ -874,7 +935,7 @@ func (b *Builder) jobProgressing(
Query("job", job).
Go()),
),
If(status == JobStatusDone,
If(status == JobStatusDone || status == JobStatusException,
VBtn(msgr.ActionRerunJob).Color("primary").
Attr("@click", web.Plaid().
URL(eURL).
Expand Down
4 changes: 2 additions & 2 deletions worker/goque.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ func (q *goque) Listen(jobDefs []*QorJobDefinition, getJob func(qorJobID uint) (
Mutex: q.q.Mutex(),
MaxLockPerSecond: 10,
MaxBufferJobsCount: 0,
MaxPerformPerSecond: 2,
MaxConcurrentPerformCount: 1,
MaxPerformPerSecond: float64(2 * jd.Concurrency),
MaxConcurrentPerformCount: jd.Concurrency,
Perform: func(ctx context.Context, qj que.Job) (err error) {
var job QueJobInterface
{
Expand Down
26 changes: 22 additions & 4 deletions worker/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type JobBuilder struct {
h JobHandler
contextHandler func(*web.EventContext) map[string]interface{} // optional
global bool
concurrency int // max concurrent instances (0 = unlimited)
}

func newJob(b *Builder, name string) *JobBuilder {
Expand All @@ -41,14 +42,22 @@ func newJob(b *Builder, name string) *JobBuilder {
}

return &JobBuilder{
b: b,
name: name,
global: true,
b: b,
name: name,
global: true,
concurrency: 1,
}
}

type JobHandler func(context.Context, QorJobInterface) error

// Concurrency sets the maximum number of concurrent instances for this job.
// A value of 0 means unlimited (default).
func (jb *JobBuilder) Concurrency(n int) *JobBuilder {
jb.concurrency = n
return jb
}

// r should be ptr to struct
func (jb *JobBuilder) Resource(r interface{}) *JobBuilder {
{
Expand Down Expand Up @@ -94,6 +103,11 @@ func (jb *JobBuilder) GetResourceBuilder() *presets.ModelBuilder {
return jb.rmb
}

// GetResource returns the resource prototype for this job.
func (jb *JobBuilder) GetResource() interface{} {
return jb.r
}

func (jb *JobBuilder) Handler(h JobHandler) *JobBuilder {
jb.h = h
return jb
Expand Down Expand Up @@ -197,7 +211,7 @@ func (jb *JobBuilder) newJobInstance(
Job: qorJobName,
Status: JobStatusNew,
}
if jb.b.getCurrentUserIDFunc != nil {
if r != nil && jb.b.getCurrentUserIDFunc != nil {
inst.Operator = jb.b.getCurrentUserIDFunc(r)
}
err := jb.b.db.Create(&inst).Error
Expand Down Expand Up @@ -260,6 +274,10 @@ func (job *QorJobInstance) GetJobInfo() (ji *JobInfo, err error) {
}, nil
}

func (job *QorJobInstance) SetJobBuilder(jb *JobBuilder) {
job.jb = jb
}

func (job *QorJobInstance) GetStatus() string {
return job.Status
}
Expand Down
5 changes: 3 additions & 2 deletions worker/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import "context"
//go:generate moq -pkg mock -out mock/queue.go . Queue

type QorJobDefinition struct {
Name string
Handler JobHandler
Name string
Handler JobHandler
Concurrency int
}

type Queue interface {
Expand Down