Skip to content

Commit

Permalink
Implement subscription API functions (#46)
Browse files Browse the repository at this point in the history
It is now possible to subscribe to subjects and receive data from them.
The only missing bit is the RPC services API, all other functionality of
the library is already implemented and can be used.

This changeset also brings `udpardGather` that, well, gathers a
fragmented payload buffer into one contiguous chunk of memory that can
be passed into Nunavut. Later we should amend Nunavut to support
fragmented buffers directly to eliminate extra data copying and memory
utilization.

This changeset also fixes an internal documentation error that provided
an incorrect estimation of the worst-case recursion depth.
  • Loading branch information
pavel-kirienko authored Aug 10, 2023
1 parent c92f8df commit c03b3ee
Show file tree
Hide file tree
Showing 8 changed files with 399 additions and 22 deletions.
107 changes: 87 additions & 20 deletions libudpard/udpard.c
Original file line number Diff line number Diff line change
Expand Up @@ -787,10 +787,18 @@ static inline bool rxParseFrame(const struct UdpardMutablePayload datagram_paylo
const bool service = (out->meta.data_specifier & DATA_SPECIFIER_SERVICE_NOT_MESSAGE_MASK) != 0;
const bool single_frame = (out->base.index == 0) && out->base.end_of_transfer;
ok = service ? ((!broadcast) && (!anonymous)) : (broadcast && ((!anonymous) || single_frame));
ok = ok && (out->meta.transfer_id != TRANSFER_ID_UNSET);
}
return ok;
}

static inline bool rxValidateMemoryResources(const struct UdpardRxMemoryResources memory)
{
return (memory.session.allocate != NULL) && (memory.session.deallocate != NULL) &&
(memory.fragment.allocate != NULL) && (memory.fragment.deallocate != NULL) &&
(memory.payload.deallocate != NULL);
}

/// This helper is needed to minimize the risk of argument swapping when passing these two resources around,
/// as they almost always go side by side.
typedef struct
Expand Down Expand Up @@ -886,7 +894,7 @@ struct UdpardInternalRxSession

/// Frees all fragments in the tree and their payload buffers. Destroys the passed fragment.
/// This is meant to be invoked on the root of the tree.
/// The maximum recursion depth is ceil(1.44*log2(FRAME_INDEX_MAX+1)-0.328) = 22 levels.
/// The maximum recursion depth is ceil(1.44*log2(FRAME_INDEX_MAX+1)-0.328) = 45 levels.
// NOLINTNEXTLINE(misc-no-recursion) MISRA C:2012 rule 17.2
static inline void rxFragmentDestroyTree(RxFragment* const self, const RxMemory memory)
{
Expand Down Expand Up @@ -996,7 +1004,7 @@ typedef struct
} RxSlotEjectContext;

/// See rxSlotEject() for details.
/// The maximum recursion depth is ceil(1.44*log2(FRAME_INDEX_MAX+1)-0.328) = 22 levels.
/// The maximum recursion depth is ceil(1.44*log2(FRAME_INDEX_MAX+1)-0.328) = 45 levels.
/// NOLINTNEXTLINE(misc-no-recursion) MISRA C:2012 rule 17.2
static inline void rxSlotEjectFragment(RxFragment* const frag, RxSlotEjectContext* const ctx)
{
Expand Down Expand Up @@ -1464,20 +1472,23 @@ static inline void rxSessionInit(struct UdpardInternalRxSession* const self, con
static inline void rxSessionDestroyTree(struct UdpardInternalRxSession* const self,
const struct UdpardRxMemoryResources memory)
{
for (uint_fast8_t i = 0; i < UDPARD_NETWORK_INTERFACE_COUNT_MAX; i++)
{
rxIfaceFree(&self->ifaces[i], (RxMemory){.fragment = memory.fragment, .payload = memory.payload});
}
for (uint_fast8_t i = 0; i < 2; i++)
if (self != NULL)
{
struct UdpardInternalRxSession* const child = (struct UdpardInternalRxSession*) (void*) self->base.lr[i];
if (child != NULL)
for (uint_fast8_t i = 0; i < UDPARD_NETWORK_INTERFACE_COUNT_MAX; i++)
{
rxIfaceFree(&self->ifaces[i], (RxMemory){.fragment = memory.fragment, .payload = memory.payload});
}
for (uint_fast8_t i = 0; i < 2; i++)
{
UDPARD_ASSERT(child->base.up == &self->base);
rxSessionDestroyTree(child, memory); // NOSONAR recursion
struct UdpardInternalRxSession* const child = (struct UdpardInternalRxSession*) (void*) self->base.lr[i];
if (child != NULL)
{
UDPARD_ASSERT(child->base.up == &self->base);
rxSessionDestroyTree(child, memory); // NOSONAR recursion
}
}
memFree(memory.session, sizeof(struct UdpardInternalRxSession), self);
}
memFree(memory.session, sizeof(struct UdpardInternalRxSession), self);
}

// -------------------------------------------------- RX PORT --------------------------------------------------
Expand Down Expand Up @@ -1624,6 +1635,7 @@ static inline void rxPortInit(struct UdpardRxPort* const self)
static inline void rxPortFree(struct UdpardRxPort* const self, const struct UdpardRxMemoryResources memory)
{
rxSessionDestroyTree(self->sessions, memory);
self->sessions = NULL;
}

// -------------------------------------------------- RX API --------------------------------------------------
Expand All @@ -1642,12 +1654,67 @@ int_fast8_t udpardRxSubscriptionInit(struct UdpardRxSubscription* const self,
const size_t extent,
const struct UdpardRxMemoryResources memory)
{
(void) self;
(void) subject_id;
(void) extent;
(void) memory;
(void) &rxPortAcceptFrame;
(void) &rxPortInit;
(void) &rxPortFree;
return 0;
int_fast8_t result = -UDPARD_ERROR_ARGUMENT;
if ((self != NULL) && (subject_id <= UDPARD_SUBJECT_ID_MAX) && rxValidateMemoryResources(memory))
{
memZero(sizeof(*self), self);
rxPortInit(&self->port);
self->port.extent = extent;
self->udp_ip_endpoint = makeSubjectUDPIPEndpoint(subject_id);
self->memory = memory;
result = 0;
}
return result;
}

void udpardRxSubscriptionFree(struct UdpardRxSubscription* const self)
{
if (self != NULL)
{
rxPortFree(&self->port, self->memory);
}
}

int_fast8_t udpardRxSubscriptionReceive(struct UdpardRxSubscription* const self,
const UdpardMicrosecond timestamp_usec,
const struct UdpardMutablePayload datagram_payload,
const uint_fast8_t redundant_iface_index,
struct UdpardRxTransfer* const out_transfer)
{
int_fast8_t result = -UDPARD_ERROR_ARGUMENT;
if ((self != NULL) && (timestamp_usec != TIMESTAMP_UNSET) && (datagram_payload.data != NULL) &&
(redundant_iface_index < UDPARD_NETWORK_INTERFACE_COUNT_MAX) && (out_transfer != NULL))
{
result = rxPortAcceptFrame(&self->port,
redundant_iface_index,
timestamp_usec,
datagram_payload,
self->memory,
out_transfer);
}
return result;
}

// =====================================================================================================================
// ==================================================== MISC =====================================================
// =====================================================================================================================

size_t udpardGather(const struct UdpardFragment head, const size_t destination_size_bytes, void* const destination)
{
size_t offset = 0;
if (NULL != destination)
{
const struct UdpardFragment* frag = &head;
while ((frag != NULL) && (offset < destination_size_bytes))
{
UDPARD_ASSERT(frag->view.data != NULL);
const size_t frag_size = smaller(frag->view.size, destination_size_bytes - offset);
// NOLINTNEXTLINE(clang-analyzer-security.insecureAPI.DeprecatedOrUnsafeBufferHandling)
(void) memmove(((byte_t*) destination) + offset, frag->view.data, frag_size);
offset += frag_size;
UDPARD_ASSERT(offset <= destination_size_bytes);
frag = frag->next;
}
}
return offset;
}
19 changes: 17 additions & 2 deletions libudpard/udpard.h
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,7 @@ int_fast8_t udpardRxSubscriptionInit(struct UdpardRxSubscription* const self,

/// Frees all memory held by the subscription instance.
/// After invoking this function, the instance is no longer usable.
/// The function has no effect if the instance is NULL.
/// Do not forget to close the sockets that were opened for this subscription.
void udpardRxSubscriptionFree(struct UdpardRxSubscription* const self);

Expand Down Expand Up @@ -872,8 +873,6 @@ void udpardRxSubscriptionFree(struct UdpardRxSubscription* const self);
/// No data copy takes place. Malformed frames are discarded in constant time.
/// Linear time is spent on the CRC verification of the transfer payload when the transfer is complete.
///
/// This function performs log(n) of recursive calls internally, where n is the number of frames in a transfer.
///
/// UDPARD_ERROR_MEMORY is returned if the function fails to allocate memory.
/// UDPARD_ERROR_ARGUMENT is returned if any of the input arguments are invalid.
int_fast8_t udpardRxSubscriptionReceive(struct UdpardRxSubscription* const self,
Expand Down Expand Up @@ -1021,6 +1020,22 @@ int_fast8_t udpardRxRPCDispatcherReceive(struct UdpardRxRPCDispatcher* const sel
const uint_fast8_t redundant_iface_index,
struct UdpardRxRPCTransfer* const out_transfer);

// =====================================================================================================================
// ==================================================== MISC =====================================================
// =====================================================================================================================

/// This helper function takes the head of a fragmented buffer list and copies the data into the contiguous buffer
/// provided by the user. If the total size of all fragments combined exceeds the size of the user-provided buffer,
/// copying will stop early after the buffer is filled, thus truncating the fragmented data short.
///
/// The source list is not modified. Do not forget to free its memory afterward if it was dynamically allocated.
///
/// The function has no effect and returns zero if the destination buffer is NULL.
/// The data pointers in the fragment list shall be valid, otherwise the behavior is undefined.
///
/// Returns the number of bytes copied into the contiguous destination buffer.
size_t udpardGather(const struct UdpardFragment head, const size_t destination_size_bytes, void* const destination);

#ifdef __cplusplus
}
#endif
Expand Down
3 changes: 3 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ endfunction()
gen_test_matrix(test_helpers "src/test_helpers.c")
gen_test_matrix(test_cavl "src/test_cavl.cpp")
gen_test_matrix(test_tx "${library_dir}/udpard.c;src/test_tx.cpp")
gen_test_matrix(test_rx "${library_dir}/udpard.c;src/test_rx.cpp")
gen_test_matrix(test_e2e "${library_dir}/udpard.c;src/test_e2e.cpp")
gen_test_matrix(test_misc "${library_dir}/udpard.c;src/test_misc.cpp")
gen_test_matrix(test_intrusive_crc "src/test_intrusive_crc.c")
gen_test_matrix(test_intrusive_tx "src/test_intrusive_tx.c")
gen_test_matrix(test_intrusive_rx "src/test_intrusive_rx.c")
13 changes: 13 additions & 0 deletions tests/src/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <stdlib.h>
#include <stdio.h>
#include <limits.h>
#include <time.h>

#if !(defined(UDPARD_VERSION_MAJOR) && defined(UDPARD_VERSION_MINOR))
# error "Library version not defined"
Expand Down Expand Up @@ -158,6 +159,18 @@ static inline struct UdpardMemoryDeleter instrumentedAllocatorMakeMemoryDeleter(
return out;
}

static inline void seedRandomNumberGenerator(void)
{
unsigned seed = (unsigned) time(NULL);
const char* const env_var = getenv("RANDOM_SEED");
if (env_var != NULL)
{
seed = (unsigned) atoll(env_var); // Conversion errors are possible but ignored.
}
srand(seed);
(void) fprintf(stderr, "RANDOM_SEED=%u\n", seed);
}

#ifdef __cplusplus
}
#endif
24 changes: 24 additions & 0 deletions tests/src/test_e2e.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/// This software is distributed under the terms of the MIT License.
/// Copyright (C) OpenCyphal Development Team <opencyphal.org>
/// Copyright Amazon.com Inc. or its affiliates.
/// SPDX-License-Identifier: MIT

#include <unity.h>

namespace
{

// Here be dragons.

} // namespace

void setUp() {}

void tearDown() {}

int main()
{
UNITY_BEGIN();
// TODO
return UNITY_END();
}
10 changes: 10 additions & 0 deletions tests/src/test_intrusive_rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,15 @@ static void testParseFrameEmpty(void)
TEST_ASSERT_FALSE(rxParseFrame((struct UdpardMutablePayload){.data = "", .size = 0}, &rxf));
}

static void testParseFrameInvalidTransferID(void)
{
byte_t data[] = {1, 2, 41, 9, 56, 21, 230, 29, 255, 255, 255, 255,
255, 255, 255, 255, 57, 48, 0, 0, 0, 0, 42, 107, //
'a', 'b', 'c'};
RxFrame rxf = {0};
TEST_ASSERT_FALSE(rxParseFrame((struct UdpardMutablePayload){.data = data, .size = sizeof(data)}, &rxf));
}

// -------------------------------------------------- SLOT --------------------------------------------------

static void testSlotRestartEmpty(void)
Expand Down Expand Up @@ -2395,6 +2404,7 @@ int main(void)
RUN_TEST(testParseFrameUnknownHeaderVersion);
RUN_TEST(testParseFrameHeaderWithoutPayload);
RUN_TEST(testParseFrameEmpty);
RUN_TEST(testParseFrameInvalidTransferID);
// slot
RUN_TEST(testSlotRestartEmpty);
RUN_TEST(testSlotRestartNonEmpty);
Expand Down
83 changes: 83 additions & 0 deletions tests/src/test_misc.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/// This software is distributed under the terms of the MIT License.
/// Copyright (C) OpenCyphal Development Team <opencyphal.org>
/// Copyright Amazon.com Inc. or its affiliates.
/// SPDX-License-Identifier: MIT

#include <udpard.h>
#include "helpers.h"
#include "hexdump.hpp"
#include <unity.h>
#include <iostream>
#include <array>
#include <algorithm>

namespace
{
void testGather()
{
const std::string_view payload =
"It's very simple. The attacker must first transform themselves into life forms that can survive in a "
"low-dimensional universe. For instance, a four-dimensional species can transform itself into "
"three-dimensional creatures, or a three-dimensional species can transform itself into two-dimensional life. "
"After the entire civilization has entered a lower dimension, they can initiate a dimensional strike against "
"the enemy without concern for the consequences.";

std::array<UdpardFragment, 4> frags{{}};
frags.at(0).next = &frags.at(1);
frags.at(1).next = &frags.at(2);
frags.at(2).next = &frags.at(3);
frags.at(3).next = nullptr;

frags.at(0).view.data = payload.data();
frags.at(0).view.size = 100;

frags.at(1).view.data = payload.data() + frags.at(0).view.size;
frags.at(1).view.size = 100;

frags.at(2).view.data = payload.data() + frags.at(1).view.size + frags.at(0).view.size;
frags.at(2).view.size = 0; // Edge case.

frags.at(3).view.data = payload.data() + frags.at(2).view.size + frags.at(1).view.size + frags.at(0).view.size;
frags.at(3).view.size = payload.size() - frags.at(2).view.size - frags.at(1).view.size - frags.at(0).view.size;

std::array<std::uint8_t, 1024> mono{};

// Copy full size payload.
std::generate(mono.begin(), mono.end(), [] { return std::rand() % 256; });
TEST_ASSERT_EQUAL(payload.size(), udpardGather(frags.at(0), mono.size(), mono.data()));
TEST_ASSERT_EQUAL_MEMORY(payload.data(), mono.data(), payload.size());

// Truncation mid-fragment.
std::generate(mono.begin(), mono.end(), [] { return std::rand() % 256; });
TEST_ASSERT_EQUAL(150, udpardGather(frags.at(0), 150, mono.data()));
TEST_ASSERT_EQUAL_MEMORY(payload.data(), mono.data(), 150);

// Truncation at the fragment boundary.
std::generate(mono.begin(), mono.end(), [] { return std::rand() % 256; });
TEST_ASSERT_EQUAL(200, udpardGather(frags.at(0), 200, mono.data()));
TEST_ASSERT_EQUAL_MEMORY(payload.data(), mono.data(), 200);

// Empty destination.
mono.fill(0xA5);
TEST_ASSERT_EQUAL(0, udpardGather(frags.at(0), 0, mono.data()));
TEST_ASSERT_EQUAL(0, std::count_if(mono.begin(), mono.end(), [](const auto x) { return x != 0xA5; }));

// Edge cases.
TEST_ASSERT_EQUAL(0, udpardGather(frags.at(0), 0, nullptr));
TEST_ASSERT_EQUAL(0, udpardGather(frags.at(0), 100, nullptr));
}
} // namespace

void setUp()
{
seedRandomNumberGenerator(); // Re-seed the RNG for each test to avoid coupling.
}

void tearDown() {}

int main()
{
UNITY_BEGIN();
RUN_TEST(testGather);
return UNITY_END();
}
Loading

0 comments on commit c03b3ee

Please sign in to comment.