root/trunk/lib/workers/ferret_server_worker.rb

Revision 1982, 8.9 kB (checked in by benjamin, 1 year ago)

added production companies for seasons

Line 
# FerretServer
#
# BackgrounDRb worker that acts as a ferret server. All index and search
# requests are forwarded to this server, where all indexing requests get
# serialized through a single queue.
#
# Author:: Benjamin Krause, Jens Kraemer
require 'fileutils'
require 'monitor'
require 'lazy_doc'
require 'unique_queue'
require 'indexer'
require 'util'

module OMDB
  module Ferret
    class FerretServerWorker < BackgrounDRb::Rails
      include LocalSearch
      include OMDB::Util
      include FileUtils
      include MonitorMixin

      # Maximum number of queue entries written through one index writer
      # before the writer is closed and the index is switched online.
      MAX_BATCH_SIZE = 256

      first_run Time.now.to_s(:db)

      # Necessary to avoid 'hanging' SQL connections.
      ActiveRecord::Base.allow_concurrency = true
      ActiveRecord::Base.verification_timeout = 10

      private

      # Main loop of the worker. BackgrounDRb starts the worker automatically;
      # it then waits for entries in the queue. As soon as an entry is present,
      # it opens the offline index and processes up to MAX_BATCH_SIZE queued
      # records, then switches the offline index online (see #switch_index).
      #
      # Any exception ends the loop and is logged. Exceptions that are not
      # StandardErrors (signals, exit requests, ...) are re-raised after
      # logging so that process shutdown is not swallowed.
      def do_work(args)
        initialize_server
        loop do
          @logger.info("starting loop...")
          counter = 1
          # NOTE(review): assumes UniqueQueue#pop blocks while the queue is empty
          record = @queue.pop
          # open the offline index
          Indexer.get_writer_and_optimize(true) do |writer|
            benchmark("indexing took %s") do
              @logger.info("opening writer")
              process_record(record, writer)
              # keep indexing until the queue is empty or the batch is full
              while !@queue.empty? && counter < MAX_BATCH_SIZE
                process_record(@queue.pop, writer)
                counter += 1
              end
            end
            @logger.info("closing writer after processing #{counter} records")
          end
          switch_index
        end
      rescue Exception => e
        @logger.error "caught fatal error in do_work, terminating: #{e}\n#{e.backtrace.join "\n"}"
        # Do not swallow signals / exit requests; only StandardErrors end quietly.
        raise unless e.is_a?(StandardError)
      end

      public

      # == Indexing Methods
      #
      # These are the main methods called to modify the ferret index. All of
      # them are called by the SearchObserver, but you can use them on the
      # console as well:
      #
      # >> MiddleMan.get_worker('ferret_server').index_object Person.find(123).to_hash_args
      #

      # Enqueue an object (+args+ with :type and :id) to be added to / updated
      # in the ferret index. Errors are logged, not raised.
      def index_object(args)
        @logger.info "received indexing request for #{args.inspect}"
        enqueue args, :add
      rescue
        @logger.error "caught error in index_object: #{$!}\n#{$!.backtrace.join "\n"}"
      end
      alias << index_object

      # Enqueue an object (+args+ with :type and :id) to be removed from the
      # ferret index. Errors are logged, not raised.
      def delete_object(args)
        @logger.info "received delete request for #{args.inspect}"
        enqueue args, :remove
      rescue
        @logger.error "caught error in delete: #{$!}\n#{$!.backtrace.join "\n"}"
      end
      alias delete delete_object
      alias - delete

      # === Index dependencies
      #
      # After a record has been changed, a lot of other objects might need to
      # be updated in the index as well. To get all depending objects, omdb has
      # several dependency views in the database. This method makes sure that
      # not only the object itself gets updated in the index, but all
      # dependent objects as well.
      #
      # Fetching all dependencies might take some time, so a new thread
      # fetches the dependencies and adds them to the index queue. The thread
      # is returned. Errors inside the thread are logged, and the thread's
      # database connection is released in every case, including when the
      # record is not found.
      #
      # TODO: for extra credit, make the queue double-ended and push the
      # records back (un-pop them) so that their order is preserved :)
      def index_dependencies(args)
        Thread.new do
          record = nil
          begin
            @logger.info "enqueueing dependencies of #{args.inspect}"
            record = get_object args[:type], args[:id]
            if record
              record.dependent_objects.each do |class_name, id_array|
                id_array.each do |id|
                  next if id == 0
                  index_object :type => class_name, :id => id
                end
              end
            else
              @logger.info "record #{args.inspect} not found, no dependencies enqueued"
            end
          rescue => e
            @logger.error "caught error in index_dependencies: #{e}\n#{e.backtrace.join "\n"}"
          ensure
            # Release the connection this thread used. Without a record there
            # is no model class to ask, so fall back to ActiveRecord::Base
            # (calling .connection on NilClass would raise NoMethodError).
            begin
              (record ? record.class : ActiveRecord::Base).connection.disconnect!
            rescue => e
              @logger.error "could not disconnect after index_dependencies: #{e}"
            end
          end
        end
      end

      # Empties the queue and inserts a single :reindex action.
      def rebuild_index
        @queue.clear
        enqueue({}, :reindex)
      rescue
        @logger.error "caught error in rebuild_index: #{$!}\n#{$!.backtrace.join "\n"}"
      end

      # Returns a hash describing the worker state: :objects_in_queue, plus,
      # while a full reindex is running, :reindexing_since,
      # :last_reindexing_time and (when a previous duration is known and
      # positive) an estimated :percentage.
      def status
        result = { :objects_in_queue => @queue.size }
        # Read the reindex ivars once: status may be called while the worker
        # thread is finishing a reindex and resetting them.
        since    = @reindexing_since
        duration = @last_reindexing_time
        if since
          result[:reindexing_since]     = since
          result[:last_reindexing_time] = duration
          result[:percentage]           = (Time.now - since) * 100.0 / duration if duration && duration > 0
        end
        result
      rescue
        @logger.error "caught error in status: #{$!}\n#{$!.backtrace.join "\n"}"
      end

      # True while a full reindex is in progress.
      def rebuilding?
        !@reindexing_since.nil?
      end

      protected

      # Initializes the server: creates the UniqueQueue for indexing requests
      # and logs the startup.
      def initialize_server
        @queue = UniqueQueue.new
        @logger.info "#{Time.now} initialized indexer worker in #{ENV['RAILS_ENV']} mode"
      end

      # === Process queue entries
      #
      # Processes one entry of the indexing queue. Each entry consists of a
      # class_name, an id and an action. Three actions are known:
      # - add     : add the class_name/id AR object to the index (or update it)
      # - remove  : remove the class_name/id AR object from the index
      # - reindex : run a complete reindex; this can be triggered via the
      #             admin/search_controller action :reindex.
      # Errors, including an unknown action, are logged and not re-raised, so
      # one bad entry does not stop the batch.
      def process_record(record, writer)
        class_name, id, action = record
        case action
        when :add
          object = get_object(class_name, id)
          @logger.info "indexing #{record.inspect}"
          Indexer.index_local(object, writer) if object
        when :remove
          @logger.info "deleting #{record.inspect}"
          Indexer.delete_local({ :type => class_name, :id => id }, writer)
        when :reindex
          perform_reindex(writer)
        else
          raise 'unknown action'
        end
      rescue
        @logger.error "caught error inside loop: #{$!}\n#{$!.backtrace.join "\n"}"
      end

      # Rebuilds the whole index through +writer+ and records timing for
      # #status. The ensure clause resets @reindexing_since even when
      # Indexer.reindex raises, so #rebuilding? never reports a reindex that
      # is no longer running.
      def perform_reindex(writer)
        @logger.info "rebuilding the whole index"
        @reindexing_since = Time.now
        Indexer.reindex(writer)
        @last_reindexing_time = Time.now - @reindexing_since
      ensure
        @reindexing_since = nil
      end

      # Fetches the +class_name+ / +id+ record from the database.
      #
      # If the record is not found, waits 3 seconds and tries once more
      # (presumably to wait for a transaction that has not committed yet).
      # On ActiveRecord::StatementInvalid, the connection is re-established
      # and the object is re-enqueued for indexing.
      # Returns the record, or nil in all failure cases. A NameError from an
      # unknown +class_name+ propagates to the caller.
      def get_object(class_name, id)
        clazz = class_name.constantize
        retried = false
        begin
          obj = clazz.find(id)
          @logger.info "fetched object #{obj.inspect}"
          obj
        rescue ActiveRecord::RecordNotFound
          if retried
            @logger.error "still no luck with record: #{class_name} - #{id}, giving up :("
            nil
          else
            @logger.warn "#{Time.now} record not found: #{class_name} - #{id}, sleeping for a short while"
            sleep 3
            retried = true
            retry
          end
        rescue ActiveRecord::StatementInvalid
          @logger.error "caught statement invalid: #{$!}\n#{$!.backtrace.join "\n"}\nforcing db reconnect and re-enqueueing object"
          clazz.connection.reconnect!
          enqueue({ :type => class_name, :id => id }, :add)
          nil
        end
      end

      # Adds an entry to the queue. @queue << returns false if the entry is
      # already queued, which avoids unnecessary double indexing.
      def enqueue(args, action)
        unless @queue << [ args[:type], args[:id], action ]
          @logger.debug "skipped enqueue, entry already present"
        end
      end

      # Swaps the online and offline index links, then copies the new online
      # index back over the offline one, under the monitor lock.
      def switch_index
        @logger.debug 'switching..'
        synchronize do
          relink
          sync
        end
      end

      # Swaps the targets of the online and offline index symlinks and touches
      # the status file so the mongrel servers pick up the new index.
      # The links are deleted first because FileUtils.ln_s onto an existing
      # link to a directory would create the new link *inside* that directory.
      # NOTE(review): between the deletes and the ln_s calls the online link
      # briefly does not exist.
      def relink
        online  = File.readlink(Indexer.online_index)
        offline = File.readlink(Indexer.offline_index)
        File.delete Indexer.offline_index
        File.delete Indexer.online_index
        FileUtils.ln_s offline, Indexer.online_index, :force => true
        FileUtils.ln_s online,  Indexer.offline_index, :force => true
        # Inform the mongrel servers that a new index exists.
        FileUtils.touch Indexer.index_status_file
      end

      # Mirrors the online index into the offline index with rsync. Returns
      # the result of Kernel#system and logs when rsync does not succeed.
      def sync
        online  = Indexer.online_index
        offline = Indexer.offline_index
        @logger.info("syncing from #{online}/ to #{offline}")
        # Argument-list form: the paths are passed to rsync without a shell.
        ok = system('rsync', '-r', '--delete', '--verbose', "#{online}/", offline.to_s)
        @logger.error("rsync from #{online}/ to #{offline} failed (#{$?.inspect})") unless ok
        ok
      end
    end
  end
end
Note: See TracBrowser for help on using the browser.