From 8d12a57f374354fd3163c52d630c30c995183b22 Mon Sep 17 00:00:00 2001 From: Ilya Zhitomirskiy Date: Thu, 29 Sep 2011 16:00:31 -0700 Subject: [PATCH 1/7] reding from redis cache --- Gemfile | 1 + Gemfile.lock | 2 + lib/diaspora/redis_cache.rb | 33 ++++++++++++ lib/diaspora/user/querying.rb | 73 ++++++++++++++++++--------- spec/lib/diaspora/redis_cache_spec.rb | 62 +++++++++++++++++++++++ spec/models/user/querying_spec.rb | 72 +++++++++++++++++++++----- 6 files changed, 205 insertions(+), 38 deletions(-) create mode 100644 lib/diaspora/redis_cache.rb create mode 100644 spec/lib/diaspora/redis_cache_spec.rb diff --git a/Gemfile b/Gemfile index 89d4d823b..ad45991dc 100644 --- a/Gemfile +++ b/Gemfile @@ -126,6 +126,7 @@ group :test do gem "selenium-webdriver", "~> 2.7.0" gem 'webmock', :require => false gem 'sqlite3' + gem 'mock_redis' end group :development do diff --git a/Gemfile.lock b/Gemfile.lock index c048854c2..b09c94341 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -256,6 +256,7 @@ GEM mobile-fu (0.2.1) rack-mobile-detect rails + mock_redis (0.2.0) moneta (0.6.0) mongrel (1.1.5) cgi_multipart_eof_fix (>= 2.4) @@ -506,6 +507,7 @@ DEPENDENCIES linecache (= 0.43) mini_magick (= 3.2) mobile-fu + mock_redis mongrel mysql2 (= 0.2.13) newrelic_rpm diff --git a/lib/diaspora/redis_cache.rb b/lib/diaspora/redis_cache.rb new file mode 100644 index 000000000..b5b8d7353 --- /dev/null +++ b/lib/diaspora/redis_cache.rb @@ -0,0 +1,33 @@ +# Copyright (c) 2010-2011, Diaspora Inc. This file is +# licensed under the Affero General Public License version 3 or later. See +# the COPYRIGHT file. +# + +class RedisCache + def initialize(user_id, order) + @user_id = user_id + @order = order + self + end + + # @return [Boolean] + def cache_exists? + redis.zcard(set_key) != 0 + end + + def post_ids(time=Time.now, limit=15) + post_ids = redis.zrevrangebyscore(set_key, time.to_i, "-inf") + post_ids[0...limit] + end + + protected + # @return [Redis] + def redis + @redis ||= Redis.new + end + + # @return [String] + def set_key + @set_key ||= "cache_stream_#{@user_id}_#{@order}" + end +end diff --git a/lib/diaspora/user/querying.rb b/lib/diaspora/user/querying.rb index 0cc63f481..437bc718d 100644 --- a/lib/diaspora/user/querying.rb +++ b/lib/diaspora/user/querying.rb @@ -2,6 +2,8 @@ # licensed under the Affero General Public License version 3 or later. See # the COPYRIGHT file. +require 'lib/diaspora/redis_cache' + module Diaspora module UserModules module Querying @@ -13,21 +15,32 @@ module Diaspora post ||= Post.where(key => id, :public => true).where(opts).first end - def visible_posts(opts = {}) - defaults = { - :type => ['StatusMessage', 'Photo'], - :order => 'updated_at DESC', - :limit => 15, - :hidden => false - } - opts = defaults.merge(opts) + def visible_posts(opts={}) + opts = prep_opts(opts) + post_ids = visible_post_ids(opts) + Post.where(:id => post_ids).select('DISTINCT posts.*').limit(opts[:limit]).order(opts[:order_with_table]) + end - order_field = opts[:order].split.first.to_sym - order_with_table = 'posts.' + opts[:order] + def visible_post_ids(opts={}) + opts = prep_opts(opts) - opts[:max_time] = Time.at(opts[:max_time]) if opts[:max_time].is_a?(Integer) - opts[:max_time] ||= Time.now + 1 + if AppConfig[:redis_cache] + cache = RedisCache.new(self.id, opts[:order_field]) + if cache.cache_exists? + post_ids = cache.post_ids(opts[:max_time], opts[:limit]) + end + end + if post_ids.blank? || post_ids.length < opts[:limit] + visible_ids_from_sql(opts) + else + post_ids + end + end + + # @return [Array] + def visible_ids_from_sql(opts={}) + opts = prep_opts(opts) select_clause ='DISTINCT posts.id, posts.updated_at AS updated_at, posts.created_at AS created_at' posts_from_others = Post.joins(:contacts).where( :pending => false, :type => opts[:type], :post_visibilities => {:hidden => opts[:hidden]}, :contacts => {:user_id => self.id}) @@ -39,20 +52,12 @@ module Diaspora posts_from_self = posts_from_self.joins(:aspect_visibilities).where(:aspect_visibilities => {:aspect_id => opts[:by_members_of]}) end - unless sqlite? - posts_from_others = posts_from_others.select(select_clause).order(order_with_table).where(Post.arel_table[order_field].lt(opts[:max_time])) - posts_from_self = posts_from_self.select(select_clause).order(order_with_table).where(Post.arel_table[order_field].lt(opts[:max_time])) + posts_from_others = posts_from_others.select(select_clause).order(opts[:order_with_table]).where(Post.arel_table[opts[:order_field]].lt(opts[:max_time])) + posts_from_self = posts_from_self.select(select_clause).order(opts[:order_with_table]).where(Post.arel_table[opts[:order_field]].lt(opts[:max_time])) - all_posts = "(#{posts_from_others.to_sql} LIMIT #{opts[:limit]}) UNION ALL (#{posts_from_self.to_sql} LIMIT #{opts[:limit]}) ORDER BY #{opts[:order]} LIMIT #{opts[:limit]}" - else - posts_from_others = posts_from_others.select(select_clause) - posts_from_self = posts_from_self.select(select_clause) - all_posts = "#{posts_from_others.to_sql} UNION ALL #{posts_from_self.to_sql} ORDER BY #{opts[:order]} LIMIT #{opts[:limit]}" - end + all_posts = "(#{posts_from_others.to_sql} LIMIT #{opts[:limit]}) UNION ALL (#{posts_from_self.to_sql} LIMIT #{opts[:limit]}) ORDER BY #{opts[:order]} LIMIT #{opts[:limit]}" - post_ids = Post.connection.select_values(all_posts) - - Post.where(:id => post_ids).select('DISTINCT posts.*').limit(opts[:limit]).order(order_with_table) + Post.connection.select_values(all_posts) end def contact_for(person) @@ -112,6 +117,26 @@ module Diaspora Post.where(:id => post_ids, :pending => false).select('DISTINCT posts.*').order("posts.created_at DESC") end + + protected + + # @return [Hash] + def prep_opts(opts) + defaults = { + :type => ['StatusMessage', 'Photo'], + :order => 'updated_at DESC', + :limit => 15, + :hidden => false + } + opts = defaults.merge(opts) + + opts[:order_field] = opts[:order].split.first.to_sym + opts[:order_with_table] = 'posts.' + opts[:order] + + opts[:max_time] = Time.at(opts[:max_time]) if opts[:max_time].is_a?(Integer) + opts[:max_time] ||= Time.now + 1 + opts + end end end end diff --git a/spec/lib/diaspora/redis_cache_spec.rb b/spec/lib/diaspora/redis_cache_spec.rb new file mode 100644 index 000000000..6e5ed758d --- /dev/null +++ b/spec/lib/diaspora/redis_cache_spec.rb @@ -0,0 +1,62 @@ +# Copyright (c) 2010-2011, Diaspora Inc. This file is +# licensed under the Affero General Public License version 3 or later. See +# the COPYRIGHT file. + +require 'spec_helper' +require 'diaspora/redis_cache' + +describe RedisCache do + before do + @redis = MockRedis.new + @cache = RedisCache.new(bob.id, "created_at") + @cache.stub(:redis).and_return(@redis) + end + + it 'gets initialized with user_id and an order field' do + cache = RedisCache.new(bob.id, "updated_at") + [:@user_id, :@order].each do |var| + cache.instance_variable_get(var).should_not be_blank + end + end + + describe "#cache_exists?" do + it 'returns true if the sorted set exists' do + timestamp = Time.now.to_i + @redis.zadd("cache_stream_#{@bob.id}_created_at", timestamp, "post_1") + + @cache.cache_exists?.should be_true + end + + it 'returns false if there is nothing in the set' do + @cache.cache_exists?.should be_false + end + end + + describe "#post_ids" do + before do + @timestamps = [] + @timestamp = Time.now.to_i + 30.times do |n| + created_time = @timestamp - n*1000 + @redis.zadd("cache_stream_#{@bob.id}_created_at", created_time, n) + @timestamps << created_time + end + end + + it 'returns the most recent post ids (default created at, limit 15)' do + @cache.post_ids.should =~ 15.times.map {|n| n} + end + + it 'returns posts ids after the specified time' do + @cache.post_ids(@timestamps[15]).should =~ (15...30).map {|n| n} + end + + it 'returns post ids with a non-default limit' do + @cache.post_ids(@timestamp, 20).should =~ 20.times.map {|n| n} + end + end + + describe "#populate" + describe "#add" + describe "#remove" +end diff --git a/spec/models/user/querying_spec.rb b/spec/models/user/querying_spec.rb index e62110531..7ad9edd60 100644 --- a/spec/models/user/querying_spec.rb +++ b/spec/models/user/querying_spec.rb @@ -12,49 +12,49 @@ describe User do @bobs_aspect = bob.aspects.where(:name => "generic").first end - describe "#visible_posts" do + describe "#visible_post_ids" do it "contains your public posts" do public_post = alice.post(:status_message, :text => "hi", :to => @alices_aspect.id, :public => true) - alice.visible_posts.should include(public_post) + alice.visible_post_ids.should include(public_post.id) end it "contains your non-public posts" do private_post = alice.post(:status_message, :text => "hi", :to => @alices_aspect.id, :public => false) - alice.visible_posts.should include(private_post) + alice.visible_post_ids.should include(private_post.id) end it "contains public posts from people you're following" do dogs = bob.aspects.create(:name => "dogs") bobs_public_post = bob.post(:status_message, :text => "hello", :public => true, :to => dogs.id) - alice.visible_posts.should include(bobs_public_post) + alice.visible_post_ids.should include(bobs_public_post.id) end it "contains non-public posts from people who are following you" do bobs_post = bob.post(:status_message, :text => "hello", :to => @bobs_aspect.id) - alice.visible_posts.should include(bobs_post) + alice.visible_post_ids.should include(bobs_post.id) end it "does not contain non-public posts from aspects you're not in" do dogs = bob.aspects.create(:name => "dogs") invisible_post = bob.post(:status_message, :text => "foobar", :to => dogs.id) - alice.visible_posts.should_not include(invisible_post) + alice.visible_post_ids.should_not include(invisible_post.id) end it "does not contain pending posts" do pending_post = bob.post(:status_message, :text => "hey", :public => true, :to => @bobs_aspect.id, :pending => true) pending_post.should be_pending - alice.visible_posts.should_not include pending_post + alice.visible_post_ids.should_not include pending_post.id end it "does not contain pending photos" do pending_photo = bob.post(:photo, :pending => true, :user_file=> File.open(photo_fixture_name), :to => @bobs_aspect) - alice.visible_posts.should_not include pending_photo + alice.visible_post_ids.should_not include pending_photo.id end it "respects the :type option" do photo = bob.post(:photo, :pending => false, :user_file=> File.open(photo_fixture_name), :to => @bobs_aspect) - alice.visible_posts.should include(photo) - alice.visible_posts(:type => 'StatusMessage').should_not include(photo) + alice.visible_post_ids.should include(photo.id) + alice.visible_post_ids(:type => 'StatusMessage').should_not include(photo.id) end it "does not contain duplicate posts" do @@ -64,8 +64,8 @@ describe User do bobs_post = bob.post(:status_message, :text => "hai to all my people", :to => [@bobs_aspect.id, bobs_other_aspect.id]) - alice.visible_posts.length.should == 1 - alice.visible_posts.should include(bobs_post) + alice.visible_post_ids.length.should == 1 + alice.visible_post_ids.should include(bobs_post.id) end describe 'hidden posts' do @@ -76,15 +76,58 @@ describe User do end it "pulls back non hidden posts" do - alice.visible_posts.include?(@status).should be_true + alice.visible_post_ids.include?(@status.id).should be_true end it "does not pull back hidden posts" do @vis.update_attributes(:hidden => true) - alice.visible_posts.include?(@status).should be_false + alice.visible_post_ids.include?(@status.id).should be_false end end + context "RedisCache" do + before do + AppConfig[:redis_cache] = true + end + + context 'empty cache' do + it "does not read from the cache" do + cache = mock(:cache_exists? => false) + RedisCache.stub(:new).and_return(cache) + cache.should_not_receive(:post_ids) + + alice.visible_post_ids + end + + it "is populated" do + end + end + + context 'populated cache' do + before do + @cache = mock(:cache_exists? => true) + RedisCache.stub(:new).and_return(@cache) + end + it "reads from the cache" do + @cache.stub(:post_ids).and_return([1,2,3]) + + alice.visible_post_ids(:limit => 3).should == [1,2,3] + end + + it "queries if maxtime is later than the last cached post" do + @cache.stub(:post_ids).and_return([]) + alice.should_receive(:visible_ids_from_sql) + + alice.visible_post_ids + end + + it "does not get repopulated" do + end + end + end + end + + describe "#visible_posts" do context 'with many posts' do before do bob.move_contact(eve.person, @bobs_aspect, bob.aspects.create(:name => 'new aspect')) @@ -101,6 +144,7 @@ describe User do end end end + it 'works' do # The set up takes a looong time, so to save time we do several tests in one bob.visible_posts.length.should == 15 #it returns 15 by default bob.visible_posts.should == bob.visible_posts(:by_members_of => bob.aspects.map { |a| a.id }) # it is the same when joining through aspects From 01515725fe00e5b6f8d3de87bbdfbb909cd2d377 Mon Sep 17 00:00:00 2001 From: Ilya Zhitomirskiy Date: Thu, 29 Sep 2011 18:59:04 -0700 Subject: [PATCH 2/7] WIP trim is still needed, also possible weirdness with the mock --- lib/diaspora/redis_cache.rb | 65 ++++++++++++++++-- lib/diaspora/user/querying.rb | 19 ++--- spec/lib/diaspora/redis_cache_spec.rb | 99 ++++++++++++++++++++++++--- spec/models/user/querying_spec.rb | 34 +++++---- 4 files changed, 180 insertions(+), 37 deletions(-) diff --git a/lib/diaspora/redis_cache.rb b/lib/diaspora/redis_cache.rb index b5b8d7353..cf0b87eeb 100644 --- a/lib/diaspora/redis_cache.rb +++ b/lib/diaspora/redis_cache.rb @@ -4,15 +4,24 @@ # class RedisCache - def initialize(user_id, order) - @user_id = user_id - @order = order + + SUPPORTED_CACHES = [:created_at] #['updated_at', + CACHE_LIMIT = 100 + + def initialize(user, order_field) + @user = user + @order_field = order_field.to_s self end # @return [Boolean] def cache_exists? - redis.zcard(set_key) != 0 + self.size != 0 + end + + # @return [Integer] the cardinality of the redis set + def size + redis.zcard(set_key) end def post_ids(time=Time.now, limit=15) @@ -20,6 +29,52 @@ class RedisCache post_ids[0...limit] end + # @return [RedisCache] self + def ensure_populated! + self.repopulate! unless cache_exists? + self + end + + # @return [RedisCache] self + def repopulate! + self.populate! && self.trim! + self + end + + # @return [RedisCache] self + def populate! + # user executes query and gets back hashes + sql = @user.visible_posts_sql(:limit => CACHE_LIMIT, :order => self.order) + hashes = Post.connection.select_all(sql) + + # hashes are inserted into set in a single transaction + redis.multi do + hashes.each do |h| + self.redis.zadd(set_key, h[@order_field], h["id"]) + end + end + + self + end + + # @return [RedisCache] self + def trim! + puts "cache limit #{CACHE_LIMIT}" + puts "cache size #{self.size}" + self.redis.zremrangebyrank(set_key, 0, -(CACHE_LIMIT+1)) + self + end + + # @param order [Symbol, String] + # @return [Boolean] + def self.supported_order?(order) + SUPPORTED_CACHES.include?(order.to_sym) + end + + def order + "#{@order_field} DESC" + end + protected # @return [Redis] def redis @@ -28,6 +83,6 @@ class RedisCache # @return [String] def set_key - @set_key ||= "cache_stream_#{@user_id}_#{@order}" + @set_key ||= "cache_stream_#{@user.id}_#{@order_field}" end end diff --git a/lib/diaspora/user/querying.rb b/lib/diaspora/user/querying.rb index 437bc718d..3c53b030d 100644 --- a/lib/diaspora/user/querying.rb +++ b/lib/diaspora/user/querying.rb @@ -24,11 +24,11 @@ module Diaspora def visible_post_ids(opts={}) opts = prep_opts(opts) - if AppConfig[:redis_cache] - cache = RedisCache.new(self.id, opts[:order_field]) - if cache.cache_exists? - post_ids = cache.post_ids(opts[:max_time], opts[:limit]) - end + if AppConfig[:redis_cache] && RedisCache.supported_order?(opts[:order_field]) + cache = RedisCache.new(self, opts[:order_field]) + + cache.ensure_populated! + post_ids = cache.post_ids(opts[:max_time], opts[:limit]) end if post_ids.blank? || post_ids.length < opts[:limit] @@ -40,6 +40,11 @@ module Diaspora # @return [Array] def visible_ids_from_sql(opts={}) + opts = prep_opts(opts) + Post.connection.select_values(visible_posts_sql(opts)) + end + + def visible_posts_sql(opts={}) opts = prep_opts(opts) select_clause ='DISTINCT posts.id, posts.updated_at AS updated_at, posts.created_at AS created_at' @@ -55,9 +60,7 @@ module Diaspora posts_from_others = posts_from_others.select(select_clause).order(opts[:order_with_table]).where(Post.arel_table[opts[:order_field]].lt(opts[:max_time])) posts_from_self = posts_from_self.select(select_clause).order(opts[:order_with_table]).where(Post.arel_table[opts[:order_field]].lt(opts[:max_time])) - all_posts = "(#{posts_from_others.to_sql} LIMIT #{opts[:limit]}) UNION ALL (#{posts_from_self.to_sql} LIMIT #{opts[:limit]}) ORDER BY #{opts[:order]} LIMIT #{opts[:limit]}" - - Post.connection.select_values(all_posts) + "(#{posts_from_others.to_sql} LIMIT #{opts[:limit]}) UNION ALL (#{posts_from_self.to_sql} LIMIT #{opts[:limit]}) ORDER BY #{opts[:order]} LIMIT #{opts[:limit]}" end def contact_for(person) diff --git a/spec/lib/diaspora/redis_cache_spec.rb b/spec/lib/diaspora/redis_cache_spec.rb index 6e5ed758d..f26d9a5dc 100644 --- a/spec/lib/diaspora/redis_cache_spec.rb +++ b/spec/lib/diaspora/redis_cache_spec.rb @@ -3,18 +3,20 @@ # the COPYRIGHT file. require 'spec_helper' -require 'diaspora/redis_cache' describe RedisCache do before do - @redis = MockRedis.new - @cache = RedisCache.new(bob.id, "created_at") - @cache.stub(:redis).and_return(@redis) + #@redis = MockRedis.new + @redis = Redis.new + @redis.keys.each{|p| @redis.del(p)} + + @cache = RedisCache.new(bob, :created_at) + #@cache.stub(:redis).and_return(@redis) end - it 'gets initialized with user_id and an order field' do - cache = RedisCache.new(bob.id, "updated_at") - [:@user_id, :@order].each do |var| + it 'gets initialized with user and an created_at order' do + cache = RedisCache.new(bob, :created_at) + [:@user, :@order_field].each do |var| cache.instance_variable_get(var).should_not be_blank end end @@ -22,7 +24,7 @@ describe RedisCache do describe "#cache_exists?" do it 'returns true if the sorted set exists' do timestamp = Time.now.to_i - @redis.zadd("cache_stream_#{@bob.id}_created_at", timestamp, "post_1") + @redis.zadd("cache_stream_#{bob.id}_created_at", timestamp, "post_1") @cache.cache_exists?.should be_true end @@ -38,7 +40,7 @@ describe RedisCache do @timestamp = Time.now.to_i 30.times do |n| created_time = @timestamp - n*1000 - @redis.zadd("cache_stream_#{@bob.id}_created_at", created_time, n) + @redis.zadd("cache_stream_#{bob.id}_created_at", created_time, n) @timestamps << created_time end end @@ -56,7 +58,84 @@ describe RedisCache do end end - describe "#populate" + describe "#ensure_populated!" do + it 'does nothing if the cache is populated' do + @cache.stub(:cache_exists?).and_return(true) + @cache.should_not_receive(:repopulate!) + + @cache.ensure_populated! + end + + it 'clears and poplulates if the cache is not populated' do + @cache.stub(:cache_exists?).and_return(false) + @cache.should_receive(:repopulate!) + + @cache.ensure_populated! + end + end + + describe "#repopulate!" do + it 'populates' do + @cache.stub(:trim!).and_return(true) + @cache.should_receive(:populate!).and_return(true) + @cache.repopulate! + end + + it 'trims' do + @cache.stub(:populate!).and_return(true) + @cache.should_receive(:trim!) + @cache.repopulate! + end + end + + describe "#populate!" do + it 'queries the db with the visible post sql string' do + sql = "long_sql" + order = "created_at DESC" + @cache.should_receive(:order).and_return(order) + bob.should_receive(:visible_posts_sql).with(hash_including( + :limit => 100, + :order => order)). + and_return(sql) + + Post.connection.should_receive(:select_all).with(sql).and_return([]) + + @cache.populate! + end + + it 'adds the post from the hash to the cache' + end + + describe "#trim!" do + it 'does nothing if the set is smaller than the cache limit' do + @timestamps = [] + @timestamp = Time.now.to_i + 30.times do |n| + created_time = @timestamp - n*1000 + @redis.zadd("cache_stream_#{bob.id}_created_at", created_time, n) + @timestamps << created_time + end + + post_ids = @cache.post_ids(Time.now.to_i, @cache.size) + @cache.trim! + @cache.post_ids(Time.now.to_i, @cache.size).should == post_ids + end + + it 'trims the set to the cache limit' do + @timestamps = [] + @timestamp = Time.now.to_i + 120.times do |n| + created_time = @timestamp - n*1000 + @redis.zadd("cache_stream_#{bob.id}_created_at", created_time, n) + @timestamps << created_time + end + + post_ids = 100.times.map{|n| n} + @cache.trim! + @cache.post_ids(Time.now.to_i, @cache.size).should == post_ids[0...100] + end + end + describe "#add" describe "#remove" end diff --git a/spec/models/user/querying_spec.rb b/spec/models/user/querying_spec.rb index 7ad9edd60..0cdc999b4 100644 --- a/spec/models/user/querying_spec.rb +++ b/spec/models/user/querying_spec.rb @@ -88,37 +88,43 @@ describe User do context "RedisCache" do before do AppConfig[:redis_cache] = true + @opts = {:order => "created_at DESC"} end - context 'empty cache' do - it "does not read from the cache" do - cache = mock(:cache_exists? => false) - RedisCache.stub(:new).and_return(cache) - cache.should_not_receive(:post_ids) + after do + AppConfig[:redis_cache] = nil + end - alice.visible_post_ids - end + it "gets populated with latest 100 posts" do + cache = mock(:cache_exists? => true, :ensure_populated! => mock, :post_ids => []) + RedisCache.stub(:new).and_return(cache) + cache.should_receive(:ensure_populated!) - it "is populated" do - end + alice.visible_post_ids(@opts) + end + + describe "#ensure_populated_cache" do + it 'does nothing if the cache is already populated' + it 're-populates the cache with the latest posts (in hashes)' end context 'populated cache' do before do - @cache = mock(:cache_exists? => true) + @cache = mock(:cache_exists? => true, :ensure_populated! => mock) RedisCache.stub(:new).and_return(@cache) end - it "reads from the cache" do - @cache.stub(:post_ids).and_return([1,2,3]) - alice.visible_post_ids(:limit => 3).should == [1,2,3] + it "reads from the cache" do + @cache.should_receive(:post_ids).and_return([1,2,3]) + + alice.visible_post_ids(@opts.merge({:limit => 3})).should == [1,2,3] end it "queries if maxtime is later than the last cached post" do @cache.stub(:post_ids).and_return([]) alice.should_receive(:visible_ids_from_sql) - alice.visible_post_ids + alice.visible_post_ids(@opts) end it "does not get repopulated" do From 289d582ce99d67299c67285eb016e16a6a0c2e79 Mon Sep 17 00:00:00 2001 From: Ilya Zhitomirskiy Date: Fri, 30 Sep 2011 11:09:30 -0700 Subject: [PATCH 3/7] redis cache populates --- lib/diaspora/redis_cache.rb | 14 +------------- spec/lib/diaspora/redis_cache_spec.rb | 14 +++++++------- 2 files changed, 8 insertions(+), 20 deletions(-) diff --git a/lib/diaspora/redis_cache.rb b/lib/diaspora/redis_cache.rb index cf0b87eeb..3620ee222 100644 --- a/lib/diaspora/redis_cache.rb +++ b/lib/diaspora/redis_cache.rb @@ -11,7 +11,6 @@ class RedisCache def initialize(user, order_field) @user = user @order_field = order_field.to_s - self end # @return [Boolean] @@ -29,19 +28,14 @@ class RedisCache post_ids[0...limit] end - # @return [RedisCache] self def ensure_populated! self.repopulate! unless cache_exists? - self end - # @return [RedisCache] self def repopulate! self.populate! && self.trim! - self end - # @return [RedisCache] self def populate! # user executes query and gets back hashes sql = @user.visible_posts_sql(:limit => CACHE_LIMIT, :order => self.order) @@ -50,19 +44,13 @@ class RedisCache # hashes are inserted into set in a single transaction redis.multi do hashes.each do |h| - self.redis.zadd(set_key, h[@order_field], h["id"]) + self.redis.zadd(set_key, h[@order_field].to_i, h["id"]) end end - - self end - # @return [RedisCache] self def trim! - puts "cache limit #{CACHE_LIMIT}" - puts "cache size #{self.size}" self.redis.zremrangebyrank(set_key, 0, -(CACHE_LIMIT+1)) - self end # @param order [Symbol, String] diff --git a/spec/lib/diaspora/redis_cache_spec.rb b/spec/lib/diaspora/redis_cache_spec.rb index f26d9a5dc..a38e2107a 100644 --- a/spec/lib/diaspora/redis_cache_spec.rb +++ b/spec/lib/diaspora/redis_cache_spec.rb @@ -40,21 +40,21 @@ describe RedisCache do @timestamp = Time.now.to_i 30.times do |n| created_time = @timestamp - n*1000 - @redis.zadd("cache_stream_#{bob.id}_created_at", created_time, n) + @redis.zadd("cache_stream_#{bob.id}_created_at", created_time, n.to_s) @timestamps << created_time end end it 'returns the most recent post ids (default created at, limit 15)' do - @cache.post_ids.should =~ 15.times.map {|n| n} + @cache.post_ids.should =~ 15.times.map {|n| n.to_s} end it 'returns posts ids after the specified time' do - @cache.post_ids(@timestamps[15]).should =~ (15...30).map {|n| n} + @cache.post_ids(@timestamps[15]).should =~ (15...30).map {|n| n.to_s} end it 'returns post ids with a non-default limit' do - @cache.post_ids(@timestamp, 20).should =~ 20.times.map {|n| n} + @cache.post_ids(@timestamp, 20).should =~ 20.times.map {|n| n.to_s} end end @@ -112,7 +112,7 @@ describe RedisCache do @timestamp = Time.now.to_i 30.times do |n| created_time = @timestamp - n*1000 - @redis.zadd("cache_stream_#{bob.id}_created_at", created_time, n) + @redis.zadd("cache_stream_#{bob.id}_created_at", created_time, n.to_s) @timestamps << created_time end @@ -126,11 +126,11 @@ describe RedisCache do @timestamp = Time.now.to_i 120.times do |n| created_time = @timestamp - n*1000 - @redis.zadd("cache_stream_#{bob.id}_created_at", created_time, n) + @redis.zadd("cache_stream_#{bob.id}_created_at", created_time, n.to_s) @timestamps << created_time end - post_ids = 100.times.map{|n| n} + post_ids = 100.times.map{|n| n.to_s} @cache.trim! @cache.post_ids(Time.now.to_i, @cache.size).should == post_ids[0...100] end From 4a8aea9e784446ef6c2af9f8f95e28d7ac8e70be Mon Sep 17 00:00:00 2001 From: Ilya Zhitomirskiy Date: Fri, 30 Sep 2011 16:11:31 -0700 Subject: [PATCH 4/7] changed back to mock redis --- spec/lib/diaspora/redis_cache_spec.rb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/spec/lib/diaspora/redis_cache_spec.rb b/spec/lib/diaspora/redis_cache_spec.rb index a38e2107a..790c02574 100644 --- a/spec/lib/diaspora/redis_cache_spec.rb +++ b/spec/lib/diaspora/redis_cache_spec.rb @@ -6,12 +6,12 @@ require 'spec_helper' describe RedisCache do before do - #@redis = MockRedis.new - @redis = Redis.new - @redis.keys.each{|p| @redis.del(p)} + @redis = MockRedis.new + #@redis = Redis.new + #@redis.keys.each{|p| @redis.del(p)} @cache = RedisCache.new(bob, :created_at) - #@cache.stub(:redis).and_return(@redis) + @cache.stub(:redis).and_return(@redis) end it 'gets initialized with user and an created_at order' do From 7926ebfb537e11953461c45ea4c023124b2a8a74 Mon Sep 17 00:00:00 2001 From: Ilya Zhitomirskiy Date: Fri, 30 Sep 2011 16:45:01 -0700 Subject: [PATCH 5/7] subclassing receivers, renamed perform to perform [bang] --- lib/postzord/receiver/local_batch.rb | 122 +++++++------- lib/postzord/receiver/private.rb | 175 ++++++++++----------- lib/postzord/receiver/public.rb | 101 ++++++------ spec/integration/attack_vectors_spec.rb | 32 ++-- spec/integration/receiving_spec.rb | 2 +- spec/lib/postzord/receiver/private_spec.rb | 10 +- 6 files changed, 220 insertions(+), 222 deletions(-) diff --git a/lib/postzord/receiver/local_batch.rb b/lib/postzord/receiver/local_batch.rb index 121f04fa9..f2da0380d 100644 --- a/lib/postzord/receiver/local_batch.rb +++ b/lib/postzord/receiver/local_batch.rb @@ -1,71 +1,75 @@ -module Postzord - module Receiver - class LocalBatch - attr_reader :object, :recipient_user_ids, :users +# Copyright (c) 2010-2011, Diaspora Inc. This file is +# licensed under the Affero General Public License version 3 or later. See +# the COPYRIGHT file. - def initialize(object, recipient_user_ids) - @object = object - @recipient_user_ids = recipient_user_ids - @users = User.where(:id => @recipient_user_ids) - end +class Postzord::Receiver::LocalBatch < Postzord::Receiver - def perform! - if @object.respond_to?(:relayable?) - receive_relayable - else - create_post_visibilities - end - notify_mentioned_users if @object.respond_to?(:mentions) + attr_reader :object, :recipient_user_ids, :users - # 09/27/11 this is slow - #socket_to_users if @object.respond_to?(:socket_to_user) - notify_users - end + def initialize(object, recipient_user_ids) + @object = object + @recipient_user_ids = recipient_user_ids + @users = User.where(:id => @recipient_user_ids) + end - # NOTE(copied over from receiver public) - # @return [Object] - def receive_relayable - if @object.parent.author.local? - # receive relayable object only for the owner of the parent object - @object.receive(@object.parent.author.owner) - end - @object - end + def perform! + if @object.respond_to?(:relayable?) + receive_relayable + else + create_post_visibilities - # Batch import post visibilities for the recipients of the given @object - # @note performs a bulk insert into mySQL - # @return [void] - def create_post_visibilities - contacts_ids = Contact.connection.select_values(Contact.where(:user_id => @recipient_user_ids, :person_id => @object.author_id).select("id").to_sql) - PostVisibility.batch_import(contacts_ids, object) - end + # if caching enabled, add to cache - # Notify any mentioned users within the @object's text - # @return [void] - def notify_mentioned_users - @object.mentions.each do |mention| - mention.notify_recipient - end - end + end + notify_mentioned_users if @object.respond_to?(:mentions) - #NOTE(these methods should be in their own module, included in this class) + # 09/27/11 this is slow + #socket_to_users if @object.respond_to?(:socket_to_user) + notify_users + end - # Issue websocket requests to all specified recipients - # @return [void] - def socket_to_users - @users.each do |user| - @object.socket_to_user(user) - end - end + # NOTE(copied over from receiver public) + # @return [Object] + def receive_relayable + if @object.parent.author.local? + # receive relayable object only for the owner of the parent object + @object.receive(@object.parent.author.owner) + end + @object + end - # Notify users of the new object - # return [void] - def notify_users - return unless @object.respond_to?(:notification_type) - @users.each do |user| - Notification.notify(user, @object, @object.author) - end - end + # Batch import post visibilities for the recipients of the given @object + # @note performs a bulk insert into mySQL + # @return [void] + def create_post_visibilities + contacts_ids = Contact.connection.select_values(Contact.where(:user_id => @recipient_user_ids, :person_id => @object.author_id).select("id").to_sql) + PostVisibility.batch_import(contacts_ids, object) + end + + # Notify any mentioned users within the @object's text + # @return [void] + def notify_mentioned_users + @object.mentions.each do |mention| + mention.notify_recipient + end + end + + #NOTE(these methods should be in their own module, included in this class) + + # Issue websocket requests to all specified recipients + # @return [void] + def socket_to_users + @users.each do |user| + @object.socket_to_user(user) + end + end + + # Notify users of the new object + # return [void] + def notify_users + return unless @object.respond_to?(:notification_type) + @users.each do |user| + Notification.notify(user, @object, @object.author) end end end diff --git a/lib/postzord/receiver/private.rb b/lib/postzord/receiver/private.rb index 2ab9ed6e6..03c799928 100644 --- a/lib/postzord/receiver/private.rb +++ b/lib/postzord/receiver/private.rb @@ -1,117 +1,114 @@ # Copyright (c) 2010-2011, Diaspora Inc. This file is # licensed under the Affero General Public License version 3 or later. See # the COPYRIGHT file. -# + require File.join(Rails.root, 'lib/webfinger') require File.join(Rails.root, 'lib/diaspora/parser') -module Postzord - module Receiver - class Private - def initialize(user, opts={}) - @user = user - @user_person = @user.person - @salmon_xml = opts[:salmon_xml] +class Postzord::Receiver::Private < Postzord::Receiver - @sender = opts[:person] || Webfinger.new(self.salmon.author_id).fetch - @author = @sender + def initialize(user, opts={}) + @user = user + @user_person = @user.person + @salmon_xml = opts[:salmon_xml] - @object = opts[:object] - end + @sender = opts[:person] || Webfinger.new(self.salmon.author_id).fetch + @author = @sender - def perform - if @sender && self.salmon.verified_for_key?(@sender.public_key) - parse_and_receive(salmon.parsed_data) - else - Rails.logger.info("event=receive status=abort recipient=#{@user.diaspora_handle} sender=#{@salmon.author_id} reason='not_verified for key'") - nil - end - end + @object = opts[:object] + end - def parse_and_receive(xml) - @object ||= Diaspora::Parser.from_xml(xml) + def perform! + if @sender && self.salmon.verified_for_key?(@sender.public_key) + parse_and_receive(salmon.parsed_data) + else + Rails.logger.info("event=receive status=abort recipient=#{@user.diaspora_handle} sender=#{@salmon.author_id} reason='not_verified for key'") + nil + end + end - Rails.logger.info("event=receive status=start recipient=#{@user_person.diaspora_handle} payload_type=#{@object.class} sender=#{@sender.diaspora_handle}") + def parse_and_receive(xml) + @object ||= Diaspora::Parser.from_xml(xml) - if self.validate_object - set_author! - receive_object - else - raise "not a valid object:#{@object.inspect}" - end - end + Rails.logger.info("event=receive status=start recipient=#{@user_person.diaspora_handle} payload_type=#{@object.class} sender=#{@sender.diaspora_handle}") - # @return [Object] - def receive_object - obj = @object.receive(@user, @author) - Notification.notify(@user, obj, @author) if obj.respond_to?(:notification_type) - Rails.logger.info("event=receive status=complete recipient=#{@user_person.diaspora_handle} sender=#{@sender.diaspora_handle} payload_type=#{obj.class}") - obj - end + if self.validate_object + set_author! + receive_object + else + raise "not a valid object:#{@object.inspect}" + end + end - protected - def salmon - @salmon ||= Salmon::EncryptedSlap.from_xml(@salmon_xml, @user) - end + # @return [Object] + def receive_object + obj = @object.receive(@user, @author) + Notification.notify(@user, obj, @author) if obj.respond_to?(:notification_type) + Rails.logger.info("event=receive status=complete recipient=#{@user_person.diaspora_handle} sender=#{@sender.diaspora_handle} payload_type=#{obj.class}") + obj + end - def xml_author - if @object.respond_to?(:relayable?) - #if A and B are friends, and A sends B a comment from C, we delegate the validation to the owner of the post being commented on - xml_author = @user.owns?(@object.parent) ? @object.diaspora_handle : @object.parent.author.diaspora_handle - @author = Webfinger.new(@object.diaspora_handle).fetch if @object.author - else - xml_author = @object.diaspora_handle - end - xml_author - end + protected + def salmon + @salmon ||= Salmon::EncryptedSlap.from_xml(@salmon_xml, @user) + end - def validate_object - return false if contact_required_unless_request - return false if relayable_without_parent? + def xml_author + if @object.respond_to?(:relayable?) + #if A and B are friends, and A sends B a comment from C, we delegate the validation to the owner of the post being commented on + xml_author = @user.owns?(@object.parent) ? @object.diaspora_handle : @object.parent.author.diaspora_handle + @author = Webfinger.new(@object.diaspora_handle).fetch if @object.author + else + xml_author = @object.diaspora_handle + end + xml_author + end - assign_sender_handle_if_request + def validate_object + return false if contact_required_unless_request + return false if relayable_without_parent? - return false if author_does_not_match_xml_author? + assign_sender_handle_if_request - @object - end + return false if author_does_not_match_xml_author? - def set_author! - return unless @author - @object.author = @author if @object.respond_to? :author= - @object.person = @author if @object.respond_to? :person= - end + @object + end - private + def set_author! + return unless @author + @object.author = @author if @object.respond_to? :author= + @object.person = @author if @object.respond_to? :person= + end - #validations - def relayable_without_parent? - if @object.respond_to?(:relayable?) && @object.parent.nil? - Rails.logger.info("event=receive status=abort reason='received a comment but no corresponding post' recipient=#{@user_person.diaspora_handle} sender=#{@sender.diaspora_handle} payload_type=#{@object.class})") - return true - end - end + private - def author_does_not_match_xml_author? - if (@author.diaspora_handle != xml_author) - Rails.logger.info("event=receive status=abort reason='author in xml does not match retrieved person' payload_type=#{@object.class} recipient=#{@user_person.diaspora_handle} sender=#{@sender.diaspora_handle}") - return true - end - end + #validations + def relayable_without_parent? + if @object.respond_to?(:relayable?) && @object.parent.nil? + Rails.logger.info("event=receive status=abort reason='received a comment but no corresponding post' recipient=#{@user_person.diaspora_handle} sender=#{@sender.diaspora_handle} payload_type=#{@object.class})") + return true + end + end - def contact_required_unless_request - unless @object.is_a?(Request) || @user.contact_for(@sender) - Rails.logger.info("event=receive status=abort reason='sender not connected to recipient' recipient=#{@user_person.diaspora_handle} sender=#{@sender.diaspora_handle}") - return true - end - end + def author_does_not_match_xml_author? + if (@author.diaspora_handle != xml_author) + Rails.logger.info("event=receive status=abort reason='author in xml does not match retrieved person' payload_type=#{@object.class} recipient=#{@user_person.diaspora_handle} sender=#{@sender.diaspora_handle}") + return true + end + end - def assign_sender_handle_if_request - #special casey - if @object.is_a?(Request) - @object.sender_handle = @sender.diaspora_handle - end - end + def contact_required_unless_request + unless @object.is_a?(Request) || @user.contact_for(@sender) + Rails.logger.info("event=receive status=abort reason='sender not connected to recipient' recipient=#{@user_person.diaspora_handle} sender=#{@sender.diaspora_handle}") + return true + end + end + + def assign_sender_handle_if_request + #special casey + if @object.is_a?(Request) + @object.sender_handle = @sender.diaspora_handle end end end diff --git a/lib/postzord/receiver/public.rb b/lib/postzord/receiver/public.rb index 0fc4374d4..955f97299 100644 --- a/lib/postzord/receiver/public.rb +++ b/lib/postzord/receiver/public.rb @@ -2,63 +2,60 @@ # licensed under the Affero General Public License version 3 or later. See # the COPYRIGHT file. -module Postzord - module Receiver - class Public - attr_accessor :salmon, :author +class Postzord::Receiver::Public < Postzord::Receiver - def initialize(xml) - @salmon = Salmon::Slap.from_xml(xml) - @author = Webfinger.new(@salmon.author_id).fetch - end + attr_accessor :salmon, :author - # @return [Boolean] - def verified_signature? - @salmon.verified_for_key?(@author.public_key) - end + def initialize(xml) + @salmon = Salmon::Slap.from_xml(xml) + @author = Webfinger.new(@salmon.author_id).fetch + end - # @return [void] - def perform! - return false unless verified_signature? - return unless save_object + # @return [Boolean] + def verified_signature? + @salmon.verified_for_key?(@author.public_key) + end - if @object.respond_to?(:relayable?) - receive_relayable - else - Resque.enqueue(Jobs::ReceiveLocalBatch, @object.class.to_s, @object.id, self.recipient_user_ids) - end - end + # @return [void] + def perform! + return false unless verified_signature? + return unless save_object - # @return [Object] - def receive_relayable - if @object.parent.author.local? - # receive relayable object only for the owner of the parent object - @object.receive(@object.parent.author.owner, @author) - end - # notify everyone who can see the parent object - receiver = Postzord::Receiver::LocalBatch.new(@object, self.recipient_user_ids) - receiver.notify_users - @object - end - - # @return [Object] - def save_object - @object = Diaspora::Parser::from_xml(@salmon.parsed_data) - raise "Object is not public" if object_can_be_public_and_it_is_not? - @object.save! - end - - # @return [Array] User ids - def recipient_user_ids - User.all_sharing_with_person(@author).select('users.id').map!{ |u| u.id } - end - - private - - # @return [Boolean] - def object_can_be_public_and_it_is_not? - @object.respond_to?(:public) && !@object.public? - end + if @object.respond_to?(:relayable?) + receive_relayable + else + Resque.enqueue(Jobs::ReceiveLocalBatch, @object.class.to_s, @object.id, self.recipient_user_ids) end end + + # @return [Object] + def receive_relayable + if @object.parent.author.local? + # receive relayable object only for the owner of the parent object + @object.receive(@object.parent.author.owner, @author) + end + # notify everyone who can see the parent object + receiver = Postzord::Receiver::LocalBatch.new(@object, self.recipient_user_ids) + receiver.notify_users + @object + end + + # @return [Object] + def save_object + @object = Diaspora::Parser::from_xml(@salmon.parsed_data) + raise "Object is not public" if object_can_be_public_and_it_is_not? + @object.save! + end + + # @return [Array] User ids + def recipient_user_ids + User.all_sharing_with_person(@author).select('users.id').map!{ |u| u.id } + end + + private + + # @return [Boolean] + def object_can_be_public_and_it_is_not? + @object.respond_to?(:public) && !@object.public? + end end diff --git a/spec/integration/attack_vectors_spec.rb b/spec/integration/attack_vectors_spec.rb index 81dd26faa..cfa8b47ad 100644 --- a/spec/integration/attack_vectors_spec.rb +++ b/spec/integration/attack_vectors_spec.rb @@ -22,7 +22,7 @@ describe "attack vectors" do zord = Postzord::Receiver::Private.new(bob, :salmon_xml => salmon_xml) expect { - zord.perform + zord.perform! }.should raise_error /not a valid object/ bob.visible_posts.include?(post_from_non_contact).should be_false @@ -39,7 +39,7 @@ describe "attack vectors" do salmon_xml = bob.salmon(original_message).xml_for(alice.person) zord = Postzord::Receiver::Private.new(bob, :salmon_xml => salmon_xml) expect { - zord.perform + zord.perform! }.should raise_error /not a valid object/ alice.reload.visible_posts.should_not include(StatusMessage.find(original_message.id)) @@ -53,12 +53,12 @@ describe "attack vectors" do salmon_xml = eve.salmon(original_message).xml_for(bob.person) zord = Postzord::Receiver::Private.new(bob, :salmon_xml => salmon_xml) - zord.perform + zord.perform! malicious_message = Factory.build(:status_message, :id => original_message.id, :text => 'BAD!!!', :author => alice.person) salmon_xml = alice.salmon(malicious_message).xml_for(bob.person) zord = Postzord::Receiver::Private.new(bob, :salmon_xml => salmon_xml) - zord.perform + zord.perform! original_message.reload.text.should == "store this!" end @@ -68,14 +68,14 @@ describe "attack vectors" do salmon_xml = eve.salmon(original_message).xml_for(bob.person) zord = Postzord::Receiver::Private.new(bob, :salmon_xml => salmon_xml) - zord.perform + zord.perform! lambda { malicious_message = Factory.build( :status_message, :id => original_message.id, :text => 'BAD!!!', :author => eve.person) salmon_xml2 = alice.salmon(malicious_message).xml_for(bob.person) zord = Postzord::Receiver::Private.new(bob, :salmon_xml => salmon_xml) - zord.perform + zord.perform! }.should_not change{ bob.reload.visible_posts.count @@ -97,7 +97,7 @@ describe "attack vectors" do zord = Postzord::Receiver::Private.new(bob, :salmon_xml => salmon_xml) expect { - zord.perform + zord.perform! }.should raise_error /not a valid object/ eve.reload.profile.first_name.should == first_name @@ -109,7 +109,7 @@ describe "attack vectors" do salmon_xml = eve.salmon(original_message).xml_for(bob.person) zord = Postzord::Receiver::Private.new(bob, :salmon_xml => salmon_xml) - zord.perform + zord.perform! bob.visible_posts.count.should == 1 StatusMessage.count.should == 1 @@ -121,7 +121,7 @@ describe "attack vectors" do salmon_xml = alice.salmon(ret).xml_for(bob.person) zord = Postzord::Receiver::Private.new(bob, :salmon_xml => salmon_xml) - zord.perform + zord.perform! StatusMessage.count.should == 1 bob.visible_posts.count.should == 1 @@ -143,7 +143,7 @@ describe "attack vectors" do proc { salmon_xml = alice.salmon(ret).xml_for(bob.person) zord = Postzord::Receiver::Private.new(bob, :salmon_xml => salmon_xml) - zord.perform + zord.perform! }.should_not raise_error end @@ -152,7 +152,7 @@ describe "attack vectors" do salmon_xml = eve.salmon(original_message).xml_for(bob.person) zord = Postzord::Receiver::Private.new(bob, :salmon_xml => salmon_xml) - zord.perform + zord.perform! bob.visible_posts.count.should == 1 @@ -164,7 +164,7 @@ describe "attack vectors" do salmon_xml = alice.salmon(ret).xml_for(bob.person) zord = Postzord::Receiver::Private.new(bob, :salmon_xml => salmon_xml) expect { - zord.perform + zord.perform! }.should raise_error /not a valid object/ bob.reload.visible_posts.count.should == 1 @@ -180,7 +180,7 @@ describe "attack vectors" do salmon_xml = alice.salmon(ret).xml_for(bob.person) zord = Postzord::Receiver::Private.new(bob, :salmon_xml => salmon_xml) - zord.perform + zord.perform! }.should_not change{bob.reload.contacts.count} end @@ -196,7 +196,7 @@ describe "attack vectors" do salmon_xml = alice.salmon(ret).xml_for(bob.person) zord = Postzord::Receiver::Private.new(bob, :salmon_xml => salmon_xml) expect { - zord.perform + zord.perform! }.should raise_error /not a valid object/ bob.reload.contacts.count.should == 2 @@ -207,7 +207,7 @@ describe "attack vectors" do salmon_xml = eve.salmon(original_message).xml_for(bob.person) zord = Postzord::Receiver::Private.new(bob, :salmon_xml => salmon_xml) - zord.perform + zord.perform! original_message.diaspora_handle = alice.diaspora_handle original_message.text= "bad bad bad" @@ -215,7 +215,7 @@ describe "attack vectors" do salmon_xml = alice.salmon(original_message).xml_for(bob.person) zord = Postzord::Receiver::Private.new(bob, :salmon_xml => salmon_xml) - zord.perform + zord.perform! original_message.reload.text.should == "store this!" end diff --git a/spec/integration/receiving_spec.rb b/spec/integration/receiving_spec.rb index f6d8f4f5f..2e18f2c87 100644 --- a/spec/integration/receiving_spec.rb +++ b/spec/integration/receiving_spec.rb @@ -334,7 +334,7 @@ describe 'a user receives a post' do salmon_xml = salmon.xml_for(bob.person) zord = Postzord::Receiver::Private.new(bob, :salmon_xml => salmon_xml) - zord.perform + zord.perform! bob.visible_posts.include?(post).should be_true end diff --git a/spec/lib/postzord/receiver/private_spec.rb b/spec/lib/postzord/receiver/private_spec.rb index 636b9f220..f86eb6a64 100644 --- a/spec/lib/postzord/receiver/private_spec.rb +++ b/spec/lib/postzord/receiver/private_spec.rb @@ -47,7 +47,7 @@ describe Postzord::Receiver::Private do end end - describe '#perform' do + describe '#perform!' do before do @zord = Postzord::Receiver::Private.new(@user, :salmon_xml => @salmon_xml) @salmon = @zord.instance_variable_get(:@salmon) @@ -56,25 +56,25 @@ describe Postzord::Receiver::Private do context 'returns nil' do it 'if the salmon author does not exist' do @zord.instance_variable_set(:@sender, nil) - @zord.perform.should be_nil + @zord.perform!.should be_nil end it 'if the author does not match the signature' do @zord.instance_variable_set(:@sender, Factory(:person)) - @zord.perform.should be_nil + @zord.perform!.should be_nil end end context 'returns the sent object' do it 'returns the received object on success' do - object = @zord.perform + object = @zord.perform! object.should respond_to(:to_diaspora_xml) end end it 'parses the salmon object' do Diaspora::Parser.should_receive(:from_xml).with(@salmon.parsed_data).and_return(@original_post) - @zord.perform + @zord.perform! end end From 6ea540fb940201ae6dedc8d108965ce2a5e26222 Mon Sep 17 00:00:00 2001 From: Ilya Zhitomirskiy Date: Fri, 30 Sep 2011 19:43:01 -0700 Subject: [PATCH 6/7] wip, adding to cache in receiver, caching is only triggered for receiving of posts --- app/models/post.rb | 4 ++ lib/diaspora/redis_cache.rb | 6 ++ lib/postzord/receiver.rb | 21 ++++++ lib/postzord/receiver/local_batch.rb | 12 ++-- lib/postzord/receiver/private.rb | 7 +- lib/postzord/receiver/public.rb | 2 +- spec/lib/diaspora/redis_cache_spec.rb | 26 ++++++- .../lib/postzord/receiver/local_batch_spec.rb | 26 +++++-- spec/lib/postzord/receiver/private_spec.rb | 22 ++++-- spec/lib/postzord/receiver_spec.rb | 71 +++++++++++++++++++ spec/models/post_spec.rb | 6 ++ 11 files changed, 187 insertions(+), 16 deletions(-) create mode 100644 lib/postzord/receiver.rb create mode 100644 spec/lib/postzord/receiver_spec.rb diff --git a/app/models/post.rb b/app/models/post.rb index b4037e4d6..04fb21fda 100644 --- a/app/models/post.rb +++ b/app/models/post.rb @@ -138,6 +138,10 @@ class Post < ActiveRecord::Base false end + def triggers_caching? + true + end + def comment_email_subject I18n.t('notifier.a_post_you_shared') end diff --git a/lib/diaspora/redis_cache.rb b/lib/diaspora/redis_cache.rb index 3620ee222..6b3492c52 100644 --- a/lib/diaspora/redis_cache.rb +++ b/lib/diaspora/redis_cache.rb @@ -63,6 +63,12 @@ class RedisCache "#{@order_field} DESC" end + def add(score, id) + return unless self.cache_exists? + self.redis.zadd(set_key, score.to_i, id) + self.trim! + end + protected # @return [Redis] def redis diff --git a/lib/postzord/receiver.rb b/lib/postzord/receiver.rb new file mode 100644 index 000000000..5c4feee03 --- /dev/null +++ b/lib/postzord/receiver.rb @@ -0,0 +1,21 @@ +# Copyright (c) 2010-2011, Diaspora Inc. This file is +# licensed under the Affero General Public License version 3 or later. See +# the COPYRIGHT file. + + +class Postzord::Receiver + require File.join(Rails.root, 'lib/postzord/receiver/private') + require File.join(Rails.root, 'lib/postzord/receiver/public') + + def perform! + receive! + update_cache! if cache? + end + + # @return [Boolean] + def cache? + self.respond_to?(:update_cache!) && AppConfig[:redis_cache] && + @object.respond_to?(:triggers_caching?) && @object.triggers_caching? + end +end + diff --git a/lib/postzord/receiver/local_batch.rb b/lib/postzord/receiver/local_batch.rb index f2da0380d..240da8309 100644 --- a/lib/postzord/receiver/local_batch.rb +++ b/lib/postzord/receiver/local_batch.rb @@ -12,14 +12,11 @@ class Postzord::Receiver::LocalBatch < Postzord::Receiver @users = User.where(:id => @recipient_user_ids) end - def perform! + def receive! if @object.respond_to?(:relayable?) receive_relayable else create_post_visibilities - - # if caching enabled, add to cache - end notify_mentioned_users if @object.respond_to?(:mentions) @@ -28,6 +25,13 @@ class Postzord::Receiver::LocalBatch < Postzord::Receiver notify_users end + def update_cache! + @users.each do |user| + cache = RedisCache.new(user, "created_at") + cache.add(@object.created_at.to_i, @object.id) + end + end + # NOTE(copied over from receiver public) # @return [Object] def receive_relayable diff --git a/lib/postzord/receiver/private.rb b/lib/postzord/receiver/private.rb index 03c799928..a1db66d96 100644 --- a/lib/postzord/receiver/private.rb +++ b/lib/postzord/receiver/private.rb @@ -18,7 +18,7 @@ class Postzord::Receiver::Private < Postzord::Receiver @object = opts[:object] end - def perform! + def receive! if @sender && self.salmon.verified_for_key?(@sender.public_key) parse_and_receive(salmon.parsed_data) else @@ -48,6 +48,11 @@ class Postzord::Receiver::Private < Postzord::Receiver obj end + def update_cache! + cache = RedisCache.new(@user, "created_at") + cache.add(@object.created_at.to_i, @object.id) + end + protected def salmon @salmon ||= Salmon::EncryptedSlap.from_xml(@salmon_xml, @user) diff --git a/lib/postzord/receiver/public.rb b/lib/postzord/receiver/public.rb index 955f97299..a5e55b1a0 100644 --- a/lib/postzord/receiver/public.rb +++ b/lib/postzord/receiver/public.rb @@ -17,7 +17,7 @@ class Postzord::Receiver::Public < Postzord::Receiver end # @return [void] - def perform! + def receive! return false unless verified_signature? return unless save_object diff --git a/spec/lib/diaspora/redis_cache_spec.rb b/spec/lib/diaspora/redis_cache_spec.rb index 790c02574..c92f928a8 100644 --- a/spec/lib/diaspora/redis_cache_spec.rb +++ b/spec/lib/diaspora/redis_cache_spec.rb @@ -136,6 +136,30 @@ describe RedisCache do end end - describe "#add" + describe "#add" do + before do + @cache.stub(:cache_exists?).and_return(true) + @id = 1 + @score = 123 + end + + it "adds an id with a given score" do + @redis.should_receive(:zadd).with(@cache.send(:set_key), @score, @id) + @cache.add(@score, @id) + end + + it 'trims' do + @cache.should_receive(:trim!) + @cache.add(@score, @id) + end + + it "doesn't add if the cache does not exist" do + @cache.stub(:cache_exists?).and_return(false) + + @redis.should_not_receive(:zadd) + @cache.add(@score, @id).should be_false + end + end + describe "#remove" end diff --git a/spec/lib/postzord/receiver/local_batch_spec.rb b/spec/lib/postzord/receiver/local_batch_spec.rb index 421fc9bb4..764c7c596 100644 --- a/spec/lib/postzord/receiver/local_batch_spec.rb +++ b/spec/lib/postzord/receiver/local_batch_spec.rb @@ -17,26 +17,26 @@ describe Postzord::Receiver::LocalBatch do end end - describe '#perform!' do + describe '#receive!' do it 'calls .create_post_visibilities' do receiver.should_receive(:create_post_visibilities) - receiver.perform! + receiver.receive! end it 'sockets to users' do pending 'not currently socketing' receiver.should_receive(:socket_to_users) - receiver.perform! + receiver.receive! end it 'notifies mentioned users' do receiver.should_receive(:notify_mentioned_users) - receiver.perform! + receiver.receive! end it 'notifies users' do receiver.should_receive(:notify_users) - receiver.perform! + receiver.receive! end end @@ -111,4 +111,20 @@ describe Postzord::Receiver::LocalBatch do receiver.perform! end end + + describe '#update_cache!' do + it 'adds to a redis cache for receiving_users' do + users = [alice, eve] + @zord = Postzord::Receiver::LocalBatch.new(@object, users.map{|u| u.id}) + + sort_order = "created_at" + + cache = mock + RedisCache.should_receive(:new).exactly(users.length).times.with(instance_of(User), sort_order).and_return(cache) + + cache.should_receive(:add).exactly(users.length).times.with(@object.created_at.to_i, @object.id) + + @zord.update_cache! + end + end end diff --git a/spec/lib/postzord/receiver/private_spec.rb b/spec/lib/postzord/receiver/private_spec.rb index f86eb6a64..8fe6779cc 100644 --- a/spec/lib/postzord/receiver/private_spec.rb +++ b/spec/lib/postzord/receiver/private_spec.rb @@ -47,7 +47,7 @@ describe Postzord::Receiver::Private do end end - describe '#perform!' do + describe '#receive!' do before do @zord = Postzord::Receiver::Private.new(@user, :salmon_xml => @salmon_xml) @salmon = @zord.instance_variable_get(:@salmon) @@ -67,8 +67,8 @@ describe Postzord::Receiver::Private do context 'returns the sent object' do it 'returns the received object on success' do - object = @zord.perform! - object.should respond_to(:to_diaspora_xml) + @zord.perform! + @zord.instance_variable_get(:@object).should respond_to(:to_diaspora_xml) end end @@ -86,7 +86,7 @@ describe Postzord::Receiver::Private do it 'calls Notification.notify if object responds to notification_type' do cm = Comment.new - cm.stub!(:receive).and_return(cm) + cm.stub(:receive).and_return(cm) Notification.should_receive(:notify).with(@user, cm, @person2) zord = Postzord::Receiver::Private.new(@user, :person => @person2, :object => cm) @@ -103,4 +103,18 @@ describe Postzord::Receiver::Private do @zord.receive_object end end + + describe '#update_cache!' do + it 'adds to redis cache for the given user' do + @original_post.save! + + @zord = Postzord::Receiver::Private.new(@user, :person => @person2, :object => @original_post) + + sort_order = "created_at" + cache = RedisCache.new(@user, sort_order) + RedisCache.should_receive(:new).with(@user, sort_order).and_return(cache) + cache.should_receive(:add).with(@original_post.created_at.to_i, @original_post.id) + @zord.update_cache! + end + end end diff --git a/spec/lib/postzord/receiver_spec.rb b/spec/lib/postzord/receiver_spec.rb new file mode 100644 index 000000000..f51164372 --- /dev/null +++ b/spec/lib/postzord/receiver_spec.rb @@ -0,0 +1,71 @@ +# Copyright (c) 2011, Diaspora Inc. This file is +# licensed under the Affero General Public License version 3 or later. See +# the COPYRIGHT file. + +require 'spec_helper' + +require File.join(Rails.root, 'lib/postzord/receiver') + +describe Postzord::Receiver do + before do + @receiver = Postzord::Receiver.new + end + + describe "#perform!" do + before do + @receiver.stub(:receive!) + end + + it 'calls receive!' do + @receiver.should_receive(:receive!) + @receiver.perform! + end + + context 'update_cache!' do + it "gets called if cache?" do + @receiver.stub(:cache?).and_return(true) + @receiver.should_receive(:update_cache!) + @receiver.perform! + end + + it "doesn't get called if !cache?" do + @receiver.stub(:cache?).and_return(false) + @receiver.should_not_receive(:update_cache!) + @receiver.perform! + end + end + end + + describe "#cache?" do + before do + @receiver.stub(:respond_to?).with(:update_cache!).and_return(true) + AppConfig[:redis_cache] = true + @receiver.instance_variable_set(:@object, mock(:triggers_caching? => true)) + end + + it 'returns true if the receiver responds to update_cache and the application has caching enabled' do + @receiver.cache?.should be_true + end + + it 'returns false if the receiver does not respond to update_cache' do + @receiver.stub(:respond_to?).with(:update_cache!).and_return(false) + @receiver.cache?.should be_false + end + + it 'returns false if the application does not have caching set' do + AppConfig[:redis_cache] = false + @receiver.cache?.should be_false + end + + it 'returns false if the object is does not respond to triggers_caching' do + @receiver.instance_variable_set(:@object, mock) + @receiver.cache?.should be_false + end + + it 'returns false if the object is not cacheable' do + @receiver.instance_variable_set(:@object, mock(:triggers_caching? => false)) + @receiver.cache?.should be_false + end + end +end + diff --git a/spec/models/post_spec.rb b/spec/models/post_spec.rb index 27ee97472..f5669c7d0 100644 --- a/spec/models/post_spec.rb +++ b/spec/models/post_spec.rb @@ -131,4 +131,10 @@ describe Post do @post.reload.updated_at.to_i.should == old_time.to_i end end + + describe "triggers_caching?" do + it 'returns true' do + Post.new.triggers_caching?.should be_true + end + end end From 97342630c4449fd5daa51100c1c966989d7f266b Mon Sep 17 00:00:00 2001 From: Ilya Zhitomirskiy Date: Tue, 4 Oct 2011 14:39:56 -0700 Subject: [PATCH 7/7] dg iz added some more documentation and only caching on all aspects --- lib/diaspora/user/querying.rb | 2 +- lib/stream/aspect_stream.rb | 25 ++++++++++++++++++++++--- spec/lib/stream/aspect_stream_spec.rb | 8 ++++++++ spec/models/user/querying_spec.rb | 8 +++++++- 4 files changed, 38 insertions(+), 5 deletions(-) diff --git a/lib/diaspora/user/querying.rb b/lib/diaspora/user/querying.rb index 3c53b030d..432a929c8 100644 --- a/lib/diaspora/user/querying.rb +++ b/lib/diaspora/user/querying.rb @@ -24,7 +24,7 @@ module Diaspora def visible_post_ids(opts={}) opts = prep_opts(opts) - if AppConfig[:redis_cache] && RedisCache.supported_order?(opts[:order_field]) + if AppConfig[:redis_cache] && RedisCache.supported_order?(opts[:order_field]) && opts[:all_aspects?].present? cache = RedisCache.new(self, opts[:order_field]) cache.ensure_populated! diff --git a/lib/stream/aspect_stream.rb b/lib/stream/aspect_stream.rb index 3edb4d0af..69ab65092 100644 --- a/lib/stream/aspect_stream.rb +++ b/lib/stream/aspect_stream.rb @@ -39,7 +39,8 @@ class AspectStream < BaseStream # @return [ActiveRecord::Association] AR association of posts def posts # NOTE(this should be something like Post.all_for_stream(@user, aspect_ids, {}) that calls visible_posts - @posts ||= user.visible_posts(:by_members_of => aspect_ids, + @posts ||= user.visible_posts(:all_aspects? => for_all_aspects?, + :by_members_of => aspect_ids, :type => TYPES_OF_POST_IN_STREAM, :order => "#{order} DESC", :max_time => max_time @@ -51,6 +52,7 @@ class AspectStream < BaseStream @people ||= Person.all_from_aspects(aspect_ids, user).includes(:profile) end + # @return [String] URL def link(opts={}) Rails.application.routes.url_helpers.aspects_path(opts.merge(:a_ids => aspect_ids)) end @@ -64,16 +66,26 @@ class AspectStream < BaseStream end end + # Only ajax in the stream if all aspects are present. + # In this case, we know we're on the first page of the stream, + # as the default view for aspects/index is showing posts from + # all a user's aspects. + # + # @return [Boolean] see #for_all_aspects? def ajax_stream? for_all_aspects? end + # The title that will display at the top of the stream's + # publisher box. + # + # @return [String] def title if self.for_all_aspects? I18n.t('aspects.aspect_stream.stream') - else + else self.aspects.to_sentence - end + end end # Determine whether or not the stream is displaying across @@ -84,6 +96,9 @@ class AspectStream < BaseStream @all_aspects ||= aspect_ids.length == user.aspects.size end + # Provides a translated title for contacts box on the right pane. + # + # @return [String] def contacts_title if self.for_all_aspects? || self.aspect_ids.size > 1 I18n.t('_contacts') @@ -92,6 +107,10 @@ class AspectStream < BaseStream end end + # Provides a link to the user to the contacts page that corresponds with + # the stream's active aspects. + # + # @return [String] Link to contacts def contacts_link if for_all_aspects? || aspect_ids.size > 1 Rails.application.routes.url_helpers.contacts_path diff --git a/spec/lib/stream/aspect_stream_spec.rb b/spec/lib/stream/aspect_stream_spec.rb index 381af67f0..6d9c486ad 100644 --- a/spec/lib/stream/aspect_stream_spec.rb +++ b/spec/lib/stream/aspect_stream_spec.rb @@ -73,6 +73,14 @@ describe AspectStream do @alice.should_receive(:visible_posts).with(hash_including(:max_time => instance_of(Time))).and_return(stub.as_null_object) stream.posts end + + it 'passes for_all_aspects to visible posts' do + stream = AspectStream.new(@alice, [1,2], :max_time => 123) + all_aspects = mock + stream.stub(:for_all_aspects?).and_return(all_aspects) + @alice.should_receive(:visible_posts).with(hash_including(:all_aspects? => all_aspects)).and_return(stub.as_null_object) + stream.posts + end end describe '#people' do diff --git a/spec/models/user/querying_spec.rb b/spec/models/user/querying_spec.rb index 0cdc999b4..43e4a5eda 100644 --- a/spec/models/user/querying_spec.rb +++ b/spec/models/user/querying_spec.rb @@ -88,7 +88,7 @@ describe User do context "RedisCache" do before do AppConfig[:redis_cache] = true - @opts = {:order => "created_at DESC"} + @opts = {:order => "created_at DESC", :all_aspects? => true} end after do @@ -103,6 +103,12 @@ describe User do alice.visible_post_ids(@opts) end + it 'does not get used if if all_aspects? option is not present' do + RedisCache.should_not_receive(:new) + + alice.visible_post_ids(@opts.merge({:all_aspects? => false})) + end + describe "#ensure_populated_cache" do it 'does nothing if the cache is already populated' it 're-populates the cache with the latest posts (in hashes)'