Text cleanup with Python. Part 02

In the previous part of this article, we talked about how to clean text of digits and symbols. Let's continue cleaning data: this time we will strip letters and symbols out of numbers, validate email addresses, and count the number of non-empty values in a row. Here a row is treated as empty if it contains one non-empty value or fewer. Finally, we will test what we have built.

Cleaning strings of letters and symbols

If in the previous function we removed digits and symbols, now we need to do the opposite: for example, clean a phone number and bring it to a single format, so that the data is structured rather than raw and easier to search. Let's create a function phone_normalize(phone: str) -> str, which receives a string with a phone number, cleans it, brings it to the desired form and returns the result.

First we strip the string of brackets, quotation marks and everything else that is not a digit. Then we check that the string is not empty: it may have contained no phone number at all, in which case it is simply cleared. If digits remain, we check how many there are. I decided for myself that Russian phone numbers with more than 11 digits will not be taken into account, so in that case I return an empty string. If there are exactly 10 digits, I look at the first one: if it is a 9, I prepend a 7; otherwise I return the number as is. Numbers of 6 to 9 digits are returned unchanged. If there are 11 digits, I check the first one: if it is an 8, I change it to a 7. But not every 8 should be replaced, so I also check the second digit and replace only if it is a 9. If an 11-digit number starts with 7, I return it as is; anything else yields an empty string.

def phone_normalize(phone: str) -> str:
    # Keep only the digits; brackets, spaces and dashes are dropped
    phone = "".join(x for x in phone if x.isdecimal())
    if len(phone) > 11 or len(phone) < 6:
        return ""
    if len(phone) == 10:
        # Mobile numbers start with 9: prepend the country code
        return f"7{phone}" if phone.startswith("9") else phone
    if len(phone) == 11:
        # Replace a leading 8 with 7, but only for mobile numbers (8 9xx...)
        if phone.startswith("8") and phone[1] == "9":
            return f"7{phone[1:]}"
        return phone if phone.startswith("7") else ""
    # 6 to 9 digits: return as is
    return phone
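
A quick sanity check of the branches described above (the sample numbers are made up; the function is restated in a compact form so the snippet runs on its own):

```python
def phone_normalize(phone: str) -> str:
    # Keep only the digits; brackets, spaces and dashes are dropped
    phone = "".join(x for x in phone if x.isdecimal())
    if len(phone) > 11 or len(phone) < 6:
        return ""
    if len(phone) == 10:
        # Mobile numbers start with 9: prepend the country code
        return f"7{phone}" if phone.startswith("9") else phone
    if len(phone) == 11:
        # Replace a leading 8 with 7, but only for mobile numbers (8 9xx...)
        if phone.startswith("8") and phone[1] == "9":
            return f"7{phone[1:]}"
        return phone if phone.startswith("7") else ""
    # 6 to 9 digits: return as is
    return phone

print(phone_normalize("8 (912) 345-67-89"))  # 79123456789
print(phone_normalize("9123456789"))         # 79123456789
print(phone_normalize("+7 912 345-67-89"))   # 79123456789
print(phone_normalize("12345"))              # prints an empty line: too short
```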

Cleaning and verification of email

This function hardly needs comments. Here we simply check for the @ sign: if it is present, we consider the string an email. Also, a slash sometimes appears in place of the dot by mistake, so we replace it.

def email_normalize(mail: str) -> str:
    # "@" must be present; "/" is a frequent typo for "."
    return mail.strip().replace("/", ".") if "@" in mail else ""
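
A couple of examples (the addresses are made up; the one-liner is repeated so the snippet runs on its own):

```python
def email_normalize(mail: str) -> str:
    # "@" must be present; "/" is a frequent typo for "."
    return mail.strip().replace("/", ".") if "@" in mail else ""

print(email_normalize(" user@example/com "))  # user@example.com
print(email_normalize("not-an-address"))      # prints an empty line
```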

Counting the number of non-empty values

Strictly speaking, this function is not necessary at all and exists only for the example I want to show here; still, for completeness it has to be created. Everything is simple: we iterate over the items in the list and increase a counter for every non-empty one. If the counter ends up greater than 1, we return True, otherwise False.

def count_get(items: list) -> bool:
    # Count entries that are non-empty after stripping whitespace
    cnt = 0
    for item in items:
        if item.strip():
            cnt += 1
    return cnt > 1
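
For example (sample values are made up; the function is repeated so the snippet runs on its own):

```python
def count_get(items: list) -> bool:
    # Count entries that are non-empty after stripping whitespace
    cnt = 0
    for item in items:
        if item.strip():
            cnt += 1
    return cnt > 1

print(count_get(["79123456789", "", "Ivanov Ivan", ""]))  # True: two fields survived
print(count_get(["", "   ", "user1", ""]))                # False: only one did
```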

Checking the cleaning functions using an example CSV file

Let's check how the created functions work: we will take a ".csv" file, clean it with them and write the result to a separate file.
To do this, we need a row handler that opens the file, reads it line by line and cleans it.
Let's create a function read_files(file: str, name: str, ascii_l=True) -> None. As input it receives the path to the ".csv" file, the file name with the extension removed, and the ascii_l parameter with a default value, which tells the function whether to process Latin characters; after all, names are not always written in Russian.

First we open the ".csv" file and iterate over its rows in a loop. In my file the delimiter is "|"; if yours uses something else, a comma or a semicolon, specify that instead. The first line of my file contains headers, so I add it straight to the global list declared earlier.
In this case we know the structure of the file in advance, so we can unpack each row into named variables.
If you need to process a file with a different column layout, prepare it manually first, i.e. bring it to a processable form. The same goes for data sets of the same type with different numbers of columns. How to handle those is out of scope here, though I have made a small algorithm for it and, if there is interest, will share it in a future article, so write in the comments.
We unpack the row and normalize the full name, email, phone and username, also checking the length of the username. Then we count the non-empty values, and if there is more than one, we append the list to the global list rows_list. We print the obtained values so that it is not boring to watch.

def read_files(file: str, name: str, ascii_l=True) -> None:
    global rows_list
    with open(file, "r", encoding="utf-8") as cs:
        for nm, row in enumerate(csv.reader(cs, delimiter="|")):
            if nm == 0:
                # The first row holds the headers: keep it as is
                rows_list.append(row)
                continue
            phone, email, fio, uname = row
            fio = fio_normalize(fio, ascii_l)
            email = email_normalize(email)
            phone = phone_normalize(phone)
            uname = uname.encode().decode()  # UTF-8 round-trip; raises on malformed characters
            if len(uname) > 50:
                uname = ""
            # Keep the row only if more than one field survived cleaning
            if not count_get([phone, email, fio, uname]):
                continue
            rows_list.append([phone, email, fio, uname])
            print(f"\r{nm + 1} | {fio} | {phone} | {email} | {uname}", end="")
            # Write large files in portions of 100000 rows
            if len(rows_list) == 100000:
                with open(f"{name}_clean.csv", mode="a", encoding="utf-8", newline="") as csv_f:
                    csv.writer(csv_f, delimiter=";").writerows(rows_list)
                rows_list.clear()

It is better to write large files in parts, so we check the number of rows accumulated in the global list. Once it reaches 100000, we write them to the file and clear the list for the next portion.
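
The buffered-write pattern used above can be sketched in isolation. The row source, file name and tiny chunk size below are stand-ins for the demo; the flush/clear logic is the same:

```python
import csv
from pathlib import Path

CHUNK = 3  # the article uses 100000; a tiny value shows the pattern

def flush(rows: list, path: str) -> None:
    # Append the buffered rows to the output file and clear the buffer
    with open(path, "a", encoding="utf-8", newline="") as f:
        csv.writer(f, delimiter=";").writerows(rows)
    rows.clear()

out = "demo_clean.csv"
Path(out).unlink(missing_ok=True)  # start from an empty file for the demo

buffer = []
for i in range(7):  # stand-in for rows coming from the cleaning loop
    buffer.append([f"7912345678{i}", f"user{i}"])
    if len(buffer) == CHUNK:
        flush(buffer, out)
if buffer:  # write the remaining partial chunk, as main() does at the end
    flush(buffer, out)

print(sum(1 for _ in open(out, encoding="utf-8")))  # 7
```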

Asking for the file path. The main function

So, we are coming to the end of this article. Let's create the main function. We ask the user for the path to the file and whether to transliterate. I skipped yes/no clarification questions here: transliteration is on by default, and anything other than a single "n" keeps it on. We check that the path exists and actually points to a file; ideally we should also check that it is a ".csv", at least by extension. If the file does not exist, we exit the script; if everything is fine, we move on. We also measure the execution time of the script.
We get the file name without the extension, count the lines in the file and print the name and line count for the user. Then we check whether transliteration is required and pass the path to the cleaning function. After the run we check whether the global list is empty; if not, we save the remaining data to the new ".csv" file. Finally we print the execution time in the terminal.

def main() -> None:
    global rows_list
    path = input("path file: >>> ")
    ascii_l = input("ascii_l: >>> ")
    if not path or not Path(path).is_file():
        # is_file() also implies the path exists
        exit(0)
    tm = time.monotonic()
    name = Path(path).stem  # file name without the extension
    with open(path, "rb") as f:
        cnt_line = sum(1 for _ in f)
    print(f"\n{Path(path).name} | Lines: {cnt_line}\n{'*' * 35}")
    # A single "n" disables transliteration; anything else keeps the default
    read_files(path, name, ascii_l=(ascii_l != "n"))

    # Save the last, incomplete portion of rows
    if 0 < len(rows_list) < 100000:
        with open(f"{name}_clean.csv", mode="a", encoding="utf-8", newline="") as csv_f:
            csv.writer(csv_f, delimiter=";").writerows(rows_list)
        rows_list.clear()

    elapsed = int(time.monotonic() - tm)
    ch_time = (f'All complete | {(elapsed // 3600) % 24:d} h. '
               f'{(elapsed // 60) % 60:02d} m. {elapsed % 60:02d} s.')
    lnt = len(ch_time)
    print(f'\n{"-" * lnt}\n{ch_time}\n{"-" * lnt}')


if __name__ == "__main__":
    main()
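
The hours/minutes/seconds arithmetic in the final message can be written more directly with divmod; fmt_elapsed here is a hypothetical helper, not part of the script, shown only to verify the formula:

```python
def fmt_elapsed(seconds: int) -> str:
    # Same arithmetic as in main(), expressed with divmod
    h, rem = divmod(seconds, 3600)
    m, s = divmod(rem, 60)
    return f"All complete | {h % 24:d} h. {m:02d} m. {s:02d} s."

print(fmt_elapsed(3750))  # All complete | 1 h. 02 m. 30 s.
print(fmt_elapsed(59))    # All complete | 0 h. 00 m. 59 s.
```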

That's basically all. Below is the complete code of the cleaning script, i.e. what we will use for testing the functions.

Full script code:

"""
pip install transliterate
"""
import csv
import string
import time
from pathlib import Path

from transliterate import translit

csv.field_size_limit(2147483647)

rows_list = []


def replacer(txt: str) -> str:
    # Map Latin look-alike letters to their Cyrillic counterparts
    symbols = ("ahkbtmxcepAHKBTMXCEP",
               "анквтмхсерАНКВТМХСЕР")
    tr = {ord(a): ord(b) for a, b in zip(*symbols)}
    return txt.translate(tr)


def fio_normalize(fio: str, ascii_l=True) -> str:
    # Drop values that are actually links
    if "http" in fio.lower():
        return ""
    fio = fio.strip("-").strip()
    # Protect inner hyphens (double-barrelled names) with a placeholder
    if "-" in fio:
        fio = fio.replace("-", "тирре")
    fio = "".join(x for x in fio if x.isalpha() or x == " ").strip().replace("тирре", "-")
    # Count spaces and Latin letters
    ascii_count = sum(1 for x in fio if x == " " or x in string.ascii_letters)
    if ascii_l and ascii_count == len(fio):
        # The whole name is Latin: transliterate it into Cyrillic
        fio = translit(fio, "ru")
    elif ascii_l:
        # Mixed alphabets: swap only the Latin look-alike letters
        fio = "".join(replacer(x) if x in string.ascii_letters else x for x in fio)
    fio = " ".join(x.strip().capitalize() for x in fio.split())
    # Capitalize both parts of hyphenated names
    lst = []
    for x in fio.split():
        if "-" in x:
            lst.append("-".join(z.capitalize() for z in x.split("-")))
        else:
            lst.append(x)
    fio = " ".join(lst)
    # Keep at most three words and at most 50 characters
    if len(fio.split()) > 3:
        fio = " ".join(fio.split()[:3])
    if len(fio) > 50:
        fio = fio[:50]
    return fio


def email_normalize(mail: str) -> str:
    # "@" must be present; "/" is a frequent typo for "."
    return mail.strip().replace("/", ".") if "@" in mail else ""


def phone_normalize(phone: str) -> str:
    # Keep only the digits; brackets, spaces and dashes are dropped
    phone = "".join(x for x in phone if x.isdecimal())
    if len(phone) > 11 or len(phone) < 6:
        return ""
    if len(phone) == 10:
        # Mobile numbers start with 9: prepend the country code
        return f"7{phone}" if phone.startswith("9") else phone
    if len(phone) == 11:
        # Replace a leading 8 with 7, but only for mobile numbers (8 9xx...)
        if phone.startswith("8") and phone[1] == "9":
            return f"7{phone[1:]}"
        return phone if phone.startswith("7") else ""
    # 6 to 9 digits: return as is
    return phone


def count_get(items: list) -> bool:
    # Count entries that are non-empty after stripping whitespace
    cnt = 0
    for item in items:
        if item.strip():
            cnt += 1
    return cnt > 1


def read_files(file: str, name: str, ascii_l=True) -> None:
    global rows_list
    with open(file, "r", encoding="utf-8") as cs:
        for nm, row in enumerate(csv.reader(cs, delimiter="|")):
            if nm == 0:
                # The first row holds the headers: keep it as is
                rows_list.append(row)
                continue
            phone, email, fio, uname = row
            fio = fio_normalize(fio, ascii_l)
            email = email_normalize(email)
            phone = phone_normalize(phone)
            uname = uname.encode().decode()  # UTF-8 round-trip; raises on malformed characters
            if len(uname) > 50:
                uname = ""
            # Keep the row only if more than one field survived cleaning
            if not count_get([phone, email, fio, uname]):
                continue
            rows_list.append([phone, email, fio, uname])
            print(f"\r{nm + 1} | {fio} | {phone} | {email} | {uname}", end="")
            # Write large files in portions of 100000 rows
            if len(rows_list) == 100000:
                with open(f"{name}_clean.csv", mode="a", encoding="utf-8", newline="") as csv_f:
                    csv.writer(csv_f, delimiter=";").writerows(rows_list)
                rows_list.clear()


def main() -> None:
    global rows_list
    path = input("path file: >>> ")
    ascii_l = input("ascii_l: >>> ")
    if not path or not Path(path).is_file():
        # is_file() also implies the path exists
        exit(0)
    tm = time.monotonic()
    name = Path(path).stem  # file name without the extension
    with open(path, "rb") as f:
        cnt_line = sum(1 for _ in f)
    print(f"\n{Path(path).name} | Lines: {cnt_line}\n{'*' * 35}")
    # A single "n" disables transliteration; anything else keeps the default
    read_files(path, name, ascii_l=(ascii_l != "n"))

    # Save the last, incomplete portion of rows
    if 0 < len(rows_list) < 100000:
        with open(f"{name}_clean.csv", mode="a", encoding="utf-8", newline="") as csv_f:
            csv.writer(csv_f, delimiter=";").writerows(rows_list)
        rows_list.clear()

    elapsed = int(time.monotonic() - tm)
    ch_time = (f'All complete | {(elapsed // 3600) % 24:d} h. '
               f'{(elapsed // 60) % 60:02d} m. {elapsed % 60:02d} s.')
    lnt = len(ch_time)
    print(f'\n{"-" * lnt}\n{ch_time}\n{"-" * lnt}')


if __name__ == "__main__":
    main()

Testing

Let's run the script and specify the path to a test CSV containing randomly generated data, then clean it. For clarity, here is a fragment of the image with the phone numbers.

Phone numbers

As you can see, it is quite a hodgepodge. And here is what was there before processing and what came out after, using one row as an example.

Row to be processed

And after processing we get:

String after processing

Thus, we have seen that cleaning a string is not that difficult, especially using Python's own methods, without reinventing the wheel.

And that's probably all.
Thank you for your attention. I hope you find this information useful!

Subscribe to our telegram channels!
