scripts / compare /
Newer Older
327 lines | 11.135kb
ajout d'un script pour compa...
Sébastien MARQUE authored on 2021-12-14
1
#!/usr/bin/env python3
2
# basé sur l'idée de Shivam Aggarwal sur https://shivama205.medium.com/audio-signals-comparison-23e431ed2207
3
# WTFL
4

            
5
import argparse
6
import subprocess 
7
import numpy 
8
import os
9
import sys
10
import time
11
import multiprocessing
12

            
13
def initialize():
    """Parse the command line and collect the files to compare.

    Returns:
        A ``(files, args)`` tuple. ``files`` is the list of distinct file
        paths gathered from the ``-i/--source-file`` arguments (directories
        are walked recursively); it is empty when no ``-i`` option is given.
        ``args`` is the ``argparse.Namespace`` holding every parsed option.
    """
    # os.cpu_count() returns None when the count cannot be determined.
    max_procs = os.cpu_count() or 1

    defaults = {
        'sample_time' : 500, # seconds to sample audio file for fingerprint calculation
        'span'        : 150, # number of points to scan cross correlation over
        'step'        : 1,   # step size (in points) of cross correlation
        'min_overlap' : 20,  # minimum number of points that must overlap in cross correlation
        'threshold'   : 80,  # %
        'processor'   : max_procs,
        'separator'   : ';'
    }

    def check_nproc(arg):
        # argparse "type" callback: integer between 1 and the CPU count.
        try:
            n = int(arg)
        except ValueError:
            raise argparse.ArgumentTypeError("il faut un nombre entier")
        if n < 1 or n > max_procs:
            raise argparse.ArgumentTypeError("{} n'est pas compris entre 1 et {:d}".format(n, max_procs))
        return n

    def check_threshold(arg):
        # argparse "type" callback: number between 0 and 100 inclusive.
        try:
            n = float(arg)
        except ValueError:
            raise argparse.ArgumentTypeError("il faut un nombre")
        if n < 0 or n > 100:
            raise argparse.ArgumentTypeError("{} n'est pas compris entre 0 et 100 inclus".format(n))
        return n

    def parse_input_files(input_file, source_files):
        # `source_files` is a dict used as an insertion-ordered set of paths.
        if isinstance(input_file, list):
            # action='append' combined with nargs='+' yields a list of lists.
            for f in input_file:
                parse_input_files(f, source_files)
        elif os.path.isfile(input_file):
            source_files[input_file] = 1
        elif os.path.isdir(input_file):
            for root, _dirs, files in os.walk(input_file):
                for f in files:
                    parse_input_files(os.path.join(root, f), source_files)

    parser = argparse.ArgumentParser(__file__)
    # Short options are registered without a trailing space: "-i " only
    # matched "-i" through argparse's prefix matching and was shown as
    # "-i , --source-file" in the help output.
    parser.add_argument("-i", "--source-file",
            action   = 'append',
            nargs    = '+',
            help     = "répertoire ou fichier"
            )
    parser.add_argument("-t", "--threshold",
            type    = check_threshold,
            default = defaults['threshold'],
            help    = "seuil en pourcentage sous lequel il est considéré qu'il n'y a pas de corrélation (défaut: %(default)d)"
            )
    parser.add_argument("-p", "--processor",
            type    = check_nproc,
            default = defaults['processor'],
            help    = "le nombre de processus parallèles lancés (défaut: %(default)d)"
            )
    parser.add_argument("--sample-time",
            type    = int,
            default = defaults['sample_time'],
            help    = "seconds to sample audio file for fpcalc (défaut: %(default)d)"
            )
    parser.add_argument("--span",
            type    = int,
            default = defaults['span'],
            help    = "finesse en points pour scanner la corrélation (défaut: %(default)d)"
            )
    parser.add_argument("--step",
            type    = int,
            default = defaults['step'],
            help    = "valeur du pas en points de corrélation (défaut: %(default)d)"
            )
    parser.add_argument("--min-overlap",
            type    = int,
            default = defaults['min_overlap'],
            help    = "nombre minimal de points de correspondance (défaut %(default)d)"
            )
    parser.add_argument("--separator",
            type    = str,
            default = defaults['separator'],
            help    = "séparateur des champs de résultat (défaut '%(default)s')"
            )

    args = parser.parse_args()

    source_files = {}
    # args.source_file is None when no -i option was given.
    for f in args.source_file or []:
        parse_input_files(f, source_files)

    return list(source_files.keys()), args
ajout d'un script pour compa...
Sébastien MARQUE authored on 2021-12-14
104
  
105
def prime(i, primes):
    """Add ``i`` to ``primes`` unless another member of the set divides it.

    Returns ``False`` when some element of ``primes`` other than ``i``
    itself divides ``i`` evenly; otherwise adds ``i`` to the set (mutating
    it in place) and returns ``i``.
    """
    has_divisor = any(i != known and i % known == 0 for known in primes)
    if has_divisor:
        return False
    primes.add(i)
    return i
111

            
112
def nPrimes(n):
    """Return the set of the first ``n`` prime numbers.

    Args:
        n: how many primes to produce.

    Returns:
        A set holding the ``n`` smallest primes; an empty set when ``n`` is
        zero or negative (a counter that never reaches ``n`` would otherwise
        loop forever).
    """
    if n <= 0:
        return set()

    primes = {2}
    candidate = 3
    while len(primes) < n:
        # Trial division by every prime found so far.
        if all(candidate % known for known in primes):
            primes.add(candidate)
        candidate += 1
    return primes
121

            
122
def getPrimes(n, ids):
    """Split ``n`` into a factor taken from ``ids`` and its cofactor.

    Args:
        n: the number to split.
        ids: iterable of candidate factors, tried in iteration order.

    Returns:
        ``(a, b)`` where ``a`` is the first element of ``ids`` dividing
        ``n`` and ``b`` is ``n`` divided by ``a``; ``(0, 0)`` when no
        element of ``ids`` divides ``n``.
    """
    for candidate in ids:
        if n % candidate == 0:
            # Floor division keeps the cofactor exact; true division goes
            # through a float and loses precision above 2**53.
            return candidate, int(n // candidate)
    return 0, 0
133

            
134
# calculate fingerprint
135
def calculate_fingerprints(filename):
    """Run ``fpcalc -raw`` on ``filename`` and return the fingerprint text.

    The sampling length passed to ``fpcalc`` comes from the module-level
    ``args.sample_time``.

    Args:
        filename: path of the audio file to fingerprint.

    Returns:
        Everything ``fpcalc`` printed on standard output after the
        ``FINGERPRINT=`` marker, stripped of surrounding whitespace.

    Raises:
        ValueError: if the output contains no ``FINGERPRINT=`` marker.
        OSError: if ``fpcalc`` cannot be executed.
    """
    # An argument list with no shell passes the file name verbatim, so
    # quotes, '$' or backticks in a name cannot be interpreted as shell syntax.
    completed = subprocess.run(
        ['fpcalc', '-raw', '-length', str(args.sample_time), filename],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        universal_newlines=True,
    )
    _, marker, fingerprint = completed.stdout.partition('FINGERPRINT=')
    if not marker:
        # Fail explicitly instead of returning an arbitrary slice of the output.
        raise ValueError('fpcalc returned no fingerprint for {!r}'.format(filename))
    return fingerprint.strip()
140
  
141
# returns correlation between lists
142
def correlation(listx, listy):
    """Return the bitwise similarity of two integer lists, between 0 and 1.

    The longer list is truncated to the length of the shorter one. Each
    pair of values contributes 32 minus the number of set bits in their
    XOR; the average contribution is then divided by 32.
    (The constant 32 presumably reflects 32-bit fingerprint values — confirm.)

    Raises:
        Exception: if either list is empty.
    """
    if not len(listx) or not len(listy):
        # Error checking in main program should prevent us from ever being
        # able to get here.
        raise Exception('Empty lists cannot be correlated.')

    # zip() stops at the end of the shorter list, which is the truncation.
    compared = min(len(listx), len(listy))
    matching_bits = sum(32 - bin(x ^ y).count("1") for x, y in zip(listx, listy))

    return (matching_bits / float(compared)) / 32
159
  
160
# return cross correlation, with listy offset from listx
161
def cross_correlation(listx, listy, offset):
    """Correlate ``listx`` and ``listy`` with ``listy`` shifted by ``offset``.

    A positive offset drops the first ``offset`` items of ``listx``, a
    negative one drops the first ``-offset`` items of ``listy``; the other
    list is cut to the same length.

    Returns:
        The value of ``correlation()`` for the overlapping parts, or ``None``
        when the overlap is shorter than the module-level ``args.min_overlap``.
    """
    if offset > 0:
        listx = listx[offset:]
        listy = listy[:len(listx)]
    elif offset < 0:
        listy = listy[-offset:]
        listx = listx[:len(listy)]

    overlap = min(len(listx), len(listy))
    if overlap >= args.min_overlap:
        return correlation(listx, listy)
    # Not enough overlapping points to compute a meaningful value.
    return None
175
  
176
# cross correlate listx and listy with offsets from -span to span
177
def compare(listx, listy, span, step):
    """Cross-correlate two fingerprints over offsets from -span to span.

    Args:
        listx, listy: the two sequences to compare.
        span: largest offset (in points) tried in either direction.
        step: increment between two consecutive offsets.

    Returns:
        A list with one ``cross_correlation()`` result per offset, in
        increasing offset order.

    Raises:
        Exception: if ``span`` is larger than the shorter sequence.
    """
    # len() is taken directly: copying both sequences with list() just to
    # measure them was wasted work repeated for every pair.
    shortest = min(len(listx), len(listy))
    if span > shortest:
        # Error checking in main program should prevent us from ever being
        # able to get here.
        raise Exception('span >= sample size: %i >= %i\n'
                        % (span, shortest)
                        + 'Reduce span, reduce crop or increase sample_time.')
    return [cross_correlation(listx, listy, offset)
            for offset in numpy.arange(-span, span + 1, step)]
188
  
189
def get_max_corr(corr, source, target):
    """Pick the best correlation of a scan and report it if it is a match.

    Args:
        corr: correlation values, one per scanned offset, starting at offset
            ``-args.span`` and advancing by ``args.step``. Entries may be
            ``None`` (offsets without enough overlap); they are ignored
            rather than making ``max()`` fail on a float/None comparison.
        source, target: unused; kept for interface compatibility.

    Returns:
        ``(correlation, offset)`` for the first highest value when it
        reaches ``args.threshold`` percent, otherwise ``None`` (also when no
        usable value exists).
    """
    best_index = None
    for index, value in enumerate(corr):
        if value is None:
            continue
        # Strict comparison keeps the first occurrence of the maximum.
        if best_index is None or value > corr[best_index]:
            best_index = index

    if best_index is None:
        return None

    best = corr[best_index]
    # report matches
    if best * 100 >= args.threshold:
        return best, -args.span + best_index * args.step
    return None
195

            
196
def correlate(source, target):
    """Scan ``source`` against ``target`` and return the best match.

    The scan uses the module-level ``args.span`` and ``args.step``; the
    return value is whatever ``get_max_corr()`` yields for the resulting
    list of correlations.
    """
    return get_max_corr(
        compare(source, target, args.span, args.step),
        source,
        target,
    )
199

            
200
def get_tests_nbr(n):
    """Return ``n * n`` minus ``n * (n + 1) / 2``, as a float.

    For ``n`` items this is the number of unordered pairs of distinct items.
    """
    pairs_with_diagonal = n * (n + 1) / 2
    return n * n - pairs_with_diagonal
202

            
203
def get_ETA(start, total, done):
    """Return the estimated completion time as a ``time.ctime()`` string.

    The time elapsed since ``start`` is divided by ``done`` and multiplied
    by the number of items left (``total - done``).

    Raises:
        ZeroDivisionError: if ``done`` is zero.
    """
    current = time.time()
    elapsed = current - start
    remaining = elapsed / done * (total - done)
    return time.ctime(current + remaining)
206

            
207
def eprint(*args, **kwargs):
    """Print to standard error; takes the same arguments as ``print()``."""
    stream = sys.stderr
    print(*args, file=stream, **kwargs)
209

            
210
def mp_calculate_fingerprints(key):
    """Pool worker: compute and store the fingerprint of one file.

    Reads the file path from the module-level ``ziques[key]``. On success
    the entry is replaced by a dict holding both ``'fingerprint'`` (a list
    of ints) and ``'path'``. On failure the path is appended to the
    module-level ``erreurs`` list and the entry is removed from ``ziques``.

    Args:
        key: identifier of the file in ``ziques``.
    """
    # Fetched once: every access to the shared dict goes through its proxy.
    path = ziques[key]['path']
    try:
        raw = calculate_fingerprints(path)
        fingerprint = [int(value) for value in raw.split(',')]
    except Exception:
        # Best effort: an unreadable file is reported and left out.
        # `Exception` (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate.
        erreurs.append(path)
        del ziques[key]
    else:
        ziques[key] = {
            'fingerprint': fingerprint,
            'path': path
        }
220

            
221
def mp_correlate(key):
    """Pool worker: correlate one pair of fingerprints.

    Reads the pair ``comparaison[key]`` (keys ``'a'`` and ``'b'``, both
    identifiers in the module-level ``ziques``). When ``correlate()``
    returns a result, the entry is completed with ``'correlation'`` and
    ``'offset'``. When it returns ``None`` or fails, the entry is removed
    from ``comparaison``.

    Args:
        key: identifier of the pair in ``comparaison``.
    """
    pair = comparaison[key]
    try:
        result = correlate(
                ziques[pair['a']]['fingerprint'],
                ziques[pair['b']]['fingerprint'])
    except Exception:
        # A pair that cannot be compared is discarded like a non-match.
        # `Exception` (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate.
        result = None

    if result is None:
        # No match is tested explicitly instead of relying on the
        # TypeError raised by unpacking None.
        del comparaison[key]
        return

    c, o = result
    comparaison[key] = {
        'a': pair['a'],
        'b': pair['b'],
        'correlation': c,
        'offset': o
    }
235

            
236

            
237
if __name__ == "__main__":
    # `args`, `ziques`, `comparaison` and `erreurs` are read as module-level
    # globals by the pool worker functions, so they are all defined before
    # the pool is created.
    # NOTE(review): this presumably relies on the "fork" start method so that
    # workers inherit these globals — confirm before running on platforms
    # that spawn fresh interpreters.
    source_files, args = initialize()

    if len(source_files) < 2:
        print("au moins deux fichiers sont nécessaires")
        sys.exit()

    # Each file is identified by a distinct prime, so the product of two
    # identifiers identifies an unordered pair of files.
    ids = list(nPrimes(len(source_files)))
    total_ids = len(ids)

    manager = multiprocessing.Manager()
    ziques = manager.dict()
    comparaison = manager.dict()
    erreurs = manager.list()
    pool = multiprocessing.Pool(args.processor)

    for file_id, path in zip(ids, source_files):
        ziques[file_id] = { 'path': path }

    del source_files

    # --- step 1: fingerprints ---
    nb_erreurs = len(erreurs)
    start = time.time()
    for i, _ in enumerate(pool.imap_unordered(mp_calculate_fingerprints, ziques.keys()), 1):
        nb_erreurs = len(erreurs)
        print('calcul des empreintes{:s}: {:.1f}% (ETA {:s})'.format(
                    ("", " (" + str(nb_erreurs) + " erreur{})".format(("", "s")[nb_erreurs > 1]))[nb_erreurs > 0],
                    i / total_ids * 100,
                    get_ETA(start, total_ids, i)),
                end='\r')
    sys.stdout.write("\033[K") #clear line
    print('calcul des empreintes terminé ({:d} fichiers traités{:s})'.format(
                len(ziques),
                ("", " et " + str(nb_erreurs) + " erreur{}".format(("", "s")[nb_erreurs > 1]))[nb_erreurs > 0]))

    if len(erreurs):
        print("Fichier{} en erreur:".format(("", "s")[len(erreurs) > 1]))
        for k in erreurs:
            print(k)
        print()

    erreurs[:] = [] # empty the error list

    # --- step 2: list of pairs to compare ---
    nb_tests = get_tests_nbr(len(ziques))

    # The pairs are built in a local dict and pushed to the shared dict in
    # one call, instead of one proxy round trip per membership test and per
    # insertion.
    keys = list(ziques.keys())
    paires = {}
    start = time.time()
    for position, a in enumerate(keys):
        for b in keys[position + 1:]:
            paires[a * b] = {
                'a': a,
                'b': b
            }
        done = len(paires)
        # With fewer than two fingerprints there is no pair and nb_tests is
        # zero: skip the progress line rather than divide by zero.
        if done:
            print("construction liste: {:.1f}% (ETA {:s})".format(
                        done / nb_tests * 100,
                        get_ETA(start, nb_tests, done)),
                    end='\r')
    comparaison.update(paires)
    sys.stdout.write("\033[K") #clear line

    # --- step 3: comparisons ---
    tests_nbr = len(paires)

    start = time.time()
    for i, _ in enumerate(pool.imap_unordered(mp_correlate, comparaison.keys()), 1):
        print('comparaisons: {:.1f}% (ETA {:s})'.format(
                    i / tests_nbr * 100,
                    get_ETA(start, tests_nbr, i)),
                end='\r')

    # All the work has been consumed: release the worker processes.
    pool.close()
    pool.join()

    # --- step 4: report ---
    nb_correlations = len(comparaison)
    sys.stdout.write("\033[K") #clear line
    print('comparaison terminée:\n{0:d} comparaison{pluriel1} effectuée{pluriel1}\n{1} corrélation{pluriel2} trouvée{pluriel2} (seuil {2}%)'.format(
                tests_nbr,
                nb_correlations,
                args.threshold,
                pluriel1=("", "s")[tests_nbr > 1],
                pluriel2=("", "s")[nb_correlations > 1],
                ))

    # Only the pairs kept by mp_correlate remain in `comparaison`.
    for resultat in comparaison.values():
        print("{:s}{sep}{:s}{sep}{:.2f}%{sep}{:d}".format(
                    ziques[resultat['a']]['path'],
                    ziques[resultat['b']]['path'],
                    resultat['correlation'] * 100,
                    resultat['offset'],
                    sep = args.separator
                ))