Python: Apache Spark study examples (pyspark)
Computer/Python · 2021. 2. 2. 23:45
Apache Spark
Based on Spark v3.0.1
Loading txt or csv files
from pyspark.sql import SparkSession

# Spark v3.0.1
spark = SparkSession.builder.master("local").appName("SparkSQL").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# CSV
lines = spark.read.option("header", True).csv("./data/customer_orders.csv")

# TXT
lines = spark.sparkContext.textFile("./data/fake_person.txt")

# TXT with schema
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
    ]
)
names = spark.read.option("sep", " ").schema(schema).csv("./data/Marvel-names.txt")
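A quick way to confirm the loads worked is to peek at each result. A minimal sketch, assuming the `names` DataFrame and the `lines` RDD from the snippet above:

# Sanity checks (sketch): names is a DataFrame, lines is an RDD here
names.printSchema()   # columns come from the explicit schema
names.show(5)         # first rows of Marvel-names.txt
print(lines.take(5))  # first 5 raw lines of the text file (RDD, not a DataFrame)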
Reading a book's txt file and finding the most frequent words
regex
import re as regex

from pyspark.sql import SparkSession


def normalizeWords(text):
    return regex.compile(r"\W+", regex.UNICODE).split(text)


# Spark v3.0.1
spark = SparkSession.builder.master("local").appName("WordCounter").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

lines = spark.sparkContext.textFile("./data/DaddyLongLeg_book_korean.txt")
words = lines.flatMap(normalizeWords)
wordCounts = words.countByValue()

# mostFreqWord = max(wordCounts, key=lambda x: wordCounts[x])
# print(f"{mostFreqWord}: {wordCounts[mostFreqWord]} times")

for word, count in sorted(wordCounts.items(), key=lambda x: x[1]):
    cleanWord = word.encode("utf-8", "ignore")
    if count > 10:
        print(cleanWord.decode("utf-8"), count)

spark.stop()

"""
Result
...
한 293
보냄 301
총 305
이 317
있는 320
제가 341
"""
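Note that countByValue() pulls every count back to the driver as a plain dict, so the sorting above happens outside Spark. A sketch of an alternative, assuming the same `words` RDD, that keeps the counting and sorting distributed via the classic reduceByKey/sortBy pattern:

# Sketch: count and sort on the cluster instead of the driver
wordCountsSorted = (
    words.map(lambda w: (w, 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda pair: pair[1], ascending=False)
)
for word, count in wordCountsSorted.take(20):
    print(word, count)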
Using the SQL DataFrame (df) API
from pyspark.sql import SparkSession

# Spark v3.0.1
spark = SparkSession.builder.master("local").appName("SparkSQL").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

people = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv("./data/fakefriends-header.csv")
)

print("Inferred schema:")
people.printSchema()

print("Name column:")
people.select("name").show()

print("Filter age < 21:")
people.filter(people.age < 21).show()

print("Group by age:")
people.groupBy("age").count().show()

print("Make everyone 10 years older:")
people.select(people.name, people.age + 10).show()

"""
Result
...
Filter age < 21:
+------+-------+---+-------+
|userID|   name|age|friends|
+------+-------+---+-------+
|    21|  Miles| 19|    268|
...
"""
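The same DataFrame can also be aggregated with functions from pyspark.sql.functions. A small sketch, assuming the `people` DataFrame and its `friends` column from above, computing the average number of friends per age:

from pyspark.sql import functions as F

# Average friends per age, rounded to 2 decimals, sorted by age (sketch)
(
    people.groupBy("age")
    .agg(F.round(F.avg("friends"), 2).alias("avg_friends"))
    .orderBy("age")
    .show()
)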
Using SQL statements
from pyspark.sql import Row, SparkSession


def parseLine(line):
    return Row(
        id=int(line[0]),
        name=str(line[1].encode("utf-8")),
        age=int(line[2]),
        numFriends=int(line[3]),
    )


# Spark v3.0.1
spark = SparkSession.builder.master("local").appName("SparkSQL").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

lines = spark.read.option("header", True).csv("./data/fakefriends-header.csv")
people = lines.rdd.map(parseLine)

schemaPeople = spark.createDataFrame(people).cache()
schemaPeople.createOrReplaceTempView("people")  # temp view name used in SQL

# Option 1: SQL query on the temp view
teens = spark.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")
for teen in teens.collect():
    print(teen.name, teen.age, teen.numFriends)

# Option 2: DataFrame API
schemaPeople.groupBy("age").count().orderBy("age").show()

spark.stop()

"""
Result
b'Miles' 19 268
b'Beverly' 19 269
b'Brunt' 19 5
...
+---+-----+
|age|count|
+---+-----+
| 18|    8|
...
"""
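The b'...' prefix in the output comes from parseLine wrapping the UTF-8 encoded bytes back in str(). If plain names are preferred, a hypothetical variant can simply keep the original string:

# Sketch: keep name as a plain string so the output prints "Miles" instead of "b'Miles'"
def parseLinePlain(line):
    return Row(
        id=int(line[0]),
        name=str(line[1]),
        age=int(line[2]),
        numFriends=int(line[3]),
    )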
Specifying a schema
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    FloatType,
)

# Spark v3.0.1
spark = SparkSession.builder.master("local").appName("MinTemp").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

schema = StructType(
    [
        StructField("stationID", StringType(), True),  # True for allowing no value (nullable)
        StructField("date", IntegerType(), True),
        StructField("measure_type", StringType(), True),
        StructField("temperature", FloatType(), True),
    ]
)

df = spark.read.schema(schema).csv("./data/1800_weather.csv")
df.printSchema()
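With the schema attached, a typical next step for this dataset is a filter plus aggregation, e.g. the minimum temperature per station. A sketch only, assuming the measure_type column marks minimum readings with "TMIN" (the file contents aren't shown here):

from pyspark.sql import functions as F

# Sketch: minimum temperature per station, assuming "TMIN" rows exist
minTemps = (
    df.filter(df.measure_type == "TMIN")
    .groupBy("stationID")
    .agg(F.min("temperature").alias("min_temperature"))
)
minTemps.show()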
Sales data example
# Load the data into a Spark DataFrame
df = spark.read.csv("sales_data.csv", header=True)

# Convert the data type of a column to a different type
df = df.withColumn("total_sales", df["total_sales"].cast("double"))

# Create a new column based on the values of other columns
df = df.withColumn("profit", df["total_sales"] - df["cost"])

# Filter rows based on a complex condition
filtered_df = df.filter((df.region == "North America") & (df.profit > 1000))

# Group the data by multiple columns and calculate the sum of a column
grouped_df = df.groupBy("region", "product_type").sum("total_sales")

# Sort the data by region (descending) and total_sales (ascending)
sorted_df = df.orderBy(df.region.desc(), df.total_sales.asc())

# Save the DataFrame to a Parquet file
df.write.parquet("sales.parquet")
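Because Parquet stores the schema with the data, the file can be read back and queried directly. A short sketch, assuming the sales.parquet output above and an active SparkSession named spark:

# Read the Parquet file back; the schema is restored from the file itself
sales = spark.read.parquet("sales.parquet")
sales.createOrReplaceTempView("sales")
spark.sql("SELECT region, SUM(total_sales) AS total FROM sales GROUP BY region").show()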
Study code @ Github