# -*- coding: utf-8 -*-
# $Id: Spooler.py 1634 2013-04-12 15:36:36Z amelung $
#
# Copyright (c) 2007-2011 Otto-von-Guericke-Universität Magdeburg
#
# This file is part of ECSpooler.
#
import os
import sys
import time
import thread
import threading
import xmlrpclib
import traceback
import logging
# local imports
import config
# define a default logger for spooler classes
"""
The Spooler manages incoming jobs and backends. Each job will be turned
over to a backend.
"""
# resourcestring
"""
Creates a new spooler instance at the given host and port.
@param: host: host name
@param: port: port number
@param: pwd_file: absolute path to password file
"""
# a dictionary for backends
# a queue for managing incoming jobs
# a queue for managing backend results
# a queue for managing jobs which could not be executed because the backend was busy
# doqueue thread (we will use only one thread)
self._doqueueThread = None
# TODO:
#self._doqueueThreadCond = threading.Condition()
"""
"""
#self._server.register_function(self.stopBackend)
"""
Starts a thread which checks the queue for new jobs.
"""
# start a new thread which runs the doqueue-method
return True
"""
Stops the scheduler thread and all backends before shutting down
the spooler itself.
"""
# stop doqueue thread
# We make a copy of self._backends because it will change during
# iteration: backends call removeBackend in their shutdown method,
# which will change self._backends.
# TODO: Don't do this if SMF is enabled, since the backend would be
# respawned automatically by SMF (self-healing) - the env var SMF_METHOD
# is an indicator that the service is running under SMF;
# just unregister it and perhaps send a message to the backend to
# wait a moment so that backends have enough time to unregister
# def stopBackend(self, authdata, uid):
# """
# Stops the backend with the given name by invoking the
# shutdown method for this backend. The backend itself then
# invokes the removeBackend method in the spooler.
#
# @param: authdata: username and password for authentication
# @param: uid: a backend's unique ID
# @return: (code, msg) with
# code == 1, stopping backend succeeded
# code != 1, stopping backend failed; msg contains further information
# """
# if not self._auth.test(authdata, auth.STOP_BACKEND):
# return (-110, self.ERROR_AUTH_FAILED)
#
# if uid in self._backends:
# grp = self._backends[uid]
#
# for backend in grp:
# LOG.info("Stopping backend '%s' at '%s'" % (uid, backend['url']))
# # TODO: don't if SMF is enabled - self healing - see above
# self._callBackend(backend["url"], "shutdown")
#
# return (1, '')
# else:
# return (-120, "Backend '%s' is not registered" % uid)
# xmlrpc
"""
Adds a backend to the spooler's list of available backends.
Each backend calls the spooler on being started, and tells
the server its ID, name and URL. The spooler returns
a specific ID, which is a random number created at server
startup. This ID is sent to the backend on each request and
thus authorizes the request. Only the spooler to which the
backend is attached can perform requests to this backend.
@param: authdata: username and password for authentication
@param: backendId: a backend's unique ID
@param: name: a backend's name
@param: version: a backend's version
@param: url: a backend's URL
@return: this server's ID (for further communication)
"""
#return (-110, self.ERROR_AUTH_FAILED)
# 1st backend of this type
'name': name,
'version': version,
'url': url,
'isBusy': False})
"""
Removes a backend from the list of available backends in this spooler.
@param: authdata: username and password for authentication
@param: backendId: a backend's ID
@return: (code, msg) with
code == 1, removing backend succeeded
code != 1, removing backend failed; msg contains further information
"""
#return (-110, self.ERROR_AUTH_FAILED)
#backend = self._backends[backendId]
(backend['name'],
backend['version'],
backend['id'],
backend['url']))
#self._backends[backendId].remove(backend)
# end if
# end for
return True
else:
"""
@see: appendJob
"""
"""
Adds a new test to the queue
@param: authdata: username and password for authentication
@param: jobdata: relevant job data (see also class CheckJob)
@return: (code, msg) with
code == 1, enqueue succeeded and msg contains the job id
code < 0, enqueue failed and msg contains further information
"""
#return (-110, self.ERROR_AUTH_FAILED)
try:
# create a new BackendJob instance
# get the backend from the job
# if no backend with the backendId is currently registered to this
# spooler, an appropriate message will be returned
# append the job
except Exception, e:
"""
Returns a dictionary with the results of all performed jobs. Once the
results are polled, they are no longer stored.
@param: authdata: username and password for authentication
@return: a dictionary
"""
#return (-110, self.ERROR_AUTH_FAILED)
result = {}
return result
"""
@see: getResult
"""
"""
Returns a dictionary { jobID: QueueItem.getData() } with
the result of the performed check job for the given ID.
Once the result is polled, it is no longer stored.
@param: authdata: username and password for authentication
@param: jobId: a valid job ID
@return: a dictionary with 'id' as key and another dictionary with keys
'value', 'message', etc. representing the test results
"""
#return {'value':-110, 'message':self.ERROR_AUTH_FAILED}
result = {}
if item:
else:
return result
"""
Returns a dict with some status information:
"pid": the process ID
"backends": a list of the attached backends
"queue": the number of items in the queue
"results": the number of cached result data
@param: authdata: username and password for authentication
"""
#return (-110, self.ERROR_AUTH_FAILED)
return {
#"pid": os.getpid(),
}
"""
Returns the process ID
@param authdata: username and password for authentication
@return: current process ID
"""
#return (-110, self.ERROR_AUTH_FAILED)
"""
Returns a dict with all currently available backends.
@param: authdata: username and password for authentication
@return: dict with backend names as keys
"""
#return (-110, self.ERROR_AUTH_FAILED)
result = {}
# get backend group
# if group has at least one backend instance, add it
# to result
return result
"""
Returns a dict with status information of a single backend.
@param: authdata: username and password for authentication
@param: backendId: a backend's unique ID
"""
#return (-110, self.ERROR_AUTH_FAILED)
#return (-112, "No such backend: %s" % backendId)
# if group has at least one backend instance, we will use the first entry
else:
#return (-112, "No such backend: %s" % backendId)
"""
Returns information about additional fields required by this backend.
@param: authdata: username and password for authentication
@param: backendId: a backend's unique ID
@see: Backend.getInputFields
"""
#return (-110, self.ERROR_AUTH_FAILED)
#return (-112, "No such backend: %s" % backendId)
# if group has at least one backend instance, add it
# to result
else:
#return (-112, "No such backend: %s" % backendId)
"""
Returns information about the test scenarios provided by this backend.
@param: authdata: username and password for authentication
@param: backendId: a backend's unique ID
@see: Backend.getTestFields
"""
#return (-110, self.ERROR_AUTH_FAILED)
#return (-112, "No such backend: %s" % backendId)
# if group has at least one backend instance, add it
# to result
else:
#return (-112, "No such backend: %s" % backendId)
"""
@return: True if a backend with the given backendId is registered,
otherwise False
"""
return True
else:
return False
"""
Performs the dispatching of a job to a backend.
This method is called in a separate thread, which is opened
and managed by the threading.Thread class. _doqueue() runs in a
loop until self._doqueueThreadExit is True.
"""
while not self._doqueueThreadExit:
try:
#self._doqueueThreadLock.acquire()
# is the queue empty?
# get next job from the queue and the selected backend
# backend groups
backend = None
for b in grp:
# backend busy?
backend = b
break
# end for
if backend == None:
# backend is busy, try again later
# try later
#self._queue.enqueue(job)
# put this job in retry-queue
# process job
else:
# don't block - continue to service other backends
# end if
else:
# enqueue the job so maybe later, if the backend
# is available, we can process it
# end if
else:
#LOG.debug('_doqueue: self._queue is empty');
pass
except Exception, e:
# wait a defined time before resuming
# end while loop
"""
Processing the job by dispatching it to the backend.
@param: backend: a dict with backend attributes (such as id, name, url)
@param: job: a job instance
"""
try:
# invoke remote method call
# result from backend must be of type dict
# this should be normal case
# probably some kind of error in the backend
else:
# unexpected or unhandled result
except Exception, e:
# move jobs from retry queue back to default queue
#LOG.debug('jobId: %s' % job.getId())
#LOG.debug('data: %s' % result.getData())
return result
"""
Move jobs from retry queue back to default queue
"""
if size > 0:
% size)
"""
Executes an xmlrpc call.
@param: url: backend's URL
@param: method: name of method that will be invoked
@return: a tuple with code and result or an error message
"""
#LOG.debug('xxx: %s' % repr(kw))
#LOG.debug("xmlrpclib.Server('%s')" % (url))
try:
#LOG.debug('_callBackend: %s' % repr(result))
return result
#except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError), exc:
except Exception, e: