using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;

namespace PMqttClient
{
    /// <summary>
    /// Quality-of-service level used when subscribing and publishing.
    /// </summary>
    /// <remarks>
    /// Values are converted to MQTTnet's <see cref="MqttQualityOfServiceLevel"/> by a plain
    /// numeric cast, so the numeric values here must stay aligned with that enum.
    /// </remarks>
    public enum MqttQoSLevel
    {
        AtMostOnce = 0x00,
        AtLeastOnce = 0x01,
        ExactlyOnce = 0x02
    }

    /// <summary>
    /// Wrapper around an MQTTnet <see cref="IMqttClient"/> that keeps a set of topics and
    /// subscribes to all of them every time the underlying client reports a connection.
    /// </summary>
    /// <remarks>
    /// Received payloads are decoded as UTF-8 text and raised through <see cref="MessageHandler"/>;
    /// progress messages are raised through <see cref="LogHandler"/>.
    /// </remarks>
    public class MqttMultiTopicClient : IDisposable
    {
        private readonly IMqttClient _mqttClient;

        // Topic -> QoS requested for it. Kept so the subscriptions can be replayed on connect.
        private readonly Dictionary<string, MqttQoSLevel> _topics;

        private bool _disposed;

        /// <summary>Raised with a human-readable log line for each notable client action.</summary>
        public event EventHandler<string> LogHandler;

        /// <summary>Raised with the UTF-8 decoded payload of every received application message.</summary>
        public event EventHandler<string> MessageHandler;

        /// <summary>
        /// Creates the underlying MQTT client and hooks its connected, disconnected and
        /// message-received events. No connection is attempted here.
        /// </summary>
        public MqttMultiTopicClient()
        {
            var mqttFactory = new MqttFactory();
            _mqttClient = mqttFactory.CreateMqttClient();
            _topics = new Dictionary<string, MqttQoSLevel>();

            _mqttClient.ConnectedAsync += MqttClient_ConnectedAsync;
            _mqttClient.DisconnectedAsync += MqttClient_DisconnectedAsync;
            _mqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync;
        }

        /// <summary>Connects to a broker over TCP.</summary>
        /// <param name="broker">Host name or address of the broker.</param>
        /// <param name="port">TCP port of the broker.</param>
        /// <param name="clientId">Client identifier presented to the broker.</param>
        public async Task ConnectAsync(string broker, int port, string clientId)
        {
            var options = new MqttClientOptionsBuilder()
                .WithClientId(clientId)
                .WithTcpServer(broker, port)
                .Build();

            WriteLog($"Try to connect to {broker}:{port} with client id '{clientId}'...");
            await _mqttClient.ConnectAsync(options);
        }

        /// <summary>
        /// Registers a topic. If the client is currently connected the subscription is sent
        /// immediately; otherwise it is sent the next time the client connects.
        /// </summary>
        /// <param name="topic">Topic to subscribe to.</param>
        /// <param name="qos">Quality-of-service level requested for the subscription.</param>
        /// <remarks>A topic that is already registered is left unchanged and only logged.</remarks>
        public async Task AddTopicAsync(string topic, MqttQoSLevel qos)
        {
            // TryAdd performs the existence check and the insert in a single lookup.
            if (!_topics.TryAdd(topic, qos))
            {
                WriteLog($"topic '{topic}' is added already.");
                return;
            }

            if (_mqttClient.IsConnected)
            {
                await SubscribeAsync(topic, qos);
            }
        }

        /// <summary>
        /// Removes a registered topic and, if the client is connected, unsubscribes from it.
        /// Does nothing when the topic was never registered.
        /// </summary>
        /// <param name="topic">Topic to remove.</param>
        public async Task RemoveTopicAsync(string topic)
        {
            // Remove returns false when the key is absent, so no separate ContainsKey is needed.
            if (!_topics.Remove(topic))
            {
                return;
            }

            if (_mqttClient.IsConnected)
            {
                await _mqttClient.UnsubscribeAsync(topic);
            }

            WriteLog($"Remove topic '{topic}'.");
        }

        /// <summary>Publishes a text message to a topic.</summary>
        /// <param name="topic">Topic to publish to.</param>
        /// <param name="message">Text payload.</param>
        /// <param name="qos">Quality-of-service level for the publication.</param>
        public async Task PublishAsync(string topic, string message, MqttQoSLevel qos)
        {
            var mqttMessage = new MqttApplicationMessageBuilder()
                .WithTopic(topic)
                .WithPayload(message)
                .WithQualityOfServiceLevel(ConvertQoSLevel(qos))
                .Build();

            await _mqttClient.PublishAsync(mqttMessage);
            WriteLog($"Publish message '{message}' to topic '{topic}' with QoS {qos}.");
        }

        /// <summary>
        /// Misspelled legacy name of <see cref="PublishAsync"/>, kept so existing callers keep
        /// compiling. Prefer <see cref="PublishAsync"/> in new code.
        /// </summary>
        /// <param name="topic">Topic to publish to.</param>
        /// <param name="message">Text payload.</param>
        /// <param name="qos">Quality-of-service level for the publication.</param>
        public Task PubishAsync(string topic, string message, MqttQoSLevel qos)
        {
            return PublishAsync(topic, message, qos);
        }

        /// <summary>Disconnects from the broker. Registered topics are kept.</summary>
        public async Task DisconnectAsync()
        {
            WriteLog("Try to disconnect...");
            await _mqttClient.DisconnectAsync();
        }

        /// <summary>
        /// Unhooks the event handlers from the underlying MQTT client and disposes it.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>Releases the underlying MQTT client.</summary>
        /// <param name="disposing">
        /// <c>true</c> when called from <see cref="Dispose()"/>; managed resources are only
        /// released in that case.
        /// </param>
        protected virtual void Dispose(bool disposing)
        {
            if (_disposed)
            {
                return;
            }

            if (disposing)
            {
                _mqttClient.ConnectedAsync -= MqttClient_ConnectedAsync;
                _mqttClient.DisconnectedAsync -= MqttClient_DisconnectedAsync;
                _mqttClient.ApplicationMessageReceivedAsync -= MqttClient_ApplicationMessageReceivedAsync;
                _mqttClient.Dispose();
            }

            _disposed = true;
        }

        // Decodes the payload as UTF-8 and forwards it to MessageHandler subscribers.
        // Decoding is skipped entirely when nobody is subscribed.
        private Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
        {
            var handler = MessageHandler;
            if (handler is not null)
            {
                var message = Encoding.UTF8.GetString(arg.ApplicationMessage.PayloadSegment);
                handler.Invoke(this, message);
            }

            return Task.CompletedTask;
        }

        // Only logs the disconnect; no reconnect is attempted here.
        private Task MqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
        {
            WriteLog("Disconnected to MQTT Broker.");
            return Task.CompletedTask;
        }

        // Subscribes to every registered topic once the client reports a connection.
        private async Task MqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
        {
            WriteLog("Connected to MQTT Broker.");

            // Iterate over a snapshot: AddTopicAsync/RemoveTopicAsync may change _topics while
            // a subscribe call is being awaited, which would invalidate a live enumerator.
            foreach (var entry in _topics.ToArray())
            {
                await SubscribeAsync(entry.Key, entry.Value);
            }
        }

        // Raises LogHandler with the given text; a no-op when there are no subscribers.
        private void WriteLog(string message)
        {
            LogHandler?.Invoke(this, message);
        }

        // Sends a single subscription request and logs it afterwards.
        private async Task SubscribeAsync(string topic, MqttQoSLevel qos)
        {
            var filter = new MqttTopicFilterBuilder()
                .WithTopic(topic)
                .WithQualityOfServiceLevel(ConvertQoSLevel(qos))
                .Build();

            await _mqttClient.SubscribeAsync(filter);
            WriteLog($"Subscribed to topic: {topic} with QoS {qos}.");
        }

        // Maps this library's QoS enum to MQTTnet's enum by numeric value.
        private static MqttQualityOfServiceLevel ConvertQoSLevel(MqttQoSLevel qos)
        {
            return (MqttQualityOfServiceLevel)(int)qos;
        }
    }
}