Composable Serializers and unofficial Sinks.
This is an extension for the popular QuixStreams package, providing enhanced functionality that doesn't fit into the main QuixStreams branch.
Easily chain different types of serializers together, including:
- Pydantic Serializers: convert Pydantic models and dataclasses back and forth, helping you write type-safe code.
- Confluent AVRO Serializers: integrate with the Confluent Schema Registry.
To install this package, you can use pip:
pip install quixstreams-extension[avro,pydantic]
Here's an example of using composable serializers with QuixStreams:
Imagine you want to write type-safe code and stop asking yourself "what was the input schema again? let me check the examples or logs...". You also want to process a topic that contains AVRO messages serialized with the help of the Confluent Schema Registry.
So first, let's define our input topic schema as a Pydantic model:
from pydantic import BaseModel

class User(BaseModel):
    age: int
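Pydantic validates incoming data against this schema at the deserialization boundary. As a quick illustration (this snippet is not part of the pipeline, just a demonstration of Pydantic's behaviour):

from pydantic import ValidationError

User(age="42")  # OK: "42" is coerced to the int 42
try:
    User(age="not a number")
except ValidationError as exc:
    print(exc)  # invalid payloads fail loudly instead of flowing downstream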
Now, let's define an input topic with its deserializer:
from confluent_kafka.schema_registry import SchemaRegistryClient

from quixstreams import Application
from quixstreams.models import Deserializer, Serializer
from quixstreams_extensions.serializers.composer import composed
from quixstreams_extensions.serializers.compositions import pydantic, confluent

app = Application(broker_address="localhost:9092")  # adjust the broker address to your setup

# Configure the Schema Registry client
schema_registry_client = SchemaRegistryClient({"url": "http://localhost:8081"})

composed_deserializer = composed(
    Deserializer, confluent.to_dict(schema_registry_client), pydantic.to_instance_of(User)
)  # Takes an AVRO payload and returns a Pydantic model (may fail during Pydantic validation)

input = app.topic("input", value_deserializer=composed_deserializer)
Take a closer look at composed_deserializer. The main entry point is the composed(SerializerClass, *functions) function, which accepts either the base quixstreams.models.Deserializer class or the quixstreams.models.Serializer class (subclasses are allowed), followed by a series of callables that are composed and called sequentially to produce the final result. As we can see in the example above, it:
- creates a composable Deserializer, which first
- takes the AVRO payload and converts it to a Python dictionary with the help of SchemaRegistryClient, and then
- takes the Python dict and converts it to a User instance, which can now be used in the pipeline.
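Because each step is just a callable that takes the previous step's output, you can also slot your own functions into the chain. Here is a minimal sketch, continuing from the imports above and assuming that composed simply chains plain callables (drop_private_fields is a hypothetical custom step, not part of this package):

def drop_private_fields(data: dict) -> dict:
    # Hypothetical custom step: drop keys starting with "_" before validation
    return {k: v for k, v in data.items() if not k.startswith("_")}

custom_deserializer = composed(
    Deserializer,
    confluent.to_dict(schema_registry_client),  # AVRO bytes -> dict
    drop_private_fields,                        # dict -> dict (custom step)
    pydantic.to_instance_of(User),              # dict -> User instance
)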
Now we can use this input topic in the app that defines the business logic:
from typing import Literal

from pydantic_avro import AvroBase

class EnhancedUser(AvroBase):  # output data model
    age: int
    prefer: Literal["quix-streaming", "sleeping", "hiking"]

def adults_only(user: User) -> bool:
    return user.age > 18

def enhance(user: User) -> EnhancedUser:
    return EnhancedUser(age=user.age, prefer="quix-streaming" if user.age < 99 else "sleeping")

sdf = app.dataframe(input)
sdf = sdf.filter(adults_only).print()
sdf = sdf.apply(enhance)
The pipeline has two processing functions, both of them type-safe with the help of Pydantic (we could achieve the same with dataclasses, of course).
EnhancedUser is our output data model. It inherits from AvroBase (a pydantic.BaseModel subclass) provided by pydantic-avro, which will come in handy in a moment.
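Since both processing functions operate on plain Pydantic models, they can be unit-tested without any Kafka or QuixStreams machinery. A minimal pytest-style sketch:

def test_adults_only():
    assert adults_only(User(age=30)) is True
    assert adults_only(User(age=10)) is False

def test_enhance():
    enhanced = enhance(User(age=42))
    assert enhanced.prefer == "quix-streaming"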
Finally, let's push our enhanced data into another AVRO topic:
output = app.topic(
    "output",
    value_serializer=composed(
        Serializer,
        pydantic.to_dict,
        confluent.to_avro(
            schema_registry_client, EnhancedUser.avro_schema()
        ),  # Takes a Pydantic model and converts it into AVRO, ready for publishing
    ),
)
sdf = sdf.to_topic(output).print()
if __name__ == "__main__":
    app.run(sdf)
So we've got a composed serializer in which:
- pydantic.to_dict takes the Pydantic model and converts it to a Python dict
- confluent.to_avro takes the dict and converts it to AVRO with the help of Confluent's SchemaRegistryClient and the AVRO schema generated by EnhancedUser.avro_schema()

By default, schema_registry_client will try to register the AVRO schema in its registry; as the schema evolves over time, this may crash due to the registry's migration (compatibility) policy.
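If auto-registration at serialization time is a concern, one option is to check and register the generated schema up front using the same client. A hedged sketch, assuming the default "<topic>-value" subject naming strategy (the subject name and flow below are assumptions, not part of this package):

import json

from confluent_kafka.schema_registry import Schema

subject = "output-value"  # assumed subject name for the "output" topic
schema = Schema(json.dumps(EnhancedUser.avro_schema()), schema_type="AVRO")

# Check compatibility against the existing versions for this subject,
# then register the new schema explicitly rather than at serialization time.
if schema_registry_client.test_compatibility(subject, schema):
    schema_registry_client.register_schema(subject, schema)

For a brand-new subject with no versions yet, register_schema can be called directly without the compatibility check.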
Please see the examples/ folder for more information.