# This file is part of Beremiz runtime.
# Copyright (C) 2018: Edouard TISSERANT
# See COPYING.Runtime file for copyrights details.
from threading import Lock, Condition, Thread, get_ident
job to be executed by a worker
def __init__(self, call, *args, **kwargs):
self.job = (call, args, kwargs)
do the job by executing the call, and deal with exceptions
call, args, kwargs = self.job
self.result = call(*args, **kwargs)
serialize main thread load/unload of PLC shared objects
self.todo = Condition(self.mutex)
self.done = Condition(self.mutex)
self.free = Condition(self.mutex)
re-raise the exception that happened in a job
@param job: job where the original exception happened
def runloop(self, *args, **kwargs):
meant to be called by worker thread (blocking)
self._threadID = get_ident()
_job = job(*args, **kwargs)
# _job.success can't be None after do()
self.todo.wait_for(self.job is not None)
def interleave(self, waker, stopper, *args, **kwargs):
as for twisted reactor's interleave, it passes all jobs to waker func
additionally, it creates a new thread to wait for new jobs.
self.feed = Condition(self.mutex)
self._threadID = get_ident()
self.job = job(*args, **kwargs)
self.done.wait_for(lambda: self.job.success is not None)
self.todo.wait_for(lambda: self.job is not None)
self.done.wait_for(lambda: self.job.success is not None)
self.own_thread = Thread(target = wakerfeedingloop)
def call(self, *args, **kwargs):
creates a job, executes it in the worker thread, and delivers the result.
if job execution raises an exception, re-raises the same exception
meant to be called by non-worker threads, but calling it from the worker thread is also accepted.
_job = job(*args, **kwargs)
if self._threadID == get_ident():
# if caller is worker thread execute immediately
# otherwise notify and wait for completion
raise EOFError("Worker is disabled")
self.free.wait_for(lambda: self.job is None)
self.done.wait_for(lambda: _job.success is not None)
raise EOFError("Worker job was interrupted")
unblocks the main thread, and terminates execution of runloop()
if self.own_thread is None:
if self.stopper is not None: