Query Service (Data Distiller) & Export datasets
This article outlines how the combination of Experience Platform Query Service (Data Distiller) and Dataset export can be used to implement the following data export use cases:
- Data Validation
- Data Lake, Data Warehouse of BI tools
- Readiness for Artificial Intelligent and Machine Learning.
ÃÛ¶¹ÊÓƵ Analytics can implement these use cases using its Data Feeds functionality. Data feeds are a powerful way to get raw data out of ÃÛ¶¹ÊÓƵ Analytics. This article describes how to get similar type of raw data out of Experience Platform, so you can implement the above mentioned use cases. Where applicable the functionalities described in this article are compared with ÃÛ¶¹ÊÓƵ Analytics Data Feeds to clarify differences in data and process.
Introduction
Exporting data using Query Service (Data Distiller) and Dataset export consists of:
- defining a scheduled query that generates the data for your data feed as an output dataset , using Query Service.
- defining a scheduled dataset export that exports the output dataset to a cloud storage destination, using Dataset export.
Prerequisites
Make sure that you meet all the following requirements before using the functionality described in this use case:
- A working implementation that collects data into Experience Platform’s data lake.
- Access to the Data Distiller add-on to ensure you are entitled to execute batch queries. See Query Service packaging for more information.
- Access to Export datasets functionality, available when you have purchased the Real-Time CDP Prime or Ultimate package, ÃÛ¶¹ÊÓƵ Journey Optimizer, or Customer Journey Analytics. See Export datasets to cloud storage destinations for more information.
- One or more configured destinations (for example: Amazon S3, Google Cloud Storage) to where you can export the raw data of your data feed.
Query service
Experience Platform Query Service allows you to query and join any dataset in the Experience Platform data lake as if it is a database table. You then can capture the results as a new dataset for further use in reporting or for export.
You can use the Query Service user interface, a client connected through the PostgresQL protocol, or RESTful APIs to create and schedule queries that collect the data for your data feed.
Create Query
You can use all the functionality of standard ANSI SQL for SELECT statements and other limited commands to create and execute queries that generate the data for your data feed. See SQL syntax for more information. Beyond this SQL syntax, ÃÛ¶¹ÊÓƵ supports:
- prebuilt ÃÛ¶¹ÊÓƵ-defined functions (ADF) that help perform common business-related tasks on event data stored in the Experience Platform data lake, including functions for Sessionization and Attribution,
- several built-in Spark SQL functions,
- metadata PostgreSQL commands,
- prepared statements.
Data feed columns
The XDM fields that you can use in your query depend on the schema definition on which your datasets are based. Ensure you do understand the schema underlying the dataset. See for more information the Datasets UI guide.
To help you to define the mapping between the Data Feed columns and XDM fields, see Analytics field mapping. See also the Schemas UI overview for more information on how to manage XDM resources, including schemas, classes, field groups, and data types.
For example, in case you want to use page name as part of your data feed:
- In ÃÛ¶¹ÊÓƵ Analytics Data Feed’s UI, you would select pagename as the column to add to your data feed definition.
- In Query Service, you include
web.webPageDetails.name
from thesample_event_dataset_for_website_global_v1_1
dataset (based on the Sample Event Schema for Website (Global v1.1) experience event schema) in your query. See the Web Details schema field group for more information.
Identities
In Experience Platform, various identities are available. When creating your queries, ensure you are querying identities correctly.
Often you find identities in a separate field group. In an implementation ECID (ecid
) can be defined as part of a field group with a core
object, which itself is part of an identification
object (for example: _sampleorg.identification.core.ecid
). The ECIDs might be organized differently in your schemas.
Alternatively, you can use identityMap
to query for identities. The identityMap
is of type Map
and uses a nested data structure.
See Define identity fields in the UI for more information on how to define identity fields in Experience Platform.
Refer to Primary identifiers in Analytics data for an understanding how ÃÛ¶¹ÊÓƵ Analytics identities are mapped to Experience Platform identities when using the Analytics source connector. This mapping might serve as guidance for setting up your identities, even when not using the Analytics source connector.
Hit level data and identification
Based on the implementation, hit level data traditionally collected in ÃÛ¶¹ÊÓƵ Analytics is now stored as timestamped event data in Experience Platform. The following table is extracted from Analytics field mapping and shows examples how to map hit level-specific ÃÛ¶¹ÊÓƵ Analytics Data Feed columns with corresponding XDM fields in your queries. The table also shows examples of how hits, visits, and visitors are identified using XDM fields.
hitid_high
+ hitid_low
_id
hitid_low
_id
hitid_high
to identify a hit uniquely.hitid_high
_id
hitid_high
to identify a hit uniquely.hit_time_gmt
receivedTimestamp
cust_hit_time_gmt
timestamp
visid_high
+ visid_low
identityMap
visid_high
+ visid_low
endUserIDs._experience.aaid.id
visid_high
endUserIDs._experience.aaid.primary
visid_low
to identify a visit uniquely.visid_high
endUserIDs._experience.aaid.namespace.code
visid_low
to identify a visit uniquely.visid_low
identityMap
visid_high
to identify a visit uniquely.cust_visid
identityMap
cust_visid
endUserIDs._experience.aacustomid.id
cust_visid
endUserIDs._experience.aacustomid.primary
cust_visid
endUserIDs._experience.aacustomid.namespace.code
visid_low
to identify the customer visitor id uniquely.geo\_*
placeContext.geo.*
event_list
commerce.purchases
, commerce.productViews
, commerce.productListOpens
, commerce.checkouts
, commerce.productListAdds
, commerce.productListRemovals
, commerce.productListViews
, _experience.analytics.event101to200.*
, …, _experience.analytics.event901_1000.*
page_event
web.webInteraction.type
page_event
web.webInteraction.linkClicks.value
page_event_var_1
web.webInteraction.URL
page_event_var_2
web.webInteraction.name
paid_search
search.isPaid
ref_type
web.webReferrertype
Post columns
ÃÛ¶¹ÊÓƵ Analytics Data Feeds use the concept of columns with a post_
prefix, which are columns containing data after processing. See Data feeds FAQ for more information.
Data collected in datasets through the Experience Platform Edge Network (Web SDK, Mobile SDK, Server API) has no concept of post_
fields. As a result, post_
prefixed and non-post_
prefixed data feed columns map to the same XDM fields. For example, both page_url
and post_page_url
data feed columns map to the same web.webPageDetails.URL
XDM field.
See Compare data processing across ÃÛ¶¹ÊÓƵ Analytics and Customer Journey Analytics for an overview of the difference in processing of data.
The post_
prefix column type of data, when collected in Experience Platform data lake, does however require advanced transformations before it can successfully be used in a data feed use case. Performing these advanced transformations in your queries involves the use of ÃÛ¶¹ÊÓƵ-defined functions for sessionization, attribution, and deduplication. See Examples on how to use these functions.
Lookups
To look up data from other datasets, you use standard SQL functionality (WHERE
clause, INNER JOIN
, OUTER JOIN
, and others).
Calculations
To perform calculations on fields (columns), use the standard SQL functions (for example COUNT(*)
), or the math and statistical operators and functions part of Spark SQL. Also, window functions provide support to update aggregations and return single items for each row in an ordered subset. See Examples on how to use these functions.
Nested data structure
The schemas on which the datasets are based often contain complex data types, including nested data structures. Previously mentioned identityMap
is an example of a nested data structure. See below for an example of identityMap
data.
{
"identityMap":{
"FPID":[
{
"id":"55613368189701342632255821452918751312",
"authenticatedState":"ambiguous"
}
],
"CRM":[
{
"id":"2394509340-30453470347",
"authenticatedState":"authenticated"
}
]
}
}
You can use the explode()
or other Arrays functions from Spark SQL to get to the data inside a nested data structure, for example:
select explode(identityMap) from demosys_cja_ee_v1_website_global_v1_1 limit 15;
Alternatively you can refer to individual elements using dot notation. For example:
select identityMap.ecid from demosys_cja_ee_v1_website_global_v1_1 limit 15;
See Working with nested data structures in Query Service for more information.
Examples
For queries:
- that use data from datasets in the Experience Platform data lake,
- are tapping on the additional capabilities of ÃÛ¶¹ÊÓƵ Defined Functions and/or Spark SQL, and
- which would deliver similar results to an equivalent ÃÛ¶¹ÊÓƵ Analytics data feed,
see:
Below is an example to properly apply attribution across sessions and that illustrates how to
-
use the last 90 days as a lookback,
-
apply window functions like sessionization and / or attribution, and
-
restrict the output based on the
ingest_time
.+++
DetailsTo do this, you have to…
- Use a processing status table,
checkpoint_log
, to keep track of the current versus the last ingest time. See this guide for more information. - disable dropping system columns, so you can use
_acp_system_metadata.ingestTime
. - Use an inner most
SELECT
to grab the fields you want to use and restrict the events to your lookback period for sessionization and / or attribution calculations. For example, 90 days. - Use a next level
SELECT
to apply you sessionization and / or attribution window functions and other calculations. - Use
INSERT INTO
in your output table to restrict te lookback to just the events that have arrived since your last processing time. You do this by filtering on_acp_system_metadata.ingestTime
versus the time last stored in your processing status table.
Sessionization window functions example
code language-sql $$ BEGIN -- Disable dropping system columns set drop_system_columns=false; -- Initialize variables SET @last_updated_timestamp = SELECT CURRENT_TIMESTAMP; -- Get the last processed batch ingestion time SET @from_batch_ingestion_time = SELECT coalesce(last_batch_ingestion_time, 'HEAD') FROM checkpoint_log a JOIN ( SELECT MAX(process_timestamp) AS process_timestamp FROM checkpoint_log WHERE process_name = 'data_feed' AND process_status = 'SUCCESSFUL' ) b ON a.process_timestamp = b.process_timestamp; -- Get the last batch ingestion time SET @to_batch_ingestion_time = SELECT MAX(_acp_system_metadata.ingestTime) FROM events_dataset; -- Sessionize the data and insert into data_feed. INSERT INTO data_feed SELECT * FROM ( SELECT userIdentity, timestamp, SESS_TIMEOUT(timestamp, 60 * 30) OVER ( PARTITION BY userIdentity ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) AS session_data, page_name, ingest_time FROM ( SELECT userIdentity, timestamp, web.webPageDetails.name AS page_name, _acp_system_metadata.ingestTime AS ingest_time FROM events_dataset WHERE timestamp >= current_date - 90 ) AS a ORDER BY userIdentity, timestamp ASC ) AS b WHERE b.ingest_time >= @from_batch_ingestion_time; -- Update the checkpoint_log table INSERT INTO checkpoint_log SELECT 'data_feed' process_name, 'SUCCESSFUL' process_status, cast(@to_batch_ingestion_time AS string) last_batch_ingestion_time, cast(@last_updated_timestamp AS TIMESTAMP) process_timestamp END $$;
Attribution window functions example
code language-sql $$ BEGIN SET drop_system_columns=false; -- Initialize variables SET @last_updated_timestamp = SELECT CURRENT_TIMESTAMP; -- Get the last processed batch ingestion time 1718755872325 SET @from_batch_ingestion_time = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a JOIN ( SELECT MAX(process_timestamp) AS process_timestamp FROM checkpoint_log WHERE process_name = 'data_feed' AND process_status = 'SUCCESSFUL' ) b ON a.process_timestamp = b.process_timestamp; -- Get the last batch ingestion time 1718758687865 SET @to_batch_ingestion_time = SELECT MAX(_acp_system_metadata.ingestTime) FROM demo_data_trey_mcintyre_midvalues; -- Sessionize the data and insert into new_sessionized_data INSERT INTO new_sessionized_data SELECT * FROM ( SELECT _id, timestamp, struct(User_Identity, cast(SESS_TIMEOUT(timestamp, 60 * 30) OVER ( PARTITION BY User_Identity ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) as string) AS SessionData, to_timestamp(from_unixtime(ingest_time/1000, 'yyyy-MM-dd HH:mm:ss')) AS IngestTime, PageName, first_url, first_channel_type ) as _demosystem5 FROM ( SELECT _id, ENDUSERIDS._EXPERIENCE.MCID.ID as User_Identity, timestamp, web.webPageDetails.name AS PageName, attribution_first_touch(timestamp, '', web.webReferrer.url) OVER (PARTITION BY ENDUSERIDS._EXPERIENCE.MCID.ID ORDER BY timestamp ASC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING).value AS first_url, attribution_first_touch(timestamp, '',channel.typeAtSource) OVER (PARTITION BY ENDUSERIDS._EXPERIENCE.MCID.ID ORDER BY timestamp ASC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING).value AS first_channel_type, _acp_system_metadata.ingestTime AS ingest_time FROM demo_data_trey_mcintyre_midvalues WHERE timestamp >= current_date - 90 ) ORDER BY User_Identity, timestamp ASC ) WHERE _demosystem5.IngestTime >= to_timestamp(from_unixtime(@from_batch_ingestion_time/1000, 'yyyy-MM-dd HH:mm:ss')); -- Update the checkpoint_log table INSERT INTO checkpoint_log SELECT 'data_feed' as process_name, 'SUCCESSFUL' as process_status, cast(@to_batch_ingestion_time AS string) as last_snapshot_id, cast(@last_updated_timestamp AS timestamp) as process_timestamp; END $$;
+++
- Use a processing status table,
Schedule Query
You schedule the query to ensure that the query is executed and that the results are generated at your preferred interval.
Using Query Editor
You can schedule a query using the Query Editor. When scheduling the query, you define an output dataset. See Query schedules for more information.
Using Query Service API
Alternatively you can use the RESTful APIs to define a query and schedule for the query. See the Query Service API guide for more information.
Ensure you define the output dataset as part of the optional ctasParameters
property when creating the query () or when creating the schedule for a query ().
Export datasets
Once you have created and scheduled your query, and verified the results, you can then export the raw datasets to cloud storage destinations. This export is in Experience Platform Destinations terminology referred to as Dataset export destinations. See Export datasets to cloud storage destinations for an overview.
The following cloud storage destinations are supported:
Experience Platform UI
You can export and schedule the export of your output datasets through the Experience Platform UI. This section describes the steps involved.
Select destination
When you have determined which cloud storage destination you want to export the output dataset to, select the destination. When you have not yet configured a destination for your preferred cloud storage, you must create a new destination connection.
As part of configuring a destination, you can
- define the file type (JSON or Parquet),
- whether the resulting file should be compressed or not, and
- whether a manifest file should be included or not.
Select dataset
When you have selected the destination, in the next Select datasets step you have to select your output dataset from the list of datasets. If you have create multiple scheduled queries, and you want the output datasets to send to the same cloud storage destination, you can select the corresponding output datasets. See Select your datasets for more information.
Schedule dataset export
Finally, you want to schedule your dataset export as part of the Scheduling step. In that step you can define the schedule and whether the output dataset export should be incremental or not. See Schedule dataset export for more information.
Final steps
Review your selection, and when correct, start exporting your output dataset to the cloud storage destination.
You must verify a successful data export. When exporting datasets, Experience Platform creates one or multiple .json
or .parquet
files in the storage location defined in your destination. Expect new files to be deposited in your storage location according to the export schedule you set up. Experience Platform creates a folder structure in the storage location that you specified as part of the selected destination, where it deposits the exported files. A new folder is created for each export time, following the pattern: folder-name-you-provided/datasetID/exportTime=YYYYMMDDHHMM
. The default file name is randomly generated and ensures that exported file names are unique.
Flow Service API
Alternatively, you can export and schedule the export of output datasets using APIs. The steps involved are documented in Export datasets by using the Flow Service API.
Get started
To export datasets, ensure you have the required permissions. Also verify that the destination to where you want to send your output dataset supports exporting datasets. You then must gather the values for required and optional headers that you use in the API calls. You also need to identify the connection spec and flow spec IDs of the destination you are intending to export datasets to.
Retrieve eligible datasets
You can retrieve a list of eligible datasets for export and verify whether your output dataset is part of that list using the API.
Create source connection
Next you must create a source connection for the output dataset, using its unique ID, that you want to export to the cloud storage destination. You use the API.
Authenticate to destination (create base connection)
You now must create a base connection to authenticate and securely store the credentials to your cloud storage destination using the API.
Provide export parameters
Next, you must create an additional target connection that stores the export parameters for your output dataset using, once more, the API. These export parameters include location, file format, compression, and more.
Set up dataflow
Finally, you set up the dataflow to ensure that your output dataset is exported to your cloud storage destination using the API. In this step, you can define the schedule for the export, using the scheduleParams
parameter.
Validate dataflow
To check successful executions of your dataflow, use the API, specifying the dataflow ID as query parameter. This dataflow ID is an identifier returned when you set up the dataflow.
Verify a successful data export. When exporting datasets, Experience Platform creates one or multiple .json
or .parquet
files in the storage location defined in your destination. Expect new files to be deposited in your storage location according to the export schedule you set up. Experience Platform creates a folder structure in the storage location that you specified as part of the selected destination, where it deposits the exported files. A new folder is created for each export time, following the pattern: folder-name-you-provided/datasetID/exportTime=YYYYMMDDHHMM
. The default file name is randomly generated and ensures that exported file names are unique.
Conclusion
In short, emulating the ÃÛ¶¹ÊÓƵ Analytics Data Feed functionality implies setting up scheduled queries using Query Service and using the results of these queries in scheduled Dataset exports.