Data lake measure
Specific implementation of the StreamPipes API's data lake measure endpoints. This endpoint allows consuming data stored in StreamPipes' data lake.
DataLakeMeasureEndpoint(parent_client)
Bases: APIEndpoint
Implementation of the DataLakeMeasure endpoint.
This endpoint provides an interface to all data stored in the StreamPipes data lake.
Consequently, it allows querying metadata about available data sets (see the `all()` method).
The metadata is returned as an instance of `DataLakeMeasures`.
In addition, the endpoint provides direct access to the data stored in the data lake by querying a specific data lake measure using the `get()` method.
Examples:
```python
>>> from streampipes.client import StreamPipesClient
>>> from streampipes.client.config import StreamPipesClientConfig
>>> from streampipes.client.credential_provider import StreamPipesApiKeyCredentials

>>> client_config = StreamPipesClientConfig(
...     credential_provider=StreamPipesApiKeyCredentials(username="test-user", api_key="api-key"),
...     host_address="localhost",
...     port=8082,
...     https_disabled=True
... )

>>> client = StreamPipesClient.create(client_config=client_config)

# get all existing data lake measures from StreamPipes
>>> data_lake_measures = client.dataLakeMeasureApi.all()

# let's take a look at how many we got
>>> len(data_lake_measures)
5

# Retrieve a specific data lake measure as a pandas DataFrame
>>> flow_rate_pd = client.dataLakeMeasureApi.get(identifier="flow-rate").to_pandas()
>>> flow_rate_pd
                         time    density  mass_flow    sensorId  sensor_fault_flags  temperature  volume_flow
0    2023-02-24T16:19:41.472Z  50.872730   3.309556  flowrate02               False    44.448483     5.793138
1    2023-02-24T16:19:41.482Z  47.186588   5.608580  flowrate02               False    40.322033     0.058015
2    2023-02-24T16:19:41.493Z  46.735321   7.692881  flowrate02               False    49.239639    10.283526
3    2023-02-24T16:19:41.503Z  40.169796   3.632898  flowrate02               False    49.933754     6.893441
4    2023-02-24T16:19:41.513Z  49.635124   0.711260  flowrate02               False    50.106617     2.999871
..                        ...        ...        ...         ...                 ...          ...          ...
995  2023-02-24T16:19:52.927Z  50.057495   1.740114  flowrate02               False    46.558231     1.818237
996   2023-02-24T16:19:52.94Z  41.038895   7.211723  flowrate02               False    48.048622     2.127493
997  2023-02-24T16:19:52.952Z  45.837013   7.770180  flowrate02               False    48.188026     7.892062
998  2023-02-24T16:19:52.965Z  43.389065   4.458602  flowrate02               False    48.280899     5.733892
999  2023-02-24T16:19:52.977Z  44.056030   2.592060  flowrate02               False    47.505951     4.260697
```
As you can see, the number of rows returned by default is 1000.
We can modify this behavior by passing the `limit` parameter.
```python
>>> flow_rate_pd = client.dataLakeMeasureApi.get(identifier="flow-rate", limit=10).to_pandas()
>>> len(flow_rate_pd)
10
```
If we are only interested in the values for `density`, the `columns` parameter allows us to select the columns to be returned:
```python
>>> flow_rate_pd = client.dataLakeMeasureApi.get(identifier="flow-rate", columns='density', limit=3).to_pandas()
>>> flow_rate_pd
                       time    density
0  2023-02-24T16:19:41.472Z  50.872730
1  2023-02-24T16:19:41.482Z  47.186588
2  2023-02-24T16:19:41.493Z  46.735321
```
This is only a subset of the available query parameters; the full list is documented at MeasurementGetQueryConfig.
get(identifier, **kwargs)
Queries the specified data lake measure from the API.
By default, the maximum number of returned records is 1000.
This behaviour can be influenced by passing the parameter `limit` with a different value (see MeasurementGetQueryConfig).
PARAMETER | DESCRIPTION |
---|---|
identifier | The identifier of the data lake measure to be queried. |
**kwargs | Keyword arguments can be used to provide additional query parameters. The available query parameters are defined by MeasurementGetQueryConfig. |
RETURNS | DESCRIPTION |
---|---|
measurement | The specified data lake measure. |
Examples:
See the examples directly at DataLakeMeasureEndpoint.
MeasurementGetQueryConfig
Bases: BaseModel
Config class describing the parameters of the `get()` method for measurements.
This config class is used to validate the provided query parameters for the GET endpoint of measurements. Additionally, it takes care of the conversion to a proper HTTP query string. In the process, parameter names are adapted to the naming of the StreamPipes API, for which Pydantic aliases are used.
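To illustrate the idea of validating query parameters and renaming them to API parameter names, here is a minimal stdlib-only sketch. It is not the library's implementation: it uses a dataclass instead of Pydantic, and the class `QueryConfigSketch`, the alias values in `ALIASES` (such as `startDate` or `page`) and the field types are hypothetical assumptions. Only the field names and the default limit of 1000 come from this page.

```python
from dataclasses import dataclass, fields
from typing import Any, Dict, Optional

# Hypothetical mapping from attribute names to API parameter names (assumed values).
ALIASES = {"end_date": "endDate", "start_date": "startDate", "page_no": "page"}


@dataclass
class QueryConfigSketch:
    # Field names follow the attribute table below; the types are assumptions.
    columns: Optional[str] = None
    end_date: Optional[str] = None
    limit: int = 1000  # documented default number of returned records
    offset: Optional[int] = None
    order: Optional[str] = None
    page_no: Optional[int] = None
    start_date: Optional[str] = None

    def __post_init__(self) -> None:
        # Reject an invalid limit as soon as the config is created.
        if isinstance(self.limit, bool) or not isinstance(self.limit, int) or self.limit <= 0:
            raise ValueError("limit must be a positive integer")

    def to_api_params(self) -> Dict[str, Any]:
        # Drop unset values and rename attributes to their (assumed) API aliases.
        return {
            ALIASES.get(f.name, f.name): getattr(self, f.name)
            for f in fields(self)
            if getattr(self, f.name) is not None
        }
```

For example, `QueryConfigSketch(columns="density", limit=3).to_api_params()` yields `{"columns": "density", "limit": 3}`, while `QueryConfigSketch(limit=0)` raises a `ValueError`.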
ATTRIBUTE | DESCRIPTION |
---|---|
columns | A comma-separated list of column names to be returned. |
end_date | Restricts queried data to records up to the specified time (upper bound of the queried time range). |
limit | Maximum number of records returned (default: 1000). |
offset | Offset to be applied to returned data. |
order | Ordering of query results. |
page_no | Page number used for paging operation. |
start_date | Restricts queried data to records from the specified time onward (lower bound of the queried time range). |
Config
Pydantic Config class
build_query_string()
Builds an HTTP query string for the config.
This method returns an HTTP query string for the invoking config.
It follows the structure `?param1=value1&param2=value2...`.
This query string is not an entire URL; instead, it needs to be appended to an API path.
RETURNS | DESCRIPTION |
---|---|
query_param_string | HTTP query params string (`?param1=value1&param2=value2...`) |
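A minimal sketch of the query-string step using the standard library's `urllib.parse.urlencode`, assuming the parameters have already been validated and renamed to their API names. The function `build_query_string_sketch` is hypothetical and not part of the StreamPipes client.

```python
from typing import Any, Dict
from urllib.parse import urlencode


def build_query_string_sketch(params: Dict[str, Any]) -> str:
    """Hypothetical helper: turn API parameters into '?param1=value1&param2=value2...'."""
    # Skip parameters that were not set.
    present = {key: value for key, value in params.items() if value is not None}
    if not present:
        return ""
    # urlencode joins the pairs with '&' and percent-encodes special characters.
    return "?" + urlencode(present)
```

For instance, `build_query_string_sketch({"columns": "density", "limit": 3})` returns `"?columns=density&limit=3"`. Note that `urlencode` percent-encodes a comma as `%2C`; whether the StreamPipes API expects raw commas in `columns` is not covered here.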
StreamPipesQueryValidationError
Bases: Exception
A custom exception to be raised when the validation of query parameters causes an error.
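A hypothetical sketch of where such an exception could be raised: a helper that checks keyword arguments against the attribute names of MeasurementGetQueryConfig. The helper `validate_query_kwargs`, the local redefinition of the exception, and the rejection of unknown keys are assumptions for illustration, not documented library behaviour.

```python
from typing import Any, Dict


class StreamPipesQueryValidationError(Exception):
    """Local stand-in for the exception documented above."""


# Accepted keys, taken from the MeasurementGetQueryConfig attribute table.
ALLOWED_KEYS = {"columns", "end_date", "limit", "offset", "order", "page_no", "start_date"}


def validate_query_kwargs(**kwargs: Any) -> Dict[str, Any]:
    """Hypothetical helper: validate query kwargs or raise StreamPipesQueryValidationError."""
    unknown = sorted(set(kwargs) - ALLOWED_KEYS)
    if unknown:
        raise StreamPipesQueryValidationError(f"unknown query parameter(s): {', '.join(unknown)}")
    # Fall back to the documented default of 1000 records.
    limit = kwargs.get("limit", 1000)
    if isinstance(limit, bool) or not isinstance(limit, int) or limit <= 0:
        raise StreamPipesQueryValidationError("limit must be a positive integer")
    return {**kwargs, "limit": limit}
```

With this sketch, a typo such as `validate_query_kwargs(colums="density")` raises `StreamPipesQueryValidationError` instead of silently being ignored.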