API Documentation

Event Processor

class src.event_processor.event_processor.EventProcessor(invocation_strategy: InvocationStrategies = InvocationStrategies.FIRST_MATCH, error_handling_strategy: ErrorHandlingStrategies = ErrorHandlingStrategies.BUBBLE)

A self-contained event processor.

add_subprocessor(subprocessor: EventProcessor)

Add a subprocessor to this event processor

Parameters:

subprocessor – The other event processor to add

add_subprocessors(*subprocessors: EventProcessor)

Add multiple subprocessors at once.

Parameters:

subprocessors – The tuple of subprocessors

add_subprocessors_in_package(package: module)

Add all the processors found in all modules of a package as subprocessors.

Note that you should specify an actual package, and not just the name of a package.

Parameters:

package – The package that should be searched for event processors

invoke(event: Dict) Result | List[Result]

Invoke the correct processor for an event.

There may be multiple processors invoked, depending on the invocation strategy.

Parameters:

event – The event to find a processor for

Returns:

The return value of the processor

processor(event_filter: Filter, rank: int = 0)

Register a new processor with the given filter and rank.

Parameters:
  • event_filter – The filter for which to match events

  • rank – This processor’s rank (when there are multiple matches for a single event)

Invocation Strategies

Contains the different invocation strategies for calling processors.

class src.event_processor.invocation_strategies.AllMatches(error_handling_strategy: ~src.event_processor.error_handling_strategies.ErrorHandlingStrategy = <src.event_processor.error_handling_strategies.Bubble object>)

Strategy calling all matching processors.

invoke(matching: List[Callable], event: Event | None = None, cache: Dict | None = None) List[Result]

Invoke one or multiple matching processors.

class src.event_processor.invocation_strategies.FirstMatch(error_handling_strategy: ~src.event_processor.error_handling_strategies.ErrorHandlingStrategy = <src.event_processor.error_handling_strategies.Bubble object>)

Strategy calling the first matching processor.

invoke(matching: List[Callable], event: Event | None = None, cache: Dict | None = None) Result

Invoke one or multiple matching processors.

class src.event_processor.invocation_strategies.InvocationStrategies(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)

Enumeration of available invocation strategies.

class src.event_processor.invocation_strategies.InvocationStrategy(error_handling_strategy: ~src.event_processor.error_handling_strategies.ErrorHandlingStrategy = <src.event_processor.error_handling_strategies.Bubble object>)

Class defining an abstract invocation strategy.

abstract invoke(matching: List[Callable], event: Event | None = None, cache: Dict | None = None) Result | List[Result]

Invoke one or multiple matching processors.

class src.event_processor.invocation_strategies.NoMatches(error_handling_strategy: ~src.event_processor.error_handling_strategies.ErrorHandlingStrategy = <src.event_processor.error_handling_strategies.Bubble object>)

Strategy not calling any matching processors.

invoke(matching: List[Callable], event: Event | None = None, cache: Dict | None = None) Result

Invoke one or multiple matching processors.

class src.event_processor.invocation_strategies.NoMatchesStrict(error_handling_strategy: ~src.event_processor.error_handling_strategies.ErrorHandlingStrategy = <src.event_processor.error_handling_strategies.Bubble object>)

Strategy failing when there are multiple matching.

invoke(matching: List[Callable], event: Event | None = None, cache: Dict | None = None) Result

Invoke one or multiple matching processors.

Results

class src.event_processor.result.Result(processor_name: str, returned_value: Any | None = None, raised_exception: Exception | None = None)

A result is what gets returned after an invocation.

It contains information about the processor as well as its return value.

Exceptions

Exceptions for event processor.

exception src.event_processor.exceptions.DependencyError

Exceptions for failures while resolving dependencies.

exception src.event_processor.exceptions.EventProcessorError

General exception for the event-processor library.

exception src.event_processor.exceptions.FilterError

Exception for failures related to filters.

exception src.event_processor.exceptions.InvocationError

Exception for failures in invocation.

exception src.event_processor.exceptions.NoValueError

Exception for when a value is not present in a given context.

Filtering

Contains many different filters to conveniently filter through events.

class src.event_processor.filters.Accept

Accept any event (good for default processors).

matches(_event: dict) bool

Test whether a given event matches an input event.

Parameters:

event – The event to test

Returns:

True if the event matches, False otherwise

class src.event_processor.filters.And(*args: Filter)

Accept events that get accepted by all specified filters.

matches(event: dict) bool

Test whether a given event matches an input event.

Parameters:

event – The event to test

Returns:

True if the event matches, False otherwise

class src.event_processor.filters.Dyn(resolver: Callable, inject_as: str | None = None)

Accept events based on a dynamic condition which is resolved by a callable.

This filter also allows dynamically changing the event by adding the returned value of the resolver to the event under a user-supplied key. This will overwrite a previously existing key.

Note that the equality check with this filter is a bit special. The filter will only be equal if the resolver is the same (two functions with the same code are not equal, they need to be the same object in memory).

This means that when using this filter, you should be careful to either re-use the same function as a resolver or make sure the functions have different behaviors. Otherwise, be sure to use processor ranking and invocation strategies.

matches(event: dict) bool

Test whether a given event matches an input event.

Parameters:

event – The event to test

Returns:

True if the event matches, False otherwise

class src.event_processor.filters.Eq(path: Any, value: Any)

Accept events where a given value is present at the given key.

matches(event: dict) bool

Test whether a given event matches an input event.

Parameters:

event – The event to test

Returns:

True if the event matches, False otherwise

class src.event_processor.filters.Exists(path: Any)

Accept event where a given key exists.

matches(event: dict) bool

Test whether a given event matches an input event.

Parameters:

event – The event to test

Returns:

True if the event matches, False otherwise

class src.event_processor.filters.Filter

Abstract filter to define the filter interface.

abstract matches(event: dict) bool

Test whether a given event matches an input event.

Parameters:

event – The event to test

Returns:

True if the event matches, False otherwise

class src.event_processor.filters.Geq(path: Any, value: int | float)

Accept events where the value exists and is greater than or equal to the specified value.

class src.event_processor.filters.Gt(path: Any, value: int | float)

Accept events where the value exists and is greater than the specified value.

class src.event_processor.filters.Leq(path: Any, value: int | float)

Accept events where the value at the given path exists is less than or equal to the specified value.

class src.event_processor.filters.Lt(path: Any, value: int | float)

Accept events where the value at the given path exists and is less than the specified value.

class src.event_processor.filters.NumCmp(path: Any, comparator: Callable[[float, float], bool], target: float)

Accept events when the comparator returns True.

If you use this processor, make sure that you don’t use equal (and not identical) comparators for the same path. For example, don’t use the same lambda in two different places. Instead, use a function, and pass a reference to that function. If you don’t do that, the filters will effectively be different (even if they match the same thing), leading to perhaps unexpected results.

matches(event: dict) bool

Test whether a given event matches an input event.

Parameters:

event – The event to test

Returns:

True if the event matches, False otherwise

class src.event_processor.filters.Or(*args: Filter)

Accept events that get accepted by at least one specified filter.

matches(event: dict) bool

Test whether a given event matches an input event.

Parameters:

event – The event to test

Returns:

True if the event matches, False otherwise

Dependency Injection

Dependency injection and management facilities.

src.event_processor.dependencies.Depends(callable_: Callable[[...], DependsReturn], cache: bool = True) DependsReturn

Create a dependency.

class src.event_processor.dependencies.Event(dict_event: dict)

Type to wrap a dict to be used as a dependency.

src.event_processor.dependencies.call_with_injection(callable_: Callable, event: Event | None = None, cache: dict | None = None) Any | None

Call a callable and inject required dependencies.

Note that keyword args that have the same name as the parameter used for a dependency will be overwritten with the dependency’s injected value.

Parameters:
  • callable – The callable to call

  • event – The event for the current invocation

  • cache – The dependency cache to use

Returns:

The return value of the callable

src.event_processor.dependencies.get_event_dependencies(callable_: Callable) List[str]

Get the parameter names for event dependencies.

Parameters:

callable – The callable for which to get dependencies

Returns:

A list of the parameters requiring the event

src.event_processor.dependencies.get_pydantic_dependencies(callable_: Callable) Dict[str, Type[BaseModel]]

Get the required models and their parameter names for a callable.

Parameters:

callable – The callable for which to get dependencies

Returns:

A mapping of argument names to pydantic model types

src.event_processor.dependencies.get_required_dependencies(callable_: Callable) Dict[str, _Depends]

Get the required dependencies for a callable.

Parameters:

callable – The callable for which to get dependencies

Returns:

A mapping of callable argument names to dependencies

src.event_processor.dependencies.get_scalar_value_dependencies(callable_: Callable) List[Parameter]

Get the scalar value dependencies for a callable.

Parameters:

callable – The callable for which to get dependencies

Returns:

A view of the parameters that represent dependencies

src.event_processor.dependencies.resolve(dependency: _Depends, event: Event | None = None, cache: dict | None = None) Tuple[Any | None, bool]

Resolve a dependency into a value.

The resulting values from dependencies are cached and re-used if a cache is supplied and the dependency itself does not explicitly state that it does not want to be cached. Also, any dependency that depends on another dependency where caching has been disabled will also not be cached (because the sub-value may change, which may in turn change the value of the current dependency).

Parameters:
  • dependency – The dependency to resolve

  • event – The event for the current invocation

  • cache – The cache for previously resolved dependencies

Returns:

The tuple (resolved_value, cacheable)

Raises:

pydantic.error_wrappers.ValidationError if the event cannot be parsed into a pydantic model

src.event_processor.dependencies.resolve_scalar_value_dependencies(scalar_dependencies: List[Parameter], event: Event | None) Dict[str, Any]

Resolve the scalar dependencies to values contained in the event.

Values will be resolved differently depending on whether or not pydantic is installed.

Parameters:
  • scalar_dependencies – The dependencies to resolve

  • event – The event from which to get values

Returns:

A new dict with resolved dependency values

src.event_processor.dependencies.resolve_scalar_value_dependencies_with_pydantic(scalar_dependencies: List[Parameter], event: Event) Dict[str, Any]

Resolve the scalar dependencies to values contained in the event with pydantic.

This function does validation for the types of values passed into the event. Since this uses pydantic, it’s possible to use any pydantic types such as PaymentCardNumber, for example.

Parameters:
  • scalar_dependencies – The dependencies to resolve

  • event – The event from which to get values

Returns:

A new dict with resolved and validated dependency values

src.event_processor.dependencies.resolve_scalar_value_dependencies_without_pydantic(scalar_dependencies: List[Parameter], event: Event) Dict[str, Any]

Resolve the scalar dependencies to values contained in the event without using pydantic.

This function does not validate the types of values passed into the event to ensure they match the type annotations of the dependencies. To get validation for those types, make sure pydantic is installed.

Parameters:
  • scalar_dependencies – The dependencies to resolve

  • event – The event from which to get values

Returns:

A new dict with resolved dependency values