Skip to content

MQTTnet(2000 links) eating more and more memories until the system was crashed. #2195

@zmrbak

Description

@zmrbak

using IotDevicesMock.Utils;
using MQTTnet;
using MQTTnet.Protocol;
using System.Security.Authentication;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;

namespace IotDevicesMock.Models
{
public class Device : IDisposable
{
[JsonIgnore]
private static readonly ILogger logger = ServiceLocator.Provider.GetRequiredService().CreateLogger("Device");
[JsonIgnore]
private static readonly IConfiguration configuration = ServiceLocator.Provider.GetRequiredService();
private static readonly Random _random = new Random();
private static readonly object _randomLock = new object();

    private bool _disposed = false;
    private Task _reportTask;
    private CancellationTokenSource _reportCts; // 独立控制报告任务的取消

    public string DeviceId { get; } = "T" + DateTimeOffset.UtcNow.ToUnixTimeMilliseconds().ToString().Substring(6) + Guid.NewGuid().ToString("N").Substring(0, 4).ToUpper();
    public DeviceStatus Status { get; set; }
    public SwitchStatus SwitchStatus { get; set; }
    public DateTime StartTime { get; set; }
    public long MessagesAcknowledged;
    public long MessagesSent;
    public int ConnectionErrors;
    [JsonIgnore]
    public CancellationToken cancellationToken { get; set; } = CancellationToken.None;
    [JsonIgnore]
    public MqttConfig MqttConfig { get; }
    [JsonIgnore]
    public IMqttClient MqttClient { get; private set; }
    public int RetryCount { get; set; } = -1;
    public int Random { get; } = GetRandomNumber(1000, 10000);

    public Device()
    {
        MqttConfig = configuration.GetSection("MqttConfig").Get<MqttConfig>() ?? throw new InvalidOperationException("MQTT configuration is missing");
    }

    public async Task StartMqttAsync(CancellationToken cancellationToken)
    {
        this.cancellationToken = cancellationToken;
        StartTime = DateTime.UtcNow;

        // 每次启动前先释放旧资源
        if (_reportCts != null)
        {
            _reportCts.Cancel();
            _reportCts.Dispose();
        }
        _reportCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

        // 用循环处理重连,避免资源堆积
        while (!cancellationToken.IsCancellationRequested)
        {
            RetryCount++;
            try
            {
                // 释放旧客户端(关键优化)
                await DisposeMqttClient();

                var clientFactory = new MqttClientFactory();
                var client = clientFactory.CreateMqttClient();

                var options = new MqttClientOptionsBuilder()
                    .WithClientId(DeviceId)
                    .WithTcpServer(MqttConfig.BrokerAddress, MqttConfig.BrokerPort)
                    .WithCredentials(MqttConfig.UserName, MqttConfig.Password)
                    .WithWillTopic(MqttConfig.WillTopic.Replace("{id}", DeviceId).Replace("{num}", Random.ToString()))
                    .WithWillPayload(Encoding.UTF8.GetBytes("客户端离线"))
                    .WithWillQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
                    .WithTimeout(TimeSpan.FromSeconds(50))
                    .WithCleanSession(MqttConfig.CleanSession);

                if (MqttConfig.UseTls)
                {
                    options.WithTlsOptions(tls =>
                    {
                        tls.UseTls(true);
                        tls.WithSslProtocols(SslProtocols.Tls12);
                        tls.WithAllowUntrustedCertificates();
                        tls.WithCertificateValidationHandler(_ => true);
                    });
                }

                // 注册事件(合并重复订阅,避免内存泄漏)
                client.ApplicationMessageReceivedAsync += OnApplicationMessageReceived;
                client.ConnectedAsync += OnConnectedAsync;
                client.DisconnectedAsync += OnDisconnectedAsync;
                MqttClient = client;

                // 随机延迟启动(使用静态Random实例)
                int delayMs = GetRandomNumber(1, (int)(1000 * MqttConfig.KeepAlivePeriodSeconds));
                await Task.Delay(delayMs, cancellationToken);

                var connectResult = await client.ConnectAsync(options.Build(), cancellationToken);
                if (connectResult.ResultCode == MqttClientConnectResultCode.Success)
                {
                    await SubscribeTopicAsync();
                    // 启动状态报告任务(使用独立CancellationToken)
                    _reportTask = ReportStatusAsync(_reportCts.Token);
                    break; // 连接成功则退出循环
                }
                else
                {
                    logger.LogError("Client {DeviceId} failed to connect", DeviceId);
                    Interlocked.Increment(ref ConnectionErrors);
                }
            }
            catch (OperationCanceledException)
            {
                logger.LogInformation("MQTT 连接操作已取消");
                throw;
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "MQTT 连接失败(重试次数: {RetryCount})", RetryCount);
                await Task.Delay(5000, cancellationToken);
            }
        }
    }

    private async Task SubscribeTopicAsync()
    {
        if (!MqttClient.IsConnected)
        {
            logger.LogWarning($"订阅失败:MQTT客户端未连接");
            return;
        }

        string subTopic = MqttConfig.SubTopic.Replace("{num}", Random.ToString()).Replace("{id}", DeviceId);
        await MqttClient.SubscribeAsync(new MqttTopicFilterBuilder()
                    .WithTopic(subTopic)
                    .WithQualityOfServiceLevel(MqttConfig.SubTopicQualityOfService)
                    .Build());
        logger.LogInformation($"已订阅MQTT主题: {subTopic}(QoS: {MqttConfig.SubTopicQualityOfService})");
    }

    private async Task ReportStatusAsync(CancellationToken token)
    {
        try
        {
            while (!token.IsCancellationRequested)
            {
                var statusReport = new
                {
                    MessageId = "auto",
                    Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds().ToString(),
                    Status = SwitchStatus.ToString(),
                };

                // 减少日志序列化开销(仅在调试时输出详细日志)
                if (logger.IsEnabled(LogLevel.Information))
                {
                    logger.LogInformation("Device {DeviceId} status: {Status}", DeviceId, statusReport.Status);
                }

                // 仅在连接状态下执行发布
                if (MqttClient?.IsConnected == true)
                {
                    var payload = JsonSerializer.SerializeToUtf8Bytes(statusReport);
                    var message = new MqttApplicationMessageBuilder()
                        .WithTopic(MqttConfig.PubTopic.Replace("{id}", DeviceId).Replace("{num}", Random.ToString()))
                        .WithPayload(payload)
                        .WithQualityOfServiceLevel(MqttConfig.PubTopicQualityOfService)
                        .Build();

                    await MqttClient.PublishAsync(message, token);
                    Interlocked.Increment(ref MessagesSent);
                }
                else
                {
                    logger.LogWarning("Device {DeviceId} is not connected, cannot send status", DeviceId);
                    // 连接断开时触发重连(通过退出循环让外层逻辑处理)
                    break;
                }

                // 等待下一次报告(使用配置的周期)
                await Task.Delay(TimeSpan.FromSeconds(MqttConfig.KeepAlivePeriodSeconds), token);
            }

            // 若因连接问题退出,尝试重连
            if (!token.IsCancellationRequested)
            {
                await StartMqttAsync(token);
            }
        }
        catch (OperationCanceledException)
        {
            logger.LogInformation("Device {DeviceId} status report canceled", DeviceId);
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Device {DeviceId} status report error", DeviceId);
            // 出错后延迟重连,避免频繁重试
            if (!token.IsCancellationRequested)
            {
                await Task.Delay(3000, token);
                await StartMqttAsync(token);
            }
        }
    }

    private Task OnDisconnectedAsync(MqttClientDisconnectedEventArgs args)
    {
        logger.LogWarning("Device {DeviceId} disconnected: {Reason}", DeviceId, args.ReasonString);
        return Task.CompletedTask;
    }

    private Task OnConnectedAsync(MqttClientConnectedEventArgs args)
    {
        logger.LogInformation("Device {DeviceId} connected to MQTT server", DeviceId);
        return Task.CompletedTask;
    }

    private Task OnApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
    {
        Interlocked.Increment(ref MessagesAcknowledged);
        // 合并原有的两个消息处理逻辑,避免重复订阅
        string topic = e.ApplicationMessage.Topic;
        string payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
        logger.LogInformation("Device {DeviceId} received message: Topic={Topic}, Payload={Payload}", DeviceId, topic, payload);
        return Task.CompletedTask;
    }

    // 单独的MQTT客户端释放方法
    private async Task DisposeMqttClient()
    {
        if (MqttClient != null)
        {
            // 先取消事件订阅
            UnsubscribeEvents();

            // 断开连接(带超时保护)
            if (MqttClient.IsConnected)
            {
                try
                {
                    await MqttClient.DisconnectAsync(new MqttClientDisconnectOptions { ReasonString = "Normal disconnect" },
                        CancellationToken.None).WaitAsync(TimeSpan.FromSeconds(5));
                }
                catch (TimeoutException)
                {
                    logger.LogWarning("Device {DeviceId} disconnect timed out", DeviceId);
                }
            }

            // 释放客户端
            MqttClient.Dispose();
            MqttClient = null;
        }
    }

    private void UnsubscribeEvents()
    {
        if (MqttClient == null) return;

        // 移除所有事件订阅(必须与注册的方法一一对应)
        MqttClient.ApplicationMessageReceivedAsync -= OnApplicationMessageReceived;
        MqttClient.ConnectedAsync -= OnConnectedAsync;
        MqttClient.DisconnectedAsync -= OnDisconnectedAsync;
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    protected virtual void Dispose(bool disposing)
    {
        if (_disposed) return;

        if (disposing)
        {
            // 取消并释放报告任务
            if (_reportCts != null)
            {
                _reportCts.Cancel();
                _reportCts.Dispose();
                _reportCts = null;
            }

            // 等待报告任务结束(带超时)
            if (_reportTask != null && !_reportTask.IsCompleted)
            {
                try
                {
                    _reportTask.Wait(1000);
                }
                catch (AggregateException) { } // 忽略取消异常
                _reportTask = null;
            }

            // 释放MQTT客户端
            _ = DisposeMqttClient(); // 异步释放不阻塞Dispose
        }

        _disposed = true;
    }

    private static int GetRandomNumber(int min, int max)
    {
        lock (_randomLock)
        {
            return _random.Next(min, max);
        }
    }
}

}

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions