1from omegaml.backends.tracking.simple import OmegaSimpleTracker
2
3
class BackgroundProfiler:
    """ Profile CPU, Memory and Disk use in a background thread

    Usage:

        profiler = BackgroundProfiler(interval=10, callback=print)
        profiler.start()
        ...
        profiler.stop()

    Args:
        interval (float): seconds to wait between two profiling samples
        callback (callable): called as callback(data) for every sample, where
            data is the dict returned by BackgroundProfiler.profile(). The
            callback runs in the background thread, not in the caller's thread.

    Notes:
        - at most one sampling thread is active per profiler, calling start()
          again replaces the running thread
        - the sampling thread is a daemon thread, so a profiler that was never
          stopped does not keep the interpreter from exiting
    """

    def __init__(self, interval=10, callback=print):
        self._stop = False
        self._interval = interval
        self._callback = callback
        self._metrics = ['cpu', 'memory', 'disk']
        # state of the currently running sampling thread, if any
        self._thread = None
        self._stop_event = None

    def profile(self):
        """
        returns memory, cpu and disk data as a dict

        Only the groups listed in BackgroundProfiler.metrics are included,
        profile_dt is always included.

            profile_dt (datetime): time of the sample, naive datetime in UTC
            memory_load (float): percent of total memory used
            memory_total (float): total memory in bytes
            cpu_load (list): cpu load in percent, per cpu
            cpu_count (int): logical cpus, as reported by psutil.cpu_count()
            cpu_freq (list): MHz of current cpu frequency, per cpu
            cpu_avg (list): system load average over the last 1, 5, 15 minutes,
               divided by the number of logical cpus
            disk_use (float): percent of disk used, for the '/' path
            disk_total (int): total size of disk, bytes, for the '/' path
        """
        # psutil is imported lazily so the module can be imported without it
        import psutil
        from datetime import datetime, timezone

        # naive UTC timestamp, same value as the deprecated datetime.utcnow()
        data = {'profile_dt': datetime.now(timezone.utc).replace(tzinfo=None)}
        metrics = self._metrics
        if 'memory' in metrics:
            memory = psutil.virtual_memory()
            data.update(memory_load=memory.percent,
                        memory_total=memory.total)
        if 'cpu' in metrics:
            cpu_count = psutil.cpu_count()
            # guard against an undetermined cpu count or frequency (None)
            divisor = cpu_count or 1
            cpu_freq = psutil.cpu_freq(percpu=True) or []
            data.update(cpu_load=psutil.cpu_percent(percpu=True),
                        cpu_count=cpu_count,
                        cpu_freq=[freq.current for freq in cpu_freq],
                        cpu_avg=[load / divisor for load in psutil.getloadavg()])
        if 'disk' in metrics:
            # only query the disk when it is actually requested
            disk = psutil.disk_usage('/')
            data.update(disk_use=disk.percent,
                        disk_total=disk.total)
        return data

    @property
    def interval(self):
        """ seconds between two profiling samples """
        return self._interval

    @interval.setter
    def interval(self, interval):
        """ set the interval, restarts a running profiler so it applies immediately

        A profiler that is not running is not started, the new interval is
        used by the next call to start().
        """
        self._interval = interval
        thread = self._thread
        if thread is not None and thread.is_alive():
            self.stop()
            self.start()

    @property
    def metrics(self):
        """ list of metric groups to collect, any of 'cpu', 'memory', 'disk' """
        return self._metrics

    @metrics.setter
    def metrics(self, metrics):
        self._metrics = metrics

    def start(self):
        """ runs a background thread that reports stats every interval seconds

        Every interval, calls callback(data), where data is the output of BackgroundProfiler.profile()
        Stop by BackgroundProfiler.stop()
        """
        import atexit
        from threading import Event, Thread

        # never run two sampling threads for the same profiler
        self.stop()
        # every thread gets its own stop event, so that resetting the state for
        # a new thread can never revive a thread that was asked to stop
        stop_event = Event()

        def runner():
            cb = self._callback
            try:
                while not stop_event.is_set():
                    cb(self.profile())
                    # wait() instead of sleep() returns as soon as stop() is called
                    stop_event.wait(self._interval)
            except (KeyboardInterrupt, SystemExit):
                pass

        self._stop = False
        self._stop_event = stop_event
        # handle exits by stopping the profiler, unregistered again in stop()
        atexit.register(self.stop)
        # start the profiler, as a daemon so it cannot block interpreter exit
        thread = Thread(target=runner, daemon=True)
        self._thread = thread
        thread.start()

    def stop(self, timeout=5.0):
        """ stop the background thread, if any

        Args:
            timeout (float): maximum seconds to wait for the sampling thread
               to finish its current callback, None waits indefinitely

        Calling stop() on a profiler that is not running is a no-op.
        """
        import atexit
        from threading import current_thread

        self._stop = True
        stop_event, thread = self._stop_event, self._thread
        self._stop_event = None
        self._thread = None
        if stop_event is not None:
            stop_event.set()
        # clean up the exit handler, it also holds a reference to this profiler
        atexit.unregister(self.stop)
        # a callback may call stop() from the sampling thread itself, which
        # cannot be joined from within
        if thread is not None and thread is not current_thread() and thread.is_alive():
            thread.join(timeout)
94
class OmegaProfilingTracker(OmegaSimpleTracker):
    """ A metric tracker that runs a system profiler while the experiment is active

    Will record ``profile`` events that contain cpu, memory and disk profilings.
    See BackgroundProfiler.profile() for details of the profiling metrics collected.

    Usage:

        To log metrics and system performance data::

            with om.runtime.experiment('myexp', provider='profiling') as exp:
                ...

            data = exp.data(event='profile')

        Properties::

            exp.profiler.interval = n.m # interval of n.m seconds to profile, defaults to 10 seconds
            exp.profiler.metrics = ['cpu', 'memory', 'disk'] # all or subset of metrics to collect
            exp.max_buffer = n # number of items in buffer before tracking

    Notes:
        - the profiling data is buffered to reduce the number of database writes, by
          default the data is written on every 10 profiling events (default: 10 * 10 = every 100 seconds)
        - the step reported in the tracker counts the profiling event since the start, it is not
          related to the step (epoch) reported by e.g. tensorflow
        - For every step there is a ``event=profile``, ``key=profile_dt`` entry which you can use
          to relate profiling events to a specific wall-clock time.
        - It is usually sufficient to report system metrics in intervals > 10 seconds since machine
          learning algorithms tend to use CPU and memory over longer periods of time.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # profiling samples not yet written to the store
        self.profile_logs = []
        # number of buffered samples that triggers a flush
        self.max_buffer = 10
        # number of samples written so far, the step of the next sample
        self._profile_step = 0

    def log_profile(self, data):
        """ the callback for BackgroundProfiler

        Buffers the sample and flushes once max_buffer samples are collected.

        Args:
            data (dict): the output of BackgroundProfiler.profile()
        """
        self.profile_logs.append(data)
        if len(self.profile_logs) >= (self.max_buffer or 1):
            self.flush()

    def flush(self):
        """ write all buffered profiling samples to the store

        Every key of every sample is stored as one ``profile`` event. The step
        continues counting across flushes, so it identifies the sample since
        the tracker was created.
        """
        super().flush()
        # take ownership of the buffer before writing, samples appended by the
        # profiler thread in the meantime go to the new buffer and are kept
        pending, self.profile_logs = self.profile_logs, []
        if not pending:
            return
        first_step = getattr(self, '_profile_step', 0)
        try:
            # record the actual time instead of logging time (avoid buffering delays)
            items = [self._common_log_data('profile', key, value, step=step,
                                           dt=sample.get('profile_dt'))
                     for step, sample in enumerate(pending, start=first_step)
                     for key, value in sample.items()]
            # passing list of list, as_many=True => collection.insert_many() for speed
            self._store.put(items, self._data_name,
                            index=['event'], as_many=True, noversion=True)
        except Exception:
            # keep the samples for the next flush instead of dropping them
            self.profile_logs = pending + self.profile_logs
            raise
        self._profile_step = first_step + len(pending)

    def start_runtime(self):
        """ start a new profiler that reports to log_profile(), then start the tracker """
        # do not leave a previous profiler running when started repeatedly
        previous = getattr(self, 'profiler', None)
        if previous is not None:
            previous.stop()
        self.profiler = BackgroundProfiler(callback=self.log_profile)
        self.profiler.start()
        super().start_runtime()

    def stop_runtime(self):
        """ stop the profiler, write the remaining samples, then stop the tracker """
        # the profiler only exists once start_runtime() was called
        profiler = getattr(self, 'profiler', None)
        if profiler is not None:
            profiler.stop()
        self.flush()
        super().stop_runtime()
162 self.flush()
163 super().stop_runtime()