Browse Source

change protocol tools to Concurrent Queue, fixed EGMessage Senddelay only enabled in single device

master
jkpete 4 months ago
parent
commit
40493fcb55
  1. 16
      Example/UsingTest/Script/EGSaveTest.cs
  2. 16
      addons/EGFramework/Module/EGMessage.cs
  3. 5
      addons/EGFramework/Module/ProtocolTools/EGBacnet.cs
  4. 5
      addons/EGFramework/Module/ProtocolTools/EGFileStream.cs
  5. 5
      addons/EGFramework/Module/ProtocolTools/EGMQTT.cs
  6. 27
      addons/EGFramework/Module/ProtocolTools/EGModbus.cs
  7. 5
      addons/EGFramework/Module/ProtocolTools/EGProtocolSchedule.cs
  8. 5
      addons/EGFramework/Module/ProtocolTools/EGSerialPort.cs
  9. 5
      addons/EGFramework/Module/ProtocolTools/EGSsh.cs
  10. 5
      addons/EGFramework/Module/ProtocolTools/EGTCPClient.cs
  11. 5
      addons/EGFramework/Module/ProtocolTools/EGTCPServer.cs
  12. 5
      addons/EGFramework/Module/ProtocolTools/EGUDP.cs
  13. 4
      addons/EGFramework/Module/ProtocolTools/ProtocolToolsInterface.cs
  14. 4
      project.godot

16
Example/UsingTest/Script/EGSaveTest.cs

@ -18,17 +18,29 @@ namespace EGFramework.Examples.Test{ @@ -18,17 +18,29 @@ namespace EGFramework.Examples.Test{
//TestCode();
this.EGEnabledProtocolTool<EGSsh>();
this.EGEnabledProtocolTool<EGTCPClient>();
this.EGOnMessage<EasyMessage>();
// this.EGRegisterMessageEvent<EasyMessage>((e,sender)=>{
// });
TestSsh();
//TestSsh();
TestTCPSend();
}
public async void TestSsh(){
await this.EGSsh().ConnectSsh("127.0.0.1","jkpete",new PrivateKeyFile("../../../.ssh/id_ed25519"));
// await this.EGSsh().ConnectSsh("byserver","bytech","bytech");
this.EGSendMessage(new EasyMessage(){sendString = "ls -la"},"127.0.0.1",ProtocolType.SSHClient);
}
public async void TestTCPSend(){
await this.EGTCPClient().ConnectTCP("127.0.0.1",5555);
await this.EGTCPClient().ConnectTCP("127.0.0.1",6666);
this.EGSendMessage(new EasyMessage(){sendString = "ls -la"},"127.0.0.1:5555",ProtocolType.TCPClient);
this.EGSendMessage(new EasyMessage(){sendString = "ls -la"},"127.0.0.1:6666",ProtocolType.TCPClient);
this.EGSendMessage(new EasyMessage(){sendString = "ls -la"},"127.0.0.1:6666",ProtocolType.TCPClient);
this.EGSendMessage(new EasyMessage(){sendString = "ls -la"},"127.0.0.1:6666",ProtocolType.TCPClient);
this.EGSendMessage(new EasyMessage(){sendString = "ls -la"},"127.0.0.1:6666",ProtocolType.TCPClient);
this.EGSendMessage(new EasyMessage(){sendString = "ls -la"},"127.0.0.1:5555",ProtocolType.TCPClient);
}
public async void TestThread(){

16
addons/EGFramework/Module/EGMessage.cs

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Timers;
namespace EGFramework
@ -19,7 +19,7 @@ namespace EGFramework @@ -19,7 +19,7 @@ namespace EGFramework
/// </summary>
/// <value></value>
public int SendDelay { set; get; } = 100;
public Queue<RequestMsgEvent> RequestCache { set; get; } = new Queue<RequestMsgEvent>();
public ConcurrentDictionary<string,ConcurrentQueue<RequestMsgEvent>> RequestCache { set; get; } = new ConcurrentDictionary<string,ConcurrentQueue<RequestMsgEvent>>();
private System.Timers.Timer RequestTimer { set; get; }
public override void Init()
@ -58,7 +58,10 @@ namespace EGFramework @@ -58,7 +58,10 @@ namespace EGFramework
public void SendRequest<TRequest>(TRequest request,string sender,ProtocolType protocolType) where TRequest:IRequest
{
if(SendDelay>0){
RequestCache.Enqueue(new RequestMsgEvent(request,sender,protocolType));
if(!RequestCache.ContainsKey(sender)){
RequestCache[sender] = new ConcurrentQueue<RequestMsgEvent>();
}
RequestCache[sender].Enqueue(new RequestMsgEvent(request,sender,protocolType));
}else{
OnRequest.Invoke(new RequestMsgEvent(request,sender,protocolType));
}
@ -67,8 +70,11 @@ namespace EGFramework @@ -67,8 +70,11 @@ namespace EGFramework
}
private void ExecuteRequest(object source, ElapsedEventArgs e){
if(RequestCache.Count>0){
OnRequest.Invoke(RequestCache.Dequeue());
foreach(ConcurrentQueue<RequestMsgEvent> singleCache in RequestCache.Values){
if(singleCache.Count>0){
singleCache.TryDequeue(out RequestMsgEvent msg);
OnRequest.Invoke(msg);
}
}
}

5
addons/EGFramework/Module/ProtocolTools/EGBacnet.cs

@ -1,4 +1,5 @@ @@ -1,4 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO.BACnet;
using System.Linq;
@ -14,7 +15,7 @@ namespace EGFramework{ @@ -14,7 +15,7 @@ namespace EGFramework{
public Dictionary<uint,BacnetAddress> DevicesList = new Dictionary<uint,BacnetAddress>();
public Encoding StringEncoding { set; get; } = Encoding.ASCII;
public Queue<ResponseMsg> ResponseMsgs { set; get; } = new Queue<ResponseMsg>();
public ConcurrentQueue<ResponseMsg> ResponseMsgs { set; get; } = new ConcurrentQueue<ResponseMsg>();
public void Init()
{
@ -283,7 +284,7 @@ namespace EGFramework{ @@ -283,7 +284,7 @@ namespace EGFramework{
this.StringEncoding = textEncoding;
}
public Queue<ResponseMsg> GetReceivedMsg()
public ConcurrentQueue<ResponseMsg> GetReceivedMsg()
{
return this.ResponseMsgs;
}

5
addons/EGFramework/Module/ProtocolTools/EGFileStream.cs

@ -1,13 +1,14 @@ @@ -1,13 +1,14 @@
using System.Collections.Generic;
using System.Text;
using System.IO;
using System.Collections.Concurrent;
namespace EGFramework{
public class EGFileStream : IEGFramework, IModule,IProtocolSend,IProtocolReceived
{
public Encoding StringEncoding { set; get; } = Encoding.UTF8;
public Queue<ResponseMsg> ResponseMsgs { set; get; } = new Queue<ResponseMsg>();
public ConcurrentQueue<ResponseMsg> ResponseMsgs { set; get; } = new ConcurrentQueue<ResponseMsg>();
public void Init()
{
@ -83,7 +84,7 @@ namespace EGFramework{ @@ -83,7 +84,7 @@ namespace EGFramework{
this.StringEncoding = textEncoding;
}
public Queue<ResponseMsg> GetReceivedMsg()
public ConcurrentQueue<ResponseMsg> GetReceivedMsg()
{
return this.ResponseMsgs;
}

5
addons/EGFramework/Module/ProtocolTools/EGMQTT.cs

@ -1,4 +1,5 @@ @@ -1,4 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
@ -20,7 +21,7 @@ namespace EGFramework{ @@ -20,7 +21,7 @@ namespace EGFramework{
public Encoding StringEncoding { set; get; } = Encoding.UTF8;
public Queue<ResponseMsg> ResponseMsgs { set; get; } = new Queue<ResponseMsg>();
public ConcurrentQueue<ResponseMsg> ResponseMsgs { set; get; } = new ConcurrentQueue<ResponseMsg>();
public EasyEvent<string> OnMqttConnect { set; get; } = new EasyEvent<string>();
@ -143,7 +144,7 @@ namespace EGFramework{ @@ -143,7 +144,7 @@ namespace EGFramework{
{
StringEncoding = textEncoding;
}
public Queue<ResponseMsg> GetReceivedMsg()
public ConcurrentQueue<ResponseMsg> GetReceivedMsg()
{
return ResponseMsgs;
}

27
addons/EGFramework/Module/ProtocolTools/EGModbus.cs

@ -1,4 +1,5 @@ @@ -1,4 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
@ -8,16 +9,16 @@ namespace EGFramework{ @@ -8,16 +9,16 @@ namespace EGFramework{
/// </summary>
public class EGModbus : IEGFramework, IModule
{
public Queue<ModbusRTU_Response> RTUCache = new Queue<ModbusRTU_Response>();
public Queue<ModbusTCP_Response> TCPCache = new Queue<ModbusTCP_Response>();
public ConcurrentQueue<ModbusRTU_Response?> RTUCache = new ConcurrentQueue<ModbusRTU_Response?>();
public ConcurrentQueue<ModbusTCP_Response?> TCPCache = new ConcurrentQueue<ModbusTCP_Response?>();
public int Delay = 2000;
public Queue<int> WaitForSendRTU = new Queue<int>();
public ConcurrentQueue<int> WaitForSendRTU = new ConcurrentQueue<int>();
public int NextSendRTU = 0;
public int SendPointerRTU = 1;
public Queue<int> WaitForSendTCP = new Queue<int>();
public ConcurrentQueue<int> WaitForSendTCP = new ConcurrentQueue<int>();
public int NextSendTCP = 0;
public int SendPointerTCP = 1;
@ -90,7 +91,7 @@ namespace EGFramework{ @@ -90,7 +91,7 @@ namespace EGFramework{
timeout+=10;
}
if(RTUCache.Count>0){
res = RTUCache.Dequeue();
RTUCache.TryDequeue(out res);
}else{
//Print Error Timeout
OnReadTimeOut.Invoke();
@ -99,7 +100,7 @@ namespace EGFramework{ @@ -99,7 +100,7 @@ namespace EGFramework{
this.EGSerialPort().ClearReceivedCache(serialPort);
IsRequestRTU = false;
if(this.WaitForSendRTU.Count>0){
NextSendRTU = this.WaitForSendRTU.Dequeue();
this.WaitForSendRTU.TryDequeue(out NextSendRTU);
}
return res;
}
@ -143,7 +144,7 @@ namespace EGFramework{ @@ -143,7 +144,7 @@ namespace EGFramework{
timeout+=10;
}
if(RTUCache.Count>0){
res = RTUCache.Dequeue();
RTUCache.TryDequeue(out res);
}else{
//Print Error Timeout
OnReadTimeOut.Invoke();
@ -152,7 +153,7 @@ namespace EGFramework{ @@ -152,7 +153,7 @@ namespace EGFramework{
this.EGSerialPort().ClearReceivedCache(serialPort);
IsRequestRTU = false;
if(this.WaitForSendRTU.Count>0){
NextSendRTU = this.WaitForSendRTU.Dequeue();
this.WaitForSendRTU.TryDequeue(out NextSendRTU);
}
return res;
}
@ -195,7 +196,7 @@ namespace EGFramework{ @@ -195,7 +196,7 @@ namespace EGFramework{
timeout += 10;
}
if(TCPCache.Count>0){
res = TCPCache.Dequeue();
TCPCache.TryDequeue(out res);
}else{
//Print Error Timeout
OnReadTimeOut.Invoke();
@ -203,7 +204,7 @@ namespace EGFramework{ @@ -203,7 +204,7 @@ namespace EGFramework{
});
IsRequestTCP = false;
if(this.WaitForSendTCP.Count>0){
NextSendTCP = this.WaitForSendTCP.Dequeue();
this.WaitForSendTCP.TryDequeue(out NextSendTCP);
}
return res;
}
@ -250,7 +251,7 @@ namespace EGFramework{ @@ -250,7 +251,7 @@ namespace EGFramework{
timeout+=10;
}
if(TCPCache.Count>0){
res = TCPCache.Dequeue();
TCPCache.TryDequeue(out res);
}else{
//Print Error Timeout
OnReadTimeOut.Invoke();
@ -258,10 +259,10 @@ namespace EGFramework{ @@ -258,10 +259,10 @@ namespace EGFramework{
});
IsRequestTCP = false;
if(this.WaitForSendTCP.Count>0){
NextSendTCP = this.WaitForSendTCP.Dequeue();
this.WaitForSendTCP.TryDequeue(out NextSendTCP);
}
if(this.WaitForSendTCP.Count>0){
NextSendTCP = this.WaitForSendTCP.Dequeue();
this.WaitForSendTCP.TryDequeue(out NextSendTCP);
}
return res;
}

5
addons/EGFramework/Module/ProtocolTools/EGProtocolSchedule.cs

@ -9,7 +9,10 @@ namespace EGFramework{ @@ -9,7 +9,10 @@ namespace EGFramework{
public void CheckedProcess(){
foreach(IProtocolReceived tool in ProtocolTools.Values){
if(tool.GetReceivedMsg().Count>0){
this.GetModule<EGMessage>().OnDataReceived.Invoke(tool.GetReceivedMsg().Dequeue());
bool isDequeue = tool.GetReceivedMsg().TryDequeue(out ResponseMsg msg);
if(isDequeue){
this.GetModule<EGMessage>().OnDataReceived.Invoke(msg);
}
}
}
}

5
addons/EGFramework/Module/ProtocolTools/EGSerialPort.cs

@ -3,6 +3,7 @@ using System; @@ -3,6 +3,7 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Collections.Concurrent;
namespace EGFramework{
public class EGSerialPort : IModule,IEGFramework,IProtocolSend,IProtocolReceived
@ -21,7 +22,7 @@ namespace EGFramework{ @@ -21,7 +22,7 @@ namespace EGFramework{
public Encoding StringEncoding { set; get; } = Encoding.UTF8;
public Queue<ResponseMsg> ResponseMsgs { set; get; } = new Queue<ResponseMsg>();
public ConcurrentQueue<ResponseMsg> ResponseMsgs { set; get; } = new ConcurrentQueue<ResponseMsg>();
public Dictionary<string,byte[]> ReceivedCache { set; get; } = new Dictionary<string,byte[]>();
@ -39,7 +40,7 @@ namespace EGFramework{ @@ -39,7 +40,7 @@ namespace EGFramework{
});
}
public Queue<ResponseMsg> GetReceivedMsg()
public ConcurrentQueue<ResponseMsg> GetReceivedMsg()
{
return this.ResponseMsgs;
}

5
addons/EGFramework/Module/ProtocolTools/EGSsh.cs

@ -1,5 +1,6 @@ @@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
@ -14,7 +15,7 @@ namespace EGFramework{ @@ -14,7 +15,7 @@ namespace EGFramework{
public Encoding StringEncoding { set; get; } = Encoding.UTF8;
public Queue<ResponseMsg> ResponseMsgs { set; get; } = new Queue<ResponseMsg>();
public ConcurrentQueue<ResponseMsg> ResponseMsgs { set; get; } = new ConcurrentQueue<ResponseMsg>();
public int TimeOutDelay = 5000;
public void Init()
@ -124,7 +125,7 @@ namespace EGFramework{ @@ -124,7 +125,7 @@ namespace EGFramework{
}
}
public Queue<ResponseMsg> GetReceivedMsg()
public ConcurrentQueue<ResponseMsg> GetReceivedMsg()
{
return ResponseMsgs;
}

5
addons/EGFramework/Module/ProtocolTools/EGTCPClient.cs

@ -1,5 +1,6 @@ @@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Text;
using System.Net.Sockets;
using System.Threading.Tasks;
@ -14,7 +15,7 @@ namespace EGFramework{ @@ -14,7 +15,7 @@ namespace EGFramework{
public Encoding StringEncoding { set; get; } = Encoding.UTF8;
public Queue<ResponseMsg> ResponseMsgs { set; get; } = new Queue<ResponseMsg>();
public ConcurrentQueue<ResponseMsg> ResponseMsgs { set; get; } = new ConcurrentQueue<ResponseMsg>();
public void Init()
{
@ -107,7 +108,7 @@ namespace EGFramework{ @@ -107,7 +108,7 @@ namespace EGFramework{
DisconnectTCP(host,port);
}
public Queue<ResponseMsg> GetReceivedMsg()
public ConcurrentQueue<ResponseMsg> GetReceivedMsg()
{
return ResponseMsgs;
}

5
addons/EGFramework/Module/ProtocolTools/EGTCPServer.cs

@ -1,5 +1,6 @@ @@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Text;
@ -18,7 +19,7 @@ namespace EGFramework{ @@ -18,7 +19,7 @@ namespace EGFramework{
public EasyEvent<string> OnClientConnect { set; get; } = new EasyEvent<string>();
public EasyEvent<string> OnClientDisconnect { set; get; } = new EasyEvent<string>();
public Queue<ResponseMsg> ResponseMsgs { set; get; } = new Queue<ResponseMsg>();
public ConcurrentQueue<ResponseMsg> ResponseMsgs { set; get; } = new ConcurrentQueue<ResponseMsg>();
public string ErrorLogs { set; get; }
public void Init()
@ -35,7 +36,7 @@ namespace EGFramework{ @@ -35,7 +36,7 @@ namespace EGFramework{
});
}
public Queue<ResponseMsg> GetReceivedMsg()
public ConcurrentQueue<ResponseMsg> GetReceivedMsg()
{
return ResponseMsgs;
}

5
addons/EGFramework/Module/ProtocolTools/EGUDP.cs

@ -1,4 +1,5 @@ @@ -1,4 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
@ -11,7 +12,7 @@ namespace EGFramework{ @@ -11,7 +12,7 @@ namespace EGFramework{
public Encoding StringEncoding { set; get; } = Encoding.UTF8;
public Queue<ResponseMsg> ResponseMsgs { set; get; } = new Queue<ResponseMsg>();
public ConcurrentQueue<ResponseMsg> ResponseMsgs { set; get; } = new ConcurrentQueue<ResponseMsg>();
public void Init()
{
@ -114,7 +115,7 @@ namespace EGFramework{ @@ -114,7 +115,7 @@ namespace EGFramework{
return EGArchitectureImplement.Interface;
}
public Queue<ResponseMsg> GetReceivedMsg()
public ConcurrentQueue<ResponseMsg> GetReceivedMsg()
{
return ResponseMsgs;
}

4
addons/EGFramework/Module/ProtocolTools/ProtocolToolsInterface.cs

@ -1,4 +1,4 @@ @@ -1,4 +1,4 @@
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Text;
namespace EGFramework{
@ -8,7 +8,7 @@ namespace EGFramework{ @@ -8,7 +8,7 @@ namespace EGFramework{
public void SetEncoding(Encoding textEncoding);
}
public interface IProtocolReceived{
public Queue<ResponseMsg> GetReceivedMsg();
public ConcurrentQueue<ResponseMsg> GetReceivedMsg();
}
public interface IProtocolListener{

4
project.godot

@ -18,8 +18,8 @@ config/icon="res://icon.svg" @@ -18,8 +18,8 @@ config/icon="res://icon.svg"
[display]
window/size/viewport_width=1920
window/size/viewport_height=1080
window/size/viewport_width=1600
window/size/viewport_height=900
[dotnet]

Loading…
Cancel
Save