视频1 视频21 视频41 视频61 视频文章1 视频文章21 视频文章41 视频文章61 推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37 推荐39 推荐41 推荐43 推荐45 推荐47 推荐49 关键词1 关键词101 关键词201 关键词301 关键词401 关键词501 关键词601 关键词701 关键词801 关键词901 关键词1001 关键词1101 关键词1201 关键词1301 关键词1401 关键词1501 关键词1601 关键词1701 关键词1801 关键词1901 视频扩展1 视频扩展6 视频扩展11 视频扩展16 文章1 文章201 文章401 文章601 文章801 文章1001 资讯1 资讯501 资讯1001 资讯1501 标签1 标签501 标签1001 关键词1 关键词501 关键词1001 关键词1501 专题2001
基于Hiredis异步API的聊天系统实现
2020-11-09 16:26:21 责编:小采
文档


基于Hiredis异步API的聊天系统实现 上一篇文章http://blog.csdn.net/qq_34788352/article/details/51313027使用Hiredis的同步API实现了发送消息的客户端,当我使用同步API实现订阅频道客户端时,一旦订阅频道,就会出现无法操作的情况,这是就是同步和异步的

基于Hiredis异步API的聊天系统实现

上一篇文章http://blog.csdn.net/qq_34788352/article/details/51313027使用Hiredis的同步API实现了发送消息的客户端,当我使用同步API实现订阅频道客户端时,一旦订阅频道,就会出现无法操作的情况,这是就是同步和异步的问题。使用同步API,订阅频道后,客户端会进入阻塞状态,等待订阅频道发布的消息,不能实现既订阅频道,又能发布消息的功能。为了实现一个客户端既能订阅频道,又能发布消息的功能,就需要使用Hiredis的异步API。

首先特别感谢黄天霸、tickTick、vah101,当我遇到各种各样奇怪问题的时候,你们的帖子给与了我解答,谢谢。

开始正题,hiredis异步的实现主要是依靠redis自带的ae事件库或者libev事件库或者libevent的事件库或者libuv事件库。网上一些人是通过libevent事件库来实现,本系统则使用Redis自带ae事件库来实现,Redis不用libevent事件库而选择重写一个ae事件库,必定有其道理。

首先介绍使用到的异步API,位于async.h中:

redisAsyncContext *redisAsyncConnect(const char *ip, int port); //用于建立异步连接

int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn); //设置连接回调函数,回调函数形式:void callback(const redisAsyncContext *c, int status);

int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn); //设置断开连接回调函数,回调函数形式:void callback(const redisAsyncContext *c, int status);

void redisAsyncDisconnect(redisAsyncContext *ac); //断开异步连接

void redisAsyncFree(redisAsyncContext *ac); //释放建立连接时,创建的redisAsyncContext结构

int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...); //发送Redis命令,需要实现一个回调函数来出来命令的返回,fn是回调函数的地址,回调函数形式:void callback(redisAsyncContext *c, void *reply, void *pridata);

有了上面的异步API,就可以开始客户端的撰写。
首先封装订阅端

//sub_client.h

#ifndef REDIS_SUB_CLIENT_H 
#define REDIS_SUB_CLIENT_H 

extern "C"
{ 
#include  
#include 
#include 
#include 
#include  
#include  
} 

#include  
#include  

using namespace std;

class CRedisSubClient 
{ 
public: 

 CRedisSubClient(); 
 ~CRedisSubClient(); 

 bool init(); //初始化,事件对象,信号量 
 bool uninit(); //释放对象
 bool connect(); //连接服务器
 bool disconnect(); //断开服务器

 //订阅频道 
 bool subscribe(const string &channel_name); 

private: 
 // 下面三个回调函数供redis服务调用 
 // 连接回调 
 static void connect_callback(const redisAsyncContext *redis_context, 
 int status); 

 // 断开连接的回调 
 static void disconnect_callback(const redisAsyncContext *redis_context, 
 int status); 

 // 执行命令回调 
 static void command_callback(redisAsyncContext *redis_context, 
 void *reply, void *privdata); 

 // 事件分发线程函数 
 static void *event_thread(void *data); 
 void *event_proc(); 

private: 
 // ae事件对象 
 aeEventLoop *loop; 
 // 事件线程ID 
 pthread_t _event_thread; 
 // 事件线程的信号量 
 sem_t _event_sem; 
 // hiredis异步对象 
 redisAsyncContext *_redis_context; 

}; 

#endif 

使用extern “C”是因为redis和hiredis都是c写的,当使用c++链接c代码生成的库中的函数时,会出现undefined reference的问题。

订阅端的实现

//sub_client.cpp

#include  
#include 
#include  
#include "sub_client.h" 

using namespace std; 

CRedisSubClient::CRedisSubClient():loop(0), _event_thread(0), 
_redis_context(0) 
{ 
} 

CRedisSubClient::~CRedisSubClient() 
{ 
} 

bool CRedisSubClient::init() 
{ 

 loop = aeCreateEventLoop(); // 创建ae对象

 if (NULL == loop) 
 { 
 printf("Create redis event failed.\n"); 
 return false; 
 } 

 memset(&_event_sem, 0, sizeof(_event_sem)); 

 int ret = sem_init(&_event_sem, 0, 0); //初始化线程信号量

 if (ret != 0) 
 { 
 printf("Init sem failed.\n"); 
 return false; 
 } 

 return true; 
} 

bool CRedisSubClient::uninit() 
{ 
 loop = NULL; 

 sem_destroy(&_event_sem); 

 return true; 
} 

bool CRedisSubClient::connect() 
{ 
 _redis_context = redisAsyncConnect("127.0.0.1", 6379); // 异步连接到redis服务器上,使用6380端口

 if (NULL == _redis_context) 
 { 
 printf(": Connect redis failed.\n"); 
 return false; 
 } 

 if (_redis_context->err) 
 { 
 printf("Connect redis error: %d, %s\n", 
 _redis_context->err, _redis_context->errstr); // 
输出错误信息 return false; } redisAeAttach(loop,_redis_context); // 将事件绑定到redis context上,使redis的回调跟事件关联 // 创建事件处理线程 int ret = pthread_create(&_event_thread, 0, &CRedisSubscriber::event_thread, this); if (ret != 0) { printf("Create event thread failed.\n"); disconnect(); return false; } // 设置连接回调,当异步调用连接后,服务器处理连接请求结束后调用,通知调用者连接的状态 redisAsyncSetConnectCallback(_redis_context, &CRedisSubClient::connect_callback); // 设置断开连接回调,当服务器断开连接后,通知调用者连接断开,调用者可以利用这个函数实现重连 redisAsyncSetDisconnectCallback(_redis_context, &CRedisSubClient::disconnect_callback); // 启动事件线程 sem_post(&_event_sem); return true; } bool CRedisSubClient::disconnect() { if (_redis_context) { redisAsyncDisconnect(_redis_context); redisAsyncFree(_redis_context); _redis_context = NULL; } return true; } bool CRedisSubClient::subscribe(const string &channel_name) { int ret = redisAsyncCommand(_redis_context, &CRedisSubscriber::command_callback, this, "SUBSCRIBE %s", channel_name.c_str()); //订阅一个频道 if (REDIS_ERR == ret) { printf("Subscribe command failed: %d\n", ret); return false; } printf("Subscribe success: %s\n", channel_name.c_str()); return true; } void CRedisSubClient::connect_callback(const redisAsyncContext *redis_context, int status) { if (status != REDIS_OK) { printf("Error: %s\n", redis_context->errstr); } else { printf("Redis connected!"); } } void CRedisSubClient::disconnect_callback( const redisAsyncContext *redis_context, int status) { if (status != REDIS_OK) { printf(": Error: %s\n", redis_context->errstr); } } // 消息接收回调函数 void CRedisSubClient::command_callback(redisAsyncContext *redis_context, void *reply, void *privdata) { if (NULL == reply || NULL == privdata) { return ; } redisReply *redis_reply = reinterpret_cast(reply); // 订阅接收到的消息是一个带三元素的数组 if (redis_reply->type == REDIS_REPLY_ARRAY && redis_reply->elements == 3) { printf("Recieve message:%s %s %s\n", redis_reply->element[0]->str redis_reply->element[1]->str redis_reply->element[2]->str); } } void *CRedisSubClient::event_thread(void *data) { if (NULL == data) { printf(": Error!\n"); assert(false); return NULL; } CRedisSubClient *self_this = reinterpret_cast(data); return self_this->event_proc(); } void *CRedisSubClient::event_proc() { sem_wait(&_event_sem); //进行事件处理循环 aeMain(loop); return NULL; }

发布端的封装:

//pub_client.h

#ifndef REDIS_PUB_CLIENT_H 
#define REDIS_PUB_CLIENT_H 

extern "C"
{ 
#include  
#include 
#include 
#include 
#include  
#include  
} 

#include  
#include  

using namespace std;

class CRedisPubClient 
{ 
public: 

 CRedisPubClient(); 
 ~CRedisPubClient(); 

 bool init(); //初始化,事件对象,信号量 
 bool uninit(); //释放对象
 bool connect(); //连接服务器
 bool disconnect(); //断开服务器

 //订阅频道 
 bool publish(const string &channel_name, const string &message); 

private: 
 // 下面三个回调函数供redis服务调用 
 // 连接回调 
 static void connect_callback(const redisAsyncContext *redis_context, 
 int status); 

 // 断开连接的回调 
 static void disconnect_callback(const redisAsyncContext *redis_context, 
 int status); 

 // 执行命令回调 
 static void command_callback(redisAsyncContext *redis_context, 
 void *reply, void *privdata); 

 // 事件分发线程函数 
 static void *event_thread(void *data); 
 void *event_proc(); 

private: 
 // ae事件对象 
 aeEventLoop *loop; 
 // 事件线程ID 
 pthread_t _event_thread; 
 // 事件线程的信号量 
 sem_t _event_sem; 
 // hiredis异步对象 
 redisAsyncContext *_redis_context; 

}; 

#endif 

发布端的实现:

//pub_client.cpp

#include  
#include 
#include  
#include "pub_client.h" 

using namespace std; 

CRedisPubClient::CRedisPubClient():loop(0), _event_thread(0), 
_redis_context(0) 
{ 
} 

CRedisPubClient::~CRedisPubClient() 
{ 
} 

bool CRedisPubClient::init() 
{ 

 loop = aeCreateEventLoop(); // 创建ae对象

 if (NULL == loop) 
 { 
 printf("Create redis event failed.\n"); 
 return false; 
 } 

 memset(&_event_sem, 0, sizeof(_event_sem)); 

 int ret = sem_init(&_event_sem, 0, 0); //初始化线程信号量

 if (ret != 0) 
 { 
 printf("Init sem failed.\n"); 
 return false; 
 } 

 return true; 
} 

bool CRedisPubClient::uninit() 
{ 
 loop = NULL; 

 sem_destroy(&_event_sem); 

 return true; 
} 

bool CRedisPubClient::connect() 
{ 
 _redis_context = redisAsyncConnect("127.0.0.1", 6379); // 异步连接到redis服务器上,使用6380端口

 if (NULL == _redis_context) 
 { 
 printf(": Connect redis failed.\n"); 
 return false; 
 } 

 if (_redis_context->err) 
 { 
 printf("Connect redis error: %d, %s\n", 
 _redis_context->err, _redis_context->errstr); // 
输出错误信息 return false; } redisAeAttach(loop,_redis_context); // 将事件绑定到redis context上,使redis的回调跟事件关联 // 创建事件处理线程 int ret = pthread_create(&_event_thread, 0, &CRedisSubscriber::event_thread, this); if (ret != 0) { printf("Create event thread failed.\n"); disconnect(); return false; } // 设置连接回调,当异步调用连接后,服务器处理连接请求结束后调用,通知调用者连接的状态 redisAsyncSetConnectCallback(_redis_context, &CRedisSubClient::connect_callback); // 设置断开连接回调,当服务器断开连接后,通知调用者连接断开,调用者可以利用这个函数实现重连 redisAsyncSetDisconnectCallback(_redis_context, &CRedisSubClient::disconnect_callback); // 启动事件线程 sem_post(&_event_sem); return true; } bool CRedisPubClient::disconnect() { if (_redis_context) { redisAsyncDisconnect(_redis_context); redisAsyncFree(_redis_context); _redis_context = NULL; } return true; } bool CRedisPubClient::publish(const string &channel_name, const string &message) { int ret = redisAsyncCommand(_redis_context, &CRedisPublisher::command_callback, this, "PUBLISH %s %s", channel_name.c_str(), message.c_str()); //发布消息 if (REDIS_ERR == ret) { printf("Publish command failed: %d\n", ret); return false; } return true; } void CRedisPubClient::connect_callback(const redisAsyncContext *redis_context, int status) { if (status != REDIS_OK) { printf("Error: %s\n", redis_context->errstr); } else { printf("Redis connected!"); } } void CRedisPubClient::disconnect_callback( const redisAsyncContext *redis_context, int status) { if (status != REDIS_OK) { printf(": Error: %s\n", redis_context->errstr); } } // 消息接收回调函数 void CRedisPubClient::command_callback(redisAsyncContext *redis_context, void *reply, void *privdata) { if (NULL == reply || NULL == privdata) { return ; } redisReply *redis_reply = reinterpret_cast(reply); printf("Publish: %s",redis_reply.str); } void *CRedisPubClient::event_thread(void *data) { if (NULL == data) { printf(": Error!\n"); assert(false); return NULL; } CRedisPubClient *self_this = reinterpret_cast(data); return self_this->event_proc(); } void *CRedisPubClient::event_proc() { sem_wait(&_event_sem); //进行事件处理循环 aeMain(loop); return NULL; }

测试封装的sub_client和pub_client类:

//test_subpub.cpp

#include "pub_client.h"
#include "sub_client.h" 

int main(int argc, char *argv[]) 
{ 
 CRedisPubClient publisher; 
 CRedisSubClient subcriber;

 bool ret_pub = publisher.init(); 
 bool ret_sub = subcriber.init();

 if (!ret_sub&&!ret_pub) 
 { 
 printf("Init failed.\n"); 
 return 0; 
 } 

 ret_pub = publisher.connect(); 
 ret_sub = subcriber.connect(); 

 if (!ret_sub&&!ret_pub) 
 { 
 printf("connect failed."); 
 return 0; 
 } 

 subscriber.subcribe("sports");

 while (true) 
 { 
 publisher.publish("sports", "ball"); 
 sleep(1); 
 } 

 publisher.disconnect(); 
 publisher.uninit(); 
 subscriber.disconnect();
 subscriber.disconnect();
 return 0; 

} 

终于到了编译链接运行的阶段,我在这个地方卡了快一天,期间各种编译、链接的错误。直接编译链接会出现找到不aeCreateFileEvent,aeDeleteFileEvent,aeMain等等错误。

  1. 先将redis/src文件夹中的ae.c,ae.h,ae_epoll.c,config.h,zmalloc.c,zmalloc.h拷贝至hiredis目录下
  2. 用gcc -c ae.c gcc -c zmalloc.c生成ae.o和zmalloc.o,利用这两个文件生成静态库 ar -r libar.a ae.o zmalloc.o
  3. 然后编译g++ -o test_subpub test_subpub.cpp pub_client.cpp sub_client.cpp -pthread -I ../ -I./ -I ../adapters ../libhiredis.a ../libae.a
  4. 运行./test_subpub

可以看到,客户端中消息在不断地滚动,即同时实现了订阅频道和发送消息的功能。

下载本文
显示全文
专题