Batch Upsert Thousands of Rows from Alteryx to a SharePoint List using Python

Why bother?

SharePoint Lists provide a secure way to share data with specific user groups via API, with granular access controls and advantages over file-based shares. But connecting to them requires the right approach. While Alteryx offers a native SharePoint connector with broad functionality, it might not provide all the customization that's needed. When processing thousands of items daily in production, you may require finer control over batch operations, retry logic, and error handling than the official connector provides.

Here's a customizable solution for syncing thousands of Alteryx rows to a SharePoint List using the Python tool. It leverages Microsoft Graph API's $batch endpoint (20 items per request, as suggested by Microsoft), OAuth 2.0 authentication, and retry logic with exponential backoff (see below) for handling 429/503 errors. Both scripts respect rate limits via the server's Retry-After header. Logging integrates with Alteryx's native logs, and two debug outputs are sent downstream in Alteryx. The scripts use only standard packages (requests, pandas, ayx) likely already available on your Alteryx Server, providing a starting point for SharePoint List integration.
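
For orientation: a Graph $batch call wraps up to 20 sub-requests into a single POST to https://graph.microsoft.com/v1.0/$batch. Here is a minimal, hypothetical sketch of such a payload (IDs, URLs and field names are placeholders; the actual requests are built by the scripts below):

batch_payload = {
    "requests": [
        {
            "id": "1",  # each sub-request needs a unique id within the batch
            "method": "PATCH",
            "url": "/sites/{site-id}/lists/{list-id}/items/{item-id}",
            "headers": {"Content-Type": "application/json"},
            "body": {"fields": {"Title": "row-001"}},
        },
        # ... up to 20 sub-requests per batch
    ]
}
# Sent as: POST https://graph.microsoft.com/v1.0/$batch with this dict as the JSON body
# (plus a bearer token in the Authorization header)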

The Plan

The goal is to schedule a workflow that syncs Alteryx data with a SharePoint List, so the list reflects exactly the rows present in Alteryx with all their fields. For the complete sync process, we'll end up with two Python tools in Alteryx, run sequentially. For the syncing to work, the ShP List needs a unique row identifier; this blog assumes a unique column named Title is set up.

  • Script 1) Delete ShP items not present in the Alteryx data
  • Script 2) Perform batch upsert (update if present, insert if not)

Deletion and Upsert Scripts (to be Run in Sequence)

Authentication

A clean way to programmatically access the Graph API in production is to use the OAuth 2.0 client credentials flow. This means an Azure app needs to be created in the Azure portal, providing client id & client secret values. The script will then use these credentials to obtain a bearer token, which will be used to authorize the Graph API requests.
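
As a reference, the token exchange boils down to a single POST against the tenant's token endpoint; a minimal sketch with placeholder values (the authenticate() functions in the full scripts below do the same, with error handling):

import requests

token_url = "https://login.microsoftonline.com/<your_tenant_id>/oauth2/v2.0/token"
response = requests.post(
    token_url,
    data={
        "client_id": "<app_client_id>",
        "client_secret": "<app_client_secret>",
        "scope": "https://graph.microsoft.com/.default",
        "grant_type": "client_credentials",
    },
    timeout=30,
)
response.raise_for_status()
access_token = response.json()["access_token"]
auth_headers = {"Authorization": f"Bearer {access_token}"}  # used for all Graph calls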

A SharePoint List lives in a SharePoint Site. Once the Azure app is created, two additional steps are needed for it to access the list/site in a safe, least-privilege setup:

a) Azure Portal > App Registrations > (search for & select your app by name) > API permissions > Add a permission > Microsoft Graph > Application permissions > Sites.Selected > Add > "Grant admin consent for <tenant>"

b) Ask an Azure admin with access to the target ShP Site (or a ShP admin) to run these PowerShell commands:

# This will open a browser window for the admin to sign in
Connect-MgGraph -Scopes "Sites.FullControl.All"

# Get the composite site ID
$site = Get-MgSite -SiteId "your_tenant.sharepoint.com:/sites/your_site_name"
Write-Host "Site ID found: $($site.Id)"

# Define the permission parameters
$params = @{
    roles = @("write")
    grantedToIdentities = @(
        @{
            application = @{
                id = "your_app_client_id"
                displayName = "your_app_name"
            }
        }
    )
}

# Grant the permission
New-MgSitePermission -SiteId $site.Id -BodyParameter $params

# Verify it worked
Get-MgSitePermission -SiteId $site.Id | Where-Object { $_.GrantedToIdentities.Application.Id -eq "your_app_client_id" }

Script 1) List Cleanup

Let's have a look at the deletion script. What does it do? It

    • reads incoming Alteryx data
    • authenticates via Graph API & gets all items from ShP List
    • compares incoming and present data using a unique field
    • deletes all items from the ShP List that don't match the incoming data

To get the script to work, some details need to be entered:

CONFIG = {
    "tenant_id": "your_tenant_id",
    "client_id": "app_client_id", 
    "client_secret": "app_client_id",
    "site_id": "your_tenant.sharepoint.com,your_site_collection_id,you_site_id",
    "list_id": "your_list_id",
    "unique_field": "Title"
}

Change the unique_field to match the internal name of the unique 'main' field of your ShP List (primary key). The site_id is a composite id. Either concatenate it from its pieces or fetch the whole id like so:

GET https://graph.microsoft.com/v1.0/sites/your_tenant.sharepoint.com:/sites/your_site_name

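If you prefer to stay in Python, here is a hedged sketch of fetching the composite site ID with requests (it assumes you already hold a bearer token from the client credentials flow above):

import requests

headers = {"Authorization": f"Bearer {access_token}"}  # token from the client credentials flow
url = "https://graph.microsoft.com/v1.0/sites/your_tenant.sharepoint.com:/sites/your_site_name"
site = requests.get(url, headers=headers, timeout=30).json()
print(site["id"])  # composite id: "your_tenant.sharepoint.com,<site_collection_guid>,<site_guid>"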

#################################
# Configuration
#################################
from ayx import Alteryx
import pandas as pd
import requests
import time
from datetime import datetime

CONFIG = {
    "tenant_id": "your_tenant_id",
    "client_id": "your_client_id",
    "client_secret": "your_client_secret",
    "site_id": "your_tenant.sharepoint.com,your_site_collection_id,your_site_id",
    "list_id": "your_list_id",
    "unique_field": "Title",  # Match the list's title field
    "batch_size": 20,  # Items per batch
}

# Logging Config
def log_info(message):
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    log_entry = f"[OK] [{timestamp}] | INFO | PYTHON DELETE | {message}"
    print(log_entry)

def log_error(message):
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    log_entry = f"[X] [{timestamp}] | ERROR | PYTHON DELETE | {message}"
    print(log_entry)

Main Functions

authenticate() - exchanges app credentials for bearer token
get_existing_items() - fetches all ShP List items
batch_delete_items() - deletes ShP List items in batches of 20
make_request() - API wrapper with retry logic

Retry behavior follows an exponential backoff pattern (3 retries by default), with the delay depending on the API response code:

Scenario                 | Wait Strategy                       | Example Delays
Success (200/201/204)    | Return immediately                  | -
Throttled (429)          | Use Retry-After header              | 10s (as specified by server)
Service Error (503/504)  | Exponential backoff (capped at 30s) | 1s → 2s → 4s → 8s → 16s → 30s
Other Errors             | Exponential backoff (uncapped)      | 1s → 2s → 4s → 8s → 16s → 32s...
Max Retries              | Raise exception                     | -

#################################
# Definitions
#################################
def authenticate(config):
    """Get access token for Graph API."""
    token_url = f"https://login.microsoftonline.com/{config['tenant_id']}/oauth2/v2.0/token"
    response = requests.post(
        token_url,
        data={
            "client_id": config["client_id"],
            "client_secret": config["client_secret"],
            "scope": "https://graph.microsoft.com/.default",
            "grant_type": "client_credentials",
        },
        timeout=30,
    )
    if response.status_code == 200:
        token = response.json().get("access_token")
        if token:
            return token
    raise Exception(f"Authentication failed: {response.text}")


def make_request(method, url, headers, retries=3, **kwargs):
    """Make HTTP request with retry logic."""
    for attempt in range(retries):
        try:
            response = requests.request(method, url, headers=headers, **kwargs)
            if response.status_code in [200, 201, 204]:
                return response
            # Handle throttling
            if response.status_code == 429:
                retry_after = int(response.headers.get("Retry-After", 10))
                log_info(f"Throttled. Waiting {retry_after}s...")
                time.sleep(retry_after)
                continue
            # Handle service errors
            if response.status_code in [503, 504]:
                wait_time = min(2 ** attempt, 30)
                log_error(f"Service error. Retry in {wait_time}s...")
                time.sleep(wait_time)
                continue
            response.raise_for_status()
        except Exception as e:
            if attempt == retries - 1:
                raise
            time.sleep(2 ** attempt)
    raise Exception("Max retries exceeded")


def get_existing_items(base_url, site_id, list_id, headers, unique_field):
    """Retrieve all existing items from SharePoint List."""
    items = {}
    url = f"{base_url}/sites/{site_id}/lists/{list_id}/items?$expand=fields&$top=999"
    log_info("Fetching existing SharePoint items...")
    page_count = 0
    while url:
        page_count += 1
        response = make_request("GET", url, headers)
        data = response.json()
        # Process items
        for item in data.get("value", []):
            if unique_field in item.get("fields", {}):
                title = str(item["fields"][unique_field])
                items[title] = item["id"]
        # Get next page URL
        url = data.get("@odata.nextLink")
        if url:
            time.sleep(0.2)  # Small delay between pages
    log_info(f"Found {len(items)} items across {page_count} pages")
    return items


def batch_delete_items(base_url, site_id, list_id, headers, item_ids, batch_size=20):
    """Delete items from SharePoint in batches using Graph API batch endpoint."""
    if not item_ids:
        return pd.DataFrame()
    results = []
    total_batches = (len(item_ids) + batch_size - 1) // batch_size
    log_info(f"Deleting {len(item_ids)} items in {total_batches} batches...")
    for batch_num in range(total_batches):
        start_idx = batch_num * batch_size
        end_idx = min(start_idx + batch_size, len(item_ids))
        batch_ids = item_ids[start_idx:end_idx]
        # Build batch request
        batch_requests = [
            {
                "id": str(i),
                "method": "DELETE",
                "url": f"/sites/{site_id}/lists/{list_id}/items/{item_id}",
                "headers": {"If-Match": "*"}
            }
            for i, item_id in enumerate(batch_ids)
        ]
        # Send batch request
        try:
            response = make_request(
                "POST",
                f"{base_url}/$batch",
                headers,
                json={"requests": batch_requests}
            )
            batch_response = response.json()
            # Process responses
            for i, resp in enumerate(batch_response.get("responses", [])):
                status_code = resp.get("status", 0)
                results.append({
                    "item_id": batch_ids[i] if i < len(batch_ids) else "unknown",
                    "status": "success" if status_code == 204 else "failed",
                    "status_code": status_code,
                    "batch": batch_num + 1
                })
        except Exception as e:
            log_error(f"Deletion Batch {batch_num + 1} error: {str(e)}")
            for item_id in batch_ids:
                results.append({
                    "item_id": item_id,
                    "status": "failed",
                    "status_code": 0,
                    "error": str(e),
                    "batch": batch_num + 1
                })
        # Delay in between batches
        if batch_num < total_batches - 1:
            time.sleep(0.5)
    return pd.DataFrame(results)

Execution

The execution part runs the functions from above, all wrapped in one big try/except block. In case of an error, it gets logged to Alteryx. An overview of successes and failures is both logged and written to output anchor 1:

#################################
# Execution
#################################
try:
    log_info("SharePoint List Cleanup - Delete Items Not in Today's Data")

    # Read input data from Alteryx
    input_data = Alteryx.read("#1")
    unique_field = CONFIG["unique_field"]

    # Validate unique field exists
    if unique_field not in input_data.columns:
        raise Exception(f"Column '{unique_field}' not found in input data")

    # Get unique values from today's data
    todays_values = set(input_data[unique_field].astype(str).unique())
    log_info(f"Today's data contains {len(todays_values)} unique rows")

    # Authenticate
    log_info("Authenticating...")
    token = authenticate(CONFIG)

    # Setup headers
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    base_url = "https://graph.microsoft.com/v1.0"

    # Get existing items from SharePoint
    existing_items = get_existing_items(
        base_url, CONFIG["site_id"], CONFIG["list_id"], headers, unique_field
    )

    # Identify items to delete (exist in SharePoint but not in today's data)
    items_to_delete = []
    for title, item_id in existing_items.items():
        if title not in todays_values:
            items_to_delete.append(item_id)
    log_info(f"Items to delete: {len(items_to_delete)} of {len(existing_items)}")

    # Perform deletion if needed
    if items_to_delete:
        results = batch_delete_items(
            base_url, CONFIG["site_id"], CONFIG["list_id"], headers,
            items_to_delete, CONFIG["batch_size"]
        )

        # Calculate statistics
        success_count = len(results[results["status"] == "success"])
        failed_count = len(results[results["status"] == "failed"])
        log_info(f"Deletion Complete - Success: {success_count}, Failed: {failed_count}")

        # Show failed items if any
        if failed_count > 0:
            failed = results[results["status"] == "failed"]
            log_error("Failed deletions (first 5):")
            for _, row in failed.head(5).iterrows():
                log_error(f"  Item ID {row['item_id']}: Status {row['status_code']}")

        # Output results overview
        Alteryx.write(results, 1)

        # Log output summary
        log_info(f"Items_Checked: {len(existing_items)}")
        log_info(f"Items_To_Delete: {len(items_to_delete)}")
        log_info(f"Delete_Success: {success_count}")
        log_info(f"Delete_Failed: {failed_count}")
    else:
        log_info("No items to delete - all SharePoint items are in today's data")
        # Log empty output summary
        log_info(f"Items_Checked: {len(existing_items)}")
        log_info("Items_To_Delete: 0")
        log_info("Delete_Success: 0")
        log_info("Delete_Failed: 0")

except Exception as e:
    log_error(f"X ERROR: {str(e)}")
    error_df = pd.DataFrame([{
        "Error": str(e),
        "Timestamp": datetime.now()
    }])
    Alteryx.write(error_df, 1)
    raise

Script 2) Batch Upsert Operation

The first script has already removed all stale items from the ShP List, so we no longer need to worry about deletions. After reading the incoming Alteryx data and authenticating, this second script now

    • updates existing ShP items that match (PATCH request)
    • creates new items that don't exist in SharePoint (POST request)
    • does so in batches of 20 via Graph API $batch endpoint
    • returns detailed results per item + summary statistics to Alteryx

In case of upsert failures: the returned per-item result set (anchor 1) can be joined back to the original data in Alteryx, and the script re-run as often as needed until no rows with failure statuses are left, as sketched below.
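
A hedged pandas sketch of that retry filter (it assumes the result set's title column is populated with the unique field values; the same can of course be done with Alteryx Join/Filter tools):

# Keep only the source rows whose upsert failed, to feed into another run
failed_titles = set(results.loc[results["status"] == "failed", "title"].astype(str))
retry_df = input_data[input_data[CONFIG["unique_field"]].astype(str).isin(failed_titles)]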

The upsert script's configuration part follows the same schema as the deletion script. Additionally, a list of columns to ignore from the incoming data can be defined here. Handy if the incoming Alteryx data itself is from a ShP List. This will likely need some trial-and-error though, and this list is probably unnecessarily long:

#################################
# Configuration
#################################
from ayx import Alteryx
import pandas as pd
import numpy as np
import requests
import time
from datetime import datetime

# get from environment variables in prod
CONFIG = {
    "tenant_id": "your_tenant_id",
    "client_id": "your_client_id",
    "client_secret": "your_client_secret",
    "site_id": "your_tenant.sharepoint.com,your_site_collection_id,your_site_id",
    "list_id": "your_list_id",
    "unique_field": "Title",  # Match the list's title field
}

# SharePoint internal/system columns to exclude
EXCLUDED_COLUMNS = [
    'Item Type', 'Path', 'Content Type', 'Modified', 'Created',
    'Created By', 'Modified By', 'ID', 'Attachments', 'Edit',
    '_UIVersionString', 'File Type', 'App Modified By', 'App Created By',
    'Compliance Asset Id', '_ComplianceFlags', '_ComplianceTag',
    '_ComplianceTagUserId', '_ComplianceTagWrittenTime', '_IsRecord',
    'Item Child Count', 'Folder Child Count'
]

# Logging Config
def log_info(message):
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    log_entry = f"[OK] [{timestamp}] | INFO | PYTHON UPSERT | {message}"
    print(log_entry)

def log_error(message):
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    log_entry = f"[X] [{timestamp}] | ERROR | PYTHON UPSERT | {message}"
    print(log_entry)

Main Functions

authenticate() - same as above
get_items() - fetches all ShP List items
clean_fields_for_sharepoint() - drops unnecessary columns, handles dtypes, NULLs
batch_upsert() - upserts cleaned Alteryx fields in batches of 20, returns results df
request() - API requests with retry logic

Because this script is slightly longer, the functions are encapsulated in a class this time (SharePointSync()). The advantage is that frequently needed attributes (e.g. the auth header, base URL, etc.) don't have to be passed explicitly to every function. They can be set once in the __init__() method and referenced from anywhere in the class using the self. prefix.

#################################
# Definitions
#################################
def authenticate(config):
    """Get access token using requests."""
    token_url = f"https://login.microsoftonline.com/{config['tenant_id']}/oauth2/v2.0/token"
    response = requests.post(
        token_url,
        data={
            "client_id": config["client_id"],
            "client_secret": config["client_secret"],
            "scope": "https://graph.microsoft.com/.default",
            "grant_type": "client_credentials"
        },
        timeout=30
    )
    if response.status_code == 200:
        result = response.json()
        if "access_token" in result:
            return result["access_token"]
    raise Exception(f"Auth failed: {response.text}")


class SharePointSync:
    """Production SharePoint sync for Alteryx."""

    def __init__(self, token, site_id, list_id):
        self.headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
            "Prefer": "return=representation"  # Add this header for better responses
        }
        self.base = "https://graph.microsoft.com/v1.0"
        self.site_id = site_id
        self.list_id = list_id

    def request(self, method, url, retries=5, **kwargs):
        """HTTP request with retry logic."""
        for i in range(retries):
            try:
                r = requests.request(method, url, headers=self.headers, **kwargs)
                if r.status_code in [200, 201, 204]:
                    return r
                if r.status_code == 429:
                    retry_after = int(r.headers.get('Retry-After', 10))
                    log_error(f"Throttled. Waiting {retry_after}s")
                    time.sleep(retry_after)
                    continue
                if r.status_code in [503, 504]:
                    wait_time = min(2 ** i, 60)
                    log_error(f"Service error {r.status_code}. Retry in {wait_time}s")
                    time.sleep(wait_time)
                    continue
                # Log detailed error for debugging
                if r.status_code == 400:
                    log_error(f"Bad Request Error (400): {r.text}")
                elif r.status_code == 503:
                    log_error(f"Service Unavailable (503): {r.text}")
                r.raise_for_status()
            except Exception as e:
                if i == retries - 1:
                    raise
                time.sleep(min(2 ** i, 60))
        raise Exception("Max retries exceeded")

    def get_items(self):
        """Get all existing items with pagination."""
        items = []
        url = f"{self.base}/sites/{self.site_id}/lists/{self.list_id}/items?$expand=fields&$top=999"
        while url:
            r = self.request("GET", url)
            data = r.json()
            items.extend(data.get("value", []))
            url = data.get("@odata.nextLink")
            if url:
                time.sleep(0.3)
        return items

    def clean_fields_for_sharepoint(self, row_dict):
        """Clean and prepare fields for SharePoint API."""
        cleaned = {}
        for key, value in row_dict.items():
            # Skip excluded columns
            if key in EXCLUDED_COLUMNS:
                continue
            # Skip internal columns starting with underscore
            if key.startswith('_'):
                continue
            # Handle NaN and None values
            if pd.isna(value) or value is None:
                continue
            # Handle different data types
            if isinstance(value, bool):
                cleaned[key] = value
            elif isinstance(value, (int, float)):
                # Check for infinity using numpy
                if pd.notna(value) and not np.isinf(value):
                    cleaned[key] = float(value)
            elif isinstance(value, pd.Timestamp):
                cleaned[key] = value.isoformat()
            else:
                # Convert to string and clean
                str_val = str(value).strip()
                if str_val and str_val.lower() not in ['nan', 'none', 'null']:
                    cleaned[key] = str_val
        return cleaned

    def batch_upsert(self, source_df, unique_field):
        """Batch upsert DataFrame to SharePoint."""
        log_info(f"Processing {len(source_df)} rows")
        log_info(f"Input columns ({len(source_df.columns)}): {list(source_df.columns)}")

        # Ensure unique field exists
        if unique_field not in source_df.columns:
            raise Exception(f"Unique field '{unique_field}' not found. Available: {list(source_df.columns)}")

        # Build lookup of existing items
        log_info("Fetching existing SharePoint items...")
        existing = self.get_items()
        lookup = {}
        for item in existing:
            if unique_field in item.get("fields", {}):
                key = str(item["fields"].get(unique_field))
                lookup[key] = {
                    "id": item["id"],
                    "etag": item.get("eTag", item.get("@odata.etag", "*"))
                }
        log_info(f"Found {len(lookup)} existing items in SharePoint")

        results = []
        batch_size = 20
        total_batches = (len(source_df) + batch_size - 1) // batch_size

        for batch_num, i in enumerate(range(0, len(source_df), batch_size), 1):
            batch_df = source_df.iloc[i:i+batch_size]
            batch_requests = []

            for idx, (_, row) in enumerate(batch_df.iterrows()):
                # Clean fields for SharePoint
                fields = self.clean_fields_for_sharepoint(row.to_dict())
                # Get unique value for matching
                unique_val = str(fields.get(unique_field, ""))

                # Debug first item
                if i == 0 and idx == 0:
                    log_info(f"First item: Title='{unique_val}'")
                    log_info(f"Fields to send ({len(fields)}): {list(fields.keys())}")

                if unique_val in lookup:
                    # Update existing item - use correct endpoint format
                    item_info = lookup[unique_val]
                    req = {
                        "id": str(idx),
                        "method": "PATCH",
                        "url": f"/sites/{self.site_id}/lists/{self.list_id}/items/{item_info['id']}",
                        "body": {"fields": fields},
                        "headers": {
                            "Content-Type": "application/json",
                            "If-Match": "*"  # Force update regardless of eTag
                        }
                    }
                else:
                    # Create new item
                    req = {
                        "id": str(idx),
                        "method": "POST",
                        "url": f"/sites/{self.site_id}/lists/{self.list_id}/items",
                        "body": {"fields": fields},
                        "headers": {"Content-Type": "application/json"}
                    }
                batch_requests.append(req)

            # Send batch request
            try:
                r = self.request("POST", f"{self.base}/$batch", json={"requests": batch_requests})
                batch_response = r.json()

                # Process responses
                for resp in batch_response.get("responses", []):
                    status_code = resp.get("status")
                    error_msg = ""  # Default to empty string instead of None
                    if status_code not in [200, 201, 204]:
                        # Parse error message
                        error_body = resp.get("body", {})
                        if isinstance(error_body, dict):
                            error = error_body.get("error", {})
                            error_msg = error.get("message", "Unknown error")
                        else:
                            error_msg = str(error_body)
                        # Log first few errors in detail
                        if len([r for r in results if r.get("status") == "failed"]) < 3:
                            request_id = int(resp.get("id", 0))
                            if request_id < len(batch_requests):
                                failed_req = batch_requests[request_id]
                                method = failed_req['method']
                                log_error(f"\nError in {method} request (ID {request_id}):")
                                log_error(f"  Status: {status_code}")
                                log_error(f"  Message: {error_msg}")
                    results.append({
                        "status": "success" if status_code in [200, 201, 204] else "failed",
                        "id": resp.get("id"),
                        "title": batch_df.iloc[int(resp.get("id"))][unique_field] if int(resp.get("id")) < len(batch_df) else "",
                        "status_code": status_code,
                        "error": error_msg if error_msg else ""
                    })
            except Exception as e:
                log_error(f"Batch error: {str(e)}")
                for req in batch_requests:
                    results.append({
                        "status": "failed",
                        "id": req["id"],
                        "status_code": 0,
                        "error": str(e)
                    })

            # Brief pause between batches
            if batch_num < total_batches:
                time.sleep(0.5)

        return pd.DataFrame(results)

Execution

After wrapping our functions in a class and dipping our toes in OOP in the last part, the execution itself is quite short and straightforward.

#################################
# Execution
#################################
try:
    log_info("SharePoint List Sync (upsert) starting")

    # Read input from Alteryx
    input_data = Alteryx.read("#1")
    log_info(f"Loaded {len(input_data)} rows from Alteryx")

    # Remove any SharePoint-specific columns that might have been included
    columns_to_drop = [col for col in EXCLUDED_COLUMNS if col in input_data.columns]
    if columns_to_drop:
        input_data = input_data.drop(columns=columns_to_drop)

    # Authenticate
    log_info("Authenticating...")
    token = authenticate(CONFIG)
    log_info("Authentication successful")

    # Initialize sync
    sync = SharePointSync(token, CONFIG["site_id"], CONFIG["list_id"])

    # Perform batch upsert
    log_info("Starting sync...")
    results = sync.batch_upsert(input_data, CONFIG["unique_field"])

    # Replace any None/NaN values in results with empty strings
    results = results.fillna("")

    # Calculate statistics
    success_count = len(results[results["status"] == "success"])
    failed_count = len(results[results["status"] == "failed"])
    log_info(f"Sync Complete - Success: {success_count}, Failed: {failed_count}")

    # Show specific errors if any
    if failed_count > 0:
        failed_items = results[results["status"] == "failed"]
        log_error("Failed items (first 5):")
        for _, item in failed_items.head(5).iterrows():
            if item['error']:
                log_error(f"  ID {item['id']}: {item['error']}")

    # Output results to Alteryx
    Alteryx.write(results, 1)

    # Also output summary
    summary = pd.DataFrame([{
        "Status": "Completed",
        "TotalRows": len(input_data),
        "Success": success_count,
        "Failed": failed_count,
        "Timestamp": datetime.now()
    }])
    Alteryx.write(summary, 2)

except Exception as e:
    log_error(f"ERROR: {str(e)}")
    error_df = pd.DataFrame([{
        "Error": str(e),
        "Timestamp": datetime.now()
    }])
    Alteryx.write(error_df, 1)
    raise

And that's it. One could now play around with the delay and batch size variables and monitor the overall script execution time. In this example, the defaults were chosen to avoid throttling rather than to minimize execution time. And don't forget to pass at the very least the client_secret value to the script from the environment instead of keeping it hardcoded, for example as sketched below.
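
One possible way to do that is reading the secret from an environment variable configured on the Alteryx Server (the variable name below is hypothetical):

import os

# Read the secret at runtime instead of hardcoding it in the workflow
CONFIG["client_secret"] = os.environ["SHP_CLIENT_SECRET"]  # hypothetical variable name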

Author:
Matthias Albert