Skip to content
This repository was archived by the owner on Jan 12, 2026. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions state/task_log.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#ifndef TASK_LOG_H
#define TASK_LOG_H

#include "db.h"
#include "task.h"

/* The task log is a message bus that is used for all communication between
* local and global schedulers (and also persisted to the state database).
* Here are examples of events that are recorded by the task log:
*
* 1) local scheduler writes it when submits a task to the global scheduler;
* 2) global scheduler reads it to get the task submitted by local schedulers;
* 3) global scheduler writes it when assigning the task to a local scheduler;
* 4) local scheduler reads it to get its tasks assigned by global scheduler;
* 5) local scheduler writes it when a task finishes execution;
* 6) global scheduler reads it to get the tasks that have finished; */

/* Callback for subscribing to the task log. */
typedef void (*task_log_callback)(task_instance* task_instance, void *userdata);

/* Initially add a task instance to the task log. */
void task_log_add_task(db_handle *db, task_iid task_iid, task_instance* task_instance);

/* Update task instance in the task log. */
void task_log_update_task(db_handle *db, task_iid task_iid, int32_t state, node_id node);

/* Register callback for a certain event. The node specifies the node whose
* events we want to listen to. If you want to listen to all events for this node,
* use state_filter = TASK_WAITING | TASK_SCHEDULED | TASK_RUNNING | TASK_DONE.
* If you want to register to updates from all nodes, set node = NIL_ID. */
void task_log_register_callback(db_handle *db, task_log_callback callback, node_id node, int32_t state_filter, void *userdata);

#endif /* TASK_LOG_H */
33 changes: 0 additions & 33 deletions state/task_queue.h

This file was deleted.

62 changes: 61 additions & 1 deletion task.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@

#ifndef TASK_H
#define TASK_H

/* This API specifies the task data structure. It is in C so we can
/* This API specifies the task data structures. It is in C so we can
* easily construct tasks from other languages like Python. The datastructures
* are also defined in such a way that memory is contiguous and all pointers
* are relative, so that we can memcpy the datastructure and ship it over the
Expand All @@ -15,6 +16,24 @@
typedef unique_id function_id;
typedef unique_id object_id;

/* The task ID is a deterministic hash of the function ID that
* the task executes and the argument IDs or argument values */
typedef unique_id task_id;

/* The task instance ID is a globally unique ID generated which
* identifies this particular execution of the task */
typedef unique_id task_iid;

/* The node id is an identifier for the node the task is
* scheduled on */
typedef unique_id node_id;

/*
* TASK SPECIFICATIONS: Contain all the information neccessary
* to execute the task (function id, arguments, return object ids).
*
*/

typedef struct task_spec_impl task_spec;

/* If argument is passed by value or reference. */
Expand Down Expand Up @@ -68,4 +87,45 @@ void print_task(task_spec *spec, UT_string *output);
/* Parse task as printed by print_task. */
task_spec *parse_task(char *task_string, int64_t task_length);

/*
* SCHEDULED TASK: Contains information about a scheduled task:
* the task iid, the task specification and the task status
* (WAITING, SCHEDULED, RUNNING, DONE) and which node the
* task is scheduled on.
*
*/

/* The scheduling_state can be used as a flag when we are listening for an event,
* for example TASK_WAITING | TASK_SCHEDULED. */
enum scheduling_state {
TASK_WAITING = 1,
TASK_SCHEDULED = 2,
TASK_RUNNING = 4,
TASK_DONE = 8
};

/* A task instance is one execution of a task specification.
* It has a unique instance id, a state of execution (see scheduling_state)
* and a node it is scheduled on or running on. */
typedef struct task_instance_impl task_instance;

/* Allocate and initialize a new task instance. Must be freed with
* scheduled_task_free after use. */
task_instance *make_task_instance(task_iid task_iid, task_spec *task, int32_t state, node_id node);

/* Size of task instance structure in bytes. */
int64_t task_instance_size(task_instance *instance);

/* Instance ID of the task instance. */
task_iid *task_instance_id(task_instance *instance);

/* Node this task instance has been assigned to or is running on. */
node_id *task_instance_node(task_instance *instance);

/* Task specification of this task instance. */
task_spec *task_instance_task_spec(task_instance *instance);

/* Free this task instance datastructure. */
void task_instance_free(task_instance *instance);

#endif