ABOUT ME

-

Total
-
  • Java: spring boot AWS opensearch 연동하기
    컴퓨터/JAVA 2023. 10. 9. 18:38
    728x90
    반응형

    @코드로 참고하기

     

    제품 이름 검색

     

    우선 AWS 보안 자격 증명에서 IAM을 하나 만든다.

     

    https://us-east-1.console.aws.amazon.com/iamv2/home#/security_credentials

     

    us-east-1.console.aws.amazon.com

     

    dependencies

    spring boot v3.1 + gradle 기준; 버전이 까다로울 수 있다.

    각 패키지를 검색하면 어느 Elasticsearch / Spring 버전까지 지원하는지 정리된 호환성 표가 있으니, 먼저 확인하면 편하다.

    spring-data-opensearch

    dependencies {
    ...
        // Spring Cloud AWS starter.
        // NOTE(review): presumably the source of the com.amazonaws.* classes used by the
        // credential and request-signing code in this post -- confirm against the resolved classpath.
        implementation group: 'org.springframework.cloud', name: 'spring-cloud-starter-aws', version: '2.2.6.RELEASE'
    
        // OpenSearch: Spring Data OpenSearch (starter + core) and the opensearch-java client
        implementation 'org.opensearch.client:spring-data-opensearch-starter:1.2.0'
        implementation 'org.opensearch.client:spring-data-opensearch:1.2.0'
        implementation 'org.opensearch.client:opensearch-java:2.6.0'
    }

     

    Main class

    bean이 겹쳐서 실행이 안되니까 꼭 두 클래스들을 exclude 해준다.

    /**
     * Application entry point.
     *
     * <p>{@code ElasticsearchDataAutoConfiguration} and
     * {@code ElasticsearchRestClientAutoConfiguration} are excluded because their beans clash with
     * the OpenSearch configuration and prevent the application from starting.
     */
    @EnableScheduling
    @EnableMongoAuditing
    @SpringBootApplication(exclude = {ElasticsearchDataAutoConfiguration.class, ElasticsearchRestClientAutoConfiguration.class})
    public class Application {
        /**
         * Boots the Spring context.
         *
         * @param args command-line arguments forwarded to Spring Boot
         */
        public static void main(String[] args) {
            // The primary source must be this class; the snippet previously referenced a
            // class name that is not declared here, so it did not compile as written.
            SpringApplication.run(Application.class, args);
        }
    }

     

    Config

    opensearch 공식 예제에서 가져와서 IAM으로 바꾼 코드이다.

     

    AWSCredentialsConfig

    import com.amazonaws.auth.AWSCredentialsProvider;
    import com.amazonaws.auth.AWSStaticCredentialsProvider;
    import com.amazonaws.auth.BasicAWSCredentials;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Publishes an {@link AWSCredentialsProvider} bean backed by a fixed access-key / secret-key
     * pair that is injected from configuration properties.
     */
    @Slf4j
    @Configuration
    public class AWSCredentialsConfig {

        /** Injected from {@code cloud.aws.credentials.access-key}. */
        @Value("${cloud.aws.credentials.access-key}")
        private String accessKey;

        /** Injected from {@code cloud.aws.credentials.secret-key}. */
        @Value("${cloud.aws.credentials.secret-key}")
        private String secretKey;

        /**
         * Wraps the injected key pair in a static credentials provider.
         *
         * @return provider that always returns the configured key pair
         */
        @Bean
        public AWSCredentialsProvider customCredentialsProvider() {
            log.info("Using AWS credentials for access.");
            final BasicAWSCredentials keyPair = new BasicAWSCredentials(accessKey, secretKey);
            return new AWSStaticCredentialsProvider(keyPair);
        }
    }

     

    AWSRequestSigningApacheInterceptor

    AWS(Amazon Web Services)의 서비스에 요청을 보내기 전에 그 요청을 서명하는 Java 클래스

    
    import com.amazonaws.DefaultRequest;
    import com.amazonaws.auth.AWSCredentialsProvider;
    import com.amazonaws.auth.Signer;
    import com.amazonaws.http.HttpMethodName;
    import org.apache.http.*;
    import org.apache.http.client.utils.URIBuilder;
    import org.apache.http.entity.BasicHttpEntity;
    import org.apache.http.message.BasicHeader;
    import org.apache.http.protocol.HttpContext;
    
    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;
    
    import static org.apache.http.protocol.HttpCoreContext.HTTP_TARGET_HOST;
    
    /**
     * Apache HttpClient request interceptor that signs every outgoing request with AWS credentials.
     *
     * <p>Each request is mirrored into an AWS SDK {@link DefaultRequest}, signed by the configured
     * {@link Signer}, and the resulting headers (plus the body stream, when there is one) are
     * copied back onto the Apache request.
     */
    public class AWSRequestSigningApacheInterceptor implements HttpRequestInterceptor {
        /** Service name handed to the {@link DefaultRequest} that is built for signing. */
        private final String service;

        /** Signer implementation applied to every request. */
        private final Signer signer;

        /** Provider asked for credentials each time a request is signed. */
        private final AWSCredentialsProvider awsCredentialsProvider;

        /**
         * @param service                service that we're connecting to
         * @param signer                 particular signer implementation
         * @param awsCredentialsProvider source of AWS credentials for signing
         */
        public AWSRequestSigningApacheInterceptor(final String service,
                                                  final Signer signer,
                                                  final AWSCredentialsProvider awsCredentialsProvider) {
            this.service = service;
            this.signer = signer;
            this.awsCredentialsProvider = awsCredentialsProvider;
        }

        /**
         * Signs {@code request} in place.
         *
         * @param request the outgoing Apache request; its headers (and entity, if present) are replaced
         * @param context execution context; the target host attribute, when set, becomes the endpoint
         * @throws IOException if the request URI cannot be parsed or the body stream cannot be read
         */
        @Override
        public void process(final HttpRequest request, final HttpContext context)
                throws IOException {
            final URIBuilder requestUri;
            try {
                requestUri = new URIBuilder(request.getRequestLine().getUri());
            } catch (URISyntaxException e) {
                throw new IOException("Invalid URI", e);
            }

            // Mirror the Apache request into an AWS SDK request that the signer can work on.
            final DefaultRequest<?> awsRequest = new DefaultRequest<>(service);

            final HttpHost targetHost = (HttpHost) context.getAttribute(HTTP_TARGET_HOST);
            if (targetHost != null) {
                awsRequest.setEndpoint(URI.create(targetHost.toURI()));
            }
            awsRequest.setHttpMethod(HttpMethodName.fromValue(request.getRequestLine().getMethod()));
            try {
                awsRequest.setResourcePath(requestUri.build().getRawPath());
            } catch (URISyntaxException e) {
                throw new IOException("Invalid URI", e);
            }

            // Null when the request type cannot carry a body at all.
            final HttpEntityEnclosingRequest bodyCarrier =
                    request instanceof HttpEntityEnclosingRequest enclosing ? enclosing : null;
            if (bodyCarrier != null) {
                final HttpEntity payload = bodyCarrier.getEntity();
                // A body-capable request without an entity is signed as an empty body.
                awsRequest.setContent(payload == null
                        ? new ByteArrayInputStream(new byte[0])
                        : payload.getContent());
            }
            awsRequest.setParameters(nvpToMapParams(requestUri.getQueryParams()));
            awsRequest.setHeaders(headerArrayToMap(request.getAllHeaders()));

            signer.sign(awsRequest, awsCredentialsProvider.getCredentials());

            // Push the signed state back onto the Apache request.
            request.setHeaders(mapToHeaderArray(awsRequest.getHeaders()));
            if (bodyCarrier != null && bodyCarrier.getEntity() != null) {
                final BasicHttpEntity signedBody = new BasicHttpEntity();
                signedBody.setContent(awsRequest.getContent());
                bodyCarrier.setEntity(signedBody);
            }
        }

        /**
         * Groups query parameters by name (case-insensitively), keeping every value.
         *
         * @param params list of HTTP query params as NameValuePairs
         * @return a multimap of HTTP query params
         */
        private static Map<String, List<String>> nvpToMapParams(final List<NameValuePair> params) {
            final Map<String, List<String>> grouped = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
            for (final NameValuePair pair : params) {
                grouped.computeIfAbsent(pair.getName(), unused -> new ArrayList<>())
                        .add(pair.getValue());
            }
            return grouped;
        }

        /**
         * Collects headers into a case-insensitive map, leaving out those {@link #skipHeader} rejects.
         * A later header with the same name replaces an earlier one.
         *
         * @param headers modeled Header objects
         * @return a Map of header entries
         */
        private static Map<String, String> headerArrayToMap(final Header[] headers) {
            final Map<String, String> byName = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
            for (final Header candidate : headers) {
                if (skipHeader(candidate)) {
                    continue;
                }
                byName.put(candidate.getName(), candidate.getValue());
            }
            return byName;
        }

        /**
         * @param header header line to check
         * @return true if the given header should be excluded when signing
         */
        private static boolean skipHeader(final Header header) {
            final String name = header.getName();
            if ("host".equalsIgnoreCase(name)) {
                return true; // Host comes from endpoint
            }
            // Strip Content-Length: 0
            return "content-length".equalsIgnoreCase(name) && "0".equals(header.getValue());
        }

        /**
         * @param mapHeaders Map of header entries
         * @return modeled Header objects, in the map's iteration order
         */
        private static Header[] mapToHeaderArray(final Map<String, String> mapHeaders) {
            return mapHeaders.entrySet().stream()
                    .map(entry -> new BasicHeader(entry.getKey(), entry.getValue()))
                    .toArray(Header[]::new);
        }
    }

    OpenSearchRestClientConfiguration

    import com.amazonaws.auth.AWS4Signer;
    import com.amazonaws.auth.AWSCredentialsProvider;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.http.HttpRequestInterceptor;
    import org.opensearch.client.RestHighLevelClient;
    import org.opensearch.data.client.orhlc.AbstractOpenSearchConfiguration;
    import org.opensearch.data.client.orhlc.ClientConfiguration;
    import org.opensearch.data.client.orhlc.RestClients;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
    
    import java.time.Duration;
    
    /**
     * Spring configuration that builds the {@link RestHighLevelClient} used by Spring Data
     * OpenSearch, with every request signed by {@link AWSRequestSigningApacheInterceptor}.
     */
    @Configuration
    @EnableElasticsearchRepositories(basePackages = "app.repository") // change to your repository package
    @Slf4j
    public class OpenSearchRestClientConfiguration extends AbstractOpenSearchConfiguration {

        /** Service name used for request signing. */
        private static final String SERVICE_NAME = "es";

        /** Port used when the configured endpoint does not name one. */
        private static final int DEFAULT_HTTPS_PORT = 443;

        /** Injected from {@code aws.os.endpoint}; may be a bare host, {@code host:port}, or a URL. */
        @Value("${aws.os.endpoint}")
        private String endpoint;

        /** Injected from {@code aws.os.region}; passed to the signer as the region name. */
        @Value("${aws.os.region}")
        private String region;

        private final AWSCredentialsProvider credentialsProvider;

        /**
         * @param provider source of the AWS credentials used to sign requests
         */
        @Autowired
        public OpenSearchRestClientConfiguration(AWSCredentialsProvider provider) {
            this.credentialsProvider = provider;
        }

        /**
         * SpringDataOpenSearch data provides us the flexibility to implement our custom {@link RestHighLevelClient} instance by implementing the abstract method {@link AbstractOpenSearchConfiguration#opensearchClient()},
         *
         * @return RestHighLevelClient. Amazon OpenSearch Service Https rest calls have to be signed with AWS credentials, hence an interceptor {@link HttpRequestInterceptor} is required to sign every
         * API calls with credentials. The signing is happening through the below snippet
         * <code>
         * signer.sign(signableRequest, awsCredentialsProvider.getCredentials());
         * </code>
         * @throws IllegalArgumentException if the configured endpoint carries a non-numeric port
         */
        @Override
        @Bean
        public RestHighLevelClient opensearchClient() {
            // This client is recommended for single-node setups; the sniffer is practically disabled.
            AWS4Signer signer = new AWS4Signer();
            signer.setServiceName(SERVICE_NAME);
            signer.setRegionName(region);
            HttpRequestInterceptor interceptor =
                    new AWSRequestSigningApacheInterceptor(SERVICE_NAME, signer, credentialsProvider);

            // The endpoint property may already include ":443" (or a scheme); passing it verbatim
            // as a hostname together with a port would yield an invalid host.
            RestClientBuilder restClientBuilder = RestClient.builder(toHttpsHost(endpoint))
                    .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.addInterceptorLast(interceptor))
                    .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
                            .setConnectTimeout((int) Duration.ofSeconds(300).toMillis())
                            .setSocketTimeout((int) Duration.ofSeconds(150).toMillis()));

            SniffOnFailureListener sniffOnFailureListener = new SniffOnFailureListener();
            restClientBuilder.setFailureListener(sniffOnFailureListener);

            RestHighLevelClient client = new RestHighLevelClient(restClientBuilder);

            // NOTE(review): the sniffer is never closed in this class -- confirm whether it needs
            // an explicit shutdown hook.
            Sniffer esSniffer = Sniffer.builder(client.getLowLevelClient())
                    .setSniffIntervalMillis(999999999) // roughly an 11.574-day interval
                    .setSniffAfterFailureDelayMillis(999999999)
                    .build();

            sniffOnFailureListener.setSniffer(esSniffer);

            return client;
        }

        /**
         * Turns the configured endpoint into an HTTPS {@link HttpHost}.
         *
         * <p>Accepts {@code host}, {@code host:port} and {@code scheme://host[:port][/path]}.
         * The scheme is always forced to HTTPS; the port defaults to 443 when absent.
         *
         * @param rawEndpoint endpoint value as configured
         * @return host descriptor for the REST client
         * @throws IllegalArgumentException if the port part is not a number
         */
        private static HttpHost toHttpsHost(final String rawEndpoint) {
            String hostAndPort = rawEndpoint.trim();
            final int schemeEnd = hostAndPort.indexOf("://");
            if (schemeEnd >= 0) {
                hostAndPort = hostAndPort.substring(schemeEnd + 3);
            }
            final int pathStart = hostAndPort.indexOf('/');
            if (pathStart >= 0) {
                hostAndPort = hostAndPort.substring(0, pathStart);
            }
            final int portSeparator = hostAndPort.lastIndexOf(':');
            if (portSeparator < 0) {
                return new HttpHost(hostAndPort, DEFAULT_HTTPS_PORT, "https");
            }
            final String hostName = hostAndPort.substring(0, portSeparator);
            try {
                final int port = Integer.parseInt(hostAndPort.substring(portSeparator + 1));
                return new HttpHost(hostName, port, "https");
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Invalid port in aws.os.endpoint: " + rawEndpoint, e);
            }
        }
    }

     

    application.yml

    두 설정을 한 파일에 합쳐 적든 나눠 적든, 아래처럼 설정 파일로 분리해서 키 값이 코드에 직접 노출되지 않게 한다.

    # Static AWS key pair (placeholder values); read through the
    # cloud.aws.credentials.access-key / secret-key properties.
    cloud:
      aws:
        credentials:
          access-key: "A"
          secret-key: "o"
          
    # OpenSearch domain settings; read through aws.os.region / aws.os.endpoint.
    aws:
      os:
        region: "ap-northeast-2"
        endpoint: "search-엔드포인트-주소.ap-northeast-2.es.amazonaws.com:443" # port 443 (HTTPS)

     

     

    예제 Document

    * mapping.json이나 settings.json은 src/main/resources 아래에 넣어준다. (예제의 elastic/ 경로 = src/main/resources/elastic/)

    import lombok.*;
    import org.springframework.data.annotation.Id;
    import org.springframework.data.elasticsearch.annotations.*;
    
    import java.util.List;
    
    /**
     * Search document stored in the {@code cosmetics} index.
     *
     * <p>Index mapping and settings are loaded from the JSON files named in {@link Mapping} and
     * {@link Setting}. {@code @Data} already generates getters and setters, so separate
     * {@code @Getter}/{@code @Setter} annotations are not needed.
     */
    @Data
    @Builder
    @AllArgsConstructor
    @NoArgsConstructor
    @Document(indexName = "cosmetics")
    @Mapping(mappingPath = "elastic/cosmetic-mapping.json")
    @Setting(settingPath = "elastic/cosmetic-setting.json")
    public class EsCosmetic {
        /** Document identifier. */
        @Id
        private String id;

        /** Product name, declared as a full-text field. */
        @Field(type = FieldType.Text)
        private String name;

        /** Product category, declared as a keyword field. */
        @Field(type = FieldType.Keyword)
        private Cosmetic.Category category;

        /** Keywords attached to the product, declared as a keyword field. */
        @Field(type = FieldType.Keyword)
        private List<String> keywords;
    }

     

    예제 Repository

    MongoDB에 있는 데이터를 15분마다 전체 인덱싱하는 구조라서, Cosmetic(MongoDB 도큐먼트)과 EsCosmetic(OpenSearch 도큐먼트)은 서로 다른 클래스다.

    import app.domain.Cosmetic;
    import app.domain.EsCosmetic;
    import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
    import org.springframework.stereotype.Repository;
    
    import java.util.List;
    
    
    /**
     * Repository for {@link EsCosmetic} documents keyed by a {@code String} id.
     *
     * <p>Only method signatures are declared here; no implementation is visible in this file.
     * NOTE(review): the queries are presumably derived from the method names by Spring Data --
     * confirm the exact matching semantics (e.g. analyzed vs. exact match) against the index mapping.
     */
    @Repository
    public interface EsCosmeticRepository extends ElasticsearchRepository<EsCosmetic, String> {
        /**
         * @param name text to look for in the {@code name} field
         * @return matching documents
         */
        List<EsCosmetic> findByNameContaining(String name);

        /**
         * @param category category to filter by
         * @return matching documents
         */
        List<EsCosmetic> findByCategory(Cosmetic.Category category);

        /**
         * @param keyword keyword to look for in the {@code keywords} field
         * @return matching documents
         */
        List<EsCosmetic> findByKeywordsContains(String keyword);
    }

     

    예제 mapping.json

    Opensearch는 은전한닢이 깔려있다.

    {
      "properties": {
        "name": {
          "type": "text",
          "analyzer": "seunjeon",
          "fields": {
            "keyword": {
              "type": "keyword"
            }
          }
        },
        "brand": {
          "type": "text",
          "analyzer": "seunjeon"
        },
        "category": {
          "type": "keyword"
        },
        "keywords": {
          "type": "keyword"
        }
      }
    }

    예제 setting.json

    root key 이름을 잘 보면 바로 analysis로 시작한다. (스프링이 알아서 settings.index 까지 간다)

    {
      "analysis": {
        "filter": {
          "synonym": {
            "type": "synonym",
            "synonyms": [
              "근디 => 근데",
              "조아요 => 좋아요",
              "아파욬 => 아파요",
              "ㅋㅋㅋ => 웃음",
              "ㅠㅠ, ㅜㅜ => 슬픔",
              "피붕, 피부 붕 => 피부 붕뎀",
              "무겁다, 무거워 => 무거움",
              "가렵다, 가려워 => 가려움",
              "건조하다, 건조해 => 건조함",
              "유분, 기름 => 유분기",
              "뽀송뽀송, 뽀송 => 부드러움",
              "촉촉하다, 촉촉해 => 촉촉함",
              "향기, 향 => 향료",
              "민감하다, 민감해 => 민감함",
              "산뜻하다, 산뜻해 => 산뜻함",
              "알러지, 알레르기 => 알러지 반응",
              "화해, 화해성분 => 화학 성분",
              "자연성분, 자연 => 자연적 성분",
              "백탁, 백탁현상 => 백탁 현상"
            ]
          },
          "korean_stop": {
            "type": "stop",
            "stopwords": ["이", "그", "저", "것"]
          },
          "lowercase": {
            "type": "lowercase"
          }
        },
        "tokenizer": {
          "seunjeon_tokenizer": {
            "type": "seunjeon_tokenizer",
            "index_eojeol": false,
            "decompound": true,
            "pos_tagging": false,
            "index_poses": [
              "UNK",
              "EP",
              "I",
              "M",
              "N",
              "SL",
              "SH",
              "SN",
              "V",
              "VCP",
              "XP",
              "XS",
              "XR"
            ]
          }
        },
        "analyzer": {
          "seunjeon": {
            "type": "custom",
            "tokenizer": "seunjeon_tokenizer"
          },
          "seunjeon_search": {
            "tokenizer": "seunjeon_tokenizer",
            "filter": [
              "synonym",
              "lowercase",
              "korean_stop"
            ]
          },
          "keyword": {
            "tokenizer": "keyword",
            "filter": [
              "lowercase"
            ]
          }
        }
      }
    }
    728x90

    댓글