Rails: Following and Notifications

For the curious, here is a quick introduction to the feature.

Ok, now you have the bird's eye view. Let's get into the database schema and the code. Along the way, I'll also take you down a winding path that includes academic papers and some future ideas.

The Connection

The heart of following is a connection. A connection connects a consumer to a producer. The producer is the thing that creates the events that the consumer wants to be notified about.

create_table "connections", force: :cascade do |t|
  t.integer "consumer_id", null: false
  t.string "consumer_type", limit: 50, null: false
  t.integer "producer_id", null: false
  t.string "producer_type", limit: 50, null: false
  t.datetime "created_at", null: false
  t.datetime "updated_at", null: false
  t.index ["consumer_type", "consumer_id", "producer_type", "producer_id"], name: "consumer_to_producer", unique: true
  t.index ["producer_type", "producer_id"], name: "index_connections_on_producer_type_and_producer_id"
end

In the case of Speaker Deck, the consumer is the person who is following a speaker. The producer is the speaker they are following.

I know that I won't have a type longer than 50 characters, so I added limits to both consumer_type and producer_type.

For starting indexes, I created a unique index on ["consumer_type", "consumer_id", "producer_type", "producer_id"]. I don't want someone to be able to follow another person more than once. This unique index prevents that. It also covers any queries to get all the producers a given consumer is following.

But the unique index doesn't cover the opposite – showing all the followers of a given producer. For this, I added another index on ["producer_type", "producer_id"].
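
To make that concrete, here are the two lookups each index is there to serve, in ActiveRecord terms (illustrative queries with a hypothetical user and speaker, not lifted from the app):

# Covered by the unique consumer-first index: everything a given consumer follows.
Connection.where(consumer_type: "User", consumer_id: user.id)

# Covered by the producer index: all the followers of a given producer.
Connection.where(producer_type: "User", producer_id: speaker.id)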

Typically, you want the most selective column first in an index. In our case, there will be dramatically more unique values for producer_id than producer_type, thus producer_id would be more selective.

The problem with an index of ["producer_id", "producer_type"] is that I'd have to do a table scan to get all the rows matching a particular producer_type, to get the distinct producer_type values, or to count how often those values are used. I know from experience that these queries are useful, so I tend to index type before id.

Enough about indexes, here is a taste of the model:

class Connection < ApplicationRecord
  VALID_CONSUMER_TYPES = Set["User"].freeze
  VALID_PRODUCER_TYPES = Set["User"].freeze

  belongs_to :consumer, polymorphic: true
  belongs_to :producer, polymorphic: true

  scope :for_producer, ->(producer) { where(producer: producer) }
  scope :for_consumer, ->(consumer) { where(consumer: consumer) }

  validates :consumer_id, presence: true, numericality: true
  validates :consumer_type, presence: true, length: {maximum: 50},
    inclusion: {in: VALID_CONSUMER_TYPES }

  validates :producer_id, presence: true, numericality: true
  validates :producer_type, presence: true, length: {maximum: 50},
    inclusion: {in: VALID_PRODUCER_TYPES }
end
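
The other half of the wiring lives on User. This is a sketch of my best guess at its shape (the association and method names consumer_connections, following, followers, follow, and unfollow are assumptions, not the actual Speaker Deck code), but something like it is what makes producer.followers possible later on:

class User < ApplicationRecord
  # Connections where this user is the consumer (the follows they created).
  has_many :consumer_connections, class_name: "Connection",
    as: :consumer, dependent: :destroy

  # Connections where this user is the producer (their followers).
  has_many :producer_connections, class_name: "Connection",
    as: :producer, dependent: :destroy

  has_many :following, through: :consumer_connections,
    source: :producer, source_type: "User"
  has_many :followers, through: :producer_connections,
    source: :consumer, source_type: "User"

  def follow(producer)
    consumer_connections.find_or_create_by!(producer: producer)
  end

  def unfollow(producer)
    consumer_connections.where(producer: producer).destroy_all
  end
end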

So that is how people connect to (aka follow) each other. But how do we know when to notify those consumers about producer activity?

The Event

First, let's talk about what I could have done. I could have added a callback in the Talk model. After each save the callback would determine if it was time to notify anyone. If so, the callback would loop through all the consumers of the producer that published the talk and email each of them.

Nothing wrong with the aforementioned, but our Talk model would have grown quite a bit larger. Instead, I went the route of a new database-backed model ConnectionEvent.

Let's check out the schema:

create_table "connection_events", force: :cascade do |t|
  t.integer "producer_id", null: false
  t.string "producer_type", limit: 50, null: false
  t.integer "object_id", null: false
  t.string "object_type", limit: 50, null: false
  t.integer "type", null: false
  t.jsonb "data", default: {}, null: false
  t.datetime "created_at", null: false
  t.datetime "updated_at", null: false
  t.integer "state", limit: 2, default: 0, null: false
  t.index ["object_type", "object_id"], name: "index_connection_events_on_object_type_and_object_id"
  t.index ["producer_type", "producer_id"], name: "index_connection_events_on_producer_type_and_producer_id"
end

You'll recognize producer again. Additionally, this table introduces an object. The object is the thing that the producer produced. In the case of Speaker Deck, the producer is the talk owner. The talk they publish is the object.

The type is what kind of event occurred for the object. For now, we only have :publish, but we have several ideas for other ConnectionEvent types in the future (e.g. :follow, :star, etc.).

state keeps track of processing for us. Each event starts as pending. Every N minutes we run a task to grab all pending events and process them. When this occurs, the event moves from pending to processing and, finally, to processed.

There is nothing special to see in the ConnectionEvent model. It looks just like Connection, so I'll skip over it since I can already sense this is going to be a doozy of a post.
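
If you want a rough picture anyway, a minimal version might look something like this. Assuming Rails enums back the integer type and state columns, which is my guess based on the publish?, pending!, and processed! calls coming up (validations omitted):

class ConnectionEvent < ApplicationRecord
  # The integer type column is an enum here, not single table inheritance.
  self.inheritance_column = nil

  belongs_to :producer, polymorphic: true
  belongs_to :object, polymorphic: true

  enum type: {publish: 0}
  enum state: {pending: 0, processing: 1, processed: 2}

  scope :ready_for_processing, -> { pending }
end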

So I lied to you a bit. I do actually have a callback on Talk, but instead of doing all the work it just creates the ConnectionEvent. So how do all those connection events someday end up as emails in your inbox?
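
Before we get to that, here is a rough sketch of what that callback on Talk might look like. The published flag, the just_published? check, and owner are my guesses at the surrounding code, not copied from the app:

class Talk < ApplicationRecord
  # Create the event that notifications will be driven from.
  after_save :create_publish_connection_event, if: :just_published?

  private

  # Hypothetical check: the talk was just saved as published.
  def just_published?
    saved_change_to_published? && published?
  end

  def create_publish_connection_event
    ConnectionEvent.create!(producer: owner, object: self, type: :publish)
  end
end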

The ConnectionEventProcessor

Every N minutes we run a task. It looks like this:

namespace :connection_events do
  task process: :environment do
    logger = Rails.env.development? ? Logger.new(STDOUT) : Rails.logger
    ConnectionEventProcessor.call(logger: logger)
  end
end
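
Any scheduler works for the "every N minutes" part. If you used the whenever gem, for example, the config/schedule.rb entry might look like this (the interval here is just an example, not our actual setting):

every 5.minutes do
  rake "connection_events:process"
end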

The primary goal of the processor is to enqueue a job for each event that can handle the notification. This keeps the processor snappy and moves any possible backups to the job system.

# Gets all pending connection events and enqueues notification jobs for them.
class ConnectionEventProcessor
  def self.call(logger: Rails.logger)
    new(logger: logger).call
  end

  def initialize(logger: Rails.logger)
    @logger = logger
  end

  def call
    processed = 0
    ConnectionEvent.ready_for_processing.order(:id).find_each do |connection_event|
      connection_event.processing!
      @logger.info "ConnectionEventProcessor processing #{connection_event.inspect}"

      begin
        Resque.enqueue ConnectionEventNotificationJob, connection_event.id
        @logger.info "ConnectionEventProcessor enqueued ConnectionEventNotificationJob for #{connection_event.inspect}"

        connection_event.processed!
        @logger.info "ConnectionEventProcessor processed #{connection_event.inspect}"

        processed += 1
      rescue => error
        # The enqueue failed, so put the event back to pending for the next run.
        @logger.info "ConnectionEventProcessor pending #{connection_event.inspect} due to #{error.inspect}"
        connection_event.pending!
      end
    end

    @logger.info "ConnectionEventProcessor processed #{processed} event(s)"
  end
end

Pardon all the logging, but I like verbosity for when things go wrong. I was going to remove it for the post, but I think it is good to show real production stuff. Real production code has problems and needs to be debugged.

The majority of the above code is a bunch of logging to show what is happening and a bunch of rescuing so one notification event doesn't cause problems for any other notification event.

The ConnectionEventNotificationJob

Once again, I'm going to give you the real guts of this code and all the error handling that comes with it.

# Sends all the notifications for a given connection event.
class ConnectionEventNotificationJob < ApplicationJob
  @queue = :notifications

  def self.perform(connection_event_id)
    new(connection_event_id).perform
  end

  def initialize(connection_event_id)
    @connection_event_id = connection_event_id
  end

  def perform
    # Can we find the connection event.
    unless connection_event
      Rails.logger.info "#{self.class.name} skipping because connection event not found (#{@connection_event_id})"
      return
    end

    # We only want this to deal with publish events for now.
    unless connection_event.publish?
      Rails.logger.info "#{self.class.name} skipping because invalid type #{connection_event.inspect}"
      return
    end

    # Can we find the talk.
    talk = connection_event.object
    unless talk
      Rails.logger.info "#{self.class.name} skipping because object missing #{connection_event.inspect}"
      return
    end

    # Can we find the talk owner.
    unless talk.owner
      Rails.logger.info "#{self.class.name} skipping because talk owner missing #{connection_event.inspect}"
      return
    end

    # Notify the owner that their talk is published and processed.
    notify talk.owner

    # Can we find the producer.
    producer = connection_event.producer
    unless producer
      Rails.logger.info "#{self.class.name} skipping because producer missing #{connection_event.inspect}"
      return
    end

    # Notify each of the producer's followers.
    producer.followers.not_abusive.find_each { |follower| notify follower }
  end

  private

  def connection_event
    @connection_event ||= ConnectionEvent.find_by_id(@connection_event_id)
  end

  def notify(user)
    return unless Flipper.enabled?(:connections, user)

    Rails.logger.info "#{self.class.name} delivering #{connection_event.inspect} to #{user.username}"
    begin
      ConnectionEventMailer.published(connection_event.object, user).deliver
    rescue => exception
      Raven.capture_exception(exception, {
        extra: {
          connection_event_id: @connection_event_id,
          user_id: user.id,
        }
      })
    end
  end
end

Again, the key point here is lots of error handling and rescuing, so failing to notify a single user doesn't affect the notifying of other users.
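
The mailer is nothing special either. A minimal version of ConnectionEventMailer.published might look roughly like this (the subject line, email attribute, and talk attributes are guesses, not the real template):

class ConnectionEventMailer < ApplicationMailer
  # talk is the connection event's object; user is the follower (or owner) being notified.
  def published(talk, user)
    @talk = talk
    @user = user

    mail to: user.email, subject: "#{talk.owner.username} published #{talk.title}"
  end
end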

You'll also notice that all of this is wrapped with a Flipper feature. Every top level feature I work on gets wrapped by a Flipper feature. This makes it super easy to turn things off if necessary.

I occasionally have short-term features when switching up parts of a system, but most top-level features are wrapped in a way that they could be disabled without interrupting users.
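
For example, turning the whole thing off (or rolling it out gradually) is a one-liner with Flipper; these calls are illustrative, not lifted from the app:

# Kill switch if notifications ever misbehave.
Flipper.disable(:connections)

# Or roll out to a percentage of users instead of everyone.
Flipper.enable_percentage_of_actors(:connections, 25)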

At this point, we have a fully functional following and notification system. Not very fancy eh? But it does the trick and works great in production. That is what I care about. I like boring. Sorry not sorry.

Hopefully this is straightforward so far. But why did I add this extra polymorphic cruft and unusual verbiage? Wouldn't a Follow model with follower_id and followee_id be simpler? I'm glad you asked.

The Papers

Many moons ago I read a couple papers that changed the way I think about following, timelines and notifications.

The first was Feeding Frenzy: Selectively Materializing Users’ Event Feeds. The second was the sequel to that paper – named Feed Following: The Big Data Challenge in Social Applications.

Both are easy and enjoyable reads (and I don't consider myself an academic). I'll do my best to tldr the papers for you.

Every system does reads and writes. Typical systems do far more reads than writes. Social applications can be a mixed bag. But I'm getting ahead of myself. Let's talk through two ways to materialize timelines based on consumers, producers, connections and connection events.

Read-time Materialization

To optimize for fast writes, you normalize your database tables and materialize the timeline when requested (aka user says show me what's new). This is the database structure that Speaker Deck currently uses (as noted above).

When you follow someone, we do one write to store the Connection. When you publish a talk, we do one write to store the  ConnectionEvent.

To show a timeline of events in the Speaker Deck UI, we'd need to join connections and connection_events together to materialize it.

SELECT e.*
FROM events e
INNER JOIN
  connections c ON
    e.producer_id = c.producer_id AND
    e.producer_type = c.producer_type
WHERE
  c.consumer_id = :consumer_id AND
  c.consumer_type = :consumer_type
ORDER BY
  e.created_at DESC
LIMIT :limit
OFFSET :offset

This works fine up to a point, but when your system grows to hundreds of thousands (or worse, millions) of users and events, read-time materializing with joins gets slow. I've seen it happen in several different applications – notably github.com.

If read-time materialization is slow, how do we speed it up?

Write-time Materialization

To make our reads fast again, the typical path is to denormalize our database. This means our connection_events table will now get one row per combination of event and consumer of that event, instead of one row per event.

If a producer has 300 followers, we'll create 300 rows in the connection_events table – one for each follower. From there, you can index ["consumer_type", "consumer_id"]. Now, you don't need to join to build the timeline.

SELECT *
FROM events
WHERE
  consumer_id = :consumer_id AND
  consumer_type = :consumer_type
ORDER BY
  created_at DESC
LIMIT :limit
OFFSET :offset

Just like that, your timelines will get much faster. But the speed improvements don't have to stop there. You can add created_at to your compound consumer index. You can even index created_at descending if your database supports that. But now you are generating one row per consumer for each event, which means your table is going to get large fast.
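
That fan-out is the cost. Roughly, in Ruby, and assuming the denormalized events table grew consumer_type/consumer_id columns (which our actual schema does not have), publishing one talk becomes a loop of inserts:

# Hypothetical write-time fan-out: one event row per follower.
producer.followers.find_each do |follower|
  ConnectionEvent.create!(
    producer: producer,
    consumer: follower, # denormalized consumer, not in our real schema
    object: talk,
    type: :publish
  )
end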

Table growth is also a highly solvable problem, but I won't dive into it deeply here. The tldr is you can partition in some fashion. First, maybe you move the connections and connection_events tables to their own server. From there, you can split consumers up into N tables. Then you can move the N tables to N servers.

The path to scaling materialized events is to continue making the bucket of events you have to search through for a given consumer smaller and smaller.

The tldr from the papers

What I took you through above are the two ends of the spectrum – minimum writes and storage versus maximum writes and storage.

On one end, all the work of materializing the timelines is done at read-time.

On the other end, all the work is done at the time of the event – write-time.

Typically, the choice you make is whether your system will materialize at read-time or write-time.

The key point of the papers above is that making that choice at the system level is inefficient. You are either constantly re-materializing the timeline at read-time or materializing the timeline at write-time when it might not ever be read.

Instead, the papers propose making the decision to materialize at read-time or write-time based on the producer and consumer relationship.

Consumers who constantly check their timelines should have pre-materialized timelines (write-time so reads are snappy).

Producers who generate a lot of events should likely have read-time materialization for any consumers who rarely check their timelines.

The papers push for a score for each producer/consumer relationship. The score determines whether the timeline should be materialized at read-time or write-time.

When materializing a timeline for a given consumer, you merge the two – the pre-materialized (write-time) and the read-time (think joins).

Pushing the decision of when to materialize to the producer-consumer combo allows you to fully optimize the system storage use and query latency.

If you want to lower query latency, you can tweak the score at which you switch to write-time materialization; as a result, you'll use more storage.

The same is true in the opposite direction. If you need to control storage use, you can tweak the score to reduce storage, at the cost of increased query latency.
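
As a toy illustration only (the names, rates, and threshold below are made up, and the papers define the cost model much more carefully), the per-relationship decision might look something like this:

# Push (write-time) when this consumer reads often relative to how often
# this producer publishes; otherwise pull (read-time) and join on demand.
PUSH_THRESHOLD = 1.0

def materialize_at_write_time?(connection)
  connection.consumer_query_rate >= connection.producer_event_rate * PUSH_THRESHOLD
end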

Brilliant!

Why does Speaker Deck use read-time materialization?

So, if you've been following along, you know that we chose read-time materialization for Speaker Deck. Note the nicely normalized connections and connection_events database tables from above.

Speaker Deck is in the hundreds of thousands of users and talks range. So why did we choose read-time materialization of timelines?

Well, we did and we didn't. Our database tables reflect a read-time materialization of timelines, but really we just punted on materializing timelines at all.

Instead, we push email notifications at the time of the ConnectionEvent and never show the timeline in the web interface.

The email is kind of like a write-time materialization. Every consumer (follower) of a producer (speaker) receives a write (an email) when the producer generates an event (publishes a talk).

I don't know that we'll be able to get away with this forever, but for a first iteration it works great.

People can follow their friends and favorite speakers. When those speakers publish a talk, their followers get a pretty email notification.

It uses the minimum effort to solve the problem of "I want to know when my friends and favorite speakers publish a new talk".

The great thing is we helped Speaker Deck users make progress, but avoided the effort of having to scale a write-time materialization system for now.

The Future

I actually think this could be a pretty sweet project. Using the adapter pattern like Flipper does, different data stores could be used to accomplish pull (read-time), push (write-time) or some combination of the two.

I had some fun several years ago whipping something together. Don't go and use this project, but if you want to see where my head was a few years ago, the remnants of redis (push), sequel (push and pull) and active record (pull) adapters exist.

So far I've only worked out the two ends of the spectrum – fully read-time and fully write-time. I'd love to spend some more time someday in the middle or see an example of an open source project that makes the decision of read-time or write-time per producer/consumer.

So, that is it. Real-world following and (email) notifications in Rails that scale easily into the hundreds of thousands of users.

Oh, and definitely go follow some people on Speaker Deck. If you aren't sure who, you can follow me or we have a featured speakers page that is a good place to start.