Skip to content

Commit c99382e

Browse files
committed
Promote snowpark-connect with v1.1.1 audit pass
Re-staged with v1.1.1 holistic prompt that adds stopping-point markers, correct INSTRUCTIONS.md sub-flow cross-refs, and drops invalid tool snowflake_object_search. Note: removed rewriter-generated 'common mistakes' boilerplate line that triggered false-positive on <sub>/SKILL.md regex gate (was a negative example, not an actual cross-ref).
1 parent be318e8 commit c99382e

33 files changed

Lines changed: 15057 additions & 0 deletions

skills/snowpark-connect/LICENSE

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
Snowflake Skills License
2+
3+
© 2026 Snowflake Inc. All rights reserved.
4+
5+
LICENSE: Use of these materials (including all code, prompts, assets, files, and other components of these skills (collectively, “Skills”)) is governed by your agreement with Snowflake for the Service. If no separate agreement exists, use is governed by Snowflake’s Terms of Service (available at: https://www.snowflake.com/en/legal/terms-of-service/).
6+
7+
Your applicable agreement is referred to as the "Agreement." "Service" is as defined in the Agreement.
8+
9+
ADDITIONAL RESTRICTIONS: Notwithstanding anything in the Agreement to the contrary, you may not:
10+
11+
* Extract from the Service or retain copies of the Skills outside use with the Service;
12+
* Reproduce or copy the Skills , except for temporary copies created automatically during authorized use of the Service;
13+
* Create derivative works based on the Skills;
14+
* Distribute, sublicense, or transfer the Skills to any third party;
15+
* Make, offer to sell, sell, or import any inventions embodied in the Skills; nor,
16+
* Reverse engineer, decompile, or disassemble the Skills.
17+
18+
The receipt, viewing, or possession of the Skills does not convey or imply any license or right beyond those expressly granted above.
19+
20+
Snowflake retains all rights, title, and interest in the Skills, including all copyrights, trademarks, patents, and all other applicable intellectual property rights.
21+
22+
THE SKILLS ARE PROVIDED “AS IS,” WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SKILLS OR THE USE OR OTHER DEALINGS IN THE SKILLS.

skills/snowpark-connect/SKILL.md

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
---
2+
name: snowpark-connect
3+
title: Snowpark Connect for Spark
4+
summary: Route PySpark migration, validation, and deployment work to the right Snowpark Connect (SCOS) sub-flow.
5+
description: |
6+
Use when migrating PySpark code to Snowpark Connect (SCOS), setting up a local SCOS testing
7+
environment, validating an SCOS migration, tuning SCOS pipeline performance, or deploying a
8+
PySpark job to Snowflake compute pools via snowpark-submit. This umbrella skill detects intent
9+
and routes to the matching sub-flow.
10+
Triggers: snowpark connect, scos, pyspark migration, spark connect, validate migration, pyspark compatibility, snowpark-submit
11+
tools:
12+
- Bash
13+
- Read
14+
- Write
15+
- Edit
16+
- Glob
17+
- Grep
18+
prompt: Help me migrate my PySpark job to Snowpark Connect.
19+
language: en
20+
status: Published
21+
author: Snowflake Solutions Team
22+
type: snowflake
23+
---
24+
25+
# Snowpark Connect for Spark (SCOS)
26+
27+
## Overview
28+
29+
Snowpark Connect for Spark (SCOS) lets you run PySpark code against Snowflake compute. This skill is an umbrella that routes you to the right sub-flow based on what you need: setting up a local dev loop, migrating existing PySpark, validating a migration, tuning performance, or deploying to production.
30+
31+
The only required code change to switch a PySpark job to SCOS is the session bootstrap:
32+
33+
```python
34+
# Standard PySpark
35+
from pyspark.sql import SparkSession
36+
spark = SparkSession.builder.appName("App").getOrCreate()
37+
38+
# SCOS
39+
from snowflake import snowpark_connect
40+
spark = snowpark_connect.init_spark_session()
41+
```
42+
43+
## Prerequisites
44+
45+
- Snowflake account with an active warehouse
46+
- A `spark-connect` connection configured in `~/.snowflake/config.toml`
47+
- Python 3.11 (conda recommended)
48+
49+
## Run modes
50+
51+
| Mode | Compute | Command | Use case |
52+
|------|---------|---------|----------|
53+
| SCOS Local | Warehouse | `python script.py` | Development, testing |
54+
| Snowpark Submit | SPCS Compute Pool | `snowpark-submit` | Production |
55+
56+
## Workflow
57+
58+
Recommended order: Setup → Migrate → Validate → Optimize → Deploy.
59+
60+
### Step 1: Detect intent
61+
62+
Ask the user which sub-flow they need:
63+
64+
1. Set up local SCOS testing environment
65+
2. Migrate PySpark code to SCOS
66+
3. Validate a completed SCOS migration
67+
4. Optimize SCOS pipeline performance
68+
5. Deploy a Spark job via `snowpark-submit`
69+
70+
⚠️ STOPPING POINT: Wait for the user to pick a sub-flow before loading any sub-skill. If the request is ambiguous, ask one clarifying question first. If the user is new to SCOS, recommend starting with sub-flow 1 (Setup).
71+
72+
### Step 2: Route to sub-flow
73+
74+
| # | Phase | Trigger keywords | Load |
75+
|---|-------|------------------|------|
76+
| 1 | Setup | setup, local testing, dev environment, configure | `scos-local-testing/INSTRUCTIONS.md` |
77+
| 2 | Migrate | migrate, convert, port, rewrite for SCOS | `migrate-pyspark-to-snowpark-connect/INSTRUCTIONS.md` |
78+
| 3 | Validate | validate, verify, test migration, smoke test | `validate-pyspark-to-snowpark-connect/INSTRUCTIONS.md` |
79+
| 4 | Optimize | slow, performance, cross join, memory, optimize | `scos-performance/INSTRUCTIONS.md` |
80+
| 5 | Deploy | snowpark-submit, deploy, production, compute pool | `snowpark-submit/INSTRUCTIONS.md` |
81+
82+
Each sub-flow contains its own multi-step workflow, code diffs, and verification commands.
83+
84+
## Common Mistakes
85+
86+
- Skipping Setup and trying to migrate first — the local dev loop catches issues fast; production runs do not.
87+
- Editing more than the session bootstrap during migration. Start by changing only `SparkSession.builder...` to `snowpark_connect.init_spark_session()`, run, then fix what actually breaks.
88+
- Mixing run modes mid-flow. Use SCOS Local for iteration; switch to `snowpark-submit` only when the job is stable.
89+
- Tuning performance before the job runs end-to-end. Validate correctness first, optimize second.
90+
- Hardcoding credentials. Configure `spark-connect` in `~/.snowflake/config.toml` and let the SDK resolve auth.
91+
92+
## Stopping Points
93+
94+
- Step 1 — wait for the user to pick a sub-flow before loading any sub-skill content or running commands.
95+
96+
## Output
97+
98+
The user is routed into the matching sub-flow, which then drives the rest of the work.
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
# PySpark to Snowpark Connect (SCOS) Migration with Cortex Code
2+
3+
This document captures the end-to-end workflow for migrating a PySpark workload to Snowflake SCOS using the Cortex Code **Snowpark Connect** skill.
4+
5+
## Quick Reference
6+
7+
### Setup
8+
```bash
9+
conda run -n scos python -c "from snowflake import snowpark_connect; print('OK')" # verify runtime
10+
snow sql -q "SELECT 1" -c snowpark-connect # verify Snowflake connection
11+
```
12+
13+
### Migrate
14+
```bash
15+
# In Cortex Code: activate snowpark-connect skill → select "Migrate"
16+
# Produces pyspark_transform_scos.py + analysis.json from pyspark_transform.py
17+
```
18+
19+
### Validate
20+
```bash
21+
cd pyspark_transform_scos_test && conda run -n scos --no-capture-output python entrypoint.py
22+
```
23+
24+
### Optimize
25+
```bash
26+
# In Cortex Code: activate snowpark-connect skill → select "Optimize"
27+
# Converts Python UDFs to native SQL expressions, adds case sensitivity guard
28+
```
29+
30+
### Deploy
31+
```bash
32+
snow sql -q "CREATE STAGE IF NOT EXISTS SCOS_APPS_STAGE DIRECTORY=(ENABLE=TRUE)" -c snowpark-connect
33+
snow sql -q "CREATE STAGE IF NOT EXISTS SCOS_DATA_STAGE DIRECTORY=(ENABLE=TRUE)" -c snowpark-connect
34+
snow sql -q "PUT file://pyspark_transform_scos.py @SCOS_APPS_STAGE/ AUTO_COMPRESS=FALSE OVERWRITE=TRUE" -c snowpark-connect
35+
snow sql -q "PUT file://data/jobs.parquet @SCOS_DATA_STAGE/data/ AUTO_COMPRESS=FALSE OVERWRITE=TRUE" -c snowpark-connect
36+
snow sql -q "PUT file://data/companies.parquet @SCOS_DATA_STAGE/data/ AUTO_COMPRESS=FALSE OVERWRITE=TRUE" -c snowpark-connect
37+
snow sql -q "PUT file://data/applications.parquet @SCOS_DATA_STAGE/data/ AUTO_COMPRESS=FALSE OVERWRITE=TRUE" -c snowpark-connect
38+
conda run -n scos --no-capture-output snowpark-submit \
39+
--snowflake-stage=@DEMO.SPCONN.SCOS_APPS_STAGE \
40+
--snowflake-workload-name=scos_job_analytics \
41+
--snowflake-connection-name=snowpark-connect \
42+
--compute-pool=SNOWPARK_SUBMIT_POOL_XS \
43+
pyspark_transform_scos.py
44+
```
45+
46+
---
47+
48+
## Project Structure
49+
50+
```
51+
example/
52+
├── pyspark_transform.py # Original PySpark workload
53+
├── pyspark_transform_scos.py # Migrated SCOS workload
54+
├── analysis.json # Compatibility analysis results
55+
├── data/ # Source parquet files
56+
├── output/ # Pipeline output
57+
├── pyspark_transform_scos_test/ # Validation test directory
58+
│ ├── entrypoint.py # Test entrypoint with synthetic data
59+
│ ├── pyspark_transform_scos.py # Copy of migrated workload
60+
│ ├── data/ # Synthetic test data
61+
│ ├── output/ # Test output
62+
│ └── output.log # Validation run log
63+
└── README.md
64+
```
65+
66+
---
67+
68+
## Detailed Steps
69+
70+
### 1. Local Testing Environment Setup
71+
72+
**Prerequisites:** conda env `scos` with `snowpark-connect`, Snowflake connection `snowpark-connect` in `~/.snowflake/config.toml`, Python 3.11.
73+
74+
```bash
75+
conda run -n scos python -c "from snowflake import snowpark_connect; print('OK')"
76+
```
77+
78+
The migration analyzer uses a RAG-based Cortex Search Service (`SCOS_MIGRATION.PUBLIC.SCOS_COMPAT_ISSUES_SERVICE`). Initialized automatically on first use.
79+
80+
---
81+
82+
### 2. Migration
83+
84+
Activate the Snowpark Connect skill in Cortex Code and select **Migrate**. The 6-step workflow: analyze → copy → apply fixes → update imports → add header → verify.
85+
86+
**Analysis found 8 issues** in `pyspark_transform.py`:
87+
88+
| Lines | Risk | Issue | Action |
89+
|-------|------|-------|--------|
90+
| 46 | **1.0** | `spark.sparkContext.setLogLevel()` - RDD API not supported | Removed |
91+
| 49-51 | 0.2 | Local parquet file reads | Added stage performance tip |
92+
| 100-104 | 0.2 | `coalesce(1)` is a no-op in SCOS | Commented as no-op |
93+
| 57-80 | 0.1-0.15 | Window/filter/groupBy patterns | Reviewed, safe |
94+
95+
**Key change** — session initialization:
96+
```python
97+
# BEFORE # AFTER
98+
from pyspark.sql import SparkSession from snowflake import snowpark_connect
99+
spark = SparkSession.builder \ spark = snowpark_connect.init_spark_session()
100+
.master("local[*]").getOrCreate()
101+
```
102+
103+
---
104+
105+
### 3. Validation
106+
107+
Smoke test using synthetic data on the real SCOS runtime. The entrypoint creates 5 jobs, 3 companies, 5 applications as parquet, then calls the real `main()`.
108+
109+
```bash
110+
cd pyspark_transform_scos_test && conda run -n scos --no-capture-output python entrypoint.py
111+
```
112+
113+
**Result:** All pipeline stages passed — parquet reads, window functions, joins, aggregations, parquet write.
114+
115+
---
116+
117+
### 4. Optimization
118+
119+
Activate the Snowpark Connect skill and select **Optimize**. Changes applied:
120+
121+
- **Python UDFs → native SQL expressions**: Replaced `@F.udf` functions with `F.when/otherwise` chains (eliminates serde overhead)
122+
- **Case sensitivity**: Added `spark.conf.set("spark.sql.caseSensitive", "true")` to prevent column uppercasing
123+
- **Array indexing**: Replaced `parts[Column]` with `F.element_at(parts, -1)` (required for Spark Connect mode)
124+
125+
---
126+
127+
### 5. Deployment
128+
129+
Activate the Snowpark Connect skill and select **Deploy**. Uses `snowpark-submit` to run on SPCS compute pools.
130+
131+
**Key pattern** — dual-mode session for local dev vs. snowpark-submit:
132+
```python
133+
def create_session():
134+
if os.environ.get("SPARK_REMOTE"):
135+
return SparkSession.builder.remote(os.environ["SPARK_REMOTE"]).getOrCreate()
136+
else:
137+
from snowflake import snowpark_connect
138+
return snowpark_connect.init_spark_session()
139+
```
140+
141+
Without this, `snowpark-submit` fails with `RuntimeError: Snowpark Connect cannot be run inside of a Spark environment` because it already provides a Spark Connect session.
142+
143+
**Deployment result:** 51K jobs + 200K applications processed, output written to `@SCOS_DATA_STAGE/output/job_analytics/` (893 KB).
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
[
2+
{
3+
"file": "/Users/pjain/git/coco-work/test_scos_migration/example/pyspark_transform.py",
4+
"lines": "46-46",
5+
"code": "spark.sparkContext.setLogLevel(\"WARN\")",
6+
"final_risk": 1.0,
7+
"root_cause": "Uses '.sparkContext' which is not supported in SCOS",
8+
"explanation": "RDD operations are not supported in SCOS.",
9+
"fix": "Convert to DataFrame operations. RDD operations are not supported in SCOS.",
10+
"confidence": "HIGH"
11+
},
12+
{
13+
"file": "/Users/pjain/git/coco-work/test_scos_migration/example/pyspark_transform.py",
14+
"lines": "50-50",
15+
"code": "companies = spark.read.parquet(os.path.join(DATA_DIR, \"companies.parquet\"))",
16+
"final_risk": 0.2,
17+
"root_cause": "We don't support partitioned write in local files. 4th argument i.e numPartitions in range function is a no-op in Snowpark Connect. ",
18+
"explanation": "Reading parquet files is supported in SCOS. The preliminary assessment notes a potential performance concern when reading from external paths rather than Snowflake stages, but this is not a compatibility failure.",
19+
"fix": "For better performance, consider uploading files to a Snowflake stage first using session.file.put().",
20+
"confidence": "HIGH"
21+
},
22+
{
23+
"file": "/Users/pjain/git/coco-work/test_scos_migration/example/pyspark_transform.py",
24+
"lines": "49-49",
25+
"code": "jobs = spark.read.parquet(os.path.join(DATA_DIR, \"jobs.parquet\"))",
26+
"final_risk": 0.2,
27+
"root_cause": "We don't support partitioned write in local files. 4th argument i.e numPartitions in range function is a no-op in Snowpark Connect. ",
28+
"explanation": "Reading parquet files is supported in SCOS. The warning is about potential performance differences when reading from external paths, not a compatibility failure.",
29+
"fix": "For better performance, consider uploading files to a Snowflake stage first using session.file.put().",
30+
"confidence": "HIGH"
31+
},
32+
{
33+
"file": "/Users/pjain/git/coco-work/test_scos_migration/example/pyspark_transform.py",
34+
"lines": "51-51",
35+
"code": "applications = spark.read.parquet(os.path.join(DATA_DIR, \"applications.parquet\"))",
36+
"final_risk": 0.2,
37+
"root_cause": "We don't support partitioned write in local files. 4th argument i.e numPartitions in range function is a no-op in Snowpark Connect. ",
38+
"explanation": "Reading parquet files is supported in SCOS. The warning relates to potential performance differences when reading from external paths, not a compatibility failure.",
39+
"fix": "For better performance, consider uploading files to a Snowflake stage first using session.file.put().",
40+
"confidence": "HIGH"
41+
},
42+
{
43+
"file": "/Users/pjain/git/coco-work/test_scos_migration/example/pyspark_transform.py",
44+
"lines": "100-104",
45+
"code": "final.select(\n \"job_id\", \"company_name\", \"industry\", \"title\", \"state\",\n \"salary_bucket\", \"salary_midpoint\", \"posted_month\",\n \"total_applications\", \"unique_applicants\", \"hires\",\n ).coalesce(1).write.mode(\"overwrite\").parquet(os.path.join(OUTPUT_DIR, \"job_analytics\"))",
46+
"final_risk": 0.2,
47+
"root_cause": "coalesce() is a no-op in SCOS - the code will run but may produce multiple output files instead of the intended single file",
48+
"explanation": "The coalesce(1) call is a no-op in SCOS, meaning the code will execute successfully but may not produce a single output file as intended. This is a behavioral difference rather than a failure.",
49+
"fix": "If single-file output is required, consider post-processing to merge files or use Snowflake-native methods for file consolidation. Otherwise, the code will work but with potentially multiple output files.",
50+
"confidence": "HIGH"
51+
},
52+
{
53+
"file": "/Users/pjain/git/coco-work/test_scos_migration/example/pyspark_transform.py",
54+
"lines": "57-59",
55+
"code": "jobs_deduped = jobs.withColumn(\"_rn\", F.row_number().over(w)) \\\n .filter(F.col(\"_rn\") == 1) \\\n .drop(\"_rn\")",
56+
"final_risk": 0.15,
57+
"root_cause": "Cannot filter using original DataFrame columns after transformation operations (drop, select, withColumn, etc.)",
58+
"explanation": "The code uses standard window functions and filtering on a newly created column. Unlike the similar test cases which fail when referencing original DataFrame columns after transformations, this code filters on '_rn' which is created in the same transformation chain.",
59+
"fix": null,
60+
"confidence": "MEDIUM"
61+
},
62+
{
63+
"file": "/Users/pjain/git/coco-work/test_scos_migration/example/pyspark_transform.py",
64+
"lines": "60-62",
65+
"code": "jobs_clean = jobs_deduped \\\n .filter(F.col(\"salary_min\").isNotNull()) \\\n .filter(F.col(\"salary_max\") > F.col(\"salary_min\"))",
66+
"final_risk": 0.1,
67+
"root_cause": "Cannot filter using original DataFrame columns after transformation operations (drop, select, withColumn, etc.)",
68+
"explanation": "The input code performs standard filtering on existing columns. The similar test cases fail due to filtering on columns after drop() operations, which doesn't apply here since we're filtering directly on the DataFrame's own columns.",
69+
"fix": null,
70+
"confidence": "HIGH"
71+
},
72+
{
73+
"file": "/Users/pjain/git/coco-work/test_scos_migration/example/pyspark_transform.py",
74+
"lines": "76-80",
75+
"code": "app_stats = applications.groupBy(\"job_id\").agg(\n F.count(\"*\").alias(\"total_applications\"),\n F.countDistinct(\"applicant_id\").alias(\"unique_applicants\"),\n F.sum(F.when(F.col(\"status\") == \"hired\", 1).otherwise(0)).alias(\"hires\"),\n )",
76+
"final_risk": 0.1,
77+
"root_cause": "Ambiguous column reference in select after groupBy and agg on the same column",
78+
"explanation": "The input code uses proper aliasing for all aggregations, avoiding the ambiguous column reference issue from the similar test cases. The first/last non-determinism issue doesn't apply since those functions aren't used.",
79+
"fix": null,
80+
"confidence": "HIGH"
81+
}
82+
]
8.44 MB
Binary file not shown.
8.22 KB
Binary file not shown.
1.11 MB
Binary file not shown.

0 commit comments

Comments
 (0)