import dill
import importlib.metadata
import numpy as np
import os
import pandas as pd
import platform
import pymongo
import warnings
from base64 import b64encode, b64decode
from collections.abc import Iterable
from datetime import date, datetime, timedelta
from itertools import chain
from uuid import uuid4

from omegaml.backends.tracking.base import TrackingProvider
from omegaml.documents import Metadata
from omegaml.util import _raise, ensure_index, batched, signature, tryOr, ensurelist


class NoTrackTracker(TrackingProvider):
    """ A default tracker that does not record anything """

    def start(self, run=None):
        pass

    def stop(self):
        pass

    def log_artifact(self, obj, name, step=None, **extra):
        pass

    def log_metric(self, key, value, step=None, **extra):
        pass

    def log_param(self, key, value, step=None, **extra):
        pass

    def log_event(self, event, key, value, step=None, **extra):
        pass

    def log_extra(self, **kwargs):
        pass

    def log_data(self, **kwargs):
        pass

    def data(self, experiment=None, run=None, event=None, step=None, key=None, raw=False, **query):
        pass


class OmegaSimpleTracker(TrackingProvider):
    """ A tracking provider that logs to an omegaml dataset

    Usage::

        with om.runtime.experiment(provider='default') as exp:
            ...
            exp.log_metric('accuracy', .78)

    .. versionchanged:: 0.17
        any extra keyword arguments passed to the log_* methods are
        stored with the logged event
    """
    _provider = 'simple'
    _experiment = None
    _startdt = None
    _stopdt = None
    _autotrack = False

    _ensure_active = lambda self, r: r if r is not None else _raise(
        ValueError('no active run, call .start() or .use()'))

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.log_buffer = []
        self.max_buffer = 10
        self._initialize_dataset()

    def active_run(self, run=None):
        """ set the latest run as the active run

        Args:
            run (int|str): optional run number or unique task id; if None,
                the latest active run is used, or a new run is created if
                no active run exists.

        Returns:
            current run (int)
        """
        if run is None:
            latest = self._latest_run
            latest_is_active = (latest is not None and self.status(run=latest) == 'STARTED')
            self._run = latest if latest_is_active else self.start(run=None)
        else:
            self._run = run
        self._experiment = self._experiment or uuid4().hex
        return self._run

    def use(self, run=None):
        """ reuse the latest run instead of starting a new one

        syntactic sugar for self.active_run()

        Returns:
            self
        """
        self.active_run(run=run)
        return self

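    # Usage sketch (illustrative, not part of the original module; assumes
    # `exp` is an OmegaSimpleTracker instance):
    #
    #   exp.start()             # start a new run and make it active
    #   exp.use()               # or reuse the latest run, starting one if needed
    #   run = exp.active_run()  # the currently active run number (int)
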
    @property
    def autotrack(self):
        return self._autotrack

    @autotrack.setter
    def autotrack(self, value):
        self._autotrack = value

    @property
    def _latest_run(self):
        cursor = self.data(event='start', run='*', lazy=True)
        data = list(cursor.sort('data.run', -1).limit(1)) if cursor else None
        run = data[-1].get('data', {}).get('run') if data is not None and len(data) > 0 else None
        return run

    def status(self, run=None):
        """ status of a run

        Args:
            run (int): the run number, defaults to the currently active run

        Returns:
            status, one of 'PENDING', 'STARTED', 'STOPPED'
        """
        self._run = run or self._run or self._latest_run
        data = self.data(event=['start', 'stop'], run=self._run, raw=True)
        no_runs = data is None or len(data) == 0
        has_stop = sum(1 for row in (data or []) if row.get('event') == 'stop')
        return 'PENDING' if no_runs else 'STOPPED' if has_stop else 'STARTED'

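    # Status lifecycle sketch (illustrative):
    #
    #   exp.status()  # 'PENDING' before any run was started
    #   exp.start()   # logs a 'start' event => status() == 'STARTED'
    #   exp.stop()    # logs a 'stop' event  => status() == 'STOPPED'
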
    def start(self, run=None, immediate=True):
        """ start a new run

        This starts a new run and logs the start event
        """
        self._run = run or (self._latest_run or 0) + 1
        self._startdt = datetime.utcnow()
        data = self._common_log_data('start', key=None, value=None, step=None, dt=self._startdt)
        self._write_log(data, immediate=immediate)
        return self._run

    def stop(self, flush=True):
        """ stop the current run

        This stops the current run and records the stop event
        """
        self._stopdt = datetime.utcnow()
        data = self._common_log_data('stop', key=None, value=None, step=None, dt=self._stopdt)
        self._write_log(data)
        if flush:
            self.flush()

    def flush(self):
        # pass a list of dicts, as_many=True => collection.insert_many() for speed
        if self.log_buffer:
            self._store.put(self.log_buffer, self._data_name,
                            noversion=True, as_many=True)
            self.log_buffer.clear()

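    # Buffering sketch (illustrative): log_* calls append to log_buffer and are
    # written in bulk once more than max_buffer entries accumulate; flush()
    # forces the write:
    #
    #   exp.log_metric('loss', .1)  # buffered in memory
    #   exp.flush()                 # persisted via a single insert_many()
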
    def clear(self, force=False):
        """ clear all data

        All data is removed from the experiment's dataset. This is not recoverable.

        Args:
            force (bool): if True, clears all data, otherwise raises an error

        Caution:
            * this will clear all data and is not recoverable

        Raises:
            AssertionError: if force is not True

        .. versionadded:: 0.16.2

        """
        assert force, "clear() requires force=True to prevent accidental data loss. This will clear all experiment data and is not recoverable."
        self._store.drop(self._data_name, force=True)
        self._initialize_dataset(force=True)

    def _common_log_data(self, event, key, value, step=None, dt=None, **extra):
        if isinstance(value, dict):
            # shortcut to resolve PassthroughDataset actual values
            # -- enables storing the actual values of a dataset passed as a PassthroughDataset
            # TODO: should this be the responsibility of SimpleTracker?
            if isinstance(value.get('args'), (list, tuple)):
                value['args'] = [getattr(arg, '_passthrough_data', arg) for arg in value['args']]
            if isinstance(value.get('kwargs'), dict):
                value['kwargs'] = {
                    k: getattr(v, '_passthrough_data', v) for k, v in value['kwargs'].items()
                }
        data = {
            'experiment': self._experiment,
            'run': self._ensure_active(self._run),
            'step': step,
            'event': event,
            'key': key or event,
            'value': value,
            'dt': dt or datetime.utcnow(),
            'node': os.environ.get('HOSTNAME', platform.node()),
            'userid': self.userid,
        }
        # add **extra, check for duplicate keys to avoid overwriting
        dupl_keys = set(data.keys()) & set(extra.keys())
        if dupl_keys:
            raise ValueError(f'duplicate extra keys : {dupl_keys}')
        data.update(extra)
        if self._extra_log:
            data.update(self._extra_log)
        return data

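    # A logged record (illustrative) has the shape:
    #
    #   {'experiment': 'myexp', 'run': 1, 'step': None, 'event': 'metric',
    #    'key': 'accuracy', 'value': .97, 'dt': <utc datetime>,
    #    'node': <hostname>, 'userid': <userid>}
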
    def _write_log(self, data, immediate=False):
        self.log_buffer.append(data)
        if immediate or len(self.log_buffer) > self.max_buffer:
            self.flush()

    def log_artifact(self, obj, name, step=None, dt=None, event=None, key=None, **extra):
        """ log any object to the current run

        Usage::

            # log an artifact
            exp.log_artifact(mydict, 'somedata')

            # retrieve back
            mydict_ = exp.restore_artifact('somedata')

        Args:
            obj (obj): any object to log
            name (str): the name of the artifact
            step (int): the step, if any
            **extra: any extra data to log

        Notes:
            * bool, str, int, float, list, dict are stored as ``format=type``
            * Metadata is stored as ``format=metadata``
            * objects supported by ``om.models`` are stored as ``format=model``
            * objects supported by ``om.datasets`` are stored as ``format=dataset``
            * all other objects are pickled and stored as ``format=pickle``
        """
        event = event or 'artifact'
        key = key or name
        if isinstance(obj, (bool, str, int, float, list, dict)):
            format = 'type'
            rawdata = obj
        elif isinstance(obj, Metadata):
            format = 'metadata'
            rawdata = obj.to_json()
        elif self._model_store.get_backend_byobj(obj) is not None:
            objname = uuid4().hex
            meta = self._model_store.put(obj, f'.experiments/.artefacts/{objname}')
            format = 'model'
            rawdata = meta.name
        elif self._store.get_backend_byobj(obj) is not None:
            objname = uuid4().hex
            meta = self._store.put(obj, f'.experiments/.artefacts/{objname}')
            format = 'dataset'
            rawdata = meta.name
        else:
            try:
                rawdata = b64encode(dill.dumps(obj)).decode('utf8')
                format = 'pickle'
            except TypeError:
                rawdata = repr(obj)
                format = 'repr'
        value = {
            'name': name,
            'data': rawdata,
            'format': format
        }
        data = self._common_log_data(event, key, value, step=step, dt=dt, name=name, **extra)
        self._write_log(data)

    def log_event(self, event, key, value, step=None, dt=None, **extra):
        data = self._common_log_data(event, key, value, step=step, dt=dt, **extra)
        self._write_log(data)

    def log_param(self, key, value, step=None, dt=None, **extra):
        """ log an experiment parameter

        Args:
            key (str): the parameter name
            value (str|float|int|bool|dict): the parameter value
            step (int): the step
            **extra: any other values to store with event

        Notes:
            * logged as ``event=param``
        """
        data = self._common_log_data('param', key, value, step=step, dt=dt, **extra)
        self._write_log(data)

    def log_metric(self, key, value, step=None, dt=None, **extra):
        """ log a metric value

        Args:
            key (str): the metric name
            value (str|float|int|bool|dict): the metric value
            step (int): the step
            **extra: any other values to store with event

        Notes:
            * logged as ``event=metric``
        """
        data = self._common_log_data('metric', key, value, step=step, dt=dt, **extra)
        self._write_log(data)

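    # Usage sketch (illustrative):
    #
    #   exp.log_metric('accuracy', .97)           # a single value
    #   exp.log_metric('loss', .05, step=10)      # with a step
    #   exp.data(event='metric', key='accuracy')  # query it back
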
    def log_data(self, key, value, step=None, dt=None, event=None, **extra):
        """ log x/y data for model predictions

        This is syntactic sugar for log_artifact() using the 'data' event.

        Args:
            key (str): the name of the artifact
            value (any): the x/y data
            step (int): the step
            dt (datetime): the datetime
            event (str): the event, defaults to 'data'
            **extra: any other values to store with event

        Returns:
            None
        """
        event = event or 'data'
        self.log_artifact(value, key, step=step, dt=dt, event=event, **extra)

    def log_system(self, key=None, value=None, step=None, dt=None, **extra):
        """ log system data

        Args:
            key (str): the key to use, defaults to 'system'
            value (str|float|int|bool|dict): the parameter value
            step (int): the step
            **extra: any other values to store with event

        Notes:
            * logged as ``event=system``
            * logs platform, python version and list of installed packages
        """
        key = key or 'system'
        value = value or {
            'platform': platform.uname()._asdict(),
            'python': '-'.join((platform.python_implementation(),
                                platform.python_version())),
            'packages': ['=='.join((d.metadata['Name'], d.version))
                         for d in importlib.metadata.distributions()]
        }
        data = self._common_log_data('system', key, value, step=step, dt=dt, **extra)
        self._write_log(data)

    def data(self, experiment=None, run=None, event=None, step=None, key=None, raw=False,
             lazy=False, since=None, end=None, batchsize=None, slice=None, **extra):
        """ build a dataframe of all stored data

        Args:
            experiment (str|list): the name of the experiment, defaults to its current value
            run (int|list|str|slice): the run(s) for which to get data, defaults to the current
                run, use 'all' for all runs. Runs are 1-indexed from the first run; negative
                values index back from the latest run (-1 is the latest), and both forms can
                be combined in a list. A negative run that would point before the first run
                resolves to run 1. A slice(start, stop) can be used to specify a range of runs.
            event (str|list): the event(s) to include
            step (int|list): the step(s) to include
            key (str|list): the key(s) to include
            raw (bool): if True returns the raw data instead of a DataFrame
            lazy (bool): if True returns the Cursor instead of data, ignores raw
            since (datetime|timedelta|str): only return data since this date. If both since and
                run are specified, only matches since the given date are returned. If since is a
                string it must be parseable by pd.to_datetime, or be given in the format
                '<n><unit:[smhdwMqy]>' for relative times; alternatively pass a timedelta object.
                See dtrelative() for details on relative times.
            end (datetime): only return data until this date
            batchsize (int): if specified, returns a generator yielding data in batches of batchsize,
                note that raw is respected, i.e. raw=False yields a DataFrame for every batch, raw=True
                yields a list of dicts
            slice (tuple): if specified, returns a slice of the data, e.g. slice=(10, 25) returns rows 10-25,
                the slice is applied after all other filters

        Returns:
            For lazy == False:
                * data (DataFrame) if raw == False
                * data (list of dicts) if raw == True
                * None if no data exists

            For lazy == True, no batchsize, regardless of raw:
                * data (Cursor) for any value of raw

            For lazy == True, with batchsize:
                * data (generator of list[dict]) if raw == True
                * data (generator of DataFrame) if raw == False

        .. versionchanged:: 0.16.2
            run supports negative indexing

        .. versionchanged:: 0.17
            added batchsize

        .. versionchanged:: 0.17
            enabled the use of run='*' to retrieve all runs, equivalent of run='all'

        .. versionchanged:: 0.17
            enabled data(run=, start=, end=, since=), accepting range queries on run, dt and event
        """
        from functools import cache
        experiment = experiment or self._experiment
        # -- flush all buffers before querying
        self.flush()
        # -- build filter
        if since is None:
            run = run if run is not None else self._run
        run = ensurelist(run) if not isinstance(run, str) and isinstance(run, Iterable) else run
        # actual run
        # -- run is 1-indexed, so we need to adjust for -1 indexing
        #    e.g. -1 means the latest run, -2 the run before that
        #    e.g. latest_run = 5, run=-1 means 5, run=-2 means 4 etc.
        # -- run can be a list, in which case we adjust run < 0 for each element
        # -- run can never be less than 1 (1-indexed), even if run << 0
        # PERF/consistency: memoize the last run per each .data() call
        last_run = cache(lambda: int(self._latest_run or 0))
        relative_run = lambda r: max(1, 1 + last_run() + r)
        if isinstance(run, list) and any(r < 0 for r in run):
            run = [(r if r >= 0 else relative_run(r)) for r in run]
        elif isinstance(run, int) and run < 0:
            run = relative_run(run)
        filter = self._build_data_filter(experiment, run, event, step, key, since, end, extra)

        def read_data(cursor):
            data = pd.DataFrame.from_records(cursor)
            if 'dt' in data.columns:
                data['dt'] = pd.to_datetime(data['dt'], errors='coerce')
                data.sort_values('dt', inplace=True)
            return data

        def read_data_batched(cursor, batchsize, slice):
            from builtins import slice as t_slice
            if cursor is None:
                yield None
                return
            if slice:
                slice = (slice.start or 0, slice.stop or 0) if isinstance(slice, t_slice) else slice
                cursor.skip(slice[0])
                cursor.limit(slice[1] - slice[0])
                batchsize = batchsize or (slice[1] - slice[0])
            for rows in batched(cursor, batchsize):
                data = (r.get('data') for r in rows)
                yield read_data(data) if not raw else list(data)

        if batchsize or slice:
            data = self._store.get(self._data_name, filter=filter, lazy=True, trusted=signature(filter))
            data = read_data_batched(data, batchsize, slice)
            if slice and not batchsize:
                # try to resolve just one iteration
                data, _ = tryOr(lambda: (next(data), data.close()), (None, None))
        else:
            data = self._store.get(self._data_name, filter=filter, lazy=lazy, trusted=signature(filter))
            data = read_data(data) if data is not None and not lazy and not raw else data
        return data

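    # Query sketch (illustrative):
    #
    #   exp.data(run='all')               # all runs, as a DataFrame
    #   exp.data(run=-1, event='metric')  # metrics of the latest run
    #   exp.data(since='7d', raw=True)    # last 7 days, as a list of dicts
    #   for batch in exp.data(run='*', batchsize=1000):
    #       ...                           # one DataFrame per batch
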
    def _build_data_filter(self, experiment, run, event, step, key, since, end, extra):
        # build a filter for the data query, suitable for OmegaStore.get()
        filter = {}
        valid = lambda s: s is not None and str(s).lower() not in ('all', '*')
        # SEC: ensure all values are basic types, to prevent operator injection
        valid_types = (str, int, float, list, tuple, date, datetime)
        ensure_type = lambda v, t: v if isinstance(v, t) else str(v)
        op = lambda s: ({'$in': ensurelist(s)} if isinstance(s, (list, tuple, np.ndarray))
                        else ensure_type(s, valid_types))
        if valid(experiment):
            filter['data.experiment'] = op(experiment)
        if valid(run):
            if isinstance(run, slice):
                filter['data.run'] = {'$gte': run.start, '$lte': run.stop}
            else:
                filter['data.run'] = op(run)
        if valid(event):
            filter['data.event'] = op(event)
        if valid(step):
            filter['data.step'] = op(step)
        if valid(key):
            filter['data.key'] = op(key)
        if valid(since):
            dtnow = getattr(self, '_since_dtnow', datetime.utcnow())
            if isinstance(since, str):
                since = tryOr(lambda: pd.to_datetime(since), lambda: dtrelative('-' + since, now=dtnow))
            elif isinstance(since, timedelta):
                since = dtnow - since
            elif isinstance(since, datetime):
                since = since
            else:
                raise ValueError(
                    f'invalid since value: {since}, must be datetime, timedelta or string in format "<n><unit:[smhdwMqy]>"')
            filter['data.dt'] = {'$gte': str(since.isoformat())}
        if valid(end):
            dtnow = getattr(self, '_since_dtnow', datetime.utcnow())
            if isinstance(end, str):
                end = tryOr(lambda: pd.to_datetime(end), lambda: dtrelative('+' + end, now=dtnow))
            elif isinstance(end, timedelta):
                end = dtnow + end
            elif isinstance(end, datetime):
                end = end
            else:
                raise ValueError(
                    f'invalid end value: {end}, must be datetime, timedelta or string in format "<n><unit:[smhdwMqy]>"')
            filter['data.dt'] = filter.setdefault('data.dt', {})
            filter['data.dt']['$lte'] = str(end.isoformat())
        for k, v in extra.items():
            if valid(v):
                fk = f'data.{k}'
                filter[fk] = op(v)
        return filter

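    # For example (illustrative), experiment='myexp', run=[1, 2], since='2024-01-01'
    # yields a filter like:
    #
    #   {'data.experiment': 'myexp',
    #    'data.run': {'$in': [1, 2]},
    #    'data.dt': {'$gte': '2024-01-01T00:00:00'}}
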
    @property
    def dataset(self):
        return self._data_name

    @property
    def stats(self):
        from omegaml.backends.tracking.statistics import ExperimentStatistics
        return ExperimentStatistics(self)

    def summary(self, **kwargs):
        return self.stats.summary(**kwargs)

    def _initialize_dataset(self, force=False):
        # create indexes when the dataset is first created
        if not force and self._store.exists(self._data_name):
            return
        coll = self._store.collection(self._data_name)
        idxs = [
            {'data.run': pymongo.ASCENDING, 'data.event': pymongo.ASCENDING, 'data.key': pymongo.ASCENDING,
             'data.experiment': pymongo.ASCENDING},
            {'data.dt': pymongo.ASCENDING, 'data.event': pymongo.ASCENDING, 'data.key': pymongo.ASCENDING,
             'data.experiment': pymongo.ASCENDING},
            {'data.dt': pymongo.ASCENDING, 'data.event': pymongo.ASCENDING, 'data.experiment': pymongo.ASCENDING},
        ]
        for specs in idxs:
            ensure_index(coll, specs)

    def restore_artifact(self, *args, **kwargs):
        """ restore a specific logged artifact

        .. versionchanged:: 0.17
            deprecated, use exp.restore_artifacts() instead
        """
        warnings.warn('deprecated, use exp.restore_artifacts() instead', DeprecationWarning)
        restored = self.restore_artifacts(*args, **kwargs)
        return restored[-1] if restored else None

    def restore_artifacts(self, key=None, experiment=None, run=None, since=None, step=None, value=None, event=None,
                          name=None):
        """ restore logged artifacts

        Args:
            key (str): the name of the artifact as provided in log_artifact
            experiment (str): the experiment name, defaults to the current experiment
            run (int): the run for which to query, defaults to current run
            since (datetime): only return data since this date
            step (int): the step for which to query, defaults to all steps in run
            value (dict|list): dict or list of dicts; if given this value is used instead of
                querying data, use to restore an artifact from the contents of ``.data()``
            event (str): the event to query, defaults to 'artifact'
            name (str): the artifact name to match, defaults to '*' (all)

        Returns:
            list of restored objects

        Notes:
            * this will restore the artifact according to the type assigned
              by ``.log_artifact()``. If the type cannot be determined, the
              actual data is returned

        .. versionchanged:: 0.17
            returns a list of objects instead of the last object
        """
        event = event or 'artifact'
        name = name or '*'
        if value is None:
            all_data = self.data(experiment=experiment, run=run, since=since, event=event,
                                 step=step, key=key, raw=True, name=name)
        else:
            all_data = [{'value': value}] if isinstance(value, dict) else value
        restored = []
        all_data = all_data or []
        for item in all_data:
            data = item.get('value')
            if data['format'] == 'type':
                obj = data['data']
            elif data['format'] == 'metadata':
                meta = self._store._Metadata
                obj = meta.from_json(data['data'])
            elif data['format'] == 'dataset':
                obj = self._store.get(data['data'])
            elif data['format'] == 'model':
                obj = self._model_store.get(data['data'])
            elif data['format'] == 'pickle':
                obj = dill.loads(b64decode((data['data']).encode('utf8')))
            else:
                obj = data.get('data', data)
            restored.append(obj)
        return restored

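    # Usage sketch (illustrative):
    #
    #   exp.log_artifact({'mean': .5}, 'stats')
    #   objs = exp.restore_artifacts('stats')  # => [{'mean': .5}]
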
    def restore_data(self, key, run=None, event=None, since=None, concat=True, **extra):
        """ restore x/y data for model predictions

        This is syntactic sugar for restore_artifacts() using the 'data' event.

        Args:
            key (str): the name of the artifact
            run (int): the run for which to query, defaults to current run
            event (str): the event, defaults to 'data'
            since (datetime): only return data since this date
            concat (bool): if True, concatenates the data into a single object,
                in this case all data must be of the same type. Defaults to True.
            **extra: any other values to store with event

        Returns:
            the restored data; a single concatenated object if concat is True,
            otherwise a list of restored objects
        """
        event = event or 'data'

        def _concat(values):
            if values is None:
                return None
            if len(values) and isinstance(values[0], (pd.DataFrame, pd.Series)):
                ensure_df_or_series = lambda v: pd.Series(v) if isinstance(v, (np.ndarray, list)) else v
                return pd.concat((ensure_df_or_series(v) for v in values), axis=0)
            elif len(values) and isinstance(values[0], np.ndarray):
                return np.concatenate(values, axis=0)
            # chain seems to be the fastest approach
            # -- https://stackoverflow.com/a/56407963/890242
            return list(chain(*values))

        restored = self.restore_artifacts(run=run, key=key, event=event, since=since, **extra)
        return restored if not concat else _concat(restored)

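    # Usage sketch (illustrative):
    #
    #   exp.log_data('X', df_batch1)
    #   exp.log_data('X', df_batch2)
    #   X = exp.restore_data('X')                    # one concatenated DataFrame
    #   parts = exp.restore_data('X', concat=False)  # list of DataFrames
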


def dtrelative(delta, now=None, as_delta=False):
    """ return a datetime relative to now

    Args:
        delta (str|timedelta): the relative delta, if a string, specify as '[+-]<n><unit:[smhdwMqy]>',
            e.g. '1d' for one day, '-1d' for one day ago, '+1d' for one day from now. Special cases:
            '-0y' means the beginning of the current year, '+0y' means the end of the current year.
            smhdwMqy = seconds, minutes, hours, days, weeks, months, quarters, years
        now (datetime): the reference datetime, defaults to datetime.utcnow()
        as_delta (bool): if True, returns a timedelta object, otherwise a datetime object

    Returns:
        datetime|timedelta: the relative datetime or timedelta
    """
    # Parse the numeric part and the unit from the specifier
    UNIT_MAP = {'s': 1,  # 1 second
                'm': 60,  # 1 minute
                'h': 60 * 60,  # 1 hour
                'd': 24 * 60 * 60,  # 1 day
                'w': 7 * 24 * 60 * 60,  # 1 week
                'n': 30 * 24 * 60 * 60,  # 1 month ('M' is normalized to 'n' below)
                'q': 90 * 24 * 60 * 60,  # 1 quarter
                'y': 365 * 24 * 60 * 60}  # 1 year
    now = now or datetime.utcnow()
    error_msg = f"Invalid delta {delta}. Use a string of format '<n><unit:[smhdwMqy]>' or a timedelta object."
    if isinstance(delta, str):
        try:
            past = delta.startswith('-')
            delta = (delta
                     .replace('-', '')
                     .replace('+', '')  # remove the sign
                     .replace(' ', '')  # remove spaces
                     .replace('M', 'n')  # m is ambiguous, so we use n for month
                     .lower())
            num = int(delta[:-1])  # the numeric part
            units = UNIT_MAP[delta[-1]]  # the unit, as seconds
            if delta[-1] == 'y' and num == 0:
                # special case: -0y means beginning of the year, +0y means end of the year
                dtdelta = (datetime(now.year, 1, 1) - now) if past else (datetime(now.year, 12, 31) - now)
            else:
                dtdelta = timedelta(seconds=num * units * (-1 if past else 1))
        except (ValueError, KeyError, IndexError):
            raise ValueError(error_msg)
    elif isinstance(delta, timedelta):
        dtdelta = delta
    else:
        raise ValueError(error_msg)
    return now + dtdelta if not as_delta else dtdelta
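

# Examples (illustrative):
#
#   dtrelative('-7d', now=now)                # seven days before now
#   dtrelative('+1M', now=now)                # one month (30 days) from now
#   dtrelative('-0y', now=now)                # beginning of the current year
#   dtrelative('1h', now=now, as_delta=True)  # timedelta(hours=1)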