io::MemoryQueue
A MemoryQueue is a lock-free, single-producer single-consumer queue intended for communication
between software components that share access to the same memory. It provides much better
average-case memory usage when the data elements transported through the queue vary in size.
Its nested classes io::MemoryQueue::Writer and io::MemoryQueue::Reader provide access to the queue.
Properties
* Lock-free, single-producer single-consumer queue which provides implementations of io::IWriter
  and io::IReader through the adapters io::MemoryQueueWriter and io::MemoryQueueReader.
* Any successful allocation will always return a contiguous region of memory.
* A MemoryQueue is full if MAX_ELEMENT_SIZE bytes cannot be allocated.
* An allocation can provide between 1 and MAX_ELEMENT_SIZE bytes and will consume an extra
  sizeof(SIZE_TYPE) bytes to store the allocation size.
* Memory consumption: ~CAPACITY + 3 * sizeof(size_t)
Differences to Other Queues
A typical lock-free, single-producer single-consumer (SPSC) queue class template would provide a
type like template<class T, size_t N> class Queue; where T is the type of elements the queue can
handle and N is the number of elements the Queue holds. As an example, let's look at a queue of
CAN-FD frames, which consist of a 32 bit id and up to 64 bytes of payload, so a total of at most
68 bytes per frame.
A Queue<CanFdFrame, 100> would use about 6800 bytes of RAM. Now, the 64 bytes of maximum payload
length are a worst case, i.e. it can and will happen, but it is not the average case. If the
average frame only had 12 bytes of payload, a full queue could still only hold 100 frames and
would only use 1600 of the total 6800 bytes. This is an actual payload rate of roughly 24 percent
- or in other words, 76 percent of the RAM is wasted in the average case.
If we compare this to a MemoryQueue<6800, 68>, this queue can hold at most
6800 / (68 + 2) == 97 full frames, but in the average case it can hold 6800 / (16 + 2) == 377
frames with 12 bytes of payload and not waste any memory.
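The same arithmetic can be written down as compile-time constants. The following is a minimal
sketch assuming SIZE_TYPE is uint16_t, which matches the + 2 bytes of per-element overhead used
above:
#include <cstddef>
#include <cstdint>

// Sketch of the capacity arithmetic above; SIZE_TYPE is assumed to be uint16_t.
constexpr size_t capacity       = 6800;
constexpr size_t maxElementSize = 68;               // 4 byte id + 64 bytes payload
constexpr size_t overhead       = sizeof(uint16_t); // per-element size field
constexpr size_t worstCaseCount = capacity / (maxElementSize + overhead); // == 97
constexpr size_t averageCount   = capacity / (16 + overhead);             // == 377 (4 byte id + 12 bytes payload)

static_assert(worstCaseCount == 97, "worst case element count");
static_assert(averageCount == 377, "average case element count");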
The MemoryQueue provides chunks of bytes represented as ::estd::slice<uint8_t>, and the user has
to take care of serializing and deserializing the transferred data, as sketched below.
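For illustration, a frame with a 32 bit id and a variable-length payload could be serialized into
one allocated chunk as follows. The function name sendFrame and the chosen layout (big-endian id
followed by the raw payload) are assumptions for this sketch; only allocate(), commit(),
::estd::memory::take() and ::estd::memory::copy() are taken from the APIs shown in this document,
and take() is assumed to advance the slice past the consumed bytes:
// estd headers omitted; include paths depend on the project setup.
#include <cstdint>

// Serializes an id plus payload into one chunk of a queue writer (sketch only).
// 'Writer' is any writer type offering allocate()/commit() as described below.
template<class Writer>
bool sendFrame(Writer& writer, uint32_t const id, ::estd::slice<uint8_t> const payload)
{
    // 4 bytes for the id plus the payload; must not exceed MAX_ELEMENT_SIZE.
    auto chunk = writer.allocate(sizeof(uint32_t) + payload.size());
    if (chunk.size() == 0)
    {
        // Queue is full or the requested size exceeds MAX_ELEMENT_SIZE.
        return false;
    }
    // Write the id big-endian; take() is assumed to consume the first 4 bytes of chunk.
    ::estd::memory::take<::estd::be_uint32_t>(chunk) = id;
    // Copy the payload into the remaining bytes and publish the element.
    (void)::estd::memory::copy(chunk, payload);
    writer.commit();
    return true;
}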
Memory Placement / Cache
Single-Core
The MemoryQueue can be used for interaction between tasks on the same core. In this case it just
needs to be placed in a memory region both tasks can access. As they are on the same core, no
caching effects can occur.
Multi-Core
If the MemoryQueue is used for interaction between two cores, it needs to be placed in non-cached
RAM to avoid caching effects on the other end of the queue if the data cache is enabled on at
least one of the participating cores.
It has to be ensured that all binaries accessing the queue use the same physical memory location
for the queue, but only one core must initialize the memory and call the constructor.
It is recommended to construct the queue on the main core before all other cores are started. This
prevents accessing the queue too early by design:
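A minimal sketch of such a setup is shown below. The section name .shared_nocache, the attribute
syntax, and startSecondaryCores() are illustrative assumptions; how non-cached sections and core
startup are configured depends on the linker script and platform:
// io headers omitted; include paths depend on the project setup.

// Queue used for inter-core communication (template parameters are examples).
using IpcQueue = ::io::MemoryQueue<1024, 64>;

// Placed in a non-cached memory region that both cores map to the same physical address.
__attribute__((section(".shared_nocache"))) IpcQueue ipcQueue;

extern void startSecondaryCores(); // hypothetical platform call releasing the other cores

void mainCoreStartup()
{
    // ipcQueue is constructed during the main core's static initialization, i.e. before
    // this function runs, so the other cores cannot access it too early by design.
    startSecondaryCores();
}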
Instantiation
MemoryQueue is a class template with the following parameters:
* \tparam CAPACITY Number of bytes that this MemoryQueue shall provide.
* \tparam MAX_ELEMENT_SIZE Maximum size of one allocation
* \tparam SIZE_TYPE Type used to store size of allocation internally
It can only be constructed using its default constructor.
/**
* Returns the capacity of the underlying array of data managed by this MemoryQueue.
*/
static constexpr size_t capacity() { return CAPACITY; }
/**
* Returns the maximum size of one allocation.
*/
static constexpr size_t maxElementSize() { return MAX_ELEMENT_SIZE; }
/**
* Constructs a MemoryQueue of CAPACITY bytes.
*/
MemoryQueue() = default;
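For illustration, a queue of 1024 bytes with a maximum element size of 64 bytes could be
instantiated as follows; SIZE_TYPE is left at its default, as in the examples below:
// A queue of 1024 bytes that hands out chunks of at most 64 bytes each.
using FrameQueue = ::io::MemoryQueue<1024, 64>;

FrameQueue queue; // default constructed; its storage of CAPACITY bytes lives inside the object

static_assert(FrameQueue::capacity() == 1024, "capacity() reflects CAPACITY");
static_assert(FrameQueue::maxElementSize() == 64, "maxElementSize() reflects MAX_ELEMENT_SIZE");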
Access Through Generic Interfaces
The generic way of accessing a MemoryQueue is using the adapter classes io::MemoryQueueReader and
io::MemoryQueueWriter. They are based on the io::IReader and io::IWriter interfaces with virtual
functions, which means that the exact type of the queue does not have to be known.
For intercore communication an instance of io::MemoryQueue is placed into non-cached shared memory. Two software components placed on different cores can communicate via the queue using the interfaces io::IReader and io::IWriter by creating instances of io::MemoryQueueReader and io::MemoryQueueWriter.
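To illustrate the decoupling, a component can be written purely against io::IWriter and wired to
a MemoryQueueWriter elsewhere. The Sender class and sendHeartbeat() are hypothetical names used
only for this sketch; the calls themselves come from the interfaces and helpers shown in this
document:
// A component that only knows the generic interface, not the concrete queue type.
class Sender
{
public:
    explicit Sender(::io::IWriter& writer) : _writer(writer) {}

    // Tries to enqueue a single heartbeat byte; returns false if the queue is full.
    bool sendHeartbeat()
    {
        auto chunk = _writer.allocate(1);
        if (chunk.size() == 0)
        {
            return false;
        }
        ::estd::memory::take<uint8_t>(chunk) = 0xA5U;
        _writer.commit();
        return true;
    }

private:
    ::io::IWriter& _writer;
};

// Wiring, e.g. in startup code:
//   ::io::MemoryQueue<1024, 16> queue; // in non-cached shared memory for inter-core use
//   ::io::MemoryQueueWriter<::io::MemoryQueue<1024, 16>> writer{queue};
//   Sender sender{writer};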
io::MemoryQueueWriter
MemoryQueueWriter is an implementation of io::IWriter providing write access to a MemoryQueue.
It is a class template with the queue type as parameter:
* \tparam Queue Type of queue to write data to.
The public API of MemoryQueueWriter is the same as that of io::IWriter:
/**
* Constructs a MemoryQueueWriter from given queue.
*/
explicit MemoryQueueWriter(Queue& queue);
/** \see IWriter::maxSize() */
size_t maxSize() const override;
/** \see IWriter::allocate() */
::estd::slice<uint8_t> allocate(size_t size) override;
/** \see IWriter::commit() */
void commit() override;
/** \see IWriter::flush() */
void flush() override;
/** \see Queue::Writer::available() */
size_t available() const;
/** \see Queue::Writer::minAvailable() */
size_t minAvailable() const;
/** \see Queue::Writer::resetMinAvailable() */
void resetMinAvailable();
Statistics
MemoryQueueWriter provides minAvailable() and resetMinAvailable() as a way to gather statistics
about the queue's usage.
minAvailable() returns the smallest number of bytes that were available during a call to allocate().
resetMinAvailable() resets this value to the current available() value.
Together, these can be used in projects to adapt the queue sizes based on runtime data, for
example as sketched below.
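For illustration, a periodic statistics hook could look like the following sketch.
logQueueStatistics() and the use of printf are assumptions for this example; only available(),
minAvailable() and resetMinAvailable() come from the API above:
#include <cstddef>
#include <cstdio>

// Called periodically, e.g. from a low-priority task or a debug console command.
template<class Queue>
void logQueueStatistics(::io::MemoryQueueWriter<Queue>& writer)
{
    // Worst-case headroom observed since the last reset: if this stays large, CAPACITY
    // could be reduced; if it approaches zero, allocations are about to fail.
    size_t const minFree = writer.minAvailable();
    size_t const curFree = writer.available();
    printf("queue: min free = %zu bytes, currently free = %zu bytes\n", minFree, curFree);
    // Start a new observation window.
    writer.resetMinAvailable();
}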
io::MemoryQueueReader
MemoryQueueReader is an implementation of io::IReader providing read access to a MemoryQueue.
It is a class template with the queue type as parameter:
* \tparam Queue Type of queue to read data from.
The public API of MemoryQueueReader is the same as that of io::IReader:
/**
* Constructs a MemoryQueueReader from a given queue.
*/
explicit MemoryQueueReader(Queue& queue);
/** \see IReader::maxSize() */
size_t maxSize() const override;
/** \see IReader::peek() */
::estd::slice<uint8_t> peek() const override;
/** \see IReader::release() */
void release() override;
/** \see Queue::Reader::available() */
size_t available() const;
Usage Example
A data forwarding example using MemoryQueueWriter and MemoryQueueReader would look like this:
/**
 * Forwards data from an IReader to an IWriter.
 * \param source IReader that acts as source of data
 * \param destination IWriter that acts as sink of data
 * \return true if data has been forwarded, false otherwise.
 */
bool forwardData(::io::IReader& source, ::io::IWriter& destination)
{
    auto srcData = source.peek();
    if (srcData.size() == 0)
    {
        // No data available.
        return false;
    }
    // Allocate space in destination
    auto dstData = destination.allocate(srcData.size());
    if (dstData.size() == 0)
    {
        // Destination is full.
        return false;
    }
    (void)::estd::memory::copy(dstData, srcData);
    destination.commit();
    source.release();
    return true;
}
And the setup code could look like this:
using DataStream = ::io::MemoryQueue<1024, 12>;
using DataStreamWriter = ::io::MemoryQueueWriter<DataStream>;
using DataStreamReader = ::io::MemoryQueueReader<DataStream>;

DataStream source, destination;
auto srcWriter = DataStreamWriter{source};
auto srcReader = DataStreamReader{source};
auto dstWriter = DataStreamWriter{destination};
auto dstReader = DataStreamReader{destination};

{
    auto d = srcWriter.allocate(4);
    ASSERT_EQ(4, d.size());
    ::estd::memory::take<::estd::be_uint32_t>(d) = 0x1234;
    srcWriter.commit();
}

ASSERT_TRUE(forwardData(srcReader, dstWriter));

{
    auto d = dstReader.peek();
    ASSERT_EQ(4, d.size());
    ASSERT_EQ(0x1234, ::estd::memory::take<::estd::be_uint32_t>(d));
}
Access Through Nested Classes
If performance matters, the MemoryQueue can also be accessed directly using its nested classes
io::MemoryQueue::Reader and io::MemoryQueue::Writer. This avoids the virtual functions of the
interfaces mentioned above, but the exact type of the queue has to be known.
For intercore communication an instance of io::MemoryQueue is placed into non-cached shared memory. Two software components placed on different cores can communicate via the queue by creating instances of io::MemoryQueue::Reader and io::MemoryQueue::Writer using the non-virtual API.
io::MemoryQueue::Writer
MemoryQueue::Writer provides an API without virtual functions to write data into a MemoryQueue.
/**
* Constructs a Writer to a given queue.
*/
explicit Writer(MemoryQueue& queue);
/**
* Allocates a requested number of bytes and returns them as a slice. This function can be
* called multiple times before calling commit(), allowing the user to first allocate a
* worst-case size slice and then trim it before calling commit().
*
* \param size Number of bytes to allocate from this MemoryQueue.
* \return - Empty slice, if the requested size was greater than MAX_ELEMENT_SIZE or no
*           memory is available
*         - Slice of size bytes otherwise.
*/
::estd::slice<uint8_t> allocate(size_t size) const;
/**
* Makes the previously allocated data available for the Reader.
*/
void commit();
/**
* Returns the number of contiguous bytes that can be allocated next.
*
* The calculation of used and free bytes relies on the fact that sent and received always
* point to valid indices. That means, they will never point to an index where not at least
* MAX_ELEMENT_SIZE + sizeof(SIZE_TYPE) bytes are the difference to the end of the data
* array.
*/
size_t available() const;
/**
* Returns the minimum number of available bytes at the time of an allocate call since the
* last reset using resetMinAvailable().
*/
size_t minAvailable() const;
/**
* Resets the minimum number of available bytes to the current number of available bytes.
*/
void resetMinAvailable();
/**
* The MemoryQueue is considered full if less than MAX_ELEMENT_SIZE + sizeof(SIZE_TYPE)
* bytes are available as a contiguous piece of memory.
*/
bool full() const;
/**
* Returns the maximum size of which a slice of bytes can be allocated.
*/
size_t maxSize() const;
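The allocate-then-trim pattern mentioned in the documentation of allocate() above could look like
the following sketch. MyQueue, writeRecord() and the produced data are placeholders; the pattern
relies only on allocate() being callable multiple times before commit(), as documented above:
using MyQueue = ::io::MemoryQueue<1024, 64>;

// Writes a record whose final size is only known after producing its content.
bool writeRecord(MyQueue::Writer& writer)
{
    // First allocate the worst case ...
    auto chunk = writer.allocate(writer.maxSize());
    if (chunk.size() == 0)
    {
        return false; // queue is full
    }
    // ... produce the data directly into the chunk (placeholder: 10 counter bytes) ...
    size_t const used = 10;
    for (size_t i = 0; i < used; ++i)
    {
        chunk[i] = static_cast<uint8_t>(i);
    }
    // ... then trim the allocation to the size actually used and publish it.
    (void)writer.allocate(used);
    writer.commit();
    return true;
}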
io::MemoryQueue::Reader
MemoryQueue::Reader provides an API without virtual functions to read data from a MemoryQueue.
/**
* Constructs a Reader from a given queue.
*/
explicit Reader(MemoryQueue& queue);
/**
* Returns true if no data is available.
*
* Calling peek() on an empty queue will return an empty slice.
*/
bool empty() const;
/**
* Returns a slice of bytes pointing to the next memory chunk of the MemoryQueue, if
* available. Calling peek on an empty MemoryQueue will return an empty slice.
*/
::estd::slice<uint8_t> peek() const;
/**
* Releases the first allocated chunk of memory. If the Reader is empty, calling this
* function has no effect.
*/
void release() const;
/**
* Releases all entries until empty() returns true.
*/
void clear() const;
/**
* Returns the maximum size of which a slice of bytes can be read.
*/
size_t maxSize() const;
/**
* Returns the number of contiguous bytes that can be allocated next.
*
* The calculation of used and free bytes relies on the fact that sent and received always
* point to valid indices. That means, they will never point to an index where not at least
* MAX_ELEMENT_SIZE + sizeof(SIZE_TYPE) bytes are the difference to the end of the data
* array.
*/
size_t available() const;
Usage Example
As MemoryQueue is a class template, user code might also become templated code, as shown in the
following example:
/**
 * Forwards data from a reader of a MemoryQueue to a writer of another MemoryQueue.
 * \param source Reader of the MemoryQueue that acts as source of data
 * \param destination Writer of the MemoryQueue that acts as sink of data
 * \return true if data has been forwarded, false otherwise.
 */
template<class Queue>
bool forwardData(typename Queue::Reader& source, typename Queue::Writer& destination)
{
    auto srcData = source.peek();
    if (srcData.size() == 0)
    {
        // No data available.
        return false;
    }
    // Allocate space in destination
    auto dstData = destination.allocate(srcData.size());
    if (dstData.size() == 0)
    {
        // Destination is full.
        return false;
    }
    (void)::estd::memory::copy(dstData, srcData);
    destination.commit();
    source.release();
    return true;
}
The code instantiating reader and writer could look like this:
using DataStream = ::io::MemoryQueue<1024, 12>;
using DataStreamWriter = DataStream::Writer;
using DataStreamReader = DataStream::Reader;

DataStream source, destination;
auto srcWriter = DataStreamWriter{source};
auto srcReader = DataStreamReader{source};
auto dstWriter = DataStreamWriter{destination};
auto dstReader = DataStreamReader{destination};

{
    auto d = srcWriter.allocate(4);
    ASSERT_EQ(4, d.size());
    ::estd::memory::take<::estd::be_uint32_t>(d) = 0x1234;
    srcWriter.commit();
}

ASSERT_TRUE(forwardData<DataStream>(srcReader, dstWriter));

{
    auto d = dstReader.peek();
    ASSERT_EQ(4, d.size());
    ASSERT_EQ(0x1234, ::estd::memory::take<::estd::be_uint32_t>(d));
}