一、事件通知流程
1. org.springframework.cloud.consul.config.ConfigWatch
类中watchConfigKeyValues
方法会定时拉取远端的Consul配置和内存中的进行比较,如果配置版本号不一致,说明配置有更新,发布RefreshEvent
事件。
if (newIndex != null && !newIndex.equals(currentIndex)) {
if (!this.consulIndexes.containsValue(newIndex) && !currentIndex.equals(-1L)) {
RefreshEventData data = new RefreshEventData(context, currentIndex, newIndex);
this.publisher.publishEvent(new RefreshEvent(this, data, data.toString()));
}
this.consulIndexes.put(context, newIndex);
}
2. org.springframework.cloud.endpoint.event.RefreshEventListener
监听器,在handle
方法中处理RefreshEvent事件,this.refresh
的类型为LegacyContextRefresher
。
public void onApplicationEvent(ApplicationEvent event) {
....
else if (event instanceof RefreshEvent) {
handle((RefreshEvent) event);
}
}
public void handle(RefreshEvent event) {
if (this.ready.get()) {
Set<String> keys = this.refresh.refresh();
}
...
}
3. org.springframework.cloud.context.refresh.LegacyContextRefresher
public synchronized Set<String> refresh() {
Set<String> keys = refreshEnvironment();
this.scope.refreshAll();
return keys;
}
public synchronized Set<String> refreshEnvironment() {
Map<String, Object> before = extract(this.context.getEnvironment().getPropertySources());
updateEnvironment();//刷新Environment
Set<String> keys = changes(before, extract(this.context.getEnvironment().getPropertySources())).keySet();
this.context.publishEvent(new EnvironmentChangeEvent(this.context, keys));
return keys;
}
refreshEnvironment()
方法主要比对了新老配置中变化的key以及刷新了当前Environment,this.scope
的类型为RefreshScope
,调用其refreshAll
方法进行刷新操作。
4. org.springframework.cloud.context.scope.refresh.RefreshScope
public void refreshAll() {
super.destroy();
this.context.publishEvent(new RefreshScopeRefreshedEvent());
}
public void destroy() {
Collection<BeanLifecycleWrapper> wrappers = this.cache.clear();
for (BeanLifecycleWrapper wrapper : wrappers) {
try {
Lock lock = this.locks.get(wrapper.getName()).writeLock();
lock.lock();
try {
wrapper.destroy();
}
finally {
lock.unlock();
}
}
...
}
...
}
先调用父类的destroy
方法,将缓存里的BeanLifecycleWrapper
全部destroy
(具体什么是BeanLifecycleWrapper
一会再说),再发布一个RefreshScopeRefreshedEvent
事件。
二、微服务刷新流程
需要动态刷新的Bean要在类上或者@Bean
方法上标记@RefreshScope
,被@RefreshScope
标记的Bean的Scope属性为refresh。
@Scope("refresh")
public @interface RefreshScope {
ScopedProxyMode proxyMode() default ScopedProxyMode.TARGET_CLASS;
}
常见的Scope有singleton、prototype等,在Bean的创建流程中,AbstractBeanFactory
中的doGetBean
对Scope为refresh的会被单独处理。
protected <T> T doGetBean(String name, Class<T> requiredType, @Nullable Object[] args, boolean typeCheckOnly) {
...
if (mbd.isSingleton()) {
//处理singleton
...
} else if (mbd.isPrototype()) {
//处理prototype
} else {
String scopeName = mbd.getScope(); //refresh
Scope scope = this.scopes.get(scopeName); //类型为RefreshScope
Object scopedInstance = scope.get(beanName, () -> {
...
return createBean(beanName, mbd, args);
...
});
}
...
}
RefreshScope
继承了GenericScope
,get
方法为:
public Object get(String name, ObjectFactory<?> objectFactory) {
BeanLifecycleWrapper value = this.cache.put(name, new BeanLifecycleWrapper(name, objectFactory));
this.locks.putIfAbsent(name, new ReentrantReadWriteLock());
try {
return value.getBean();
}
...
}
RefreshScope
将传入的beanName
和ObjectFactory
的lambda包装进了BeanLifecycleWrapper
类,然后将包装类放进了this.cache
中:
private static class BeanLifecycleWrapper {
private final String name;
private final ObjectFactory<?> objectFactory;
private volatile Object bean;
...
public Object getBean() {
if (this.bean == null) {
synchronized (this.name) {
if (this.bean == null) {
this.bean = this.objectFactory.getObject();
}
}
}
return this.bean;
}
}
Scope为refresh的Bean的具体创建过程比较复杂,这里只捡一下重点,就是Bean的获取被包装了一层,即如果包装类中的Bean存在就直接返回Bean,如果Bean不存在,就使用传入的ObjectFactory
的getObject
方法重新创建Bean并返回。
再回到第一部分说的RefreshScope
类中的destroy
方法,方法中将this.cache
的BeanLifecycleWrapper
全部取出,挨个调用其destroy
方法:
public void destroy() {
Collection<BeanLifecycleWrapper> wrappers = this.cache.clear();
for (BeanLifecycleWrapper wrapper : wrappers) {
try {
Lock lock = this.locks.get(wrapper.getName()).writeLock();
lock.lock();
try {
wrapper.destroy();
}
finally {
lock.unlock();
}
}
...
}
...
}
BeanLifecycleWrapper
的destroy
如下:
private static class BeanLifecycleWrapper {
private final String name;
private final ObjectFactory<?> objectFactory;
private volatile Object bean;
private Runnable callback;
...
public void destroy() {
...
synchronized (this.name) {
Runnable callback = this.callback;
if (callback != null) {
callback.run();
}
this.callback = null;
this.bean = null;
}
}
}
BeanLifecycleWrapper
的destroy
里先执行callback(一般为DisposableBeanAdapter,用于执行Bean的destroy方法),再将保存的bean置为null,这样下次在获取bean的时候,bean不存在,就会使用ObjectFactory
重新创建一个bean,于是重新走一遍这个bean的实例化、属性填充、初始化的流程,如果在该bean里注入了属性,该属性也会在属性填充过程中从被更新的环境中获取到最新的值,从而实现了动态刷新。
注意:
- 因为
RefreshScope
类中的destroy
方法是将this.cache
的BeanLifecycleWrapper
全部取出,挨个调用其destroy
方法,所以如果Consul配置发生变化后,所有被标记@RefreshScope
的Bean都会被重新创建,而不是只创建修改的属性所涉及的Bean - 如果
@RefreshScope
标记的Bean在另一个Bean里被依赖了,那么这个Bean在创建时会生成一个动态代理对象Bean$$Enhancer,然后把这个代理对象注入到另一个Bean中,动态刷新的时候这个代理Bean不会变,调用方法时内部还是按照BeanLifecycleWrapper
流程走一遍
流程总结:
- 被
@RefreshScope
标记的Bean在生成时会被包装一层,包装内部保存了该Bean和该Bean的创建工厂 - 当配置刷新时,包装类会将内部的Bean清除,待下一次获取时使用工厂重新创建Bean,从而获得最新的配置
- 当被其它Bean依赖时,会生成新的动态代理对象注入到其它Bean中,刷新时动态代理对象不变
三、网关刷新流程
自定义网关的route扫描
先说个题外话,如何自定义网关的route扫描,让Spring Gateway能够识别并加载,以默认自带的为例,实现RouteDefinitionLocator接口,然后将其注册为Bean:
org.springframework.cloud.gateway.config.PropertiesRouteDefinitionLocator
public class PropertiesRouteDefinitionLocator implements RouteDefinitionLocator {
private final GatewayProperties properties;
public PropertiesRouteDefinitionLocator(GatewayProperties properties) {
this.properties = properties;
}
@Override
public Flux<RouteDefinition> getRouteDefinitions() {
return Flux.fromIterable(this.properties.getRoutes());
}
}
org.springframework.cloud.gateway.config.GatewayProperties
@ConfigurationProperties(GatewayProperties.PREFIX)
@Validated
public class GatewayProperties {
public static final String PREFIX = "spring.cloud.gateway";
private List<RouteDefinition> routes = new ArrayList<>();
private List<FilterDefinition> defaultFilters = new ArrayList<>();
private List<MediaType> streamingMediaTypes = Arrays.asList(MediaType.TEXT_EVENT_STREAM,
MediaType.APPLICATION_STREAM_JSON);
private boolean failOnRouteDefinitionError = true;
...getter setter
}
在GatewayAutoConfiguration
中标记@EnableConfigurationProperties
,然后将PropertiesRouteDefinitionLocator
声明为Bean。
@RefreshScope刷新
网关中被标记@RefreshScope
的Bean的刷新方式和微服务一致,
RouteProperties属性更新
回到之前刷新流程org.springframework.cloud.context.refresh.LegacyContextRefresher
的refreshEnvironment
方法中。
public synchronized Set<String> refreshEnvironment() {
Map<String, Object> before = extract(this.context.getEnvironment().getPropertySources());
updateEnvironment();//刷新环境
Set<String> keys = changes(before, extract(this.context.getEnvironment().getPropertySources())).keySet();
this.context.publishEvent(new EnvironmentChangeEvent(this.context, keys));
return keys;
}
updateEnvironment
方法中调用了addConfigFilesToEnvironment
方法。
protected void updateEnvironment() {
addConfigFilesToEnvironment();
}
ConfigurableApplicationContext addConfigFilesToEnvironment() {
ConfigurableApplicationContext capture = null;
try {
...
SpringApplicationBuilder builder = new SpringApplicationBuilder(Empty.class).bannerMode(Banner.Mode.OFF)
.web(WebApplicationType.NONE).environment(environment);
builder.application().setListeners(
Arrays.asList(new BootstrapApplicationListener(), new BootstrapConfigFileApplicationListener()));
capture = builder.run();
...
}
}
addConfigFilesToEnvironment
方法中的调用链比较长,这一部分最终调用调用到org.springframework.boot.context.event.EventPublishingRunListener
的environmentPrepared
方法,并发送ApplicationEnvironmentPreparedEvent
事件。
@Override
public void environmentPrepared(ConfigurableBootstrapContext bootstrapContext,
ConfigurableEnvironment environment) {
this.initialMulticaster.multicastEvent(
new ApplicationEnvironmentPreparedEvent(bootstrapContext, this.application, this.args, environment));
}
在org.springframework.boot.env.EnvironmentPostProcessorApplicationListener
中接收ApplicationEnvironmentPreparedEvent
事件,最终执行到org.springframework.boot.context.properties.bind.Binder
类中重新获取属性。
TODO:ConfigurationPropertiesRebinder、ConfigurationPropertiesBeans
public void onApplicationEvent(ApplicationEvent event) {
if (event instanceof ApplicationEnvironmentPreparedEvent) {
onApplicationEnvironmentPreparedEvent((ApplicationEnvironmentPreparedEvent) event);
}
...
}
org.springframework.boot.context.properties.bind.Binder
private <T> Object bindObject(ConfigurationPropertyName name, Bindable<T> target, BindHandler handler, Context context, boolean allowRecursiveBinding) {
ConfigurationProperty property = findProperty(name, target, context);
if (property == null && context.depth != 0 && containsNoDescendantOf(context.getSources(), name)) {
return null;
}
AggregateBinder<?> aggregateBinder = getAggregateBinder(target, context);
if (aggregateBinder != null) {
return bindAggregate(name, target, handler, context, aggregateBinder);
}
if (property != null) {
try {
return bindProperty(target, context, property);
}
...
}
return bindDataObject(name, target, handler, context, allowRecursiveBinding);
}
private Object bindDataObject(ConfigurationPropertyName name, Bindable<?> target, BindHandler handler, Context context, boolean allowRecursiveBinding) {
...
Class<?> type = target.getType().resolve(Object.class);
DataObjectPropertyBinder propertyBinder = (propertyName, propertyTarget) -> bind(name.append(propertyName), propertyTarget, handler, context, false, false);
return context.withDataObject(type, () -> {
for (DataObjectBinder dataObjectBinder : this.dataObjectBinders) {
Object instance = dataObjectBinder.bind(name, target, context, propertyBinder);
if (instance != null) {
return instance;
}
}
return null;
});
}
LegacyContextRefresher
类的refreshEnvironment
方法中还发送了EnvironmentChangeEvent
事件。
public synchronized Set<String> refreshEnvironment() {
Map<String, Object> before = extract(this.context.getEnvironment().getPropertySources());
updateEnvironment();//刷新环境
Set<String> keys = changes(before, extract(this.context.getEnvironment().getPropertySources())).keySet();
this.context.publishEvent(new EnvironmentChangeEvent(this.context, keys));
return keys;
}
org.springframework.cloud.context.properties.ConfigurationPropertiesRebinder
监听了EnvironmentChangeEvent
事件,对属性的值进行重新绑定。
public void onApplicationEvent(EnvironmentChangeEvent event) {
if (this.applicationContext.equals(event.getSource()) || event.getKeys().equals(event.getSource())) {
rebind();
}
}
流程比较复杂,只捡重要的,主流程为从最新配置中重新获取Properties的各个属性的值,调用原Properties的setter方法给新属性赋值。
public class GatewayProperties {
private List<RouteDefinition> routes = new ArrayList<>();
public void setRoutes(List<RouteDefinition> routes) {
this.routes = routes; //调用setter方法赋值
}
...
}
更新样例:
-
更新前:
spring: cloud: gateway: routes: - id: MyService uri: http://MyService predicates: - Path=/MyService/**
打印对应的
GatewayProperties
为:System.out.println(gatewayProperties); System.out.println(gatewayProperties.getRoutes().size());
打印为:
GatewayProperties@00000001 1
-
更新后:
spring: cloud: gateway: routes: - id: MyService uri: http://MyService predicates: - Path=/MyService/** - id: my_service_2 uri: http://my-service-2 predicates: - Path=/my-service-2/**
打印对应的
GatewayProperties
为:System.out.println(gatewayProperties); System.out.println(gatewayProperties.getRoutes().size());
打印为:
GatewayProperties@00000001 2
注意:原Properties自身的地址不会变,即Properties没有被重新创建,只是属性被更新了。
路由刷新:
路由相关的Properties被更新属性完成后,网关需要将配置重新被解析变成真正Route的才会生效。
在回到org.springframework.cloud.context.scope.refresh.RefreshScope
的refreshAll
方法,在调用父类的destroy
方法后,还发送了RefreshScopeRefreshedEvent
事件。
public void refreshAll() {
super.destroy();
this.context.publishEvent(new RefreshScopeRefreshedEvent());
}
org.springframework.cloud.gateway.route.RouteRefreshListener
监听了RefreshScopeRefreshedEvent
事件,在其reset
方法中又发出了RefreshRoutesEvent
事件。
public void onApplicationEvent(ApplicationEvent event) {
...
else if (event instanceof RefreshScopeRefreshedEvent || event instanceof InstanceRegisteredEvent) {
reset();
}
...
}
private void reset() {
this.publisher.publishEvent(new RefreshRoutesEvent(this));
}
org.springframework.cloud.gateway.route.CachingRouteLocator
监听了RefreshRoutesEvent
事件,准备对路由信息进行刷新。
public void onApplicationEvent(RefreshRoutesEvent event) {
try {
fetch().collect(Collectors.toList()).subscribe(
list -> Flux.fromIterable(list).materialize().collect(Collectors.toList()).subscribe(signals -> {
applicationEventPublisher.publishEvent(new RefreshRoutesResultEvent(this));
cache.put(CACHE_KEY, signals);
}, this::handleRefreshError), this::handleRefreshError);
}
...
}
private Flux<Route> fetch() {
return this.delegate.getRoutes().sort(AnnotationAwareOrderComparator.INSTANCE);
}
fetch
为获取最新的Route,最终调用到org.springframework.cloud.gateway.route.RouteDefinitionRouteLocator
中,先去获取之前提到的各个自定义网关的route扫描配置,再将其转换为Route,代码中先调用了this.routeDefinitionLocator.getRouteDefinitions()
获取Route定义。
public Flux<Route> getRoutes() {
Flux<Route> routes = this.routeDefinitionLocator.getRouteDefinitions().map(this::convertToRoute);
...
}
this.routeDefinitionLocator
的类型为org.springframework.cloud.gateway.route.CompositeRouteDefinitionLocator
。
public class CompositeRouteDefinitionLocator implements RouteDefinitionLocator {
private final Flux<RouteDefinitionLocator> delegates;
public CompositeRouteDefinitionLocator(Flux<RouteDefinitionLocator> delegates) {
this(delegates, new AlternativeJdkIdGenerator());
}
...
public Flux<RouteDefinition> getRouteDefinitions() {
return this.delegates.flatMapSequential(RouteDefinitionLocator::getRouteDefinitions)
.flatMap(routeDefinition -> {
if (routeDefinition.getId() == null) {
return randomId().map(id -> {
routeDefinition.setId(id);
if (log.isDebugEnabled()) {
log.debug("Id set on route definition: " + routeDefinition);
}
return routeDefinition;
});
}
return Mono.just(routeDefinition);
});
}
}
this.delegates
为生成Bean时传入。
@Bean
@Primary
public RouteDefinitionLocator routeDefinitionLocator(List<RouteDefinitionLocator> routeDefinitionLocators) {
return new CompositeRouteDefinitionLocator(Flux.fromIterable(routeDefinitionLocators));
}
而RouteDefinitionLocator
的实现类就是上面提到的【如何自定义网关的route扫描】的实现类,这样流程就串起来了。
流程总结:
- 网关中被标记
@RefreshScope
的Bean的刷新方式和微服务一致 - Routes在刷新时会通过事件触发路由信息重新从最新配置中获取,获取完后调用setter方法将新的属性设置到原
GatewayProperties
类上,但GatewayProperties
本身不会被重新创建,如果有自定义的XXGatewayProperties
,也会执行此流程更新属性 - 更新完所有
GatewayProperties
的属性后,事件触发路由刷新流程,将所有GatewayProperties
中的信息重新获取并转换成真正Route并保存起来,完成Route刷新流程