Rethink serializers composition
Prefer composed instances rather than composed classes.

Signed-off-by: Volodymyr Trotsyshyn <[email protected]>
devova committed Aug 11, 2024
1 parent 4dc0937 commit f48e941
Showing 31 changed files with 708 additions and 644 deletions.
124 changes: 64 additions & 60 deletions README.md
# QuixStreams Extensions

Composable Serializers and non-official Sinks.

# QuixStreams Extension Package

It is an extension for the popular [QuixStreams](https://quix.io/docs/get-started/welcome.html) package,
providing enhanced functionality that doesn't fit the main upstream project.

## Key Features

### Chainable Serializers
Easily chain different types of serializers to each other, including:

- **Pydantic Serializers**: Convert Pydantic models and dataclasses back and forth. Helps you write type-safe code.
- **Confluent AVRO Serializers**: Integrate with the Confluent Schema Registry.


## Installation

To install this package, you can use pip:
```shell
pip install quixstreams-extension[avro,pydantic]
```

## Quick start
Here's an example of using composable serializers with [QuixStreams](https://quix.io/docs/get-started/welcome.html):

Imagine you want to write type-safe code and stop asking yourself "what was the input schema again? Let me check the examples or the logs...".
Also, you want to process a topic that contains [AVRO](https://avro.apache.org/) messages serialized with the help of the [Confluent Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html).

So first, let's define our input topic schema as a Pydantic model:
```python
from pydantic import BaseModel


class User(BaseModel):
    age: int
```
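Because the model is plain pydantic, payload validation comes for free: values that don't match the schema raise a `ValidationError`. A quick illustration:

```python
from pydantic import ValidationError

print(User(age=42))  # age=42

try:
    User(age="not a number")
except ValidationError as exc:
    print(exc)  # explains that `age` must be an integer
```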

Now, let's define an input topic with its deserializer:

```python
from confluent_kafka.schema_registry import SchemaRegistryClient
from quixstreams.models import Deserializer
from quixstreams_extensions.serializers.composer import composed
from quixstreams_extensions.serializers.compositions import pydantic, confluent

# Configure the Schema Registry client
schema_registry_client = SchemaRegistryClient({"url": "http://localhost:8081"})

composed_deserializer = composed(
    Deserializer, confluent.to_dict(schema_registry_client), pydantic.to_instance_of(User)
)  # Takes an AVRO payload and returns a pydantic model (may fail during pydantic validation)

input = app.topic("input", value_deserializer=composed_deserializer)  # `app` is created below
```

Let's take a closer look at `composed_deserializer`. The main entry point is the `composed(SerializerClass, *functions)` function,
which accepts either the base `quixstreams.models.Deserializer` class or the base `quixstreams.models.Serializer` class (subclasses are allowed),
followed by a series of callables that are applied sequentially to produce the final result (a sketch of the idea follows this list). As we can see in the example above, it:
- creates a composable Deserializer, which first
- takes the AVRO payload and converts it to a Python dictionary, with the help of the `SchemaRegistryClient`, and then
- takes the Python dict and converts it into a `User` instance, which can now be used in the pipeline.
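Under the hood this is plain left-to-right function chaining. Here is a minimal sketch of the idea (a hypothetical `pipe` helper, not the library's actual implementation):

```python
from functools import reduce


def pipe(*steps):
    """Thread a value through each step, left to right."""
    return lambda value: reduce(lambda acc, step: step(acc), steps, value)


# Roughly what the composed deserializer does to every raw message:
#   pipe(confluent.to_dict(schema_registry_client), pydantic.to_instance_of(User))(raw_bytes)
# i.e. AVRO bytes -> dict -> User instance
```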

Now we can use them in the app that defines the business logic:
```python
from typing import Literal

from confluent_kafka.schema_registry import SchemaRegistryClient
from pydantic_avro import AvroBase
from quixstreams import Application

# Create an Application - the main configuration entry point
app = Application(...)

# Configure the Schema Registry client
schema_registry_client = SchemaRegistryClient(...)


class EnhancedUser(AvroBase):  # output data model
    age: int
    prefer: Literal["quix-streaming", "sleeping", "hiking"]


def adults_only(user: User):
    return user.age > 18


def enhance(user: User) -> EnhancedUser:
    return EnhancedUser(age=user.age, prefer="quix-streaming" if user.age < 99 else "sleeping")


sdf = app.dataframe(input)
sdf = sdf.filter(adults_only).print()
sdf = sdf.apply(enhance)
```
The pipeline has two processing functions, both of them type safe with the help of pydantic (we could achieve the same with dataclasses, of course).
`EnhancedUser` is our output data model. It inherits from `AvroBase` (itself a `pydantic.BaseModel` subclass) and leverages [pydantic-avro](https://github.com/godatadriven/pydantic-avro), which will be useful later.
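That pydantic-avro base is what lets a model emit its own AVRO schema. A quick, hedged illustration (the exact output shape and field order may differ):

```python
import json

# AvroBase models can generate their AVRO schema directly; this is the value
# we hand to the Schema Registry when composing the serializer below.
print(json.dumps(EnhancedUser.avro_schema(), indent=2))
# Produces a record schema along the lines of:
#   {"type": "record", "name": "EnhancedUser",
#    "fields": [{"name": "age", "type": "long"}, ...]}
```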

Finally, let's push our enhanced data into another AVRO topic:
```python
from quixstreams.models import Serializer

output = app.topic(
    "output",
    value_serializer=composed(
        Serializer,
        pydantic.to_dict,
        confluent.to_avro(
            schema_registry_client, EnhancedUser.avro_schema()
        ),  # Takes a Pydantic model and converts it into AVRO, ready for publishing
    ),
)

sdf = sdf.to_topic(output).print()

if __name__ == "__main__":
    app.run(sdf)
```
Here we've got a composed serializer in which:
- `pydantic.to_dict` takes a pydantic model and converts it to a Python dict
- `confluent.to_avro` takes the dict and converts it to AVRO, with the help of the Confluent `SchemaRegistryClient` and the AVRO schema generated by `EnhancedUser.avro_schema()`
- by default `schema_registry_client` will try to register the AVRO schema in its registry;
as the schema evolves over time, this registration may fail due to the compatibility [policy](https://docs.confluent.io/platform/current/schema-registry/index.html#compatibility-and-schema-evolution)
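If you want schema-evolution problems to surface deliberately rather than mid-deploy, you can pin the subject's compatibility level yourself. A hedged sketch, assuming your confluent-kafka release exposes `set_compatibility`/`get_compatibility` and that the subject follows the default `<topic>-value` naming:

```python
# Hypothetical subject name under the default topic-name subject strategy
subject = "output-value"

# Pin the policy so incompatible schema changes are rejected explicitly
schema_registry_client.set_compatibility(subject_name=subject, level="BACKWARD")
print(schema_registry_client.get_compatibility(subject_name=subject))  # e.g. "BACKWARD"
```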

Please explore the `examples/` folder for more information.
6 changes: 6 additions & 0 deletions examples/README.md
# Examples

Before you start running any example, please bring up the local stack (the examples expect a Kafka broker on `localhost:9092` and a Schema Registry on `localhost:8081`):
```shell
docker compose up
```
40 changes: 16 additions & 24 deletions examples/pydantic_avro/consumer.py
```python
from typing import Literal

from confluent_kafka.schema_registry import SchemaRegistryClient
from pydantic_avro import AvroBase
from quixstreams import Application
from quixstreams.models import Deserializer, Serializer

from quixstreams_extensions.serializers.composer import composed
from quixstreams_extensions.serializers.compositions import pydantic, confluent

# Create an Application - the main configuration entry point
app = Application(broker_address="localhost:9092", consumer_group="pydantic_avro")

# ... (collapsed in the diff: Schema Registry client configuration and the User model)


class EnhancedUser(AvroBase):
    age: int
    prefer: Literal["quix-streaming", "sleeping", "hiking"]


# Define the input topic
input = app.topic(
    "input",
    value_deserializer=composed(
        Deserializer, confluent.to_dict(schema_registry_client), pydantic.to_instance_of(User)
    ),  # Takes an AVRO payload and returns a pydantic model (may fail during pydantic validation)
)

# Define the output topics
output = app.topic(
    "output",
    value_serializer=composed(
        Serializer,
        pydantic.to_dict,
        confluent.to_avro(
            schema_registry_client, EnhancedUser.avro_schema()
        ),  # Takes a Pydantic model and converts it into AVRO, ready for publishing
    ),
)


def adults_only(user: User):
    return user.age > 18


def enhance(user: User) -> EnhancedUser:
    return EnhancedUser(age=user.age, prefer="quix-streaming" if user.age < 99 else "sleeping")


sdf = app.dataframe(input)
sdf = sdf.filter(adults_only)
sdf = sdf.apply(enhance)
sdf = sdf.print()
sdf = sdf.to_topic(output)

if __name__ == "__main__":
    app.run(sdf)
```
21 changes: 6 additions & 15 deletions examples/pydantic_avro/producer.py
```python
from confluent_kafka.schema_registry import SchemaRegistryClient
from pydantic_avro import AvroBase
from quixstreams import Application
from quixstreams.models import Serializer

from quixstreams_extensions.serializers.composer import composed
from quixstreams_extensions.serializers.compositions import confluent, pydantic

# Create an Application - the main configuration entry point
app = Application(broker_address="localhost:9092", consumer_group="pydantic_avro")

schema_registry_client = SchemaRegistryClient({"url": "http://localhost:8081"})


class User(AvroBase):
    age: int
    name: str


messages_topic = app.topic(
    "input",
    value_serializer=composed(
        Serializer, pydantic.to_dict, confluent.to_avro(schema_registry_client, User.avro_schema())
    ),
)

messages = [
    ...,  # message payloads collapsed in the diff
]
```