diff --git a/src/main/java/teetime/framework/AbstractPipe.java b/src/main/java/teetime/framework/AbstractPipe.java index 24125539..0ca1261e 100644 --- a/src/main/java/teetime/framework/AbstractPipe.java +++ b/src/main/java/teetime/framework/AbstractPipe.java @@ -36,20 +36,20 @@ public abstract class AbstractPipe implements IPipe { private final InputPort targetPort; // FIXME each scheduler should set a default pipe scheduler by its own - private PipeScheduler scheduler = new PipeScheduler() { + private PipeScheduler scheduler = new PipeScheduler() { @Override - public void onElementAdded(final AbstractSynchedPipe pipe) { + public void onElementAdded(final AbstractSynchedPipe pipe) { // do nothing } @Override - public void onElementAdded(final AbstractUnsynchedPipe pipe) { + public void onElementAdded(final AbstractUnsynchedPipe pipe) { // do nothing } @Override - public void onElementNotAdded(final AbstractSynchedPipe pipe) { + public void onElementNotAdded(final AbstractSynchedPipe pipe) { // do nothing } }; diff --git a/src/main/java/teetime/framework/AbstractPort.java b/src/main/java/teetime/framework/AbstractPort.java index 2bfa1854..413aa3ea 100644 --- a/src/main/java/teetime/framework/AbstractPort.java +++ b/src/main/java/teetime/framework/AbstractPort.java @@ -26,7 +26,7 @@ */ public class AbstractPort { - protected static final Object TERMINATE_ELEMENT = new Object(); + protected final T TERMINATE_ELEMENT = null; protected IPipe pipe; /** diff --git a/src/main/java/teetime/framework/DivideAndConquerRecursivePipe.java b/src/main/java/teetime/framework/DivideAndConquerRecursivePipe.java index 6533c8ef..a62ac444 100644 --- a/src/main/java/teetime/framework/DivideAndConquerRecursivePipe.java +++ b/src/main/java/teetime/framework/DivideAndConquerRecursivePipe.java @@ -111,12 +111,12 @@ public void close() { } @Override - public boolean addNonBlocking(final Object element) { + public boolean addNonBlocking(final P element) { return outputPipe.addNonBlocking(element); } @Override - public void add(final Object element) { + public void add(final P element) { outputPipe.add(element); } @@ -131,7 +131,7 @@ public int size() { } @Override - public Object removeLast() { + public P removeLast() { return outputPipe.removeLast(); } diff --git a/src/main/java/teetime/framework/InstantiationPipe.java b/src/main/java/teetime/framework/InstantiationPipe.java index f44a1693..a5bc2308 100644 --- a/src/main/java/teetime/framework/InstantiationPipe.java +++ b/src/main/java/teetime/framework/InstantiationPipe.java @@ -72,7 +72,7 @@ public int size() { } @Override - public Object removeLast() { + public T removeLast() { throw new IllegalStateException(ERROR_MESSAGE); } diff --git a/src/main/java/teetime/framework/pipe/DummyPipe.java b/src/main/java/teetime/framework/pipe/DummyPipe.java index 684141fd..38a3196e 100644 --- a/src/main/java/teetime/framework/pipe/DummyPipe.java +++ b/src/main/java/teetime/framework/pipe/DummyPipe.java @@ -46,7 +46,7 @@ public boolean addNonBlocking(final Object element) { } @Override - public Object removeLast() { + public T removeLast() { return null; } @@ -61,12 +61,12 @@ public int size() { } @Override - public OutputPort getSourcePort() { + public OutputPort getSourcePort() { return null; } @Override - public InputPort getTargetPort() { + public InputPort getTargetPort() { return null; } diff --git a/src/main/java/teetime/framework/scheduling/globaltaskpool/BoundedMpMcSynchedPipe.java b/src/main/java/teetime/framework/scheduling/globaltaskpool/BoundedMpMcSynchedPipe.java index 2fc68d98..b4c09ebd 100644 --- a/src/main/java/teetime/framework/scheduling/globaltaskpool/BoundedMpMcSynchedPipe.java +++ b/src/main/java/teetime/framework/scheduling/globaltaskpool/BoundedMpMcSynchedPipe.java @@ -33,17 +33,17 @@ */ class BoundedMpMcSynchedPipe extends AbstractSynchedPipe implements IMonitorablePipe { - private final MpmcArrayQueue queue; + private final MpmcArrayQueue queue; private transient long lastProducerIndex, lastConsumerIndex; public BoundedMpMcSynchedPipe(final OutputPort sourcePort, final InputPort targetPort, final int requestedCapacity) { super(sourcePort, targetPort); - this.queue = new MpmcArrayQueue(requestedCapacity); + this.queue = new MpmcArrayQueue(requestedCapacity); } @Override - public void add(final Object element) { + public void add(final T element) { while (!this.queue.offer(element)) { getScheduler().onElementNotAdded(this); } @@ -51,7 +51,7 @@ public void add(final Object element) { } @Override - public boolean addNonBlocking(final Object element) { + public boolean addNonBlocking(final T element) { return this.queue.offer(element); } @@ -66,7 +66,7 @@ public int size() { } @Override - public Object removeLast() { + public T removeLast() { return this.queue.poll(); } diff --git a/src/main/java/teetime/framework/scheduling/globaltaskpool/GlobalTaskPoolScheduling.java b/src/main/java/teetime/framework/scheduling/globaltaskpool/GlobalTaskPoolScheduling.java index 03f6416e..d95a3f5a 100644 --- a/src/main/java/teetime/framework/scheduling/globaltaskpool/GlobalTaskPoolScheduling.java +++ b/src/main/java/teetime/framework/scheduling/globaltaskpool/GlobalTaskPoolScheduling.java @@ -45,7 +45,7 @@ * @since 3.0 * */ -public class GlobalTaskPoolScheduling implements TeeTimeScheduler, PipeScheduler, UncaughtExceptionHandler { +public class GlobalTaskPoolScheduling implements TeeTimeScheduler, PipeScheduler, UncaughtExceptionHandler { private static final Logger LOGGER = LoggerFactory.getLogger(GlobalTaskPoolScheduling.class); private static final StageFacade STAGE_FACADE = StageFacade.INSTANCE; @@ -320,13 +320,13 @@ private void setScheduler(final AbstractStage stage) { } @Override - public void onElementAdded(final AbstractUnsynchedPipe pipe) { + public void onElementAdded(final AbstractUnsynchedPipe pipe) { String message = String.format("This scheduler does not allow unsynched pipes: %s", pipe); throw new IllegalStateException(message); } @Override - public void onElementAdded(final AbstractSynchedPipe pipe) { + public void onElementAdded(final AbstractSynchedPipe pipe) { BoundedMpMcSynchedPipe castedPipe = (BoundedMpMcSynchedPipe) pipe; long numPushes = castedPipe.getNumPushesSinceAppStart(); long lastNumPushes = castedPipe.getLastProducerIndex(); @@ -345,7 +345,7 @@ public void onElementAdded(final AbstractSynchedPipe pipe) { } @Override - public void onElementNotAdded(final AbstractSynchedPipe pipe) { + public void onElementNotAdded(final AbstractSynchedPipe pipe) { if (!taskPool.scheduleStage(pipe.getCachedTargetStage())) { throw new IllegalStateException(String.format("onElementNotAdded: scheduling target stage failed for %s", pipe.getCachedTargetStage())); } diff --git a/src/main/java/teetime/framework/scheduling/pushpullmodel/A3PipeInstantiation.java b/src/main/java/teetime/framework/scheduling/pushpullmodel/A3PipeInstantiation.java index 8bf18550..44df83a1 100644 --- a/src/main/java/teetime/framework/scheduling/pushpullmodel/A3PipeInstantiation.java +++ b/src/main/java/teetime/framework/scheduling/pushpullmodel/A3PipeInstantiation.java @@ -32,7 +32,7 @@ /** * Automatically instantiates the correct pipes */ -class A3PipeInstantiation implements ITraverserVisitor, PipeScheduler { +class A3PipeInstantiation implements ITraverserVisitor, PipeScheduler { private static final Logger LOGGER = LoggerFactory.getLogger(A3PipeInstantiation.class); @@ -89,17 +89,17 @@ private void instantiatePipe(final IPipe pipe) { } @Override - public void onElementAdded(final AbstractUnsynchedPipe pipe) { + public void onElementAdded(final AbstractUnsynchedPipe pipe) { pipe.getCachedTargetStage().executeByFramework(); } @Override - public void onElementAdded(final AbstractSynchedPipe pipe) { + public void onElementAdded(final AbstractSynchedPipe pipe) { // do nothing } @Override - public void onElementNotAdded(final AbstractSynchedPipe pipe) { + public void onElementNotAdded(final AbstractSynchedPipe pipe) { // do nothing } diff --git a/src/test/java/teetime/stage/taskfarm/monitoring/extraction/ExtractionTestPipe.java b/src/test/java/teetime/stage/taskfarm/monitoring/extraction/ExtractionTestPipe.java index 129baf21..dad442d6 100644 --- a/src/test/java/teetime/stage/taskfarm/monitoring/extraction/ExtractionTestPipe.java +++ b/src/test/java/teetime/stage/taskfarm/monitoring/extraction/ExtractionTestPipe.java @@ -142,7 +142,7 @@ public boolean isEmpty() { } @Override - public Object removeLast() { + public T removeLast() { return null; }