-
Notifications
You must be signed in to change notification settings - Fork 0
/
music_recommender.py
107 lines (76 loc) · 3.4 KB
/
music_recommender.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# -*- coding: utf-8 -*-
"""
Created on Tue Jul 21 12:12:19 2020
@author: Debargho Basak
"""
from pyspark.mllib.recommendation import *
import random
from operator import *
import itertools
#loading data
# Artist metadata: each line is "<artist_id>\t<artist_name>".
lines = sc.textFile("artist_data_small.txt")
artistData = lines.map(lambda x: x.split("\t")).map(lambda x: (int (x[0]), x[1]))
# Alias table: "<misspelled_artist_id>\t<canonical_artist_id>".
lines = sc.textFile("artist_alias_small.txt")
artistAlias = lines.map(lambda x: x.split("\t")).map(lambda x: (int (x[0]), int(x[1])))
# Materialize the alias pairs on the driver so keyVal (defined below) can
# close over them when canonicalizing artist ids.
alias = artistAlias.collect()
# Play counts: "<user_id> <artist_id> <play_count>", space separated.
# NOTE: `lines` is re-bound here and consumed by the userArtistData map below.
lines = sc.textFile("user_artist_data_small.txt")
def keyVal(x, alias_pairs=None):
    """Parse one "user artist count" line, canonicalizing the artist id.

    Args:
        x: a line of the form "<user_id> <artist_id> <play_count>".
        alias_pairs: optional iterable of (misspelled_id, canonical_id)
            pairs; defaults to the module-level ``alias`` list collected
            from ``artistAlias``.

    Returns:
        ``(user_id, canonical_artist_id, play_count)`` as ints.
    """
    if alias_pairs is None:
        alias_pairs = alias
    part = x.split(" ")
    user, artist, plays = int(part[0]), int(part[1]), int(part[2])
    # Scan the WHOLE alias table.  The original for/else returned on the
    # very first pair, so only the first alias entry was ever consulted
    # and every other misspelled id went through un-canonicalized.
    for bad_id, good_id in alias_pairs:
        if artist == bad_id:
            return (user, good_id, plays)
    return (user, artist, plays)
# Canonicalize every "user artist count" record.
userArtistData = lines.map(keyVal)
# Per-user total plays, restricted to the three heaviest listeners.
top_totals = userArtistData.map(lambda rec: (rec[0], rec[2])) \
                           .reduceByKey(add) \
                           .takeOrdered(3, lambda pair: -pair[1])
total_play_count = sc.parallelize(top_totals)
# Per-user number of artist rows (one entry per play record).
total_artist_count = userArtistData.map(lambda rec: (rec[0], rec[1])) \
                                   .groupByKey() \
                                   .map(lambda grp: (grp[0], len(list(grp[1]))))
# Join the two statistics and report them, heaviest listener first.
final_join = sorted(total_play_count.join(total_artist_count).collect(),
                    key=lambda rec: rec[1][0], reverse=True)
for user_id, (plays, artist_rows) in final_join:
    print("User %d has a total play count of %d and a mean play count of %d"
          % (user_id, plays, plays / artist_rows))
# 40/40/20 split into training, validation and test sets (fixed seed for
# reproducibility).
trainData, validData, testData = userArtistData.randomSplit([4, 4, 2], seed=13)
# Sanity-check the splits: a small sample from each, then their sizes.
for split in (trainData, validData, testData):
    print(split.take(3))
for split in (trainData, validData, testData):
    print(split.count())
# Cache all three splits — each is iterated repeatedly during evaluation.
for split in (trainData, testData, validData):
    split.cache()
def modelEval(model, dataset):
    """Score `model` against `dataset` with a top-X hit-rate metric.

    For each user in `dataset`, the model ranks every artist the user has
    NOT played in the training split; the top X predictions (X = number of
    artists the user has in `dataset`) are intersected with the user's
    actual artists, and the overlap fraction is the user's score.  Returns
    the mean per-user score.
    """
    # One entry per dataset row — NOTE(review): a user with several rows
    # appears several times and is scored repeatedly, so the final average
    # is weighted by row count; set(userList) may be what was intended.
    userList = dataset.map(lambda x: x[0]).collect()
    # Every artist id in the full data set (contains duplicates; the
    # set() arithmetic below removes them per user).
    allArtist = userArtistData.map(lambda x: x[1]).collect()
    # user -> list of artists seen in training (these are excluded from
    # the candidate set so the model is judged on unseen artists only).
    trainList = trainData.map(lambda x: (x[0], x[1])).groupByKey()\
    .map(lambda x: (x[0], list(x[1]))).collect()
    trainDict = dict((x[0], x[1]) for x in trainList)
    # user -> list of artists actually played in the evaluation dataset.
    dataList = dataset.map(lambda x: (x[0], x[1])).groupByKey()\
    .map(lambda x: (x[0], list(x[1]))).collect()
    dataDict = dict((x[0], x[1]) for x in dataList)
    score = 0.0
    for user in userList:
        # Candidate artists: everything the user did NOT play in training.
        nonTrainArtist = set(allArtist) - set(trainDict[user])
        artist = map(lambda x: (user, x), nonTrainArtist)
        artist = sc.parallelize(artist)
        # X = how many artists this user has in the evaluation split.
        length = len(dataDict[user])
        prediction = model.predictAll(artist)
        # Keep the X highest-rated candidate artists.
        predictionList = prediction.map(lambda x: (x.product, x.rating))\
        .takeOrdered(length, key=lambda x: -x[1])
        predictionList = sc.parallelize(predictionList)\
        .map(lambda x: x[0]).collect()
        # Fraction of the top-X predictions the user actually played.
        overlap = set(predictionList).intersection(dataDict[user])
        score += (len(overlap) / float(length))
    return (score/len(userList))
#training model
# Sweep implicit-feedback ALS over several latent ranks, scoring each
# candidate on the validation split.
for latent_rank in [2, 10, 20]:
    model = ALS.trainImplicit(trainData, rank=latent_rank, seed=345)
    print("The model score for rank %d is %f"
          % (latent_rank, modelEval(model, validData)))
#checking accuracy of model
# Retrain at the best-scoring rank and measure once on the held-out test
# split.
bestModel = ALS.trainImplicit(trainData, rank=10, seed=345)
modelEval(bestModel, testData)
#results
# Top-5 artist recommendations for one example user, printed by name.
recommendation = bestModel.recommendProducts(1059637, 5)
for index, reco in enumerate(recommendation):
    artist_name = artistData.lookup(key=reco.product)[0]
    print("Artist %d: %s" % (index, str(artist_name)))