One of the benefits of Sidekiq is that it allows your application to work on a single task using simultaneous worker processes, but that brings a set of additional problems:
- Ensuring separate workers do not save the same record at the same time, causing data duplication
- Ensuring separate workers do not work on the same data
Sidekiq has proved useful in the web scraping code for my Northern Ireland property web site at http://www.propertytrackerni.co.uk
Obviously neither of these issues is something you would want happening in a working application. The issue of data duplication can be resolved easily by adding unique database constraints so that the same data cannot be inserted into your database table twice.
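As a sketch of such a constraint, the migration below adds a unique index. The `properties` table and its column names are assumptions for illustration, not taken from the actual application:

```ruby
# Hypothetical migration: table and column names are assumed, not real.
# A unique index makes the database itself reject a second insert of the
# same scraped result, so concurrent workers cannot duplicate data.
class AddUniqueIndexToProperties < ActiveRecord::Migration[6.0]
  def change
    add_index :properties, [:address, :source_url], unique: true
  end
end
```

With an index like this in place, a duplicate insert raises ActiveRecord::RecordNotUnique, which a worker can rescue and skip rather than saving the row again.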
The second issue can be resolved by marking off data that has already been processed, which I achieved by adding the line below to the code:
if pdate.nil? || (pdate.year <= Time.now.year && pdate.mon != Time.now.month)
pdate contains the date the search area record was last processed, and as this task runs monthly, search areas that have already been processed this month are passed over. This does mean that later workers will have to iterate over search params that have already been processed, but as the number of search areas is measured in the thousands (Northern Ireland is quite small) this is not much overhead. A WHERE clause in the ActiveRecord statement could be used on top of the above condition, e.g.
SearchParams.where("searchdate IS NULL OR (EXTRACT(YEAR FROM searchdate) <= EXTRACT(YEAR FROM now()) AND EXTRACT(MONTH FROM searchdate) <= EXTRACT(MONTH FROM now()))").find_each(start: isize, batch_size: batchsize)
However, as find_each retrieves data in batches, another worker could already have retrieved and processed one of your batches, so the if condition is needed as well as the database constraint. The constraint is needed because more than one worker could grab the same row before the database update has been made.
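Sidekiq distributes the real work across processes, but the claim-once idea behind that guard can be sketched in plain Ruby: several simulated workers atomically pop search areas from a shared queue, so no area is ever processed twice. This is an illustration of the principle only, not the application's actual code (in the database the atomic claim would be an UPDATE guarded by a WHERE on searchdate):

```ruby
# Simulate several workers draining a shared pool of search areas.
# Queue#pop is atomic, so each area is claimed by exactly one worker.
AREAS = (1..100).to_a

queue = Queue.new
AREAS.each { |id| queue << id }

processed = Queue.new # thread-safe collector of "processed" areas

workers = 4.times.map do
  Thread.new do
    loop do
      id = queue.pop(true) rescue break # non-blocking pop; empty => done
      processed << id                   # "process" the claimed area
    end
  end
end
workers.each(&:join)

results = []
results << processed.pop until processed.empty?
puts results.size      # => 100
puts results.uniq.size # => 100 (no area processed twice)
```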
I would be interested in hearing about better methods for achieving this with Sidekiq and Ruby. For example, can I specify the number of workers dynamically and then calculate how the records to be processed are split among them? This does not seem possible in sidekiq.yml; I know I can specify the maximum number of workers, but that is it:
:verbose: false
:pidfile: ./tmp/pids/sidekiq.pid
:logfile: ./log/sidekiq.log
:concurrency: 25
:queues:
- default
- queue_analysis
- queue_scraper
:limits:
queue_analysis: 1
queue_scraper: 15
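One workaround I can think of (my own sketch, not something sidekiq.yml supports) is to split the ID space yourself and enqueue one job per range, letting Sidekiq's worker threads share the ranges. The `id_ranges` helper and the enqueue snippet are hypothetical:

```ruby
# Sketch: rather than one job iterating everything, split the ID space
# into roughly equal ranges and enqueue one job per range.
def id_ranges(min_id, max_id, workers)
  span = max_id - min_id + 1
  per  = (span.to_f / workers).ceil
  (0...workers).map do |i|
    lo = min_id + i * per
    hi = [lo + per - 1, max_id].min
    (lo..hi)
  end.reject { |r| r.first > max_id } # drop empty trailing ranges
end

p id_ranges(1, 10, 3) # => [1..4, 5..8, 9..10]

# Each range could then be enqueued as its own job, e.g.:
#   id_ranges(SearchParams.minimum(:id), SearchParams.maximum(:id), 8).each do |r|
#     ParseResultsWorker.perform_async(r.first, r.last)
#   end
```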
The worker code is below:
class ParseResultsWorker
  include Sidekiq::Worker
  sidekiq_options queue: :queue_scraper

  def perform(*args)
    isize = 1
    batchsize = 10000
    stime = Time.now
    vst = stime.strftime("%H:%M:%S")
    # Reuse today's status row if one exists, otherwise create it
    tstat = Transstatus.find_by(name: 'ParseResultsWorker', created_at: Time.now.utc.to_date)
    tstat ||= Transstatus.create(name: 'ParseResultsWorker')
    Rails.logger.debug ' ParseResultsWorker start job ' + vst
    SearchParams.find_each(start: isize, batch_size: batchsize) do |params|
      pdate = params['searchdate']
      # Skip search areas that have already been processed this month
      if pdate.nil? || (pdate.year <= Time.now.year && pdate.mon != Time.now.month)
        tstat.update(currentparam: params.searchparam)
        @pnewscrawl = PropertyNewsCrawler.new('http://www.propertynews.com', params.searchparam)
        @pnewscrawl.findresult
      end
    end
    etime = Time.now
    vet = etime.strftime("%H:%M:%S")
    PropertySite.lastscanned
    Rails.logger.debug ' ParseResultsWorker end job ' + vet
  rescue StandardError => e
    Rails.logger.debug 'Error running ParseResultsWorker.perform ' + e.message
    false
  end
end