-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutil.py
More file actions
533 lines (436 loc) · 14.8 KB
/
util.py
File metadata and controls
533 lines (436 loc) · 14.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
import os
import re
import click
import shutil
import unicodedata
from appscript import app, k
from tqdm import tqdm
def confirm(text, fg='green', **kwargs):
    """
    Ask the user a yes/no question on the terminal

    The prompt is prefixed with ``> `` and styled in bold with the requested
    foreground color before it is handed to ``click.confirm``.

    :param text: prompt text
    :type text: str
    :param fg: foreground color of the prompt
    :type fg: str
    :param kwargs: extra keyword arguments forwarded to ``click.confirm``
    :return: confirmation result as returned by ``click.confirm``
    """
    prompt = click.style('> {}'.format(text), fg=fg, bold=True)
    return click.confirm(prompt, **kwargs)
def status(text):
    """
    Print a running-status line (bold blue)

    :param text: status text
    :type text: str
    """
    message = '{}'.format(text)
    click.secho(message, fg='blue', bold=True)
def info(text):
    """
    Print an informational line (bold green)

    :param text: info text
    :type text: str
    """
    message = '{}'.format(text)
    click.secho(message, fg='green', bold=True)
def warning(text):
    """
    Print a warning line (bold yellow)

    :param text: warning message
    :type text: str
    """
    message = '{}'.format(text)
    click.secho(message, fg='yellow', bold=True)
def error(text):
    """
    Print an error line (bold red)

    The message is only reported; this function does not terminate the
    process.

    :param text: error message
    :type text: str
    """
    message = '{}'.format(text)
    click.secho(message, fg='red', bold=True)
# Known music player applications, keyed by application name. Each entry
# records the path of the application's binary. The order of the entries is
# the order in which the applications are listed here (iTunes, then Music).
APPS = {
    app_name: {'binary': binary_path}
    for app_name, binary_path in (
        ('iTunes', '/Applications/iTunes.app/Contents/MacOS/iTunes'),
        ('Music', '/System/Applications/Music.app/Contents/MacOS/Music'),
    )
}
def detect_app():
    """
    Detect the installed music player application (iTunes or Music)

    The entries of ``APPS`` are tried in order; the first one whose binary
    exists on disk is selected.

    :return: name of the detected application (a key of ``APPS``)
    :rtype: str
    :raises RuntimeError: if none of the known binaries exists
    """
    for candidate in APPS:
        if os.path.isfile(APPS[candidate]['binary']):
            return candidate
    raise RuntimeError('Error detecting music player application')
def get_itunes_playlists():
    """
    Get the user playlists of the detected music application

    :return: user playlists reported by the application
    :rtype: list
    """
    player = app(detect_app())
    return player.user_playlists()
def get_tracks_in_playlist(playlists):
    """
    Collect the file tracks of the desired playlists

    :param playlists: name (or names) of the desired playlists
    :type playlists: str or list[str]
    :return: file tracks of every user playlist whose name was requested,
        in playlist order
    :rtype: list
    """
    # A single name is wrapped so the membership test below compares whole
    # names instead of searching for a substring.
    wanted = [playlists] if isinstance(playlists, str) else playlists

    collected = []
    for playlist in get_itunes_playlists():
        if playlist.name() not in wanted:
            continue
        collected.extend(playlist.file_tracks())
    return collected
def valid_value(value):
    """
    Check that a value is not appscript's ``k.missing_value`` placeholder

    :param value: value to check
    :return: result of comparing ``value != k.missing_value``
    """
    is_present = value != k.missing_value
    return is_present
def get_files_in_playlist(playlists):
    """
    Get file paths of the tracks in the desired playlists

    Tracks whose location is missing are skipped.

    Note that the path to a track file may contain decomposed Japanese
    characters like `0x3099` and `0x309a`, which won't be recognized by
    the walkman.

    :param playlists: name (or names) of the desired playlists
    :type playlists: str or list[str]
    :return: list of file paths
    :rtype: list[str]
    """
    files_in_playlist = []
    for track in tqdm(get_tracks_in_playlist(playlists)):
        # Query the location once, so the value that is validated is the
        # very same value whose path is read.
        location = track.location()
        if valid_value(location):
            files_in_playlist.append(location.path)
    return files_in_playlist
def get_lyrics_files_in_playlist(playlists):
    """
    Get lyrics file paths for the tracks in the desired playlists

    Each path is the track's file path converted by `get_lyrics_path`.
    Tracks whose location is missing are skipped.

    Note that the path to a lyrics file may contain decomposed Japanese
    characters like `0x3099` and `0x309a`, which won't be recognized by
    the walkman.

    :param playlists: name (or names) of the desired playlists
    :type playlists: str or list[str]
    :return: list of lyrics file paths
    :rtype: list[str]
    """
    files_in_playlist = []
    for track in get_tracks_in_playlist(playlists):
        # Query the location once, so the value that is validated is the
        # very same value whose path is read.
        location = track.location()
        if valid_value(location):
            files_in_playlist.append(get_lyrics_path(location.path))
    return files_in_playlist
def split_filepath(files, prefix=None):
    """
    Get the common directory prefix and the paths relative to it

    :param files: list of file paths
    :type files: list[str]
    :param prefix: expected common prefix of the files; when given, the
        computed prefix must be equal to it
    :type prefix: str
    :return: common prefix (ending with a path separator, or empty when the
        files share no directory) and the paths relative to it
    :rtype: tuple(str, list[str])
    :raises AssertionError: if `prefix` is given and differs from the
        computed common prefix
    """
    common_prefix = os.path.commonprefix(files)
    # commonprefix() compares character by character, so the result may end
    # in the middle of a file or directory name (or be the whole path when
    # there is a single file). Cut it back to the last path separator so it
    # always denotes a directory.
    common_prefix = common_prefix[:common_prefix.rfind(os.sep) + 1]

    if prefix is not None:
        mismatched = [f for f in files if not f.startswith(prefix)]
        if mismatched:
            print('\n'.join(mismatched))
        # Raised explicitly (instead of using `assert`) so the check is
        # still performed when Python runs with optimizations enabled.
        if common_prefix != prefix:
            raise AssertionError(
                'common prefix {!r} does not match expected prefix {!r}'
                .format(common_prefix, prefix))

    # relpath() rejects an empty start path; fall back to the current
    # directory when the files share no directory at all.
    base = common_prefix or os.curdir
    relative_paths = [os.path.relpath(f, base) for f in files]
    return common_prefix, relative_paths
def scan_directory(target, extension):
    """
    Scan the given directory recursively and list the matching files

    Note that `os.walk()` will return a path with decomposed Japanese
    characters like `0x3099` and `0x309a` even though the real path is
    composed.

    :param target: target directory
    :type target: str
    :param extension: valid extension(s), as accepted by `is_extension`
    :type extension: str or list[str]
    :return: paths of the matching files, relative to `target`
    :rtype: list[str]
    """
    if not target.endswith('/'):
        target += '/'

    relative_path = []
    for root, _dirs, files in os.walk(target):
        # Every root yielded by os.walk() starts with `target`. Slice that
        # leading part off instead of using str.replace(), which would also
        # delete later occurrences of the same text inside the path.
        curr = root[len(target):]
        for f in files:
            if is_extension(f, extension):
                relative_path.append(os.path.join(curr, f))
    return relative_path
def compare_filelists(files_src, files_dst, root_src, root_dst):
    """
    Compare two file lists

    Files only present in the source list are to be updated, files only
    present in the destination list are to be removed. Files present in
    both lists are updated when the source copy is newer than the
    destination copy, and ignored otherwise.

    Note that `files_src`, `files_dst` and files in `root_src` may contain
    decomposed Japanese characters like `0x3099` and `0x309a`, while files
    in `root_dst` won't.

    :param files_src: list of files in source directory
    :type files_src: list[str]
    :param files_dst: list of files in destination directory
    :type files_dst: list[str]
    :param root_src: path to source directory
    :type root_src: str
    :param root_dst: path to destination directory
    :type root_dst: str
    :return: list of files to be updated, removed and ignored
    :rtype: tuple(list, list, list)
    """
    # Sets give constant-time membership tests; the lists are still the
    # ones iterated, so the original ordering of the results is kept.
    in_src = set(files_src)
    in_dst = set(files_dst)

    to_be_updated = [x for x in files_src if x not in in_dst]
    to_be_removed = [x for x in files_dst if x not in in_src]
    to_be_ignored = []

    # Tolerance (in seconds) applied to the modification-time comparison;
    # presumably absorbs timestamp precision differences between the two
    # filesystems -- TODO confirm.
    eps = 0.01
    for f in files_src:
        if f not in in_dst:
            continue
        mtime_src = os.path.getmtime(os.path.join(root_src, f))
        # Destination paths are composed, so compose the name before lookup.
        mtime_dst = os.path.getmtime(os.path.join(root_dst, compose_str(f)))
        if mtime_src - eps > mtime_dst:
            print(f)
            print(mtime_src)
            print(mtime_dst)
            to_be_updated.append(f)
        else:
            to_be_ignored.append(f)
    return to_be_updated, to_be_removed, to_be_ignored
def sync_filelists(to_be_updated,
                   to_be_removed,
                   src_dir,
                   dst_dir,
                   remove_unmatched=False):
    """
    Sync list of files from source directory to target

    Files to update are copied (with metadata) from `src_dir` to `dst_dir`;
    the destination name is the composed form of the relative path. Files
    to remove are deleted from `dst_dir` only when `remove_unmatched` is set.

    :param to_be_updated: files to update, relative to `src_dir`
    :type to_be_updated: list[str]
    :param to_be_removed: files to remove, relative to `dst_dir`
    :type to_be_removed: list[str]
    :param src_dir: path to source directory
    :type src_dir: str
    :param dst_dir: path to target directory
    :type dst_dir: str
    :param remove_unmatched: whether to remove unmatched files or not
    :type remove_unmatched: bool
    """
    if to_be_updated:
        progress = tqdm(sorted(to_be_updated))
        for rel_path in progress:
            source_file = os.path.join(src_dir, rel_path)
            # Entries whose source file no longer exists are skipped.
            if not os.path.exists(source_file):
                continue
            progress.set_description('Updating {}'.format(
                os.path.basename(rel_path)))
            target_file = os.path.join(dst_dir, compose_str(rel_path))
            os.makedirs(os.path.dirname(target_file), exist_ok=True)
            shutil.copy2(source_file, target_file)
    else:
        print('Nothing to update')

    if to_be_removed and remove_unmatched:
        progress = tqdm(sorted(to_be_removed))
        for rel_path in progress:
            target_file = os.path.join(dst_dir, compose_str(rel_path))
            # The file being deleted lives in the target directory, so that
            # is where its existence has to be checked. Checking the source
            # directory would leave files that vanished from the source on
            # the target forever.
            if not os.path.exists(target_file):
                continue
            progress.set_description('Removing {}'.format(
                os.path.basename(rel_path)))
            os.remove(target_file)
    else:
        print('Nothing to remove')
def is_extension(filepath, ext):
    """
    Tell whether a file has one of the desired extensions

    The comparison is exact (case-sensitive) and includes the leading dot,
    as produced by ``os.path.splitext``.

    :param filepath: path to file
    :type filepath: str
    :param ext: valid extension or extensions
    :type ext: str or list
    :return: whether the file extension is desired
    :rtype: bool
    """
    # Wrap a single extension so the test below is a whole-value match
    # rather than a substring search.
    accepted = [ext] if isinstance(ext, str) else ext
    suffix = os.path.splitext(filepath)[1]
    return suffix in accepted
def get_lyricsx_file(track, lyrics_dir):
    """
    Look up the lyrics file of a track in the LyricsX directory

    The expected file name is ``<title> - <artist>.lrcx`` with every ``/``
    in title and artist replaced by ``&``.

    :param track: given track (must provide ``name()`` and ``artist()``)
    :param lyrics_dir: root directory of lyrics files
    :type lyrics_dir: str
    :return: path to the lyrics file, or ``None`` if it does not exist
    :rtype: str or None
    """
    safe_title = track.name().replace('/', '&')
    safe_artist = track.artist().replace('/', '&')
    candidate = os.path.join(
        lyrics_dir, "{} - {}.lrcx".format(safe_title, safe_artist))
    return candidate if os.path.exists(candidate) else None
def get_lyrics_path(song):
    """
    Derive the lyrics file path that belongs to a song file

    The song's extension (if any) is replaced by ``.lrc``.

    :param song: path to song file
    :type song: str
    :return: path to corresponding lyrics file
    :rtype: str
    """
    stem, _extension = os.path.splitext(song)
    return stem + '.lrc'
# Timestamp with millisecond precision: minutes, then seconds with exactly
# three fractional digits. Raw string, so the backslashes reach the regex
# engine instead of being treated as (invalid) string escapes.
_MS_TIMESTAMP_RE = re.compile(r'(\d+):(\d+\.\d\d\d)')


def format_timestamp(timestamp):
    """
    Format timestamp xx:xx.xxx as xx:xx.xx

    The first millisecond-precision timestamp found in the input is
    extracted and its seconds are rounded to two decimals. When rounding
    lifts the seconds to 60.00, the overflow is carried into the minutes
    (e.g. ``01:59.999`` becomes ``02:00.00``). Input that contains no such
    timestamp is returned unchanged.

    :param timestamp: timestamp
    :type timestamp: str
    :return: formatted timestamp
    :rtype: str
    """
    match = _MS_TIMESTAMP_RE.search(timestamp)
    if match is None:
        return timestamp

    minutes, seconds = match.groups()
    seconds_value = float(seconds)
    seconds_text = '%05.2f' % seconds_value
    if seconds_value < 60 and seconds_text == '60.00':
        # Rounding crossed the minute boundary; keep the minutes field at
        # least as wide as it was (preserves leading zeros).
        minutes = str(int(minutes) + 1).zfill(len(minutes))
        seconds_text = '00.00'
    return '%s:%s' % (minutes, seconds_text)
def format_lyrics(lrc_file):
    """
    Convert a lyrics file to walkman-compatible format, in place

    Only lines that carry a ``[mm:ss.xx]`` style timestamp and contain
    neither ``[tr]`` nor ``[tt]`` are kept. Every kept line is rewritten as
    ``[start][end]lyrics`` where ``end`` is the timestamp of the following
    kept line; the last kept line has no successor and is therefore not
    written. The file is overwritten with the result.

    :param lrc_file: path to lyrics file
    :type lrc_file: str
    """
    # Raw strings, so the backslashes reach the regex engine instead of
    # being treated as (invalid) string escapes.
    bracketed_timestamp = re.compile(r'\[\d+:\d+\.\d+\]')
    bare_timestamp = re.compile(r'\d+:\d+\.\d+')

    # NOTE(review): lyrics files are assumed to be UTF-8 encoded; the
    # encoding is made explicit so the result does not depend on the
    # locale -- confirm.
    with open(lrc_file, encoding='utf-8') as f:
        lrc_lines = [
            line.strip()
            for line in f
            if bracketed_timestamp.search(line) and '[tr]' not in line and
            '[tt]' not in line
        ]

    formatted = []
    # Pair every line with its successor to obtain start and end time.
    for lrc_line, next_line in zip(lrc_lines, lrc_lines[1:]):
        curr_timestamp = bare_timestamp.search(lrc_line).group(0)
        next_timestamp = bare_timestamp.search(next_line).group(0)
        curr_line_split = lrc_line.split('[{}]'.format(curr_timestamp))
        # Lyrics text is taken only when the tag splits the line into
        # exactly two parts; otherwise the text is left empty.
        curr_lyrics = curr_line_split[1] if len(curr_line_split) == 2 else ''
        formatted.append('[{}][{}]{}'.format(
            format_timestamp(curr_timestamp),
            format_timestamp(next_timestamp),
            curr_lyrics))

    with open(lrc_file, 'w', encoding='utf-8') as f:
        f.write('\n'.join(formatted))
def struct_lyrics_dir(tracks, src_dir, dst_dir):
    """
    Copy lyrics files to given directory with the same structure as tracks

    For every track that has a location and a lyrics file in `src_dir`, the
    lyrics file is copied below `dst_dir` at the track's path relative to
    the common directory of all track files (with the extension changed to
    ``.lrc``), and then converted with `format_lyrics`. Tracks without a
    location are skipped.

    :param tracks: list of tracks
    :type tracks: list
    :param src_dir: path to source directory
    :type src_dir: str
    :param dst_dir: path to target directory
    :type dst_dir: str
    """
    # Keep each track together with its own path. Indexing a filtered path
    # list by the track's position would pair tracks with the wrong paths
    # as soon as one track has no location.
    located = []
    for track in tracks:
        location = track.location()
        if valid_value(location):
            located.append((track, location.path))

    common_prefix = os.path.commonprefix([path for _, path in located])
    # commonprefix() works per character; cut back to the last separator
    # so the prefix never ends in the middle of a directory or file name.
    common_prefix = common_prefix[:common_prefix.rfind(os.sep) + 1]

    progress = tqdm(located)
    for track, path in progress:
        lrc_src = get_lyricsx_file(track, src_dir)
        if lrc_src is None:
            continue
        # Strip only the leading prefix (every path starts with it).
        relative_path = get_lyrics_path(path[len(common_prefix):])
        lrc_dst = os.path.join(dst_dir, relative_path)
        os.makedirs(os.path.dirname(lrc_dst), exist_ok=True)
        progress.set_description('Copying {}'.format(
            os.path.basename(lrc_dst)))
        shutil.copy2(lrc_src, lrc_dst)
        format_lyrics(lrc_dst)
def format_playlist_with_prefix(songs, prefix):
    """
    Build playlist entries as prefixed relative paths

    Each song path is made relative via `split_filepath`, composed with
    `compose_str`, joined onto `prefix`, and finally written with
    backslashes in place of forward slashes.

    :param songs: list of songs
    :type songs: list[str]
    :param prefix: path prefix
    :type prefix: str
    :return: track list in relative paths with given prefix
    :rtype: list[str]
    """
    relative_songs = split_filepath(songs)[1]

    songs_with_prefix = []
    for song in relative_songs:
        joined = os.path.join(prefix, compose_str(song))
        songs_with_prefix.append(joined.replace('/', '\\'))
    return songs_with_prefix
def str_to_ord(s):
    """
    Turn a string into the list of its characters' code points

    :param s: given string
    :type s: str
    :return: list of character orders
    :rtype: list[int]
    """
    return list(map(ord, s))
def ord_to_str(ord_list):
    """
    Turn code points back into a string

    :param ord_list: a single character order or a list of them
    :type ord_list: int or list[int]
    :return: corresponding string
    :rtype: str
    """
    code_points = [ord_list] if isinstance(ord_list, int) else ord_list
    return ''.join(chr(code) for code in code_points)
def locate_all_occurrence(l, e):
    """
    Find every position at which an element occurs in a list

    :param l: given list
    :type l: list
    :param e: element to locate
    :return: indices of all occurrences, in ascending order
    :rtype: list
    """
    indices = []
    for position, item in enumerate(l):
        if item == e:
            indices.append(position)
    return indices
def compose_str(s):
    """
    Replace decomposed characters (base character followed by a combining
    mark like 0x3099 or 0x309a) in given string with composed ones

    :param s: given string
    :type s: str
    :return: composed string
    :rtype: str
    """
    # Unicode NFC normalization is used here rather than merging the
    # 0x3099 / 0x309a marks into the preceding character by hand.
    composed = unicodedata.normalize('NFC', s)
    return composed
def decompose_str(s):
    """
    Replace composed characters in given string with decomposed ones
    (base character followed by a combining mark like 0x3099 or 0x309a)

    :param s: given string
    :type s: str
    :return: decomposed string
    :rtype: str
    """
    decomposed = unicodedata.normalize('NFD', s)
    return decomposed