Mixins

Mixins come in several flavors:

  • OmegaStore mixins, enabling data pre- and post-processing
  • MDataFrame mixins, enabling custom operations on lazy-evaluation dataframes
  • OmegaModelProxy mixins, enabling tasks to run on the compute cluster
  • MDataFrame, MSeries mixins, enabling custom operations on MDataFrame and MSeries objects
  • ApplyContext mixins, enabling custom operations in apply() contexts

Storage mixins

Storage mixins typically override the get and put methods to extend the functionality of backends.

Suppose users want to store plain-text YAML documents, which none of the existing backends supports natively. The default backend does support storing Python dictionaries, so we could ask users to convert their YAML documents to Python dictionaries first and then store them with om.datasets.put.

As a convenience to users, we provide this conversion in a storage mixin:

import yaml


class YamlDataMixin(object):
    def put(self, obj, name, attributes=None, **kwargs):
        attributes = attributes or {}
        if isinstance(obj, str):
            try:
                # parse the YAML text into Python objects (e.g. a dict)
                obj = yaml.safe_load(obj)
            except yaml.YAMLError:
                pass  # not valid YAML, store the string as-is
            else:
                attributes['as_yaml'] = True
        # call the default implementation
        return super(YamlDataMixin, self).put(obj, name, attributes=attributes,
                                              **kwargs)

    def get(self, name, **kwargs):
        meta = self.metadata(name)
        data = super(YamlDataMixin, self).get(name, **kwargs)
        if meta.attributes.get('as_yaml'):
            # convert the stored object back to YAML text
            data = yaml.safe_dump(data)
        return data

To enable this mixin, call om.datasets.register_mixin:

# on startup
import omegaml as om
om.datasets.register_mixin(YamlDataMixin)
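Conceptually, enabling a mixin means placing it ahead of the store's own class in the method resolution order (MRO), so that super() inside the mixin reaches the default implementation. The sketch below shows one way to do this at runtime. `register_mixin`, `BaseStore` and `CallLogMixin` are hypothetical toy names; this illustrates the general technique and is an assumption, not omega|ml's actual implementation of om.datasets.register_mixin.

```python
class BaseStore(object):
    """Toy store standing in for a default backend."""

    def __init__(self):
        self._data = {}

    def put(self, obj, name, **kwargs):
        self._data[name] = obj
        return name

    def get(self, name, **kwargs):
        return self._data.get(name)


class CallLogMixin(object):
    """Toy mixin that records every put/get before delegating via super()."""

    def put(self, obj, name, **kwargs):
        self.calls = getattr(self, 'calls', []) + [('put', name)]
        return super(CallLogMixin, self).put(obj, name, **kwargs)

    def get(self, name, **kwargs):
        self.calls = getattr(self, 'calls', []) + [('get', name)]
        return super(CallLogMixin, self).get(name, **kwargs)


def register_mixin(instance, mixin):
    """Hypothetical helper: rebuild the instance's class with the mixin first in the MRO."""
    cls = instance.__class__
    if not issubclass(cls, mixin):  # avoid applying the same mixin twice
        instance.__class__ = type(cls.__name__, (mixin, cls), {})
    return instance


store = BaseStore()
register_mixin(store, CallLogMixin)
store.put([1, 2, 3], 'numbers')
print(store.get('numbers'), store.calls)
```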

Note

Celery clusters require that the module providing YamlDataMixin is available on both the client and the worker instance. This limitation is planned to be removed in future versions of omega|ml using ccbackend, which allows arbitrary functions to be executed on a Celery cluster. Dask Distributed clusters do not have this limitation.

Runtime mixins

Runtime mixins provide client-side extensions to om.runtime, specifically to OmegaModelProxy. OmegaModelProxy is responsible for submitting user-requested functions to the compute cluster.

Suppose users want to run a cross-validation procedure in a particular way that the default runtime does not support. While they could use a job (notebook) to accomplish this, we provide a runtime mixin as a convenience.

# in crossvalidate.py
class CrossValidationMixin(object):
    def cross_validate(self, modelName, Xname, Yname, *args, **kwargs):
        # look up the cross-validation task by its registered name
        task = self.runtime.task('custom.tasks.cross_validate')
        # submit the task to the cluster; returns an async result
        return task.delay(modelName, Xname, Yname, *args, **kwargs)


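# in custom/tasks.py (importable as custom.tasks)
import omegaml as om
from celery import shared_task


@shared_task
def cross_validate(modelName, Xname, Yname, *args, **kwargs):
    # get model and data
    model = om.models.get(modelName)
    xdata = om.datasets.get(Xname)
    ydata = om.datasets.get(Yname)
    # perform cross validation (custom logic goes here)
    results = ...
    # results are sent back to the client, so they must be serializable
    return results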
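The division of labour between the client-side mixin and the worker-side task can be simulated locally. In the sketch below, `LocalRuntime`, `LocalTask`, `LocalResult`, `ModelProxy` and `AddMixin` are hypothetical stand-ins for om.runtime, a Celery task, a Celery AsyncResult, OmegaModelProxy and a runtime mixin. Here `delay()` simply runs the function immediately, and the stdlib function `operator.add` plays the role of a task such as custom.tasks.cross_validate. Because the task is resolved by its dotted name, the module must be importable wherever that lookup happens.

```python
import importlib


class LocalResult(object):
    """Minimal stand-in for a Celery AsyncResult."""

    def __init__(self, value):
        self._value = value

    def get(self):
        return self._value


class LocalTask(object):
    """Hypothetical task wrapper: delay() runs the function synchronously."""

    def __init__(self, func):
        self.func = func

    def delay(self, *args, **kwargs):
        return LocalResult(self.func(*args, **kwargs))


class LocalRuntime(object):
    """Hypothetical runtime that resolves tasks by their dotted name."""

    def task(self, name):
        module_name, func_name = name.rsplit('.', 1)
        module = importlib.import_module(module_name)
        return LocalTask(getattr(module, func_name))


class ModelProxy(object):
    """Toy stand-in for OmegaModelProxy."""

    def __init__(self, modelname, runtime):
        self.modelname = modelname
        self.runtime = runtime


class AddMixin(object):
    """Client-side mixin: submits a task by name instead of computing in-process."""

    def add_numbers(self, a, b):
        task = self.runtime.task('operator.add')
        return task.delay(a, b)


class Proxy(AddMixin, ModelProxy):
    pass


proxy = Proxy('mymodel', LocalRuntime())
result = proxy.add_numbers(2, 3)
print(result.get())
```

In a real deployment the call to delay() returns immediately and result.get() blocks until the worker has finished; the local stand-in computes the value eagerly.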

To enable this mixin, add the class to om.defaults.OMEGA_RUNTIME_MIXINS:

OMEGA_RUNTIME_MIXINS = [
  'crossvalidate.CrossValidationMixin',
]
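Mixin entries in the configuration are dotted import paths of the form module.ClassName. The sketch below shows how such strings can be resolved to classes with importlib; `load_classes` is a hypothetical helper, and stdlib classes stand in for real mixins so that the example runs anywhere. Resolution uses a regular import, which is why the module named in the path (e.g. crossvalidate) must be importable by the process that reads the configuration.

```python
import importlib


def load_classes(paths):
    """Resolve 'package.module.ClassName' strings to class objects (hypothetical helper)."""
    classes = []
    for path in paths:
        # split off the class name; the rest is the importable module path
        module_name, class_name = path.rsplit('.', 1)
        module = importlib.import_module(module_name)  # ImportError if the module is missing
        classes.append(getattr(module, class_name))    # AttributeError if the class is missing
    return classes


# stdlib classes stand in for entries like 'crossvalidate.CrossValidationMixin'
mixins = load_classes(['collections.OrderedDict', 'json.JSONDecoder'])
print([cls.__name__ for cls in mixins])
```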

Note

Celery clusters require that the custom.tasks module is available on both the client and the worker instance. This limitation is planned to be removed in future versions of omega|ml using ccbackend, which allows arbitrary functions to be executed on a Celery cluster. Dask Distributed clusters do not have this limitation.