Notes from Andrew Ray's talk comparing pandas and PySpark DataFrames: https://www.youtube.com/watch?v=XrpSRCwISdk
# Reading a CSV file.
# pandas:
df = pd.read_csv("mtcars.csv")
# PySpark: header=True takes column names from the first row; inferSchema=True
# makes an extra pass over the data to pick column types (otherwise every
# column is read as a string).
df = spark.read \
    .options(header=True, inferSchema=True) \
    .csv("mtcars.csv")
When you hit an error:
- Don't panic!
- Read the error
- Research it
- Search or ask on Stack Overflow (tag: apache-spark)
- Search or ask on the user mailing list: user@spark.apache.org
- Found a bug? File a JIRA ticket:
  https://issues.apache.org/jira/browse/SPARK/
# Core DataFrame operations. In each pair the pandas form comes first,
# followed by the PySpark equivalent.

# Viewing the DataFrame
df                                    # pandas
df.show()                             # PySpark

# Viewing the first rows
df                                    # pandas
df.head(10)                           # pandas
df.show()                             # PySpark: first 20 rows by default
df.show(10)                           # PySpark

# Column names and types
df.columns                            # pandas
df.dtypes                             # pandas
df.columns                            # PySpark
df.dtypes                             # PySpark: list of (name, type) pairs

# Renaming columns
df.columns = ['a', 'b', 'c']          # pandas: replaces all names in place
df.rename(columns={'old': 'new'})     # pandas: returns a renamed copy
df.toDF('a', 'b', 'c')                # PySpark: new DataFrame, all names replaced
df.withColumnRenamed('old', 'new')    # PySpark: new DataFrame, one column renamed

# Dropping a column
df.drop('mpg', axis=1)                # pandas
df = df.drop('mpg')                   # PySpark: pass the column name as a string

# Filtering rows: the syntax is identical in both libraries.
# The parentheses are required because `&` binds tighter than `<` and `==`.
df[df.mpg < 20]                       # pandas
df[(df.mpg < 20) & (df.cyl == 6)]     # pandas
df[df.mpg < 20]                       # PySpark
df[(df.mpg < 20) & (df.cyl == 6)]     # PySpark

# Adding a derived column
df['gpm'] = 1 / df.mpg                # pandas: adds the column in place
df.withColumn('gpm', 1 / df.mpg)      # PySpark: returns a new DataFrame
The result of the above PySpark expression is a new DataFrame object. You need to assign it to a new
or existing df variable in order to query/process the value later on.
Be careful with division by zero: it yields NULL in PySpark (with ANSI SQL mode off; with ANSI mode on it raises an error instead), whereas it yields infinity in pandas.
import pyspark.sql.functions as F

# Selecting rows where a column is NULL (PySpark).
# The result is a DataFrame with the rows where column "Label" is NULL; it may
# have no rows if there are no null values. (`data` is presumably a separate
# DataFrame that has a "Label" column — not the mtcars `df`.)
data.where(F.col("Label").isNull()).show()

# Filling missing values
df.fillna(0)  # pandas (many more options)
df.fillna(0)  # PySpark
The result of the above PySpark expression is a new DataFrame object. You need to assign it to a new
or existing df variable in order to query/process the value later on.
The values in the column 'filtered' were created by applying a filter to the 'words' column. Some 'filtered' values are empty, because in some cases the filter removed every value from the 'words' column. The task here was to compose another, non-null column value out of the 'filtered' and 'words' values.
# Column filtered contains an array of strings
# Column words contains an array of strings
# -> New Column: FilteredWords contains the filtered value if that array is not empty,
#    otherwise FilteredWords is assigned the value from the 'words' column
#    ('words' column values are always present)
#
# A NULL 'filtered' is treated like an empty one: F.size() of NULL is -1 or
# NULL (depending on spark.sql.legacy.sizeOfNull / ANSI settings), never 0, so
# without the isNull() check a NULL would fall through to otherwise() and
# break the "always non-null" goal.
NewDF = DF.withColumn(
    'FilteredWords',
    F.when(DF['filtered'].isNull() | (F.size('filtered') == 0), DF['words'])
     .otherwise(DF['filtered']),
)

# Grouped aggregation (dict maps column -> aggregate function name)
(df.groupby(['cyl', 'gear'])
   .agg({'mpg': 'mean', 'disp': 'min'}))     # pandas
(df.groupby(['cyl', 'gear'])
   .agg({'mpg': 'mean', 'disp': 'min'}))     # PySpark (groupby is an alias of groupBy)

# Element-wise math on a column
import numpy as np
df['logdisp'] = np.log(df.disp)              # pandas
import pyspark.sql.functions as F
df.withColumn('logdisp', F.log(df.disp))     # PySpark

# Many more column functions live in this module (list follows):
import pyspark.sql.functions as F
# Column functions available in pyspark.sql.functions (snapshot from the talk;
# newer Spark releases add more). The functions marked "(deprecated)" have
# replacements: approx_count_distinct, degrees and radians.
#
# abs, acos, add_months, approxCountDistinct (deprecated), approx_count_distinct,
#   array, array_contains, asc, ascii, asin, atan, atan2, avg
# base64, bin, bitwiseNOT, broadcast, bround
# cbrt, ceil, coalesce, col, collect_list, collect_set, column, concat,
#   concat_ws, conv, corr, cos, cosh, count, countDistinct, covar_pop, covar_samp
# crc32, create_map, cume_dist, current_date, current_timestamp
# date_add, date_format, date_sub, datediff, dayofmonth, dayofyear, decode,
#   degrees, dense_rank, desc
# encode, exp, explode, expm1, expr
# factorial, first, floor, format_number, format_string, from_json,
#   from_unixtime, from_utc_timestamp
# get_json_object, greatest, grouping, grouping_id
# hash, hex, hour, hypot
# initcap, input_file_name, instr, isnan, isnull
# json_tuple
# kurtosis
# lag, last, last_day, lead, least, length, levenshtein, lit, locate, log,
#   log10, log1p, log2, lower, lpad, ltrim
# max, md5, mean, min, minute, monotonically_increasing_id, month, months_between
# nanvl, next_day, ntile
# percent_rank, posexplode, pow
# quarter
# radians, rand, randn, rank, regexp_extract, regexp_replace, repeat, reverse,
#   rint, round, row_number, rpad, rtrim
# second, sha1, sha2, shiftLeft, shiftRight, shiftRightUnsigned, signum, sin,
#   sinh, size, skewness, sort_array, soundex, spark_partition_id, split, sqrt,
#   stddev, stddev_pop, stddev_samp, struct, substring, substring_index, sum,
#   sumDistinct
# tan, tanh, toDegrees (deprecated), toRadians (deprecated), to_date, to_json,
#   to_utc_timestamp, translate, trim, trunc
# udf, unbase64, unhex, unix_timestamp, upper
# var_pop, var_samp, variance
# weekofyear, when, window
# year

# Conditional column built from existing values.
# pandas: row-wise apply (calls the Python lambda once per row).
df['cond'] = df.apply(lambda r:
                      1 if r.mpg > 20 else 2 if r.cyl == 6 else 3,
                      axis=1)
# PySpark: build a new DataFrame column from a condition over existing
# DataFrame values; old and new DataFrame have the same number of rows.
# The first matching when() wins, mirroring the nested conditional above.
import pyspark.sql.functions as F
df.withColumn('cond',
              F.when(df.mpg > 20, 1)
               .when(df.cyl == 6, 2)
               .otherwise(3))

# Applying a Python function to a column.
df['disp1'] = df.disp.apply(lambda x: x + 1)     # pandas
# PySpark: wrap the function in a UDF with an explicit return type. The
# declared type must match what the lambda returns (a double for a double
# column), otherwise values may come back as NULL. Prefer built-ins where
# possible (here simply df.disp + 1): UDFs ship every row to Python.
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType
fn = F.udf(lambda x: x + 1, DoubleType())
df.withColumn('disp1', fn(df.disp))

# Joins (both libraries default to an inner join)
left.merge(right, on='key')                       # pandas
left.merge(right, left_on='a', right_on='b')      # pandas
left.join(right, on='key')                        # PySpark
left.join(right, left.a == right.b)               # PySpark

# Pivot tables
# pandas: the string 'sum' is used instead of np.sum, which pandas 2.x
# deprecates as an aggfunc (same result, no FutureWarning).
pd.pivot_table(df, values='D',
               index=['A', 'B'], columns=['C'],
               aggfunc='sum')
df.groupBy("A", "B").pivot("C").sum("D")          # PySpark
# Summary statistics
df.describe()                 # pandas
df.describe().show()          # PySpark: only count, mean, standard deviation, min, max
# PySpark: approximate quartiles of one column via a SQL expression.
# (Spark 2.3+ also offers df.summary().show(), which adds 25%/50%/75%.)
df.selectExpr(
    "percentile_approx(mpg, array(.25, .5, .75)) as mpg"
).show()

# Histograms
df.hist()                                   # pandas
# PySpark: sample ~10% of rows without replacement, then plot with pandas.
df.sample(False, 0.1).toPandas().hist()
n/a
df.createOrReplaceTempView('foo')
df2 = spark.sql('select * from foo')- Use
pyspark.sql.functionsand other build in functions - Use the same version of python and packages on cluster as driver
- Check out the UI at http://localhost:4040/
- Learn about SSH port forwarding
- Check out Spark MLlib
- RTFM: https://spark.apache.org/docs/latest/
Don't:
- Try to iterate through rows
- Hard-code a master in your driver
  - Use spark-submit for that
- Call df.toPandas().head(), which first pulls the whole DataFrame to the driver
  - Instead do: df.limit(5).toPandas()
Databricks offers built-in visualizations if you are running Spark DataFrames on Databricks.
Use Spark when memory forces you to, i.e. when your data no longer fits in memory on a single machine.