import os import sys import logging from PyQt5 import QtCore import qdarktheme import typing import traceback import DBA from subprocess import run from pyqtgraph import mkBrush from mutagen.id3 import ID3 from mutagen.id3._frames import APIC from configparser import ConfigParser from pathlib import Path from appdirs import user_config_dir from logging import debug, error, warning, basicConfig, INFO, DEBUG from ui import Ui_MainWindow from PyQt5.QtWidgets import ( QFileDialog, QLabel, QMainWindow, QApplication, QGraphicsScene, QGraphicsPixmapItem, QMessageBox, QStatusBar, ) from PyQt5.QtCore import ( QSize, QUrl, QTimer, Qt, pyqtSignal, QObject, pyqtSlot, QThreadPool, QRunnable, ) from PyQt5.QtMultimedia import QMediaPlayer, QMediaContent, QAudioProbe from PyQt5.QtGui import QClipboard, QCloseEvent, QFont, QPixmap, QResizeEvent from utils import ( delete_album_art, scan_for_music, initialize_db, add_files_to_database, set_album_art, ) from components import ( PreferencesWindow, AudioVisualizer, CreatePlaylistWindow, ExportPlaylistWindow, ) from utils.get_album_art import get_album_art # good help with signals slots in threads # https://stackoverflow.com/questions/52993677/how-do-i-setup-signals-and-slots-in-pyqt-with-qthreads-in-both-directions # GOOD # https://www.pythonguis.com/tutorials/multithreading-pyqt-applications-qthreadpool/ class WorkerSignals(QObject): """ How to use signals for a QRunnable class; Unlike most cases where signals are defined as class attributes directly in the class, here we define a class that inherits from QObject and define the signals as class attributes in that class. Then we can instantiate that class and use it as a signal object. """ # 1) # Use a naming convention for signals that makes it clear that they are signals # and a corresponding naming convention for the slots that handle them. # For example signal_* and handle_*. # 2) # And try to make the signal content as small as possible. DO NOT pass large objects through signals, like # pandas DataFrames or numpy arrays. Instead, pass the minimum amount of information needed # (i.e. lists of filepaths) signal_started = pyqtSignal() signal_result = pyqtSignal(object) signal_finished = pyqtSignal() signal_progress = pyqtSignal(str) class Worker(QRunnable): """ This is the thread that is going to do the work so that the application doesn't freeze Inherits from QRunnable to handle worker thread setup, signals, and tear down :param callback: the function callback to run on this worker thread. Supplied arg and kwargs will be passed through to the runner :type callback: function :param args: Arguments to pass to the callback function :param kwargs: Keywords to pass to the callback function """ def __init__(self, fn, *args, **kwargs): super(Worker, self).__init__() # Store constructor arguments (re-used for processing) self.fn = fn self.args = args self.kwargs = kwargs self.signals: WorkerSignals = WorkerSignals() # Add a callback to our kwargs self.kwargs["progress_callback"] = self.signals.signal_progress @pyqtSlot() def run(self) -> None: # type: ignore """ This is where the work is done. MUST be called run() in order for QRunnable to work Initialize the runner function with passed args & kwargs """ self.signals.signal_started.emit() try: result = self.fn(*self.args, **self.kwargs) except Exception: traceback.print_exc() exctype, value = sys.exc_info()[:2] self.signals.signal_finished.emit((exctype, value, traceback.format_exc())) error(f"Worker failed: {exctype} | {value} | {traceback.format_exc()}") else: if result: self.signals.signal_finished.emit() self.signals.signal_result.emit(result) else: self.signals.signal_finished.emit() class ApplicationWindow(QMainWindow, Ui_MainWindow): playlistCreatedSignal = pyqtSignal() reloadConfigSignal = pyqtSignal() reloadDatabaseSignal = pyqtSignal() def __init__(self, clipboard): super(ApplicationWindow, self).__init__() # clipboard good self.clipboard = clipboard # Config self.config: ConfigParser = ConfigParser() self.cfg_file = ( Path(user_config_dir(appname="musicpom", appauthor="billypom")) / "config.ini" ) self.config.read(self.cfg_file) # Multithreading stuff... self.threadpool = QThreadPool() # UI self.setupUi(self) self.setWindowTitle("musicpom") self.setup_fonts() # self.vLayoutAlbumArt.SetFixedSize() self.status_bar = QStatusBar() self.permanent_status_label = QLabel("Status...") self.status_bar.addPermanentWidget(self.permanent_status_label) self.setStatusBar(self.status_bar) self.selected_song_filepath: str | None = None self.current_song_filepath: str | None = None self.current_song_metadata: ID3 | dict | None = None self.current_song_album_art: bytes | None = None # widget bits self.album_art_scene: QGraphicsScene = QGraphicsScene() self.player: QMediaPlayer = QMediaPlayer() # Audio player object self.probe: QAudioProbe = QAudioProbe() # Gets audio data self.audio_visualizer: AudioVisualizer = AudioVisualizer(self.player) self.timer = QTimer(self) # Audio timing things # sharing functions with other classes and that self.tableView.load_qapp(self) self.albumGraphicsView.load_qapp(self) # Settings init self.current_volume: int = int(self.config["settings"]["volume"]) self.player.setVolume(self.current_volume) self.volumeLabel.setText(str(self.current_volume)) self.volumeSlider.setValue(self.current_volume) # Audio probe for processing audio signal in real time self.probe.setSource(self.player) self.probe.audioBufferProbed.connect(self.process_probe) # Slider Timer (realtime playback feedback horizontal bar) self.timer.start(100) self.timer.timeout.connect(self.move_slider) # Set fixed size for album art self.albumGraphicsView.setFixedSize(250, 250) # Graphics plot # Make sure PlotWidget doesn't exceed album art height # Adjust to leave room for playback controls self.PlotWidget.setFixedHeight(225) # x range self.PlotWidget.setXRange( 0, self.audio_visualizer.get_x_resolution(), padding=0 ) # y axis range for decibals (-96db to 0db) self.PlotWidget.setYRange(-96, 0, padding=0) # Logarithmic x-axis for frequency display self.PlotWidget.setLogMode(x=False, y=False) self.PlotWidget.setMouseEnabled(x=False, y=False) self.PlotWidget.showGrid(x=True, y=True) # Performance optimizations self.PlotWidget.setAntialiasing(False) self.PlotWidget.setDownsampling(auto=True, mode="peak") self.PlotWidget.setClipToView(True) # Add tick marks for common decibel values (expanded range) y_ticks = [ (-84, "-84dB"), (-60, "-60dB"), (-36, "-36dB"), (-12, "-12dB"), (0, "0dB"), ] self.PlotWidget.getAxis("left").setTicks([y_ticks]) # Add frequency ticks on x-axis freq_ticks = self.audio_visualizer.get_frequency_ticks() self.PlotWidget.getAxis("bottom").setTicks([freq_ticks]) # Connections self.playbackSlider.sliderReleased.connect( lambda: self.player.setPosition(self.playbackSlider.value()) ) # sliderReleased works better than sliderMoved self.volumeSlider.sliderMoved[int].connect(lambda: self.on_volume_changed()) self.speedSlider.sliderMoved.connect( lambda: self.on_speed_changed(self.speedSlider.value()) ) self.playButton.clicked.connect(self.on_play_clicked) # Click to play/pause self.previousButton.clicked.connect(self.on_previous_clicked) self.nextButton.clicked.connect(self.on_next_clicked) # Click to next song # FILE MENU self.actionOpenFiles.triggered.connect(self.open_files) # Open files window self.actionNewPlaylist.triggered.connect(self.create_playlist) self.actionExportPlaylist.triggered.connect(self.export_playlist) # EDIT MENU self.actionPreferences.triggered.connect(self.open_preferences) # VIEW MENU # QUICK ACTIONS MENU self.actionScanLibraries.triggered.connect(self.scan_libraries) self.actionDeleteDatabase.triggered.connect(self.delete_database) self.actionSortColumns.triggered.connect( self.tableView.sort_table_by_multiple_columns ) # QTableView # for drag & drop functionality self.tableView.viewport().installEventFilter(self) ## CONNECTIONS # tableView self.tableView.doubleClicked.connect(self.play_audio_file) self.tableView.enterKey.connect(self.play_audio_file) self.tableView.playPauseSignal.connect( self.on_play_clicked ) # Spacebar toggle play/pause signal self.tableView.handleProgressSignal.connect(self.handle_progress) # playlistTreeView self.playlistTreeView.playlistChoiceSignal.connect( self.tableView.load_music_table ) self.playlistTreeView.allSongsSignal.connect(self.tableView.load_music_table) # albumGraphicsView self.albumGraphicsView.albumArtDropped.connect( self.set_album_art_for_selected_songs ) self.albumGraphicsView.albumArtDeleted.connect( self.delete_album_art_for_current_song ) # _________________ # | | # | | # | Built-in Events | # | | # |_________________| def resizeEvent(self, a0: typing.Optional[QResizeEvent]) -> None: """Do something when the window resizes""" if a0 is not None: return super().resizeEvent(a0) def closeEvent(self, a0: QCloseEvent | None) -> None: """Save settings when closing the application""" # MusicTable/tableView column widths list_of_column_widths = [] for i in range(self.tableView.model2.columnCount()): list_of_column_widths.append(str(self.tableView.columnWidth(i))) column_widths_as_string = ",".join(list_of_column_widths) self.config["table"]["column_widths"] = column_widths_as_string self.config["settings"]["volume"] = str(self.current_volume) self.config["settings"]["window_size"] = ( str(self.width()) + "," + str(self.height()) ) # Save the config with open(self.cfg_file, "w") as configfile: self.config.write(configfile) if a0 is not None: super().closeEvent(a0) # ____________________ # | | # | | # | On action handlers | # | | # |____________________| def on_volume_changed(self) -> None: """Handles volume changes""" try: self.current_volume = self.volumeSlider.value() self.player.setVolume(self.current_volume) self.volumeLabel.setText(str(self.current_volume)) except Exception as e: error(f"main.py on_volume_changed() | Changing volume error: {e}") def on_speed_changed(self, rate: int) -> None: """Handles playback speed changes""" self.player.setPlaybackRate(rate / 50) self.speedLabel.setText("{:.2f}".format(rate / 50)) def on_play_clicked(self) -> None: """Updates the Play & Pause buttons when clicked""" if self.player.state() == QMediaPlayer.State.PlayingState: self.player.pause() self.playButton.setText("▶️") else: if self.player.state() == QMediaPlayer.State.PausedState: self.player.play() self.playButton.setText("⏸️") else: self.play_audio_file() self.playButton.setText("👽") def on_previous_clicked(self) -> None: """""" # TODO: implement this debug("main.py on_previous_clicked()") def on_next_clicked(self) -> None: """""" # TODO: implement this debug("main.py on_next_clicked()") # ____________________ # | | # | | # | Verbs | # | | # |____________________| def setup_fonts(self): """Initializes font sizes and behaviors for various UI components""" font: QFont = QFont() font.setPointSize(16) font.setBold(True) self.artistLabel: QLabel self.artistLabel.setFont(font) self.artistLabel.setTextInteractionFlags(QtCore.Qt.TextInteractionFlag.TextSelectableByMouse) font: QFont = QFont() font.setPointSize(16) font.setBold(False) self.titleLabel.setFont(font) self.titleLabel.setTextInteractionFlags(QtCore.Qt.TextInteractionFlag.TextSelectableByMouse) font: QFont = QFont() font.setPointSize(16) font.setItalic(True) self.albumLabel.setFont(font) self.albumLabel.setTextInteractionFlags(QtCore.Qt.TextInteractionFlag.TextSelectableByMouse) def load_config(self) -> None: """does what it says""" cfg_file = ( Path(user_config_dir(appname="musicpom", appauthor="billypom")) / "config.ini" ) self.config.read(cfg_file) debug("CONFIG LOADED") def get_thread_pool(self) -> QThreadPool: """Returns the threadpool instance""" return self.threadpool def set_permanent_status_bar_message(self, message: str) -> None: """ Sets the permanent message label in the status bar """ # what does this do? self.permanent_status_label.setText(message) def show_status_bar_message(self, message: str, timeout: int | None = None) -> None: """ Show a `message` in the status bar for a length of time - `timeout` in ms (bottom left) """ if timeout: self.status_bar.showMessage(message, timeout) else: self.status_bar.showMessage(message) def play_audio_file(self) -> None: """ Start playback of `tableView.current_song_filepath` & moves playback slider """ self.tableView.set_current_song_filepath() # get metadata self.current_song_metadata = self.tableView.get_current_song_metadata() # read the file url = QUrl.fromLocalFile(self.tableView.get_current_song_filepath()) # load the audio content content = QMediaContent(url) # set the player to play the content self.player.setMedia(content) self.player.play() # play self.move_slider() # mover # self.player.setPlaybackRate(1.5) # assign "now playing" labels & album artwork if self.current_song_metadata is not None: artist = ( self.current_song_metadata["TPE1"][0] if "TPE1" in self.current_song_metadata else None ) album = ( self.current_song_metadata["TALB"][0] if "TALB" in self.current_song_metadata else None ) title = ( self.current_song_metadata["TIT2"][0] if "TIT2" in self.current_song_metadata else None ) self.artistLabel.setText(artist) self.albumLabel.setText(album) self.titleLabel.setText(title) # set album artwork album_art_data = self.tableView.get_current_song_album_art() self.albumGraphicsView.load_album_art(album_art_data) def set_album_art_for_selected_songs(self, album_art_path: str) -> None: """Sets the ID3 tag APIC (album art) for all selected song filepaths""" selected_songs = self.tableView.get_selected_songs_filepaths() for song in selected_songs: debug( f"main.py set_album_art_for_selected_songs() | updating album art for {song}" ) set_album_art(song, album_art_path) def delete_album_art_for_current_song(self) -> None: """Handles deleting the ID3 tag APIC (album art) for current song""" file = self.tableView.get_current_song_filepath() result = delete_album_art(file) if result: # Load the default album artwork in the qgraphicsview album_art_data = get_album_art(None) self.albumGraphicsView.load_album_art(album_art_data) def process_probe(self, buff) -> None: """Audio visualizer buffer processing""" buff.startTime() self.update_audio_visualization() def update_audio_visualization(self) -> None: """Handles updating points on the pyqtgraph visual""" # Use decibel values instead of raw amplitudes y = self.audio_visualizer.get_decibels() if len(y) == 0: return # if self.audio_visualizer._plot_item is None: # thanks cursor sonnet whatever self.PlotWidget.clear() # Use the actual frequency values for x-axis self.audio_visualizer._plot_item = self.PlotWidget.plot( self.audio_visualizer._x_data, # We'll keep using indices for drawing y, pen="b", # Use pen instead of fill for better performance fillLevel=-96 if self.audio_visualizer.use_decibels else 0, # Fill from -96dB for decibel scale fillBrush=mkBrush("b"), ) # else: # self.audio_visualizer._plot_item.setData(self.audio_visualizer._x_data, y) def clear_audio_visualization(self) -> None: self.PlotWidget.clear() self.audio_visualizer._plot_item = None def move_slider(self) -> None: """Handles moving the playback slider""" if self.player.state() == QMediaPlayer.State.StoppedState: return else: if self.playbackSlider.isSliderDown(): # Prevents slider from updating when dragging return # Update the slider if self.player.state() == QMediaPlayer.State.PlayingState: self.playbackSlider.setMinimum(0) self.playbackSlider.setMaximum(self.player.duration()) slider_position = self.player.position() self.playbackSlider.setValue(slider_position) current_minutes, current_seconds = divmod(slider_position / 1000, 60) duration_minutes, duration_seconds = divmod( self.player.duration() / 1000, 60 ) self.startTimeLabel.setText( f"{int(current_minutes):02d}:{int(current_seconds):02d}" ) self.endTimeLabel.setText( f"{int(duration_minutes):02d}:{int(duration_seconds):02d}" ) def add_latest_playlist_to_tree(self) -> None: """Refreshes the playlist tree""" self.playlistTreeView.add_latest_playlist_to_tree() def handle_progress(self, data): """ updates the status bar when progress is emitted """ self.show_status_bar_message(data) # ____________________ # | | # | | # | menubar verbs | # | | # |____________________| # File def open_files(self) -> None: """ Opens the open files window - File > Open > List of song(s) """ open_files_window = QFileDialog( self, "Open file(s)", ".", "Audio files (*.mp3)" ) # QFileDialog.FileMode enum { AnyFile, ExistingFile, Directory, ExistingFiles } open_files_window.setFileMode(QFileDialog.ExistingFiles) open_files_window.exec_() filenames = open_files_window.selectedFiles() # Adds files to the library in a new thread worker = Worker(add_files_to_database, filenames) worker.signals.signal_finished.connect(self.tableView.load_music_table) worker.signals.signal_progress.connect(self.handle_progress) self.threadpool.start(worker) def create_playlist(self) -> None: """Creates a database record for a playlist, given a name""" window = CreatePlaylistWindow(self.playlistCreatedSignal) window.playlistCreatedSignal.connect(self.add_latest_playlist_to_tree) # type: ignore window.exec_() def import_playlist(self) -> None: """ Imports a .m3u file, given a base path attempts to match playlist files to database records that currently exist """ # TODO: implement this pass def export_playlist(self) -> None: """ Export playlist window Takes a certain database playlist and turns it into a .m3u file """ export_playlist_window = ExportPlaylistWindow() export_playlist_window.exec_() # Edit def open_preferences(self) -> None: """Opens the preferences window""" preferences_window = PreferencesWindow( self.reloadConfigSignal, self.reloadDatabaseSignal ) preferences_window.reloadConfigSignal.connect(self.load_config) # type: ignore preferences_window.reloadDatabaseSignal.connect(self.tableView.load_music_table) # type: ignore preferences_window.exec_() # Display the preferences window modally # Quick Actions def scan_libraries(self) -> None: """ Scans for new files in the configured library folder then, refreshes the datagridview """ scan_for_music() self.tableView.load_music_table() def delete_database(self) -> None: """Deletes the entire database""" reply = QMessageBox.question( self, "Confirmation", "Delete database?", QMessageBox.Yes | QMessageBox.No, QMessageBox.Yes, ) if reply == QMessageBox.Yes: initialize_db() self.tableView.load_music_table() def update_config_file(): """ If the user config file is not up to date, update it with samples from sample config """ cfg_file = ( Path(user_config_dir(appname="musicpom", appauthor="billypom")) / "config.ini" ) sample_cfg_file = './sample_config.ini' cfg_path = str(Path(user_config_dir(appname="musicpom", appauthor="billypom"))) # If config path doesn't exist, create it if not os.path.exists(cfg_path): os.makedirs(cfg_path) # If the config file doesn't exist, create it from the sample config if not os.path.exists(cfg_file): debug("copying sample config") # Create config file from sample run(["cp", "./sample_config.ini", cfg_file]) return # make sure config is up to date config = ConfigParser() sample_config = ConfigParser() config.read(cfg_file) sample_config.read(sample_cfg_file) orig_sections = config.sections() sample_sections = sample_config.sections() for section in sample_sections: if section not in orig_sections: # add new sections to the current config config.add_section(section) orig_options = dict(config.items(section)) sample_options = dict(sample_config.items(section)) for option, value in sample_options.items(): if option not in orig_options: # add new options to the section, for current config config.set(section, option, value) with open(cfg_file, "w") as configfile: config.write(configfile) if __name__ == "__main__": # logging setup file_handler = logging.FileHandler(filename="log", encoding="utf-8") stdout_handler = logging.StreamHandler(stream=sys.stdout) handlers = [file_handler, stdout_handler] # basicConfig(filename="log", encoding="utf-8", level=logging.DEBUG) basicConfig( level=DEBUG, format="[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s", handlers=handlers, ) # Initialization update_config_file() cfg_file = ( Path(user_config_dir(appname="musicpom", appauthor="billypom")) / "config.ini" ) cfg_path = str(Path(user_config_dir(appname="musicpom", appauthor="billypom"))) config = ConfigParser() config.read(cfg_file) db_filepath: str = config.get("db", "database") # If the database location isnt set at the config location, move it if not db_filepath.startswith(cfg_path): new_path = f"{cfg_path}/{db_filepath}" debug(f"setting new db-database path: {new_path}") config["db"]["database"] = new_path # Save the config with open(cfg_file, "w") as configfile: config.write(configfile) config.read(cfg_file) db_filepath: str = config.get("db", "database") db_path = db_filepath.split("/") db_path.pop() db_path = "/".join(db_path) # If the db file doesn't exist if not os.path.exists(db_filepath): # If the db directory doesn't exist if not os.path.exists(db_path): # Make the directory os.makedirs(db_path) # Create database on first run with DBA.DBAccess() as db: with open("utils/init.sql", "r") as file: lines = file.read() for statement in lines.split(";"): debug(f"executing [{statement}]") db.execute(statement, ()) # Allow for dynamic imports of my custom classes and utilities? project_root = os.path.abspath(os.path.dirname(__file__)) sys.path.append(project_root) # Start the app app = QApplication(sys.argv) clipboard = app.clipboard() # Dark theme >:3 qdarktheme.setup_theme() # qdarktheme.setup_theme("auto") # Show the UI ui = ApplicationWindow(clipboard) # window size width, height = tuple(config.get("settings", "window_size").split(",")) window_size = QSize(int(width), int(height)) ui.resize(window_size) ui.show() sys.exit(app.exec_())