티스토리 뷰
지난 2분기에는 "물류센터 작업자의 수고로움을 덜 수 있도록 한번의 처리 요청으로 전체 대상을 처리할 수 있도록 해달라" 라는 요구사항과 관련된 태스크를 많이 처리하게 되었다.
개중에는 전체 처리 시간이 너무 오래걸려 request - response flow 안에서 동기적으로 처리하기 무리인 것들이 있었는데
이런 경우는 클라이언트에 응답은 바로 주고, 작업은 job으로 비동기 실행하도록 했다.
그런데 이때 작업자에게 노티할 수 있도록 슬랙으로 시작, 완료 알람도 보내야하고
실행하는 동안 몇건 성공했고, 몇건을 어떤 이유로 실패했는지도 전달하고 싶었다. (기획자분이 그렇게까지 요구하진 않았지만...)
어떻게 복잡하지 않게, 그렇지만 정확하게 여러 job들을 하나의 단위로 묶고, 그와 관련된 메타 데이터들을 저장할 수 있을까?
그런데 sidekiq document를 보다가 유료인 pro 버전에서는 batch 기능을 제공한다는 것을 알게되었다.
쓰고 싶긴 한데.. 유료이고 흠..
근데, 또 이 batch 기능만 (pro 버전과 거의 동일하게) open source gem으로 제공되고 있다는 것이 아닌가!! sidekiq-batch
적용이 잘 될지 말지 긴가민가한 마음으로 들여봤는데, 몇몇 시행착오가 있긴 했지만 아직까지는 원하는대로 잘 동작해주고 있는 것 같다 (제발! 🙏)
그래서 아직 기억이 생생할 때 정리해 두려고 한다!
코드 참고 하기 : sidekiq-batch-example
1. 기본 골자
sidekiq 위키에서 소개하고 있는 기본 사용법은 다음과 같다.
batch = Sidekiq::Batch.new
batch.description = "Batch description (this is optional)"
batch.on(:success, MyCallback, :to => user.email)
batch.jobs do
rows.each { |row| RowWorker.perform_async(row) }
end
puts "Just started Batch #{batch.bid}"
# callback
class MyCallback
def on_complete(status, options)
puts "Uh oh, batch has failures" if status.failures != 0
end
def on_success(status, options)
puts "batch succeeded!. send email to #{options['to']}"
end
end
* The jobs method is atomic. All jobs created in the block are actually pushed atomically at the end of the block. If an error is raised, none of the jobs will go to Redis
batch 인스턴스를 생성하고 batch.jobs
블럭 안에서 job을 생성하면 된다.
( 문서에 따르면 이 job들은 redis에 atomic하게 들어간다고 한다..)
그리고 callback 함수가 (이름은 on_complete, on_success 여야한다) 정의되어 있는 클래스를 인자로 넘겨주서 등록한다.
on_success
: batch 안의 모든 job들이 성공했을 때
on_complete
: 하나라도 실패가 발생했을 때
그럼 status에 넘어온 각종 값들로 콜백 처리를 하면 된다.
total (전체 실행한 job 개수), failures (실패한 job개수) 등등..
그런데 실제 개발하면서 테스트 해보니, 이 count들이 싱크가 잘 맞지 않기도 하고
나는 추가로 에러 메세지나 실패한 job argument 같은 것들도 저장하고 싶은데, 기본 gem에서는 제공해주지 않았다.
어떻게 하면 좋을까?
2. 미들웨어
그래서 sidekiq-batch 코드를 까보다가 괜찮은 해결책을 발견했다.
이 gem에서는 콜백으로 넘겨주는 status의 각종 값들이 어떻게 처리되어 넘어오는고 하니
job 처리시 거치게되는 미들웨어에서 'bid'라는 키가 존재하면 batch에 속한 job이란걸 파악하고
에러 발생이 없었다면 process_successful_job이나 rescue 구문에서 process_failed_job 을 호출하고 있었다.
그리고 저 메소드들에서 레디스에 (키 이름에 해당 bid가 들어가게 지정하고) 성공 count, 실패 count 등을 하나씩 업데이트 치고 있는 것이었다.
그래서 나도 내가 원하는 값들을 (에러 메세지, 실패한 job argument 등..) 추가로 저장할 수 있도록 sidekiq 미들웨어 chain에
내가 작성한 미들웨어를 하나 더 추가했다.
class SidekiqBatchServerMiddleware
def call(_worker, msg, _queue)
if (bid = msg['bid']) && (args = msg['args'])
begin
yield
Sidekiq.redis {|r| r.hincrby("BID-#{bid}", "success_count", 1)}
rescue => exception
body = { args: args, err: exception.message }.to_json
Sidekiq.redis {|r| r.sadd("BID-#{bid}-failed-arguments", body)}
raise
end
else
yield
end
end
end
Sidekiq.configure_server do |config|
config.server_middleware do |chain|
chain.add SidekiqBatchServerMiddleware
end
end
이렇게 저장해둔 값들은 callback 함수에서 레디스에서 직접 꺼내서 사용하도록 했다.. (더 견고하고 아름다운 방식이 있겠지만.. 일단은 이게 최선이었다.)
class Callback
def on_complete(status, options)
return if status.failures == 0
bid = options.fetch('bid')
job_class = options.fetch('job_class')
description, total_count, success_count, failed_arguments = Sidekiq.redis do |r|
r.multi do |pipeline|
pipeline.hget("BID-#{bid}", 'description')
pipeline.hget("BID-#{bid}", 'size')
pipeline.hget("BID-#{bid}", 'success_count')
pipeline.smembers("BID-#{bid}-failed-arguments")
pipeline.del("BID-#{bid}-failed-arguments")
end
end
success_count ||= 0
failed_arguments = failed_arguments.map {|x| JSON.parse(x) }
job_class.constantize.send_end_message(description, total_count, failed_arguments)
end
def on_success(status, options)
bid = options.fetch('bid')
job_class = options.fetch('job_class')
description, total_count, success_count = Sidekiq.redis do |r|
r.multi do |pipeline|
pipeline.hget("BID-#{bid}", 'description')
pipeline.hget("BID-#{bid}", 'size')
pipeline.hget("BID-#{bid}", 'success_count')
end
end
job_class.constantize.send_end_message(description, total_count)
end
end
콜백 함수에서 description, total 개수, 성공개수, 실패 인자 등을 가져와서 슬랙 알림 메소드에 파라미터로 넘겨주도록 구성했다.
여기서 맨처음에는 total_count = 성공 개수 + 실패 개수로 계산한 뒤 알림을 보내는 방식이었는데,
밤에 자려고 누웠더니 1930건의 알림톡 발송을 시작합니다. => 2194건의 알림톡 발송을 완료했습니다.
라고 개수 싱크가 한번 안 맞았던 적이 있어서 좀 당황했는데.. 🥲
total_count는 job push 할때 저장한 값 그대로 사용하고
성공 개수 = total - 실패 개수로 방식을 바꾼후로는 그런 문제가 다시 발생하지 않고 있다. (다행이다...)
그때 원인이 뭐였는지 모르겠는데.. 사이드킥을 점점 딥하게 사용할수록 생각지 못했던 문제들이 간혹 발생해서 어려운 것 같다.
3. push_bulk
이 다음의 고민거리는 이거였다.
예를 들어 한번에 타겟이 되는 레코드가 극단적으로 10,000,000 정도가 된다고 쳤을 때
위의 기본 예시처럼 batch.jobs 블럭 안에서 job을 생성하려면 10,000,000 건의 데이터를 모두 한번에 서버 메모리로 가져와서 처리하고 job을 생성해야 한다.
이건 서버 oom 발생 가능성도 있고.. 좋지 않은 것 같다.
적당한 batch_size로 끊어서 가져오고, 레디스에 job push 도 효율적으로 하려면 어떻게 하는게 좋을까?
class BatchJob
include Sidekiq::Worker
include ConcurrentExecutionHelper
def initialize(job_class, description="", queue="default")
@job_class = job_class
@queue = queue
@batch = Sidekiq::Batch.new
@bid = @batch.bid
@batch.description = description
@batch.on(:complete, BatchJob::Callback, bid: @bid, job_class: job_class)
@batch.on(:success, BatchJob::Callback, bid: @bid, job_class: job_class)
end
def push_jobs(enumerable, batch_size)
# argument 계산을 위한 block 필요
raise 'job argument block needed' unless block_given?
return unless enumerable&.any? == true
total_count = 0
chunks = chunk_fetch(enumerable, batch_size)
@batch.jobs do
chunks.each do |items|
arguments = yield items
Sidekiq::Client.push_bulk('class' => @job_class, 'args' => arguments, 'queue' => @queue)
total_count += items.count
end
end
Sidekiq.redis {|r| r.hset("BID-#{@bid}", "size", total_count)}
@job_class.constantize.send_start_message(@batch.description, total_count)
end
end
1) 끊어서 가져오기
이전에 재사용하려고 concurrent 실행과 관련된 모듈을 작성해둔적이 있는데
그 chunk_fetch 함수를 사용해서 batch_size 만큼 끊어서 가져와서 job_arguments를 계산한다.
def chunk_fetch(enumerable, batch_size)
chunks =
if enumerable.respond_to? :find_in_batches
enumerable.find_in_batches(batch_size: batch_size)
else
enumerable.each_slice(batch_size)
end
chunks
end
active record에 find_in_batches라는 엄청 편리한 메소드가 있는데
예를들여 batch_size = 30으로 지정했다면 다음과 같은 쿼리로 변환된다
2) Sidekiq::Client.push_bulk
메소드로 계산한 job_arguments 들 한번에 레디스에 집어넣기.
perform_async로 하나씩 집어넣을 때와 비교해 redis로의 round trip 비용을 획기적으로 줄일 수 있다.
예를들어 job이 1000건이라면 1000번 -> 1번으로.
4. 범용 사용을 위한 batch_job 클래스
이로써 이번에 sidekiq-batch gem을 어떻게 커스텀해서 사용했는지는 다 소개한 것 같다.
batch_job 클래스를 만들어둬서 어떤 종류의 job이든 범용적으로 사용할 수 있게 했는데
( 시작 슬랙 전송 -> job 묶음으로 실행 -> 완료 슬랙 전송 )
현재로서는 알림톡과 정산서 발행등에 적용해둔 상태다.
batch_job 클래스는 다음과 같이 사용하면 된다.
1) batch 실행을 원하는 job 클래스에 슬랙 메세지 관련 메소드 정의해두기
send_start_message(description, total_count)
send_end_message(description, success_count, failed_arguments)
2) batch_job 호출 코드 예시
# @param [String] Job클래스 이름
# @param [String] description (batch에 대한 설명. 슬랙 메세지에 사용될 것입니다)
# @param [String] sidekiq 큐 이름
batch_job = BatchJob.new('CreateBillJob', @month, 'create_bill')
# 1000개씩 쪼개서 job_arguments를 계산한뒤 레디스에 한꺼번에 push
batch_job.push_jobs(User, 1000) do |users|
# job_arguments
users.map { |user| [user.id, @month] }
end
끝 ~
'시리즈 > Ruby' 카테고리의 다른 글
omniauth로 rails app에 oauth2 인증 추가하기 (0) | 2023.03.22 |
---|---|
Rails 스케줄링 방식 비교 (0) | 2022.08.05 |
루비 block, lambda, proc (0) | 2022.01.23 |
사이드킥 (0) | 2021.11.12 |
Active Record 연관 데이터 로드 (0) | 2021.10.12 |