Главная
Блог разработчиков phpBB
 
+ 17 предустановленных модов
+ SEO-оптимизация форума
+ авторизация через соц. сети
+ защита от спама

О том, как я качал музыку с VK

Anna | 16.06.2014 | нет комментариев

Предыстория

Всякое утро я езжу на работу и это занимает N-ое число времени от 15 минут (на машине) до 40 минут (на общественном транспорте). К сожалению, утром по радио вертят вовсе не музыку, а различные «развлекательные» программы. Дюже длинно я ездил либо с отключенным магнитофоном, либо всю дорогу искал радиостанцию, либо врубал наушники (пока не раздавил свой телефон).

И вот мне это наскучило. Магнитола у меня из недорогих, но может читать с флешек. В один красивый день, по дороге на работу, я взял и приобрел SD-карточку (комфортней каждого потому как не выпирает). Все отлично, но сейчас вопрос стал напротив: «Где взять музыку?». Не длинно думая решил, что мне хватит плейлиста с VK. Каждого-то 400 песен, но их необходимо выкачать.

Посмотрев на решения, которые дозволено обнаружить в интернете, решил написать свое. Сотворил план на django, настроил ее на работу с couchdb и принялся писать.

Поводы

Несколько причин, по которым я решил написать свое, а не применять готовое решение.
— не хотел устанавливать какой-то плагин/програму для скачивания
— качать вручную по одному файлу
— да и вообще хотелось что-то свое

Что я хотел получить

Результат на данный вопрос достаточно примитивен. Наименьший комплект требований: зашел на сайт, что-то нажал, увидел аудиозаписи, нажал кнопку, скачал их на компьютер.

Дальше о том, как это происходило (пытался восстановить настоящий ход событий).

Приобретение аудиозаписей

Для приобретения доступа к аудиозаписям взял за основу VK Api.

Этап №1. Вначале авторизация и приобретение токена. (не буду описывать API VK потому как все это дозволено обнаружить у них на сайте).
Через несколько минут в папке с django-аппликацией был сделан файл vkapi.py и добавлено приблизительно следующее содержимое:

vkapi.py

def authorize():
    payload = {
        'client_id': settings.APP_ID,
        'scope': ','.join(settings.APP_PERMISSIONS),
        # TODO: сменить нах
        'redirect_uri': 'http://127.0.0.1:8000/vkapi/authorize',
        'response_type': 'code',
        'v': settings.APP_API_VERSION
    }
    return 'https://oauth.vk.com/authorize?%s' % urllib.urlencode(payload)

А в файл views.py добавлена вьюха:

views.py

def vk_authorize(request):
    return redirect(authorize())

Выходит мы получили code, тот, что передается параметром на redirect_url. Сейчас нам необходимо получить access_token.

На данном этапе меня тревожил вопрос где его беречь. Первоначально думал сделать регистрацию и вероятность подключения VK только для зарегистрированных пользователей, а access_token писать в документ (документ, потому как couchdb) пользователя. Но что, если я не хочу входить либо регистрироваться… Хватит сессии. Не вижу смысла чего-то большего для своих нужд.

Так как лень застала меня врасплох, я решил не разделять URL для авторизации и приобретения access_token’a и вьюха vk_authorize купила дальнейший, не особенно прекрасный вид:

vk_authorize

def vk_authorize(request):
    # подумать как перенести в мидлварь
    if request.GET.get('code'):
        code = request.GET['code']
        r = access_token_get(code)
        print r.text
        data = r.json()

        if data.get('error'):
            raise Http404("Error: %s. Desc: %s" % (data['error'], data['error_description']))

        data['date'] = datetime.now()
        request.session['vkapi'] = data
        return redirect('main')

    elif request.GET.get('error'):
        error = request.GET['error']
        error_description = request.GET.get('error_description')
        raise Http404("Error: %s. Desc: %s" % (error, error_description))

а в vkapi.py дописана функция для приобретения access_token’a

access_token

def access_token_get(code):
    payload = {
        'client_id': settings.APP_ID,
        'client_secret': settings.APP_SECRET,
        'code': code,
        'redirect_uri': 'http://127.0.0.1:8000/vkapi/authorize',
    }

    return requests.get('https://oauth.vk.com/access_token', params=payload)

Этап №2. У нас теснее есть access_token и пишем его в сессию. Дозволено начать доставать аудиозаписи. Так в файл vkapi.py дописана еще две функции. Одна всеобщая для запросов api вконтакте, а вторая для приобретения аудиозаписей пользователя.

vkapi.py

def request_api(method, access_token, params):
    """
    Для того Дабы вызвать способ API ВКонтакте, Вам нужно осуществить
    POST либо GET запрос по протоколу HTTPS на указанный URL:

https://api.vk.com/method/'''METHOD_NAME'''?'''PARAMETERS'''&access_token='''ACCESS_TOKEN'''

    METHOD_NAME – наименование способа из списка функций API (http://vk.com/dev/methods),
    PARAMETERS – параметры соответствующего способа API,
    ACCESS_TOKEN – ключ доступа, полученный в итоге удачной авторизации приложения.
    """
    payload = {
        'access_token': access_token,
        'v': settings.APP_API_VERSION,
    }
    payload.update(params)
    r = requests.get('https://api.vk.com/method/%s' % method, params=payload)
    return r.json().get('response', {})

def audio_get(session):

    params = {
        'owner_id': session['user_id'],
        'count': 6000,
    }
    return request_api('audio.get', session['access_token'], params)

Файл views.py в свою очередь пополнился еще одной вьюхой:

vk_audios

@render_to("downloader/vk_audios.html")
def vk_audios(request):
    audios = []
    if request.session.get('vkapi'):
        # TODO: мидлварь, которая будет обновлять access_token
        audios = audio_get(request.session['vkapi'])

    return {
        'audios': audios,
    }

Отменно, я получил список всех своих аудиозаписей. Было написано еще немножко кода на приобретение списка альбомов и отображения их песен, а также для поиска аудиозаписей. Дозволено увидеть, что я возвращаю только ‘response’. Так вот, я решил легко не заморачиватся, если запрос ложный :)

К сожалению, оставалось еще это: “# TODO: мидлварь, которая будет обновлять access_token”. Была написана мидлварь access_token.py со дальнейшим содержимым:

access_token

 # *  coding: utf8 *
from datetime import datetime, timedelta
from django.conf import settings
from django.shortcuts import redirect
from downloader.vkapi import access_token_get

class AccessTokenMiddleware(object):

    def process_request(self, request):

        if request.session.get('vkapi'):
            data = request.session['vkapi']
            expired = data['date']  timedelta(seconds=int(data['expires_in']))
            if (expired  datetime.now()).seconds < 3600:
                return redirect('vk_authorize')

        return None

Но здесь, видимо, я протупил и описал process_request взамен process_response и меня непрерывно редиректило на авторизацию. Не длинно думаю мидлварь была переписана в декоратор (решил, что дозволено получать новейший токен за час до того как он станет просроченным, считаю не принципиальным).

Отчего декоратор? Ну здесь это… про process_response подумалось даже на дальнейший день, а переделывать что-то работающее не хотелось.

authorize_require decorator

def authorize_require(view_func):

    def check_session(request, *args, **kwargs):
        if request.session.get('vkapi'):
            data = request.session['vkapi']
            expired = data['date']  timedelta(seconds=int(data['expires_in']))
            if (expired - datetime.now()).seconds < 3600:
                return redirect('vk_authorize')
        else:
            return redirect('vk_authorize')
        return view_func(request, *args, **kwargs)

    return check_session

Сейчас у меня был список аудиозаписей и механическое обновление токена (там, где это необходимо легко сначала фун-ции дописывался декоратор). Еще позднее была добавлена простенькая регистрация (не знаю для чего).

Регистрация (отхождение)

Обыкновенно использую такую регистрацию, когда пишу для себя. Стремительно и дешево, да и хватает с головой.

В urls.py добавляю строку:

url(r'^registration/$', 'downloader.views.registration', name='registration'),

views.py пополняется такой вьюхой:

@render_to("registration/registration.html")
def registration(request):

    form = RegistrationForm()

    if request.method == "POST":
        form = RegistrationForm(data=request.POST)
        if form.is_valid():
            user = User(_db=request.db,
                        is_active=True,
                        is_superuser=False,
                        type='user',
                        permissions=[],
                        groups=[], )
            username = form.cleaned_data['username']
            password = form.cleaned_data['password']
            user.update({"username": username, 'password': make_password(password)})
            user.create('u_%s' % slugify(username))

            auth_user = authenticate(username=username, password=password)
            login(request, auth_user)
            return redirect('main')

    return {
        "form": form
    }

Вьюха создает нового пользователя в CouchDB и сразу же его авторизирует, позже чего кидает на 1 страницу.

Форма RegistrationForm выглядит вот так:
forms.py

class RegistrationForm(forms.Form):

    username = forms.EmailField(label=_('Username'), max_length=30)
    password = forms.CharField(widget=forms.PasswordInput(), label=_('Password'))

    def clean_username(self):
        username = self.cleaned_data['username']
        user = django_couch.db().view('auth/auth', key=username).rows
        if user:
            raise forms.ValidationError(_("Username already in use"))
        return username

registration/registration.html

{% extends "base.html" %}
{% load i18n %}
{% load bootstrap_toolkit %}

{% block title %}
 - {% trans "Registration" %}
{% endblock %}

{% block lib %}
    <link rel="stylesheet" href="/media/css/authentification.css" />
{% endblock %}

{% block body %}
<div>
    <div id="register-form">
        <form action="" method="post">
            <legend>
                <h2>{% trans 'Registration' %}</h2>
            </legend>
            {{ form|as_bootstrap }}{% csrf_token %}
            <div>
                <button type="submit">{% trans 'Register' %}</button>  
                <small>
                    <a href="{% url login %}"> {% trans 'Login' %}</a>
                </small>
            </div>
        </form>
    </div>
</div>
{% endblock %}

Скачивание

Этап №3. Выходит мы теснее получаем список аудиозаписей. Сейчас необходимо их скачать. Безусловно дозволено пройтись каким-то ботом по всякой ссылке и скачать, но мне необходимо было получить на выходе либо папку с аудиозаписями, либо архив (что бы скачать все сразу).

Пагинация (отхождение)

Осенило меня в общем… если у пользователя будет over100500 аудиозаписей, то браузер легко загнется при рендеринге и было решено добавить пагинацию.

Функция audio_get преобразилась приблизительно до такого вида, что дало вероятность сделать пагинацию:

def audio_get(session, album_id='', offset=0, count=100):

    params = {
        'owner_id': session['user_id'],
        'offset': offset,
        'count': count,
        'album_id': album_id,
    }
    return request_api('audio.get', session['access_token'], params)

vk_audios в файле view.py купила приблизительно такой вид:

@authorize_require
@render_to("downloader/vk_audios.html")
def vk_audios(request, album_id=''):  

    try:
        page = int(request.GET.get('page', 1))
    except:
        raise Http404("Error page param: %s" % request.GET['page'])

    offset = 100 * (int(page) - 1)

    response = audio_get(request.session['vkapi'], album_id=album_id, offset=offset)

    audios = response.get('items', [])
    audios_count = response.get('count')

     return {
        'album_id': album_id,
        'audios_count': audios_count,
        'page': page,
        'offset': offset,
        'audios': audios,
     }

Был добавлен inclusion_tag, тот, что принимал число аудиозаписей, страницу на которой находится пользователь и id альбома, что бы рендерить страницы.

@register.inclusion_tag('snippets/pagination.html')
def render_pagination(audios_count, page, album_id=False):

    pages_count = int(math.ceil(audios_count / 100.0))  1

    pages = range(1, pages_count)

    return {
        "pages": pages,
        "page": page,
        "album_id": album_id,
    }

И добавлен html-файл (snippets/pagination.html):

{% load i18n %}

{% if pages|length > 1 %}
<div>
    <ul>
    {% for p in pages %}
        <li {% ifequal p page %}class="active"{% endifequal %}>
            {% if album_id %}
            <a href="{% url vk_audios album_id %}?page={{ p }}">{{ p }}</a>
            {% else %}
            <a href="{% url vk_audios %}?page={{ p }}">{{ p }}</a>
            {% endif %}
        </li>
    {% endfor %}
    </ul>
</div>
{% endif %}

Итого я ограничил себя скачиванием по 100 файлов. Осталось их скачать.

Необходимо скачать файлы… но как? Пользователь нажал кнопку и ожидает, пока ему сервер отдаст архив? Хм… Решать задачу принялся так:
Этап №3.1 — Создание запроса на скачивание. На странице с аудиозаписями вывел форму, в которую необходимо ввести свой email и сделать запрос на скачивание.

form.py Пополнился новой формой.

forms.py

class RequestsForm(forms.Form):

    username = forms.EmailField(label=_('E-mail'),
                                help_text=_("Archive with audios will be send to this email"))

Отчего поле username? Все по причине регистрации на email. Пользователь создается с username = email, указанный при регистрации. Так, если пользователь вошел на сайт, мы можем подставить его email, а он если захочет поменяет.

Сейчас пользователь тыкает в кнопку и мы создаем документ со дальнейшей конструкцией, позже чего кладем его id в nsq:

конструкция документа в couchdb

* _id - r_<hash>
* status - new
* username - test@test.com
* is_active - true
* audios - [
   {
       "url": "<url>",
       "processed": true,
       "title": "Three Days Grace - I Hate Everything About You"
   }
]
* date_created - 2013-10-20 11:27:21.208492
* type - request

Поле status может принимать еще несколько значений: «processing», «error», «processed», «deleted».

Для документа couch’a была добавлена моделька:

models.py

class DownloadRequest(django_couch.Document):

    def __init__(self, *args, **kwargs):
        self._db = django_couch.db('db_requests')
        self.type = 'request'
        self.is_active = True
        self.status = 'new'
        self.date_created = datetime.now().isoformat(' ')
        super(DownloadRequest, self).__init__(*args, **kwargs)

    @staticmethod
    def load(resp_id):
        db = django_couch.db('db_requests')

        if resp_id in db:
            doc = DownloadRequest(db[resp_id])
            assert doc.type == 'request', _("Invalid data loaded")

            return doc

        else:
            raise Http404(_("Can't find download request with id '%s'") % id)

    @staticmethod
    def get_list(email):
        pass

Что бы класть в nsq скопирована с других мест функция:

nsq_push

def nsq_push(topic, message, fmt='json'):
    url = "http://%s/put?topic=%s" % (random.choice(settings.NSQD_HTTP_ADDRESSES), topic)

    if fmt == 'json':
        message_encoded = json.dumps(message)
    elif fmt == 'raw':
        message_encoded = message
    else:
        raise Exception("Unsupported message encode format: %s" % fmt)

    r = requests.post(url, data=message_encoded)

    return r.ok

А вьюха vk_audios купила дальнейший вид:

vk_audios

@authorize_require
@render_to("downloader/vk_audios.html")
def vk_audios(request, album_id=''):

    try:
        page = int(request.GET.get('page', 1))
    except:
        page = 1
        messages.error(request, _("Error page: %s. Changed to 1") % request.GET.get('page'))

    offset = 100 * (int(page) - 1)

    response = audio_get(request.session['vkapi'], album_id=album_id, offset=offset)

    audios = response.get('items', [])
    audios_count = response.get('count')

    if request.user.is_authenticated():
        initial_data = request.user
    else:
        initial_data = {'username': ''}

    form = RequestsForm(request.POST or None, initial=initial_data)

    if form.is_valid():
        request_doc = DownloadRequest()
        request_doc.update(form.cleaned_data)

        formated_audios = []
        for audio in audios:
            formated_data = {
                'title': "%s - %s" % (audio['artist'], audio['title']),
                'url': audio['url'],
                'processed': False,
            }
            formated_audios.append(formated_data)

        request_doc.update({'audios': formated_audios})
        request_doc.create('r', force_random_suffix=True)
        messages.success(request, _("Download request successfully created"))
        nsq_push('download_requests', request_doc.id, fmt="raw")

    return {
        'album_id': album_id,
        'audios_count': audios_count,
        'page': page,
        'offset': offset,
        'audios': audios,
        'form': form,
    }

Сейчас у нас есть список аудиозаписей, мы создаем запрос на скачивание и кладем id документа в nsq. НО, захотелось видеть список своих запросов и их статусы…

Итог запросов (отхождение):

И принялся я писать вьюху для их отображения. Была использована таже форма, что и выше, для отбора по email’у. В CouchDB был сделан дизайн док, тот, что строит индекс с ключем [email, дата создания]:

function(doc) {
    if (doc.type == 'request' && doc.is_active) {
        emit([doc.username, doc.date_created])
    }
}

А в аппликацию добавлена вьюха requests_list:

@render_to("downloader/requests.html")
def requests_list(request):

    requests = []

    if request.user.is_authenticated():
        initial_data = request.user
    else:
        initial_data = {'username': ''}

    form = RequestsForm(request.GET or None, initial=initial_data)

    if form.is_valid():
        requests = DownloadRequest.get_list(form.cleaned_data['username'])

    return {
        'form': form,
        'requests': requests,
    }

И дописана функция get_list в модель DownloadRequest:

@staticmethod
def get_list(email):
    db = django_couch.db('db_requests')

    requests = db.view('requests/list', include_docs=True, startkey=[email], endkey=[email, {}]).rows
    return [DownloadRequest(request) for request in requests]

Хэх! Сейчас я еще вижу и ранги, осталось написать nsq-обработчик, тот, что собственно будет скачивать…

Этап №3.2 — Скачивание. Через некоторое время возникли очерки обработчика nsq:

management/commands/download_request_worker.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import nsq
import signal
import requests
import django_couch

from django.conf import settings
from django.core.management.base import BaseCommand
from logger import logger

from downloader.models import DownloadRequest

class Command(BaseCommand):

    def handle(self, *args, **options):

        self.log = logger('download_request', int(options['verbosity']) > 1, settings.LOG_DIR)

        signal.signal(signal.SIGINT, self.signal_callback)
        signal.signal(signal.SIGTERM, self.signal_callback)

        self.db = django_couch.db('db_requests')

        nsq.Reader({"message_callback": self.message_callback},
                   "download_requests",
                   "download_requests",
                   nsqd_tcp_addresses=settings.NSQD_TCP_ADDRESSES)
        self.log.debug("Starting NSQ...")
        nsq.run()

    def process_request(self, request):
        self.log.debug("Setting status '%s' to 'processing.'" % request.status)

        request.status = 'processing'
        request.save()

        user_path = os.path.join(settings.DOWNLOAD_AUDIOS_DIR, request.username)
        if not os.path.exists(user_path):
            os.mkdir(user_path)

        self.log.debug("User dir: %s" % user_path)

        request_path = os.path.join(user_path, request.id)
        if not os.path.exists(request_path):
            os.mkdir(request_path)

        self.log.debug("Download request dir: %s" % request_path)

        for audio in request.audios:
            self.log.debug("Title: %s. Url: %s", audio['title'], audio['url'])
            if audio.get('processed', False):
                self.log.debug("Already processed")
                continue

            self.log.debug("Downloading file..")
            response = requests.get(audio['url'])
            self.log.debug("Downloaded")

            filename = os.path.join(request_path, "%s.mp3" % audio['title'])
            self.log.debug("Writing to filename: %s" % filename)

            with open(filename, 'wb') as f:
                f.write(response.content)
            self.log.debug("Setting audio to processed")
            audio['processed'] = True

        request.save()

    def message_callback(self, message):
        self.log.debug("Processing message id: %s", message.id)

        self.log.debug("Message data: %s", message.body)

        try:
            request = DownloadRequest.load(message.body)
            self.log.info("Document loaded. Audios count: %s" % len(request.audios))
            self.process_request(request)
            self.log.debug("Setting status '%s' to 'processed.'" % request.status)
            request.status = 'processed'
            request.save()
            self.log.debug("Request successfullly processed.")
        except:
            return False
        return True

    def signal_callback(self, signal_number, stack_frame):
        self.log.critical("Signal %d received. Shutting down", signal_number)

        sys.exit(-1)

Выходит что он умел:
— получал id документа с очереди
— создавал папку, если ее не было
— скачивал туда аудиофайлы

Дальше было дописано еще архивирование и отправка email’a пользователю. Формат сообщения, которое кладется в nsq немножко изменился, потому как для того, что бы возвести url на скачивание, необходимо знать host, для этого в django есть функция request.get_host(), но нет доступа к request’у внутри менеджмент команды (допустимо кто знает что дозволено сделать в этом случае), из-за чего я решил класть в nsq меседж дальнейшего вида: {‘host’: request.get_host(), ‘id’: <id документ запроса на скачивание>}.

Но все еще это были очерки. Повод тому — nsq. У nsq есть несколько ограничений:
— всяких N секунд он шлет heartbeat подключенным воркерам и если не получает результат 2 раза, коннект закрывается. Т.е. если наш обработчик будет скачивать много файлов, коннект будет закрыть.
— если nsq не получает, что сообщение обработано в течении N секунд, сообщение отдается иному обработчику. Т.е. если я запущу 2 обработчика, то скачивание начнется как минимум 2 раза.

Немножко посмотрев в pynsq решил применять async mode и также обрабатывать скачивание в отдельных процессах. Допустимо не самое отличное решение и не самый прекрасный код у меня получился.

Функция обработки nsq-сообщений купила дальнейший вид:

management/commands/download_request_worker.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import nsq
import json
import time
import shutil
import signal
import requests
import django_couch
import multiprocessing

from logger import logger
from datetime import datetime

from django.conf import settings
from django.core.mail import send_mail
from django.core.urlresolvers import reverse
from django.template.loader import render_to_string
from django.utils.translation import ugettext_lazy as _
from django.core.management.base import BaseCommand

from downloader.models import DownloadRequest
from downloader.views import nsq_push

class DownloadRequestProcess(multiprocessing.Process):

    def __init__(self, log, message, *args, **kwargs):
        self.log = log
        self.message = message
        super(DownloadRequestProcess, self).__init__(*args, **kwargs)

    def process_request(self, drequest):
        self.log.debug("Setting status '%s' to 'processing'. For doc: %s" % (drequest.status, drequest.id))

        drequest.status = 'processing'
        drequest.save()

        request_path = os.path.join(settings.DOWNLOAD_AUDIOS_DIR, drequest.id)
        if not os.path.exists(request_path):
            os.mkdir(request_path)

        self.log.debug("Download request dir: %s" % request_path)

        for audio in drequest.audios:
            self.log.debug("Title: %s. Url: %s", audio['title'], audio['url'])
            if audio.get('processed', False):
                self.log.debug("Audio already processed")
                continue

            filename = os.path.join(request_path, "%s.mp3" % audio['title'])
            self.log.debug("Filename: %s" % filename)

            self.log.debug("Downloading file...")
            response = requests.get(audio['url'])
            self.log.debug("Downloaded")

            with open(filename, 'wb') as f:
                f.write(response.content)
            self.log.debug("Setting audio to processed")
            audio['processed'] = True

        archive_path = None
        if drequest.get('archive'):
            archive_path = os.path.exists(os.path.join(request_path, drequest.archive))

        if not drequest.get('archive') and not archive_path:
            self.log.debug("Writing archive")
            archive = shutil.make_archive(request_path, 'gztar', settings.DOWNLOAD_AUDIOS_DIR, drequest.id)
            self.log.debug("Archive: %s" % archive)
            drequest['archive'] = os.path.basename(archive)

        self.log.debug("Deleting download path dir")
        shutil.rmtree(request_path)

        self.log.debug("Setting status '%s' to 'processed.'" % drequest.status)
        drequest['date_processed'] = datetime.now().isoformat(' ')
        drequest.status = 'processed'
        drequest.save()

    def run(self):
        self.log.debug("Message data: %s", self.message.body)

        data = json.loads(self.message.body)

        attempts = data.get('attempts', 1)
        if (attempts > 5):
            self.log.debug("Attempts limit reached, dropping this request")
            return

        drequest = DownloadRequest.load(data['id'])
        self.log.info("Document loaded. Audios count: %s" % len(drequest.audios))

        if drequest.get('processed'):
            self.log.debug("Download request already processed")
            return

        try:
            self.process_request(drequest)
            self.log.debug("Download request successfullly processed. Sending mail.")

            if drequest.get('archive'):
                archive_link = 'http://%s%s' % (data['host'], reverse('archive_link', args=[drequest.archive]))
                self.log.debug("Link to archive: %s" % archive_link)
                send_mail(_("Take you archive"),
                          render_to_string("mail/archive_mail.html", {'archive_link': archive_link}),
                          settings.SERVER_EMAIL,
                          [drequest.username])
                self.log.debug("Mail sent")
        except:
            self.log.debug("Error occured: %s. Setting status to error" % sys.exc_info()[1])
            drequest.status = 'error'
            drequest.status_verbose = "%s" % sys.exc_info()[1]
            drequest.save()

            sleep_time = 30 * attempts
            self.log.debug("Pushing it back to nsq in %s seconds. Topic: download_requests" % sleep_time)
            time.sleep(sleep_time)
            nsq_push('download_requests', {"host": data['host'], 'id': drequest.id, 'attempts': attempts   1})

class Command(BaseCommand):

    def handle(self, *args, **options):

        self.log = logger('download_request', int(options['verbosity']) > 1, settings.LOG_DIR)

        signal.signal(signal.SIGINT, self.signal_callback)
        signal.signal(signal.SIGTERM, self.signal_callback)

        self.db = django_couch.db('db_requests')

        nsq.Reader({"message_callback": self.message_callback},
                   "download_requests",
                   "download_requests",
                   nsqd_tcp_addresses=settings.NSQD_TCP_ADDRESSES)
        self.log.debug("Starting NSQ...")
        self.processes = []
        nsq.run()

    def message_callback(self, message):
        self.log.debug("Processing message id: %s", message.id)

        message.enable_async()

        process = DownloadRequestProcess(self.log, message)
        process.start()

        self.log.debug("Process: %s", process)
        message.finish()

        self.processes.append(process)

    def signal_callback(self, signal_number, stack_frame):
        self.log.critical("Signal %d received. Shutting down", signal_number)

        for process in self.processes:
            if process.is_alive():
                process.join()
                process.terminate()

        sys.exit(-1)

Выходит что делает данный обработчик:
— ловит сообщение с nsq
— создает новейший процесс
— подмечает nsq-сообщение как обработанное
— в отдельном процессе скачиваются аудиозаписи и создается архив
— отсылается emal
— в случае ошибки при обработке было решено класть это сообщение в nsq вторично и при этом вести свой личный счетчик неудачных обработок. За 6 разом не обрабатывать (допустимо есть другие пути — не искал, хватило этого).

Формат сообщение в nsq купил дальнейший вид: {‘host’: , ‘id’: <id запроса на скачивание>, ‘attempts’: <№ попытки>}). Сообщение клалось позже маленький задержки. Вычислял ее по формуле 30 сек умноженных на № попытки.

Конец

Я удачно скачал все свои аудиозаписи. Посмотрев, что архив в среднем весит 650Мб решил, что их необходимо удалять через некоторое время. Была написана еще одна менеджмент команда, которая достает все удачно обработанные запросы и удаляет архив, а также меняет ранг на «deleted». Тоже не самое изысканное решение, но хотелось стремительней завершить :)

management/commands/remove_in_24.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import django_couch
from datetimeimport datetime, timedelta

from logger import logger

from django.conf import settings
from django.core.management.base import BaseCommand

class Command(BaseCommand):

    help = u'prepare and send fax document'

    def handle(self, *args, **options):

        self.log = logger('delete_in_24', int(options['verbosity']) > 1, settings.LOG_DIR)
        self.db = django_couch.db("db_requests")

        requests = self.db.view("request_processed/list", include_docs=True).rows
        self.log.debug("Founded %s processed download requests", len(requests))
        for req in requests:
            self.log.debug("%s", req.value)
            self.log.debug("ID: %s. Archive: %s", req.id, req.value)

            now = datetime.now()
            date_expired = datetime.strptime(req.key, settings.DATETIME_FMT)   timedelta(hours=24)
            self.log.debug("Now: %s. Expired: %s", now.strftime(settings.DATETIME_FMT), date_expired.strftime(settings.DATETIME_FMT))

            if now < date_expired:
                self.log.debug("Passing this doc")
                continue

            archive_path = os.path.join(settings.DOWNLOAD_AUDIOS_DIR, req.value)
            self.log.debug("Archive path: %s", archive_path)

            if os.path.exists(archive_path):
                self.log.debug("Deleting file: %s", archive_path)
                os.unlink(archive_path)
            else:
                self.log.warning("Path doesn't exists")

            doc = req.doc
            self.log.debug("Settings status '%s' to 'deleted'", doc.status)
            doc.status = 'deleted'
            doc.save(self.db)

Couchdb дизайн док request_processed/list, тот, что достает все обработанные аудиозаписи

request_processed/list

function(doc) {
    if (doc.type == 'request' && doc.archive
    && doc.date_processed && doc.status != 'deleted') {
        emit(doc.date_processed.slice(0, 19), doc.archive);
    }
}

Ссылка на bitbucket: bitbucket.org/Kolyanu4/vkdownloader/src

 

Источник: programmingmaster.ru

Оставить комментарий
Форум phpBB, русская поддержка форума phpBB
Рейтинг@Mail.ru 2008 - 2017 © BB3x.ru - русская поддержка форума phpBB