Source code for omegaml.backends.tracking.simple

import importlib
import os
import platform
import warnings
from base64 import b64encode, b64decode
from datetime import date
from itertools import chain
from typing import Iterable
from uuid import uuid4

import dill
import numpy as np
import pandas as pd
import pymongo

from omegaml.backends.tracking.base import TrackingProvider
from omegaml.documents import Metadata
from omegaml.util import _raise, ensure_index, batched, signature, tryOr, ensurelist


class NoTrackTracker(TrackingProvider):
    """ A default tracker that does not record anything """

    def start(self, run=None):
        pass

    def stop(self):
        pass

    def log_artifact(self, obj, name, step=None, **extra):
        pass

    def log_metric(self, key, value, step=None, **extra):
        pass

    def log_param(self, key, value, step=None, **extra):
        pass

    def log_event(self, event, key, value, step=None, **extra):
        pass

    def log_events(self, event, key, values, step=None, dt=None, **extra):
        pass

    def log_extra(self, **kwargs):
        pass

    def log_data(self, **kwargs):
        pass

    def data(self, experiment=None, run=None, event=None, step=None, key=None, raw=False, **query):
        pass


class OmegaSimpleTracker(TrackingProvider):
    """ A tracking provider that logs to an omegaml dataset

    Usage::

        with om.runtime.experiment(provider='default') as exp:
            ...
            exp.log_metric('accuracy', .78)

    .. versionchanged:: 0.17
        any extra

    """
    _provider = 'simple'
    _experiment = None
    _startdt = None
    _stopdt = None
    _autotrack = False

    _ensure_active = lambda self, r: r if r is not None else _raise(
        ValueError('no active run, call .start() or .use() '))

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.log_buffer = []
        self.max_buffer = 10
        self._initialize_dataset()

    def active_run(self, run=None):
        """ set the latest run as the active run

        Args:
            run (int|str): optional, the run number or unique task id. If None,
                the latest active run is used, or a new run is started if no
                active run exists.

        Returns:
            current run (int)
        """
        if run is None:
            latest = self._latest_run
            latest_is_active = (latest is not None and self.status(run=latest) == 'STARTED')
            self._run = latest if latest_is_active else self.start(run=None)
        else:
            self._run = run
        self._experiment = self._experiment or uuid4().hex
        return self._run

    def use(self, run=None):
        """ reuse the latest run instead of starting a new one

        semantic sugar for self.active_run()

        Returns:
            self
        """
        self.active_run(run=run)
        return self
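
    # Illustrative usage sketch (not part of the module): attach to the latest run
    # instead of starting a new one. ``om`` is assumed to be a configured omegaml
    # instance and 'myexp' a placeholder experiment name:
    #
    #     exp = om.runtime.experiment('myexp', provider='simple')
    #     exp.use()                        # reuse the latest run, or start one
    #     exp.log_metric('accuracy', 0.92)
    #     exp.active_run()                 # returns the current run number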

    @property
    def autotrack(self):
        return self._autotrack

    @autotrack.setter
    def autotrack(self, value):
        self._autotrack = value

    @property
    def _latest_run(self):
        cursor = self.data(event='start', run='*', lazy=True)
        data = list(cursor.sort('data.run', -1).limit(1)) if cursor else None
        run = data[-1].get('data', {}).get('run') if data is not None and len(data) > 0 else None
        return run

    def status(self, run=None):
        """ status of a run

        Args:
            run (int): the run number, defaults to the currently active run

        Returns:
            status, one of 'PENDING', 'STARTED', 'STOPPED'
        """
        self._run = run or self._run or self._latest_run
        data = self.data(event=['start', 'stop'], run=self._run, raw=True)
        no_runs = data is None or len(data) == 0
        has_stop = not no_runs and sum(1 for row in data if row.get('event') == 'stop')
        return 'PENDING' if no_runs else 'STOPPED' if has_stop else 'STARTED'

    def start(self, run=None, immediate=True):
        """ start a new run

        This starts a new run and logs the start event
        """
        self._run = run or (self._latest_run or 0) + 1
        self._startdt = datetime.utcnow()
        data = self._common_log_data('start', key=None, value=None, step=None, dt=self._startdt)
        self._write_log(data, immediate=immediate)
        return self._run

    def stop(self, flush=True):
        """ stop the current run

        This stops the current run and records the stop event
        """
        self._stopdt = datetime.utcnow()
        data = self._common_log_data('stop', key=None, value=None, step=None, dt=self._stopdt)
        self._write_log(data)
        self.flush() if flush else None
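
    # Illustrative run lifecycle sketch (not part of the module): start() logs the
    # 'start' event immediately, stop() logs the 'stop' event and flushes the log
    # buffer. ``om`` and 'myexp' are placeholders for a configured omegaml setup:
    #
    #     exp = om.runtime.experiment('myexp', provider='simple')
    #     run = exp.start()              # returns the new run number, 1-indexed
    #     exp.log_metric('loss', 0.1)
    #     exp.stop()                     # records the stop event and flushes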

    def flush(self):
        # passing list of list, as_many=True => collection.insert_many() for speed
        if self.log_buffer:
            self._store.put(self.log_buffer, self._data_name,
                            noversion=True, as_many=True)
            self.log_buffer.clear()

    def clear(self, force=False):
        """ clear all data

        All data is removed from the experiment's dataset. This is not recoverable.

        Args:
            force (bool): if True, clears all data, otherwise raises an error

        Caution:
            * this will clear all data and is not recoverable

        Raises:
            AssertionError: if force is not True

        .. versionadded:: 0.16.2

        """
        assert force, "clear() requires force=True to prevent accidental data loss. This will clear all experiment data and is not recoverable."
        self._store.drop(self._data_name, force=True)
        self._initialize_dataset(force=True)
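
    # Illustrative sketch (not part of the module): clear() refuses to run without
    # force=True, since dropping the experiment's dataset is not recoverable:
    #
    #     exp.clear()            # raises AssertionError
    #     exp.clear(force=True)  # drops and re-initializes the dataset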

    def _common_log_data(self, event, key, value, step=None, dt=None, run=None, **extra):
        if isinstance(value, dict):
            # shortcut to resolve PassthroughDataset actual values
            # -- enables storing the actual values of a dataset passed as a PassthroughDataset
            # TODO: should this be the responsibility of SimpleTracker?
            if isinstance(value.get('args'), (list, tuple)):
                value['args'] = [getattr(arg, '_passthrough_data', arg) for arg in value['args']]
            if isinstance(value.get('kwargs'), dict):
                value['kwargs'] = {
                    k: getattr(v, '_passthrough_data', v) for k, v in value['kwargs'].items()
                }
        data = {
            'experiment': self._experiment,
            'run': run or self._ensure_active(self._run),
            'step': step,
            'event': event,
            'key': key or event,
            'value': value,
            'dt': dt or datetime.utcnow(),
            'node': os.environ.get('HOSTNAME', platform.node()),
            'userid': self.userid,
        }
        # add **extra, check for duplicate keys to avoid overwriting
        dupl_keys = set(data.keys()) & set(extra.keys())
        if dupl_keys:
            raise ValueError(f'duplicate extra keys : {dupl_keys}')
        data.update(extra)
        data.update(self._extra_log) if self._extra_log else None
        return data

    def _write_log(self, data, immediate=False):
        self.log_buffer.append(data)
        if immediate or len(self.log_buffer) > self.max_buffer:
            self.flush()

    def log_artifact(self, obj, name, step=None, dt=None, event=None, key=None, **extra):
        """ log any object to the current run

        Usage::

            # log an artifact
            exp.log_artifact(mydict, 'somedata')

            # retrieve back
            mydict_ = exp.restore_artifact('somedata')

        Args:
            obj (obj): any object to log
            name (str): the name of the artifact
            step (int): the step, if any
            **extra: any extra data to log

        Notes:
            * bool, str, int, float, list, dict are stored as ``format=type``
            * Metadata is stored as ``format=metadata``
            * objects supported by ``om.models`` are stored as ``format=model``
            * objects supported by ``om.datasets`` are stored as ``format=dataset``
            * all other objects are pickled and stored as ``format=pickle``
        """
        event = event or 'artifact'
        key = key or name
        if isinstance(obj, (bool, str, int, float, list, dict)):
            format = 'type'
            rawdata = obj
        elif isinstance(obj, Metadata):
            format = 'metadata'
            rawdata = obj.to_json()
        elif self._model_store.get_backend_byobj(obj) is not None:
            objname = uuid4().hex
            meta = self._model_store.put(obj, f'.experiments/.artefacts/{objname}')
            format = 'model'
            rawdata = meta.name
        elif self._store.get_backend_byobj(obj) is not None:
            objname = uuid4().hex
            meta = self._store.put(obj, f'.experiments/.artefacts/{objname}')
            format = 'dataset'
            rawdata = meta.name
        else:
            try:
                rawdata = b64encode(dill.dumps(obj)).decode('utf8')
                format = 'pickle'
            except TypeError as e:
                rawdata = repr(obj)
                format = 'repr'
        value = {
            'name': name,
            'data': rawdata,
            'format': format
        }
        data = self._common_log_data(event, key, value, step=step, dt=dt, name=name, **extra)
        self._write_log(data)

    def log_event(self, event, key, value, step=None, dt=None, **extra):
        data = self._common_log_data(event, key, value, step=step, dt=dt, **extra)
        self._write_log(data)

    def log_events(self, event, key, values, step=None, dt=None, **extra):
        """ log a series of events

        This is a convenience method to log multiple values for the same event.
        All values will be logged with the same common log data, i.e. the same
        datetime, step, and any extra values.

        Args:
            event (str): the event name
            key (str): the key for the event
            values (list): a list of values to log
            step (int): the step, if any
            dt (datetime): the datetime, defaults to now
            **extra: any other values to store with event

        .. versionadded:: 0.18

        """
        data = self._common_log_data(event, key, None, step=step, dt=dt, **extra)
        for value in values:
            data['value'] = value
            self._write_log(dict(data))
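
    # Illustrative sketch (not part of the module): log_events() writes one record
    # per value, all sharing the same datetime, step and extra fields:
    #
    #     exp.log_events('epoch', 'loss', [0.9, 0.5, 0.3], step=1)
    #     # comparable to three log_event('epoch', 'loss', v, step=1) calls,
    #     # except the datetime is identical across the three records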

    def log_param(self, key, value, step=None, dt=None, **extra):
        """ log an experiment parameter

        Args:
            key (str): the parameter name
            value (str|float|int|bool|dict): the parameter value
            step (int): the step
            **extra: any other values to store with event

        Notes:
            * logged as ``event=param``
        """
        data = self._common_log_data('param', key, value, step=step, dt=dt, **extra)
        self._write_log(data)

    def log_metric(self, key, value, step=None, dt=None, **extra):
        """ log a metric value

        Args:
            key (str): the metric name
            value (str|float|int|bool|dict): the metric value
            step (int): the step
            **extra: any other values to store with event

        Notes:
            * logged as ``event=metric``
        """
        data = self._common_log_data('metric', key, value, step=step, dt=dt, **extra)
        self._write_log(data)
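
    # Illustrative sketch (not part of the module): params and metrics are stored
    # as regular log records with event='param' and event='metric' respectively:
    #
    #     exp.log_param('max_depth', 5)
    #     exp.log_metric('accuracy', 0.92, step=10)
    #     exp.data(event=['param', 'metric'])   # both returned as a DataFrame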

    def log_data(self, key, value, step=None, dt=None, event=None, **extra):
        """ log x/y data for model predictions

        This is semantic sugar for log_artifact() using the 'data' event.

        Args:
            key (str): the name of the artifact
            value (any): the x/y data
            step (int): the step
            dt (datetime): the datetime
            event (str): the event, defaults to 'data'
            **extra: any other values to store with event

        Returns:
            None
        """
        event = event or 'data'
        self.log_artifact(value, key, step=step, dt=dt, key=key, event=event, **extra)
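
    # Illustrative sketch (not part of the module): log_data() stores x/y data as
    # an artifact under event='data', to be read back with restore_data():
    #
    #     exp.log_data('X_test', X_test)     # X_test is a placeholder, e.g. a DataFrame
    #     exp.restore_data('X_test')         # restores the logged data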

    def log_system(self, key=None, value=None, step=None, dt=None, **extra):
        """ log system data

        Args:
            key (str): the key to use, defaults to 'system'
            value (str|float|int|bool|dict): the parameter value
            step (int): the step
            **extra: any other values to store with event

        Notes:
            * logged as ``event=system``
            * logs platform, python version and list of installed packages
        """
        key = key or 'system'
        value = value or {
            'platform': platform.uname()._asdict(),
            'python': '-'.join((platform.python_implementation(),
                                platform.python_version())),
            'packages': ['=='.join((d.metadata['Name'], d.version))
                         for d in importlib.metadata.distributions()]
        }
        data = self._common_log_data('system', key, value, step=step, dt=dt, **extra)
        self._write_log(data)
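
    # Illustrative sketch (not part of the module): log_system() records platform,
    # python version and installed packages under event='system':
    #
    #     exp.log_system()
    #     exp.data(event='system')           # system records as a DataFrame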

    def log_extra(self, remove=False, **kwargs):
        """ add additional log information for every subsequent logging call

        Args:
            remove (bool): if True, removes the extra log information
            kwargs: any key-value pairs to log

        """
        self._extra_log = {} if self._extra_log is None else self._extra_log
        if not remove:
            self._extra_log.update(kwargs)
        elif kwargs:
            from collections import deque as consume
            deletions = (self._extra_log.pop(k, None) for k in kwargs)
            consume(deletions, maxlen=0)
        else:
            self._extra_log = {}
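
    # Illustrative sketch (not part of the module): log_extra() attaches additional
    # fields to every subsequent log record until removed:
    #
    #     exp.log_extra(dataset='train-v2')   # added to every record from now on
    #     exp.log_metric('accuracy', 0.92)    # record includes dataset='train-v2'
    #     exp.log_extra(remove=True)          # clear all extra fields again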

    def data(self, experiment=None, run=None, event=None, step=None, key=None, raw=False,
             lazy=False, since=None, end=None, batchsize=None, slice=None, **extra):
        """ build a dataframe of all stored data

        Args:
            experiment (str|list): the name of the experiment, defaults to its current value
            run (int|list|str|slice): the run(s) to get data for, defaults to the current run, use 'all' for all runs.
                Runs are 1-indexed from the first run, or -1-indexed from the latest run; both can be combined.
                If run < 0 would go before the first run, run 1 is returned. A slice(start, stop) can be used
                to specify a range of runs.
            event (str|list): the event(s) to include
            step (int|list): the step(s) to include
            key (str|list): the key(s) to include
            raw (bool): if True returns the raw data instead of a DataFrame
            lazy (bool): if True returns the Cursor instead of data, ignores raw
            since (datetime|timedelta|str): only return data since this date. If both since and run are specified,
                only matches since the given date are returned. If since is a string it must be parseable
                by pd.to_datetime, or be given in the format '<n><unit:[smhdwMqy]>' for relative times, or be a
                timedelta object. See dtrelative() for details on relative times.
            end (datetime): only return data until this date
            batchsize (int): if specified, returns a generator yielding data in batches of batchsize,
                note that raw is respected, i.e. raw=False yields a DataFrame for every batch, raw=True
                yields a list of dicts
            slice (tuple): if specified, returns a slice of the data, e.g. slice=(10, 25) returns rows 10-25,
                the slice is applied after all other filters

        Returns:
            For lazy == False:
                * data (DataFrame) if raw == False
                * data (list of dicts) if raw == True
                * None if no data exists

            For lazy == True, no batchsize, regardless of raw:
                * data (Cursor) for any value of raw

            For lazy == True, with batchsize:
                * data (generator of list[dict]) if raw == True
                * data (generator of DataFrame) if raw == False

        .. versionchanged:: 0.16.2
            run supports negative indexing

        .. versionchanged:: 0.17
            added batchsize

        .. versionchanged:: 0.17
            enabled the use of run='*' to retrieve all runs, equivalent of run='all'

        .. versionchanged:: 0.17
            enabled data(run=, start=, end=, since=), accepting range queries on run, dt and event
        """
        from functools import cache
        experiment = experiment or self._experiment
        # -- flush all buffers before querying
        self.flush()
        # -- build filter
        if since is None:
            run = run if run is not None else self._run
        run = ensurelist(run) if not isinstance(run, str) and isinstance(run, Iterable) else run
        # actual run
        # -- run is 1-indexed, so we need to adjust for -1 indexing
        #    e.g. -1 means the latest run, -2 the run before that
        #    e.g. latest_run = 5, run=-1 means 5, run=-2 means 4 etc.
        # -- run can be a list, in which case we adjust run < 0 for each element
        # -- run can never be less than 1 (1-indexed), even if run << 0
        last_run = cache(
            lambda: int(self._latest_run or 0))  # PERF/consistency: memoize the last run per each .data() call
        relative_run = lambda r: max(1, 1 + last_run() + r)
        if isinstance(run, list) and any(r < 0 for r in run):
            run = [(r if r >= 0 else relative_run(r)) for r in run]
        elif isinstance(run, int) and run < 0:
            run = relative_run(run)
        filter = self._build_data_filter(experiment, run, event, step, key, since, end, extra)

        def read_data(cursor):
            data = pd.DataFrame.from_records(cursor)
            if 'dt' in data.columns:
                data['dt'] = pd.to_datetime(data['dt'], errors='coerce')
                data.sort_values('dt', inplace=True)
            return data

        def read_data_batched(cursor, batchsize, slice):
            from builtins import slice as t_slice
            if cursor is None:
                yield None
                return
            if slice:
                slice = (slice.start or 0, slice.stop or 0) if isinstance(slice, t_slice) else slice
                cursor.skip(slice[0])
                cursor.limit(slice[1] - slice[0])
                batchsize = batchsize or (slice[1] - slice[0])
            for rows in batched(cursor, batchsize):
                data = (r.get('data') for r in rows)
                yield read_data(data) if not raw else list(data)

        if batchsize or slice:
            data = self._store.get(self._data_name, filter=filter, lazy=True, trusted=signature(filter))
            data = read_data_batched(data, batchsize, slice)
            if slice and not batchsize:
                # try to resolve just one iteration
                data, _ = tryOr(lambda: (next(data), data.close()), (None, None))
        else:
            data = self._store.get(self._data_name, filter=filter, lazy=lazy, trusted=signature(filter))
            data = read_data(data) if data is not None and not lazy and not raw else data
        return data
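
    # Illustrative query sketch (not part of the module): .data() combines run,
    # event, key, since/end and batchsize filters; run supports negative indexing:
    #
    #     exp.data()                             # current run, as a DataFrame
    #     exp.data(run='*')                      # all runs
    #     exp.data(run=-1, event='metric')       # metrics of the latest run
    #     exp.data(since='1d', raw=True)         # last 24 hours, as a list of dicts
    #     for batch in exp.data(run='*', batchsize=1000):
    #         ...                                # one DataFrame per batch of 1000 rows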

    def _build_data_filter(self, experiment, run, event, step, key, since, end, extra):
        # build a filter for the data query, suitable for OmegaStore.get()
        filter = {}
        valid = lambda s: s is not None and str(s).lower() not in ('all', '*')
        # SEC: ensure all values are basic types, to prevent operator injection
        valid_types = (str, int, float, list, tuple, date, datetime)
        op = lambda s: {'$in': ensurelist(s)} if isinstance(s, (list, tuple, np.ndarray)) else ensure_type(s, valid_types)
        ensure_type = lambda v, t: v if isinstance(v, t) else str(v)
        if valid(experiment):
            filter['data.experiment'] = op(experiment)
        if valid(run):
            if isinstance(run, slice):
                filter['data.run'] = {'$gte': run.start, '$lte': run.stop}
            else:
                filter['data.run'] = op(run)
        if valid(event):
            filter['data.event'] = op(event)
        if valid(step):
            filter['data.step'] = op(step)
        if valid(key):
            filter['data.key'] = op(key)
        if valid(since):
            dtnow = getattr(self, '_since_dtnow', datetime.utcnow())
            if isinstance(since, str):
                since = tryOr(lambda: pd.to_datetime(since), lambda: dtrelative('-' + since, now=dtnow))
            elif isinstance(since, timedelta):
                since = dtnow - since
            elif isinstance(since, datetime):
                since = since
            else:
                raise ValueError(
                    f'invalid since value: {since}, must be datetime, timedelta or string in format "<n><unit:[smhdwMqy]>"')
            filter['data.dt'] = {'$gte': str(since.isoformat())}
        if valid(end):
            dtnow = getattr(self, '_since_dtnow', datetime.utcnow())
            if isinstance(end, str):
                end = tryOr(lambda: pd.to_datetime(end), lambda: dtrelative('+' + end, now=dtnow))
            elif isinstance(end, timedelta):
                end = dtnow + end
            elif isinstance(end, datetime):
                end = end
            else:
                raise ValueError(
                    f'invalid end value: {end}, must be datetime, timedelta or string in format "<n><unit:[smhdwMqy]>"')
            filter['data.dt'] = filter.setdefault('data.dt', {})
            filter['data.dt']['$lte'] = str(end.isoformat())
        for k, v in extra.items():
            if valid(v):
                fk = f'data.{k}'
                filter[fk] = op(v)
        return filter

    @property
    def dataset(self):
        return self._data_name

    @property
    def stats(self):
        from omegaml.backends.tracking.statistics import ExperimentStatistics
        return ExperimentStatistics(self)

    def summary(self, **kwargs):
        return self.stats.summary(**kwargs)

    def _initialize_dataset(self, force=False):
        # create indexes when the dataset is first created
        if not force and self._store.exists(self._data_name):
            return
        coll = self._store.collection(self._data_name)
        idxs = [
            {'data.run': pymongo.ASCENDING, 'data.event': pymongo.ASCENDING, 'data.key': pymongo.ASCENDING,
             'data.experiment': pymongo.ASCENDING},
            {'data.dt': pymongo.ASCENDING, 'data.event': pymongo.ASCENDING, 'data.key': pymongo.ASCENDING,
             'data.experiment': pymongo.ASCENDING},
            {'data.dt': pymongo.ASCENDING, 'data.event': pymongo.ASCENDING, 'data.experiment': pymongo.ASCENDING},
            # fast retrieval by event, key, userid across all runs
            {'data.event': pymongo.ASCENDING, 'data.key': pymongo.ASCENDING, 'data.userid': pymongo.ASCENDING,
             'data.experiment': pymongo.ASCENDING},
        ]
        for specs in idxs:
            ensure_index(coll, specs)
        # self._store.put(coll, self._data_name)

    def restore_artifact(self, *args, **kwargs):
        """ restore a specific logged artifact

        .. versionchanged:: 0.17
            deprecated, use exp.restore_artifacts() instead
        """
        warnings.warn('deprecated, use exp.restore_artifacts() instead', DeprecationWarning)
        restored = self.restore_artifacts(*args, **kwargs)
        return restored[-1] if restored else None

    def restore_artifacts(self, key=None, experiment=None, run=None, since=None, step=None, value=None, event=None,
                          name=None):
        """ restore logged artifacts

        Args:
            key (str): the name of the artifact as provided in log_artifact
            run (int): the run for which to query, defaults to current run
            since (datetime): only return data since this date
            step (int): the step for which to query, defaults to all steps in run
            value (dict|list): dict or list of dict, this value is used instead of
                querying data, use to retrieve an artifact from contents of ``.data()``

        Returns:
            list of restored objects

        Notes:
            * this will restore the artifact according to its type assigned
              by ``.log_artifact()``. If the type cannot be determined, the
              actual data is returned

        Updates:
            * since 0.17: return list of objects instead of last object
        """
        event = event or 'artifact'
        name = name or '*'
        if value is None:
            all_data = self.data(experiment=experiment, run=run, since=since, event=event,
                                 step=step, key=key, raw=True, name=name)
        else:
            all_data = [{'value': value}] if isinstance(value, dict) else value
        restored = []
        all_data = all_data or []
        for item in all_data:
            data = item.get('value')
            if data['format'] == 'type':
                obj = data['data']
            elif data['format'] == 'metadata':
                meta = self._store._Metadata
                obj = meta.from_json(data['data'])
            elif data['format'] == 'dataset':
                obj = self._store.get(data['data'])
            elif data['format'] == 'model':
                obj = self._model_store.get(data['data'])
            elif data['format'] == 'pickle':
                obj = dill.loads(b64decode((data['data']).encode('utf8')))
            else:
                obj = data.get('data', data)
            restored.append(obj)
        return restored
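
    # Illustrative sketch (not part of the module): artifacts are restored according
    # to the format recorded by log_artifact():
    #
    #     exp.log_artifact({'epochs': 10}, 'config')
    #     exp.restore_artifacts('config')            # => [{'epochs': 10}]
    #     exp.restore_artifacts('config', run='*')   # across all runs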

    def restore_data(self, key, run=None, event=None, since=None, concat=True, **extra):
        """ restore x/y data for model predictions

        This is semantic sugar for restore_artifacts() using the event='data' event.

        Args:
            key (str): the name of the artifact
            run (int): the run for which to query, defaults to current run
            event (str): the event, defaults to 'data'
            since (datetime): only return data since this date
            concat (bool): if True, concatenates the data into a single object,
                in this case all data must be of the same type. Defaults to True.
            **extra: any other values to store with event

        Returns:
            list of restored objects
        """
        event = event or 'data'

        def _concat(values):
            if values is None:
                return None
            if len(values) and isinstance(values[0], (pd.DataFrame, pd.Series)):
                ensure_df_or_series = lambda v: pd.Series(v) if isinstance(v, (np.ndarray, list)) else v
                return pd.concat((ensure_df_or_series(v) for v in values), axis=0)
            elif len(values) and isinstance(values[0], np.ndarray):
                return np.concatenate(values, axis=0)
            # chain seems to be the fastest approach
            # -- https://stackoverflow.com/a/56407963/890242
            return list(chain(*values))

        restored = self.restore_artifacts(run=run, key=key, event=event, since=since, **extra)
        return restored if not concat else _concat(restored)
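
    # Illustrative sketch (not part of the module): restore_data() reads back data
    # logged via log_data(), concatenating DataFrames, Series or ndarrays:
    #
    #     exp.log_data('X_test', X_batch_1)          # X_batch_* are placeholders
    #     exp.log_data('X_test', X_batch_2)
    #     exp.restore_data('X_test')                 # one concatenated object
    #     exp.restore_data('X_test', concat=False)   # list of restored objects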

from datetime import datetime, timedelta


def dtrelative(delta, now=None, as_delta=False):
    """ return a datetime relative to now

    Args:
        delta (str|timedelta): the relative delta, if a string, specify as '[+-]<n><unit:[smhdwMqy]>',
            e.g. '1d' for one day, '-1d' for one day ago, '+1d' for one day from now. Special cases:
            '-0y' means the beginning of the current year, '+0y' means the end of the current year.
            smhdwMqy = seconds, minutes, hours, days, weeks, months, quarters, years
        now (datetime): the reference datetime, defaults to datetime.utcnow()
        as_delta (bool): if True, returns a timedelta object, otherwise a datetime object

    Returns:
        datetime|timedelta: the relative datetime or timedelta
    """
    # Parse the numeric part and the unit from the specifier
    UNIT_MAP = {'s': 1,  # 1 second
                'm': 60,  # 1 minute
                'h': 60 * 60,  # 1 hour
                'd': 24 * 60 * 60,  # 1 day
                'w': 7 * 24 * 60 * 60,  # 1 week
                'n': 30 * 24 * 60 * 60,  # 1 month
                'q': 90 * 24 * 60 * 60,  # 1 quarter
                'y': 365 * 24 * 60 * 60}  # 1 year
    now = now or datetime.utcnow()
    error_msg = f"Invalid delta {delta}. Use a string of format '<n><unit:[smhdwMqy]>' or a timedelta object."
    if isinstance(delta, str):
        try:
            past = delta.startswith('-')
            delta = (delta
                     .replace('-', '')
                     .replace('+', '')  # remove the sign
                     .replace(' ', '')  # remove spaces
                     .replace('M', 'n')  # m is ambiguous, so we use n for month
                     .lower())
            num = int(delta[:-1])  # the numeric part
            units = UNIT_MAP.get(delta[-1])  # the last character is the unit
            if delta[-1] == 'y' and num == 0:
                # special case: -0y means beginning of the year, +0y means end of the year
                dtdelta = (datetime(now.year, 1, 1) - now) if past else (datetime(now.year, 12, 31) - now)
            else:
                dtdelta = timedelta(seconds=num * units * (-1 if past else 1))
        except Exception:
            raise ValueError(error_msg)
    elif isinstance(delta, timedelta):
        dtdelta = delta
    else:
        raise ValueError(error_msg)
    return now + dtdelta if not as_delta else dtdelta
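

# Illustrative sketch (not part of the module): dtrelative() resolves the relative
# time specifiers accepted by .data(since=...):
#
#     dtrelative('-1d')                  # datetime 24 hours before utcnow()
#     dtrelative('2h', as_delta=True)    # timedelta of 2 hours
#     dtrelative('-0y')                  # beginning of the current year
#     dtrelative('+3M')                  # ~3 months (90 days) from now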