Apache StreamPipes release 0.91.0


The Apache StreamPipes community is delighted to announce the release of Apache StreamPipes version 0.91.0! The biggest highlight of this release is the birth of our official StreamPipes Python library. Apart from that, we have made a lot of improvements under the hood, especially to the development experience (e.g., by introducing and enforcing code style rules). In total, the new version includes more than 90 improvements and bug fixes.

The current release can be downloaded here.

StreamPipes Python

Apache StreamPipes aims to enable non-technical users to connect and analyze IoT data streams. To this end, it provides an easy-to-use and convenient user interface that allows one to connect to an IoT data source and create some visual graphs within a few minutes.
Although this is the main use case of Apache StreamPipes, it can also provide great value for people who are eager to work on data analysis or data science with IoT data, but don't want to deal with all the hassle of extracting data from devices in a suitable format. In this scenario, StreamPipes connects to your data source and extracts the data for you. You can then make the data available outside StreamPipes by writing it to an external system, such as a database or Kafka. Since this requires another component, you can alternatively extract your data directly from StreamPipes programmatically using the StreamPipes API. For convenience, we also provide a StreamPipes client for both Java and Python. With StreamPipes Python specifically, we want to address the amazing data analytics and data science community in Python and benefit from the great universe of Python libraries out there.

Getting started

Besides our official version, which you can access here, we provide Apache StreamPipes as a convenient release via PyPI. You can install it with the following command:

pip install streampipes

All documentation related to StreamPipes Python can be found here.

Enough words, let's start with a few examples. As a first step, we want to establish a connection to a running StreamPipes instance.

from streampipes.client import StreamPipesClient
from streampipes.client.config import StreamPipesClientConfig
from streampipes.client.credential_provider import StreamPipesApiKeyCredentials

config = StreamPipesClientConfig(
    credential_provider=StreamPipesApiKeyCredentials(
        username="test@streampipes.apache.org",
        api_key="DEMO-KEY",
    ),
    host_address="localhost",
    https_disabled=True,
    port=80,
)

client = StreamPipesClient(client_config=config)

client.describe() gives you an overview of what can be discovered via the Python client:

Hi there!
You are connected to a StreamPipes instance running at http://localhost:80.
The following StreamPipes resources are available with this client:
1x DataLakeMeasures
1x DataStreams

The required API key can be retrieved from StreamPipes via the UI:

demonstration how to get an API key in form of a gif
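
Once you have an API key, you may prefer not to hard-code it in a script or notebook. The following is a minimal sketch of one way to do that, assuming the connection settings are supplied through environment variables. The variable names (SP_USERNAME, SP_API_KEY, SP_HOST, SP_PORT, SP_HTTPS_DISABLED) and the helper load_client_settings are made up for this illustration and are not part of StreamPipes Python; only the keys of the returned dictionary mirror the parameters used in the connection example above.

```python
import os
from typing import Mapping, Optional


def load_client_settings(env: Optional[Mapping[str, str]] = None) -> dict:
    """Collect connection settings from environment variables.

    The variable names are assumptions made for this sketch; the defaults
    match the values used in the connection example.
    """
    env = os.environ if env is None else env
    return {
        # Required: fail loudly with a KeyError if credentials are missing.
        "username": env["SP_USERNAME"],
        "api_key": env["SP_API_KEY"],
        # Optional: fall back to a local instance without HTTPS.
        "host_address": env.get("SP_HOST", "localhost"),
        "port": int(env.get("SP_PORT", "80")),
        "https_disabled": env.get("SP_HTTPS_DISABLED", "true").lower() == "true",
    }


if __name__ == "__main__":
    # Passing a plain dict instead of os.environ keeps the demo self-contained.
    settings = load_client_settings(
        {"SP_USERNAME": "test@streampipes.apache.org", "SP_API_KEY": "DEMO-KEY"}
    )
    print(settings)
```

The username and api_key entries can then be handed to StreamPipesApiKeyCredentials, and the remaining entries to StreamPipesClientConfig, exactly as in the connection example.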

Query Data from StreamPipes data lake

As a first step, we can see which data sets are available in the data lake:

client.dataLakeMeasureApi.all().to_pandas()

To query data for a specific data lake measure, the following command takes you there:

client.dataLakeMeasureApi.get(identifier="flow-rate").to_pandas()

There are some parameters that allow you to query data more specifically. Visit the docs for more detailed information.

If you are curious which features are available for StreamPipes Python, have a look at our tutorial section.

We plan to further extend the Python library in the next releases - we welcome your ideas on any features you'd like to see!

Create Adapter without starting

Until now, an adapter created in StreamPipes was started immediately after its creation was completed. From now on, the last dialog of the adapter creation lets you choose whether the adapter should be started immediately or later. In the latter case, you can start it from the adapter overview.

demonstration how to disable automatic start of an adapter in form of a gif

Redirect after Login

This is another small user interface improvement: when a user who is not logged in opens a StreamPipes link, e.g., http://streampipes.demo/#/connect, StreamPipes displays the login dialog and, after a successful login, now redirects the user directly to the target view (in this case the connection overview).
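
To illustrate the general idea behind such a redirect, here is a small sketch in Python. It is not how the StreamPipes UI implements the feature: the /login route, the target parameter, and both helper functions are hypothetical and only show how a requested view can be carried through a login step and validated afterwards.

```python
from urllib.parse import parse_qs, quote, urlsplit

# Host taken from the example link; the "/login" route and the
# "target" parameter are assumptions made for this sketch.
BASE_URL = "http://streampipes.demo/"


def login_url_for(target_view: str) -> str:
    """Build a login link that remembers the originally requested view."""
    return f"{BASE_URL}#/login?target={quote(target_view, safe='')}"


def redirect_after_login(login_url: str, default: str = "/") -> str:
    """Return the view to open once the login has succeeded."""
    fragment = urlsplit(login_url).fragment  # part after '#'
    _, _, query = fragment.partition("?")
    target = parse_qs(query).get("target", [default])[0]
    # Accept in-app paths only, so the parameter cannot point to another site.
    if target.startswith("/") and not target.startswith("//"):
        return target
    return default


if __name__ == "__main__":
    url = login_url_for("/connect")
    print(url)
    print(redirect_after_login(url))
```

Restricting the target to in-app paths is a deliberate choice in this sketch: a redirect parameter that accepts arbitrary URLs could be abused to send users to a different site after login.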

Support Output Streams for Standalone Functions

In the last release, 0.90.0, we introduced StreamPipes Functions as a lightweight alternative to processing elements.
StreamPipes Functions now support output streams, which means that they can not only consume data from StreamPipes but also send data to it.

Below you can see a simple example function that demonstrates how events can be pushed back to StreamPipes.

public class FunctionPublishExample extends StreamPipesFunction {

  private static final Logger LOG = LoggerFactory.getLogger(FunctionPublishExample.class);

  private static final String FUNCTION_ID = "org.apache.streampipes.example.function.publish";
  private static final String STREAM_APP_ID = "example-output-stream-1";

  private SpOutputCollector outputCollector;

  @Override
  public List<String> requiredStreamIds() {
    return List.of("urn:streampipes.apache.org:eventstream:EtMUkN");
  }

  @Override
  public void onServiceStarted(FunctionContext context) {
    LOG.info("Service started");
    this.outputCollector = context.getOutputCollectors().get(STREAM_APP_ID);
  }

  @Override
  public void onEvent(Event event, String streamId) {
    LOG.info("on event");
    var ev = new Event();
    ev.addField("timestamp", System.currentTimeMillis());
    ev.addField("example-property", "abc");
    this.outputCollector.collect(ev);
  }

  @Override
  public void onServiceStopped() {
    LOG.info("service stopped");
  }

  @Override
  public FunctionConfig getFunctionConfig() {
    return FunctionConfigBuilder
        .create(FunctionId.from(FUNCTION_ID, 1))
        .withOutputStream(DataStreamBuilder.create(STREAM_APP_ID, "My Function Stream", "")
            .property(EpProperties.timestampProperty("timestamp"))
            .property(EpProperties.stringEp(
                Labels.from("my-example-property", "test", "test"),
                "example-property",
                SO.TEXT,
                PropertyScope.MEASUREMENT_PROPERTY))
            .format(Formats.jsonFormat())
            .protocol(Protocols.kafka("localhost", 9094, STREAM_APP_ID))
            .build())
        .build();
  }
}

This functionality is also already supported in StreamPipes Python! 🚀

This means that you can write a StreamPipes function in Python that allows you to consume streams of data from StreamPipes, perform any data manipulation in Python, and send the data back to StreamPipes, where it can be further processed.
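
The consume, transform, and send-back pattern can be sketched in plain Python as follows. This sketch deliberately does not use the StreamPipes Python classes: ListCollector and PublishingFunction are hypothetical stand-ins written with the standard library only, mirroring the shape of the Java example above (an on_event callback and a collector with a collect method). Please refer to the StreamPipes Python documentation for the actual function API.

```python
import time
from typing import Any, Callable, Dict, List

Event = Dict[str, Any]


class ListCollector:
    """Hypothetical stand-in for an output collector: keeps events in memory."""

    def __init__(self) -> None:
        self.events: List[Event] = []

    def collect(self, event: Event) -> None:
        self.events.append(event)


class PublishingFunction:
    """Hypothetical function that emits one output event per consumed event."""

    def __init__(self, collector: ListCollector,
                 clock: Callable[[], float] = time.time) -> None:
        self.collector = collector
        self.clock = clock  # injectable so the output is reproducible in tests

    def on_event(self, event: Event, stream_id: str) -> None:
        # Any data manipulation on `event` would happen here; like the Java
        # example, this sketch simply emits a fixed property with a timestamp.
        out = {
            "timestamp": int(self.clock() * 1000),  # milliseconds since epoch
            "example-property": "abc",
            "source-stream": stream_id,
        }
        self.collector.collect(out)


if __name__ == "__main__":
    collector = ListCollector()
    function = PublishingFunction(collector)
    function.on_event({"value": 1}, "urn:streampipes.apache.org:eventstream:EtMUkN")
    print(collector.events)
```

In a real function, the collector would publish the event to the output stream instead of storing it in a list.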


To get a comprehensive overview of all changes released within Apache StreamPipes 0.91.0 please visit our [release notes](https://github.com/apache/incubator-streampipes/blob/release/0.91.0/RELEASE_NOTES.md#0910).