FritzAB2Matrix/libs/monitoring/__init__.py

44 lines
1.3 KiB
Python
Raw Permalink Normal View History

import queue
from fritzconnection.core.fritzmonitor import FritzMonitor
### Monitor the calls of a fritzbox continously ###
###################################################
2021-07-07 22:31:05 +02:00
def watch_disconnect(monitor, event_queue, func, tams, healthcheck_interval=10):
while True:
try:
event = event_queue.get(timeout=healthcheck_interval)
except queue.Empty:
# check health:
if not monitor.is_alive:
raise OSError("Error: fritzmonitor connection failed")
else:
# do event processing here:
print(event)
if 'DISCONNECT;0' in event:
print("Incoming call stopped. Check the TAM.\n")
2021-07-07 22:31:05 +02:00
func(tams)
elif 'DISCONNECT;1' in event:
print("Outgoing call stopped. Do nothing.\n")
else:
print("Unknown event.\n")
2021-07-07 22:31:05 +02:00
def endedCall(func, tams, fritz_ip='192.168.1.1'):
"""
Call this to trigger a given function if a call is disconnected
"""
try:
# as a context manager FritzMonitor will shut down the monitor thread
with FritzMonitor(address=fritz_ip) as monitor:
event_queue = monitor.start()
2021-07-07 22:31:05 +02:00
watch_disconnect(monitor, event_queue, func, tams)
except (OSError, KeyboardInterrupt) as err:
print(err)