Using Online Machine Learning on a StreamPipes data stream¶
The last tutorial (Getting live data from the StreamPipes data stream) showed how to connect to a data stream. With that approach you could already do Online Machine Learning by training a model on the incoming events in the `onEvent` method. However, the StreamPipes client also provides an easier way to do this, using the River library for Online Machine Learning. We will have a look at this now.
# you can install all required dependencies for this tutorial by executing the following command
%pip install river streampipes

from streampipes.client import StreamPipesClient
from streampipes.client.config import StreamPipesClientConfig
from streampipes.client.credential_provider import StreamPipesApiKeyCredentials
import os
os.environ["SP_USERNAME"] = "admin@streampipes.apache.org"
os.environ["SP_API_KEY"] = "XXX"
# Use this if you work locally:
os.environ["BROKER-HOST"] = "localhost"
os.environ["KAFKA-PORT"] = "9094" # When using Kafka as message broker
client_config = StreamPipesClientConfig(
    credential_provider=StreamPipesApiKeyCredentials(),
    host_address="localhost",
    port=80,
    https_disabled=True,
)
client = StreamPipesClient(client_config=client_config)
2023-01-27 16:04:24,784 - streampipes.client.client - [INFO] - [client.py:128] [_set_up_logging] - Logging successfully initialized with logging level INFO.
client.dataStreamApi.all().to_pandas()
2023-01-27 16:04:28,212 - streampipes.endpoint.endpoint - [INFO] - [endpoint.py:163] [_make_request] - Successfully retrieved all resources.
| | element_id | name | description | icon_url | app_id | includes_assets | includes_locales | internally_managed | measurement_object | index | ... | dom | rev | num_transport_protocols | num_measurement_capability | num_application_links | num_included_assets | num_connected_to | num_category | num_event_properties | num_included_locales |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | sp:spdatastream:xboBFK | Test | | None | None | False | False | True | None | 0 | ... | None | 5-558c861debc745e1ebae29a266a8bdb9 | 1 | 0 | 0 | 0 | 0 | 0 | 7 | 0 |
| 1 | urn:streampipes.apache.org:eventstream:Wgyrse | Test File | | None | None | False | False | True | None | 0 | ... | None | 4-66548b6b84287011b7cec0876ef82baf | 1 | 0 | 0 | 0 | 0 | 0 | 2 | 0 |

2 rows × 22 columns
How to use Online Machine Learning with StreamPipes¶
After we have configured the client as usual, we can start with the new part.
The approach is straightforward, and you can get the ML part running in just three steps:

- Create a pipeline with River and insert the preprocessing steps and model of your choice.
- Configure the `OnlineML` wrapper to fit your model and insert the client and the required data stream ids.
- Start the wrapper and let the learning begin.
A StreamPipesFunction is then started, which trains the model with every new event. It also creates an output data stream that sends the predictions of the model back to StreamPipes. This output stream appears when creating a new pipeline and can be used like every other data source. So you can, for example, use it in a pipeline and save the predictions in a Data Lake.
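Once such a pipeline persists the predictions, you can also read them back with the client. Here is a minimal sketch, assuming you stored the output stream in a Data Lake measurement named "flowrate_predictions" (a placeholder name, use whatever you configured in your pipeline):

# Hypothetical measure name: adjust it to the name you chose in your pipeline.
predictions = client.dataLakeMeasureApi.get("flowrate_predictions").to_pandas()
predictions.head()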
You can also stop and resume the training with the `set_learning` method. To stop the whole function, use the `stop` method; if you want to delete the output stream entirely, you can go to the Pipeline Element Installer in StreamPipes and uninstall it.
Now let's take a look at some examples. If you want to execute the examples below, you have to create an adapter for the Machine Data Simulator, select the `flowrate` sensor, and insert the stream id of this stream (you can look it up with the client, as sketched below).
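A small sketch of how you could look the stream id up programmatically; the name filter is hypothetical and depends on what you named your adapter:

# List all data streams and pick the element_id of the flowrate stream.
# The name filter below is an assumption: adjust it to your adapter's name.
streams = client.dataStreamApi.all().to_pandas()
stream_id = streams.loc[streams["name"].str.contains("flowrate"), "element_id"].iloc[0]
print(stream_id)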
KMeans¶
from river import cluster, compose, preprocessing
from streampipes.function_zoo.river_function import OnlineML
from streampipes.functions.utils.data_stream_generator import RuntimeType
k_means = compose.Pipeline(
    ("drop_features", compose.Discard("sensorId", "timestamp")),
    ("scale", preprocessing.StandardScaler()),
    ("k_means", cluster.KMeans(n_clusters=2)),
)
clustering = OnlineML(
    client=client, stream_ids=["sp:spdatastream:xboBFK"], model=k_means, prediction_type=RuntimeType.INTEGER.value
)
clustering.start()
2023-01-27 16:04:35,599 - streampipes.endpoint.endpoint - [INFO] - [endpoint.py:163] [_make_request] - Successfully retrieved all resources.
2023-01-27 16:04:35,599 - streampipes.functions.function_handler - [INFO] - [function_handler.py:64] [initializeFunctions] - Create output data stream "sp:spdatastream:cwKPoo" for the function "65cf8b86-bcdf-433e-a1c7-3e920eab55d0"
2023-01-27 16:04:37,766 - streampipes.endpoint.endpoint - [INFO] - [endpoint.py:163] [_make_request] - Successfully retrieved all resources.
2023-01-27 16:04:37,767 - streampipes.functions.function_handler - [INFO] - [function_handler.py:78] [initializeFunctions] - Using NatsBroker for RiverFunction
2023-01-27 16:04:37,791 - streampipes.functions.broker.nats_broker - [INFO] - [nats_broker.py:48] [_makeConnection] - Connected to NATS at localhost:4222
2023-01-27 16:04:37,791 - streampipes.functions.broker.nats_broker - [INFO] - [nats_broker.py:48] [_makeConnection] - Connected to NATS at localhost:4222
2023-01-27 16:04:37,792 - streampipes.functions.broker.nats_broker - [INFO] - [nats_broker.py:58] [createSubscription] - Subscribed to stream: sp:spdatastream:xboBFK
clustering.set_learning(False)
clustering.stop()
2023-01-27 16:04:57,303 - streampipes.functions.broker.nats_broker - [INFO] - [nats_broker.py:82] [disconnect] - Stopped connection to stream: sp:spdatastream:xboBFK
2023-01-27 16:04:57,304 - streampipes.functions.broker.nats_broker - [INFO] - [nats_broker.py:82] [disconnect] - Stopped connection to stream: sp:spdatastream:cwKPoo
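After stopping, you can inspect what was learned. This is a small sketch that assumes the wrapper trains the pipeline object in place (the `save_model` hooks in the sections below rely on the same idea); the index `2` addresses the KMeans step of the pipeline:

# Assumes our k_means pipeline object was updated in place by the function.
# k_means[2] addresses the KMeans step (third step of the pipeline).
print(k_means[2].centers)  # cluster centers learned from the stream so far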
HoeffdingTreeRegressor¶
import pickle
from river import compose, tree
from streampipes.function_zoo.river_function import OnlineML
from streampipes.functions.utils.data_stream_generator import RuntimeType
hoeffding_tree = compose.Pipeline(
    ("drop_features", compose.Discard("sensorId", "timestamp")),
    ("hoeffding_tree", tree.HoeffdingTreeRegressor(grace_period=5)),
)
def draw_tree(self, event, streamId):
    """Draw the tree and save the image."""
    if self.learning:
        if self.model[1].n_nodes is not None:
            self.model[1].draw().render("hoeffding_tree", format="png", cleanup=True)
def save_model(self):
    """Save the trained model."""
    with open("hoeffding_tree.pkl", "wb") as f:
        pickle.dump(self.model, f)
regressor = OnlineML(
    client=client,
    stream_ids=["sp:spdatastream:xboBFK"],
    model=hoeffding_tree,
    prediction_type=RuntimeType.FLOAT.value,
    supervised=True,
    target_label="temperature",
    on_event=draw_tree,
    on_stop=save_model,
)
regressor.start()
regressor.set_learning(False)
regressor.stop()
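Since `save_model` pickles the trained pipeline on stop, you can reload it later for offline predictions. A minimal sketch; the event fields (`mass_flow`, `volume_flow`, `density`) and their values are assumptions based on the flowrate sensor:

# Reload the pipeline written by save_model and score one hand-crafted event.
# The feature values below are made up for illustration.
with open("hoeffding_tree.pkl", "rb") as f:
    model = pickle.load(f)
print(model.predict_one({"sensorId": "flowrate01", "timestamp": 0, "mass_flow": 3.2, "volume_flow": 5.1, "density": 45.0}))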
DecisionTreeClassifier¶
import pickle
from river import compose, tree
from streampipes.function_zoo.river_function import OnlineML
from streampipes.functions.utils.data_stream_generator import RuntimeType
decision_tree = compose.Pipeline(
    ("drop_features", compose.Discard("sensorId", "timestamp")),
    ("decision_tree", tree.ExtremelyFastDecisionTreeClassifier(grace_period=5)),
)
def draw_tree(self, event, streamId):
    """Draw the tree and save the image."""
    if self.learning:
        if self.model[1].n_nodes is not None:
            self.model[1].draw().render("decision_tree", format="png", cleanup=True)
def save_model(self):
    """Save the trained model."""
    with open("decision_tree.pkl", "wb") as f:
        pickle.dump(self.model, f)
classifier = OnlineML(
    client=client,
    stream_ids=["sp:spdatastream:xboBFK"],
    model=decision_tree,
    prediction_type=RuntimeType.BOOLEAN.value,
    supervised=True,
    target_label="sensor_fault_flags",
    on_event=draw_tree,
    on_stop=save_model,
)
classifier.start()
classifier.set_learning(False)
classifier.stop()
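As with the regressor, the pickled classifier can be reloaded for offline use; `predict_proba_one` additionally gives the class probabilities. Again, the event fields and values are assumptions based on the flowrate sensor:

# Reload the classifier saved by save_model and inspect class probabilities
# for one made-up event.
with open("decision_tree.pkl", "rb") as f:
    clf = pickle.load(f)
print(clf.predict_proba_one({"sensorId": "flowrate01", "timestamp": 0, "mass_flow": 3.2, "volume_flow": 5.1, "density": 45.0, "temperature": 44.2}))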
How did you like this tutorial? We would love to receive some feedback from you. Just go to our GitHub discussions page and let us know your impression. We'll read and react to them all, we promise!