Continuously Aggregating Trade Data in TimescaleDB

In a previous blog post we learned how to stream live trade data straight from Binance for any symbol pair that we want. Now let's learn how to aggregate those trades into our familiar OHLC candlesticks that we can use to calculate indicators.

The reason we'd want to aggregate the trades is primarily for speed. If our bot is only going to use 1-hour candlesticks it's quite wasteful to be re-computing the candlesticks every time our trading logic runs. But if we can get our database to aggregate it behind the scenes as the data comes in, it saves us all that work.

To make that happen we're going to be making use of Timescale's continuous aggregation functionality. In essence, it lets us define a query to aggregate our trade data, and keeps running that query at regular intervals to ensure that it's up to date with newer data. When you query from the table, it makes sure that all data in the original table has been aggregated.

Creating our continuous aggregate

I'll be assuming that you've already got Timescale set up with trade data streaming to your liking. If not, why not try out our previous tutorial on streaming data from Binance to get you started?

Here's a quick peek at our trade table. If your table schema is different you'll have to make some adjustments in the queries we apply to reflect that.

        time         | symbol |   price   | quantity
---------------------+--------+-----------+----------
 2022-02-01 16:22:01 | SOLBTC | 0.0028192 |     1.45
 2022-02-01 16:22:01 | SOLBTC | 0.0028192 |     0.62
 2022-02-01 16:22:01 | ETHBTC |  0.071538 |   0.2568

Let's head on over to our psql prompt where we'll be staying for the remainder of the tutorial. I'm using docker as I showed in our previous tutorial, so I just have to call

docker exec -it tutorial psql -U postgres

in a terminal to enter our container. Here, tutorial is the name of the container with Timescale installed.

Once you're up and running the syntax to create our continuous aggregate looks like:

CREATE MATERIALIZED VIEW ohlc_data_minute
WITH (timescaledb.continuous) AS
SELECT symbol,
   time_bucket(INTERVAL '1 minute', time) AS date,
   FIRST(price, time) AS open,
   MAX(price) AS high,
   MIN(price) AS low,
   LAST(price, time) AS close,
   SUM(quantity) AS volume
FROM raw_trade_data
GROUP BY symbol, date;

If you have any extra columns in your raw trade info that you'd like to aggregate you can easily just add them on here as an extra line. Most of the common aggregation functions like SUM, MAX, MIN, etc. work great here.

One thing to watch for is your GROUP BY clause at the end. We want to be sure to group by both symbol and date, as obviously we don't want ETH price data in our BTC candles!

Assuming everything has gone ok, it should have run the aggregation once on all the data currently in the table. So if you run the following:

SELECT * FROM ohlc_data_minute WHERE symbol='ETHBTC' ORDER BY date DESC LIMIT 10;

It should return some nice bucketed ETHBTC 1-minute candlesticks. Change the symbol as you wish to whatever kind of data you're storing.
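One way to sanity-check the result is to run the same aggregation directly against the raw table and compare the rows by eye. This is only a sketch: it assumes the raw_trade_data schema shown earlier, and the one-hour lookback is an arbitrary choice to keep the scan small.

```sql
-- Recompute recent ETHBTC candles straight from the raw trades.
-- The rows should match ohlc_data_minute for the same minutes.
SELECT symbol,
   time_bucket(INTERVAL '1 minute', time) AS date,
   FIRST(price, time) AS open,
   MAX(price) AS high,
   MIN(price) AS low,
   LAST(price, time) AS close,
   SUM(quantity) AS volume
FROM raw_trade_data
WHERE symbol = 'ETHBTC'
  AND time > now() - INTERVAL '1 hour'  -- arbitrary lookback
GROUP BY symbol, date
ORDER BY date DESC
LIMIT 10;
```

If the two result sets disagree for a minute that is still in progress, that is expected: new trades keep arriving until the bucket closes.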

Refreshing Continuous Aggregates

When we create the view, the aggregation runs only once. If the view is never refreshed again, queries have to compute the candles for all newer data from the raw trades every time you request them, leaving us no better off than when we started. You can refresh a view manually, but it's far better to refresh it automatically on a schedule. To do that we can create the following job:

SELECT add_continuous_aggregate_policy('ohlc_data_minute',
     start_offset => INTERVAL '1 day',
     end_offset => INTERVAL '1 minute',
     schedule_interval => INTERVAL '1 minute');

The parameter names here can be a little confusing. Timescale published a video explaining how they work, as the documentation is a little sparse on the matter.

Briefly, the offset parameters determine how much data the aggregate is going to look at each time it refreshes. In this case it's going to look at data from 1 day ago to 1 minute ago. Data less than a minute old or more than a day old is ignored by the refresh. Double check that the window between start_offset and end_offset covers at least two bucket widths, otherwise Timescale will reject the policy! So for me that's at least two minutes.

The schedule_interval is how often the job is run. Setting this to one bucket width is normally adequate.
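To confirm the policy was registered with the offsets you intended, you can look at Timescale's informational views. The sketch below assumes TimescaleDB 2.x, where timescaledb_information.jobs and timescaledb_information.job_stats are available.

```sql
-- List refresh policies along with their schedule and offsets.
-- The config column holds start_offset and end_offset as JSON.
SELECT job_id, schedule_interval, config
FROM timescaledb_information.jobs
WHERE proc_name = 'policy_refresh_continuous_aggregate';

-- Check when each job last ran and whether it succeeded.
SELECT job_id, last_run_status, last_successful_finish, next_start
FROM timescaledb_information.job_stats;
```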

I'd also make sure that you're not regularly expecting any new data older than start_offset. Since we're streaming live trade data here it's extremely unlikely we're going to get data more than a day old, so there's no need for the continuous aggregate to look at data more than a day old.
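If you ever do load trades older than start_offset, for example when backfilling history, the policy will not pick them up. In that case a one-off manual refresh over the affected range is an option. The dates below are placeholders, not values from this tutorial.

```sql
-- Re-materialize the candles for one day of trades.
-- Replace the placeholder dates with the range you backfilled.
CALL refresh_continuous_aggregate('ohlc_data_minute', '2022-01-01', '2022-01-02');
```

Note that this is a procedure, so it is invoked with CALL rather than SELECT, and it cannot be run inside a transaction block.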

If you recall our query to check the candles:

SELECT * FROM ohlc_data_minute WHERE symbol='ETHBTC' ORDER BY date DESC LIMIT 10;

you should see that the candles keep updating. Run it a few times per second and you'll see the latest candle change as new trade data comes in.

 symbol |        date         |   open   |   high   |   low    |  close   | volume
--------+---------------------+----------+----------+----------+----------+--------
 ETHBTC | 2022-02-01 16:41:00 | 0.071623 | 0.071636 | 0.071619 | 0.071619 |  3.007
 ETHBTC | 2022-02-01 16:40:00 | 0.071599 |  0.07164 | 0.071599 | 0.071617 | 40.602
 ETHBTC | 2022-02-01 16:39:00 | 0.071595 | 0.071629 | 0.071593 | 0.071599 |  79.51
 ETHBTC | 2022-02-01 16:38:00 | 0.071577 | 0.071628 | 0.071566 | 0.071594 | 28.516
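Seeing the newest candle change between policy runs depends on real-time aggregation, where the view combines the materialized buckets with not-yet-materialized raw trades at query time. As far as I'm aware this was on by default in older TimescaleDB 2.x releases and is off by default from 2.13 onward, so if your latest candle lags behind, something like the following may be needed. Treat it as a sketch to check against the docs for your version.

```sql
-- Include not-yet-materialized trades when querying the view.
ALTER MATERIALIZED VIEW ohlc_data_minute
    SET (timescaledb.materialized_only = false);
```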

That's it! If you need lightning-fast access to multiple time-frames, you can create another continuous aggregate for the 1-day or 1-hour candles as well.
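As a rough sketch, a 1-hour aggregate is the same query with a wider bucket. The view name ohlc_data_hour and the policy offsets here are my own choices for illustration, so adjust them to suit your data.

```sql
-- Hypothetical view name; same query as the minute candles, wider bucket.
CREATE MATERIALIZED VIEW ohlc_data_hour
WITH (timescaledb.continuous) AS
SELECT symbol,
   time_bucket(INTERVAL '1 hour', time) AS date,
   FIRST(price, time) AS open,
   MAX(price) AS high,
   MIN(price) AS low,
   LAST(price, time) AS close,
   SUM(quantity) AS volume
FROM raw_trade_data
GROUP BY symbol, date;

-- Refresh hourly; the window from 1 day ago to 1 hour ago
-- spans many one-hour buckets.
SELECT add_continuous_aggregate_policy('ohlc_data_hour',
     start_offset => INTERVAL '1 day',
     end_offset => INTERVAL '1 hour',
     schedule_interval => INTERVAL '1 hour');
```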

From the perspective of your bot, they just look like regular Postgres tables. If you don't need to store data indefinitely, you might want to add a retention policy to get rid of old data and save disk space. For example,

SELECT add_retention_policy('raw_trade_data', INTERVAL '10 days');

will tell Timescale to delete all trade data older than 10 days. You can use retention policies on continuous aggregate tables as well.
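For the aggregate itself, the call has the same shape. The 90-day figure below is an arbitrary example, not a recommendation.

```sql
-- Keep one-minute candles for 90 days (arbitrary figure).
SELECT add_retention_policy('ohlc_data_minute', INTERVAL '90 days');
```

One thing to keep in mind: the raw-data retention period should stay longer than the refresh policy's start_offset. If a refresh covers a window whose raw trades have already been dropped, the candles for that window can be removed too. With a 10-day retention and a 1-day start_offset, as in this tutorial, that is not a concern.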

Video Tutorial

If you'd prefer a video tutorial, you can check out this free course on our YouTube channel.