ACE之Reactor模式使用实例
生活随笔
收集整理的這篇文章主要介紹了
ACE之Reactor模式使用实例
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
// ACE_Reactor_Client.cpp : 定義控制臺應(yīng)用程序的入口點。
//#include "stdafx.h"#include "ace/Reactor.h"
#include "ace/SOCK_Connector.h"
#include "ace/OS.h"
#include "ace/Log_Msg.h"
#include <string>
#include <iostream>
using namespace std; class MyClient:public ACE_Event_Handler
{
public: bool do_connect(string ip, int port, int local_port) { ACE_SOCK_Connector connector; ACE_INET_Addr local_addr(local_port,"0.0.0.0");ACE_INET_Addr addr(port,ip.c_str()); ACE_Time_Value timeout(5,0); if(connector.connect(peer_sock,addr,&timeout, local_addr) != 0) { cout<<"connect fail."<<endl; return false; } int ret = ACE_Reactor::instance()->register_handler(this,ACE_Event_Handler::READ_MASK); if (ret != 0){cout<<"local_port:"<<local_port<<" register_handler fail."<<endl;return false;}sprintf(buf,"%d",local_port); peer_sock.send(buf,strlen(buf)+1); return true; } ACE_HANDLE get_handle(void) const { return peer_sock.get_handle(); } int handle_input (ACE_HANDLE fd) { int rev=0; ACE_Time_Value timeout(5,0); if((rev=peer_sock.recv(buf,sizeof(buf),&timeout))>0) { buf[rev]='\0'; cout<<"recv: "<<buf<<endl; } ACE_INET_Addr raddr;peer_sock.get_local_addr(raddr);//ACE_DEBUG ((LM_DEBUG,ACE_TEXT ( " (%P|%t) close:%s %d\n " ),raddr.get_host_addr(),raddr.get_port_number()));sprintf(buf,"%d",raddr.get_port_number()); peer_sock.send(buf,strlen(buf)+1); return 0; } private: ACE_SOCK_Stream peer_sock;char buf[100];
};#include <ace/OS.h>
#include <ace/Task.h>class TTcpNetThread : public ACE_Task_Base
{
public:/// 運行int open();/// 停止運行int close();
protected:/// 線程函數(shù)virtual int svc();
};int TTcpNetThread::open() { return this->activate(); }int TTcpNetThread::close()
{ACE_Reactor::instance()->end_reactor_event_loop(); // 終止ACE_Proactor循環(huán)this->wait(); // 等待清理現(xiàn)場return 0;
}int TTcpNetThread::svc()
{// Proactor的事件循環(huán)開始while(!ACE_Reactor::instance()->event_loop_done()) { ACE_Reactor::instance()->handle_events(); }ACE_DEBUG((LM_DEBUG, ACE_TEXT("Network fin\n")));return 0;
}/**********************************************************************************************
在Socket編程中,常見的事件就是"讀就緒","寫就緒",通過對這兩個事件的捕獲分發(fā),可以實現(xiàn)Socket中的異步操作。Socket編程中的事件處理器在前面我們已經(jīng)介紹過,在ACE反應(yīng)器框架中,任何都必須派生自ACE_Event_Handler類,并通過重載其相應(yīng)會調(diào)事件處理函數(shù)來實現(xiàn)相應(yīng)的回調(diào)處理的。在Socket編程中,我們通常需要重載的函數(shù)有1.handle_input()
當(dāng)I/O句柄(比如UNIX中的文件描述符)上的輸入可用時,反應(yīng)器自動回調(diào)該方法。2.handle_output()
當(dāng)I/O設(shè)備的輸出隊列有可用空間時,反應(yīng)器自動回調(diào)該方法。3.handle_close()
當(dāng)事件處理器中的事件從Reactor中移除的時候調(diào)用。 此外,為了使Reactor能通過I/O句柄找到對應(yīng)的事件處理器,還必須重載其get_handle()方法以使得Reactor建立起I/O句柄和事件處理器的關(guān)聯(lián)。
***********************************************************************************************/
#pragma comment(lib,"ACEd.lib")#define CLIENT_THREAD_NUM 4int main(int argc, char *argv[])
{ for (int i=0;i<2000;i++){MyClient *client = new MyClient; if (!client->do_connect("127.0.0.1",4567,10000+i))break; }system("pause");TTcpNetThread netThread[CLIENT_THREAD_NUM];for(int i = 0; i < CLIENT_THREAD_NUM; i++){netThread[i].open();}while (getchar()){ACE_OS::sleep(1);}/*while(true) { ACE_Reactor::instance()->handle_events(); }*/ return 0;
}
// ACE_Reactor_Server.cpp : 定義控制臺應(yīng)用程序的入口點。 //#include "stdafx.h"#include <ace/Reactor.h> #include <ace/SOCK_Connector.h> #include <ace/SOCK_Acceptor.h> #include <ace/Auto_Ptr.h> #include "ace/OS.h" #include "ace/Log_Msg.h" #include <list> #pragma comment(lib,"ACEd.lib")class ClientService : public ACE_Event_Handler { public: ACE_SOCK_Stream &peer (void) { return this->sock_; } int regist_this(void) { //注冊讀就緒回調(diào)函數(shù) return this->reactor ()->register_handler(this, ACE_Event_Handler::READ_MASK); } virtual ACE_HANDLE get_handle (void) const { return this->sock_.get_handle (); } virtual int handle_input (ACE_HANDLE fd ) { int rev = peer().recv(buf,sizeof(buf)); if(rev<=0) return -1; buf[rev] = '\0'; printf("recv:%s",buf); return 0; } // 釋放相應(yīng)資源 virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask mask) { if (mask == ACE_Event_Handler::WRITE_MASK) return 0; mask = ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL; this->reactor ()->remove_handler (this, mask); this->sock_.close (); delete this; //socket出錯時,將自動刪除該客戶端,釋放相應(yīng)資源 return 0; } protected: char buf[100]; ACE_SOCK_Stream sock_; }; class ClientAcceptor : public ACE_Event_Handler { public: virtual ~ClientAcceptor (){this->handle_close (ACE_INVALID_HANDLE, 0);} int start_listen (const ACE_INET_Addr &listen_addr) { if (this->acceptor_.open (listen_addr, 1) == -1) { printf("open port fail\n"); return -1; } //注冊接受連接回調(diào)事件 return this->reactor ()->register_handler(this, ACE_Event_Handler::ACCEPT_MASK); } virtual ACE_HANDLE get_handle (void) const { return this->acceptor_.get_handle (); } virtual int handle_input (ACE_HANDLE fd ) { ClientService *client = new ClientService(); auto_ptr<ClientService> p (client); if (this->acceptor_.accept (client->peer ()) == -1) { printf("accept client fail\n"); return -1; } p.release (); client->reactor (this->reactor ()); if (client->regist_this () == -1) client->handle_close (ACE_INVALID_HANDLE, 0); return 0; } virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask) { if (this->acceptor_.get_handle () != ACE_INVALID_HANDLE) { ACE_Reactor_Mask m = ACE_Event_Handler::ACCEPT_MASK | ACE_Event_Handler::DONT_CALL; this->reactor ()->remove_handler (this, m); this->acceptor_.close (); } return 0; } protected: ACE_SOCK_Acceptor acceptor_; }; int main1(int argc, char *argv[]) { ACE_INET_Addr addr(4567,"127.0.0.1"); ClientAcceptor server; server.reactor(ACE_Reactor::instance()); server.start_listen(addr); while(true) { ACE_Reactor::instance()->handle_events(); } return 0; }#define MAX_BUFF_SIZE 1024 #define LISTEN_PORT 4567 #define SERVER_IP ACE_LOCALHOSTclass ClientHandler : public ACE_Event_Handler { public:friend class ServerAcceptor; public:ClientHandler(){}~ClientHandler(){sock_stream.close();ACE_Reactor::instance()->remove_handler(this,ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL);}int send_some(const void *buff, int bytes){return sock_stream.send(buff,bytes);}ACE_SOCK_Stream& GetStream(){return sock_stream;} //給accept提供接口綁定數(shù)據(jù)通道 public:virtual int handle_input(ACE_HANDLE fd); //I/O觸發(fā)事件后調(diào)用virtual ACE_HANDLE get_handle(void) const {return sock_stream.get_handle();} //不重載需要手動將handle傳入ACE_Reactor private:ACE_INET_Addr Cli_addr;ACE_SOCK_Stream sock_stream; };int ClientHandler::handle_input(ACE_HANDLE fd) {char strBuffer[MAX_BUFF_SIZE];int byte = sock_stream.recv(strBuffer,MAX_BUFF_SIZE); //可讀數(shù)據(jù)if (-1 == byte){ACE_DEBUG((LM_INFO, ACE_TEXT("receive data failed\n")));}else if(0 == byte){sock_stream.close();ACE_Reactor::instance()->remove_handler(this,ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL);ACE_DEBUG((LM_INFO, ACE_TEXT("client closed!\n")));}else{ACE_DEBUG((LM_INFO, ACE_TEXT("receive:%s\n"),strBuffer));sock_stream.send(strBuffer,strlen(strBuffer)+1);}return 0; }// ServerAcceptor class ServerAcceptor : public ACE_Event_Handler { public:ServerAcceptor(int port,char* ip);~ServerAcceptor();virtual int handle_input(ACE_HANDLE fd); // ACE框架回調(diào)virtual ACE_HANDLE get_handle(void) const {return Svr_aceept.get_handle();} private:ACE_INET_Addr Svr_addr;ACE_SOCK_Acceptor Svr_aceept;std::list<ClientHandler*> m_streamPool; //stream pool };ServerAcceptor::ServerAcceptor(int port,char* ip):Svr_addr(port,ip) {if (-1 == Svr_aceept.open(Svr_addr,1)){ACE_DEBUG((LM_ERROR,ACE_TEXT("accept open failed\n")));Svr_aceept.close();}ACE_DEBUG((LM_ERROR,ACE_TEXT("accept open success\n"))); }ServerAcceptor::~ServerAcceptor() {ACE_Reactor::instance()->remove_handler(this,ACE_Event_Handler::ACCEPT_MASK);Svr_aceept.close();std::list<ClientHandler*>::iterator it;for (it = m_streamPool.begin();it != m_streamPool.end();++it){if (NULL != (*it)){delete (*it);}} } #include "ace/SOCK_SEQPACK_Association.h" int ServerAcceptor::handle_input(ACE_HANDLE fd ) {ClientHandler *stream = new ClientHandler(); //產(chǎn)生新通道if (NULL != stream){m_streamPool.push_back(stream);}if (Svr_aceept.accept(stream->GetStream()) == -1) //綁定通道{ printf("accept client fail\n"); return -1; }ACE_Reactor::instance()->register_handler(stream,ACE_Event_Handler::READ_MASK); //通道注冊到ACE_ReactorACE_INET_Addr raddr;stream->GetStream().get_remote_addr(raddr);ACE_DEBUG ((LM_DEBUG,ACE_TEXT ( "client:%s %d\n" ),raddr.get_host_addr(),raddr.get_port_number()));/*ACE_INET_Addr addr;ACE_SOCK_SEQPACK_Association ass=ACE_SOCK_SEQPACK_Association(fd);size_t addr_size=sizeof ACE_INET_Addr;ass.get_remote_addrs(&addr,addr_size);ACE_OS::printf("fd:%d ip:%d port:%d\n",(int)fd, addr.get_ip_address(), addr.get_port_number());*///ACE_DEBUG((LM_ERROR,ACE_TEXT("User connect success!\n")));return 0; } #include <ace/OS.h> #include <ace/Task.h>class TTcpNetThread : public ACE_Task_Base { public:/// 運行int open();/// 停止運行int close(); protected:/// 線程函數(shù)virtual int svc(); };int TTcpNetThread::open() { return this->activate(); }int TTcpNetThread::close() {ACE_Reactor::instance()->end_reactor_event_loop(); // 終止ACE_Proactor循環(huán)this->wait(); // 等待清理現(xiàn)場return 0; }int TTcpNetThread::svc() {ACE_Reactor::instance()->run_reactor_event_loop();ACE_DEBUG((LM_DEBUG, ACE_TEXT("Network fin\n")));return 0; }#define CLIENT_THREAD_NUM 4int main(int argc, char *argv[]) {ServerAcceptor server(LISTEN_PORT,(char *)SERVER_IP);//listen port注冊到ACE_ReactorACE_Reactor::instance()->register_handler(&server,ACE_Event_Handler::ACCEPT_MASK);TTcpNetThread netThread[CLIENT_THREAD_NUM];for(int i = 0; i < CLIENT_THREAD_NUM; i++){netThread[i].open();}while (getchar()){ACE_OS::sleep(1);}//進(jìn)入消息循環(huán),有I/O事件回調(diào)handle_input//ACE_Reactor::instance()->run_reactor_event_loop();return 0; }
// ACE_Reactor_Server.cpp : 定義控制臺應(yīng)用程序的入口點。 //#include "stdafx.h"#include <ace/Reactor.h> #include <ace/SOCK_Connector.h> #include <ace/SOCK_Acceptor.h> #include <ace/Auto_Ptr.h> #include "ace/OS.h" #include "ace/Log_Msg.h" #include <list> #pragma comment(lib,"ACEd.lib")class ClientService : public ACE_Event_Handler { public: ACE_SOCK_Stream &peer (void) { return this->sock_; } int regist_this(void) { //注冊讀就緒回調(diào)函數(shù) return this->reactor ()->register_handler(this, ACE_Event_Handler::READ_MASK); } virtual ACE_HANDLE get_handle (void) const { return this->sock_.get_handle (); } virtual int handle_input (ACE_HANDLE fd ) { int rev = peer().recv(buf,sizeof(buf)); if(rev<=0) return -1; buf[rev] = '\0'; printf("recv:%s",buf); return 0; } // 釋放相應(yīng)資源 virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask mask) { if (mask == ACE_Event_Handler::WRITE_MASK) return 0; mask = ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL; this->reactor ()->remove_handler (this, mask); this->sock_.close (); delete this; //socket出錯時,將自動刪除該客戶端,釋放相應(yīng)資源 return 0; } protected: char buf[100]; ACE_SOCK_Stream sock_; }; class ClientAcceptor : public ACE_Event_Handler { public: virtual ~ClientAcceptor (){this->handle_close (ACE_INVALID_HANDLE, 0);} int start_listen (const ACE_INET_Addr &listen_addr) { if (this->acceptor_.open (listen_addr, 1) == -1) { printf("open port fail\n"); return -1; } //注冊接受連接回調(diào)事件 return this->reactor ()->register_handler(this, ACE_Event_Handler::ACCEPT_MASK); } virtual ACE_HANDLE get_handle (void) const { return this->acceptor_.get_handle (); } virtual int handle_input (ACE_HANDLE fd ) { ClientService *client = new ClientService(); auto_ptr<ClientService> p (client); if (this->acceptor_.accept (client->peer ()) == -1) { printf("accept client fail\n"); return -1; } p.release (); client->reactor (this->reactor ()); if (client->regist_this () == -1) client->handle_close (ACE_INVALID_HANDLE, 0); return 0; } virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask) { if (this->acceptor_.get_handle () != ACE_INVALID_HANDLE) { ACE_Reactor_Mask m = ACE_Event_Handler::ACCEPT_MASK | ACE_Event_Handler::DONT_CALL; this->reactor ()->remove_handler (this, m); this->acceptor_.close (); } return 0; } protected: ACE_SOCK_Acceptor acceptor_; }; int main1(int argc, char *argv[]) { ACE_INET_Addr addr(4567,"127.0.0.1"); ClientAcceptor server; server.reactor(ACE_Reactor::instance()); server.start_listen(addr); while(true) { ACE_Reactor::instance()->handle_events(); } return 0; }#define MAX_BUFF_SIZE 1024 #define LISTEN_PORT 4567 #define SERVER_IP ACE_LOCALHOSTclass ClientHandler : public ACE_Event_Handler { public:friend class ServerAcceptor; public:ClientHandler(){}~ClientHandler(){sock_stream.close();ACE_Reactor::instance()->remove_handler(this,ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL);}int send_some(const void *buff, int bytes){return sock_stream.send(buff,bytes);}ACE_SOCK_Stream& GetStream(){return sock_stream;} //給accept提供接口綁定數(shù)據(jù)通道 public:virtual int handle_input(ACE_HANDLE fd); //I/O觸發(fā)事件后調(diào)用virtual ACE_HANDLE get_handle(void) const {return sock_stream.get_handle();} //不重載需要手動將handle傳入ACE_Reactor private:ACE_INET_Addr Cli_addr;ACE_SOCK_Stream sock_stream; };int ClientHandler::handle_input(ACE_HANDLE fd) {char strBuffer[MAX_BUFF_SIZE];int byte = sock_stream.recv(strBuffer,MAX_BUFF_SIZE); //可讀數(shù)據(jù)if (-1 == byte){ACE_DEBUG((LM_INFO, ACE_TEXT("receive data failed\n")));}else if(0 == byte){sock_stream.close();ACE_Reactor::instance()->remove_handler(this,ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL);ACE_DEBUG((LM_INFO, ACE_TEXT("client closed!\n")));}else{ACE_DEBUG((LM_INFO, ACE_TEXT("receive:%s\n"),strBuffer));sock_stream.send(strBuffer,strlen(strBuffer)+1);}return 0; }// ServerAcceptor class ServerAcceptor : public ACE_Event_Handler { public:ServerAcceptor(int port,char* ip);~ServerAcceptor();virtual int handle_input(ACE_HANDLE fd); // ACE框架回調(diào)virtual ACE_HANDLE get_handle(void) const {return Svr_aceept.get_handle();} private:ACE_INET_Addr Svr_addr;ACE_SOCK_Acceptor Svr_aceept;std::list<ClientHandler*> m_streamPool; //stream pool };ServerAcceptor::ServerAcceptor(int port,char* ip):Svr_addr(port,ip) {if (-1 == Svr_aceept.open(Svr_addr,1)){ACE_DEBUG((LM_ERROR,ACE_TEXT("accept open failed\n")));Svr_aceept.close();}ACE_DEBUG((LM_ERROR,ACE_TEXT("accept open success\n"))); }ServerAcceptor::~ServerAcceptor() {ACE_Reactor::instance()->remove_handler(this,ACE_Event_Handler::ACCEPT_MASK);Svr_aceept.close();std::list<ClientHandler*>::iterator it;for (it = m_streamPool.begin();it != m_streamPool.end();++it){if (NULL != (*it)){delete (*it);}} } #include "ace/SOCK_SEQPACK_Association.h" int ServerAcceptor::handle_input(ACE_HANDLE fd ) {ClientHandler *stream = new ClientHandler(); //產(chǎn)生新通道if (NULL != stream){m_streamPool.push_back(stream);}if (Svr_aceept.accept(stream->GetStream()) == -1) //綁定通道{ printf("accept client fail\n"); return -1; }ACE_Reactor::instance()->register_handler(stream,ACE_Event_Handler::READ_MASK); //通道注冊到ACE_ReactorACE_INET_Addr raddr;stream->GetStream().get_remote_addr(raddr);ACE_DEBUG ((LM_DEBUG,ACE_TEXT ( "client:%s %d\n" ),raddr.get_host_addr(),raddr.get_port_number()));/*ACE_INET_Addr addr;ACE_SOCK_SEQPACK_Association ass=ACE_SOCK_SEQPACK_Association(fd);size_t addr_size=sizeof ACE_INET_Addr;ass.get_remote_addrs(&addr,addr_size);ACE_OS::printf("fd:%d ip:%d port:%d\n",(int)fd, addr.get_ip_address(), addr.get_port_number());*///ACE_DEBUG((LM_ERROR,ACE_TEXT("User connect success!\n")));return 0; } #include <ace/OS.h> #include <ace/Task.h>class TTcpNetThread : public ACE_Task_Base { public:/// 運行int open();/// 停止運行int close(); protected:/// 線程函數(shù)virtual int svc(); };int TTcpNetThread::open() { return this->activate(); }int TTcpNetThread::close() {ACE_Reactor::instance()->end_reactor_event_loop(); // 終止ACE_Proactor循環(huán)this->wait(); // 等待清理現(xiàn)場return 0; }int TTcpNetThread::svc() {ACE_Reactor::instance()->run_reactor_event_loop();ACE_DEBUG((LM_DEBUG, ACE_TEXT("Network fin\n")));return 0; }#define CLIENT_THREAD_NUM 4int main(int argc, char *argv[]) {ServerAcceptor server(LISTEN_PORT,(char *)SERVER_IP);//listen port注冊到ACE_ReactorACE_Reactor::instance()->register_handler(&server,ACE_Event_Handler::ACCEPT_MASK);TTcpNetThread netThread[CLIENT_THREAD_NUM];for(int i = 0; i < CLIENT_THREAD_NUM; i++){netThread[i].open();}while (getchar()){ACE_OS::sleep(1);}//進(jìn)入消息循環(huán),有I/O事件回調(diào)handle_input//ACE_Reactor::instance()->run_reactor_event_loop();return 0; }
總結(jié)
以上是生活随笔為你收集整理的ACE之Reactor模式使用实例的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: C++使用ICE实现两台主机通信实例
- 下一篇: ACE之Proactor模式使用实例