Source code for omegaml.notebook.jobschedule

  1import datetime
  2from celery.schedules import crontab
  3from croniter import croniter
  4
  5
class JobSchedule(object):
    """
    Produce a cron tab spec from text, time periods, or a crontab spec

    Given any specification format, can translate to a human readable
    text.

    If using time periods (minute, hour, weekday, monthday, month),
    any argument not specified defaults to '*' for this argument.

    If using text, separate each time part (weekday, hour, month)
    by a comma. To specify multiple times for a part, use / instead
    of comma.

    Examples::

        # using text
        JobSchedule('friday, at 06:00/08:00/10:00')
        JobSchedule('Mondays and Fridays, at 06:00')
        JobSchedule('every 5 minutes, on weekends, in april')

        # using time periods
        JobSchedule(weekday='mon-fri', at='06:05,12:05')
        JobSchedule(weekday='mon-fri', at='06:00')
        JobSchedule(month='every 2', at='08:00', weekday='every 3')

        # using a crontab spec
        JobSchedule('05 06,12 * * mon-fri')

        # given a valid specification get human readable text or crontab format
        JobSchedule('05 06,12 * * mon-fri').text
        JobSchedule('Mondays and Fridays, at 06:00').cron

    Args:
        text (str): the natural language specification, with time parts
            separated by comma
        at (str): the hh:mm specification, equal to hour=hh, minute=mm
        minute (str): run on 0-59th minute in every specified hour
        hour (str): run on 0-23th hour on every specified day
        weekday (str): run on 0-6th day in every week (0 is Sunday),
            can also be specified as mon/tue/wed/thu/fri/sat/sun
        monthday (str): run on 1-31th day in every month
        month (str): run on 1-12th month of every year

    Raises:
        ValueError if the given specification is not correct
    """

    # full month names in calendar order; index + 1 is the cron month number
    _MONTHS = ('january', 'february', 'march', 'april', 'may', 'june',
               'july', 'august', 'september', 'october', 'november',
               'december')
    # full weekday names; the first three letters are the short form
    _DAYS = ('monday', 'tuesday', 'wednesday', 'thursday', 'friday',
             'saturday', 'sunday')
    # long weekday names / aliases => cron notation, applied in this order
    _WEEKDAY_ALIASES = (
        ('monday', 'mon'),
        ('tuesday', 'tue'),
        ('wednesday', 'wed'),
        ('thursday', 'thu'),
        ('friday', 'fri'),
        ('saturday', 'sat'),
        ('sunday', 'sun'),
        ('weekday', 'mon-fri'),
        ('workday', 'mon-fri'),
        ('working day', 'mon-fri'),
        ('weekend', 'sat-sun'),
        ('week-end', 'sat-sun'),
        ('week end', 'sat-sun'),
    )
    # natural language normalisation applied to every text part, in order
    # (order matters, e.g. plural forms are reduced before filler words go)
    _NORMALIZE = (
        (' and ', ','),
        ('days', 'day'),
        ('hours', 'hour'),
        ('minutes', 'minute'),
        ('ends', 'end'),
        ('on ', ''),
        ('only ', ''),
        ('in ', ''),
        ('/', ','),
        (' through ', '-'),
        (' until ', '-'),
        (' till ', '-'),
        (' to ', '-'),
        (' of ', ''),
        ('from ', ''),
    )

    def __init__(self, text=None, minute='*', hour='*', weekday='*',
                 monthday='*', month='*', at=None):
        # if we get text, attempt to convert
        if text:
            try:
                self.sched = self.from_cron(text).sched
            except Exception:
                # not a crontab spec, assume natural language
                self.sched = self._convert_text(text).sched
            return
        # no text, but time periods
        if at:
            # 06:00 => hour=6 / minute=00
            # 06:00,12:00 => hour=6,12 / minute=00
            hours, minutes = set(), set()
            for entry in at.split(','):
                hh, mm = entry.strip().split(':')
                hours.add(hh.strip())
                minutes.add(mm.strip())
            hour = ','.join(sorted(hours))
            minute = ','.join(sorted(minutes))
        # every n => */n
        minute = self._expand_every(minute)
        hour = self._expand_every(hour)
        weekday = self._expand_every(weekday)
        monthday = self._expand_every(monthday)
        month = self._expand_every(month)
        # long to short, e.g. friday => fri
        weekday = self._convert_weekdays(weekday)
        # month to number, e.g. january = 1
        month = self._convert_months(month)
        # get a cron spec
        self.sched = crontab(minute=minute,
                             hour=hour,
                             day_of_month=monthday,
                             day_of_week=weekday,
                             month_of_year=month)
        # make sure the spec can be processed by croniter
        if not croniter.is_valid(self.cron):
            raise ValueError(f"{self.cron} is not a valid schedule")

    @classmethod
    def from_cron(cls, cronspec):
        """ initialize JobSchedule from a cron specifier

        Args:
            cronspec (str): five whitespace-separated fields in the order
                minute, hour, monthday, month, weekday

        Raises:
            ValueError if cronspec does not have exactly five fields
        """
        # split() without argument tolerates repeated or surrounding whitespace
        (minute, hour, monthday, month, weekday) = cronspec.split()
        return cls(minute=minute, hour=hour, monthday=monthday,
                   month=month, weekday=weekday)

    @classmethod
    def from_text(cls, text):
        """ initialize JobSchedule from a weekday, hour, month specifier"""
        return cls(text=text)

    @property
    def cron(self):
        """ return the cron representation of the schedule """
        # adopted from https://docs.celeryproject.org/en/latest/_modules/celery/schedules.html#schedule
        cron_repr = ('{0._orig_minute} {0._orig_hour} {0._orig_day_of_month} '
                     '{0._orig_month_of_year} {0._orig_day_of_week}')
        return cron_repr.format(self.sched)

    @property
    def text(self):
        """ return the human readable representation of the schedule """
        from cron_descriptor import get_description
        return get_description(self.cron)

    def next_times(self, n=None, last_run=None):
        """ return the n next times of this schedule, starting from the last run

        Args:
            n (int): the n next times
            last_run (datetime): the last time this was run

        Notes:
            if n is None returns a never ending iterator

        Returns:
            iterator of next times

        See Also:
            * croniter.get_next()
        """
        iter_next = croniter(self.cron, start_time=last_run)
        produced = 0
        # check for None first, comparing None with an int raises TypeError
        while n is None or produced < n:
            yield iter_next.get_next(datetime.datetime)
            produced += 1

    def __repr__(self):
        return 'JobSchedule(cron={}, text={})'.format(self.cron, self.text)

    def _convert_weekdays(self, v):
        # convert full name weekdays and aliases (weekend, workday) to short
        v = v.lower()
        for full, short in self._WEEKDAY_ALIASES:
            v = v.replace(full, short)
        return v

    def _has_month(self, v):
        # True if v contains a full or three-letter month name (case sensitive)
        return any(m in v or m[:3] in v for m in self._MONTHS)

    def _has_day(self, v):
        # True if v contains a full or three-letter weekday name (case sensitive)
        return any(d in v or d[:3] in v for d in self._DAYS)

    def _convert_months(self, v):
        # convert month names to numbers, e.g. january => 1, jan-mar => 1-3
        # lower before checking so that 'January' is recognized as well
        lowered = v.lower()
        if not self._has_month(lowered):
            return v
        for number, name in enumerate(self._MONTHS, start=1):
            # full name first, then the three-letter short form
            lowered = lowered.replace(name, str(number))
            lowered = lowered.replace(name[:3], str(number))
        return lowered

    def _expand_every(self, v):
        # convert 'every' specs to cron-like, non-strings are passed through
        import re
        if not isinstance(v, str):
            return v
        if '1st' in v or 'first' in v:
            # 'every 1st' => 1
            v = '1'
        if v.startswith('every '):
            if v.split(' ')[-1].isnumeric():
                # 'every 5' => */5
                v = v.replace('every ', '*/')
            else:
                # 'every 3rd' => 3, 'every thu' => thu
                v = v.replace('every ', '')
                # only strip ordinal suffixes that follow a digit, a blind
                # replace would damage names such as 'thu' or 'month'
                v = re.sub(r'(\d)(?:nd|rd|th)', r'\1', v)
        else:
            # 'every' => *
            v = v.replace('every', '*')
        return v.strip()

    def _convert_text(self, text):
        # experimental natural language text to crontab
        specs = {}
        orig_text = text
        # ensure single whitespace, the trailing blank lets ' at ' match at the end
        text = ' '.join(text.split()).lower() + ' '
        # try placing commas
        text = text.replace(' at ', ', at ')
        text = ','.join(part for part in text.split(',') if part)
        # get parts separated by comma
        parts = [part.strip() for part in text.split(',') if part.strip()]
        try:
            specs = self._parse_parts(parts)
            if not specs:
                # nothing recognized, do not silently schedule '* * * * *'
                raise ValueError('no time parts recognized')
            sched = type(self)(**specs)
        except Exception as exc:
            raise ValueError(f'Cannot parse {orig_text}, read as {specs}') from exc
        return sched

    def _strip_time_words(self, value):
        # remove 'at' and am/pm markers from a time part
        # NOTE: am/pm are dropped, not converted to 24h time
        return value.replace('at', '').replace('am', '').replace('pm', '').strip()

    def _parse_parts(self, parts):
        # translate normalized text parts to JobSchedule keyword arguments
        specs = {}
        for part in parts:
            # Monday and Friday => Monday,Friday etc.
            for old, new in self._NORMALIZE:
                part = part.replace(old, new)
            if 'day' in part and 'month' in part:
                # day 1 of the month
                specs['monthday'] = (part.replace('day', '')
                                     .replace('month', '')
                                     .replace('the', '')
                                     .strip())
            elif 'month' in part:
                # 'every month', 'every 3rd month', 'every 2 months'
                specs['month'] = part.replace('months', '').replace('month', '').strip()
            elif self._has_month(part):
                # 'january', 'every january'
                specs['month'] = self._convert_months(part).replace('every', '')
            elif 'daily' in part:
                specs['weekday'] = '*'
            elif self._has_day(part) or 'day' in part or 'week' in part:
                # 'monday-friday', 'tuesday', 'every 3rd day'
                part = self._convert_weekdays(part).replace('day', '').strip()
                # every mon-fri => mon-fri, every fri
                if 'every ' in part and '-' in part:
                    part = part.replace('every ', '').strip()
                specs['weekday'] = part
            elif ':' in part and 'between' not in part:
                # at 06:00, at 06:00 am
                specs['at'] = self._strip_time_words(part)
            elif ':' in part:
                # between 06:00 and 08:00, between 06:00 am and 08:00 am
                # only the start time is used
                bounds = part.replace('between', '').split(',')
                specs['at'] = self._strip_time_words(bounds[0])
            elif 'hour' in part and 'past the hour' not in part:
                # every hour, hour 6, hour 6/7
                specs['hour'] = part.replace('hour', '').strip()
            elif 'hour' in part:
                # every 5 minutes past the hour => */5
                step = (part.replace('past the hour', '')
                        .replace('at', '')
                        .replace('minute', '')
                        .replace('every', '')
                        .strip())
                specs['minute'] = f'*/{step}'
            elif 'minute' in part:
                # every minute, every 3rd minute, every 5 minutes, minute 6/7
                specs['minute'] = part.replace('minute', '').replace('at ', '').strip()
        return specs