码农戏码

新生代农民工的自我修养

0%

没有一身好内功,招式再多都是空;算法绝对是防身必备,面试时更是不可或缺;跟着算法渣一起从零学算法

定义

归并排序(MERGE-SORT)是建立在归并操作上的一种有效的排序算法,该算法是采用分治法(Divide and Conquer)的一个非常典型的应用。将已有序的子序列合并,得到完全有序的序列;即先使每个子序列有序,再使子序列段间有序。若将两个有序表合并成一个有序表,称为二路归并。

算法

归并排序(Merge Sort)就是利用归并思想对数列进行排序。根据具体的实现,归并排序包括”从上往下“和”从下往上“2种方式

从上往下的归并排序:

  1. 分解 – 将当前区间一分为二,即求分裂点 mid = (low + high)/2
  2. 求解 – 递归地对两个子区间a[low…mid] 和 a[mid+1…high]进行归并排序。递归的终结条件是子区间长度为1
  3. 合并 – 将已排序的两个子区间a[low…mid]和 a[mid+1…high]归并为一个有序的区间a[low…high]

从下往上的归并排序:

  1. 将待排序的数列分成若干个长度为1的子数列,然后将这些数列两两合并;
  2. 得到若干个长度为2的有序数列,再将这些数列两两合并;得到若干个长度为4的有序数列,再将它们两两合并;
  3. 直接合并成一个数列为止。这样就得到了我们想要的排序结果。

归并步骤

  • 第一步:申请空间,使其大小为两个已经排序序列之和,该空间用来存放合并后的序列
  • 第二步:设定两个指针,最初位置分别为两个已经排序序列的起始位置
  • 第三步:比较两个指针所指向的元素,选择相对小的元素放入到合并空间,并移动指针到下一位置
  • 重复步骤3直到某一指针超出序列尾,将另一序列剩下的所有元素直接复制到合并序列尾

比如合并,要将[4,5,7,8]和[1,2,3,6]两个已经有序的子序列,合并为最终序列[1,2,3,4,5,6,7,8],来看下实现步骤

演示

image

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* 归并排序
* @param array
* @param left
* @param right
*/
private static void mergeSort(int []array,int left,int right){
if(left < right) {
//分解
int mid = (left + right) / 2;
mergeSort(array,left,mid);
mergeSort(array,mid + 1,right);
//合并
merge(array,left,mid,right);
}
}

static void merge(int []array,int left,int mid,int right) {
System.out.println("merge->left:"+left+" mid:"+mid+" rgiht:"+right);
int i = left;
int j = mid +1;
int tmp[] = new int[right-left+1];//构建tmp数组
int t = 0;//tmp数组索引
//填充tmp数组
for (;i<=mid && j<=right;) {
if(array[i] < array[j]) {
tmp[t++] = array[i++];
} else {
tmp[t++] = array[j++];
}
}
while (i<=mid) {
tmp[t++] = array[i++];
}
while (j<=right) {
tmp[t++] = array[j++];
}

//再把tmp复制到array
t = 0;
while (left<=right) {
array[left++] = tmp[t++];
}
System.out.println(" tmp:"+ Arrays.toString(tmp));
System.out.println("merge:"+ Arrays.toString(array));
}

复杂度

归并排序的时间复杂度是O(N*lgN)

假设被排序的数列中有N个数。遍历一趟的时间复杂度是O(N),需要遍历多少次呢?

归并排序的形式就是一棵二叉树,它需要遍历的次数就是二叉树的深度,而根据完全二叉树的可以得出它的时间复杂度是O(N*lgN)

Best Average Worst Memory Stable
n log(n) n log(n) n log(n) n Yes

VS 快速排序

归并排序(MergeSort)和快速排序(QuickSort)都是用了分治算法思想。

所谓分治算法,顾名思义,就是分而治之,就是将原问题分割成同等结构的子问题,之后将子问题逐一解决后,原问题也就得到了解决。

同时,归并排序(MergeSort)和快速排序(QuickSort)也代表了两类分治算法的思想。

对于归并排序,我们对于待处理序列怎么分的问题上并没有太多关注,仅仅是简单地一刀切,将整个序列分成近乎均匀的两份,然后将子序列进行同样处理。但是,我们更多的关注点在于怎么把分开的部分合起来,也就是merge的过程。

对于快速排序来说,我们则是花了很大的功夫放在了怎么分这个问题上,我们设定了枢轴(标定点),然后通过partition的过程将这个枢轴放在合适的位置,这样我们就不用特别关心合起来的过程,只需要一步一步地递归下去即可。

归并排序是由下向上的,先处理子数组然后再合并。而快速排序正好相反,它的过程是由上向下的,先分出两个子区间,再对子区间进行排序。归并排序是稳定的时间复杂度为 O(n)O(n),但它是非原地算法,在进行子数组合并的时候,我们需要临时申请一个数组来暂时存放排好序的数据。因为这个临时空间是可以重复利用的,因此归并排序的空间复杂度为 O(n),最多需要存放 n 个数据;
而快排则是原地排序算法

没有一身好内功,招式再多都是空;算法绝对是防身必备,面试时更是不可或缺;跟着算法渣一起从零学算法

定义

堆排序(英语:Heapsort)是指利用堆这种数据结构所设计的一种排序算法。堆是一个近似完全二叉树的结构,并同时满足堆积的性质:即子结点的键值或索引总是小于(或者大于)它的父节点。

堆是一棵顺序存储的完全二叉树

其中每个结点的关键字都不大于其孩子结点的关键字,这样的堆称为小根堆

其中每个结点的关键字都不小于其孩子结点的关键字,这样的堆称为大根堆

举例来说,对于n个元素的序列{R0, R1, … , Rn}当且仅当满足下列关系之一时,称之为堆:

(1) Ri <= R2i+1 且 Ri <= R2i+2 (小根堆)

(2) Ri >= R2i+1 且 Ri >= R2i+2 (大根堆)

其中i=1,2,…,n/2向下取整;

同时,我们对堆中的结点按层进行编号,将这种逻辑结构映射到数组中就是下面这个样子

完全二叉树

完全二叉树的定义是建立在满二叉树定义的基础上的,而满二叉树又是建立在二叉树的基础上的。

二叉树

二叉树是树的特殊一种,具有如下特点:

  1. 每个结点最多有两颗子树,结点的度最大为2。
  2. 左子树和右子树是有顺序的,次序不能颠倒。
  3. 即使某结点只有一个子树,也要区分左右子树。

满二叉树

所有的分支结点都存在左子树和右子树,并且所有的叶子结点都在同一层上,这样就是满二叉树。就是完美圆满的意思,关键在于树的平衡

根据满二叉树的定义,得到其特点为:

  1. 叶子只能出现在最下一层。
  2. 非叶子结点度一定是2.
  3. 在同样深度的二叉树中,满二叉树的结点个数最多,叶子树最多。

完全二叉树

对于一个树高为h的二叉树,如果其第0层至第h-1层的节点都满。如果最下面一层节点不满,则所有的节点在左边的连续排列,空位都在右边。这样的二叉树就是一棵完全二叉树

在上图中,树1,按层次编号5结点没有左子树,有右子树,10结点缺失。树2由于3结点没有字数,是的6,7位置空挡了。树3中结点5没有子树。

完全二叉树

上图就是一个完全二叉树

算法

利用堆顶记录的是最大(以大顶堆为例)关键字这一特性,每一轮取堆顶元素放入有序区,就类似选择排序每一轮选择一个最大值放入有序区,可以把堆排序看成是选择排序的改进。

  1. 将初始待排序关键字序列(R0,R1,R2….Rn)构建成大顶堆,此堆为初始的无序区;

  2. 将堆顶元素R[0]与最后一个元素R[n]交换,此时得到新的无序区(R0,R1,R2,……Rn-1)和新的有序区(Rn);

  3. 由于交换后新的堆顶R[0]可能违反堆的性质,因此需要对当前无序区(R0,R1,R2,……Rn-1)调整为新堆

不断重复此2、3步骤直到有序区的元素个数为n-1,则整个排序过程完成

演示

  1. 由一个无序序列建成一个堆
  2. 输出堆顶元素之后,调整剩余元素成为一个新的堆

1.构造堆

构造初始堆,将给定无序序列构造成一个大顶堆(一般升序采用大顶堆,降序采用小顶堆)

数组是一个无序结构

因为(n/2-1)~0的节点才有子节点,n=5,(n/2-1) = 1 即1 0节点才有子节点

所以将1 0节点从下到上,从右到左的与它自己的子节点比较并调整最终形成大顶堆

1. 节点1和它的子节点3 4的元素进行比较,最大者作为父节点

2. 将节点0和它的子节点1 2的元素进行比较,最大者为父节点

3. 交换导致了子根[4,5,6]结构混乱,继续调整,[4,5,6]中6最大,交换4和6

构造完成将一个无需序列构造成了一个大顶堆

2. 调整堆

将堆顶元素与末尾元素进行交换,使末尾元素最大。然后继续调整堆,再将堆顶元素与末尾元素交换,得到第二大元素。如此反复进行交换、重建、交换。

1. 将堆顶元素9和末尾元素4进行交换

2. 重新调整结构,使其继续满足堆定义

3. 再将堆顶元素8与末尾元素5进行交换,得到第二大元素8

4. 继续进行调整,交换,如此反复进行,最终使得整个序列有序

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
 /**
*
* a.将无需序列构建成一个堆,根据升序降序需求选择大顶堆或小顶堆;

 b.将堆顶元素与末尾元素交换,将最大元素"沉"到数组末端;

 c.重新调整结构,使其满足堆定义,然后继续交换堆顶元素与当前末尾元素,反复执行调整+交换步骤,直到整个序列有序。

* 整个排序过程,就是构造堆的过程
*
* 这儿写得利于理解一点,把最后一步改掉,循环从数组中把第一位取出来放到新数组,剩余的再去构造堆
* @param array
*/
static void heapSort(int []array) {
//原始数组
int [] oriental = Arrays.copyOf(array,array.length);
//最终排序好的数组
int [] result = new int[array.length];

int length = array.length;
for ( int i = 0;i< length;i++) {
//1.构造堆
buildHeap(array);
//构建完array[0]就是最大的元素,取出来放到排序好的数组
result[i] = array[0];
//[0]取走之后,一个新数组,再去构造堆
array = Arrays.copyOfRange(array, 1,length-i);
}
System.out.println("finish:"+ Arrays.toString(result));
}

/**
* 构造椎
* @param array
*/
static void buildHeap(int []array) {
//(n/2-1)~0的节点才有子节点
for (int i= array.length/2 -1 ;i>=0;i--) {
// System.out.println("i:"+i);
// System.out.println("start:"+ Arrays.toString(array));
int k = i;
for (int left = k * 2 + 1; left < array.length; left = k * 2 + 1) {
// System.out.println("build:"+ Arrays.toString(array));
int max = left;
int right = left + 1;
//有右节点
if (right < array.length) {
if (array[right] > array[left]) {
max = right;
}
}
//max是left子节点,那就交换左子节点与k
if (array[max] > array[k]) {
swap(array, max, k);
// 下面就是非常关键的一步了
// 如果子节点更换了,那么,以子节点为根的子树会不会受到影响呢?
// 所以,循环对子节点所在的树继续进行判断
k = left;
} else {
break;
}
}
}
//System.out.println(" end:"+ Arrays.toString(array));
}

详细代码:
https://github.com/zhuxingsheng/javastudy/tree/master/src/main/java/com/jack/algorithms/sort

复杂度

Best Average Worst Memory Stable
n log(n) n log(n) n log(n) 1 No

参考资料

图解排序算法(三)之堆排序

图解堆排序

没有一身好内功,招式再多都是空;算法绝对是防身必备,面试时更是不可或缺;跟着算法渣一起从零学算法

定义

选择排序(Selection sort)是一种简单直观的排序算法。它的工作原理是每一次从待排序的数据元素中选出最小(或最大)的一个元素,存放在序列的起始位置,然后,再从剩余未排序元素中继续寻找最小(大)元素,然后放到已排序序列的末尾。以此类推,直到全部待排序的数据元素排完。 选择排序是不稳定的排序方法。

算法

n个记录的文件的直接选择排序可经过n-1趟直接选择排序得到有序结果:

  1. 初始状态:无序区为R[1..n],有序区为空
  2. 第1趟排序
    在无序区R[1..n]中选出关键字最小的记录R[k],将它与无序区的第1个记录R[1]交换,使R[1..1]和R[2..n]分别变为记录个数增加1个的新有序区和记录个数减少1个的新无序区。
    ……
  3. 第i趟排序
    第i趟排序开始时,当前有序区和无序区分别为R[1..i-1]和R(i..n)。该趟排序从当前无序区中选出关键字最小的记录 R[k],将它与无序区的第1个记录R交换,使R[1..i]和R分别变为记录个数增加1个的新有序区和记录个数减少1个的新无序区。

演示

image

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static void selectSort(int []array) {
for (int i=0;i<array.length-1;i++) {
int min = i;
for (int j = i+1;j<array.length;j++){
if(array[min] > array[j]) {
min = j;
}
}
if( min != i) {
int tmp = array[i];
array[i] = array[min];
array[min] = tmp;
}

}
}

复杂度

比较次数:T = (n-1))+ (n -2)+(n - 3)…. + 1; T = [n*(n-1) ] / 2;

交换次数:

最好的情况全部元素已经有序,则交换次数为0;

最差的情况,全部元素逆序,就要交换 n-1 次;

空间复杂度

最优的情况下(已经有顺序)复杂度为:O(0)

最差的情况下(全部元素都要重新排序)复杂度为:O(n );

平均的时间复杂度:O(1)

Best| Average| Worst|Memory|Stable
— | — | — |——| :–: |—
n² | n² | n² | 1 | No

VS 插入排序

插入排序特别的相似

复杂度一样,但是插入排序和读入(初始)的数据有关,最优情况只有O(n);而选择排序不论如何,永远都是O(n^2)

插入排序是边读边排,每当读入一个新的数时,目前的数组一定是排好序的。而选择排序不同,它必须是读完所有的数据之后才能开始排序的。

那么选择排序的缺点就是,万一数据量很大,比方说一百万个,光读就慢了,还要排序,那就更慢了。如果这两个耗时的步骤可以并行一起做的话,显然插入排序肯定就会快一些。

没有一身好内功,招式再多都是空;算法绝对是防身必备,面试时更是不可或缺;跟着算法渣一起从零学算法

定义

希尔排序是希尔(Donald Shell)于1959年提出的一种排序算法。希尔排序也是一种插入排序,它是简单插入排序经过改进之后的一个更高效的版本,也称为缩小增量排序,同时该算法是冲破O(n2)的第一批算法之一

算法

直接插入排序很循规蹈矩,不管数组分布是怎么样的,依然一步一步的对元素进行比较,移动,插入,比如[5,4,3,2,1,0]这种倒序序列,数组末端的0要回到首位置很是费劲,比较和移动元素均需n-1次

是否能够减少这样的移位呢?不希望是一步一步的移动,而是大步大步的移动

希尔排序是把记录按下标的一定增量分组,对每组使用直接插入排序算法排序;随着增量逐渐减少,每组包含的关键词越来越多,当增量减至1时,整个文件恰被分成一组,算法便终止

初始时,有一个大小为 10 的无序序列

在第一趟排序中,我们不妨设 gap1 = N / 2 = 5,即相隔距离为 5 的元素组成一组,可以分为 5 组

接下来,按照直接插入排序的方法对每个组进行排序

在第二趟排序中,我们把上次的 gap 缩小一半,即 gap2 = gap1 / 2 = 2 (取整数)。这样每相隔距离为 2 的元素组成一组,可以分为 2 组

按照直接插入排序的方法对每个组进行排序

在第三趟排序中,再次把 gap 缩小一半,即gap3 = gap2 / 2 = 1。 这样相隔距离为 1 的元素组成一组,即只有一组

按照直接插入排序的方法对每个组进行排序。此时,排序已经结束

演示

  • 1.增量N/2=4分组

{35, 14}, {33, 19}, {42, 27} , {10, 44}

排序完:

  • 2.增量N/2/2=2分组

{14, 27, 35, 42}, {19, 10, 33, 44}

排序完:

  • 3.增量N/2/2/2=1分组排序

也可以通过动画更清晰了解整个排序过程

动画:http://www.zhuxingsheng.com/tools/sort/sort.html

分组排序后,他们的索引还是保留在原始索引中,来两个更加直观的动画演示

image

image

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static void shellSort(int []array) {
//以length/2为增量
for (int gap=array.length/2;gap>0;gap= gap/2) {
//以gap分组,进行排序
for(int i = gap;i<array.length;i++){
int tmp = array[i];
int j = i - gap;
//相对直接插入,步长从1变成了gap
while ( j>=0 && tmp<array[j] ) {
array[j+gap] = array[j];
j=j-gap;
}
array[j+gap] = tmp;
}
}
}

复杂度

希尔排序的时间复杂度与增量(即,步长gap)的选取有关

{1,2,4,8,…}这种序列并不是很好的增量序列,使用这个增量序列的时间复杂度(最坏情形)是O(n²)

Hibbard提出了另一个增量序列{1,3,7,…,2^k-1},这种序列的时间复杂度(最坏情形)为O(n^1.5)

Sedgewick提出了几种增量序列,其最坏情形运行时间为O(n^1.3),其中最好的一个序列是{1,5,19,41,109,…}

1
9×4^i−9×2i+1  和 4^i−3×2^i+1  这两个算式

其中, i=0,1,2,⋯ 时,第一个式子计算出的值分别为1,19,109,……; i=2,3,⋯ 时,第二个式子算出的值分别为5,41,……

Best| Average| Worst|Memory|Stable
— | —| —|——| :–: |—
n log(n) |depends on gap sequenc| n² | 1 | No

前言

其实不只在微服务中,在平常网络请求,或者与第三方系统进行交互都需要设置超时时间

为什么需要超时与重试? 总体上讲,肯定是为了增加系统可靠性,具体表现在两个方面

  1. 系统自我保护:
    快速失败,在业务最大允许等待时间内未收到返回数据,主动放弃等待,释放占用资源,避免请求不断累积带来的客户端雪崩效应
  2. 成功率:服务处理超时原因有很多,但常见的超时都是短暂的,主要是GC,或者有网络抖动等。这些短时间影响服务端状态的情况而造成请求成功率下降,需要补救措施。简单的补救有超时重试操作:当前请求超时后,将会重试到非当前服务器,降低重试超时的机率

这一篇将由浅入深探索timeout机制,以及在微服务下的实践

超时

经常被提起的两种超时:connection timeout、socket timeout

通过最底层的Socket,ServerSocket演示一下这两种超时的表现,nio框架都会有对应的配置选项

connectionTimeout

建立连接超时时间

客户端,随便写个IP,设置一个timeout

1
2
Socket socket = new Socket();
socket.connect(new InetSocketAddress("10.0.0.1",8080),20000);

在timeout时间到时,就会抛出connect timed out异常

1
2
3
4
5
6
7
8
9
Exception in thread "main" java.net.SocketTimeoutException: connect timed out
at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)

socketTimeout

Enable/disable SO_TIMEOUT with the specified timeout, in milliseconds. With this option set to a non-zero timeout, a read() call on the InputStream associated with this Socket will block for only this amount of time. If the timeout expires, a java.net.SocketTimeoutException is raised, though the Socket is still valid. The option must be enabled prior to entering the blocking operation to have effect. The timeout must be > 0. A timeout of zero is interpreted as an infinite timeout.

服务端,只要让客户端能连接上就行,不发送数据

1
2
3
4
5
ServerSocket serverSocket = new ServerSocket(8080);
while ( true) {
Socket socket = serverSocket.accept();
new Thread(new P(socket)).start();
}

客户端,进行读数据

1
2
3
Socket socket = new Socket();
socket.connect(new InetSocketAddress("localhost",8080),20000);
socket.setSoTimeout(3000);

3s后,就抛出Read timed out

1
2
3
4
5
6
java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.net.SocketInputStream.read(SocketInputStream.java:223)

nio

对NIO,看网上一些示例基本没有关注到这一点,所以值得思考,难道是nio不需要关注timeout?

客户端对服务器的连接:

1
2
3
4
Selector selector = Selector.open();
InetSocketAddress isa = new InetSocketAddress(host, port);
// 调用open静态方法创建连接到指定主机的SocketChannel
SocketChannel sc = SocketChannel.open(isa);

在调用SocketChannel.open()方法时,如果连接不上服务器,那么此方法就会长时间阻塞,为了解决这个问题,我们可以在调用open()方法前,启动一个定时器,这个定时器会在指定的时间内检查是否已连接成功,这个指定的时间也就是我们希望设置的连接超时时间,当检查已连接上服务器时,提示用户已连接成功;若没有连接上,可在代码中抛出SocketTimeoutException,并提示用户连接超时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public void connect(){
try{
selector = Selector.open();
InetSocketAddress isa = new InetSocketAddress(host, port);
//10秒连接超时
new Timer().schedule(tt, 10000);
// 调用open静态方法创建连接到指定主机的SocketChannel
sc = SocketChannel.open(isa);
// 设置该sc以非阻塞方式工作
sc.configureBlocking(false);
// 将Socketchannel对象注册到指定Selector
sc.register(selector, SelectionKey.OP_READ);
Message msg = new Message();
msg.what = 0;
msg.obj = sc;
handler.sendMessage(msg); // 连接成功
new Thread(new NIOReceiveThread(selector, handler)).start();
}catch (IOException e) {
e.printStackTrace();
handler.sendEmptyMessage(-1); // IO异常
}
}
TimerTask tt = new TimerTask(){
@Override
public void run(){
if (sc == null || !sc.isConnected()){
try{
throw new SocketTimeoutException("连接超时");
}catch (SocketTimeoutException e){
e.printStackTrace();
handler.sendEmptyMessage(-6); // 连接超时
}
}
}
};

在stackoverflow上有人回答了Read timeout for an NIO SocketChannel?

  1. You are using a Selector, in which case you have a select timeout which you can play with, and if it goes off (select(timeout) returns zero) you close all the registered channels, or
  2. You are using blocking mode, in which case you might think you should be able to call Socket.setSoTimeout() on the underlying socket (SocketChannel.socket()), and trap the SocketTimeoutException that is thrown when the timeout expires during read(), but you can’t, because it isn’t supported for sockets originating as channels, or
  3. You are using non-blocking mode without a Selector, in which case you need to change to case (1).

netty

netty业界公认的高成熟nio产品,也是大多数微服务底层使用的通信框架,内部细节值得挖一挖处理方式,篇幅有限,另开篇深挖

先看在微服务产品中的使用

connectionTimeout

这种场景很简单,在使用netty时,对应的配置选项

1
2
Bootstrap b = new Bootstrap();
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)

socketTimeout

这种场景各个框架在处理里,手法不尽相同,各有特色

motan

Motan是一套高性能、易于使用的分布式远程服务调用(RPC)框架,新浪微博开源。

之前解读过motan源码,《motan客户端解析》,《motan服务端解析》

NettyChannel.request

  1. 获取此method的配置timeout
  2. 在write时,awaitUninterruptibly
  3. 超时就抛出timeoutException
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public Response request(Request request) throws TransportException {
int timeout = nettyClient.getUrl().getMethodParameter(request.getMethodName(), request.getParamtersDesc(),
URLParamType.requestTimeout.getName(), URLParamType.requestTimeout.getIntValue());
if (timeout <= 0) {
throw new MotanFrameworkException("NettyClient init Error: timeout(" + timeout + ") <= 0 is forbid.",
MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
}
NettyResponseFuture response = new NettyResponseFuture(request, timeout, this.nettyClient);
this.nettyClient.registerCallback(request.getRequestId(), response);

ChannelFuture writeFuture = this.channel.write(request);

boolean result = writeFuture.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS);

if (result && writeFuture.isSuccess()) {
response.addListener(new FutureListener() {
@Override
public void operationComplete(Future future) throws Exception {
if (future.isSuccess() || (future.isDone() && ExceptionUtil.isBizException(future.getException()))) {
// 成功的调用
nettyClient.resetErrorCount();
} else {
// 失败的调用
nettyClient.incrErrorCount();
}
}
});
return response;
}

writeFuture.cancel();
response = this.nettyClient.removeCallback(request.getRequestId());

if (response != null) {
response.cancel();
}

// 失败的调用
nettyClient.incrErrorCount();

if (writeFuture.getCause() != null) {
throw new MotanServiceException("NettyChannel send request to server Error: url="
+ nettyClient.getUrl().getUri() + " local=" + localAddress + " "
+ MotanFrameworkUtil.toString(request), writeFuture.getCause());
} else {
throw new MotanServiceException("NettyChannel send request to server Timeout: url="
+ nettyClient.getUrl().getUri() + " local=" + localAddress + " "
+ MotanFrameworkUtil.toString(request));
}
}

注意到其中的NettyResponseFuture,看名字就明白是个Future模式,在《代码小析 - 异步回调》中有分析过

接收到服务端返回

1
2
3
4
5
6
7
8
9
10
11
12
13
NettyResponseFuture responseFuture = NettyClient.this.removeCallback(response.getRequestId());
if (responseFuture == null) {
LoggerUtil.warn(
"NettyClient has response from server, but resonseFuture not exist, requestId={}",
response.getRequestId());
return null;
}

if (response.getException() != null) {
responseFuture.onFailure(response);
} else {
responseFuture.onSuccess(response);
}

获取真实结果NettyResponseFuture.getValue()

通过long waitTime = timeout - (System.currentTimeMillis() - createTime);计算一下真正的wait时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public Object getValue() {
synchronized (lock) {
if (!isDoing()) {
return getValueOrThrowable();
}

if (timeout <= 0) {
try {
lock.wait();
} catch (Exception e) {
cancel(new MotanServiceException("NettyResponseFuture getValue InterruptedException : "
+ MotanFrameworkUtil.toString(request) + " cost="
+ (System.currentTimeMillis() - createTime), e));
}

// don't need to notifylisteners, because onSuccess or
// onFailure or cancel method already call notifylisteners
return getValueOrThrowable();
} else {
long waitTime = timeout - (System.currentTimeMillis() - createTime);

if (waitTime > 0) {
for (;;) {
try {
lock.wait(waitTime);
} catch (InterruptedException e) {
}

if (!isDoing()) {
break;
} else {
waitTime = timeout - (System.currentTimeMillis() - createTime);
if (waitTime <= 0) {
break;
}
}
}
}

if (isDoing()) {
timeoutSoCancel();
}
}
return getValueOrThrowable();
}
}

totalTimeout

有了timeout基本已经满足需求,但这儿再提一个totalTimeout,为什么需要一个总超时时间

比如客户端希望服务端在60ms内返回,由于成功率要求必须加一次重试,但是设置超时时间30ms重试一次会很浪费(绝大部分重试很快,但预留了30ms则压缩了初次调用的时间)。设置40ms重试一次就可能出现整体耗时大于60ms。所以希望可以配置整体超时时间为60ms,单次40ms加重试一次,这样既充分利用看重试机会也不会导致整体超过60ms

一次服务调用的正常请求的最长时间为:timeout * failovertimes + success_invocation_time

比如某个服务的timeout为100ms,failovertimes为1,正常调用的平均耗时为10ms,那么它的上游在重试情况下的耗时就是:乐观估计100 * 1+10=110ms;悲观估计100 * 1+100=200ms

为了保证总体超时时间,只能把单次超时时间压缩,使得某些情况下可能不需求重试的场景也进行了重试

对比一下,设置totalTimeout与不设置的情况:

  1. 某服务通常能在20ms返回,但是因为某些意外(比如gc),连续两次都要40ms
  • 只能设置单次timeout的情况下,timeout=30ms,failovertimes=1
    因此对于client来说,它看到的调用耗时就是:30ms(超时) + 30ms(超时) = 60ms
  • 分开设置首次超时和总体超时的情况下,timeout=40ms,total_timeout=60ms,failovertimes=1
    因此对于client来说,它看到的调用耗时就是:40ms(超时)+ (60ms-40ms)(超时) = 60ms
  1. 某服务通常能在20ms返回,但是因为某些意外(比如gc),这次需要35ms才能返回。
  • 只能设置单次timeout的情况下,timeout=30ms,failovertimes=1。
    因此对于client来说,它看到的调用耗时就是:30ms(超时) + 20ms = 50ms
  • 分开设置首次超时和总体超时的情况下,timeout=40ms,total_timeout=60ms,failovertimes=1。
    因此对于client来说,它看到的调用耗时就是:35ms(正常返回) = 35ms

重试

因某个服务实例短暂状态不佳而造成的超时,使用重试处理可以让请求转向其他服务实例的做法可以很好改善非集中式问题的成功率。

但如果超时重试只做简单的重试策略:有超时便重试,这样可能会导致服务端的崩溃。

例如:当前基础组件(如db)压力过大而造成超时,如果一律重试的话,会导致服务端集群实际接受请求量翻倍,这会使得基础组件压力无减反增,可能会导致其最终崩溃

实现

  • 思路简单,配置重试次数,出现非业务异常就重试,达到次数上限或者中途成功结束
  • 重试限流,重试造成雪崩的可能性,所以重试需要控制流量

motan

之前解读过motan源码,《motan客户端解析》,motan的failover就是这么处理的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 先使用method的配置
int tryCount =
refUrl.getMethodParameter(request.getMethodName(), request.getParamtersDesc(), URLParamType.retries.getName(),
URLParamType.retries.getIntValue());
// 如果有问题,则设置为不重试
if (tryCount < 0) {
tryCount = 0;
}

for (int i = 0; i <= tryCount; i++) {
Referer<T> refer = referers.get(i % referers.size());
try {
request.setRetries(i);
return refer.call(request);
} catch (RuntimeException e) {
// 对于业务异常,直接抛出
if (ExceptionUtil.isBizException(e)) {
throw e;
} else if (i >= tryCount) {
throw e;
}
LoggerUtil.warn(String.format("FailoverHaStrategy Call false for request:%s error=%s", request, e.getMessage()));
}
}

超时重试

但像我司框架就没有这样处理,只关注超时重试,因为超时重试主要是解决因偶尔短暂状态不佳而对成功率造成的影响,所以把重点放在处理短暂处于超时状态超时请求,对于长时间处于较大量的超时状态时,将选择不进行重试fail fast

限流

主要是防止重试次数过多,引起系统雪崩,有必要进行一下限流

《微服务-熔断机制》中提到过限流算法,细节可参考《计数器算法》

参考资料

Read timeout for an NIO SocketChannel?

没有一身好内功,招式再多都是空;算法绝对是防身必备,面试时更是不可或缺;跟着算法渣一起从零学算法

定义

插入排序的基本操作就是将一个数据插入到已经排好序的有序数据中,从而得到一个新的、个数加一的有序数据,算法适用于少量数据的排序

它的基本思想是:每步将一个待排序的记录,按其关键码值的大小插入前面已经排序的文件中适当位置上,直到全部插入完为止

联想打扑克牌时理顺手中牌的时候,你从左往右依次检查你的牌,并将其和前面的牌进行比较,然后将其插入正确的位置

理解起来,比《快速排序》容易多了

算法

  1. 设置监视哨r[0],将待插入记录的值赋值给r[0];
  2. 设置开始查找的位置j;
  3. 在数组中进行搜索,搜索中将第j个记录后移,直至r[0].key≥r[j].key为止;
  4. 将r[0]插入r[j+1]的位置上。

演示

插入排序

也可以通过动画更清晰了解整个排序过程

动画:http://www.zhuxingsheng.com/tools/sort/sort.html

实现

插入排序包括:直接插入排序,二分插入排序(又称折半插入排序),链表插入排序,希尔排序(又称缩小增量排序)

1
2
3
4
5
6
7
8
9
10
11
12
static void insertSort1(int []sorts) {
for(int i = 1;i<sorts.length;i++) {
int tmp = sorts[i];//哨兵
int j= i-1;
while(j >=0 && tmp < sorts[j]) {
sorts[j+1] = sorts[j];//比tmp大的全部往右移动
j--;
}
//别的移完位置,把哨兵放到正确的位置
sorts[++j] = tmp;
}
}

复杂度

当问题规模为n时

最好情况(原本就是有序的)

比较次数:Cmin=n-1

移动次数:Mmin=0

最差情况(逆序)

比较次数:Cmax=2+3+4+……+n=(n+2)n/2

移动次数:Mmax=1+2+3+……+n-1=n*n/2

Best| Average| Worst|Memory|Stable
— | —| —|——| :–: |—
n |n^2| n^2 | 1 | Yes

没有一身好内功,招式再多都是空;算法绝对是防身必备,面试时更是不可或缺;跟着算法渣一起从零学算法

定义

快速排序(Quicksort)是对冒泡排序的一种改进

快速排序由C. A. R. Hoare在1962年提出。

它的基本思想是:通过一趟排序将要排序的数据分割成独立的两部分,其中一部分的所有数据都比另外一部分的所有数据都要小,然后再按此方法对这两部分数据分别进行快速排序,整个排序过程可以递归进行,以此达到整个数据变成有序序列

算法

设要排序的数组是A[0]……A[N-1],首先任意选取一个数据(通常选用数组的第一个数)作为关键数据,然后将所有比它小的数都放到它前面,所有比它大的数都放到它后面,这个过程称为一趟快速排序。值得注意的是,快速排序不是一种稳定的排序算法,也就是说,多个相同的值的相对位置也许会在算法结束时产生变动

一趟快速排序的算法是:

  1. 设置两个变量i、j,排序开始的时候:i=0,j=N-1;
  2. 以第一个数组元素作为关键数据,赋值给key,即key=A[0];
  3. 从j开始向前搜索,即由后开始向前搜索(j–),找到第一个小于key的值A[j],将A[j]和A[i]互换;
  4. 从i开始向后搜索,即由前开始向后搜索(i++),找到第一个大于key的A[i],将A[i]和A[j]互换;
  5. 重复第3、4步,直到i=j; (3,4步中,没找到符合条件的值,即3中A[j]不小于key,4中A[i]不大于key的时候改变j、i的值,使得j=j-1,i=i+1,直至找到为止。找到符合条件的值,进行交换的时候i, j指针位置不变。另外,i==j这一过程一定正好是i+或j-完成的时候,此时令循环结束)。

演示

上面的定义,算法都是很学术的表达,看得有点晕了,看一个示例:

对数组[2,10,8,22,34,5,12,28,21,11]进行快速排序

快速排序

也可以通过动画更清晰了解整个排序过程

动画:http://www.zhuxingsheng.com/tools/sort/sort.html

实现

快速排序有好些实现手法,左右指针法、挖坑法、前后指针(快慢指针)法

左右指针法

算法过程与上面的演示图差不多

  1. 先从数列中取出一个数作为中轴基数。
  2. 分区过程,将比这个数大的数全放到它的右边,小于或等于它的数全放到它的左边。
  3. 再对左右区间重复第二步,直到各区间只有一个数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* 分治法,从小到大排序
* @param array
*/
public static int partition(int []array,int start,int end) {

//取一位做为基数
int pivot = array[end];//这儿取最后一位作为基数了

int left = start;
int right = end-1;//从倒数第二位开始比较

//从两头向中间搜索,从小到大排序
while (left < right) {
//从left端开始搜索
while(left < right && array[left] <= pivot) {
left ++;
}
//从right端开始搜索
while (left < right && array[right] >= pivot) {
right -- ;
}
//两数交换,大的放到右边,小的放到左边
if(left < right) {
swap(array,left,right);
left++;
right--;
}
}
//
if(left != end && array[left] > array[end] ) {
swap(array,left,end);
return left;
}
//right == left
return left+1;
}

/**
* 分治法,从小到大排序
* @param array
* @param start
* @param end
*/
private static void quicSort(int []array,int start,int end){
if( start < end ){
int partition = partition(array,start,end);
print(array);
quicSort(array,start,partition-1);
quicSort(array,partition+1,end);
}
}

挖坑法

挖坑填坑,也可以叫拆东墙补西墙

先演示一番,对[22,12,28,21,14]进行排序

index: 0 1 2 3 4
data: 22 12 28 21 14

第一步:挖最右边的数为坑

坑: pit=array[right]=14;左索引: left=0,右索引: right=3,

index: 0 1 2 3 4
data: 22 12 28 21 X

从left开始搜索,寻找比坑大的数,发现array[left=0]>14,那就用left=0 填 right=4的坑

array[right=4]=array[left=0];left++;

此时数组:

index: 0 1 2 3 4
data: X 12 28 21 22

left=1,right=3

第二步: 从right端开始找比pit小的数,发现array[1] < 14,那就用right=1 填 left=0的坑

array[left=0]= array[right=1];

此时数组:

index: 0 1 2 3 4
data: 12 X 28 21 22

left=1,right=0; left>right,那此回合结束,用pit填回坑array[left=1]

结束时:

index: 0 1 2 3 4
data: 12 14 28 21 22

发现14左都是小于它的数,右边都是大于它的数

以14为分界,两边数组进行递归下去,就行了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
int partion(int arr[], int begin, int end)
{
int pit = arr[end];//挖最右数,留一个坑
int left = begin;
int right = end;

while (left < right)
{
//从最左开始搜索比坑大的数
while (left < right && arr[left] <= pit)
left++;
if (left<right) {
arr[right] = arr[left];//拿left去填坑,left成为新坑
}
while (left < right && arr[right] >= pit)
right--;
if (left < right) {
arr[left] = arr[right];//right去填left坑,left成为新坑
}
}

arr[left] = pit;//用key填坑
return left;
}

void QuickSort(int arr[], int left, int right)
{
if (left < right)
{
int mid = partion(arr, left, right);
print(arr);
QuickSort(arr, left, mid - 1);
QuickSort(arr, mid + 1, right);
}
}

前后指针法

大体思路:

  1. 定义两个指针,一前一后,cur(前)指向起始位置,prev(后)指向cur的前一个位置;选择数组最后一个元素作为key(right)
  2. cur找比key小的数,prev在cur没有找到的情况下,一直不动(即保证prev一直指向比key大的数);直到cur找到比key小的数(+ +prev && prev != cur时)然后++cur,prev仍然不动
  3. 直到cur走到right前一个位置,终止循环。最后++prev,交换prev和right的值。返回中间位置prev。最后再继续递归

前后指针法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
int partion(int arr[], int left, int right)
{
int key = arr[right];//取最后一位为key

int cur = left;//当前指针
int prev = left - 1;//前一个指针

while (cur < right)
{
if(arr[cur] < key && ++prev != cur){//发现比key小的数
swap(arr,cur,prev);
}
cur++;
}
//最后++prev交换
swap(arr,++prev,cur);
return prev;
}

void QuickSort(int arr[], int left, int right)
{
if (left < right)
{
int mid = partion(arr, left, right);
print(arr);
QuickSort(arr, left, mid - 1);
QuickSort(arr, mid + 1, right);
}
}

分治

以上几个方法,不管怎样的思路,都是寻找一个标准数,让它成为一个分界,大于所有左边的数,小于所有右边的数,进行分而治之

复杂度

最好情况:最在最好的情况下,每次的划分都会恰好从中间将序列划分开来,那么只需要lgn次划分即可划分完成,是一个标准的分治算法Cn=2Cn/2+N,每一次划分都需要比较N次。时间复杂度O(nlogn)

最坏情况:在最坏的情况下,即序列已经排好序的情况下,每次划分都恰好把数组划分成了0,n两部分,那么需要n次划分,但是比较的次数则变成了n, n-1, n-2,….1, 所以整个比较次数约为n(n-1)/2~n2/2

比较和移动次数最少时间复杂度表示为O(nlogn);

比较和移动次数最多的时间复杂度表示为O(n^2);

使用的辅助存储空间最少为logn,最多为n^2;是不稳定的排序;

Best| Average| Worst|Memory|Stable
— | —| —|——| :–: |—
nlog(n) |nlog(n)| n^2 | log(n) | No

没有一身好内功,招式再多都是空;算法绝对是防身必备,面试时更是不可或缺;跟着算法渣一起从零学算法

定义

冒泡排序(Bubble Sort),是一种计算机科学领域的较简单的排序算法。

这个算法的名字由来是因为越大的元素会经由交换慢慢“浮”到数列的顶端(升序或降序排列),就如同碳酸饮料中二氧化碳的气泡最终会上浮到顶端一样,故名“冒泡排序”

算法

它重复地走访过要排序的元素列,依次比较两个相邻的元素,如果他们的顺序(如从大到小、首字母从A到Z)错误就把他们交换过来。走访元素的工作是重复地进行直到没有相邻元素需要交换,也就是说该元素已经排序完成。

演示

对数组[5,6,3,1,8,7,2,4]进行冒泡排序

BubbleSort

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 传统写法
* @param sorts
*/
private static void bubbleSort(int [] sorts){
for (int i=0;i<sorts.length-1;i++) {
for (int j= 0;j<sorts.length-1-i;j++) {
if(sorts[j] < sorts[j+1]) {
swap(sorts,j,j+1);
}
}
}
}

private static void swap(int []sorts,int i,int j){
int tmp = sorts[i];
sorts[i] = sorts[j];
sorts[j] = tmp;
}

改进

有序

如果一个数组本身就是有序的,或者部分已经有序

比如:[8,7,6,5,4,3,1,2]

假如现在有一个长度10000的数组,前1000无序,后9000有序并且大于前1000数据。用上面这种方法,数据交换次数在1000之内,但是剩下9000的数据虽然没有交换,但是每次循环都会进行比较。剩下9000数据已经有序了,我们不要对它们去进行判断浪费不必要的时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 如果部分已经排序,设置一个flag,有序就跳出循环
* @param sorts
*/
private static void bubbleSort1(int [] sorts){
for (int i=0;i<sorts.length-1;i++) {
boolean flag = false;
for (int j= 0;j<sorts.length-1-i;j++) {
if(sorts[j] < sorts[j+1]) {
swap(sorts,j,j+1);
flag = true;
}
}
if(!flag) {
break;
}
}
}

鸡尾酒排序

鸡尾酒排序等于是冒泡排序的轻微变形。不同的地方在于从低到高然后从高到低,而冒泡排序则仅从低到高去比较序列里的每个元素。他可以得到比冒泡排序稍微好一点的效能,原因是冒泡排序只从一个方向进行比对(由低到高),每次循环只移动一个项目。

算法

排序过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 鸡尾酒排序,定向冒泡排序
* @param sorts
*/
private static void bubbleSort2(int [] sorts){
int i = 0;
int j = sorts.length-1;
while (i <= j) {
for(int s = i;s<j;s++) {//从左往右
if(sorts[s] < sorts[s+1]) {
swap(sorts, s, s + 1);
}
}

System.out.println(JSON.toJSONString(sorts));
for(int e = j;e>i;e--) {//从右往左
if(sorts[e] > sorts[e-1]) {
swap(sorts, e, e - 1);
}
}
j--;i++;
System.out.println(JSON.toJSONString(sorts));
}
}

总体上,鸡尾酒排序可以获得比冒泡排序稍好的性能。但是完全逆序时,鸡尾酒排序与冒泡排序的效率都非常差

时间复杂度

  1. 如果我们的数据正序,只需要走一趟即可完成排序。所需的比较次数C和记录移动次数M均达到最小值,即:Cmin=n-1;Mmin=0;所以,冒泡排序最好的时间复杂度为O(n)。

  2. 如果很不幸我们的数据是反序的,则需要进行n-1趟排序。每趟排序要进行n-i次比较(1≤i≤n-1),且每次比较都必须移动记录三次来达到交换记录位置。在这种情况下,比较和移动次数均达到最大值:

Best| Average| Worst|Memory|Stable
— | —| —|——| :–: |—
n | n^2 |n^2| 1 | Stable

天下皆知美之为美,斯恶已;此专栏本想取名代码之美,但有傍名之嫌,也给别误解,所以就叫代码小析吧,看到一段好代码,思路清奇,奇巧淫技,拿出来鉴赏一番

之前是计划one week one alogrithm,结果算法是个短板,不仅要理解,还得再写出代码,特别烧脑,所以中间穿插一下,换换脑子

之前有类似一篇《仅且仅创建一次对象》

最近看到一个段子:

老板有毛病吧,写完排序就叫我走人,我还嫌你这9K工资低了呢

感觉能想到这思路的也算清奇,哈哈!

回调

if you call me, i will call back

回调分类:同步回调,异步回调

场景

建立TCP连接是很耗时的,所以在创建Socket Channel时,可以通过异步回调方式解决

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
/**
* 异步取得channel
* @param index
* @param callback
*/
public void asynGetChannel(int index,final Callback callback) {
// 1. 随机获取一条channel
final int pos = ThreadLocalRandom.current().nextInt(MAX_CONNECTIONS);
Channel target = channels[pos];

// 2. 如果获取到了连接,直接返回
if (target != null && target.isActive()) {
logger.info("direct success "+index);
callback.onSuccess(target);
return;
}

synchronized (locks[pos]) {
target = channels[pos];
// 2. 如果获取到了连接,直接返回
if (target != null && target.isActive()) {
callback.onSuccess(target);
return;
}

// 3.如果连接正在创建中,则加入queue
if (target instanceof EmptyChannel) {
boolean result = jobs.offer(callback);
if (result) {
return;
} else {
throw new RuntimeException("Can't connet to target server and the waiting queue is full");
}
}

// 4. 连接尚未创建
channels[pos] = new EmptyChannel();

Connector.connect(host, port, new Callback() {
@Override
public void onSuccess(Channel channel) {
logger.info(index + " ------------connect success---------"+pos + " channel:" +channels[pos].getClass().getName());
List<Callback> tmpJobs;//建立一个tempJobs,快速释放锁
synchronized (locks[pos]) {
// 设置channels,拷贝jobs队列,释放锁
channels[pos] = channel;
tmpJobs = drainJobs();
}
for(Callback pendingCallback : tmpJobs) {
try {
if(pendingCallback != callback) {
pendingCallback.onSuccess(channel);
}
} catch (Exception e) {
logger.error("call connectionCallback fail", e);
}
}
}

@Override
public void onError(Throwable e) {
List<Callback> tmpJobs;//建立一个tempJobs,快速释放锁
synchronized (locks[pos]) {
// 设置channels,拷贝jobs队列,释放锁
channels[pos] = null;
tmpJobs = drainJobs();
}
for(Callback pendingCallback : tmpJobs) {
try {
if(pendingCallback != callback) {
pendingCallback.onError(e);
}
} catch (Exception x) {
logger.error("call connectionCallback fail", x);
}
}
}
});
}
}

完整的代码:https://github.com/zhuxingsheng/javastudy

亮点

思路很简单,亮点就在于job队列,连接在没有建立成功时,会先建立一个EmptyChannel,有些类似lazy load中的影子对象放到队列中,不造成阻塞,当channel建立完成后,回调

VS Future模式

异步回调的套路与Future模式特别类似

1
2
3
4
Future future = doTask1();
doTask2();
doTask3();
Result result = future.get();

Future 模式中,一个任务的启动和获取结果分成了两部分,启动执行是异步的,调用后立马返回,调用者可以继续做其他的任务,而等到其他任务做完,再获取Future的结果,此时调用 get 时是同步的,也就是说如果 doTask1 如果还没有做完,等它做完。

看出最大区别,异步回调不需要返回值,准确说调用者不用太关心返回值,甚至不需要关心真正执行情况,而future模式就不一样了,调用者是一定要拿到返回值的

参考

同步调用,异步回调和 Future 模式

算法虐我千万遍,我待算法如初恋;IT人永远逃脱不了的算法

概念

算法是特定问题求解步骤的描述,在计算机中表现为指令的有限序列

算法是独立存在的一种解决问题的方法和思想

对于算法而言,实现的语言并不重要,重要的是思想

特性

  1. 输入: 算法具有0个或多个输入
  2. 输出: 算法至少有1个或多个输出
  3. 有穷性: 算法在有限的步骤之后会自动结束而不会无限循环,并且每一个步骤可以在可接受的时间内完成
  4. 确定性:算法中的每一步都有确定的含义,不会出现二义性
  5. 可行性:算法的每一步都是可行的,也就是说每一步都能够执行有限的次数完成

复杂度

一个算法的优劣主要从算法的执行时间和所需要占用的存储空间两个方面衡量

时间复杂度

定义:如果一个问题的规模是n,解这一问题的某一算法所需要的时间为T(n),它是n的某一函数T(n)称为这一算法的“时间复杂性”

1
T(n) = O(f(n))

当输入量n逐渐加大时,时间复杂性的极限情形称为算法的“渐近时间复杂性”。

比如排序算法:可用算法执行中的数据比较次数与数据移动次数来衡量

空间复杂度

对一个算法在运行过程中临时占用存储空间大小的量度,记做S(n)=O(f(n))。

比如直接插入排序,需一个监视哨兵,空间复杂度是O(1) ,而一般的递归算法就要有O(n)的空间复杂度了,因为每次递归都要存储返回信息。

大O表示法

大O表示法被用来描述一个算法的性能或复杂度。大O表示法可以用来描述一个算法的最差情况,或者一个算法执行的耗时或占用空间(例如内存或磁盘占用)

假设一个算法的时间复杂度是 O(n),n在这里代表的意思就是数据的个数。

举个例子,如果你的代码用一个循环遍历 100 个元素,那么这个算法就是 O(n),n 为 100,所以这里的算法在执行时就要做 100 次工作

大O表示法就是将算法的所有步骤转换为代数项,然后排除不会对问题的整体复杂度产生较大影响的较低阶常数和系数,只关心复杂度最重要的部分

1
2
3
4
5
6
7
规律       Big-O

2 O(1) --> 就是一个常数

2n + 10 O(n) --> n 对整体结果会产生最大影响

5n^2 O(n^2) --> n^2 具有最大影响

O(log n),即对数复杂度(logarithmic complexity)。对数可以是ln(底数为e),log10,log2 或者以其它为底数,这无关紧要,它仍然是O(log n),正如O(2n^2) 和 O(100n^2) 都记为 O(n^2)。

示例

O(1)

O(1)表示该算法的执行时间(或执行时占用空间)总是为一个常量,不论输入的数据集是大是小

1
2
3
4
bool IsFirstElementNull(IList<string> elements)
{
return elements[0] == null;
}

O(N)

O(N)表示一个算法的性能会随着输入数据的大小变化而线性变化。下面的例子同时也表明了大O表示法其实是用来描述一个算法的最差情况的:在for循环中,一旦程序找到了输入数据中与第二个传入的string匹配时,程序就会提前退出,然而大O表示法却总是假定程序会运行到最差情况(在这个例子中,意味着大O会表示程序全部循环完成时的性能)

1
2
3
4
5
6
7
8
9
bool ContainsValue(IList<string> elements, string value)
{
foreach (var element in elements)
{
if (element == value) return true;
}

return false;
}

O(n²)

for循环嵌套的复杂度就是二次方的,因为你在一个线性操作里执行另外一个线性操作(或者说: n*n =n² )

如果嵌套层级不断深入的话,算法的性能将会变为O(N^3),O(N^4),以此类推

1
2
3
4
5
6
7
8
9
10
for (var outer = 0; outer < elements.Count; outer++)
{
for (var inner = 0; inner < elements.Count; inner++)
{
// Don't compare with self
if (outer == inner) continue;

if (elements[outer] == elements[inner]) return true;
}
}

O(2^N)

O(2^N)表示一个算法的性能将会随着输入数据的每次增加而增大两倍。O(2^N)的增长曲线是一条爆炸式增长曲线——开始时较为平滑,但数据增长后曲线增长非常陡峭。一个典型的O(2^N)方法就是裴波那契数列的递归计算实现

1
2
3
4
5
6
int Fibonacci(int number)
{
if (number <= 1) return number;

return Fibonacci(number - 2) + Fibonacci(number - 1);
}

(logn)

1
2
3
i=1;       
while (i<=n)
i=i*2;

比较

1
O(1)<O(logn)<O(n)<O(nlogn)<O(n^2)<O(n^3)<O(2^n)<O(n!)<O(n^n)

Big-O Complexity

引申阅读

算法渣-排序-冒泡

算法渣-排序-快速排序

算法渣-排序-插入

算法渣-排序-希尔