
Add writing setting #85

Conversation

RemcoMeeuwissen
Copy link
Contributor

What I am changing

In our usage of tipg (Lambdas and Aurora on AWS) we're connecting to a read-only database endpoint, so I've added a setting that disables writing the tipg functions and the custom SQL to the database.

I haven't added docs yet, but at this stage I wanted to check whether you're interested in merging this and, if so, whether the setting is in the right category or if there would be a better one (e.g. PostgresSettings).
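For reference, a minimal sketch of what the setting could look like, assuming a pydantic (v1-style) BaseSettings class like those in tipg.settings; which class it should live in is exactly the open question above:

from pydantic import BaseSettings


class DatabaseSettings(BaseSettings):
    """Sketch of a settings class; only the proposed field is shown."""

    # When False, skip executing the tipg catalog SQL and any user-supplied
    # SQL files at connection setup (useful for read-only endpoints).
    write_functions: bool = True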

# Register TiPG functions in `pg_temp`
await conn.execute(DB_CATALOG_FILE.read_text())
Member

I don't think tipg is going to work if you don't have this in!

if database_settings.write_functions:
    # Register custom SQL functions/table/views in pg_temp
    for sqlfile in self.user_sql_files:
        await conn.execute(sqlfile.read_text())
Member

I'm not quite sure we need this 🤷 we already register (in pg_temp) the functions that are passed by the user, so having both options is kinda weird to me.

@vincentsarago
Member

tipg should be read-only. We don't really write to the database but use the pg_temp schema to register functions (user custom functions and tipg internal functions) dynamically. Those get deleted when the connection is closed.
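To illustrate the mechanism, a minimal sketch (the connection string is a placeholder):

import asyncio

import asyncpg


async def main():
    conn = await asyncpg.connect("postgresql://user:pass@localhost/db")

    # Objects created in pg_temp live in the session's temporary schema,
    # so nothing is persisted in the database itself.
    await conn.execute(
        """
        CREATE FUNCTION pg_temp.hello() RETURNS text AS $$
            SELECT 'hi'::text;
        $$ LANGUAGE SQL;
        """
    )
    print(await conn.fetchval("SELECT pg_temp.hello();"))  # -> 'hi'

    # Once the connection closes, pg_temp.hello() is gone.
    await conn.close()


asyncio.run(main())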

@RemcoMeeuwissen
Contributor Author

RemcoMeeuwissen commented Jul 4, 2023

tipg should be read-only. We don't really write to the database but use the pg_temp schema to register functions (user custom functions and tipg internal functions) dynamically. Those get deleted when the connection is closed.

There's another pull request I have waiting that allows the use of schemas other than pg_temp (because Aurora doesn't allow the use of pg_temp), which helps with that issue. But yes, the user should create the functions by hand when using this setting, and that should be specified in the docs as well.
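For concreteness, "creating the functions by hand" could be a one-time privileged run of the tipg SQL against a permanent schema, roughly like this sketch (tipg_public.sql is assumed to be a copy of tipg's catalog SQL, edited to create the functions in a permanent schema instead of pg_temp):

import asyncio
import pathlib

import asyncpg


async def install_tipg_sql():
    """One-time install of the tipg SQL functions, run as a privileged user."""
    # placeholder DSN; must point at a user with CREATE rights
    conn = await asyncpg.connect("postgresql://admin@localhost/db")
    await conn.execute(pathlib.Path("tipg_public.sql").read_text())
    await conn.close()


asyncio.run(install_tipg_sql())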

I do realize that this is a relatively niche issue; there won't be many users who have a read-only db connection or a db user with stripped-down rights, but sadly we are one of them. Having this merged helps us in that we don't have to maintain our own fork. Then our only remaining work is manually keeping the SQL functions up to date 🙂

@vincentsarago
Member

because Aurora doesn't allow the use of pg_temp

Ah I see 😭

I do realize that this is a relatively niche issue; there won't be many users who have a read-only db connection or a db user with stripped-down rights, but sadly we are one of them. Having this merged helps us in that we don't have to maintain our own fork. Then our only remaining work is manually keeping the SQL functions up to date 🙂

TBH you don't have to maintain your own fork; you could create a customized application using something like this 👇

import datetime
import logging
from typing import Any, Dict, List, Optional

import orjson
from buildpg import asyncpg
from fastapi import FastAPI

from tipg.collections import Catalog, Collection
from tipg.settings import PostgresSettings, TableSettings

logger = logging.getLogger(__name__)


async def get_collection_index(  # noqa: C901
    db_pool: asyncpg.BuildPgPool,
    schemas: Optional[List[str]] = None,
    tables: Optional[List[str]] = None,
    exclude_tables: Optional[List[str]] = None,
    exclude_table_schemas: Optional[List[str]] = None,
    functions: Optional[List[str]] = None,
    exclude_functions: Optional[List[str]] = None,
    exclude_function_schemas: Optional[List[str]] = None,
    spatial: bool = True,
) -> Catalog:
    """Fetch Table and Functions index."""
    schemas = schemas or ["public"]

    # Note: calls `public.tipg_catalog` instead of `pg_temp.tipg_catalog`
    query = """
        SELECT public.tipg_catalog(
            :schemas,
            :tables,
            :exclude_tables,
            :exclude_table_schemas,
            :functions,
            :exclude_functions,
            :exclude_function_schemas,
            :spatial
        );
    """  # noqa: W605

    async with db_pool.acquire() as conn:
        rows = await conn.fetch_b(
            query,
            schemas=schemas,
            tables=tables,
            exclude_tables=exclude_tables,
            exclude_table_schemas=exclude_table_schemas,
            functions=functions,
            exclude_functions=exclude_functions,
            exclude_function_schemas=exclude_function_schemas,
            spatial=spatial,
        )

        catalog: Dict[str, Collection] = {}
        table_settings = TableSettings()
        table_confs = table_settings.table_config
        fallback_key_names = table_settings.fallback_key_names

        for row in rows:
            table = row[0]
            table_id = table["schema"] + "." + table["name"]
            confid = table["schema"] + "_" + table["name"]

            # Skip the tipg catalog function itself (registered in `public` here)
            if table_id == "public.tipg_catalog":
                continue

            table_conf = table_confs.get(confid, {})

            # Make sure that any properties set in conf exist in table
            properties = sorted(table.get("properties", []), key=lambda d: d["name"])
            properties_setting = table_conf.get("properties", [])
            if properties_setting:
                properties = [p for p in properties if p["name"] in properties_setting]

            # ID Column
            id_column = table_conf.get("pk") or table.get("pk")
            if not id_column and fallback_key_names:
                for p in properties:
                    if p["name"] in fallback_key_names:
                        id_column = p["name"]
                        break

            datetime_column = None
            geometry_column = None

            for c in properties:
                if c.get("type") in ("timestamp", "timestamptz"):
                    if (
                        table_conf.get("datetimecol") == c["name"]
                        or datetime_column is None
                    ):
                        datetime_column = c

                if c.get("type") in ("geometry", "geography"):
                    if (
                        table_conf.get("geomcol") == c["name"]
                        or geometry_column is None
                    ):
                        geometry_column = c

            catalog[table_id] = Collection(
                type=table["entity"],
                id=table_id,
                table=table["name"],
                schema=table["schema"],
                description=table.get("description", None),
                id_column=id_column,
                properties=properties,
                datetime_column=datetime_column,
                geometry_column=geometry_column,
                parameters=table.get("parameters", []),
            )

        return Catalog(collections=catalog, last_updated=datetime.datetime.now())


async def register_collection_catalog(app: FastAPI, **kwargs: Any) -> None:
    """Register Table catalog."""
    app.state.collection_catalog = await get_collection_index(app.state.pool, **kwargs)


class connection_factory:
    """Connection creation."""

    schemas: List[str]

    def __init__(
        self,
        schemas: Optional[List[str]] = None,
    ) -> None:
        """Init."""
        self.schemas = schemas or []

    async def __call__(self, conn: asyncpg.Connection):
        """Create connection."""
        await conn.set_type_codec(
            "json", encoder=orjson.dumps, decoder=orjson.loads, schema="pg_catalog"
        )
        await conn.set_type_codec(
            "jsonb", encoder=orjson.dumps, decoder=orjson.loads, schema="pg_catalog"
        )

        # Note: `pg_temp` is kept at the front of the schema list for the
        # search_path, but nothing is registered in it here; the tipg
        # functions are expected to already exist in the user schemas.
        schemas = ",".join(["pg_temp", *self.schemas])
        logger.debug(f"Looking for Tables and Functions in {schemas} schemas")

        await conn.execute(
            f"""
            SELECT set_config(
                'search_path',
                '{schemas},' || current_setting('search_path', false),
                false
                );
            """
        )


async def connect_to_db(
    app: FastAPI,
    settings: Optional[PostgresSettings] = None,
    schemas: Optional[List[str]] = None,
    **kwargs,
) -> None:
    """Connect."""
    if not settings:
        settings = PostgresSettings()

    con_init = connection_factory(schemas)

    app.state.pool = await asyncpg.create_pool_b(
        settings.database_url,
        min_size=settings.db_min_conn_size,
        max_size=settings.db_max_conn_size,
        max_queries=settings.db_max_queries,
        max_inactive_connection_lifetime=settings.db_max_inactive_conn_lifetime,
        init=con_init,
        **kwargs,
    )


async def close_db_connection(app: FastAPI) -> None:
    """Close connection."""
    await app.state.pool.close()
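Wiring it into an application could then look something like this sketch (the schema name and module layout are illustrative):

from fastapi import FastAPI

# assuming the helpers above live in a local module, e.g. db.py
from db import close_db_connection, connect_to_db, register_collection_catalog

app = FastAPI()


@app.on_event("startup")
async def startup():
    """Create the pool and build the catalog from the `public` schema."""
    await connect_to_db(app, schemas=["public"])
    await register_collection_catalog(app, schemas=["public"])


@app.on_event("shutdown")
async def shutdown():
    """Close the pool."""
    await close_db_connection(app)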

I understand this is not ideal

@RemcoMeeuwissen
Contributor Author

We did go down that route, but that also ended up with us basically maintaining our own fork. So these changes, even if they're not merged, minimize the amount of work needed to keep our fork in sync with the main version, since we share the same git history and don't touch any of the actual logic.

@vincentsarago
Member

👍 let's see what @bitner thinks

@vincentsarago
Member

We talked internally with @bitner; right now we are trying to focus on making tipg work well with Postgres, and Aurora is not really Postgres, just Postgres-compatible.

I think the best way forward is to create a kind of tipg backend extension. I can start a PR to see how this would look.

Note: a backend should just provide a Catalog

import datetime
from typing import Dict, TypedDict

from tipg.collections import Collection


class Catalog(TypedDict):
    """Collection Catalog."""

    collections: Dict[str, Collection]
    last_updated: datetime.datetime

The Collection should be a subclass of tipg.collections.Collection.
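For example, a minimal backend module could look something like this sketch (MyBackendCollection and the empty catalog contents are purely illustrative):

import datetime
from typing import Dict

from tipg.collections import Catalog, Collection


class MyBackendCollection(Collection):
    """Hypothetical Collection subclass for a non-Postgres backend."""

    # backend-specific overrides (e.g. how features are fetched) would go here


def load_catalog() -> Catalog:
    """Build the Catalog this backend exposes."""
    collections: Dict[str, Collection] = {}  # filled from the backend's own source
    return Catalog(collections=collections, last_updated=datetime.datetime.now())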
