Background

上一篇介紹了Redis,本篇繼續介紹 Redsi Queue的基本運用。

在Web的世界裡我們有時候為了讓API有更好的Peformance我們常會採用非同步的方法來減輕Server的負擔。舉例來說比如說使用者呼叫一隻API要求我們寄封email給他們,基本的做法會等到email成功寄出了API才能回應給使用者。但這樣的方式會大幅增加API的response time。如果呼叫此API的使用者暴增的時候通常會導致Server crash。

因此 Redis Queue 就是一個常見用來處理非同步的任務的工具。讓我們繼續看下去...

Installation

$ mkdir redisqueue
$ cd redisqueue
$ virtualenv .venv
$ source .venv/bin/activate
$ pip3 install redis
$ pip3 install rq
$ pip3 install requests
$ touch run.py worker.py func.py

Getting started

run.py

from rq import Worker, Queue, Connection
from func import count_words_at_url
import redis
import sys
import os
import time
import requests

rds = redis.Redis(
    host="127.0.0.1",
    port=6379,
    password="",
)

q = Queue("name", connection=rds)

def main():

    # Delay execution of count_words_at_url('http://nvie.com')
    job = q.enqueue(count_words_at_url, 'http://nvie.com')
    print(job.result)   # => None

    # Now, wait a while, until the worker is finished
    time.sleep(3)
    print(job.result)   # => 337

if __name__ == "__main__":
    main()

func.py

import requests

def count_words_at_url(url):
    resp = requests.get(url)
    return len(resp.text.split())

worker.py

from rq import Worker, Queue, Connection
from run import rds, q

worker = Worker([q], connection=rds, name='name')
print(worker.name)
worker.work()

以上是相關程式碼,填完之後我們需要開兩個 Terminal 視窗來完成整個流程

$ python3 worker.py
name
16:50:22 RQ worker 'rq:worker:name' started, version 1.0
16:50:22 *** Listening on name...

先執行 worker.py 我們可以發現程式正在 Listening on name... (監聽Queue的名稱為Name的Queue)

接著我們執行run.py的程式

$ python3 run.py
None
337

我們可以看到輸出了兩個值 1. None, 2. 337

第一個值為None是因為worker還沒有處理完相對應func的內容,過幾秒後處理完我們則順利拿到 337 這個答案。我們可以看回執行 worker.py 的 Terminal 發現內容如下:

$ python3 worker.py
name
16:50:22 RQ worker 'rq:worker:name' started, version 1.0
16:50:22 *** Listening on name...
16:52:07 name: func.count_words_at_url('http://nvie.com') (07416ce1-8b0f-4c02-a33c-6d7a6727adbc)
16:52:07 name: Job OK (07416ce1-8b0f-4c02-a33c-6d7a6727adbc)
16:52:07 Result is kept for 500 seconds

發現他監聽到有新的任務進Queue並且把它執行完成。那這整個流程就是一個非同步的流程,透過一個在Background執行的程式來處理一些比較耗時的動作,來節省使用者等待的時間,執行完之後再通知使用者即可。

Exception

順利啟動 worker 之後那我們會遇到另外一個問題,如果在 worker 執行的時候發生了問題時該怎麼辦? 這時候我們可以透過 Exception 來幫助我們處理 worker 失敗時候應該要走的流程。

worker.py

from rq import Worker, Queue, Connection
from run import rds, q

def __error_handler(job, exc_type, exc_value, traceback):
    print("run error_handler\n\n\n")
    print(job, exc_type, exc_value, traceback)
    # do something u want to do
    # ...
    # ...

worker = Worker(
    [q],
    connection=rds,
    name="name",
    exception_handlers=[__error_handler],
    disable_default_exception_handler=True,
)
print(worker.name)
worker.work()

在 worker 的程式中我們加入了 __error_handler 來取代原本的 handler. 那為了能測試 error_handler 我們在 func.py 中加入 rasie error 的相關代碼。

func.py

import requests

def count_words_at_url(url):
    resp = requests.get(url)
    raise ValueError('A very specific bad thing happened.')
    return len(resp.text.split())

接著我們一樣開兩個 Terminal 來看一下結果:

$ python3 worker.py
name
17:03:25 RQ worker 'rq:worker:name' started, version 1.0
17:03:25 *** Listening on name...
$ python3 run.py
None
None

會發現原本應該出現 337 的地方一樣是None,表示error有觸發成功。接著我們看回執行 woker.py 的Terminal

$ python3 worker.py
name
17:03:25 RQ worker 'rq:worker:name' started, version 1.0
17:03:25 *** Listening on name...
17:03:25 Cleaning registries for queue: name
17:03:29 name: func.count_words_at_url('http://nvie.com') (d7f8955d-e359-445d-9bc8-6efff924bcd0)
17:03:30 ValueError: A very specific bad thing happened.
Traceback (most recent call last):
  File "/Users/taiker/workspace/redisqueue/.venv/lib/python3.7/site-packages/rq/worker.py", line 812, in perform_job
    rv = job.perform()
  File "/Users/taiker/workspace/redisqueue/.venv/lib/python3.7/site-packages/rq/job.py", line 588, in perform
    self._result = self._execute()
  File "/Users/taiker/workspace/redisqueue/.venv/lib/python3.7/site-packages/rq/job.py", line 594, in _execute
    return self.func(*self.args, **self.kwargs)
  File "/Users/taiker/workspace/redisqueue/func.py", line 5, in count_words_at_url
    raise ValueError('A very specific bad thing happened.')
ValueError: A very specific bad thing happened.
Traceback (most recent call last):
  File "/Users/taiker/workspace/redisqueue/.venv/lib/python3.7/site-packages/rq/worker.py", line 812, in perform_job
    rv = job.perform()
  File "/Users/taiker/workspace/redisqueue/.venv/lib/python3.7/site-packages/rq/job.py", line 588, in perform
    self._result = self._execute()
  File "/Users/taiker/workspace/redisqueue/.venv/lib/python3.7/site-packages/rq/job.py", line 594, in _execute
    return self.func(*self.args, **self.kwargs)
  File "/Users/taiker/workspace/redisqueue/func.py", line 5, in count_words_at_url
    raise ValueError('A very specific bad thing happened.')
ValueError: A very specific bad thing happened.
run error_handler



<Job d7f8955d-e359-445d-9bc8-6efff924bcd0: func.count_words_at_url('http://nvie.com')> <class 'ValueError'> A very specific bad thing happened. <traceback object at 0x1018a22c8>

發現果然有Error log的產生,在最下面的部分也有看有順利執行我們 __error_handler 的程式碼。所以大家可以自己在 __error_handler 程式中自己加入在 worker 執行任務失敗的時候要走的流程。

以上就是簡單Redis Queue的使用方式。

Reference