-
Notifications
You must be signed in to change notification settings - Fork 0
/
cp.py
110 lines (85 loc) · 3.43 KB
/
cp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
from collections import OrderedDict
from io import IOBase
# Separator placed between the values of a row and between the column names
# listed in the definition section.
FIELD_SEP = ';'
# Name of the special section that maps each table name to its column names
# ("table=field1;field2").
# NOTE(review): the identifier is misspelled ("DEFINITON"); it is left
# unchanged here because other code refers to it by this name.
DEFINITON_TABLE_NAME = 'Definition'
class CPTable:
    """A named table made of rows, each row being a list of field values.

    Attributes are plain and public: ``name`` is the table name, ``fields``
    the list of column names and ``lines`` the list of rows in insertion
    order.
    """

    def __init__(self, name, fields=None):
        """Create an empty table.

        Args:
            name: Table name.
            fields: Optional list of column names. When given, the list is
                stored as-is (not copied); when omitted, a new empty list is
                created for this instance.
        """
        self.name = name
        # A ``[]`` default would be one list shared by every table created
        # without explicit fields, so the empty list is built per instance.
        self.fields = [] if fields is None else fields
        self.lines = []

    def add_line(self, fields):
        """Append one row to the table.

        Args:
            fields: The row, as a list of field values.

        Raises:
            ValueError: If ``fields`` is not a list.
        """
        if not isinstance(fields, list):
            raise ValueError('fields must be a list')
        self.lines.append(fields)

    def get_field(self, line_nr, field):
        """Return a single value from the row at index ``line_nr``.

        Args:
            line_nr: Index of the row in ``lines``.
            field: Either an integer column index or a column name that is
                looked up in ``fields``.

        Raises:
            IndexError: If the row or column index is out of range.
            ValueError: If ``field`` is a name that is not in ``fields``.
        """
        if isinstance(field, int):
            column = field
        else:
            column = self.get_field_index(field)
        return self.lines[line_nr][column]

    def get_field_index(self, field_name):
        """Return the position of ``field_name`` (converted with ``str()``)
        in ``fields``.

        Raises:
            ValueError: If no column has that name.
        """
        return self.fields.index(str(field_name))
class CPFile:
    """In-memory representation of a sectioned, separator-delimited table file.

    The text format consists of ``[TableName]`` header lines, each followed
    by one row per line with the values joined by ``FIELD_SEP``. A special
    section named ``DEFINITON_TABLE_NAME`` holds ``table=field1;field2``
    lines giving the column names of the other tables.

    ``tables`` maps table names to ``CPTable`` objects in insertion order.
    """

    def __init__(self, afile=None):
        """Create the object, optionally loading content right away.

        Args:
            afile: ``None`` for an empty file, an open text stream
                (``io.IOBase``) to read from, or any other truthy value,
                which is converted with ``str()`` and opened as a path.

        Raises:
            ValueError: If ``afile`` is a falsy non-stream value other than
                ``None`` (for example ``''``), or if the content contains a
                malformed definition line.
            OSError: If the path cannot be opened.
        """
        self.tables = OrderedDict()
        if afile is None:
            return
        if isinstance(afile, IOBase) or not afile:
            # Streams are read as given and stay open for the caller; falsy
            # non-stream values are rejected by load_from_stream().
            self.load_from_stream(afile)
            return
        # A file opened here is owned here, so it is closed once loaded.
        with open(str(afile), 'r') as stream:
            self.load_from_stream(stream)

    def add_table(self, table):
        """Store ``table`` under its name, replacing any table of that name."""
        self.tables[table.name] = table

    def has_table(self, name):
        """Return True if a table called ``name`` is present."""
        return name in self.tables

    def get_table(self, name, create=False):
        """Return the table called ``name``.

        Args:
            name: Table name to look up.
            create: When true, a missing table is created empty, stored and
                returned.

        Returns:
            The table, or ``None`` if it is missing and ``create`` is false.
        """
        if not self.has_table(name):
            if not create:
                return None
            self.tables[name] = CPTable(name)
        return self.tables[name]

    def delete_table(self, name):
        """Remove the table called ``name``; do nothing if it is absent."""
        if self.has_table(name):
            del self.tables[name]

    def save_to_stream(self, stream):
        """Write the definition section followed by every table.

        Nothing is written when there are no tables.

        Raises:
            ValueError: If ``stream`` is not an ``io.IOBase`` instance.
        """
        if not isinstance(stream, IOBase):
            raise ValueError('can only write to stream')
        if not self.tables:
            return
        # The definition section comes first: one "name=fields" line per table.
        stream.write('[' + DEFINITON_TABLE_NAME + ']\n')
        for name, table in self.tables.items():
            stream.write('{0}={1}\n'.format(name,
                                            FIELD_SEP.join(table.fields)))
        for table in self.tables.values():
            self.save_table_to_stream(stream, table)

    def save_table_to_stream(self, stream, table):
        """Write one table: a blank line, its ``[name]`` header, then one
        line per row with the values joined by ``FIELD_SEP``."""
        stream.write('\n[{0}]\n'.format(table.name))
        for fields in table.lines:
            stream.write('{0}\n'.format(FIELD_SEP.join(fields)))

    def load_from_stream(self, stream):
        """Read tables and their column definitions from ``stream``.

        Blank lines are ignored, as are rows that appear before the first
        section header. The definition section may appear anywhere in the
        input; its column names are applied once everything has been read,
        and definitions for tables that do not exist are dropped.

        Raises:
            ValueError: If ``stream`` is not an ``io.IOBase`` instance, or
                if a line of the definition section contains no ``=``.
        """
        if not isinstance(stream, IOBase):
            raise ValueError('can only read from streams')

        current_table = None
        field_definitions = {}
        for raw_line in stream:
            line = raw_line.strip()
            if not line:
                continue
            if line.startswith('[') and line.endswith(']'):
                current_table = CPTable(line.strip('[]'))
                # Register the table as soon as its header is seen, so a
                # section is kept even when no blank line follows it. The
                # definition section itself is never stored as a table.
                if current_table.name != DEFINITON_TABLE_NAME:
                    self.add_table(current_table)
            elif current_table is None:
                # Content before the first header belongs to no table.
                continue
            elif current_table.name != DEFINITON_TABLE_NAME:
                current_table.add_line(line.split(FIELD_SEP))
            else:
                # Split on the first '=' only, so that a '=' inside the
                # field list does not break the two-way unpacking.
                table_name, fields = line.split('=', 1)
                field_definitions[table_name] = fields.split(FIELD_SEP)

        # Assign the loaded column names afterwards, because the definition
        # section may occur anywhere in the file.
        for table_name, fields in field_definitions.items():
            table = self.get_table(table_name)
            if table is not None:
                table.fields = fields