import importlib.metadata
import os
import platform
import warnings
from base64 import b64encode, b64decode
from collections.abc import Iterable
from datetime import date, datetime, timedelta
from itertools import chain
from uuid import uuid4

import dill
import numpy as np
import pandas as pd
import pymongo

from omegaml.backends.tracking.base import TrackingProvider
from omegaml.documents import Metadata
from omegaml.util import _raise, ensure_index, batched, signature, tryOr, ensurelist


class NoTrackTracker(TrackingProvider):
    """ A default tracker that does not record anything """

    def start(self, run=None):
        pass

    def stop(self):
        pass

    def log_artifact(self, obj, name, step=None, **extra):
        pass

    def log_metric(self, key, value, step=None, **extra):
        pass

    def log_param(self, key, value, step=None, **extra):
        pass

    def log_event(self, event, key, value, step=None, **extra):
        pass

    def log_events(self, event, key, values, step=None, dt=None, **extra):
        pass

    def log_extra(self, **kwargs):
        pass

    def log_data(self, **kwargs):
        pass

    def data(self, experiment=None, run=None, event=None, step=None, key=None, raw=False, **query):
        pass


class OmegaSimpleTracker(TrackingProvider):
    """ A tracking provider that logs to an omegaml dataset

    Usage::

        with om.runtime.experiment(provider='default') as exp:
            ...
            exp.log_metric('accuracy', .78)

    .. versionchanged:: 0.17
        log methods accept arbitrary ``**extra`` values that are stored
        along with each logged event

    """
    _provider = 'simple'
    _experiment = None
    _startdt = None
    _stopdt = None
    _autotrack = False

    _ensure_active = lambda self, r: r if r is not None else _raise(
        ValueError('no active run, call .start() or .use()'))

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.log_buffer = []
        self.max_buffer = 10
        self._initialize_dataset()

    def active_run(self, run=None):
        """ set the latest run as the active run

        Args:
            run (int|str): optional run id or unique task id; if None the
                latest active run is used, or a new run is created if
                no active run exists.

        Returns:
            current run (int)
        """
        if run is None:
            latest = self._latest_run
            latest_is_active = (latest is not None and self.status(run=latest) == 'STARTED')
            self._run = latest if latest_is_active else self.start(run=None)
        else:
            self._run = run
        self._experiment = self._experiment or uuid4().hex
        return self._run

    def use(self, run=None):
        """ reuse the latest run instead of starting a new one

        semantic sugar for self.active_run()

        Returns:
            self
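
        Usage:
            a minimal sketch; ``om`` denotes an omegaml session and
            ``'myexp'`` is an illustrative experiment name::

                exp = om.runtime.experiment('myexp').use()
                exp.log_metric('accuracy', .8)  # logs to the latest run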
        """
        self.active_run(run=run)
        return self

    @property
    def autotrack(self):
        return self._autotrack

    @autotrack.setter
    def autotrack(self, value):
        self._autotrack = value

    @property
    def _latest_run(self):
        cursor = self.data(event='start', run='*', lazy=True)
        data = list(cursor.sort('data.run', -1).limit(1)) if cursor else None
        run = data[-1].get('data', {}).get('run') if data else None
        return run

    def status(self, run=None):
        """ status of a run

        Args:
            run (int): the run number, defaults to the currently active run

        Returns:
            status, one of 'PENDING', 'STARTED', 'STOPPED'
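
        Usage:
            a sketch of the run lifecycle, assuming a fresh experiment::

                exp.start()
                assert exp.status() == 'STARTED'
                exp.stop()
                assert exp.status() == 'STOPPED'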
        """
        self._run = run or self._run or self._latest_run
        data = self.data(event=['start', 'stop'], run=self._run, raw=True)
        no_runs = data is None or len(data) == 0
        has_stop = not no_runs and any(row.get('event') == 'stop' for row in data)
        return 'PENDING' if no_runs else 'STOPPED' if has_stop else 'STARTED'

    def start(self, run=None, immediate=True):
        """ start a new run

        This starts a new run and logs the start event
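
        Usage:
            a sketch; runs are also started implicitly when entering the
            ``om.runtime.experiment(...)`` context::

                run = exp.start()
                exp.log_metric('loss', .1)
                exp.stop()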
        """
        self._run = run or (self._latest_run or 0) + 1
        self._startdt = datetime.utcnow()
        data = self._common_log_data('start', key=None, value=None, step=None, dt=self._startdt)
        self._write_log(data, immediate=immediate)
        return self._run

    def stop(self, flush=True):
        """ stop the current run

        This stops the current run and records the stop event
        """
        self._stopdt = datetime.utcnow()
        data = self._common_log_data('stop', key=None, value=None, step=None, dt=self._stopdt)
        self._write_log(data)
        if flush:
            self.flush()

    def flush(self):
        # passing list of list, as_many=True => collection.insert_many() for speed
        if self.log_buffer:
            self._store.put(self.log_buffer, self._data_name,
                            noversion=True, as_many=True)
            self.log_buffer.clear()

    def clear(self, force=False):
        """ clear all data

        All data is removed from the experiment's dataset. This is not recoverable.

        Args:
            force (bool): if True, clears all data, otherwise raises an error

        Caution:
            * this will clear all data and is not recoverable

        Raises:
            AssertionError: if force is not True

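        Usage:
            a sketch; this irrevocably drops the experiment's dataset::

                exp.clear(force=True)
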
        .. versionadded:: 0.16.2

        """
        assert force, "clear() requires force=True to prevent accidental data loss. This will clear all experiment data and is not recoverable."
        self._store.drop(self._data_name, force=True)
        self._initialize_dataset(force=True)

    def _common_log_data(self, event, key, value, step=None, dt=None, run=None, **extra):
        if isinstance(value, dict):
            # shortcut to resolve PassthroughDataset actual values
            # -- enables storing the actual values of a dataset passed as a PassthroughDataset
            # TODO: should this be the responsibility of SimpleTracker?
            if isinstance(value.get('args'), (list, tuple)):
                value['args'] = [getattr(arg, '_passthrough_data', arg) for arg in value['args']]
            if isinstance(value.get('kwargs'), dict):
                value['kwargs'] = {
                    k: getattr(v, '_passthrough_data', v) for k, v in value['kwargs'].items()
                }
        data = {
            'experiment': self._experiment,
            'run': run or self._ensure_active(self._run),
            'step': step,
            'event': event,
            'key': key or event,
            'value': value,
            'dt': dt or datetime.utcnow(),
            'node': os.environ.get('HOSTNAME', platform.node()),
            'userid': self.userid,
        }
        # add **extra, check for duplicate keys to avoid overwriting
        dupl_keys = set(data.keys()) & set(extra.keys())
        if dupl_keys:
            raise ValueError(f'duplicate extra keys: {dupl_keys}')
        data.update(extra)
        if self._extra_log:
            data.update(self._extra_log)
        return data

    def _write_log(self, data, immediate=False):
        self.log_buffer.append(data)
        if immediate or len(self.log_buffer) > self.max_buffer:
            self.flush()

    def log_artifact(self, obj, name, step=None, dt=None, event=None, key=None, **extra):
        """ log any object to the current run

        Usage::

            # log an artifact
            exp.log_artifact(mydict, 'somedata')

            # retrieve back
            mydict_ = exp.restore_artifact('somedata')

        Args:
            obj (obj): any object to log
            name (str): the name of the artifact
            step (int): the step, if any
            dt (datetime): the datetime, defaults to now
            event (str): the event, defaults to 'artifact'
            key (str): the key for the event, defaults to name
            **extra: any extra data to log

        Notes:
            * bool, str, int, float, list, dict are stored as ``format=type``
            * Metadata is stored as ``format=metadata``
            * objects supported by ``om.models`` are stored as ``format=model``
            * objects supported by ``om.datasets`` are stored as ``format=dataset``
            * all other objects are pickled and stored as ``format=pickle``
        """
        event = event or 'artifact'
        key = key or name
        if isinstance(obj, (bool, str, int, float, list, dict)):
            format = 'type'
            rawdata = obj
        elif isinstance(obj, Metadata):
            format = 'metadata'
            rawdata = obj.to_json()
        elif self._model_store.get_backend_byobj(obj) is not None:
            objname = uuid4().hex
            meta = self._model_store.put(obj, f'.experiments/.artefacts/{objname}')
            format = 'model'
            rawdata = meta.name
        elif self._store.get_backend_byobj(obj) is not None:
            objname = uuid4().hex
            meta = self._store.put(obj, f'.experiments/.artefacts/{objname}')
            format = 'dataset'
            rawdata = meta.name
        else:
            try:
                rawdata = b64encode(dill.dumps(obj)).decode('utf8')
                format = 'pickle'
            except TypeError:
                rawdata = repr(obj)
                format = 'repr'
        value = {
            'name': name,
            'data': rawdata,
            'format': format
        }
        data = self._common_log_data(event, key, value, step=step, dt=dt, name=name, **extra)
        self._write_log(data)

    def log_event(self, event, key, value, step=None, dt=None, **extra):
        """ log an arbitrary event with a key and value """
        data = self._common_log_data(event, key, value, step=step, dt=dt, **extra)
        self._write_log(data)

    def log_events(self, event, key, values, step=None, dt=None, **extra):
        """ log a series of events

        This is a convenience method to log multiple values for the same event.
        All values are logged with the same common log data, i.e. the same
        datetime, step, and any extra values.

        Args:
            event (str): the event name
            key (str): the key for the event
            values (list): a list of values to log
            step (int): the step, if any
            dt (datetime): the datetime, defaults to now
            **extra: any other values to store with event

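        Usage:
            a sketch, logging one 'metric' event per value::

                exp.log_events('metric', 'loss', [0.9, 0.5, 0.2], step=1)
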
        .. versionadded:: 0.18

        """
        data = self._common_log_data(event, key, None, step=step, dt=dt, **extra)
        for value in values:
            data['value'] = value
            self._write_log(dict(data))

    def log_param(self, key, value, step=None, dt=None, **extra):
        """ log an experiment parameter

        Args:
            key (str): the parameter name
            value (str|float|int|bool|dict): the parameter value
            step (int): the step
            dt (datetime): the datetime, defaults to now
            **extra: any other values to store with event

        Notes:
            * logged as ``event=param``
        """
        data = self._common_log_data('param', key, value, step=step, dt=dt, **extra)
        self._write_log(data)

    def log_metric(self, key, value, step=None, dt=None, **extra):
        """ log a metric value

        Args:
            key (str): the metric name
            value (str|float|int|bool|dict): the metric value
            step (int): the step
            dt (datetime): the datetime, defaults to now
            **extra: any other values to store with event

        Notes:
            * logged as ``event=metric``
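
        Usage:
            a sketch; values may be scalars or dicts::

                exp.log_metric('accuracy', .78)
                exp.log_metric('scores', {'f1': .8, 'recall': .7}, step=5)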
        """
        data = self._common_log_data('metric', key, value, step=step, dt=dt, **extra)
        self._write_log(data)

    def log_data(self, key, value, step=None, dt=None, event=None, **extra):
        """ log x/y data for model predictions

        This is semantic sugar for log_artifact() using the 'data' event.

        Args:
            key (str): the name of the artifact
            value (any): the x/y data
            step (int): the step
            dt (datetime): the datetime
            event (str): the event, defaults to 'data'
            **extra: any other values to store with event

        Returns:
            None
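
        Usage:
            a sketch, recording a model's input and predictions; the variable
            names are illustrative::

                exp.log_data('X', X_test)
                exp.log_data('Y', y_pred)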
        """
        event = event or 'data'
        self.log_artifact(value, key, step=step, dt=dt, key=key, event=event, **extra)

    def log_system(self, key=None, value=None, step=None, dt=None, **extra):
        """ log system data

        Args:
            key (str): the key to use, defaults to 'system'
            value (str|float|int|bool|dict): the parameter value
            step (int): the step
            **extra: any other values to store with event

        Notes:
            * logged as ``event=system``
            * logs platform, python version and list of installed packages
        """
        key = key or 'system'
        value = value or {
            'platform': platform.uname()._asdict(),
            'python': '-'.join((platform.python_implementation(),
                                platform.python_version())),
            'packages': ['=='.join((d.metadata['Name'], d.version))
                         for d in importlib.metadata.distributions()]
        }
        data = self._common_log_data('system', key, value, step=step, dt=dt, **extra)
        self._write_log(data)

    def data(self, experiment=None, run=None, event=None, step=None, key=None, raw=False,
             lazy=False, since=None, end=None, batchsize=None, slice=None, **extra):
        """ build a dataframe of all stored data

        Args:
            experiment (str|list): the name of the experiment, defaults to its current value
            run (int|list|str|slice): the run(s) to return data for, defaults to the current run,
                use 'all' for all runs. Runs are 1-indexed from the first run, or -1-indexed
                from the latest run; both forms can be combined in a list. If run < 0 would
                resolve to before the first run, run 1 is returned. A slice(start, stop)
                can be used to specify a range of runs.
            event (str|list): the event(s) to include
            step (int|list): the step(s) to include
            key (str|list): the key(s) to include
            raw (bool): if True returns the raw data instead of a DataFrame
            lazy (bool): if True returns the Cursor instead of data, ignores raw
            since (datetime|timedelta|str): only return data since this date. If both since and run
                are specified, only matches since the given date are returned. If since is a
                string it must be parseable by pd.to_datetime, or be given in the format
                '<n><unit:[smhdwMqy]>' for relative times, or be a timedelta object. See
                dtrelative() for details on relative times.
            end (datetime): only return data until this date
            batchsize (int): if specified, returns a generator yielding data in batches of
                batchsize, note that raw is respected, i.e. raw=False yields a DataFrame for
                every batch, raw=True yields a list of dicts
            slice (tuple): if specified, returns a slice of the data, e.g. slice=(10, 25) returns
                rows 10-25, the slice is applied after all other filters

        Returns:
            For lazy == False:
                * data (DataFrame) if raw == False
                * data (list of dicts) if raw == True
                * None if no data exists

            For lazy == True, no batchsize, regardless of raw:
                * data (Cursor) for any value of raw

            For lazy == True, with batchsize:
                * data (generator of list[dict]) if raw == True
                * data (generator of DataFrame) if raw == False

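        Usage:
            a few illustrative queries::

                exp.data()                        # current run, as a DataFrame
                exp.data(run='all', raw=True)     # all runs, as a list of dicts
                exp.data(run=-1, event='metric')  # metrics of the latest run
                exp.data(since='1d')              # events of the past day
                for batch in exp.data(run='*', batchsize=1000):
                    ...  # process one DataFrame of up to 1000 rows at a time
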
        .. versionchanged:: 0.16.2
            run supports negative indexing

        .. versionchanged:: 0.17
            added batchsize

        .. versionchanged:: 0.17
            enabled the use of run='*' to retrieve all runs, equivalent of run='all'

        .. versionchanged:: 0.17
            enabled data(run=, start=, end=, since=), accepting range queries on run, dt and event
        """
        from functools import cache
        experiment = experiment or self._experiment
        # -- flush all buffers before querying
        self.flush()
        # -- build filter
        if since is None:
            run = run if run is not None else self._run
        run = ensurelist(run) if not isinstance(run, str) and isinstance(run, Iterable) else run
        # actual run
        # -- run is 1-indexed, so we need to adjust for -1 indexing
        #    e.g. -1 means the latest run, -2 the run before that
        #    e.g. latest_run = 5, run=-1 means 5, run=-2 means 4 etc.
        # -- run can be a list, in which case we adjust run < 0 for each element
        # -- run can never be less than 1 (1-indexed), even if run << 0
        # PERF/consistency: memoize the last run per each .data() call
        last_run = cache(lambda: int(self._latest_run or 0))
        relative_run = lambda r: max(1, 1 + last_run() + r)
        if isinstance(run, list) and any(r < 0 for r in run):
            run = [(r if r >= 0 else relative_run(r)) for r in run]
        elif isinstance(run, int) and run < 0:
            run = relative_run(run)
        filter = self._build_data_filter(experiment, run, event, step, key, since, end, extra)

        def read_data(cursor):
            data = pd.DataFrame.from_records(cursor)
            if 'dt' in data.columns:
                data['dt'] = pd.to_datetime(data['dt'], errors='coerce')
                data.sort_values('dt', inplace=True)
            return data

        def read_data_batched(cursor, batchsize, slice):
            from builtins import slice as t_slice
            if cursor is None:
                yield None
                return
            if slice:
                slice = (slice.start or 0, slice.stop or 0) if isinstance(slice, t_slice) else slice
                cursor.skip(slice[0])
                cursor.limit(slice[1] - slice[0])
                batchsize = batchsize or (slice[1] - slice[0])
            for rows in batched(cursor, batchsize):
                data = (r.get('data') for r in rows)
                yield read_data(data) if not raw else list(data)

        if batchsize or slice:
            data = self._store.get(self._data_name, filter=filter, lazy=True, trusted=signature(filter))
            data = read_data_batched(data, batchsize, slice)
            if slice and not batchsize:
                # try to resolve just one iteration
                data, _ = tryOr(lambda: (next(data), data.close()), (None, None))
        else:
            data = self._store.get(self._data_name, filter=filter, lazy=lazy, trusted=signature(filter))
            data = read_data(data) if data is not None and not lazy and not raw else data
        return data

    def _build_data_filter(self, experiment, run, event, step, key, since, end, extra):
        # build a filter for the data query, suitable for OmegaStore.get()
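        # -- for illustration, a call like
        #    _build_data_filter('myexp', [1, 2], 'metric', None, None, None, None, {})
        #    is expected to yield a filter along the lines of
        #    {'data.experiment': 'myexp', 'data.run': {'$in': [1, 2]}, 'data.event': 'metric'}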
        filter = {}
        valid = lambda s: s is not None and str(s).lower() not in ('all', '*')
        # SEC: ensure all values are basic types, to prevent operator injection
        valid_types = (str, int, float, list, tuple, date, datetime)
        ensure_type = lambda v, t: v if isinstance(v, t) else str(v)
        op = lambda s: ({'$in': ensurelist(s)} if isinstance(s, (list, tuple, np.ndarray))
                        else ensure_type(s, valid_types))
        if valid(experiment):
            filter['data.experiment'] = op(experiment)
        if valid(run):
            if isinstance(run, slice):
                filter['data.run'] = {'$gte': run.start, '$lte': run.stop}
            else:
                filter['data.run'] = op(run)
        if valid(event):
            filter['data.event'] = op(event)
        if valid(step):
            filter['data.step'] = op(step)
        if valid(key):
            filter['data.key'] = op(key)
        if valid(since):
            dtnow = getattr(self, '_since_dtnow', datetime.utcnow())
            if isinstance(since, str):
                since = tryOr(lambda: pd.to_datetime(since), lambda: dtrelative('-' + since, now=dtnow))
            elif isinstance(since, timedelta):
                since = dtnow - since
            elif not isinstance(since, datetime):
                raise ValueError(
                    f'invalid since value: {since}, must be datetime, timedelta or string in format "<n><unit:[smhdwMqy]>"')
            filter['data.dt'] = {'$gte': str(since.isoformat())}
        if valid(end):
            dtnow = getattr(self, '_since_dtnow', datetime.utcnow())
            if isinstance(end, str):
                end = tryOr(lambda: pd.to_datetime(end), lambda: dtrelative('+' + end, now=dtnow))
            elif isinstance(end, timedelta):
                end = dtnow + end
            elif not isinstance(end, datetime):
                raise ValueError(
                    f'invalid end value: {end}, must be datetime, timedelta or string in format "<n><unit:[smhdwMqy]>"')
            filter['data.dt'] = filter.setdefault('data.dt', {})
            filter['data.dt']['$lte'] = str(end.isoformat())
        for k, v in extra.items():
            if valid(v):
                fk = f'data.{k}'
                filter[fk] = op(v)
        return filter

    @property
    def dataset(self):
        return self._data_name

    @property
    def stats(self):
        from omegaml.backends.tracking.statistics import ExperimentStatistics
        return ExperimentStatistics(self)

    def summary(self, **kwargs):
        return self.stats.summary(**kwargs)

    def _initialize_dataset(self, force=False):
        # create indexes when the dataset is first created
        if not force and self._store.exists(self._data_name):
            return
        coll = self._store.collection(self._data_name)
        idxs = [
            {'data.run': pymongo.ASCENDING, 'data.event': pymongo.ASCENDING, 'data.key': pymongo.ASCENDING,
             'data.experiment': pymongo.ASCENDING},
            {'data.dt': pymongo.ASCENDING, 'data.event': pymongo.ASCENDING, 'data.key': pymongo.ASCENDING,
             'data.experiment': pymongo.ASCENDING},
            {'data.dt': pymongo.ASCENDING, 'data.event': pymongo.ASCENDING, 'data.experiment': pymongo.ASCENDING},
            # fast retrieval by event, key, userid across all runs
            {'data.event': pymongo.ASCENDING, 'data.key': pymongo.ASCENDING, 'data.userid': pymongo.ASCENDING,
             'data.experiment': pymongo.ASCENDING},
        ]
        for specs in idxs:
            ensure_index(coll, specs)
        # self._store.put(coll, self._data_name)

    def restore_artifact(self, *args, **kwargs):
        """ restore a specific logged artifact

        .. versionchanged:: 0.17
            deprecated, use exp.restore_artifacts() instead
        """
        warnings.warn('deprecated, use exp.restore_artifacts() instead', DeprecationWarning)
        restored = self.restore_artifacts(*args, **kwargs)
        return restored[-1] if restored else None

    def restore_artifacts(self, key=None, experiment=None, run=None, since=None, step=None, value=None, event=None,
                          name=None):
        """ restore logged artifacts

        Args:
            key (str): the name of the artifact as provided in log_artifact
            experiment (str): the experiment to query, defaults to the current experiment
            run (int): the run for which to query, defaults to current run
            since (datetime): only return data since this date
            step (int): the step for which to query, defaults to all steps in run
            value (dict|list): dict or list of dict, this value is used instead of
                querying data, use to retrieve an artifact from contents of ``.data()``
            event (str): the event to query, defaults to 'artifact'
            name (str): the artifact name to match, defaults to '*'

        Returns:
            list of restored objects

        Notes:
            * this will restore the artifact according to its type assigned
              by ``.log_artifact()``. If the type cannot be determined, the
              actual data is returned

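        Usage:
            a sketch; restores whatever was previously logged under a key::

                exp.log_artifact(mydict, 'somedata')
                objs = exp.restore_artifacts('somedata')  # => [mydict]
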
        .. versionchanged:: 0.17
            returns a list of objects instead of the last object
        """
        event = event or 'artifact'
        name = name or '*'
        if value is None:
            all_data = self.data(experiment=experiment, run=run, since=since, event=event,
                                 step=step, key=key, raw=True, name=name)
        else:
            all_data = [{'value': value}] if isinstance(value, dict) else value
        restored = []
        all_data = all_data or []
        for item in all_data:
            data = item.get('value')
            if data['format'] == 'type':
                obj = data['data']
            elif data['format'] == 'metadata':
                meta = self._store._Metadata
                obj = meta.from_json(data['data'])
            elif data['format'] == 'dataset':
                obj = self._store.get(data['data'])
            elif data['format'] == 'model':
                obj = self._model_store.get(data['data'])
            elif data['format'] == 'pickle':
                obj = dill.loads(b64decode((data['data']).encode('utf8')))
            else:
                obj = data.get('data', data)
            restored.append(obj)
        return restored

    def restore_data(self, key, run=None, event=None, since=None, concat=True, **extra):
        """ restore x/y data for model predictions

        This is semantic sugar for restore_artifacts() using the 'data' event.

        Args:
            key (str): the name of the artifact
            run (int): the run for which to query, defaults to current run
            event (str): the event, defaults to 'data'
            since (datetime): only return data since this date
            concat (bool): if True, concatenates the data into a single object,
                in this case all data must be of the same type. Defaults to True.
            **extra: any other values to filter by

        Returns:
            the restored objects, concatenated into a single object if concat=True
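
        Usage:
            a sketch; with concat=True the restored parts are combined
            (the variable names are illustrative)::

                exp.log_data('X', X_batch1)
                exp.log_data('X', X_batch2)
                X = exp.restore_data('X')  # a single concatenated object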
        """
        event = event or 'data'

        def _concat(values):
            if values is None:
                return None
            if len(values) and isinstance(values[0], (pd.DataFrame, pd.Series)):
                ensure_df_or_series = lambda v: pd.Series(v) if isinstance(v, (np.ndarray, list)) else v
                return pd.concat((ensure_df_or_series(v) for v in values), axis=0)
            elif len(values) and isinstance(values[0], np.ndarray):
                return np.concatenate(values, axis=0)
            # chain seems to be the fastest approach
            # -- https://stackoverflow.com/a/56407963/890242
            return list(chain(*values))

        restored = self.restore_artifacts(run=run, key=key, event=event, since=since, **extra)
        return restored if not concat else _concat(restored)


def dtrelative(delta, now=None, as_delta=False):
    """ return a datetime relative to now

    Args:
        delta (str|timedelta): the relative delta, if a string, specify as '[+-]<n><unit:[smhdwMqy]>',
            e.g. '1d' for one day, '-1d' for one day ago, '+1d' for one day from now. Special cases:
            '-0y' means the beginning of the current year, '+0y' means the end of the current year.
            smhdwMqy = seconds, minutes, hours, days, weeks, months, quarters, years
        now (datetime): the reference datetime, defaults to datetime.utcnow()
        as_delta (bool): if True, returns a timedelta object, otherwise a datetime object

    Returns:
        datetime|timedelta: the relative datetime or timedelta
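
    Usage:
        a few illustrative calls, with a fixed reference time::

            now = datetime(2024, 1, 15)
            dtrelative('-1d', now=now)  # => datetime(2024, 1, 14)
            dtrelative('+2h', now=now)  # => datetime(2024, 1, 15, 2)
            dtrelative('-0y', now=now)  # => datetime(2024, 1, 1), start of year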
    """
    # Parse the numeric part and the unit from the specifier
    UNIT_MAP = {'s': 1,  # 1 second
                'm': 60,  # 1 minute
                'h': 60 * 60,  # 1 hour
                'd': 24 * 60 * 60,  # 1 day
                'w': 7 * 24 * 60 * 60,  # 1 week
                'n': 30 * 24 * 60 * 60,  # 1 month
                'q': 90 * 24 * 60 * 60,  # 1 quarter
                'y': 365 * 24 * 60 * 60}  # 1 year
    now = now or datetime.utcnow()
    error_msg = f"Invalid delta {delta}. Use a string of format '[+-]<n><unit:[smhdwMqy]>' or a timedelta object."
    if isinstance(delta, str):
        try:
            past = delta.startswith('-')
            delta = (delta
                     .replace('-', '')
                     .replace('+', '')  # remove the sign
                     .replace(' ', '')  # remove spaces
                     .replace('M', 'n')  # m is ambiguous, so we use n for month
                     .lower())
            num = int(delta[:-1])  # the numeric part
            units = UNIT_MAP.get(delta[-1])  # the unit is the last character
            if delta[-1] == 'y' and num == 0:
                # special case: -0y means beginning of the year, +0y means end of the year
                dtdelta = (datetime(now.year, 1, 1) - now) if past else (datetime(now.year, 12, 31) - now)
            else:
                dtdelta = timedelta(seconds=num * units * (-1 if past else 1))
        except (ValueError, IndexError, TypeError):
            raise ValueError(error_msg)
    elif isinstance(delta, timedelta):
        dtdelta = delta
    else:
        raise ValueError(error_msg)
    return now + dtdelta if not as_delta else dtdelta