{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# OAR jobs from python\n", "\n", "The library provides three main classes:\n", "\n", " * oarpy.oarjob.Job: manage existing jobs (status, stop, suspend, resume)\n", " * oarpy.oarjob.JobFactory: define and launch jobs (creates oarpy.oarjob.Job)\n", " * oarpy.oarresource.Resource: optional OAR resources for JobFactory (nodes, cores, gpu)" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import logging\n", "logging.basicConfig()\n", "def debug(b):\n", " if b:\n", " logging.getLogger('oarpy').setLevel(logging.DEBUG)\n", " else:\n", " logging.getLogger('oarpy').setLevel(logging.INFO)\n", "\n", "from time import sleep\n", "from oarpy import oarjob\n", "from oarpy.oarresource import Resource" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Monitor jobs" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Jobs can be monitored based on their job ID:" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[u'hpc2-0701']\n", "[2353]\n", "2:00:00\n", "resubmit_job_id: 0\n", "owner: forstner\n", "submissionTime: 2018-08-16 10:16:00+02:00\n", "message: R=1,W=2:0:0,J=I,Q=interactive (Karma=0.000)\n", "jobType: INTERACTIVE\n", "queue: interactive\n", "launchingDirectory: /users/forstner\n", "exit_code: None\n", "properties: ((((desktop_computing = 'NO') AND gpu = 'NO') AND cluster = 'NICE') AND opsys = 'debian8') AND drain='NO'\n", "state: Terminated\n", "stopTime: 2018-08-16 10:16:11+02:00\n", "job_user: forstner\n", "assigned_network_address: [u'hpc2-0701']\n", "walltime: 2:00:00\n", "events: [{u'job_id': 1, u'event_id': u'1', u'date': 1534407371, u'type': u'SWITCH_INTO_TERMINATE_STATE', u'to_check': u'NO', u'description': u'[bipbip 1] Ask to change the job state'}]\n", "array_index: 1\n", "assigned_resources: [2353]\n", "array_id: 1\n", "dependencies: []\n", 
"startTime: 2018-08-16 10:16:02+02:00\n", "reservation: None\n", "stdout_file: OAR.1.stdout\n", "types: []\n", "Job_Id: 1\n", "cpuset_name: forstner_1\n", "name: None\n", "initial_request: \n", "scheduledStart: None\n", "wanted_resources: -l \"{type = 'default'}/core=1,walltime=2:0:0\" \n", "project: default\n", "stderr_file: OAR.1.stderr\n", "command: \n" ] } ], "source": [ "# Invalid job ID\n", "job = oarjob.Job(0)\n", "assert(not job.exists)\n", "\n", "# Valid job ID\n", "job = oarjob.Job(1)\n", "\n", "# Context manager is optional (reduces queries)\n", "with job.fixed_stats():\n", " if job.exists:\n", " # Specific statistics\n", " print(job['assigned_network_address'])\n", " print(job['assigned_resources'])\n", " print(job['walltime'])\n", "\n", " # All statistics\n", " for k,v in job.stats.items():\n", " print('{}: {}'.format(k,v))\n", "\n", " # Statistics exposed as attributes\n", " job.is_finished\n", " job.is_running\n", " job.is_waiting\n", " job.is_intermediate\n", " job.needsresume\n", " job.time_to_start\n", " job.time_enqueued\n", " job.time_scheduled\n", " assert(job.status==job['state'])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Jobs can be searched for based on date, name, project, owner and other properties:" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "15 jobs found\n", "Job(1124788)\n", " name = PyHST\n", " project = default\n", " state = Terminated\n", " owner = in1081\n", " runtime = 0:00:54\n", " time_to_start: 0:00:00\n", " time_enqueued: 1:46:34.100600\n", " time_scheduled: 0:00:00\n", " runtime: 0:00:54\n", "Job(1125510)\n", " name = None\n", " project = default\n", " state = Running\n", " owner = bona\n", " runtime = 0:02:28.569919\n", " time_to_start: 0:00:00\n", " time_enqueued: 0:00:25\n", " time_scheduled: 0:00:00\n", " runtime: 0:02:28.571296\n", "Job(1122760)\n", " name = None\n", " project = default\n", " state = Waiting\n", 
" owner = in1096\n", " runtime = 0:00:00\n", " time_to_start: 15:25:03.953672\n", " time_enqueued: 21:21:38\n", " time_scheduled: 0:00:00\n", " runtime: 0:00:00\n" ] } ], "source": [ "from oarpy import timeutils\n", "\n", "end = timeutils.now()\n", "start = timeutils.add(end,minutes=-10)\n", "jobs = oarjob.search(start=start, end=end)\n", "\n", "print(str(len(jobs))+' jobs found')\n", "\n", "examples = {}\n", "for job in jobs:\n", " if job.status not in examples:\n", " examples[job.status] = job\n", "\n", "for job in examples.values():\n", " with job.fixed_stats():\n", " print(job)\n", " print(' time_to_start: {}'.format(job.time_to_start))\n", " print(' time_enqueued: {}'.format(job.time_enqueued))\n", " print(' time_scheduled: {}'.format(job.time_scheduled))\n", " print(' runtime: {}'.format(job.runtime))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Define jobs\n", "The minimal job definition requires only a shell command" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "-n OAR -O %jobname%.%jobid%.stdout -E %jobname%.%jobid%.stderr ls\n" ] } ], "source": [ "jobdef = oarjob.JobFactory(command=\"ls\")\n", "print(jobdef)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A job can be identified by name and project" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "-n test --project oarpy -O %jobname%.%jobid%.stdout -E %jobname%.%jobid%.stderr ls\n" ] } ], "source": [ "jobdef = oarjob.JobFactory(name='test', project='oarpy', command='ls')\n", "print(jobdef)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Working and log directories can be specified (current directory by default). These directories must exist." 
] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "-n test --project oarpy -d /tmp/oarpy -O /tmp/oarpy/log/%jobname%.%jobid%.stdout -E /tmp/oarpy/log/%jobname%.%jobid%.stderr ls\n" ] } ], "source": [ "jobdef = oarjob.JobFactory(name='test', project='oarpy', command='ls',\n", " working_directory='/tmp/oarpy', log_directory='/tmp/oarpy/log')\n", "print(jobdef)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A job definition can also specify the resources required to execute the job:\n", "\n", "* nodes: number of nodes (default: 1)\n", "* cpu: number of CPUs per node (default: 1)\n", "* core: number of cores per CPU (default: 1)\n", "* gpu: boolean (default: False)\n", "* mem_core_mb: minimum memory per core (default: 8000 MB)\n", "* walltime: a number (default: 2 hours) or a dictionary with at least one of the keys \"days, seconds, minutes, hours, weeks\"\n", "* custom properties: e.g. cpu_vendor=('=','INTEL')\n", "\n", "This starts $\\text{nodes}\\times\\text{cpu}\\times\\text{core}$ processes distributed over the specified nodes and CPUs." 
] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "-n test --project oarpy -O %jobname%.%jobid%.stdout -E %jobname%.%jobid%.stderr -l nodes=1/core=8,walltime=00:01:00 ls\n" ] } ], "source": [ "resource = Resource(nodes=1,core=8,walltime={'minutes':1})\n", "jobdef = oarjob.JobFactory(name='test', project='oarpy',\n", " command='ls', resource=resource)\n", "print(jobdef)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Launch jobs\n", "Function to define test jobs" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "def definition(seconds):\n", " command = 'python -c \"from time import sleep\\nfor i in range({}):\\n print(i)\\n sleep(1)\"'\n", " resource = Resource(core=1,walltime={'seconds':seconds*3})\n", " return oarjob.JobFactory(name='test{}'.format(seconds), project='oarpy',\n", " resource=resource, command=command.format(seconds))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Immediate execution\n", "Schedule job for execution, wait until done and show output:" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Schedule job\n", "Job(1125513)\n", " name = test5\n", " project = oarpy\n", " state = Waiting\n", " owner = denolf\n", " runtime = 0:00:00\n", "Wait until finished ...\n", "...................................\n", "Job(1125513)\n", " name = test5\n", " project = oarpy\n", " state = Terminated\n", " owner = denolf\n", " runtime = 0:00:29\n", "Succes:\n", "0\n", "1\n", "2\n", "3\n", "4\n", "\n" ] } ], "source": [ "if True:\n", " print(\"Schedule job\")\n", " job = definition(5).submit()\n", " print(job)\n", " print(\"Wait until finished ...\")\n", " job.wait()\n", " print(job)\n", " if job.exit_code:\n", " print('Failed:\\n{}'.format(job.stderr))\n", " else:\n", " print('Succes:\\n{}'.format(job.stdout))\n", " 
job.remove_logs()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Postpone execution\n", "\n", "Enqueue job, wait until enqueued, schedule for execution, wait until done and show output:" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Enqueue job\n", "Job(1125516)\n", " name = test5\n", " project = oarpy\n", " state = Hold\n", " owner = denolf\n", " runtime = 0:00:00\n", "Wait until enqueued ...\n", "Job(1125516)\n", " name = test5\n", " project = oarpy\n", " state = Hold\n", " owner = denolf\n", " runtime = 0:00:00\n", "Schedule job\n", "Wait until finished ...\n", "....................................\n", "Job(1125516)\n", " name = test5\n", " project = oarpy\n", " state = Terminated\n", " owner = denolf\n", " runtime = 0:00:29\n", "Succes:\n", "0\n", "1\n", "2\n", "3\n", "4\n", "\n" ] } ], "source": [ "if True:\n", " print(\"Enqueue job\")\n", " job = definition(5).submit(hold=True)\n", " print(job)\n", " print(\"Wait until enqueued ...\")\n", " job.wait(states='Hold')\n", " print(job)\n", " print(\"Schedule job\")\n", " job.resume()\n", " print(\"Wait until finished ...\")\n", " job.wait()\n", " print(job)\n", " if job.exit_code:\n", " print('Failed:\\n{}'.format(job.stderr))\n", " else:\n", " print('Succes:\\n{}'.format(job.stdout))\n", " job.remove_logs()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Suspend\n", "Schedule job for execution, wait until running, suspend/resume, wait until done and show output:" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Schedule job\n", "Job(1125518)\n", " name = test60\n", " project = oarpy\n", " state = Waiting\n", " owner = denolf\n", " runtime = 0:00:00\n", "Wait until started ...\n", ".................\n", "Job(1125518)\n", " name = test60\n", " project = oarpy\n", " state = Running\n", " owner = denolf\n", 
" runtime = 0:00:24.181162\n", "Suspend job\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "ERROR:root:Cannot suspend job (Jobid=1125518,Error=1,EPERM)\n", "/!\\ Cannot hold 1125518 : the job is not in the right state (try '-r' option).\n", "\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "This operation is currently not permitted\n", "Wait until finished ...\n", "...........................................\n", "Job(1125518)\n", " name = test60\n", " project = oarpy\n", " state = Terminated\n", " owner = denolf\n", " runtime = 0:01:23\n", "Succes:\n", "0\n", "1\n", "2\n", "3\n", "4\n", "5\n", "6\n", "7\n", "8\n", "9\n", "10\n", "11\n", "12\n", "13\n", "14\n", "15\n", "16\n", "17\n", "18\n", "19\n", "20\n", "21\n", "22\n", "23\n", "24\n", "25\n", "26\n", "27\n", "28\n", "29\n", "30\n", "31\n", "32\n", "33\n", "34\n", "35\n", "36\n", "37\n", "38\n", "39\n", "40\n", "41\n", "42\n", "43\n", "44\n", "45\n", "46\n", "47\n", "48\n", "49\n", "50\n", "51\n", "52\n", "53\n", "54\n", "55\n", "56\n", "57\n", "58\n", "59\n", "\n" ] } ], "source": [ "if True:\n", " print(\"Schedule job\")\n", " job = definition(60).submit()\n", " print(job)\n", " print(\"Wait until started ...\")\n", " job.wait(states=('Running', 'Terminated', 'Error'))\n", " print(job)\n", " print(\"Suspend job\")\n", " try:\n", " job.suspend()\n", " except RuntimeError:\n", " print(\"This operation is currently not permitted\")\n", " else:\n", " print(\"Wait until suspended ...\")\n", " job.wait_needsresume(states=('Hold', 'Suspended'))\n", " print(job)\n", " print(\"Resume job\")\n", " job.resume()\n", " print(\"Wait until finished ...\")\n", " job.wait()\n", " print(job)\n", " if job.exit_code:\n", " print('Failed:\\n{}'.format(job.stderr))\n", " else:\n", " print('Succes:\\n{}'.format(job.stdout))\n", " job.remove_logs()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Interrupt\n", "\n", "Schedule job for execution, wait until running, interrupt, wait 
until done and show output:" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Schedule job\n", "Job(1125521)\n", " name = test60\n", " project = oarpy\n", " state = Waiting\n", " owner = denolf\n", " runtime = 0:00:00\n", "Wait until started ...\n", ".........................\n", "Interrupt\n", "Wait until finished ...\n", "................\n", "Job(1125521)\n", " name = test60\n", " project = oarpy\n", " state = Error\n", " owner = denolf\n", " runtime = 0:00:53\n", "Interrupted:\n", "\n" ] } ], "source": [ "if True:\n", " print(\"Schedule job\")\n", " job = definition(60).submit()\n", " print(job)\n", " print(\"Wait until started ...\")\n", " job.wait(states=('Running', 'Terminated', 'Error'))\n", " sleep(5)\n", " print(\"Interrupt\")\n", " job.interrupt()\n", " print(\"Wait until finished ...\")\n", " job.wait()\n", " print(job)\n", " if job.exit_code:\n", " print('Failed:\\n{}'.format(job.stderr))\n", " elif job.exit_code is None:\n", " print('Interrupted:\\n{}'.format(job.stdout))\n", " else:\n", " print('Succes:\\n{}'.format(job.stdout))\n", " job.remove_logs()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 2", "language": "python", "name": "python2" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.9" } }, "nbformat": 4, "nbformat_minor": 2 }