From 7ec6f7dccc474f5796bc047a96aae49b5ded727d Mon Sep 17 00:00:00 2001 From: Peace Date: Fri, 9 Aug 2024 14:32:22 +0900 Subject: [PATCH] mulit topic mqtt --- MQTTSample/MQTTSample.sln | 12 ++ .../MqttMultiTopicClientApp.csproj | 14 ++ MQTTSample/MqttMultiTopicClientApp/Program.cs | 82 +++++++++++ .../PMqttClient/MqttMultiTopicClient.cs | 138 ++++++++++++++++++ MQTTSample/PMqttClient/PMqttClient.csproj | 13 ++ 5 files changed, 259 insertions(+) create mode 100644 MQTTSample/MqttMultiTopicClientApp/MqttMultiTopicClientApp.csproj create mode 100644 MQTTSample/MqttMultiTopicClientApp/Program.cs create mode 100644 MQTTSample/PMqttClient/MqttMultiTopicClient.cs create mode 100644 MQTTSample/PMqttClient/PMqttClient.csproj diff --git a/MQTTSample/MQTTSample.sln b/MQTTSample/MQTTSample.sln index 0a8a0e8..4c322cf 100644 --- a/MQTTSample/MQTTSample.sln +++ b/MQTTSample/MQTTSample.sln @@ -9,6 +9,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MqttPublisherApp", "MqttPub EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MqttSubscriberApp", "MqttSubscriberApp\MqttSubscriberApp.csproj", "{B8BC0A58-4F77-4B94-83B4-880C42960EF9}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PMqttClient", "PMqttClient\PMqttClient.csproj", "{6730F21D-830B-4B73-94D2-3077E65A0E4E}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MqttMultiTopicClientApp", "MqttMultiTopicClientApp\MqttMultiTopicClientApp.csproj", "{2C2E1F3E-6387-48C0-8C4A-BE0847CD6B32}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -27,6 +31,14 @@ Global {B8BC0A58-4F77-4B94-83B4-880C42960EF9}.Debug|Any CPU.Build.0 = Debug|Any CPU {B8BC0A58-4F77-4B94-83B4-880C42960EF9}.Release|Any CPU.ActiveCfg = Release|Any CPU {B8BC0A58-4F77-4B94-83B4-880C42960EF9}.Release|Any CPU.Build.0 = Release|Any CPU + {6730F21D-830B-4B73-94D2-3077E65A0E4E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {6730F21D-830B-4B73-94D2-3077E65A0E4E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {6730F21D-830B-4B73-94D2-3077E65A0E4E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {6730F21D-830B-4B73-94D2-3077E65A0E4E}.Release|Any CPU.Build.0 = Release|Any CPU + {2C2E1F3E-6387-48C0-8C4A-BE0847CD6B32}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {2C2E1F3E-6387-48C0-8C4A-BE0847CD6B32}.Debug|Any CPU.Build.0 = Debug|Any CPU + {2C2E1F3E-6387-48C0-8C4A-BE0847CD6B32}.Release|Any CPU.ActiveCfg = Release|Any CPU + {2C2E1F3E-6387-48C0-8C4A-BE0847CD6B32}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/MQTTSample/MqttMultiTopicClientApp/MqttMultiTopicClientApp.csproj b/MQTTSample/MqttMultiTopicClientApp/MqttMultiTopicClientApp.csproj new file mode 100644 index 0000000..b6b7d65 --- /dev/null +++ b/MQTTSample/MqttMultiTopicClientApp/MqttMultiTopicClientApp.csproj @@ -0,0 +1,14 @@ + + + + Exe + net8.0 + enable + enable + + + + + + + diff --git a/MQTTSample/MqttMultiTopicClientApp/Program.cs b/MQTTSample/MqttMultiTopicClientApp/Program.cs new file mode 100644 index 0000000..f8e81cf --- /dev/null +++ b/MQTTSample/MqttMultiTopicClientApp/Program.cs @@ -0,0 +1,82 @@ +using PMqttClient; + +namespace MqttMultiTopicClientApp +{ + internal class Program + { + private static string SERVER_IP = "localhost"; + private static readonly int PORT = 1883; + + private static MqttMultiTopicClient _mqttClient; + + static async Task Main(string[] args) + { + _mqttClient = new MqttMultiTopicClient(); + _mqttClient.LogHandler += MqttClient_LogHandler; + _mqttClient.MessageHandler += MqttClient_MessageHandler; + + await _mqttClient.AddTopicAsync("home/livingroom/temperature", MqttQoSLevel.AtLeastOnce); + + await _mqttClient.ConnectAsync(SERVER_IP, PORT, $"cid_{Random.Shared.Next(1, 1000)}"); + + await _mqttClient.AddTopicAsync("home/bedroom/humidity", MqttQoSLevel.AtMostOnce); + await _mqttClient.AddTopicAsync("home/kitchen/temperature", MqttQoSLevel.AtMostOnce); + + while (true) + { + Console.WriteLine(); + Console.WriteLine("----------------------"); + Console.WriteLine("Add topic(1)"); + Console.WriteLine("Publish message(2)"); + Console.WriteLine("Else: Exit"); + Console.WriteLine("----------------------"); + Console.Write(">"); + string command = Console.ReadLine(); + await HandleCommand(command); + } + } + + private static void MqttClient_MessageHandler(object? sender, string e) + { + Console.WriteLine($"[Message] {e}"); + } + + private static void MqttClient_LogHandler(object? sender, string e) + { + Console.WriteLine($"[Log] {e}"); + } + + static async Task HandleCommand(string command) + { + if (command.Contains("1", StringComparison.InvariantCultureIgnoreCase)) + { + Console.WriteLine("Topic?"); + Console.Write(">"); + string topic = Console.ReadLine(); + Console.WriteLine("QoS(0-2)"); + Console.Write(">"); + string qos = Console.ReadLine(); + int.TryParse(qos, out int convertedQos); + await _mqttClient.AddTopicAsync(topic, (MqttQoSLevel)convertedQos); + } + else if (command.Contains("2", StringComparison.InvariantCultureIgnoreCase)) + { + Console.WriteLine("Topic?"); + Console.Write(">"); + string topic = Console.ReadLine(); + Console.WriteLine("Message?"); + Console.Write(">"); + string message = Console.ReadLine(); + Console.WriteLine("QoS(0-2)"); + Console.Write(">"); + string qos = Console.ReadLine(); + int.TryParse(qos, out int convertedQos); + await _mqttClient.PubishAsync(topic, message, (MqttQoSLevel)convertedQos); + } + else + { + await _mqttClient.DisconnectAsync(); + } + } + } +} diff --git a/MQTTSample/PMqttClient/MqttMultiTopicClient.cs b/MQTTSample/PMqttClient/MqttMultiTopicClient.cs new file mode 100644 index 0000000..da99d75 --- /dev/null +++ b/MQTTSample/PMqttClient/MqttMultiTopicClient.cs @@ -0,0 +1,138 @@ +using MQTTnet; +using MQTTnet.Client; +using MQTTnet.Protocol; +using System.Text; + +namespace PMqttClient +{ + public enum MqttQoSLevel + { + AtMostOnce = 0x00, + AtLeastOnce = 0x01, + ExactlyOnce = 0x02 + } + + public class MqttMultiTopicClient + { + private IMqttClient _mqttClient; + private Dictionary _topics; + + public event EventHandler LogHandler; + public event EventHandler MessageHandler; + + public MqttMultiTopicClient() + { + var mqttFactory = new MqttFactory(); + _mqttClient = mqttFactory.CreateMqttClient(); + _topics = new Dictionary(); + + _mqttClient.ConnectedAsync += MqttClient_ConnectedAsync; + _mqttClient.DisconnectedAsync += MqttClient_DisconnectedAsync; + _mqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync; + } + + public async Task ConnectAsync(string broker, int port, string clientId) + { + var options = new MqttClientOptionsBuilder() + .WithClientId(clientId) + .WithTcpServer(broker, port) + .Build(); + + WriteLog($"Try to connect to {broker}:{port} with client id '{clientId}'..."); + await _mqttClient.ConnectAsync(options); + } + + public async Task AddTopicAsync(string topic, MqttQoSLevel qos) + { + if (_topics.ContainsKey(topic)) + { + WriteLog($"topic '{topic}' is added already."); + return; + } + + _topics.Add(topic, qos); + if (_mqttClient.IsConnected) + await SubscribeAsync(topic, qos); + + WriteLog($"topic '{topic}' is added."); + } + + public async Task RemoveTopicAsync(string topic) + { + if (!_topics.ContainsKey(topic)) + return; + + _topics.Remove(topic); + if (_mqttClient.IsConnected) + await _mqttClient.UnsubscribeAsync(topic); + + WriteLog($"Remove topic '{topic}'."); + } + + public async Task PubishAsync(string topic, string message, MqttQoSLevel qos) + { + var mqttMessage = new MqttApplicationMessageBuilder() + .WithTopic(topic) + .WithPayload(message) + .WithQualityOfServiceLevel(ConvertQoSLevel(qos)) + .Build(); + + await _mqttClient.PublishAsync(mqttMessage); + + WriteLog($"Publish message '{message}' to topic '{topic}' with QoS {qos.ToString()}."); + } + + public async Task DisconnectAsync() + { + WriteLog($"Try to disconnect..."); + await _mqttClient.DisconnectAsync(); + } + + private async Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg) + { + if (MessageHandler is not null) + { + var message = Encoding.UTF8.GetString(arg.ApplicationMessage.PayloadSegment); + MessageHandler.Invoke(this, message); + } + } + + private async Task MqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg) + { + WriteLog("Disconnected to MQTT Broker."); + } + + private async Task MqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg) + { + WriteLog("Connected to MQTT Broker."); + + foreach (var topic in _topics) + { + await SubscribeAsync(topic.Key, topic.Value); + + WriteLog($"Subscribed to topic: {topic.Key} with QoS {topic.Value.ToString()}."); + } + } + + private void WriteLog(string message) + { + if (LogHandler is not null) + LogHandler.Invoke(this, message); + } + + private async Task SubscribeAsync(string topic, MqttQoSLevel qos) + { + await _mqttClient.SubscribeAsync(new MqttTopicFilterBuilder() + .WithTopic(topic) + .WithQualityOfServiceLevel(ConvertQoSLevel(qos)) + .Build()); + + WriteLog($"Subscribed to topic: {topic} with QoS {qos.ToString()}."); + } + + private MqttQualityOfServiceLevel ConvertQoSLevel(MqttQoSLevel qos) + { + return (MqttQualityOfServiceLevel)((int)qos); + } + } +} diff --git a/MQTTSample/PMqttClient/PMqttClient.csproj b/MQTTSample/PMqttClient/PMqttClient.csproj new file mode 100644 index 0000000..0b71180 --- /dev/null +++ b/MQTTSample/PMqttClient/PMqttClient.csproj @@ -0,0 +1,13 @@ + + + + net8.0 + enable + enable + + + + + + +