From 145c1b6a85ab65578f0f0f566593c65d40a7c130 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Fri, 28 Oct 2016 13:39:41 -0700 Subject: [PATCH 1/2] implement object table and task log retries --- Makefile | 14 +- common.h | 7 + event_loop.c | 9 +- event_loop.h | 25 ++- state/db.h | 2 +- state/object_table.c | 60 ++++++ state/object_table.h | 141 ++++++++++-- state/redis.c | 336 +++++++++++++++++++++-------- state/redis.h | 100 ++++++--- state/table.c | 112 ++++++++++ state/table.h | 137 ++++++++++++ state/task_log.c | 52 +++++ state/task_log.h | 93 ++++++-- task.h | 25 ++- test/db_tests.c | 69 ++++-- test/example_task.h | 8 + test/object_table_tests.c | 443 ++++++++++++++++++++++++++++++++++++++ test/task_log_tests.c | 274 +++++++++++++++++++++++ 18 files changed, 1714 insertions(+), 193 deletions(-) create mode 100644 state/object_table.c create mode 100644 state/table.c create mode 100644 state/table.h create mode 100644 state/task_log.c create mode 100644 test/object_table_tests.c create mode 100644 test/task_log_tests.c diff --git a/Makefile b/Makefile index 6982b79..4cd48ec 100644 --- a/Makefile +++ b/Makefile @@ -1,10 +1,10 @@ CC = gcc -CFLAGS = -g -Wall --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L -fPIC -I. -Ithirdparty -Ithirdparty/ae -Wno-typedef-redefinition -Werror +CFLAGS = -g -Wall -Wno-typedef-redefinition --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L -fPIC -I. 
-Ithirdparty -Ithirdparty/ae BUILD = build all: hiredis $(BUILD)/libcommon.a -$(BUILD)/libcommon.a: event_loop.o common.o task.o io.o state/redis.o thirdparty/ae/ae.o +$(BUILD)/libcommon.a: event_loop.o common.o task.o io.o state/redis.o state/table.o state/object_table.o state/task_log.o thirdparty/ae/ae.o ar rcs $@ $^ $(BUILD)/common_tests: test/common_tests.c $(BUILD)/libcommon.a @@ -13,6 +13,12 @@ $(BUILD)/common_tests: test/common_tests.c $(BUILD)/libcommon.a $(BUILD)/db_tests: hiredis test/db_tests.c $(BUILD)/libcommon.a $(CC) -o $@ test/db_tests.c $(BUILD)/libcommon.a thirdparty/hiredis/libhiredis.a $(CFLAGS) +$(BUILD)/object_table_tests: hiredis test/object_table_tests.c $(BUILD)/libcommon.a + $(CC) -o $@ test/object_table_tests.c $(BUILD)/libcommon.a thirdparty/hiredis/libhiredis.a $(CFLAGS) + +$(BUILD)/task_log_tests: hiredis test/task_log_tests.c $(BUILD)/libcommon.a + $(CC) -o $@ test/task_log_tests.c $(BUILD)/libcommon.a thirdparty/hiredis/libhiredis.a $(CFLAGS) + $(BUILD)/io_tests: test/io_tests.c $(BUILD)/libcommon.a $(CC) -o $@ $^ $(CFLAGS) @@ -32,9 +38,9 @@ redis: hiredis: git submodule update --init --recursive -- "thirdparty/hiredis" ; cd thirdparty/hiredis ; make -test: hiredis redis $(BUILD)/common_tests $(BUILD)/db_tests $(BUILD)/io_tests $(BUILD)/task_tests $(BUILD)/redis_tests FORCE +test: hiredis redis $(BUILD)/common_tests $(BUILD)/task_log_tests $(BUILD)/object_table_tests $(BUILD)/db_tests $(BUILD)/io_tests $(BUILD)/task_tests $(BUILD)/redis_tests FORCE ./thirdparty/redis-3.2.3/src/redis-server & - sleep 1s ; ./build/common_tests ; ./build/db_tests ; ./build/io_tests ; ./build/task_tests ; ./build/redis_tests + sleep 1s ; ./build/common_tests ; ./build/db_tests ; ./build/task_log_tests ; ./build/object_table_tests ; ./build/io_tests ; ./build/task_tests ; ./build/redis_tests valgrind: test valgrind --leak-check=full --error-exitcode=1 ./build/common_tests diff --git a/common.h b/common.h index f09e91c..f238870 100644 --- a/common.h +++ 
b/common.h @@ -5,6 +5,9 @@ #include #include #include +#include + +#define RAY_COMMON_DEBUG #ifndef RAY_COMMON_DEBUG #define LOG_DEBUG(M, ...) @@ -36,6 +39,10 @@ } \ } while (0); +/** This macro indicates that this pointer owns the data it is pointing to + * and is responsible for freeing it. */ +#define OWNER + #define UNIQUE_ID_SIZE 20 /* Cleanup method for running tests with the greatest library. diff --git a/event_loop.c b/event_loop.c index 7e78354..d6980cd 100644 --- a/event_loop.c +++ b/event_loop.c @@ -42,15 +42,14 @@ void event_loop_remove_file(event_loop *loop, int fd) { } int64_t event_loop_add_timer(event_loop *loop, - int64_t milliseconds, + int64_t timeout, event_loop_timer_handler handler, void *context) { - return aeCreateTimeEvent(loop, milliseconds, handler, context, NULL); + return aeCreateTimeEvent(loop, timeout, handler, context, NULL); } -void event_loop_remove_timer(event_loop *loop, timer_id timer_id) { - int err = aeDeleteTimeEvent(loop, timer_id); - CHECK(err == AE_OK); /* timer id found? */ +int event_loop_remove_timer(event_loop *loop, int64_t id) { + return aeDeleteTimeEvent(loop, id); } void event_loop_run(event_loop *loop) { diff --git a/event_loop.h b/event_loop.h index 7f0659b..296fd98 100644 --- a/event_loop.h +++ b/event_loop.h @@ -57,16 +57,29 @@ void event_loop_add_file(event_loop *loop, /* Remove a registered file event handler from the event loop. */ void event_loop_remove_file(event_loop *loop, int fd); -/* Register a handler that will be called after a time slice of - * "milliseconds" milliseconds. Can specify a context that will be passed - * as an argument to the handler. Return the id of the time event. */ +/** Register a handler that will be called after a time slice of + * "timeout" milliseconds. + * + * @param loop The event loop. + * @param timeout The timeout in milliseconds. + * @param handler The handler for the timeout. 
+ * @param context User context that can be passed in and will be passed in + * as an argument for the timer handler. + * @return The ID of the timer. + */ int64_t event_loop_add_timer(event_loop *loop, - int64_t milliseconds, + int64_t timeout, event_loop_timer_handler handler, void *context); -/* Remove a registered time event handler from the event loop. */ -void event_loop_remove_timer(event_loop *loop, timer_id timer_id); +/** + * Remove a registered time event handler from the event loop. + * + * @param loop The event loop. + * @param timer_id The ID of the timer to be removed. + * @return Returns 0 if the removal was successful. + */ +int event_loop_remove_timer(event_loop *loop, int64_t timer_id); /* Run the event loop. */ void event_loop_run(event_loop *loop); diff --git a/state/db.h b/state/db.h index 50536e2..96ecf1e 100644 --- a/state/db.h +++ b/state/db.h @@ -3,7 +3,7 @@ #include "event_loop.h" -typedef struct db_handle_impl db_handle; +typedef struct db_handle db_handle; /* Connect to the global system store at address and port. 
Returns * a handle to the database, which must be freed with db_disconnect diff --git a/state/object_table.c b/state/object_table.c new file mode 100644 index 0000000..45584ba --- /dev/null +++ b/state/object_table.c @@ -0,0 +1,60 @@ +#include "object_table.h" +#include "redis.h" + +void object_table_lookup(db_handle *db_handle, + object_id object_id, + int retry_count, + uint64_t timeout, + object_table_lookup_done_cb done_cb, + table_fail_cb fail_cb, + void *user_context) { + retry_struct retry; + + retry.count = retry_count; + retry.timeout = timeout; + retry.cb = redis_object_table_lookup; + + init_table_callback(db_handle, object_id, NULL, &retry, done_cb, fail_cb, user_context); +} + +void object_table_add(db_handle *db_handle, + object_id object_id, + int retry_count, + uint64_t timeout, + object_table_done_cb done_cb, + table_fail_cb fail_cb, + void *user_context) { + + retry_struct retry; + + retry.count = retry_count; + retry.timeout = timeout; + retry.cb = redis_object_table_add; + + init_table_callback(db_handle, object_id, NULL, &retry, done_cb, fail_cb, user_context); +} + + +void object_table_subscribe(db_handle *db_handle, + object_id object_id, + object_table_object_available_cb object_available_cb, + void *subscribe_context, + int retry_count, + uint64_t timeout, + object_table_done_cb done_cb, + table_fail_cb fail_cb, + void *user_context) { + + object_table_subscribe_data *sub_data = malloc(sizeof(object_table_subscribe_data)); + utarray_push_back(db_handle->callback_freelist, &sub_data); + sub_data->object_available_cb = object_available_cb; + sub_data->subscribe_context = subscribe_context; + + retry_struct retry; + + retry.count = retry_count; + retry.timeout = timeout; + retry.cb = redis_object_table_subscribe; + + init_table_callback(db_handle, object_id, sub_data, &retry, done_cb, fail_cb, user_context); +} diff --git a/state/object_table.h b/state/object_table.h index bab54bc..33b54dc 100644 --- a/state/object_table.h +++ 
b/state/object_table.h @@ -1,25 +1,136 @@ +#ifndef OBJECT_TABLE_H +#define OBJECT_TABLE_H + #include "common.h" +#include "table.h" #include "db.h" -/* The callback that is called when the result of a lookup - * in the object table comes back. The callback should free - * the manager_vector array, but NOT the strings they are pointing to. */ -typedef void (*lookup_callback)(object_id object_id, - int manager_count, - const char *manager_vector[], - void *context); +/* + * ==== Lookup call and callback ==== + */ -/* Register a new object with the directory. */ -/* TODO(pcm): Retry, print for each attempt. */ -void object_table_add(db_handle *db, object_id object_id); +/* Callback called when the lookup completes. The callback should free + * the manager_vector array, but NOT the strings they are pointing to. + */ +typedef void (*object_table_lookup_done_cb)(object_id object_id, + int manager_count, + OWNER const char *manager_vector[], + void *user_context); -/* Remove object from the directory. */ -void object_table_remove(db_handle *db, +/** + * Return the list of nodes storing object_id in their plasma stores. + * + * @param db_handle Handle to object_table database. + * @param object_id ID of the object being looked up. + * @param retry_count Number of retries to the database before giving up. + * @param timeout Timout between retries (in milliseconds). + * @param done_callback Function to be called when database returns result. + * @param fail_callback Function to be called if we failed to contact + * database after retry_count retries. + * @param user_context Context passed by the caller. + * @return Void. + */ +void object_table_lookup(db_handle *db_handle, object_id object_id, - const char *manager); + int retry_count, + uint64_t timeout, + object_table_lookup_done_cb done_cb, + table_fail_cb fail_cb, + void *user_context); + +/* + * ==== Add object call and callback ==== + */ + + /* Callback called when the object add/remove operation completes. 
*/ +typedef void (*object_table_done_cb)(object_id object_id, void *user_context); -/* Look up entry from the directory */ -void object_table_lookup(db_handle *db, +/** + * Add the plasma manager that created the db_handle to the + * list of plasma managers that have the object_id. + * + * @param db_handle: Handle to db. + * @param object_id: Object unique identifier. + * @param retry_count: Number of retries after giving up. + * @param done_cb: Callback to be called when lookup completes. + * @param timeout_cb: Callback to be called when lookup timeouts. + * @param user_context: User context to be passed in the callbacks. + * @return Void. + */ +void object_table_add(db_handle *db_handle, + object_id object_id, + int retry_count, + uint64_t timeout, + object_table_done_cb done_cb, + table_fail_cb fail_cb, + void *user_context); + +/* + * ==== Remove object call and callback ==== + */ + + +/** + * Object remove function. + * + * @param db_handle: Handle to db. + * @param object_id: Object unique identifier. + * @param retry_count: Number of retries after giving up. + * @param timeout: Timeout after which we retry the lookup. + * @param done_cb: Callback to be called when lookup completes. + * @param fail_cb: Callback to be called when lookup timeouts. + * @param user_context: User context to be passed in the callbacks. + * @return Void. + */ +/* +void object_table_remove(db_handle *db, object_id object_id, lookup_callback callback, void *context); + int retry_count, + uint64_t timeout, + object_table_done_cb done_cb, + table_fail_cb fail_cb, + void *user_context); +*/ + +/* + * ==== Subscribe to be announced when new object available ==== + */ + +/* Callback called when object object_id is available. */ +typedef void (*object_table_object_available_cb)(object_id object_id, void *user_context); + +/** + * Subcribing to new object available function. + * + * @param db_handle: Handle to db. + * @param object_id: Object unique identifier. 
+ * @param object_available_cb: callback to be called when new object becomes available + * @param subscribe_context: caller context which will be passed back in the object_available_cb + * @param retry_count: Number of retries after giving up. + * @param timeout: Timeout after which we retry to install subscription. + * @param done_cb: Callback to be called when subscription is installed. + * @param fail_cb: Callback to be called when subscription installation fails. + * @param user_context: User context to be passed in the callbacks. + * @return Void. + */ + +void object_table_subscribe(db_handle *db, + object_id object_id, + object_table_object_available_cb object_available_cb, + void *subscribe_context, + int retry_count, + uint64_t timeout, + object_table_done_cb done_cb, + table_fail_cb fail_cb, + void *user_context); + +/* Data that is needed to register new object available callbacks with the state database. */ +typedef struct { + object_table_object_available_cb object_available_cb; + void *subscribe_context; +} object_table_subscribe_data; + + +#endif /* OBJECT_TABLE_H */ diff --git a/state/redis.c b/state/redis.c index d53b0bf..8d94951 100644 --- a/state/redis.c +++ b/state/redis.c @@ -1,6 +1,7 @@ /* Redis implementation of the global state store */ #include +#include #include #include "hiredis/adapters/ae.h" @@ -9,6 +10,7 @@ #include "common.h" #include "db.h" #include "object_table.h" +#include "task.h" #include "task_log.h" #include "event_loop.h" #include "redis.h" @@ -31,6 +33,19 @@ } \ } while (0); +#define REDIS_CALLBACK_HEADER(DB, CB_DATA, REPLY) \ + db_handle *DB = c->data; \ + table_callback_data *CB_DATA = privdata; \ + \ + if ((REPLY) == NULL) { \ + return; \ + } \ + \ + if (outstanding_callbacks_find(cb_data) == NULL) \ + /* the callback data structure has been + * already freed; just ignore this reply */ \ + return; + db_handle *db_connect(const char *address, int port, const char *client_type, @@ -104,131 +119,282 @@ void 
db_disconnect(db_handle *db) { } void db_attach(db_handle *db, event_loop *loop) { + db->loop = loop; redisAeAttach(loop, db->context); redisAeAttach(loop, db->sub_context); } -void object_table_add(db_handle *db, unique_id object_id) { - redisAsyncCommand(db->context, NULL, NULL, "SADD obj:%b %d", &object_id.id[0], - UNIQUE_ID_SIZE, db->client_id); +/* + * ==== object_table callbacks ==== + */ + +void redis_object_table_add_cb(redisAsyncContext *c, void *r, void *privdata) { + REDIS_CALLBACK_HEADER(db, cb_data, r) + + if (cb_data->done_cb) { + task_log_done_cb done_cb = cb_data->done_cb; + done_cb(cb_data->id, cb_data->user_context); + } + event_loop_remove_timer(db->loop, cb_data->timer_id); +} + +void redis_object_table_add(table_callback_data *cb_data) { + CHECK(cb_data); + + if (outstanding_callbacks_find(cb_data) == NULL) + /* the callback data structure has been already freed; just ignore this reply */ + return; + + db_handle *db = cb_data->db_handle; + redisAsyncCommand(db->context, redis_object_table_add_cb, cb_data, "SADD obj:%b %d", + &cb_data->id.id[0], UNIQUE_ID_SIZE, db->client_id); if (db->context->err) { LOG_REDIS_ERR(db->context, "could not add object_table entry"); } } -void object_table_get_entry(redisAsyncContext *c, void *r, void *privdata) { - db_handle *db = c->data; - lookup_callback_data *cb_data = privdata; +void redis_object_table_lookup(table_callback_data *cb_data) { + CHECK(cb_data); + db_handle *db = cb_data->db_handle; + + /* Call redis asynchronously */ + redisAsyncCommand(db->context, redis_object_table_get_entry, cb_data, + "SMEMBERS obj:%b", &cb_data->id.id[0], UNIQUE_ID_SIZE); + if (db->context->err) { + LOG_REDIS_ERR(db->context, "error in object_table lookup"); + } +} + +/** + * Get an entry from the plasma manager table in redis. + * + * @param db The database handle. + * @param index The index of the plasma manager. + * @param *manager The pointer where the IP address of the manager gets written. + * @return Void. 
+ */ +void redis_get_cached_service(db_handle *db, int index, const char **manager) { + service_cache_entry *entry; + HASH_FIND_INT(db->service_cache, &index, entry); + if (!entry) { + /* This is a very rare case. */ + redisReply *reply = redisCommand(db->sync_context, "HGET %s %lld", + db->client_type, index); + CHECK(reply->type == REDIS_REPLY_STRING); + entry = malloc(sizeof(service_cache_entry)); + entry->service_id = index; + entry->addr = strdup(reply->str); + HASH_ADD_INT(db->service_cache, service_id, entry); + freeReplyObject(reply); + } + *manager = entry->addr; +} + +void redis_object_table_get_entry(redisAsyncContext *c, + void *r, + void *privdata) { + REDIS_CALLBACK_HEADER(db, cb_data, r) redisReply *reply = r; - if (reply == NULL) - return; - int *result = malloc(reply->elements * sizeof(int)); + + int *managers = malloc(reply->elements * sizeof(int)); int64_t manager_count = reply->elements; + if (reply->type == REDIS_REPLY_ARRAY) { + const char **manager_vector = malloc(manager_count * sizeof(char *)); for (int j = 0; j < reply->elements; j++) { CHECK(reply->element[j]->type == REDIS_REPLY_STRING); - result[j] = atoi(reply->element[j]->str); - service_cache_entry *entry; - HASH_FIND_INT(db->service_cache, &result[j], entry); - if (!entry) { - redisReply *reply = redisCommand(db->sync_context, "HGET %s %lld", - db->client_type, result[j]); - CHECK(reply->type == REDIS_REPLY_STRING); - entry = malloc(sizeof(service_cache_entry)); - entry->service_id = result[j]; - entry->addr = strdup(reply->str); - HASH_ADD_INT(db->service_cache, service_id, entry); - freeReplyObject(reply); - } + managers[j] = atoi(reply->element[j]->str); + redis_get_cached_service(db, managers[j], manager_vector + j); } + + object_table_lookup_done_cb done_cb = cb_data->done_cb; + done_cb(cb_data->id, manager_count, manager_vector, cb_data->user_context); + /* remove timer */ + event_loop_remove_timer(cb_data->db_handle->loop, cb_data->timer_id); + free(privdata); + 
free(managers); } else { LOG_ERR("expected integer or string, received type %d", reply->type); exit(-1); } - const char **manager_vector = malloc(manager_count * sizeof(char *)); - for (int j = 0; j < manager_count; ++j) { - service_cache_entry *entry; - HASH_FIND_INT(db->service_cache, &result[j], entry); - manager_vector[j] = entry->addr; - } - cb_data->callback(cb_data->object_id, manager_count, manager_vector, - cb_data->context); - free(privdata); - free(result); -} - -void object_table_lookup(db_handle *db, - object_id object_id, - lookup_callback callback, - void *context) { - lookup_callback_data *cb_data = malloc(sizeof(lookup_callback_data)); - cb_data->callback = callback; - cb_data->object_id = object_id; - cb_data->context = context; - redisAsyncCommand(db->context, object_table_get_entry, cb_data, - "SMEMBERS obj:%b", &object_id.id[0], UNIQUE_ID_SIZE); - if (db->context->err) { - LOG_REDIS_ERR(db->context, "error in object_table lookup"); +} + + +void object_table_redis_callback(redisAsyncContext *c, + void *r, + void *privdata) { + REDIS_CALLBACK_HEADER(db, cb_data, r) + redisReply *reply = r; + + CHECK(reply->type == REDIS_REPLY_ARRAY); + /* First entry is message type, second is topic, third is payload. */ + CHECK(reply->elements > 2); + /* If this condition is true, we got the initial message that acknowledged the + * subscription. */ + if (strncmp(reply->element[1]->str, "add", 3) != 0) { + if (cb_data->done_cb) { + object_table_done_cb done_cb = cb_data->done_cb; + done_cb(cb_data->id, cb_data->user_context); + } + event_loop_remove_timer(db->loop, cb_data->timer_id); + return; + } + /* Otherwise, parse the task and call the callback. 
*/ + CHECK(privdata); + object_table_subscribe_data *data = cb_data->data; + + if (data->object_available_cb) { + data->object_available_cb(cb_data->id, data->subscribe_context); } } -void task_log_add_task(db_handle *db, task_instance *task_instance) { - task_iid task_iid = *task_instance_id(task_instance); - redisAsyncCommand(db->context, NULL, NULL, "HMSET tasklog:%b 0 %b", - (char *) &task_iid.id[0], UNIQUE_ID_SIZE, - (char *) task_instance, task_instance_size(task_instance)); - if (db->context->err) { - LOG_REDIS_ERR(db->context, "error setting task in task_log_add_task"); +void redis_object_table_subscribe(table_callback_data *cb_data) { + db_handle *db = cb_data->db_handle; + + /* subscribe to key notification associated to object id */ + + redisAsyncCommand(db->sub_context, object_table_redis_callback, cb_data, + "SUBSCRIBE __keyspace@0__:%b add", (char *) &cb_data->id.id[0], UNIQUE_ID_SIZE); + + if (db->sub_context->err) { + LOG_REDIS_ERR(db->sub_context, "error in redis_object_table_subscribe_callback"); } +} + +/* + * ==== task_log callbacks ==== + */ + + +void redis_task_log_publish(table_callback_data *cb_data) { + db_handle *db = cb_data->db_handle; + task_instance *task_instance = cb_data->data; + task_iid task_iid = *task_instance_id(task_instance); node_id node = *task_instance_node(task_instance); int32_t state = *task_instance_state(task_instance); - redisAsyncCommand(db->context, NULL, NULL, "PUBLISH task_log:%b:%d %b", - (char *) &node.id[0], UNIQUE_ID_SIZE, state, - (char *) task_instance, task_instance_size(task_instance)); - if (db->context->err) { - LOG_REDIS_ERR(db->context, "error publishing task in task_log_add_task"); + + LOG_DEBUG("Called log_publish callback"); + + /* Check whether the vector (requests_info) indicating the status of the requests has been allocated. + * If was not allocate it, allocate it and initialize it. 
+ * This vector has an entry for each redis command, and it stores true if a reply for that command + * has been received, and false otherwise. + * The first entry in the callback corresponds to RPUSH, and the second entry to PUBLISH. + */ +#define NUM_DB_REQUESTS 2 +#define PUSH_INDEX 0 +#define PUBLISH_INDEX 1 + if (cb_data->requests_info == NULL) { + cb_data->requests_info = malloc(NUM_DB_REQUESTS * sizeof(bool)); + for (int i = 0; i < NUM_DB_REQUESTS; i++) { + ((bool *)cb_data->requests_info)[i] = false; + } + } + + if (((bool *)cb_data->requests_info)[PUSH_INDEX] == false) { + if (*task_instance_state(task_instance) == TASK_STATUS_WAITING) { + redisAsyncCommand(db->context, redis_task_log_publish_push_cb, cb_data, "RPUSH tasklog:%b %b", + (char *) &task_iid.id[0], UNIQUE_ID_SIZE, + (char *) task_instance, task_instance_size(task_instance)); + } else { + task_update update = {.state = state, .node = node}; + redisAsyncCommand(db->context, redis_task_log_publish_push_cb, cb_data, "RPUSH tasklog:%b %b", + (char *) &task_iid.id[0], UNIQUE_ID_SIZE, + (char *) &update, sizeof(update)); + } + + if (db->context->err) { + LOG_REDIS_ERR(db->context, "error setting task in task_log_add_task"); + } + } + + if (((bool *)cb_data->requests_info)[PUBLISH_INDEX] == false) { + redisAsyncCommand(db->context, redis_task_log_publish_publish_cb, cb_data, "PUBLISH task_log:%b:%d %b", + (char *) &node.id[0], UNIQUE_ID_SIZE, state, + (char *) task_instance, task_instance_size(task_instance)); + + if (db->context->err) { + LOG_REDIS_ERR(db->context, "error publishing task in task_log_add_task"); + } } } +void redis_task_log_publish_push_cb(redisAsyncContext *c, + void *r, + void *privdata) { + REDIS_CALLBACK_HEADER(db, cb_data, r) + + CHECK(cb_data->requests_info != NULL); + ((bool *)cb_data->requests_info)[PUSH_INDEX] = true; + + if (((bool *)cb_data->requests_info)[PUBLISH_INDEX] == true) { + if (cb_data->done_cb) { + task_log_done_cb done_cb = cb_data->done_cb; + 
done_cb(cb_data->id, cb_data->user_context); + } + event_loop_remove_timer(db->loop, cb_data->timer_id); + } +} + +void redis_task_log_publish_publish_cb(redisAsyncContext *c, void *r, void *privdata) { + REDIS_CALLBACK_HEADER(db, cb_data, r) + + CHECK(cb_data->requests_info != NULL); + ((bool *)cb_data->requests_info)[PUBLISH_INDEX] = true; + + if (((bool *)cb_data->requests_info)[PUSH_INDEX] == true) { + if (cb_data->done_cb) { + task_log_done_cb done_cb = cb_data->done_cb; + done_cb(cb_data->id, cb_data->user_context); + } + event_loop_remove_timer(db->loop, cb_data->timer_id); + } +} + + void task_log_redis_callback(redisAsyncContext *c, - void *reply, + void *r, void *privdata) { - redisReply *r = reply; - if (reply == NULL) - return; - CHECK(r->type == REDIS_REPLY_ARRAY); + REDIS_CALLBACK_HEADER(db, cb_data, r) + redisReply *reply = r; + + CHECK(reply->type == REDIS_REPLY_ARRAY); /* First entry is message type, second is topic, third is payload. */ - CHECK(r->elements > 2); + CHECK(reply->elements > 2); /* If this condition is true, we got the initial message that acknowledged the * subscription. */ - if (r->element[2]->str == NULL) { + if (reply->element[2]->str == NULL) { + if (cb_data->done_cb) { + task_log_done_cb done_cb = cb_data->done_cb; + done_cb(cb_data->id, cb_data->user_context); + } + event_loop_remove_timer(db->loop, cb_data->timer_id); return; } /* Otherwise, parse the task and call the callback. 
*/ CHECK(privdata); - task_log_callback_data *callback_data = privdata; - task_instance *instance = malloc(r->element[2]->len); - memcpy(instance, r->element[2]->str, r->element[2]->len); - callback_data->callback(instance, callback_data->userdata); + task_log_subscribe_data *data = cb_data->data; + + task_instance *instance = malloc(reply->element[2]->len); + memcpy(instance, reply->element[2]->str, reply->element[2]->len); + if (data->subscribe_cb) { + data->subscribe_cb(instance, data->subscribe_context); + } task_instance_free(instance); } -void task_log_register_callback(db_handle *db, - task_log_callback callback, - node_id node, - int32_t state, - void *userdata) { - task_log_callback_data *callback_data = - malloc(sizeof(task_log_callback_data)); - utarray_push_back(db->callback_freelist, &callback_data); - callback_data->callback = callback; - callback_data->userdata = userdata; - if (memcmp(&node.id[0], &NIL_ID.id[0], UNIQUE_ID_SIZE) == 0) { - redisAsyncCommand(db->sub_context, task_log_redis_callback, callback_data, - "PSUBSCRIBE task_log:*:%d", state); + + +void redis_task_log_subscribe(table_callback_data *cb_data) { + db_handle *db = cb_data->db_handle; + task_log_subscribe_data *data = cb_data->data; + + if (memcmp(&data->node.id[0], &NIL_ID.id[0], UNIQUE_ID_SIZE) == 0) { + redisAsyncCommand(db->sub_context, task_log_redis_callback, cb_data, + "PSUBSCRIBE task_log:*:%d", data->state_filter); } else { - redisAsyncCommand(db->sub_context, task_log_redis_callback, callback_data, - "SUBSCRIBE task_log:%b:%d", (char *) &node.id[0], - UNIQUE_ID_SIZE, state); + redisAsyncCommand(db->sub_context, task_log_redis_callback, cb_data, + "SUBSCRIBE task_log:%b:%d", (char *) &data->node.id[0], + UNIQUE_ID_SIZE, data->state_filter); } if (db->sub_context->err) { LOG_REDIS_ERR(db->sub_context, "error in task_log_register_callback"); diff --git a/state/redis.h b/state/redis.h index cf368b9..4815134 100644 --- a/state/redis.h +++ b/state/redis.h @@ -19,50 +19,96 @@ 
typedef struct { UT_hash_handle hh; } service_cache_entry; -typedef struct { - /* The callback that will be called. */ - task_log_callback callback; - /* Userdata associated with the callback. */ - void *userdata; -} task_log_callback_data; - -struct db_handle_impl { - /* String that identifies this client type. */ +struct db_handle { + /** String that identifies this client type. */ char *client_type; - /* Unique ID for this client within the type. */ + /** Unique ID for this client within the type. */ int64_t client_id; - /* Redis context for this global state store connection. */ + /** Redis context for this global state store connection. */ redisAsyncContext *context; - /* Redis context for "subscribe" communication. + /** Redis context for "subscribe" communication. * Yes, we need a separate one for that, see * https://github.com/redis/hiredis/issues/55 */ redisAsyncContext *sub_context; - /* The event loop this global state store connection is part of. */ + /** The event loop this global state store connection is part of. */ event_loop *loop; - /* Index of the database connection in the event loop */ + /** Index of the database connection in the event loop */ int64_t db_index; - /* Cache for the IP addresses of services. */ + /** Cache for the IP addresses of services. */ service_cache_entry *service_cache; - /* Redis context for synchronous connections. + /** Redis context for synchronous connections. * Should only be used very rarely, it is not asynchronous. */ redisContext *sync_context; /* Data structure for callbacks that needs to be freed. */ UT_array *callback_freelist; }; -typedef struct { - /* The callback that will be called. */ - lookup_callback callback; - /* Object ID that is looked up. */ - object_id object_id; - /* Data context for the callback. 
*/ - void *context; -} lookup_callback_data; - -void object_table_get_entry(redisAsyncContext *c, void *r, void *privdata); +void redis_object_table_get_entry(redisAsyncContext *c, void *r, void *privdata); void object_table_lookup_callback(redisAsyncContext *c, void *r, void *privdata); -#endif +/* + * ==== Redis object table functions ==== + */ + +/** + * Lookup object table entry in redis. + * @param cb_data Data structure containing redis connection and timeout information. + * @return Void. + */ +void redis_object_table_lookup(table_callback_data *cb_data); + +/** + * Add an entry to the object table in redis. + * @param cb_data Data structure containing redis connection and timeout information. + * @return Void. + */ +void redis_object_table_add(table_callback_data *cb_data); + +/** + * Subscribe to learn when a new object becomes available. + * @param cb_data Data structure containing redis connection and timeout information. + * @return Void. + */ +void redis_object_table_subscribe(table_callback_data *cb_data); + + +/* + * ==== Redis task table function ===== + */ + +/** + * Add or update task log entry with new scheduling information. + * @param cb_data Data structure containing redis connection and timeout information. + * @return Void. + */ +void redis_task_log_publish(table_callback_data *cb_data); + + +/** + * Callback invoked when the replya from the task push command is received. + * @param c Redis context. + * @param r Reply (not used). + * @param privdata Data associated to the callback. + */ +void redis_task_log_publish_push_cb(redisAsyncContext *c, void *r, void *privdata); + +/** + * Callback invoked when the replya from the task publish command is received. + * @param c Redis context. + * @param r Reply (not used). + * @param privdata Data associated to the callback. + */ +void redis_task_log_publish_publish_cb(redisAsyncContext *c, void *r, void *privdata); + + +/** + * Subscribe to updates of the task log. 
+ * @param cb_data Data structure containing redis connection and timeout information. + * @return Void. + */ +void redis_task_log_subscribe(table_callback_data *cb_data); + +#endif /* REDIS_H */ diff --git a/state/table.c b/state/table.c new file mode 100644 index 0000000..4e3b0de --- /dev/null +++ b/state/table.c @@ -0,0 +1,112 @@ +#include "table.h" + +#include +#include "redis.h" + +table_callback_data *init_table_callback(db_handle *db_handle, + unique_id id, + void *data, + retry_struct *retry, + table_done_cb done_cb, + table_fail_cb fail_cb, + void *user_context) { + CHECK(db_handle); + CHECK(db_handle->loop); + CHECK(retry); + /* Allocate and initialize callback data structure for object table */ + table_callback_data *cb_data = malloc(sizeof(table_callback_data)); + CHECKM(cb_data != NULL, "Memory allocation error!") + cb_data->id = id; + cb_data->done_cb = done_cb; + cb_data->fail_cb = fail_cb; + cb_data->retry.cb = retry->cb; + cb_data->retry.count = retry->count; + cb_data->retry.timeout = retry->timeout; + cb_data->data = data; + cb_data->requests_info = NULL; + cb_data->user_context = user_context; + cb_data->db_handle = db_handle; + /* Add timer and initialize it. */ + cb_data->timer_id = event_loop_add_timer(db_handle->loop, + retry->timeout, + table_timeout_handler, + cb_data); + outstanding_callbacks_add(cb_data); + + cb_data->retry.cb(cb_data); + + return cb_data; +} + +void destroy_table_callback(table_callback_data *cb_data) { + CHECK(cb_data != NULL); + + // if (cb_data->data) + // free(cb_data->data); + + if (cb_data->requests_info) + free(cb_data->requests_info); + + outstanding_callbacks_remove(cb_data); + + /* Timer is removed via EVENT_LOOP_TIMER_DONE in the timeout callback. 
*/ + free(cb_data); +} + +int64_t table_timeout_handler(event_loop *loop, + int64_t timer_id, + void *user_context) { + CHECK(loop != NULL); + CHECK(user_context != NULL); + table_callback_data *cb_data = (table_callback_data *) user_context; + + CHECK(cb_data->retry.count >= 0) + LOG_DEBUG("retrying operation, retry_count = %d", cb_data->retry.count); + + if (cb_data->retry.count == 0) { + /* We didn't get a response from the database after exhausting all retries; + * let user know, cleanup the state, and remove the timer. */ + if (cb_data->fail_cb) { + cb_data->fail_cb(cb_data->id, cb_data->user_context); + } + destroy_table_callback(cb_data); + return EVENT_LOOP_TIMER_DONE; + } + + /* Decrement retry count and try again. */ + cb_data->retry.count--; + cb_data->retry.cb(cb_data); + return cb_data->retry.timeout; +} + +/** + * List of outstanding callbacks. We need to maintain this list for the case in which a reply is received + * after the last timeout expires and all relevant data structures are removed. In this case we just need + * to ignore the reply.
+ * */ +static outstanding_callback *outstanding_cbs = NULL; + +void outstanding_callbacks_add(table_callback_data *key) { + outstanding_callback *outstanding_cb = malloc(sizeof(outstanding_callback)); + + CHECK(outstanding_cb != NULL); + outstanding_cb->key = key; + HASH_ADD_PTR(outstanding_cbs, key, outstanding_cb); +} + +outstanding_callback *outstanding_callbacks_find(table_callback_data *key) { + outstanding_callback *outstanding_cb = NULL; + + HASH_FIND_PTR(outstanding_cbs, &key, outstanding_cb); + return outstanding_cb; +} + +void outstanding_callbacks_remove(table_callback_data *key) { + outstanding_callback *outstanding_cb = NULL; + + outstanding_cb = outstanding_callbacks_find(key); + if (outstanding_cb != NULL) { + HASH_DEL(outstanding_cbs, outstanding_cb); + free(outstanding_cb); + } +} diff --git a/state/table.h b/state/table.h new file mode 100644 index 0000000..f3cd463 --- /dev/null +++ b/state/table.h @@ -0,0 +1,137 @@ +#ifndef TABLE_H +#define TABLE_H + +#include "uthash.h" +#include "stdbool.h" + +#include "common.h" +#include "db.h" + +typedef struct table_callback_data table_callback_data; + +typedef void * table_done_cb; + +/* The callback called when the database operation hasn't completed after + * the number of retries specified for the operation. */ +typedef void (*table_fail_cb)(unique_id id, void *user_context); + +typedef void (*table_retry_cb)(table_callback_data *cb_data); + +/** + * Data structure consolidating the retry related variables. + */ +typedef struct { + /** Number of retries left. */ + int count; + /** Timeout, in milliseconds. */ + uint64_t timeout; + /** The callback that will be called to initiate the next try. */ + table_retry_cb cb; +} retry_struct; + + +struct table_callback_data { + /** ID of the entry in the table that we are going to look up, remove or add. */ + unique_id id; + /** The callback that will be called when the result is returned.
*/ + table_done_cb done_cb; + /** The callback that will be called when the redis command times out. */ + table_fail_cb fail_cb; + /** Retry structure containing the remaining number of retries, timeout and a pointer + * to the retry callback. + */ + retry_struct retry; + /** Pointer to the data that is entered into the table. */ + void *data; + /** Pointer to the data used internally to handle multiple database requests. */ + void *requests_info; + /** User context. */ + void *user_context; + /** Handle to db. */ + db_handle *db_handle; + /** Handle to timer. */ + int64_t timer_id; +}; + +/** + * Function to handle the timeout event. + * @param loop Event loop. + * @param timer_id Timer identifier. + * @param context Pointer to the callback data for the object table + * @return Timeout to reset the timer if we need to try again, or + * EVENT_LOOP_TIMER_DONE if retry_count == 0. + */ +int64_t table_timeout_handler(event_loop *loop, + int64_t timer_id, + void *context); + +/** + * + * @param db_handle Database handle. + * @param id ID of the object that is looked up, added or removed. + * @param data Data entered into the table. + * @param retry Retry relevant information: retry timeout, number of remaining retries, and retry callback. + * @param done_cb Function to be called when database returns result. + * @param fail_cb Function to be called when number of retries is exhausted. + * @param user_context Context that can be provided by the user and will be + * passed on to the various callbacks. + * @return New table callback data struct. + */ +table_callback_data *init_table_callback(db_handle *db_handle, + unique_id id, + void *data, + retry_struct *retry, + table_done_cb done_cb, + table_fail_cb fail_cb, + void *user_context); + + +void destroy_table_callback(table_callback_data *cb_data); + +/** + * Hash table maintaining the outstanding callbacks.
+ * + * This hash table is used to handle the following case: + * - a table command is issued with an associated callback and a callback data structure; + * - the last timeout associated to this command expires, as a result the callback data structure is freed; + * - a reply arrives, but now the callback data structure is gone, so we have to ignore this reply; + * + * This hash table enables us to ignore such replies. The operations on the hash table are as follows. + * + * When we issue a table command we add a new entry to the hash table that is keyed by the address of the callback's + * data structure. + * + * When we receive the reply, we check whether the callback still exists in this hash table, and if not we just ignore + * the reply. + * + * When the last timeout associated to the command expires we remove the entry associated to the callback. + */ +typedef struct { + table_callback_data *key; + int dummy; + UT_hash_handle hh; /* makes this structure hashable */ +} outstanding_callback; + +/** + * + * @param key The pointer to the data structure of the callback we want to insert. + * @return None. + */ +void outstanding_callbacks_add(table_callback_data *key); + +/** + * + * @param key The pointer to the data structure of the callback we are looking for. + * @return Returns callback if found, NULL otherwise. + */ +outstanding_callback *outstanding_callbacks_find(table_callback_data *key); + +/** + * + * @param key Key, defined as the pointer to the data structure of the callback we want to remove. + * @return None.
+ */ +void outstanding_callbacks_remove(table_callback_data *key); + + +#endif /* TABLE_H */ diff --git a/state/task_log.c b/state/task_log.c new file mode 100644 index 0000000..be68dbc --- /dev/null +++ b/state/task_log.c @@ -0,0 +1,52 @@ +#include "task_log.h" +#include "redis.h" + +#define NUM_DB_REQUESTS 2 + +void task_log_publish(db_handle *db_handle, + task_instance *task_instance, + int retry_count, + uint64_t timeout, + task_log_done_cb done_cb, + table_fail_cb fail_cb, + void *user_context) { + retry_struct retry; + + retry.count = retry_count; + retry.timeout = timeout; + retry.cb = redis_task_log_publish; + + table_callback_data *cb_data = + init_table_callback(db_handle, *task_instance_id(task_instance), task_instance, + &retry, done_cb, fail_cb, user_context); + redis_task_log_publish(cb_data); +} + +void task_log_subscribe(db_handle *db_handle, + node_id node, + int32_t state_filter, + task_log_subscribe_cb subscribe_cb, + void *subscribe_context, + int retry_count, + uint64_t timeout, + task_log_done_cb done_cb, + table_fail_cb fail_cb, + void *user_context) { + + task_log_subscribe_data *sub_data = malloc(sizeof(task_log_subscribe_data)); + utarray_push_back(db_handle->callback_freelist, &sub_data); + sub_data->node = node; + sub_data->state_filter = state_filter; + sub_data->subscribe_cb = subscribe_cb; + sub_data->subscribe_context = subscribe_context; + + retry_struct retry; + + retry.count = retry_count; + retry.timeout = timeout; + retry.cb = redis_task_log_subscribe; + + table_callback_data *cb_data = + init_table_callback(db_handle, node, sub_data, &retry, done_cb, fail_cb, user_context); + redis_task_log_subscribe(cb_data); +} \ No newline at end of file diff --git a/state/task_log.h b/state/task_log.h index acf5dbc..cd6a973 100644 --- a/state/task_log.h +++ b/state/task_log.h @@ -2,6 +2,7 @@ #define TASK_LOG_H #include "db.h" +#include "table.h" #include "task.h" /* The task log is a message bus that is used for all communication between @@ 
-15,27 +16,77 @@ * 5) local scheduler writes it when a task finishes execution; * 6) global scheduler reads it to get the tasks that have finished; */ +/* Callback called when the task log operation completes. */ +typedef void (*task_log_done_cb)(task_iid task_iid, void *user_context); + +/* + * ==== Publish the task log ==== + */ + +/** Add or update a task instance to the task log. + * @param db_handle Database handle. + * @param retry_count Number of retries to the database before giving up. + * @param timeout Timout between retries (in milliseconds). + * @param done_cb Function to be called when database returns result. + * @param fail_cb Function to be called if we failed to contact + * database after retry_count retries. + * @param user_context Data that will be passed to done_cb and fail_cb. + * @return Void. + */ +void task_log_publish(db_handle *db_handle, + task_instance *task_instance, + int retry_count, + uint64_t timeout, + task_log_done_cb done_cb, + table_fail_cb fail_cb, + void *user_context); + + +/* + * ==== Subscribing to the task log ==== + */ + /* Callback for subscribing to the task log. */ -typedef void (*task_log_callback)(task_instance *task_instance, void *userdata); - -/* Initially add a task instance to the task log. */ -void task_log_add_task(db_handle *db, task_instance *task_instance); - -/* Update task instance in the task log. */ -void task_log_update_task(db_handle *db, - task_iid task_iid, - int32_t state, - node_id node); - -/* Register callback for a certain event. The node specifies the node whose - * events we want to listen to. If you want to listen to all events for this - * node, use state_filter = - * TASK_WAITING | TASK_SCHEDULED | TASK_RUNNING | TASK_DONE. - * If you want to register to updates from all nodes, set node = NIL_ID. 
*/ -void task_log_register_callback(db_handle *db, - task_log_callback callback, - node_id node, - int32_t state_filter, - void *userdata); +typedef void (*task_log_subscribe_cb)(task_instance *task_instance, + void *user_context); + +/** Register callback for a certain event. + * + * @param db_handle Database handle. + * @param subscribe_cb Callback that will be called when the task log is + updated. + * @param subscribe_context Context that will be passed into the subscribe_cb. + * @param node Node whose events we want to listen to. If you want to register + * to updates from all nodes, set node = NIL_ID. + * @param state_filter Flags for events we want to listen to. If you want + * to listen to all events, use state_filter = TASK_WAITING | + * TASK_SCHEDULED | TASK_RUNNING | TASK_DONE. + * @param retry_count Number of retries to the database before giving up. + * @param timeout Timout between retries (in milliseconds). + * @param done_cb Function to be called when database returns result. + * @param fail_cb Function to be called if we failed to contact + * database after retry_count retries. + * @param user_context Data that will be passed to done_cb and fail_cb. + * @return Void. + */ +void task_log_subscribe(db_handle *db_handle, + node_id node, + int32_t state_filter, + task_log_subscribe_cb subscribe_cb, + void *subscribe_context, + int retry_count, + uint64_t timeout, + task_log_done_cb done_cb, + table_fail_cb fail_cb, + void *user_context); + +/* Data that is needed to register task log subscribe callbacks with the state + * database. 
*/ +typedef struct { + node_id node; + int32_t state_filter; + task_log_subscribe_cb subscribe_cb; + void *subscribe_context; +} task_log_subscribe_data; #endif /* TASK_LOG_H */ diff --git a/task.h b/task.h index 782bf68..8d818e3 100644 --- a/task.h +++ b/task.h @@ -27,9 +27,9 @@ typedef unique_id task_iid; typedef unique_id node_id; /* - * TASK SPECIFICATIONS: Contain all the information neccessary - * to execute the task (function id, arguments, return object ids). - * + * ==== Task specifications ==== + * Contain all the information neccessary to execute the + * task (function id, arguments, return object ids). */ typedef struct task_spec_impl task_spec; @@ -83,11 +83,10 @@ task_spec *read_task(int fd); void print_task(task_spec *spec, UT_string *output); /* - * SCHEDULED TASK: Contains information about a scheduled task: - * the task iid, the task specification and the task status - * (WAITING, SCHEDULED, RUNNING, DONE) and which node the - * task is scheduled on. - * + * ==== Task instance ==== + * Contains information about a scheduled task: The task iid, + * the task specification and the task status (WAITING, SCHEDULED, + * RUNNING, DONE) and which node the task is scheduled on. */ /* The scheduling_state can be used as a flag when we are listening @@ -129,4 +128,14 @@ task_spec *task_instance_task_spec(task_instance *instance); /* Free this task instance datastructure. */ void task_instance_free(task_instance *instance); +/* + * ==== Task update ==== + * Contains the information necessary to update a task in the task log. + */ + +typedef struct { + int32_t state; + node_id node; +} task_update; + #endif diff --git a/test/db_tests.c b/test/db_tests.c index 74dd9a8..b59025a 100644 --- a/test/db_tests.c +++ b/test/db_tests.c @@ -14,6 +14,10 @@ SUITE(db_tests); +/* Retry 10 times with an 100ms timeout. 
*/ +const int NUM_RETRIES = 10; +const uint64_t TIMEOUT = 50; + const char *manager_addr = "127.0.0.1"; int manager_port1 = 12345; int manager_port2 = 12346; @@ -22,11 +26,18 @@ char received_port1[6] = {0}; char received_addr2[16] = {0}; char received_port2[6] = {0}; +typedef struct { + int test_number; +} user_context; + +const int TEST_NUMBER = 10; + /* Test if entries have been written to the database. */ -void test_callback(object_id object_id, - int manager_count, - const char *manager_vector[], - void *context) { + +void lookup_done_cb(object_id object_id, + int manager_count, + const char *manager_vector[], + void *user_context) { CHECK(manager_count == 2); if (!manager_vector[0] || sscanf(manager_vector[0], "%15[0-9.]:%5[0-9]", received_addr1, @@ -41,7 +52,19 @@ void test_callback(object_id object_id, free(manager_vector); } -int timeout_handler(event_loop *loop, timer_id timer_id, void *context) { +/* Entry added to database successfully. */ +void add_done_cb(object_id object_id, void* user_context) { + +} + +/* Test if we got a timeout callback if we couldn't connect database. 
*/ +void timeout_cb(object_id object_id, + void *context) { + user_context *uc = (user_context *)context; + CHECK(uc->test_number == TEST_NUMBER) +} + +int64_t timeout_handler(event_loop *loop, int64_t id, void *context) { event_loop_stop(loop); return EVENT_LOOP_TIMER_DONE; } @@ -55,12 +78,14 @@ TEST object_table_lookup_test(void) { db_attach(db1, loop); db_attach(db2, loop); unique_id id = globally_unique_id(); - object_table_add(db1, id); - object_table_add(db2, id); - event_loop_add_timer(loop, 100, timeout_handler, NULL); + object_table_add(db1, id, NUM_RETRIES, TIMEOUT, add_done_cb, timeout_cb, NULL); + object_table_add(db2, id, NUM_RETRIES, TIMEOUT, add_done_cb, timeout_cb, NULL); + event_loop_add_timer(loop, 200, timeout_handler, NULL); event_loop_run(loop); - object_table_lookup(db1, id, test_callback, NULL); - event_loop_add_timer(loop, 100, timeout_handler, NULL); + user_context user_context; + user_context.test_number = TEST_NUMBER; + object_table_lookup(db1, id, NUM_RETRIES, TIMEOUT, lookup_done_cb, timeout_cb, NULL); + event_loop_add_timer(loop, 200, timeout_handler, NULL); event_loop_run(loop); int port1 = atoi(received_port1); int port2 = atoi(received_port2); @@ -88,12 +113,11 @@ TEST task_log_test(void) { db_attach(db, loop); node_id node = globally_unique_id(); task_spec *task = example_task(); - task_instance *instance = make_task_instance(globally_unique_id(), task, - TASK_STATUS_SCHEDULED, node); - task_log_register_callback(db, task_log_test_callback, node, - TASK_STATUS_SCHEDULED, instance); - task_log_add_task(db, instance); - event_loop_add_timer(loop, 100, timeout_handler, NULL); + task_instance *instance = + make_task_instance(globally_unique_id(), task, TASK_STATUS_SCHEDULED, node); + task_log_subscribe(db, node, TASK_STATUS_SCHEDULED, task_log_test_callback, instance, NUM_RETRIES, TIMEOUT, NULL, NULL, NULL); + task_log_publish(db, instance, NUM_RETRIES, TIMEOUT, NULL, NULL, NULL); + event_loop_add_timer(loop, 200, timeout_handler, 
NULL); event_loop_run(loop); task_instance_free(instance); free_task_spec(task); @@ -118,17 +142,20 @@ TEST task_log_all_test(void) { globally_unique_id(), task, TASK_STATUS_SCHEDULED, globally_unique_id()); task_instance *instance2 = make_task_instance( globally_unique_id(), task, TASK_STATUS_SCHEDULED, globally_unique_id()); - task_log_register_callback(db, task_log_all_test_callback, NIL_ID, - TASK_STATUS_SCHEDULED, NULL); - task_log_add_task(db, instance1); - task_log_add_task(db, instance2); - event_loop_add_timer(loop, 100, timeout_handler, NULL); + task_log_subscribe(db, NIL_ID, TASK_STATUS_SCHEDULED, task_log_all_test_callback, NULL, NUM_RETRIES, TIMEOUT, NULL, NULL, NULL); + event_loop_add_timer(loop, 50, timeout_handler, NULL); + event_loop_run(loop); + /* TODO(pcm): Get rid of this sleep once the robust pubsub is implemented. */ + task_log_publish(db, instance1, NUM_RETRIES, TIMEOUT, NULL, NULL, NULL); + task_log_publish(db, instance2, NUM_RETRIES, TIMEOUT, NULL, NULL, NULL); + event_loop_add_timer(loop, 200, timeout_handler, NULL); event_loop_run(loop); task_instance_free(instance2); task_instance_free(instance1); free_task_spec(task); db_disconnect(db); event_loop_destroy(loop); + printf("num_test_callback_called = %d\n", num_test_callback_called); ASSERT(num_test_callback_called == 2); PASS(); } diff --git a/test/example_task.h b/test/example_task.h index 0dddc4d..f471ff4 100644 --- a/test/example_task.h +++ b/test/example_task.h @@ -11,4 +11,12 @@ task_spec *example_task(void) { return task; } +task_instance *example_task_instance(void) { + task_iid iid = globally_unique_id(); + task_spec *spec = example_task(); + task_instance *instance = make_task_instance(iid, spec, TASK_STATUS_WAITING, NIL_ID); + free_task_spec(spec); + return instance; +} + #endif diff --git a/test/object_table_tests.c b/test/object_table_tests.c new file mode 100644 index 0000000..bd025d0 --- /dev/null +++ b/test/object_table_tests.c @@ -0,0 +1,443 @@ +#include "greatest.h" + 
+#include "event_loop.h" +#include "example_task.h" +#include "common.h" +#include "state/object_table.h" +#include "state/redis.h" + +#include + +SUITE(object_table_tests); + +static event_loop *g_loop; + +/* ==== Test if operations time out correctly ==== */ + +/* === Test lookup timeout === */ + +const char *lookup_timeout_context = "lookup_timeout"; +int lookup_failed = 0; + +void lookup_done_cb(object_id object_id, + int manager_count, + OWNER const char *manager_vector[], + void *context) { + /* The done callback should not be called. */ + CHECK(0); +} + +void lookup_fail_cb(unique_id id, void *user_data) { + lookup_failed = 1; + CHECK(user_data == (void*) lookup_timeout_context); + event_loop_stop(g_loop); +} + +TEST lookup_timeout_test(void) { + g_loop = event_loop_create(); + db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", + 1234); + db_attach(db, g_loop); + object_table_lookup(db, NIL_ID, 5, 100, lookup_done_cb, lookup_fail_cb, + (void*) lookup_timeout_context); + /* Disconnect the database to see if the lookup times out. */ + close(db->context->c.fd); + event_loop_run(g_loop); + db_disconnect(db); + event_loop_destroy(g_loop); + CHECK(lookup_failed); + PASS(); +} + +/* === Test add timeout === */ + +const char *add_timeout_context = "add_timeout"; +int add_failed = 0; + +void add_done_cb(object_id object_id, + void *user_context) { + /* The done callback should not be called. */ + CHECK(0); +} + +void add_fail_cb(unique_id id, void *user_data) { + add_failed = 1; + CHECK(user_data == (void*) add_timeout_context); + event_loop_stop(g_loop); +} + +TEST add_timeout_test(void) { + g_loop = event_loop_create(); + db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", + 1234); + db_attach(db, g_loop); + object_table_add(db, NIL_ID, 5, 100, add_done_cb, add_fail_cb, (void*) add_timeout_context); + /* Disconnect the database to see if the lookup times out. 
*/ + close(db->context->c.fd); + event_loop_run(g_loop); + db_disconnect(db); + event_loop_destroy(g_loop); + CHECK(add_failed); + PASS(); +} + + +/* === Test subscribe timeout === */ + +const char *subscribe_timeout_context = "subscribe_timeout"; +int subscribe_failed = 0; + +void subscribe_done_cb(object_id object_id, + void *user_context) { + /* The done callback should not be called. */ + CHECK(0); +} + +void subscribe_fail_cb(unique_id id, void *user_data) { + subscribe_failed = 1; + CHECK(user_data == (void*) subscribe_timeout_context); + event_loop_stop(g_loop); +} + +TEST subscribe_timeout_test(void) { + g_loop = event_loop_create(); + db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", + 1234); + db_attach(db, g_loop); + object_table_subscribe(db, NIL_ID, NULL, NULL, 5, 100, subscribe_done_cb, subscribe_fail_cb, (void *) subscribe_timeout_context); + /* Disconnect the database to see if the lookup times out. */ + close(db->sub_context->c.fd); + event_loop_run(g_loop); + db_disconnect(db); + event_loop_destroy(g_loop); + CHECK(subscribe_failed); + PASS(); +} + + +/* ==== Test if the retry is working correctly ==== */ + +int64_t reconnect_context_cb(event_loop *loop, int64_t timer_id, void *context) { + db_handle *db = context; + /* Reconnect to redis. This is not reconnecting the pub/sub channel. */ + redisAsyncFree(db->context); + redisFree(db->sync_context); + db->context = redisAsyncConnect("127.0.0.1", 6379); + db->context->data = (void *) db; + db->sync_context = redisConnect("127.0.0.1", 6379); + /* Re-attach the database to the event loop (the file descriptor changed). 
*/ + db_attach(db, loop); + return EVENT_LOOP_TIMER_DONE; +} + +int64_t terminate_event_loop_cb(event_loop *loop, + int64_t timer_id, + void *context) { + event_loop_stop(loop); + return EVENT_LOOP_TIMER_DONE; +} + +/* === Test lookup retry === */ + +const char *lookup_retry_context = "lookup_retry"; +int lookup_retry_succeeded = 0; + +void lookup_retry_done_cb(object_id object_id, + int manager_count, + OWNER const char *manager_vector[], + void *context) { + CHECK(context == (void *) lookup_retry_context); + lookup_retry_succeeded = 1; + free(manager_vector); +} + +void lookup_retry_fail_cb(unique_id id, void *user_data) { + /* The fail callback should not be called. */ + CHECK(0); +} + +TEST lookup_retry_test(void) { + g_loop = event_loop_create(); + db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", + 11235); + db_attach(db, g_loop); + object_table_lookup(db, NIL_ID, 5, 100, lookup_retry_done_cb, + lookup_retry_fail_cb, (void*) lookup_retry_context); + /* Disconnect the database to let the lookup time out the first time. */ + close(db->context->c.fd); + /* Install handler for reconnecting the database. */ + event_loop_add_timer(g_loop, 150, reconnect_context_cb, db); + /* Install handler for terminating the event loop. */ + event_loop_add_timer(g_loop, 750, terminate_event_loop_cb, NULL); + event_loop_run(g_loop); + db_disconnect(db); + event_loop_destroy(g_loop); + CHECK(lookup_retry_succeeded); + PASS(); +} + +/* === Test add retry === */ + +const char *add_retry_context = "add_retry"; +int add_retry_succeeded = 0; + +void add_retry_done_cb(object_id object_id, + void *user_context) { + CHECK(user_context == (void *) add_retry_context); + add_retry_succeeded = 1; +} + +void add_retry_fail_cb(unique_id id, void *user_data) { + /* The fail callback should not be called. 
*/ + CHECK(0); +} + +TEST add_retry_test(void) { + g_loop = event_loop_create(); + db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", + 11235); + db_attach(db, g_loop); + object_table_add(db, NIL_ID, 5, 100, add_retry_done_cb, + add_retry_fail_cb, (void*) add_retry_context); + /* Disconnect the database to let the add time out the first time. */ + close(db->context->c.fd); + /* Install handler for reconnecting the database. */ + event_loop_add_timer(g_loop, 150, reconnect_context_cb, db); + /* Install handler for terminating the event loop. */ + event_loop_add_timer(g_loop, 750, terminate_event_loop_cb, NULL); + event_loop_run(g_loop); + db_disconnect(db); + event_loop_destroy(g_loop); + CHECK(add_retry_succeeded); + PASS(); +} + +/* === Test subscribe retry === */ + +const char *subscribe_retry_context = "subscribe_retry"; +int subscribe_retry_succeeded = 0; + +int64_t reconnect_sub_context_cb(event_loop *loop, int64_t timer_id, void *context) { + db_handle *db = context; + /* Reconnect to redis. This is not reconnecting the pub/sub channel. */ + redisAsyncFree(db->sub_context); + redisFree(db->sync_context); + db->sub_context = redisAsyncConnect("127.0.0.1", 6379); + db->sub_context->data = (void *) db; + db->sync_context = redisConnect("127.0.0.1", 6379); + /* Re-attach the database to the event loop (the file descriptor changed). */ + db_attach(db, loop); + return EVENT_LOOP_TIMER_DONE; +} + +void subscribe_retry_done_cb(object_id object_id, + void *user_context) { + CHECK(user_context == (void *) subscribe_retry_context); + subscribe_retry_succeeded = 1; +} + +void subscribe_retry_fail_cb(unique_id id, void *user_data) { + /* The fail callback should not be called. 
*/ + CHECK(0); +} + +TEST subscribe_retry_test(void) { + g_loop = event_loop_create(); + db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", + 11235); + db_attach(db, g_loop); + object_table_subscribe(db, NIL_ID, NULL, NULL, 5, 100, subscribe_retry_done_cb, + subscribe_retry_fail_cb, (void*) subscribe_retry_context); + /* Disconnect the database to let the subscribe times out the first time. */ + close(db->sub_context->c.fd); + /* Install handler for reconnecting the database. */ + event_loop_add_timer(g_loop, 150, reconnect_sub_context_cb, db); + /* Install handler for terminating the event loop. */ + event_loop_add_timer(g_loop, 750, terminate_event_loop_cb, NULL); + event_loop_run(g_loop); + db_disconnect(db); + event_loop_destroy(g_loop); + CHECK(subscribe_retry_succeeded); + PASS(); +} + +/* ==== Test if late succeed is working correctly ==== */ + +/* === Test lookup late succeed === */ + +const char* lookup_late_context = "lookup_late"; +int lookup_late_failed = 0; + +void lookup_late_fail_cb(unique_id id, void *user_context) { + CHECK(user_context == (void *) lookup_late_context); + lookup_late_failed = 1; +} + +void lookup_late_done_cb(object_id object_id, + int manager_count, + OWNER const char *manager_vector[], + void *context) { + /* This function should never be called. */ + CHECK(0); +} + +TEST lookup_late_test(void) { + g_loop = event_loop_create(); + db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", + 11236); + db_attach(db, g_loop); + object_table_lookup(db, NIL_ID, 0, 0, lookup_late_done_cb, + lookup_late_fail_cb, (void*) lookup_late_context); + /* Install handler for terminating the event loop. */ + event_loop_add_timer(g_loop, 750, terminate_event_loop_cb, NULL); + /* First process timer events to make sure the timeout is processed before + * anything else. 
*/ + aeProcessEvents(g_loop, AE_TIME_EVENTS); + event_loop_run(g_loop); + db_disconnect(db); + event_loop_destroy(g_loop); + CHECK(lookup_late_failed); + PASS(); +} + +/* === Test add late succeed === */ + +const char *add_late_context = "add_late"; +int add_late_failed = 0; + +void add_late_fail_cb(unique_id id, void *user_context) { + CHECK(user_context == (void *) add_late_context); + add_late_failed = 1; +} + +void add_late_done_cb(object_id object_id, + void *user_context) { + /* This function should never be called. */ + CHECK(0); +} + +TEST add_late_test(void) { + g_loop = event_loop_create(); + db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", + 11236); + db_attach(db, g_loop); + object_table_add(db, NIL_ID, 0, 0, add_late_done_cb, + add_late_fail_cb, (void*) add_late_context); + /* Install handler for terminating the event loop. */ + event_loop_add_timer(g_loop, 750, terminate_event_loop_cb, NULL); + /* First process timer events to make sure the timeout is processed before + * anything else. */ + aeProcessEvents(g_loop, AE_TIME_EVENTS); + event_loop_run(g_loop); + db_disconnect(db); + event_loop_destroy(g_loop); + CHECK(add_late_failed); + PASS(); +} + +/* === Test subscribe late succeed === */ + +const char *subscribe_late_context = "subscribe_late"; +int subscribe_late_failed = 0; + +void subscribe_late_fail_cb(unique_id id, void *user_context) { + CHECK(user_context == (void *) subscribe_late_context); + subscribe_late_failed = 1; +} + +void subscribe_late_done_cb(object_id object_id, + void *user_context) { + /* This function should never be called. 
*/ + CHECK(0); +} + +TEST subscribe_late_test(void) { + g_loop = event_loop_create(); + db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", + 11236); + db_attach(db, g_loop); + object_table_subscribe(db, NIL_ID, NULL, NULL, 0, 0, subscribe_late_done_cb, + subscribe_late_fail_cb, (void*) subscribe_late_context); + /* Install handler for terminating the event loop. */ + event_loop_add_timer(g_loop, 750, terminate_event_loop_cb, NULL); + /* First process timer events to make sure the timeout is processed before + * anything else. */ + aeProcessEvents(g_loop, AE_TIME_EVENTS); + event_loop_run(g_loop); + db_disconnect(db); + event_loop_destroy(g_loop); + CHECK(subscribe_late_failed); + PASS(); +} + + +/* === Test subscribe object available succeed === */ + +const char* subscribe_success_context = "subscribe_success"; +int subscribe_success_done = 0; +int subscribe_success_succeeded = 0; + +void subscribe_success_fail_cb(unique_id id, void *user_context) { + /* This function should never be called. */ + CHECK(0); +} + +void subscribe_success_done_cb(object_id object_id, void *user_context) { + object_table_add((db_handle *)user_context, object_id, 0, 0, NULL, NULL, NULL); + subscribe_success_done = 1; +} + +void subscribe_success_object_available_cb(object_id object_id, + void *user_context) { + CHECK(user_context == (void*) subscribe_success_context); + subscribe_success_succeeded = 1; +} + + +TEST subscribe_success_test(void) { + g_loop = event_loop_create(); + db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", + 11236); + db_attach(db, g_loop); + unique_id id = globally_unique_id(); + + object_table_subscribe(db, id, + subscribe_success_object_available_cb, + (void*) subscribe_success_context, 0, 100, + subscribe_success_done_cb, + subscribe_success_fail_cb, + (void *) db); + + + /* Install handler for terminating the event loop. 
*/ + event_loop_add_timer(g_loop, 750, terminate_event_loop_cb, NULL); + + event_loop_run(g_loop); + db_disconnect(db); + event_loop_destroy(g_loop); + + CHECK(subscribe_success_done); + CHECK(subscribe_success_succeeded); + PASS(); +} + +SUITE(object_table_tests) { + RUN_TEST(lookup_timeout_test); + RUN_TEST(add_timeout_test); + RUN_TEST(subscribe_timeout_test); + RUN_TEST(lookup_retry_test); + RUN_TEST(add_retry_test); + RUN_TEST(subscribe_retry_test); + RUN_TEST(lookup_late_test); + RUN_TEST(add_late_test); + RUN_TEST(subscribe_late_test); + RUN_TEST(subscribe_success_test); +} + +GREATEST_MAIN_DEFS(); + +int main(int argc, char **argv) { + GREATEST_MAIN_BEGIN(); + RUN_SUITE(object_table_tests); + GREATEST_MAIN_END(); +} diff --git a/test/task_log_tests.c b/test/task_log_tests.c new file mode 100644 index 0000000..04ca853 --- /dev/null +++ b/test/task_log_tests.c @@ -0,0 +1,274 @@ +#include "greatest.h" + +#include "event_loop.h" +#include "example_task.h" +#include "common.h" +#include "state/object_table.h" +#include "state/redis.h" + +#include +#include +// #include + +SUITE(task_log_tests); + +event_loop *loop; + +/* ==== Test if operations time out correctly ==== */ + +/* === Test subscribe timeout === */ + +const char *subscribe_timeout_context = "subscribe_timeout"; +int subscribe_failed = 0; + +void subscribe_done_cb(task_iid task_iid, + void *user_context) { + /* The done callback should not be called. 
*/ + CHECK(0); +} + +void subscribe_fail_cb(unique_id id, void *user_data) { + subscribe_failed = 1; + CHECK(user_data == (void*) subscribe_timeout_context); + event_loop_stop(loop); +} + +TEST subscribe_timeout_test(void) { + loop = event_loop_create(); + db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", + 1234); + db_attach(db, loop); + task_log_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, 5, 100, + subscribe_done_cb, subscribe_fail_cb, (void*) subscribe_timeout_context); + /* Disconnect the database to see if the subscribe times out. */ + close(db->sub_context->c.fd); + aeProcessEvents(loop, AE_TIME_EVENTS); + event_loop_run(loop); + db_disconnect(db); + event_loop_destroy(loop); + CHECK(subscribe_failed); + PASS(); +} + +/* === Test publish timeout === */ + +const char *publish_timeout_context = "publish_timeout"; +const int publish_test_number = 272; +int publish_failed = 0; + +void publish_done_cb(task_iid task_iid, + void *user_context) { + /* The done callback should not be called. */ + CHECK(0); +} + +void publish_fail_cb(unique_id id, void *user_data) { + publish_failed = 1; + CHECK(user_data == (void*) publish_timeout_context); + event_loop_stop(loop); +} + +TEST publish_timeout_test(void) { + loop = event_loop_create(); + db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", + 1234); + db_attach(db, loop); + task_instance *task = example_task_instance(); + task_log_publish(db, task, 5, 100, + publish_done_cb, publish_fail_cb, (void*) publish_timeout_context); + /* Disconnect the database to see if the publish times out. 
*/ + close(db->context->c.fd); + aeProcessEvents(loop, AE_TIME_EVENTS); + event_loop_run(loop); + db_disconnect(db); + event_loop_destroy(loop); + CHECK(publish_failed); + task_instance_free(task); + PASS(); +} + +/* ==== Test if the retry is working correctly ==== */ + +int64_t reconnect_db_cb(event_loop *loop, int64_t timer_id, void *context) { + db_handle *db = context; + /* Reconnect to redis. */ + redisAsyncFree(db->sub_context); + db->sub_context = redisAsyncConnect("127.0.0.1", 6379); + db->sub_context->data = (void *) db; + /* Re-attach the database to the event loop (the file descriptor changed). */ + db_attach(db, loop); + return EVENT_LOOP_TIMER_DONE; +} + +int64_t terminate_event_loop_cb(event_loop *loop, + int64_t timer_id, + void *context) { + event_loop_stop(loop); + return EVENT_LOOP_TIMER_DONE; +} + +/* === Test subscribe retry === */ + +const char *subscribe_retry_context = "subscribe_retry"; +const int subscribe_retry_test_number = 273; +int subscribe_retry_succeeded = 0; + +void subscribe_retry_done_cb(object_id object_id, + void *user_context) { + CHECK(user_context == (void *) subscribe_retry_context); + subscribe_retry_succeeded = 1; +} + +void subscribe_retry_fail_cb(unique_id id, void *user_data) { + /* The fail callback should not be called. */ + CHECK(0); +} + +TEST subscribe_retry_test(void) { + loop = event_loop_create(); + db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", + 11235); + db_attach(db, loop); + task_log_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, 5, 100, + subscribe_retry_done_cb, subscribe_retry_fail_cb, (void*) subscribe_retry_context); + /* Disconnect the database to see if the subscribe times out. */ + close(db->sub_context->c.fd); + /* Install handler for reconnecting the database. */ + event_loop_add_timer(loop, 150, reconnect_db_cb, db); + /* Install handler for terminating the event loop. 
*/ + event_loop_add_timer(loop, 750, terminate_event_loop_cb, NULL); + event_loop_run(loop); + db_disconnect(db); + event_loop_destroy(loop); + CHECK(subscribe_retry_succeeded); + PASS(); +} + +/* === Test publish retry === */ + +const char *publish_retry_context = "publish_retry"; +int publish_retry_succeeded = 0; + +void publish_retry_done_cb(object_id object_id, + void *user_context) { + CHECK(user_context == (void *) publish_retry_context); + publish_retry_succeeded = 1; +} + +void publish_retry_fail_cb(unique_id id, void *user_data) { + /* The fail callback should not be called. */ + CHECK(0); +} + +TEST publish_retry_test(void) { + loop = event_loop_create(); + db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", + 11235); + db_attach(db, loop); + task_instance *task = example_task_instance(); + task_log_publish(db, task, 5, 100, + publish_retry_done_cb, publish_retry_fail_cb, (void*) publish_retry_context); + /* Disconnect the database to see if the publish times out. */ + close(db->sub_context->c.fd); + /* Install handler for reconnecting the database. */ + event_loop_add_timer(loop, 150, reconnect_db_cb, db); + /* Install handler for terminating the event loop. */ + event_loop_add_timer(loop, 750, terminate_event_loop_cb, NULL); + event_loop_run(loop); + db_disconnect(db); + event_loop_destroy(loop); + CHECK(publish_retry_succeeded); + task_instance_free(task); + PASS(); +} + +/* ==== Test if late succeed is working correctly ==== */ + +/* === Test subscribe late succeed === */ + +const char *subscribe_late_context = "subscribe_late"; +int subscribe_late_failed = 0; + +void subscribe_late_fail_cb(unique_id id, void *user_context) { + CHECK(user_context == (void *) subscribe_late_context); + subscribe_late_failed = 1; +} + +void subscribe_late_done_cb(task_iid task_iid, + void *user_context) { + /* This function should never be called. 
*/ + CHECK(0); +} + +TEST subscribe_late_test(void) { + loop = event_loop_create(); + db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", + 11236); + db_attach(db, loop); + task_log_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, 0, 0, + subscribe_late_done_cb, subscribe_late_fail_cb, (void*) subscribe_late_context); + /* Install handler for terminating the event loop. */ + event_loop_add_timer(loop, 750, terminate_event_loop_cb, NULL); + /* First process timer events to make sure the timeout is processed before + * anything else. */ + aeProcessEvents(loop, AE_TIME_EVENTS); + event_loop_run(loop); + db_disconnect(db); + event_loop_destroy(loop); + CHECK(subscribe_late_failed); + PASS(); +} + +/* === Test publish late succeed === */ + +const char* publish_late_context = "publish_late"; +int publish_late_failed = 0; + +void publish_late_fail_cb(unique_id id, void *user_context) { + CHECK(user_context == (void *) publish_late_context); + publish_late_failed = 1; +} + +void publish_late_done_cb(task_iid task_iik, + void *user_context) { + /* This function should never be called. */ + CHECK(0); +} + +TEST publish_late_test(void) { + loop = event_loop_create(); + db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", + 11236); + db_attach(db, loop); + task_instance *task = example_task_instance(); + task_log_publish(db, task, 0, 0, + publish_late_done_cb, publish_late_fail_cb, (void*) publish_late_context); + /* Install handler for terminating the event loop. */ + event_loop_add_timer(loop, 750, terminate_event_loop_cb, NULL); + /* First process timer events to make sure the timeout is processed before + * anything else. 
*/ + aeProcessEvents(loop, AE_TIME_EVENTS); + event_loop_run(loop); + db_disconnect(db); + event_loop_destroy(loop); + CHECK(publish_late_failed); + task_instance_free(task); + PASS(); +} + +SUITE(task_log_tests) { + RUN_TEST(subscribe_timeout_test); + RUN_TEST(publish_timeout_test); + RUN_TEST(subscribe_retry_test); + RUN_TEST(publish_retry_test); + RUN_TEST(subscribe_late_test); + RUN_TEST(publish_late_test); +} + +GREATEST_MAIN_DEFS(); + +int main(int argc, char **argv) { + GREATEST_MAIN_BEGIN(); + RUN_SUITE(task_log_tests); + GREATEST_MAIN_END(); +} From fc9d9fa17d1f5fc4889ca89307e9d455815db23d Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Fri, 28 Oct 2016 14:11:42 -0700 Subject: [PATCH 2/2] clang fixes --- state/object_table.c | 34 ++++++----- state/object_table.h | 37 ++++++------ state/redis.c | 93 +++++++++++++++-------------- state/redis.h | 30 ++++++---- test/db_tests.c | 32 +++++----- test/object_table_tests.c | 122 ++++++++++++++++++-------------------- test/task_log_tests.c | 69 +++++++++++---------- 7 files changed, 215 insertions(+), 202 deletions(-) diff --git a/state/object_table.c b/state/object_table.c index 45584ba..8545d01 100644 --- a/state/object_table.c +++ b/state/object_table.c @@ -14,7 +14,8 @@ void object_table_lookup(db_handle *db_handle, retry.timeout = timeout; retry.cb = redis_object_table_lookup; - init_table_callback(db_handle, object_id, NULL, &retry, done_cb, fail_cb, user_context); + init_table_callback(db_handle, object_id, NULL, &retry, done_cb, fail_cb, + user_context); } void object_table_add(db_handle *db_handle, @@ -24,28 +25,28 @@ void object_table_add(db_handle *db_handle, object_table_done_cb done_cb, table_fail_cb fail_cb, void *user_context) { - retry_struct retry; retry.count = retry_count; retry.timeout = timeout; retry.cb = redis_object_table_add; - init_table_callback(db_handle, object_id, NULL, &retry, done_cb, fail_cb, user_context); + init_table_callback(db_handle, object_id, NULL, &retry, done_cb, 
fail_cb, + user_context); } - -void object_table_subscribe(db_handle *db_handle, - object_id object_id, - object_table_object_available_cb object_available_cb, - void *subscribe_context, - int retry_count, - uint64_t timeout, - object_table_done_cb done_cb, - table_fail_cb fail_cb, - void *user_context) { - - object_table_subscribe_data *sub_data = malloc(sizeof(object_table_subscribe_data)); +void object_table_subscribe( + db_handle *db_handle, + object_id object_id, + object_table_object_available_cb object_available_cb, + void *subscribe_context, + int retry_count, + uint64_t timeout, + object_table_done_cb done_cb, + table_fail_cb fail_cb, + void *user_context) { + object_table_subscribe_data *sub_data = + malloc(sizeof(object_table_subscribe_data)); utarray_push_back(db_handle->callback_freelist, &sub_data); sub_data->object_available_cb = object_available_cb; sub_data->subscribe_context = subscribe_context; @@ -56,5 +57,6 @@ void object_table_subscribe(db_handle *db_handle, retry.timeout = timeout; retry.cb = redis_object_table_subscribe; - init_table_callback(db_handle, object_id, sub_data, &retry, done_cb, fail_cb, user_context); + init_table_callback(db_handle, object_id, sub_data, &retry, done_cb, fail_cb, + user_context); } diff --git a/state/object_table.h b/state/object_table.h index 33b54dc..a0cab0a 100644 --- a/state/object_table.h +++ b/state/object_table.h @@ -42,7 +42,7 @@ void object_table_lookup(db_handle *db_handle, * ==== Add object call and callback ==== */ - /* Callback called when the object add/remove operation completes. */ +/* Callback called when the object add/remove operation completes. */ typedef void (*object_table_done_cb)(object_id object_id, void *user_context); /** @@ -69,7 +69,6 @@ void object_table_add(db_handle *db_handle, * ==== Remove object call and callback ==== */ - /** * Object remove function. * @@ -99,15 +98,18 @@ void object_table_remove(db_handle *db, */ /* Callback called when object object_id is available. 
*/ -typedef void (*object_table_object_available_cb)(object_id object_id, void *user_context); +typedef void (*object_table_object_available_cb)(object_id object_id, + void *user_context); /** * Subcribing to new object available function. * * @param db_handle: Handle to db. * @param object_id: Object unique identifier. - * @param object_available_cb: callback to be called when new object becomes available - * @param subscribe_context: caller context which will be passed back in the object_available_cb + * @param object_available_cb: callback to be called when new object becomes + * available + * @param subscribe_context: caller context which will be passed back in the + * object_available_cb * @param retry_count: Number of retries after giving up. * @param timeout: Timeout after which we retry to install subscription. * @param done_cb: Callback to be called when subscription is installed. @@ -116,21 +118,22 @@ typedef void (*object_table_object_available_cb)(object_id object_id, void *user * @return Void. */ -void object_table_subscribe(db_handle *db, - object_id object_id, - object_table_object_available_cb object_available_cb, - void *subscribe_context, - int retry_count, - uint64_t timeout, - object_table_done_cb done_cb, - table_fail_cb fail_cb, - void *user_context); - -/* Data that is needed to register new object available callbacks with the state database. */ +void object_table_subscribe( + db_handle *db, + object_id object_id, + object_table_object_available_cb object_available_cb, + void *subscribe_context, + int retry_count, + uint64_t timeout, + object_table_done_cb done_cb, + table_fail_cb fail_cb, + void *user_context); + +/* Data that is needed to register new object available callbacks with the state + * database. 
*/ typedef struct { object_table_object_available_cb object_available_cb; void *subscribe_context; } object_table_subscribe_data; - #endif /* OBJECT_TABLE_H */ diff --git a/state/redis.c b/state/redis.c index 8d94951..a41c73b 100644 --- a/state/redis.c +++ b/state/redis.c @@ -42,9 +42,9 @@ } \ \ if (outstanding_callbacks_find(cb_data) == NULL) \ - /* the callback data structure has been - * already freed; just ignore this reply */ \ - return; + /* the callback data structure has been \ + * already freed; just ignore this reply */ \ + return; db_handle *db_connect(const char *address, int port, @@ -142,12 +142,14 @@ void redis_object_table_add(table_callback_data *cb_data) { CHECK(cb_data); if (outstanding_callbacks_find(cb_data) == NULL) - /* the callback data structure has been already freed; just ignore this reply */ + /* the callback data structure has been already freed; just ignore this + * reply */ return; db_handle *db = cb_data->db_handle; - redisAsyncCommand(db->context, redis_object_table_add_cb, cb_data, "SADD obj:%b %d", - &cb_data->id.id[0], UNIQUE_ID_SIZE, db->client_id); + redisAsyncCommand(db->context, redis_object_table_add_cb, cb_data, + "SADD obj:%b %d", &cb_data->id.id[0], UNIQUE_ID_SIZE, + db->client_id); if (db->context->err) { LOG_REDIS_ERR(db->context, "could not add object_table entry"); } @@ -178,8 +180,8 @@ void redis_get_cached_service(db_handle *db, int index, const char **manager) { HASH_FIND_INT(db->service_cache, &index, entry); if (!entry) { /* This is a very rare case. 
*/ - redisReply *reply = redisCommand(db->sync_context, "HGET %s %lld", - db->client_type, index); + redisReply *reply = + redisCommand(db->sync_context, "HGET %s %lld", db->client_type, index); CHECK(reply->type == REDIS_REPLY_STRING); entry = malloc(sizeof(service_cache_entry)); entry->service_id = index; @@ -206,7 +208,7 @@ void redis_object_table_get_entry(redisAsyncContext *c, managers[j] = atoi(reply->element[j]->str); redis_get_cached_service(db, managers[j], manager_vector + j); } - + object_table_lookup_done_cb done_cb = cb_data->done_cb; done_cb(cb_data->id, manager_count, manager_vector, cb_data->user_context); /* remove timer */ @@ -219,7 +221,6 @@ void redis_object_table_get_entry(redisAsyncContext *c, } } - void object_table_redis_callback(redisAsyncContext *c, void *r, void *privdata) { @@ -254,10 +255,12 @@ void redis_object_table_subscribe(table_callback_data *cb_data) { /* subscribe to key notification associated to object id */ redisAsyncCommand(db->sub_context, object_table_redis_callback, cb_data, - "SUBSCRIBE __keyspace@0__:%b add", (char *) &cb_data->id.id[0], UNIQUE_ID_SIZE); + "SUBSCRIBE __keyspace@0__:%b add", + (char *) &cb_data->id.id[0], UNIQUE_ID_SIZE); if (db->sub_context->err) { - LOG_REDIS_ERR(db->sub_context, "error in redis_object_table_subscribe_callback"); + LOG_REDIS_ERR(db->sub_context, + "error in redis_object_table_subscribe_callback"); } } @@ -265,42 +268,45 @@ void redis_object_table_subscribe(table_callback_data *cb_data) { * ==== task_log callbacks ==== */ - void redis_task_log_publish(table_callback_data *cb_data) { db_handle *db = cb_data->db_handle; task_instance *task_instance = cb_data->data; task_iid task_iid = *task_instance_id(task_instance); node_id node = *task_instance_node(task_instance); int32_t state = *task_instance_state(task_instance); - + LOG_DEBUG("Called log_publish callback"); - /* Check whether the vector (requests_info) indicating the status of the requests has been allocated. 
- * If was not allocate it, allocate it and initialize it. - * This vector has an entry for each redis command, and it stores true if a reply for that command - * has been received, and false otherwise. - * The first entry in the callback corresponds to RPUSH, and the second entry to PUBLISH. - */ +/* Check whether the vector (requests_info) indicating the status of the + * requests has been allocated. + * If was not allocate it, allocate it and initialize it. + * This vector has an entry for each redis command, and it stores true if a + * reply for that command + * has been received, and false otherwise. + * The first entry in the callback corresponds to RPUSH, and the second entry to + * PUBLISH. + */ #define NUM_DB_REQUESTS 2 -#define PUSH_INDEX 0 -#define PUBLISH_INDEX 1 +#define PUSH_INDEX 0 +#define PUBLISH_INDEX 1 if (cb_data->requests_info == NULL) { cb_data->requests_info = malloc(NUM_DB_REQUESTS * sizeof(bool)); for (int i = 0; i < NUM_DB_REQUESTS; i++) { - ((bool *)cb_data->requests_info)[i] = false; + ((bool *) cb_data->requests_info)[i] = false; } } - if (((bool *)cb_data->requests_info)[PUSH_INDEX] == false) { + if (((bool *) cb_data->requests_info)[PUSH_INDEX] == false) { if (*task_instance_state(task_instance) == TASK_STATUS_WAITING) { - redisAsyncCommand(db->context, redis_task_log_publish_push_cb, cb_data, "RPUSH tasklog:%b %b", - (char *) &task_iid.id[0], UNIQUE_ID_SIZE, - (char *) task_instance, task_instance_size(task_instance)); + redisAsyncCommand(db->context, redis_task_log_publish_push_cb, cb_data, + "RPUSH tasklog:%b %b", (char *) &task_iid.id[0], + UNIQUE_ID_SIZE, (char *) task_instance, + task_instance_size(task_instance)); } else { task_update update = {.state = state, .node = node}; - redisAsyncCommand(db->context, redis_task_log_publish_push_cb, cb_data, "RPUSH tasklog:%b %b", - (char *) &task_iid.id[0], UNIQUE_ID_SIZE, - (char *) &update, sizeof(update)); + redisAsyncCommand(db->context, redis_task_log_publish_push_cb, cb_data, + 
"RPUSH tasklog:%b %b", (char *) &task_iid.id[0], + UNIQUE_ID_SIZE, (char *) &update, sizeof(update)); } if (db->context->err) { @@ -308,10 +314,11 @@ void redis_task_log_publish(table_callback_data *cb_data) { } } - if (((bool *)cb_data->requests_info)[PUBLISH_INDEX] == false) { - redisAsyncCommand(db->context, redis_task_log_publish_publish_cb, cb_data, "PUBLISH task_log:%b:%d %b", - (char *) &node.id[0], UNIQUE_ID_SIZE, state, - (char *) task_instance, task_instance_size(task_instance)); + if (((bool *) cb_data->requests_info)[PUBLISH_INDEX] == false) { + redisAsyncCommand(db->context, redis_task_log_publish_publish_cb, cb_data, + "PUBLISH task_log:%b:%d %b", (char *) &node.id[0], + UNIQUE_ID_SIZE, state, (char *) task_instance, + task_instance_size(task_instance)); if (db->context->err) { LOG_REDIS_ERR(db->context, "error publishing task in task_log_add_task"); @@ -325,9 +332,9 @@ void redis_task_log_publish_push_cb(redisAsyncContext *c, REDIS_CALLBACK_HEADER(db, cb_data, r) CHECK(cb_data->requests_info != NULL); - ((bool *)cb_data->requests_info)[PUSH_INDEX] = true; + ((bool *) cb_data->requests_info)[PUSH_INDEX] = true; - if (((bool *)cb_data->requests_info)[PUBLISH_INDEX] == true) { + if (((bool *) cb_data->requests_info)[PUBLISH_INDEX] == true) { if (cb_data->done_cb) { task_log_done_cb done_cb = cb_data->done_cb; done_cb(cb_data->id, cb_data->user_context); @@ -336,13 +343,15 @@ void redis_task_log_publish_push_cb(redisAsyncContext *c, } } -void redis_task_log_publish_publish_cb(redisAsyncContext *c, void *r, void *privdata) { +void redis_task_log_publish_publish_cb(redisAsyncContext *c, + void *r, + void *privdata) { REDIS_CALLBACK_HEADER(db, cb_data, r) CHECK(cb_data->requests_info != NULL); - ((bool *)cb_data->requests_info)[PUBLISH_INDEX] = true; + ((bool *) cb_data->requests_info)[PUBLISH_INDEX] = true; - if (((bool *)cb_data->requests_info)[PUSH_INDEX] == true) { + if (((bool *) cb_data->requests_info)[PUSH_INDEX] == true) { if (cb_data->done_cb) { 
task_log_done_cb done_cb = cb_data->done_cb; done_cb(cb_data->id, cb_data->user_context); @@ -351,10 +360,7 @@ void redis_task_log_publish_publish_cb(redisAsyncContext *c, void *r, void *priv } } - -void task_log_redis_callback(redisAsyncContext *c, - void *r, - void *privdata) { +void task_log_redis_callback(redisAsyncContext *c, void *r, void *privdata) { REDIS_CALLBACK_HEADER(db, cb_data, r) redisReply *reply = r; @@ -383,7 +389,6 @@ void task_log_redis_callback(redisAsyncContext *c, task_instance_free(instance); } - void redis_task_log_subscribe(table_callback_data *cb_data) { db_handle *db = cb_data->db_handle; task_log_subscribe_data *data = cb_data->data; diff --git a/state/redis.h b/state/redis.h index 4815134..0867394 100644 --- a/state/redis.h +++ b/state/redis.h @@ -43,7 +43,9 @@ struct db_handle { UT_array *callback_freelist; }; -void redis_object_table_get_entry(redisAsyncContext *c, void *r, void *privdata); +void redis_object_table_get_entry(redisAsyncContext *c, + void *r, + void *privdata); void object_table_lookup_callback(redisAsyncContext *c, void *r, @@ -55,45 +57,49 @@ void object_table_lookup_callback(redisAsyncContext *c, /** * Lookup object table entry in redis. - * @param cb_data Data structure containing redis connection and timeout information. + * @param cb_data Data structure containing redis connection and timeout + * information. * @return Void. */ void redis_object_table_lookup(table_callback_data *cb_data); /** * Add an entry to the object table in redis. - * @param cb_data Data structure containing redis connection and timeout information. + * @param cb_data Data structure containing redis connection and timeout + * information. * @return Void. */ void redis_object_table_add(table_callback_data *cb_data); /** * Subscribe to learn when a new object becomes available. - * @param cb_data Data structure containing redis connection and timeout information. 
+ * @param cb_data Data structure containing redis connection and timeout + * information. * @return Void. */ void redis_object_table_subscribe(table_callback_data *cb_data); - /* * ==== Redis task table function ===== */ /** * Add or update task log entry with new scheduling information. - * @param cb_data Data structure containing redis connection and timeout information. + * @param cb_data Data structure containing redis connection and timeout + * information. * @return Void. */ void redis_task_log_publish(table_callback_data *cb_data); - /** * Callback invoked when the replya from the task push command is received. * @param c Redis context. * @param r Reply (not used). * @param privdata Data associated to the callback. */ -void redis_task_log_publish_push_cb(redisAsyncContext *c, void *r, void *privdata); +void redis_task_log_publish_push_cb(redisAsyncContext *c, + void *r, + void *privdata); /** * Callback invoked when the replya from the task publish command is received. @@ -101,12 +107,14 @@ void redis_task_log_publish_push_cb(redisAsyncContext *c, void *r, void *privdat * @param r Reply (not used). * @param privdata Data associated to the callback. */ -void redis_task_log_publish_publish_cb(redisAsyncContext *c, void *r, void *privdata); - +void redis_task_log_publish_publish_cb(redisAsyncContext *c, + void *r, + void *privdata); /** * Subscribe to updates of the task log. - * @param cb_data Data structure containing redis connection and timeout information. + * @param cb_data Data structure containing redis connection and timeout + * information. * @return Void. 
*/ void redis_task_log_subscribe(table_callback_data *cb_data); diff --git a/test/db_tests.c b/test/db_tests.c index b59025a..8d1c3f2 100644 --- a/test/db_tests.c +++ b/test/db_tests.c @@ -26,9 +26,7 @@ char received_port1[6] = {0}; char received_addr2[16] = {0}; char received_port2[6] = {0}; -typedef struct { - int test_number; -} user_context; +typedef struct { int test_number; } user_context; const int TEST_NUMBER = 10; @@ -53,14 +51,12 @@ void lookup_done_cb(object_id object_id, } /* Entry added to database successfully. */ -void add_done_cb(object_id object_id, void* user_context) { - +void add_done_cb(object_id object_id, void *user_context) { } /* Test if we got a timeout callback if we couldn't connect database. */ -void timeout_cb(object_id object_id, - void *context) { - user_context *uc = (user_context *)context; +void timeout_cb(object_id object_id, void *context) { + user_context *uc = (user_context *) context; CHECK(uc->test_number == TEST_NUMBER) } @@ -78,13 +74,16 @@ TEST object_table_lookup_test(void) { db_attach(db1, loop); db_attach(db2, loop); unique_id id = globally_unique_id(); - object_table_add(db1, id, NUM_RETRIES, TIMEOUT, add_done_cb, timeout_cb, NULL); - object_table_add(db2, id, NUM_RETRIES, TIMEOUT, add_done_cb, timeout_cb, NULL); + object_table_add(db1, id, NUM_RETRIES, TIMEOUT, add_done_cb, timeout_cb, + NULL); + object_table_add(db2, id, NUM_RETRIES, TIMEOUT, add_done_cb, timeout_cb, + NULL); event_loop_add_timer(loop, 200, timeout_handler, NULL); event_loop_run(loop); user_context user_context; user_context.test_number = TEST_NUMBER; - object_table_lookup(db1, id, NUM_RETRIES, TIMEOUT, lookup_done_cb, timeout_cb, NULL); + object_table_lookup(db1, id, NUM_RETRIES, TIMEOUT, lookup_done_cb, timeout_cb, + NULL); event_loop_add_timer(loop, 200, timeout_handler, NULL); event_loop_run(loop); int port1 = atoi(received_port1); @@ -113,9 +112,10 @@ TEST task_log_test(void) { db_attach(db, loop); node_id node = globally_unique_id(); task_spec 
*task = example_task(); - task_instance *instance = - make_task_instance(globally_unique_id(), task, TASK_STATUS_SCHEDULED, node); - task_log_subscribe(db, node, TASK_STATUS_SCHEDULED, task_log_test_callback, instance, NUM_RETRIES, TIMEOUT, NULL, NULL, NULL); + task_instance *instance = make_task_instance(globally_unique_id(), task, + TASK_STATUS_SCHEDULED, node); + task_log_subscribe(db, node, TASK_STATUS_SCHEDULED, task_log_test_callback, + instance, NUM_RETRIES, TIMEOUT, NULL, NULL, NULL); task_log_publish(db, instance, NUM_RETRIES, TIMEOUT, NULL, NULL, NULL); event_loop_add_timer(loop, 200, timeout_handler, NULL); event_loop_run(loop); @@ -142,7 +142,9 @@ TEST task_log_all_test(void) { globally_unique_id(), task, TASK_STATUS_SCHEDULED, globally_unique_id()); task_instance *instance2 = make_task_instance( globally_unique_id(), task, TASK_STATUS_SCHEDULED, globally_unique_id()); - task_log_subscribe(db, NIL_ID, TASK_STATUS_SCHEDULED, task_log_all_test_callback, NULL, NUM_RETRIES, TIMEOUT, NULL, NULL, NULL); + task_log_subscribe(db, NIL_ID, TASK_STATUS_SCHEDULED, + task_log_all_test_callback, NULL, NUM_RETRIES, TIMEOUT, + NULL, NULL, NULL); event_loop_add_timer(loop, 50, timeout_handler, NULL); event_loop_run(loop); /* TODO(pcm): Get rid of this sleep once the robust pubsub is implemented. 
*/ diff --git a/test/object_table_tests.c b/test/object_table_tests.c index bd025d0..5712d9a 100644 --- a/test/object_table_tests.c +++ b/test/object_table_tests.c @@ -29,17 +29,17 @@ void lookup_done_cb(object_id object_id, void lookup_fail_cb(unique_id id, void *user_data) { lookup_failed = 1; - CHECK(user_data == (void*) lookup_timeout_context); + CHECK(user_data == (void *) lookup_timeout_context); event_loop_stop(g_loop); } TEST lookup_timeout_test(void) { g_loop = event_loop_create(); - db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", - 1234); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234); db_attach(db, g_loop); object_table_lookup(db, NIL_ID, 5, 100, lookup_done_cb, lookup_fail_cb, - (void*) lookup_timeout_context); + (void *) lookup_timeout_context); /* Disconnect the database to see if the lookup times out. */ close(db->context->c.fd); event_loop_run(g_loop); @@ -54,24 +54,24 @@ TEST lookup_timeout_test(void) { const char *add_timeout_context = "add_timeout"; int add_failed = 0; -void add_done_cb(object_id object_id, - void *user_context) { +void add_done_cb(object_id object_id, void *user_context) { /* The done callback should not be called. */ CHECK(0); } void add_fail_cb(unique_id id, void *user_data) { add_failed = 1; - CHECK(user_data == (void*) add_timeout_context); + CHECK(user_data == (void *) add_timeout_context); event_loop_stop(g_loop); } TEST add_timeout_test(void) { g_loop = event_loop_create(); - db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", - 1234); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234); db_attach(db, g_loop); - object_table_add(db, NIL_ID, 5, 100, add_done_cb, add_fail_cb, (void*) add_timeout_context); + object_table_add(db, NIL_ID, 5, 100, add_done_cb, add_fail_cb, + (void *) add_timeout_context); /* Disconnect the database to see if the lookup times out. 
*/ close(db->context->c.fd); event_loop_run(g_loop); @@ -81,30 +81,29 @@ TEST add_timeout_test(void) { PASS(); } - /* === Test subscribe timeout === */ const char *subscribe_timeout_context = "subscribe_timeout"; int subscribe_failed = 0; -void subscribe_done_cb(object_id object_id, - void *user_context) { +void subscribe_done_cb(object_id object_id, void *user_context) { /* The done callback should not be called. */ CHECK(0); } void subscribe_fail_cb(unique_id id, void *user_data) { subscribe_failed = 1; - CHECK(user_data == (void*) subscribe_timeout_context); + CHECK(user_data == (void *) subscribe_timeout_context); event_loop_stop(g_loop); } TEST subscribe_timeout_test(void) { g_loop = event_loop_create(); - db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", - 1234); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234); db_attach(db, g_loop); - object_table_subscribe(db, NIL_ID, NULL, NULL, 5, 100, subscribe_done_cb, subscribe_fail_cb, (void *) subscribe_timeout_context); + object_table_subscribe(db, NIL_ID, NULL, NULL, 5, 100, subscribe_done_cb, + subscribe_fail_cb, (void *) subscribe_timeout_context); /* Disconnect the database to see if the lookup times out. */ close(db->sub_context->c.fd); event_loop_run(g_loop); @@ -114,10 +113,11 @@ TEST subscribe_timeout_test(void) { PASS(); } - /* ==== Test if the retry is working correctly ==== */ -int64_t reconnect_context_cb(event_loop *loop, int64_t timer_id, void *context) { +int64_t reconnect_context_cb(event_loop *loop, + int64_t timer_id, + void *context) { db_handle *db = context; /* Reconnect to redis. This is not reconnecting the pub/sub channel. 
*/ redisAsyncFree(db->context); @@ -158,11 +158,11 @@ void lookup_retry_fail_cb(unique_id id, void *user_data) { TEST lookup_retry_test(void) { g_loop = event_loop_create(); - db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", - 11235); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11235); db_attach(db, g_loop); object_table_lookup(db, NIL_ID, 5, 100, lookup_retry_done_cb, - lookup_retry_fail_cb, (void*) lookup_retry_context); + lookup_retry_fail_cb, (void *) lookup_retry_context); /* Disconnect the database to let the lookup time out the first time. */ close(db->context->c.fd); /* Install handler for reconnecting the database. */ @@ -181,8 +181,7 @@ TEST lookup_retry_test(void) { const char *add_retry_context = "add_retry"; int add_retry_succeeded = 0; -void add_retry_done_cb(object_id object_id, - void *user_context) { +void add_retry_done_cb(object_id object_id, void *user_context) { CHECK(user_context == (void *) add_retry_context); add_retry_succeeded = 1; } @@ -194,11 +193,11 @@ void add_retry_fail_cb(unique_id id, void *user_data) { TEST add_retry_test(void) { g_loop = event_loop_create(); - db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", - 11235); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11235); db_attach(db, g_loop); - object_table_add(db, NIL_ID, 5, 100, add_retry_done_cb, - add_retry_fail_cb, (void*) add_retry_context); + object_table_add(db, NIL_ID, 5, 100, add_retry_done_cb, add_retry_fail_cb, + (void *) add_retry_context); /* Disconnect the database to let the add time out the first time. */ close(db->context->c.fd); /* Install handler for reconnecting the database. 
*/ @@ -217,7 +216,9 @@ TEST add_retry_test(void) { const char *subscribe_retry_context = "subscribe_retry"; int subscribe_retry_succeeded = 0; -int64_t reconnect_sub_context_cb(event_loop *loop, int64_t timer_id, void *context) { +int64_t reconnect_sub_context_cb(event_loop *loop, + int64_t timer_id, + void *context) { db_handle *db = context; /* Reconnect to redis. This is not reconnecting the pub/sub channel. */ redisAsyncFree(db->sub_context); @@ -230,8 +231,7 @@ int64_t reconnect_sub_context_cb(event_loop *loop, int64_t timer_id, void *conte return EVENT_LOOP_TIMER_DONE; } -void subscribe_retry_done_cb(object_id object_id, - void *user_context) { +void subscribe_retry_done_cb(object_id object_id, void *user_context) { CHECK(user_context == (void *) subscribe_retry_context); subscribe_retry_succeeded = 1; } @@ -243,11 +243,12 @@ void subscribe_retry_fail_cb(unique_id id, void *user_data) { TEST subscribe_retry_test(void) { g_loop = event_loop_create(); - db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", - 11235); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11235); db_attach(db, g_loop); - object_table_subscribe(db, NIL_ID, NULL, NULL, 5, 100, subscribe_retry_done_cb, - subscribe_retry_fail_cb, (void*) subscribe_retry_context); + object_table_subscribe(db, NIL_ID, NULL, NULL, 5, 100, + subscribe_retry_done_cb, subscribe_retry_fail_cb, + (void *) subscribe_retry_context); /* Disconnect the database to let the subscribe times out the first time. */ close(db->sub_context->c.fd); /* Install handler for reconnecting the database. 
*/ @@ -265,7 +266,7 @@ TEST subscribe_retry_test(void) { /* === Test lookup late succeed === */ -const char* lookup_late_context = "lookup_late"; +const char *lookup_late_context = "lookup_late"; int lookup_late_failed = 0; void lookup_late_fail_cb(unique_id id, void *user_context) { @@ -283,11 +284,11 @@ void lookup_late_done_cb(object_id object_id, TEST lookup_late_test(void) { g_loop = event_loop_create(); - db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", - 11236); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); db_attach(db, g_loop); object_table_lookup(db, NIL_ID, 0, 0, lookup_late_done_cb, - lookup_late_fail_cb, (void*) lookup_late_context); + lookup_late_fail_cb, (void *) lookup_late_context); /* Install handler for terminating the event loop. */ event_loop_add_timer(g_loop, 750, terminate_event_loop_cb, NULL); /* First process timer events to make sure the timeout is processed before @@ -310,19 +311,18 @@ void add_late_fail_cb(unique_id id, void *user_context) { add_late_failed = 1; } -void add_late_done_cb(object_id object_id, - void *user_context) { +void add_late_done_cb(object_id object_id, void *user_context) { /* This function should never be called. */ CHECK(0); } TEST add_late_test(void) { g_loop = event_loop_create(); - db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", - 11236); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); db_attach(db, g_loop); - object_table_add(db, NIL_ID, 0, 0, add_late_done_cb, - add_late_fail_cb, (void*) add_late_context); + object_table_add(db, NIL_ID, 0, 0, add_late_done_cb, add_late_fail_cb, + (void *) add_late_context); /* Install handler for terminating the event loop. 
*/ event_loop_add_timer(g_loop, 750, terminate_event_loop_cb, NULL); /* First process timer events to make sure the timeout is processed before @@ -345,19 +345,19 @@ void subscribe_late_fail_cb(unique_id id, void *user_context) { subscribe_late_failed = 1; } -void subscribe_late_done_cb(object_id object_id, - void *user_context) { +void subscribe_late_done_cb(object_id object_id, void *user_context) { /* This function should never be called. */ CHECK(0); } TEST subscribe_late_test(void) { g_loop = event_loop_create(); - db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", - 11236); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); db_attach(db, g_loop); object_table_subscribe(db, NIL_ID, NULL, NULL, 0, 0, subscribe_late_done_cb, - subscribe_late_fail_cb, (void*) subscribe_late_context); + subscribe_late_fail_cb, + (void *) subscribe_late_context); /* Install handler for terminating the event loop. */ event_loop_add_timer(g_loop, 750, terminate_event_loop_cb, NULL); /* First process timer events to make sure the timeout is processed before @@ -370,10 +370,9 @@ TEST subscribe_late_test(void) { PASS(); } - /* === Test subscribe object available succeed === */ -const char* subscribe_success_context = "subscribe_success"; +const char *subscribe_success_context = "subscribe_success"; int subscribe_success_done = 0; int subscribe_success_succeeded = 0; @@ -383,32 +382,29 @@ void subscribe_success_fail_cb(unique_id id, void *user_context) { } void subscribe_success_done_cb(object_id object_id, void *user_context) { - object_table_add((db_handle *)user_context, object_id, 0, 0, NULL, NULL, NULL); + object_table_add((db_handle *) user_context, object_id, 0, 0, NULL, NULL, + NULL); subscribe_success_done = 1; } void subscribe_success_object_available_cb(object_id object_id, void *user_context) { - CHECK(user_context == (void*) subscribe_success_context); + CHECK(user_context == (void *) subscribe_success_context); 
subscribe_success_succeeded = 1; } - TEST subscribe_success_test(void) { g_loop = event_loop_create(); - db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", - 11236); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); db_attach(db, g_loop); unique_id id = globally_unique_id(); - object_table_subscribe(db, id, - subscribe_success_object_available_cb, - (void*) subscribe_success_context, 0, 100, - subscribe_success_done_cb, - subscribe_success_fail_cb, + object_table_subscribe(db, id, subscribe_success_object_available_cb, + (void *) subscribe_success_context, 0, 100, + subscribe_success_done_cb, subscribe_success_fail_cb, (void *) db); - /* Install handler for terminating the event loop. */ event_loop_add_timer(g_loop, 750, terminate_event_loop_cb, NULL); diff --git a/test/task_log_tests.c b/test/task_log_tests.c index 04ca853..a6bc8ee 100644 --- a/test/task_log_tests.c +++ b/test/task_log_tests.c @@ -21,25 +21,25 @@ event_loop *loop; const char *subscribe_timeout_context = "subscribe_timeout"; int subscribe_failed = 0; -void subscribe_done_cb(task_iid task_iid, - void *user_context) { +void subscribe_done_cb(task_iid task_iid, void *user_context) { /* The done callback should not be called. 
*/ CHECK(0); } void subscribe_fail_cb(unique_id id, void *user_data) { subscribe_failed = 1; - CHECK(user_data == (void*) subscribe_timeout_context); + CHECK(user_data == (void *) subscribe_timeout_context); event_loop_stop(loop); } TEST subscribe_timeout_test(void) { loop = event_loop_create(); - db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", - 1234); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234); db_attach(db, loop); task_log_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, 5, 100, - subscribe_done_cb, subscribe_fail_cb, (void*) subscribe_timeout_context); + subscribe_done_cb, subscribe_fail_cb, + (void *) subscribe_timeout_context); /* Disconnect the database to see if the subscribe times out. */ close(db->sub_context->c.fd); aeProcessEvents(loop, AE_TIME_EVENTS); @@ -56,26 +56,25 @@ const char *publish_timeout_context = "publish_timeout"; const int publish_test_number = 272; int publish_failed = 0; -void publish_done_cb(task_iid task_iid, - void *user_context) { +void publish_done_cb(task_iid task_iid, void *user_context) { /* The done callback should not be called. */ CHECK(0); } void publish_fail_cb(unique_id id, void *user_data) { publish_failed = 1; - CHECK(user_data == (void*) publish_timeout_context); + CHECK(user_data == (void *) publish_timeout_context); event_loop_stop(loop); } TEST publish_timeout_test(void) { loop = event_loop_create(); - db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", - 1234); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234); db_attach(db, loop); task_instance *task = example_task_instance(); - task_log_publish(db, task, 5, 100, - publish_done_cb, publish_fail_cb, (void*) publish_timeout_context); + task_log_publish(db, task, 5, 100, publish_done_cb, publish_fail_cb, + (void *) publish_timeout_context); /* Disconnect the database to see if the publish times out. 
*/ close(db->context->c.fd); aeProcessEvents(loop, AE_TIME_EVENTS); @@ -113,8 +112,7 @@ const char *subscribe_retry_context = "subscribe_retry"; const int subscribe_retry_test_number = 273; int subscribe_retry_succeeded = 0; -void subscribe_retry_done_cb(object_id object_id, - void *user_context) { +void subscribe_retry_done_cb(object_id object_id, void *user_context) { CHECK(user_context == (void *) subscribe_retry_context); subscribe_retry_succeeded = 1; } @@ -126,11 +124,12 @@ void subscribe_retry_fail_cb(unique_id id, void *user_data) { TEST subscribe_retry_test(void) { loop = event_loop_create(); - db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", - 11235); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11235); db_attach(db, loop); task_log_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, 5, 100, - subscribe_retry_done_cb, subscribe_retry_fail_cb, (void*) subscribe_retry_context); + subscribe_retry_done_cb, subscribe_retry_fail_cb, + (void *) subscribe_retry_context); /* Disconnect the database to see if the subscribe times out. */ close(db->sub_context->c.fd); /* Install handler for reconnecting the database. 
*/ @@ -149,8 +148,7 @@ TEST subscribe_retry_test(void) { const char *publish_retry_context = "publish_retry"; int publish_retry_succeeded = 0; -void publish_retry_done_cb(object_id object_id, - void *user_context) { +void publish_retry_done_cb(object_id object_id, void *user_context) { CHECK(user_context == (void *) publish_retry_context); publish_retry_succeeded = 1; } @@ -162,12 +160,12 @@ void publish_retry_fail_cb(unique_id id, void *user_data) { TEST publish_retry_test(void) { loop = event_loop_create(); - db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", - 11235); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11235); db_attach(db, loop); task_instance *task = example_task_instance(); - task_log_publish(db, task, 5, 100, - publish_retry_done_cb, publish_retry_fail_cb, (void*) publish_retry_context); + task_log_publish(db, task, 5, 100, publish_retry_done_cb, + publish_retry_fail_cb, (void *) publish_retry_context); /* Disconnect the database to see if the publish times out. */ close(db->sub_context->c.fd); /* Install handler for reconnecting the database. */ @@ -194,19 +192,19 @@ void subscribe_late_fail_cb(unique_id id, void *user_context) { subscribe_late_failed = 1; } -void subscribe_late_done_cb(task_iid task_iid, - void *user_context) { +void subscribe_late_done_cb(task_iid task_iid, void *user_context) { /* This function should never be called. 
*/ CHECK(0); } TEST subscribe_late_test(void) { loop = event_loop_create(); - db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", - 11236); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); db_attach(db, loop); task_log_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, 0, 0, - subscribe_late_done_cb, subscribe_late_fail_cb, (void*) subscribe_late_context); + subscribe_late_done_cb, subscribe_late_fail_cb, + (void *) subscribe_late_context); /* Install handler for terminating the event loop. */ event_loop_add_timer(loop, 750, terminate_event_loop_cb, NULL); /* First process timer events to make sure the timeout is processed before @@ -221,7 +219,7 @@ TEST subscribe_late_test(void) { /* === Test publish late succeed === */ -const char* publish_late_context = "publish_late"; +const char *publish_late_context = "publish_late"; int publish_late_failed = 0; void publish_late_fail_cb(unique_id id, void *user_context) { @@ -229,20 +227,19 @@ void publish_late_fail_cb(unique_id id, void *user_context) { publish_late_failed = 1; } -void publish_late_done_cb(task_iid task_iik, - void *user_context) { +void publish_late_done_cb(task_iid task_iik, void *user_context) { /* This function should never be called. */ CHECK(0); } TEST publish_late_test(void) { loop = event_loop_create(); - db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", - 11236); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); db_attach(db, loop); task_instance *task = example_task_instance(); - task_log_publish(db, task, 0, 0, - publish_late_done_cb, publish_late_fail_cb, (void*) publish_late_context); + task_log_publish(db, task, 0, 0, publish_late_done_cb, publish_late_fail_cb, + (void *) publish_late_context); /* Install handler for terminating the event loop. 
*/ event_loop_add_timer(loop, 750, terminate_event_loop_cb, NULL); /* First process timer events to make sure the timeout is processed before