Introduction
omega-ml supports data streaming from various sources, including:
Python
Kafka
MQTT
omega-ml native datasets
A streaming application, in general, consists of three components:
one or more producers
a streaming log
one or more consumers
The streaming log should be able to produce windows of data, so that the data can be processed continuously. This is in contrast to batch processing, where we wait until all of the data has arrived and then process it all at once. In a streaming system, producers and consumers act independently and can be active at different times.
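To make the three roles concrete, here is a minimal sketch in plain Python that does not use omega-ml at all. The in-memory `StreamLog` class, its `read_since` method and the `Consumer` class are hypothetical stand-ins for the streaming log and a consumer. They only show how a consumer that tracks its own read position can process new data in small windows as it arrives, independently of when the producer writes, whereas a batch job would wait for the complete data set.

```python
from dataclasses import dataclass, field
from typing import Any, List


@dataclass
class StreamLog:
    """Hypothetical in-memory streaming log (not the omega-ml API)."""
    items: List[Any] = field(default_factory=list)

    def append(self, item: Any) -> None:
        # producers only ever append; they never wait for consumers
        self.items.append(item)

    def read_since(self, offset: int) -> List[Any]:
        # consumers read everything written after their last offset
        return self.items[offset:]


class Consumer:
    """Tracks its own read position, so it runs independently of producers."""

    def __init__(self, log: StreamLog) -> None:
        self.log = log
        self.offset = 0
        self.windows: List[List[Any]] = []

    def poll(self) -> None:
        window = self.log.read_since(self.offset)
        if window:
            self.offset += len(window)
            self.windows.append(window)  # process each window as soon as it exists


log = StreamLog()
consumer = Consumer(log)

log.append({'foo': 1})  # the producer is active ...
log.append({'foo': 2})
consumer.poll()         # ... the consumer processes what is there so far
log.append({'foo': 3})  # the producer writes more data later
consumer.poll()

print(consumer.windows)
# [[{'foo': 1}, {'foo': 2}], [{'foo': 3}]]
```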
Creating a stream
Streams in omega-ml are based on the minibatch library. A minibatch stream is created the first time it is accessed:
import omegaml as om
stream = om.streams.get('mystream')  # the stream is created on first access
stream.append({'foo': 'bar'})
Once a stream has been created, it has a Metadata entry, just like a dataset:
om.streams.metadata('mystream')
<Metadata: Metadata(name=mystream,bucket=omegaml,prefix=streams/,kind=stream.minibatch,created=2020-09-17 15:46:47.217000)>
Writing to a stream
We can write to a stream either with om.streams.put() or by appending directly to the stream with stream.append(). Both are valid; calling stream.append() on a stream object we already hold avoids the overhead of looking up the stream from its metadata on every write.
data = {
'foo': 'bar',
}
om.streams.put(data, 'mystream')
# technically equivalent, use for less overhead in higher frequency scenarios
stream = om.streams.get('mystream')
stream.append(data)
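The overhead argument can be illustrated with a toy model. In the sketch below, the hypothetical `Registry.get` method stands in for looking up a stream from its metadata. A put-style helper repeats that lookup on every write, whereas holding on to the stream object pays for it only once. This illustrates the reasoning above only; it is not how omega-ml implements either call.

```python
from typing import Any, Dict, List


class Stream:
    def __init__(self) -> None:
        self.items: List[Any] = []

    def append(self, item: Any) -> None:
        self.items.append(item)


class Registry:
    """Hypothetical stand-in for a metadata-backed stream store."""

    def __init__(self) -> None:
        self._streams: Dict[str, Stream] = {}
        self.lookups = 0  # counts (simulated) metadata lookups

    def get(self, name: str) -> Stream:
        self.lookups += 1
        return self._streams.setdefault(name, Stream())

    def put(self, item: Any, name: str) -> None:
        # put-style write: look the stream up, then append
        self.get(name).append(item)


registry = Registry()
for i in range(100):
    registry.put({'i': i}, 'mystream')
print(registry.lookups)  # 100: one lookup per write

registry.lookups = 0
stream = registry.get('mystream')
for i in range(100):
    stream.append({'i': i})
print(registry.lookups)   # 1: the lookup happens only once
print(len(stream.items))  # 200
```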
Accessing a stream buffer
We can also inspect a stream's buffer:
stream = om.streams.get('mystream')
stream.buffer()
=> [<Buffer: Buffer created=[2020-09-17 15:52:33.193000] processed=False data={'foo': 'bar'}>]
Note that the items in the buffer have not been processed yet, as indicated by processed=False. That is, a producer has written data to the buffer, but no consumer has seen the data yet.
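The role of the processed flag can be sketched as follows. The `Buffer` dataclass below only mirrors the fields visible in the output above (created, processed, data); the `consume` helper is hypothetical and simply shows the idea that once a consumer has handed items to a window, they are flagged as processed and are not delivered again.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, List


@dataclass
class Buffer:
    data: Any
    created: datetime = field(default_factory=datetime.now)
    processed: bool = False  # False until a consumer has seen the item


def consume(buffer: List[Buffer]) -> List[Any]:
    """Hypothetical consumer step: collect unprocessed items and flag them."""
    window = [b for b in buffer if not b.processed]
    for b in window:
        b.processed = True
    return [b.data for b in window]


buffer = [Buffer({'foo': 'bar'})]
print([b.processed for b in buffer])  # [False]: written, not yet consumed
print(consume(buffer))                # [{'foo': 'bar'}]
print([b.processed for b in buffer])  # [True]
print(consume(buffer))                # []: nothing new to deliver
```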
Consuming streaming data
omega-ml supports streaming in mini batches, called windows. Each window is a subset of the data in a stream's buffer. Windows are produced by an emitter strategy. Several emitter strategies are provided out of the box, and custom strategies can be created if needed.
CountWindow - an emitter that batches N buffered items, where N is a positive integer
FixedTimeWindow - an emitter that batches data in fixed, absolute intervals
RelaxedTimeWindow - an emitter that batches data in relative intervals
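As an illustration of the simplest strategy, a count-based emitter can be written as a generator that groups buffered items into windows of N. This is a hypothetical sketch of the idea behind CountWindow, not minibatch's implementation; in particular, it holds back an incomplete trailing window, on the assumption that a count emitter waits until N items are available.

```python
from typing import Any, Iterable, Iterator, List


def count_windows(items: Iterable[Any], n: int) -> Iterator[List[Any]]:
    """Yield consecutive windows of exactly n items (hypothetical sketch)."""
    if n < 1:
        raise ValueError('n must be a positive integer')
    window: List[Any] = []
    for item in items:
        window.append(item)
        if len(window) == n:
            yield window
            window = []
    # an incomplete trailing window is not emitted; it waits for more data


print(list(count_windows(range(7), 3)))
# [[0, 1, 2], [3, 4, 5]]
```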
Emitters are created by getting a stream in lazy mode, i.e. om.streams.get(..., lazy=True). This returns a streaming function that continuously produces windows according to the emitter strategy and forwards each window to a user-provided callable.
import omegaml as om
# note: apply() keeps processing windows continuously, so run one variant at a time
# using the CountWindow strategy, with one item per window
streaming = om.streams.get('mystream', lazy=True)
streaming.apply(lambda window: print(window))
# using the CountWindow strategy, with size=N items per window
streaming = om.streams.get('mystream', lazy=True, size=5)
streaming.apply(lambda window: print(window))
# using the FixedTimeWindow strategy, with absolute intervals of 2 seconds
streaming = om.streams.get('mystream', lazy=True, interval=2)
streaming.apply(lambda window: print(window))
# using the RelaxedTimeWindow strategy, with relative intervals of 2 seconds
streaming = om.streams.get('mystream', lazy=True, interval=2, relaxed=True)
streaming.apply(lambda window: print(window))
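The difference between absolute and relative intervals can be sketched with explicit timestamps. In this hypothetical model (not minibatch's code), fixed windows assign each item to the interval its own arrival time falls into, assuming windows aligned to multiples of the interval. Relaxed windows are emitted whenever the consumer wakes up and contain everything that arrived since the previous emission, so their boundaries depend on when the consumer actually runs.

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple

Item = Tuple[float, Any]  # (arrival time in seconds, payload)


def fixed_windows(items: List[Item], interval: float) -> Dict[float, List[Any]]:
    """Group items into absolute windows [k*interval, (k+1)*interval)."""
    windows: Dict[float, List[Any]] = defaultdict(list)
    for ts, payload in items:
        start = (ts // interval) * interval  # window start aligned to the clock
        windows[start].append(payload)
    return dict(windows)


def relaxed_windows(items: List[Item], wakeups: List[float]) -> List[List[Any]]:
    """At each wake-up time, emit everything that arrived since the last one."""
    windows: List[List[Any]] = []
    last = float('-inf')
    for now in wakeups:
        windows.append([p for ts, p in items if last < ts <= now])
        last = now
    return windows


items = [(0.5, 'a'), (1.9, 'b'), (2.1, 'c'), (3.7, 'd'), (4.2, 'e')]

print(fixed_windows(items, interval=2))
# {0.0: ['a', 'b'], 2.0: ['c', 'd'], 4.0: ['e']}

# the consumer wakes up roughly every 2 seconds, but not exactly on the clock
print(relaxed_windows(items, wakeups=[2.3, 4.4]))
# [['a', 'b', 'c'], ['d', 'e']]
```

The same items end up in different windows under the two strategies: fixed windows guarantee which interval an item belongs to, while relaxed windows only guarantee that nothing is lost between emissions.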
Writing streaming apps
Streaming apps require a continuously running consumer in order to be useful, or at least a consumer that runs as often as the emitter strategy requires.
With omega-ml we can create a streaming application as follows:
# consumer.py
import omegaml as om
streaming = om.streams.get('mystream', lazy=True)
@streaming
def consumer(window):
    print(window)
# producer.py
import omegaml as om
stream = om.streams.get('mystream')
data = {
'foo': 'bar'
}
stream.append(data)
Running the consumer in one shell and the producer three times in another gives the following output:
# shell 2, run this 3 times
$ python producer.py
# shell 1
$ python consumer.py
Window [2020-09-17 16:15:37.687509] [{'foo': 'bar'}]
Window [2020-09-17 16:15:40.974479] [{'foo': 'bar'}]
Window [2020-09-17 16:15:43.678020] [{'foo': 'bar'}]
Note that the producer and consumer do not need to run on the same machine. Since they are connected through the streaming log provided by omega-ml, they only need to use the same stream name in order to connect.
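The decorator in consumer.py and the streaming.apply(...) calls earlier in this document both hand a user function to the streaming object. The sketch below shows one way a single object can support both styles. `LazyStream` and its in-memory `windows` list are hypothetical and exist only so the example runs without an omega-ml server; they are not the omega-ml implementation.

```python
from typing import Any, Callable, List

Window = List[Any]


class LazyStream:
    """Hypothetical streaming object that accepts a callback in two styles."""

    def __init__(self, windows: List[Window]) -> None:
        # stand-in for the windows an emitter would read from the streaming log
        self.windows = windows

    def apply(self, fn: Callable[[Window], Any]) -> None:
        # forward each window to the user-provided callable
        for window in self.windows:
            fn(window)

    def __call__(self, fn: Callable[[Window], Any]) -> Callable[[Window], Any]:
        # decorator style: start processing with the decorated function,
        # then return it unchanged so it stays callable under its own name
        self.apply(fn)
        return fn


seen: List[Window] = []
streaming = LazyStream([[{'foo': 'bar'}] for _ in range(3)])


@streaming
def consumer(window: Window) -> None:
    seen.append(window)


print(len(seen))  # 3
```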
Deploying streaming apps
omega-ml core does not yet provide an integrated streaming runtime. However, there are several options:
omega-ml commercial edition provides the apphub component, which supports streaming applications out of the box
# deploy the myconsumer script package to apphub
$ om scripts put myconsumer apps/myconsumer
# access at http://hub.omegaml.io/apps/<user>/myconsumer
run inside a Jupyter notebook session
run as a scheduled job and use a non-blocking streaming function
streaming = om.streams.get('mystream', lazy=True, blocking=False)  # get only one window
streaming.apply(consumer)
any Python application can act as a consumer or producer, as long as it is connected to an omega-ml server
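For the scheduled-job option, the key property of a non-blocking consumer is that each run handles a pending window and then returns, leaving later windows to later scheduled runs. A minimal sketch of that behavior, with a hypothetical `run_once` function and an in-memory queue standing in for the stream; with omega-ml itself, the blocking=False call in the list above plays this role.

```python
from collections import deque
from typing import Any, Callable, Deque, List

Window = List[Any]


def run_once(pending: Deque[Window], fn: Callable[[Window], Any]) -> bool:
    """Process at most one pending window and return immediately.

    Returns True if a window was processed, False if there was nothing to do,
    so a scheduler can invoke this repeatedly without it ever blocking.
    """
    if not pending:
        return False
    fn(pending.popleft())
    return True


pending: Deque[Window] = deque([[{'foo': 'bar'}], [{'foo': 'baz'}]])
processed: List[Window] = []

# simulate three scheduled runs, e.g. triggered every minute by a scheduler
results = [run_once(pending, processed.append) for _ in range(3)]
print(results)    # [True, True, False]
print(processed)  # [[{'foo': 'bar'}], [{'foo': 'baz'}]]
```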
Note
Future releases will include a streaming worker built into the omega-ml native runtime. The syntax will be something like this:
# this will start the streaming consumer on the runtime
om.streams.put(streaming_function, 'myconsumer')