ThreadedProcessExecutionEngineImpl.java revision 2
2ronwalf/*
2ronwalf * Created on 21.04.2005
2ronwalf */
2ronwalfpackage impl.owls.process.execution;
2ronwalf
2ronwalfimport java.util.Iterator;
2ronwalfimport java.util.Vector;
2ronwalf
2ronwalfimport org.mindswap.exceptions.ExecutionException;
2ronwalfimport org.mindswap.owl.OWLIndividual;
2ronwalfimport org.mindswap.owl.OWLIndividualList;
2ronwalfimport org.mindswap.owl.list.RDFList;
2ronwalfimport org.mindswap.owls.generic.list.OWLSObjList;
2ronwalfimport org.mindswap.owls.process.AnyOrder;
2ronwalfimport org.mindswap.owls.process.AtomicProcess;
2ronwalfimport org.mindswap.owls.process.Condition;
2ronwalfimport org.mindswap.owls.process.ControlConstruct;
2ronwalfimport org.mindswap.owls.process.ControlConstructBag;
2ronwalfimport org.mindswap.owls.process.ControlConstructList;
2ronwalfimport org.mindswap.owls.process.ForEach;
2ronwalfimport org.mindswap.owls.process.Parameter;
2ronwalfimport org.mindswap.owls.process.Perform;
2ronwalfimport org.mindswap.owls.process.Process;
2ronwalfimport org.mindswap.owls.process.RepeatUntil;
2ronwalfimport org.mindswap.owls.process.RepeatWhile;
2ronwalfimport org.mindswap.owls.process.Sequence;
2ronwalfimport org.mindswap.owls.process.ValueOf;
2ronwalfimport org.mindswap.owls.process.execution.ThreadedProcessExecutionEngine;
2ronwalfimport org.mindswap.owls.process.execution.ThreadedProcessExecutionListener;
2ronwalfimport org.mindswap.owls.vocabulary.OWLS;
2ronwalfimport org.mindswap.query.ValueMap;
2ronwalfimport org.mindswap.swrl.Variable;
2ronwalf
2ronwalf/**
2ronwalf * @author Michael Daenzer, University of Zurich
2ronwalf */
2ronwalfpublic class ThreadedProcessExecutionEngineImpl extends ProcessExecutionEngineImpl
2ronwalf implements Runnable, ThreadedProcessExecutionEngine {
2ronwalf
2ronwalf private static final int DEFAULT_INTERVAL = 5000;
2ronwalf
2ronwalf private boolean resultReady = false;
2ronwalf private boolean interrupted = false;
2ronwalf private int interval = DEFAULT_INTERVAL;
2ronwalf
2ronwalf private Process process = null;
2ronwalf private ValueMap values = null;
2ronwalf private ValueMap resultMap = null;
2ronwalf private String processName;
2ronwalf
2ronwalf private Vector execListener = new Vector();
2ronwalf
2ronwalf /* (non-Javadoc)
2ronwalf * @see java.lang.Runnable#run()
2ronwalf */
2ronwalf public void run() {
2ronwalf executionHasStarted();
2ronwalf setResultMap(super.execute(this.process, this.values));
2ronwalf }
2ronwalf
2ronwalf /* (non-Javadoc)
2ronwalf * @see org.mindswap.owls.process.execution.ProcessExecutionEngine#execute(org.mindswap.owls.process.Process, org.mindswap.query.ValueMap)
2ronwalf */
2ronwalf public void executeThreaded(Process p, ValueMap values) {
2ronwalf this.process = p;
2ronwalf this.processName = p.getName();
2ronwalf if (this.processName == null)
2ronwalf this.processName = p.getLocalName();
2ronwalf this.values = values;
2ronwalf execAsynchronous();
2ronwalf }
2ronwalf
2ronwalf // starts a method asynchronously in a separate thread
2ronwalf public void execAsynchronous() {
2ronwalf setResultReady(false);
2ronwalf Thread threadedExec = new Thread(this);
2ronwalf threadedExec.start();
2ronwalf }
2ronwalf
2ronwalf /**
2ronwalf * @see impl.owls.process.execution.ProcessExecutionEngine#createSequence(org.mindswap.owls.process.Sequence)
2ronwalf */
2ronwalf protected void executeSequence(Sequence cc) {
2ronwalf ControlConstructList ccList = cc.getComponents();
2ronwalf
2ronwalf for(int i = 0; i < ccList.size(); i++) {
2ronwalf ControlConstruct component = ccList.constructAt(i);
2ronwalf
2ronwalf if (isInterrupted())
2ronwalf processInterruption();
2ronwalf
2ronwalf executeConstruct(component);
2ronwalf }
2ronwalf }
2ronwalf
2ronwalf
2ronwalf /* (non-Javadoc)
2ronwalf * @see impl.owls.process.execution.ProcessExecutionEngineImpl#executeAnyOrder(org.mindswap.owls.process.AnyOrder)
2ronwalf */
2ronwalf protected void executeAnyOrder(AnyOrder cc) {
2ronwalf ControlConstructBag ccList = cc.getComponents();
2ronwalf
2ronwalf // AnyOrder says it doesn't matter in which order subelements
2ronwalf // are executed so let's try the sequential order
2ronwalf // FIXME check preconditions to find a correct ordering
2ronwalf OWLIndividualList list = ccList.getAll();
2ronwalf for(int i = 0; i < list.size(); i++) {
2ronwalf ControlConstruct component = (ControlConstruct) list.individualAt(i);
2ronwalf
2ronwalf if (isInterrupted())
2ronwalf processInterruption();
2ronwalf
2ronwalf executeConstruct(component);
2ronwalf }
2ronwalf }
2ronwalf
2ronwalf protected void executeForEach(ForEach cc) {
2ronwalf ValueMap parentValues = (ValueMap) performResults.get(OWLS.Process.TheParentPerform);
2ronwalf
2ronwalf ControlConstruct loopBody = cc.getComponent();
2ronwalf Variable loopVar = cc.getLoopVar();
2ronwalf ValueOf valueOf = cc.getListValue();
2ronwalf
2ronwalf Perform otherPerform = valueOf.getPerform();
2ronwalf Parameter otherParam = valueOf.getParameter();
2ronwalf
2ronwalf ValueMap performResult = (ValueMap) performResults.get(otherPerform);
2ronwalf if(performResult == null)
2ronwalf throw new ExecutionException( "Perform " + otherPerform + " cannot be found!" );
2ronwalf
2ronwalf OWLIndividual ind = performResult.getIndividualValue(otherParam);
2ronwalf RDFList list = (RDFList) ind.castTo(OWLSObjList.class);
2ronwalf
2ronwalf for( ; !list.isEmpty(); list = list.getRest() ) {
2ronwalf OWLIndividual value = list.getFirst();
2ronwalf parentValues.setValue(loopVar, value);
2ronwalf
2ronwalf if (isInterrupted())
2ronwalf processInterruption();
2ronwalf
2ronwalf executeConstruct(loopBody);
2ronwalf }
2ronwalf }
2ronwalf
2ronwalf protected void executeRepeatUntil(RepeatUntil cc) {
2ronwalf Condition whileCondition = cc.getCondition();
2ronwalf ControlConstruct loopBody = cc.getComponent();
2ronwalf
2ronwalf do {
2ronwalf if (isInterrupted())
2ronwalf processInterruption();
2ronwalf
2ronwalf executeConstruct(loopBody);
2ronwalf }
2ronwalf while( isTrue( whileCondition ) );
2ronwalf }
2ronwalf
2ronwalf protected void executeRepeatWhile(RepeatWhile cc) {
2ronwalf Condition whileCondition = cc.getCondition();
2ronwalf ControlConstruct loopBody = cc.getComponent();
2ronwalf
2ronwalf while( isTrue( whileCondition ) ) {
2ronwalf if (isInterrupted())
2ronwalf processInterruption();
2ronwalf
2ronwalf executeConstruct(loopBody);
2ronwalf }
2ronwalf
2ronwalf }
2ronwalf
2ronwalf protected ValueMap executeAtomicProcess(AtomicProcess process, ValueMap values) {
2ronwalf atomicProcessStarted(process);
2ronwalf ValueMap result = super.executeAtomicProcess(process, values);
2ronwalf atomicProcessEnded(process);
2ronwalf return result;
2ronwalf }
2ronwalf
2ronwalf protected ValueMap executePerform(Perform p) {
2ronwalf ValueMap values = super.executePerform(p);
2ronwalf
2ronwalf Iterator iter = values.getVariables().iterator();
2ronwalf while (iter.hasNext()) {
2ronwalf Parameter param = (Parameter) iter.next();
2ronwalf if (param.isIndividual()) {
2ronwalf //OWLIndividual ind = (OWLIndividual) param;
2ronwalf //if (ind.isType(NextOnt.nextOutput) || ind.isType(NextOnt.nextInput))
2ronwalf parameterValueSet(param, values.getStringValue(param));
2ronwalf }
2ronwalf }
2ronwalf
2ronwalf return values;
2ronwalf }
2ronwalf
2ronwalf public void continueExec() {
2ronwalf setInterrupted(false);
2ronwalf executionContinued();
2ronwalf }
2ronwalf
2ronwalf public void interruptExec() {
2ronwalf interruptExec(DEFAULT_INTERVAL);
2ronwalf }
2ronwalf
2ronwalf public void interruptExec(int millisToSleep) {
2ronwalf setInterrupted(true);
2ronwalf setInterval(millisToSleep);
2ronwalf executionInterrupted();
2ronwalf }
2ronwalf
2ronwalf protected boolean isInterrupted() {
2ronwalf return interrupted;
2ronwalf }
2ronwalf
2ronwalf protected void setInterrupted(boolean interrupted) {
2ronwalf this.interrupted = interrupted;
2ronwalf }
2ronwalf
2ronwalf protected void setResultMap(ValueMap results) {
2ronwalf if (results != null) {
2ronwalf resultMap = results;
2ronwalf setResultReady(true);
2ronwalf }
2ronwalf }
2ronwalf
2ronwalf public ValueMap getResultMap() {
2ronwalf setResultReady(false);
2ronwalf return resultMap;
2ronwalf }
2ronwalf
2ronwalf public boolean isResultReady() {
2ronwalf return resultReady;
2ronwalf }
2ronwalf
2ronwalf public void setResultReady(boolean resultState) {
2ronwalf resultReady = resultState;
2ronwalf if (resultState)
2ronwalf executionHasFinished();
2ronwalf }
2ronwalf
2ronwalf // waits until the interruption ended
2ronwalf private void processInterruption() {
2ronwalf while (isInterrupted()) {
2ronwalf try {
2ronwalf Thread.sleep(interval);
2ronwalf } catch (InterruptedException e) {
2ronwalf e.printStackTrace();
2ronwalf }
2ronwalf }
2ronwalf }
2ronwalf
2ronwalf /**
2ronwalf * invokes executionInterrupted() on all registred listeners
2ronwalf */
2ronwalf protected void executionInterrupted() {
2ronwalf Object[] listeners = execListener.toArray();
2ronwalf for (int i = 0; i < listeners.length; i++)
2ronwalf ((ThreadedProcessExecutionListener) listeners[i]).executionInterrupted(process);
2ronwalf }
2ronwalf
2ronwalf /**
2ronwalf * invokes executionContinued() on all registred listeners
2ronwalf */
2ronwalf protected void executionContinued() {
2ronwalf Object[] listeners = execListener.toArray();
2ronwalf for (int i = 0; i < listeners.length; i++)
2ronwalf ((ThreadedProcessExecutionListener) listeners[i]).executionContinued(process);
2ronwalf }
2ronwalf
2ronwalf /**
2ronwalf * invokes atomicProcessStarted(AtomicProcess atomicProcess) on all registred listeners
2ronwalf * @param atomicProcess the AtomicProcess whichs execution started
2ronwalf */
2ronwalf protected void atomicProcessStarted(AtomicProcess atomicProcess) {
2ronwalf Object[] listeners = execListener.toArray();
2ronwalf for (int i = 0; i < listeners.length; i++)
2ronwalf ((ThreadedProcessExecutionListener) listeners[i]).atomicProcessStarted(atomicProcess);
2ronwalf }
2ronwalf
2ronwalf /**
2ronwalf * invokes atomicProcessEnded(AtomicProcess atomicProcess) on all registred listeners
2ronwalf * @param atomicProcess the AtomicProcess whichs execution finished
2ronwalf */
2ronwalf protected void atomicProcessEnded(AtomicProcess atomicProcess) {
2ronwalf Object[] listeners = execListener.toArray();
2ronwalf for (int i = 0; i < listeners.length; i++)
2ronwalf ((ThreadedProcessExecutionListener) listeners[i]).atomicProcessEnded(atomicProcess);
2ronwalf }
2ronwalf
2ronwalf protected void parameterValueSet(Parameter param, String value) {
2ronwalf Object[] listeners = execListener.toArray();
2ronwalf for (int i = 0; i < listeners.length; i++)
2ronwalf ((ThreadedProcessExecutionListener) listeners[i]).parameterValueSet(process, param, value);
2ronwalf }
2ronwalf
2ronwalf protected void executionHasFinished() {
2ronwalf Object[] listeners = execListener.toArray();
2ronwalf for (int i = 0; i < listeners.length; i++)
2ronwalf ((ThreadedProcessExecutionListener) listeners[i]).executionFinished(process);
2ronwalf }
2ronwalf
2ronwalf protected void executionHasStarted() {
2ronwalf Object[] listeners = execListener.toArray();
2ronwalf for (int i = 0; i < listeners.length; i++)
2ronwalf ((ThreadedProcessExecutionListener) listeners[i]).executionStarted(process);
2ronwalf }
2ronwalf
2ronwalf /* (non-Javadoc)
2ronwalf * @see org.mindswap.owls.process.execution.ThreadedProcessExecutionEngine#addExecutionListener(org.mindswap.owls.process.execution.ThreadedProcessExecutionListener)
2ronwalf */
2ronwalf synchronized public void addExecutionListener(ThreadedProcessExecutionListener listener) {
2ronwalf execListener.add(listener);
2ronwalf }
2ronwalf
2ronwalf /* (non-Javadoc)
2ronwalf * @see org.mindswap.owls.process.execution.ThreadedProcessExecutionEngine#removeExecutionListener(org.mindswap.owls.process.execution.ThreadedProcessExecutionListener)
2ronwalf */
2ronwalf synchronized public void removeExecutionListener(ThreadedProcessExecutionListener listener) {
2ronwalf execListener.remove(listener);
2ronwalf }
2ronwalf
2ronwalf public int getInterval() {
2ronwalf return interval;
2ronwalf }
2ronwalf
2ronwalf public void setInterval(int interval) {
2ronwalf this.interval = interval;
2ronwalf }
2ronwalf
2ronwalf public String getProcessName() {
2ronwalf return processName;
2ronwalf }
2ronwalf
2ronwalf}