首页 文章资讯内容详情

Python爬取视频(其实是一篇福利)

2026-06-01 2 花语

本文内容纲要:

窗外下着小雨,作为单身程序员的我逛着逛着发现一篇好东西,来自知乎你都用Python来做什么?的第一个高亮答案。

到上面去看了看,地址都是明文的,得,赶紧开始吧。

下载流式文件,requests库中请求的stream设为True就可以啦,文档在此。

先找一个视频地址试验一下:

#-*-coding:utf-8-*- importrequests defdownload_file(url,path): withrequests.get(url,stream=True)asr: chunk_size=1024 content_size=int(r.headers[content-length]) print下载开始 withopen(path,"wb")asf: forchunkinr.iter_content(chunk_size=chunk_size): f.write(chunk) if__name__==__main__: url=就在原帖... path=想存哪都行 download_file(url,path)

遭遇当头一棒:

AttributeError:__exit__

这文档也会骗人的么!

看样子是没有实现上下文需要的__exit__方法。既然只是为了保证要让r最后close以释放连接池,那就使用contextlib的closing特性好了:

#-*-coding:utf-8-*- importrequests fromcontextlibimportclosing defdownload_file(url,path): withclosing(requests.get(url,stream=True))asr: chunk_size=1024 content_size=int(r.headers[content-length]) print下载开始 withopen(path,"wb")asf: forchunkinr.iter_content(chunk_size=chunk_size): f.write(chunk)

程序正常运行了,不过我盯着这文件,怎么大小不见变啊,到底是完成了多少了呢?还是要让下好的内容及时存进硬盘,还能省点内存是不是:

#-*-coding:utf-8-*- importrequests fromcontextlibimportclosing importos defdownload_file(url,path): withclosing(requests.get(url,stream=True))asr: chunk_size=1024 content_size=int(r.headers[content-length]) print下载开始 withopen(path,"wb")asf: forchunkinr.iter_content(chunk_size=chunk_size): f.write(chunk) f.flush() os.fsync(f.fileno())

文件以肉眼可见的速度在增大,真心疼我的硬盘,还是最后一次写入硬盘吧,程序中记个数就好了:

defdownload_file(url,path): withclosing(requests.get(url,stream=True))asr: chunk_size=1024 content_size=int(r.headers[content-length]) print下载开始 withopen(path,"wb")asf: n=1 forchunkinr.iter_content(chunk_size=chunk_size): loaded=n*1024.0/content_size f.write(chunk) print已下载{0:%}.format(loaded) n+=1

结果就很直观了:

已下载2.579129% 已下载2.581255% 已下载2.583382% 已下载2.585508%

心怀远大理想的我怎么会只满足于这一个呢,写个类一起使用吧:

#-*-coding:utf-8-*- importrequests fromcontextlibimportclosing importtime defdownload_file(url,path): withclosing(requests.get(url,stream=True))asr: chunk_size=1024*10 content_size=int(r.headers[content-length]) print下载开始 withopen(path,"wb")asf: p=ProgressData(size=content_size,unit=Kb,block=chunk_size) forchunkinr.iter_content(chunk_size=chunk_size): f.write(chunk) p.output() classProgressData(object): def__init__(self,block,size,unit,file_name=,): self.file_name=file_name self.block=block/1000.0 self.size=size/1000.0 self.unit=unit self.count=0 self.start=time.time() defoutput(self): self.end=time.time() self.count+=1 speed=self.block/(self.end-self.start)if(self.end-self.start)>0else0 self.start=time.time() loaded=self.count*self.block progress=round(loaded/self.size,4) ifloaded>=self.size: printu%s下载完成\r\n%self.file_name else: printu{0}下载进度{1:.2f}{2}/{3:.2f}{4}下载速度{5:.2%}{6:.2f}{7}/s.\ format(self.file_name,loaded,self.unit,\ self.size,self.unit,progress,speed,self.unit) print%50s%(/*int((1-progress)*50))

运行:

下载开始 下载进度10.24Kb/120174.05Kb0.01%下载速度4.75Kb/s ///////////////////////////////////////////////// 下载进度20.48Kb/120174.05Kb0.02%下载速度32.93Kb/s /////////////////////////////////////////////////

看上去舒服多了。

下面要做的就是多线程同时下载了,主线程生产url放入队列,下载线程获取url:

#-*-coding:utf-8-*- importrequests fromcontextlibimportclosing importtime importQueue importhashlib importthreading importos defdownload_file(url,path): withclosing(requests.get(url,stream=True))asr: chunk_size=1024*10 content_size=int(r.headers[content-length]) ifos.path.exists(path)andos.path.getsize(path)>=content_size: print已下载 return print下载开始 withopen(path,"wb")asf: p=ProgressData(size=content_size,unit=Kb,block=chunk_size,file_name=path) forchunkinr.iter_content(chunk_size=chunk_size): f.write(chunk) p.output() classProgressData(object): def__init__(self,block,size,unit,file_name=,): self.file_name=file_name self.block=block/1000.0 self.size=size/1000.0 self.unit=unit self.count=0 self.start=time.time() defoutput(self): self.end=time.time() self.count+=1 speed=self.block/(self.end-self.start)if(self.end-self.start)>0else0 self.start=time.time() loaded=self.count*self.block progress=round(loaded/self.size,4) ifloaded>=self.size: printu%s下载完成\r\n%self.file_name else: printu{0}下载进度{1:.2f}{2}/{3:.2f}{4}{5:.2%}下载速度{6:.2f}{7}/s.\ format(self.file_name,loaded,self.unit,\ self.size,self.unit,progress,speed,self.unit) print%50s%(/*int((1-progress)*50)) queue=Queue.Queue() defrun(): whileTrue: url=queue.get(timeout=100) ifurlisNone: printu全下完啦 break h=hashlib.md5() h.update(url) name=h.hexdigest() path=e:/download/+name+.mp4 download_file(url,path) defget_url(): queue.put(None) if__name__==__main__: get_url() foriinxrange(4): t=threading.Thread(target=run) t.daemon=True t.start()

加了重复下载的判断,至于怎么源源不断的生产url,诸位摸索吧,保重身体!

本文内容总结:

原文链接:https://www.cnblogs.com/linxiyue/p/8244724.html