Skip to content

Examples

block_tracker.rb

rb
#!/usr/bin/env ruby

# Example: monitor the network for people blocking your account or adding you to mute lists.

# load skyfall from a local folder - you normally won't need this
$LOAD_PATH.unshift(File.expand_path('../lib', __dir__))

require 'json'
require 'open-uri'
require 'skyfall'

$monitored_did = ARGV[0]

if $monitored_did.to_s.empty?
  puts "Usage: #{$PROGRAM_NAME} <monitored_did>"
  exit 1
elsif ARGV[0] !~ /^did:plc:[a-z0-9]{24}$/
  puts "Not a valid DID: #{$monitored_did}"
  exit 1
end

sky = Skyfall::Firehose.new('bsky.network', :subscribe_repos)

sky.on_connect { puts "Connected, monitoring #{$monitored_did}" }
sky.on_disconnect { puts "Disconnected" }
sky.on_reconnect { puts "Reconnecting..." }
sky.on_error { |e| puts "ERROR: #{e}" }

sky.on_message do |msg|
  # we're only interested in repo commit messages
  next if msg.type != :commit

  msg.operations.each do |op|
    next if op.action != :create

    begin
      case op.type
      when :bsky_block
        process_block(msg, op)
      when :bsky_listitem
        process_list_item(msg, op)
      end
    rescue StandardError => e
      puts "Error: #{e}"
    end
  end
end

def process_block(msg, op)
  if op.raw_record['subject'] == $monitored_did
    owner_handle = get_user_handle(op.repo)
    puts "@#{owner_handle} has blocked you! (#{msg.time.getlocal})"
  end
end

def process_list_item(msg, op)
  if op.raw_record['subject'] == $monitored_did
    owner_handle = get_user_handle(op.repo)

    list_uri = op.raw_record['list']
    list_name = get_list_name(list_uri)

    puts "@#{owner_handle} has added you to list \"#{list_name}\" (#{msg.time.getlocal})"
  end
end

def get_user_handle(did)
  url = "https://plc.directory/#{did}"
  json = JSON.parse(URI.open(url).read)
  json['alsoKnownAs'][0].gsub('at://', '')
end

def get_list_name(list_uri)
  repo, type, rkey = list_uri.gsub('at://', '').split('/')
  url = "https://bsky.social/xrpc/com.atproto.repo.getRecord?repo=#{repo}&collection=#{type}&rkey=#{rkey}"

  json = JSON.parse(URI.open(url).read)
  json['value']['name']
end

# close the connection cleanly on Ctrl+C
trap("SIGINT") { sky.disconnect }

sky.connect

jet_monitor_phrases.rb

rb
#!/usr/bin/env ruby

# Example: monitor new posts for mentions of one or more words or phrases (e.g. anyone mentioning your name or the name
# of your company, project etc.). This example uses a Jetstream connection.

# load skyfall from a local folder - you normally won't need this
$LOAD_PATH.unshift(File.expand_path('../lib', __dir__))

require 'json'
require 'open-uri'
require 'skyfall'

terms = ARGV.map(&:downcase)

if terms.empty?
  puts "Usage: #{$PROGRAM_NAME} <word_or_phrase> [<word_or_phrase>...]"
  exit 1
end

# tell Jetstream to send us only post records
sky = Skyfall::Jetstream.new('jetstream2.us-east.bsky.network', { wanted_collections: [:bsky_post] })

sky.on_message do |msg|
  # we're only interested in repo commit messages
  next if msg.type != :commit

  msg.operations.each do |op|
    # ignore any operations other than "create post"
    next unless op.action == :create && op.type == :bsky_post

    text = op.raw_record['text'].to_s.downcase

    if terms.any? { |x| text.include?(x) }
      owner_handle = get_user_handle(op.repo)
      puts "\n#{msg.time.getlocal} @#{owner_handle}: #{op.raw_record['text']}"
    end
  end
end

def get_user_handle(did)
  url = "https://plc.directory/#{did}"
  json = JSON.parse(URI.open(url).read)
  json['alsoKnownAs'][0].gsub('at://', '')
end

sky.on_connect { puts "Connected" }
sky.on_disconnect { puts "Disconnected" }
sky.on_reconnect { puts "Reconnecting..." }
sky.on_error { |e| puts "ERROR: #{e}" }

# close the connection cleanly on Ctrl+C
trap("SIGINT") { sky.disconnect }

sky.connect
rb
#!/usr/bin/env ruby

# Example: print the date and text of every new post made on the network as they appear.

# load skyfall from a local folder - you normally won't need this
$LOAD_PATH.unshift(File.expand_path('../lib', __dir__))

require 'skyfall'

sky = Skyfall::Firehose.new('bsky.network', :subscribe_repos)

sky.on_message do |msg|
  # we're only interested in repo commit messages
  next if msg.type != :commit

  msg.operations.each do |op|
    # ignore any operations other than "create post"
    next unless op.action == :create && op.type == :bsky_post

    puts "#{op.repo}#{msg.time.getlocal}"
    puts op.raw_record['text']
    puts
  end
end

sky.on_connect { puts "Connected" }
sky.on_disconnect { puts "Disconnected" }
sky.on_reconnect { puts "Reconnecting..." }
sky.on_error { |e| puts "ERROR: #{e}" }

# close the connection cleanly on Ctrl+C
trap("SIGINT") { sky.disconnect }

sky.connect

push_notifications.rb

rb
#!/usr/bin/env ruby

# Example: send push notifications to a client app about interactions with a given account.

# load skyfall from a local folder - you normally won't need this
$LOAD_PATH.unshift(File.expand_path('../lib', __dir__))

require 'json'
require 'open-uri'
require 'skyfall'

monitored_did = ARGV[0]

if monitored_did.to_s.empty?
  puts "Usage: #{$PROGRAM_NAME} <monitored_did>"
  exit 1
elsif monitored_did !~ /^did:plc:[a-z0-9]{24}$/
  puts "Not a valid DID: #{monitored_did}"
  exit 1
end

class InvalidURIException < StandardError
  def initialize(uri)
    super("Invalid AT URI: #{uri}")
  end
end

class AtURI
  attr_reader :did, :collection, :rkey

  def initialize(uri)
    if uri =~ /\Aat:\/\/(did:[\w]+:[\w\.\-]+)\/([\w\.]+)\/([\w\-]+)\z/
      @did = $1
      @collection = $2
      @rkey = $3
    else
      raise InvalidURIException, uri
    end
  end
end

class NotificationEngine
  def initialize(user_did)
    @user_did = user_did
  end

  def connect
    @sky = Skyfall::Firehose.new('bsky.network', :subscribe_repos)

    @sky.on_connect { puts "Connected, monitoring #{@user_did}" }
    @sky.on_disconnect { puts "Disconnected" }
    @sky.on_reconnect { puts "Reconnecting..." }
    @sky.on_error { |e| puts "ERROR: #{e}" }

    @sky.on_message do |msg|
      process_message(msg)
    end

    @sky.connect
  end

  def disconnect
    @sky.disconnect
  end

  def process_message(msg)
    # we're only interested in repo commit messages
    return if msg.type != :commit

    # ignore user's own actions
    return if msg.repo == @user_did

    msg.operations.each do |op|
      next if op.action != :create

      begin
        case op.type
        when :bsky_post
          process_post(msg, op)
        when :bsky_like
          process_like(msg, op)
        when :bsky_repost
          process_repost(msg, op)
        when :bsky_follow
          process_follow(msg, op)
        end
      rescue StandardError => e
        puts "Error: #{e}"
      end
    end
  end


  # posts

  def process_post(msg, op)
    data = op.raw_record

    if reply = data['reply']
      # check for replies (direct only)
      if reply['parent'] && reply['parent']['uri']
        parent_uri = AtURI.new(reply['parent']['uri'])

        if parent_uri.did == @user_did
          send_reply_notification(msg, op)
        end
      end
    end

    if embed = data['embed']
      # check for quotes
      if embed['record'] && embed['record']['uri']
        quoted_uri = AtURI.new(embed['record']['uri'])

        if quoted_uri.did == @user_did
          send_quote_notification(msg, op)
        end
      end

      # second type of quote (recordWithMedia)
      if embed['record'] && embed['record']['record'] && embed['record']['record']['uri']
        quoted_uri = AtURI.new(embed['record']['record']['uri'])

        if quoted_uri.did == @user_did
          send_quote_notification(msg, op)
        end
      end
    end

    if facets = data['facets']
      # check for mentions
      if facets.any? { |f| f['features'] && f['features'].any? { |x| x['did'] == @user_did }}
        send_mention_notification(msg, op)
      end
    end
  end

  def send_reply_notification(msg, op)
    handle = get_user_handle(msg.repo)

    send_push("@#{handle} replied:", op.raw_record)
  end

  def send_quote_notification(msg, op)
    handle = get_user_handle(msg.repo)

    send_push("@#{handle} quoted you:", op.raw_record)
  end

  def send_mention_notification(msg, op)
    handle = get_user_handle(msg.repo)

    send_push("@#{handle} mentioned you:", op.raw_record)
  end


  # likes

  def process_like(msg, op)
    data = op.raw_record

    if data['subject'] && data['subject']['uri']
      liked_uri = AtURI.new(data['subject']['uri'])

      if liked_uri.did == @user_did
        case liked_uri.collection
        when 'app.bsky.feed.post'
          send_post_like_notification(msg, liked_uri)
        when 'app.bsky.feed.generator'
          send_feed_like_notification(msg, liked_uri)
        end
      end
    end
  end

  def send_post_like_notification(msg, uri)
    handle = get_user_handle(msg.repo)
    post = get_record(uri)

    send_push("@#{handle} liked your post", post)
  end

  def send_feed_like_notification(msg, uri)
    handle = get_user_handle(msg.repo)
    feed = get_record(uri)

    send_push("@#{handle} liked your feed", feed)
  end


  # reposts

  def process_repost(msg, op)
    data = op.raw_record

    if data['subject'] && data['subject']['uri']
      reposted_uri = AtURI.new(data['subject']['uri'])

      if reposted_uri.did == @user_did && reposted_uri.collection == 'app.bsky.feed.post'
        send_repost_notification(msg, reposted_uri)
      end
    end
  end

  def send_repost_notification(msg, uri)
    handle = get_user_handle(msg.repo)
    post = get_record(uri)

    send_push("@#{handle} reposted your post", post)
  end


  # follows

  def process_follow(msg, op)
    if op.raw_record['subject'] == @user_did
      send_follow_notification(msg)
    end
  end

  def send_follow_notification(msg)
    handle = get_user_handle(msg.repo)

    send_push("@#{handle} followed you", msg.repo)
  end


  #
  # Note: in this example, we're calling the Bluesky AppView to get details about the person interacting with the user
  # and the post/feed that was liked/reposted etc. In a real app, you might run into rate limits if you do that,
  # because these requests will all be sent from the server's IP.
  #
  # So you might need to take a different route and send just the info that you have here in the push notification data
  # (the AT URI / DID) and fetch the details on the client side, e.g. in a Notification Service Extension on iOS.
  #

  def get_user_handle(did)
    url = "https://api.bsky.app/xrpc/app.bsky.actor.getProfile?actor=#{did}"
    json = JSON.parse(URI.open(url).read)
    json['handle']
  end

  def get_record(uri)
    url = "https://api.bsky.app/xrpc/com.atproto.repo.getRecord?" +
          "repo=#{uri.did}&collection=#{uri.collection}&rkey=#{uri.rkey}"
    json = JSON.parse(URI.open(url).read)
    json['value']
  end

  def send_push(message, data = nil)
    # send the message to APNS/FCM here
    puts
    puts "[#{Time.now}] #{message} #{data&.inspect}"
  end
end

engine = NotificationEngine.new(monitored_did)

# close the connection cleanly on Ctrl+C
trap("SIGINT") { engine.disconnect }

engine.connect