push_notifications.rb
Example: send push notifications to a client app about interactions with a given account.
We monitor the firehose for events that are relevant to a given user and then notify the configured mobile push notification service; in this example, each notification is simply printed to stdout. (In practice, you would track a number of accounts: those that have push notification tokens stored in your database.)
rb
require 'didkit'
require 'minisky'
require 'skyfall'
# Account to monitor, taken from the first command line argument.
# ARGV[0] is nil when the script is started without arguments, so it is coerced
# to a string first; otherwise the emptiness check below would raise NoMethodError
# instead of printing the usage text.
$handle = ARGV[0].to_s

if $handle.empty?
  puts "Usage: #{$PROGRAM_NAME} <handle | did>"
  exit 1
end

# The lookup result is stringified, so a missing result ends up as an empty
# string and is reported by the check below.
$monitored_did = DID.resolve_handle($handle).to_s

if $monitored_did.empty?
  puts "Couldn't resolve handle: #{$handle}"
  exit 1
end
# Error raised for a value that cannot be parsed as an AT URI.
class InvalidURIException < StandardError
  # uri - the offending value; it is embedded in the exception message
  def initialize(uri)
    message = "Invalid AT URI: #{uri}"
    super(message)
  end
end
# Parsed form of a record-level AT URI: at://<did>/<collection>/<rkey>
#
# Only URIs that have a DID authority and both path segments are accepted;
# anything else raises InvalidURIException.
class AtURI
  # Capture groups: DID, collection NSID, record key.
  #
  # The character classes are wider than plain \w so that otherwise valid URIs are
  # not rejected: DID identifiers may contain ':' and '%' (e.g. did:web with a port),
  # NSID authority segments may contain '-', and record keys may contain '.', ':'
  # and '~'. \A and \z anchor the whole string, so a trailing newline is rejected.
  PATTERN = %r{\Aat://(did:\w+:[\w.:%\-]+)/([\w.\-]+)/([\w.:~\-]+)\z}

  attr_reader :did, :collection, :rkey

  # uri - AT URI string to parse
  #
  # Raises InvalidURIException if uri does not match PATTERN.
  def initialize(uri)
    match = PATTERN.match(uri.to_s)
    raise InvalidURIException, uri unless match

    @did, @collection, @rkey = match.captures
  end
end
# Watches the firehose for records created by other accounts that interact with one
# monitored account (replies, quotes, mentions, likes, reposts, follows) and emits a
# push message for each of them through #send_push.
class NotificationEngine
  # Key paths inside a post's embed under which a quoted record's URI is looked up:
  # the plain record embed first, then the second type of quote (recordWithMedia).
  QUOTE_URI_PATHS = [%w[record uri], %w[record record uri]].freeze

  # user_did - DID (string) of the account whose incoming interactions are reported
  def initialize(user_did)
    @user_did = user_did
    @appview = Minisky.new('api.bsky.app', nil)
  end

  # Creates the firehose client, registers the logging and message callbacks,
  # and opens the connection.
  def connect
    @sky = Skyfall::Firehose.new('bsky.network', :subscribe_repos)

    @sky.on_connect { puts "Connected, monitoring #{@user_did}" }
    @sky.on_disconnect { puts "Disconnected" }
    @sky.on_reconnect { puts "Reconnecting..." }
    @sky.on_error { |e| puts "ERROR: #{e}" }

    @sky.on_message { |msg| process_message(msg) }

    @sky.connect
  end

  # Closes the firehose connection. Does nothing if #connect has not created the
  # client yet (e.g. when the signal handler fires very early).
  def disconnect
    @sky&.disconnect
  end

  # Dispatches every :create operation of a commit message to the handler for its
  # record type. An error raised while handling one operation is printed and does
  # not stop the remaining operations from being processed.
  def process_message(msg)
    # we're only interested in repo commit messages
    return if msg.type != :commit

    # ignore user's own actions
    return if msg.repo == @user_did

    msg.operations.each do |op|
      next if op.action != :create

      begin
        case op.type
        when :bsky_post then process_post(msg, op)
        when :bsky_like then process_like(msg, op)
        when :bsky_repost then process_repost(msg, op)
        when :bsky_follow then process_follow(msg, op)
        end
      rescue StandardError => e
        puts "Error: #{e}"
      end
    end
  end


  # posts

  # Checks a new post for a direct reply to, a quote of, or a mention of the
  # monitored user. One post can trigger more than one notification.
  def process_post(msg, op)
    data = op.raw_record

    # check for replies (direct only)
    parent_uri = data['reply']&.dig('parent', 'uri')
    send_reply_notification(msg, op) if targets_user?(parent_uri)

    # check for quotes (both embed shapes)
    if (embed = data['embed'])
      QUOTE_URI_PATHS.each do |path|
        send_quote_notification(msg, op) if targets_user?(embed.dig(*path))
      end
    end

    # check for mentions
    facets = data['facets']
    mentioned = facets&.any? do |facet|
      facet['features']&.any? { |feature| feature['did'] == @user_did }
    end

    send_mention_notification(msg, op) if mentioned
  end

  def send_reply_notification(msg, op)
    handle = get_user_handle(msg.repo)
    send_push("@#{handle} replied:", op.raw_record)
  end

  def send_quote_notification(msg, op)
    handle = get_user_handle(msg.repo)
    send_push("@#{handle} quoted you:", op.raw_record)
  end

  def send_mention_notification(msg, op)
    handle = get_user_handle(msg.repo)
    send_push("@#{handle} mentioned you:", op.raw_record)
  end


  # likes

  # Reports a like whose subject is a post or a feed generator in the monitored
  # user's repo; likes of other collections are ignored.
  def process_like(msg, op)
    uri = op.raw_record.dig('subject', 'uri')
    return unless uri

    liked_uri = AtURI.new(uri)
    return unless liked_uri.did == @user_did

    case liked_uri.collection
    when 'app.bsky.feed.post'
      send_post_like_notification(msg, liked_uri)
    when 'app.bsky.feed.generator'
      send_feed_like_notification(msg, liked_uri)
    end
  end

  def send_post_like_notification(msg, uri)
    handle = get_user_handle(msg.repo)
    post = get_record(uri)
    send_push("@#{handle} liked your post", post)
  end

  def send_feed_like_notification(msg, uri)
    handle = get_user_handle(msg.repo)
    feed = get_record(uri)
    send_push("@#{handle} liked your feed", feed)
  end


  # reposts

  # Reports a repost whose subject is a post in the monitored user's repo.
  def process_repost(msg, op)
    uri = op.raw_record.dig('subject', 'uri')
    return unless uri

    reposted_uri = AtURI.new(uri)
    return unless reposted_uri.did == @user_did
    return unless reposted_uri.collection == 'app.bsky.feed.post'

    send_repost_notification(msg, reposted_uri)
  end

  def send_repost_notification(msg, uri)
    handle = get_user_handle(msg.repo)
    post = get_record(uri)
    send_push("@#{handle} reposted your post", post)
  end


  # follows

  # Reports a follow record whose subject is the monitored user's DID.
  def process_follow(msg, op)
    send_follow_notification(msg) if op.raw_record['subject'] == @user_did
  end

  def send_follow_notification(msg)
    handle = get_user_handle(msg.repo)
    send_push("@#{handle} followed you", msg.repo)
  end


  # Note: in this example, we're calling the Bluesky AppView to get details about the
  # person interacting with the user and the post/feed that was liked/reposted etc.
  # In a real app, you might run into rate limits if you do that, because these requests
  # will all be sent from the server's IP.
  #
  # So you might need to take a different route and send just the info that you have
  # here in the push notification data (the AT URI / DID) and fetch the details on the
  # client side, e.g. in a Notification Service Extension on iOS.

  # Returns the verified handle for the given DID. If the lookup yields nil, the DID
  # itself is returned so that the notification text still names the account.
  def get_user_handle(did)
    DID.new(did).get_verified_handle || did

    # or, faster but less correct way (doesn't verify the handle):
    # DID.new(did).document.handles.first
  end

  # Fetches the record that the given AtURI points to and returns the 'value'
  # field of the response.
  def get_record(uri)
    json = @appview.get_request('com.atproto.repo.getRecord', {
      repo: uri.did,
      collection: uri.collection,
      rkey: uri.rkey
    })

    json['value']
  end

  # Delivery point for a notification. Here it only prints the message and the
  # inspected data to stdout.
  def send_push(message, data = nil)
    # send the message to APNS/FCM here
    puts
    puts "[#{Time.now}] #{message} #{data&.inspect}"
  end

  private

  # True when uri (an AT URI string, or nil/false when the record has none) points
  # into the monitored user's repo. A malformed URI makes AtURI.new raise.
  def targets_user?(uri)
    return false unless uri

    AtURI.new(uri).did == @user_did
  end
end
# Build the engine for the account resolved from the command line argument.
engine = NotificationEngine.new($monitored_did)

# Ctrl+C delivers SIGINT; close the connection cleanly when it arrives
Signal.trap("SIGINT") do
  engine.disconnect
end

engine.connect