Building a real time quant trading engine on Google Cloud Dataflow and Apache Beam

Google Cloud offers fully managed services that let end users build big data pipelines for their analytical needs. One of them is Dataflow, which lets developers build data pipelines with the Apache Beam SDK. It is fully managed in that gCloud takes care of autoscaling distributed worker resources as well as redistributing workloads among the available nodes. Together with Apache Beam, it provides a unified programming model for both batch and streaming data. It also integrates conveniently with other gCloud offerings such as Pub/Sub and BigQuery.

In this post, we are going to build a data pipeline that analyzes real time stock tick data streamed from gCloud Pub/Sub, runs them through a pair correlation trading algorithm, and outputs trading signals onto Pub/Sub for execution.

Disclaimer: this post is intended to experiment with building real time data analytics on Dataflow, not to teach how to code quant trading strategies. Do not use this trading strategy for real trading without proper risk management.

A little background on the trading strategy before going into the details of the pipeline. Our trading algorithm is based on the assumption that similar stocks move in tandem in the long term. For example, the prices of large cap tech stocks like Google, Apple and Facebook move in a correlated way. If for any reason they are no longer correlated, it might be caused by an unusual market event. That gives us an opportunity to open a long/short position on the pair of stocks and close that position when the correlation reverts to its mean. It’s a simple mean-reversion strategy for trading pairs of stocks.

For example, using a sliding window, we can calculate the correlation of the most recent X tick prices of stocks A and B every Y minutes. Under normal market conditions, A and B should be positively correlated, which means their correlation should be close to 1. If for any reason their correlation becomes negative and falls below a threshold, that is our trading signal to open a position that is long stock A and short stock B. We continue to monitor A and B until they become positively correlated again, at which point we can close the position. As in any long/short trading strategy, the pair can also move in adverse ways. Strict risk management triggers are put in place to prevent unconstrained loss, but that’s tangential to the focus of this post.
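To make the rule concrete, here is a minimal sketch in plain Python. The helper names `pearson` and `is_entry_signal`, the sample prices and the threshold of -0.5 are all assumptions made for illustration; the post does not state a threshold value.

```python
import math


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length price lists."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)


CORRELATION_THRESHOLD = -0.5  # hypothetical value, not taken from the post


def is_entry_signal(prices_a, prices_b, threshold=CORRELATION_THRESHOLD):
    """True when the pair's correlation has dropped below the threshold."""
    return pearson(prices_a, prices_b) < threshold


# Made-up prices: the first pair moves in tandem, the second pair diverges.
in_tandem = is_entry_signal([10.0, 10.5, 11.0, 11.5], [20.0, 21.0, 22.0, 23.0])
diverging = is_entry_signal([10.0, 10.5, 11.0, 11.5], [23.0, 22.0, 21.0, 20.0])
```

Only the diverging pair would open a position under this rule; the pipeline applies the same comparison per sliding window.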

Our Dataflow based trading pipeline looks like this


The Apache Beam SDK allows us to construct a data pipeline on gCloud Dataflow using a unified programming model. At the heart of it is the concept of a PCollection. A PCollection is an immutable container for a data set. It is the input and output of every transform in the pipeline. It can be bounded, which means the size of the data set is known, e.g. file IO from a text file or JDBC IO from a database query. It can also be unbounded, which is the case with streaming data, e.g. Kafka IO from messages off of a queue.

The Beam SDK also offers several generic Transforms that work with PCollections. They are based on the Map/Reduce programming model. In this post we will use

  1. ParDo: a generic Map transform for parallel processing that can produce 0, 1 or N outputs per input. In this exercise we use ParDo to split our incoming individual tick data streams into pairs of streams so their correlation can be calculated on the fly. If there are 10 tick streams, this step emits every tick once per pair its symbol belongs to (9 pairs each), covering 45 distinct pairs in total. We also use it to add a timestamp to each tick so that ticks can be grouped in the same window.

  2. CoGroupByKey: joins two or more key/value PCollections by the same key. In our exercise we use CoGroupByKey to group tick data by assigned pair, with the ticks of each stock symbol kept separately.

  3. Filter: filters an input PCollection using a lambda or function.

  4. Map: applies a 1-to-1 transform to each data point of an input PCollection.

  5. WindowInto: for unbounded tick stream data, we use a sliding window to group together 10 minutes worth of tick data every 1 minute. Correlation is calculated per window worth of data.
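With a 10 minute window that slides every minute, each tick belongs to 10 overlapping windows. The sketch below reproduces that arithmetic in plain Python. It illustrates the idea and is not Beam's implementation; the function name `sliding_windows` and the sample timestamp are assumptions.

```python
def sliding_windows(timestamp, size=600, period=60):
    """Return the [start, end) windows, in seconds, that contain `timestamp`."""
    # Start of the most recent window that begins at or before the timestamp.
    last_start = timestamp - (timestamp % period)
    # Walk backwards one period at a time while the window still covers it.
    starts = range(last_start, timestamp - size, -period)
    return [(start, start + size) for start in starts]


# A tick at t=1000s falls into 10 windows, from [960, 1560) back to [420, 1020).
windows = sliding_windows(1000)
```

This is why one tick contributes to 10 consecutive correlation calculations downstream.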

Now let’s take a look at some code.

First we define our universe of stock symbols and their pairwise combinations, then set up the options for our Beam pipeline.

import datetime
import itertools

import apache_beam as beam
import pandas as pd
from apache_beam.options.pipeline_options import (
    PipelineOptions, SetupOptions, StandardOptions)
from apache_beam.transforms import window

# Universe of 3 stocks, 3 pairwise combinations
TICKER_LIST = ['goog', 'aapl', 'fb']
PAIRS = list(itertools.combinations(TICKER_LIST, 2))

known_args, pipeline_args = parser.parse_known_args(argv)
# We use the save_main_session option because one or more DoFn's in this
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
pipeline_options.view_as(StandardOptions).streaming = True
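The snippet above relies on a `parser` and an `argv` that are not shown. A minimal sketch of what the parser might look like follows. The flag names are taken from the run command at the end of this post, while the default, the `required` settings and the helper name `build_parser` are assumptions.

```python
import argparse


def build_parser():
    """Parser for the pipeline's own flags; everything else goes to Beam."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_mode', default='stream')
    parser.add_argument('--input_topic', required=True)
    parser.add_argument('--output_topic', required=True)
    return parser


# Example command line; 'my-project' is a placeholder project id.
argv = ['--input_topic', 'tick_data_input',
        '--output_topic', 'trading_signal',
        '--runner', 'DataflowRunner',
        '--project', 'my-project']

# Flags the parser knows land in known_args; the rest are left for PipelineOptions.
known_args, pipeline_args = build_parser().parse_known_args(argv)
```

Splitting the arguments this way keeps the pipeline's own flags separate from runner flags such as `--runner` and `--project`.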

Then for each stock we have tick market data streamed in via gCloud Pub/Sub on topics in the form of “input_<symbol>”. Each tick data is just a tuple of timestamp and price. We then construct the pipeline by applying transforms one after another.

with beam.Pipeline(options=pipeline_options) as p:
   # Read input
   input_stage = {}
   for ticker in TICKER_LIST:
       input_ticks = (p |

       input_stage[ticker] = (input_ticks
             | 'Decode: %s' % ticker >> beam.Map(lambda x: x.decode('utf-8'))
             | 'Filter: %s' % ticker >> beam.Filter(
                   lambda row: row.split(',')[0] != 'date')
             | 'Add Timestamp: %s' % ticker >> beam.ParDo(AddTimestampDoFn())
             | 'Window: %s' % ticker >> beam.WindowInto(
                  window.SlidingWindows(size=10 * 60, period=60))
             | 'Pair: %s' % ticker >> beam.ParDo(CorrelationPairDoFn(ticker)))

   # Group together all entries under the same pairs of symbols
   grouped = input_stage | 'group_by_name' >> beam.CoGroupByKey()

   # Calculate windowed pairwise correlation for last 10 min every minute
   correlations = (grouped
                   | 'Calculate pair correlation' >> beam.Map(calculate_correlation_pair))
   # Keep only the pairs whose correlation is below the trigger threshold
   trading_signals = (correlations | 'Filter correlation threshold' >> beam.Filter(
                                       lambda x: x[1] < CORRELATION_THRESHOLD))

   # Output trading signals to Pub/Sub so execution algos can trade
   trading_signals |

There is a lot to unpack here. Let’s go through one pipe at a time.

1. Read the tick data stream from gCloud Pub/Sub for each stock in the universe. We assume tick data is published onto topics in the form of “input_<symbol>”, e.g. input_goog has tick data available for Google. Tick data is a tuple of timestamp and price. Because Python is dynamically typed, Beam’s Python SDK relies on type hints for type checking. In this case we tell the runtime that the data read from Pub/Sub is of binary type

input_ticks = (p |
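The read step is cut off in the listing above. As a rough sketch of how such a step can be written, the following uses Beam's `ReadFromPubSub` source with a `bytes` output type hint. The helper names, the `project` argument and the full topic path layout are assumptions, not the post's actual code.

```python
def input_topic_path(project, ticker):
    """Full Pub/Sub topic path for one ticker, e.g. .../topics/input_goog."""
    return 'projects/{}/topics/input_{}'.format(project, ticker)


def read_ticks(pipeline, project, ticker):
    """Attach an unbounded Pub/Sub source for `ticker` to `pipeline`."""
    # Imported here so the path helper above can be used without Beam installed.
    import apache_beam as beam
    return (pipeline
            | 'Read: %s' % ticker >> beam.io.ReadFromPubSub(
                topic=input_topic_path(project, ticker)).with_output_types(bytes))


# 'my-project' is a placeholder project id.
path = input_topic_path('my-project', 'goog')
```

Each element produced by such a source is a raw message payload, which is why the next step decodes it.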

2. We then apply a Map transform and a Filter to decode the bytes into strings and filter out header rows (lines whose first field is 'date'), if any

(input_ticks | 'Decode: %s' % ticker >> beam.Map(lambda x: x.decode('utf-8'))
             | 'Filter: %s' % ticker >> beam.Filter(lambda row: row.split(',')[0] != 'date'))

3. Then we apply a ParDo transform to add a timestamp to each data point. The timestamp is the same as the timestamp of the tick data. This is necessary to make sliding windows work on an unbounded PCollection. AddTimestampDoFn extends beam.DoFn and overrides its process method.

input_ticks | 'Add Timestamp: %s' % ticker >> beam.ParDo(AddTimestampDoFn())

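class AddTimestampDoFn(beam.DoFn):
   """Attach each tick's own trade date to the element as its event timestamp."""

   def process(self, element, *args, **kwargs):
       trade_date = element.split(',')[0]
       # timestamp() is portable, unlike strftime("%s"); a naive datetime is read as local time
       unix_timestamp = int(datetime.datetime.strptime(trade_date, '%Y/%m/%d').timestamp())
       yield beam.window.TimestampedValue(element, unix_timestamp)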
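The conversion inside the DoFn can be tried on its own. The sketch below is a variant that pins the parsed date to UTC so the result does not depend on the machine's time zone, which is a deliberate difference from the DoFn above. The function name and the sample tick line are assumptions.

```python
import datetime


def tick_to_unix_seconds(line):
    """Parse the leading 'YYYY/MM/DD' field of a CSV tick line as UTC midnight."""
    trade_date = line.split(',')[0]
    parsed = datetime.datetime.strptime(trade_date, '%Y/%m/%d')
    # Make the datetime timezone-aware so timestamp() ignores the local zone.
    return int(parsed.replace(tzinfo=datetime.timezone.utc).timestamp())


# Made-up tick line in the "date,price" layout the pipeline expects.
ts = tick_to_unix_seconds('2018/01/02,100.0')
```

Whichever time zone is chosen, every tick stream has to use the same one so that ticks of both stocks land in the same windows.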

4. Then we apply a sliding window to the data stream. The sliding window is calculated every minute for 10 minutes worth of tick data for each symbol. This windowing affects any grouping by key operations downstream in the pipeline

input_ticks | 'Window: %s' % ticker >> beam.WindowInto(
                 window.SlidingWindows(size=10 * 60, period=60))

5. Now comes the interesting part. We apply another ParDo to pair up the individual tick data streams and output multiple streams of paired data. Our ParDo function follows the same template as the add timestamp ParDo in step 3. Note that in the output streams, the key is no longer an individual stock symbol but a pair of symbols

input_ticks | 'Pair: %s' % ticker >> beam.ParDo(CorrelationPairDoFn(ticker)))

class CorrelationPairDoFn(beam.DoFn):
   """Emit each tick once for every pair in PAIRS that contains this DoFn's ticker."""

   def __init__(self, ticker):
       super(CorrelationPairDoFn, self).__init__()
       self.ticker = ticker

   def process(self, element, *args, **kwargs):
       fields = element.split(',')
       dt, price = fields[:2]

       # Key the tick by every pair that includes this ticker
       for pair in PAIRS:
           if self.ticker in pair:
               yield pair, (dt, price)
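The fan-out can be checked without Beam. This small sketch lists the pair keys that each ticker's ticks are emitted under; the helper name `pair_keys` is an assumption.

```python
import itertools

TICKER_LIST = ['goog', 'aapl', 'fb']
PAIRS = list(itertools.combinations(TICKER_LIST, 2))


def pair_keys(ticker, pairs=PAIRS):
    """Pairs that a tick for `ticker` is emitted under."""
    return [pair for pair in pairs if ticker in pair]


# Each of the 3 tickers belongs to 2 of the 3 pairs.
fan_out = {ticker: pair_keys(ticker) for ticker in TICKER_LIST}

# For a universe of 10 tickers the same construction yields 45 pairs.
pairs_for_ten = len(list(itertools.combinations(range(10), 2)))
```

The number of pairs grows quadratically with the universe size, which is worth keeping in mind when sizing the pipeline.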

6. Now that we have paired up our input streams, we can group the values by the pair key so that all the market data for each symbol is in the value list. In this case our key is one of (‘goog’, ‘aapl’), (‘aapl’, ‘fb’) or (‘goog’, ‘fb’). Because input_stage is a dictionary keyed by ticker, the value for each pair is a dictionary that maps each ticker to an iterable of its tick data in the window.

# Group together all entries under the same pair of symbols
grouped = input_stage | 'group_by_name' >> beam.CoGroupByKey()
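To see the shape of the grouped output, here is a plain Python emulation of a co-group over a dictionary of tagged inputs. It ignores windowing, which the real transform applies per key and window, and the prices are made up for illustration.

```python
from collections import defaultdict


def co_group_by_key(tagged_inputs):
    """Emulate the shape of a CoGroupByKey result for a dict of tagged inputs."""
    tags = list(tagged_inputs)
    # Every key gets one (possibly empty) list per tag.
    grouped = defaultdict(lambda: {tag: [] for tag in tags})
    for tag, records in tagged_inputs.items():
        for key, value in records:
            grouped[key][tag].append(value)
    return dict(grouped)


# One made-up tick per ticker, already keyed by pair as in step 5.
input_stage = {
    'goog': [(('goog', 'aapl'), ('2018/01/02', '100.0')),
             (('goog', 'fb'), ('2018/01/02', '100.0'))],
    'aapl': [(('goog', 'aapl'), ('2018/01/02', '50.0')),
             (('aapl', 'fb'), ('2018/01/02', '50.0'))],
    'fb': [(('goog', 'fb'), ('2018/01/02', '75.0')),
           (('aapl', 'fb'), ('2018/01/02', '75.0'))],
}
grouped = co_group_by_key(input_stage)
```

Note that the ticker that is not part of a pair still appears in that pair's value with an empty list.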

7. Now we are ready to calculate pairwise correlation in the sliding window. We construct Pandas Series out of the tick data and calculate correlation for each pair. The output is the pair key and correlation coefficient tuple.

correlations = (grouped
               | 'Calculate pair correlation' >> beam.Map(calculate_correlation_pair))

def calculate_correlation_pair(element):
   pair, price_dict = element

   # Materialize the grouped values so the emptiness check below is reliable
   prices_1, prices_2 = list(price_dict[pair[0]]), list(price_dict[pair[1]])

   if prices_1 and prices_2:
       ind, vals = zip(*prices_1)
       ser1 = pd.Series([float(v) for v in vals],
                        index=[pd.Timestamp(i) for i in ind])
       ind, vals = zip(*prices_2)
       ser2 = pd.Series([float(v) for v in vals],
                        index=[pd.Timestamp(i) for i in ind])

       return pair, ser1.corr(ser2)
   # No data for at least one leg of the pair in this window
   return pair, 0

8. The last step in the pipeline is to add a Filter that keeps the pairs whose correlation has fallen below the threshold and to output those pairs as trading signals onto gCloud Pub/Sub. Again we use type hints to ensure Python uses binary type to publish data onto Pub/Sub.

trading_signals = (correlations | 'Filter correlation threshold' >> beam.Filter(
                                   lambda x: x[1] < CORRELATION_THRESHOLD))

trading_signals |
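The publish step is cut off in the listing above. Pub/Sub messages are byte strings, so one possible shape for this step is to serialize each (pair, correlation) tuple before handing it to Beam's `WriteToPubSub` sink. The JSON layout and the helper names below are assumptions, not the post's actual code.

```python
import json


def encode_signal(signal):
    """Serialize a (pair, correlation) tuple into UTF-8 encoded JSON bytes."""
    pair, correlation = signal
    payload = {'pair': list(pair), 'correlation': correlation}
    return json.dumps(payload, sort_keys=True).encode('utf-8')


def write_signals(trading_signals, output_topic):
    """Encode the signals and publish them to the given Pub/Sub topic path."""
    # Imported here so encode_signal can be used without Beam installed.
    import apache_beam as beam
    return (trading_signals
            | 'Encode signal' >> beam.Map(encode_signal).with_output_types(bytes)
            | 'Publish signal' >> beam.io.WriteToPubSub(topic=output_topic))


# Made-up signal for illustration.
message = encode_signal((('goog', 'aapl'), -0.75))
```

A downstream execution service would decode the same JSON to learn which pair to trade.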

Now if we want to run our awesome quant trading pipeline in Google Cloud Dataflow, we can follow this Quickstart to set up our GCP project, authentication and storage. Then all we need is to run the pipeline on Dataflow using this command

python -m correlated_trading.trading_pipe \
  --project $PROJECT \
  --runner DataflowRunner \
  --staging_location $BUCKET/staging \
  --temp_location $BUCKET/temp \
  --input_mode stream \
  --input_topic tick_data_input \
  --output_topic trading_signal

In conclusion, Google Cloud Dataflow together with Apache Beam offers an expressive API for building data pipelines and transforms. The Apache Beam programming model has a learning curve when it comes to getting used to the transforms and windowing semantics. It forces users to think through each stage of the data pipeline and its input/output data types, which is a good thing in our opinion. Google Cloud handles a lot under the hood: parallelizing processing, distributing units of work and autoscaling. Dataflow also integrates with other GCP products like BigQuery, Pub/Sub and Machine Learning, which makes the ecosystem very powerful for big data analytics.

As always, you can find the full code discussed in this post on the Cloudbox Labs GitHub.