OAR jobs from Python

The library provides three main classes:

  • oarpy.oarjob.Job: manage existing jobs (status, stop, suspend, resume)
  • oarpy.oarjob.JobFactory: define and launch jobs (creates oarpy.oarjob.Job)
  • oarpy.oarresource.Resource: optional OAR resources for JobFactory (nodes, cores, gpu)
[1]:
import logging
logging.basicConfig()
def debug(b):
    if b:
        logging.getLogger('oarpy').setLevel(logging.DEBUG)
    else:
        logging.getLogger('oarpy').setLevel(logging.INFO)

from time import sleep
from oarpy import oarjob
from oarpy.oarresource import Resource

Monitor jobs

Jobs can be monitored based on their job ID:

[2]:
# Invalid job ID
job = oarjob.Job(0)
assert not job.exists

# Valid job ID
job = oarjob.Job(1)

# Context manager is optional (reduces queries)
with job.fixed_stats():
    if job.exists:
        # Specific statistics
        print(job['assigned_network_address'])
        print(job['assigned_resources'])
        print(job['walltime'])

        # All statistics
        for k,v in job.stats.items():
            print('{}: {}'.format(k,v))

        # Statistics exposed as attributes
        job.is_finished
        job.is_running
        job.is_waiting
        job.is_intermediate
        job.needsresume
        job.time_to_start
        job.time_enqueued
        job.time_scheduled
        assert job.status == job['state']
[u'hpc2-0701']
[2353]
2:00:00
resubmit_job_id: 0
owner: forstner
submissionTime: 2018-08-16 10:16:00+02:00
message: R=1,W=2:0:0,J=I,Q=interactive (Karma=0.000)
jobType: INTERACTIVE
queue: interactive
launchingDirectory: /users/forstner
exit_code: None
properties: ((((desktop_computing = 'NO') AND gpu = 'NO') AND cluster = 'NICE') AND opsys = 'debian8') AND drain='NO'
state: Terminated
stopTime: 2018-08-16 10:16:11+02:00
job_user: forstner
assigned_network_address: [u'hpc2-0701']
walltime: 2:00:00
events: [{u'job_id': 1, u'event_id': u'1', u'date': 1534407371, u'type': u'SWITCH_INTO_TERMINATE_STATE', u'to_check': u'NO', u'description': u'[bipbip 1] Ask to change the job state'}]
array_index: 1
assigned_resources: [2353]
array_id: 1
dependencies: []
startTime: 2018-08-16 10:16:02+02:00
reservation: None
stdout_file: OAR.1.stdout
types: []
Job_Id: 1
cpuset_name: forstner_1
name: None
initial_request:
scheduledStart: None
wanted_resources: -l "{type = 'default'}/core=1,walltime=2:0:0"
project: default
stderr_file: OAR.1.stderr
command:
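The `date` field of an entry in `events` appears to be a Unix timestamp: in the output above, 1534407371 corresponds to the reported `stopTime`. The sketch below converts such a value with the standard library only. The helper name `event_time` and its `utc_offset_hours` parameter are illustrative and not part of oarpy.

```python
from datetime import datetime, timedelta, timezone

def event_time(epoch_seconds, utc_offset_hours=0):
    """Convert seconds since the Unix epoch to a timezone-aware datetime.

    Assumption: the 'date' field of an OAR event is expressed in epoch seconds.
    """
    tz = timezone(timedelta(hours=utc_offset_hours))
    return datetime.fromtimestamp(epoch_seconds, tz=tz)

# Event date from the output above, shown with the +02:00 offset of the other fields
print(event_time(1534407371, utc_offset_hours=2))  # 2018-08-16 10:16:11+02:00
```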

Jobs can be searched for based on date, name, project, owner and other properties:

[3]:
from oarpy import timeutils

end = timeutils.now()
start = timeutils.add(end,minutes=-10)
jobs = oarjob.search(start=start, end=end)

print(str(len(jobs))+' jobs found')

examples = {}
for job in jobs:
    if job.status not in examples:
        examples[job.status] = job

for job in examples.values():
    with job.fixed_stats():
        print(job)
        print(' time_to_start: {}'.format(job.time_to_start))
        print(' time_enqueued: {}'.format(job.time_enqueued))
        print(' time_scheduled: {}'.format(job.time_scheduled))
        print(' runtime: {}'.format(job.runtime))
15 jobs found
Job(1124788)
 name = PyHST
 project = default
 state = Terminated
 owner = in1081
 runtime = 0:00:54
 time_to_start: 0:00:00
 time_enqueued: 1:46:34.100600
 time_scheduled: 0:00:00
 runtime: 0:00:54
Job(1125510)
 name = None
 project = default
 state = Running
 owner = bona
 runtime = 0:02:28.569919
 time_to_start: 0:00:00
 time_enqueued: 0:00:25
 time_scheduled: 0:00:00
 runtime: 0:02:28.571296
Job(1122760)
 name = None
 project = default
 state = Waiting
 owner = in1096
 runtime = 0:00:00
 time_to_start: 15:25:03.953672
 time_enqueued: 21:21:38
 time_scheduled: 0:00:00
 runtime: 0:00:00
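The search above combines two steps: building a time window that ends now, and keeping one example job per state. A minimal standard-library sketch of both steps follows. It does not use oarpy; `search_window` and `first_per_state` are hypothetical helpers, and the records are stand-ins for real job objects.

```python
from datetime import datetime, timedelta, timezone

def search_window(minutes, end=None):
    """Return (start, end) covering the last `minutes` minutes."""
    if end is None:
        end = datetime.now(timezone.utc)
    return end - timedelta(minutes=minutes), end

def first_per_state(records):
    """Keep the first job ID seen for each state."""
    examples = {}
    for job_id, state in records:
        # setdefault only stores the value when the state is not yet present
        examples.setdefault(state, job_id)
    return examples

start, end = search_window(10)
print(end - start)  # 0:10:00

# Stand-in records; the last entry is made up to show that repeated states are skipped
records = [(1124788, 'Terminated'), (1125510, 'Running'),
           (1122760, 'Waiting'), (1, 'Terminated')]
print(first_per_state(records))
```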

Define jobs

The minimal job definition requires only a shell command:

[4]:
jobdef = oarjob.JobFactory(command="ls")
print(jobdef)
-n OAR -O %jobname%.%jobid%.stdout -E %jobname%.%jobid%.stderr ls
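The `%jobname%` and `%jobid%` placeholders in the log file names are presumably filled in by OAR when the job is submitted, which would explain a name such as `OAR.1.stdout` for a job named `OAR` with ID 1. The following sketch only illustrates that substitution; `expand_log_name` is a hypothetical helper, not an oarpy or OAR function.

```python
def expand_log_name(template, jobname, jobid):
    """Replace the %jobname% and %jobid% placeholders in a log file template."""
    return (template
            .replace('%jobname%', str(jobname))
            .replace('%jobid%', str(jobid)))

print(expand_log_name('%jobname%.%jobid%.stdout', 'OAR', 1))  # OAR.1.stdout
```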

A job can be identified by name and project:

[5]:
jobdef = oarjob.JobFactory(name='test', project='oarpy', command='ls')
print(jobdef)
-n test --project oarpy -O %jobname%.%jobid%.stdout -E %jobname%.%jobid%.stderr ls

Working and log directories can be specified (current directory by default). These directories must exist.

[6]:
jobdef = oarjob.JobFactory(name='test', project='oarpy', command='ls',
                           working_directory='/tmp/oarpy', log_directory='/tmp/oarpy/log')
print(jobdef)
-n test --project oarpy -d /tmp/oarpy -O /tmp/oarpy/log/%jobname%.%jobid%.stdout -E /tmp/oarpy/log/%jobname%.%jobid%.stderr ls
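Since both directories must exist before the job runs, one option is to create them just before defining the job. The sketch below uses only the standard library; `ensure_directories` is an illustrative helper, and the paths are placed under the system temporary directory rather than the `/tmp/oarpy` used above.

```python
import os
import tempfile

def ensure_directories(*paths):
    """Create each directory (and its parents) if it does not exist yet."""
    for path in paths:
        os.makedirs(path, exist_ok=True)
    return paths

base = os.path.join(tempfile.gettempdir(), 'oarpy')
working, logs = ensure_directories(base, os.path.join(base, 'log'))
print(os.path.isdir(working), os.path.isdir(logs))  # True True
```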

A job definition can also specify the resources required to execute the job:

  • nodes: number of nodes (default: 1)
  • cpu: number of CPUs per node (default: 1)
  • core: number of cores per CPU (default: 1)
  • gpu: boolean (default: False)
  • mem_core_mb: minimal memory per core (default: 8000 MB)
  • walltime: a number (default: 2 hours) or a dictionary with at least one of the keys days, seconds, minutes, hours, weeks
  • custom properties: e.g. cpu_vendor=('=', 'INTEL')

This starts \(\text{nodes}\times\text{cpu}\times\text{core}\) processes distributed over the specified nodes and CPUs.
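The two calculations described above can be sketched with the standard library. This is not oarpy's implementation: `total_processes` and `format_walltime` are hypothetical helpers, and a numeric walltime is assumed to be expressed in hours, in line with the stated 2-hour default. The dictionary keys listed above happen to be valid keyword arguments of `datetime.timedelta`.

```python
from datetime import timedelta

def total_processes(nodes=1, cpu=1, core=1):
    """Number of processes: nodes x cpu x core."""
    return nodes * cpu * core

def format_walltime(walltime=2):
    """Format a walltime as HH:MM:SS.

    Assumption: a plain number is a number of hours; a dictionary uses
    timedelta-style keys such as 'minutes' or 'hours'.
    """
    if isinstance(walltime, dict):
        delta = timedelta(**walltime)
    else:
        delta = timedelta(hours=walltime)
    hours, rest = divmod(int(delta.total_seconds()), 3600)
    minutes, seconds = divmod(rest, 60)
    return '{:02d}:{:02d}:{:02d}'.format(hours, minutes, seconds)

print(total_processes(nodes=1, core=8))   # 8
print(format_walltime({'minutes': 1}))    # 00:01:00
print(format_walltime())                  # 02:00:00
```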

[7]:
resource = Resource(nodes=1,core=8,walltime={'minutes':1})
jobdef = oarjob.JobFactory(name='test', project='oarpy',
                           command='ls', resource=resource)
print(jobdef)
-n test --project oarpy -O %jobname%.%jobid%.stdout -E %jobname%.%jobid%.stderr -l nodes=1/core=8,walltime=00:01:00 ls

Launch jobs

Function to define test jobs:

[8]:
def definition(seconds):
    command = 'python -c "from time import sleep\nfor i in range({}):\n print(i)\n sleep(1)"'
    resource = Resource(core=1,walltime={'seconds':seconds*3})
    return oarjob.JobFactory(name='test{}'.format(seconds), project='oarpy',
                             resource=resource, command=command.format(seconds))

Immediate execution

Schedule job for execution, wait until done and show output:

[9]:
if True:
    print("Schedule job")
    job = definition(5).submit()
    print(job)
    print("Wait until finished ...")
    job.wait()
    print(job)
    if job.exit_code:
        print('Failed:\n{}'.format(job.stderr))
    else:
        print('Success:\n{}'.format(job.stdout))
    job.remove_logs()
Schedule job
Job(1125513)
 name = test5
 project = oarpy
 state = Waiting
 owner = denolf
 runtime = 0:00:00
Wait until finished ...
...................................
Job(1125513)
 name = test5
 project = oarpy
 state = Terminated
 owner = denolf
 runtime = 0:00:29
Success:
0
1
2
3
4
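Waiting for a job amounts to polling its state until it reaches one of a set of target states; the dots in the output above suggest one query per polling step. The sketch below shows that pattern with a stand-in state source. It is an assumption about the general technique, not oarpy's actual `wait` implementation, and `wait_for_state` with its parameters is hypothetical.

```python
import time

def wait_for_state(get_state, states=('Terminated', 'Error'),
                   poll_interval=0.0, timeout=None):
    """Call get_state() repeatedly until it returns one of `states`."""
    started = time.monotonic()
    while True:
        state = get_state()
        if state in states:
            return state
        if timeout is not None and time.monotonic() - started > timeout:
            raise TimeoutError('state {!r} after {} s'.format(state, timeout))
        time.sleep(poll_interval)

# Stand-in for successive status queries of one job
fake_states = iter(['Waiting', 'Launching', 'Running', 'Terminated'])
print(wait_for_state(lambda: next(fake_states)))  # Terminated
```

Passing other target states, such as `('Running', 'Terminated', 'Error')`, makes the same loop return as soon as the job has started.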

Postpone execution

Enqueue job, wait until enqueued, schedule for execution, wait until done and show output:

[10]:
if True:
    print("Enqueue job")
    job = definition(5).submit(hold=True)
    print(job)
    print("Wait until enqueued ...")
    job.wait(states='Hold')
    print(job)
    print("Schedule job")
    job.resume()
    print("Wait until finished ...")
    job.wait()
    print(job)
    if job.exit_code:
        print('Failed:\n{}'.format(job.stderr))
    else:
        print('Success:\n{}'.format(job.stdout))
    job.remove_logs()
Enqueue job
Job(1125516)
 name = test5
 project = oarpy
 state = Hold
 owner = denolf
 runtime = 0:00:00
Wait until enqueued ...
Job(1125516)
 name = test5
 project = oarpy
 state = Hold
 owner = denolf
 runtime = 0:00:00
Schedule job
Wait until finished ...
....................................
Job(1125516)
 name = test5
 project = oarpy
 state = Terminated
 owner = denolf
 runtime = 0:00:29
Success:
0
1
2
3
4

Suspend

Schedule job for execution, wait until running, suspend/resume, wait until done and show output:

[11]:
if True:
    print("Schedule job")
    job = definition(60).submit()
    print(job)
    print("Wait until started ...")
    job.wait(states=('Running', 'Terminated', 'Error'))
    print(job)
    print("Suspend job")
    try:
        job.suspend()
    except RuntimeError:
        print("This operation is currently not permitted")
    else:
        print("Wait until suspended ...")
        job.wait_needsresume(states=('Hold', 'Suspended'))
        print(job)
        print("Resume job")
        job.resume()
    print("Wait until finished ...")
    job.wait()
    print(job)
    if job.exit_code:
        print('Failed:\n{}'.format(job.stderr))
    else:
        print('Success:\n{}'.format(job.stdout))
    job.remove_logs()
Schedule job
Job(1125518)
 name = test60
 project = oarpy
 state = Waiting
 owner = denolf
 runtime = 0:00:00
Wait until started ...
.................
Job(1125518)
 name = test60
 project = oarpy
 state = Running
 owner = denolf
 runtime = 0:00:24.181162
Suspend job
ERROR:root:Cannot suspend job (Jobid=1125518,Error=1,EPERM)
/!\ Cannot hold 1125518 : the job is not in the right state (try '-r' option).

This operation is currently not permitted
Wait until finished ...
...........................................
Job(1125518)
 name = test60
 project = oarpy
 state = Terminated
 owner = denolf
 runtime = 0:01:23
Success:
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

Interrupt

Schedule job for execution, wait until running, interrupt, wait until done and show output:

[12]:
if True:
    print("Schedule job")
    job = definition(60).submit()
    print(job)
    print("Wait until started ...")
    job.wait(states=('Running', 'Terminated', 'Error'))
    sleep(5)
    print("Interrupt")
    job.interrupt()
    print("Wait until finished ...")
    job.wait()
    print(job)
    if job.exit_code:
        print('Failed:\n{}'.format(job.stderr))
    elif job.exit_code is None:
        print('Interrupted:\n{}'.format(job.stdout))
    else:
        print('Success:\n{}'.format(job.stdout))
    job.remove_logs()
Schedule job
Job(1125521)
 name = test60
 project = oarpy
 state = Waiting
 owner = denolf
 runtime = 0:00:00
Wait until started ...
.........................
Interrupt
Wait until finished ...
................
Job(1125521)
 name = test60
 project = oarpy
 state = Error
 owner = denolf
 runtime = 0:00:53
Interrupted:
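The branching on `exit_code` used in the cell above separates three outcomes: a non-zero code, no code at all, and zero. A small stand-alone sketch of that logic is shown below. The function name `classify` is illustrative, and reading a missing exit code as an interruption is an assumption taken from this example rather than a documented guarantee.

```python
def classify(exit_code):
    """Map a job exit code to a short outcome label."""
    if exit_code is None:
        # No exit code was recorded, e.g. the job was interrupted
        return 'Interrupted'
    if exit_code != 0:
        return 'Failed'
    return 'Success'

for code in (0, 1, None):
    print(code, classify(code))
```

Checking for `None` explicitly matters: a plain `if job.exit_code:` test treats `None` and `0` alike.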