https://pantsbuild.org/ logo
#general
Title
# general
w

witty-crayon-22786

03/23/2016, 10:17 PM
Copy code
@contextmanager
def owner_printing_file_lock(path, blocking=True, timeout=None):
  lock = InterProcessLock(path)
  deadline = time.time() + (timeout or 0)

  @contextmanager
  def attempt(attempt_blocking):
    with lock.acquire(blocking=attempt_blocking, timeout=(deadline - time.time())) as lock:
      if lock:
        yield lock
      else:
        ... print a message then retry ...
        yield attempt(blocking=blocking)
  
  yield attempt(attempt_blocking=False)