Pre-Processing Guide¶
Pre-processors are useful when you want to modify the input event before passing it on to your processor. It’s mostly a convenience feature, because processors can always just accept the event and use it directly. Though, because pre-processors are any function, they can also fetch additional values not present in the event.
Structural Pre-Processing¶
One use of pre-processors is to change the structure of input events to make them more convenient to manipulate for processors. For example, you could turn an input event into a dataclass:
from dataclasses import dataclass
from typing import Any, Dict
from event_processor import EventProcessor
event_processor = EventProcessor()
@dataclass
class User:
name: str
email: str
role: str
def event_to_user(event: Dict) -> User: # This is a pre-processor
user = event["user"]
return User(
name=user["name"],
email=user["email"],
role=user["role"]
)
@event_processor.processor(
{"user.name": Any, "user.email": Any, "user.role": Any},
pre_processor=event_to_user
)
def my_processor(user: User): # The processor takes a User
return user.role == "admin"
print(
event_processor.invoke({"user": {"name": "John", "email": "john@example.com", "role": "admin"}}),
event_processor.invoke({"user": {"name": "Bob", "email": "bob@example.com", "role": "user"}}),
)
True False
Data Pre-Processing¶
Another use of pre-processors is to fetch additional external data from, realistically, any source you could imagine. This can also be combined with dependencies to create dynamic pre-processors that can fetch data from external sources. Here’s an example:
from event_processor import EventProcessor
event_processor = EventProcessor()
@dataclass
class User:
name: str
email: str
role: str
class FakeDbClient:
database = {
"admin@example.com": {"role": "admin", "name": "John"},
"user@example.com": {"role": "user", "name": "Bob"}
}
def fetch_by_email(self, email: str) -> User:
user = self.database.get(email, {})
return User(
name=user["name"],
email=email,
role=user["role"]
)
def event_to_user(event: Dict, db_client: FakeDbClient) -> User:
email = event["user"]["email_3"]
user = db_client.fetch_by_email(email=email)
return user
@event_processor.dependency_factory
def db_client(_name: str) -> FakeDbClient:
return FakeDbClient()
@event_processor.processor(
{"user.email_3": Any},
pre_processor=event_to_user,
db_client=("my_db",)
)
def my_processor(user: User):
return user.role == "admin"
print(
event_processor.invoke({"user": {"email_3": "user@example.com"}}),
event_processor.invoke({"user": {"email_3": "admin@example.com"}})
)
False True
For more details on dependency injection, see the Dependency Injection Guide. The gist is that you can specify dependencies in the decorator, and they will automatically be injected into either the processor, pre-processor, or both, depending on the parameters.
Bigger Data Pre-Processing Example¶
from dataclasses import dataclass
from typing import Any, Dict
from event_processor import EventProcessor
event_processor = EventProcessor()
class FakeDynamoClient:
database = {
"users": [
{"Email": {"S": "user@example.com"}, "Role": {"S": "user"}},
{"Email": {"S": "admin@example.com"}, "Role": {"S": "admin"}}
]
}
def get_item(self, TableName="", Key={}):
table = self.database.get(TableName, {})
key_name = list(Key.keys())[0]
record = [e for e in table if e[key_name]["S"] == Key[key_name]["S"]][0]
return {"Item": record}
@dataclass
class User:
email: str
role: str
@event_processor.dependency_factory
def boto_clients(client_name: str) -> FakeDynamoClient:
if client_name == "dynamodb":
return FakeDynamoClient()
else:
raise NotImplementedError()
# Uses the dynamodb client specified in the processor decorator
def event_to_user(event: Dict, dynamodb_client: FakeDynamoClient):
email = event["user"]["email"]
response = dynamodb_client.get_item(
TableName="users",
Key={"Email": {"S": email}}
)
role = response["Item"]["Role"]["S"]
return User(email=email, role=role)
# Does not use the dynamodb client, but needs it for pre-processing
@event_processor.processor(
{"user.email": Any},
pre_processor=event_to_user,
boto_clients=("dynamodb",)
)
def my_processor(user: User):
return user.role == "admin"
print(
event_processor.invoke({"user": {"email": "user@example.com"}}),
event_processor.invoke({"user": {"email": "admin@example.com"}})
)
False True