Skip to content

solved #66, support default degrade rule. #2232

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package com.alibaba.csp.sentinel.slots.block.degrade;

import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.property.DynamicSentinelProperty;
import com.alibaba.csp.sentinel.property.PropertyListener;
import com.alibaba.csp.sentinel.property.SentinelProperty;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreaker;
import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.ExceptionCircuitBreaker;
import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.ResponseTimeCircuitBreaker;
import com.alibaba.csp.sentinel.util.StringUtil;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
* @author wuwen
*/
public class DefaultDegradeRuleManager {

public static final String DEFAULT_KEY = "*";

private static volatile Map<String, List<CircuitBreaker>> circuitBreakers = new ConcurrentHashMap<>();
private static volatile Set<DegradeRule> rules = new HashSet<>();

private static final DefaultDegradeRuleManager.RulePropertyListener LISTENER = new DefaultDegradeRuleManager.RulePropertyListener();
private static SentinelProperty<List<DegradeRule>> currentProperty
= new DynamicSentinelProperty<>();

static {
currentProperty.addListener(LISTENER);
}


static List<CircuitBreaker> getDefaultCircuitBreakers(String resourceName) {
List<CircuitBreaker> circuitBreakers = DefaultDegradeRuleManager.circuitBreakers.get(resourceName);
if (circuitBreakers == null && !rules.isEmpty()) {
return DefaultDegradeRuleManager.circuitBreakers.computeIfAbsent(resourceName,
r -> rules.stream().map(DefaultDegradeRuleManager::newCircuitBreakerFrom).collect(Collectors.toList()));
}
return circuitBreakers;
}

/**
* Load {@link DegradeRule}s, former rules will be replaced.
*
* @param rules new rules to load.
*/
public static void loadRules(List<DegradeRule> rules) {
try {
currentProperty.updateValue(rules);
} catch (Throwable e) {
RecordLog.error("[DefaultDegradeRuleManager] Unexpected error when loading degrade rules", e);
}
}

public static boolean isValidDefaultRule(DegradeRule rule) {
if (!DegradeRuleManager.isValidRule(rule)) {
return false;
}

return rule.getResource().equals(DEFAULT_KEY);
}

/**
* Create a circuit breaker instance from provided circuit breaking rule.
*
* @param rule a valid circuit breaking rule
* @return new circuit breaker based on provided rule; null if rule is invalid or unsupported type
*/
private static CircuitBreaker newCircuitBreakerFrom(/*@Valid*/ DegradeRule rule) {
switch (rule.getGrade()) {
case RuleConstant.DEGRADE_GRADE_RT:
return new ResponseTimeCircuitBreaker(rule);
case RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO:
case RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT:
return new ExceptionCircuitBreaker(rule);
default:
return null;
}
}


private static class RulePropertyListener implements PropertyListener<List<DegradeRule>> {

private synchronized void reloadFrom(List<DegradeRule> list) {

if (list == null) {
return;
}

Set<DegradeRule> rules = new HashSet<>();
List<CircuitBreaker> cbs = new ArrayList<>();

for (DegradeRule rule : list) {
if (!isValidDefaultRule(rule)) {
RecordLog.warn("[DefaultDegradeRuleManager] Ignoring invalid rule when loading new rules: {}", rule);
} else {

if (StringUtil.isBlank(rule.getLimitApp())) {
rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
}
CircuitBreaker cb = newCircuitBreakerFrom(rule);
cbs.add(cb);
rules.add(rule);
}
}

Map<String, List<CircuitBreaker>> cbMap = new ConcurrentHashMap<>(8);

DefaultDegradeRuleManager.circuitBreakers.forEach((k, v) -> cbMap.put(k, cbs));

DefaultDegradeRuleManager.rules = rules;
DefaultDegradeRuleManager.circuitBreakers = cbMap;
}

@Override
public void configUpdate(List<DegradeRule> conf) {
reloadFrom(conf);
RecordLog.info("[DefaultDegradeRuleManager] Degrade rules has been updated to: {}", rules);
}

@Override
public void configLoad(List<DegradeRule> conf) {
reloadFrom(conf);
RecordLog.info("[DefaultDegradeRuleManager] Degrade rules loaded: {}", rules);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.alibaba.csp.sentinel.slots.block.degrade;

import com.alibaba.csp.sentinel.Constants;
import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.node.DefaultNode;
import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreaker;
import com.alibaba.csp.sentinel.spi.Spi;

import java.util.List;

/**
* @author wuwen
*/
@Spi(order = Constants.ORDER_DEGRADE_SLOT + 100)
public class DefaultDegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
performChecking(context, resourceWrapper);

fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

private void performChecking(Context context, ResourceWrapper r) throws BlockException {

if (DegradeRuleManager.hasConfig(r.getName())) {
return;
}

List<CircuitBreaker> circuitBreakers = DefaultDegradeRuleManager.getDefaultCircuitBreakers(r.getName());

if (circuitBreakers == null || circuitBreakers.isEmpty()) {
return;
}

for (CircuitBreaker cb : circuitBreakers) {
if (!cb.tryPass(context)) {
throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
}
}
}

@Override
public void exit(Context context, ResourceWrapper r, int count, Object... args) {
Entry curEntry = context.getCurEntry();
if (curEntry.getBlockError() != null) {
fireExit(context, r, count, args);
return;
}

if (DegradeRuleManager.hasConfig(r.getName())) {
fireExit(context, r, count, args);
return;
}

List<CircuitBreaker> circuitBreakers = DefaultDegradeRuleManager.getDefaultCircuitBreakers(r.getName());

if (circuitBreakers == null || circuitBreakers.isEmpty()) {
return;
}

if (curEntry.getBlockError() == null) {
// passed request
for (CircuitBreaker circuitBreaker : circuitBreakers) {
circuitBreaker.onRequestComplete(context);
}
}

fireExit(context, r, count, args);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
com.alibaba.csp.sentinel.slots.system.SystemSlot
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
com.alibaba.csp.sentinel.slots.block.degrade.DefaultDegradeSlot
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot;
import com.alibaba.csp.sentinel.slotchain.ProcessorSlotChain;
import com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot;
import com.alibaba.csp.sentinel.slots.block.degrade.DefaultDegradeSlot;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot;
import com.alibaba.csp.sentinel.slots.block.flow.FlowSlot;
import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot;
Expand Down Expand Up @@ -70,6 +71,9 @@ public void testBuild() {
next = next.getNext();
assertTrue(next instanceof DegradeSlot);

next = next.getNext();
assertTrue(next instanceof DefaultDegradeSlot);

next = next.getNext();
assertNull(next);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,17 @@ public class CircuitBreakingIntegrationTest extends AbstractTimeBasedTest {

@Before
public void setUp() {
DegradeRuleManager.loadRules(new ArrayList<DegradeRule>());
DegradeRuleManager.loadRules(new ArrayList<>());
}

@After
public void tearDown() throws Exception {
DegradeRuleManager.loadRules(new ArrayList<DegradeRule>());
public void tearDown() {
DegradeRuleManager.loadRules(new ArrayList<>());
DefaultDegradeRuleManager.loadRules(new ArrayList<>());
}

@Test
public void testSlowRequestMode() throws Exception {
public void testSlowRequestMode() {
CircuitBreakerStateChangeObserver observer = mock(CircuitBreakerStateChangeObserver.class);
setCurrentMillis(System.currentTimeMillis() / 1000 * 1000);
int retryTimeoutSec = 5;
Expand Down Expand Up @@ -115,7 +116,69 @@ public void testSlowRequestMode() throws Exception {
}

@Test
public void testExceptionRatioMode() throws Exception {
public void testSlowRequestModeUseDefaultRule() {
CircuitBreakerStateChangeObserver observer = mock(CircuitBreakerStateChangeObserver.class);
setCurrentMillis(System.currentTimeMillis() / 1000 * 1000);
int retryTimeoutSec = 5;
int maxRt = 50;
int statIntervalMs = 20000;
int minRequestAmount = 10;
String res = "CircuitBreakingIntegrationTest_testSlowRequestModeUseDefaultRule";
EventObserverRegistry.getInstance().addStateChangeObserver(res, observer);

DefaultDegradeRuleManager.loadRules(Arrays.asList(
new DegradeRule(DefaultDegradeRuleManager.DEFAULT_KEY).setTimeWindow(retryTimeoutSec).setCount(maxRt)
.setStatIntervalMs(statIntervalMs).setMinRequestAmount(minRequestAmount)
.setSlowRatioThreshold(0.8d).setGrade(0)));

// Try first N requests where N = minRequestAmount.
for (int i = 0; i < minRequestAmount; i++) {
if (i < 7) {
assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20)));
} else {
assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(-20, -10)));
}
}

// Till now slow ratio should be 70%.
assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20)));
assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20)));
assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20)));
assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20)));
assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20)));
assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20)));
// Circuit breaker has transformed to OPEN since here.
verify(observer)
.onStateChange(eq(State.CLOSED), eq(State.OPEN), any(DegradeRule.class), anyDouble());
assertEquals(State.OPEN, DefaultDegradeRuleManager.getDefaultCircuitBreakers(res).get(0).currentState());
assertFalse(entryAndSleepFor(res, 1));

sleepSecond(1);
assertFalse(entryAndSleepFor(res, 1));
sleepSecond(retryTimeoutSec);
// Test HALF-OPEN to OPEN.
assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20)));

verify(observer)
.onStateChange(eq(State.OPEN), eq(State.HALF_OPEN), any(DegradeRule.class), nullable(Double.class));
verify(observer)
.onStateChange(eq(State.HALF_OPEN), eq(State.OPEN), any(DegradeRule.class), anyDouble());
// Wait for next retry timeout;
reset(observer);
sleepSecond(retryTimeoutSec + 1);
assertTrue(entryAndSleepFor(res, maxRt - ThreadLocalRandom.current().nextInt(10, 20)));
verify(observer)
.onStateChange(eq(State.OPEN), eq(State.HALF_OPEN), any(DegradeRule.class), nullable(Double.class));
verify(observer)
.onStateChange(eq(State.HALF_OPEN), eq(State.CLOSED), any(DegradeRule.class), nullable(Double.class));
// Now circuit breaker has been closed.
assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20)));

EventObserverRegistry.getInstance().removeStateChangeObserver(res);
}

@Test
public void testExceptionRatioMode() {
CircuitBreakerStateChangeObserver observer = mock(CircuitBreakerStateChangeObserver.class);
setCurrentMillis(System.currentTimeMillis() / 1000 * 1000);
int retryTimeoutSec = 5;
Expand Down Expand Up @@ -169,7 +232,7 @@ public void testExceptionRatioMode() throws Exception {
}

@Test
public void testExceptionCountMode() throws Throwable {
public void testExceptionCountMode() {
// TODO
}

Expand All @@ -188,7 +251,7 @@ private void verifyState(List<CircuitBreaker> breakers, int target) {
}

@Test
public void testMultipleHalfOpenedBreakers() throws Exception {
public void testMultipleHalfOpenedBreakers() {
CircuitBreakerStateChangeObserver observer = mock(CircuitBreakerStateChangeObserver.class);
setCurrentMillis(System.currentTimeMillis() / 1000 * 1000);
int retryTimeoutSec = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.alibaba.csp.sentinel.slotchain.SlotChainBuilder;
import com.alibaba.csp.sentinel.slots.DefaultSlotChainBuilder;
import com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot;
import com.alibaba.csp.sentinel.slots.block.degrade.DefaultDegradeSlot;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot;
import com.alibaba.csp.sentinel.slots.block.flow.FlowSlot;
import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot;
Expand Down Expand Up @@ -98,13 +99,14 @@ public void testLoadInstanceList() {
prototypeSlotClasses.add(NodeSelectorSlot.class);
prototypeSlotClasses.add(ClusterBuilderSlot.class);

List<Class<? extends ProcessorSlot>> singletonSlotClasses = new ArrayList<>(6);
List<Class<? extends ProcessorSlot>> singletonSlotClasses = new ArrayList<>(7);
singletonSlotClasses.add(LogSlot.class);
singletonSlotClasses.add(StatisticSlot.class);
singletonSlotClasses.add(AuthoritySlot.class);
singletonSlotClasses.add(SystemSlot.class);
singletonSlotClasses.add(FlowSlot.class);
singletonSlotClasses.add(DegradeSlot.class);
singletonSlotClasses.add(DefaultDegradeSlot.class);

for (int i = 0; i < slots1.size(); i++) {
ProcessorSlot slot1 = slots1.get(i);
Expand Down Expand Up @@ -148,7 +150,7 @@ public void testLoadInstanceListSorted() {
assertNotNull(sortedSlots);

// Total 8 default slot in sentinel-core
assertEquals(8, sortedSlots.size());
assertEquals(9, sortedSlots.size());

// Verify the order of slot
int index = 0;
Expand All @@ -160,6 +162,7 @@ public void testLoadInstanceListSorted() {
assertTrue(sortedSlots.get(index++) instanceof SystemSlot);
assertTrue(sortedSlots.get(index++) instanceof FlowSlot);
assertTrue(sortedSlots.get(index++) instanceof DegradeSlot);
assertTrue(sortedSlots.get(index++) instanceof DefaultDegradeSlot);
}

@Test
Expand All @@ -177,7 +180,7 @@ public void testLoadLowestPriorityInstance() {
assertNotNull(slot);

// NodeSelectorSlot is lowest order priority with @Spi(order = -1000) among all slots
assertTrue(slot instanceof DegradeSlot);
assertTrue(slot instanceof DefaultDegradeSlot);
}

@Test
Expand Down