-
Notifications
You must be signed in to change notification settings - Fork 2
/
push
executable file
·259 lines (227 loc) · 7.98 KB
/
push
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
#!/usr/bin/env python3
"""
Push a firmware set to an OT-3.
Since we build all the firmware and build a manifest here, we can enforce a
build + install and then just scp dist/applications/* to /usr/lib/firmware.
"""
import argparse
import subprocess
import os
import sys
import shutil
from itertools import chain
from zipfile import ZipFile
import tempfile
from contextlib import contextmanager
import json
import re
# Keyword arguments handed to every subprocess call unless overridden, so
# child output shows up on this process's stdout/stderr.
_DEFAULT_EXTRAS = dict(stdout=sys.stdout, stderr=sys.stderr)

# Options placed on every ssh/scp command line: skip host-key verification
# and do not record the host key in a known-hosts file.
_SSH_EXTRA_OPTS = [
    '-o', 'StrictHostKeyChecking=no',
    '-o', 'UserKnownHostsFile=/dev/null',
]

# Location of the firmware manifest on the robot's filesystem.
_ROBOT_MANIFEST_FILE_PATH = "/usr/lib/firmware/opentrons-firmware.json"

# Every target name accepted on the command line.
TARGETS = [
    "pipettes",
    "pipettes-rev1",
    "pipettes-single",
    "pipettes-multi",
    "pipettes-96",
    "gripper",
    "hepa-uv",
    "gantry",
    "gantry-x",
    "gantry-y",
    "head",
    "rear-panel",
    "bootloader",
]

# Targets that are shorthand for several subsystems, mapped to the
# subsystem names they stand for.
_MULTI_SUBSYSTEM_TARGETS = {
    "pipettes": [
        "pipettes-single",
        "pipettes-multi",
        "pipettes-96",
    ],
    "gantry": [
        "gantry-x",
        "gantry-y",
    ],
}
class CantFindUtilityException(RuntimeError):
    """Signals that a required external program could not be located.

    The name of the missing program is kept on the ``util`` attribute so
    the caller can mention it in an error message.
    """

    def __init__(self, which_util):
        # Name of the utility that was looked up and not found.
        self.util = which_util
def _ssh(ssh_util, host, cmd, **extras):
    """Run ``cmd`` on ``host`` as root through the given ssh executable.

    Extra keyword arguments are forwarded untouched to ``_cmd``.
    """
    destination = f'root@{host}'
    argv = [ssh_util, *_SSH_EXTRA_OPTS, destination, cmd]
    _cmd(argv, **extras)
def _scp_to_robot(scp_util, host, local, remote, **extras):
    """Copy the ``local`` path to ``remote`` on ``host`` (as root) via scp.

    Extra keyword arguments are forwarded untouched to ``_cmd``.
    """
    remote_spec = f'root@{host}:{remote}'
    argv = [scp_util, *_SSH_EXTRA_OPTS, local, remote_spec]
    _cmd(argv, **extras)
def _scp_from_robot(scp_util, host, local, remote, **extras):
    """Copy ``remote`` on ``host`` (as root) down to the ``local`` path.

    Note the argument order: the local destination comes before the remote
    source, mirroring ``_scp_to_robot``'s signature.
    Extra keyword arguments are forwarded untouched to ``_cmd``.
    """
    remote_spec = f'root@{host}:{remote}'
    argv = [scp_util, *_SSH_EXTRA_OPTS, remote_spec, local]
    _cmd(argv, **extras)
def _cmd(cmdlist, **extras):
    """Echo a command line, run it, and raise if it exits non-zero.

    ``extras`` are passed to ``subprocess.run`` on top of the module-wide
    defaults; a key given in ``extras`` wins over the default.
    Raises ``subprocess.CalledProcessError`` on a non-zero exit status.
    """
    run_kwargs = dict(_DEFAULT_EXTRAS)
    run_kwargs.update(extras)
    print(' '.join(cmdlist))
    completed = subprocess.run(cmdlist, **run_kwargs)
    completed.check_returncode()
@contextmanager
def _controlled_tempdir():
    """Context manager yielding the path of a fresh temporary directory.

    The directory and everything in it is removed when the ``with`` block
    exits, whether normally or through an exception.
    """
    scratch_dir = tempfile.mkdtemp()
    try:
        yield scratch_dir
    finally:
        shutil.rmtree(scratch_dir)
def _build_fw(zip_path, apps_path, targets):
    """Zip firmware images from ``apps_path`` into a new archive.

    Args:
        zip_path: path of the zip file to create (overwritten if present).
        apps_path: directory whose entries are candidate image files.
        targets: target names to include. A file is included when its name
            contains a target name followed, anywhere later in the name,
            by ``.hex`` or ``.bin``. When empty or ``None``, every entry
            of ``apps_path`` is included.

    Entries are stored under their bare file name (no directory part).
    """
    if targets:
        # Escape the target and the dot before the extension so both are
        # matched literally; an unescaped "." would accept any character
        # (e.g. "headxhex" would count as a "head" image).
        patterns = [
            re.compile(re.escape(target) + r".*\.(?:hex|bin)")
            for target in targets
        ]

        def wanted(fname):
            return any(pattern.search(fname) for pattern in patterns)
    else:
        def wanted(fname):
            return True

    with ZipFile(zip_path, 'w') as zf:
        for fname in os.listdir(apps_path):
            if wanted(fname):
                zf.write(os.path.join(apps_path, fname), fname)
def _subsystems_from_targets(targets):
    """Expand multi-subsystem presets into their individual subsystems.

    Targets are assumed to have been validated already. Each target that
    is a key of ``_MULTI_SUBSYSTEM_TARGETS`` is replaced, in position, by
    the subsystems it stands for; every other target is kept as is.

    Returns a new list; the ``targets`` argument is left unmodified, so
    the caller's list is never changed while it is being iterated.
    """
    subsystems = []
    for target in targets:
        subsystems.extend(_MULTI_SUBSYSTEM_TARGETS.get(target, [target]))
    return subsystems
def _update_shortsha(scp, host, json_data_path, targets):
    """Stamp the current git short SHA on the robot manifest's subsystems.

    The manifest is downloaded from the robot to ``json_data_path``, the
    ``shortsha`` field of each affected subsystem is set, and the file is
    uploaded back to the same location on the robot.

    Args:
        scp: path of the scp executable.
        host: robot host name or address.
        json_data_path: local scratch path used for the manifest copy.
        targets: target names; multi-subsystem presets are expanded.

    Raises:
        subprocess.CalledProcessError: if git or scp exits non-zero.
        KeyError: if a subsystem is missing from the manifest.
    """
    # NOTE(review): git runs in the current working directory, which is
    # presumably the firmware repo - confirm when invoking from elsewhere.
    shortsha = subprocess.check_output(
        ["git", "rev-parse", "--short", "HEAD"]).decode().strip()

    # copy data to local file
    _scp_from_robot(scp, host, json_data_path, _ROBOT_MANIFEST_FILE_PATH)
    with open(json_data_path, 'r') as manifest_file:
        manifest = json.load(manifest_file)

    # Pass a copy so expanding presets can never alter the caller's list.
    for target in _subsystems_from_targets(list(targets)):
        manifest['subsystems'][target]['shortsha'] = shortsha

    # Reopen in 'w' mode so the file is truncated first; rewriting in
    # place without truncation leaves stale trailing bytes whenever the
    # new JSON is shorter than the downloaded one.
    with open(json_data_path, 'w') as manifest_file:
        json.dump(manifest, manifest_file)

    # copy updated subsystem data to the robot
    _scp_to_robot(scp, host, json_data_path, _ROBOT_MANIFEST_FILE_PATH)
def _transfer_firmware(host, repo_path, scp, ssh, targets):
    """Zip the built images, send them to the robot and unpack them there.

    Images are taken from ``<repo_path>/dist/applications``. When specific
    ``targets`` are given, the robot's manifest short SHA is updated for
    them before the archive is uploaded.
    """
    robot_zip_path = '/tmp/fw.zip'
    apps_path = os.path.join(repo_path, "dist", 'applications')

    with _controlled_tempdir() as scratch:
        local_zip_path = os.path.join(scratch, 'fw.zip')
        _build_fw(local_zip_path, apps_path, targets)
        if targets:
            manifest_copy = os.path.join(scratch, 'temp_manifest.json')
            _update_shortsha(scp, host, manifest_copy, targets)
        _scp_to_robot(scp, host, local_zip_path, robot_zip_path)

    # The remaining steps only touch the copy already on the robot.
    unpack_cmd = f'unzip -o {robot_zip_path} -d /usr/lib/firmware/'
    cleanup_cmd = f'rm {robot_zip_path}'
    _ssh(ssh, host, unpack_cmd)
    _ssh(ssh, host, cleanup_cmd)
def _prep_firmware(repo_path, cmake, targets):
    """Build the firmware with cmake and install the Applications component.

    With ``targets``, each ``<target>-images`` cmake target is built in the
    ``build-cross`` tree; otherwise the full ``firmware-g4`` preset is
    built. All commands run with ``repo_path`` as working directory.
    """
    working_dir = "./build-cross"
    full_build_preset = "firmware-g4"

    if targets:
        build_cmds = [
            [cmake, '--build', 'build-cross', '--target', f'{target}-images']
            for target in targets
        ]
    else:
        build_cmds = [[
            cmake, '--build', f'--preset={full_build_preset}',
            '--target', 'firmware-applications', 'firmware-images',
        ]]

    install_cmd = [
        cmake, '--install', f'{working_dir}', '--component', 'Applications',
    ]

    # Builds first, install last; a failing command aborts the sequence.
    for argv in build_cmds + [install_cmd]:
        _cmd(argv, cwd=repo_path)
@contextmanager
def _prep_robot(host, ssh):
    """Context manager keeping the robot's root filesystem writable.

    Remounts ``/`` read-write on entry and read-only again on exit, even
    when the body raises.
    """
    make_writable = 'mount -o remount,rw /'
    make_readonly = 'mount -o remount,ro /'

    _ssh(ssh, host, make_writable)
    try:
        yield
    finally:
        _ssh(ssh, host, make_readonly)
def _find_utils():
    """Locate the ssh, scp and cmake executables.

    Returns:
        A ``(ssh, scp, cmake)`` tuple of paths as reported by
        ``shutil.which``.

    Raises:
        CantFindUtilityException: for the first utility, in that order,
            that cannot be found.
    """
    located = []
    for name in ('ssh', 'scp', 'cmake'):
        path = shutil.which(name)
        if not path:
            raise CantFindUtilityException(name)
        located.append(path)
    ssh, scp, cmake = located
    return ssh, scp, cmake
def _check_targets(targets):
    """Drop target names that are not listed in ``TARGETS``.

    A notice is printed for every rejected name. The result is a new list
    that keeps the order of the accepted names; ``targets`` itself is not
    modified. Building a new list avoids removing items from a list while
    iterating it, which skips the element following each removal and lets
    consecutive invalid names through.
    """
    valid = []
    for t in targets:
        if t in TARGETS:
            valid.append(t)
        else:
            print(f"preset {t} is not in target options, ignoring")
    return valid
def _restart_robot(host, ssh):
    """Ask systemd on the robot to restart the opentrons-robot-server unit.

    The remote command is backgrounded under ``nohup``.
    """
    restart_cmd = 'nohup systemctl restart opentrons-robot-server &'
    _ssh(ssh, host, restart_cmd)
def _do_push(host, repo_path, build, restart, targets):
    """Run the whole push: locate tools, build, transfer, then restart.

    Args:
        host: robot host name or address.
        repo_path: path of the firmware repository.
        build: build and install the firmware before transferring.
        restart: restart the robot server after transferring.
        targets: target names to limit the push to; falsy means everything.

    Exceptions from the individual steps propagate to the caller.
    """
    ssh, scp, cmake = _find_utils()

    # NOTE: if validation leaves no targets at all, the later steps treat
    # the empty list as "build and push everything".
    selected = _check_targets(targets) if targets else targets

    if build:
        _prep_firmware(repo_path, cmake, selected)

    with _prep_robot(host, ssh):
        _transfer_firmware(host, repo_path, scp, ssh, selected)

    if restart:
        _restart_robot(host, ssh)
def push(host, repo_path=None, build=True, restart=True, targets=None):
    """Build firmware and push it to the robot at ``host``.

    Args:
        host: robot host name or address.
        repo_path: path of the firmware repository; when not given, the
            directory containing this file is used.
        build: build and install the firmware before transferring.
        restart: restart the robot server after transferring.
        targets: optional iterable of target names to limit the push to;
            ``None`` or empty means everything.

    Returns:
        0 on success, -1 when a subprocess failed or a required utility
        is missing (a message is printed to stderr in both cases).
    """
    # os.path.dirname, not os.dirname (which does not exist); abspath keeps
    # the result usable when __file__ is a bare relative file name.
    repo = repo_path or os.path.dirname(os.path.abspath(__file__))
    # Work on a private list so neither a shared default nor the caller's
    # list can be modified further down.
    wanted = list(targets) if targets else []
    try:
        _do_push(host, repo, build, restart, wanted)
        return 0
    except subprocess.CalledProcessError as e:
        print(
            '{cmd}: {returncode}'.format(cmd=e.cmd[0], returncode=e.returncode),
            file=sys.stderr)
        return -1
    except CantFindUtilityException as e:
        print(
            'Could not find {util}. Is it installed?'.format(util=e.util),
            file=sys.stderr)
        return -1
def _push_from_argparse(args):
    """Translate parsed command-line arguments into a ``push`` call.

    When a private key was given it is added to the module-wide ssh/scp
    options, so every later ssh/scp command uses it. Returns the status
    code produced by ``push``.
    """
    if args.key:
        _SSH_EXTRA_OPTS.extend(['-i', args.key])

    repo = os.path.abspath(args.repo_path)
    do_build = not args.no_build
    do_restart = not args.no_restart
    return push(args.host, repo, do_build, do_restart, args.targets)
def _arg_parser(parent=None):
    """Build the command-line parser for the push tool.

    Args:
        parent: optional ``argparse.ArgumentParser`` whose arguments are
            inherited through the ``parents`` mechanism.

    Returns:
        The configured ``argparse.ArgumentParser``.
    """
    parents = [parent] if parent else []
    parser = argparse.ArgumentParser(
        prog='push', description='Push firmware to an OT-3', parents=parents)
    parser.add_argument(
        'host', type=str, help='The host (e.g. IP) of the robot to push to')
    parser.add_argument(
        '--repo-path',
        type=str,
        default='.',
        # The default is '.', so the help must describe the current
        # working directory rather than the directory holding this file.
        help=('The path to the ot3-firmware repo; if not specified, the '
              'current working directory is used'))
    parser.add_argument(
        '--no-build',
        action='store_true',
        help='Skip building firmware'
    )
    parser.add_argument(
        '--no-restart',
        action='store_true',
        help='Skip restarting robot server'
    )
    parser.add_argument(
        '--key',
        type=str,
        help='Private SSH key to use'
    )
    parser.add_argument(
        '--targets',
        nargs='*',
        help=('Firmware targets to build and push (e.g. head gripper); '
              'if omitted, all firmware is built and pushed')
    )
    return parser
def _main():
    """Parse the process's command-line arguments and run the push.

    Returns the status code to exit with.
    """
    cli_args = _arg_parser().parse_args()
    return _push_from_argparse(cli_args)
# Script entry point: exit with the status code returned by _main().
if __name__ == '__main__':
    raise SystemExit(_main())