视频1 视频21 视频41 视频61 视频文章1 视频文章21 视频文章41 视频文章61 推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37 推荐39 推荐41 推荐43 推荐45 推荐47 推荐49 关键词1 关键词101 关键词201 关键词301 关键词401 关键词501 关键词601 关键词701 关键词801 关键词901 关键词1001 关键词1101 关键词1201 关键词1301 关键词1401 关键词1501 关键词1601 关键词1701 关键词1801 关键词1901 视频扩展1 视频扩展6 视频扩展11 视频扩展16 文章1 文章201 文章401 文章601 文章801 文章1001 资讯1 资讯501 资讯1001 资讯1501 标签1 标签501 标签1001 关键词1 关键词501 关键词1001 关键词1501 专题2001
.NetCore利用BlockingCollection实现简易消息队列
2020-11-27 22:34:50 责编:小采
文档


消息队列现今的应用场景越来越大,常用的有RabbmitMQ和KafKa。

我们用BlockingCollection来实现简单的消息队列。

BlockingCollection实现了生产者/消费者模式,是对IProducerConsumerCollection<T>接口的实现。与其他Concurrent集合一样,每次Add或Take元素,都会导致对集合的lock。只有当确定需要在内存中创建一个生产者,消费者模式时,再考虑这个类。

MSDN中的示例用法:

using (BlockingCollection<int> bc = new BlockingCollection<int>())
 {
 Task.Factory.StartNew(() =>
 {
 for (int i = 0; i < 1000; i++)
 {
 bc.Add(i);
 Thread.Sleep(50); 
 }
 
 
 // Need to do this to keep foreach below from hanging
 bc.CompleteAdding();
 });
 
 
 // Now consume the blocking collection with foreach.
 // Use bc.GetConsumingEnumerable() instead of just bc because the
 // former will block waiting for completion and the latter will
 // simply take a snapshot of the current state of the underlying collection.
 foreach (var item in bc.GetConsumingEnumerable())
 {
 Console.WriteLine(item);
 }
 }

实现消息队列

用Vs2017创建一个控制台应用程序。创建DemoQueueBlock类,封装一些常用判断。

  • HasEle,判断是否有元素
  • Add向队列中添加元素
  • Take从队列中取出元素
  • 为了不把BlockingCollection直接暴漏给使用者,我们封装一个DemoQueueBlock类

     /// <summary>
     /// BlockingCollection演示消息队列
     /// </summary>
     /// <typeparam name="T"></typeparam>
     public class DemoQueueBlock<T> where T : class
     {
     private static BlockingCollection<T> Colls;
     public DemoQueueBlock()
     {
    
     }
     public static bool IsComleted() {
     if (Colls != null && Colls.IsCompleted) {
     return true;
     }
     return false;
     }
     public static bool HasEle()
     {
     if (Colls != null && Colls.Count>0)
     {
     return true;
     }
     return false;
     }
     
     public static bool Add(T msg)
     {
     if (Colls == null)
     {
     Colls = new BlockingCollection<T>();
     }
     Colls.Add(msg);
     return true;
     }
     public static T Take()
     {
     if (Colls == null)
     {
     Colls = new BlockingCollection<T>();
     }
     return Colls.Take();
     }
     }
    
     /// <summary>
     /// 消息体
     /// </summary>
     public class DemoMessage
     {
     public string BusinessType { get; set; }
     public string BusinessId { get; set; }
     public string Body { get; set; }
     }
    
    

    添加元素进队列

    通过控制台,添加元素

     //添加元素
     while (true)
     {
     Console.WriteLine("请输入队列");
     var read = Console.ReadLine();
     if (read == "exit")
     {
     return;
     }
    
     DemoQueueBlock<DemoMessage>.Add(new DemoMessage() { BusinessId = read });
     }
    

    消费队列

    通过判断IsComleted,来确定是否获取队列

     Task.Factory.StartNew(() =>
     {
     //从队列中取元素。
     while (!DemoQueueBlock<DemoMessage>.IsComleted())
     {
     try
     {
     var m = DemoQueueBlock<DemoMessage>.Take();
     Console.WriteLine("已消费:" + m.BusinessId);
     }
     catch (Exception ex)
     {
     Console.WriteLine(ex.Message);
     }
     }
     });
    

    查看运行结果

    运行结果

    这样我们就实现了简易的消息队列。

    示例源码:简易队列

    下载本文
    显示全文
    专题