Skip to content
This repository was archived by the owner on Jan 12, 2026. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ CC = gcc
CFLAGS = -g -Wall --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L -fPIC -I. -Ithirdparty
BUILD = build

CFLAGS += -Wmissing-prototypes
CFLAGS += -Wstrict-prototypes
CFLAGS += -Wmissing-declarations

all: $(BUILD)/libcommon.a

$(BUILD)/libcommon.a: event_loop.o common.o task.o io.o state/redis.o
Expand Down
3 changes: 3 additions & 0 deletions common.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <string.h>
#include <errno.h>
#include <sys/socket.h>

#ifdef NDEBUG
#define LOG_DEBUG(M, ...)
Expand Down Expand Up @@ -38,4 +39,6 @@ unique_id globally_unique_id(void);
* UNIQUE_ID_SIZE + 1 */
char *sha1_to_hex(const unsigned char *sha1, char *buffer);

typedef unique_id object_id;

#endif
7 changes: 7 additions & 0 deletions event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,10 @@ void event_loop_free(event_loop *loop) {
utarray_free(loop->items);
utarray_free(loop->waiting);
}

/* Return the type of connection. */
int event_loop_type(event_loop *loop, int64_t index) {
event_loop_item *item =
(event_loop_item *) utarray_eltptr(loop->items, index);
return item->type;
}
1 change: 1 addition & 0 deletions event_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,6 @@ int64_t event_loop_size(event_loop *loop);
struct pollfd *event_loop_get(event_loop *loop, int64_t index);
void event_loop_set_data(event_loop *loop, int64_t index, void *data);
void *event_loop_get_data(event_loop *loop, int64_t index);
int event_loop_type(event_loop *loop, int64_t index);

#endif
18 changes: 13 additions & 5 deletions state/object_table.h
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
#include "common.h"
#include "db.h"

typedef void (*lookup_callback)(void *);
/* The callback that is called when the result of a lookup
* in the object table comes back. The callback should not free
* the ip addresses of the plasma managers. */
typedef void (*lookup_callback)(object_id obj_id,
struct sockaddr **managers,
int64_t num_nodes);

/* Register a new object with the directory. */
void object_table_add(db_conn *db, unique_id object_id);
/* TODO(pcm): Retry, print for each attempt. */
void object_table_add(db_conn *db, object_id obj_id);

/* Remove object from the directory */
void object_table_remove(db_conn *db, unique_id object_id);
/* Remove object from the directory. */
void object_table_remove(db_conn *db,
object_id obj_id,
struct sockaddr *manager);

/* Look up entry from the directory */
void object_table_lookup(db_conn *db,
unique_id object_id,
object_id obj_id,
lookup_callback callback);
31 changes: 31 additions & 0 deletions state/redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "common.h"
#include "db.h"
#include "object_table.h"
#include "task_queue.h"
#include "event_loop.h"
#include "redis.h"

Expand Down Expand Up @@ -186,3 +187,33 @@ void object_table_lookup(db_conn *db,
LOG_REDIS_ERR(db->context, "error in object_table lookup");
}
}

#define TASK_QUEUE_ARG_SIZE 8
#define QUEUE_PREFIX "queue:"
#define QUEUE_ID_PREFIX "id:"
#define QUEUE_VAL_PREFIX "val:"

void task_queue_submit_task(db_conn *db, task_iid task_iid, task_spec* task) {
int argc = 2 + 2 * task_num_args(task);
char **argv = malloc(sizeof(char *) * argc);
argv[0] = "HMSET";
int64_t size = strlen(QUEUE_PREFIX) + 2 * UNIQUE_ID_SIZE + 1;
argv[1] = malloc(size);
strcpy(argv[1], QUEUE_PREFIX);
sha1_to_hex(&task_iid.id[0], argv[1] + strlen(QUEUE_PREFIX));
for (int i = 0; i < task_num_args(task); ++i) {
int64_t keysize = strlen(QUEUE_ID_PREFIX) + TASK_QUEUE_ARG_SIZE;
argv[2 + 2 * i] = malloc(keysize);
snprintf(argv[2 + 2 * i], keysize, QUEUE_ID_PREFIX "%d", i);
argv[2 + 2 * i + 1] = malloc(2 * UNIQUE_ID_SIZE + 1);
sha1_to_hex(&task_arg_id(task, i)->id[0], argv[2 + 2 * i + 1]);
}
redisAsyncCommandArgv(db->context, NULL, NULL, argc, (const char**) argv, NULL);
if (db->context->err) {
LOG_REDIS_ERR(db->context, "error while submitting task");
}
for (int i = 1; i < argc; ++i) {
free(argv[i]);
}
free(argv);
}
32 changes: 32 additions & 0 deletions state/task_queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#ifndef TASK_QUEUE_H
#define TASK_QUEUE_H

#include "db.h"
#include "task.h"

/* The task ID is a deterministic hash of the function ID that
* the task executes and the argument IDs or argument values */
typedef unique_id task_id;

/* The task instance ID is a globally unique ID generated which
* identifies this particular execution of the task */
typedef unique_id task_iid;

/* The node id is an identifier for the node the task is
* scheduled on */
typedef unique_id node_id;

/* Callback for subscribing to the task queue. The only argument this
* callback gets is the task_id of the. */
typedef void (*task_queue_callback)(task_iid *task_iid, task_spec* task);

/* Submit task to the global scheduler. */
void task_queue_submit_task(db_conn *db, task_iid task_iid, task_spec* task);

/* Submit task to a local scheduler based on the decision made by the global scheduler. */
void task_queue_schedule_task(db_conn *db, task_iid task_iid, node_id node);

/* Subscribe to task queue. */
void task_queue_register_callback(db_conn *db, task_queue_callback callback);

#endif
14 changes: 14 additions & 0 deletions state/task_table.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#ifndef TASK_TABLE_H
#define TASK_TABLE_H

#include "db.h"
#include "task.h"

/* Add task to the task table, handle errors here. */
status task_table_add_task(db_conn *db, task_iid task_iid, task_spec *task);

/* Get specific task from the task table. */
status task_table_get_task(db_conn *db, task_iid task_iid, task_spec *task);


#endif /* TASK_TABLE_H */
37 changes: 37 additions & 0 deletions test/db_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "state/db.h"
#include "state/object_table.h"
#include "state/redis.h"
#include "task.h"

SUITE(db_tests);

Expand Down Expand Up @@ -56,8 +57,44 @@ TEST object_table_lookup_test(void) {
PASS();
}

task_spec *example_task(void) {
function_id func_id = globally_unique_id();
task_spec *task = alloc_task_spec(func_id, 2, 1, 0);
task_args_add_ref(task, globally_unique_id());
task_args_add_ref(task, globally_unique_id());
return task;
}

/*
TEST task_queue_test(void) {
event_loop loop;
event_loop_init(&loop);
db_conn conn;
db_connect("127.0.0.1", 6379, "local_scheduler", "", -1, &conn);
int64_t index = db_attach(&conn, &loop, 0);

task_spec *task = example_task();
task_queue_submit_task(&conn, globally_unique_id(), task);
while (1) {
int num_ready = event_loop_poll(&loop);
if (num_ready < 0) {
exit(-1);
}
for (int i = 0; i < event_loop_size(&loop); ++i) {
struct pollfd *waiting = event_loop_get(&loop, i);
if (waiting->revents == 0)
continue;
if (i == index) {
db_event(&conn);
}
}
}
}
*/

SUITE(db_tests) {
RUN_TEST(object_table_lookup_test);
/* RUN_TEST(task_queue_test); */
}

GREATEST_MAIN_DEFS();
Expand Down