Source code for omegaml.backends.tracking.profiling

  1from omegaml.backends.tracking.simple import OmegaSimpleTracker
  2
  3
  4class BackgroundProfiler:
  5    """ Profile CPU, Memory and Disk use in a background thread
  6    """
  7
  8    def __init__(self, interval=10, callback=print):
  9        self._stop = False
 10        self._interval = interval
 11        self._callback = callback
 12        self._metrics = ['cpu', 'memory', 'disk']
 13
 14    def profile(self):
 15        """
 16        returns memory and cpu data as a dict
 17            memory_load (float): percent of total memory used
 18            memory_total (float): total memory in bytes
 19            cpu_load (list): cpu load in percent, per cpu
 20            cpu_count (int): logical cpus,
 21            cpu_freq (float): MHz of current cpu frequency
 22            cpu_avg (list): list of cpu load over last 1, 5, 15 minutes
 23            disk_use (float): percent of disk used
 24            disk_total (int): total size of disk, bytes
 25        """
 26        import psutil
 27        from datetime import datetime as dt
 28        p = psutil
 29        disk = p.disk_usage('/')
 30        cpu_count = p.cpu_count()
 31        data = {'profile_dt': dt.utcnow()}
 32        if 'memory' in self._metrics:
 33            data.update(memory_load=p.virtual_memory().percent,
 34                        memory_total=p.virtual_memory().total)
 35        if 'cpu' in self._metrics:
 36            data.update(cpu_load=p.cpu_percent(percpu=True),
 37                        cpu_count=cpu_count,
 38                        cpu_freq=[f.current for f in p.cpu_freq(percpu=True)],
 39                        cpu_avg=[x / cpu_count for x in p.getloadavg()])
 40        if 'disk' in self._metrics:
 41            data.update(disk_use=disk.percent,
 42                        disk_total=disk.total)
 43        return data
 44
 45    @property
 46    def interval(self):
 47        return self._interval
 48
 49    @interval.setter
 50    def interval(self, interval):
 51        self._interval = interval
 52        self.stop()
 53        self.start()
 54
 55    @property
 56    def metrics(self):
 57        return self._metrics
 58
 59    @metrics.setter
 60    def metrics(self, metrics):
 61        self._metrics = metrics
 62
 63    def start(self):
 64        """ runs a background thread that reports stats every interval seconds
 65
 66        Every interval, calls callback(data), where data is the output of BackgroundProfiler.profile()
 67        Stop by BackgroundProfiler.stop()
 68        """
 69        import atexit
 70        from threading import Thread
 71        from time import sleep
 72
 73        def runner():
 74            cb = self._callback
 75            try:
 76                while not self._stop:
 77                    cb(self.profile())
 78                    sleep(self._interval)
 79            except (KeyboardInterrupt, SystemExit):
 80                pass
 81
 82        # handle exits by stopping the profiler
 83        atexit.register(self.stop)
 84        # start the profiler
 85        self._stop = False
 86        t = Thread(target=runner)
 87        t.start()
 88        # clean up
 89        atexit.unregister(self.stop)
 90
 91    def stop(self):
 92        self._stop = True
 93
 94
class OmegaProfilingTracker(OmegaSimpleTracker):
    """ A metric tracker that runs a system profiler while the experiment is active

    Will record ``profile`` events that contain cpu, memory and disk profilings.
    See BackgroundProfiler.profile() for details of the profiling metrics collected.

    Usage:

        To log metrics and system performance data::

            with om.runtime.experiment('myexp', provider='profiling') as exp:
                ...

            data = exp.data(event='profile')

    Properties::

        exp.profiler.interval = n.m # interval of n.m seconds to profile, defaults to 10 seconds
        exp.profiler.metrics = ['cpu', 'memory', 'disk'] # all or subset of metrics to collect
        exp.max_buffer = n # number of items in buffer before tracking

    Notes:
        - the profiling data is buffered to reduce the number of database writes, by
          default the data is written after every 10 profiling events (10 * 10 seconds
          = every 100 seconds), or whenever the tracker is flushed
        - the step reported in the tracker counts the profiling events since the
          runtime was started, it is not related to the step (epoch) reported by
          e.g. tensorflow
        - For every step there is a ``event=profile``, ``key=profile_dt`` entry which you can use
          to relate profiling events to a specific wall-clock time.
        - It is usually sufficient to report system metrics in intervals > 10 seconds since machine
          learning algorithms tend to use CPU and memory over longer periods of time.
    """

    def __init__(self, *args, **kwargs):
        import threading
        super().__init__(*args, **kwargs)
        self.profile_logs = []
        self.max_buffer = 10
        # running count of stored profiling events, used as each event's step
        self._profile_step = 0
        # log_profile() runs in the profiler thread, flush() also in the caller's
        # thread; the lock keeps the buffer and step counter consistent
        self._profile_lock = threading.Lock()

    def log_profile(self, data):
        """ the callback for BackgroundProfiler

        Buffers ``data`` and flushes once ``max_buffer`` events are buffered
        (a falsy ``max_buffer`` flushes on every event).
        """
        with self._profile_lock:
            self.profile_logs.append(data)
            buffered = len(self.profile_logs)
        # flush outside the lock, flush() acquires it itself
        if buffered >= (self.max_buffer or 1):
            self.flush()

    def flush(self):
        """ flush the tracker, including all buffered profiling events

        Each profiling event is stored as one ``event='profile'`` record per
        key, stamped with the event's own ``profile_dt`` and a step that keeps
        counting across flushes. The buffer is only cleared after a successful
        write, so events are retained if the store raises.
        """
        super().flush()
        with self._profile_lock:
            logs = self.profile_logs
            if not logs:
                return
            items = []
            for step, data in enumerate(logs, start=self._profile_step):
                # record the actual time instead of logging time (avoid buffering delays)
                dt = data.get('profile_dt')
                for k, v in data.items():
                    items.append(self._common_log_data('profile', k, v, step=step, dt=dt))
            # passing a list with as_many=True => collection.insert_many() for speed
            self._store.put(items, self._data_name,
                            index=['event'], as_many=True, noversion=True)
            self.profile_logs = []
            self._profile_step += len(logs)

    def start_runtime(self):
        # never leave the profiler of a previous runtime running in the background
        previous = getattr(self, 'profiler', None)
        if previous is not None:
            previous.stop()
        # steps count profiling events since this runtime started
        self._profile_step = 0
        self.profiler = BackgroundProfiler(callback=self.log_profile)
        self.profiler.start()
        super().start_runtime()

    def stop_runtime(self):
        # tolerate stop_runtime() without a prior start_runtime()
        profiler = getattr(self, 'profiler', None)
        if profiler is not None:
            profiler.stop()
        self.flush()
        super().stop_runtime()