This notebook demonstrates four different ways to submit a bag (or cluster) of jobs to HTCondor using the Python bindings. All jobs in the bag will share the same cluster id, and will be submitted efficiently to the schedd in just one transaction. To make things interesting, the bag will contain one job for each file found in the current working directory. The file name will be passed as an argument to the job.
NOTE: These examples assume you are running version 8.7.10 or higher of the HTCondor Python Bindings.
import os
import htcondor
print(htcondor.version())
schedd = htcondor.Schedd()
Each example will result in the same set of jobs being submitted, one job per file. The jobs are submitted on Hold as we don't actually want them to run for this demonstration. Also, each job is tagged with an attribute SubmitExample set to True, which allows us to later view and remove just the jobs submitted by this notebook.
Example One. In this example, we pass a string to the Submit() constructor. The string should be a valid HTCondor JDL (job description language); the format for this JDL is exactly the same as what you would use in a condor_submit file (although there are no files involved here). Documentation on the HTCondor JDL can be found in the condor_submit man page.
# Example One: submit via JDL string
sub = htcondor.Submit('''
Executable = ExampleOne.exe
Hold = True
+SubmitExample = True
queue Arguments matching files *
''')
with schedd.transaction() as txn:
    sub.queue(txn)
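Before submitting, it can help to preview which file names a `queue Arguments matching files *` statement is likely to pick up. The sketch below only approximates that selection with Python's `glob` module; `matching_files` is a hypothetical helper, not part of the HTCondor bindings, and HTCondor's own matching rules (for example around hidden files) may differ.

```python
import glob
import os

def matching_files(pattern='*', directory='.'):
    """Return the sorted names of regular files in `directory` matching `pattern`.

    Hypothetical helper: an approximation of what 'matching files' selects,
    not HTCondor's own implementation.
    """
    paths = glob.glob(os.path.join(directory, pattern))
    # Keep regular files only; directories are skipped
    return sorted(os.path.basename(p) for p in paths if os.path.isfile(p))

# Preview: roughly one job would be queued per name printed here
for name in matching_files('*'):
    print(name)
```

If the preview lists files you did not expect, narrowing the pattern (for example `*.dat`) is one way to trim the bag before it is submitted.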
Example Two. Instead of using a JDL string, we create an empty Submit() object and then add attributes one at a time.
# Example Two: submit by adding attributes to the Submit() object
sub = htcondor.Submit()
sub['Executable']='ExampleTwo.exe'
sub['Hold']='True'
sub['+SubmitExample']='True'
sub['Arguments']='$(Item)'
sub.setQArgs('matching files *')
with schedd.transaction() as txn:
    sub.queue(txn)
Example Three. The HTCondor Submit() JDL is able to submit a bag of jobs of a size specified by an integer, or it can submit a job per file, per directory, or per line in a file. But perhaps you want to submit one job per something else? The queue_with_itemdata() method allows you to pass in an arbitrary Python iterator, where each item returned by the iterator should be a dictionary consisting of key/value pairs. Each item returned by the iterator will result in a job submission with that item's key/values inserted. In the example below, instead of having HTCondor scan the directory for files, we create a Python iterator that will return a {'Arguments' : filename} item for each file in the current directory, and use this custom iterator to create the jobs.
# Example Three: submit one job per item from a Python iterator
sub = htcondor.Submit()
sub['Executable']='ExampleThree.exe'
sub['Hold']='True'
sub['+SubmitExample']='True'
files=[{'Arguments':f} for f in os.listdir('.') if os.path.isfile(f)]
with schedd.transaction() as txn:
    sub.queue_with_itemdata(txn,1,iter(files))
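The "one job per something else" case can be sketched with a generator. The example below yields one itemdata dictionary per combination in a small parameter sweep; `sweep_items`, the `--alpha`/`--seed` flags, and the values are all made up for illustration, and the sketch has not been run against a schedd.

```python
import itertools

def sweep_items(alphas, seeds):
    """Yield one itemdata dictionary per (alpha, seed) combination.

    Hypothetical helper; the argument format is an assumption about
    what the job's executable would accept.
    """
    for alpha, seed in itertools.product(alphas, seeds):
        # Each dictionary has the same shape as the {'Arguments': filename}
        # items built in Example Three
        yield {'Arguments': '--alpha {} --seed {}'.format(alpha, seed)}

items = list(sweep_items([0.1, 0.5], [1, 2, 3]))
print(len(items))   # 2 alphas x 3 seeds = 6 items
print(items[0])
```

Under these assumptions, `sweep_items([0.1, 0.5], [1, 2, 3])` could be passed as the third argument to queue_with_itemdata() in place of `iter(files)`. Because it is a generator, items are produced lazily rather than held in a list.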
Example Four. In Example Four we create a bag of jobs by using a for loop instead of a Python iterator.
# Example Four: submit procedurally via a for-loop
sub = htcondor.Submit()
sub['Executable']='ExampleFour.exe'
sub['Hold']='True'
sub['+SubmitExample']='True'
files=[str(f) for f in os.listdir('.') if os.path.isfile(f)]
with schedd.transaction() as txn:
    for f in files:
        # Set the per-job argument, then queue one job for this file
        sub['Arguments']=f
        sub.queue(txn)
# Print a column header
print('{:10} {:18} {}'.format('Job','Cmd','Args'))
# Query the schedd with a constraint and projection of a few attributes
q = schedd.query(constraint='SubmitExample == True', attr_list=['ClusterId','ProcId','Cmd','Args'])
# Sort the resulting list of ClassAds using a key that is a tuple of ClusterID and ProcID
q.sort( key = lambda x: (x['ClusterId'],x['ProcId']) )
# Print a line for each job
for jobad in q:
    print('{:10} {:18} {}'.format(
        '{}.{}'.format(jobad['ClusterId'],jobad['ProcId']),
        os.path.basename(jobad['Cmd']),
        jobad['Args']))
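The sort-and-format step above can be tried without a schedd. In this minimal sketch, plain dictionaries stand in for the ClassAds returned by the query; `sample_ads` and `format_job` are hypothetical, and the job ids and paths are invented values.

```python
import os

# Made-up stand-ins for job ads returned by schedd.query()
sample_ads = [
    {'ClusterId': 12, 'ProcId': 1, 'Cmd': '/home/user/ExampleOne.exe', 'Args': 'b.txt'},
    {'ClusterId': 11, 'ProcId': 0, 'Cmd': '/home/user/ExampleTwo.exe', 'Args': 'a.txt'},
    {'ClusterId': 12, 'ProcId': 0, 'Cmd': '/home/user/ExampleOne.exe', 'Args': 'a.txt'},
]

def format_job(ad):
    """Format one job ad as 'Cluster.Proc  Cmd  Args' in fixed-width columns."""
    job_id = '{}.{}'.format(ad['ClusterId'], ad['ProcId'])
    return '{:10} {:18} {}'.format(job_id, os.path.basename(ad['Cmd']), ad['Args'])

# Tuples compare element by element, so this orders by cluster first, then proc
sorted_ads = sorted(sample_ads, key=lambda ad: (ad['ClusterId'], ad['ProcId']))

print('{:10} {:18} {}'.format('Job', 'Cmd', 'Args'))
for ad in sorted_ads:
    print(format_job(ad))
```

Sorting on the numeric tuple rather than on the formatted "Cluster.Proc" string avoids lexicographic surprises, such as proc 10 sorting ahead of proc 2.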
# Remove all jobs submitted by me that have attribute SubmitExample == True
result = schedd.act(htcondor.JobAction.Remove,'SubmitExample == True')
if result['TotalError']:
    print('Error removing jobs!')
else:
    print('{} jobs removed'.format(result['TotalSuccess']))