Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
73 commits
Select commit Hold shift + click to select a range
592ff64
init
aglinxinyuan Feb 9, 2026
b95465d
update
aglinxinyuan Feb 11, 2026
f83041a
Merge branch 'main' into xinyuan-loop-feb
aglinxinyuan Feb 11, 2026
d9d0cd9
update
aglinxinyuan Feb 11, 2026
144ae29
update
aglinxinyuan Feb 11, 2026
7b13fef
update
aglinxinyuan Feb 11, 2026
bd5ac3a
update
aglinxinyuan Feb 11, 2026
19be0c1
update
aglinxinyuan Feb 11, 2026
706884f
update
aglinxinyuan Feb 11, 2026
21e6a41
update
aglinxinyuan Feb 11, 2026
24da3e3
update
aglinxinyuan Feb 11, 2026
2ba0fa4
update
aglinxinyuan Feb 11, 2026
a05ffd1
update
aglinxinyuan Feb 11, 2026
44fc0e7
update
aglinxinyuan Feb 11, 2026
846aac2
update
aglinxinyuan Feb 12, 2026
6be7dc5
update
aglinxinyuan Feb 12, 2026
d8338d1
update
aglinxinyuan Feb 12, 2026
36e517e
fix
aglinxinyuan Feb 13, 2026
a4bfbdb
fix
aglinxinyuan Feb 13, 2026
393faac
Merge branch 'main' into xinyuan-loop-feb
aglinxinyuan Feb 14, 2026
cbb2fc7
Merge branch 'xiaozhen-sync-region-kill' into xinyuan-loop-feb
aglinxinyuan Feb 14, 2026
084f602
update
aglinxinyuan Feb 15, 2026
ae0d4ed
Merge branch 'xiaozhen-sync-region-kill' into xinyuan-loop-feb
aglinxinyuan Feb 15, 2026
55d5cec
update
aglinxinyuan Feb 15, 2026
b8faf93
update
aglinxinyuan Feb 15, 2026
a53506a
update
aglinxinyuan Feb 15, 2026
d44a664
update
aglinxinyuan Feb 15, 2026
1cd48fd
update
aglinxinyuan Feb 15, 2026
e35a332
update
aglinxinyuan Feb 16, 2026
4d18d1d
Merge branch 'xiaozhen-sync-region-kill' into xinyuan-loop-feb
aglinxinyuan Feb 17, 2026
30a8562
update
aglinxinyuan Feb 24, 2026
b717fb0
update
aglinxinyuan Feb 24, 2026
da8d6ed
Merge remote-tracking branch 'origin/xinyuan-loop-feb' into xinyuan-l…
aglinxinyuan Feb 24, 2026
04fe614
update
aglinxinyuan Feb 27, 2026
160bc6d
Merge branch 'xiaozhen-sync-region-kill' into xinyuan-loop-feb
aglinxinyuan Feb 27, 2026
bd27031
update
aglinxinyuan Feb 28, 2026
99e0f86
update
aglinxinyuan Feb 28, 2026
53ae08b
update
aglinxinyuan Feb 28, 2026
8c7d53c
update
aglinxinyuan Feb 28, 2026
92ab10f
update
aglinxinyuan Mar 1, 2026
bc58566
update
aglinxinyuan Mar 1, 2026
c655856
update
aglinxinyuan Mar 1, 2026
0970a53
update
aglinxinyuan Mar 1, 2026
2b78e2c
update
aglinxinyuan Mar 1, 2026
55f288f
update
aglinxinyuan Mar 1, 2026
2ccde1e
update
aglinxinyuan Mar 1, 2026
9b0d14d
update
aglinxinyuan Mar 1, 2026
3a2d0b9
update
aglinxinyuan Mar 1, 2026
0cfcf2f
update
aglinxinyuan Mar 1, 2026
00e49a5
update
aglinxinyuan Mar 1, 2026
f71dbec
update
aglinxinyuan Mar 1, 2026
565ee71
update
aglinxinyuan Mar 1, 2026
aa444a0
update
aglinxinyuan Mar 2, 2026
2e7c72a
update
aglinxinyuan Mar 2, 2026
f8ce99f
update
aglinxinyuan Mar 2, 2026
fe7e071
update
aglinxinyuan Mar 2, 2026
ba1b50f
Merge branch 'xiaozhen-sync-region-kill' into xinyuan-loop-feb
aglinxinyuan Mar 4, 2026
08679f0
update
aglinxinyuan Mar 4, 2026
b18d9db
update
aglinxinyuan Mar 4, 2026
dac211a
update
aglinxinyuan Mar 5, 2026
43f2ca6
update
aglinxinyuan Mar 5, 2026
3ada4c2
update
aglinxinyuan Mar 8, 2026
be43608
update
aglinxinyuan Mar 8, 2026
f09c9e3
update
aglinxinyuan Mar 8, 2026
64e2dda
Merge branch 'xiaozhen-sync-region-kill' into xinyuan-loop-feb
aglinxinyuan Mar 8, 2026
fbd5403
update
aglinxinyuan Mar 9, 2026
1c74e3b
update
aglinxinyuan Mar 9, 2026
e3a4dad
init
aglinxinyuan Mar 9, 2026
2d86c45
fix
aglinxinyuan Mar 9, 2026
0c72e25
Merge branch 'xiaozhen-sync-region-kill' into xinyuan-loop-feb
aglinxinyuan Mar 15, 2026
b203774
update
aglinxinyuan Mar 15, 2026
4977837
update
aglinxinyuan Mar 17, 2026
b9092d0
update
aglinxinyuan Mar 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ message ControlRequest {
PortCompletedRequest portCompletedRequest = 9;
WorkerStateUpdatedRequest workerStateUpdatedRequest = 10;
LinkWorkersRequest linkWorkersRequest = 11;
IterationCompletedRequest iterationCompletedRequest = 12;

// request for worker
AddInputChannelRequest addInputChannelRequest = 50;
Expand All @@ -58,6 +59,7 @@ message ControlRequest {
PrepareCheckpointRequest prepareCheckpointRequest = 57;
QueryStatisticsRequest queryStatisticsRequest = 58;


// request for testing
Ping ping = 100;
Pong pong = 101;
Expand Down Expand Up @@ -278,4 +280,8 @@ enum StatisticsUpdateTarget {
// Request asking workers to report runtime statistics.
message QueryStatisticsRequest{
  // Workers whose statistics are requested; presumably an empty list means
  // "all workers" (proto3 repeated default) -- confirm against the handler.
  repeated core.ActorVirtualIdentity filterByWorkers = 1;
  // Consumer/destination of the resulting statistics update.
  StatisticsUpdateTarget updateTarget = 2;
}

// Sent to the controller (rpc IterationCompleted) when a loop-end worker
// determines that one iteration of a loop region has finished.
// NOTE(review): field name breaks the proto3 lower_snake_case convention
// (should be loop_start_id), but renaming changes the generated accessors
// used elsewhere -- confirm before touching it.
message IterationCompletedRequest{
  // Identity of the loop-start operator that owns the completed iteration.
  // scalapb no_box: generated Scala field is unwrapped (no Option).
  core.OperatorIdentity LoopStartId = 1 [(scalapb.field).no_box = true];
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ service ControllerService {
rpc PauseWorkflow(EmptyRequest) returns (EmptyReturn);
rpc WorkerStateUpdated(WorkerStateUpdatedRequest) returns (EmptyReturn);
rpc WorkerExecutionCompleted(EmptyRequest) returns (EmptyReturn);
rpc IterationCompleted(IterationCompletedRequest) returns (EmptyReturn);
rpc LinkWorkers(LinkWorkersRequest) returns (EmptyReturn);
rpc ControllerInitiateQueryStatistics(QueryStatisticsRequest) returns (EmptyReturn);
rpc RetryWorkflow(RetryWorkflowRequest) returns (EmptyReturn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,8 @@ def _process_data(self, table: Table) -> Iterator[Tuple]:
yield Tuple(
{name: field_accessor for name in table.column_names}, schema=schema
)

def get_input_state_uri(self):
    """Derive the state-store URI from the first input-port reader.

    Takes the first materialization-reader runnable registered for any
    input port and rewrites its result URI into the sibling state URI
    by substituting "/result" with "/state".
    """
    readers_by_port = self._input_port_mat_reader_runnables
    first_reader = next(iter(readers_by_port.values()))[0]
    return first_reader.uri.replace("/result", "/state")
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ def __init__(self, worker_id: str):
PortIdentity, typing.Tuple[Queue, PortStorageWriter, Thread]
] = dict()

self._storage_uris: typing.Dict[PortIdentity, str] = dict()

def is_missing_output_ports(self):
"""
This method is only used for ensuring correct region execution.
Expand Down Expand Up @@ -126,6 +128,7 @@ def set_up_port_storage_writer(self, port_id: PortIdentity, storage_uri: str):
Create a separate thread for saving output tuples of a port
to storage in batch.
"""
self._storage_uris[port_id] = storage_uri
document, _ = DocumentFactory.open_document(storage_uri)
buffered_item_writer = document.writer(str(get_worker_index(self.worker_id)))
writer_queue = Queue()
Expand Down Expand Up @@ -171,6 +174,21 @@ def save_tuple_to_storage_if_needed(self, tuple_: Tuple, port_id=None) -> None:
PortStorageWriterElement(data_tuple=tuple_)
)

def save_state_to_storage_if_needed(self, state: State, port_id=None) -> None:
    """Persist ``state`` as a single tuple into port state storage.

    For each selected output port, the result-storage URI registered in
    ``set_up_port_storage_writer`` is rewritten ("/result" -> "/state"),
    a state document is created there, and the state is written as one
    Tuple under a partition named after this worker's index.

    :param state: the State to persist. Note vars(state) includes the
        bookkeeping attributes ``schema`` and ``passToAllDownstream`` --
        presumably intended, unlike State.to_dict; confirm with readers
        of the state store.
    :param port_id: target a single port; None writes to every port that
        has storage. An unknown port_id is silently ignored (the method
        is "if needed" best-effort).
    """
    if port_id is None:
        uris = self._storage_uris.values()
    elif port_id in self._storage_uris:
        uris = [self._storage_uris[port_id]]
    else:
        return

    # Loop-invariant: the writer partition name only depends on the worker.
    writer_name = str(get_worker_index(self.worker_id))
    for uri in uris:
        writer = DocumentFactory.create_document(
            uri.replace("/result", "/state"), state.schema
        ).writer(writer_name)
        try:
            writer.put_one(Tuple(vars(state)))
        finally:
            # Fix: previously the writer leaked if put_one raised.
            writer.close()

def close_port_storage_writers(self) -> None:
"""
Flush the buffers of port storage writers and wait for all the
Expand Down
40 changes: 40 additions & 0 deletions amber/src/main/python/core/models/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,3 +293,43 @@ def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]
time, or None.
"""
yield


class LoopStartOperator(TableOperator):
    """Base class for executors that open a loop region.

    Subclasses implement process_table; when the operator finishes, the
    table data accumulated by TableOperator is pickled into the outgoing
    State so the loop can be re-seeded on the next iteration.
    """

    def open(self) -> None:
        pass

    @abstractmethod
    def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]]:
        yield

    @overrides.final
    def produce_state_on_finish(self, port: int) -> State:
        import pickle

        # Reaches into TableOperator's private buffer via its name-mangled
        # attribute to snapshot all tuples seen on this port.
        buffered_table = Table(self._TableOperator__table_data[port])
        self.state["table"] = pickle.dumps(buffered_table)
        return State.from_dict(self.state)

    def close(self) -> None:
        pass


class LoopEndOperator(TableOperator):
    """Base class for executors that close a loop region.

    Tables are forwarded unchanged; subclasses supply ``condition`` to
    decide whether the loop should run another iteration (consulted by
    the main loop on worker completion).
    """

    def open(self) -> None:
        pass

    @overrides.final
    def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]]:
        # Pure pass-through: every input table is emitted as-is.
        yield table

    def close(self) -> None:
        pass

    @abstractmethod
    def condition(self) -> None:
        # Subclasses return whether another iteration is required.
        pass

    def loop_start_id(self) -> str:
        # NOTE(review): destructive getter -- it removes the "table" and
        # "output" bookkeeping entries from self.state as a side effect
        # before returning the loop-start operator id, so it can only be
        # called once. Confirm callers rely on this (main_loop.complete
        # appears to). Raises KeyError if the entries are absent.
        del self.state["table"]
        del self.state["output"]
        return self.state["LoopStartId"]
22 changes: 21 additions & 1 deletion amber/src/main/python/core/models/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,28 @@
@dataclass
class State:
def __init__(
    self, table: Optional[Table] = None, pass_to_all_downstream: bool = True
):
    """Create a State, optionally seeded from the first row of ``table``.

    :param table: if given, the first row's columns become attributes of
        this State and its schema is adopted.
    :param pass_to_all_downstream: whether the state is broadcast to all
        downstream operators (default True).

    Fix: the pasted source carried both the old (``= False``) and new
    (``= True``) signature lines from the diff; only the current
    ``= True`` signature is kept.
    """
    self.schema = Schema()
    self.passToAllDownstream = pass_to_all_downstream
    if table is not None:
        # Bulk-seed attributes directly into __dict__ from row 0.
        self.__dict__.update(table.to_pandas().iloc[0].to_dict())
        self.schema = Schema(table.schema)

@classmethod
def from_tuple(cls, tuple, schema):
    """Build a State whose fields are copied from a Tuple.

    Note: the parameter name ``tuple`` shadows the builtin; it is kept
    unchanged because callers may pass it by keyword.
    """
    state = cls()
    # Bulk-copy the tuple's fields straight into __dict__, bypassing
    # __setattr__/add -- same seeding style as __init__ uses for tables.
    state.__dict__.update(tuple.as_dict())
    state.schema = schema
    return state

@classmethod
def from_dict(cls, dictionary):
    """Build a State by add()-ing every key/value pair of ``dictionary``."""
    state = cls()
    for key, value in dictionary.items():
        state.add(key, value)
    return state

def add(
self, key: str, value: any, value_type: Optional[AttributeType] = None
) -> None:
Expand All @@ -53,6 +67,12 @@ def to_table(self) -> Table:
schema=self.schema.as_arrow_schema(),
)

def to_dict(self) -> dict:
    """Return this State's user fields as a plain dict.

    The bookkeeping attributes ``passToAllDownstream`` and ``schema`` are
    excluded from the result.

    Fix: the previous implementation aliased ``self.__dict__`` and
    ``del``-ed the bookkeeping keys from it, destructively stripping
    ``schema``/``passToAllDownstream`` from the live instance (a second
    call raised KeyError and later ``to_table`` calls broke). The dict is
    now copied first, leaving the instance intact.
    """
    dictionary = dict(self.__dict__)
    dictionary.pop("passToAllDownstream", None)
    dictionary.pop("schema", None)
    return dictionary

def __setattr__(self, key: str, value: any) -> None:
    # Route every attribute assignment through add() -- presumably so the
    # State's schema stays in sync with its fields; confirm in add(),
    # whose body is not visible here.
    self.add(key, value)

Expand Down
1 change: 1 addition & 0 deletions amber/src/main/python/core/runnables/data_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def process_state(self, state: State) -> None:
self._context.worker_id,
self._context.console_message_manager.print_buf,
):
self._switch_context()
self._set_output_state(executor.process_state(state, port_id))

except Exception as err:
Expand Down
52 changes: 42 additions & 10 deletions amber/src/main/python/core/runnables/main_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import threading
import time
import typing
import uuid

from loguru import logger
from overrides import overrides
from pampy import match
Expand All @@ -38,8 +40,10 @@
ECMElement,
InternalQueueElement,
)
from core.models.operator import LoopEndOperator, LoopStartOperator
from core.models.state import State
from core.runnables.data_processor import DataProcessor
from core.storage.document_factory import DocumentFactory
from core.util import StoppableQueueBlockingRunnable, get_one_of
from core.util.console_message.timestamp import current_time_in_local_timezone
from core.util.customized_queue.queue_base import QueueElement
Expand All @@ -48,6 +52,7 @@
PortIdentity,
ChannelIdentity,
EmbeddedControlMessageIdentity,
OperatorIdentity,
)
from proto.org.apache.texera.amber.engine.architecture.rpc import (
ConsoleMessage,
Expand All @@ -61,6 +66,7 @@
EmbeddedControlMessage,
AsyncRpcContext,
ControlRequest,
IterationCompletedRequest,
)
from proto.org.apache.texera.amber.engine.architecture.worker import (
WorkerState,
Expand Down Expand Up @@ -94,12 +100,26 @@ def complete(self) -> None:
"""
# flush the buffered console prints
self._check_and_report_console_messages(force_flush=True)
self.context.executor_manager.executor.close()
controller_interface = self._async_rpc_client.controller_stub()
executor = self.context.executor_manager.executor
if isinstance(executor, LoopEndOperator) and executor.condition():
controller_interface.iteration_completed(
IterationCompletedRequest(OperatorIdentity(executor.loop_start_id()))
)
uri = executor.state["LoopStartStateURI"]
del executor.state["LoopStartStateURI"]
del executor.state["LoopStartId"]
state = State.from_dict(executor.state)
writer = DocumentFactory.create_document(uri, state.schema).writer(
str(uuid.uuid4())
)
writer.put_one(Tuple(vars(state)))
writer.close()
executor.close()
# stop the data processing thread
self.data_processor.stop()
self.context.state_manager.transit_to(WorkerState.COMPLETED)
self.context.statistics_manager.update_total_execution_time(time.time_ns())
controller_interface = self._async_rpc_client.controller_stub()
controller_interface.worker_execution_completed(EmptyRequest())
self.context.close()

Expand Down Expand Up @@ -188,14 +208,26 @@ def process_input_state(self) -> None:
output_state = self.context.state_processing_manager.get_output_state()
self._switch_context()
if output_state is not None:
for to, batch in self.context.output_manager.emit_state(output_state):
self._output_queue.put(
DataElement(
tag=ChannelIdentity(
ActorVirtualIdentity(self.context.worker_id), to, False
),
payload=batch,
if isinstance(self.context.executor_manager.executor, LoopStartOperator):
output_state.add(
"LoopStartId",
self.context.worker_id.split("-", 1)[1].rsplit("-main-0", 1)[0],
)
output_state.add(
"LoopStartStateURI",
self.context.input_manager.get_input_state_uri(),
)
for to, batch in self.context.output_manager.emit_state(output_state):
self._output_queue.put(
DataElement(
tag=ChannelIdentity(
ActorVirtualIdentity(self.context.worker_id), to, False
),
payload=batch,
)
)
self.context.output_manager.save_state_to_storage_if_needed(
output_state
)

def process_tuple_with_udf(self) -> Iterator[Optional[Tuple]]:
Expand Down Expand Up @@ -329,7 +361,7 @@ def _process_ecm(self, ecm_element: ECMElement):

if ecm.ecm_type != EmbeddedControlMessageType.NO_ALIGNMENT:
self.context.pause_manager.resume(PauseType.ECM_PAUSE)

self._switch_context()
if self.context.tuple_processing_manager.current_internal_marker:
{
StartChannel: self._process_start_channel,
Expand Down
Loading
Loading