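Overview: this article describes how the Sentinel Client and the Sentinel Dashboard communicate, and how Sentinel performs one round of metric collection and rule checking (implemented through the slotChain chain of responsibility).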
1 RPC
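1.1 Client
1.1.1 RPC Server of the Sentinel Client
SimpleHttpCommandCenter starts a small HTTP server inside the client process.
1. Initialization
Collect every object that implements CommandHandler and is annotated with @CommandMapping.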
    // SimpleHttpCommandCenter
    public void beforeStart() throws Exception {
        // Register handlers
        Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
        registerCommands(handlers);
    }
2. Start the server socket, using blocking I/O (BIO)
    class ServerThread extends Thread {

        private ServerSocket serverSocket;

        ServerThread(ServerSocket s) {
            this.serverSocket = s;
            setName("sentinel-courier-server-accept-thread");
        }

        @Override
        public void run() {
            while (true) {
                Socket socket = null;
                try {
                    socket = this.serverSocket.accept();
                    setSocketSoTimeout(socket);
                    HttpEventTask eventTask = new HttpEventTask(socket);
                    bizExecutor.submit(eventTask);
                } catch (Exception e) {
                    ....
                }
            }
        }
    }
The execution logic of HttpEventTask is as follows:
    // HttpEventTask
    @Override
    public void run() {
        if (socket == null) {
            return;
        }

        PrintWriter printWriter = null;
        InputStream inputStream = null;
        try {
            long start = System.currentTimeMillis();
            inputStream = new BufferedInputStream(socket.getInputStream());
            OutputStream outputStream = socket.getOutputStream();

            printWriter = new PrintWriter(
                new OutputStreamWriter(outputStream, Charset.forName(SentinelConfig.charset())));

            // 1. Deserialize the byte stream into a CommandRequest.
            String firstLine = readLine(inputStream);
            CommandRequest request = processQueryString(firstLine);

            // 2. For a POST request, also read the request body.
            if (firstLine.length() > 4 && StringUtil.equalsIgnoreCase("POST", firstLine.substring(0, 4))) {
                // Deal with post method
                processPostRequest(inputStream, request);
            }

            // 3. Take the command name from the request and look up the matching CommandHandler.
            String commandName = HttpCommandUtils.getTarget(request);
            CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName);
            CommandResponse<?> response = commandHandler.handle(request);

            // 4. Serialize the result and assemble it into the bytes of an HTTP response.
            handleResponse(response, printWriter);
        }
        // (catch/finally blocks are omitted in this excerpt)
    }
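To make step 1 more concrete, here is a minimal sketch of turning an HTTP request line into a command name and its parameters. The class RequestLineParser and both of its methods are hypothetical names chosen for illustration; this is not Sentinel's processQueryString, and URL decoding and error handling are left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, for illustration only.
final class RequestLineParser {

    /** Returns the request path without the leading slash, e.g. "metric". */
    static String commandName(String requestLine) {
        // "GET /metric?a=1 HTTP/1.1" -> "/metric?a=1"
        String target = requestLine.split(" ")[1];
        int q = target.indexOf('?');
        String path = q >= 0 ? target.substring(0, q) : target;
        return path.startsWith("/") ? path.substring(1) : path;
    }

    /** Returns the query-string parameters in the order they appear. */
    static Map<String, String> parameters(String requestLine) {
        Map<String, String> params = new LinkedHashMap<>();
        String target = requestLine.split(" ")[1];
        int q = target.indexOf('?');
        if (q < 0) {
            return params;
        }
        for (String pair : target.substring(q + 1).split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return params;
    }
}
```

The command name obtained this way is what a handler registry would be keyed on.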
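1.1.2 RPC Client for the Sentinel Dashboard
The client defines a SimpleHttpClient. The code below opens a new socket for every request; since it is only used to send heartbeats to the dashboard, performance is not a concern.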
    // SimpleHttpClient
    private SimpleHttpResponse request(InetSocketAddress socketAddress,
                                       RequestMethod type, String requestPath,
                                       Map<String, String> paramsMap, Charset charset, int soTimeout)
        throws IOException {
        Socket socket = null;
        BufferedWriter writer;
        try {
            socket = new Socket();
            socket.setSoTimeout(soTimeout);
            socket.connect(socketAddress, soTimeout);

            // 1. Send the request.
            writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), charset));
            requestPath = getRequestPath(type, requestPath, paramsMap, charset);
            writer.write(getStatusLine(type, requestPath) + "\r\n");
            if (charset != null) {
                writer.write("Content-Type: application/x-www-form-urlencoded; charset=" + charset.name() + "\r\n");
            } else {
                writer.write("Content-Type: application/x-www-form-urlencoded\r\n");
            }
            writer.write("Host: " + socketAddress.getHostName() + "\r\n");
            if (type == RequestMethod.GET) {
                writer.write("Content-Length: 0\r\n");
                writer.write("\r\n");
            } else {
                // POST method.
                String params = encodeRequestParams(paramsMap, charset);
                writer.write("Content-Length: " + params.getBytes(charset).length + "\r\n");
                writer.write("\r\n");
                writer.write(params);
            }
            writer.flush();

            // 2. Read the response.
            SimpleHttpResponse response = new SimpleHttpResponseParser().parse(socket.getInputStream());
            socket.close();
            socket = null;
            return response;
        } finally {
            if (socket != null) {
                try {
                    socket.close();
                } catch (Exception ex) {
                    RecordLog.warn("Error when closing {} request to {} in SimpleHttpClient", type, socketAddress, ex);
                }
            }
        }
    }
1.2 Sentinel Dashboard
1.2.1 RPC Server of the Sentinel Dashboard
The dashboard application itself serves as the HTTP server.
1.2.2 RPC Client for the Sentinel Client
Operations on the Sentinel client are wrapped in SentinelApiClient, which uses an Apache HttpClient to call the RPC server of the Sentinel client.
    // SentinelApiClient
    // 1. Declare the HTTP client.
    private CloseableHttpAsyncClient httpClient;

    // 2. Initialize the HTTP client.
    public SentinelApiClient() {
        IOReactorConfig ioConfig = IOReactorConfig.custom()
            .setConnectTimeout(3000)
            .setSoTimeout(10000)
            .setIoThreadCount(Runtime.getRuntime().availableProcessors() * 2)
            .build();

        httpClient = HttpAsyncClients.custom()
            .setRedirectStrategy(new DefaultRedirectStrategy() {
                @Override
                protected boolean isRedirectable(final String method) {
                    return false;
                }
            })
            .setMaxConnTotal(4000)
            .setMaxConnPerRoute(1000)
            .setDefaultIOReactorConfig(ioConfig)
            .build();
        httpClient.start();
    }
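2 Communication Between the Sentinel Client and the Sentinel Dashboard
2.1 Service Registry
The client sends heartbeats to the server (the dashboard) at a 5 s interval. Note that in the code below the literal 5000 is the initial delay; the period is the interval argument.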
    // HeartbeatSenderInitFunc
    private void scheduleHeartbeatTask(/*@NonNull*/ final HeartbeatSender sender, /*@Valid*/ long interval) {
        pool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    sender.sendHeartbeat();
                } catch (Throwable e) {
                    RecordLog.warn("[HeartbeatSender] Send heartbeat error", e);
                }
            }
        }, 5000, interval, TimeUnit.MILLISECONDS);
        RecordLog.info("[HeartbeatSenderInit] HeartbeatSender started: "
            + sender.getClass().getCanonicalName());
    }
The client sends the heartbeat by calling the Sentinel Dashboard endpoint /registry/machine directly through httpClient.
    // SimpleHttpHeartbeatSender.java
    public boolean sendHeartbeat() throws Exception {
        InetSocketAddress addr = new InetSocketAddress(addrInfo.r1, addrInfo.r2);
        SimpleHttpRequest request = new SimpleHttpRequest(addr, TransportConfig.getHeartbeatApiPath());
        request.setParams(heartBeat.generateCurrentMessage());

        SimpleHttpResponse response = httpClient.post(request);
        // (handling of the response and the return value are omitted in this excerpt)
    }
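2.2 Sentinel Dashboard Fetches Instance Metrics
STEP 1: The Sentinel Dashboard uses an HTTP client to call the client instance's endpoint: http://xxxx:xx/metric?startTime=1596176146000&endTime=1596176152000&refetch=false
STEP 2: The Sentinel client exposes an RPC service that deserializes the byte stream into an HTTP request. The handler is: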
    @CommandMapping(name = "metric", desc = "get and aggregate metrics, accept param: "
        + "startTime={startTime}&endTime={endTime}&maxLines={maxLines}&identify={resourceName}")
    public class SendMetricCommandHandler implements CommandHandler<String> {
    }
The metrics are obtained by reading the metric files:
    // MetricsReader
    List<MetricNode> readMetricsByEndTime(List<String> fileNames, int pos, long offset,
                                          long beginTimeMs, long endTimeMs, String identity) throws Exception {
        List<MetricNode> list = new ArrayList<MetricNode>(1024);
        if (readMetricsInOneFileByEndTime(list, fileNames.get(pos++), offset, beginTimeMs, endTimeMs, identity)) {
            while (pos < fileNames.size()
                && readMetricsInOneFileByEndTime(list, fileNames.get(pos++), 0, beginTimeMs, endTimeMs, identity)) {
            }
        }
        return list;
    }
2.3 Sentinel Dashboard Fetches the Cluster-Node Call Tree
1. The dashboard exposes /resource/machineResource.json?ip=xxx&port=xxx
    data:[
      {
        "parentTtId": "4b50a67b-ce9e-47fe-a0ed-68894f404567000",
        "ttId": "361cac64-d450-47e8-9ddf-9c6c739781e09999",
        "resource": "sentinel_web_servlet_context",
        "threadNum": 0,
        "passQps": 0,
        "blockQps": 0,
        "totalQps": 0,
        "averageRt": 0,
        "passRequestQps": null,
        "exceptionQps": 0,
        "oneMinutePass": 0,
        "oneMinuteBlock": 0,
        "oneMinuteException": 0,
        "oneMinuteTotal": 0,
        "visible": true
      },{
        "parentTtId": "361cac64-d450-47e8-9ddf-9c6c739781e09999",
        "ttId": "44fb0f6d-ff6a-4e1b-8a2b-8642dc135ca9999",
        "resource": "/webjars/springfox-swagger-ui/css/reset.css",
        "threadNum": 0,
        "passQps": 0,
        "blockQps": 0,
        "totalQps": 0,
        "averageRt": 0,
        "passRequestQps": null,
        "exceptionQps": 0,
        "oneMinutePass": 0,
        "oneMinuteBlock": 0,
        "oneMinuteException": 0,
        "oneMinuteTotal": 0,
        "visible": true
      }]
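2. The corresponding CommandHandler on the client is shown below. It returns a List<NodeVo>, and every NodeVo carries a parent id (parentTtId in the JSON above). This is similar to a tracing implementation, where every span has a parentSpanId: given each node's id and parent id, the call chain can be drawn.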
    @CommandMapping(name = "jsonTree", desc = "get tree node VO start from root node")
    public class FetchJsonTreeCommandHandler implements CommandHandler<String> {
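Rebuilding a tree from such a flat list of (id, parentId) pairs can be sketched as follows. TreeNode and TreeBuilder are hypothetical classes written for this illustration, not the dashboard's own code; nodes whose parent does not appear in the list are treated as roots.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical node type mirroring the ttId / parentTtId / resource fields.
final class TreeNode {
    final String id;
    final String parentId;
    final String resource;
    final List<TreeNode> children = new ArrayList<>();

    TreeNode(String id, String parentId, String resource) {
        this.id = id;
        this.parentId = parentId;
        this.resource = resource;
    }
}

final class TreeBuilder {

    /** Links every node to its parent and returns the nodes that have no parent in the list. */
    static List<TreeNode> build(List<TreeNode> flat) {
        Map<String, TreeNode> byId = new HashMap<>();
        for (TreeNode node : flat) {
            byId.put(node.id, node);
        }
        List<TreeNode> roots = new ArrayList<>();
        for (TreeNode node : flat) {
            TreeNode parent = byId.get(node.parentId);
            if (parent == null) {
                roots.add(node);
            } else {
                parent.children.add(node);
            }
        }
        return roots;
    }
}
```

Two passes keep the result independent of the order in which the nodes arrive.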
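3 One Pass of Metric Collection (Based on the Slot Chain of Responsibility)
There are three entry points:
- CommonFilter
- The API, e.g. SphU.entry
- @SentinelResource
Taking CommonFilter as the example, the steps are: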
    // CommonFilter
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) {
        Entry urlEntry = null;
        try {
            // Before: enter the resource.
            urlEntry = SphU.entry(target, ResourceTypeConstants.COMMON_WEB, EntryType.IN);

            // Handle the request.
            chain.doFilter(request, response);
        } finally {
            // After: call exit on the slot chain. Only StatisticSlot overrides this method;
            // it uses it to compute the elapsed time for the metrics.
            if (urlEntry != null) {
                urlEntry.exit();
            }
        }
        // (BlockException handling is omitted in this excerpt)
    }
The flow of SphU.entry is as follows:
    private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
        // 1. Look up the chain of responsibility for this resource.
        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
        Entry e = new CtEntry(resourceWrapper, chain, context);
        // 2. Run the chain: record metrics and check rules.
        chain.entry(context, resourceWrapper, null, count, prioritized, args);
        .....
    }
Each resource has its own SlotChain. If a service exposes 200 endpoints, chainMap holds 200 entries.
    // CtSph
    private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap
        = new HashMap<ResourceWrapper, ProcessorSlotChain>();
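A volatile field holding a plain HashMap suggests a copy-on-write scheme: readers use the current map without locking, and a writer publishes a fresh copy. The sketch below illustrates that idea under this assumption; CopyOnWriteRegistry is a hypothetical class, not the CtSph source.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration of a copy-on-write lookup-or-create.
final class CopyOnWriteRegistry<K, V> {

    // The map itself is never mutated after publication; only the reference changes.
    private volatile Map<K, V> map = new HashMap<>();
    private final Object lock = new Object();

    V getOrCreate(K key, Function<K, V> factory) {
        V value = map.get(key);               // lock-free read on the hot path
        if (value == null) {
            synchronized (lock) {
                value = map.get(key);         // re-check under the lock
                if (value == null) {
                    value = factory.apply(key);
                    Map<K, V> copy = new HashMap<>(map);
                    copy.put(key, value);
                    map = copy;               // publish the new map atomically
                }
            }
        }
        return value;
    }

    int size() {
        return map.size();
    }
}
```

The trade-off is that each insertion copies the whole map, which is acceptable when entries are created rarely (once per resource) and read on every request.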
If no SlotChain exists for the resource yet, one is created through SlotChainProvider#newSlotChain, which ultimately delegates to DefaultSlotChainBuilder.
    public class DefaultSlotChainBuilder implements SlotChainBuilder {

        @Override
        public ProcessorSlotChain build() {
            ProcessorSlotChain chain = new DefaultProcessorSlotChain();
            chain.addLast(new NodeSelectorSlot());
            chain.addLast(new ClusterBuilderSlot());

            // Record metrics (logging)
            chain.addLast(new LogSlot());

            // Collect metric statistics
            chain.addLast(new StatisticSlot());

            // Rule checking
            chain.addLast(new AuthoritySlot());
            chain.addLast(new SystemSlot());
            chain.addLast(new FlowSlot());
            chain.addLast(new DegradeSlot());

            return chain;
        }
    }
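The individual slots of the chain work as follows.
The slot chain is the core skeleton of Sentinel. It strings the different slots together in order (the chain-of-responsibility pattern) and thereby combines the different features (flow control, degradation, system protection). The slot chain can be divided into two parts: building statistics (metric collection) and rule checking. Reference: https://github.com/alibaba/Sentinel/wiki/Sentinel-%E6%A0%B8%E5%BF%83%E7%B1%BB%E8%A7%A3%E6%9E%90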
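The chain-of-responsibility idea can be illustrated with a minimal, self-contained sketch. Slot, NamedSlot and SlotChain are simplified stand-ins invented for this example; Sentinel's ProcessorSlot interface carries more parameters (context, node, count and so on).

```java
import java.util.List;

// Simplified stand-in for a processor slot.
abstract class Slot {
    private Slot next;

    void setNext(Slot next) {
        this.next = next;
    }

    /** Hands the call over to the next slot in the chain, if there is one. */
    protected void fireEntry(String resource, List<String> trace) {
        if (next != null) {
            next.entry(resource, trace);
        }
    }

    abstract void entry(String resource, List<String> trace);
}

// A slot that only records its name and then passes the call on.
final class NamedSlot extends Slot {
    private final String name;

    NamedSlot(String name) {
        this.name = name;
    }

    @Override
    void entry(String resource, List<String> trace) {
        trace.add(name);
        fireEntry(resource, trace);
    }
}

// A singly linked chain that slots are appended to in order.
final class SlotChain {
    private Slot first;
    private Slot last;

    void addLast(Slot slot) {
        if (first == null) {
            first = slot;
        } else {
            last.setNext(slot);
        }
        last = slot;
    }

    void entry(String resource, List<String> trace) {
        if (first != null) {
            first.entry(resource, trace);
        }
    }
}
```

Because each slot decides when to call fireEntry, a slot can run code both before and after the rest of the chain, or stop the chain by throwing.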
3.1 NodeSelectorSlot
1. ContextUtil manages all DefaultNode resources.
    private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>();
2. Every incoming request creates a Context, which maintains a chain of DefaultNodes; the DefaultNodes and their link relationships are kept in contextNameNodeMap.
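3.2 ClusterBuilderSlot
Creates the ClusterNode and places it inside the DefaultNode.
- DefaultNode mainly records the call chain, i.e. the cluster-node call tree.
- ClusterNode holds the metric data of a resource.
3.3 LogSlot
No logic of note.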
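3.4 StatisticSlot: Sliding-Window Metric Collection with StatisticNode
StatisticSlot collects the metrics of a resource. StatisticNode is the context object of a resource: StatisticSlot records the resource's request data in it using time windows, and that data is then used by rule slots such as FlowSlot and DegradeSlot.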
    public class StatisticSlot {
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                          boolean prioritized, Object... args) throws Throwable {
            try {
                // Do some checking.
                fireEntry(context, resourceWrapper, node, count, prioritized, args);

                // Record the metrics.
                node.increaseThreadNum();
                node.addPassRequest(count);
            }
            // (the excerpt ends here; the catch blocks and the rest of the class are omitted)
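1. Sliding window vs. fixed window
Sliding-window example:
0~0.5s : 50
0.5~1s : 60
1~1.5s : 50
Assume a rate limit of 100:
- With a fixed window of 1 s, a new window starts at 1 s, so the count during 1~1.5 s is 50 and the limit is not hit.
- With a sliding window, the unit of time is divided into several smaller windows whose counts are summed. This fixes the missed limit during 1~1.5 s: if 1 s is split into two windows, the sliding window over 0.5~1.5 s holds 60+50=110, so the limit is hit during 1~1.5 s.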
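The comparison above can be reproduced with a small bucketed counter. This is a single-threaded sketch written for this example (SlidingWindowCounter is a hypothetical class, and timestamps are passed in explicitly to keep it deterministic); it is not Sentinel's LeapArray, which additionally handles concurrent updates.

```java
import java.util.Arrays;

// Hypothetical, single-threaded sliding-window counter.
final class SlidingWindowCounter {
    private final int bucketCount;
    private final long bucketLengthMs;
    private final long[] bucketStart;   // start timestamp of the data held in each slot
    private final long[] bucketValue;

    SlidingWindowCounter(int bucketCount, long intervalMs) {
        this.bucketCount = bucketCount;
        this.bucketLengthMs = intervalMs / bucketCount;
        this.bucketStart = new long[bucketCount];
        this.bucketValue = new long[bucketCount];
        Arrays.fill(bucketStart, -1L);  // -1 marks a slot that has never been used
    }

    void add(long nowMs, long n) {
        int idx = (int) ((nowMs / bucketLengthMs) % bucketCount);
        long start = nowMs - nowMs % bucketLengthMs;
        if (bucketStart[idx] != start) {
            // The slot still holds an older window: recycle it.
            bucketStart[idx] = start;
            bucketValue[idx] = 0;
        }
        bucketValue[idx] += n;
    }

    /** Sums all buckets that started less than one full interval before nowMs. */
    long sum(long nowMs) {
        long intervalMs = bucketLengthMs * bucketCount;
        long total = 0;
        for (int i = 0; i < bucketCount; i++) {
            if (bucketStart[i] >= 0 && nowMs - bucketStart[i] < intervalMs) {
                total += bucketValue[i];
            }
        }
        return total;
    }
}
```

With one bucket per second the class behaves like a fixed window. Feeding it the three samples above (50 at 250 ms, 60 at 750 ms, 50 at 1250 ms) gives a sum of 50 at 1250 ms, while the same samples in a two-bucket counter give 110, which exceeds the limit of 100.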
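2. StatisticNode uses LeapArray, a high-performance sliding-window data structure, to collect real-time per-second metrics. StatisticNode defines two ArrayMetric instances to record the metrics: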
    public class StatisticNode implements Node {
        private transient volatile Metric rollingCounterInSecond
            = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);
        private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
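3. WindowWrap and MetricBucket represent the metric data of one window. WindowWrap<MetricBucket> is the smallest statistics unit inside Sentinel; all metric data is stored here.
- MetricBucket holds the metrics of one window: success, fail, pass and block.
- Compared with MetricBucket, WindowWrap<MetricBucket> adds two window-related fields: the window length and the window start timestamp.
4. LeapArray manages the windows and stores all window data.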
    // Holds the data of all windows.
    protected final AtomicReferenceArray<WindowWrap<T>> array;

    // Guards updates to the window array (thread safety).
    private final ReentrantLock updateLock = new ReentrantLock();
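LeapArray has two concrete implementations:
- BucketLeapArray: minute-level metrics with one window per second; this data is used when metrics are written to the metric file.
- OccupiableBucketLeapArray: second-level metrics, used by rule slots such as FlowSlot for rate limiting.
5. ArrayMetric represents one metric record, comparable to one row of data in InfluxDB.
ArrayMetric holds a LeapArray member. On top of LeapArray it provides the methods of the Metric interface, such as recording metrics.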
    public class ArrayMetric implements Metric {
        private final LeapArray<MetricBucket> data;
    }
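One unit of time consists of several windows (for example, rollingCounterInSecond in StatisticNode splits one second into two windows). ArrayMetric exposes interfaces for recording and querying metrics:
- Recording goes through the current WindowWrap.
- Querying aggregates the metrics of all WindowWrap<MetricBucket> windows within one unit of time.
(1) Recording a metric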
    @Override
    public void addSuccess(int count) {
        // Get the current window.
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        // Record the success metric.
        wrap.value().addSuccess(count);
    }
(2) Reading the success count by summing the success values of all time windows
    @Override
    public long success() {
        data.currentWindow();
        long success = 0;

        List<MetricBucket> list = data.values();
        for (MetricBucket window : list) {
            success += window.success();
        }
        return success;
    }
3.5 FlowSlot: Rate Limiting
3.5.1 Flow Rule Checking
1. Flow
FlowSlot#checkFlow -> FlowRuleChecker#checkFlow -> FlowRule#TrafficShapingController#canPass
    private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node,
                                          int acquireCount, boolean prioritized) {
        Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
        if (selectedNode == null) {
            return true;
        }
        return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
    }
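At the end of this call chain, the simplest conceivable canPass compares the count already used in the current window plus the requested count against the rule's threshold. The sketch below shows only that comparison; QpsThresholdChecker is a hypothetical class, and real traffic-shaping controllers (warm-up, queueing and so on) do more.

```java
// Hypothetical, minimal threshold check for illustration.
final class QpsThresholdChecker {
    private final double threshold;

    QpsThresholdChecker(double threshold) {
        this.threshold = threshold;
    }

    /** Passes only if the already used count plus the requested count stays within the threshold. */
    boolean canPass(long currentPassQps, int acquireCount) {
        return currentPassQps + acquireCount <= threshold;
    }
}
```

In this picture, currentPassQps would come from the second-level sliding window of the selected node.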
3.5.2 Writing Metrics to a File
1. FlowRuleManager starts a recording thread
    public class FlowRuleManager {
        // FlowRuleManager schedules a recurring task that runs every second.
        static {
            SCHEDULER.scheduleAtFixedRate(new MetricTimerListener(), 0, 1, TimeUnit.SECONDS);
        }
2. MetricTimerListener writes the metrics through MetricWriter
    public class MetricTimerListener implements Runnable {

        @Override
        public void run() {
            for (Entry<Long, List<MetricNode>> entry : maps.entrySet()) {
                try {
                    // Write the metrics through MetricWriter.
                    metricWriter.write(entry.getKey(), entry.getValue());
                } catch (Exception e) {
                    RecordLog.warn("[MetricTimerListener] Write metric error", e);
                }
            }
        }
    }
3. Where does this data come from?
It comes from rollingCounterInMinute in StatisticNode:
    // StatisticNode.java
    @Override
    public Map<Long, MetricNode> metrics() {
        // The fetch operation is thread-safe under a single-thread scheduler pool.
        long currentTime = TimeUtil.currentTimeMillis();
        currentTime = currentTime - currentTime % 1000;
        Map<Long, MetricNode> metrics = new ConcurrentHashMap<>();

        List<MetricNode> nodesOfEverySecond = rollingCounterInMinute.details();

        long newLastFetchTime = lastFetchTime;
        // Iterate metrics of all resources, filter valid metrics (not-empty and up-to-date).
        for (MetricNode node : nodesOfEverySecond) {
            if (isNodeInTime(node, currentTime) && isValidMetricNode(node)) {
                metrics.put(node.getTimestamp(), node);
                newLastFetchTime = Math.max(newLastFetchTime, node.getTimestamp());
            }
        }
        lastFetchTime = newLastFetchTime;

        return metrics;
    }
4. File contents
    $ tail -f xxxxxx-metrics.log.2020-07-31
    1596179026000|2020-07-31 15:03:46|/registry/machine|11|0|11|0|5|0|0|1
    1596179026000|2020-07-31 15:03:46|__total_inbound_traffic__|11|0|11|0|5|0|0|0
    1596179027000|2020-07-31 15:03:47|/registry/machine|10|0|10|0|4|0|0|1
    1596179027000|2020-07-31 15:03:47|__total_inbound_traffic__|10|0|10|0|4|0|0|
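A line of this pipe-separated format can be split into fields as sketched below. The column meanings used here (timestamp, formatted time, resource, then pass QPS, block QPS, success QPS, exception QPS and RT) are an assumption about the file layout, and MetricLine is a hypothetical class; the remaining columns are ignored.

```java
// Hypothetical parser; the column meanings are assumed, not taken from the Sentinel source.
final class MetricLine {
    final long timestamp;
    final String resource;
    final long passQps;
    final long blockQps;
    final long successQps;
    final long exceptionQps;
    final long rt;

    private MetricLine(String[] f) {
        this.timestamp = Long.parseLong(f[0]);
        // f[1] is the human-readable form of the same timestamp.
        this.resource = f[2];
        this.passQps = Long.parseLong(f[3]);
        this.blockQps = Long.parseLong(f[4]);
        this.successQps = Long.parseLong(f[5]);
        this.exceptionQps = Long.parseLong(f[6]);
        this.rt = Long.parseLong(f[7]);
    }

    static MetricLine parse(String line) {
        String[] fields = line.split("\\|");
        if (fields.length < 8) {
            throw new IllegalArgumentException("Expected at least 8 fields: " + line);
        }
        return new MetricLine(fields);
    }
}
```

Since the first column is an epoch-millisecond timestamp rounded to the second, filtering lines by a startTime/endTime range only requires comparing that field.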
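3.6 ParamFlowSlot: Parameter Flow Control
1. Metric storage
    public final class ParameterMetricStorage {
        private static final Map<String, ParameterMetric> metricsMap = new ConcurrentHashMap<>();
    }
2. Rule checking
Compared with FlowSlot, the difference is that a threshold is maintained per parameter: the QPS of the resource for a given parameter value is compared against the threshold configured for that parameter.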
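The per-parameter comparison can be sketched with one counter per parameter value. ParamFlowSketch is a hypothetical class made up for this illustration; it counts requests within a window that the caller resets, and leaves out everything else a real implementation needs (sliding windows, eviction of rarely used values).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration: one counter per parameter value, one shared threshold.
final class ParamFlowSketch {
    private final long thresholdPerValue;
    private final Map<Object, AtomicLong> countInWindow = new ConcurrentHashMap<>();

    ParamFlowSketch(long thresholdPerValue) {
        this.thresholdPerValue = thresholdPerValue;
    }

    /** Returns true if a request carrying this parameter value is still within its threshold. */
    boolean tryPass(Object paramValue) {
        AtomicLong counter = countInWindow.computeIfAbsent(paramValue, k -> new AtomicLong());
        if (counter.incrementAndGet() > thresholdPerValue) {
            counter.decrementAndGet();   // rejected requests do not consume quota
            return false;
        }
        return true;
    }

    /** Starts a new statistics window; the caller decides when, e.g. once per second. */
    void resetWindow() {
        countInWindow.clear();
    }
}
```

A hot value such as one user id is then throttled on its own, while other values of the same parameter keep passing.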
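3.7 DegradeSlot: Circuit Breaking and Degradation
1. Circuit breaking targets the calls a service makes as a client, whereas CommonFilter covers the service's own endpoints. There are currently two ways to use degradation:
- Option 1: @SentinelResource. Similar to declarative transactions, you have to throw exceptions yourself where the business logic calls for it.
- Option 2: the API, e.g. SphU.entry
2. Circuit-breaking strategy
The degradation strategy is fairly simple: once the breaker trips, requests are rejected for one timeWindow, after which all traffic is let through again. There is no half-open state.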
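The strategy described above, without a half-open state, can be sketched as a two-state breaker. SimpleBreaker is a hypothetical class, and the trip condition used here (a fixed number of recorded failures) is an assumption made to keep the example small; timestamps are passed in explicitly so the behaviour is deterministic.

```java
// Hypothetical two-state breaker: either passing everything or rejecting everything.
final class SimpleBreaker {
    private final int maxFailures;
    private final long timeWindowMs;
    private int failures;
    private long openedAtMs = -1;   // -1 means the breaker is not tripped

    SimpleBreaker(int maxFailures, long timeWindowMs) {
        this.maxFailures = maxFailures;
        this.timeWindowMs = timeWindowMs;
    }

    synchronized boolean allow(long nowMs) {
        if (openedAtMs >= 0) {
            if (nowMs - openedAtMs < timeWindowMs) {
                return false;       // still inside the time window: reject
            }
            // Time window elapsed: resume all traffic at once, no probing request.
            openedAtMs = -1;
            failures = 0;
        }
        return true;
    }

    synchronized void recordFailure(long nowMs) {
        if (openedAtMs < 0 && ++failures >= maxFailures) {
            openedAtMs = nowMs;     // trip the breaker
        }
    }
}
```

The cost of skipping the half-open state is that a dependency that is still failing receives the full traffic again as soon as the window ends.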