diff --git a/Gemfile b/Gemfile index 77627808d..cc92bec3d 100644 --- a/Gemfile +++ b/Gemfile @@ -28,9 +28,12 @@ gem 'redfinger', :git => 'git://github.com/rsofaer/redfinger.git' #EventMachine gem 'em-http-request',:git => 'git://github.com/igrigorik/em-http-request.git', :require => 'em-http' -gem 'em-websocket' gem 'thin' +#Websocket +gem 'em-websocket' +gem 'magent', :git => 'http://github.com/dcu/magent.git' + #File uploading gem 'carrierwave', :git => 'git://github.com/rsofaer/carrierwave.git' , :branch => 'master' #Untested mongomapper branch gem 'mini_magick' diff --git a/Gemfile.lock b/Gemfile.lock index 007810850..dd181b320 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -47,6 +47,14 @@ GIT bcrypt-ruby (~> 2.1.2) warden (~> 0.10.7) +GIT + remote: http://github.com/dcu/magent.git + revision: 06513f3dac812469a55f2e365c349af4d2abc92a + specs: + magent (0.4.2) + mongo (>= 0.1.0) + uuidtools (>= 2.0.0) + GIT remote: http://github.com/jnunemaker/mongomapper.git revision: 931dab779011aa7acf60c1a4c7ad19e1ba838345 @@ -213,6 +221,7 @@ GEM treetop (1.4.8) polyglot (>= 0.3.1) tzinfo (0.3.23) + uuidtools (2.1.1) warden (0.10.7) rack (>= 1.0.0) webmock (1.3.5) @@ -242,6 +251,7 @@ DEPENDENCIES haml jnunemaker-validatable (= 1.8.4)! json + magent! mini_magick mocha mongo_mapper (= 0.8.4)! diff --git a/app/controllers/sockets_controller.rb b/app/controllers/sockets_controller.rb index 2852a7160..9b90ea831 100644 --- a/app/controllers/sockets_controller.rb +++ b/app/controllers/sockets_controller.rb @@ -14,7 +14,7 @@ class SocketsController < ApplicationController def outgoing(uid,object,opts={}) @_request = ActionDispatch::Request.new({}) - Diaspora::WebSocket.push_to_user(uid, action_hash(uid, object, opts)) + Diaspora::WebSocket.queue_to_user(uid, action_hash(uid, object, opts)) end end diff --git a/app/models/post.rb b/app/models/post.rb index e2dd53d3d..9f7c3ff27 100644 --- a/app/models/post.rb +++ b/app/models/post.rb @@ -4,6 +4,7 @@ class Post + require 'lib/diaspora/websocket' require 'lib/encryptable' include MongoMapper::Document include ApplicationHelper diff --git a/config/app_config.yml b/config/app_config.yml index 0a206c366..a138424ee 100644 --- a/config/app_config.yml +++ b/config/app_config.yml @@ -8,6 +8,7 @@ development: debug: false socket_debug : false socket_port: 8080 + socket_collection_name: 'websocket' pubsub_server: 'https://pubsubhubbub.appspot.com/' test: diff --git a/config/deploy.rb b/config/deploy.rb index b4870f94d..805eadf4d 100644 --- a/config/deploy.rb +++ b/config/deploy.rb @@ -58,6 +58,11 @@ namespace :deploy do task :start do start_mongo start_thin + start_websocket + end + + task :start_websocket do + run("cd #{current_path} && bundle exec ruby ./script/websocket_server.rb > /dev/null&") end task :start_mongo do diff --git a/lib/diaspora/websocket.rb b/lib/diaspora/websocket.rb index fc79559f6..687bd2b7c 100644 --- a/lib/diaspora/websocket.rb +++ b/lib/diaspora/websocket.rb @@ -6,6 +6,11 @@ module Diaspora module WebSocket + def self.queue_to_user(uid, data) + channel = Magent::GenericChannel.new('websocket') + channel.enqueue({:uid => uid, :data => data}) + end + def self.initialize_channels @channels = {} end @@ -44,6 +49,5 @@ module Diaspora def unsocket_from_uid(id, opts={}) SocketsController.new.outgoing(id, Retraction.for(self), opts) end - end end diff --git a/lib/tasks/db.rake b/lib/tasks/db.rake index 22f995325..80e3bafe5 100644 --- a/lib/tasks/db.rake +++ b/lib/tasks/db.rake @@ -63,6 +63,7 @@ namespace :db do return this.diaspora_handle.charAt(this.diaspora_handle.length-1) == '@' }") + puts "Found #{people.count} people with broken diaspora_handle fields" people.each do |person| if person.owner puts "Resetting diaspora handle for #{person.owner.username}" diff --git a/config/initializers/socket.rb b/script/websocket_server.rb similarity index 58% rename from config/initializers/socket.rb rename to script/websocket_server.rb index 0b1035001..8c466ca77 100644 --- a/config/initializers/socket.rb +++ b/script/websocket_server.rb @@ -2,11 +2,24 @@ # licensed under the Affero General Public License version 3. See # the COPYRIGHT file. +require File.dirname(__FILE__) + '/../config/environment' +require File.dirname(__FILE__) + '/../lib/diaspora/websocket' -require 'em-websocket' -require 'eventmachine' -require 'lib/diaspora/websocket' - EM.next_tick { +CHANNEL = Magent::GenericChannel.new('websocket') +def process_message + if CHANNEL.queue_count > 0 + message = CHANNEL.dequeue + if message + Diaspora::WebSocket.push_to_user(message['uid'], message['data']) + end + EM.next_tick{ process_message} + else + EM::Timer.new(1){process_message} + end + +end + + EM.run { Diaspora::WebSocket.initialize_channels EventMachine::WebSocket.start( @@ -22,5 +35,6 @@ require 'lib/diaspora/websocket' ws.onclose { Diaspora::WebSocket.unsubscribe(ws.request['Path'].gsub('/',''), sid) } } end + process_message } diff --git a/spec/lib/websocket_spec.rb b/spec/lib/websocket_spec.rb new file mode 100644 index 000000000..233f7be48 --- /dev/null +++ b/spec/lib/websocket_spec.rb @@ -0,0 +1,24 @@ +# Copyright (c) 2010, Diaspora Inc. This file is +# licensed under the Affero General Public License version 3. See +# the COPYRIGHT file. + +require File.dirname(__FILE__) + '/../spec_helper' + +describe Diaspora::WebSocket do + before do + @user = Factory.create(:user) + @aspect = @user.aspect(:name => "losers") + @post = @user.build_post(:status_message, :message => "hey", :to => @aspect.id) + end + + it 'should queue a job' do + Diaspora::WebSocket.should_receive(:queue_to_user) + @post.socket_to_uid(@user.id, :aspect_ids => @aspect.id) + end + + it 'The queued job should reach Magent' do + Magent.should_receive(:push) + @post.socket_to_uid(@user.id, :aspect_ids => @aspect.id) + end + +end