游瞠离 发表于 2025-5-29 15:53:10

通过WCF扩展实现消息压缩

对于需要进行大规模数据传输的WCF应用来说,对于请求消息和回复消息进行传输前的压缩,不但可以降低网络流量,也可以提高网络传输的性能。由于WCF的扩展性,我们可以采用不同的方式实现对消息的压缩,本文提供一种比较简单的实现方式。[源代码从这里下载]
   一、三种可行的消息压缩方案      
二、DataCompressor——用于数据压缩与解压缩组件      
三、MessageCompressor——用于消息压缩与解压的组件      
四、CompressionMessageFormatter——用于对请求/回复消息压缩和解压缩的组件      
五、CompressionOperationBehaviorAttribute——将CompressionMessageFormatter用于WCF运行时框架的操作行为      
六、查看结构压缩后的消息      
七、补充说明
一、三种可行的消息压缩方案

消息压缩在WCF中的实现其实很简单,我们只需要在消息(请求消息/回复消息)被序列化之后,发送之前进行压缩;在接收之后,反序列化之前进行解压缩即可。针对压缩/解压缩使用的时机,我们具有三种典型的解决方案。

   
[*]通过自定义MessageEncoder和MessageEncodingBindingElement 来完成。具体的实现,可以参阅张玉彬的文章《WCF进阶:将编码后的字节流压缩传输》和MSDN的文章《Custom Message Encoder: Compression Encoder》。   
[*]直接创建用于压缩和解压缩的信道,在CodePlex中具有这么一个WCF Extensions;   
[*]自定义MessageFormatter实现序列化后的压缩和法序列化前的解压缩,这就是我们今天将要介绍的解决方案。
二、DataCompressor——用于数据压缩与解压缩组件

我们支持两种方式的压缩,Dflate和GZip。两种不同的压缩算法通过如下定义的CompressionAlgorithm枚举表示。
             1: <s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/"><s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/">public enum CompressionAlgorithm       2: {       3:   GZip,       4:   </s:Header></s:Header>Deflate       5: }而如下定义的DataCompressor负责基于上述两种压缩算法实际上的压缩和解压缩工作。
         1: <s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/"><s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/">internal class DataCompressor       2: {       3:   public static byte[] Compress(byte[] decompressedData, CompressionAlgorithm algorithm)       4:   </s:Header></s:Header>{       5:   <s:Body><s:Body>      using (MemoryStream stream = new MemoryStream())       6:         {       7:   </s:Body></s:Body>          if (algorithm == CompressionAlgorithm.Deflate)       8: </s:Envelope></s:Envelope>            {       9:               GZipStream stream2 = new GZipStream(stream, CompressionMode.Compress, true);    10:               stream2.Write(decompressedData, 0, decompressedData.Length);    11:               stream2.Close();    12:             }    13:             else    14:             {    15:               DeflateStream stream3 = new DeflateStream(stream, CompressionMode.Compress, true);    16:               stream3.Write(decompressedData, 0, decompressedData.Length);    17:               stream3.Close();    18:             }    19:             return stream.ToArray();    20:         }    21:   }    22:      23:   public static byte[] Decompress(byte[] compressedData, CompressionAlgorithm algorithm)    24:   {    25:         using (MemoryStream stream = new MemoryStream(compressedData))    26:         {    27:             if (algorithm == CompressionAlgorithm.Deflate)    28:             {    29:               using (GZipStream stream2 = new GZipStream(stream, CompressionMode.Decompress))    30:               {    31:                     return LoadToBuffer(stream2);    32:               }    33:             }    34:             else    35:             {    36:               using (DeflateStream stream3 = new DeflateStream(stream, CompressionMode.Decompress))    37:               {    38:                     return LoadToBuffer(stream3);    39:               }    40:             }    41:         }    42:   }    43:      44:   private static byte[] LoadToBuffer(Stream stream)    45:   {    46:         using (MemoryStream stream2 = new MemoryStream())    47:         {    48:             int num;    49:             byte[] buffer = new byte;    50:             while ((num = stream.Read(buffer, 0, buffer.Length)) > 0)    51:             {    52:               stream2.Write(buffer, 0, num);    53:             }    54:             return stream2.ToArray();    55:         }    56:   }    57: }三、MessageCompressor——用于消息压缩与解压的组件
而针对消息的压缩和解压缩通过如下一个MessageCompressor来完成。具体来说,我们通过上面定义的DataCompressor对消息的主体部分内容进行压缩,并将压缩后的内容存放到一个预定义的XML元素中(名称和命名空间分别为CompressedBody和http://www.artech.com/comporession/),同时添加相应的MessageHeader表示消息经过了压缩,以及采用的压缩算法。对于解压缩,则是通过消息是否具有相应的MessageHeader判断该消息是否经过压缩,如果是则根据相应的算法对其进行解压缩。具体的实现如下:
         1: <s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/"><s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/">public class MessageCompressor       2:{       3:      public MessageCompressor(CompressionAlgorithm algorithm)       4:   </s:Header></s:Header>   {       5:   <s:Body><s:Body>       this.Algorithm = algorithm;       6:      }       7:   </s:Body></s:Body>   public Message CompressMessage(Message sourceMessage)       8: </s:Envelope></s:Envelope>   {       9:          byte[] buffer;    10:          using (XmlDictionaryReader reader1 = sourceMessage.GetReaderAtBodyContents())    11:          {    12:            buffer = Encoding.UTF8.GetBytes(reader1.ReadOuterXml());    13:          }    14:          if (buffer.Length == 0)    15:          {    16:            Message emptyMessage = Message.CreateMessage(sourceMessage.Version, (string)null);    17:            sourceMessage.Headers.CopyHeadersFrom(sourceMessage);    18:            sourceMessage.Properties.CopyProperties(sourceMessage.Properties);    19:            emptyMessage.Close();    20:            return emptyMessage;    21:          }    22:          byte[] compressedData = DataCompressor.Compress(buffer, this.Algorithm);    23:          string copressedBody = CompressionUtil.CreateCompressedBody(compressedData);    24:          XmlTextReader reader = new XmlTextReader(new StringReader(copressedBody), new NameTable());    25:          Message message2 = Message.CreateMessage(sourceMessage.Version, null, (XmlReader)reader);    26:          message2.Headers.CopyHeadersFrom(sourceMessage);    27:          message2.Properties.CopyProperties(sourceMessage.Properties);    28:          message2.AddCompressionHeader(this.Algorithm);    29:          sourceMessage.Close();    30:          return message2;    31:      }    32:      33:      public Message DecompressMessage(Message sourceMessage)    34:      {    35:          if (!sourceMessage.IsCompressed())    36:          {    37:            return sourceMessage;    38:          }    39:          CompressionAlgorithm algorithm = sourceMessage.GetCompressionAlgorithm();    40:          sourceMessage.RemoveCompressionHeader();    41:          byte[] compressedBody = sourceMessage.GetCompressedBody();    42:          byte[] decompressedBody = DataCompressor.Decompress(compressedBody, algorithm);    43:          string newMessageXml = Encoding.UTF8.GetString(decompressedBody);    44:          XmlTextReader reader2 = new XmlTextReader(new StringReader(newMessageXml));    45:          Message newMessage = Message.CreateMessage(sourceMessage.Version, null, reader2);    46:          newMessage.Headers.CopyHeadersFrom(sourceMessage);    47:          newMessage.Properties.CopyProperties(sourceMessage.Properties);    48:          return newMessage;    49:      }    50:      51:      public CompressionAlgorithm Algorithm { get; private set; }    52:}下面是针对Message类型而定义了一些扩展方法和辅助方法。
         1: <s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/"><s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/">public static class CompressionUtil       2: {       3:   public const string CompressionMessageHeader = "Compression";       4:   </s:Header></s:Header>public const string CompressionMessageBody = "CompressedBody";       5:   <s:Body><s:Body>public const string Namespace = "http://www.artech.com/compression";       6:         7:   </s:Body></s:Body>public static bool IsCompressed(this Message message)       8: </s:Envelope></s:Envelope>    {       9:         return message.Headers.FindHeader(CompressionMessageHeader, Namespace) > -1;    10:   }    11:      12:   public static void AddCompressionHeader(this Message message, CompressionAlgorithm algorithm)    13:   {     14:         message.Headers.Add(MessageHeader.CreateHeader(CompressionMessageHeader, Namespace, string.Format("algorithm = \"{0}\"",algorithm)));    15:   }    16:      17:   public static void RemoveCompressionHeader(this Message message)    18:   {    19:         message.Headers.RemoveAll(CompressionMessageHeader, Namespace);    20:   }    21:      22:   public static CompressionAlgorithm GetCompressionAlgorithm(this Message message)    23:   {    24:         if (message.IsCompressed())    25:         {    26:             var algorithm = message.Headers.GetHeader<string>(CompressionMessageHeader, Namespace);    27:             algorithm = algorithm.Replace("algorithm =", string.Empty).Replace("\"", string.Empty).Trim();    28:             if (algorithm == CompressionAlgorithm.Deflate.ToString())    29:             {    30:               return CompressionAlgorithm.Deflate;    31:             }    32:      33:             if (algorithm == CompressionAlgorithm.GZip.ToString())    34:             {    35:               return CompressionAlgorithm.GZip;    36:             }    37:             throw new InvalidOperationException("Invalid compression algrorithm!");    38:         }    39:         throw new InvalidOperationException("Message is not compressed!");    40:   }    41:      42:   public static byte[] GetCompressedBody(this Message message)    43:   {    44:         byte[] buffer;    45:         using (XmlReader reader1 = message.GetReaderAtBodyContents())    46:         {    47:             buffer = Convert.FromBase64String(reader1.ReadElementString(CompressionMessageBody, Namespace));    48:         }    49:         return buffer;    50:   }    51:      52:   public static string CreateCompressedBody(byte[] content)    53:   {    54:         StringWriter output = new StringWriter();    55:         using (XmlWriter writer2 = XmlWriter.Create(output))    56:         {    57:             writer2.WriteStartElement(CompressionMessageBody, Namespace);    58:             writer2.WriteBase64(content, 0, content.Length);    59:             writer2.WriteEndElement();    60:         }    61:         return output.ToString();    62:   }    63: }四、CompressionMessageFormatter——用于对请求/回复消息压缩和解压缩的组件

消息的序列化和反序列化最终是通过MessageFormatter来完成的。具体来说,客户端通过ClientMessageFormatter实现对请求消息的序列化和对回复消息的序列化,而服务端通过DispatchMessageFormatter实现对请求消息的反序列化和对回复消息的序列化。
在默认的情况下,WCF选用的MessageFormatter为DataContractSerializerOperationFormatter,它采用DataContractSerializer进行实际的序列化和法序列化操作。我们自定义的MessageFormatter实际上是对DataContractSerializerOperationFormatter的封装,我们依然使用它来完成序列化和反序列化工作,额外实现序列化后的压缩和法序列化前的解压缩。
因为DataContractSerializerOperationFormatter是一个internal类型,我们只有通过反射的方式来创建它。如下的代码片断为用于进行消息压缩与解压缩的自定义MessageFormatter,即CompressionMessageFormatter的定义。
         1: <s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/"><s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/">public class CompressionMessageFormatter: IDispatchMessageFormatter, IClientMessageFormatter       2: {       3:   private const string DataContractSerializerOperationFormatterTypeName = "System.ServiceModel.Dispatcher.DataContractSerializerOperationFormatter, System.ServiceModel, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089";       4:         5:   <s:Body><s:Body>public IDispatchMessageFormatter InnerDispatchMessageFormatter { get; private set; }       6:   public IClientMessageFormatter InnerClientMessageFormatter { get; private set; }       7:   </s:Body></s:Body>public MessageCompressor MessageCompressor { get; private set; }       8:         9:   public CompressionMessageFormatter(CompressionAlgorithm algorithm, OperationDescription description, DataContractFormatAttribute dataContractFormatAttribute, DataContractSerializerOperationBehavior serializerFactory)    10:   {    11:         this.MessageCompressor = new MessageCompressor(algorithm);    12:         Type innerFormatterType = Type.GetType(DataContractSerializerOperationFormatterTypeName);    13:         var innerFormatter = Activator.CreateInstance(innerFormatterType, description, dataContractFormatAttribute, serializerFactory);    14:         this.InnerClientMessageFormatter = innerFormatter as IClientMessageFormatter;    15:         this.InnerDispatchMessageFormatter = innerFormatter as IDispatchMessageFormatter;    16:   }    17:      18:   public void DeserializeRequest(Message message, object[] parameters)    19:   {    20:         message = this.MessageCompressor.DecompressMessage(message);    21:         this.InnerDispatchMessageFormatter.DeserializeRequest(message, parameters);    22:   }    23:      24:   public Message SerializeReply(MessageVersion messageVersion, object[] parameters, object result)    25:   {    26:         var message = this.InnerDispatchMessageFormatter.SerializeReply(messageVersion, parameters, result);    27:         return this.MessageCompressor.CompressMessage(message);    28:   }    29:      30:   public object DeserializeReply(Message message, object[] parameters)    31:   {    32:         message = this.MessageCompressor.DecompressMessage(message);    33:         return this.InnerClientMessageFormatter.DeserializeReply(message, parameters);    34:   }    35:      36:   public Message SerializeRequest(MessageVersion messageVersion, object[] parameters)    37:   {    38:         var message = this.InnerClientMessageFormatter.SerializeRequest(messageVersion, parameters);    39:         return this.MessageCompressor.CompressMessage(message);    40:   }    41: }五、CompressionOperationBehaviorAttribute——将CompressionMessageFormatter用于WCF运行时框架的操作行为

ClientMessageFormatter和DispatchMessageFormatter实际上属于ClientOperation和DispatchOperation的组件。我们可以通过如下一个自定义的操作行为CompressionOperationBehaviorAttribute将其应用到相应的操作上。
         1: <s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/"><s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/">       2: public class CompressionOperationBehaviorAttribute: Attribute, IOperationBehavior       3: {       4:   </s:Header></s:Header>public CompressionAlgorithm Algorithm { get; set; }       5:         6:   public void AddBindingParameters(OperationDescription operationDescription, BindingParameterCollection bindingParameters) { }       7:         8: </s:Envelope></s:Envelope>    public void ApplyClientBehavior(OperationDescription operationDescription, ClientOperation clientOperation)       9:   {    10:         clientOperation.SerializeRequest = true;    11:         clientOperation.DeserializeReply = true;    12:         var dataContractFormatAttribute = operationDescription.SyncMethod.GetCustomAttributes(typeof(DataContractFormatAttribute), true).FirstOrDefault() as DataContractFormatAttribute;    13:         if (null == dataContractFormatAttribute)    14:         {    15:             dataContractFormatAttribute = new DataContractFormatAttribute();    16:         }    17:      18:         var dataContractSerializerOperationBehavior = operationDescription.Behaviors.Find<DataContractSerializerOperationBehavior>();    19:         clientOperation.Formatter = new CompressionMessageFormatter(this.Algorithm, operationDescription, dataContractFormatAttribute, dataContractSerializerOperationBehavior);                20:   }    21:      22:   public void ApplyDispatchBehavior(OperationDescription operationDescription, DispatchOperation dispatchOperation)    23:   {    24:         dispatchOperation.SerializeReply      = true;    25:         dispatchOperation.DeserializeRequest    = true;    26:         var dataContractFormatAttribute = operationDescription.SyncMethod.GetCustomAttributes(typeof(DataContractFormatAttribute), true).FirstOrDefault() as DataContractFormatAttribute;    27:         if (null == dataContractFormatAttribute)    28:         {    29:             dataContractFormatAttribute = new DataContractFormatAttribute();    30:         }    31:         var dataContractSerializerOperationBehavior = operationDescription.Behaviors.Find<DataContractSerializerOperationBehavior>();    32:         dispatchOperation.Formatter = new CompressionMessageFormatter(this.Algorithm, operationDescription, dataContractFormatAttribute, dataContractSerializerOperationBehavior);       33:   }    34:      35:   public void Validate(OperationDescription operationDescription) { }    36: }六、查看结构压缩后的消息

为了验证应用了CompressionOperationBehaviorAttribute特性的操作方法对应的消息是否经过了压缩,我们可以通过一个简单的例子来检验。我们采用常用的计算服务的例子,下面是服务契约和服务类型的定义。我们上面定义的CompressionOperationBehaviorAttribute应用到服务契约的Add操作上。
         1: <s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/"><s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/">       2: public interface ICalculator       3: {       4:   </s:Header></s:Header>       5:   <s:Body><s:Body>       6:   double Add(double x, double y);       7: }       8: </s:Envelope></s:Envelope>public class CalculatorService : ICalculator       9: {    10:   public double Add(double x, double y)    11:   {    12:         return x + y;    13:   }    14: }我们采用BasicHttpBinding作为终结点的绑定类型(具体的配置请查看源代码),下面是通过Fiddler获取的消息的内容,它们的主体部分都经过了基于压缩的编码。
         1: <s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/"><s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/">       2:   <s:Header><s:Header>       3:   <Compression xmlns="http://www.artech.com/compression">algorithm = "GZip"</Compression>       4:   </s:Header></s:Header>       5:   <s:Body><s:Body>       6:   <CompressedBody xmlns="http://www.artech.com/compression">7L0HYBx ... CQAA//8=</CompressedBody>       7:   </s:Body></s:Body>       8: </s:Envelope></s:Envelope>回复消息
         1: <s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/"><s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/">       2:   <s:Header><s:Header>       3:   <Compression xmlns="http://www.artech.com/compression">algorithm = "GZip"</Compression>       4:   </s:Header></s:Header>       5:   <s:Body><s:Body>       6:   <CompressedBody xmlns="http://www.artech.com/compression">7L0H...PAAAA//8=</CompressedBody>       7:   </s:Body></s:Body>       8: </s:Envelope></s:Envelope>七、补充说明

由于CompressionMessageFormatter使用基于DataContractSerializer序列化器的DataContractSerializerOperationFormatter进行消息的序列化和发序列化工作。而DataContractSerializer仅仅是WCF用于序列化的一种默认的选择(WCF还可以采用传统的XmlSeriaizer)。为了让CompressionMessageFormatter能够使用其他序列化器,你可以对于进行相应的修正。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: 通过WCF扩展实现消息压缩