I. Event Notification Flow

1. org.springframework.cloud.consul.config.ConfigWatch

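The watchConfigKeyValues method in this class periodically pulls the remote Consul configuration and compares its index with the one held in memory. If the two indexes differ, the configuration has been updated, and a RefreshEvent is published.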

if (newIndex != null && !newIndex.equals(currentIndex)) {
	if (!this.consulIndexes.containsValue(newIndex) && !currentIndex.equals(-1L)) {
		RefreshEventData data = new RefreshEventData(context, currentIndex, newIndex);
		this.publisher.publishEvent(new RefreshEvent(this, data, data.toString()));
	}
	this.consulIndexes.put(context, newIndex);
}
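
The index check above can be tried in isolation. The following is a minimal sketch, not the real ConfigWatch: the class name IndexWatchSketch and the poll method are hypothetical, and it assumes that a context that has never been polled starts with an index of -1.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for the index bookkeeping shown above; not the real ConfigWatch. */
class IndexWatchSketch {
    // Last index seen for each watched context (key prefix).
    private final Map<String, Long> consulIndexes = new LinkedHashMap<>();

    /** Returns true when this poll result would lead to a RefreshEvent being published. */
    boolean poll(String context, Long newIndex) {
        // Assumption: a context that was never polled before starts at -1.
        Long currentIndex = consulIndexes.getOrDefault(context, -1L);
        boolean publish = false;
        if (newIndex != null && !newIndex.equals(currentIndex)) {
            // No event on the very first poll, nor for an index already recorded for any context.
            publish = !consulIndexes.containsValue(newIndex) && !currentIndex.equals(-1L);
            consulIndexes.put(context, newIndex);
        }
        return publish;
    }
}
```

With this sketch, the first poll of a context only records the index, a poll that returns the same index does nothing, and only a later poll with a new index reports that an event should be published.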

2. org.springframework.cloud.endpoint.event.RefreshEventListener

This listener handles the RefreshEvent in its handle method. The this.refresh field is of type LegacyContextRefresher.

public void onApplicationEvent(ApplicationEvent event) {
    ....
	else if (event instanceof RefreshEvent) {
		handle((RefreshEvent) event);
	}
}
public void handle(RefreshEvent event) {
	if (this.ready.get()) {
		Set<String> keys = this.refresh.refresh();
	}
    ...
}

3. org.springframework.cloud.context.refresh.LegacyContextRefresher

public synchronized Set<String> refresh() {
	Set<String> keys = refreshEnvironment();
	this.scope.refreshAll();
	return keys;
}
public synchronized Set<String> refreshEnvironment() {
	Map<String, Object> before = extract(this.context.getEnvironment().getPropertySources());
	updateEnvironment(); // refresh the Environment
	Set<String> keys = changes(before, extract(this.context.getEnvironment().getPropertySources())).keySet();
	this.context.publishEvent(new EnvironmentChangeEvent(this.context, keys));
	return keys;
}

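refreshEnvironment() refreshes the current Environment and works out which keys changed between the old and the new configuration. this.scope is of type RefreshScope, and its refreshAll method performs the actual refresh.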
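
To make the key comparison concrete, here is a minimal sketch of diffing two property snapshots. It is an illustration under assumptions rather than the body of the changes method used above: the class ChangedKeysSketch and the method changedKeys are hypothetical names, and the snapshots are plain maps.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/** Hypothetical helper that mimics comparing the Environment before and after a refresh. */
class ChangedKeysSketch {
    /** Returns every key that was added, removed, or whose value differs. */
    static Set<String> changedKeys(Map<String, Object> before, Map<String, Object> after) {
        Map<String, Object> result = new HashMap<>();
        for (String key : before.keySet()) {
            if (!after.containsKey(key)) {
                result.put(key, null);            // key was removed
            } else if (!Objects.equals(before.get(key), after.get(key))) {
                result.put(key, after.get(key));  // value was modified
            }
        }
        for (String key : after.keySet()) {
            if (!before.containsKey(key)) {
                result.put(key, after.get(key));  // key was added
            }
        }
        return result.keySet();
    }
}
```

Only the resulting key set travels with the EnvironmentChangeEvent, so listeners learn which keys changed and read the new values from the Environment themselves.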

4. org.springframework.cloud.context.scope.refresh.RefreshScope

public void refreshAll() {
	super.destroy();
	this.context.publishEvent(new RefreshScopeRefreshedEvent());
}
public void destroy() {
	Collection<BeanLifecycleWrapper> wrappers = this.cache.clear();
	for (BeanLifecycleWrapper wrapper : wrappers) {
		try {
			Lock lock = this.locks.get(wrapper.getName()).writeLock();
			lock.lock();
			try {
				wrapper.destroy();
			}
			finally {
				lock.unlock();
			}
		}
		...
	}
	...
}

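refreshAll first calls the parent class's destroy method, which destroys every BeanLifecycleWrapper in the cache (what a BeanLifecycleWrapper is will be explained later), and then publishes a RefreshScopeRefreshedEvent.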

II. Microservice Refresh Flow

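A Bean that needs dynamic refresh must be annotated with @RefreshScope, either on its class or on its @Bean method. A Bean annotated with @RefreshScope has its scope set to refresh.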

@Scope("refresh")
public @interface RefreshScope {
	ScopedProxyMode proxyMode() default ScopedProxyMode.TARGET_CLASS;
}

Common scopes include singleton and prototype. During Bean creation, doGetBean in AbstractBeanFactory handles a Bean whose scope is refresh in a separate branch.

protected <T> T doGetBean(String name, Class<T> requiredType, @Nullable Object[] args, boolean typeCheckOnly) {
    ...
    if (mbd.isSingleton()) {
        // handle singleton
        ...
    } else if (mbd.isPrototype()) {
        // handle prototype
    } else {
        String scopeName = mbd.getScope(); // "refresh"
        Scope scope = this.scopes.get(scopeName); // the instance is a RefreshScope
        Object scopedInstance = scope.get(beanName, () -> {
			...
			return createBean(beanName, mbd, args);
			...
		});
    }
    ...
}

RefreshScope extends GenericScope, whose get method is:

public Object get(String name, ObjectFactory<?> objectFactory) {
	BeanLifecycleWrapper value = this.cache.put(name, new BeanLifecycleWrapper(name, objectFactory));
	this.locks.putIfAbsent(name, new ReentrantReadWriteLock());
	try {
		return value.getBean();
	}
    ...
}

RefreshScope wraps the incoming beanName and the ObjectFactory lambda in a BeanLifecycleWrapper and puts the wrapper into this.cache:

private static class BeanLifecycleWrapper {
	private final String name;
	private final ObjectFactory<?> objectFactory;
	private volatile Object bean;
    ...
    public Object getBean() {
		if (this.bean == null) {
			synchronized (this.name) {
				if (this.bean == null) {
					this.bean = this.objectFactory.getObject();
				}
			}
		}
		return this.bean;
	}
}

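Creating a Bean whose scope is refresh is fairly involved, so only the key point is covered here: retrieving the Bean goes through one extra layer. If the wrapper already holds the Bean, it is returned directly. If not, the wrapper calls getObject on the ObjectFactory it was given, creates the Bean, and returns it.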

Now back to the destroy method from the first part, which RefreshScope inherits from its parent class. It takes every BeanLifecycleWrapper out of this.cache and calls destroy on each one:

public void destroy() {
	Collection<BeanLifecycleWrapper> wrappers = this.cache.clear();
	for (BeanLifecycleWrapper wrapper : wrappers) {
		try {
			Lock lock = this.locks.get(wrapper.getName()).writeLock();
			lock.lock();
			try {
				wrapper.destroy();
			}
			finally {
				lock.unlock();
			}
		}
		...
	}
	...
}

The destroy method of BeanLifecycleWrapper is as follows:

private static class BeanLifecycleWrapper {
	private final String name;
	private final ObjectFactory<?> objectFactory;
	private volatile Object bean;
    private Runnable callback;
    ...
    
    public void destroy() {
		...
		synchronized (this.name) {
            Runnable callback = this.callback;
            if (callback != null) {
                callback.run();
            }
            this.callback = null;
            this.bean = null;
		}
	}
}

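The destroy method of BeanLifecycleWrapper first runs the callback (usually a DisposableBeanAdapter, which invokes the Bean's own destroy method) and then sets the stored bean to null. The next time the bean is requested it no longer exists, so the ObjectFactory creates a new one, and the bean goes through instantiation, property population, and initialization again. Any property injected into the bean is resolved during property population from the updated Environment, so it picks up the latest value. That is how dynamic refresh is achieved.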
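
The whole cycle (lazy creation, cached reuse, destroy, re-creation) can be sketched without Spring. This is a simplified illustration under assumptions, not the GenericScope implementation: RefreshScopeSketch and its nested Wrapper are hypothetical names, a Supplier stands in for the ObjectFactory, and the destroy callback and the per-bean locks are left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Hypothetical miniature of a refresh scope: a cache of lazily created, re-creatable beans. */
class RefreshScopeSketch {
    /** Holds a lazily created bean plus the factory that can rebuild it. */
    static final class Wrapper {
        private final Supplier<Object> factory; // stands in for ObjectFactory
        private volatile Object bean;

        Wrapper(Supplier<Object> factory) {
            this.factory = factory;
        }

        Object getBean() {
            if (bean == null) {
                synchronized (this) {
                    if (bean == null) {
                        bean = factory.get(); // create on first access only
                    }
                }
            }
            return bean;
        }

        void destroy() {
            synchronized (this) {
                bean = null; // drop the instance so that it cannot be reused
            }
        }
    }

    private final Map<String, Wrapper> cache = new ConcurrentHashMap<>();

    /** Returns the cached bean, creating the wrapper and the bean when absent. */
    Object get(String name, Supplier<Object> factory) {
        return cache.computeIfAbsent(name, n -> new Wrapper(factory)).getBean();
    }

    /** Destroys every cached bean and empties the cache. */
    void refreshAll() {
        cache.values().forEach(Wrapper::destroy);
        cache.clear();
    }
}
```

If the factory reads a value from a mutable configuration map, a bean obtained before refreshAll keeps the old value, while the first get after refreshAll runs the factory again and sees the new value. This mirrors why a caller that keeps a direct reference to the old instance does not observe the refresh.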

Note:

Flow summary:

III. Gateway Refresh Flow

Custom route scanning for the gateway

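A brief digression first: how do you customize route scanning so that Spring Cloud Gateway recognizes and loads your routes? Taking the built-in default as an example, implement the RouteDefinitionLocator interface and register the implementation as a Bean: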

org.springframework.cloud.gateway.config.PropertiesRouteDefinitionLocator

public class PropertiesRouteDefinitionLocator implements RouteDefinitionLocator {

	private final GatewayProperties properties;

	public PropertiesRouteDefinitionLocator(GatewayProperties properties) {
		this.properties = properties;
	}

	@Override
	public Flux<RouteDefinition> getRouteDefinitions() {
		return Flux.fromIterable(this.properties.getRoutes());
	}
}

org.springframework.cloud.gateway.config.GatewayProperties

@ConfigurationProperties(GatewayProperties.PREFIX)
@Validated
public class GatewayProperties {

	public static final String PREFIX = "spring.cloud.gateway";

	private List<RouteDefinition> routes = new ArrayList<>();

	private List<FilterDefinition> defaultFilters = new ArrayList<>();

	private List<MediaType> streamingMediaTypes = Arrays.asList(MediaType.TEXT_EVENT_STREAM,
			MediaType.APPLICATION_STREAM_JSON);

	private boolean failOnRouteDefinitionError = true;

    ...getter setter
}

GatewayAutoConfiguration is annotated with @EnableConfigurationProperties and declares PropertiesRouteDefinitionLocator as a Bean.

Refreshing @RefreshScope Beans

In the gateway, Beans annotated with @RefreshScope are refreshed in the same way as in a microservice.

Updating the route Properties

Back in the refresh flow described earlier, look at the refreshEnvironment method of org.springframework.cloud.context.refresh.LegacyContextRefresher.

public synchronized Set<String> refreshEnvironment() {
	Map<String, Object> before = extract(this.context.getEnvironment().getPropertySources());
	updateEnvironment(); // refresh the Environment
	Set<String> keys = changes(before, extract(this.context.getEnvironment().getPropertySources())).keySet();
	this.context.publishEvent(new EnvironmentChangeEvent(this.context, keys));
	return keys;
}

updateEnvironment calls addConfigFilesToEnvironment.

protected void updateEnvironment() {
	addConfigFilesToEnvironment();
}

ConfigurableApplicationContext addConfigFilesToEnvironment() {
    ConfigurableApplicationContext capture = null;
    try {
      	...
        SpringApplicationBuilder builder = new SpringApplicationBuilder(Empty.class).bannerMode(Banner.Mode.OFF)
                .web(WebApplicationType.NONE).environment(environment);
        builder.application().setListeners(
                Arrays.asList(new BootstrapApplicationListener(), new BootstrapConfigFileApplicationListener()));
        capture = builder.run();
        ...
    }    
}

The call chain inside addConfigFilesToEnvironment is long. This part eventually reaches the environmentPrepared method of org.springframework.boot.context.event.EventPublishingRunListener, which sends an ApplicationEnvironmentPreparedEvent.

@Override
public void environmentPrepared(ConfigurableBootstrapContext bootstrapContext,
	ConfigurableEnvironment environment) {
	this.initialMulticaster.multicastEvent(
			new ApplicationEnvironmentPreparedEvent(bootstrapContext, this.application, this.args, environment));
}

org.springframework.boot.env.EnvironmentPostProcessorApplicationListener receives the ApplicationEnvironmentPreparedEvent, and execution eventually reaches org.springframework.boot.context.properties.bind.Binder, which reads the properties again.

TODO: ConfigurationPropertiesRebinder, ConfigurationPropertiesBeans

public void onApplicationEvent(ApplicationEvent event) {
	if (event instanceof ApplicationEnvironmentPreparedEvent) {
		onApplicationEnvironmentPreparedEvent((ApplicationEnvironmentPreparedEvent) event);
	}
	...
}

org.springframework.boot.context.properties.bind.Binder

private <T> Object bindObject(ConfigurationPropertyName name, Bindable<T> target, BindHandler handler, Context context, boolean allowRecursiveBinding) {
	ConfigurationProperty property = findProperty(name, target, context);
	if (property == null && context.depth != 0 && containsNoDescendantOf(context.getSources(), name)) {
		return null;
	}
	AggregateBinder<?> aggregateBinder = getAggregateBinder(target, context);
	if (aggregateBinder != null) {
		return bindAggregate(name, target, handler, context, aggregateBinder);
	}
	if (property != null) {
		try {
			return bindProperty(target, context, property);
		}
		...
	}
	return bindDataObject(name, target, handler, context, allowRecursiveBinding);
}
private Object bindDataObject(ConfigurationPropertyName name, Bindable<?> target, BindHandler handler, Context context, boolean allowRecursiveBinding) {
    ...
    Class<?> type = target.getType().resolve(Object.class);
	DataObjectPropertyBinder propertyBinder = (propertyName, propertyTarget) -> bind(name.append(propertyName), propertyTarget, handler, context, false, false);
	return context.withDataObject(type, () -> {
		for (DataObjectBinder dataObjectBinder : this.dataObjectBinders) {
			Object instance = dataObjectBinder.bind(name, target, context, propertyBinder);
			if (instance != null) {
				return instance;
			}
		}
		return null;
	});
}

The refreshEnvironment method of LegacyContextRefresher also sends an EnvironmentChangeEvent.

public synchronized Set<String> refreshEnvironment() {
	Map<String, Object> before = extract(this.context.getEnvironment().getPropertySources());
	updateEnvironment(); // refresh the Environment
	Set<String> keys = changes(before, extract(this.context.getEnvironment().getPropertySources())).keySet();
	this.context.publishEvent(new EnvironmentChangeEvent(this.context, keys));
	return keys;
}

org.springframework.cloud.context.properties.ConfigurationPropertiesRebinder listens for the EnvironmentChangeEvent and rebinds the property values.

public void onApplicationEvent(EnvironmentChangeEvent event) {
	if (this.applicationContext.equals(event.getSource()) || event.getKeys().equals(event.getSource())) {
		rebind();
	}
}

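The flow is fairly complex, so only the essentials are covered here. The main path reads each property of the Properties class again from the latest configuration and calls the setters on the original Properties object to assign the new values.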

public class GatewayProperties {
    private List<RouteDefinition> routes = new ArrayList<>();
    
    public void setRoutes(List<RouteDefinition> routes) {
        this.routes = routes; // the new value is assigned through the setter
    }
    ...
}

Update example:

Note: the Properties object itself keeps the same address. It is not re-created; only its properties are updated.
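
The effect of rebinding in place can be sketched as follows. All names here (RebindSketch, RouteProps, Locator, rebind) are hypothetical, and routes are reduced to plain strings. The point is only that a collaborator holding a reference to the Properties object sees the new values, because the object itself is never replaced.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of updating a properties object through its setter. */
class RebindSketch {
    /** Simplified properties holder, loosely modelled on GatewayProperties. */
    static class RouteProps {
        private List<String> routes = new ArrayList<>();

        List<String> getRoutes() {
            return routes;
        }

        void setRoutes(List<String> routes) {
            this.routes = routes;
        }
    }

    /** A consumer that keeps a reference to the properties object. */
    static class Locator {
        private final RouteProps props;

        Locator(RouteProps props) {
            this.props = props;
        }

        List<String> currentRoutes() {
            return props.getRoutes(); // read on every call, never copied
        }
    }

    /** Stand-in for rebinding: pushes fresh values through the existing setter. */
    static void rebind(RouteProps target, List<String> latest) {
        target.setRoutes(new ArrayList<>(latest));
    }
}
```

Because Locator reads through the same RouteProps instance each time, a rebind is visible to it immediately, without Locator itself having to be re-created.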

Route refresh:

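Once the route-related Properties have been updated, the gateway still has to parse that configuration again into actual Route objects before the change takes effect.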

Back to the refreshAll method of org.springframework.cloud.context.scope.refresh.RefreshScope: after calling the parent class's destroy method, it also sends a RefreshScopeRefreshedEvent.

public void refreshAll() {
	super.destroy();
	this.context.publishEvent(new RefreshScopeRefreshedEvent());
}

org.springframework.cloud.gateway.route.RouteRefreshListener listens for the RefreshScopeRefreshedEvent and, in its reset method, publishes a RefreshRoutesEvent.

public void onApplicationEvent(ApplicationEvent event) {
	...
	else if (event instanceof RefreshScopeRefreshedEvent || event instanceof InstanceRegisteredEvent) {
		reset();
	}
	...
}

private void reset() {
	this.publisher.publishEvent(new RefreshRoutesEvent(this));
}
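
The way one event triggers the next can be sketched with a tiny synchronous publisher. This is an illustration under assumptions, not Spring's event infrastructure: EventChainSketch, Bus, ScopeRefreshed, RefreshRoutes, and wire are all hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical miniature of the chained events: a scope refresh leads to a route refresh. */
class EventChainSketch {
    /** Minimal synchronous publisher; every listener sees every event. */
    static final class Bus {
        private final List<Consumer<Object>> listeners = new ArrayList<>();

        void subscribe(Consumer<Object> listener) {
            listeners.add(listener);
        }

        void publish(Object event) {
            // Iterate over a copy so that listeners may publish follow-up events.
            for (Consumer<Object> listener : new ArrayList<>(listeners)) {
                listener.accept(event);
            }
        }
    }

    // Marker events named after the ones in the text; they carry no data here.
    static final class ScopeRefreshed { }
    static final class RefreshRoutes { }

    /** Wires the two listeners and returns a log of what they observe, in order. */
    static List<String> wire(Bus bus) {
        List<String> log = new ArrayList<>();
        bus.subscribe(event -> {
            if (event instanceof ScopeRefreshed) {
                log.add("scope refreshed -> request route refresh");
                bus.publish(new RefreshRoutes());
            }
        });
        bus.subscribe(event -> {
            if (event instanceof RefreshRoutes) {
                log.add("routes reloaded");
            }
        });
        return log;
    }
}
```

Publishing a single ScopeRefreshed event on a wired Bus produces both log entries, which is the same shape as the listener above turning a RefreshScopeRefreshedEvent into a RefreshRoutesEvent.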

org.springframework.cloud.gateway.route.CachingRouteLocator listens for the RefreshRoutesEvent and refreshes the route information.

public void onApplicationEvent(RefreshRoutesEvent event) {
	try {
		fetch().collect(Collectors.toList()).subscribe(
				list -> Flux.fromIterable(list).materialize().collect(Collectors.toList()).subscribe(signals -> {
					applicationEventPublisher.publishEvent(new RefreshRoutesResultEvent(this));
					cache.put(CACHE_KEY, signals);
				}, this::handleRefreshError), this::handleRefreshError);
	}
	...
}

private Flux<Route> fetch() {
	return this.delegate.getRoutes().sort(AnnotationAwareOrderComparator.INSTANCE);
}

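fetch retrieves the latest Routes. The call eventually reaches org.springframework.cloud.gateway.route.RouteDefinitionRouteLocator, which first collects the definitions from the custom route-scanning implementations described earlier and then converts them into Routes. The code starts by calling this.routeDefinitionLocator.getRouteDefinitions() to obtain the route definitions.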

public Flux<Route> getRoutes() {
	Flux<Route> routes = this.routeDefinitionLocator.getRouteDefinitions().map(this::convertToRoute);
    ...
}

this.routeDefinitionLocator is of type org.springframework.cloud.gateway.route.CompositeRouteDefinitionLocator.

public class CompositeRouteDefinitionLocator implements RouteDefinitionLocator {

	private final Flux<RouteDefinitionLocator> delegates;

	public CompositeRouteDefinitionLocator(Flux<RouteDefinitionLocator> delegates) {
		this(delegates, new AlternativeJdkIdGenerator());
	}
    
    ...
        
    public Flux<RouteDefinition> getRouteDefinitions() {
        return this.delegates.flatMapSequential(RouteDefinitionLocator::getRouteDefinitions)
                .flatMap(routeDefinition -> {
                    if (routeDefinition.getId() == null) {
                        return randomId().map(id -> {
                            routeDefinition.setId(id);
                            if (log.isDebugEnabled()) {
                                log.debug("Id set on route definition: " + routeDefinition);
                            }
                            return routeDefinition;
                        });
                    }
                    return Mono.just(routeDefinition);
                });
	}
    
}

this.delegates is passed in when the Bean is created.

@Bean
@Primary
public RouteDefinitionLocator routeDefinitionLocator(List<RouteDefinitionLocator> routeDefinitionLocators) {
	return new CompositeRouteDefinitionLocator(Flux.fromIterable(routeDefinitionLocators));
}

The RouteDefinitionLocator implementations are exactly the classes described in the custom route scanning part above, which ties the whole flow together.
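
The last two steps (merging several definition sources, then serving routes from a cache that is rebuilt on refresh) can be sketched in plain Java. This is a simplified illustration under assumptions, not the gateway's reactive implementation: RouteRefreshSketch, RouteDef, CompositeLocator, and CachingLocator are hypothetical names, lists replace Flux, a random UUID stands in for the id generator, and the cache is filled eagerly in the constructor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.Supplier;

/** Hypothetical, non-reactive miniature of composite lookup plus cached routes. */
class RouteRefreshSketch {
    /** Simplified route definition: just an id and a target URI. */
    static final class RouteDef {
        String id;
        final String uri;

        RouteDef(String id, String uri) {
            this.id = id;
            this.uri = uri;
        }
    }

    /** Merges several definition sources and fills in missing ids. */
    static final class CompositeLocator implements Supplier<List<RouteDef>> {
        private final List<Supplier<List<RouteDef>>> delegates;

        CompositeLocator(List<Supplier<List<RouteDef>>> delegates) {
            this.delegates = delegates;
        }

        @Override
        public List<RouteDef> get() {
            List<RouteDef> all = new ArrayList<>();
            for (Supplier<List<RouteDef>> delegate : delegates) {
                for (RouteDef def : delegate.get()) {
                    if (def.id == null) {
                        def.id = UUID.randomUUID().toString(); // assumption: any unique id will do
                    }
                    all.add(def);
                }
            }
            return all;
        }
    }

    /** Serves routes from a cache that is rebuilt only when a refresh is signalled. */
    static final class CachingLocator {
        private final Supplier<List<RouteDef>> delegate;
        private volatile List<RouteDef> cache;

        CachingLocator(Supplier<List<RouteDef>> delegate) {
            this.delegate = delegate;
            this.cache = delegate.get();
        }

        List<RouteDef> getRoutes() {
            return cache;
        }

        /** Plays the role of the listener for the route refresh event. */
        void onRefreshRoutes() {
            cache = delegate.get();
        }
    }
}
```

In this sketch a definition added to a source after the CachingLocator was built stays invisible until onRefreshRoutes runs, which is why updating the Properties alone is not enough and the refresh event is needed.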

Flow summary: