Merge pull request #75 from wuddup/master

New twitter plugin with support for API 1.1
This commit is contained in:
Ryan Hitchman 2013-07-04 11:25:51 -07:00
commit f3c034df2e
2 changed files with 102 additions and 110 deletions

View File

@ -1,134 +1,58 @@
"""
twitter.py: written by Scaevolus 2009
retrieves most recent tweets
"""
import random import random
import re import re
from time import strptime, strftime from time import strptime, strftime
from util import hook, http from util import hook, http
def unescape_xml(string):
# unescape the 5 chars that might be escaped in xml
# gratuitously functional
# return reduce(lambda x, y: x.replace(*y), (string,
# zip('&gt; &lt; &apos; &quote; &amp'.split(), '> < \' " &'.split()))
# boring, normal
return string.replace('&gt;', '>').replace('&lt;', '<').replace('&apos;',
"'").replace('&quote;', '"').replace('&amp;', '&')
history = []
history_max_size = 250
@hook.command @hook.command
def twitter(inp): def twitter(inp, bot=None):
".twitter <user>/<user> <n>/<id>/#<hashtag>/@<user> -- gets last/<n>th "\ ".twitter <user>/<id> -- get <user>'s last tweet/get tweet <id>"
"tweet from <user>/gets tweet <id>/gets random tweet with #<hashtag>/"\
"gets replied tweet from @<user>"
def add_reply(reply_name, reply_id): api_keys = {}
if len(history) == history_max_size: api_keys['consumer'] = bot.config.get("api_keys", {}).get("twitter_consumer", None)
history.pop() api_keys['consumer_secret'] = bot.config.get("api_keys", {}).get("twitter_consumer_secret", None)
history.insert(0, (reply_name, reply_id)) api_keys['access'] = bot.config.get("api_keys", {}).get("twitter_access", None)
api_keys['access_secret'] = bot.config.get("api_keys", {}).get("twitter_access_secret", None)
def find_reply(reply_name): for k in api_keys:
for name, id in history: if api_keys[k] is None:
if name == reply_name: return "error: api keys not set"
return id if id != -1 else name
if inp[0] == '@':
reply_inp = find_reply(inp[1:])
if reply_inp == None:
return 'error: no replies to %s found' % inp
inp = reply_inp
url = 'http://api.twitter.com/1'
getting_nth = False
getting_id = False getting_id = False
searching_hashtag = False
time = 'status/created_at'
text = 'status/text'
reply_name = 'status/in_reply_to_screen_name'
reply_id = 'status/in_reply_to_status_id'
reply_user = 'status/in_reply_to_user_id'
if re.match(r'^\d+$', inp): if re.match(r'^\d+$', inp):
getting_id = True getting_id = True
url += '/statuses/show/%s.xml' % inp request_url = "https://api.twitter.com/1.1/statuses/show.json?id=%s" % inp
screen_name = 'user/screen_name'
time = 'created_at'
text = 'text'
reply_name = 'in_reply_to_screen_name'
reply_id = 'in_reply_to_status_id'
reply_user = 'in_reply_to_user_id'
elif re.match(r'^\w{1,15}$', inp):
url += '/users/show/%s.xml' % inp
screen_name = 'screen_name'
elif re.match(r'^\w{1,15}\s+\d+$', inp):
getting_nth = True
name, num = inp.split()
if int(num) > 3200:
return 'error: only supports up to the 3200th tweet'
url += '/statuses/user_timeline/%s.xml?count=1&page=%s' % (name, num)
screen_name = 'status/user/screen_name'
elif re.match(r'^#\w+$', inp):
url = 'http://search.twitter.com/search.atom?q=%23' + inp[1:]
searching_hashtag = True
else: else:
return 'error: invalid request' request_url = "https://api.twitter.com/1.1/statuses/user_timeline.json?screen_name=%s" % inp
try: try:
tweet = http.get_xml(url) tweet = http.get_json(request_url, oauth=True, oauth_keys=api_keys)
except http.HTTPError, e: except http.HTTPError, e:
errors = {400: 'bad request (ratelimited?)', errors = {400: 'bad request (ratelimited?)',
401: 'tweet is private', 401: 'unauthorized',
403: 'tweet is private', 403: 'forbidden',
404: 'invalid user/id', 404: 'invalid user/id',
500: 'twitter is broken', 500: 'twitter is broken',
502: 'twitter is down ("getting upgraded")', 502: 'twitter is down ("getting upgraded")',
503: 'twitter is overloaded (lol, RoR)'} 503: 'twitter is overloaded (lol, RoR)',
410: 'twitter shut off api v1.' }
if e.code == 404: if e.code == 404:
return 'error: invalid ' + ['username', 'tweet id'][getting_id] return 'error: invalid ' + ['username', 'tweet id'][getting_id]
if e.code in errors: if e.code in errors:
return 'error: ' + errors[e.code] return 'error: ' + errors[e.code]
return 'error: unknown %s' % e.code return 'error: unknown %s' % e.code
except http.URLerror, e:
return 'error: timeout'
if searching_hashtag: if getting_id:
ns = '{http://www.w3.org/2005/Atom}' text = tweet["text"]
tweets = tweet.findall(ns + 'entry/' + ns + 'id') screen_name = tweet["user"]["screen_name"]
if not tweets: time = tweet["created_at"]
return 'error: hashtag not found' else:
id = random.choice(tweets).text text = tweet[0]["text"]
id = id[id.rfind(':') + 1:] screen_name = tweet[0]["user"]["screen_name"]
return twitter(id) time = tweet[0]["created_at"]
if getting_nth: text = text.replace('&gt;', '>').replace('&lt;', '<').replace('&apos;',"'").replace('&quote;', '"').replace('&amp;', '&')
if tweet.find('status') is None: time = strftime('%Y-%m-%d %H:%M:%S', strptime(time, '%a %b %d %H:%M:%S +0000 %Y'))
return 'error: user does not have that many tweets'
time = tweet.find(time)
if time is None:
return 'error: user has no tweets'
reply_name = tweet.find(reply_name).text
reply_id = tweet.find(reply_id).text
reply_user = tweet.find(reply_user).text
if reply_name is not None and (reply_id is not None or
reply_user is not None):
add_reply(reply_name, reply_id or -1)
time = strftime('%Y-%m-%d %H:%M:%S',
strptime(time.text,
'%a %b %d %H:%M:%S +0000 %Y'))
text = unescape_xml(tweet.find(text).text.replace('\n', ''))
screen_name = tweet.find(screen_name).text
return "%s %s: %s" % (time, screen_name, text) return "%s %s: %s" % (time, screen_name, text)

View File

@ -1,11 +1,16 @@
# convenience wrapper for urllib2 & friends # convenience wrapper for urllib2 & friends
import binascii
import cookielib import cookielib
import hmac
import json import json
import random
import string
import time
import urllib import urllib
import urllib2 import urllib2
import urlparse import urlparse
from hashlib import sha1
from urllib import quote, quote_plus as _quote_plus from urllib import quote, quote_plus as _quote_plus
from urllib2 import HTTPError, URLError from urllib2 import HTTPError, URLError
@ -38,7 +43,7 @@ def get_json(*args, **kwargs):
def open(url, query_params=None, user_agent=None, referer=None, post_data=None, def open(url, query_params=None, user_agent=None, referer=None, post_data=None,
get_method=None, cookies=False, **kwargs): get_method=None, cookies=False, oauth=False, oauth_keys=None, **kwargs):
if query_params is None: if query_params is None:
query_params = {} query_params = {}
@ -60,11 +65,22 @@ def open(url, query_params=None, user_agent=None, referer=None, post_data=None,
if referer is not None: if referer is not None:
request.add_header('Referer', referer) request.add_header('Referer', referer)
if oauth:
nonce = oauth_nonce()
timestamp = oauth_timestamp()
api_url, req_data = string.split(url, "?")
unsigned_request = oauth_unsigned_request(nonce, timestamp, req_data, oauth_keys['consumer'], oauth_keys['access'])
signature = oauth_sign_request("GET", api_url, req_data, unsigned_request, oauth_keys['consumer_secret'], oauth_keys['access_secret'])
header = oauth_build_header(nonce, signature, timestamp, oauth_keys['consumer'], oauth_keys['access'])
request.add_header('Authorization', header)
if cookies: if cookies:
opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(jar)) opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(jar))
else: else:
opener = urllib2.build_opener() opener = urllib2.build_opener()
return opener.open(request) return opener.open(request)
@ -92,6 +108,58 @@ def to_utf8(s):
def quote_plus(s): def quote_plus(s):
return _quote_plus(to_utf8(s)) return _quote_plus(to_utf8(s))
def oauth_nonce():
return ''.join([str(random.randint(0, 9)) for i in range(8)])
def oauth_timestamp():
return str(int(time.time()))
def oauth_unsigned_request(nonce, timestamp, req, consumer, token):
d = { 'oauth_consumer_key':consumer,
'oauth_nonce':nonce,
'oauth_signature_method':'HMAC-SHA1',
'oauth_timestamp':timestamp,
'oauth_token':token,
'oauth_version':'1.0' }
k,v = string.split(req, "=")
d[k] = v
unsigned_req = ''
for x in sorted(d, key=lambda key: key):
unsigned_req += x + "=" + d[x] + "&"
unsigned_req = quote(unsigned_req[:-1])
return unsigned_req
def oauth_build_header(nonce, signature, timestamp, consumer, token):
d = { 'oauth_consumer_key':consumer,
'oauth_nonce':nonce,
'oauth_signature':signature,
'oauth_signature_method':'HMAC-SHA1',
'oauth_timestamp':timestamp,
'oauth_token':token,
'oauth_version':'1.0' }
header='OAuth '
for x in sorted(d, key=lambda key: key):
header += x + '="' + d[x] + '", '
return header[:-1]
def oauth_sign_request(method, url, params, unsigned_request, consumer_secret, token_secret):
key = consumer_secret + "&" + token_secret
base = method + "&" + quote(url, '') + "&" + unsigned_request
hash = hmac.new(key, base, sha1)
signature = quote(binascii.b2a_base64(hash.digest())[:-1])
return signature
def unescape(s): def unescape(s):
if not s.strip(): if not s.strip():