Python: Apache Spark Study Examples (pyspark)
Computer/Python · 2021. 2. 2. 23:45
Apache Spark
Based on v3.0.1
Apache Spark™ - Unified Analytics Engine for Big Data
Ease of Use Write applications quickly in Java, Scala, Python, R, and SQL. Spark offers over 80 high-level operators that make it easy to build parallel apps. And you can use it interactively from the Scala, Python, R, and SQL shells.
spark.apache.org
Loading txt or csv files
```python
from pyspark.sql import SparkSession

# Spark v3.0.1
spark = SparkSession.builder.master("local").appName("SparkSQL").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# CSV
lines = spark.read.option("header", True).csv("./data/customer_orders.csv")

# TXT
lines = spark.sparkContext.textFile("./data/fake_person.txt")

# TXT with schema
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
    ]
)
names = spark.read.option("sep", " ").schema(schema).csv("./data/Marvel-names.txt")
```
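A quick note on what the two readers return: `spark.read.csv` gives a DataFrame, while `sparkContext.textFile` gives an RDD of raw strings. Below is a minimal sketch (my addition) for inspecting each, assuming the same sample files as above:

```python
# Sketch: inspecting what each loader returns (assumes the sample files above exist)
orders_df = spark.read.option("header", True).csv("./data/customer_orders.csv")
print(type(orders_df))   # <class 'pyspark.sql.dataframe.DataFrame'>
orders_df.show(5)        # tabular preview of the first 5 rows

raw_rdd = spark.sparkContext.textFile("./data/fake_person.txt")
print(type(raw_rdd))     # an RDD of plain Python strings
print(raw_rdd.take(5))   # first 5 raw text lines
```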
Reading a book (txt) and finding the most frequent words
regex
```python
import re as regex

from pyspark.sql import SparkSession


def normalizeWords(text):
    return regex.compile(r"\W+", regex.UNICODE).split(text)


# Spark v3.0.1
spark = SparkSession.builder.master("local").appName("WordCounter").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

lines = spark.sparkContext.textFile("./data/DaddyLongLeg_book_korean.txt")
words = lines.flatMap(normalizeWords)
wordCounts = words.countByValue()

# mostFreqWord = max(wordCounts, key=lambda x: wordCounts[x])
# print(f"{mostFreqWord}: {wordCounts[mostFreqWord]} times")

for word, count in sorted(wordCounts.items(), key=lambda x: x[1]):
    cleanWord = word.encode("utf-8", "ignore")
    if count > 10:
        print(cleanWord.decode("utf-8"), count)

spark.stop()

"""
Result
...
한 293
보냄 301
총 305
이 317
있는 320
제가 341
"""
```
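`countByValue()` pulls every distinct word's count back to the driver, which is fine for a single book but does not scale to large corpora. A minimal sketch (my addition, reusing the same file and `normalizeWords` as above, run before `spark.stop()`) that keeps the aggregation distributed and only collects the top results:

```python
# Sketch: keep the word counting on the executors instead of the driver.
# Assumes the same book file and normalizeWords() from the example above.
words = spark.sparkContext.textFile("./data/DaddyLongLeg_book_korean.txt").flatMap(normalizeWords)

wordCountsRDD = (
    words.filter(lambda w: w != "")           # drop empty tokens produced by the regex split
    .map(lambda w: (w, 1))
    .reduceByKey(lambda a, b: a + b)          # aggregated in a distributed way
    .sortBy(lambda pair: pair[1], ascending=False)
)

for word, count in wordCountsRDD.take(10):    # only the top 10 reach the driver
    print(word, count)
```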
Using the SQL DataFrame (df)
```python
from pyspark.sql import SparkSession

# Spark v3.0.1
spark = SparkSession.builder.master("local").appName("SparkSQL").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

people = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv("./data/fakefriends-header.csv")
)

print("Inferred schema:")
people.printSchema()

print("Name column:")
people.select("name").show()

print("Filter age < 21:")
people.filter(people.age < 21).show()

print("Group by age:")
people.groupBy("age").count().show()

print("Make everyone + 10y older:")
people.select(people.name, people.age + 10).show()

"""
Result
...
Filter age < 21:
+------+-------+---+-------+
|userID|   name|age|friends|
+------+-------+---+-------+
|    21|  Miles| 19|    268|
...
"""
```
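`groupBy` is not limited to `count()`; it also accepts aggregate expressions. A hedged sketch (my addition) computing the average number of friends per age on the same `people` DataFrame, with column names taken from the CSV header above:

```python
# Sketch: aggregate expressions on the same `people` DataFrame as above.
from pyspark.sql import functions as F

(
    people.groupBy("age")
    .agg(F.round(F.avg("friends"), 2).alias("avg_friends"))  # average friends per age
    .orderBy("age")
    .show()
)
```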
Using SQL statements
```python
from pyspark.sql import Row, SparkSession


def parseLine(line):
    return Row(
        id=int(line[0]),
        name=str(line[1].encode("utf-8")),
        age=int(line[2]),
        numFriends=int(line[3]),
    )


# Spark v3.0.1
spark = SparkSession.builder.master("local").appName("SparkSQL").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

lines = spark.read.option("header", True).csv("./data/fakefriends-header.csv")
people = lines.rdd.map(parseLine)

schemaPeople = spark.createDataFrame(people).cache()
schemaPeople.createOrReplaceTempView("people")  # temp view (table) name

# Option 1
teens = spark.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")
for teen in teens.collect():
    print(teen.name, teen.age, teen.numFriends)

# Option 2
schemaPeople.groupBy("age").count().orderBy("age").show()

spark.stop()

"""
Result
b'Miles' 19 268
b'Beverly' 19 269
b'Brunt' 19 5
...
+---+-----+
|age|count|
+---+-----+
| 18|    8|
...
"""
```
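The same age histogram from Option 2 can also be written in plain SQL against the temp view. A short sketch (my addition, to be run before `spark.stop()` above):

```python
# Sketch: the age histogram from Option 2, expressed as SQL against the "people" view.
spark.sql(
    """
    SELECT age, COUNT(*) AS count
    FROM people
    GROUP BY age
    ORDER BY age
    """
).show()
```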
Specifying a schema
```python
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    FloatType,
)

# Spark v3.0.1
spark = SparkSession.builder.master("local").appName("MinTemp").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

schema = StructType(
    [
        StructField("stationID", StringType(), True),  # True allows null values
        StructField("date", IntegerType(), True),
        StructField("measure_type", StringType(), True),
        StructField("temperature", FloatType(), True),
    ]
)

df = spark.read.schema(schema).csv("./data/1800_weather.csv")
df.printSchema()
```
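With the schema applied, the typed columns can be used directly in filters and aggregations. The aggregation below is only an assumed continuation of the MinTemp example, not code from the original post:

```python
# Sketch: an assumed continuation of the MinTemp example, using the typed columns above.
from pyspark.sql import functions as F

min_temps = (
    df.filter(df.measure_type == "TMIN")               # keep only minimum-temperature records
    .groupBy("stationID")
    .agg(F.min("temperature").alias("min_temperature"))
)
min_temps.show()
```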
Sales data
```python
# Load the data into a Spark DataFrame
df = spark.read.csv("sales_data.csv", header=True)

# Convert the data type of a column to a different type
df = df.withColumn("total_sales", df["total_sales"].cast("double"))

# Create a new column based on the values of other columns
df = df.withColumn("profit", df["total_sales"] - df["cost"])

# Filter rows based on a complex condition
filtered_df = df.filter((df.region == "North America") & (df.profit > 1000))

# Group the data by multiple columns and calculate the sum of a column
grouped_df = df.groupBy("region", "product_type").sum("total_sales")

# Sort the data by region descending, then by total_sales ascending
sorted_df = df.orderBy(df.region.desc(), df.total_sales.asc())

# Save the DataFrame to a Parquet file
df.write.parquet("sales.parquet")
```
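As a follow-up sketch (my assumption, same session and file name), the Parquet file written above can be read back to verify the round trip:

```python
# Sketch: read the Parquet file written above back into a DataFrame.
sales = spark.read.parquet("sales.parquet")
sales.printSchema()

# Verify the transformed columns survived the round trip
sales.select("region", "total_sales", "profit").show(5)
```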
Study notes @Github
Alfex4936/Spark-Studies
Apache Spark study in Python. Contribute to Alfex4936/Spark-Studies development by creating an account on GitHub.
github.com