file 还记得这个我们zipkin系列的第一篇文章中提到过的架构图么,服务端组件就这么几个,很简单。

其中稍微有点内涵的也就是collector和storage分别提供了多种不同的实现,collector支持http、rabbitMq、kafka而storage支持内存、mysql、es

现在我们来了解一下服务端的实现。服务端开启自动装配使用的是@EnableZipkinServer注解

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(InternalZipkinConfiguration.class)
public @interface EnableZipkinServer {

}
1
2
3
4
5
6
7

接着深入InternalZipkinConfiguration这个类

@Configuration
@Import({
  ZipkinServerConfiguration.class,
  TracingConfiguration.class,
  ZipkinQueryApiV2.class,
  ZipkinHttpCollector.class,
  MetricsHealthController.class
})
public class InternalZipkinConfiguration {
}
1
2
3
4
5
6
7
8
9
10

可以看到这个类一共又引入了这么多的装配类,一个一个来分析吧

ZipkinServerConfiguration

这个类中,主要是配置了Zipkin的健康检查、web容器以及、默认的存储的方式的配置

其中健康检查的bean包括以下几个:

  1. ZipkinHealthIndicator 关于SpringBoot中健康检查的原理可以参考这篇文章:SpringBoot健康检查实现原理
  2. 健康检查的端口在这个类里MetricsHealthController,其中暴露了两个端口health和metrics

Web容器主要是由这个类负责的:UndertowServletWebServerFactory

InMemoryConfiguration 这个类则是存储的自动装配类,如果没有选择使用MySQL或者ES的话则会使用内存进行存储

TracingConfiguration

Zipkin服务端其实也会存储trace信息,这个时候就需要一些Client的配置。这个类里面就是这些配置 这里就不详细展开了

ZipkinQueryApiV2

Zipkin V2版本的查询API,这个API是供Zipkin的UI界面使用的,其中包含这几个接口

  1. dependencies
  2. services
  3. spans
  4. traces
  5. /trace/{traceIdHex}
ZipkinHttpCollector

Zipkin默认的Collector使用http协议里收集Trace信息,这里就是本文的重点

核心接受请求的方法是handleRequest

  public void handleRequest(HttpServerExchange exchange) throws Exception {
    boolean v2 = exchange.getRelativePath().equals("/api/v2/spans");
    boolean v1 = !v2 && exchange.getRelativePath().equals("/api/v1/spans");
    if (!v2 && !v1) {
      next.handleRequest(exchange);
      return;
    }

    if (!POST.equals(exchange.getRequestMethod())) {
      next.handleRequest(exchange);
      return;
    }

    String contentTypeValue = exchange.getRequestHeaders().getFirst(CONTENT_TYPE);
    boolean json = contentTypeValue == null || contentTypeValue.startsWith("application/json");
    boolean thrift = !json && contentTypeValue.startsWith("application/x-thrift");
    boolean proto = v2 && !json && contentTypeValue.startsWith("application/x-protobuf");
    if (!json && !thrift && !proto) {
      exchange
          .setStatusCode(400)
          .getResponseSender()
          .send("unsupported content type " + contentTypeValue + "\n");
      return;
    }

    HttpCollector collector = v2 ? (json ? JSON_V2 : PROTO3) : thrift ? THRIFT : JSON_V1;
    metrics.incrementMessages();
    exchange.getRequestReceiver().receiveFullBytes(collector, errorCallback);
  }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

这里一共有这么几个逻辑:

  1. 首先判断接口版本,现在是v2版本
  2. 然后判断数据格式,现在是json版本
  3. 这里的collector的格式就是JSON_V2
  4. 添加监控数据
  5. 数据处理

默认情况下使用的是异步方式处理数据的,这个处理的实现类是这个AsyncReceiverImplreceiveFullBytes方法中首先会对这次请求的参数进行处理,处理完成之后的json格式如下:

[{
	"traceId": "e840c83e65d10358",
	"id": "e840c83e65d10358",
	"kind": "SERVER",
	"name": "get /user/getuser/{id}",
	"timestamp": 1574174031876439,
	"duration": 5911123,
	"localEndpoint": {
		"serviceName": "consumer-demo-feign",
		"ipv4": "192.168.0.15"
	},
	"remoteEndpoint": {
		"ipv6": "::1",
		"port": 60716
	},
	"tags": {
		"http.method": "GET",
		"http.path": "/user/getUser/3",
		"mvc.controller.class": "UserController",
		"mvc.controller.method": "getUser"
	}
}]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

当json数据格式化之后,接下来就是具体的处理

    public synchronized Call<Void> accept(List<Span> spans) {
        int delta = spans.size();
        int spansToRecover = this.spansByTraceIdTimeStamp.size() + delta - this.maxSpanCount;
        this.evictToRecoverSpans(spansToRecover);
        Iterator var4 = spans.iterator();

        while(var4.hasNext()) {
            Span span = (Span)var4.next();
            long timestamp = span.timestampAsLong();
            String lowTraceId = lowTraceId(span.traceId());
            InMemoryStorage.TraceIdTimestamp traceIdTimeStamp = new InMemoryStorage.TraceIdTimestamp(lowTraceId, timestamp);
            this.spansByTraceIdTimeStamp.put(traceIdTimeStamp, span);
            this.traceIdToTraceIdTimeStamps.put(lowTraceId, traceIdTimeStamp);
            ++this.acceptedSpanCount;
            if (this.searchEnabled) {
                String spanName = span.name();
                if (span.localServiceName() != null) {
                    this.serviceToTraceIds.put(span.localServiceName(), lowTraceId);
                    if (spanName != null) {
                        this.serviceToSpanNames.put(span.localServiceName(), spanName);
                    }
                }

                if (span.remoteServiceName() != null) {
                    this.serviceToTraceIds.put(span.remoteServiceName(), lowTraceId);
                    if (spanName != null) {
                        this.serviceToSpanNames.put(span.remoteServiceName(), spanName);
                    }
                }
            }
        }

        return Call.create((Object)null);
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

实际上,最终的span数据都是在这么几个对象中存在的

    private final InMemoryStorage.SortedMultimap<InMemoryStorage.TraceIdTimestamp, Span> spansByTraceIdTimeStamp;
    private final InMemoryStorage.SortedMultimap<String, InMemoryStorage.TraceIdTimestamp> traceIdToTraceIdTimeStamps;
    private final InMemoryStorage.ServiceNameToTraceIds serviceToTraceIds;
    private final InMemoryStorage.SortedMultimap<String, String> serviceToSpanNames;
1
2
3
4

1

目录