Rails: Following and Notifications
For the curious, here is a quick introduction to the feature.
Ok, now you have the bird's eye view. Let's get into the database schema and the code. Along the way, I'll also take you down a winding path that includes academic papers and some future ideas.
The Connection
The heart of following is a connection. A connection connects a consumer to a producer. The producer is the thing that creates the events that the consumer wants to be notified about.
create_table "connections", force: :cascade do |t|
t.integer "consumer_id", null: false
t.string "consumer_type", limit: 50, null: false
t.integer "producer_id", null: false
t.string "producer_type", limit: 50, null: false
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["consumer_type", "consumer_id", "producer_type", "producer_id"], name: "consumer_to_producer", unique: true
t.index ["producer_type", "producer_id"], name: "index_connections_on_producer_type_and_producer_id"
end
In the case of Speaker Deck, the consumer is the person who is following a speaker. The producer is the speaker they are following.
I know that I won't have a type longer than 50 characters, so I added limits to both consumer_type and producer_type.
For starting indexes, I created a unique index on ["consumer_type", "consumer_id", "producer_type", "producer_id"]. I don't want someone to be able to follow another person more than once. This unique index prevents that. It also covers any queries to get all the producers a given consumer is following.
But the unique index doesn't cover the opposite – showing all the followers of a given producer. For this, I added another index on ["producer_type", "producer_id"].
Typically, you want the most selective column first in an index. In our case, there will be dramatically more unique values for producer_id than producer_type, thus producer_id would be more selective.
The problem with an index of ["producer_id", "producer_type"] is that I'd have to do a table scan to get all the rows matching a particular producer_type, to list the distinct producer_type values, or to see how often those values are used. I know from history that these are useful queries, so I tend to index type before id.
Enough about indexes, here is a taste of the model:
class Connection < ApplicationRecord
  VALID_CONSUMER_TYPES = Set["User"].freeze
  VALID_PRODUCER_TYPES = Set["User"].freeze

  belongs_to :consumer, polymorphic: true
  belongs_to :producer, polymorphic: true

  scope :for_producer, ->(producer) { where(producer: producer) }
  scope :for_consumer, ->(consumer) { where(consumer: consumer) }

  validates :consumer_id, presence: true, numericality: true
  validates :consumer_type, presence: true, length: {maximum: 50},
    inclusion: {in: VALID_CONSUMER_TYPES}
  validates :producer_id, presence: true, numericality: true
  validates :producer_type, presence: true, length: {maximum: 50},
    inclusion: {in: VALID_PRODUCER_TYPES}
end
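To ground it, here is roughly how following, unfollowing, and the two index-backed lookups play out with that model (illustrative only; current_user and speaker stand in for real records):
# Follow: the consumer (a user) follows a producer (a speaker).
Connection.create!(consumer: current_user, producer: speaker)

# Unfollow.
Connection.for_consumer(current_user).for_producer(speaker).destroy_all

# Everyone current_user follows (served by the unique consumer-to-producer index).
Connection.for_consumer(current_user)

# Everyone following speaker (served by the producer index).
Connection.for_producer(speaker)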
So that is how people connect to (aka follow) each other. But how do we know when to notify those consumers about producer activity?
The Event
First, let's talk about what I could have done. I could have added a callback in the Talk model. After each save, the callback would determine if it was time to notify anyone. If so, the callback would loop through all the consumers of the producer that published the talk and email each of them.
Nothing wrong with the aforementioned, but our Talk model would have grown quite a bit larger. Instead, I went the route of a new database-backed model, ConnectionEvent.
Let's check out the schema:
create_table "connection_events", force: :cascade do |t|
t.integer "producer_id", null: false
t.string "producer_type", limit: 50, null: false
t.integer "object_id", null: false
t.string "object_type", limit: 50, null: false
t.integer "type", null: false
t.jsonb "data", default: {}, null: false
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.integer "state", limit: 2, default: 0, null: false
t.index ["object_type", "object_id"], name: "index_connection_events_on_object_type_and_object_id"
t.index ["producer_type", "producer_id"], name: "index_connection_events_on_producer_type_and_producer_id"
end
You'll recognize producer again. Additionally, this table introduces an object. The object is the thing that the producer produced. In the case of Speaker Deck, the producer is the talk owner. The talk they publish is the object.
The type is what type of event occurred for the object. For now, we only have :publish, but we have several ideas for other ConnectionEvent types in the future (e.g. :follow, :star, etc.).
state keeps track of processing for us. Each event starts as pending. Every N minutes we run a task to grab all pending events and process them. When this occurs, the event moves from pending to processing and, finally, to processed.
There is nothing special to see in the ConnectionEvent model. It looks just like Connection, so I'll skip over it since I can already sense this is going to be a doozy of a post.
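If you want a mental model anyway, the enum and scope bits probably look something like this (a sketch pieced together from the schema and the methods used later in this post, not the actual model):
class ConnectionEvent < ApplicationRecord
  # The integer "type" column would otherwise collide with Rails' STI column.
  self.inheritance_column = nil

  belongs_to :producer, polymorphic: true
  belongs_to :object, polymorphic: true

  # Assumed values; the schema only tells us state defaults to 0 (pending).
  enum type: {publish: 0}
  enum state: {pending: 0, processing: 1, processed: 2}

  # The processor task grabs everything still pending.
  scope :ready_for_processing, -> { pending }
end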
So I lied to you a bit. I do actually have a callback on Talk, but instead of doing all the work, it just creates the ConnectionEvent.
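What might that callback look like? Something in this vein, though the names and the publish check are my guesses rather than the actual Talk code (I'm assuming a published_at timestamp):
class Talk < ApplicationRecord
  belongs_to :owner, class_name: "User"

  # Assumption: publishing sets a published_at timestamp.
  after_save :create_publish_connection_event, if: :saved_change_to_published_at?

  private

  def create_publish_connection_event
    return if published_at.blank? # unpublishing should not notify anyone

    ConnectionEvent.create!(producer: owner, object: self, type: :publish)
  end
end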
So how do all those connection events someday end up as emails in your inbox?
The ConnectionEventProcessor
Every N minutes we run a task. It looks like this:
namespace :connection_events do
  task process: :environment do
    logger = Rails.env.development? ? Logger.new(STDOUT) : Rails.logger
    ConnectionEventProcessor.call(logger: logger)
  end
end
The primary goal of the processor is to enqueue a job for each event that can handle the notification. This keeps the processor snappy and moves any possible backups to the job system.
# Gets all pending connection events and enqueues notification jobs for them.
class ConnectionEventProcessor
  def self.call(logger: Rails.logger)
    new(logger: logger).call
  end

  def initialize(logger: Rails.logger)
    @logger = logger
  end

  def call
    processed = 0

    ConnectionEvent.ready_for_processing.order(:id).find_each do |connection_event|
      connection_event.processing!
      @logger.info "ConnectionEventProcessor processing #{connection_event.inspect}"

      begin
        Resque.enqueue ConnectionEventNotificationJob, connection_event.id
      rescue => error
        @logger.info "ConnectionEventProcessor pending #{connection_event.inspect} due to #{error.inspect}"
        connection_event.pending!
        # Skip to the next event so a failed enqueue is not marked processed.
        next
      end

      @logger.info "ConnectionEventProcessor enqueued ConnectionEventNotificationJob for #{connection_event.inspect}"
      connection_event.processed!
      @logger.info "ConnectionEventProcessor processed #{connection_event.inspect}"
      processed += 1
    end

    @logger.info "ConnectionEventProcessor processed #{processed} event(s)"
  end
end
Pardon all the logging, but I like verbosity for when things go wrong. I was going to remove it for the post, but I think it is good to show real production stuff. Real production code has problems and needs to be debugged.
The majority of the above code is logging to show what is happening and rescuing so one notification event doesn't cause problems for any other notification event.
The ConnectionEventNotificationJob
Once again, I'm going to give you the real guts of this code and all the error handling that comes with it.
# Sends all the notifications for a given connection event.
class ConnectionEventNotificationJob < ApplicationJob
  @queue = :notifications

  def self.perform(connection_event_id)
    new(connection_event_id).perform
  end

  def initialize(connection_event_id)
    @connection_event_id = connection_event_id
  end

  def perform
    # Can we find the connection event?
    unless connection_event
      Rails.logger.info "#{self.class.name} skipping because connection event not found (#{@connection_event_id})"
      return
    end

    # We only want this to deal with publish events for now.
    unless connection_event.publish?
      Rails.logger.info "#{self.class.name} skipping because invalid type #{connection_event.inspect}"
      return
    end

    # Can we find the talk?
    talk = connection_event.object
    unless talk
      Rails.logger.info "#{self.class.name} skipping because object missing #{connection_event.inspect}"
      return
    end

    # Can we find the talk owner?
    unless talk.owner
      Rails.logger.info "#{self.class.name} skipping because talk owner missing #{connection_event.inspect}"
      return
    end

    # Notify the owner that their talk is published and processed.
    notify talk.owner

    # Can we find the producer?
    producer = connection_event.producer
    unless producer
      Rails.logger.info "#{self.class.name} skipping because producer missing #{connection_event.inspect}"
      return
    end

    # Notify each of the producer's followers.
    producer.followers.not_abusive.find_each { |follower| notify follower }
  end

  private

  def connection_event
    @connection_event ||= ConnectionEvent.find_by_id(@connection_event_id)
  end

  def notify(user)
    return unless Flipper.enabled?(:connections, user)

    Rails.logger.info "#{self.class.name} delivering #{connection_event.inspect} to #{user.username}"

    begin
      ConnectionEventMailer.published(connection_event.object, user).deliver
    rescue => exception
      Raven.capture_exception(exception, {
        extra: {
          connection_event_id: @connection_event_id,
          user_id: user.id,
        }
      })
    end
  end
end
Again, the key point here is lots of error handling and rescuing, so failing to notify a single user doesn't affect the notifying of other users.
You'll also notice that all of this is wrapped with a Flipper feature. Every top level feature I work on gets wrapped by a Flipper feature. This makes it super easy to turn things off if necessary.
I occasionally have short-term flags for when I'm switching up parts of a system, but most top-level features are wrapped in a way that they could be disabled without interrupting users.
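For example, turning the feature on gradually (or off entirely) is just a couple of standard Flipper calls from a console:
Flipper.enable_percentage_of_actors(:connections, 10) # roll out to 10% of users
Flipper.enable(:connections)                          # fully on
Flipper.disable(:connections)                         # kill switch if something goes sideways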
At this point, we have a fully functional following and notification system. Not very fancy eh? But it does the trick and works great in production. That is what I care about. I like boring. Sorry not sorry.
Hopefully this is straightforward so far. But why did I add this extra polymorphic cruft and unusual verbiage? Wouldn't a Follow model with follower_id and followee_id be simpler? I'm glad you asked.
The Papers
Many moons ago I read a couple papers that changed the way I think about following, timelines and notifications.
The first was Feeding Frenzy: Selectively Materializing Users’ Event Feeds. The second was the sequel to that paper – named Feed Following: The Big Data Challenge in Social Applications.
Both are easy and enjoyable reads (and I don't consider myself an academic). I'll do my best to tldr the papers for you.
Every system does reads and writes. Typical systems do far more reads than writes. Social applications can be a mixed bag. But I'm getting ahead of myself. Let's talk through two ways to materialize timelines based on consumers, producers, connections and connection events.
Read-time Materialization
To optimize for fast writes, you normalize your database tables and materialize the timeline when requested (aka user says show me what's new). This is the database structure that Speaker Deck currently uses (as noted above).
When you follow someone, we do one write to store the Connection. When you publish a talk, we do one write to store the ConnectionEvent.
To show a timeline of events in the Speaker Deck UI, we'd need to join connections and connection_events together to materialize it.
SELECT e.*
FROM connection_events e
INNER JOIN connections c ON
  e.producer_id = c.producer_id AND
  e.producer_type = c.producer_type
WHERE
  c.consumer_id = :consumer_id AND
  c.consumer_type = :consumer_type
ORDER BY e.created_at DESC
LIMIT :limit
OFFSET :offset
This works fine up to a point, but when your system grows to hundreds of thousands (or, worse, millions) of users and events, read-time materializing with joins gets slow. I've seen it happen in several different applications – notably github.com.
If read-time materialization is slow, how do we speed it up?
Write-time Materialization
To make our reads fast again, the typical path is to denormalize our database. This means our connection_events table will now get one row per combination of event and consumer of that event, instead of one row per event.
If a producer has 300 followers, we'll create 300 rows in the connection_events table – one for each follower. From there, you can index ["consumer_type", "consumer_id"]. Now, you don't need to join to build the timeline.
SELECT *
FROM connection_events
WHERE
  consumer_id = :consumer_id AND
  consumer_type = :consumer_type
ORDER BY created_at DESC
LIMIT :limit
OFFSET :offset
Just like that, your timelines will get much faster. But the speed improvements don't have to stop there. You can add created_at to your compound consumer index. You can even index created_at descending if your database supports that. But now you are generating one row per consumer for each event, which means your table is going to get large fast.
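For example, a migration for that compound consumer index might look something like this (the descending created_at option works on PostgreSQL; the denormalized connection_events table here is the hypothetical one described above):
class AddConsumerIndexToConnectionEvents < ActiveRecord::Migration[6.0]
  def change
    # Serves "events for this consumer, newest first" straight from the index.
    add_index :connection_events,
      [:consumer_type, :consumer_id, :created_at],
      order: {created_at: :desc},
      name: "index_connection_events_on_consumer_and_created_at"
  end
end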
Table growth is also a highly solvable problem, but I won't dive into it deeply here. The tldr is you can partition in some fashion. First, maybe you move the connections and connection_events tables to their own server. From there, you can split consumers up into N tables. Then you can move the N tables to N servers.
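As a toy illustration of the bucketing idea (the table naming, hash, and bucket count are made up):
require "zlib"

N_BUCKETS = 16

# Route a consumer to one of N event tables using a cheap, stable hash, so the
# same consumer always lands in the same bucket.
def connection_events_table_for(consumer_type, consumer_id)
  bucket = Zlib.crc32("#{consumer_type}:#{consumer_id}") % N_BUCKETS
  "connection_events_#{bucket}"
end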
The path to scaling materialized events is to continue making the bucket of events you have to search through for a given consumer smaller and smaller.
The tldr from the papers
What I took you through above are the two ends of the spectrum – minimum writes and storage versus maximum writes and storage.
On one end, all the work of materializing the timelines is done at read-time.
On the other end, all the work is done at the time of the event – write-time.
Typically, the choice you make is whether your system will materialize at read-time or write-time.
The key point of the papers above is that making that choice at the system level is inefficient. You are either constantly re-materializing the timeline at read-time or materializing the timeline at write-time when it might not ever be read.
Instead, the papers propose making the decision to materialize at read-time or write-time based on the producer and consumer relationship.
Consumers who constantly check their timelines should have pre-materialized timelines (write-time so reads are snappy).
Producers who generate a lot of events should likely have read-time materialization for any consumers who rarely check their timelines.
The papers push for a score for each producer/consumer relationship. The score determines whether the timeline should be materialized at read-time or write-time.
When materializing a timeline for a given consumer, you merge the two – the pre-materialized (write-time) and the read-time (think joins).
Pushing the decision of when to materialize to the producer-consumer combo allows you to fully optimize the system storage use and query latency.
If you want to lower query latency, you can tweak the score at which you switch to write-time materialization, at the cost of more storage. The same is true in the opposite direction. If you need to control storage use, you can tweak the score to reduce storage, but query latency will increase.
Brilliant!
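To make that concrete, here is a rough sketch of the kind of per-relationship decision the papers describe. The rates, score formula, and threshold are made up for illustration; the papers define their own cost model:
# The threshold is arbitrary; in practice you would tune it against your
# storage and latency budgets.
WRITE_TIME_THRESHOLD = 1.0

# Consumers who check their timeline far more often than this producer
# publishes benefit from pre-materialized (write-time) entries; otherwise
# joining at read-time is cheaper.
def materialize_at_write_time?(consumer_reads_per_day:, producer_events_per_day:)
  score = consumer_reads_per_day.to_f / (producer_events_per_day + 1)
  score >= WRITE_TIME_THRESHOLD
end

materialize_at_write_time?(consumer_reads_per_day: 20, producer_events_per_day: 2) # => true
materialize_at_write_time?(consumer_reads_per_day: 0, producer_events_per_day: 5)  # => false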
Why is Speaker Deck using read-time materialization?
So, if you've been following along, you know that we chose read-time materialization for Speaker Deck. Note the nicely normalized connections and connection_events database tables from above.
Speaker Deck is in the hundreds of thousands of users and talks range. So why did we choose read-time materialization of timelines?
Well, we did and we didn't. Our database tables reflect a read-time materialization of timelines, but really we just punted on materializing timelines at all.
Instead, we push email notifications at the time of the ConnectionEvent and never show the timeline in the web interface.
The email is kind of like a write-time materialization. Every consumer (follower) of a producer (speaker) receives a write (an email) when the producer generates an event (publishes a talk).
I don't know that we'll be able to get away with this forever, but for a first iteration it works great.
People can follow their friends and favorite speakers. When those speakers publish a talk, their followers get a pretty email notification.
It uses the minimum effort to solve the problem of "I want to know when my friends and favorite speakers publish a new talk".
The great thing is we helped Speaker Deck users make progress, but avoided the effort of having to scale a write-time materialization system for now.
The Future
I actually think this could be a pretty sweet project. Using the adapter pattern like Flipper, different data stores could be used to accomplish pull (read-time), push (write-time), or some combination of the two.
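A purely hypothetical sketch of what that adapter duck type could look like (not an existing gem):
class TimelineAdapter
  # Called when a producer generates an event. A push adapter would fan the
  # event out to each consumer's stored timeline here; a pull adapter would
  # simply store the event.
  def publish(event)
    raise NotImplementedError
  end

  # Called when a consumer asks for their timeline. A push adapter reads the
  # pre-materialized entries; a pull adapter joins connections to events.
  def timeline(consumer, limit: 25)
    raise NotImplementedError
  end
end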
I had some fun several years ago whipping something together. Don't go and use this project, but if you want to see where my head was a few years ago, the remnants of redis (push), sequel (push and pull) and active record (pull) adapters exist.
So far I've only worked out the two ends of the spectrum – fully read-time and fully write-time. I'd love to spend some more time someday in the middle or see an example of an open source project that makes the decision of read-time or write-time per producer/consumer.
So, that is it. Real-world following and (email) notifications in Rails that scale easily to hundreds of thousands of users.
Oh, and definitely go follow some people on Speaker Deck. If you aren't sure who, you can follow me or we have a featured speakers page that is a good place to start.