第2拾一天- 进程串行(锁) 队列 生成者消费者模型

    class Program
    {
        public static readonly Queue<OrderInfo> queue = new Queue<OrderInfo>();
        public static object obj = new object();
        static void Main(string[] args)
        {
            #region 模仿淘宝处理订单问题
            //订单进入队列等待
            Task OrderTask = new Task(CreateOrder);
            OrderTask.Start();

            //开启线程处理订单
            Task taskDeal = new Task(DealOrder);
            taskDeal.Start();
            Console.WriteLine("hello");
            #endregion

            Console.ReadKey();
        }
        public static void CreateOrder()
        {
            for (int i = 1; i < 50; i++)
            {
                Thread.Sleep(300);
                lock (obj)
                {
                    OrderInfo order = new OrderInfo();
                    order.OrderId = i;
                    order.ProductId = 2800 + i;
                    order.Price = 888;
                    order.Remarks = "quick send goods";
                    queue.Enqueue(order);
                    Console.WriteLine("添加了一条订单" + i);
                }
            }
        }

        public static int flag = 0;
        public static void DealOrder()
        {
            while (true)
            {
                Thread.Sleep(500);
                if (queue.Count > 0)
                {
                    lock (obj)
                    {
                        if (queue.Count > 0)
                        {
                            OrderInfo order = queue.Dequeue();
                            Console.WriteLine("处理==>订单号{0};商品:{1}价格:{2}", order.OrderId, order.ProductId, order.Price);
                        }
                    }
                }
                else 
                {
                    Thread.Sleep(2000);
                    flag++;
                    if (flag > 10) { Console.WriteLine("All Over"); break; }
                    lock (obj)
                    {
                        if (queue.Count <= 0)
                        {
                            Console.WriteLine("订单处理完成,等待中。。。");
                        }
                    }
                }
            }

        }
    }

  基于队列的进程通信:

2、到调节实行

3.队列

    public struct OrderInfo
    {
        public int OrderId { get; set; }
        public int ProductId { get; set; }
        public Decimal Price { get; set; }
        public string Remarks { get; set; }
    }

View Code

图片 1图片 2

以模拟抢票为例:

图片 3图片 4

总结:

图片 5图片 6

  加锁可以保障几个进度修改同一块数据时,同权且间只可以有一个职责能够举办修改,即串行的修改,速度是慢了,但牺牲了快慢却保险了数码安全。

一、新建1个订单OrderInfo(用的是构造)

总结:

那是摘抄英特网的。做了个demo,但我技艺有限,如有创新的地点,接待大神多多教导。

 1 ''' 2 q = Queue([maxsize])  3 创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。  4 Queue的实例q具有以下方法: 5  6 q.get( [ block [ ,timeout ] ] )  7 返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。 8  9 q.get_nowait 10 同q.get方法。11 12 q.put(item [, block [,timeout ] ] ) 13 将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。14 15 q.qsize() 16 返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。17 18 19 q.empty() 20 如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。21 22 q.full() 23 如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty24 25 '''

壹、什么是队列

总结:

队列(Queue)代表了二个先进先出的对象集结。当您需求对每一种举行先进先出的访问时,则动用队列。当你在列表中增添一项,称为入队,当你从列表中移除一项时,称为出队

稍复杂示例参考

测试截图如下:

依赖队列来落成1个劳动者消费者模型:

贰、利用队列来拍卖订单难点

 1 import time 2 from multiprocessing import Process 3  4 def func1: 5     time.sleep(1) 6     print('我是func1',m) 7  8  9 # 注意:进程之间是互相独立的,主进程代码运行结束,不管有没有运行完,守护进程随即终止10 if __name__ == '__main__':11     p = Process(target=func1,args=(666,))12     p.daemon = True  # 守护进程,在start之前13     p.start()14 15     print('主进程执行结束')

View Code

  加锁,由并发改成了串行,就义了运转作效果能,但幸免数据竞争

View Code

简单来讲示例图片 7图片 8

 思路:开启叁个线程去创设订单,同时管理订单线程开启,如队列里有订单,那么就管理,直到未有就一贯处在等候状态(这里作者等候二十五次就退出),等待订单进入队列再持续管理订单。

 1 # 解耦合 2 import time 3 from multiprocessing import Process,Queue 4  5  6 def producer: 7     for i in range(10): 8         time.sleep(0.5) 9         q.put('包子%s号'%i)10         print('包子%s号做好了'%i)11     q.put  # None表示没有 防止后面死循环12 13 14 def consumer:15     while 1:16         baozi = q.get()17         if baozi == None:18             break19         time.sleep(1)20         print('%s被吃掉了'%baozi)21 22 23 if __name__ == '__main__':24     q = Queue  # 创建一个队列,耦合生产者和消费者,p1和p2共享q(独立于进程的一个空间)25     p1 = Process(target=producer,args=26     p2 = Process(target=consumer,args=27     p1.start()28     p2.start()

 

  方法介绍:

 1 # 生产者消费者模型总结 2  3 # 程序中有两类角色 4 一类负责生产数据 5 一类负责处理数据 6  7 # 引入生产者消费者模型为了解决的问题是: 8 平衡生产者与消费者之间的工作能力,从而提高程序整体处理数据的速度 9 10 # 如何实现:11 生产者 < -->队列 <—— > 消费者12 # 生产者消费者模型实现类程序的解耦和13 14 生产者消费者模型总结
 1 # 与queque类似,多了 q.task_done()  q.join() 2 from multiprocessing import Process,JoinableQueue 3 import time,random,os 4  5  6 def consumer: 7     while True: 8         res=q.get() 9         # time.sleep(random.randint10         time.sleep(random.random11         print('\033[45m%s 吃 %s\033[0m' %(os.getpid12         q.task_done() # 向q.join()发送一次信号,证明一个数据已经被取走并执行完了13 14 15 def producer:16     for i in range(10):17         # time.sleep(random.randint18         time.sleep(random.random19         res='%s%s' %20         q.put21         print('\033[44m%s 生产了 %s\033[0m' %(os.getpid22     print('%s生产结束'%name)23     q.join() # 生产完毕,使用此方法进行阻塞,直到队列中所有项目均被处理。24     print('%s生产结束~~~~~~'%name)25 26 27 if __name__ == '__main__':28     q=JoinableQueue()29     # 生产者们:即厨师们30     p1=Process(target=producer,args=('包子',q))31     p2=Process(target=producer,args=('骨头',q))32     p3=Process(target=producer,args=('泔水',q))33 34     # 消费者们:即吃货们35     c1=Process(target=consumer,args=36     c2=Process(target=consumer,args=37     c1.daemon=True38     c2.daemon=True39     # 如果不加守护,那么主进程结束不了,但是加了守护之后,必须确保生产者的内容生产完并且被处理完了,所有必须还要在主进程给生产者设置join,才能确保生产者生产的任务被执行完了,并且能够确保守护进程在所有任务执行完成之后才随着主进程的结束而结束。40 41     # 开始42     p_l=[p1,p2,p3,c1,c2]43     for p in p_l:44         p.start()45 46     p1.join() # 我要确保你的生产者进程结束了,生产者进程的结束标志着你生产的所有的人任务都已经被处理完了47     p2.join()48     p3.join()49     print('主程序')

一.进度同步/串行

View Code

1 #JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。2 3    #参数介绍:4     maxsize是队列中允许最大项数,省略则无大小限制。    5   #方法介绍:6     JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:7     q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常8     q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止,也就是队列中的数据全部被get拿走了。

图片 9

  在线程世界里,生产者正是生产数据的线程,消费者便是成本数量的线程。在八线程开拓个中,借使劳动者管理速度十分的快,而消费者管理速度异常的慢,那么生产者就必须等待顾客管理完,技能继续生产数据。同样的道理,假设消费者的拍卖才干超过生产者,那么消费者就不可能不待产者。为了消除那几个标题大家需求通过多少个容器来缓慢解决劳动者和消费者的强耦合难点。

  

  有八个生产者和多个买主时,由于队列是进程安全的,三个进度拿走了结束非确定性信号,别的1个进度就拿不到了,所以接纳时需求消费者发送新闻给劳动者已使用。

相关文章