update: optimize Dubbo's use of Redis as the metadata center — support key expiration (TTL) so stale metadata does not accumulate, freeing up Nacos storage space
parent
aded21a4c3
commit
021f030da8
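
This commit makes RedisMetadataReport attach a TTL to every metadata key whenever cycle reporting is enabled, so entries left behind by dead providers expire on their own. A minimal usage sketch (hedged: the MetadataReportConfig API and the redis:// address form follow common Dubbo conventions, and "cycle-report" is the parameter name suggested by CYCLE_REPORT_KEY in this diff — both are assumptions, not verified against a specific release):

    // Sketch: point the metadata report at Redis. Cycle reporting is on by
    // default; the daily republish re-SETs every key and refreshes its TTL,
    // so only metadata from providers that stopped reporting expires.
    MetadataReportConfig metadataReport = new MetadataReportConfig();
    metadataReport.setAddress("redis://127.0.0.1:6379");
    metadataReport.setParameters(Collections.singletonMap("cycle-report", "true"));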
@@ -0,0 +1,576 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dubbo.metadata.report.support;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ConfigUtils;
import org.apache.dubbo.common.utils.JsonUtils;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.metadata.definition.model.FullServiceDefinition;
import org.apache.dubbo.metadata.definition.model.ServiceDefinition;
import org.apache.dubbo.metadata.report.MetadataReport;
import org.apache.dubbo.metadata.report.identifier.KeyTypeEnum;
import org.apache.dubbo.metadata.report.identifier.MetadataIdentifier;
import org.apache.dubbo.metadata.report.identifier.ServiceMetadataIdentifier;
import org.apache.dubbo.metadata.report.identifier.SubscriberMetadataIdentifier;
import org.apache.dubbo.metrics.event.MetricsEventBus;
import org.apache.dubbo.metrics.metadata.event.MetadataEvent;
import org.apache.dubbo.rpc.model.ApplicationModel;

import java.io.*;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.dubbo.common.constants.CommonConstants.*;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.COMMON_UNEXPECTED_EXCEPTION;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROXY_FAILED_EXPORT_SERVICE;
import static org.apache.dubbo.common.utils.StringUtils.replace;
import static org.apache.dubbo.metadata.report.support.Constants.*;

public abstract class AbstractMetadataReport implements MetadataReport {

    protected static final String DEFAULT_ROOT = "dubbo";

    protected static final int ONE_DAY_IN_MILLISECONDS = 24 * 60 * 60 * 1000;
    private static final int FOUR_HOURS_IN_MILLISECONDS = 4 * 60 * 60 * 1000;
    // Log output
    protected final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(getClass());

    // Local disk cache, where the special key value.registries records the list of metadata centers, and the others are
    // the list of notified service providers
    final Properties properties = new Properties();
    private final ExecutorService reportCacheExecutor =
            Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveMetadataReport", true));
    final Map<MetadataIdentifier, Object> allMetadataReports = new ConcurrentHashMap<>(4);

    private final AtomicLong lastCacheChanged = new AtomicLong();
    final Map<MetadataIdentifier, Object> failedReports = new ConcurrentHashMap<>(4);
    private URL reportURL;
    boolean syncReport;
    // Local disk cache file
    File file;
    private AtomicBoolean initialized = new AtomicBoolean(false);
    public MetadataReportRetry metadataReportRetry;
    private ScheduledExecutorService reportTimerScheduler;

    private final boolean reportMetadata;
    private final boolean reportDefinition;
    protected ApplicationModel applicationModel;

    public AbstractMetadataReport(URL reportServerURL) {
        setUrl(reportServerURL);
        applicationModel = reportServerURL.getOrDefaultApplicationModel();

        boolean localCacheEnabled = reportServerURL.getParameter(REGISTRY_LOCAL_FILE_CACHE_ENABLED, true);
        // Start file save timer
        String defaultFilename = System.getProperty("user.home") + DUBBO_METADATA
                + reportServerURL.getApplication()
                + "-" + replace(reportServerURL.getAddress(), ":", "-")
                + CACHE;
        String filename = reportServerURL.getParameter(FILE_KEY, defaultFilename);
        File file = null;
        if (localCacheEnabled && ConfigUtils.isNotEmpty(filename)) {
            file = new File(filename);
            if (!file.exists()
                    && file.getParentFile() != null
                    && !file.getParentFile().exists()) {
                if (!file.getParentFile().mkdirs()) {
                    throw new IllegalArgumentException("Invalid service store file " + file
                            + ", cause: Failed to create directory " + file.getParentFile() + "!");
                }
            }
            // if this file exists, firstly delete it.
            if (!initialized.getAndSet(true) && file.exists()) {
                file.delete();
            }
        }
        this.file = file;
        loadProperties();
        syncReport = reportServerURL.getParameter(SYNC_REPORT_KEY, false);
        metadataReportRetry = new MetadataReportRetry(
                reportServerURL.getParameter(RETRY_TIMES_KEY, DEFAULT_METADATA_REPORT_RETRY_TIMES),
                reportServerURL.getParameter(RETRY_PERIOD_KEY, DEFAULT_METADATA_REPORT_RETRY_PERIOD));
        // cycle report the data switch
        if (reportServerURL.getParameter(CYCLE_REPORT_KEY, DEFAULT_METADATA_REPORT_CYCLE_REPORT)) {
            reportTimerScheduler = Executors.newSingleThreadScheduledExecutor(
                    new NamedThreadFactory("DubboMetadataReportTimer", true));
            reportTimerScheduler.scheduleAtFixedRate(
                    this::publishAll, calculateStartTime(), ONE_DAY_IN_MILLISECONDS, TimeUnit.MILLISECONDS);
        }

        this.reportMetadata = reportServerURL.getParameter(REPORT_METADATA_KEY, false);
        this.reportDefinition = reportServerURL.getParameter(REPORT_DEFINITION_KEY, true);
    }

    public URL getUrl() {
        return reportURL;
    }

    protected void setUrl(URL url) {
        if (url == null) {
            throw new IllegalArgumentException("metadataReport url == null");
        }
        this.reportURL = url;
    }

    private void doSaveProperties(long version) {
        if (version < lastCacheChanged.get()) {
            return;
        }
        if (file == null) {
            return;
        }
        // Save
        try {
            File lockfile = new File(file.getAbsolutePath() + ".lock");
            if (!lockfile.exists()) {
                lockfile.createNewFile();
            }
            try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
                    FileChannel channel = raf.getChannel()) {
                FileLock lock = channel.tryLock();
                if (lock == null) {
                    throw new IOException(
                            "Can not lock the metadataReport cache file " + file.getAbsolutePath()
                                    + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.metadata.file=xxx.properties");
                }
                // Save
                try {
                    if (!file.exists()) {
                        file.createNewFile();
                    }

                    Properties tmpProperties;
                    if (!syncReport) {
                        // When syncReport = false, properties.setProperty and properties.store are called from the same
                        // thread(reportCacheExecutor), so deep copy is not required
                        tmpProperties = properties;
                    } else {
                        // Using store method and setProperty method of the this.properties will cause lock contention
                        // under multi-threading, so deep copy a new container
                        tmpProperties = new Properties();
                        Set<Map.Entry<Object, Object>> entries = properties.entrySet();
                        for (Map.Entry<Object, Object> entry : entries) {
                            tmpProperties.setProperty((String) entry.getKey(), (String) entry.getValue());
                        }
                    }

                    try (FileOutputStream outputFile = new FileOutputStream(file)) {
                        tmpProperties.store(outputFile, "Dubbo metadataReport Cache");
                    }
                } finally {
                    lock.release();
                }
            }
        } catch (Throwable e) {
            if (version < lastCacheChanged.get()) {
                return;
            } else {
                reportCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet()));
            }
            logger.warn(
                    COMMON_UNEXPECTED_EXCEPTION,
                    "",
                    "",
                    "Failed to save service store file, cause: " + e.getMessage(),
                    e);
        }
    }

    void loadProperties() {
        if (file != null && file.exists()) {
            try (InputStream in = new FileInputStream(file)) {
                properties.load(in);
                if (logger.isInfoEnabled()) {
                    logger.info("Load service store file " + file + ", data: " + properties);
                }
            } catch (Throwable e) {
                logger.warn(COMMON_UNEXPECTED_EXCEPTION, "", "", "Failed to load service store file " + file, e);
            }
        }
    }

    private void saveProperties(MetadataIdentifier metadataIdentifier, String value, boolean add, boolean sync) {
        if (file == null) {
            return;
        }

        try {
            if (add) {
                properties.setProperty(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), value);
            } else {
                properties.remove(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY));
            }
            long version = lastCacheChanged.incrementAndGet();
            if (sync) {
                new SaveProperties(version).run();
            } else {
                reportCacheExecutor.execute(new SaveProperties(version));
            }

        } catch (Throwable t) {
            logger.warn(COMMON_UNEXPECTED_EXCEPTION, "", "", t.getMessage(), t);
        }
    }
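
    // Note: `lastCacheChanged` is a monotonically increasing version stamp. Each
    // saveProperties() call enqueues a SaveProperties task carrying the version it
    // observed, and doSaveProperties() drops any task whose version is already
    // stale, so only the newest in-memory snapshot is written to the cache file.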

    @Override
    public String toString() {
        return getUrl().toString();
    }

    private class SaveProperties implements Runnable {
        private long version;

        private SaveProperties(long version) {
            this.version = version;
        }

        @Override
        public void run() {
            doSaveProperties(version);
        }
    }

    @Override
    public void storeProviderMetadata(
            MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition) {
        if (syncReport) {
            storeProviderMetadataTask(providerMetadataIdentifier, serviceDefinition);
        } else {
            reportCacheExecutor.execute(() -> storeProviderMetadataTask(providerMetadataIdentifier, serviceDefinition));
        }
    }

    private void storeProviderMetadataTask(
            MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition) {

        MetadataEvent metadataEvent = MetadataEvent.toServiceSubscribeEvent(
                applicationModel, providerMetadataIdentifier.getUniqueServiceName());
        MetricsEventBus.post(
                metadataEvent,
                () -> {
                    boolean result = true;
                    try {
                        if (logger.isInfoEnabled()) {
                            logger.info("store provider metadata. Identifier : " + providerMetadataIdentifier
                                    + "; definition: " + serviceDefinition);
                        }
                        allMetadataReports.put(providerMetadataIdentifier, serviceDefinition);
                        failedReports.remove(providerMetadataIdentifier);
                        String data = JsonUtils.toJson(serviceDefinition);
                        doStoreProviderMetadata(providerMetadataIdentifier, data);
                        saveProperties(providerMetadataIdentifier, data, true, !syncReport);
                    } catch (Exception e) {
                        // retry again. If failed again, throw exception.
                        failedReports.put(providerMetadataIdentifier, serviceDefinition);
                        metadataReportRetry.startRetryTask();
                        logger.error(
                                PROXY_FAILED_EXPORT_SERVICE,
                                "",
                                "",
                                "Failed to put provider metadata " + providerMetadataIdentifier + " in "
                                        + serviceDefinition + ", cause: " + e.getMessage(),
                                e);
                        result = false;
                    }
                    return result;
                },
                aBoolean -> aBoolean);
    }

    @Override
    public void storeConsumerMetadata(
            MetadataIdentifier consumerMetadataIdentifier, Map<String, String> serviceParameterMap) {
        if (syncReport) {
            storeConsumerMetadataTask(consumerMetadataIdentifier, serviceParameterMap);
        } else {
            reportCacheExecutor.execute(
                    () -> storeConsumerMetadataTask(consumerMetadataIdentifier, serviceParameterMap));
        }
    }

    protected void storeConsumerMetadataTask(
            MetadataIdentifier consumerMetadataIdentifier, Map<String, String> serviceParameterMap) {
        try {
            if (logger.isInfoEnabled()) {
                logger.info("store consumer metadata. Identifier : " + consumerMetadataIdentifier + "; definition: "
                        + serviceParameterMap);
            }
            allMetadataReports.put(consumerMetadataIdentifier, serviceParameterMap);
            failedReports.remove(consumerMetadataIdentifier);

            String data = JsonUtils.toJson(serviceParameterMap);
            doStoreConsumerMetadata(consumerMetadataIdentifier, data);
            saveProperties(consumerMetadataIdentifier, data, true, !syncReport);
        } catch (Exception e) {
            // retry again. If failed again, throw exception.
            failedReports.put(consumerMetadataIdentifier, serviceParameterMap);
            metadataReportRetry.startRetryTask();
            logger.error(
                    PROXY_FAILED_EXPORT_SERVICE,
                    "",
                    "",
                    "Failed to put consumer metadata " + consumerMetadataIdentifier + "; " + serviceParameterMap
                            + ", cause: " + e.getMessage(),
                    e);
        }
    }

    @Override
    public void destroy() {
        if (reportCacheExecutor != null) {
            reportCacheExecutor.shutdown();
        }
        if (reportTimerScheduler != null) {
            reportTimerScheduler.shutdown();
        }
        if (metadataReportRetry != null) {
            metadataReportRetry.destroy();
            metadataReportRetry = null;
        }
    }

    @Override
    public void saveServiceMetadata(ServiceMetadataIdentifier metadataIdentifier, URL url) {
        if (syncReport) {
            doSaveMetadata(metadataIdentifier, url);
        } else {
            reportCacheExecutor.execute(() -> doSaveMetadata(metadataIdentifier, url));
        }
    }

    @Override
    public void removeServiceMetadata(ServiceMetadataIdentifier metadataIdentifier) {
        if (syncReport) {
            doRemoveMetadata(metadataIdentifier);
        } else {
            reportCacheExecutor.execute(() -> doRemoveMetadata(metadataIdentifier));
        }
    }

    @Override
    public List<String> getExportedURLs(ServiceMetadataIdentifier metadataIdentifier) {
        // TODO, fallback to local cache
        return doGetExportedURLs(metadataIdentifier);
    }

    @Override
    public void saveSubscribedData(SubscriberMetadataIdentifier subscriberMetadataIdentifier, Set<String> urls) {
        if (syncReport) {
            doSaveSubscriberData(subscriberMetadataIdentifier, JsonUtils.toJson(urls));
        } else {
            reportCacheExecutor.execute(
                    () -> doSaveSubscriberData(subscriberMetadataIdentifier, JsonUtils.toJson(urls)));
        }
    }

    @Override
    public List<String> getSubscribedURLs(SubscriberMetadataIdentifier subscriberMetadataIdentifier) {
        String content = doGetSubscribedURLs(subscriberMetadataIdentifier);
        return JsonUtils.toJavaList(content, String.class);
    }

    String getProtocol(URL url) {
        String protocol = url.getSide();
        protocol = protocol == null ? url.getProtocol() : protocol;
        return protocol;
    }

    /**
     * @return if need to continue
     */
    public boolean retry() {
        return doHandleMetadataCollection(failedReports);
    }

    @Override
    public boolean shouldReportDefinition() {
        return reportDefinition;
    }

    @Override
    public boolean shouldReportMetadata() {
        return reportMetadata;
    }

    private boolean doHandleMetadataCollection(Map<MetadataIdentifier, Object> metadataMap) {
        if (metadataMap.isEmpty()) {
            return true;
        }
        Iterator<Map.Entry<MetadataIdentifier, Object>> iterable =
                metadataMap.entrySet().iterator();
        while (iterable.hasNext()) {
            Map.Entry<MetadataIdentifier, Object> item = iterable.next();
            if (PROVIDER_SIDE.equals(item.getKey().getSide())) {
                this.storeProviderMetadata(item.getKey(), (FullServiceDefinition) item.getValue());
            } else if (CONSUMER_SIDE.equals(item.getKey().getSide())) {
                this.storeConsumerMetadata(item.getKey(), (Map) item.getValue());
            }
        }
        return false;
    }

    /**
     * not private. just for unittest.
     */
    void publishAll() {
        logger.info("start to publish all metadata.");
        this.doHandleMetadataCollection(allMetadataReports);
    }

    /**
     * The first republish is scheduled at a random time between 2:00 am and 6:00 am of the next day.
     *
     * @return the delay in milliseconds until that start time
     */
    long calculateStartTime() {
        Calendar calendar = Calendar.getInstance();
        long nowMill = calendar.getTimeInMillis();
        calendar.set(Calendar.HOUR_OF_DAY, 0);
        calendar.set(Calendar.MINUTE, 0);
        calendar.set(Calendar.SECOND, 0);
        calendar.set(Calendar.MILLISECOND, 0);
        long subtract = calendar.getTimeInMillis() + ONE_DAY_IN_MILLISECONDS - nowMill;
        return subtract
                + (FOUR_HOURS_IN_MILLISECONDS / 2)
                + ThreadLocalRandom.current().nextInt(FOUR_HOURS_IN_MILLISECONDS);
    }
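
    // Worked example (illustrative): if the report starts at 18:00, `subtract` is
    // the 6 h remaining until midnight; adding FOUR_HOURS_IN_MILLISECONDS / 2 (2 h)
    // plus a random offset in [0, 4 h) schedules the first publishAll() between
    // 02:00 and 06:00 the next morning, spreading the republish load across nodes.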

    class MetadataReportRetry {
        protected final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(getClass());

        final ScheduledExecutorService retryExecutor =
                Executors.newScheduledThreadPool(0, new NamedThreadFactory("DubboMetadataReportRetryTimer", true));
        volatile ScheduledFuture retryScheduledFuture;
        final AtomicInteger retryCounter = new AtomicInteger(0);
        // retry task schedule period
        long retryPeriod;
        // if there is no failed report, cancel the retry task after this many runs.
        int retryTimesIfNonFail = 600;

        int retryLimit;

        public MetadataReportRetry(int retryTimes, int retryPeriod) {
            this.retryPeriod = retryPeriod;
            this.retryLimit = retryTimes;
        }

        void startRetryTask() {
            if (retryScheduledFuture == null) {
                synchronized (retryCounter) {
                    if (retryScheduledFuture == null) {
                        retryScheduledFuture = retryExecutor.scheduleWithFixedDelay(
                                () -> {
                                    // Check and connect to the metadata
                                    try {
                                        int times = retryCounter.incrementAndGet();
                                        logger.info("start to retry task for metadata report. retry times:" + times);
                                        if (retry() && times > retryTimesIfNonFail) {
                                            cancelRetryTask();
                                        }
                                        if (times > retryLimit) {
                                            cancelRetryTask();
                                        }
                                    } catch (Throwable t) { // Defensive fault tolerance
                                        logger.error(
                                                COMMON_UNEXPECTED_EXCEPTION,
                                                "",
                                                "",
                                                "Unexpected error occur at failed retry, cause: " + t.getMessage(),
                                                t);
                                    }
                                },
                                500,
                                retryPeriod,
                                TimeUnit.MILLISECONDS);
                    }
                }
            }
        }
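
        // Note: cancelRetryTask() also shuts down `retryExecutor`, so once the
        // retry loop stops (retry limit reached, or enough idle runs with nothing
        // left to retry), it cannot be restarted for this report instance; a later
        // failure finds `retryScheduledFuture` non-null and simply returns.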

        void cancelRetryTask() {
            if (retryScheduledFuture != null) {
                retryScheduledFuture.cancel(false);
            }
            retryExecutor.shutdown();
        }

        void destroy() {
            cancelRetryTask();
        }

        /**
         * @deprecated only for test
         */
        @Deprecated
        ScheduledExecutorService getRetryExecutor() {
            return retryExecutor;
        }
    }

    private void doSaveSubscriberData(SubscriberMetadataIdentifier subscriberMetadataIdentifier, List<String> urls) {
        if (CollectionUtils.isEmpty(urls)) {
            return;
        }
        List<String> encodedUrlList = new ArrayList<>(urls.size());
        for (String url : urls) {
            encodedUrlList.add(URL.encode(url));
        }
        // serialize before delegating to the String overload; passing the list
        // directly would resolve to this same method and recurse forever
        doSaveSubscriberData(subscriberMetadataIdentifier, JsonUtils.toJson(encodedUrlList));
    }

    protected abstract void doStoreProviderMetadata(
            MetadataIdentifier providerMetadataIdentifier, String serviceDefinitions);

    protected abstract void doStoreConsumerMetadata(
            MetadataIdentifier consumerMetadataIdentifier, String serviceParameterString);

    protected abstract void doSaveMetadata(ServiceMetadataIdentifier metadataIdentifier, URL url);

    protected abstract void doRemoveMetadata(ServiceMetadataIdentifier metadataIdentifier);

    protected abstract List<String> doGetExportedURLs(ServiceMetadataIdentifier metadataIdentifier);

    protected abstract void doSaveSubscriberData(
            SubscriberMetadataIdentifier subscriberMetadataIdentifier, String urlListStr);

    protected abstract String doGetSubscribedURLs(SubscriberMetadataIdentifier subscriberMetadataIdentifier);

    /**
     * @deprecated only for unit test
     */
    @Deprecated
    protected ExecutorService getReportCacheExecutor() {
        return reportCacheExecutor;
    }

    /**
     * @deprecated only for unit test
     */
    @Deprecated
    protected MetadataReportRetry getMetadataReportRetry() {
        return metadataReportRetry;
    }
}

@@ -0,0 +1,553 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dubbo.metadata.store.redis;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.config.configcenter.ConfigItem;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.common.utils.JsonUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.metadata.MappingChangedEvent;
import org.apache.dubbo.metadata.MappingListener;
import org.apache.dubbo.metadata.MetadataInfo;
import org.apache.dubbo.metadata.ServiceNameMapping;
import org.apache.dubbo.metadata.report.identifier.BaseMetadataIdentifier;
import org.apache.dubbo.metadata.report.identifier.KeyTypeEnum;
import org.apache.dubbo.metadata.report.identifier.MetadataIdentifier;
import org.apache.dubbo.metadata.report.identifier.ServiceMetadataIdentifier;
import org.apache.dubbo.metadata.report.identifier.SubscriberMetadataIdentifier;
import org.apache.dubbo.metadata.report.support.AbstractMetadataReport;
import org.apache.dubbo.rpc.RpcException;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.Transaction;
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.util.JedisClusterCRC16;

import static org.apache.dubbo.common.constants.CommonConstants.CLUSTER_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.CYCLE_REPORT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_CHAR_SEPARATOR;
import static org.apache.dubbo.common.constants.CommonConstants.QUEUES_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_RESPONSE;
import static org.apache.dubbo.metadata.MetadataConstants.META_DATA_STORE_TAG;
import static org.apache.dubbo.metadata.ServiceNameMapping.DEFAULT_MAPPING_GROUP;
import static org.apache.dubbo.metadata.ServiceNameMapping.getAppNames;
import static org.apache.dubbo.metadata.report.support.Constants.DEFAULT_METADATA_REPORT_CYCLE_REPORT;

/**
 * RedisMetadataReport
 */
public class RedisMetadataReport extends AbstractMetadataReport {

    private static final String REDIS_DATABASE_KEY = "database";
    private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(RedisMetadataReport.class);

    // protected, for test
    protected JedisPool pool;
    private Set<HostAndPort> jedisClusterNodes;
    private int timeout;
    private String password;
    private final String root;
    private final ConcurrentHashMap<String, MappingDataListener> mappingDataListenerMap = new ConcurrentHashMap<>();
    private SetParams jedisParams = SetParams.setParams();

    public RedisMetadataReport(URL url) {
        super(url);
        timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
        password = url.getPassword();
        this.root = url.getGroup(DEFAULT_ROOT);
        if (url.getParameter(CYCLE_REPORT_KEY, DEFAULT_METADATA_REPORT_CYCLE_REPORT)) {
            // ttl defaults to twice the cycle-report period; SetParams.ex() takes
            // seconds, so the millisecond constant is passed through px() instead
            jedisParams.px(ONE_DAY_IN_MILLISECONDS * 2L);
        }
        if (url.getParameter(CLUSTER_KEY, false)) {
            jedisClusterNodes = new HashSet<>();
            List<URL> urls = url.getBackupUrls();
            for (URL tmpUrl : urls) {
                jedisClusterNodes.add(new HostAndPort(tmpUrl.getHost(), tmpUrl.getPort()));
            }
        } else {
            int database = url.getParameter(REDIS_DATABASE_KEY, 0);
            pool = new JedisPool(new JedisPoolConfig(), url.getHost(), url.getPort(), timeout, password, database);
        }
    }
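
    // With cycle reporting enabled, every publishAll() pass re-SETs each metadata
    // key with `jedisParams`, which refreshes its TTL; keys therefore expire only
    // after their owner stops reporting, which is how this change keeps stale
    // metadata from accumulating in Redis.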

    @Override
    protected void doStoreProviderMetadata(MetadataIdentifier providerMetadataIdentifier, String serviceDefinitions) {
        this.storeMetadata(providerMetadataIdentifier, serviceDefinitions);
    }

    @Override
    protected void doStoreConsumerMetadata(MetadataIdentifier consumerMetadataIdentifier, String value) {
        this.storeMetadata(consumerMetadataIdentifier, value);
    }

    @Override
    protected void doSaveMetadata(ServiceMetadataIdentifier serviceMetadataIdentifier, URL url) {
        this.storeMetadata(serviceMetadataIdentifier, URL.encode(url.toFullString()));
    }

    @Override
    protected void doRemoveMetadata(ServiceMetadataIdentifier serviceMetadataIdentifier) {
        this.deleteMetadata(serviceMetadataIdentifier);
    }

    @Override
    protected List<String> doGetExportedURLs(ServiceMetadataIdentifier metadataIdentifier) {
        String content = getMetadata(metadataIdentifier);
        if (StringUtils.isEmpty(content)) {
            return Collections.emptyList();
        }
        return new ArrayList<>(Arrays.asList(URL.decode(content)));
    }

    @Override
    protected void doSaveSubscriberData(SubscriberMetadataIdentifier subscriberMetadataIdentifier, String urlListStr) {
        this.storeMetadata(subscriberMetadataIdentifier, urlListStr);
    }

    @Override
    protected String doGetSubscribedURLs(SubscriberMetadataIdentifier subscriberMetadataIdentifier) {
        return this.getMetadata(subscriberMetadataIdentifier);
    }

    @Override
    public String getServiceDefinition(MetadataIdentifier metadataIdentifier) {
        return this.getMetadata(metadataIdentifier);
    }

    private void storeMetadata(BaseMetadataIdentifier metadataIdentifier, String v) {
        if (pool != null) {
            storeMetadataStandalone(metadataIdentifier, v);
        } else {
            storeMetadataInCluster(metadataIdentifier, v);
        }
    }

    private void storeMetadataInCluster(BaseMetadataIdentifier metadataIdentifier, String v) {
        try (JedisCluster jedisCluster =
                new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) {
            jedisCluster.set(metadataIdentifier.getIdentifierKey() + META_DATA_STORE_TAG, v, jedisParams);
        } catch (Throwable e) {
            String msg =
                    "Failed to put " + metadataIdentifier + " to redis cluster " + v + ", cause: " + e.getMessage();
            logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
            throw new RpcException(msg, e);
        }
    }

    private void storeMetadataStandalone(BaseMetadataIdentifier metadataIdentifier, String v) {
        try (Jedis jedis = pool.getResource()) {
            jedis.set(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), v, jedisParams);
        } catch (Throwable e) {
            String msg = "Failed to put " + metadataIdentifier + " to redis " + v + ", cause: " + e.getMessage();
            logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
            throw new RpcException(msg, e);
        }
    }

    private void deleteMetadata(BaseMetadataIdentifier metadataIdentifier) {
        if (pool != null) {
            deleteMetadataStandalone(metadataIdentifier);
        } else {
            deleteMetadataInCluster(metadataIdentifier);
        }
    }

    private void deleteMetadataInCluster(BaseMetadataIdentifier metadataIdentifier) {
        try (JedisCluster jedisCluster =
                new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) {
            jedisCluster.del(metadataIdentifier.getIdentifierKey() + META_DATA_STORE_TAG);
        } catch (Throwable e) {
            String msg = "Failed to delete " + metadataIdentifier + " from redis cluster, cause: " + e.getMessage();
            logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
            throw new RpcException(msg, e);
        }
    }

    private void deleteMetadataStandalone(BaseMetadataIdentifier metadataIdentifier) {
        try (Jedis jedis = pool.getResource()) {
            jedis.del(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY));
        } catch (Throwable e) {
            String msg = "Failed to delete " + metadataIdentifier + " from redis, cause: " + e.getMessage();
            logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
            throw new RpcException(msg, e);
        }
    }

    private String getMetadata(BaseMetadataIdentifier metadataIdentifier) {
        if (pool != null) {
            return getMetadataStandalone(metadataIdentifier);
        } else {
            return getMetadataInCluster(metadataIdentifier);
        }
    }

    private String getMetadataInCluster(BaseMetadataIdentifier metadataIdentifier) {
        try (JedisCluster jedisCluster =
                new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) {
            return jedisCluster.get(metadataIdentifier.getIdentifierKey() + META_DATA_STORE_TAG);
        } catch (Throwable e) {
            String msg = "Failed to get " + metadataIdentifier + " from redis cluster, cause: " + e.getMessage();
            logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
            throw new RpcException(msg, e);
        }
    }

    private String getMetadataStandalone(BaseMetadataIdentifier metadataIdentifier) {
        try (Jedis jedis = pool.getResource()) {
            return jedis.get(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY));
        } catch (Throwable e) {
            String msg = "Failed to get " + metadataIdentifier + " from redis, cause: " + e.getMessage();
            logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
            throw new RpcException(msg, e);
        }
    }

    /**
     * Store class and application names using Redis hashes
     * key: default 'dubbo:mapping'
     * field: class (serviceInterface)
     * value: application_names
     * @param serviceInterface field(class)
     * @param defaultMappingGroup {@link ServiceNameMapping#DEFAULT_MAPPING_GROUP}
     * @param newConfigContent new application_names
     * @param ticket previous application_names
     * @return true if the mapping was written successfully
     */
    @Override
    public boolean registerServiceAppMapping(
            String serviceInterface, String defaultMappingGroup, String newConfigContent, Object ticket) {
        try {
            if (null != ticket && !(ticket instanceof String)) {
                throw new IllegalArgumentException("redis publishConfigCas requires String type ticket");
            }
            String pathKey = buildMappingKey(defaultMappingGroup);

            return storeMapping(pathKey, serviceInterface, newConfigContent, (String) ticket);
        } catch (Exception e) {
            logger.warn(TRANSPORT_FAILED_RESPONSE, "", "", "redis publishConfigCas failed.", e);
            return false;
        }
    }
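
    // Illustrative layout (values assumed from buildMappingKey/buildPubSubKey
    // below, taking GROUP_CHAR_SEPARATOR as ":" and QUEUES_KEY as "queues"): with
    // root "dubbo" and the default group "mapping", the mapping hash is keyed
    // "dubbo:mapping", e.g.
    //   HGET dubbo:mapping org.example.DemoService  ->  "app1,app2"
    // and change notifications are published on the "dubbo:mapping:queues" channel.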

    private boolean storeMapping(String key, String field, String value, String ticket) {
        if (pool != null) {
            return storeMappingStandalone(key, field, value, ticket);
        } else {
            return storeMappingInCluster(key, field, value, ticket);
        }
    }

    /**
     * use 'watch' to implement cas.
     * Find information about slot distribution by key.
     */
    private boolean storeMappingInCluster(String key, String field, String value, String ticket) {
        try (JedisCluster jedisCluster =
                        new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>());
                // bind a Jedis to the node owning this key's slot, and close it even on failure
                Jedis jedis = new Jedis(jedisCluster.getConnectionFromSlot(JedisClusterCRC16.getSlot(key)))) {
            jedis.watch(key);
            String oldValue = jedis.hget(key, field);
            if (null == oldValue || null == ticket || oldValue.equals(ticket)) {
                Transaction transaction = jedis.multi();
                transaction.hset(key, field, value);
                List<Object> result = transaction.exec();
                if (null != result) {
                    jedisCluster.publish(buildPubSubKey(), field);
                    return true;
                }
            } else {
                jedis.unwatch();
            }
        } catch (Throwable e) {
            String msg = "Failed to put " + key + ":" + field + " to redis " + value + ", cause: " + e.getMessage();
            logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
            throw new RpcException(msg, e);
        }
        return false;
    }

    /**
     * use 'watch' to implement cas.
     */
    private boolean storeMappingStandalone(String key, String field, String value, String ticket) {
        try (Jedis jedis = pool.getResource()) {
            jedis.watch(key);
            String oldValue = jedis.hget(key, field);
            if (null == oldValue || null == ticket || oldValue.equals(ticket)) {
                Transaction transaction = jedis.multi();
                transaction.hset(key, field, value);
                List<Object> result = transaction.exec();
                if (null != result) {
                    jedis.publish(buildPubSubKey(), field);
                    return true;
                }
            }
            jedis.unwatch();
        } catch (Throwable e) {
            String msg = "Failed to put " + key + ":" + field + " to redis " + value + ", cause: " + e.getMessage();
            logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
            throw new RpcException(msg, e);
        }
        return false;
    }
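
    // CAS mechanics: WATCH makes the MULTI/EXEC block conditional. If another
    // client writes `key` between WATCH and EXEC, exec() returns null, the hset
    // is discarded, and this method returns false so the caller can re-read the
    // current value (the "ticket") and retry.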

    /**
     * build mapping key
     * @param defaultMappingGroup {@link ServiceNameMapping#DEFAULT_MAPPING_GROUP}
     * @return the hash key under which service-to-application mappings are stored
     */
    private String buildMappingKey(String defaultMappingGroup) {
        return this.root + GROUP_CHAR_SEPARATOR + defaultMappingGroup;
    }

    /**
     * build pub/sub key
     */
    private String buildPubSubKey() {
        return buildMappingKey(DEFAULT_MAPPING_GROUP) + GROUP_CHAR_SEPARATOR + QUEUES_KEY;
    }

    /**
     * Get the current content and return it as both the value and the cas ticket.
     * @param serviceKey class
     * @param group {@link ServiceNameMapping#DEFAULT_MAPPING_GROUP}
     */
    @Override
    public ConfigItem getConfigItem(String serviceKey, String group) {
        String key = buildMappingKey(group);
        String content = getMappingData(key, serviceKey);

        return new ConfigItem(content, content);
    }

    /**
     * get current application_names
     */
    private String getMappingData(String key, String field) {
        if (pool != null) {
            return getMappingDataStandalone(key, field);
        } else {
            return getMappingDataInCluster(key, field);
        }
    }

    private String getMappingDataInCluster(String key, String field) {
        try (JedisCluster jedisCluster =
                new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) {
            return jedisCluster.hget(key, field);
        } catch (Throwable e) {
            String msg = "Failed to get " + key + ":" + field + " from redis cluster, cause: " + e.getMessage();
            logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
            throw new RpcException(msg, e);
        }
    }

    private String getMappingDataStandalone(String key, String field) {
        try (Jedis jedis = pool.getResource()) {
            return jedis.hget(key, field);
        } catch (Throwable e) {
            String msg = "Failed to get " + key + ":" + field + " from redis, cause: " + e.getMessage();
            logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
            throw new RpcException(msg, e);
        }
    }

    /**
     * Remove the listener. When no listeners remain, the subscription thread is shut down.
     */
    @Override
    public void removeServiceAppMappingListener(String serviceKey, MappingListener listener) {
        MappingDataListener mappingDataListener = mappingDataListenerMap.get(buildPubSubKey());
        if (null != mappingDataListener) {
            NotifySub notifySub = mappingDataListener.getNotifySub();
            notifySub.removeListener(serviceKey, listener);
            if (notifySub.isEmpty()) {
                mappingDataListener.shutdown();
            }
        }
    }

    /**
     * Start a thread and subscribe to {@link #buildPubSubKey()}.
     * Notify {@link MappingListener} if there is a change in the 'application_names' message.
     */
    @Override
    public Set<String> getServiceAppMapping(String serviceKey, MappingListener listener, URL url) {
        MappingDataListener mappingDataListener =
                ConcurrentHashMapUtils.computeIfAbsent(mappingDataListenerMap, buildPubSubKey(), k -> {
                    MappingDataListener dataListener = new MappingDataListener(buildPubSubKey());
                    dataListener.start();
                    return dataListener;
                });
        mappingDataListener.getNotifySub().addListener(serviceKey, listener);
        return this.getServiceAppMapping(serviceKey, url);
    }

    @Override
    public Set<String> getServiceAppMapping(String serviceKey, URL url) {
        String key = buildMappingKey(DEFAULT_MAPPING_GROUP);
        return getAppNames(getMappingData(key, serviceKey));
    }

    @Override
    public MetadataInfo getAppMetadata(SubscriberMetadataIdentifier identifier, Map<String, String> instanceMetadata) {
        String content = this.getMetadata(identifier);
        return JsonUtils.toJavaObject(content, MetadataInfo.class);
    }

    @Override
    public void publishAppMetadata(SubscriberMetadataIdentifier identifier, MetadataInfo metadataInfo) {
        this.storeMetadata(identifier, metadataInfo.getContent());
    }

    @Override
    public void unPublishAppMetadata(SubscriberMetadataIdentifier identifier, MetadataInfo metadataInfo) {
        this.deleteMetadata(identifier);
    }

    // for test
    public MappingDataListener getMappingDataListener() {
        return mappingDataListenerMap.get(buildPubSubKey());
    }

    /**
     * Listen for changes in the 'application_names' message and notify the listener.
     */
    class NotifySub extends JedisPubSub {

        private final Map<String, Set<MappingListener>> listeners = new ConcurrentHashMap<>();

        public void addListener(String key, MappingListener listener) {
            Set<MappingListener> listenerSet = listeners.computeIfAbsent(key, k -> new ConcurrentHashSet<>());
            listenerSet.add(listener);
        }

        public void removeListener(String serviceKey, MappingListener listener) {
            Set<MappingListener> listenerSet = this.listeners.get(serviceKey);
            if (listenerSet != null) {
                listenerSet.remove(listener);
                if (listenerSet.isEmpty()) {
                    this.listeners.remove(serviceKey);
                }
            }
        }

        public boolean isEmpty() {
            return this.listeners.isEmpty();
        }

        @Override
        public void onMessage(String key, String msg) {
            logger.info("sub from redis:" + key + " message:" + msg);
            String applicationNames = getMappingData(buildMappingKey(DEFAULT_MAPPING_GROUP), msg);
            MappingChangedEvent mappingChangedEvent = new MappingChangedEvent(msg, getAppNames(applicationNames));
            // guard against messages for services nobody listens to (avoids an NPE)
            Set<MappingListener> listenerSet = listeners.get(msg);
            if (listenerSet != null && !listenerSet.isEmpty()) {
                for (MappingListener mappingListener : listenerSet) {
                    mappingListener.onEvent(mappingChangedEvent);
                }
            }
        }

        @Override
        public void onPMessage(String pattern, String key, String msg) {
            onMessage(key, msg);
        }

        @Override
        public void onPSubscribe(String pattern, int subscribedChannels) {
            super.onPSubscribe(pattern, subscribedChannels);
        }
    }

    /**
     * Subscribe application names change message.
     */
    class MappingDataListener extends Thread {

        private String path;

        private final NotifySub notifySub = new NotifySub();
        // for test
        protected volatile boolean running = true;

        public MappingDataListener(String path) {
            this.path = path;
        }

        public NotifySub getNotifySub() {
            return notifySub;
        }

        @Override
        public void run() {
            while (running) {
                if (pool != null) {
                    try (Jedis jedis = pool.getResource()) {
                        jedis.subscribe(notifySub, path);
                    } catch (Throwable e) {
                        String msg = "Failed to subscribe " + path + ", cause: " + e.getMessage();
                        logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
                        throw new RpcException(msg, e);
                    }
                } else {
                    try (JedisCluster jedisCluster = new JedisCluster(
                            jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) {
                        jedisCluster.subscribe(notifySub, path);
                    } catch (Throwable e) {
                        String msg = "Failed to subscribe " + path + ", cause: " + e.getMessage();
                        logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
                        throw new RpcException(msg, e);
                    }
                }
            }
        }

        public void shutdown() {
            try {
                running = false;
                notifySub.unsubscribe(path);
            } catch (Throwable e) {
                String msg = "Failed to unsubscribe " + path + ", cause: " + e.getMessage();
                logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
            }
        }
    }
}