简单实现CircuitBreaker
CircuitBreaker java 简单实现
纸上得来终觉浅,Demo一下
CircuitBreaker.java 主体,维护resetMillis openCount lastFailureTime等
public AtomicLong openCount = new AtomicLong(0L);//熔断计数
public AtomicLong lastFailure = new AtomicLong(0L);//最后失败时间
public AtomicLong resetMillis = new AtomicLong(5 * 1000L);//重置窗口时间range
public enum BreakerState{
OPEN,
HALF_CLOSED,
CLOSED
}
public volatile BreakerState state = BreakerState.CLOSED;//默认关闭状态
public boolean isHardTrip=false;//是否强制熔断
public boolean bypass = false;//是否不使用熔断
public boolean isAttemptLive = false;//是否尝试活着
private Throwable tripException = null;//熔断异常
private FailureInterpreter failureInterpreter = new DefaultFailureInterpreter();//错误中断处理
public CircuitBreakerExceptionMapper<? extends Exception> exceptionMapper;
Runnable执行,通过handleFailure方法处理异常,是否启动熔断等
public void invoke(Runnable r) throws Exception {
if(!bypass){
if(!isAllowRequest()){
throw mapException(new CircuitBreakerException());
}
try{
isAttemptLive = true;
r.run();
close();
}catch (Throwable throwable){
handleFailure(throwable);//处理异常,是否启动熔断等
}
throw new IllegalStateException("can not goto here too.");
}else {
r.run();
}
}
public void handleFailure(Throwable throwable) throws Exception{
if (failureInterpreter == null||failureInterpreter.isTrip(throwable)) {
this.tripException = throwable;
trip();
}else if(isAttemptLive){
close();
}
if (throwable instanceof Exception) {
throw (Exception)throwable;
} else if (throwable instanceof Error) {
throw (Error)throwable;
} else {
throw (RuntimeException)throwable;
}
}
/**
* 打开熔断器
*/
public void trip(){
if(state!=BreakerState.OPEN){
openCount.getAndIncrement();
}
state = BreakerState.OPEN;
isAttemptLive = false;
lastFailure.set(System.currentTimeMillis());
}
如何判断是否可以打开熔断器呢?处理逻辑在FailureInterpreter
通过一个原子计数器counter 计算失败数量
public class DefaultFailureInterpreter implements FailureInterpreter {
private Set<Class<? extends Throwable>> exceptions = new HashSet<Class<? extends Throwable>>();
private int limit=0;//简单实现,基于errors 数量
private long windowMills=0;//简单实现,circuitBreaker open持续时间
private EventCounter counter;
public DefaultFailureInterpreter() {
}
public DefaultFailureInterpreter(int limit, long windowMills) {
this.limit = limit;
this.windowMills = windowMills;
initCounter();
}
public void initCounter(){
if(limit>0&&windowMills>0){
int capacity = limit + 1;
if (counter == null) {
counter = new EventCounter(capacity,windowMills);
}else{
counter.setCapacity(capacity);
counter.setWindowMillis(windowMills);
}
}
}
public Set<Class<? extends Throwable>> getExceptions() {
return exceptions;
}
public void setExceptions(Set<Class<? extends Throwable>> exceptions) {
this.exceptions = exceptions;
}
public int getLimit() {
return limit;
}
public void setLimit(int limit) {
this.limit = limit;
initCounter();
}
public long getWindowMills() {
return windowMills;
}
public void setWindowMills(long windowMills) {
this.windowMills = windowMills;
initCounter();
}
public EventCounter getCounter() {
return counter;
}
public void setCounter(EventCounter counter) {
this.counter = counter;
}
@Override
public boolean isTrip(Throwable cause) {
for(Class<?> clazz : exceptions) {
if (clazz.isInstance(cause)) {
return false;
}
}
if(this.limit > 0 && this.windowMills > 0){
counter.mark();
return counter.count() > limit;//开启熔断
}
return true;//有error 无配置limit 即默认开启
}
}
EventCounter 实现
LinkedList queue 简单模拟滑动时间窗口 例如 windowMillis = 5s count返回 queue size,即有多少次错误发生
public class EventCounter {
private long windowMillis;//circuitBreaker open持续时间 毫秒
private int capacity;
private LinkedList<Long> queue = new LinkedList();
public EventCounter(int capacity, long windowMillis) {
this.windowMillis = windowMillis;
this.capacity = capacity;
}
public void mark() {
long currentTimeMillis = System.currentTimeMillis();
synchronized (queue) {
if (queue.size() == capacity) {
queue.removeFirst();
}
queue.addLast(currentTimeMillis);
}
}
public int count(){
long currentTimeMillis = System.currentTimeMillis();
long removeTimesBeforeMillis = currentTimeMillis - windowMillis;
synchronized (queue){
while(!queue.isEmpty()&&queue.peek()<removeTimesBeforeMillis){
queue.removeFirst();
}
return queue.size();
}
}
public long getWindowMillis() {
return windowMillis;
}
public void setWindowMillis(long windowMillis) {
synchronized (queue) {
this.windowMillis = windowMillis;
}
}
public int getCapacity() {
return capacity;
}
public void setCapacity(int capacity) {
if (capacity <=0) {
return;
}
synchronized (queue){
if (capacity <this.capacity) {
while (capacity < queue.size()) {
queue.removeFirst();
}
}
}
this.capacity = capacity;
}
public LinkedList<Long> getQueue() {
return queue;
}
public void setQueue(LinkedList<Long> queue) {
this.queue = queue;
}
}
CircuitBreaker的Half-Closed如何实现呢?
public boolean isAllowRequest(){
if (isHardTrip) {
return false;
}
if(state==BreakerState.CLOSED){
return true;
}
System.out.println("lastFailure "+lastFailure.get());
System.out.println("resetMillis "+resetMillis.get());
if(state==BreakerState.OPEN&&System.currentTimeMillis()-lastFailure.get()>=resetMillis.get()){
System.out.println("this");
state = BreakerState.HALF_CLOSED;
return true;
}
return canAttempt();
}
private synchronized boolean canAttempt(){
if(!(BreakerState.HALF_CLOSED==state)||isAttemptLive){
return false;
}
return true;
}
invoke时 调用isAllowRequest即可
public void invoke(Runnable r) throws Exception {
if(!bypass){
if(!isAllowRequest()){
throw mapException(new CircuitBreakerException());
}
try{
isAttemptLive = true;
r.run();
close();
}catch (Throwable throwable){
handleFailure(throwable);
}
throw new IllegalStateException("can not goto here too.");
}else {
r.run();
}
}
总结
这里只是demo
Hystrix实现得更为优雅,使用了Rxjava,API非常友好流畅