Skip to content

Commit

Permalink
core: autopilot-manager: Add flight_controller module
Browse files Browse the repository at this point in the history
* Move flight controller / flight controller detector to its own module
  as preparation for platform spliting
  • Loading branch information
JoaoMario109 committed Oct 8, 2024
1 parent 05c99de commit b941c8f
Show file tree
Hide file tree
Showing 20 changed files with 134 additions and 123 deletions.
3 changes: 2 additions & 1 deletion core/services/ardupilot_manager/api/v1/routers/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

from autopilot_manager import AutoPilotManager
from exceptions import InvalidFirmwareFile
from typedefs import Firmware, FlightController, Parameters, Serial, SITLFrame, Vehicle
from flight_controller import FlightController
from typedefs import Firmware, Parameters, Serial, SITLFrame, Vehicle

index_router_v1 = APIRouter(
tags=["index_v1"],
Expand Down
24 changes: 10 additions & 14 deletions core/services/ardupilot_manager/autopilot_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,19 @@
NoPreferredBoardSet,
)
from firmware.FirmwareManagement import FirmwareManager
from flight_controller_detector.Detector import Detector as BoardDetector
from flight_controller_detector.linux.linux_boards import LinuxFlightController
from mavlink_proxy.Endpoint import Endpoint, EndpointType
from mavlink_proxy.exceptions import EndpointAlreadyExists
from mavlink_proxy.Manager import Manager as MavlinkManager
from settings import Settings
from typedefs import (
Firmware,
from flight_controller import (
FlightController,
FlightControllerFlags,
Parameters,
Platform,
PlatformType,
Serial,
SITLFrame,
Vehicle,
)
from flight_controller.detector import FlightControllerDetector
from flight_controller.detector.linux.linux_boards import LinuxFlightController
from mavlink_proxy.Endpoint import Endpoint, EndpointType
from mavlink_proxy.exceptions import EndpointAlreadyExists
from mavlink_proxy.Manager import Manager as MavlinkManager
from settings import Settings
from typedefs import Firmware, Parameters, Serial, SITLFrame, Vehicle


class AutoPilotManager(metaclass=Singleton):
Expand Down Expand Up @@ -309,7 +305,7 @@ def get_available_routers(self) -> List[str]:
return [router.name() for router in self.mavlink_manager.available_interfaces()]

async def start_sitl(self) -> None:
self._current_board = BoardDetector.detect_sitl()
self._current_board = FlightControllerDetector.detect_sitl()
if not self.firmware_manager.is_firmware_installed(self._current_board):
self.firmware_manager.install_firmware_from_params(Vehicle.Sub, self._current_board)
frame = self.load_sitl_frame()
Expand Down Expand Up @@ -419,7 +415,7 @@ async def start_mavlink_manager(self, device: Endpoint) -> None:

@staticmethod
async def available_boards(include_bootloaders: bool = False) -> List[FlightController]:
all_boards = await BoardDetector.detect(True)
all_boards = await FlightControllerDetector.detect(True)
if include_bootloaders:
return all_boards
return [board for board in all_boards if FlightControllerFlags.is_bootloader not in board.flags]
Expand Down
3 changes: 2 additions & 1 deletion core/services/ardupilot_manager/firmware/FirmwareDownload.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
NoCandidate,
NoVersionAvailable,
)
from typedefs import FirmwareFormat, Platform, PlatformType, Vehicle
from flight_controller import Platform, PlatformType
from typedefs import FirmwareFormat, Vehicle

# TODO: This should be not necessary
# Disable SSL verification
Expand Down
3 changes: 2 additions & 1 deletion core/services/ardupilot_manager/firmware/FirmwareInstall.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
from exceptions import FirmwareInstallFail, InvalidFirmwareFile, UnsupportedPlatform
from firmware.FirmwareDownload import FirmwareDownloader
from firmware.FirmwareUpload import FirmwareUploader
from typedefs import FirmwareFormat, FlightController, Platform, PlatformType
from flight_controller import FlightController, Platform, PlatformType
from typedefs import FirmwareFormat


def get_board_id(platform: Platform) -> int:
Expand Down
11 changes: 2 additions & 9 deletions core/services/ardupilot_manager/firmware/FirmwareManagement.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,8 @@
)
from firmware.FirmwareDownload import FirmwareDownloader
from firmware.FirmwareInstall import FirmwareInstaller
from typedefs import (
Firmware,
FirmwareFormat,
FlightController,
Parameters,
Platform,
PlatformType,
Vehicle,
)
from flight_controller import FlightController, Platform, PlatformType
from typedefs import Firmware, FirmwareFormat, Parameters, Vehicle


class FirmwareManager:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
import pytest

from firmware.FirmwareDownload import FirmwareDownloader
from typedefs import Platform, Vehicle
from flight_controller import Platform
from typedefs import Vehicle


def test_static() -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from exceptions import InvalidFirmwareFile
from firmware.FirmwareDownload import FirmwareDownloader
from firmware.FirmwareInstall import FirmwareInstaller
from typedefs import FlightController, Platform, Vehicle
from flight_controller import FlightController, Platform
from typedefs import Vehicle


def test_firmware_validation() -> None:
Expand Down
8 changes: 8 additions & 0 deletions core/services/ardupilot_manager/flight_controller/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from flight_controller.flight_controller import (
FlightController,
FlightControllerFlags,
Platform,
PlatformType,
)

__all__ = ["FlightController", "FlightControllerFlags", "Platform", "PlatformType"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from flight_controller.detector.detector import FlightControllerDetector

__all__ = ["FlightControllerDetector"]
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from pydantic import BaseModel

from typedefs import Platform
from flight_controller import Platform


class SerialAttr(str, Enum):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
from commonwealth.utils.general import is_running_as_root
from serial.tools.list_ports_linux import SysFS, comports

from flight_controller_detector.board_identification import identifiers
from flight_controller_detector.linux.detector import LinuxFlightControllerDetector
from typedefs import FlightController, FlightControllerFlags, Platform
from flight_controller import FlightController, FlightControllerFlags, Platform
from flight_controller.detector.board_identification import identifiers
from flight_controller.detector.linux.detector import LinuxFlightControllerDetector


class Detector:
class FlightControllerDetector:
@classmethod
async def detect_linux_board(cls) -> Optional[FlightController]:
for _i in range(5):
Expand Down Expand Up @@ -59,16 +59,16 @@ def detect_serial_flight_controllers() -> List[FlightController]:
FlightController(
name=port.product or port.name,
manufacturer=port.manufacturer,
platform=Detector.detect_serial_platform(port)
platform=FlightControllerDetector.detect_serial_platform(port)
or Platform(), # this is just to make CI happy. check line 82
path=port.device,
)
for port in unique_serial_devices
if Detector.detect_serial_platform(port) is not None
if FlightControllerDetector.detect_serial_platform(port) is not None
]
for port in unique_serial_devices:
for board in boards:
if board.path == port.device and Detector.is_serial_bootloader(port):
if board.path == port.device and FlightControllerDetector.is_serial_bootloader(port):
board.flags.append(FlightControllerFlags.is_bootloader)
return boards

Expand Down Expand Up @@ -97,6 +97,6 @@ async def detect(cls, include_sitl: bool = True) -> List[FlightController]:
available.extend(cls().detect_serial_flight_controllers())

if include_sitl:
available.append(Detector.detect_sitl())
available.append(FlightControllerDetector.detect_sitl())

return available
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from flight_controller_detector.linux.navigator import NavigatorPi4
from typedefs import Platform
from flight_controller import Platform
from flight_controller.detector.linux.navigator import NavigatorPi4


class Argonot(NavigatorPi4):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

from loguru import logger

from flight_controller_detector.linux.argonot import Argonot
from flight_controller_detector.linux.linux_boards import LinuxFlightController
from flight_controller_detector.linux.navigator import NavigatorPi4, NavigatorPi5
from flight_controller.detector.linux.argonot import Argonot
from flight_controller.detector.linux.linux_boards import LinuxFlightController
from flight_controller.detector.linux.navigator import NavigatorPi4, NavigatorPi5


class LinuxFlightControllerDetector:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

from smbus2 import SMBus

from typedefs import FlightController, PlatformType, Serial
from flight_controller import FlightController, PlatformType
from typedefs import Serial


class LinuxFlightController(FlightController):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@

from commonwealth.utils.commands import load_file

from flight_controller_detector.linux.linux_boards import LinuxFlightController
from typedefs import Platform, Serial
from flight_controller import Platform
from flight_controller.detector.linux.linux_boards import LinuxFlightController
from typedefs import Serial


class Navigator(LinuxFlightController):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from enum import Enum, auto
from platform import machine
from typing import List, Optional

from pydantic import BaseModel


def get_sitl_platform_name(machine_arch: str) -> str:
"""Get SITL platform name based on machine architecture."""

if "arm" not in machine_arch.lower() and "aarch" not in machine_arch.lower():
return "SITL_x86_64_linux_gnu"
return "SITL_arm_linux_gnueabihf"


# TODO: This class can be deprecated once we move to Python 3.11, which introduces the equivalent StrEnum
class LowerStringEnum(str, Enum):
def __str__(self) -> str:
return self.name.lower()


class PlatformType(LowerStringEnum):
Serial = auto()
Linux = auto()
SITL = auto()
Unknown = auto()


class Platform(str, Enum):
"""Valid Ardupilot platform types.
The Enum values are 1:1 representations of the platforms available on the ArduPilot manifest."""

Pixhawk1 = "Pixhawk1"
Pixhawk4 = "Pixhawk4"
Pixhawk6X = "Pixhawk6X"
Pixhawk6C = "Pixhawk6C"
CubeOrange = "CubeOrange"
GenericSerial = "GenericSerial"
Navigator = "navigator"
Argonot = "argonot"
SITL = get_sitl_platform_name(machine())

@property
def type(self) -> PlatformType:
platform_types = {
Platform.Pixhawk1: PlatformType.Serial,
Platform.Pixhawk4: PlatformType.Serial,
Platform.Pixhawk6X: PlatformType.Serial,
Platform.Pixhawk6C: PlatformType.Serial,
Platform.CubeOrange: PlatformType.Serial,
Platform.GenericSerial: PlatformType.Serial,
Platform.Navigator: PlatformType.Linux,
Platform.Argonot: PlatformType.Linux,
Platform.SITL: PlatformType.SITL,
}
return platform_types.get(self, PlatformType.Unknown)


class FlightControllerFlags(str, Enum):
"""Flags for the Flight-controller class."""

is_bootloader = "is_bootloader"


class FlightController(BaseModel):
"""Flight-controller board."""

name: str
manufacturer: Optional[str]
platform: Platform
path: Optional[str]
flags: List[FlightControllerFlags] = []

@property
def type(self) -> PlatformType:
return self.platform.type
Empty file.
4 changes: 2 additions & 2 deletions core/services/ardupilot_manager/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from args import CommandLineArgs
from autopilot_manager import AutoPilotManager
from flight_controller_detector.Detector import Detector as BoardDetector
from flight_controller.detector.detector import FlightControllerDetector
from settings import SERVICE_NAME

logging.basicConfig(handlers=[InterceptHandler()], level=0)
Expand All @@ -36,7 +36,7 @@
server = Server(config)

if args.sitl:
autopilot.set_preferred_board(BoardDetector.detect_sitl())
autopilot.set_preferred_board(FlightControllerDetector.detect_sitl())
try:
loop.run_until_complete(autopilot.start_ardupilot())
except Exception as start_error:
Expand Down
Loading

0 comments on commit b941c8f

Please sign in to comment.