Skip to content

Correct way to use OLA in Python in a threaded way? #1616

Open
@sdbbs

Description

@sdbbs

I have tried for a while to make OLA drive DMX in a threaded way in Python - so I could do something else in my main code, and change DMX in the thread at times when needed. However, I also find it necessary because of this:

  • If OLA has started sending DMX (say, by using the sliders in localhost:9090), then it keeps sending DMX, even if values are at 0.
  • If OLA is in the previously described state, then running, say, python ./ola/python/examples/ola_send_dmx.py (which calls client.SendDmx only once) makes the new value take effect on the device (via DMX) for about a second or less, after which DMX reverts to the previous settings (those that are set on the sliders in localhost:9090)

... otherwise I might have decided not to use threads (if, say, I could have just called SendDmx once and had OLA hold the DMX value indefinitely). I've observed the above with the ENTTEC DMX USB PRO as the OLA DMX interface, and OLA built from commit f69e4a6.

The challenge otherwise seems to lie in:

  • "python does not provide any direct method to kill threads."
  • ClientWrapper.Run() is a blocking call (until ClientWrapper.Stop() releases it)
  • "CRITICAL:ola:SelectServer called in a thread other than the owner"

Best I could do is the code posted below (posted with all comments/misses I've made in developing it) - it works most of the time, except occasionally it makes a blink (the blink showing the data set by the sliders in localhost:9090, so it is black if the sliders are all set to zero). The thing is, I can see sometimes the fade "drag along", which is expected because of the printouts delay - but I'm not sure why the blink appears (I doubt that the printouts cause that much delay).

So my question is - what is the right way to use OLA DMX sending in a Python thread?

(it would also be nice, if ./ola/python/examples contained an example of this).

Edit: forgot to say the below is written in python3, with the patch in #1615

import sys, os
import time

from ola.ClientWrapper import ClientWrapper
from ola.OlaClient import OlaClient
# for DMX (SimpleFadeController)
from ola.DMXConstants import DMX_MIN_SLOT_VALUE, DMX_MAX_SLOT_VALUE, DMX_UNIVERSE_SIZE

# Note: array('B', ...) is `unsigned char`
from array import array
import threading

DMX_UPDATE_INTERVAL = 25  # Resend period in ms (1000 / 25 = 40 frames a second)

class MySimpleDMXController(object):
  """Periodically resend one DMX frame to an OLA universe.

  The frame is kept in ``self._data`` and is re-sent every
  ``self._update_interval`` milliseconds by ``UpdateDmx`` for as long as
  ``isDMXrunning`` is true and the wrapper's event loop is running.

  Threading notes (from the observations recorded with this code):
    * ``_wrapper.Run()`` blocks until ``_wrapper.Stop()`` is called.
    * OLA logs "CRITICAL:ola:SelectServer called in a thread other than the
      owner" when ``Run()`` is called from a thread other than the one that
      constructed the wrapper.  ``startDMX`` therefore builds a fresh wrapper
      inside its worker thread; callers that drive ``_wrapper`` themselves
      must construct the controller in the thread that will call ``Run()``.
  """

  def __init__(self, universe):
    """Connect to OLA and prepare an all-minimum 10-slot frame.

    Args:
      universe: the OLA universe number the frames are sent to.
    """
    self._universe = universe
    self._update_interval = DMX_UPDATE_INTERVAL
    self._data = array('B', [DMX_MIN_SLOT_VALUE] * 10) # 10 slots by default
    self._wrapper = ClientWrapper()
    self._client = self._wrapper.Client()
    self.myDMXthread = None
    self.cbDMXstatus = None   # result of the last DmxSentCallback, None until one arrives
    self.isDMXrunning = False # while True, UpdateDmx keeps rescheduling itself
    self.wait_dmx_sent = False # True between a SendDmx call and its callback
    # Set by the startDMX worker once it has installed its own wrapper (or
    # failed trying), so startDMX can return with self._wrapper usable.
    self._dmx_thread_ready = threading.Event()

  def resizedmxarray(self, newsize):
    """Replace the frame with ``newsize`` slots, all at DMX_MIN_SLOT_VALUE."""
    self._data = array('B', [DMX_MIN_SLOT_VALUE] * newsize)

  def setdmxarray(self, inarr):
    """Replace the frame with the values of ``inarr``.

    Waits (polling every millisecond) while a send is still awaiting its
    callback, then installs the new frame.  The new array is built first and
    swapped in with a single assignment, so a concurrent ``UpdateDmx`` sees
    either the old frame or the new one, never a half-copied mix.

    Args:
      inarr: iterable of ints in the unsigned-char range; its length becomes
        the new frame length.
    """
    while self.wait_dmx_sent:
      time.sleep(0.001)
    # array('B', ...) is `unsigned char`; out-of-range values raise here,
    # before the current frame is touched.
    self._data = array('B', inarr)

  def DmxSentCallback(self, status): # does not get called if no _wrapper.Run()!
    """Record the outcome of the last SendDmx and release waiting writers."""
    self.cbDMXstatus = status.Succeeded()
    print("dmx callback {}".format(self.cbDMXstatus))
    self.wait_dmx_sent = False

  def UpdateDmx(self):
    """
    Send the current frame; called periodically from the wrapper's event
    loop, every self._update_interval ms (DMX_UPDATE_INTERVAL by default).
    """
    print("UpdateDmx")
    # Send the DMX data
    self.wait_dmx_sent = True
    self._client.SendDmx(self._universe, self._data, self.DmxSentCallback)
    # For more information on Add Event, reference the OlaClient
    # Add our event again so it becomes periodic
    if self.isDMXrunning:
      self._wrapper.AddEvent(self._update_interval, self.UpdateDmx)
    else:
      print("UpdateDmx exiting")

  def UpdateDmxThread(self):
    """Worker-thread body for startDMX: own a wrapper and run its loop.

    The wrapper made in __init__ belongs to the constructing thread, and
    running it from here is what triggered "SelectServer called in a thread
    other than the owner".  So this thread creates its own wrapper/client
    pair and installs it on the controller before entering the blocking
    Run().  The wrapper created by __init__ is simply dropped.
    """
    print("UpdateDmxThread")
    try:
      wrapper = ClientWrapper()
      self._client = wrapper.Client()
      self._wrapper = wrapper
      # call once, so it starts the loop
      wrapper.AddEvent(self._update_interval, self.UpdateDmx)
    finally:
      # Always release startDMX, even if connecting to OLA failed.
      self._dmx_thread_ready.set()
    wrapper.Run() # blocks until _wrapper.Stop() is called

  def startDMX(self):
    """Start the periodic sender in a background thread.

    Returns once the worker has installed its wrapper (or failed to), so a
    subsequent stopDMX() acts on the wrapper the worker is actually running.
    """
    self.isDMXrunning = True
    self._dmx_thread_ready.clear()
    self.myDMXthread = threading.Thread(name = 'myDMXthread', target = self.UpdateDmxThread, daemon = False, args = ( ))
    self.myDMXthread.start()
    self._dmx_thread_ready.wait()

  def stopDMX(self):
    """Stop the periodic sender and wait for the worker thread, if any."""
    # clear the flag so UpdateDmx stops rescheduling, then end the loop
    self.isDMXrunning = False
    self._wrapper.Stop()
    if self.myDMXthread is not None:
      self.myDMXthread.join()
    # No callback can arrive once the loop has stopped; do not leave
    # setdmxarray callers spinning on a send that will never be confirmed.
    self.wait_dmx_sent = False
    print("DMX update thread stopped.")

  # NOTE: a non-threaded variant was tried and dropped.  In it startDMX called
  # _wrapper.Run() directly and DmxSentCallback called _wrapper.Stop() to hand
  # control back.  Because Run() blocks until Stop() is called, the UpdateDmx
  # output hogged everything and the caller's main loop never got to run.

def dmxFadeThreadFunc(controller):
  """Fade slot index 2 of a 10-slot frame from 0 up to 99 and back to 0.

  Each step prints the value, hands the whole frame to
  ``controller.setdmxarray`` and then sleeps for DMX_UPDATE_INTERVAL
  milliseconds, so the fade advances at the controller's resend rate.

  Args:
    controller: object providing ``setdmxarray(sequence_of_ints)``.
  """
  mydata = [0]*10
  ramp_up = list(range(0, 100))
  # up 0..99, then the same values in reverse (99..0)
  for ix in ramp_up + ramp_up[::-1]:
    print(ix)
    mydata[2] = ix
    controller.setdmxarray(mydata)
    time.sleep(DMX_UPDATE_INTERVAL/1000.0)

# Note: with a simple olaObjThread, everything starts fine (without "CRITICAL:ola:SelectServer called in a thread other than the owner") - however, it is impossible to stop it (since _wrapper.Run() blocks, no way to intervene through a variable)
# (https://www.geeksforgeeks.org/python-different-ways-to-kill-a-thread/: "python does not provide any direct method to kill threads.")
def olaObjThread():
  """Thread target: build a controller here and run its event loop.

  Constructing the controller in this thread avoids the "SelectServer
  called in a thread other than the owner" message, but nothing outside
  this function holds a reference to the controller, so the blocking Run()
  below cannot be stopped from elsewhere.
  """
  dmx = MySimpleDMXController(universe=1)
  print("startDMX")
  # equivalent of dmx.startDMX(), done inline so Run() happens in this thread
  dmx.isDMXrunning = True
  event_loop = dmx._wrapper
  event_loop.AddEvent(dmx._update_interval, dmx.UpdateDmx)
  event_loop.Run() # blocks until _wrapper.Stop() is called

class OlaCtlThread(threading.Thread):
  """Thread that creates a MySimpleDMXController and runs its OLA loop.

  The controller is created inside run(), i.e. in the thread that calls
  ``_wrapper.Run()``; creating it in __init__ or start() (both executed by
  the parent thread) produced "CRITICAL:ola:SelectServer called in a thread
  other than the owner".  ``self.controller`` stays None until run() has
  built it, so other threads must wait for it to become non-None.
  """

  def __init__(self, *args, **kwargs):
    """Accepts the same arguments as threading.Thread."""
    super(OlaCtlThread, self).__init__(*args, **kwargs)
    self.controller = None # set by run(); declared here so stop() has a reference
    # Remembers a stop() that arrives before run() has entered the event loop.
    # (Not named _stop: threading.Thread uses that name internally.)
    self._stop_requested = threading.Event()

  def start(self):
    """Announce and start the thread (run() then executes in the new thread)."""
    print("OlaCtlThread start")
    super(OlaCtlThread, self).start()

  def run(self):
    """Build the controller in this thread and block in its event loop."""
    print("OlaCtlThread run")
    controller = MySimpleDMXController(universe=1)
    controller.isDMXrunning = True
    self.controller = controller
    if self._stop_requested.is_set():
      # stop() was called before the controller existed; do not start looping
      controller.isDMXrunning = False
      return
    controller._wrapper.AddEvent(controller._update_interval, controller.UpdateDmx)
    controller._wrapper.Run() # blocks until _wrapper.Stop() is called

  def stop(self):
    """Ask the event loop to end; safe to call before the controller exists."""
    print("self.controller._wrapper.Stop")
    self._stop_requested.set()
    controller = self.controller
    if controller is None:
      # run() has not created the controller yet; it will see the request
      return
    controller.isDMXrunning = False # stop UpdateDmx from rescheduling itself
    controller._wrapper.Stop()

def main(args):
  """Run the demo: start the OLA thread, play one fade, then shut down.

  Args:
    args: command-line arguments without the program name (currently unused).

  Raises:
    RuntimeError: if the OLA control thread ends before it has created its
      controller (for example because connecting to OLA failed in run()).
  """
  # NOTE: MySimpleDMXController *MUST* be instantiated in the thread that calls
  # ._wrapper.Run() - else "CRITICAL:ola:SelectServer called in a thread other
  # than the owner"!  OlaCtlThread does that inside its run().
  print("pre oc")
  ocThread = OlaCtlThread(daemon = True)
  print("pre oc start")
  ocThread.start() # calls also run()

  # The controller only exists once run() has built it in the new thread.
  while ocThread.controller is None:
    if not ocThread.is_alive() and ocThread.controller is None:
      # run() died before publishing a controller; waiting would never end
      raise RuntimeError("OlaCtlThread exited before creating its controller")
    print("ocThread.controller is None")
    time.sleep(0.01)

  print("start dmxFadeThread")
  # args must be a sequence: "( x )" is just x in parentheses (hence "argument
  # after * must be an iterable"); a one-element tuple needs the trailing comma.
  fadeThread = threading.Thread(name = 'fadeThread', target = dmxFadeThreadFunc, daemon = True, args = (ocThread.controller,) )
  fadeThread.start()
  print("wait for dmxFadeThread to terminate ...")
  fadeThread.join()

  print("pre stop")
  ocThread.stop()
  ocThread.join()


# ENTRY POINT: run the demo only when this file is executed as a script,
# passing main() the command-line arguments without the program name.
if __name__ == '__main__':
  main(sys.argv[1:])

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions