initial commit

This commit is contained in:
aj 2020-07-20 17:07:52 +01:00
commit 786b530dc1
8 changed files with 424 additions and 0 deletions

127
.gitignore vendored Normal file
View File

@ -0,0 +1,127 @@
scratch.py
service.json
*~*
*#
node_modules/
# Byte-compiled / optimized / DLL files
*/__pycache__/*
*.py[cod]
*$py.class
.idea
.vscode
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/

7
README.md Normal file
View File

@ -0,0 +1,7 @@
Ticker
========
Interfacing with the GPIO pins on a Raspberry Pi.
Using GPIOZero and RPLCD.
* Button, Buzzer, LED, LCD

36
main.py Normal file
View File

@ -0,0 +1,36 @@
import os
from signal import pause
from ticker.ticker import Ticker
red_led_pin = 13
yellow_led_pin = 19
green_led_pin = 26
buzzer_pin = 7
button_pins = [21, 20, 16, 12]
lcd_rs = 18
lcd_e = 15
lcd_data = [14, 4, 3, 2]
def main():
tick = Ticker(lcd_rs=lcd_rs,
lcd_e=lcd_e,
lcd_data=lcd_data,
buzzer_pin=buzzer_pin,
red_led_pin=red_led_pin,
yellow_led_pin=yellow_led_pin,
green_led_pin=green_led_pin,
button_pins=button_pins,
fm_username=os.environ.get('FM_USERNAME', 'sarsoo'),
fm_key=os.environ['FM_KEY'])
tick.start()
if __name__ == '__main__':
main()
pause()

8
requirements.txt Normal file
View File

@ -0,0 +1,8 @@
certifi==2020.6.20
chardet==3.0.4
colorzero==1.1
gpiozero==1.5.1
idna==2.10
requests==2.24.0
RPLCD==1.3.0
urllib3==1.25.9

13
ticker.service Normal file
View File

@ -0,0 +1,13 @@
[Unit]
Description=Ticker
[Service]
# Command to execute when the service is started
ExecStart=/usr/bin/python3 /home/pi/ticker/main.py
[Service]
Environment=PYTHONUNBUFFERED=1
Type=simple
[Install]
WantedBy=default.target

16
ticker/__init__.py Normal file
View File

@ -0,0 +1,16 @@
import logging
from ticker.ticker import Ticker
logger = logging.getLogger(__name__)
fmlogger = logging.getLogger('fmframework')
logger.setLevel('DEBUG')
file_handler = logging.FileHandler("ticker.log")
file_handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(name)s - %(funcName)s - %(message)s'))
logger.addHandler(file_handler)
fmlogger.addHandler(file_handler)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(logging.Formatter('%(levelname)s %(name)s:%(funcName)s - %(message)s'))
logger.addHandler(stream_handler)
fmlogger.addHandler(stream_handler)

View File

@ -0,0 +1,47 @@
from dataclasses import dataclass
@dataclass
class DisplayItem:
title: str = None
message: str = ''
wrap_line: bool = False
iterations: int = 2
time: int = 5
def scroll_text(text, iterations=2, width=15):
if len(text) < width:
return text
for iteration in range(iterations):
start_idx = 0
final_idx = width
while final_idx <= len(text):
yield text[start_idx:final_idx]
start_idx += 1
final_idx += 1
def loop_text(text, width=15):
while True:
if text:
for scrolled in scroll_text(text, iterations=1, width=width):
yield scrolled
yield ''
else:
yield ''
def zip_lines(top_text='', bottom_text='', iterations=2, width=15):
if len(top_text) == len(bottom_text):
return zip(
scroll_text(top_text, iterations, width), scroll_text(bottom_text, iterations, width)
)
elif len(top_text) > len(bottom_text):
return zip(
scroll_text(top_text, iterations, width), loop_text(bottom_text, width)
)
elif len(top_text) < len(bottom_text):
return zip(
loop_text(top_text, width), scroll_text(bottom_text, iterations, width)
)

170
ticker/ticker.py Normal file
View File

@ -0,0 +1,170 @@
from queue import Queue
from threading import Thread
from time import sleep
import logging
from datetime import date
from RPi import GPIO
from RPLCD.gpio import CharLCD
from gpiozero import TrafficLights, TonalBuzzer, Button
from requests import Session
from fmframework.net.network import Network, LastFMNetworkException
from ticker.display import DisplayItem
logger = logging.getLogger(__name__)
lcd_width = 16
class Ticker:
def __init__(self,
lcd_rs,
lcd_e,
lcd_data,
buzzer_pin,
red_led_pin,
yellow_led_pin,
green_led_pin,
button_pins,
fm_username: str,
fm_key: str):
self.lcd = CharLCD(numbering_mode=GPIO.BCM,
cols=lcd_width,
rows=2,
pin_rs=lcd_rs,
pin_e=lcd_e,
pins_data=lcd_data,
auto_linebreaks=True)
self.leds = TrafficLights(red=red_led_pin, yellow=yellow_led_pin, green=green_led_pin, pwm=True)
self.buzzer = TonalBuzzer(buzzer_pin)
self.notif_button = Button(button_pins[0])
self.notif_button.when_activated = self.handle_notif_click
self.button2 = Button(button_pins[1])
self.button2.when_activated = lambda: self.queue_text('hey', 'hey')
self.button3 = Button(button_pins[2])
self.button4 = Button(button_pins[3])
self.idle_text = dict()
self.notification_queue = Queue()
self.display_queue = Queue()
self.display_thread = Thread(target=self.display_worker, name='display', daemon=True)
self.rsession = Session()
self.fmnet = Network(username=fm_username, api_key=fm_key)
self.puller_thread = Thread(target=self.puller_worker, name='puller', daemon=True)
def start(self):
logger.info('starting ticker')
self.lcd.clear()
self.display_thread.start()
self.puller_thread.start()
self.set_status(green=True)
def set_status(self,
green: bool = False,
yellow: bool = False,
red: bool = False):
self.leds.green.value = 1 if green else 0
self.leds.yellow.value = 1 if yellow else 0
self.leds.red.value = 1 if red else 0
def handle_notif_click(self):
if not self.notification_queue.empty():
while not self.notification_queue.empty():
self.display_queue.put(self.notification_queue.get())
self.leds.red.off()
else:
self.queue_text('No Notifications', '', interrupt=True, time=2)
def puller_worker(self):
while True:
try:
total = self.fmnet.get_scrobble_count_from_date(input_date=date.today())
logger.debug(f'loaded daily scrobbles {total}')
self.queue_text('Scrobbles Today', total)
self.idle_text['daily_scrobbles'] = DisplayItem('Scrobbles', str(total))
except LastFMNetworkException as e:
logger.exception(e)
self.queue_text('Last.FM Error', f'{e.http_code}, {e.error_code}, {e.message}')
sleep(30)
def display_worker(self):
while True:
if not self.display_queue.empty():
display_item = self.display_queue.get(block=False)
logger.info(f'dequeued {display_item}, size {self.display_queue.qsize()}')
self.write_display_item(display_item)
self.display_queue.task_done()
else:
if len(self.idle_text) > 0:
for key, item in self.idle_text.items():
if self.display_queue.empty():
break
logger.debug(f'writing {key}')
self.write_display_item(item)
else:
self.write_to_lcd(['Ticker...', ''])
sleep(0.1)
def write_display_item(self, display_item):
"""write display item to LCD now"""
if display_item.message is None:
display_item.message = ''
if len(display_item.message) > lcd_width:
buffer = [display_item.title, '']
self.write_to_lcd(buffer)
self.loop_string(display_item.message, buffer, row=1, iterations=display_item.iterations)
else:
buffer = [display_item.title, display_item.message]
self.write_to_lcd(buffer)
sleep(display_item.time)
def queue_notification(self, title, message, wrap_line=False, iterations=2):
logger.debug(f'queueing {title}/{message} {iterations} times, wrapped: {wrap_line}')
self.notification_queue.put(DisplayItem(title, str(message), wrap_line, iterations))
self.leds.red.pulse()
def queue_text(self, title, message, wrap_line=False, time=5, iterations=2, interrupt=False):
logger.debug(f'queueing {title}/{message} {iterations} times, wrapped: {wrap_line}')
item = DisplayItem(title, str(message), wrap_line=wrap_line, iterations=iterations, time=time)
if interrupt:
self.write_display_item(item)
else:
self.display_queue.put(item)
def write_to_lcd(self, framebuffer):
"""Write the framebuffer out to the specified LCD."""
self.lcd.home()
for row in framebuffer:
self.lcd.write_string(row.ljust(lcd_width)[:lcd_width])
self.lcd.write_string('\r\n')
def loop_string(self, string, framebuffer, row, delay=0.4, iterations=2):
padding = ' ' * lcd_width
s = padding + string + padding
for round_trip in range(iterations):
for i in range(len(s) - lcd_width + 1):
framebuffer[row] = s[i:i + lcd_width]
self.write_to_lcd(framebuffer)
sleep(delay)