Nebula Graph 源码解读系列|客户端的通信秘密——fbthrift

概述
给用户提供了多种编程语言的 API 用于和Graph 交互,并且对服务端返回的数据结构进行了重新封装,便于用户使用 。
目前支持的语言有 C++、Java、、 和 Rust 。
通信框架
使用了作为服务端和客户端之间的 RPC 通信框架,实现了跨语言的交互 。
提供了三方面的功能:
生成代码: 可将不同语言序列化成数据结构序列化:将生成的数据结构序列化通信交互:在客户端、服务端之间传输消息,收到不同语言的客户端的请求时,调用相应的服务端函数 例子
这里以客户端为例 , 展示在Graph 中的应用 。
结构在服务端的定义:
struct Vertex {Value vid;std::vector tags;Vertex() = default;};
首先, 在 src//. 中定义一些数据结构:
struct Tag {1: binary name,// List of 2: map (cpp.template = "std::unordered_map") props,} (cpp.type = "nebula::Tag")struct Vertex {1: Valuevid,2: list tags,} (cpp.type = "nebula::Vertex")
在这里我们定义了一个的结构,其中 (cpp.type = "::") 标注出了这个结构对应了服务端的 :: 。
会自动为我们生成的数据结构:
// Attributes://- Vid//- Tagstype Vertex struct {Vid *Value `thrift:"vid,1" db:"vid" json:"vid"`Tags []*Tag `thrift:"tags,2" db:"tags" json:"tags"`}func NewVertex() *Vertex {return &Vertex{}}...func (p *Vertex) Read(iprot thrift.Protocol) error { // 反序列化...}func (p *Vertex) Write(oprot thrift.Protocol) error { // 序列化...}
在 MATCH (v:) WHERE id(v) == "ABC"v 这条语句中:客户端向服务端请求了一个顶点(::),服务端找到这个顶点后会进行序列化,通过 RPC 通信框架的发送到客户端,在客户端收到这份数据时,会进行反序列化,生成对应客户端中定义的数据结构(type) 。客户端模块
在这个章节会以 -go 为例,介绍客户端的各个模块和其主要接口 。
配置类  , 提供全局的配置选项 。
type PoolConfig struct {// 设置超时时间,0 代表不超时,单位 ms 。默认是 0TimeOut time.Duration// 每个连接最大空闲时间,当连接超过该时间没有被使用将会被断开和删除 , 0 表示永久 idle,连接不会关闭 。默认是 0IdleTime time.Duration// max_connection_pool_size: 设置最大连接池连接数量,默认 10MaxConnPoolSize int// 最小空闲连接数 , 默认 0MinConnPoolSize int}
客户端会话,提供用户直接调用的接口 。
//管理 Session 特有的信息type Session struct {// 用于执行命令的时候的身份校验或者消息重试sessionIDint64// 当前持有的连接connection *connection// 当前使用的连接池connPool*ConnectionPool// 日志工具logLogger// 用于保存当前 Session 所用的时区timezoneInfo}
// 执行 nGQL,返回的数据类型为 ResultSet , 该接口是非线程安全的 。func (session *Session) Execute(stmt string) (*ResultSet, error) {...}// 重新为当前 Session 从连接池中获取连接func (session *Session) reConnect() error {...}// 做 signout,释放 Session ID,归还 connection 到 poolfunc (session *Session) Release() {
【Nebula Graph 源码解读系列|客户端的通信秘密——fbthrift】连接池,管理所有的连接,主要接口有以下
// 创建新的连接池, 并用输入的服务地址完成初始化func NewConnectionPool(addresses []HostAddress, conf PoolConfig, log Logger) (*ConnectionPool, error) {...}// 验证并获取 Session 实例func (pool *ConnectionPool) GetSession(username, password string) (*Session, error) {...}
连接  , 封装的网络,提供以下接口
// 和指定的 ip 和端口的建立连接func (cn *connection) open(hostAddress HostAddress, timeout time.Duration) error {...}// 验证用户名和密码func (cn *connection) authenticate(username, password string) (*graph.AuthResponse, error) {// 执行 queryfunc (cn *connection) execute(sessionID int64, stmt string) (*graph.ExecutionResponse, error) {...}// 通过 SessionId 为 0 发送 "YIELD 1" 来判断连接是否是可用的func (cn *connection) ping() bool {...}// 向 graphd 释放 sessionIdfunc (cn *connection) signOut(sessionID int64) error {...}// 断开连接func (cn *connection) close() {...}
负载均衡,在连接池里面使用该模块模块交互解析
连接池管理连接:一般只有在程序退出时才需要关闭连接池, 在关闭时池中所有的连接都会被断开 。客户端会话连接用户使用示例
// Initialize connection poolpool, err := nebula.NewConnectionPool(hostList, testPoolConfig, log)if err != nil {log.Fatal(fmt.Sprintf("Fail to initialize the connection pool, host: %s, port: %d, %s", address, port, err.Error()))}// Close all connections in the pool when program exitsdefer pool.Close()// Create sessionsession, err := pool.GetSession(username, password)if err != nil {log.Fatal(fmt.Sprintf("Fail to create a new session from connection pool, username: %s, password: %s, %s",username, password, err.Error()))}// Release session and return connection back to connection pool when program exitsdefer session.Release()// Excute a queryresultSet, err := session.Execute(query)if err != nil {fmt.Print(err.Error())}
返回数据结构
客户端对部分复杂的服务端返回的查询结果进行了封装并添加了接口,以便于用户使用 。
查询结果基本类型封装后的类型
Null
Bool
Int64
Time
Date
List
Set
Map
Node
Edge
Path
(用于 的行操作)
对于 ::Value,在客户端会被包装成 ,并通过接口转换成其他结构 。(i.g. node = .())
数据结构的解析
对于语句 MATCH p= (v:{name:"Tim "})-[]->(v2)p , 返回结果为:
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+| p|+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+| <("Tim Duncan" :bachelor{name: "Tim Duncan", speciality: "psychology"} :player{age: 42, name: "Tim Duncan"})<-[:teammate@0 {end_year: 2016, start_year: 2002}]-("Manu Ginobili" :player{age: 41, name: "Manu Ginobili"})> |+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+Got 1 rows (time spent 11550/12009 us)
我们可以看到返回的结果包含了一行,类型是一条路径. 此时如果需要取得路径终点(v2)的属性,可以通过如下操作实现:
// Excute a queryresultSet, _ := session.Execute("MATCH p= (v:player{name:"\"Tim Duncan"\"})-[]->(v2) RETURN p")// 获取结果的第一行, 第一行的 index 为0record, err := resultSet.GetRowValuesByIndex(0)if err != nil {t.Fatalf(err.Error())}// 从第一行中取第一列那个 cell 的值// 此时 valInCol0 的类型为 ValueWrapper valInCol0, err := record.GetValueByIndex(0)// 将 ValueWrapper 转化成 PathWrapper 对象pathWrap, err = valInCol0.AsPath()// 通过 PathWrapper 的 GetEndNode() 接口直接得到终点node, err = pathWrap.GetEndNode()// 通过 node 的 Properties() 得到所有属性// props 的类型为 map[string]*ValueWrapperprops, err = node.Properties()
客户端地址
各语言客户端地址: