Skip to content

Commit

Permalink
Implement RPC API #sonar
Browse files Browse the repository at this point in the history
  • Loading branch information
pavel-kirienko committed Aug 10, 2023
1 parent c03b3ee commit c3f881e
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 10 deletions.
147 changes: 145 additions & 2 deletions libudpard/udpard.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ typedef struct

#define DATA_SPECIFIER_SERVICE_NOT_MESSAGE_MASK 0x8000U
#define DATA_SPECIFIER_SERVICE_REQUEST_NOT_RESPONSE_MASK 0x4000U
#define DATA_SPECIFIER_SERVICE_ID_MASK 0x3FFFU

#define HEADER_SIZE_BYTES 24U
#define HEADER_VERSION 1U
Expand Down Expand Up @@ -1638,6 +1639,21 @@ static inline void rxPortFree(struct UdpardRxPort* const self, const struct Udpa
self->sessions = NULL;
}

static inline int_fast8_t rxRPCSearch(void* const user_reference, // NOSONAR Cavl API requires non-const.
const struct UdpardTreeNode* node)
{
UDPARD_ASSERT((user_reference != NULL) && (node != NULL));
return compare32(((const struct UdpardRxRPC*) user_reference)->service_id,
((const struct UdpardRxRPC*) node)->service_id);
}

static inline int_fast8_t rxRPCSearchByServiceID(void* const user_reference, // NOSONAR Cavl API requires non-const.
const struct UdpardTreeNode* node)
{
UDPARD_ASSERT((user_reference != NULL) && (node != NULL));
return compare32(*(const UdpardPortID*) user_reference, ((const struct UdpardRxRPC*) node)->service_id);
}

// -------------------------------------------------- RX API --------------------------------------------------

void udpardRxFragmentFree(const struct UdpardFragment head,
Expand Down Expand Up @@ -1681,16 +1697,143 @@ int_fast8_t udpardRxSubscriptionReceive(struct UdpardRxSubscription* const self,
const uint_fast8_t redundant_iface_index,
struct UdpardRxTransfer* const out_transfer)
{
int_fast8_t result = -UDPARD_ERROR_ARGUMENT;
bool release = true;
int_fast8_t result = -UDPARD_ERROR_ARGUMENT;
if ((self != NULL) && (timestamp_usec != TIMESTAMP_UNSET) && (datagram_payload.data != NULL) &&
(redundant_iface_index < UDPARD_NETWORK_INTERFACE_COUNT_MAX) && (out_transfer != NULL))
{
result = rxPortAcceptFrame(&self->port,
result = rxPortAcceptFrame(&self->port,
redundant_iface_index,
timestamp_usec,
datagram_payload,
self->memory,
out_transfer);
release = false;
}
if ((self != NULL) && release)
{
memFreePayload(self->memory.payload, datagram_payload);
}
return result;
}

int_fast8_t udpardRxRPCDispatcherInit(struct UdpardRxRPCDispatcher* const self,
const UdpardNodeID local_node_id,
const struct UdpardRxMemoryResources memory)
{
int_fast8_t result = -UDPARD_ERROR_ARGUMENT;
if ((self != NULL) && (local_node_id <= UDPARD_NODE_ID_MAX) && rxValidateMemoryResources(memory))
{
memZero(sizeof(*self), self);
self->udp_ip_endpoint = makeServiceUDPIPEndpoint(local_node_id);
self->memory = memory;
self->request_ports = NULL;
self->response_ports = NULL;
result = 0;
}
return result;
}

int_fast8_t udpardRxRPCDispatcherListen(struct UdpardRxRPCDispatcher* const self,
struct UdpardRxRPC* const service,
const UdpardPortID service_id,
const bool is_request,
const size_t extent)
{
int_fast8_t result = -UDPARD_ERROR_ARGUMENT;
if ((self != NULL) && (service != NULL) && (service_id <= UDPARD_SERVICE_ID_MAX))
{
const int_fast8_t cancel_result = udpardRxRPCDispatcherCancel(self, service_id, is_request);
UDPARD_ASSERT((cancel_result == 0) || (cancel_result == 1)); // We already checked the arguments.
memZero(sizeof(*service), service);
service->service_id = service_id;
rxPortInit(&service->port);
service->port.extent = extent;
service->user_reference = NULL;
// Insert the newly initialized service into the tree.
const struct UdpardTreeNode* const item = cavlSearch(is_request ? &self->request_ports : &self->response_ports,
service,
&rxRPCSearch,
&avlTrivialFactory);
UDPARD_ASSERT((item != NULL) && (item == &service->base));
(void) item;
result = (cancel_result > 0) ? 0 : 1;
}
return result;
}

int_fast8_t udpardRxRPCDispatcherCancel(struct UdpardRxRPCDispatcher* const self,
const UdpardPortID service_id,
const bool is_request)
{
int_fast8_t result = -UDPARD_ERROR_ARGUMENT;
if ((self != NULL) && (service_id <= UDPARD_SERVICE_ID_MAX))
{
UdpardPortID service_id_mutable = service_id;
struct UdpardTreeNode** const root = is_request ? &self->request_ports : &self->response_ports;
struct UdpardRxRPC* const item =
(struct UdpardRxRPC*) cavlSearch(root, &service_id_mutable, &rxRPCSearchByServiceID, NULL);
if (item != NULL)
{
cavlRemove(root, &item->base);
rxPortFree(&item->port, self->memory);
}
result = (item == NULL) ? 0 : 1;
}
return result;
}

int_fast8_t udpardRxRPCDispatcherReceive(struct UdpardRxRPCDispatcher* const self,
const UdpardMicrosecond timestamp_usec,
const struct UdpardMutablePayload datagram_payload,
const uint_fast8_t redundant_iface_index,
struct UdpardRxRPC** const out_service,
struct UdpardRxRPCTransfer* const out_transfer)
{
bool release = true;
int_fast8_t result = -UDPARD_ERROR_ARGUMENT;
if ((self != NULL) && (timestamp_usec != TIMESTAMP_UNSET) && (datagram_payload.data != NULL) &&
(redundant_iface_index < UDPARD_NETWORK_INTERFACE_COUNT_MAX) && (out_transfer != NULL))
{
result = 0; // Invalid frames cannot complete a transfer, so zero is the new default.
RxFrame frame = {0};
if (rxParseFrame(datagram_payload, &frame) &&
((frame.meta.data_specifier & DATA_SPECIFIER_SERVICE_NOT_MESSAGE_MASK) != 0))
{
// Service transfers cannot be anonymous. This is enforced by the rxParseFrame function; we re-check this.
UDPARD_ASSERT(frame.meta.src_node_id != UDPARD_NODE_ID_UNSET);
// Extract the data specifier from the frame. Update the transfer object even if no transfer is completed.
out_transfer->is_request =
(frame.meta.data_specifier & DATA_SPECIFIER_SERVICE_REQUEST_NOT_RESPONSE_MASK) != 0;
out_transfer->service_id = frame.meta.data_specifier & DATA_SPECIFIER_SERVICE_ID_MASK;
// Search for the RPC-port that is registered for this service transfer in the tree.
struct UdpardRxRPC* const item =
(struct UdpardRxRPC*) cavlSearch(out_transfer->is_request ? &self->request_ports
: &self->response_ports,
&out_transfer->service_id,
&rxRPCSearchByServiceID,
NULL);
// If such a port is found, accept the frame on it.
if (item != NULL)
{
result = rxPortAccept(&item->port,
redundant_iface_index,
timestamp_usec,
frame,
self->memory,
&out_transfer->base);
release = false;
}
// Expose the RPC-service instance to the caller if requested.
if (out_service != NULL)
{
*out_service = item;
}
}
}
if ((self != NULL) && release)
{
memFreePayload(self->memory.payload, datagram_payload);
}
return result;
}
Expand Down
22 changes: 14 additions & 8 deletions libudpard/udpard.h
Original file line number Diff line number Diff line change
Expand Up @@ -924,8 +924,8 @@ struct UdpardRxRPCDispatcher
struct UdpardRxMemoryResources memory;

/// READ-ONLY
struct UdpardRxRPC* request_ports;
struct UdpardRxRPC* response_ports;
struct UdpardTreeNode* request_ports;
struct UdpardTreeNode* response_ports;
};

/// Represents a received Cyphal RPC-service transfer -- either request or response.
Expand Down Expand Up @@ -956,6 +956,10 @@ struct UdpardRxRPCTransfer
/// udpardRxRPCDispatcherReceive, along with the index of the redundant interface
/// the datagram was received on. Only those services that were announced in step 4 will be processed.
///
/// There is no resource deallocation function ("free") for the RPC dispatcher. This is because the dispatcher
/// does not own any resources. To dispose of a dispatcher safely, the application shall invoke
/// udpardRxRPCDispatcherCancel for each RPC-service port on that dispatcher.
///
/// The return value is 0 on success.
/// The return value is a negated UDPARD_ERROR_ARGUMENT if any of the input arguments are invalid.
///
Expand All @@ -964,11 +968,6 @@ int_fast8_t udpardRxRPCDispatcherInit(struct UdpardRxRPCDispatcher* const self,
const UdpardNodeID local_node_id,
const struct UdpardRxMemoryResources memory);

/// Frees all memory held by the RPC-service dispatcher instance.
/// After invoking this function, the instance is no longer usable.
/// Do not forget to close the sockets that were opened for this instance.
void udpardRxRPCDispatcherFree(struct UdpardRxRPCDispatcher* const self);

/// This function lets the application register its interest in a particular service-ID and kind (request/response)
/// by creating an RPC-service RX port. The service pointer shall retain validity until its unregistration or until
/// the dispatcher is destroyed. The service instance shall not be moved or destroyed.
Expand Down Expand Up @@ -1013,11 +1012,18 @@ int_fast8_t udpardRxRPCDispatcherCancel(struct UdpardRxRPCDispatcher* const self
/// Datagrams received from the sockets of this service dispatcher are fed into this function.
/// It is the analog of udpardRxSubscriptionReceive for RPC-service transfers.
/// Please refer to the documentation of udpardRxSubscriptionReceive for the usage information.
///
/// The "out_service" pointer-to-pointer can be used to retrieve the specific UdpardRxRPC instance that was used to
/// process the received transfer. Remember that each UdpardRxRPC instance has a user reference field,
/// which in combination with this feature can be used to construct OOP interfaces on top of the library.
/// If this is not needed, the pointer-to-pointer can be NULL.
///
/// The memory pointed to by out_transfer may be mutated arbitrarily if no transfer is completed.
int_fast8_t udpardRxRPCDispatcherReceive(struct UdpardRxRPCDispatcher* const self,
struct UdpardRxRPC** const service,
const UdpardMicrosecond timestamp_usec,
const struct UdpardMutablePayload datagram_payload,
const uint_fast8_t redundant_iface_index,
struct UdpardRxRPC** const out_service,
struct UdpardRxRPCTransfer* const out_transfer);

// =====================================================================================================================
Expand Down

0 comments on commit c3f881e

Please sign in to comment.