Creating Output Streams with StreamPipes Functions
A StreamPipes Function lets you receive live data from, and send live data to, your StreamPipes instance. This tutorial provides sample code for a StreamPipes Function that defines an output stream, so you can get started right away by following the steps below.
Connecting to your StreamPipes instance
To receive and send live data, you first have to connect to your instance by creating a StreamPipes client in the usual way. In this case we connect to a local instance using Kafka. If you haven't done so already, you need to open the Kafka port manually by adding the following port mapping to the kafka node in the docker compose file:
ports:
  - "9094:9094"
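Before continuing, it can help to confirm that the mapped port is actually reachable from your machine. The following is a minimal sketch that uses only the Python standard library. The helper name is_port_open is hypothetical and not part of the StreamPipes client.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be established."""
    try:
        # create_connection performs the TCP handshake and raises OSError on failure
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

With the port mapping above in place and the broker running, a call such as is_port_open("localhost", 9094) would be expected to return True. Note that this only checks TCP reachability, not that Kafka itself is configured correctly.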
import os
os.environ["BROKER-HOST"] = "localhost"
os.environ["KAFKA-PORT"] = "9094"
api_key = os.environ.get("TOKEN")
from streampipes.client import StreamPipesClient
from streampipes.client.config import StreamPipesClientConfig
from streampipes.client.credential_provider import StreamPipesApiKeyCredentials
client_config = StreamPipesClientConfig(
    credential_provider=StreamPipesApiKeyCredentials(username="admin@streampipes.apache.org", api_key=api_key),
    host_address="localhost",
    port=80,
    https_disabled=True,
)
client = StreamPipesClient(client_config=client_config)
2024-07-30 12:44:07,912 - streampipes.client.client - [INFO] - [client.py:199] [_set_up_logging] - Logging successfully initialized with logging level INFO.
2024-07-30 12:44:07,995 - streampipes.endpoint.endpoint - [INFO] - [endpoint.py:164] [_make_request] - Successfully retrieved all resources.
2024-07-30 12:44:07,995 - streampipes.client.client - [INFO] - [client.py:172] [_get_server_version] - The StreamPipes version was successfully retrieved from the backend: 0.95.1. By means of that, authentication via the provided credentials is also tested successfully.
Creating an output stream
You can create an output stream by calling the create_data_stream function inside the function's __init__ method. You need to specify the name under which the stream is displayed in StreamPipes, along with its stream_id, attributes, and the broker.
def __init__(self, input_stream: DataStream):
    output_stream = create_data_stream(
        name="example-stream",
        stream_id="example-stream",
        attributes={
            "number": RuntimeType.INTEGER.value
        },
        broker=get_broker_description(input_stream)
    )
You can now add this output_stream to the function definition and pass the definition to the parent class.
function_definition = FunctionDefinition(
    consumed_streams=[input_stream.element_id]
).add_output_data_stream(output_stream)
super().__init__(function_definition=function_definition)
To create the output event, we define a dictionary whose key matches the key we defined for the attributes in the create_data_stream call. For simplicity, the value is just 1. Lastly, we call the add_output method and use the function_definition specified above to get the stream_id. All of this happens inside the function's onEvent method, because this method is called for every incoming event.
output = {
    "number": 1
}
self.add_output(
    stream_id=self.function_definition.get_output_stream_ids()[0],
    event=output
)
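Because the event keys have to match the attributes declared for the output stream, a small check before publishing can catch mismatches early. The sketch below is plain Python and does not use the StreamPipes API. The helper find_key_mismatch and the declared_attributes dictionary are hypothetical names introduced for illustration.

```python
from typing import Any, Dict, Set, Tuple


def find_key_mismatch(
    event: Dict[str, Any], declared_attributes: Dict[str, str]
) -> Tuple[Set[str], Set[str]]:
    """Return (missing, unexpected) keys of an event relative to the declared attributes."""
    declared = set(declared_attributes)
    present = set(event)
    missing = declared - present      # declared for the stream but absent from the event
    unexpected = present - declared   # present in the event but never declared
    return missing, unexpected


# Mirrors the attribute declared in this tutorial; "integer" is a placeholder
# string standing in for RuntimeType.INTEGER.value (an assumption).
declared_attributes = {"number": "integer"}

missing, unexpected = find_key_mismatch({"number": 1}, declared_attributes)
assert not missing and not unexpected
```

Such a check could run inside onEvent right before add_output. It only compares key names and does not verify that the values have the declared types.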
Creating the StreamPipes Function
The code below shows an example function and also demonstrates how to include the output stream creation.
import pandas as pd
from typing import Dict, Any
from streampipes.functions.streampipes_function import StreamPipesFunction
from streampipes.functions.utils.data_stream_generator import create_data_stream, RuntimeType
from streampipes.functions.broker.broker_handler import get_broker_description
from streampipes.model.resource import FunctionDefinition, DataStream
from streampipes.functions.utils.function_context import FunctionContext
from streampipes.functions.function_handler import FunctionHandler
from streampipes.functions.registration import Registration
class SimpleFunction(StreamPipesFunction):
    def __init__(self, input_stream: DataStream):
        output_stream = create_data_stream(
            name="example-stream",
            stream_id="example-stream",
            attributes={
                "number": RuntimeType.INTEGER.value
            },
            broker=get_broker_description(input_stream)
        )
        function_definition = FunctionDefinition(
            consumed_streams=[input_stream.element_id]
        ).add_output_data_stream(output_stream)
        super().__init__(function_definition=function_definition)

    def onServiceStarted(self, context: FunctionContext):
        pass

    def onEvent(self, event: Dict[str, Any], streamId: str):
        output = {
            "number": 1
        }
        self.add_output(
            stream_id=self.function_definition.get_output_stream_ids()[0],
            event=output
        )

    def onServiceStopped(self):
        pass
Instantiating and starting the function
To instantiate the function, we pass the required input stream. We then register the function using an instance of the Registration class, after which we can use the function handler to initialize all registered functions using the client. In this case, our input stream is named 'demo'. You will need to replace this name with the name of the input stream you wish to use.
stream = [stream for stream in client.dataStreamApi.all() if stream.name == "demo"][0]
simple_function = SimpleFunction(input_stream=stream)
registration = Registration()
registration.register(simple_function)
function_handler = FunctionHandler(registration, client)
function_handler.initializeFunctions()
2024-07-30 12:44:25,052 - streampipes.endpoint.endpoint - [INFO] - [endpoint.py:164] [_make_request] - Successfully retrieved all resources.
2024-07-30 12:44:25,150 - streampipes.endpoint.endpoint - [INFO] - [endpoint.py:164] [_make_request] - Successfully retrieved all resources.
2024-07-30 12:44:25,150 - streampipes.functions.function_handler - [INFO] - [function_handler.py:84] [initializeFunctions] - Using output data stream 'example-stream' for function '0e4287a7-6936-4fcb-9de4-8d29a7b6c1c6'
2024-07-30 12:44:25,229 - streampipes.endpoint.endpoint - [INFO] - [endpoint.py:164] [_make_request] - Successfully retrieved all resources.
2024-07-30 12:44:25,230 - streampipes.functions.function_handler - [INFO] - [function_handler.py:100] [initializeFunctions] - Using KafkaConsumer for SimpleFunction
2024-07-30 12:44:25,247 - streampipes.functions.broker.kafka.kafka_publisher - [INFO] - [kafka_publisher.py:49] [_make_connection] - Connecting to Kafka at localhost:9094
2024-07-30 12:44:25,250 - streampipes.functions.broker.kafka.kafka_consumer - [INFO] - [kafka_consumer.py:52] [_make_connection] - Connecting to Kafka at localhost:9094
2024-07-30 12:44:25,251 - streampipes.functions.broker.kafka.kafka_consumer - [INFO] - [kafka_consumer.py:62] [_create_subscription] - Subscribing to stream: sp:spdatastream:PIgznU
2024-07-30 12:46:12,229 - streampipes.functions.broker.kafka.kafka_publisher - [INFO] - [kafka_publisher.py:73] [disconnect] - Stopped connection to stream: example-stream
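The list comprehension with [0] used for the stream lookup raises an IndexError when no stream has the requested name. A more defensive lookup could look like the sketch below. find_stream_by_name is a hypothetical helper, and it only assumes that each stream object exposes a name attribute, as in the lookup above.

```python
from typing import Any, Iterable


def find_stream_by_name(streams: Iterable[Any], name: str) -> Any:
    """Return the first stream whose name matches, or raise a descriptive error."""
    streams = list(streams)  # materialize so the names can be listed in the error
    for stream in streams:
        if stream.name == name:
            return stream
    available = sorted(s.name for s in streams)
    raise LookupError(f"No data stream named {name!r}; available streams: {available}")
```

With the client from this tutorial, the helper would be called as find_stream_by_name(client.dataStreamApi.all(), "demo"), and a misspelled stream name would then produce an error listing the names that do exist.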
If we now go into StreamPipes, create a new pipeline, and view the info for our output stream, we can see the live events coming in.
To stop the function, we call the function handler's disconnect method:
function_handler.disconnect()
2024-07-30 12:46:12,171 - streampipes.functions.broker.kafka.kafka_consumer - [INFO] - [kafka_consumer.py:72] [disconnect] - Stopped connection to stream: sp:spdatastream:PIgznU
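When the function runs as part of a script, the disconnect call is easily skipped if an error occurs between start and stop. One option is to wrap both in a context manager. This is a sketch, not part of the StreamPipes client: running_functions is a hypothetical helper that relies only on the initializeFunctions and disconnect methods used in this tutorial.

```python
from contextlib import contextmanager
from typing import Any, Iterator


@contextmanager
def running_functions(handler: Any) -> Iterator[Any]:
    """Start all registered functions and always disconnect on exit."""
    handler.initializeFunctions()
    try:
        yield handler
    finally:
        # Runs on normal exit and when the body of the with block raises
        handler.disconnect()
```

It would be used as with running_functions(FunctionHandler(registration, client)): followed by whatever the script does while events are processed. The connection is then closed even if that body raises an exception.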
This tutorial has shown how to create output streams with a StreamPipes Function.
We hope you found this tutorial helpful and would appreciate your feedback. Please visit our GitHub discussion page to share your impressions. We promise to read and respond to every comment!