scripts / compare /
Newer Older
328 lines | 11.283kb
ajout d'un script pour compa...
Sébastien MARQUE authored on 2021-12-14
1
#!/usr/bin/env python3
2
# basé sur l'idée de Shivam Aggarwal sur https://shivama205.medium.com/audio-signals-comparison-23e431ed2207
3
# WTFL
4

            
5
import argparse
6
import subprocess 
7
import numpy 
8
import os
9
import sys
10
import time
11
import multiprocessing
12

            
13
def initialize():
    """Parse the command line and collect the audio files to compare.

    Options are read from ``sys.argv`` with :mod:`argparse`.  Each value
    given to ``-i/--source-file`` may be a file or a directory; directories
    are walked recursively.  Paths that are neither an existing file nor an
    existing directory are ignored.

    Returns:
        tuple: ``(source_files, args)`` where ``source_files`` is the list of
        distinct file paths in discovery order and ``args`` is the
        ``argparse.Namespace`` holding the parsed options.  ``source_files``
        is empty when no ``-i`` option was given.
    """
    # os.cpu_count() returns None when the count cannot be determined.
    cpu_total = os.cpu_count() or 1

    defaults = {
        'sample_time': 500,  # seconds to sample audio file for fingerprint calculation
        'span': 150,         # number of points to scan cross correlation over
        'step': 1,           # step size (in points) of cross correlation
        'min_overlap': 20,   # minimum number of points that must overlap in cross correlation
        'threshold': 80,     # %
        'processor': cpu_total,
        'separator': ';',
    }

    def check_nproc(arg):
        """argparse type: an integer between 1 and the number of CPUs."""
        try:
            n = int(arg)
        except ValueError:
            raise argparse.ArgumentTypeError("il faut un nombre entier")
        if n < 1 or n > cpu_total:
            raise argparse.ArgumentTypeError(
                "{} n'est pas compris entre 1 et {:d}".format(n, cpu_total))
        return n

    def check_threshold(arg):
        """argparse type: a number between 0 and 100 inclusive."""
        try:
            n = float(arg)
        except ValueError:
            raise argparse.ArgumentTypeError("il faut un nombre")
        # Written as a chained comparison so that NaN is rejected as well.
        if not 0 <= n <= 100:
            raise argparse.ArgumentTypeError(
                "{} n'est pas compris entre 0 et 100 inclus".format(n))
        return n

    def parse_input_files(input_file, source_files):
        """Record ``input_file`` (a path or a nested list of paths).

        ``source_files`` is a dict used as an insertion-ordered set, so the
        same path given twice is only kept once.
        """
        if isinstance(input_file, list):
            for entry in input_file:
                parse_input_files(entry, source_files)
        elif os.path.isfile(input_file):
            source_files[input_file] = 1
        elif os.path.isdir(input_file):
            for root, _dirs, files in os.walk(input_file):
                for name in files:
                    parse_input_files(os.path.join(root, name), source_files)

    parser = argparse.ArgumentParser(__file__)
    # Short options are registered without a trailing space so that they
    # match exactly and are displayed correctly in the help text.
    parser.add_argument(
        "-i", "--source-file",
        action='append',
        nargs='+',
        help="répertoire ou fichier",
    )
    parser.add_argument(
        "-t", "--threshold",
        type=check_threshold,
        default=defaults['threshold'],
        help="seuil en pourcentage sous lequel il est considéré qu'il n'y a pas de corrélation (défaut: %(default)d)",
    )
    parser.add_argument(
        "-p", "--processor",
        type=check_nproc,
        default=defaults['processor'],
        help="le nombre de processus parallèles lancés (défaut: %(default)d)",
    )
    parser.add_argument(
        "--sample-time",
        type=int,
        default=defaults['sample_time'],
        help="seconds to sample audio file for fpcalc (défaut: %(default)d)",
    )
    parser.add_argument(
        "--span",
        type=int,
        default=defaults['span'],
        help="finesse en points pour scanner la corrélation (défaut: %(default)d)",
    )
    parser.add_argument(
        "--step",
        type=int,
        default=defaults['step'],
        help="valeur du pas en points de corrélation (défaut: %(default)d)",
    )
    parser.add_argument(
        "--min-overlap",
        type=int,
        default=defaults['min_overlap'],
        help="nombre minimal de points de correspondance (défaut %(default)d)",
    )
    parser.add_argument(
        "--separator",
        type=str,
        default=defaults['separator'],
        help="séparateur des champs de résultat (défaut '%(default)s')",
    )

    args = parser.parse_args()

    source_files = {}
    # With action='append', the attribute stays None when -i is never given.
    for f in args.source_file or []:
        parse_input_files(f, source_files)

    return list(source_files), args
ajout d'un script pour compa...
Sébastien MARQUE authored on 2021-12-14
104
  
105
def prime(i, primes):
    """Register ``i`` in ``primes`` unless a known prime divides it.

    Args:
        i: candidate integer.
        primes: set of already known primes; ``i`` is added to it when no
            other member divides ``i``.

    Returns:
        ``i`` when it was accepted, ``False`` otherwise.
    """
    divisible = any(i != known and i % known == 0 for known in primes)
    if divisible:
        return False
    primes.add(i)
    return i
111

            
112
def nPrimes(n):
    """Return the set of the first ``n`` prime numbers.

    Args:
        n: how many primes are wanted.

    Returns:
        set: the ``n`` smallest primes; an empty set when ``n <= 0``
        (instead of looping forever).
    """
    found = []  # kept in increasing order so the square-root cutoff works
    candidate = 2
    while len(found) < n:
        is_prime = True
        for p in found:
            if p * p > candidate:
                # No divisor up to sqrt(candidate): candidate is prime.
                break
            if candidate % p == 0:
                is_prime = False
                break
        if is_prime:
            found.append(candidate)
        candidate += 1
    return set(found)
121

            
122
def getPrimes(n, ids):
    """Split ``n`` into a factor taken from ``ids`` and its cofactor.

    Args:
        n: integer to factor.
        ids: iterable of candidate divisors, tried in order.

    Returns:
        tuple: ``(a, b)`` with ``a`` the first element of ``ids`` dividing
        ``n`` and ``b == n // a``; ``(0, 0)`` when no element divides ``n``.
    """
    for candidate in ids:
        if n % candidate == 0:
            # Integer division keeps the cofactor exact; float division
            # loses precision once n exceeds 2**53.
            return candidate, n // candidate
    return 0, 0
ajout d'un script pour compa...
Sébastien MARQUE authored on 2021-12-14
131

            
132
# calculate fingerprint
def calculate_fingerprints(filename):
    """Run the external ``fpcalc`` tool and return the raw fingerprint text.

    The command is ``fpcalc -raw -length <args.sample_time> <filename>``.
    It is started without a shell, so file names containing quotes, ``$``
    or backticks are passed through verbatim.

    Args:
        filename: path of the audio file to fingerprint.

    Returns:
        str: everything that follows ``FINGERPRINT=`` in the tool output,
        stripped of surrounding whitespace.

    Raises:
        ValueError: the output contains no ``FINGERPRINT=`` marker.
        OSError: ``fpcalc`` could not be started.
    """
    completed = subprocess.run(
        ['fpcalc', '-raw', '-length', str(args.sample_time), filename],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # same merged stream getoutput() gave
        universal_newlines=True,
    )
    output = completed.stdout
    marker = 'FINGERPRINT='
    position = output.find(marker)
    if position < 0:
        # Fail loudly rather than returning an arbitrary slice of the output.
        raise ValueError('no fingerprint in fpcalc output for {!r}: {}'.format(
            filename, output.strip()))
    return output[position + len(marker):].strip()
138
  
139
# returns correlation between lists
def correlation(listx, listy):
    """Return the share of identical bits between two integer sequences.

    Elements are compared pairwise over the length of the shorter input;
    each pair contributes 32 minus the number of differing bits.

    Args:
        listx, listy: non-empty sequences of integers.

    Returns:
        float: mean per-pair score divided by 32.

    Raises:
        Exception: either sequence is empty.
    """
    if not len(listx) or not len(listy):
        # Error checking in main program should prevent us from ever being
        # able to get here.
        raise Exception('Empty lists cannot be correlated.')

    # zip() stops at the shorter sequence, which is the truncation wanted.
    common = min(len(listx), len(listy))
    same_bits = sum(32 - bin(x ^ y).count("1") for x, y in zip(listx, listy))

    return same_bits / float(common) / 32
157
  
158
# return cross correlation, with listy offset from listx
def cross_correlation(listx, listy, offset):
    """Correlate ``listx`` and ``listy`` after shifting one against the other.

    A positive ``offset`` drops that many leading items of ``listx``; a
    negative one drops ``-offset`` leading items of ``listy``.  The other
    list is then cut to the same length.

    Returns:
        The value of ``correlation()`` on the aligned lists, or ``None``
        when fewer than ``args.min_overlap`` items overlap.
    """
    shift = abs(offset)
    if offset > 0:
        listx = listx[shift:]
        listy = listy[:len(listx)]
    elif offset < 0:
        listy = listy[shift:]
        listx = listx[:len(listy)]

    overlap = min(len(listx), len(listy))
    if overlap < args.min_overlap:
        # Error checking in main program should prevent us from ever being
        # able to get here.
        return None

    return correlation(listx, listy)
173
  
174
# cross correlate listx and listy with offsets from -span to span
def compare(listx, listy, span, step):
    """Return the cross correlation at every offset in ``[-span, span]``.

    Args:
        listx, listy: fingerprint sequences.
        span: largest offset (in points) tried in each direction.
        step: distance between two consecutive offsets.

    Returns:
        list: one ``cross_correlation()`` result per offset, in increasing
        offset order.

    Raises:
        Exception: ``span`` is larger than the shorter sequence.
    """
    shortest = min(len(listx), len(listy))
    if span > shortest:
        # Error checking in main program should prevent us from ever being
        # able to get here.
        raise Exception('span >= sample size: %i >= %i\n'
                        % (span, shortest)
                        + 'Reduce span, reduce crop or increase sample_time.')
    return [
        cross_correlation(listx, listy, offset)
        for offset in numpy.arange(-span, span + 1, step)
    ]
186
  
187
def get_max_corr(corr, source, target):
    """Pick the best correlation and report it if it reaches the threshold.

    Args:
        corr: correlation values, one per offset, ordered from ``-args.span``
            in increments of ``args.step``.  ``None`` entries (offsets that
            could not be evaluated) are skipped instead of making ``max()``
            fail.
        source, target: unused; kept for interface compatibility.

    Returns:
        tuple: ``(correlation, offset)`` for the first highest value when
        ``correlation * 100 >= args.threshold``; ``None`` otherwise, or when
        ``corr`` holds no usable value.
    """
    best_index = None
    for index, value in enumerate(corr):
        if value is None:
            continue
        # Strict comparison keeps the first occurrence of the maximum.
        if best_index is None or value > corr[best_index]:
            best_index = index

    if best_index is None:
        return None

    best_offset = -args.span + best_index * args.step
    # report matches
    if corr[best_index] * 100 >= args.threshold:
        return corr[best_index], best_offset
    return None
193

            
194
def correlate(source, target):
    """Cross-correlate two fingerprints using the global options.

    Runs ``compare()`` with ``args.span`` and ``args.step`` and hands the
    result to ``get_max_corr()``, whose return value is passed through.
    """
    return get_max_corr(
        compare(source, target, args.span, args.step),
        source,
        target,
    )
197

            
198
def get_tests_nbr(n):
    """Return, as a float, the number of unordered pairs among ``n`` items."""
    ordered_pairs = n * (n - 1)
    return ordered_pairs / 2
200

            
201
def get_ETA(start, total, done):
    """Estimate when a job will finish, assuming a constant rate.

    Args:
        start: ``time.time()`` value taken when the job started.
        total: total number of work items.
        done: number of items completed so far.

    Returns:
        str: the estimated completion time formatted by ``time.ctime()``,
        or ``'?'`` while nothing has been completed yet (no rate can be
        computed, and dividing by zero is avoided).
    """
    if done <= 0:
        return '?'
    now = time.time()
    per_item = (now - start) / done
    return time.ctime(now + per_item * (total - done))
204

            
205
def eprint(*values, **options):
    """Print ``values`` on standard error; ``options`` are passed to print()."""
    stream = sys.stderr
    print(*values, file=stream, **options)
207

            
208
def mp_calculate_fingerprints(key):
    """Pool worker: compute and store the fingerprint of ``ziques[key]``.

    On success the entry is replaced by
    ``{'fingerprint': [int, ...], 'path': path}``.  On any failure the path
    is appended to ``erreurs`` and the entry is removed from ``ziques``.

    Args:
        key: key of the entry to process in the shared ``ziques`` mapping.
    """
    path = ziques[key]['path']  # fetched once: each access goes through the shared mapping
    try:
        raw = calculate_fingerprints(path)
        fingerprint = [int(value) for value in raw.split(',')]
    except Exception:
        # Best effort: a file that cannot be fingerprinted is reported and
        # dropped.  KeyboardInterrupt/SystemExit are deliberately not caught.
        erreurs.append(path)
        del ziques[key]
    else:
        ziques[key] = {
            'fingerprint': fingerprint,
            'path': path,
        }
218

            
219
def mp_correlate(key):
    """Pool worker: correlate the pair stored under ``comparaison[key]``.

    When ``correlate()`` returns a ``(correlation, offset)`` tuple the entry
    is completed with the ``'correlation'`` and ``'offset'`` fields.  When it
    returns ``None`` or raises, the entry is removed from ``comparaison``.

    Args:
        key: key of the pair to process in the shared ``comparaison`` mapping.
    """
    pair = comparaison[key]  # fetched once: each access goes through the shared mapping
    try:
        result = correlate(
            ziques[pair['a']]['fingerprint'],
            ziques[pair['b']]['fingerprint'])
    except Exception:
        # Best effort: a pair that cannot be compared is simply discarded.
        result = None

    if result is None:
        del comparaison[key]
        return

    score, offset = result
    comparaison[key] = {
        'a': pair['a'],
        'b': pair['b'],
        'correlation': score,
        'offset': offset,
    }
233

            
234

            
235
if __name__ == "__main__":
    # args, ziques, comparaison and erreurs are module-level names read as
    # globals by the worker functions of this file.
    # NOTE(review): pool workers only inherit them with the "fork" start
    # method -- confirm before running on a platform that uses "spawn".
    source_files, args = initialize()

    if len(source_files) < 2:
        print("au moins deux fichiers sont nécessaires")
        sys.exit()

    # Each file is identified by a distinct prime; the product of two ids
    # is then used as the key of the corresponding pair.
    ids = list(nPrimes(len(source_files)))
    total_ids = len(ids)

    manager = multiprocessing.Manager()
    ziques = manager.dict()
    comparaison = manager.dict()
    erreurs = manager.list()
    pool = multiprocessing.Pool(args.processor)

    ziques.update({
        file_id: {'path': path}
        for file_id, path in zip(ids, source_files)
    })

    del source_files

    # --- step 1: fingerprints -------------------------------------------
    nb_erreurs = len(erreurs)
    start = time.time()
    for i, _ in enumerate(pool.imap_unordered(mp_calculate_fingerprints, ziques.keys()), 1):
        nb_erreurs = len(erreurs)
        print('calcul des empreintes{:s}: {:.1f}% (ETA {:s})'.format(
                    ("", " (" + str(nb_erreurs) + " erreur{})".format(("", "s")[nb_erreurs > 1]))[nb_erreurs > 0],
                    i / total_ids * 100,
                    get_ETA(start, total_ids, i)),
                end='\r')
    sys.stdout.write("\033[K")  # clear line
    print('calcul des empreintes terminé ({:d} fichiers traités{:s})'.format(
                len(ziques),
                ("", " et " + str(nb_erreurs) + " erreur{}".format(("", "s")[nb_erreurs > 1]))[nb_erreurs > 0]))

    if len(erreurs):
        print("Fichier{} en erreur:".format(("", "s")[len(erreurs) > 1]))
        for k in erreurs:
            print(k)
        print()

    erreurs[:] = []  # empty the error list

    # --- step 2: list of pairs to compare ---------------------------------
    # The pairs are built in a local dict and published in one call, rather
    # than querying the shared mapping twice per pair.
    keys = ziques.keys()
    nb_tests = get_tests_nbr(len(keys))
    pairs = {}
    done = 0

    start = time.time()
    for a in keys:
        for b in keys:
            id_correl = a * b
            if a == b or id_correl in pairs:
                continue
            pairs[id_correl] = {
                'a': a,
                'b': b
            }
            done += 1
        # done > 0 implies nb_tests > 0; with a single remaining file there
        # is no progress to show and nothing to divide by.
        if done:
            print("construction liste: {:.1f}% (ETA {:s})".format(
                        done / nb_tests * 100,
                        get_ETA(start, nb_tests, done)),
                    end='\r')
    comparaison.update(pairs)
    sys.stdout.write("\033[K")  # clear line

    tests_nbr = len(pairs)

    # --- step 3: correlations ----------------------------------------------
    start = time.time()
    for i, _ in enumerate(pool.imap_unordered(mp_correlate, list(pairs)), 1):
        # Workers delete the pairs below the threshold, so the number of
        # matches so far is what remains minus what is still pending.
        found = len(comparaison) + i - tests_nbr
        print('{:s} corrélation{pluriel:s} trouvée{pluriel:s}: {:.1f}% (ETA {:s}){:s}'.format(
                    ("aucune", str(found))[found > 0],
                    i / tests_nbr * 100,
                    get_ETA(start, tests_nbr, i),
                    '      ',
                    pluriel = ("", "s")[found > 1]),
                end='\r')

    # All the work has been submitted and consumed: release the workers.
    pool.close()
    pool.join()

    nb_found = len(comparaison)
    sys.stdout.write("\033[K")  # clear line
    print('comparaison terminée:\n{0:d} comparaison{pluriel1} effectuée{pluriel1}\n{1} corrélation{pluriel2} trouvée{pluriel2} (seuil {2}%)'.format(
                tests_nbr,
                nb_found,
                args.threshold,
                pluriel1=("", "s")[tests_nbr > 1],
                pluriel2=("", "s")[nb_found > 1],
                ))

    # --- results --------------------------------------------------------------
    paths = {file_id: entry['path'] for file_id, entry in ziques.items()}
    for k, match in comparaison.items():
        print("{:s}{sep}{:s}{sep}{:.2f}%{sep}{:d}".format(
                    paths[match['a']],
                    paths[match['b']],
                    match['correlation'] * 100,
                    match['offset'],
                    sep = args.separator
                ))