Skip to content

Commit 0ad6c82

Browse files
committed
WIP - use streams
1 parent 44cc74c commit 0ad6c82

File tree

2 files changed

+109
-3
lines changed

2 files changed

+109
-3
lines changed

lib/request.js

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,19 @@ async function getRequest ({url, referer, options = {}, afterResponse = defaultR
3939

4040
logger.debug(`[request] sending request for url ${url}, referer ${referer}`);
4141

42-
const response = await got(requestOptions);
42+
const responseStream = got.stream(requestOptions);
43+
44+
responseStream.on('data', (chunk) => {
45+
console.log(`Received ${chunk.length} bytes of data.`);
46+
//console.log(chunk.toString());
47+
});
48+
49+
responseStream.on('response', (response) => {
50+
console.log(`Received response ${JSON.stringify(response.headers)} `);
51+
});
52+
53+
return responseStream;
54+
4355
logger.debug(`[request] received response for ${response.url}, statusCode ${response.statusCode}`);
4456
const responseHandlerResult = transformResult(await afterResponse({response}));
4557

@@ -55,5 +67,6 @@ async function getRequest ({url, referer, options = {}, afterResponse = defaultR
5567
}
5668

5769
export default {
58-
get: getRequest
70+
get: getRequest,
71+
getMimeType: (headers) => getMimeType(headers['content-type'])
5972
};

lib/scraper.js

Lines changed: 94 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
1+
import stream, { Transform, Writable } from 'stream';
2+
import fs from 'fs';
3+
import {promisify} from 'util';
14
import PromiseQueue from 'p-queue';
25
import logger from './logger.js';
36
import defaults from './config/defaults.js';
47
import recursiveSources from './config/recursive-sources.js';
58
import Resource from './resource.js';
69
import request from './request.js';
710
import ResourceHandler from './resource-handler/index.js';
11+
812
import {
913
SaveResourceToFileSystemPlugin,
1014
GenerateFilenameBySiteStructurePlugin,
@@ -36,6 +40,44 @@ const filenameGeneratorPlugins = {
3640
bySiteStructure: GenerateFilenameBySiteStructurePlugin
3741
};
3842

43+
class AfterResponseTransformStream extends Transform {
44+
constructor (options) {
45+
super(options);
46+
}
47+
48+
_transform (chunk, encoding, callback) {
49+
//console.log(chunk.toString().toUpperCase());
50+
callback(null, chunk.toString().toUpperCase());
51+
}
52+
}
53+
54+
class HandleChildrenResourcesStream extends Transform {
55+
constructor(resource) {
56+
super();
57+
this.resouce = resource;
58+
}
59+
60+
_transform(chunk, encoding, callback) {
61+
62+
}
63+
}
64+
65+
class SaveResourceWriteStream extends Writable {
66+
constructor (resource, dir) {
67+
super();
68+
this.stream = null;
69+
this.resource = resource;
70+
this.directory = dir;
71+
}
72+
73+
_write (chunk, encoding, callback) {
74+
if (!this.stream) {
75+
this.stream = fs.createWriteStream(this.directory + '/' + this.resource.getFilename());
76+
}
77+
this.stream._write(chunk, encoding, callback);
78+
}
79+
}
80+
3981
class Scraper {
4082
constructor (options) {
4183
this.normalizeOptions(options);
@@ -49,6 +91,7 @@ class Scraper {
4991
});
5092
this.resources = this.options.urls.map(({url, filename}) => new Resource(url, filename));
5193

94+
this.requestedResourcesStream = new NormalizedUrlMap(); // Map url -> stream
5295
this.requestedResourcePromises = new NormalizedUrlMap(); // Map url -> request promise
5396
this.loadedResources = new NormalizedUrlMap(); // Map url -> resource
5497
this.requestQueue = new PromiseQueue({concurrency: this.options.requestConcurrency});
@@ -137,10 +180,60 @@ class Scraper {
137180
}
138181
}
139182

140-
createNewRequest (resource) {
183+
async getRequestStream (resource) {
184+
const url = resource.getUrl();
185+
const referer = resource.parent ? resource.parent.getUrl() : null;
186+
187+
const {requestOptions} = await this.runActions('beforeRequest', {resource, requestOptions: this.options.request});
188+
return request.get({
189+
url,
190+
referer,
191+
options: requestOptions,
192+
//afterResponse: this.actions.afterResponse.length ? this.runActions.bind(this, 'afterResponse') : undefined
193+
});
194+
}
195+
196+
getSaveStream (resource) {
197+
198+
}
199+
200+
async createNewRequest (resource) {
141201
const self = this;
142202
const url = resource.getUrl();
143203

204+
// read stream stream
205+
const requestStream = await this.getRequestStream(resource);
206+
requestStream.on('response', async (response) => {
207+
const mimeType = request.getMimeType(response.headers);
208+
resource.setType(getTypeByMime(mimeType));
209+
210+
const { filename } = await self.runActions('generateFilename', { resource });
211+
resource.setFilename(filename);
212+
213+
// if type was not determined by mime we can try to get it from filename after it was generated
214+
if (!resource.getType()) {
215+
resource.setType(getTypeByFilename(filename));
216+
}
217+
});
218+
219+
// transformers
220+
const afterResponseTransformStream = new AfterResponseTransformStream();
221+
222+
// write stream
223+
fs.mkdirSync(this.options.directory);
224+
const saveResourceStream = new SaveResourceWriteStream(resource, this.options.directory);
225+
226+
const pipeline = promisify(stream.pipeline);
227+
228+
await pipeline(
229+
requestStream,
230+
afterResponseTransformStream,
231+
// handleChildResourcesStream,
232+
saveResourceStream
233+
);
234+
235+
//return this.requestQueue.add(requestStream);
236+
144237
const requestPromise = Promise.resolve()
145238
.then(async () => {
146239
const referer = resource.parent ? resource.parent.getUrl() : null;

0 commit comments

Comments
 (0)