From 0e77577fbd6d409a30d6a0e0281510df0a884caa Mon Sep 17 00:00:00 2001 From: Raphael Date: Fri, 10 Dec 2010 11:23:27 -0800 Subject: [PATCH] Websocket now uses redis --- Gemfile | 1 - Gemfile.lock | 10 -------- lib/diaspora/web_socket.rb | 19 ++++++++++---- script/websocket_server.rb | 5 ++-- spec/lib/diaspora/web_socket_spec.rb | 38 ++++++++++++++++++++++++++++ spec/lib/websocket_spec.rb | 37 --------------------------- 6 files changed, 54 insertions(+), 56 deletions(-) create mode 100644 spec/lib/diaspora/web_socket_spec.rb delete mode 100644 spec/lib/websocket_spec.rb diff --git a/Gemfile b/Gemfile index 4a0949787..d17024072 100644 --- a/Gemfile +++ b/Gemfile @@ -40,7 +40,6 @@ gem 'thin' #Websocket gem 'em-websocket', :git => 'git://github.com/igrigorik/em-websocket' -gem 'magent', :git => 'git://github.com/dcu/magent.git' #File uploading gem 'carrierwave', :git => 'git://github.com/rsofaer/carrierwave.git' , :branch => 'master' #Untested mongomapper branch diff --git a/Gemfile.lock b/Gemfile.lock index 38869e74d..577d91226 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -13,15 +13,6 @@ GIT devise-mongo_mapper (0.0.1) devise (~> 1.1.0) -GIT - remote: git://github.com/dcu/magent.git - revision: 663d493e0058901d8a2baec00e4d20e867b1359d - specs: - magent (0.5.2) - em-websocket - mongo - uuidtools - GIT remote: git://github.com/iain/http_accept_language.git revision: 0b78aa7849fc90cf9e12586af162fa4c408a795d @@ -410,7 +401,6 @@ DEPENDENCIES jasmine! json launchy - magent! mini_magick mocha mongo_mapper! diff --git a/lib/diaspora/web_socket.rb b/lib/diaspora/web_socket.rb index 19888ebef..da856ce92 100644 --- a/lib/diaspora/web_socket.rb +++ b/lib/diaspora/web_socket.rb @@ -4,22 +4,31 @@ module Diaspora module WebSocket + def self.redis + @redis ||= Resque.redis + end + def self.length + redis.llen :websocket + end def self.queue_to_user(uid, data) - channel = Magent::GenericChannel.new('websocket') - channel.enqueue({:uid => uid, :data => data}) + redis.lpush(:websocket, {:uid => uid, :data => data}.to_json) end def self.initialize_channels @channels = {} end + def self.next + redis.rpop(:websocket) + end + def self.push_to_user(uid, data) - Rails.logger.debug "Websocketing to #{uid}" + Rails.logger.debug "event=socket-push uid=#{uid}" @channels[uid.to_id][0].push(data) if @channels[uid.to_id] end def self.subscribe(uid, ws) - Rails.logger.debug "Subscribing socket to #{uid}" + Rails.logger.debug "event=socket-subscribe uid=#{uid}" self.ensure_channel(uid) @channels[uid][0].subscribe{ |msg| ws.send msg } @channels[uid][1] += 1 @@ -30,7 +39,7 @@ module Diaspora end def self.unsubscribe(uid,sid) - Rails.logger.debug "Unsubscribing socket #{sid} from #{uid}" + Rails.logger.debug "event=socket-unsubscribe sid=#{sid} uid=#{uid}" @channels[uid][0].unsubscribe(sid) if @channels[uid] @channels[uid][1] -= 1 if @channels[uid][1] <= 0 diff --git a/script/websocket_server.rb b/script/websocket_server.rb index f338916ad..b8da36d04 100644 --- a/script/websocket_server.rb +++ b/script/websocket_server.rb @@ -28,10 +28,9 @@ def debug_pp thing pp thing if APP_CONFIG[:socket_debug] || ENV['SOCKET_DEBUG'] end -CHANNEL = Magent::GenericChannel.new('websocket') def process_message - if CHANNEL.queue_count > 0 - message = CHANNEL.dequeue + if Diaspora::WebSocket.length > 0 + message = JSON::parse(Diaspora::WebSocket.next) if message Diaspora::WebSocket.push_to_user(message['uid'], message['data']) end diff --git a/spec/lib/diaspora/web_socket_spec.rb b/spec/lib/diaspora/web_socket_spec.rb new file mode 100644 index 000000000..521b224d9 --- /dev/null +++ b/spec/lib/diaspora/web_socket_spec.rb @@ -0,0 +1,38 @@ +# Copyright (c) 2010, Diaspora Inc. This file is +# licensed under the Affero General Public License version 3 or later. See +# the COPYRIGHT file. + +require 'spec_helper' +require 'lib/diaspora/web_socket' +describe Diaspora::WebSocket do + before do + @mock_redis = mock() + Diaspora::WebSocket.stub(:redis).and_return @mock_redis + end + describe '.next' do + it 'pops the data from redis' do + @mock_redis.should_receive(:rpop).with(:websocket) + Diaspora::WebSocket.next + end + end + describe '.queue_to_user' do + it 'push the data into redis' do + @mock_redis.should_receive(:lpush).with(:websocket, {:uid => "me", :data => "Socket!"}.to_json) + Diaspora::WebSocket.queue_to_user("me", "Socket!") + end + end +end + +describe Diaspora::Socketable do + before do + @user = make_user + @aspect = @user.aspects.create(:name => "losers") + @post = @user.build_post(:status_message, :message => "hey", :to => @aspect.id) + @post.save + end + + it 'sockets to a user' do + Diaspora::WebSocket.should_receive(:queue_to_user) + @post.socket_to_uid(@user.id, :aspect_ids => @aspect.id) + end +end diff --git a/spec/lib/websocket_spec.rb b/spec/lib/websocket_spec.rb deleted file mode 100644 index 2615ac82d..000000000 --- a/spec/lib/websocket_spec.rb +++ /dev/null @@ -1,37 +0,0 @@ -# Copyright (c) 2010, Diaspora Inc. This file is -# licensed under the Affero General Public License version 3 or later. See -# the COPYRIGHT file. - -require 'spec_helper' - -describe Diaspora::WebSocket do - before do - @user = make_user - @aspect = @user.aspects.create(:name => "losers") - @post = @user.build_post(:status_message, :message => "hey", :to => @aspect.id) - @post.save - end - - it 'should queue a job' do - Diaspora::WebSocket.should_receive(:queue_to_user) - @post.socket_to_uid(@user.id, :aspect_ids => @aspect.id) - end - - describe 'queuing and dequeuing ' do - before do - @channel = Magent::GenericChannel.new('websocket') - @messages = @channel.message_count - @post.socket_to_uid(@user.id, :aspect_ids => @aspect.id) - end - - it 'should send the queued job to Magent' do - @channel.message_count.should == @messages + 1 - end - - it 'should dequeue the job successfully' do - @channel.dequeue - @channel.message_count.should == @messages - end - end - -end