利用 Python 实现抓图程序

这些天除了忙交大的复试,还一直忙于用 Python 编写抓图程序。好在前几天收到了短信,心总算放了下来,毕竟这一年在思过崖的面壁得到了回报。然而,在西安找实习工作时却处处碰壁,我想说考官大姐们你们可不能以貌取人啊,凭什么他比我帅你们就招他了……算了,不说了,男儿有泪不轻弹,只是未到桑心处。

程序的功能基本已经实现,可是原先仅仅考虑到抓一个网站的图片,当换一个网站,却又得重新编写 HTML 解析代码,好不麻烦。所以,便想着利用设计模式重构代码,使其可应用与大多数图片网站,甚至应用于视频网站。_因为程序仍处于开发期,所以在此我并不能透漏具体要抓取的页面地址,实在抱歉。_下面我们来看看该程序具体如何实现:

一、ImageLister 类

首先,我设计了一个 ImageLister 类,主要负责解析 HTML 页面(依赖于 BeautifulSoup3),返回图片 URL,当页面有分页时,自动检测分页,顺序分析所有分页。类设计如图 1。

图 1 

ImageLister 类的实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# imagelister.py
# -*- coding: utf-8 -*-

from BeautifulSoup import BeautifulSoup
from urlparse import urljoin
from re import sub
from ulib import uopen, uclose

class ImageLister(object):
def __init__(self, first_page):
self.first_page = first_page
self.title = ""
self.info = ""
self.pages = []
self.images = []

def getFirstPage(self):
return self.first_page

def getPages(self):
return self.pages

def getImages(self):
self.images = self.anlzAllImageUrls()
return self.images

def getTitle(self):
return self.title

def getInfo(self):
return self.info

def getHtmlSrc(self, url):
u = uopen(url)
src = u.read()
uclose(u)
return src

# 分析页面标题
def anlzTitle(self, data):
soup = BeautifulSoup(data, fromEncoding="gb18030")
title = soup.html.head.title.string.strip()
return title

def anlzAllPageUrls(self):
pass

def anlzAllImageUrls(self):
pass

ImageListerA 类的实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# imagelistera.py
# -*- coding: utf-8 -*-

from imagelister import *

class ImageListerA(ImageLister):
    def __init__(self, first_page):
        super(ImageListerA, self).__init__(first_page)
        data = self.getHtmlSrc(first_page)
        self.title = self.anlzTitle(data)
        self.info = self.anlzInfo(data)
        self.pages = self.anlzAllPageUrls(data, first_page)
    
    # 分析页面简介
    # 该函数实现部分不必深究,具有页面特异性
    def anlzInfo(self, data):
        soup = BeautifulSoup(data, fromEncoding="gb18030")
        comment = soup.find("div", {"class": "comment2"})
        info = comment.find("span", {"class": "i_user"}).string
        info += "\n"
        contents = comment.find("font", {"color": "#999999"}).contents
        for content in contents:
            temp_con = content.strip()
            info += sub(r"<br(\s*\/)?>", "\n", temp_con)
        return info
        
    # 分析得到所有分页页面链接
    # 该函数实现部分不必深究,具有页面特异性
    def anlzAllPageUrls(self, data, first_page):
        soup = BeautifulSoup(data, fromEncoding="gb18030")
        pagination = soup.find("div", {"id": "pagination"})
        alinks = pagination.findAll("a")[1:-3]
        pages = []
        pages.append(first_page)
        for alink in alinks:
            page = urljoin(first_page, alink["href"])
            pages.append(page)
        return pages

    # 分析所有分页得到所有图片链接
    # 该函数实现部分不必深究,具有页面特异性
    def anlzAllImageUrls(self):
        pages = self.pages
        images = []
        for page in pages:
            u = uopen(page)
            data = u.read()
            soup = BeautifulSoup(data, fromEncoding="gb18030")
            imglinks = soup.findAll("img", {"class": "IMG_show"})
            for imglink in imglinks:
                image = imglink["src"]
                images.append(image)
        return images

# 单元测试 only
if __name__ == "__main__":
lister = ImageListerA("**************************************.htm")
print lister.getTitle()
print lister.getInfo()
print lister.getPages()
print "\n".join(lister.getImages())

同样地,ImageListerB 根据抓取网页的不同,而重写 ImageLister 中的方法,这样,换一个网站,只需要新创建一个继承于 ImageLister 的 ImageListerXXX 类,实现适合于该网站的 HTML 解析算法即可。

其中,ulib.py 是我自己写的库,提供了带重试功能的 url 访问函数以及其他的一些常用的函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# -*- coding: utf-8 -*-
# ulib.py

from urllib2 import urlopen, HTTPError, URLError
from time import sleep

def uopen(url, verbose=True):
retryTimes = 5
sleepTime = 10
while retryTimes > 0:
try:
if verbose:
print u"正在读取:", url
u = urlopen(url)
# 读取成功
if u.code == 200:
return u
elif u.code == 201:
break
except HTTPError, e:
print e
if e.code == 404:
break
except URLError, e:
print e
except BaseException, e:
print e
retryTimes -= 1
if verbose:
print u"读取失败,等待重试……"
sleep(sleepTime)
return None

def uclose(u):
u.close()

# 格式化文件大小
# 如 10 => "10B", 1024 => "1KB"...
def formatSize(size):
if size > pow(1024, 2):
new_size = size / pow(1024, 2)
postfix = "MB"
elif size > 1024:
new_size = size / 1024
postfix = "KB"
else:
new_size = size
postfix = "B"
strsize = "%.2f" % new_size
return strsize + postfix

二、ImageCatcher 类

ImageCatcher 类主要负责从 ImageLister 类得到图片 url,再将其存入本地(dirname = save_path + title),同时保存页面的地址(first_page)、标题(title)以及备注信息(info)。

ImageCatcher 类的实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# -*- coding: utf-8 -*-
# imagecatcher.py

import os
from time import clock
import urlparse

from imagelistera import ImageListerA
from ulib import uopen, uclose, formatSize

IMAGE_URL_FILE = "image_urls.txt"
IMAGE_INFO_FILE = "image_info.txt"

class ImageCatcher(object):
def __init__(self, save_path, image_lister):
self.image_lister = image_lister
self.save_path = save_path

self.first_page = image_lister.getFirstPage()
self.title = image_lister.getTitle()
self.info = image_lister.getInfo()
self.dirname = os.path.join(save_path, self.title);

self.__createDir(self.dirname, verbose=False)
#print self.first_page
print self.title
#print self.info
#print self.dirname
self.downAllImages()


# 创建图片文件夹
def __createDir(self, dirname, verbose=True):
if not os.path.exists(dirname):
os.makedirs(dirname)
if verbose:
print u"已创建:%s" % dirname
return True
else:
if verbose:
print u"已存在:%s" % dirname
return False

# 下载所有图片
def downAllImages(self, verbose=True):
filename = os.path.join(self.dirname, IMAGE_URL_FILE)
# 通过文件静态获取
if os.path.exists(filename):
if verbose:
print u"已存在:%s" % filename
images = self.__readImageUrls(filename, verbose)
# 远程读取 url,并保存至文件
else:
images = self.__saveImageUrls(filename, verbose)
self.images = images

imageNum = len(images)
i = 0
for image in images:
i += 1
if verbose:
print "%d/%d" % (i, imageNum)
print image
self.__saveImage(image)

# 保存信息文件
filename = os.path.join(self.dirname, IMAGE_INFO_FILE)
self.__saveInfo(filename, verbose)

# 通过文件静态获取图片 URL
def __readImageUrls(self, filename, verbose=True):
f = open(filename, "r")
images = []
for line in f:
images.append(line.rstrip("\n"))
f.close()
if verbose:
print u"搜索到:%d 张" % len(images)
return images

# 远程读取图片 URL,并保存至文件
def __saveImageUrls(self, filename, verbose=True):
images = self.image_lister.getImages()
if verbose:
print u"搜索到:%d 张" % len(images)
f = open(filename, "w")
for image in images:
f.write(image)
f.write("\n")
f.close()
if verbose:
print u"已写入:%s" % filename
return images

def __saveImage(self, url, verbose=True):
basename = url.split("/")[-1]
dirname = self.dirname
assert(os.path.exists(dirname))
filename = os.path.join(dirname, basename)

file_size = 0
if os.path.exists(filename):
print u"文件已存在:%s" % filename
else:
u = uopen(url)
if u is None:
return
block_size = 8192
downloaded_size = 0
length = u.info().getheaders("Content-Length")
if length:
file_size = int(length[0])
print u"文件大小:%s" % formatSize(file_size)

f = open(filename, "wb")
print u"正在下载:%s" % url
start = clock()
try:
while True:
buffer = u.read(block_size)
if not buffer: # EOF
break
downloaded_size += len(buffer);
f.write(buffer)

# 显示下载进度
if file_size:
print "%2.1f%%\r" % (float(downloaded_size * 100) / file_size),
else:
print '...'
except BaseException, e:
print e
f.close()
if os.path.exists(filename):
os.remove(filename)
print u"已删除损坏的文件:%s", filename
exit()
finally:
uclose(u)
f.close()
print u"文件已保存:%s" % os.path.abspath(filename)
end = clock()
spend = end - start
print u"耗时:%.2f 秒" % spend
print u"平均速度:%.2fKB/s" % (float(file_size) / 1024 / spend)


# 保存信息文件
# 文件包括:url, title, info
def __saveInfo(self, filename, verbose=True):
if not os.path.exists(filename):
info = self.image_lister.getInfo()
f = open(filename, "w")
f.write(self.first_page)
f.write("\n")
# 注意此处以将 Unicode 转换为 UTF-8 保存
if self.title:
f.write(self.title.encode("utf-8"))
f.write("\n")
if self.info:
f.write(self.info.encode("utf-8"))
f.write("\n")
if verbose:
print u"已写入:%s" % filename
return True
else:
if verbose:
print u"已存在:%s" % filename
return False

# 单元测试 only
if __name__ == "__main__":
lister = ImageListerA("http://*************************************.htm")
#lister = ImageListerB("http://*************************************.htm")
ImageCatcher("pics", lister) # "pics" 为相对路径

三、ulib 库

ulib 是我自己实现 URL 处理库(多谢 ouats 的提醒),代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# -*- coding: utf-8 -*-
import urllib2
from urllib2 import Request, urlopen, HTTPError, URLError
from time import sleep
import socket

RETRY_TIMES = 5
SLEEP_TIME = 10

def uopen(url, headers={}, timeout=None, verbose=True):
retryTimes = RETRY_TIMES
sleepTime = SLEEP_TIME

if headers:
try:
r = Request(url, headers=headers)
u = urlopen(r)
except HTTPError, e:
print e
print u"服务器已禁止断点续传"
else:
return u

while retryTimes > 0:
try:
u = urlopen(url)
if verbose:
print u"正在连接:", url
# 连接成功
if u.code == 200:
return u
elif u.code == 201:
break
except HTTPError, e:
print e
if e.code == 404:
break
except URLError, e:
print e
except socket.timeout, e:
print u"连接超时,等待重试……"
except KeyboardInterrupt, e:
print u"用户强制中止"
exit()
except BaseException, e:
print e
retryTimes -= 1
#if verbose:
# print u"读取失败,等待重试……"
try:
# 少量多次,见机中止
while sleepTime > 0:
sleep(1)
sleepTime -= 1
except KeyboardInterrupt, e:
print u"用户强制中止"
if retryTimes == 0:
exit()
return None

def uread(u):
data = u.read()
return data

def uclose(u):
u.close()

# 格式化文件大小
# 如 10 => "10B", 1024 => "1.00KB"...
def formatSize(size):
if size > pow(1024, 2):
new_size = size / pow(1024, 2)
postfix = "MB"
elif size > 1024:
new_size = size / 1024
postfix = "KB"
else:
new_size = size
postfix = "B"
strsize = "%.2f" % new_size
return strsize + postfix

if __name__ == '__main__':
# 单元测试
pass

注:在编写过程中,我一如既往地遇到了令人绝望的编码问题。我开发平台用的是 Windows 7,Python 2.7,折腾了好久,最后终于得到一个比较完美解决乱码问题的方法,即:

  1. 在程序中无论何时都使用 unicode 处理字符串,因为 BeautifulSoup3 默认返回 unicode,我们要做的只是给自己的字符串前加一个 ‘u’,然后尽情地使用 unicode 吧。

  2. 可以直接将 unicode 输出到 IDLE 或 cmd.exe,系统会自动转换为 gbk 输出(前提是你系统的代码页是 cp936 或 gbk)。

  3. 保存文本文件时,将 unicode 转换为 utf-8 存入,读取时,将 utf-8 转换为 unicode。

  4. 至于 linux 下,我还没有试过,改天测试一下。

(未完待续)

Author

whypro

Posted on

2013-04-01

Updated on

2022-11-11

Licensed under

Comments

Your browser is out-of-date!

Update your browser to view this website correctly.&npsb;Update my browser now

×