mulit topic mqtt

main
Peace 10 months ago
parent 21bc527a81
commit 7ec6f7dccc
  1. 12
      MQTTSample/MQTTSample.sln
  2. 14
      MQTTSample/MqttMultiTopicClientApp/MqttMultiTopicClientApp.csproj
  3. 82
      MQTTSample/MqttMultiTopicClientApp/Program.cs
  4. 138
      MQTTSample/PMqttClient/MqttMultiTopicClient.cs
  5. 13
      MQTTSample/PMqttClient/PMqttClient.csproj

@ -9,6 +9,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MqttPublisherApp", "MqttPub
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MqttSubscriberApp", "MqttSubscriberApp\MqttSubscriberApp.csproj", "{B8BC0A58-4F77-4B94-83B4-880C42960EF9}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PMqttClient", "PMqttClient\PMqttClient.csproj", "{6730F21D-830B-4B73-94D2-3077E65A0E4E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MqttMultiTopicClientApp", "MqttMultiTopicClientApp\MqttMultiTopicClientApp.csproj", "{2C2E1F3E-6387-48C0-8C4A-BE0847CD6B32}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -27,6 +31,14 @@ Global
{B8BC0A58-4F77-4B94-83B4-880C42960EF9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B8BC0A58-4F77-4B94-83B4-880C42960EF9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B8BC0A58-4F77-4B94-83B4-880C42960EF9}.Release|Any CPU.Build.0 = Release|Any CPU
{6730F21D-830B-4B73-94D2-3077E65A0E4E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6730F21D-830B-4B73-94D2-3077E65A0E4E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6730F21D-830B-4B73-94D2-3077E65A0E4E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6730F21D-830B-4B73-94D2-3077E65A0E4E}.Release|Any CPU.Build.0 = Release|Any CPU
{2C2E1F3E-6387-48C0-8C4A-BE0847CD6B32}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{2C2E1F3E-6387-48C0-8C4A-BE0847CD6B32}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2C2E1F3E-6387-48C0-8C4A-BE0847CD6B32}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2C2E1F3E-6387-48C0-8C4A-BE0847CD6B32}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE

@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\PMqttClient\PMqttClient.csproj" />
</ItemGroup>
</Project>

@ -0,0 +1,82 @@
using PMqttClient;
namespace MqttMultiTopicClientApp
{
internal class Program
{
private static string SERVER_IP = "localhost";
private static readonly int PORT = 1883;
private static MqttMultiTopicClient _mqttClient;
static async Task Main(string[] args)
{
_mqttClient = new MqttMultiTopicClient();
_mqttClient.LogHandler += MqttClient_LogHandler;
_mqttClient.MessageHandler += MqttClient_MessageHandler;
await _mqttClient.AddTopicAsync("home/livingroom/temperature", MqttQoSLevel.AtLeastOnce);
await _mqttClient.ConnectAsync(SERVER_IP, PORT, $"cid_{Random.Shared.Next(1, 1000)}");
await _mqttClient.AddTopicAsync("home/bedroom/humidity", MqttQoSLevel.AtMostOnce);
await _mqttClient.AddTopicAsync("home/kitchen/temperature", MqttQoSLevel.AtMostOnce);
while (true)
{
Console.WriteLine();
Console.WriteLine("----------------------");
Console.WriteLine("Add topic(1)");
Console.WriteLine("Publish message(2)");
Console.WriteLine("Else: Exit");
Console.WriteLine("----------------------");
Console.Write(">");
string command = Console.ReadLine();
await HandleCommand(command);
}
}
private static void MqttClient_MessageHandler(object? sender, string e)
{
Console.WriteLine($"[Message] {e}");
}
private static void MqttClient_LogHandler(object? sender, string e)
{
Console.WriteLine($"[Log] {e}");
}
static async Task HandleCommand(string command)
{
if (command.Contains("1", StringComparison.InvariantCultureIgnoreCase))
{
Console.WriteLine("Topic?");
Console.Write(">");
string topic = Console.ReadLine();
Console.WriteLine("QoS(0-2)");
Console.Write(">");
string qos = Console.ReadLine();
int.TryParse(qos, out int convertedQos);
await _mqttClient.AddTopicAsync(topic, (MqttQoSLevel)convertedQos);
}
else if (command.Contains("2", StringComparison.InvariantCultureIgnoreCase))
{
Console.WriteLine("Topic?");
Console.Write(">");
string topic = Console.ReadLine();
Console.WriteLine("Message?");
Console.Write(">");
string message = Console.ReadLine();
Console.WriteLine("QoS(0-2)");
Console.Write(">");
string qos = Console.ReadLine();
int.TryParse(qos, out int convertedQos);
await _mqttClient.PubishAsync(topic, message, (MqttQoSLevel)convertedQos);
}
else
{
await _mqttClient.DisconnectAsync();
}
}
}
}

@ -0,0 +1,138 @@
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System.Text;
namespace PMqttClient
{
public enum MqttQoSLevel
{
AtMostOnce = 0x00,
AtLeastOnce = 0x01,
ExactlyOnce = 0x02
}
public class MqttMultiTopicClient
{
private IMqttClient _mqttClient;
private Dictionary<string, MqttQoSLevel> _topics;
public event EventHandler<string> LogHandler;
public event EventHandler<string> MessageHandler;
public MqttMultiTopicClient()
{
var mqttFactory = new MqttFactory();
_mqttClient = mqttFactory.CreateMqttClient();
_topics = new Dictionary<string, MqttQoSLevel>();
_mqttClient.ConnectedAsync += MqttClient_ConnectedAsync;
_mqttClient.DisconnectedAsync += MqttClient_DisconnectedAsync;
_mqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync;
}
public async Task ConnectAsync(string broker, int port, string clientId)
{
var options = new MqttClientOptionsBuilder()
.WithClientId(clientId)
.WithTcpServer(broker, port)
.Build();
WriteLog($"Try to connect to {broker}:{port} with client id '{clientId}'...");
await _mqttClient.ConnectAsync(options);
}
public async Task AddTopicAsync(string topic, MqttQoSLevel qos)
{
if (_topics.ContainsKey(topic))
{
WriteLog($"topic '{topic}' is added already.");
return;
}
_topics.Add(topic, qos);
if (_mqttClient.IsConnected)
await SubscribeAsync(topic, qos);
WriteLog($"topic '{topic}' is added.");
}
public async Task RemoveTopicAsync(string topic)
{
if (!_topics.ContainsKey(topic))
return;
_topics.Remove(topic);
if (_mqttClient.IsConnected)
await _mqttClient.UnsubscribeAsync(topic);
WriteLog($"Remove topic '{topic}'.");
}
public async Task PubishAsync(string topic, string message, MqttQoSLevel qos)
{
var mqttMessage = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(message)
.WithQualityOfServiceLevel(ConvertQoSLevel(qos))
.Build();
await _mqttClient.PublishAsync(mqttMessage);
WriteLog($"Publish message '{message}' to topic '{topic}' with QoS {qos.ToString()}.");
}
public async Task DisconnectAsync()
{
WriteLog($"Try to disconnect...");
await _mqttClient.DisconnectAsync();
}
private async Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
{
if (MessageHandler is not null)
{
var message = Encoding.UTF8.GetString(arg.ApplicationMessage.PayloadSegment);
MessageHandler.Invoke(this, message);
}
}
private async Task MqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
WriteLog("Disconnected to MQTT Broker.");
}
private async Task MqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
{
WriteLog("Connected to MQTT Broker.");
foreach (var topic in _topics)
{
await SubscribeAsync(topic.Key, topic.Value);
WriteLog($"Subscribed to topic: {topic.Key} with QoS {topic.Value.ToString()}.");
}
}
private void WriteLog(string message)
{
if (LogHandler is not null)
LogHandler.Invoke(this, message);
}
private async Task SubscribeAsync(string topic, MqttQoSLevel qos)
{
await _mqttClient.SubscribeAsync(new MqttTopicFilterBuilder()
.WithTopic(topic)
.WithQualityOfServiceLevel(ConvertQoSLevel(qos))
.Build());
WriteLog($"Subscribed to topic: {topic} with QoS {qos.ToString()}.");
}
private MqttQualityOfServiceLevel ConvertQoSLevel(MqttQoSLevel qos)
{
return (MqttQualityOfServiceLevel)((int)qos);
}
}
}

@ -0,0 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="MQTTnet" Version="4.3.6.1152" />
</ItemGroup>
</Project>
Loading…
Cancel
Save