import os
import shlex
import tempfile
from os.path import basename, dirname
from shutil import make_archive, unpack_archive
from subprocess import run

from omegaml.backends.basedata import BaseDataBackend
from omegaml.backends.package.packager import RunnablePackageMixin
10
11
class MLFlowProjectBackend(RunnablePackageMixin, BaseDataBackend):
    """
    Backend to support storage of MLFlow projects

    Usage::

        om.scripts.put('mlflow://path/to/MLProject', 'myproject')
        om.scripts.get('myproject')

    See Also:
        https://www.mlflow.org/docs/latest/projects.html#project-directories
    """
    KIND = 'mlflow.project'
    MLFLOW_PREFIX = 'mlflow://'

    @classmethod
    def supports(cls, obj, name, **kwargs):
        """
        Check whether this backend handles the given object

        :param obj: the object to store; supported if it is a str that
           starts with MLFLOW_PREFIX
        :param name: the name to store under, not used by this check
        :param kwargs: supported if kwargs['kind'] equals KIND
        :return: True if either condition holds, False otherwise
        """
        is_mlflow_prefix = isinstance(obj, str) and obj.startswith(cls.MLFLOW_PREFIX)
        is_mlflow_kind = kwargs.get('kind') == cls.KIND
        return is_mlflow_kind or is_mlflow_prefix

    def put(self, obj, name, attributes=None, **kwargs):
        """
        save a MLFlow project

        The project directory is packed into a .tar.gz archive which is
        then stored in the data store.

        :param obj: full path to the MLFlow project directory
           mlflow://path/to/MLProject
        :param name: name to store
        :param attributes: passed on to the metadata as its attributes
        :return: the Metadata object
        :raises FileNotFoundError: if the project path does not exist
        """
        # strip the scheme only where it leads the string; splitting on it
        # would also cut at any later occurrence inside the path
        prefix = self.MLFLOW_PREFIX
        pkgsrc = obj[len(prefix):] if obj.startswith(prefix) else obj
        if not os.path.exists(pkgsrc):
            raise FileNotFoundError(pkgsrc)
        # NOTE(review): the archive is written to dist/ inside the directory
        # being archived, so archives left there by earlier puts look like
        # they get packed into later ones -- confirm whether this is intended
        distdir = os.path.join(pkgsrc, 'dist')
        os.makedirs(distdir, exist_ok=True)
        tarfn = os.path.join(distdir, f'{name}')
        pkgdist = make_archive(tarfn, 'gztar', root_dir=pkgsrc, base_dir='.')
        filename = self.data_store.object_store_key(name, 'pkg', hashed=True)
        gridfile = self._store_to_file(self.data_store, pkgdist, filename)
        return self.data_store._make_metadata(
            name=name,
            prefix=self.data_store.prefix,
            bucket=self.data_store.bucket,
            kind=self.KIND,
            attributes=attributes,
            gridfile=gridfile).save()

    def get(self, name, localpath=None, **kwargs):
        """
        Load MLFlow project from store

        This copies the project's .tar.gz file from the store to a local
        file, unpacks it and returns a handle to the unpacked project.

        :param name: the name of the project as stored
        :param localpath: the local directory to write the archive to and
           to unpack it in. If not given, the archive is written to
           packages_path and unpacked in the data store's tmppath
        :param kwargs: not used
        :return: localpath if it was given, otherwise a MLFlowProject
           for the directory the archive was unpacked in
        """
        pkgname = basename(name)
        dstdir = localpath or self.data_store.tmppath
        packagefname = '{}.tar.gz'.format(os.path.join(localpath or self.packages_path, pkgname))
        os.makedirs(dirname(packagefname), exist_ok=True)
        meta = self.data_store.metadata(name)
        outf = meta.gridfile
        with open(packagefname, 'wb') as pkgf:
            pkgf.write(outf.read())
        unpack_archive(packagefname, dstdir)
        if localpath:
            mod = localpath
        else:
            mod = MLFlowProject(dstdir)
        return mod

    @property
    def packages_path(self):
        """ the directory below the data store's tmppath that holds retrieved archives """
        return os.path.join(self.data_store.tmppath, 'packages')
92
93
class MLFlowProject:
    """ a proxy to the MLFlow project that runs a script

    This provides the mod.run() interface for scripts so that
    we can use the same semantics for mlflow projects and pypi
    packages
    """

    def __init__(self, uri):
        """
        :param uri: the project location that is handed to ``mlflow run``
        """
        self.uri = uri

    def run(self, om, pure_python=False, **kwargs):
        """
        Run the project through the ``mlflow run`` command line

        Every keyword argument is passed on as a ``--<name> <value>``
        option, with underscores in the name replaced by dashes.
        ``--env-manager local`` is added unless the caller specifies
        ``env_manager`` (or ``env-manager``).

        :param om: not used by this implementation
        :param pure_python: not used by this implementation
        :param kwargs: the options to ``mlflow run``
        :return: dict with the single key ``output``. The value is the
           decoded stdout of the command, or a dict of the decoded
           ``stdout`` and ``stderr`` if anything was written to stderr
        """
        # normalize the option names before applying the default, otherwise
        # env_manager=... is emitted together with a second, conflicting
        # --env-manager local
        options = {key.replace('_', '-'): value for key, value in kwargs.items()}
        options.setdefault('env-manager', 'local')
        # the command goes through the shell, so quote everything that is
        # interpolated to keep values with spaces or metacharacters intact
        optargs = ' '.join(f'--{key} {shlex.quote(str(value))}'
                           for key, value in options.items())
        # the directory becomes the working directory of the run and is
        # deliberately not removed here
        tmpdir = tempfile.mkdtemp()
        workdir = shlex.quote(tmpdir)
        # place a stub 'pyenv' command first on the PATH of the run
        # NOTE(review): the two writes carry no newline and the first line
        # is '#/bin/bash', not '#!/bin/bash', so the file is a single
        # comment line and the stub does nothing. Kept byte-for-byte as
        # fixing it changes what runs -- confirm the intended behavior
        with open(os.path.join(tmpdir, 'pyenv'), 'w') as fout:
            fout.write('#/bin/bash')
            fout.write('conda activate $1')
        uri = shlex.quote(str(self.uri))
        cmd = fr'PATH={workdir}:$PATH; cd {workdir}; chmod +x ./pyenv; mlflow run {optargs} {uri}'
        print(cmd)
        completed = run(cmd, capture_output=True, shell=True)
        print(completed)
        stdout = completed.stdout.decode('utf8')
        if completed.stderr:
            # report both streams whenever the command wrote to stderr
            output = {
                'stdout': stdout,
                'stderr': completed.stderr.decode('utf8'),
            }
        else:
            output = stdout
        return {
            'output': output,
        }