辉夜的博客

繁花似锦,辉夜如昼

0%

我们将讨论表示泄露的几种情况,以及如何防止表示泄露
原创性声明:每一个字(除了代码和引用)都是我手敲的。

表示泄露的定义

表示泄露,即Representation Exposure,指的是client端程序可以通过某种手段
查询、访问、修改ADT的内部结构
我们将通过一些例子来感受这个定义。

最基本的表示泄露-来自访问权限

在java的设计中,一个字段的访问权限有四种。
访问权限
为了避免表示泄露,如果一个字段可以是private,那么它就应当是private
下面我们来看一个因为访问权限导致表示泄露的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* This immutable data type represents a tweet from Twitter.
*/
public class Tweet {

public String author;
public String text;
public Date timestamp;

/**
* Make a Tweet.
* @param author Twitter user who wrote the tweet
* @param text text of the tweet
* @param timestamp date/time when the tweet was sent
*/
public Tweet(String author, String text, Date timestamp) {
this.author = author;
this.text = text;
this.timestamp = timestamp;
}
}

对于这个结构,我们在创建之后可以任意访问修改它的字段,因为字段被声明为public:

1
2
3
4
Tweet t = new Tweet("justinbieber", 
"Thanks to all those beliebers out there inspiring me every day",
new Date());
t.author = "rbmllr";

你可能会想:为什么改变一个对象的字段被认为是一种“泄露”呢?我们从软件构造的3个角度来回答:

  1. safe from bug: 通过赋值改变一个字段可能会导致错误,例如赋值不匹配的类型或无意义的类型。
  2. esay to understand: 赋值一个字段的行为可能是令人困惑的,尤其是当各个字段的含义并不是那么清楚的时候。
  3. ready for change:一旦对象的内部表示发生变化,所有的赋值都必须手动重写。如果用方法进行赋值,则可以很方便的重构。

此外,随意修改变量使得我们不能构造“immutable”的对象,这将让我们的程序充满了危险,我们不得不非常小心的应对每一次方法调用,
我们不知道返回之后的对象是否保持它原来的样子,这真是太可怕了!(译制腔)

为此,一个常见的方法是将所有字段声明为private, 并且如果要构建immutable对象的话,声明为private final.

稍微间接一点的表示泄露-来自传递引用

当我们把所有字段设置为私有,你可能松了口气:现在我的数据类型是安全的了。但还没结束——客户端总是需要了解ADT的“属性”,除非
这个ADT是一个纯粹的功能类(比如Math)。既然如此,我们必须为客户端服务,把ADT的内部表示转换为客户端感兴趣的属性。大部分情况下,属性和内部表示的对应关系是比较简单的,例如我们为Tweet类型增加获取作者、内容、发送时间的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/** @return Twitter user who wrote the tweet */
public String getAuthor() {
return author;
}

/** @return text of the tweet */
public String getText() {
return text;
}

/** @return date/time when the tweet was sent */
public Date getTimestamp() {
return timestamp;
}

看上去没什么问题,客户端不能直接访问我们的内部表示,因此如果我们要修改内部表示,例如把text分为title和content,我们只需要重写getText,比如返回title + content,而无需客户端做任何改动。

但是还存在这样的一种情况:客户端拿到了这个属性之后,又修改了这个属性,这会发生什么呢?
考虑下面的代码:

1
2
3
4
5
6
/** @return a tweet that retweets t, one hour later*/
public static Tweet retweetLater(Tweet t) {
Date d = t.getTimestamp();
d.setHours(d.getHours()+1);
return new Tweet("rbmllr", t.getText(), d);
}

客户端获取了发推时间后,把时间加了一个小时,又用这个时间创建了新的Tweet。这里的d是通过getTimestamp()获得的,而在d上调用了
setHours()方法将会改变d的值!因此结果是,t和新创建的Tweet现在都指向了一小时后的时间。
表示泄露-来自传递引用
这个例子中,我们的getter传递了一个引用类型Date,一个可变数据类型。因此,在引用d上调用mutator导致了表示泄露。
我们该怎样修改这个代码?通过一种称之为“防御性拷贝”的方式,创建引用的一个副本。这样对返回值的修改便不会影响到对象本身。具体来讲:

1
2
3
public Date getTimestamp() {
return new Date(timestamp.getTime());
}

现在,我们就避免了getter(observor的一种)造成的访问泄露。对于所有的observor方法(返回值非空),我们都需要做如下的检查:只要返回的值不是基本数据类型,并且是可变数据类型,那么就要进行防御式拷贝。反之,如果返回值是基本数据类型或者是不可边类型,则可以直接返回。

稍微间接一点的表示泄露-来自接收引用

引用类型始终是危险的:ADT内部的引用类型也可能被外部的客户端程序持有,这时一旦外部引用调用了mutator,ADT的内部表示也将同步发生变化。我们来看下面的例子:

1
2
3
4
5
6
7
8
9
10
/** @return a list of 24 inspiring tweets, one per hour today */
public static List<Tweet> tweetEveryHourToday () {
List<Tweet> list = new ArrayList<Tweet>();
Date date = new Date();
for (int i = 0; i < 24; i++) {
date.setHours(i);
list.add(new Tweet("rbmllr", "keep it up! you can do it", date));
}
return list;
}

这个例子中,我们试图创建24个Tweet,间隔1小时。然而,我们的构造器方法会直接将时间戳赋值给内部属性,因此当我们在循环中调用setHours时,所有Tweet内部的timestamp都将同步变化。最终,我们得到了24个时间相同的Tweet。

1
2
3
4
5
6
7
8
9
10
11
/**
* Make a Tweet.
* @param author Twitter user who wrote the tweet
* @param text text of the tweet
* @param timestamp date/time when the tweet was sent
*/
public Tweet(String author, String text, Date timestamp) {
this.author = author;
this.text = text;
this.timestamp = timestamp;
}

表示泄露-接收引用

为此,我们在接收引用时也要使用防御式拷贝。如下所示:

1
2
3
4
5
public Tweet(String author, String text, Date timestamp) {
this.author = author;
this.text = text;
this.timestamp = new Date(timestamp.getTime());
}

同样的,对基本数据类型和不可边数据类型,如String,不需要进行防御式拷贝。

总结

这就是MIT课件中关于表示泄露的内容。

适当的提醒自己学习的目的是有必要的。笔记的作者和理想中的读者应当是具有C语言编程的基础,对指针的概念有清晰理解,
正在学习面向对象编程并且希望理解类、对象的概念,特性和其大致实现原理。
本文的参考教材是java核心技术卷一

面向对象的编程

我很喜欢书中的一句话:对于面向过程而言,首先决定如何操作数据,然后再觉得如何组织数据的结构。而OOP却调换了这个次序,将数据放在第一位,然后再考虑组织数据的算法。
书中进一步提到,Web浏览器可能需要两千个过程对一组全局数据进行操作,而OOP只需要一百个含有20个方法的类,我对此存疑,但这不是重点。

一、类和对象

1.1 基本概念

是构造对象的模板或蓝图。由类构造对象的过程称为创建类的实例
对象中的数据称为实例字段,这些字段的值构成了对象的当前状态
操作数据的过程称之为方法,程序只能通过一个类的方法来与该类的数据交互,这实现了类的封装
可以通过保留一个类的全部属性和方法并扩展一些新的方法和数据字段的过程来创建一个新的类,这个过程称为继承
对象的方法集合构成了对象的行为,示例字段值得集合构成了对象的当前状态同一个类的不同对象具有不同的标识

1.2 编写OOP程序的原则

先识别类,再为类添加方法。重要的名词可能是类,重要的动词可能是类的方法。

1.3 类之间的关系

  1. 依赖关系
    一个类的方法操作或使用另一个类的对象,就称为这个类依赖于另一个类

  2. 聚合关系
    一个类的对象包含另一个类的对象

  3. 继承关系
    一个类继承并扩展了另一个类的功能

二、使用类的方法

使用对象要先构造对象,然后初始化对象,之后调用对象的方法。

2.1 构造对象

构造对象使用new运算符,这意味着为该对象分配存储空间。单独声明一个对象变量只是声明了一个指针(仔细思考一下,这意味着如果两个对象变量持有同一个对象的引用,通过任何一个对象变量调用方法都会影响另一个对象变量获得的数据):

1
2
3
4
5
6
Date birthday;          //unavailable
birthday = new Date(); //available

/*other usage*/
qaq(new Q()); //use constructed object as argument

2.2 成员

对对象变量使用.运算符,就可以在对象上取出对应的成员。如果是成员字段,将会获得这个字段的引用。如果是方法,那么可以调用这个方法并且该方法已经绑定到该对象上。
这是相对而言比较正常的语法。

2.3 方法

我们在java中将一个类的成员函数称之为“方法”,这是一个非常拗口的称呼,尽管大多数程序员并不这么认为(
通常使用的成员方法可以分为两类:

  • 更改器
  • 访问器
    在C++中,我们会使用const来标志访问器方法,从语法上限制这个函数修改对象内容的能力。然而,在java里我们无能为力。
    要问:这种区别有什么用呢?这涉及到后面重要的概念“不可变性”。

2.4 定义

如果要定义一个类,我们需要定义其数据和对数据的操作,也就是成员字段和成员方法。同时,我们还应该准备一系列构造器,以初始化这个类。
定义字段的方法为访问修饰符+类型+标识符+;
定义方法的方式为访问修饰符+函数原型+函数定义
构造器的原型没有返回值

主要介绍java和c语言的一些区别,参考课程浙江大学翁恺b站java语言基础

第一个java程序

java的开发环境基本上还是依赖ide的,因为java有比较复杂的依赖关系(至少对我而言比较复杂)。教学视频使用的是目前最广泛使用的eclipse,官网下载可以直接安装(2022年不再需要提前安装jre),汉化方式可以自行搜索。

java与C语言的第一个区别就在于”类”,和任何编程语言一样,java程序也有自己的执行入口,不过不是主函数,而是主类。我们在主类下编写的方法会被依次调用。一个类就是一个抽象数据类型,包含了数据和对数据的操作。这样的封装使得java和C++等面向对象的编程语言拥有与C语言截然不同的编程风格,会看到大量的标识符和”.”,取成员运算符。java中有许多自带的类,比如马上就会遇到的System类,执行java函数的时候如果跟进是可以直接跟进到System里的方法的,很有趣(另外很想吐槽类名不能自动补全实在是太糟糕了。)

同样java中的特色还有字符串类String,以及字符串对加号运算符的重载。所谓运算符的重载就是在一定的作用域内,把运算符替换为某个函数的执行。比如原来的加法可以看成有两个算数类型参数返回一个算数类型的add函数,而java中字符串类对加号的重载可以看成把加号对应的函数替换成了strcat函数,也就是连接两个字符串。这种情况下,如果想要保留算数运算就需要在算数变量被替换为字符串变量之前进行算数运算,最简单的方法就是在算术表达式周围加一个括号。

可以看到,类和运算符重载的概念赋予了面向对象语言极大的灵活和自由性。对于类的使用者来说,完全不需要关心类的背后发生了什么。System类背后对输入输出流的复杂处理被掩盖起来,String类进行的字符串连接也被简单的加号替代。这样的封装使得面向对象语言天然的适合用来描述某种问题(我目前并不能很好的概括出是哪种问题)。

C语言的函数也有类似的封装,然而,C语言并没有提供作用域层面的封装。我们可以在函数内部进行封装,但函数之间的数据传输必须通过清晰的传参返回来完成。C语言对访问类型(static/global)的控制可以说提供了封装的一种方案,也就是以编译单元为单位进行模块化的编程设计。不过,函数之间的数据依靠传参管理降低了函数之间的最大聚合程度,更会在系统框架变更时带来巨大的麻烦,运算符重载这种特技更是无从谈起。不过,C语言同样有其独特的好处:面向过程的控制流是清晰可见且高度解耦的,由于没有层层抽象,它还可以生成效率更高的代码。

java中的数组

java拥有美好的内存管理机制,我们可以使用new关键字分配一片地址,并且这片地址的大小是可变的。(不确定,但是new初始化的默认值全为0是很明显的静态存储特点)java中没有指针类型,而是用[]起到和指针类似的效果,于是我们会写出:

1
int[] a = new int[3];

这是一个数组的定义,也就是声明并且分配存储空间。事实上a相当于一个指针。当我们把两个数组类型的变量做比较时,我们实际上在比较他们是否指向同一个地址。如果要比较他们的内容,则要使用数组自带的成员方法equals().
数组自带许多的成员,我也不清楚这些成员是哪里来的。一个很有用的成员变量是a.length,它是a的长度,于是我们遍历数组时可以用:

1
2
3
4
for (int i = 0; i < a.length; i++)
{
System.out.println(a[i]);
}

这和C语言是完全类似的。这个成员变量被声明为final,类似于C中的const,因此它不能被随意修改,而是在创建数组时就确定了。。Java允许我们对可枚举的类型写出for循环的另外一种形式,被称为for-each循环,是全新的:

1
2
3
4
for (var i: a)
{
System.out.println(i);
}

这里的i是a的一个浅拷贝,刚刚给出的例子和一般的for循环的例子是完全等价的。不过需要注意的是,如果要改变数组中的元素的值,那么不能用for-each循环,因为for-each的循环相当于依次把a中元素的值赋给了i,不像a[i]是数组元素自身的标识符。可以想到,改变i的值的效果会在下一次循环时被清空。
事实上,for-each循环可以改写为:

1
2
3
4
5
6
7
/*for (var i: a)*/

int index = 0;
for(i = a[0] ; index < a.length;index++, i = a[index])
{
...
}

二维数组的形式和操作与C语言都是类似的,二维数组就是数组的数组。二维数组自身持有一个length成员,它的每一个元素作为一个数组也持有自己的成员。如果要遍历二维数组,可以写:

1
2
3
4
5
for(int i = 0; i < a.length; i++) {
for(int j = 0; j < a[i].length; j++) {
a[i][j] ...
}
}

更进一步地,如果探究

java中的字符与字符串

java中使用Unicode来表示字符,并且转义字符’\uxxxx’表示用十六进制表示的unicode字符码。除此之外关于字符的运算都是和C语言完全类似的。
字符串是java中一个很常用的类。创建字符串变量的方法据说有11种…让我们先看看比较常用的两种。

1
2
3
String a;
a = new String("qaq");
a = "qaq";

它提供了许多的方法,以及一种运算符重载。当其他类型和String类型进行运算时会被转换成String类型,所以要想避开加号重载就要在转换之前进行运算。常见的C语言函数都可以在java中找到对应的方法:

1
2
3
4
strcat -> +
strlen -> string.length();
strcmp -> string.compareto();
strstr -> string.indexOf();

此外还有许多方法。不同方法之间可以配合产生出很惊艳的效果。java提供的修改字符串方法都可以看成持有一个结构体指针,返回一个结构体的函数。这意味着方法本身的返回值是一个全新的对象。对于java中的查找或匹配字符串方法,也就是在C语言中会返回字符指针的那些函数(strstr),现在会返回一个整数下标。因为这更多的是实践性的问题,这里就不再举更多例子,需要时可以查阅手册。

java中的包裹类型

基本类型对应的包裹类型,是具有一些常用成员的类。他们之间的对应是这样的:

1
2
3
4
int     -> Integer
boolean -> Boolean
double -> Double
char -> Character

包裹类型会含有一些该类型的信息,比如Integer的成员MAX_VALUE就是C语言中的T_MAX,此外还有一些其他的方法,比如Character具有isdigit,isalpha方法。这些都和c语言是完全类似的。

Erpc服务处理过程梳理

RPC的服务端处理过程从传输层开始。传输层收到消息,就会根据约定的传输协议接受信息,然后调用序列化层。序列化解析出请求内容,交由代理层处理。代理层通过对Server和Service的抽象,从请求内容中解析要调用的实际方法和参数,并根据有无返回值/是否需要写回数据做进一步处理。
如果有返回消息的必要,则代理层调用序列化层写入返回内容,调用传输层用协议封装信息并进行发送

代理层服务器开始服务

一个典型的服务器启动之后便开始不断监听可能到来的请求:

1
2
3
4
5
6
7
8
9
erpc_status_t SimpleServer::run(void)
{
erpc_status_t err = kErpcStatus_Success;
while ((err == kErpcStatus_Success) && m_isServerOn)
{
err = runInternal();
}
return err;
}

处理过程大致可以分为两步,一是信息处理,二是请求处理,三是数据写回.在erpc中对应 runInternalBeginrunInternalEnd,前者完成第一步,后者完成后两步.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
erpc_status_t SimpleServer::runInternal(void)
{
MessageBuffer buff;
Codec *codec = NULL;
message_type_t msgType;
uint32_t serviceId;
uint32_t methodId;
uint32_t sequence;

erpc_status_t err = runInternalBegin(&codec, buff, msgType, serviceId, methodId, sequence);
if (err == kErpcStatus_Success)
{
err = runInternalEnd(codec, msgType, serviceId, methodId, sequence);
}

return err;
}

信息处理

Server在处理消息之前,必须获取一些关键的信息用于下一步处理事件.概括来说,一般需要以下几类信息:

  • RPC协议相关
    • 消息类型: 是单向请求信息,双向请求信息还是响应信息或系统通知等
    • RPC协议版本: 用于正确解析信息
    • 序列化方法: 用于正确配置序列化层
    • 请求序列号: 用于异步响应时区分多个请求
  • 代理相关
    • 请求的Service: 请求的包名/类名/服务名等
    • 请求的Method: 请求的具体方法
    • 请求的arguments: 实参
    • 请求的参数类型: 用于重载解析

这些信息有的在解析完之后立刻使用;有的存放起来作为参数在调用栈中传递;有的暂时不解析,而是配置一个codec,到时按需取用.

从代码中可以看到,信息解析的过程主要包括传输层接收,序列化层读取两个过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
erpc_status_t SimpleServer::runInternalBegin(Codec **codec, MessageBuffer &buff, message_type_t &msgType,
uint32_t &serviceId, uint32_t &methodId, uint32_t &sequence)
{
erpc_status_t err = kErpcStatus_Success;
// buffer创建
if (m_messageFactory->createServerBuffer() == true)
{
buff = m_messageFactory->create();
}
// 传输层接收
err = m_transport->receive(&buff);
// 序列化层初始化及读取
*codec = m_codecFactory->create();
(*codec)->setBuffer(buff);
err = readHeadOfMessage(*codec, msgType, serviceId, methodId, sequence);

return err;
}

传输层接收

传输层接受的部分就一句话,简而言之就是将信息读取到buffer中

1
2
// 传输层接收
err = m_transport->receive(&buff);

erpc在这里为所有按帧传输的传输层准备了接受一帧的函数,先接受请求头,读出请求帧的长度,再接受请求体.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
erpc_status_t FramedTransport::receive(MessageBuffer *message)
{
Header h;
erpc_status_t retVal;
uint16_t computedCrc;
{
#if !ERPC_THREADS_IS(NONE)
Mutex::Guard lock(m_receiveLock);
#endif
// Receive header first.
retVal = underlyingReceive((uint8_t *)&h, sizeof(h));
// Receive rest of the message now we know its size.
retVal = underlyingReceive(message->get(), h.m_messageSize);
}

// Verify CRC.
...

message->setUsed(h.m_messageSize);
return retVal;
}

其底层实现为TCPTransport::underlyingReceive,本质上就是对read进行了封装.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
erpc_status_t TCPTransport::underlyingReceive(uint8_t *data, uint32_t size){
ssize_t length;
erpc_status_t status = kErpcStatus_Success;

// Block until we have a valid connection.
while (m_socket <= 0){
Thread::sleep(10000);
}
// Loop until all requested data is received.
while (size > 0U){
length = read(m_socket, data, size);

// Length will be zero if the connection is closed.
if (length > 0){
size -= length;
data += length;
}else{
if (length == 0){
// close socket, not server
close(false);
status = kErpcStatus_ConnectionClosed;
}else{
status = kErpcStatus_ReceiveFailed;
}
break;
}
}
return status;
}

通过这个过程,我们将socket中的信息成功的接收到了buf中

序列化层解析

在erpc中,codec是对一次序列化的封装,一个codec不仅有读写各种数据类型的方法,还持有目前正在处理的buffer和buffer上的cursor,这纯粹是为了语法上的方便,不必在调用每个读写函数时都额外传递一个buffer参数,只需要在开始时设置即可.

1
2
3
4
5
6
7
class Codec
{
protected:
MessageBuffer m_buffer; /*!< Message buffer object */
MessageBuffer::Cursor m_cursor; /*!< Copy data to message buffers. */
erpc_status_t m_status; /*!< Status of serialized data. */
};

因此,codec处理的部分先是创建一个codec并绑定buffer,然后再调用内部处理函数读取请求头

1
2
3
4
// 序列化层初始化及读取
*codec = m_codecFactory->create();
(*codec)->setBuffer(buff);
err = readHeadOfMessage(*codec, msgType, serviceId, methodId, sequence);

这里我们就不详细探究codec的底层实现了,可以想到对字节流做处理的大概方法就是针对数据类型的长度,读取一定的字节,填入到对象指针指向的内存中,并移动buffer上的cursor.

总体来说,在这里值得注意的地方时codec兼具序列化层和RPC传输协议约定的功能,codec的readHeadOfMessage的实现方法其实就对应了协议头如何组织信息.

在默认的codec中,请求头是一个四字节头加上一个int32序列号,四个字节分别表示codecVersion, serviceId, methodId, requestType(不过很明显,先构建codec再读取头信息已经太迟了,所以当他检测到不匹配的codecversion时没法处理,只能直接报错)

1
2
3
4
5
6
7
8
9
10
11
12
13
void BasicCodec::startReadMessage(message_type_t *type, uint32_t *service, uint32_t *request, uint32_t *sequence){
uint32_t header;
read(&header);
if (((header >> 24) & 0xffU) != kBasicCodecVersion){
updateStatus(kErpcStatus_InvalidMessageVersion);
}
if (isStatusOk()){
*service = ((header >> 16) & 0xffU);
*request = ((header >> 8) & 0xffU);
*type = static_cast<message_type_t>(header & 0xffU);
read(sequence);
}
}

请求处理

在经过了信息处理之后,我们已经成功的将传输层中的一帧请求接收到buffer中,获得了这个请求的rpc协议参数,接下来要做的就是调用合适的处理程序来完成实际的处理工作.erpc中,这个函数正是Server::processMessage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
erpc_status_t SimpleServer::runInternalEnd(Codec *codec, message_type_t msgType, uint32_t serviceId, uint32_t methodId,
uint32_t sequence){
//请求处理
erpc_status_t err = processMessage(codec, msgType, serviceId, methodId, sequence);

//消息写回
if (err == kErpcStatus_Success){
if (msgType != kOnewayMessage){
err = m_transport->send(codec->getBuffer());
}
}
// Dispose of buffers and codecs.
disposeBufferAndCodec(codec);
return err;
}

通过阅读源码,我们看到请求处理分为两个阶段:服务发现和方法调用

1
2
3
4
5
6
7
8
9
10
11
erpc_status_t Server::processMessage(Codec *codec, message_type_t msgType, uint32_t serviceId, uint32_t methodId,
uint32_t sequence){
erpc_status_t err = kErpcStatus_Success;
Service *service;
//服务发现
service = findServiceWithId(serviceId);
//方法调用
err = service->handleInvocation(methodId, sequence, codec, m_messageFactory);

return err;
}

服务发现

服务发现的过程就是在服务器已注册的服务中寻找能响应请求的那个,一些语言会使用map来完成这个事情,而在erpc中直接使用了serviceId+methodId, 如果idl确定的话,对于S/C端而言,这个id号是可以唯一确定有效服务的.

在服务端,服务用链表存储:

1
2
3
4
5
6
7
8
9
10
11
12
13
class Service
{
public:
 
    uint32_t getServiceId(void) const { return m_serviceId; }
    Service *getNext(void) { return m_next; }
    void setNext(Service *next) { m_next = next; }
    virtual erpc_status_t handleInvocation(uint32_t methodId, uint32_t sequence, Codec *codec,
                                           MessageBufferFactory *messageFactory) = 0;
protected:
    uint32_t m_serviceId; /*!< Service unique id. */
    Service *m_next;      /*!< Pointer to next service. */
};

服务器在解析信息时通过遍历链表查找id对应的服务,然后调用对应的处理函数:

1
2
3
4
5
6
7
8
9
erpc_status_t Server::processMessage(Codec *codec, message_type_t msgType, uint32_t serviceId, uint32_t methodId,
                                     uint32_t sequence)
{
    erpc_status_t err = kErpcStatus_Success;
    Service *service;
    service = findServiceWithId(serviceId);
    err = service->handleInvocation(methodId, sequence, codec, m_messageFactory);
    return err;
}

方法调用

Server::processMessage调用的Service::handleInvocation是由erpcgen工具根据我们定义的IDL自动生成的.
在.erpc中声明service及其方法, 其中interface关键字后跟服务名, 花括号内可以有若干个方法.

1
2
3
4
interface MatrixMultiplyService // cover functions for same topic
{
    erpcMatrixMultiply(in Matrix matrix1, in Matrix matrix2, out Matrix result_matrix) -> void
}

然后执行下列命令:

1
erpcgen idl.erpc -g c

即可得到生成的文件

1
2
3
4
erpc_matrix_multiply_client.c++
erpc_matrix_multiply_server.c++
erpc_matrix_multiply_server.h
erpc_matrix_multiply.h

其中,erpc_matrix_multiply_server.c++就有服务处理函数的定义.

1
2
3
4
5
6
7
8
9
10
11
12
13
erpc_status_t MatrixMultiplyService_service::handleInvocation(uint32_t methodId, uint32_t sequence, Codec * codec, MessageBufferFactory *messageFactory)
{
    erpc_status_t erpcStatus;
    switch (methodId){
        case kMatrixMultiplyService_erpcMatrixMultiply_id:
            erpcStatus = erpcMatrixMultiply_shim(codec, messageFactory, sequence);
            break;
        default:
            erpcStatus = kErpcStatus_InvalidArgument;
            break;
    }
    return erpcStatus;
}

可以看到, 处理函数进一步通过methodId判断该调用Service中的哪个方法的shim, 这个shim就是方法对应的代理.
代理完成的内容就是组织codec进行反序列化, 获取入参,然后对out类型的数据进行序列化写回, 返回值也一并写回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
erpc_status_t MatrixMultiplyService_service::erpcMatrixMultiply_shim(Codec * codec, MessageBufferFactory *messageFactory, uint32_t sequence)
{
    erpc_status_t err = kErpcStatus_Success;

    Matrix matrix1;
    Matrix matrix2;
    Matrix result_matrix;

    for (uint32_t arrayCount0 = 0U; arrayCount0 < 2U; ++arrayCount0){
        for (uint32_t arrayCount1 = 0U; arrayCount1 < 2U; ++arrayCount1){
            codec->read(&matrix1[arrayCount0][arrayCount1]);
        }
    }

    for (uint32_t arrayCount0 = 0U; arrayCount0 < 2U; ++arrayCount0){
        for (uint32_t arrayCount1 = 0U; arrayCount1 < 2U; ++arrayCount1){
            codec->read(&matrix2[arrayCount0][arrayCount1]);
        }
    }
    // preparing codec for serializing data
    codec->reset();

//调用实际方法
erpcMatrixMultiply(matrix1, matrix2, result_matrix);

    // Build response message.
    codec->startWriteMessage(kReplyMessage, kMatrixMultiplyService_service_id, kMatrixMultiplyService_erpcMatrixMultiply_id, sequence);

    for (uint32_t arrayCount0 = 0U; arrayCount0 < 2U; ++arrayCount0){
        for (uint32_t arrayCount1 = 0U; arrayCount1 < 2U; ++arrayCount1){
                codec->write(result_matrix[arrayCount0][arrayCount1]);
        }
    }

    return err;
}

数据写回

首先需要注意的是,在调用实际方法之前,对codec进行了重置:

1
2
    // preparing codec for serializing data
    codec->reset();

这个重置函数将内部的buffer和cursor恢复到使用之前的状态,相当于将这个读缓冲区又作为写缓冲区复用.在写回过程中,序列化首先调用codec的startWriteMessage方法写回响应头.和我们之前谈到的一样,这一步也是和RPC协议相关的,即codec写入头消息的方式就是rpc协议的方式.

1
2
3
4
5
6
7
8
9
void BasicCodec::startWriteMessage(message_type_t type, uint32_t service, uint32_t request, uint32_t sequence)
{
    uint32_t header =
        (kBasicCodecVersion << 24u) | ((service & 0xffu) << 16u) | ((request & 0xffu) << 8u) | ((uint32_t)type & 0xffu);

    write(header);

    write(sequence);
}

然后, 调用Codec的write方法.在这里,对于数组,erpcgen将会自动生成对应的循环结构来写入.

1
2
3
4
5
6
7
8
9
    ...    
for (uint32_t arrayCount0 = 0U; arrayCount0 < 2U; ++arrayCount0)
        {
            for (uint32_t arrayCount1 = 0U; arrayCount1 < 2U; ++arrayCount1)
            {
                codec->write(result_matrix[arrayCount0][arrayCount1]);
            }
        }
...

Codec对c++的所有数据类型重载一个write方法, 这个方法是对writeData的封装, 后者将按照数据的值和长度传递写入数据

1
2
3
4
5
void BasicCodec::writeData(const void *value, uint32_t length){
    if (isStatusOk()){
        m_status = m_cursor.write(value, length);
    }
}

cursor将作为buffer的内部类完成信息的写入,就是将信息复制到缓冲区里

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
erpc_status_t MessageBuffer::Cursor::write(const void *data, uint32_t length){
    erpc_status_t err = kErpcStatus_Success;
    if (length > 0U){
        if (data == NULL){
            err = kErpcStatus_MemoryError;
        }else if (length > getRemaining()){
            err = kErpcStatus_BufferOverrun;
        }else{
            (void)memcpy(m_pos, data, length);
            m_pos += length;
            m_buffer->setUsed(m_buffer->getUsed() + length);
        }
    }
    return err;
}

完成write之后shim就会直接返回,之后可以直接利用send发送,因为我们在生成的shim函数中已经完成了需要写回的数据的序列化

1
2
3
4
5
6
//数据发送
if (err == kErpcStatus_Success){
if (msgType != kOnewayMessage){
err = m_transport->send(codec->getBuffer());
}
}

Thrift

Thrift 是用于点对点 RPC 实现的轻量级、跨语言的软件栈。Thrift,为传输、序列化、应用级逻辑都提供了实现,同时具有从idl生成代码的功能。

Thrift将架构分为6层,每层都可以自由组合,以适应不同的需求。
1

架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
├── ...(一些构建相关)
├── compiler
│   └── cpp
├── debian
├── lib #代码实现
│   ├── c_glib/src/thrift
| ├── processor #处理请求
| │   ├── thrift_dispatch_processor.c #实现,使用glib-object完成面向对象
| │   ├── thrift_dispatch_processor.h
| │   ├── thrift_multiplexed_processor.c #提供多个处理器
| │   ├── thrift_multiplexed_processor.h
| │   ├── thrift_processor.c #接口
| │   └── thrift_processor.h
| ├── protocol
| │   ├── thrift_binary_protocol.c #二进制协议的实现,把数据转化为合适的形式(bool转化为uint8, double转化为IEEE int64等)
| │   ├── thrift_binary_protocol.h
| │   ├── thrift_binary_protocol_factory.c
| │   ├── thrift_binary_protocol_factory.h
| │   ├── thrift_compact_protocol.c
| │   ├── thrift_compact_protocol.h
| │   ├── thrift_compact_protocol_factory.c
| │   ├── thrift_compact_protocol_factory.h
| │   ├── thrift_multiplexed_protocol.c
| │   ├── thrift_multiplexed_protocol.h
| │   ├── thrift_protocol.c #写入数据,`THRIFT_PROTOCOL_GET_CLASS (protocol)->write`
| │   ├── thrift_protocol.h
| │   ├── thrift_protocol_decorator.c #用decorator模式提供了多协议`THRIFT_PROTOCOL_GET_CLASS (self->concrete_protocol)->write_struct_begin (self->concrete_protocol,name, error)`
| │   ├── thrift_protocol_decorator.h
| │   ├── thrift_protocol_factory.c
| │   ├── thrift_protocol_factory.h
| │   ├── thrift_stored_message_protocol.c
| │   └── thrift_stored_message_protocol.h
| ├── server
| │   ├── thrift_server.c #服务器抽象类
| │   ├── thrift_server.h
| │   ├── thrift_simple_server.c #简单实现
| │   └── thrift_simple_server.h
| ├── thrift.c
| ├── thrift.h
| ├── thrift_application_exception.c
| ├── thrift_application_exception.h
| ├── thrift_configuration.c
| ├── thrift_configuration.h
| ├── thrift_struct.c
| ├── thrift_struct.h
| └── transport
| ├── thrift_buffered_transport.c
| ├── thrift_buffered_transport.h
| ├── thrift_buffered_transport_factory.c
| ├── thrift_buffered_transport_factory.h
| ├── thrift_fd_transport.c
| ├── thrift_fd_transport.h
| ├── thrift_framed_transport.c
| ├── thrift_framed_transport.h
| ├── thrift_framed_transport_factory.c
| ├── thrift_framed_transport_factory.h
| ├── thrift_memory_buffer.c
| ├── thrift_memory_buffer.h
| ├── thrift_platform_socket.h
| ├── thrift_server_socket.c
| ├── thrift_server_socket.h
| ├── thrift_server_transport.c
| ├── thrift_server_transport.h
| ├── thrift_socket.c
| ├── thrift_socket.h
| ├── thrift_ssl_socket.c
| ├── thrift_ssl_socket.h
| ├── thrift_transport.c
| ├── thrift_transport.h
| ├── thrift_transport_factory.c
| ├── thrift_transport_factory.h
| ├── thrift_zlib_transport.c
| ├── thrift_zlib_transport.h
| ├── thrift_zlib_transport_factory.c
| └── thrift_zlib_transport_factory.h
| ...
|
├── contrib #一些实例,但不是官方测试用例
├── test #测试用例
├── doc
│   ├── specs #一些格式信息
| ├── HeaderFormat.md #请求头格式
| ├── SequenceNumbers.md #序列号(用于在一个链接中异步处理多个请求)
| ├── idl.md #接口定义语言
| ├── thrift-binary-protocol.md #二进制协议, TLV格式
| ├── thrift-compact-protocol.md #压缩方式 zigzag压缩整数, 其他的似乎编码没有太大不同?
| ├── thrift-parameter-validation-proposal.md #参数验证,也就是通过某些方式限定参数的取值范围.比如int32类型必须取1,2,4,或者限定某个字符串长度必须大于4
| ├── thrift-protocol-spec.md #BNF
| ├── thrift-rpc.md #框架整体简介
| ├── thrift-sasl-spec.txt #(Simple Authentication and Security Layer)
| ├── thrift-tconfiguration.md #一些设置
| └── thrift.tex
│   └── ... #没啥用
└── tutorial #教程
├── c_glib
...
└── swift

特色

Thrift 专门设计用于支持跨客户端和服务器代码的非原子版本更改。这使您可以升级服务器,同时仍然能够为旧客户端提供服务;或让较新的客户端向较旧的服务器发出请求。

Motan

Motan是一套基于java开发的RPC框架,除了常规的点对点调用外,Motan还提供服务治理功能,包括服务节点的自动发现、摘除、高可用和负载均衡等。

Motan具有良好的扩展性,主要模块都提供了多种不同的实现,例如支持多种注册中心,支持多种rpc协议等。

架构概述

2
默认序列化: Hessian2, 默认传输层: Netty NIO TCP长连接

  • register:
    用来和注册中心进行交互,包括注册服务、订阅服务、服务变更通知、服务心跳发送等功能;Server端会在系统初始化时通过register模块注册服务,Client端在系统初始化时会通过register模块订阅到具体提供服务的Server列表,当Server 列表发生变更时也由register模块通知Client。

  • Cluster:
    cluster是一组可用的Server在逻辑上的封装,包含若干可以提供RPC服务的Server,实际请求时会根据不同的高可用与负载均衡策略选择一个可用的Server发起远程调用。

    在进行RPC请求时,Client通过代理机制调用cluster模块,cluster根据配置的HA和LoadBalance选出一个可用的Server,通过serialize模块把RPC请求转换为字节流,然后通过transport模块发送到Server端。

代码结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
├── closable
├── cluster
│   ├── ha
│   ├── loadbalance
│   └── support
├── codec
├── common
├── config
│   ├── annotation
│   └── handler
├── core
│   └── extension
├── exception
├── filter
├── log
├── protocol
│   ├── injvm
│   ├── mock
│   ├── rpc
│   ├── support
│   └── v2motan
├── proxy
│   └── spi
├── registry #服务注册:发现注册/变更通知/流量配置/失败返回
│   └── support
│   └── command
├── rpc #奇怪的工具类 Callbackable/Future/Node/Refer/Request/...Response
│   └── init
├── serialize #FastJson, Hessian2, Breeze, Simple
├── switcher
├── transport #Channel/Transport/Server绑定/Clent心跳/..
│   ├── async
│   └── support
└── util #Math/Net/Stats/Reflect/....

Dubbo

  • 基于透明接口的 RPC
  • 智能负载均衡
  • 自动服务注册和发现
  • 高扩展性
  • 运行时流量路由
  • 可视化服务治理

架构

3

代码架构和设计内容像详解

Volo调研

Volo 是字节跳动服务框架团队研发的轻量级、高性能、可扩展性强、易用性好的 Rust RPC 框架,使用了 Rust 最新的 GAT 特性。

特色-易用性

具体来说,这个框架使用了motore中间件, 而其使用GAT语法完成了高效的异步接口

Rust不支持一个async trait, 一般的解决方法是使用Box(类似智能指针)来完成这件事情, 但这会带来额外的开销并且降低代码可读性. 有一个使用宏完成用Box实现异步接口的Crate

具体细节可能需要进一步理解一些Rust的语言特性

架构图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
├─volo          RPC框架的通用组件
│ └─src
│ ├─discovery #服务发现:一些接口,一种简单实现(返回静态单链表)
│ ├─loadbalance #负载均衡:同上(加权随机)
│ ├─net #网络链接: probe创建套接字,dial建立连接,incomming进行监听,conn进行读写
│ └─util #工具: buf_reader
| #
├─volo-build #编译用
│ └─src
├─volo-cli #用户界面
│ └─...
├─volo-grpc #grpc框架
│ └─src
│ ├─client #客户端底层组件
│ ├─codec #编解码器
│ ├─layer #待调研,类似某种封装tonic/src/metadata
│ │ └─loadbalance
│ ├─metadata
│ ├─server #服务端底层组件
│ └─transport
... #从tonic/src/中copy并修改了一些文件,完成rpc的请求响应相关工作
├─volo-macros #宏
│ └─src
└─volo-thrift #thrift框架
└─src
├─client
│ └─layer
├─codec
├─protocol
└─transport
├─pingpong
└─pool

如何用Volo的特色实现grpc
架构图

erpc

传输层(串口),嵌入式,代码生成(结合nanopb)

erpc项目地址及其详细文档

侧重点

嵌入式,轻量化

框架图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
├─erpcgen           #代码生成工具
├─erpcsniffer
│ └─src
├─erpc_c #基础架构
│ ├─config #config.h配置文件
│ └─infra
| ├─ erpc_arbitrated_client_manager.cpp
| ├─ erpc_arbitrated_client_manager.hpp
| ├─ erpc_basic_codec.cpp #基础编解码器
| ├─ erpc_basic_codec.hpp
| ├─ erpc_client_manager.cpp
| ├─ erpc_client_manager.h
| ├─ erpc_client_server_common.hpp
| ├─ erpc_codec.hpp #编解码器抽象定义
| ├─ erpc_common.h #状态码枚举类型的定义
| ├─ erpc_crc16.cpp
| ├─ erpc_crc16.hpp
| ├─ erpc_framed_transport.cpp
| ├─ erpc_framed_transport.hpp
| ├─ erpc_manually_constructed.hpp
| ├─ erpc_message_buffer.cpp
| ├─ erpc_message_buffer.hpp
| ├─ erpc_message_loggers.cpp
| ├─ erpc_message_loggers.hpp
| ├─ erpc_pre_post_action.cpp
| ├─ erpc_pre_post_action.h
| ├─ erpc_server.cpp
| ├─ erpc_server.hpp #Service和Server的抽象接口,注册移除服务,处理信息等
| ├─ erpc_simple_server.cpp #
| ├─ erpc_simple_server.hpp
| ├─ erpc_static_queue.hpp #数组实现的队列
| ├─ erpc_transport.hpp #Transport
| ├─ erpc_transport_arbitrator.cpp
| ├─ erpc_transport_arbitrator.hpp
| ├─ erpc_version.h
| ├─ infra.dox
│ ├─port #便于移植
│ ├─setup #C语言接口
│ └─transports #支持不同通信方法的工具类
├─erpc_python
│ └─erpc
...
  • rtos上的 protobuf-c
  • erpc 传输层 nanopb
  • volo 语法特性
  • motan dubbo 事件逻辑

这篇文章主要分析了grpc-go服务端的启动过程,重点考察了服务端的建立、注册、监听
等关键的生命周期对应的代码实现。
目前阶段主要考察普通的rpc调用,暂时没有研究流式传输。

首先来看服务端examples/helloworld/greeter_server/main.go的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func main() {
// 解析命令行参数,主要是port
flag.Parse()

// 新建一个对本地端口的监听
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}

// 新建grpc服务器
s := grpc.NewServer()

// 注册protobuf中的服务
pb.RegisterGreeterServer(s, &server{})
log.Printf("server listening at %v", lis.Addr())

// 启动监听
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}

可以看到, main函数中和server相关的操作主要有三步:
(1)创建 server
(2)server 的注册
(3)调用 Serve 监听端口并处理请求

Server的创建

这里调用的函数就是\server.go#NewServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func NewServer(opt ...ServerOption) *Server {
//用option模式指定各个服务器选项
opts := defaultServerOptions
for _, o := range extraServerOptions {
o.apply(&opts)
}
for _, o := range opt {
o.apply(&opts)
}

//构建服务器
s := &Server{
lis: make(map[net.Listener]bool),
opts: opts,
conns: make(map[string]map[transport.ServerTransport]bool),
services: make(map[string]*serviceInfo),
quit: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
czData: new(channelzData),
}

// 配置拦截器
chainUnaryServerInterceptors(s)
chainStreamServerInterceptors(s)

// 同步相关(优雅退出时发送信号)
s.cv = sync.NewCond(&s.mu)

// 调试相关 打印调用信息
if EnableTracing {
_, file, line, _ := runtime.Caller(1)
s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
}

// 并发相关
if s.opts.numServerWorkers > 0 {
s.initServerWorkers()
}

// channelz(一个调试工具)相关
s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
channelz.Info(logger, s.channelzID, "Server created")

return s
}

一个Server结构体包括了一系列网络通讯和同步相关的内容,通常是使用了sync包中的功能或利用通道完成各种同步操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Server is a gRPC server to serve RPC requests.
type Server struct {
opts serverOptions

mu sync.Mutex // guards following

lis map[net.Listener]bool

// 监听地址到 transports的映射
conns map[string]map[transport.ServerTransport]bool
// 是否在服务
serve bool
// ...
drain bool
// 优雅退出时进行广播
cv *sync.Cond
// 核心:服务名到服务信息的映射
services map[string]*serviceInfo
// 日志
events trace.EventLog

// 同步相关
quit *grpcsync.Event
done *grpcsync.Event
channelzRemoveOnce sync.Once
serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop

channelzID *channelz.Identifier
czData *channelzData

serverWorkerChannels []chan *serverWorkerData
}

用于同步的grpc.Event

对于Event类型的quit和done,这里简单的分析一下

1
2
3
4
5
6
// Event represents a one-time event that may occur in the future.
type Event struct {
fired int32
c chan struct{}
o sync.Once
}

Event可以被并发地多次触发。一旦被触发,e.c将被关闭,从而所有试图从e.c中接受一个值的协程将从阻塞中恢复,这起到了一对多通知的效果。o是为了防止e.c被多次关闭而引发panic。

核心部分:map[string]

而这一部分代码中的核心还是

1
services map[string]*serviceInfo

通过服务名,我们可以直接获取服务相关的信息,主要也是两个map,通过名称可以分别获取stream和method的描述(Desc)。每个描述都包含了name和handler

1
2
3
4
5
6
7
8
9
// serviceInfo wraps information about a service. It is very similar to
// ServiceDesc and is constructed from it for internal purposes.
type serviceInfo struct {
// Contains the implementation for the methods in this service.
serviceImpl interface{}
methods map[string]*MethodDesc
streams map[string]*StreamDesc
mdata interface{}
}

总体来说,在调用过程中Server的关键结构是这样的:

grpc-go源码分析

Server注册

main函数中的代码如下:

1
2
// 注册protobuf中的服务
pb.RegisterGreeterServer(s, &server{})

需要先考察一下server是一个什么样的结构

自定义的服务器实现:Server类型

examples/helloworld/greeter_server/main.go中:

1
2
3
4
5
6
7
8
9
10
// server is used to implement helloworld.GreeterServer.
type server struct {
pb.UnimplementedGreeterServer
}

// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
log.Printf("Received: %v", in.GetName())
return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}

它实际上就是对proto中定义的服务端的一个实现, 注意接口中要求的方法mustEmbedUnimplementedGreeterServer()是在protobuf的生成文件pb中的pb.UnimplementedGreeterServer中实现的,这是为了从语法上要求server的实现中必须包含pb.UnimplementedGreeterServer

1
2
3
4
5
6
7
8
// GreeterServer is the server API for Greeter service.
// All implementations must embed UnimplementedGreeterServer
// for forward compatibility
type GreeterServer interface {
// Sends a greeting
SayHello(context.Context, *HelloRequest) (*HelloReply, error)
mustEmbedUnimplementedGreeterServer()
}

至于为什么必须包含这个奇怪的结构体,可能是因为这个结构体中包含了一个默认的SayHello方法,这样即使我们忘了实现SayHello方法,也能让server实现GreeterServer接口。

1
2
3
4
5
func (UnimplementedGreeterServer) SayHello(context.Context, *HelloRequest) (*HelloReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method SayHello not implemented")
}
func (UnimplementedGreeterServer) mustEmbedUnimplementedGreeterServer() {}

总而言之,Server就是一个我们自己实现的服务器。包含了我们在proto中声明的方法。

Register调用分析

main函数中直接调用的方法是:

1
2
// 注册protobuf中的服务
pb.RegisterGreeterServer(s, &server{})

server 的注册调用了 RegisterGreeterServer 方法,这个方法是examples/helloworld/helloworld/helloworld_grpc.pb.go中的:

1
2
3
func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) {
s.RegisterService(&Greeter_serviceDesc, srv)
}

这个方法调用了 server 的 RegisterService 方法,然后传入了一个 ServiceDesc 的数据结构,如下 :

1
2
3
4
5
6
7
8
9
10
11
12
var Greeter_ServiceDesc = grpc.ServiceDesc{
ServiceName: "helloworld.Greeter",
HandlerType: (*GreeterServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "SayHello",
Handler: _Greeter_SayHello_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "examples/helloworld/helloworld/helloworld.proto",
}

可以看到,这个结构体的结构serviceInfo的结构是吻合的:
grpc

下面来看RegisterService函数的实现,核心的内容在就是检查完类型之后调用register将sd中的信息注入到seviceinfo结构体中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// RegisterService registers a service and its implementation to the gRPC
// server. It is called from the IDL generated code. This must be called before
// invoking Serve. If ss is non-nil (for legacy code), its type is checked to
// ensure it implements sd.HandlerType.
func (s *Server) RegisterService(sd*ServiceDesc, ss interface{}) {
// 检查类型
if ss != nil {
ht := reflect.TypeOf(sd.HandlerType).Elem()
st := reflect.TypeOf(ss)
if !st.Implements(ht) {
logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
}
}
s.register(sd, ss)
}

func (s *Server) register(sd *ServiceDesc, ss interface{}) {
// 加锁
s.mu.Lock()
defer s.mu.Unlock()
// 打印日志
s.printf("RegisterService(%q)", sd.ServiceName)
// 检查异常
if s.serve {
logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
}
if _, ok := s.services[sd.ServiceName]; ok {
logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
}

// 将sd中的内容注入到serviceinfo中,并将ss类型保存为serviceImpl
info := &serviceInfo{
serviceImpl: ss,
methods: make(map[string]*MethodDesc),
streams: make(map[string]*StreamDesc),
mdata: sd.Metadata,
}
for i := range sd.Methods {
d := &sd.Methods[i]
info.methods[d.MethodName] = d
}
for i := range sd.Streams {
d := &sd.Streams[i]
info.streams[d.StreamName] = d
}
s.services[sd.ServiceName] = info
}

server 对不同 rpc 请求的处理,也是根据 service 中不同的 serviceName 去 service map 中取出不同的 handler 进行处理,这样相当于完成了grpc的代理操作,把字符串传递给代理,代理就能调用对应的实际方法去处理。

Sever服务过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
func (s *Server) Serve(lis net.Listener) error {
//加锁
s.mu.Lock()
//打印日志
s.printf("serving")
//更新状态
s.serve = true

//检查是否已经关闭
if s.lis == nil {
// Serve called after Stop or GracefulStop.
s.mu.Unlock()
lis.Close()
return ErrServerStopped
}

// 并发相关
s.serveWG.Add(1)
defer func() {
s.serveWG.Done()
if s.quit.HasFired() {
// Stop or GracefulStop called; block until done and return nil.
<-s.done.Done()
}
}()

/* 注册端口监听
type listenSocket struct {
net.Listener
channelzID *channelz.Identifier
}
*/
ls := &listenSocket{Listener: lis}
s.lis[ls] = true

//(在退出时)注销端口监听
defer func() {
s.mu.Lock()
if s.lis != nil && s.lis[ls] {
ls.Close()
delete(s.lis, ls)
}
s.mu.Unlock()
}()

//channelz相关
var err error
ls.channelzID, err = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
if err != nil {
s.mu.Unlock()
return err
}

//解锁,并发操作完成
s.mu.Unlock()
channelz.Info(logger, ls.channelzID, "ListenSocket created")

var tempDelay time.Duration // how long to sleep on accept failure

// 死循环,用accept监听
for {
rawConn, err := lis.Accept()
//错误检查
//https://openskill.cn/article/1792
if err != nil {
//判断是否是临时错误
if ne, ok := err.(interface {
Temporary() bool
});
/*类型断言,利用短逻辑避免调用不存在的方法*/
ok && ne.Temporary() {
// 试图恢复,等待一段时间
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
// 打印日志
s.mu.Lock()
s.printf("Accept error: %v; retrying in %v", err, tempDelay)
s.mu.Unlock()

// 等待,如果此时服务退出就不再等待直接返回
timer := time.NewTimer(tempDelay)
select {
case <-timer.C:
case <-s.quit.Done():
timer.Stop()
return nil
}
continue
}

//等待多次之后仍有错误,打印日志
s.mu.Lock()
s.printf("done serving; Accept = %v", err)
s.mu.Unlock()

// 检查是否退出
if s.quit.HasFired() {
return nil
}

// 返回错误信息
return err
}
tempDelay = 0
// Start a new goroutine to deal with rawConn so we don't stall this Accept
// loop goroutine.
//
// Make sure we account for the goroutine so GracefulStop doesn't nil out
// s.conns before this conn can be added.
s.serveWG.Add(1)
go func() {
s.handleRawConn(lis.Addr().String(), rawConn)
s.serveWG.Done()
}()
}
}

最终这个函数会把端口地址和通过Accept得到的连接传递给handle函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
// 检查是否退出
if s.quit.HasFired() {
rawConn.Close()
return
}
// 设置一次IO操作的最大时间,如果超过直接失败
// 这里是用来限制连接时间的
rawConn.SetDeadline(time.Now().Add(s.opts. connectionTimeout))

// Finish handshaking (HTTP2)
st := s.newHTTP2Transport(rawConn)

// 完成连接之后取消时长限制
rawConn.SetDeadline(time.Time{})
if st == nil {
return
}

if !s.addConn(lisAddr, st) {
return
}
go func() {
s.serveStreams(st)
s.removeConn(lisAddr, st)
}()
}

Http2握手

handle函数处理连接的第一步就是完成HTTP2的握手

1
2
3
4
5
6
7
8
9
10
11
12
13
// newHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
config := &transport.ServerConfig{
...
}
st, err := transport.NewServerTransport(c, config)
if err != nil {
...
}

return st
}

核心代码如下:internal/transport/http2_server.go

Application类

application是应用程序中除了activity之外的另一个context,顾名思义,它就像是“应用程序”本身。因此,它拥有与应用程序相同的生命周期,并且拥有对全局资源的访问权限,此外它还可以监控Activity的声明周期。

Application类的注册

一个应用程序只能有一个application实例,默认情况下会自动创建一个。如果要自己编写一个,就需要让一个类继承Application,并且在androidmanifest文件中为application添加android:name属性:

1
2
3
public class App extends Application {
...
}

android:name设置为自己编写的类名。

1
2
3
4
5
<application
android:name=".App"
... >
...
</application>

这样,应用程序就会以App类作为Application实例。

利用Application获取/传递全局资源

由于Application也是Context的一种,因此它也可以解析R文件中的内容。例如在res/values/strings.xml中添加:

1
<string name="qaq">qaq</string>

就可以使用Application直接进行访问,首先定义getApp()方法:

1
2
3
private App getApp() {
return (App) getApplicationContext();
}

然后在Activity中调用,可以看到解析全局资源呈现出来的内容。

1
textView.setText("资源字符串的内容是: " + getApp().getString(R.string.qaq));

另一种方法是利用应用程序只有一个application,并且每个activity都可以通过调用getApplicationContext()获得application的特性,在App类中添加字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14

public class App extends Application {

private String textData = "default";

public String getTextData() {
return textData;
}

public void setTextData(String textData) {
this.textData = textData;
}
}

上述代码添加了字段textData和它的访问器与修改器,可以在IDEA或AS中通过code->generate->getter and setter来自动生成,注意鼠标光标必须位于类定义的大括号内,否则generate可能只有copyright选项

接下来,可以编写两个activity,在一个activity中设置,启动另一个activity,再在另一个activity中显示。这部分内容就是之前讲过的了:
在MainActivity.java中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main1);

textView = findViewById(R.id.textView);
editText = findViewById(R.id.editText);

findViewById(R.id.button).setOnClickListener(
new View.OnClickListener() {
@Override
public void onClick(View v) {
getApp().setTextData(editText.getText().toString());
textView.setText("共享的数据是: " + editText.getText().toString());
}
}
);
findViewById(R.id.button2).setOnClickListener(
new View.OnClickListener() {
@Override
public void onClick(View v) {
startActivity(new Intent(MainActivity.this, MainActivity2.class));
}
}
);

}

在MainActivity2.java中:

1
2
3
4
5
6
7
8
9
10
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main2);

textView = findViewById(R.id.textView);

textView.setText("共享的数据是: " + getApp().getString(R.string.qaq));

}

Application生命周期

启动应用程序时,首先创建Application并调用其onCreate()方法。关闭程序时,如果在虚拟环境下会回调onTerminate()方法。注意,Application的onCreate比Activity的onCreate先被调用,这个特性使得Application的onCreate成为初始化内容的好时机。
在应用的运行过程中,Application还有如下几个生命周期:onLowMemory(),onTrimMemory(),onConfigurationChanges(),分别对应于低内存、内存清理和配置改变,程序可以做相应的资源释放和布局改变来响应这些事件。

从多个Activity启动应用程序

可以让两个Activity在AndroidManifest中都包含下列内容

1
2
3
4
5
<intent-filter>
<action android:name="android.intent.action.MAIN" />

<category android:name="android.intent.category.LAUNCHER" />
</intent-filter>

这样会为应用程序生成两个图标,分别从不同的Activity启动。
不知道有什么用(

网络通信

网络请求框架对比:

  • HttpURLConnection
  • Volley
    • 停更
    • 不适合上传下载文件
  • OkHttp
    • 支持大文件上传下载
    • 性能更好
    • 一般需要二次封装
  • Retrofit
    • 可以通过注解配置请求
    • 可以搭配转换器解析数据,支持jackjson,pb等

Retrofit简介

Retrofit其实是对OkHttp的一个封装,使用Retrofit库的基本流程包括引用、创建用于描述网络请求的接口、使用Retrofit实例发起网络请求。

场景:客户端知道用户uid,要在服务端查询用户姓名,通过Retrofit实现

  • 接口:https://www.bytedane.com/{uid}/name
  • 类型:GET请求
  • 接口返回:
1
2
3
4
5
6
7
8
9
{
"message": "success",
"data" : {
"uid": "1123",
"first_name":"张",
"last_name":"三丰"
}

}

1.导入dependencies依赖

1
2
3
dependencies {
implementation 'com.squareup.retrofit2:retrofit:2.4.0'
}

2.创建用于描述网络请求的接口

1
2
3
4
5
6
7
interface IUserInfoService {

@GET("users/{uid}/name")
fun getUserName(@Path("uid") uid: Int): Call<ResponseBody>

...
}
  • 接口类名:和请求的含义相关
  • 函数名:识别出该接口的作用,该Interface里可以增加多个不同的函数
  • @GET注解:指定该接口的相对路径,用get方法发起请求
  • @Path注解:需要外部调用时,用传入的uid替换注解里的{uid}
  • 返回值Call<ResponseBody>:可以直接转换把Sring转换为Model,这里就转换为User
  • ResponseBody:根据返回内容定义的类,应当包含所有字段和一些输出方法,response.body()是该类的一个实例
  1. 发起网络请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
fun getUserName(View) {

//创建Retrofit实例
val retrofit = Retrofit.Builder()
.baseUrl("https://www.bytedance.com/")//请求Url地址
.build()

//创建 网络请求接口 的实例
val iUserInterface = retrofit.create(IUserInfoService::class.java)
val call = iUserInterface.getUserName(1123)

//入队异步请求
call.enqueue(object: Callback<ResponseBody> {

override fun onResponse(call: Call<ResponseBody>,response: Response<ResponseBody>) {
request_result_tv.text = "请求成功:" + response.body()!!.string();
}

override fun onFailure(call: Call<ResponseBody>, e: Throwable) {
request_result_tv.text = "请求失败" + e.message
}
})
}

Retrofit注解

  • 注解简介
    注解可以作用在类、方法、参数、成员变量上,并且可以在合适的时机读取注解并进行替换。
    注解的处理一般有三个时机(Retention):

1.SOURCE: 只在源码有效
2.CLASS: 编译时用注解处理器处理
3.RUNTIME: 运行时处理

  • 注解定义
1
2
3
4
5
@Target(METHOD)//作用对象
@Retention(RUNTIME)//生命周期
public @interface GET {
String value() default "";
}
  • 注解使用

可以利用Method类的接口来获取注解的内容。配合动态代理可以获取方法和参数的注解,构造Request对象。

1
2
3
4
public static Object newProxyInstance(ClasslLoader loader, 
Class<?>[] interfaces,
InvocationHandler h)
throws IllegalArgumentException
1
2
3
4
public interface InvocationHandler{
public Object invoke(Object proxy, Method method, Object[] args)
throw Throwable;
}

之后可以通过Method.getAnnotaion()Method.getParameterAnnotation()来获取方法和方法参数的注解内容。

代码分析

Retrofit的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Retrofit retrofit = new Retrofit.builder()
.baseUrl("http://www.bytedance.com/")
.addConverterFactory(GsonConverterFactory.create())//利用gson转换器解析json,需要添加gson依赖
.build();

IUserInfoService iUserInterface = retrofit.create(IUserInfoService.class);

retrofit2.Call<ResponseBody> call = iUserInterface.getUserName(1123);

call.enqueue(new Callback<ResponseBody>() {

@Override
public void onResponse (Call<ResponseBody> call,
Response<ResponseBody> response){

}

@Override
public void onFailure(Call<ResponseBody> call, Throwable t){

}
})

OkHttp的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//创建OkHttpClient
OkHttpClient OkHttpClient = new OkHttpClient.Builder().build();


Request request = new Request.Builder()
.get()
.url("https://www.bytedance.com/user/1123/name")
.build();
//
OkHttp.Call call = okHttpClient.newCall(request);

Call.enqueue(new Callback() {

@Override
public void onFailure (Call call, IOException e) {

}

@Override
public void onResponse (Call call, Response response) {
//TODO: 服务器响应结果
}
});

字节跳动网络库

Cronet用C++实现,对OkHttp进行了特定优化。对其进行二次封装,设计一个高已用、功能全面的框架。最终决定基于Retrofit进行二次开发,将底层的OkHttp替换为Cronet,这就是TTNet。
核心点就是将OkHttpClient和OkHttpCall的生成替换为OkHttp和Cronet二选一。