With the introduction of StreamPipes Functions, Apache StreamPipes provides a lightweight way to attach fixed runtime logic to existing streams — without creating a full pipeline element.
Functions are ideal when:
- A full processor would be overkill
- You want logic that runs with the extensions service lifecycle
- You need direct access to incoming events
Previously, functions could maintain in-memory state during runtime, but any restart of the extensions service resulted in a complete state reset.
We now provide a new functionality in developer preview that introduces typed state persistence for Functions, allowing state to survive service restarts.
This allows functions to survive restarts and resume exactly where they stopped.
Why Stateful Functions?
Many real-world use cases require minimal but persistent runtime state:
- Counting processed events
- Tracking the last processed timestamp
- Storing threshold configurations
- Keeping lightweight usage statistics
Previously, developers had to implement external persistence manually.
Now, state management is built directly into the Function lifecycle.
The New State Store API
The FunctionContext now provides access to a typed state store:
StateStore<T> getStateStore(Class<T> stateClass)
This API allows you to:
- Load persisted state during startup
- Persist state on shutdown
State handling is fully optional and backward compatible. If you don’t request a state store, your function behaves exactly as before.
Lifecycle Integration
Functions still follow the same lifecycle:
onServiceStarted(FunctionContext context)onEvent(Event event, String streamId)onServiceStopped()
The only difference is that you can now initialize a StateStore during startup.
Example: Threshold Monitoring Function
Let’s implement a simple function that:
- Counts processed events
- Tracks the last processed timestamp
- Logs a message when a threshold is reached
1. Define the State Object
State is defined as a typed POJO.
public class ProcessingState {
private int schemaVersion = 1;
private long processedEventCount = 0;
private long lastProcessedEpochMs = 0;
private long alertThreshold = 1000;
public ProcessingState() {
}
// getters/setters omitted for brevity
}
2. Implement the Stateful Function
public class ThresholdMonitoringFunction implements StreamPipesFunction {
private StateStore<ProcessingState> stateStore;
private ProcessingState state;
@Override
public String getFunctionId() {
return "threshold-monitoring-function";
}
@Override
public List<String> requiredStreamIds() {
return List.of("your-stream-resource-id");
}
@Override
public void onServiceStarted(FunctionContext context) {
// Request typed state store
this.stateStore = context.getStateStore(ProcessingState.class);
// Load existing state or use default
this.state = stateStore.load(new ProcessingState());
}
@Override
public void onEvent(Event event, String streamId) {
state.setProcessedEventCount(
state.getProcessedEventCount() + 1
);
state.setLastProcessedEpochMs(
System.currentTimeMillis()
);
if (state.getProcessedEventCount()
>= state.getAlertThreshold()) {
// Handle alert once when threshold is reached
}
}
@Override
public void onServiceStopped() {
// Persist state before shutdown
stateStore.persist(state);
}
}
State Persistence Lifecycle
The persistence mechanism follows a simple and predictable pattern:
Startup
state = stateStore.load(new ProcessingState());
- If persisted state exists → it is restored
- If not → the provided default object is used
Runtime
- State is modified in memory during
onEvent
Shutdown
stateStore.persist(state);
- The current state is stored before the service stops
After a restart, the function continues with the restored values.
No external database setup. No manual serialization logic. No additional infrastructure required.
Backward Compatibility
Stateful behavior is opt-in.
Existing functions:
- Continue to work unchanged
- Require no migration
- Remain fully stateless unless
getStateStore(...)is used
Conclusion & Feedback
Functions and state persistence are currently available in developer preview, and the API may evolve.
We are open to feedback from developers, if you try out stateful functions, let us know your thoughts and suggestions.
