1.3 最原始的消息设计
其实,我相信很多读者在一些分布式应用的开发过程中,已经用到了较原始的消息。这里是指采用socket编程自己实现通信的那些人,而不包括采用IBM MQ、CORBA、JMS等协议及其产品的情况。
1.3.1 原始消息的设计与实现
首先,我们介绍一种在实践中最常用的消息设计与实现方法。
例如,当分布在网络上的两个应用程序需要进行如下数据交换:发送/接收一个人的信息(包括姓名、身高和年龄)。
很多应用开发者采用如下方法实现。
(1)定义一个结构,存放人的信息:
struct Person { char name[20]; float height; int age; }; struct Person p; strcpy(p.name, "Michael Zhang"); height = 170.00; age = 30;
(2)将以上结构序列化:
char sendStream[1024] = {0}; spri ntf(sendStream, "|%s|%f|%d", p.name, p.height, p.age);
(3)发送方发送该字节流:
/*注:这里省略建立/管理/关闭TCP连接的代码*/ char datalen[4+1] = {0}; sprintf(datalen, "%04d", strlen(sendStream)); if(SendBytes(socket, datalen, 4) == -1) { return -1;
} if(SendBytes (socket, sendStream, strlen(sendStream)) == -1) { return -1; }
注意,以上代码中的函数SendBytes实际上是保证一定长度的字节流全部成功发送完毕后才返回,主要是由于在socket上调用send或write函数不能保证一次能将一定长度的字节流发送完。SendBytes的基本思想是循环发送,直至成功发完所有字节,其实现代码如下所示:
int SendBytes (int sd, const void *buffer, unsigned len) { int rez = 0; int leftlen = len; int readlen = 0; while(true) { rez = write (socket, (char *)buffer+readlen, len-readlen); if(rez < 0) { if (errno != EWOULDBLOCK && errno != EINTR){ ErrorMsg("Error is serious"); DisConnect(socket); } return -1; } readlen += rez; leftlen -= rez; if(leftlen <= 0) { break; } } return len; }
(4)接收方接收该字节流:
char datalen[4+1] = {0}; char receiveStream[1024] = {0}; sprintf(datalen, "%04d", strlen(sendStream)); if(ReceiveBytes(socket, datalen, 4) == -1) { return -1; } int packet_len = atoi(datalen);
if(ReceiveBytes (socket, receiveStream, packet_len) == -1) { return -1; }
同样,以上代码中的函数ReceiveBytes实际上是保证一定长度的字节流全部接收发送完毕后才返回,其基本思想是循环接收,直至成功接收完所有字节,其实现代码如下所示:
int ReceiveBytes (int socket, void *buffer, unsigned len) { int rez = 0; int leftlen = len; int readlen = 0; while(true) { rez = read (socket, (char *)buffer+readlen, len-readlen); if (rez <= 0) { if (rez == 0 || (errno != EWOULDBLOCK && errno != EINTR)){ Disconnect (); memset (buffer, 0, len); return rez; } continue; } readlen += rez; leftlen -= rez; if(leftlen <= 0) { break; } } return len; }
(5)字节流反序列化得到结构:
struct Person p; sscanf(receiveStream, "|%[^|]|%f|%d", p.name, &p.height, &p.age);
通过以上5步,一个人的信息(结构Person的实例)即可被成功地由发送方传送到接收方。同理,我们也可以依法炮制,实现其他信息的传送。
类似以上思路的程序设计在实践中应用很多,而其实质主要是以下两点:
· 将需要发送的一组不同变量值用某分隔符(在这里是“|”)区分开来,一起发送/接收。
· 将整型、浮点型数据转换成字符串格式发送/接收。这主要是基于两方面的考虑:一是实现平台无关,二是能以字符串方式处理数值型数据(整型、浮点型数据在内存中是以二进制格式表达的),以规避字符串方式处理二进制字节流的能力的不足(这在C/C++语言中尤为明显)。
这里要说的是,以上这种已经被一些开发过分布式应用底层模块的读者所熟悉的方法,就是一种最原始的消息设计。如果我们将以上程序再优化一些,对不同类型的结构编写较为通用的序列化/反序列化函数,就基本上可以使一般编程人员在涉及网络上两个应用之间交互数据的编程需求时,不必了解TCP/IP与socket编程的细节,只考虑对类似上述Person结构的实例进行发送/接收即可。当然,关于较通用模式的序列化/反序列化函数,每增加一个新的结构struct时,还是需要增加新的分支。
1.3.2 原始消息设计方法分析
现在,让我们对上一节介绍的消息实现方法进行分析。
显而易见,该方法可以在一定程度上满足网络应用之间进行结构化数据通信的需求。对一个新的通信需求,只需定义新的结构,然后对其中的整型、浮点型、字符串型数据按照既定的规则分别进行发送与接收处理即可,实现起来并不复杂,简单实用。
但是,这种方法有什么不足之处呢?
最容易想到的一点便是:如果结构Person中name字段的值出现了分隔符“|”,则会导致该消息机制出错。更换消息机制中的分隔符自然不是解决问题的方法,于是,有些程序员对字符串字段采用固定发送与接收长度的方法(即“定长”)来避免该问题:例如,对以上结构Person中的name字段,发送方与接收方共同约定为20-1=19个字节。这样一来,似乎解决了相当一部分的问题,对于简单的分布式应用程序开发,这看起来好像是足够了。
然而,当我们的需求不断复杂起来以后,就会发现,这种消息实现方法,远远不能满足分布式通信的需要:
· 应用开发需要在消息依附的结构中加入数组元素,并且每个数组元素本身也是一个结构或类。例如,需要能包括人身体每个部位的信息,包括名字、重量、大小等(如眼睛、嘴巴、耳朵等)。
· 应用开发需要能发送/接收二进制数据,如人的照片。
· 应用开发需要通过消息机制连续发送大量的数据。例如,附着于结构Person的各种说明文件,数据文件等。
· 分布式应用之间除了需要采用消息进行数据/信息交换以外,还需要通过消息进行远程过程调用,即一个应用中的接口函数能被处于网络上的另一个应用调用。
· 大量的同一类型或不同类型的消息,需要一个简单、有效、可方便扩充的管理机制。
· 应用开发对通信控制有了更高的要求:有时当一个消息发送出去以后,需要该进程或线程阻塞等待,直到等到对方的回复然后继续;有时当一个消息发送出去以后,虽然还是需要回复,但该进程或线程不阻塞,而是继续执行,由其他的线程或进程异步接收对方的回复;而有时,消息发送出去后,则不需要任何回复。
例如,在传送一个大文件时,应该包括三大步骤:(1)“创建文件”的远程过程调用与文件基本信息传输;(2)文件内容传输与“内容写入文件”的远程过程调用;(3)“文件传输结束”的远程过程调用。其中在步骤(1)中,发送方进程/线程需要等待到接收方文件建立成功的回复才能继续,否则发送数据是没有意义的;而步骤(2)中,虽然是需要回复,但发送方并不需要等待,只需连续不停地发送数据即可;而步骤(3)则是发送方需要等待回复,因为这时,接收方往往需要进行一些对所接收数据的可靠性认证(如MD5签名),然后发送方才能继续或结束。
· 网络上的两个分布式应用之间需要能同时发送/接收大量不同类型的消息,包括发送、回复及远程过程调用。
分布式应用中对消息机制的复杂要求还有很多,远不止上面所说的几条,这一点读者可以从后面对消息机制的详细阐述中体会到,这里不再一一列举。总之,这里是想说明,1.3.1节介绍的消息设计与实现方法,远远不能满足作为一个分布式应用开发核心机制的需求,只能算是一种在实践中较常用的、简单的、最原始的结构化数据通信方法。