协程
使用异步非阻塞编程,确实能获得很好的性能。但是在代码上,确非常不直观。因为任何一个可能阻塞的操作,都必须要要通过“回调”函数来链接。比如一个玩家登录,你需要先读数据库,然后读一个远程缓冲服务器(如redis),然后返回登录结果:用户名、等级……在这个过程里,有两个可能阻塞的操作,你就必须把这个登录的程序,分成三个函数来编写:一个是收到客户端数据包的回调,第二个是读取数据库后的回调,第三个是读取缓冲服务器后的回调。
这种情况下,代码被放在三个函数里,对于阅读代码的人来说,是一种负担。因为我们阅读代码,比如通过日志、coredump去查问题,往往会直接切入到某一个函数里。这个被切入阅读的函数,很可能就是一个回调函数,对于这个函数为什么会被调用,属于什么流程,单从这个函数的代码是很难理解的。
另外一个负担,是关于开发过程的。我们知道回调函数的代码,是需要“上下文”的,也就是发起回调时的数据状态的。为了让回调函数能获得发起函数的一个变量内容,我们就必须把这个变量内容放到某个“上下文”的变量中,然后传给回调函数。由于业务逻辑的变化,这种需要传递的上下文变量会不停的变化,反复的编写“放入”“取出”上下文的代码,也是一种重复的编码劳动。而且上下文本身的设置可能也不够安全,因为你无法预计,哪个回调函数会怎么样的修改这个上下文对象,这也是很多难以调试的BUG的来源。
为了解决这个问题,出现了所谓的协程技术。我们可以认为,协程技术提供给我们一种特殊的return语句:yield。这个语句会类似return一样从函数中返回,但你可以用另外一个特殊的语句resume(id)来从新从yield语句下方开始运行代码。更重要的是,在resume之后,之前整个函数中的所有临时变量,都是可以继续访问的。
当然,做resume(id)的时候,肯定是在进程的所谓“主循环”中,而这个id参数,则代表了被中断了的函数。这种可以被中断的函数调用过程,就叫协程。而这个id,则是代表了协程的一个数字。异步调用的上下文变量,就被自动的以这个协程函数的“栈”所取代,也就是说,协程函数中的所有局部变量,都自动的成为了上下文的内容。这样就再也不用反复的编写“放入”“取出”上下文内容的代码了。
我使用了https://github.com/Tencent/Pebble/tree/master/src/common项目下的coroutine.cpp/.h作为协程的实现者。
游戏开发中,协程确实能大大的提高开发效率。因此我认为协程也应该是Game Server所应该具备的能力。特别是在处理业务逻辑的Handler的Process()函数,本身就应该是一个协程函数。所以我设计了一个CoroutineProcessor的类,为普通的Processor添加上协程的能力。——基于装饰器模式。这样任何的Processor::Process()函数,就自然的在一个协程之中。
因为有了协程的支持,那些可能产生阻塞而要求编写回调的功能,就可以统一的变成以协程使用的API了:
- DataStore -> CoroutineDataStore
- Cache -> CoroutineCache
- Client -> CoroutineClient
使用协程的API,就完全不需要各种Callback类型的参数了,完全提供一个返回结果用的输出参数即可。
- /**
- * @brief DataStore 的具备协程能力的装饰器类型。
- * @attention 除了定义变量语句和 Update() 以外,其他的操作都需要在协程中调用。
- */
- class
- CoroutineDataStore
- :
- public
- Updateable {
- public:
- CoroutineDataStore(DataStore * data_store,
- CoroutineSchedule * schedule);
- virtual
- ~CoroutineDataStore();
- int
- Init(Config * cfg, std: :string * err_msg);
- /**
- * 读取一个数据对象,通过 key ,把数据放入到输出参数 value。
- * 此函数会在调用过程中使用协程的 yield 出去。
- */
- int
- Get(const std: :string & key,
- Serializable * value);
- /**
- * 写入一个数据对象,写入 key ,value
- * 写入结果从返回值获得,返回 0 表示成功,其他值表示失败。
- * 此函数会在调用过程中使用协程的 yield 出去。
- */
- int
- Put(const std: :string & key,
- const
- Serializable & value);
- /**
- * 删除一个数据对象,通过 key
- * 写入结果从返回值获得,返回 0 表示成功,其他值表示失败。
- * 此函数会在调用过程中使用协程的 yield 出去
- */
- int
- Remove(const std: :string & key);
- int
- Update();
- private:
- DataStore * data_store_;
- CoroutineSchedule * schedule_;
- };
复制代码
服务器对象管理
组件模型
一般来说服务器上,主要是运行各种各样处理请求的代码为主(通常叫Handler)。然而,我们也会有一些需要持续运行的逻辑代码,比如处理匹配玩家战斗的逻辑,检查玩家是否超时发呆的逻辑,循环处理支付订单等等。这些代码的很多功能,同时还需要被各种Handler所调用。所以我们必须要有一种能让所有的这些自定义代码,以一种标准的方式在进程中互相引用,以及管理生命周期的方法。
借鉴于Unity,我觉得使用所谓的组件模型是很好的。它的特点包括:
- 组件之间通过 Application::GetComponet(name) 的方式互相调用。以一个字符串作为索引,就可以方便的获得对于的对象。组件自己通过 Application::Register(com_obj) 注册到系统中去,注册的名字自己实现 string GetName() 的接口去提供。
- 每个组件有预定的几个回调函数,提供进程生命周期的调用机会。包括:
- 初始化:Init()
- 主循环更新:Update()
- 关闭:Close()
- /**
- * 代表一个应用程序组件,所有的应用程序组件应该继承此对象
- */
- class
- Component
- :
- public
- Updateable
- {
- public:
- Component();
- virtual
- ~Component();
- /**
- * 返回此组件的名字
- * @return 名字
- */
- virtual std: :string GetName()
- =
- 0;
- /**
- * 初始化过程会调用此方法
- * @param app 进程对象
- * @param cfg 配置
- * @return 返回 0 表示成功,其他表示失败
- */
- virtual
- int
- Init(Application * app,
- Config * cfg);
- /**
- * 更新过程会调用此方法
- * @return 返回更新处理的负载,0 表示没有负载,负数表示出错或需要停止对此组件进行更新
- */
- virtual
- int
- Update();
- /**
- * 应用停止时会调用此方法
- * @return 返回 0 表示可以退出,返回 >0 表示需要等待返回值的秒数,返回 < 0 表示出错退出
- */
- virtual
- int
- Stop()
- {
- return
- 0;
- };
- /**
- * 设置组件被加入的应用程序,用于让组件继承者能简单的获取 Application 对象
- * @note 如果一个组件被加入多个不同的 Application,必须使用 @see Init() 方法来具体保存 Application 对象,
- * 因为此处修改的成员对象 app_ 将是最后一次被添加进的 Application 对象。
- * @param app 要设置的 Application 对象。
- */
- void set_app(Application * app) {
- app_ = app;
- }
- protected:
- Application * app_;
- };
复制代码
Server对象
由于一个游戏服务器,所集成的功能实在是太多了,比如配置不同的协议、不同的处理器、提供数据库功能等等。要让这样一个服务器对象启动起来,需要大量的“组装代码”。为了节省这种代码,我设计了一个LocalServer的类型,作为一个Server模板,简化网络层的组装。使用者可以继承这个类,用来实现各种不同的Server。
- class
- LocalServerApp
- :
- public
- Application
- {
- public:
- LocalServerApp();
- virtual
- ~LocalServerApp();
- virtual
- int
- Init(Config * cfg = NULL);
- virtual
- int
- Exit();
- void set_transport(Transport * transport);
- void set_protocol(Protocol * protocol);
- void set_processor(Processor * processor);
- private:
- Transport * transport_;
- Protocol * protocol_;
- Processor * processor_;
- Server * server_;
- };
复制代码
这个简单的类,可以通过setter方法来自定义网络层的组件,否则就是最常用的TCP,TLV,Echo这种服务器。而且这个类还是继承于Application的,这样可以让数据库或者其他的组件,也很方便的利用组件系统安装到服务器上。
集群功能
需求分析
游戏常常是一个带状态的服务。所以集群功能非常困难。
有一些框架,试图把状态从逻辑进程中搬迁出来,放在缓冲服务器中,但是往往满足不了性能需求。另外一些框架,则把集群定义成一个固定的层次架构,通过复杂的消息转发规则,来试图“把请求发到装载状态的进程上”,但这导致了运维部署的巨大复杂性。
为了解决这些问题,我觉得有几个设计决策是必须要订立的:
- 使用SOA的模式:集群中心的地址作为集群的地址,通过服务名来分割逻辑
- 提供给用户自定义路由的接口:由于集群中的进程都带有状态,要把请求发给哪个进程,并不能完全自动选择,所以必须要用户提供代码来选择
作为SOA模式下的集群,必须定义每个服务的“合同”格式。由于一个游戏服务器,可能存在各种不同的通信协议和编码协议,所以这个合同必须要能包含所有这些内容。在传统的RPC设计中,比如WebService,就采用了WSDL的格式,但是现在这种风格更多的被RESTful所取代。因此我决定使用类似URL类型的字符串来表述合同:
- tcp://1.1.1.1:8888/tlv
复制代码
这样的合同描述,可以包含通信协议,IP地址和端口,编码协议三个部分,如果需要,还可以在PATH部分继续添加,如增加QueryString等。
集群中心
根据之前的设计,集群中心地址,即事集群的地址。而集群中心,为了避免单点故障,自己也必须是一个集群。能符合这个要求的可用开源软件,非ZooKeeper莫属。
所以我直接把集群中心的功能,使用ZooKeeper来实现。虽然ZooKeeper的API设计也足够优秀了,但是作为异步非阻塞的框架,还是必须要做一层封装和抽象。在编译C的ZK客户端API时,也碰到了一个讨厌的问题,就是这个API使用了一个旧版本的测试框架库cppunit-devel,在新版本的Linux发行版CentOS和Ubuntu中,直接从源安装的版本都和这个版本不兼容,没办法只好去官网上下载cppunit-1.13的源代码来编译安装。
为了方便使用ZooKeeper,我先实现了一个ZooKeeperMap的类,属于cache模块的DataMap的子类,用以完成标准的Key-Value存取。实际上在这里是为了完成链接ZooKeeper和初始化的功能。
如前文的合同所设计,当获得一个“合同”字符串的时候,是需要“构造”出一个使用对应合同的客户端对象的。不同的协议对应着不同类型的对象,在这里就需要一种类似“反射”生成对象的技术。对于没有这种反射能力的C++来说,我添加了一个“注册”模板方法,这个模板方法会把注册的类的构造工厂方法,记录到一个map里面。当然,这对于注册的类的构造器是有要求,需要有无参数构造器,或者是带“字符串,数字”构造器。当然,如果写错了也不要紧,只是不能编译成功而已。这也是静态绑定的好处之一了。
整个集群中心,最核心的接口其实就三个:
注册一个合同,包括提供的“服务名”和“合同”,这个合同内容必须是能让客户端访问到自己的通信地址。
查询合同,通过输入“服务名”,获得所有提供这个服务的合同列表
通过合同构建客户端,得到的客户端对象就是可以发送请求给对应合同的服务提供进程。
- /**
- * @brief 集群中心客户端
- * 每个 DenOS 进程启动时,都会向 ZooKeeper 注册自己的服务。
- * 此类型的对象,就是作为每个进程中,代表集群中心的存在。
- * ZooKeeper 默认根据 2 个 tick (心跳),大概为 3 秒,是否收到,来决定客户端是否死掉。
- * 在 DenOS 中,可以使用配置项目 ZK_RECV_TIMEOUT 参数(单位为毫秒)来决定这个超时时间。
- */
- class
- Center
- :
- public
- ZooKeeperMap
- {
- friend
- void
- ContractsWatcher(zhandle_t
- * zh,
- int type,
- int state,
- const
- char
- * path,
- void
- * watcherCtx);
- public:
- int last_error_code_;
- // 用来测试最后一个操作是否成功的变量
- std: :map < std: :string,
- std: :string > service_process_;
- // key: 服务名字,value 服务节点名字
- /**
- * @brief 构造一个集群中心客户端
- * @param urls ZooKeeper 的连接参数,形如:”127.0.0.1:2181,10.1.2.3:2182,192.168.3.23:2183″
- */
- Center(const std: :string & urls =
- “127.0.0.1:2181”);
- virtual
- ~Center();
- /// 驱动整个异步流程
- virtual
- int
- Update();
- /**
- * 往集群中声明注册服务
- * @param name 服务的名字
- * @param contract 服务的通信方式
- * @return 返回 0 表示已经发起注册流程,其他值表示失败
- */
- int
- RegisterService(const std: :string & name,
- const
- Contract & contract);
- /**
- * @brief 注册一个类作为对应协议字符串名字
- * 如果这个类是 Connector 的子类,必须要有一个形如 XXConnector(const string& p1, int p2) 这样的构造器。
- * 或者这个类是 Protocol 的子类,必须要有一个无参数构造器。
- * @param reg_name 协议字符串名字,如 tcp/udp/kcp/tconnd 或者 tlv/line/tdr
- */
- template < typename T >
- void
- RegProto(const std: :string & reg_name)
- {
- if
- (reg_name.empty())
- return;
- constructors_[reg_name]
- =
- new
- DefaultConstructor < T > ();
- }
- template < typename T >
- void
- RegConn(const std: :string & reg_name)
- {
- if
- (reg_name.empty())
- return;
- constructors_[reg_name]
- =
- new
- StrIntConstructor < T > ();
- }
- /**
- * @brief 查询一个服务去发起请求
- * 注意这是一个异步的接口,有可能会返回 -1 表示服务合同还未拿到。需要重复的去获取。
- * @param name 服务的名字
- * @param callback 当获得对应的服务的客户端的回调
- * @param client_cb 预期每个新的 Client 所注册的默认回调,用来接收连接、中断、收听通知。
- * @param route_param 用来传给路由器的自定义路由相关数据
- * @return 如果返回 0 表示成功,失败则会是其他数值
- */
- int
- QueryService(const std: :string & name,
- GetServiceClientCallback * callback,
- ClientCallback * client_cb = NULL,
- Router * router = NULL,
- void * route_param = NULL);
- /**
- * 根据合同缓存获得客户端对象
- * @param cache 合同缓存对象
- * @param client_cb 预期每个新的 Client 所注册的默认回调,用来接收连接、中断、收听通知。
- * @param router 路由器对象
- * @param route_param 路由参数
- * @return 客户端对象指针,无需主动 delete,因为会缓存起来
- */
- Client *
- GetClientByContracts(ContractCache * cache,
- ClientCallback * client_cb,
- Router * router,
- void * route_param);
- /**
- * 获得存放集群的 ZK 基础路径
- * @return ZNode 基础路径
- */
- inline
- const std: :string & cluster_prefix()
- const
- {
- return CLUSTER_PREFIX;
- }
- /**
- * 获得建立进程用的 ZK 标记 (Create flags)
- * @return zk 的 create flags
- */
- inline
- int process_flags()
- const
- {
- return process_flags_;
- }
- /// 在 ZK 写入本进程对此服务的合约
- void
- AddProcessContract(const std: :string & service_name,
- const std: :string & contract_data);
- /// 清理相关对象
- void
- CloseClient(Client * client);
- void
- SetContractsCache(const std: :string & service_name,
- ContractCache * cache);
- /*———————————- 继承自 ZooKeeper 为了实现功能用 ———————————–*/
- /// 建立存储节点父目录时,增加一个监听器,监听这些节点增加和删除变化
- virtual
- void
- CreatePrefixNode();
- /// 初始化 zookeeper 客户端连接,会修改 ZKMAP_KEY_PREFIX 为集群专用路径
- virtual
- int
- Init(Config * config = NULL);
- /**
- * 清理掉生成的客户端对象
- * @param client 客户端对象
- * @param content 相关的服务名字
- */
- void
- ClearClientMember(Client * client,
- const std: :string & content);
- };
复制代码
服务器间通信
在上面所说的集群中心功能中,最后一项“获得客户端”的方法,是需要用户输入一个Router类型的对象的。其原因就是,游戏服务器往往都是带状态,所以必须要让调用者有办法选择具体的服务提供者。比如游戏中的聊天功能,一般都支持“组队聊天”的功能,这个功能,需要把消息转发到不同的服务器进程上,因为队伍中的玩家可能登录在不同的服务器上。那么,如果玩家本身登录的规则,就是根据自己的ID做某种哈希去选择服务器进程的,那么,这个聊天功能,只要让Router对象也按同样的哈希方法去选择服务器进程,就能正确的发送消息了。当然了,根据某种类似“服务器进程ID”去选择服务器,也是一种路由方式,可以写入Router中去。
- /**
- * @brief 路由器基类
- */
- class
- Router
- {
- public:
- Router();
- virtual
- ~Router();
- /**
- * @brief 决定服务路由的接口
- * 此接口默认实现是取 cache 中的第一个非空结果
- * @param cache 需要选择的所有合同的缓存集合
- * @param content 输出参数,具体选择的合同的内容
- * @param route_param 用来提供给路由算法运行的额外参数
- */
- virtual
- void
- RouteToSevice(const
- ContractCache & cache, std: :string * content,
- void * route_param = NULL);
- };
复制代码
路由器的写法非常简单,也附带了一个route_param用来帮助传递一些路由选择所需的数据。当然你也可以构造多个不同的Router子类对象,用对象成员属性来携带更复杂的路由参数。
当我们选择出了合同,就可以利用Center的功能去发起服务请求了。下面是单元测试的部分代码,展示了如何在服务器之间调用服务:
- // 获取客户端
- DEBUG_LOG(“========== Getting Client ==========”);
- Center * center = obj_pro_reg.center();
- TestGetClientCallback callback;
- TestClientCallback cli_cb;
- center – >QueryService(handler.service_name_,
- &callback,
- &cli_cb);
- for
- (int i =
- 0; i <
- 100; i++)
- {
- usleep(100);
- svr – >Update();
- }
- EXPECT_EQ(0, callback.err_code_);
- EXPECT_TRUE(cli_cb.is_connected_);
- ASSERT_TRUE(callback.client_ != NULL);
- // 访问服务器
- DEBUG_LOG(“========== Requesting Service ==========”);
- Client * client = callback.client_;
- Request req;
- string data(“I love JMX!”);
- req.service = handler.service_name_;
- req.SetData(data.c_str(), data.length());
- client – >SendRequest( & req,
- &cli_cb);
- for
- (int i =
- 0; i <
- 100; i++)
- {
- usleep(100);
- svr – >Update();
- }
- EXPECT_EQ(data, cli_cb.resp_data_);
复制代码
在一般的异步编程中,访问集群中的服务,需要两个回调(组赛)过程,一个是通过集群中心查询合同,一个是请求服务。这样显然会让代码分散在不同的函数中,阅读起来非常不方便。所以我又使用了协程功能,封装了集群和客户端的能力,让整个过程可以用同步代码的写法来完成。
展望
写到这里,基本上关于一个游戏服务器框架的主体功能设计,都基本完成了。但是,一个游戏中还包含了很多不同的能力需要考虑。比如说排行榜、拍卖行、战斗记录日志等等,这个功能往往不能靠上文所述的key-value数据能力简单解决。而需要额外直接的对一些特殊的设施,比如redis/MySQL直接编程,这些部分,也只能放在框架之外处理了。也许以后,我会总结出更好的抽象层,能把带排序、模糊搜索、大容量记录的功能,一起放入框架的想法。另外,对于cache模块(缓冲),使用一致的API风格,去操作真正的分布式缓冲,还是一个未能很好解决的课题。虽然Orcal Conherence提供了很好的参考方案,但是限于时间和精力,也只能用简单的二级缓存来部分模拟其能力,这一方面也是值得去深入研究的部分。
总结一下,游戏服务器框架,其实基本能力也非常简单:
网络功能:提供请求响应、通知两种能力即可组合大部分功能
缓存功能:提供二级缓存的远程缓冲功能,也可以满足很多需求
持久化功能:以key-value方式的存储足以满足很多用户存档的需求
对于现代服务器系统,需要增加的能力还有:
集群功能:可以用SOA但自定义路由的方式,提供集群服务
协程功能:避免大量异步回调的代码阅读问题
组件功能:给框架一个结合不同体系代码的接口
全文完。
原创文章,作者:棋牌游戏,如若转载,请注明出处:https://www.qp49.com/2019/04/23/1997.html