靶机服务端(适用于Linux系统控制靶机的情况)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

163 lines
7.2 KiB

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
//Important:
//This EGModule implement by the nuget package MQTTnet:
//the project url is: https://github.com/dotnet/MQTTnet
//license is : https://github.com/dotnet/MQTTnet/blob/master/LICENSE by MIT license
namespace EGFramework{
public class EGMqtt : IEGFramework, IModule, IProtocolSend, IProtocolReceived
{
public MqttFactory MqttFactory = new MqttFactory();
public Dictionary<string,IMqttClient> MqttDevices { set; get; } = new Dictionary<string, IMqttClient>();
public Encoding StringEncoding { set; get; } = Encoding.UTF8;
public ConcurrentQueue<ResponseMsg> ResponseMsgs { set; get; } = new ConcurrentQueue<ResponseMsg>();
public EasyEvent<string> OnMqttConnect { set; get; } = new EasyEvent<string>();
public void Init()
{
this.EGRegisterSendAction(request=>{
if(request.protocolType == ProtocolType.MQTTClient){
if(request.req.ToProtocolData() != null && request.req.ToProtocolData() != ""){
this.SendStringData(request.sender.GetStrFrontSymbol('|'),request.sender.GetStrBehindSymbol('|'),request.req.ToProtocolData());
}
if(request.req.ToProtocolByteData() != null && request.req.ToProtocolByteData().Length > 0){
this.SendByteData(request.sender.GetStrFrontSymbol('|'),request.sender.GetStrBehindSymbol('|'),request.req.ToProtocolByteData());
}
}
});
}
public async void ConnectMQTTServer(string serverURL){
if(!MqttDevices.ContainsKey(serverURL)){
IMqttClient mqttClient = MqttFactory.CreateMqttClient();
var mqttClientOptions = new MqttClientOptionsBuilder().WithTcpServer(serverURL).Build();
mqttClient.ApplicationMessageReceivedAsync += e =>
{
byte[] receivedBytes = e.ApplicationMessage.PayloadSegment.ToArray();
ResponseMsgs.Enqueue(new ResponseMsg(StringEncoding.GetString(receivedBytes),receivedBytes,serverURL + "|" + e.ApplicationMessage.Topic,ProtocolType.MQTTClient));
//GD.Print(e.ApplicationMessage.Topic+":"+e.ApplicationMessage.PayloadSegment.ToArray().ToStringByHex());
return Task.CompletedTask;
};
await mqttClient.ConnectAsync(mqttClientOptions,CancellationToken.None);
MqttDevices.Add(serverURL,mqttClient);
EG.Print("Success Connect!"+MqttDevices[serverURL].IsConnected);
OnMqttConnect.Invoke(serverURL);
}else{
if(!MqttDevices[serverURL].IsConnected){
var mqttClientOptions = new MqttClientOptionsBuilder().WithTcpServer(serverURL).Build();
await MqttDevices[serverURL].ConnectAsync(mqttClientOptions,CancellationToken.None);
EG.Print("Success Connect!"+MqttDevices[serverURL].IsConnected);
OnMqttConnect.Invoke(serverURL);
}else{
EG.Print("Server has been Connected"+MqttDevices[serverURL].IsConnected);
OnMqttConnect.Invoke(serverURL);
}
}
}
public async void DisconnectMQTTServer(string serverURL){
if(MqttDevices.ContainsKey(serverURL) && MqttDevices[serverURL].IsConnected){
await MqttDevices[serverURL].DisconnectAsync(new MqttClientDisconnectOptionsBuilder().WithReason(MqttClientDisconnectOptionsReason.NormalDisconnection).Build());
}else{
EG.Print("Not connect");
}
}
public async void SubScribeTheme(string serverURL,string Theme){
MqttClientSubscribeOptions mqttSubscribeOptions = MqttFactory.CreateSubscribeOptionsBuilder()
.WithTopicFilter(
f =>
{
f.WithTopic(Theme);
})
.Build();
if(MqttDevices.ContainsKey(serverURL) && MqttDevices[serverURL].IsConnected){
await MqttDevices[serverURL].SubscribeAsync(mqttSubscribeOptions,CancellationToken.None);
EG.Print("Subscribe "+Theme+" success!");
}else{
EG.Print("Not connect");
}
}
public async void UnSubScribeTheme(string serverURL,string Theme){
MqttClientUnsubscribeOptions mqttUnSubscribeOptions = MqttFactory.CreateUnsubscribeOptionsBuilder()
.WithTopicFilter(Theme)
.Build();
if(MqttDevices.ContainsKey(serverURL) && MqttDevices[serverURL].IsConnected){
await MqttDevices[serverURL].UnsubscribeAsync(mqttUnSubscribeOptions,CancellationToken.None);
}
}
public async void PublishTheme(string serverURL,string Theme,string Data){
var applicationMessage = new MqttApplicationMessageBuilder()
.WithTopic(Theme)
.WithPayload(Data)
.Build();
if(MqttDevices.ContainsKey(serverURL) && MqttDevices[serverURL].IsConnected){
await MqttDevices[serverURL].PublishAsync(applicationMessage, CancellationToken.None);
}
}
public async void PublishTheme(string serverURL,string Theme,byte[] Data){
var applicationMessage = new MqttApplicationMessageBuilder()
.WithTopic(Theme)
.WithPayload(Data)
.Build();
if(MqttDevices.ContainsKey(serverURL) && MqttDevices[serverURL].IsConnected){
await MqttDevices[serverURL].PublishAsync(applicationMessage, CancellationToken.None);
EG.Print("publish success!");
}
}
public void SendByteData(string serverURL,string theme, byte[] data)
{
this.PublishTheme(serverURL,theme,data);
}
public void SendByteData(string destination, byte[] data)
{
this.SendByteData(destination.GetStrFrontSymbol('|'),destination.GetStrBehindSymbol('|'),data);
}
public void SendStringData(string serverURL,string theme, string data)
{
this.PublishTheme(serverURL,theme,data);
}
public void SendStringData(string destination, string data)
{
this.SendStringData(destination.GetStrFrontSymbol('|'),destination.GetStrBehindSymbol('|'),data);
}
public void SetEncoding(Encoding textEncoding)
{
StringEncoding = textEncoding;
}
public ConcurrentQueue<ResponseMsg> GetReceivedMsg()
{
return ResponseMsgs;
}
public IArchitecture GetArchitecture()
{
return EGArchitectureImplement.Interface;
}
}
public static class CanGetEGMqttExtension{
public static EGMqtt EGMqtt(this IEGFramework self){
return self.GetModule<EGMqtt>();
}
}
}