Source code for omegaml.backends.mlflow.localprojects

  1from os.path import basename, dirname
  2
  3import os
  4import tempfile
  5from shutil import make_archive, unpack_archive
  6from subprocess import run
  7
  8from omegaml.backends.basedata import BaseDataBackend
  9from omegaml.backends.package.packager import RunnablePackageMixin
 10
 11
class MLFlowProjectBackend(RunnablePackageMixin, BaseDataBackend):
    """
    Backend to support storage of MLFlow projects

    Usage::

        om.scripts.put('mlflow://path/to/MLProject', 'myproject')
        om.scripts.get('myproject')

    See Also:
        https://www.mlflow.org/docs/latest/projects.html#project-directories
    """
    KIND = 'mlflow.project'
    MLFLOW_PREFIX = 'mlflow://'

    @classmethod
    def supports(cls, obj, name, **kwargs):
        """
        check whether this backend handles the given object

        :param obj: the object to store, matched if it is a str that starts
           with MLFLOW_PREFIX
        :param name: the name of the object (not used for the decision)
        :param kwargs: matched if kwargs['kind'] equals KIND
        :return: True if either condition matches, else False
        """
        is_mlflow_prefix = isinstance(obj, str) and obj.startswith(cls.MLFLOW_PREFIX)
        is_mlflow_kind = kwargs.get('kind') == cls.KIND
        return is_mlflow_kind or is_mlflow_prefix

    def put(self, obj, name, attributes=None, **kwargs):
        """
        save a MLFlow project

        The project directory is packed into a .tar.gz archive which is
        then stored in the data store. The archive is built in a temporary
        directory outside of the project, so that it never contains itself
        or archives left over from earlier calls.

        :param obj: full path to the MLFlow project directory
            mlflow://path/to/MLProject
        :param name: name to store
        :param attributes: passed on as the attributes of the metadata
        :param kwargs: not used
        :return: the Metadata object
        :raises FileNotFoundError: if the project path does not exist
        """
        # strip a leading prefix only, a path without prefix is used as is
        if obj.startswith(self.MLFLOW_PREFIX):
            pkgsrc = obj[len(self.MLFLOW_PREFIX):]
        else:
            pkgsrc = obj
        if not os.path.exists(pkgsrc):
            raise FileNotFoundError(pkgsrc)
        with tempfile.TemporaryDirectory() as distdir:
            tarfn = os.path.join(distdir, basename(name) or 'project')
            pkgdist = make_archive(tarfn, 'gztar', root_dir=pkgsrc, base_dir='.')
            filename = self.data_store.object_store_key(name, 'pkg', hashed=True)
            # NOTE(review): the archive is deleted when this block exits, this
            # assumes _store_to_file reads it before returning -- confirm
            gridfile = self._store_to_file(self.data_store, pkgdist, filename)
        return self.data_store._make_metadata(
            name=name,
            prefix=self.data_store.prefix,
            bucket=self.data_store.bucket,
            kind=self.KIND,
            attributes=attributes,
            gridfile=gridfile).save()

    def get(self, name, localpath=None, **kwargs):
        """
        Load MLFlow project from store

        This copies the project's .tar.gz file from the store to a local
        path, unpacks it and returns a reference to the unpacked project.

        The archive is written to ``<localpath>/<basename>.tar.gz`` if
        localpath is given, else to ``<packages_path>/<basename>.tar.gz``.
        It is unpacked into localpath if given, else into the data store's
        tmppath.

        :param name: the name of the project
        :param localpath: the local path to store and unpack the project,
           optional
        :param kwargs: not used
        :return: localpath if it was given, else a MLFlowProject that
           points to the directory the archive was unpacked to
        """
        pkgname = basename(name)
        dstdir = localpath or self.data_store.tmppath
        packagefname = '{}.tar.gz'.format(os.path.join(localpath or self.packages_path, pkgname))
        os.makedirs(dirname(packagefname), exist_ok=True)
        meta = self.data_store.metadata(name)
        outf = meta.gridfile
        with open(packagefname, 'wb') as pkgf:
            pkgf.write(outf.read())
        # NOTE(review): the archive is unpacked without member filtering,
        # consider unpack_archive(..., filter='data') where Python supports it
        unpack_archive(packagefname, dstdir)
        if localpath:
            return localpath
        return MLFlowProject(dstdir)

    @property
    def packages_path(self):
        """ the directory below the data store's tmppath that holds downloaded archives """
        return os.path.join(self.data_store.tmppath, 'packages')
class MLFlowProject:
    """ a proxy to the MLFlow project that runs a script

    This provides the mod.run() interface for scripts so that
    we can use the same semantics for mlflow projects and pypi
    packages
    """

    def __init__(self, uri):
        """
        :param uri: the project location that is passed to ``mlflow run``
        """
        self.uri = uri

    @staticmethod
    def _build_options(kwargs):
        """ render keyword arguments as a shell-safe ``mlflow run`` option string

        Underscores in keys are replaced by dashes before the default for
        ``env-manager`` is applied, so that ``env_manager='conda'`` replaces
        the default instead of being emitted next to it. A list or tuple
        value repeats the option once per item. All values are shell-quoted.

        :param kwargs: dict of option name => value
        :return: the options as a single string
        """
        from shlex import quote

        normalized = {key.replace('_', '-'): value for key, value in kwargs.items()}
        normalized.setdefault('env-manager', 'local')
        parts = []
        for key, value in normalized.items():
            items = value if isinstance(value, (list, tuple)) else [value]
            parts.extend(f'--{key} {quote(str(item))}' for item in items)
        return ' '.join(parts)

    def run(self, om, pure_python=False, **kwargs):
        """ run the project using the ``mlflow run`` command line

        :param om: not used by this implementation
        :param pure_python: not used by this implementation
        :param kwargs: passed as ``--<key> <value>`` options to mlflow run,
           underscores in keys are replaced by dashes. ``env-manager``
           defaults to ``local``. A list or tuple value repeats the option.
        :return: dict with key ``output``. The value is the decoded stdout
           if the command wrote nothing to stderr, else a dict with the
           decoded ``stdout`` and ``stderr``
        """
        from shlex import quote

        options = self._build_options(kwargs)
        # NOTE(review): this directory is never removed. The command runs
        # with it as working directory, so deleting it may also delete
        # files written there by mlflow -- confirm before adding a cleanup
        tmpdir = tempfile.mkdtemp()
        # fix issue
        # NOTE(review): both writes end up on one line that starts with '#',
        # so the pyenv shim on PATH consists of a single comment line. Left
        # as is, confirm whether '#!/bin/bash' plus a newline was intended
        with open(os.path.join(tmpdir, 'pyenv'), 'w') as fout:
            fout.write('#/bin/bash')
            fout.write('conda activate $1')
        # quote all interpolated values, the command is run by a shell
        cmd = (f'PATH={quote(tmpdir)}:$PATH; cd {quote(tmpdir)}; chmod +x ./pyenv; '
               f'mlflow run {options} {quote(str(self.uri))}')
        print(cmd)
        output = run(cmd, capture_output=True, shell=True)
        print(output)
        if output.stderr:
            output = {
                'stdout': output.stdout.decode('utf8'),
                'stderr': output.stderr.decode('utf8'),
            }
        else:
            output = output.stdout.decode('utf8')
        return {
            'output': output,
        }