API Documentation
Event Processor
- class src.event_processor.event_processor.EventProcessor(invocation_strategy: InvocationStrategies = InvocationStrategies.FIRST_MATCH, error_handling_strategy: ErrorHandlingStrategies = ErrorHandlingStrategies.BUBBLE)
A self-contained event processor.
- add_subprocessor(subprocessor: EventProcessor)
Add a subprocessor to this event processor.
- Parameters:
subprocessor – The other event processor to add
- add_subprocessors(*subprocessors: EventProcessor)
Add multiple subprocessors at once.
- Parameters:
subprocessors – The tuple of subprocessors
- add_subprocessors_in_package(package: module)
Add all the processors found in all modules of a package as subprocessors.
Note that you should specify an actual package, and not just the name of a package.
- Parameters:
package – The package that should be searched for event processors
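The difference between a package object and a package name can be illustrated with the standard library alone. The sketch below is not the library's implementation: it only shows how modules can be enumerated from an imported package object. The standard `json` package stands in for a package of processors, and the helper name `module_names_in_package` is hypothetical.

```python
import importlib
import json  # an imported package object, standing in for a package of processors
import pkgutil
from types import ModuleType


def module_names_in_package(package: ModuleType) -> list[str]:
    """Return the names of the modules directly inside a package (hypothetical helper)."""
    # Only package objects have __path__; a plain string such as "json" does not.
    return sorted(info.name for info in pkgutil.iter_modules(package.__path__))


names = module_names_in_package(json)

# If only the name is available, import it first to obtain the package object.
same_names = module_names_in_package(importlib.import_module("json"))
```

Passing the string `"json"` directly to the helper would fail with an `AttributeError`, which is the kind of mistake the note above warns about.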
Invocation Strategies
Contains the different invocation strategies for calling processors.
- class src.event_processor.invocation_strategies.AllMatches(error_handling_strategy: ~src.event_processor.error_handling_strategies.ErrorHandlingStrategy = <src.event_processor.error_handling_strategies.Bubble object>)
Strategy calling all matching processors.
- class src.event_processor.invocation_strategies.FirstMatch(error_handling_strategy: ~src.event_processor.error_handling_strategies.ErrorHandlingStrategy = <src.event_processor.error_handling_strategies.Bubble object>)
Strategy calling the first matching processor.
- class src.event_processor.invocation_strategies.InvocationStrategies(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)
Enumeration of available invocation strategies.
- class src.event_processor.invocation_strategies.InvocationStrategy(error_handling_strategy: ~src.event_processor.error_handling_strategies.ErrorHandlingStrategy = <src.event_processor.error_handling_strategies.Bubble object>)
Class defining an abstract invocation strategy.
- class src.event_processor.invocation_strategies.NoMatches(error_handling_strategy: ~src.event_processor.error_handling_strategies.ErrorHandlingStrategy = <src.event_processor.error_handling_strategies.Bubble object>)
Strategy not calling any matching processors.
- class src.event_processor.invocation_strategies.NoMatchesStrict(error_handling_strategy: ~src.event_processor.error_handling_strategies.ErrorHandlingStrategy = <src.event_processor.error_handling_strategies.Bubble object>)
Strategy failing when there are multiple matching processors.
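To make the differences between the strategies concrete, here is a minimal standalone sketch. It does not use the library: processors are modeled as hypothetical (predicate, handler) pairs, the function names are illustrative, and the behavior of the strict variant when exactly one processor matches is an assumption.

```python
from typing import Any, Callable

# Each entry pairs a predicate with a handler; both are hypothetical stand-ins for processors.
Processor = tuple[Callable[[dict], bool], Callable[[dict], Any]]


def first_match(processors: list[Processor], event: dict) -> list[Any]:
    """Call only the first processor whose predicate accepts the event."""
    for predicate, handler in processors:
        if predicate(event):
            return [handler(event)]
    return []


def all_matches(processors: list[Processor], event: dict) -> list[Any]:
    """Call every processor whose predicate accepts the event."""
    return [handler(event) for predicate, handler in processors if predicate(event)]


def no_matches_strict(processors: list[Processor], event: dict) -> list[Any]:
    """Fail when several processors match; otherwise call the single match, if any (assumption)."""
    matching = [handler for predicate, handler in processors if predicate(event)]
    if len(matching) > 1:
        raise RuntimeError("multiple processors match this event")
    return [handler(event) for handler in matching]


processors = [
    (lambda e: "user" in e, lambda e: "user handler"),
    (lambda e: e.get("kind") == "signup", lambda e: "signup handler"),
]
event = {"user": "ada", "kind": "signup"}

first = first_match(processors, event)
everything = all_matches(processors, event)
```

With this event both predicates match, so `first_match` calls one handler, `all_matches` calls both, and `no_matches_strict` raises.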
Results
- class src.event_processor.result.Result(processor_name: str, returned_value: Any | None = None, raised_exception: Exception | None = None)
A result is what gets returned after an invocation.
It contains information about the processor as well as its return value.
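A result with these three fields can be pictured as a small data class. The sketch below is a standalone illustration, not the library's class: `ResultSketch` and `invoke` are hypothetical names, and capturing the exception instead of re-raising it is an assumption that would depend on the error handling strategy in use.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class ResultSketch:
    """Illustrative stand-in carrying the same three fields as the signature above."""

    processor_name: str
    returned_value: Optional[Any] = None
    raised_exception: Optional[Exception] = None


def invoke(name: str, processor: Callable[[dict], Any], event: dict) -> ResultSketch:
    """Run a processor and record either its return value or the exception it raised."""
    try:
        return ResultSketch(processor_name=name, returned_value=processor(event))
    except Exception as error:  # assumption: errors are captured rather than bubbled
        return ResultSketch(processor_name=name, raised_exception=error)


def double(event: dict) -> int:
    return event["n"] * 2


succeeded = invoke("double", double, {"n": 21})
failed = invoke("double", double, {})
```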
Exceptions
Exceptions for the event processor.
- exception src.event_processor.exceptions.DependencyError
Exception for failures while resolving dependencies.
- exception src.event_processor.exceptions.EventProcessorError
General exception for the event-processor library.
- exception src.event_processor.exceptions.FilterError
Exception for failures related to filters.
- exception src.event_processor.exceptions.InvocationError
Exception for failures in invocation.
- exception src.event_processor.exceptions.NoValueError
Exception for when a value is not present in a given context.
Filtering
Contains many different filters to conveniently filter through events.
- class src.event_processor.filters.Accept
Accept any event (good for default processors).
- matches(_event: dict) → bool
Test whether a given event matches an input event.
- Parameters:
event – The event to test
- Returns:
True if the event matches, False otherwise
- class src.event_processor.filters.And(*args: Filter)
Accept events that get accepted by all specified filters.
- matches(event: dict) → bool
Test whether a given event matches an input event.
- Parameters:
event – The event to test
- Returns:
True if the event matches, False otherwise
- class src.event_processor.filters.Dyn(resolver: Callable, inject_as: str | None = None)
Accept events based on a dynamic condition which is resolved by a callable.
This filter also allows dynamically changing the event by adding the returned value of the resolver to the event under a user-supplied key. This will overwrite a previously existing key.
Note that the equality check with this filter is a bit special. The filter will only be equal if the resolver is the same (two functions with the same code are not equal, they need to be the same object in memory).
This means that when using this filter, you should be careful to either re-use the same function as a resolver or make sure the functions have different behaviors. Otherwise, be sure to use processor ranking and invocation strategies.
- matches(event: dict) → bool
Test whether a given event matches an input event.
- Parameters:
event – The event to test
- Returns:
True if the event matches, False otherwise
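The injection behavior described for this filter can be sketched without the library. In the standalone example below, `DynSketch` is a hypothetical stand-in, and two details are assumptions rather than documented behavior: that the resolver receives the event dict, and that a truthy resolver result counts as a match.

```python
from typing import Any, Callable, Optional


class DynSketch:
    """Illustrative stand-in for a dynamic filter; not the library's class."""

    def __init__(self, resolver: Callable[[dict], Any], inject_as: Optional[str] = None):
        self.resolver = resolver
        self.inject_as = inject_as

    def matches(self, event: dict) -> bool:
        value = self.resolver(event)  # assumption: the resolver receives the event
        if self.inject_as is not None:
            event[self.inject_as] = value  # overwrites any existing key
        return bool(value)  # assumption: a truthy value means the event matches


def is_admin(event: dict) -> bool:
    return event.get("role") == "admin"


event = {"role": "admin", "is_admin": "stale"}
matched = DynSketch(is_admin, inject_as="is_admin").matches(event)
```

After the call, the previous value stored under `is_admin` has been replaced by the resolver's result.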
- class src.event_processor.filters.Eq(path: Any, value: Any)
Accept events where a given value is present at the given key.
- matches(event: dict) → bool
Test whether a given event matches an input event.
- Parameters:
event – The event to test
- Returns:
True if the event matches, False otherwise
- class src.event_processor.filters.Exists(path: Any)
Accept events where a given key exists.
- matches(event: dict) → bool
Test whether a given event matches an input event.
- Parameters:
event – The event to test
- Returns:
True if the event matches, False otherwise
- class src.event_processor.filters.Filter
Abstract filter to define the filter interface.
- abstract matches(event: dict) → bool
Test whether a given event matches an input event.
- Parameters:
event – The event to test
- Returns:
True if the event matches, False otherwise
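The filter interface amounts to a single abstract `matches` method, which makes filters easy to compose. The following standalone sketch mirrors that shape with hypothetical classes (`FilterSketch`, `HasKey`, `AllOf`); it is an illustration of the pattern, not the library's code.

```python
from abc import ABC, abstractmethod


class FilterSketch(ABC):
    """Illustrative abstract filter with the same one-method interface."""

    @abstractmethod
    def matches(self, event: dict) -> bool:
        """Return True if the event matches, False otherwise."""


class HasKey(FilterSketch):
    """Accept events that contain a given top-level key."""

    def __init__(self, key: str):
        self.key = key

    def matches(self, event: dict) -> bool:
        return self.key in event


class AllOf(FilterSketch):
    """Accept events that are accepted by every wrapped filter."""

    def __init__(self, *filters: FilterSketch):
        self.filters = filters

    def matches(self, event: dict) -> bool:
        return all(f.matches(event) for f in self.filters)


combined = AllOf(HasKey("user"), HasKey("action"))
accepted = combined.matches({"user": "ada", "action": "login"})
rejected = combined.matches({"user": "ada"})
```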
- class src.event_processor.filters.Geq(path: Any, value: int | float)
Accept events where the value at the given path exists and is greater than or equal to the specified value.
- class src.event_processor.filters.Gt(path: Any, value: int | float)
Accept events where the value at the given path exists and is greater than the specified value.
- class src.event_processor.filters.Leq(path: Any, value: int | float)
Accept events where the value at the given path exists and is less than or equal to the specified value.
- class src.event_processor.filters.Lt(path: Any, value: int | float)
Accept events where the value at the given path exists and is less than the specified value.
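The "exists and compares" semantics shared by these four filters can be sketched with the standard `operator` module. This is a standalone illustration: `compare_at` is a hypothetical helper, and treating the path as a plain top-level key is a simplifying assumption, since the path format is not described here.

```python
import operator
from typing import Callable

_MISSING = object()  # sentinel so that a stored None is not mistaken for "absent"


def compare_at(event: dict, key: str, comparator: Callable[[float, float], bool], target: float) -> bool:
    """Return True only if the key exists and the comparator accepts its value."""
    value = event.get(key, _MISSING)  # assumption: the path is a top-level key
    if value is _MISSING:
        return False
    return comparator(value, target)


event = {"age": 18}
geq = compare_at(event, "age", operator.ge, 18)
gt = compare_at(event, "age", operator.gt, 18)
leq = compare_at(event, "age", operator.le, 18)
lt = compare_at(event, "age", operator.lt, 18)
missing = compare_at(event, "height", operator.ge, 0)
```

A missing value never matches, regardless of the comparator.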
- class src.event_processor.filters.NumCmp(path: Any, comparator: Callable[[float, float], bool], target: float)
Accept events when the comparator returns True.
If you use this filter, avoid comparators that behave identically but are not the same object for the same path. For example, don’t write the same lambda in two different places. Instead, define a function and pass a reference to it. Otherwise, the filters will be treated as different (even if they match the same events), which may lead to unexpected results.
- matches(event: dict) → bool
Test whether a given event matches an input event.
- Parameters:
event – The event to test
- Returns:
True if the event matches, False otherwise
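The warning about comparators follows from how Python compares functions: by identity, not by source code. The standalone sketch below uses a hypothetical `NumCmpSketch` class whose equality is assumed to depend on the path, the comparator object, and the target; the library's actual equality rules may differ in detail.

```python
from typing import Callable


class NumCmpSketch:
    """Illustrative filter whose equality depends on comparator identity (assumption)."""

    def __init__(self, path: str, comparator: Callable[[float, float], bool], target: float):
        self.path = path
        self.comparator = comparator
        self.target = target

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, NumCmpSketch)
            and self.path == other.path
            and self.comparator is other.comparator  # same object, not same code
            and self.target == other.target
        )

    def __hash__(self) -> int:
        return hash((self.path, self.comparator, self.target))


def at_least(value: float, target: float) -> bool:
    return value >= target


# Two textually identical lambdas are still two distinct function objects.
with_lambdas = NumCmpSketch("age", lambda v, t: v >= t, 18) == NumCmpSketch("age", lambda v, t: v >= t, 18)

# Passing a reference to one shared function makes the filters equal.
with_reference = NumCmpSketch("age", at_least, 18) == NumCmpSketch("age", at_least, 18)
```

Here `with_lambdas` is `False` and `with_reference` is `True`, which is why a shared function reference is the safer choice.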
Dependency Injection
Dependency injection and management facilities.
- src.event_processor.dependencies.Depends(callable_: Callable[[...], DependsReturn], cache: bool = True) → DependsReturn
Create a dependency.
- class src.event_processor.dependencies.Event(dict_event: dict)
Type to wrap a dict to be used as a dependency.
- src.event_processor.dependencies.call_with_injection(callable_: Callable, event: Event | None = None, cache: dict | None = None) → Any | None
Call a callable and inject required dependencies.
Note that keyword args that have the same name as the parameter used for a dependency will be overwritten with the dependency’s injected value.
- Parameters:
callable_ – The callable to call
event – The event for the current invocation
cache – The dependency cache to use
- Returns:
The return value of the callable
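The overwrite rule for keyword arguments is easier to see in a small example. The sketch below is standalone and heavily simplified: `DependsSketch` and `call_with_injection_sketch` are hypothetical names, the marker-as-default-value convention and the `**kwargs` parameter are assumptions, and events and caching are left out entirely.

```python
import inspect
from typing import Any, Callable


class DependsSketch:
    """Hypothetical marker that wraps the callable providing a value."""

    def __init__(self, provider: Callable[[], Any]):
        self.provider = provider


def call_with_injection_sketch(callable_: Callable, **kwargs: Any) -> Any:
    """Call callable_, filling every parameter whose default is a DependsSketch marker."""
    injected = dict(kwargs)
    for name, parameter in inspect.signature(callable_).parameters.items():
        if isinstance(parameter.default, DependsSketch):
            # An injected value replaces a keyword argument of the same name.
            injected[name] = parameter.default.provider()
    return callable_(**injected)


def get_greeting() -> str:
    return "hello"


def greet(name: str, greeting: str = DependsSketch(get_greeting)) -> str:
    return f"{greeting}, {name}"


message = call_with_injection_sketch(greet, name="ada", greeting="ignored")
```

The caller's `greeting="ignored"` is discarded in favor of the injected value, so `message` is `"hello, ada"`.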
- src.event_processor.dependencies.get_event_dependencies(callable_: Callable) → List[str]
Get the parameter names for event dependencies.
- Parameters:
callable_ – The callable for which to get dependencies
- Returns:
A list of the parameters requiring the event
- src.event_processor.dependencies.get_pydantic_dependencies(callable_: Callable) → Dict[str, Type[BaseModel]]
Get the required models and their parameter names for a callable.
- Parameters:
callable_ – The callable for which to get dependencies
- Returns:
A mapping of argument names to pydantic model types
- src.event_processor.dependencies.get_required_dependencies(callable_: Callable) → Dict[str, _Depends]
Get the required dependencies for a callable.
- Parameters:
callable_ – The callable for which to get dependencies
- Returns:
A mapping of callable argument names to dependencies
- src.event_processor.dependencies.get_scalar_value_dependencies(callable_: Callable) → List[Parameter]
Get the scalar value dependencies for a callable.
- Parameters:
callable_ – The callable for which to get dependencies
- Returns:
A view of the parameters that represent dependencies
- src.event_processor.dependencies.resolve(dependency: _Depends, event: Event | None = None, cache: dict | None = None) → Tuple[Any | None, bool]
Resolve a dependency into a value.
The resulting values from dependencies are cached and re-used if a cache is supplied and the dependency itself does not explicitly state that it does not want to be cached. Also, any dependency that depends on another dependency where caching has been disabled will also not be cached (because the sub-value may change, which may in turn change the value of the current dependency).
- Parameters:
dependency – The dependency to resolve
event – The event for the current invocation
cache – The cache for previously resolved dependencies
- Returns:
The tuple (resolved_value, cacheable)
- Raises:
pydantic.error_wrappers.ValidationError if the event cannot be parsed into a pydantic model
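The caching rule, including how a non-cacheable sub-dependency makes its parent non-cacheable, can be sketched as a short recursive function. Everything below is a standalone illustration under assumptions: `DependsSketch`, its `sub_dependencies` field, and `resolve_sketch` are hypothetical, and events and pydantic models are left out.

```python
import itertools
from typing import Any, Callable, Optional


class DependsSketch:
    """Hypothetical dependency: a provider, a cache flag, and optional sub-dependencies."""

    def __init__(self, provider: Callable[..., Any], cache: bool = True, sub_dependencies: tuple = ()):
        self.provider = provider
        self.cache = cache
        self.sub_dependencies = sub_dependencies


def resolve_sketch(dependency: DependsSketch, cache: Optional[dict] = None) -> tuple[Any, bool]:
    """Return (resolved_value, cacheable), re-using cached values when possible."""
    if cache is not None and dependency in cache:
        return cache[dependency], True
    cacheable = dependency.cache
    sub_values = []
    for sub in dependency.sub_dependencies:
        value, sub_cacheable = resolve_sketch(sub, cache)
        sub_values.append(value)
        # One non-cacheable sub-dependency makes the parent non-cacheable too.
        cacheable = cacheable and sub_cacheable
    resolved = dependency.provider(*sub_values)
    if cacheable and cache is not None:
        cache[dependency] = resolved
    return resolved, cacheable


counter = itertools.count()
ticking = DependsSketch(lambda: next(counter), cache=False)
stable = DependsSketch(lambda: "config")
derived = DependsSketch(lambda tick: f"tick-{tick}", sub_dependencies=(ticking,))

cache: dict = {}
stable_result = resolve_sketch(stable, cache)
first_derived = resolve_sketch(derived, cache)
second_derived = resolve_sketch(derived, cache)
```

The `derived` dependency does not disable caching itself, yet it is recomputed on every call because the value it depends on may change.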
- src.event_processor.dependencies.resolve_scalar_value_dependencies(scalar_dependencies: List[Parameter], event: Event | None) → Dict[str, Any]
Resolve the scalar dependencies to values contained in the event.
Values will be resolved differently depending on whether or not pydantic is installed.
- Parameters:
scalar_dependencies – The dependencies to resolve
event – The event from which to get values
- Returns:
A new dict with resolved dependency values
- src.event_processor.dependencies.resolve_scalar_value_dependencies_with_pydantic(scalar_dependencies: List[Parameter], event: Event) → Dict[str, Any]
Resolve the scalar dependencies to values contained in the event with pydantic.
This function does validation for the types of values passed into the event. Since this uses pydantic, it’s possible to use any pydantic types such as PaymentCardNumber, for example.
- Parameters:
scalar_dependencies – The dependencies to resolve
event – The event from which to get values
- Returns:
A new dict with resolved and validated dependency values
- src.event_processor.dependencies.resolve_scalar_value_dependencies_without_pydantic(scalar_dependencies: List[Parameter], event: Event) → Dict[str, Any]
Resolve the scalar dependencies to values contained in the event without using pydantic.
This function does not validate the types of values passed into the event to ensure they match the type annotations of the dependencies. To get validation for those types, make sure pydantic is installed.
- Parameters:
scalar_dependencies – The dependencies to resolve
event – The event from which to get values
- Returns:
A new dict with resolved dependency values
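What "no validation" means in practice can be shown with `inspect.Parameter` objects and a plain dict. The sketch below is standalone and rests on assumptions: that values are looked up by parameter name, that parameter defaults act as a fallback, and that a missing value raises `KeyError`. The library may behave differently on each of these points, and `resolve_scalars_sketch` is a hypothetical name.

```python
import inspect
from typing import Any


def resolve_scalars_sketch(parameters: list[inspect.Parameter], event: dict) -> dict[str, Any]:
    """Look up each parameter name in the event, without checking annotations."""
    resolved: dict[str, Any] = {}
    for parameter in parameters:
        if parameter.name in event:
            resolved[parameter.name] = event[parameter.name]
        elif parameter.default is not inspect.Parameter.empty:
            resolved[parameter.name] = parameter.default  # assumption: defaults are a fallback
        else:
            raise KeyError(parameter.name)  # assumption: a missing value is an error
    return resolved


def handler(user_id: int, plan: str = "free") -> None:
    """Example processor signature; the annotations are never enforced by the sketch."""


parameters = list(inspect.signature(handler).parameters.values())
values = resolve_scalars_sketch(parameters, {"user_id": "not-an-int"})
```

The string `"not-an-int"` is passed through even though `user_id` is annotated as `int`, which is the gap that pydantic-based validation is meant to close.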