Source code for omegaml.backends.tracking.simple

import dill
import importlib.metadata
import numpy as np
import os
import pandas as pd
import platform
import pymongo
import warnings
from base64 import b64encode, b64decode
from datetime import date, datetime, timedelta
from itertools import chain
from typing import Iterable
from uuid import uuid4

from omegaml.backends.tracking.base import TrackingProvider
from omegaml.documents import Metadata
from omegaml.util import _raise, ensure_index, batched, signature, tryOr, ensurelist
class NoTrackTracker(TrackingProvider):
    """ A default tracker that does not record anything """

    def start(self, run=None):
        pass

    def stop(self):
        pass

    def log_artifact(self, obj, name, step=None, **extra):
        pass

    def log_metric(self, key, value, step=None, **extra):
        pass

    def log_param(self, key, value, step=None, **extra):
        pass

    def log_event(self, event, key, value, step=None, **extra):
        pass

    def log_extra(self, **kwargs):
        pass

    def log_data(self, **kwargs):
        pass

    def data(self, experiment=None, run=None, event=None, step=None, key=None, raw=False, **query):
        pass
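# A minimal usage sketch (illustrative, not part of the original source): the no-op
# tracker silently discards all log_* calls. This assumes `om` is an initialized
# omegaml instance and that this provider is registered under the name 'notrack':
#
#   with om.runtime.experiment('myexp', provider='notrack') as exp:
#       exp.log_metric('accuracy', .78)  # recorded nowhere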
class OmegaSimpleTracker(TrackingProvider):
    """ A tracking provider that logs to an omegaml dataset

    Usage::

        with om.runtime.experiment(provider='default') as exp:
            ...
            exp.log_metric('accuracy', .78)

    .. versionchanged:: 0.17
        any ``**extra`` keyword arguments passed to the log methods are
        stored as part of the logged event
    """
    _provider = 'simple'
    _experiment = None
    _startdt = None
    _stopdt = None
    _autotrack = False

    _ensure_active = lambda self, r: r if r is not None else _raise(
        ValueError('no active run, call .start() or .use() '))

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.log_buffer = []
        self.max_buffer = 10
        self._initialize_dataset()
    def active_run(self, run=None):
        """ set the latest run as the active run

        Args:
            run (int|str): optional, the run number or unique task id. If None,
                the latest active run is used, or a new run is created if no
                active run exists.

        Returns:
            current run (int)
        """
        if run is None:
            latest = self._latest_run
            latest_is_active = (latest is not None and self.status(run=latest) == 'STARTED')
            self._run = latest if latest_is_active else self.start(run=None)
        else:
            self._run = run
        self._experiment = self._experiment or uuid4().hex
        return self._run
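    # Usage sketch (illustrative): resolving the active run, assuming `exp` is an
    # OmegaSimpleTracker instance:
    #
    #   run = exp.active_run()   # reuse the latest STARTED run, else start a new one
    #   run = exp.active_run(5)  # explicitly set run 5 as the active run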
    def use(self, run=None):
        """ reuse the latest run instead of starting a new one

        semantic sugar for self.active_run()

        Returns:
            self
        """
        self.active_run(run=run)
        return self

    @property
    def autotrack(self):
        return self._autotrack

    @autotrack.setter
    def autotrack(self, value):
        self._autotrack = value

    @property
    def _latest_run(self):
        cursor = self.data(event='start', run='*', lazy=True)
        data = list(cursor.sort('data.run', -1).limit(1)) if cursor else None
        run = data[-1].get('data', {}).get('run') if data is not None and len(data) > 0 else None
        return run
    def status(self, run=None):
        """ status of a run

        Args:
            run (int): the run number, defaults to the currently active run

        Returns:
            status, one of 'PENDING', 'STARTED', 'STOPPED'
        """
        self._run = run or self._run or self._latest_run
        data = self.data(event=['start', 'stop'], run=self._run, raw=True)
        no_runs = data is None or len(data) == 0
        has_stop = sum(1 for row in (data or []) if row.get('event') == 'stop')
        return 'PENDING' if no_runs else 'STOPPED' if has_stop else 'STARTED'
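    # Usage sketch (illustrative): a run's lifecycle as reported by status():
    #
    #   exp.status()       # 'PENDING' before any run exists
    #   run = exp.start()  # logs the start event => 'STARTED'
    #   exp.stop()         # logs the stop event  => 'STOPPED'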
    def start(self, run=None, immediate=True):
        """ start a new run

        This starts a new run and logs the start event
        """
        self._run = run or (self._latest_run or 0) + 1
        self._startdt = datetime.utcnow()
        data = self._common_log_data('start', key=None, value=None, step=None, dt=self._startdt)
        self._write_log(data, immediate=immediate)
        return self._run
    def stop(self, flush=True):
        """ stop the current run

        This stops the current run and records the stop event
        """
        self._stopdt = datetime.utcnow()
        data = self._common_log_data('stop', key=None, value=None, step=None, dt=self._stopdt)
        self._write_log(data)
        if flush:
            self.flush()

    def flush(self):
        # passing list of list, as_many=True => collection.insert_many() for speed
        if self.log_buffer:
            self._store.put(self.log_buffer, self._data_name,
                            noversion=True, as_many=True)
            self.log_buffer.clear()
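    # Note on buffering (editorial sketch): log records accumulate in an in-memory
    # buffer and are written in bulk once more than max_buffer entries exist, or on
    # an explicit flush():
    #
    #   exp.log_metric('loss', .1)  # buffered
    #   exp.flush()                 # persisted via a single insert_many()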
    def clear(self, force=False):
        """ clear all data

        All data is removed from the experiment's dataset. This is not recoverable.

        Args:
            force (bool): if True, clears all data, otherwise raises an error

        Caution:
            * this will clear all data and is not recoverable

        Raises:
            AssertionError: if force is not True

        .. versionadded:: 0.16.2
        """
        assert force, "clear() requires force=True to prevent accidental data loss. This will clear all experiment data and is not recoverable."
        self._store.drop(self._data_name, force=True)
        self._initialize_dataset(force=True)
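    # Usage sketch (illustrative): clear() deliberately requires force=True:
    #
    #   exp.clear()            # raises AssertionError
    #   exp.clear(force=True)  # drops and re-initializes the dataset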

    def _common_log_data(self, event, key, value, step=None, dt=None, **extra):
        if isinstance(value, dict):
            # shortcut to resolve PassthroughDataset actual values
            # -- enables storing the actual values of a dataset passed as a PassthroughDataset
            # TODO: should this be the responsibility of SimpleTracker?
            if isinstance(value.get('args'), (list, tuple)):
                value['args'] = [getattr(arg, '_passthrough_data', arg) for arg in value['args']]
            if isinstance(value.get('kwargs'), dict):
                value['kwargs'] = {
                    k: getattr(v, '_passthrough_data', v) for k, v in value['kwargs'].items()
                }
        data = {
            'experiment': self._experiment,
            'run': self._ensure_active(self._run),
            'step': step,
            'event': event,
            'key': key or event,
            'value': value,
            'dt': dt or datetime.utcnow(),
            'node': os.environ.get('HOSTNAME', platform.node()),
            'userid': self.userid,
        }
        # add **extra, check for duplicate keys to avoid overwriting
        dupl_keys = set(data.keys()) & set(extra.keys())
        if dupl_keys:
            raise ValueError(f'duplicate extra keys : {dupl_keys}')
        data.update(extra)
        if self._extra_log:
            data.update(self._extra_log)
        return data

    def _write_log(self, data, immediate=False):
        self.log_buffer.append(data)
        if immediate or len(self.log_buffer) > self.max_buffer:
            self.flush()
    def log_artifact(self, obj, name, step=None, dt=None, event=None, key=None, **extra):
        """ log any object to the current run

        Usage::

            # log an artifact
            exp.log_artifact(mydict, 'somedata')

            # retrieve back
            mydict_ = exp.restore_artifact('somedata')

        Args:
            obj (obj): any object to log
            name (str): the name of artifact
            step (int): the step, if any
            **extra: any extra data to log

        Notes:
            * bool, str, int, float, list, dict are stored as ``format=type``
            * Metadata is stored as ``format=metadata``
            * objects supported by ``om.models`` are stored as ``format=model``
            * objects supported by ``om.datasets`` are stored as ``format=dataset``
            * all other objects are pickled and stored as ``format=pickle``
        """
        event = event or 'artifact'
        key = key or name
        if isinstance(obj, (bool, str, int, float, list, dict)):
            format = 'type'
            rawdata = obj
        elif isinstance(obj, Metadata):
            format = 'metadata'
            rawdata = obj.to_json()
        elif self._model_store.get_backend_byobj(obj) is not None:
            objname = uuid4().hex
            meta = self._model_store.put(obj, f'.experiments/.artefacts/{objname}')
            format = 'model'
            rawdata = meta.name
        elif self._store.get_backend_byobj(obj) is not None:
            objname = uuid4().hex
            meta = self._store.put(obj, f'.experiments/.artefacts/{objname}')
            format = 'dataset'
            rawdata = meta.name
        else:
            try:
                rawdata = b64encode(dill.dumps(obj)).decode('utf8')
                format = 'pickle'
            except TypeError:
                rawdata = repr(obj)
                format = 'repr'
        value = {
            'name': name,
            'data': rawdata,
            'format': format
        }
        data = self._common_log_data(event, key, value, step=step, dt=dt, name=name, **extra)
        self._write_log(data)

    def log_event(self, event, key, value, step=None, dt=None, **extra):
        data = self._common_log_data(event, key, value, step=step, dt=dt, **extra)
        self._write_log(data)
    def log_param(self, key, value, step=None, dt=None, **extra):
        """ log an experiment parameter

        Args:
            key (str): the parameter name
            value (str|float|int|bool|dict): the parameter value
            step (int): the step
            **extra: any other values to store with event

        Notes:
            * logged as ``event=param``
        """
        data = self._common_log_data('param', key, value, step=step, dt=dt, **extra)
        self._write_log(data)
    def log_metric(self, key, value, step=None, dt=None, **extra):
        """ log a metric value

        Args:
            key (str): the metric name
            value (str|float|int|bool|dict): the metric value
            step (int): the step
            **extra: any other values to store with event

        Notes:
            * logged as ``event=metric``
        """
        data = self._common_log_data('metric', key, value, step=step, dt=dt, **extra)
        self._write_log(data)
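    # Usage sketch (illustrative): params and metrics land in the same dataset,
    # distinguished by their event type:
    #
    #   exp.log_param('alpha', .01)             # stored with event='param'
    #   exp.log_metric('accuracy', .78)         # stored with event='metric'
    #   df = exp.data(event=['param', 'metric'])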
    def log_data(self, key, value, step=None, dt=None, event=None, **extra):
        """ log x/y data for model predictions

        This is semantic sugar for log_artifact() using the 'data' event.

        Args:
            key (str): the name of the artifact
            value (any): the x/y data
            step (int): the step
            dt (datetime): the datetime
            event (str): the event, defaults to 'data'
            **extra: any other values to store with event

        Returns:
            None
        """
        event = event or 'data'
        self.log_artifact(value, key, step=step, dt=dt, key=key, event=event, **extra)
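    # Usage sketch (illustrative): logging prediction inputs/outputs, assuming
    # `df_X` and `predictions` are a DataFrame and an ndarray respectively:
    #
    #   exp.log_data('X', df_X)        # stored as an artifact with event='data'
    #   exp.log_data('Y', predictions)
    #   X = exp.restore_data('X')      # restored via restore_data()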
    def log_system(self, key=None, value=None, step=None, dt=None, **extra):
        """ log system data

        Args:
            key (str): the key to use, defaults to 'system'
            value (str|float|int|bool|dict): the parameter value
            step (int): the step
            **extra: any other values to store with event

        Notes:
            * logged as ``event=system``
            * logs platform, python version and list of installed packages
        """
        key = key or 'system'
        value = value or {
            'platform': platform.uname()._asdict(),
            'python': '-'.join((platform.python_implementation(),
                                platform.python_version())),
            'packages': ['=='.join((d.metadata['Name'], d.version))
                         for d in importlib.metadata.distributions()]
        }
        data = self._common_log_data('system', key, value, step=step, dt=dt, **extra)
        self._write_log(data)
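    # Usage sketch (illustrative): recording the execution environment:
    #
    #   exp.log_system()                            # platform, python, packages
    #   exp.data(event='system', run=-1, raw=True)  # inspect the latest snapshot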
    def log_extra(self, remove=False, **kwargs):
        """ add additional log information for every subsequent logging call

        Args:
            remove (bool): if True, removes the extra log information
            kwargs: any key-value pairs to log
        """
        self._extra_log = {} if self._extra_log is None else self._extra_log
        if not remove:
            self._extra_log.update(kwargs)
        elif kwargs:
            # remove only the given keys; deque(maxlen=0) consumes the generator
            # for its side effect of popping each key
            from collections import deque as consume
            deletions = (self._extra_log.pop(k, None) for k in kwargs)
            consume(deletions, maxlen=0)
        else:
            self._extra_log = {}
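    # Usage sketch (illustrative): attaching context to all subsequent events:
    #
    #   exp.log_extra(stage='training')         # added to every event from now on
    #   exp.log_metric('loss', .1)              # includes stage='training'
    #   exp.log_extra(remove=True, stage='')    # remove the 'stage' key (value ignored)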
    def data(self, experiment=None, run=None, event=None, step=None, key=None, raw=False,
             lazy=False, since=None, end=None, batchsize=None, slice=None, **extra):
        """ build a dataframe of all stored data

        Args:
            experiment (str|list): the name of the experiment, defaults to its current value
            run (int|list|str|slice): the run(s) to get data back, defaults to current run, use 'all' for all,
                1-indexed since first run, or -1 indexed from latest run, can combine both. If run < 0
                would go before the first run, run 1 is returned. A slice(start, stop) can be used
                to specify a range of runs.
            event (str|list): the event(s) to include
            step (int|list): the step(s) to include
            key (str|list): the key(s) to include
            raw (bool): if True returns the raw data instead of a DataFrame
            lazy (bool): if True returns the Cursor instead of data, ignores raw
            since (datetime|timedelta|str): only return data since this date. If both since and run are specified,
                only matches since the given date are returned. If since is a string it must be parseable
                by pd.to_datetime, or be given in the format '<n><unit:[smhdwMqy]>' for relative times, or be a
                timedelta object. See dtrelative() for details on relative times.
            end (datetime): only return data until this date
            batchsize (int): if specified, returns a generator yielding data in batches of batchsize,
                note that raw is respected, i.e. raw=False yields a DataFrame for every batch, raw=True
                yields a list of dicts
            slice (tuple): if specified, returns a slice of the data, e.g. slice=(10, 25) returns rows 10-25,
                the slice is applied after all other filters

        Returns:
            For lazy == False:
                * data (DataFrame) if raw == False
                * data (list of dicts) if raw == True
                * None if no data exists

            For lazy == True, no batchsize, regardless of raw:
                * data (Cursor) for any value of raw

            For lazy == True, with batchsize:
                * data (generator of list[dict]) if raw == True
                * data (generator of DataFrame) if raw == False

        .. versionchanged:: 0.16.2
            run supports negative indexing

        .. versionchanged:: 0.17
            added batchsize

        .. versionchanged:: 0.17
            enabled the use of run='*' to retrieve all runs, equivalent of run='all'

        .. versionchanged:: 0.17
            enabled data(run=, start=, end=, since=), accepting range queries on run, dt and event
        """
        from functools import cache
        experiment = experiment or self._experiment
        # -- flush all buffers before querying
        self.flush()
        # -- build filter
        if since is None:
            run = run if run is not None else self._run
        run = ensurelist(run) if not isinstance(run, str) and isinstance(run, Iterable) else run
        # -- resolve the actual run
        #    run is 1-indexed, so we need to adjust for -1 indexing
        #    e.g. -1 means the latest run, -2 the run before that
        #    e.g. latest_run = 5, run=-1 means 5, run=-2 means 4 etc.
        # -- run can be a list, in which case we adjust run < 0 for each element
        # -- run can never be less than 1 (1-indexed), even if run << 0
        last_run = cache(
            lambda: int(self._latest_run or 0))  # PERF/consistency: memoize the last run per each .data() call
        relative_run = lambda r: max(1, 1 + last_run() + r)
        if isinstance(run, list) and any(r < 0 for r in run):
            run = [(r if r >= 0 else relative_run(r)) for r in run]
        elif isinstance(run, int) and run < 0:
            run = relative_run(run)
        filter = self._build_data_filter(experiment, run, event, step, key, since, end, extra)

        def read_data(cursor):
            data = pd.DataFrame.from_records(cursor)
            if 'dt' in data.columns:
                data['dt'] = pd.to_datetime(data['dt'], errors='coerce')
                data.sort_values('dt', inplace=True)
            return data

        def read_data_batched(cursor, batchsize, slice):
            from builtins import slice as t_slice
            if cursor is None:
                yield None
                return
            if slice:
                slice = (slice.start or 0, slice.stop or 0) if isinstance(slice, t_slice) else slice
                cursor.skip(slice[0])
                cursor.limit(slice[1] - slice[0])
                batchsize = batchsize or (slice[1] - slice[0])
            for rows in batched(cursor, batchsize):
                data = (r.get('data') for r in rows)
                yield read_data(data) if not raw else list(data)

        if batchsize or slice:
            data = self._store.get(self._data_name, filter=filter, lazy=True, trusted=signature(filter))
            data = read_data_batched(data, batchsize, slice)
            if slice and not batchsize:
                # try to resolve just one iteration
                data, _ = tryOr(lambda: (next(data), data.close()), (None, None))
        else:
            data = self._store.get(self._data_name, filter=filter, lazy=lazy, trusted=signature(filter))
            data = read_data(data) if data is not None and not lazy and not raw else data
        return data
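    # Usage sketch (illustrative, assuming prior runs exist):
    #
    #   df = exp.data(run='all', event='metric')   # all metrics, all runs, DataFrame
    #   rows = exp.data(run=-1, raw=True)          # latest run, list of dicts
    #   for batch in exp.data(run='all', batchsize=1000):
    #       ...                                    # one DataFrame per 1000 records
    #   df = exp.data(since='7d')                  # relative time, see dtrelative()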

    def _build_data_filter(self, experiment, run, event, step, key, since, end, extra):
        # build a filter for the data query, suitable for OmegaStore.get()
        filter = {}
        valid = lambda s: s is not None and str(s).lower() not in ('all', '*')
        # SEC: ensure all values are basic types, to prevent operator injection
        valid_types = (str, int, float, list, tuple, date, datetime)
        ensure_type = lambda v, t: v if isinstance(v, t) else str(v)
        op = lambda s: {'$in': ensurelist(s)} if isinstance(s, (list, tuple, np.ndarray)) else ensure_type(s, valid_types)
        if valid(experiment):
            filter['data.experiment'] = op(experiment)
        if valid(run):
            if isinstance(run, slice):
                filter['data.run'] = {'$gte': run.start, '$lte': run.stop}
            else:
                filter['data.run'] = op(run)
        if valid(event):
            filter['data.event'] = op(event)
        if valid(step):
            filter['data.step'] = op(step)
        if valid(key):
            filter['data.key'] = op(key)
        if valid(since):
            dtnow = getattr(self, '_since_dtnow', datetime.utcnow())
            if isinstance(since, str):
                since = tryOr(lambda: pd.to_datetime(since), lambda: dtrelative('-' + since, now=dtnow))
            elif isinstance(since, timedelta):
                since = dtnow - since
            elif isinstance(since, datetime):
                pass
            else:
                raise ValueError(
                    f'invalid since value: {since}, must be datetime, timedelta or string in format "<n><unit:[smhdwMqy]>"')
            filter['data.dt'] = {'$gte': str(since.isoformat())}
        if valid(end):
            dtnow = getattr(self, '_since_dtnow', datetime.utcnow())
            if isinstance(end, str):
                end = tryOr(lambda: pd.to_datetime(end), lambda: dtrelative('+' + end, now=dtnow))
            elif isinstance(end, timedelta):
                end = dtnow + end
            elif isinstance(end, datetime):
                pass
            else:
                raise ValueError(
                    f'invalid end value: {end}, must be datetime, timedelta or string in format "<n><unit:[smhdwMqy]>"')
            filter['data.dt'] = filter.setdefault('data.dt', {})
            filter['data.dt']['$lte'] = str(end.isoformat())
        for k, v in extra.items():
            if valid(v):
                fk = f'data.{k}'
                filter[fk] = op(v)
        return filter

    @property
    def dataset(self):
        return self._data_name

    @property
    def stats(self):
        from omegaml.backends.tracking.statistics import ExperimentStatistics
        return ExperimentStatistics(self)

    def summary(self, **kwargs):
        return self.stats.summary(**kwargs)

    def _initialize_dataset(self, force=False):
        # create indexes when the dataset is first created
        if not force and self._store.exists(self._data_name):
            return
        coll = self._store.collection(self._data_name)
        idxs = [
            {'data.run': pymongo.ASCENDING, 'data.event': pymongo.ASCENDING, 'data.key': pymongo.ASCENDING,
             'data.experiment': pymongo.ASCENDING},
            {'data.dt': pymongo.ASCENDING, 'data.event': pymongo.ASCENDING, 'data.key': pymongo.ASCENDING,
             'data.experiment': pymongo.ASCENDING},
            {'data.dt': pymongo.ASCENDING, 'data.event': pymongo.ASCENDING, 'data.experiment': pymongo.ASCENDING},
        ]
        for specs in idxs:
            ensure_index(coll, specs)
    def restore_artifact(self, *args, **kwargs):
        """ restore a specific logged artifact

        .. deprecated:: 0.17
            use exp.restore_artifacts() instead
        """
        warnings.warn('deprecated, use exp.restore_artifacts() instead', DeprecationWarning)
        restored = self.restore_artifacts(*args, **kwargs)
        return restored[-1] if restored else None
    def restore_artifacts(self, key=None, experiment=None, run=None, since=None, step=None, value=None, event=None,
                          name=None):
        """ restore logged artifacts

        Args:
            key (str): the name of the artifact as provided in log_artifact
            experiment (str): the experiment to query, defaults to the current experiment
            run (int): the run for which to query, defaults to current run
            since (datetime): only return data since this date
            step (int): the step for which to query, defaults to all steps in run
            value (dict|list): dict or list of dict, this value is used instead of
                querying data, use to retrieve an artifact from contents of ``.data()``
            event (str): the event to query, defaults to 'artifact'
            name (str): the artifact name to match, defaults to '*' (all)

        Returns:
            list of restored objects

        Notes:
            * this will restore the artifact according to its type assigned
              by ``.log_artifact()``. If the type cannot be determined, the
              actual data is returned

        .. versionchanged:: 0.17
            returns a list of objects instead of the last object
        """
        event = event or 'artifact'
        name = name or '*'
        if value is None:
            all_data = self.data(experiment=experiment, run=run, since=since, event=event,
                                 step=step, key=key, raw=True, name=name)
        else:
            all_data = [{'value': value}] if isinstance(value, dict) else value
        restored = []
        all_data = all_data or []
        for item in all_data:
            data = item.get('value')
            if data['format'] == 'type':
                obj = data['data']
            elif data['format'] == 'metadata':
                meta = self._store._Metadata
                obj = meta.from_json(data['data'])
            elif data['format'] == 'dataset':
                obj = self._store.get(data['data'])
            elif data['format'] == 'model':
                obj = self._model_store.get(data['data'])
            elif data['format'] == 'pickle':
                obj = dill.loads(b64decode((data['data']).encode('utf8')))
            else:
                obj = data.get('data', data)
            restored.append(obj)
        return restored
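    # Usage sketch (illustrative): round-tripping an artifact:
    #
    #   exp.log_artifact({'params': {'alpha': .01}}, 'config')
    #   configs = exp.restore_artifacts('config')  # list, most recent entry last
    #   config = configs[-1]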
    def restore_data(self, key, run=None, event=None, since=None, concat=True, **extra):
        """ restore x/y data for model predictions

        This is semantic sugar for restore_artifacts() using the event='data' event.

        Args:
            key (str): the name of the artifact
            run (int): the run for which to query, defaults to current run
            event (str): the event, defaults to 'data'
            since (datetime): only return data since this date
            concat (bool): if True, concatenates the data into a single object,
                in this case all data must be of the same type. Defaults to True.
            **extra: any other values to store with event

        Returns:
            list of restored objects, or the concatenated object if concat=True
        """
        event = event or 'data'

        def _concat(values):
            if values is None:
                return None
            if len(values) and isinstance(values[0], (pd.DataFrame, pd.Series)):
                ensure_df_or_series = lambda v: pd.Series(v) if isinstance(v, (np.ndarray, list)) else v
                return pd.concat((ensure_df_or_series(v) for v in values), axis=0)
            elif len(values) and isinstance(values[0], np.ndarray):
                return np.concatenate(values, axis=0)
            # chain seems to be the fastest approach
            # -- https://stackoverflow.com/a/56407963/890242
            return list(chain(*values))

        restored = self.restore_artifacts(run=run, key=key, event=event, since=since, **extra)
        return restored if not concat else _concat(restored)
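    # Usage sketch (illustrative): with concat=True (the default), DataFrames and
    # Series are pd.concat'ed, ndarrays np.concatenate'd, and plain lists chained
    # into one list:
    #
    #   X = exp.restore_data('X')                    # one concatenated object
    #   parts = exp.restore_data('X', concat=False)  # list of individual objects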

def dtrelative(delta, now=None, as_delta=False):
    """ return a datetime relative to now

    Args:
        delta (str|timedelta): the relative delta, if a string, specify as '[+-]<n><unit:[smhdwMqy]>',
            e.g. '1d' for one day, '-1d' for one day ago, '+1d' for one day from now. Special cases:
            '-0y' means the beginning of the current year, '+0y' means the end of the current year.
            smhdwMqy = seconds, minutes, hours, days, weeks, months, quarters, years
        now (datetime): the reference datetime, defaults to datetime.utcnow()
        as_delta (bool): if True, returns a timedelta object, otherwise a datetime object

    Returns:
        datetime|timedelta: the relative datetime or timedelta
    """
    # Parse the numeric part and the unit from the specifier
    UNIT_MAP = {'s': 1,  # 1 second
                'm': 60,  # 1 minute
                'h': 60 * 60,  # 1 hour
                'd': 24 * 60 * 60,  # 1 day
                'w': 7 * 24 * 60 * 60,  # 1 week
                'n': 30 * 24 * 60 * 60,  # 1 month ('M' is mapped to 'n' below)
                'q': 90 * 24 * 60 * 60,  # 1 quarter
                'y': 365 * 24 * 60 * 60}  # 1 year
    now = now or datetime.utcnow()
    error_msg = f"Invalid delta {delta}. Use a string of format '[+-]<n><unit:[smhdwMqy]>' or a timedelta object."
    if isinstance(delta, str):
        try:
            past = delta.startswith('-')
            delta = (delta
                     .replace('-', '')
                     .replace('+', '')  # Remove the sign
                     .replace(' ', '')  # Remove spaces
                     .replace('M', 'n')  # m is ambiguous, so we use n for month
                     .lower())
            num = int(delta[:-1])  # The numeric part
            units = UNIT_MAP.get(delta[-1])  # The last character is the unit
            if delta[-1] == 'y' and num == 0:
                # special case: -0y means beginning of the year, +0y means end of the year
                dtdelta = (datetime(now.year, 1, 1) - now) if past else (datetime(now.year, 12, 31) - now)
            else:
                dtdelta = timedelta(seconds=num * units * (-1 if past else 1))
        except Exception:
            raise ValueError(error_msg)
    elif isinstance(delta, timedelta):
        dtdelta = delta
    else:
        raise ValueError(error_msg)
    return now + dtdelta if not as_delta else dtdelta
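# Usage sketch (illustrative): relative time specifiers as resolved by dtrelative():
#
#   from datetime import datetime
#   now = datetime(2024, 6, 15)
#   dtrelative('-1d', now=now)  # => datetime(2024, 6, 14), one day ago
#   dtrelative('+2h', now=now)  # => datetime(2024, 6, 15, 2), two hours ahead
#   dtrelative('-0y', now=now)  # => datetime(2024, 1, 1), start of the year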