| 1 |
# FerretServer |
|---|
| 2 |
# |
|---|
| 3 |
# BackgroundRb Worker to serve as a ferret-server. All index and search requests |
|---|
| 4 |
# will be forwarded to this server, where all indexing requests get serialized. |
|---|
| 5 |
# |
|---|
| 6 |
# Author:: Benjamin Krause, Jens Kraemer |
|---|
| 7 |
require 'lazy_doc' |
|---|
| 8 |
require 'unique_queue' |
|---|
| 9 |
require 'indexer' |
|---|
| 10 |
require 'util' |
|---|
| 11 |
|
|---|
| 12 |
module OMDB |
|---|
| 13 |
module Ferret |
|---|
| 14 |
class FerretServerWorker < BackgrounDRb::Rails |
|---|
| 15 |
include LocalSearch |
|---|
| 16 |
include OMDB::Util |
|---|
| 17 |
include FileUtils |
|---|
| 18 |
include MonitorMixin |
|---|
| 19 |
|
|---|
| 20 |
first_run Time.now.to_s(:db) |
|---|
| 21 |
|
|---|
| 22 |
# Necessary to avoid 'hanging' SQL connections |
|---|
| 23 |
ActiveRecord::Base.allow_concurrency = true |
|---|
| 24 |
ActiveRecord::Base.verification_timeout = 10 |
|---|
| 25 |
|
|---|
| 26 |
private |
|---|
| 27 |
|
|---|
| 28 |
# Main loop of the worker; the worker is autostarted by backgroundrb.
#
# Each pass pops one entry from the queue, obtains a writer for the
# offline index through Indexer.get_writer_and_optimize(true) and processes
# that entry plus whatever else is queued, up to 256 entries per writer
# session. Once the writer block has finished, the indexes are switched and
# the loop starts over.
#
# args - unused here.
#
# Any exception (the handler rescues Exception, not only StandardError) is
# logged and ends the loop, which terminates the worker.
def do_work(args)
  initialize_server
  loop do
    @logger.info("starting loop...")
    processed = 1
    entry = @queue.pop
    # open the offline-index
    Indexer.get_writer_and_optimize(true) do |writer|
      benchmark("indexing took %s") do
        @logger.info("opening writer")
        process_record(entry, writer)
        # keep draining until the queue is empty or 256 records were handled
        until @queue.empty? || processed >= 256
          entry = @queue.pop
          process_record(entry, writer)
          processed += 1
        end
      end
      @logger.info("closing writer after processing #{processed} records")
    end
    switch_index
  end
rescue Exception => fatal
  @logger.error "caught fatal error in do_work, terminating: #{fatal}\n#{fatal.backtrace.join "\n"}"
end
|---|
| 59 |
|
|---|
| 60 |
public |
|---|
| 61 |
|
|---|
| 62 |
# == Indexing Methods |
|---|
| 63 |
# |
|---|
| 64 |
# These are the main methods called to modify the Ferret index. All of these methods |
|---|
| 65 |
# will be called by the SearchObserver, but you can use them on the console as well. |
|---|
| 66 |
# |
|---|
| 67 |
# >> MiddleMan.get_worker('ferret_server').index_object Person.find(123).to_hash_args |
|---|
| 68 |
# |
|---|
| 69 |
|
|---|
| 70 |
# Queues an :add request so that the object described by +args+ gets added
# to the Ferret index.
#
# args - hash carrying the record's :type (class name) and :id.
#
# Errors (StandardError) are logged instead of raised.
# Also reachable through the << alias.
def index_object(args)
  @logger.info "received indexing request for #{args.inspect}"
  enqueue(args, :add)
rescue => error
  @logger.error "caught error in index_object: #{error}\n#{error.backtrace.join "\n"}"
end
alias << index_object
|---|
| 78 |
|
|---|
| 79 |
# Queues a :remove request so that the object described by +args+ gets
# removed from the Ferret index.
#
# args - hash carrying the record's :type (class name) and :id.
#
# Errors (StandardError) are logged instead of raised.
# Also reachable through the aliases +delete+ and +-+.
def delete_object(args)
  @logger.info "received delete request for #{args.inspect}"
  enqueue(args, :remove)
rescue => error
  @logger.error "caught error in delete: #{error}\n#{error.backtrace.join "\n"}"
end
alias delete delete_object
alias - delete
|---|
| 88 |
|
|---|
| 89 |
|
|---|
| 90 |
# === Index dependencies
#
# After a record has been changed, a lot of other objects might need to be
# updated in the index as well. To get all depending objects, omdb has
# several dependency-views in the database.
# This method asks the record for its dependent objects
# (record.dependent_objects, iterated as class_name => array of ids) and
# enqueues an :add request for every one of them through index_object.
# Ids equal to 0 are skipped.
#
# Fetching all dependencies might take some time, so the work is done in a
# new thread.
#
# args - hash carrying the :type (class name) and :id of the changed record.
#
# Returns the Thread doing the work. Errors raised inside the thread are
# logged instead of killing the thread silently.
#
# TODO: for extra bonus points make the queue double-ended and un-pop the
# records so that the order is preserved :)
def index_dependencies(args)
  Thread.new do
    begin
      @logger.info "enqueueing dependencies of #{args.inspect}"
      record = get_object args[:type], args[:id]
      if record
        record.dependent_objects.each do |class_name, id_array|
          id_array.each do |id|
            next if id == 0
            index_object :type => class_name, :id => id
          end
        end
      else
        @logger.info "record #{args.inspect} not found, no dependencies enqueued"
      end
    rescue
      @logger.error "caught error in index_dependencies: #{$!}\n#{$!.backtrace.join("\n")}"
    ensure
      # Release the connection used by this thread. The model class is taken
      # from the loaded record, so there is nothing to disconnect from when
      # the record could not be loaded (calling this on nil used to raise
      # NoMethodError).
      record.class.connection.disconnect! if record
    end
  end
end
|---|
| 120 |
|
|---|
| 121 |
# Discards every pending queue entry and enqueues a single :reindex action
# in their place. Errors (StandardError) are logged instead of raised.
def rebuild_index
  @queue.clear
  enqueue({}, :reindex)
rescue => error
  @logger.error "caught error in rebuild_index: #{error}\n#{error.backtrace.join "\n"}"
end
|---|
| 128 |
|
|---|
| 129 |
# Reports the current state of the worker.
#
# Returns a hash that always contains :objects_in_queue (the queue size).
# While a reindex is running (@reindexing_since is set) it also contains
# :reindexing_since, :last_reindexing_time and - when a previous reindex
# duration is known - :percentage, the elapsed time expressed as a
# percentage of that previous duration.
#
# Errors (StandardError) are logged instead of raised.
def status
  # Snapshot both values once: the indexing loop resets @reindexing_since to
  # nil when a reindex finishes, which could otherwise happen between the
  # check and the subtraction below.
  since = @reindexing_since
  previous_duration = @last_reindexing_time

  report = { :objects_in_queue => @queue.size }
  unless since.nil?
    report[:reindexing_since] = since
    report[:last_reindexing_time] = previous_duration
    report[:percentage] = (Time.now - since) * 100.0 / previous_duration if previous_duration
  end
  report
rescue
  @logger.error "caught error in status: #{$!}\n#{$!.backtrace.join "\n"}"
end
|---|
| 140 |
|
|---|
| 141 |
# Tells whether @reindexing_since is currently set, which is the case
# while a :reindex action is being processed.
def rebuilding?
  return false if @reindexing_since.nil?
  true
end
|---|
| 144 |
|
|---|
| 145 |
|
|---|
| 146 |
protected |
|---|
| 147 |
|
|---|
| 148 |
# Prepares the worker: creates the UniqueQueue that collects the
# indexing requests and logs a startup line with the current time and the
# RAILS_ENV environment variable.
def initialize_server
  @queue = UniqueQueue.new
  rails_env = ENV['RAILS_ENV']
  @logger.info "#{Time.now} initialized indexer worker in #{rails_env} mode"
end
|---|
| 154 |
|
|---|
| 155 |
# === Process queue entries
#
# Main method to process the entries of the indexing queue. Each element in
# the queue consists of a class_name, an id and an action. Three actions
# are known:
# - add     : Add the class_name/id AR-object to the index (or update the
#             object). Nothing is indexed when the object cannot be fetched.
# - remove  : Remove the class_name/id AR-object from the index.
# - reindex : Run a complete reindex; this can be triggered via the
#             admin/search_controller action :reindex.
#
# record - array of [class_name, id, action].
# writer - index writer handed on to the Indexer calls.
#
# Any other action raises 'unknown action'. All errors (StandardError),
# including that one, are logged here and not propagated, so one bad entry
# does not stop the caller's loop.
def process_record(record, writer)
  class_name, id, action = record
  case action
  when :add
    object = get_object(class_name, id)
    @logger.info "indexing #{record.inspect}"
    Indexer.index_local(object, writer) if object
  when :remove
    @logger.info "deleting #{record.inspect}"
    Indexer.delete_local({ :type => class_name, :id => id }, writer)
  when :reindex
    @logger.info "rebuilding the whole index"
    started_at = Time.now
    @reindexing_since = started_at
    begin
      Indexer.reindex(writer)
      # only a completed run counts as a reference duration
      @last_reindexing_time = Time.now - started_at
    ensure
      # always clear the marker, otherwise a failed reindex would make
      # rebuilding? report true forever
      @reindexing_since = nil
    end
  else
    raise 'unknown action'
  end
rescue
  @logger.error "caught error inside loop: #{$!}\n#{$!.backtrace.join "\n"}"
end
|---|
| 187 |
|
|---|
| 188 |
# Fetch the object from the database.
#
# class_name  - name of the model class; resolved with String#constantize.
# id          - id handed to the model's +find+.
# retry_delay - seconds to wait before the single retry after a
#               RecordNotFound (default 3).
#
# Returns the record, or nil when it is still not found after the retry or
# when the lookup raised ActiveRecord::StatementInvalid. In the latter case
# the model's connection is reconnected and an :add request for the object
# is enqueued again.
def get_object(class_name, id, retry_delay = 3)
  begin
    clazz = class_name.constantize
    obj = clazz.find(id)
    @logger.info "fetched object #{obj.inspect}"
    return obj
  rescue ActiveRecord::RecordNotFound
    @logger.warn "#{Time.now} record not found: #{class_name} - #{id}, sleeping for a short while"
    # wait once and retry a single time before giving up
    sleep retry_delay
    begin
      return clazz.find(id)
    rescue ActiveRecord::RecordNotFound
      @logger.error "still no luck with record: #{class_name} - #{id}, giving up :("
    end
  rescue ActiveRecord::StatementInvalid
    # join the backtrace like every other handler in this file instead of
    # interpolating the raw array
    @logger.error "caught statement invalid: #{$!}\n#{$!.backtrace.join("\n")}\nforcing db reconnect and re-enqueueing object"
    clazz.connection.reconnect!
    # re-enqueue object
    enqueue({ :type => class_name, :id => id }, :add)
  end
  nil
end
|---|
| 211 |
|
|---|
| 212 |
# Central entry point for putting work on the queue. The entry is stored as
# [type, id, action]. @queue << returns false when the same entry is
# already queued, which avoids unnecessary double-indexing; that case is
# only noted in the debug log.
#
# args   - hash read for :type and :id.
# action - the action to queue for that object.
def enqueue(args, action)
  entry = [args[:type], args[:id], action]
  accepted = @queue << entry
  @logger.debug "skipped enqueue, entry already present" unless accepted
end
|---|
| 220 |
|
|---|
| 221 |
# Swaps the online and offline index links and then syncs the offline
# index from the online one; both steps run inside one synchronize block.
def switch_index
  @logger.debug 'switching..'
  synchronize { relink; sync }
end
|---|
| 228 |
|
|---|
| 229 |
# Exchanges the targets of the two index symlinks: afterwards
# Indexer.online_index points to what the offline link pointed to, and
# vice versa. Finally touches Indexer.index_status_file.
def relink
  # remember both targets before the links are removed
  current_online = File.readlink(Indexer.online_index)
  current_offline = File.readlink(Indexer.offline_index)

  File.delete(Indexer.offline_index)
  File.delete(Indexer.online_index)

  FileUtils.ln_s(current_offline, Indexer.online_index, :force => true)
  FileUtils.ln_s(current_online, Indexer.offline_index, :force => true)

  # Inform the mongrel servers that a new index exists.
  FileUtils.touch(Indexer.index_status_file)
end
|---|
| 239 |
|
|---|
| 240 |
# Mirrors the online index into the offline index with
# `rsync -r --delete --verbose`.
#
# rsync is started with an argument list rather than a single command
# string, so no shell is involved and index paths containing spaces or
# shell metacharacters are passed through unchanged.
#
# Returns what Kernel#system returns (true on success, false on a non-zero
# exit status, nil when the command could not be run). A failure is logged.
def sync
  source = "#{Indexer.online_index}/"
  target = Indexer.offline_index.to_s
  @logger.info("syncing from #{source} to #{target}")
  succeeded = system("rsync", "-r", "--delete", "--verbose", source, target)
  @logger.error("rsync from #{source} to #{target} failed (#{$?.inspect})") unless succeeded
  succeeded
end
|---|
| 244 |
end |
|---|
| 245 |
end |
|---|
| 246 |
end |
|---|