-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathindex.js
67 lines (58 loc) · 1.77 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
"use strict";
const config = require('./config'),
{ logger } = require('./lib'),
TwitterStream = require('./lib/twitter'),
StorageProvider = require('./lib/storage'),
storage = new StorageProvider(config.get('storage'));
function startStream() {
const maxBuffer = 100;
const max = maxBuffer * 1000; // 1,000 batches @ 100/batch
let count = 0;
let pubsubBuffer = [];
let promiseBuffer = [];
logger.info('begin listening to stream');
const stream = new TwitterStream(config.get("twitter"));
const params = {
with: 'user',
// some keywords to track in the twitter stream
track: [
'nfl',
'super bowl',
'superbowl',
'football',
'minnesota',
'olympics',
'winter olympics',
]
};
//create stream
stream.stream(params);
stream.on('error', function(err) {
logger.error(err);
});
// listen stream data
stream.on('data', function(json) {
if (count++ < max) {
if (pubsubBuffer.length < maxBuffer - 1) {
pubsubBuffer.push(json);
}
else {
let response = pubsubBuffer.concat([]);
pubsubBuffer = [];
/* // uncomment if you would like to save this data locally
fs.writeFileSync(path.resolve(process.cwd(), 'test/artifacts/twitter/',
`${Date.now()}.bq.json`), response.map((event) => JSON.stringify(event)).join('\n'));
*/
logger.info('batch buffer full - flush and write batch to storage');
promiseBuffer.push(storage.save(`twitter_stream/${Date.now()}.bq.json`, response));
}
}
if (count == max) {
logger.info('over stream events max - wait for write batches to finish then exit');
stream.destroy();
Promise.all(promiseBuffer)
.then((lst) => process.exit());
}
});
}
startStream();