Sensor Message Publishing#

Overview top#

One of the main functionalities of SVL Simulator is its ability to work with third-party tools, which include autonomous driving systems. Simulator provides output from virtual sensors that can be received and parsed by tools like Apollo or Autoware.Auto. Third-party systems can then provide instructions back to Simulator, which will execute them on virtual vehicle. The whole communication aims to be indistinguishable from its real-world equivalent, which means that each message sent and received by Simulator follows the data format expected by third party tools.

Since multiple bridge plugins are supported, all the sensor plugins have to be bridge-agnostic. Plugins can check if the bridge is connected and decide when and what to send, but do not know the bridge type, and therefore its expected format. This splits responsibility for publishing a message between two plugin types:

  • Sensor prepares data in bridge-agnostic format and requests for it to be sent
  • Bridge converts message to bridge-specific format and sends it

This article covers subjects of creating and dispatching bridge messages from sensor plugins. For details about creating new sensor plugins, see sensor plugins page.

If you're creating a bridge plugin and want to make sure that sensor data is properly handled, see sensor plugins page.

Sensor data types top#

Data types for sensors don't have to follow any particular guidelines, but next to the data that sensor produces, you should consider including fields that might be required by some bridge plugins. Often used examples include timestamp or sequential message number. You do not need to worry about how bridge plugin will consume this data - just make sure all the fields have public read accessors.

If your sensor produces small amount of data that's easy to convert, you can decide to call the publishing delegate directly and perform the whole operation synchronously. If instead you opt to use asynchronous approach (see asynchronous publishing), you have to be aware about including any reused, heap-allocated resources (e.g. single large buffer included in each message), since the order of updating and accessing data from multiple threads will not be defined.

Probably the most common examples that would call for reused resources are large array buffers. Of course it's possible to allocate completely new array for each message, but this would add unnecessary work for garbage collector and could affect performance. You might consider using pool of pre-allocated buffers, and while it's certainly possible to do so, you would have to depend on callbacks to track which buffers are actively in use. Fortunately, there is a built-in, simpler way to handle this kinds of cases.

If you want to reuse resources and use asynchronous publishing, your data type should be a class, provide parameterless constructor, and implement the IThreadCachedBridgeData interface. This interface defines two methods: - CopyToCache - your implementation should perform a full deep copy of current instance data into the container of the same type, provided through parameter. Provided target instance is part of a pooling system that will make sure this data will not be accessed again until conversion and publishing on the bridge side are complete. This means only one thread will ever access it at a time, and you don't have to worry about thread safety. - GetHash - your implementation should return value that groups instances into compatible sub-pools. Returning one value for all instances is valid if you don't need sub-pools. If some instances are not compatible (e.g. they use pre-allocated buffers with different sizes), this should return value calculated based on incompatible parameters.

Implementing IThreadCachedBridgeData interface is enough to enable thread-side caching functionality and no further changes are required on sensor side, as long as you're using BridgeMessageDispatcher (see asynchronous publishing section for details).

The instance provided by IThreadCachedBridgeData interface is persistent, pooled resource. Depending on previous usage, its fields may or may not be initialized - make sure to clear any optional fields. If you're using arrays, consider either storing valid data size in separate parameter (it would allow you to use larger arrays to store varying amounts of data without the need to reallocate them every single time) or using GetHash method to split instances into separate sub-pools based on array size.

For an example of IThreadCachedBridgeData interface usage, see Simulator.Bridge.Data.PointCloudData type in Simulator repository. This example includes good practices described above.

Asynchronous publishing top#

When you expect the conversion process to take a long time, you might want to avoid occupying main thread of the application, and perform the process asynchronously. You might also consider using multiple threads to increase overall processing speed. Both of these cases, and more, are covered by the BridgeMessageDispatcher class. Its default instance is created for each simulation and is accessible through the class' static field Instance. Note that this instance only exists during simulation.

Interacting with the message dispatcher is simplified to a single method, TryQueueTask. Its signature and detailed description can be found below.

public bool TryQueueTask<T>(
    Publisher<T> publisher,
    T data,
    Action<bool> callback = null,
    object exclusiveToken = null) where T : class, new()

Parameters:

  • publisher - Publisher delegate for active bridge, created by calling AddPublisher<T>(Topic) on bridge instance. See bridge registration section for an example.
  • data - Sensor data to convert and send. If type of the data implements IThreadCachedBridgeData interface, thread-exclusive cache will be used to store copy of the data. Original data will never be accessed by worker thread in such case. See sensor data types section for more details.
  • callback (optional) - Delegate that will be called after conversion and publishing process was resolved. This will always be executed, but if delegate boolean parameter is set to false, the process has failed. See callback section for more information.
  • exclusiveToken (optional) - Token used to enforce exclusivity for given task. Only one publishing process with the same token can be active or queued at a time - new requests with the same token will be dropped. See exclusive publishing section for more details.

Returns:

  • bool - True if message was enqueued and is expected to be converted and sent. False if it was immediately dropped for any reason (see dropping messages section for a list of possible causes).

Multi-threading top#

BridgeMessageDispatcher internally manages a number of background threads that execute conversion and publishing code defined in bridge plugins. By default, only a single background thread is running, but dispatcher will automatically spawn new ones if the queue is not expected to be unloaded in a single cycle. Maximum number of worker threads is defined by logical core count of your CPU. If one or more threads are idle for a while, thread count will be reduced. This means that threads will be scaled up and down dynamically, based on current bridge data throughput.

It's possible for extreme situations to occur, in which thread count reached its limit and yet queue size keeps growing. This can happen if large messages requiring long processing are send very often. At this stage CPU is usually close to 100% utilization on all cores and not much can be done. In this situation, dispatcher will block main thread execution until one of worker threads is finished with its work. This will usually severely reduce framerate, so you should consider reducing bridge data throughput or using more powerful CPU at that stage. Warning is displayed when this situation occurs.

Dropping messages top#

There are situations in which calling TryQueueTask will not publish the data. BridgeMessageDispatcher considers message as published when publisher delegate for active bridge finishes execution without throwing any exception. Note that this doesn't always mean that message was properly received, or even sent. This is heavily dependent on bridge implementation. If you're implementing your own bridge, it's suggested to throw exception whenever something goes wrong with publishing process. BridgeMessageDispatcher class will catch the exception, allow sensor to react to it, and display it in console. The exception will not be thrown again to prevent worker thread from crashing.

For standard causes of dropping messages (unrelated to bridge implementation), consult the list below. Most of them will occur when enqueuing attempt happens, in which case TryQueueTask will return false. If task was enqueued correctly, and problem occurred during delegate execution, TryQueueTask will return true, but the problem still can be reacted to through callback.

  • Time is paused (TryQueueTask returns false, callback executes with false)

    If time in simulation is paused, every module should hold its execution. If sensor ignores time state and attempts to publish data during pause, message will be dropped.

  • Exclusive token is already in use (TryQueueTask returns false, callback executes with false)

    Message with the same token is either queued or being processed. See exclusive publishing for details about token usage.

  • Exception was thrown by publisher delegate (TryQueueTask returns true, callback executes with false)

    Reasons for exception occurring at this stage are dependent on bridge type. Possible causes include lack of compatible publisher, errors during conversion process or problems with connection. For more details check console (if you're running Simulator in Unity Editor) or application logs.

Callback top#

When attempting to enqueue bridge message, you can provide optional delegate (callback) with a single boolean parameter. This delegate will always be invoked, even if message was dropped immediately and never enqueued. If bridge properly executed publisher delegate, callback will be invoked with true passed as its parameter. Details depends on bridge implementation, but this usually means that message was properly converted and sent. If message was dropped for any reason (see dropping messages), or if exception was thrown anywhere in publisher delegate code, callback will be invoked with false passed as its parameter.

Some uses of the callback include cleaning up resources after message is published, reacting to expected failures, or waiting for very expensive bridge operations to finish before continuing with sensor work.

Exclusive publishing top#

There are use cases in which multi-threaded character of the publishing process, mixed with unknown processing times, can have undesired implications. If, for example, you enqueue two subsequent messages with timestamps, older message might, in theory, finish publishing later. If message order is critical and you want to enforce it, there are a few options:

  • publish messages synchronously - will block main thread
  • publish using your own, single background thread - requires synchronization between main thread (that can access time) and background thread
  • use callbacks, start new publishing process when previous finishes
  • provide exclusive token

While all options are viable, providing exclusive token is probably the easiest solution if synchronous execution is undesired.

The exclusive token can be an instance of any object. Its purpose is simple - only one publish request with individual token can be active at a time. If you call TryQueueTask and provide exclusive token, two things can happen:

  • if another request using the same token is currently active, message is immediately dropped and TryQueueTask returns false
  • if the token is not used by any of the active requests, message is processed as usual - if it's not dropped for different reason, message is enqueued, token becomes active and TryQueueTask returns true

The token becomes active immediately when request using it becomes enqueued, and becomes inactive when publish delegate execution either succeeds or crashes. If you provide the exclusive token, messages using it will always be published in chronological order, although their frequency depends on conversion time and queue load.

Please note that using the exclusive token will drop already prepared message. If the performance cost of preparing the message is significant, you should consider using callback instead to wait for previous message to finish publishing before preparing the next one.

Frequency-based publishing top#

Many sensors have predefined frequency at which they collect and publish data. If your sensor is supposed to work with a set frequency, the easiest way to achieve this is to use FrequencySensorBase.

FrequencySensorBase is an abstract class that preserves all standard SensorBase class behavior, but provides functionality that is supposed to simplify, or even completely remove any need for time tracking in your sensor code.

When you create sensor derived from FrequencySensorBase class, you will need provide implementation for its abstract member, UseFixedUpdate. It's boolean property with only a read accessor. Simply make it return false if you want to use standard update loop, or true if your use case depends on using Unity's FixedUpdate loop. For vast majority of cases, using standard update loop should be enough.

In both cases sensor updates will happen with set frequency, but you shouldn't expect perfect intervals. Your code will be executed when either Update (with intervals dependent on framerate) or FixedUpdate (with fixed intervals, 10 ms default in SVL Simulator) is executed by Unity. In both cases, most of the updates in the loop are ignored, and the ones closest to sensor intervals are chosen. Your code will be executed at that point.

Sensor frequency is defined through Frequency parameter. It's marked as sensor parameter, and therefore will be visible among sensor settings. See sensor plugins page for more information about sensor parameters. Frequency is based on simulation time, so sensor will handle pause or non-realtime mode properly.

Last thing that has to be done is overriding SensorUpdate method. Code inside will be executed with set frequency, unless simulation is paused. If you need to use either Update or FixedUpdate method directly, you can override them too, but remember to call their base implementation before your code.

For an example of FrequencySensorBase usage, see publishing with set frequency section.

Asynchronous GPU readback#

In some cases, sensors utilize GPU capabilities to process data. This doesn't only include rendering in camera-based sensors, but also cases where highly parallel computations need to be performed on large data sets. In both cases data is stored on GPU (as either textures or compute buffers), and then read back to process and publish on CPU. Reading back large buffers or textures can take significant time, and may result in latency spikes if main thread is blocked during this process. As usual with long operations, you might want to perform this process asynchronously and make sure that main thread is able to keep stable framerate.

You can see longer processing times in publishing frames on the frame time chart below. Upper row shows case with synchronous GPU readback, lower row shows the same setup with asynchronous GPU readback. Notice that latency spikes are significantly reduced.

Simulator provides an utility class (GpuReadbackPool<TData, T>) that can help with pooling and tracking asynchronous GPU readbacks. It will internally handle some of the problems related to asynchronous reads: - Multiple readbacks can be active at once - required resources will be pooled and tracked automatically - Requests will always be finished in the same order they were started - Timestamp of the readback start will be stored with the data - You don't need to track state of active readbacks, callback will be called for each upon completion

To use this system, you have to create an instance of GpuReadbackPool<TData, T> for your sensor. It requires explicit initialization through Initialize method, which takes buffer size and delegate called on completion as parameters. If the buffer size ever changes (e.g. when output texture resolution for sensor is modified), you can change the buffer size through Resize method.

TData type parameter is usually GpuReadbackData<T>, but can also be a type derived from it. T must be a struct compatible with NativeArray<T>.

With this kind of setup, using asynchronous GPU readbacks is very simple - when operations on GPU are finished, you can simply call:

ReadbackPool.StartReadback(GpuResource);

When readback finishes, delegate declared in Initialize method will be called. Instance of GpuReadbackData<T> will be passed as a parameter, containing gpuData (native array containing data read back from the GPU) and captureTime (timestamp of the request start) fields. If you used your own type derived from GpuReadbackData<T>, your custom fields will be accessible too.

StartReadback has multiple overloads that allow you to specify some of the readback parameters (like texture format, buffer offset etc.). It also returns instance of the GpuReadbackData<T>, so you can populate it with any data at the time of readback start. This is especially useful if you want to include any custom fields.

Please note that GpuReadbackPool<TData, T> is a passive class and requires explicit updates from outside. You should call Process method whenever you want to trigger a check on pending readbacks. This is usually done from Update method of sensor class.

For an example of GpuReadbackPool<TData, T> usage, see reading GPU data asynchronously section.

Code examples top#

Snippets presented here are intended to explain parts of the sensor code responsible for message publishing. Large parts of the sensor code are omitted - see sensor plugins page for more complete examples.

Custom sensor data type top#

This example shows data type that supports thread-side caching through IThreadCachedBridgeData interface (see sensor data types for details).

array field in this use case will contain reference to persistent buffer used by sensor. The same buffer is passed for all messages. For the sake of this example, let's assume that buffer size is constant, but data size is not. dataSize field will contain size of valid data in the buffer. Check one of publishing examples below to see how data is prepared.

public class FloatArray : IThreadCachedBridgeData<FloatArray>
{
    // Array reference passed from sensor. Shared across multiple messages.
    public float[] array;

    // Specifies size of valid data in array.
    public int dataSize;

    // Timestamp that is required by receiving side.
    public double timestamp;

    // Method defined by IThreadCachedBridgeData interface.
    // This will be called internally by BridgeMessageDispatcher before enqueuing message.
    // Parameter target contains instance provided by pooling system.
    // Implementation must perform deep copy from current instance to target.
    public void CopyToCache(FloatArray target)
    {
        // If array in target instance was not initialized or is too small, allocate new one.
        if (target.array == null || target.array.Length < dataSize)
            target.array = new float[dataSize];

        // Copy all valid data to array in target instance.
        // Data beyond dataSize (if present) has undefined value, so it's skipped.
        Array.Copy(array, target.array, dataSize);

        // Value types can be simply assigned in target instance.
        target.dataSize = dataSize;
        target.timestamp = timestamp;
    }

    // Method defined by IThreadCachedBridgeData interface.
    // This should return hash that groups instances into compatible sub-pools.
    public int GetHash()
    {
        // No sub-pools required - return the same value for all instances.
        return 0;
    }
}

Bridge registration top#

During initialization process, each sensor must define its expected interactions with bridge.

OnBridgeSetup is an abstract method defined in SensorBase class, that is invoked once for each sensor if bridge instance is present. Each sensor must implement it. You can use it to get reference to the bridge instance and register publisher (or subscriber) of your data type. Delegate returned by AddPublisher method is responsible for converting and publishing messages.

Data type used for this example (FloatArray) is described in its own section. Note that bridge plugin must implement conversion between this type and its own, bridge-specific message format. You can find more information about bridge plugin implementation on the bridge plugins page.

// Declare sensor name and types of data that it will publish over the bridge.
[SensorType("Float Array Sensor", new[] {typeof(FloatArray)})]
public class FloatArraySensor : SensorBase
{
    // Field used to store reference to the bridge instance.
    BridgeInstance Bridge;

    // Field used to store publisher delegate.
    Publisher<FloatArray> Publish;

    // OnSetupBridge will be called during initialization if any bridge is present.
    public override void OnBridgeSetup(BridgeInstance bridge)
    {
        // Store reference to bridge instance so we can track its state.
        Bridge = bridge;

        // Register publisher of FloatArray data type for the bridge.
        // Returned delegate can be used to convert and publish messages.
        Publish = bridge.AddPublisher<FloatArray>(Topic);
    }

    // Visualization and sensor update code is omitted in this example.
    [...]
}

Publishing using exclusive token top#

This example shows how custom data can be published using exclusive token - it assumes that message order is critical and enforces it by dropping any messages until previously enqueued one is published.

Since array used as buffer is reused every frame, data type in this example utilizes thread-side caching. Exact time that conversion and publishing will take is unknown, so there's a risk that this data might be overwritten before conversion finishes. Using data type that supports caching is invisible from sensor code (as shown below), but all cross-thread access issues - like the one mentioned - are mitigated.

// Declare sensor name and types of data that it will publish over the bridge.
[SensorType("Float Array Sensor", new[] {typeof(FloatArray)})]
public class FloatArraySensor : SensorBase
{
    // Field used to store reference to the bridge instance.
    BridgeInstance Bridge;

    // Field used to store publisher delegate.
    Publisher<FloatArray> Publish;

    // Preallocated array used as a buffer.
    public float[] array;

    // Update is called every frame.
    private void Update()
    {
        // Theoretical method that would calculate output for this sensor.
        // This is executed even if bridge is not connected, as data might be used inside Simulator.
        // Data is put in array. Since data size is dynamic, and array is not, dataSize stores valid data count.
        PrepareSensorData(array, out var dataSize);

        // Check if bridge exists and is currently connected.
        if (Bridge is {Status: Status.Connected})
        {
            // Fetch current time from simulation manager.
            var time = SimulatorManager.Instance.CurrentTime;

            // Create data object for new message. Note that persistent buffer is passed as reference.
            var data = new FloatArray {array = array, dataSize = dataSize, timestamp = time};

            // Try enqueueing publish request. Instance of this sensor is used as exclusive token.
            // If simulation is paused or token is already in use, this message will be dropped.
            // Sensor doesn't care whether message dropped or not, so returned value is ignored.
            BridgeMessageDispatcher.Instance.TryQueueTask(Publish, data, this);
        }
    }

    // Initialization and visualization code is omitted in this example.
    [...]
}

Publishing with set frequency top#

This example shows how FrequencySensorBase can be used to calculate and publish data with set interval. See frequency-based publishing section for details.

Similar to previous example, used data type utilizes thread-side caching to prevent simultaneous access from multiple threads to array buffer.

// Declare sensor name and types of data that it will publish over the bridge.
// Sensor type is derived from FrequencySensorBase instead of usual SensorBase.
[SensorType("Float Array Sensor", new[] {typeof(FloatArray)})]
public class FloatArraySensor : FrequencySensorBase
{
    // Field used to store reference to the bridge instance.
    BridgeInstance Bridge;

    // Field used to store publisher delegate.
    Publisher<FloatArray> Publish;

    // Preallocated array used as a buffer.
    public float[] array;

    // FixedUpdate is not required - standard update loop will be used.
    protected override bool UseFixedUpdate => false;

    // SensorUpdate is executed with frequency defined by Frequency sensor parameter in base class.
    // Frequency is based on simulation time, so pause or non-realtime mode will be handled properly.
    private override void SensorUpdate()
    {
        // Theoretical method that would calculate output for this sensor.
        // This is executed even if bridge is not connected, as data might be used inside Simulator.
        // Data is put in array. Since data size is dynamic, and array is not, dataSize stores valid data count.
        PrepareSensorData(array, out var dataSize);

        // Check if bridge exists and is currently connected.
        if (Bridge is {Status: Status.Connected})
        {
            // Fetch current time from simulation manager.
            var time = SimulatorManager.Instance.CurrentTime;

            // Create data object for new message. Note that persistent buffer is passed as reference.
            var data = new FloatArray {array = array, dataSize = dataSize, timestamp = time};

            // Try enqueueing publish request. This will always succeed, as both requirements are met:
            // - time is not paused (SensorUpdate wouldn't be called if it was)
            // - optional exclusive token is not provided
            BridgeMessageDispatcher.Instance.TryQueueTask(Publish, data);
        }
    }

    // Initialization and visualization code is omitted in this example.
    [...]
}

Reading GPU data asynchronously top#

This example shows how GpuReadbackPool<TData, T> can be used to perform GPU readbacks asynchronously.

// Declare sensor name and types of data that it will publish over the bridge.
[SensorType("Float Array Sensor", new[] {typeof(FloatArray)})]
public class FloatArraySensor : SensorBase
{
    // Field used to store reference to the bridge instance.
    BridgeInstance Bridge;

    // Field used to store publisher delegate.
    Publisher<FloatArray> Publish;

    // Preallocated array used as a buffer.
    public float[] array;

    // Buffer used for storing data on GPU side.
    private ComputeBuffer computeBuffer;

    // Asynchronous GPU readback pool.
    private GpuReadbackPool<GpuReadbackData<Vector4>, Vector4> readbackPool;

    // Initialization method - prepare required resources.
    protected override void Initialize()
    {
        // Assume 100 floats will be used per data packet.
        array = new float[100];
        computeBuffer = new ComputeBuffer(100, sizeof(float));

        // Initialize GPU readback pool with 100 floats capacity.
        // OnReadbackComplete delegate will be called for each completed readback.
        readbackPool = new GpuReadbackPool<GpuReadbackData<float>, float>();
        readbackPool.Initialize(100, OnReadbackComplete);
    }

    // Deinitialization method - free any currently used resources.
    protected override void Deinitialize()
    {
        computeBuffer.Release();
        readbackPool.Dispose();
    }

    // Update is called every frame.
    private void Update()
    {
        // Explicitly trigger update of all currently pending readbacks.
        readbackPool.Process();

        // Theoretical method that would calculate output for this sensor.
        // This is executed even if bridge is not connected, as data might be used inside Simulator.
        // Data is calculated and kept on the GPU, in the compute buffer.
        PrepareSensorData(computeBuffer);

        // Request asynchronous read of the GPU resource.
        // Completion will trigger OnReadbackComplete declared during initialization.
        readbackPool.StartReadback(computeBuffer);
    }

    // Delegate called when readback is completed
    private void OnReadbackComplete(GpuReadbackData<float> data)
    {
        // Check if bridge exists and is currently connected.
        if (Bridge is {Status: Status.Connected})
        {
            // data.gpuData contains data read back from the GPU.
            // Perform copy from NativeArray to managed array, since our message type doesn't support it.
            data.gpuData.CopyTo(array);

            // Publish data. See previous examples for details.
            // Note that time passed is capture time, not current time.
            var data = new FloatArray {array = array, dataSize = 100, timestamp = data.captureTime};
            BridgeMessageDispatcher.Instance.TryQueueTask(Publish, data);
        }
    }

    // Bridge setup and visualization code is omitted in this example.
    [...]
}