import dill
import importlib.metadata
import numpy as np
import os
import pandas as pd
import platform
import pymongo
import warnings
from base64 import b64encode, b64decode
from datetime import date, datetime, timedelta
from itertools import chain
from typing import Iterable
from uuid import uuid4

from omegaml.backends.tracking.base import TrackingProvider
from omegaml.documents import Metadata
from omegaml.util import _raise, ensure_index, batched, signature, tryOr, ensurelist


class NoTrackTracker(TrackingProvider):
    """ A default tracker that does not record anything """

    def start(self, run=None):
        pass

    def stop(self):
        pass

    def log_artifact(self, obj, name, step=None, **extra):
        pass

    def log_metric(self, key, value, step=None, **extra):
        pass

    def log_param(self, key, value, step=None, **extra):
        pass

    def log_event(self, event, key, value, step=None, **extra):
        pass

    def log_extra(self, **kwargs):
        pass

    def log_data(self, **kwargs):
        pass

    def data(self, experiment=None, run=None, event=None, step=None, key=None, raw=False, **query):
        pass


class OmegaSimpleTracker(TrackingProvider):
    """ A tracking provider that logs to an omegaml dataset

    Usage::

        with om.runtime.experiment(provider='default') as exp:
            ...
            exp.log_metric('accuracy', .78)

    .. versionchanged:: 0.17
        any extra keyword arguments passed to the log methods are stored
        as part of the logged event

    """
    _provider = 'simple'
    _experiment = None
    _startdt = None
    _stopdt = None
    _autotrack = False

    _ensure_active = lambda self, r: r if r is not None else _raise(
        ValueError('no active run, call .start() or .use() '))

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.log_buffer = []
        self.max_buffer = 10
        self._initialize_dataset()

    def active_run(self, run=None):
        """ set the latest run as the active run

        Args:
            run (int|str): optional, the run id or unique task id; if None the
               latest active run will be set, or a new run is created if
               no active run exists.

        Returns:
            current run (int)
        """
        if run is None:
            latest = self._latest_run
            latest_is_active = (latest is not None and self.status(run=latest) == 'STARTED')
            self._run = latest if latest_is_active else self.start(run=None)
        else:
            self._run = run
        self._experiment = self._experiment or uuid4().hex
        return self._run

    def use(self, run=None):
        """ reuse the latest run instead of starting a new one

        semantic sugar for self.active_run()

        Returns:
            self
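
        For illustration, assuming ``exp`` is an experiment obtained from
        ``om.runtime.experiment()``::

            exp.use()
            exp.log_metric('accuracy', .9)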
        """
        self.active_run(run=run)
        return self

    @property
    def autotrack(self):
        return self._autotrack

    @autotrack.setter
    def autotrack(self, value):
        self._autotrack = value

    @property
    def _latest_run(self):
        cursor = self.data(event='start', run='*', lazy=True)
        data = list(cursor.sort('data.run', -1).limit(1)) if cursor else None
        run = data[-1].get('data', {}).get('run') if data is not None and len(data) > 0 else None
        return run

    def status(self, run=None):
        """ status of a run

        Args:
            run (int): the run number, defaults to the currently active run

        Returns:
            status, one of 'PENDING', 'STARTED', 'STOPPED'
        """
        self._run = run or self._run or self._latest_run
        data = self.data(event=['start', 'stop'], run=self._run, raw=True)
        no_runs = data is None or len(data) == 0
        has_stop = not no_runs and sum(1 for row in data if row.get('event') == 'stop')
        return 'PENDING' if no_runs else 'STOPPED' if has_stop else 'STARTED'

    def start(self, run=None, immediate=True):
        """ start a new run

        This starts a new run and logs the start event
        """
        self._run = run or (self._latest_run or 0) + 1
        self._startdt = datetime.utcnow()
        data = self._common_log_data('start', key=None, value=None, step=None, dt=self._startdt)
        self._write_log(data, immediate=immediate)
        return self._run

    def stop(self, flush=True):
        """ stop the current run

        This stops the current run and records the stop event
        """
        self._stopdt = datetime.utcnow()
        data = self._common_log_data('stop', key=None, value=None, step=None, dt=self._stopdt)
        self._write_log(data)
        self.flush() if flush else None

    def flush(self):
        # passing list of list, as_many=True => collection.insert_many() for speed
        if self.log_buffer:
            self._store.put(self.log_buffer, self._data_name,
                            noversion=True, as_many=True)
            self.log_buffer.clear()

    def clear(self, force=False):
        """ clear all data

        All data is removed from the experiment's dataset. This is not recoverable.

        Args:
            force (bool): if True, clears all data, otherwise raises an error

        Caution:
            * this will clear all data and is not recoverable

        Raises:
            AssertionError: if force is not True

        .. versionadded:: 0.16.2

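        For illustration::

            exp.clear(force=True)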
        """
        assert force, "clear() requires force=True to prevent accidental data loss. This will clear all experiment data and is not recoverable."
        self._store.drop(self._data_name, force=True)
        self._initialize_dataset(force=True)

    def _common_log_data(self, event, key, value, step=None, dt=None, **extra):
        if isinstance(value, dict):
            # shortcut to resolve PassthroughDataset actual values
            # -- enables storing the actual values of a dataset passed as a PassthroughDataset
            # TODO: should this be the responsibility of SimpleTracker?
            if isinstance(value.get('args'), (list, tuple)):
                value['args'] = [getattr(arg, '_passthrough_data', arg) for arg in value['args']]
            if isinstance(value.get('kwargs'), dict):
                value['kwargs'] = {
                    k: getattr(v, '_passthrough_data', v) for k, v in value['kwargs'].items()
                }
        data = {
            'experiment': self._experiment,
            'run': self._ensure_active(self._run),
            'step': step,
            'event': event,
            'key': key or event,
            'value': value,
            'dt': dt or datetime.utcnow(),
            'node': os.environ.get('HOSTNAME', platform.node()),
            'userid': self.userid,
        }
        # add **extra, check for duplicate keys to avoid overwriting
        dupl_keys = set(data.keys()) & set(extra.keys())
        if dupl_keys:
            raise ValueError(f'duplicate extra keys: {dupl_keys}')
        data.update(extra)
        data.update(self._extra_log) if self._extra_log else None
        return data

    def _write_log(self, data, immediate=False):
        self.log_buffer.append(data)
        if immediate or len(self.log_buffer) > self.max_buffer:
            self.flush()

    def log_artifact(self, obj, name, step=None, dt=None, event=None, key=None, **extra):
        """ log any object to the current run

        Usage::

            # log an artifact
            exp.log_artifact(mydict, 'somedata')

            # retrieve back
            mydict_ = exp.restore_artifact('somedata')

        Args:
            obj (obj): any object to log
            name (str): the name of artifact
            step (int): the step, if any
            **extra: any extra data to log

        Notes:
            * bool, str, int, float, list, dict are stored as ``format=type``
            * Metadata is stored as ``format=metadata``
            * objects supported by ``om.models`` are stored as ``format=model``
            * objects supported by ``om.datasets`` are stored as ``format=dataset``
            * all other objects are pickled and stored as ``format=pickle``
        """
        event = event or 'artifact'
        key = key or name
        if isinstance(obj, (bool, str, int, float, list, dict)):
            format = 'type'
            rawdata = obj
        elif isinstance(obj, Metadata):
            format = 'metadata'
            rawdata = obj.to_json()
        elif self._model_store.get_backend_byobj(obj) is not None:
            objname = uuid4().hex
            meta = self._model_store.put(obj, f'.experiments/.artefacts/{objname}')
            format = 'model'
            rawdata = meta.name
        elif self._store.get_backend_byobj(obj) is not None:
            objname = uuid4().hex
            meta = self._store.put(obj, f'.experiments/.artefacts/{objname}')
            format = 'dataset'
            rawdata = meta.name
        else:
            try:
                rawdata = b64encode(dill.dumps(obj)).decode('utf8')
                format = 'pickle'
            except TypeError as e:
                rawdata = repr(obj)
                format = 'repr'
        value = {
            'name': name,
            'data': rawdata,
            'format': format
        }
        data = self._common_log_data(event, key, value, step=step, dt=dt, name=name, **extra)
        self._write_log(data)

    def log_event(self, event, key, value, step=None, dt=None, **extra):
        data = self._common_log_data(event, key, value, step=step, dt=dt, **extra)
        self._write_log(data)

    def log_events(self, event, key, values, step=None, dt=None, **extra):
        """ log a series of events

        This is a convenience method to log multiple values for the same event.
        All values will be logged with the same common log data, i.e. the same
        datetime, step, and any extra values.

        Args:
            event (str): the event name
            key (str): the key for the event
            values (list): a list of values to log
            step (int): the step, if any
            dt (datetime): the datetime, defaults to now
            **extra: any other values to store with event

        .. versionadded:: NEXT

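        For illustration, assuming ``exp`` is an active experiment::

            exp.log_events('epoch', 'loss', [0.5, 0.4, 0.3])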
        """
        data = self._common_log_data(event, key, None, step=step, dt=dt, **extra)
        for value in values:
            data['value'] = value
            self._write_log(dict(data))

    def log_param(self, key, value, step=None, dt=None, **extra):
        """ log an experiment parameter

        Args:
            key (str): the parameter name
            value (str|float|int|bool|dict): the parameter value
            step (int): the step
            **extra: any other values to store with event

        Notes:
            * logged as ``event=param``
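
        For illustration, assuming ``exp`` is an active experiment::

            exp.log_param('max_depth', 5)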
        """
        data = self._common_log_data('param', key, value, step=step, dt=dt, **extra)
        self._write_log(data)

    def log_metric(self, key, value, step=None, dt=None, **extra):
        """ log a metric value

        Args:
            key (str): the metric name
            value (str|float|int|bool|dict): the metric value
            step (int): the step
            **extra: any other values to store with event

        Notes:
            * logged as ``event=metric``
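
        For illustration, assuming ``exp`` is an active experiment::

            exp.log_metric('accuracy', .93, step=1)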
        """
        data = self._common_log_data('metric', key, value, step=step, dt=dt, **extra)
        self._write_log(data)

    def log_data(self, key, value, step=None, dt=None, event=None, **extra):
        """ log x/y data for model predictions

        This is semantic sugar for log_artifact() using the 'data' event.

        Args:
            key (str): the name of the artifact
            value (any): the x/y data
            step (int): the step
            dt (datetime): the datetime
            event (str): the event, defaults to 'data'
            **extra: any other values to store with event

        Returns:
            None
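
        For illustration, assuming ``exp`` is an active experiment and ``X_test``
        is a DataFrame or array (hypothetical name)::

            exp.log_data('X', X_test)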
        """
        event = event or 'data'
        self.log_artifact(value, key, step=step, dt=dt, key=key, event=event, **extra)

    def log_system(self, key=None, value=None, step=None, dt=None, **extra):
        """ log system data

        Args:
            key (str): the key to use, defaults to 'system'
            value (str|float|int|bool|dict): the parameter value
            step (int): the step
            **extra: any other values to store with event

        Notes:
            * logged as ``event=system``
            * logs platform, python version and list of installed packages
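
        For illustration, assuming ``exp`` is an active experiment::

            exp.log_system()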
        """
        key = key or 'system'
        value = value or {
            'platform': platform.uname()._asdict(),
            'python': '-'.join((platform.python_implementation(),
                                platform.python_version())),
            'packages': ['=='.join((d.metadata['Name'], d.version))
                         for d in importlib.metadata.distributions()]
        }
        data = self._common_log_data('system', key, value, step=step, dt=dt, **extra)
        self._write_log(data)

    def data(self, experiment=None, run=None, event=None, step=None, key=None, raw=False,
             lazy=False, since=None, end=None, batchsize=None, slice=None, **extra):
        """ build a dataframe of all stored data

        Args:
            experiment (str|list): the name of the experiment, defaults to its current value
            run (int|list|str|slice): the run(s) to get data back, defaults to current run, use 'all' for all,
               1-indexed since first run, or -1 indexed from latest run, can combine both. If run < 0
               would go before the first run, run 1 will be returned. A slice(start, stop) can be used
               to specify a range of runs.
            event (str|list): the event(s) to include
            step (int|list): the step(s) to include
            key (str|list): the key(s) to include
            raw (bool): if True returns the raw data instead of a DataFrame
            lazy (bool): if True returns the Cursor instead of data, ignores raw
            since (datetime|timedelta|str): only return data since this date. If both since and run are specified,
               only matches since the given date are returned. If since is a string it must be parseable
               by pd.to_datetime, or be given in the format '<n><unit:[smhdwMqy]>' for relative times, or
               be a timedelta object. See dtrelative() for details on relative times.
            end (datetime): only return data until this date
            batchsize (int): if specified, returns a generator yielding data in batches of batchsize,
               note that raw is respected, i.e. raw=False yields a DataFrame for every batch, raw=True
               yields a list of dicts
            slice (tuple): if specified, returns a slice of the data, e.g. slice=(10, 25) returns rows 10-25,
               the slice is applied after all other filters

        Returns:
            For lazy == False:
                * data (DataFrame) if raw == False
                * data (list of dicts) if raw == True
                * None if no data exists

            For lazy == True, no batchsize, regardless of raw:
                * data (Cursor) for any value of raw

            For lazy == True, with batchsize:
                * data (generator of list[dict]) if raw == True
                * data (generator of DataFrame) if raw == False

        .. versionchanged:: 0.16.2
            run supports negative indexing

        .. versionchanged:: 0.17
            added batchsize

        .. versionchanged:: 0.17
            enabled the use of run='*' to retrieve all runs, equivalent of run='all'

        .. versionchanged:: 0.17
            enabled data(run=, since=, end=), accepting range queries on run, dt and event
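
        For illustration, assuming ``exp`` is an experiment with logged metrics::

            df = exp.data(event='metric')            # current run, as DataFrame
            df = exp.data(run='*', event='metric')   # all runs
            df = exp.data(run=-1, key='accuracy')    # the latest run only
            records = exp.data(run='*', raw=True)    # list of dicts instead of DataFrame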
        """
        from functools import cache
        experiment = experiment or self._experiment
        # -- flush all buffers before querying
        self.flush()
        # -- build filter
        if since is None:
            run = run if run is not None else self._run
        run = ensurelist(run) if not isinstance(run, str) and isinstance(run, Iterable) else run
        # actual run
        # -- run is 1-indexed, so we need to adjust for -1 indexing
        #    e.g. -1 means the latest run, -2 the run before that
        #    e.g. latest_run = 5, run=-1 means 5, run=-2 means 4 etc.
        # -- run can be a list, in which case we adjust run < 0 for each element
        # -- run can never be less than 1 (1-indexed), even if run << 0
        last_run = cache(
            lambda: int(self._latest_run or 0))  # PERF/consistency: memoize the last run per each .data() call
        relative_run = lambda r: max(1, 1 + last_run() + r)
        if isinstance(run, list) and any(r < 0 for r in run):
            run = [(r if r >= 0 else relative_run(r)) for r in run]
        elif isinstance(run, int) and run < 0:
            run = relative_run(run)
        filter = self._build_data_filter(experiment, run, event, step, key, since, end, extra)

        def read_data(cursor):
            data = pd.DataFrame.from_records(cursor)
            if 'dt' in data.columns:
                data['dt'] = pd.to_datetime(data['dt'], errors='coerce')
                data.sort_values('dt', inplace=True)
            return data

        def read_data_batched(cursor, batchsize, slice):
            from builtins import slice as t_slice
            if cursor is None:
                yield None
                return
            if slice:
                slice = (slice.start or 0, slice.stop or 0) if isinstance(slice, t_slice) else slice
                cursor.skip(slice[0])
                cursor.limit(slice[1] - slice[0])
                batchsize = batchsize or (slice[1] - slice[0])
            for rows in batched(cursor, batchsize):
                data = (r.get('data') for r in rows)
                yield read_data(data) if not raw else list(data)

        if batchsize or slice:
            data = self._store.get(self._data_name, filter=filter, lazy=True, trusted=signature(filter))
            data = read_data_batched(data, batchsize, slice)
            if slice and not batchsize:
                # try to resolve just one iteration
                data, _ = tryOr(lambda: (next(data), data.close()), (None, None))
        else:
            data = self._store.get(self._data_name, filter=filter, lazy=lazy, trusted=signature(filter))
            data = read_data(data) if data is not None and not lazy and not raw else data
        return data

    def _build_data_filter(self, experiment, run, event, step, key, since, end, extra):
        # build a filter for the data query, suitable for OmegaStore.get()
        filter = {}
        valid = lambda s: s is not None and str(s).lower() not in ('all', '*')
        # SEC: ensure all values are basic types, to prevent operator injection
        valid_types = (str, int, float, list, tuple, date, datetime)
        ensure_type = lambda v, t: v if isinstance(v, t) else str(v)
        op = lambda s: {'$in': ensurelist(s)} if isinstance(s, (list, tuple, np.ndarray)) else ensure_type(s, valid_types)
        if valid(experiment):
            filter['data.experiment'] = op(experiment)
        if valid(run):
            if isinstance(run, slice):
                filter['data.run'] = {'$gte': run.start, '$lte': run.stop}
            else:
                filter['data.run'] = op(run)
        if valid(event):
            filter['data.event'] = op(event)
        if valid(step):
            filter['data.step'] = op(step)
        if valid(key):
            filter['data.key'] = op(key)
        if valid(since):
            dtnow = getattr(self, '_since_dtnow', datetime.utcnow())
            if isinstance(since, str):
                since = tryOr(lambda: pd.to_datetime(since), lambda: dtrelative('-' + since, now=dtnow))
            elif isinstance(since, timedelta):
                since = dtnow - since
            elif isinstance(since, datetime):
                since = since
            else:
                raise ValueError(
                    f'invalid since value: {since}, must be datetime, timedelta or string in format "<n><unit:[smhdwMqy]>"')
            filter['data.dt'] = {'$gte': str(since.isoformat())}
        if valid(end):
            dtnow = getattr(self, '_since_dtnow', datetime.utcnow())
            if isinstance(end, str):
                end = tryOr(lambda: pd.to_datetime(end), lambda: dtrelative('+' + end, now=dtnow))
            elif isinstance(end, timedelta):
                end = dtnow + end
            elif isinstance(end, datetime):
                end = end
            else:
                raise ValueError(
                    f'invalid end value: {end}, must be datetime, timedelta or string in format "<n><unit:[smhdwMqy]>"')
            filter['data.dt'] = filter.setdefault('data.dt', {})
            filter['data.dt']['$lte'] = str(end.isoformat())
        for k, v in extra.items():
            if valid(v):
                fk = f'data.{k}'
                filter[fk] = op(v)
        return filter

    @property
    def dataset(self):
        return self._data_name

    @property
    def stats(self):
        from omegaml.backends.tracking.statistics import ExperimentStatistics
        return ExperimentStatistics(self)

    def summary(self, **kwargs):
        return self.stats.summary(**kwargs)

    def _initialize_dataset(self, force=False):
        # create indexes when the dataset is first created
        if not force and self._store.exists(self._data_name):
            return
        coll = self._store.collection(self._data_name)
        idxs = [
            {'data.run': pymongo.ASCENDING, 'data.event': pymongo.ASCENDING, 'data.key': pymongo.ASCENDING,
             'data.experiment': pymongo.ASCENDING},
            {'data.dt': pymongo.ASCENDING, 'data.event': pymongo.ASCENDING, 'data.key': pymongo.ASCENDING,
             'data.experiment': pymongo.ASCENDING},
            {'data.dt': pymongo.ASCENDING, 'data.event': pymongo.ASCENDING, 'data.experiment': pymongo.ASCENDING},
        ]
        for specs in idxs:
            ensure_index(coll, specs)

    def restore_artifact(self, *args, **kwargs):
        """ restore a specific logged artifact

        .. versionchanged:: 0.17
            deprecated, use exp.restore_artifacts() instead
        """
        warnings.warn('deprecated, use exp.restore_artifacts() instead', DeprecationWarning)
        restored = self.restore_artifacts(*args, **kwargs)
        return restored[-1] if restored else None

    def restore_artifacts(self, key=None, experiment=None, run=None, since=None, step=None, value=None, event=None,
                          name=None):
        """ restore logged artifacts

        Args:
            key (str): the name of the artifact as provided in log_artifact
            run (int): the run for which to query, defaults to current run
            since (datetime): only return data since this date
            step (int): the step for which to query, defaults to all steps in run
            value (dict|list): dict or list of dict, this value is used instead of
               querying data, use to retrieve an artifact from contents of ``.data()``

        Returns:
            list of restored objects

        Notes:
            * this will restore the artifact according to its type assigned
              by ``.log_artifact()``. If the type cannot be determined, the
              actual data is returned

        .. versionchanged:: 0.17
            returns a list of objects instead of the last object
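
        For illustration, assuming an artifact was logged in the current run::

            exp.log_artifact({'mean': .5}, 'stats')
            objs = exp.restore_artifacts('stats')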
        """
        event = event or 'artifact'
        name = name or '*'
        if value is None:
            all_data = self.data(experiment=experiment, run=run, since=since, event=event,
                                 step=step, key=key, raw=True, name=name)
        else:
            all_data = [{'value': value}] if isinstance(value, dict) else value
        restored = []
        all_data = all_data or []
        for item in all_data:
            data = item.get('value')
            if data['format'] == 'type':
                obj = data['data']
            elif data['format'] == 'metadata':
                meta = self._store._Metadata
                obj = meta.from_json(data['data'])
            elif data['format'] == 'dataset':
                obj = self._store.get(data['data'])
            elif data['format'] == 'model':
                obj = self._model_store.get(data['data'])
            elif data['format'] == 'pickle':
                obj = dill.loads(b64decode((data['data']).encode('utf8')))
            else:
                obj = data.get('data', data)
            restored.append(obj)
        return restored

    def restore_data(self, key, run=None, event=None, since=None, concat=True, **extra):
        """ restore x/y data for model predictions

        This is semantic sugar for restore_artifacts() using event='data'.

        Args:
            key (str): the name of the artifact
            run (int): the run for which to query, defaults to current run
            event (str): the event, defaults to 'data'
            since (datetime): only return data since this date
            concat (bool): if True, concatenates the data into a single object,
               in this case all data must be of the same type. Defaults to True.
            **extra: any other values to store with event

        Returns:
            the restored objects; a single concatenated object if concat is True,
            otherwise a list of objects
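
        For illustration, assuming x/y data was logged via ``log_data()``::

            exp.log_data('X', X_test)   # X_test is a hypothetical DataFrame
            X_all = exp.restore_data('X')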
        """
        event = event or 'data'

        def _concat(values):
            if values is None:
                return None
            if len(values) and isinstance(values[0], (pd.DataFrame, pd.Series)):
                ensure_df_or_series = lambda v: pd.Series(v) if isinstance(v, (np.ndarray, list)) else v
                return pd.concat((ensure_df_or_series(v) for v in values), axis=0)
            elif len(values) and isinstance(values[0], np.ndarray):
                return np.concatenate(values, axis=0)
            # chain seems to be the fastest approach
            # -- https://stackoverflow.com/a/56407963/890242
            return list(chain(*values))

        restored = self.restore_artifacts(run=run, key=key, event=event, since=since, **extra)
        return restored if not concat else _concat(restored)



def dtrelative(delta, now=None, as_delta=False):
    """ return a datetime relative to now

    Args:
        delta (str|timedelta): the relative delta, if a string, specify as '[+-]<n><unit:[smhdwMqy]>',
           e.g. '1d' for one day, '-1d' for one day ago, '+1d' for one day from now. Special cases:
           '-0y' means the beginning of the current year, '+0y' means the end of the current year.
           smhdwMqy = seconds, minutes, hours, days, weeks, months, quarters, years
        now (datetime): the reference datetime, defaults to datetime.utcnow()
        as_delta (bool): if True, returns a timedelta object, otherwise a datetime object

    Returns:
        datetime|timedelta: the relative datetime or timedelta
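
    For illustration::

        dtrelative('-1d')                  # one day before utcnow
        dtrelative('2h', as_delta=True)    # timedelta of two hours
        dtrelative('-0y')                  # beginning of the current year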
    """
    # Parse the numeric part and the unit from the specifier
    UNIT_MAP = {'s': 1,  # 1 second
                'm': 60,  # 1 minute
                'h': 60 * 60,  # 1 hour
                'd': 24 * 60 * 60,  # 1 day
                'w': 7 * 24 * 60 * 60,  # 1 week
                'n': 30 * 24 * 60 * 60,  # 1 month
                'q': 90 * 24 * 60 * 60,  # 1 quarter
                'y': 365 * 24 * 60 * 60}  # 1 year
    now = now or datetime.utcnow()
    error_msg = f"Invalid delta {delta}. Use a string of format '[+-]<n><unit:[smhdwMqy]>' or a timedelta object."
    if isinstance(delta, str):
        try:
            past = delta.startswith('-')
            delta = (delta
                     .replace('-', '')
                     .replace('+', '')  # Remove the sign
                     .replace(' ', '')  # Remove spaces
                     .replace('M', 'n')  # m is ambiguous, so we use n for month
                     .lower())
            num = int(delta[:-1])  # The numeric part
            units = UNIT_MAP.get(delta[-1])  # The last character
            if delta[-1] == 'y' and num == 0:
                # special case -0y means beginning of the year, +0y means end of the year
                dtdelta = (datetime(now.year, 1, 1) - now) if past else (datetime(now.year, 12, 31) - now)
            else:
                dtdelta = timedelta(seconds=num * units * (-1 if past else 1))
        except Exception:
            raise ValueError(error_msg)
    elif isinstance(delta, timedelta):
        dtdelta = delta
    else:
        raise ValueError(error_msg)
    return now + dtdelta if not as_delta else dtdelta