2019-07-20 12:11  阅读(1625)
文章分类:Thrift 源码分析 文章标签:ThriftThrift 源码

协议和编解码是一个网络应用程序的核心问题之一,客户端和服务器通过约定的协议来传输消息(数据),通过特定的格式来编解码字节流,并转化成业务消息,提供给上层框架调用。

Thrift的协议比较简单,它把协议和编解码整合在了一起。抽象类TProtocol定义了协议和编解码的顶层接口。个人感觉采用抽象类而不是接口的方式来定义顶层接口并不好,TProtocol关联了一个TTransport传输对象,而不是提供一个类似getTransport()的接口,导致抽象类的扩展性比接口差。

TProtocol主要做了两个事情:

  1. 关联TTransport对象

2.定义一系列读写消息的编解码接口,包括两类,一类是复杂数据结构比如readMessageBegin, readMessageEnd,  writeMessageBegin, writMessageEnd.还有一类是基本数据结构,比如readI32, writeI32, readString, writeString

public abstract class TProtocol {

  /**
   * Transport
   */
  protected TTransport trans_;

 public abstract void writeMessageBegin(TMessage message) throws TException;

 public abstract void writeMessageEnd() throws TException;

 public abstract void writeStructBegin(TStruct struct) throws TException;

 public abstract void writeStructEnd() throws TException;

 public abstract void writeFieldBegin(TField field) throws TException;

 public abstract void writeFieldEnd() throws TException;

 public abstract void writeFieldStop() throws TException;

 public abstract void writeMapBegin(TMap map) throws TException;

 public abstract void writeMapEnd() throws TException;

 public abstract void writeListBegin(TList list) throws TException;

 public abstract void writeListEnd() throws TException;

 public abstract void writeSetBegin(TSet set) throws TException;

 public abstract void writeSetEnd() throws TException;

 public abstract void writeBool(boolean b) throws TException;

 public abstract void writeByte(byte b) throws TException;

 public abstract void writeI16(short i16) throws TException;

 public abstract void writeI32(int i32) throws TException;

 public abstract void writeI64(long i64) throws TException;

 public abstract void writeDouble(double dub) throws TException;

 public abstract void writeString(String str) throws TException;

 public abstract void writeBinary(ByteBuffer buf) throws TException;

 /**
* Reading methods.
*/

 public abstract TMessage readMessageBegin() throws TException;

 public abstract void readMessageEnd() throws TException;

 public abstract TStruct readStructBegin() throws TException;

 public abstract void readStructEnd() throws TException;

 public abstract TField readFieldBegin() throws TException;

 public abstract void readFieldEnd() throws TException;

 public abstract TMap readMapBegin() throws TException;

 public abstract void readMapEnd() throws TException;

 public abstract TList readListBegin() throws TException;

 public abstract void readListEnd() throws TException;

 public abstract TSet readSetBegin() throws TException;

 public abstract void readSetEnd() throws TException;

 public abstract boolean readBool() throws TException;

 public abstract byte readByte() throws TException;

 public abstract short readI16() throws TException;

 public abstract int readI32() throws TException;

 public abstract long readI64() throws TException;

 public abstract double readDouble() throws TException;

 public abstract String readString() throws TException;

 public abstract ByteBuffer readBinary() throws TException;

 /**
* Reset any internal state back to a blank slate. This method only needs to
* be implemented for stateful protocols.
*/
 public void reset() {}

 /**
* Scheme accessor
*/
 public Class<? extends IScheme> getScheme() {
   return StandardScheme.class;
 }

}

所谓协议就是客户端和服务器端约定传输什么数据,如何解析传输的数据。对于一个RPC调用的协议来说,要传输的数据主要有:

调用方

  1. 方法的名称,包括类的名称和方法的名称
  2. 方法的参数,包括类型和参数值

3.一些附加的数据,比如附件,超时事件,自定义的控制信息等等

返回方

  1. 调用的返回码
  2. 返回值

3.异常信息

从TProtocol的定义我们可以看出Thrift的协议约定如下事情:

  1. 先writeMessageBegin表示开始传输消息了,写消息头。Message里面定义了方法名,调用的类型,版本号,消息seqId

2.接下来是写方法的参数,实际就是写消息体。如果参数是一个类,就writeStructBegin

  1. 接下来写字段,writeFieldBegin, 这个方法会写接下来的字段的数据类型和顺序号。这个顺序号是Thrfit对要传输的字段的一个编码,从1开始
  2. 如果是一个集合就writeListBegin/writeMapBegin,如果是一个基本数据类型,比如int, 就直接writeI32
  3. 每个复杂数据类型写完都调用writeXXXEnd,直到writeMessageEnd结束
  4. 读消息时根据数据类型读取相应的长度

每个writeXXX都是采用消息头+消息体的方式。我们来看TBinaryProtocol的实现。

1.writeMessgeBegin方法写了消息头,包括4字节的版本号和类型信息,字符串类型的方法名,4字节的序列号seqId

  1. writeFieldBegin,写了1个字节的字段数据类型,和2个字节字段的顺序号
  2. writeI32,写了4个字节的字节数组
  3. writeString,先写4字节消息头表示字符串长度,再写字符串字节
  4. writeBinary,先写4字节消息头表示字节数组长度,再写字节数组内容

6.readMessageBegin时,先读4字节版本和类型信息,再读字符串,再读4字节序列号

7.readFieldBegin,先读1个字节的字段数据类型,再读2个字节的字段顺序号

  1. readString时,先读4字节字符串长度,再读字符串内容。字符串统一采用UTF-8编码

    ``` public void writeMessageBegin(TMessage message) throws TException { if (strictWrite_) { int version = VERSION_1 | message.type; writeI32(version); writeString(message.name); writeI32(message.seqid); } else { writeString(message.name); writeByte(message.type); writeI32(message.seqid); } }

public void writeFieldBegin(TField field) throws TException { writeByte(field.type); writeI16(field.id); }

private byte[] i32out = new byte[4]; public void writeI32(int i32) throws TException { i32out[0] = (byte)(0xff & (i32 >> 24)); i32out[1] = (byte)(0xff & (i32 >> 16)); i32out[2] = (byte)(0xff & (i32 >> 8)); i32out[3] = (byte)(0xff & (i32)); trans_.write(i32out, 0, 4); }

public void writeString(String str) throws TException { try { byte[] dat = str.getBytes("UTF-8"); writeI32(dat.length); trans_.write(dat, 0, dat.length); } catch (UnsupportedEncodingException uex) { throw new TException("JVM DOES NOT SUPPORT UTF-8"); } }

public void writeBinary(ByteBuffer bin) throws TException { int length = bin.limit() - bin.position(); writeI32(length); trans_.write(bin.array(), bin.position() + bin.arrayOffset(), length); }

public TMessage readMessageBegin() throws TException { int size = readI32(); if (size < 0) { int version = size & VERSION_MASK; if (version != VERSION_1) { throw new TProtocolException(TProtocolException.BAD_VERSION, "Bad version in readMessageBegin"); } return new TMessage(readString(), (byte)(size & 0x000000ff), readI32()); } else { if (strictRead_) { throw new TProtocolException(TProtocolException.BAD_VERSION, "Missing version in readMessageBegin, old client?"); } return new TMessage(readStringBody(size), readByte(), readI32()); } }

public TField readFieldBegin() throws TException { byte type = readByte(); short id = type == TType.STOP ? 0 : readI16(); return new TField("", type, id); }

public String readString() throws TException { int size = readI32();

if (trans_.getBytesRemainingInBuffer() >= size) { try { String s = new String(trans_.getBuffer(), trans_.getBufferPosition(), size, "UTF-8"); trans_.consumeBuffer(size); return s; } catch (UnsupportedEncodingException e) { throw new TException("JVM DOES NOT SUPPORT UTF-8"); } }

return readStringBody(size); } ```

TProtocol定义了基本的协议信息,包括传输什么数据,如何解析传输的数据的基本方法。

2016-02-19_56c6c620781cd.jpg

还存在一个问题,就是服务器端如何知道客户端发送过来的数据是怎么组合的,比如第一个字段是字符串类型,第二个字段是int。这个信息是在IDL生成客户端时生成的代码时提供了。Thrift生成的客户端代码提供了读写参数的方法,这两个方式是一一对应的,包括字段的序号,类型等等。客户端使用写参数的方法,服务器端使用读参数的方法。

关于IDL生成的客户端代码会在后面的文章具体描述。下面简单看一下自动生成的代码

  1. 方法的调用从writeMessageBegin开始,发送了消息头信息

  2. 写方法的参数,也就是写消息体。方法参数由一个统一的接口TBase描述,提供了read和write的统一接口。自动生成的代码提供了read, write方法参数的具体实现

  3. 写完结束

    public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException { prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("handle", org.apache.thrift.protocol.TMessageType.CALL, 0)); handle_args args = new handle_args(); args.setIdentity(identity); args.setUid(uid); args.setSid(sid); args.setType(type); args.setMessage(message); args.setParams(params); args.write(prot); prot.writeMessageEnd(); }

    public interface TBase<T extends TBase, F extends TFieldIdEnum> extends Comparable, Serializable {

    public void read(TProtocol iprot) throws TException;

    public void write(TProtocol oprot) throws TException; }

    public static class handle_args implements org.apache.thrift.TBase<handle_args, handle_args._Fields>, java.io.Serializable, Cloneable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("handle_args");

    private static final org.apache.thrift.protocol.TField IDENTITY_FIELD_DESC = new org.apache.thrift.protocol.TField("identity", org.apache.thrift.protocol.TType.STRING, (short)1); private static final org.apache.thrift.protocol.TField UID_FIELD_DESC = new org.apache.thrift.protocol.TField("uid", org.apache.thrift.protocol.TType.I64, (short)2); private static final org.apache.thrift.protocol.TField SID_FIELD_DESC = new org.apache.thrift.protocol.TField("sid", org.apache.thrift.protocol.TType.STRING, (short)3); private static final org.apache.thrift.protocol.TField TYPE_FIELD_DESC = new org.apache.thrift.protocol.TField("type", org.apache.thrift.protocol.TType.I32, (short)4); private static final org.apache.thrift.protocol.TField MESSAGE_FIELD_DESC = new org.apache.thrift.protocol.TField("message", org.apache.thrift.protocol.TType.STRING, (short)5); private static final org.apache.thrift.protocol.TField PARAMS_FIELD_DESC = new org.apache.thrift.protocol.TField("params", org.apache.thrift.protocol.TType.MAP, (short)6);

    private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); static { schemes.put(StandardScheme.class, new handle_argsStandardSchemeFactory()); schemes.put(TupleScheme.class, new handle_argsTupleSchemeFactory()); }

    public String identity; // required public long uid; // required public String sid; // required public int type; // required public String message; // required public Map<String,String> params; // required

    /**The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { IDENTITY((short)1, "identity"), UID((short)2, "uid"), SID((short)3, "sid"), TYPE((short)4, "type"), MESSAGE((short)5, "message"), PARAMS((short)6, "params"); }

    //  自动生成的写方法参数的方法,按照字段顺序写,给客户端代码使用 public void write(org.apache.thrift.protocol.TProtocol oprot, handle_args struct) throws org.apache.thrift.TException { struct.validate();

       oprot.writeStructBegin(STRUCT_DESC);
       if (struct.identity != null) {
         oprot.writeFieldBegin(IDENTITY_FIELD_DESC);
         oprot.writeString(struct.identity);
         oprot.writeFieldEnd();
       }
       oprot.writeFieldBegin(UID_FIELD_DESC);
       oprot.writeI64(struct.uid);
       oprot.writeFieldEnd();
       if (struct.sid != null) {
         oprot.writeFieldBegin(SID_FIELD_DESC);
         oprot.writeString(struct.sid);
         oprot.writeFieldEnd();
       }
       oprot.writeFieldBegin(TYPE_FIELD_DESC);
       oprot.writeI32(struct.type);
       oprot.writeFieldEnd();
       if (struct.message != null) {
         oprot.writeFieldBegin(MESSAGE_FIELD_DESC);
         oprot.writeString(struct.message);
         oprot.writeFieldEnd();
       }
    

    }

    //  自动生成的读方法参数的方法,按照字段顺序读,给服务器端代码使用

    
    public void read(org.apache.thrift.protocol.TProtocol iprot, handle_args struct) throws org.apache.thrift.TException {  
           org.apache.thrift.protocol.TField schemeField;  
           iprot.readStructBegin();  
           while (true)  
           {  
             schemeField = iprot.readFieldBegin();  
             if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {   
               break;  
             }  
             switch (schemeField.id) {  
               case 1: // IDENTITY  
                 if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {  
                   struct.identity = iprot.readString();  
                   struct.setIdentityIsSet(true);  
                 } else {   
                   org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);  
                 }  
                 break;  
               case 2: // UID  
                 if (schemeField.type == org.apache.thrift.protocol.TType.I64) {  
                   struct.uid = iprot.readI64();  
                   struct.setUidIsSet(true);  
                 } else {   
                   org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);  
                 }  
                 break;  
               case 3: // SID  
                 if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {  
                   struct.sid = iprot.readString();  
                   struct.setSidIsSet(true);  
                 } else {   
                   org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);  
                 }  
                 break;  
               case 4: // TYPE  
                 if (schemeField.type == org.apache.thrift.protocol.TType.I32) {  
                   struct.type = iprot.readI32();  
                   struct.setTypeIsSet(true);  
                 } else {   
                   org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);  
                 }  
                 break;  
    }  
    
点赞(0)
版权归原创作者所有,任何形式转载请联系作者; Java 技术驿站 >> Thrift源码分析-- 协议和编解码
上一篇
Thrift源码分析-- 基本概念
下一篇
Thrift源码分析-- IDL和生成代码分析