Source code for django_celery_beat.models

"""Database models."""
from __future__ import absolute_import, unicode_literals

from datetime import timedelta

from django.core.exceptions import MultipleObjectsReturned, ValidationError
from django.db import models
from django.db.models import signals
from django.utils.translation import ugettext_lazy as _

from celery import schedules
from celery.five import python_2_unicode_compatible

from . import managers
from .utils import now, make_aware

# Interval unit names. ``IntervalSchedule.schedule`` passes the stored unit
# straight to ``timedelta`` as a keyword argument name, so these values must
# stay valid ``timedelta`` keywords.
DAYS = 'days'
HOURS = 'hours'
MINUTES = 'minutes'
SECONDS = 'seconds'
MICROSECONDS = 'microseconds'

# ``choices`` for ``IntervalSchedule.period``: (stored value, lazily
# translated label).  The labels are kept as string literals inside ``_()``.
PERIOD_CHOICES = (
    (DAYS, _('Days')),
    (HOURS, _('Hours')),
    (MINUTES, _('Minutes')),
    (SECONDS, _('Seconds')),
    (MICROSECONDS, _('Microseconds')),
)

# ``choices`` for ``SolarSchedule.event``: one entry per event name in
# ``schedules.solar._all_events`` (a private celery attribute), sorted
# alphabetically.  The event name is both the stored value and the label
# handed to ``_()``.
SOLAR_SCHEDULES = [(x, _(x)) for x in sorted(schedules.solar._all_events)]


def cronexp(field):
    """Return the compact textual form of one cron expression field.

    :param field: Raw field value; it is converted with ``str()``.
    :returns: ``str(field)`` with every space removed, or ``'*'`` when
        ``field`` is falsy or nothing is left once the spaces are gone.
    """
    if not field:
        return '*'
    compact = str(field).replace(' ', '')
    # A value made only of spaces collapses to '' and is treated as unset.
    return compact if compact else '*'
@python_2_unicode_compatible
class SolarSchedule(models.Model):
    """Schedule following astronomical patterns.

    Each row stores a solar event name together with the latitude and
    longitude it applies to; :attr:`schedule` converts the row into a
    ``celery.schedules.solar`` instance.
    """

    event = models.CharField(
        _('event'), max_length=24, choices=SOLAR_SCHEDULES
    )
    latitude = models.DecimalField(
        _('latitude'), max_digits=9, decimal_places=6
    )
    longitude = models.DecimalField(
        _('longitude'), max_digits=9, decimal_places=6
    )

    class Meta:
        """Table information."""

        verbose_name = _('solar event')
        verbose_name_plural = _('solar events')
        ordering = ('event', 'latitude', 'longitude')
        unique_together = ('event', 'latitude', 'longitude')

    @property
    def schedule(self):
        """Return the ``celery.schedules.solar`` described by this row."""
        return schedules.solar(self.event,
                               self.latitude,
                               self.longitude,
                               nowfun=lambda: make_aware(now()))

    @classmethod
    def from_schedule(cls, schedule):
        """Return the model instance that corresponds to ``schedule``.

        :param schedule: Object exposing ``event``, ``lat`` and ``lon``
            attributes.
        :returns: The stored row with the same event and coordinates, or a
            new *unsaved* instance when no such row exists.
        """
        spec = {'event': schedule.event,
                'latitude': schedule.lat,
                'longitude': schedule.lon}
        # Reuse the oldest matching row even if duplicates exist.  Deleting
        # duplicates here would also delete every PeriodicTask that points
        # at them, because PeriodicTask.solar is declared on_delete=CASCADE.
        existing = cls.objects.filter(**spec).order_by('pk').first()
        if existing is not None:
            return existing
        return cls(**spec)

    def __str__(self):
        return '{0} ({1}, {2})'.format(
            self.get_event_display(),
            self.latitude,
            self.longitude
        )
@python_2_unicode_compatible
class IntervalSchedule(models.Model):
    """Schedule executing every ``every`` units of ``period``.

    ``period`` holds one of the ``PERIOD_CHOICES`` unit names and ``every``
    the number of those units between runs.
    """

    # Re-export the module-level unit names on the class.
    DAYS = DAYS
    HOURS = HOURS
    MINUTES = MINUTES
    SECONDS = SECONDS
    MICROSECONDS = MICROSECONDS

    PERIOD_CHOICES = PERIOD_CHOICES

    every = models.IntegerField(_('every'), null=False)
    period = models.CharField(
        _('period'), max_length=24, choices=PERIOD_CHOICES,
    )

    class Meta:
        """Table information."""

        verbose_name = _('interval')
        verbose_name_plural = _('intervals')
        ordering = ['period', 'every']

    @property
    def schedule(self):
        """Return the ``celery.schedules.schedule`` described by this row."""
        return schedules.schedule(
            # ``period`` is used verbatim as the timedelta keyword name.
            timedelta(**{self.period: self.every}),
            nowfun=lambda: make_aware(now())
        )

    @classmethod
    def from_schedule(cls, schedule, period=SECONDS):
        """Return the model instance that corresponds to ``schedule``.

        :param schedule: Object whose ``run_every`` attribute provides
            ``total_seconds()``.
        :param period: Unit name stored on the returned instance.
        :returns: The stored row with the same ``every``/``period``, or a
            new *unsaved* instance when no such row exists.
        """
        # NOTE(review): ``every`` is always a number of seconds (clamped to
        # be non-negative), whatever ``period`` the caller passes -- confirm
        # that callers only rely on the default ``period``.
        every = max(schedule.run_every.total_seconds(), 0)
        # Reuse the oldest matching row even if duplicates exist.  Deleting
        # duplicates here would also delete every PeriodicTask that points
        # at them, because PeriodicTask.interval is declared
        # on_delete=CASCADE.
        existing = cls.objects.filter(
            every=every, period=period,
        ).order_by('pk').first()
        if existing is not None:
            return existing
        return cls(every=every, period=period)

    def __str__(self):
        if self.every == 1:
            return _('every {0.period_singular}').format(self)
        return _('every {0.every} {0.period}').format(self)

    @property
    def period_singular(self):
        """Return ``period`` without its last character."""
        return self.period[:-1]
@python_2_unicode_compatible
class CrontabSchedule(models.Model):
    """Crontab-like schedule."""

    #
    # The worst case scenario for day of month is a list of all 31 day numbers
    # '[1, 2, ..., 31]' which has a length of 115. Likewise, minute can be
    # 0..59 and hour can be 0..23. Ensure we can accommodate these by allowing
    # 4 chars for each value (what we save on 0-9 accommodates the []).
    # We leave the other fields at their historical length.
    #
    minute = models.CharField(_('minute'), max_length=60 * 4, default='*')
    hour = models.CharField(_('hour'), max_length=24 * 4, default='*')
    day_of_week = models.CharField(
        _('day of week'), max_length=64, default='*',
    )
    day_of_month = models.CharField(
        _('day of month'), max_length=31 * 4, default='*',
    )
    month_of_year = models.CharField(
        _('month of year'), max_length=64, default='*',
    )

    class Meta:
        """Table information."""

        verbose_name = _('crontab')
        verbose_name_plural = _('crontabs')
        ordering = ['month_of_year', 'day_of_month',
                    'day_of_week', 'hour', 'minute']

    def __str__(self):
        # Field order matches the trailing legend: minute, hour, day of
        # week, day of month, month of year.
        return '{0} {1} {2} {3} {4} (m/h/d/dM/MY)'.format(
            cronexp(self.minute), cronexp(self.hour),
            cronexp(self.day_of_week), cronexp(self.day_of_month),
            cronexp(self.month_of_year),
        )

    @property
    def schedule(self):
        """Return the ``celery.schedules.crontab`` described by this row."""
        return schedules.crontab(minute=self.minute,
                                 hour=self.hour,
                                 day_of_week=self.day_of_week,
                                 day_of_month=self.day_of_month,
                                 month_of_year=self.month_of_year,
                                 nowfun=lambda: make_aware(now()))

    @classmethod
    def from_schedule(cls, schedule):
        """Return the model instance that corresponds to ``schedule``.

        :param schedule: Object exposing the ``_orig_minute``,
            ``_orig_hour``, ``_orig_day_of_week``, ``_orig_day_of_month``
            and ``_orig_month_of_year`` attributes.
        :returns: The stored row with the same five fields, or a new
            *unsaved* instance when no such row exists.
        """
        spec = {'minute': schedule._orig_minute,
                'hour': schedule._orig_hour,
                'day_of_week': schedule._orig_day_of_week,
                'day_of_month': schedule._orig_day_of_month,
                'month_of_year': schedule._orig_month_of_year}
        # Reuse the oldest matching row even if duplicates exist.  Deleting
        # duplicates here would also delete every PeriodicTask that points
        # at them, because PeriodicTask.crontab is declared
        # on_delete=CASCADE.
        existing = cls.objects.filter(**spec).order_by('pk').first()
        if existing is not None:
            return existing
        return cls(**spec)
class PeriodicTasks(models.Model):
    """Helper table for tracking updates to periodic tasks.

    The table is used through a single row (``ident=1``) whose
    ``last_update`` column records when the change marker was last bumped.
    """

    ident = models.SmallIntegerField(default=1, primary_key=True, unique=True)
    last_update = models.DateTimeField(null=False)

    objects = managers.ExtendedManager()

    @classmethod
    def changed(cls, instance, **kwargs):
        """Signal receiver: bump the marker unless the instance opts out.

        :param instance: Object being saved or deleted; nothing is recorded
            when its ``no_changes`` attribute is truthy.
        :param kwargs: Extra signal arguments (ignored).
        """
        if instance.no_changes:
            return
        cls.update_changed()

    @classmethod
    def update_changed(cls, **kwargs):
        """Store the current time as ``last_update`` of the ``ident=1`` row.

        :param kwargs: Extra signal arguments (ignored).
        """
        stamp = now()
        cls.objects.update_or_create(ident=1, defaults={'last_update': stamp})

    @classmethod
    def last_change(cls):
        """Return ``last_update`` of the ``ident=1`` row.

        :returns: The stored value, or ``None`` when the row does not exist.
        """
        try:
            marker = cls.objects.get(ident=1)
        except cls.DoesNotExist:
            return None
        return marker.last_update
@python_2_unicode_compatible
class PeriodicTask(models.Model):
    """Model representing a periodic task.

    A task references exactly one schedule through ``interval``,
    ``crontab`` or ``solar``; :meth:`validate_unique` enforces this.
    """

    name = models.CharField(
        _('name'), max_length=200, unique=True,
        help_text=_('Useful description'),
    )
    task = models.CharField(_('task name'), max_length=200)
    interval = models.ForeignKey(
        IntervalSchedule, on_delete=models.CASCADE,
        null=True, blank=True, verbose_name=_('interval'),
    )
    crontab = models.ForeignKey(
        CrontabSchedule, on_delete=models.CASCADE, null=True, blank=True,
        verbose_name=_('crontab'), help_text=_('Use one of interval/crontab'),
    )
    solar = models.ForeignKey(
        SolarSchedule, on_delete=models.CASCADE, null=True, blank=True,
        verbose_name=_('solar'), help_text=_('Use a solar schedule')
    )
    args = models.TextField(
        _('Arguments'), blank=True, default='[]',
        help_text=_('JSON encoded positional arguments'),
    )
    kwargs = models.TextField(
        _('Keyword arguments'), blank=True, default='{}',
        help_text=_('JSON encoded keyword arguments'),
    )
    queue = models.CharField(
        _('queue'), max_length=200, blank=True, null=True, default=None,
        help_text=_('Queue defined in CELERY_TASK_QUEUES'),
    )
    exchange = models.CharField(
        _('exchange'), max_length=200, blank=True, null=True, default=None,
    )
    routing_key = models.CharField(
        _('routing key'), max_length=200, blank=True, null=True, default=None,
    )
    expires = models.DateTimeField(
        _('expires'), blank=True, null=True,
    )
    enabled = models.BooleanField(
        _('enabled'), default=True,
    )
    last_run_at = models.DateTimeField(
        auto_now=False, auto_now_add=False,
        editable=False, blank=True, null=True,
    )
    total_run_count = models.PositiveIntegerField(
        default=0, editable=False,
    )
    date_changed = models.DateTimeField(auto_now=True)
    description = models.TextField(_('description'), blank=True)

    objects = managers.PeriodicTaskManager()
    # Read by ``PeriodicTasks.changed``: when truthy, saving or deleting
    # this instance does not bump the change marker.
    no_changes = False

    class Meta:
        """Table information."""

        verbose_name = _('periodic task')
        verbose_name_plural = _('periodic tasks')

    def validate_unique(self, *args, **kwargs):
        """Run the default checks, then require exactly one schedule.

        :raises ValidationError: keyed on ``interval`` when no schedule is
            set; keyed on every selected schedule field when more than one
            of ``interval``, ``crontab`` and ``solar`` is set.
        """
        super(PeriodicTask, self).validate_unique(*args, **kwargs)
        selected = [
            field_name
            for field_name in ('interval', 'crontab', 'solar')
            if getattr(self, field_name)
        ]
        if not selected:
            raise ValidationError({
                'interval': [
                    'One of interval, crontab, or solar must be set.'
                ]
            })
        # Any combination of two or more schedules is ambiguous, not only
        # the case where all three are set.
        if len(selected) > 1:
            message = 'Only one of interval, crontab, or solar must be set'
            raise ValidationError(
                {field_name: [message] for field_name in selected}
            )

    def save(self, *args, **kwargs):
        """Normalise routing fields and persist the task.

        Empty ``exchange``, ``routing_key`` and ``queue`` values are stored
        as ``None``, and ``last_run_at`` is cleared while the task is
        disabled.
        """
        self.exchange = self.exchange or None
        self.routing_key = self.routing_key or None
        self.queue = self.queue or None
        if not self.enabled:
            self.last_run_at = None
        super(PeriodicTask, self).save(*args, **kwargs)

    def __str__(self):
        # Later assignments win: solar over crontab over interval.
        fmt = '{0.name}: {{no schedule}}'
        if self.interval:
            fmt = '{0.name}: {0.interval}'
        if self.crontab:
            fmt = '{0.name}: {0.crontab}'
        if self.solar:
            fmt = '{0.name}: {0.solar}'
        return fmt.format(self)

    @property
    def schedule(self):
        """Return the schedule of the first relation that is set.

        Relations are checked in the order interval, crontab, solar;
        ``None`` is returned when none of them is set.
        """
        if self.interval:
            return self.interval.schedule
        if self.crontab:
            return self.crontab.schedule
        if self.solar:
            return self.solar.schedule
# Keep the ``PeriodicTasks`` change marker current.
#
# Saving or deleting a PeriodicTask goes through ``PeriodicTasks.changed``,
# which skips the update when the instance has ``no_changes`` set.
signals.pre_delete.connect(PeriodicTasks.changed, sender=PeriodicTask)
signals.pre_save.connect(PeriodicTasks.changed, sender=PeriodicTask)
# Saving or deleting any schedule row bumps the marker unconditionally.
# NOTE(review): IntervalSchedule hooks ``pre_delete`` while the crontab and
# solar schedules hook ``post_delete`` -- confirm the difference is intended.
signals.pre_delete.connect(
    PeriodicTasks.update_changed, sender=IntervalSchedule)
signals.post_save.connect(
    PeriodicTasks.update_changed, sender=IntervalSchedule)
signals.post_delete.connect(
    PeriodicTasks.update_changed, sender=CrontabSchedule)
signals.post_save.connect(
    PeriodicTasks.update_changed, sender=CrontabSchedule)
signals.post_delete.connect(
    PeriodicTasks.update_changed, sender=SolarSchedule)
signals.post_save.connect(
    PeriodicTasks.update_changed, sender=SolarSchedule)