*目次 [#m579fb68] #contents *参考情報 [#y3bb4c2c] -[[Streaming API Documentation | dev.twitter.com:http://dev.twitter.com/pages/streaming_api]]。公式ドキュメント。 -[[Twitter Streaming APIをRubyで試してみる - しばそんノート:http://d.hatena.ne.jp/shibason/20090816/1250405491]] -[[RubyからTwitterStreamingAPIを使うgem作ったよ! - yayuguのにっき:http://d.hatena.ne.jp/yayugu/20091230/1262183478]] *基本 [#b89d8901] -BASIC認証で使える。一つのアカウントで一つのみ。 -全てのデータがとれるわけではない。firehose, gardenhose, sample(spritzer)のアクセスレベルがアカウントごとに設定されており、許可無しで使えるのはsampleレベルのみ。 -[[Twitter Streaming API を gardenhose レベルで利用する - nobu-qの日記:http://d.hatena.ne.jp/nobu-q/20091202]]によるとgardenhoseレベルは申請すれば割と簡単に許可されるのだろうか?許可を申請するhttp://twitter.com/help/request_streamingのURL自体どこからたどればいいのか分からない。 *Rubyのサンプルコード [#zeca8442] #pre{{ #!/usr/bin/env ruby # coding: utf-8 require 'net/http' require 'uri' require 'rubygems' require 'json' USERNAME = '_USERNAME_' # ここを書き換える PASSWORD = '_PASSWORD_' # ここを書き換える # URLが変更になってるみたい(2010/07/22) #uri = URI.parse('http://stream.twitter.com/spritzer.json') uri = URI.parse('http://stream.twitter.com/1/statuses/sample.json') Net::HTTP.start(uri.host, uri.port) do |http| request = Net::HTTP::Get.new(uri.request_uri) # Streaming APIはBasic認証のみ request.basic_auth(USERNAME, PASSWORD) http.request(request) do |response| raise 'Response is not chuncked' unless response.chunked? response.read_body do |chunk| # 空行は無視する = JSON形式でのパースに失敗したら次へ status = JSON.parse(chunk) rescue next # 削除通知など、'text'パラメータを含まないものは無視して次へ next unless status['text'] user = status['user'] puts "#{user['screen_name']}: #{status['text']}" end end end }} *Tweepy(Python)のサンプルコード [#r2370c91] #pre{{ #!/usr/bin/python # -*- coding: utf-8 -*- import sys import tweepy import simplejson #from pit import Pit class StreamListener(tweepy.StreamListener): def on_data(self, data): data = simplejson.loads(data) if data.has_key('text'): print data['text'] def main(): user = 'xxxxx' passwd = 'yyyyy' stream = tweepy.Stream(user, passwd, StreamListener()) stream.filter(track=('#nhk',)) if __name__ == "__main__": main() }} streaming.pyで処理が行われている。 #pre{{ def filter(self, follow=None, track=None, async=False): params = {} self.headers['Content-type'] = "application/x-www-form-urlencoded" if self.running: raise TweepError('Stream object already connected!') self.url = '/%i/statuses/filter.json?delimited=length' % STREAM_VERSION if follow: params['follow'] = ','.join(map(str, follow)) if track: params['track'] = ','.join(map(str, track)) self.body = urllib.urlencode(params) self._start(async) }}