개발자/파이썬 Python

일정 간격으로 함수를 반복 실행하는 방법 python

지구빵집 2020. 11. 12. 11:29
반응형

 

 

 

시간 간격에 따라 어떤 주어진 일을 반복적으로 실행해야 하는 경우는 많습니다. 특히 주기적으로 데이터를 전송한다든가, 그러는 중에 물리적인 이벤트의 발생을 감시해야 하는 경우도 생기기 때문입니다. 이런 경우 파이선 time 모듈의 sleep 기능은 일정 시간동안 프로세스를 중지시키는 기능을 이용해 반복적인 일을 할 수가 있지만 다른 일은 하지 못합니다. 그래서 여기서는 일정 간격으로 정해진 프로세스를 반복적으로 죽을때까지 실행하는 방법을 정리합니다. 

 

time 라이브러리의 sleep 함수를 사용하면 일정 시간동안 프로세스를 일시정지할 수 있습니다. sleep(10)이라고 하면, 10초간 프로세스를 중지한다라는 의미입니다. 실수단위로도 지정할 수 있어 정교한 시간 제어가 가능합니다. sleep(5.5) 대기 상태이므로 아까운 CPU를 죽게 만듭니다.

 

import time
print ("Sleep 5 seconds from now on...")
time.sleep(5.555)
print("wake up!")

 

다음 예제는 sched 모듈을 사용하는 예제인데, sched 모듈은 일정 시간 후에 함수가 실행되도록 예약하는 데 사용됩니다. time.sleep ()을 사용하지 않고 x 초마다 함수 호출을 반복하려면 어떻게 사용합니까? 하는게 우리의 과제다. 아래 코드의 다른 문제는 "do your stuff" 는 즉각적이지 않고 time.sleep 여기에 오류가 누적될 수 있기 때문입니다. "X 초마다 실행"과 "~ X 초 간격으로 반복 실행"은 동일하지 않습니다. 대단히 심오한 문제로 보입니다. ^^ 

 

import sched, time
s = sched.scheduler(time.time, time.sleep)
def do_something(sc): 
    print("Doing stuff...")
    # do your stuff
    s.enter(60, 1, do_something, (sc,))

s.enter(60, 1, do_something, (s,))
s.run()

결과

pi@raspberrypi:~/timer $ python exam01.py
Doing stuff...
Doing stuff...
Doing stuff...
Doing stuff...
Doing stuff...
Doing stuff...

 

아래와 같이 타임루프를 시스템 시계에 고정시키는 방법이다. 정확히 동작하는데 여기에 1 초 이상 걸리는 코드를 추가해야 하는 경우 시간 초과가 발생하고 뒤처지기 시작합니다.  여기서 starttime을 특정 시간에 동기화하여 시작하면 뺄 필요가 없습니다. time.sleep(60 - time.time() % 60). "나는 그것을 그대로 사용했으며 내가 원하는대로 정확히 time.sleep(1200 - time.time() % 1200)에서 로그를 제공합니다." 

좋은 점은 여러 반복 후 드리프트를 방지합니다. 개별 반복은 sleep(), timer()정밀도 및 루프 본문을 실행하는 데 걸리는 시간 에 따라 반복은 항상 간격 경계에서 발생합니다

 

import time
starttime = time.time()
while True:
    print "tick"
    time.sleep(60.0 - ((time.time() - starttime) % 60.0))

 

다음으로 좀 오버스러운 듯 보이지만 Reactor Pattern 을 구현하는 Python 네트워킹 라이브러리인 Twisted 를 고려할 수 있습니다. Twisted는 이벤트 드라이븐 네트워크 엔진입니다. 코드는 아래와 같습니다. 

 

from twisted.internet import task, reactor

timeout = 60.0 # Sixty seconds

def doWork():
    #do work here
    pass

l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds

reactor.run()

 

함수를 주기적으로 실행하는 비 차단 방식을 원한다면 무한 루프를 사용하지 말고 스레드 타이머를 사용합니다. 이렇게하면 코드가 계속 실행되고 다른 작업을 수행하고 n 초마다 함수를 호출 할 수 있습니다. 긴 CPU / 디스크 / 네트워크 집약적인 작업에 대한 진행 정보를 틈틈이 프린트하고 로그를 보는 방식으로 이 기술을 많이 사용합니다. start()와 stop() 컨트롤을 사용한 코드

 

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

 

사용법은 아래 코드와 같습니다.

 

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

 

이 코드의 좋은 점은

  • 표준 라이브러리만 사용하여 외부 종속성 없음
  • 타이머가 이미 시작되었거나 스톱 상태라고 해고 start(), stop() 다중 호출에도 안전함.
  • 호출 할 함수는 위치 및 명명된 인수를 가질 수 있습니다.
  • interval을 언제든지 변경할 수 있으며 다음 실행 후에 적용됩니다. 

 

반응형

 

위 코드를 업데이트하여 약간씩 늦어지는 것을 방지하는 코드로 업데이한 결과가 아래 있습니다. 코드와 결과를 함께 나타냅니다. 여기서 RepeatedTimer 클래스는 OP에 의해 요청 된대로 "간격"초마다 주어진 함수를 호출합니다. 일정은 함수가 실행되는 데 걸리는 시간에 의존하지 않습니다. 이 솔루션은 외부 라이브러리 종속성이 없기 때문에 마음에 듭니다. 이것은 순수한 파이썬입니다. 

 

import threading 
import time

class RepeatedTimer(object):
  def __init__(self, interval, function, *args, **kwargs):
    self._timer = None
    self.interval = interval
    self.function = function
    self.args = args
    self.kwargs = kwargs
    self.is_running = False
    self.next_call = time.time()
    self.start()

  def _run(self):
    self.is_running = False
    self.start()
    self.function(*self.args, **self.kwargs)

  def start(self):
    if not self.is_running:
      self.next_call += self.interval
      self._timer = threading.Timer(self.next_call - time.time(), self._run)
      self._timer.start()
      self.is_running = True

  def stop(self):
    self._timer.cancel()
    self.is_running = False
    
 샘플 사용법은
 
 from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!
    
    
    
    

 

여러가지 면에서 탁월한 방법을 제안하는 코드가 아래에 있으며 다음과 같은 여러 기능을 제공합니다.

 

  • 예외 처리 : 이 수준에서 가능한 한 예외는 적절하게 처리됩니다. 즉, 프로그램을 중단하지 않고 디버깅 목적으로 기록됩니다.
  • 체인 없음 : 많은 답변에서 찾을 수있는 일반적인 체인과 같은 구현 (다음 이벤트 예약 용)은 스케줄링 메커니즘 ( threading.Timer또는 기타) 내에서 문제가 발생하면 체인이 종료 된다는 측면에서 깨지기 쉽습니다 . 문제의 원인이 이미 수정 된 경우에도 더 이상 실행되지 않습니다. 단순 루프와 대기 sleep()는 비교에서 훨씬 더 강력합니다.
  • 드리프트 없음 : 내 솔루션은 실행해야하는 시간을 정확하게 추적합니다. 실행 시간에 따른 드리프트가 없습니다 (다른 많은 솔루션에서와 같이).
  • 건너 뛰기 : 한 번의 실행에 너무 많은 시간이 걸리면 내 솔루션이 작업을 건너 뜁니다 (예 : 5 초마다 X를 수행하지만 X는 6 초가 소요됨). 이것은 표준 크론 동작입니다 (그리고 좋은 이유가 있습니다). 그런 다음 다른 많은 솔루션이 지연없이 작업을 연속으로 여러 번 실행합니다. 대부분의 경우 (예 : 정리 작업) 이것은 바람직하지 않습니다. 이 경우 되고 싶다고, 단순히 사용하는 next_time += delay대신 

 

import time, traceback

def every(delay, task):
  next_time = time.time() + delay
  while True:
    time.sleep(max(0, next_time - time.time()))
    try:
      task()
    except Exception:
      traceback.print_exc()
      # in production code you might want to have this instead of course:
      # logger.exception("Problem while executing repetitive task.")
    # skip tasks if we are behind schedule:
    next_time += (time.time() - next_time) // delay * delay + delay

def foo():
  print("foo", time.time())

every(5, foo)

 

나머지 코드를 차단하지 않고이를 수행하려면이를 사용하여 자체 스레드에서 실행할 수 있습니다. 

 

import threading
threading.Thread(target=lambda: every(5, foo)).start()

 

아래 간단한 코드도 참고

 

import threading, time

def print_every_n_seconds(n=2):
    while True:
        print(time.ctime())
        time.sleep(n)

thread = threading.Thread(target=print_every_n_seconds)
thread.start()

결과는 정확히 

Sun Nov  8 15:18:06 2020
Sun Nov  8 15:18:08 2020
Sun Nov  8 15:18:10 2020
Sun Nov  8 15:18:12 2020
Sun Nov  8 15:18:14 2020
Sun Nov  8 15:18:16 2020
Sun Nov  8 15:18:18 2020
Sun Nov  8 15:18:20 2020

 

 

참고문서

 

python time.sleep() vs event.wait

Cronus 모듈 사용 - fixed frequency computation

What is the best way to repeatedly execute a function every x seconds?  

 

 

타이머와 타이밍

 

 

 

 

반응형