#!/usr/bin/env python3
# Based on an idea by Shivam Aggarwal:
# https://shivama205.medium.com/audio-signals-comparison-23e431ed2207
# WTFL
"""Find similar audio files by cross-correlating their fpcalc fingerprints.

Every input file is fingerprinted with the external ``fpcalc`` tool, then
every pair of fingerprints is cross-correlated over a range of offsets.
Pairs whose best correlation reaches the threshold are printed as
``path_a<sep>path_b<sep>correlation%<sep>offset``.
"""

import argparse
import itertools
import subprocess
import numpy
import os
import sys
import time
import multiprocessing


def initialize():
    """Parse the command line and collect the input files.

    Returns:
        A ``(source_files, args)`` tuple: the de-duplicated list of file
        paths found from the ``-i`` arguments (directories are walked
        recursively) and the parsed ``argparse.Namespace``.
    """
    # os.cpu_count() may return None when the count cannot be determined.
    cpu_count = os.cpu_count() or 1

    defaults = {
        # seconds to sample audio file for fingerprint calculation
        'sample_time': 500,
        # number of points to scan cross correlation over
        'span': 150,
        # step size (in points) of cross correlation
        'step': 1,
        # minimum number of points that must overlap in cross correlation;
        # a pair is discarded if this cannot be met
        'min_overlap': 20,
        # %
        'threshold': 80,
        'processor': cpu_count,
        'separator': ';',
    }

    def check_nproc(arg):
        """argparse type: integer between 1 and the CPU count."""
        try:
            n = int(arg)
        except ValueError:
            raise argparse.ArgumentTypeError("il faut un nombre entier") from None
        if n < 1 or n > cpu_count:
            raise argparse.ArgumentTypeError(
                "{} n'est pas compris entre 1 et {:d}".format(n, cpu_count))
        return n

    def check_threshold(arg):
        """argparse type: number between 0 and 100 inclusive."""
        try:
            n = float(arg)
        except ValueError:
            raise argparse.ArgumentTypeError("il faut un nombre") from None
        if n < 0 or n > 100:
            raise argparse.ArgumentTypeError(
                "{} n'est pas compris entre 0 et 100 inclus".format(n))
        return n

    def parse_input_files(input_file, source_files):
        """Record *input_file* (file, directory or nested list) in *source_files*.

        *source_files* is a dict used as an insertion-ordered set of paths.
        """
        if isinstance(input_file, list):
            for f in input_file:
                parse_input_files(f, source_files)
        elif os.path.isfile(input_file):
            source_files[input_file] = 1
        elif os.path.isdir(input_file):
            for root, _dirs, files in os.walk(input_file):
                for f in files:
                    parse_input_files(os.path.join(root, f), source_files)

    parser = argparse.ArgumentParser(__file__)
    # Short options are registered without a trailing space so that "-i",
    # "-t" and "-p" match exactly instead of relying on abbreviation.
    parser.add_argument(
        "-i", "--source-file",
        action='append',
        nargs='+',
        required=True,  # without it args.source_file would be None
        help="répertoire ou fichier",
    )
    parser.add_argument(
        "-t", "--threshold",
        type=check_threshold,
        default=defaults['threshold'],
        help="seuil en pourcentage sous lequel il est considéré qu'il n'y a pas de corrélation (défaut: %(default)d)",
    )
    parser.add_argument(
        "-p", "--processor",
        type=check_nproc,
        default=defaults['processor'],
        help="le nombre de processus parallèles lancés (défaut: %(default)d)",
    )
    parser.add_argument(
        "--sample-time",
        type=int,
        default=defaults['sample_time'],
        help="seconds to sample audio file for fpcalc (défaut: %(default)d)",
    )
    parser.add_argument(
        "--span",
        type=int,
        default=defaults['span'],
        help="finesse en points pour scanner la corrélation (défaut: %(default)d)",
    )
    parser.add_argument(
        "--step",
        type=int,
        default=defaults['step'],
        help="valeur du pas en points de corrélation (défaut: %(default)d)",
    )
    parser.add_argument(
        "--min-overlap",
        type=int,
        default=defaults['min_overlap'],
        help="nombre minimal de points de correspondance (défaut %(default)d)",
    )
    parser.add_argument(
        "--separator",
        type=str,
        default=defaults['separator'],
        help="séparateur des champs de résultat (défaut '%(default)s')",
    )
    args = parser.parse_args()

    source_files = {}
    for f in args.source_file:
        parse_input_files(f, source_files)
    return list(source_files.keys()), args


def prime(i, primes):
    """Add *i* to *primes* if no other member of *primes* divides it.

    Returns *i* when it was added, ``False`` otherwise.
    """
    if any(i != p and i % p == 0 for p in primes):
        return False
    primes.add(i)
    return i


def nPrimes(n):
    """Return the set of the first *n* prime numbers (empty set if n < 1)."""
    if n < 1:
        # The counting loop would otherwise never terminate.
        return set()
    primes = {2}
    candidate = 3
    while len(primes) < n:
        prime(candidate, primes)
        candidate += 1
    return primes


def getPrimes(n, ids):
    """Split *n* into ``(a, b)`` where *a* is the first element of *ids* dividing *n*.

    Returns ``(0, 0)`` when no element of *ids* divides *n*.
    """
    for candidate in ids:
        if n % candidate == 0:
            # Integer division: float division loses precision on large n.
            return candidate, n // candidate
    return 0, 0


# calculate fingerprint
def calculate_fingerprints(filename):
    """Run ``fpcalc -raw`` on *filename* and return the raw fingerprint text.

    The sampled length comes from the global ``args.sample_time``.

    Returns:
        The text following ``FINGERPRINT=`` in fpcalc's output.

    Raises:
        ValueError: if fpcalc printed no ``FINGERPRINT=`` line.
        OSError: if fpcalc cannot be executed.
    """
    # Argument list, no shell: file names containing quotes, '$' or
    # backticks must be passed through untouched.
    result = subprocess.run(
        ['fpcalc', '-raw', '-length', str(args.sample_time), filename],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        universal_newlines=True,
    )
    marker = 'FINGERPRINT='
    for line in result.stdout.splitlines():
        if line.startswith(marker):
            return line[len(marker):].strip()
    raise ValueError('no fingerprint in fpcalc output for {!r}'.format(filename))


# returns correlation between lists
def correlation(listx, listy):
    """Return the fraction (0..1) of matching bits between two fingerprints.

    Each element is compared as a 32-bit word; the longer list is truncated
    to the length of the shorter one.

    Raises:
        ValueError: if either list is empty.
    """
    if len(listx) == 0 or len(listy) == 0:
        # Error checking in main program should prevent us from ever being
        # able to get here.
        raise ValueError('Empty lists cannot be correlated.')
    # zip() stops at the shorter list, which is the truncation we want.
    size = min(len(listx), len(listy))
    covariance = sum(32 - bin(x ^ y).count("1") for x, y in zip(listx, listy))
    covariance = covariance / float(size)
    return covariance / 32


# return cross correlation, with listy offset from listx
def cross_correlation(listx, listy, offset):
    """Correlate *listx* and *listy* with *listy* shifted by *offset* points.

    Returns ``None`` when the overlapping part is shorter than the global
    ``args.min_overlap``.
    """
    if offset > 0:
        listx = listx[offset:]
        listy = listy[:len(listx)]
    elif offset < 0:
        offset = -offset
        listy = listy[offset:]
        listx = listx[:len(listy)]
    if min(len(listx), len(listy)) < args.min_overlap:
        # Not enough overlapping points at this offset.
        return None
    return correlation(listx, listy)


# cross correlate listx and listy with offsets from -span to span
def compare(listx, listy, span, step):
    """Return the cross correlations for offsets ``-span .. span`` by *step*.

    Entries are ``None`` for offsets where the overlap was too small.

    Raises:
        Exception: if *span* exceeds the length of the shorter list.
    """
    shortest = min(len(listx), len(listy))
    if span > shortest:
        # Error checking in main program should prevent us from ever being
        # able to get here.
        raise Exception('span >= sample size: %i >= %i\n' % (span, shortest)
                        + 'Reduce span, reduce crop or increase sample_time.')
    return [cross_correlation(listx, listy, offset)
            for offset in numpy.arange(-span, span + 1, step)]


def get_max_corr(corr, source, target):
    """Pick the best correlation in *corr* and its offset.

    Args:
        corr: list returned by ``compare`` (may contain ``None``).
        source, target: unused, kept for interface compatibility.

    Returns:
        ``(correlation, offset)`` when the best correlation reaches the
        global ``args.threshold`` (in percent), ``None`` otherwise. ``None``
        is also returned when any offset lacked the minimum overlap, so such
        a pair is discarded explicitly rather than through a TypeError
        raised by ``max()``.
    """
    if not corr or any(value is None for value in corr):
        return None
    max_corr_index = corr.index(max(corr))
    max_corr_offset = -args.span + max_corr_index * args.step
    # report matches
    if corr[max_corr_index] * 100 >= args.threshold:
        return corr[max_corr_index], max_corr_offset
    return None


def correlate(source, target):
    """Cross-correlate two fingerprints using the global span/step settings.

    Returns ``(correlation, offset)`` or ``None`` (see ``get_max_corr``).
    """
    corr = compare(source, target, args.span, args.step)
    return get_max_corr(corr, source, target)


def get_tests_nbr(n):
    """Return the number of unordered pairs among *n* items."""
    return n * (n - 1) // 2


def get_ETA(start, total, done):
    """Return the estimated completion time as a ``time.ctime`` string.

    *done* must be non-zero.
    """
    now = time.time()
    return time.ctime(now + (now - start) / done * (total - done))


def eprint(*args, **kwargs):
    """``print`` to standard error."""
    print(*args, file=sys.stderr, **kwargs)


def _init_worker(worker_args, shared_ziques, shared_comparaison, shared_erreurs):
    """Pool initializer: install the shared state as globals in each worker.

    Needed because worker functions read module globals, which are only
    inherited automatically with the "fork" start method.
    """
    global args, ziques, comparaison, erreurs
    args = worker_args
    ziques = shared_ziques
    comparaison = shared_comparaison
    erreurs = shared_erreurs


def mp_calculate_fingerprints(key):
    """Worker: compute and store the fingerprint of ``ziques[key]``.

    On failure the path is appended to ``erreurs`` and the entry is removed
    from ``ziques``.
    """
    path = ziques[key]['path']
    try:
        fingerprint = [int(value) for value in calculate_fingerprints(path).split(',')]
    except Exception:
        # Any failure (fpcalc missing, unreadable file, unparsable output)
        # is reported through the shared error list.
        erreurs.append(path)
        del ziques[key]
        return
    ziques[key] = {
        'fingerprint': fingerprint,
        'path': path,
    }


def mp_correlate(key):
    """Worker: correlate the pair ``comparaison[key]``.

    The entry is completed with ``correlation`` and ``offset`` on a match and
    deleted otherwise (below threshold, or the pair could not be compared).
    """
    pair = comparaison[key]
    try:
        result = correlate(
            ziques[pair['a']]['fingerprint'],
            ziques[pair['b']]['fingerprint'])
    except Exception:
        # Deliberate best effort: a pair that cannot be compared (e.g. span
        # larger than the fingerprint) is simply not a match.
        result = None
    if result is None:
        del comparaison[key]
        return
    c, o = result
    comparaison[key] = {
        'a': pair['a'],
        'b': pair['b'],
        'correlation': c,
        'offset': o,
    }


def main():
    """Fingerprint all input files, compare every pair and print matches."""
    global args, ziques, comparaison, erreurs

    source_files, args = initialize()

    if len(source_files) < 2:
        print("au moins deux fichiers sont nécessaires")
        sys.exit()

    # Each file gets a distinct prime id so that the product of two ids
    # uniquely identifies an unordered pair of files.
    ids = list(nPrimes(len(source_files)))
    total_ids = len(ids)

    manager = multiprocessing.Manager()
    ziques = manager.dict()
    comparaison = manager.dict()
    erreurs = manager.list()
    pool = multiprocessing.Pool(
        args.processor,
        initializer=_init_worker,
        initargs=(args, ziques, comparaison, erreurs))

    try:
        for file_id, path in zip(ids, source_files):
            ziques[file_id] = {'path': path}
        del source_files

        nb_erreurs = len(erreurs)
        start = time.time()
        for i, _ in enumerate(pool.imap_unordered(mp_calculate_fingerprints, ziques.keys()), 1):
            nb_erreurs = len(erreurs)
            print('calcul des empreintes{:s}: {:.1f}% (ETA {:s})'.format(
                ("", " (" + str(nb_erreurs) + " erreur{})".format(("", "s")[nb_erreurs > 1]))[nb_erreurs > 0],
                i / total_ids * 100,
                get_ETA(start, total_ids, i)),
                end='\r')
        sys.stdout.write("\033[K")  # clear line
        print('calcul des empreintes terminé ({:d} fichiers traités{:s})'.format(
            len(ziques),
            ("", " et " + str(nb_erreurs) + " erreur{}".format(("", "s")[nb_erreurs > 1]))[nb_erreurs > 0]))
        if len(erreurs):
            print("Fichier{} en erreur:".format(("", "s")[len(erreurs) > 1]))
            for k in erreurs:
                print(k)
            print()

        erreurs[:] = []  # empty the error list

        # Build the pair list locally, then publish it in one call: writing
        # each pair through the manager proxy costs one IPC round-trip.
        cles = ziques.keys()
        nb_tests = get_tests_nbr(len(cles))
        paires = {}
        start = time.time()
        for done, (a, b) in enumerate(itertools.combinations(cles, 2), 1):
            paires[a * b] = {'a': a, 'b': b}
            print("construction liste: {:.1f}% (ETA {:s})".format(
                done / nb_tests * 100,
                get_ETA(start, nb_tests, done)),
                end='\r')
        comparaison.update(paires)
        sys.stdout.write("\033[K")  # clear line

        tests_nbr = len(comparaison)
        start = time.time()
        for i, _ in enumerate(pool.imap_unordered(mp_correlate, comparaison.keys()), 1):
            print('comparaisons: {:.1f}% (ETA {:s})'.format(
                i / tests_nbr * 100,
                get_ETA(start, tests_nbr, i)),
                end='\r')
        sys.stdout.write("\033[K")  # clear line
    finally:
        pool.close()
        pool.join()

    # Snapshot the shared dicts once instead of one proxy call per field.
    resultats = comparaison.items()
    chemins = {key: entry['path'] for key, entry in ziques.items()}

    print('comparaison terminée:\n{0:d} comparaison{pluriel1} effectuée{pluriel1}\n{1} corrélation{pluriel2} trouvée{pluriel2} (seuil {2}%)'.format(
        tests_nbr,
        len(resultats),
        args.threshold,
        pluriel1=("", "s")[tests_nbr > 1],
        pluriel2=("", "s")[len(resultats) > 1],
    ))
    for _, resultat in resultats:
        print("{:s}{sep}{:s}{sep}{:.2f}%{sep}{:d}".format(
            chemins[resultat['a']],
            chemins[resultat['b']],
            resultat['correlation'] * 100,
            resultat['offset'],
            sep=args.separator
        ))


if __name__ == "__main__":
    main()