队列及其Python实现
一、什么是队列?
- 队列是一种有次序的数据集合,其特征是:新数据项的添加总是发生在一端(通常称为:"尾rear"端),而现存数据项的移除总发生在另一端(通常称为:“首front”端)
当数据项加入队列,首先出现在队尾,随着队首数据项的移除,它逐渐接近队首。
新加入的数据项必须在数据集末尾等待,而等待时间最长的数据项则是队首。
这种次序安排的原则称为(FIFO:First-in-first-out)先进先出
队列仅有一个入口和一个出口:不允许数据项直接插入队中,也不允许从中间移除数据项。
队列例子:打印机面向多个用户提供服务、进程调度等等。
二、抽象数据类型Queue
抽象数据类型Queue由如下操作定义:
Queue(): 创建一个空队列对象,返回值为Queue对象;
enqueue(item): 将数据项item添加到队尾,无返回值;
dequeue(): 从队首移除数据项,返回值为队首数据项,队列被修改;
isEmpty(): 测试是否空队列,返回值为布尔值
size(): 返回队列中数据项的个数
☆ 采用List来容纳Queue的数据项:
将List的首端作为队列的尾端,List的末端作为队列首端,enqueue()复杂度为O(n),dequeue()复杂度为O(1). 首尾倒过来的实现,复杂度也倒过来
class Queue:
def __init__(self):
self.items = []
def isEmpty(self):
return self.items == []
def enqueue(self, item):
self.items.insert(0, item)
def dequeue(self):
return self.items.pop()
def size(self):
return len(self.items)
三、队列的应用
-
热土豆问题(约瑟夫问题、击鼓传花)
模拟程序采用队列来存放所有参加游戏的人名,按照传递土豆的方向从队首排到队尾,游戏时,队首始终是持有土豆的人,模拟游戏开始,只需要将队首的人出队,随即再到队尾入队,算是土豆的一次传递,传递了num次后,将队首的人移除,不再入队,如此反复,直到队列中剩余一人。
def hotPotato(namelist, num):
"""
:param namelist: 参加游戏的人
:param num: 每一轮传递几次
:return:最后剩下的人
"""
simqueue = Queue()
for name in namelist:
simqueue.enqueue(name)
while simqueue.size() > 1:
for i in range(num):
simqueue.enqueue(simqueue.dequeue())
# 一次传递,for将一次传递循环n次
# 将队首的人挪出去不再回来
simqueue.dequeue()
return simqueue.dequeue()
print(hotPotato(['Bill', 'David', 'Susan', 'Jane', 'Kent', 'Brad'], 7))
2.幸运的基督徒
《幸运的基督徒》
有15个基督徒和15个非基督徒在海上遇险,为了能让一部分人活下来不得不将其中15个人扔到海里面去,有个人想了个办法就是大家围成一个圈,由某个人开始从1报数,报到9的人就扔到海里面,他后面的人接着从1开始报数,报到9的人继续扔到海里面,直到扔掉15个人。由于上帝的保佑,15个基督徒都幸免于难,问这些人最开始是怎么站的,哪些位置是基督徒哪些位置是非基督徒。
import queue
def josephus(number, num, q):
# number:人员总数, num:报到num的人处决 q: 幸存的人数
numbers = queue.Queue()
for _ in range(1, number + 1):
numbers.put(_)
# print(numbers.get(), end='')
print()
while numbers.qsize() > q:
for _ in range(num-1):
numbers.put(numbers.get())
numbers.get()
for i in range(q):
print(f'{numbers.get()}基', end=' ')
return None
print(josephus(30, 9, 15))
# 另一种解决方法:
def main():
persons = [True] * 30
counter, index, number = 0, 0, 0
while counter < 15:
# 共杀掉39个人
if persons[index]:
number += 1
# 计数死亡的位置
if number == 9:
# 当数到9 ,死
persons[index] = False
counter += 1
number = 0
index += 1
# 索引向下走
index %= 30
# 围成一个圈
for person in persons:
print('基' if person else '非', end='')
if __name__ == '__main__':
main()
3.模拟算法:打印任务
多人共享一台打印机,采取“先到先服务”的队列策略来执行打印任务,在这种设定下,首要的问题就是: 在这种打印作业系统的容量有多大?在能够接收的等待时间内,系统能容纳多少用户以多高频率提交多少打印任务?
★一个具体的实例配置如下:
一个实验室内,在任意一个小时内,大约有10名学生在场,这一个小时中,每人会发起2次左右的打印,每次1~20页。
★打印机的性能是:
以草稿模式打印的话,每分钟10页。
以正常模式打印的话,打印质量好,但速度下降为每分钟5页。
问题是: 怎么设定打印机的模式,让大家都不会等太久的前提下尽量提高打印质量?
这是一个典型的决策支持问题,但无法通过规则直接计算。
我们要用一段程序来模拟这种打印任务场景,然后对程序运行结果进行分析,以支持对打印机模式设定的决策。
如何对问题建模?
首先对问题进行抽象,确定相关的对象和过程: 抛弃那些对问题实质没有关系的学生性别、年龄、打印机型号、打印内容、纸张大小等等众多细节。
★ 对问题抽象后的对象: 打印任务、打印队列、打印机
打印任务的属性: 提交时间、打印页数
打印队列的属性: 具有FIFO性质的打印任务队列
打印机的属性: 打印速度、是否忙
★ 过程:生成和提交打印任务
确定生成概率:实例为 每小时会有10个学生提交的20个作业,这样,概率是每180秒会有1个作业生成并提交,概率为每秒1/180
确定打印页数:实例是1~20页,那么就是1 ~20页之间的概率相同。
\frac{20 tasks}{1 hour} \frac{1 hour}{60 minutes} \frac{1 minute}{60 seconds} *=\frac{1 tasks}{180 secons}
★过程:实施打印
当前的打印作业:正在打印的作业
打印结束倒计时:新作业开始打印时开始倒计时,回0表示打印完毕,可以处理下一个作业。
★模拟时间:
统一的时间框架:以最小单位秒均匀流逝的时间,设定结束时间。
同步所有过程:在一个时间单位里,对生成打印任务和实施打印两个过程各处理一次。
☆ 模拟流程:
★创建打印队列的对象
★时间按照秒的单位流逝
按照概率生成打印作业,加入打印队列。
若打印机空闲,且队列不空,则取出队首作业打印,记录此作业等待时间。
若打印机忙,则按照打印速度1秒打印。
若当前作业打印完成,则打印机进入空闲。
★时间用尽,开始统计平均等待时间。
★作业的等待时间:
生成作业时,记录生成的时间戳。
开始打印时,当前时间减去生成时间即可。
★作业的打印时间
生成作业时,记录作业的页数。
开始打印时,页数除以打印速度即可。
import random
class Queue:
def __init__(self):
self.items = []
def isEmpty(self):
return self.items == []
def enqueue(self, item):
self.items.insert(0, item)
def dequeue(self):
return self.items.pop()
def size(self):
return len(self.items)
class Printer:
def __init__(self, ppm):
"""
currentTask: 打印任务
timeRemaining: 任务倒计时
:param ppm: 打印速度
"""
self.pagerate = ppm
self.currentTask = None
self.timeRemaining = 0
def tick(self):
""" 打印 1秒!"""
if self.currentTask is not None:
self.timeRemaining -= 1
if self.timeRemaining <= 0:
self.currentTask = None
def busy(self):
"""打印机是否忙?"""
if self.currentTask is not None:
return True
else:
return False
def startNext(self, newtask):
"""打印新作业!"""
self.currentTask = newtask
self.timeRemaining = newtask.getPages() * 60 / self.pagerate
class Task:
def __init__(self, time):
""" 生成时间戳!"""
self.timestamp = time
# 打印页数
self.pages = random.randint(1, 21)
def getStamp(self):
return self.timestamp
def getPages(self):
return self.pages
def waitTime(self, currenttime):
return currenttime - self.timestamp
def newPrintTask():
num = random.randint(1, 180)
if num == 180:
return True
else:
return False
def simulation(numSeconds, pagesPerMinute):
#模拟打印,传入打印机使用时间,打印模式(一分钟多少页)
labprinter = Printer(pagesPerMinute)
printQueue = Queue()
waitingtimes = []
# 时间流逝
for currentSecond in range(numSeconds):
if newPrintTask():
task = Task(currentSecond)
printQueue.enqueue(task)
if (not labprinter.busy()) and (not printQueue.isEmpty()):
nexttask = printQueue.dequeue()
waitingtimes.append(nexttask.waitTime(currentSecond))
labprinter.startNext(nexttask)
labprinter.tick()
averageWait = sum(waitingtimes) / len(waitingtimes)
print('Average Wait %.2f secs %3d tasks remaining.' % (averageWait, printQueue.size()))
for i in range(10):
simulation(3600, 10)
# Average Wait 12.94 secs 0 tasks remaining.
# Average Wait 14.82 secs 2 tasks remaining.
# Average Wait 10.75 secs 0 tasks remaining.
# Average Wait 13.50 secs 0 tasks remaining.
# Average Wait 37.04 secs 0 tasks remaining.
# Average Wait 30.11 secs 0 tasks remaining.
# Average Wait 23.12 secs 0 tasks remaining.
# Average Wait 33.46 secs 0 tasks remaining.
# Average Wait 31.94 secs 0 tasks remaining.
# Average Wait 10.00 secs 1 tasks remaining.
for i in range(10):
simulation(3600, 5)
# Average Wait 156.32 secs 1 tasks remaining.
# Average Wait 69.20 secs 7 tasks remaining.
# Average Wait 174.59 secs 2 tasks remaining.
# Average Wait 44.79 secs 0 tasks remaining.
# Average Wait 104.91 secs 1 tasks remaining.
# Average Wait 41.62 secs 0 tasks remaining.
# Average Wait 96.35 secs 1 tasks remaining.
# Average Wait 87.29 secs 0 tasks remaining.
# Average Wait 211.71 secs 0 tasks remaining.
# Average Wait 199.82 secs 2 tasks remaining.
通过两种情况模拟仿真结果的分析,我们认识到如果有那么多学生要拿着打印好的程序源赶去上课的话,那么就必须得牺牲打印质量,提高打印速度。
模拟系统对现实的仿真,在不耗费现实资源的情况下,有时候真实的实现是无法进行的,可以以不同的设定,反复多次模拟,来帮助我们决策。
四、什么是双端队列(Deque)?
☆ 双端队列:是一种有次序的数据集,跟队列相似,其两端可以称作“首”“尾”端,但deque中数据项既可以从 队首加入,也可以从队尾加入;数据项也可以从两端移除。
某种意义上来说,双端队列集成了栈和队列的能力。
双端队列并不具有内在的LIFO 或者 FIFO特性,若用双端队列来模拟栈或者队列,那么需要由使用者自行维护操作的一致性。
☆ 抽象数据类型Deque
Deque定义的操作如下:
Deque(): 创建一个空双端队列
addFront(item): 将item加入队首
addRear(item): 将item加入队尾
removeFront(): 从队首移除数据项,返回值为移除的数据项
removeRear(): 从队尾移除数据项,返回值为移除的数据项
isEmpty(): 返回deque是否为空
size(): 返回deque中包含数据项的个数
采用List实现,List[0] 作为deque的尾端, List[-1] 作为deque的首端
操作复杂度:addFront/removeFront: O(1) ,addRear/removeRear: O(n)
class Deque():
def __init__(self):
self.items = []
def isEmpty(self):
return self.items == []
def addFront(self, item):
self.items.append(item)
def addRear(self, item):
self.items.insert(0, item)
def removeFront(self):
return self.items.pop()
def removeRear(self):
return self.items.pop(0)
def size(self):
return len(self.items)
☆ 回文词判定 (回文数同理)
回文词指的是 正读和反读都一样的词, 如 radar, madam,toot
中文: 上海自来水来自海上,山东落花生花落东山
★ 用双端队列很容易解决回文词问题
先将需要判定的词语从队尾加入deque,再从两端同时移除字符判定是否相同,直到deque中剩下 0 或 1 个字符
# 回文词判断
def palchecker(aString):
chardeque = Deque()
for ch in aString:
chardeque.addRear(ch)
stillEqual = True
while chardeque.size() > 1 and stillEqual:
first = chardeque.removeFront()
last = chardeque.removeRear()
if first != last:
stillEqual = False
return stillEqual
print(palchecker("sdjfhsdjgh"))
print(palchecker('radar'))
print(palchecker(('123321')))
# False
# True
# True