You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

224 lines
7.3 KiB
C#

using HslCommunication.Core.Net;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
namespace HslCommunication.Enthernet.Redis
{
/// <summary>
/// Redis subscription client. One instance subscribes to one or more channels on a single
/// Redis server and forwards every pushed message to a user supplied callback.
/// </summary>
/// <remarks>
/// After a successful <c>CreatePush</c> the connection is monitored with asynchronous
/// zero-byte receives. When reading from the socket fails, the client waits
/// and then reconnects and resubscribes, until <see cref="ClosePush"/> is called.
/// </remarks>
public class RedisSubscribe : NetworkXBase
{
    #region Constructor

    /// <summary>
    /// Creates a subscription client for several channels.
    /// </summary>
    /// <param name="ipAddress">IP address of the Redis server (parsed with <see cref="IPAddress.Parse(string)"/>)</param>
    /// <param name="port">Port of the Redis server</param>
    /// <param name="keys">Channels to subscribe to</param>
    /// <exception cref="Exception"><paramref name="keys"/> is null</exception>
    public RedisSubscribe( string ipAddress, int port, string[] keys )
    {
        endPoint = new IPEndPoint( IPAddress.Parse( ipAddress ), port );
        if (keys == null)
        {
            throw new Exception( StringResources.Language.KeyIsNotAllowedNull );
        }

        keyWords = keys;
    }

    /// <summary>
    /// Creates a subscription client for a single channel.
    /// </summary>
    /// <param name="ipAddress">IP address of the Redis server (parsed with <see cref="IPAddress.Parse(string)"/>)</param>
    /// <param name="port">Port of the Redis server</param>
    /// <param name="key">Channel to subscribe to</param>
    /// <exception cref="Exception"><paramref name="key"/> is null or empty</exception>
    public RedisSubscribe( string ipAddress, int port, string key )
        : this( ipAddress, port, new string[] { key } )
    {
        if (string.IsNullOrEmpty( key ))
        {
            throw new Exception( StringResources.Language.KeyIsNotAllowedNull );
        }
    }

    #endregion

    #region Private Method

    /// <summary>
    /// Closes the current connection (if any), connects again, authenticates when a password
    /// is configured, sends the SUBSCRIBE command and starts the asynchronous receive.
    /// </summary>
    /// <returns>Success when the subscription is running, otherwise the failure of the step that went wrong</returns>
    private OperateResult CreatePush( )
    {
        CoreSocket?.Close( );

        OperateResult<Socket> connect = CreateSocketAndConnect( endPoint, 5000 );
        if (!connect.IsSuccess) return connect;

        Socket socket = connect.Content;
        OperateResult started = SubscribeOnSocket( socket );
        if (!started.IsSuccess)
        {
            // Never leave a half-initialised connection open. Without this, each failed
            // attempt of the reconnect loop would leak one socket. Closing a socket that a
            // helper has already closed is harmless.
            socket?.Close( );
        }

        return started;
    }

    /// <summary>
    /// Performs the optional AUTH handshake, sends SUBSCRIBE for all configured channels and
    /// begins the first asynchronous receive on <paramref name="socket"/>.
    /// </summary>
    /// <param name="socket">A freshly connected socket</param>
    /// <returns>Success, or the failure describing which step went wrong</returns>
    private OperateResult SubscribeOnSocket( Socket socket )
    {
        // Password verification
        if (!string.IsNullOrEmpty( this.Password ))
        {
            OperateResult check = Send( socket, RedisHelper.PackStringCommand( new string[] { "AUTH", this.Password } ) );
            if (!check.IsSuccess) return check;

            OperateResult<byte[]> checkResult = RedisHelper.ReceiveCommand( socket );
            if (!checkResult.IsSuccess) return checkResult;

            // Protocol text: compare ordinally so the result does not depend on the current culture.
            string msg = Encoding.UTF8.GetString( checkResult.Content );
            if (!msg.StartsWith( "+OK", StringComparison.Ordinal )) return new OperateResult( msg );
        }

        List<string> command = new List<string>( keyWords.Length + 1 );
        command.Add( "SUBSCRIBE" );
        command.AddRange( keyWords );

        OperateResult send = Send( socket, RedisHelper.PackStringCommand( command.ToArray( ) ) );
        if (!send.IsSuccess) return send;

        CoreSocket = socket;
        try
        {
            // Zero-byte receive: the callback only signals that the socket has something to read.
            socket.BeginReceive( new byte[0], 0, 0, SocketFlags.None, new AsyncCallback( ReceiveCallBack ), socket );
        }
        catch (Exception ex)
        {
            return new OperateResult( ex.Message );
        }

        return OperateResult.CreateSuccessResult( );
    }

    /// <summary>
    /// Completion callback of the zero-byte receive. Reads one complete Redis reply, re-arms
    /// the receive and hands the decoded reply to <see cref="DispatchCommand"/>.
    /// </summary>
    /// <param name="ar">Async result whose state object is the socket being monitored</param>
    private void ReceiveCallBack( IAsyncResult ar )
    {
        if (!(ar.AsyncState is Socket socket)) return;

        try
        {
            socket.EndReceive( ar );

            OperateResult<byte[]> read = RedisHelper.ReceiveCommand( socket );
            if (!read.IsSuccess)
            {
                SocketReceiveException( null );
                return;
            }

            // Re-arm before processing the reply that was just read.
            socket.BeginReceive( new byte[0], 0, 0, SocketFlags.None, new AsyncCallback( ReceiveCallBack ), socket );

            OperateResult<string[]> data = RedisHelper.GetStringsFromCommandLine( read.Content );
            if (!data.IsSuccess)
            {
                LogNet?.WriteWarn( data.Message );
                return;
            }

            DispatchCommand( data.Content );
        }
        catch (ObjectDisposedException)
        {
            // Usually means the socket was closed on purpose.
            return;
        }
        catch (Exception ex)
        {
            SocketReceiveException( ex );
        }
    }

    /// <summary>
    /// Interprets one decoded reply: subscription confirmations are ignored, messages are
    /// forwarded to the registered callback, anything else is logged as a warning.
    /// </summary>
    /// <param name="fields">The string elements of the reply; the first element is the reply kind</param>
    private void DispatchCommand( string[] fields )
    {
        if (fields == null || fields.Length == 0)
        {
            LogNet?.WriteWarn( ToString( ) + " received an empty reply" );
            return;
        }

        // Ordinal, case-insensitive comparison: culture-sensitive upper-casing can change
        // protocol keywords (for example the letter 'i' under Turkish cultures).
        string kind = fields[0];
        if (string.Equals( kind, "SUBSCRIBE", StringComparison.OrdinalIgnoreCase ))
        {
            return;
        }

        if (string.Equals( kind, "MESSAGE", StringComparison.OrdinalIgnoreCase ))
        {
            if (fields.Length < 3)
            {
                LogNet?.WriteWarn( ToString( ) + " received an incomplete message reply" );
                return;
            }

            try
            {
                action?.Invoke( fields[1], fields[2] );
            }
            catch (Exception ex)
            {
                // A faulty subscriber callback is not a connection problem: log it instead
                // of dropping the connection and reconnecting.
                LogNet?.WriteException( ToString( ), ex );
            }

            return;
        }

        LogNet?.WriteWarn( kind );
    }

    /// <summary>
    /// Reconnect loop: waits <c>reconnectTime</c> milliseconds and tries to connect and
    /// resubscribe, repeating until it succeeds or the push has been closed.
    /// </summary>
    /// <param name="ex">The exception that caused the disconnect, or null when a read simply failed</param>
    private void SocketReceiveException( Exception ex )
    {
        // The user closed the push: a failing read is expected and must not trigger a reconnect.
        if (pushClosed) return;

        while (true)
        {
            if (ex != null) LogNet?.WriteException( "Offline", ex );
            Console.WriteLine( StringResources.Language.ReConnectServerAfterTenSeconds );
            System.Threading.Thread.Sleep( this.reconnectTime );

            if (pushClosed) break;

            if (CreatePush( ).IsSuccess)
            {
                if (pushClosed)
                {
                    // Closed while the reconnect was in flight: drop the new connection again.
                    CoreSocket?.Close( );
                    break;
                }

                Console.WriteLine( StringResources.Language.ReConnectServerSuccess );
                break;
            }
        }
    }

    #endregion

    #region Public Properties

    /// <summary>
    /// Password of the Redis server, if one is configured. Must be set before CreatePush is
    /// called, because it is read while the connection is being established.
    /// </summary>
    public string Password { get; set; }

    #endregion

    #region Public Method

    /// <summary>
    /// Connects to the server, subscribes to the configured channels and starts pushing
    /// received messages to <paramref name="pushCallBack"/>.
    /// </summary>
    /// <param name="pushCallBack">Callback invoked with the second and third element of every message reply</param>
    /// <returns>Whether the subscription was created successfully</returns>
    public OperateResult CreatePush( Action<string, string> pushCallBack )
    {
        pushClosed = false;
        action = pushCallBack;
        return CreatePush( );
    }

    /// <summary>
    /// Stops the message push: clears the callback, closes the connection and stops any
    /// reconnect attempts that are in progress.
    /// </summary>
    public void ClosePush( )
    {
        pushClosed = true;
        action = null;
        CoreSocket?.Close( );
    }

    #endregion

    #region Private Member

    private readonly IPEndPoint endPoint;               // Address and port of the server
    private readonly string[] keyWords = null;          // Channels to subscribe to
    private Action<string, string> action;              // Callback invoked for every pushed message
    private readonly int reconnectTime = 10000;         // Delay before each reconnect attempt, in milliseconds
    private volatile bool pushClosed = false;           // Set by ClosePush so the reconnect loop stops

    #endregion

    #region Object Override

    /// <summary>
    /// Returns a string that represents the current object.
    /// </summary>
    /// <returns>The class name followed by the server end point</returns>
    public override string ToString( )
    {
        return $"RedisSubscribe[{endPoint}]";
    }

    #endregion
}
}