From c4b0deacb236fc9ba631a0c25b5d739e1ba4ce89 Mon Sep 17 00:00:00 2001 From: Calin Crisan Date: Mon, 12 Sep 2016 21:46:54 +0300 Subject: [PATCH] the tasks module now always runs tasks in a separate process, asynchronously; upload services are now managed only via tasks; many upload service fixes --- motioneye/config.py | 6 +-- motioneye/handlers.py | 51 ++++++++++++--------- motioneye/tasks.py | 22 +++------ motioneye/uploadservices.py | 91 +++++++++++++++++++++++-------------- 4 files changed, 94 insertions(+), 76 deletions(-) diff --git a/motioneye/config.py b/motioneye/config.py index 297865e..bfbafec 100644 --- a/motioneye/config.py +++ b/motioneye/config.py @@ -796,11 +796,9 @@ def motion_camera_ui_to_dict(ui, old_config=None): if ui['upload_enabled'] and '@id' in old_config: upload_settings = {k[7:]: ui[k] for k in ui.iterkeys() if k.startswith('upload_')} - service = uploadservices.get(old_config['@id'], ui['upload_service']) - service.load(upload_settings) - service.save() - tasks.add(0, uploadservices.invalidate, tag='invalidate_uploadservices', async=True) + tasks.add(0, uploadservices.update, tag='uploadservices.update(%s)' % ui['upload_service'], + camera_id=old_config['@id'], service_name=ui['upload_service'], settings=upload_settings) if ui['text_overlay']: left_text = ui['left_text'] diff --git a/motioneye/handlers.py b/motioneye/handlers.py index 09ea275..4486861 100644 --- a/motioneye/handlers.py +++ b/motioneye/handlers.py @@ -749,6 +749,25 @@ class ConfigHandler(BaseHandler): else: self.finish_json({'ok': False}) + + @classmethod + def _on_test_result(cls, result): + upload_service_test_info = getattr(cls, '_upload_service_test_info', None) + cls._upload_service_test_info = None + + if not upload_service_test_info: + return logging.warn('no pending upload service test request') + + (request_handler, service_name) = upload_service_test_info + + if result is True: + logging.debug('accessing %s succeeded' % service_name) + request_handler.finish_json() + + else: + logging.warn('accessing %s failed: %s' % (service_name, result)) + request_handler.finish_json({'error': result}) + @BaseHandler.auth(admin=True) def test(self, camera_id): @@ -759,21 +778,11 @@ class ConfigHandler(BaseHandler): if utils.local_motion_camera(camera_config): if what == 'upload_service': service_name = data['service'] - service = uploadservices.get(camera_id, service_name) - service.load(data) - if not service: - raise HTTPError(400, 'unknown upload service %s' % service_name) - - logging.debug('testing access to %s' % service) - result = service.test_access() - if result is True: - logging.debug('accessing %s succeeded' % service) - self.finish_json() - - else: - logging.warn('accessing %s failed: %s' % (service, result)) - self.finish_json({'error': result}) - + ConfigHandler._upload_service_test_info = (self, service_name) + + tasks.add(0, uploadservices.test_access, tag='uploadservices.test(%s)'% service_name, + camera_id=camera_id, service_name=service_name, data=data, callback=self._on_test_result) + elif what == 'email': import sendmail import tzctl @@ -866,11 +875,9 @@ class ConfigHandler(BaseHandler): if not service_name: raise HTTPError(400, 'service_name required') - service = uploadservices.get(camera_id, service_name) - if not service: - raise HTTPError(400, 'unknown upload service %s' % service_name) - - url = service.get_authorize_url() + url = uploadservices.get_authorize_url(service_name) + if not url: + raise HTTPError(400, 'no authorization url for upload service %s' % service_name) logging.debug('redirected to authorization url %s' % url) self.redirect(url) @@ -1667,7 +1674,7 @@ class RelayEventHandler(BaseHandler): filename = self.get_argument('filename') # generate preview (thumbnail) - tasks.add(5, mediafiles.make_movie_preview, tag='make_movie_preview(%s)' % filename, async=True, + tasks.add(5, mediafiles.make_movie_preview, tag='make_movie_preview(%s)' % filename, camera_config=camera_config, full_path=filename) # upload to external service @@ -1689,7 +1696,7 @@ class RelayEventHandler(BaseHandler): def upload_media_file(self, filename, camera_id, camera_config): service_name = camera_config['@upload_service'] - tasks.add(5, uploadservices.upload_media_file, tag='upload_media_file(%s)' % filename, async=True, + tasks.add(5, uploadservices.upload_media_file, tag='upload_media_file(%s)' % filename, camera_id=camera_id, service_name=service_name, target_dir=camera_config['@upload_subfolders'] and camera_config['target_dir'], filename=filename) diff --git a/motioneye/tasks.py b/motioneye/tasks.py index 47418e0..16bace1 100644 --- a/motioneye/tasks.py +++ b/motioneye/tasks.py @@ -28,13 +28,11 @@ from tornado.ioloop import IOLoop import settings -_INTERVAL = 10 +_INTERVAL = 2 _STATE_FILE_NAME = 'tasks.pickle' _MAX_TASKS = 100 -# we must be sure there's only one extra process -# that handles all asynchronous tasks, -# as we often invalidate various caches +# we must be sure there's only one extra process that handles all tasks _POOL_SIZE = 1 _tasks = [] @@ -64,7 +62,7 @@ def stop(): _pool = None -def add(when, func, tag=None, async=False, **params): +def add(when, func, tag=None, callback=None, **params): if len(_tasks) >= _MAX_TASKS: return logging.error('the maximum number of tasks (%d) has been reached' % _MAX_TASKS) @@ -84,7 +82,7 @@ def add(when, func, tag=None, async=False, **params): i += 1 logging.debug('adding task "%s" in %d seconds' % (tag or func.func_name, when - now)) - _tasks.insert(i, (when, func, tag, async, params)) + _tasks.insert(i, (when, func, tag, callback, params)) _save() @@ -96,19 +94,11 @@ def _check_tasks(): now = time.time() changed = False while _tasks and _tasks[0][0] <= now: - (when, func, tag, async, params) = _tasks.pop(0) # @UnusedVariable + (when, func, tag, callback, params) = _tasks.pop(0) # @UnusedVariable logging.debug('executing task "%s"' % tag or func.func_name) - if async: - _pool.apply_async(func, kwds=params) + _pool.apply_async(func, kwds=params, callback=callback if callable(callback) else None) - else: - try: - func(**params) - - except Exception as e: - logging.error('task "%s" failed: %s' % (tag or func.func_name, e), exc_info=True) - changed = True if changed: diff --git a/motioneye/uploadservices.py b/motioneye/uploadservices.py index 5728acc..9246cb0 100644 --- a/motioneye/uploadservices.py +++ b/motioneye/uploadservices.py @@ -40,7 +40,8 @@ class UploadService(object): def __str__(self): return self.NAME - def get_authorize_url(self): + @classmethod + def get_authorize_url(cls): return '/' def test_access(self): @@ -155,21 +156,21 @@ class GoogleDrive(UploadService): self._folder_id_times = {} UploadService.__init__(self, camera_id) - - def get_authorize_url(self): + + @classmethod + def get_authorize_url(cls): query = { - 'scope': self.SCOPE, + 'scope': cls.SCOPE, 'redirect_uri': 'urn:ietf:wg:oauth:2.0:oob', 'response_type': 'code', - 'client_id': self.CLIENT_ID, + 'client_id': cls.CLIENT_ID, 'access_type': 'offline' } - return self.AUTH_URL + '?' + urllib.urlencode(query) + return cls.AUTH_URL + '?' + urllib.urlencode(query) def test_access(self): try: - self._credentials = None # invalidate credentials self._folder_ids = {} self._get_folder_id() return True @@ -215,13 +216,13 @@ class GoogleDrive(UploadService): } def load(self, data): - if 'location' in data: + if data.get('location'): self._location = data['location'] self._folder_ids = {} - if 'authorization_key' in data: + if data.get('authorization_key'): self._authorization_key = data['authorization_key'] self._credentials = None - if 'credentials' in data: + if data.get('credentials'): self._credentials = data['credentials'] def _get_folder_id(self, path=''): @@ -447,17 +448,16 @@ class Dropbox(UploadService): UploadService.__init__(self, camera_id) - def get_authorize_url(self): + @classmethod + def get_authorize_url(cls): query = { 'response_type': 'code', - 'client_id': self.CLIENT_ID + 'client_id': cls.CLIENT_ID } - return self.AUTH_URL + '?' + urllib.urlencode(query) + return cls.AUTH_URL + '?' + urllib.urlencode(query) def test_access(self): - self._credentials = None # invalidate credentials - body = { 'path': self._clean_location(), 'recursive': False, @@ -504,12 +504,12 @@ class Dropbox(UploadService): } def load(self, data): - if 'location' in data: + if data.get('location'): self._location = data['location'] - if 'authorization_key' in data: + if data.get('authorization_key'): self._authorization_key = data['authorization_key'] self._credentials = None - if 'credentials' in data: + if data.get('credentials'): self._credentials = data['credentials'] def _clean_location(self): @@ -602,7 +602,17 @@ class Dropbox(UploadService): return { 'access_token': data['access_token'] } + + +def get_authorize_url(service_name): + cls = UploadService.get_service_classes().get(service_name) + + if cls: + return cls.get_authorize_url() + else: + return None + def get(camera_id, service_name): global _services @@ -624,6 +634,35 @@ def get(camera_id, service_name): return service +def test_access(camera_id, service_name, data): + logging.debug('testing access to %s' % service_name) + + service = get(camera_id, service_name) + service.load(data) + if not service: + return 'unknown upload service %s' % service_name + + return service.test_access() + + +def update(camera_id, service_name, settings): + service = get(camera_id, service_name) + service.load(settings) + service.save() + + +def upload_media_file(camera_id, target_dir, service_name, filename): + service = get(camera_id, service_name) + if not service: + return logging.error('service "%s" not initialized for camera with id %s' % (service_name, camera_id)) + + try: + service.upload_file(target_dir, filename) + + except Exception as e: + logging.error('failed to upload file "%s" with service %s: %s' % (filename, service, e), exc_info=True) + + def _load(): services = {} @@ -692,19 +731,3 @@ def _save(services): file.close() -def invalidate(): - global _services - - _services = None - - -def upload_media_file(camera_id, target_dir, service_name, filename): - service = get(camera_id, service_name) - if not service: - return logging.error('service "%s" not initialized for camera with id %s' % (service_name, camera_id)) - - try: - service.upload_file(target_dir, filename) - - except Exception as e: - logging.error('failed to upload file "%s" with service %s: %s' % (filename, service, e), exc_info=True) -- 2.39.5