Source code for omegaml.backends.tracking.simple

import dill
import importlib.metadata  # explicit submodule import; importlib.metadata is used by log_system()
import numpy as np
import os
import pandas as pd
import platform
import pymongo
import warnings
from base64 import b64encode, b64decode
from datetime import date
from itertools import chain
from typing import Iterable
from uuid import uuid4

from omegaml.backends.tracking.base import TrackingProvider
from omegaml.documents import Metadata
from omegaml.util import _raise, ensure_index, batched, signature, tryOr, ensurelist

class NoTrackTracker(TrackingProvider):
    """ A default tracker that does not record anything """

    def start(self, run=None):
        pass

    def stop(self):
        pass

    def log_artifact(self, obj, name, step=None, **extra):
        pass

    def log_metric(self, key, value, step=None, **extra):
        pass

    def log_param(self, key, value, step=None, **extra):
        pass

    def log_event(self, event, key, value, step=None, **extra):
        pass

    def log_extra(self, **kwargs):
        pass

    def log_data(self, **kwargs):
        pass

    def data(self, experiment=None, run=None, event=None, step=None, key=None, raw=False, **query):
        pass

class OmegaSimpleTracker(TrackingProvider):
    """ A tracking provider that logs to an omegaml dataset

    Usage::

        with om.runtime.experiment(provider='default') as exp:
            ...
            exp.log_metric('accuracy', .78)

    .. versionchanged:: 0.17
        any extra

    """
    _provider = 'simple'
    _experiment = None
    _startdt = None
    _stopdt = None
    _autotrack = False

    _ensure_active = lambda self, r: r if r is not None else _raise(
        ValueError('no active run, call .start() or .use() '))

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.log_buffer = []
        self.max_buffer = 10
        self._initialize_dataset()

    def active_run(self, run=None):
        """ set the latest run as the active run

        Args:
            run (int|str): optional run number or unique task id; if None the
               latest active run will be set, or a new run is created if
               no active run exists.

        Returns:
            current run (int)
        """
        if run is None:
            latest = self._latest_run
            latest_is_active = (latest is not None and self.status(run=latest) == 'STARTED')
            self._run = latest if latest_is_active else self.start(run=None)
        else:
            self._run = run
        self._experiment = self._experiment or uuid4().hex
        return self._run

    def use(self, run=None):
        """ reuse the latest run instead of starting a new one

        semantic sugar for self.active_run()

        Returns:
            self
        """
        self.active_run(run=run)
        return self

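    # Illustrative usage sketch (not part of the original source), assuming an
    # omegaml session ``om`` and an experiment named 'myexp':
    #
    #   exp = om.runtime.experiment('myexp')
    #   exp.use()                       # attach to the latest run, or start a new one
    #   exp.log_metric('accuracy', .9)
    #   exp.stop()
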
    @property
    def autotrack(self):
        return self._autotrack

    @autotrack.setter
    def autotrack(self, value):
        self._autotrack = value

    @property
    def _latest_run(self):
        cursor = self.data(event='start', run='*', lazy=True)
        data = list(cursor.sort('data.run', -1).limit(1)) if cursor else None
        run = data[-1].get('data', {}).get('run') if data is not None and len(data) > 0 else None
        return run

    def status(self, run=None):
        """ status of a run

        Args:
            run (int): the run number, defaults to the currently active run

        Returns:
            status, one of 'PENDING', 'STARTED', 'STOPPED'
        """
        self._run = run or self._run or self._latest_run
        data = self.data(event=['start', 'stop'], run=self._run, raw=True)
        no_runs = data is None or len(data) == 0
        has_stop = not no_runs and sum(1 for row in data if row.get('event') == 'stop')
        return 'PENDING' if no_runs else 'STOPPED' if has_stop else 'STARTED'

    def start(self, run=None, immediate=True):
        """ start a new run

        This starts a new run and logs the start event
        """
        self._run = run or (self._latest_run or 0) + 1
        self._startdt = datetime.utcnow()
        data = self._common_log_data('start', key=None, value=None, step=None, dt=self._startdt)
        self._write_log(data, immediate=immediate)
        return self._run

    def stop(self, flush=True):
        """ stop the current run

        This stops the current run and records the stop event
        """
        self._stopdt = datetime.utcnow()
        data = self._common_log_data('stop', key=None, value=None, step=None, dt=self._stopdt)
        self._write_log(data)
        self.flush() if flush else None

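    # Sketch of explicit run control (illustrative, not part of the original source);
    # the context manager form shown in the class docstring handles this implicitly:
    #
    #   run = exp.start()    # logs a 'start' event and returns the run number
    #   ...                  # log metrics, params, artifacts
    #   exp.stop()           # logs a 'stop' event and flushes the buffer
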
    def flush(self):
        # passing list of list, as_many=True => collection.insert_many() for speed
        if self.log_buffer:
            self._store.put(self.log_buffer, self._data_name,
                            noversion=True, as_many=True)
            self.log_buffer.clear()

    def clear(self, force=False):
        """ clear all data

        All data is removed from the experiment's dataset. This is not recoverable.

        Args:
            force (bool): if True, clears all data, otherwise raises an error

        Caution:
            * this will clear all data and is not recoverable

        Raises:
            AssertionError: if force is not True

        .. versionadded:: 0.16.2

        """
        assert force, "clear() requires force=True to prevent accidental data loss. This will clear all experiment data and is not recoverable."
        self._store.drop(self._data_name, force=True)
        self._initialize_dataset(force=True)

    def _common_log_data(self, event, key, value, step=None, dt=None, **extra):
        if isinstance(value, dict):
            # shortcut to resolve PassthroughDataset actual values
            # -- enables storing the actual values of a dataset passed as a PassthroughDataset
            # TODO: should this be the responsibility of SimpleTracker?
            if isinstance(value.get('args'), (list, tuple)):
                value['args'] = [getattr(arg, '_passthrough_data', arg) for arg in value['args']]
            if isinstance(value.get('kwargs'), dict):
                value['kwargs'] = {
                    k: getattr(v, '_passthrough_data', v) for k, v in value['kwargs'].items()
                }
        data = {
            'experiment': self._experiment,
            'run': self._ensure_active(self._run),
            'step': step,
            'event': event,
            'key': key or event,
            'value': value,
            'dt': dt or datetime.utcnow(),
            'node': os.environ.get('HOSTNAME', platform.node()),
            'userid': self.userid,
        }
        # add **extra, check for duplicate keys to avoid overwriting
        dupl_keys = set(data.keys()) & set(extra.keys())
        if dupl_keys:
            raise ValueError(f'duplicate extra keys : {dupl_keys}')
        data.update(extra)
        data.update(self._extra_log) if self._extra_log else None
        return data

    def _write_log(self, data, immediate=False):
        self.log_buffer.append(data)
        if immediate or len(self.log_buffer) > self.max_buffer:
            self.flush()

    def log_artifact(self, obj, name, step=None, dt=None, event=None, key=None, **extra):
        """ log any object to the current run

        Usage::

            # log an artifact
            exp.log_artifact(mydict, 'somedata')

            # retrieve back
            mydict_ = exp.restore_artifact('somedata')

        Args:
            obj (obj): any object to log
            name (str): the name of the artifact
            step (int): the step, if any
            **extra: any extra data to log

        Notes:
            * bool, str, int, float, list, dict are stored as ``format=type``
            * Metadata is stored as ``format=metadata``
            * objects supported by ``om.models`` are stored as ``format=model``
            * objects supported by ``om.datasets`` are stored as ``format=dataset``
            * all other objects are pickled and stored as ``format=pickle``
        """
        event = event or 'artifact'
        key = key or name
        if isinstance(obj, (bool, str, int, float, list, dict)):
            format = 'type'
            rawdata = obj
        elif isinstance(obj, Metadata):
            format = 'metadata'
            rawdata = obj.to_json()
        elif self._model_store.get_backend_byobj(obj) is not None:
            objname = uuid4().hex
            meta = self._model_store.put(obj, f'.experiments/.artefacts/{objname}')
            format = 'model'
            rawdata = meta.name
        elif self._store.get_backend_byobj(obj) is not None:
            objname = uuid4().hex
            meta = self._store.put(obj, f'.experiments/.artefacts/{objname}')
            format = 'dataset'
            rawdata = meta.name
        else:
            try:
                rawdata = b64encode(dill.dumps(obj)).decode('utf8')
                format = 'pickle'
            except TypeError as e:
                rawdata = repr(obj)
                format = 'repr'
        value = {
            'name': name,
            'data': rawdata,
            'format': format
        }
        data = self._common_log_data(event, key, value, step=step, dt=dt, name=name, **extra)
        self._write_log(data)

    def log_event(self, event, key, value, step=None, dt=None, **extra):
        data = self._common_log_data(event, key, value, step=step, dt=dt, **extra)
        self._write_log(data)

    def log_events(self, event, key, values, step=None, dt=None, **extra):
        """ log a series of events

        This is a convenience method to log multiple values for the same event.
        All values will be logged with the same common log data, i.e. the same
        datetime, step, and any extra values.

        Args:
            event (str): the event name
            key (str): the key for the event
            values (list): a list of values to log
            step (int): the step, if any
            dt (datetime): the datetime, defaults to now
            **extra: any other values to store with event

        .. versionadded:: NEXT

        """
        data = self._common_log_data(event, key, None, step=step, dt=dt, **extra)
        for value in values:
            data['value'] = value
            self._write_log(dict(data))

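    # Illustrative sketch (not part of the original source): log several values
    # under the same event, sharing one timestamp and step:
    #
    #   exp.log_events('epoch_loss', 'loss', [0.9, 0.7, 0.5], step=1)
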
    def log_param(self, key, value, step=None, dt=None, **extra):
        """ log an experiment parameter

        Args:
            key (str): the parameter name
            value (str|float|int|bool|dict): the parameter value
            step (int): the step
            **extra: any other values to store with event

        Notes:
            * logged as ``event=param``
        """
        data = self._common_log_data('param', key, value, step=step, dt=dt, **extra)
        self._write_log(data)

    def log_metric(self, key, value, step=None, dt=None, **extra):
        """ log a metric value

        Args:
            key (str): the metric name
            value (str|float|int|bool|dict): the metric value
            step (int): the step
            **extra: any other values to store with event

        Notes:
            * logged as ``event=metric``
        """
        data = self._common_log_data('metric', key, value, step=step, dt=dt, **extra)
        self._write_log(data)

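    # Illustrative sketch (not part of the original source): parameters are
    # typically logged once per run, metrics per step:
    #
    #   exp.log_param('alpha', 0.5)
    #   for step, score in enumerate(scores):
    #       exp.log_metric('accuracy', score, step=step)
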
    def log_data(self, key, value, step=None, dt=None, event=None, **extra):
        """ log x/y data for model predictions

        This is semantic sugar for log_artifact() using the 'data' event.

        Args:
            key (str): the name of the artifact
            value (any): the x/y data
            step (int): the step
            dt (datetime): the datetime
            event (str): the event, defaults to 'data'
            **extra: any other values to store with event

        Returns:
            None
        """
        event = event or 'data'
        self.log_artifact(value, key, step=step, dt=dt, key=key, event=event, **extra)

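    # Illustrative sketch (not part of the original source): log the inputs and
    # predictions of a model call for later analysis (X_test and model are
    # hypothetical objects from the calling code):
    #
    #   exp.log_data('X', X_test)
    #   exp.log_data('yhat', model.predict(X_test))
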
    def log_system(self, key=None, value=None, step=None, dt=None, **extra):
        """ log system data

        Args:
            key (str): the key to use, defaults to 'system'
            value (str|float|int|bool|dict): the parameter value
            step (int): the step
            **extra: any other values to store with event

        Notes:
            * logged as ``event=system``
            * logs platform, python version and list of installed packages
        """
        key = key or 'system'
        value = value or {
            'platform': platform.uname()._asdict(),
            'python': '-'.join((platform.python_implementation(),
                                platform.python_version())),
            'packages': ['=='.join((d.metadata['Name'], d.version))
                         for d in importlib.metadata.distributions()]
        }
        data = self._common_log_data('system', key, value, step=step, dt=dt, **extra)
        self._write_log(data)

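    # Illustrative sketch (not part of the original source): record the runtime
    # environment once at the start of a run:
    #
    #   exp.log_system()   # logs platform, python version and installed packages
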
    def log_extra(self, remove=False, **kwargs):
        """ add additional log information for every subsequent logging call

        Args:
            remove (bool): if True, removes the extra log information
            kwargs: any key-value pairs to log

        """
        self._extra_log = {} if self._extra_log is None else self._extra_log
        if not remove:
            self._extra_log.update(kwargs)
        elif kwargs:
            # consume the generator to remove the given keys
            from collections import deque as consume
            deletions = (self._extra_log.pop(k, None) for k in kwargs)
            consume(deletions, maxlen=0)
        else:
            self._extra_log = {}

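    # Illustrative sketch (not part of the original source): attach a tag to all
    # subsequent log calls, then clear it again:
    #
    #   exp.log_extra(dataset='cifar10')
    #   exp.log_metric('accuracy', .9)   # stored with dataset='cifar10'
    #   exp.log_extra(remove=True)       # clears all extra log data
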
    def data(self, experiment=None, run=None, event=None, step=None, key=None, raw=False,
             lazy=False, since=None, end=None, batchsize=None, slice=None, **extra):
        """ build a dataframe of all stored data

        Args:
            experiment (str|list): the name of the experiment, defaults to its current value
            run (int|list|str|slice): the run(s) to get data for, defaults to the current run, use 'all' for all runs,
               1-indexed from the first run, or -1 indexed from the latest run; both forms can be combined. If run < 0
               would go before the first run, run 1 is returned. A slice(start, stop) can be used
               to specify a range of runs.
            event (str|list): the event(s) to include
            step (int|list): the step(s) to include
            key (str|list): the key(s) to include
            raw (bool): if True returns the raw data instead of a DataFrame
            lazy (bool): if True returns the Cursor instead of data, ignores raw
            since (datetime|timedelta|str): only return data since this date. If both since and run are specified,
               only matches since the given date are returned. If since is a string it must be parseable
               by pd.to_datetime, or be given in the format '<n><unit:[smhdwMqy]>' for relative times, or a
               timedelta object. See dtrelative() for details on relative times.
            end (datetime): only return data until this date
            batchsize (int): if specified, returns a generator yielding data in batches of batchsize,
               note that raw is respected, i.e. raw=False yields a DataFrame for every batch, raw=True
               yields a list of dicts
            slice (tuple): if specified, returns a slice of the data, e.g. slice=(10, 25) returns rows 10-25,
               the slice is applied after all other filters

        Returns:
            For lazy == False:
                * data (DataFrame) if raw == False
                * data (list of dicts) if raw == True
                * None if no data exists

            For lazy == True, no batchsize, regardless of raw:
                * data (Cursor) for any value of raw

            For lazy == True, with batchsize:
                * data (generator of list[dict]) if raw == True
                * data (generator of DataFrame) if raw == False

        .. versionchanged:: 0.16.2
            run supports negative indexing

        .. versionchanged:: 0.17
            added batchsize

        .. versionchanged:: 0.17
            enabled the use of run='*' to retrieve all runs, equivalent of run='all'

        .. versionchanged:: 0.17
            enabled data(run=, start=, end=, since=), accepting range queries on run, dt and event
        """
        from functools import cache
        experiment = experiment or self._experiment
        # -- flush all buffers before querying
        self.flush()
        # -- build filter
        if since is None:
            run = run if run is not None else self._run
            run = ensurelist(run) if not isinstance(run, str) and isinstance(run, Iterable) else run
            # actual run
            # -- run is 1-indexed, so we need to adjust for -1 indexing
            #    e.g. -1 means the latest run, -2 the run before that
            #    e.g. latest_run = 5, run=-1 means 5, run=-2 means 4 etc.
            # -- run can be a list, in which case we adjust run < 0 for each element
            # -- run can never be less than 1 (1-indexed), even if run << 0
            last_run = cache(
                lambda: int(self._latest_run or 0))  # PERF/consistency: memoize the last run per each .data() call
            relative_run = lambda r: max(1, 1 + last_run() + r)
            if isinstance(run, list) and any(r < 0 for r in run):
                run = [(r if r >= 0 else relative_run(r)) for r in run]
            elif isinstance(run, int) and run < 0:
                run = relative_run(run)
        filter = self._build_data_filter(experiment, run, event, step, key, since, end, extra)

        def read_data(cursor):
            data = pd.DataFrame.from_records(cursor)
            if 'dt' in data.columns:
                data['dt'] = pd.to_datetime(data['dt'], errors='coerce')
                data.sort_values('dt', inplace=True)
            return data

        def read_data_batched(cursor, batchsize, slice):
            from builtins import slice as t_slice
            if cursor is None:
                yield None
                return
            if slice:
                slice = (slice.start or 0, slice.stop or 0) if isinstance(slice, t_slice) else slice
                cursor.skip(slice[0])
                cursor.limit(slice[1] - slice[0])
                batchsize = batchsize or (slice[1] - slice[0])
            for rows in batched(cursor, batchsize):
                data = (r.get('data') for r in rows)
                yield read_data(data) if not raw else list(data)

        if batchsize or slice:
            data = self._store.get(self._data_name, filter=filter, lazy=True, trusted=signature(filter))
            data = read_data_batched(data, batchsize, slice)
            if slice and not batchsize:
                # try to resolve just one iteration
                data, _ = tryOr(lambda: (next(data), data.close()), (None, None))
        else:
            data = self._store.get(self._data_name, filter=filter, lazy=lazy, trusted=signature(filter))
            data = read_data(data) if data is not None and not lazy and not raw else data
        return data

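    # Illustrative query sketch (not part of the original source):
    #
    #   df = exp.data(event='metric')              # metrics of the current run
    #   df = exp.data(run='all', event='metric')   # metrics of all runs
    #   df = exp.data(run=-1)                      # the latest run
    #   df = exp.data(since='7d')                  # all data from the last 7 days
    #   for batch in exp.data(run='all', batchsize=1000):
    #       ...                                    # one DataFrame per batch
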
    def _build_data_filter(self, experiment, run, event, step, key, since, end, extra):
        # build a filter for the data query, suitable for OmegaStore.get()
        filter = {}
        valid = lambda s: s is not None and str(s).lower() not in ('all', '*')
        # SEC: ensure all values are basic types, to prevent operator injection
        valid_types = (str, int, float, list, tuple, date, datetime)
        op = lambda s: ({'$in': ensurelist(s)} if isinstance(s, (list, tuple, np.ndarray))
                        else ensure_type(s, valid_types))
        ensure_type = lambda v, t: v if isinstance(v, t) else str(v)
        if valid(experiment):
            filter['data.experiment'] = op(experiment)
        if valid(run):
            if isinstance(run, slice):
                filter['data.run'] = {'$gte': run.start, '$lte': run.stop}
            else:
                filter['data.run'] = op(run)
        if valid(event):
            filter['data.event'] = op(event)
        if valid(step):
            filter['data.step'] = op(step)
        if valid(key):
            filter['data.key'] = op(key)
        if valid(since):
            dtnow = getattr(self, '_since_dtnow', datetime.utcnow())
            if isinstance(since, str):
                since = tryOr(lambda: pd.to_datetime(since), lambda: dtrelative('-' + since, now=dtnow))
            elif isinstance(since, timedelta):
                since = dtnow - since
            elif isinstance(since, datetime):
                since = since
            else:
                raise ValueError(
                    f'invalid since value: {since}, must be datetime, timedelta or string in format "<n><unit:[smhdwMqy]>"')
            filter['data.dt'] = {'$gte': str(since.isoformat())}
        if valid(end):
            dtnow = getattr(self, '_since_dtnow', datetime.utcnow())
            if isinstance(end, str):
                end = tryOr(lambda: pd.to_datetime(end), lambda: dtrelative('+' + end, now=dtnow))
            elif isinstance(end, timedelta):
                end = dtnow + end
            elif isinstance(end, datetime):
                end = end
            else:
                raise ValueError(
                    f'invalid end value: {end}, must be datetime, timedelta or string in format "<n><unit:[smhdwMqy]>"')
            filter['data.dt'] = filter.setdefault('data.dt', {})
            filter['data.dt']['$lte'] = str(end.isoformat())
        for k, v in extra.items():
            if valid(v):
                fk = f'data.{k}'
                filter[fk] = op(v)
        return filter

    @property
    def dataset(self):
        return self._data_name

    @property
    def stats(self):
        from omegaml.backends.tracking.statistics import ExperimentStatistics
        return ExperimentStatistics(self)

    def summary(self, **kwargs):
        return self.stats.summary(**kwargs)

    def _initialize_dataset(self, force=False):
        # create indexes when the dataset is first created
        if not force and self._store.exists(self._data_name):
            return
        coll = self._store.collection(self._data_name)
        idxs = [
            {'data.run': pymongo.ASCENDING, 'data.event': pymongo.ASCENDING, 'data.key': pymongo.ASCENDING,
             'data.experiment': pymongo.ASCENDING},
            {'data.dt': pymongo.ASCENDING, 'data.event': pymongo.ASCENDING, 'data.key': pymongo.ASCENDING,
             'data.experiment': pymongo.ASCENDING},
            {'data.dt': pymongo.ASCENDING, 'data.event': pymongo.ASCENDING, 'data.experiment': pymongo.ASCENDING},
        ]
        for specs in idxs:
            ensure_index(coll, specs)

    def restore_artifact(self, *args, **kwargs):
        """ restore a specific logged artifact

        .. versionchanged:: 0.17
            deprecated, use exp.restore_artifacts() instead
        """
        warnings.warn('deprecated, use exp.restore_artifacts() instead', DeprecationWarning)
        restored = self.restore_artifacts(*args, **kwargs)
        return restored[-1] if restored else None

    def restore_artifacts(self, key=None, experiment=None, run=None, since=None, step=None, value=None, event=None,
                          name=None):
        """ restore logged artifacts

        Args:
            key (str): the name of the artifact as provided in log_artifact
            experiment (str): the experiment to query, defaults to the current experiment
            run (int): the run for which to query, defaults to current run
            since (datetime): only return data since this date
            step (int): the step for which to query, defaults to all steps in run
            value (dict|list): dict or list of dict, this value is used instead of
               querying data, use to retrieve an artifact from contents of ``.data()``
            event (str): the event to query, defaults to 'artifact'
            name (str): the artifact name to match, defaults to '*' (all)

        Returns:
            list of restored objects

        Notes:
            * this will restore the artifact according to its type assigned
              by ``.log_artifact()``. If the type cannot be determined, the
              actual data is returned

        Updates:
            * since 0.17: return list of objects instead of last object
        """
        event = event or 'artifact'
        name = name or '*'
        if value is None:
            all_data = self.data(experiment=experiment, run=run, since=since, event=event,
                                 step=step, key=key, raw=True, name=name)
        else:
            all_data = [{'value': value}] if isinstance(value, dict) else value
        restored = []
        all_data = all_data or []
        for item in all_data:
            data = item.get('value')
            if data['format'] == 'type':
                obj = data['data']
            elif data['format'] == 'metadata':
                meta = self._store._Metadata
                obj = meta.from_json(data['data'])
            elif data['format'] == 'dataset':
                obj = self._store.get(data['data'])
            elif data['format'] == 'model':
                obj = self._model_store.get(data['data'])
            elif data['format'] == 'pickle':
                obj = dill.loads(b64decode((data['data']).encode('utf8')))
            else:
                obj = data.get('data', data)
            restored.append(obj)
        return restored

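    # Illustrative sketch (not part of the original source): artifacts logged via
    # log_artifact() can be restored by key, as a list of objects:
    #
    #   exp.log_artifact({'alpha': .5}, 'settings')
    #   restored = exp.restore_artifacts(key='settings')   # => [{'alpha': .5}, ...]
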
    def restore_data(self, key, run=None, event=None, since=None, concat=True, **extra):
        """ restore x/y data for model predictions

        This is semantic sugar for restore_artifacts() using the event='data' event.

        Args:
            key (str): the name of the artifact
            run (int): the run for which to query, defaults to current run
            event (str): the event, defaults to 'data'
            since (datetime): only return data since this date
            concat (bool): if True, concatenates the data into a single object,
               in this case all data must be of the same type. Defaults to True.
            **extra: any other values to store with event

        Returns:
            list of restored objects
        """
        event = event or 'data'

        def _concat(values):
            if values is None:
                return None
            if len(values) and isinstance(values[0], (pd.DataFrame, pd.Series)):
                ensure_df_or_series = lambda v: pd.Series(v) if isinstance(v, (np.ndarray, list)) else v
                return pd.concat((ensure_df_or_series(v) for v in values), axis=0)
            elif len(values) and isinstance(values[0], np.ndarray):
                return np.concatenate(values, axis=0)
            # chain seems to be the fastest approach
            # -- https://stackoverflow.com/a/56407963/890242
            return list(chain(*values))

        restored = self.restore_artifacts(run=run, key=key, event=event, since=since, **extra)
        return restored if not concat else _concat(restored)

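    # Illustrative sketch (not part of the original source): retrieve x/y data
    # logged via log_data(), concatenated across calls (X_batch1/X_batch2 are
    # hypothetical objects from the calling code):
    #
    #   exp.log_data('X', X_batch1)
    #   exp.log_data('X', X_batch2)
    #   X = exp.restore_data('X')   # one DataFrame/array/list when concat=True
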
from datetime import datetime, timedelta


def dtrelative(delta, now=None, as_delta=False):
    """ return a datetime relative to now

    Args:
        delta (str|timedelta): the relative delta, if a string, specify as '[+-]<n><unit:[smhdwMqy]>',
           e.g. '1d' for one day, '-1d' for one day ago, '+1d' for one day from now. Special cases:
           '-0y' means the beginning of the current year, '+0y' means the end of the current year.
           smhdwMqy = seconds, minutes, hours, days, weeks, months, quarters, years
        now (datetime): the reference datetime, defaults to datetime.utcnow()
        as_delta (bool): if True, returns a timedelta object, otherwise a datetime object

    Returns:
        datetime|timedelta: the relative datetime or timedelta
    """
    # Parse the numeric part and the unit from the specifier
    UNIT_MAP = {'s': 1,  # 1 second
                'm': 60,  # 1 minute
                'h': 60 * 60,  # 1 hour
                'd': 24 * 60 * 60,  # 1 day
                'w': 7 * 24 * 60 * 60,  # 1 week
                'n': 30 * 24 * 60 * 60,  # 1 month (specified as 'M')
                'q': 90 * 24 * 60 * 60,  # 1 quarter
                'y': 365 * 24 * 60 * 60}  # 1 year
    now = now or datetime.utcnow()
    error_msg = f"Invalid delta {delta}. Use a string of format '<n><unit:[smhdwMqy]>' or a timedelta object."
    if isinstance(delta, str):
        try:
            past = delta.startswith('-')
            delta = (delta
                     .replace('-', '')
                     .replace('+', '')  # Remove the sign
                     .replace(' ', '')  # Remove spaces
                     .replace('M', 'n')  # m is ambiguous, so we use n for month
                     .lower())
            num = int(delta[:-1])  # The numeric part
            units = UNIT_MAP.get(delta[-1])  # The last character is the unit
            if delta[-1] == 'y' and num == 0:
                # special case: -0y means beginning of the year, +0y means end of the year
                dtdelta = (datetime(now.year, 1, 1) - now) if past else (datetime(now.year, 12, 31) - now)
            else:
                dtdelta = timedelta(seconds=num * units * (-1 if past else 1))
        except Exception:
            raise ValueError(error_msg)
    elif isinstance(delta, timedelta):
        dtdelta = delta
    else:
        raise ValueError(error_msg)
    return now + dtdelta if not as_delta else dtdelta
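# Illustrative sketch (not part of the original source): relative time specifiers
# as accepted by dtrelative() and by the since=/end= arguments of .data():
#
#   dtrelative('-7d')                 # 7 days before now
#   dtrelative('+1h')                 # 1 hour from now
#   dtrelative('-0y')                 # beginning of the current year
#   dtrelative('2w', as_delta=True)   # timedelta of two weeks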