Mixins¶
Mixins come in several flavors:
OmegaStore mixins, enabling data pre-/post-processing
MDataFrame mixins, enabling custom operations on lazy-evaluation dataframes
OmegaModelProxy mixins, enabling tasks to run on the compute cluster
MDataFrame, MSeries mixins, enabling custom operations on MDataFrame and MSeries objects
ApplyContext mixins, enabling custom operations in apply() contexts
Storage mixins¶
Storage mixins typically override the get and put methods to extend the functionality of backends.
Suppose users want to store plain-text YAML documents, which none of the
existing backends support natively. The default backend does support storing
Python dictionaries, so we could ask users to convert their YAML documents to
Python dictionaries first, and then use om.datasets.put to store the object.
As a convenience to users, we provide this conversion in a storage mixin:
import yaml

class YamlDataMixin(object):
    def put(self, obj, name, attributes=None, **kwargs):
        attributes = attributes or {}
        try:
            # parse the YAML text into Python objects
            obj = yaml.safe_load(obj)
        except Exception:
            pass  # assume obj was some other valid type
        else:
            attributes['as_yaml'] = True
        # call the default implementation
        return super(YamlDataMixin, self).put(obj, name, attributes=attributes,
                                              **kwargs)

    def get(self, name, **kwargs):
        meta = self.metadata(name)
        data = super(YamlDataMixin, self).get(name, **kwargs)
        if meta.attributes.get('as_yaml'):
            # convert the stored object back to YAML text
            data = yaml.safe_dump(data)
        return data
To enable this mixin, call om.datasets.register_mixin:
# on startup
om.datasets.register_mixin(YamlDataMixin)
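The following is a minimal sketch of how registering a mixin on a live store object could work. It does not show omega|ml's actual implementation, which may differ. MiniStore, its register_mixin method, and StripWhitespaceMixin are hypothetical names invented for this illustration. The sketch builds a new class with the mixin placed ahead of the store's current class, then switches the instance over to it.

```python
class MiniStore:
    """Hypothetical stand-in for a store; not omega|ml's OmegaStore."""

    def __init__(self):
        self._data = {}

    def put(self, obj, name, **kwargs):
        self._data[name] = obj
        return name

    def get(self, name, **kwargs):
        return self._data[name]

    def register_mixin(self, mixin):
        # Assumption: registration rebuilds the instance's class so that the
        # mixin comes first in the method resolution order (MRO).
        current = type(self)
        if not issubclass(current, mixin):
            self.__class__ = type(current.__name__, (mixin, current), {})
        return self


class StripWhitespaceMixin:
    """Hypothetical mixin: trims strings before they are stored."""

    def put(self, obj, name, **kwargs):
        if isinstance(obj, str):
            obj = obj.strip()
        # hand over to the next put() in the MRO, here MiniStore.put
        return super().put(obj, name, **kwargs)


datasets = MiniStore()
datasets.put('  before  ', 'a')                 # stored unchanged
datasets.register_mixin(StripWhitespaceMixin)
datasets.put('  after  ', 'b')                  # stored trimmed
```

In this sketch, only objects stored after registration pass through the mixin, which is why registration belongs in startup code.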
Note
Celery clusters require that the module providing YamlDataMixin is available on both the client and the worker instance. This limitation is planned to be removed in future versions of omega|ml using ccbackend, which allows arbitrary functions to be executed on a Celery cluster. Dask Distributed clusters do not have this limitation.
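To try the put/get override pattern without a running omega|ml instance, here is a self-contained sketch. It makes two substitutions: the standard-library json module replaces YAML so that no third-party package is needed, and a hypothetical in-memory DictStore stands in for the storage backend. The names DictStore, JsonDataMixin, JsonStore, and the as_json attribute are assumptions made for illustration only.

```python
import json


class DictStore:
    """Hypothetical in-memory stand-in for a storage backend."""

    def __init__(self):
        self._data = {}
        self._attributes = {}

    def put(self, obj, name, attributes=None, **kwargs):
        self._data[name] = obj
        self._attributes[name] = dict(attributes or {})
        return name

    def get(self, name, **kwargs):
        return self._data[name]

    def attributes(self, name):
        return self._attributes[name]


class JsonDataMixin:
    """Same override pattern as a YAML mixin, using JSON text instead."""

    def put(self, obj, name, attributes=None, **kwargs):
        attributes = dict(attributes or {})
        if isinstance(obj, str):
            try:
                obj = json.loads(obj)
            except ValueError:
                pass  # not JSON text, store the string unchanged
            else:
                attributes['as_json'] = True
        # call the next put() in the MRO, here DictStore.put
        return super().put(obj, name, attributes=attributes, **kwargs)

    def get(self, name, **kwargs):
        data = super().get(name, **kwargs)
        if self.attributes(name).get('as_json'):
            # convert the stored object back to JSON text
            data = json.dumps(data, sort_keys=True)
        return data


class JsonStore(JsonDataMixin, DictStore):
    """The mixin is listed first so its put/get run before the backend's."""


store = JsonStore()
store.put('{"b": 2, "a": 1}', 'config')   # parsed and stored as a dict
store.put([1, 2, 3], 'plain')             # not a string, stored unchanged
roundtrip = store.get('config')           # JSON text again
plain = store.get('plain')
```

The flag stored in the attributes is what lets get decide whether to convert back to text, so objects stored without the conversion are returned untouched.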
Runtime mixins¶
Runtime mixins provide client-side extensions to om.runtime, specifically
to OmegaModelProxy. OmegaModelProxy is responsible for submitting
user-requested functions to the compute cluster.
Suppose users want to run a cross-validation procedure in a particular way that the default runtime does not support. While they could use a job (notebook) to accomplish this, we provide a runtime mixin as a convenience.
# in crossvalidate.py
class CrossValidationMixin(object):
    def cross_validate(self, modelName, Xname, Yname, *args, **kwargs):
        # get the cross validation task
        task = self.runtime.task('custom.tasks.cross_validate')
        # submit the task to the compute cluster
        return task.delay(modelName, Xname, Yname, *args, **kwargs)

# in custom.tasks
import omegaml as om

def cross_validate(modelName, Xname, Yname, *args, **kwargs):
    # get model and data
    model = om.models.get(modelName)
    xdata = om.datasets.get(Xname)
    ydata = om.datasets.get(Yname)
    # perform cross validation
    results = ...
    # return the results to the caller
    return results
To enable this mixin, add the class to om.defaults.OMEGA_RUNTIME_MIXINS:
OMEGA_RUNTIME_MIXINS = [
    'crossvalidate.CrossValidationMixin',
]
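The setting lists mixins as dotted-path strings rather than class objects. As a rough sketch of how such strings can be turned into classes and combined with a base class, the example below uses the standard library's importlib. The helper names resolve_dotted_path and apply_mixins and the class Base are hypothetical, and omega|ml's own loader may work differently. The standard library's socketserver.ThreadingMixIn serves as the stand-in mixin because it can be imported anywhere.

```python
import importlib


def resolve_dotted_path(path):
    """Import 'package.module.ClassName' and return the named attribute."""
    module_name, _, attr = path.rpartition('.')
    module = importlib.import_module(module_name)
    return getattr(module, attr)


def apply_mixins(base, mixin_paths):
    """Return a subclass of base with the listed mixins ahead of it in the MRO."""
    mixins = tuple(resolve_dotted_path(p) for p in mixin_paths)
    return type(base.__name__, mixins + (base,), {})


class Base:
    """Hypothetical stand-in for the class being extended."""


# Stand-in for a settings list such as the one shown above
MIXINS = [
    'socketserver.ThreadingMixIn',
]

Composed = apply_mixins(Base, MIXINS)
```

Resolving the path requires the module to be importable in the process that reads the setting, which is the same constraint the note below describes for custom.tasks.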
Note
Celery clusters require that the custom.tasks module is available on both the client and the worker instance. This limitation is planned to be removed in future versions of omega|ml using ccbackend, which allows arbitrary functions to be executed on a Celery cluster. Dask Distributed clusters do not have this limitation.
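The delegation in CrossValidationMixin can be exercised locally with stand-in objects, as in the sketch below. EagerTask, LocalRuntime, ModelProxy, and fake_cross_validate are hypothetical and are not part of omega|ml. In particular, delay here runs the function immediately and returns its value, whereas a real cluster would typically return a handle to a pending result.

```python
class EagerTask:
    """Hypothetical task handle that runs the function immediately."""

    def __init__(self, func):
        self.func = func

    def delay(self, *args, **kwargs):
        # A real cluster would queue the call; this stand-in just runs it.
        return self.func(*args, **kwargs)


class LocalRuntime:
    """Hypothetical runtime that maps task names to local functions."""

    def __init__(self):
        self.tasks = {}

    def task(self, name):
        return EagerTask(self.tasks[name])


class ModelProxy:
    """Hypothetical stand-in for OmegaModelProxy."""

    def __init__(self, runtime):
        self.runtime = runtime


class CrossValidationMixin:
    def cross_validate(self, modelName, Xname, Yname, *args, **kwargs):
        # look up the task by name and submit it with the given arguments
        task = self.runtime.task('custom.tasks.cross_validate')
        return task.delay(modelName, Xname, Yname, *args, **kwargs)


def fake_cross_validate(modelName, Xname, Yname, *args, **kwargs):
    # Placeholder task: echoes its inputs instead of fitting a model.
    return {'model': modelName, 'X': Xname, 'Y': Yname, 'options': kwargs}


class Proxy(CrossValidationMixin, ModelProxy):
    """Proxy class with the mixin applied."""


runtime = LocalRuntime()
runtime.tasks['custom.tasks.cross_validate'] = fake_cross_validate
proxy = Proxy(runtime)
result = proxy.cross_validate('mymodel', 'features', 'labels', folds=5)
```

The mixin itself only forwards names, not data: the model and datasets are looked up by the task on the worker side, which keeps the client call lightweight.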