Skip to content

Enhancement - Support IList<ArraySegment<byte>> overload on RtpChannel.Send #1433

@FinHorsley

Description

@FinHorsley

Proposal

Consider adding the following method to the RtpChannel. (Happy to raise PR if approved)

public virtual SocketError Send(RTPChannelSocketsEnum sendOn, IPEndPoint dstEndPoint, IList<ArraySegment<byte>> buffers)

This allows for optimisations when sending packets, as we no longer need to build one overall buffer that represents both Header+Payload. Instead , can send separate Header / Payload buffers

e.g.

protected void SendRtpRaw(ArraySegment<byte> data, uint timestamp, int markerBit, int payloadType, Boolean checkDone, ushort? seqNum = null, bool useScatterSend = false)
{
    // ... rest omitted for brevity

  if (protectRtpPacket == null)
  {
     rtpChannel.Send(RTPChannelSocketsEnum.RTP, DestinationEndPoint, [rtpPacket.Header.GetBytes(), rtpPacket.GetPayloadSegment()]);
  }
  else
  {
      // ... omitted, but fallback to original var rtpBuffer = rtpPacket.GetBytes();
      // ...
      // rtpChannel.Send(RTPChannelSocketsEnum.RTP, DestinationEndPoint, rtpBuffer);
      // This _could_ be further optimised, but i was trying to avoid breaking changes
  }
}

Implementation

This is a rough outline of what an implementation could look like. I have tested manually with audio / video streams, but am not an expert in this so might have missed a howler here! I think there are also some regressions in the EndSendTo equivalent

public virtual SocketError Send(RTPChannelSocketsEnum sendOn, IPEndPoint dstEndPoint, IList<ArraySegment<byte>> buffers)
{
    if (m_isClosed)
    {
        return SocketError.Disconnecting;
    }

    if (dstEndPoint == null)
    {
        throw new ArgumentException(nameof(dstEndPoint), "An empty destination was specified to Send in RTPChannel.");
    }

    if (IPAddress.Any.Equals(dstEndPoint.Address) || IPAddress.IPv6Any.Equals(dstEndPoint.Address))
    {
        logger.LogWarning("The destination address for Send in RTPChannel cannot be {Address}.", dstEndPoint.Address);
        return SocketError.DestinationAddressRequired;
    }

    try
    {
        // 2) Pick the right socket
        var sendSocket = RtpSocket;
        if (sendOn == RTPChannelSocketsEnum.Control)
        {
            LastControlDestination = dstEndPoint;
            sendSocket = m_controlSocket ?? throw new ApplicationException(
                "RTPChannel was asked to send on the control socket but none exists.");
        }
        else
        {
            LastRtpDestination = dstEndPoint;
        }

        // 3) IPv4→IPv6 mapping for Mono if needed
        if (dstEndPoint.AddressFamily == AddressFamily.InterNetwork && sendSocket.AddressFamily != AddressFamily.InterNetwork)
        {
            dstEndPoint = new IPEndPoint(dstEndPoint.Address.MapToIPv6(), dstEndPoint.Port);
        }

        // 4) Ensure receiver is running
        if (!m_rtpReceiver.IsRunningReceive && !m_rtpReceiver.IsClosed)
        {
            m_rtpReceiver.BeginReceiveFrom();
        }

        var args = new SocketAsyncEventArgs
        {
            RemoteEndPoint = dstEndPoint,
            SocketFlags = SocketFlags.None,
            BufferList = buffers,
            UserToken = sendSocket
        };

        args.Completed += (_, ea) =>
        {
            try
            {
                var sock = (Socket) ea.UserToken;
                if (ea.SocketError != SocketError.Success)
                {
                    logger.LogWarning("Send error on socket {Socket}: {Error}", sock, ea.SocketError);
                }
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "Exception in RTPChannel.Send Completed handler. {Message}", ex.Message);
            }
            finally
            {
                ea.Dispose();
            }
        };

        // returns false ⇒ completed synchronously
        if (!sendSocket.SendToAsync(args))
        {
            // if it completed in-line, manually invoke our cleanup
            try
            {
                if (args.SocketError != SocketError.Success)
                {
                    logger.LogWarning("Send error on socket {Socket}: {Error}", sendSocket, args.SocketError);
                }
            }
            finally
            {
                args.Dispose();
            }
        }

        return SocketError.Success;
    }
    catch (ObjectDisposedException)
    {
        // socket was closed underfoot
        return SocketError.Disconnecting;
    }
    catch (SocketException sockExcp)
    {
        return sockExcp.SocketErrorCode;
    }
    catch (Exception excp)
    {
        logger.LogError(excp, "Exception in RTPChannel.Send. {ErrorMessage}", excp.Message);
        return SocketError.Fault;
    }
}

Benchmarks

| Method   | TotalDurationSec | Mean     | Error   | StdDev  | Ratio | Completed Work Items | Lock Contentions | Exceptions | Gen0     | Allocated | Alloc Ratio |
|--------- |----------------- |---------:|--------:|--------:|------:|---------------------:|-----------------:|-----------:|---------:|----------:|------------:|
| Original | 20               | 312.7 us | 1.48 us | 1.16 us |  1.00 |                    - |                - |          - | 101.5625 |    625 KB |        1.00 |
| Improved | 20               | 300.2 us | 1.32 us | 1.17 us |  0.96 |                    - |                - |          - |  58.5938 | 359.38 KB |        0.57 |
[MemoryDiagnoser]
[ThreadingDiagnoser]
[ExceptionDiagnoser]
public class AudioStreamBenchmark
{
    private readonly AudioStream _audioStream;

    public AudioStreamBenchmark()
    {
        _audioStream = new AudioStream(new RtpSessionConfig
        {
            BindAddress = IPAddress.Loopback,
            BindPort = 0,
            IsMediaMultiplexed = false,
            IsRtcpMultiplexed = false,
            RtpPortRange = new(10000, 12999),
            RtpSecureMediaOption = RtpSecureMediaOptionEnum.None
        }, 0);
        _audioStream.MediaType = SDPMediaTypesEnum.audio;
        _audioStream.LocalTrack= new MediaStreamTrack(SDPMediaTypesEnum.audio, true, [new SDPAudioVideoMediaFormat(SDPMediaTypesEnum.audio, 0, "test")], MediaStreamStatusEnum.SendRecv);
        _audioStream.RemoteTrack = new MediaStreamTrack(SDPMediaTypesEnum.audio, true, [new SDPAudioVideoMediaFormat(SDPMediaTypesEnum.audio, 0, "test")], MediaStreamStatusEnum.SendRecv);
        _audioStream.AddRtpChannel(new NoOpRtpChannel());
        _audioStream.DestinationEndPoint = new IPEndPoint(IPAddress.Loopback, 0);
    }

    private const int    SampleRate       = 8000;      // samples/sec
    private const int    Channels         = 1;         // mono
    private const int    BitsPerSample    = 16;        // 16-bit PCM
    private const double FrameDurationSec = 0.020;     // 20 ms
    private int _frameCount;
    private ArraySegment<byte> _b;

    [Params(20.0)]
    public double TotalDurationSec { get; set; }

    [GlobalSetup]
    public void Setup()
    {
        // compute frame size
        var bytesPerSample = BitsPerSample / 8;                         // = 2
        var samplesPerFrame = (int)(SampleRate * FrameDurationSec);     // = 160
        var frameSize       = samplesPerFrame * Channels * bytesPerSample; // = 320
        _frameCount = (int)Math.Round(TotalDurationSec / FrameDurationSec);

        _b = new byte[frameSize];
    }

    [Benchmark(Baseline = true)]
    public void Original()
    {
        for (var i = 0; i < _frameCount; i++)
        {
            _audioStream.SendAudio(0, _b, useMultiBuffer: false); // Test only addition to allow simpler benchmarking
        }
    }

    [Benchmark]
    public void Improved()
    {
        for (var i = 0; i < _frameCount; i++)
        {
            _audioStream.SendAudio(0, _b, useMultiBuffer: true);
        }
    }
}

Considerations/Questions

  • Is Socket SendToAsync supported across targetFrameworks? I haven't worked with NetFramework, only NetCore and later

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions