From 021f030da8464776218d009037cd9f699a954744 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=96=AF=E7=8B=82=E7=9A=84=E7=8B=AE=E5=AD=90Li?= <15040126243@163.com> Date: Thu, 20 Jun 2024 17:38:07 +0800 Subject: [PATCH] =?UTF-8?q?update=20=E4=BC=98=E5=8C=96=20dubbo=20=E4=BD=BF?= =?UTF-8?q?=E7=94=A8=20redis=20=E4=BD=9C=E4=B8=BA=E5=85=83=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E4=B8=AD=E5=BF=83=E7=AE=A1=E7=90=86=20=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E8=BF=87=E6=9C=9F=E6=97=B6=E9=97=B4=20=E9=81=BF?= =?UTF-8?q?=E5=85=8D=E8=BF=87=E6=9C=9F=E6=95=B0=E6=8D=AE=E5=A0=86=E7=A7=AF?= =?UTF-8?q?=20=E8=A7=A3=E6=94=BEnacos=E5=AD=98=E5=82=A8=E7=A9=BA=E9=97=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ruoyi-common/ruoyi-common-alibaba-bom/pom.xml | 6 + ruoyi-common/ruoyi-common-dubbo/pom.xml | 15 + .../support/AbstractMetadataReport.java | 576 ++++++++++++++++++ .../store/redis/RedisMetadataReport.java | 553 +++++++++++++++++ .../src/main/resources/common-dubbo.yml | 8 + 5 files changed, 1158 insertions(+) create mode 100644 ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/report/support/AbstractMetadataReport.java create mode 100644 ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/store/redis/RedisMetadataReport.java diff --git a/ruoyi-common/ruoyi-common-alibaba-bom/pom.xml b/ruoyi-common/ruoyi-common-alibaba-bom/pom.xml index 47dce94d..11f12752 100644 --- a/ruoyi-common/ruoyi-common-alibaba-bom/pom.xml +++ b/ruoyi-common/ruoyi-common-alibaba-bom/pom.xml @@ -166,6 +166,12 @@ ${dubbo.version} + + org.apache.dubbo + dubbo-metadata-report-redis + ${dubbo.version} + + com.alibaba.spring spring-context-support diff --git a/ruoyi-common/ruoyi-common-dubbo/pom.xml b/ruoyi-common/ruoyi-common-dubbo/pom.xml index 4ff44a9d..b5cee969 100644 --- a/ruoyi-common/ruoyi-common-dubbo/pom.xml +++ b/ruoyi-common/ruoyi-common-dubbo/pom.xml @@ -36,6 +36,21 @@ dubbo-spring-boot-actuator + + org.apache.dubbo + dubbo-metadata-report-redis + + + redis.clients + jedis + + + + + redis.clients + jedis + 5.1.0 + org.projectlombok lombok diff --git a/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/report/support/AbstractMetadataReport.java b/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/report/support/AbstractMetadataReport.java new file mode 100644 index 00000000..6c53681c --- /dev/null +++ b/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/report/support/AbstractMetadataReport.java @@ -0,0 +1,576 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.metadata.report.support; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.utils.CollectionUtils; +import org.apache.dubbo.common.utils.ConfigUtils; +import org.apache.dubbo.common.utils.JsonUtils; +import org.apache.dubbo.common.utils.NamedThreadFactory; +import org.apache.dubbo.metadata.definition.model.FullServiceDefinition; +import org.apache.dubbo.metadata.definition.model.ServiceDefinition; +import org.apache.dubbo.metadata.report.MetadataReport; +import org.apache.dubbo.metadata.report.identifier.KeyTypeEnum; +import org.apache.dubbo.metadata.report.identifier.MetadataIdentifier; +import org.apache.dubbo.metadata.report.identifier.ServiceMetadataIdentifier; +import org.apache.dubbo.metadata.report.identifier.SubscriberMetadataIdentifier; +import org.apache.dubbo.metrics.event.MetricsEventBus; +import org.apache.dubbo.metrics.metadata.event.MetadataEvent; +import org.apache.dubbo.rpc.model.ApplicationModel; + +import java.io.*; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.dubbo.common.constants.CommonConstants.*; +import static org.apache.dubbo.common.constants.LoggerCodeConstants.COMMON_UNEXPECTED_EXCEPTION; +import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROXY_FAILED_EXPORT_SERVICE; +import static org.apache.dubbo.common.utils.StringUtils.replace; +import static org.apache.dubbo.metadata.report.support.Constants.*; + +public abstract class AbstractMetadataReport implements MetadataReport { + + protected static final String DEFAULT_ROOT = "dubbo"; + + protected static final int ONE_DAY_IN_MILLISECONDS = 60 * 24 * 60 * 1000; + private static final int FOUR_HOURS_IN_MILLISECONDS = 60 * 4 * 60 * 1000; + // Log output + protected final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(getClass()); + + // Local disk cache, where the special key value.registries records the list of metadata centers, and the others are + // the list of notified service providers + final Properties properties = new Properties(); + private final ExecutorService reportCacheExecutor = + Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveMetadataReport", true)); + final Map allMetadataReports = new ConcurrentHashMap<>(4); + + private final AtomicLong lastCacheChanged = new AtomicLong(); + final Map failedReports = new ConcurrentHashMap<>(4); + private URL reportURL; + boolean syncReport; + // Local disk cache file + File file; + private AtomicBoolean initialized = new AtomicBoolean(false); + public MetadataReportRetry metadataReportRetry; + private ScheduledExecutorService reportTimerScheduler; + + private final boolean reportMetadata; + private final boolean reportDefinition; + protected ApplicationModel applicationModel; + + public AbstractMetadataReport(URL reportServerURL) { + setUrl(reportServerURL); + applicationModel = reportServerURL.getOrDefaultApplicationModel(); + + boolean localCacheEnabled = reportServerURL.getParameter(REGISTRY_LOCAL_FILE_CACHE_ENABLED, true); + // Start file save timer + String defaultFilename = System.getProperty("user.home") + DUBBO_METADATA + + reportServerURL.getApplication() + + "-" + replace(reportServerURL.getAddress(), ":", "-") + + CACHE; + String filename = reportServerURL.getParameter(FILE_KEY, defaultFilename); + File file = null; + if (localCacheEnabled && ConfigUtils.isNotEmpty(filename)) { + file = new File(filename); + if (!file.exists() + && file.getParentFile() != null + && !file.getParentFile().exists()) { + if (!file.getParentFile().mkdirs()) { + throw new IllegalArgumentException("Invalid service store file " + file + + ", cause: Failed to create directory " + file.getParentFile() + "!"); + } + } + // if this file exists, firstly delete it. + if (!initialized.getAndSet(true) && file.exists()) { + file.delete(); + } + } + this.file = file; + loadProperties(); + syncReport = reportServerURL.getParameter(SYNC_REPORT_KEY, false); + metadataReportRetry = new MetadataReportRetry( + reportServerURL.getParameter(RETRY_TIMES_KEY, DEFAULT_METADATA_REPORT_RETRY_TIMES), + reportServerURL.getParameter(RETRY_PERIOD_KEY, DEFAULT_METADATA_REPORT_RETRY_PERIOD)); + // cycle report the data switch + if (reportServerURL.getParameter(CYCLE_REPORT_KEY, DEFAULT_METADATA_REPORT_CYCLE_REPORT)) { + reportTimerScheduler = Executors.newSingleThreadScheduledExecutor( + new NamedThreadFactory("DubboMetadataReportTimer", true)); + reportTimerScheduler.scheduleAtFixedRate( + this::publishAll, calculateStartTime(), ONE_DAY_IN_MILLISECONDS, TimeUnit.MILLISECONDS); + } + + this.reportMetadata = reportServerURL.getParameter(REPORT_METADATA_KEY, false); + this.reportDefinition = reportServerURL.getParameter(REPORT_DEFINITION_KEY, true); + } + + public URL getUrl() { + return reportURL; + } + + protected void setUrl(URL url) { + if (url == null) { + throw new IllegalArgumentException("metadataReport url == null"); + } + this.reportURL = url; + } + + private void doSaveProperties(long version) { + if (version < lastCacheChanged.get()) { + return; + } + if (file == null) { + return; + } + // Save + try { + File lockfile = new File(file.getAbsolutePath() + ".lock"); + if (!lockfile.exists()) { + lockfile.createNewFile(); + } + try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw"); + FileChannel channel = raf.getChannel()) { + FileLock lock = channel.tryLock(); + if (lock == null) { + throw new IOException( + "Can not lock the metadataReport cache file " + file.getAbsolutePath() + + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.metadata.file=xxx.properties"); + } + // Save + try { + if (!file.exists()) { + file.createNewFile(); + } + + Properties tmpProperties; + if (!syncReport) { + // When syncReport = false, properties.setProperty and properties.store are called from the same + // thread(reportCacheExecutor), so deep copy is not required + tmpProperties = properties; + } else { + // Using store method and setProperty method of the this.properties will cause lock contention + // under multi-threading, so deep copy a new container + tmpProperties = new Properties(); + Set> entries = properties.entrySet(); + for (Map.Entry entry : entries) { + tmpProperties.setProperty((String) entry.getKey(), (String) entry.getValue()); + } + } + + try (FileOutputStream outputFile = new FileOutputStream(file)) { + tmpProperties.store(outputFile, "Dubbo metadataReport Cache"); + } + } finally { + lock.release(); + } + } + } catch (Throwable e) { + if (version < lastCacheChanged.get()) { + return; + } else { + reportCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet())); + } + logger.warn( + COMMON_UNEXPECTED_EXCEPTION, + "", + "", + "Failed to save service store file, cause: " + e.getMessage(), + e); + } + } + + void loadProperties() { + if (file != null && file.exists()) { + try (InputStream in = new FileInputStream(file)) { + properties.load(in); + if (logger.isInfoEnabled()) { + logger.info("Load service store file " + file + ", data: " + properties); + } + } catch (Throwable e) { + logger.warn(COMMON_UNEXPECTED_EXCEPTION, "", "", "Failed to load service store file" + file, e); + } + } + } + + private void saveProperties(MetadataIdentifier metadataIdentifier, String value, boolean add, boolean sync) { + if (file == null) { + return; + } + + try { + if (add) { + properties.setProperty(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), value); + } else { + properties.remove(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY)); + } + long version = lastCacheChanged.incrementAndGet(); + if (sync) { + new SaveProperties(version).run(); + } else { + reportCacheExecutor.execute(new SaveProperties(version)); + } + + } catch (Throwable t) { + logger.warn(COMMON_UNEXPECTED_EXCEPTION, "", "", t.getMessage(), t); + } + } + + @Override + public String toString() { + return getUrl().toString(); + } + + private class SaveProperties implements Runnable { + private long version; + + private SaveProperties(long version) { + this.version = version; + } + + @Override + public void run() { + doSaveProperties(version); + } + } + + @Override + public void storeProviderMetadata( + MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition) { + if (syncReport) { + storeProviderMetadataTask(providerMetadataIdentifier, serviceDefinition); + } else { + reportCacheExecutor.execute(() -> storeProviderMetadataTask(providerMetadataIdentifier, serviceDefinition)); + } + } + + private void storeProviderMetadataTask( + MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition) { + + MetadataEvent metadataEvent = MetadataEvent.toServiceSubscribeEvent( + applicationModel, providerMetadataIdentifier.getUniqueServiceName()); + MetricsEventBus.post( + metadataEvent, + () -> { + boolean result = true; + try { + if (logger.isInfoEnabled()) { + logger.info("store provider metadata. Identifier : " + providerMetadataIdentifier + + "; definition: " + serviceDefinition); + } + allMetadataReports.put(providerMetadataIdentifier, serviceDefinition); + failedReports.remove(providerMetadataIdentifier); + String data = JsonUtils.toJson(serviceDefinition); + doStoreProviderMetadata(providerMetadataIdentifier, data); + saveProperties(providerMetadataIdentifier, data, true, !syncReport); + } catch (Exception e) { + // retry again. If failed again, throw exception. + failedReports.put(providerMetadataIdentifier, serviceDefinition); + metadataReportRetry.startRetryTask(); + logger.error( + PROXY_FAILED_EXPORT_SERVICE, + "", + "", + "Failed to put provider metadata " + providerMetadataIdentifier + " in " + + serviceDefinition + ", cause: " + e.getMessage(), + e); + result = false; + } + return result; + }, + aBoolean -> aBoolean); + } + + @Override + public void storeConsumerMetadata( + MetadataIdentifier consumerMetadataIdentifier, Map serviceParameterMap) { + if (syncReport) { + storeConsumerMetadataTask(consumerMetadataIdentifier, serviceParameterMap); + } else { + reportCacheExecutor.execute( + () -> storeConsumerMetadataTask(consumerMetadataIdentifier, serviceParameterMap)); + } + } + + protected void storeConsumerMetadataTask( + MetadataIdentifier consumerMetadataIdentifier, Map serviceParameterMap) { + try { + if (logger.isInfoEnabled()) { + logger.info("store consumer metadata. Identifier : " + consumerMetadataIdentifier + "; definition: " + + serviceParameterMap); + } + allMetadataReports.put(consumerMetadataIdentifier, serviceParameterMap); + failedReports.remove(consumerMetadataIdentifier); + + String data = JsonUtils.toJson(serviceParameterMap); + doStoreConsumerMetadata(consumerMetadataIdentifier, data); + saveProperties(consumerMetadataIdentifier, data, true, !syncReport); + } catch (Exception e) { + // retry again. If failed again, throw exception. + failedReports.put(consumerMetadataIdentifier, serviceParameterMap); + metadataReportRetry.startRetryTask(); + logger.error( + PROXY_FAILED_EXPORT_SERVICE, + "", + "", + "Failed to put consumer metadata " + consumerMetadataIdentifier + "; " + serviceParameterMap + + ", cause: " + e.getMessage(), + e); + } + } + + @Override + public void destroy() { + if (reportCacheExecutor != null) { + reportCacheExecutor.shutdown(); + } + if (reportTimerScheduler != null) { + reportTimerScheduler.shutdown(); + } + if (metadataReportRetry != null) { + metadataReportRetry.destroy(); + metadataReportRetry = null; + } + } + + @Override + public void saveServiceMetadata(ServiceMetadataIdentifier metadataIdentifier, URL url) { + if (syncReport) { + doSaveMetadata(metadataIdentifier, url); + } else { + reportCacheExecutor.execute(() -> doSaveMetadata(metadataIdentifier, url)); + } + } + + @Override + public void removeServiceMetadata(ServiceMetadataIdentifier metadataIdentifier) { + if (syncReport) { + doRemoveMetadata(metadataIdentifier); + } else { + reportCacheExecutor.execute(() -> doRemoveMetadata(metadataIdentifier)); + } + } + + @Override + public List getExportedURLs(ServiceMetadataIdentifier metadataIdentifier) { + // TODO, fallback to local cache + return doGetExportedURLs(metadataIdentifier); + } + + @Override + public void saveSubscribedData(SubscriberMetadataIdentifier subscriberMetadataIdentifier, Set urls) { + if (syncReport) { + doSaveSubscriberData(subscriberMetadataIdentifier, JsonUtils.toJson(urls)); + } else { + reportCacheExecutor.execute( + () -> doSaveSubscriberData(subscriberMetadataIdentifier, JsonUtils.toJson(urls))); + } + } + + @Override + public List getSubscribedURLs(SubscriberMetadataIdentifier subscriberMetadataIdentifier) { + String content = doGetSubscribedURLs(subscriberMetadataIdentifier); + return JsonUtils.toJavaList(content, String.class); + } + + String getProtocol(URL url) { + String protocol = url.getSide(); + protocol = protocol == null ? url.getProtocol() : protocol; + return protocol; + } + + /** + * @return if need to continue + */ + public boolean retry() { + return doHandleMetadataCollection(failedReports); + } + + @Override + public boolean shouldReportDefinition() { + return reportDefinition; + } + + @Override + public boolean shouldReportMetadata() { + return reportMetadata; + } + + private boolean doHandleMetadataCollection(Map metadataMap) { + if (metadataMap.isEmpty()) { + return true; + } + Iterator> iterable = + metadataMap.entrySet().iterator(); + while (iterable.hasNext()) { + Map.Entry item = iterable.next(); + if (PROVIDER_SIDE.equals(item.getKey().getSide())) { + this.storeProviderMetadata(item.getKey(), (FullServiceDefinition) item.getValue()); + } else if (CONSUMER_SIDE.equals(item.getKey().getSide())) { + this.storeConsumerMetadata(item.getKey(), (Map) item.getValue()); + } + } + return false; + } + + /** + * not private. just for unittest. + */ + void publishAll() { + logger.info("start to publish all metadata."); + this.doHandleMetadataCollection(allMetadataReports); + } + + /** + * between 2:00 am to 6:00 am, the time is random. + * + * @return + */ + long calculateStartTime() { + Calendar calendar = Calendar.getInstance(); + long nowMill = calendar.getTimeInMillis(); + calendar.set(Calendar.HOUR_OF_DAY, 0); + calendar.set(Calendar.MINUTE, 0); + calendar.set(Calendar.SECOND, 0); + calendar.set(Calendar.MILLISECOND, 0); + long subtract = calendar.getTimeInMillis() + ONE_DAY_IN_MILLISECONDS - nowMill; + return subtract + + (FOUR_HOURS_IN_MILLISECONDS / 2) + + ThreadLocalRandom.current().nextInt(FOUR_HOURS_IN_MILLISECONDS); + } + + class MetadataReportRetry { + protected final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(getClass()); + + final ScheduledExecutorService retryExecutor = + Executors.newScheduledThreadPool(0, new NamedThreadFactory("DubboMetadataReportRetryTimer", true)); + volatile ScheduledFuture retryScheduledFuture; + final AtomicInteger retryCounter = new AtomicInteger(0); + // retry task schedule period + long retryPeriod; + // if no failed report, wait how many times to run retry task. + int retryTimesIfNonFail = 600; + + int retryLimit; + + public MetadataReportRetry(int retryTimes, int retryPeriod) { + this.retryPeriod = retryPeriod; + this.retryLimit = retryTimes; + } + + void startRetryTask() { + if (retryScheduledFuture == null) { + synchronized (retryCounter) { + if (retryScheduledFuture == null) { + retryScheduledFuture = retryExecutor.scheduleWithFixedDelay( + () -> { + // Check and connect to the metadata + try { + int times = retryCounter.incrementAndGet(); + logger.info("start to retry task for metadata report. retry times:" + times); + if (retry() && times > retryTimesIfNonFail) { + cancelRetryTask(); + } + if (times > retryLimit) { + cancelRetryTask(); + } + } catch (Throwable t) { // Defensive fault tolerance + logger.error( + COMMON_UNEXPECTED_EXCEPTION, + "", + "", + "Unexpected error occur at failed retry, cause: " + t.getMessage(), + t); + } + }, + 500, + retryPeriod, + TimeUnit.MILLISECONDS); + } + } + } + } + + void cancelRetryTask() { + if (retryScheduledFuture != null) { + retryScheduledFuture.cancel(false); + } + retryExecutor.shutdown(); + } + + void destroy() { + cancelRetryTask(); + } + + /** + * @deprecated only for test + */ + @Deprecated + ScheduledExecutorService getRetryExecutor() { + return retryExecutor; + } + } + + private void doSaveSubscriberData(SubscriberMetadataIdentifier subscriberMetadataIdentifier, List urls) { + if (CollectionUtils.isEmpty(urls)) { + return; + } + List encodedUrlList = new ArrayList<>(urls.size()); + for (String url : urls) { + encodedUrlList.add(URL.encode(url)); + } + doSaveSubscriberData(subscriberMetadataIdentifier, encodedUrlList); + } + + protected abstract void doStoreProviderMetadata( + MetadataIdentifier providerMetadataIdentifier, String serviceDefinitions); + + protected abstract void doStoreConsumerMetadata( + MetadataIdentifier consumerMetadataIdentifier, String serviceParameterString); + + protected abstract void doSaveMetadata(ServiceMetadataIdentifier metadataIdentifier, URL url); + + protected abstract void doRemoveMetadata(ServiceMetadataIdentifier metadataIdentifier); + + protected abstract List doGetExportedURLs(ServiceMetadataIdentifier metadataIdentifier); + + protected abstract void doSaveSubscriberData( + SubscriberMetadataIdentifier subscriberMetadataIdentifier, String urlListStr); + + protected abstract String doGetSubscribedURLs(SubscriberMetadataIdentifier subscriberMetadataIdentifier); + + /** + * @deprecated only for unit test + */ + @Deprecated + protected ExecutorService getReportCacheExecutor() { + return reportCacheExecutor; + } + + /** + * @deprecated only for unit test + */ + @Deprecated + protected MetadataReportRetry getMetadataReportRetry() { + return metadataReportRetry; + } +} diff --git a/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/store/redis/RedisMetadataReport.java b/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/store/redis/RedisMetadataReport.java new file mode 100644 index 00000000..b2bb4186 --- /dev/null +++ b/ruoyi-common/ruoyi-common-dubbo/src/main/java/org/apache/dubbo/metadata/store/redis/RedisMetadataReport.java @@ -0,0 +1,553 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.metadata.store.redis; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.config.configcenter.ConfigItem; +import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.utils.ConcurrentHashMapUtils; +import org.apache.dubbo.common.utils.ConcurrentHashSet; +import org.apache.dubbo.common.utils.JsonUtils; +import org.apache.dubbo.common.utils.StringUtils; +import org.apache.dubbo.metadata.MappingChangedEvent; +import org.apache.dubbo.metadata.MappingListener; +import org.apache.dubbo.metadata.MetadataInfo; +import org.apache.dubbo.metadata.ServiceNameMapping; +import org.apache.dubbo.metadata.report.identifier.BaseMetadataIdentifier; +import org.apache.dubbo.metadata.report.identifier.KeyTypeEnum; +import org.apache.dubbo.metadata.report.identifier.MetadataIdentifier; +import org.apache.dubbo.metadata.report.identifier.ServiceMetadataIdentifier; +import org.apache.dubbo.metadata.report.identifier.SubscriberMetadataIdentifier; +import org.apache.dubbo.metadata.report.support.AbstractMetadataReport; +import org.apache.dubbo.rpc.RpcException; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisPoolConfig; +import redis.clients.jedis.JedisPubSub; +import redis.clients.jedis.Transaction; +import redis.clients.jedis.params.SetParams; +import redis.clients.jedis.util.JedisClusterCRC16; + +import static org.apache.dubbo.common.constants.CommonConstants.CLUSTER_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.CYCLE_REPORT_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT; +import static org.apache.dubbo.common.constants.CommonConstants.GROUP_CHAR_SEPARATOR; +import static org.apache.dubbo.common.constants.CommonConstants.QUEUES_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY; +import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_RESPONSE; +import static org.apache.dubbo.metadata.MetadataConstants.META_DATA_STORE_TAG; +import static org.apache.dubbo.metadata.ServiceNameMapping.DEFAULT_MAPPING_GROUP; +import static org.apache.dubbo.metadata.ServiceNameMapping.getAppNames; +import static org.apache.dubbo.metadata.report.support.Constants.DEFAULT_METADATA_REPORT_CYCLE_REPORT; + +/** + * RedisMetadataReport + */ +public class RedisMetadataReport extends AbstractMetadataReport { + + private static final String REDIS_DATABASE_KEY = "database"; + private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(RedisMetadataReport.class); + + // protected , for test + protected JedisPool pool; + private Set jedisClusterNodes; + private int timeout; + private String password; + private final String root; + private final ConcurrentHashMap mappingDataListenerMap = new ConcurrentHashMap<>(); + private SetParams jedisParams = SetParams.setParams(); + + public RedisMetadataReport(URL url) { + super(url); + timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT); + password = url.getPassword(); + this.root = url.getGroup(DEFAULT_ROOT); + if (url.getParameter(CYCLE_REPORT_KEY, DEFAULT_METADATA_REPORT_CYCLE_REPORT)) { + // ttl default is twice the cycle-report time + jedisParams.ex(ONE_DAY_IN_MILLISECONDS * 2); + } + if (url.getParameter(CLUSTER_KEY, false)) { + jedisClusterNodes = new HashSet<>(); + List urls = url.getBackupUrls(); + for (URL tmpUrl : urls) { + jedisClusterNodes.add(new HostAndPort(tmpUrl.getHost(), tmpUrl.getPort())); + } + } else { + int database = url.getParameter(REDIS_DATABASE_KEY, 0); + pool = new JedisPool(new JedisPoolConfig(), url.getHost(), url.getPort(), timeout, password, database); + } + } + + @Override + protected void doStoreProviderMetadata(MetadataIdentifier providerMetadataIdentifier, String serviceDefinitions) { + this.storeMetadata(providerMetadataIdentifier, serviceDefinitions); + } + + @Override + protected void doStoreConsumerMetadata(MetadataIdentifier consumerMetadataIdentifier, String value) { + this.storeMetadata(consumerMetadataIdentifier, value); + } + + @Override + protected void doSaveMetadata(ServiceMetadataIdentifier serviceMetadataIdentifier, URL url) { + this.storeMetadata(serviceMetadataIdentifier, URL.encode(url.toFullString())); + } + + @Override + protected void doRemoveMetadata(ServiceMetadataIdentifier serviceMetadataIdentifier) { + this.deleteMetadata(serviceMetadataIdentifier); + } + + @Override + protected List doGetExportedURLs(ServiceMetadataIdentifier metadataIdentifier) { + String content = getMetadata(metadataIdentifier); + if (StringUtils.isEmpty(content)) { + return Collections.emptyList(); + } + return new ArrayList<>(Arrays.asList(URL.decode(content))); + } + + @Override + protected void doSaveSubscriberData(SubscriberMetadataIdentifier subscriberMetadataIdentifier, String urlListStr) { + this.storeMetadata(subscriberMetadataIdentifier, urlListStr); + } + + @Override + protected String doGetSubscribedURLs(SubscriberMetadataIdentifier subscriberMetadataIdentifier) { + return this.getMetadata(subscriberMetadataIdentifier); + } + + @Override + public String getServiceDefinition(MetadataIdentifier metadataIdentifier) { + return this.getMetadata(metadataIdentifier); + } + + private void storeMetadata(BaseMetadataIdentifier metadataIdentifier, String v) { + if (pool != null) { + storeMetadataStandalone(metadataIdentifier, v); + } else { + storeMetadataInCluster(metadataIdentifier, v); + } + } + + private void storeMetadataInCluster(BaseMetadataIdentifier metadataIdentifier, String v) { + try (JedisCluster jedisCluster = + new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) { + jedisCluster.set(metadataIdentifier.getIdentifierKey() + META_DATA_STORE_TAG, v, jedisParams); + } catch (Throwable e) { + String msg = + "Failed to put " + metadataIdentifier + " to redis cluster " + v + ", cause: " + e.getMessage(); + logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e); + throw new RpcException(msg, e); + } + } + + private void storeMetadataStandalone(BaseMetadataIdentifier metadataIdentifier, String v) { + try (Jedis jedis = pool.getResource()) { + jedis.set(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), v, jedisParams); + } catch (Throwable e) { + String msg = "Failed to put " + metadataIdentifier + " to redis " + v + ", cause: " + e.getMessage(); + logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e); + throw new RpcException(msg, e); + } + } + + private void deleteMetadata(BaseMetadataIdentifier metadataIdentifier) { + if (pool != null) { + deleteMetadataStandalone(metadataIdentifier); + } else { + deleteMetadataInCluster(metadataIdentifier); + } + } + + private void deleteMetadataInCluster(BaseMetadataIdentifier metadataIdentifier) { + try (JedisCluster jedisCluster = + new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) { + jedisCluster.del(metadataIdentifier.getIdentifierKey() + META_DATA_STORE_TAG); + } catch (Throwable e) { + String msg = "Failed to delete " + metadataIdentifier + " from redis cluster , cause: " + e.getMessage(); + logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e); + throw new RpcException(msg, e); + } + } + + private void deleteMetadataStandalone(BaseMetadataIdentifier metadataIdentifier) { + try (Jedis jedis = pool.getResource()) { + jedis.del(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY)); + } catch (Throwable e) { + String msg = "Failed to delete " + metadataIdentifier + " from redis , cause: " + e.getMessage(); + logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e); + throw new RpcException(msg, e); + } + } + + private String getMetadata(BaseMetadataIdentifier metadataIdentifier) { + if (pool != null) { + return getMetadataStandalone(metadataIdentifier); + } else { + return getMetadataInCluster(metadataIdentifier); + } + } + + private String getMetadataInCluster(BaseMetadataIdentifier metadataIdentifier) { + try (JedisCluster jedisCluster = + new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) { + return jedisCluster.get(metadataIdentifier.getIdentifierKey() + META_DATA_STORE_TAG); + } catch (Throwable e) { + String msg = "Failed to get " + metadataIdentifier + " from redis cluster , cause: " + e.getMessage(); + logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e); + throw new RpcException(msg, e); + } + } + + private String getMetadataStandalone(BaseMetadataIdentifier metadataIdentifier) { + try (Jedis jedis = pool.getResource()) { + return jedis.get(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY)); + } catch (Throwable e) { + String msg = "Failed to get " + metadataIdentifier + " from redis , cause: " + e.getMessage(); + logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e); + throw new RpcException(msg, e); + } + } + + /** + * Store class and application names using Redis hashes + * key: default 'dubbo:mapping' + * field: class (serviceInterface) + * value: application_names + * @param serviceInterface field(class) + * @param defaultMappingGroup {@link ServiceNameMapping#DEFAULT_MAPPING_GROUP} + * @param newConfigContent new application_names + * @param ticket previous application_names + * @return + */ + @Override + public boolean registerServiceAppMapping( + String serviceInterface, String defaultMappingGroup, String newConfigContent, Object ticket) { + try { + if (null != ticket && !(ticket instanceof String)) { + throw new IllegalArgumentException("redis publishConfigCas requires stat type ticket"); + } + String pathKey = buildMappingKey(defaultMappingGroup); + + return storeMapping(pathKey, serviceInterface, newConfigContent, (String) ticket); + } catch (Exception e) { + logger.warn(TRANSPORT_FAILED_RESPONSE, "", "", "redis publishConfigCas failed.", e); + return false; + } + } + + private boolean storeMapping(String key, String field, String value, String ticket) { + if (pool != null) { + return storeMappingStandalone(key, field, value, ticket); + } else { + return storeMappingInCluster(key, field, value, ticket); + } + } + + /** + * use 'watch' to implement cas. + * Find information about slot distribution by key. + */ + private boolean storeMappingInCluster(String key, String field, String value, String ticket) { + try (JedisCluster jedisCluster = + new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) { + Jedis jedis = new Jedis(jedisCluster.getConnectionFromSlot(JedisClusterCRC16.getSlot(key))); + jedis.watch(key); + String oldValue = jedis.hget(key, field); + if (null == oldValue || null == ticket || oldValue.equals(ticket)) { + Transaction transaction = jedis.multi(); + transaction.hset(key, field, value); + List result = transaction.exec(); + if (null != result) { + jedisCluster.publish(buildPubSubKey(), field); + return true; + } + } else { + jedis.unwatch(); + } + jedis.close(); + } catch (Throwable e) { + String msg = "Failed to put " + key + ":" + field + " to redis " + value + ", cause: " + e.getMessage(); + logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e); + throw new RpcException(msg, e); + } + return false; + } + + /** + * use 'watch' to implement cas. + * Find information about slot distribution by key. + */ + private boolean storeMappingStandalone(String key, String field, String value, String ticket) { + try (Jedis jedis = pool.getResource()) { + jedis.watch(key); + String oldValue = jedis.hget(key, field); + if (null == oldValue || null == ticket || oldValue.equals(ticket)) { + Transaction transaction = jedis.multi(); + transaction.hset(key, field, value); + List result = transaction.exec(); + if (null != result) { + jedis.publish(buildPubSubKey(), field); + return true; + } + } + jedis.unwatch(); + } catch (Throwable e) { + String msg = "Failed to put " + key + ":" + field + " to redis " + value + ", cause: " + e.getMessage(); + logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e); + throw new RpcException(msg, e); + } + return false; + } + + /** + * build mapping key + * @param defaultMappingGroup {@link ServiceNameMapping#DEFAULT_MAPPING_GROUP} + * @return + */ + private String buildMappingKey(String defaultMappingGroup) { + return this.root + GROUP_CHAR_SEPARATOR + defaultMappingGroup; + } + + /** + * build pub/sub key + */ + private String buildPubSubKey() { + return buildMappingKey(DEFAULT_MAPPING_GROUP) + GROUP_CHAR_SEPARATOR + QUEUES_KEY; + } + + /** + * get content and use content to complete cas + * @param serviceKey class + * @param group {@link ServiceNameMapping#DEFAULT_MAPPING_GROUP} + */ + @Override + public ConfigItem getConfigItem(String serviceKey, String group) { + String key = buildMappingKey(group); + String content = getMappingData(key, serviceKey); + + return new ConfigItem(content, content); + } + + /** + * get current application_names + */ + private String getMappingData(String key, String field) { + if (pool != null) { + return getMappingDataStandalone(key, field); + } else { + return getMappingDataInCluster(key, field); + } + } + + private String getMappingDataInCluster(String key, String field) { + try (JedisCluster jedisCluster = + new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) { + return jedisCluster.hget(key, field); + } catch (Throwable e) { + String msg = "Failed to get " + key + ":" + field + " from redis cluster , cause: " + e.getMessage(); + logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e); + throw new RpcException(msg, e); + } + } + + private String getMappingDataStandalone(String key, String field) { + try (Jedis jedis = pool.getResource()) { + return jedis.hget(key, field); + } catch (Throwable e) { + String msg = "Failed to get " + key + ":" + field + " from redis , cause: " + e.getMessage(); + logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e); + throw new RpcException(msg, e); + } + } + + /** + * remove listener. If have no listener,thread will dead + */ + @Override + public void removeServiceAppMappingListener(String serviceKey, MappingListener listener) { + MappingDataListener mappingDataListener = mappingDataListenerMap.get(buildPubSubKey()); + if (null != mappingDataListener) { + NotifySub notifySub = mappingDataListener.getNotifySub(); + notifySub.removeListener(serviceKey, listener); + if (notifySub.isEmpty()) { + mappingDataListener.shutdown(); + } + } + } + + /** + * Start a thread and subscribe to {@link this#buildPubSubKey()}. + * Notify {@link MappingListener} if there is a change in the 'application_names' message. + */ + @Override + public Set getServiceAppMapping(String serviceKey, MappingListener listener, URL url) { + MappingDataListener mappingDataListener = + ConcurrentHashMapUtils.computeIfAbsent(mappingDataListenerMap, buildPubSubKey(), k -> { + MappingDataListener dataListener = new MappingDataListener(buildPubSubKey()); + dataListener.start(); + return dataListener; + }); + mappingDataListener.getNotifySub().addListener(serviceKey, listener); + return this.getServiceAppMapping(serviceKey, url); + } + + @Override + public Set getServiceAppMapping(String serviceKey, URL url) { + String key = buildMappingKey(DEFAULT_MAPPING_GROUP); + return getAppNames(getMappingData(key, serviceKey)); + } + + @Override + public MetadataInfo getAppMetadata(SubscriberMetadataIdentifier identifier, Map instanceMetadata) { + String content = this.getMetadata(identifier); + return JsonUtils.toJavaObject(content, MetadataInfo.class); + } + + @Override + public void publishAppMetadata(SubscriberMetadataIdentifier identifier, MetadataInfo metadataInfo) { + this.storeMetadata(identifier, metadataInfo.getContent()); + } + + @Override + public void unPublishAppMetadata(SubscriberMetadataIdentifier identifier, MetadataInfo metadataInfo) { + this.deleteMetadata(identifier); + } + + // for test + public MappingDataListener getMappingDataListener() { + return mappingDataListenerMap.get(buildPubSubKey()); + } + + /** + * Listen for changes in the 'application_names' message and notify the listener. + */ + class NotifySub extends JedisPubSub { + + private final Map> listeners = new ConcurrentHashMap<>(); + + public void addListener(String key, MappingListener listener) { + Set listenerSet = listeners.computeIfAbsent(key, k -> new ConcurrentHashSet<>()); + listenerSet.add(listener); + } + + public void removeListener(String serviceKey, MappingListener listener) { + Set listenerSet = this.listeners.get(serviceKey); + if (listenerSet != null) { + listenerSet.remove(listener); + if (listenerSet.isEmpty()) { + this.listeners.remove(serviceKey); + } + } + } + + public Boolean isEmpty() { + return this.listeners.isEmpty(); + } + + @Override + public void onMessage(String key, String msg) { + logger.info("sub from redis:" + key + " message:" + msg); + String applicationNames = getMappingData(buildMappingKey(DEFAULT_MAPPING_GROUP), msg); + MappingChangedEvent mappingChangedEvent = new MappingChangedEvent(msg, getAppNames(applicationNames)); + if (!listeners.get(msg).isEmpty()) { + for (MappingListener mappingListener : listeners.get(msg)) { + mappingListener.onEvent(mappingChangedEvent); + } + } + } + + @Override + public void onPMessage(String pattern, String key, String msg) { + onMessage(key, msg); + } + + @Override + public void onPSubscribe(String pattern, int subscribedChannels) { + super.onPSubscribe(pattern, subscribedChannels); + } + } + + /** + * Subscribe application names change message. + */ + class MappingDataListener extends Thread { + + private String path; + + private final NotifySub notifySub = new NotifySub(); + // for test + protected volatile boolean running = true; + + public MappingDataListener(String path) { + this.path = path; + } + + public NotifySub getNotifySub() { + return notifySub; + } + + @Override + public void run() { + while (running) { + if (pool != null) { + try (Jedis jedis = pool.getResource()) { + jedis.subscribe(notifySub, path); + } catch (Throwable e) { + String msg = "Failed to subscribe " + path + ", cause: " + e.getMessage(); + logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e); + throw new RpcException(msg, e); + } + } else { + try (JedisCluster jedisCluster = new JedisCluster( + jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) { + jedisCluster.subscribe(notifySub, path); + } catch (Throwable e) { + String msg = "Failed to subscribe " + path + ", cause: " + e.getMessage(); + logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e); + throw new RpcException(msg, e); + } + } + } + } + + public void shutdown() { + try { + running = false; + notifySub.unsubscribe(path); + } catch (Throwable e) { + String msg = "Failed to unsubscribe " + path + ", cause: " + e.getMessage(); + logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e); + } + } + } +} diff --git a/ruoyi-common/ruoyi-common-dubbo/src/main/resources/common-dubbo.yml b/ruoyi-common/ruoyi-common-dubbo/src/main/resources/common-dubbo.yml index 45324774..e7035f1a 100644 --- a/ruoyi-common/ruoyi-common-dubbo/src/main/resources/common-dubbo.yml +++ b/ruoyi-common/ruoyi-common-dubbo/src/main/resources/common-dubbo.yml @@ -19,6 +19,14 @@ dubbo: password: ${spring.cloud.nacos.password} parameters: namespace: ${spring.profiles.active} + metadata-report: + address: redis://${spring.data.redis.host}:${spring.data.redis.port} + group: DUBBO_GROUP + username: dubbo + password: ${spring.data.redis.password} + parameters: + namespace: ${spring.profiles.active} + database: ${spring.data.redis.database} # 消费者相关配置 consumer: # 结果缓存(LRU算法)