OAR jobs from python¶
The library provides three main classes:
- oarpy.oarjob.Job: manage existing jobs (status, stop, suspend, resume)
- oarpy.oarjob.JobFactory: define and launch jobs (creates oarpy.oarjob.Job)
- oarpy.oarresource.Resource: optional OAR resources for JobFactory (nodes, cores, gpu)
[1]:
import logging
logging.basicConfig()
def debug(b):
if b:
logging.getLogger('oarpy').setLevel(logging.DEBUG)
else:
logging.getLogger('oarpy').setLevel(logging.INFO)
from time import sleep
from oarpy import oarjob
from oarpy.oarresource import Resource
Monitor jobs¶
Jobs can be monitored based on their job ID:
[2]:
# Invalid job ID
job = oarjob.Job(0)
assert(not job.exists)
# Valid job ID
job = oarjob.Job(1)
# Context manager is optional (reduces queries)
with job.fixed_stats():
if job.exists:
# Specific statistics
print(job['assigned_network_address'])
print(job['assigned_resources'])
print(job['walltime'])
# All statistics
for k,v in job.stats.items():
print('{}: {}'.format(k,v))
# Statistics exposed as attributes
job.is_finished
job.is_running
job.is_waiting
job.is_intermediate
job.needsresume
job.time_to_start
job.time_enqueued
job.time_scheduled
assert(job.status==job['state'])
[u'hpc2-0701']
[2353]
2:00:00
resubmit_job_id: 0
owner: forstner
submissionTime: 2018-08-16 10:16:00+02:00
message: R=1,W=2:0:0,J=I,Q=interactive (Karma=0.000)
jobType: INTERACTIVE
queue: interactive
launchingDirectory: /users/forstner
exit_code: None
properties: ((((desktop_computing = 'NO') AND gpu = 'NO') AND cluster = 'NICE') AND opsys = 'debian8') AND drain='NO'
state: Terminated
stopTime: 2018-08-16 10:16:11+02:00
job_user: forstner
assigned_network_address: [u'hpc2-0701']
walltime: 2:00:00
events: [{u'job_id': 1, u'event_id': u'1', u'date': 1534407371, u'type': u'SWITCH_INTO_TERMINATE_STATE', u'to_check': u'NO', u'description': u'[bipbip 1] Ask to change the job state'}]
array_index: 1
assigned_resources: [2353]
array_id: 1
dependencies: []
startTime: 2018-08-16 10:16:02+02:00
reservation: None
stdout_file: OAR.1.stdout
types: []
Job_Id: 1
cpuset_name: forstner_1
name: None
initial_request:
scheduledStart: None
wanted_resources: -l "{type = 'default'}/core=1,walltime=2:0:0"
project: default
stderr_file: OAR.1.stderr
command:
Jobs can be searched for based on date, name, project, owner and other properties:
[3]:
from oarpy import timeutils
end = timeutils.now()
start = timeutils.add(end,minutes=-10)
jobs = oarjob.search(start=start, end=end)
print(str(len(jobs))+' jobs found')
examples = {}
for job in jobs:
if job.status not in examples:
examples[job.status] = job
for job in examples.values():
with job.fixed_stats():
print(job)
print(' time_to_start: {}'.format(job.time_to_start))
print(' time_enqueued: {}'.format(job.time_enqueued))
print(' time_scheduled: {}'.format(job.time_scheduled))
print(' runtime: {}'.format(job.runtime))
15 jobs found
Job(1124788)
name = PyHST
project = default
state = Terminated
owner = in1081
runtime = 0:00:54
time_to_start: 0:00:00
time_enqueued: 1:46:34.100600
time_scheduled: 0:00:00
runtime: 0:00:54
Job(1125510)
name = None
project = default
state = Running
owner = bona
runtime = 0:02:28.569919
time_to_start: 0:00:00
time_enqueued: 0:00:25
time_scheduled: 0:00:00
runtime: 0:02:28.571296
Job(1122760)
name = None
project = default
state = Waiting
owner = in1096
runtime = 0:00:00
time_to_start: 15:25:03.953672
time_enqueued: 21:21:38
time_scheduled: 0:00:00
runtime: 0:00:00
Define jobs¶
The minimal job definition requires only a shell command
[4]:
jobdef = oarjob.JobFactory(command="ls")
print(jobdef)
-n OAR -O %jobname%.%jobid%.stdout -E %jobname%.%jobid%.stderr ls
A job can be identified by name and project
[5]:
jobdef = oarjob.JobFactory(name='test', project='oarpy', command='ls')
print(jobdef)
-n test --project oarpy -O %jobname%.%jobid%.stdout -E %jobname%.%jobid%.stderr ls
Working and log directories can be specified (current directory by default). These directories must exist.
[6]:
jobdef = oarjob.JobFactory(name='test', project='oarpy', command='ls',
working_directory='/tmp/oarpy', log_directory='/tmp/oarpy/log')
print(jobdef)
-n test --project oarpy -d /tmp/oarpy -O /tmp/oarpy/log/%jobname%.%jobid%.stdout -E /tmp/oarpy/log/%jobname%.%jobid%.stderr ls
A job definition can also specify the resources required to execute the job:
- nodes: number of nodes (default: 1)
- cpu: number of cpu’s per node (default: 1)
- core: number of cores per cpu (default: 1)
- gpu: boolean (default: False)
- mem_core_mb: minimal memory per core (default: 8000 MB)
- walltime: is a number (default: 2 hours) or a dictionary with at least one of keys “days, seconds, minutes, hours, weeks”
- custom properties: e.g. cpu_vendor=(‘=’,’INTEL’)
This starts \(\text{nodes}\times\text{cpu}\times\text{core}\) processes distributed over the specified nodes and cpu’s.
[7]:
resource = Resource(nodes=1,core=8,walltime={'minutes':1})
jobdef = oarjob.JobFactory(name='test', project='oarpy',
command='ls', resource=resource)
print(jobdef)
-n test --project oarpy -O %jobname%.%jobid%.stdout -E %jobname%.%jobid%.stderr -l nodes=1/core=8,walltime=00:01:00 ls
Launch jobs¶
Function to define test jobs
[8]:
def definition(seconds):
command = 'python -c "from time import sleep\nfor i in range({}):\n print(i)\n sleep(1)"'
resource = Resource(core=1,walltime={'seconds':seconds*3})
return oarjob.JobFactory(name='test{}'.format(seconds), project='oarpy',
resource=resource, command=command.format(seconds))
Immediate execution¶
Schedule job for execution, wait until done and show output:
[9]:
if True:
print("Schedule job")
job = definition(5).submit()
print(job)
print("Wait until finished ...")
job.wait()
print(job)
if job.exit_code:
print('Failed:\n{}'.format(job.stderr))
else:
print('Succes:\n{}'.format(job.stdout))
job.remove_logs()
Schedule job
Job(1125513)
name = test5
project = oarpy
state = Waiting
owner = denolf
runtime = 0:00:00
Wait until finished ...
...................................
Job(1125513)
name = test5
project = oarpy
state = Terminated
owner = denolf
runtime = 0:00:29
Succes:
0
1
2
3
4
Postpone execution¶
Enqueue job, wait until enqueued, schedule for execution, wait until done and show output:
[10]:
if True:
print("Enqueue job")
job = definition(5).submit(hold=True)
print(job)
print("Wait until enqueued ...")
job.wait(states='Hold')
print(job)
print("Schedule job")
job.resume()
print("Wait until finished ...")
job.wait()
print(job)
if job.exit_code:
print('Failed:\n{}'.format(job.stderr))
else:
print('Succes:\n{}'.format(job.stdout))
job.remove_logs()
Enqueue job
Job(1125516)
name = test5
project = oarpy
state = Hold
owner = denolf
runtime = 0:00:00
Wait until enqueued ...
Job(1125516)
name = test5
project = oarpy
state = Hold
owner = denolf
runtime = 0:00:00
Schedule job
Wait until finished ...
....................................
Job(1125516)
name = test5
project = oarpy
state = Terminated
owner = denolf
runtime = 0:00:29
Succes:
0
1
2
3
4
Suspend¶
Schedule job for execution, wait until running, suspend/resume, wait until done and show output:
[11]:
if True:
print("Schedule job")
job = definition(60).submit()
print(job)
print("Wait until started ...")
job.wait(states=('Running', 'Terminated', 'Error'))
print(job)
print("Suspend job")
try:
job.suspend()
except RuntimeError:
print("This operation is currently not permitted")
else:
print("Wait until suspended ...")
job.wait_needsresume(states=('Hold', 'Suspended'))
print(job)
print("Resume job")
job.resume()
print("Wait until finished ...")
job.wait()
print(job)
if job.exit_code:
print('Failed:\n{}'.format(job.stderr))
else:
print('Succes:\n{}'.format(job.stdout))
job.remove_logs()
Schedule job
Job(1125518)
name = test60
project = oarpy
state = Waiting
owner = denolf
runtime = 0:00:00
Wait until started ...
.................
Job(1125518)
name = test60
project = oarpy
state = Running
owner = denolf
runtime = 0:00:24.181162
Suspend job
ERROR:root:Cannot suspend job (Jobid=1125518,Error=1,EPERM)
/!\ Cannot hold 1125518 : the job is not in the right state (try '-r' option).
This operation is currently not permitted
Wait until finished ...
...........................................
Job(1125518)
name = test60
project = oarpy
state = Terminated
owner = denolf
runtime = 0:01:23
Succes:
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
Interrupt¶
Schedule job for execution, waiting until running, interrupt, wait until done and show output:
[12]:
if True:
print("Schedule job")
job = definition(60).submit()
print(job)
print("Wait until started ...")
job.wait(states=('Running', 'Terminated', 'Error'))
sleep(5)
print("Interrupt")
job.interrupt()
print("Wait until finished ...")
job.wait()
print(job)
if job.exit_code:
print('Failed:\n{}'.format(job.stderr))
elif job.exit_code is None:
print('Interrupted:\n{}'.format(job.stdout))
else:
print('Succes:\n{}'.format(job.stdout))
job.remove_logs()
Schedule job
Job(1125521)
name = test60
project = oarpy
state = Waiting
owner = denolf
runtime = 0:00:00
Wait until started ...
.........................
Interrupt
Wait until finished ...
................
Job(1125521)
name = test60
project = oarpy
state = Error
owner = denolf
runtime = 0:00:53
Interrupted: