Spring Cloud 源码学习之 Hystrix Metrics 收集

 2019-11-23 11:09  阅读(1033)
文章分类:Spring Cloud

欢迎访问陈同学博客原文

文中源码基于 Spring Cloud Finchley.SR1、Spring Boot 2.0.6.RELEASE.

Hystrix 其他文章:Spring Cloud 源码学习之 Hystrix 入门Spring Cloud 源码学习之 Hystrix 工作原理Spring Cloud 之 Hystrix 跨线程传递数据

在 Hystrix Command 执行过程(开始执行、结束执行、异常、超时)时会不断发出各类事件,通过收集这些数据,提供给消费者。如断路器、Hystrix Dashboard可以统计分析这些数据,从而完成特定的功能。

本文学习了 Metrics 收集的源码,并整理成下图。由于 Hystrix 发出的事件种类很多,本文仅以命令结束执行事件作为学习实例。

20191123100239\_1.png

Subject简述

Hystrix 基于 RxJava,本文涉及到 Subject 概念,这里提一下 rx.subjects.Subject

public abstract class Subject<T, R> extends Observable<R> implements Observer<T> {}

Subject 继承Observable,因此可作为被观察者、数据源,也就是一个数据发射器;

实现了接口 Observer,因此可作为观察者,可以订阅其他Observable,处理Observable发射出的数据。

因此,Subject既可以发射数据,也可以接收数据。类比于菜鸟驿站,可以收、发快递

Metrics 收集流程

整个过程分成以下三步:

1.使用HystrixCommandMetrics记录metrics

每个Command的构造器中会获取一个HystrixCommandMetrics工具,用来记录metrics。

// 构造器利用HystrixCommandMetrics获取命令key对应的对象
    HystrixCommandMetrics.getInstance(commandKey, groupKey, threadPoolKey, properties);
    // HystrixCommandMetrics 中存储HystrixCommandMetrics的数据结构
    private static final ConcurrentHashMap<String, HystrixCommandMetrics> metrics;

也就是说,每个CommandKey会拥有一个对应的HystrixCommandMetrics工具。

例如:A服务利用Feign远程调用B服务,那下面的 service-B 会自动作为命令的key。

@FeignClient(name = "service-B")

下面是利用HystrixCommandMetrics工具发射 标记命令结束 的事件代码:

void markCommandDone(...) {
        HystrixThreadEventStream.getInstance().executionDone(...);
    }

2.Per-Thread 事件处理者

HystrixCommandMetrics提供了基础工具方法给Command使用,而HystrixCommandMetrics的实际使用的是HystrixThreadEventStream: Per-thread event stream

它是线程级别的数据处理者,每个线程拥有自己的HystrixThreadEventStream,HystrixThreadEventStream.getInstance() 是从ThreadLocal中获取对象。

它包含了很多Subject<事件,事件>,用来接收和发射数据。下面是HystrixThreadEventStream类:

public class HystrixThreadEventStream {
        // Per-thread 的HystrixThreadEventStream
        private static final ThreadLocal<HystrixThreadEventStream> threadLocalStreams;
        // 用来接收和发射HystrixCommandCompletion事件的Subject
        private final Subject<HystrixCommandCompletion, HystrixCommandCompletion> writeOnlyCommandCompletionSubject;
    }

HystrixCommandCompletion是事件(HystrixCommandEvent)的一种,writeOnlyCommandCompletionSubject这个Subject的初始化方式如下:

// 创建为一个数据发射器
    writeOnlyCommandCompletionSubject = PublishSubject.create();
    writeOnlyCommandCompletionSubject
            .onBackpressureBuffer()
            // 绑定发射数据时的处理者
            .doOnNext(writeCommandCompletionsToShardedStreams)
            .unsafeSubscribe(Subscribers.empty());

writeCommandCompletionsToShardedStreams会怎么处理数据呢?下面是它的定义:

// 它是一个可执行的实体,没有返回值,可以传入一个参数; 和 Runnable很像
    private static final Action1<HystrixCommandCompletion> writeCommandCompletionsToShardedStreams = new Action1<HystrixCommandCompletion>() {
        // 当接收到数据时, 又将数据发送给了command级别的处理者
        @Override
        public void call(HystrixCommandCompletion commandCompletion) {
            // 获取CommandKey对应的HystrixCommandCompletionStream
            HystrixCommandCompletionStream commandStream = HystrixCommandCompletionStream.getInstance(commandCompletion.getCommandKey());
            // 写入数据
            commandStream.write(commandCompletion);
            ...
        }
    };

现在再回过来看HystrixThreadEventStream这个Per-thread的工具发射 标记命令结束事件 的代码:

public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) {
        // 构建命令结束的数据对象
        HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey);
        // 利用上面的Subject发射数据, onNext()就是发射一条数据。
        writeOnlyCommandCompletionSubject.onNext(event);
    }

由于writeOnlyCommandCompletionSubject绑定了数据处理者(上面的writeCommandCompletionsToShardedStreams这个Action1)。它会利用command级别的工具来发射数据。

3.Per-Command 事件处理者

通过上一步知道,每个线程有自己的工具(HystrixThreadEventStream)来处理数据,最终这个工具利用了命令级别的工具。上面的HystrixCommandCompletionStream 属于 HystrixEventStream 的一种,HystrixEventStream专门用于处理command级别的数据,它有如下几个子类:

HystrixCommandCompletionStream
    HystrixCommandStartStream
    HystrixThreadPoolCompletionStream
    HystrixThreadPoolStartStream
    HystrixCollapserEventStream

这几个子类都是用来处理特定类型事件的工具,以HystrixCommandCompletionStream为例子,这些子类的结构都很类似,可以接收数据,并将数据提供给其他消费者。

public class HystrixCommandCompletionStream {
        // 一个用于接收和发射结束事件的Subject
        private final Subject<HystrixCommandCompletion, HystrixCommandCompletion> writeOnlySubject;
        // 一个Observable,将接收到的数据作为数据源发射给其他消费者
        private final Observable<HystrixCommandCompletion> readOnlyStream;
    }

先看看这个Per-Command 的对象是怎么创建的?

// 存储结构
    private static final ConcurrentMap<String, HystrixCommandCompletionStream> streams = new ConcurrentHashMap<String, HystrixCommandCompletionStream>();

    // 单例模式拿到HystrixCommandCompletionStream,以命令的key为索引存储在ConcurrentMap中
    public static HystrixCommandCompletionStream getInstance(HystrixCommandKey commandKey) {
        HystrixCommandCompletionStream initialStream = streams.get(commandKey.name());
        if (initialStream != null) {
            return initialStream;
        } else {
            synchronized (HystrixCommandCompletionStream.class) {
                ...
            }
        }
    }

下面是它的构造函数:

HystrixCommandCompletionStream(final HystrixCommandKey commandKey) {
        this.commandKey = commandKey;
        // 创建可以发射数据的Subject
        this.writeOnlySubject = new SerializedSubject<HystrixCommandCompletion, HystrixCommandCompletion>(PublishSubject.<HystrixCommandCompletion>create());
        // readOnlyStream是一个Observable, share()方法可以将上面Subject发射的数据全部广播给readOnlyStream,相当于拷贝了一份一模一样的数据
        this.readOnlyStream = writeOnlySubject.share();
    }

这个类提供了很重要的两个方法:

// 提供了接收数据的方法,其他工具(如HystrixThreadEventStream)可以将数据写进来
    public void write(HystrixCommandCompletion event) {
        writeOnlySubject.onNext(event);
    }

    // 实现HystrixEventStream的observe(方法), 其他消费者可以利用observe()拿到这个数据源,然后订阅它,处理它发射的所有数据
    @Override
    public Observable<HystrixCommandCompletion> observe() {
        return readOnlyStream;
    }

小结

通过上面三步,数据流向就很清楚了:

  • Command直接使用HystrixCommandMetrics来记录命令开始、结束等事件
  • HystrixCommandMetrics利用线程级别的HystrixThreadEventStream的来接收数据
  • HystrixThreadEventStream完成各种事件的封装(如将结束事件封装成HystrixCommandCompletion),再利用command级别的HystrixEventStream来接收数据(HystrixEventStream有不同的子类来处理不同的事件)
  • 最终消费者通过HystrixEventStream的observe()方法,拿到这个数据源,然后订阅它,从而源源不断的拿到Command发射出的各种数据

谁在最终消费数据?

通过上述步骤,将Hystrix Command执行过程的各种信息转化成了特定数据结构的事件,然后提供了一个Observable作为数据源。如果需要使用这些数据,各观察者只需要订阅Observable就可以拿到这些已经分门别类且结构化的数据了。

例如:断路器就是利用这些信息,然后统计分析数据,最终提供断路器的功能。

本文不深入断路器,仅关注各项事件的收集过程中的数据流向。下一遍文章将分享断路器是如何利用这些基础数据,如何使用滑动窗口的原理来处理数据,感兴趣可以关注奥。

附录

HystrixEvent

HystrixEvent是一个事件标记接口,其子类都是些特定数据结构的数据对象。像HystrixThreadEventStream会封装这个事件。

20191123100239\_2.png

HystrixEventStream

HystrixEventStream各子类提供了write()方法供其他对象写入HystrixEvent,然后再提供observe()方法,供其他消费者来消费这些数据。

20191123100239\_3.png

欢迎关注陈同学的公众号,一起学习,一起成长

20191123100239\_4.png


来源:http://ddrv.cn/a/88268

点赞(0)
版权归原创作者所有,任何形式转载请联系作者; Java 技术驿站 >> Spring Cloud 源码学习之 Hystrix Metrics 收集

相关推荐