From 64ecb315d8ea6d39992ff6b7a96f2626e654d1ff Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sun, 18 Sep 2016 18:11:43 -0700 Subject: [PATCH 1/4] temp commit --- state/redis.c | 5 +++++ state/task_queue.h | 24 ++++++++++++++++++++++++ 2 files changed, 29 insertions(+) create mode 100644 state/task_queue.h diff --git a/state/redis.c b/state/redis.c index a8029a0..509d6ed 100644 --- a/state/redis.c +++ b/state/redis.c @@ -5,6 +5,7 @@ #include "common.h" #include "db.h" #include "object_table.h" +#include "task_queue.h" #include "event_loop.h" #include "redis.h" @@ -186,3 +187,7 @@ void object_table_lookup(db_conn *db, LOG_REDIS_ERR(db->context, "error in object_table lookup"); } } + +void task_queue_submit_task(db_conn *db, task_iid task_iid, task_spec* task) { + redisAsyncCommand(db->context, NULL, NULL, ) +} diff --git a/state/task_queue.h b/state/task_queue.h new file mode 100644 index 0000000..ef44a1e --- /dev/null +++ b/state/task_queue.h @@ -0,0 +1,24 @@ +#ifndef TASK_QUEUE_H +#define TASK_QUEUE_H + +#include "db.h" +#include "task.h" + +typedef unique_id task_id; +typedef unique_id task_iid; +typedef unique_id node_id; + +/* Callback for subscribing to the task queue. The only argument this + * callback gets is the task_id of the. */ +typedef void (*task_queue_callback)(task_iid *task_iid, task_spec* task); + +/* Submit task to the global scheduler. */ +void task_queue_submit_task(db_conn *db, task_iid task_iid, task_spec* task); + +/* Submit task to a local scheduler based on the decision made by the global scheduler. */ +void task_queue_schedule_task(db_conn *db, task_iid task_iid, node_id node); + +/* Subscribe to task queue. */ +void task_queue_register_callback(db_conn *db, task_queue_callback callback); + +#endif From 8e1c453f5ed958089ece4d7943cf8ed5e5c0835a Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sun, 18 Sep 2016 19:56:00 -0700 Subject: [PATCH 2/4] update --- state/redis.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/state/redis.c b/state/redis.c index 509d6ed..273f785 100644 --- a/state/redis.c +++ b/state/redis.c @@ -189,5 +189,6 @@ void object_table_lookup(db_conn *db, } void task_queue_submit_task(db_conn *db, task_iid task_iid, task_spec* task) { - redisAsyncCommand(db->context, NULL, NULL, ) + redisAsyncCommand(db->context, NULL, NULL, "HMSET hello world test test"); + // redisAsyncCommandArgv(db->context, NULL, NULL, ); } From 621e2e419526cbaa3b85f3fa63c8119a6365aed5 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Mon, 19 Sep 2016 14:09:16 -0700 Subject: [PATCH 3/4] add state APIs --- Makefile | 4 ---- common.h | 3 +++ event_loop.c | 7 +++++++ event_loop.h | 1 + state/object_table.h | 18 +++++++++++++----- state/redis.c | 29 +++++++++++++++++++++++++++-- state/task_queue.h | 8 ++++++++ test/db_tests.c | 37 +++++++++++++++++++++++++++++++++++++ 8 files changed, 96 insertions(+), 11 deletions(-) diff --git a/Makefile b/Makefile index 1f3deeb..29b7bef 100644 --- a/Makefile +++ b/Makefile @@ -2,10 +2,6 @@ CC = gcc CFLAGS = -g -Wall --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L -fPIC -I. -Ithirdparty BUILD = build -CFLAGS += -Wmissing-prototypes -CFLAGS += -Wstrict-prototypes -CFLAGS += -Wmissing-declarations - all: $(BUILD)/libcommon.a $(BUILD)/libcommon.a: event_loop.o common.o task.o io.o state/redis.o diff --git a/common.h b/common.h index 6dc3601..73ed418 100644 --- a/common.h +++ b/common.h @@ -3,6 +3,7 @@ #include #include +#include #ifdef NDEBUG #define LOG_DEBUG(M, ...) @@ -38,4 +39,6 @@ unique_id globally_unique_id(void); * UNIQUE_ID_SIZE + 1 */ char *sha1_to_hex(const unsigned char *sha1, char *buffer); +typedef unique_id object_id; + #endif diff --git a/event_loop.c b/event_loop.c index ebc6ebc..dd9485e 100644 --- a/event_loop.c +++ b/event_loop.c @@ -89,3 +89,10 @@ void event_loop_free(event_loop *loop) { utarray_free(loop->items); utarray_free(loop->waiting); } + +/* Return the type of connection. */ +int event_loop_type(event_loop *loop, int64_t index) { + event_loop_item *item = + (event_loop_item *) utarray_eltptr(loop->items, index); + return item->type; +} diff --git a/event_loop.h b/event_loop.h index 0903bb9..a96ec46 100644 --- a/event_loop.h +++ b/event_loop.h @@ -34,5 +34,6 @@ int64_t event_loop_size(event_loop *loop); struct pollfd *event_loop_get(event_loop *loop, int64_t index); void event_loop_set_data(event_loop *loop, int64_t index, void *data); void *event_loop_get_data(event_loop *loop, int64_t index); +int event_loop_type(event_loop *loop, int64_t index); #endif diff --git a/state/object_table.h b/state/object_table.h index 6b4d62e..642434c 100644 --- a/state/object_table.h +++ b/state/object_table.h @@ -1,15 +1,23 @@ #include "common.h" #include "db.h" -typedef void (*lookup_callback)(void *); +/* The callback that is called when the result of a lookup + * in the object table comes back. The callback should not free + * the ip addresses of the plasma managers. */ +typedef void (*lookup_callback)(object_id obj_id, + struct sockaddr **managers, + int64_t num_nodes); /* Register a new object with the directory. */ -void object_table_add(db_conn *db, unique_id object_id); +/* TODO(pcm): Retry, print for each attempt. */ +void object_table_add(db_conn *db, object_id obj_id); -/* Remove object from the directory */ -void object_table_remove(db_conn *db, unique_id object_id); +/* Remove object from the directory. */ +void object_table_remove(db_conn *db, + object_id obj_id, + struct sockaddr *manager); /* Look up entry from the directory */ void object_table_lookup(db_conn *db, - unique_id object_id, + object_id obj_id, lookup_callback callback); diff --git a/state/redis.c b/state/redis.c index 273f785..20a5e51 100644 --- a/state/redis.c +++ b/state/redis.c @@ -188,7 +188,32 @@ void object_table_lookup(db_conn *db, } } +#define TASK_QUEUE_ARG_SIZE 8 +#define QUEUE_PREFIX "queue:" +#define QUEUE_ID_PREFIX "id:" +#define QUEUE_VAL_PREFIX "val:" + void task_queue_submit_task(db_conn *db, task_iid task_iid, task_spec* task) { - redisAsyncCommand(db->context, NULL, NULL, "HMSET hello world test test"); - // redisAsyncCommandArgv(db->context, NULL, NULL, ); + int argc = 2 + 2 * task_num_args(task); + char **argv = malloc(sizeof(char *) * argc); + argv[0] = "HMSET"; + int64_t size = strlen(QUEUE_PREFIX) + 2 * UNIQUE_ID_SIZE + 1; + argv[1] = malloc(size); + strcpy(argv[1], QUEUE_PREFIX); + sha1_to_hex(&task_iid.id[0], argv[1] + strlen(QUEUE_PREFIX)); + for (int i = 0; i < task_num_args(task); ++i) { + int64_t keysize = strlen(QUEUE_ID_PREFIX) + TASK_QUEUE_ARG_SIZE; + argv[2 + 2 * i] = malloc(keysize); + snprintf(argv[2 + 2 * i], keysize, QUEUE_ID_PREFIX "%d", i); + argv[2 + 2 * i + 1] = malloc(2 * UNIQUE_ID_SIZE + 1); + sha1_to_hex(&task_arg_id(task, i)->id[0], argv[2 + 2 * i + 1]); + } + redisAsyncCommandArgv(db->context, NULL, NULL, argc, (const char**) argv, NULL); + if (db->context->err) { + LOG_REDIS_ERR(db->context, "error while submitting task"); + } + for (int i = 1; i < argc; ++i) { + free(argv[i]); + } + free(argv); } diff --git a/state/task_queue.h b/state/task_queue.h index ef44a1e..8de5c6b 100644 --- a/state/task_queue.h +++ b/state/task_queue.h @@ -4,8 +4,16 @@ #include "db.h" #include "task.h" +/* The task ID is a deterministic hash of the function ID that + * the task executes and the argument IDs or argument values */ typedef unique_id task_id; + +/* The task instance ID is a globally unique ID generated which + * identifies this particular execution of the task */ typedef unique_id task_iid; + +/* The node id is an identifier for the node the task is + * scheduled on */ typedef unique_id node_id; /* Callback for subscribing to the task queue. The only argument this diff --git a/test/db_tests.c b/test/db_tests.c index b3a0d58..6fd0091 100644 --- a/test/db_tests.c +++ b/test/db_tests.c @@ -6,6 +6,7 @@ #include "state/db.h" #include "state/object_table.h" #include "state/redis.h" +#include "task.h" SUITE(db_tests); @@ -56,8 +57,44 @@ TEST object_table_lookup_test(void) { PASS(); } +task_spec *example_task(void) { + function_id func_id = globally_unique_id(); + task_spec *task = alloc_task_spec(func_id, 2, 1, 0); + task_args_add_ref(task, globally_unique_id()); + task_args_add_ref(task, globally_unique_id()); + return task; +} + +/* +TEST task_queue_test(void) { + event_loop loop; + event_loop_init(&loop); + db_conn conn; + db_connect("127.0.0.1", 6379, "local_scheduler", "", -1, &conn); + int64_t index = db_attach(&conn, &loop, 0); + + task_spec *task = example_task(); + task_queue_submit_task(&conn, globally_unique_id(), task); + while (1) { + int num_ready = event_loop_poll(&loop); + if (num_ready < 0) { + exit(-1); + } + for (int i = 0; i < event_loop_size(&loop); ++i) { + struct pollfd *waiting = event_loop_get(&loop, i); + if (waiting->revents == 0) + continue; + if (i == index) { + db_event(&conn); + } + } + } +} +*/ + SUITE(db_tests) { RUN_TEST(object_table_lookup_test); + /* RUN_TEST(task_queue_test); */ } GREATEST_MAIN_DEFS(); From b21a01d32f126d9d5b6925f2f22b9f87c8736b1e Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Mon, 19 Sep 2016 16:42:32 -0700 Subject: [PATCH 4/4] add task table --- state/task_table.h | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 state/task_table.h diff --git a/state/task_table.h b/state/task_table.h new file mode 100644 index 0000000..94daeab --- /dev/null +++ b/state/task_table.h @@ -0,0 +1,14 @@ +#ifndef TASK_TABLE_H +#define TASK_TABLE_H + +#include "db.h" +#include "task.h" + +/* Add task to the task table, handle errors here. */ +status task_table_add_task(db_conn *db, task_iid task_iid, task_spec *task); + +/* Get specific task from the task table. */ +status task_table_get_task(db_conn *db, task_iid task_iid, task_spec *task); + + +#endif /* TASK_TABLE_H */