I wrote a batch processor in Java a long time ago.
The code accepts Callables and executes them on a fixed-size thread pool.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class BatchProcessor {
    private List<Callable<?>> tasks;
    private final ExecutorService executors;
    private final static Logger logger = LoggerFactory.getLogger(BatchProcessor.class);

    public BatchProcessor() {
        executors = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        tasks = new ArrayList<>();
    }

    public <T> void addTask(Callable<T> task) {
        if (task == null) return;
        tasks.add(task);
    }

    public List<Object> run() {
        if (tasks.isEmpty()) return null;
        List<Object> results = new ArrayList<>();
        List<Future<?>> futures = new ArrayList<>();
        for (Callable<?> task: tasks) {
            Future<?> f = executors.submit(task);
            futures.add(f);
        }
        int done = 0;
        logger.info(String.format("run - Progress %d / %d completed", done, tasks.size()));
        try {
            for (int i = 0; i < tasks.size(); i++) {
                results.add(futures.get(i).get());
                logger.info(String.format("run - Progress %d / %d completed", ++done, tasks.size()));
            }
            executors.shutdown();
            if (!executors.awaitTermination(10, TimeUnit.MINUTES)) {
                logger.error("run - timed out");
            }
        } catch(ExecutionException ee) {
            logger.info("run - ExecutionException " + ee.toString());
        } catch(InterruptedException ee) {
            logger.info("run - InterruptedException");
        }
        return results;
    }

    public static void main(String[] args) {
        BatchProcessor bp = new BatchProcessor();
        bp.addTask(new Callable<String>() {
            @Override
            public String call() {
                return "hello world";
            }
        });
        bp.addTask(new Callable<Integer>() {
            @Override
            public Integer call() {
                return 29;
            }
        });
        List<Object> lst = bp.run();
        for (Object o: lst) {
            if (o instanceof Integer) {
                System.out.println((int) o);
            }
            if (o instanceof String) {
                System.out.println((String) o);
            }
        }
    }
}
Are there any improvements that I can make in this code?
ExecutorService already has an invokeAll method. Your logging is pointless, as it only logs the progress of asking for the results, which says nothing about the actual completion, which can be far ahead of that. There's also no need to call awaitTermination when there are knowingly no jobs left. And Java has had lambda expressions for more than a decade now; there is no need to write inner classes to implement trivial Callables.

if (tasks.isEmpty()) return null; is a huge code smell, turning a valid input into a null result. If an empty list goes in, an empty list should go out: if (tasks.isEmpty()) return Collections.emptyList();
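
The suggestions in these comments could be combined roughly as follows. This is only a minimal sketch, not the asker's class: the name InvokeAllBatchProcessor, the thread-count constructor parameter, and the separate shutdown() method are assumptions made for illustration, and logging is left out entirely. It uses invokeAll instead of submitting tasks one by one, returns an empty list for an empty batch, and adds tasks as lambdas.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical class name; a sketch of the commenters' suggestions.
class InvokeAllBatchProcessor {
    private final List<Callable<Object>> tasks = new ArrayList<>();
    private final ExecutorService executor;

    // Assumption: the caller chooses the pool size.
    InvokeAllBatchProcessor(int threads) {
        executor = Executors.newFixedThreadPool(threads);
    }

    void addTask(Callable<?> task) {
        if (task == null) return;
        // Adapt Callable<?> to Callable<Object> so the list fits invokeAll.
        tasks.add(() -> task.call());
    }

    List<Object> run() throws InterruptedException, ExecutionException {
        // Empty list in, empty list out (never null).
        if (tasks.isEmpty()) return Collections.emptyList();
        List<Object> results = new ArrayList<>();
        // invokeAll blocks until all tasks have finished and returns the
        // futures in the same order as the submitted tasks.
        for (Future<Object> future : executor.invokeAll(tasks)) {
            // get() does not wait here; it rethrows a task failure
            // wrapped in an ExecutionException.
            results.add(future.get());
        }
        return results;
    }

    // Assumption: shutting down is the caller's decision, not run()'s.
    void shutdown() {
        executor.shutdown();
    }

    public static void main(String[] args) throws Exception {
        InvokeAllBatchProcessor bp = new InvokeAllBatchProcessor(2);
        try {
            bp.addTask(() -> "hello world");
            bp.addTask(() -> 29);
            for (Object o : bp.run()) {
                System.out.println(o);
            }
        } finally {
            bp.shutdown();
        }
    }
}
```

Because invokeAll only returns once every task is done, no awaitTermination call is needed after it. Propagating the exceptions from run() instead of logging and swallowing them is a further design choice of this sketch, not something the comments ask for.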