ajout d'un script pour compa...
|
1 |
#!/usr/bin/env python3 |
2 |
# basé sur l'idée de Shivam Aggarwal sur https://shivama205.medium.com/audio-signals-comparison-23e431ed2207 |
|
3 |
# WTFL |
|
4 | ||
5 |
import argparse |
|
6 |
import subprocess |
|
7 |
import numpy |
|
8 |
import os |
|
9 |
import sys |
|
10 |
import time |
|
11 |
import multiprocessing |
|
12 | ||
13 |
def initialize():
    """Parse the command line and collect the files to compare.

    Returns:
        A ``(source_files, args)`` tuple. ``source_files`` is the list of
        distinct file paths gathered from the ``-i/--source-file`` values
        (directories are walked recursively); ``args`` is the
        ``argparse.Namespace`` holding every parsed option.
    """
    # os.cpu_count() returns None when the count cannot be determined.
    max_processes = os.cpu_count() or 1

    defaults = {
        'sample_time' : 500,  # seconds to sample audio file for fingerprint calculation
        'span' : 150,         # number of points to scan cross correlation over
        'step' : 1,           # step size (in points) of cross correlation
        'min_overlap' : 20,   # minimum number of points that must overlap in cross correlation
                              # exception is raised if this cannot be met
        'threshold' : 80,     # %
        'processor' : max_processes,
        'separator' : ';'
    }

    def check_nproc(arg):
        """argparse type: an integer between 1 and the CPU count."""
        try:
            n = int(arg)
        except ValueError:
            raise argparse.ArgumentTypeError("il faut un nombre entier")
        if n < 1 or n > max_processes:
            raise argparse.ArgumentTypeError("{} n'est pas compris entre 1 et {:d}".format(n, max_processes))
        return n

    def check_threshold(arg):
        """argparse type: a number between 0 and 100 inclusive."""
        try:
            n = float(arg)
        except ValueError:
            raise argparse.ArgumentTypeError("il faut un nombre")
        if n < 0 or n > 100:
            raise argparse.ArgumentTypeError("{} n'est pas compris entre 0 et 100 inclus".format(n))
        return n

    def parse_input_files(input_file, source_files):
        """Record every file reachable from *input_file* as a key of *source_files*.

        *input_file* may be a (possibly nested) list, a file path or a
        directory path; anything else is ignored.
        """
        if isinstance(input_file, list):
            for f in input_file:
                parse_input_files(f, source_files)
        else:
            if os.path.isfile(input_file):
                # The dict is used as an insertion-ordered set of paths.
                source_files[input_file] = 1
            elif os.path.isdir(input_file):
                for root, _dirs, files in os.walk(input_file):
                    for f in files:
                        parse_input_files(os.path.join(root, f), source_files)

    parser = argparse.ArgumentParser(__file__)
    # Short flags are declared without the trailing space so that "-i",
    # "-t" and "-p" are exact option strings rather than abbreviation matches.
    parser.add_argument("-i", "--source-file",
        action = 'append',
        nargs = '+',
        help = "répertoire ou fichier"
    )
    parser.add_argument("-t", "--threshold",
        type = check_threshold,
        default = defaults['threshold'],
        help = "seuil en pourcentage sous lequel il est considéré qu'il n'y a pas de corrélation (défaut: %(default)d)"
    )
    parser.add_argument("-p", "--processor",
        type = check_nproc,
        default = defaults['processor'],
        help = "le nombre de processus parallèles lancés (défaut: %(default)d)"
    )
    parser.add_argument("--sample-time",
        type = int,
        default = defaults['sample_time'],
        help = "seconds to sample audio file for fpcalc (défaut: %(default)d)"
    )
    parser.add_argument("--span",
        type = int,
        default = defaults['span'],
        help = "finesse en points pour scanner la corrélation (défaut: %(default)d)"
    )
    parser.add_argument("--step",
        type = int,
        default = defaults['step'],
        help = "valeur du pas en points de corrélation (défaut: %(default)d)"
    )
    parser.add_argument("--min-overlap",
        type = int,
        default = defaults['min_overlap'],
        help = "nombre minimal de points de correspondance (défaut %(default)d)"
    )
    parser.add_argument("--separator",
        type = str,
        default = defaults['separator'],
        help = "séparateur des champs de résultat (défaut '%(default)s')"
    )

    args = parser.parse_args()

    source_files = {}
    # args.source_file is None when -i was never given; treat it as "no input"
    # so the caller's own "at least two files" check can report the problem.
    for f in args.source_file or []:
        parse_input_files(f, source_files)

    return list(source_files.keys()), args
|
ajout d'un script pour compa...
|
104 |
|
105 |
def prime(i, primes):
    """Register *i* in *primes* unless a different member divides it.

    Returns ``False`` when some element of *primes* other than *i* itself
    divides *i*; otherwise adds *i* to the set and returns *i*.
    """
    if any(i != known and i % known == 0 for known in primes):
        return False
    primes.add(i)
    return i
|
111 | ||
112 |
def nPrimes(n):
    """Return a set holding the first *n* primes (for n >= 1).

    Candidates are tried in increasing order from 2 and validated with
    ``prime``, which also inserts accepted values into the set.
    """
    found = {2}
    candidate = 2
    accepted = 0
    while True:
        if prime(candidate, found):
            accepted += 1
            if accepted == n:
                return found
        candidate += 1
|
121 | ||
122 |
def getPrimes(n, ids):
    """Split *n* into a divisor taken from *ids* and the matching cofactor.

    Args:
        n: the number to split.
        ids: iterable of candidate divisors, tried in iteration order.

    Returns:
        ``(a, b)`` where ``a`` is the first element of *ids* dividing *n*
        and ``b`` is ``n`` divided by ``a``; ``(0, 0)`` when no element
        divides *n*.
    """
    for candidate in ids:
        if n % candidate == 0:
            # Floor division stays exact for arbitrarily large integers,
            # whereas true division goes through a float and loses
            # precision above 2**53.
            return candidate, int(n // candidate)
    return 0, 0
|
133 | ||
134 |
# calculate fingerprint
def calculate_fingerprints(filename):
    """Run ``fpcalc`` on *filename* and return the raw fingerprint text.

    The command is ``fpcalc -raw -length <args.sample_time> <filename>``;
    ``args`` is the module-level namespace of parsed options.

    Returns:
        Everything that follows the ``FINGERPRINT=`` marker in the command
        output (standard output and standard error combined).

    Raises:
        ValueError: if the output contains no ``FINGERPRINT=`` marker.
        OSError: if the ``fpcalc`` executable cannot be started.
    """
    # An argument list (no shell) keeps file names containing quotes, ``$``
    # or backticks from being interpreted or breaking the command line.
    completed = subprocess.run(
        ['fpcalc', '-raw', '-length', str(args.sample_time), filename],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True,
    )
    fpcalc_out = completed.stdout
    # Same trimming as subprocess.getoutput: drop one trailing newline.
    if fpcalc_out.endswith('\n'):
        fpcalc_out = fpcalc_out[:-1]

    marker = 'FINGERPRINT='
    marker_index = fpcalc_out.find(marker)
    if marker_index < 0:
        # Without this check the slice below would return an arbitrary
        # chunk of the error output.
        raise ValueError('no fingerprint in fpcalc output for "{}"'.format(filename))

    return fpcalc_out[marker_index + len(marker):]
|
140 |
|
|
141 |
# returns correlation between lists
def correlation(listx, listy):
    """Return the fraction of matching bits between two integer lists.

    The longer list is truncated to the length of the shorter one. Each
    pair of values scores ``32`` minus the number of ``1`` digits in the
    binary form of their XOR; the scores are averaged and divided by 32.

    Raises:
        Exception: if either list is empty.
    """
    if not len(listx) or not len(listy):
        # Error checking in main program should prevent us from ever being
        # able to get here.
        raise Exception('Empty lists cannot be correlated.')

    overlap = min(len(listx), len(listy))
    # zip() stops at the shorter list, which performs the truncation.
    matching_bits = sum(
        32 - bin(x ^ y).count("1") for x, y in zip(listx, listy)
    )
    average = matching_bits / float(overlap)

    return average / 32
|
159 |
|
|
160 |
# return cross correlation, with listy offset from listx
def cross_correlation(listx, listy, offset):
    """Correlate *listx* and *listy* after shifting one against the other.

    A positive *offset* drops that many leading items from *listx*; a
    negative one drops ``-offset`` leading items from *listy*. The other
    list is cut to the same length.

    Returns:
        The value of ``correlation`` on the aligned lists, or ``None`` when
        the shorter of the two holds fewer than ``args.min_overlap`` items.
    """
    if offset > 0:
        shifted_x = listx[offset:]
        shifted_y = listy[:len(shifted_x)]
    elif offset < 0:
        shifted_y = listy[-offset:]
        shifted_x = listx[:len(shifted_y)]
    else:
        shifted_x, shifted_y = listx, listy

    if min(len(shifted_x), len(shifted_y)) < args.min_overlap:
        # Error checking in main program should prevent us from ever being
        # able to get here.
        return None

    return correlation(shifted_x, shifted_y)
|
175 |
|
|
176 |
# cross correlate listx and listy with offsets from -span to span
def compare(listx, listy, span, step):
    """Return the cross correlations for every offset in ``[-span, span]``.

    Offsets advance by *step*; the result list holds one entry per offset,
    in increasing offset order, as returned by ``cross_correlation``.

    Raises:
        Exception: if *span* is larger than the shorter input.
    """
    shortest = min(len(list(listx)), len(list(listy)))
    if span > shortest:
        # Error checking in main program should prevent us from ever being
        # able to get here.
        raise Exception('span >= sample size: %i >= %i\n' % (span, shortest)
                        + 'Reduce span, reduce crop or increase sample_time.')
    return [
        cross_correlation(listx, listy, offset)
        for offset in numpy.arange(-span, span + 1, step)
    ]
|
188 |
|
|
189 |
def get_max_corr(corr, source, target):
    """Pick the best correlation of *corr* and report it if high enough.

    Returns:
        ``(value, offset)`` for the first maximum of *corr*, where
        ``offset`` is ``-args.span + index * args.step``, when the value
        expressed as a percentage reaches ``args.threshold``; ``None``
        otherwise. *source* and *target* are not used.
    """
    best_index = corr.index(max(corr))
    best_value = corr[best_index]
    # report matches
    if best_value * 100 >= args.threshold:
        return best_value, -args.span + best_index * args.step
    return None
|
195 | ||
196 |
def correlate(source, target):
    """Compare two fingerprints and return the result of ``get_max_corr``.

    The scan uses ``args.span`` and ``args.step`` from the parsed options.
    """
    return get_max_corr(
        compare(source, target, args.span, args.step),
        source,
        target,
    )
|
199 | ||
200 |
def get_tests_nbr(n):
    """Return ``n * n - n * (n + 1) / 2`` (a float because of the division).

    For n items this is the number of unordered pairs of distinct items.
    """
    all_ordered_pairs = n * n
    pairs_on_or_above_diagonal = n * (n + 1) / 2
    return all_ordered_pairs - pairs_on_or_above_diagonal
|
202 | ||
203 |
def get_ETA(start, total, done):
    """Return the estimated completion time as a ``time.ctime`` string.

    The time spent since *start* is divided by *done* to get an average
    duration per item, which is then applied to the ``total - done`` items
    left. *done* must not be zero.
    """
    now = time.time()
    seconds_per_item = (now - start) / done
    seconds_left = seconds_per_item * (total - done)
    return time.ctime(now + seconds_left)
|
206 | ||
207 |
def eprint(*messages, **options):
    """Call ``print`` with the given arguments, writing to ``sys.stderr``."""
    print(*messages, file=sys.stderr, **options)
|
209 | ||
210 |
def mp_calculate_fingerprints(key):
    """Pool worker: compute and store the fingerprint of entry *key*.

    Reads the file path from the shared ``ziques`` mapping, runs
    ``calculate_fingerprints`` on it and stores the comma-separated result
    as a list of integers next to the path. If that fails, the path is
    appended to the shared ``erreurs`` list and the entry is removed from
    ``ziques``.
    """
    path = ziques[key]['path']
    try:
        raw_fingerprint = calculate_fingerprints(path)
        fingerprint = [int(value) for value in raw_fingerprint.split(',')]
    except Exception:
        # Exception rather than a bare except, so KeyboardInterrupt and
        # SystemExit still propagate out of the worker.
        erreurs.append(path)
        del ziques[key]
    else:
        ziques[key] = {
            'fingerprint': fingerprint,
            'path': path
        }
|
220 | ||
221 |
def mp_correlate(key):
    """Pool worker: correlate the pair stored under *key*.

    Looks up the two fingerprints named by ``comparaison[key]`` in the
    shared ``ziques`` mapping and runs ``correlate`` on them. On success
    the entry gains ``'correlation'`` and ``'offset'`` fields. When
    ``correlate`` returns ``None`` (no match reaching the threshold) or
    raises, the entry is removed from ``comparaison``.
    """
    pair = comparaison[key]
    try:
        result = correlate(
            ziques[pair['a']]['fingerprint'],
            ziques[pair['b']]['fingerprint'])
    except Exception:
        # Exception rather than a bare except, so KeyboardInterrupt and
        # SystemExit still propagate out of the worker.
        result = None

    if result is None:
        del comparaison[key]
        return

    c, o = result
    comparaison[key] = {
        'a': pair['a'],
        'b': pair['b'],
        'correlation': c,
        'offset': o
    }
|
235 | ||
236 | ||
237 |
if __name__ == "__main__":
    # Script flow: parse options, fingerprint every file in parallel, build
    # the list of file pairs, correlate every pair in parallel, then print
    # the pairs that were kept.
    import itertools

    # args, ziques, comparaison and erreurs are read as module globals by the
    # worker functions, so they are all assigned before the pool is created.
    # NOTE(review): this appears to rely on the "fork" start method; confirm
    # before running on a platform that spawns workers.
    source_files, args = initialize()

    if len(source_files) < 2:
        print("au moins deux fichiers sont nécessaires")
        sys.exit()

    # Each file is identified by a distinct prime, so the product of two
    # identifiers names an unordered pair of files uniquely.
    ids = list(nPrimes(len(source_files)))
    total_ids = len(ids)

    manager = multiprocessing.Manager()
    ziques = manager.dict()
    comparaison = manager.dict()
    erreurs = manager.list()

    # The context manager stops the worker processes once every result has
    # been consumed, or as soon as an error interrupts the run.
    with multiprocessing.Pool(args.processor) as pool:
        for file_id, path in zip(ids, source_files):
            ziques[file_id] = { 'path': path }

        del source_files

        nb_erreurs = len(erreurs)
        start = time.time()
        for i, _ in enumerate(pool.imap_unordered(mp_calculate_fingerprints, ziques.keys()), 1):
            nb_erreurs = len(erreurs)
            print('calcul des empreintes{:s}: {:.1f}% (ETA {:s})'.format(
                ("", " (" + str(nb_erreurs) + " erreur{})".format(("", "s")[nb_erreurs > 1]))[nb_erreurs > 0],
                i / total_ids * 100,
                get_ETA(start, total_ids, i)),
                end='\r')
        sys.stdout.write("\033[K") #clear line
        print('calcul des empreintes terminé ({:d} fichiers traités{:s})'.format(
            len(ziques),
            ("", " et " + str(nb_erreurs) + " erreur{}".format(("", "s")[nb_erreurs > 1]))[nb_erreurs > 0]))

        if len(erreurs):
            print("Fichier{} en erreur:".format(("", "s")[len(erreurs) > 1]))
            for k in erreurs:
                print(k)
            print()

        erreurs[:] = [] # empty the error list
        nb_erreurs = len(erreurs)
        nb_tests = get_tests_nbr(len(ziques))
        done = 0

        start = time.time()
        # combinations() yields each unordered pair exactly once, in the same
        # order as the former nested scan; the pairs are gathered locally and
        # sent to the shared dict in a single call.
        pairs = {}
        for a, b in itertools.combinations(ziques.keys(), 2):
            pairs[a * b] = {
                'a': a,
                'b': b
            }
            done += 1
            print("construction liste: {:.1f}% (ETA {:s})".format(
                done / nb_tests * 100,
                get_ETA(start, nb_tests, done)),
                end='\r')
        comparaison.update(pairs)
        sys.stdout.write("\033[K") #clear line

        tests_nbr = len(comparaison)

        start = time.time()
        for i, _ in enumerate(pool.imap_unordered(mp_correlate, comparaison.keys()), 1):
            print('comparaisons: {:.1f}% (ETA {:s})'.format(
                i / tests_nbr * 100,
                get_ETA(start, tests_nbr, i)),
                end='\r')

    sys.stdout.write("\033[K") #clear line
    # Workers delete the pairs they reject, so what is left in comparaison
    # is the set of correlations found.
    print('comparaison terminée:\n{0:d} comparaison{pluriel1} effectuée{pluriel1}\n{1} corrélation{pluriel2} trouvée{pluriel2} (seuil {2}%)'.format(
        tests_nbr,
        len(comparaison),
        args.threshold,
        pluriel1=("", "s")[tests_nbr > 1],
        pluriel2=("", "s")[len(comparaison) > 1],
        ))

    for k in comparaison.keys():
        print("{:s}{sep}{:s}{sep}{:.2f}%{sep}{:d}".format(
            ziques[comparaison[k]['a']]['path'],
            ziques[comparaison[k]['b']]['path'],
            comparaison[k]['correlation'] * 100,
            comparaison[k]['offset'],
            sep = args.separator
            ))