"""Automate staff-data processing: build the filtered A3 user list and update Active Directory.

The script retrieves the newest staff file and related data, runs PowerShell
scripts to generate the intermediate user lists (A1 and A5), and merges those
lists with the staff data to filter out certain users. The final filtered user
list (A3) is saved to a CSV file, and all relevant files are archived. After
processing, the script adds the A3 users to an Active Directory group using
another PowerShell script. It also logs the time taken for the entire
operation.
"""
import os
import glob
import subprocess
import shutil
import time
import pandas as pd
from datetime import datetime
def get_newest_file(directory, file_name):
    """Return the path of ``file_name`` inside the newest subdirectory.

    Despite the name, no files are compared: the immediate subdirectories of
    ``directory`` are listed, the one with the most recent modification time
    is selected, and ``file_name`` is appended to its path. The resulting
    file is not checked for existence.

    Args:
        directory: Directory whose immediate subdirectories are scanned.
        file_name: File name to append to the newest subdirectory's path.

    Returns:
        The newest subdirectory's path joined with ``file_name``.

    Raises:
        ValueError: If no subdirectory is found under ``directory``.
    """
    # The trailing separator in the pattern makes glob match directories only.
    subdirs = glob.glob(os.path.join(directory, '*/'))
    if not subdirs:
        # Same exception type max() raises on an empty list, but with a
        # message that names the directory that was searched.
        raise ValueError(f"no subdirectories found in {directory!r}")
    newest_dir = max(subdirs, key=os.path.getmtime)
    return os.path.join(newest_dir, file_name)
def process_staff_files(staff_path, A1_path, A5_path, demo_path):
    """Build the A3 user list from the staff, A1, and A5 CSV files.

    A staff row is kept when its ``AD Samaccountname`` matches no
    ``SamAccountName`` in the A1 file, matches none in the A5 file, and its
    ``AD Disabled flag`` equals ``False``.

    Args:
        staff_path: CSV containing at least the ``AD Samaccountname`` and
            ``AD Disabled flag`` columns.
        A1_path: CSV containing a ``SamAccountName`` column.
        A5_path: CSV containing a ``SamAccountName`` column.
        demo_path: Unused. Kept in the signature so existing callers keep
            working; the file was previously loaded but never consulted, so
            it is no longer read.

    Returns:
        A DataFrame with the surviving staff rows. Because it is produced by
        left merges, it also carries the columns contributed by the A1 and A5
        frames (empty for these rows, suffixed ``_x``/``_y`` where names
        collide) and the ``_merge`` indicator column of the second merge.
    """
    staff_df = pd.read_csv(staff_path)
    A1_df = pd.read_csv(A1_path)
    A5_df = pd.read_csv(A5_path)

    # Step 1: keep staff rows with no match in A1 ('left_only' after a left
    # merge), then drop the indicator so the second merge can add its own.
    A1_merge = staff_df.merge(
        A1_df,
        left_on='AD Samaccountname',
        right_on='SamAccountName',
        how='left',
        indicator=True,
    )
    not_in_A1 = A1_merge.loc[A1_merge['_merge'] == 'left_only'].drop(columns='_merge')

    # Step 2: same anti-join against A5.
    A5_merge = not_in_A1.merge(
        A5_df,
        left_on='AD Samaccountname',
        right_on='SamAccountName',
        how='left',
        indicator=True,
    )
    not_in_A5 = A5_merge['_merge'] == 'left_only'
    # Element-wise comparison on purpose: `~column` would fail if the flag
    # column is not a pure boolean dtype (e.g. it contains blanks).
    enabled = A5_merge['AD Disabled flag'] == False  # noqa: E712

    return A5_merge.loc[not_in_A5 & enabled]
def save_and_archive_files(A3_users, archive_path):
    """Write the A3 list to ``A3.csv`` and move the working CSVs to the archive.

    ``A3.csv`` is written to the current working directory (without the
    index). A folder named after the current local time
    (``YYYY-MM-DD--HH-MM``) is then created under ``archive_path`` and
    ``A1.csv``, ``A3.csv``, ``A5.csv`` and ``staff.csv`` are moved from the
    current working directory into it.

    Args:
        A3_users: DataFrame to save as ``A3.csv``.
        archive_path: Directory under which the timestamped folder is created.

    Raises:
        FileNotFoundError: If one of the four CSV files is missing from the
            current working directory.
    """
    A3_users.to_csv('A3.csv', index=False)

    timestamp = datetime.now().strftime("%Y-%m-%d--%H-%M")
    new_dir = os.path.join(archive_path, timestamp)
    os.makedirs(new_dir, exist_ok=True)

    for file_name in ('A1.csv', 'A3.csv', 'A5.csv', 'staff.csv'):
        # Name the destination file explicitly. Given only the directory,
        # shutil.move raises shutil.Error when a file of that name is already
        # inside it, which happens on a second run within the same minute
        # (the folder name has minute resolution and makedirs allows reuse).
        # With an explicit file path the earlier copy is overwritten instead.
        shutil.move(f'./{file_name}', os.path.join(new_dir, file_name))
def _run_powershell(script, quiet=False):
    """Run a PowerShell script, warn on a non-zero exit code, and return the code."""
    # DEVNULL rather than PIPE: nothing ever reads the pipe, and the
    # subprocess documentation warns that an unread PIPE can block the child
    # once the OS pipe buffer fills up.
    stdout = subprocess.DEVNULL if quiet else None
    return_code = subprocess.call(["powershell", script], stdout=stdout)
    if return_code != 0:
        # Surface failures that used to be silently ignored. The run is not
        # aborted, so the overall control flow is unchanged.
        print(f"WARNING: {script} exited with code {return_code}")
    return return_code


def main():
    """Run the full pipeline: fetch inputs, build A3, archive, update AD, log.

    Steps, in order:
      1. Locate the staff and demo files in the newest subdirectory of the
         source share and copy the staff file into the working directory.
      2. Run ``genA1.ps1`` and ``genA5.ps1`` with their output suppressed
         (presumably these produce ``A1.csv`` and ``A5.csv``; the scripts
         are not visible here).
      3. Build the A3 user list and archive the working CSV files.
      4. Run ``addToGroup.ps1``.
      5. Print the runtime and append it to ``log.txt``.
    """
    start_time = time.time()

    # Define file paths
    staff_path = get_newest_file("//PATH_TO_fILE", "FILE.csv")
    # NOTE(review): the staff file is copied to ./FILE.csv, while the later
    # steps read and archive 'staff.csv' -- confirm these names line up.
    shutil.copy(staff_path, './FILE.csv')
    demo_path = get_newest_file('//PATH_TO_FILE', 'FILE2.csv')

    # Generate A1 and A5 lists using PowerShell scripts
    print('....starting genA1.ps1')
    _run_powershell(".\\ps\\genA1.ps1", quiet=True)
    print('...complete')
    print('....starting genA5.ps1')
    _run_powershell(".\\ps\\genA5.ps1", quiet=True)
    print('...complete')

    # Process files and generate A3 list
    A3_users = process_staff_files('staff.csv', 'A1.csv', 'A5.csv', demo_path)

    # Save and archive files
    archive_path = '//PATH_TO_ARCHIVE_DIR'
    save_and_archive_files(A3_users, archive_path)

    # Add A3 users to AD group using PowerShell script
    # NOTE(review): A3.csv has already been moved into the archive folder at
    # this point -- confirm addToGroup.ps1 reads it from there.
    print("...adding A3 users to AD group")
    _run_powershell(".\\ps\\addToGroup.ps1")
    print("...finished adding users to group")

    # Report the runtime and append it, with the finish time, to log.txt
    total_time = (time.time() - start_time) / 60
    print(f"Runtime: {total_time:.2f} minutes")
    with open("log.txt", 'a') as log_file:
        log_file.write(f"Last run: {datetime.now()}\nRun Time: {total_time:.2f} minutes\n")
# Script entry point: run the pipeline only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()