Websocket now uses redis
This commit is contained in:
parent
8de5c5babd
commit
0e77577fbd
6 changed files with 54 additions and 56 deletions
1
Gemfile
1
Gemfile
|
|
@ -40,7 +40,6 @@ gem 'thin'
|
||||||
|
|
||||||
#Websocket
|
#Websocket
|
||||||
gem 'em-websocket', :git => 'git://github.com/igrigorik/em-websocket'
|
gem 'em-websocket', :git => 'git://github.com/igrigorik/em-websocket'
|
||||||
gem 'magent', :git => 'git://github.com/dcu/magent.git'
|
|
||||||
|
|
||||||
#File uploading
|
#File uploading
|
||||||
gem 'carrierwave', :git => 'git://github.com/rsofaer/carrierwave.git' , :branch => 'master' #Untested mongomapper branch
|
gem 'carrierwave', :git => 'git://github.com/rsofaer/carrierwave.git' , :branch => 'master' #Untested mongomapper branch
|
||||||
|
|
|
||||||
10
Gemfile.lock
10
Gemfile.lock
|
|
@ -13,15 +13,6 @@ GIT
|
||||||
devise-mongo_mapper (0.0.1)
|
devise-mongo_mapper (0.0.1)
|
||||||
devise (~> 1.1.0)
|
devise (~> 1.1.0)
|
||||||
|
|
||||||
GIT
|
|
||||||
remote: git://github.com/dcu/magent.git
|
|
||||||
revision: 663d493e0058901d8a2baec00e4d20e867b1359d
|
|
||||||
specs:
|
|
||||||
magent (0.5.2)
|
|
||||||
em-websocket
|
|
||||||
mongo
|
|
||||||
uuidtools
|
|
||||||
|
|
||||||
GIT
|
GIT
|
||||||
remote: git://github.com/iain/http_accept_language.git
|
remote: git://github.com/iain/http_accept_language.git
|
||||||
revision: 0b78aa7849fc90cf9e12586af162fa4c408a795d
|
revision: 0b78aa7849fc90cf9e12586af162fa4c408a795d
|
||||||
|
|
@ -410,7 +401,6 @@ DEPENDENCIES
|
||||||
jasmine!
|
jasmine!
|
||||||
json
|
json
|
||||||
launchy
|
launchy
|
||||||
magent!
|
|
||||||
mini_magick
|
mini_magick
|
||||||
mocha
|
mocha
|
||||||
mongo_mapper!
|
mongo_mapper!
|
||||||
|
|
|
||||||
|
|
@ -4,22 +4,31 @@
|
||||||
|
|
||||||
module Diaspora
|
module Diaspora
|
||||||
module WebSocket
|
module WebSocket
|
||||||
|
def self.redis
|
||||||
|
@redis ||= Resque.redis
|
||||||
|
end
|
||||||
|
def self.length
|
||||||
|
redis.llen :websocket
|
||||||
|
end
|
||||||
def self.queue_to_user(uid, data)
|
def self.queue_to_user(uid, data)
|
||||||
channel = Magent::GenericChannel.new('websocket')
|
redis.lpush(:websocket, {:uid => uid, :data => data}.to_json)
|
||||||
channel.enqueue({:uid => uid, :data => data})
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.initialize_channels
|
def self.initialize_channels
|
||||||
@channels = {}
|
@channels = {}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def self.next
|
||||||
|
redis.rpop(:websocket)
|
||||||
|
end
|
||||||
|
|
||||||
def self.push_to_user(uid, data)
|
def self.push_to_user(uid, data)
|
||||||
Rails.logger.debug "Websocketing to #{uid}"
|
Rails.logger.debug "event=socket-push uid=#{uid}"
|
||||||
@channels[uid.to_id][0].push(data) if @channels[uid.to_id]
|
@channels[uid.to_id][0].push(data) if @channels[uid.to_id]
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.subscribe(uid, ws)
|
def self.subscribe(uid, ws)
|
||||||
Rails.logger.debug "Subscribing socket to #{uid}"
|
Rails.logger.debug "event=socket-subscribe uid=#{uid}"
|
||||||
self.ensure_channel(uid)
|
self.ensure_channel(uid)
|
||||||
@channels[uid][0].subscribe{ |msg| ws.send msg }
|
@channels[uid][0].subscribe{ |msg| ws.send msg }
|
||||||
@channels[uid][1] += 1
|
@channels[uid][1] += 1
|
||||||
|
|
@ -30,7 +39,7 @@ module Diaspora
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.unsubscribe(uid,sid)
|
def self.unsubscribe(uid,sid)
|
||||||
Rails.logger.debug "Unsubscribing socket #{sid} from #{uid}"
|
Rails.logger.debug "event=socket-unsubscribe sid=#{sid} uid=#{uid}"
|
||||||
@channels[uid][0].unsubscribe(sid) if @channels[uid]
|
@channels[uid][0].unsubscribe(sid) if @channels[uid]
|
||||||
@channels[uid][1] -= 1
|
@channels[uid][1] -= 1
|
||||||
if @channels[uid][1] <= 0
|
if @channels[uid][1] <= 0
|
||||||
|
|
|
||||||
|
|
@ -28,10 +28,9 @@ def debug_pp thing
|
||||||
pp thing if APP_CONFIG[:socket_debug] || ENV['SOCKET_DEBUG']
|
pp thing if APP_CONFIG[:socket_debug] || ENV['SOCKET_DEBUG']
|
||||||
end
|
end
|
||||||
|
|
||||||
CHANNEL = Magent::GenericChannel.new('websocket')
|
|
||||||
def process_message
|
def process_message
|
||||||
if CHANNEL.queue_count > 0
|
if Diaspora::WebSocket.length > 0
|
||||||
message = CHANNEL.dequeue
|
message = JSON::parse(Diaspora::WebSocket.next)
|
||||||
if message
|
if message
|
||||||
Diaspora::WebSocket.push_to_user(message['uid'], message['data'])
|
Diaspora::WebSocket.push_to_user(message['uid'], message['data'])
|
||||||
end
|
end
|
||||||
|
|
|
||||||
38
spec/lib/diaspora/web_socket_spec.rb
Normal file
38
spec/lib/diaspora/web_socket_spec.rb
Normal file
|
|
@ -0,0 +1,38 @@
|
||||||
|
# Copyright (c) 2010, Diaspora Inc. This file is
|
||||||
|
# licensed under the Affero General Public License version 3 or later. See
|
||||||
|
# the COPYRIGHT file.
|
||||||
|
|
||||||
|
require 'spec_helper'
|
||||||
|
require 'lib/diaspora/web_socket'
|
||||||
|
describe Diaspora::WebSocket do
|
||||||
|
before do
|
||||||
|
@mock_redis = mock()
|
||||||
|
Diaspora::WebSocket.stub(:redis).and_return @mock_redis
|
||||||
|
end
|
||||||
|
describe '.next' do
|
||||||
|
it 'pops the data from redis' do
|
||||||
|
@mock_redis.should_receive(:rpop).with(:websocket)
|
||||||
|
Diaspora::WebSocket.next
|
||||||
|
end
|
||||||
|
end
|
||||||
|
describe '.queue_to_user' do
|
||||||
|
it 'push the data into redis' do
|
||||||
|
@mock_redis.should_receive(:lpush).with(:websocket, {:uid => "me", :data => "Socket!"}.to_json)
|
||||||
|
Diaspora::WebSocket.queue_to_user("me", "Socket!")
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe Diaspora::Socketable do
|
||||||
|
before do
|
||||||
|
@user = make_user
|
||||||
|
@aspect = @user.aspects.create(:name => "losers")
|
||||||
|
@post = @user.build_post(:status_message, :message => "hey", :to => @aspect.id)
|
||||||
|
@post.save
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'sockets to a user' do
|
||||||
|
Diaspora::WebSocket.should_receive(:queue_to_user)
|
||||||
|
@post.socket_to_uid(@user.id, :aspect_ids => @aspect.id)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
@ -1,37 +0,0 @@
|
||||||
# Copyright (c) 2010, Diaspora Inc. This file is
|
|
||||||
# licensed under the Affero General Public License version 3 or later. See
|
|
||||||
# the COPYRIGHT file.
|
|
||||||
|
|
||||||
require 'spec_helper'
|
|
||||||
|
|
||||||
describe Diaspora::WebSocket do
|
|
||||||
before do
|
|
||||||
@user = make_user
|
|
||||||
@aspect = @user.aspects.create(:name => "losers")
|
|
||||||
@post = @user.build_post(:status_message, :message => "hey", :to => @aspect.id)
|
|
||||||
@post.save
|
|
||||||
end
|
|
||||||
|
|
||||||
it 'should queue a job' do
|
|
||||||
Diaspora::WebSocket.should_receive(:queue_to_user)
|
|
||||||
@post.socket_to_uid(@user.id, :aspect_ids => @aspect.id)
|
|
||||||
end
|
|
||||||
|
|
||||||
describe 'queuing and dequeuing ' do
|
|
||||||
before do
|
|
||||||
@channel = Magent::GenericChannel.new('websocket')
|
|
||||||
@messages = @channel.message_count
|
|
||||||
@post.socket_to_uid(@user.id, :aspect_ids => @aspect.id)
|
|
||||||
end
|
|
||||||
|
|
||||||
it 'should send the queued job to Magent' do
|
|
||||||
@channel.message_count.should == @messages + 1
|
|
||||||
end
|
|
||||||
|
|
||||||
it 'should dequeue the job successfully' do
|
|
||||||
@channel.dequeue
|
|
||||||
@channel.message_count.should == @messages
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
end
|
|
||||||
Loading…
Reference in a new issue