DG MS; queue still working. half-way finished cleaning up code.
This commit is contained in:
parent
d91cec0881
commit
4cecb522b6
3 changed files with 57 additions and 90 deletions
|
|
@ -1,4 +1,5 @@
|
||||||
class Post
|
class Post
|
||||||
|
require 'lib/common'
|
||||||
require 'lib/message_handler'
|
require 'lib/message_handler'
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -7,6 +8,7 @@ class Post
|
||||||
include Mongoid::Document
|
include Mongoid::Document
|
||||||
include Mongoid::Timestamps
|
include Mongoid::Timestamps
|
||||||
include ROXML
|
include ROXML
|
||||||
|
include Diaspora::Hookey
|
||||||
|
|
||||||
xml_accessor :owner
|
xml_accessor :owner
|
||||||
xml_accessor :snippet
|
xml_accessor :snippet
|
||||||
|
|
@ -17,32 +19,6 @@ class Post
|
||||||
field :snippet
|
field :snippet
|
||||||
|
|
||||||
before_create :set_defaults
|
before_create :set_defaults
|
||||||
#after_update :notify_friends
|
|
||||||
after_save :notify_friends
|
|
||||||
|
|
||||||
@@queue = MessageHandler.new
|
|
||||||
|
|
||||||
def notify_friends
|
|
||||||
puts "hello"
|
|
||||||
|
|
||||||
xml = prep_webhook
|
|
||||||
#friends_with_permissions.each{ |friend| puts friend; Curl.post( "\"" + xml + "\" " + friend) }
|
|
||||||
@@queue.add_post_request( friends_with_permissions, xml )
|
|
||||||
@@queue.process
|
|
||||||
end
|
|
||||||
|
|
||||||
def prep_webhook
|
|
||||||
self.to_xml.to_s.chomp
|
|
||||||
end
|
|
||||||
|
|
||||||
def friends_with_permissions
|
|
||||||
#friends = Friend.only(:url).map{|x| x = x.url + "/receive/"}
|
|
||||||
#3.times {friends = friends + friends}
|
|
||||||
#friends
|
|
||||||
googles = []
|
|
||||||
100.times{ googles <<"http://google.com/"} #"http://localhost:4567/receive/"} #"http://google.com/"}
|
|
||||||
googles
|
|
||||||
end
|
|
||||||
|
|
||||||
@@models = ["StatusMessage", "Bookmark", "Blog"]
|
@@models = ["StatusMessage", "Bookmark", "Blog"]
|
||||||
|
|
||||||
|
|
@ -65,7 +41,5 @@ class Post
|
||||||
self.source ||= user_email
|
self.source ||= user_email
|
||||||
self.snippet ||= user_email
|
self.snippet ||= user_email
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -22,14 +22,12 @@ module Diaspora
|
||||||
def self.included(klass)
|
def self.included(klass)
|
||||||
|
|
||||||
klass.class_eval do
|
klass.class_eval do
|
||||||
include EventQueue::MessageHandler
|
|
||||||
before_save :notify_friends
|
before_save :notify_friends
|
||||||
|
|
||||||
def notify_friends
|
@@queue = MessageHandler.new
|
||||||
puts "hello"
|
|
||||||
|
|
||||||
|
def notify_friends
|
||||||
xml = prep_webhook
|
xml = prep_webhook
|
||||||
#friends_with_permissions.each{ |friend| puts friend; Curl.post( "\"" + xml + "\" " + friend) }
|
|
||||||
@@queue.add_post_request( friends_with_permissions, xml )
|
@@queue.add_post_request( friends_with_permissions, xml )
|
||||||
@@queue.process
|
@@queue.process
|
||||||
end
|
end
|
||||||
|
|
@ -39,10 +37,7 @@ module Diaspora
|
||||||
end
|
end
|
||||||
|
|
||||||
def friends_with_permissions
|
def friends_with_permissions
|
||||||
#Friend.only(:url).map{|x| x = x.url + "/receive/"}
|
Friend.only(:url).map{|x| x = x.url + "/receive/"}
|
||||||
#googles = []
|
|
||||||
#5.times{ googles <<"http://google.com/"} #"http://localhost:4567/receive/"} #"http://google.com/"}
|
|
||||||
googles
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
||||||
|
|
@ -5,63 +5,61 @@ require 'dm-core'
|
||||||
require 'eventmachine'
|
require 'eventmachine'
|
||||||
require 'em-http'
|
require 'em-http'
|
||||||
|
|
||||||
|
class MessageHandler
|
||||||
|
|
||||||
|
NUM_TRIES = 3
|
||||||
|
TIMEOUT = 5 #seconds
|
||||||
|
|
||||||
|
def initialize
|
||||||
|
@queue = EM::Queue.new
|
||||||
|
end
|
||||||
|
|
||||||
|
def add_get_request(destinations)
|
||||||
|
destinations.each{ |dest| @queue.push(Message.new(:get, dest))}
|
||||||
|
end
|
||||||
|
|
||||||
|
|
||||||
class MessageHandler
|
def add_post_request(destinations, body)
|
||||||
|
destinations.each{|dest| @queue.push(Message.new(:post, dest, body))}
|
||||||
|
end
|
||||||
|
|
||||||
NUM_TRIES = 3
|
def process
|
||||||
TIMEOUT = 5 #seconds
|
@queue.pop{ |query|
|
||||||
|
case query.type
|
||||||
def initialize
|
when :post
|
||||||
@queue = EM::Queue.new
|
http = EventMachine::HttpRequest.new(query.destination).post :timeout => TIMEOUT, :body => query.body
|
||||||
end
|
http.callback {puts "YAR"; process}
|
||||||
|
when :get
|
||||||
def add_get_request(destinations)
|
http = EventMachine::HttpRequest.new(query.destination).get :timeout => TIMEOUT
|
||||||
destinations.each{ |dest| @queue.push(Message.new(:get, dest))}
|
http.callback {send_to_seed(query, http.response); process}
|
||||||
end
|
else
|
||||||
|
raise "message is not a type I know!"
|
||||||
|
|
||||||
def add_post_request(destinations, body)
|
|
||||||
destinations.each{|dest| @queue.push(Message.new(:post, dest, body))}
|
|
||||||
end
|
|
||||||
|
|
||||||
def process
|
|
||||||
@queue.pop{ |query|
|
|
||||||
case query.type
|
|
||||||
when :post
|
|
||||||
http = EventMachine::HttpRequest.new(query.destination).post :timeout => TIMEOUT, :body => query.body
|
|
||||||
http.callback {puts "YAR"; process}
|
|
||||||
when :get
|
|
||||||
http = EventMachine::HttpRequest.new(query.destination).get :timeout => TIMEOUT
|
|
||||||
http.callback {send_to_seed(query, http.response); process}
|
|
||||||
else
|
|
||||||
raise "message is not a type I know!"
|
|
||||||
end
|
|
||||||
|
|
||||||
http.errback {
|
|
||||||
query.try_count +=1
|
|
||||||
@queue.push query unless query.try_count >= NUM_TRIES
|
|
||||||
process
|
|
||||||
}
|
|
||||||
} unless @queue.size == 0
|
|
||||||
end
|
|
||||||
|
|
||||||
|
|
||||||
def send_to_seed(message, http_response)
|
|
||||||
#DO SOMETHING!
|
|
||||||
end
|
|
||||||
|
|
||||||
def size
|
|
||||||
@queue.size
|
|
||||||
end
|
|
||||||
|
|
||||||
class Message
|
|
||||||
attr_accessor :type, :destination, :body, :try_count
|
|
||||||
def initialize(type, dest, body= nil)
|
|
||||||
@type = type
|
|
||||||
@destination = dest
|
|
||||||
@body = body
|
|
||||||
@try_count = 0
|
|
||||||
end
|
end
|
||||||
|
|
||||||
|
http.errback {
|
||||||
|
query.try_count +=1
|
||||||
|
@queue.push query unless query.try_count >= NUM_TRIES
|
||||||
|
process
|
||||||
|
}
|
||||||
|
} unless @queue.size == 0
|
||||||
|
end
|
||||||
|
|
||||||
|
|
||||||
|
def send_to_seed(message, http_response)
|
||||||
|
#DO SOMETHING!
|
||||||
|
end
|
||||||
|
|
||||||
|
def size
|
||||||
|
@queue.size
|
||||||
|
end
|
||||||
|
|
||||||
|
class Message
|
||||||
|
attr_accessor :type, :destination, :body, :try_count
|
||||||
|
def initialize(type, dest, body= nil)
|
||||||
|
@type = type
|
||||||
|
@destination = dest
|
||||||
|
@body = body
|
||||||
|
@try_count = 0
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
end
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue