
Data lake measure

Specific implementation of the StreamPipes API's data lake measure endpoints. This endpoint allows you to consume data stored in the StreamPipes data lake.

DataLakeMeasureEndpoint

Bases: APIEndpoint

Implementation of the DataLakeMeasure endpoint.

This endpoint provides an interface to all data stored in the StreamPipes data lake.

Consequently, it allows querying metadata about the available data sets (see the all() method). The metadata is returned as an instance of DataLakeMeasures.

In addition, the endpoint provides direct access to the data stored in the data lake by querying a specific data lake measure with the get() method.

Examples:

from streampipes.client import StreamPipesClient
from streampipes.client.config import StreamPipesClientConfig
from streampipes.client.credential_provider import StreamPipesApiKeyCredentials
client_config = StreamPipesClientConfig(
    credential_provider=StreamPipesApiKeyCredentials(username="test-user", api_key="api-key"),
    host_address="localhost",
    port=8082,
    https_disabled=True
)
client = StreamPipesClient.create(client_config=client_config)

# get all existing data lake measures from StreamPipes
data_lake_measures = client.dataLakeMeasureApi.all()

# let's take a look at how many we got
len(data_lake_measures)
5

# Retrieve a specific data lake measure as a pandas DataFrame
flow_rate_pd = client.dataLakeMeasureApi.get(identifier="flow-rate").to_pandas()
flow_rate_pd
                         time    density  mass_flow    sensorId  sensor_fault_flags  temperature  volume_flow
0    2023-02-24T16:19:41.472Z  50.872730   3.309556  flowrate02               False    44.448483     5.793138
1    2023-02-24T16:19:41.482Z  47.186588   5.608580  flowrate02               False    40.322033     0.058015
2    2023-02-24T16:19:41.493Z  46.735321   7.692881  flowrate02               False    49.239639    10.283526
3    2023-02-24T16:19:41.503Z  40.169796   3.632898  flowrate02               False    49.933754     6.893441
4    2023-02-24T16:19:41.513Z  49.635124   0.711260  flowrate02               False    50.106617     2.999871
..                        ...        ...        ...         ...                 ...          ...          ...
995  2023-02-24T16:19:52.927Z  50.057495   1.740114  flowrate02               False    46.558231     1.818237
996  2023-02-24T16:19:52.94Z   41.038895   7.211723  flowrate02               False    48.048622     2.127493
997  2023-02-24T16:19:52.952Z  45.837013   7.770180  flowrate02               False    48.188026     7.892062
998  2023-02-24T16:19:52.965Z  43.389065   4.458602  flowrate02               False    48.280899     5.733892
999  2023-02-24T16:19:52.977Z  44.056030   2.592060  flowrate02               False    47.505951     4.260697

As you can see, 1000 rows are returned by default. We can change this behavior by passing the limit parameter.

flow_rate_pd = client.dataLakeMeasureApi.get(identifier="flow-rate", limit=10).to_pandas()
len(flow_rate_pd)
10

If we are only interested in the values for density, the columns parameter allows us to select the columns to be returned:

flow_rate_pd = client.dataLakeMeasureApi.get(identifier="flow-rate", columns='density', limit=3).to_pandas()
flow_rate_pd
                       time    density
0  2023-02-24T16:19:41.472Z  50.872730
1  2023-02-24T16:19:41.482Z  47.186588
2  2023-02-24T16:19:41.493Z  46.735321

This is only a subset of the available query parameters; the full list is documented at MeasurementGetQueryConfig.
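The remaining query parameters are passed to get() in the same way as limit and columns. The following is a minimal sketch that restricts a query to a time window and returns the newest records first. The helper name fetch_window is hypothetical and not part of the library; it only assumes a client created as in the examples above and the parameter names start_date, end_date, order and limit listed at MeasurementGetQueryConfig.

```python
from datetime import datetime


def fetch_window(client, identifier, start, end, limit=1000):
    """Query one measure between `start` and `end`, newest records first.

    Hypothetical helper: `client` is assumed to be a StreamPipesClient
    created as in the examples above.
    """
    if start >= end:
        raise ValueError("start must lie before end")
    return client.dataLakeMeasureApi.get(
        identifier=identifier,
        start_date=start,  # lower bound of the time window
        end_date=end,      # upper bound of the time window
        order="DESC",      # newest records first
        limit=limit,
    )
```

A call could then look like fetch_window(client, "flow-rate", datetime(2023, 2, 24, 16, 0), datetime(2023, 2, 24, 17, 0), limit=10).to_pandas().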

get(identifier, **kwargs)

Queries the specified data lake measure from the API.

By default, the maximum number of returned records is 1000. This behaviour can be influenced by passing the parameter limit with a different value (see MeasurementGetQueryConfig).

PARAMETER DESCRIPTION
identifier

The identifier of the data lake measure to be queried.

TYPE: str

**kwargs

Keyword arguments can be used to provide additional query parameters. The available query parameters are defined by MeasurementGetQueryConfig.

TYPE: Optional[Dict[str, Any]] DEFAULT: {}

RETURNS DESCRIPTION
measurement

the specified data lake measure

TYPE: DataLakeMeasures

Examples:

See the examples at DataLakeMeasureEndpoint.

MeasurementGetQueryConfig

Bases: BaseModel

Config class describing the parameters of the get() method for measurements.

This config class is used to validate the provided query parameters for the GET endpoint of measurements. Additionally, it takes care of the conversion to a proper HTTP query string. In doing so, parameter names are mapped to the naming used by the StreamPipes API, for which Pydantic aliases are used.

ATTRIBUTE DESCRIPTION
columns

A comma-separated list of column names (e.g., time,value).
If provided, the returned data consists only of the given columns.
Note that the time column is always included as the index.

TYPE: Optional[str]

end_date

Restricts queried data to records timestamped before the specified time (the upper bound of the queried interval).

TYPE: Optional[datetime]

limit

Maximum number of records returned (default: 1000).
This needs to be at least 1.

TYPE: Optional[int]

offset

Offset to be applied to returned data
This needs to be at least 0

TYPE: Optional[int]

order

Ordering of query results
Allowed values: ASC and DESC (default: ASC)

TYPE: Optional[str]

page_no

Page number used for paging operation
This needs to be at least 1

TYPE: Optional[int]

start_date

Restricts queried data to records timestamped after the specified time (the lower bound of the queried interval).

TYPE: Optional[datetime]
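Because a single query returns at most limit records, larger measures have to be read in several requests. The following is a rough sketch of how limit and offset could be combined for this. The helper iter_chunks is hypothetical, and the sketch assumes that offset counts the number of records skipped from the start of the result; this assumption is not confirmed by the description above and should be checked against the API.

```python
def iter_chunks(client, identifier, chunk_size=1000):
    """Yield a measure's records chunk by chunk (hypothetical helper).

    Assumes `offset` skips that many records of the ordered result.
    """
    offset = 0
    while True:
        frame = client.dataLakeMeasureApi.get(
            identifier=identifier,
            limit=chunk_size,
            offset=offset,
        ).to_pandas()
        if len(frame) == 0:
            break  # nothing left to read
        yield frame
        if len(frame) < chunk_size:
            break  # a short chunk means the end was reached
        offset += len(frame)
```

The chunks can be processed one at a time or, if they fit into memory, combined with pandas.concat().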

Config

Pydantic Config class

build_query_string()

Builds an HTTP query string for the config.

This method returns an HTTP query string for the invoking config. It has the structure ?param1=value1&param2=value2.... This query string is not an entire URL; it needs to be appended to an API path.

RETURNS DESCRIPTION
query_param_string

HTTP query params string (?param1=value1&param2=value2...)

TYPE: str
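To illustrate the idea, here is a simplified, standard-library-only sketch of such a conversion. It is not the library's implementation: the camelCase parameter names and the conversion of datetimes to epoch milliseconds are assumptions made for this example, and the function names are hypothetical.

```python
from datetime import datetime, timezone
from urllib.parse import urlencode


def snake_to_camel(name):
    """Convert e.g. 'start_date' to 'startDate' (assumed API naming)."""
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)


def build_query_string(params):
    """Build '?param1=value1&param2=value2...' from a dict of parameters."""
    pairs = []
    for name, value in params.items():
        if value is None:
            continue  # unset parameters are left out of the query string
        if isinstance(value, datetime):
            # Assumption: timestamps are sent as epoch milliseconds.
            value = int(value.timestamp() * 1000)
        pairs.append((snake_to_camel(name), value))
    return ("?" + urlencode(pairs)) if pairs else ""


print(build_query_string({"limit": 10, "order": "DESC", "end_date": None}))
# prints: ?limit=10&order=DESC
```

The resulting string is appended to the API path of the measure to be queried.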

StreamPipesQueryValidationError

Bases: Exception

A custom exception raised when the validation of query parameters fails.
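The constraints listed for MeasurementGetQueryConfig can be sketched without Pydantic as follows. This is only an illustration of the validation rules described above: the classes QueryValidationError and QueryParams are hypothetical stand-ins, not the library's classes.

```python
from dataclasses import dataclass
from typing import Optional


class QueryValidationError(Exception):
    """Stand-in for a custom query validation exception (hypothetical)."""


@dataclass
class QueryParams:
    """Subset of the query parameters with the documented constraints."""

    limit: Optional[int] = None
    offset: Optional[int] = None
    order: Optional[str] = None
    page_no: Optional[int] = None

    def __post_init__(self):
        # Each check mirrors one constraint from the attribute descriptions.
        if self.limit is not None and self.limit < 1:
            raise QueryValidationError("limit needs to be at least 1")
        if self.offset is not None and self.offset < 0:
            raise QueryValidationError("offset needs to be at least 0")
        if self.order is not None and self.order not in ("ASC", "DESC"):
            raise QueryValidationError("order needs to be ASC or DESC")
        if self.page_no is not None and self.page_no < 1:
            raise QueryValidationError("page_no needs to be at least 1")
```

With this sketch, QueryParams(limit=10, order="DESC") is accepted, whereas QueryParams(limit=0) raises QueryValidationError.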