| ce的实例化片段:
begin //Create TCP/Receive Port
itmp:=high(TCPSource)+1;
SetLength(TCPSource,itmp+1);
TCPSource [itmp]:=TServerSocket.Create(self);
TCPSource [itmp].OnClientRead:=TCPServersClientRead;
//分配OnClientRead事件的处理过程为TCPServersClientRead
TCPSource [itmp].OnClientError:=TCPServerClientError;
//分配OnClientError事件的处理过程为TCPServerClientError
TCPSource [itmp].Name:= ''''Receive''''+inttostr(isource);
//设置Name属性为''''Receive''''+Port ID
TCPSource [itmp].Tag:=itarget; //设置为其所在Channel的target ID
TCPSource [itmp].Socket.Data:=@ TCPSource [itmp].Tag;
//将此Port对象的target ID作为指针数据附于Socket对象上
……………
……………
end;
回来接着看我们的comSource的处理,在实例化时我们为其建立了触发时钟,但如何来处理时钟触发时的事件呢?同理,也是事件处理的动态分配。
comSource的时钟的处理定义可在ServiceCreate事件处理中加入: application.OnMessage:=Timer;
实现对消息处理的重载,当有Application的消息产生时,Timer就将被触发,在Timer事件中我们过滤处理时钟触发的WM_TIMER消息,就可以按Port ID和类型实现对特定Source Port的数据获取方法的调用:
Procedure TCarrier.Timer(var Msg: TMsg; var Handled: Boolean);
var stmp:string;
Obj:TComponent;
begin
if Msg.message =WM_TIMER then//处理时钟消息
begin//根据触发消息的Port ID找到定义此消息的对象
Obj:=FindComponent(''''Receive''''+inttostr(Msg.WParam));
if obj=nil then exit;//没有找到就退出处理
stmp:=obj.ClassName;//反射获得此Port对象的类型信息
if stmp=''''TIdPOP3'''' then GetPOP3(TIdPOP3(Obj));
if stmp=''''TIdFTP'''' then GetFTP(TIdFTP(obj));
if stmp=''''TFilePort'''' then GetFile(TFilePort(Obj));
if stmp=''''TCOMPort'''' then GetCOM(TCOMPort(Obj));
//调用COMSource的数据获取过程
……………
……………
end;
end;
vi. 获取数据
以下是COMSource的数据获取处理
procedure TCarrier.GetCOM(COMObj: TCOMPort);
var stmp:string;
COMInterface:OleVariant;
begin
try//根据ComFace的值建立COM组件对象
COMInterface:=CreateOleObject(COMObj.ComFace);
stmp:=COMInterface.GetData; //调用约定的接口方法,获取数据
while stmp<>#0 do // #0为约定的数据提取结束标志
begin
DataArrive(stmp,COMObj.Tag);
//交由data Dispatcher统一处理, COMObj.Tag为对象所在Channel的Target Port ID
stmp:=COMInterface.GetData;
end;
COMInterface:= Unassigned;
except
COMInterface:= Unassigned;
end;
end;// 完成数据提取操作,释放组件对象,直至下一次触发调用
以下是TCPSource的数据获取处理:
procedure TCarrier.TCPServersClientRead(Sender: TObject; Socket:TCustomWinSocket);
begin
DataArrive(socket.ReceiveText,integer(TServerWinSocket(sender).data^));
//交由data Dispatcher统一处理, 第二个参数为附于Socket对象sender上的Target Port ID指针值,
end;
不同类型的Source Port对象其接收数据的方式也不尽相同,但最终都将所接收到的数据交由data Dispatcher做统一处理。从实现层面讲,每加入一种数据接收对象并实现其数据接收,就为Transceiver Shell实现了一种新的Source Port。注:此处作者只是实现了接收文本数据,可能用户需要接收的是内存对象、数据流或二进制数据,对接收代码稍做更改即可。
vii. 数据调度
Transceiver Service的数据调度是由data Dispatcher逻辑单元完成的,Data Dispatcher的主要任务是对从不同的Source Port接收到的数据进行统一的管理与控制、与Channel Controller协同工作,按Channel的定义向不同的Target Port进行数据分发、监视其发送结果成功与否,并根据发送结果和系统配置库的设置决定数据是否需要提交到Queue Manager和Log Recorder进行缓冲和日志处理等等。接下来看看Source Port提交数据的DataArrive方法:
procedure TCarrier.DataArrive(sData:String;PortID:Integer);
var dTime:Datetime;
iLogID:integer;
bSendSeccess:Boolean;
begin
if sData='''''''' then exit;//如数据为空则跳出
iLogID:=-1;
dTime:= now; //接收时间
if sData[length(sdata)]=#0 then sdata:=copy(sdata,1,length(sdata)-1);
//用于兼容C语言的的字符串格式
bSendSeccess:=DataSend(sdata,PortID) ;
//调用 Data Dispatcher发送调度方法,PortID为Target Port ID
if (TSCfg.LogOnlyError=false) or (bSendSeccess=false) then
iLogID:=writeLog(dTime, now,sData, PortID, bSendSeccess);
//根据系统配置信息中的日志处理规则和发送结果记录日志
if (TSCfg.Queueing=True) and (bSendSeccess=false) then
PutQueue(dTime, now,sData, PortID, bSendSeccess, iLogID);
//根据封装系统配置信息中Queue配置定义决定Queue处理
end;
以上是Data Dispatcher的DataArrive方法,其中Queue的处理是按照系统配置信息和发送状态决定的,也可以调整为强制性的队列化处理。下面是Data Dispatcher的DataSend方法,用于将数据按Target Port类型分发处理:
Function TCarrier.DataSend(sData:String;PortID:Integer):boolean;
var Obj:TComponent;
begin
DataSend:=false;
Obj:=FindComponent(''''Send''''+inttostr(PortID)); //根据Port ID找到对象
if (obj=nil) or (obj.Tag =-1) then exit;
//对象不存在或因初始化失败已被标识为无效Port
case obj.Tag of
1:DataSend:=PutTCP(TClientSocket(obj),sdata);
3:DataSend:=PutSMTP(TIdSMTP(obj),sdata);
5:DataSend:=PutFTP(TIdFTP(obj),sdata);
7:DataSend:=PutHTTP(TIdHTTP(obj),sdata);
9:DataSend:=PutFile(TFilePort(obj),sdata);
11:DataSend:=PutMSMQ(TMSMQPort (obj),sdata);
13:DataSend:=PutDB(TDBPort(obj),sdata);
15:DataSend:=PutCOM(TCOMPort (obj),sdata);
……………
……………
end;
end;
值得注意的是,如果没有使用对象数组,而是每种类型的Port只有一个实例的话,处理数据分发处理的更佳办法应该是使用回调(Callback)函数,但在现在的情况下,那将导致不知应该由对象数组中哪一个成员处理数据。另外,现在的处理方法使Transceiver Kernel与Transceiver Shell没有彻底剥离,应该寻求更加抽象、独立性好的处理方法。
viii. 数据发送
以下是TCP的发送
Function TCarrier.PutTCP(TCPOBJ:TClientSocket;sdata:string):Boolean;
var itime:integer;
begin
PutTCP:=false;
try
TCPOBJ.Close;
TCPOBJ.Open;
itime:=gettickcount;//起始时间
repeat
application.ProcessMessages;
until (TCPOBJ.Active=true) or (gettickcount-itime>5000);
//连接成功或5秒超时就跳出循环
if TCPOBJ.Active then
begin
TCPOBJ.Socket.SendText(sdata);
PutTCP:=true;//发送数据成功时,返回值才为True
end;
TCPOBJ.Close;
Except
TCPOBJ.Close;
end;
end;
以下是COM的发送
Function TCarrier.PutCOM(COMOBJ:TCOMPort;sdata:string):Boolean;
var Com:OleVariant;
begin
PutCOM:=false;
try
Com:=CreateOleObject(COMOBJ.ComFace);//建立预定义的接口
PutCOM:=Com.PutData(sdata);//调用预定义的方法
Com:= Unassigned;
except
Com:= Unassigned;
end;
end;
其它类型的Port发送大同小异,在此不再赘述。到此为止,Source和Target的基本处理已经完成。一个基本的通讯功能已经建立,经过不同类型的Source和Target的自由匹配,就可以实现完全不同的通讯功能。建立多个Channel,就可以集中实现多个不同功用的通讯处理。
ix. 队列处理
在上文的DataArrive方法中当数据被发送之后,Data Dispatcher会调用数据日志记录的writeLog和队列化处理的PutQueue方法,二者的功能类似,都是根据系统参数对数据信息进行数据库的存储,不是本文的重点。而队列的Retry处理与Timer事件中按Port类型分发处理的原理类似,是依赖于Queue Timer的触发,将缓冲的数据从数据库中读出,并依照Target Port ID再次调用DataSend进行数据的发送重试,如发送成功,则本次数据传输的事务完成,否则重新进入队列等待下一次触发时间进行重试,直到发送成功或达到设置的最大重试数为止。
三、 开发经验总结
由于本文的侧重点在于说明Transceiver的核心思想与设计理念,简化和削弱了Transceiver作为后台服务应当考虑的多线程处理、对象池化以及事务支持、更为复杂强大的Source和Target的Group管理和Channel集成、收发内存对象、数据流、二进制数据的能力、系统配置信息的读取和其封装类的实现、系统及数据的安全性等等,希望读者朋友们能够抛砖引玉,理解Transceiver的设计思想,启发实际开发工作中的灵感火花,做出更加出色强大的软件。
作者:火鸟 redbirdli@hotmail.com
用Delphi建立通讯与数据交换服务器—Transceiver技术剖析(上)
用Delphi建立通讯与数据交换服务器—Transceiver技术剖析(下)
通过C#实现集合类纵览.NET Collections及相关技术
老东西:程序快捷方式/程序删除项/EXE自删除DIY
老东西:儿时的编程算法心得笔记
上一页 [1] [2] |