-
-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathprice.ts
42 lines (32 loc) · 1.08 KB
/
price.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
const {Spot} = require('@binance/connector')
import {Kafka, logLevel} from 'kafkajs'
import {KafkaTopics} from './events'
const BTC_USDT_TICKER = 'btcusdt'
const ETH_USDT_TICKER = 'ethusdt'
const KAFKA_BROKER = process.env.KAFKA_BROKER!
const client = new Spot()
const kafka = new Kafka({brokers: [KAFKA_BROKER], logLevel: logLevel.ERROR})
const producer = kafka.producer()
async function main() {
await producer.connect()
const callbacks = {
message: async (json: string) => {
const {stream, data} = JSON.parse(json)
const currency = stream.split('usdt@ticker')[0]
const price = Number(data.c)
const payload = JSON.stringify({price})
await producer.send({
topic: KafkaTopics.CurrencyPrice,
messages: [{key: currency, value: payload}],
})
},
}
const wsRef = client.combinedStreams([`${BTC_USDT_TICKER}@ticker`, `${ETH_USDT_TICKER}@ticker`], callbacks)
process.on('SIGTERM', async () => {
client.unsubscribe(wsRef)
await producer.disconnect()
process.exit(0)
})
console.log('Started successfully')
}
main()