Skip to content

Commit f2d98b2

Browse files
committed
update
1 parent 6758e37 commit f2d98b2

File tree

7 files changed

+100
-30
lines changed

7 files changed

+100
-30
lines changed

api/api.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
# encoding: utf-8
22

33
import os
4+
import logging
45
from flask import Flask
56
from flask import jsonify, request, redirect, send_from_directory
67

8+
log = logging.getLogger('werkzeug')
9+
log.disabled = True
10+
711
try:
812
from db import conn
913
except:
@@ -123,9 +127,11 @@ def after_request(resp):
123127
return resp
124128
app.after_request(after_request)
125129

126-
def main():
130+
def main(proc_lock):
131+
if proc_lock is not None:
132+
conn.set_proc_lock(proc_lock)
127133
# 因为默认sqlite3中,同一个数据库连接不能在多线程环境下使用,所以这里需要禁用flask的多线程
128134
app.run(host='0.0.0.0', port=5000, threaded=False)
129135

130136
if __name__ == '__main__':
131-
main()
137+
main(None)

config.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,16 @@
1717
PROC_VALIDATOR_SLEEP = 5
1818

1919
# 验证器的配置参数
20-
VALIDATE_THREAD_NUM = 80 # 验证线程数量
20+
VALIDATE_THREAD_NUM = 200 # 验证线程数量
2121
# 验证器的逻辑是:
2222
# 使用代理访问 VALIDATE_URL 网站,超时时间设置为 VALIDATE_TIMEOUT
2323
# 如果没有超时:
2424
# 1、若选择的验证方式为GET: 返回的网页中包含 VALIDATE_KEYWORD 文字,那么就认为本次验证成功
2525
# 2、若选择的验证方式为HEAD: 返回的响应头中,对应的 VALIDATE_HEADER 响应字段内容包含 VALIDATE_KEYWORD 内容,那么就认为本次验证成功
2626
# 上述过程最多进行 VALIDATE_MAX_FAILS 次,只要有一次成功,就认为代理可用
27-
VALIDATE_URL = 'https://www.baidu.com'
28-
VALIDATE_METHOD = 'GET' # 验证方式,可选:GET、HEAD
29-
VALIDATE_HEADER = 'Server' # 仅用于HEAD验证方式,百度响应头Server字段KEYWORD可填:bfe
30-
VALIDATE_KEYWORD = '百度一下,你就知道'
27+
VALIDATE_URL = 'https://qq.com'
28+
VALIDATE_METHOD = 'HEAD' # 验证方式,可选:GET、HEAD
29+
VALIDATE_HEADER = 'location' # 仅用于HEAD验证方式,百度响应头Server字段KEYWORD可填:bfe
30+
VALIDATE_KEYWORD = 'www.qq.com'
3131
VALIDATE_TIMEOUT = 5 # 超时时间,单位s
3232
VALIDATE_MAX_FAILS = 3

db/Proxy.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,18 +99,18 @@ def validate(self, success, latency):
9999
self.validated = True
100100
self.validate_date = datetime.datetime.now()
101101
self.validate_failed_cnt = 0
102-
self.to_validate_date = datetime.datetime.now() + datetime.timedelta(minutes=5) # 5分钟之后继续验证
102+
self.to_validate_date = datetime.datetime.now() + datetime.timedelta(minutes=10) # 10分钟之后继续验证
103103
return False
104104
else:
105105
self.validated = False
106106
self.validate_date = datetime.datetime.now()
107107
self.validate_failed_cnt = self.validate_failed_cnt + 1
108108

109109
# 验证失败的次数越多,距离下次验证的时间越长
110-
delay_minutes = self.validate_failed_cnt * 5
110+
delay_minutes = self.validate_failed_cnt * 10
111111
self.to_validate_date = datetime.datetime.now() + datetime.timedelta(minutes=delay_minutes)
112112

113-
if self.validate_failed_cnt >= 3:
113+
if self.validate_failed_cnt >= 6:
114114
return True
115115
else:
116116
return False

db/conn.py

Lines changed: 57 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,21 @@
99
from .Fetcher import Fetcher
1010
import sqlite3
1111
import datetime
12-
import time
12+
import threading
1313

1414
conn = sqlite3.connect(DATABASE_PATH, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
15+
# 线程锁
16+
conn_lock = threading.Lock()
17+
# 进程锁
18+
proc_lock = None
19+
20+
def set_proc_lock(proc_lock_sub):
21+
"""
22+
设置进程锁
23+
proc_lock_sub : main中的进程锁
24+
"""
25+
global proc_lock
26+
proc_lock = proc_lock_sub
1527

1628
def pushNewFetch(fetcher_name, protocol, ip, port):
1729
"""
@@ -21,13 +33,13 @@ def pushNewFetch(fetcher_name, protocol, ip, port):
2133
ip : 代理IP地址
2234
port : 代理端口
2335
"""
24-
time.sleep(0.1) # 为了解决并发读写饿死的问题
25-
2636
p = Proxy()
2737
p.fetcher_name = fetcher_name
2838
p.protocol = protocol
2939
p.ip = ip
3040
p.port = port
41+
conn_lock.acquire()
42+
proc_lock.acquire()
3143

3244
c = conn.cursor()
3345
c.execute('BEGIN EXCLUSIVE TRANSACTION;')
@@ -43,6 +55,8 @@ def pushNewFetch(fetcher_name, protocol, ip, port):
4355
c.execute('INSERT INTO proxies VALUES (?,?,?,?,?,?,?,?,?)', p.params())
4456
c.close()
4557
conn.commit()
58+
conn_lock.release()
59+
proc_lock.release()
4660

4761
def getToValidate(max_count=1):
4862
"""
@@ -51,6 +65,8 @@ def getToValidate(max_count=1):
5165
max_count : 返回数量限制
5266
返回 : list[Proxy]
5367
"""
68+
conn_lock.acquire()
69+
proc_lock.acquire()
5470
c = conn.cursor()
5571
c.execute('BEGIN EXCLUSIVE TRANSACTION;')
5672
c.execute('SELECT * FROM proxies WHERE to_validate_date<=? AND validated=? ORDER BY to_validate_date LIMIT ?', (
@@ -67,6 +83,8 @@ def getToValidate(max_count=1):
6783
proxies = proxies + [Proxy.decode(row) for row in c]
6884
c.close()
6985
conn.commit()
86+
conn_lock.release()
87+
proc_lock.release()
7088
return proxies
7189

7290
def pushValidateResult(proxy, success, latency):
@@ -76,10 +94,10 @@ def pushValidateResult(proxy, success, latency):
7694
success : True/False,验证是否成功
7795
latency : 本次验证所用的时间(单位毫秒)
7896
"""
79-
time.sleep(0.01) # 为了解决并发读写饿死的问题
80-
8197
p = proxy
8298
should_remove = p.validate(success, latency)
99+
conn_lock.acquire()
100+
proc_lock.acquire()
83101
if should_remove:
84102
conn.execute('DELETE FROM proxies WHERE protocol=? AND ip=? AND port=?', (p.protocol, p.ip, p.port))
85103
else:
@@ -92,19 +110,25 @@ def pushValidateResult(proxy, success, latency):
92110
p.protocol, p.ip, p.port
93111
))
94112
conn.commit()
113+
conn_lock.release()
114+
proc_lock.release()
95115

96116
def getValidatedRandom(max_count):
97117
"""
98118
从通过了验证的代理中,随机选择max_count个代理返回
99119
max_count<=0表示不做数量限制
100120
返回 : list[Proxy]
101121
"""
122+
conn_lock.acquire()
123+
proc_lock.acquire()
102124
if max_count > 0:
103125
r = conn.execute('SELECT * FROM proxies WHERE validated=? ORDER BY RANDOM() LIMIT ?', (True, max_count))
104126
else:
105127
r = conn.execute('SELECT * FROM proxies WHERE validated=? ORDER BY RANDOM()', (True,))
106128
proxies = [Proxy.decode(row) for row in r]
107129
r.close()
130+
conn_lock.release()
131+
proc_lock.release()
108132
return proxies
109133

110134
def pushFetcherResult(name, proxies_cnt):
@@ -113,8 +137,8 @@ def pushFetcherResult(name, proxies_cnt):
113137
name : 爬取器的名称
114138
proxies_cnt : 本次爬取到的代理数量
115139
"""
116-
time.sleep(0.1) # 为了解决并发读写饿死的问题
117-
140+
conn_lock.acquire()
141+
proc_lock.acquire()
118142
c = conn.cursor()
119143
c.execute('BEGIN EXCLUSIVE TRANSACTION;')
120144
c.execute('SELECT * FROM fetchers WHERE name=?', (name,))
@@ -131,13 +155,17 @@ def pushFetcherResult(name, proxies_cnt):
131155
))
132156
c.close()
133157
conn.commit()
158+
conn_lock.release()
159+
proc_lock.release()
134160

135161
def pushFetcherEnable(name, enable):
136162
"""
137163
设置是否启用对应爬取器,被禁用的爬取器将不会被运行
138164
name : 爬取器的名称
139165
enable : True/False, 是否启用
140166
"""
167+
conn_lock.acquire()
168+
proc_lock.acquire()
141169
c = conn.cursor()
142170
c.execute('BEGIN EXCLUSIVE TRANSACTION;')
143171
c.execute('SELECT * FROM fetchers WHERE name=?', (name,))
@@ -152,25 +180,35 @@ def pushFetcherEnable(name, enable):
152180
))
153181
c.close()
154182
conn.commit()
183+
conn_lock.release()
184+
proc_lock.release()
155185

156186
def getAllFetchers():
157187
"""
158188
获取所有的爬取器以及状态
159189
返回 : list[Fetcher]
160190
"""
191+
conn_lock.acquire()
192+
proc_lock.acquire()
161193
r = conn.execute('SELECT * FROM fetchers')
162194
fetchers = [Fetcher.decode(row) for row in r]
163195
r.close()
196+
conn_lock.release()
197+
proc_lock.release()
164198
return fetchers
165199

166200
def getFetcher(name):
167201
"""
168202
获取指定爬取器以及状态
169203
返回 : Fetcher
170204
"""
205+
conn_lock.acquire()
206+
proc_lock.acquire()
171207
r = conn.execute('SELECT * FROM fetchers WHERE name=?', (name,))
172208
row = r.fetchone()
173209
r.close()
210+
conn_lock.release()
211+
proc_lock.release()
174212
if row is None:
175213
return None
176214
else:
@@ -182,16 +220,22 @@ def getProxyCount(fetcher_name):
182220
fetcher_name : 爬取器名称
183221
返回 : int
184222
"""
223+
conn_lock.acquire()
224+
proc_lock.acquire()
185225
r = conn.execute('SELECT count(*) FROM proxies WHERE fetcher_name=?', (fetcher_name,))
186226
cnt = r.fetchone()[0]
187227
r.close()
228+
conn_lock.release()
229+
proc_lock.release()
188230
return cnt
189231

190232
def getProxiesStatus():
191233
"""
192234
获取代理状态,包括`全部代理数量`,`当前可用代理数量`,`等待验证代理数量`
193235
返回 : dict
194236
"""
237+
conn_lock.acquire()
238+
proc_lock.acquire()
195239
r = conn.execute('SELECT count(*) FROM proxies')
196240
sum_proxies_cnt = r.fetchone()[0]
197241
r.close()
@@ -203,7 +247,8 @@ def getProxiesStatus():
203247
r = conn.execute('SELECT count(*) FROM proxies WHERE to_validate_date<=?', (datetime.datetime.now(),))
204248
pending_proxies_cnt = r.fetchone()[0]
205249
r.close()
206-
250+
conn_lock.release()
251+
proc_lock.release()
207252
return dict(
208253
sum_proxies_cnt=sum_proxies_cnt,
209254
validated_proxies_cnt=validated_proxies_cnt,
@@ -214,8 +259,12 @@ def pushClearFetchersStatus():
214259
"""
215260
清空爬取器的统计信息,包括sum_proxies_cnt,last_proxies_cnt,last_fetch_date
216261
"""
262+
conn_lock.acquire()
263+
proc_lock.acquire()
217264
c = conn.cursor()
218265
c.execute('BEGIN EXCLUSIVE TRANSACTION;')
219266
c.execute('UPDATE fetchers SET sum_proxies_cnt=?, last_proxies_cnt=?, last_fetch_date=?', (0, 0, None))
220267
c.close()
221268
conn.commit()
269+
conn_lock.release()
270+
proc_lock.release()

main.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@
66
import time
77
from proc import run_fetcher, run_validator
88
from api import api
9+
import multiprocessing
10+
11+
# 进程锁
12+
proc_lock = multiprocessing.Lock()
913

1014
class Item:
1115
def __init__(self, target, name):
@@ -23,7 +27,7 @@ def main():
2327
while True:
2428
for p in processes:
2529
if p.process is None:
26-
p.process = Process(target=p.target, name=p.name, daemon=False)
30+
p.process = Process(target=p.target, name=p.name, daemon=False, args=(proc_lock, ))
2731
p.process.start()
2832
print(f'启动{p.name}进程,pid={p.process.pid}')
2933
p.start_time = time.time()
@@ -56,7 +60,7 @@ def citest():
5660
p.process.start()
5761
print(f'running {p.name}, pid={p.process.pid}')
5862
p.start_time = time.time()
59-
63+
6064
time.sleep(10)
6165

6266
for p in processes:

proc/run_fetcher.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@
1111
from db import conn
1212
from fetchers import fetchers
1313
from config import PROC_FETCHER_SLEEP
14+
from func_timeout import func_set_timeout
15+
from func_timeout.exceptions import FunctionTimedOut
1416

1517
logging.basicConfig(stream=sys.stdout, format="%(asctime)s-%(levelname)s:%(name)s:%(message)s", level='INFO')
1618

17-
def main():
19+
def main(proc_lock):
1820
"""
1921
定时运行爬取器
2022
主要逻辑:
@@ -27,6 +29,8 @@ def main():
2729
睡眠一段时间
2830
"""
2931
logger = logging.getLogger('fetcher')
32+
conn.set_proc_lock(proc_lock)
33+
3034
while True:
3135
logger.info('开始运行一轮爬取器')
3236
status = conn.getProxiesStatus()
@@ -35,19 +39,27 @@ def main():
3539
time.sleep(PROC_FETCHER_SLEEP)
3640
continue
3741

42+
@func_set_timeout(30)
43+
def fetch_worker(fetcher):
44+
f = fetcher()
45+
proxies = f.fetch()
46+
return proxies
47+
3848
def run_thread(name, fetcher, que):
3949
"""
4050
name: 爬取器名称
4151
fetcher: 爬取器class
4252
que: 队列,用于返回数据
4353
"""
4454
try:
45-
f = fetcher()
46-
proxies = f.fetch()
55+
proxies = fetch_worker(fetcher)
4756
que.put((name, proxies))
4857
except Exception as e:
4958
logger.error(f'运行爬取器{name}出错:' + str(e))
5059
que.put((name, []))
60+
except FunctionTimedOut:
61+
pass
62+
5163
threads = []
5264
que = Queue()
5365
for item in fetchers:
@@ -61,8 +73,7 @@ def run_thread(name, fetcher, que):
6173
threads.append(threading.Thread(target=run_thread, args=(item.name, item.fetcher, que)))
6274
[t.start() for t in threads]
6375
[t.join() for t in threads]
64-
for _ in range(len(threads)):
65-
assert not que.empty()
76+
while not que.empty():
6677
fetcher_name, proxies = que.get()
6778
for proxy in proxies:
6879
conn.pushNewFetch(fetcher_name, proxy[0], proxy[1], proxy[2])

0 commit comments

Comments
 (0)