Here I’m going to use PySpark to create a sentiment model for Amazon review data using Logistic Regression. The dataset was collected from Professor Julian McAuley’s Amazon product dataset. I will be using a subset of the data titled “Cell Phones and Accessories”, just to keep to a specific sentiment domain.
First, we need to import some necessary libraries for constructing the model, preprocessing the data, and evaluating the results.
from pyspark.sql import SparkSession, SQLContext
And now we can build our spark session:
spark = SparkSession.builder.master("local[*]") \
    .config('spark.driver.memory', '15g') \
    .appName('amazon_phones').getOrCreate()
sc = spark.sparkContext
Now we need to load the data in. We can load it directly from the link given above with the following function:
import json
import urllib.request
import gzip
def load_amazon_cellphones(num_examples):
    link = "http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Cell_Phones_and_Accessories_5.json.gz"
    stream = urllib.request.urlopen(link)
    file = gzip.open(stream)
    lines = []
    if num_examples == -1:
        for i, line in enumerate(file):
            lines.append(json.loads(line))
    else:
        for i, line in enumerate(file):
            lines.append(json.loads(line))
            if i == num_examples - 1:
                break
    return lines
This function downloads and decompresses the data directly from the source, and lets you load only part of the dataset. For example, if you only wanted to load 100 examples, you could run
dataset = load_amazon_cellphones(100)
I found this to be really helpful when troubleshooting and testing the code. For this article, we’re going to use every row, meaning we have to run the following line:
dataset = load_amazon_cellphones(-1)
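If you want to try the partial-loading logic without hitting the network, the same idea can be sketched against an in-memory gzip stream. This is only a sketch under my own assumptions: `read_json_lines` and the toy records are hypothetical names and data, and `itertools.islice` stands in for the manual counter used above.

```python
import gzip
import io
import json
from itertools import islice

def read_json_lines(stream, num_examples=-1):
    """Parse up to num_examples JSON lines from a gzip-compressed stream.

    num_examples == -1 means "read everything", mirroring the convention above.
    """
    limit = None if num_examples == -1 else num_examples
    with gzip.open(stream) as file:
        # islice stops reading as soon as `limit` lines have been consumed
        return [json.loads(line) for line in islice(file, limit)]

# Build a tiny gzip-compressed JSON-lines payload in memory (toy data).
toy_records = [{"reviewText": "review %d" % i, "overall": float(i % 5 + 1)}
               for i in range(10)]
payload = gzip.compress(
    "\n".join(json.dumps(r) for r in toy_records).encode("utf-8"))

first_three = read_json_lines(io.BytesIO(payload), 3)
everything = read_json_lines(io.BytesIO(payload), -1)
print(len(first_three), len(everything))
```

Because the stream is read lazily, asking for a handful of rows only decompresses the beginning of the file, which is what makes small test runs quick.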
After loading in the data, we can look at an example row:
{'asin': '120401325X',
'helpful': [0, 0],
'overall': 4.0,
'reviewText': "They look good and stick good! I just don't like the rounded shape because I was always bumping it and Siri kept popping up and it was irritating. I just won't buy a product like this again",
'reviewTime': '05 21, 2014',
'reviewerID': 'A30TL5EWN6DFXT',
'reviewerName': 'christina',
'summary': 'Looks Good',
'unixReviewTime': 1400630400}
We’re only concerned with the actual review, reviewText, and the corresponding label, overall.
The data is currently labeled on a 1-to-5 rating scale. To simplify things, I’m going to remove neutral labels (this is done later) and map these labels to a binary classification problem with the following function:
def create_categories(x):
    if x >= 4.0:
        return 1.0
    else:
        return 0.0
The problem has now become “can we predict if a user really liked a product?” Since neutral 3.0 scores are dropped, the negative label covers the remaining scores below 4.0, where a user was disappointed with a product to some degree.
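As a quick sanity check, here is how the mapping plays out on each possible score. The sketch assumes scores are whole stars from 1.0 to 5.0 and drops the neutral 3.0 first, as the article does later on.

```python
def create_categories(x):
    # Same rule as above: 4.0 and up is positive, everything else negative.
    return 1.0 if x >= 4.0 else 0.0

ratings = [1.0, 2.0, 3.0, 4.0, 5.0]
# Drop the neutral 3.0 score first, then binarize what is left.
labels = {r: create_categories(r) for r in ratings if r != 3.0}
print(labels)
```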
Now we need to create a method for preprocessing our data:
import re
from nltk.corpus import stopwords
from string import punctuation
def clean(review):
    #removing numbers
    review = re.sub('[0-9]', '', review).lower()
    #removing punctuation
    review = review.translate(str.maketrans('', '', punctuation))
    return review
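To see what this cleaning step actually does, it helps to run it on a sentence or two. The sample strings below are made up for illustration; the function body repeats the same two steps so the snippet runs on its own.

```python
import re
from string import punctuation

def clean(review):
    # Same steps as above: drop digits, lowercase, strip ASCII punctuation.
    review = re.sub('[0-9]', '', review).lower()
    return review.translate(str.maketrans('', '', punctuation))

sample = "Bought 2 of these. They DON'T fit!"
cleaned = clean(sample)
print(repr(cleaned))

# A review made of nothing but digits and punctuation collapses to ''.
print(repr(clean("10/10")))
```

Two side effects are worth knowing about: removing a digit leaves the spaces around it in place, and contractions lose their apostrophe ("don't" becomes "dont"). Neither is a problem for the tokenizer used later, which splits on non-word characters.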
Now let’s only get the useful columns from our data, which are reviewText and overall.
relevant_info = [(line['reviewText'],
                  line['overall'])
                 for line in dataset]
And finally, we can get to parallelizing and preprocessing:
#convert data to rdd
rdd = sc.parallelize(relevant_info)
#remove neutral reviews
rdd = rdd.filter(lambda x: x[1] != 3.0)
#map categories to binary labels
rdd = rdd.map(lambda x: (x[0], create_categories(x[1])))
#clean reviews
rdd = rdd.map(lambda x: (clean(x[0]), x[1]))
#remove empty rows
rdd = rdd.filter(lambda x: x[0] != '')
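Before running the chain on the full RDD, I find it useful to mirror the same four steps on a few rows in plain Python. This is only a local stand-in for the Spark code above: the toy rows are invented, and list comprehensions replace `filter` and `map`.

```python
import re
from string import punctuation

def create_categories(x):
    return 1.0 if x >= 4.0 else 0.0

def clean(review):
    review = re.sub('[0-9]', '', review).lower()
    return review.translate(str.maketrans('', '', punctuation))

# Toy (reviewText, overall) pairs, invented for illustration.
toy = [("Great case!", 5.0),
       ("It's fine.", 3.0),
       ("Broke in 2 days", 1.0),
       ("10/10", 4.0)]

rows = [x for x in toy if x[1] != 3.0]                  # remove neutral reviews
rows = [(x[0], create_categories(x[1])) for x in rows]  # map to binary labels
rows = [(clean(x[0]), x[1]) for x in rows]              # clean reviews
rows = [x for x in rows if x[0] != '']                  # remove empty rows
print(rows)
```

The neutral review disappears in the first step, and the "10/10" review disappears in the last one because nothing is left of it after cleaning.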
Now we can convert the RDD to a DataFrame object:
schema = ['review', 'label']
data = rdd.toDF(schema = schema)
data.show()
You should get something like this:
+--------------------+-----+
| review|label|
+--------------------+-----+
|they look good an...| 1.0|
|these stickers wo...| 1.0|
|these are awesome...| 1.0|
+--------------------+-----+
Now we can view the schema of the resulting dataframe and also check how many rows the data consists of:
data.printSchema()
data.groupBy('label').count().distinct().show()
root
|-- review: string (nullable = true)
|-- label: double (nullable = true)
+-----+------+
|label| count|
+-----+------+
| 0.0| 24328|
| 1.0|148575|
+-----+------+
After removing neutral reviews and preprocessing, we’re left with 172,903 reviews. As shown by the counts of unique labels, the dataset is heavily imbalanced towards positive labels. We need to account for this in the training and evaluation stages.
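To put a number on that imbalance, we can work out how well a model would score if it ignored the text and always predicted the positive label. This is plain arithmetic on the counts printed above, with no Spark involved.

```python
# Label counts taken from the groupBy output above.
counts = {0.0: 24328, 1.0: 148575}

total = sum(counts.values())
# Accuracy of a "model" that always predicts the majority class.
majority_accuracy = max(counts.values()) / total
print(total, round(majority_accuracy, 4))
```

An accuracy of roughly 86% is available for free, which is why accuracy on its own says little about this model.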
First, we can calculate class weights for our sentiment model. Spark has no built-in function to calculate these weights, but this awesome article by Dan Vatterott shows how you can implement this feature.
Here is the code for implementing class weights in PySpark, adapted from the above article:
from itertools import chain
import numpy as np
import pyspark.sql.functions as F
def get_class_weights(dataframe, labelCol):
    #count the number of rows for each label
    label_collect = dataframe.select(labelCol).groupby(labelCol).count().collect()
    unique_labels = [x[labelCol] for x in label_collect]
    bin_count = [x['count'] for x in label_collect]
    total_labels = sum(bin_count)
    unique_label_count = len(label_collect)
    #weight = total rows / (number of classes * rows in the class)
    class_weights = {i: ii for i, ii in zip(unique_labels, total_labels / (unique_label_count * np.array(bin_count)))}
    #look up each row's weight from its label and store it in a new column
    mapping_expr = F.create_map([F.lit(x) for x in chain(*class_weights.items())])
    dataframe = dataframe.withColumn('weight', mapping_expr.getItem(F.col(labelCol)))
    return dataframe
Now, we can simply run the line to calculate the class weights:
data = get_class_weights(data, labelCol = 'label')
The resulting dataframe should now look like this, with a new weight column appended.
+--------------------+-----+------------------+
| review|label| weight|
+--------------------+-----+------------------+
|they look good an...| 1.0|0.5818711088675753|
|these stickers wo...| 1.0|0.5818711088675753|
|these are awesome...| 1.0|0.5818711088675753|
|item arrived in g...| 1.0|0.5818711088675753|
|awesome stays on ...| 1.0|0.5818711088675753|
|came just as desc...| 1.0|0.5818711088675753|
|it worked for the...| 0.0| 3.553580236764222|
|good case solid b...| 1.0|0.5818711088675753|
|this is a fantast...| 1.0|0.5818711088675753|
|this case fits pe...| 1.0|0.5818711088675753|
|this is the first...| 1.0|0.5818711088675753|
|performs exactly ...| 1.0|0.5818711088675753|
|unlike most of th...| 1.0|0.5818711088675753|
|just what i neede...| 1.0|0.5818711088675753|
|when there is no ...| 1.0|0.5818711088675753|
|it works great do...| 1.0|0.5818711088675753|
|surprisingly this...| 1.0|0.5818711088675753|
|i have tested thi...| 1.0|0.5818711088675753|
|it worked great f...| 0.0| 3.553580236764222|
|i love that it ha...| 1.0|0.5818711088675753|
+--------------------+-----+------------------+
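The two weight values in the table can be reproduced by hand from the label counts. The sketch below redoes the formula from the function in plain Python, total rows divided by (number of classes times rows in the class), using the counts shown earlier.

```python
# Label counts taken from the groupBy output earlier in the article.
counts = {0.0: 24328, 1.0: 148575}

total = sum(counts.values())
n_classes = len(counts)
# weight = total rows / (number of classes * rows in that class)
weights = {label: total / (n_classes * n) for label, n in counts.items()}
print(weights)

# With these weights, each class carries the same total weight.
total_weight_per_class = {label: weights[label] * n for label, n in counts.items()}
print(total_weight_per_class)
```

Each negative review counts about six times as much as a positive one, so both classes end up contributing equally to the weighted loss.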
Now we can start constructing our sentiment model. We are going to construct a pipeline that includes our preprocessing steps and our logistic regression model.
At this stage, we are going to word-tokenize the reviews, remove stopwords, and then convert them to a bag-of-words representation. We’re going to prune all words that appear in fewer than 5 reviews (this is what minDF controls), and also cap the vocabulary size at 10,000 words to reduce model complexity. This is shown in the following code:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
import nltk
nltk.download('stopwords')
sw = list(set(stopwords.words('english')))
#tokenize reviews
regexTokenizer = RegexTokenizer(inputCol = 'review',
                                outputCol = 'tokenized',
                                pattern = "\\W")
#remove stopwords
stopwordsRemover = StopWordsRemover(inputCol = 'tokenized',
                                    outputCol = 'removed_sw').setStopWords(sw)
#convert to BoW representation
countVectorizer = CountVectorizer(inputCol = "removed_sw",
                                  outputCol = "features",
                                  vocabSize = 10000,
                                  minDF = 5)
#weightCol takes in the resulting class weights calculated earlier
lr = LogisticRegression(featuresCol = 'features',
                        labelCol = 'label',
                        weightCol = 'weight')
pipeline = Pipeline(stages = [regexTokenizer, stopwordsRemover, countVectorizer, lr])
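If the three feature stages feel like a black box, here is roughly what they do, written in plain Python on a toy corpus. This is a simplified sketch and not Spark’s implementation: `build_vocab` and `vectorize` are hypothetical helpers, the documents and stopword list are invented, and the alphabetical tie-breaking is my own choice.

```python
import re
from collections import Counter

def build_vocab(docs, stop_words, vocab_size, min_df):
    """Tokenize, drop stopwords, then keep frequent terms that pass min_df."""
    tokenized = [[t for t in re.split(r"\W+", d.lower())
                  if t and t not in stop_words]
                 for d in docs]
    # Total occurrences of each term across the corpus.
    term_freq = Counter(t for doc in tokenized for t in doc)
    # Number of documents each term appears in.
    doc_freq = Counter(t for doc in tokenized for t in set(doc))
    kept = [t for t in term_freq if doc_freq[t] >= min_df]
    # Keep the vocab_size most frequent terms (ties broken alphabetically).
    kept.sort(key=lambda t: (-term_freq[t], t))
    return tokenized, kept[:vocab_size]

def vectorize(tokens, vocab):
    """Count how often each vocabulary term occurs in one document."""
    counts = Counter(tokens)
    return [counts[t] for t in vocab]

docs = ["great case great price", "the case broke", "great screen protector"]
tokenized, vocab = build_vocab(docs, stop_words={"the"}, vocab_size=10, min_df=2)
vectors = [vectorize(doc, vocab) for doc in tokenized]
print(vocab)
print(vectors)
```

Only "great" and "case" survive here because every other word appears in a single document. In the real pipeline the same pruning happens with a threshold of 5 reviews and a cap of 10,000 terms.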
Now that our pipeline has been created, we can split the data into training and testing partitions and train the model:
train_data, test_data = data.randomSplit([0.9, 0.1])
pipelineFit = pipeline.fit(train_data)
Now it’s time to evaluate our model. Since our data are pretty imbalanced, we should probably avoid using simpler metrics like accuracy. Instead we’ll calculate “Area under ROC” to determine how well the model discriminates between positive and negative reviews.
from pyspark.mllib.evaluation import BinaryClassificationMetrics
#generate predictions on the held-out test set
test_predictions = pipelineFit.transform(test_data)
metrics = BinaryClassificationMetrics(test_predictions.select('prediction', 'label').rdd)
auroc = metrics.areaUnderROC
print('Area under ROC: {}'.format(auroc))
This cell returns the following result:
Area under ROC: 0.839271845861672
which is pretty good for a simple Logistic Regression model with a BoW representation!
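One detail worth knowing when reading this number: the metric above is computed from the prediction column, which holds hard 0/1 outputs and not probabilities. With only two possible scores, the ROC curve has a single interior point, and the area under it should reduce to the average of the true positive rate and the true negative rate. The sketch below checks that on toy data; the `auroc` helper, the labels and the scores are all made up for illustration and are not part of the pipeline.

```python
def auroc(scores, labels):
    """Area under ROC as the chance that a random positive outranks
    a random negative, with ties counting as half a win."""
    pos = [s for s, y in zip(scores, labels) if y == 1.0]
    neg = [s for s, y in zip(scores, labels) if y == 0.0]
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0
               for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

labels      = [1.0, 1.0, 1.0, 1.0, 0.0, 0.0]
predictions = [1.0, 1.0, 1.0, 0.0, 0.0, 1.0]  # hard 0/1 outputs
scores      = [0.9, 0.8, 0.7, 0.4, 0.3, 0.6]  # hypothetical probabilities

tpr = 3 / 4  # 3 of the 4 positives predicted as 1.0
tnr = 1 / 2  # 1 of the 2 negatives predicted as 0.0
auc_hard = auroc(predictions, labels)
auc_soft = auroc(scores, labels)
print(auc_hard, (tpr + tnr) / 2, auc_soft)
```

On hard predictions the value matches the mean of the two rates, while ranking by continuous scores gives a different number, so the two variants should not be compared directly.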
The source code for this article can be found here. It is slightly altered for use on the command line.