Streaming Websocket Trade Data From Binance to Timescaledb in Python

Streaming Websocket Trade Data From Binance to Timescaledb in Python

It's useful to have your own database full of price data rather than using a REST API to fetch historical data. It's significantly faster and you don't have to worry about rate limits or getting banned for making too many requests. Once you've got the data in your own DB it's yours to keep.

In this tutorial I'm going to be explaining how you can ingest the stream of trades for any crypto pair directly from Binance into Timescale, a database solution purpose-built for time-series data. After we've got the trades in our DB, timescale makes it easy for us to automatically aggregate this into OHLC data that we can use to power our algos.

We're gathering trade data because it's the most granular form of price data and we can construct any form of candlestick data that we want directly from it, for any timeframe.

Getting TimescaleDB set up with Docker

I'd highly recommend using the Docker container method to get quickly set up with Timescale. You'll want to go ahead and install Docker if you don't have it already, it'll make your life much easier for the remainder of this tutorial.

Once you're up and running with Docker, you'll want to pull the following image

1docker pull timescale/timescaledb:latest-pg14

After that's done we can easily boot up a TimescaleDB instance with:

1docker run -d --name tutorial -p 5432:5432 -e POSTGRES_PASSWORD=password timescale/timescaledb:latest-pg14

Please make sure the password is of a reasonable strength. Now you should be able to run docker ps and get something like the following:

1CONTAINER ID   IMAGE                               COMMAND                  CREATED              STATUS              PORTS                                       NAMES
2bc9e03bea6ee   timescale/timescaledb:latest-pg14   "docker-entrypoint.s…"   About a minute ago   Up About a minute   0.0.0.0:5432->5432/tcp, :::5432->5432/tcp   tutorial

That's it! You're all set up with a TimescaleDB instance that we can push our data to. If you already know the syntax for Postgres/Timescale you can have a play around inside the DB with

1docker exec -it tutorial psql -U postgres

Note on restarting your system

When you restart your system and do docker ps, you'll find that the timescale instance is no longer running. Don't worry, your data's still there, you can do docker ps -a to show containers that aren't running and then docker start tutorial to start up the tutorial container again.

Getting trade data from Binance

We'll be using the wonderful python-binance module to handle the interface between Python and Binance. You'll want to go grab a Binance API key so that you can stream the data. You can link your Github account on the testnet site to get a free API key that you can use for our purposes. Note that testnet data can differ wildly from true market data. Use API keys generated from your Binance account if you want the most accurate data.

We'll be using python-decouple to manage our API keys and make sure we don't accidentally push them to our repo. To get started create a file called .env in the same folder as you're going to put your python script. Make sure it's added to your .gitignore file if you're going to push this code to Github. The format should look like:

1api_key=aixorGU9FSQ-fake-api-key-ymc5nbfyF9WKki0J
2secret_key=lM6PUMGzj-fake-secret-key-VjMx56BqqqDpoWJFE4lZI

Now if we open up a python file, we should be able to access those variables in our script without hard-coding them in the script itself.

1from decouple import config
2
3api_key = config("api_key")
4api_secret = config("secret_key")
5
6print("api_key",api_key)
7print("api_secret",api_secret)

Make sure you've got python-decouple installed. Now we can lightly doctor an example from the documentation to get the following:

 1from decouple import config
 2from binance import ThreadedWebsocketManager
 3
 4api_key = config("api_key")
 5api_secret = config("secret_key")
 6
 7def main():
 8
 9    symbol = 'ETHBTC'
10
11    twm = ThreadedWebsocketManager(api_key=api_key, api_secret=api_secret)
12    twm.start()
13
14    def handle_socket_message(msg):
15        print(msg)
16
17    twm.start_trade_socket(callback=handle_socket_message, symbol=symbol)
18    twm.join()
19
20if __name__ == "__main__":
21   main()

If you use a different TLD than binance.com, like binance.us. You'll want to add the parameter tld="us" into our definition of twm.

1twm = ThreadedWebsocketManager(api_key=api_key, api_secret=api_secret, tld="us")

Running this should start spitting out some python dictionaries that represent individual trades on Binance.

1{'e': 'trade', 'E': 1643663740316, 's': 'ETHBTC', 't': 322784211, 'p': '0.06974600', 'q': '0.01160000', 'b': 2640005295, 'a': 2640005293, 'T': 1643663740316, 'm': False, 'M': True}
2{'e': 'trade', 'E': 1643663740638, 's': 'ETHBTC', 't': 322784212, 'p': '0.06974600', 'q': '0.38940000', 'b': 2640005302, 'a': 2640005293, 'T': 1643663740637, 'm': False, 'M': True}
3{'e': 'trade', 'E': 1643663745694, 's': 'ETHBTC', 't': 322784213, 'p': '0.06975000', 'q': '0.80000000', 'b': 2640005395, 'a': 2640005356, 'T': 1643663745693, 'm': False, 'M': True}
4{'e': 'trade', 'E': 1643663746515, 's': 'ETHBTC', 't': 322784214, 'p': '0.06975000', 'q': '0.20000000', 'b': 2640005403, 'a': 2640005356, 'T': 1643663746515, 'm': False, 'M': True}
5{'e': 'trade', 'E': 1643663747776, 's': 'ETHBTC', 't': 322784215, 'p': '0.06975300', 'q': '0.26210000', 'b': 2640005418, 'a': 2640005422, 'T': 1643663747776, 'm': True, 'M': True}

Obviously you can feel free to change the particular pair that we're following as you wish. The format of the returned objects can be found on the Binance API documentation:

 1{
 2  "e": "trade",     // Event type
 3  "E": 123456789,   // Event time
 4  "s": "BNBBTC",    // Symbol
 5  "t": 12345,       // Trade ID
 6  "p": "0.001",     // Price
 7  "q": "100",       // Quantity
 8  "b": 88,          // Buyer order ID
 9  "a": 50,          // Seller order ID
10  "T": 123456785,   // Trade time
11  "m": true,        // Is the buyer the market maker?
12  "M": true         // Ignore
13}

There we go! Live trade data coming straight from the exchange, ready for you to run your algos against it.

Streaming Trade Data for Multiple Tickers

You might want to stream data for multiple tickers. Whilst this can be done by just booting up a bunch of the same python script we just made, it's pretty inefficient. Instead let's use a multiplex socket. It just requires a slight change in our script.

1....
2def main():
3    ...
4    #twm.start_trade_socket(callback=handle_socket_message, symbol=symbol)
5    streams = ['ethbtc@trade', 'btcusdt@trade']
6    twm.start_multiplex_socket(callback=handle_socket_message, streams=streams)
7    twm.join()
8    ....

This way we can listen to as many streams as we'd like, for as many symbols as we like. This is the method I'll be using going forward. The messages received do look slightly different

1{'stream': 'btcusdt@trade', 'data': {'e': 'trade', 'E': 1643669057012, 's': 'BTCUSDT', 't': 1242399841, 'p': '38545.66000000', 'q': '0.00259000', 'b': 9217604782, 'a': 9217604705, 'T': 1643669057012, 'm': False, 'M': True}}

This can be rectified by indexing into msg when printing it out. I.e. print(msg["data"]).

Saving trade data to TimescaleDB

Now we're ready to start saving our data down to TimescaleDB. We'll be using psycopg2 to interact with Timescale (pip install psycopg2-binary). psycopg2 is built for Postgres, but since Timescale is just an addon for Postgres, we'll find it works perfectly.

Creating the Hypertable

A hypertable is specialized for time-series data. To the end-user it acts the same as a regular Posgres table. But behind the scenes it chops up your data into smaller chunks at regular time intervals. This largely frees you from memory concerns on how large a table has to be. It also ensures speedy access to old data without going through the full table.

To create one, we first have to connect to the database. If you've followed along with Docker, you'll be able to run the following in your command line

1docker exec -it tutorial psql -U postgres

Your prompt should have changed to postgres=#. To create a hypertable, we start by creating a regular old Postgres table. I'm just going to be saving down the time, symbol, price and quantity of the trade. So this table definition will do just fine.

1CREATE TABLE IF NOT EXISTS raw_trade_data (
2   time TIMESTAMP WITHOUT TIME ZONE NOT NULL,
3   symbol text NOT NULL,
4   price double PRECISION NOT NULL,
5   quantity double PRECISION NOT NULL
6);

If all has gone well, you'll get a CREATE TABLE message back and typing \dt will give you a table, with our new table as one of the rows. Postgres can be funny sometimes so you might have to try a couple of times. Finally we create the hypertable:

1SELECT create_hypertable('raw_trade_data','time');

By default it will have a chunk time of 7 days. Meaning all trades in the last 7 days will be in the most recent chunk, the optimum value for chunk size depends on your system RAM as well as the volume of data that you're ingesting.

Inserting the trades

Now all we have to do is use psycopg2 to push our trades into our table. We'll start off by adding our database password into our .env file so python can access it.

1api_key=aixorGU9FSQ-fake-api-key-ymc5nbfyF9WKki0J
2secret_key=lM6PUMGzj-fake-secret-key-VjMx56BqqqDpoWJFE4lZI
3db_pass=fake-db-password

Then we can go ahead and create a cursor in python

1...
2def main():
3    connection = psycopg2.connect(user="postgres",
4                              password=config("db_pass"),
5                              host="127.0.0.1",
6                              port="5432",
7                              database="postgres")
8    cursor = connection.cursor()
9...

If you've left all the other settings as default then the above configuration should work for you. Remember to import psycopg2. The final thing we need to do is modify our handle_socket_message function to insert our data in the appropriate format. If you used a different table layout you'll have to fiddle with the exact query wording.

 1...
 2    def handle_socket_message(msg, cursor = cursor):
 3        msg = msg["data"]
 4        print(msg)
 5
 6        query = "INSERT INTO raw_trade_data (TIME, SYMBOL, PRICE, QUANTITY)"+ \
 7                " VALUES (%s,%s,%s,%s)"
 8        timestamp = datetime.datetime.fromtimestamp(int(msg["T"] / 1000))
 9        record_to_insert = (timestamp, msg["s"], msg["p"], msg["q"])
10        cursor.execute(query, record_to_insert)
11        connection.commit()

make sure to import datetime since we're using it here to convert the epoch integer from the raw trade data to a timestamp.

If you run the script now you should get all of the data pouring into your DB. If you've still got your postgres terminal window open you could try

1SELECT * from raw_trade_table;

To see the contents of the table and that they're being properly written. To make the system more robust you might want to add in some exception handling when writing down data to the DB, just in case something breaks.

Video Tutorial

If you'd prefer a video tutorial, you can check out this free course on our youtube channel