Skip to content

Commit

Permalink
Refactor Unity Catalog to fetch catalog/schema/table metadata from Sy…
Browse files Browse the repository at this point in the history
…stem tables (#1022)
  • Loading branch information
mars-lan authored Oct 29, 2024
1 parent db18615 commit 955ff89
Show file tree
Hide file tree
Showing 25 changed files with 2,334 additions and 955 deletions.
685 changes: 270 additions & 415 deletions metaphor/unity_catalog/extractor.py

Large diffs are not rendered by default.

95 changes: 72 additions & 23 deletions metaphor/unity_catalog/models.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,8 @@
from typing import Dict, List
from datetime import datetime
from typing import Dict, List, Literal, Optional

from databricks.sdk.service.catalog import ColumnInfo
from pydantic import BaseModel

from metaphor.common.fieldpath import build_schema_field
from metaphor.common.logger import get_logger
from metaphor.models.metadata_change_event import SchemaField

logger = get_logger()


def extract_schema_field_from_column_info(column: ColumnInfo) -> SchemaField:
if column.name is None or column.type_name is None:
raise ValueError(f"Invalid column {column.name}, no type_name found")

field = build_schema_field(
column.name, column.type_name.value.lower(), column.comment
)
field.precision = (
float(column.type_precision)
if column.type_precision is not None
else float("nan")
)
return field


class TableLineage(BaseModel):
upstream_tables: List[str] = []
Expand All @@ -36,3 +15,73 @@ class Column(BaseModel):

class ColumnLineage(BaseModel):
upstream_columns: Dict[str, List[Column]] = {}


class Tag(BaseModel):
key: str
value: str


class CatalogInfo(BaseModel):
catalog_name: str
owner: str
comment: Optional[str] = None
tags: List[Tag]


class SchemaInfo(BaseModel):
catalog_name: str
schema_name: str
owner: str
comment: Optional[str] = None
tags: List[Tag]


class ColumnInfo(BaseModel):
column_name: str
data_type: str
data_precision: Optional[int]
is_nullable: bool
comment: Optional[str] = None
tags: List[Tag]


class TableInfo(BaseModel):
catalog_name: str
schema_name: str
table_name: str
type: str
owner: str
comment: Optional[str] = None
created_at: datetime
created_by: str
updated_at: datetime
updated_by: str
view_definition: Optional[str] = None
storage_location: Optional[str] = None
data_source_format: str
tags: List[Tag] = []
columns: List[ColumnInfo] = []


class VolumeInfo(BaseModel):
catalog_name: str
schema_name: str
volume_name: str
volume_type: Literal["MANAGED", "EXTERNAL"]
full_name: str
owner: str
comment: Optional[str] = None
created_at: datetime
created_by: str
updated_at: datetime
updated_by: str
storage_location: str
tags: List[Tag]


class VolumeFileInfo(BaseModel):
last_updated: datetime
name: str
path: str
size: float
10 changes: 9 additions & 1 deletion metaphor/unity_catalog/profile/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from metaphor.common.entity_id import normalize_full_dataset_name
from metaphor.common.event_util import ENTITY_TYPES
from metaphor.common.fieldpath import build_field_statistics
from metaphor.common.filter import DatasetFilter
from metaphor.common.logger import get_logger
from metaphor.common.utils import safe_float
from metaphor.models.crawler_run_metadata import Platform
Expand All @@ -26,7 +27,6 @@
DatasetLogicalID,
DatasetStatistics,
)
from metaphor.unity_catalog.extractor import DEFAULT_FILTER
from metaphor.unity_catalog.profile.config import UnityCatalogProfileRunConfig
from metaphor.unity_catalog.utils import (
create_api,
Expand All @@ -36,6 +36,14 @@

logger = get_logger()

# Filter out "system" database & all "information_schema" schemas
DEFAULT_FILTER: DatasetFilter = DatasetFilter(
excludes={
"system": None,
"*": {"information_schema": None},
}
)

NON_MODIFICATION_OPERATIONS = {
"SET TBLPROPERTIES",
"ADD CONSTRAINT",
Expand Down
Loading

0 comments on commit 955ff89

Please sign in to comment.