Skip to content

River function

OnlineML(client, stream_ids, model, prediction_type=RuntimeType.STRING.value, supervised=False, target_label=None, on_start=lambda self, context: None, on_event=lambda self, event, streamId: None, on_stop=lambda self: None)

Wrapper class to enable an easy usage for Online Machine Learning models of the River library.

It creates a StreamPipesFunction to train a model with the incoming events of a data stream and creates an output data stream that publishes the prediction to StreamPipes.

PARAMETER DESCRIPTION
client

The client for the StreamPipes API.

TYPE: StreamPipesClient

stream_ids

The ids of the data stream to train the model.

TYPE: List[str]

model

The model to train. It meant to be a River model/pipeline, but can be every model with a 'learn_one' and 'predict_one' methode.

TYPE: Any

prediction_type

The data type of the prediction. Is only needed when you continue to work with the prediction in StreamPipes.

TYPE: str DEFAULT: RuntimeType.STRING.value

supervised

Define if the model is supervised or unsupervised.

TYPE: bool DEFAULT: False

target_label

Define the name of the target attribute if the model is supervised.

TYPE: Optional[str] DEFAULT: None

on_start

A function to be called when this StreamPipesFunction gets started.

TYPE: Callable[[Any, FunctionContext], None] DEFAULT: lambda self, context: None

on_event

A function to be called when this StreamPipesFunction receives an event.

TYPE: Callable[[Any, Dict[str, Any], str], None] DEFAULT: lambda self, event, streamId: None

on_stop

A function to be called when this StreamPipesFunction gets stopped.

TYPE: Callable[[Any], None] DEFAULT: lambda self: None

set_learning(learning)

Start or stop the training of the model.

PARAMETER DESCRIPTION
learning

Defines if the training should be continued

TYPE: bool

start()

Registers the function and starts the training.

stop()

Stops the function and ends the training forever.

RiverFunction(function_definition, stream_ids, model, supervised, target_label, on_start, on_event, on_stop)

Bases: StreamPipesFunction

Implementation of a StreamPipesFunction to enable an easy usage for Online Machine Learning models of the River library.

The function trains the model with the incoming events and publishes the prediction to an output data stream.

PARAMETER DESCRIPTION
function_definition

The function definition which contains the output stream.

TYPE: FunctionDefinition

stream_ids

The ids of the data stream to train the model.

TYPE: List[str]

model

The model to train. It meant to be a River model/pipeline, but can be every model with a 'learn_one' and 'predict_one' method.

TYPE: Any

supervised

Define if the model is supervised or unsupervised.

TYPE: bool

target_label

Define the name of the target attribute if the model is supervised.

TYPE: Optional[str]

on_start

A function to be called when this StreamPipesFunction gets started.

TYPE: Callable[[Any, FunctionContext], None]

on_event

A function to be called when this StreamPipesFunction receives an event.

TYPE: Callable[[Any, Dict[str, Any], str], None]

on_stop

A function to be called when this StreamPipesFunction gets stopped.

TYPE: Callable[[Any], None]

onEvent(event, streamId)

Trains the model with the incoming events and sends the prediction back to StreamPipes.

PARAMETER DESCRIPTION
event

The incoming event that serves as input for the function

TYPE: Dict[str, Any]

streamId

Identifier of the corresponding data stream

TYPE: str

RETURNS DESCRIPTION
None

onServiceStarted(context)

Executes the on_start method of the function.

PARAMETER DESCRIPTION
context

The functions' context

TYPE: FunctionContext

RETURNS DESCRIPTION
None

onServiceStopped()

Executes the on_stop function.

requiredStreamIds()

Returns the stream ids required by this function.

RETURNS DESCRIPTION
stream_ids

List of stream ids required by the function

TYPE: List[str]