-
Notifications
You must be signed in to change notification settings - Fork 4
/
models.py
129 lines (105 loc) · 3.2 KB
/
models.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
from datetime import datetime
from typing import Dict, List, Literal, Optional
from databricks.sql.types import Row
from pydantic import BaseModel
class TableLineage(BaseModel):
    """Table-level lineage: the upstream tables recorded for a table."""

    # Upstream table identifiers as strings; empty when none are supplied.
    upstream_tables: List[str] = []
class Column(BaseModel):
    """Reference to a column, given by its own name and its table's name."""

    # Name of the column.
    column_name: str
    # Name of the table the column is associated with.
    table_name: str
class ColumnLineage(BaseModel):
    """Column-level lineage: upstream column references grouped by string key."""

    # Maps a string key to the list of upstream columns recorded under it.
    # NOTE(review): the key is presumably the downstream column name -
    # confirm against the code that populates this model.
    upstream_columns: Dict[str, List[Column]] = {}
class Tag(BaseModel):
    """A single key/value tag."""

    key: str
    value: str

    @classmethod
    def from_row_tags(cls, tags: Optional[List[Dict]]) -> List["Tag"]:
        """Build tags from a list of raw tag dicts.

        Args:
            tags: Dicts that each carry ``tag_name`` and ``tag_value``
                entries, or ``None`` when no tags are present.

        Returns:
            One tag per input dict, in input order; an empty list when
            ``tags`` is ``None``.

        Raises:
            KeyError: If a dict lacks ``tag_name`` or ``tag_value``.
        """
        if tags is None:
            return []
        # Construct through ``cls`` (not a hard-coded ``Tag``) so that calling
        # this on a subclass yields instances of that subclass.
        return [
            cls(key=tag["tag_name"], value=tag["tag_value"])
            for tag in tags
        ]
class CatalogInfo(BaseModel):
    """Metadata describing a catalog: name, owner, comment and tags."""

    # Name of the catalog.
    catalog_name: str
    # Owner of the catalog.
    owner: str
    # Free-text comment; None when absent.
    comment: Optional[str] = None
    # Tags attached to the catalog (no default declared on this field).
    tags: List[Tag]
class SchemaInfo(BaseModel):
    """Metadata describing a schema: location, owner, comment and tags."""

    # Name of the catalog that contains the schema.
    catalog_name: str
    # Name of the schema itself.
    schema_name: str
    # Owner of the schema.
    owner: str
    # Free-text comment; None when absent.
    comment: Optional[str] = None
    # Tags attached to the schema (no default declared on this field).
    tags: List[Tag]
class ColumnInfo(BaseModel):
    """Metadata for one column: name, type, nullability, comment and tags."""

    column_name: str
    data_type: str
    data_precision: Optional[int]
    is_nullable: bool
    comment: Optional[str] = None
    tags: List[Tag]

    @classmethod
    def from_row(cls, row: Row) -> List["ColumnInfo"]:
        """Build the column models held in a row's ``columns`` entry.

        Args:
            row: Row whose ``columns`` entry is either ``None`` or an iterable
                of mappings with the keys ``column_name``, ``data_type``,
                ``data_precision``, ``is_nullable``, ``comment`` and ``tags``.

        Returns:
            One model per entry of ``row["columns"]``, in order; an empty
            list when that entry is ``None``.
        """
        # Look the entry up once instead of once for the check and once more
        # for the iteration.
        columns = row["columns"]
        if columns is None:
            return []
        # Construct through ``cls`` (not a hard-coded ``ColumnInfo``) so that
        # calling this on a subclass yields instances of that subclass.
        return [
            cls(
                column_name=column["column_name"],
                data_type=column["data_type"],
                data_precision=column["data_precision"],
                # The source value is compared against the literal "YES";
                # anything else is treated as not nullable.
                is_nullable=column["is_nullable"] == "YES",
                comment=column["comment"],
                tags=Tag.from_row_tags(column["tags"]),
            )
            for column in columns
        ]
class TableInfo(BaseModel):
    """Metadata for one table, including its tags and columns."""

    catalog_name: str
    schema_name: str
    table_name: str
    type: str
    owner: str
    comment: Optional[str] = None
    created_at: datetime
    created_by: str
    updated_at: datetime
    updated_by: str
    view_definition: Optional[str] = None
    storage_location: Optional[str] = None
    data_source_format: str
    tags: List[Tag] = []
    columns: List[ColumnInfo] = []

    @classmethod
    def from_row(cls, row: Row) -> "TableInfo":
        """Build a table model from a single row.

        Several model fields are filled from differently named row keys:
        ``type`` from ``table_type``, ``comment`` from ``table_comment`` and
        ``storage_location`` from ``storage_path``.

        Args:
            row: Row providing the keys read below, plus the ``tags`` and
                ``columns`` entries consumed by ``Tag.from_row_tags`` and
                ``ColumnInfo.from_row``.

        Returns:
            The populated table model.
        """
        # Construct through ``cls`` (not a hard-coded ``TableInfo``) so that
        # calling this on a subclass yields an instance of that subclass.
        return cls(
            catalog_name=row["catalog_name"],
            schema_name=row["schema_name"],
            table_name=row["table_name"],
            type=row["table_type"],
            owner=row["owner"],
            comment=row["table_comment"],
            created_at=row["created_at"],
            created_by=row["created_by"],
            updated_at=row["updated_at"],
            updated_by=row["updated_by"],
            data_source_format=row["data_source_format"],
            view_definition=row["view_definition"],
            storage_location=row["storage_path"],
            tags=Tag.from_row_tags(row["tags"]),
            columns=ColumnInfo.from_row(row),
        )
class VolumeInfo(BaseModel):
    """Metadata describing a volume: identity, ownership, audit and storage."""

    # --- identity ---
    catalog_name: str
    schema_name: str
    volume_name: str
    # Restricted to exactly these two literal values.
    volume_type: Literal["MANAGED", "EXTERNAL"]
    # NOTE(review): presumably the catalog/schema/volume names joined into one
    # qualified name - confirm against the producer of this model.
    full_name: str

    # --- ownership and description ---
    owner: str
    # Free-text comment; None when absent.
    comment: Optional[str] = None

    # --- audit fields ---
    created_at: datetime
    created_by: str
    updated_at: datetime
    updated_by: str

    # --- storage and tags ---
    storage_location: str
    # Tags attached to the volume (no default declared on this field).
    tags: List[Tag]
class VolumeFileInfo(BaseModel):
    """Metadata for one file: last update time, name, path and size."""

    # Timestamp of the file's last update.
    last_updated: datetime
    # File name.
    name: str
    # File path.
    path: str
    # File size, stored as a float. NOTE(review): the unit is not visible
    # here - confirm against the code that fills this model.
    size: float