diff --git a/Makefile b/Makefile index 6982b79..4cd48ec 100644 --- a/Makefile +++ b/Makefile @@ -1,10 +1,10 @@ CC = gcc -CFLAGS = -g -Wall --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L -fPIC -I. -Ithirdparty -Ithirdparty/ae -Wno-typedef-redefinition -Werror +CFLAGS = -g -Wall -Wno-typedef-redefinition --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L -fPIC -I. -Ithirdparty -Ithirdparty/ae BUILD = build all: hiredis $(BUILD)/libcommon.a -$(BUILD)/libcommon.a: event_loop.o common.o task.o io.o state/redis.o thirdparty/ae/ae.o +$(BUILD)/libcommon.a: event_loop.o common.o task.o io.o state/redis.o state/table.o state/object_table.o state/task_log.o thirdparty/ae/ae.o ar rcs $@ $^ $(BUILD)/common_tests: test/common_tests.c $(BUILD)/libcommon.a @@ -13,6 +13,12 @@ $(BUILD)/common_tests: test/common_tests.c $(BUILD)/libcommon.a $(BUILD)/db_tests: hiredis test/db_tests.c $(BUILD)/libcommon.a $(CC) -o $@ test/db_tests.c $(BUILD)/libcommon.a thirdparty/hiredis/libhiredis.a $(CFLAGS) +$(BUILD)/object_table_tests: hiredis test/object_table_tests.c $(BUILD)/libcommon.a + $(CC) -o $@ test/object_table_tests.c $(BUILD)/libcommon.a thirdparty/hiredis/libhiredis.a $(CFLAGS) + +$(BUILD)/task_log_tests: hiredis test/task_log_tests.c $(BUILD)/libcommon.a + $(CC) -o $@ test/task_log_tests.c $(BUILD)/libcommon.a thirdparty/hiredis/libhiredis.a $(CFLAGS) + $(BUILD)/io_tests: test/io_tests.c $(BUILD)/libcommon.a $(CC) -o $@ $^ $(CFLAGS) @@ -32,9 +38,9 @@ redis: hiredis: git submodule update --init --recursive -- "thirdparty/hiredis" ; cd thirdparty/hiredis ; make -test: hiredis redis $(BUILD)/common_tests $(BUILD)/db_tests $(BUILD)/io_tests $(BUILD)/task_tests $(BUILD)/redis_tests FORCE +test: hiredis redis $(BUILD)/common_tests $(BUILD)/task_log_tests $(BUILD)/object_table_tests $(BUILD)/db_tests $(BUILD)/io_tests $(BUILD)/task_tests $(BUILD)/redis_tests FORCE ./thirdparty/redis-3.2.3/src/redis-server & - sleep 1s ; ./build/common_tests ; ./build/db_tests ; 
./build/io_tests ; ./build/task_tests ; ./build/redis_tests + sleep 1s ; ./build/common_tests ; ./build/db_tests ; ./build/task_log_tests ; ./build/object_table_tests ; ./build/io_tests ; ./build/task_tests ; ./build/redis_tests valgrind: test valgrind --leak-check=full --error-exitcode=1 ./build/common_tests diff --git a/common.h b/common.h index f09e91c..f238870 100644 --- a/common.h +++ b/common.h @@ -5,6 +5,9 @@ #include #include #include +#include + +#define RAY_COMMON_DEBUG #ifndef RAY_COMMON_DEBUG #define LOG_DEBUG(M, ...) @@ -36,6 +39,10 @@ } \ } while (0); +/** This macro indicates that this pointer owns the data it is pointing to + * and is responsible for freeing it. */ +#define OWNER + #define UNIQUE_ID_SIZE 20 /* Cleanup method for running tests with the greatest library. diff --git a/event_loop.c b/event_loop.c index 7e78354..d6980cd 100644 --- a/event_loop.c +++ b/event_loop.c @@ -42,15 +42,14 @@ void event_loop_remove_file(event_loop *loop, int fd) { } int64_t event_loop_add_timer(event_loop *loop, - int64_t milliseconds, + int64_t timeout, event_loop_timer_handler handler, void *context) { - return aeCreateTimeEvent(loop, milliseconds, handler, context, NULL); + return aeCreateTimeEvent(loop, timeout, handler, context, NULL); } -void event_loop_remove_timer(event_loop *loop, timer_id timer_id) { - int err = aeDeleteTimeEvent(loop, timer_id); - CHECK(err == AE_OK); /* timer id found? */ +int event_loop_remove_timer(event_loop *loop, int64_t id) { + return aeDeleteTimeEvent(loop, id); } void event_loop_run(event_loop *loop) { diff --git a/event_loop.h b/event_loop.h index 7f0659b..296fd98 100644 --- a/event_loop.h +++ b/event_loop.h @@ -57,16 +57,29 @@ void event_loop_add_file(event_loop *loop, /* Remove a registered file event handler from the event loop. */ void event_loop_remove_file(event_loop *loop, int fd); -/* Register a handler that will be called after a time slice of - * "milliseconds" milliseconds. 
Can specify a context that will be passed - * as an argument to the handler. Return the id of the time event. */ +/** Register a handler that will be called after a time slice of + * "timeout" milliseconds. + * + * @param loop The event loop. + * @param timeout The timeout in milliseconds. + * @param handler The handler for the timeout. + * @param context User context that can be passed in and will be passed in + * as an argument for the timer handler. + * @return The ID of the timer. + */ int64_t event_loop_add_timer(event_loop *loop, - int64_t milliseconds, + int64_t timeout, event_loop_timer_handler handler, void *context); -/* Remove a registered time event handler from the event loop. */ -void event_loop_remove_timer(event_loop *loop, timer_id timer_id); +/** + * Remove a registered time event handler from the event loop. + * + * @param loop The event loop. + * @param timer_id The ID of the timer to be removed. + * @return Returns 0 if the removal was successful. + */ +int event_loop_remove_timer(event_loop *loop, int64_t timer_id); /* Run the event loop. */ void event_loop_run(event_loop *loop); diff --git a/state/db.h b/state/db.h index 50536e2..96ecf1e 100644 --- a/state/db.h +++ b/state/db.h @@ -3,7 +3,7 @@ #include "event_loop.h" -typedef struct db_handle_impl db_handle; +typedef struct db_handle db_handle; /* Connect to the global system store at address and port. 
Returns * a handle to the database, which must be freed with db_disconnect diff --git a/state/object_table.c b/state/object_table.c new file mode 100644 index 0000000..8545d01 --- /dev/null +++ b/state/object_table.c @@ -0,0 +1,62 @@ +#include "object_table.h" +#include "redis.h" + +void object_table_lookup(db_handle *db_handle, + object_id object_id, + int retry_count, + uint64_t timeout, + object_table_lookup_done_cb done_cb, + table_fail_cb fail_cb, + void *user_context) { + retry_struct retry; + + retry.count = retry_count; + retry.timeout = timeout; + retry.cb = redis_object_table_lookup; + + init_table_callback(db_handle, object_id, NULL, &retry, done_cb, fail_cb, + user_context); +} + +void object_table_add(db_handle *db_handle, + object_id object_id, + int retry_count, + uint64_t timeout, + object_table_done_cb done_cb, + table_fail_cb fail_cb, + void *user_context) { + retry_struct retry; + + retry.count = retry_count; + retry.timeout = timeout; + retry.cb = redis_object_table_add; + + init_table_callback(db_handle, object_id, NULL, &retry, done_cb, fail_cb, + user_context); +} + +void object_table_subscribe( + db_handle *db_handle, + object_id object_id, + object_table_object_available_cb object_available_cb, + void *subscribe_context, + int retry_count, + uint64_t timeout, + object_table_done_cb done_cb, + table_fail_cb fail_cb, + void *user_context) { + object_table_subscribe_data *sub_data = + malloc(sizeof(object_table_subscribe_data)); + utarray_push_back(db_handle->callback_freelist, &sub_data); + sub_data->object_available_cb = object_available_cb; + sub_data->subscribe_context = subscribe_context; + + retry_struct retry; + + retry.count = retry_count; + retry.timeout = timeout; + retry.cb = redis_object_table_subscribe; + + init_table_callback(db_handle, object_id, sub_data, &retry, done_cb, fail_cb, + user_context); +} diff --git a/state/object_table.h b/state/object_table.h index bab54bc..a0cab0a 100644 --- a/state/object_table.h +++ 
b/state/object_table.h @@ -1,25 +1,139 @@ +#ifndef OBJECT_TABLE_H +#define OBJECT_TABLE_H + #include "common.h" +#include "table.h" #include "db.h" -/* The callback that is called when the result of a lookup - * in the object table comes back. The callback should free - * the manager_vector array, but NOT the strings they are pointing to. */ -typedef void (*lookup_callback)(object_id object_id, - int manager_count, - const char *manager_vector[], - void *context); +/* + * ==== Lookup call and callback ==== + */ -/* Register a new object with the directory. */ -/* TODO(pcm): Retry, print for each attempt. */ -void object_table_add(db_handle *db, object_id object_id); +/* Callback called when the lookup completes. The callback should free + * the manager_vector array, but NOT the strings they are pointing to. + */ +typedef void (*object_table_lookup_done_cb)(object_id object_id, + int manager_count, + OWNER const char *manager_vector[], + void *user_context); -/* Remove object from the directory. */ -void object_table_remove(db_handle *db, +/** + * Return the list of nodes storing object_id in their plasma stores. + * + * @param db_handle Handle to object_table database. + * @param object_id ID of the object being looked up. + * @param retry_count Number of retries to the database before giving up. + * @param timeout Timout between retries (in milliseconds). + * @param done_callback Function to be called when database returns result. + * @param fail_callback Function to be called if we failed to contact + * database after retry_count retries. + * @param user_context Context passed by the caller. + * @return Void. + */ +void object_table_lookup(db_handle *db_handle, object_id object_id, - const char *manager); + int retry_count, + uint64_t timeout, + object_table_lookup_done_cb done_cb, + table_fail_cb fail_cb, + void *user_context); + +/* + * ==== Add object call and callback ==== + */ + +/* Callback called when the object add/remove operation completes. 
*/ +typedef void (*object_table_done_cb)(object_id object_id, void *user_context); -/* Look up entry from the directory */ -void object_table_lookup(db_handle *db, +/** + * Add the plasma manager that created the db_handle to the + * list of plasma managers that have the object_id. + * + * @param db_handle: Handle to db. + * @param object_id: Object unique identifier. + * @param retry_count: Number of retries after giving up. + * @param done_cb: Callback to be called when lookup completes. + * @param timeout_cb: Callback to be called when lookup timeouts. + * @param user_context: User context to be passed in the callbacks. + * @return Void. + */ +void object_table_add(db_handle *db_handle, + object_id object_id, + int retry_count, + uint64_t timeout, + object_table_done_cb done_cb, + table_fail_cb fail_cb, + void *user_context); + +/* + * ==== Remove object call and callback ==== + */ + +/** + * Object remove function. + * + * @param db_handle: Handle to db. + * @param object_id: Object unique identifier. + * @param retry_count: Number of retries after giving up. + * @param timeout: Timeout after which we retry the lookup. + * @param done_cb: Callback to be called when lookup completes. + * @param fail_cb: Callback to be called when lookup timeouts. + * @param user_context: User context to be passed in the callbacks. + * @return Void. + */ +/* +void object_table_remove(db_handle *db, object_id object_id, lookup_callback callback, void *context); + int retry_count, + uint64_t timeout, + object_table_done_cb done_cb, + table_fail_cb fail_cb, + void *user_context); +*/ + +/* + * ==== Subscribe to be announced when new object available ==== + */ + +/* Callback called when object object_id is available. */ +typedef void (*object_table_object_available_cb)(object_id object_id, + void *user_context); + +/** + * Subcribing to new object available function. + * + * @param db_handle: Handle to db. + * @param object_id: Object unique identifier. 
+ * @param object_available_cb: callback to be called when new object becomes + * available + * @param subscribe_context: caller context which will be passed back in the + * object_available_cb + * @param retry_count: Number of retries after giving up. + * @param timeout: Timeout after which we retry to install subscription. + * @param done_cb: Callback to be called when subscription is installed. + * @param fail_cb: Callback to be called when subscription installation fails. + * @param user_context: User context to be passed in the callbacks. + * @return Void. + */ + +void object_table_subscribe( + db_handle *db, + object_id object_id, + object_table_object_available_cb object_available_cb, + void *subscribe_context, + int retry_count, + uint64_t timeout, + object_table_done_cb done_cb, + table_fail_cb fail_cb, + void *user_context); + +/* Data that is needed to register new object available callbacks with the state + * database. */ +typedef struct { + object_table_object_available_cb object_available_cb; + void *subscribe_context; +} object_table_subscribe_data; + +#endif /* OBJECT_TABLE_H */ diff --git a/state/redis.c b/state/redis.c index d53b0bf..a41c73b 100644 --- a/state/redis.c +++ b/state/redis.c @@ -1,6 +1,7 @@ /* Redis implementation of the global state store */ #include +#include #include #include "hiredis/adapters/ae.h" @@ -9,6 +10,7 @@ #include "common.h" #include "db.h" #include "object_table.h" +#include "task.h" #include "task_log.h" #include "event_loop.h" #include "redis.h" @@ -31,6 +33,19 @@ } \ } while (0); +#define REDIS_CALLBACK_HEADER(DB, CB_DATA, REPLY) \ + db_handle *DB = c->data; \ + table_callback_data *CB_DATA = privdata; \ + \ + if ((REPLY) == NULL) { \ + return; \ + } \ + \ + if (outstanding_callbacks_find(cb_data) == NULL) \ + /* the callback data structure has been \ + * already freed; just ignore this reply */ \ + return; + db_handle *db_connect(const char *address, int port, const char *client_type, @@ -104,131 +119,287 @@ void 
db_disconnect(db_handle *db) { } void db_attach(db_handle *db, event_loop *loop) { + db->loop = loop; redisAeAttach(loop, db->context); redisAeAttach(loop, db->sub_context); } -void object_table_add(db_handle *db, unique_id object_id) { - redisAsyncCommand(db->context, NULL, NULL, "SADD obj:%b %d", &object_id.id[0], - UNIQUE_ID_SIZE, db->client_id); +/* + * ==== object_table callbacks ==== + */ + +void redis_object_table_add_cb(redisAsyncContext *c, void *r, void *privdata) { + REDIS_CALLBACK_HEADER(db, cb_data, r) + + if (cb_data->done_cb) { + task_log_done_cb done_cb = cb_data->done_cb; + done_cb(cb_data->id, cb_data->user_context); + } + event_loop_remove_timer(db->loop, cb_data->timer_id); +} + +void redis_object_table_add(table_callback_data *cb_data) { + CHECK(cb_data); + + if (outstanding_callbacks_find(cb_data) == NULL) + /* the callback data structure has been already freed; just ignore this + * reply */ + return; + + db_handle *db = cb_data->db_handle; + redisAsyncCommand(db->context, redis_object_table_add_cb, cb_data, + "SADD obj:%b %d", &cb_data->id.id[0], UNIQUE_ID_SIZE, + db->client_id); if (db->context->err) { LOG_REDIS_ERR(db->context, "could not add object_table entry"); } } -void object_table_get_entry(redisAsyncContext *c, void *r, void *privdata) { - db_handle *db = c->data; - lookup_callback_data *cb_data = privdata; +void redis_object_table_lookup(table_callback_data *cb_data) { + CHECK(cb_data); + db_handle *db = cb_data->db_handle; + + /* Call redis asynchronously */ + redisAsyncCommand(db->context, redis_object_table_get_entry, cb_data, + "SMEMBERS obj:%b", &cb_data->id.id[0], UNIQUE_ID_SIZE); + if (db->context->err) { + LOG_REDIS_ERR(db->context, "error in object_table lookup"); + } +} + +/** + * Get an entry from the plasma manager table in redis. + * + * @param db The database handle. + * @param index The index of the plasma manager. + * @param *manager The pointer where the IP address of the manager gets written. + * @return Void. 
+ */ +void redis_get_cached_service(db_handle *db, int index, const char **manager) { + service_cache_entry *entry; + HASH_FIND_INT(db->service_cache, &index, entry); + if (!entry) { + /* This is a very rare case. */ + redisReply *reply = + redisCommand(db->sync_context, "HGET %s %lld", db->client_type, index); + CHECK(reply->type == REDIS_REPLY_STRING); + entry = malloc(sizeof(service_cache_entry)); + entry->service_id = index; + entry->addr = strdup(reply->str); + HASH_ADD_INT(db->service_cache, service_id, entry); + freeReplyObject(reply); + } + *manager = entry->addr; +} + +void redis_object_table_get_entry(redisAsyncContext *c, + void *r, + void *privdata) { + REDIS_CALLBACK_HEADER(db, cb_data, r) redisReply *reply = r; - if (reply == NULL) - return; - int *result = malloc(reply->elements * sizeof(int)); + + int *managers = malloc(reply->elements * sizeof(int)); int64_t manager_count = reply->elements; + if (reply->type == REDIS_REPLY_ARRAY) { + const char **manager_vector = malloc(manager_count * sizeof(char *)); for (int j = 0; j < reply->elements; j++) { CHECK(reply->element[j]->type == REDIS_REPLY_STRING); - result[j] = atoi(reply->element[j]->str); - service_cache_entry *entry; - HASH_FIND_INT(db->service_cache, &result[j], entry); - if (!entry) { - redisReply *reply = redisCommand(db->sync_context, "HGET %s %lld", - db->client_type, result[j]); - CHECK(reply->type == REDIS_REPLY_STRING); - entry = malloc(sizeof(service_cache_entry)); - entry->service_id = result[j]; - entry->addr = strdup(reply->str); - HASH_ADD_INT(db->service_cache, service_id, entry); - freeReplyObject(reply); - } + managers[j] = atoi(reply->element[j]->str); + redis_get_cached_service(db, managers[j], manager_vector + j); } + + object_table_lookup_done_cb done_cb = cb_data->done_cb; + done_cb(cb_data->id, manager_count, manager_vector, cb_data->user_context); + /* remove timer */ + event_loop_remove_timer(cb_data->db_handle->loop, cb_data->timer_id); + free(privdata); + 
free(managers); } else { LOG_ERR("expected integer or string, received type %d", reply->type); exit(-1); } - const char **manager_vector = malloc(manager_count * sizeof(char *)); - for (int j = 0; j < manager_count; ++j) { - service_cache_entry *entry; - HASH_FIND_INT(db->service_cache, &result[j], entry); - manager_vector[j] = entry->addr; - } - cb_data->callback(cb_data->object_id, manager_count, manager_vector, - cb_data->context); - free(privdata); - free(result); -} - -void object_table_lookup(db_handle *db, - object_id object_id, - lookup_callback callback, - void *context) { - lookup_callback_data *cb_data = malloc(sizeof(lookup_callback_data)); - cb_data->callback = callback; - cb_data->object_id = object_id; - cb_data->context = context; - redisAsyncCommand(db->context, object_table_get_entry, cb_data, - "SMEMBERS obj:%b", &object_id.id[0], UNIQUE_ID_SIZE); - if (db->context->err) { - LOG_REDIS_ERR(db->context, "error in object_table lookup"); +} + +void object_table_redis_callback(redisAsyncContext *c, + void *r, + void *privdata) { + REDIS_CALLBACK_HEADER(db, cb_data, r) + redisReply *reply = r; + + CHECK(reply->type == REDIS_REPLY_ARRAY); + /* First entry is message type, second is topic, third is payload. */ + CHECK(reply->elements > 2); + /* If this condition is true, we got the initial message that acknowledged the + * subscription. */ + if (strncmp(reply->element[1]->str, "add", 3) != 0) { + if (cb_data->done_cb) { + object_table_done_cb done_cb = cb_data->done_cb; + done_cb(cb_data->id, cb_data->user_context); + } + event_loop_remove_timer(db->loop, cb_data->timer_id); + return; + } + /* Otherwise, parse the task and call the callback. 
*/ + CHECK(privdata); + object_table_subscribe_data *data = cb_data->data; + + if (data->object_available_cb) { + data->object_available_cb(cb_data->id, data->subscribe_context); } } -void task_log_add_task(db_handle *db, task_instance *task_instance) { - task_iid task_iid = *task_instance_id(task_instance); - redisAsyncCommand(db->context, NULL, NULL, "HMSET tasklog:%b 0 %b", - (char *) &task_iid.id[0], UNIQUE_ID_SIZE, - (char *) task_instance, task_instance_size(task_instance)); - if (db->context->err) { - LOG_REDIS_ERR(db->context, "error setting task in task_log_add_task"); +void redis_object_table_subscribe(table_callback_data *cb_data) { + db_handle *db = cb_data->db_handle; + + /* subscribe to key notification associated to object id */ + + redisAsyncCommand(db->sub_context, object_table_redis_callback, cb_data, + "SUBSCRIBE __keyspace@0__:%b add", + (char *) &cb_data->id.id[0], UNIQUE_ID_SIZE); + + if (db->sub_context->err) { + LOG_REDIS_ERR(db->sub_context, + "error in redis_object_table_subscribe_callback"); } +} + +/* + * ==== task_log callbacks ==== + */ + +void redis_task_log_publish(table_callback_data *cb_data) { + db_handle *db = cb_data->db_handle; + task_instance *task_instance = cb_data->data; + task_iid task_iid = *task_instance_id(task_instance); node_id node = *task_instance_node(task_instance); int32_t state = *task_instance_state(task_instance); - redisAsyncCommand(db->context, NULL, NULL, "PUBLISH task_log:%b:%d %b", - (char *) &node.id[0], UNIQUE_ID_SIZE, state, - (char *) task_instance, task_instance_size(task_instance)); - if (db->context->err) { - LOG_REDIS_ERR(db->context, "error publishing task in task_log_add_task"); + + LOG_DEBUG("Called log_publish callback"); + +/* Check whether the vector (requests_info) indicating the status of the + * requests has been allocated. + * If was not allocate it, allocate it and initialize it. 
+ * This vector has an entry for each redis command, and it stores true if a + * reply for that command + * has been received, and false otherwise. + * The first entry in the callback corresponds to RPUSH, and the second entry to + * PUBLISH. + */ +#define NUM_DB_REQUESTS 2 +#define PUSH_INDEX 0 +#define PUBLISH_INDEX 1 + if (cb_data->requests_info == NULL) { + cb_data->requests_info = malloc(NUM_DB_REQUESTS * sizeof(bool)); + for (int i = 0; i < NUM_DB_REQUESTS; i++) { + ((bool *) cb_data->requests_info)[i] = false; + } + } + + if (((bool *) cb_data->requests_info)[PUSH_INDEX] == false) { + if (*task_instance_state(task_instance) == TASK_STATUS_WAITING) { + redisAsyncCommand(db->context, redis_task_log_publish_push_cb, cb_data, + "RPUSH tasklog:%b %b", (char *) &task_iid.id[0], + UNIQUE_ID_SIZE, (char *) task_instance, + task_instance_size(task_instance)); + } else { + task_update update = {.state = state, .node = node}; + redisAsyncCommand(db->context, redis_task_log_publish_push_cb, cb_data, + "RPUSH tasklog:%b %b", (char *) &task_iid.id[0], + UNIQUE_ID_SIZE, (char *) &update, sizeof(update)); + } + + if (db->context->err) { + LOG_REDIS_ERR(db->context, "error setting task in task_log_add_task"); + } + } + + if (((bool *) cb_data->requests_info)[PUBLISH_INDEX] == false) { + redisAsyncCommand(db->context, redis_task_log_publish_publish_cb, cb_data, + "PUBLISH task_log:%b:%d %b", (char *) &node.id[0], + UNIQUE_ID_SIZE, state, (char *) task_instance, + task_instance_size(task_instance)); + + if (db->context->err) { + LOG_REDIS_ERR(db->context, "error publishing task in task_log_add_task"); + } } } -void task_log_redis_callback(redisAsyncContext *c, - void *reply, - void *privdata) { - redisReply *r = reply; - if (reply == NULL) - return; - CHECK(r->type == REDIS_REPLY_ARRAY); +void redis_task_log_publish_push_cb(redisAsyncContext *c, + void *r, + void *privdata) { + REDIS_CALLBACK_HEADER(db, cb_data, r) + + CHECK(cb_data->requests_info != NULL); + ((bool *) 
cb_data->requests_info)[PUSH_INDEX] = true; + + if (((bool *) cb_data->requests_info)[PUBLISH_INDEX] == true) { + if (cb_data->done_cb) { + task_log_done_cb done_cb = cb_data->done_cb; + done_cb(cb_data->id, cb_data->user_context); + } + event_loop_remove_timer(db->loop, cb_data->timer_id); + } +} + +void redis_task_log_publish_publish_cb(redisAsyncContext *c, + void *r, + void *privdata) { + REDIS_CALLBACK_HEADER(db, cb_data, r) + + CHECK(cb_data->requests_info != NULL); + ((bool *) cb_data->requests_info)[PUBLISH_INDEX] = true; + + if (((bool *) cb_data->requests_info)[PUSH_INDEX] == true) { + if (cb_data->done_cb) { + task_log_done_cb done_cb = cb_data->done_cb; + done_cb(cb_data->id, cb_data->user_context); + } + event_loop_remove_timer(db->loop, cb_data->timer_id); + } +} + +void task_log_redis_callback(redisAsyncContext *c, void *r, void *privdata) { + REDIS_CALLBACK_HEADER(db, cb_data, r) + redisReply *reply = r; + + CHECK(reply->type == REDIS_REPLY_ARRAY); /* First entry is message type, second is topic, third is payload. */ - CHECK(r->elements > 2); + CHECK(reply->elements > 2); /* If this condition is true, we got the initial message that acknowledged the * subscription. */ - if (r->element[2]->str == NULL) { + if (reply->element[2]->str == NULL) { + if (cb_data->done_cb) { + task_log_done_cb done_cb = cb_data->done_cb; + done_cb(cb_data->id, cb_data->user_context); + } + event_loop_remove_timer(db->loop, cb_data->timer_id); return; } /* Otherwise, parse the task and call the callback. 
*/ CHECK(privdata); - task_log_callback_data *callback_data = privdata; - task_instance *instance = malloc(r->element[2]->len); - memcpy(instance, r->element[2]->str, r->element[2]->len); - callback_data->callback(instance, callback_data->userdata); + task_log_subscribe_data *data = cb_data->data; + + task_instance *instance = malloc(reply->element[2]->len); + memcpy(instance, reply->element[2]->str, reply->element[2]->len); + if (data->subscribe_cb) { + data->subscribe_cb(instance, data->subscribe_context); + } task_instance_free(instance); } -void task_log_register_callback(db_handle *db, - task_log_callback callback, - node_id node, - int32_t state, - void *userdata) { - task_log_callback_data *callback_data = - malloc(sizeof(task_log_callback_data)); - utarray_push_back(db->callback_freelist, &callback_data); - callback_data->callback = callback; - callback_data->userdata = userdata; - if (memcmp(&node.id[0], &NIL_ID.id[0], UNIQUE_ID_SIZE) == 0) { - redisAsyncCommand(db->sub_context, task_log_redis_callback, callback_data, - "PSUBSCRIBE task_log:*:%d", state); + +void redis_task_log_subscribe(table_callback_data *cb_data) { + db_handle *db = cb_data->db_handle; + task_log_subscribe_data *data = cb_data->data; + + if (memcmp(&data->node.id[0], &NIL_ID.id[0], UNIQUE_ID_SIZE) == 0) { + redisAsyncCommand(db->sub_context, task_log_redis_callback, cb_data, + "PSUBSCRIBE task_log:*:%d", data->state_filter); } else { - redisAsyncCommand(db->sub_context, task_log_redis_callback, callback_data, - "SUBSCRIBE task_log:%b:%d", (char *) &node.id[0], - UNIQUE_ID_SIZE, state); + redisAsyncCommand(db->sub_context, task_log_redis_callback, cb_data, + "SUBSCRIBE task_log:%b:%d", (char *) &data->node.id[0], + UNIQUE_ID_SIZE, data->state_filter); } if (db->sub_context->err) { LOG_REDIS_ERR(db->sub_context, "error in task_log_register_callback"); diff --git a/state/redis.h b/state/redis.h index cf368b9..0867394 100644 --- a/state/redis.h +++ b/state/redis.h @@ -19,50 +19,104 @@ 
typedef struct { UT_hash_handle hh; } service_cache_entry; -typedef struct { - /* The callback that will be called. */ - task_log_callback callback; - /* Userdata associated with the callback. */ - void *userdata; -} task_log_callback_data; - -struct db_handle_impl { - /* String that identifies this client type. */ +struct db_handle { + /** String that identifies this client type. */ char *client_type; - /* Unique ID for this client within the type. */ + /** Unique ID for this client within the type. */ int64_t client_id; - /* Redis context for this global state store connection. */ + /** Redis context for this global state store connection. */ redisAsyncContext *context; - /* Redis context for "subscribe" communication. + /** Redis context for "subscribe" communication. * Yes, we need a separate one for that, see * https://github.com/redis/hiredis/issues/55 */ redisAsyncContext *sub_context; - /* The event loop this global state store connection is part of. */ + /** The event loop this global state store connection is part of. */ event_loop *loop; - /* Index of the database connection in the event loop */ + /** Index of the database connection in the event loop */ int64_t db_index; - /* Cache for the IP addresses of services. */ + /** Cache for the IP addresses of services. */ service_cache_entry *service_cache; - /* Redis context for synchronous connections. + /** Redis context for synchronous connections. * Should only be used very rarely, it is not asynchronous. */ redisContext *sync_context; /* Data structure for callbacks that needs to be freed. */ UT_array *callback_freelist; }; -typedef struct { - /* The callback that will be called. */ - lookup_callback callback; - /* Object ID that is looked up. */ - object_id object_id; - /* Data context for the callback. 
*/ - void *context; -} lookup_callback_data; - -void object_table_get_entry(redisAsyncContext *c, void *r, void *privdata); +void redis_object_table_get_entry(redisAsyncContext *c, + void *r, + void *privdata); void object_table_lookup_callback(redisAsyncContext *c, void *r, void *privdata); -#endif +/* + * ==== Redis object table functions ==== + */ + +/** + * Lookup object table entry in redis. + * @param cb_data Data structure containing redis connection and timeout + * information. + * @return Void. + */ +void redis_object_table_lookup(table_callback_data *cb_data); + +/** + * Add an entry to the object table in redis. + * @param cb_data Data structure containing redis connection and timeout + * information. + * @return Void. + */ +void redis_object_table_add(table_callback_data *cb_data); + +/** + * Subscribe to learn when a new object becomes available. + * @param cb_data Data structure containing redis connection and timeout + * information. + * @return Void. + */ +void redis_object_table_subscribe(table_callback_data *cb_data); + +/* + * ==== Redis task table function ===== + */ + +/** + * Add or update task log entry with new scheduling information. + * @param cb_data Data structure containing redis connection and timeout + * information. + * @return Void. + */ +void redis_task_log_publish(table_callback_data *cb_data); + +/** + * Callback invoked when the replya from the task push command is received. + * @param c Redis context. + * @param r Reply (not used). + * @param privdata Data associated to the callback. + */ +void redis_task_log_publish_push_cb(redisAsyncContext *c, + void *r, + void *privdata); + +/** + * Callback invoked when the replya from the task publish command is received. + * @param c Redis context. + * @param r Reply (not used). + * @param privdata Data associated to the callback. + */ +void redis_task_log_publish_publish_cb(redisAsyncContext *c, + void *r, + void *privdata); + +/** + * Subscribe to updates of the task log. 
+ * @param cb_data Data structure containing redis connection and timeout + * information. + * @return Void. + */ +void redis_task_log_subscribe(table_callback_data *cb_data); + +#endif /* REDIS_H */ diff --git a/state/table.c b/state/table.c new file mode 100644 index 0000000..4e3b0de --- /dev/null +++ b/state/table.c @@ -0,0 +1,112 @@ +#include "table.h" + +#include +#include "redis.h" + +table_callback_data *init_table_callback(db_handle *db_handle, + unique_id id, + void *data, + retry_struct *retry, + table_done_cb done_cb, + table_fail_cb fail_cb, + void *user_context) { + CHECK(db_handle); + CHECK(db_handle->loop); + CHECK(retry); + /* Allocate and initialize callback data structure for object table */ + table_callback_data *cb_data = malloc(sizeof(table_callback_data)); + CHECKM(cb_data != NULL, "Memory allocation error!") + cb_data->id = id; + cb_data->done_cb = done_cb; + cb_data->fail_cb = fail_cb; + cb_data->retry.cb = retry->cb; + cb_data->retry.count = retry->count; + cb_data->retry.timeout = retry->timeout; + cb_data->data = data; + cb_data->requests_info = NULL; + cb_data->user_context = user_context; + cb_data->db_handle = db_handle; + /* Add timer and initialize it. */ + cb_data->timer_id = event_loop_add_timer(db_handle->loop, + retry->timeout, + table_timeout_handler, + cb_data); + outstanding_callbacks_add(cb_data); + + cb_data->retry.cb(cb_data); + + return cb_data; +} + +void destroy_table_callback(table_callback_data *cb_data) { + CHECK(cb_data != NULL); + + // if (cb_data->data) + // free(cb_data->data); + + if (cb_data->requests_info) + free(cb_data->requests_info); + + outstanding_callbacks_remove(cb_data); + + /* Timer is removed via EVENT_LOOP_TIMER_DONE in the timeout callback. 
*/ + free(cb_data); +} + +int64_t table_timeout_handler(event_loop *loop, + int64_t timer_id, + void *user_context) { + CHECK(loop != NULL); + CHECK(user_context != NULL); + table_callback_data *cb_data = (table_callback_data *) user_context; + + CHECK(cb_data->retry.count >= 0) + LOG_DEBUG("retrying operation, retry_count = %d", cb_data->retry.count); + + if (cb_data->retry.count == 0) { + /* We didn't get a response from the database after exhausting all retries; + * let user know, cleanup the state, and remove the timer. */ + if (cb_data->fail_cb) { + cb_data->fail_cb(cb_data->id, cb_data->user_context); + } + destroy_table_callback(cb_data); + return EVENT_LOOP_TIMER_DONE; + } + + /* Decrement retry count and try again. */ + cb_data->retry.count--; + cb_data->retry.cb(cb_data); + return cb_data->retry.timeout; +} + +/** + * List of outstanding callbacks. We need to maintain this list for the case in which a reply is received + * after te last timeout expires and all relevant data structures are removed. In this case we just need + * to ignore the reply. 
+ * */ +static outstanding_callback *outstanding_cbs = NULL; + +void outstanding_callbacks_add(table_callback_data *key) { + outstanding_callback *outstanding_cb = malloc(sizeof(outstanding_callback)); + + CHECK(outstanding_cb != NULL); + outstanding_cb->key = key; + HASH_ADD_PTR(outstanding_cbs, key, outstanding_cb); +} + +outstanding_callback *outstanding_callbacks_find(table_callback_data *key) { + outstanding_callback *outstanding_cb = NULL; + + HASH_FIND_PTR(outstanding_cbs, &key, outstanding_cb); + return outstanding_cb; +} + +void outstanding_callbacks_remove(table_callback_data *key) { + outstanding_callback *outstanding_cb = NULL; + + outstanding_cb = outstanding_callbacks_find(key); + if (outstanding_cb != NULL) { + HASH_DEL(outstanding_cbs, outstanding_cb); + free(outstanding_cb); + } +} diff --git a/state/table.h b/state/table.h new file mode 100644 index 0000000..f3cd463 --- /dev/null +++ b/state/table.h @@ -0,0 +1,137 @@ +#ifndef TABLE_H +#define TABLE_H + +#include "uthash.h" +#include "stdbool.h" + +#include "common.h" +#include "db.h" + +typedef struct table_callback_data table_callback_data; + +typedef void * table_done_cb; + +/* The callback called when the database operation hasn't completed after + * the number of retries specified for the operation. */ +typedef void (*table_fail_cb)(unique_id id, void *user_context); + +typedef void (*table_retry_cb)(table_callback_data *cb_data); + +/** + * Data structure consolidating the retry related varaibles. + */ +typedef struct { + /** Number of retries left. */ + int count; + /** Timeout, in milliseconds. */ + uint64_t timeout; + /** The callback that will be called to initiate the next try. */ + table_retry_cb cb; +} retry_struct; + + +struct table_callback_data { + /** ID of the entry in the table that we are going to look up, remove or add. */ + unique_id id; + /** The callback that will be called when results is returned. 
*/ + table_done_cb done_cb; + /** The callback that will be called when the redis command times out. */ + table_fail_cb fail_cb; + /** Retry structure containing the remaining number of retries, timeout and a pointer + * to the retry callback. + */ + retry_struct retry; + /** Pointer to the data that is entered into the table. */ + void *data; + /** Pointer to the data used internally to handle multiple database requests. */ + void *requests_info; + /** User context. */ + void *user_context; + /** Handle to db. */ + db_handle *db_handle; + /** Handle to timer. */ + int64_t timer_id; +}; + +/** + * Function to handle the timeout event. + * @param loop Event loop. + * @param timer_id Timer identifier. + * @param context Pointer to the callback data for the object table. + * @return Timeout to reset the timer if we need to try again, or + * EVENT_LOOP_TIMER_DONE if retry_count == 0. + */ +int64_t table_timeout_handler(event_loop *loop, + int64_t timer_id, + void *context); + +/** + * + * @param db_handle Database handle. + * @param id ID of the object that is looked up, added or removed. + * @param data Data entered into the table. + * @param retry Retry relevant information: retry timeout, number of remaining retries, and retry callback. + * @param done_cb Function to be called when database returns result. + * @param fail_cb Function to be called when number of retries is exhausted. + * @param user_context Context that can be provided by the user and will be + * passed on to the various callbacks. + * @return New table callback data struct. + */ +table_callback_data *init_table_callback(db_handle *db_handle, + unique_id id, + void *data, + retry_struct *retry, + table_done_cb done_cb, + table_fail_cb fail_cb, + void *user_context); + + +void destroy_table_callback(table_callback_data *cb_data); + +/** + * Hash table maintaining the outstanding callbacks.
+ * + * This hash table is used to handle the following case: + * - a table command is issued with an associated callback and a callback data structure; + * - the last timeout associated with this command expires, as a result the callback data structure is freed; + * - a reply arrives, but now the callback data structure is gone, so we have to ignore this reply; + * + * This hash table enables us to ignore such replies. The operations on the hash table are as follows. + * + * When we issue a table command we add a new entry to the hash table that is keyed by the address of the callback's + * data structure. + * + * When we receive the reply, we check whether the callback still exists in this hash table, and if not we just ignore + * the reply. + * + * When the last timeout associated with the command expires we remove the entry associated with the callback. + */ +typedef struct { + table_callback_data *key; + int dummy; + UT_hash_handle hh; /* makes this structure hashable */ +} outstanding_callback; + +/** + * + * @param key The pointer to the data structure of the callback we want to insert. + * @return None. + */ +void outstanding_callbacks_add(table_callback_data *key); + +/** + * + * @param key The pointer to the data structure of the callback we are looking for. + * @return Returns callback if found, NULL otherwise. + */ +outstanding_callback *outstanding_callbacks_find(table_callback_data *key); + +/** + * + * @param key Key, defined as the pointer to the data structure of the callback we want to remove. + * @return None.
+ */ +void outstanding_callbacks_remove(table_callback_data *key); + + +#endif /* TABLE_H */ diff --git a/state/task_log.c b/state/task_log.c new file mode 100644 index 0000000..be68dbc --- /dev/null +++ b/state/task_log.c @@ -0,0 +1,52 @@ +#include "task_log.h" +#include "redis.h" + +#define NUM_DB_REQUESTS 2 + +void task_log_publish(db_handle *db_handle, + task_instance *task_instance, + int retry_count, + uint64_t timeout, + task_log_done_cb done_cb, + table_fail_cb fail_cb, + void *user_context) { + retry_struct retry; + + retry.count = retry_count; + retry.timeout = timeout; + retry.cb = redis_task_log_publish; + + table_callback_data *cb_data = + init_table_callback(db_handle, *task_instance_id(task_instance), task_instance, + &retry, done_cb, fail_cb, user_context); + redis_task_log_publish(cb_data); +} + +void task_log_subscribe(db_handle *db_handle, + node_id node, + int32_t state_filter, + task_log_subscribe_cb subscribe_cb, + void *subscribe_context, + int retry_count, + uint64_t timeout, + task_log_done_cb done_cb, + table_fail_cb fail_cb, + void *user_context) { + + task_log_subscribe_data *sub_data = malloc(sizeof(task_log_subscribe_data)); + utarray_push_back(db_handle->callback_freelist, &sub_data); + sub_data->node = node; + sub_data->state_filter = state_filter; + sub_data->subscribe_cb = subscribe_cb; + sub_data->subscribe_context = subscribe_context; + + retry_struct retry; + + retry.count = retry_count; + retry.timeout = timeout; + retry.cb = redis_task_log_subscribe; + + table_callback_data *cb_data = + init_table_callback(db_handle, node, sub_data, &retry, done_cb, fail_cb, user_context); + redis_task_log_subscribe(cb_data); +} \ No newline at end of file diff --git a/state/task_log.h b/state/task_log.h index acf5dbc..cd6a973 100644 --- a/state/task_log.h +++ b/state/task_log.h @@ -2,6 +2,7 @@ #define TASK_LOG_H #include "db.h" +#include "table.h" #include "task.h" /* The task log is a message bus that is used for all communication between @@ 
-15,27 +16,77 @@ * 5) local scheduler writes it when a task finishes execution; * 6) global scheduler reads it to get the tasks that have finished; */ +/* Callback called when the task log operation completes. */ +typedef void (*task_log_done_cb)(task_iid task_iid, void *user_context); + +/* + * ==== Publish the task log ==== + */ + +/** Add or update a task instance to the task log. + * @param db_handle Database handle. + * @param retry_count Number of retries to the database before giving up. + * @param timeout Timout between retries (in milliseconds). + * @param done_cb Function to be called when database returns result. + * @param fail_cb Function to be called if we failed to contact + * database after retry_count retries. + * @param user_context Data that will be passed to done_cb and fail_cb. + * @return Void. + */ +void task_log_publish(db_handle *db_handle, + task_instance *task_instance, + int retry_count, + uint64_t timeout, + task_log_done_cb done_cb, + table_fail_cb fail_cb, + void *user_context); + + +/* + * ==== Subscribing to the task log ==== + */ + /* Callback for subscribing to the task log. */ -typedef void (*task_log_callback)(task_instance *task_instance, void *userdata); - -/* Initially add a task instance to the task log. */ -void task_log_add_task(db_handle *db, task_instance *task_instance); - -/* Update task instance in the task log. */ -void task_log_update_task(db_handle *db, - task_iid task_iid, - int32_t state, - node_id node); - -/* Register callback for a certain event. The node specifies the node whose - * events we want to listen to. If you want to listen to all events for this - * node, use state_filter = - * TASK_WAITING | TASK_SCHEDULED | TASK_RUNNING | TASK_DONE. - * If you want to register to updates from all nodes, set node = NIL_ID. 
*/ -void task_log_register_callback(db_handle *db, - task_log_callback callback, - node_id node, - int32_t state_filter, - void *userdata); +typedef void (*task_log_subscribe_cb)(task_instance *task_instance, + void *user_context); + +/** Register callback for a certain event. + * + * @param db_handle Database handle. + * @param subscribe_cb Callback that will be called when the task log is + updated. + * @param subscribe_context Context that will be passed into the subscribe_cb. + * @param node Node whose events we want to listen to. If you want to register + * to updates from all nodes, set node = NIL_ID. + * @param state_filter Flags for events we want to listen to. If you want + * to listen to all events, use state_filter = TASK_WAITING | + * TASK_SCHEDULED | TASK_RUNNING | TASK_DONE. + * @param retry_count Number of retries to the database before giving up. + * @param timeout Timeout between retries (in milliseconds). + * @param done_cb Function to be called when database returns result. + * @param fail_cb Function to be called if we failed to contact + * database after retry_count retries. + * @param user_context Data that will be passed to done_cb and fail_cb. + * @return Void. + */ +void task_log_subscribe(db_handle *db_handle, + node_id node, + int32_t state_filter, + task_log_subscribe_cb subscribe_cb, + void *subscribe_context, + int retry_count, + uint64_t timeout, + task_log_done_cb done_cb, + table_fail_cb fail_cb, + void *user_context); + +/* Data that is needed to register task log subscribe callbacks with the state + * database.
*/ +typedef struct { + node_id node; + int32_t state_filter; + task_log_subscribe_cb subscribe_cb; + void *subscribe_context; +} task_log_subscribe_data; #endif /* TASK_LOG_H */ diff --git a/task.h b/task.h index 782bf68..8d818e3 100644 --- a/task.h +++ b/task.h @@ -27,9 +27,9 @@ typedef unique_id task_iid; typedef unique_id node_id; /* - * TASK SPECIFICATIONS: Contain all the information neccessary - * to execute the task (function id, arguments, return object ids). - * + * ==== Task specifications ==== + * Contain all the information neccessary to execute the + * task (function id, arguments, return object ids). */ typedef struct task_spec_impl task_spec; @@ -83,11 +83,10 @@ task_spec *read_task(int fd); void print_task(task_spec *spec, UT_string *output); /* - * SCHEDULED TASK: Contains information about a scheduled task: - * the task iid, the task specification and the task status - * (WAITING, SCHEDULED, RUNNING, DONE) and which node the - * task is scheduled on. - * + * ==== Task instance ==== + * Contains information about a scheduled task: The task iid, + * the task specification and the task status (WAITING, SCHEDULED, + * RUNNING, DONE) and which node the task is scheduled on. */ /* The scheduling_state can be used as a flag when we are listening @@ -129,4 +128,14 @@ task_spec *task_instance_task_spec(task_instance *instance); /* Free this task instance datastructure. */ void task_instance_free(task_instance *instance); +/* + * ==== Task update ==== + * Contains the information necessary to update a task in the task log. + */ + +typedef struct { + int32_t state; + node_id node; +} task_update; + #endif diff --git a/test/db_tests.c b/test/db_tests.c index 74dd9a8..8d1c3f2 100644 --- a/test/db_tests.c +++ b/test/db_tests.c @@ -14,6 +14,10 @@ SUITE(db_tests); +/* Retry 10 times with an 100ms timeout. 
*/ +const int NUM_RETRIES = 10; +const uint64_t TIMEOUT = 50; + const char *manager_addr = "127.0.0.1"; int manager_port1 = 12345; int manager_port2 = 12346; @@ -22,11 +26,16 @@ char received_port1[6] = {0}; char received_addr2[16] = {0}; char received_port2[6] = {0}; +typedef struct { int test_number; } user_context; + +const int TEST_NUMBER = 10; + /* Test if entries have been written to the database. */ -void test_callback(object_id object_id, - int manager_count, - const char *manager_vector[], - void *context) { + +void lookup_done_cb(object_id object_id, + int manager_count, + const char *manager_vector[], + void *user_context) { CHECK(manager_count == 2); if (!manager_vector[0] || sscanf(manager_vector[0], "%15[0-9.]:%5[0-9]", received_addr1, @@ -41,7 +50,17 @@ void test_callback(object_id object_id, free(manager_vector); } -int timeout_handler(event_loop *loop, timer_id timer_id, void *context) { +/* Entry added to database successfully. */ +void add_done_cb(object_id object_id, void *user_context) { +} + +/* Test if we got a timeout callback if we couldn't connect database. 
*/ +void timeout_cb(object_id object_id, void *context) { + user_context *uc = (user_context *) context; + CHECK(uc->test_number == TEST_NUMBER) +} + +int64_t timeout_handler(event_loop *loop, int64_t id, void *context) { event_loop_stop(loop); return EVENT_LOOP_TIMER_DONE; } @@ -55,12 +74,17 @@ TEST object_table_lookup_test(void) { db_attach(db1, loop); db_attach(db2, loop); unique_id id = globally_unique_id(); - object_table_add(db1, id); - object_table_add(db2, id); - event_loop_add_timer(loop, 100, timeout_handler, NULL); + object_table_add(db1, id, NUM_RETRIES, TIMEOUT, add_done_cb, timeout_cb, + NULL); + object_table_add(db2, id, NUM_RETRIES, TIMEOUT, add_done_cb, timeout_cb, + NULL); + event_loop_add_timer(loop, 200, timeout_handler, NULL); event_loop_run(loop); - object_table_lookup(db1, id, test_callback, NULL); - event_loop_add_timer(loop, 100, timeout_handler, NULL); + user_context user_context; + user_context.test_number = TEST_NUMBER; + object_table_lookup(db1, id, NUM_RETRIES, TIMEOUT, lookup_done_cb, timeout_cb, + NULL); + event_loop_add_timer(loop, 200, timeout_handler, NULL); event_loop_run(loop); int port1 = atoi(received_port1); int port2 = atoi(received_port2); @@ -90,10 +114,10 @@ TEST task_log_test(void) { task_spec *task = example_task(); task_instance *instance = make_task_instance(globally_unique_id(), task, TASK_STATUS_SCHEDULED, node); - task_log_register_callback(db, task_log_test_callback, node, - TASK_STATUS_SCHEDULED, instance); - task_log_add_task(db, instance); - event_loop_add_timer(loop, 100, timeout_handler, NULL); + task_log_subscribe(db, node, TASK_STATUS_SCHEDULED, task_log_test_callback, + instance, NUM_RETRIES, TIMEOUT, NULL, NULL, NULL); + task_log_publish(db, instance, NUM_RETRIES, TIMEOUT, NULL, NULL, NULL); + event_loop_add_timer(loop, 200, timeout_handler, NULL); event_loop_run(loop); task_instance_free(instance); free_task_spec(task); @@ -118,17 +142,22 @@ TEST task_log_all_test(void) { globally_unique_id(), task, 
TASK_STATUS_SCHEDULED, globally_unique_id()); task_instance *instance2 = make_task_instance( globally_unique_id(), task, TASK_STATUS_SCHEDULED, globally_unique_id()); - task_log_register_callback(db, task_log_all_test_callback, NIL_ID, - TASK_STATUS_SCHEDULED, NULL); - task_log_add_task(db, instance1); - task_log_add_task(db, instance2); - event_loop_add_timer(loop, 100, timeout_handler, NULL); + task_log_subscribe(db, NIL_ID, TASK_STATUS_SCHEDULED, + task_log_all_test_callback, NULL, NUM_RETRIES, TIMEOUT, + NULL, NULL, NULL); + event_loop_add_timer(loop, 50, timeout_handler, NULL); + event_loop_run(loop); + /* TODO(pcm): Get rid of this sleep once the robust pubsub is implemented. */ + task_log_publish(db, instance1, NUM_RETRIES, TIMEOUT, NULL, NULL, NULL); + task_log_publish(db, instance2, NUM_RETRIES, TIMEOUT, NULL, NULL, NULL); + event_loop_add_timer(loop, 200, timeout_handler, NULL); event_loop_run(loop); task_instance_free(instance2); task_instance_free(instance1); free_task_spec(task); db_disconnect(db); event_loop_destroy(loop); + printf("num_test_callback_called = %d\n", num_test_callback_called); ASSERT(num_test_callback_called == 2); PASS(); } diff --git a/test/example_task.h b/test/example_task.h index 0dddc4d..f471ff4 100644 --- a/test/example_task.h +++ b/test/example_task.h @@ -11,4 +11,12 @@ task_spec *example_task(void) { return task; } +task_instance *example_task_instance(void) { + task_iid iid = globally_unique_id(); + task_spec *spec = example_task(); + task_instance *instance = make_task_instance(iid, spec, TASK_STATUS_WAITING, NIL_ID); + free_task_spec(spec); + return instance; +} + #endif diff --git a/test/object_table_tests.c b/test/object_table_tests.c new file mode 100644 index 0000000..5712d9a --- /dev/null +++ b/test/object_table_tests.c @@ -0,0 +1,439 @@ +#include "greatest.h" + +#include "event_loop.h" +#include "example_task.h" +#include "common.h" +#include "state/object_table.h" +#include "state/redis.h" + +#include + 
+SUITE(object_table_tests); + +static event_loop *g_loop; + +/* ==== Test if operations time out correctly ==== */ + +/* === Test lookup timeout === */ + +const char *lookup_timeout_context = "lookup_timeout"; +int lookup_failed = 0; + +void lookup_done_cb(object_id object_id, + int manager_count, + OWNER const char *manager_vector[], + void *context) { + /* The done callback should not be called. */ + CHECK(0); +} + +void lookup_fail_cb(unique_id id, void *user_data) { + lookup_failed = 1; + CHECK(user_data == (void *) lookup_timeout_context); + event_loop_stop(g_loop); +} + +TEST lookup_timeout_test(void) { + g_loop = event_loop_create(); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234); + db_attach(db, g_loop); + object_table_lookup(db, NIL_ID, 5, 100, lookup_done_cb, lookup_fail_cb, + (void *) lookup_timeout_context); + /* Disconnect the database to see if the lookup times out. */ + close(db->context->c.fd); + event_loop_run(g_loop); + db_disconnect(db); + event_loop_destroy(g_loop); + CHECK(lookup_failed); + PASS(); +} + +/* === Test add timeout === */ + +const char *add_timeout_context = "add_timeout"; +int add_failed = 0; + +void add_done_cb(object_id object_id, void *user_context) { + /* The done callback should not be called. */ + CHECK(0); +} + +void add_fail_cb(unique_id id, void *user_data) { + add_failed = 1; + CHECK(user_data == (void *) add_timeout_context); + event_loop_stop(g_loop); +} + +TEST add_timeout_test(void) { + g_loop = event_loop_create(); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234); + db_attach(db, g_loop); + object_table_add(db, NIL_ID, 5, 100, add_done_cb, add_fail_cb, + (void *) add_timeout_context); + /* Disconnect the database to see if the lookup times out. 
*/ + close(db->context->c.fd); + event_loop_run(g_loop); + db_disconnect(db); + event_loop_destroy(g_loop); + CHECK(add_failed); + PASS(); +} + +/* === Test subscribe timeout === */ + +const char *subscribe_timeout_context = "subscribe_timeout"; +int subscribe_failed = 0; + +void subscribe_done_cb(object_id object_id, void *user_context) { + /* The done callback should not be called. */ + CHECK(0); +} + +void subscribe_fail_cb(unique_id id, void *user_data) { + subscribe_failed = 1; + CHECK(user_data == (void *) subscribe_timeout_context); + event_loop_stop(g_loop); +} + +TEST subscribe_timeout_test(void) { + g_loop = event_loop_create(); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234); + db_attach(db, g_loop); + object_table_subscribe(db, NIL_ID, NULL, NULL, 5, 100, subscribe_done_cb, + subscribe_fail_cb, (void *) subscribe_timeout_context); + /* Disconnect the database to see if the lookup times out. */ + close(db->sub_context->c.fd); + event_loop_run(g_loop); + db_disconnect(db); + event_loop_destroy(g_loop); + CHECK(subscribe_failed); + PASS(); +} + +/* ==== Test if the retry is working correctly ==== */ + +int64_t reconnect_context_cb(event_loop *loop, + int64_t timer_id, + void *context) { + db_handle *db = context; + /* Reconnect to redis. This is not reconnecting the pub/sub channel. */ + redisAsyncFree(db->context); + redisFree(db->sync_context); + db->context = redisAsyncConnect("127.0.0.1", 6379); + db->context->data = (void *) db; + db->sync_context = redisConnect("127.0.0.1", 6379); + /* Re-attach the database to the event loop (the file descriptor changed). 
*/ + db_attach(db, loop); + return EVENT_LOOP_TIMER_DONE; +} + +int64_t terminate_event_loop_cb(event_loop *loop, + int64_t timer_id, + void *context) { + event_loop_stop(loop); + return EVENT_LOOP_TIMER_DONE; +} + +/* === Test lookup retry === */ + +const char *lookup_retry_context = "lookup_retry"; +int lookup_retry_succeeded = 0; + +void lookup_retry_done_cb(object_id object_id, + int manager_count, + OWNER const char *manager_vector[], + void *context) { + CHECK(context == (void *) lookup_retry_context); + lookup_retry_succeeded = 1; + free(manager_vector); +} + +void lookup_retry_fail_cb(unique_id id, void *user_data) { + /* The fail callback should not be called. */ + CHECK(0); +} + +TEST lookup_retry_test(void) { + g_loop = event_loop_create(); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11235); + db_attach(db, g_loop); + object_table_lookup(db, NIL_ID, 5, 100, lookup_retry_done_cb, + lookup_retry_fail_cb, (void *) lookup_retry_context); + /* Disconnect the database to let the lookup time out the first time. */ + close(db->context->c.fd); + /* Install handler for reconnecting the database. */ + event_loop_add_timer(g_loop, 150, reconnect_context_cb, db); + /* Install handler for terminating the event loop. */ + event_loop_add_timer(g_loop, 750, terminate_event_loop_cb, NULL); + event_loop_run(g_loop); + db_disconnect(db); + event_loop_destroy(g_loop); + CHECK(lookup_retry_succeeded); + PASS(); +} + +/* === Test add retry === */ + +const char *add_retry_context = "add_retry"; +int add_retry_succeeded = 0; + +void add_retry_done_cb(object_id object_id, void *user_context) { + CHECK(user_context == (void *) add_retry_context); + add_retry_succeeded = 1; +} + +void add_retry_fail_cb(unique_id id, void *user_data) { + /* The fail callback should not be called. 
*/ + CHECK(0); +} + +TEST add_retry_test(void) { + g_loop = event_loop_create(); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11235); + db_attach(db, g_loop); + object_table_add(db, NIL_ID, 5, 100, add_retry_done_cb, add_retry_fail_cb, + (void *) add_retry_context); + /* Disconnect the database to let the add time out the first time. */ + close(db->context->c.fd); + /* Install handler for reconnecting the database. */ + event_loop_add_timer(g_loop, 150, reconnect_context_cb, db); + /* Install handler for terminating the event loop. */ + event_loop_add_timer(g_loop, 750, terminate_event_loop_cb, NULL); + event_loop_run(g_loop); + db_disconnect(db); + event_loop_destroy(g_loop); + CHECK(add_retry_succeeded); + PASS(); +} + +/* === Test subscribe retry === */ + +const char *subscribe_retry_context = "subscribe_retry"; +int subscribe_retry_succeeded = 0; + +int64_t reconnect_sub_context_cb(event_loop *loop, + int64_t timer_id, + void *context) { + db_handle *db = context; + /* Reconnect to redis. This is not reconnecting the pub/sub channel. */ + redisAsyncFree(db->sub_context); + redisFree(db->sync_context); + db->sub_context = redisAsyncConnect("127.0.0.1", 6379); + db->sub_context->data = (void *) db; + db->sync_context = redisConnect("127.0.0.1", 6379); + /* Re-attach the database to the event loop (the file descriptor changed). */ + db_attach(db, loop); + return EVENT_LOOP_TIMER_DONE; +} + +void subscribe_retry_done_cb(object_id object_id, void *user_context) { + CHECK(user_context == (void *) subscribe_retry_context); + subscribe_retry_succeeded = 1; +} + +void subscribe_retry_fail_cb(unique_id id, void *user_data) { + /* The fail callback should not be called. 
*/ + CHECK(0); +} + +TEST subscribe_retry_test(void) { + g_loop = event_loop_create(); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11235); + db_attach(db, g_loop); + object_table_subscribe(db, NIL_ID, NULL, NULL, 5, 100, + subscribe_retry_done_cb, subscribe_retry_fail_cb, + (void *) subscribe_retry_context); + /* Disconnect the database to let the subscribe times out the first time. */ + close(db->sub_context->c.fd); + /* Install handler for reconnecting the database. */ + event_loop_add_timer(g_loop, 150, reconnect_sub_context_cb, db); + /* Install handler for terminating the event loop. */ + event_loop_add_timer(g_loop, 750, terminate_event_loop_cb, NULL); + event_loop_run(g_loop); + db_disconnect(db); + event_loop_destroy(g_loop); + CHECK(subscribe_retry_succeeded); + PASS(); +} + +/* ==== Test if late succeed is working correctly ==== */ + +/* === Test lookup late succeed === */ + +const char *lookup_late_context = "lookup_late"; +int lookup_late_failed = 0; + +void lookup_late_fail_cb(unique_id id, void *user_context) { + CHECK(user_context == (void *) lookup_late_context); + lookup_late_failed = 1; +} + +void lookup_late_done_cb(object_id object_id, + int manager_count, + OWNER const char *manager_vector[], + void *context) { + /* This function should never be called. */ + CHECK(0); +} + +TEST lookup_late_test(void) { + g_loop = event_loop_create(); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); + db_attach(db, g_loop); + object_table_lookup(db, NIL_ID, 0, 0, lookup_late_done_cb, + lookup_late_fail_cb, (void *) lookup_late_context); + /* Install handler for terminating the event loop. */ + event_loop_add_timer(g_loop, 750, terminate_event_loop_cb, NULL); + /* First process timer events to make sure the timeout is processed before + * anything else. 
*/ + aeProcessEvents(g_loop, AE_TIME_EVENTS); + event_loop_run(g_loop); + db_disconnect(db); + event_loop_destroy(g_loop); + CHECK(lookup_late_failed); + PASS(); +} + +/* === Test add late succeed === */ + +const char *add_late_context = "add_late"; +int add_late_failed = 0; + +void add_late_fail_cb(unique_id id, void *user_context) { + CHECK(user_context == (void *) add_late_context); + add_late_failed = 1; +} + +void add_late_done_cb(object_id object_id, void *user_context) { + /* This function should never be called. */ + CHECK(0); +} + +TEST add_late_test(void) { + g_loop = event_loop_create(); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); + db_attach(db, g_loop); + object_table_add(db, NIL_ID, 0, 0, add_late_done_cb, add_late_fail_cb, + (void *) add_late_context); + /* Install handler for terminating the event loop. */ + event_loop_add_timer(g_loop, 750, terminate_event_loop_cb, NULL); + /* First process timer events to make sure the timeout is processed before + * anything else. */ + aeProcessEvents(g_loop, AE_TIME_EVENTS); + event_loop_run(g_loop); + db_disconnect(db); + event_loop_destroy(g_loop); + CHECK(add_late_failed); + PASS(); +} + +/* === Test subscribe late succeed === */ + +const char *subscribe_late_context = "subscribe_late"; +int subscribe_late_failed = 0; + +void subscribe_late_fail_cb(unique_id id, void *user_context) { + CHECK(user_context == (void *) subscribe_late_context); + subscribe_late_failed = 1; +} + +void subscribe_late_done_cb(object_id object_id, void *user_context) { + /* This function should never be called. 
*/ + CHECK(0); +} + +TEST subscribe_late_test(void) { + g_loop = event_loop_create(); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); + db_attach(db, g_loop); + object_table_subscribe(db, NIL_ID, NULL, NULL, 0, 0, subscribe_late_done_cb, + subscribe_late_fail_cb, + (void *) subscribe_late_context); + /* Install handler for terminating the event loop. */ + event_loop_add_timer(g_loop, 750, terminate_event_loop_cb, NULL); + /* First process timer events to make sure the timeout is processed before + * anything else. */ + aeProcessEvents(g_loop, AE_TIME_EVENTS); + event_loop_run(g_loop); + db_disconnect(db); + event_loop_destroy(g_loop); + CHECK(subscribe_late_failed); + PASS(); +} + +/* === Test subscribe object available succeed === */ + +const char *subscribe_success_context = "subscribe_success"; +int subscribe_success_done = 0; +int subscribe_success_succeeded = 0; + +void subscribe_success_fail_cb(unique_id id, void *user_context) { + /* This function should never be called. */ + CHECK(0); +} + +void subscribe_success_done_cb(object_id object_id, void *user_context) { + object_table_add((db_handle *) user_context, object_id, 0, 0, NULL, NULL, + NULL); + subscribe_success_done = 1; +} + +void subscribe_success_object_available_cb(object_id object_id, + void *user_context) { + CHECK(user_context == (void *) subscribe_success_context); + subscribe_success_succeeded = 1; +} + +TEST subscribe_success_test(void) { + g_loop = event_loop_create(); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); + db_attach(db, g_loop); + unique_id id = globally_unique_id(); + + object_table_subscribe(db, id, subscribe_success_object_available_cb, + (void *) subscribe_success_context, 0, 100, + subscribe_success_done_cb, subscribe_success_fail_cb, + (void *) db); + + /* Install handler for terminating the event loop. 
*/ + event_loop_add_timer(g_loop, 750, terminate_event_loop_cb, NULL); + + event_loop_run(g_loop); + db_disconnect(db); + event_loop_destroy(g_loop); + + CHECK(subscribe_success_done); + CHECK(subscribe_success_succeeded); + PASS(); +} + +SUITE(object_table_tests) { + RUN_TEST(lookup_timeout_test); + RUN_TEST(add_timeout_test); + RUN_TEST(subscribe_timeout_test); + RUN_TEST(lookup_retry_test); + RUN_TEST(add_retry_test); + RUN_TEST(subscribe_retry_test); + RUN_TEST(lookup_late_test); + RUN_TEST(add_late_test); + RUN_TEST(subscribe_late_test); + RUN_TEST(subscribe_success_test); +} + +GREATEST_MAIN_DEFS(); + +int main(int argc, char **argv) { + GREATEST_MAIN_BEGIN(); + RUN_SUITE(object_table_tests); + GREATEST_MAIN_END(); +} diff --git a/test/task_log_tests.c b/test/task_log_tests.c new file mode 100644 index 0000000..a6bc8ee --- /dev/null +++ b/test/task_log_tests.c @@ -0,0 +1,271 @@ +#include "greatest.h" + +#include "event_loop.h" +#include "example_task.h" +#include "common.h" +#include "state/object_table.h" +#include "state/redis.h" + +#include +#include +// #include + +SUITE(task_log_tests); + +event_loop *loop; + +/* ==== Test if operations time out correctly ==== */ + +/* === Test subscribe timeout === */ + +const char *subscribe_timeout_context = "subscribe_timeout"; +int subscribe_failed = 0; + +void subscribe_done_cb(task_iid task_iid, void *user_context) { + /* The done callback should not be called. 
*/ + CHECK(0); +} + +void subscribe_fail_cb(unique_id id, void *user_data) { + subscribe_failed = 1; + CHECK(user_data == (void *) subscribe_timeout_context); + event_loop_stop(loop); +} + +TEST subscribe_timeout_test(void) { + loop = event_loop_create(); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234); + db_attach(db, loop); + task_log_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, 5, 100, + subscribe_done_cb, subscribe_fail_cb, + (void *) subscribe_timeout_context); + /* Disconnect the database to see if the subscribe times out. */ + close(db->sub_context->c.fd); + aeProcessEvents(loop, AE_TIME_EVENTS); + event_loop_run(loop); + db_disconnect(db); + event_loop_destroy(loop); + CHECK(subscribe_failed); + PASS(); +} + +/* === Test publish timeout === */ + +const char *publish_timeout_context = "publish_timeout"; +const int publish_test_number = 272; +int publish_failed = 0; + +void publish_done_cb(task_iid task_iid, void *user_context) { + /* The done callback should not be called. */ + CHECK(0); +} + +void publish_fail_cb(unique_id id, void *user_data) { + publish_failed = 1; + CHECK(user_data == (void *) publish_timeout_context); + event_loop_stop(loop); +} + +TEST publish_timeout_test(void) { + loop = event_loop_create(); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234); + db_attach(db, loop); + task_instance *task = example_task_instance(); + task_log_publish(db, task, 5, 100, publish_done_cb, publish_fail_cb, + (void *) publish_timeout_context); + /* Disconnect the database to see if the publish times out. 
*/ + close(db->context->c.fd); + aeProcessEvents(loop, AE_TIME_EVENTS); + event_loop_run(loop); + db_disconnect(db); + event_loop_destroy(loop); + CHECK(publish_failed); + task_instance_free(task); + PASS(); +} + +/* ==== Test if the retry is working correctly ==== */ + +int64_t reconnect_db_cb(event_loop *loop, int64_t timer_id, void *context) { + db_handle *db = context; + /* Reconnect to redis. */ + redisAsyncFree(db->sub_context); + db->sub_context = redisAsyncConnect("127.0.0.1", 6379); + db->sub_context->data = (void *) db; + /* Re-attach the database to the event loop (the file descriptor changed). */ + db_attach(db, loop); + return EVENT_LOOP_TIMER_DONE; +} + +int64_t terminate_event_loop_cb(event_loop *loop, + int64_t timer_id, + void *context) { + event_loop_stop(loop); + return EVENT_LOOP_TIMER_DONE; +} + +/* === Test subscribe retry === */ + +const char *subscribe_retry_context = "subscribe_retry"; +const int subscribe_retry_test_number = 273; +int subscribe_retry_succeeded = 0; + +void subscribe_retry_done_cb(object_id object_id, void *user_context) { + CHECK(user_context == (void *) subscribe_retry_context); + subscribe_retry_succeeded = 1; +} + +void subscribe_retry_fail_cb(unique_id id, void *user_data) { + /* The fail callback should not be called. */ + CHECK(0); +} + +TEST subscribe_retry_test(void) { + loop = event_loop_create(); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11235); + db_attach(db, loop); + task_log_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, 5, 100, + subscribe_retry_done_cb, subscribe_retry_fail_cb, + (void *) subscribe_retry_context); + /* Disconnect the database to see if the subscribe times out. */ + close(db->sub_context->c.fd); + /* Install handler for reconnecting the database. */ + event_loop_add_timer(loop, 150, reconnect_db_cb, db); + /* Install handler for terminating the event loop. 
*/ + event_loop_add_timer(loop, 750, terminate_event_loop_cb, NULL); + event_loop_run(loop); + db_disconnect(db); + event_loop_destroy(loop); + CHECK(subscribe_retry_succeeded); + PASS(); +} + +/* === Test publish retry === */ + +const char *publish_retry_context = "publish_retry"; +int publish_retry_succeeded = 0; + +void publish_retry_done_cb(object_id object_id, void *user_context) { + CHECK(user_context == (void *) publish_retry_context); + publish_retry_succeeded = 1; +} + +void publish_retry_fail_cb(unique_id id, void *user_data) { + /* The fail callback should not be called. */ + CHECK(0); +} + +TEST publish_retry_test(void) { + loop = event_loop_create(); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11235); + db_attach(db, loop); + task_instance *task = example_task_instance(); + task_log_publish(db, task, 5, 100, publish_retry_done_cb, + publish_retry_fail_cb, (void *) publish_retry_context); + /* Disconnect the database to see if the publish times out. */ + close(db->sub_context->c.fd); + /* Install handler for reconnecting the database. */ + event_loop_add_timer(loop, 150, reconnect_db_cb, db); + /* Install handler for terminating the event loop. */ + event_loop_add_timer(loop, 750, terminate_event_loop_cb, NULL); + event_loop_run(loop); + db_disconnect(db); + event_loop_destroy(loop); + CHECK(publish_retry_succeeded); + task_instance_free(task); + PASS(); +} + +/* ==== Test if late succeed is working correctly ==== */ + +/* === Test subscribe late succeed === */ + +const char *subscribe_late_context = "subscribe_late"; +int subscribe_late_failed = 0; + +void subscribe_late_fail_cb(unique_id id, void *user_context) { + CHECK(user_context == (void *) subscribe_late_context); + subscribe_late_failed = 1; +} + +void subscribe_late_done_cb(task_iid task_iid, void *user_context) { + /* This function should never be called. 
*/ + CHECK(0); +} + +TEST subscribe_late_test(void) { + loop = event_loop_create(); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); + db_attach(db, loop); + task_log_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, 0, 0, + subscribe_late_done_cb, subscribe_late_fail_cb, + (void *) subscribe_late_context); + /* Install handler for terminating the event loop. */ + event_loop_add_timer(loop, 750, terminate_event_loop_cb, NULL); + /* First process timer events to make sure the timeout is processed before + * anything else. */ + aeProcessEvents(loop, AE_TIME_EVENTS); + event_loop_run(loop); + db_disconnect(db); + event_loop_destroy(loop); + CHECK(subscribe_late_failed); + PASS(); +} + +/* === Test publish late succeed === */ + +const char *publish_late_context = "publish_late"; +int publish_late_failed = 0; + +void publish_late_fail_cb(unique_id id, void *user_context) { + CHECK(user_context == (void *) publish_late_context); + publish_late_failed = 1; +} + +void publish_late_done_cb(task_iid task_iid, void *user_context) { + /* This function should never be called. */ + CHECK(0); +} + +TEST publish_late_test(void) { + loop = event_loop_create(); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); + db_attach(db, loop); + task_instance *task = example_task_instance(); + task_log_publish(db, task, 0, 0, publish_late_done_cb, publish_late_fail_cb, + (void *) publish_late_context); + /* Install handler for terminating the event loop. */ + event_loop_add_timer(loop, 750, terminate_event_loop_cb, NULL); + /* First process timer events to make sure the timeout is processed before + * anything else. 
*/ + aeProcessEvents(loop, AE_TIME_EVENTS); + event_loop_run(loop); + db_disconnect(db); + event_loop_destroy(loop); + CHECK(publish_late_failed); + task_instance_free(task); + PASS(); +} + +SUITE(task_log_tests) { + RUN_TEST(subscribe_timeout_test); + RUN_TEST(publish_timeout_test); + RUN_TEST(subscribe_retry_test); + RUN_TEST(publish_retry_test); + RUN_TEST(subscribe_late_test); + RUN_TEST(publish_late_test); +} + +GREATEST_MAIN_DEFS(); + +int main(int argc, char **argv) { + GREATEST_MAIN_BEGIN(); + RUN_SUITE(task_log_tests); + GREATEST_MAIN_END(); +}