Deriving and using custom queue classes

To derive a custom queue class, the driverQueues module must be imported and the class must inherit directly or indirectly from the Queue class. Derived queues must provide an implementation for the submit method. Derived class methods can raise exceptions as needed. The predefined QueueError exception is provided as a general-purpose exception.

The following examples illustrate the derivation and use of custom queue classes:

    # NOTE(review): BATCH is not defined in this excerpt; it is presumably a
    # constant supplied by the environment that reads this file, selecting
    # queue-based (batch) execution as the run mode -- confirm.
    run_mode = BATCH
    # Required for deriving custom queues: makes the driverQueues names (for
    # example the Queue base class and the QueueError exception) available
    # to the class definitions that follow.
    from driverQueues import *
    #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    class NiceQueue(Queue):
        # Custom queue that starts the analysis as a background process
        # under the "nice" command instead of handing it to a batch system.
        #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        def __repr__(self):
            # Human-readable description of what this queue does.
            return 'Executes analysis using Linux nice command.'
        #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        def submit(self, options, env):
            # Launch the job described by "options".
            #
            #   options -- mapping of submission options. 'job' is required
            #              (a missing key raises KeyError); 'after' and
            #              'verbose' are optional.
            #   env     -- passed through unchanged to self.spawn().
            #
            # Returns whatever self.spawn() returns.
            # Raises QueueError if a non-empty 'after' option was given.
            job = options['job']
            after = options.get('after', '')
            verbose = options.get('verbose', 0)
            if after:
                # a descriptive string must be supplied as data when
                # raising a QueueError exception; the call form of raise
                # is used because it is valid in both Python 2 and 3
                #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
                raise QueueError(
                    '"after" is not a valid argument for this queue.')
            # run nice under bourne shell to eliminate platform dependencies;
            # stdout and stderr both go to ./<job>.log and the trailing "&"
            # puts the command in the background
            cmd = "/bin/sh -c 'nice %s python ./%s.com 1>./%s.log 2>&1 &'" \
                  % (self.getDriverName(), job, job)
            return self.spawn(cmd, env, verbose)
    #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    class LSF_ResvQueue(LSFQueue):
        # For integration with LSF. This queue class supports cpu, memory,
        # and license reservations.
        #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        def __init__(self, name, memReserve=0, cpusReserve=0):
            #   name        -- LSF queue name; forwarded to LSFQueue.__init__
            #                  (presumably stored there as self.name, which
            #                  __repr__ and submit read -- confirm).
            #   memReserve  -- total memory to reserve for a job; it is
            #                  divided by the cpu count in submit(). 0 (the
            #                  default) disables the memory reservation.
            #                  NOTE(review): units are whatever the site's
            #                  LSF "mem" resource uses -- confirm.
            #   cpusReserve -- if non-zero, the cpu count requested from LSF
            #                  regardless of the job's 'cpus' option.
            LSFQueue.__init__(self, name)
            self.memReserve = memReserve
            self.cpusReserve = cpusReserve
        #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        def __repr__(self):
            # Description naming the LSF queue and the command that shows
            # its configuration.
            return 'Submits to LSF %s queue (run "bqueues -l %s" for description)' \
                   % (self.name, self.name)
        #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        def submit(self, options, env):
            # Build a bsub command line with the configured reservations and
            # hand it to self.spawn().
            #
            #   options -- mapping of submission options. 'job' is required
            #              (a missing key raises KeyError); 'verbose' and
            #              'cpus' are optional.
            #   env     -- passed through unchanged to self.spawn().
            #
            # Returns whatever self.spawn() returns.
            job = options['job']
            verbose = options.get('verbose', 0)
            queue = self.name
            # cpu reservation - a non-zero cpusReserve overrides the job's
            # own 'cpus' option
            cpus = options.get('cpus', '1')
            if self.cpusReserve:
                cpus = self.cpusReserve
            resLst = [ ]
            # license reservation - For the following line to work, LSF
            # must be configured with a static or dynamic resource called
            # "abqtokens".
            #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            resLst.append('abqtokens=%d' % self.getNumRequiredTokens(options))
            # memory reservation - memReserve is the total for the job, so
            # it is split across the cpus and rounded up to a whole number
            if self.memReserve:
                from math import ceil
                resLst.append('mem=%d' % int(ceil(self.memReserve/float(cpus))))
            resStr = ''
            # the guard keeps the command valid if the license reservation
            # above is removed and no memory reservation is configured
            if resLst:
                # str.join replaces string.join(), which is not available
                # in Python 3; the resulting text is identical
                resStr = '-R rusage[%s]' % ':'.join(resLst)
            bsub = 'bsub -q %s -J %s -n %s -o %s.log -N %s %s python %s.com' % \
                   (queue, job, cpus, job, resStr, self.getDriverName(), job)
            return self.spawn(bsub, env, verbose)
    # queue definitions
    # Each assignment registers a queue instance under a queue name.
    # NOTE(review): "queues" and HoldQueue are not defined in this excerpt;
    # presumably both are provided by driverQueues or by the environment
    # that reads this file -- confirm.
    # 'default' runs jobs locally under nice; the LSF_ResvQueue entries
    # submit to the LSF queue given by "name".
    queues['default']     = NiceQueue()
    queues['hold']        = HoldQueue()
    # always requests 16 cpus, overriding the job's own 'cpus' option
    queues['benchmark']   = LSF_ResvQueue(name='benchmark', cpusReserve=16)
    # no memory or cpu override; only the license reservation is applied
    queues['dedicated']   = LSF_ResvQueue(name='dedicated')
    # memory-reserving queues: memReserve is the total reserved per job and
    # is divided by the cpu count at submission time
    queues['a500M']       = LSF_ResvQueue(name='a500M', memReserve=500)
    queues['a1500M']      = LSF_ResvQueue(name='a1500M', memReserve=1500)
    queues['a3000M']      = LSF_ResvQueue(name='a3000M', memReserve=3000)
    queues['a6000M']      = LSF_ResvQueue(name='a6000M', memReserve=6000)