From b1d9b57426d26419382cb582499142017512872f Mon Sep 17 00:00:00 2001 From: yvesgoeleven Date: Sat, 20 Aug 2011 20:46:27 +0200 Subject: [PATCH] Added QueuePerInstance support --- Samples/AzureFullDuplex/AzureFullDuplex.suo | Bin 114176 -> 117248 bytes .../OrderWebSite/Global.asax.cs | 7 +- .../AzurePubSub/OrderWebSite/Global.asax.cs | 10 +-- .../WebSite/Global.asax.cs | 1 + .../AzureQueueConfig.cs | 13 ++++ .../ConfigureAzureMessageQueue.cs | 16 +++++ .../IndividualQueueConfigurationSource.cs | 68 ++++++++++++++++++ ...iceBus.Unicast.Queuing.Azure.Config.csproj | 2 + .../AzureMessageQueue.cs | 1 + 9 files changed, 110 insertions(+), 8 deletions(-) create mode 100644 src/azure/Queueing/NServiceBus.Unicast.Queuing.Azure.Config/IndividualQueueConfigurationSource.cs diff --git a/Samples/AzureFullDuplex/AzureFullDuplex.suo b/Samples/AzureFullDuplex/AzureFullDuplex.suo index 12bf1c1a3086caf419b78aefd81b06f764c4dda7..f6c4dde0ae1efb1e1f505ff4be2b6c74cacbd0b5 100644 GIT binary patch delta 3691 zcmcgvdr*^C7XMCOB#132DnhDABXuOu1d*3+4HCh+3WyIVlY)5(sTe{YtXdy|Raa+x ztX#+2(pr^K3YN8$PmSZ!A-3u`9mle6r@M8nQoGZ2S*_M(th#R9bCWML+U+?0VQ=R5 zefQpT?m73|@1Ebi@!pqtcSdxziy(z`h!Q4F<2Y^#A_FlMk%`DcWFvAAxrk)`pQ7J_ z%*(fbSyRP3C7;JjMq@Y8#Z2Pe<3ANE-~kr(P}lRHDiquZY$gnKrEU_FuB#HVt8Cox zJkEkYCuikcoQw3H8pZ3h5|Jd8Df@@1iF-<)q)}+_?u_*yS|#Q0O)U~h#tHUhN3h~o z@~g5h09+@Fa;HEd*_@jK&+|ue{}M^OiP7Xw5*6>CwNobPM_s;rZXCp2!rB$YWyIHr zuMj`r%!y;(3exwxx%?-~4nuc|S|p7Up@i3#7K-2+S-J2(vUF6n)R9$-Hj=f=n~0-a zM!oA`GOsD>1Csda3OXkeGWgEL3xOtKj`Yu8u{hcVp1`c1@lLf^mfJ%|{F5tGlV+GE?XF#(Z;U;|q4&HsLh1L#K+ zp@ctI-zA2(c__ajf-~g1ieHiY8a>f8&j!k0uNWz&{$|h-ov9D*5N-WbvcxS9Xn4O+ zC%nwr@x$A=284z4a7M0{n~!fR=b;}I!*k?^t5J$H&W!tl+ms2i>RpBFOprmqJ^2%tKq3 z3r+P6x^Ed&QLh2g`O0&*pY6V)m&)UDE-aCPS@Mr9A~DD9Bq#c{Wg=P4})g@n}K*S^}YZ_)LR14zDgC`hPKHdqPLPEo9yjZ`r4DB z6KGQ*%%S~D;VNl3eD0_QMAIXVD>+WiU8K8PA(s}_LVTsmX*L*Fd7M_me2>**wb`o; zMGLIX2AkPB)8k58WVe}(E_WIeqAV?ssix)E*_Ya?4KLd*wg#KUW2_B~4fcARMz^ib zzM#(IG+Sw77o-$^4EkIR{$4Mbl9iL4pPrwZq1EF>BEJK#3ev@PSk4@eMklSfE?ekd_-3 zN~%MxK{phWk0n~^TbQ?gde;RhbiW72E1(=zSYojaNe4Yptl*gKB|?|a-2hQ?>c>oh z7SdQaqir3C!A;hkDkHZlrc;(;(1$R96BTvbSCxle6cBsiS*SPk$NzO?uWqOXQCEBcYu(Zh#roy9xX>y%b)H`Qqkg z370q+cj&;t8{4(Bt0q$80vP4%+YD|I(W^Doy(PF7bZ`rlkdqRP^gJ#~I$gOH<|x35 zfiD$;mmWpIMc>`6un%bKc8DVhR}^GVg4E~Vj_U!3m~R)kuzor0A;1BuV}jo^h|T!M5d&+Mxt?JB@ai zqt59L_c4^p3q#VhK+R}obY-kHJk%Oe3UbTIaEArV;gfQvA;6sTtS*^#8 zYOVADZpCBt?jP|#O44v8@v9~Bb*MHG!A>vf36!4;%3FhSiI>Un--Glb%03fhQYF!4 z$R%rqadG`9yM_%`=Gk240$Z)s_0sC#jpAw33|r4_LErei)5~tNcw6K2PltQZT@3s4 z7AkYG66NbT2Gd0Jyb;#Zcb0+7cWDwN0dZ_q`jl$e1N4;-VHt@|NyAT}B1t2Z)Sn3% p(g9pIDy(Ug(02>_lw@Dxuj$Di=<$7j927v8e2CkEJHs%quy6Kh!nO{bGP40Ai(8C;m`yWX_7M%)mrQqGX&3j;TpaE;zt?xp zz4!diIrrT2JRMU!y;Fjcs*OCG!qCioQB5`@ zE_H-)2L+x>+Ya;PT6VlBFy5ObXNcp&`NzO%t5>Ki=m4@t^aarg73oc zyT9xydeL|6=g~QlGpizcPHg9Im~ziFxufpRU)YQ0Fwwj}yF$`%vqGqtE`<&L8*#WY|Zs(zy34fJ`l z6V>ku=h>Rq=>4Tu`RhPYqD^EkO>VE$p|47n(*wqW4ZLdr~ZEIrMD^=)=HNDXR z7s&KnvUOf?JKwg}4*XDE1VA@>$&!sOwU$d~5+4Qw65FiB?0GQr@`hh!=<=mJKF|Cf zAlAwtrf3V_vU-aI6&yD6=5nvZ+o+2LE4poc9>3oOi}-?$J}GJ&P@=w@M&3Gb08rbW zT$Fp^=8>)iGbn0PG-08L8{$Bx22%F>{YO56qneAf_q+!%-2z1tajLy_A+PylWC}i3 z3nkc70vZ$^8O6tqUZ3Ip?u5kqwNWY;Jr-+cq7=T}OQUNC5-}=4*=PR~qLry0_;>P%gRL7kD6qA_J^(lr?d z-DY>bqtfDd+2YtxU12E+3BDwtm%VUm7DY)ZU&x-pm}Sb+W@^>COjCwB&6rN?X$@&= zlh&xwq>!Q2YB#$LO)iJUGh72H~a?{!0>$$fS*jm}14 zH=Ya_UhbZZ=t(NDdOfgDJA-Ml)K}VV)%9G3t-h*yoy$?iRom)u**mZ}r#}?Pf5>)- zZ(4}YYIEYPAAl<7(j;0}gsfNE>TO(^i?cah<<1I6b-5)Fii*^`Sk&UEW*>6uopvzF zi^YNySU<)L6H-iMThk)hTs=fzW5Mpb>vFBCjmnD&wZ7jz6 z9WYm@+yNzk%r$=O*$IB=!drJjGd}zb#0gh-ft6+|L?+UIF}F8FzAi|P$M%xA%867H z)JR83qV#xUE98m}OMOJVF8tOCZ$iw_6pH%h35q&SJ{0{VnGH+HcY@3d-hE&OR46y0 z_~CHeSpkdiNIp#EJ8K)khk*2y4hck{`1SsO>~%=q&?K-Jw#g=Pbr3#9Ee#~DL*VB# z9e!BwDA$p8#|yWQ!3+t0st3Y_@VDU(boRk2a>zpXH_52z7xO#qB+0sBn$!* zAIwM8brRq>5yatimaVfmas4S!2(NhI4@vez3h7b&1vkES7G|!VpqzM4Cx0>m<74V9 zLC%r~IYqtz=l^+41EHgNO!V?e>GYxA^DlZ`fcb9;kdg$D!>5dovkPXDU=_m9hoGT- qgAfLt7eI-X{V+Rl``#k_XHcYk2}+R1VdF*UCoc&BBr}YtSo$Z|{Xx0_ diff --git a/Samples/AzureFullDuplex/OrderWebSite/Global.asax.cs b/Samples/AzureFullDuplex/OrderWebSite/Global.asax.cs index e7d4bdfbac6..2812b1da976 100644 --- a/Samples/AzureFullDuplex/OrderWebSite/Global.asax.cs +++ b/Samples/AzureFullDuplex/OrderWebSite/Global.asax.cs @@ -37,10 +37,11 @@ protected void Application_BeginRequest(object sender, EventArgs e) .Log4Net(new AzureAppender()) .AzureConfigurationSource() .AzureMessageQueue() - .JsonSerializer() + .JsonSerializer() + .QueuePerInstance() .UnicastBus() - .LoadMessageHandlers() - .IsTransactional(true) + .LoadMessageHandlers() + .IsTransactional(true) .CreateBus(); Bus = StartBus.Value; diff --git a/Samples/AzurePubSub/OrderWebSite/Global.asax.cs b/Samples/AzurePubSub/OrderWebSite/Global.asax.cs index bcf87762b94..91008b06a72 100644 --- a/Samples/AzurePubSub/OrderWebSite/Global.asax.cs +++ b/Samples/AzurePubSub/OrderWebSite/Global.asax.cs @@ -36,12 +36,12 @@ protected void Application_Start(object sender, EventArgs e) //.Log4Net(new AzureAppender()) .AzureConfigurationSource() .AzureMessageQueue() - .JsonSerializer() + .JsonSerializer() + .QueuePerInstance() .UnicastBus() - .LoadMessageHandlers() - .IsTransactional(true) - .CreateBus() - ; + .LoadMessageHandlers() + .IsTransactional(true) + .CreateBus(); } protected void Application_BeginRequest(object sender, EventArgs e) diff --git a/Samples/AzureThumbnailCreator/WebSite/Global.asax.cs b/Samples/AzureThumbnailCreator/WebSite/Global.asax.cs index 6603e23e263..36c8e7fe8d4 100644 --- a/Samples/AzureThumbnailCreator/WebSite/Global.asax.cs +++ b/Samples/AzureThumbnailCreator/WebSite/Global.asax.cs @@ -32,6 +32,7 @@ protected void Application_BeginRequest(object sender, EventArgs e) .AzureConfigurationSource() .AzureMessageQueue() .JsonSerializer() + .QueuePerInstance() .AzureDataBus() .UnicastBus() .LoadMessageHandlers() diff --git a/src/azure/Queueing/NServiceBus.Unicast.Queuing.Azure.Config/AzureQueueConfig.cs b/src/azure/Queueing/NServiceBus.Unicast.Queuing.Azure.Config/AzureQueueConfig.cs index b94e6e5328f..fa33a97cc49 100644 --- a/src/azure/Queueing/NServiceBus.Unicast.Queuing.Azure.Config/AzureQueueConfig.cs +++ b/src/azure/Queueing/NServiceBus.Unicast.Queuing.Azure.Config/AzureQueueConfig.cs @@ -82,5 +82,18 @@ public int BatchSize this["BatchSize"] = value; } } + + [ConfigurationProperty("QueuePerInstance", IsRequired = false, DefaultValue = AzureMessageQueue.DefaultQueuePerInstance)] + public bool QueuePerInstance + { + get + { + return (bool)this["QueuePerInstance"]; + } + set + { + this["QueuePerInstance"] = value; + } + } } } \ No newline at end of file diff --git a/src/azure/Queueing/NServiceBus.Unicast.Queuing.Azure.Config/ConfigureAzureMessageQueue.cs b/src/azure/Queueing/NServiceBus.Unicast.Queuing.Azure.Config/ConfigureAzureMessageQueue.cs index 9f537a42181..be641ce2895 100644 --- a/src/azure/Queueing/NServiceBus.Unicast.Queuing.Azure.Config/ConfigureAzureMessageQueue.cs +++ b/src/azure/Queueing/NServiceBus.Unicast.Queuing.Azure.Config/ConfigureAzureMessageQueue.cs @@ -20,6 +20,9 @@ public static Configure AzureMessageQueue(this Configure config) { queueClient = CloudStorageAccount.Parse(configSection.ConnectionString).CreateCloudQueueClient(); Address.OverrideDefaultMachine(configSection.ConnectionString); + + if (configSection.QueuePerInstance) + Configure.Instance.CustomConfigurationSource(new IndividualQueueConfigurationSource(Configure.ConfigurationSource)); } else { @@ -109,5 +112,18 @@ public static Configure BatchSize(this Configure config, int value) return config; } + + /// + /// Configures a queue per instance + /// + /// + /// + public static Configure QueuePerInstance(this Configure config) + { + if(! (Configure.ConfigurationSource is IndividualQueueConfigurationSource)) + Configure.Instance.CustomConfigurationSource(new IndividualQueueConfigurationSource(Configure.ConfigurationSource)); + + return config; + } } } \ No newline at end of file diff --git a/src/azure/Queueing/NServiceBus.Unicast.Queuing.Azure.Config/IndividualQueueConfigurationSource.cs b/src/azure/Queueing/NServiceBus.Unicast.Queuing.Azure.Config/IndividualQueueConfigurationSource.cs new file mode 100644 index 00000000000..94a06fafc57 --- /dev/null +++ b/src/azure/Queueing/NServiceBus.Unicast.Queuing.Azure.Config/IndividualQueueConfigurationSource.cs @@ -0,0 +1,68 @@ +using Microsoft.WindowsAzure.ServiceRuntime; +using NServiceBus.Config.ConfigurationSource; + +namespace NServiceBus.Config +{ + public class IndividualQueueConfigurationSource : IConfigurationSource + { + private readonly IConfigurationSource innerSource; + + public IndividualQueueConfigurationSource(IConfigurationSource innerSource) + { + this.innerSource = innerSource; + } + + T IConfigurationSource.GetConfiguration() + { + var config = innerSource.GetConfiguration(); + + var unicastBusConfig = config as UnicastBusConfig; + if (unicastBusConfig != null && unicastBusConfig.LocalAddress != null && RoleEnvironment.IsAvailable) + { + var individualQueueName = ParseQueueNameFrom(unicastBusConfig.LocalAddress) + + "-" + + ParseIndexFrom(RoleEnvironment.CurrentRoleInstance.Id); + if (unicastBusConfig.LocalAddress.Contains("@")) + individualQueueName += "@" + ParseMachineNameFrom(unicastBusConfig.LocalAddress); + + unicastBusConfig.LocalAddress = individualQueueName; + } + + var msmqTransportConfig = config as MsmqTransportConfig; + if (msmqTransportConfig != null && msmqTransportConfig.InputQueue != null && RoleEnvironment.IsAvailable) + { + var individualQueueName = ParseQueueNameFrom(msmqTransportConfig.InputQueue) + + "-" + + ParseIndexFrom(RoleEnvironment.CurrentRoleInstance.Id); + if (msmqTransportConfig.InputQueue.Contains("@")) + individualQueueName += "@" + ParseMachineNameFrom(msmqTransportConfig.InputQueue); + + msmqTransportConfig.InputQueue = individualQueueName; + } + + return config; + } + + private string ParseMachineNameFrom(string inputQueue) + { + return inputQueue.Contains("@") ? inputQueue.Substring(inputQueue.IndexOf("@") + 1) : string.Empty; + } + + private object ParseQueueNameFrom(string inputQueue) + { + return inputQueue.Contains("@") ? inputQueue.Substring(0, inputQueue.IndexOf("@")) : inputQueue; + } + + private static int ParseIndexFrom(string id) + { + var idArray = id.Split('.'); + int index; + if (!int.TryParse((idArray[idArray.Length - 1]), out index)) + { + idArray = id.Split('_'); + index = int.Parse((idArray[idArray.Length - 1])); + } + return index; + } + } +} \ No newline at end of file diff --git a/src/azure/Queueing/NServiceBus.Unicast.Queuing.Azure.Config/NServiceBus.Unicast.Queuing.Azure.Config.csproj b/src/azure/Queueing/NServiceBus.Unicast.Queuing.Azure.Config/NServiceBus.Unicast.Queuing.Azure.Config.csproj index 5cdbdee8e8f..08c17c1bd84 100644 --- a/src/azure/Queueing/NServiceBus.Unicast.Queuing.Azure.Config/NServiceBus.Unicast.Queuing.Azure.Config.csproj +++ b/src/azure/Queueing/NServiceBus.Unicast.Queuing.Azure.Config/NServiceBus.Unicast.Queuing.Azure.Config.csproj @@ -32,6 +32,7 @@ 4 + False ..\..\..\lib\Azure\Microsoft.WindowsAzure.ServiceRuntime.dll @@ -59,6 +60,7 @@ + diff --git a/src/azure/Queueing/NServiceBus.Unicast.Queuing.Azure/AzureMessageQueue.cs b/src/azure/Queueing/NServiceBus.Unicast.Queuing.Azure/AzureMessageQueue.cs index 159b650745c..80b863127ec 100644 --- a/src/azure/Queueing/NServiceBus.Unicast.Queuing.Azure/AzureMessageQueue.cs +++ b/src/azure/Queueing/NServiceBus.Unicast.Queuing.Azure/AzureMessageQueue.cs @@ -20,6 +20,7 @@ public class AzureMessageQueue : IReceiveMessages,ISendMessages public const int DefaultBatchSize = 10; public const bool DefaultPurgeOnStartup = false; public const string DefaultConnectionString = "UseDevelopmentStorage=true"; + public const bool DefaultQueuePerInstance = false; private CloudQueue queue; private readonly CloudQueueClient client;