Database ID config when loading data to Datastore using Apache Beam


I am using Apache Beam to load data into Datastore. I am using the imports below, but I cannot find a way to specify the database ID of the Datastore database.

I am able to do this using the Python gcloud-datastore SDK, but when it comes to Apache Beam, I am not able to specify the database ID.

import apache_beam as beam
from apache_beam.io.gcp.datastore.v1new.datastoreio import WriteToDatastore
from apache_beam.io.gcp.datastore.v1new.types import Entity,Key
import random



# Define your pipeline options
pipeline_options = beam.options.pipeline_options.PipelineOptions(
    project='<>',  # Specify your project ID
    region='<>'
)

# Property names passed to exclude_from_indexes below
# (the trailing comma makes this a tuple rather than a plain string)
key_tuple = ('name',)


class PrintElement(beam.DoFn):
    def process(self, element):
        print(element)
        yield element

class CreateEntity(beam.DoFn):
    def process(self, element):
        # The Python SDK offers a database param:
        # client = datastore.Client(project='<>', database='<>')
        entity = Entity(
            key=Key(path_elements=['Kind_name', random.randint(1, 1000000)],
                    project='<project>'),
            exclude_from_indexes=key_tuple)

        entity.set_properties(element)
        yield entity

# Create a pipeline
with beam.Pipeline(options=pipeline_options) as pipeline:
    # Read data from BigQuery
    data_from_bq = (pipeline
                    | 'Read from BigQuery' >> beam.io.ReadFromBigQuery(
                        query='select * from table',
                        use_standard_sql=True,
                        gcs_location="gs://<bucket name>/<folder name>/")
                    )

    entities = (data_from_bq
                | 'Prepare Data for Datastore' >> beam.ParDo(CreateEntity())
                )

    entities | 'Print elements' >> beam.ParDo(PrintElement())

    # Write to Datastore
    entities | 'Write to Datastore' >> WriteToDatastore(
        project='gcp-project'
    )

I am able to read from BigQuery and print the entities, and the application completes without any errors. How can I specify the database ID when writing to Datastore?
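
A side note on the value passed to `exclude_from_indexes` in the pipeline: as far as I understand, it is meant to be a collection of property names. In Python, parentheses around a single string do not create a tuple; only the trailing comma does. A minimal sketch in plain Python (no Beam required; the variable names `as_string` and `as_tuple` are made up for illustration):

```python
# Parentheses alone do not make a tuple; the trailing comma does.
as_string = ('name')
as_tuple = ('name',)

print(type(as_string).__name__)  # str
print(type(as_tuple).__name__)   # tuple

# Iterating the string yields single characters, so code that treats it as a
# collection of property names would see 'a', 'e', 'm', 'n' instead of 'name'.
print(sorted(set(as_string)))  # ['a', 'e', 'm', 'n']
print(sorted(set(as_tuple)))   # ['name']
```

This is unrelated to the database ID itself, but it affects which properties end up excluded from indexing.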
