Skip to content

Commit 3d9fe47

Browse files
committed
Stage snowpark-connect from Snowflake-Solutions
1 parent be318e8 commit 3d9fe47

33 files changed

Lines changed: 15059 additions & 0 deletions

skills/snowpark-connect/LICENSE

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
Snowflake Skills License
2+
3+
© 2026 Snowflake Inc. All rights reserved.
4+
5+
LICENSE: Use of these materials (including all code, prompts, assets, files, and other components of these skills (collectively, “Skills”)) is governed by your agreement with Snowflake for the Service. If no separate agreement exists, use is governed by Snowflake’s Terms of Service (available at: https://www.snowflake.com/en/legal/terms-of-service/).
6+
7+
Your applicable agreement is referred to as the "Agreement." "Service" is as defined in the Agreement.
8+
9+
ADDITIONAL RESTRICTIONS: Notwithstanding anything in the Agreement to the contrary, you may not:
10+
11+
* Extract from the Service or retain copies of the Skills outside use with the Service;
12+
* Reproduce or copy the Skills , except for temporary copies created automatically during authorized use of the Service;
13+
* Create derivative works based on the Skills;
14+
* Distribute, sublicense, or transfer the Skills to any third party;
15+
* Make, offer to sell, sell, or import any inventions embodied in the Skills; nor,
16+
* Reverse engineer, decompile, or disassemble the Skills.
17+
18+
The receipt, viewing, or possession of the Skills does not convey or imply any license or right beyond those expressly granted above.
19+
20+
Snowflake retains all rights, title, and interest in the Skills, including all copyrights, trademarks, patents, and all other applicable intellectual property rights.
21+
22+
THE SKILLS ARE PROVIDED “AS IS,” WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SKILLS OR THE USE OR OTHER DEALINGS IN THE SKILLS.

skills/snowpark-connect/SKILL.md

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
---
2+
name: snowpark-connect
3+
title: Migrate to Snowpark Connect
4+
summary: Migrate, validate, optimize, and deploy PySpark workloads on Snowflake using Snowpark Connect (SCOS).
5+
description: |
6+
Use when migrating PySpark to Snowpark Connect, validating SCOS migrations, analyzing Spark
7+
compatibility, optimizing SCOS pipeline performance, or deploying PySpark jobs to Snowflake
8+
compute pools via snowpark-submit.
9+
Triggers: snowpark connect, scos, pyspark migration, spark connect, validate migration, pyspark compatibility, snowpark-submit.
10+
tools:
11+
- Bash
12+
- Read
13+
- Write
14+
- Edit
15+
- Glob
16+
- Grep
17+
- snowflake_sql_execute
18+
prompt: Help me migrate this PySpark job to Snowpark Connect and validate it runs on Snowflake.
19+
language: en
20+
status: Published
21+
author: Snowflake Solutions Team
22+
type: snowflake
23+
---
24+
25+
# Migrate to Snowpark Connect (SCOS)
26+
27+
## Overview
28+
29+
Snowpark Connect for Spark (SCOS) lets you run PySpark code on Snowflake compute with minimal changes. Most workloads only need to swap the `SparkSession` builder for `snowpark_connect.init_spark_session()`. This skill routes you through the lifecycle: set up a local dev environment, migrate code, validate behavior against the real SCOS runtime, tune performance, and deploy to production via `snowpark-submit` on SPCS compute pools.
30+
31+
## Prerequisites
32+
33+
- Snowflake account with an active warehouse
34+
- `spark-connect` connection configured in `~/.snowflake/config.toml`
35+
- Python 3.11 (conda recommended)
36+
37+
## Quick Reference
38+
39+
| Mode | Compute | Command | Use Case |
40+
|------|---------|---------|----------|
41+
| SCOS Local | Warehouse | `python script.py` | Development, testing |
42+
| Snowpark Submit | SPCS Compute Pool | `snowpark-submit` | Production |
43+
44+
### Key code change
45+
46+
```python
47+
# Standard PySpark
48+
from pyspark.sql import SparkSession
49+
spark = SparkSession.builder.appName("App").getOrCreate()
50+
51+
# SCOS
52+
from snowflake import snowpark_connect
53+
spark = snowpark_connect.init_spark_session()
54+
```
55+
56+
## Workflow
57+
58+
Recommended order: **Setup → Migrate → Validate → Optimize → Deploy**
59+
60+
### Step 1: Detect intent
61+
62+
Ask the user which phase they need:
63+
64+
```
65+
What would you like to do with Snowpark Connect?
66+
67+
1. Set up a local SCOS testing environment
68+
2. Migrate PySpark code to SCOS
69+
3. Validate a completed SCOS migration
70+
4. Optimize SCOS pipeline performance
71+
5. Deploy a Spark job to Snowflake via snowpark-submit
72+
```
73+
74+
Wait for user selection before proceeding.
75+
76+
### Step 2: Route to sub-skill
77+
78+
| # | Phase | Triggers | Load |
79+
|---|-------|----------|------|
80+
| 1 | Setup | "setup", "local testing", "configure" | `scos-local-testing/SKILL.md` |
81+
| 2 | Migrate | "migrate", "convert", "port" | `migrate-pyspark-to-snowpark-connect/SKILL.md` |
82+
| 3 | Validate | "validate", "verify", "smoke test" | `validate-pyspark-to-snowpark-connect/SKILL.md` |
83+
| 4 | Optimize | "slow", "performance", "cross join", "memory" | `scos-performance/SKILL.md` |
84+
| 5 | Deploy | "snowpark-submit", "production", "compute pool" | `snowpark-submit/SKILL.md` |
85+
86+
If intent is ambiguous, clarify before routing. If the user is new to SCOS, recommend starting with Phase 1.
87+
88+
## Common Mistakes
89+
90+
- **Skipping local setup.** Trying to migrate without a working `spark-connect` connection in `~/.snowflake/config.toml` produces opaque auth errors. Verify the connection first.
91+
- **Mixing PySpark and SCOS sessions.** Don't keep a `SparkSession.builder` call alongside `snowpark_connect.init_spark_session()`. Replace it fully.
92+
- **Assuming 1:1 API parity.** Some PySpark APIs (RDDs, certain UDFs, Hive-specific features) aren't supported. Run the validation phase against real SCOS before declaring done.
93+
- **Using a Python version other than 3.11.** SCOS pins to 3.11; mismatched envs cause import failures.
94+
- **Deploying before validating.** `snowpark-submit` runs on SPCS compute pools — debug locally first to avoid burning compute on broken jobs.
95+
- **Cross joins on large tables.** SCOS will execute them, but they'll be slow. Use the optimize phase to detect and rewrite.
96+
- **Hardcoding warehouse names.** Pull warehouse and role from `config.toml` so the same code runs in dev and prod.
97+
98+
## Output
99+
100+
The user is routed to the appropriate sub-skill, which handles the detailed workflow for that phase.
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
# PySpark to Snowpark Connect (SCOS) Migration with Cortex Code
2+
3+
This document captures the end-to-end workflow for migrating a PySpark workload to Snowflake SCOS using the Cortex Code **Snowpark Connect** skill.
4+
5+
## Quick Reference
6+
7+
### Setup
8+
```bash
9+
conda run -n scos python -c "from snowflake import snowpark_connect; print('OK')" # verify runtime
10+
snow sql -q "SELECT 1" -c snowpark-connect # verify Snowflake connection
11+
```
12+
13+
### Migrate
14+
```bash
15+
# In Cortex Code: activate snowpark-connect skill → select "Migrate"
16+
# Produces pyspark_transform_scos.py + analysis.json from pyspark_transform.py
17+
```
18+
19+
### Validate
20+
```bash
21+
cd pyspark_transform_scos_test && conda run -n scos --no-capture-output python entrypoint.py
22+
```
23+
24+
### Optimize
25+
```bash
26+
# In Cortex Code: activate snowpark-connect skill → select "Optimize"
27+
# Converts Python UDFs to native SQL expressions, adds case sensitivity guard
28+
```
29+
30+
### Deploy
31+
```bash
32+
snow sql -q "CREATE STAGE IF NOT EXISTS SCOS_APPS_STAGE DIRECTORY=(ENABLE=TRUE)" -c snowpark-connect
33+
snow sql -q "CREATE STAGE IF NOT EXISTS SCOS_DATA_STAGE DIRECTORY=(ENABLE=TRUE)" -c snowpark-connect
34+
snow sql -q "PUT file://pyspark_transform_scos.py @SCOS_APPS_STAGE/ AUTO_COMPRESS=FALSE OVERWRITE=TRUE" -c snowpark-connect
35+
snow sql -q "PUT file://data/jobs.parquet @SCOS_DATA_STAGE/data/ AUTO_COMPRESS=FALSE OVERWRITE=TRUE" -c snowpark-connect
36+
snow sql -q "PUT file://data/companies.parquet @SCOS_DATA_STAGE/data/ AUTO_COMPRESS=FALSE OVERWRITE=TRUE" -c snowpark-connect
37+
snow sql -q "PUT file://data/applications.parquet @SCOS_DATA_STAGE/data/ AUTO_COMPRESS=FALSE OVERWRITE=TRUE" -c snowpark-connect
38+
conda run -n scos --no-capture-output snowpark-submit \
39+
--snowflake-stage=@DEMO.SPCONN.SCOS_APPS_STAGE \
40+
--snowflake-workload-name=scos_job_analytics \
41+
--snowflake-connection-name=snowpark-connect \
42+
--compute-pool=SNOWPARK_SUBMIT_POOL_XS \
43+
pyspark_transform_scos.py
44+
```
45+
46+
---
47+
48+
## Project Structure
49+
50+
```
51+
example/
52+
├── pyspark_transform.py # Original PySpark workload
53+
├── pyspark_transform_scos.py # Migrated SCOS workload
54+
├── analysis.json # Compatibility analysis results
55+
├── data/ # Source parquet files
56+
├── output/ # Pipeline output
57+
├── pyspark_transform_scos_test/ # Validation test directory
58+
│ ├── entrypoint.py # Test entrypoint with synthetic data
59+
│ ├── pyspark_transform_scos.py # Copy of migrated workload
60+
│ ├── data/ # Synthetic test data
61+
│ ├── output/ # Test output
62+
│ └── output.log # Validation run log
63+
└── README.md
64+
```
65+
66+
---
67+
68+
## Detailed Steps
69+
70+
### 1. Local Testing Environment Setup
71+
72+
**Prerequisites:** conda env `scos` with `snowpark-connect`, Snowflake connection `snowpark-connect` in `~/.snowflake/config.toml`, Python 3.11.
73+
74+
```bash
75+
conda run -n scos python -c "from snowflake import snowpark_connect; print('OK')"
76+
```
77+
78+
The migration analyzer uses a RAG-based Cortex Search Service (`SCOS_MIGRATION.PUBLIC.SCOS_COMPAT_ISSUES_SERVICE`). Initialized automatically on first use.
79+
80+
---
81+
82+
### 2. Migration
83+
84+
Activate the Snowpark Connect skill in Cortex Code and select **Migrate**. The 6-step workflow: analyze → copy → apply fixes → update imports → add header → verify.
85+
86+
**Analysis found 8 issues** in `pyspark_transform.py`:
87+
88+
| Lines | Risk | Issue | Action |
89+
|-------|------|-------|--------|
90+
| 46 | **1.0** | `spark.sparkContext.setLogLevel()` - RDD API not supported | Removed |
91+
| 49-51 | 0.2 | Local parquet file reads | Added stage performance tip |
92+
| 100-104 | 0.2 | `coalesce(1)` is a no-op in SCOS | Commented as no-op |
93+
| 57-80 | 0.1-0.15 | Window/filter/groupBy patterns | Reviewed, safe |
94+
95+
**Key change** — session initialization:
96+
```python
97+
# BEFORE # AFTER
98+
from pyspark.sql import SparkSession from snowflake import snowpark_connect
99+
spark = SparkSession.builder \ spark = snowpark_connect.init_spark_session()
100+
.master("local[*]").getOrCreate()
101+
```
102+
103+
---
104+
105+
### 3. Validation
106+
107+
Smoke test using synthetic data on the real SCOS runtime. The entrypoint creates 5 jobs, 3 companies, 5 applications as parquet, then calls the real `main()`.
108+
109+
```bash
110+
cd pyspark_transform_scos_test && conda run -n scos --no-capture-output python entrypoint.py
111+
```
112+
113+
**Result:** All pipeline stages passed — parquet reads, window functions, joins, aggregations, parquet write.
114+
115+
---
116+
117+
### 4. Optimization
118+
119+
Activate the Snowpark Connect skill and select **Optimize**. Changes applied:
120+
121+
- **Python UDFs → native SQL expressions**: Replaced `@F.udf` functions with `F.when/otherwise` chains (eliminates serde overhead)
122+
- **Case sensitivity**: Added `spark.conf.set("spark.sql.caseSensitive", "true")` to prevent column uppercasing
123+
- **Array indexing**: Replaced `parts[Column]` with `F.element_at(parts, -1)` (required for Spark Connect mode)
124+
125+
---
126+
127+
### 5. Deployment
128+
129+
Activate the Snowpark Connect skill and select **Deploy**. Uses `snowpark-submit` to run on SPCS compute pools.
130+
131+
**Key pattern** — dual-mode session for local dev vs. snowpark-submit:
132+
```python
133+
def create_session():
134+
if os.environ.get("SPARK_REMOTE"):
135+
return SparkSession.builder.remote(os.environ["SPARK_REMOTE"]).getOrCreate()
136+
else:
137+
from snowflake import snowpark_connect
138+
return snowpark_connect.init_spark_session()
139+
```
140+
141+
Without this, `snowpark-submit` fails with `RuntimeError: Snowpark Connect cannot be run inside of a Spark environment` because it already provides a Spark Connect session.
142+
143+
**Deployment result:** 51K jobs + 200K applications processed, output written to `@SCOS_DATA_STAGE/output/job_analytics/` (893 KB).
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
[
2+
{
3+
"file": "/Users/pjain/git/coco-work/test_scos_migration/example/pyspark_transform.py",
4+
"lines": "46-46",
5+
"code": "spark.sparkContext.setLogLevel(\"WARN\")",
6+
"final_risk": 1.0,
7+
"root_cause": "Uses '.sparkContext' which is not supported in SCOS",
8+
"explanation": "RDD operations are not supported in SCOS.",
9+
"fix": "Convert to DataFrame operations. RDD operations are not supported in SCOS.",
10+
"confidence": "HIGH"
11+
},
12+
{
13+
"file": "/Users/pjain/git/coco-work/test_scos_migration/example/pyspark_transform.py",
14+
"lines": "50-50",
15+
"code": "companies = spark.read.parquet(os.path.join(DATA_DIR, \"companies.parquet\"))",
16+
"final_risk": 0.2,
17+
"root_cause": "We don't support partitioned write in local files. 4th argument i.e numPartitions in range function is a no-op in Snowpark Connect. ",
18+
"explanation": "Reading parquet files is supported in SCOS. The preliminary assessment notes a potential performance concern when reading from external paths rather than Snowflake stages, but this is not a compatibility failure.",
19+
"fix": "For better performance, consider uploading files to a Snowflake stage first using session.file.put().",
20+
"confidence": "HIGH"
21+
},
22+
{
23+
"file": "/Users/pjain/git/coco-work/test_scos_migration/example/pyspark_transform.py",
24+
"lines": "49-49",
25+
"code": "jobs = spark.read.parquet(os.path.join(DATA_DIR, \"jobs.parquet\"))",
26+
"final_risk": 0.2,
27+
"root_cause": "We don't support partitioned write in local files. 4th argument i.e numPartitions in range function is a no-op in Snowpark Connect. ",
28+
"explanation": "Reading parquet files is supported in SCOS. The warning is about potential performance differences when reading from external paths, not a compatibility failure.",
29+
"fix": "For better performance, consider uploading files to a Snowflake stage first using session.file.put().",
30+
"confidence": "HIGH"
31+
},
32+
{
33+
"file": "/Users/pjain/git/coco-work/test_scos_migration/example/pyspark_transform.py",
34+
"lines": "51-51",
35+
"code": "applications = spark.read.parquet(os.path.join(DATA_DIR, \"applications.parquet\"))",
36+
"final_risk": 0.2,
37+
"root_cause": "We don't support partitioned write in local files. 4th argument i.e numPartitions in range function is a no-op in Snowpark Connect. ",
38+
"explanation": "Reading parquet files is supported in SCOS. The warning relates to potential performance differences when reading from external paths, not a compatibility failure.",
39+
"fix": "For better performance, consider uploading files to a Snowflake stage first using session.file.put().",
40+
"confidence": "HIGH"
41+
},
42+
{
43+
"file": "/Users/pjain/git/coco-work/test_scos_migration/example/pyspark_transform.py",
44+
"lines": "100-104",
45+
"code": "final.select(\n \"job_id\", \"company_name\", \"industry\", \"title\", \"state\",\n \"salary_bucket\", \"salary_midpoint\", \"posted_month\",\n \"total_applications\", \"unique_applicants\", \"hires\",\n ).coalesce(1).write.mode(\"overwrite\").parquet(os.path.join(OUTPUT_DIR, \"job_analytics\"))",
46+
"final_risk": 0.2,
47+
"root_cause": "coalesce() is a no-op in SCOS - the code will run but may produce multiple output files instead of the intended single file",
48+
"explanation": "The coalesce(1) call is a no-op in SCOS, meaning the code will execute successfully but may not produce a single output file as intended. This is a behavioral difference rather than a failure.",
49+
"fix": "If single-file output is required, consider post-processing to merge files or use Snowflake-native methods for file consolidation. Otherwise, the code will work but with potentially multiple output files.",
50+
"confidence": "HIGH"
51+
},
52+
{
53+
"file": "/Users/pjain/git/coco-work/test_scos_migration/example/pyspark_transform.py",
54+
"lines": "57-59",
55+
"code": "jobs_deduped = jobs.withColumn(\"_rn\", F.row_number().over(w)) \\\n .filter(F.col(\"_rn\") == 1) \\\n .drop(\"_rn\")",
56+
"final_risk": 0.15,
57+
"root_cause": "Cannot filter using original DataFrame columns after transformation operations (drop, select, withColumn, etc.)",
58+
"explanation": "The code uses standard window functions and filtering on a newly created column. Unlike the similar test cases which fail when referencing original DataFrame columns after transformations, this code filters on '_rn' which is created in the same transformation chain.",
59+
"fix": null,
60+
"confidence": "MEDIUM"
61+
},
62+
{
63+
"file": "/Users/pjain/git/coco-work/test_scos_migration/example/pyspark_transform.py",
64+
"lines": "60-62",
65+
"code": "jobs_clean = jobs_deduped \\\n .filter(F.col(\"salary_min\").isNotNull()) \\\n .filter(F.col(\"salary_max\") > F.col(\"salary_min\"))",
66+
"final_risk": 0.1,
67+
"root_cause": "Cannot filter using original DataFrame columns after transformation operations (drop, select, withColumn, etc.)",
68+
"explanation": "The input code performs standard filtering on existing columns. The similar test cases fail due to filtering on columns after drop() operations, which doesn't apply here since we're filtering directly on the DataFrame's own columns.",
69+
"fix": null,
70+
"confidence": "HIGH"
71+
},
72+
{
73+
"file": "/Users/pjain/git/coco-work/test_scos_migration/example/pyspark_transform.py",
74+
"lines": "76-80",
75+
"code": "app_stats = applications.groupBy(\"job_id\").agg(\n F.count(\"*\").alias(\"total_applications\"),\n F.countDistinct(\"applicant_id\").alias(\"unique_applicants\"),\n F.sum(F.when(F.col(\"status\") == \"hired\", 1).otherwise(0)).alias(\"hires\"),\n )",
76+
"final_risk": 0.1,
77+
"root_cause": "Ambiguous column reference in select after groupBy and agg on the same column",
78+
"explanation": "The input code uses proper aliasing for all aggregations, avoiding the ambiguous column reference issue from the similar test cases. The first/last non-determinism issue doesn't apply since those functions aren't used.",
79+
"fix": null,
80+
"confidence": "HIGH"
81+
}
82+
]
8.44 MB
Binary file not shown.
8.22 KB
Binary file not shown.
1.11 MB
Binary file not shown.

0 commit comments

Comments
 (0)