Skip to content

Commit

Permalink
feat: fix ExecutorCompletionService workaround in ChunkProcessingPipe…
Browse files Browse the repository at this point in the history
…line (#5128)
  • Loading branch information
BenjaminAmos authored Aug 13, 2023
1 parent e33518e commit 36591e1
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 93 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright 2023 The Terasology Foundation
// SPDX-License-Identifier: Apache-2.0

package org.terasology.engine.world.chunks.pipeline;

import org.joml.Vector3i;
import org.joml.Vector3ic;
import org.terasology.engine.world.chunks.Chunk;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* A specialised alternative to {@link java.util.concurrent.ExecutorCompletionService},
* used for submitting chunk tasks and queuing their results.
*
* Whilst this class adheres to the {@link CompletionService} interface, use of the class's
* {@link #submit(Callable, Vector3ic)} overload is preferred over those inherited from the interface.
*/
public class ChunkExecutorCompletionService implements CompletionService<Chunk> {
private static final Vector3ic EMPTY_VECTOR3I = new Vector3i();
private final ThreadPoolExecutor threadPoolExecutor;
private final BlockingQueue<Future<Chunk>> completionQueue;

private final class ChunkFutureWithCompletion extends PositionFuture<Chunk> {
ChunkFutureWithCompletion(Callable callable, Vector3ic position) {
super(callable, position);
}

ChunkFutureWithCompletion(Runnable runnable, Chunk result, Vector3ic position) {
super(runnable, result, position);
}

@Override
protected void done() {
super.done();
try {
completionQueue.put(this);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

Check warning on line 47 in engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkExecutorCompletionService.java

View check run for this annotation

Terasology Jenkins.io / SpotBugs

THROWS_METHOD_THROWS_RUNTIMEEXCEPTION

LOW: Method intentionally throws RuntimeException.
Raw output
no message found
}

public ChunkExecutorCompletionService(ThreadPoolExecutor threadPoolExecutor, BlockingQueue<Future<Chunk>> completionQueue) {

Check warning on line 50 in engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkExecutorCompletionService.java

View check run for this annotation

Terasology Jenkins.io / CheckStyle

InnerTypeLastCheck

LOW: Init blocks, constructors, fields and methods should be before inner types.
Raw output
<p>Since Checkstyle 5.2</p><p> Check nested (inner) classes/interfaces are declared at the bottom of the class after all method and field declarations. </p>
this.threadPoolExecutor = threadPoolExecutor;

Check warning on line 51 in engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkExecutorCompletionService.java

View check run for this annotation

Terasology Jenkins.io / SpotBugs

EI_EXPOSE_REP2

LOW: new org.terasology.engine.world.chunks.pipeline.ChunkExecutorCompletionService(ThreadPoolExecutor, BlockingQueue) may expose internal representation by storing an externally mutable object into ChunkExecutorCompletionService.threadPoolExecutor
Raw output
<p> This code stores a reference to an externally mutable object into the internal representation of the object.&nbsp; If instances are accessed by untrusted code, and unchecked changes to the mutable object would compromise security or other important properties, you will need to do something different. Storing a copy of the object is better approach in many situations.</p>
this.completionQueue = completionQueue;

Check warning on line 52 in engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkExecutorCompletionService.java

View check run for this annotation

Terasology Jenkins.io / SpotBugs

EI_EXPOSE_REP2

LOW: new org.terasology.engine.world.chunks.pipeline.ChunkExecutorCompletionService(ThreadPoolExecutor, BlockingQueue) may expose internal representation by storing an externally mutable object into ChunkExecutorCompletionService.completionQueue
Raw output
<p> This code stores a reference to an externally mutable object into the internal representation of the object.&nbsp; If instances are accessed by untrusted code, and unchecked changes to the mutable object would compromise security or other important properties, you will need to do something different. Storing a copy of the object is better approach in many situations.</p>
}

/**
* Submits a task to be executed.
* @param callable the task to submit
*
* @deprecated Use {@link #submit(Callable, Vector3ic)} instead
*/
@Override

Check warning on line 61 in engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkExecutorCompletionService.java

View check run for this annotation

Terasology Jenkins.io / CheckStyle

InnerTypeLastCheck

LOW: Init blocks, constructors, fields and methods should be before inner types.
Raw output
<p>Since Checkstyle 5.2</p><p> Check nested (inner) classes/interfaces are declared at the bottom of the class after all method and field declarations. </p>
@Deprecated
public Future<Chunk> submit(Callable<Chunk> callable) {
RunnableFuture<Chunk> task = new ChunkFutureWithCompletion(callable, EMPTY_VECTOR3I);
threadPoolExecutor.execute(task);
return task;
}

/**
* Submits a chunk task to be executed.
* @param callable the chunk task to execute.
* @param position the position of the chunk.
* @return the submitted task.
*/
public Future<Chunk> submit(Callable<Chunk> callable, Vector3ic position) {

Check warning on line 75 in engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkExecutorCompletionService.java

View check run for this annotation

Terasology Jenkins.io / CheckStyle

InnerTypeLastCheck

LOW: Init blocks, constructors, fields and methods should be before inner types.
Raw output
<p>Since Checkstyle 5.2</p><p> Check nested (inner) classes/interfaces are declared at the bottom of the class after all method and field declarations. </p>
RunnableFuture<Chunk> task = new ChunkFutureWithCompletion(callable, position);
threadPoolExecutor.execute(task);
return task;
}

/**
* Submits a task to be executed.
* @param runnable the task to run.
* @param value the value to return upon task completion.
*
* @deprecated Use {@link #submit(Callable, Vector3ic)} instead
*/
@Override

Check warning on line 88 in engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkExecutorCompletionService.java

View check run for this annotation

Terasology Jenkins.io / CheckStyle

InnerTypeLastCheck

LOW: Init blocks, constructors, fields and methods should be before inner types.
Raw output
<p>Since Checkstyle 5.2</p><p> Check nested (inner) classes/interfaces are declared at the bottom of the class after all method and field declarations. </p>
@Deprecated
public Future<Chunk> submit(Runnable runnable, Chunk value) {
RunnableFuture<Chunk> task = new ChunkFutureWithCompletion(runnable, value, EMPTY_VECTOR3I);
threadPoolExecutor.execute(task);
return task;
}

/**
* Retrieves a completed task from the queue.
* @return a completed task.
* @throws InterruptedException if interrupted whilst waiting on the queue.
*/
@Override

Check warning on line 101 in engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkExecutorCompletionService.java

View check run for this annotation

Terasology Jenkins.io / CheckStyle

InnerTypeLastCheck

LOW: Init blocks, constructors, fields and methods should be before inner types.
Raw output
<p>Since Checkstyle 5.2</p><p> Check nested (inner) classes/interfaces are declared at the bottom of the class after all method and field declarations. </p>
public Future<Chunk> take() throws InterruptedException {
return completionQueue.take();
}

/**
* Retrieves a completed task from the queue if not empty.
* @return a completed task, or null if there are no tasks in the queue.
*/
@Override

Check warning on line 110 in engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkExecutorCompletionService.java

View check run for this annotation

Terasology Jenkins.io / CheckStyle

InnerTypeLastCheck

LOW: Init blocks, constructors, fields and methods should be before inner types.
Raw output
<p>Since Checkstyle 5.2</p><p> Check nested (inner) classes/interfaces are declared at the bottom of the class after all method and field declarations. </p>
public Future<Chunk> poll() {
return completionQueue.poll();
}

/**
* Retrieves a completed task from the queue if not empty.
* @param l the timeout duration before returning null.
* @param timeUnit the time units of the timeout duration.
*
* @return a completed task, or null if there are no tasks in the queue.
*/
@Override

Check warning on line 122 in engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkExecutorCompletionService.java

View check run for this annotation

Terasology Jenkins.io / CheckStyle

InnerTypeLastCheck

LOW: Init blocks, constructors, fields and methods should be before inner types.
Raw output
<p>Since Checkstyle 5.2</p><p> Check nested (inner) classes/interfaces are declared at the bottom of the class after all method and field declarations. </p>
public Future<Chunk> poll(long l, TimeUnit timeUnit) throws InterruptedException {
return completionQueue.poll(l, timeUnit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,17 @@
import org.slf4j.LoggerFactory;
import org.terasology.engine.monitoring.ThreadActivity;
import org.terasology.engine.monitoring.ThreadMonitor;
import org.terasology.engine.utilities.ReflectionUtil;
import org.terasology.engine.world.chunks.Chunk;
import org.terasology.engine.world.chunks.pipeline.stages.ChunkTask;
import org.terasology.engine.world.chunks.pipeline.stages.ChunkTaskProvider;

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
Expand All @@ -50,7 +45,7 @@ public class ChunkProcessingPipeline {

private final List<ChunkTaskProvider> stages = Lists.newArrayList();
private final Thread reactor;
private final CompletionService<Chunk> chunkProcessor;
private final ChunkExecutorCompletionService chunkProcessor;
private final ThreadPoolExecutor executor;
private final Function<Vector3ic, Chunk> chunkProvider;
private final Map<Vector3ic, ChunkProcessingInfo> chunkProcessingInfoMap = Maps.newConcurrentMap();
Expand All @@ -66,36 +61,18 @@ public ChunkProcessingPipeline(Function<Vector3ic, Chunk> chunkProvider, Compara
NUM_TASK_THREADS,
NUM_TASK_THREADS, 0L,
TimeUnit.MILLISECONDS,
new PriorityBlockingQueue(800, unwrappingComporator(comparable)),
new PriorityBlockingQueue(800, comparable),
this::threadFactory,
this::rejectQueueHandler) {
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
return new PositionFuture<>(newTaskFor, ((PositionalCallable) callable).getPosition());
}
};
this::rejectQueueHandler);
logger.debug("allocated {} threads", NUM_TASK_THREADS);
chunkProcessor = new ExecutorCompletionService<>(executor,
chunkProcessor = new ChunkExecutorCompletionService(executor,
new PriorityBlockingQueue<>(800, comparable));
reactor = new Thread(this::chunkTaskHandler);
reactor.setDaemon(true);
reactor.setName("Chunk-Processing-Reactor");
reactor.start();
}

/**
* BlackMagic method: {@link ExecutorCompletionService} wraps task with QueueingFuture (private access)
* there takes wrapped task for comparing in {@link ThreadPoolExecutor}
*/
private Comparator unwrappingComporator(Comparator<Future<Chunk>> comparable) {
return (o1, o2) -> {
Object unwrapped1 = ReflectionUtil.readField(o1, "task");
Object unwrapped2 = ReflectionUtil.readField(o2, "task");
return comparable.compare((Future<Chunk>) unwrapped1, (Future<Chunk>) unwrapped2);
};
}

/**
* Reactor thread. Handles all ChunkTask dependency logic and running.
*/
Expand Down Expand Up @@ -190,11 +167,11 @@ private Chunk getChunkBy(ChunkTaskProvider requiredStage, Vector3ic position) {
}

private Future<Chunk> runTask(ChunkTask task, List<Chunk> chunks) {
return chunkProcessor.submit(new PositionalCallable(() -> {
return chunkProcessor.submit(() -> {
try (ThreadActivity ignored = ThreadMonitor.startThreadActivity(task.getName())) {

Check warning on line 171 in engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkProcessingPipeline.java

View check run for this annotation

Terasology Jenkins.io / SpotBugs

THROWS_METHOD_THROWS_CLAUSE_BASIC_EXCEPTION

LOW: Method lists Exception in its throws clause.
Raw output
no message found
return task.apply(chunks);
}
}, task.getPosition()));
}, task.getPosition());
}

private Thread threadFactory(Runnable runnable) {
Expand Down Expand Up @@ -236,8 +213,7 @@ public ListenableFuture<Chunk> invokeGeneratorTask(Vector3ic position, Supplier<
SettableFuture<Chunk> exitFuture = SettableFuture.create();
chunkProcessingInfo = new ChunkProcessingInfo(position, exitFuture);
chunkProcessingInfoMap.put(position, chunkProcessingInfo);
chunkProcessingInfo.setCurrentFuture(chunkProcessor.submit(new PositionalCallable(generatorTask::get,
position)));
chunkProcessingInfo.setCurrentFuture(chunkProcessor.submit(generatorTask::get, position));
return exitFuture;
}
}
Expand Down Expand Up @@ -317,26 +293,4 @@ public boolean isPositionProcessing(Vector3ic pos) {
public Iterable<Vector3ic> getProcessingPosition() {
return chunkProcessingInfoMap.keySet();
}

/**
* Dummy callable for passthru position for {@link java.util.concurrent.ThreadPoolExecutor}#newTaskFor
*/
private static final class PositionalCallable implements Callable<Chunk> {
private final Callable<Chunk> callable;
private final Vector3ic position;

private PositionalCallable(Callable<Chunk> callable, Vector3ic position) {
this.callable = callable;
this.position = position;
}

public Vector3ic getPosition() {
return position;
}

@Override
public Chunk call() throws Exception {
return callable.call();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,53 +5,23 @@

import org.joml.Vector3ic;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

public class PositionFuture<T> implements RunnableFuture<T> {

private final RunnableFuture<T> delegate;
public class PositionFuture<T> extends FutureTask<T> {
private final Vector3ic position;

public PositionFuture(RunnableFuture<T> delegate, Vector3ic position) {
this.delegate = delegate;
public PositionFuture(Callable callable, Vector3ic position) {
super(callable);
this.position = position;

Check warning on line 16 in engine/src/main/java/org/terasology/engine/world/chunks/pipeline/PositionFuture.java

View check run for this annotation

Terasology Jenkins.io / SpotBugs

EI_EXPOSE_REP2

LOW: new org.terasology.engine.world.chunks.pipeline.PositionFuture(Callable, Vector3ic) may expose internal representation by storing an externally mutable object into PositionFuture.position
Raw output
<p> This code stores a reference to an externally mutable object into the internal representation of the object.&nbsp; If instances are accessed by untrusted code, and unchecked changes to the mutable object would compromise security or other important properties, you will need to do something different. Storing a copy of the object is better approach in many situations.</p>
}

public Vector3ic getPosition() {
return position;
}

@Override
public void run() {
delegate.run();
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return delegate.cancel(mayInterruptIfRunning);
}

@Override
public boolean isCancelled() {
return delegate.isCancelled();
}

@Override
public boolean isDone() {
return delegate.isDone();
}

@Override
public T get() throws InterruptedException, ExecutionException {
return delegate.get();
public PositionFuture(Runnable runnable, T result, Vector3ic position) {
super(runnable, result);
this.position = position;

Check warning on line 21 in engine/src/main/java/org/terasology/engine/world/chunks/pipeline/PositionFuture.java

View check run for this annotation

Terasology Jenkins.io / SpotBugs

EI_EXPOSE_REP2

LOW: new org.terasology.engine.world.chunks.pipeline.PositionFuture(Runnable, Object, Vector3ic) may expose internal representation by storing an externally mutable object into PositionFuture.position
Raw output
<p> This code stores a reference to an externally mutable object into the internal representation of the object.&nbsp; If instances are accessed by untrusted code, and unchecked changes to the mutable object would compromise security or other important properties, you will need to do something different. Storing a copy of the object is better approach in many situations.</p>
}

@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
TimeoutException {
return delegate.get(timeout, unit);
public Vector3ic getPosition() {
return position;
}
}

0 comments on commit 36591e1

Please sign in to comment.