
  • Python: Apache Spark study examples (pyspark)
    Computer/Python 2021. 2. 2. 23:45

    Apache Spark

    Based on v3.0.1

     

    Apache Spark™ - Unified Analytics Engine for Big Data

    Ease of Use: Write applications quickly in Java, Scala, Python, R, and SQL. Spark offers over 80 high-level operators that make it easy to build parallel apps. And you can use it interactively from the Scala, Python, R, and SQL shells.

    spark.apache.org

    Loading txt or csv files

    python
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType

    # Spark v3.0.1
    spark = SparkSession.builder.master("local").appName("SparkSQL").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    # CSV
    lines = spark.read.option("header", True).csv("./data/customer_orders.csv")

    # TXT
    lines = spark.sparkContext.textFile("./data/fake_person.txt")

    # TXT with schema
    schema = StructType(
        [
            StructField("id", IntegerType(), True),
            StructField("name", StringType(), True),
        ]
    )
    names = spark.read.option("sep", " ").schema(schema).csv("./data/Marvel-names.txt")
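
    A quick way to sanity-check each load (a minimal sketch reusing the spark session above; the orders/persons variable names are only for illustration):

    python
    # DataFrame reads expose printSchema()/show()
    orders = spark.read.option("header", True).csv("./data/customer_orders.csv")
    orders.printSchema()
    orders.show(5)

    # textFile() returns an RDD of raw strings, so use take() instead of show()
    persons = spark.sparkContext.textFile("./data/fake_person.txt")
    print(persons.take(3))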

     

    Reading a book as txt and finding the most frequent words

    regex

    python
    import re as regex

    from pyspark.sql import SparkSession


    def normalizeWords(text):
        return regex.compile(r"\W+", regex.UNICODE).split(text)


    # Spark v3.0.1
    spark = SparkSession.builder.master("local").appName("WordCounter").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    lines = spark.sparkContext.textFile("./data/DaddyLongLeg_book_korean.txt")
    words = lines.flatMap(normalizeWords)
    wordCounts = words.countByValue()

    # mostFreqWord = max(wordCounts, key=lambda x: wordCounts[x])
    # print(f"{mostFreqWord}: {wordCounts[mostFreqWord]} times")

    for word, count in sorted(wordCounts.items(), key=lambda x: x[1]):
        cleanWord = word.encode("utf-8", "ignore")
        if count > 10:
            print(cleanWord.decode("utf-8"), count)

    spark.stop()

    """
    Result
    ...
    293
    보냄 301
    305
    317
    있는 320
    제가 341
    """
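
    countByValue() pulls every (word, count) pair back to the driver; for a larger corpus the same count can stay distributed with reduceByKey (a sketch, not from the original post, reusing lines and normalizeWords from above):

    python
    # Distributed word count: keep the aggregation on the executors and
    # only bring the top results back to the driver
    wordCountsRDD = (
        lines.flatMap(normalizeWords)
        .map(lambda word: (word, 1))
        .reduceByKey(lambda a, b: a + b)
    )

    for word, count in wordCountsRDD.sortBy(lambda x: x[1], ascending=False).take(20):
        if word:
            print(word, count)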

     

    Using the SQL DataFrame (df) API

    python
    from pyspark.sql import SparkSession

    # Spark v3.0.1
    spark = SparkSession.builder.master("local").appName("SparkSQL").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    people = (
        spark.read.option("header", True)
        .option("inferSchema", True)
        .csv("./data/fakefriends-header.csv")
    )

    print("Inferred schema:")
    people.printSchema()

    print("Name column:")
    people.select("name").show()

    print("Filter age < 21:")
    people.filter(people.age < 21).show()

    print("Group by age:")
    people.groupBy("age").count().show()

    print("Make everyone + 10y older:")
    people.select(people.name, people.age + 10).show()

    """
    Result
    ...
    Filter age < 21:
    +------+-------+---+-------+
    |userID|   name|age|friends|
    +------+-------+---+-------+
    |    21|  Miles| 19|    268|
    ...
    """
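
    The same DataFrame also supports aggregations beyond count(); for example, the average number of friends per age bucket (a sketch building on the people DataFrame above, using standard pyspark.sql.functions):

    python
    from pyspark.sql import functions as func

    # Average friends per age, rounded to 2 decimals and sorted by age
    (
        people.groupBy("age")
        .agg(func.round(func.avg("friends"), 2).alias("avg_friends"))
        .orderBy("age")
        .show()
    )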

     

    Using SQL statements

    python
    from pyspark.sql import Row, SparkSession


    def parseLine(line):
        return Row(
            id=int(line[0]),
            name=str(line[1].encode("utf-8")),
            age=int(line[2]),
            numFriends=int(line[3]),
        )


    # Spark v3.0.1
    spark = SparkSession.builder.master("local").appName("SparkSQL").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    lines = spark.read.option("header", True).csv("./data/fakefriends-header.csv")
    people = lines.rdd.map(parseLine)

    schemaPeople = spark.createDataFrame(people).cache()
    schemaPeople.createOrReplaceTempView("people")  # temp view name

    # Option 1
    teens = spark.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")
    for teen in teens.collect():
        print(teen.name, teen.age, teen.numFriends)

    # Option 2
    schemaPeople.groupBy("age").count().orderBy("age").show()

    spark.stop()

    """
    Result
    b'Miles' 19 268
    b'Beverly' 19 269
    b'Brunt' 19 5
    ...
    +---+-----+
    |age|count|
    +---+-----+
    | 18|    8|
    ...
    """
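
    The groupBy in Option 2 can also be written entirely in SQL against the same temp view (a sketch, assuming the people view registered above):

    python
    # Same aggregation as Option 2, expressed as a SQL statement
    spark.sql(
        "SELECT age, COUNT(*) AS count FROM people GROUP BY age ORDER BY age"
    ).show()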

     

    Specifying a schema

    python
    from pyspark.sql import SparkSession
    from pyspark.sql.types import (
        StructType,
        StructField,
        StringType,
        IntegerType,
        FloatType,
    )

    # Spark v3.0.1
    spark = SparkSession.builder.master("local").appName("MinTemp").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    schema = StructType(
        [
            StructField("stationID", StringType(), True),  # True allows null values
            StructField("date", IntegerType(), True),
            StructField("measure_type", StringType(), True),
            StructField("temperature", FloatType(), True),
        ]
    )

    df = spark.read.schema(schema).csv("./data/1800_weather.csv")
    df.printSchema()
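
    The example stops at printSchema(), but with this schema the minimum temperature per station is one filter plus one aggregation (a sketch; it assumes the 1800_weather.csv rows mark minimum-temperature records with "TMIN", as in the usual version of this exercise):

    python
    from pyspark.sql import functions as func

    # Keep only minimum-temperature records, then take the min per station
    minTemps = (
        df.filter(df.measure_type == "TMIN")
        .groupBy("stationID")
        .agg(func.min("temperature").alias("min_temperature"))
    )
    minTemps.show()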

     

    Sales data example

    python
    from pyspark.sql import SparkSession

    # Spark v3.0.1
    spark = SparkSession.builder.master("local").appName("SalesData").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    # Load the data into a Spark DataFrame
    df = spark.read.csv("sales_data.csv", header=True)

    # Convert the data type of a column to a different type
    df = df.withColumn("total_sales", df["total_sales"].cast("double"))

    # Create a new column based on the values of other columns
    df = df.withColumn("profit", df["total_sales"] - df["cost"])

    # Filter rows based on a complex condition
    filtered_df = df.filter((df.region == "North America") & (df.profit > 1000))

    # Group the data by multiple columns and calculate the sum of a column
    grouped_df = df.groupBy("region", "product_type").sum("total_sales")

    # Sort by region descending, then by total_sales ascending
    sorted_df = df.orderBy(df.region.desc(), df.total_sales.asc())

    # Save the DataFrame to a Parquet file
    df.write.parquet("sales.parquet")
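
    Parquet stores the schema with the data, so the saved file can be read back without re-inferring column types (a short sketch; sales.parquet is the path written above, and the show() calls just trigger the lazy DataFrames):

    python
    # Read the Parquet file back; column types come from the file's own schema
    sales = spark.read.parquet("sales.parquet")
    sales.printSchema()

    # grouped_df and sorted_df are lazy until an action such as show() runs
    grouped_df.show()
    sorted_df.show(5)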

    Study repo @ GitHub

     

    Alfex4936/Spark-Studies

    Apache Spark studies in Python. Contribute to Alfex4936/Spark-Studies development by creating an account on GitHub.

    github.com

     

