You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
163 lines
7.2 KiB
163 lines
7.2 KiB
|
4 weeks ago
|
using System;
|
||
|
|
using System.Collections.Concurrent;
|
||
|
|
using System.Collections.Generic;
|
||
|
|
using System.Net;
|
||
|
|
using System.Net.Sockets;
|
||
|
|
using System.Text;
|
||
|
|
using System.Threading;
|
||
|
|
using System.Threading.Tasks;
|
||
|
|
using MQTTnet;
|
||
|
|
using MQTTnet.Client;
|
||
|
|
|
||
|
|
//Important:
|
||
|
|
//This EGModule implement by the nuget package MQTTnet:
|
||
|
|
//the project url is: https://github.com/dotnet/MQTTnet
|
||
|
|
//license is : https://github.com/dotnet/MQTTnet/blob/master/LICENSE by MIT license
|
||
|
|
namespace EGFramework{
|
||
|
|
public class EGMqtt : IEGFramework, IModule, IProtocolSend, IProtocolReceived
|
||
|
|
{
|
||
|
|
public MqttFactory MqttFactory = new MqttFactory();
|
||
|
|
public Dictionary<string,IMqttClient> MqttDevices { set; get; } = new Dictionary<string, IMqttClient>();
|
||
|
|
|
||
|
|
public Encoding StringEncoding { set; get; } = Encoding.UTF8;
|
||
|
|
|
||
|
|
public ConcurrentQueue<ResponseMsg> ResponseMsgs { set; get; } = new ConcurrentQueue<ResponseMsg>();
|
||
|
|
|
||
|
|
public EasyEvent<string> OnMqttConnect { set; get; } = new EasyEvent<string>();
|
||
|
|
|
||
|
|
|
||
|
|
public void Init()
|
||
|
|
{
|
||
|
|
this.EGRegisterSendAction(request=>{
|
||
|
|
if(request.protocolType == ProtocolType.MQTTClient){
|
||
|
|
if(request.req.ToProtocolData() != null && request.req.ToProtocolData() != ""){
|
||
|
|
this.SendStringData(request.sender.GetStrFrontSymbol('|'),request.sender.GetStrBehindSymbol('|'),request.req.ToProtocolData());
|
||
|
|
}
|
||
|
|
if(request.req.ToProtocolByteData() != null && request.req.ToProtocolByteData().Length > 0){
|
||
|
|
this.SendByteData(request.sender.GetStrFrontSymbol('|'),request.sender.GetStrBehindSymbol('|'),request.req.ToProtocolByteData());
|
||
|
|
}
|
||
|
|
}
|
||
|
|
});
|
||
|
|
}
|
||
|
|
|
||
|
|
public async void ConnectMQTTServer(string serverURL){
|
||
|
|
if(!MqttDevices.ContainsKey(serverURL)){
|
||
|
|
IMqttClient mqttClient = MqttFactory.CreateMqttClient();
|
||
|
|
var mqttClientOptions = new MqttClientOptionsBuilder().WithTcpServer(serverURL).Build();
|
||
|
|
mqttClient.ApplicationMessageReceivedAsync += e =>
|
||
|
|
{
|
||
|
|
byte[] receivedBytes = e.ApplicationMessage.PayloadSegment.ToArray();
|
||
|
|
ResponseMsgs.Enqueue(new ResponseMsg(StringEncoding.GetString(receivedBytes),receivedBytes,serverURL + "|" + e.ApplicationMessage.Topic,ProtocolType.MQTTClient));
|
||
|
|
//GD.Print(e.ApplicationMessage.Topic+":"+e.ApplicationMessage.PayloadSegment.ToArray().ToStringByHex());
|
||
|
|
return Task.CompletedTask;
|
||
|
|
};
|
||
|
|
await mqttClient.ConnectAsync(mqttClientOptions,CancellationToken.None);
|
||
|
|
MqttDevices.Add(serverURL,mqttClient);
|
||
|
|
EG.Print("Success Connect!"+MqttDevices[serverURL].IsConnected);
|
||
|
|
OnMqttConnect.Invoke(serverURL);
|
||
|
|
}else{
|
||
|
|
if(!MqttDevices[serverURL].IsConnected){
|
||
|
|
var mqttClientOptions = new MqttClientOptionsBuilder().WithTcpServer(serverURL).Build();
|
||
|
|
await MqttDevices[serverURL].ConnectAsync(mqttClientOptions,CancellationToken.None);
|
||
|
|
EG.Print("Success Connect!"+MqttDevices[serverURL].IsConnected);
|
||
|
|
OnMqttConnect.Invoke(serverURL);
|
||
|
|
}else{
|
||
|
|
EG.Print("Server has been Connected"+MqttDevices[serverURL].IsConnected);
|
||
|
|
OnMqttConnect.Invoke(serverURL);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
public async void DisconnectMQTTServer(string serverURL){
|
||
|
|
if(MqttDevices.ContainsKey(serverURL) && MqttDevices[serverURL].IsConnected){
|
||
|
|
await MqttDevices[serverURL].DisconnectAsync(new MqttClientDisconnectOptionsBuilder().WithReason(MqttClientDisconnectOptionsReason.NormalDisconnection).Build());
|
||
|
|
}else{
|
||
|
|
EG.Print("Not connect");
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
public async void SubScribeTheme(string serverURL,string Theme){
|
||
|
|
MqttClientSubscribeOptions mqttSubscribeOptions = MqttFactory.CreateSubscribeOptionsBuilder()
|
||
|
|
.WithTopicFilter(
|
||
|
|
f =>
|
||
|
|
{
|
||
|
|
f.WithTopic(Theme);
|
||
|
|
})
|
||
|
|
.Build();
|
||
|
|
if(MqttDevices.ContainsKey(serverURL) && MqttDevices[serverURL].IsConnected){
|
||
|
|
await MqttDevices[serverURL].SubscribeAsync(mqttSubscribeOptions,CancellationToken.None);
|
||
|
|
EG.Print("Subscribe "+Theme+" success!");
|
||
|
|
}else{
|
||
|
|
EG.Print("Not connect");
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
public async void UnSubScribeTheme(string serverURL,string Theme){
|
||
|
|
MqttClientUnsubscribeOptions mqttUnSubscribeOptions = MqttFactory.CreateUnsubscribeOptionsBuilder()
|
||
|
|
.WithTopicFilter(Theme)
|
||
|
|
.Build();
|
||
|
|
if(MqttDevices.ContainsKey(serverURL) && MqttDevices[serverURL].IsConnected){
|
||
|
|
await MqttDevices[serverURL].UnsubscribeAsync(mqttUnSubscribeOptions,CancellationToken.None);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
public async void PublishTheme(string serverURL,string Theme,string Data){
|
||
|
|
var applicationMessage = new MqttApplicationMessageBuilder()
|
||
|
|
.WithTopic(Theme)
|
||
|
|
.WithPayload(Data)
|
||
|
|
.Build();
|
||
|
|
if(MqttDevices.ContainsKey(serverURL) && MqttDevices[serverURL].IsConnected){
|
||
|
|
await MqttDevices[serverURL].PublishAsync(applicationMessage, CancellationToken.None);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
public async void PublishTheme(string serverURL,string Theme,byte[] Data){
|
||
|
|
var applicationMessage = new MqttApplicationMessageBuilder()
|
||
|
|
.WithTopic(Theme)
|
||
|
|
.WithPayload(Data)
|
||
|
|
.Build();
|
||
|
|
if(MqttDevices.ContainsKey(serverURL) && MqttDevices[serverURL].IsConnected){
|
||
|
|
await MqttDevices[serverURL].PublishAsync(applicationMessage, CancellationToken.None);
|
||
|
|
EG.Print("publish success!");
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
public void SendByteData(string serverURL,string theme, byte[] data)
|
||
|
|
{
|
||
|
|
this.PublishTheme(serverURL,theme,data);
|
||
|
|
}
|
||
|
|
public void SendByteData(string destination, byte[] data)
|
||
|
|
{
|
||
|
|
this.SendByteData(destination.GetStrFrontSymbol('|'),destination.GetStrBehindSymbol('|'),data);
|
||
|
|
}
|
||
|
|
|
||
|
|
public void SendStringData(string serverURL,string theme, string data)
|
||
|
|
{
|
||
|
|
this.PublishTheme(serverURL,theme,data);
|
||
|
|
}
|
||
|
|
public void SendStringData(string destination, string data)
|
||
|
|
{
|
||
|
|
this.SendStringData(destination.GetStrFrontSymbol('|'),destination.GetStrBehindSymbol('|'),data);
|
||
|
|
}
|
||
|
|
|
||
|
|
public void SetEncoding(Encoding textEncoding)
|
||
|
|
{
|
||
|
|
StringEncoding = textEncoding;
|
||
|
|
}
|
||
|
|
public ConcurrentQueue<ResponseMsg> GetReceivedMsg()
|
||
|
|
{
|
||
|
|
return ResponseMsgs;
|
||
|
|
}
|
||
|
|
|
||
|
|
public IArchitecture GetArchitecture()
|
||
|
|
{
|
||
|
|
return EGArchitectureImplement.Interface;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
public static class CanGetEGMqttExtension{
|
||
|
|
public static EGMqtt EGMqtt(this IEGFramework self){
|
||
|
|
return self.GetModule<EGMqtt>();
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|