最后更新于 .

博客这几天由于服务器的问题打不开,在这里跟大家抱歉啦 老读者应该都知道,笔者有两个开源项目,分别是: fuload: 性能测试工具,可以用来给服务器做压力测试 bayonet: 基于两层状态机的epoll服务器框架 对于fuload的介绍,请看这里: fuload开源压力测试框架完成! 对于bayonet的介绍,请看这里: 有限状态机的C++实现(1)-epoll状态机 有限状态机的C++实现(2)-bayonet开源网络服务器框架 之前由于工作等原因,bayonet一直被搁置,最近有时间,所以就抓紧把bayonet完成了,目前功能上基本已经OK了,我简单列一下功能点:

  1. 接管了网络,调用方只需要关心业务逻辑
  2. 配置的方式,快速切换TCP-UDP
  3. 快速的增加加状态机的状态,业务可以无限拓展

代码编写中,也用到了很多技术,如引用计数来保证野指针不被访问(通过引用计数解决野指针的问题(C&C++)),延迟析构对象,等等。 由于详细介绍是个很庞大的工作,所以这里就直接放出一个基于bayonet写的http代理,我们来看一下,只需要多少代码:

#include <iostream>
#include <memory>
#include <string>
#include <vector>
#include <set>
#include <map>

#include "bayonet_frame.h"
using namespace std;

#define APP_FSM_PROXY 2000
#define APP_FSM_LOGIC2 2001

/**
 * @brief   获取ContentLen的数字的起始和长度
 *
 * @param   strHttpBuf
 * @param   len
 *
 * @return  
 */
size_t GetContentLenPos(const string& strHttpBuf, int& len)
{
    string strContentLenKey = "Content-Length";

    size_t pos;
    size_t end_pos;
    pos = strHttpBuf.find(strContentLenKey);
    if (pos == string::npos)
    {
        return pos;
    }
    pos += strContentLenKey.size();

    pos = strHttpBuf.find(":", pos);
    if (pos == string::npos)
    {
        return pos;
    }
    pos += 1;

    end_pos = strHttpBuf.find("\r\n", pos);
    if (end_pos == string::npos)
    {
        return end_pos;
    }

    pos = strHttpBuf.find_first_not_of(" ", pos);//只能查找字符
    if (pos == string::npos || pos >= end_pos)
    {
        return pos;
    }
    len = end_pos - pos;
    return pos;
}

int HttpHandleInput(const char* buf, int len)
{
    string strHttpBuf(buf,len);

    int contentLenLen = 0;
    size_t contentLenPos = GetContentLenPos(strHttpBuf,contentLenLen);
    if (contentLenPos == string::npos)
    {
        //说明直接接收完了
        return len;
    }

    //获取原来的content-len的值
    string lenNum = strHttpBuf.substr(contentLenPos, contentLenLen);
    int iContentLen = atoi(lenNum.c_str());

    //接下来我们要看看当前接受的buf大小是否等于 head + content len

    string spStr = "\r\n\r\n";
    size_t spPos = strHttpBuf.find(spStr);
    if (spPos == string::npos)
    {
        //没接收完,继续接收
        return 0;
    }
    int headLen = spPos+spStr.size();

    int realLen = headLen + iContentLen;
    if (realLen > len)
    {
        return 0;
    }

    return realLen;
}

class CMyActor : public CAppActorBase
{
public:
    CMyActor() {}
    virtual ~CMyActor() {}

    string m_req;
    string m_rsp;
};

class CActionFirst : public IAction
{
public:
    int HandleEncodeSendBuf(
        CSocketActorData* pSocketActor,
        CAppActorBase* pAppActor,
        string & strSendBuf,
        int &len)
    {
        CMyActor* app_actor = (CMyActor*)pAppActor;
        if (app_actor == NULL)
        {
            return -1;
        }
        strSendBuf = app_actor->m_rsp;
        len = strSendBuf.size();
        return 0;
    }

    int HandleInput(
        CSocketActorData* pSocketActor,
        CAppActorBase* pAppActor,
        const char *buf,
        int len)
    {
        return HttpHandleInput(buf,len);
    }

    int HandleDecodeRecvBuf(
        CSocketActorData* pSocketActor,
        CAppActorBase* pAppActor,
        const char *buf, 
        int len)
    {
        CMyActor * app_actor = new CMyActor();
        app_actor->AttachFrame(pSocketActor->GetFrame());
        app_actor->AttachCommu(pSocketActor);

        app_actor->m_req = string(buf,len);

        //转化状态操作一定要放在最后一步
        app_actor->ChangeState(APP_FSM_PROXY);
        return 0;
    }
};

class CActionGetData: public IAction
{
public:
    // 为发送打包
    int HandleEncodeSendBuf(
        CSocketActorData* pSocketActor,
        CAppActorBase* pAppActor,
        string & strSendBuf,
        int &len)
    {
        CMyActor* app_actor = (CMyActor*)pAppActor;
        if (app_actor == NULL)
        {
            return -1;
        }
        strSendBuf=app_actor->m_req;
        len = strSendBuf.size();
        return 0;
    }

    // 回应包完整性检查
    int HandleInput(
        CSocketActorData* pSocketActor,
        CAppActorBase* pAppActor,
        const char *buf,
        int len)
    {
        return HttpHandleInput(buf,len);
    }

    // 回应包解析
    int HandleDecodeRecvBuf(
        CSocketActorData* pSocketActor,
        CAppActorBase* pAppActor,
        const char *buf, 
        int len)
    {
        CMyActor* app_actor = (CMyActor*)pAppActor;
        //因为很有可能,appactor已经由于commu超时的原因被析构掉了
        if (app_actor == NULL)
        {
            return -1;
        }
        app_actor->m_rsp = string(buf,len);
        return 0;
    }
};

class CAppFsmProxy : public CAppFsmBase
{
public:
    virtual ~CAppFsmProxy () {}
    virtual int HandleEntry(CActionInfoSet *pActionInfoSet, CAppActorBase* pAppActor)
    {
        static CActionGetData actionGetData;
        StActionInfoParam param;
        param.id = 1;
        param.ip = "10.6.207.189";
        param.port = 80;
        param.protoType = PROTO_TYPE_TCP;
        param.pAction = &actionGetData;
        param.actionType = ACTIONTYPE_SENDRECV;
        param.timeout_ms = 1000;

        CActionInfo * pActionInfo = new CActionInfo();
        pActionInfo->Init(param);
        pActionInfoSet->Add(pActionInfo);

        return 0;
    }
    virtual int HandleProcess(CActionInfoSet *pActionInfoSet, CAppActorBase* pAppActor)
    {
        set<CActionInfo*> &setAction = pActionInfoSet->GetActionSet();
        for(set<CActionInfo*>::iterator it = setAction.begin(); it != setAction.end(); ++it)
        {
            trace_log("id:%d,error no:%d,timecost:%u ms",(*it)->GetID(),(*it)->GetErrno(),(*it)->GetTimeCost());
        }
        return APP_FSM_RSP;//代表要回复客户端啦
    }
    virtual int HandleExit(CActionInfoSet *pActionInfoSet, CAppActorBase* pAppActor)
    {
        return 0;
    }
};

int main(int argc, const char *argv[])
{
    CBayonetFrame srv;
    StFrameParam param;
    param.infoDir="bayonet";
    param.ip="0.0.0.0";
    param.port = 20001;
    param.protoType = PROTO_TYPE_TCP;
    param.pAction = new CActionFirst();
    param.gcMaxCount = 10;

    int ret = srv.Init(param);
    if (ret != 0)
    {
        return -1;
    }
    srv.RegFsm(APP_FSM_PROXY,new CAppFsmProxy());
    srv.Process();
    return 0;
}

所有发往服务器的http请求,都会被转发到 10.6.207.189 的 80 端口,然后返回给调用端。 这就是所有需要调用端编写的代码了,如果去掉HTTP协议解析的那两个函数,并且去掉空行,一共才不到160行代码。而且你可以随意的再网上加逻辑状态,比如想要在拿到客户端请求之后,再去login服务器验证一下登录态,那只要在注册一个新的状态即可。

如果想看更多的例子,可以去svn上看如下路径的代码: http://code.google.com/p/bayonet/source/browse/#svn%2Ftrunk%2Fsrc%2Fsvr http://code.google.com/p/bayonet/source/browse/#svn%2Ftrunk%2Fsrc%2Fsvr2 http://code.google.com/p/bayonet/source/browse/#svn%2Ftrunk%2Fsrc%2Fhttp

当然,现在也还有很多问题,比如性能还没有做优化,各位如果有兴趣的话,欢迎和我一起完成剩下的性能调优工作,我相信这个过程一定比编码阶段有聊的多~

Pingbacks

  1. 第四部 &raquo; 博客推荐14:Vimer的程序世界 on #

    [...] bayonet开源网络服务器框架正式完成! [...]

Pingbacks已打开。

Trackbacks

引用地址

评论

  1. wf168

    wf168 on #

    学习了

    Reply

  2. moper

    moper on #

    说实话,感觉有点高深,呵呵。很厉害的呢,加油~

    Reply

  3. liu1061

    liu1061 on #

    不得啊,你在这方面走的好远啊! 我看你在开源这方面了得,看到你常用nginx, 我现在在看这个源码,看的也是一头晕, 可否给指点一下啊! 谢谢,我可是你的Blog的常客啊!呵呵!

    Reply

    1. Dante

      Dante on #

      过奖哈,只是想多练练手,也沉淀一下。
      文中的链接有详细的介绍了bayonet的设计思路,可以看一下哈
      其实代码本身不是很复杂,比起ngx的复杂度差得远。。。

      Reply

  4. vivi

    vivi on #

    支持一下 , 感觉不错。

    Reply

  5. bopy

    bopy on #

    楼主,直接编译不过,查了下是#include在高版本的linux里面已经没有了,可以修改为gcc提供的__sync_*系列的built-in函数。 最简单的办法是改为#include 但是要安装libasound2-dev. 支持楼主!

    Reply

    1. bopy

      bopy on #

      晕....尖括号不能显示 第一处为 asm/atomic.h 第二处是alsa/iatomic.h

      Reply

发表评论