Tracking NYC Citi Bike real-time utilization using Kafka Streams

We are big fans of Apache Kafka when it comes to building distributed real-time stream processing systems. It’s massively scalable, has simple pub-sub semantics, and offers a fault-tolerant, persistent data store. It’s a great platform for processing event stream data, e.g. click events, distributed logs, time-series metrics, etc. We wrote about building real-time analytics with Apache Kafka in a previous post.

But in most use cases, data streams are not processed in isolation. Often enough, when a Kafka message is received, some stateful data store has to be queried in order to have enough information to process that message. For example, if we have an order stream that receives customer orders, processing an order most likely requires querying a database for customer information and/or product metadata. So how does stream data work together with stateful data, like a database table?

This is where Kafka Streams comes in. Kafka Streams is an application API (currently in Java & Scala) that seamlessly integrates stateless (stream) and stateful (table) processing. The underlying premise of the design is very interesting. In short, it is based on the fact that a table can be reconstructed from a stream of change data capture (CDC) or transaction log records. If we have a stream of change logs, a table is just a local store that reflects the latest state of each change record. Armed with that concept, stream-stream and stream-table joins become a unified operation of routing data through various internal Kafka topics. By default RocksDB is used as the local state store, but other key-value databases can be used too. I encourage you to read the blog post by Confluent explaining the motivation behind the design of Kafka Streams. It was very eye-opening for me.
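
To make the order example above concrete, here is a minimal sketch of a stream-table join (the topic names, string values, and keying both topics by customer id are my own assumptions for illustration):

StreamsBuilder builder = new StreamsBuilder();

// orders arrive as an unbounded stream of events, keyed by customer id
KStream<String, String> orders = builder.stream("orders");

// customer records arrive as a changelog; the KTable keeps only the latest record per customer id
KTable<String, String> customers = builder.table("customers");

// stream-table join: each order event is enriched with the current customer record
KStream<String, String> enrichedOrders = orders.join(customers,
        (order, customer) -> order + " | " + customer);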

In this post, we are going to use Kafka Streams to track real-time statistics of Citi Bike utilization in New York City. CitiBikeNYC publishes both a historical data set containing bike trips and a real-time GBFS feed of station information and availability. While the historical data set is a treasure trove of insights, some utilization metrics are better derived from the real-time data feed. We are going to calculate two important real-time utilization metrics.

  1. Stations that currently have less than 10% of bikes available, calculated as (number of bikes available) / (station capacity). This metric can serve as a real-time notification of low bike availability.

  2. Stations that have the highest bike turnover ratio on an hourly basis, calculated in a rolling window as (net change of bikes at the station) / (station capacity). This metric can be useful for fine-tuning Citi Bike’s station rebalancing strategy.
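
To make these concrete with made-up numbers: a station with a capacity of 30 and 2 bikes currently docked has an availability ratio of 2/30 ≈ 6.7%, so it would show up under the first metric; if a net total of 20 bikes arrive at or leave that station within an hour, its hourly turnover ratio is 20/30 ≈ 66.7%.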

A visual of the data flow looks like this.

[Image: bike_arch.png]

Now let’s jump into coding.

First things first: since the Citi Bike feed is a RESTful API, we need a small Java app that polls the endpoints, turns the JSON responses into streams, and publishes them onto Kafka topics.

Samples of the station_information and station_status JSON look like this:

# station_information
{
  "station_id": "312",
  "name": "Allen St & Stanton St",
  "short_name": "5484.09",
  "lat": 40.722055,
  "lon": -73.989111,
  "region_id": 71,
  "rental_methods": [
    "KEY",
    "CREDITCARD"
  ],
  "capacity": 31,
  "rental_url": "http://app.citibikenyc.com/S6Lr/IBV092JufD?station_id=312",
  "eightd_has_key_dispenser": false,
  "has_kiosk": true
}
# station_status
{
  "station_id": "304",
  "num_bikes_available": 12,
  "num_ebikes_available": 0,
  "num_bikes_disabled": 2,
  "num_docks_available": 19,
  "num_docks_disabled": 0,
  "is_installed": 1,
  "is_renting": 1,
  "is_returning": 1,
  "last_reported": 1534424770,
  "eightd_has_available_keys": true,
  "eightd_active_station_services": [
    {
      "id": "a58d9e34-2f28-40eb-b4a6-c8c01375657a"
    }
  ]
}

We use the Google HTTP Java client to transform the HTTP response into POJOs (Plain Old Java Objects).

// parse JSON responses with the Google HTTP client's JsonObjectParser
HttpRequestFactory requestFactory =
        HTTP_TRANSPORT.createRequestFactory(new HttpRequestInitializer() {
            @Override
            public void initialize(HttpRequest request) {
                request.setParser(new JsonObjectParser(JSON_FACTORY));
            }
        });

// poll the GBFS station_status endpoint and map the JSON response onto the feed POJO
CitiBikeURL url = new CitiBikeURL(stationStatusUrl);
HttpRequest request = requestFactory.buildGetRequest(url);
StationStatusFeed stationStatus = request.execute().parseAs(StationStatusFeed.class);
if (stationStatus.data.stations.isEmpty()) {
    System.out.println("No station found.");
} else {
    System.out.println(stationStatus.data.stations.size());
}
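
For reference, the snippet above relies on a few pieces of setup that are not shown. A hedged sketch of what they might look like (class and field names are my guesses; the real versions live in the linked repo):

// HTTP transport and JSON parser shared by all requests
static final HttpTransport HTTP_TRANSPORT = new NetHttpTransport();
static final JsonFactory JSON_FACTORY = new JacksonFactory();

// GenericUrl subclass that wraps a GBFS endpoint URL
public static class CitiBikeURL extends GenericUrl {
    public CitiBikeURL(String encodedUrl) {
        super(encodedUrl);
    }
}

// feed wrapper the response is parsed into; @Key maps JSON fields onto POJO fields
public static class StationStatusFeed {
    @Key
    public Data data;

    public static class Data {
        @Key
        public List<StationStatus> stations;
    }
}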

Then we serialize the station_information and station_status objects to strings and publish them to their respective Kafka topics.

// publish station status stream to Kafka
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
props.put("value.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");

Producer<String, String> producer = new KafkaProducer<>(props);

// send events to kafka
ObjectMapper mapper = new ObjectMapper();
stationStatus.data.stations.forEach(st -> {
    try {
        String value = mapper.writeValueAsString(st);
        producer.send(new ProducerRecord<>(topicName, st.stationId, value));
    } catch (Exception e) {
        System.out.println("Failed to serialize");
        throw new RuntimeException(e.getMessage());
    }
});

producer.flush();
producer.close();

A note about serializing/deserializing messages to/from Kafka: for production-grade systems, Kafka Streams applications typically use Avro-based serialization and store the Avro schemas in the Schema Registry. I feel that adds a lot of unnecessary cruft for the purpose of this post, so I’m leaving that part out and, wherever possible, using string-based serialization. If you are curious about Avro-based SerDes (Serialization/Deserialization), check out the Avro Serdes class.
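
For the curious, a hedged sketch of what that setup might look like with Confluent’s SpecificAvroSerde (StationStatusRecord would be an Avro-generated class; none of this is used in the rest of the post):

// configure an Avro serde backed by the Schema Registry (assumed to run at localhost:8081)
final Serde<StationStatusRecord> statusSerde = new SpecificAvroSerde<>();
statusSerde.configure(
        Collections.singletonMap("schema.registry.url", "http://localhost:8081"),
        false); // false = this serde is for record values, not keys

// it can then be passed explicitly wherever a value serde is needed, e.g.
// builder.stream(topicName, Consumed.with(Serdes.String(), statusSerde));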

Now that we have data streaming in on the station_information and station_status topics, let’s look at how to find stations that have less than 10% bike availability. The full code is here.

First we construct a KafkaStreams object with a bunch of configurations, including the Kafka bootstrap servers, the local state directory on the file system, and Kafka consumer options.

final Properties streamsConfiguration = new Properties();
// unique app id on kafka cluster
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "citi-bike-low-availability");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "citi-bike-low-availability-client");
// kafka broker address
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// local state store
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
// consume from the beginning of the topic or from the last committed offset
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// override default serdes
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

StreamsBuilder builder = new StreamsBuilder();

return new KafkaStreams(builder.build(), streamsConfiguration);
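
The topology from the steps below is wired onto the builder before build() is called. Running the application then looks roughly like this (createStreams is my made-up name for the factory method above):

KafkaStreams streams = createStreams(bootstrapServers, "/tmp/kafka-streams");
streams.cleanUp();   // optional: clear local state so repeated demo runs start fresh
streams.start();

// close the streams app cleanly (and flush local state) on JVM shutdown
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));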

Then we can wire up each step of transformation as if we were building a streaming data pipeline.

  1. Build a KStream from the input Kafka topic station_status. We make heavy use of Java 8 lambda expressions, mapping the input stream data to a POJO and extracting the num_bikes_available metric (a sketch of the assumed POJO follows the snippet).

// Get the stream of station statuses
ObjectMapper mapper = new ObjectMapper();
KStream<String, String> statusStream = builder.stream(
        CitiBikeStationStatusAPI.topicName,
        Consumed.with(Serdes.String(), Serdes.String()))
        .map((station_id, v) -> {
            try {
                CitiBikeStationStatusAPI.StationStatus status = mapper.readValue(
                        v, CitiBikeStationStatusAPI.StationStatus.class);
                return new KeyValue<>(station_id, Integer.toString(status.num_bikes_available));
            } catch (Exception e) {
                throw new RuntimeException("Deserialize error" + e);
            }
        });
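
For reference, a hedged sketch of what the StationStatus POJO that Jackson maps into might look like (only the fields used here; the real class lives in the linked repo):

@JsonIgnoreProperties(ignoreUnknown = true)
public static class StationStatus {
    public String station_id;
    public int num_bikes_available;
    public int num_docks_available;
    public long last_reported;
}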

2. Build a KTable from the input Kafka topic station_information.

// Build KTable of station information
KTable<String, String> stationInfo = builder.table(CitiBikeStationInfoAPI.topicName);
KTable<String, CitiBikeStationInfoAPI.StationInfo> stationInfoTable = stationInfo
        .mapValues(v -> {
            try {
                CitiBikeStationInfoAPI.StationInfo info = mapper.readValue(
                        v, CitiBikeStationInfoAPI.StationInfo.class);
                return info;
            } catch (Exception e) {
                throw new RuntimeException("Deserialize error" + e);
            }
        });

You might ask why we use a KStream for station statuses but a KTable for station information. Remember the stream-table duality? A table is the end state of a stream of change events. In this case we want to capture every station status change, because it tells us how the number of bikes available changes over time. Station information, on the other hand, is static data: capacity, name, geolocation, etc. We only care about the latest value for each station, so a KTable lets us compact the stream of changes into a final snapshot.

3. Now comes the “what magic is going on here?” part. Let’s do a leftJoin on the KStream and KTable. Remember our metric is (num_bikes_available) / (station_capacity). The numerator comes from the station_status object but the denominator comes from the station_information object, so we need a stream-table join here. This is where the power of Kafka Streams lies. Being able to casually join an evolving data stream with a stateful local store without a mountain of code is simply awesome.

// stations with bike availability ratio (num_bikes_avail / capacity) < threshold
KStream<String, String> outputStream = statusStream
    .leftJoin(stationInfoTable, (num_bikes, info) -> {
        return new BikeStats(Integer.parseInt(num_bikes), info.capacity,
                info.latitude, info.longitude);
    })
    .filter((k, stats) -> stats.availabilityRatio < 0.1)
    .map((k, stats) -> new KeyValue<>(k, "station_id: " + k +
            ", longitude " + stats.longitude +
            ", latitude " + stats.latitude +
            ", bikes: " + stats.numBikesAvailable +
            ", capacity: " + stats.stationCapacity +
            ", ratio: " + String.format("%.2f", stats.availabilityRatio * 100) + "%"));

After the join we filter for stations whose calculated availability ratio is less than 10% and map the data to a human-readable output string.
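
For reference, a hedged sketch of the BikeStats value object used in the join (field names and types are my assumptions; the real class lives in the linked repo):

public class BikeStats {
    public final int numBikesAvailable;
    public final int stationCapacity;
    public final double latitude;
    public final double longitude;
    public final double availabilityRatio;

    public BikeStats(int numBikesAvailable, int stationCapacity, double latitude, double longitude) {
        this.numBikesAvailable = numBikesAvailable;
        this.stationCapacity = stationCapacity;
        this.latitude = latitude;
        this.longitude = longitude;
        // guard against divide-by-zero for stations that report zero capacity
        this.availabilityRatio = stationCapacity == 0
                ? 0.0
                : (double) numBikesAvailable / stationCapacity;
    }
}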

4. This step is not strictly necessary, but we finally publish the analytics onto an output Kafka topic so we can examine them.

// output to kafka topic
outputStream
    .to(LOW_BIKE_TOPIC, Produced.with(Serdes.String(), Serdes.String()));

5. If we fire up a console Kafka consumer, we can see that the output looks like this:

station_id: 3699, longitude -73.98918, latitude 40.763603, bikes: 1, capacity: 30, ratio: 3.33%
station_id: 3700, longitude -73.95334, latitude 40.779408, bikes: 3, capacity: 39, ratio: 7.69%
station_id: 3704, longitude -73.941765, latitude 40.74375, bikes: 1, capacity: 19, ratio: 5.26%

Now let’s take a look at how to calculate the turnover ratio in an hourly time window. We have similar steps for building the KStream/KTable from station_information and station_status. The code is here.

  1. The metric we calculate is (net change of num bikes available) / (station_capacity), so we introduce a windowed aggregation here, where we fold over each grouped status and calculate the net delta. The aggregate() operation takes an initializer and an aggregator (the accumulator function). It’s similar to the fold() or reduce() operations in functional programming. A short trace of how the accumulator evolves follows the snippet.

// aggregate hourly over the net change in number of bikes
final KTable<Windowed<String>, String> netDeltaTable = statusStream
    .groupByKey()
    .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(60)))
    .aggregate(
            // the initializer: the accumulator string is "netDelta,lastValue"
            () -> "0,0",

            // the "add" aggregator: accumulate the absolute change since the last reading
            (stationId, record, accum) -> {
                Integer newValue = Integer.parseInt(record);
                Integer netDelta = Integer.parseInt(accum.split(",")[0]);
                Integer lastValue = Integer.parseInt(accum.split(",")[1]);
                Integer delta = Math.abs(newValue - lastValue);
                return (netDelta + delta) + "," + newValue;
            }
    );
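
To make the accumulator format concrete, here is a trace with made-up readings for one station within one window (note that the first reading counts in full, since lastValue starts at 0):

num_bikes_available readings:   12        9        14
accumulator after each record:  "12,12"   "15,9"   "20,14"   (starting from "0,0")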

2. Now that the windowed net change of bikes is calculated, let’s join again with the station_information KTable and output our analytics onto an output topic.

// join station information table to get station capacity and calculate turnover ratio
final KStream<String, String> ratio = netDeltaTable
    .toStream()
    .map((windowedKey, v) -> new KeyValue<>(windowedKey.key(), v))
    // joined value format: "netDelta,lastValue,capacity"
    .join(stationInfoTable, (delta, info) -> delta + "," + info.capacity)
    .map((k, v) -> {
        Integer delta = Integer.parseInt(v.split(",")[0]);      // net change of bikes in the window
        Integer capacity = Integer.parseInt(v.split(",")[2]);   // station capacity from station_information

        if (capacity == 0) {
           return new KeyValue<>(k, "NA");
        }
        Double turnover = delta / (double)capacity * 100.0;
        return new KeyValue<>(k, String.format("%.2f", turnover));
    });

ratio
    .map((k, v) -> new KeyValue<>(k, "station_id: " + k + ", turnover: " + v + "%"))
    .to(HIGH_TURNOVER_TOPIC, Produced.with(Serdes.String(), Serdes.String()));

3. The output looks like this. Note that the key in this case is a Windowed<String> because we applied an hourly window over the stream.

station_id: [472@1535061600000/1535065200000], turnover: 72.22%
station_id: [473@1535061600000/1535065200000], turnover: 81.40%
station_id: [474@1535061600000/1535065200000], turnover: 5.13%
station_id: [476@1535061600000/1535065200000], turnover: 8.47%
station_id: [477@1535061600000/1535065200000], turnover: 55.32%

In order to run the code, follow the quick start here to spin up ZooKeeper and Kafka, create the topics, and run the Java applications.

# build a fat jar
mvn clean package

# run Kafka Streams app
java -cp target/citibike-kafka-streams-5.0.0-standalone.jar com.cloudboxlabs.TurnoverRatio

# call the Citi Bike station_information API and convert the response to an input stream
java -cp target/citibike-kafka-streams-5.0.0-standalone.jar com.cloudboxlabs.CitiBikeStationInfoAPI

# call the Citi Bike station_status API and convert the response to an input stream
java -cp target/citibike-kafka-streams-5.0.0-standalone.jar com.cloudboxlabs.CitiBikeStationStatusAPI

# print the output
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic top-N-high-turnover

In conclusion, what do we think about using Kafka Streams in practice? The answer is “it depends”: it depends on what your data looks like and how complicated your business logic is.

Kafka Streams stands on the shoulders of a giant, Apache Kafka, so it implicitly comes with the scalability and elasticity of a massively distributed streaming data platform. The ease with which we can work with stream and stateful data interchangeably speaks to how well designed the library APIs are. If your data is fairly simple in form, e.g. counting click events or ranking page views, it’s a great option for real-time data analytics.

On the flip side, because Kafka Streams uses Kafka pub-sub internally as its storage backbone, you constantly have to think about how to serialize/deserialize your data at each stage of the pipeline. This means that if you use a lot of POJOs (Plain Old Java Objects) in your application, you have the added task of specifying how to serialize them in order to pass them down the pipeline, Avro schema or not. Although understandable, it adds an extra dimension of complexity that I’m not sure is always worth it in business-logic-heavy applications. Kafka Streams also does not have a Python-based API at the time of writing, so for data scientists who perform heavy analytics, it’s a very steep learning curve.

As always you can find the full code discussed in this post on Cloudbox Labs github.