From e09bbc249f2f86678b7bd4ad9b476152c15c48d8 Mon Sep 17 00:00:00 2001 From: AprilWind <2100166581@qq.com> Date: Fri, 28 Jun 2024 11:30:56 +0800 Subject: [PATCH] =?UTF-8?q?docs=20=E4=BC=98=E5=8C=96dubbo=E6=B3=A8?= =?UTF-8?q?=E9=87=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../support/AbstractMetadataReport.java | 480 +++++++++++++----- .../store/redis/RedisMetadataReport.java | 352 ++++++++++--- .../CustomBeanFactoryPostProcessor.java | 25 +- .../dubbo/config/DubboConfiguration.java | 3 + .../common/dubbo/enumd/RequestLogEnum.java | 14 +- .../dubbo/filter/DubboRequestFilter.java | 40 +- .../properties/DubboCustomProperties.java | 6 + 7 files changed, 715 insertions(+), 205 deletions(-) diff --git a/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/report/support/AbstractMetadataReport.java b/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/report/support/AbstractMetadataReport.java index 6c53681c..cba4d52c 100644 --- a/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/report/support/AbstractMetadataReport.java +++ b/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/report/support/AbstractMetadataReport.java @@ -49,6 +49,9 @@ import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROXY_FAILED import static org.apache.dubbo.common.utils.StringUtils.replace; import static org.apache.dubbo.metadata.report.support.Constants.*; +/** + * 抽象的元数据上报实现类,实现了元数据上报的基本操作 + */ public abstract class AbstractMetadataReport implements MetadataReport { protected static final String DEFAULT_ROOT = "dubbo"; @@ -58,18 +61,17 @@ public abstract class AbstractMetadataReport implements MetadataReport { // Log output protected final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(getClass()); - // Local disk cache, where the special key value.registries records the list of metadata centers, and the others are - // the list of notified service providers + // 本地磁盘缓存,特定键值 registries 记录元数据中心列表,其他是通知的服务提供者列表 final Properties properties = new Properties(); private final ExecutorService reportCacheExecutor = - Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveMetadataReport", true)); + Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveMetadataReport", true)); final Map allMetadataReports = new ConcurrentHashMap<>(4); private final AtomicLong lastCacheChanged = new AtomicLong(); final Map failedReports = new ConcurrentHashMap<>(4); private URL reportURL; boolean syncReport; - // Local disk cache file + // 本地磁盘缓存文件 File file; private AtomicBoolean initialized = new AtomicBoolean(false); public MetadataReportRetry metadataReportRetry; @@ -79,29 +81,30 @@ public abstract class AbstractMetadataReport implements MetadataReport { private final boolean reportDefinition; protected ApplicationModel applicationModel; + /** + * 构造方法,初始化元数据上报实现类 + * + * @param reportServerURL 元数据上报服务器的URL + */ public AbstractMetadataReport(URL reportServerURL) { setUrl(reportServerURL); applicationModel = reportServerURL.getOrDefaultApplicationModel(); boolean localCacheEnabled = reportServerURL.getParameter(REGISTRY_LOCAL_FILE_CACHE_ENABLED, true); - // Start file save timer + // 启动文件保存定时器 String defaultFilename = System.getProperty("user.home") + DUBBO_METADATA - + reportServerURL.getApplication() - + "-" + replace(reportServerURL.getAddress(), ":", "-") - + CACHE; + + reportServerURL.getApplication() + "-" + replace(reportServerURL.getAddress(), ":", "-") + CACHE; String filename = reportServerURL.getParameter(FILE_KEY, defaultFilename); File file = null; if (localCacheEnabled && ConfigUtils.isNotEmpty(filename)) { file = new File(filename); - if (!file.exists() - && file.getParentFile() != null - && !file.getParentFile().exists()) { + if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) { if (!file.getParentFile().mkdirs()) { throw new IllegalArgumentException("Invalid service store file " + file - + ", cause: Failed to create directory " + file.getParentFile() + "!"); + + ", cause: Failed to create directory " + file.getParentFile() + "!"); } } - // if this file exists, firstly delete it. + // 如果文件存在,首先删除它 if (!initialized.getAndSet(true) && file.exists()) { file.delete(); } @@ -110,24 +113,33 @@ public abstract class AbstractMetadataReport implements MetadataReport { loadProperties(); syncReport = reportServerURL.getParameter(SYNC_REPORT_KEY, false); metadataReportRetry = new MetadataReportRetry( - reportServerURL.getParameter(RETRY_TIMES_KEY, DEFAULT_METADATA_REPORT_RETRY_TIMES), - reportServerURL.getParameter(RETRY_PERIOD_KEY, DEFAULT_METADATA_REPORT_RETRY_PERIOD)); - // cycle report the data switch + reportServerURL.getParameter(RETRY_TIMES_KEY, DEFAULT_METADATA_REPORT_RETRY_TIMES), + reportServerURL.getParameter(RETRY_PERIOD_KEY, DEFAULT_METADATA_REPORT_RETRY_PERIOD)); + + // 循环上报数据开关 if (reportServerURL.getParameter(CYCLE_REPORT_KEY, DEFAULT_METADATA_REPORT_CYCLE_REPORT)) { - reportTimerScheduler = Executors.newSingleThreadScheduledExecutor( - new NamedThreadFactory("DubboMetadataReportTimer", true)); - reportTimerScheduler.scheduleAtFixedRate( - this::publishAll, calculateStartTime(), ONE_DAY_IN_MILLISECONDS, TimeUnit.MILLISECONDS); + reportTimerScheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("DubboMetadataReportTimer", true)); + reportTimerScheduler.scheduleAtFixedRate(this::publishAll, calculateStartTime(), ONE_DAY_IN_MILLISECONDS, TimeUnit.MILLISECONDS); } this.reportMetadata = reportServerURL.getParameter(REPORT_METADATA_KEY, false); this.reportDefinition = reportServerURL.getParameter(REPORT_DEFINITION_KEY, true); } + /** + * 获取元数据上报服务器的URL + * + * @return 元数据上报服务器的URL + */ public URL getUrl() { return reportURL; } + /** + * 设置元数据上报服务器的URL + * + * @param url 元数据上报服务器的URL + */ protected void setUrl(URL url) { if (url == null) { throw new IllegalArgumentException("metadataReport url == null"); @@ -135,6 +147,11 @@ public abstract class AbstractMetadataReport implements MetadataReport { this.reportURL = url; } + /** + * 执行保存属性操作,将属性持久化到本地磁盘缓存文件中 + * + * @param version 缓存版本号 + */ private void doSaveProperties(long version) { if (version < lastCacheChanged.get()) { return; @@ -142,21 +159,21 @@ public abstract class AbstractMetadataReport implements MetadataReport { if (file == null) { return; } - // Save + // 保存操作 try { File lockfile = new File(file.getAbsolutePath() + ".lock"); if (!lockfile.exists()) { lockfile.createNewFile(); } try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw"); - FileChannel channel = raf.getChannel()) { + FileChannel channel = raf.getChannel()) { FileLock lock = channel.tryLock(); if (lock == null) { throw new IOException( - "Can not lock the metadataReport cache file " + file.getAbsolutePath() - + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.metadata.file=xxx.properties"); + "Can not lock the metadataReport cache file " + file.getAbsolutePath() + + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.metadata.file=xxx.properties"); } - // Save + // 保存 try { if (!file.exists()) { file.createNewFile(); @@ -164,12 +181,10 @@ public abstract class AbstractMetadataReport implements MetadataReport { Properties tmpProperties; if (!syncReport) { - // When syncReport = false, properties.setProperty and properties.store are called from the same - // thread(reportCacheExecutor), so deep copy is not required + // 当 syncReport = false 时,从同一个线程(reportCacheExecutor)中调用 properties.setProperty 和 properties.store,因此不需要深度复制 tmpProperties = properties; } else { - // Using store method and setProperty method of the this.properties will cause lock contention - // under multi-threading, so deep copy a new container + // 使用 store 方法和 this.properties 的 setProperty 方法会在多线程环境下引起锁竞争,因此需要深度复制一个新的容器 tmpProperties = new Properties(); Set> entries = properties.entrySet(); for (Map.Entry entry : entries) { @@ -190,15 +205,14 @@ public abstract class AbstractMetadataReport implements MetadataReport { } else { reportCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet())); } - logger.warn( - COMMON_UNEXPECTED_EXCEPTION, - "", - "", - "Failed to save service store file, cause: " + e.getMessage(), - e); + logger.warn(COMMON_UNEXPECTED_EXCEPTION, "", "", + "Failed to save service store file, cause: " + e.getMessage(), e); } } + /** + * 加载本地磁盘缓存文件中的属性 + */ void loadProperties() { if (file != null && file.exists()) { try (InputStream in = new FileInputStream(file)) { @@ -212,6 +226,14 @@ public abstract class AbstractMetadataReport implements MetadataReport { } } + /** + * 将元数据信息保存到本地文件中 + * + * @param metadataIdentifier 元数据标识符,用于唯一标识元数据信息 + * @param value 要保存的元数据信息的字符串表示 + * @param add 是否添加元数据信息,如果为 true,则添加;否则,移除 + * @param sync 是否同步保存,如果为 true,则同步保存;否则,异步保存 + */ private void saveProperties(MetadataIdentifier metadataIdentifier, String value, boolean add, boolean sync) { if (file == null) { return; @@ -219,14 +241,19 @@ public abstract class AbstractMetadataReport implements MetadataReport { try { if (add) { + // 添加元数据信息到 properties 中 properties.setProperty(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), value); } else { + // 移除指定的元数据信息 properties.remove(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY)); } + // 更新缓存变化版本号 long version = lastCacheChanged.incrementAndGet(); if (sync) { + // 同步保存属性到文件 new SaveProperties(version).run(); } else { + // 异步执行保存属性到文件任务 reportCacheExecutor.execute(new SaveProperties(version)); } @@ -240,6 +267,9 @@ public abstract class AbstractMetadataReport implements MetadataReport { return getUrl().toString(); } + /** + * 内部类,实现了 `Runnable` 接口,用于保存属性到本地文件 + */ private class SaveProperties implements Runnable { private long version; @@ -253,9 +283,14 @@ public abstract class AbstractMetadataReport implements MetadataReport { } } + /** + * 保存属性到本地磁盘缓存文件中 + * + * @param providerMetadataIdentifier 提供者元数据标识符 + * @param serviceDefinition 要存储的服务定义 + */ @Override - public void storeProviderMetadata( - MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition) { + public void storeProviderMetadata(MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition) { if (syncReport) { storeProviderMetadataTask(providerMetadataIdentifier, serviceDefinition); } else { @@ -263,95 +298,135 @@ public abstract class AbstractMetadataReport implements MetadataReport { } } - private void storeProviderMetadataTask( - MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition) { + /** + * 异步任务:存储服务提供者元数据的任务 + * + * @param providerMetadataIdentifier 提供者元数据的标识符 + * @param serviceDefinition 服务定义对象 + */ + private void storeProviderMetadataTask(MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition) { + // 将元数据事件转换为服务订阅事件 + MetadataEvent metadataEvent = MetadataEvent.toServiceSubscribeEvent(applicationModel, providerMetadataIdentifier.getUniqueServiceName()); - MetadataEvent metadataEvent = MetadataEvent.toServiceSubscribeEvent( - applicationModel, providerMetadataIdentifier.getUniqueServiceName()); + // 发布元数据事件到指标事件总线,执行回调任务 MetricsEventBus.post( - metadataEvent, - () -> { - boolean result = true; - try { - if (logger.isInfoEnabled()) { - logger.info("store provider metadata. Identifier : " + providerMetadataIdentifier - + "; definition: " + serviceDefinition); - } - allMetadataReports.put(providerMetadataIdentifier, serviceDefinition); - failedReports.remove(providerMetadataIdentifier); - String data = JsonUtils.toJson(serviceDefinition); - doStoreProviderMetadata(providerMetadataIdentifier, data); - saveProperties(providerMetadataIdentifier, data, true, !syncReport); - } catch (Exception e) { - // retry again. If failed again, throw exception. - failedReports.put(providerMetadataIdentifier, serviceDefinition); - metadataReportRetry.startRetryTask(); - logger.error( - PROXY_FAILED_EXPORT_SERVICE, - "", - "", - "Failed to put provider metadata " + providerMetadataIdentifier + " in " - + serviceDefinition + ", cause: " + e.getMessage(), - e); - result = false; + metadataEvent, + () -> { + boolean result = true; + try { + // 记录日志:存储服务提供者元数据 + if (logger.isInfoEnabled()) { + logger.info("store provider metadata. Identifier : " + providerMetadataIdentifier + + "; definition: " + serviceDefinition); } - return result; - }, - aBoolean -> aBoolean); + + // 将服务定义对象放入所有元数据报告的缓存中,移除失败的报告 + allMetadataReports.put(providerMetadataIdentifier, serviceDefinition); + failedReports.remove(providerMetadataIdentifier); + + // 将服务定义对象转换为 JSON 字符串并存储到元数据存储中 + String data = JsonUtils.toJson(serviceDefinition); + doStoreProviderMetadata(providerMetadataIdentifier, data); + + // 保存属性变更到本地属性缓存 + saveProperties(providerMetadataIdentifier, data, true, !syncReport); + } catch (Exception e) { + // 如果存储失败,记录错误日志,加入失败的报告列表,并启动重试任务 + failedReports.put(providerMetadataIdentifier, serviceDefinition); + metadataReportRetry.startRetryTask(); + logger.error(PROXY_FAILED_EXPORT_SERVICE, "", "", + "Failed to put provider metadata " + providerMetadataIdentifier + " in " + + serviceDefinition + ", cause: " + e.getMessage(), + e); + result = false; + } + return result; + }, + aBoolean -> aBoolean); } + /** + * 存储消费者元数据 + * 如果同步报告开关打开,则直接调用同步方法存储;否则,通过线程池异步执行存储任务 + * + * @param consumerMetadataIdentifier 消费者元数据的标识符 + * @param serviceParameterMap 服务参数映射表 + */ @Override - public void storeConsumerMetadata( - MetadataIdentifier consumerMetadataIdentifier, Map serviceParameterMap) { + public void storeConsumerMetadata(MetadataIdentifier consumerMetadataIdentifier, Map serviceParameterMap) { if (syncReport) { storeConsumerMetadataTask(consumerMetadataIdentifier, serviceParameterMap); } else { - reportCacheExecutor.execute( - () -> storeConsumerMetadataTask(consumerMetadataIdentifier, serviceParameterMap)); + reportCacheExecutor.execute(() -> storeConsumerMetadataTask(consumerMetadataIdentifier, serviceParameterMap)); } } - protected void storeConsumerMetadataTask( - MetadataIdentifier consumerMetadataIdentifier, Map serviceParameterMap) { + /** + * 异步任务:存储消费者元数据的任务 + * + * @param consumerMetadataIdentifier 消费者元数据的标识符 + * @param serviceParameterMap 服务参数映射表 + */ + protected void storeConsumerMetadataTask(MetadataIdentifier consumerMetadataIdentifier, Map serviceParameterMap) { try { + // 记录日志:存储消费者元数据 if (logger.isInfoEnabled()) { logger.info("store consumer metadata. Identifier : " + consumerMetadataIdentifier + "; definition: " - + serviceParameterMap); + + serviceParameterMap); } + + // 将服务参数映射表放入所有元数据报告的缓存中,移除失败的报告 allMetadataReports.put(consumerMetadataIdentifier, serviceParameterMap); failedReports.remove(consumerMetadataIdentifier); + // 将服务参数映射表转换为 JSON 字符串并存储到元数据存储中 String data = JsonUtils.toJson(serviceParameterMap); doStoreConsumerMetadata(consumerMetadataIdentifier, data); + + // 保存属性变更到本地属性缓存 saveProperties(consumerMetadataIdentifier, data, true, !syncReport); } catch (Exception e) { - // retry again. If failed again, throw exception. + // 如果存储失败,记录错误日志,加入失败的报告列表,并启动重试任务 failedReports.put(consumerMetadataIdentifier, serviceParameterMap); metadataReportRetry.startRetryTask(); logger.error( - PROXY_FAILED_EXPORT_SERVICE, - "", - "", - "Failed to put consumer metadata " + consumerMetadataIdentifier + "; " + serviceParameterMap - + ", cause: " + e.getMessage(), - e); + PROXY_FAILED_EXPORT_SERVICE, + "", + "", + "Failed to put consumer metadata " + consumerMetadataIdentifier + "; " + serviceParameterMap + + ", cause: " + e.getMessage(), + e); } } + /** + * 销毁方法,用于释放资源和关闭相关任务调度器 + */ @Override public void destroy() { + // 关闭报告缓存执行器 if (reportCacheExecutor != null) { reportCacheExecutor.shutdown(); } + + // 关闭报告定时调度器 if (reportTimerScheduler != null) { reportTimerScheduler.shutdown(); } + + // 销毁元数据报告重试管理器,并置空引用 if (metadataReportRetry != null) { metadataReportRetry.destroy(); metadataReportRetry = null; } } + /** + * 保存服务元数据。根据同步设置,同步执行或通过报告缓存执行保存操作 + * + * @param metadataIdentifier 服务元数据标识符 + * @param url 服务的URL + */ @Override public void saveServiceMetadata(ServiceMetadataIdentifier metadataIdentifier, URL url) { if (syncReport) { @@ -361,6 +436,11 @@ public abstract class AbstractMetadataReport implements MetadataReport { } } + /** + * 移除服务元数据。根据同步设置,同步执行或通过报告缓存执行移除操作 + * + * @param metadataIdentifier 服务元数据标识符 + */ @Override public void removeServiceMetadata(ServiceMetadataIdentifier metadataIdentifier) { if (syncReport) { @@ -370,28 +450,51 @@ public abstract class AbstractMetadataReport implements MetadataReport { } } + /** + * 获取导出的URL列表。如果未能获取,则回退到本地缓存 + * + * @param metadataIdentifier 服务元数据标识符 + * @return 导出的URL列表 + */ @Override public List getExportedURLs(ServiceMetadataIdentifier metadataIdentifier) { - // TODO, fallback to local cache + // TODO 回退到本地缓存 return doGetExportedURLs(metadataIdentifier); } + /** + * 存储订阅的数据。如果同步报告开启,则直接存储订阅数据;否则,将异步执行存储操作 + * + * @param subscriberMetadataIdentifier 订阅元数据标识符 + * @param urls 订阅的URL集合 + */ @Override public void saveSubscribedData(SubscriberMetadataIdentifier subscriberMetadataIdentifier, Set urls) { if (syncReport) { doSaveSubscriberData(subscriberMetadataIdentifier, JsonUtils.toJson(urls)); } else { - reportCacheExecutor.execute( - () -> doSaveSubscriberData(subscriberMetadataIdentifier, JsonUtils.toJson(urls))); + reportCacheExecutor.execute(() -> doSaveSubscriberData(subscriberMetadataIdentifier, JsonUtils.toJson(urls))); } } + /** + * 获取订阅的URL列表 + * + * @param subscriberMetadataIdentifier 订阅元数据标识符 + * @return 订阅的URL列表 + */ @Override public List getSubscribedURLs(SubscriberMetadataIdentifier subscriberMetadataIdentifier) { String content = doGetSubscribedURLs(subscriberMetadataIdentifier); return JsonUtils.toJavaList(content, String.class); } + /** + * 获取URL的协议 + * + * @param url URL对象 + * @return URL的协议 + */ String getProtocol(URL url) { String protocol = url.getSide(); protocol = protocol == null ? url.getProtocol() : protocol; @@ -399,33 +502,52 @@ public abstract class AbstractMetadataReport implements MetadataReport { } /** - * @return if need to continue + * 判断是否需要重试处理元数据集合 + * + * @return 如果需要继续重试,则返回true;否则返回false */ public boolean retry() { return doHandleMetadataCollection(failedReports); } + /** + * 指示是否应报告定义 + * + * @return 如果应报告定义,则返回true;否则返回false + */ @Override public boolean shouldReportDefinition() { return reportDefinition; } + /** + * 指示是否应报告元数据 + * + * @return 如果应报告元数据,则返回true;否则返回false + */ @Override public boolean shouldReportMetadata() { return reportMetadata; } + /** + * 处理元数据集合的方法,根据元数据的侧边(提供者或消费者)将其存储到相应的位置 + * + * @param metadataMap 元数据映射,包含要处理的元数据标识符和相应的对象 + * @return 如果处理完毕后需要继续重试,则返回true;否则返回false + */ private boolean doHandleMetadataCollection(Map metadataMap) { if (metadataMap.isEmpty()) { return true; } - Iterator> iterable = - metadataMap.entrySet().iterator(); + Iterator> iterable = metadataMap.entrySet().iterator(); while (iterable.hasNext()) { Map.Entry item = iterable.next(); if (PROVIDER_SIDE.equals(item.getKey().getSide())) { + // 如果是提供者侧的元数据,则存储为完整的服务定义对象 this.storeProviderMetadata(item.getKey(), (FullServiceDefinition) item.getValue()); } else if (CONSUMER_SIDE.equals(item.getKey().getSide())) { + // 如果是消费者侧的元数据,则存储为参数映射 this.storeConsumerMetadata(item.getKey(), (Map) item.getValue()); } } @@ -433,7 +555,8 @@ public abstract class AbstractMetadataReport implements MetadataReport { } /** - * not private. just for unittest. + * 用于单元测试的方法,不是私有方法 + * 发布所有元数据到相应的处理方法 */ void publishAll() { logger.info("start to publish all metadata."); @@ -441,9 +564,14 @@ public abstract class AbstractMetadataReport implements MetadataReport { } /** - * between 2:00 am to 6:00 am, the time is random. + * 计算一个起始时间,用于设置定时任务的启动时间 + * 时间计算逻辑包括: + * 1. 获取当前时间的毫秒数 + * 2. 将日历设置为当天的午夜(00:00:00.000) + * 3. 计算当前时间到午夜的毫秒数差 + * 4. 加上一定的偏移量,包括四小时的一半和一个四小时内的随机毫秒数 * - * @return + * @return 计算得到的起始时间 */ long calculateStartTime() { Calendar calendar = Calendar.getInstance(); @@ -454,62 +582,100 @@ public abstract class AbstractMetadataReport implements MetadataReport { calendar.set(Calendar.MILLISECOND, 0); long subtract = calendar.getTimeInMillis() + ONE_DAY_IN_MILLISECONDS - nowMill; return subtract - + (FOUR_HOURS_IN_MILLISECONDS / 2) - + ThreadLocalRandom.current().nextInt(FOUR_HOURS_IN_MILLISECONDS); + + (FOUR_HOURS_IN_MILLISECONDS / 2) + + ThreadLocalRandom.current().nextInt(FOUR_HOURS_IN_MILLISECONDS); } + /** + * MetadataReportRetry 类用于处理元数据报告的重试机制 + */ class MetadataReportRetry { protected final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(getClass()); - final ScheduledExecutorService retryExecutor = - Executors.newScheduledThreadPool(0, new NamedThreadFactory("DubboMetadataReportRetryTimer", true)); + /** + * 用于执行定时重试任务的调度执行器服务 + */ + final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(0, new NamedThreadFactory("DubboMetadataReportRetryTimer", true)); + + /** + * 用于取消重试任务的计划执行句柄 + */ volatile ScheduledFuture retryScheduledFuture; + + /** + * 重试次数计数器,用于记录已经进行的重试次数 + */ final AtomicInteger retryCounter = new AtomicInteger(0); - // retry task schedule period + + /** + * 重试任务的执行周期,以毫秒为单位 + */ long retryPeriod; - // if no failed report, wait how many times to run retry task. + + /** + * 当没有失败报告时,等待多少次运行重试任务 + */ int retryTimesIfNonFail = 600; + /** + * 重试限制次数,达到此次数后不再继续重试 + */ int retryLimit; + /** + * 构造函数,初始化重试次数和重试周期 + * + * @param retryTimes 重试次数限制 + * @param retryPeriod 重试周期(毫秒) + */ public MetadataReportRetry(int retryTimes, int retryPeriod) { this.retryPeriod = retryPeriod; this.retryLimit = retryTimes; } + /** + * 启动重试任务,如果未启动则执行定时重试 + */ void startRetryTask() { if (retryScheduledFuture == null) { synchronized (retryCounter) { if (retryScheduledFuture == null) { retryScheduledFuture = retryExecutor.scheduleWithFixedDelay( - () -> { - // Check and connect to the metadata - try { - int times = retryCounter.incrementAndGet(); - logger.info("start to retry task for metadata report. retry times:" + times); - if (retry() && times > retryTimesIfNonFail) { - cancelRetryTask(); - } - if (times > retryLimit) { - cancelRetryTask(); - } - } catch (Throwable t) { // Defensive fault tolerance - logger.error( - COMMON_UNEXPECTED_EXCEPTION, - "", - "", - "Unexpected error occur at failed retry, cause: " + t.getMessage(), - t); + () -> { + // 检查并连接到元数据 + try { + int times = retryCounter.incrementAndGet(); + logger.info("start to retry task for metadata report. retry times:" + times); + + // 执行重试操作,如果无失败报告并且超过指定重试次数,则取消重试任务 + if (retry() && times > retryTimesIfNonFail) { + cancelRetryTask(); + } + + // 如果超过重试限制次数,则取消重试任务 + if (times > retryLimit) { + cancelRetryTask(); } - }, - 500, - retryPeriod, - TimeUnit.MILLISECONDS); + } catch (Throwable t) { // 防御性容错处理 + logger.error( + COMMON_UNEXPECTED_EXCEPTION, + "", + "", + "Unexpected error occur at failed retry, cause: " + t.getMessage(), + t); + } + }, + 500, + retryPeriod, + TimeUnit.MILLISECONDS); } } } } + /** + * 取消重试任务。如果存在已计划的任务,则取消并关闭重试执行器 + */ void cancelRetryTask() { if (retryScheduledFuture != null) { retryScheduledFuture.cancel(false); @@ -517,12 +683,17 @@ public abstract class AbstractMetadataReport implements MetadataReport { retryExecutor.shutdown(); } + /** + * 销毁操作。调用取消重试任务方法以确保所有任务被取消 + */ void destroy() { cancelRetryTask(); } /** - * @deprecated only for test + * 获取重试执行器实例,仅用于测试目的 + * + * @deprecated 仅用于测试 */ @Deprecated ScheduledExecutorService getRetryExecutor() { @@ -530,6 +701,13 @@ public abstract class AbstractMetadataReport implements MetadataReport { } } + /** + * 将订阅者数据保存到持久存储中。如果 URL 列表为空,则直接返回 + * 对 URL 列表进行编码后保存。 + * + * @param subscriberMetadataIdentifier 订阅者元数据标识 + * @param urls URL 列表 + */ private void doSaveSubscriberData(SubscriberMetadataIdentifier subscriberMetadataIdentifier, List urls) { if (CollectionUtils.isEmpty(urls)) { return; @@ -541,25 +719,66 @@ public abstract class AbstractMetadataReport implements MetadataReport { doSaveSubscriberData(subscriberMetadataIdentifier, encodedUrlList); } - protected abstract void doStoreProviderMetadata( - MetadataIdentifier providerMetadataIdentifier, String serviceDefinitions); + /** + * 存储提供者元数据信息的抽象方法。由子类实现具体存储逻辑 + * + * @param providerMetadataIdentifier 提供者元数据标识 + * @param serviceDefinitions 服务定义信息字符串 + */ + protected abstract void doStoreProviderMetadata(MetadataIdentifier providerMetadataIdentifier, String serviceDefinitions); - protected abstract void doStoreConsumerMetadata( - MetadataIdentifier consumerMetadataIdentifier, String serviceParameterString); + /** + * 存储消费者元数据信息的抽象方法。由子类实现具体存储逻辑 + * + * @param consumerMetadataIdentifier 消费者元数据标识 + * @param serviceParameterString 服务参数字符串 + */ + protected abstract void doStoreConsumerMetadata(MetadataIdentifier consumerMetadataIdentifier, String serviceParameterString); + /** + * 存储服务元数据信息的抽象方法。由子类实现具体存储逻辑 + * + * @param metadataIdentifier 服务元数据标识 + * @param url URL 对象 + */ protected abstract void doSaveMetadata(ServiceMetadataIdentifier metadataIdentifier, URL url); + /** + * 删除服务元数据信息的抽象方法。由子类实现具体删除逻辑 + * + * @param metadataIdentifier 服务元数据标识 + */ protected abstract void doRemoveMetadata(ServiceMetadataIdentifier metadataIdentifier); + /** + * 获取导出的 URL 列表的抽象方法。由子类实现具体获取逻辑 + * + * @param metadataIdentifier 服务元数据标识 + * @return 导出的 URL 列表 + */ protected abstract List doGetExportedURLs(ServiceMetadataIdentifier metadataIdentifier); - protected abstract void doSaveSubscriberData( - SubscriberMetadataIdentifier subscriberMetadataIdentifier, String urlListStr); + /** + * 存储订阅者数据的抽象方法。由子类实现具体存储逻辑 + * + * @param subscriberMetadataIdentifier 订阅者元数据标识 + * @param urlListStr URL 列表的 JSON 字符串形式 + */ + protected abstract void doSaveSubscriberData(SubscriberMetadataIdentifier subscriberMetadataIdentifier, String urlListStr); + /** + * 获取订阅的 URL 列表的抽象方法。由子类实现具体获取逻辑 + * + * @param subscriberMetadataIdentifier 订阅者元数据标识 + * @return 订阅的 URL 列表的 JSON 字符串形式 + */ protected abstract String doGetSubscribedURLs(SubscriberMetadataIdentifier subscriberMetadataIdentifier); /** - * @deprecated only for unit test + * 获取报告缓存执行器的方法。仅供单元测试使用 + * + * @return 报告缓存执行器 + * @deprecated 仅供单元测试使用 */ @Deprecated protected ExecutorService getReportCacheExecutor() { @@ -567,7 +786,10 @@ public abstract class AbstractMetadataReport implements MetadataReport { } /** - * @deprecated only for unit test + * 获取元数据报告重试管理器的方法。仅供单元测试使用 + * + * @return 元数据报告重试管理器 + * @deprecated 仅供单元测试使用 */ @Deprecated protected MetadataReportRetry getMetadataReportRetry() { diff --git a/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/store/redis/RedisMetadataReport.java b/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/store/redis/RedisMetadataReport.java index b2bb4186..9d1fd0c9 100644 --- a/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/store/redis/RedisMetadataReport.java +++ b/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/store/redis/RedisMetadataReport.java @@ -16,6 +16,7 @@ */ package org.apache.dubbo.metadata.store.redis; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.config.configcenter.ConfigItem; import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; @@ -28,40 +29,17 @@ import org.apache.dubbo.metadata.MappingChangedEvent; import org.apache.dubbo.metadata.MappingListener; import org.apache.dubbo.metadata.MetadataInfo; import org.apache.dubbo.metadata.ServiceNameMapping; -import org.apache.dubbo.metadata.report.identifier.BaseMetadataIdentifier; -import org.apache.dubbo.metadata.report.identifier.KeyTypeEnum; -import org.apache.dubbo.metadata.report.identifier.MetadataIdentifier; -import org.apache.dubbo.metadata.report.identifier.ServiceMetadataIdentifier; -import org.apache.dubbo.metadata.report.identifier.SubscriberMetadataIdentifier; +import org.apache.dubbo.metadata.report.identifier.*; import org.apache.dubbo.metadata.report.support.AbstractMetadataReport; import org.apache.dubbo.rpc.RpcException; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.commons.pool2.impl.GenericObjectPoolConfig; -import redis.clients.jedis.HostAndPort; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisCluster; -import redis.clients.jedis.JedisPool; -import redis.clients.jedis.JedisPoolConfig; -import redis.clients.jedis.JedisPubSub; -import redis.clients.jedis.Transaction; +import redis.clients.jedis.*; import redis.clients.jedis.params.SetParams; import redis.clients.jedis.util.JedisClusterCRC16; -import static org.apache.dubbo.common.constants.CommonConstants.CLUSTER_KEY; -import static org.apache.dubbo.common.constants.CommonConstants.CYCLE_REPORT_KEY; -import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT; -import static org.apache.dubbo.common.constants.CommonConstants.GROUP_CHAR_SEPARATOR; -import static org.apache.dubbo.common.constants.CommonConstants.QUEUES_KEY; -import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.dubbo.common.constants.CommonConstants.*; import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_RESPONSE; import static org.apache.dubbo.metadata.MetadataConstants.META_DATA_STORE_TAG; import static org.apache.dubbo.metadata.ServiceNameMapping.DEFAULT_MAPPING_GROUP; @@ -69,31 +47,40 @@ import static org.apache.dubbo.metadata.ServiceNameMapping.getAppNames; import static org.apache.dubbo.metadata.report.support.Constants.DEFAULT_METADATA_REPORT_CYCLE_REPORT; /** - * RedisMetadataReport + * RedisMetadataReport 是基于 Redis 的元数据报告实现类 */ public class RedisMetadataReport extends AbstractMetadataReport { - private static final String REDIS_DATABASE_KEY = "database"; private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(RedisMetadataReport.class); - - // protected , for test + // 受保护的 JedisPool 实例,用于测试 protected JedisPool pool; + // Redis 集群节点集合 private Set jedisClusterNodes; private int timeout; private String password; private final String root; + // 映射数据监听器映射表 private final ConcurrentHashMap mappingDataListenerMap = new ConcurrentHashMap<>(); private SetParams jedisParams = SetParams.setParams(); + /** + * 构造方法,根据给定的 URL 初始化 RedisMetadataReport + * + * @param url 元数据中心的 URL + */ public RedisMetadataReport(URL url) { super(url); timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT); password = url.getPassword(); this.root = url.getGroup(DEFAULT_ROOT); + + // 设置默认的周期性报告时间 if (url.getParameter(CYCLE_REPORT_KEY, DEFAULT_METADATA_REPORT_CYCLE_REPORT)) { - // ttl default is twice the cycle-report time + // TTL 默认是周期报告时间的两倍 jedisParams.ex(ONE_DAY_IN_MILLISECONDS * 2); } + + // 判断是否为集群模式 if (url.getParameter(CLUSTER_KEY, false)) { jedisClusterNodes = new HashSet<>(); List urls = url.getBackupUrls(); @@ -101,31 +88,61 @@ public class RedisMetadataReport extends AbstractMetadataReport { jedisClusterNodes.add(new HostAndPort(tmpUrl.getHost(), tmpUrl.getPort())); } } else { + // 单机模式下的 Redis 数据库编号,默认为 0 int database = url.getParameter(REDIS_DATABASE_KEY, 0); pool = new JedisPool(new JedisPoolConfig(), url.getHost(), url.getPort(), timeout, password, database); } } + /** + * 存储提供者元数据的具体实现 + * + * @param providerMetadataIdentifier 提供者元数据标识符 + * @param serviceDefinitions 服务定义信息 + */ @Override protected void doStoreProviderMetadata(MetadataIdentifier providerMetadataIdentifier, String serviceDefinitions) { this.storeMetadata(providerMetadataIdentifier, serviceDefinitions); } + /** + * 存储消费者元数据的具体实现 + * + * @param consumerMetadataIdentifier 消费者元数据标识符 + * @param value 元数据值 + */ @Override protected void doStoreConsumerMetadata(MetadataIdentifier consumerMetadataIdentifier, String value) { this.storeMetadata(consumerMetadataIdentifier, value); } + /** + * 存储服务元数据的具体实现 + * + * @param serviceMetadataIdentifier 服务元数据标识符 + * @param url 服务URL编码后的完整字符串 + */ @Override protected void doSaveMetadata(ServiceMetadataIdentifier serviceMetadataIdentifier, URL url) { this.storeMetadata(serviceMetadataIdentifier, URL.encode(url.toFullString())); } + /** + * 移除元数据的具体实现 + * + * @param serviceMetadataIdentifier 服务元数据标识符 + */ @Override protected void doRemoveMetadata(ServiceMetadataIdentifier serviceMetadataIdentifier) { this.deleteMetadata(serviceMetadataIdentifier); } + /** + * 获取导出的URL列表 + * + * @param metadataIdentifier 服务元数据标识符 + * @return 导出的URL列表,如果内容为空则返回空列表 + */ @Override protected List doGetExportedURLs(ServiceMetadataIdentifier metadataIdentifier) { String content = getMetadata(metadataIdentifier); @@ -135,21 +152,45 @@ public class RedisMetadataReport extends AbstractMetadataReport { return new ArrayList<>(Arrays.asList(URL.decode(content))); } + /** + * 存储订阅者数据的具体实现 + * + * @param subscriberMetadataIdentifier 订阅者元数据标识符 + * @param urlListStr URL列表字符串 + */ @Override protected void doSaveSubscriberData(SubscriberMetadataIdentifier subscriberMetadataIdentifier, String urlListStr) { this.storeMetadata(subscriberMetadataIdentifier, urlListStr); } + /** + * 获取订阅的URL列表 + * + * @param subscriberMetadataIdentifier 订阅者元数据标识符 + * @return 订阅的URL列表 + */ @Override protected String doGetSubscribedURLs(SubscriberMetadataIdentifier subscriberMetadataIdentifier) { return this.getMetadata(subscriberMetadataIdentifier); } + /** + * 获取服务定义 + * + * @param metadataIdentifier 元数据标识符 + * @return 服务定义内容 + */ @Override public String getServiceDefinition(MetadataIdentifier metadataIdentifier) { return this.getMetadata(metadataIdentifier); } + /** + * 存储元数据的通用方法,根据是否有连接池选择存储方式 + * + * @param metadataIdentifier 元数据标识符 + * @param v 元数据值 + */ private void storeMetadata(BaseMetadataIdentifier metadataIdentifier, String v) { if (pool != null) { storeMetadataStandalone(metadataIdentifier, v); @@ -158,6 +199,12 @@ public class RedisMetadataReport extends AbstractMetadataReport { } } + /** + * 在集群模式下存储元数据 + * + * @param metadataIdentifier 元数据标识符 + * @param v 元数据值 + */ private void storeMetadataInCluster(BaseMetadataIdentifier metadataIdentifier, String v) { try (JedisCluster jedisCluster = new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) { @@ -170,6 +217,12 @@ public class RedisMetadataReport extends AbstractMetadataReport { } } + /** + * 在单机模式下存储元数据 + * + * @param metadataIdentifier 元数据标识符 + * @param v 元数据值 + */ private void storeMetadataStandalone(BaseMetadataIdentifier metadataIdentifier, String v) { try (Jedis jedis = pool.getResource()) { jedis.set(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), v, jedisParams); @@ -180,6 +233,11 @@ public class RedisMetadataReport extends AbstractMetadataReport { } } + /** + * 删除元数据 + * + * @param metadataIdentifier 元数据标识符 + */ private void deleteMetadata(BaseMetadataIdentifier metadataIdentifier) { if (pool != null) { deleteMetadataStandalone(metadataIdentifier); @@ -188,6 +246,11 @@ public class RedisMetadataReport extends AbstractMetadataReport { } } + /** + * 在集群模式下删除元数据 + * + * @param metadataIdentifier 元数据标识符 + */ private void deleteMetadataInCluster(BaseMetadataIdentifier metadataIdentifier) { try (JedisCluster jedisCluster = new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) { @@ -199,6 +262,11 @@ public class RedisMetadataReport extends AbstractMetadataReport { } } + /** + * 在单机模式下删除元数据 + * + * @param metadataIdentifier 元数据标识符 + */ private void deleteMetadataStandalone(BaseMetadataIdentifier metadataIdentifier) { try (Jedis jedis = pool.getResource()) { jedis.del(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY)); @@ -209,6 +277,12 @@ public class RedisMetadataReport extends AbstractMetadataReport { } } + /** + * 获取元数据 + * + * @param metadataIdentifier 元数据标识符 + * @return 元数据值 + */ private String getMetadata(BaseMetadataIdentifier metadataIdentifier) { if (pool != null) { return getMetadataStandalone(metadataIdentifier); @@ -217,6 +291,12 @@ public class RedisMetadataReport extends AbstractMetadataReport { } } + /** + * 在集群模式下获取元数据 + * + * @param metadataIdentifier 元数据标识符 + * @return 元数据值 + */ private String getMetadataInCluster(BaseMetadataIdentifier metadataIdentifier) { try (JedisCluster jedisCluster = new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) { @@ -228,6 +308,12 @@ public class RedisMetadataReport extends AbstractMetadataReport { } } + /** + * 在单机模式下获取元数据 + * + * @param metadataIdentifier 元数据标识符 + * @return 元数据值 + */ private String getMetadataStandalone(BaseMetadataIdentifier metadataIdentifier) { try (Jedis jedis = pool.getResource()) { return jedis.get(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY)); @@ -239,15 +325,17 @@ public class RedisMetadataReport extends AbstractMetadataReport { } /** - * Store class and application names using Redis hashes - * key: default 'dubbo:mapping' - * field: class (serviceInterface) - * value: application_names - * @param serviceInterface field(class) - * @param defaultMappingGroup {@link ServiceNameMapping#DEFAULT_MAPPING_GROUP} - * @param newConfigContent new application_names - * @param ticket previous application_names - * @return + * 使用Redis哈希存储类和应用程序名称的映射关系 + *

+ * 键:默认为 'dubbo:mapping' + * 字段:类(serviceInterface) + * 值:应用程序名称列表 + * + * @param serviceInterface 类名(作为字段) + * @param defaultMappingGroup 默认映射组 {@link ServiceNameMapping#DEFAULT_MAPPING_GROUP} + * @param newConfigContent 新的应用程序名称列表 + * @param ticket 先前的应用程序名称列表 + * @return 是否成功注册映射关系 */ @Override public boolean registerServiceAppMapping( @@ -265,6 +353,17 @@ public class RedisMetadataReport extends AbstractMetadataReport { } } + /** + * 根据是否存在 Redis 连接池选择存储映射关系的方法。 + * 如果存在 Redis 连接池,则使用单机存储方式 {@link #storeMappingStandalone(String, String, String, String)}; + * 否则,使用集群存储方式 {@link #storeMappingInCluster(String, String, String, String)}。 + * + * @param key 存储键 + * @param field 存储字段 + * @param value 存储值 + * @param ticket 事务票据,用于 CAS 操作 + * @return 存储是否成功 + */ private boolean storeMapping(String key, String field, String value, String ticket) { if (pool != null) { return storeMappingStandalone(key, field, value, ticket); @@ -274,8 +373,17 @@ public class RedisMetadataReport extends AbstractMetadataReport { } /** - * use 'watch' to implement cas. - * Find information about slot distribution by key. + * 在 Redis 集群中存储映射关系 + * 使用 Redis 集群的方式存储给定键和字段的映射关系,并实现乐观锁 CAS 操作 + * 如果旧值为空或与给定的事务票据匹配,则更新字段的值为新值,并发布更新事件 + * 使用 WATCH 和 MULTI 来实现事务操作 + * + * @param key 存储键 + * @param field 存储字段 + * @param value 存储值 + * @param ticket 事务票据,用于 CAS 操作 + * @return 存储是否成功 + * @throws RpcException 存储过程中发生的异常,以及失败的原因 */ private boolean storeMappingInCluster(String key, String field, String value, String ticket) { try (JedisCluster jedisCluster = @@ -304,8 +412,14 @@ public class RedisMetadataReport extends AbstractMetadataReport { } /** - * use 'watch' to implement cas. - * Find information about slot distribution by key. + * 在单机Redis中存储映射关系 + * 使用 'watch' 实现CAS(比较并交换) + * + * @param key Redis键 + * @param field Redis哈希字段(类名) + * @param value 新的应用程序名称列表 + * @param ticket 先前的应用程序名称列表 + * @return 是否成功存储映射关系 */ private boolean storeMappingStandalone(String key, String field, String value, String ticket) { try (Jedis jedis = pool.getResource()) { @@ -330,36 +444,48 @@ public class RedisMetadataReport extends AbstractMetadataReport { } /** - * build mapping key - * @param defaultMappingGroup {@link ServiceNameMapping#DEFAULT_MAPPING_GROUP} - * @return + * 构建映射关键字,用于存储服务类和应用名称的 Redis 哈希表键 + * 结合根路径和默认映射组名构建完整的映射键 + * + * @param defaultMappingGroup 默认映射组名 {@link ServiceNameMapping#DEFAULT_MAPPING_GROUP} + * @return 构建的映射关键字 */ private String buildMappingKey(String defaultMappingGroup) { return this.root + GROUP_CHAR_SEPARATOR + defaultMappingGroup; } /** - * build pub/sub key + * 构建发布订阅键,用于 Redis 发布-订阅模式中的通道名称 + * 结合默认映射组名和队列键构建完整的发布订阅键 + * + * @return 构建的发布订阅键 */ private String buildPubSubKey() { return buildMappingKey(DEFAULT_MAPPING_GROUP) + GROUP_CHAR_SEPARATOR + QUEUES_KEY; } /** - * get content and use content to complete cas - * @param serviceKey class - * @param group {@link ServiceNameMapping#DEFAULT_MAPPING_GROUP} + * 根据服务键和分组获取配置项 + * 使用分组构建映射键,并获取映射数据,然后返回一个配置项对象 + * + * @param serviceKey 服务键,用于标识特定的服务 + * @param group 分组,用于构建映射键 + * @return 配置项对象,包含从映射数据中获取的内容 */ @Override public ConfigItem getConfigItem(String serviceKey, String group) { String key = buildMappingKey(group); String content = getMappingData(key, serviceKey); - return new ConfigItem(content, content); } /** - * get current application_names + * 根据键和字段从 Redis 中获取映射数据 + * 如果连接池不为空,则使用独立模式获取数据;否则使用集群模式 + * + * @param key 键,用于定位数据的存储位置 + * @param field 字段,用于定位具体的数据项 + * @return 获取到的映射数据 */ private String getMappingData(String key, String field) { if (pool != null) { @@ -369,6 +495,14 @@ public class RedisMetadataReport extends AbstractMetadataReport { } } + /** + * 从 Redis 集群中获取指定键和字段的映射数据 + * + * @param key Redis 哈希表的键 + * @param field 哈希表中的字段 + * @return 返回键和字段对应的值,如果获取失败则抛出异常 + * @throws RpcException 如果从 Redis 集群获取数据失败,抛出该异常 + */ private String getMappingDataInCluster(String key, String field) { try (JedisCluster jedisCluster = new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) { @@ -380,6 +514,13 @@ public class RedisMetadataReport extends AbstractMetadataReport { } } + /** + * 使用集群模式从 Redis 中获取映射数据 + * + * @param key 键,用于定位数据的存储位置 + * @param field 字段,用于定位具体的数据项 + * @return 获取到的映射数据,如果获取失败则抛出 RpcException 异常 + */ private String getMappingDataStandalone(String key, String field) { try (Jedis jedis = pool.getResource()) { return jedis.hget(key, field); @@ -391,7 +532,10 @@ public class RedisMetadataReport extends AbstractMetadataReport { } /** - * remove listener. If have no listener,thread will dead + * 移除服务应用映射的监听器 + * + * @param serviceKey 服务键,用于标识特定的服务 + * @param listener 映射监听器,用于处理映射变更事件 */ @Override public void removeServiceAppMappingListener(String serviceKey, MappingListener listener) { @@ -406,8 +550,13 @@ public class RedisMetadataReport extends AbstractMetadataReport { } /** - * Start a thread and subscribe to {@link this#buildPubSubKey()}. - * Notify {@link MappingListener} if there is a change in the 'application_names' message. + * 启动一个线程并订阅 {@link this#buildPubSubKey()} + * 如果 'application_names' 消息发生变化,则通知 {@link MappingListener}。 + * + * @param serviceKey 服务键 + * @param listener 映射监听器 + * @param url URL + * @return 返回服务与应用映射关系的集合 */ @Override public Set getServiceAppMapping(String serviceKey, MappingListener listener, URL url) { @@ -421,45 +570,82 @@ public class RedisMetadataReport extends AbstractMetadataReport { return this.getServiceAppMapping(serviceKey, url); } + /** + * 获取指定服务键的服务与应用映射关系集合 + * + * @param serviceKey 服务键 + * @param url URL + * @return 返回服务与应用映射关系的集合 + */ @Override public Set getServiceAppMapping(String serviceKey, URL url) { String key = buildMappingKey(DEFAULT_MAPPING_GROUP); return getAppNames(getMappingData(key, serviceKey)); } + /** + * 获取订阅者元数据信息 + * + * @param identifier 订阅者元数据标识符 + * @param instanceMetadata 实例元数据映射 + * @return 返回订阅者的元数据信息对象 + */ @Override public MetadataInfo getAppMetadata(SubscriberMetadataIdentifier identifier, Map instanceMetadata) { String content = this.getMetadata(identifier); return JsonUtils.toJavaObject(content, MetadataInfo.class); } + /** + * 发布应用元数据信息 + * + * @param identifier 订阅者元数据标识符 + * @param metadataInfo 元数据信息对象 + */ @Override public void publishAppMetadata(SubscriberMetadataIdentifier identifier, MetadataInfo metadataInfo) { this.storeMetadata(identifier, metadataInfo.getContent()); } + /** + * 取消发布应用元数据信息 + * + * @param identifier 订阅者元数据标识符 + * @param metadataInfo 元数据信息对象 + */ @Override public void unPublishAppMetadata(SubscriberMetadataIdentifier identifier, MetadataInfo metadataInfo) { this.deleteMetadata(identifier); } - // for test + // 用于测试 public MappingDataListener getMappingDataListener() { return mappingDataListenerMap.get(buildPubSubKey()); } /** - * Listen for changes in the 'application_names' message and notify the listener. + * 监听 'application_names' 消息的变化并通知监听器 */ class NotifySub extends JedisPubSub { - private final Map> listeners = new ConcurrentHashMap<>(); + /** + * 添加监听器 + * + * @param key 监听的键 + * @param listener 监听器对象 + */ public void addListener(String key, MappingListener listener) { Set listenerSet = listeners.computeIfAbsent(key, k -> new ConcurrentHashSet<>()); listenerSet.add(listener); } + /** + * 移除监听器 + * + * @param serviceKey 服务键 + * @param listener 监听器对象 + */ public void removeListener(String serviceKey, MappingListener listener) { Set listenerSet = this.listeners.get(serviceKey); if (listenerSet != null) { @@ -470,10 +656,21 @@ public class RedisMetadataReport extends AbstractMetadataReport { } } + /** + * 检查监听器集合是否为空 + * + * @return 如果监听器集合为空则返回 true,否则返回 false + */ public Boolean isEmpty() { return this.listeners.isEmpty(); } + /** + * 当接收到消息时触发的方法 + * + * @param key 消息的键 + * @param msg 接收到的消息内容 + */ @Override public void onMessage(String key, String msg) { logger.info("sub from redis:" + key + " message:" + msg); @@ -486,11 +683,24 @@ public class RedisMetadataReport extends AbstractMetadataReport { } } + /** + * 当接收到模式消息时触发的方法 + * + * @param pattern 模式 + * @param key 消息的键 + * @param msg 接收到的消息内容 + */ @Override public void onPMessage(String pattern, String key, String msg) { onMessage(key, msg); } + /** + * 当成功订阅模式时触发的方法 + * + * @param pattern 订阅的模式 + * @param subscribedChannels 订阅的频道数量 + */ @Override public void onPSubscribe(String pattern, int subscribedChannels) { super.onPSubscribe(pattern, subscribedChannels); @@ -498,7 +708,7 @@ public class RedisMetadataReport extends AbstractMetadataReport { } /** - * Subscribe application names change message. + * 监听应用名称变化消息的线程类 */ class MappingDataListener extends Thread { @@ -508,14 +718,27 @@ public class RedisMetadataReport extends AbstractMetadataReport { // for test protected volatile boolean running = true; + /** + * 构造方法,指定监听的路径 + * + * @param path 监听的路径 + */ public MappingDataListener(String path) { this.path = path; } + /** + * 获取通知订阅器 + * + * @return 通知订阅器 + */ public NotifySub getNotifySub() { return notifySub; } + /** + * 线程运行方法,持续订阅指定路径的消息 + */ @Override public void run() { while (running) { @@ -540,6 +763,9 @@ public class RedisMetadataReport extends AbstractMetadataReport { } } + /** + * 关闭方法,用于停止线程运行并取消订阅指定路径的消息 + */ public void shutdown() { try { running = false; diff --git a/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/dromara/common/dubbo/config/CustomBeanFactoryPostProcessor.java b/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/dromara/common/dubbo/config/CustomBeanFactoryPostProcessor.java index 0384cd7e..ef28564d 100644 --- a/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/dromara/common/dubbo/config/CustomBeanFactoryPostProcessor.java +++ b/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/dromara/common/dubbo/config/CustomBeanFactoryPostProcessor.java @@ -16,27 +16,44 @@ import java.net.InetAddress; */ public class CustomBeanFactoryPostProcessor implements BeanFactoryPostProcessor, Ordered { - @Override - public int getOrder() { - return Ordered.HIGHEST_PRECEDENCE; - } + /** + * 获取该 BeanFactoryPostProcessor 的顺序,确保它在容器初始化过程中具有最高优先级 + * + * @return 优先级顺序值,越小优先级越高 + */ + @Override + public int getOrder() { + return Ordered.HIGHEST_PRECEDENCE; + } + /** + * 在 Spring 容器初始化过程中对 Bean 工厂进行后置处理 + * + * @param beanFactory 可配置的 Bean 工厂 + * @throws BeansException 如果在处理过程中发生错误 + */ @Override public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { + // 获取 InetUtils bean,用于获取 IP 地址 InetUtils inetUtils = beanFactory.getBean(InetUtils.class); String ip = "127.0.0.1"; + // 获取第一个非回环地址 InetAddress address = inetUtils.findFirstNonLoopbackAddress(); if (address != null) { if (address instanceof Inet6Address) { + // 处理 IPv6 地址 String ipv6AddressString = address.getHostAddress(); if (ipv6AddressString.contains("%")) { + // 去掉可能存在的范围 ID ipv6AddressString = ipv6AddressString.substring(0, ipv6AddressString.indexOf("%")); } ip = ipv6AddressString; } else { + // 处理 IPv4 地址 ip = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress(); } } + // 设置系统属性 DUBBO_IP_TO_REGISTRY 为获取到的 IP 地址 System.setProperty("DUBBO_IP_TO_REGISTRY", ip); } } diff --git a/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/dromara/common/dubbo/config/DubboConfiguration.java b/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/dromara/common/dubbo/config/DubboConfiguration.java index 9d50dd82..9e88785a 100644 --- a/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/dromara/common/dubbo/config/DubboConfiguration.java +++ b/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/dromara/common/dubbo/config/DubboConfiguration.java @@ -16,6 +16,9 @@ import org.springframework.context.annotation.PropertySource; @PropertySource(value = "classpath:common-dubbo.yml", factory = YmlPropertySourceFactory.class) public class DubboConfiguration { + /** + * dubbo自定义IP注入(避免IP不正确问题) + */ @Bean public BeanFactoryPostProcessor customBeanFactoryPostProcessor() { return new CustomBeanFactoryPostProcessor(); diff --git a/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/dromara/common/dubbo/enumd/RequestLogEnum.java b/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/dromara/common/dubbo/enumd/RequestLogEnum.java index f238eebd..950114e7 100644 --- a/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/dromara/common/dubbo/enumd/RequestLogEnum.java +++ b/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/dromara/common/dubbo/enumd/RequestLogEnum.java @@ -11,8 +11,18 @@ import lombok.AllArgsConstructor; public enum RequestLogEnum { /** - * info 基础信息 param 参数信息 full 全部 + * info 基础信息 */ - INFO, PARAM, FULL; + INFO, + + /** + * param 参数信息 + */ + PARAM, + + /** + * full 全部 + */ + FULL; } diff --git a/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/dromara/common/dubbo/filter/DubboRequestFilter.java b/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/dromara/common/dubbo/filter/DubboRequestFilter.java index 382dc94e..273155e2 100644 --- a/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/dromara/common/dubbo/filter/DubboRequestFilter.java +++ b/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/dromara/common/dubbo/filter/DubboRequestFilter.java @@ -1,17 +1,28 @@ package org.dromara.common.dubbo.filter; -import org.dromara.common.core.utils.SpringUtils; -import org.dromara.common.dubbo.enumd.RequestLogEnum; -import org.dromara.common.dubbo.properties.DubboCustomProperties; import lombok.extern.slf4j.Slf4j; import org.apache.dubbo.common.constants.CommonConstants; import org.apache.dubbo.common.extension.Activate; import org.apache.dubbo.rpc.*; import org.apache.dubbo.rpc.service.GenericService; +import org.dromara.common.core.utils.SpringUtils; +import org.dromara.common.dubbo.enumd.RequestLogEnum; +import org.dromara.common.dubbo.properties.DubboCustomProperties; import org.dromara.common.json.utils.JsonUtils; /** - * dubbo日志过滤器 + * Dubbo 日志过滤器 + *

+ * 该过滤器通过实现 Dubbo 的 Filter 接口,在服务调用前后记录日志信息 + * 可根据配置开关和日志级别输出不同详细程度的日志信息 + *

+ * 激活条件: + * - 在 Provider 和 Consumer 端都生效 + * - 执行顺序设置为最大值,确保在所有其他过滤器之后执行 + *

+ * 使用 SpringUtils 获取配置信息,根据配置决定是否记录日志及日志详细程度 + *

+ * 使用 Lombok 的 @Slf4j 注解简化日志记录 * * @author Lion Li */ @@ -19,33 +30,48 @@ import org.dromara.common.json.utils.JsonUtils; @Activate(group = {CommonConstants.PROVIDER, CommonConstants.CONSUMER}, order = Integer.MAX_VALUE) public class DubboRequestFilter implements Filter { + /** + * Dubbo Filter 接口实现方法,处理服务调用逻辑并记录日志 + * + * @param invoker Dubbo 服务调用者实例 + * @param invocation 调用的具体方法信息 + * @return 调用结果 + * @throws RpcException 如果调用过程中发生异常 + */ @Override public Result invoke(Invoker invoker, Invocation invocation) throws RpcException { DubboCustomProperties properties = SpringUtils.getBean(DubboCustomProperties.class); + // 如果未开启请求日志记录,则直接执行服务调用并返回结果 if (!properties.getRequestLog()) { - // 未开启则跳过日志逻辑 return invoker.invoke(invocation); } + + // 判断是 Provider 还是 Consumer String client = CommonConstants.PROVIDER; if (RpcContext.getServiceContext().isConsumerSide()) { client = CommonConstants.CONSUMER; } + + // 构建基础日志信息 String baselog = "Client[" + client + "],InterfaceName=[" + invocation.getInvoker().getInterface().getSimpleName() + "],MethodName=[" + invocation.getMethodName() + "]"; + // 根据日志级别输出不同详细程度的日志信息 if (properties.getLogLevel() == RequestLogEnum.INFO) { log.info("DUBBO - 服务调用: {}", baselog); } else { log.info("DUBBO - 服务调用: {},Parameter={}", baselog, invocation.getArguments()); } + // 记录调用开始时间 long startTime = System.currentTimeMillis(); // 执行接口调用逻辑 Result result = invoker.invoke(invocation); - // 调用耗时 + // 计算调用耗时 long elapsed = System.currentTimeMillis() - startTime; - // 如果发生异常 则打印异常日志 + // 如果发生异常且调用的是泛化服务,则记录异常日志 if (result.hasException() && invoker.getInterface().equals(GenericService.class)) { log.error("DUBBO - 服务异常: {},Exception={}", baselog, result.getException()); } else { + // 根据日志级别输出服务响应信息 if (properties.getLogLevel() == RequestLogEnum.INFO) { log.info("DUBBO - 服务响应: {},SpendTime=[{}ms]", baselog, elapsed); } else if (properties.getLogLevel() == RequestLogEnum.FULL) { diff --git a/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/dromara/common/dubbo/properties/DubboCustomProperties.java b/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/dromara/common/dubbo/properties/DubboCustomProperties.java index 89180886..e0df2cd7 100644 --- a/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/dromara/common/dubbo/properties/DubboCustomProperties.java +++ b/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/dromara/common/dubbo/properties/DubboCustomProperties.java @@ -15,8 +15,14 @@ import org.springframework.cloud.context.config.annotation.RefreshScope; @ConfigurationProperties(prefix = "dubbo.custom") public class DubboCustomProperties { + /** + * 是否开启请求日志记录 + */ private Boolean requestLog; + /** + * 日志级别 + */ private RequestLogEnum logLevel; }