Skip to content
This repository was archived by the owner on Jan 12, 2026. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
CC = gcc
CFLAGS = -g -Wall --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L -fPIC -I. -Ithirdparty -Ithirdparty/ae -Wno-typedef-redefinition -Werror
CFLAGS = -g -Wall -Wno-typedef-redefinition --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L -fPIC -I. -Ithirdparty -Ithirdparty/ae
BUILD = build

all: hiredis $(BUILD)/libcommon.a

$(BUILD)/libcommon.a: event_loop.o common.o task.o io.o state/redis.o thirdparty/ae/ae.o
$(BUILD)/libcommon.a: event_loop.o common.o task.o io.o state/redis.o state/table.o state/object_table.o state/task_log.o thirdparty/ae/ae.o
ar rcs $@ $^

$(BUILD)/common_tests: test/common_tests.c $(BUILD)/libcommon.a
Expand All @@ -13,6 +13,12 @@ $(BUILD)/common_tests: test/common_tests.c $(BUILD)/libcommon.a
$(BUILD)/db_tests: hiredis test/db_tests.c $(BUILD)/libcommon.a
$(CC) -o $@ test/db_tests.c $(BUILD)/libcommon.a thirdparty/hiredis/libhiredis.a $(CFLAGS)

$(BUILD)/object_table_tests: hiredis test/object_table_tests.c $(BUILD)/libcommon.a
$(CC) -o $@ test/object_table_tests.c $(BUILD)/libcommon.a thirdparty/hiredis/libhiredis.a $(CFLAGS)

$(BUILD)/task_log_tests: hiredis test/task_log_tests.c $(BUILD)/libcommon.a
$(CC) -o $@ test/task_log_tests.c $(BUILD)/libcommon.a thirdparty/hiredis/libhiredis.a $(CFLAGS)

$(BUILD)/io_tests: test/io_tests.c $(BUILD)/libcommon.a
$(CC) -o $@ $^ $(CFLAGS)

Expand All @@ -32,9 +38,9 @@ redis:
hiredis:
git submodule update --init --recursive -- "thirdparty/hiredis" ; cd thirdparty/hiredis ; make

test: hiredis redis $(BUILD)/common_tests $(BUILD)/db_tests $(BUILD)/io_tests $(BUILD)/task_tests $(BUILD)/redis_tests FORCE
test: hiredis redis $(BUILD)/common_tests $(BUILD)/task_log_tests $(BUILD)/object_table_tests $(BUILD)/db_tests $(BUILD)/io_tests $(BUILD)/task_tests $(BUILD)/redis_tests FORCE
./thirdparty/redis-3.2.3/src/redis-server &
sleep 1s ; ./build/common_tests ; ./build/db_tests ; ./build/io_tests ; ./build/task_tests ; ./build/redis_tests
sleep 1s ; ./build/common_tests ; ./build/db_tests ; ./build/task_log_tests ; ./build/object_table_tests ; ./build/io_tests ; ./build/task_tests ; ./build/redis_tests

valgrind: test
valgrind --leak-check=full --error-exitcode=1 ./build/common_tests
Expand Down
7 changes: 7 additions & 0 deletions common.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <inttypes.h>

#define RAY_COMMON_DEBUG

#ifndef RAY_COMMON_DEBUG
#define LOG_DEBUG(M, ...)
Expand Down Expand Up @@ -36,6 +39,10 @@
} \
} while (0);

/** This macro indicates that this pointer owns the data it is pointing to
* and is responsible for freeing it. */
#define OWNER

#define UNIQUE_ID_SIZE 20

/* Cleanup method for running tests with the greatest library.
Expand Down
9 changes: 4 additions & 5 deletions event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,14 @@ void event_loop_remove_file(event_loop *loop, int fd) {
}

int64_t event_loop_add_timer(event_loop *loop,
int64_t milliseconds,
int64_t timeout,
event_loop_timer_handler handler,
void *context) {
return aeCreateTimeEvent(loop, milliseconds, handler, context, NULL);
return aeCreateTimeEvent(loop, timeout, handler, context, NULL);
}

void event_loop_remove_timer(event_loop *loop, timer_id timer_id) {
int err = aeDeleteTimeEvent(loop, timer_id);
CHECK(err == AE_OK); /* timer id found? */
int event_loop_remove_timer(event_loop *loop, int64_t id) {
return aeDeleteTimeEvent(loop, id);
}

void event_loop_run(event_loop *loop) {
Expand Down
25 changes: 19 additions & 6 deletions event_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,29 @@ void event_loop_add_file(event_loop *loop,
/* Remove a registered file event handler from the event loop. */
void event_loop_remove_file(event_loop *loop, int fd);

/* Register a handler that will be called after a time slice of
* "milliseconds" milliseconds. Can specify a context that will be passed
* as an argument to the handler. Return the id of the time event. */
/** Register a handler that will be called after a time slice of
* "timeout" milliseconds.
*
* @param loop The event loop.
* @param timeout The timeout in milliseconds.
* @param handler The handler for the timeout.
* @param context User context that can be passed in and will be passed in
* as an argument for the timer handler.
* @return The ID of the timer.
*/
int64_t event_loop_add_timer(event_loop *loop,
int64_t milliseconds,
int64_t timeout,
event_loop_timer_handler handler,
void *context);

/* Remove a registered time event handler from the event loop. */
void event_loop_remove_timer(event_loop *loop, timer_id timer_id);
/**
* Remove a registered time event handler from the event loop.
*
* @param loop The event loop.
* @param timer_id The ID of the timer to be removed.
* @return Returns 0 if the removal was successful.
*/
int event_loop_remove_timer(event_loop *loop, int64_t timer_id);

/* Run the event loop. */
void event_loop_run(event_loop *loop);
Expand Down
2 changes: 1 addition & 1 deletion state/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

#include "event_loop.h"

typedef struct db_handle_impl db_handle;
typedef struct db_handle db_handle;

/* Connect to the global system store at address and port. Returns
* a handle to the database, which must be freed with db_disconnect
Expand Down
62 changes: 62 additions & 0 deletions state/object_table.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#include "object_table.h"
#include "redis.h"

void object_table_lookup(db_handle *db_handle,
object_id object_id,
int retry_count,
uint64_t timeout,
object_table_lookup_done_cb done_cb,
table_fail_cb fail_cb,
void *user_context) {
retry_struct retry;

retry.count = retry_count;
retry.timeout = timeout;
retry.cb = redis_object_table_lookup;

init_table_callback(db_handle, object_id, NULL, &retry, done_cb, fail_cb,
user_context);
}

void object_table_add(db_handle *db_handle,
object_id object_id,
int retry_count,
uint64_t timeout,
object_table_done_cb done_cb,
table_fail_cb fail_cb,
void *user_context) {
retry_struct retry;

retry.count = retry_count;
retry.timeout = timeout;
retry.cb = redis_object_table_add;

init_table_callback(db_handle, object_id, NULL, &retry, done_cb, fail_cb,
user_context);
}

void object_table_subscribe(
db_handle *db_handle,
object_id object_id,
object_table_object_available_cb object_available_cb,
void *subscribe_context,
int retry_count,
uint64_t timeout,
object_table_done_cb done_cb,
table_fail_cb fail_cb,
void *user_context) {
object_table_subscribe_data *sub_data =
malloc(sizeof(object_table_subscribe_data));
utarray_push_back(db_handle->callback_freelist, &sub_data);
sub_data->object_available_cb = object_available_cb;
sub_data->subscribe_context = subscribe_context;

retry_struct retry;

retry.count = retry_count;
retry.timeout = timeout;
retry.cb = redis_object_table_subscribe;

init_table_callback(db_handle, object_id, sub_data, &retry, done_cb, fail_cb,
user_context);
}
144 changes: 129 additions & 15 deletions state/object_table.h
Original file line number Diff line number Diff line change
@@ -1,25 +1,139 @@
#ifndef OBJECT_TABLE_H
#define OBJECT_TABLE_H

#include "common.h"
#include "table.h"
#include "db.h"

/* The callback that is called when the result of a lookup
* in the object table comes back. The callback should free
* the manager_vector array, but NOT the strings they are pointing to. */
typedef void (*lookup_callback)(object_id object_id,
int manager_count,
const char *manager_vector[],
void *context);
/*
* ==== Lookup call and callback ====
*/

/* Register a new object with the directory. */
/* TODO(pcm): Retry, print for each attempt. */
void object_table_add(db_handle *db, object_id object_id);
/* Callback called when the lookup completes. The callback should free
* the manager_vector array, but NOT the strings they are pointing to.
*/
typedef void (*object_table_lookup_done_cb)(object_id object_id,
int manager_count,
OWNER const char *manager_vector[],
void *user_context);

/* Remove object from the directory. */
void object_table_remove(db_handle *db,
/**
* Return the list of nodes storing object_id in their plasma stores.
*
* @param db_handle Handle to object_table database.
* @param object_id ID of the object being looked up.
* @param retry_count Number of retries to the database before giving up.
* @param timeout Timout between retries (in milliseconds).
* @param done_callback Function to be called when database returns result.
* @param fail_callback Function to be called if we failed to contact
* database after retry_count retries.
* @param user_context Context passed by the caller.
* @return Void.
*/
void object_table_lookup(db_handle *db_handle,
object_id object_id,
const char *manager);
int retry_count,
uint64_t timeout,
object_table_lookup_done_cb done_cb,
table_fail_cb fail_cb,
void *user_context);

/*
* ==== Add object call and callback ====
*/

/* Callback called when the object add/remove operation completes. */
typedef void (*object_table_done_cb)(object_id object_id, void *user_context);

/* Look up entry from the directory */
void object_table_lookup(db_handle *db,
/**
* Add the plasma manager that created the db_handle to the
* list of plasma managers that have the object_id.
*
* @param db_handle: Handle to db.
* @param object_id: Object unique identifier.
* @param retry_count: Number of retries after giving up.
* @param done_cb: Callback to be called when lookup completes.
* @param timeout_cb: Callback to be called when lookup timeouts.
* @param user_context: User context to be passed in the callbacks.
* @return Void.
*/
void object_table_add(db_handle *db_handle,
object_id object_id,
int retry_count,
uint64_t timeout,
object_table_done_cb done_cb,
table_fail_cb fail_cb,
void *user_context);

/*
* ==== Remove object call and callback ====
*/

/**
* Object remove function.
*
* @param db_handle: Handle to db.
* @param object_id: Object unique identifier.
* @param retry_count: Number of retries after giving up.
* @param timeout: Timeout after which we retry the lookup.
* @param done_cb: Callback to be called when lookup completes.
* @param fail_cb: Callback to be called when lookup timeouts.
* @param user_context: User context to be passed in the callbacks.
* @return Void.
*/
/*
void object_table_remove(db_handle *db,
object_id object_id,
lookup_callback callback,
void *context);
int retry_count,
uint64_t timeout,
object_table_done_cb done_cb,
table_fail_cb fail_cb,
void *user_context);
*/

/*
* ==== Subscribe to be announced when new object available ====
*/

/* Callback called when object object_id is available. */
typedef void (*object_table_object_available_cb)(object_id object_id,
void *user_context);

/**
* Subcribing to new object available function.
*
* @param db_handle: Handle to db.
* @param object_id: Object unique identifier.
* @param object_available_cb: callback to be called when new object becomes
* available
* @param subscribe_context: caller context which will be passed back in the
* object_available_cb
* @param retry_count: Number of retries after giving up.
* @param timeout: Timeout after which we retry to install subscription.
* @param done_cb: Callback to be called when subscription is installed.
* @param fail_cb: Callback to be called when subscription installation fails.
* @param user_context: User context to be passed in the callbacks.
* @return Void.
*/

void object_table_subscribe(
db_handle *db,
object_id object_id,
object_table_object_available_cb object_available_cb,
void *subscribe_context,
int retry_count,
uint64_t timeout,
object_table_done_cb done_cb,
table_fail_cb fail_cb,
void *user_context);

/* Data that is needed to register new object available callbacks with the state
* database. */
typedef struct {
object_table_object_available_cb object_available_cb;
void *subscribe_context;
} object_table_subscribe_data;

#endif /* OBJECT_TABLE_H */
Loading