What is the runtime?

The omega-ml runtime provides distributed, highly scalable execution of all omega-ml functionality. It is a distributed programming and deployment environment based on the widely used Celery library.

Key functionality includes:

  • distributed model training, scoring and prediction

  • tracking of experiment metrics across a distributed set of workers

  • straightforward logging from any workload, including models, Python scripts and notebooks

  • parallel, map-reduce and sequenced execution of data and model pipelines

  • easy switching of workers without changing your code (e.g. local, remote cpu, remote gpu, any cloud)

For Jupyter notebooks, the runtime provides additional functionality:

  • easy scheduling of notebooks

  • transparent results logging and status tracking

  • parallel execution using multiprocessing.Pool.map() semantics, including automated restarts of failed partial tasks

Concepts

  • workers - a worker is a compute server that waits to receive commands. Every worker responds to one or more labels, where a label specifies the worker’s capabilities (e.g. whether it provides cpu or gpu resources)

  • tasks - tasks describe the specific action to execute, e.g. fit a model, predict from a model, run a script, run a notebook

  • object references - references link tasks to models, datasets and other objects accessible via omega-ml’s Metadata (see the sketch below)
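Object references are simply the stored names of objects. A minimal sketch of how such references come to exist, assuming scikit-learn and pandas are installed (om.models.put and om.datasets.put are omega-ml’s storage APIs):

import pandas as pd
from sklearn.linear_model import LinearRegression

# store two datasets and a model; the runtime can then
# reference them by name from any worker
om.datasets.put(pd.DataFrame({'x': range(10)}), 'data-X')
om.datasets.put(pd.DataFrame({'y': range(10)}), 'data-Y')
om.models.put(LinearRegression(), 'mymodel')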

For example, to fit a model, we can use the omega-ml runtime as follows. This sends the task "fit the model mymodel using datasets data-X and data-Y" to the default runtime worker and waits for the task to complete.

# mymodel is a stored model, data-X and data-Y are stored datasets
result = om.runtime.model('mymodel').fit('data-X', 'data-Y')
result.get()  # waits for the task to complete and returns its result

Running tasks

The runtime provides built-in tasks for models, jobs (notebooks) and scripts. In general the syntax follows the pattern om.runtime.<kind>(name).<action>. Every task returns an AsyncResult object, which is a reference to the result of the task execution on a remote worker, as described in Asynchronous execution. To get the actual result, call result.get().

result = om.runtime.model('mymodel').fit(*args, **kwargs)
result = om.runtime.script('myscript').run(*args, **kwargs)
result = om.runtime.job('mynotebook').run(*args, **kwargs)
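For example, to run a script and retrieve its output (the same pattern applies to models and jobs):

result = om.runtime.script('myscript').run()
output = result.get()  # blocks until the remote worker has finished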

Submitting tasks to specific workers

Every worker is assigned one or more labels. A label is just an arbitrary name, but it should signify the worker’s capabilities, e.g. cpu or gpu. The default label is default.

The list of available workers and their labels can be retrieved by running

om.runtime.labels()
=>
{'celery@eowyn': ['default']}

Tasks can be submitted to a specific worker by specifying .require(label) just before the actual call:

om.runtime.require('gpu').model('mymodel').fit(*args, **kwargs)
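Since om.runtime.labels() reports which labels each worker responds to, tasks can be routed defensively. A small sketch, using only the calls shown above, that falls back to the default label when no gpu worker is online:

# route to a gpu worker if one is available, else fall back to default
all_labels = set(l for labels in om.runtime.labels().values() for l in labels)
label = 'gpu' if 'gpu' in all_labels else 'default'
result = om.runtime.require(label).model('mymodel').fit('data-X', 'data-Y')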

Asynchronous execution

All runtime tasks are run asynchronously. This means that any task submitted to a runtime worker is put into a waiting queue until a worker can respond to it. The immediate result returned by a call to the runtime is a reference to the task, also known as a promise, i.e. a reference to a future result.

result = om.runtime.model('mymodel').predict('new-X')
type(result)
=> <AsyncResult: eda3b2f9-f675-4690-8303-2a944783147c>

We can check the execution state by looking at result.status. The possible states are PENDING, STARTED, SUCCESS or FAILURE.

result.status
=>
PENDING
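Using these states, a simple polling loop can wait for the task to reach a final state (a sketch; calling result.get(), shown below, is usually simpler):

import time

# poll until the task has either succeeded or failed
while result.status not in ('SUCCESS', 'FAILURE'):
    time.sleep(1)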

To wait for the task to complete and get back the actual result, call result.get():

result.get()
=>
[5, 10, 11, 15]
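Since the runtime is based on Celery, result.get() follows Celery’s AsyncResult semantics. In particular, a timeout can be passed to avoid waiting indefinitely:

# wait at most 30 seconds; raises celery.exceptions.TimeoutError
# if the task has not completed by then
prediction = result.get(timeout=30)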

Parallel and pipelined task execution

The runtime is built for horizontal scalability, which means it can process many tasks in parallel. One easy way to submit tasks in parallel is to call the runtime in a loop. One caveat is that we need to keep track of every result’s status.

results = []
for i in range(5):
    result = om.runtime.job(f'myjob{i}').run(i)
    results.append(result)

# busy-wait until every task has reached a final state
done = False
while not done:
    done = all(r.status in ('SUCCESS', 'FAILURE') for r in results)

print(results)
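Alternatively, since result.get() blocks until the task completes, waiting on each result in turn avoids the polling loop (a minimal sketch):

results = [om.runtime.job(f'myjob{i}').run(i) for i in range(5)]
outputs = [r.get() for r in results]  # blocks until each task completes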

omega-ml provides easier semantics for the three typical ways of running many tasks:

  • sequence - run tasks in a given sequence

  • parallel - run tasks in parallel, independent of sequence

  • mapreduce - run many tasks in parallel, combine results in a last step

Running many tasks in sequence

sequence() runs tasks in sequence, forwarding results from the previous task to the next.

with om.runtime.sequence() as crt:
    for i in range(5):
        om.runtime.job(f'myjob{i}').run(i)
    result = crt.run()
result.getall()
=>
['<Metadata: Metadata(name=results/myjob_2021-11-25 14:02:14.690974.ipynb,bucket=omegaml,prefix=jobs/,kind=script.ipynb,created=2021-11-25 14:02:15.814244)>',
 '<Metadata: Metadata(name=results/myjob_2021-11-25 14:02:12.101315.ipynb,bucket=omegaml,prefix=jobs/,kind=script.ipynb,created=2021-11-25 14:02:13.185000)>',
 '<Metadata: Metadata(name=results/myjob_2021-11-25 14:02:13.247192.ipynb,bucket=omegaml,prefix=jobs/,kind=script.ipynb,created=2021-11-25 14:02:14.605521)>',
 '<Metadata: Metadata(name=results/myjob_2021-11-25 14:02:15.899301.ipynb,bucket=omegaml,prefix=jobs/,kind=script.ipynb,created=2021-11-25 14:02:17.157619)>',
 '<Metadata: Metadata(name=results/myjob_2021-11-25 14:02:10.690568.ipynb,bucket=omegaml,prefix=jobs/,kind=script.ipynb,created=2021-11-25 14:02:12.037948)>']

Running many tasks in parallel

parallel() runs many tasks in parallel.

with om.runtime.parallel() as crt:
    for i in range(5):
        om.runtime.job(f'myjob{i}').run(i)
    result = crt.run()
result.getall()
=>
['<Metadata: Metadata(name=results/myjob_2021-11-25 14:02:14.690974.ipynb,bucket=omegaml,prefix=jobs/,kind=script.ipynb,created=2021-11-25 14:02:15.814244)>',
 '<Metadata: Metadata(name=results/myjob_2021-11-25 14:02:12.101315.ipynb,bucket=omegaml,prefix=jobs/,kind=script.ipynb,created=2021-11-25 14:02:13.185000)>',
 '<Metadata: Metadata(name=results/myjob_2021-11-25 14:02:13.247192.ipynb,bucket=omegaml,prefix=jobs/,kind=script.ipynb,created=2021-11-25 14:02:14.605521)>',
 '<Metadata: Metadata(name=results/myjob_2021-11-25 14:02:15.899301.ipynb,bucket=omegaml,prefix=jobs/,kind=script.ipynb,created=2021-11-25 14:02:17.157619)>',
 '<Metadata: Metadata(name=results/myjob_2021-11-25 14:02:10.690568.ipynb,bucket=omegaml,prefix=jobs/,kind=script.ipynb,created=2021-11-25 14:02:12.037948)>']

Running many tasks to combine results (mapreduce)

mapreduce() runs tasks in parallel, except for the last one. The last task waits for all parallel tasks to complete and then runs, combining their results.

with om.runtime.mapreduce() as crt:
    for i in range(5):
        om.runtime.job(f'myjob{i}').run(i)
    result = crt.run()
result.collect()
=>
{<AsyncResult: 33e7baf0-1905-4ff4-aecb-8d6ee43fd9b1>,
 <GroupResult: a42f9d7b-3c36-456c-9f13-23acda3c1ae0 [b6f67c16-3a98-474c-b97a-544b7bb20291,
  fa95a436-fc70-46ad-97ee-a31a9a3a5720, 78e060e8-59d9-4244-a2db-10f18a49a0c0,
  541761d9-6c56-4a9e-a815-3959b01609ae]>}
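By these semantics, the last task submitted inside the context acts as the reduce step. A sketch to illustrate; the combine notebook is hypothetical and stands for any job that aggregates the results of the previous tasks:

with om.runtime.mapreduce() as crt:
    for i in range(5):
        om.runtime.job(f'myjob{i}').run(i)  # map: run in parallel
    om.runtime.job('combine').run()  # reduce: runs last, combines results
    result = crt.run()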