spring boot 源码解析50-Exporter详解

 2019-10-17 22:06  阅读(1405)
文章分类:Spring boot

前言

本文我们来分析一下Exporter的相关实现,类图如下:

20191017100440\_1.png

解析

Exporter

Exporter–>metric 暴露的通用接口.当你横向扩展度量的范围时,你可能会需要在本地缓冲测量数据然后周期性的暴露出去(比如:聚合集群的数据),因此,这是这些操作的标记结构.触发export操作的可能是周期性或者是事件驱动–>这些都处于Exporter的职责范围之外.比如,你可以创建Exporter的实例,并且使用@Scheduled注解.

其只声明了1个方法,如下:

void export();

AbstractMetricExporter

AbstractMetricExporter–> Exporter的抽象基类,拥有很多共有的特征,主要是导出测量数据被加上前缀和时间戳过滤(因此只有新值被暴露出去)

模板方法模式

  1. 该类实现了Exporter, Closeable, Flushable接口.

  2. 字段,构造器如下:

    private static final Log logger = LogFactory.getLog(AbstractMetricExporter.class);
    
        private final String prefix;
    
        // 数据导出的最早时间
        private Date earliestTimestamp = new Date();
    
        // 是否忽略时间戳(这将导致导出所有的测量数据),默认为false
        private boolean ignoreTimestamps = false;
    
        // 是否只导自上次导出后的新增数据,默认为true
        private boolean sendLatest = true;
    
        // 正在处理中的标记
        private volatile AtomicBoolean processing = new AtomicBoolean(false);
    
        // 上次导出数据的时间戳
        private Date latestTimestamp = new Date(0L);
    
        public AbstractMetricExporter(String prefix) {
            this.prefix = (!StringUtils.hasText(prefix) ? ""
                    : (prefix.endsWith(".") ? prefix : prefix + "."));
        }
    
  3. 方法实现如下:

    1. export,方法如下:

      public void export() {
              // 1. 将processing置为true
              if (this.processing.compareAndSet(false, true)) {
                  // 2. 获得当前时间
                  long latestTimestamp = System.currentTimeMillis();
                  try {
                      // 3. 导出数据
                      exportGroups();
                  }
                  catch (Exception ex) {
                      logger.warn("Could not write to MetricWriter: " + ex.getClass() + ": "
                              + ex.getMessage());
                  }
                  finally {
                      // 4. 调用flushQuietly方法,刷新数据,并将processing置为false
                      this.latestTimestamp = new Date(latestTimestamp);
                      flushQuietly();
                      this.processing.set(false);
                  }
              }
          }
      
      1. 将processing置为true

      2. 获得当前时间

      3. 导出数据,代码如下:

        private void exportGroups() {
                // 1. 遍历分组:
                for (String group : groups()) {
                    Collection> values = new ArrayList>();
                    // 2. 获得属于该分组的metrics,依次处理
                    for (Metric metric : next(group)) {
                        Date timestamp = metric.getTimestamp();
                        // 2.1. 如果可以导出的化,则加入到values中
                        if (canExportTimestamp(timestamp)) {
                            values.add(getPrefixedMetric(metric));
                        }
                    }
                    // 3. 如果values非空,则写出
                    if (!values.isEmpty()) {
                        write(group, values);
                    }
                }
            }
        
        1. 遍历分组–>将metrics 进行分组(比如加上前缀).如果要导出的数据要进行分组通过String 作为标识符,子类应该覆写此方法.否则默认会迭代所有的metrics.代码如下:

          protected Iterable groups() {
                  return Collections.singleton("");
              }
          
        2. 获得属于该分组的metrics,依次处理,next是1个抽象方法,由子类实现

        3. 如果可以导出的化,则加入到values中,判断是否可以导出的代码如下:

          private boolean canExportTimestamp(Date timestamp) {
                  if (this.ignoreTimestamps) {
                      return true;
                  }
                  if (this.earliestTimestamp.after(timestamp)) {
                      return false;
                  }
                  if (this.sendLatest && this.latestTimestamp.after(timestamp)) {
                      return false;
                  }
                  return true;
              }
          

          getPrefixedMetric –> 生成Metric,其name是加上前缀的.代码如下:

          private Metric getPrefixedMetric(Metric metric) {
                  String name = this.prefix + metric.getName();
                  return new Metric(name, metric.getValue(), metric.getTimestamp());
              }
          
        4. 如果values非空,则写出,write是1个抽象方法,由子类实现

      4. 调用flushQuietly方法,刷新数据,并将processing置为false.flushQuietly方法如下:

        private void flushQuietly() {
                try {
                    flush();
                }
                catch (Exception ex) {
                    logger.warn("Could not flush MetricWriter: " + ex.getClass() + ": "
                            + ex.getMessage());
                }
            }
        

        调用flush方法–>通过向底层数据流写入任何被缓冲的数据.默认空实现,只有MetricCopyExporter进行了复写. 代码如下:

        public void flush() {
            }
        
    2. close–>Closeable中声明的方法.实现如下:

      public void close() throws IOException {
              export();
              flushQuietly();
          }
      
    3. flush –> Flushable接口中的方法,之前有叙述.

MetricCopyExporter

MetricCopyExporter–> 继承自AbstractMetricExporter.通过从MetricReader copy数据到MetricWriter.**实际上output writer 可以是GaugeWriter,在这种情况下,所有的metrics 都是简单的输出他们的当前值.如果output writer 是CounterWriter,则当metrics 的名字是以counter.开头的话,则:不会将他们写出而是会增长它的计数器的值.**这涉及到exporter 存储之前的计数器的值以计算delta.为了更好的结果,不要在多线程环境下使用exporter(通常只能用周期性的顺序,即使只有1个后台程序,也是这样)

该类没有自动装配

  1. 字段,构造器如下:

    private static final Log logger = LogFactory.getLog(MetricCopyExporter.class);
    
        private final MetricReader reader;
    
        private final GaugeWriter writer;
    
        private final CounterWriter counter;
    
        // key--> counter 名字,value--> counter 值 ,用来缓存之前Counter的值
        private ConcurrentMap counts = new ConcurrentHashMap();
    
        // 过滤metrics使用(include)
        private String[] includes = new String[0];
    
        // 过滤metrics 使用(exclude)
        private String[] excludes = new String[0];
    
        public MetricCopyExporter(MetricReader reader, GaugeWriter writer) {
            this(reader, writer, "");
        }
    
        public MetricCopyExporter(MetricReader reader, GaugeWriter writer, String prefix) {
            super(prefix);
            this.reader = reader;
            this.writer = writer;
            if (writer instanceof CounterWriter) {
                this.counter = (CounterWriter) writer;
            }
            else {
                this.counter = null;
            }
        }
    
  2. 方法实现如下:

    1. next,代码如下:

      protected Iterable> next(String group) {
              // 1. 如果includes,excludes 等于空,则直接查询所有的
              if (ObjectUtils.isEmpty(this.includes) && ObjectUtils.isEmpty(this.excludes)) {
                  return this.reader.findAll();
              }
              // 2. 否则返回PatternMatchingIterable
              return new PatternMatchingIterable(MetricCopyExporter.this.reader);
          }
      
      1. 如果includes,excludes 等于空,则直接查询所有的

      2. 否则返回PatternMatchingIterable.PatternMatchingIterable实现了Iterable接口,代码如下:

        private class PatternMatchingIterable implements Iterable> {
        
                private final MetricReader reader;
        
                PatternMatchingIterable(MetricReader reader) {
                    this.reader = reader;
                }
        
                @Override
                public Iterator> iterator() {
                    return new PatternMatchingIterator(this.reader.findAll().iterator());
                }
        
            }
        

        其iterator最终返回的是PatternMatchingIterator,其实现了Iterator接口.

        1. 字段,构造器如下:

          private Metric buffer = null;
              private Iterator> iterator;
              PatternMatchingIterator(Iterator> iterator) {
                  this.iterator = iterator;
              }
          
        2. 方法实现如下:

          1. hasNext,代码如下:

            public boolean hasNext() {
                    if (this.buffer != null) {
                        return true;
                    }
                    this.buffer = findNext();
                    return this.buffer != null;
                }
            
            1. 如果buffer存在,则返回true

            2. 否则,调用findNext 查找符合要求的Metric,如果找到的话,则返回true,否则,返回false.findNext代码如下:

              private Metric findNext() {
                          while (this.iterator.hasNext()) {
                              Metric metric = this.iterator.next();
                              if (isMatch(metric)) {
                                  return metric;
                              }
                          }
                          return null;
                      }
              

              遍历MetricReader中的Metric,如果其符合要求,则返回Metric,如果最终都没有找到的话,则返回null.isMatch方法实现如下:

              private boolean isMatch(Metric metric) {
                      String[] includes = MetricCopyExporter.this.includes;
                      String[] excludes = MetricCopyExporter.this.excludes;
                      // 1. 获得metric的名字
                      String name = metric.getName();
                      // 2. 如果includes等于null或者name 匹配 includes
                      if (ObjectUtils.isEmpty(includes)
                              || PatternMatchUtils.simpleMatch(includes, name)) {
                          // 3. 如果excludes 匹配 name,则返回false,否则,返回true
                          return !PatternMatchUtils.simpleMatch(excludes, name);
                      }
                      // 4. 否则,返回false
                      return false;
                  }
              
              1. 获得metric的名字

              2. 如果includes等于null或者name 匹配 includes

                1. 如果excludes 匹配 name,则返回false,否则,返回true
              3. 否则,返回false

          2. next 实现如下:

            public Metric next() {
                    Metric metric = this.buffer;
                    this.buffer = null;
                    return metric;
                }
            
    2. write,代码如下:

      protected void write(String group, Collection> values) {
              for (Metric value : values) {
                  if (value.getName().startsWith("counter.") && this.counter != null) {
                      this.counter.increment(calculateDelta(value));
                  }
                  else {
                      this.writer.set(value);
                  }
              }
          }
      
      1. 遍历传入的values

      2. 如果是counter.开头的并且counter存在,则进行increment操作,否则调用GaugeWriter#set. calculateDelta 方法如下:

        private Delta calculateDelta(Metric value) {
                // 1. 获得当前的值
                long delta = value.getValue().longValue();
                // 2. 进行counts的替换操作
                Long old = this.counts.replace(value.getName(), delta);
                // 3. 如果之前存在对应的值,则进行计算获得增幅,否则,加入到counts中
                if (old != null) {
                    delta = delta - old;
                }
                else {
                    this.counts.putIfAbsent(value.getName(), delta);
                }
                // 4. 返回Delta
                return new Delta(value.getName(), delta, value.getTimestamp());
            }
        
        1. 获得当前的值
        2. 进行counts的替换操作
        3. 如果之前存在对应的值,则进行计算获得增幅,否则,加入到counts中
        4. 返回Delta
    3. flush,代码如下:

      public void flush() {
              flush(this.writer);
          }
      

      调用

      private void flush(GaugeWriter writer) { // 1. 如果是CompositeMetricWriter的实例,则遍历其持有的MetricWriter,依次调用该方法进行处理 if (writer instanceof CompositeMetricWriter) { for (MetricWriter child : (CompositeMetricWriter) writer) { flush(child); }
              }
              try { // 2. 如果存在java.io.Flushable if (ClassUtils.isPresent("java.io.Flushable", null)) { if (writer instanceof Flushable) { ((Flushable) writer).flush(); return; }
                  }
                  // 3. 如果writer存在flush方法
                  Method method = ReflectionUtils.findMethod(writer.getClass(), "flush");
                  if (method != null) { ReflectionUtils.invokeMethod(method, writer); } } catch (Exception ex) { logger.warn("Could not flush MetricWriter: " + ex.getClass() + ": " + ex.getMessage()); } }
      
      1. 如果是CompositeMetricWriter的实例,则遍历其持有的MetricWriter,依次调用该方法进行处理
      2. 如果存在java.io.Flushable,并且GaugeWriter实现了Flushable接口,则调用flush方法,然后retrun
      3. 如果writer存在flush方法,则通过反射的方式进行调用

PrefixMetricGroupExporter

PrefixMetricGroupExporter–> 1个对于从PrefixMetricReader获得metrics进行分组的实现.导出所有拥有指定前缀的metrics(或者是所有的metrics,如果前缀不存在的话)

注意,该类没有进行自动装配

  1. 字段,构造器如下:

    private final PrefixMetricReader reader;
    
        private final PrefixMetricWriter writer;
    
        // key--> counter 名字,value--> counter 值,用来缓存之前Counter的值
        private ConcurrentMap counts = new ConcurrentHashMap();
    
        private Set groups = new HashSet();
    
        public PrefixMetricGroupExporter(PrefixMetricReader reader,
                PrefixMetricWriter writer) {
            this(reader, writer, "");
        }
    
        public PrefixMetricGroupExporter(PrefixMetricReader reader, PrefixMetricWriter writer,
                String prefix) {
            super(prefix);
            this.reader = reader;
            this.writer = writer;
        }
    
  2. 方法实现如下:

    1. groups–> 如果持有的PrefixMetricReader是MultiMetricRepository的实例并且持有的groups为空,则最有返回的是MultiMetricRepository中的groups.否则,返回groups.代码如下:

      protected Iterable groups() {
              if ((this.reader instanceof MultiMetricRepository) && this.groups.isEmpty()) {
                  return ((MultiMetricRepository) this.reader).groups();
              }
              return this.groups;
          }
      
    2. next,代码如下:

      protected Iterable> next(String group) {
              return this.reader.findAll(group);
          }
      
    3. write,代码如下:

      protected void write(String group, Collection> values) {
              // 1. 如果组名是counter. 开头的,则调用PrefixMetricWriter#increment 进行处理
              if (group.contains("counter.")) {
                  for (Metric value : values) {
                      this.writer.increment(group, calculateDelta(value));
                  }
              }
              else {
                  // 2. 否则,调用PrefixMetricWriter#set
                  this.writer.set(group, values);
              }
          }
      
      1. 如果组名是counter. 开头的,则调用PrefixMetricWriter#increment 进行处理,calculateDelta的实现和MetricCopyExporter中类似,如下:

        private Delta calculateDelta(Metric value) {
                long delta = value.getValue().longValue();
                Long old = this.counts.replace(value.getName(), delta);
                if (old != null) {
                    delta = delta - old;
                }
                else {
                    this.counts.putIfAbsent(value.getName(), delta);
                }
                return new Delta(value.getName(), delta, value.getTimestamp());
            }
        
      2. 否则,调用PrefixMetricWriter#set

RichGaugeExporter

RichGaugeExporter–> 继承自AbstractMetricExporter.导出或者转换RichGauge 的数据到以metric为基础的后端.每1个指标都存储在1系列的具有共同前缀的指标中(gauge 的名字),并且后缀表明了数据.比如:1个名字叫foo的gauge,被存储在foo.min, foo.max. foo.val, foo.count, foo.avg, foo.alpha.如果MetricWriter是通过MultiMetricRepository 实现的,则他们的值会被存储到1个组中.并因此可以从这个仓库中通过查询1个语句获得结果

注意,该类没有自动装配

  1. 字段,构造器如下:

    private static final String MIN = ".min";
    
        private static final String MAX = ".max";
    
        private static final String COUNT = ".count";
    
        private static final String VALUE = ".val";
    
        private static final String AVG = ".avg";
    
        private static final String ALPHA = ".alpha";
    
        private final RichGaugeReader reader;
    
        private final PrefixMetricWriter writer;
    
        public RichGaugeExporter(RichGaugeReader reader, PrefixMetricWriter writer) {
            this(reader, writer, "");
        }
    
        public RichGaugeExporter(RichGaugeReader reader, PrefixMetricWriter writer,
                String prefix) {
            super(prefix);
            this.reader = reader;
            this.writer = writer;
        }
    
  2. 方法实现如下:

    1. next,代码如下:

      protected Iterable> next(String group) {
              // 1. 从RichGaugeReader中查找group对应的RichGauge
              RichGauge rich = this.reader.findOne(group);
              Collection> metrics = new ArrayList>();
              // 2. 依次添加RichGauge的指标到结果集中
              metrics.add(new Metric(group + MIN, rich.getMin()));
              metrics.add(new Metric(group + MAX, rich.getMax()));
              metrics.add(new Metric(group + COUNT, rich.getCount()));
              metrics.add(new Metric(group + VALUE, rich.getValue()));
              metrics.add(new Metric(group + AVG, rich.getAverage()));
              metrics.add(new Metric(group + ALPHA, rich.getAlpha()));
              return metrics;
          }
      
      1. 从RichGaugeReader中查找group对应的RichGauge
      2. 依次添加RichGauge的指标到结果集中
    2. groups–>从RichGaugeReader中获得所有的RichGauge,然后依次添加其名字到结果集中.代码如下:

      protected Iterable groups() {
              Collection names = new HashSet();
              for (RichGauge rich : this.reader.findAll()) {
                  names.add(rich.getName());
              }
              return names;
          }
      
    3. write,代码如下:

      protected void write(String group, Collection> values) {
              this.writer.set(group, values);
          }
      

来源:[]()

点赞(0)
版权归原创作者所有,任何形式转载请联系作者; Java 技术驿站 >> spring boot 源码解析50-Exporter详解

相关推荐