2ronwalf/*
2ronwalf * Created on 21.04.2005
2ronwalf */
2ronwalfpackage impl.owls.process.execution;
2ronwalf
2ronwalfimport java.util.Iterator;
2ronwalf
2ronwalfimport org.mindswap.exceptions.ExecutionException;
2ronwalfimport org.mindswap.owls.process.AtomicProcess;
2ronwalfimport org.mindswap.owls.process.ControlConstruct;
2ronwalfimport org.mindswap.owls.process.Process;
3daenzeroramaimport org.mindswap.owls.process.execution.ProcessMonitor;
2ronwalfimport org.mindswap.owls.process.execution.ThreadedProcessExecutionEngine;
2ronwalfimport org.mindswap.query.ValueMap;
2ronwalf
2ronwalf/**
38daenzerorama * @author Michael Daenzer (University of Zurich)
2ronwalf */
3daenzerorama
3daenzerorama//TODO dmi introduce new Factory classes for easy handling of several parallel executions.
3daenzerorama//TODO dmi think about execute method for processes (this is OO)
2ronwalfpublic class ThreadedProcessExecutionEngineImpl extends ProcessExecutionEngineImpl
2ronwalf implements Runnable, ThreadedProcessExecutionEngine {
2ronwalf
3daenzerorama private int sleepInterval = DEFAULT_SLEEP_INTERVAL;
3daenzerorama
3daenzerorama private boolean interrupted = false;
3daenzerorama private boolean stopped = false;
3daenzerorama private Process process;
3daenzerorama private ValueMap values;
4daenzerorama private AtomicProcess processInExecution;
2ronwalf
2ronwalf /* (non-Javadoc)
2ronwalf * @see java.lang.Runnable#run()
2ronwalf */
2ronwalf public void run() {
3daenzerorama super.execute(this.process, this.values);
2ronwalf }
2ronwalf
2ronwalf /* (non-Javadoc)
2ronwalf * @see org.mindswap.owls.process.execution.ProcessExecutionEngine#execute(org.mindswap.owls.process.Process, org.mindswap.query.ValueMap)
2ronwalf */
3daenzerorama public void executeInThread(Process process, ValueMap values) {
3daenzerorama this.process = process;
2ronwalf this.values = values;
3daenzerorama
2ronwalf Thread threadedExec = new Thread(this);
3daenzerorama threadedExec.start();
2ronwalf }
2ronwalf
3daenzerorama /*
3daenzerorama * (non-Javadoc)
3daenzerorama * @see impl.owls.process.execution.ProcessExecutionEngineImpl#executeAtomicProcess(org.mindswap.owls.process.AtomicProcess, org.mindswap.query.ValueMap)
2ronwalf */
2ronwalf protected ValueMap executeAtomicProcess(AtomicProcess process, ValueMap values) {
4daenzerorama processInExecution = process;
2ronwalf ValueMap result = super.executeAtomicProcess(process, values);
3daenzerorama monitorOutputs(result);
2ronwalf return result;
2ronwalf }
3daenzerorama
3daenzerorama /*
3daenzerorama * (non-Javadoc)
3daenzerorama * @see impl.owls.process.execution.ProcessExecutionEngineImpl#executeConstruct(org.mindswap.owls.process.ControlConstruct)
3daenzerorama */
3daenzerorama protected void executeConstruct(ControlConstruct cc) {
3daenzerorama if (isInterrupted())
3daenzerorama processInterruption();
2ronwalf
3daenzerorama if (isStopped())
3daenzerorama processStop();
3daenzerorama
3daenzerorama super.executeConstruct(cc);
3daenzerorama }
3daenzerorama
3daenzerorama /*
3daenzerorama * (non-Javadoc)
3daenzerorama * @see org.mindswap.owls.process.execution.ThreadedProcessExecutionEngine#continueExecution()
3daenzerorama */
3daenzerorama public void continueExecution() {
2ronwalf setInterrupted(false);
3daenzerorama monitorContinuation();
2ronwalf }
2ronwalf
3daenzerorama /*
3daenzerorama * (non-Javadoc)
3daenzerorama * @see org.mindswap.owls.process.execution.ThreadedProcessExecutionEngine#interruptExecution()
3daenzerorama */
3daenzerorama public void interruptExecution() {
3daenzerorama interruptExecution(sleepInterval);
2ronwalf }
2ronwalf
3daenzerorama /*
3daenzerorama * (non-Javadoc)
3daenzerorama * @see org.mindswap.owls.process.execution.ThreadedProcessExecutionEngine#interruptExecution(int)
3daenzerorama */
4daenzerorama public void interruptExecution(int millisToSleep) {
2ronwalf setInterrupted(true);
3daenzerorama setSleepInterval(millisToSleep);
3daenzerorama monitorInterruption();
2ronwalf }
2ronwalf
3daenzerorama public void stopExecution() throws ExecutionException {
3daenzerorama setStopped(true);
3daenzerorama executionFailed("Execution stopped by user");
3daenzerorama }
3daenzerorama
3daenzerorama /**
3daenzerorama * Checks whether interrupted flag is set or not
3daenzerorama *
3daenzerorama * @return true, if flag set. false, otherwise.
3daenzerorama */
2ronwalf protected boolean isInterrupted() {
2ronwalf return interrupted;
2ronwalf }
2ronwalf
3daenzerorama /**
3daenzerorama * Sets the interrupted flag for the given process
3daenzerorama *
3daenzerorama * @param interrupted the new value of the flag
3daenzerorama */
2ronwalf protected void setInterrupted(boolean interrupted) {
3daenzerorama this.interrupted = interrupted;
2ronwalf }
3daenzerorama
3daenzerorama /**
3daenzerorama * Checks whether interrupted flag is set or not
3daenzerorama *
3daenzerorama * @return true, if flag set. false, otherwise.
3daenzerorama */
3daenzerorama protected boolean isStopped() {
3daenzerorama return stopped;
2ronwalf }
2ronwalf
3daenzerorama /**
3daenzerorama * Sets the interrupted flag for the given process
3daenzerorama *
3daenzerorama * @param interrupted the new value of the flag
3daenzerorama */
3daenzerorama protected void setStopped(boolean stopped) {
3daenzerorama this.stopped = stopped;
2ronwalf }
2ronwalf
3daenzerorama /**
3daenzerorama * Handles the interruption in a loop checking each <code>sleepInterval</code>.
3daenzerorama * milliseconds the interruption state.
3daenzerorama */
2ronwalf private void processInterruption() {
2ronwalf while (isInterrupted()) {
2ronwalf try {
3daenzerorama Thread.sleep(sleepInterval);
2ronwalf } catch (InterruptedException e) {
2ronwalf e.printStackTrace();
2ronwalf }
2ronwalf }
2ronwalf }
2ronwalf
2ronwalf /**
3daenzerorama * Handles a stop of the execution and loops until object is garbage collected
2ronwalf */
3daenzerorama private void processStop() {
3daenzerorama while (true) {
3daenzerorama try {
3daenzerorama Thread.sleep(sleepInterval);
3daenzerorama } catch (InterruptedException e) {
3daenzerorama e.printStackTrace();
3daenzerorama }
3daenzerorama }
2ronwalf }
2ronwalf
2ronwalf /**
3daenzerorama * invokes the executionInterrupted() method on all registred monitors
2ronwalf */
3daenzerorama protected void monitorInterruption() {
3daenzerorama for(Iterator i = monitors.iterator(); i.hasNext();) {
3daenzerorama ProcessMonitor monitor = (ProcessMonitor) i.next();
4daenzerorama monitor.executionInterrupted(process, processInExecution);
3daenzerorama }
2ronwalf }
2ronwalf
2ronwalf /**
3daenzerorama * invokes the executionContinued() method on all registered monitors
2ronwalf */
3daenzerorama protected void monitorContinuation() {
3daenzerorama for(Iterator i = monitors.iterator(); i.hasNext();) {
3daenzerorama ProcessMonitor monitor = (ProcessMonitor) i.next();
3daenzerorama monitor.executionContinued(process);
3daenzerorama }
2ronwalf }
3daenzerorama
2ronwalf /**
3daenzerorama * invokes the getProcessResults() method on all registered monitors
2ronwalf */
3daenzerorama protected void monitorOutputs(ValueMap values) {
3daenzerorama for(Iterator i = monitors.iterator(); i.hasNext();) {
3daenzerorama ProcessMonitor monitor = (ProcessMonitor) i.next();
3daenzerorama monitor.intermediateResultsReceived(values);
3daenzerorama }
2ronwalf }
2ronwalf
3daenzerorama /*
3daenzerorama * (non-Javadoc)
3daenzerorama * @see org.mindswap.owls.process.execution.ThreadedProcessExecutionEngine#getSleepInterval()
3daenzerorama */
3daenzerorama public int getSleepInterval() {
3daenzerorama return sleepInterval;
2ronwalf }
2ronwalf
3daenzerorama /*
3daenzerorama * (non-Javadoc)
3daenzerorama * @see org.mindswap.owls.process.execution.ThreadedProcessExecutionEngine#setSleepInterval(int)
2ronwalf */
3daenzerorama public void setSleepInterval(int sleepInterval) {
3daenzerorama this.sleepInterval = sleepInterval;
3daenzerorama }
2ronwalf}