DG MS added pubsub gem, added hub to message queue
This commit is contained in:
parent
9e93fd072e
commit
1b63fbbec7
9 changed files with 92 additions and 9 deletions
2
Gemfile
2
Gemfile
|
|
@ -13,7 +13,7 @@ gem "haml"
|
|||
gem 'roxml', :git => "git://github.com/Empact/roxml.git"
|
||||
|
||||
gem 'gpgme'
|
||||
|
||||
gem 'pubsubhubbub'
|
||||
#mai crazy async stuff
|
||||
#gem 'em-synchrony', :git => 'git://github.com/igrigorik/em-synchrony.git', :require => 'em-synchrony/em-http'
|
||||
gem 'em-http-request',:git => 'git://github.com/igrigorik/em-http-request.git', :require => 'em-http'
|
||||
|
|
|
|||
|
|
@ -1,7 +1,8 @@
|
|||
class DashboardsController < ApplicationController
|
||||
|
||||
before_filter :authenticate_user!, :except => :receive
|
||||
before_filter :authenticate_user!, :except => [:receive, :hub]
|
||||
include ApplicationHelper
|
||||
include DashboardsHelper
|
||||
|
||||
def index
|
||||
@posts = Post.paginate :page => params[:page], :order => 'created_at DESC'
|
||||
|
|
@ -15,6 +16,17 @@ class DashboardsController < ApplicationController
|
|||
render :nothing => true
|
||||
end
|
||||
|
||||
def hub
|
||||
if params[:mode] == "subscribe"
|
||||
response.status = subscribe(params)
|
||||
end
|
||||
|
||||
render :nothing => true
|
||||
end
|
||||
|
||||
|
||||
|
||||
|
||||
def warzombie
|
||||
render :nothing => true
|
||||
if User.owner.email == "tom@joindiaspora.com" && StatusMessage.where(:message => "There's a bomb in the lasagna!?").first == nil
|
||||
|
|
|
|||
|
|
@ -1,5 +1,19 @@
|
|||
module DashboardsHelper
|
||||
|
||||
def subscribe(opts = {})
|
||||
subscriber = Subscriber.first(:url => opts[:callback], :topic => opts[:topic])
|
||||
subscriber ||= Subscriber.new(:url => opts[:callback], :topic => opts[:topic])
|
||||
|
||||
if subscriber.save
|
||||
|
||||
if opts[:verify] == 'sync'
|
||||
204
|
||||
elsif opts[:verify] == 'async'
|
||||
202
|
||||
end
|
||||
else
|
||||
400
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
|
|
|||
|
|
@ -2,7 +2,8 @@ class Subscriber
|
|||
include MongoMapper::Document
|
||||
|
||||
key :url
|
||||
key :topic
|
||||
|
||||
validates_presence_of :url
|
||||
validates_presence_of :url, :topic
|
||||
|
||||
end
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ Diaspora::Application.routes.draw do |map|
|
|||
|
||||
resources :users
|
||||
match 'receive', :to => 'dashboards#receive'
|
||||
|
||||
match 'hub', :to => 'dashboards#hub'
|
||||
root :to => 'dashboards#index'
|
||||
|
||||
end
|
||||
|
|
|
|||
|
|
@ -61,6 +61,9 @@ module Diaspora
|
|||
recipients.map!{|x| x = x.url + "receive/"}
|
||||
xml = self.class.build_xml_for([self])
|
||||
@@queue.add_post_request( recipients, xml )
|
||||
|
||||
@@queue.add_hub_notification('http://pubsubhubbub.appspot.com/publish/', User.owner.url + self.class.to_s.pluralize.underscore )
|
||||
|
||||
@@queue.process
|
||||
end
|
||||
end
|
||||
|
|
@ -128,8 +131,9 @@ module Diaspora
|
|||
end
|
||||
|
||||
def self.endpoints
|
||||
#generate pubsub, poco, salmon endpoints
|
||||
""
|
||||
<<-XML
|
||||
<link href="http://pubsubhubbub.appspot.com" rel="hub"/>
|
||||
XML
|
||||
end
|
||||
|
||||
def self.subject
|
||||
|
|
|
|||
|
|
@ -17,20 +17,28 @@ class MessageHandler
|
|||
destinations.each{|dest| @queue.push(Message.new(:post, dest, b))}
|
||||
end
|
||||
|
||||
def add_hub_notification(destination, feed_location)
|
||||
@queue.push(Message.new(:pubhub, destination, feed_location))
|
||||
end
|
||||
|
||||
def process
|
||||
@queue.pop{ |query|
|
||||
case query.type
|
||||
when :post
|
||||
http = EventMachine::HttpRequest.new(query.destination).post :timeout => TIMEOUT, :body =>{:xml => query.body}
|
||||
http.callback {process}
|
||||
http = EventMachine::HttpRequest.new(query.destination).post :timeout => TIMEOUT, :body =>{:xml => query.body}
|
||||
http.callback { puts query.destination; process; process}
|
||||
when :get
|
||||
http = EventMachine::HttpRequest.new(query.destination).get :timeout => TIMEOUT
|
||||
http.callback {send_to_seed(query, http.response); process}
|
||||
when :pubhub
|
||||
http = EventMachine::PubSubHubbub.new(query.destination).publish query.body, :timeout => TIMEOUT
|
||||
http.callback { puts "boner city" + http.response ; process}
|
||||
else
|
||||
raise "message is not a type I know!"
|
||||
end
|
||||
|
||||
http.errback {
|
||||
puts http.response
|
||||
puts "failure from #{query.destination}, retrying"
|
||||
query.try_count +=1
|
||||
@queue.push query unless query.try_count >= NUM_TRIES
|
||||
|
|
|
|||
|
|
@ -4,8 +4,8 @@ describe DashboardsController do
|
|||
render_views
|
||||
|
||||
before do
|
||||
request.env['warden'] = mock_model(Warden, :authenticate? => @user, :authenticate! => @user)
|
||||
@user = Factory.create(:user, :profile => Profile.create( :first_name => "bob", :last_name => "smith"))
|
||||
request.env['warden'] = mock_model(Warden, :authenticate? => @user, :authenticate! => @user, :authenticate => @user)
|
||||
end
|
||||
|
||||
it "on index sets a variable containing all a user's friends when a user is signed in" do
|
||||
|
|
@ -15,4 +15,45 @@ describe DashboardsController do
|
|||
assigns[:friends].should == Person.friends.all
|
||||
end
|
||||
|
||||
describe 'PubSubHubBuB intergration' do
|
||||
|
||||
describe 'incoming subscriptions' do
|
||||
it 'should register a friend' do
|
||||
Subscriber.all.count.should == 0
|
||||
|
||||
post :hub, {:callback => "http://example.com/",
|
||||
:mode => 'subscribe',
|
||||
:topic => '/status_messages',
|
||||
:verify => 'async'}
|
||||
response.status.should == 202
|
||||
|
||||
Subscriber.all.count.should == 1
|
||||
end
|
||||
|
||||
it 'should keep track of what topic a subscriber wants' do
|
||||
post :hub, {:callback => "http://example.com/",
|
||||
:mode => 'subscribe',
|
||||
:topic => '/status_messages',
|
||||
:verify => 'async'}
|
||||
Subscriber.first.topic.should == '/status_messages'
|
||||
end
|
||||
end
|
||||
|
||||
it 'should return a 204 for a sync request' do
|
||||
post :hub, {:callback => "http://example.com/",
|
||||
:mode => 'subscribe',
|
||||
:topic => '/status_messages',
|
||||
:verify => 'sync'}
|
||||
response.status.should == 204
|
||||
end
|
||||
|
||||
it 'should confirm subscription of a sync request' do
|
||||
post :hub, {:callback => "http://example.com/",
|
||||
:mode => 'subscribe',
|
||||
:topic => '/status_messages',
|
||||
:verify => 'sync'}
|
||||
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
|
|
|
|||
|
|
@ -5,6 +5,9 @@ describe Subscriber do
|
|||
n = Subscriber.new
|
||||
n.valid?.should be false
|
||||
|
||||
n.topic = '/status_messages'
|
||||
n.valid?.should be false
|
||||
|
||||
n.url = "http://clown.com/"
|
||||
|
||||
n.valid?.should be true
|
||||
|
|
|
|||
Loading…
Reference in a new issue