Twitter Streaming APIをRubyで試してみる

Streaming APIとは

TwitterのStreaming APIをご存知でしょうか。

2009年4月頃から試験的に公開されているAPIなので、ご存知な方も多いと思います。2009年8月現在でまだαテスト中ですが、これを利用すると、push型でリアルタイムに情報を受け取ることができます。

TwitterAPIは基本的にpull型なので、クライアントが能動的にリクエストを発行しなければ情報を受け取ることができません。しかし、1時間あたりのリクエスト回数、1リクエストあたりの最大データ数など、APIには様々な制限がかけられていますので、例えばリアルタイム性の必要なプログラムや、大量のtweetsが必要な統計プログラムなどの用途で通常のAPIを利用するのは厳しいものがあります。

そのような時はStreaming APIを使うと問題が解決するかもしれません。

過半数APIは利用に許可が必要ですし、また通常のAPIとは機能のベクトルが異なるため、「自分のタイムラインを取得して表示する」というような普通のTwitterクライアントライクな用途には使えませんが、アイデアさえあれば色々と面白いことができるのではないかと思います。

Streaming APIの概要

通常のAPIは、

  1. クライアントからTwitterサーバへHTTP接続を開き、
  2. クライアントがリクエストを送出し、
  3. サーバがレスポンスを送り返し、
  4. 接続を閉じる。

という流れになります。

Streaming APIの場合、クライアントがTwitterサーバへ接続を開き、リクエストを送るところまでは同じなのですが、その後はエラー等で切断されるまでずっと接続を維持し続けます。その間、サーバからは次々にレスポンスが送られてきますので、クライアントは順次それを受け取って処理していくことになります。

Streaming APIを使う上での注意点

詳しくはAPIのドキュメントに書いてありますが、重要なもののみまとめると、

  • 同一アカウントからの同時接続数は1本のみ。同一アカウントから2本以上の接続があった場合、古い接続は強制的に切断される。*1
  • 使用するHTTPライブラリ*2は、レスポンスボディを全て読み込んでから返すタイプではなく、順次読み込んでいけるタイプのものでなくてはならない。
  • サーバは接続の維持のために空行を送り返すことがあるので、クライアントはそれを許容できる*3作りでなくてはならない。
  • 通常のstatusの他に、「ユーザがstatusを削除した」という通知など、他の情報も混ざって送られてくるので、適切に処理すること。
  • エラーや通信の遅延など、様々な要因によって接続が切断されることもあるので、必要ならば自動で再接続する仕組みを組み込むこと。
  • 一部のAPIは膨大な量のデータを連続して返すので、それを受けるプログラムはパフォーマンスに関して相当な努力が必要かもしれない。
  • サービスの品質はベストエフォートである。

といったところだと思います。

使ってみる

百聞は一見にしかず…ということで、試しに一回Streaming APIを使ってみることにします。

Streaming APIの種類は後述しますが、ここではとりあえず"spritzer" APIをサンプルとします。これはPublic Timelineからランダムに抽出された発言が延々と流れてくるAPIです。

#!/usr/bin/env ruby
# coding: utf-8

require 'net/http'
require 'uri'
require 'rubygems'
require 'json'

USERNAME = '_USERNAME_' # ここを書き換える
PASSWORD = '_PASSWORD_' # ここを書き換える

uri = URI.parse('http://stream.twitter.com/spritzer.json')
Net::HTTP.start(uri.host, uri.port) do |http|
  request = Net::HTTP::Get.new(uri.request_uri)
  # Streaming APIはBasic認証のみ
  request.basic_auth(USERNAME, PASSWORD)
  http.request(request) do |response|
    raise 'Response is not chuncked' unless response.chunked?
    response.read_body do |chunk|
      # 空行は無視する = JSON形式でのパースに失敗したら次へ
      status = JSON.parse(chunk) rescue next
      # 削除通知など、'text'パラメータを含まないものは無視して次へ
      next unless status['text']
      user = status['user']
      puts "#{user['screen_name']}: #{status['text']}"
    end
  end
end

実行すると、Ctrl+Cなどでプロセスを終了するまで、標準出力にずらずらと発言が出てくると思います。私が試したときはだいたい秒間5〜10発言程度といった速度でした。Public Timelineはグローバルなので、大半は英語で埋まることになります。

実用的に使うのであれば、取得したデータを標準出力に送るのではなく、例えばファイルやデータベース経由、あるいはプロセス間通信などで別プロセスに引き渡し、別プロセスでリアルタイムにそれらを解析してなんらかの統計情報を得る…とか、そういう使い方になるのではないかと思います。*4

ちなみに、上のコードはレスポンスに"Transfer-Encoding: chunked"がセットされていることを期待した作りになっています。Streaming APIでは基本的にそうなっているようなので問題ないとは思いますが、仮に"Response is not chuncked"のエラーが出るようであれば*5、次のように明示的に区切り文字を指定してデータを取り出すような処理にすれば大丈夫だと思います。

#!/usr/bin/env ruby
# coding: utf-8

require 'net/http'
require 'uri'
require 'rubygems'
require 'json'

USERNAME = '_USERNAME_' # ここを書き換える
PASSWORD = '_PASSWORD_' # ここを書き換える

# Net::HTTPResponseクラスにeach_lineメソッドを追加
module Net
  class HTTPResponse
    def each_line(rs = "\n")
      stream_check
      while line = @socket.readuntil(rs)
        yield line
      end
      self
    end
  end
end

uri = URI.parse('http://stream.twitter.com/spritzer.json')
Net::HTTP.start(uri.host, uri.port) do |http|
  request = Net::HTTP::Get.new(uri.request_uri)
  request.basic_auth(USERNAME, PASSWORD)
  http.request(request) do |response|
    response.each_line("\r\n") do |line|
      status = JSON.parse(line) rescue next
      next unless status['text']
      user = status['user']
      puts "#{user['screen_name']}: #{status['text']}"
    end
  end
end

この際、データの区切りは"\n"ではなく"\r"であることに注意してください。*6"\n"は本文内にも出現し得るためです。細かいことはAPIのドキュメントに書いてあります。

Streaming APIの種類

Streaming APIには2009/08/16現在7つのAPIが用意されていますが、使い方は基本的に全て同じです。

タイプ別に大きく3つに分けることができます。

Public Timeline垂れ流し系

Public Timelineの内容、つまり鍵のかかっていない全ての発言を取得します。

このタイプのAPIには"firehose", "gardenhose", "spritzer"の3つが用意されています。

firehoseはPublic Timelineの全ての発言を含みます。利用には許可が必要です。

gardenhoseはfirehoseからランダムにサンプリングした発言を含みます。利用には許可が必要です

spritzerはgardenhoseよりも更に少なくサンプリングした発言を含みます。誰でも利用できます。

GETメソッドでリクエストを送ります。"count", "delimited"パラメータを取ることができますが、説明は省略します。

鍵をかけていない全てのユーザの全ての発言が流れてきますので、当然firehoseは恐ろしい量のデータが送られてくることになります。一番量の少ないspritzerでもかなりの勢いですので、この辺を扱う際は十分な注意が必要でしょう。

特定ユーザの発言追跡系

"follow"パラメータで指定したユーザの発言、及びそのユーザに向けられた発言だけを取得できます。*7鍵付きのユーザの発言は含まれません。

このタイプのAPIには"birddog", "shadow", "follow"の3つが用意されています。

birddogはfollowするユーザを20万まで指定できます。利用には許可が必要です。

shadowはfollowするユーザを5万まで指定できます。利用には許可が必要です。

followはfollowするユーザを200まで指定できます。誰でも利用できます。

POSTメソッドでリクエストを送ります。"follow"パラメータには、追跡したいユーザのID*8をカンマ区切りで並べて指定します。"delimited"パラメータについては省略します。

特定キーワードの抽出系

"track"パラメータで指定したキーワードを含む発言だけを取得できます。鍵付きのユーザの発言は含まれません。

このタイプのAPIには"track"のみが用意されています。誰でも利用できます。

POSTメソッドでリクエストを送ります。"track"パラメータには、キーワードをカンマ区切りで並べて指定します。キーワードは50個まで、1つのキーワードの長さは1〜30バイトです。

キーワードは大文字・小文字を区別せず、単語単位で検索されます。複数指定した場合はORになります。

この“単語単位”が曲者で、現状英語での単語単位…つまりスペースやその他記号で区切られた場合しかヒットしないようで、日本語キーワードを指定した場合はほとんど機能しません。

ただ、ハッシュタグなんかに対する抽出には使えますし*9、また例えば"nicovideo"を指定するとニコニコ動画のURLを含む発言を抽出できますので、使いようによってはなんとかなることもあります。

もう一回使ってみる

最後にもう一つサンプルを。今度は"track" APIを使ってみます。

#!/usr/bin/env ruby
# coding: utf-8

require 'net/http'
require 'uri'
require 'rubygems'
require 'json'

USERNAME = '_USERNAME_' # ここを書き換える
PASSWORD = '_PASSWORD_' # ここを書き換える

uri = URI.parse('http://stream.twitter.com/track.json')
Net::HTTP.start(uri.host, uri.port) do |http|
  request = Net::HTTP::Post.new(uri.request_uri)
  request.set_form_data('track' => 'bit')
  request.basic_auth(USERNAME, PASSWORD)
  http.request(request) do |response|
    raise 'Response is not chuncked' unless response.chunked?
    response.read_body do |chunk|
      status = JSON.parse(chunk) rescue next
      next unless status['text'] && status['text'].include?('http://bit.ly')
      user = status['user']
      puts "#{user['screen_name']}: #{status['text']}"
    end
  end
end

実行すると、bit.lyによる短縮URLを含んだ発言がずらーっと出てきます。さすがにかなりの勢いになりますので注意してください。

キーワードにドットを含めると上手くいかないようなので、"bit"で抽出して、その後スクリプト内で更にふるいにかけています。

このデータを加工すれば、bit.lyでどんなURLが注目を集めているか、といった統計を得ることができます。確か既にそんなサービスがあったように記憶していますが、おそらくこんな感じでStreaming APIを利用しているのではないでしょうか。

まとめ

まだαテスト中なため、品質面では過度な期待はできませんし、このAPI自体今後どう変わっていくかわかりません。*10しかし、それでもアイデア次第で様々な可能性が秘められている機能だと思います。一度Streaming APIで遊んでみるのはいかがでしょう。

*1:通常のAPIとStreaming APIの併用は可能。

*2:もしくはそれに類するもの

*3:単に無視すれば良いです。

*4:RubyならマルチスレッドでQueueを使えば簡単にその類の連携ができます。

*5:上のコードの場合、認証失敗などでエラーが発生した際もこれが出ますので、そこは区別するよう注意してください。

*6:"\r"のあとには常に"\n"が続くようなので、ここではまとめて区切り文字としています。

*7:より厳密に言うと、"in_reply_to_*"パラメータでそのユーザが指定されている発言です。@付きで言及されていても、"in_reply_to_*"パラメータで指定されていない場合は含まれません。

*8:必ず数値で指定

*9:ハッシュタグを検索する場合、シャープ(#)をキーワードに含める必要はありません。勝手に上手いことやってくれます。

*10:APIのメソッド名もコードネームっぽいですし、正式リリースの時には変わりそうな気がします。