io::MemoryQueue

A MemoryQueue is a lock free, single producer single consumer queue intended for communication between software components that share access to the same memory. Compared to a queue of fixed-size slots, it provides much better average case memory usage when the data elements transported through the queue vary in size. Its nested classes io::MemoryQueue::Writer and io::MemoryQueue::Reader provide access to the queue.

Properties

  • Lock free, single producer single consumer queue which provides implementations of io::IWriter and io::IReader through the adapters io::MemoryQueueWriter and io::MemoryQueueReader.

  • Any successful allocation will always return a contiguous region of memory.

  • A MemoryQueue is considered full if MAX_ELEMENT_SIZE bytes cannot be allocated.

  • An allocation can provide between 1 and MAX_ELEMENT_SIZE bytes and will consume an extra sizeof(SIZE_TYPE) bytes to store the allocation size.

  • Memory consumption: ~ CAPACITY + 3 * sizeof(size_t)

Differences to Other Queues

A typical lock free, single producer single consumer (SPSC) queue class template would provide a type like template<class T, size_t N> class Queue; where T is the type of the elements the queue can handle and N is the number of elements the queue holds. As an example, let’s look at a queue of CAN-FD frames, which consist of a 32-bit ID and up to 64 bytes of payload, for a total of at most 68 bytes per frame.

A Queue<CanFdFrame, 100> would use about 6800 bytes of RAM. However, the maximum payload length of 64 bytes is the worst case: it can and will happen, but it is not the average case. If the average frame had only 12 bytes of payload, a full queue could still hold only 100 frames and would use only 1600 of the 6800 bytes. This is an actual payload rate of roughly 24 percent; in other words, 76 percent of the RAM is wasted in the average case.

Compare this to a MemoryQueue<6800, 68>: this queue can hold at most 6800 / (68 + 2) == 97 full frames, but in the average case it can hold 6800 / (16 + 2) == 377 frames with 12 bytes of payload, without wasting memory on unused payload bytes. The extra 2 bytes per frame are the sizeof(SIZE_TYPE) bytes that store the allocation size.
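The arithmetic of this comparison can be checked at compile time. The following is only a sketch: the helper names are hypothetical and not part of the io module, a 2-byte SIZE_TYPE is assumed, and wrap-around effects at the end of the buffer are ignored.

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical helpers for illustration; they are not part of the io module.

// RAM used by a queue of fixed-size slots: every slot reserves the worst-case frame size.
constexpr std::size_t fixedSlotQueueBytes(std::size_t slots, std::size_t maxFrameSize)
{
    return slots * maxFrameSize;
}

// Number of frames that fit into a byte-oriented queue in which every entry costs its
// own size plus a size header. Wrap-around effects at the end of the buffer are ignored.
constexpr std::size_t
framesInByteQueue(std::size_t capacity, std::size_t frameSize, std::size_t headerSize)
{
    return capacity / (frameSize + headerSize);
}

constexpr std::size_t ID_SIZE     = 4U;                    // 32-bit CAN id
constexpr std::size_t MAX_PAYLOAD = 64U;                   // worst case CAN-FD payload
constexpr std::size_t AVG_PAYLOAD = 12U;                   // average payload from the text
constexpr std::size_t HEADER_SIZE = sizeof(std::uint16_t); // assumed SIZE_TYPE

static_assert(fixedSlotQueueBytes(100U, ID_SIZE + MAX_PAYLOAD) == 6800U, "fixed-slot RAM");
static_assert(
    framesInByteQueue(6800U, ID_SIZE + MAX_PAYLOAD, HEADER_SIZE) == 97U, "worst case");
static_assert(
    framesInByteQueue(6800U, ID_SIZE + AVG_PAYLOAD, HEADER_SIZE) == 377U, "average case");
```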

The MemoryQueue provides chunks of bytes represented as ::estd::slice<uint8_t>, and the user is responsible for serializing and deserializing the transferred data.
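As an illustration of what such serialization might look like for the CAN-FD example, here is a minimal sketch. The CanFdFrame type, the function names and the byte layout (big-endian id followed by the payload) are assumptions made for this example, and plain pointers with a length stand in for ::estd::slice<uint8_t>. Because every queue entry carries its own size, the payload length does not need to be stored separately.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>

// Hypothetical frame type for illustration; not part of the io module.
struct CanFdFrame
{
    std::uint32_t id;
    std::uint8_t length; // number of valid payload bytes, 0..64
    std::uint8_t payload[64];
};

// Bytes needed for one frame: 4 bytes of id followed by the payload.
inline std::size_t serializedSize(CanFdFrame const& frame) { return 4U + frame.length; }

// Writes the frame into buffer. Returns the number of bytes written,
// or 0 if the frame is invalid or the buffer is too small.
inline std::size_t
serialize(CanFdFrame const& frame, std::uint8_t* buffer, std::size_t bufferSize)
{
    std::size_t const needed = serializedSize(frame);
    if ((frame.length > 64U) || (bufferSize < needed))
    {
        return 0U;
    }
    buffer[0] = static_cast<std::uint8_t>(frame.id >> 24);
    buffer[1] = static_cast<std::uint8_t>(frame.id >> 16);
    buffer[2] = static_cast<std::uint8_t>(frame.id >> 8);
    buffer[3] = static_cast<std::uint8_t>(frame.id);
    std::memcpy(buffer + 4, frame.payload, frame.length);
    return needed;
}

// Reads a frame back from a chunk of bytes. The payload length is implied by the
// chunk size. Returns false if the chunk cannot hold a valid frame.
inline bool deserialize(std::uint8_t const* data, std::size_t size, CanFdFrame& frame)
{
    if ((size < 4U) || (size > 68U))
    {
        return false;
    }
    frame.id = (static_cast<std::uint32_t>(data[0]) << 24)
               | (static_cast<std::uint32_t>(data[1]) << 16)
               | (static_cast<std::uint32_t>(data[2]) << 8)
               | static_cast<std::uint32_t>(data[3]);
    frame.length = static_cast<std::uint8_t>(size - 4U);
    std::memcpy(frame.payload, data + 4, frame.length);
    return true;
}
```

With a real queue, the buffer passed to serialize() would be the slice returned by allocate(), and the chunk passed to deserialize() would be the slice returned by peek().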

Memory Placement / Cache

Single-Core

The MemoryQueue can be used for interaction between tasks on the same core. In this case it just needs to be placed in a memory region both tasks can access. As they are on the same core, no caching effects can occur.

Multi-Core

If the MemoryQueue is used for interaction between two cores and the data cache is enabled on at least one of them, it needs to be placed in non-cached RAM to avoid caching effects on the other end of the queue.

It has to be ensured that all binaries accessing the queue use the same physical memory location for the queue, and that exactly one core initializes the memory and calls the constructor.

It is recommended to construct the queue on the main core before all other cores are started. This prevents accessing the queue too early by design:

participant core0 order 0
participant MemoryQueue order 1
participant core1 order 2

note over core0, core1: Access to MemoryQueue not allowed yet

core0 -> MemoryQueue: Placement new

note over core0, core1: Access allowed from core0

core0 -> core1: Start core
core1 -> MemoryQueue: Reinterpret memory region

note over core0, core1: Access allowed from both cores

core0 -> MemoryQueue: Access
core1 -> MemoryQueue: Access
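The sequence above could be sketched as follows. This is only an illustration under stated assumptions: SharedQueue is a stand-in for a concrete ::io::MemoryQueue instantiation, the function names are hypothetical, and a plain static buffer stands in for a memory region that a real project would place in non-cached shared RAM (for example through the linker script).

```cpp
#include <cstddef>
#include <cstdint>
#include <new>

// Stand-in for a concrete ::io::MemoryQueue<...> type; only used for illustration.
struct SharedQueue
{
    std::size_t sent     = 0U;
    std::size_t received = 0U;
    std::uint8_t data[64] = {};
};

// Assumption: in a real project this storage lives at a fixed address in non-cached
// shared RAM that is identical for all binaries. Here it is an ordinary buffer.
alignas(SharedQueue) unsigned char sharedQueueStorage[sizeof(SharedQueue)];

// Called exactly once, on the main core, before the other cores are started.
inline SharedQueue& constructSharedQueue()
{
    return *new (sharedQueueStorage) SharedQueue();
}

// Called on every other core after it has been started. It does not run the
// constructor again, it only reinterprets the already initialized memory region.
inline SharedQueue& attachSharedQueue()
{
    return *reinterpret_cast<SharedQueue*>(sharedQueueStorage);
}
```

Keeping construction and attachment in separate functions makes it harder to call the constructor by accident on a second core.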

Instantiation

MemoryQueue is a class template with the following parameters:

 * \tparam CAPACITY Number of bytes that this MemoryQueue shall provide.
 * \tparam MAX_ELEMENT_SIZE Maximum size of one allocation
 * \tparam SIZE_TYPE Type used to store size of allocation internally

It can only be constructed using its default constructor.

/**
 * Returns the capacity of the underlying array of data managed by this MemoryQueue.
 */
static constexpr size_t capacity() { return CAPACITY; }

/**
 * Returns the maximum size of one allocation.
 */
static constexpr size_t maxElementSize() { return MAX_ELEMENT_SIZE; }

/**
 * Constructs a MemoryQueue of CAPACITY bytes.
 */
MemoryQueue() = default;

Access Through Generic Interfaces

The generic way of accessing a MemoryQueue is using the adapter classes io::MemoryQueueReader and io::MemoryQueueWriter. They are based on the io::IReader and io::IWriter interfaces with virtual functions, which means that the exact type of the queue does not have to be known.

For intercore communication an instance of io::MemoryQueue is placed into non-cached shared memory. Two software components placed on different cores can communicate via the queue using the interfaces io::IReader and io::IWriter by creating instances of io::MemoryQueueReader and io::MemoryQueueWriter.

top to bottom direction

frame core0 {
    actor SWC1
    interface IReader as IReaderC0
    interface IWriter as IWriterC0
    agent MemoryQueueReader as MQRC0
    agent MemoryQueueWriter as MQWC0
}
cloud SharedMemory {
    queue MemoryQueue1 as MQ1
    queue MemoryQueue2 as MQ2
}
note bottom of SharedMemory : non-cached
frame core1 {
    actor SWC2
    interface IReader as IReaderC1
    interface IWriter as IWriterC1
    agent MemoryQueueReader as MQRC1
    agent MemoryQueueWriter as MQWC1
}

core0 -[hidden] SharedMemory

MQ2 - MQWC0
MQ1 - MQWC1
MQ1 - MQRC0
MQ2 - MQRC1

SWC1 -d-( IReaderC0
MQRC0 -u- IReaderC0
SWC1 -u-( IWriterC0
MQWC0 -- IWriterC0

SWC2 -d-( IReaderC1
MQRC1 -u- IReaderC1
SWC2 -u-( IWriterC1
MQWC1 -- IWriterC1

io::MemoryQueueWriter

MemoryQueueWriter is an implementation of io::IWriter providing write access to a MemoryQueue. It is a class template with the queue type as parameter:

 * \tparam Queue Type of queue to write data to.

The public API of MemoryQueueWriter is the same as for io::IWriter:

/**
 * Constructs a MemoryQueueWriter from given queue.
 */
explicit MemoryQueueWriter(Queue& queue);

/** \see IWriter::maxSize() */
size_t maxSize() const override;

/** \see IWriter::allocate() */
::estd::slice<uint8_t> allocate(size_t size) override;

/** \see IWriter::commit() */
void commit() override;

/** \see IWriter::flush() */
void flush() override;

/** \see Queue::Writer::available() */
size_t available() const;

/** \see Queue::Writer::minAvailable() */
size_t minAvailable() const;

/** \see Queue::Writer::resetMinAvailable() */
void resetMinAvailable();

Statistics

MemoryQueueWriter provides minAvailable() and resetMinAvailable() as a way to gather statistics about the queue’s usage.

minAvailable() returns the smallest number of bytes that were available during any call to allocate() since the last reset.

resetMinAvailable() resets this value to the current available() value.

Together, these can be used in projects to adapt the queue sizes based on runtime data.
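The semantics of this low watermark can be modeled in a few lines. The class and function below are hypothetical and only mirror the behavior described above; they are not the library's implementation.

```cpp
#include <cstddef>

// Hypothetical model of the low-watermark bookkeeping described above.
class AvailableWatermark
{
public:
    explicit AvailableWatermark(std::size_t initialAvailable) : _min(initialAvailable) {}

    // Records the number of available bytes observed during an allocate() call.
    void onAllocate(std::size_t available)
    {
        if (available < _min)
        {
            _min = available;
        }
    }

    // Smallest value seen by onAllocate() since construction or the last reset.
    std::size_t minAvailable() const { return _min; }

    // Restarts the measurement from the currently available number of bytes.
    void resetMinAvailable(std::size_t currentAvailable) { _min = currentAvailable; }

private:
    std::size_t _min;
};

// Peak usage in percent of the capacity, derived from the low watermark.
constexpr std::size_t peakUsagePercent(std::size_t capacity, std::size_t minAvailable)
{
    return ((capacity - minAvailable) * 100U) / capacity;
}
```

A project could log the peak usage periodically and call resetMinAvailable() afterwards. A queue that never gets close to 100 percent is a candidate for a smaller CAPACITY.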

io::MemoryQueueReader

MemoryQueueReader is an implementation of io::IReader providing read access to a MemoryQueue. It is a class template with the queue type as parameter:

 * \tparam Queue Type of queue to read data from.

The public API of MemoryQueueReader is the same as for io::IReader:

/**
 * Constructs a MemoryQueueReader from a given queue.
 */
explicit MemoryQueueReader(Queue& queue);

/** \see IReader::maxSize() */
size_t maxSize() const override;

/** \see IReader::peek() */
::estd::slice<uint8_t> peek() const override;

/** \see IReader::release() */
void release() override;

/** \see Queue::Reader::available() */
size_t available() const;

Usage Example

The above example using MemoryQueueWriter and MemoryQueueReader would look like this:

/**
 * Forwards data from an IReader to an IWriter.
 * \param source    IReader that acts as source of data
 * \param destination   IWriter that acts as sink of data
 * \return true if data has been forwarded, false otherwise.
 */
bool forwardData(::io::IReader& source, ::io::IWriter& destination)
{
    auto srcData = source.peek();
    if (srcData.size() == 0)
    {
        // No data available.
        return false;
    }
    // Allocate space in destination
    auto dstData = destination.allocate(srcData.size());
    if (dstData.size() == 0)
    {
        // Destination is full.
        return false;
    }
    (void)::estd::memory::copy(dstData, srcData);
    destination.commit();
    source.release();
    return true;
}

And the setup code could look like this:

    using DataStream       = ::io::MemoryQueue<1024, 12>;
    using DataStreamWriter = ::io::MemoryQueueWriter<DataStream>;
    using DataStreamReader = ::io::MemoryQueueReader<DataStream>;

    DataStream source, destination;
    auto srcWriter = DataStreamWriter{source};
    auto srcReader = DataStreamReader{source};
    auto dstWriter = DataStreamWriter{destination};
    auto dstReader = DataStreamReader{destination};

    {
        auto d = srcWriter.allocate(4);
        ASSERT_EQ(4, d.size());
        ::estd::memory::take<::estd::be_uint32_t>(d) = 0x1234;
        srcWriter.commit();
    }

    ASSERT_TRUE(forwardData(srcReader, dstWriter));

    {
        auto d = dstReader.peek();
        ASSERT_EQ(4, d.size());
        ASSERT_EQ(0x1234, ::estd::memory::take<::estd::be_uint32_t>(d));
    }

Access Through Nested Classes

If performance matters, the MemoryQueue can also be accessed directly using its nested classes io::MemoryQueue::Reader and io::MemoryQueue::Writer. This avoids the virtual functions of the interfaces mentioned above, but the exact type of the queue has to be known.

For intercore communication an instance of io::MemoryQueue is placed into non-cached shared memory. Two software components placed on different cores can communicate via the queue by creating instances of io::MemoryQueue::Reader and io::MemoryQueue::Writer using the non-virtual API.

top to bottom direction

frame core0 {
    actor SWC1
    agent "MemoryQueue::Reader" as MQRC0
    agent "MemoryQueue::Writer" as MQWC0
}
cloud SharedMemory {
    queue MemoryQueue1 as MQ1
    queue MemoryQueue2 as MQ2
}
note bottom of SharedMemory : non-cached
frame core1 {
    actor SWC2
    agent "MemoryQueue::Reader" as MQRC1
    agent "MemoryQueue::Writer" as MQWC1
}

core0 -[hidden] SharedMemory

MQ2 - MQWC0
MQ1 - MQWC1
MQ1 - MQRC0
MQ2 - MQRC1

SWC1 -d- MQRC0
SWC1 -u- MQWC0

SWC2 -d- MQRC1
SWC2 -u- MQWC1

io::MemoryQueue::Writer

MemoryQueue::Writer provides an API without virtual functions to write data into a MemoryQueue.

/**
 * Constructs a Writer to a given queue.
 */
explicit Writer(MemoryQueue& queue);

/**
 * Allocates a requested number of bytes and returns them as a slice. This function can be
 * called multiple times before calling commit(), which allows first allocating a slice of
 * worst case size and then trimming it before calling commit().
 *
 * \param size  Number of bytes to allocate from this MemoryQueue.
 * \return  - Empty slice, if requested size was greater than MAX_ELEMENT_SIZE or no memory
 *            is available
 *          - Slice of size bytes otherwise.
 */
::estd::slice<uint8_t> allocate(size_t size) const;

/**
 * Makes the previously allocated data available for the Reader.
 */
void commit();

/**
 * Returns the number of contiguous bytes that can be allocated next.
 *
 * The calculation of used and free bytes relies on the fact that sent and received always
 * point to valid indices. That means, they will never point to an index where not at least
 * MAX_ELEMENT_SIZE + sizeof(SIZE_TYPE) bytes are the difference to the end of the data
 * array.
 */
size_t available() const;

/**
 * Returns the minimum number of available bytes at the time of an allocate call since the
 * last reset using resetMinAvailable().
 */
size_t minAvailable() const;

/**
 * Resets the minimum number of available bytes to the current number of available bytes.
 */
void resetMinAvailable();

/**
 * The MemoryQueue is considered full if less than MAX_ELEMENT_SIZE + sizeof(SIZE_TYPE)
 * bytes are available as a contiguous piece of memory.
 */
bool full() const;

/**
 * Returns the maximum size of a slice of bytes that can be allocated.
 */
size_t maxSize() const;

io::MemoryQueue::Reader

MemoryQueue::Reader provides an API without virtual functions to read data from a MemoryQueue.

/**
 * Constructs a Reader from a given queue.
 */
explicit Reader(MemoryQueue& queue);

/**
 * Returns true if no data is available.
 *
 * Calling peek() on an empty queue will return an empty slice.
 */
bool empty() const;

/**
 * Returns a slice of bytes pointing to the next memory chunk of the MemoryQueue, if
 * available. Calling peek on an empty MemoryQueue will return an empty slice.
 */
::estd::slice<uint8_t> peek() const;

/**
 * Releases the first allocated chunk of memory. If the Reader is empty, calling this
 * function has no effect.
 */
void release() const;

/**
 * Releases all entries until empty() returns true.
 */
void clear() const;

/**
 * Returns the maximum size of a slice of bytes that can be read.
 */
size_t maxSize() const;

/**
 * Returns the number of contiguous bytes that can be allocated next.
 *
 * The calculation of used and free bytes relies on the fact that sent and received always
 * point to valid indices. That means, they will never point to an index where not at least
 * MAX_ELEMENT_SIZE + sizeof(SIZE_TYPE) bytes are the difference to the end of the data
 * array.
 */
size_t available() const;

Usage Example

As MemoryQueue is a class template, user code might also become templated code as shown in the following example:

/**
 * Forwards data from a reader of a MemoryQueue to a writer of another MemoryQueue
 * \param source    Reader of the MemoryQueue that acts as source of data
 * \param destination   Writer of the MemoryQueue that acts as sink of data
 * \return true if data has been forwarded, false otherwise.
 */
template<class Queue>
bool forwardData(typename Queue::Reader& source, typename Queue::Writer& destination)
{
    auto srcData = source.peek();
    if (srcData.size() == 0)
    {
        // No data available.
        return false;
    }
    // Allocate space in destination
    auto dstData = destination.allocate(srcData.size());
    if (dstData.size() == 0)
    {
        // Destination is full.
        return false;
    }
    (void)::estd::memory::copy(dstData, srcData);
    destination.commit();
    source.release();
    return true;
}

The code instantiating reader and writer could look like this:

    using DataStream       = ::io::MemoryQueue<1024, 12>;
    using DataStreamWriter = DataStream::Writer;
    using DataStreamReader = DataStream::Reader;

    DataStream source, destination;
    auto srcWriter = DataStreamWriter{source};
    auto srcReader = DataStreamReader{source};
    auto dstWriter = DataStreamWriter{destination};
    auto dstReader = DataStreamReader{destination};

    {
        auto d = srcWriter.allocate(4);
        ASSERT_EQ(4, d.size());
        ::estd::memory::take<::estd::be_uint32_t>(d) = 0x1234;
        srcWriter.commit();
    }

    ASSERT_TRUE(forwardData<DataStream>(srcReader, dstWriter));

    {
        auto d = dstReader.peek();
        ASSERT_EQ(4, d.size());
        ASSERT_EQ(0x1234, ::estd::memory::take<::estd::be_uint32_t>(d));
    }