You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
PLT-PDA/Admin.Core.Tasks/QuartzNet/SchedulerCenterServer.cs

493 lines
18 KiB
C#

11 months ago
using Admin.Core.Common;
using Admin.Core.Model;
using Admin.Core.Model.Sys;
using Quartz;
using Quartz.Impl;
using Quartz.Impl.Triggers;
using Quartz.Spi;
using System;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Reflection;
using System.Threading.Tasks;
namespace Admin.Core.Tasks
{
/// <summary>
/// 任务调度管理中心
/// </summary>
public class SchedulerCenterServer : ISchedulerCenter
{
private Task<IScheduler> _scheduler;
private readonly IJobFactory _iocjobFactory;
public SchedulerCenterServer(IJobFactory jobFactory)
{
_iocjobFactory = jobFactory;
_scheduler = GetSchedulerAsync();
}
private Task<IScheduler> GetSchedulerAsync()
{
if (_scheduler != null)
return this._scheduler;
else
{
// 从Factory中获取Scheduler实例
NameValueCollection collection = new NameValueCollection
{
{ "quartz.serializer.type", "binary" },
};
StdSchedulerFactory factory = new StdSchedulerFactory(collection);
return _scheduler = factory.GetScheduler();
}
}
/// <summary>
/// 开启任务调度
/// </summary>
/// <returns></returns>
public async Task<MessageModel<string>> StartScheduleAsync()
{
var result = new MessageModel<string>();
try
{
this._scheduler.Result.JobFactory = this._iocjobFactory;
if (!this._scheduler.Result.IsStarted)
{
//等待任务运行完成
await this._scheduler.Result.Start();
await Console.Out.WriteLineAsync("任务调度开启!");
result.success = true;
result.msg = $"任务调度开启成功";
return result;
}
else
{
result.success = false;
result.msg = $"任务调度已经开启";
return result;
}
}
catch (Exception)
{
throw;
}
}
/// <summary>
/// 停止任务调度
/// </summary>
/// <returns></returns>
public async Task<MessageModel<string>> StopScheduleAsync()
{
var result = new MessageModel<string>();
try
{
if (!this._scheduler.Result.IsShutdown)
{
//等待任务运行完成
await this._scheduler.Result.Shutdown();
await Console.Out.WriteLineAsync("任务调度停止!");
result.success = true;
result.msg = $"任务调度停止成功";
return result;
}
else
{
result.success = false;
result.msg = $"任务调度已经停止";
return result;
}
}
catch (Exception)
{
throw;
}
}
/// <summary>
/// 添加一个计划任务映射程序集指定IJob实现类
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="SysTasksQz"></param>
/// <returns></returns>
public async Task<MessageModel<string>> AddScheduleJobAsync(SysTasksQz SysTasksQz)
{
var result = new MessageModel<string>();
if (SysTasksQz != null)
{
try
{
JobKey jobKey = new JobKey(SysTasksQz.ID.ToString(), SysTasksQz.JobGroup);
if (await _scheduler.Result.CheckExists(jobKey))
{
result.success = false;
result.msg = $"该任务计划已经在执行:【{SysTasksQz.Name}】,请勿重复启动!";
return result;
}
#region 设置开始时间和结束时间
if (SysTasksQz.BeginTime == null)
{
SysTasksQz.BeginTime = DateTime.Now;
}
DateTimeOffset starRunTime = DateBuilder.NextGivenSecondDate(SysTasksQz.BeginTime, 1);//设置开始时间
if (SysTasksQz.EndTime == null)
{
SysTasksQz.EndTime = DateTime.MaxValue.AddDays(-1);
}
DateTimeOffset endRunTime = DateBuilder.NextGivenSecondDate(SysTasksQz.EndTime, 1);//设置暂停时间
#endregion
#region 通过反射获取程序集类型和类
Assembly assembly = Assembly.Load(new AssemblyName(SysTasksQz.AssemblyName));
Type jobType = assembly.GetType(SysTasksQz.AssemblyName + "." + SysTasksQz.ClassName);
#endregion
//判断任务调度是否开启
if (!_scheduler.Result.IsStarted)
{
await StartScheduleAsync();
}
//传入反射出来的执行程序集
IJobDetail job = new JobDetailImpl(SysTasksQz.ID.ToString(), SysTasksQz.JobGroup, jobType);
job.JobDataMap.Add("JobParam", SysTasksQz.JobParams);
ITrigger trigger;
#region 泛型传递
//IJobDetail job = JobBuilder.Create<T>()
// .WithIdentity(sysSchedule.Name, sysSchedule.JobGroup)
// .Build();
#endregion
if (SysTasksQz.Cron != null && CronExpression.IsValidExpression(SysTasksQz.Cron) && SysTasksQz.TriggerType > 0)
{
trigger = CreateCronTrigger(SysTasksQz);
((CronTriggerImpl)trigger).MisfireInstruction = MisfireInstruction.CronTrigger.DoNothing;
}
else
{
trigger = CreateSimpleTrigger(SysTasksQz);
}
// 告诉Quartz使用我们的触发器来安排作业
await _scheduler.Result.ScheduleJob(job, trigger);
//await Task.Delay(TimeSpan.FromSeconds(120));
//await Console.Out.WriteLineAsync("关闭了调度器!");
//await _scheduler.Result.Shutdown();
result.success = true;
result.msg = $"【{SysTasksQz.Name}】成功";
return result;
}
catch (Exception ex)
{
result.success = false;
result.msg = $"任务计划异常:【{ex.Message}】";
return result;
}
}
else
{
result.success = false;
result.msg = $"任务计划不存在:【{SysTasksQz?.Name}】";
return result;
}
}
/// <summary>
/// 任务是否存在?
/// </summary>
/// <returns></returns>
public async Task<bool> IsExistScheduleJobAsync(SysTasksQz sysSchedule)
{
JobKey jobKey = new JobKey(sysSchedule.ID.ToString(), sysSchedule.JobGroup);
if (await _scheduler.Result.CheckExists(jobKey))
{
return true;
}
else
{
return false;
}
}
/// <summary>
/// 暂停一个指定的计划任务
/// </summary>
/// <returns></returns>
public async Task<MessageModel<string>> StopScheduleJobAsync(SysTasksQz sysSchedule)
{
var result = new MessageModel<string>();
try
{
JobKey jobKey = new JobKey(sysSchedule.ID.ToString(), sysSchedule.JobGroup);
if (!await _scheduler.Result.CheckExists(jobKey))
{
result.success = false;
result.msg = $"未找到要暂停的任务:【{sysSchedule.Name}】";
return result;
}
else
{
await this._scheduler.Result.DeleteJob(jobKey);
result.success = true;
result.msg = $"【{sysSchedule.Name}】成功";
return result;
}
}
catch (Exception)
{
throw;
}
}
/// <summary>
/// 恢复指定的计划任务
/// </summary>
/// <param name="sysSchedule"></param>
/// <returns></returns>
public async Task<MessageModel<string>> ResumeJob(SysTasksQz sysSchedule)
{
var result = new MessageModel<string>();
try
{
JobKey jobKey = new JobKey(sysSchedule.ID.ToString(), sysSchedule.JobGroup);
if (!await _scheduler.Result.CheckExists(jobKey))
{
result.success = false;
result.msg = $"未找到要恢复的任务:【{sysSchedule.Name}】";
return result;
}
await this._scheduler.Result.ResumeJob(jobKey);
result.success = true;
result.msg = $"【{sysSchedule.Name}】成功";
return result;
}
catch (Exception)
{
throw;
}
}
/// <summary>
/// 暂停指定的计划任务
/// </summary>
/// <param name="sysSchedule"></param>
/// <returns></returns>
public async Task<MessageModel<string>> PauseJob(SysTasksQz sysSchedule)
{
var result = new MessageModel<string>();
try
{
JobKey jobKey = new JobKey(sysSchedule.ID.ToString(), sysSchedule.JobGroup);
if (!await _scheduler.Result.CheckExists(jobKey))
{
result.success = false;
result.msg = $"未找到要暂停的任务:【{sysSchedule.Name}】";
return result;
}
await this._scheduler.Result.PauseJob(jobKey);
result.success = true;
result.msg = $"【{sysSchedule.Name}】成功";
return result;
}
catch (Exception)
{
throw;
}
}
#region 状态状态帮助方法
public async Task<List<TaskInfoDto>> GetTaskStaus(SysTasksQz sysSchedule)
{
var ls = new List<TaskInfoDto>();
var noTask = new List<TaskInfoDto>{ new TaskInfoDto {
jobId = sysSchedule.ID.ObjToString(),
jobGroup = sysSchedule.JobGroup,
triggerId = "",
triggerGroup = "",
triggerStatus = "不存在"
} };
JobKey jobKey = new JobKey(sysSchedule.ID.ToString(), sysSchedule.JobGroup);
IJobDetail job = await this._scheduler.Result.GetJobDetail(jobKey);
if (job == null)
{
return noTask;
}
//info.Append(string.Format("任务ID:{0}\r\n任务名称:{1}\r\n", job.Key.Name, job.Description));
var triggers = await this._scheduler.Result.GetTriggersOfJob(jobKey);
if (triggers == null || triggers.Count == 0)
{
return noTask;
}
foreach (var trigger in triggers)
{
var triggerStaus = await this._scheduler.Result.GetTriggerState(trigger.Key);
string state = GetTriggerState(triggerStaus.ObjToString());
ls.Add(new TaskInfoDto
{
jobId = job.Key.Name,
jobGroup = job.Key.Group,
triggerId = trigger.Key.Name,
triggerGroup = trigger.Key.Group,
triggerStatus = state
});
//info.Append(string.Format("触发器ID:{0}\r\n触发器名称:{1}\r\n状态:{2}\r\n", item.Key.Name, item.Description, state));
}
return ls;
}
public string GetTriggerState(string key)
{
string state = null;
if (key != null)
key = key.ToUpper();
switch (key)
{
case "1":
state = "暂停";
break;
case "2":
state = "完成";
break;
case "3":
state = "出错";
break;
case "4":
state = "阻塞";
break;
case "0":
state = "正常";
break;
case "-1":
state = "不存在";
break;
case "BLOCKED":
state = "阻塞";
break;
case "COMPLETE":
state = "完成";
break;
case "ERROR":
state = "出错";
break;
case "NONE":
state = "不存在";
break;
case "NORMAL":
state = "正常";
break;
case "PAUSED":
state = "暂停";
break;
}
return state;
}
#endregion
#region 创建触发器帮助方法
/// <summary>
/// 创建SimpleTrigger触发器简单触发器
/// </summary>
/// <param name="sysSchedule"></param>
/// <param name="starRunTime"></param>
/// <param name="endRunTime"></param>
/// <returns></returns>
private ITrigger CreateSimpleTrigger(SysTasksQz sysSchedule)
{
if (sysSchedule.CycleRunTimes > 0)
{
ITrigger trigger = TriggerBuilder.Create()
.WithIdentity(sysSchedule.ID.ToString(), sysSchedule.JobGroup)
.StartAt(sysSchedule.BeginTime.Value)
.WithSimpleSchedule(x => x
.WithIntervalInSeconds((int)sysSchedule.IntervalSecond)
.WithRepeatCount((int)sysSchedule.CycleRunTimes - 1))
.EndAt(sysSchedule.EndTime.Value)
.Build();
return trigger;
}
else
{
ITrigger trigger = TriggerBuilder.Create()
.WithIdentity(sysSchedule.ID.ToString(), sysSchedule.JobGroup)
.StartAt(sysSchedule.BeginTime.Value)
.WithSimpleSchedule(x => x
.WithIntervalInSeconds((int)sysSchedule.IntervalSecond)
.RepeatForever()
)
.EndAt(sysSchedule.EndTime.Value)
.Build();
return trigger;
}
// 触发作业立即运行然后每10秒重复一次无限循环
}
/// <summary>
/// 创建类型Cron的触发器
/// </summary>
/// <param name="m"></param>
/// <returns></returns>
private ITrigger CreateCronTrigger(SysTasksQz sysSchedule)
{
// 作业触发器
return TriggerBuilder.Create()
.WithIdentity(sysSchedule.ID.ToString(), sysSchedule.JobGroup)
.StartAt(sysSchedule.BeginTime.Value)//开始时间
.EndAt(sysSchedule.EndTime.Value)//结束数据
.WithCronSchedule(sysSchedule.Cron)//指定cron表达式
.ForJob(sysSchedule.ID.ToString(), sysSchedule.JobGroup)//作业名称
.Build();
}
#endregion
/// <summary>
/// 立即执行 一个任务
/// </summary>
/// <param name="SysTasksQz"></param>
/// <returns></returns>
public async Task<MessageModel<string>> ExecuteJobAsync(SysTasksQz SysTasksQz)
{
var result = new MessageModel<string>();
try
{
JobKey jobKey = new JobKey(SysTasksQz.ID.ToString(), SysTasksQz.JobGroup);
//判断任务是否存在,存在则 触发一次,不存在则先添加一个任务,触发以后再 停止任务
if (!await _scheduler.Result.CheckExists(jobKey))
{
//不存在 则 添加一个计划任务
await AddScheduleJobAsync(SysTasksQz);
//触发执行一次
await _scheduler.Result.TriggerJob(jobKey);
//停止任务
await StopScheduleJobAsync(SysTasksQz);
result.success = true;
result.msg = $"立即执行计划任务:【{SysTasksQz.Name}】成功";
}
else
{
await _scheduler.Result.TriggerJob(jobKey);
result.success = true;
result.msg = $"立即执行计划任务:【{SysTasksQz.Name}】成功";
}
}
catch (Exception ex)
{
result.success = false;
result.msg = $"立即执行计划任务失败:【{ex.Message}】";
}
return result;
}
}
}