Examples
block_tracker.rb
rb
#!/usr/bin/env ruby
# Example: monitor the network for people blocking your account or adding you to mute lists.
# load skyfall from a local folder - you normally won't need this
$LOAD_PATH.unshift(File.expand_path('../lib', __dir__))
require 'json'
require 'open-uri'
require 'skyfall'
# The DID of the account to watch is taken from the first command line argument.
# It is kept in a global because the process_* methods defined further down read it.
$monitored_did = ARGV[0]

if $monitored_did.to_s.empty?
  puts "Usage: #{$PROGRAM_NAME} <monitored_did>"
  exit 1
elsif !$monitored_did.match?(/\Adid:plc:[a-z0-9]{24}\z/)
  # \A and \z anchor the whole string; ^ and $ only anchor a single line, so an argument
  # with an embedded newline could pass the check as long as one of its lines looked valid
  puts "Not a valid DID: #{$monitored_did}"
  exit 1
end
sky = Skyfall::Firehose.new('bsky.network', :subscribe_repos)

# report connection state changes and stream errors on the console
sky.on_connect do
  puts "Connected, monitoring #{$monitored_did}"
end

sky.on_disconnect do
  puts "Disconnected"
end

sky.on_reconnect do
  puts "Reconnecting..."
end

sky.on_error do |error|
  puts "ERROR: #{error}"
end

sky.on_message do |msg|
  # only repo commit messages carry record operations
  next unless msg.type == :commit

  msg.operations.each do |op|
    # only newly created records are of interest
    next unless op.action == :create

    begin
      case op.type
      when :bsky_block then process_block(msg, op)
      when :bsky_listitem then process_list_item(msg, op)
      end
    rescue StandardError => error
      # a failure while handling one operation is printed and the next operation is processed
      puts "Error: #{error}"
    end
  end
end
# Prints a line when the block record in `op` targets the monitored account.
#
# msg - the commit message; only its `time` is read
# op  - the operation; its `raw_record['subject']` is compared with $monitored_did and its `repo`
#       is passed to get_user_handle
def process_block(msg, op)
  return unless op.raw_record['subject'] == $monitored_did

  blocker = get_user_handle(op.repo)
  puts "@#{blocker} has blocked you! (#{msg.time.getlocal})"
end
# Prints a line when the list item record in `op` adds the monitored account to a list.
#
# msg - the commit message; only its `time` is read
# op  - the operation; `raw_record['subject']` is compared with $monitored_did, `raw_record['list']`
#       is passed to get_list_name and `repo` is passed to get_user_handle
def process_list_item(msg, op)
  record = op.raw_record
  return unless record['subject'] == $monitored_did

  curator = get_user_handle(op.repo)
  list_title = get_list_name(record['list'])
  puts "@#{curator} has added you to list \"#{list_title}\" (#{msg.time.getlocal})"
end
# Looks up the handle of an account by fetching its DID document from plc.directory.
#
# did - the account's DID; it is appended to the plc.directory URL
#
# Returns the first "at://" entry of the document's `alsoKnownAs` list with the "at://" prefix
# removed, or the DID itself when the document lists no such entry.
# Errors raised by the HTTP request or by JSON parsing are not rescued here.
def get_user_handle(did)
  url = "https://plc.directory/#{did}"

  # the block form closes the response stream as soon as the body has been read
  json = JSON.parse(URI.open(url, &:read))

  handle_uri = Array(json['alsoKnownAs']).find { |entry| entry.to_s.start_with?('at://') }
  handle_uri ? handle_uri.delete_prefix('at://') : did
end
# Fetches a list record through com.atproto.repo.getRecord on bsky.social and returns its name.
#
# list_uri - AT URI of the list record ("at://<repo>/<collection>/<rkey>")
#
# Returns the `name` field of the record's `value`, or nil when the response has no such field.
# Errors raised by the HTTP request or by JSON parsing are not rescued here.
def get_list_name(list_uri)
  repo, type, rkey = list_uri.delete_prefix('at://').split('/')

  # the URI parts come from a record received over the network, so they are escaped
  # before being placed in the query string
  query = URI.encode_www_form(repo: repo, collection: type, rkey: rkey)
  url = "https://bsky.social/xrpc/com.atproto.repo.getRecord?#{query}"

  # the block form closes the response stream as soon as the body has been read
  json = JSON.parse(URI.open(url, &:read))
  json.dig('value', 'name')
end
# Ctrl+C sends SIGINT; disconnect from the stream instead of being killed mid-connection
Signal.trap("SIGINT") do
  sky.disconnect
end
sky.connect
jet_monitor_phrases.rb
rb
#!/usr/bin/env ruby
# Example: monitor new posts for mentions of one or more words or phrases (e.g. anyone mentioning your name or the name
# of your company, project etc.). This example uses a Jetstream connection.
# load skyfall from a local folder - you normally won't need this
$LOAD_PATH.unshift(File.expand_path('../lib', __dir__))
require 'json'
require 'open-uri'
require 'skyfall'
# every command line argument is one word or phrase to look for; they are lowercased here
# so that they can be compared with lowercased post text
terms = ARGV.map { |phrase| phrase.downcase }

if terms.length.zero?
  puts "Usage: #{$PROGRAM_NAME} <word_or_phrase> [<word_or_phrase>...]"
  exit 1
end
# tell Jetstream to send us only post records
sky = Skyfall::Jetstream.new('jetstream2.us-east.bsky.network', { wanted_collections: [:bsky_post] })

sky.on_message do |msg|
  # we're only interested in repo commit messages
  next if msg.type != :commit

  msg.operations.each do |op|
    # ignore any operations other than "create post"
    next unless op.action == :create && op.type == :bsky_post

    text = op.raw_record['text']
    lowercased = text.to_s.downcase
    next unless terms.any? { |term| lowercased.include?(term) }

    begin
      owner_handle = get_user_handle(op.repo)
      puts "\n#{msg.time.getlocal} @#{owner_handle}: #{text}"
    rescue StandardError => e
      # the handle lookup is an HTTP request and may fail; report it and carry on with the
      # next operation instead of letting the error escape from the message handler
      puts "Error: #{e}"
    end
  end
end
# Looks up the handle of an account by fetching its DID document from plc.directory.
#
# did - the account's DID; it is appended to the plc.directory URL
#
# Returns the first "at://" entry of the document's `alsoKnownAs` list with the "at://" prefix
# removed, or the DID itself when the document lists no such entry.
# Errors raised by the HTTP request or by JSON parsing are not rescued here.
def get_user_handle(did)
  url = "https://plc.directory/#{did}"

  # the block form closes the response stream as soon as the body has been read
  json = JSON.parse(URI.open(url, &:read))

  handle_uri = Array(json['alsoKnownAs']).find { |entry| entry.to_s.start_with?('at://') }
  handle_uri ? handle_uri.delete_prefix('at://') : did
end
# report connection state changes and stream errors on the console
sky.on_connect do
  puts "Connected"
end

sky.on_disconnect do
  puts "Disconnected"
end

sky.on_reconnect do
  puts "Reconnecting..."
end

sky.on_error do |error|
  puts "ERROR: #{error}"
end

# Ctrl+C sends SIGINT; disconnect from the stream instead of being killed mid-connection
Signal.trap("SIGINT") do
  sky.disconnect
end
sky.connect
print_all_posts.rb
rb
#!/usr/bin/env ruby
# Example: print the date and text of every new post made on the network as they appear.
# load skyfall from a local folder - you normally won't need this
$LOAD_PATH.unshift(File.expand_path('../lib', __dir__))
require 'skyfall'
sky = Skyfall::Firehose.new('bsky.network', :subscribe_repos)

sky.on_message do |msg|
  # anything other than a repo commit is skipped
  next unless msg.type == :commit

  msg.operations.each do |op|
    # skip everything except newly created posts
    next if op.action != :create || op.type != :bsky_post

    # header line (author DID and local time), the post text, then an empty separator line
    puts "#{op.repo} • #{msg.time.getlocal}", op.raw_record['text'], ""
  end
end
# report connection state changes and stream errors on the console
sky.on_connect do
  puts "Connected"
end

sky.on_disconnect do
  puts "Disconnected"
end

sky.on_reconnect do
  puts "Reconnecting..."
end

sky.on_error do |error|
  puts "ERROR: #{error}"
end

# Ctrl+C sends SIGINT; disconnect from the stream instead of being killed mid-connection
Signal.trap("SIGINT") do
  sky.disconnect
end
sky.connect
push_notifications.rb
rb
#!/usr/bin/env ruby
# Example: send push notifications to a client app about interactions with a given account.
# load skyfall from a local folder - you normally won't need this
$LOAD_PATH.unshift(File.expand_path('../lib', __dir__))
require 'json'
require 'open-uri'
require 'skyfall'
# The DID of the account to watch is taken from the first command line argument.
monitored_did = ARGV[0]

if monitored_did.to_s.empty?
  puts "Usage: #{$PROGRAM_NAME} <monitored_did>"
  exit 1
elsif !monitored_did.match?(/\Adid:plc:[a-z0-9]{24}\z/)
  # \A and \z anchor the whole string; ^ and $ only anchor a single line, so an argument
  # with an embedded newline could pass the check as long as one of its lines looked valid
  puts "Not a valid DID: #{monitored_did}"
  exit 1
end
# Error raised for a string that could not be parsed as an AT URI.
class InvalidURIException < StandardError
  # uri - the rejected value; it is interpolated into the error message
  def initialize(uri)
    description = "Invalid AT URI: #{uri}"
    super(description)
  end
end
# Parsed form of an AT URI that points at a single record: "at://<did>/<collection>/<rkey>".
# Only URIs whose authority part is a DID are accepted.
class AtURI
  # Character sets are widened from \w-only to cover what the AT Protocol syntax allows:
  #   did        - the identifier part may also contain ".", ":", "%" and "-" (e.g. did:web hosts)
  #   collection - NSID segments may contain "-"
  #   rkey       - record keys may also contain ".", "-", ":" and "~"
  # \A and \z anchor the pattern to the whole string.
  PATTERN = %r{\Aat://(?<did>did:\w+:[\w.:%\-]+)/(?<collection>[\w.\-]+)/(?<rkey>[\w.\-:~]+)\z}.freeze

  attr_reader :did, :collection, :rkey

  # uri - the AT URI string to parse
  #
  # Raises InvalidURIException when `uri` (converted with to_s) does not match PATTERN.
  def initialize(uri)
    match = PATTERN.match(uri.to_s)
    raise InvalidURIException, uri unless match

    @did = match[:did]
    @collection = match[:collection]
    @rkey = match[:rkey]
  end
end
# Listens to the firehose and reports interactions with one account (replies, quotes, mentions,
# likes, reposts and follows) by building a message for each and passing it to #send_push.
class NotificationEngine
  # user_did - DID of the account whose interactions are reported
  def initialize(user_did)
    @user_did = user_did
  end

  # Creates the firehose client, registers the callbacks and starts the connection.
  def connect
    @sky = Skyfall::Firehose.new('bsky.network', :subscribe_repos)

    @sky.on_connect { puts "Connected, monitoring #{@user_did}" }
    @sky.on_disconnect { puts "Disconnected" }
    @sky.on_reconnect { puts "Reconnecting..." }
    @sky.on_error { |e| puts "ERROR: #{e}" }

    @sky.on_message { |msg| process_message(msg) }

    @sky.connect
  end

  # Closes the connection. Does nothing when #connect has not created the client yet
  # (e.g. an interrupt arriving right after start-up).
  def disconnect
    @sky&.disconnect
  end

  # Passes every "create" operation of a commit message to the handler for its record type.
  # An error raised while handling one operation is printed and the remaining operations
  # are still processed.
  def process_message(msg)
    # we're only interested in repo commit messages
    return if msg.type != :commit

    # ignore user's own actions
    return if msg.repo == @user_did

    msg.operations.each do |op|
      next if op.action != :create

      begin
        case op.type
        when :bsky_post then process_post(msg, op)
        when :bsky_like then process_like(msg, op)
        when :bsky_repost then process_repost(msg, op)
        when :bsky_follow then process_follow(msg, op)
        end
      rescue StandardError => e
        puts "Error: #{e}"
      end
    end
  end

  # posts

  # Checks a new post for a direct reply to, a quote of, or a mention of the monitored user.
  # A post can trigger more than one notification.
  def process_post(msg, op)
    data = op.raw_record

    # check for replies (direct only)
    send_reply_notification(msg, op) if in_user_repo?(data.dig('reply', 'parent', 'uri'))

    # check for quotes: a plain record embed carries the quoted URI directly,
    # the second type (recordWithMedia) nests it one level deeper
    quoted_uris = [data.dig('embed', 'record', 'uri'), data.dig('embed', 'record', 'record', 'uri')]
    quoted_uris.each { |uri| send_quote_notification(msg, op) if in_user_repo?(uri) }

    # check for mentions
    mentioned = data['facets']&.any? do |facet|
      facet['features']&.any? { |feature| feature['did'] == @user_did }
    end
    send_mention_notification(msg, op) if mentioned
  end

  def send_reply_notification(msg, op)
    handle = get_user_handle(msg.repo)
    send_push("@#{handle} replied:", op.raw_record)
  end

  def send_quote_notification(msg, op)
    handle = get_user_handle(msg.repo)
    send_push("@#{handle} quoted you:", op.raw_record)
  end

  def send_mention_notification(msg, op)
    handle = get_user_handle(msg.repo)
    send_push("@#{handle} mentioned you:", op.raw_record)
  end

  # likes

  # Reports a like when its subject is a post or a feed generator in the monitored user's repo.
  def process_like(msg, op)
    subject_uri = op.raw_record.dig('subject', 'uri')
    return unless subject_uri

    liked_uri = AtURI.new(subject_uri)
    return unless liked_uri.did == @user_did

    case liked_uri.collection
    when 'app.bsky.feed.post' then send_post_like_notification(msg, liked_uri)
    when 'app.bsky.feed.generator' then send_feed_like_notification(msg, liked_uri)
    end
  end

  def send_post_like_notification(msg, uri)
    handle = get_user_handle(msg.repo)
    post = get_record(uri)
    send_push("@#{handle} liked your post", post)
  end

  def send_feed_like_notification(msg, uri)
    handle = get_user_handle(msg.repo)
    feed = get_record(uri)
    send_push("@#{handle} liked your feed", feed)
  end

  # reposts

  # Reports a repost when its subject is a post in the monitored user's repo.
  def process_repost(msg, op)
    subject_uri = op.raw_record.dig('subject', 'uri')
    return unless subject_uri

    reposted_uri = AtURI.new(subject_uri)
    return unless reposted_uri.did == @user_did && reposted_uri.collection == 'app.bsky.feed.post'

    send_repost_notification(msg, reposted_uri)
  end

  def send_repost_notification(msg, uri)
    handle = get_user_handle(msg.repo)
    post = get_record(uri)
    send_push("@#{handle} reposted your post", post)
  end

  # follows

  # Reports a follow record whose subject is the monitored user's DID.
  def process_follow(msg, op)
    send_follow_notification(msg) if op.raw_record['subject'] == @user_did
  end

  def send_follow_notification(msg)
    handle = get_user_handle(msg.repo)
    send_push("@#{handle} followed you", msg.repo)
  end

  #
  # Note: in this example, we're calling the Bluesky AppView to get details about the person interacting with the user
  # and the post/feed that was liked/reposted etc. In a real app, you might run into rate limits if you do that,
  # because these requests will all be sent from the server's IP.
  #
  # So you might need to take a different route and send just the info that you have here in the push notification data
  # (the AT URI / DID) and fetch the details on the client side, e.g. in a Notification Service Extension on iOS.
  #

  # Returns the `handle` field of the profile fetched for `did` from api.bsky.app.
  def get_user_handle(did)
    json = fetch_json('https://api.bsky.app/xrpc/app.bsky.actor.getProfile', { actor: did })
    json['handle']
  end

  # Returns the `value` field of the record that `uri` (an AtURI) points at, fetched from api.bsky.app.
  def get_record(uri)
    params = { repo: uri.did, collection: uri.collection, rkey: uri.rkey }
    json = fetch_json('https://api.bsky.app/xrpc/com.atproto.repo.getRecord', params)
    json['value']
  end

  # Prints the notification with a timestamp; `data` is shown with #inspect unless it is nil.
  def send_push(message, data = nil)
    # send the message to APNS/FCM here
    puts
    puts "[#{Time.now}] #{message} #{data&.inspect}"
  end

  private

  # True when `uri` is present and is an AT URI of a record in the monitored user's repo.
  # Raises InvalidURIException (from AtURI) when `uri` is present but malformed.
  def in_user_repo?(uri)
    return false unless uri

    AtURI.new(uri).did == @user_did
  end

  # Performs a GET request and parses the response body as JSON.
  # Parameter values are escaped, and the block form of URI.open closes the response
  # stream as soon as the body has been read.
  def fetch_json(endpoint, params)
    url = "#{endpoint}?#{URI.encode_www_form(params)}"
    JSON.parse(URI.open(url, &:read))
  end
end
engine = NotificationEngine.new(monitored_did)

# Ctrl+C sends SIGINT; disconnect from the stream instead of being killed mid-connection
Signal.trap("SIGINT") do
  engine.disconnect
end

engine.connect