  • Python: Apache Spark study examples (pyspark)
    Computer/Python 2021. 2. 2. 23:45

    Apache Spark

    Based on v3.0.1

     

    Apache Spark™ - Unified Analytics Engine for Big Data

    "Ease of Use: Write applications quickly in Java, Scala, Python, R, and SQL. Spark offers over 80 high-level operators that make it easy to build parallel apps. And you can use it interactively from the Scala, Python, R, and SQL shells." (spark.apache.org)

    Loading txt or csv files

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
    
    # Spark v3.0.1
    spark = SparkSession.builder.master("local").appName("SparkSQL").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    
    # CSV -> DataFrame, using the first row as column names
    lines = spark.read.option("header", True).csv("./data/customer_orders.csv")
    
    # TXT -> RDD of raw lines
    lines = spark.sparkContext.textFile("./data/fake_person.txt")
    
    # TXT with an explicit schema, read as space-separated CSV
    schema = StructType(
        [StructField("id", IntegerType(), True), StructField("name", StringType(), True)]
    )
    
    names = spark.read.option("sep", " ").schema(schema).csv("./data/Marvel-names.txt")
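
    A quick sanity check on what each call produced (continuing from the block above; names is a DataFrame, lines is an RDD):

    # DataFrame: print the declared schema and the first rows
    names.printSchema()
    names.show(5)
    
    # RDD: take() pulls back only a few elements, unlike a full collect()
    print(lines.take(5))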

     

    Reading a book's txt and finding the most frequent words

    Splitting words with a regex:

    import re as regex
    
    from pyspark.sql import SparkSession
    
    # Compile the pattern once instead of on every line
    WORD_PATTERN = regex.compile(r"\W+", regex.UNICODE)
    
    
    def normalizeWords(text):
        # Split on runs of non-word characters (Unicode-aware)
        return WORD_PATTERN.split(text)
    
    
    # Spark v3.0.1
    spark = SparkSession.builder.master("local").appName("WordCounter").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    
    lines = spark.sparkContext.textFile("./data/DaddyLongLeg_book_korean.txt")
    words = lines.flatMap(normalizeWords)
    wordCounts = words.countByValue()
    
    # mostFreqWord = max(wordCounts, key=lambda x: wordCounts[x])
    # print(f"{mostFreqWord}: {wordCounts[mostFreqWord]} times")
    
    # Print words seen more than 10 times, least frequent first
    for word, count in sorted(wordCounts.items(), key=lambda x: x[1]):
        cleanWord = word.encode("utf-8", "ignore")  # drop characters that can't be encoded
        if count > 10:
            print(cleanWord.decode("utf-8"), count)
    
    spark.stop()
    
    """ 결과
    
    ...
    한 293
    보냄 301
    총 305
    이 317
    있는 320
    제가 341
    
    """

     

    Using a SQL DataFrame (df)

    from pyspark.sql import SparkSession
    
    # Spark v3.0.1
    spark = SparkSession.builder.master("local").appName("SparkSQL").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    
    people = (
        spark.read.option("header", True)
        .option("inferSchema", True)
        .csv("./data/fakefriends-header.csv")
    )
    
    print("Inferred schema:")
    people.printSchema()
    
    print("Name column:")
    people.select("name").show()
    
    print("Filter age < 21:")
    people.filter(people.age < 21).show()
    
    print("Group by age:")
    people.groupBy("age").count().show()
    
    print("Make everyone + 10y older:")
    people.select(people.name, people.age + 10).show()
    
    """ 결과
    
    ...
    Filter age < 21:
    +------+-------+---+-------+
    |userID|   name|age|friends|
    +------+-------+---+-------+
    |    21|  Miles| 19|    268|
    ...
    
    """

     

    Using SQL statements

    from pyspark.sql import Row, SparkSession
    
    
    def parseLine(line):
        return Row(
            id=int(line[0]),
            # str() of the encoded bytes keeps the b'...' prefix (visible in the results below)
            name=str(line[1].encode("utf-8")),
            age=int(line[2]),
            numFriends=int(line[3]),
        )
    
    
    # Spark v3.0.1
    spark = SparkSession.builder.master("local").appName("SparkSQL").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    
    lines = spark.read.option("header", True).csv("./data/fakefriends-header.csv")
    people = lines.rdd.map(parseLine)
    
    schemaPeople = spark.createDataFrame(people).cache()
    schemaPeople.createOrReplaceTempView("people")  # name of the temp view queried below
    
    # Option 1
    teens = spark.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")
    for teen in teens.collect():
        print(teen.name, teen.age, teen.numFriends)
    
    # Option 2
    schemaPeople.groupBy("age").count().orderBy("age").show()
    
    spark.stop()
    
    """ 결과
    
    b'Miles' 19 268
    b'Beverly' 19 269
    b'Brunt' 19 5
    ...
    
    +---+-----+
    |age|count|
    +---+-----+
    | 18|    8|
    ...
    
    """

     

    Specifying a schema

    from pyspark.sql import SparkSession
    from pyspark.sql.types import (
        StructType,
        StructField,
        StringType,
        IntegerType,
        FloatType,
    )
    
    
    # Spark v3.0.1
    spark = SparkSession.builder.master("local").appName("MinTemp").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    
    schema = StructType(
        [
            StructField("stationID", StringType(), True),  # True for allowing no value
            StructField("date", IntegerType(), True),
            StructField("measure_type", StringType(), True),
            StructField("temperature", FloatType(), True),
        ]
    )
    
    df = spark.read.schema(schema).csv("./data/1800_weather.csv")
    df.printSchema()
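
    With the schema declared, the MinTemp app name suggests the natural next step: the minimum temperature per station. A sketch assuming the file marks minimum readings with "TMIN" in measure_type:

    from pyspark.sql import functions as F
    
    # Keep only minimum-temperature records, then aggregate per station
    minTemps = (
        df.filter(df.measure_type == "TMIN")
        .groupBy("stationID")
        .agg(F.min("temperature").alias("min_temperature"))
    )
    minTemps.show()
    
    spark.stop()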
    

     

    Sales data

    from pyspark.sql import SparkSession
    
    # Spark v3.0.1
    spark = SparkSession.builder.master("local").appName("SparkSQL").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    
    # Load the data into a Spark DataFrame
    df = spark.read.csv("sales_data.csv", header=True)
    
    # Convert the data type of a column to a different type
    df = df.withColumn("total_sales", df["total_sales"].cast("double"))
    
    # Create a new column based on the values of other columns
    df = df.withColumn("profit", df["total_sales"] - df["cost"])
    
    # Filter rows based on a complex condition
    filtered_df = df.filter((df.region == "North America") & (df.profit > 1000))
    
    # Group the data by multiple columns and calculate the sum of a column
    grouped_df = df.groupBy("region", "product_type").sum("total_sales")
    
    # Sort the data by multiple columns in descending order
    sorted_df = df.orderBy(df.region.desc(), df.total_sales.asc())
    
    # Save the DataFrame to a Parquet file
    df.write.parquet("sales.parquet")
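
    Parquet stores the schema with the data, so reading the file back is an easy way to verify the write:

    # Read the Parquet file back; column types survive the round trip
    df2 = spark.read.parquet("sales.parquet")
    df2.printSchema()
    df2.show(5)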

    Study notes @GitHub

     

    Alfex4936/Spark-Studies

    Apache Spark studies in Python. Contribute to Alfex4936/Spark-Studies development by creating an account on GitHub.

    github.com

     
