WCF后续之旅(11): 关于并发、回调的线程关联性(Thread Affinity)
對于一般的多線程操作,比如異步地進行基于文件系統(tǒng)的IO操作;異步地調用Web Service;或者是異步地進行數(shù)據(jù)庫訪問等等,是和具體的線程無關的。也就是說,對于這些操作,任意創(chuàng)建一個新的線程來執(zhí)行都是等效的。但是有些情況下,有些操作卻只能在固定的線程下執(zhí)行。比如,在GUI應用下,對控件的訪問就需要在創(chuàng)建該控件的線程下執(zhí)行;或者我們在某個固定的線程中通過TLS(Thread Local Storage)設置了一些Context信息,供具體的操作使用,我們把操作和某個固定的線程的依賴稱為線程關聯(lián)性(Thread Affinity)。在這種情況下,我們的異步操作就需要被Marshal到固定的線程執(zhí)行。在WCF并發(fā)或者Callback的情況下也具有這樣的基于線程關聯(lián)性的問題。
一、從基于Windows Application客戶端的WCF回調失敗談起
在"我的WCF之旅"系列文章中,有一篇(WinForm Application中調用Duplex Service出現(xiàn)TimeoutException的原因和解決方案)專門介紹在一個Windows Application客戶端應用, 通過WCF 的Duplex通信方式進行回調失敗的文章.我們今天以此作為出發(fā)點介紹WCF在Thread Affinity下的表現(xiàn)和解決方案.
我們來創(chuàng)建一個WCF的應用來模擬該場景: 客戶端是一個基于Windows Form應用, 完成一個計算器的功能, 用戶輸入操作數(shù),點擊"計算"按鈕, 后臺通過調用WCF service, 并傳遞一個用于顯示計算結果的Callback對象; service進行相應的計算得到最后的運算結果,調用該Callback對象將運算結果顯示到客戶端界面.這是我們的WCF四層結構:
1、Contract:ICalculate & ICalculateCallback
1: namespace Artech.ThreadAffinity.Contracts 2: { 3: [ServiceContract(CallbackContract = typeof(ICalculateCallback))] 4: public interface ICalculate 5: { 6: [OperationContract] 7: void Add(double op1, double op2); 8: } 9: }這是Service Contract,下面是Callback Contract,用于顯示運算結果:
1: namespace Artech.ThreadAffinity.Contracts 2: { 3: public interface ICalculateCallback 4: { 5: [OperationContract] 6: void DisplayResult(double result); 7: } 8: }2、Service:CalculateService
1: namespace Artech.ThreadAffinity.Services 2: { 3: [ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)] 4: public class CalculateService:ICalculate 5: { 6: public static ListBox DisplayPanel 7: { get; set; } 8:? 9: #region ICalculate Members 10:? 11: public void Add(double op1, double op2) 12: { 13: double result = op1 + op2; 14: ICalculateCallback callback = OperationContext.Current.GetCallbackChannel<ICalculateCallback>(); 15:? 16: DisplayPanel.Items.Add(string.Format("{0} + {1} = {2}", op1, op2, result)); 17:? 18: callback.DisplayResult(result); 19: } 20:? 21: #endregion 22: } 23: }
由于需要進行callback, 我們把ConcurrencyMode 設為Reentrant。當?shù)玫竭\算的結果后,通過OperationContext.Current.GetCallbackChannel得到callback對象,并調用之。還有一點需要提的是,該service是通過一個Windows Form application進行host的。并且有一個ListBox列出所有service執(zhí)行的結果,就像這樣:
3、Hosting
Hosting的代碼寫在Form的Load事件中:
1: private void HostForm_Load(object sender, EventArgs e) 2: { 3: this._serviceHost = new ServiceHost(typeof(CalculateService)); 4: CalculateService.DisplayPanel = this.listBoxResult; 5: CalculateService.SynchronizationContext = SynchronizationContext.Current; 6: this._serviceHost.Opened += delegate 7: { 8: this.Text = "The calculate service has been started up!"; 9: }; 10:? 11: this._serviceHost.Open(); 12: }我們注意到了CalculateService使用到的用于顯示所有預算結果的ListBox就是在這了通過static property傳遞的。
這么配置文件
1: <configuration> 2: <system.serviceModel> 3: <services> 4: <service name="Artech.ThreadAffinity.Services.CalculateService"> 5: <endpoint binding="netTcpBinding" bindingConfiguration="" contract="Artech.ThreadAffinity.Contracts.ICalculate" /> 6: <host> 7: <baseAddresses> 8: <add baseAddress="net.tcp://127.0.0.1:8888/calculateservice" /> 9: </baseAddresses> 10: </host> 11: </service> 12: </services> 13: </system.serviceModel> 14: </configuration>4、Client
Client的界面很簡單:輸入兩個操作數(shù),點擊“=”按鈕,將運算結果顯示出來。
先來看看client端對callback contract的實現(xiàn):
1: namespace Clients 2: { 3: public class CalculateCallback : ICalculateCallback 4: { 5: public static TextBox ResultPanel; 6:? 7: #region ICalculateCallback Members 8:? 9: public void DisplayResult(double result) 10: { 11: ResultPanel.Text = result.ToString(); 12: } 13:? 14: #endregion 15: } 16: }這是配置:
1: <configuration> 2: <system.serviceModel> 3: <client> 4: <endpoint address="net.tcp://127.0.0.1:8888/calculateservice" 5: binding="netTcpBinding" bindingConfiguration="" contract="Artech.ThreadAffinity.Contracts.ICalculate" 6: name="calculateservice" /> 7: </client> 8: </system.serviceModel> 9: </configuration> 然后是我們“=”按鈕的單擊事件對運算的實現(xiàn): 1: private void buttonCalculate_Click(object sender, EventArgs e) 2: { 3: CalculateCallback.ResultPanel = this.textBoxResult; 4: DuplexChannelFactory<ICalculate> channelFactory = new DuplexChannelFactory<ICalculate>(new CalculateCallback(), "calculateservice"); 5: ICalculate calculator = channelFactory.CreateChannel(); 6: calculator.Add(double.Parse(this.textBoxOp1.Text), double.Parse(this.textBoxOp2.Text)); 7: }CalculateCallback 用于顯示運算結果的TextBox通過statis property實現(xiàn)傳遞。這個實現(xiàn)很簡單,貌似沒有什么問題,但是我們運行程序,在客戶端就會拋出這樣的exception。可以看出是一個TimeoutException。
二、是什么導致TimeoutException?
我們現(xiàn)在來分析是什么導致了TimeoutException的拋出。原因很簡單:由于我們對service的調用的是在UI 線程調用的,所以在開始調用到最終得到結果,這個UI Thread會被鎖住;但是當service進行了相應的運算的到運算的結果后,需要調用callback對象對client進行回調,默認的情況下,Callback的執(zhí)行是在UI線程執(zhí)行的。當Callback試圖執(zhí)行的時候,發(fā)現(xiàn)UI 線程被鎖,只能等待。這樣形成一個死鎖,UI線程需要等待CalculateService執(zhí)行返回后才能解鎖,而CalculateService需要Callback執(zhí)行完成;而Callback需要等到UI線程解鎖才能執(zhí)行。
基于上門的原因,我們有兩種解決方案:
- CalculateService不必等到Callback執(zhí)行完成就返回,我們可以通過異步調用Callback。或者讓Client異步方式調用CalculateService,以便及時釋放UI線程,我們可以通過One-way的方式來進行service的調用。
- 讓Callback的執(zhí)行不必綁定到UI線程
三、解決方案一:通過異步調用或者One-way回調
為了簡單起見,我們通過ThreadPool實現(xiàn)了異步回調:
1: public void Add(double op1, double op2) 2: { 3: double result = op1 + op2; 4: ICalculateCallback callback = OperationContext.Current.GetCallbackChannel<ICalculateCallback>(); 5:? 6: ThreadPool.QueueUserWorkItem(delegate{ callback.DisplayResult(result); }, null); 7: }這是一種方案,另一種是將Add操作設成One-way的:
1: namespace Artech.ThreadAffinity.Contracts 2: { 3: [ServiceContract(CallbackContract = typeof(ICalculateCallback))] 4: public interface ICalculate 5: { 6: [OperationContract(IsOneWay = true)] 7: void Add(double op1, double op2); 8: } 9: }這兩種方案都可以解決問題。
四、方案二、通過解除Callback操作和UI線程的關聯(lián)性
現(xiàn)在我們才進入我們今天討論的主題:WCF并發(fā)操作的線程關聯(lián)性問題。在這之前,我們需要了解一個重要的對象:SynchonizationContext(System.Threading.SynchronizationContext)。SynchonizationContext就是為了解決這種線程關聯(lián)性問題而設計的。SynchonizationContext提供了兩個主要的API將操作和對應的Thread關聯(lián):Post和Send。
1: public virtual void Post(SendOrPostCallback d, object state) 2: public virtual void Send(SendOrPostCallback d, object state)Send和Post分別以同步和異步的方式將以Delegate表示的具體的操作和SynchonizationContext對象對應的Thread關聯(lián),而SendOrPostCallback delegate對象代表你需要的線程關聯(lián)操作,state代表傳入delegate的參數(shù):
public delegate void SendOrPostCallback(object state);
對于某些具有線程關聯(lián)的應用,比如Windows Form application,在程序啟動的時候,會設置當前的SynchonizationContext對象(Windows Form application使用的是繼承了SynchonizationContext的WindowsFormsSynchronizationContext :System.Windows.Forms.WindowsFormsSynchronizationContext)。當前SynchonizationContext被成功初始化后,你就可以通過SynchonizationContext的靜態(tài)屬性Current得到它。在你自己的應用中,如何有需要,你也可以自定義SynchonizationContext,并通過靜態(tài)方法SetSynchronizationContext將其設置為current SynchronizationContext。
對應WCF來說,無論是host一個service,還是在調用service時制定callback,在默認的情況下,service和callback的操作將自動和當前的SynchonizationContext進行關聯(lián)(如何有的話)。也就是說,如過我們的service被host到Windows Form application下,那么service的操作將在UI 線程下執(zhí)行;同理,如何我們在一個Windows Forms UI線程下調用duplex service并制定callback,那么callback的最終執(zhí)行將在UI線程。
關于WCF對線程關聯(lián)性的控制,可以通過ServiceBehavior或者CallbackBehavior的UseSynchronizationContext屬性進行設定,該屬性默認為true,這正式WCF默認具有線程關聯(lián)性的原因。
現(xiàn)在我們來實現(xiàn)我們的第二套方案:讓Callback的執(zhí)行不必綁定到UI線程。為此我們只需要加上如何的CallbackBehavior attribute就可以了。
1: namespace Artech.ThreadAffinity.Clients 2: { 3: [CallbackBehavior(UseSynchronizationContext = false)] 4: public class CalculateCallback : ICalculateCallback 5: { 6: public static TextBox ResultPanel; 7:? 8: #region ICalculateCallback Members 9:? 10: public void DisplayResult(double result) 11: { 12: ResultPanel.Text = result.ToString(); 13:? 14: } 15:? 16: #endregion 17: } 18: } 19:?但是現(xiàn)在我們運行我們的程序,將會出現(xiàn)如下的InvalidOperation異常:
原因很簡單,由于我們將callbaclk的UseSynchronizationContext 設置成false,那么callback的操作將不會再UI線程下執(zhí)行。但是我們需要運算的結果輸入到UI的TextBox上,對UI上控件的操作需要在UI線程上執(zhí)行,顯然會拋出異常了。
為了我們引入SynchonizationContext到CalculateCallback中:將SynchonizationContext定義成一個static屬性,通過Post方法異步地實現(xiàn)對運算結果的顯示。
1: namespace Artech.ThreadAffinity.Clients 2: { 3: [CallbackBehavior(UseSynchronizationContext = false)] 4: public class CalculateCallback : ICalculateCallback 5: { 6: public static TextBox ResultPanel; 7: public static SynchronizationContext SynchronizationContext; 8:? 9: #region ICalculateCallback Members 10:? 11: public void DisplayResult(double result) 12: { 13: SynchronizationContext.Post(delegate { ResultPanel.Text = result.ToString(); }, null); 14: } 15:? 16: #endregion 17: } 18: }SynchonizationContext在調用service的時候指定:
1: private void buttonCalculate_Click(object sender, EventArgs e) 2: { 3: CalculateCallback.ResultPanel = this.textBoxResult; 4: CalculateCallback.SynchronizationContext = SynchronizationContext.Current; 5:? 6: DuplexChannelFactory<ICalculate> channelFactory = new DuplexChannelFactory<ICalculate>(new CalculateCallback(), "calculateservice"); 7: ICalculate calculator = channelFactory.CreateChannel(); 8: calculator.Add(double.Parse(this.textBoxOp1.Text), double.Parse(this.textBoxOp2.Text)); 9: }現(xiàn)在我們程序能夠正常運行了。
五、另一種可選方案:通過ISynchronizeInvoke的Invoke/BeginInvoke
熟悉Windows Form編程的讀者應該都知道,WinForm空間的基類Control(System.Windows.Forms.Control)都實現(xiàn)了System.ComponentModel.ISynchronizeInvoke接口,而Control對ISynchronizeInvoke的實現(xiàn)就是為了解決Control的操作必須在創(chuàng)建Control線程的問題,ISynchronizeInvoke定義Invoke和BeginInvoke方法方面我們以同步或者異步的方式操作Control:
1: public interface ISynchronizeInvoke 2: { 3: // Methods 4: [HostProtection(SecurityAction.LinkDemand, Synchronization=true, ExternalThreading=true)] 5: IAsyncResult BeginInvoke(Delegate method, object[] args); 6: object EndInvoke(IAsyncResult result); 7: object Invoke(Delegate method, object[] args); 8:? 9: // Properties 10: bool InvokeRequired { get; } 11: } 12:?如何我們放棄基于SynchonizationContext的解決方案,我們也可以通過基于ISynchronizeInvoke的方式來解決這個問題。為此我們這樣定義CalculateCallback:
1: namespace Artech.ThreadAffinity.Clients 2: { 3: [CallbackBehavior(UseSynchronizationContext = false)] 4: public class CalculateCallback : ICalculateCallback 5: { 6: public static TextBox ResultPanel; 7: public delegate void DisplayResultDelegate(TextBox resultPanel, double result); 8:? 9: #region ICalculateCallback Members 10:? 11: public void DisplayResult(double result) 12: { 13: DisplayResultDelegate displayResultDelegate = new DisplayResultDelegate(DisplayResult); 14: ResultPanel.BeginInvoke(displayResultDelegate, new object[] { ResultPanel, result }); 15: } 16:? 17: private void DisplayResult(TextBox resultPanel, double result) 18: { 19: resultPanel.Text = result.ToString(); 20: } 21:? 22: #endregion 23: } 24: } 25:?由于BeginInvoke方式只能接受一個具體的delegate對象(不能使用匿名方法),所以需要定義一個具體的Delegate(DisplayResultDelegate)和對應的方法(DisplayResult),參數(shù)通過一個object[]傳入。
從本質上將,這兩種方式的實現(xiàn)完全是一樣的,如何你查看System.Windows.Forms.WindowsFormsSynchronizationContext的代碼,你會發(fā)現(xiàn)其Send和Post方方法就是通過調用Invoke和BeginInvoke方式實現(xiàn)的。
六、Service Hosting的線程關聯(lián)性
我們花了很多的精力介紹了WCF Duplex通信中Callback操作的線程關聯(lián)性問題,實際上我們使用到更多的還是service操作的線程關聯(lián)性問題。就以我們上面的程序為例,我們通過一個Windows Form application來host我們的service,并且要求service的運算結束后將結果輸出到server端的Window form的ListBox中,對ListBox的操作肯定需要的Host程序的UI線程中執(zhí)行。
按照我們一般的想法,我們的Service面向若干client,肯定是并發(fā)的接收client端的請求,以多線程的方式執(zhí)行service的操作,那么操作中UI 控件的操作肯定會出現(xiàn)錯誤。
我們的程序依然可以正常運行,其根本原因是WCF的service操作默認實現(xiàn)了對Host service的當前線程的SynchonizationContext實現(xiàn)了關聯(lián)。與Callback操作的線程關聯(lián)性通過CallbackBehavior的UseSynchronizationContext 進行控制一樣,service的線程關聯(lián)性通過ServiceBehavir的UseSynchronizationContext 進行設定。UseSynchronizationContext 的默認值為true。
如何我們將CalculateService的UseSynchronizationContext 設為false:
1: namespace Artech.ThreadAffinity.Services 2: { 3: [ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant,UseSynchronizationContext = false)] 4: public class CalculateService:ICalculate 5: { 6: public static ListBox DisplayPanel 7: { get; set; } 8:? 9: #region ICalculate Members 10:? 11: public void Add(double op1, double op2) 12: { 13: double result = op1 + op2; 14: ICalculateCallback callback = OperationContext.Current.GetCallbackChannel<ICalculateCallback>(); 15:? 16: DisplayPanel.Items.Add(string.Format("{0} + {1} = {2}", op1, op2, result)); 17:? 18: callback.DisplayResult(result); 19: } 20:? 21: #endregion 22: } 23: } 24:?有control被不是創(chuàng)建它的線程操作,肯定會拋出一個InvalidOperationException,就像這樣:
我們一樣可以通過SynchonizationContext或者ISynchronizeInvoke的方式來解決這樣的問題,我們只討論前面一種,為此我們改變了CalculateService的定義:通過SynchonizationContext的Post方法實現(xiàn)對ListBox的訪問。
1: namespace Artech.ThreadAffinity.Services 2: { 3: [ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant,UseSynchronizationContext = false)] 4: public class CalculateService:ICalculate 5: { 6: public static ListBox DisplayPanel 7: { get; set; } 8:? 9: public static SynchronizationContext SynchronizationContext 10: { get; set; } 11:? 12: #region ICalculate Members 13:? 14: public void Add(double op1, double op2) 15: { 16: double result = op1 + op2; 17: ICalculateCallback callback = OperationContext.Current.GetCallbackChannel<ICalculateCallback>(); 18: SynchronizationContext.Post(delegate 19: { 20: DisplayPanel.Items.Add(string.Format("{0} + {1} = {2}", op1, op2, result)); 21: }, null); 22:? 23: callback.DisplayResult(result); 24: } 25:? 26: #endregion 27: } 28: } 29:?通過static屬性定義的SynchonizationContext在host的時候指定:
1: private void HostForm_Load(object sender, EventArgs e) 2: { 3: this._serviceHost = new ServiceHost(typeof(CalculateService)); 4: CalculateService.DisplayPanel = this.listBoxResult; 5: CalculateService.SynchronizationContext = SynchronizationContext.Current; 6: this._serviceHost.Opened += delegate 7: { 8: this.Text = "The calculate service has been started up!"; 9: }; 10:? 11: this._serviceHost.Open(); 12: } 13:?這樣我們的程序又可以正常運行了。
WCF后續(xù)之旅:
WCF后續(xù)之旅(1): WCF是如何通過Binding進行通信的
WCF后續(xù)之旅(2): 如何對Channel Layer進行擴展——創(chuàng)建自定義Channel
WCF后續(xù)之旅(3): WCF Service Mode Layer 的中樞—Dispatcher
WCF后續(xù)之旅(4):WCF Extension Point 概覽
WCF后續(xù)之旅(5): 通過WCF Extension實現(xiàn)Localization
WCF后續(xù)之旅(6): 通過WCF Extension實現(xiàn)Context信息的傳遞
WCF后續(xù)之旅(7):通過WCF Extension實現(xiàn)和Enterprise Library Unity Container的集成
WCF后續(xù)之旅(8):通過WCF Extension 實現(xiàn)與MS Enterprise Library Policy Injection Application Block 的集成
WCF后續(xù)之旅(9):通過WCF的雙向通信實現(xiàn)Session管理[Part I]
WCF后續(xù)之旅(9): 通過WCF雙向通信實現(xiàn)Session管理[Part II]
WCF后續(xù)之旅(10): 通過WCF Extension實現(xiàn)以對象池的方式創(chuàng)建Service Instance
WCF后續(xù)之旅(11): 關于并發(fā)、回調的線程關聯(lián)性(Thread Affinity)
WCF后續(xù)之旅(12): 線程關聯(lián)性(Thread Affinity)對WCF并發(fā)訪問的影響
WCF后續(xù)之旅(13): 創(chuàng)建一個簡單的WCF SOAP Message攔截、轉發(fā)工具[上篇]
WCF后續(xù)之旅(13):創(chuàng)建一個簡單的SOAP Message攔截、轉發(fā)工具[下篇]
WCF后續(xù)之旅(14):TCP端口共享
WCF后續(xù)之旅(15): 邏輯地址和物理地址
WCF后續(xù)之旅(16): 消息是如何分發(fā)到Endpoint的--消息篩選(Message Filter)
WCF后續(xù)之旅(17):通過tcpTracer進行消息的路由
轉載于:https://www.cnblogs.com/artech/archive/2008/08/21/1273021.html
總結
以上是生活随笔為你收集整理的WCF后续之旅(11): 关于并发、回调的线程关联性(Thread Affinity)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 软件质量保证
- 下一篇: 罗斯文2007(Northwind 20